Featured image of post redis源码学习|watch

redis源码学习|watch

探索Redis中的WATCH命令和乐观锁机制。本文详细解读了Redis 7.0.11版本中WATCH的实现原理,包括其数据结构、如何监控和管理key的变化,以及在事务中如何确保数据一致性。文章通过清晰的代码示例和图表,展示了WATCH命令在Redis中的工作流程,从监控key到执行事务的全过程。了解Redis如何通过WATCH实现乐观锁,以及在集群模式下的限制。加入我们,一起深入学习Redis源码,掌握数据一致性的核心技术。

最近想深入学习一下redis,那阅读源码是最好的学习方式了,正好最近pika社区在讨论 事务 的实现,实现事务的基础就是数据的一致性 虽然redis的事务没有像 关系型数据库那样,支持数据回滚。但是redis的事务也可以保证数据的一致性,如何保证数据 一致性,就是靠 watch 这个功能来实现的。说白了,redis的watch功能就是一个乐观锁

乐观锁

我的理解是,所谓的乐观锁,对数据加锁不会阻止别的人修改数据,但是别人修改过的数据,加锁的人能知道这个数据被修改过

redis 实现

本文中的源码基于redis 7.0.11,不同的版本实现可能会不同

先放一张watch的数据结构图

在redis的 db 结构中,有一个map,用来记录所有被watch的key,所有watch这个key的client都使用链表串联在一起

源码

 1/* Redis database representation. There are multiple databases identified
 2 * by integers from 0 (the default database) up to the max configured
 3 * database. The database number is the 'id' field in the structure. */
 4typedef struct redisDb {
 5    dict *dict;                 /* The keyspace for this DB */
 6    dict *expires;              /* Timeout of keys with a timeout set */
 7    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
 8    dict *ready_keys;           /* Blocked keys that received a PUSH */
 9    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS 这个map就是记录被watch的key*/
10    int id;                     /* Database ID */
11    long long avg_ttl;          /* Average TTL, just for stats */
12    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
13    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
14    clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
15} redisDb;

在redis的 client中也有一个链表,记录了当前client watch的所有key

还有一个flag记录当前client的状态,这个flag可以记录很多状态,在watch功能里,可以记录这个当前client watch的key,是否被别的client修改过

源码

1typedef struct client {
2    uint64_t flags;         /* Client flags: CLIENT_* macros. */
3    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
4};

因为redis不同的db是互相隔离的,所以在db层面做watch就可以了

watch key

现在来看一下如何watch一个key

源码

 1void watchForKey(client *c, robj *key) {
 2    list *clients = NULL;
 3    listIter li;
 4    listNode *ln;
 5    watchedKey *wk;
 6
 7    /* Check if we are already watching for this key */
 8    listRewind(c->watched_keys,&li);
 9    //遍历当前client已经watch的key中,是否包含当前要watch的key
10    while((ln = listNext(&li))) {
11        wk = listNodeValue(ln);
12        //因为redis不同的db中,数据是隔离的,所以要判断一下,db是否相同
13        //比如db0和db1都有key1这个key
14        if (wk->db == c->db && equalStringObjects(key,wk->key))
15            //如果这个key已经被watch了,直接rerun就好了
16            return; /* Key already watched */
17    }
18    /* This key is not already watched in this DB. Let's add it */
19    //去db 的 `watched_keys`这个map中,找到watch这个key的所有client
20    clients = dictFetchValue(c->db->watched_keys,key);
21    if (!clients) {
22        //如果没有client watch 这个key,返回的链表是空指针,这时候初始化一个链表
23        clients = listCreate();
24        //把链表赋值给map
25        dictAdd(c->db->watched_keys,key,clients);
26        incrRefCount(key);
27    }
28    /* Add the new key to the list of keys watched by this client */
29    wk = zmalloc(sizeof(*wk));
30    wk->key = key;
31    wk->client = c;
32    wk->db = c->db;
33    //记录一下,在watch时,这个key是否过期了,如果在watch前就已经过期了,在执行事务的时候,就忽略这个key
34    wk->expired = keyIsExpired(c->db, key);
35    incrRefCount(key);
36    //把当前的client加到链表中
37    listAddNodeTail(c->watched_keys,wk);
38    //把watch的key 加入的client watch链表中
39    listAddNodeTail(clients,wk);
40}

watch一个key前,会检查一下这个key是否已经被watch了,避免重复watch

找到watch这个key的所有client,并使用尾插法把当前client加入链表,并且记录一下watch这个key之前,这个key是否过期了

到这里,watch的过程就完成了,通过源码可以发现,watch的过程还是挺好理解的,就是在记录一下

执行事务

当redis的client在执行 EXEC命令时,会把当前事务所有的命令一起执行,在执行命令前,会先检查一下watch的key是否被修改了,如果被修改了, 就会放弃执行命令,返回失败

源码

 1void execCommand(client *c) {
 2    int j;
 3    robj **orig_argv;
 4    int orig_argc, orig_argv_len;
 5    struct redisCommand *orig_cmd;
 6
 7    //先检查一下,如果client在此之前没有执行过`MULTI`命令,就执行`EXEC`,返回错误
 8    if (!(c->flags & CLIENT_MULTI)) {
 9        addReplyError(c,"EXEC without MULTI");
10        return;
11    }
12
13    /* EXEC with expired watched key is disallowed*/
14    //检查一下被watch的key是否过期了,如果过期了,事务也会失败
15    //如果这个key在watch之前就过期了,那么这个key会被忽略
16    if (isWatchedKeyExpired(c)) {
17        c->flags |= (CLIENT_DIRTY_CAS);
18    }
19
20    /* Check if we need to abort the EXEC because:
21     * 1) Some WATCHed key was touched.
22     * 2) There was a previous error while queueing commands.
23     * A failed EXEC in the first case returns a multi bulk nil object
24     * (technically it is not an error but a special behavior), while
25     * in the second an EXECABORT error is returned. */
26    //这里就是判断这个key是否被其他client改动了,如果key被别的client改动了,或者事务出错了,那么本次事务都会失败
27    if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
28        if (c->flags & CLIENT_DIRTY_EXEC) {
29            addReplyErrorObject(c, shared.execaborterr);
30        } else {
31            addReply(c, shared.nullarray[c->resp]);
32        }
33
34        discardTransaction(c);
35        return;
36    }
37    uint64_t old_flags = c->flags;
38
39    /* we do not want to allow blocking commands inside multi */
40    c->flags |= CLIENT_DENY_BLOCKING;
41
42    /* Exec all the queued commands */
43    //取消watch的所有key
44    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
45    .....................................................
46    省略
47}

在执行事务前,会检查一下当前client是否开启了事务(是否执行了 MULTI 命令),没有开启事务,这次事务会失败

再检查这个client watch的key是否被修改了。如果这个key被其他的client修改了,则这个事务会执行失败

watch检查没有问题后,会清除当前client watch的所有key

到这,事务执行前的检查就完成了,后面就是事务相关的判断和操作了

修改watch的key

如果一个被watch的key被修改了,那么所有watch这个key的client都会知道,具体实现如下

源码

所有修改命令(比如set,hset等等)在执行后,都会调用这个函数,把watch的key标记为修改状态

 1void touchWatchedKey(redisDb *db, robj *key) {
 2    list *clients;
 3    listIter li;
 4    listNode *ln;
 5
 6    //如果记录key的map为空或者client链表是空,直接return
 7    if (dictSize(db->watched_keys) == 0) return;
 8    clients = dictFetchValue(db->watched_keys, key);
 9    if (!clients) return;
10
11    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
12    /* Check if we are already watching for this key */
13    listRewind(clients,&li);
14    //遍历watch这个key的client链表,把所有client的flag修改为 `CLIENT_DIRTY_CAS` 状态
15    while((ln = listNext(&li))) {
16        watchedKey *wk = listNodeValue(ln);
17        client *c = wk->client;
18
19        if (wk->expired) {
20            /* The key was already expired when WATCH was called. */
21            if (db == wk->db &&
22                equalStringObjects(key, wk->key) &&
23                dictFind(db->dict, key->ptr) == NULL)
24            {
25                //如果在这个client在watch前,这个key就已经过期了 && 是这个db的 && key是同一个 && 内存里没有这个key了
26                //就清除 watch就过期的 flag
27                /* Already expired key is deleted, so logically no change. Clear
28                 * the flag. Deleted keys are not flagged as expired. */
29                wk->expired = 0;
30                goto skip_client;
31            }
32            //否则就结束
33            break;
34        }
35
36        //把这个client的flag修改为 被其他client修改了
37        c->flags |= CLIENT_DIRTY_CAS;
38        /* As the client is marked as dirty, there is no point in getting here
39         * again in case that key (or others) are modified again (or keep the
40         * memory overhead till EXEC). */
41        //如果这个client watch的key被标记为以修改,那么就把这个client watch的key都删掉
42        unwatchAllKeys(c);
43
44    skip_client:
45        continue;
46    }
47}
48
49//删除client watch的key
50void unwatchAllKeys(client *c) {
51    listIter li;
52    listNode *ln;
53
54    if (listLength(c->watched_keys) == 0) return;
55    listRewind(c->watched_keys,&li);
56    //变量当前client watch的key 把这些key在对应的db的`watched_keys`中删除
57    while((ln = listNext(&li))) {
58        list *clients;
59        watchedKey *wk;
60
61        /* Lookup the watched key -> clients list and remove the client's wk
62         * from the list */
63        wk = listNodeValue(ln);
64        clients = dictFetchValue(wk->db->watched_keys, wk->key);
65        serverAssertWithInfo(c,NULL,clients != NULL);
66        listDelNode(clients,listSearchKey(clients,wk));
67        /* Kill the entry at all if this was the only client */
68        if (listLength(clients) == 0)
69            dictDelete(wk->db->watched_keys, wk->key);
70        /* Remove this watched key from the client->watched list */
71        listDelNode(c->watched_keys,ln);
72        decrRefCount(wk->key);
73        zfree(wk);
74    }
75}

任意client在修改key后,都会调用 touchWatchedKey 把watch这个key的client的flag标记为被修改状态,watch这个key的client,会遍历自己的watch 的链表,把所有的key都删掉。后续在执行事务时会失败

在执行 UNWATCH 命令时,最终调用的也是 unwatchAllKeys 这个函数

总结

redis通过 watch 命令实现乐观锁,保证了事务中数据的一致性。

redis的乐观锁在集群模式下并不适用,在集群模式下,还是要使用 SETEX KEY_NAME TIMEOUT VALUE 这种方式加锁来保证数据一致

刚开始学习redis源码,对一些概念理解还不是很深,如果有错误的地方,还请批评指正

发表了56篇文章 · 总计128.44k字
本博客已稳定运行
© QX
使用 Hugo 构建
主题 StackJimmy 设计