团队最初面临的问题是全球用户数据的读取一致性。我们为一款金融交易分析工具提供后端服务,用户遍布北美、欧洲和亚洲。传统的读写分离、主从复制架构在跨大西洋光缆的延迟面前显得力不从心。欧洲用户提交的交易,需要数十秒甚至数分钟才能在亚洲的只读副本上查询到,这在我们的业务场景中是不可接受的。更糟糕的是,复杂的缓存策略和数据同步逻辑让系统变得脆弱不堪,一次网络分区就可能导致数据长时间不一致,排查起来如同噩梦。
我们需要的不是在现有架构上打补丁,而是一种根本性的变革。核心诉around是:拥有一个单一、不可变的事件真相源,并能从这个真相源中,以低延迟、高一致性的方式,在全球任何地方构建出我们需要的读取模型(Read Model)。这直接将我们的目光引向了事件溯源(Event Sourcing)和CQRS架构,而挑战在于,如何在地理分布式环境中实现它。
初步构想与技术选型决策
我们的构想是这样的:所有改变系统状态的操作都以“事件”的形式,被持久化到一个仅追加(Append-Only)的日志中。这个日志就是我们唯一的真相源。然后,独立的后台进程(我们称之为“投影器”)异步地读取这些事件,并将它们“投影”成用户查询所需的各种物化视图。
这个架构的成败,取决于底层数据库和通信协议。在经过几轮激烈的技术辩论后,我们确定了核心技术栈:
数据库: CockroachDB
- 为什么不用传统的PostgreSQL或MySQL?它们的分布式能力有限,要实现地理分布下的强一致性写入,需要引入复杂的中间件,这正是我们想摆脱的。
- 为什么不用Cassandra或DynamoDB?它们是优秀的NoSQL数据库,但我们业务中复杂的查询和对事务的需求,使得SQL的表达能力和ACID保证至关重要。
- CockroachDB成了唯一的选择。它兼容PostgreSQL协议,天生为地理分布式设计,提供了可串行化(Serializable)的隔离级别。它的
AS OF SYSTEM TIME
查询能力,对于调试和重放事件流来说,简直是量身定做的。我们可以将整个事件存储和投影存储都放在一个逻辑集群里,由它来处理跨区域的数据复制和一致性。
后端服务与通信: gRPC-Go
- 在事件溯源系统中,服务间的性能至关重要。投影器需要高效地拉取事件,客户端需要实时地接收投影更新。RESTful API在这种高频、低延迟的场景下显得笨拙。
- gRPC基于HTTP/2,使用Protobuf进行序列化,性能优越。更关键的是,它对双向流(Bi-directional Streaming)的原生支持,完美契合了我们的需求:客户端(例如我们的Swift App)可以与服务器建立一个长连接,服务器一旦更新了投影,就能立即将变化推送到客户端。
- 选择Go语言,是因为它出色的并发性能、简洁的语法以及成熟的gRPC生态。对于构建高并发的投影器和API服务来说,它是理想的工具。
设计方法论: Domain-Driven Design (DDD)
- 事件溯源与DDD是天作之合。DDD的核心就是领域事件。通过DDD,我们可以清晰地定义出系统的限界上下文(Bounded Contexts)、聚合(Aggregates)、命令(Commands)和事件(Events)。这套语言不仅指导了我们的代码结构,也让
order_placed
,order_updated
这些事件拥有了明确的业务含义。
- 事件溯源与DDD是天作之合。DDD的核心就是领域事件。通过DDD,我们可以清晰地定义出系统的限界上下文(Bounded Contexts)、聚合(Aggregates)、命令(Commands)和事件(Events)。这套语言不仅指导了我们的代码结构,也让
客户端: Swift
- 我们的终端用户主要在macOS和iOS上使用我们的分析工具。Swift作为苹果生态的一等公民,配合gRPC-Swift库,可以方便地构建出能够消费gRPC流式接口的原生应用,提供流畅的实时体验。
可观测性: Prometheus
- 在一个复杂的分布式系统中,没有监控就等于蒙眼狂奔。我们需要精确度量关键指标,例如:事件从产生到被投影的延迟(Projection Lag)、活跃的gRPC流数量、数据库事务冲突率等。Prometheus的拉模型(Pull-based)和强大的查询语言PromQL,非常适合监控这类系统。
整个系统架构的草图如下:
graph TD subgraph "Swift Client (macOS/iOS)" A[UI/ViewModel] --> B{gRPC-Swift Client}; end subgraph "Go Backend (Geo-Distributed)" subgraph "API Layer" B -- gRPC Stream: SubscribeToProjection --> C[gRPC Server]; end subgraph "Command Service" D[Command Handler] -- Append Events --> E[(CockroachDB: Event Store)]; end subgraph "Projection Service" F[Projector Worker] -- Polls Events --> E; F -- Updates Projection --> G[(CockroachDB: Projection Store)]; C -- Reads & Streams Changes --> G; end end subgraph "Observability Stack" H[Prometheus] -- Scrapes Metrics --> D; H -- Scrapes Metrics --> F; H -- Scrapes Metrics --> C; end
步骤化实现:代码、问题与解决方案
1. DDD建模与Protobuf定义
我们以一个简化的“交易订单”聚合(Order
Aggregate)为例。一个订单可以被创建、修改价格或关闭。这些行为对应着不同的命令,并会产生相应的事件。
首先,我们在Protobuf中定义这些结构。这是我们服务间通信的契约,也是DDD模型的直接体现。
pkg/proto/order/v1/order.proto
:
syntax = "proto3";
package order.v1;
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
option go_package = "github.com/your-org/your-project/pkg/proto/order/v1;orderv1";
// Order Aggregate state
message Order {
string order_id = 1;
string user_id = 2;
string instrument = 3;
double price = 4;
int64 quantity = 5;
string status = 6; // e.g., "OPEN", "CLOSED"
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
int64 version = 9;
}
// Events
message OrderCreatedEvent {
string order_id = 1;
string user_id = 2;
string instrument = 3;
double price = 4;
int64 quantity = 5;
google.protobuf.Timestamp created_at = 7;
}
message OrderPriceUpdatedEvent {
string order_id = 1;
double new_price = 2;
google.protobuf.Timestamp updated_at = 3;
}
// ... other events like OrderClosedEvent
// gRPC Service for subscribing to real-time updates
service OrderProjectionService {
// Establishes a server-side stream for real-time order updates for a user.
rpc SubscribeUserOrders(SubscribeUserOrdersRequest) returns (stream Order);
}
message SubscribeUserOrdersRequest {
string user_id = 1;
}
这里的关键在于,Protobuf文件不仅定义了数据结构,还通过OrderProjectionService
定义了客户端与服务器的交互方式。
2. CockroachDB中的事件存储
事件存储是系统的基石。它的设计必须保证事件的顺序和原子性。
SQL schema migrations/001_create_events_table.up.sql
:
CREATE TABLE events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id VARCHAR(255) NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
version INT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- We need to ensure that for a given aggregate, the version is unique.
-- This is the core of our optimistic concurrency control.
CONSTRAINT unique_aggregate_version UNIQUE (aggregate_id, aggregate_type, version)
);
-- Index for efficient event polling by projectors
CREATE INDEX idx_events_created_at ON events (created_at);
unique_aggregate_version
约束是这个设计的核心。它利用数据库的能力来防止并发写入同一个聚合时产生冲突,实现了乐观锁。
接下来是Go语言实现的事件仓库(Event Repository)。
internal/eventstore/cockroach_store.go
:
package eventstore
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/jackc/pgx/v5/pgconn"
"github.com/your-org/your-project/internal/domain"
"go.uber.org/zap"
)
// CockroachStore implements the EventStore interface for CockroachDB.
type CockroachStore struct {
db *sql.DB
logger *zap.Logger
}
// NewCockroachStore creates a new event store instance.
func NewCockroachStore(db *sql.DB, logger *zap.Logger) *CockroachStore {
return &CockroachStore{db: db, logger: logger}
}
// Append saves new events to the store. It's transactional.
func (s *CockroachStore) Append(ctx context.Context, events []domain.Event, expectedVersion int64) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback() // Rollback is a no-op if Commit succeeds.
for _, event := range events {
payload, err := json.Marshal(event.Payload())
if err != nil {
return fmt.Errorf("failed to marshal event payload: %w", err)
}
// The core insert statement.
query := `
INSERT INTO events (aggregate_id, aggregate_type, event_type, payload, version)
VALUES ($1, $2, $3, $4, $5)
`
_, err = tx.ExecContext(ctx, query, event.AggregateID(), event.AggregateType(), event.Type(), payload, event.Version())
if err != nil {
var pgErr *pgconn.PgError
// Check for the unique constraint violation (code 23505 in PostgreSQL).
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
return fmt.Errorf("concurrency conflict for aggregate %s: %w", event.AggregateID(), domain.ErrConcurrencyConflict)
}
s.logger.Error("Failed to insert event", zap.Error(err), zap.String("aggregate_id", event.AggregateID()))
return fmt.Errorf("failed to insert event: %w", err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
这段代码的健壮性体现在几个方面:
- 事务性: 所有事件要么全部成功,要么全部失败,保证了聚合状态变更的原子性。
- 错误处理: 精确地捕获了数据库的唯一约束冲突错误(
23505
),并将其转化为领域层的ErrConcurrencyConflict
。这是实现乐观并发控制的关键。上层业务逻辑(命令处理器)在收到这个错误后,可以重新加载聚合、应用命令、然后重试保存。 - 日志记录: 在发生未知数据库错误时,记录了详细的上下文信息,便于生产环境排障。
3. 投影器:构建读取模型
投影器是系统的“心脏”之一。它不断地从事件存储中拉取新事件,并更新物化视图。
首先是投影表的Schema。migrations/002_create_orders_projection.up.sql
:
CREATE TABLE orders_projection (
order_id VARCHAR(255) PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
instrument VARCHAR(50) NOT NULL,
price DECIMAL(18, 8) NOT NULL,
quantity BIGINT NOT NULL,
status VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
-- This version helps in ensuring idempotency and debugging.
last_processed_version BIGINT NOT NULL
);
CREATE INDEX idx_orders_projection_user_id ON orders_projection(user_id);
-- This table tracks the progress of each projector.
CREATE TABLE projector_offsets (
projector_name VARCHAR(255) PRIMARY KEY,
last_processed_event_id UUID,
last_processed_timestamp TIMESTAMPTZ NOT NULL
);
projector_offsets
表至关重要。它让投影器可以“记住”自己处理到了哪里,即使服务重启也能从断点处继续,保证了事件不会被重复处理或遗漏。
下面是投影器工作逻辑的简化版实现。
internal/projection/order_projector.go
:
package projection
import (
"context"
"database/sql"
"time"
"github.com/your-org/your-project/internal/eventstore"
"go.uber.org/zap"
)
type OrderProjector struct {
db *sql.DB
eventStore *eventstore.CockroachStore // Simplified for example, should be an interface
logger *zap.Logger
projectorName string
pollInterval time.Duration
}
// Start begins the projection loop.
func (p *OrderProjector) Start(ctx context.Context) {
ticker := time.NewTicker(p.pollInterval)
defer ticker.Stop()
p.logger.Info("Starting order projector", zap.String("name", p.projectorName))
for {
select {
case <-ctx.Done():
p.logger.Info("Stopping order projector", zap.String("name", p.projectorName))
return
case <-ticker.C:
if err := p.processBatch(ctx); err != nil {
p.logger.Error("Failed to process event batch", zap.Error(err))
// In a real system, you'd have more sophisticated error handling,
// like exponential backoff.
}
}
}
}
func (p *OrderProjector) processBatch(ctx context.Context) error {
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 1. Get the last processed timestamp from the offsets table.
var lastProcessed time.Time
// ... code to query projector_offsets within the transaction ...
// 2. Poll for new events since the last timestamp.
// ... code to query events table for events where created_at > lastProcessed ...
var lastEventTimestamp time.Time
// 3. Apply each event to the projection table.
for _, event := range newEvents {
switch e := event.Payload().(type) {
case *orderv1.OrderCreatedEvent:
// ... SQL INSERT statement for orders_projection ...
case *orderv1.OrderPriceUpdatedEvent:
// ... SQL UPDATE statement for orders_projection ...
}
// ... handle errors from SQL execution ...
lastEventTimestamp = event.CreatedAt()
}
// 4. If events were processed, update the offset.
if len(newEvents) > 0 {
// ... SQL INSERT...ON CONFLICT...DO UPDATE statement for projector_offsets ...
}
// 5. Commit the entire transaction.
return tx.Commit()
}
遇到的问题与解决:
最初,我们在事务外读取offset
,处理完一批事件后,在另一个事务里更新offset
。这导致了一个严重问题:如果在更新投影表和更新offset
之间服务崩溃,重启后会重复处理同一批事件,可能导致数据不一致。
解决方案: 将读取offset
、处理事件、更新投影、更新offset
这四个步骤全部放在一个数据库事务中。这确保了整个投影过程的原子性,要么全部成功,要么全部回滚,从而保证了“至少一次”的处理语义,并通过投影表的主键约束来保证幂等性。
4. gRPC 流式API与Swift客户端
gRPC服务器的职责是监控orders_projection
表的变化,并将这些变化实时流式传输给订阅的客户端。
internal/transport/grpc/server.go
:
package grpc
import (
"context"
// ... other imports
orderv1 "github.com/your-org/your-project/pkg/proto/order/v1"
)
// SubscribeUserOrders implements the gRPC streaming endpoint.
func (s *Server) SubscribeUserOrders(req *orderv1.SubscribeUserOrdersRequest, stream orderv1.OrderProjectionService_SubscribeUserOrdersServer) error {
userID := req.GetUserId()
if userID == "" {
return status.Error(codes.InvalidArgument, "user_id is required")
}
// This is a simplified implementation. A production system would use a more
// efficient mechanism than polling, perhaps leveraging a message bus or
// CockroachDB's CDC feature.
var lastSentVersion int64 = 0
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
s.logger.Info("Client disconnected", zap.String("user_id", userID))
return nil
case <-ticker.C:
// Query for orders for the user that are newer than what we've sent.
rows, err := s.db.QueryContext(stream.Context(),
`SELECT ... FROM orders_projection WHERE user_id = $1 AND last_processed_version > $2 ORDER BY updated_at`,
userID, lastSentVersion)
// ... error handling ...
for rows.Next() {
var order orderv1.Order
// ... scan row into order proto message ...
if err := stream.Send(&order); err != nil {
s.logger.Error("Failed to send order to stream", zap.Error(err))
return err
}
if order.Version > lastSentVersion {
lastSentVersion = order.Version
}
}
}
}
}
在Swift客户端,我们使用grpc-swift
库来消费这个流。
OrderViewModel.swift
:
import Foundation
import Combine
import GRPC
import NIO
@MainActor
class OrderViewModel: ObservableObject {
@Published var orders: [Order_V1_Order] = []
private var stream: BidirectionalStreamingCall<Order_V1_SubscribeUserOrdersRequest, Order_V1_Order>?
private var cancellables = Set<AnyCancellable>()
private let client: Order_V1_OrderProjectionServiceAsyncClient
init(client: Order_V1_OrderProjectionServiceAsyncClient) {
self.client = client
}
func subscribe(for userID: String) {
let request = Order_V1_SubscribeUserOrdersRequest.with {
$0.userID = userID
}
Task {
do {
// The stream is an AsyncSequence, which is incredibly powerful.
let stream = client.subscribeUserOrders(request)
for try await order in stream {
// This code runs on the MainActor, so it's safe to update UI.
updateOrAppendOrder(order)
print("Received or updated order: \(order.orderID)")
}
} catch {
// Handle stream errors, e.g., network issues.
print("Subscription failed with error: \(error)")
}
}
}
private func updateOrAppendOrder(_ order: Order_V1_Order) {
if let index = orders.firstIndex(where: { $0.orderID == order.orderID }) {
orders[index] = order
} else {
orders.append(order)
}
}
}
Swift的async/await
语法与gRPC的AsyncSequence
结合得天衣无缝。这使得处理复杂的流式数据就像遍历一个简单的数组一样直观,大大降低了客户端开发的复杂度。
5. 用Prometheus进行监控
最后,我们为系统装上“眼睛”。在Go服务中,我们使用官方的Prometheus客户端库来暴露指标。
internal/metrics/metrics.go
:
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// A gauge to track the time difference between event creation and projection.
ProjectionLag = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "projection_lag_seconds",
Help: "The lag in seconds between an event's creation and its projection.",
},
[]string{"projector_name"},
)
// A counter for database errors during event processing.
ProjectionDBErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "projection_db_errors_total",
Help: "Total number of database errors encountered by the projector.",
},
[]string{"projector_name"},
)
)
// In the projector code, when an event is processed:
// lag := time.Since(event.CreatedAt())
// metrics.ProjectionLag.WithLabelValues(p.projectorName).Set(lag.Seconds())
通过暴露一个/metrics
HTTP端点,Prometheus可以定期抓取这些数据。在Grafana中,我们可以轻松地创建仪表盘来可视化projection_lag_seconds
。当这个值出现尖峰时,我们就能立即收到告警,知道某个区域的投影处理可能出现了瓶颈或故障,从而在影响用户之前介入调查。
当前方案的局限性与未来迭代
尽管这套架构解决了我们最初的地理分布式一致性读取问题,但它远非完美。
投影器的可伸缩性: 目前的投影器是单体工作模式。虽然可以水平部署多个实例,但它们会处理相同的事件流,对数据库造成不必要的压力。下一步的迭代方向是实现投影器的分区处理,例如,基于
aggregate_id
的哈希值将事件分发给不同的投影器实例。这需要一个更复杂的offset
管理机制。投影重建: 当我们需要引入一个新的投影模型,或者修改现有投影的结构时,我们必须从头开始重放整个事件历史。对于一个拥有数十亿事件的系统来说,这个过程可能需要数小时甚至数天。我们正在研究的解决方案是引入聚合快照(Aggregate Snapshots),定期将聚合的完整状态保存下来,这样投影器就不必每次都从第一个事件开始重放。
对CDC的探索: 投影器目前使用的轮询(Polling)模式虽然简单可靠,但会给事件存储带来持续的读压力,并且存在固有的延迟(取决于轮询间隔)。CockroachDB支持Change Data Capture (CDC),可以以流的形式将数据变更推送出来。我们计划将投影器改造为消费CDC流,这将使投影延迟进一步降低,并减少对主数据库的负载,实现真正的事件驱动。