此版本仍在开发中,尚未被视为稳定版本。如需使用最新稳定版本,请访问 Spring Data Redis 4.0.4spring-doc.cadn.net.cn

订阅(接收消息)

在接收端,可以通过直接命名或使用模式匹配来订阅一个或多个频道。 后一种方法非常有用,因为它不仅可以一条命令创建多个订阅,还可以监听在订阅时尚未创建的频道(只要它们符合模式)。spring-doc.cadn.net.cn

在底层,RedisConnection 提供了 subscribepSubscribe 方法,分别用于映射通过频道或模式订阅的 Redis 命令。 请注意,多个频道或模式可以作为参数使用。 若要更改连接的订阅状态或查询其是否正在监听,RedisConnection 提供了 getSubscriptionisSubscribed 方法。spring-doc.cadn.net.cn

Spring Data Redis 中的订阅命令是阻塞的。 也就是说,在连接上调用 subscribe 会导致当前线程阻塞,因为它开始等待消息。 只有当订阅被取消时,线程才会被释放,而这种情况发生在另一个线程在同一连接上调用了 unsubscribepUnsubscribe 时。 请参阅本文档稍后部分的内容“消息监听器容器”,以了解解决此问题的方法。

如前所述,一旦订阅成功,连接将开始等待消息。 仅允许执行添加新订阅、修改现有订阅和取消现有订阅的命令。 调用除 subscribepSubscribeunsubscribepUnsubscribe 以外的任何操作都会抛出异常。spring-doc.cadn.net.cn

为了订阅消息,需要实现 MessageListener 回调。 每次有新消息到达时,该回调将被调用,并且用户代码将通过 onMessage 方法执行。 该接口不仅提供对实际消息的访问,还提供对接收该消息的通道以及订阅用于匹配通道的模式(如果有)的访问。 这些信息使得被调用者不仅能够根据内容区分各种消息,还可以通过检查其他详细信息来区分。spring-doc.cadn.net.cn

消息监听器容器

由于其阻塞性质,低级订阅并不吸引人,因为它需要为每个监听器管理连接和线程。 为了缓解这个问题,Spring Data 提供了 RedisMessageListenerContainer,它负责处理所有繁重的任务。 如果您熟悉 EJB 和 JMS,您会发现这些概念很熟悉,因为其设计目标是尽可能接近 Spring Framework 及其消息驱动 POJO(MDP)中的支持。spring-doc.cadn.net.cn

RedisMessageListenerContainer 充当消息监听器容器。 它用于从 Redis 通道接收消息,并驱动注入其中的 MessageListener 实例。 监听器容器负责消息接收和分发到监听器进行处理的所有线程管理。 消息监听器容器是 MDP 与消息提供者之间的中介,负责注册以接收消息、资源的获取与释放、异常转换等。 这使得您作为应用程序开发人员可以编写与接收消息(及其响应)相关的(可能复杂的)业务逻辑,而将样板式的 Redis 基础设施问题委托给框架处理。spring-doc.cadn.net.cn

一个 MessageListener 还可以实现 SubscriptionListener,以便在订阅/取消订阅确认时接收通知。 监听订阅通知在同步调用时非常有用。spring-doc.cadn.net.cn

此外,为了最小化应用程序的占用空间,RedisMessageListenerContainer 允许单个连接和单个线程被多个监听器共享,即使它们不共享订阅。 因此,无论应用程序跟踪多少个监听器或通道,其运行时成本在其整个生命周期内都保持不变。 此外,容器支持运行时配置更改,使您能够在应用程序运行时添加或移除监听器,而无需重新启动。 另外,容器采用延迟订阅方式,仅在需要时使用 RedisConnection。 如果所有监听器都已取消订阅,将自动执行清理并释放线程。spring-doc.cadn.net.cn

为了帮助处理消息的异步特性,容器需要一个 java.util.concurrent.Executor(或 Spring 的 TaskExecutor)来分发消息。 根据负载、监听器数量或运行时环境的不同,您应该调整或优化执行器以更好地满足您的需求。 特别是,在托管环境(如应用服务器)中,强烈建议选择合适的 TaskExecutor 以充分利用其运行时优势。spring-doc.cadn.net.cn

消息监听器适配器

MessageListenerAdapter 类是 Spring 异步消息支持的最终组件。 简而言之,它允许您将几乎任何类公开为 MDP(尽管存在一些限制)。spring-doc.cadn.net.cn

考虑以下接口定义:spring-doc.cadn.net.cn

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

请注意,尽管该接口未扩展 MessageListener 接口,它仍可通过使用 MessageListenerAdapter 类作为 MDP 来使用。 同时请注意,各种消息处理方法如何根据其能够接收和处理的各类 Message 类型的内容进行强类型定义。 此外,发送消息的通道或模式可以作为类型为 String 的第二个参数传递给方法:spring-doc.cadn.net.cn

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}
Notice how the above implementation of the `MessageDelegate` interface (the above `DefaultMessageDelegate` class) has *no* Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:
@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
</beans>
监听器主题可以是一个通道(例如,topic="chatroom"Topic.channel("chatroom")),也可以是一个模式(例如,topic="*room"Topic.pattern("*room"))。

前面的示例使用 Redis 命名空间来声明消息监听器容器,并自动将 POJO 注册为监听器。 完整的 Bean 定义如下:spring-doc.cadn.net.cn

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

每次收到消息时,适配器会自动且透明地在低级格式与所需的对象类型之间执行转换(使用配置的 RedisSerializer)。 由方法调用引发的任何异常都会被容器捕获并处理(默认情况下,异常会被记录到日志中)。spring-doc.cadn.net.cn

响应式消息监听容器

Spring Data 提供了 ReactiveRedisMessageListenerContainer,它为用户承担了所有繁重的转换和订阅状态管理工作。spring-doc.cadn.net.cn

消息监听器容器本身不需要外部线程资源。 它使用驱动程序线程来发布消息。spring-doc.cadn.net.cn

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

为了等待并确保正确的订阅,您可以使用 receiveLater 方法,该方法返回一个 Mono<Flux<ChannelMessage>>。 生成的 Mono 会在完成对给定主题的订阅后,以内层发布者作为结果完成。 通过拦截 onNext 信号,您可以同步服务器端的订阅。spring-doc.cadn.net.cn

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
    .flatMapMany(Function.identity())
    .…;

通过模板 API 进行订阅

如上所述,您可以直接使用 ReactiveRedisTemplate 来订阅频道或模式。 这种方法提供了一种简单直接的解决方案,尽管功能有限,因为您无法在初始订阅后添加新的订阅。 不过,您仍然可以通过返回的 Flux(例如使用 take(Duration))来控制消息流。 当读取完成、发生错误或取消时,所有绑定的资源将被释放。spring-doc.cadn.net.cn

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();