此版本仍在开发中,尚未被视为稳定版本。如需使用最新稳定版本,请访问 Spring Data Redis 4.0.4spring-doc.cadn.net.cn

基于注解驱动的监听器端点

接收异步消息最简单的方法是使用带注解的监听器端点基础设施。 简而言之,它允许您将托管 Bean 的方法公开为 Redis 监听器端点。 以下示例展示了如何使用它:spring-doc.cadn.net.cn

@Component
public class MyService {

  @RedisListener(topic = "my-channel")
  public void processOrder(String data) { ... }
}

前述示例的思路是,每当通过通道 my-channel 接收到消息时,就会相应地调用 processOrder 方法(在本例中,使用 Redis Pub/Sub 消息的内容,类似于 MessageListenerAdapter 所提供的功能)。spring-doc.cadn.net.cn

带注解的端点基础设施会为每个带注解的方法在后台创建一个消息监听器,并注册到 RedisMessageListenerContainer。 端点不会针对应用程序上下文进行注册,但可以通过使用 RedisListenerEndpointRegistry bean 轻松定位以进行管理。spring-doc.cadn.net.cn

@RedisListener 是一个可重复注解,因此您可以通过添加额外的 @RedisListener 声明,将多个 Redis 主题(频道或模式)与同一方法关联。

启用监听器端点注解

要启用对 @RedisListener 注解的支持,您可以将 @EnableRedisListeners 添加到您的一个 @Configuration 类中,如下示例所示:spring-doc.cadn.net.cn

@Configuration
@EnableRedisListeners
public class RedisConfiguration {

  @Bean
  public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {

    RedisMessageListenerContainer factory = new RedisMessageListenerContainer();
    factory.setConnectionFactory(connectionFactory);
    return factory;
  }
}
@Configuration
@EnableRedisListeners
class RedisConfiguration {

  @Bean
  fun redisMessageListenerContainer(connectionFactory: RedisConnectionFactory) =
    RedisMessageListenerContainer().apply {
      setConnectionFactory(connectionFactory);
    }
}

您可以通过实现 RedisListenerConfigurer 接口来自定义 监听器注册器。 有关详细信息和示例,请参阅实现 RedisListenerConfigurer 的类的 javadoc。spring-doc.cadn.net.cn

编程式端点注册

RedisListenerEndpoint 提供了 Redis 端点的模型,并负责为该模型配置容器。 该基础设施允许您以编程方式配置端点,除了通过 RedisListener 注解自动检测到的端点之外。 以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

@Configuration
@EnableRedisListeners
public class AppConfig implements RedisListenerConfigurer {

  @Override
  public void configureRedisListeners(RedisListenerEndpointRegistrar registrar) {
    SimpleRedisListenerEndpoint endpoint = new SimpleRedisListenerEndpoint();
    endpoint.setId("myRedisEndpoint");
    endpoint.setTopic("my-channel");
    endpoint.setMessageListener((message, pattern) -> {
      // processing
    });
    registrar.registerEndpoint(endpoint);
  }
}

在上面的示例中,我们使用了 SimpleRedisListenerEndpoint,它提供了实际用于调用的 MessageListener。 然而,您也可以构建自己的端点变体,以描述自定义的调用机制。spring-doc.cadn.net.cn

请注意,您可以完全跳过使用 @RedisListener,并通过 RedisListenerConfigurer 以编程方式仅注册您的端点。spring-doc.cadn.net.cn

带注解的端点方法签名

到目前为止,我们一直在端点中注入一个简单的 String,但实际上它可以拥有非常灵活的方法签名。 在下面的示例中,我们将其重写为通过头部注入 Orderspring-doc.cadn.net.cn

@Component
public class MyService {

  @RedisListener("my-order-channels*")
  public void processOrder(Order order, @Header("pattern") Topic pattern) {
    ...
  }
}

您可以在 Redis 监听器端点中注入的主要元素如下:spring-doc.cadn.net.cn

  • 表示传入 Redis 消息的 org.springframework.messaging.Message。 请注意,此消息包含标头(由 PubSubHeaders 定义)。spring-doc.cadn.net.cn

  • @Header-annotated method arguments to extract a specific header value. 由于 Redis Pub/Sub 消息仅包含正文,因此像主题或匹配消息的模式等标头值会被合成添加为标头。spring-doc.cadn.net.cn

  • 一个 @Headers 注解的参数,该参数还必须可赋值给 java.util.Map,以便获取所有标头的访问权限。spring-doc.cadn.net.cn

  • 未加注解且不属于支持类型的元素被视为有效载荷。 您可以通过使用 @Payload 注解参数来明确这一点。 您还可以通过添加额外的 @Valid 来启用验证。spring-doc.cadn.net.cn

注入 Spring 的 Message 抽象的能力特别有用,它使我们能够从传输特定消息中存储的所有信息中受益,而无需依赖传输特定的 API。 以下示例展示了如何实现这一点:spring-doc.cadn.net.cn

@RedisListener("my-channel")
public void processOrder(Message<byte[]> order) { ... }

方法参数的处理由DefaultMessageHandlerMethodFactory提供,您可以进一步自定义它以支持额外的方法参数。 基于注解的端点支持灵活的消息转换,该功能由RedisMessageConverters提供,并附带一组默认的转换器,用于 String、字节数组和 JSON(如果类路径中存在受支持的库)。 如果您希望自定义消息转换,可以通过实现RedisListenerConfigurer并重写configureMessageConverters方法来完成,如下示例所示:spring-doc.cadn.net.cn

@Configuration
@EnableRedisListeners
public class AppConfig implements RedisListenerConfigurer {

  @Override
  public void configureMessageConverters(RedisMessageConverters.Builder builder) {
    builder.withStringConverter(StandardCharsets.UTF_8)
      .addCustomConverter(new JdkSerializationRedisSerializer());
  }
}

RedisMessageConverters 默认注册以下转换器:spring-doc.cadn.net.cn

RedisMessageConverters 使用 Spring Data Redis 的 RedisSerializers` 进行 JSON 序列化。 其 JSON 序列化格式和行为可能与 Spring Messaging 的 JacksonJsonMessageConverter 略有不同。 如果您希望使用 Spring Messaging 的变体,请通过 Builder.addCustomConverter(MessageConverter) 配置所需的转换器。

@RedisListener(consumes) 在使用不同的消息转换器以选择适当的转换器时,用于指示所需的内容类型。 下一个示例展示了假设按照上一个示例进行注册的情况下,JdkSerializationRedisSerializer 转换器的转换器选择:spring-doc.cadn.net.cn

@RedisListener(topic = "my-channel", consumes = JdkSerializerMessageConverter.APPLICATION_JAVA_SERIALIZED_OBJECT_VALUE)
public void processOrder(Person person) { ... }
对于未指定内容类型的注解端点,其负载参数转换将由能够读取消息负载的第一个转换器处理。

如果您需要对方法参数解析进行更多控制,还可以通过 MessageHandlerMethodFactory 配置自定义的 RedisListenerConfigurer。 您也可以在此处自定义转换和验证支持。spring-doc.cadn.net.cn

例如,如果我们希望在处理之前确保我们的 Order 有效,可以使用 @Valid 对负载进行注解,并配置必要的验证器,如下示例所示:spring-doc.cadn.net.cn

@Configuration
@EnableRedisListeners
public class AppConfig implements RedisListenerConfigurer {

  @Override
  public void configureRedisListeners(RedisListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(myRedisHandlerMethodFactory());
  }

  @Bean
  public DefaultMessageHandlerMethodFactory myRedisHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setValidator(myValidator());
    return factory;
  }
}