server.py
import asyncio
import time

import websockets


async def echo(websocket, path=None):
    """Send the current local time to the client every 2 seconds.

    Args:
        websocket: The server-side connection to the client.
        path: Request path. Optional, because older websockets releases
            pass it as a second positional argument while newer releases
            call the handler with the connection only.

    The loop runs until the connection is closed; a closed connection is
    treated as the normal way for this handler to finish.
    """
    try:
        while True:
            now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            await websocket.send(now)
            await asyncio.sleep(2)
    except websockets.ConnectionClosed:
        # The client disconnected; nothing left to do for this connection.
        pass


async def main():
    """Start the WebSocket server and keep serving until interrupted."""
    # Change the host/port to your own address.
    async with websockets.serve(echo, '127.0.0.1', 5000):
        # A bare Future never completes, so the server stays up until the
        # process is interrupted (e.g. Ctrl+C).
        await asyncio.Future()


if __name__ == '__main__':
    asyncio.run(main())
client.py
import asyncio

import websockets


async def hello(uri):
    """Connect to the WebSocket server at ``uri`` and print each message.

    Args:
        uri: WebSocket URI of the server, e.g. ``ws://127.0.0.1:5000``.

    Returns when the server closes the connection normally. An abnormal
    closure still propagates as an exception from the iteration.
    """
    async with websockets.connect(uri) as websocket:
        # Iterating the connection yields every incoming message and stops
        # when the connection is closed normally, so no explicit recv()
        # loop or close handling is needed.
        async for recv_text in websocket:
            print(recv_text)


if __name__ == '__main__':
    # Change the URI to your own address.
    asyncio.run(hello('ws://127.0.0.1:5000'))
- pip 安装
websockets
- 先启动
server.py
,再启动client.py
。 - 运行结果:
在网页上显示。client.html
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <title>WebSocket demo</title>
  </head>
  <body>
    <script>
      // Open the connection to the demo server and create the list that
      // will hold one <li> per received message.
      var ws = new WebSocket("ws://127.0.0.1:5000/"),
          messages = document.createElement('ul');

      // Append every incoming message to the list created above. The list
      // element is reused directly instead of looking up the first <ul> in
      // the document, so other lists on the page cannot be picked by mistake.
      ws.onmessage = function (event) {
        var message = document.createElement('li'),
            content = document.createTextNode(event.data);
        message.appendChild(content);
        messages.appendChild(message);
      };

      document.body.appendChild(messages);
    </script>
  </body>
</html>
启动server.py
后再打开页面,运行结果:
参考:
https://websockets.readthedocs.io/