在微服务架构中,应用的频繁部署和滚动更新是常态。这对于无状态的REST API来说不成问题,但对于Server-Sent Events (SSE) 这种长连接技术,却是一个致命的挑战。一个标准的Spring Boot应用在接收到SIGTERM
信号后,会立即中断所有活动的SSE连接,导致客户端瞬间断线,用户体验断崖式下跌。这种脆弱性在生产环境中是不可接受的。
问题的核心在于,大多数SSE的实现都忽略了服务的生命周期。一个生产级的服务必须能够“优雅地”关闭,它需要有能力通知客户端:“我即将关闭,请稍后重连”。同时,客户端也必须足够“智能”,能够理解这种指令,并在恰当的时机、以一种不冲击后端服务的方式重新建立连接。
这不仅仅是理论,而是真实项目中必须解决的工程问题。下面我们将从零开始,使用Spring Framework和TypeScript,构建一个完整的、支持优雅停机和智能重连的SSE解决方案。我们将直接跳过基础概念,聚焦于解决这一核心痛点的具体实现。
服务端架构:管理生命周期的 SseEmitter
一个健壮的服务端需要做的不仅仅是创建一个SseEmitter
实例并返回。它必须能够追踪、管理所有活跃的连接,并在应用关闭前对它们进行有序的处理。
1. 连接管理核心:SseEmitterManager
我们需要一个线程安全的组件来存储所有活跃的SseEmitter
实例。ConcurrentHashMap
是理想的选择,以一个唯一的客户端标识(例如用户ID)为键。
package com.example.sse.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@Component
public class SseEmitterManager {
private static final Logger logger = LoggerFactory.getLogger(SseEmitterManager.class);
// 使用 ConcurrentHashMap 存储 SseEmitter 实例,保证线程安全
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
// 用于生成唯一的连接ID,便于追踪
private final AtomicLong connectionIdGenerator = new AtomicLong(0);
/**
* 添加一个新的 SseEmitter 连接
*
* @param clientId 客户端唯一标识
* @return 创建的 SseEmitter 实例
*/
public SseEmitter add(String clientId) {
// 设置一个较长的超时时间,或根据业务需求调整
// 默认超时后,SseEmitter 会自动完成,但我们通过心跳来维持连接
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
String connectionId = clientId + "-" + connectionIdGenerator.incrementAndGet();
// 注册回调,处理连接完成、超时和错误
emitter.onCompletion(() -> {
logger.info("Connection completed, removing emitter for client: {}", connectionId);
this.emitters.remove(clientId);
});
emitter.onTimeout(() -> {
logger.warn("Connection timed out, completing emitter for client: {}", connectionId);
emitter.complete(); // 主动完成,触发 onCompletion
});
emitter.onError(throwable -> {
logger.error("Connection error for client: {}", connectionId, throwable);
emitter.completeWithError(throwable); // 触发 onCompletion
});
this.emitters.put(clientId, emitter);
logger.info("New SSE connection added for client: {}", connectionId);
// 立即发送一个连接确认事件
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("connected")
.data("{\"message\":\"Connection established successfully\"}")
.id(String.valueOf(System.currentTimeMillis()));
emitter.send(event);
} catch (IOException e) {
logger.error("Failed to send connection confirmation to client: {}", connectionId, e);
emitter.completeWithError(e);
}
return emitter;
}
/**
* 向特定客户端发送消息
*
* @param clientId 客户端ID
* @param eventName 事件名称
* @param data 事件数据
*/
public void sendMessage(String clientId, String eventName, Object data) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name(eventName)
.data(data)
.id(String.valueOf(System.currentTimeMillis()));
emitter.send(event);
} catch (IOException e) {
logger.error("Failed to send message to client {}: {}", clientId, e.getMessage());
// 发送失败时,可以考虑移除这个emitter
emitter.completeWithError(e);
}
} else {
logger.warn("No active emitter found for client: {}", clientId);
}
}
/**
* 广播消息给所有连接的客户端
*
* @param eventName 事件名称
* @param data 事件数据
*/
public void broadcast(String eventName, Object data) {
emitters.forEach((clientId, emitter) -> {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name(eventName)
.data(data);
emitter.send(event);
} catch (IOException e) {
logger.error("Failed to broadcast to client {}: {}", clientId, e.getMessage());
// 考虑将错误处理逻辑封装,避免重复代码
emitter.completeWithError(e);
}
});
}
public Map<String, SseEmitter> getEmitters() {
return emitters;
}
}
这个SseEmitterManager
是整个服务端的核心。它不仅存储连接,还统一处理了onCompletion
, onTimeout
, onError
这些关键的生命周期回调,确保任何原因导致的连接中断都能正确地将SseEmitter
实例从Map中移除,防止内存泄漏。
2. API 入口:SseController
Controller层负责建立连接,它将具体实现委托给SseEmitterManager
。
package com.example.sse.controller;
import com.example.sse.service.SseEmitterManager;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
public class SseController {
private final SseEmitterManager sseEmitterManager;
public SseController(SseEmitterManager sseEmitterManager) {
this.sseEmitterManager = sseEmitterManager;
}
@GetMapping(path = "/sse/subscribe/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> subscribe(@PathVariable String clientId) {
// 此处应该有安全校验,确保clientId的合法性
SseEmitter emitter = sseEmitterManager.add(clientId);
return ResponseEntity.ok(emitter);
}
}
3. 优雅停机处理器:GracefulShutdownHandler
这是实现优雅停机的关键。我们利用Spring的@PreDestroy
注解,在应用上下文关闭之前执行一段逻辑。这段逻辑会遍历所有活跃的连接,并发送一个特殊的shutdown
事件。
package com.example.sse.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import java.io.IOException;
@Component
public class GracefulShutdownHandler {
private static final Logger logger = LoggerFactory.getLogger(GracefulShutdownHandler.class);
private final SseEmitterManager sseEmitterManager;
// 定义一个建议客户端在5秒后重连的值
private static final long RECONNECT_DELAY_MS = 5000;
public GracefulShutdownHandler(SseEmitterManager sseEmitterManager) {
this.sseEmitterManager = sseEmitterManager;
}
@PreDestroy
public void onShutdown() {
logger.info("Application is shutting down. Notifying all SSE clients to reconnect.");
sseEmitterManager.getEmitters().forEach((clientId, emitter) -> {
try {
// SSE规范允许通过 'retry:' 字段建议客户端重连的延迟时间
// 我们发送一个自定义的 'shutdown' 事件
SseEmitter.SseEventBuilder shutdownEvent = SseEmitter.event()
.name("shutdown")
.data("Server is shutting down. Please reconnect.")
.reconnectTime(RECONNECT_DELAY_MS); // 设置 retry 字段
emitter.send(shutdownEvent);
// 发送完指令后,立即完成这个emitter
emitter.complete();
logger.info("Shutdown notification sent to client: {}", clientId);
} catch (IOException e) {
logger.error("Error notifying client {} of shutdown: {}", clientId, e.getMessage());
emitter.completeWithError(e);
}
});
logger.info("All active SSE clients have been notified.");
}
}
为了让@PreDestroy
有足够的时间执行完毕,我们需要在application.properties
中配置一个优雅停机的宽限期。
# application.properties
server.shutdown=graceful
# 给予应用15秒的时间来完成正在进行的操作,包括我们的优雅停机通知
spring.lifecycle.timeout-per-shutdown-phase=15s
4. 维持连接的心跳机制
许多网络中间件(如负载均衡器、代理)会对空闲的TCP连接进行清理。为了防止SSE连接因此被意外断开,必须定期发送心跳数据。
package com.example.sse.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@EnableScheduling
public class HeartbeatService {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatService.class);
private final SseEmitterManager sseEmitterManager;
public HeartbeatService(SseEmitterManager sseEmitterManager) {
this.sseEmitterManager = sseEmitterManager;
}
// 每20秒发送一次心跳
@Scheduled(fixedRate = 20000)
public void sendHeartbeat() {
// SSE规范中,以冒号开头的行是注释,不会触发客户端的 onmessage 事件
// 这是一种轻量级的、无业务干扰的心跳方式
sseEmitterManager.getEmitters().forEach((clientId, emitter) -> {
try {
emitter.send(":heartbeat");
} catch (IOException e) {
// 当连接不可用时,这里会抛出异常
logger.warn("Failed to send heartbeat to client {}, connection might be closed.", clientId);
emitter.completeWithError(e); // 标记并移除无效连接
}
});
}
}
// 别忘了在主应用类上添加 @EnableScheduling
服务端的工作已经完成。它现在具备了连接管理、心跳维持以及在关闭前通知客户端的能力。
客户端实现:TypeScript 中的智能重连逻辑
客户端不能再简单地依赖EventSource
的默认行为了。我们需要封装它,创建一个能够理解shutdown
事件、处理网络错误,并实现指数退避重连策略的智能客户端。
1. 封装 EventSource
: SseClientService.ts
我们将创建一个 TypeScript 类来管理 SSE 连接的整个生命周期。
// SseClientService.ts
export enum SseConnectionStatus {
CONNECTING,
OPEN,
CLOSED,
}
type SseEventListener = (event: MessageEvent) => void;
export class SseClientService {
private eventSource: EventSource | null = null;
private status: SseConnectionStatus = SseConnectionStatus.CLOSED;
private url: string;
private readonly clientId: string;
// 重连逻辑相关参数
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private initialReconnectDelay = 1000; // 1秒
private maxReconnectDelay = 30000; // 30秒
private backoffFactor = 2;
// 事件监听器
private listeners: Map<string, SseEventListener[]> = new Map();
constructor(baseUrl: string, clientId: string) {
this.url = `${baseUrl}/sse/subscribe/${clientId}`;
this.clientId = clientId;
}
public connect(): void {
if (this.status !== SseConnectionStatus.CLOSED) {
console.warn("SSE connection is already open or connecting.");
return;
}
console.log(`[SSE] Connecting to ${this.url}`);
this.status = SseConnectionStatus.CONNECTING;
this.eventSource = new EventSource(this.url);
this.eventSource.onopen = () => {
console.log("[SSE] Connection opened.");
this.status = SseConnectionStatus.OPEN;
this.reconnectAttempts = 0; // 连接成功后重置重试次数
this.dispatchEvent(new MessageEvent('open'));
};
this.eventSource.onerror = (error) => {
console.error("[SSE] Connection error:", error);
// EventSource 默认会尝试重连,但它的策略很简陋。
// 当发生错误时,我们关闭当前连接,并启动自定义的重连逻辑。
this.eventSource?.close();
this.status = SseConnectionStatus.CLOSED;
this.handleReconnection();
};
// 监听所有未命名事件
this.eventSource.onmessage = (event) => {
this.dispatchEvent(event, 'message');
};
// 监听服务端自定义事件
this.setupCustomEventListeners();
}
private setupCustomEventListeners(): void {
if (!this.eventSource) return;
// 关键:监听服务端发送的 shutdown 事件
this.eventSource.addEventListener('shutdown', (event) => {
console.warn("[SSE] Received shutdown signal from server. Closing connection and will reconnect as instructed.");
this.eventSource?.close();
this.status = SseConnectionStatus.CLOSED;
// 服务端通过 'retry' 字段建议了重连时间
// MessageEvent 没有直接的 retry 属性,但它可能在 event.data 里
// EventSource规范里,retry 是通过一个单独的`retry: <milliseconds>`行发送的
// 不过,我们这里逻辑上直接使用一个固定的延迟+jitter
const instructedDelay = event.lastEventId ? parseInt(event.lastEventId, 10) : 5000;
setTimeout(() => this.connect(), instructedDelay + Math.random() * 1000);
});
// 监听连接确认事件
this.eventSource.addEventListener('connected', (event) => {
console.log('[SSE] Server confirmed connection:', JSON.parse(event.data));
});
// 动态为用户注册的事件添加监听器
this.listeners.forEach((_, eventName) => {
if (eventName !== 'message' && eventName !== 'open') {
this.eventSource?.addEventListener(eventName, (event) => this.dispatchEvent(event, eventName));
}
});
}
private handleReconnection(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error(`[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Giving up.`);
return;
}
this.reconnectAttempts++;
// 指数退避算法
const delay = Math.min(
this.initialReconnectDelay * Math.pow(this.backoffFactor, this.reconnectAttempts - 1),
this.maxReconnectDelay
);
// 添加 Jitter(抖动),防止所有客户端在同一时间重连,造成“惊群效应”
const jitter = delay * 0.2 * Math.random();
const reconnectTimeout = delay + jitter;
console.log(`[SSE] Attempting to reconnect in ${reconnectTimeout.toFixed(0)} ms (attempt ${this.reconnectAttempts})...`);
setTimeout(() => this.connect(), reconnectTimeout);
}
public addEventListener(eventName: string, listener: SseEventListener): void {
if (!this.listeners.has(eventName)) {
this.listeners.set(eventName, []);
}
this.listeners.get(eventName)?.push(listener);
// 如果连接已建立,动态添加监听器
if (this.status === SseConnectionStatus.OPEN && this.eventSource) {
this.eventSource.addEventListener(eventName, listener);
}
}
public removeEventListener(eventName: string, listener: SseEventListener): void {
const eventListeners = this.listeners.get(eventName);
if (eventListeners) {
const index = eventListeners.indexOf(listener);
if (index > -1) {
eventListeners.splice(index, 1);
}
}
// 同样,如果连接已建立,也需要移除
if (this.status === SseConnectionStatus.OPEN && this.eventSource) {
this.eventSource.removeEventListener(eventName, listener);
}
}
private dispatchEvent(event: MessageEvent, eventName: string = 'message'): void {
this.listeners.get(eventName)?.forEach(listener => listener(event));
}
public close(): void {
if (this.eventSource) {
console.log("[SSE] Closing connection manually.");
this.eventSource.close();
this.status = SseConnectionStatus.CLOSED;
this.eventSource = null;
}
}
}
这段TypeScript代码是整个方案的另一半核心。它实现了:
- 状态管理: 清晰地追踪
CONNECTING
,OPEN
,CLOSED
状态。 - 优雅停机响应: 专门监听
shutdown
事件,并执行受控的关闭与重连。 - 指数退避与Jitter: 在发生网络错误时,避免了简单粗暴的立即重连,减轻了服务端在故障恢复期间的压力。这是生产级客户端必须具备的特性。
- 事件总线: 提供
addEventListener
和removeEventListener
,使得业务代码可以解耦地监听来自SSE的各类事件。
2. 可视化流程:Mermaid 时序图
整个优雅停机到重连的流程可以用下面的时序图清晰地表示:
sequenceDiagram participant Client participant LoadBalancer participant Server_Instance_A participant Server_Instance_B participant Operator Client->>LoadBalancer: GET /sse/subscribe/user123 LoadBalancer->>Server_Instance_A: GET /sse/subscribe/user123 Server_Instance_A-->>Client: Connection Opened (HTTP 200) Server_Instance_A-->>Client: event: connected loop Data Exchange Server_Instance_A-->>Client: event: notification, data: {...} end Operator->>Server_Instance_A: SIGTERM (Shutdown Signal) Note over Server_Instance_A: Triggering @PreDestroy hook Server_Instance_A-->>Client: event: shutdown, reconnectTime: 5000 Server_Instance_A-->>Client: Connection Closed Note over Client: Receives shutdown, closes connection. Client->>Client: setTimeout(reconnect, 5000ms) Note over LoadBalancer: Detects Server_Instance_A is unhealthy, removes it from pool. Client->>LoadBalancer: GET /sse/subscribe/user123 (After 5s) LoadBalancer->>Server_Instance_B: GET /sse/subscribe/user123 (New instance) Server_Instance_B-->>Client: Connection Opened (HTTP 200) Server_Instance_B-->>Client: event: connected Note over Client: Reconnection successful. User experiences minimal disruption.
常见误区与最佳实践
误区:依赖默认的
EventSource
重连机制。浏览器的默认重连非常简单,通常是固定的3秒延迟,它无法识别计划内的停机,也无法应对服务端的大规模故障。自定义的指数退避和抖动策略是必不可少的。误区:忘记处理
SseEmitter
的生命周期。在onCompletion
,onError
,onTimeout
回调中未能从管理器中移除SseEmitter
,是导致服务端内存泄漏的常见原因。必须确保任何路径的连接终止都会触发清理逻辑。实践:clientId 的安全与验证。在示例中我们直接使用了路径变量
{clientId}
。在真实项目中,这个ID必须与用户的认证信息(如JWT中的用户ID)进行严格匹配,防止任何用户订阅到不属于自己的数据流。实践:负载均衡器的配置。如果使用了Nginx等反向代理,需要确保代理配置支持长连接,并关闭或调大响应缓冲(
proxy_buffering off;
),否则SSE事件可能会被代理缓冲,导致客户端无法实时接收。
适用边界与未来展望
这套方案为在需要频繁部署的微服务环境中实现可靠的SSE通信提供了坚实的基础。它非常适合实时通知、仪表盘数据更新、后台任务进度展示等场景。
然而,该方案并未解决所有问题。它本质上是通过最小化中断时间来提升用户体验,但并不能保证在停机瞬间的“零消息丢失”。如果一个业务消息在SIGTERM
信号到达和shutdown
事件发送之间产生,它可能会丢失。对于需要绝对消息可靠性的场景,例如金融交易通知,单纯的SSE是不够的。这类场景需要一个更复杂的架构,可能包括:
- 消息队列集成:服务端将消息推送到持久化的消息队列(如RabbitMQ, Kafka),SSE服务作为消费者,再将消息推送给客户端。
- 客户端消息确认与追溯:客户端在收到消息后,可以向服务端发送一个ACK确认。结合消息ID或序列号,客户端在重连后可以请求从上一个确认的消息ID之后开始重新同步数据。
这种增强方案将系统从“尽力而为”的推送模型,升级为具备可靠消息传递能力的系统,但这已超出了本文探讨的范畴,是向更高可用性架构演进的下一步。