如何通过python采集windows正在运行的进程信息


基本思路

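使用 psutil 遍历正在运行的进程,通过每个进程的 ppid(父进程 PID)查到父进程,从而推断进程的启动来源;再借助 pywin32 的 win32api 读取可执行文件的公司名与文件描述。示例脚本仅适用于 Windows,需要先安装 psutil 和 pywin32。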

示例

import psutil
import os
import win32api
import argparse
import csv
import json
from datetime import datetime
# Names of common core Windows processes. The list is not exhaustive and is
# used only to tag a process as a system process (exact, case-sensitive match).
SYSTEM_PROCESS_NAMES = {
    "System",
    "Idle",
    "smss.exe",
    "csrss.exe",
    "wininit.exe",
    "winlogon.exe",
    "services.exe",
    "lsass.exe",
    "lsm.exe",
    "svchost.exe",
    "dwm.exe",
    "conhost.exe",
    "fontdrvhost.exe",
    "audiodg.exe",
    "taskhostw.exe",
    "RuntimeBroker.exe",
    "sihost.exe",
    "ctfmon.exe",
    "dllhost.exe",
}
def get_file_version_info(file_path):
    """Read the company name and file description from a file's version resource.

    Args:
        file_path: Path of the executable whose version resource is queried.

    Returns:
        A ``(company, description)`` tuple of stripped strings. An empty or
        missing company becomes ``"Unknown"`` and an empty or missing
        description becomes ``"No description"``. If any step fails, the
        tuple is ``("Unknown", <diagnostic message starting with "【">)``.
    """
    try:
        # Only the first (language, codepage) pair of the translation table
        # is used to address the string table.
        translations = win32api.GetFileVersionInfo(file_path, '\\VarFileInfo\\Translation')
        lang, codepage = translations[0]
        string_table = f'\\StringFileInfo\\{lang:04x}{codepage:04x}\\'

        def query(field):
            return win32api.GetFileVersionInfo(file_path, string_table + field)

        company = query('CompanyName') or "Unknown"
        description = query('FileDescription') or "No description"
        return company.strip(), description.strip()
    except Exception:
        # Best effort: any failure is reported through the description text
        # instead of being raised to the caller.
        return "Unknown", "【版本信息不可读】文件可能损坏、非标准PE格式或权限不足"
def get_process_source(proc):
    """Infer how a process was launched from the name of its parent process.

    Args:
        proc: Object exposing ``ppid()``; the caller in this file passes a
            psutil process.

    Returns:
        A human-readable label: a fixed text for parent PID 0, for an
        Explorer parent, for a shell/terminal parent, and for a
        services.exe/svchost.exe parent; otherwise the parent's name as-is.
        A fallback text is returned when the parent cannot be inspected.
    """
    try:
        ppid = proc.ppid()
        if ppid == 0:
            return "系统初始进程"

        launcher = psutil.Process(ppid).name()
        # Compare case-insensitively; the original casing is kept for the
        # fall-through label.
        key = launcher.lower()
        if key == "explorer.exe":
            return "用户启动(通过资源管理器)"
        if key in ("cmd.exe", "powershell.exe", "pwsh.exe", "windowsterminal.exe"):
            return "用户启动(通过命令行)"
        if key in ("services.exe", "svchost.exe"):
            return "系统服务启动"
        return f"{launcher}"
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return "父进程已退出或无权限访问"
    except Exception:
        return "无法确定来源"

def _make_record(pid, name, path, company, description, tag, source):
    """Build one process record with the fixed set of output keys."""
    return {
        'PID': pid,
        'Name': name,
        'Path': path,
        'Company': company,
        'Description': description,
        'Tag': tag,
        'Source': source,
    }


def collect_running_processes():
    """Collect one record per running process.

    Returns:
        A list of dicts with the keys ``PID``, ``Name``, ``Path``,
        ``Company``, ``Description``, ``Tag`` and ``Source``. ``Name`` is
        always a string. Processes whose executable cannot be resolved are
        still listed, with a placeholder ``Path`` and an explanatory
        ``Description``.
    """
    processes_info = []
    # Only pid and name are read from the prefetched info, so nothing else is
    # requested from process_iter.
    for proc in psutil.process_iter(['pid', 'name']):
        pid = proc.info['pid']
        # process_iter stores None for an attribute it was denied access to;
        # fall back to a string so callers can sort and format names safely.
        name = proc.info['name'] or "Unknown"

        is_system = name in SYSTEM_PROCESS_NAMES or pid in (0, 4)
        tag = "系统进程" if is_system else "普通进程"

        try:
            source = get_process_source(proc)
        except Exception:
            source = "无法确定来源"

        try:
            exe = proc.exe()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            processes_info.append(_make_record(
                pid, name, "[不可访问]", "Unknown",
                "【权限受限】无法访问进程信息(可能为受保护系统进程)",
                tag, source,
            ))
            continue
        except Exception as e:
            processes_info.append(_make_record(
                pid, name, "[错误]", "Unknown",
                f"【未知错误】{str(e)}",
                tag, source,
            ))
            continue

        if not exe:
            # No executable path at all: always tagged as a system process,
            # regardless of the name/pid check above.
            processes_info.append(_make_record(
                pid, name, "[无]", "Microsoft Corporation",
                "【特殊进程】无对应可执行文件(如 System、Idle 等内核实体)",
                "系统进程", source,
            ))
            continue

        if not os.path.exists(exe):
            processes_info.append(_make_record(
                pid, name, exe, "Unknown",
                "【路径无效】可执行文件已删除或为虚拟进程",
                tag, source,
            ))
            continue

        # Normal case: read company and description from the version resource.
        company = "Unknown"
        try:
            company, file_desc = get_file_version_info(exe)
            description = file_desc or "No description"
        except Exception:
            description = "【版本信息不可读】读取文件资源失败"
        processes_info.append(_make_record(
            pid, name, exe, company, description, tag, source,
        ))
    return processes_info
def save_to_csv(data, filename):
    """Write the process records to *filename* as CSV with a header row.

    Args:
        data: Iterable of dicts keyed by the seven column names below.
        filename: Destination path; the file is overwritten.

    The file is encoded as UTF-8 with a BOM (``utf-8-sig``). A confirmation
    line is printed after the file has been written.
    """
    columns = ['PID', 'Name', 'Path', 'Company', 'Description', 'Tag', 'Source']
    with open(filename, mode='w', newline='', encoding='utf-8-sig') as out:
        table = csv.DictWriter(out, fieldnames=columns)
        table.writeheader()
        for row in data:
            table.writerow(row)
    print(f"✅ 数据已保存至 CSV 文件: {filename}")
def save_to_json(data, filename):
    """Write *data* to *filename* as indented, UTF-8 encoded JSON.

    Args:
        data: JSON-serializable object (the caller passes a list of dicts).
        filename: Destination path; the file is overwritten.

    Non-ASCII characters are written as-is rather than escaped. A
    confirmation line is printed after the file has been written.
    """
    with open(filename, mode='w', encoding='utf-8') as out:
        out.write(json.dumps(data, indent=4, ensure_ascii=False))
    print(f"✅ 数据已保存至 JSON 文件: {filename}")
def main():
    """Command-line entry point: collect process records, then preview or export.

    Options:
        -f/--format: ``csv``, ``json`` or ``both``. When omitted, only the
            first eight records are printed as a preview and nothing is
            written to disk.
        -o/--output: Output file name prefix; defaults to
            ``processes_<YYYYmmdd_HHMMSS>``.
    """
    parser = argparse.ArgumentParser(description="采集 Windows 进程信息(含系统标记与启动来源)")
    parser.add_argument('-f', '--format', choices=['csv', 'json', 'both'], help="导出格式")
    parser.add_argument('-o', '--output', default=None, help="输出文件名前缀")
    args = parser.parse_args()

    print("正在收集进程信息(含 Tag 与 Source 字段)...\n")
    processes = collect_running_processes()

    def display_name(record):
        # A record's Name can be None when the process name could not be
        # read; None would break both .lower() and the '<20' format spec.
        return record['Name'] or "Unknown"

    # System processes first, then case-insensitive by name.
    processes.sort(key=lambda x: (x['Tag'] != '系统进程', display_name(x).lower()))

    if not args.format:
        print(f"共收集到 {len(processes)} 个进程。\n")
        print("示例(前8条):")
        print(f"{'PID':<6} {'Name':<20} {'Tag':<8} {'Source'}")
        print("-" * 70)
        for p in processes[:8]:
            print(f"{p['PID']:<6} {display_name(p):<20} {p['Tag']:<8} {p['Source'][:40]}")
        print("\n使用 -f csv/json 导出完整数据。")
        return

    base_name = args.output or f"processes_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    if args.format in ['csv', 'both']:
        save_to_csv(processes, f"{base_name}.csv")
    if args.format in ['json', 'both']:
        save_to_json(processes, f"{base_name}.json")
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()

声明:一代明君的小屋|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - 如何通过python采集windows正在运行的进程信息


欢迎来到我的小屋