广州网站建设中心,品牌设计包括哪些要素,wordpress主题文件夹,百度风云榜排行榜大家好#xff0c;我是jobleap.cn的小九。
Python 的 websockets 库是基于 asyncio 实现的异步 WebSocket 客户端/服务端框架#xff0c;遵循 RFC 6455 标准#xff0c;支持全双工通信、心跳检测、认证等核心特性#xff0c;是构建实时通信场景#xff08;如聊天室、实时监…大家好我是jobleap.cn的小九。Python 的websockets库是基于asyncio实现的异步 WebSocket 客户端/服务端框架遵循 RFC 6455 标准支持全双工通信、心跳检测、认证等核心特性是构建实时通信场景如聊天室、实时监控、推送服务的首选工具。本文将系统讲解websockets的所有常用 API并通过实战案例串联知识点帮助你掌握异步 WebSocket 开发的全流程。一、环境准备安装依赖websockets仅支持 Python 3.7且依赖asyncioPython 3.7 内置安装命令如下# 安装最新稳定版pipinstallwebsockets# 验证安装python -cimport websockets; print(websockets.__version__)二、核心概念与基础架构WebSocket 是基于 TCP 的全双工通信协议核心特点一次握手持久连接无需反复建立 HTTP 连接服务端/客户端可主动推送消息支持文本、二进制消息传输。websockets核心 API 分为两大模块服务端websockets.serve()创建服务端、websockets.WebSocketServerProtocol连接对象客户端websockets.connect()创建客户端、websockets.WebSocketClientProtocol连接对象通用消息收发send()/recv()、连接关闭close()、状态检测等。三、基础常用 API 详解3.1 最简示例Echo 服务服务端客户端实现“客户端发什么服务端就返回什么”的基础功能覆盖核心 APIserve()、connect()、recv()、send()。3.1.1 服务端代码importasyncioimportwebsockets# 核心处理函数每个客户端连接会触发该函数asyncdefecho_handler(websocket,path): websocket: 客户端连接对象WebSocketServerProtocol 实例 path: 客户端请求的路径如 ws://localhost:8765/chat try:# 循环接收客户端消息全双工通信持续监听asyncformessageinwebsocket:print(f服务端收到{message})# 发送消息给客户端echo 逻辑awaitwebsocket.send(fEcho:{message})finally:print(f客户端{websocket.remote_address}断开连接)# 启动服务端asyncdefstart_echo_server():# 核心 APIwebsockets.serve() 创建服务端# 参数处理函数、绑定地址、端口asyncwithwebsockets.serve(echo_handler,localhost,8765):print(Echo 服务端已启动ws://localhost:8765)# 挂起事件循环保持服务运行awaitasyncio.Future()# 无限运行# 运行服务端if__name____main__:asyncio.run(start_echo_server())3.1.2 客户端代码importasyncioimportwebsocketsasyncdefecho_client():# 核心 APIwebsockets.connect() 连接服务端asyncwithwebsockets.connect(ws://localhost:8765)aswebsocket:# 发送文本消息核心 APIsend()awaitwebsocket.send(Hello WebSocket!)# 接收消息核心 APIrecv()responseawaitwebsocket.recv()print(f客户端收到{response})# 主动发送多条消息foriinrange(2):msgf测试消息{i1}awaitwebsocket.send(msg)resawaitwebsocket.recv()print(f客户端收到{res})# 运行客户端需先启动服务端if__name____main__:asyncio.run(echo_client())3.2 消息类型文本与二进制消息WebSocket 支持文本str和二进制bytes两种消息类型websockets原生支持这两种格式的收发3.2.1 服务端支持二进制消息importasyncioimportwebsocketsasyncdefbinary_handler(websocket,path):asyncformessageinwebsocket:# 判断消息类型ifisinstance(message,str):print(f文本消息{message})awaitwebsocket.send(f文本响应{message})elifisinstance(message,bytes):print(f二进制消息长度{len(message)}{message})# 二进制消息响应比如返回原字节后缀awaitwebsocket.send(messageb_response)asyncdefstart_binary_server():asyncwithwebsockets.serve(binary_handler,localhost,8766):print(二进制消息服务端已启动ws://localhost:8766)awaitasyncio.Future()# asyncio.run(start_binary_server())3.2.2 客户端发送二进制消息importasyncioimportwebsocketsasyncdefbinary_client():asyncwithwebsockets.connect(ws://localhost:8766)aswebsocket:# 发送文本消息awaitwebsocket.send(这是文本消息)text_respawaitwebsocket.recv()print(f文本响应{text_resp})# 发送二进制消息比如图片字节、自定义二进制数据binary_databhello_binaryawaitwebsocket.send(binary_data)binary_respawaitwebsocket.recv()print(f二进制响应{binary_resp})# asyncio.run(binary_client())3.3 连接管理关闭连接与状态检测websockets提供连接状态检测、主动关闭连接的 API核心属性/方法websocket.closed判断连接是否关闭布尔值websocket.close(code1000, reason)主动关闭连接code 遵循 WebSocket 标准码websocket.close_code/websocket.close_reason获取关闭码/原因。示例主动关闭连接与状态检测importasyncioimportwebsocketsasyncdefclose_handler(websocket,path):# 接收客户端消息触发关闭逻辑messageawaitwebsocket.recv()ifmessageclose:# 主动关闭连接标准关闭码 1000正常关闭awaitwebsocket.close(code1000,reason客户端请求关闭)print(f连接已关闭码{websocket.close_code}原因{websocket.close_reason})else:awaitwebsocket.send(f收到{message})asyncdefstart_close_server():asyncwithwebsockets.serve(close_handler,localhost,8767):print(关闭连接测试服务端已启动ws://localhost:8767)awaitasyncio.Future()asyncdefclose_client():asyncwithwebsockets.connect(ws://localhost:8767)aswebsocket:# 检测初始状态print(f初始连接状态{关闭ifwebsocket.closedelse开启})# 发送关闭指令awaitwebsocket.send(close)# 等待连接关闭可选awaitasyncio.sleep(0.1)print(f发送关闭指令后状态{关闭ifwebsocket.closedelse开启})# 先启动服务端再运行客户端# asyncio.run(start_close_server())# asyncio.run(close_client())3.4 异常处理捕获 WebSocket 相关异常websockets定义了多种异常类型核心异常包括websockets.exceptions.ConnectionClosed连接已关闭websockets.exceptions.ConnectionClosedError连接异常关闭websockets.exceptions.InvalidURI无效的 WebSocket 地址websockets.exceptions.TimeoutError连接/收发超时。示例完整异常处理importasyncioimportwebsocketsfromwebsockets.exceptionsimport(ConnectionClosed,InvalidURI,TimeoutError)asyncdefexception_handler(websocket,path):try:asyncformessageinwebsocket:print(f收到消息{message})awaitwebsocket.send(f响应{message})exceptConnectionClosedase:print(f连接关闭异常码{e.code}原因{e.reason})exceptExceptionase:print(f未知异常{e})asyncdefstart_exception_server():asyncwithwebsockets.serve(exception_handler,localhost,8768):print(异常处理服务端已启动ws://localhost:8768)awaitasyncio.Future()asyncdefexception_client():try:# 测试1无效 URI故意写错# async with websockets.connect(ws://localhost:8768/invalid) as websocket:# 测试2正常连接后强制断开asyncwithwebsockets.connect(ws://localhost:8768,timeout5)aswebsocket:awaitwebsocket.send(测试消息)# 模拟服务端断开后继续发送awaitasyncio.sleep(1)awaitwebsocket.send(断开后发送的消息)# 触发 ConnectionClosedexceptInvalidURIase:print(f错误无效的 URI -{e})exceptTimeoutErrorase:print(f错误连接超时 -{e})exceptConnectionClosedase:print(f错误连接已关闭 -{e.code}|{e.reason})exceptExceptionase:print(f通用错误{e})# asyncio.run(start_exception_server())# asyncio.run(exception_client())3.5 进阶 API心跳检测防止连接断开WebSocket 长连接可能因网络闲置被网关/服务器断开心跳检测是核心解决方案客户端/服务端定期发送心跳包如ping对方回复pong确认连接存活。websockets内置ping()/pong()方法也可自定义心跳逻辑示例带心跳检测的客户端服务端importasyncioimportwebsocketsfromdatetimeimportdatetime# ---------------------- 服务端 ----------------------asyncdefheartbeat_server_handler(websocket,path):# 启动心跳检测任务后台运行heartbeat_taskasyncio.create_task(server_heartbeat(websocket))try:asyncformessageinwebsocket:ifmessageping:# 响应心跳包awaitwebsocket.send(pong)print(f[{datetime.now()}] 服务端收到心跳回复 pong)else:print(f[{datetime.now()}] 服务端收到消息{message})awaitwebsocket.send(f响应{message})finally:# 取消心跳任务heartbeat_task.cancel()print(f[{datetime.now()}] 客户端断开心跳任务已取消)asyncdefserver_heartbeat(websocket):服务端心跳检测定期检查连接状态whileTrue:awaitasyncio.sleep(10)# 每10秒检测一次ifwebsocket.closed:break# 主动发送 ping可选也可等客户端发awaitwebsocket.ping()print(f[{datetime.now()}] 服务端发送 ping 检测)asyncdefstart_heartbeat_server():asyncwithwebsockets.serve(heartbeat_server_handler,localhost,8769):print(心跳检测服务端已启动ws://localhost:8769)awaitasyncio.Future()# ---------------------- 客户端 ----------------------asyncdefclient_heartbeat(websocket):客户端心跳任务每5秒发送一次 pingwhileTrue:awaitasyncio.sleep(5)ifwebsocket.closed:breakawaitwebsocket.send(ping)# 等待 pong 响应超时则断开try:respawaitasyncio.wait_for(websocket.recv(),timeout3)ifresppong:print(f[{datetime.now()}] 客户端收到 pong连接正常)else:raiseException(非心跳响应)exceptasyncio.TimeoutError:print(f[{datetime.now()}] 心跳超时关闭连接)awaitwebsocket.close()breakasyncdefheartbeat_client():try:asyncwithwebsockets.connect(ws://localhost:8769)aswebsocket:# 启动客户端心跳任务heartbeat_taskasyncio.create_task(client_heartbeat(websocket))# 发送业务消息awaitwebsocket.send(业务消息1)resp1awaitwebsocket.recv()print(f[{datetime.now()}] 客户端收到{resp1})# 等待心跳运行模拟长连接awaitasyncio.sleep(20)# 取消心跳任务heartbeat_task.cancel()exceptExceptionase:print(f客户端异常{e})# 运行先启动服务端再启动客户端# asyncio.run(start_heartbeat_server())# asyncio.run(heartbeat_client())3.6 进阶 API连接认证Token/用户名密码WebSocket 连接建立时可通过请求头或路径参数做身份认证websockets支持在连接握手阶段验证示例基于 Token 的认证importasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection# 模拟合法 Token 列表VALID_TOKENS{token_123,token_456}# ---------------------- 认证中间件 ----------------------asyncdefauth_middleware(websocket,path): 连接握手阶段认证 - 从请求头获取 Token - 验证失败则拒绝连接RejectConnection # 获取请求头中的 Tokenwebsocket.request_headers 是类字典对象tokenwebsocket.request_headers.get(X-Auth-Token)ifnottokenortokennotinVALID_TOKENS:# 拒绝连接标准 401 状态码raiseRejectConnection(401,Invalid or missing token)# 认证通过进入业务处理awaitauth_handler(websocket,path)# ---------------------- 业务处理 ----------------------asyncdefauth_handler(websocket,path):print(f客户端{websocket.remote_address}认证通过开始通信)asyncformessageinwebsocket:awaitwebsocket.send(f认证通过的响应{message})# ---------------------- 启动认证服务端 ----------------------asyncdefstart_auth_server():asyncwithwebsockets.serve(auth_middleware,localhost,8770):print(认证服务端已启动ws://localhost:8770)awaitasyncio.Future()# ---------------------- 认证客户端 ----------------------asyncdefauth_client(valid_token:boolTrue):# 构造请求头携带 Tokenheaders{}ifvalid_token:headers[X-Auth-Token]token_123# 合法 Tokenelse:headers[X-Auth-Token]invalid_token# 非法 Tokentry:asyncwithwebsockets.connect(ws://localhost:8770,extra_headersheaders# 自定义请求头)aswebsocket:awaitwebsocket.send(认证后的业务消息)respawaitwebsocket.recv()print(f客户端收到{resp})exceptwebsockets.exceptions.ConnectionRefusedErrorase:print(f连接被拒绝{e})# 运行# 1. 启动服务端asyncio.run(start_auth_server())# 2. 合法 Token 客户端asyncio.run(auth_client(valid_tokenTrue))# 3. 非法 Token 客户端asyncio.run(auth_client(valid_tokenFalse))3.7 进阶 API服务端广播多客户端通信WebSocket 最典型的场景是多客户端实时通信如聊天室核心是维护客户端连接池实现消息广播示例简易聊天室广播多客户端importasyncioimportwebsocketsfromtypingimportSet# 维护所有活跃的客户端连接active_connections:Set[websockets.WebSocketServerProtocol]set()asyncdefchat_handler(websocket,path):# 1. 新客户端连接加入连接池active_connections.add(websocket)client_addrwebsocket.remote_addressprint(f[{client_addr}] 加入聊天室当前在线{len(active_connections)})try:# 2. 接收客户端消息并广播asyncformessageinwebsocket:broadcast_msgf[{client_addr}]{message}print(f广播消息{broadcast_msg})# 遍历所有客户端发送广播消息forconninactive_connections:ifconn!websocket:# 不发给自己awaitconn.send(broadcast_msg)exceptwebsockets.exceptions.ConnectionClosed:print(f[{client_addr}] 异常断开)finally:# 3. 客户端断开从连接池移除active_connections.remove(websocket)print(f[{client_addr}] 离开聊天室当前在线{len(active_connections)})asyncdefstart_chat_server():asyncwithwebsockets.serve(chat_handler,localhost,8771):print(聊天室服务端已启动ws://localhost:8771)awaitasyncio.Future()# ---------------------- 聊天室客户端 ----------------------asyncdefchat_client(username:str):asyncwithwebsockets.connect(ws://localhost:8771)aswebsocket:# 启动接收消息的任务后台运行实时接收广播recv_taskasyncio.create_task(recv_chat_msg(websocket))# 控制台输入消息并发送whileTrue:msginput(f{username} )ifmsg.lower()exit:breakawaitwebsocket.send(f{username}{msg})# 取消接收任务recv_task.cancel()awaitwebsocket.close()asyncdefrecv_chat_msg(websocket):后台接收广播消息whileTrue:try:msgawaitwebsocket.recv()print(f\n收到广播{msg})print( ,end,flushTrue)# 恢复输入提示符exceptwebsockets.exceptions.ConnectionClosed:break# 运行# 1. 启动聊天室服务端asyncio.run(start_chat_server())# 2. 启动多个客户端不同终端运行# asyncio.run(chat_client(用户1))# asyncio.run(chat_client(用户2))四、实战案例串联所有常用 API 的实时监控系统场景需求实现一个简易的实时监控系统服务端认证客户端Token 验证接收客户端上报的监控数据二进制/文本广播监控数据给所有在线客户端心跳检测清理离线客户端完整异常处理。客户端携带 Token 连接服务端定期上报监控数据CPU/内存使用率模拟实时接收服务端广播的所有客户端监控数据心跳检测断连自动重连。完整代码实现4.1 服务端代码monitor_server.pyimportasyncioimportwebsocketsfromwebsockets.exceptionsimportRejectConnection,ConnectionClosedfromtypingimportSet,Dictfromdatetimeimportdatetime# 配置VALID_TOKENS{monitor_token_2025}# 合法认证 TokenSERVER_HOSTlocalhostSERVER_PORT8772HEARTBEAT_INTERVAL10# 服务端心跳检测间隔秒CLIENT_TIMEOUT30# 客户端超时时间秒# 客户端连接池{连接对象: {addr: 地址, last_heartbeat: 最后心跳时间}}client_pool:Dict[websockets.WebSocketServerProtocol,dict]{}# ---------------------- 认证逻辑 ----------------------asyncdefauth_middleware(websocket,path):tokenwebsocket.request_headers.get(X-Monitor-Token)ifnottokenortokennotinVALID_TOKENS:raiseRejectConnection(401,Invalid Monitor Token)# 认证通过记录客户端信息client_pool[websocket]{addr:websocket.remote_address,last_heartbeat:datetime.now()}print(f[{datetime.now()}] 客户端{websocket.remote_address}认证通过加入监控)# 进入业务处理awaitmonitor_handler(websocket,path)# ---------------------- 心跳检测任务 ----------------------asyncdefserver_heartbeat_check():后台定时清理离线客户端whileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)nowdatetime.now()# 遍历连接池检测超时客户端forconn,infoinlist(client_pool.items()):if(now-info[last_heartbeat]).total_seconds()CLIENT_TIMEOUT:print(f[{datetime.now()}] 客户端{info[addr]}心跳超时强制断开)awaitconn.close(code1001,reasonHeartbeat timeout)delclient_pool[conn]# ---------------------- 业务处理 ----------------------asyncdefmonitor_handler(websocket,path):try:asyncformessageinwebsocket:# 更新最后心跳时间无论消息类型视为存活client_pool[websocket][last_heartbeat]datetime.now()# 处理监控数据ifisinstance(message,str):# 文本消息心跳/控制指令ifmessageping:awaitwebsocket.send(pong)print(f[{datetime.now()}] 客户端{websocket.remote_address}心跳响应)else:print(f[{datetime.now()}] 客户端{websocket.remote_address}文本监控数据{message})elifisinstance(message,bytes):# 二进制消息模拟监控数据如序列化的指标print(f[{datetime.now()}] 客户端{websocket.remote_address}二进制监控数据长度{len(message)})# 广播监控数据给所有客户端broadcast_msgf[{websocket.remote_address}]{message[:50]}...# 截断长消息forconninclient_pool:ifconn!websocket:awaitconn.send(broadcast_msg)exceptConnectionClosedase:print(f[{datetime.now()}] 客户端{websocket.remote_address}断开码{e.code}原因{e.reason})finally:# 清理连接池ifwebsocketinclient_pool:delclient_pool[websocket]print(f[{datetime.now()}] 客户端{websocket.remote_address}已移除当前在线{len(client_pool)})# ---------------------- 启动服务端 ----------------------asyncdefstart_monitor_server():# 启动心跳检测后台任务asyncio.create_task(server_heartbeat_check())# 启动 WebSocket 服务asyncwithwebsockets.serve(auth_middleware,SERVER_HOST,SERVER_PORT):print(f[{datetime.now()}] 监控服务端已启动ws://{SERVER_HOST}:{SERVER_PORT})awaitasyncio.Future()if__name____main__:asyncio.run(start_monitor_server())4.2 客户端代码monitor_client.pyimportasyncioimportwebsocketsfromwebsockets.exceptionsimportConnectionRefusedError,ConnectionClosedimportrandomfromdatetimeimportdatetimeimporttime# 配置SERVER_URLws://localhost:8772AUTH_TOKENmonitor_token_2025# 合法 TokenREPORT_INTERVAL5# 监控数据上报间隔秒HEARTBEAT_INTERVAL8# 客户端心跳间隔秒RECONNECT_INTERVAL3# 断连重连间隔秒# ---------------------- 心跳任务 ----------------------asyncdefclient_heartbeat(websocket):客户端心跳定期发送 pingwhileTrue:awaitasyncio.sleep(HEARTBEAT_INTERVAL)ifwebsocket.closed:breaktry:awaitwebsocket.send(ping)respawaitasyncio.wait_for(websocket.recv(),timeout3)ifresppong:print(f[{datetime.now()}] 心跳正常)else:raiseException(非 pong 响应)exceptasyncio.TimeoutError:print(f[{datetime.now()}] 心跳超时准备重连)awaitwebsocket.close()breakexceptExceptionase:print(f[{datetime.now()}] 心跳异常{e})break# ---------------------- 接收广播任务 ----------------------asyncdefrecv_broadcast(websocket):接收服务端广播的监控数据whileTrue:try:msgawaitwebsocket.recv()print(f\n[{datetime.now()}] 收到监控广播{msg})exceptConnectionClosed:break# ---------------------- 上报监控数据 ----------------------asyncdefreport_monitor_data(websocket):模拟上报 CPU/内存使用率文本二进制whileTrue:awaitasyncio.sleep(REPORT_INTERVAL)ifwebsocket.closed:break# 模拟监控数据cpu_usagerandom.uniform(10.0,80.0)mem_usagerandom.uniform(20.0,90.0)# 1. 发送文本监控数据text_datafCPU:{cpu_usage:.2f}%, MEM:{mem_usage:.2f}%awaitwebsocket.send(text_data)print(f[{datetime.now()}] 上报文本监控数据{text_data})# 2. 发送二进制监控数据模拟序列化的指标binary_dataf{cpu_usage:.2f}|{mem_usage:.2f}.encode(utf-8)awaitwebsocket.send(binary_data)print(f[{datetime.now()}] 上报二进制监控数据{binary_data})# ---------------------- 客户端主逻辑支持重连 ----------------------asyncdefmonitor_client():whileTrue:try:# 建立连接携带认证 Tokenasyncwithwebsockets.connect(SERVER_URL,extra_headers{X-Monitor-Token:AUTH_TOKEN})aswebsocket:print(f[{datetime.now()}] 成功连接监控服务端)# 启动后台任务heartbeat_taskasyncio.create_task(client_heartbeat(websocket))broadcast_taskasyncio.create_task(recv_broadcast(websocket))report_taskasyncio.create_task(report_monitor_data(websocket))# 等待任务完成或连接断开awaitasyncio.gather(heartbeat_task,broadcast_task,report_task)exceptConnectionRefusedError:print(f[{datetime.now()}] 连接被拒绝服务端未启动/Token 错误)exceptConnectionClosed:print(f[{datetime.now()}] 连接断开)exceptExceptionase:print(f[{datetime.now()}] 客户端异常{e})# 断连后重连print(f[{datetime.now()}]{RECONNECT_INTERVAL}秒后尝试重连...)awaitasyncio.sleep(RECONNECT_INTERVAL)if__name____main__:asyncio.run(monitor_client())运行说明启动服务端python monitor_server.py启动多个客户端不同终端python monitor_client.py观察效果客户端自动认证连接定期上报文本/二进制监控数据服务端广播数据给所有客户端心跳检测超时自动清理客户端客户端断连后自动重连。五、核心 API 总结与最佳实践5.1 核心 API 速查表类别API 名称/方法作用服务端websockets.serve(handler, host, port)创建 WebSocket 服务端客户端websockets.connect(uri, **kwargs)连接 WebSocket 服务端消息收发await websocket.send(msg)发送文本/二进制消息await websocket.recv()接收消息单次async for msg in websocket持续监听消息推荐连接管理websocket.closed判断连接是否关闭await websocket.close(code, reason)主动关闭连接code 遵循 WebSocket 标准码websocket.request_headers获取客户端请求头服务端认证RejectConnection(status_code, reason)服务端拒绝连接认证失败心跳await websocket.ping()/pong()内置 ping/pong 心跳底层异常ConnectionClosed连接关闭异常InvalidURI无效 URI 异常TimeoutError连接/收发超时异常5.2 最佳实践连接池管理服务端维护客户端连接池时使用set/dict存储连接并在finally块中清理避免内存泄漏心跳必加长连接场景必须实现心跳检测防止闲置连接被网关/服务器断开异常全覆盖至少捕获ConnectionClosed、ConnectionRefusedError、TimeoutError等核心异常重连机制客户端实现断连自动重连提升鲁棒性并发控制服务端广播时若客户端数量多使用asyncio.Semaphore限制并发发送避免事件循环阻塞认证安全认证 Token 避免明文传输生产环境建议使用 WSSWebSocket Secure即wss://消息大小限制生产环境可通过max_size参数限制消息大小防止超大消息攻击# 服务端限制单条消息最大 1MBwebsockets.serve(handler,host,port,max_size1024*1024)# 客户端限制单条消息最大 1MBwebsockets.connect(uri,max_size1024*1024)六、总结本文覆盖了websockets库的所有核心常用 API基础服务端创建、客户端连接、文本/二进制消息收发进阶连接管理、异常处理、心跳检测、身份认证、服务端广播实战通过实时监控系统串联所有 API实现认证、心跳、数据上报、广播、重连等完整功能。掌握这些知识点后你可以轻松构建各类实时通信场景聊天室、实时监控、推送服务、在线游戏等同时遵循最佳实践确保系统的稳定性和安全性。