0%

redis源码学习|网络

去年写过两篇redis的源码分析文章(redis的watch和ACL),现在回头看,已经过去好久了。说起那两篇文章,收益还是挺大的。这周有时间了,继续学习redis的源码。先来看一下redis的网络处理。

本次的源码基于redis的 7.2 分支,不同版本下,一些细节可能会有差异。

这篇文章因为篇幅所限,没法把redis网络读写的所有细节都分析到,只能把这个网络处理的大概流程走一遍。

看完这篇文章,能对redis网络处理流程有个基本了解,能知道redis加入了多线程后,为什么还是单线程处理数据。能知道redis是如何实现适配不同系统下网络接口。

先来写一下redis网络需要的一些数据结构。

数据结构:

  • redis可以支持多个平台(Linux,Unix),这两个平台的网络调用是不一样的,Linux中高性能的网络接口是epoll,Unix上是kqueue。因为版本的原因,在低版本的Linux内核中,epool也是不支持的,这时候就需要用到select。redis使用了 aeEvent 这个结构来封装了网络,实现了对不同网络接口的兼容。

结构的定义在 ae.h 文件中 aeEventLoop

aeEventLoop 是redis网络中,所有网络事件的管理器。我们知道,现在的网络一般都是通过多路复用来处理多个链接,多路复用的实现在不同的系统中有不同的实现,所有redis使用 aeEventLoop 的封装来实现不同平台的兼容。这个结构是全局唯一的,redis启动后完成初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;

aeFileEventaeFiredEvent 是处理网络事件的数组结构,aeTimeEvent 是处理redis定时回调的结构数组。

  • aeFileEvent 是注册到系统中的事件,在 epoll 中就是 struct epoll_event 结构。
  • aeFiredEvent 是这次要处理的事件。在 epoll 中就是 通过 epoll_wait 返回的事件。
  • timeEventHead 是定时器要处理的事件。

aeFileEvent 是redis中,多路复用的事件封装。每一个网络链接都会被包装成一个 aeFileEvent 对象,(因为在 Unix系列的系统中,万物皆是文件嘛,所有这里也就叫 aeFileEvent了),然后加入到 aeEventLoop 中,最后交给操作系统中的多路复用管理。当被管理的事件被触发后,会回调对应的函数,也就是 rfileProc 或者 wfileProc 函数指针,来完成操作。

1
2
3
4
5
6
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

网络处理的实现在 ae.c

ae.c中,通过宏定义,来实现对不同平台的兼容

1
2
3
4
5
6
7
8
9
10
11
12
13
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

ae.c#L52-L65
通过判断宏定义,然后include不同的 .c文件。

这里可能有对C语言了解不深的同学可能会疑问,include 关键字不是用来包含头文件的吗,还能用来引入 .c文件吗?其实include是编译器的一个预处理指令,就是简单的把对应的文件里的所有内容,放到include的地方而已。

打开 ae_evport.c ae_epoll.c ae_kqueue.c ae_select.c 这四个文件来看,对外暴露的函数接口都是一样的。只是在不同的平台上内部的处理不同。

对外暴露了这些函数,来完成网络的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//创建一个loop
static int aeApiCreate(aeEventLoop *eventLoop)
//设置loop的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
//释放一个 loop
static void aeApiFree(aeEventLoop *eventLoop)
//添加一个 eventLoop
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
//删除一个 eventLoop
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
//不同的网络处理中,返回需要处理的事件
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
//返回这个ae的名字,在epoll中就是返回 "epoll"
static char *aeApiName(void)

这里我们以Linux平台为主,所以使用的是 ae_epoll.c文件中的内容。这里先不展开分析每一个函数,因为这不是本次的重点。

启动网络服务

基本的数据结构介绍完,下面开始启动网络服务

监听client

第一步是调用 aeEventLoop *aeCreateEventLoop(int setsize) 函数,创建 aeEventLoop 这个对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

monotonicInit(); /* just in case the calling app didn't initialize */

if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

这里的处理逻辑比较简单,就是初始化 aeEventLoop 中的数据结构。如果中间出错了,就返回 NULL

这个在 void initServer(void) 函数中被调用。
aeCreateEventLoop

创建一个aeEventLoop 并赋值给 server。再往上翻,最终的调用位置是 main 函数中的initServer();

在往下,还有一个比较重要的函数,通过调用 aeCreateTimeEvent 函数,完成定时器 对应的 eventLoop 的创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;

te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}

都是常规的处理,注意一下传递进来的几个参数

  • eventLoop 是之前创建的 eventLoop对象
  • proc 是定时器触发时的回调函数
  • milliseconds 定时器触发的时间间隔

proc 函数的定义

这里不详细展开了,就是redis中的一些定时操作。

main 函数中继续往下,会看到 initListeners(); 这个函数,中这里也能知道,这里创建 网络的监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void initListeners(void) {
/* Setup listeners from server config for TCP/TLS/Unix */
int conn_index;
connListener *listener;
//判断是否需要监听普通的TCP
if (server.port != 0) {
conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
if (conn_index < 0)
serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
listener = &server.listeners[conn_index];
listener->bindaddr = server.bindaddr;
listener->bindaddr_count = server.bindaddr_count;
listener->port = server.port;
listener->ct = connectionByType(CONN_TYPE_SOCKET);
}
//判断是否需要监听,如果需要,检查tls需要的配置是否正确
if (server.tls_port || server.tls_replication || server.tls_cluster) {
ConnectionType *ct_tls = connectionTypeTls();
if (!ct_tls) {
serverLog(LL_WARNING, "Failed finding TLS support.");
exit(1);
}
if (connTypeConfigure(ct_tls, &server.tls_ctx_config, 1) == C_ERR) {
serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
exit(1);
}
}
//判断是否需要监听 TLS的TCP
if (server.tls_port != 0) {
conn_index = connectionIndexByType(CONN_TYPE_TLS);
if (conn_index < 0)
serverPanic("Failed finding connection listener of %s", CONN_TYPE_TLS);
listener = &server.listeners[conn_index];
listener->bindaddr = server.bindaddr;
listener->bindaddr_count = server.bindaddr_count;
listener->port = server.tls_port;
listener->ct = connectionByType(CONN_TYPE_TLS);
}
//判断是否需要unixsocket
if (server.unixsocket != NULL) {
conn_index = connectionIndexByType(CONN_TYPE_UNIX);
if (conn_index < 0)
serverPanic("Failed finding connection listener of %s", CONN_TYPE_UNIX);
listener = &server.listeners[conn_index];
listener->bindaddr = &server.unixsocket;
listener->bindaddr_count = 1;
listener->ct = connectionByType(CONN_TYPE_UNIX);
listener->priv = &server.unixsocketperm; /* Unix socket specified */
}

/* create all the configured listener, and add handler to start to accept */
int listen_fds = 0;
for (int j = 0; j < CONN_TYPE_MAX; j++) {
listener = &server.listeners[j];
if (listener->ct == NULL)
continue;

if (connListen(listener) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
exit(1);
}
//创建新的连接处理事件
if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));

listen_fds += listener->count;
}

if (listen_fds == 0) {
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
}

在这个函数里,根据配置文件,开始监听对应的端口或者 unixsockt。

这里引入了 connListener connection ConnectionType 这三个结构

  • connListener 是对网络监听的封装
  • connection 是对网络连接的封装
  • ConnectionType 封装了操作网络需要的函数

ConnectionType 有三个不同的实现,分别是:

  • CT_Socket 对应的TCP CT_Socket
  • CT_Unix 对应的是 unix socket CT_Unix
  • CT_TLS 对应的 使用了 TLS的TCP CT_TLS

ConnectionType 定义

ConnectionType内部的函数这里就不展开详细介绍了。

initListeners 中调用了 connListen 函数,来完成对不同网络的监听,这里实现也挺简单的,就是调用了ConnectionType 中的 listen 函数,完成了对端口或者unixsocket的监听。

1
2
3
static inline int connListen(connListener *listener) {
return listener->ct->listen(listener);
}

socket 为例,对应的listen函数就是 connSocketListen 对应的初始化在这里.listen = connSocketListen

最终实现对端口监听的函数是 listenToPort(connListener *sfd)

话题收回来,回到 initListeners 函数中 调用 connListen 函数完成监听后,会再调用createSocketAcceptHandler 函数 完成网络的 Accept 处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
int j;

for (j = 0; j < sfd->count; j++) {
if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
/* Rollback */
for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
return C_ERR;
}
}
return C_OK;
}

这里逻辑也挺简单,遍历之前accept的 fd,将fd加入到 aeEventLoop 中,如果失败了,就回滚所有数据,也就是中 aeEventLoop 中删除对应的事件。

aeCreateFileEvent的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
//调用对应平台的系统函数,把对应的事件加入到操作系统中
//比如在epoll中就是调用 epoll_ctl 函数把 事件加入 epoll中
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

也是常规的处理逻辑,这里需要注意一下,传递的几个参数。

  • aeEventLoop 就是之前通过 aeCreateEventLoop 函数创建的对象。
  • fd 是对应的文件描述符
  • mask 对应的这个event 需要处理的事件类型,比如 读,写等等
  • proc 这个是一个回调函数,当对应的事件触发时,使用这个函数来处理。比如这里就是当fd有读写事件触发时,就会调用这个函数,也就是当有新的客户端连接时,就通过这个函数来处理
  • clientData 是这个event的数据

这里的 procsocket.c文件中的 connSocketAcceptHandler 函数。这个函数后面网络读写在分析。到这里,redis对client的网络监听基本完成了。

监听cluster

如果redis开启了集群,需要对集群中其他的redis做网络监听,与其他的redis互联

开启cluster

在这里调用 clusterInitListeners 开启对 cluster 的监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void clusterInitListeners(void) {
if (connectionIndexByType(connTypeOfCluster()->get_type(NULL)) < 0) {
serverLog(LL_WARNING, "Missing connection type %s, but it is required for the Cluster bus.", connTypeOfCluster()->get_type(NULL));
exit(1);
}

int port = defaultClientPort();
connListener *listener = &server.clistener;
listener->count = 0;
listener->bindaddr = server.bindaddr;
listener->bindaddr_count = server.bindaddr_count;
listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
listener->ct = connTypeOfCluster();
if (connListen(listener) == C_ERR ) {
/* Note: the following log text is matched by the test suite. */
serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port);
exit(1);
}

if (createSocketAcceptHandler(&server.clistener, clusterAcceptHandler) != C_OK) {
serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
}
}

这里的逻辑和client那部分类似,不同的地方是 aeFileEvent 中的回调函数变成了 clusterAcceptHandler 这里不展开了。

然后会调用 InitServerLast 函数

这里注意关注 initThreadedIO 这个函数

redis 在6.0 版本加入了多线程支持,其实redis的多线程只是在 网络 I/O 加入了多线程处理,在处理命令时还是单线程,这里就是做了多线程的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */

/* Indicate that io-threads are currently idle */
io_threads_op = IO_THREADS_OP_IDLE;

//如果就一个线程,也没必要开新的线程了
if (server.io_threads_num == 1) return;

//判断一下,如果线程太多了,就提示错误,不能开启太多线程
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}

/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */

/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}

使用 pthread_create 创建线程线程,IOThreadMain 是对应线程的处理函数。

这个函数先不展开,等后面处理网络读写的时候,会提到。

回到 main 函数里,继续往下走,最后会调用 aeMain 来阻塞当前线程,然后处理网络。

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}

aeProcessEvents 中处理之前 添加的 aeEvent 事件回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
int64_t usUntilTimer;
//检查beforesleep函数指针,如果有对应的函数,调用这个函数
if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
eventLoop->beforesleep(eventLoop);

if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else if (flags & AE_TIME_EVENTS) {
usUntilTimer = usUntilEarliestTimer(eventLoop);
if (usUntilTimer >= 0) {
tv.tv_sec = usUntilTimer / 1000000;
tv.tv_usec = usUntilTimer % 1000000;
tvp = &tv;
}
}

numevents = aeApiPoll(eventLoop, tvp);

/* Don't process file events if not requested. */
if (!(flags & AE_FILE_EVENTS)) {
numevents = 0;
}

//检查aftersleep函数指针,如果有对应的函数,调用这个函数
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
//检查就绪的事件,然后做对应的处理
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
int mask = eventLoop->fired[j].mask;
int fired = 0; /* Number of events fired for current fd. */

int invert = fe->mask & AE_BARRIER;
//需要翻转的话,先处理读的回调函数
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}

//如果有写事件,做回调
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

if (invert) {//如果之前没有处理读事件
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{//读回调函数指针和写回调函数指针不是同一个, 并且有读事件,回调读函数
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

processed++;
}
}
//如果有时间回调事件,回调时间的回调函数
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

beforeSleep 函数

afterSleep 函数

这两个函数内容非常多,很多和网络是无关的,不展开分析,后面有用的再回来介绍。

aeApiPoll 函数在不同的平台上有不同的实现,最终的目的是等待系统的回调事件。在 epoll 中是等待 epoll_wait 的回调。

网络读写处理

Listener 回调

之前在添加 listener 的时候,添加的是 AE_READABLE 事件, aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd)

所以,accept 的回调会在 可读的回调函数中。

accept_handler指针是 connSocketAcceptHandler 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);

while(max--) {
//把当前的 accept 的 fd 转换成一个TCP连接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
}
}

anetTcpAccept 中 调用 anetGenericAccept 函数, 在函数中

1
2
3
4
5
#ifdef HAVE_ACCEPT4
fd = accept4(s, sa, len, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
fd = accept(s,sa,len);
#endif

最终通过 accept4 或者 accept 系统调用,接受一个新的TCP链接。

最后调用 acceptCommonHandler 函数, 这个函数里会调用 createClient(conn) 来创建一个 client

createClient 中,关注这段函数

1
2
3
4
5
6
7
8
9
10
11
12
13
client *c = zmalloc(sizeof(client));
if (conn) {
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
//把当前的conn加入到 eventPool中,并且把 readQueryFromClient 设置为回调函数
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
//初始化client的buffer
c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
selectDb(c,0);
uint64_t client_id;

connEnableTcpNoDelay 最终会调用 anet.c中的 anetSetTcpNoDelay 函数,来完成对TCP连接的设置。

connKeepAlive 最终调用到 anet.c中的 anetKeepAliveKeepAlive 的设置

connSetReadHandler 通过这个函数,完成把 该TCP连接的 fd 加入的 eventLoop 中进行管理。

1
2
3
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn, func);
}

通过 set_read_handler 这个函数指针调用到 socket.c 中的 connSocketSetReadHandler 函数

1
2
3
4
5
6
7
8
9
10
11
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;

conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}

调用 aeCreateFileEvent 把当前的 conn 加入到 server.el 也就是启动redis时,创建的 aeEventPool 中。

回到 createClient 函数中,connSetReadHandler(conn, readQueryFromClient); 第二参数传递的是 readQueryFromClient 这个函数的指针。这个函数 会赋值给 aeFileEventrfileProc 函数指针中。后续,如果这个 conn 有可读事件回调时,会使用 readQueryFromClient 这个函数来处理。

到这里,一个新TCP连接的处理基本就完成了。

处理client读写

读处理

当redis-server 收到client发来的数据后,也是通过 aeEventPool 来回调通知对应的 client来处理。

上面我们说过了,client收到读事件的回调函数是 readQueryFromClient,当client对应的网络有可读事件触发时,会回调这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
//判断一下,是否开启了IO线程,如果开启了那么会从IO线程读取数据
if (postponeClientRead(c)) return;

//读取次数 原子性的 +1
atomicIncr(server.stat_total_reads_processed, 1);

readlen = PROTO_IOBUF_LEN;
//如果是一个multi请求,那么调整一下缓冲区的大小
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;

if (remaining > 0) readlen = remaining;

if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
readlen = PROTO_IOBUF_LEN;
}

qblen = sdslen(c->querybuf);
if (!(c->flags & CLIENT_MASTER) &&
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
if (c->querybuf_peak < qblen + readlen) c->querybuf_peak = qblen + readlen;
} else {
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

readlen = sdsavail(c->querybuf);
}

//从网络中读取数据,如果是 socket,使用 connSocketRead 函数
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
goto done;
}
} else if (nread == 0) {
if (server.verbosity <= LL_VERBOSE) {
sds info = catClientInfoString(sdsempty(), c);
serverLog(LL_VERBOSE, "Client closed connection %s", info);
sdsfree(info);
}
freeClientAsync(c);
goto done;
}

sdsIncrLen(c->querybuf,nread);
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) {
c->read_reploff += nread;
atomicIncr(server.stat_net_repl_input_bytes, nread);
} else {
atomicIncr(server.stat_net_input_bytes, nread);
}

if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
goto done;
}
//处理读到的数据,尝试解析RESP协议
if (processInputBuffer(c) == C_ERR)
c = NULL;

done:
beforeNextClient(c);
}

在这个函数中判断是否需要使用多线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int postponeClientRead(client *c) {
//在这里判断是否需要使用多线程从网络读取数据
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
io_threads_op == IO_THREADS_OP_IDLE)
{
//如果需要多线程读取,把当前client加入到`clients_pending_read`中,等待后续调用
listAddNodeHead(server.clients_pending_read,c);
c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
} else {
return 0;
}
}

在配置文件中

  • io-threads IO线程数量,默认只有一个,也就是不用多线程读写网络
  • io-threads-do-reads 是否开启多线程读和解析RESP功能,如果不开启,那么多线程只会处理写

如果只有一个client就绪(也就是没有并发请求)时,即使开了多线程和IO线程读,也不会使用多线程去读网络,只有当多个client同时读写数据,这时候会在 handleClientsWithPendingReadsUsingThreads 中开启 多线程处理。

开启多线程读写

1
if (!server.io_threads_active) startThreadedIO();

此时,当client读事件就绪后,再次回调 readQueryFromClient 函数后,会进入 listAddNodeHead(server.clients_pending_read,c); 加入链表,等待IO线程处理

如果需要多线程

在主线程里调用 使用 aeEventPoolbeforesleep 函数指针调用 beforeSleep 函数来执行 sleep前的处理函数,然后在调用 handleClientsWithPendingReadsUsingThreads 函数来判断是否需要使用多线程来读数据。

最后的读取是由 readQueryFromClient 函数来完成数据的读取。

readQueryFromClient 函数可以通过 handleClientsWithPendingReadsUsingThreads 函数在主线程调用,也可以通过 IOThreadMain 函数通过IO线程并发读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//启动IO多线程读取数据
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
.......
//等待IO线程读取完成
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}

当执行完 beforesleep 函数后,再由 aeFileEventrfileProc 函数指针调用 processInputBuffer 函数内主要是判断和解析字符串,就不展开分析了

主要关注这部分逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
if (c->reqtype == PROTO_REQ_INLINE) {
//如果是一行的字符串,解析方式
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
//如果是多行的,解析方式
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}

//如果解析到的参数个数是0,那么说明还没有解析完成,重置client
if (c->argc == 0) {
resetClient(c);
} else {
//判断一下是否在IO线程里,如果是的话,设置一下flag,然后结束.
//因为redis的多线程只能用来读写网络,操作redis的数据库还是用单线的
if (io_threads_op != IO_THREADS_OP_IDLE) {
serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}

//开始执行命令了
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}

最终会调用 processCommand 函数执行命令

执行命令的逻辑先不管,这不是本文的重点。

写处理

当命令处理完成后,把有需要回复客户端的数据写入client的buf中。

最后会调用 writeToClient 函数把 client buf 中的数据写入到网络中,完成数据返回。

writeToClient 有多个回调路径。但是都是由 aeEventpool 中的 aftersleep 函数指针 调用 beforeSleep 函数来完成的。

beforeSleep 中调用 handleClientsWithPendingWritesUsingThreads 函数。

这个函数也比较长,就不贴了,只挑一些重点的。这期贴了不少源码,一直看源码,也挺枯燥的。这个函数的大概逻辑和 handleClientsWithPendingReadsUsingThreads 函数的差不多,只不过这里处理的是写网络。

1
2
3
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}

这里判断一下,如果没有开启多线程或者停了IO线程,就使用 handleClientsWithPendingWrites 函数在主线程处理写逻辑。

handleClientsWithPendingWrites 里的逻辑很简单,遍历 server.clients_pending_write 中的client,然后调用 writeToClient 把数据写入到网络中。

回到 handleClientsWithPendingWritesUsingThreads 函数中,调用 startThreadedIO() 启动 IO线程并发处理写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//在主线程中也会处理写入
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
//和上面的多线程读一样,这里也会等待所有IO线程任务结束
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}

到这基本redis的整个网络处理流程大概就写完了。注意,这只是个大概,其中的一些细节还没有深入分析。比如在 aeProcessEvents 函数中,为什么有 beforesleepaftersleep 这两个函数指针,以及调用两个函数中的先后调用顺序。 aeFileEventrfileProcwfileProc 这两个函数指针的调用顺序,以及调用的函数的逻辑。每一个都可以拿出来单独写一篇。

以后有时间在单独拿出来分析吧。如果有时间再这篇文章补一个流程图。

redis用C写的,C对多态的支持不太好,不像面向对象的语言那样,可以很方便的使用多态。所以在redis的源码里,大量使用了函数指针和回调函数,来间接的实现了多态。这样会在阅读源码的时候增加了很多理解成本。

但是鲁迅先生曾经说过,在断点面前,一切语法都是纸老虎。所以遇到复杂的函数调用逻辑,使用断点去看一下函数的调用关系,堆栈,基本都能解决问题。

也快过年了,提前祝所有读者 新年快乐