server.py
import asyncio
import websockets
import time
"""每隔 2s 给客户端发送一条音讯"""
async def echo(websocket, path):
while True:
now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
await websocket.send(now)
await asyncio.sleep(2)
if __name__ == '__main__':
start_server = websockets.serve(echo,'127.0.0.1',5000) # 改为你本人的地址
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
client.py
import asyncio
import websockets
async def hello(uri):
async with websockets.connect(uri) as websocket:
while True:
recv_text = await websocket.recv() # 接管音讯
print(recv_text)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(hello('ws://127.0.0.1:5000')) # 改为你本人的地址
- pip 装置
websockets
- 先启动
server.py
,再启动client.py
。 - 成果:
在网页上展现。client.html
<!DOCTYPE html>
<html>
<head>
<title>WebSocket demo</title>
</head>
<body>
<script>
var ws = new WebSocket("ws://127.0.0.1:5000/"),
messages = document.createElement('ul');
ws.onmessage = function (event) {var messages = document.getElementsByTagName('ul')[0],
message = document.createElement('li'),
content = document.createTextNode(event.data);
message.appendChild(content);
messages.appendChild(message);
};
document.body.appendChild(messages);
</script>
</body>
</html>
启动 server.py
后再点开页面,成果:
参考:
https://websockets.readthedoc…