前言
一直以来都想要给之前开发的自动化库pyautoass加上键鼠监听的功能。可是原本的pywin32恰好没有接入hook相关的api。再加上之前从没有学习和研究过原本的win32api以及win的运行机制。没想到这个功能居然成了规划中难度较大的部分。好在通过学习了大量的资料,以及对pynput库的源码的阅读,终于理解和实现了键盘鼠标的监听功能。
所用到的库
- pywin32
- ctypes
所使用到的win32api
- SetWindowsHookEx(), 将用户定义的钩子函数添加到钩子链中, 也就是我们的注册钩子函数
- UnhookWindowsHookEx(), 卸载钩子函数
- CallNextHookEx()在我们的钩子函数中必须调用, 这样才能让程序的传递消息
- GetMessageW(),用于监听的核心,用来消耗按键的消息,从调用线程的消息队列中检索消息。 函数调度传入的已发送消息,直到已发布的消息可供检索。因为启动后会阻塞一个线程,所以可以实现监听的作用。
- PeekMessageA(),调度传入的非排队消息,检查线程消息队列中是否存在已发布的消息,并检索 (消息(如果存在任何) )。不同于GetMessage(),该方法不会阻塞线程,只检查是否存在已发布的消息,并检索,如果不存在就直接结束了。
- PostMessageA(),将 (帖子) 与创建指定窗口的线程关联的消息队列中,并在不等待线程处理消息的情况下返回。主要是用来最后销毁线程。
- GetCurrentThreadId(),检索调用线程的线程标识符。
引用内容
在没有钩子函数的情况下windows程序运行机制
键盘输入 --> 系统消息队列 --> 对应应用程序的消息队列 --> 将消息发送到对应的窗口中
在有了钩子函数的情况下windows程序运行机制
键盘输入 --> 系统消息队列 --> 对应应用程序消息队列 --> 将消息发送到钩子链中 --> 消息一一调用完毕所有的钩子函数(需要调用CallNextHookEx函数才能将消息传递下去) --> 将消息发送到对应的窗口中
示例代码
注意:
在程序中, 我们通过CFUNCTYPE返回一个类对象, 通过该类对象可以实例化出我们需要的c类型的函数, 但是如果不将他放在全局的话则会失去效果, 因为在C语言中函数是全局的
from ctypes import *
from ctypes import wintypes
import win32con
SetWindowsHookEx = windll.user32.SetWindowsHookExA
UnhookWindowsHookEx = windll.user32.UnhookWindowsHookEx
CallNextHookEx = windll.user32.CallNextHookEx
GetMessage = windll.user32.GetMessageW
GetModuleHandle = windll.kernel32.GetModuleHandleW
PeekMessage = windll.user32.PeekMessageA
PostThreadMessage = windll.user32.PostThreadMessageW
GetCurrentThreadId = windll.kernel32.GetCurrentThreadId
GetCurrentThreadId.restype = wintypes.DWORD
global t_id
t_id = GetCurrentThreadId()
# 保存键盘钩子函数句柄
global keyboard_hd
keyboard_hd = None
# 保存鼠标钩子函数句柄
global mouse_hd
mouse_hd = None
global msg
msg = wintypes.MSG()
win32con.WM_QUIT
class KBDLLHOOKSTRUCT(Structure):
_fields_ = [
('vkCode', c_int),
('scanCode', c_int),
('flags', c_int),
('time', c_int),
('dwExtraInfo', c_uint),
('', c_void_p)
]
class POINT(Structure):
_fields_ = [
('x', c_long),
('y', c_long)
]
def wait_for_msg():
print("监听开始")
msg = wintypes.MSG()
lpmsg = byref(msg)
GetMessage(lpmsg, None, 0, 0)
def keyboard_pro(nCode, wParam, lParam):
"""
函数功能:键盘钩子函数,当有按键按下时此函数被回调
"""
if nCode == win32con.HC_ACTION:
KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
param = cast(lParam, KBDLLHOOKSTRUCT_p)
print(f"按下了:{param.contents.vkCode}")
# 用于退出监听
if param.contents.vkCode == 27 and wParam == 256:
stop_keyboard_hook()
else:
KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
param = cast(lParam, KBDLLHOOKSTRUCT_p)
return CallNextHookEx(None, None, None, None)
# return None
global msg
global keyboard_hd
return CallNextHookEx(keyboard_hd, nCode, wParam, lParam)
class StartKeyboardHook:
"""
函数功能:启动键盘监听
"""
def __enter__(self):
# try:
global keyboard_hd
HOOKPROTYPE = CFUNCTYPE(c_int, c_int, c_int, POINTER(c_void_p))
pointer = HOOKPROTYPE(keyboard_pro)
keyboard_hd = SetWindowsHookEx(
win32con.WH_KEYBOARD_LL,
pointer,
None,
0)
wait_for_msg()
def __exit__(self, type, value, trace):
stop_keyboard_hook()
def stop_keyboard_hook():
"""
函数功能:停止键盘监听
"""
try:
global keyboard_hd
global t_id
UnhookWindowsHookEx(keyboard_hd) # 卸载钩子
PostThreadMessage(t_id, 0x0401, 0, 0) # 销毁线程
except:
pass
class testIter:
def __iter__(self):
while 1:
print("112")
global msg
msg = wintypes.MSG()
lpmsg = byref(msg)
GetMessage(lpmsg, None, 0, 0)
yield msg
if __name__ == '__main__':
with StartKeyboardHook():
...
print("监听结束了")参考资料
- pynput源码
- SetWindowsHookEx - 微软windows引用开发文档
- getMessage - 微软windows引用开发文档
- GetCurrentThreadId - 微软windows引用开发文档
- Python监听键盘和鼠标事件
- python 在windows下监听键盘按键
- Windows窗口与消息:钩子
- Windows中的Hook机制
学习过程中混乱的源代码用例
# coding = gbk
from threading import Thread
from ctypes import *
from ctypes import wintypes
import os
import sys
import win32con
import time
SetWindowsHookEx = windll.user32.SetWindowsHookExA
UnhookWindowsHookEx = windll.user32.UnhookWindowsHookEx
CallNextHookEx = windll.user32.CallNextHookEx
GetMessage = windll.user32.GetMessageW
GetModuleHandle = windll.kernel32.GetModuleHandleW
PeekMessage = windll.user32.PeekMessageA
PostThreadMessage = windll.user32.PostThreadMessageW
GetCurrentThreadId = windll.kernel32.GetCurrentThreadId
GetCurrentThreadId.restype = wintypes.DWORD
global t_id
t_id = GetCurrentThreadId()
# 保存键盘钩子函数句柄
global keyboard_hd
keyboard_hd = None
# 保存鼠标钩子函数句柄
global mouse_hd
mouse_hd = None
global msg
msg = wintypes.MSG()
win32con.WM_QUIT
class KBDLLHOOKSTRUCT(Structure):
_fields_ = [
('vkCode', c_int),
('scanCode', c_int),
('flags', c_int),
('time', c_int),
('dwExtraInfo', c_uint),
('', c_void_p)
]
class POINT(Structure):
_fields_ = [
('x', c_long),
('y', c_long)
]
class MSLLHOOKSTRUCT(Structure):
_fields_ = [
('pt', POINT),
('hwnd', c_int),
('wHitTestCode', c_uint),
('dwExtraInfo', c_uint),
]
def wait_for_msg():
print("监听开始")
msg = wintypes.MSG()
lpmsg = byref(msg)
r = GetMessage(lpmsg, None, 0, 0)
def keyboard_pro(nCode, wParam, lParam):
"""
函数功能:键盘钩子函数,当有按键按下时此函数被回调
"""
print(">>>>>>>>>")
a = open("./test.txt", "w", encoding="utf-8")
a.write("fsdfsd")
a.close()
print(">>> wParam: ", wParam)
if nCode == win32con.HC_ACTION:
KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
param = cast(lParam, KBDLLHOOKSTRUCT_p)
print("按下", param.contents.vkCode)
print(param.contents.scanCode)
print(param.contents.flags)
print(param.contents.time)
print(param.contents.dwExtraInfo)
if param.contents.vkCode == 27 and wParam == 256:
stop_lienting()
else:
KBDLLHOOKSTRUCT_p = POINTER(KBDLLHOOKSTRUCT)
param = cast(lParam, KBDLLHOOKSTRUCT_p)
return CallNextHookEx(None, None, None, None)
# return None
global msg
print(f">>>> msg :: {msg.message}")
global keyboard_hd
print(">>>>> >>>> kbhd << kbhd: ", keyboard_hd)
print("ncode >>>> : ", nCode)
print("lParam >>>> : ", lParam)
print("wParam >>>> : ", wParam)
global sw
return CallNextHookEx(keyboard_hd, nCode, wParam, lParam)
# return CallNextHookEx(keyboard_hd, nCode, wParam, None)
class StartKeyboardHook:
"""
函数功能:启动键盘监听
"""
def __enter__(self):
# try:
global sw
global keyboard_hd
print(">>>> kbhd : -> ", keyboard_hd)
print("键盘监听配置完成!")
print("启动键盘监听")
HOOKPROTYPE = CFUNCTYPE(c_int, c_int, c_int, POINTER(c_void_p))
pointer = HOOKPROTYPE(keyboard_pro)
keyboard_hd = SetWindowsHookEx(
win32con.WH_KEYBOARD_LL,
pointer,
None,
0)
# 基於peek和while循环消费的消息
# while sw == 1:
# msg = wintypes.MSG()
# lpmsg = byref(msg)
# r = PeekMessage(lpmsg, None, 0x0400, 0x0400, 0)
# print(" >>>>>> R::::::::",r)
# 基于get消费的消息
test_iter = testIter()
for msg in test_iter:
print("....")
break
def __exit__(self, type, value, trace):
stop_keyboard_hook()
def stop_keyboard_hook():
"""
函数功能:停止键盘监听
"""
try:
global keyboard_hd
global t_id
UnhookWindowsHookEx(keyboard_hd)
PostThreadMessage(t_id, 0x0401, 0, 0)
msg = wintypes.MSG()
lpmsg = byref(msg)
r = PeekMessage(lpmsg, None, 0x0400, 0x0400, 0)
print("销毁钩子,结束线程")
except:
pass
def stop_lienting():
global sw
sw = 0
stop_keyboard_hook()
def mouse_pro(nCode, wParam, lParam):
"""
函数功能:鼠标钩子函数,当有鼠标事件,此函数被回调
"""
if nCode == win32con.HC_ACTION:
MSLLHOOKSTRUCT_p = POINTER(MSLLHOOKSTRUCT)
param = cast(lParam, MSLLHOOKSTRUCT_p)
# 鼠标左键点击
if wParam == win32con.WM_LBUTTONDOWN:
print("左键点击,坐标:x:%d,y:%d" %
(param.contents.pt.x, param.contents.pt.y))
elif wParam == win32con.WM_LBUTTONUP:
print("左键抬起,坐标:x:%d,y:%d" %
(param.contents.pt.x, param.contents.pt.y))
elif wParam == win32con.WM_MOUSEMOVE:
print("鼠标移动,坐标:x:%d,y:%d" %
(param.contents.pt.x, param.contents.pt.y))
elif wParam == win32con.WM_RBUTTONDOWN:
print("右键点击,坐标:x:%d,y:%d" %
(param.contents.pt.x, param.contents.pt.y))
elif wParam == win32con.WM_RBUTTONUP:
print("右键抬起,坐标:x:%d,y:%d" %
(param.contents.pt.x, param.contents.pt.y))
return CallNextHookEx(mouse_hd, nCode, wParam, lParam)
def start_mouse_hook():
"""
函数功能:启动鼠标监听
"""
HOOKPROTYPE = CFUNCTYPE(c_int, c_int, c_int, POINTER(c_void_p))
pointer = HOOKPROTYPE(mouse_pro)
mouse_hd = SetWindowsHookEx(
win32con.WH_MOUSE_LL,
pointer,
None,
0)
def stop_mouse_hook():
"""
函数功能:停止鼠标监听
"""
UnhookWindowsHookEx(mouse_hd)
PostThreadMessage(0, 0x0401, 0, 0)
def testmsg():
global msg
global keyboard_hd
num: int = 0
while num < 30:
print(">>>> msg: ", msg)
print(">>>> kbhd: ", keyboard_hd)
num += 1
time.sleep(1)
class testIter:
def __iter__(self):
while 1:
print("112")
global msg
msg = wintypes.MSG()
lpmsg = byref(msg)
print("测试一下")
r = GetMessage(lpmsg, None, 0, 0)
print("测试两下")
yield 1
break
if __name__ == '__main__':
global sw
sw = 1
# while sw == 1:
with StartKeyboardHook():
# test_iter = testIter()
# for msg in test_iter:
# print("....")
# break
...
print("这个消息消费结束了")
print("fsdfsdfyid")
print("fsdfs")
print("dfdffffddss") 

哈哈找到这个库的作者了, 再用这个库进行开发, 希望能github啥的
@elliott : 呜呜呜,感觉写得像稀饭,所以还没敢挂在github上面。最开始主要是为了临时应付一些问题,后来才慢慢添加了很多东西。
@elliott : 突然发现如果双屏的话, 另一半是黑的, 可能获取的宽度有问题
@elliott : 新版本已修复该问题。
@elliott : 嗯嗯,可以详细聊聊嘛。测试状态没有这个情况,应该是有什么没兼顾到的情况。
@一代明君 : 发现了,因为开发的时候默认主显示屏在左边,所以当主显示屏再右边新增设备以后。就截取不到。后续修复。