Python异步Asyncio的使用及websocket的实现

本文最后更新于:2022年5月16日 晚上

Asyncio是什么?

asyncio是Python的一个标准库,通过该模块,可以在Python实现异步编程和协程。

为什么要用协程?

程序执行过程中,线程的切换需要频繁的保存、加载上下文,产生消耗。而协程的切换是由程序主动切换的,相对的消耗较小。

如何使用协程?

1
2
3
4
5
6
7
8
import asyncio

async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')

asyncio.run(main())

这里参考Python官网提供的一段代码,
可以看到,一段基本的协程代码由三部分构成

1.导入asyncio模块

2.声明一段异步函数

和普通函数的区别是在’def’前面加上’async’。

3.执行异步函数

异步函数不能直接调用,这里分两种情况。

  1. 在异步函数内调用异步函数

    这种情况只需要在异步函数前加上’await’就可以直接调用,如

    1
    await asyncio.sleep(1)
  2. 在默认流程(同步)里调用异步函数

    需要使用asyncio模块调用,如

    1
    asyncio.run(main())

当然,asyncio也提供其他不同的函数以支持在同步、异步函数内调用异步函数。

实现websocket通信

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import websockets


async def handler(websocket):
while True:
message = await websocket.recv()
await websocket.send(message)


async def main():
async with websockets.serve(handler, "", 88):
await asyncio.Future()


if __name__ == "__main__":
asyncio.run(main())

这是一个基础的websocket服务端代码,通过websockets模块创建serve对象,并且绑定端口和处理函数。无论服务端接受到什么,都会向客户的返回一个相同的内容。

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import websockets


async def connect():
async with websockets.connect("ws://127.0.0.1:88") as ws:
while True:
data = input("")
await ws.send(data)
resp = await ws.recv()


if __name__ == "__main__":
asyncio.run(connect())

这是一个基础的websocket客户代码,在连接server成功后可以进行发送,接受消息。网上很多的教程都是用的类似流程的代码,然而这段代码有着一个较大的问题。

如果服务端不返回的话,代码会一直在’ws.recv()’这一行堵塞,用户无法继续发送下一条消息。想象这是一个聊天室,你在发送一条消息后,必须等待好友回复后才能发送下一条消息,这合理吗?这不合理

那要如何解决呢?有两种方法

  1. 多线程

    将发送和接受函数分别放入不同的线程中,就可以避免阻塞导致无法进入下一步流程

  2. 使用asyncio的task

    如果使用多线程的话,一个对话就需要两个线程,100个对话就需要200个线程,这对于性能的影响是致命的。不过还好,有了task,我们可以以一种很优雅的方式去处理这个问题

asyncio task 是什么?

1
await asyncio.sleep(5)

这是一行调用异步函数的代码,在这行代码里,执行与等待在同一位置。只有当这个异步函数执行完成后,阻塞才会消失

1
2
3
task = asyncio.create_task(asyncio.sleep(5))
print("hello")
await task

这段代码通过创建task的方式调用异步函数,在第一行执行时,异步函数就会启动。不同于await方式,task的任务启动后并不会阻塞,而是继续往下执行,也就是打印’hello’。直到碰到该任务对应的await才会阻塞。

优化client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import websockets

async def recv(ws):
async for message in ws:
print(message)


async def send(ws):
while True:
data = input("")
await ws.send(data)


async def connect():
async with websockets.connect("ws://127.0.0.1:88") as ws:
while True:
send_task = asyncio.create_task(send(ws))
recv_task = asyncio.create_task(recv(ws))

await send_task
await recv_task


if __name__ == "__main__":
asyncio.run(connect())

通过这一特性,我们可以使用task对client的代码进行优化。

首先,我们分别对发送和接受创建两个异步函数。再使用task的方式启动函数,并且配合await保持异步函数能够一直运行。这样我们就实现了任意的发送和接受消息。

REF

  1. Python.org. asyncio异步 I/O. https://docs.python.org/zh-cn/3/library/asyncio.html
  2. Websockets. https://websockets.readthedocs.io/en/stable/intro/index.html