如何通过python采集windows正在运行的进程信息 最后更新时间:2025年12月04日 ### 基本思路 使用`psutil`采集进程数据,通过pid对比提取出父子关系。 ### 示例 ```python import psutil import os import win32api import argparse import csv import json from datetime import datetime # 常见 Windows 系统关键进程(不完全列表,用于标记) SYSTEM_PROCESS_NAMES = { "System", "Idle", "smss.exe", "csrss.exe", "wininit.exe", "winlogon.exe", "services.exe", "lsass.exe", "lsm.exe", "svchost.exe", "dwm.exe", "conhost.exe", "fontdrvhost.exe", "audiodg.exe", "taskhostw.exe", "RuntimeBroker.exe", "sihost.exe", "ctfmon.exe", "dllhost.exe" } def get_file_version_info(file_path): try: lang, codepage = win32api.GetFileVersionInfo(file_path, '\\VarFileInfo\\Translation')[0] string_file_info = f'\\StringFileInfo\\{lang:04x}{codepage:04x}\\' company = win32api.GetFileVersionInfo(file_path, string_file_info + 'CompanyName') description = win32api.GetFileVersionInfo(file_path, string_file_info + 'FileDescription') return (company or "Unknown").strip(), (description or "No description").strip() except Exception: return "Unknown", "【版本信息不可读】文件可能损坏、非标准PE格式或权限不足" def get_process_source(proc): """推断进程启动来源""" try: parent_pid = proc.ppid() if parent_pid == 0: return "系统初始进程" parent = psutil.Process(parent_pid) parent_name = parent.name() if parent_name.lower() == "explorer.exe": return "用户启动(通过资源管理器)" elif parent_name.lower() in ["cmd.exe", "powershell.exe", "pwsh.exe", "windowsterminal.exe"]: return "用户启动(通过命令行)" elif parent_name.lower() in ["services.exe", "svchost.exe"]: return "系统服务启动" else: return f"{parent_name}" except (psutil.NoSuchProcess, psutil.AccessDenied): return "父进程已退出或无权限访问" except Exception: return "无法确定来源" def collect_running_processes(): processes_info = [] # 获取当前登录用户名(用于辅助判断) current_user = None try: current_user = psutil.users()[0].name if psutil.users() else "Unknown" except Exception: current_user = "Unknown" for proc in psutil.process_iter(['pid', 'name', 'username']): pid = proc.info['pid'] name = proc.info['name'] username = proc.info.get('username') or "Unknown" exe = None company = "Unknown" description = "" tag = "普通进程" source = "未知来源" # 判断是否为系统进程 if name in SYSTEM_PROCESS_NAMES or pid in (0, 4): tag = "系统进程" # 尝试获取 Source(需先获取 ppid) try: source = get_process_source(proc) except Exception: source = "无法确定来源" try: exe = proc.exe() except (psutil.AccessDenied, psutil.NoSuchProcess): processes_info.append({ 'PID': pid, 'Name': name, 'Path': "[不可访问]", 'Company': "Unknown", 'Description': "【权限受限】无法访问进程信息(可能为受保护系统进程)", 'Tag': tag, 'Source': source }) continue except Exception as e: processes_info.append({ 'PID': pid, 'Name': name, 'Path': "[错误]", 'Company': "Unknown", 'Description': f"【未知错误】{str(e)}", 'Tag': tag, 'Source': source }) continue if not exe or exe == "": processes_info.append({ 'PID': pid, 'Name': name, 'Path': "[无]", 'Company': "Microsoft Corporation", 'Description': "【特殊进程】无对应可执行文件(如 System、Idle 等内核实体)", 'Tag': "系统进程", # 强制标记 'Source': source }) continue if not os.path.exists(exe): processes_info.append({ 'PID': pid, 'Name': name, 'Path': exe, 'Company': "Unknown", 'Description': "【路径无效】可执行文件已删除或为虚拟进程", 'Tag': tag, 'Source': source }) continue # 正常读取版本信息 try: company, file_desc = get_file_version_info(exe) if file_desc.startswith("【"): description = file_desc else: description = file_desc or "No description" except Exception: description = "【版本信息不可读】读取文件资源失败" processes_info.append({ 'PID': pid, 'Name': name, 'Path': exe, 'Company': company, 'Description': description, 'Tag': tag, 'Source': source }) return processes_info def save_to_csv(data, filename): fieldnames = ['PID', 'Name', 'Path', 'Company', 'Description', 'Tag', 'Source'] with open(filename, mode='w', newline='', encoding='utf-8-sig') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(data) print(f"✅ 数据已保存至 CSV 文件: {filename}") def save_to_json(data, filename): with open(filename, mode='w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) print(f"✅ 数据已保存至 JSON 文件: {filename}") def main(): parser = argparse.ArgumentParser(description="采集 Windows 进程信息(含系统标记与启动来源)") parser.add_argument('-f', '--format', choices=['csv', 'json', 'both'], help="导出格式") parser.add_argument('-o', '--output', default=None, help="输出文件名前缀") args = parser.parse_args() print("正在收集进程信息(含 Tag 与 Source 字段)...\n") processes = collect_running_processes() processes.sort(key=lambda x: (x['Tag'] != '系统进程', x['Name'].lower())) # 系统进程排前面 if not args.format: print(f"共收集到 {len(processes)} 个进程。\n") print("示例(前8条):") print(f"{'PID':<6} {'Name':<20} {'Tag':<8} {'Source'}") print("-" * 70) for p in processes[:8]: print(f"{p['PID']:<6} {p['Name']:<20} {p['Tag']:<8} {p['Source'][:40]}") print("\n使用 -f csv/json 导出完整数据。") return base_name = args.output or f"processes_{datetime.now().strftime('%Y%m%d_%H%M%S')}" if args.format in ['csv', 'both']: save_to_csv(processes, f"{base_name}.csv") if args.format in ['json', 'both']: save_to_json(processes, f"{base_name}.json") if __name__ == "__main__": main() ```
Comments | NOTHING