介绍了一些异步IO的基本概念与 Select 即回调函数的使用。
一些基本概念
在学习协程之前,建议先复习一下 socket 编程和多线程的知识。
并发与并行
并发:一个时间段、有几个程序在同一个 CPU 上运行,任意时刻只有一个程序在CPU上运行。
并行:任意时刻,有多个程序运行在多个 CPU 上。
同步和异步
是一种消息通信机制,把操作看成消息在不同线程、协程中发送,然后得到 Future进行后续操作。
同步:代码调用IO操作时,必须等待IO操作完成菜返回的调用方式。
异步:代码调用IO操作时,不必等操作完成就返回的调用方式。
阻塞和非阻塞
是一种函数调用的机制。
阻塞:调用函数是当前线程被挂起。
非阻塞:调用函数时,当前线程不会被挂起,而是立即返回。
什么是 C10K 问题?
C10K,一个1999 年提出来的技术挑战,即我们如何在1颗 1GHz CPU,2G 内存,1gbps 网络环境下,让单台服务器同时为1万个客户端提供FTP服务。
IO 多路复用
Unix 下的五种 I/O 模型:
- 阻塞 I/O
- 非阻塞 I/O
- 多路复用 I/O
- 信号驱动是 I/O
- 异步I/O (POSIX 的 aio_系列函数)
以 socket 中的连接建立为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, 80))
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.setblocking(False) client.connect((host, 80))
while (client connected): pass client.send("")
|
select,poll,epoll 都是 I/0 多路复用的机制。I/0 多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
但select,poll, epoll 本质上都是同步 I/0,因为他们都需要在读写事件就绪后自己负责 进行读写,也就是说这个读写过程是阻塞的,而异步V/0则无需自己负责进行读写,异步IO的实现会负责把数据从内核拷贝到用户空间。
Select
select 函数监视的文件描述符分3类,分别是 writefds、readfds, 和 exceptfds。
调用后 select 函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指指定等待时间,如果立即返回设为null即可),函数返回。
当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
select的一个缺点在于,单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
Poll
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现。
polfd结构包含了要监视的event和发生的event,不再使用select "参数-值" 传递的方式。
同时,pollfd并没有最大数量限制( 但是数量过大后性能也是会下降)。和select西数一样,pol返回后需要轮询polfd来获取就绪的描述符。从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取 已经就绪的socket。
事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态 ,因此随着监视的描述符数量的增长,其效率也会线性下降
Epoll
epoll是在2.6内核中提出的,是之前的select和pol的增强版本。
相对于select和poll来说,epol更加灵活,没有描述符限制。epoll使用-一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
epoll 实现使用了红黑树。
对比
在并发高的情况下,连接活跃度不是很高,epoll 比 selelct 好;
并发不高,连接很活跃的时候,select 比 epoll 好;
实例:非阻塞I/O
先看一段使用非阻塞IO完成 socket 请求与接受的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| import socket from urllib.parse import urlparse
def get_url(url): url = urlparse(url) host = url.netloc path = url.path if path == "": path = "/"
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.setblocking(False) try: client.connect((host, 80)) except BlockingIOError as e: pass
send_to = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8") while True: try: client.send(send_to) break except OSError as e: pass
data = b"" while True: try: d = client.recv(1024) except BlockingIOError as e: continue if d: data += d else: break
data = data.decode("utf8") html_data = data.split("\r\n\r\n")[1] print(html_data) client.close()
if __name__ == "__main__": get_url("http://www.baidu.com")
|
实例:Select 回调
下面这个是使用 Select 的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| import socket from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
urls = [] stop = False
class Fetcher: def connected(self, key): selector.unregister(key.fd) self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8")) selector.register(self.client.fileno(), EVENT_READ, self.readable)
def readable(self, key): d = self.client.recv(1024) if d: self.data += d else: selector.unregister(key.fd) data = self.data.decode("utf8") html_data = data.split("\r\n\r\n")[1] print(html_data) self.client.close() urls.remove(self.spider_url) if not urls: global stop stop = True
def get_url(self, url): self.spider_url = url url = urlparse(url) self.host = url.netloc self.path = url.path self.data = b"" if self.path == "": self.path = "/"
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False)
try: self.client.connect((self.host, 80)) except BlockingIOError as e: pass
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
def loop(): while not stop: ready = selector.select() for key, mask in ready: call_back = key.data call_back(key)
if __name__ == "__main__": fetcher = Fetcher() import time start_time = time.time() for url in range(20): url = "http://shop.projectsedu.com/goods/{}/".format(url) urls.append(url) fetcher = Fetcher() fetcher.get_url(url) loop() print(time.time()-start_time)
|
回调的问题
使用回调虽然可以带来效率上的提升,但是也会有一些问题,包括:
- 回调函数执行不正常怎么办?
- 回调函数里还要嵌套回调怎么办?嵌套多层怎么办?
- 多层嵌套中,某个环节出错了怎么办?
- 有个数据,需要每个回调函数都处理怎么办?
- 怎么使用当前函数中的局部变量?
归纳来看,可以说回调的问题在于:
- 代码可读性变差
- 共享状态的管理困难
- 处理异常比较麻烦
要处理这个问题,这时候就轮到协程出场了。