基于webflux推送消息给前端的实践
webflux推送消息给前端页面的demo
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
html
<!DOCTYPE html>
<html>
<head>
<title>SSE Demo</title>
<script type="text/javascript">
if (typeof(EventSource) !== "undefined") {
var source = new EventSource("http://localhost:10086/stream-sse");
source.onopen = function(event) {
console.log("Connection to server opened.");
};
source.onerror = function(event) {
console.error("EventSource failed.", event);
};
source.addEventListener('periodic-event', function(event) {
console.log("reseive data.");
var dataElement = document.getElementById("sse-data");
dataElement.innerHTML += event.data + "<br>";
}, false);
source.onmessage = function(event) {
console.log("reseive data.");
var dataElement = document.getElementById("sse-data");
dataElement.innerHTML += event.data + "<br>";
};
} else {
document.getElementById("sse-data").innerHTML = "Sorry, your browser does not support server-sent events...";
}
</script>
<script type="text/javascript">
if (typeof(EventSource) !== "undefined") {
var source = new EventSource("http://localhost:10086/stream-sse2");
source.onopen = function(event) {
console.log("Connection to server opened2.");
};
source.onerror = function(event) {
console.error("EventSource failed2.", event);
};
source.addEventListener('periodic-event2', function(event) {
console.log("reseive data2.addEventListener");
var dataElement = document.getElementById("sse-data2");
dataElement.innerHTML += event.data + "<br>";
}, false);
source.onmessage = function(event) {
console.log("reseive data2.onmessage");
var dataElement = document.getElementById("sse-data2");
dataElement.innerHTML += event.data + "<br>";
if (event.data.endsWith('.')) {
eventSource.close();
console.log('connection is closed');
}
};
} else {
document.getElementById("sse-data").innerHTML = "Sorry, your browser does not support server-sent events...";
}
</script>
</head>
<body>
<div id="sse-data">Loading...</div>
<div id="sse-data2">Loading2...</div>
</body>
</html>
在这段代码中,我们添加了一个事件监听器来处理由服务器端发送的 “periodic-event” 类型的事件。现在,每当服务器发送这种类型的事件时,客户端都会接收到它,并更新页面上的内容。
事件格式问题:确保服务器发送的事件遵循正确的SSE格式。每条消息应该以 "data: " 开头,后面跟着实际的数据,并以两个连续的换行符结束。
自定义事件类型:如果服务器发送的是自定义事件类型而不是默认的 “message” 类型,那么您需要在客户端监听这个特定的事件类型而不是使用 onmessage。例如,如果事件类型是 “periodic-event”,您应该这样做:
source.addEventListener('periodic-event', function(event) {
var dataElement = document.getElementById("sse-data");
dataElement.innerHTML += event.data + "<br>";
}, false);
controller
static ThreadLocal<Integer> tl = new TransmittableThreadLocal<>();
static AtomicInteger i = new AtomicInteger(0);
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
tl.set(i.incrementAndGet());
log.info("streamEvents"+tl.get());
AtomicLong counter = new AtomicLong();
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> Tuples.of(seq, LocalTime.now()))
.map(data -> ServerSentEvent.<String>builder()
.id(Long.toString(counter.incrementAndGet()))
.event("periodic-event")
.data("SSE"+tl.get()+" - " + data.getT2().toString())
.build());
}
尽管 SSE 旨在将事件从服务器发送到客户端,但可以使用 GET 查询参数将数据从客户端传递到服务器。
var eventSource = new EventSource('/sse?event=type1);
... eventSource.close(); eventSource = new EventSource('/sse?event=type1&event=type2);
Server-Sent Events (SSE) 和 WebSocket 都是现代浏览器提供的两种不同的实时通信技术,它们各自有优缺点,并在资源消耗方面也有所不同。
Server-Sent Events (SSE):
优点:
- 简单易用:SSE API 使用起来比较简单,只需要设置一个EventSource对象即可。
- 内建支持重连:SSE 自动处理断线重连。
- HTTP协议:SSE 基于标准的HTTP协议,更容易与现有的Web架构和安全模型(如SSL/TLS)集成。
- 无需额外协议支持:与WebSocket相比,SSE不需要服务器支持新的协议。
- 单向通信:对于只需要服务器到客户端的单向数据流非常有效。
缺点:
- 单向通信:SSE 不支持浏览器到服务器的通信,如果需要双向通信,则要另外使用传统的AJAX请求。
- 受限于浏览器连接数限制:浏览器对同一域名的并发连接数有限制,这可能会限制SSE的使用。
许多浏览器允许打开数量非常有限的 SSE 连接(Chrome、Firefox 每个浏览器最多 6 个连接) - 不是所有浏览器都支持:尽管大多数现代浏览器支持SSE,但部分旧版浏览器不支持。
- 只发送文本消息;尽管可以使用 Base64 编码和 gzip 压缩来发送二进制消息,但效率很低。
资源消耗:
SSE 连接在服务器上保持打开状态,每个客户端都需要一个独立的TCP连接。虽然头信息较小,但如果客户端数量很多,会占用大量的服务器资源。
WebSocket:
优点:
- 双向通信:WebSocket 提供了一个全双工的通信渠道,允许数据同时在两个方向上传输。
- 更少的开销:一旦WebSocket连接建立,消息传递的开销非常小,特别适合频繁和/或实时的通信。
- 更好的控制:WebSocket API 提供了更细粒度的控制,包括错误处理、关闭连接等。
- 广泛支持:被大多数现代浏览器支持。
缺点:
- 复杂性高:需要在客户端和服务器端实现WebSocket协议。
- 握手开销:初始化WebSocket连接时需要进行一次HTTP握手。
- 不利于传统HTTP服务器:由于长时间保持连接,可能需要专门的服务器或配置来优化性能和资源使用。
资源消耗:
WebSocket 保持一个全双工的连接,虽然消息传递效率高,但每个客户端都需要一个持久的连接,这会占用服务器资源。对于需要维护大量活跃连接的应用来说,服务器的内存和网络带宽成本可能会更高。
总结:
选择SSE还是WebSocket取决于具体的应用场景。如果应用只需要服务器到客户端的单向数据流,且对客户端数量没有太高要求,SSE是一个简单高效的选择。而如果应用需要双向通信,或者客户端之间需要互相通信,WebSocket则是更好的选择。在考虑哪种技术时,还应该评估服务器的资源和性能,以及是否需要处理复杂的连接管理。
sse什么时候触发关闭
Server-Sent Events (SSE) 连接在以下情况下可能会关闭:
-
服务器端关闭: 服务器可以通过关闭HTTP连接来主动结束SSE流。这通常是通过响应完成或关闭底层套接字来实现的。
-
客户端关闭: 客户端可以通过调用 EventSource 对象的 close() 方法来关闭SSE连接。
javascript
var source = new EventSource('/stream-sse');
// ... 在需要的时候
source.close();
-
网络错误: 网络中断或其他类型的连接问题(如代理服务器断开连接)会导致SSE连接关闭。
-
浏览器导航: 用户导航到另一个页面或关闭浏览器标签/窗口,也会导致SSE连接关闭。
-
超时: 如果没有数据传输,某些服务器或代理可能会因为超时而关闭连接。为了避免这种情况,服务器可以定期发送注释行(例如,冒号后跟换行符 :\n\n)作为心跳消息,以保持连接活跃。
-
达到最大重连次数: SSE自动处理重连机制,如果连接断开,浏览器会尝试重新连接。但是,如果达到了浏览器设定的最大重连次数,或者服务器持续报告错误状态码(如HTTP 500),连接可能不会再次建立。
当SSE连接关闭时,浏览器不会自动重新打开它,除非是由于网络错误或其他非正常关闭情况。在这些情况下,浏览器会根据EventSource对象的reconnect属性(默认是自动的)尝试重新连接。开发者可以通过监听onerror事件来捕获连接关闭的情况,并采取适当的措施,比如手动重新建立连接或通知用户。
前端连接关闭时,后端的flux会自动关闭吗
-
在Spring WebFlux中,当一个前端连接(如SSE客户端)关闭时,服务器端的Flux流通常会自动取消订阅,这意味着与该特定客户端关联的数据流将不再产生新的数据。这是因为Spring WebFlux框架内部处理了请求生命周期,并且能够感知到客户端断开连接的事件。
-
在响应式编程中,当所有订阅者都取消订阅时,Flux或Mono序列就会完成其生命周期。对于HTTP连接,例如Server-Sent Events (SSE),当客户端断开连接时,底层的网络库(如Netty)会通知Spring WebFlux框架,然后框架会取消对Flux流的订阅。这样做可以释放相关资源,并避免不必要的数据生成和处理。
-
尽管如此,还是有可能出现某些情况导致服务器端的资源没有立即释放。例如,如果您在服务器端创建了一个热源(Hot Source)或者使用了某些缓存机制,那么即使没有订阅者,数据仍然可能在服务器上生成并被缓存。因此,最佳实践是确保资源的使用和释放逻辑符合您的应用程序需求,并且在设计系统时考虑到资源管理。
如果需要显式地处理资源释放,可以通过添加相应的逻辑来手动清理资源。例如,在Flux的doOnCancel或doFinally操作符中执行清理代码:
return flux.doFinally(signalType -> {
if (signalType == SignalType.CANCEL) {
// 执行清理逻辑
}
});
在Spring WebFlux中,您可以创建一个Flux数据流,该数据流是由外部接口推送的数据生成的。这通常涉及到处理服务器发送事件(SSE),或者通过某种方式从外部服务获取数据并将其转换为响应式数据流。
请注意,如果您的数据源是外部接口,则需要编写代码以侦听外部接口的数据并将其转化为Flux流。这可能涉及到与外部系统集成,例如通过WebSocket、MQTT、AMQP等协议接收数据,然后将其包装为Flux以供进一步处理和传输。
询mysql数据库并将数据转化为Flux流,该怎么写
依赖
<!-- 添加Spring Boot的R2DBC Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- 添加R2DBC MySQL驱动 -->
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
</dependency>
配置
# application.properties
spring.r2dbc.url=r2dbc:mysql://localhost:3306/your_database
spring.r2dbc.username=your_username
spring.r2dbc.password=your_password
代码
创建一个实体类和响应式存储库接口:
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
@Table("your_table")
public class YourEntity {
@Id
private Long id;
// 其他属性...
}
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
public interface YourEntityRepository extends ReactiveCrudRepository<YourEntity, Long> {
// 自定义查询方法...
}
创建一个控制器来执行查询并返回Flux流
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class YourEntityController {
private final YourEntityRepository repository;
public YourEntityController(YourEntityRepository repository) {
this.repository = repository;
}
@GetMapping("/entities")
public Flux<YourEntity> getAllEntities() {
return repository.findAll();
}
}
spring data r2dbc 查询时读的是快照还是实时数据
-
Spring Data R2DBC 在查询时读取的是数据库在执行查询那一刻的实时数据,而不是快照数据。这意味着如果数据库中的数据在查询执行的过程中发生了变化,查询返回的结果将反映这些变化。
-
R2DBC(Reactive Relational Database Connectivity)是一个基于响应式编程原则建立的数据库访问规范,它使用非阻塞和背压机制来支持数据流的处理。当你使用Spring Data R2DBC执行查询时,它会创建一个数据库连接,并在该连接上执行SQL查询以获取最新的数据。
-
然而,对于某些数据库管理系统来说,可以通过隔离级别来控制事务看到的数据是“快照”还是能够看到其他并发事务所做的更改。例如,在可重复读(REPEATABLE READ)隔离级别下,事务在首次读取数据后,后续读取会看到相同的数据快照,即使其他事务已经提交了更新。而在读已提交(READ COMMITTED)隔离级别下,每次读取都可能看到最新提交的数据。
-
在使用Spring Data R2DBC时,默认情况下,事务隔离级别通常由底层数据库管理系统或连接池决定。如果需要特定的隔离级别行为,你可能需要根据数据库文档来配置事务管理器或在声明事务时指定隔离级别。
总之,Spring Data R2DBC 执行查询时默认读取的是实时数据,但具体的行为可能受到数据库事务隔离级别设置的影响。
消费kafka的topic,并将消息转化为Flux流,该怎么写
Spring for Apache Kafka
依赖
dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
实现
@Service
public class KafkaService {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
@KafkaListener(topics = "your-topic", groupId = "webflux-group")
public void listen(String message) {
queue.offer(message);
}
public Flux<String> consumeMessages() {
return Flux.create(sink -> {
while (true) {
try {
String message = queue.take(); // Blocks until a message is available
sink.next(message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
sink.error(e);
}
}
}, FluxSink.OverflowStrategy.BUFFER);
}
}
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class KafkaController {
private final KafkaService kafkaService;
public KafkaController(KafkaService kafkaService) {
this.kafkaService = kafkaService;
}
@GetMapping(value = "/kafka-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getKafkaStream() {
return kafkaService.consumeMessages();
}
}
Reactor Kafka
依赖
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
代码
创建一个Kafka消费者Flux流:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class KafkaFluxService {
public Flux<ConsumerRecord<String, String>> createKafkaFlux(String bootstrapServers, String topic) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-id");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props)
.subscription(Collections.singleton(topic));
return KafkaReceiver.create(receiverOptions).receive();
}
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class KafkaFluxController {
private final KafkaFluxService kafkaFluxService;
public KafkaFluxController(KafkaFluxService kafkaFluxService) {
this.kafkaFluxService = kafkaFluxService;
}
@GetMapping(value = "/kafka-flux")
public Flux<ConsumerRecord<String, String>> getMessages() {
return kafkaFluxService.createKafkaFlux("localhost:9092", "your-topic");
}
}
当一个客户端通过HTTP请求连接到 /kafka-flux 端点并开始消费这个 Flux 时,Spring WebFlux 框架会处理来自客户端的请求,并且将服务器端的 Flux 流与客户端连接关联起来。如果客户端连接断开(例如,用户关闭了浏览器标签、网络连接丢失等),那么订阅这个 Flux 的客户端会被取消订阅。
一旦所有订阅者都取消订阅了,通常情况下,Reactor Kafka Receiver的内部机制也应该停止从Kafka主题消费消息。然而,这取决于您如何设置和管理 Kafka Receiver 和它的资源。
为了确保在客户端断开连接时正确地停止消费Kafka消息,您可能需要在创建 Flux 时添加适当的清理逻辑,例如使用 doOnCancel 或 doFinally 操作符
public Flux<String> createKafkaFlux(String bootstrapServers, String topic) {
// ... Kafka Receiver配置代码 ...
return KafkaReceiver.create(receiverOptions).receive()
.doOnCancel(() -> {
// 客户端取消订阅时调用
// 在这里执行清理操作,例如关闭Kafka Receiver
})
.doFinally(signalType -> {
if (signalType == SignalType.CANCEL) {
// 清理资源,例如关闭Kafka Receiver
}
})
.map(record -> record.value()); // 假设我们只对消息的值感兴趣
}
前端除了用sse客户端处理 响应式接口,还能怎么处理流数据的响应
前端处理来自Spring WebFlux响应式接口的数据流除了使用Server-Sent Events (SSE)外,还有其他几种方法:
轮询:
前端可以定期发送HTTP请求到服务器以获取最新数据。这种方法不是真正的实时通信,但对于更新频率要求不高的场景可能足够用。
长轮询:
长轮询是轮询的改进版,客户端发送请求后,服务器会持有该请求直到有新数据可发送或超时。这减少了请求的数量,但仍然没有SSE和WebSocket那样的实时性能。
Fetch API with ReadableStream:
使用现代浏览器的fetch API,您可以访问响应体的ReadableStream,这允许您逐步处理流式数据。这种方法适用于处理大量数据或者分块传输编码的响应。
fetch('/your-flux-endpoint')
.then(response => {
const reader = response.body.getReader();
// 处理stream的代码...
});
WebSocket:
如果后端支持,您可以使用WebSocket进行双向通信。WebSocket可以提供一个全双工的通信通道,允许服务器主动推送数据到客户端。
GraphQL Subscriptions:
如果您的应用使用GraphQL,并且后端实现了GraphQL subscriptions,那么您可以通过WebSocket在客户端订阅特定事件的数据更新。
第三方库:
例如,使用像Socket.IO这样的库,它在WebSocket之上提供了更高级别的抽象,包括自动重连、事件分发等功能。
gRPC-Web:
对于支持gRPC的应用程序,gRPC-Web客户端可以与服务端进行流式通信。虽然gRPC主要用于内部服务通信,但gRPC-Web将其扩展到了Web客户端。
选择哪种方法取决于具体需求、后端API的支持情况以及目标用户的浏览器兼容性。例如,如果需要低延迟的实时更新,可能会选择WebSocket;如果只需要偶尔从服务器获取更新,可能会选择轮询或长轮询。
使用Spring MVC来发送事件:
- 使用 @RestController 注解创建一个控制器类(Controller)
- 创建一个方法来创建一个客户端连接,它返回一个 SseEmitter,处理 GET 请求并产生(produces)文本/事件流 (text/event-stream)
- 创建一个新的 SseEmitter, 保存它并从方法中返回
- 在另一个线程中异步发送事件, 先拿到保存的 SseEmitter 并根据需要多次调用调用SseEmitter.send 方法
- 完成事件发送, 调用 SseEmitter.complete() 方法
- 要异常完成发送事件,请调用 SseEmitter.completeWithError() 方法
private static SseEmitter emitter;
@GetMapping(path="/stream-sse2", produces= MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter createConnection() {
emitter = new SseEmitter();
new Thread(() -> {
try {
Thread.sleep(1000);
sendEvents();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
return emitter;
}
// in another thread
void sendEvents() {
try {
emitter.send("Alpha");
emitter.send("Omega");
emitter.complete();
} catch(Exception e) {
emitter.completeWithError(e);
}
}