以下是一个基于Python Websockets的客户端程序,能够连接服务端、处理ping/pong、发送订阅请求并保持长连接:
import asyncio
import websockets
import json
import time
import logging
import traceback
from datetime import datetime, timedelta
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("websocket_client.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class WebSocketClient:
def __init__(self, uri, subscribe_message, max_reconnect_attempts=5):
self.uri = uri
self.subscribe_message = subscribe_message
self.max_reconnect_attempts = max_reconnect_attempts
self.reconnect_delay = 5 # 初始重连延迟(秒)
self.last_message_time = time.time()
self.connection_active = False
self.websocket = None
self.message_counter = 0
self.ping_counter = 0
self.pong_counter = 0
async def connect(self):
"""建立WebSocket连接并保持活动状态"""
reconnect_attempts = 0
while True:
try:
logger.info(f"Connecting to {
self.uri}")
async with websockets.connect(
self.uri,
ping_interval=20, # 发送底层ping的间隔
ping_timeout=15, # 等待pong的超时时间
close_timeout=5 # 关闭超时
) as websocket:
self.websocket = websocket
self.connection_active = True
reconnect_attempts = 0
self.reconnect_delay = 5
logger.info("Connection established. Sending subscribe message...")
await self.send_subscribe()
# 启动消息处理任务
await self.handle_messages()
except (websockets.ConnectionClosed, ConnectionRefusedError) as e:
logger.warning(f"Connection closed: {
e}")
self.connection_active = False
except Exception as e:
logger.error(f"Unexpected error: {
str(e)}")
logger.debug(traceback.format_exc())
self.connection_active = False
finally:
# 重连逻辑
if reconnect_attempts