基于Socketserver+ThreadPoolExecutor+Thread构造的TCP网络实时通信程序

发布于:2025-07-02 ⋅ 阅读:(13) ⋅ 点赞:(0)

目录

介绍:

源代码:

Socketserver-服务端代码

Socketserver客户端代码:


介绍:

socketserver是一种传统的传输层网络编程接口,相比WebSocket这种应用层的协议来说,socketserver比较底层,socketserver的网络通信逻辑与收发、传输的数据格式与都要由开发者自己来定义,适合用来学习网络底层通信逻辑。我采用Python脚本来编程Socketserver的接口,我在下面放出源代码。

源代码:

我先讲一下我实现的转发模型,是C/S架构,不是P2P,由服务端中转客户端的发送消息这样。

Socketserver-服务端代码

import json
import socketserver
import struct
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

from threading import Lock




def send_byte(conn,msg):

    msg__bs_len = len(msg)
    msg_bs_len_bs = struct.pack('i',msg__bs_len)

    conn.sendall(msg_bs_len_bs)
    conn.sendall(msg)

def recv_byte(conn):
    msg_recv_len_bs = conn.recv(4)
    msg_recv_len = struct.unpack('i', msg_recv_len_bs)[0]
    msg_recv = conn.recv( msg_recv_len )
    return msg_recv

def send(conn,msg):
    msg_json = json.dumps(msg)
    msg_bs = msg_json.encode('utf-8')

    msg_bs_len = len(msg_bs)
    msg_bs_len_pack=(struct.pack('i', msg_bs_len))

    conn.sendall(msg_bs_len_pack)
    conn.sendall(msg_bs)

def recv_name(conn):
    name_len_bs = conn.recv(4)
    name_len = struct.unpack('i', name_len_bs)[0]
    name_bs = conn.recv(name_len)
    name = name_bs.decode('utf-8')
    return name


def recv(conn):
    msg_len_bs = conn.recv(4)
    msg_len = struct.unpack('i', msg_len_bs)[0]

    msg_bs = conn.recv(msg_len)
    msg = msg_bs.decode('utf-8')

    msg = json.loads(msg)

    return msg



class MyRequestHandler(socketserver.BaseRequestHandler):
    client_dict = {} #{address_port:address_port,sk_conn:conn}
    name_list = []
    stor_user_list = []
    retr_user_list = []
    lock = Lock()

    def handle(self):
        conn = self.request
        address_port = self.client_address
        client_name = recv_name(conn)
        try:
            with ThreadPoolExecutor() as t:
                future = t.submit(handle_is_newuser,address_port,conn,client_name)
                def broadcast_welcome(future):
                    is_new = future.result()
                    if is_new:
                        for key,value in MyRequestHandler.client_dict.items():
                                sk_conn = value['sk_conn']
                                send(sk_conn, f"系统消息: 【{client_name}】 加入了群聊,输入/help获取命令")
                future.add_done_callback(broadcast_welcome)

        except Exception as e:
            print ('出现异常:',e)

        while 1:
            msg_dict = recv(conn)
            print (msg_dict)
            msg = msg_dict['msg']
            name = msg_dict['name']
            try:
                if msg.upper() == 'Q':
                    MyRequestHandler.name_list.remove(client_name)
                    del MyRequestHandler.client_dict[name]
                    for key, value in MyRequestHandler.client_dict.items():
                        sk_conn = value['sk_conn']
                        print (f'【{name}】退出了群聊')
                        send(sk_conn, f'【{name}】退出了群聊')
                    conn.close()

                elif msg == 'client/all':
                    send(conn,f'在线用户列表:{MyRequestHandler.name_list}')

                elif msg == '/help':
                    text ='查看在线用户:client/all\n私聊:/chat [对方名字] [消息内容]\n退出群聊:[q] or [Q]\n向对方传输文件:/stor [对方名字] [本地文件路径]\n显示递归目录树:/tree [对方名字] [远端目录]'
                    send(conn,text)


                elif msg.lstrip().startswith('/tree_content'):
                    try:
                        parts = msg.split(' ',2)
                        ip_or_name = parts[1]
                        if ip_or_name == name:
                            send(conn,'请指定对方名字')
                            continue
                        if ip_or_name in MyRequestHandler.name_list:
                            values = MyRequestHandler.client_dict[ip_or_name]
                            pri_conn = values['sk_conn']
                            send(pri_conn,msg_dict)
                    except Exception as e:
                        print ('命令执行错误',e)

                elif msg.lstrip().startswith('/tree'):
                    parts = msg.split(' ',2)
                    ip_or_name = parts[1]
                    if ip_or_name == name:
                        send(conn, '请指定对方名字')
                        continue
                    if ip_or_name in MyRequestHandler.name_list:
                        values = MyRequestHandler.client_dict[ip_or_name]
                        remote_conn = values['sk_conn']
                        send(remote_conn,msg_dict)
                    continue
                elif msg.lstrip().startswith('stor')  or  msg.lstrip().startswith('retr') :
                    print ('第一次文件传输交互')
                    msg_bytes = recv_byte(conn)
                    parts = msg.split(' ',3)
                    remote_name= parts[1]
                    client_dict_value = MyRequestHandler.client_dict[remote_name]
                    remote_conn = client_dict_value['sk_conn']
                    cmd = parts[0]
                    send(remote_conn,msg_dict)
                    if cmd == '/stor':
                            print('进来了')
                            send_byte(remote_conn,msg_bytes)
                            print (msg_bytes)
                            print ('发送成功')
                            continue

                else:
                    for key, value in MyRequestHandler.client_dict.items():
                        sk_conn = value['sk_conn']
                        send(sk_conn, msg_dict)

            except Exception as e:
                print ('意外报错:',e)



def handle_is_newuser(address_port,conn,client_name):
    dict_addr_conn = {}

    with MyRequestHandler.lock:
        if client_name in MyRequestHandler.name_list:
            return
        else:
            dict_addr_conn['address_port'] = address_port
            dict_addr_conn['sk_conn'] = conn
            MyRequestHandler.client_dict[client_name] = dict_addr_conn
            MyRequestHandler.name_list.append(client_name)
    return True


if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(('127.0.0.1', 12345), MyRequestHandler)
    print("服务器正在运行...")
    server.serve_forever()

Socketserver客户端代码:
 

import json
import os
import socket
import struct
from threading import Thread
import sys
import  time

name = ''
stor_user_list=[]

def send_byte(conn,msg):

    msg__bs_len = len(msg)
    msg_bs_len_bs = struct.pack('i',msg__bs_len)

    conn.sendall(msg_bs_len_bs)
    conn.sendall(msg)

def recv_byte(conn):
    msg_recv_len_bs = conn.recv(4)
    msg_recv_len = struct.unpack('i', msg_recv_len_bs)[0]
    msg_recv = conn.recv( msg_recv_len)
    return msg_recv

def send_name(conn):
    global name
    name = input('请取个名字吧:')
    name_bs = name.encode('utf-8')
    name_len = len(name_bs)
    conn.sendall(struct.pack('i', name_len))
    conn.sendall(name_bs)

def send_handle(conn,name_msg):
    name_msg_json = json.dumps(name_msg)
    name_msg_json_bs = name_msg_json.encode('utf-8')

    name_msg_json_bs_len = len(name_msg_json_bs)
    name_msg_json_bs_len_pack = struct.pack('i', name_msg_json_bs_len)
    conn.sendall(name_msg_json_bs_len_pack)
    conn.sendall(name_msg_json_bs)

def send(conn):
    global stor_user_list
    while True:
        name_msg = {}
        msg = input()
        name_msg['name'] = name
        name_msg['msg'] = msg
        try:
            if msg.upper() == 'Q':
                # name_msg_json = json.dumps(name_msg)
                # msg_bs = name_msg_json.encode('utf-8')
                # msg_len = len(msg_bs)
                # conn.sendall(struct.pack('i', msg_len))
                # conn.sendall(msg_bs)
                send_handle(conn,name_msg)
                print ('我退出了群聊!')
                conn.close()
                sys.exit()
            if  str(msg.lstrip()).startswith('/stor') or  str(msg.lstrip()).startswith('/retr') :
                    print('主动发起文件传输(A端)')
                    parts = msg.split(' ', 3)
                    command = parts[0]
                    remote_name = parts[1]
                    localpath = parts[2]
                    # name_msg_json = json.dumps(name_msg)
                    # msg_json_bs = name_msg_json.encode('utf-8')
                    #
                    # msg_json_bs_len = len(msg_json_bs)
                    # msg_json_bs_len_pack = struct.pack('i', msg_json_bs_len)
                    #
                    # conn.sendall(msg_json_bs_len_pack)
                    # conn.sendall(msg_json_bs )

                    if '/stor' in command:
                        name_byte = {}
                        name_byte['name'] = name

                        name_byte['msg'] = msg

                        send_handle(conn,name_byte)

                        with open(localpath, mode='rb') as read_file:
                            bytes = read_file.read()
                            print('开始发送文件')
                            send_byte(conn,bytes)
                            print('文件发送成功')
                            sys.stdout.write(f'{name}>>')
                            sys.stdout.flush()
                            continue

            send_handle(conn,name_msg)

            sys.stdout.write(f'{name}>>')
            sys.stdout.flush()

        except Exception as e:
            print('异常报错:', e)
            sys.exit()

def recv_handle(conn):
    msg_len_pack = conn.recv(4)
    msg_bs_len = struct.unpack('i', msg_len_pack)[0]
    msg_bs = conn.recv(msg_bs_len)

    msg_dict_json = msg_bs.decode('utf-8')
    msg_dict = json.loads(msg_dict_json)
    return msg_dict

def recv(conn):
    global stor_user_list
    while True:
        try:
            msg_dict = recv_handle(conn)
            sys.stdout.write('\r' + ' ' * 100 + '\r')  # 覆盖当前行
            sys.stdout.flush()
            if isinstance(msg_dict,list):
                print (msg_dict)
            elif isinstance(msg_dict,str):#由服务器发送的消息,因此无需以字典格式传输
                print (msg_dict)
            elif isinstance(msg_dict,dict):
                msg = msg_dict['msg']
                name_msg = msg_dict['name']
                print (name_msg)
                if msg.lstrip().startswith('/chat'):
                    parts = msg.split(' ', 2)
                    pri_msg = parts[2]
                    print(f'{name_msg}>>{name} {pri_msg}')

                if msg.lstrip().startswith('/tree'):
                    parts = msg.split(' ', 2)
                    local_path = parts[2]
                    tree =''
                    def list_tree(path,tree,depth=1):
                        dir_name = os.path.basename(path)
                        tree += str(depth * '|-----')+str(dir_name).strip() + '\n'
                        file_list = os.listdir(path)
                        for file in file_list:
                            filepath = os.path.join(path,file)
                            if os.path.isdir(filepath):
                                tree = list_tree(filepath,tree,depth+1)
                            if os.path.isfile(filepath):
                                tree += str(depth * '|-----') + '|-----' + file + '\n'
                        return tree
                    dir_tree =list_tree(local_path,tree)
                    dir_tree_full = '\n' + dir_tree
                    print (dir_tree_full)
                    msg_dir_tree = {}
                    msg_dir_tree['name'] = name
                    msg_dir_tree['msg'] = dir_tree_full
                    send_handle(conn,msg_dir_tree)



                if name_msg != name and msg.upper() != 'Q' and  not msg.lstrip().startswith('/chat') and not  msg.lstrip().startswith('stor')  and not msg.lstrip().startswith('retr'):
                    print(f'{name_msg}>> {msg}')
                if msg.lstrip().startswith('stor') or msg.lstrip().startswith('retr'):
                    msg_bytes = recv_byte(conn)
                    parts = msg_dict['msg'].split(' ',3)
                    command = parts[0]
                    local_path = parts[3]
                    if  '/stor' in command:
                        with open(local_path, mode='wb') as writefile:
                            print('开始文件传输(B端)',flush=True)
                            writefile.write(msg_bytes)
                            writefile.flush()
                            os.fsync(writefile.fileno())

                        print ('传输完毕', flush=True)

            sys.stdout.write(f'{name}>>')
            sys.stdout.flush()

        except Exception as e:
            print ('接收消息出错:',e)



if __name__ == '__main__':
    try:
        sk = socket.socket()
        sk.connect(('127.0.0.1', 12345))
    except Exception as e:
        print ('socket连接失败',e)
        sys.exit()

    send_name(sk)

    receiver = Thread(target=recv, args=(sk,), daemon=True)

    receiver.start()

    send(sk)
    sk.close()

定义的功能:

查看在线用户:client/all
私聊:/chat [对方名字] [消息内容]
退出群聊:[q] or [Q]
向对方传输文件:/stor [对方名字] [本地文件路径]
显示递归目录树:/tree [对方名字] [远端目录]

PS:

有点bug未修,还有些逻辑未完善(如递归目录树没有单播传递),不过能运行,小问题,你们可以拿去优化一下,我感觉我多线程逻辑也有点狗市,后面了解到websocket就毅然弃坑socketserver了