目录
python tcp 框架 asyncio
websockets
python tcp 框架 asyncio
import asyncio
import json
import timeclass TCPClient:def __init__(self, host, port, heartbeat_interval=10):self.host = hostself.port = portself.heartbeat_interval = heartbeat_intervalself.reader = Noneself.writer = Noneself.connected = Falseself.last_recv_time = time.time()async def connect(self):while True:try:print(f"正在连接 {self.host}:{self.port} ...")self.reader, self.writer = await asyncio.open_connection(self.host, self.port)self.connected = Trueprint("✅ 已连接服务器")asyncio.create_task(self.send_heartbeat())asyncio.create_task(self.receive_loop())breakexcept Exception as e:print(f"连接失败: {e},3秒后重试")await asyncio.sleep(3)async def send_heartbeat(self):while self.connected:try:await self.send({"type": "heartbeat"})await asyncio.sleep(self.heartbeat_interval)except Exception as e:print(f"心跳发送失败: {e}")self.connected = Falsebreakasync def receive_loop(self):try:while self.connected:data = await self.reader.readline()if not data:print("⚠ 服务器断开连接")self.connected = Falsebreakself.last_recv_time = time.time()try:msg = json.loads(data.decode())self.on_message(msg)except json.JSONDecodeError:print(f"收到非JSON数据: {data}")except Exception as e:print(f"接收出错: {e}")finally:self.connected = Falseawait self.connect() # 自动重连def on_message(self, msg):"""收到消息时触发(你可以改成事件回调)"""print(f"📩 收到消息: {msg}")async def send(self, obj):if self.writer and not self.writer.is_closing():line = json.dumps(obj) + "\n"self.writer.write(line.encode())await self.writer.drain()else:print("❌ 未连接,无法发送")# === 创建全局客户端实例 ===
client = TCPClient("127.0.0.1", 8888)async def main():await client.connect()# === 在任意地方调用发送 ===
async def send_message():await client.send({"type": "chat", "msg": "Hello Server"})if __name__ == "__main__":loop = asyncio.get_event_loop()loop.create_task(main())# 模拟3秒后在别的地方发消息loop.call_later(3, lambda: asyncio.create_task(send_message()))loop.run_forever()
websockets
安装依赖
pip install websockets
示例代码 python
编辑
import asyncio
import websockets
import json
import threadingSERVER_URI = "ws://127.0.0.1:8765"# 全局 websocket 引用,用于在其他地方发消息
ws_conn = Noneasync def heartbeat(ws):"""定时发送心跳包"""while True:try:await ws.send(json.dumps({"type": "ping"}))except Exception as e:print("心跳发送失败:", e)breakawait asyncio.sleep(5) # 心跳间隔async def listen_messages(ws):"""监听服务器消息"""try:async for message in ws:data = json.loads(message)print("收到消息:", data)except websockets.ConnectionClosed:print("连接已关闭")async def send_message(data):"""在其他地方调用的发消息方法"""global ws_connif ws_conn:await ws_conn.send(json.dumps(data))else:print("未连接服务器,无法发送")async def main():global ws_connasync with websockets.connect(SERVER_URI) as websocket:ws_conn = websocket# 并发执行 心跳 和 收消息await asyncio.gather(heartbeat(websocket),listen_messages(websocket))def start_client():"""启动 WebSocket 客户端"""asyncio.run(main())def send_from_other_thread(msg):"""从其他线程发送消息"""asyncio.run(send_message({"type": "chat", "text": msg}))if __name__ == "__main__":# 启动 WebSocket 客户端(独立线程)t = threading.Thread(target=start_client, daemon=True)t.start()# 等待连接建立import timetime.sleep(2)# 从主线程模拟发送消息send_from_other_thread("Hello from main thread!")# 防止主线程退出t.join()