去年写过两篇redis的源码分析文章(redis的watch和ACL),现在回头看,已经过去好久了。说起那两篇文章,收益还是挺大的。这周有时间了,继续学习redis的源码。先来看一下redis的网络处理。
本次的源码基于redis的 7.2 分支,不同版本下,一些细节可能会有差异。
这篇文章因为篇幅所限,没法把redis网络读写的所有细节都分析到,只能把这个网络处理的大概流程走一遍。
看完这篇文章,能对redis网络处理流程有个基本了解,能知道redis加入了多线程后,为什么还是单线程处理数据。能知道redis是如何实现适配不同系统下网络接口。
先来写一下redis网络需要的一些数据结构。
数据结构:
- redis可以支持多个平台(Linux,Unix),这两个平台的网络调用是不一样的,Linux中高性能的网络接口是epoll,Unix上是kqueue。因为版本的原因,在低版本的Linux内核中,epool也是不支持的,这时候就需要用到select。redis使用了 aeEvent 这个结构来封装了网络,实现了对不同网络接口的兼容。
结构的定义在 ae.h
文件中 aeEventLoop
aeEventLoop 是redis网络中,所有网络事件的管理器。我们知道,现在的网络一般都是通过多路复用来处理多个链接,多路复用的实现在不同的系统中有不同的实现,所有redis使用 aeEventLoop
的封装来实现不同平台的兼容。这个结构是全局唯一的,redis启动后完成初始化。
1typedef struct aeEventLoop {
2 int maxfd; /* highest file descriptor currently registered */
3 int setsize; /* max number of file descriptors tracked */
4 long long timeEventNextId;
5 aeFileEvent *events; /* Registered events */
6 aeFiredEvent *fired; /* Fired events */
7 aeTimeEvent *timeEventHead;
8 int stop;
9 void *apidata; /* This is used for polling API specific data */
10 aeBeforeSleepProc *beforesleep;
11 aeBeforeSleepProc *aftersleep;
12 int flags;
13} aeEventLoop;
aeFileEvent
和 aeFiredEvent
是处理网络事件的数组结构,aeTimeEvent
是处理redis定时回调的结构数组。
aeFileEvent
是注册到系统中的事件,在 epoll 中就是struct epoll_event
结构。aeFiredEvent
是这次要处理的事件。在 epoll 中就是 通过epoll_wait
返回的事件。timeEventHead
是定时器要处理的事件。
aeFileEvent 是redis中,多路复用的事件封装。每一个网络链接都会被包装成一个 aeFileEvent
对象,(因为在 Unix系列的系统中,万物皆是文件嘛,所有这里也就叫 aeFileEvent
了),然后加入到 aeEventLoop
中,最后交给操作系统中的多路复用管理。当被管理的事件被触发后,会回调对应的函数,也就是 rfileProc
或者 wfileProc
函数指针,来完成操作。
1typedef struct aeFileEvent {
2 int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
3 aeFileProc *rfileProc;
4 aeFileProc *wfileProc;
5 void *clientData;
6} aeFileEvent;
网络处理的实现在 ae.c
中
在ae.c
中,通过宏定义,来实现对不同平台的兼容
1#ifdef HAVE_EVPORT
2#include "ae_evport.c"
3#else
4 #ifdef HAVE_EPOLL
5 #include "ae_epoll.c"
6 #else
7 #ifdef HAVE_KQUEUE
8 #include "ae_kqueue.c"
9 #else
10 #include "ae_select.c"
11 #endif
12 #endif
13#endif
ae.c#L52-L65 通过判断宏定义,然后include不同的 .c文件。
这里可能有对C语言了解不深的同学可能会疑问,include
关键字不是用来包含头文件的吗,还能用来引入 .c文件吗?其实include是编译器的一个预处理指令,就是简单的把对应的文件里的所有内容,放到include的地方而已。
打开 ae_evport.c
ae_epoll.c
ae_kqueue.c
ae_select.c
这四个文件来看,对外暴露的函数接口都是一样的。只是在不同的平台上内部的处理不同。
对外暴露了这些函数,来完成网络的处理
1//创建一个loop
2static int aeApiCreate(aeEventLoop *eventLoop)
3//设置loop的大小
4static int aeApiResize(aeEventLoop *eventLoop, int setsize)
5//释放一个 loop
6static void aeApiFree(aeEventLoop *eventLoop)
7//添加一个 eventLoop
8static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
9//删除一个 eventLoop
10static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
11//不同的网络处理中,返回需要处理的事件
12static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
13//返回这个ae的名字,在epoll中就是返回 "epoll"
14static char *aeApiName(void)
这里我们以Linux平台为主,所以使用的是 ae_epoll.c
文件中的内容。这里先不展开分析每一个函数,因为这不是本次的重点。
启动网络服务
基本的数据结构介绍完,下面开始启动网络服务
监听client
第一步是调用 aeEventLoop *aeCreateEventLoop(int setsize)
函数,创建 aeEventLoop
这个对象。
1aeEventLoop *aeCreateEventLoop(int setsize) {
2 aeEventLoop *eventLoop;
3 int i;
4
5 monotonicInit(); /* just in case the calling app didn't initialize */
6
7 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
8 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
9 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
10 if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
11 eventLoop->setsize = setsize;
12 eventLoop->timeEventHead = NULL;
13 eventLoop->timeEventNextId = 0;
14 eventLoop->stop = 0;
15 eventLoop->maxfd = -1;
16 eventLoop->beforesleep = NULL;
17 eventLoop->aftersleep = NULL;
18 eventLoop->flags = 0;
19 if (aeApiCreate(eventLoop) == -1) goto err;
20 /* Events with mask == AE_NONE are not set. So let's initialize the
21 * vector with it. */
22 for (i = 0; i < setsize; i++)
23 eventLoop->events[i].mask = AE_NONE;
24 return eventLoop;
25
26err:
27 if (eventLoop) {
28 zfree(eventLoop->events);
29 zfree(eventLoop->fired);
30 zfree(eventLoop);
31 }
32 return NULL;
33}
这里的处理逻辑比较简单,就是初始化 aeEventLoop
中的数据结构。如果中间出错了,就返回 NULL
这个在 void initServer(void)
函数中被调用。
aeCreateEventLoop
创建一个aeEventLoop
并赋值给 server
。再往上翻,最终的调用位置是 main
函数中的initServer();
在往下,还有一个比较重要的函数,通过调用 aeCreateTimeEvent 函数,完成定时器 对应的 eventLoop
的创建。
1long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
2 aeTimeProc *proc, void *clientData,
3 aeEventFinalizerProc *finalizerProc)
4{
5 long long id = eventLoop->timeEventNextId++;
6 aeTimeEvent *te;
7
8 te = zmalloc(sizeof(*te));
9 if (te == NULL) return AE_ERR;
10 te->id = id;
11 te->when = getMonotonicUs() + milliseconds * 1000;
12 te->timeProc = proc;
13 te->finalizerProc = finalizerProc;
14 te->clientData = clientData;
15 te->prev = NULL;
16 te->next = eventLoop->timeEventHead;
17 te->refcount = 0;
18 if (te->next)
19 te->next->prev = te;
20 eventLoop->timeEventHead = te;
21 return id;
22}
都是常规的处理,注意一下传递进来的几个参数
eventLoop
是之前创建的 eventLoop对象proc
是定时器触发时的回调函数milliseconds
定时器触发的时间间隔
proc 函数的定义
这里不详细展开了,就是redis中的一些定时操作。
在 main
函数中继续往下,会看到 initListeners();
这个函数,中这里也能知道,这里创建 网络的监听
1void initListeners(void) {
2 /* Setup listeners from server config for TCP/TLS/Unix */
3 int conn_index;
4 connListener *listener;
5 //判断是否需要监听普通的TCP
6 if (server.port != 0) {
7 conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
8 if (conn_index < 0)
9 serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
10 listener = &server.listeners[conn_index];
11 listener->bindaddr = server.bindaddr;
12 listener->bindaddr_count = server.bindaddr_count;
13 listener->port = server.port;
14 listener->ct = connectionByType(CONN_TYPE_SOCKET);
15 }
16 //判断是否需要监听,如果需要,检查tls需要的配置是否正确
17 if (server.tls_port || server.tls_replication || server.tls_cluster) {
18 ConnectionType *ct_tls = connectionTypeTls();
19 if (!ct_tls) {
20 serverLog(LL_WARNING, "Failed finding TLS support.");
21 exit(1);
22 }
23 if (connTypeConfigure(ct_tls, &server.tls_ctx_config, 1) == C_ERR) {
24 serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
25 exit(1);
26 }
27 }
28 //判断是否需要监听 TLS的TCP
29 if (server.tls_port != 0) {
30 conn_index = connectionIndexByType(CONN_TYPE_TLS);
31 if (conn_index < 0)
32 serverPanic("Failed finding connection listener of %s", CONN_TYPE_TLS);
33 listener = &server.listeners[conn_index];
34 listener->bindaddr = server.bindaddr;
35 listener->bindaddr_count = server.bindaddr_count;
36 listener->port = server.tls_port;
37 listener->ct = connectionByType(CONN_TYPE_TLS);
38 }
39 //判断是否需要unixsocket
40 if (server.unixsocket != NULL) {
41 conn_index = connectionIndexByType(CONN_TYPE_UNIX);
42 if (conn_index < 0)
43 serverPanic("Failed finding connection listener of %s", CONN_TYPE_UNIX);
44 listener = &server.listeners[conn_index];
45 listener->bindaddr = &server.unixsocket;
46 listener->bindaddr_count = 1;
47 listener->ct = connectionByType(CONN_TYPE_UNIX);
48 listener->priv = &server.unixsocketperm; /* Unix socket specified */
49 }
50
51 /* create all the configured listener, and add handler to start to accept */
52 int listen_fds = 0;
53 for (int j = 0; j < CONN_TYPE_MAX; j++) {
54 listener = &server.listeners[j];
55 if (listener->ct == NULL)
56 continue;
57
58 if (connListen(listener) == C_ERR) {
59 serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
60 exit(1);
61 }
62 //创建新的连接处理事件
63 if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
64 serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
65
66 listen_fds += listener->count;
67 }
68
69 if (listen_fds == 0) {
70 serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
71 exit(1);
72 }
73}
在这个函数里,根据配置文件,开始监听对应的端口或者 unixsockt。
这里引入了 connListener
connection
ConnectionType
这三个结构
connListener
是对网络监听的封装connection
是对网络连接的封装ConnectionType
封装了操作网络需要的函数
ConnectionType
有三个不同的实现,分别是:
ConnectionType内部的函数这里就不展开详细介绍了。
在 initListeners
中调用了 connListen
函数,来完成对不同网络的监听,这里实现也挺简单的,就是调用了ConnectionType
中的 listen
函数,完成了对端口或者unixsocket的监听。
1static inline int connListen(connListener *listener) {
2 return listener->ct->listen(listener);
3}
以 socket
为例,对应的listen函数就是 connSocketListen
对应的初始化在这里.listen = connSocketListen
最终实现对端口监听的函数是 listenToPort(connListener *sfd)
话题收回来,回到 initListeners
函数中 调用 connListen
函数完成监听后,会再调用createSocketAcceptHandler
函数 完成网络的 Accept
处理逻辑。
1int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
2 int j;
3
4 for (j = 0; j < sfd->count; j++) {
5 if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
6 /* Rollback */
7 for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
8 return C_ERR;
9 }
10 }
11 return C_OK;
12}
这里逻辑也挺简单,遍历之前accept的 fd
,将fd加入到 aeEventLoop
中,如果失败了,就回滚所有数据,也就是中 aeEventLoop
中删除对应的事件。
aeCreateFileEvent
的实现
1int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
2 aeFileProc *proc, void *clientData)
3{
4 if (fd >= eventLoop->setsize) {
5 errno = ERANGE;
6 return AE_ERR;
7 }
8 aeFileEvent *fe = &eventLoop->events[fd];
9 //调用对应平台的系统函数,把对应的事件加入到操作系统中
10 //比如在epoll中就是调用 epoll_ctl 函数把 事件加入 epoll中
11 if (aeApiAddEvent(eventLoop, fd, mask) == -1)
12 return AE_ERR;
13 fe->mask |= mask;
14 if (mask & AE_READABLE) fe->rfileProc = proc;
15 if (mask & AE_WRITABLE) fe->wfileProc = proc;
16 fe->clientData = clientData;
17 if (fd > eventLoop->maxfd)
18 eventLoop->maxfd = fd;
19 return AE_OK;
20}
也是常规的处理逻辑,这里需要注意一下,传递的几个参数。
aeEventLoop
就是之前通过aeCreateEventLoop
函数创建的对象。fd
是对应的文件描述符mask
对应的这个event 需要处理的事件类型,比如 读,写等等proc
这个是一个回调函数,当对应的事件触发时,使用这个函数来处理。比如这里就是当fd
有读写事件触发时,就会调用这个函数,也就是当有新的客户端连接时,就通过这个函数来处理clientData
是这个event的数据
这里的 proc
是socket.c
文件中的 connSocketAcceptHandler 函数。这个函数后面网络读写在分析。到这里,redis对client的网络监听基本完成了。
监听cluster
如果redis开启了集群,需要对集群中其他的redis做网络监听,与其他的redis互联
在这里调用 clusterInitListeners 开启对 cluster
的监听。
1void clusterInitListeners(void) {
2 if (connectionIndexByType(connTypeOfCluster()->get_type(NULL)) < 0) {
3 serverLog(LL_WARNING, "Missing connection type %s, but it is required for the Cluster bus.", connTypeOfCluster()->get_type(NULL));
4 exit(1);
5 }
6
7 int port = defaultClientPort();
8 connListener *listener = &server.clistener;
9 listener->count = 0;
10 listener->bindaddr = server.bindaddr;
11 listener->bindaddr_count = server.bindaddr_count;
12 listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
13 listener->ct = connTypeOfCluster();
14 if (connListen(listener) == C_ERR ) {
15 /* Note: the following log text is matched by the test suite. */
16 serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port);
17 exit(1);
18 }
19
20 if (createSocketAcceptHandler(&server.clistener, clusterAcceptHandler) != C_OK) {
21 serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
22 }
23}
这里的逻辑和client那部分类似,不同的地方是 aeFileEvent
中的回调函数变成了 clusterAcceptHandler 这里不展开了。
然后会调用 InitServerLast 函数
这里注意关注 initThreadedIO 这个函数
redis 在6.0 版本加入了多线程支持,其实redis的多线程只是在 网络 I/O 加入了多线程处理,在处理命令时还是单线程,这里就是做了多线程的处理。
1void initThreadedIO(void) {
2 server.io_threads_active = 0; /* We start with threads not active. */
3
4 /* Indicate that io-threads are currently idle */
5 io_threads_op = IO_THREADS_OP_IDLE;
6
7 //如果就一个线程,也没必要开新的线程了
8 if (server.io_threads_num == 1) return;
9
10 //判断一下,如果线程太多了,就提示错误,不能开启太多线程
11 if (server.io_threads_num > IO_THREADS_MAX_NUM) {
12 serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
13 "The maximum number is %d.", IO_THREADS_MAX_NUM);
14 exit(1);
15 }
16
17 /* Spawn and initialize the I/O threads. */
18 for (int i = 0; i < server.io_threads_num; i++) {
19 /* Things we do for all the threads including the main thread. */
20 io_threads_list[i] = listCreate();
21 if (i == 0) continue; /* Thread 0 is the main thread. */
22
23 /* Things we do only for the additional threads. */
24 pthread_t tid;
25 pthread_mutex_init(&io_threads_mutex[i],NULL);
26 setIOPendingCount(i, 0);
27 pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
28 if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
29 serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
30 exit(1);
31 }
32 io_threads[i] = tid;
33 }
34}
使用 pthread_create
创建线程线程,IOThreadMain 是对应线程的处理函数。
这个函数先不展开,等后面处理网络读写的时候,会提到。
回到 main
函数里,继续往下走,最后会调用 aeMain 来阻塞当前线程,然后处理网络。
1void aeMain(aeEventLoop *eventLoop) {
2 eventLoop->stop = 0;
3 while (!eventLoop->stop) {
4 aeProcessEvents(eventLoop, AE_ALL_EVENTS|
5 AE_CALL_BEFORE_SLEEP|
6 AE_CALL_AFTER_SLEEP);
7 }
8}
aeProcessEvents 中处理之前 添加的 aeEvent
事件回调
1int aeProcessEvents(aeEventLoop *eventLoop, int flags)
2{
3 int processed = 0, numevents;
4
5 /* Nothing to do? return ASAP */
6 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
7
8 if (eventLoop->maxfd != -1 ||
9 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
10 int j;
11 struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
12 int64_t usUntilTimer;
13 //检查beforesleep函数指针,如果有对应的函数,调用这个函数
14 if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
15 eventLoop->beforesleep(eventLoop);
16
17 if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
18 tv.tv_sec = tv.tv_usec = 0;
19 tvp = &tv;
20 } else if (flags & AE_TIME_EVENTS) {
21 usUntilTimer = usUntilEarliestTimer(eventLoop);
22 if (usUntilTimer >= 0) {
23 tv.tv_sec = usUntilTimer / 1000000;
24 tv.tv_usec = usUntilTimer % 1000000;
25 tvp = &tv;
26 }
27 }
28
29 numevents = aeApiPoll(eventLoop, tvp);
30
31 /* Don't process file events if not requested. */
32 if (!(flags & AE_FILE_EVENTS)) {
33 numevents = 0;
34 }
35
36 //检查aftersleep函数指针,如果有对应的函数,调用这个函数
37 if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
38 eventLoop->aftersleep(eventLoop);
39 //检查就绪的事件,然后做对应的处理
40 for (j = 0; j < numevents; j++) {
41 int fd = eventLoop->fired[j].fd;
42 aeFileEvent *fe = &eventLoop->events[fd];
43 int mask = eventLoop->fired[j].mask;
44 int fired = 0; /* Number of events fired for current fd. */
45
46 int invert = fe->mask & AE_BARRIER;
47 //需要翻转的话,先处理读的回调函数
48 if (!invert && fe->mask & mask & AE_READABLE) {
49 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
50 fired++;
51 fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
52 }
53
54 //如果有写事件,做回调
55 if (fe->mask & mask & AE_WRITABLE) {
56 if (!fired || fe->wfileProc != fe->rfileProc) {
57 fe->wfileProc(eventLoop,fd,fe->clientData,mask);
58 fired++;
59 }
60 }
61
62 if (invert) {//如果之前没有处理读事件
63 fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
64 if ((fe->mask & mask & AE_READABLE) &&
65 (!fired || fe->wfileProc != fe->rfileProc))
66 {//读回调函数指针和写回调函数指针不是同一个, 并且有读事件,回调读函数
67 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
68 fired++;
69 }
70 }
71
72 processed++;
73 }
74 }
75 //如果有时间回调事件,回调时间的回调函数
76 if (flags & AE_TIME_EVENTS)
77 processed += processTimeEvents(eventLoop);
78
79 return processed; /* return the number of processed file/time events */
80}
beforeSleep 函数
afterSleep 函数
这两个函数内容非常多,很多和网络是无关的,不展开分析,后面有用的再回来介绍。
aeApiPoll
函数在不同的平台上有不同的实现,最终的目的是等待系统的回调事件。在 epoll
中是等待 epoll_wait
的回调。
网络读写处理
Listener 回调
之前在添加 listener
的时候,添加的是 AE_READABLE
事件, aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd)
所以,accept 的回调会在 可读的回调函数中。
accept_handler
指针是 connSocketAcceptHandler 函数
1static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2 int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
3 char cip[NET_IP_STR_LEN];
4 UNUSED(el);
5 UNUSED(mask);
6 UNUSED(privdata);
7
8 while(max--) {
9 //把当前的 accept 的 fd 转换成一个TCP连接
10 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
11 if (cfd == ANET_ERR) {
12 if (errno != EWOULDBLOCK)
13 serverLog(LL_WARNING,
14 "Accepting client connection: %s", server.neterr);
15 return;
16 }
17 serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
18 acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
19 }
20}
在 anetTcpAccept
中 调用 anetGenericAccept 函数, 在函数中
1#ifdef HAVE_ACCEPT4
2 fd = accept4(s, sa, len, SOCK_NONBLOCK | SOCK_CLOEXEC);
3#else
4 fd = accept(s,sa,len);
5#endif
最终通过 accept4
或者 accept
系统调用,接受一个新的TCP链接。
最后调用 acceptCommonHandler
函数, 这个函数里会调用 createClient(conn) 来创建一个 client
在 createClient
中,关注这段函数
1 client *c = zmalloc(sizeof(client));
2 if (conn) {
3 connEnableTcpNoDelay(conn);
4 if (server.tcpkeepalive)
5 connKeepAlive(conn,server.tcpkeepalive);
6 //把当前的conn加入到 eventPool中,并且把 readQueryFromClient 设置为回调函数
7 connSetReadHandler(conn, readQueryFromClient);
8 connSetPrivateData(conn, c);
9 }
10 //初始化client的buffer
11 c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
12 selectDb(c,0);
13 uint64_t client_id;
connEnableTcpNoDelay
最终会调用 anet.c中的
anetSetTcpNoDelay 函数,来完成对TCP连接的设置。
connKeepAlive
最终调用到 anet.c中的
anetKeepAlive 对 KeepAlive 的设置
connSetReadHandler
通过这个函数,完成把 该TCP连接的 fd 加入的 eventLoop
中进行管理。
1static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
2 return conn->type->set_read_handler(conn, func);
3}
通过 set_read_handler
这个函数指针调用到 socket.c
中的 connSocketSetReadHandler 函数
1static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
2 if (func == conn->read_handler) return C_OK;
3
4 conn->read_handler = func;
5 if (!conn->read_handler)
6 aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
7 else
8 if (aeCreateFileEvent(server.el,conn->fd,
9 AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
10 return C_OK;
11}
调用 aeCreateFileEvent
把当前的 conn
加入到 server.el
也就是启动redis时,创建的 aeEventPool
中。
回到 createClient
函数中,connSetReadHandler(conn, readQueryFromClient);
第二参数传递的是 readQueryFromClient 这个函数的指针。这个函数 会赋值给 aeFileEvent
的 rfileProc
函数指针中。后续,如果这个 conn
有可读事件回调时,会使用 readQueryFromClient
这个函数来处理。
到这里,一个新TCP连接的处理基本就完成了。
处理client读写
读处理
当redis-server 收到client发来的数据后,也是通过 aeEventPool
来回调通知对应的 client来处理。
上面我们说过了,client收到读事件的回调函数是 readQueryFromClient
,当client对应的网络有可读事件触发时,会回调这个函数。
1void readQueryFromClient(connection *conn) {
2 client *c = connGetPrivateData(conn);
3 int nread, big_arg = 0;
4 size_t qblen, readlen;
5 //判断一下,是否开启了IO线程,如果开启了那么会从IO线程读取数据
6 if (postponeClientRead(c)) return;
7
8 //读取次数 原子性的 +1
9 atomicIncr(server.stat_total_reads_processed, 1);
10
11 readlen = PROTO_IOBUF_LEN;
12 //如果是一个multi请求,那么调整一下缓冲区的大小
13 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
14 && c->bulklen >= PROTO_MBULK_BIG_ARG)
15 {
16 ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
17 big_arg = 1;
18
19 if (remaining > 0) readlen = remaining;
20
21 if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
22 readlen = PROTO_IOBUF_LEN;
23 }
24
25 qblen = sdslen(c->querybuf);
26 if (!(c->flags & CLIENT_MASTER) &&
27 (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
28 c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
29 if (c->querybuf_peak < qblen + readlen) c->querybuf_peak = qblen + readlen;
30 } else {
31 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
32
33 readlen = sdsavail(c->querybuf);
34 }
35
36 //从网络中读取数据,如果是 socket,使用 connSocketRead 函数
37 nread = connRead(c->conn, c->querybuf+qblen, readlen);
38 if (nread == -1) {
39 if (connGetState(conn) == CONN_STATE_CONNECTED) {
40 return;
41 } else {
42 serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
43 freeClientAsync(c);
44 goto done;
45 }
46 } else if (nread == 0) {
47 if (server.verbosity <= LL_VERBOSE) {
48 sds info = catClientInfoString(sdsempty(), c);
49 serverLog(LL_VERBOSE, "Client closed connection %s", info);
50 sdsfree(info);
51 }
52 freeClientAsync(c);
53 goto done;
54 }
55
56 sdsIncrLen(c->querybuf,nread);
57 qblen = sdslen(c->querybuf);
58 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
59
60 c->lastinteraction = server.unixtime;
61 if (c->flags & CLIENT_MASTER) {
62 c->read_reploff += nread;
63 atomicIncr(server.stat_net_repl_input_bytes, nread);
64 } else {
65 atomicIncr(server.stat_net_input_bytes, nread);
66 }
67
68 if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
69 sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
70
71 bytes = sdscatrepr(bytes,c->querybuf,64);
72 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
73 sdsfree(ci);
74 sdsfree(bytes);
75 freeClientAsync(c);
76 goto done;
77 }
78 //处理读到的数据,尝试解析RESP协议
79 if (processInputBuffer(c) == C_ERR)
80 c = NULL;
81
82done:
83 beforeNextClient(c);
84}
在这个函数中判断是否需要使用多线程。
1int postponeClientRead(client *c) {
2 //在这里判断是否需要使用多线程从网络读取数据
3 if (server.io_threads_active &&
4 server.io_threads_do_reads &&
5 !ProcessingEventsWhileBlocked &&
6 !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
7 io_threads_op == IO_THREADS_OP_IDLE)
8 {
9 //如果需要多线程读取,把当前client加入到`clients_pending_read`中,等待后续调用
10 listAddNodeHead(server.clients_pending_read,c);
11 c->pending_read_list_node = listFirst(server.clients_pending_read);
12 return 1;
13 } else {
14 return 0;
15 }
16}
在配置文件中
io-threads
IO线程数量,默认只有一个,也就是不用多线程读写网络io-threads-do-reads
是否开启多线程读和解析RESP功能,如果不开启,那么多线程只会处理写
如果只有一个client就绪(也就是没有并发请求)时,即使开了多线程和IO线程读,也不会使用多线程去读网络,只有当多个client同时读写数据,这时候会在 handleClientsWithPendingReadsUsingThreads
中开启 多线程处理。
1if (!server.io_threads_active) startThreadedIO();
此时,当client读事件就绪后,再次回调 readQueryFromClient
函数后,会进入 listAddNodeHead(server.clients_pending_read,c);
加入链表,等待IO线程处理
如果需要多线程
在主线程里调用 使用 aeEventPool
的 beforesleep
函数指针调用 beforeSleep 函数来执行 sleep前的处理函数,然后在调用 handleClientsWithPendingReadsUsingThreads 函数来判断是否需要使用多线程来读数据。
最后的读取是由 readQueryFromClient 函数来完成数据的读取。
readQueryFromClient
函数可以通过 handleClientsWithPendingReadsUsingThreads
函数在主线程调用,也可以通过 IOThreadMain 函数通过IO线程并发读取。
1//启动IO多线程读取数据
2io_threads_op = IO_THREADS_OP_READ;
3for (int j = 1; j < server.io_threads_num; j++) {
4 int count = listLength(io_threads_list[j]);
5 setIOPendingCount(j, count);
6}
7.......
8//等待IO线程读取完成
9while(1) {
10 unsigned long pending = 0;
11 for (int j = 1; j < server.io_threads_num; j++)
12 pending += getIOPendingCount(j);
13 if (pending == 0) break;
14}
当执行完 beforesleep
函数后,再由 aeFileEvent
的 rfileProc
函数指针调用 processInputBuffer 函数内主要是判断和解析字符串,就不展开分析了
主要关注这部分逻辑
1 if (c->reqtype == PROTO_REQ_INLINE) {
2 //如果是一行的字符串,解析方式
3 if (processInlineBuffer(c) != C_OK) break;
4 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
5 //如果是多行的,解析方式
6 if (processMultibulkBuffer(c) != C_OK) break;
7 } else {
8 serverPanic("Unknown request type");
9 }
10
11 //如果解析到的参数个数是0,那么说明还没有解析完成,重置client
12 if (c->argc == 0) {
13 resetClient(c);
14 } else {
15 //判断一下是否在IO线程里,如果是的话,设置一下flag,然后结束.
16 //因为redis的多线程只能用来读写网络,操作redis的数据库还是用单线的
17 if (io_threads_op != IO_THREADS_OP_IDLE) {
18 serverAssert(io_threads_op == IO_THREADS_OP_READ);
19 c->flags |= CLIENT_PENDING_COMMAND;
20 break;
21 }
22
23 //开始执行命令了
24 if (processCommandAndResetClient(c) == C_ERR) {
25 return C_ERR;
26 }
27 }
最终会调用 processCommand 函数执行命令
执行命令的逻辑先不管,这不是本文的重点。
写处理
当命令处理完成后,把有需要回复客户端的数据写入client的buf中。
最后会调用 writeToClient 函数把 client buf 中的数据写入到网络中,完成数据返回。
writeToClient
有多个回调路径。但是都是由 aeEventpool
中的 aftersleep
函数指针 调用 beforeSleep
函数来完成的。
在 beforeSleep
中调用 handleClientsWithPendingWritesUsingThreads 函数。
这个函数也比较长,就不贴了,只挑一些重点的。这期贴了不少源码,一直看源码,也挺枯燥的。这个函数的大概逻辑和 handleClientsWithPendingReadsUsingThreads
函数的差不多,只不过这里处理的是写网络。
1if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
2 return handleClientsWithPendingWrites();
3}
这里判断一下,如果没有开启多线程或者停了IO线程,就使用 handleClientsWithPendingWrites
函数在主线程处理写逻辑。
handleClientsWithPendingWrites 里的逻辑很简单,遍历 server.clients_pending_write
中的client,然后调用 writeToClient 把数据写入到网络中。
回到 handleClientsWithPendingWritesUsingThreads
函数中,调用 startThreadedIO()
启动 IO线程并发处理写入。
1//在主线程中也会处理写入
2listRewind(io_threads_list[0],&li);
3while((ln = listNext(&li))) {
4 client *c = listNodeValue(ln);
5 writeToClient(c,0);
6}
7listEmpty(io_threads_list[0]);
8//和上面的多线程读一样,这里也会等待所有IO线程任务结束
9while(1) {
10 unsigned long pending = 0;
11 for (int j = 1; j < server.io_threads_num; j++)
12 pending += getIOPendingCount(j);
13 if (pending == 0) break;
14}
到这基本redis的整个网络处理流程大概就写完了。注意,这只是个大概,其中的一些细节还没有深入分析。比如在 aeProcessEvents
函数中,为什么有 beforesleep
和 aftersleep
这两个函数指针,以及调用两个函数中的先后调用顺序。 aeFileEvent
中 rfileProc
和 wfileProc
这两个函数指针的调用顺序,以及调用的函数的逻辑。每一个都可以拿出来单独写一篇。
以后有时间在单独拿出来分析吧。如果有时间再这篇文章补一个流程图。
redis用C写的,C对多态的支持不太好,不像面向对象的语言那样,可以很方便的使用多态。所以在redis的源码里,大量使用了函数指针和回调函数,来间接的实现了多态。这样会在阅读源码的时候增加了很多理解成本。
但是鲁迅先生曾经说过,在断点面前,一切语法都是纸老虎。所以遇到复杂的函数调用逻辑,使用断点去看一下函数的调用关系,堆栈,基本都能解决问题。
也快过年了,提前祝所有读者 新年快乐