python tcp 框架

发布于:2025-08-11 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

python tcp 框架 asyncio

websockets


python tcp 框架 asyncio

import asyncio
import json
import time

class TCPClient:
    def __init__(self, host, port, heartbeat_interval=10):
        self.host = host
        self.port = port
        self.heartbeat_interval = heartbeat_interval
        self.reader = None
        self.writer = None
        self.connected = False
        self.last_recv_time = time.time()

    async def connect(self):
        while True:
            try:
                print(f"正在连接 {self.host}:{self.port} ...")
                self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
                self.connected = True
                print("✅ 已连接服务器")
                asyncio.create_task(self.send_heartbeat())
                asyncio.create_task(self.receive_loop())
                break
            except Exception as e:
                print(f"连接失败: {e},3秒后重试")
                await asyncio.sleep(3)

    async def send_heartbeat(self):
        while self.connected:
            try:
                await self.send({"type": "heartbeat"})
                await asyncio.sleep(self.heartbeat_interval)
            except Exception as e:
                print(f"心跳发送失败: {e}")
                self.connected = False
                break

    async def receive_loop(self):
        try:
            while self.connected:
                data = await self.reader.readline()
                if not data:
                    print("⚠ 服务器断开连接")
                    self.connected = False
                    break
                self.last_recv_time = time.time()
                try:
                    msg = json.loads(data.decode())
                    self.on_message(msg)
                except json.JSONDecodeError:
                    print(f"收到非JSON数据: {data}")
        except Exception as e:
            print(f"接收出错: {e}")
        finally:
            self.connected = False
            await self.connect()  # 自动重连

    def on_message(self, msg):
        """收到消息时触发(你可以改成事件回调)"""
        print(f"📩 收到消息: {msg}")

    async def send(self, obj):
        if self.writer and not self.writer.is_closing():
            line = json.dumps(obj) + "\n"
            self.writer.write(line.encode())
            await self.writer.drain()
        else:
            print("❌ 未连接,无法发送")

# === 创建全局客户端实例 ===
client = TCPClient("127.0.0.1", 8888)

async def main():
    await client.connect()

# === 在任意地方调用发送 ===
async def send_message():
    await client.send({"type": "chat", "msg": "Hello Server"})

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # 模拟3秒后在别的地方发消息
    loop.call_later(3, lambda: asyncio.create_task(send_message()))

    loop.run_forever()

websockets

安装依赖
pip install websockets
示例代码 python


编辑
import asyncio
import websockets
import json
import threading

SERVER_URI = "ws://127.0.0.1:8765"

# 全局 websocket 引用,用于在其他地方发消息
ws_conn = None


async def heartbeat(ws):
    """定时发送心跳包"""
    while True:
        try:
            await ws.send(json.dumps({"type": "ping"}))
        except Exception as e:
            print("心跳发送失败:", e)
            break
        await asyncio.sleep(5)  # 心跳间隔


async def listen_messages(ws):
    """监听服务器消息"""
    try:
        async for message in ws:
            data = json.loads(message)
            print("收到消息:", data)
    except websockets.ConnectionClosed:
        print("连接已关闭")


async def send_message(data):
    """在其他地方调用的发消息方法"""
    global ws_conn
    if ws_conn:
        await ws_conn.send(json.dumps(data))
    else:
        print("未连接服务器,无法发送")


async def main():
    global ws_conn
    async with websockets.connect(SERVER_URI) as websocket:
        ws_conn = websocket

        # 并发执行 心跳 和 收消息
        await asyncio.gather(
            heartbeat(websocket),
            listen_messages(websocket)
        )


def start_client():
    """启动 WebSocket 客户端"""
    asyncio.run(main())


def send_from_other_thread(msg):
    """从其他线程发送消息"""
    asyncio.run(send_message({"type": "chat", "text": msg}))


if __name__ == "__main__":
    # 启动 WebSocket 客户端(独立线程)
    t = threading.Thread(target=start_client, daemon=True)
    t.start()

    # 等待连接建立
    import time
    time.sleep(2)

    # 从主线程模拟发送消息
    send_from_other_thread("Hello from main thread!")

    # 防止主线程退出
    t.join()