基本思路
使用psutil采集进程数据,通过pid对比提取出父子关系。
示例
import psutil
import os
import win32api
import argparse
import csv
import json
from datetime import datetime
# 常见 Windows 系统关键进程(不完全列表,用于标记)
SYSTEM_PROCESS_NAMES = {
"System", "Idle", "smss.exe", "csrss.exe", "wininit.exe", "winlogon.exe",
"services.exe", "lsass.exe", "lsm.exe", "svchost.exe", "dwm.exe",
"conhost.exe", "fontdrvhost.exe", "audiodg.exe", "taskhostw.exe",
"RuntimeBroker.exe", "sihost.exe", "ctfmon.exe", "dllhost.exe"
}
def get_file_version_info(file_path):
try:
lang, codepage = win32api.GetFileVersionInfo(file_path, '\\VarFileInfo\\Translation')[0]
string_file_info = f'\\StringFileInfo\\{lang:04x}{codepage:04x}\\'
company = win32api.GetFileVersionInfo(file_path, string_file_info + 'CompanyName')
description = win32api.GetFileVersionInfo(file_path, string_file_info + 'FileDescription')
return (company or "Unknown").strip(), (description or "No description").strip()
except Exception:
return "Unknown", "【版本信息不可读】文件可能损坏、非标准PE格式或权限不足"
def get_process_source(proc):
"""推断进程启动来源"""
try:
parent_pid = proc.ppid()
if parent_pid == 0:
return "系统初始进程"
parent = psutil.Process(parent_pid)
parent_name = parent.name()
if parent_name.lower() == "explorer.exe":
return "用户启动(通过资源管理器)"
elif parent_name.lower() in ["cmd.exe", "powershell.exe", "pwsh.exe", "windowsterminal.exe"]:
return "用户启动(通过命令行)"
elif parent_name.lower() in ["services.exe", "svchost.exe"]:
return "系统服务启动"
else:
return f"{parent_name}"
except (psutil.NoSuchProcess, psutil.AccessDenied):
return "父进程已退出或无权限访问"
except Exception:
return "无法确定来源"
def collect_running_processes():
processes_info = []
# 获取当前登录用户名(用于辅助判断)
current_user = None
try:
current_user = psutil.users()[0].name if psutil.users() else "Unknown"
except Exception:
current_user = "Unknown"
for proc in psutil.process_iter(['pid', 'name', 'username']):
pid = proc.info['pid']
name = proc.info['name']
username = proc.info.get('username') or "Unknown"
exe = None
company = "Unknown"
description = ""
tag = "普通进程"
source = "未知来源"
# 判断是否为系统进程
if name in SYSTEM_PROCESS_NAMES or pid in (0, 4):
tag = "系统进程"
# 尝试获取 Source(需先获取 ppid)
try:
source = get_process_source(proc)
except Exception:
source = "无法确定来源"
try:
exe = proc.exe()
except (psutil.AccessDenied, psutil.NoSuchProcess):
processes_info.append({
'PID': pid,
'Name': name,
'Path': "[不可访问]",
'Company': "Unknown",
'Description': "【权限受限】无法访问进程信息(可能为受保护系统进程)",
'Tag': tag,
'Source': source
})
continue
except Exception as e:
processes_info.append({
'PID': pid,
'Name': name,
'Path': "[错误]",
'Company': "Unknown",
'Description': f"【未知错误】{str(e)}",
'Tag': tag,
'Source': source
})
continue
if not exe or exe == "":
processes_info.append({
'PID': pid,
'Name': name,
'Path': "[无]",
'Company': "Microsoft Corporation",
'Description': "【特殊进程】无对应可执行文件(如 System、Idle 等内核实体)",
'Tag': "系统进程", # 强制标记
'Source': source
})
continue
if not os.path.exists(exe):
processes_info.append({
'PID': pid,
'Name': name,
'Path': exe,
'Company': "Unknown",
'Description': "【路径无效】可执行文件已删除或为虚拟进程",
'Tag': tag,
'Source': source
})
continue
# 正常读取版本信息
try:
company, file_desc = get_file_version_info(exe)
if file_desc.startswith("【"):
description = file_desc
else:
description = file_desc or "No description"
except Exception:
description = "【版本信息不可读】读取文件资源失败"
processes_info.append({
'PID': pid,
'Name': name,
'Path': exe,
'Company': company,
'Description': description,
'Tag': tag,
'Source': source
})
return processes_info
def save_to_csv(data, filename):
fieldnames = ['PID', 'Name', 'Path', 'Company', 'Description', 'Tag', 'Source']
with open(filename, mode='w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
print(f"✅ 数据已保存至 CSV 文件: {filename}")
def save_to_json(data, filename):
with open(filename, mode='w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
print(f"✅ 数据已保存至 JSON 文件: {filename}")
def main():
parser = argparse.ArgumentParser(description="采集 Windows 进程信息(含系统标记与启动来源)")
parser.add_argument('-f', '--format', choices=['csv', 'json', 'both'], help="导出格式")
parser.add_argument('-o', '--output', default=None, help="输出文件名前缀")
args = parser.parse_args()
print("正在收集进程信息(含 Tag 与 Source 字段)...\n")
processes = collect_running_processes()
processes.sort(key=lambda x: (x['Tag'] != '系统进程', x['Name'].lower())) # 系统进程排前面
if not args.format:
print(f"共收集到 {len(processes)} 个进程。\n")
print("示例(前8条):")
print(f"{'PID':<6} {'Name':<20} {'Tag':<8} {'Source'}")
print("-" * 70)
for p in processes[:8]:
print(f"{p['PID']:<6} {p['Name']:<20} {p['Tag']:<8} {p['Source'][:40]}")
print("\n使用 -f csv/json 导出完整数据。")
return
base_name = args.output or f"processes_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if args.format in ['csv', 'both']:
save_to_csv(processes, f"{base_name}.csv")
if args.format in ['json', 'both']:
save_to_json(processes, f"{base_name}.json")
if __name__ == "__main__":
main() 

Comments | NOTHING