移动端分析事件的洪峰差点冲垮了我们的主数据库。最初的设计非常直接:移动App采集用户行为,通过一个RESTful API直接写入PostgreSQL。在用户量少的时候,这套方案运行良好。但随着一次市场推广活动的成功,并发请求量在几分钟内飙升了50倍。API响应时间从50ms恶化到5000ms,数据库连接池被打满,最终导致了部分核心业务的阻塞。这是一个典型的、代价高昂的教训——同步写入是脆弱的。
问题根源在于耦合。数据接收和数据处理被紧紧地绑在了一起。我们需要拆解它。初步构想是引入一个消息队列,将数据接收的峰值“削平”,让后端处理系统可以按照自己的节奏去消费。在技术选型上,团队内部对 Kafka 和 AWS SQS 做了比较。考虑到我们现有的技术栈深度和运维成本,最终选择了 AWS SQS。它足够简单,完全托管,按量付费,并且与我们已有的AWS生态无缝集成,无需我们投入额外的精力去维护一个复杂的消息中间件集群。
后端服务我们决定继续使用 NestJS。它提供的模块化和依赖注入体系,非常适合构建结构清晰、易于测试的后台应用,无论是实现一个简单的API接收器,还是一个复杂的后台消费者(Consumer)服务。
第一步:重构写入链路,设计独立的消费者服务
首要任务是创建一个独立的、常驻后台的消费者进程。它的唯一职责就是从 SQS 队列中拉取消息,进行批处理,然后写入数据库。这个服务必须是健壮的,能够处理部分失败、优雅地关闭,并且易于水平扩展。
消费者核心逻辑:长轮询与批处理
为了降低成本和API调用次数,我们必须使用 SQS 的长轮询(Long Polling)和批处理(Batching)特性。这意味着消费者会向 SQS 请求最多10条消息,并等待最长20秒,直到满足任意一个条件才返回。
这是消费者服务的核心模块 sqs-consumer.service.ts
的实现。它不依赖任何特定的 NestJS SQS 封装库,而是直接使用官方的 @aws-sdk/client-sqs
,这能让我们对行为有最精细的控制。
// src/sqs-consumer/sqs-consumer.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import {
SQSClient,
ReceiveMessageCommand,
DeleteMessageBatchCommand,
Message,
ReceiveMessageCommandInput,
DeleteMessageBatchCommandEntry,
} from '@aws-sdk/client-sqs';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class SqsConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(SqsConsumerService.name);
private readonly sqsClient: SQSClient;
private readonly queueUrl: string;
private readonly maxMessagesPerBatch: number = 10; // SQS 上限
private readonly waitTimeSeconds: number = 20; // 启用长轮询
private isShuttingDown = false;
private pollingInterval: NodeJS.Timeout | null = null;
constructor(private readonly configService: ConfigService) {
this.queueUrl = this.configService.get<string>('AWS_SQS_QUEUE_URL');
// 在真实项目中,Region, Credentials 等应通过 ConfigService 或 IAM Role 动态注入
this.sqsClient = new SQSClient({
region: this.configService.get<string>('AWS_REGION'),
// 如果部署在 EC2 并附加了 IAM Role,则无需显式提供 credentials
credentials: {
accessKeyId: this.configService.get<string>('AWS_ACCESS_KEY_ID'),
secretAccessKey: this.configService.get<string>('AWS_SECRET_ACCESS_KEY'),
},
});
if (!this.queueUrl) {
throw new Error('AWS_SQS_QUEUE_URL is not defined in environment variables.');
}
}
async onModuleInit() {
this.logger.log('SQS Consumer Service initialized. Starting to poll...');
// 使用 setImmediate 确保在模块完全初始化后再开始轮询
setImmediate(() => this.startPolling());
}
async onModuleDestroy() {
this.logger.log('Shutdown signal received. Stopping SQS polling...');
this.isShuttingDown = true;
if (this.pollingInterval) {
clearTimeout(this.pollingInterval);
}
// 这里可以添加逻辑,等待当前正在处理的批次完成
await new Promise(resolve => setTimeout(resolve, 2000)); // 简单等待
this.logger.log('SQS Consumer Service stopped.');
}
private startPolling() {
if (this.isShuttingDown) {
return;
}
this.pollMessages()
.catch((error) => {
// 捕获轮询过程中的未知错误,防止进程崩溃
this.logger.error('An unexpected error occurred during polling.', error.stack);
})
.finally(() => {
// 无论成功或失败,只要没在关闭,就安排下一次轮询
if (!this.isShuttingDown) {
this.pollingInterval = setTimeout(() => this.startPolling(), 1000); // 1秒后再次轮询
}
});
}
private async pollMessages(): Promise<void> {
const receiveParams: ReceiveMessageCommandInput = {
QueueUrl: this.queueUrl,
MaxNumberOfMessages: this.maxMessagesPerBatch,
WaitTimeSeconds: this.waitTimeSeconds,
// 获取消息体和一些关键属性,用于调试和错误处理
MessageAttributeNames: ['All'],
AttributeNames: ['ApproximateReceiveCount'],
};
try {
const command = new ReceiveMessageCommand(receiveParams);
const { Messages } = await this.sqsClient.send(command);
if (!Messages || Messages.length === 0) {
this.logger.verbose('No messages received in this poll.');
return;
}
this.logger.log(`Received ${Messages.length} messages. Processing batch...`);
await this.processBatch(Messages);
} catch (error) {
this.logger.error(`Failed to receive messages from SQS: ${error.message}`, error.stack);
// 在出现 SDK 错误时(如网络问题),增加等待时间,避免频繁请求
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
private async processBatch(messages: Message[]): Promise<void> {
const successfullyProcessed: DeleteMessageBatchCommandEntry[] = [];
const processingPromises = messages.map(async (message) => {
try {
// 核心业务逻辑
await this.handleSingleMessage(message);
return { status: 'fulfilled', value: message };
} catch (error) {
this.logger.error(`Failed to process message ${message.MessageId}.`, error.stack);
// 返回错误信息,用于后续判断
return { status: 'rejected', reason: { message, error } };
}
});
// 使用 allSettled 是关键,它能确保即使批次中部分消息处理失败,
// 也能继续处理其余消息,并收集所有成功和失败的结果。
const results = await Promise.allSettled(processingPromises);
results.forEach(result => {
if (result.status === 'fulfilled' && result.value.status === 'fulfilled') {
successfullyProcessed.push({
Id: result.value.value.MessageId,
ReceiptHandle: result.value.value.ReceiptHandle,
});
} else if (result.status === 'rejected' || (result.status === 'fulfilled' && result.value.status === 'rejected')) {
// 在这里可以添加逻辑,比如将处理失败的消息信息推送到另一个监控系统
const failedMsg = result.status === 'rejected' ? result.reason.message : result.value.reason.message;
this.logger.warn(`Message ${failedMsg.MessageId} will not be deleted and will be re-processed later or sent to DLQ.`);
}
});
if (successfullyProcessed.length > 0) {
await this.deleteSuccessfullyProcessedMessages(successfullyProcessed);
}
}
private async handleSingleMessage(message: Message): Promise<void> {
// 模拟业务处理
this.logger.debug(`Processing message: ${message.MessageId}`);
const body = JSON.parse(message.Body);
// 模拟一个可能会失败的数据库写入操作
if (body.shouldFail) {
throw new Error(`Intentional failure for message with eventId: ${body.eventId}`);
}
// 模拟耗时操作
await new Promise(resolve => setTimeout(resolve, 100 + Math.random() * 200));
// this.databaseService.saveEvent(body); // 实际的数据库写入
this.logger.log(`Successfully processed event: ${body.eventId}`);
}
private async deleteSuccessfullyProcessedMessages(entries: DeleteMessageBatchCommandEntry[]): Promise<void> {
if (entries.length === 0) {
return;
}
try {
const deleteCommand = new DeleteMessageBatchCommand({
QueueUrl: this.queueUrl,
Entries: entries,
});
const result = await this.sqsClient.send(deleteCommand);
this.logger.log(`Successfully deleted ${result.Successful.length} messages.`);
if (result.Failed && result.Failed.length > 0) {
// 这种情况很少见,但必须处理。通常是 ReceiptHandle 过期等问题。
result.Failed.forEach(failure => {
this.logger.error(`Failed to delete message ${failure.Id}: ${failure.Message}`);
});
}
} catch (error) {
this.logger.error('Error deleting messages from SQS.', error.stack);
}
}
}
这段代码有几个关键的生产级考量:
- 优雅停机 (
onModuleDestroy
): 捕获应用的关闭信号,设置isShuttingDown
标志位,停止发起新的轮询。这能确保在K8s pod缩容或服务重启时,尽可能完成当前正在处理的批次,避免数据处理中断。 - 错误隔离 (
Promise.allSettled
): 在批处理中,一条消息的失败不应该影响整个批次。Promise.allSettled
保证了这一点,我们能够精确地知道哪些消息成功,哪些失败,从而只删除成功的消息。失败的消息会因为VisibilityTimeout
超时而重新出现在队列中,最终被送入死信队列(Dead-Letter Queue, DLQ)。 - 健壮的轮询循环:
startPolling
使用setTimeout
安排下一次轮询,而不是一个简单的while(true)
循环。这避免了在出现意外错误时导致CPU空转,并且try...catch...finally
结构确保了循环的持续性。 - 配置驱动: 所有AWS相关的配置(队列URL、区域、凭证)都通过 NestJS 的
ConfigService
注入,而不是硬编码。
第二步:实现轻量级事件接收API
移动端需要一个极其轻快、高可用的API端点来接收事件。这个端点的职责被简化到极致:验证请求体,然后将消息推送到SQS。它不执行任何耗时的业务逻辑或数据库操作。
// src/ingestion/ingestion.controller.ts
import { Controller, Post, Body, HttpCode, HttpStatus, Logger } from '@nestjs/common';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { ConfigService } from '@nestjs/config';
import { randomUUID } from 'crypto';
// 使用 class-validator 进行DTO验证
import { IsString, IsNotEmpty, IsOptional, MaxLength } from 'class-validator';
export class IngestionEventDto {
@IsString()
@IsNotEmpty()
@MaxLength(50)
eventType: string;
@IsNotEmpty()
payload: Record<string, any>;
@IsString()
@IsOptional()
userId?: string;
}
@Controller('ingest')
export class IngestionController {
private readonly logger = new Logger(IngestionController.name);
private readonly sqsClient: SQSClient;
private readonly queueUrl: string;
constructor(private readonly configService: ConfigService) {
this.queueUrl = this.configService.get<string>('AWS_SQS_QUEUE_URL');
this.sqsClient = new SQSClient({ region: this.configService.get<string>('AWS_REGION') });
}
@Post('event')
@HttpCode(HttpStatus.ACCEPTED) // 使用 202 表示请求已被接受,将进行异步处理
async receiveEvent(@Body() eventDto: IngestionEventDto): Promise<{ messageId: string; }> {
const messageId = randomUUID();
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify({
// 在这里可以丰富消息体,加入接收时间等元数据
receivedAt: new Date().toISOString(),
eventId: messageId,
...eventDto,
}),
// 如果需要消息去重或保证顺序,可以考虑使用 FIFO 队列并提供 MessageDeduplicationId 和 MessageGroupId
// MessageGroupId: eventDto.userId || 'anonymous',
});
try {
const result = await this.sqsClient.send(command);
this.logger.log(`Event ${messageId} successfully sent to SQS with MessageId: ${result.MessageId}`);
return { messageId: result.MessageId };
} catch (error) {
this.logger.error(`Failed to send event ${messageId} to SQS`, error.stack);
// 在真实项目中,这里应该抛出一个自定义的 HttpException
// 以便全局异常过滤器捕获并返回合适的错误响应
throw new Error('Internal server error while queueing the event.');
}
}
}
这个Controller的设计很简单,但遵循了几个最佳实践:
- 返回
202 Accepted
: 明确告知客户端,请求已被接受,但处理是异步的。 - DTO验证: 使用
class-validator
确保进入系统的数据是干净和符合预期的。 - 无业务逻辑: 除了发送到SQS,不做任何其他事情,保证了极低的响应延迟。
第三步:使用 Ansible 实现自动化部署与配置管理
代码写完只是第一步,如何在生产环境可靠、可重复地部署和管理这个消费者服务,是决定项目成败的关键。手动SSH到服务器执行 git pull
和 npm run start:prod
是不可接受的。这里就是 Ansible 发挥作用的地方。
我们为消费者服务创建了一个Ansible Role,负责完成从环境准备到服务启动的所有工作。
Playbook 结构:
ansible/
├── inventory.ini # 主机清单
├── playbook-deploy-consumer.yml # 主部署文件
└── roles/
└── nestjs-consumer/
├── tasks/
│ ├── main.yml # 主任务文件
│ ├── setup.yml # 环境准备 (Node.js, etc.)
│ └── deploy.yml # 应用部署
├── templates/
│ ├── consumer.service.j2 # systemd 服务模板
│ └── .env.j2 # .env 文件模板
└── defaults/
└── main.yml # 默认变量
核心任务文件 roles/nestjs-consumer/tasks/deploy.yml
:
- name: Create application directory
ansible.builtin.file:
path: "{{ app_dir }}"
state: directory
owner: "{{ app_user }}"
group: "{{ app_group }}"
mode: '0755'
- name: Checkout application code from Git repository
ansible.builtin.git:
repo: "{{ app_repo_url }}"
dest: "{{ app_dir }}"
version: "{{ app_branch }}"
force: yes
become: yes
become_user: "{{ app_user }}"
- name: Install npm dependencies
community.general.npm:
path: "{{ app_dir }}"
state: present
ci: yes # 使用 npm ci 以获得更快、更可靠的构建
become: yes
become_user: "{{ app_user }}"
environment:
# 代理等环境变量
HTTP_PROXY: "{{ http_proxy | default('') }}"
HTTPS_PROXY: "{{ https_proxy | default('') }}"
- name: Build NestJS application
community.general.npm:
path: "{{ app_dir }}"
script: build
become: yes
become_user: "{{ app_user }}"
- name: Template .env configuration file
ansible.builtin.template:
src: .env.j2
dest: "{{ app_dir }}/.env"
owner: "{{ app_user }}"
group: "{{ app_group }}"
mode: '0600' # .env 文件包含敏感信息,严格控制权限
notify: Restart consumer service # 当配置变更时,触发重启
- name: Template systemd service file
ansible.builtin.template:
src: consumer.service.j2
dest: /etc/systemd/system/nestjs-consumer.service
owner: root
group: root
mode: '0644'
notify: Restart consumer service # 当服务定义变更时,触发重启
- name: Ensure consumer service is started and enabled
ansible.builtin.systemd:
name: nestjs-consumer
state: started
enabled: yes
daemon_reload: yes # 在单元文件变更后必须执行
.env 模板文件 roles/nestjs-consumer/templates/.env.j2
:
# This file is managed by Ansible. Do not edit manually.
NODE_ENV=production
PORT=3001
# AWS Configuration
AWS_REGION={{ aws_region }}
AWS_SQS_QUEUE_URL={{ aws_sqs_queue_url }}
# 使用 Ansible Vault 加密的密钥
AWS_ACCESS_KEY_ID={{ aws_access_key_id }}
AWS_SECRET_ACCESS_KEY={{ aws_secret_access_key }}
# Database Configuration
DATABASE_URL={{ database_url }}
systemd 服务模板 roles/nestjs-consumer/templates/consumer.service.j2
:
[Unit]
Description=NestJS SQS Consumer Service
After=network.target
[Service]
ExecStart=/usr/bin/node {{ app_dir }}/dist/main.js
WorkingDirectory={{ app_dir }}
Restart=always
RestartSec=10
User={{ app_user }}
Group={{ app_group }}
EnvironmentFile={{ app_dir }}/.env
[Install]
WantedBy=multi-user.target
通过这套Ansible playbook,我们可以用一条命令 ansible-playbook -i inventory.ini playbook-deploy-consumer.yml
来完成对任意数量消费者节点的部署和更新,保证了环境的一致性和操作的幂等性。配置(如队列URL、数据库连接串)与代码分离,敏感信息(AWS密钥)可以通过 Ansible Vault 加密管理,大大提升了安全性。
最终形成的架构
通过上述改造,我们构建了一个解耦的、有韧性的事件处理管道。
graph TD subgraph Mobile Client A[Mobile App] end subgraph AWS Cloud B[ALB Load Balancer] --> C{NestJS Ingestion API}; C --> D[AWS SQS Queue]; subgraph "Auto Scaling Group (EC2)" E1[Consumer Instance 1] E2[Consumer Instance 2] E3[...] end D -- Long Polling --> E1; D -- Long Polling --> E2; D -- Long Polling --> E3; E1 --> F[PostgreSQL DB]; E2 --> F; E3 --> F; G[SQS Dead-Letter Queue] D -- On Failure --> G; end subgraph DevOps H[Ansible Controller] -- SSH --> E1; H -- SSH --> E2; H -- SSH --> E3; end A -- HTTPS --> B; style C fill:#f9f,stroke:#333,stroke-width:2px style D fill:#f90,stroke:#333,stroke-width:2px style G fill:#c00,stroke:#333,stroke-width:2px style E1 fill:#9cf,stroke:#333,stroke-width:2px style E2 fill:#9cf,stroke:#333,stroke-width:2px style E3 fill:#9cf,stroke:#333,stroke-width:2px
这个架构的优势显而易见:
- 高可用性: 接收API和消费者服务相互独立,一方的故障或高负载不会直接影响另一方。
- 弹性伸缩: 可以根据SQS队列的积压深度(
ApproximateNumberOfMessagesVisible
指标)独立地扩展消费者实例的数量。 - 成本效益: SQS长轮询和批处理显著降低了API调用成本和消费者的CPU空转。
- 韧性: SQS的DLQ机制自动隔离了处理失败的“毒丸”消息,防止它们阻塞整个队列,方便后续进行人工排查和处理。
- 运维效率: Ansible实现了基础设施即代码,部署流程标准化、自动化,极大地降低了人为错误的风险。
方案的局限性与未来优化路径
当前这套方案并非完美。首先,消息的端到端延迟增加了。由于队列和批处理的存在,一个事件从手机端发出到最终写入数据库,可能会有几十秒的延迟,这对于实时性要求高的业务场景是不可接受的。
其次,我们的消费者是无状态的,但Ansible的部署方式仍然是传统的“原地更新”,在部署期间会有短暂的服务中断。更优化的方式是采用蓝绿部署或滚动更新策略,通过负载均衡器来平滑地切换流量,实现零停机部署。
最后,虽然我们处理了批次内的部分失败,但对于依赖顺序的事件流,标准的SQS队列无法保证顺序。如果业务需要严格的顺序处理(例如,用户的操作步骤),则必须切换到SQS FIFO队列,并在架构层面做出相应调整,比如将用户的ID作为MessageGroupId
来保证同一用户的事件按序处理。但这也会带来吞吐量的限制,需要在业务需求和技术实现之间做出权衡。