003.消息的发布和订阅

发布消息

要发布消息,可以使用与其他操作一样的低级RedisConnection或高级RedisTemplate。两个实体都提供发布方法,该方法接受消息和目标通道作为参数。RedisConnection需要原始数据(字节数组),RedisTemplate允许将任意对象作为消息传入。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

@Test
public void test() {
    byte[] msg = "msg1".getBytes(StandardCharsets.UTF_8);
    byte[] channel = "channel1".getBytes(StandardCharsets.UTF_8);

    stringRedisTemplate.getConnectionFactory().getConnection().publish(channel, msg);
    stringRedisTemplate.convertAndSend("channel2", "msg2");

    System.out.println("");
}

(实验中,我发送了消息,似乎在ARDP中看不到)

接受消息

在接收端可以通过直接命名或使用模式来匹配订阅一个或多个频道。后一种方法非常有用,因为它不仅允许使用一个命令创建多个订阅,而且还可以侦听订阅时尚未创建的频道(只要它们与模式匹配)。

在底层,RedisConnection提供subscribe和pSubscribe方法,它们分别映射Redis命令以按通道或按模式订阅。请注意,多个通道或模式可以用作参数。

RedisConnection提供了getSubscription和isSubscribed方法来更改连接的订阅或查询它是否正在侦听。

Spring Data Redis中的订阅命令是阻塞的。 也就是说,在连接上调用subscribe会导致当前线程在开始等待消息时阻塞。 仅当取消订阅时才释放线程,这发生在另一个线程在同一连接上调用unsubscribe或pUnsubscribe时。

Message Listener Containers

因为原生的写法是阻塞的,所以没有太大的意义,而且还需要对每个Listener进行连接和线程管理。

Spring Data提供了RedisMessageListenerContainer,它完成了所有繁复的工作。

RedisMessageListenerContainer充当消息侦听器容器。它用于从Redis通道接收消息并驱动注入其中的MessageListener实例。侦听器容器负责消息接收的所有线程,并分派到侦听器中进行处理。消息侦听器容器是MDP 和消息传递提供者之间的中介,负责注册以接收消息、资源获取和释放、异常转换等。这使您作为应用程序开发人员可以编写与接收消息(并对其作出反应)相关的(可能很复杂的)业务逻辑,并将样板Redis基础架构问题委托给框架。

此外,为了最小化应用程序占用空间,RedisMessageListenerContainer允许多个侦听器共享一个连接和一个线程,即使它们不共享订阅。因此,无论应用程序跟踪多少侦听器或通道,运行时成本在其整个生命周期内都保持不变。此外,容器允许更改运行时配置,以便您可以在应用程序运行时添加或删除侦听器,而无需重新启动。此外,容器使用惰性订阅方法,仅在需要时使用RedisConnection。如果所有侦听器都取消订阅,则会自动执行清理,并释放线程。

为了帮助处理消息的异步特性,容器需要一个java.util.concurrent.Executor(或 Spring 的 TaskExecutor)来分派消息。根据负载、侦听器的数量或运行时环境,您应该更改或调整执行器以更好地满足您的需求。特别是在托管环境(例如应用服务器)中,强烈建议选择合适的TaskExecutor以利用其运行时。

MessageListenerAdapter

MessageListenerAdapter允许将任何类公开为MDP。

(这部分知识很酷,但是我对Redis的Pub和Sub的用法兴趣并不是很高,生产中这部分知识几乎没有用到,所以暂时不深入研究了)