基于 AWS SQS 与 NestJS 构建移动端事件的高韧性异步批处理管道


移动端分析事件的洪峰差点冲垮了我们的主数据库。最初的设计非常直接:移动App采集用户行为,通过一个RESTful API直接写入PostgreSQL。在用户量少的时候,这套方案运行良好。但随着一次市场推广活动的成功,并发请求量在几分钟内飙升了50倍。API响应时间从50ms恶化到5000ms,数据库连接池被打满,最终导致了部分核心业务的阻塞。这是一个典型的、代价高昂的教训——同步写入是脆弱的。

问题根源在于耦合。数据接收和数据处理被紧紧地绑在了一起。我们需要拆解它。初步构想是引入一个消息队列,将数据接收的峰值“削平”,让后端处理系统可以按照自己的节奏去消费。在技术选型上,团队内部对 Kafka 和 AWS SQS 做了比较。考虑到我们现有的技术栈深度和运维成本,最终选择了 AWS SQS。它足够简单,完全托管,按量付费,并且与我们已有的AWS生态无缝集成,无需我们投入额外的精力去维护一个复杂的消息中间件集群。

后端服务我们决定继续使用 NestJS。它提供的模块化和依赖注入体系,非常适合构建结构清晰、易于测试的后台应用,无论是实现一个简单的API接收器,还是一个复杂的后台消费者(Consumer)服务。

第一步:重构写入链路,设计独立的消费者服务

首要任务是创建一个独立的、常驻后台的消费者进程。它的唯一职责就是从 SQS 队列中拉取消息,进行批处理,然后写入数据库。这个服务必须是健壮的,能够处理部分失败、优雅地关闭,并且易于水平扩展。

消费者核心逻辑:长轮询与批处理

为了降低成本和API调用次数,我们必须使用 SQS 的长轮询(Long Polling)和批处理(Batching)特性。这意味着消费者会向 SQS 请求最多10条消息,并等待最长20秒,直到满足任意一个条件才返回。

这是消费者服务的核心模块 sqs-consumer.service.ts 的实现。它不依赖任何特定的 NestJS SQS 封装库,而是直接使用官方的 @aws-sdk/client-sqs,这能让我们对行为有最精细的控制。

// src/sqs-consumer/sqs-consumer.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import {
  SQSClient,
  ReceiveMessageCommand,
  DeleteMessageBatchCommand,
  Message,
  ReceiveMessageCommandInput,
  DeleteMessageBatchCommandEntry,
} from '@aws-sdk/client-sqs';
import { ConfigService } from '@nestjs/config';

@Injectable()
export class SqsConsumerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(SqsConsumerService.name);
  private readonly sqsClient: SQSClient;
  private readonly queueUrl: string;
  private readonly maxMessagesPerBatch: number = 10; // SQS 上限
  private readonly waitTimeSeconds: number = 20; // 启用长轮询
  private isShuttingDown = false;
  private pollingInterval: NodeJS.Timeout | null = null;

  constructor(private readonly configService: ConfigService) {
    this.queueUrl = this.configService.get<string>('AWS_SQS_QUEUE_URL');

    // 在真实项目中,Region, Credentials 等应通过 ConfigService 或 IAM Role 动态注入
    this.sqsClient = new SQSClient({
      region: this.configService.get<string>('AWS_REGION'),
      // 如果部署在 EC2 并附加了 IAM Role,则无需显式提供 credentials
      credentials: {
        accessKeyId: this.configService.get<string>('AWS_ACCESS_KEY_ID'),
        secretAccessKey: this.configService.get<string>('AWS_SECRET_ACCESS_KEY'),
      },
    });

    if (!this.queueUrl) {
      throw new Error('AWS_SQS_QUEUE_URL is not defined in environment variables.');
    }
  }

  async onModuleInit() {
    this.logger.log('SQS Consumer Service initialized. Starting to poll...');
    // 使用 setImmediate 确保在模块完全初始化后再开始轮询
    setImmediate(() => this.startPolling());
  }

  async onModuleDestroy() {
    this.logger.log('Shutdown signal received. Stopping SQS polling...');
    this.isShuttingDown = true;
    if (this.pollingInterval) {
      clearTimeout(this.pollingInterval);
    }
    // 这里可以添加逻辑,等待当前正在处理的批次完成
    await new Promise(resolve => setTimeout(resolve, 2000)); // 简单等待
    this.logger.log('SQS Consumer Service stopped.');
  }

  private startPolling() {
    if (this.isShuttingDown) {
      return;
    }

    this.pollMessages()
      .catch((error) => {
        // 捕获轮询过程中的未知错误,防止进程崩溃
        this.logger.error('An unexpected error occurred during polling.', error.stack);
      })
      .finally(() => {
        // 无论成功或失败,只要没在关闭,就安排下一次轮询
        if (!this.isShuttingDown) {
          this.pollingInterval = setTimeout(() => this.startPolling(), 1000); // 1秒后再次轮询
        }
      });
  }

  private async pollMessages(): Promise<void> {
    const receiveParams: ReceiveMessageCommandInput = {
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: this.maxMessagesPerBatch,
      WaitTimeSeconds: this.waitTimeSeconds,
      // 获取消息体和一些关键属性,用于调试和错误处理
      MessageAttributeNames: ['All'],
      AttributeNames: ['ApproximateReceiveCount'],
    };

    try {
      const command = new ReceiveMessageCommand(receiveParams);
      const { Messages } = await this.sqsClient.send(command);

      if (!Messages || Messages.length === 0) {
        this.logger.verbose('No messages received in this poll.');
        return;
      }

      this.logger.log(`Received ${Messages.length} messages. Processing batch...`);
      await this.processBatch(Messages);

    } catch (error) {
      this.logger.error(`Failed to receive messages from SQS: ${error.message}`, error.stack);
      // 在出现 SDK 错误时(如网络问题),增加等待时间,避免频繁请求
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }

  private async processBatch(messages: Message[]): Promise<void> {
    const successfullyProcessed: DeleteMessageBatchCommandEntry[] = [];
    const processingPromises = messages.map(async (message) => {
      try {
        // 核心业务逻辑
        await this.handleSingleMessage(message);
        return { status: 'fulfilled', value: message };
      } catch (error) {
        this.logger.error(`Failed to process message ${message.MessageId}.`, error.stack);
        // 返回错误信息,用于后续判断
        return { status: 'rejected', reason: { message, error } };
      }
    });

    // 使用 allSettled 是关键,它能确保即使批次中部分消息处理失败,
    // 也能继续处理其余消息,并收集所有成功和失败的结果。
    const results = await Promise.allSettled(processingPromises);

    results.forEach(result => {
      if (result.status === 'fulfilled' && result.value.status === 'fulfilled') {
        successfullyProcessed.push({
          Id: result.value.value.MessageId,
          ReceiptHandle: result.value.value.ReceiptHandle,
        });
      } else if (result.status === 'rejected' || (result.status === 'fulfilled' && result.value.status === 'rejected')) {
        // 在这里可以添加逻辑,比如将处理失败的消息信息推送到另一个监控系统
        const failedMsg = result.status === 'rejected' ? result.reason.message : result.value.reason.message;
        this.logger.warn(`Message ${failedMsg.MessageId} will not be deleted and will be re-processed later or sent to DLQ.`);
      }
    });

    if (successfullyProcessed.length > 0) {
      await this.deleteSuccessfullyProcessedMessages(successfullyProcessed);
    }
  }

  private async handleSingleMessage(message: Message): Promise<void> {
    // 模拟业务处理
    this.logger.debug(`Processing message: ${message.MessageId}`);
    const body = JSON.parse(message.Body);
    
    // 模拟一个可能会失败的数据库写入操作
    if (body.shouldFail) {
      throw new Error(`Intentional failure for message with eventId: ${body.eventId}`);
    }

    // 模拟耗时操作
    await new Promise(resolve => setTimeout(resolve, 100 + Math.random() * 200));

    // this.databaseService.saveEvent(body); // 实际的数据库写入
    this.logger.log(`Successfully processed event: ${body.eventId}`);
  }

  private async deleteSuccessfullyProcessedMessages(entries: DeleteMessageBatchCommandEntry[]): Promise<void> {
    if (entries.length === 0) {
      return;
    }

    try {
      const deleteCommand = new DeleteMessageBatchCommand({
        QueueUrl: this.queueUrl,
        Entries: entries,
      });
      const result = await this.sqsClient.send(deleteCommand);
      
      this.logger.log(`Successfully deleted ${result.Successful.length} messages.`);
      
      if (result.Failed && result.Failed.length > 0) {
        // 这种情况很少见,但必须处理。通常是 ReceiptHandle 过期等问题。
        result.Failed.forEach(failure => {
          this.logger.error(`Failed to delete message ${failure.Id}: ${failure.Message}`);
        });
      }
    } catch (error) {
      this.logger.error('Error deleting messages from SQS.', error.stack);
    }
  }
}

这段代码有几个关键的生产级考量:

  1. 优雅停机 (onModuleDestroy): 捕获应用的关闭信号,设置 isShuttingDown 标志位,停止发起新的轮询。这能确保在K8s pod缩容或服务重启时,尽可能完成当前正在处理的批次,避免数据处理中断。
  2. 错误隔离 (Promise.allSettled): 在批处理中,一条消息的失败不应该影响整个批次。Promise.allSettled 保证了这一点,我们能够精确地知道哪些消息成功,哪些失败,从而只删除成功的消息。失败的消息会因为 VisibilityTimeout 超时而重新出现在队列中,最终被送入死信队列(Dead-Letter Queue, DLQ)。
  3. 健壮的轮询循环: startPolling 使用 setTimeout 安排下一次轮询,而不是一个简单的 while(true) 循环。这避免了在出现意外错误时导致CPU空转,并且 try...catch...finally 结构确保了循环的持续性。
  4. 配置驱动: 所有AWS相关的配置(队列URL、区域、凭证)都通过 NestJS 的 ConfigService 注入,而不是硬编码。

第二步:实现轻量级事件接收API

移动端需要一个极其轻快、高可用的API端点来接收事件。这个端点的职责被简化到极致:验证请求体,然后将消息推送到SQS。它不执行任何耗时的业务逻辑或数据库操作。

// src/ingestion/ingestion.controller.ts
import { Controller, Post, Body, HttpCode, HttpStatus, Logger } from '@nestjs/common';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { ConfigService } from '@nestjs/config';
import { randomUUID } from 'crypto';

// 使用 class-validator 进行DTO验证
import { IsString, IsNotEmpty, IsOptional, MaxLength } from 'class-validator';

export class IngestionEventDto {
  @IsString()
  @IsNotEmpty()
  @MaxLength(50)
  eventType: string;

  @IsNotEmpty()
  payload: Record<string, any>;

  @IsString()
  @IsOptional()
  userId?: string;
}

@Controller('ingest')
export class IngestionController {
  private readonly logger = new Logger(IngestionController.name);
  private readonly sqsClient: SQSClient;
  private readonly queueUrl: string;

  constructor(private readonly configService: ConfigService) {
    this.queueUrl = this.configService.get<string>('AWS_SQS_QUEUE_URL');
    this.sqsClient = new SQSClient({ region: this.configService.get<string>('AWS_REGION') });
  }

  @Post('event')
  @HttpCode(HttpStatus.ACCEPTED) // 使用 202 表示请求已被接受,将进行异步处理
  async receiveEvent(@Body() eventDto: IngestionEventDto): Promise<{ messageId: string; }> {
    const messageId = randomUUID();
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify({
        // 在这里可以丰富消息体,加入接收时间等元数据
        receivedAt: new Date().toISOString(),
        eventId: messageId,
        ...eventDto,
      }),
      // 如果需要消息去重或保证顺序,可以考虑使用 FIFO 队列并提供 MessageDeduplicationId 和 MessageGroupId
      // MessageGroupId: eventDto.userId || 'anonymous',
    });

    try {
      const result = await this.sqsClient.send(command);
      this.logger.log(`Event ${messageId} successfully sent to SQS with MessageId: ${result.MessageId}`);
      return { messageId: result.MessageId };
    } catch (error) {
      this.logger.error(`Failed to send event ${messageId} to SQS`, error.stack);
      // 在真实项目中,这里应该抛出一个自定义的 HttpException
      // 以便全局异常过滤器捕获并返回合适的错误响应
      throw new Error('Internal server error while queueing the event.');
    }
  }
}

这个Controller的设计很简单,但遵循了几个最佳实践:

  • 返回 202 Accepted: 明确告知客户端,请求已被接受,但处理是异步的。
  • DTO验证: 使用 class-validator 确保进入系统的数据是干净和符合预期的。
  • 无业务逻辑: 除了发送到SQS,不做任何其他事情,保证了极低的响应延迟。

第三步:使用 Ansible 实现自动化部署与配置管理

代码写完只是第一步,如何在生产环境可靠、可重复地部署和管理这个消费者服务,是决定项目成败的关键。手动SSH到服务器执行 git pullnpm run start:prod 是不可接受的。这里就是 Ansible 发挥作用的地方。

我们为消费者服务创建了一个Ansible Role,负责完成从环境准备到服务启动的所有工作。

Playbook 结构:

ansible/
├── inventory.ini             # 主机清单
├── playbook-deploy-consumer.yml # 主部署文件
└── roles/
    └── nestjs-consumer/
        ├── tasks/
        │   ├── main.yml      # 主任务文件
        │   ├── setup.yml     # 环境准备 (Node.js, etc.)
        │   └── deploy.yml    # 应用部署
        ├── templates/
        │   ├── consumer.service.j2 # systemd 服务模板
        │   └── .env.j2             # .env 文件模板
        └── defaults/
            └── main.yml        # 默认变量

核心任务文件 roles/nestjs-consumer/tasks/deploy.yml:

- name: Create application directory
  ansible.builtin.file:
    path: "{{ app_dir }}"
    state: directory
    owner: "{{ app_user }}"
    group: "{{ app_group }}"
    mode: '0755'

- name: Checkout application code from Git repository
  ansible.builtin.git:
    repo: "{{ app_repo_url }}"
    dest: "{{ app_dir }}"
    version: "{{ app_branch }}"
    force: yes
  become: yes
  become_user: "{{ app_user }}"

- name: Install npm dependencies
  community.general.npm:
    path: "{{ app_dir }}"
    state: present
    ci: yes # 使用 npm ci 以获得更快、更可靠的构建
  become: yes
  become_user: "{{ app_user }}"
  environment:
    # 代理等环境变量
    HTTP_PROXY: "{{ http_proxy | default('') }}"
    HTTPS_PROXY: "{{ https_proxy | default('') }}"

- name: Build NestJS application
  community.general.npm:
    path: "{{ app_dir }}"
    script: build
  become: yes
  become_user: "{{ app_user }}"

- name: Template .env configuration file
  ansible.builtin.template:
    src: .env.j2
    dest: "{{ app_dir }}/.env"
    owner: "{{ app_user }}"
    group: "{{ app_group }}"
    mode: '0600' # .env 文件包含敏感信息,严格控制权限
  notify: Restart consumer service # 当配置变更时,触发重启

- name: Template systemd service file
  ansible.builtin.template:
    src: consumer.service.j2
    dest: /etc/systemd/system/nestjs-consumer.service
    owner: root
    group: root
    mode: '0644'
  notify: Restart consumer service # 当服务定义变更时,触发重启

- name: Ensure consumer service is started and enabled
  ansible.builtin.systemd:
    name: nestjs-consumer
    state: started
    enabled: yes
    daemon_reload: yes # 在单元文件变更后必须执行

.env 模板文件 roles/nestjs-consumer/templates/.env.j2:

# This file is managed by Ansible. Do not edit manually.
NODE_ENV=production
PORT=3001

# AWS Configuration
AWS_REGION={{ aws_region }}
AWS_SQS_QUEUE_URL={{ aws_sqs_queue_url }}

# 使用 Ansible Vault 加密的密钥
AWS_ACCESS_KEY_ID={{ aws_access_key_id }}
AWS_SECRET_ACCESS_KEY={{ aws_secret_access_key }}

# Database Configuration
DATABASE_URL={{ database_url }}

systemd 服务模板 roles/nestjs-consumer/templates/consumer.service.j2:

[Unit]
Description=NestJS SQS Consumer Service
After=network.target

[Service]
ExecStart=/usr/bin/node {{ app_dir }}/dist/main.js
WorkingDirectory={{ app_dir }}
Restart=always
RestartSec=10
User={{ app_user }}
Group={{ app_group }}
EnvironmentFile={{ app_dir }}/.env

[Install]
WantedBy=multi-user.target

通过这套Ansible playbook,我们可以用一条命令 ansible-playbook -i inventory.ini playbook-deploy-consumer.yml 来完成对任意数量消费者节点的部署和更新,保证了环境的一致性和操作的幂等性。配置(如队列URL、数据库连接串)与代码分离,敏感信息(AWS密钥)可以通过 Ansible Vault 加密管理,大大提升了安全性。

最终形成的架构

通过上述改造,我们构建了一个解耦的、有韧性的事件处理管道。

graph TD
    subgraph Mobile Client
        A[Mobile App]
    end

    subgraph AWS Cloud
        B[ALB Load Balancer] --> C{NestJS Ingestion API};
        C --> D[AWS SQS Queue];
        subgraph "Auto Scaling Group (EC2)"
            E1[Consumer Instance 1]
            E2[Consumer Instance 2]
            E3[...]
        end
        D -- Long Polling --> E1;
        D -- Long Polling --> E2;
        D -- Long Polling --> E3;
        
        E1 --> F[PostgreSQL DB];
        E2 --> F;
        E3 --> F;

        G[SQS Dead-Letter Queue]
        D -- On Failure --> G;
    end
    
    subgraph DevOps
      H[Ansible Controller] -- SSH --> E1;
      H -- SSH --> E2;
      H -- SSH --> E3;
    end


    A -- HTTPS --> B;

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#f90,stroke:#333,stroke-width:2px
    style G fill:#c00,stroke:#333,stroke-width:2px
    style E1 fill:#9cf,stroke:#333,stroke-width:2px
    style E2 fill:#9cf,stroke:#333,stroke-width:2px
    style E3 fill:#9cf,stroke:#333,stroke-width:2px

这个架构的优势显而易见:

  1. 高可用性: 接收API和消费者服务相互独立,一方的故障或高负载不会直接影响另一方。
  2. 弹性伸缩: 可以根据SQS队列的积压深度(ApproximateNumberOfMessagesVisible 指标)独立地扩展消费者实例的数量。
  3. 成本效益: SQS长轮询和批处理显著降低了API调用成本和消费者的CPU空转。
  4. 韧性: SQS的DLQ机制自动隔离了处理失败的“毒丸”消息,防止它们阻塞整个队列,方便后续进行人工排查和处理。
  5. 运维效率: Ansible实现了基础设施即代码,部署流程标准化、自动化,极大地降低了人为错误的风险。

方案的局限性与未来优化路径

当前这套方案并非完美。首先,消息的端到端延迟增加了。由于队列和批处理的存在,一个事件从手机端发出到最终写入数据库,可能会有几十秒的延迟,这对于实时性要求高的业务场景是不可接受的。

其次,我们的消费者是无状态的,但Ansible的部署方式仍然是传统的“原地更新”,在部署期间会有短暂的服务中断。更优化的方式是采用蓝绿部署或滚动更新策略,通过负载均衡器来平滑地切换流量,实现零停机部署。

最后,虽然我们处理了批次内的部分失败,但对于依赖顺序的事件流,标准的SQS队列无法保证顺序。如果业务需要严格的顺序处理(例如,用户的操作步骤),则必须切换到SQS FIFO队列,并在架构层面做出相应调整,比如将用户的ID作为MessageGroupId来保证同一用户的事件按序处理。但这也会带来吞吐量的限制,需要在业务需求和技术实现之间做出权衡。


  目录