Skip to content

I/O模型

同步/异步

同步与异步是一种通信机制,涉及到调用方和被调用方(针对应用程序与内核而言)。

同步过程中,进程触发IO操作并等待(阻塞)或者轮询的(非阻塞)去查看IO操作是否完成;异步过程中,进程触发IO操作以后,直接返回,做自己的事情,IO交给内核来处理,完成后内核通知进程IO完成。

同步和异步关注的是程序之间的协作关系。同步分为阻塞和非阻塞,异步则只有非阻塞。

阻塞和非阻塞是一种调用机制,只涉及到调用方(针对单个进程的执行状态),关注的是IO操作的执行。调用方等待IO操作完成后返回则为阻塞;调用方无需等待IO操作完成便返回则为非阻塞,在非阻塞的情况下,调用方常常需要主动去check,获得IO的操作结果。

同步阻塞式 I/O

阻塞是指当前发起IO操作的进程被阻塞。同步阻塞IO是指当进程调用某些IO操作的系统调用或者库函数时,比如accept(),send(),revc()等,进程会暂停下来,等待IO操作结束后再继续运行。

这种IO模型比较简单,可以和多进程结合起来有效的利用CPU资源,但这样的代价是需要多进程的内存开销。

图中,进程调用 recvfrom,其系统调用直到数据报到达且被复制到应用进程的缓冲区或者发生错误才返回。进程从调用 recvfrom 开始到它返回的整段时间内是被阻塞的。recvfrom 成功返回后,应用进程开始处理数据报

同步非阻塞式 I/O

同步阻塞IO中,进程的等待时间可能包括两个部分,一个是等待数据就绪,比如等待数据可以读和可以写;另一个是等待数据的复制,当数据准备好后,读写操作的耗时。

同步非阻塞IO的调用的区别在于,不会去等待数据的就绪,如果数据不可读或者不可写,相关系统调用会立即告诉进程(立即返回)。 比如使用非阻塞recv()接受网络数据后,函数就及时返回,告诉进程没有数据可读了。

其好处是如果结合反复轮询来尝试数据是否就绪,那么在一个进程里可以同时处理多个IO操作。问题在于需要进程来轮询查看数据是否就绪,进程处于忙碌等待状态。

图中,前三次调用 recvfrom 时没有数据可返回,因此内核转而立即返回一个 EWOULDBLOCK 错误。第四次调用 recvfrom 时已有一个数据报准备好,它被复制到应用进程缓冲区,于是 recvfrom 成功返回。接着处理数据

当一个应用进程像这样对一个非阻塞描述符循环调用 recvfrom 时,我们称之为轮询。应用进程持续轮询内核,以查看某个操作是否就绪。这么做往往耗费大量 CPU 时间,不过这种模型偶尔也会遇到,通常在专门提供某一种功能的系统中才有

I/O 复用

如果一个或多个I/O条件满足(输入准备好,数据准备好等),我们就能被通知到的这种能力称为I/O复用。

有了 I/O 复用,我们就可以调用 select 或 poll,阻塞在这两个系统调用中的某一个之上,而不是阻塞在真正的 I/O 系统调用上。

我们阻塞于 select 调用,等待数据报套接字变为可读。当 select 返回套接字可读这一条件时,我们调用 recvfrom 把所读数据报复制到应用进程缓冲区。

使用 select 的优势在于可以等待多个描述符就绪

select, pool, epoll 都是 IO 多路复用的机制。 I/O 多路复用就是通过一种机制,一个进程可以监听多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

但 select, pool, epoll 本质上都是同步 I/O,因为它们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己负责进行读写,异步 I/O 的实现会负责把数据从内核拷贝到用户空间

select
select 最早于 1983 年出现在 4.2BSD 中,它通过一个 select() 系统调用来监视多个文件描述符的数组,当 select() 返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select 函数监视的文件描述符分 3 类,分别是 writefds、readfds 和 exceptfds。调用后 select 函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout 指定等待时间,如果立即返回设为 null 即可),函数返回。当 select 函数返回后,可以通过遍历 fdset,来找到就绪的描述符。

select 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select 的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 上一般为 1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。

另外,select() 所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量 TCP 连接处于非活跃状态,但调用 select() 会对所有 socket 进行一次线性扫描,所以这也浪费了一定的开销。

poll
poll 在 1986年 诞生于 System V Release 3,它和 select 在本质上没有多大差别,但是 poll 没有最大文件描述符数量的限制。

不同于 select 使用三个位图来表示三个 fdset 的方式,poll 使用一个 pollfd 的指针实现。

poolfd 结构包含了要监视的 event 和发生的 event,不再使用 select“参数-值”传递方式。同时,pollfd 并没有最大数量限制(但是数量过大后性能也是会下降)。和 select 函数一样,pool 返回后,需要轮询 pollfd 来获取就绪的描述符

从上面看,select 和 poll 都需要在返回后,通过遍历文件描述符来获取已经就绪的 socket。事实上,同时连接的大量客户端在同一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增加,其效率也会线性下降

poll 和 select 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select() 和 poll() 将就绪的文件描述符告诉进程后,如果进程没有对其进行 IO 操作,那么下次调用 select() 和 poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll
直到 Linux2.6 才出现了由内核直接支持的实现方法,那就是 epoll,它几乎具备了之前所说的一切优点,被公认为 Linux2.6下 性能最好的多路 I/O 就绪通知方法。

相对于 select 和 poll 来说,epoll 更加灵活,没有描述符限制。epoll 使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次。

epoll 可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll 同样只告知那些就绪的文件描述符,而且当我们调用 epoll_wait() 获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll 指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而 epoll 事先通过epoll_ctl() 来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似 callback 的回调机制,迅速激活这个文件描述符,当进程调用 epoll_wait() 时便得到通知。

epoll 并不代表一定比 select 好,在并发高的情况下,连接活跃度不是很高,epoll 比 select 好;并发性不高,同时连接很活跃,select 比 epoll 好

异步 I/O

使用非阻塞 io 完成 http 请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import socket
from urllib.parse import urlparse

def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    client = socket.socket()
    client.setblocking(False) # 设置非阻塞式 io
    try:
        client.connect((host, 80))
    except BlockingIOError as e:
        pass

    # 不停地询问连接是否建立好,需要 while 循环不断去检查状态
    while True:
        try:
            client.send((f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection:close\r\n\r\n").encode("utf8"))
            break
        except OSError as e:
            pass

    data = b""
    while True:
        try:
            d = client.recv(1024)
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break
    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()

if __name__ == "__main__":
    get_url("http://www.baidu.com")

使用 select 完成 http 请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()

class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send((f"GET {self.path} HTTP/1.1\r\nHost: {self.host}\r\nConnection:close\r\n\r\n").encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()

    def get_url(self, url):
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        self.client = socket.socket()
        self.client.setblocking(False) # 设置非阻塞式 io

        try:
            self.client.connect((self.host, 80))
        except BlockingIOError as e:
            pass

        # 注册
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)

def loop():
    # 事件循环,不停的请求 socket 的状态并调用对应的回调函数
    # select 本身不支持 register 模式
    # socket 状态变化以后的回调是由程序员完成的
    while True:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)
    # 回调 + 事件循环 + select(pool/epoll)

if __name__ == "__main__":
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com")
    loop()