typedefstructaeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ longlong timeEventNextId; aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ aeTimeEvent *timeEventHead; int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; int flags; } aeEventLoop;
voidinitListeners(void) { /* Setup listeners from server config for TCP/TLS/Unix */ int conn_index; connListener *listener; //判断是否需要监听普通的TCP if (server.port != 0) { conn_index = connectionIndexByType(CONN_TYPE_SOCKET); if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_SOCKET); listener = &server.listeners[conn_index]; listener->bindaddr = server.bindaddr; listener->bindaddr_count = server.bindaddr_count; listener->port = server.port; listener->ct = connectionByType(CONN_TYPE_SOCKET); } //判断是否需要监听,如果需要,检查tls需要的配置是否正确 if (server.tls_port || server.tls_replication || server.tls_cluster) { ConnectionType *ct_tls = connectionTypeTls(); if (!ct_tls) { serverLog(LL_WARNING, "Failed finding TLS support."); exit(1); } if (connTypeConfigure(ct_tls, &server.tls_ctx_config, 1) == C_ERR) { serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info."); exit(1); } } //判断是否需要监听 TLS的TCP if (server.tls_port != 0) { conn_index = connectionIndexByType(CONN_TYPE_TLS); if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_TLS); listener = &server.listeners[conn_index]; listener->bindaddr = server.bindaddr; listener->bindaddr_count = server.bindaddr_count; listener->port = server.tls_port; listener->ct = connectionByType(CONN_TYPE_TLS); } //判断是否需要unixsocket if (server.unixsocket != NULL) { conn_index = connectionIndexByType(CONN_TYPE_UNIX); if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_UNIX); listener = &server.listeners[conn_index]; listener->bindaddr = &server.unixsocket; listener->bindaddr_count = 1; listener->ct = connectionByType(CONN_TYPE_UNIX); listener->priv = &server.unixsocketperm; /* Unix socket specified */ }
/* create all the configured listener, and add handler to start to accept */ int listen_fds = 0; for (int j = 0; j < CONN_TYPE_MAX; j++) { listener = &server.listeners[j]; if (listener->ct == NULL) continue;
if (connListen(listener) == C_ERR) { serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL)); exit(1); } //创建新的连接处理事件 if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK) serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));
listen_fds += listener->count; }
if (listen_fds == 0) { serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } }
voidclusterInitListeners(void) { if (connectionIndexByType(connTypeOfCluster()->get_type(NULL)) < 0) { serverLog(LL_WARNING, "Missing connection type %s, but it is required for the Cluster bus.", connTypeOfCluster()->get_type(NULL)); exit(1); }
int port = defaultClientPort(); connListener *listener = &server.clistener; listener->count = 0; listener->bindaddr = server.bindaddr; listener->bindaddr_count = server.bindaddr_count; listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR; listener->ct = connTypeOfCluster(); if (connListen(listener) == C_ERR ) { /* Note: the following log text is matched by the test suite. */ serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port); exit(1); } if (createSocketAcceptHandler(&server.clistener, clusterAcceptHandler) != C_OK) { serverPanic("Unrecoverable error creating Redis Cluster socket accept handler."); } }
voidinitThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */
/* Indicate that io-threads are currently idle */ io_threads_op = IO_THREADS_OP_IDLE;
//如果就一个线程,也没必要开新的线程了 if (server.io_threads_num == 1) return;
//判断一下,如果线程太多了,就提示错误,不能开启太多线程 if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); }
/* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); setIOPendingCount(i, 0); pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; } }
/* Don't process file events if not requested. */ if (!(flags & AE_FILE_EVENTS)) { numevents = 0; }
//检查aftersleep函数指针,如果有对应的函数,调用这个函数 if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); //检查就绪的事件,然后做对应的处理 for (j = 0; j < numevents; j++) { int fd = eventLoop->fired[j].fd; aeFileEvent *fe = &eventLoop->events[fd]; int mask = eventLoop->fired[j].mask; int fired = 0; /* Number of events fired for current fd. */
int invert = fe->mask & AE_BARRIER; //需要翻转的话,先处理读的回调函数 if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ }
//如果有写事件,做回调 if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } }
if (invert) {//如果之前没有处理读事件 fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) {//读回调函数指针和写回调函数指针不是同一个, 并且有读事件,回调读函数 fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } }
staticvoidconnSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata);