发布与订阅

Redis提供了发布订阅功能,可以用于消息的传输

Redis的发布订阅机制包括三个部分,publisher,subscriber和Channel


发布者和订阅者都是Redis客户端,Channel则为Redis服务器端。

发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。

频道/模式的订阅与退订

  • subscribe:订阅 subscribe channel1 channel2 …

    Redis客户端1订阅频道1和频道2

    1
    2
    3
    4
    5
    6
    7
    8
    127.0.0.1:6379> subscribe ch1 ch2
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "ch1"
    3) (integer) 1
    1) "subscribe"
    2) "ch2"
    3) (integer) 2
  • publish:发布消息 publish channel message

    Redis客户端2将消息发布在频道1和频道2上

    1
    2
    3
    4
    127.0.0.1:6379> publish ch1 hello
    (integer) 1
    127.0.0.1:6379> publish ch2 world
    (integer) 1

    Redis客户端1接收到频道1和频道2的消息

    1
    2
    3
    4
    5
    6
    1) "message"
    2) "ch1"
    3) "hello"
    1) "message"
    2) "ch2"
    3) "world"
  • unsubscribe:退订 channel

    Redis客户端1退订频道1

    1
    2
    3
    4
    127.0.0.1:6379> unsubscribe ch1
    1) "unsubscribe"
    2) "ch1"
    3) (integer) 0
  • psubscribe:模式匹配 psubscribe +模式

    Redis客户端1订阅所有以ch开头的频道

    1
    2
    3
    4
    5
    127.0.0.1:6379> psubscribe ch*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "ch*"
    3) (integer) 1

    Redis客户端2发布信息在频道5上

    1
    2
    127.0.0.1:6379> publish ch5 helloworld
    (integer) 1

    Redis客户端1收到频道5的信息

    1
    2
    3
    4
    1) "pmessage"
    2) "ch*"
    3) "ch5"
    4) "helloworld"
  • punsubscribe 退订模式

    1
    2
    3
    4
    127.0.0.1:6379> punsubscribe ch*
    1) "punsubscribe"
    2) "ch*"
    3) (integer) 0

发布订阅的机制

订阅某个频道或模式

  • 客户端(client)

    • 属性为pubsub_channels,该属性表明了该客户端订阅的所有频道

    • 属性为pubsub_patterns,该属性表示该客户端订阅的所有模式

    1
    2
    3
    4
    5
    6
    typedef struct redisClient {
    ...
    dict *pubsub_channels; //该client订阅的channels,以channel为key用dict的方式组织
    list *pubsub_patterns; //该client订阅的pattern,以list的方式组织
    ...
    }
  • 服务器端(RedisServer)

    • 属性为pubsub_channels,该服务器端中的所有频道以及订阅了这个频道的客户端

    • 属性为pubsub_patterns,该服务器端中的所有模式和订阅了这些模式的客户端

    1
    2
    3
    4
    5
    6
    struct redisServer {
    ...
    dict *pubsub_channels; //redis server进程中维护的channel dict,它以channel为key,订阅channel的client list为value
    list *pubsub_patterns; //redis server进程中维护的pattern list int notify_keyspace_events;
    ...
    };

当客户端向某个频道发送消息时,Redis首先在redisServer中的pubsub_channels中找出键为该频道的结点,遍历该结点的值,即遍历订阅了该频道的所有客户端,将消息发送给这些客户端。

然后,遍历结构体redisServer中的pubsub_patterns,找出包含该频道的模式的结点,将消息发送给订阅了该模式的客户端。

使用场景

  • 哨兵模式

    在Redis哨兵模式中,哨兵通过发布与订阅的方式与Redis主服务器和Redis从服务器进行通信。这个我们将在后面的章节中详细讲解。

  • Redisson框架使用

    Redisson是一个分布式锁框架,在Redisson分布式锁释放的时候,是使用发布与订阅的方式通知的,这个我们将在后面的章节中详细讲解。

事务

Redis的事务是通过multi、exec、discard和watch这四个命令来完成的。

Redis的单个命令都是原子性的,所以这里需要确保事务性的对象是命令集合。

Redis将命令集合序列化并确保处于同一事务的命令集合连续且不被打断的执行。

Redis不支持回滚操作。

事务命令

multi:用于标记事务块的开始,Redis会将后续的命令逐个放入队列中,然后使用exec原子化地执行这个命令队列

exec:执行命令队列

discard:清除命令队列

watch:监视key

unwatch:清除监视key


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set s1 222
QUEUED
127.0.0.1:6379> hset set1 name zhangfei
QUEUED
127.0.0.1:6379> exec
1) OK
2) (integer) 1

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set s2 333
QUEUED
127.0.0.1:6379> hset set2 age 23
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> exec
(error) ERR EXEC without MULTI

127.0.0.1:6379> watch s1
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set s1 555
QUEUED
127.0.0.1:6379> exec

# 此时在没有exec之前,通过另一个命令窗口对监控的s1字段进行 修改(nil)
127.0.0.1:6379> get s1
222

127.0.0.1:6379> unwatch OK

事务机制

事务的执行

    1. 事务开始

    在RedisClient中,有属性flags,用来表示是否在事务中flags=REDIS_MULTI

    1. 命令入队

    RedisClient将命令存放在事务队列中(EXEC,DISCARD,WATCH,MULTI除外)

    1. 事务队列

    multiCmd *commands 用于存放命令

    1. 执行事务

    RedisClient向服务器端发送exec命令,RedisServer会遍历事务队列,执行队列中的命令,最后将执行的结果一次性返回给客户端。

如果某条命令在入队过程中发生错误,redisClient将flags置为REDIS_DIRTY_EXEC,EXEC命令将会失败返回。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct redisClient{
// flags 状态
int flags
// 事务状态
multiState mstate;
// .....
}redisClient;

// 事务状态
typedef struct multiState{
// 事务队列,FIFO顺序
// 是一个数组,先入队的命令在前,后入队在后
multiCmd *commands;
// 已入队命令数
int count;
}multiState;

// 事务队列
typedef struct multiCmd{
// 参数
robj **argv;
// 参数数量
int argc;
// 命令指针
struct redisCommand *cmd;
}multiCmd;

Watch的执行

  • 使用WATCH命令监视数据库键

    redisDb有一个watched_keys字典,key是某个被监视的数据的key,值是一个链表.记录了所有监视这个数据的客户端

  • 监视机制的触发

    当修改数据后,监视这个数据的客户端的flags置为REDIS_DIRTY_CAS

  • 事务执行

    RedisClient向服务器端发送exec命令,服务器判断RedisClient的flags,如果为REDIS_DIRTY_CAS,则清空事务队列。


1
2
3
4
5
6
typedef struct redisDb{
// .....
// 正在被WATCH命令监视的键
dict *watched_keys;
// .....
}redisDb;

Redis的弱事务性

  • Redis语法错误

    整个事务的命令在队列里都清除

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> sets m1 44
    (error) ERR unknown command `sets`, with args beginning with: `m1`, `44`,
    127.0.0.1:6379> set m2 55
    QUEUED
    127.0.0.1:6379> exec
    (error) EXECABORT Transaction discarded because of previous errors.
    127.0.0.1:6379> get m1
    "22"

    flags=REDIS_DIRTY_EXEC

  • Redis运行错误

    在队列里正确的命令可以执行 (弱事务性)

    弱事务性 :

    • 在队列里正确的命令可以执行 (非原子操作)

    • 不支持回滚

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set m1 55
    QUEUED
    127.0.0.1:6379> lpush m1 1 2 3 #不能是语法错误
    QUEUED
    127.0.0.1:6379> exec
    1) OK
    2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
    127.0.0.1:6379> get m1
    "55"
  • Redis不支持事务回滚(为什么呢)

    • 大多数事务失败是因为语法错误或者类型错误,这两种错误,在开发阶段都是可以预见的

    • Redis为了性能方面就忽略了事务回滚。 (回滚记录历史版本)