python-串口助手(OV7670图传)

发布于:2025-03-06 ⋅ 阅读:(17) ⋅ 点赞:(0)

代码

主python文件

import serial
import serial.tools.list_ports
import time
import tkinter as tk
from tkinter import ttk
import numpy as np
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import threading
from queue import Queue
from PIL import Image, ImageTk
import tkinter as tk
import LightSwitch
import tkinter as tk
from datetime import datetime
import cv2 as cv
import cv2
import numpy as np
import os
'''
智能红绿灯上位机和MCU通信协议:
MCU->上位机 0x01 0xfe ...(320*240*2)BYTE...0xfe 0x01
上位机->MCU 0x02 0xff 0xXX 0xXX 0xXX 0xXX 0xff 0x02
字节2:高5bit传输当前系统时间0h-23h;低3bit传输时间增减标志位(0非深夜和高峰模式 1高峰模式  2深夜模式)
字节3:按钮值(0无按键 1按键+   2按键-   3按键切换灯)
字节4:0空闲(识别关闭);
        如果无按键和深夜或者高峰模式(判断标志位),当接收到图像数据后进行识别,
        否则接收到的图像数据丢弃,若图片中方形白色>20个,高4bit发送绿灯增加5s标志位1;
        当白色方块0<白色方块数<20,正常计时标志2(也为默认开启值);
        当白色方形==0,发送下次红灯长红标志位3。
字节5:识别到的车辆数量
'''
# 初始化窗口和控件
window = tk.Tk()
window.title('智能红绿灯串口助手')
window.geometry('900x600+200+100')
serialName = []#串口名称
ser = serial.Serial()  # 串口实例对象

def opencom(*args):
    if not ser.is_open:
        ser.open()
        setbaudrate()
        setPort()
        setstop()
        setShujv()
        setcheck()
        if ser.is_open:
            print(f"串口已打开")
            light.turn_on()

def closecom(*args):
    if ser.is_open:
        ser.close()
        if not ser.is_open:
            print(f"串口已关闭")
            light.turn_off()

def clearreceive(*args):
    t0.delete(1.0, 'end')

def send(*args):
    pass


def hex_string_to_byte_list(hex_str):
    if len(hex_str) % 2 != 0:
        raise ValueError("Hex string length must be even.")
    return [int(hex_str[i:i+2], 16) for i in range(0, len(hex_str), 2)]


Str_Get_List = []#存储接收到的Hex字符串转成的十六进制字节列表
Get_Picture_List = [] #存储图像数据
PackHead = [1,254]
PackEnd = [254,1]
PackHeadGetFlag = False
PackGetFlag = False
Picture_ByteLen = 320*240*2+4
lock = threading.Lock()# 创建一个线程锁
rgb_array = []#存储RGB888转换的数组数据,用于保存图片
SendData = [0x02,0xff,0x00,0x00,0x00,0x00,0xff,0x02]#存储发送数据包
IdentificationCount = 0 #图像识别数量
def showdata(*args):
    global Str_GetString
    global Str_Get_List
    global PackHeadGetFlag
    global PackGetFlag
    while not stop_event.is_set():
        if ser.is_open:
            try:
                try:
                    with lock:  # 使用锁确保线程安全
                        s = ser.read_all()  # 获取字节对象
                        if s:
                            sHex = s.hex()  # 获取Hex字符串 ,2个字符表示一个十六进制1字节
                            t0.delete("1.0", tk.END)
                            t0.insert('end', sHex)
                            t0.see('end')  # 自动滚动到最新接收到的数据
                            # Str_Get_List = Str_Get_List.append(hex_string_to_byte_list(sHex))
                            # print(sHex,len(Str_Get))

                            #获取数据列表
                            sHexList = hex_string_to_byte_list(sHex)
                            for i in sHexList:
                                Str_Get_List.append(i)

                            #处理数据和发送相关数据包
                            if Hour_CarFlag ==0:
                                pictureDataDispose()
                            else:
                                print("特殊模式,摄像头不启用")

                            #情况接收到的数据
                            Str_Get_List = []
                except Exception as e:
                    print(f"Unexpected error: {e}")
            except serial.SerialException as e:
                print(f"Error reading from serial port: {e}")
            except Exception as e:
                print(f"Unexpected error: {e}")
        else:
            pass



def pictureDataDispose(*args):
    global PackHeadGetFlag
    global Get_Picture_List
    global Picture_ByteLen
    global PackHead
    global Str_Get_List
    global rgb_array
    #print(Str_Get_List)
    if not PackHeadGetFlag:#未接收到包头
        index_Str_Get_List = find_sublist(Str_Get_List,PackHead)
        if index_Str_Get_List!=-1:
            Get_Picture_List = Str_Get_List[index_Str_Get_List:]
            PackHeadGetFlag = True
            print(f"Element {PackHead} found at index {index_Str_Get_List}")
        else:
            print(f"Element {PackHead} not found")
    else:
        Get_Picture_List.extend(Str_Get_List)#添加所有元素到尾部
        if len(Get_Picture_List) >= Picture_ByteLen:
            Get_Picture_List_PackEnd = Get_Picture_List[Picture_ByteLen-2:Picture_ByteLen]
            index_PackEnd = find_sublist(Get_Picture_List_PackEnd,PackEnd)
            if index_PackEnd==0:
                print("接收到一帧图片数据",print(len(Get_Picture_List)))
                PictureData = Get_Picture_List[2:Picture_ByteLen-2]
                # 转换图像数据
                rgb565_values = [(PictureData[i] << 8 | PictureData[i + 1]) for i in range(0, len(PictureData), 2)]
                rgb888_values = [rgb565_to_rgb888(val) for val in rgb565_values]
                #print(rgb888_values, len(rgb888_values), len(rgb565_values))
                update_image(rgb888_values)
                # 将RGB数据转换为numpy数组,并调整其形状以匹配图像尺寸和通道数(高度, 宽度, 通道)
                rgb_array = np.array(rgb888_values).reshape((240, 320, 3))
                #print(rgb_array)
                SavePicture(rgb_array)
                PictureIdentification()

                #---------------------处理第二帧图片包头
                Get_Picture_List = []
                PackHeadGetFlag = False
            else:
                Get_Picture_List = []
                PackHeadGetFlag = False

def PictureIdentification(*args):
    global IdentificationCount
    img = cv.imread('./image/2.jpg')

    # cv_show('img', img)
    #print(img)
    HSV = cv.cvtColor(img, cv.COLOR_BGR2HSV)  # 转换图像
    lowerColor = np.array([0, 0, 200])  # 设置最低阈值  np.array([Hmin, Smin, Vmin])
    upperColor = np.array([180, 20, 255])  # 设置最高阈值
    # 因为要保留的就是白色区域,因此根据白色阈值填入,提取白色部分(指定区域变白,其他变黑)
    binary = cv.inRange(HSV, lowerColor, upperColor)
    #print(binary)
    # 运用中值滤波去除噪声
    median = cv.medianBlur(binary, 9)
    # 显示二值图
    #cv_show('median', median)  # 这里是我自己写的cv_show()函数,函数声明可放开头
    contours, hierachy = cv.findContours(median, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_NONE)
    # 后两个为轮廓检索模式和轮廓逼近模式

    res = cv.drawContours(img, contours, -1, (0, 0, 255), 4)
    #cv_show('res', res)

    # 4、原图白色中心点
    L = len(contours)  # contours轮廓数据是数组,因此用len()测数组长度,为了循环画点使用
    IdentificationCount = L#识别白色块的数量

    for i in range(L):
        cnt = contours[i]  # cnt表示第i个白色快的轮廓信息
        (x, y), radius = cv.minEnclosingCircle(cnt)  # 得到白色块外接圆的圆心坐标和半径
        center = (int(x), int(y))  # 画center圆心时。x,y必须是整数

        # 标出中心点
        img2 = cv.circle(img, center, 3, (0, 0, 255), 5)  # 传入圆心信息,并画在原图上

        print(center)  # 输出各个中心点

    # 显示有中心点的图像
    #cv_show("frame", img2)  # 展示花了中心点的魔方图


# 写一个显示函数
def cv_show(name, img):
    cv.imshow(name, img)
    cv.waitKey(0)
    cv.destroyAllWindows()




def SavePicture(*args):
    global rgb_array
    # 检查并创建保存图像的目录(如果不存在)
    output_dir = './image'
    os.makedirs(output_dir, exist_ok=True)
    # 图像保存路径
    output_path = os.path.join(output_dir, '2.jpg')
    # 使用OpenCV将RGB图像保存为JPEG格式
    cv2.imwrite(output_path, rgb_array[:, :, ::-1])  # 注意这里对颜色通道进行了转换(BGR到RGB)
    print(f"Image saved successfully at {output_path}")

# def dispose_LastData(*args):
#     global PackHeadGetFlag
#     global Get_Picture_List
#     global Picture_ByteLen
#     global PackHead
#
#     if len(Get_Picture_List[Picture_ByteLen:]) >= 2:
#         Get_Picture_List_last = Get_Picture_List[Picture_ByteLen:]  # 保留剩下的数据
#         index_Str_Get_List = find_sublist(Get_Picture_List_last, PackHead)  # 判断剩下的数据
#         if index_Str_Get_List != -1:
#             Get_Picture_List = Get_Picture_List_last[index_Str_Get_List:]
#             print(f"Element {PackHead} found at index {index_Str_Get_List}")
#         else:
#             print(f"Element {PackHead} not found")
#             Get_Picture_List = []
#             PackHeadGetFlag = False
#             print(f"剩余非图片数据")
#     else :
#         Get_Picture_List = []
#         PackHeadGetFlag = False


def update_image(rgb888_data):
    """更新Canvas显示图像"""
    image = Image.new('RGB', (320, 240))
    image.putdata(rgb888_data)
    tk_image = ImageTk.PhotoImage(image)
    # 保持图像引用
    if hasattr(canvas, 'image'):
        canvas.image = tk_image
    else:
        canvas.image = tk_image
    canvas.create_image(0, 0, anchor="nw", image=tk_image)
    print("图像已更新")

def rgb565_to_rgb888(rgb565):
    """将单个RGB565值转换为RGB888元组"""
    # 提取红色部分 (5位),并将其从5位扩展到8位
    r = (rgb565 & 0xF800) >> 11
    r = int(r * 255 / 31)

    # 提取绿色部分 (6位),并将其从6位扩展到8位
    g = (rgb565 & 0x07E0) >> 5
    g = int(g * 255 / 63)

    # 提取蓝色部分 (5位),并将其从5位扩展到8位
    b = rgb565 & 0x001F
    b = int(b * 255 / 31)

    return (r, g, b)


# 设置串口参数
def setPort(*args):
    ser.port = comboxlist0.get()

# 获取可用串口列表
port_list = list(serial.tools.list_ports.comports())
for port in port_list:
    serialName.append(port[0])

def closethread(*args):
    while ser.is_open:
        ser.close()
        if not ser.is_open:
            break
    while not ser.is_open:
        print(f"串口已关闭")
        stop_event.set()
        stop_event1.set()
        stop_event2.set()
        if stop_event.is_set() and stop_event1.is_set() and stop_event2.is_set():
            break
    while stop_event2.is_set():
        print(f"时间线程等待stop_event2事件关闭")
        th2.join()
        if not th2.is_alive():
            break
    while stop_event1.is_set():
        print(f"时间线程等待stop_event1事件关闭")
        th1.join()
        if not th1.is_alive():
            break
    while stop_event.is_set():
        print(f"串口线程等待stop_event事件关闭")
        th0.join()
        window.quit()
        if not th0.is_alive():
            break

def printfData(*args):
    if not ser.is_open:
        print("串口未打开")
        return
    ser.write(SendData)

def find_sublist(main_list, sub_list):
    """查找子列表在主列表中的位置"""
    len_main, len_sub = len(main_list), len(sub_list)
    if len_sub == 0 or len_main < len_sub:
        return -1
    for i in range(len_main - len_sub + 1):
        if main_list[i:i+len_sub] == sub_list:
            return i
    return -1

# 设置串口参数
def setPort(*args):
    ser.port = comboxlist0.get()


def setbaudrate(*args):
        while True:
            baudrate = int(comboxlist1.get())
            ser.baudrate = baudrate
            if ser.baudrate == baudrate:
                break

def setShujv(*args):
    ser.bytesize = int(comboxlist2.get())

check = {"N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD}
def setcheck(*args):
    ser.parity = check[comboxlist3.get()]

stop = {"1": serial.STOPBITS_ONE, "1.5": serial.STOPBITS_ONE_POINT_FIVE, "2": serial.STOPBITS_TWO}
def setstop(*args):
    ser.stopbits = stop[comboxlist4.get()]

# 获取当前系统时间的小时并更新 Label
def get_current_hour():
    global current_hour
    global Hour_CarFlag
    # 获取当前时间
    now = datetime.now()
    # 提取小时(0-23)
    current_hour = now.hour
    # 更新 Label 的文本
    label6.config(text = f"{now}")
    label8.config(text=f"{IdentificationCount}")
    #当前时间对应模式和标志位判断
    '''current_hour
        其他时间 : 0 正常模式(使用摄像头功能)
        7-9 and 16-18 : 1 ,早晚高峰,绿灯增加10s
        22-23 and 0-3 : 2 ,深夜模式,红灯时间变为20s
    '''
    if (current_hour>=7 and current_hour<=9)or(current_hour>=16 and current_hour<=18):
        Hour_CarFlag = 1
    elif (current_hour>=22 and current_hour<=23)or(current_hour>=0 and current_hour<=3):
        Hour_CarFlag = 2
    else:
        Hour_CarFlag = 0

current_hour = 0 #当前h
Hour_CarFlag = 0 #模式标志位 0正常模式(使用摄像头)  1早晚高峰模式  2深夜模式
#th1线程
def TimeShow(*args):
    while not stop_event1.is_set():
        get_current_hour()
        time.sleep(0.5)

#th2线程
def SendDatathread(*args):
    global SendData
    global current_hour
    global IdentificationCount
    while not stop_event2.is_set():
        time.sleep(1)#1s发送1次数据

        #修改发送的数据
        if Hour_CarFlag == 0:#正常模式
            if IdentificationCount==0:
                SendData[4] = 3 #下次红灯停止计数标志
            elif IdentificationCount >= 20:
                SendData[4] = 1 #车多,绿灯增加5s标志位,正常计时
            elif IdentificationCount > 0 and IdentificationCount < 20:
                SendData[4] = 2 #正常计时标志位
        elif Hour_CarFlag == 1:#高峰模式
            SendData[4] = 0 #摄像头失能  正常计时
            IdentificationCount = 0 #识别车辆数清零
        elif Hour_CarFlag == 2:#夜间模式
            SendData[4] = 0 #摄像头失能  正常计时
            IdentificationCount = 0 #识别车辆数清零

        SendData[2] = ((current_hour<<3)&0xff)+Hour_CarFlag
        SendData[3] = 0 #按键清空,方式按键事件重复

        SendData[5] = IdentificationCount #备用 当前采集车辆

        #发送
        if not ser.is_open:
            print("串口未打开")
        else:
            ser.write(SendData)
            print(SendData)


#信号灯时间增点击事件
def LightTimeIncrease(*args):
    global SendData
    '''SendData[3]
    无按键:0
    当前灯时间增加:1  当前信号灯增加10s 最大99s
    当前灯时间减少:2  当前信号灯减少10s 最小5s
    切换灯:3  当前信号灯变为5s
    '''
    SendData[3] = 1  # 按键清空
    ser.write(SendData)
    SendData[3] = 0  # 按键清空


#信号灯时间减点击事件
def LightTimeDecrease(*args):
    SendData[3] = 2  # 按键清空
    ser.write(SendData)
    SendData[3] = 0  # 按键清空

#信号灯切换点击事件
def SwihchOver(*args):
    SendData[3] = 3  # 按键清空
    ser.write(SendData)
    SendData[3] = 0  # 按键清空

# 创建文本控件和下拉列表控件
label0 = tk.Label(window, text='串口', font=10)
label0.place(y=40, x=50)
label1 = tk.Label(window, text='波特率', font=10)
label1.place(y=70, x=50)
label2 = tk.Label(window, text='数据位', font=10)
label2.place(y=100, x=50)
label3 = tk.Label(window, text='校验位', font=10)
label3.place(y=130, x=50)
label4 = tk.Label(window, text='停止位', font=10)
label4.place(y=160, x=50)
label5 = tk.Label(window, text='时间:', font=10)
label5.place(y=5, x=350)
label6 = tk.Label(window, text='XXXX', font=10)
label6.place(y=5, x=420)
label7 = tk.Label(window, text='车辆:', font=10)
label7.place(y=5, x=700)
label8 = tk.Label(window, text='XXXX', font=10)
label8.place(y=5, x=770)

comvalue0 = tk.StringVar()
comboxlist0 = ttk.Combobox(window, textvariable=comvalue0)
comboxlist0["values"] = tuple(serialName)
comboxlist0.bind("<<ComboboxSelected>>", setPort)
comboxlist0.place(y=40, x=150)

#波特率下拉框
comvalue1 = tk.StringVar()
comboxlist1 = ttk.Combobox(window, textvariable=comvalue1)
comboxlist1["values"] = (1200, 2400, 4800, 9600, 14400, 19200, 115200, 256000)
comboxlist1.current(7)
comboxlist1.bind("<<ComboboxSelected>>", setbaudrate)
comboxlist1.place(y=70, x=150)

#数据位下拉框
comvalue2 = tk.StringVar()
comboxlist2 = ttk.Combobox(window, textvariable=comvalue2)
comboxlist2["values"] = ("8", "7", "6", "5")
comboxlist2.current(0)
comboxlist2.bind("<<ComboboxSelected>>", setShujv)
comboxlist2.place(y=100, x=150)

#校验位下拉框
comvalue3 = tk.StringVar()
comboxlist3 = ttk.Combobox(window, textvariable=comvalue3)
comboxlist3["values"] = ("N", "E", "O")
comboxlist3.current(0)
comboxlist3.bind("<<ComboboxSelected>>", setcheck)
comboxlist3.place(y=130, x=150)

#停止位下拉框
comvalue4 = tk.StringVar()
comboxlist4 = ttk.Combobox(window, textvariable=comvalue4)
comboxlist4["values"] = ("1", "1.5", "2")
comboxlist4.current(0)
comboxlist4.bind("<<ComboboxSelected>>", setstop)
comboxlist4.place(y=160, x=150)


b4 = tk.Button(window, text='关闭线程', width=35, height=1, command=closethread)
b4.place(y=0, x=0)
b0 = tk.Button(window, text='打开串口', width=35, height=1, command=opencom)
b0.place(y=190, x=50)
light = LightSwitch.BooleanLight(window, size=35, x=10, y=190)#串口连接状态显示灯

b1 = tk.Button(window, text='清除接收', width=35, height=1, command=clearreceive)
b1.place(y=250, x=50)
b2 = tk.Button(window, text='关闭串口', width=35, height=1, command=closecom)
b2.place(y=220, x=50)
e0 = tk.Entry(window, show=None, width=35)
e0.place(y=290, x=50)
b5 = tk.Button(window, text='Send', width=35, height=1, command=send)
b5.place(y=320, x=50)
t0 = tk.Text(window, width=65, height=3)
t0.place(y=50, x=350)
b6 = tk.Button(window, text='输出接收到的数据-测试按钮', width=35, height=1, command=printfData)
b6.place(y=350, x=50)
b6 = tk.Button(window, text='当前信号灯时间+', width=35, height=1, command=LightTimeIncrease)
b6.place(y=380, x=50)
b6 = tk.Button(window, text='当前信号灯时间-', width=35, height=1, command=LightTimeDecrease)
b6.place(y=410, x=50)
b6 = tk.Button(window, text='切换信号灯', width=35, height=1, command=SwihchOver)
b6.place(y=440, x=50)
label5 = tk.Label(window, text='Canvas', font=10)
label5.place(y=100, x=350)
# 图像参数
canvas = tk.Canvas(window,width=320, height=240,bg='#{:02x}{:02x}{:02x}'.format(*(255,255,255)),bd=1,highlightthickness=1,relief="groove")
canvas.place(x=420, y=125)





if __name__ == '__main__':
    stop_event = threading.Event()  #等待线程th0停止事件
    stop_event1 = threading.Event()  # 等待线程th1停止事件
    stop_event2 = threading.Event()  # 等待线程th2停止事件
    th0 = threading.Thread(target=showdata)
    th0.start()
    th1 = threading.Thread(target=TimeShow)
    th1.start()
    th2 = threading.Thread(target=SendDatathread)
    th2.start()

    window.mainloop()


 从文件,同目录下LightSwitch.py

import tkinter as tk
class BooleanLight(tk.Canvas):
    def __init__(self, master=None, size=50, x=0, y=0):
        super().__init__(master, width=size, height=size)
        self.pack()
        self.place(x=x, y=y)
        self.size = size
        self.is_light_on = False  # 初始状态为关闭

        # 创建初始的圆形表示灯的状态
        self.light = self.create_oval(5, 5, size - 5, size - 5, fill="gray")

    def turn_on(self):
        """打开灯"""
        self.itemconfig(self.light, fill="green")
        self.is_light_on = True

    def turn_off(self):
        """关闭灯"""
        self.itemconfig(self.light, fill="gray")
        self.is_light_on = False

    def toggle(self):
        """切换灯的状态"""
        if self.is_light_on:
            self.turn_off()
        else:
            self.turn_on()

 完成功能

  1. tk、ttk基础的UI界面建立,并建立相关列表和按钮的电机事件方法。
  2. 选择串口参数,每次打开时自动获取串口端口列表,可以选择后在进行连接(显示连接成功)。
  3. 通过多线程,串口接收数据,十六进制字节对象字符串显示。按钮发送输入框内容(有函数但是目前没写,可以自己写,比较简单)。
  4. 建立3个线程(和三个线程关闭事件,防止线程异常不能正确结束导致的死循环),分别用来接收数据包、发送数据包、更新界面显示内容。
  5. 根据接收的数据包,在canvas进行图像显示(先将获取的RGB565数据转换为RGB888,数据进行保存成图片保存在文件夹中、然后从文件夹内读出,进行HSV处理,阈值化处理,边框圈出,计算白色内容数量)。
  6. 实时显示系统时间和接收数据包图像采集的车辆数量。
  7. 手动关闭线程,每次点击关闭线程来关闭应用,确保线程彻底关闭。