目录
python tcp 框架 asyncio
import asyncio
import json
import time
class TCPClient:
def __init__(self, host, port, heartbeat_interval=10):
self.host = host
self.port = port
self.heartbeat_interval = heartbeat_interval
self.reader = None
self.writer = None
self.connected = False
self.last_recv_time = time.time()
async def connect(self):
while True:
try:
print(f"正在连接 {self.host}:{self.port} ...")
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
self.connected = True
print("✅ 已连接服务器")
asyncio.create_task(self.send_heartbeat())
asyncio.create_task(self.receive_loop())
break
except Exception as e:
print(f"连接失败: {e},3秒后重试")
await asyncio.sleep(3)
async def send_heartbeat(self):
while self.connected:
try:
await self.send({"type": "heartbeat"})
await asyncio.sleep(self.heartbeat_interval)
except Exception as e:
print(f"心跳发送失败: {e}")
self.connected = False
break
async def receive_loop(self):
try:
while self.connected:
data = await self.reader.readline()
if not data:
print("⚠ 服务器断开连接")
self.connected = False
break
self.last_recv_time = time.time()
try:
msg = json.loads(data.decode())
self.on_message(msg)
except json.JSONDecodeError:
print(f"收到非JSON数据: {data}")
except Exception as e:
print(f"接收出错: {e}")
finally:
self.connected = False
await self.connect() # 自动重连
def on_message(self, msg):
"""收到消息时触发(你可以改成事件回调)"""
print(f"📩 收到消息: {msg}")
async def send(self, obj):
if self.writer and not self.writer.is_closing():
line = json.dumps(obj) + "\n"
self.writer.write(line.encode())
await self.writer.drain()
else:
print("❌ 未连接,无法发送")
# === 创建全局客户端实例 ===
client = TCPClient("127.0.0.1", 8888)
async def main():
await client.connect()
# === 在任意地方调用发送 ===
async def send_message():
await client.send({"type": "chat", "msg": "Hello Server"})
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main())
# 模拟3秒后在别的地方发消息
loop.call_later(3, lambda: asyncio.create_task(send_message()))
loop.run_forever()
websockets
安装依赖
pip install websockets
示例代码 python
编辑
import asyncio
import websockets
import json
import threading
SERVER_URI = "ws://127.0.0.1:8765"
# 全局 websocket 引用,用于在其他地方发消息
ws_conn = None
async def heartbeat(ws):
"""定时发送心跳包"""
while True:
try:
await ws.send(json.dumps({"type": "ping"}))
except Exception as e:
print("心跳发送失败:", e)
break
await asyncio.sleep(5) # 心跳间隔
async def listen_messages(ws):
"""监听服务器消息"""
try:
async for message in ws:
data = json.loads(message)
print("收到消息:", data)
except websockets.ConnectionClosed:
print("连接已关闭")
async def send_message(data):
"""在其他地方调用的发消息方法"""
global ws_conn
if ws_conn:
await ws_conn.send(json.dumps(data))
else:
print("未连接服务器,无法发送")
async def main():
global ws_conn
async with websockets.connect(SERVER_URI) as websocket:
ws_conn = websocket
# 并发执行 心跳 和 收消息
await asyncio.gather(
heartbeat(websocket),
listen_messages(websocket)
)
def start_client():
"""启动 WebSocket 客户端"""
asyncio.run(main())
def send_from_other_thread(msg):
"""从其他线程发送消息"""
asyncio.run(send_message({"type": "chat", "text": msg}))
if __name__ == "__main__":
# 启动 WebSocket 客户端(独立线程)
t = threading.Thread(target=start_client, daemon=True)
t.start()
# 等待连接建立
import time
time.sleep(2)
# 从主线程模拟发送消息
send_from_other_thread("Hello from main thread!")
# 防止主线程退出
t.join()