Featured image of post redis源码学习|网络

redis源码学习|网络

redis的网络处理是redis的核心功能之一,redis的网络处理是单线程的,这里通过源码分析,来看一下redis是如何处理网络的。

去年写过两篇redis的源码分析文章(redis的watch和ACL),现在回头看,已经过去好久了。说起那两篇文章,收益还是挺大的。这周有时间了,继续学习redis的源码。先来看一下redis的网络处理。

本次的源码基于redis的 7.2 分支,不同版本下,一些细节可能会有差异。

这篇文章因为篇幅所限,没法把redis网络读写的所有细节都分析到,只能把这个网络处理的大概流程走一遍。

看完这篇文章,能对redis网络处理流程有个基本了解,能知道redis加入了多线程后,为什么还是单线程处理数据。能知道redis是如何实现适配不同系统下网络接口。

先来写一下redis网络需要的一些数据结构。

数据结构:

  • redis可以支持多个平台(Linux,Unix),这两个平台的网络调用是不一样的,Linux中高性能的网络接口是epoll,Unix上是kqueue。因为版本的原因,在低版本的Linux内核中,epool也是不支持的,这时候就需要用到select。redis使用了 aeEvent 这个结构来封装了网络,实现了对不同网络接口的兼容。

结构的定义在 ae.h 文件中 aeEventLoop

aeEventLoop 是redis网络中,所有网络事件的管理器。我们知道,现在的网络一般都是通过多路复用来处理多个链接,多路复用的实现在不同的系统中有不同的实现,所有redis使用 aeEventLoop 的封装来实现不同平台的兼容。这个结构是全局唯一的,redis启动后完成初始化。

 1typedef struct aeEventLoop {
 2    int maxfd;   /* highest file descriptor currently registered */
 3    int setsize; /* max number of file descriptors tracked */
 4    long long timeEventNextId;
 5    aeFileEvent *events; /* Registered events */
 6    aeFiredEvent *fired; /* Fired events */
 7    aeTimeEvent *timeEventHead;
 8    int stop;
 9    void *apidata; /* This is used for polling API specific data */
10    aeBeforeSleepProc *beforesleep;
11    aeBeforeSleepProc *aftersleep;
12    int flags;
13} aeEventLoop;

aeFileEventaeFiredEvent 是处理网络事件的数组结构,aeTimeEvent 是处理redis定时回调的结构数组。

  • aeFileEvent 是注册到系统中的事件,在 epoll 中就是 struct epoll_event 结构。
  • aeFiredEvent 是这次要处理的事件。在 epoll 中就是 通过 epoll_wait 返回的事件。
  • timeEventHead 是定时器要处理的事件。

aeFileEvent 是redis中,多路复用的事件封装。每一个网络链接都会被包装成一个 aeFileEvent 对象,(因为在 Unix系列的系统中,万物皆是文件嘛,所有这里也就叫 aeFileEvent了),然后加入到 aeEventLoop 中,最后交给操作系统中的多路复用管理。当被管理的事件被触发后,会回调对应的函数,也就是 rfileProc 或者 wfileProc 函数指针,来完成操作。

1typedef struct aeFileEvent {
2    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
3    aeFileProc *rfileProc;
4    aeFileProc *wfileProc;
5    void *clientData;
6} aeFileEvent;

网络处理的实现在 ae.c

ae.c中,通过宏定义,来实现对不同平台的兼容

 1#ifdef HAVE_EVPORT
 2#include "ae_evport.c"
 3#else
 4    #ifdef HAVE_EPOLL
 5    #include "ae_epoll.c"
 6    #else
 7        #ifdef HAVE_KQUEUE
 8        #include "ae_kqueue.c"
 9        #else
10        #include "ae_select.c"
11        #endif
12    #endif
13#endif

ae.c#L52-L65 通过判断宏定义,然后include不同的 .c文件。

这里可能有对C语言了解不深的同学可能会疑问,include 关键字不是用来包含头文件的吗,还能用来引入 .c文件吗?其实include是编译器的一个预处理指令,就是简单的把对应的文件里的所有内容,放到include的地方而已。

打开 ae_evport.c ae_epoll.c ae_kqueue.c ae_select.c 这四个文件来看,对外暴露的函数接口都是一样的。只是在不同的平台上内部的处理不同。

对外暴露了这些函数,来完成网络的处理

 1//创建一个loop
 2static int aeApiCreate(aeEventLoop *eventLoop)
 3//设置loop的大小
 4static int aeApiResize(aeEventLoop *eventLoop, int setsize)
 5//释放一个 loop
 6static void aeApiFree(aeEventLoop *eventLoop)
 7//添加一个 eventLoop
 8static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
 9//删除一个 eventLoop
10static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
11//不同的网络处理中,返回需要处理的事件
12static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
13//返回这个ae的名字,在epoll中就是返回 "epoll"
14static char *aeApiName(void)

这里我们以Linux平台为主,所以使用的是 ae_epoll.c文件中的内容。这里先不展开分析每一个函数,因为这不是本次的重点。

启动网络服务

基本的数据结构介绍完,下面开始启动网络服务

监听client

第一步是调用 aeEventLoop *aeCreateEventLoop(int setsize) 函数,创建 aeEventLoop 这个对象。

 1aeEventLoop *aeCreateEventLoop(int setsize) {
 2    aeEventLoop *eventLoop;
 3    int i;
 4
 5    monotonicInit();    /* just in case the calling app didn't initialize */
 6
 7    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
 8    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
 9    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
10    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
11    eventLoop->setsize = setsize;
12    eventLoop->timeEventHead = NULL;
13    eventLoop->timeEventNextId = 0;
14    eventLoop->stop = 0;
15    eventLoop->maxfd = -1;
16    eventLoop->beforesleep = NULL;
17    eventLoop->aftersleep = NULL;
18    eventLoop->flags = 0;
19    if (aeApiCreate(eventLoop) == -1) goto err;
20    /* Events with mask == AE_NONE are not set. So let's initialize the
21     * vector with it. */
22    for (i = 0; i < setsize; i++)
23        eventLoop->events[i].mask = AE_NONE;
24    return eventLoop;
25
26err:
27    if (eventLoop) {
28        zfree(eventLoop->events);
29        zfree(eventLoop->fired);
30        zfree(eventLoop);
31    }
32    return NULL;
33}

这里的处理逻辑比较简单,就是初始化 aeEventLoop 中的数据结构。如果中间出错了,就返回 NULL

这个在 void initServer(void) 函数中被调用。 aeCreateEventLoop

创建一个aeEventLoop 并赋值给 server。再往上翻,最终的调用位置是 main 函数中的initServer();

在往下,还有一个比较重要的函数,通过调用 aeCreateTimeEvent 函数,完成定时器 对应的 eventLoop 的创建。

 1long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
 2        aeTimeProc *proc, void *clientData,
 3        aeEventFinalizerProc *finalizerProc)
 4{
 5    long long id = eventLoop->timeEventNextId++;
 6    aeTimeEvent *te;
 7
 8    te = zmalloc(sizeof(*te));
 9    if (te == NULL) return AE_ERR;
10    te->id = id;
11    te->when = getMonotonicUs() + milliseconds * 1000;
12    te->timeProc = proc;
13    te->finalizerProc = finalizerProc;
14    te->clientData = clientData;
15    te->prev = NULL;
16    te->next = eventLoop->timeEventHead;
17    te->refcount = 0;
18    if (te->next)
19        te->next->prev = te;
20    eventLoop->timeEventHead = te;
21    return id;
22}

都是常规的处理,注意一下传递进来的几个参数

  • eventLoop 是之前创建的 eventLoop对象
  • proc 是定时器触发时的回调函数
  • milliseconds 定时器触发的时间间隔

proc 函数的定义

这里不详细展开了,就是redis中的一些定时操作。

main 函数中继续往下,会看到 initListeners(); 这个函数,中这里也能知道,这里创建 网络的监听

 1void initListeners(void) {
 2    /* Setup listeners from server config for TCP/TLS/Unix */
 3    int conn_index;
 4    connListener *listener;
 5    //判断是否需要监听普通的TCP
 6    if (server.port != 0) {
 7        conn_index = connectionIndexByType(CONN_TYPE_SOCKET);
 8        if (conn_index < 0)
 9            serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET);
10        listener = &server.listeners[conn_index];
11        listener->bindaddr = server.bindaddr;
12        listener->bindaddr_count = server.bindaddr_count;
13        listener->port = server.port;
14        listener->ct = connectionByType(CONN_TYPE_SOCKET);
15    }
16    //判断是否需要监听,如果需要,检查tls需要的配置是否正确
17    if (server.tls_port || server.tls_replication || server.tls_cluster) {
18        ConnectionType *ct_tls = connectionTypeTls();
19        if (!ct_tls) {
20            serverLog(LL_WARNING, "Failed finding TLS support.");
21            exit(1);
22        }
23        if (connTypeConfigure(ct_tls, &server.tls_ctx_config, 1) == C_ERR) {
24            serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
25            exit(1);
26        }
27    }
28    //判断是否需要监听 TLS的TCP
29    if (server.tls_port != 0) {
30        conn_index = connectionIndexByType(CONN_TYPE_TLS);
31        if (conn_index < 0)
32            serverPanic("Failed finding connection listener of %s", CONN_TYPE_TLS);
33        listener = &server.listeners[conn_index];
34        listener->bindaddr = server.bindaddr;
35        listener->bindaddr_count = server.bindaddr_count;
36        listener->port = server.tls_port;
37        listener->ct = connectionByType(CONN_TYPE_TLS);
38    }
39    //判断是否需要unixsocket
40    if (server.unixsocket != NULL) {
41        conn_index = connectionIndexByType(CONN_TYPE_UNIX);
42        if (conn_index < 0)
43            serverPanic("Failed finding connection listener of %s", CONN_TYPE_UNIX);
44        listener = &server.listeners[conn_index];
45        listener->bindaddr = &server.unixsocket;
46        listener->bindaddr_count = 1;
47        listener->ct = connectionByType(CONN_TYPE_UNIX);
48        listener->priv = &server.unixsocketperm; /* Unix socket specified */
49    }
50
51    /* create all the configured listener, and add handler to start to accept */
52    int listen_fds = 0;
53    for (int j = 0; j < CONN_TYPE_MAX; j++) {
54        listener = &server.listeners[j];
55        if (listener->ct == NULL)
56            continue;
57
58        if (connListen(listener) == C_ERR) {
59            serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
60            exit(1);
61        }
62        //创建新的连接处理事件
63        if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
64            serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
65
66       listen_fds += listener->count;
67    }
68
69    if (listen_fds == 0) {
70        serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
71        exit(1);
72    }
73}

在这个函数里,根据配置文件,开始监听对应的端口或者 unixsockt。

这里引入了 connListener connection ConnectionType 这三个结构

  • connListener 是对网络监听的封装
  • connection 是对网络连接的封装
  • ConnectionType 封装了操作网络需要的函数

ConnectionType 有三个不同的实现,分别是:

  • CT_Socket 对应的TCP CT_Socket
  • CT_Unix 对应的是 unix socket CT_Unix
  • CT_TLS 对应的 使用了 TLS的TCP CT_TLS

ConnectionType 定义

ConnectionType内部的函数这里就不展开详细介绍了。

initListeners 中调用了 connListen 函数,来完成对不同网络的监听,这里实现也挺简单的,就是调用了ConnectionType 中的 listen 函数,完成了对端口或者unixsocket的监听。

1static inline int connListen(connListener *listener) {
2    return listener->ct->listen(listener);
3}

socket 为例,对应的listen函数就是 connSocketListen 对应的初始化在这里.listen = connSocketListen

最终实现对端口监听的函数是 listenToPort(connListener *sfd)

话题收回来,回到 initListeners 函数中 调用 connListen 函数完成监听后,会再调用createSocketAcceptHandler 函数 完成网络的 Accept 处理逻辑。

 1int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
 2    int j;
 3
 4    for (j = 0; j < sfd->count; j++) {
 5        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd) == AE_ERR) {
 6            /* Rollback */
 7            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
 8            return C_ERR;
 9        }
10    }
11    return C_OK;
12}

这里逻辑也挺简单,遍历之前accept的 fd,将fd加入到 aeEventLoop 中,如果失败了,就回滚所有数据,也就是中 aeEventLoop 中删除对应的事件。

aeCreateFileEvent的实现

 1int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
 2        aeFileProc *proc, void *clientData)
 3{
 4    if (fd >= eventLoop->setsize) {
 5        errno = ERANGE;
 6        return AE_ERR;
 7    }
 8    aeFileEvent *fe = &eventLoop->events[fd];
 9    //调用对应平台的系统函数,把对应的事件加入到操作系统中
10    //比如在epoll中就是调用 epoll_ctl 函数把 事件加入 epoll中
11    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
12        return AE_ERR;
13    fe->mask |= mask;
14    if (mask & AE_READABLE) fe->rfileProc = proc;
15    if (mask & AE_WRITABLE) fe->wfileProc = proc;
16    fe->clientData = clientData;
17    if (fd > eventLoop->maxfd)
18        eventLoop->maxfd = fd;
19    return AE_OK;
20}

也是常规的处理逻辑,这里需要注意一下,传递的几个参数。

  • aeEventLoop 就是之前通过 aeCreateEventLoop 函数创建的对象。
  • fd 是对应的文件描述符
  • mask 对应的这个event 需要处理的事件类型,比如 读,写等等
  • proc 这个是一个回调函数,当对应的事件触发时,使用这个函数来处理。比如这里就是当fd有读写事件触发时,就会调用这个函数,也就是当有新的客户端连接时,就通过这个函数来处理
  • clientData 是这个event的数据

这里的 procsocket.c文件中的 connSocketAcceptHandler 函数。这个函数后面网络读写在分析。到这里,redis对client的网络监听基本完成了。

监听cluster

如果redis开启了集群,需要对集群中其他的redis做网络监听,与其他的redis互联

开启cluster

在这里调用 clusterInitListeners 开启对 cluster 的监听。

 1void clusterInitListeners(void) {
 2    if (connectionIndexByType(connTypeOfCluster()->get_type(NULL)) < 0) {
 3        serverLog(LL_WARNING, "Missing connection type %s, but it is required for the Cluster bus.", connTypeOfCluster()->get_type(NULL));
 4        exit(1);
 5    }
 6
 7    int port = defaultClientPort();
 8    connListener *listener = &server.clistener;
 9    listener->count = 0;
10    listener->bindaddr = server.bindaddr;
11    listener->bindaddr_count = server.bindaddr_count;
12    listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
13    listener->ct = connTypeOfCluster();
14    if (connListen(listener) == C_ERR ) {
15        /* Note: the following log text is matched by the test suite. */
16        serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port);
17        exit(1);
18    }
19    
20    if (createSocketAcceptHandler(&server.clistener, clusterAcceptHandler) != C_OK) {
21        serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
22    }
23}

这里的逻辑和client那部分类似,不同的地方是 aeFileEvent 中的回调函数变成了 clusterAcceptHandler 这里不展开了。

然后会调用 InitServerLast 函数

这里注意关注 initThreadedIO 这个函数

redis 在6.0 版本加入了多线程支持,其实redis的多线程只是在 网络 I/O 加入了多线程处理,在处理命令时还是单线程,这里就是做了多线程的处理。

 1void initThreadedIO(void) {
 2    server.io_threads_active = 0; /* We start with threads not active. */
 3
 4    /* Indicate that io-threads are currently idle */
 5    io_threads_op = IO_THREADS_OP_IDLE;
 6
 7    //如果就一个线程,也没必要开新的线程了
 8    if (server.io_threads_num == 1) return;
 9
10    //判断一下,如果线程太多了,就提示错误,不能开启太多线程
11    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
12        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
13                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
14        exit(1);
15    }
16
17    /* Spawn and initialize the I/O threads. */
18    for (int i = 0; i < server.io_threads_num; i++) {
19        /* Things we do for all the threads including the main thread. */
20        io_threads_list[i] = listCreate();
21        if (i == 0) continue; /* Thread 0 is the main thread. */
22
23        /* Things we do only for the additional threads. */
24        pthread_t tid;
25        pthread_mutex_init(&io_threads_mutex[i],NULL);
26        setIOPendingCount(i, 0);
27        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
28        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
29            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
30            exit(1);
31        }
32        io_threads[i] = tid;
33    }
34}

使用 pthread_create 创建线程线程,IOThreadMain 是对应线程的处理函数。

这个函数先不展开,等后面处理网络读写的时候,会提到。

回到 main 函数里,继续往下走,最后会调用 aeMain 来阻塞当前线程,然后处理网络。

1void aeMain(aeEventLoop *eventLoop) {
2    eventLoop->stop = 0;
3    while (!eventLoop->stop) {
4        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
5                                   AE_CALL_BEFORE_SLEEP|
6                                   AE_CALL_AFTER_SLEEP);
7    }
8}

aeProcessEvents 中处理之前 添加的 aeEvent 事件回调

 1int aeProcessEvents(aeEventLoop *eventLoop, int flags)
 2{
 3    int processed = 0, numevents;
 4
 5    /* Nothing to do? return ASAP */
 6    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
 7
 8    if (eventLoop->maxfd != -1 ||
 9        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
10        int j;
11        struct timeval tv, *tvp = NULL; /* NULL means infinite wait. */
12        int64_t usUntilTimer;
13        //检查beforesleep函数指针,如果有对应的函数,调用这个函数
14        if (eventLoop->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
15            eventLoop->beforesleep(eventLoop);
16
17        if ((flags & AE_DONT_WAIT) || (eventLoop->flags & AE_DONT_WAIT)) {
18            tv.tv_sec = tv.tv_usec = 0;
19            tvp = &tv;
20        } else if (flags & AE_TIME_EVENTS) {
21            usUntilTimer = usUntilEarliestTimer(eventLoop);
22            if (usUntilTimer >= 0) {
23                tv.tv_sec = usUntilTimer / 1000000;
24                tv.tv_usec = usUntilTimer % 1000000;
25                tvp = &tv;
26            }
27        }
28
29        numevents = aeApiPoll(eventLoop, tvp);
30
31        /* Don't process file events if not requested. */
32        if (!(flags & AE_FILE_EVENTS)) {
33            numevents = 0;
34        }
35
36        //检查aftersleep函数指针,如果有对应的函数,调用这个函数
37        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
38            eventLoop->aftersleep(eventLoop);
39        //检查就绪的事件,然后做对应的处理
40        for (j = 0; j < numevents; j++) {
41            int fd = eventLoop->fired[j].fd;
42            aeFileEvent *fe = &eventLoop->events[fd];
43            int mask = eventLoop->fired[j].mask;
44            int fired = 0; /* Number of events fired for current fd. */
45
46            int invert = fe->mask & AE_BARRIER;
47            //需要翻转的话,先处理读的回调函数
48            if (!invert && fe->mask & mask & AE_READABLE) {
49                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
50                fired++;
51                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
52            }
53
54            //如果有写事件,做回调
55            if (fe->mask & mask & AE_WRITABLE) {
56                if (!fired || fe->wfileProc != fe->rfileProc) {
57                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
58                    fired++;
59                }
60            }
61
62            if (invert) {//如果之前没有处理读事件
63                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
64                if ((fe->mask & mask & AE_READABLE) &&
65                    (!fired || fe->wfileProc != fe->rfileProc))
66                {//读回调函数指针和写回调函数指针不是同一个, 并且有读事件,回调读函数
67                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
68                    fired++;
69                }
70            }
71
72            processed++;
73        }
74    }
75    //如果有时间回调事件,回调时间的回调函数
76    if (flags & AE_TIME_EVENTS)
77        processed += processTimeEvents(eventLoop);
78
79    return processed; /* return the number of processed file/time events */
80}

beforeSleep 函数

afterSleep 函数

这两个函数内容非常多,很多和网络是无关的,不展开分析,后面有用的再回来介绍。

aeApiPoll 函数在不同的平台上有不同的实现,最终的目的是等待系统的回调事件。在 epoll 中是等待 epoll_wait 的回调。

网络读写处理

Listener 回调

之前在添加 listener 的时候,添加的是 AE_READABLE 事件, aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,sfd)

所以,accept 的回调会在 可读的回调函数中。

accept_handler指针是 connSocketAcceptHandler 函数

 1static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
 2    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
 3    char cip[NET_IP_STR_LEN];
 4    UNUSED(el);
 5    UNUSED(mask);
 6    UNUSED(privdata);
 7
 8    while(max--) {
 9        //把当前的 accept 的 fd 转换成一个TCP连接
10        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
11        if (cfd == ANET_ERR) {
12            if (errno != EWOULDBLOCK)
13                serverLog(LL_WARNING,
14                    "Accepting client connection: %s", server.neterr);
15            return;
16        }
17        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
18        acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL),0,cip);
19    }
20}

anetTcpAccept 中 调用 anetGenericAccept 函数, 在函数中

1#ifdef HAVE_ACCEPT4
2        fd = accept4(s, sa, len,  SOCK_NONBLOCK | SOCK_CLOEXEC);
3#else
4        fd = accept(s,sa,len);
5#endif

最终通过 accept4 或者 accept 系统调用,接受一个新的TCP链接。

最后调用 acceptCommonHandler 函数, 这个函数里会调用 createClient(conn) 来创建一个 client

createClient 中,关注这段函数

 1    client *c = zmalloc(sizeof(client));
 2    if (conn) {
 3        connEnableTcpNoDelay(conn);
 4        if (server.tcpkeepalive)
 5            connKeepAlive(conn,server.tcpkeepalive);
 6        //把当前的conn加入到 eventPool中,并且把 readQueryFromClient 设置为回调函数
 7        connSetReadHandler(conn, readQueryFromClient);
 8        connSetPrivateData(conn, c);
 9    }
10    //初始化client的buffer
11    c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
12    selectDb(c,0);
13    uint64_t client_id;

connEnableTcpNoDelay 最终会调用 anet.c中的 anetSetTcpNoDelay 函数,来完成对TCP连接的设置。

connKeepAlive 最终调用到 anet.c中的 anetKeepAliveKeepAlive 的设置

connSetReadHandler 通过这个函数,完成把 该TCP连接的 fd 加入的 eventLoop 中进行管理。

1static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
2    return conn->type->set_read_handler(conn, func);
3}

通过 set_read_handler 这个函数指针调用到 socket.c 中的 connSocketSetReadHandler 函数

 1static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
 2    if (func == conn->read_handler) return C_OK;
 3
 4    conn->read_handler = func;
 5    if (!conn->read_handler)
 6        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
 7    else
 8        if (aeCreateFileEvent(server.el,conn->fd,
 9                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
10    return C_OK;
11}

调用 aeCreateFileEvent 把当前的 conn 加入到 server.el 也就是启动redis时,创建的 aeEventPool 中。

回到 createClient 函数中,connSetReadHandler(conn, readQueryFromClient); 第二参数传递的是 readQueryFromClient 这个函数的指针。这个函数 会赋值给 aeFileEventrfileProc 函数指针中。后续,如果这个 conn 有可读事件回调时,会使用 readQueryFromClient 这个函数来处理。

到这里,一个新TCP连接的处理基本就完成了。

处理client读写

读处理

当redis-server 收到client发来的数据后,也是通过 aeEventPool 来回调通知对应的 client来处理。

上面我们说过了,client收到读事件的回调函数是 readQueryFromClient,当client对应的网络有可读事件触发时,会回调这个函数。

 1void readQueryFromClient(connection *conn) {
 2    client *c = connGetPrivateData(conn);
 3    int nread, big_arg = 0;
 4    size_t qblen, readlen;
 5    //判断一下,是否开启了IO线程,如果开启了那么会从IO线程读取数据
 6    if (postponeClientRead(c)) return;
 7
 8    //读取次数 原子性的 +1
 9    atomicIncr(server.stat_total_reads_processed, 1);
10
11    readlen = PROTO_IOBUF_LEN;
12    //如果是一个multi请求,那么调整一下缓冲区的大小
13    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
14        && c->bulklen >= PROTO_MBULK_BIG_ARG)
15    {
16        ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
17        big_arg = 1;
18
19        if (remaining > 0) readlen = remaining;
20
21        if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
22            readlen = PROTO_IOBUF_LEN;
23    }
24
25    qblen = sdslen(c->querybuf);
26    if (!(c->flags & CLIENT_MASTER) && 
27        (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
28        c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
29        if (c->querybuf_peak < qblen + readlen) c->querybuf_peak = qblen + readlen;
30    } else {
31        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
32
33        readlen = sdsavail(c->querybuf);
34    }
35    
36    //从网络中读取数据,如果是 socket,使用 connSocketRead 函数
37    nread = connRead(c->conn, c->querybuf+qblen, readlen);
38    if (nread == -1) {
39        if (connGetState(conn) == CONN_STATE_CONNECTED) {
40            return;
41        } else {
42            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
43            freeClientAsync(c);
44            goto done;
45        }
46    } else if (nread == 0) {
47        if (server.verbosity <= LL_VERBOSE) {
48            sds info = catClientInfoString(sdsempty(), c);
49            serverLog(LL_VERBOSE, "Client closed connection %s", info);
50            sdsfree(info);
51        }
52        freeClientAsync(c);
53        goto done;
54    }
55
56    sdsIncrLen(c->querybuf,nread);
57    qblen = sdslen(c->querybuf);
58    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
59
60    c->lastinteraction = server.unixtime;
61    if (c->flags & CLIENT_MASTER) {
62        c->read_reploff += nread;
63        atomicIncr(server.stat_net_repl_input_bytes, nread);
64    } else {
65        atomicIncr(server.stat_net_input_bytes, nread);
66    }
67
68    if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
69        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
70
71        bytes = sdscatrepr(bytes,c->querybuf,64);
72        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
73        sdsfree(ci);
74        sdsfree(bytes);
75        freeClientAsync(c);
76        goto done;
77    }
78    //处理读到的数据,尝试解析RESP协议
79    if (processInputBuffer(c) == C_ERR)
80         c = NULL;
81
82done:
83    beforeNextClient(c);
84}

在这个函数中判断是否需要使用多线程。

 1int postponeClientRead(client *c) {
 2    //在这里判断是否需要使用多线程从网络读取数据
 3    if (server.io_threads_active &&
 4        server.io_threads_do_reads &&
 5        !ProcessingEventsWhileBlocked &&
 6        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
 7        io_threads_op == IO_THREADS_OP_IDLE)
 8    {
 9        //如果需要多线程读取,把当前client加入到`clients_pending_read`中,等待后续调用
10        listAddNodeHead(server.clients_pending_read,c);
11        c->pending_read_list_node = listFirst(server.clients_pending_read);
12        return 1;
13    } else {
14        return 0;
15    }
16}

在配置文件中

  • io-threads IO线程数量,默认只有一个,也就是不用多线程读写网络
  • io-threads-do-reads 是否开启多线程读和解析RESP功能,如果不开启,那么多线程只会处理写

如果只有一个client就绪(也就是没有并发请求)时,即使开了多线程和IO线程读,也不会使用多线程去读网络,只有当多个client同时读写数据,这时候会在 handleClientsWithPendingReadsUsingThreads 中开启 多线程处理。

开启多线程读写

1if (!server.io_threads_active) startThreadedIO();

此时,当client读事件就绪后,再次回调 readQueryFromClient 函数后,会进入 listAddNodeHead(server.clients_pending_read,c); 加入链表,等待IO线程处理

如果需要多线程

在主线程里调用 使用 aeEventPoolbeforesleep 函数指针调用 beforeSleep 函数来执行 sleep前的处理函数,然后在调用 handleClientsWithPendingReadsUsingThreads 函数来判断是否需要使用多线程来读数据。

最后的读取是由 readQueryFromClient 函数来完成数据的读取。

readQueryFromClient 函数可以通过 handleClientsWithPendingReadsUsingThreads 函数在主线程调用,也可以通过 IOThreadMain 函数通过IO线程并发读取。

 1//启动IO多线程读取数据
 2io_threads_op = IO_THREADS_OP_READ;
 3for (int j = 1; j < server.io_threads_num; j++) {
 4    int count = listLength(io_threads_list[j]);
 5    setIOPendingCount(j, count);
 6}
 7.......
 8//等待IO线程读取完成
 9while(1) {
10    unsigned long pending = 0;
11    for (int j = 1; j < server.io_threads_num; j++)
12        pending += getIOPendingCount(j);
13    if (pending == 0) break;
14}

当执行完 beforesleep 函数后,再由 aeFileEventrfileProc 函数指针调用 processInputBuffer 函数内主要是判断和解析字符串,就不展开分析了

主要关注这部分逻辑

 1  if (c->reqtype == PROTO_REQ_INLINE) {
 2      //如果是一行的字符串,解析方式
 3      if (processInlineBuffer(c) != C_OK) break;
 4  } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
 5      //如果是多行的,解析方式
 6      if (processMultibulkBuffer(c) != C_OK) break;
 7  } else {
 8      serverPanic("Unknown request type");
 9  }
10
11  //如果解析到的参数个数是0,那么说明还没有解析完成,重置client
12  if (c->argc == 0) {
13      resetClient(c);
14  } else {
15      //判断一下是否在IO线程里,如果是的话,设置一下flag,然后结束.
16      //因为redis的多线程只能用来读写网络,操作redis的数据库还是用单线的
17      if (io_threads_op != IO_THREADS_OP_IDLE) {
18          serverAssert(io_threads_op == IO_THREADS_OP_READ);
19          c->flags |= CLIENT_PENDING_COMMAND;
20          break;
21      }
22
23      //开始执行命令了
24      if (processCommandAndResetClient(c) == C_ERR) {
25          return C_ERR;
26      }
27  }

最终会调用 processCommand 函数执行命令

执行命令的逻辑先不管,这不是本文的重点。

写处理

当命令处理完成后,把有需要回复客户端的数据写入client的buf中。

最后会调用 writeToClient 函数把 client buf 中的数据写入到网络中,完成数据返回。

writeToClient 有多个回调路径。但是都是由 aeEventpool 中的 aftersleep 函数指针 调用 beforeSleep 函数来完成的。

beforeSleep 中调用 handleClientsWithPendingWritesUsingThreads 函数。

这个函数也比较长,就不贴了,只挑一些重点的。这期贴了不少源码,一直看源码,也挺枯燥的。这个函数的大概逻辑和 handleClientsWithPendingReadsUsingThreads 函数的差不多,只不过这里处理的是写网络。

1if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
2    return handleClientsWithPendingWrites();
3}

这里判断一下,如果没有开启多线程或者停了IO线程,就使用 handleClientsWithPendingWrites 函数在主线程处理写逻辑。

handleClientsWithPendingWrites 里的逻辑很简单,遍历 server.clients_pending_write 中的client,然后调用 writeToClient 把数据写入到网络中。

回到 handleClientsWithPendingWritesUsingThreads 函数中,调用 startThreadedIO() 启动 IO线程并发处理写入。

 1//在主线程中也会处理写入
 2listRewind(io_threads_list[0],&li);
 3while((ln = listNext(&li))) {
 4    client *c = listNodeValue(ln);
 5    writeToClient(c,0);
 6}
 7listEmpty(io_threads_list[0]);
 8//和上面的多线程读一样,这里也会等待所有IO线程任务结束
 9while(1) {
10    unsigned long pending = 0;
11    for (int j = 1; j < server.io_threads_num; j++)
12        pending += getIOPendingCount(j);
13    if (pending == 0) break;
14}

到这基本redis的整个网络处理流程大概就写完了。注意,这只是个大概,其中的一些细节还没有深入分析。比如在 aeProcessEvents 函数中,为什么有 beforesleepaftersleep 这两个函数指针,以及调用两个函数中的先后调用顺序。 aeFileEventrfileProcwfileProc 这两个函数指针的调用顺序,以及调用的函数的逻辑。每一个都可以拿出来单独写一篇。

以后有时间在单独拿出来分析吧。如果有时间再这篇文章补一个流程图。

redis用C写的,C对多态的支持不太好,不像面向对象的语言那样,可以很方便的使用多态。所以在redis的源码里,大量使用了函数指针和回调函数,来间接的实现了多态。这样会在阅读源码的时候增加了很多理解成本。

但是鲁迅先生曾经说过,在断点面前,一切语法都是纸老虎。所以遇到复杂的函数调用逻辑,使用断点去看一下函数的调用关系,堆栈,基本都能解决问题。

也快过年了,提前祝所有读者 新年快乐

发表了58篇文章 · 总计133.24k字
本博客已稳定运行
© QX
使用 Hugo 构建
主题 StackJimmy 设计