此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Data Redis 3.5.3spring-doc.cadn.net.cn

Redis 流

Redis Streams 以抽象方法对日志数据结构进行建模。通常,日志是仅追加的数据结构,从头开始、随机位置或通过流式传输新消息来使用。spring-doc.cadn.net.cn

Redis 参考文档中了解有关 Redis Streams 的更多信息。

Redis Streams 大致可以分为两个功能领域:spring-doc.cadn.net.cn

尽管这种模式与 Pub/Sub 有相似之处,但主要区别在于消息的持久性及其使用方式。spring-doc.cadn.net.cn

虽然 Pub/Sub 依赖于暂时性消息的广播(即,如果您不收听,就会错过消息),但 Redis Stream 使用持久的、仅追加的数据类型,该数据类型会保留消息,直到流被修剪为止。消耗的另一个区别是 Pub/Sub 注册服务器端订阅。Redis 将到达的消息推送到客户端,而 Redis Streams 需要主动轮询。spring-doc.cadn.net.cn

org.springframework.data.redis.connectionorg.springframework.data.redis.stream包为 Redis Streams 提供核心功能。spring-doc.cadn.net.cn

附加

要发送记录,您可以像其他作一样使用低级RedisConnection或高级StreamOperations.这两个实体都提供add (xAdd) 方法,该方法接受记录和目标流作为参数。而RedisConnection需要原始数据(字节数组),则StreamOperations允许将任意对象作为记录传入,如以下示例所示:spring-doc.cadn.net.cn

// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);

流记录带有Map,键值元组作为其有效负载。将记录附加到流返回RecordId这可以作为进一步的参考。spring-doc.cadn.net.cn

消费

在消费方面,可以使用一个或多个流。Redis Streams 提供读取命令,允许从已知流内容内和流端之外的任意位置(随机访问)使用流,以使用新的流记录。spring-doc.cadn.net.cn

在低层,RedisConnection提供xReadxReadGroup映射 Redis 命令的方法,分别用于在消费者组内读取和读取。请注意,可以将多个流用作参数。spring-doc.cadn.net.cn

Redis 中的订阅命令可能会阻塞。也就是说,调用xRead在连接上会导致当前线程在开始等待消息时阻塞。仅当读取命令超时或收到消息时,才会释放线程。

要使用流消息,可以在应用程序代码中轮询消息,或者通过消息侦听器容器使用两种异步接收之一,命令式或响应式接收。每次有新记录到达时,容器都会通知应用程序代码。spring-doc.cadn.net.cn

同步接收

虽然流消耗通常与异步处理相关联,但可以同步使用消息。超载的StreamOperations.read(…)方法提供了此功能。在同步接收期间,调用线程可能会阻塞,直到消息可用。该物业StreamReadOptions.block指定接收方在放弃等待消息之前应等待多长时间。spring-doc.cadn.net.cn

// Read message through RedisTemplate
RedisTemplate template = …

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
				StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
				StreamReadOptions.empty().count(2),
				StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

通过消息侦听器容器异步接收

由于其阻塞性质,低级轮询并不吸引人,因为它需要为每个消费者进行连接和线程管理。为了缓解这个问题,Spring Data 提供了消息侦听器,它可以完成所有繁重的工作。如果您熟悉 EJB 和 JMS,您应该会发现这些概念很熟悉,因为它被设计为尽可能接近 Spring Framework 及其消息驱动 POJO (MDP) 中的支持。spring-doc.cadn.net.cn

Spring Data 附带了两个针对所使用的编程模型量身定制的实现:spring-doc.cadn.net.cn

StreamMessageListenerContainerStreamReceiver负责消息接收的所有线程,并分派到侦听器中进行处理。消息侦听器容器/接收器是 MDP 和消息传递提供程序之间的中介,负责注册以接收消息、资源获取和释放、异常转换等。这使您作为应用程序开发人员编写与接收消息(并对其做出反应)相关的(可能复杂的)业务逻辑,并将样板 Redis 基础设施问题委托给框架。spring-doc.cadn.net.cn

这两个容器都允许运行时配置更改,以便您可以在应用程序运行时添加或删除订阅,而无需重新启动。此外,容器使用延迟订阅方法,使用RedisConnection仅在需要时。如果所有侦听器都已取消订阅,则会自动执行清理,并释放线程。spring-doc.cadn.net.cn

祈使的StreamMessageListenerContainer

以类似于 EJB 世界中的消息驱动 Bean (MDB) 的方式,Stream-Driven POJO (SDP) 充当 Stream 消息的接收器。SDP 的一个限制是它必须实现StreamListener接口。另请注意,如果您的 POJO 在多个线程上接收消息,请务必确保您的实现是线程安全的。spring-doc.cadn.net.cn

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

	@Override
	public void onMessage(MapRecord<String, String, String> message) {

		System.out.println("MessageId: " + message.getId());
		System.out.println("Stream: " + message.getStream());
		System.out.println("Body: " + message.getValue());
	}
}

StreamListener表示一个功能接口,因此可以使用其 Lambda 形式重写实现:spring-doc.cadn.net.cn

message -> {

    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
};

一旦你实现了StreamListener,是时候创建一个消息侦听器容器并注册订阅了:spring-doc.cadn.net.cn

RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …

StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
			.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
				containerOptions);

Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);

请参阅各种消息侦听器容器的 Javadoc,了解每个实现支持的功能的完整描述。spring-doc.cadn.net.cn

反应性的StreamReceiver

流数据源的响应式消耗通常通过Flux事件或消息。响应式接收器实现提供了StreamReceiver及其过载receive(…)消息。与StreamMessageListenerContainer因为它正在利用驱动程序提供的线程资源。接收流是需求驱动的发布者StreamMessage:spring-doc.cadn.net.cn

Flux<MapRecord<String, String, String>> messages = …

return messages.doOnNext(it -> {
    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
});

现在我们需要创建StreamReceiver并注册订阅以使用流消息:spring-doc.cadn.net.cn

ReactiveRedisConnectionFactory connectionFactory = …

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
				.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);

Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

请参阅各种消息侦听器容器的 Javadoc,了解每个实现支持的功能的完整描述。spring-doc.cadn.net.cn

需求驱动的消耗使用背压信号来激活和停用轮询。StreamReceiver如果满足需求,则订阅将暂停轮询,直到订阅者发出进一步的需求信号。根据ReadOffset策略,这可能会导致跳过消息。

Acknowledge策略

当您通过Consumer Group,服务器将记住给定消息已传递,并将其添加到待处理条目列表 (PEL) 中。已传递但尚未确认的消息列表。
消息必须通过
StreamOperations.acknowledge以便从待处理条目列表中删除,如下面的代码片段所示。spring-doc.cadn.net.cn

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...

container.receive(Consumer.from("my-group", "my-consumer"), (1)
	StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
    msg -> {

	    // ...
	    redisTemplate.opsForStream().acknowledge("my-group", msg); (2)
    });
1 从 group my-group 中读取为 my-consumer接收到的消息不会被确认。
2 处理后确认消息。
在接收时自动确认消息receiveAutoAck而不是receive.

ReadOffset策略

流读取作接受读取偏移量规范,以使用给定偏移量中的消息。ReadOffset表示读取偏移规格。Redis 支持 3 种偏移量变体,具体取决于您是独立使用流还是在消费者组中使用流:spring-doc.cadn.net.cn

在基于消息容器的消费上下文中,我们需要在消费消息时提前(或增加)读取偏移量。前进取决于请求ReadOffset和消费模式(有/没有消费群体)。以下矩阵说明了容器如何前进ReadOffset:spring-doc.cadn.net.cn

表 1.读取偏移推进
读取偏移量 独立 消费群体

最近的spring-doc.cadn.net.cn

阅读最新消息spring-doc.cadn.net.cn

阅读最新消息spring-doc.cadn.net.cn

特定消息 IDspring-doc.cadn.net.cn

使用上次查看的消息作为下一个 MessageIdspring-doc.cadn.net.cn

使用上次查看的消息作为下一个 MessageIdspring-doc.cadn.net.cn

上次消耗spring-doc.cadn.net.cn

使用上次查看的消息作为下一个 MessageIdspring-doc.cadn.net.cn

根据使用者组的上次使用的消息spring-doc.cadn.net.cn

从特定消息 ID 和最后使用的消息读取可以被视为安全作,可确保使用附加到流的所有消息。 使用最新消息进行读取可以跳过在轮询作处于死区状态时添加到流中的消息。轮询引入了一个死区时间,在此期间,消息可以在各个轮询命令之间到达。流消耗不是线性连续读取,而是拆分为重复XREAD调用。spring-doc.cadn.net.cn

序列化

发送到流的任何记录都需要序列化为其二进制格式。由于流与哈希数据结构的接近,流键、字段名称和值使用RedisTemplate.spring-doc.cadn.net.cn

表 2.流序列化
流属性 序列化器 描述

钥匙spring-doc.cadn.net.cn

键序列化器spring-doc.cadn.net.cn

用于Record#getStream()spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

hashKey序列化器spring-doc.cadn.net.cn

用于有效负载中的每个映射键spring-doc.cadn.net.cn

spring-doc.cadn.net.cn

hashValue序列化器spring-doc.cadn.net.cn

用于有效负载中的每个映射值spring-doc.cadn.net.cn

请务必查看RedisSerializers 的使用中,请注意,如果您决定不使用任何序列化程序,则需要确保这些值已经是二进制的。spring-doc.cadn.net.cn

对象映射

简单值

StreamOperations允许附加简单值,通过ObjectRecord,直接发送到流中,而无需将这些值放入Map结构。 然后,该值将分配给有效负载字段,并在读回值时提取该值。spring-doc.cadn.net.cn

ObjectRecord<String, String> record = StreamRecords.newRecord()
    .in("my-stream")
    .ofObject("my-value");

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, String>> records = redisTemplate()
    .opsForStream()
    .read(String.class, StreamOffset.fromStart("my-stream"));
1 XADD my-stream * “_class” “java.lang.String” “_raw” “my-value”

ObjectRecords 会通过与所有其他记录完全相同的序列化过程,因此也可以使用返回MapRecord.spring-doc.cadn.net.cn

复数值

可以通过 3 种方式向流添加复杂值:spring-doc.cadn.net.cn

第一个变体是最直接的变体,但忽略了流结构提供的字段值功能,流中的值仍然可供其他消费者读取。 第二个选项具有与第一个选项相同的好处,但可能会导致非常具体的消费者限制,因为所有消费者都必须实现完全相同的序列化机制。 这HashMapper方法是利用 Steam 哈希结构但扁平化源的更复杂的方法。还有一些消费者仍然能够读取记录,只要选择合适的序列化器组合。spring-doc.cadn.net.cn

HashMapper 将有效负载转换为Map具有特定类型。确保使用能够(反)序列化哈希的 Hash-Key 和 Hash-Value 序列化器。
ObjectRecord<String, User> record = StreamRecords.newRecord()
    .in("user-logon")
    .ofObject(new User("night", "angel"));

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, User>> records = redisTemplate()
    .opsForStream()
    .read(User.class, StreamOffset.fromStart("user-logon"));
1 XADD 用户登录 * “_class” “com.example.User” “firstname” “night” “lastname” “angel”

StreamOperations默认使用 ObjectHashMapper。 您可以提供HashMapper适合您获取时的要求StreamOperations.spring-doc.cadn.net.cn

redisTemplate()
    .opsForStream(new Jackson2HashMapper(true))
    .add(record); (1)
1 XADD 用户登录 * “firstname” “night” “@class” “com.example.User” “lastname” “angel”

一个StreamMessageListenerContainer可能不知道任何@TypeAlias用于域类型,因为这些需要通过MappingContext. 确保初始化RedisMappingContext使用initialEntitySet.spring-doc.cadn.net.cn

@Bean
RedisMappingContext redisMappingContext() {
    RedisMappingContext ctx = new RedisMappingContext();
    ctx.setInitialEntitySet(Collections.singleton(Person.class));
    return ctx;
}

@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
    return new MappingRedisConverter(mappingContext);
}

@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
    return new ObjectHashMapper(converter);
}

@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
    StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
            .objectMapper(hashMapper)
            .build();

    return StreamMessageListenerContainer.create(connectionFactory, options);
}