0%

redis源码学习|watch

最近想深入学习一下redis,那阅读源码是最好的学习方式了,正好最近pika社区在讨论 事务 的实现,实现事务的基础就是数据的一致性
虽然redis的事务没有像 关系型数据库那样,支持数据回滚。但是redis的事务也可以保证数据的一致性,如何保证数据 一致性,就是靠 watch 这个功能来实现的。说白了,redis的watch功能就是一个乐观锁

乐观锁

我的理解是,所谓的乐观锁,对数据加锁不会阻止别的人修改数据,但是别人修改过的数据,加锁的人能知道这个数据被修改过

redis 实现

本文中的源码基于redis 7.0.11,不同的版本实现可能会不同

先放一张watch的数据结构图

在redis的 db 结构中,有一个map,用来记录所有被watch的key,所有watch这个key的client都使用链表串联在一起

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS 这个map就是记录被watch的key*/
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;

在redis的 client中也有一个链表,记录了当前client watch的所有key

还有一个flag记录当前client的状态,这个flag可以记录很多状态,在watch功能里,可以记录这个当前client watch的key,是否被别的client修改过

源码

1
2
3
4
typedef struct client {
uint64_t flags; /* Client flags: CLIENT_* macros. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
};

因为redis不同的db是互相隔离的,所以在db层面做watch就可以了

watch key

现在来看一下如何watch一个key

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void watchForKey(client *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;

/* Check if we are already watching for this key */
listRewind(c->watched_keys,&li);
//遍历当前client已经watch的key中,是否包含当前要watch的key
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
//因为redis不同的db中,数据是隔离的,所以要判断一下,db是否相同
//比如db0和db1都有key1这个key
if (wk->db == c->db && equalStringObjects(key,wk->key))
//如果这个key已经被watch了,直接rerun就好了
return; /* Key already watched */
}
/* This key is not already watched in this DB. Let's add it */
//去db 的 `watched_keys`这个map中,找到watch这个key的所有client
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
//如果没有client watch 这个key,返回的链表是空指针,这时候初始化一个链表
clients = listCreate();
//把链表赋值给map
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
/* Add the new key to the list of keys watched by this client */
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->client = c;
wk->db = c->db;
//记录一下,在watch时,这个key是否过期了,如果在watch前就已经过期了,在执行事务的时候,就忽略这个key
wk->expired = keyIsExpired(c->db, key);
incrRefCount(key);
//把当前的client加到链表中
listAddNodeTail(c->watched_keys,wk);
//把watch的key 加入的client watch链表中
listAddNodeTail(clients,wk);
}

watch一个key前,会检查一下这个key是否已经被watch了,避免重复watch

找到watch这个key的所有client,并使用尾插法把当前client加入链表,并且记录一下watch这个key之前,这个key是否过期了

到这里,watch的过程就完成了,通过源码可以发现,watch的过程还是挺好理解的,就是在记录一下

执行事务

当redis的client在执行 EXEC命令时,会把当前事务所有的命令一起执行,在执行命令前,会先检查一下watch的key是否被修改了,如果被修改了,
就会放弃执行命令,返回失败

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc, orig_argv_len;
struct redisCommand *orig_cmd;

//先检查一下,如果client在此之前没有执行过`MULTI`命令,就执行`EXEC`,返回错误
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}

/* EXEC with expired watched key is disallowed*/
//检查一下被watch的key是否过期了,如果过期了,事务也会失败
//如果这个key在watch之前就过期了,那么这个key会被忽略
if (isWatchedKeyExpired(c)) {
c->flags |= (CLIENT_DIRTY_CAS);
}

/* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands.
* A failed EXEC in the first case returns a multi bulk nil object
* (technically it is not an error but a special behavior), while
* in the second an EXECABORT error is returned. */
//这里就是判断这个key是否被其他client改动了,如果key被别的client改动了,或者事务出错了,那么本次事务都会失败
if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
if (c->flags & CLIENT_DIRTY_EXEC) {
addReplyErrorObject(c, shared.execaborterr);
} else {
addReply(c, shared.nullarray[c->resp]);
}

discardTransaction(c);
return;
}
uint64_t old_flags = c->flags;

/* we do not want to allow blocking commands inside multi */
c->flags |= CLIENT_DENY_BLOCKING;

/* Exec all the queued commands */
//取消watch的所有key
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
.....................................................
省略
}

在执行事务前,会检查一下当前client是否开启了事务(是否执行了 MULTI 命令),没有开启事务,这次事务会失败

再检查这个client watch的key是否被修改了。如果这个key被其他的client修改了,则这个事务会执行失败

watch检查没有问题后,会清除当前client watch的所有key

到这,事务执行前的检查就完成了,后面就是事务相关的判断和操作了

修改watch的key

如果一个被watch的key被修改了,那么所有watch这个key的client都会知道,具体实现如下

源码

所有修改命令(比如set,hset等等)在执行后,都会调用这个函数,把watch的key标记为修改状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;

//如果记录key的map为空或者client链表是空,直接return
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;

/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
//遍历watch这个key的client链表,把所有client的flag修改为 `CLIENT_DIRTY_CAS` 状态
while((ln = listNext(&li))) {
watchedKey *wk = listNodeValue(ln);
client *c = wk->client;

if (wk->expired) {
/* The key was already expired when WATCH was called. */
if (db == wk->db &&
equalStringObjects(key, wk->key) &&
dictFind(db->dict, key->ptr) == NULL)
{
//如果在这个client在watch前,这个key就已经过期了 && 是这个db的 && key是同一个 && 内存里没有这个key了
//就清除 watch就过期的 flag
/* Already expired key is deleted, so logically no change. Clear
* the flag. Deleted keys are not flagged as expired. */
wk->expired = 0;
goto skip_client;
}
//否则就结束
break;
}

//把这个client的flag修改为 被其他client修改了
c->flags |= CLIENT_DIRTY_CAS;
/* As the client is marked as dirty, there is no point in getting here
* again in case that key (or others) are modified again (or keep the
* memory overhead till EXEC). */
//如果这个client watch的key被标记为以修改,那么就把这个client watch的key都删掉
unwatchAllKeys(c);

skip_client:
continue;
}
}

//删除client watch的key
void unwatchAllKeys(client *c) {
listIter li;
listNode *ln;

if (listLength(c->watched_keys) == 0) return;
listRewind(c->watched_keys,&li);
//变量当前client watch的key 把这些key在对应的db的`watched_keys`中删除
while((ln = listNext(&li))) {
list *clients;
watchedKey *wk;

/* Lookup the watched key -> clients list and remove the client's wk
* from the list */
wk = listNodeValue(ln);
clients = dictFetchValue(wk->db->watched_keys, wk->key);
serverAssertWithInfo(c,NULL,clients != NULL);
listDelNode(clients,listSearchKey(clients,wk));
/* Kill the entry at all if this was the only client */
if (listLength(clients) == 0)
dictDelete(wk->db->watched_keys, wk->key);
/* Remove this watched key from the client->watched list */
listDelNode(c->watched_keys,ln);
decrRefCount(wk->key);
zfree(wk);
}
}

任意client在修改key后,都会调用 touchWatchedKey 把watch这个key的client的flag标记为被修改状态,watch这个key的client,会遍历自己的watch 的链表,把所有的key都删掉。后续在执行事务时会失败

在执行 UNWATCH 命令时,最终调用的也是 unwatchAllKeys 这个函数

总结

redis通过 watch 命令实现乐观锁,保证了事务中数据的一致性。

redis的乐观锁在集群模式下并不适用,在集群模式下,还是要使用 SETEX KEY_NAME TIMEOUT VALUE 这种方式加锁来保证数据一致

刚开始学习redis源码,对一些概念理解还不是很深,如果有错误的地方,还请批评指正