好久没有写东西了,最近有点时间了,就继续写写Redis源码学习系列文章吧。在今年5月,redis发布了8.0正式版,8.0这个版本做了很多优化,其中非常重要的一个功能就是优化了多线程模块。
早在 6.0 版本,redis就支持了 IO 多线程,之前也写过一篇文章介绍 redis IO 多线程的实现原理,感兴趣的可以看一下:Redis 6.0网络。在 6.0 版本中。多线程只用来处理网络 IO 操作,其他的操作还是在主线程中完成的。在 8.0 版本中,多线程也是只用来处理网络 IO 操作,具体执行命令还是在主线程中,但是做了很多优化。
先来看一下 6.0 版本多线程存在的问题:
比较影响性能的一个问题就是,所有 IO 线程在读取数据时,main 线程需要等待所有 IO 线程读取完数据之后,才能继续处理命令。也就是说,如果有一个 IO 线程读取数据比较慢,那么 main 线程就会被阻塞,等待这个慢的 IO 线程读取完数据之后,才能继续处理命令,在这个过程中,main 线程只能干等待,这个慢的 IO 线程会拖累整个系统的性能。
理想情况下,应该是每个IO线程读取完数据之后,就可以立即通知 main 线程,main 线程可以立即处理这些已经读取完数据的命令,而不需要等待其他 IO 线程读取完数据之后,才能继续处理命令。
在 8.0 版本中,redis解决了这个问题,实现了每个 IO 线程读取完数据之后,就可以立即通知 main 线程,main 线程可以立即处理这些已经读取完数据的命令,而不需要等待其他 IO 线程读取完数据之后,才能继续处理命令。
Github PR 这是 redis 8.0 版本多线程优化的 PR,感兴趣的可以看一下。
这是作者在 PR 中画的示意图。
回首看之前写的那篇文章,当时学疏才浅,文章整体逻辑不够清晰,一些地方没有讲清楚,可能阅读起来不是很连贯,但是应该没有大的错误。现在重新学习了 8.0 版本的网络实现之后,重新梳理了一下思路,写下这篇文章,希望能把 redis 8.0 版本的网络实现讲清楚。
本篇文章使用的代码是 redis 8.2.3 不同的版本可能会有一些差异。
本篇文中,主要从 网络初始化 , 新链接处理 , IO线程逻辑 , 主线程逻辑 , 数据读写 五个方面来介绍 redis 8.0 版本的网络实现。
在开始前,需要先了解 redis 中几个重要的数据结构:
aeEventLoop:事件循环结构体,负责管理所有的网络事件。
1typedef struct aeEventLoop {
2 int maxfd; /* fd 的最大值 */
3 int setsize; /* 追踪的最大fd数量 */
4 long long timeEventNextId;
5 int nevents; /* 当前注册的事件数量 */
6 aeFileEvent *events; /* 已注册的事件(其实是数组) */
7 aeFiredEvent *fired; /* 触发的事件 */
8 aeTimeEvent *timeEventHead; /* 定时事件链表头 */
9 int stop; /* 停止标志 */
10 void *apidata; /* 不同平台的事件轮训器指针(linux使用epoll) */
11 aeBeforeSleepProc *beforesleep; /* 事件循环睡眠前的回调函数 */
12 aeBeforeSleepProc *aftersleep; /* 事件循环睡眠后的回调函数 */
13 int flags; /* 事件循环标志 */
14 void *privdata[2]; /* 私有数据指针 */
15} aeEventLoop;
通过 aeCreateEventLoop 函数创建:
首先会在 服务启动时,在主线线程创建一个 aeEventLoop,放到 server.el 中,如果启用了多线程,还会为每个 IO 线程创建一个 aeEventLoop,放到 server.io_threads[i]->el 中。
connection:连接结构体,表示一个网络连接。
1struct connection {
2 ConnectionType *type; /* 连接类型,本次只关注 socket类型 */
3 ConnectionState state; /* 连接状态 */
4 int last_errno; /* 最近一次错误码 */
5 int fd; /* 当前连接的文件描述符 (fd) */
6 short int flags; /* 连接标志 */
7 short int refs; /* 连接引用计数 */
8 unsigned short int iovcnt; /* 当前 iovec 数量 */
9 void *private_data; /* 私有数据指针 */
10 struct aeEventLoop *el; /* 关联的事件循环 */
11 ConnectionCallbackFunc conn_handler; /* 连接处理回调函数 */
12 ConnectionCallbackFunc write_handler; /* 写处理回调函数 */
13 ConnectionCallbackFunc read_handler; /* 读处理回调函数 */
14};
ConnectionType 结构体定义了连接的各种操作函数指针,不同类型的连接(比如 TCP 连接、TLS 连接等)会有不同的实现。本次只关注 socket 类型的连接。这里因为 ConnectionType 结构体比较大,先贴出来,不展开讲解,后面用到的地方会指出和解释。
1typedef struct ConnectionType {
2 /* connection type */
3 const char *(*get_type)(struct connection *conn);
4
5 /* connection type initialize & finalize & configure */
6 void (*init)(void); /* auto-call during register */
7 void (*cleanup)(void);
8 int (*configure)(void *priv, int reconfigure);
9
10 /* ae & accept & listen & error & address handler */
11 void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
12 aeFileProc *accept_handler;
13 int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote);
14 int (*is_local)(connection *conn);
15 int (*listen)(connListener *listener);
16
17 /* create/shutdown/close connection */
18 connection* (*conn_create)(struct aeEventLoop *el);
19 connection* (*conn_create_accepted)(struct aeEventLoop *el, int fd, void *priv);
20 void (*shutdown)(struct connection *conn);
21 void (*close)(struct connection *conn);
22
23 /* connect & accept */
24 int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
25 int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
26 int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
27
28 /* IO */
29 int (*write)(struct connection *conn, const void *data, size_t data_len);
30 int (*writev)(struct connection *conn, const struct iovec *iov, int iovcnt);
31 int (*read)(struct connection *conn, void *buf, size_t buf_len);
32 int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
33 int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
34 const char *(*get_last_error)(struct connection *conn);
35 ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
36 ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
37 ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
38
39 /* event loop */
40 void (*unbind_event_loop)(struct connection *conn);
41 int (*rebind_event_loop)(struct connection *conn, aeEventLoop *el);
42
43 /* pending data */
44 int (*has_pending_data)(struct aeEventLoop *el);
45 int (*process_pending_data)(struct aeEventLoop *el);
46
47 /* TLS specified methods */
48 sds (*get_peer_cert)(struct connection *conn);
49} ConnectionType;
每个连接都会对应一个 connection 结构体,表示这个连接的各种信息,比如文件描述符、连接状态、读写处理函数等。在 redis 中,所有的网络连接都是通过 connection 结构体来管理的。
eventNotifier:事件通知器结构体,负责在 IO 线程和主线程之间传递事件通知。
1typedef struct eventNotifier {
2#ifdef HAVE_EVENT_FD
3 int efd;
4#else
5 int pipefd[2];
6#endif
7} eventNotifier;
根据不同的平台,使用不同的方式实现事件通知。在 linux 平台上,使用 eventfd 来实现事件通知;在其他平台上,使用管道(pipe)来实现事件通知。这个结构体主要用于 IO 线程和主线程之间的通信。
下面开始一点点分析源码。
网络初始化
这是一个大概的网络初始化流程图,主要包括以下几个步骤:
- 创建主事件循环:
在 main 函数中,调用 initServer 函数初始化 redis 服务器,主要是为 redisServer 这个结构体初始化各种参数。其中和网络相关的是
1server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
这行代码创建了主事件循环 aeEventLoop,并将其赋值给 server.el。
main 函数继续执行,调用 initListeners 函数初始化监听套接字,主要是创建监听套接字,并将其注册到主事件循环中。
1void initListeners(void) {
2 int conn_index;
3 connListener *listener;
4 //如果配置了端口,就创建 socket 监听器
5 if (server.port != 0) {
6 conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
7 if (conn_index < 0)
8 serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
9 listener = &server.listeners[conn_index];
10 listener->bindaddr = server.bindaddr;
11 listener->bindaddr_count = server.bindaddr_count;
12 listener->port = server.port;
13 listener->ct = connectionByType(CONN_TYPE_SOCKET);
14 }
15
16 //省略unixdomain和tls监听器的相关逻辑
17 ......
18
19 /* create all the configured listener, and add handler to start to accept */
20 int listen_fds = 0;
21 for (int j = 0; j < CONN_TYPE_MAX; j++) {
22 listener = &server.listeners[j];
23 if (listener->ct == NULL)
24 continue;
25
26 // 创建监听socket
27 if (connListen(listener) == C_ERR) {
28 serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
29 exit(1);
30 }
31 // 为监听socket创建accept处理器,并注册到server.el中
32 if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
33 serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
34
35 listen_fds += listener->count;
36 }
37
38 if (listen_fds == 0) {
39 serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
40 exit(1);
41 }
42}
createSocketAcceptHandler 函数最终会把 connSocketAcceptHandler 注册到主事件循环 server.el 中,当有新的连接到来时,主事件循环会调用这个处理器来处理新连接。
connListen 函数
1static inline int connListen(connListener *listener) {
2 return listener->ct->listen(listener);
3}
调用 ConnectionType 结构体中的 listen 函数指针(指向 connSocketListen),创建监听 socket,并绑定到指定的地址和端口。
createSocketAcceptHandler 函数
1int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
2 int j;
3
4 for (j = 0; j < sfd->count; j++) {
5 if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
6 /* Rollback */
7 for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
8 return C_ERR;
9 }
10 }
11 return C_OK;
12}
这个函数为每个监听 socket 创建一个 accept 处理器,并将其注册到主事件循环 server.el 中。当有新的连接到来时,主事件循环会调用这个 accept 处理器来处理新连接。
main 函数继续执行,调用 InitServerLast 函数完成 多线程相关的初始化
1void InitServerLast(void) {
2 bioInit();
3 initThreadedIO();
4 set_jemalloc_bg_thread(server.jemalloc_bg_thread);
5 server.initial_memory_usage = zmalloc_used_memory();
6}
initThreadedIO 函数
1void initThreadedIO(void) {
2 //如果没有启用多线程,直接返回
3 if (server.io_threads_num <= 1) return;
4
5 //把IO线程标记为激活状态
6 server.io_threads_active = 1;
7
8 ........
9
10 //为每个IO线程创建事件循环和相关数据结构
11 for (int i = 1; i < server.io_threads_num; i++) {
12 IOThread *t = &IOThreads[i];
13 t->id = i;
14 t->el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
15 .............
16 //创建通知器,用于IO线程通知主线程有新数据可读
17 t->pending_clients_notifier = createEventNotifier();
18 //为IO线程创建处理函数,当主线程需要调用IO线程来处理时,可以通过触发这个通知器,
19 //调用handleClientsFromMainThread函数处理
20 if (aeCreateFileEvent(t->el, getReadEventFd(t->pending_clients_notifier),
21 AE_READABLE, handleClientsFromMainThread, t) != AE_OK)
22 {.................}
23
24 ............
25
26 //创建IO线程
27 if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) {
28 serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
29 exit(1);
30 }
31
32 //为主线程创建相关数据结构,用于接收IO线程的通知,后面会详细介绍
33 mainThreadPendingClientsToIOThreads[i] = listCreate();
34 mainThreadPendingClients[i] = listCreate();
35 mainThreadProcessingClients[i] = listCreate();
36 pthread_mutex_init(&mainThreadPendingClientsMutexes[i], attr);
37 //创建通知器,用于IO线程通知主线程有新数据可读
38 mainThreadPendingClientsNotifiers[i] = createEventNotifier();
39 //为主线程创建处理函数,当IO线程读取完数据后,可以通过触发这个通知器,
40 //调用handleClientsFromIOThread函数处理
41 if (aeCreateFileEvent(server.el, getReadEventFd(mainThreadPendingClientsNotifiers[i]),
42 AE_READABLE, handleClientsFromIOThread, t) != AE_OK)
43 {
44 exit(1);
45 }
46 if (attr) zfree(attr);
47 }
48}
到这里,网络初始化的工作就完成了,主事件循环创建好了,监听 socket 创建好了,并注册到了主事件循环中,IO 线程也创建好了,并为每个 IO 线程创建了事件循环和相关数据结构。
新连接处理
先来看一下新连接的处理流程图:
当有新的连接到来时,会在主线线程的事件循环中触发监听 socket 的可读事件,然后再把这个连接分配到某个 IO 线程中去处理。
最终会通过 ConnectionType 结构体中的 accept 函数指针(指向 connSocketAccept)来处理新连接。
connSocketAccept 函数
1static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
2 ...............
3 if (!callHandler(conn, accept_handler)) ret = C_ERR;
4 ...............
5 return ret;
6}
accept_handler 指向 connSocketAcceptHandler 函数
1static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2 int cport, cfd;
3 int max = server.max_new_conns_per_cycle;//每次循环最多接受的连接数
4 ...............
5
6 while(max--) {
7 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
8 ...............
9 acceptCommonHandler(connCreateAcceptedSocket(el,cfd,NULL), 0, cip);
10 }
11}
在这个函数中,会循环接受新的连接,并为每个新连接创建一个 connection 结构体,然后调用 acceptCommonHandler 函数处理新连接。
acceptCommonHandler 函数 就不展开分析了,主要是对新连接进行一些初始化操作,最终是通过 accept 或者 accept4 函数创建新的 socket 连接。
acceptCommonHandler 函数会完成 connection 结构体的初始化工作,包括为这个连接创建一个 client 结构体,并将这个连接分配到某个 IO 线程中去处理。
1void acceptCommonHandler(connection *conn, int flags, char *ip) {
2 //检查最大连接数
3 if (listLength(server.clients) + getClusterConnectionsCount()
4 >= server.maxclients)
5 {
6 char *err;
7 if (server.cluster_enabled)
8 err = "-ERR max number of clients + cluster "
9 "connections reached\r\n";
10 else
11 err = "-ERR max number of clients reached\r\n";
12 // 返回客户端错误信息
13 if (connWrite(conn,err,strlen(err)) == -1) {
14 /* Nothing to do, Just to avoid the warning... */
15 }
16 server.stat_rejected_conn++;
17 connClose(conn);//关闭连接
18 return;
19 }
20
21 //为连接创建 client 结构体
22 if ((c = createClient(conn)) == NULL) {
23 .........
24 return;
25 }
26
27 //最终会调用 clientAcceptHandler 函数处理新连接
28 if (connAccept(conn, clientAcceptHandler) == C_ERR) {
29 if (connGetState(conn) == CONN_STATE_ERROR)
30 serverLog(LL_WARNING,
31 "Error accepting a client connection: %s (addr=%s laddr=%s)",
32 connGetLastError(conn), getClientPeerId(c), getClientSockname(c));
33 freeClient(connGetPrivateData(conn));
34 return;
35 }
36}
createClient 函数为这个连接创建一个 client 结构体,并将其初始化。比较重要的一点是把 readQueryFromClient 函数设置为这个连接的读处理函数。后续当这个连接有数据可读时,就会调用这个函数来读取数据。
connAccpet 函数 会使用 conn->type->accept 函数指针(指向 connSocketAccept)来处理新连接,最终会调用到 clientAcceptHandler 函数。
clientAcceptHandler 会做一些检查,最终调用 assignClientToIOThread 函数把这个连接分配到某个 IO 线程中去处理。
1void assignClientToIOThread(client *c) {
2 serverAssert(c->tid == IOTHREAD_MAIN_THREAD_ID);
3 // 找到当前连接数最少的 IO 线程
4 int min_id = 0;
5 int min = INT_MAX;
6 for (int i = 1; i < server.io_threads_num; i++) {
7 if (server.io_threads_clients_num[i] < min) {
8 min = server.io_threads_clients_num[i];
9 min_id = i;
10 }
11 }
12
13 // 把连接分配到这个 IO 线程中去
14 server.io_threads_clients_num[c->tid]--;
15 c->tid = min_id;//把client中tid,也就是所属IO线程id设置为当前IO线程id
16 c->running_tid = min_id;//把client中running_tid也设置为当前IO线程id
17 server.io_threads_clients_num[min_id]++;
18 ..........
19 //socket类型的连接中,这里不会调用unbind_event_loop函数
20 connUnbindEventLoop(c->conn);
21 c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
22 listAddNodeTail(mainThreadPendingClientsToIOThreads[c->tid], c);
23}
到这里,新连接处理的工作就完成了,连接已经分配到某个 IO 线程中去处理了,接下来就看 IO 线程是如何处理这些连接的。
IO线程逻辑
在讲 IO 线程逻辑前,需要先了解几个重要的数据结构:
首先是 IOThread 结构体,表示一个 IO 线程。
1typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) {
2 uint8_t id; // 当前 IO 线程的 ID
3 pthread_t tid; // 操作系统线程 ID
4 redisAtomic int paused; // 当前 IO 线程是否暂停
5 redisAtomic int running; // 当前 IO 线程是否在运行
6 aeEventLoop *el; // 当前线程的事件循环,在 initThreadedIO 中创建,底层使用 epoll 实现
7 list *pending_clients; // 等待处理的客户端list
8 list *processing_clients; // 正在处理的客户端list
9 eventNotifier *pending_clients_notifier; // 通知器,用于通知主线程有事件需要处理
10 pthread_mutex_t pending_clients_mutex; // 互斥锁,保护 pending_clients 列表
11 list *pending_clients_to_main_thread; // 主线程分配给 IO 线程的客户端list
12 list *clients; // 当前 IO 线程管理的客户端list
13} IOThread;
这个结构体在 initThreadedIO 函数中创建,为每个 IO 线程创建一个 IOThread 结构体,并初始化相关数据结构。el 是 IO 线程的事件循环,和 server.el 作用类似,只不过是每个 IO 线程自己的事件循环,在linux平台上,底层使用 epoll 实现。
下面是一些全局变量,主要用于主线程和 IO 线程之间的通信。
1// IO 线程数组
2static IOThread IOThreads[IO_THREADS_MAX_NUM];
3
4// 主线程和 IO 线程之间的通信数据结构
5
6// list链表的数组,主线程处理完成后要交回给相应 IO 线程继续处理的客户端放到这里
7static list *mainThreadPendingClientsToIOThreads[IO_THREADS_MAX_NUM];
8
9// 主线程从 mainThreadPendingClients 把待处理的客户端搬到这个列表,
10// 表示当前正在主线程上下文中逐一处理的客户端(避免在处理过程中被其他线程修改)
11static list *mainThreadProcessingClients[IO_THREADS_MAX_NUM];
12
13// 主线程从 IO 线程接收待处理的客户端放到这里,当需要通知主线程有来自某个 IO 线程的新客户端可处理时,
14// IO 线程使用对应的 notifier 唤醒主线程;
15static list *mainThreadPendingClients[IO_THREADS_MAX_NUM];
16
17// 保护 mainThreadPendingClients 列表的互斥锁数组
18static pthread_mutex_t mainThreadPendingClientsMutexes[IO_THREADS_MAX_NUM];
19
20// 用于通知主线程有新客户端可处理的通知器数组
21static eventNotifier* mainThreadPendingClientsNotifiers[IO_THREADS_MAX_NUM];
看一下大概的 IO 线程逻辑流程图:
这里的 el 就是 IO 线程自己的事件循环,在 initThreadedIO 函数中创建好了。pending_clients 列表用于存放主线程分配给 IO 线程的待处理客户端,processing_clients 列表用于存放当前正在处理的客户端,clients 列表用于存放当前 IO 线程管理的所有客户端。
在 initThreadedIO 函数中,这些数据结构也都创建好了。同时也为每个 IO 线程创建 el,当 el 中有事件发生时,会调用 handleClientsFromMainThread 函数来处理。
最终会调用到 processClientsFromMainThread 函数来处理
1int processClientsFromMainThread(IOThread *t) {
2 pthread_mutex_lock(&t->pending_clients_mutex);
3 // 把主线程分配给 IO 线程的待处理客户端搬到 IO 线程自己的待处理列表中
4 listJoin(t->processing_clients, t->pending_clients);
5 pthread_mutex_unlock(&t->pending_clients_mutex);
6 size_t processed = listLength(t->processing_clients);
7 if (processed == 0) return 0;
8
9 listIter li;
10 listNode *ln;
11 listRewind(t->processing_clients, &li);
12 // 遍历所有待处理客户端
13 while((ln = listNext(&li))) {
14 client *c = listNodeValue(ln);
15 ..............
16 listUnlinkNode(t->processing_clients, ln);
17 listLinkNodeTail(t->clients, ln);
18 c->io_thread_client_list_node = listLast(t->clients);
19 // 如果连接没有绑定事件循环(第一次),就绑定到当前 IO 线程的事件循环中,
20 // 并设置读处理函数为 readQueryFromClient
21 if (!connHasEventLoop(c->conn)) {
22 connRebindEventLoop(c->conn, t->el);
23 serverAssert(!connHasReadHandler(c->conn));
24 connSetReadHandler(c->conn, readQueryFromClient);
25 }
26
27 // 如果连接有未处理的回复数据,就先写数据给客户端
28 if (clientHasPendingReplies(c)) {
29 writeToClient(c, 0);
30 if (!(c->io_flags & CLIENT_IO_CLOSE_ASAP) && clientHasPendingReplies(c)) {
31 connSetWriteHandler(c->conn, sendReplyToClient);
32 }
33 }
34 }
35 serverAssert(listLength(t->processing_clients) == 0);
36 return processed;
37}
在这个函数中,IO 线程会处理主线程分配给它的待处理客户端。主要做了以下几件事:
- 把主线程分配给 IO 线程的待处理客户端搬到 IO 线程自己的待处理列表中。
- 遍历所有待处理客户端,如果连接没有绑定事件循环(第一次),就绑定到当前 IO 线程的事件循环中,并设置读处理函数为
readQueryFromClient。 - 如果连接有未处理的回复数据,就先写数据给客户端。
- 最后把客户端从待处理列表中移除,加入到 IO 线程管理的客户端列表中。
IOThreadBeforeSleep 函数 在 IO 线程事件循环睡眠前调用,主要是处理一些待处理的客户端。
1void IOThreadBeforeSleep(struct aeEventLoop *el) {
2 IOThread *t = el->privdata[0];
3 //socket类型的连接中,这里不会调用processPendingData函数
4 connTypeProcessPendingData(el);
5
6 // 检查当前是否有待处理的数据,如果有,就不让事件循环睡眠
7 int dont_sleep = connTypeHasPendingData(el);
8
9 // 调用 processClientsFromMainThread 处理主线程分配给 IO 线程的待处理客户端
10 if (processClientsFromMainThread(t) > 0) {
11 dont_sleep = 1;
12 }
13 if (!dont_sleep) {
14 atomicSetWithSync(t->running, 0);//标记当前 IO 线程不在运行状态
15 // 再次调用 processClientsFromMainThread 处理主线程分配给 IO 线程的待处理客户端
16 processClientsFromMainThread(t);
17 }
18 aeSetDontWait(t->el, dont_sleep);
19
20 /* Check if i am being paused, pause myself and resume. */
21 handlePauseAndResume(t);
22 // 在sleep之前,通知主线程处理从 IO 线程读取完数据的客户端
23 sendPendingClientsToMainThreadIfNeeded(t, 0);
24}
在这个函数中,IO 线程会处理一些待处理的客户端,主要做了以下几件事:
- 检查当前是否有待处理的数据,如果有,就不让事件循环睡眠。
- 调用
processClientsFromMainThread处理主线程分配给 IO 线程的待处理客户端。 - 如果没有待处理的数据,就标记当前 IO 线程不在运行状态。
- 再次调用
processClientsFromMainThread处理主线程分配给 IO 线程的待处理客户端。 - 在 sleep 之前,通知主线程处理从 IO 线程读取完数据的客户端。
IOThreadAfterSleep 函数 在 IO 线程事件循环睡眠后调用,主要是标记当前 IO 线程在运行状态。
1void IOThreadAfterSleep(struct aeEventLoop *el) {
2 IOThread *t = el->privdata[0];
3 atomicSetWithSync(t->running, 1);//标记当前 IO 线程在运行状态
4}
整个 IO 线程逻辑主要就是通过事件循环来处理主线程分配给它的待处理客户端,并在 sleep 前通知主线程处理从 IO 线程读取完数据的客户端。
主线程逻辑
先来看一下主线程逻辑的大概流程图:
相比之下,主线程做的工作更多一些,这里只介绍命令处理的相关逻辑,其他的逻辑就不展开分析了。
主线程的事件循环是 server.el,在 initServer 函数中创建好了。当 server.el 中有事件发生时,会调用 handleClientsFromIOThread 函数来处理,然后调用 processClientsFromIOThread 函数来处理从 IO 线程读取完数据的客户端。
1int processClientsFromIOThread(IOThread *t) {
2 /* Get the list of clients to process. */
3 pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
4 // 把从 IO 线程读取完数据的客户端搬到主线程自己的待处理列表中
5 listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
6 pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
7 size_t processed = listLength(mainThreadProcessingClients[t->id]);
8 if (processed == 0) return 0;
9
10 int prefetch_clients = 0;
11 resetCommandsBatch();
12
13 listNode *node = NULL;
14 while (listLength(mainThreadProcessingClients[t->id])) {
15 // 如果没有预取的客户端,就预取一批客户端
16 if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t);
17 // 每处理完一个客户端,就减少预取的客户端数量
18 if (--prefetch_clients <= 0) resetCommandsBatch();
19
20 // 取出一个客户端进行处理
21 if (node) zfree(node);
22 node = listFirst(mainThreadProcessingClients[t->id]);
23 listUnlinkNode(mainThreadProcessingClients[t->id], node);
24 client *c = listNodeValue(node);
25 ................
26 // 把client的running_tid设置为主线程id
27 c->running_tid = IOTHREAD_MAIN_THREAD_ID;
28 if (c->read_error) handleClientReadError(c);
29
30 // 如果客户端被标记为关闭,就释放客户端
31 if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
32 freeClient(c);
33 continue;
34 }
35
36 // 处理定时任务
37 if (c->last_cron_check_time + 1000 <= server.mstime ||
38 c->io_flags & CLIENT_IO_PENDING_CRON)
39 {
40 c->last_cron_check_time = server.mstime;
41 if (clientsCronRunClient(c)) continue;
42 } else {
43 // 更新客户端内存使用情况
44 updateClientMemUsageAndBucket(c);
45 }
46
47 if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
48 c->flags |= CLIENT_PENDING_COMMAND;
49 // 处理挂起的命令和读取缓冲区
50 if (processPendingCommandAndInputBuffer(c) == C_ERR) {
51 // 如果处理命令失败,跳过该客户端
52 continue;
53 }
54 }
55 // 如果客户端有未处理的回复数据,就把客户端放到待写队列中
56 if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
57 putClientInPendingWriteQueue(c);
58
59 // 如果客户端必须在主线程处理,就把客户端保留在主线程中
60 /*CLIENT_CLOSE_ASAP | CLIENT_MASTER | CLIENT_SLAVE |
61 CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_BLOCKED |
62 CLIENT_UNBLOCKED | CLIENT_TRACKING | CLIENT_LUA_DEBUG |
63 CLIENT_LUA_DEBUG_SYNC)*/
64 // 这些类型的客户端必须在主线程处理
65 if (isClientMustHandledByMainThread(c)) {
66 keepClientInMainThread(c);
67 continue;
68 }
69
70 // 把client从主线程待处理列表中移除(有些client可能没有回复数据需要写)
71 if (c->flags & CLIENT_PENDING_WRITE) {
72 c->flags &= ~CLIENT_PENDING_WRITE;
73 listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
74 }
75 //到这,主线程对这个client的处理就完成了,把client发送回对应的IO线程继续处理
76 c->running_tid = c->tid;
77 listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node);
78 node = NULL;
79
80 // 主线程处理了一定数量的客户端,就把这些客户端发送回对应的 IO 线程继续处理
81 sendPendingClientsToIOThreadIfNeeded(t, 1);
82 }
83 if (node) zfree(node);
84
85 // 处理完所有客户端后,再把剩余的客户端发送回对应的 IO 线程继续处理
86 sendPendingClientsToIOThreadIfNeeded(t, 0);
87
88 return processed;
89}
processPendingCommandAndInputBuffer 函数 主要是处理客户端的挂起命令和读取缓冲区。
1int processPendingCommandAndInputBuffer(client *c) {
2 // 如果当前client的数据已经准备好,可以处理挂起的命令
3 if (c->flags & CLIENT_PENDING_COMMAND) {
4 c->flags &= ~CLIENT_PENDING_COMMAND;
5 if (processCommandAndResetClient(c) == C_ERR) {
6 return C_ERR;
7 }
8 }
9
10 // 继续从网络中读取数据,解析 resp 协议
11 if (c->querybuf && sdslen(c->querybuf) > 0) {
12 return processInputBuffer(c);
13 }
14 return C_OK;
15}
processCommandAndResetClient 函数 主要是处理客户端的命令。最后会调用 processCommand 函数来处理这个 client 的命令。最终会调用到 client 的 proc 函数来执行具体的命令处理函数。
在这个函数中,主线程会处理从 IO 线程读取完数据的客户端。主要做了以下几件事:
- 把从 IO 线程读取完数据的客户端搬到主线程自己的待处理列表中。
- 遍历所有待处理客户端,处理每个客户端的命令。
- 如果客户端被标记为关闭,就释放客户端。
- 处理定时任务。
- 处理挂起的命令和读取缓冲区。
- 如果客户端有未处理的回复数据,就把客户端放到待写队列中。
- 如果客户端必须在主线程处理,就把客户端保留在主线程中。
- 把客户端发送回对应的 IO 线程继续处理。
到这里,主线程逻辑的工作就完成了,主线程已经处理完从 IO 线程读取完数据的客户端,并把这些客户端发送回对应的 IO 线程继续处理了。
数据读写
数据读取
先来看一下数据读取的大概流程图:
连接 (conn) 被分配到 IO 线程后,最终会把 conn 放到 IO线程的 el 中管理,并设置读处理函数为 readQueryFromClient。
当有数据可读时,IO 线程的事件循环会调用 readQueryFromClient 函数来读取数据。
1void readQueryFromClient(connection *conn) {
2 client *c = connGetPrivateData(conn);
3 ..................
4 // 最终会调用到下面的代码 从 网络中读取数据
5 nread = connRead(c->conn, c->querybuf+qblen, readlen);
6 if (nread == -1) {
7 if (connGetState(conn) == CONN_STATE_CONNECTED) {
8 goto done;
9 } else {
10 c->read_error = CLIENT_READ_CONN_DISCONNECTED;
11 freeClientAsync(c);
12 goto done;
13 }
14 } else if (nread == 0) {
15 c->read_error = CLIENT_READ_CONN_CLOSED;
16 freeClientAsync(c);
17 goto done;
18 }
19
20 ....................
21 // 读取到数据后,开始解析 resp 协议,最终会调用 enqueuePendingClientsToMainThread 函数
22 // 把当前 client 放到主线程待处理列表中
23 if (processInputBuffer(c) == C_ERR)
24 c = NULL;
25
26 ...................
27}
在这个函数中,IO 线程会从网络中读取数据,并把数据放到客户端的 querybuf 中。然后调用 processInputBuffer 函数来解析 resp 协议。最终会调用 enqueuePendingClientsToMainThread 函数,然后调用 sendPendingClientsToMainThreadIfNeeded 函数通知主线程有新客户端可处理。
1static inline void sendPendingClientsToMainThreadIfNeeded(IOThread *t, int check_size) {
2 size_t len = listLength(t->pending_clients_to_main_thread);
3 if (len == 0 || (check_size && len < IO_THREAD_MAX_PENDING_CLIENTS)) return;
4
5 int running = 0, pending = 0;
6 pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
7 pending = listLength(mainThreadPendingClients[t->id]);
8 // 把待处理的客户端搬到主线程的待处理 list 中
9 listJoin(mainThreadPendingClients[t->id], t->pending_clients_to_main_thread);
10 pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
11 if (!pending) atomicGetWithSync(server.running, running);
12
13 // 如果主线程没有在运行,并且没有待处理的客户端,就触发通知器,唤醒主线程处理
14 if (!running && !pending) {
15 triggerEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
16 }
17}
到这,数据读取的工作就完成了,IO 线程已经从网络中读取数据,并把客户端发送到主线程进行处理了。
数据写入
数据写入的大概流程图:
IO 线程的 el 收到主线程发来的通知 (使用 pending_clients_notifier) ,会调用 handleClientsFromMainThread 然后会调用 processClientsFromMainThread 函数来处理主线程分配给它的待处理客户端。
这个函数在 IO 线程逻辑那里已经分析过了,就不完全展开了
1int processClientsFromMainThread(IOThread *t) {
2 pthread_mutex_lock(&t->pending_clients_mutex);
3 // 把主线程分配给 IO 线程的待处理客户端搬到 IO 线程自己的待处理列表中
4 listJoin(t->processing_clients, t->pending_clients);
5 pthread_mutex_unlock(&t->pending_clients_mutex);
6 size_t processed = listLength(t->processing_clients);
7 if (processed == 0) return 0;
8 while((ln = listNext(&li))) {
9 client *c = listNodeValue(ln);
10 ..............
11 // 把客户端从待处理列表中移除,加入到 IO 线程管理的客户端列表中
12 listUnlinkNode(t->processing_clients, ln);
13 listLinkNodeTail(t->clients, ln);
14 c->io_thread_client_list_node = listLast(t->clients);
15 // 如果客户端被标记为关闭,就把客户端发送回主线程处理
16 if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
17 enqueuePendingClientsToMainThread(c, 1);
18 continue;
19 }
20
21 // 如果有需要写入的数据,就写数据给客户端
22 if (clientHasPendingReplies(c)) {
23 //writeToClient函数负责把回复数据写入到网络中
24 writeToClient(c, 0);
25 }
26 }
27
28 return processed;
29}
除了 el 中的 fileProc 处理函数外,还会在 IOThreadBeforeSleep 函数中调用 processClientsFromMainThread 来处理
因为主线程可能会在 IO 线程处理事件期间传递客户端而不发出通知,所以在 IO 线程进入睡眠之前再调用一次 processClientsFromMainThread 确保不会遗漏任何待处理的客户端。
总结
洋洋洒洒写了这么多,应该是能把 redis 多线程 IO 模型的大概的框架理清楚了。
相比之前的版本(6.0版本),这次更新主要是把网络 IO 操作完全放到了 IO 线程中去处理。并且主线程不会等待所有 IO 线程处理完数据后再继续处理,而是通过通知器的方式,让主线程和 IO 线程并行处理数据,从而提高了整体的吞吐量。这对于 IO 密集型的应用场景来说,提升还是比较明显的。
这其中,非常巧妙的使用了事件通知器(eventNotifier)来实现主线程和 IO 线程之间的通信,从而避免了复杂的锁机制,提高了并发性能。
如果 C 语言中有更高级的并发编程模型(比如 Golang 的协程和通道),实现起来会更加简洁和清晰。使用 C 语言实现多线程 IO 模型,代码会比较复杂,需要手动管理线程和同步机制。尤其是大量使用了函数指针等待回调函数,第一次阅读源码时,理解起来会比较吃力。不过通过这次分析,还是对 redis 的多线程 IO 模型有了更深入的理解。
这次只是简单的学习了一下大概的框架,其中的一些细节和优化手段没有展开分析,比如命令预取机制、IO 线程的暂停和恢复机制等,后续有时间再深入研究一下。

