最近想深入学习一下redis,那阅读源码是最好的学习方式了,正好最近pika社区在讨论 事务 的实现,实现事务的基础就是数据的一致性 虽然redis的事务没有像 关系型数据库那样,支持数据回滚。但是redis的事务也可以保证数据的一致性,如何保证数据 一致性,就是靠 watch 这个功能来实现的。说白了,redis的watch功能就是一个乐观锁
乐观锁
我的理解是,所谓的乐观锁,对数据加锁不会阻止别的人修改数据,但是别人修改过的数据,加锁的人能知道这个数据被修改过
redis 实现
本文中的源码基于redis 7.0.11,不同的版本实现可能会不同
先放一张watch的数据结构图
在redis的 db 结构中,有一个map,用来记录所有被watch的key,所有watch这个key的client都使用链表串联在一起
1/* Redis database representation. There are multiple databases identified
2 * by integers from 0 (the default database) up to the max configured
3 * database. The database number is the 'id' field in the structure. */
4typedef struct redisDb {
5 dict *dict; /* The keyspace for this DB */
6 dict *expires; /* Timeout of keys with a timeout set */
7 dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
8 dict *ready_keys; /* Blocked keys that received a PUSH */
9 dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS 这个map就是记录被watch的key*/
10 int id; /* Database ID */
11 long long avg_ttl; /* Average TTL, just for stats */
12 unsigned long expires_cursor; /* Cursor of the active expire cycle. */
13 list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
14 clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
15} redisDb;
在redis的 client中也有一个链表,记录了当前client watch的所有key
还有一个flag记录当前client的状态,这个flag可以记录很多状态,在watch功能里,可以记录这个当前client watch的key,是否被别的client修改过
1typedef struct client {
2 uint64_t flags; /* Client flags: CLIENT_* macros. */
3 list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
4};
因为redis不同的db是互相隔离的,所以在db层面做watch就可以了
watch key
现在来看一下如何watch一个key
1void watchForKey(client *c, robj *key) {
2 list *clients = NULL;
3 listIter li;
4 listNode *ln;
5 watchedKey *wk;
6
7 /* Check if we are already watching for this key */
8 listRewind(c->watched_keys,&li);
9 //遍历当前client已经watch的key中,是否包含当前要watch的key
10 while((ln = listNext(&li))) {
11 wk = listNodeValue(ln);
12 //因为redis不同的db中,数据是隔离的,所以要判断一下,db是否相同
13 //比如db0和db1都有key1这个key
14 if (wk->db == c->db && equalStringObjects(key,wk->key))
15 //如果这个key已经被watch了,直接rerun就好了
16 return; /* Key already watched */
17 }
18 /* This key is not already watched in this DB. Let's add it */
19 //去db 的 `watched_keys`这个map中,找到watch这个key的所有client
20 clients = dictFetchValue(c->db->watched_keys,key);
21 if (!clients) {
22 //如果没有client watch 这个key,返回的链表是空指针,这时候初始化一个链表
23 clients = listCreate();
24 //把链表赋值给map
25 dictAdd(c->db->watched_keys,key,clients);
26 incrRefCount(key);
27 }
28 /* Add the new key to the list of keys watched by this client */
29 wk = zmalloc(sizeof(*wk));
30 wk->key = key;
31 wk->client = c;
32 wk->db = c->db;
33 //记录一下,在watch时,这个key是否过期了,如果在watch前就已经过期了,在执行事务的时候,就忽略这个key
34 wk->expired = keyIsExpired(c->db, key);
35 incrRefCount(key);
36 //把当前的client加到链表中
37 listAddNodeTail(c->watched_keys,wk);
38 //把watch的key 加入的client watch链表中
39 listAddNodeTail(clients,wk);
40}
watch一个key前,会检查一下这个key是否已经被watch了,避免重复watch
找到watch这个key的所有client,并使用尾插法把当前client加入链表,并且记录一下watch这个key之前,这个key是否过期了
到这里,watch的过程就完成了,通过源码可以发现,watch的过程还是挺好理解的,就是在记录一下
执行事务
当redis的client在执行 EXEC命令时,会把当前事务所有的命令一起执行,在执行命令前,会先检查一下watch的key是否被修改了,如果被修改了, 就会放弃执行命令,返回失败
1void execCommand(client *c) {
2 int j;
3 robj **orig_argv;
4 int orig_argc, orig_argv_len;
5 struct redisCommand *orig_cmd;
6
7 //先检查一下,如果client在此之前没有执行过`MULTI`命令,就执行`EXEC`,返回错误
8 if (!(c->flags & CLIENT_MULTI)) {
9 addReplyError(c,"EXEC without MULTI");
10 return;
11 }
12
13 /* EXEC with expired watched key is disallowed*/
14 //检查一下被watch的key是否过期了,如果过期了,事务也会失败
15 //如果这个key在watch之前就过期了,那么这个key会被忽略
16 if (isWatchedKeyExpired(c)) {
17 c->flags |= (CLIENT_DIRTY_CAS);
18 }
19
20 /* Check if we need to abort the EXEC because:
21 * 1) Some WATCHed key was touched.
22 * 2) There was a previous error while queueing commands.
23 * A failed EXEC in the first case returns a multi bulk nil object
24 * (technically it is not an error but a special behavior), while
25 * in the second an EXECABORT error is returned. */
26 //这里就是判断这个key是否被其他client改动了,如果key被别的client改动了,或者事务出错了,那么本次事务都会失败
27 if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
28 if (c->flags & CLIENT_DIRTY_EXEC) {
29 addReplyErrorObject(c, shared.execaborterr);
30 } else {
31 addReply(c, shared.nullarray[c->resp]);
32 }
33
34 discardTransaction(c);
35 return;
36 }
37 uint64_t old_flags = c->flags;
38
39 /* we do not want to allow blocking commands inside multi */
40 c->flags |= CLIENT_DENY_BLOCKING;
41
42 /* Exec all the queued commands */
43 //取消watch的所有key
44 unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
45 .....................................................
46 省略
47}
在执行事务前,会检查一下当前client是否开启了事务(是否执行了 MULTI 命令),没有开启事务,这次事务会失败
再检查这个client watch的key是否被修改了。如果这个key被其他的client修改了,则这个事务会执行失败
watch检查没有问题后,会清除当前client watch的所有key
到这,事务执行前的检查就完成了,后面就是事务相关的判断和操作了
修改watch的key
如果一个被watch的key被修改了,那么所有watch这个key的client都会知道,具体实现如下
所有修改命令(比如set,hset等等)在执行后,都会调用这个函数,把watch的key标记为修改状态
1void touchWatchedKey(redisDb *db, robj *key) {
2 list *clients;
3 listIter li;
4 listNode *ln;
5
6 //如果记录key的map为空或者client链表是空,直接return
7 if (dictSize(db->watched_keys) == 0) return;
8 clients = dictFetchValue(db->watched_keys, key);
9 if (!clients) return;
10
11 /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
12 /* Check if we are already watching for this key */
13 listRewind(clients,&li);
14 //遍历watch这个key的client链表,把所有client的flag修改为 `CLIENT_DIRTY_CAS` 状态
15 while((ln = listNext(&li))) {
16 watchedKey *wk = listNodeValue(ln);
17 client *c = wk->client;
18
19 if (wk->expired) {
20 /* The key was already expired when WATCH was called. */
21 if (db == wk->db &&
22 equalStringObjects(key, wk->key) &&
23 dictFind(db->dict, key->ptr) == NULL)
24 {
25 //如果在这个client在watch前,这个key就已经过期了 && 是这个db的 && key是同一个 && 内存里没有这个key了
26 //就清除 watch就过期的 flag
27 /* Already expired key is deleted, so logically no change. Clear
28 * the flag. Deleted keys are not flagged as expired. */
29 wk->expired = 0;
30 goto skip_client;
31 }
32 //否则就结束
33 break;
34 }
35
36 //把这个client的flag修改为 被其他client修改了
37 c->flags |= CLIENT_DIRTY_CAS;
38 /* As the client is marked as dirty, there is no point in getting here
39 * again in case that key (or others) are modified again (or keep the
40 * memory overhead till EXEC). */
41 //如果这个client watch的key被标记为以修改,那么就把这个client watch的key都删掉
42 unwatchAllKeys(c);
43
44 skip_client:
45 continue;
46 }
47}
48
49//删除client watch的key
50void unwatchAllKeys(client *c) {
51 listIter li;
52 listNode *ln;
53
54 if (listLength(c->watched_keys) == 0) return;
55 listRewind(c->watched_keys,&li);
56 //变量当前client watch的key 把这些key在对应的db的`watched_keys`中删除
57 while((ln = listNext(&li))) {
58 list *clients;
59 watchedKey *wk;
60
61 /* Lookup the watched key -> clients list and remove the client's wk
62 * from the list */
63 wk = listNodeValue(ln);
64 clients = dictFetchValue(wk->db->watched_keys, wk->key);
65 serverAssertWithInfo(c,NULL,clients != NULL);
66 listDelNode(clients,listSearchKey(clients,wk));
67 /* Kill the entry at all if this was the only client */
68 if (listLength(clients) == 0)
69 dictDelete(wk->db->watched_keys, wk->key);
70 /* Remove this watched key from the client->watched list */
71 listDelNode(c->watched_keys,ln);
72 decrRefCount(wk->key);
73 zfree(wk);
74 }
75}
任意client在修改key后,都会调用 touchWatchedKey
把watch这个key的client的flag标记为被修改状态,watch这个key的client,会遍历自己的watch 的链表,把所有的key都删掉。后续在执行事务时会失败
在执行 UNWATCH 命令时,最终调用的也是 unwatchAllKeys
这个函数
总结
redis通过 watch 命令实现乐观锁,保证了事务中数据的一致性。
redis的乐观锁在集群模式下并不适用,在集群模式下,还是要使用 SETEX KEY_NAME TIMEOUT VALUE
这种方式加锁来保证数据一致
刚开始学习redis源码,对一些概念理解还不是很深,如果有错误的地方,还请批评指正