Redis发布订阅模式

1 列表的局限

通过队列的rpush和blpop可以实现消息队列(队尾进队头出),没有任何元素可以弹出的时候,连接会被阻塞。但是基于list实现的消息队列,不支持一对多的消息分发,相当于只有一个消费者。

2 发布订阅模式

除了通过list实现消息队列之外,Redis还提供了发布订阅的功能

2.1 订阅频道

消息的生产者和消费者是不同的客户端,连接到同一个redis的服务。通过什么对象把生产者和消费者关联起来?

在RabbitMQ中叫Queue,在kafka中叫Topic。Redis的模型中这个叫channel(频道)。订阅者可以订阅一个或多个channel。消息的发布者可以给指定的channel发布于消息,只要有消息到达了channel,所有订阅了这个channel的订阅者都会收到这条消息。

订阅者订阅频道:可以一次订阅多个,比如这个客户端订阅了3个频道,频道不用实现创建。

1
subscribe channel-1 channel-2 channel-3

发布者可以向指定的频道发布消息(并不支持一次向多个频道发送消息)

1
publish channel-1 test

取消订阅(不能在订阅状态下使用)

1
unsubscribe channel-1

2.2 按规则(Pattern)订阅频道

?代表一个字符,*代表0个或多个字符。

例如,现在有三个新闻频道,运动新闻(news-sport),音乐新闻(news-music),天气新闻(news-weather)。

1
2
3
psubscribe *sport  //消费端1关注运动消息
psubscribe news* //消费端2关注所有消息
psubscribe news-weather //消费端3关注天气新闻

一般来说,考虑到性能和持久化的因素,不建议使用Redis的发布订阅功能来实现MQ。Redis的一些内部机制用到了发布订阅功能。

2.3 java伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class PublishTest {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.publish("test-123", "666");
jedis.publish("test-abc", "pengyuyan");
}
}

public class MyListener extends JedisPubSub {
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
System.out.println(channel + "=" + message);
}
// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
// System.out.println(channel + "=" + subscribedChannels);
}
// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
// System.out.println(channel + "=" + subscribedChannels);
}
// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + "=" + channel + "=" + message);
}
}

public class ListenTest {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
final MyListener listener = new MyListener();
// 使用模式匹配的方式设置频道
// 会阻塞
jedis.psubscribe(listener, new String[]{"test-*"});
}
}

Redis发布订阅模式
http://www.zivjie.cn/2023/03/04/中间件/redis/Redis发布订阅模式/
作者
Francis
发布于
2023年3月4日
许可协议