python自动化过程中如何通过win32api实现键盘鼠标的监听(不阻塞线程)


前言

一直以来都想要给之前开发的自动化库pyautoass加上键鼠监听的功能。可是原本的pywin32恰好没有接入hook相关的api。再加上之前从没有学习和研究过原本的win32api以及win的运行机制。没想到这个功能居然成了规划中难度较大的部分。好在通过学习了大量的资料,以及对pynput库的源码的阅读,终于理解和实现了键盘鼠标的监听功能。

所用到的库

  1. pywin32
  2. ctypes

所使用到的win32api

  1. SetWindowsHookEx(), 将用户定义的钩子函数添加到钩子链中, 也就是我们的注册钩子函数
  2. UnhookWindowsHookEx(), 卸载钩子函数
  3. CallNextHookEx()在我们的钩子函数中必须调用, 这样才能让程序的传递消息
  4. GetMessageW(),用于监听的核心,用来消耗按键的消息,从调用线程的消息队列中检索消息。 函数调度传入的已发送消息,直到已发布的消息可供检索。因为启动后会阻塞一个线程,所以可以实现监听的作用。
  5. PeekMessageA(),调度传入的非排队消息,检查线程消息队列中是否存在已发布的消息,并检索 (消息(如果存在任何) )。不同于GetMessage(),该方法不会阻塞线程,只检查是否存在已发布的消息,并检索,如果不存在就直接结束了。
  6. PostMessageA(),将 (帖子) 与创建指定窗口的线程关联的消息队列中,并在不等待线程处理消息的情况下返回。主要是用来最后销毁线程。
  7. GetCurrentThreadId(),检索调用线程的线程标识符。

引用内容

在没有钩子函数的情况下windows程序运行机制

键盘输入 --> 系统消息队列 --> 对应应用程序的消息队列 --> 将消息发送到对应的窗口中
在有了钩子函数的情况下windows程序运行机制
键盘输入 --> 系统消息队列 --> 对应应用程序消息队列 --> 将消息发送到钩子链中 --> 消息一一调用完毕所有的钩子函数(需要调用CallNextHookEx函数才能将消息传递下去) --> 将消息发送到对应的窗口中

示例代码

注意:
在程序中, 我们通过CFUNCTYPE返回一个类对象, 通过该类对象可以实例化出我们需要的c类型的函数, 但是如果不将他放在全局的话则会失去效果, 因为在C语言中函数是全局的

from ctypes import *
from ctypes import wintypes
import win32con
SetWindowsHookEx = windll.user32.SetWindowsHookExA
UnhookWindowsHookEx = windll.user32.UnhookWindowsHookEx
CallNextHookEx = windll.user32.CallNextHookEx
GetMessage = windll.user32.GetMessageW
GetModuleHandle = windll.kernel32.GetModuleHandleW
PeekMessage = windll.user32.PeekMessageA
PostThreadMessage = windll.user32.PostThreadMessageW
GetCurrentThreadId = windll.kernel32.GetCurrentThreadId
GetCurrentThreadId.restype = wintypes.DWORD
global t_id
t_id = GetCurrentThreadId()
# 保存键盘钩子函数句柄
global keyboard_hd
keyboard_hd = None
# 保存鼠标钩子函数句柄
global mouse_hd
mouse_hd = None
global msg
msg = wintypes.MSG()
win32con.WM_QUIT
class KBDLLHOOKSTRUCT(Structure):
    _fields_ = [
        ('vkCode', c_int),
        ('scanCode', c_int),
        ('flags', c_int),
        ('time', c_int),
        ('dwExtraInfo', c_uint),
        ('', c_void_p)
    ]
class POINT(Structure):
    _fields_ = [
        ('x', c_long),
        ('y', c_long)
    ]
def wait_for_msg():
    print("监听开始")
    msg = wintypes.MSG()
    lpmsg = byref(msg)
    GetMessage(lpmsg, None, 0, 0)
def keyboard_pro(nCode, wParam, lParam):
    """
    函数功能:键盘钩子函数,当有按键按下时此函数被回调
    """
    if nCode == win32con.HC_ACTION:
        KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
        param = cast(lParam, KBDLLHOOKSTRUCT_p)
        print(f"按下了:{param.contents.vkCode}")
        # 用于退出监听
        if param.contents.vkCode == 27 and wParam == 256: 
            stop_keyboard_hook()
    else:
        KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
        param = cast(lParam, KBDLLHOOKSTRUCT_p)
        return CallNextHookEx(None, None, None, None)
        # return None
    global msg
    global keyboard_hd
    return CallNextHookEx(keyboard_hd, nCode, wParam, lParam)
class StartKeyboardHook:
    """
    函数功能:启动键盘监听
    """
    def __enter__(self):
        # try:
        global keyboard_hd
        HOOKPROTYPE = CFUNCTYPE(c_int, c_int, c_int, POINTER(c_void_p))
        pointer = HOOKPROTYPE(keyboard_pro)
        keyboard_hd = SetWindowsHookEx(
            win32con.WH_KEYBOARD_LL,
            pointer,
            None,
            0)
        wait_for_msg()
    def __exit__(self, type, value, trace):
        stop_keyboard_hook()
def stop_keyboard_hook():
    """
    函数功能:停止键盘监听
    """
    try:
        global keyboard_hd
        global t_id
        UnhookWindowsHookEx(keyboard_hd) # 卸载钩子
        PostThreadMessage(t_id, 0x0401, 0, 0) # 销毁线程
    except:
        pass
class testIter:
    def __iter__(self):
        while 1:
            print("112")
            global msg
            msg = wintypes.MSG()
            lpmsg = byref(msg)
            GetMessage(lpmsg, None, 0, 0)
            yield msg
if __name__ == '__main__':
    with StartKeyboardHook():
        ...
    print("监听结束了")

参考资料

  1. pynput源码
  2. SetWindowsHookEx - 微软windows引用开发文档
  3. getMessage - 微软windows引用开发文档
  4. GetCurrentThreadId - 微软windows引用开发文档
  5. Python监听键盘和鼠标事件
  6. python 在windows下监听键盘按键
  7. Windows窗口与消息:钩子
  8. Windows中的Hook机制

学习过程中混乱的源代码用例

# coding = gbk
from threading import Thread
from ctypes import *
from ctypes import wintypes
import os
import sys
import win32con
import time
SetWindowsHookEx = windll.user32.SetWindowsHookExA
UnhookWindowsHookEx = windll.user32.UnhookWindowsHookEx
CallNextHookEx = windll.user32.CallNextHookEx
GetMessage = windll.user32.GetMessageW
GetModuleHandle = windll.kernel32.GetModuleHandleW
PeekMessage = windll.user32.PeekMessageA
PostThreadMessage = windll.user32.PostThreadMessageW
GetCurrentThreadId = windll.kernel32.GetCurrentThreadId
GetCurrentThreadId.restype = wintypes.DWORD
global t_id
t_id = GetCurrentThreadId()
# 保存键盘钩子函数句柄
global keyboard_hd
keyboard_hd = None
# 保存鼠标钩子函数句柄
global mouse_hd
mouse_hd = None
global msg
msg = wintypes.MSG()
win32con.WM_QUIT
class KBDLLHOOKSTRUCT(Structure):
    _fields_ = [
        ('vkCode', c_int),
        ('scanCode', c_int),
        ('flags', c_int),
        ('time', c_int),
        ('dwExtraInfo', c_uint),
        ('', c_void_p)
    ]
class POINT(Structure):
    _fields_ = [
        ('x', c_long),
        ('y', c_long)
    ]
class MSLLHOOKSTRUCT(Structure):
    _fields_ = [
        ('pt', POINT),
        ('hwnd', c_int),
        ('wHitTestCode', c_uint),
        ('dwExtraInfo', c_uint),
    ]
def wait_for_msg():
    print("监听开始")
    msg = wintypes.MSG()
    lpmsg = byref(msg)
    r = GetMessage(lpmsg, None, 0, 0)
def keyboard_pro(nCode, wParam, lParam):
    """
    函数功能:键盘钩子函数,当有按键按下时此函数被回调
    """
    print(">>>>>>>>>")
    a = open("./test.txt", "w", encoding="utf-8")
    a.write("fsdfsd")
    a.close()
    print(">>> wParam: ", wParam)
    if nCode == win32con.HC_ACTION:
        KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
        param = cast(lParam, KBDLLHOOKSTRUCT_p)
        print("按下", param.contents.vkCode)
        print(param.contents.scanCode)
        print(param.contents.flags)
        print(param.contents.time)
        print(param.contents.dwExtraInfo)
        if param.contents.vkCode == 27 and wParam == 256:
            stop_lienting()
    else:
        KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
        param = cast(lParam, KBDLLHOOKSTRUCT_p)
        return CallNextHookEx(None, None, None, None)
        # return None
    global msg
    print(f">>>> msg ::  {msg.message}")
    global keyboard_hd
    print(">>>>>  >>>> kbhd << kbhd: ", keyboard_hd)
    print("ncode >>>> : ", nCode)
    print("lParam >>>> : ", lParam)
    print("wParam >>>> : ", wParam)
    global sw
    return CallNextHookEx(keyboard_hd, nCode, wParam, lParam)
    # return CallNextHookEx(keyboard_hd, nCode, wParam, None)
class StartKeyboardHook:
    """
    函数功能:启动键盘监听
    """
    def __enter__(self):
        # try:
        global sw
        global keyboard_hd
        print(">>>> kbhd : -> ", keyboard_hd)
        print("键盘监听配置完成!")
        print("启动键盘监听")
        HOOKPROTYPE = CFUNCTYPE(c_int, c_int, c_int, POINTER(c_void_p))
        pointer = HOOKPROTYPE(keyboard_pro)
        keyboard_hd = SetWindowsHookEx(
            win32con.WH_KEYBOARD_LL,
            pointer,
            None,
            0)
        # 基於peek和while循环消费的消息
        # while sw == 1:
        # msg = wintypes.MSG()
        # lpmsg = byref(msg)
        # r = PeekMessage(lpmsg, None, 0x0400, 0x0400, 0)
        #     print(" >>>>>> R::::::::",r)
        # 基于get消费的消息
        test_iter = testIter()
        for msg in test_iter:
            print("....")
            break
    def __exit__(self, type, value, trace):
        stop_keyboard_hook()
def stop_keyboard_hook():
    """
    函数功能:停止键盘监听
    """
    try:
        global keyboard_hd
        global t_id
        UnhookWindowsHookEx(keyboard_hd)
        PostThreadMessage(t_id, 0x0401, 0, 0)
        msg = wintypes.MSG()
        lpmsg = byref(msg)
        r = PeekMessage(lpmsg, None, 0x0400, 0x0400, 0)
        print("销毁钩子,结束线程")
    except:
        pass
def stop_lienting():
    global sw
    sw = 0
    stop_keyboard_hook()
def mouse_pro(nCode, wParam, lParam):
    """
    函数功能:鼠标钩子函数,当有鼠标事件,此函数被回调
    """
    if nCode == win32con.HC_ACTION:
        MSLLHOOKSTRUCT_p = POINTER(MSLLHOOKSTRUCT)
        param = cast(lParam, MSLLHOOKSTRUCT_p)
        # 鼠标左键点击
        if wParam == win32con.WM_LBUTTONDOWN:
            print("左键点击,坐标:x:%d,y:%d" %
                  (param.contents.pt.x, param.contents.pt.y))
        elif wParam == win32con.WM_LBUTTONUP:
            print("左键抬起,坐标:x:%d,y:%d" %
                  (param.contents.pt.x, param.contents.pt.y))
        elif wParam == win32con.WM_MOUSEMOVE:
            print("鼠标移动,坐标:x:%d,y:%d" %
                  (param.contents.pt.x, param.contents.pt.y))
        elif wParam == win32con.WM_RBUTTONDOWN:
            print("右键点击,坐标:x:%d,y:%d" %
                  (param.contents.pt.x, param.contents.pt.y))
        elif wParam == win32con.WM_RBUTTONUP:
            print("右键抬起,坐标:x:%d,y:%d" %
                  (param.contents.pt.x, param.contents.pt.y))
    return CallNextHookEx(mouse_hd, nCode, wParam, lParam)
def start_mouse_hook():
    """
    函数功能:启动鼠标监听
    """
    HOOKPROTYPE = CFUNCTYPE(c_int, c_int, c_int, POINTER(c_void_p))
    pointer = HOOKPROTYPE(mouse_pro)
    mouse_hd = SetWindowsHookEx(
        win32con.WH_MOUSE_LL,
        pointer,
        None,
        0)
def stop_mouse_hook():
    """
    函数功能:停止鼠标监听
    """
    UnhookWindowsHookEx(mouse_hd)
    PostThreadMessage(0, 0x0401, 0, 0)
def testmsg():
    global msg
    global keyboard_hd
    num: int = 0
    while num < 30:
        print(">>>> msg: ", msg)
        print(">>>> kbhd: ", keyboard_hd)
        num += 1
        time.sleep(1)
class testIter:
    def __iter__(self):
        while 1:
            print("112")
            global msg
            msg = wintypes.MSG()
            lpmsg = byref(msg)
            print("测试一下")
            r = GetMessage(lpmsg, None, 0, 0)
            print("测试两下")
            yield 1
            break
if __name__ == '__main__':
    global sw
    sw = 1
    # while sw == 1:
    with StartKeyboardHook():
        # test_iter = testIter()
        # for msg in test_iter:
        #     print("....")
        #     break
        ...
    print("这个消息消费结束了")
    print("fsdfsdfyid")
    print("fsdfs")
    print("dfdffffddss")

声明:一代明君的小屋|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - python自动化过程中如何通过win32api实现键盘鼠标的监听(不阻塞线程)


欢迎来到我的小屋