对于最新的稳定版本,请使用 Spring Data Redis 4.0.4spring-doc.cadn.net.cn

Redis Streams

Redis Streams 以一种抽象的方式对日志数据结构进行建模。通常,日志是仅追加(append-only)的数据结构,可以从开头、任意位置进行消费,也可以通过流式传输新消息的方式进行消费。spring-doc.cadn.net.cn

Redis 参考文档中了解有关 Redis Streams 的更多信息。

Redis Streams 的功能大致可分为两个方面:spring-doc.cadn.net.cn

尽管这种模式与发布/订阅(Pub/Sub)有相似之处,但主要区别在于消息的持久性以及它们被消费的方式。spring-doc.cadn.net.cn

虽然发布/订阅(Pub/Sub)依赖于瞬时消息的广播(即,如果你不监听,就会错过消息),但 Redis Stream 使用一种持久化的、仅可追加的数据类型,会一直保留消息,直到流被裁剪。在消费方式上的另一个区别是,发布/订阅会在服务端注册订阅,Redis 会将到达的消息主动推送给客户端;而 Redis Stream 则需要客户端主动轮询。spring-doc.cadn.net.cn

org.springframework.data.redis.connectionorg.springframework.data.redis.stream 包提供了 Redis Streams 的核心功能。spring-doc.cadn.net.cn

追加

要发送一条记录,你可以像执行其他操作一样,使用底层的 RedisConnection 或高层的 StreamOperations。这两个实体都提供了 add(即 xAdd)方法,该方法接受记录和目标流作为参数。虽然 RedisConnection 要求传入原始数据(字节数组),但 StreamOperations 允许传入任意对象作为记录,如下例所示:spring-doc.cadn.net.cn

// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);

流记录(Stream records)以 Map(键值对元组)作为其有效载荷。向流中追加一条记录会返回一个 RecordId,该 ID 可用作后续引用。spring-doc.cadn.net.cn

消费

在消费端,可以消费一个或多个流。Redis Streams 提供了读取命令,允许从已知流内容中的任意位置(随机访问)开始消费,也可以从流的末尾之后开始消费新的流记录。spring-doc.cadn.net.cn

在底层,RedisConnection 提供了 xReadxReadGroup 方法,分别对应 Redis 中用于读取流和在消费者组内读取流的命令。请注意,可以将多个流作为参数使用。spring-doc.cadn.net.cn

Redis 中的订阅命令可能是阻塞的。也就是说,在连接上调用 xRead 会导致当前线程阻塞,因为它开始等待消息。只有当读取命令超时或接收到消息时,该线程才会被释放。

要消费流消息,可以在应用程序代码中轮询消息,也可以使用两种通过消息监听器容器进行异步接收的方式之一:命令式(imperative)或响应式(reactive)。每当有新记录到达时,容器都会通知应用程序代码。spring-doc.cadn.net.cn

同步接收

虽然流消费通常与异步处理相关联,但也可以同步地消费消息。StreamOperations.read(…) 方法的重载版本提供了此功能。在同步接收过程中,调用线程可能会阻塞,直到有消息可用为止。属性 StreamReadOptions.block 指定了接收方在放弃等待消息之前应等待多长时间。spring-doc.cadn.net.cn

// Read message through RedisTemplate
RedisTemplate template = …

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
				StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
				StreamReadOptions.empty().count(2),
				StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

通过消息监听容器进行异步接收

由于其阻塞特性,底层轮询并不理想,因为每个消费者都需要进行连接和线程管理。为缓解这一问题,Spring Data 提供了消息监听器,它们承担了所有繁重的工作。如果你熟悉 EJB 和 JMS,应该会对这些概念感到熟悉,因为它的设计尽可能贴近 Spring Framework 及其消息驱动 POJO(MDP)所提供的支持。spring-doc.cadn.net.cn

Spring Data 提供了两种实现,专门针对所使用的编程模型进行了定制:spring-doc.cadn.net.cn

StreamMessageListenerContainerStreamReceiver 负责处理消息接收及将其分发到监听器进行处理的所有线程相关工作。消息监听器容器/接收器是消息驱动 POJO(MDP)与消息提供者之间的中介,负责注册以接收消息、资源的获取与释放、异常转换等任务。这使得作为应用程序开发人员的你可以专注于编写与接收消息(并对其作出响应)相关的(可能很复杂的)业务逻辑,而将 Redis 基础设施相关的样板代码委托给框架处理。spring-doc.cadn.net.cn

两种容器都允许在运行时进行配置更改,因此你可以在应用程序运行过程中添加或移除订阅,而无需重启应用。此外,该容器采用延迟订阅(lazy subscription)的方式,仅在需要时才使用 RedisConnection。如果所有监听器都已取消订阅,容器会自动执行清理操作,并释放线程。spring-doc.cadn.net.cn

命令式StreamMessageListenerContainer

与 EJB 世界中的消息驱动 Bean (MDB) 类似,流驱动 POJO (SDP) 充当流消息的接收者。对 SDP 的唯一限制是它必须实现 StreamListener 接口。另外请注意,如果您的 POJO 在多个线程上接收消息,确保您的实现是线程安全的是非常重要的。spring-doc.cadn.net.cn

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

	@Override
	public void onMessage(MapRecord<String, String, String> message) {

		System.out.println("MessageId: " + message.getId());
		System.out.println("Stream: " + message.getStream());
		System.out.println("Body: " + message.getValue());
	}
}

StreamListener 表示一个函数式接口,因此其实现可以使用 Lambda 表达式形式重写:spring-doc.cadn.net.cn

message -> {

    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
};

一旦你实现了自己的 StreamListener,就该创建一个消息监听器容器并注册订阅了:spring-doc.cadn.net.cn

RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …

StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
			.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
				containerOptions);

Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);

请参阅各个消息监听器容器的 Javadoc,以全面了解每个实现所支持的功能。spring-doc.cadn.net.cn

响应式StreamReceiver

流式数据源的响应式消费通常通过事件或消息的 Flux 来实现。响应式接收器的实现由 StreamReceiver 及其重载的 receive(…) 方法提供。与 StreamMessageListenerContainer 相比,这种响应式方法所需的基础设施资源(例如线程)更少,因为它利用了驱动程序提供的线程资源。接收的数据流是一个按需驱动的 StreamMessage 发布者:spring-doc.cadn.net.cn

Flux<MapRecord<String, String, String>> messages = …

return messages.doOnNext(it -> {
    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
});

现在我们需要创建 StreamReceiver 并注册一个订阅以消费流消息:spring-doc.cadn.net.cn

ReactiveRedisConnectionFactory connectionFactory = …

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
				.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);

Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

请参阅各个消息监听器容器的 Javadoc,以全面了解每个实现所支持的功能。spring-doc.cadn.net.cn

需求驱动的消费使用背压信号来激活和停用轮询。StreamReceiver 订阅在需求得到满足时会暂停轮询,直到订阅者发出进一步的需求信号。根据 ReadOffset 策略的不同,这可能会导致消息被跳过。

Acknowledge策略

当您通过消息阅读时Consumer Group,服务器将记住给定消息已送达,并将其添加到待处理条目列表(PEL)中。这是一个已送达但尚未确认的消息列表。
消息必须通过以下方式确认:StreamOperations.acknowledge为了从待处理条目列表中移除,如下代码片段所示。spring-doc.cadn.net.cn

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...

container.receive(Consumer.from("my-group", "my-consumer"), (1)
	StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
    msg -> {

	    // ...
	    redisTemplate.opsForStream().acknowledge("my-group", msg); (2)
    });
1 从组 my-group 中以 my-consumer 身份读取。接收到的消息不会被确认。
2 在处理完消息后进行确认。
要自动确认接收到的消息,请使用 receiveAutoAck 而不是 receive

ReadOffset策略

流读取操作接受一个读取偏移量规范,用于从指定偏移量开始消费消息。ReadOffset 表示该读取偏移量规范。Redis 支持三种类型的偏移量,具体取决于您是独立消费流还是在消费者组内消费流:spring-doc.cadn.net.cn

在基于消息容器的消费场景中,我们在消费一条消息时需要推进(或递增)读取偏移量。偏移量的推进方式取决于所请求的 ReadOffset 以及消费模式(是否使用消费者组)。下表说明了容器如何推进 ReadOffsetspring-doc.cadn.net.cn

表1. ReadOffset 推进
读取偏移量 独立运行 消费者组

最新spring-doc.cadn.net.cn

读取最新消息spring-doc.cadn.net.cn

读取最新消息spring-doc.cadn.net.cn

特定消息 IDspring-doc.cadn.net.cn

使用最后看到的消息作为下一个 MessageIdspring-doc.cadn.net.cn

使用最后看到的消息作为下一个 MessageIdspring-doc.cadn.net.cn

最近消费spring-doc.cadn.net.cn

使用最后看到的消息作为下一个 MessageIdspring-doc.cadn.net.cn

按消费者组划分的最后一条已消费消息spring-doc.cadn.net.cn

从特定的消息 ID 开始读取以及从上次已消费的消息开始读取,可被视为安全操作,能够确保消费所有已追加到流中的消息。 若使用最新消息进行读取,则可能会跳过在轮询操作处于空闲期(dead time)时添加到流中的消息。轮询会引入一段空闲期,在此期间,消息可能在各次轮询命令之间到达。流的消费并非线性的连续读取,而是被拆分为多次重复的 XREAD 调用。spring-doc.cadn.net.cn

序列化

发送到流中的任何 Record 都需要序列化为其二进制格式。由于流与哈希数据结构密切相关,因此流的键、字段名和值会使用在 RedisTemplate 上配置的相应序列化器。spring-doc.cadn.net.cn

表2. 流序列化
流属性 序列化器 描述

spring-doc.cadn.net.cn

键序列化器spring-doc.cadn.net.cn

用于 Record#getStream()spring-doc.cadn.net.cn

字段spring-doc.cadn.net.cn

hashKeySerializerspring-doc.cadn.net.cn

用于负载中每个映射键spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

hashValueSerializerspring-doc.cadn.net.cn

用于有效载荷中每个映射值spring-doc.cadn.net.cn

请务必检查所使用的 RedisSerializer,并注意:如果您决定不使用任何序列化器,则需要确保这些值已经是二进制格式。spring-doc.cadn.net.cn

对象映射

简单值

StreamOperations 允许通过 ObjectRecord 将简单值直接追加到流中,而无需将这些值放入 Map 结构中。 该值随后会被分配到一个 payload 字段中,并在读取该值时可以被提取出来。spring-doc.cadn.net.cn

ObjectRecord<String, String> record = StreamRecords.newRecord()
    .in("my-stream")
    .ofObject("my-value");

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, String>> records = redisTemplate()
    .opsForStream()
    .read(String.class, StreamOffset.fromStart("my-stream"));
1 XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"

ObjectRecord 经历与其他所有记录完全相同的序列化过程,因此也可以通过返回 MapRecord 的非类型化读取操作来获取该记录。spring-doc.cadn.net.cn

复杂值

向流中添加复杂值可以通过以下3种方式完成:spring-doc.cadn.net.cn

第一种变体是最直接的方式,但它忽略了流结构所提供的字段值能力,尽管如此,流中的值对其他消费者仍然是可读的。 第二种选项与第一种具有相同的优点,但可能会导致消费者受到非常具体的限制,因为所有消费者都必须实现完全相同的序列化机制。 HashMapper 方法则稍微复杂一些,它利用了流的哈希结构,但会将源数据扁平化。只要选择了合适的序列化器组合,其他消费者仍然能够读取这些记录。spring-doc.cadn.net.cn

HashMappers 将有效载荷转换为具有特定类型的 Map。请确保使用能够对哈希进行(反)序列化的哈希键(Hash-Key)和哈希值(Hash-Value)序列化器。
ObjectRecord<String, User> record = StreamRecords.newRecord()
    .in("user-logon")
    .ofObject(new User("night", "angel"));

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, User>> records = redisTemplate()
    .opsForStream()
    .read(User.class, StreamOffset.fromStart("user-logon"));
1 XADD user-logon * "_class" "com.example.User" "firstname" "夜" "lastname" "天使"

StreamOperations 默认使用 ObjectHashMapper。 在获取 HashMapper 时,您可以提供一个符合您需求的 StreamOperationsspring-doc.cadn.net.cn

redisTemplate()
    .opsForStream(new Jackson2HashMapper(true))
    .add(record); (1)
1 XADD user-logon * "firstname" "夜" "@class" "com.example.User" "lastname" "天使"

StreamMessageListenerContainer 可能无法感知在领域类型上使用的任何 @TypeAlias 注解,因为这些别名需要通过 MappingContext 进行解析。 请确保使用 RedisMappingContext 初始化 initialEntitySetspring-doc.cadn.net.cn

@Bean
RedisMappingContext redisMappingContext() {
    RedisMappingContext ctx = new RedisMappingContext();
    ctx.setInitialEntitySet(Collections.singleton(Person.class));
    return ctx;
}

@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
    return new MappingRedisConverter(mappingContext);
}

@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
    return new ObjectHashMapper(converter);
}

@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
    StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
            .objectMapper(hashMapper)
            .build();

    return StreamMessageListenerContainer.create(connectionFactory, options);
}