redis 十一. IO 多路复用

一. 基础

  1. 首先知道一下五种io模型有个概念

Blocking IO: 阻塞IO
NoneBlockin IO: 非阻塞IO
IO multiplexing (redis6实际应用的io) : IO多路复用
signal driven IO: 信号驱动IO
asynchronous IO: 异步IO

  1. 通过BIO,NIO 解释多路复用是怎么一步步演变出来的
  1. BIO网络通信时会小于accept()阻塞等待客户端连接,调用read()阻塞等待客户端请求,会产生两个阻塞,假设一个客户端没有执行完毕,会造成其它请求一直阻塞等待,解决这个问题,可以通过多线程的方式,新建线程处理每个客户端请求,但是操作系统中用户态不能直接开辟线程,需要调用内核来创建,会有用户态到内核态的上下文切换,十分耗费资源,所以提出NIO非阻塞式IO进行通信
  2. NIO网络通信中,将多个socket连接放入一个连接容器中,以Java I/O框架为例通过一个线程使用Java的Selector来同时监控多个通道,遍历这个连接容器拿到每个连接,然后调用read方法判断客户端是否传输数据,出现两个问题,要遍历所有连接,假设多个连接中只有少量的几个连接有请求数据,也要遍历一遍,问题二,调用read方法判断客户端是否有传输数据,遍历是在用户态进行的,调用内核态的read()方法,虽然read()不阻塞,但是涉及到用户态到内核态的切换,每遍历调用一次就要切换一次开销较大
  1. 进而提出了IO多路复用,
  1. 将请求封装为文件描述符FD(文件描述符可以是套接字、管道等 I/O 设备等),将多个请求的文件描述符保存到一个集合中
  2. 创建一个事件集合()event set)将需要监听的文件描述符添加到事件集合中,在Linux中,可以使用epoll_create函数创建一个epoll实例,并使用epoll_ctl函数将需要监听的文件描述符添加到epoll实例中
  3. 调用select、poll或epoll等系统调用,将事件集合传递给内核,并等待事件的发生,在Linux中.可以使用epoll_wait函数等待事件的发生,并将发生事件的文件描述符及其事件类型返回给应用程序。
  4. 当发送事件后根据返回的事件类型,进行相应的处理,例如:如果是读事件,就读取数据并进行处理,如果是写事件,就写入数据并进行处理,如果是连接事件,就接受连接并进行处理
  1. 多路复用的优点是:
  1. 将Socket请求, 管道等 I/O等封装为FD文件描述符,内部通过专门的线程去处理,减少线程的创建与销毁,减少资源浪费
  2. 以Linux为例,通过select、poll 或 epoll 等系统调用,将监听文件描述符集合传递给内核,直接放入Linux内核上,不再出现用户态到内核态的切换,直接从内核态获取结果,内核是非阻塞的,也减少资源浪费
  1. Linux中IO多路复用通过select, poll, epoll 三大函数实现,又被称为event driver IO事件驱动IO, 一个进程同时等待多个文件描述符也就是socket套接字,socket连接,其中任意一个进入就绪状态,select函数就可以返回,相当于监听到了事件开始执行
  2. 将socket对应的fd注册到epoll, 通过epoll来监听socket上的消息,整个过程只在调用select, poll, epoll这三个函数时才会阻塞,收发客户消息是不会阻塞的,整个进程或线程都被充分利用起来了
  3. select, poll, epoll解释
  1. select模型: 使用数组来存储Socket连接文件描述符, 容量是固定的,需要通过轮询来判断是否发生了IO事件
  2. poll模型: 使用链表来存储Socket连接文件描述符,容量是不固定的,同样需要轮询来判断是否发生了IO事件
  3. epoll模型: epoll是一种事件通知模型,当发生了IO事件时,应用程序才会进行IO操作,不需要像poll模型那样主动轮询
    在这里插入图片描述

select

  1. select版本多路复用时涉及到的函数
/* According to POSIX.1-2001 */
#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

/** nfds:       监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态
    readfds:    监控有读数据到达文件描述符集合,传入传出参数
    writefds:   监控写数据到达文件描述符集合,传入传出参数
    exceptfds:  监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数
    timeout:    定时阻塞监控时间,3种情况
                1.NULL,永远等下去
                2.设置timeval,等待固定时间
                3.设置timeval里时间均为0,检查描述字后立即返回,轮询
    struct timeval {
        long tv_sec; // seconds 
        long tv_usec; // microseconds 
    };*/
int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);   // 把文件描述符集合里fd清0
int  FD_ISSET(int fd, fd_set *set); // 测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set);   // 把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set);         //把文件描述符集合里所有位清0
  1. select多路复用伪代码,
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>

#define MAX_FD 1000

int main() {
    int max_fd = -1;
    fd_set read_fds;
    FD_ZERO(&read_fds);

    // TODO: 添加需要监听的文件描述符到read_fds中
    int fd1 = ...; // 定义需要监听的文件描述符fd1、fd2、fd3
    int fd2 = ...;
    int fd3 = ...;
    FD_SET(fd1, &read_fds); // 将fd1加入read_fds中,以监听其可读事件
    FD_SET(fd2, &read_fds); // 将fd2加入read_fds中,以监听其可读事件
    FD_SET(fd3, &read_fds); // 将fd3加入read_fds中,以监听其可读事件
    max_fd = fd1 > fd2 ? fd1 : fd2;  // 计算需要监听的文件描述符的最大编号,用于select函数中

    while (1) {
        fd_set tmp_read_fds = read_fds;
        // 设置超时时间为5s
        struct timeval timeout;
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;
        // 使用select进行多路复用
        int ret = select(max_fd + 1, &tmp_read_fds, NULL, NULL, &timeout);
        if (ret == -1) {
            perror("select error");
            exit(1);
        } else if (ret == 0) {
            printf("select timeout\n");
            continue;
        } else {
            // 遍历read_fds中的所有文件描述符
            for (int fd = 0; fd <= max_fd; fd++) {
                if (!FD_ISSET(fd, &tmp_read_fds)) {
                    continue;
                }
                if (fd == fd1) {
                    // 处理可读事件
                } else if (fd == fd2) {
                    // 处理可读事件
                } else if (fd == fd3) {
                    // 处理可读事件
                }
            }
        }
    }
    return 0;
}
  1. select实现多路复用的过程
  1. 首先内部会将请求封装为FD文件描述符,添加到FD_SET文件描述符集合中
  2. 执行select()函数,内部会执行系统调用,将监听文件描述符集合传递给内核,同时指定超时时间
  3. 此时select会阻塞,直到有事件发生,或超时返回,如果有事件发生内核会将其标记为“就绪状态”, select函数返回值: 当执行异常时会返回-1,当执行超时会返回0,当返回值大于0时说明发生了关注的事件,
  4. 代码执行,判断select函数如果返回值大于0,表示有一个或多个文件描述符已经准备好进行读写操作,遍历文件描述符集合,找到对应的文件描述符,应用程序根据相应的文件描述符可读/可写/异常状态,进行读取、写入或者其他处理
  1. select函数的缺点:
  1. 文件描述符数组用bitmap类型来存放,bitmap默认是1024,虽然可以调整但是有限度
  2. rest 每次循环都必须重新置位为0,不可重复使用
  3. 尽管将rest从用户态拷贝到内核态,由内核态判断是否有数据,select调用需要将fd数组拷贝到内核,所以还是有拷贝开销
  4. 当有数据时select就返回,但是select并不知道是哪个文件描述符有数据,还是要遍历文件描述符数组

poll

  1. poll 函数的优点: poll中通过pollfds数组来代替select中使用bitmap实现的文件描述符数组,解决了bitmap1024的限制,可以一次管理更多的client, pollfds数组中存放的也可以看为是文件描述符,当有事件发生时相应的revents位置为1,遍历时又会置位为0,解决了select不能重用的问题
  2. poll函数的缺点:
  1. pollfds文件描述符数组拷贝到内核态,依然有拷贝开销
  2. poll函数返回时还是不知道到底哪个文件描述符发生了事件,依然需要遍历文件描述符,找到对应,进行处理
#include <poll.h>

#define MAX_FD_NUM 1024  // 最大文件描述符数量

int main()
{
    int fd1, fd2, nfds;
    struct pollfd fds[MAX_FD_NUM];  // 定义pollfd数组,保存要监听的文件描述符和事件类型
    int timeout = 1000;  // 超时时间,单位为毫秒

    // 打开两个文件描述符,假设它们已经正确设置
    fd1 = open("file1", O_RDONLY);  // 只读方式打开文件1
    fd2 = open("file2", O_RDWR);    // 读写方式打开文件2

    // 初始化pollfd数组
    nfds = 0;  // 文件描述符数量从0开始计数
    fds[nfds].fd = fd1;            // 将文件描述符1添加到pollfd数组中
    fds[nfds].events = POLLIN;     // 监听可读事件
    nfds++;                        // 数量加一,将索引移动到下一个空闲位置
    fds[nfds].fd = fd2;            // 将文件描述符2添加到pollfd数组中
    fds[nfds].events = POLLOUT;    // 监听可写事件
    nfds++;                        // 数量加一

    // 不断监听文件描述符事件
    while (1) {
        int ret = poll(fds, nfds, timeout);  // 调用poll函数等待文件描述符准备就绪
        if (ret > 0) {  // 文件描述符中有事件发生
            for (int i = 0; i < nfds; i++) {  // 遍历所有文件描述符,查看它们的事件类型
                if (fds[i].revents & POLLIN) {  // 文件描述符i发生可读事件
                    // 处理可读事件
                }
                if (fds[i].revents & POLLOUT) {  // 文件描述符i发生可写事件
                    // 处理可写事件
                }
                if (fds[i].revents & POLLERR) {  // 文件描述符i发生错误事件
                    // 处理错误事件
                }
            }
        } else if (ret == 0) {
            // 超时事件
        } else {
            // 错误事件
        }
    }
    return 0;
}

epoll

  1. epoll是由三个函数构成, epoll_create, epoll_ctl, epoll_wait,

epoll_create: 创建一个epoll句柄,这个句柄相当于一个文件描述符,用于唯一标识这个epoll实例
epoll_ctl: 向内核中添加,修改或删除要监控的文件描述符,并设置需要监听的事件类型
epoll_wait: 发起类似select()调用,等待IO事件的发生, 会阻塞进程并等待监听的文件描述符中任意一个文件描述符的事件发生

int epoll_create(int size);
//epfd:epoll实例的文件描述符,由epoll_create函数创建。
//op:操作类型,包括EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL三种:
	//EPOLL_CTL_ADD:向epoll实例添加新的待监听文件描述符和事件。
	//EPOLL_CTL_MOD:修改已经在epoll实例上注册的文件描述符和事件。
	//EPOLL_CTL_DEL:从epoll实例删除一个文件描述符。
//fd:待监听的文件描述符。
//event:指向epoll_event结构体的指针,用于描述需要监听的事件类型。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//epfd:epoll实例的文件描述符,由epoll_create函数创建。
//events:指向epoll_event结构体数组的指针,用于存储已经就绪的文件描述符信息。
//maxevents:表示events数组中最多可以存储多少个就绪的文件描述符信息。
//timeout:表示等待事件发生的超时时间,单位为毫秒。
//如果timeout为-1,则表示一直等待直到事件发生;如果timeout为0,则表示立即返回,不会阻塞当前进程
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  1. events成员变量表示需要监听的事件类型,取值范围包括以下几个常量:
  1. EPOLLIN:表示文件描述符可读。
  2. EPOLLOUT:表示文件描述符可写。
  3. EPOLLPRI:表示有紧急数据可读。
  4. EPOLLERR:表示发生错误。
  5. EPOLLHUP:表示文件描述符被挂起。
  6. EPOLLET:表示使用边缘触发模式,即只在状态变化时通知事件
struct epoll_event {
    uint32_t events;    // 表示需要监听的事件类型
    epoll_data_t data;  // 表示需要监听的文件描述符的数据,可以是任意类型的数据
};

//对于一个新的文件描述符,可以通过调用如下代码向epoll实例注册需要监听的事
struct epoll_event ev;
ev.events = EPOLLIN;  // 只监听文件描述符可读事件
ev.data.fd = fd;      // 注册事件的文件描述符为fd
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

//如果要同时监听文件描述符的多个事件类型如:同时监听文件描述符可读和可写事件
ev.events = EPOLLIN | EPOLLOUT;  // 
  1. epoll是非阻塞的,执行流程是:
  1. 调用epoll_create()创建一个epoll实例文件描述符
  2. 封装epoll_event ,设置关注的事件
  3. 调用epoll_ctl()添加绑定了事件的文件描述符,文件描述符在epoll中以红黑树的结构存储,用于快速查找就绪的描述符已经
  4. 调用epoll_wait()等待,内核会遍历所有已注册的文件描述符和事件类型,检查是否有事件已经就绪,如果有就绪的事件,内核会将它们添加到就绪队列中,并返回给应用程序。解除阻塞,应用程序可以通过遍历就绪队列,获取已经就绪的文件描述符及其对应的事件类型,并执行相应的回调函数来处理事件
  1. 伪代码
// 创建epoll实例
int epoll_fd = epoll_create(1);

// 将要监听的文件描述符添加到epoll实例
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);

while (1) {
    struct epoll_event events[MAX_EVENTS];
    // 等待有事件发生
    int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
    for (int i = 0; i < nfds; ++i) {
        if (events[i].data.fd == sockfd) { // 如果有新的连接请求
            struct sockaddr_in client_addr;
            socklen_t client_len = sizeof(client_addr);
            int connfd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
            if (connfd < 0) {
                printf("accept error\n");
                continue;
            }

            // 将新连接的文件描述符添加到epoll实例,并注册回调函数
            event.events = EPOLLIN | EPOLLET;
            event.data.fd = connfd;
            epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connfd, &event);
            handlers[connfd] = [] (int sockfd) {
                char buf[1024];
                int n = read(sockfd, buf, sizeof(buf));
                if (n == 0) { // 对端关闭连接
                    close(sockfd);
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sockfd, NULL);
                    handlers.erase(sockfd); // 删除handlers中对应的回调函数
                    return;
                } else if (n < 0) { // 出错
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        return;
                    }
                    printf("read error\n");
                    close(sockfd);
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, sockfd, NULL);
                    handlers.erase(sockfd);
                    return;
                }
                printf("received %d bytes\n", n);
                // 处理数据...
				
				// 处理完数据后,需要重新将该文件描述符加入到epoll实例中,并修改事件类型
				event.events = EPOLLIN | EPOLLET;
				event.data.fd = sockfd;
				epoll_ctl(epoll_fd, EPOLL_CTL_MOD, sockfd, &event);
            };
        } else { // 如果是已连接的socket上有数据可读
            int sockfd = events[i].data.fd;
            handlers[sockfd](sockfd); // 执行回调函数
        }
    }
}
  1. epoll采用的是事件驱动机制,不是select、poll的轮询机制,epoll中在调用epoll_ctl添加文件描述符是,会绑定事件,与该事件对应的回调函数,当接收到关注的事件时执行回调函数同时将该fd的引用放入rdlist就绪列表中,当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可,拿到就绪的文件描述符。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户(上方伪代码中没有这块逻辑),回调函数内部重点完成了
  1. 读取数据:根据文件描述符所对应的事件类型,判断该文件描述符是否可读或者可写,如果可读则通过 read 等函数读取数据。
  2. 处理数据:对于读取到的数据进行相应的处理,比如解析 HTTP 请求、计算逻辑等。
  3. 判断文件描述符状态:根据 read 或者其它函数的返回值判断该文件描述符是否出现错误(比如连接关闭),以及是否还需要继续等待事件就绪。
  4. 修改事件类型:如果需要继续等待事件就绪,则需要重新设置该文件描述符所对应的事件类型,并将其添加到 epoll 实例中,以便能够再次监听该事件。
  5. 关闭并删除文件描述符:如果文件描述符出现错误或者连接已经关闭,则需要在 epoll 实例中删除该文件描述符,并释放相关资源。
  1. 需要注意的是,在使用epoll的边缘触发模式EPOLLET时,当一个文件描述符变为就绪时,内核只会通知应用程序一次,直到应用程序对该文件描述符的所有数据都被读取或写入。因此在使用边缘触发模式时,应用程序需要在读取或写入数据时,确保将缓冲区中的所有数据都处理完毕,否则就可能会错过某些就绪的事件
  2. 相比于 select 和 poll,epoll 具有以下一些优点:
  1. 更高的并发能力:在处理大量连接时,select 和 poll 的效率都会随着文件描述符数量的增加而下降,因为每次调用这两个函数都需要遍历所有的文件描述符。而 epoll 采用了基于内核事件表的机制,每当一个文件描述符上有事件就绪时,内核会将它添加到事件表中,并唤醒用户程序进程,从而避免了无效遍历造成的效率低下。
  2. 更高的性能:由于 epoll 使用了基于事件驱动的异步 IO 模型,可以避免了同步 IO 的阻塞等待,从而提高了程序的响应速度和吞吐量。此外,epoll 还支持 EPOLLONESHOT 和 EPOLLET 两个事件触发模式,可以更加灵活地满足不同场景的需求
  3. 更易扩展、更易用:与 select 和 poll 相比,epoll 的编程接口更加简单、直观,同时还提供了更多的功能和选项,例如支持边缘触发、一次性操作等,这使得开发者可以更方便地进行网络编程,并实现更加复杂的应用程序

二. redis 与多路复用

  1. 在redis没有使用多路复用时,每个客户端连接都需要单独的线程或进程进行处理,如果同时有大量的客户端连接占用大量的redis线程会导致CPU资源的浪费和系统性能的下降, 当使用了多路复用后redis不用单独为该请求创建一个新的线程或进程,而是把请求放入事件队列中等待处理,减少上下文的切换,减少资源浪费
  2. redis多路复用的主要思路是: 将主线程的IO读写任务拆分给一组独立的线程执行,这样就可以使多个socket并行化读写,采用多路IO复用技术可以让单个线程高效的处理多个连接请求,将最耗时的socket的读取,请求解析, 写入等单独外包出去,剩下的命令仍然由主线程传下执行并与内存数据进行交互,进而保证redis的非阻塞IO能给顺利执行完成,多路指的是多个socket连接,复用指的是复用一个线程,多路复用中最重要的三种技术select, poll,epoll
  3. redis内部由: 套接字、I/O多路复用程序、文件时间分派器(dispatcher)、事件处理器四个部分构成,可以从这四个部分去讲述redis与多路复用的执行过程
  1. 当一个客户端请求redis时,redis会在内部为这个这个请求创建一个socket套接字,每个套接字准备好执行连接应答、写入、读取、关闭等操作时,就会相应产生一个文件事件
  2. 将多个连接套接字注册到事件排列队列event loop中
  3. 多路复用器后续会迭代这个事件排列队列,负责监听多个套接字,也就是监听FD文件描述符
  4. 如果有事件发生,redis将相应的套接字添加到就绪队列中,通过这个队列,以有序、同步、每次一个套接字的方式向文件事件分派器传送套接字
  5. 然后通过文件事件的处理器,解析请求事件,执行相应操作并返回结果(几种文件事件处理器: 连接应答处理器, 命令请求处理器, 命令回复处理器)
    在这里插入图片描述
  1. 官方解释:
  1. Reids基于Reactor模式IO多路复用开发了自己的网络事件处理器,被称为文件事件处理器,
  2. 程序同时监听多个套接字,根据套接字执行的任务不同为套接字关联不同的事件处理器
  3. 当被监听的套接字准备好执行:连接应答, 读取, 写入, 关闭等操作时,相对应的文件事件就会产生
  4. 文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件
  5. 文件事件处理器以单线程方式运行,但是通过io多路复用既实现了高性能网络通信模型,保证redis以单线程方式提供服务
  1. 客户端与redis通信的一次流程
  1. 在redis启动初始化时,会将连接应答处理器跟 AE_READABLE 事件关联起来
  2. 如果客户端跟redis发起连接,会产生一个 AE_READABLE 事件,连接应答处理器执行,跟客户端建立连接,创建客户端对应的Socket,同时将这个 AE_READABLE 事件跟命令请求处理器关联起来
  3. 当客户端向Redis发起请求的时候(不管是读请求还是写请求)首先会在 Socket 产生一个 AE_READABLE 事件,然后由对应的命令请求处理器来处理。命令请求处理器会从Socket中读取请求相关数据,然后进行执行和处理。
  4. 接着Redis准备好了给客户端的响应数据之后,将Socket的AE_WRITABLE事件跟命令回复处理器关联起来,
  5. 当客户端准备好读取响应数据时,会在 Socket 上产生一个 AE_WRITABLE 事件,会由对应的命令回复处理器来处理,将准备好的响应数据写入 Socket,供客户端来读取。
  6. 命令回复处理器写完之后,会删除这个 Socket 的 AE_WRITABLE 事件和命令回复处理器的关联关系
  1. 下面是 redis 3.x 版本中,使用 epoll() 实现多路复用的伪代码
  1. create_api_state() 函数用于创建 epoll 状态实例,它会分配内存空间、创建 epoll 实例并分配保存触发了就绪事件的文件描述符及其事件类型的数组。aeApiCreate() 函数用于创建事件循环实例,它将 epoll 状态实例赋给事件循环的 apidata 字段。aeApiPoll() 函数是使用 epoll 来等待文件描述符上的事件,并返回就绪事件的数量。
  2. 在 aeApiPoll() 函数中,先阻塞等待事件,直到有事件发生或者等待超时。如果有事件就绪,就将事件类型转换为事件掩码(mask),并调用相应的事件处理函数处理,最后将触发了就绪事件的文件描述符及其事件类型保存到 fired 数组中,并返回事件就绪数目。
  3. 需要注意的是,上面的代码中用到了 EPOLLIN、EPOLLOUT、EPOLLERR 和 EPOLLHUP 四种事件类型,其中 EPOLLIN 表示可读事件,EPOLLOUT 表示可写事件,EPOLLERR 表示出错事件,EPOLLHUP 表示断开连接事件。
// 定义用于保存 epoll 状态的数据结构
typedef struct aeApiState {
    int epfd;                       // epoll 文件描述符
    struct epoll_event *events;     // 用于保存触发了就绪事件的文件描述符及其事件类型的数组
} aeApiState;

// 创建一个 epoll 状态实例
aeApiState *create_api_state(int setsize) {
    aeApiState *state = malloc(sizeof(aeApiState));            // 分配内存空间
    state->events = malloc(sizeof(struct epoll_event) * setsize);// 分配 events 数组的内存空间
    state->epfd = epoll_create(1024);                           // 创建 epoll 实例
    return state;
}

// 创建事件循环实例,将 epoll 状态实例赋给事件循环的 apidata 字段
int aeApiCreate(AeEventLoop *event_loop) {
    event_loop->apidata = create_api_state(event_loop->setsize); // 创建 epoll 状态实例
    return 0;
}

// 使用 epoll 来等待文件描述符上的事件
int aeApiPoll(AeEventLoop *event_loop, struct timeval *tvp) {
    aeApiState *state = event_loop->apidata;                     // 获取 epoll 状态实例
    int retval, numevents = 0;

    // 阻塞等待事件,直到有事件发生或者等待超时
    retval = epoll_wait(state->epfd, state->events, event_loop->setsize,
                        tvp ? (tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1);

    if (retval > 0) {                                           // 如果有事件就绪
        numevents = retval;
        for (int i = 0; i < numevents; i++) {
            // 获取事件类型,并调用相应的事件处理函数处理
            int mask = 0;
            struct epoll_event *e = state->events + i;

            if (e->events & EPOLLIN) mask |= EVENT_READABLE;      // 可读事件
            if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;     // 可写事件
            if (e->events & EPOLLERR) mask |= EVENT_EXCEPTION;    // 出错事件
            if (e->events & EPOLLHUP) mask |= EVENT_EXCEPTION;    // 断开连接事件

            event_loop->fired[i].fd = e->data.fd;
            event_loop->fired[i].mask = mask;
        }
    }
    return numevents;                                           // 返回事件就绪数目
}