Redis Pub/Sub

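Redis publish/subscribe (pub/sub) is a messaging pattern: publishers (pub) send messages to channels, and subscribers (sub) receive the messages sent to the channels they subscribed to. A Redis client can subscribe to any number of channels.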

Take a channel, channel1, and the three clients subscribed to it: client2, client5, and client1.

When a new message is sent to channel1 with the PUBLISH command, it is delivered to all three subscribed clients.
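The fan-out described above can be modelled in a few lines of plain Java. This is only an in-memory sketch, not how Redis is implemented; the class `ChannelFanOutSketch` and its methods are hypothetical names introduced for illustration. It assumes, as Redis does, that `publish` reports how many subscribers received the message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of channel fan-out; an illustration only, not Redis itself.
class ChannelFanOutSketch {
    // channel name -> names of the clients subscribed to it
    private final Map<String, Set<String>> subscribers = new LinkedHashMap<>();
    // client name -> messages delivered to that client
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    void subscribe(String client, String channel) {
        subscribers.computeIfAbsent(channel, k -> new LinkedHashSet<>()).add(client);
        inboxes.computeIfAbsent(client, k -> new ArrayList<>());
    }

    // Delivers the message to every current subscriber and returns how many received it.
    int publish(String channel, String message) {
        Set<String> clients = subscribers.getOrDefault(channel, new LinkedHashSet<>());
        for (String client : clients) {
            inboxes.get(client).add(message);
        }
        return clients.size();
    }

    List<String> inbox(String client) {
        return inboxes.getOrDefault(client, new ArrayList<>());
    }

    public static void main(String[] args) {
        ChannelFanOutSketch broker = new ChannelFanOutSketch();
        broker.subscribe("client2", "channel1");
        broker.subscribe("client5", "channel1");
        broker.subscribe("client1", "channel1");

        System.out.println(broker.publish("channel1", "hello")); // prints 3
        System.out.println(broker.inbox("client5"));             // prints [hello]
        System.out.println(broker.publish("channel2", "lost"));  // prints 0
    }
}
```

The last call shows the fire-and-forget nature of the pattern: a message published to a channel nobody is subscribed to is not stored anywhere.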

Redis Pub/Sub Commands

SUBSCRIBE channel [channel …]

Subscribes the client to one or more channels.

127.0.0.1:6380> SUBSCRIBE testChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testChannel"
3) (integer) 1
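For readers curious what a client actually sends for the command above: Redis commands travel as a RESP array of bulk strings. The sketch below builds that wire format by hand; `RespCommandSketch` and `encode` are hypothetical names, and a real client library such as Jedis does this work internally.

```java
import java.nio.charset.StandardCharsets;

// Encodes a command as a RESP array of bulk strings (illustrative helper).
class RespCommandSketch {
    static String encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        // Array header: '*' followed by the number of elements
        sb.append('*').append(parts.length).append("\r\n");
        for (String part : parts) {
            // Bulk string: '$' + length in bytes, then the payload
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n");
            sb.append(part).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String wire = encode("SUBSCRIBE", "testChannel");
        // Make the line terminators visible when printing
        System.out.println(wire.replace("\r\n", "\\r\\n"));
        // prints *2\r\n$9\r\nSUBSCRIBE\r\n$11\r\ntestChannel\r\n
    }
}
```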

UNSUBSCRIBE [channel [channel …]]

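Unsubscribes the client from one or more channels. With no arguments, the client is unsubscribed from all channels.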

PSUBSCRIBE pattern [pattern …]

Subscribes the client to every channel whose name matches one or more of the given patterns.
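Patterns are glob-style. The following sketch shows how a pattern could be tested against a channel name; it handles only the `*` and `?` wildcards, whereas Redis patterns also support character classes such as `[ae]` and backslash escaping. `GlobPatternSketch` is a hypothetical helper, not part of Redis or Jedis.

```java
import java.util.regex.Pattern;

// Simplified glob matcher: supports only '*' and '?' (an assumption of this sketch).
class GlobPatternSketch {
    static boolean matches(String pattern, String channel) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '*') {
                regex.append(".*");          // any run of characters, including none
            } else if (c == '?') {
                regex.append('.');           // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // literal character
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL)
                .matcher(channel)
                .matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("news.*", "news.sports")); // prints true
        System.out.println(matches("news.*", "weather.today")); // prints false
        System.out.println(matches("h?llo", "hello"));        // prints true
        System.out.println(matches("h?llo", "hllo"));         // prints false
    }
}
```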

PUNSUBSCRIBE [pattern [pattern …]]

Unsubscribes the client from one or more patterns. With no arguments, the client is unsubscribed from all patterns.

PUBLISH channel message

Publishes a message to the specified channel. The integer reply is the number of clients that received the message.

# Open a new console and publish messages to testChannel
127.0.0.1:6380> publish testChannel "test"
(integer) 1
127.0.0.1:6380> publish testChannel "test2"
(integer) 1

# Check the console that is subscribed to testChannel
127.0.0.1:6380> SUBSCRIBE testChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testChannel"
3) (integer) 1
1) "message"
2) "testChannel"
3) "test"
1) "message"
2) "testChannel"
3) "test2"
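Each reply in the subscriber console is a three-element array: the reply type, the channel name, and either the current subscription count (for "subscribe") or the message payload (for "message"). A minimal sketch of reading such replies follows; `PushReplySketch` is a hypothetical class, and the replies are hard-coded rather than read from a server.

```java
import java.util.Arrays;
import java.util.List;

// Interprets the three-element replies shown in the console output above.
class PushReplySketch {
    static String describe(List<Object> reply) {
        String kind = (String) reply.get(0);
        switch (kind) {
            case "subscribe":
                // element 1: channel, element 2: number of active subscriptions
                return "subscribed to " + reply.get(1)
                        + " (" + reply.get(2) + " active subscription(s))";
            case "message":
                // element 1: channel, element 2: payload
                return "message on " + reply.get(1) + ": " + reply.get(2);
            default:
                return "unhandled reply type: " + kind;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Arrays.<Object>asList("subscribe", "testChannel", 1L)));
        // prints subscribed to testChannel (1 active subscription(s))
        System.out.println(describe(Arrays.<Object>asList("message", "testChannel", "test")));
        // prints message on testChannel: test
    }
}
```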

PUBSUB subcommand [argument [argument …]]

Inspects the state of the pub/sub system.

127.0.0.1:6380> PUBSUB channels
1) "testChannel"

Implementing Redis Pub/Sub in Java

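In Java, a message can be published to a channel by calling Jedis's publish(final String channel, final String message) method directly. Subscribing requires calling Jedis's subscribe(final JedisPubSub jedisPubSub, final String… channels) method and implementing the onMessage() method of the JedisPubSub class to handle incoming messages.
Internally, Jedis's subscribe() calls JedisPubSub's proceed(), which subscribes to the channels and then calls process(). process() loops for as long as the client is subscribed, reading each reply and dispatching it to the matching callback, such as onMessage(). Because subscribe() blocks inside that loop, the subscription is usually run on a separate thread; otherwise the code after the subscribe() call never executes.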
Below are the implementations of JedisPubSub's proceed() and process() methods:

public void proceed(Client client, String... channels) {
    this.client = client;
    client.subscribe(channels);
    client.flush();
    process(client);
}

private void process(Client client) {

    // Keep reading replies until no subscriptions remain
    do {
        List<Object> reply = client.getRawObjectMultiBulkReply();
        final Object firstObj = reply.get(0);
        if (!(firstObj instanceof byte[])) {
            throw new JedisException("Unknown message type: " + firstObj);
        }
        // The first element identifies the reply type and selects the callback
        final byte[] resp = (byte[]) firstObj;
        if (Arrays.equals(SUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onSubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onUnsubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(MESSAGE.raw, resp)) {
            final byte[] bchannel = (byte[]) reply.get(1);
            final byte[] bmesg = (byte[]) reply.get(2);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onMessage(strchannel, strmesg);
        } else if (Arrays.equals(PMESSAGE.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final byte[] bchannel = (byte[]) reply.get(2);
            final byte[] bmesg = (byte[]) reply.get(3);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onPMessage(strpattern, strchannel, strmesg);
        } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPSubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPUnsubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PONG.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPong(strpattern);
        } else {
            throw new JedisException("Unknown message type: " + firstObj);
        }
    } while (isSubscribed());
}
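To see why a loop like process() forces the subscriber onto its own thread, here is a standard-library-only sketch. A BlockingQueue stands in for the Redis connection, and a hypothetical STOP sentinel stands in for the reply that ends the subscription; none of these names come from Jedis.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Models a blocking subscribe loop without Redis (illustration only).
class BlockingSubscribeSketch {
    // Hypothetical sentinel: plays the role of the reply that ends the subscription
    private static final String STOP = "__stop__";

    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    final List<String> received = new CopyOnWriteArrayList<>();

    // Blocks the calling thread until STOP arrives, like a subscribe() call.
    void subscribeLoop() throws InterruptedException {
        while (true) {
            String message = incoming.take(); // waits here when nothing is pending
            if (STOP.equals(message)) {
                return;
            }
            received.add(message);
        }
    }

    void deliver(String message) { incoming.add(message); }

    void stop() { incoming.add(STOP); }

    // Runs the loop on its own thread so the caller can keep working.
    static List<String> runDemo() {
        BlockingSubscribeSketch sketch = new BlockingSubscribeSketch();
        Thread subscriber = new Thread(() -> {
            try {
                sketch.subscribeLoop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "SubscribeThread");
        subscriber.start();

        // The main thread is free to publish because the loop runs elsewhere
        sketch.deliver("test");
        sketch.deliver("test2");
        sketch.stop();
        try {
            subscriber.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [test, test2]
    }
}
```

Had subscribeLoop() been called directly on the main thread, the deliver() and stop() calls after it would never have been reached.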

Below is a simple implementation of the publish/subscribe pattern. GitHub: PubsubDemo.java

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;

public class PubsubDemo {

    private final String ip;
    private final int port;
    private final String channel;
    private final JedisPool jedisPool;

    public PubsubDemo(String ip, int port, String channel) {
        this.ip = ip;
        this.port = port;
        this.channel = channel;
        JedisPoolConfig config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ip, port); // initialize the JedisPool
    }

    /**
     * Subscriber thread:
     * subscribes to the channel and prints every message it receives.
     */
    class Subscribe implements Runnable {

        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                // subscribe() blocks until the subscription ends
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        System.out.println("receive message[ channel: " + channel + ", message: " + message + "]");
                    }
                }, channel);
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }

    /**
     * Publisher thread:
     * sends an incrementing number to the channel once per second.
     */
    class Publish implements Runnable {

        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                int i = 0;
                while (true) {
                    jedis.publish(channel, String.valueOf(i++));
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }

    // Entry point of the demo
    public void run() throws Exception {
        // Start a subscriber thread
        Thread subscribeThread = new Thread(new Subscribe(), "SubscribeThread");
        subscribeThread.start();

        // Start a publisher thread
        Thread publishThread = new Thread(new Publish(), "PublishThread");
        publishThread.start();

        publishThread.join();
        subscribeThread.join();
    }


    public static void main(String[] args) throws Exception {
        final String ip = "192.168.1.153";
        final int port = 6380;
        final String channel = "testChannel";

        PubsubDemo pubsub = new PubsubDemo(ip, port, channel);
        pubsub.run();
    }
}