构建支持优雅停机与智能重连的生产级SSE服务


在微服务架构中,应用的频繁部署和滚动更新是常态。这对于无状态的REST API来说不成问题,但对于Server-Sent Events (SSE) 这种长连接技术,却是一个致命的挑战。一个标准的Spring Boot应用在接收到SIGTERM信号后,会立即中断所有活动的SSE连接,导致客户端瞬间断线,用户体验断崖式下跌。这种脆弱性在生产环境中是不可接受的。

问题的核心在于,大多数SSE的实现都忽略了服务的生命周期。一个生产级的服务必须能够“优雅地”关闭,它需要有能力通知客户端:“我即将关闭,请稍后重连”。同时,客户端也必须足够“智能”,能够理解这种指令,并在恰当的时机、以一种不冲击后端服务的方式重新建立连接。

这不仅仅是理论,而是真实项目中必须解决的工程问题。下面我们将从零开始,使用Spring Framework和TypeScript,构建一个完整的、支持优雅停机和智能重连的SSE解决方案。我们将直接跳过基础概念,聚焦于解决这一核心痛点的具体实现。

服务端架构:管理生命周期的 SseEmitter

一个健壮的服务端需要做的不仅仅是创建一个SseEmitter实例并返回。它必须能够追踪、管理所有活跃的连接,并在应用关闭前对它们进行有序的处理。

1. 连接管理核心:SseEmitterManager

我们需要一个线程安全的组件来存储所有活跃的SseEmitter实例。ConcurrentHashMap是理想的选择,以一个唯一的客户端标识(例如用户ID)为键。

package com.example.sse.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

@Component
public class SseEmitterManager {

    private static final Logger logger = LoggerFactory.getLogger(SseEmitterManager.class);
    // 使用 ConcurrentHashMap 存储 SseEmitter 实例,保证线程安全
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    // 用于生成唯一的连接ID,便于追踪
    private final AtomicLong connectionIdGenerator = new AtomicLong(0);

    /**
     * 添加一个新的 SseEmitter 连接
     *
     * @param clientId 客户端唯一标识
     * @return 创建的 SseEmitter 实例
     */
    public SseEmitter add(String clientId) {
        // 设置一个较长的超时时间,或根据业务需求调整
        // 默认超时后,SseEmitter 会自动完成,但我们通过心跳来维持连接
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        String connectionId = clientId + "-" + connectionIdGenerator.incrementAndGet();

        // 注册回调,处理连接完成、超时和错误
        emitter.onCompletion(() -> {
            logger.info("Connection completed, removing emitter for client: {}", connectionId);
            this.emitters.remove(clientId);
        });
        emitter.onTimeout(() -> {
            logger.warn("Connection timed out, completing emitter for client: {}", connectionId);
            emitter.complete(); // 主动完成,触发 onCompletion
        });
        emitter.onError(throwable -> {
            logger.error("Connection error for client: {}", connectionId, throwable);
            emitter.completeWithError(throwable); // 触发 onCompletion
        });

        this.emitters.put(clientId, emitter);
        logger.info("New SSE connection added for client: {}", connectionId);

        // 立即发送一个连接确认事件
        try {
            SseEmitter.SseEventBuilder event = SseEmitter.event()
                    .name("connected")
                    .data("{\"message\":\"Connection established successfully\"}")
                    .id(String.valueOf(System.currentTimeMillis()));
            emitter.send(event);
        } catch (IOException e) {
            logger.error("Failed to send connection confirmation to client: {}", connectionId, e);
            emitter.completeWithError(e);
        }
        
        return emitter;
    }

    /**
     * 向特定客户端发送消息
     *
     * @param clientId 客户端ID
     * @param eventName 事件名称
     * @param data      事件数据
     */
    public void sendMessage(String clientId, String eventName, Object data) {
        SseEmitter emitter = emitters.get(clientId);
        if (emitter != null) {
            try {
                SseEmitter.SseEventBuilder event = SseEmitter.event()
                        .name(eventName)
                        .data(data)
                        .id(String.valueOf(System.currentTimeMillis()));
                emitter.send(event);
            } catch (IOException e) {
                logger.error("Failed to send message to client {}: {}", clientId, e.getMessage());
                // 发送失败时,可以考虑移除这个emitter
                emitter.completeWithError(e);
            }
        } else {
            logger.warn("No active emitter found for client: {}", clientId);
        }
    }
    
    /**
     * 广播消息给所有连接的客户端
     *
     * @param eventName 事件名称
     * @param data      事件数据
     */
    public void broadcast(String eventName, Object data) {
        emitters.forEach((clientId, emitter) -> {
            try {
                SseEmitter.SseEventBuilder event = SseEmitter.event()
                        .name(eventName)
                        .data(data);
                emitter.send(event);
            } catch (IOException e) {
                logger.error("Failed to broadcast to client {}: {}", clientId, e.getMessage());
                // 考虑将错误处理逻辑封装,避免重复代码
                emitter.completeWithError(e);
            }
        });
    }

    public Map<String, SseEmitter> getEmitters() {
        return emitters;
    }
}

这个SseEmitterManager是整个服务端的核心。它不仅存储连接,还统一处理了onCompletion, onTimeout, onError这些关键的生命周期回调,确保任何原因导致的连接中断都能正确地将SseEmitter实例从Map中移除,防止内存泄漏。

2. API 入口:SseController

Controller层负责建立连接,它将具体实现委托给SseEmitterManager

package com.example.sse.controller;

import com.example.sse.service.SseEmitterManager;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class SseController {

    private final SseEmitterManager sseEmitterManager;

    public SseController(SseEmitterManager sseEmitterManager) {
        this.sseEmitterManager = sseEmitterManager;
    }

    @GetMapping(path = "/sse/subscribe/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> subscribe(@PathVariable String clientId) {
        // 此处应该有安全校验,确保clientId的合法性
        SseEmitter emitter = sseEmitterManager.add(clientId);
        return ResponseEntity.ok(emitter);
    }
}

3. 优雅停机处理器:GracefulShutdownHandler

这是实现优雅停机的关键。我们利用Spring的@PreDestroy注解,在应用上下文关闭之前执行一段逻辑。这段逻辑会遍历所有活跃的连接,并发送一个特殊的shutdown事件。

package com.example.sse.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.PreDestroy;
import java.io.IOException;

@Component
public class GracefulShutdownHandler {

    private static final Logger logger = LoggerFactory.getLogger(GracefulShutdownHandler.class);
    private final SseEmitterManager sseEmitterManager;
    // 定义一个建议客户端在5秒后重连的值
    private static final long RECONNECT_DELAY_MS = 5000;

    public GracefulShutdownHandler(SseEmitterManager sseEmitterManager) {
        this.sseEmitterManager = sseEmitterManager;
    }

    @PreDestroy
    public void onShutdown() {
        logger.info("Application is shutting down. Notifying all SSE clients to reconnect.");

        sseEmitterManager.getEmitters().forEach((clientId, emitter) -> {
            try {
                // SSE规范允许通过 'retry:' 字段建议客户端重连的延迟时间
                // 我们发送一个自定义的 'shutdown' 事件
                SseEmitter.SseEventBuilder shutdownEvent = SseEmitter.event()
                        .name("shutdown")
                        .data("Server is shutting down. Please reconnect.")
                        .reconnectTime(RECONNECT_DELAY_MS); // 设置 retry 字段

                emitter.send(shutdownEvent);
                // 发送完指令后,立即完成这个emitter
                emitter.complete();
                logger.info("Shutdown notification sent to client: {}", clientId);
            } catch (IOException e) {
                logger.error("Error notifying client {} of shutdown: {}", clientId, e.getMessage());
                emitter.completeWithError(e);
            }
        });

        logger.info("All active SSE clients have been notified.");
    }
}

为了让@PreDestroy有足够的时间执行完毕,我们需要在application.properties中配置一个优雅停机的宽限期。

# application.properties
server.shutdown=graceful
# 给予应用15秒的时间来完成正在进行的操作,包括我们的优雅停机通知
spring.lifecycle.timeout-per-shutdown-phase=15s

4. 维持连接的心跳机制

许多网络中间件(如负载均衡器、代理)会对空闲的TCP连接进行清理。为了防止SSE连接因此被意外断开,必须定期发送心跳数据。

package com.example.sse.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@EnableScheduling
public class HeartbeatService {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatService.class);
    private final SseEmitterManager sseEmitterManager;

    public HeartbeatService(SseEmitterManager sseEmitterManager) {
        this.sseEmitterManager = sseEmitterManager;
    }

    // 每20秒发送一次心跳
    @Scheduled(fixedRate = 20000)
    public void sendHeartbeat() {
        // SSE规范中,以冒号开头的行是注释,不会触发客户端的 onmessage 事件
        // 这是一种轻量级的、无业务干扰的心跳方式
        sseEmitterManager.getEmitters().forEach((clientId, emitter) -> {
            try {
                emitter.send(":heartbeat");
            } catch (IOException e) {
                // 当连接不可用时,这里会抛出异常
                logger.warn("Failed to send heartbeat to client {}, connection might be closed.", clientId);
                emitter.completeWithError(e); // 标记并移除无效连接
            }
        });
    }
}

// 别忘了在主应用类上添加 @EnableScheduling

服务端的工作已经完成。它现在具备了连接管理、心跳维持以及在关闭前通知客户端的能力。

客户端实现:TypeScript 中的智能重连逻辑

客户端不能再简单地依赖EventSource的默认行为了。我们需要封装它,创建一个能够理解shutdown事件、处理网络错误,并实现指数退避重连策略的智能客户端。

1. 封装 EventSource: SseClientService.ts

我们将创建一个 TypeScript 类来管理 SSE 连接的整个生命周期。

// SseClientService.ts

export enum SseConnectionStatus {
  CONNECTING,
  OPEN,
  CLOSED,
}

type SseEventListener = (event: MessageEvent) => void;

export class SseClientService {
  private eventSource: EventSource | null = null;
  private status: SseConnectionStatus = SseConnectionStatus.CLOSED;
  private url: string;
  private readonly clientId: string;

  // 重连逻辑相关参数
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private initialReconnectDelay = 1000; // 1秒
  private maxReconnectDelay = 30000; // 30秒
  private backoffFactor = 2;

  // 事件监听器
  private listeners: Map<string, SseEventListener[]> = new Map();

  constructor(baseUrl: string, clientId: string) {
    this.url = `${baseUrl}/sse/subscribe/${clientId}`;
    this.clientId = clientId;
  }

  public connect(): void {
    if (this.status !== SseConnectionStatus.CLOSED) {
      console.warn("SSE connection is already open or connecting.");
      return;
    }

    console.log(`[SSE] Connecting to ${this.url}`);
    this.status = SseConnectionStatus.CONNECTING;
    this.eventSource = new EventSource(this.url);

    this.eventSource.onopen = () => {
      console.log("[SSE] Connection opened.");
      this.status = SseConnectionStatus.OPEN;
      this.reconnectAttempts = 0; // 连接成功后重置重试次数
      this.dispatchEvent(new MessageEvent('open'));
    };

    this.eventSource.onerror = (error) => {
      console.error("[SSE] Connection error:", error);
      // EventSource 默认会尝试重连,但它的策略很简陋。
      // 当发生错误时,我们关闭当前连接,并启动自定义的重连逻辑。
      this.eventSource?.close();
      this.status = SseConnectionStatus.CLOSED;
      this.handleReconnection();
    };

    // 监听所有未命名事件
    this.eventSource.onmessage = (event) => {
      this.dispatchEvent(event, 'message');
    };
    
    // 监听服务端自定义事件
    this.setupCustomEventListeners();
  }

  private setupCustomEventListeners(): void {
    if (!this.eventSource) return;

    // 关键:监听服务端发送的 shutdown 事件
    this.eventSource.addEventListener('shutdown', (event) => {
      console.warn("[SSE] Received shutdown signal from server. Closing connection and will reconnect as instructed.");
      this.eventSource?.close();
      this.status = SseConnectionStatus.CLOSED;
      
      // 服务端通过 'retry' 字段建议了重连时间
      // MessageEvent 没有直接的 retry 属性,但它可能在 event.data 里
      // EventSource规范里,retry 是通过一个单独的`retry: <milliseconds>`行发送的
      // 不过,我们这里逻辑上直接使用一个固定的延迟+jitter
      const instructedDelay = event.lastEventId ? parseInt(event.lastEventId, 10) : 5000;
      setTimeout(() => this.connect(), instructedDelay + Math.random() * 1000);
    });
    
    // 监听连接确认事件
    this.eventSource.addEventListener('connected', (event) => {
        console.log('[SSE] Server confirmed connection:', JSON.parse(event.data));
    });

    // 动态为用户注册的事件添加监听器
    this.listeners.forEach((_, eventName) => {
      if (eventName !== 'message' && eventName !== 'open') {
         this.eventSource?.addEventListener(eventName, (event) => this.dispatchEvent(event, eventName));
      }
    });
  }

  private handleReconnection(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error(`[SSE] Max reconnection attempts (${this.maxReconnectAttempts}) reached. Giving up.`);
      return;
    }

    this.reconnectAttempts++;
    // 指数退避算法
    const delay = Math.min(
      this.initialReconnectDelay * Math.pow(this.backoffFactor, this.reconnectAttempts - 1),
      this.maxReconnectDelay
    );
    // 添加 Jitter(抖动),防止所有客户端在同一时间重连,造成“惊群效应”
    const jitter = delay * 0.2 * Math.random();
    const reconnectTimeout = delay + jitter;

    console.log(`[SSE] Attempting to reconnect in ${reconnectTimeout.toFixed(0)} ms (attempt ${this.reconnectAttempts})...`);

    setTimeout(() => this.connect(), reconnectTimeout);
  }

  public addEventListener(eventName: string, listener: SseEventListener): void {
    if (!this.listeners.has(eventName)) {
        this.listeners.set(eventName, []);
    }
    this.listeners.get(eventName)?.push(listener);

    // 如果连接已建立,动态添加监听器
    if (this.status === SseConnectionStatus.OPEN && this.eventSource) {
        this.eventSource.addEventListener(eventName, listener);
    }
  }

  public removeEventListener(eventName: string, listener: SseEventListener): void {
    const eventListeners = this.listeners.get(eventName);
    if (eventListeners) {
        const index = eventListeners.indexOf(listener);
        if (index > -1) {
            eventListeners.splice(index, 1);
        }
    }
    // 同样,如果连接已建立,也需要移除
    if (this.status === SseConnectionStatus.OPEN && this.eventSource) {
        this.eventSource.removeEventListener(eventName, listener);
    }
  }

  private dispatchEvent(event: MessageEvent, eventName: string = 'message'): void {
    this.listeners.get(eventName)?.forEach(listener => listener(event));
  }
  
  public close(): void {
    if (this.eventSource) {
      console.log("[SSE] Closing connection manually.");
      this.eventSource.close();
      this.status = SseConnectionStatus.CLOSED;
      this.eventSource = null;
    }
  }
}

这段TypeScript代码是整个方案的另一半核心。它实现了:

  1. 状态管理: 清晰地追踪CONNECTING, OPEN, CLOSED状态。
  2. 优雅停机响应: 专门监听shutdown事件,并执行受控的关闭与重连。
  3. 指数退避与Jitter: 在发生网络错误时,避免了简单粗暴的立即重连,减轻了服务端在故障恢复期间的压力。这是生产级客户端必须具备的特性。
  4. 事件总线: 提供addEventListenerremoveEventListener,使得业务代码可以解耦地监听来自SSE的各类事件。

2. 可视化流程:Mermaid 时序图

整个优雅停机到重连的流程可以用下面的时序图清晰地表示:

sequenceDiagram
    participant Client
    participant LoadBalancer
    participant Server_Instance_A
    participant Server_Instance_B
    participant Operator

    Client->>LoadBalancer: GET /sse/subscribe/user123
    LoadBalancer->>Server_Instance_A: GET /sse/subscribe/user123
    Server_Instance_A-->>Client: Connection Opened (HTTP 200)
    Server_Instance_A-->>Client: event: connected
    loop Data Exchange
        Server_Instance_A-->>Client: event: notification, data: {...}
    end

    Operator->>Server_Instance_A: SIGTERM (Shutdown Signal)
    Note over Server_Instance_A: Triggering @PreDestroy hook

    Server_Instance_A-->>Client: event: shutdown, reconnectTime: 5000
    Server_Instance_A-->>Client: Connection Closed
    Note over Client: Receives shutdown, closes connection.
    Client->>Client: setTimeout(reconnect, 5000ms)

    Note over LoadBalancer: Detects Server_Instance_A is unhealthy, removes it from pool.

    Client->>LoadBalancer: GET /sse/subscribe/user123 (After 5s)
    LoadBalancer->>Server_Instance_B: GET /sse/subscribe/user123 (New instance)
    Server_Instance_B-->>Client: Connection Opened (HTTP 200)
    Server_Instance_B-->>Client: event: connected
    Note over Client: Reconnection successful. User experiences minimal disruption.

常见误区与最佳实践

  1. 误区:依赖默认的EventSource重连机制。浏览器的默认重连非常简单,通常是固定的3秒延迟,它无法识别计划内的停机,也无法应对服务端的大规模故障。自定义的指数退避和抖动策略是必不可少的。

  2. 误区:忘记处理SseEmitter的生命周期。在onCompletion, onError, onTimeout回调中未能从管理器中移除SseEmitter,是导致服务端内存泄漏的常见原因。必须确保任何路径的连接终止都会触发清理逻辑。

  3. 实践:clientId 的安全与验证。在示例中我们直接使用了路径变量{clientId}。在真实项目中,这个ID必须与用户的认证信息(如JWT中的用户ID)进行严格匹配,防止任何用户订阅到不属于自己的数据流。

  4. 实践:负载均衡器的配置。如果使用了Nginx等反向代理,需要确保代理配置支持长连接,并关闭或调大响应缓冲(proxy_buffering off;),否则SSE事件可能会被代理缓冲,导致客户端无法实时接收。

适用边界与未来展望

这套方案为在需要频繁部署的微服务环境中实现可靠的SSE通信提供了坚实的基础。它非常适合实时通知、仪表盘数据更新、后台任务进度展示等场景。

然而,该方案并未解决所有问题。它本质上是通过最小化中断时间来提升用户体验,但并不能保证在停机瞬间的“零消息丢失”。如果一个业务消息在SIGTERM信号到达和shutdown事件发送之间产生,它可能会丢失。对于需要绝对消息可靠性的场景,例如金融交易通知,单纯的SSE是不够的。这类场景需要一个更复杂的架构,可能包括:

  1. 消息队列集成:服务端将消息推送到持久化的消息队列(如RabbitMQ, Kafka),SSE服务作为消费者,再将消息推送给客户端。
  2. 客户端消息确认与追溯:客户端在收到消息后,可以向服务端发送一个ACK确认。结合消息ID或序列号,客户端在重连后可以请求从上一个确认的消息ID之后开始重新同步数据。

这种增强方案将系统从“尽力而为”的推送模型,升级为具备可靠消息传递能力的系统,但这已超出了本文探讨的范畴,是向更高可用性架构演进的下一步。


  目录