对于最新的稳定版本,请使用 Spring Data Redis 4.0.4spring-doc.cadn.net.cn

发布/订阅消息传递

Spring Data 为 Redis 提供了专门的消息传递集成,其功能和命名方式类似于 Spring Framework 中的 JMS 集成。spring-doc.cadn.net.cn

Redis 消息传递的功能大致可分为两个方面:spring-doc.cadn.net.cn

这是一种通常被称为发布/订阅(简称 Pub/Sub)模式的示例。RedisTemplate 类用于消息的发送。对于类似于 Java EE 消息驱动 Bean 风格的异步接收,Spring Data 提供了一个专用的消息监听器容器,用于创建消息驱动 POJO(MDP);而对于同步接收,则使用 RedisConnection 接口。spring-doc.cadn.net.cn

org.springframework.data.redis.connectionorg.springframework.data.redis.listener 包提供了 Redis 消息传递的核心功能。spring-doc.cadn.net.cn

发布(发送消息)

要发布消息,您可以像使用其他操作一样,选择使用底层的 [Reactive]RedisConnection 或高层的 [Reactive]RedisOperations。这两个实体都提供了 publish 方法,该方法接受消息和目标频道作为参数。虽然 RedisConnection 需要原始数据(字节数组),但 [Reactive]RedisOperations 允许传入任意对象作为消息,如下例所示:spring-doc.cadn.net.cn

// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
// send message through connection
ReactiveRedisConnection con = …
ByteBuffer[] msg = …
ByteBuffer[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through ReactiveRedisOperations
ReactiveRedisOperations operations = …
Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world");

订阅(接收消息)

在接收端,可以订阅一个或多个频道,既可以显式指定频道名称,也可以使用模式匹配。后一种方法非常有用,因为它不仅可以通过一条命令创建多个订阅,还可以监听在订阅时尚未创建的频道(只要它们符合该模式即可)。spring-doc.cadn.net.cn

在底层,RedisConnection 提供了 subscribepSubscribe 方法,分别对应 Redis 中按通道订阅和按模式订阅的命令。请注意,多个通道或模式可以作为参数使用。为了更改连接的订阅状态或查询其是否正在监听,RedisConnection 提供了 getSubscriptionisSubscribed 方法。spring-doc.cadn.net.cn

Spring Data Redis 中的订阅命令是阻塞的。也就是说,在连接上调用 subscribe 会导致当前线程阻塞,因为它开始等待消息。只有当订阅被取消时,该线程才会被释放,而订阅取消发生在另一个线程在同一个连接上调用 unsubscribepUnsubscribe 时。有关此问题的解决方案,请参见本文档后文的“消息监听器容器”。

如前所述,一旦订阅后,连接就会开始等待消息。此时只允许执行用于添加新订阅、修改现有订阅和取消现有订阅的命令。如果调用除 subscribepSubscribeunsubscribepUnsubscribe 之外的任何操作,都会抛出异常。spring-doc.cadn.net.cn

为了订阅消息,需要实现 MessageListener 回调接口。每当有新消息到达时,该回调会被触发,并通过 onMessage 方法执行用户代码。该接口不仅提供了对实际消息的访问,还提供了接收该消息的通道以及订阅时用于匹配通道的模式(如果有的话)。这些信息使得调用方不仅能根据消息内容,还能通过检查额外的细节来区分不同的消息。spring-doc.cadn.net.cn

消息监听器容器

由于其阻塞性质,低级订阅并不吸引人,因为它需要为每个监听器管理连接和线程。为了缓解这个问题,Spring Data 提供了 RedisMessageListenerContainer,它承担了所有繁重的工作。如果您熟悉 EJB 和 JMS,您会发现这些概念很熟悉,因为它的设计尽可能接近 Spring Framework 及其消息驱动 POJO(MDP)中的支持。spring-doc.cadn.net.cn

RedisMessageListenerContainer 充当消息监听容器。它用于从 Redis 通道接收消息,并驱动注入其中的 MessageListener 实例。监听容器负责消息接收和分发的所有线程管理,并将消息分发到监听器进行处理。消息监听容器是消息驱动 bean(MDP)与消息提供者之间的中介,负责注册接收消息、资源获取与释放、异常转换等事宜。这使得您作为应用程序开发人员可以编写与接收消息(及对其作出响应)相关的(可能复杂的)业务逻辑,而将冗余的 Redis 基础设施问题委托给框架处理。spring-doc.cadn.net.cn

一个 MessageListener 还可以实现 SubscriptionListener,以便在订阅/取消订阅确认时接收通知。监听订阅通知在同步调用时非常有用。spring-doc.cadn.net.cn

此外,为了最小化应用程序的占用空间,RedisMessageListenerContainer 允许单个连接和单个线程被多个监听器共享,即使它们不共享订阅。因此,无论应用程序跟踪多少监听器或通道,其运行时的成本在其整个生命周期内都保持不变。此外,容器支持运行时配置更改,因此您可以在应用程序运行时添加或移除监听器,而无需重新启动。另外,容器采用延迟订阅方式,仅在需要时使用 RedisConnection。如果所有监听器都取消订阅,系统将自动执行清理并释放线程。spring-doc.cadn.net.cn

为了应对消息的异步特性,该容器需要一个 java.util.concurrent.Executor(或 Spring 的 TaskExecutor)来分发消息。根据负载、监听器数量或运行环境的不同,您应调整或优化执行器,以更好地满足自身需求。特别是在托管环境(例如应用服务器)中,强烈建议选择合适的 TaskExecutor,以充分利用其运行时特性。spring-doc.cadn.net.cn

消息监听器适配器

MessageListenerAdapter 类是 Spring 异步消息支持的最终组件。简而言之,它允许您将几乎任何类公开为 MDP(尽管存在一些限制)。spring-doc.cadn.net.cn

考虑以下接口定义:spring-doc.cadn.net.cn

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

请注意,尽管该接口没有扩展 MessageListener 接口,它仍然可以通过使用 MessageListenerAdapter 类作为 MDP 来使用。此外,请观察各种消息处理方法如何根据其能够接收和处理的各类 Message 类型的内容进行强类型化。另外,发送消息的通道或模式可以作为类型为 String 的第二个参数传递给方法:spring-doc.cadn.net.cn

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}
Notice how the above implementation of the `MessageDelegate` interface (the above `DefaultMessageDelegate` class) has *no* Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:
@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
</beans>
监听器主题可以是一个通道(例如,topic="chatroom"Topic.channel("chatroom")),也可以是一个模式(例如,topic="*room"Topic.pattern("*room"))。

前面的示例使用 Redis 命名空间来声明消息监听器容器,并自动将 POJO 注册为监听器。完整的 Bean 定义如下所示:spring-doc.cadn.net.cn

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

每次接收到消息时,适配器都会自动且透明地(使用配置的RedisSerializer)在底层格式和所需对象类型之间执行转换。方法调用引发的任何异常都会被容器捕获并处理(默认情况下,异常会被记录日志)。spring-doc.cadn.net.cn

响应式消息监听容器

Spring Data 提供了 ReactiveRedisMessageListenerContainer,它为用户承担了所有繁重的转换和订阅状态管理工作。spring-doc.cadn.net.cn

消息监听器容器本身不需要外部线程资源。它使用驱动程序线程来发布消息。spring-doc.cadn.net.cn

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

为了等待并确保正确完成订阅,您可以使用 receiveLater 方法,该方法返回一个 Mono<Flux<ChannelMessage>>。 所得到的 Mono 会在成功订阅指定主题后,以一个内部发布者(publisher)作为结果完成。通过拦截 onNext 信号,您可以同步服务器端的订阅。spring-doc.cadn.net.cn

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
    .flatMapMany(Function.identity())
    .…;

通过模板 API 订阅

如上所述,您可以直接使用 ReactiveRedisTemplate 来订阅频道或模式。这种方法提供了一种简单直接的解决方案,尽管存在局限性,即您无法在初始订阅之后添加新的订阅。不过,您仍然可以通过返回的 Flux(例如使用 take(Duration))来控制消息流。当完成读取、发生错误或取消时,所有绑定的资源将被释放。spring-doc.cadn.net.cn

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();