使用python来保存键盘输入情况,可保存到sqlite3数据库

发布于:2024-09-17 ⋅ 阅读:(52) ⋅ 点赞:(0)

1.代码单次保存最大键盘输入数目是300,全局变量可改

2、在gui界面可以设置单次保存的名字,方便下次查找,录入数据库

3. gui界面有串口选择按钮,需要有硬件串口转hid模块ch9329的,可以直接发送串口数据来实现模拟键盘输入.  没有硬件的直接注释即可.

好了, 不说多,直接上代码

ch9329的github地址

ch9329 github

import sys
import threading
import sqlite3
from PyQt5.QtWidgets import (QApplication, QComboBox,QWidget, QPushButton,QHBoxLayout, QVBoxLayout, QLineEdit, QLabel, QTextEdit)
from pynput import keyboard
from serial import Serial  # 串口模块
import serial.tools.list_ports  # 用于列出本地可用的串口


from ch9329 import keyboard as ch9329Keyboard
from ch9329 import mouse
from ch9329.config import get_manufacturer
from ch9329.config import get_product
from ch9329.config import get_serial_number


# 定义全局变量来存储键盘事件
key_events = []
MAX_EVENTS = 5 * 60  # 最大事件数量

# 定义键盘事件处理函数
def on_press(key):
    if len(key_events) < MAX_EVENTS:
        key_events.append(('pressed', str(key)))

def on_release(key):
    if len(key_events) < MAX_EVENTS:
        key_events.append(('released', str(key)))
    if str(key) == 'Key.esc':  # 如果按下的是Esc键,停止监听
        return False

# 启动键盘监听器的函数
def start_listener():
    with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
        listener.join()

# 保存数据到文件
def save_to_file():
    with open('key_events.txt', 'w') as file:
        for event in key_events:
            file.write(f"{event[0]} {event[1]}\n")

# 将数据保存到 SQLite 数据库
def save_to_database(name):
    if not name:
        return

    # 连接到 SQLite 数据库(如果不存在则创建)
    conn = sqlite3.connect('key_events.db')
    cursor = conn.cursor()

    # 创建表格(如果不存在)
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            event_type TEXT,
            key TEXT
        )
    ''')

    # 插入数据
    for event in key_events:
        cursor.execute('''
            INSERT INTO events (name, event_type, key)
            VALUES (?, ?, ?)
        ''', (name, event[0], event[1]))

    # 提交事务并关闭连接
    conn.commit()
    conn.close()

# 查找数据库中的数据
def find_in_database(name):
    if not name:
        return "Please enter a name."

    # 连接到 SQLite 数据库
    conn = sqlite3.connect('key_events.db')
    cursor = conn.cursor()

    # 查询匹配的记录
    cursor.execute('''
        SELECT * FROM events WHERE name = ?
    ''', (name,))
    results = cursor.fetchall()

    # 关闭连接
    conn.close()

    if results:
        return '\n'.join([f"{row[1]} - {row[2]} - {row[3]}" for row in results])
    else:
        return "No records found."

# 删除数据库中的数据
def delete_from_database(name):
    if not name:
        return "Please enter a name."

    # 连接到 SQLite 数据库
    conn = sqlite3.connect('key_events.db')
    cursor = conn.cursor()

    # 查询并删除匹配的记录
    cursor.execute('''
        SELECT * FROM events WHERE name = ?
    ''', (name,))
    results = cursor.fetchall()

    if results:
        cursor.execute('''
            DELETE FROM events WHERE name = ?
        ''', (name,))
        conn.commit()
        conn.close()
        return f"Deleted {len(results)} records for name: {name}"
    else:
        conn.close()
        return "No records found to delete."

# 串口功能:打开串口
def open_serial(com_port, baud_rate):
    global serial_port
    try:
        serial_port = Serial(com_port, baud_rate)
        return f"Serial port {com_port} opened at {baud_rate} baud."
    except Exception as e:
        return f"Failed to open serial port: {str(e)}"

# 获取本地可用的串口
def get_available_ports():
    ports = serial.tools.list_ports.comports()
    return [port.device for port in ports]

# 串口功能:关闭串口
def close_serial():
    global serial_port
    if serial_port and serial_port.is_open:
        serial_port.close()
        return "Serial port closed."
    else:
        return "No serial port is open."

# GUI 窗口类
class KeyboardListenerApp(QWidget):
    def __init__(self):
        super().__init__()
        self.initUI()
        self.parse_res = []
    def initUI(self):
        # 创建 UI 元素
        self.start_button = QPushButton('Start Listening', self)
        self.save_button = QPushButton('Save Data', self)
        self.find_button = QPushButton('Find Data', self)
        self.delete_button = QPushButton('Delete Data', self)  # 增加删除按钮
        self.open_serial_button = QPushButton('Open Serial', self)  # 打开串口按钮
        self.close_serial_button = QPushButton('Close Serial', self)  # 关闭串口按钮
        
        self.name_edit = QLineEdit(self)
        self.name_edit.setPlaceholderText("Enter your name")
        self.query_edit = QLineEdit(self)
        self.query_edit.setPlaceholderText("Enter name to search")
        self.results_area = QTextEdit(self)
        self.results_area.setReadOnly(True)

        self.send_serial_button = QPushButton('SendSerialData', self)  # 发送串口数据按钮
        # 串口选择和波特率选择
        self.com_port_select = QComboBox(self)
        self.baud_rate_select = QComboBox(self)
        self.baud_rate_select.addItems(["9600", "19200", "38400", "57600", "115200"])

        # 获取可用的串口
        self.update_ports()

        # 创建布局
        namelayout = QHBoxLayout()
        layout = QVBoxLayout()
        namelayout.addWidget(QLabel("Name:"))
        namelayout.addWidget(self.name_edit)
        layout.addLayout(namelayout)
        layout.addWidget(self.start_button)
        layout.addWidget(self.save_button)
        Searchnamelayout = QHBoxLayout()
        Searchnamelayout.addWidget(QLabel("Search Name:"))
        Searchnamelayout.addWidget(self.query_edit)
        layout.addLayout(Searchnamelayout)
        layout.addWidget(self.find_button)
        layout.addWidget(self.delete_button)  # 增加删除按钮到布局
        layout.addWidget(QLabel("Results:"))
        layout.addWidget(self.results_area)

        # add串口配置布局
        serial_layout = QHBoxLayout()
        serial_layout.addWidget(QLabel("COM Port:"))
        serial_layout.addWidget(self.com_port_select)
        serial_layout.addWidget(QLabel("Baud Rate:"))
        serial_layout.addWidget(self.baud_rate_select)

        layout.addLayout(serial_layout)
        layout.addWidget(self.open_serial_button)
        layout.addWidget(self.close_serial_button)
        layout.addWidget(self.send_serial_button)

        self.setLayout(layout)

        # 设置窗口属性
        self.setWindowTitle('Keyboard Listener')
        self.setGeometry(300, 300, 600, 400)

        # 绑定按钮事件
        self.start_button.clicked.connect(self.start_listening)
        self.save_button.clicked.connect(self.save_data)
        self.find_button.clicked.connect(self.find_data)
        self.delete_button.clicked.connect(self.delete_data)  # 绑定删除按钮事件
        self.open_serial_button.clicked.connect(self.open_serial)
        self.close_serial_button.clicked.connect(self.close_serial)
        # 绑定按钮事件
        self.send_serial_button.clicked.connect(self.send_serial_data)
    
    def update_ports(self):
        """更新可用的串口列表"""
        available_ports = get_available_ports()
        self.com_port_select.clear()
        self.com_port_select.addItems(available_ports)

        # 如果没有可用串口,禁用“Open Serial”按钮
        if not available_ports:
            self.open_serial_button.setEnabled(False)
        else:
            self.open_serial_button.setEnabled(True)
    
    def _send_serialdata(self,res,port):
        import time
        if res is None:
            return
        if len(res) == 0:
            return
        for i in res:
            ch9329Keyboard.press_and_release(port, i,min_interval=0.2,max_interval=0.3)
            time.sleep(0.5)
        
    def send_serial_data(self):
        global serial_port
        if serial_port and serial_port.is_open:
            """发送串口数据"""
            send_thread = threading.Thread(target=self._send_serialdata, args=(self.parse_res, serial_port), daemon=True)
            send_thread.start()

    def start_listening(self):
        # 创建并启动一个新的线程来运行键盘监听器
        listener_thread = threading.Thread(target=start_listener, daemon=True)
        listener_thread.start()

    def save_data(self):
        name = self.name_edit.text()
        if key_events:
            # 使用线程来保存数据到文件和数据库
            save_thread = threading.Thread(target=self.save_data_in_background, args=(name,), daemon=True)
            save_thread.start()

    def save_data_in_background(self, name):
        # 在后台线程中保存数据
        save_to_file()
        save_to_database(name)
        print("Data saved successfully.")
    def parse_and_execute(self,result):
        res = []
        """解析查找结果并执行按键事件"""
        # 将结果按行拆分
        lines = result.splitlines()
        print("解析查找结果并执行按键事件.")
        for line in lines:
            # 分割每行的内容
            parts = line.split(" - ")
            if len(parts) >= 3:
                action = parts[1]  # "pressed" 或 "released"
                key = parts[2]     # 键值('a' 或者 Key.ctrl_l)
                # 去除多余的引号,例如 'a' -> a
                key = key.strip("'")
                if len(key) >= 3:
                    key = key.strip("Key.")
                # 执行键盘操作
                if action == 'pressed':
                    #print(key)
                    res.append(key)
                    #keyboard_controller.press(key)
                #elif action == 'released':
                    #keyboard_controller.release(key)
            else:
                print("len < 3.")
        return res
    def find_data(self):
        name = self.query_edit.text()
        result = find_in_database(name)
        self.results_area.setText(result)
        self.parse_res = self.parse_and_execute(result)

    def delete_data(self):
        name = self.query_edit.text()
        result = delete_from_database(name)
        self.results_area.setText(result)
    def open_serial(self):
        com_port = self.com_port_select.currentText()
        baud_rate = int(self.baud_rate_select.currentText())
        result = open_serial(com_port, baud_rate)
        self.results_area.setText(result)

    def close_serial(self):
        result = close_serial()
        self.results_area.setText(result)
# 主函数
def main():
    app = QApplication(sys.argv)
    ex = KeyboardListenerApp()
    ex.show()
    sys.exit(app.exec_())

if __name__ == '__main__':
    main()