构建一个基于CockroachDB和gRPC-Go的地理分布式事件溯源投影系统


团队最初面临的问题是全球用户数据的读取一致性。我们为一款金融交易分析工具提供后端服务,用户遍布北美、欧洲和亚洲。传统的读写分离、主从复制架构在跨大西洋光缆的延迟面前显得力不从心。欧洲用户提交的交易,需要数十秒甚至数分钟才能在亚洲的只读副本上查询到,这在我们的业务场景中是不可接受的。更糟糕的是,复杂的缓存策略和数据同步逻辑让系统变得脆弱不堪,一次网络分区就可能导致数据长时间不一致,排查起来如同噩梦。

我们需要的不是在现有架构上打补丁,而是一种根本性的变革。核心诉around是:拥有一个单一、不可变的事件真相源,并能从这个真相源中,以低延迟、高一致性的方式,在全球任何地方构建出我们需要的读取模型(Read Model)。这直接将我们的目光引向了事件溯源(Event Sourcing)和CQRS架构,而挑战在于,如何在地理分布式环境中实现它。

初步构想与技术选型决策

我们的构想是这样的:所有改变系统状态的操作都以“事件”的形式,被持久化到一个仅追加(Append-Only)的日志中。这个日志就是我们唯一的真相源。然后,独立的后台进程(我们称之为“投影器”)异步地读取这些事件,并将它们“投影”成用户查询所需的各种物化视图。

这个架构的成败,取决于底层数据库和通信协议。在经过几轮激烈的技术辩论后,我们确定了核心技术栈:

  1. 数据库: CockroachDB

    • 为什么不用传统的PostgreSQL或MySQL?它们的分布式能力有限,要实现地理分布下的强一致性写入,需要引入复杂的中间件,这正是我们想摆脱的。
    • 为什么不用Cassandra或DynamoDB?它们是优秀的NoSQL数据库,但我们业务中复杂的查询和对事务的需求,使得SQL的表达能力和ACID保证至关重要。
    • CockroachDB成了唯一的选择。它兼容PostgreSQL协议,天生为地理分布式设计,提供了可串行化(Serializable)的隔离级别。它的AS OF SYSTEM TIME查询能力,对于调试和重放事件流来说,简直是量身定做的。我们可以将整个事件存储和投影存储都放在一个逻辑集群里,由它来处理跨区域的数据复制和一致性。
  2. 后端服务与通信: gRPC-Go

    • 在事件溯源系统中,服务间的性能至关重要。投影器需要高效地拉取事件,客户端需要实时地接收投影更新。RESTful API在这种高频、低延迟的场景下显得笨拙。
    • gRPC基于HTTP/2,使用Protobuf进行序列化,性能优越。更关键的是,它对双向流(Bi-directional Streaming)的原生支持,完美契合了我们的需求:客户端(例如我们的Swift App)可以与服务器建立一个长连接,服务器一旦更新了投影,就能立即将变化推送到客户端。
    • 选择Go语言,是因为它出色的并发性能、简洁的语法以及成熟的gRPC生态。对于构建高并发的投影器和API服务来说,它是理想的工具。
  3. 设计方法论: Domain-Driven Design (DDD)

    • 事件溯源与DDD是天作之合。DDD的核心就是领域事件。通过DDD,我们可以清晰地定义出系统的限界上下文(Bounded Contexts)、聚合(Aggregates)、命令(Commands)和事件(Events)。这套语言不仅指导了我们的代码结构,也让order_placed, order_updated这些事件拥有了明确的业务含义。
  4. 客户端: Swift

    • 我们的终端用户主要在macOS和iOS上使用我们的分析工具。Swift作为苹果生态的一等公民,配合gRPC-Swift库,可以方便地构建出能够消费gRPC流式接口的原生应用,提供流畅的实时体验。
  5. 可观测性: Prometheus

    • 在一个复杂的分布式系统中,没有监控就等于蒙眼狂奔。我们需要精确度量关键指标,例如:事件从产生到被投影的延迟(Projection Lag)、活跃的gRPC流数量、数据库事务冲突率等。Prometheus的拉模型(Pull-based)和强大的查询语言PromQL,非常适合监控这类系统。

整个系统架构的草图如下:

graph TD
    subgraph "Swift Client (macOS/iOS)"
        A[UI/ViewModel] --> B{gRPC-Swift Client};
    end

    subgraph "Go Backend (Geo-Distributed)"
        subgraph "API Layer"
            B -- gRPC Stream: SubscribeToProjection --> C[gRPC Server];
        end
        subgraph "Command Service"
            D[Command Handler] -- Append Events --> E[(CockroachDB: Event Store)];
        end
        subgraph "Projection Service"
            F[Projector Worker] -- Polls Events --> E;
            F -- Updates Projection --> G[(CockroachDB: Projection Store)];
            C -- Reads & Streams Changes --> G;
        end
    end

    subgraph "Observability Stack"
        H[Prometheus] -- Scrapes Metrics --> D;
        H -- Scrapes Metrics --> F;
        H -- Scrapes Metrics --> C;
    end

步骤化实现:代码、问题与解决方案

1. DDD建模与Protobuf定义

我们以一个简化的“交易订单”聚合(Order Aggregate)为例。一个订单可以被创建、修改价格或关闭。这些行为对应着不同的命令,并会产生相应的事件。

首先,我们在Protobuf中定义这些结构。这是我们服务间通信的契约,也是DDD模型的直接体现。

pkg/proto/order/v1/order.proto:

syntax = "proto3";

package order.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option go_package = "github.com/your-org/your-project/pkg/proto/order/v1;orderv1";

// Order Aggregate state
message Order {
  string order_id = 1;
  string user_id = 2;
  string instrument = 3;
  double price = 4;
  int64 quantity = 5;
  string status = 6; // e.g., "OPEN", "CLOSED"
  google.protobuf.Timestamp created_at = 7;
  google.protobuf.Timestamp updated_at = 8;
  int64 version = 9;
}

// Events
message OrderCreatedEvent {
  string order_id = 1;
  string user_id = 2;
  string instrument = 3;
  double price = 4;
  int64 quantity = 5;
  google.protobuf.Timestamp created_at = 7;
}

message OrderPriceUpdatedEvent {
  string order_id = 1;
  double new_price = 2;
  google.protobuf.Timestamp updated_at = 3;
}

// ... other events like OrderClosedEvent

// gRPC Service for subscribing to real-time updates
service OrderProjectionService {
  // Establishes a server-side stream for real-time order updates for a user.
  rpc SubscribeUserOrders(SubscribeUserOrdersRequest) returns (stream Order);
}

message SubscribeUserOrdersRequest {
  string user_id = 1;
}

这里的关键在于,Protobuf文件不仅定义了数据结构,还通过OrderProjectionService定义了客户端与服务器的交互方式。

2. CockroachDB中的事件存储

事件存储是系统的基石。它的设计必须保证事件的顺序和原子性。

SQL schema migrations/001_create_events_table.up.sql:

CREATE TABLE events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    version INT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- We need to ensure that for a given aggregate, the version is unique.
    -- This is the core of our optimistic concurrency control.
    CONSTRAINT unique_aggregate_version UNIQUE (aggregate_id, aggregate_type, version)
);

-- Index for efficient event polling by projectors
CREATE INDEX idx_events_created_at ON events (created_at);

unique_aggregate_version约束是这个设计的核心。它利用数据库的能力来防止并发写入同一个聚合时产生冲突,实现了乐观锁。

接下来是Go语言实现的事件仓库(Event Repository)。

internal/eventstore/cockroach_store.go:

package eventstore

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5/pgconn"
	"github.com/your-org/your-project/internal/domain"
	"go.uber.org/zap"
)

// CockroachStore implements the EventStore interface for CockroachDB.
type CockroachStore struct {
	db     *sql.DB
	logger *zap.Logger
}

// NewCockroachStore creates a new event store instance.
func NewCockroachStore(db *sql.DB, logger *zap.Logger) *CockroachStore {
	return &CockroachStore{db: db, logger: logger}
}

// Append saves new events to the store. It's transactional.
func (s *CockroachStore) Append(ctx context.Context, events []domain.Event, expectedVersion int64) error {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback() // Rollback is a no-op if Commit succeeds.

	for _, event := range events {
		payload, err := json.Marshal(event.Payload())
		if err != nil {
			return fmt.Errorf("failed to marshal event payload: %w", err)
		}

		// The core insert statement.
		query := `
            INSERT INTO events (aggregate_id, aggregate_type, event_type, payload, version)
            VALUES ($1, $2, $3, $4, $5)
        `
		_, err = tx.ExecContext(ctx, query, event.AggregateID(), event.AggregateType(), event.Type(), payload, event.Version())
		if err != nil {
			var pgErr *pgconn.PgError
			// Check for the unique constraint violation (code 23505 in PostgreSQL).
			if errors.As(err, &pgErr) && pgErr.Code == "23505" {
				return fmt.Errorf("concurrency conflict for aggregate %s: %w", event.AggregateID(), domain.ErrConcurrencyConflict)
			}
			s.logger.Error("Failed to insert event", zap.Error(err), zap.String("aggregate_id", event.AggregateID()))
			return fmt.Errorf("failed to insert event: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	return nil
}

这段代码的健壮性体现在几个方面:

  • 事务性: 所有事件要么全部成功,要么全部失败,保证了聚合状态变更的原子性。
  • 错误处理: 精确地捕获了数据库的唯一约束冲突错误(23505),并将其转化为领域层的ErrConcurrencyConflict。这是实现乐观并发控制的关键。上层业务逻辑(命令处理器)在收到这个错误后,可以重新加载聚合、应用命令、然后重试保存。
  • 日志记录: 在发生未知数据库错误时,记录了详细的上下文信息,便于生产环境排障。

3. 投影器:构建读取模型

投影器是系统的“心脏”之一。它不断地从事件存储中拉取新事件,并更新物化视图。

首先是投影表的Schema。
migrations/002_create_orders_projection.up.sql:

CREATE TABLE orders_projection (
    order_id VARCHAR(255) PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    instrument VARCHAR(50) NOT NULL,
    price DECIMAL(18, 8) NOT NULL,
    quantity BIGINT NOT NULL,
    status VARCHAR(50) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    -- This version helps in ensuring idempotency and debugging.
    last_processed_version BIGINT NOT NULL
);

CREATE INDEX idx_orders_projection_user_id ON orders_projection(user_id);

-- This table tracks the progress of each projector.
CREATE TABLE projector_offsets (
    projector_name VARCHAR(255) PRIMARY KEY,
    last_processed_event_id UUID,
    last_processed_timestamp TIMESTAMPTZ NOT NULL
);

projector_offsets 表至关重要。它让投影器可以“记住”自己处理到了哪里,即使服务重启也能从断点处继续,保证了事件不会被重复处理或遗漏。

下面是投影器工作逻辑的简化版实现。

internal/projection/order_projector.go:

package projection

import (
	"context"
	"database/sql"
	"time"
	"github.com/your-org/your-project/internal/eventstore"
	"go.uber.org/zap"
)

type OrderProjector struct {
	db             *sql.DB
	eventStore     *eventstore.CockroachStore // Simplified for example, should be an interface
	logger         *zap.Logger
	projectorName  string
	pollInterval   time.Duration
}

// Start begins the projection loop.
func (p *OrderProjector) Start(ctx context.Context) {
	ticker := time.NewTicker(p.pollInterval)
	defer ticker.Stop()

	p.logger.Info("Starting order projector", zap.String("name", p.projectorName))

	for {
		select {
		case <-ctx.Done():
			p.logger.Info("Stopping order projector", zap.String("name", p.projectorName))
			return
		case <-ticker.C:
			if err := p.processBatch(ctx); err != nil {
				p.logger.Error("Failed to process event batch", zap.Error(err))
				// In a real system, you'd have more sophisticated error handling,
				// like exponential backoff.
			}
		}
	}
}

func (p *OrderProjector) processBatch(ctx context.Context) error {
	tx, err := p.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// 1. Get the last processed timestamp from the offsets table.
	var lastProcessed time.Time
	// ... code to query projector_offsets within the transaction ...
	
	// 2. Poll for new events since the last timestamp.
	// ... code to query events table for events where created_at > lastProcessed ...
	
	var lastEventTimestamp time.Time
	
	// 3. Apply each event to the projection table.
	for _, event := range newEvents {
		switch e := event.Payload().(type) {
		case *orderv1.OrderCreatedEvent:
			// ... SQL INSERT statement for orders_projection ...
		case *orderv1.OrderPriceUpdatedEvent:
			// ... SQL UPDATE statement for orders_projection ...
		}
		// ... handle errors from SQL execution ...
		lastEventTimestamp = event.CreatedAt()
	}

	// 4. If events were processed, update the offset.
	if len(newEvents) > 0 {
		// ... SQL INSERT...ON CONFLICT...DO UPDATE statement for projector_offsets ...
	}
	
	// 5. Commit the entire transaction.
	return tx.Commit()
}

遇到的问题与解决:
最初,我们在事务外读取offset,处理完一批事件后,在另一个事务里更新offset。这导致了一个严重问题:如果在更新投影表和更新offset之间服务崩溃,重启后会重复处理同一批事件,可能导致数据不一致。

解决方案: 将读取offset、处理事件、更新投影、更新offset四个步骤全部放在一个数据库事务中。这确保了整个投影过程的原子性,要么全部成功,要么全部回滚,从而保证了“至少一次”的处理语义,并通过投影表的主键约束来保证幂等性。

4. gRPC 流式API与Swift客户端

gRPC服务器的职责是监控orders_projection表的变化,并将这些变化实时流式传输给订阅的客户端。

internal/transport/grpc/server.go:

package grpc

import (
	"context"
	// ... other imports
	orderv1 "github.com/your-org/your-project/pkg/proto/order/v1"
)

// SubscribeUserOrders implements the gRPC streaming endpoint.
func (s *Server) SubscribeUserOrders(req *orderv1.SubscribeUserOrdersRequest, stream orderv1.OrderProjectionService_SubscribeUserOrdersServer) error {
	userID := req.GetUserId()
	if userID == "" {
		return status.Error(codes.InvalidArgument, "user_id is required")
	}

	// This is a simplified implementation. A production system would use a more
	// efficient mechanism than polling, perhaps leveraging a message bus or
	// CockroachDB's CDC feature.
	
	var lastSentVersion int64 = 0
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-stream.Context().Done():
			s.logger.Info("Client disconnected", zap.String("user_id", userID))
			return nil
		case <-ticker.C:
			// Query for orders for the user that are newer than what we've sent.
			rows, err := s.db.QueryContext(stream.Context(), 
				`SELECT ... FROM orders_projection WHERE user_id = $1 AND last_processed_version > $2 ORDER BY updated_at`, 
				userID, lastSentVersion)
			// ... error handling ...
			
			for rows.Next() {
				var order orderv1.Order
				// ... scan row into order proto message ...

				if err := stream.Send(&order); err != nil {
					s.logger.Error("Failed to send order to stream", zap.Error(err))
					return err
				}
				if order.Version > lastSentVersion {
					lastSentVersion = order.Version
				}
			}
		}
	}
}

在Swift客户端,我们使用grpc-swift库来消费这个流。

OrderViewModel.swift:

import Foundation
import Combine
import GRPC
import NIO

@MainActor
class OrderViewModel: ObservableObject {
    @Published var orders: [Order_V1_Order] = []
    
    private var stream: BidirectionalStreamingCall<Order_V1_SubscribeUserOrdersRequest, Order_V1_Order>?
    private var cancellables = Set<AnyCancellable>()
    private let client: Order_V1_OrderProjectionServiceAsyncClient

    init(client: Order_V1_OrderProjectionServiceAsyncClient) {
        self.client = client
    }
    
    func subscribe(for userID: String) {
        let request = Order_V1_SubscribeUserOrdersRequest.with {
            $0.userID = userID
        }

        Task {
            do {
                // The stream is an AsyncSequence, which is incredibly powerful.
                let stream = client.subscribeUserOrders(request)
                
                for try await order in stream {
                    // This code runs on the MainActor, so it's safe to update UI.
                    updateOrAppendOrder(order)
                    print("Received or updated order: \(order.orderID)")
                }
            } catch {
                // Handle stream errors, e.g., network issues.
                print("Subscription failed with error: \(error)")
            }
        }
    }
    
    private func updateOrAppendOrder(_ order: Order_V1_Order) {
        if let index = orders.firstIndex(where: { $0.orderID == order.orderID }) {
            orders[index] = order
        } else {
            orders.append(order)
        }
    }
}

Swift的async/await语法与gRPC的AsyncSequence结合得天衣无缝。这使得处理复杂的流式数据就像遍历一个简单的数组一样直观,大大降低了客户端开发的复杂度。

5. 用Prometheus进行监控

最后,我们为系统装上“眼睛”。在Go服务中,我们使用官方的Prometheus客户端库来暴露指标。

internal/metrics/metrics.go:

package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// A gauge to track the time difference between event creation and projection.
	ProjectionLag = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "projection_lag_seconds",
			Help: "The lag in seconds between an event's creation and its projection.",
		},
		[]string{"projector_name"},
	)

	// A counter for database errors during event processing.
	ProjectionDBErrors = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "projection_db_errors_total",
			Help: "Total number of database errors encountered by the projector.",
		},
		[]string{"projector_name"},
	)
)

// In the projector code, when an event is processed:
// lag := time.Since(event.CreatedAt())
// metrics.ProjectionLag.WithLabelValues(p.projectorName).Set(lag.Seconds())

通过暴露一个/metrics HTTP端点,Prometheus可以定期抓取这些数据。在Grafana中,我们可以轻松地创建仪表盘来可视化projection_lag_seconds。当这个值出现尖峰时,我们就能立即收到告警,知道某个区域的投影处理可能出现了瓶颈或故障,从而在影响用户之前介入调查。

当前方案的局限性与未来迭代

尽管这套架构解决了我们最初的地理分布式一致性读取问题,但它远非完美。

  1. 投影器的可伸缩性: 目前的投影器是单体工作模式。虽然可以水平部署多个实例,但它们会处理相同的事件流,对数据库造成不必要的压力。下一步的迭代方向是实现投影器的分区处理,例如,基于aggregate_id的哈希值将事件分发给不同的投影器实例。这需要一个更复杂的offset管理机制。

  2. 投影重建: 当我们需要引入一个新的投影模型,或者修改现有投影的结构时,我们必须从头开始重放整个事件历史。对于一个拥有数十亿事件的系统来说,这个过程可能需要数小时甚至数天。我们正在研究的解决方案是引入聚合快照(Aggregate Snapshots),定期将聚合的完整状态保存下来,这样投影器就不必每次都从第一个事件开始重放。

  3. 对CDC的探索: 投影器目前使用的轮询(Polling)模式虽然简单可靠,但会给事件存储带来持续的读压力,并且存在固有的延迟(取决于轮询间隔)。CockroachDB支持Change Data Capture (CDC),可以以流的形式将数据变更推送出来。我们计划将投影器改造为消费CDC流,这将使投影延迟进一步降低,并减少对主数据库的负载,实现真正的事件驱动。


  目录