/* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedefstructredisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS 这个map就是记录被watch的key*/ int id; /* Database ID */ longlong avg_ttl; /* Average TTL, just for stats */ unsignedlong expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */ } redisDb;
voidexecCommand(client *c) { int j; robj **orig_argv; int orig_argc, orig_argv_len; structredisCommand *orig_cmd;
//先检查一下,如果client在此之前没有执行过`MULTI`命令,就执行`EXEC`,返回错误 if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; }
/* EXEC with expired watched key is disallowed*/ //检查一下被watch的key是否过期了,如果过期了,事务也会失败 //如果这个key在watch之前就过期了,那么这个key会被忽略 if (isWatchedKeyExpired(c)) { c->flags |= (CLIENT_DIRTY_CAS); }
/* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ //这里就是判断这个key是否被其他client改动了,如果key被别的client改动了,或者事务出错了,那么本次事务都会失败 if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) { if (c->flags & CLIENT_DIRTY_EXEC) { addReplyErrorObject(c, shared.execaborterr); } else { addReply(c, shared.nullarray[c->resp]); }
//如果记录key的map为空或者client链表是空,直接return if (dictSize(db->watched_keys) == 0) return; clients = dictFetchValue(db->watched_keys, key); if (!clients) return;
/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */ /* Check if we are already watching for this key */ listRewind(clients,&li); //遍历watch这个key的client链表,把所有client的flag修改为 `CLIENT_DIRTY_CAS` 状态 while((ln = listNext(&li))) { watchedKey *wk = listNodeValue(ln); client *c = wk->client;
if (wk->expired) { /* The key was already expired when WATCH was called. */ if (db == wk->db && equalStringObjects(key, wk->key) && dictFind(db->dict, key->ptr) == NULL) { //如果在这个client在watch前,这个key就已经过期了 && 是这个db的 && key是同一个 && 内存里没有这个key了 //就清除 watch就过期的 flag /* Already expired key is deleted, so logically no change. Clear * the flag. Deleted keys are not flagged as expired. */ wk->expired = 0; goto skip_client; } //否则就结束 break; }
//把这个client的flag修改为 被其他client修改了 c->flags |= CLIENT_DIRTY_CAS; /* As the client is marked as dirty, there is no point in getting here * again in case that key (or others) are modified again (or keep the * memory overhead till EXEC). */ //如果这个client watch的key被标记为以修改,那么就把这个client watch的key都删掉 unwatchAllKeys(c);
if (listLength(c->watched_keys) == 0) return; listRewind(c->watched_keys,&li); //变量当前client watch的key 把这些key在对应的db的`watched_keys`中删除 while((ln = listNext(&li))) { list *clients; watchedKey *wk;
/* Lookup the watched key -> clients list and remove the client's wk * from the list */ wk = listNodeValue(ln); clients = dictFetchValue(wk->db->watched_keys, wk->key); serverAssertWithInfo(c,NULL,clients != NULL); listDelNode(clients,listSearchKey(clients,wk)); /* Kill the entry at all if this was the only client */ if (listLength(clients) == 0) dictDelete(wk->db->watched_keys, wk->key); /* Remove this watched key from the client->watched list */ listDelNode(c->watched_keys,ln); decrRefCount(wk->key); zfree(wk); } }