最初面临的技术挑战是为一套高并发的秒杀活动设计库存扣减服务。传统的单体架构中,直接在事务里 SELECT ... FOR UPDATE
会在流量洪峰时导致数据库行锁竞争,吞吐量急剧下降,甚至拖垮整个数据库。即使采用乐观锁,大量的并发更新也会造成密集的重试,这在 Serverless 这种按次计费、有执行时间限制的环境中是不可接受的。我们需要一个能将写操作(扣减库存)和读操作(查询库存)彻底解耦的方案,同时保证系统的弹性和最终一致性。
这个背景下,CQRS(命令查询职责分离)模式成为首选。但真正的难点在于如何在 Serverless 架构下,以一种低成本、高可靠的方式实现它。命令和事件的传递需要一个可靠的消息中间件,而读模型的物化存储则需要一个极低延迟的数据库。技术栈的初步构想逐渐清晰:Kotlin 作为开发语言,利用其协程能力处理并发;Serverless平台(如 AWS Lambda, Knative)承载无状态的计算逻辑;NATS JetStream 作为事件总线,负责命令与事件的持久化和可靠传递;最后,使用键值型 NoSQL 数据库(如 Redis, DynamoDB)作为读模型的投影存储。
整个系统的核心数据流设计如下:
graph TD subgraph "写模型 (Write Model - Command Side)" A[API Gateway] -- Command: DecreaseStock --> B(NATS Command Stream: `commands.inventory`); C[Serverless: Command Handler] -- Subscribes --> B; C -- Business Logic --> D{Stock Available?}; D -- Yes --> E(NATS Event Stream: `events.inventory`); D -- No --> F(NATS Event Stream: `errors.inventory`); E -- Event: StockDecreased --> G[Event Log]; F -- Event: StockUnavailable --> G; end subgraph "读模型 (Read Model - Query Side)" H[Serverless: Projection Handler] -- Subscribes --> E; H -- Updates Read Model --> I[NoSQL KV Store]; J[API Gateway] -- Query: GetStock --> K[Serverless: Query Handler]; K -- Reads --> I; end style C fill:#f9f,stroke:#333,stroke-width:2px style H fill:#f9f,stroke:#333,stroke-width:2px style K fill:#ccf,stroke:#333,stroke-width:2px
这个架构的关键在于,命令处理器只负责接收命令、执行业务规则并发布事件。它不直接更新任何可供查询的状态。真正的状态变更体现在事件流中。而另一个独立的投影处理器,则像一个忠实的听众,订阅事件流并将事件内容“投影”到一个为快速查询而优化的读模型中。
数据结构定义:命令与事件
在 Kotlin 中,使用 data class
配合序列化库(如 kotlinx.serialization
)来定义不可变的命令和事件是最佳实践。
import kotlinx.serialization.Serializable
// 命令通常是祈使句,表达意图
@Serializable
data class DecreaseStockCommand(
val commandId: String, // 用于幂等性检查
val sku: String,
val quantity: Int,
val timestamp: Long = System.currentTimeMillis()
)
// 事件是过去时态,描述已经发生的事实
@Serializable
sealed class InventoryEvent {
abstract val eventId: String
abstract val sku: String
abstract val timestamp: Long
}
@Serializable
data class StockDecreasedEvent(
override val eventId: String,
override val sku: String,
val quantityDecreased: Int,
val correlationId: String, // 关联的 CommandId
override val timestamp: Long = System.currentTimeMillis()
) : InventoryEvent()
@Serializable
data class StockUnavailableEvent(
override val eventId: String,
override val sku: String,
val quantityRequested: Int,
val reason: String,
val correlationId: String, // 关联的 CommandId
override val timestamp: Long = System.currentTimeMillis()
) : InventoryEvent()
commandId
和 correlationId
是实现端到端追踪和幂等性处理的关键。sealed class
用于确保我们能处理所有定义的事件类型。
NATS JetStream 配置
我们需要两个核心的 Stream:一个用于接收命令,一个用于发布领域事件。在生产环境中,这些通常通过 Terraform 或 NATS CLI 进行管理。
# streams.yaml
# 使用 'nats stream create' 命令应用此配置
# 1. 用于接收库存相关命令的 Stream
- name: INVENTORY_COMMANDS
subjects:
- "commands.inventory.>"
storage: file
retention: limits
max_age: "24h" # 命令处理后可丢弃,保留24小时用于调试
discard: old
duplicate_window: "2m" # NATS 级别的消息去重窗口
# 2. 用于发布库存领域事件的 Stream,这是系统的真相来源
- name: INVENTORY_EVENTS
subjects:
- "events.inventory.>"
storage: file
retention: interest # 只要有消费者就保留
max_bytes: 10GB
replicas: 3 # 在生产环境中至少为3
discard: old
INVENTORY_EVENTS
的 retention
策略和 replicas
数量至关重要,它构成了我们事件溯源(Event Sourcing)的基础,是系统的持久化事实记录。
命令处理器实现
这是 Serverless 函数的核心逻辑。它订阅 commands.inventory.decrease
主题,处理消息,然后发布事件。
import io.nats.client.Connection
import io.nats.client.JetStream
import io.nats.client.Nats
import io.nats.client.PushSubscribeOptions
import io.nats.client.api.DeliverPolicy
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import kotlinx.serialization.encodeToString
import java.nio.charset.StandardCharsets
import java.util.UUID
// 假设这些是环境变量或配置
val NATS_URL = System.getenv("NATS_URL") ?: "nats://localhost:4222"
val COMMAND_SUBJECT = "commands.inventory.decrease"
val EVENT_SUBJECT_SUCCESS = "events.inventory.decreased"
val EVENT_SUBJECT_FAILURE = "events.inventory.unavailable"
// 模拟一个库存状态存储,真实项目中这可能是另一个服务或数据库
// 但在命令处理器中,它只用于决策,不作为可查询状态
object InventoryStateService {
private val stock = mutableMapOf("SKU-123" to 100)
fun getStock(sku: String): Int = stock.getOrDefault(sku, 0)
fun decreaseStock(sku: String, quantity: Int) {
stock[sku] = getStock(sku) - quantity
}
}
suspend fun main() = coroutineScope {
val nc: Connection = Nats.connect(NATS_URL)
val js: JetStream = nc.jetStream()
// 创建一个持久化的消费者,确保服务重启后能从上次的位置继续
val subOpts = PushSubscribeOptions.builder()
.durable("inv-command-handler")
.deliverPolicy(DeliverPolicy.All) // 总是从头开始,或New
.build()
val subscription = js.subscribe(COMMAND_SUBJECT, "cmd-handler-queue", subOpts)
println("Listening on [$COMMAND_SUBJECT]...")
launch {
while (isActive) {
val msg = subscription.nextMessage(java.time.Duration.ofSeconds(1))
if (msg != null) {
try {
val commandJson = String(msg.data, StandardCharsets.UTF_8)
val command = Json.decodeFromString<DecreaseStockCommand>(commandJson)
// 这里的幂等性检查是关键
// 在真实项目中,需要一个分布式缓存(如Redis)来检查 commandId 是否已处理
// if (isCommandProcessed(command.commandId)) {
// msg.ack()
// continue
// }
println("Processing command: ${command.commandId}")
val currentStock = InventoryStateService.getStock(command.sku)
if (currentStock >= command.quantity) {
// 业务逻辑:扣减库存
InventoryStateService.decreaseStock(command.sku, command.quantity)
// 发布成功事件
val successEvent = StockDecreasedEvent(
eventId = UUID.randomUUID().toString(),
sku = command.sku,
quantityDecreased = command.quantity,
correlationId = command.commandId
)
val eventJson = Json.encodeToString(successEvent)
js.publish(EVENT_SUBJECT_SUCCESS, eventJson.toByteArray(StandardCharsets.UTF_8))
println("Published StockDecreasedEvent for ${command.commandId}")
} else {
// 发布失败事件
val failureEvent = StockUnavailableEvent(
eventId = UUID.randomUUID().toString(),
sku = command.sku,
quantityRequested = command.quantity,
reason = "Insufficient stock. Available: $currentStock",
correlationId = command.commandId
)
val eventJson = Json.encodeToString(failureEvent)
js.publish(EVENT_SUBJECT_FAILURE, eventJson.toByteArray(StandardCharsets.UTF_8))
println("Published StockUnavailableEvent for ${command.commandId}")
}
// 标记 commandId 已处理
// markCommandAsProcessed(command.commandId)
// 确认消息,NATS JetStream 不会再次投递
msg.ack()
} catch (e: Exception) {
System.err.println("Error processing message: ${e.message}")
// 不 Acknowledge 消息,让 NATS 在 ack_wait 时间后重传
// 需要配置合理的重传策略,避免无限重试
msg.nak()
}
}
}
}
}
代码中的生产级考量:
- Durable Consumer:
durable("inv-command-handler")
创建了一个持久消费者。即使函数实例重启,NATS 也会记住它处理到了哪条消息,从而保证消息不丢失。 - Queue Group:
js.subscribe
的第二个参数"cmd-handler-queue"
定义了一个队列组。这允许多个 Serverless 函数实例水平扩展,共同消费命令流,但 NATS 会确保每条消息只被组内的一个实例处理。 - 幂等性: 注释中提到了幂等性检查。这是必须的。因为网络问题或客户端重试,同一个命令可能被发送多次。在处理命令前,必须检查
commandId
是否已经处理过。通常使用一个带 TTL 的 Redis key 来实现。 - 错误处理:
msg.nak()
是一个重要的机制。当处理失败时(例如,数据库连接超时),调用nak()
告诉 NATS 这条消息处理失败,请求重传。这比简单的try-catch
后ack()
要健壮得多。
投影处理器与读模型
这个 Serverless 函数的任务更纯粹:它只关心 StockDecreasedEvent
,并用它来更新一个为查询优化的键值存储。
import io.nats.client.Connection
import io.nats.client.JetStream
import io.nats.client.Nats
import io.nats.client.PushSubscribeOptions
import io.nats.client.api.DeliverPolicy
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import redis.clients.jedis.JedisPool
import redis.clients.jedis.JedisPoolConfig
// 假设这些是环境变量或配置
val NATS_URL = System.getenv("NATS_URL") ?: "nats://localhost:4222"
val REDIS_HOST = System.getenv("REDIS_HOST") ?: "localhost"
val EVENT_SUBJECT_DECREASED = "events.inventory.decreased"
// 简化的 Redis 客户端
object ReadModelRepository {
private val pool = JedisPool(JedisPoolConfig(), REDIS_HOST, 6379)
fun applyStockDecrease(sku: String, quantity: Int) {
pool.resource.use { jedis ->
// 使用 HINCRBY 来原子性地更新库存
// 我们的事件是 StockDecreased,所以这里是减去
jedis.hincrBy("inventory_view", sku, -quantity.toLong())
}
println("Updated read model for SKU $sku. Decreased by $quantity.")
}
}
suspend fun main() = coroutineScope {
val nc: Connection = Nats.connect(NATS_URL)
val js: JetStream = nc.jetStream()
// 这个消费者也必须是持久化的,以保证所有事件都被投影
val subOpts = PushSubscribeOptions.builder()
.durable("inv-projection-handler")
.deliverPolicy(DeliverPolicy.All)
.build()
val subscription = js.subscribe(EVENT_SUBJECT_DECREASED, "projection-queue", subOpts)
println("Listening on [$EVENT_SUBJECT_DECREASED] for projection...")
launch {
while (isActive) {
val msg = subscription.nextMessage(java.time.Duration.ofSeconds(1))
if (msg != null) {
try {
val eventJson = String(msg.data, StandardCharsets.UTF_8)
val event = Json.decodeFromString<StockDecreasedEvent>(eventJson)
println("Projecting event: ${event.eventId}")
// 核心逻辑:更新读模型
ReadModelRepository.applyStockDecrease(event.sku, event.quantityDecreased)
// 确认消息
msg.ack()
} catch (e: Exception) {
System.err.println("Error projecting event: ${e.message}")
// 同样,如果更新 Redis 失败,nak() 以便重试
// 这里的重试需要小心,如果 Redis 操作不是幂等的,可能会导致数据不一致
// HINCRBY 是幂等的,多次执行相同事件只会导致结果错误,需要外部幂等性保证
// NATS 的消息序列号可用于实现此目的
msg.nak()
}
}
}
}
}
投影处理器的关键点:
- 原子操作: 使用
HINCRBY
而不是GET
->SET
的组合,确保了对 Redis 中库存计数的更新是原子操作,避免了并发更新时的数据竞争。 - 无业务逻辑: 这个处理器不包含任何业务决策。它只是机械地将事件中的数据应用到读模型上。这使得它非常简单、快速且易于维护。
- 可重播性: 由于
INVENTORY_EVENTS
Stream 是持久的,如果我们需要重建读模型(例如,修复了一个 bug 或增加一个新的读模型),我们可以创建一个新的消费者,从头开始回放所有事件。这是事件溯源模式带来的巨大优势。
测试思路
在真实项目中,对这样一套分布式系统的测试至关重要。
- 单元测试: 针对命令处理器中的业务逻辑(
InventoryStateService
)进行纯粹的单元测试。 - 集成测试: 使用
testcontainers
启动一个 NATS 实例和一个 Redis 实例。编写测试用例,发布一个DecreaseStockCommand
到 NATS,然后轮询 Redis 验证读模型是否在预期时间内被正确更新。同时,检查INVENTORY_EVENTS
Stream 中是否包含了正确的StockDecreasedEvent
。可以模拟各种失败场景,如nak()
消息,验证系统的自愈能力。
例如,一个集成测试的伪代码片段:
// @Test
fun `given a valid decrease command, the read model should be updated eventually`() {
// 1. 使用 testcontainers 启动 natsContainer 和 redisContainer
// 2. 初始化 NATS 和 Redis 客户端连接到容器
// 3. 启动 Command Handler 和 Projection Handler 的协程
// 4. 发布一个命令到 NATS
val command = DecreaseStockCommand(commandId = "cmd-1", sku = "SKU-123", quantity = 5)
js.publish("commands.inventory.decrease", Json.encodeToString(command).toByteArray())
// 5. 使用 Awaitility 或类似库等待条件满足
await.atMost(5, TimeUnit.SECONDS).untilAsserted {
val stockInRedis = redisClient.hget("inventory_view", "SKU-123")?.toLong()
// 假设初始库存为 100
assertEquals(95L, stockInRedis)
}
}
这种测试方式能有效地验证整个数据流的正确性和组件间的协作。
这套基于 Kotlin, NATS, Serverless 和 NoSQL 的 CQRS 实现,并非没有它的复杂性。最显著的代价就是引入了最终一致性。用户在扣减库存后立即查询,可能会在极短的时间窗口内看到旧的数据。这个延迟(从事件发布到投影完成)是需要监控的关键业务指标(SLI)。此外,整个系统的可观测性变得更加重要,需要对命令处理成功率、事件投影延迟、死信队列等进行全面的监控和告警。
尽管如此,该架构获得的收益是巨大的:写操作的吞吐量不再受限于数据库,而是 NATS 的摄入能力,这通常要高出几个数量级。读写分离使得查询服务可以独立扩展,且不会对写操作产生任何影响。Serverless 的弹性伸缩特性与这种事件驱动的模式完美契合,能够以极高的成本效益应对流量的瞬时高峰。对于需要高并发写入和高性能读取的场景,这是一套非常务实且强大的架构选择。