利用 Kafka 事件流构建解耦读写模型:ASP.NET Core 与 Google Cloud Functions 的协同架构


一个日益复杂的 ASP.NET Core 单体应用,其核心数据库正同时承受着事务性写入和高并发分析性读取的双重压力。写入操作要求强一致性,而读取操作则需要聚合、变形后的数据,并且流量峰值是写入流量的数十倍。这种读写模式的根本性冲突导致数据库频繁出现锁竞争,性能瓶颈日益凸显,每一次功能迭代都小心翼翼,生怕对核心表结构的修改影响到报表和前端展示。

定义问题:读写模型的内在矛盾

问题的核心在于,服务于命令(Commands)的数据模型和需求,与服务于查询(Queries)的模型和需求截然不同。

  • 写入模型 (Write Model): 侧重于领域逻辑、数据一致性和规范化。它需要处理复杂的业务规则,通常以高度规范化的形式持久化数据,以确保事务的原子性和数据的完整性。
  • 读取模型 (Read Model): 侧重于查询效率和数据展现。它通常需要反规范化的、预先计算好的、为特定视图量身定制的数据结构,以便前端能够用最简单的查询一次性获取所有需要的信息。

将这两种模型强行捆绑在同一个数据库、同一套表结构上,是大多数系统演进到一定阶段必然会遇到的天花板。

方案A:传统的读写分离与物化视图

最直接的解决方案是数据库层面的读写分离。设置一个或多个只读副本(Read Replicas),将所有查询流量引导至副本,写入流量保留在主库。

优势:

  1. 实现简单: 这是数据库管理的标准操作,对应用层代码的侵入性相对较小。
  2. 强一致性保障: 在许多云厂商的服务中,主从复制延迟可以做到非常低,接近实时。
  3. 技术成熟: 有大量的现有工具和实践可以遵循。

劣势:

  1. 成本高昂: 只读副本通常需要与主库同等规格的硬件,当读取负载巨大时,需要维护一个庞大的副本集群,成本不菲。
  2. 结构耦合: 读写模型依然共享同一套表结构。如果为了优化查询性能而在副本上创建索引或修改表结构,会增加主从同步的复杂性和风险。读取模型无法拥有自己独立优化的结构。
  3. 查询局限: 对于复杂的聚合查询,即使在只读副本上运行,依然会消耗大量计算资源。它并未解决数据“形态”不匹配的问题,前端为了一个复杂的仪表盘页面,可能仍需发起多次 JOIN 查询。

在真实项目中,方案 A 往往是一个短期止痛药,而非长期解决方案。它缓解了资源争抢,但没有解决模型耦合的根本问题。

方案B:基于事件驱动(EDA)的CQRS架构

另一种思路是彻底将命令和查询的职责分离开来,即命令查询职责分离(CQRS)。我们可以通过事件驱动架构来实现这一点。

  1. 命令侧 (ASP.NET Core): 负责处理所有写操作。在完成业务逻辑后,它不直接更新读取所需的数据,而是发布一个描述了“发生了什么”的领域事件(Domain Event)到消息队列(如 Kafka)。
  2. 事件总线 (Kafka): 作为整个架构的神经中枢,持久化所有领域事件,形成一个不可变的事件日志。
  3. 查询侧 (Google Cloud Functions): 订阅 Kafka 中的相关事件。每当接收到一个事件,就触发一个无服务器函数。该函数唯一的职责就是根据事件内容,更新一个或多个为查询而优化的读取模型(Read Model / Materialized View)。这个读取模型可以存储在任何最适合查询场景的数据库中,如 Firestore、Elasticsearch 或一个简单的 PostgreSQL。

优势:

  1. 完全解耦: 读写模型在物理和逻辑上都完全分离。写入服务可以独立演进、部署和扩展,无需关心数据如何被读取。
  2. 极致的读取性能: 读取模型可以被设计为完全反规范化的。前端的一个 GraphQL 查询可能只需要对应到一个简单的 SELECT * FROM pre-aggregated_view WHERE id = ?,性能极高。
  3. 弹性与成本效益: 使用 Google Cloud Functions 这样的无服务器方案处理事件,可以根据事件流量自动伸缩。在没有事件时,成本几乎为零。这对于应对突发性读取流量的场景(例如营销活动)尤其有效。
  4. 架构灵活性: 同一个事件流可以被多个不同的消费者订阅,用于构建不同的读取模型。例如,一个消费者构建给用户前端的视图,另一个消费者将数据同步到数据仓库用于商业智能分析。

劣势:

  1. 最终一致性: 这是最大的挑战。从事件发布到被消费并更新读取模型,存在一个时间窗口。系统必须能够容忍这种短暂的数据不一致。
  2. 复杂性增加: 引入了 Kafka 和 Serverless,增加了系统的运维和监控复杂性。需要处理消息的幂等性、顺序性(如果需要)、错误重试和“毒丸消息”等问题。
  3. 开发心智模型转变: 开发者需要从传统的请求-响应模式切换到异步的、事件驱动的思维模式。

最终选择与理由:拥抱异步,走向未来

尽管方案 B 引入了复杂性,但它提供的解耦、弹性和可扩展性正是我们解决当前核心矛盾所需要的。对于一个需要长期演进的系统而言,前期在架构上的投入是值得的。我们决定采用方案 B,将 ASP.NET Core 服务作为命令处理器和事件发布者,利用 Kafka 作为事件总线,并通过 Google Cloud Functions 构建一个为 GraphQL 查询端点优化的读取模型。

以下是该架构的核心实现概览。

graph TD
    subgraph "前端 (Webpack Build)"
        A[React App with GraphQL Client]
    end

    subgraph "查询服务 (Query Side)"
        B[GraphQL API on Cloud Run/Functions] --> C{Read Database: Firestore};
    end
    
    subgraph "事件处理 (Event Processing)"
        E[Google Cloud Function] -- 更新/写入 --> C;
    end

    subgraph "事件总线 (Event Bus)"
        F[Kafka Cluster: order-events Topic];
    end

    subgraph "命令服务 (Command Side)"
        G[ASP.NET Core API] -- 发布事件 --> F;
    end
    
    A -- GraphQL Query --> B;
    E -- 订阅 --> F;

    style G fill:#3498db,stroke:#2980b9,stroke-width:2px,color:#fff
    style F fill:#2c3e50,stroke:#000,stroke-width:2px,color:#fff
    style E fill:#f39c12,stroke:#e67e22,stroke-width:2px,color:#fff
    style B fill:#2ecc71,stroke:#27ae60,stroke-width:2px,color:#fff

核心实现:ASP.NET Core 事件生产者

在 ASP.NET Core 服务中,我们需要一个可靠的 Kafka 生产者。这里的关键在于确保事件能够“至少一次”成功发布到 Kafka。

首先,定义我们的事件契约。使用 record 类型是定义不可变数据传输对象(DTO)的绝佳方式。

// --- EventContracts/OrderEvents.cs ---
namespace CoreApi.EventContracts
{
    /// <summary>
    /// 订单创建事件。这是我们系统中事实的来源。
    /// </summary>
    /// <param name="OrderId">订单ID</param>
    /// <param name="CustomerId">客户ID</param>
    /// <param name="OrderItems">订单项列表</param>
    /// <param name="TotalPrice">总价</param>
    /// <param name="TimestampUtc">事件发生时间戳</param>
    public record OrderCreatedEvent(
        Guid OrderId,
        Guid CustomerId,
        List<OrderItem> OrderItems,
        decimal TotalPrice,
        DateTime TimestampUtc
    );

    public record OrderItem(
        Guid ProductId,
        int Quantity,
        decimal UnitPrice
    );
}

接下来,是 Kafka 生产者的配置和服务。我们不应该在每个业务请求中都去创建生产者实例,而是将其注册为单例,并提供一个封装好的服务。

// --- Infrastructure/Kafka/KafkaProducerConfig.cs ---
using Confluent.Kafka;

public class KafkaProducerConfig
{
    public ProducerConfig Producer { get; set; } = new ProducerConfig();
    public string Topic { get; set; }
}

// --- appsettings.json ---
/*
{
  "KafkaProducer": {
    "Topic": "order-events",
    "Producer": {
      "BootstrapServers": "your-kafka-broker:9092",
      "ClientId": "aspnet-core-producer",
      // 在生产环境中,这里的配置至关重要
      "Acks": "All", // 保证所有ISR都收到消息,最高的数据持久性保证
      "EnableIdempotence": true, // 开启幂等性,防止因重试导致的消息重复
      "MessageSendMaxRetries": 3,
      "RetryBackoffMs": 1000
    }
  }
}
*/

// --- Infrastructure/Kafka/EventProducerService.cs ---
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System.Text.Json;
using System.Threading.Tasks;

public interface IEventProducer
{
    Task ProduceAsync<T>(string key, T message);
}

public class EventProducerService : IEventProducer, IDisposable
{
    private readonly IProducer<string, string> _producer;
    private readonly string _topic;
    private readonly ILogger<EventProducerService> _logger;

    public EventProducerService(IOptions<KafkaProducerConfig> config, ILogger<EventProducerService> logger)
    {
        _topic = config.Value.Topic;
        _producer = new ProducerBuilder<string, string>(config.Value.Producer).Build();
        _logger = logger;
    }

    public async Task ProduceAsync<T>(string key, T message)
    {
        var serializedMessage = JsonSerializer.Serialize(message);
        
        try
        {
            // 这里的 key 很重要,它决定了消息在 Kafka 分区中的分布。
            // 使用订单ID作为 key 可以保证同一个订单的所有事件都进入同一个分区,从而保证了顺序性。
            var kafkaMessage = new Message<string, string> { Key = key, Value = serializedMessage };

            // Produce 是一个异步操作,但我们通常不等待它完成以避免阻塞请求线程。
            // 相反,我们依赖回调来处理结果。
            _producer.Produce(_topic, kafkaMessage, deliveryReport =>
            {
                if (deliveryReport.Error.Code != ErrorCode.NoError)
                {
                    // 在真实项目中,这里必须有可靠的失败处理机制。
                    // 可能是写入本地失败日志表,然后通过后台任务重试。
                    // 或者直接告警,人工介入。
                    _logger.LogError(
                        "Failed to deliver message: {Key} | Reason: {Reason}", 
                        deliveryReport.Message.Key, 
                        deliveryReport.Error.Reason);
                }
                else
                {
                    _logger.LogInformation(
                        "Message delivered: {Key} to {TopicPartitionOffset}", 
                        deliveryReport.Message.Key, 
                        deliveryReport.TopicPartitionOffset);
                }
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error producing message to Kafka.");
            // 异常也需要处理,确保不会丢失事件
            throw;
        }
        
        // 我们没有 await produce 调用,所以方法会很快返回。
        // 但我们需要确保在应用关闭时,所有缓冲区的消息都被发送出去。
        await Task.CompletedTask;
    }

    public void Dispose()
    {
        // Flush 会阻塞直到所有缓冲区的消息都发送完成。
        // 这是保证在服务关闭时不丢失事件的关键。
        _producer.Flush(TimeSpan.FromSeconds(10));
        _producer.Dispose();
    }
}

Startup.csProgram.cs 中注册服务:

// services.Configure<KafkaProducerConfig>(builder.Configuration.GetSection("KafkaProducer"));
// services.AddSingleton<IEventProducer, EventProducerService>();

在业务逻辑中使用:

// --- Application/OrderService.cs ---
public class OrderService
{
    private readonly IEventProducer _eventProducer;

    public OrderService(IEventProducer eventProducer)
    {
        _eventProducer = eventProducer;
    }

    public async Task CreateOrderAsync(/*... order data ...*/)
    {
        // 1. 执行核心业务逻辑,例如验证、扣减库存、保存到主数据库
        // ...
        // var order = new Order(...);
        // _dbContext.Orders.Add(order);
        // await _dbContext.SaveChangesAsync();

        // 2. 逻辑成功后,发布领域事件
        var orderCreatedEvent = new OrderCreatedEvent(
            Guid.NewGuid(), 
            Guid.NewGuid(), 
            new List<OrderItem>(), 
            199.99m, 
            DateTime.UtcNow
        );

        // 使用订单ID作为key
        await _eventProducer.ProduceAsync(orderCreatedEvent.OrderId.ToString(), orderCreatedEvent);
    }
}

这里的坑在于:数据库事务和消息发布不是原子的。如果 SaveChangesAsync() 成功了,但 ProduceAsync 在应用崩溃前失败了,就会导致数据不一致。解决方案通常是采用“事务性发件箱模式”(Transactional Outbox Pattern),但这会增加实现的复杂度,对于许多项目,依赖 Kafka 生产者的重试和幂等性配置已经足够健壮。

核心实现:Google Cloud Function 事件消费者

现在我们需要一个 C# 的 Google Cloud Function 来消费这些事件。我们将使用 CloudEvents 格式,并假设有一个 Kafka 事件源(如通过 Eventarc)触发该函数。

// --- OrderEventHandler.csproj ---
/*
<Project Sdk="Google.Cloud.Functions.Sdk/1.0.0">
  ...
  <ItemGroup>
    <PackageReference Include="CloudNative.CloudEvents.Kafka" Version="2.7.0" />
    <PackageReference Include="Confluent.Kafka" Version="2.3.0" />
    <PackageReference Include="Google.Cloud.Firestore" Version="3.4.0" />
    <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
  </ItemGroup>
</Project>
*/

// --- Function.cs ---
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.Kafka.V1;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using System.Threading.Tasks;
using Google.Cloud.Firestore;

// 定义事件契约,与生产者保持一致
public record OrderCreatedEvent(Guid OrderId, Guid CustomerId, decimal TotalPrice, DateTime TimestampUtc);

public class Function : ICloudEventFunction<KafkaMessagePublishedData>
{
    private readonly ILogger<Function> _logger;
    private readonly FirestoreDb _firestoreDb;
    private const string ProjectId = "your-gcp-project-id";
    private const string CollectionName = "order-read-models";

    public Function(ILogger<Function> logger)
    {
        _logger = logger;
        // 在生产环境中,FirestoreDb 实例应该是单例的
        _firestoreDb = FirestoreDb.Create(ProjectId);
    }

    /// <summary>
    /// 当 Kafka topic 中有新消息时,此函数将被触发
    /// </summary>
    public async Task HandleAsync(CloudEvent cloudEvent, KafkaMessagePublishedData data, CancellationToken cancellationToken)
    {
        if (data.Messages.Count == 0)
        {
            _logger.LogWarning("Received an empty Kafka message batch.");
            return;
        }

        // Eventarc for Kafka 可能会批量发送消息
        foreach (var message in data.Messages)
        {
            var key = message.Key.ToStringUtf8();
            var value = message.Value.ToStringUtf8();
            _logger.LogInformation("Processing Kafka message with key: {Key}", key);

            try
            {
                // 反序列化事件
                var orderEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(value);
                if (orderEvent == null)
                {
                    // 这是“毒丸消息”的一个例子。我们必须跳过它,否则函数会不断重试和失败。
                    _logger.LogError("Failed to deserialize message with key: {Key}. Skipping.", key);
                    continue;
                }

                // 创建或更新读取模型
                // Firestore 的文档ID可以直接使用订单ID,这天然保证了幂等性。
                // 多次处理同一个 OrderCreatedEvent 只会覆盖同一个文档。
                DocumentReference docRef = _firestoreDb.Collection(CollectionName).Document(orderEvent.OrderId.ToString());

                // 这里的对象结构是完全为查询优化的,可以包含冗余数据
                var readModel = new
                {
                    orderEvent.OrderId,
                    orderEvent.CustomerId,
                    orderEvent.TotalPrice,
                    // 可能需要从其他服务查询客户名称并冗余存储,以避免前端进行JOIN
                    CustomerName = "Fetched Customer Name", 
                    Status = "Created",
                    orderEvent.TimestampUtc,
                    LastUpdatedUtc = DateTime.UtcNow
                };

                await docRef.SetAsync(readModel, cancellationToken: cancellationToken);
                _logger.LogInformation("Successfully processed and stored read model for OrderId: {OrderId}", orderEvent.OrderId);
            }
            catch (JsonException jsonEx)
            {
                _logger.LogError(jsonEx, "JSON deserialization error for key {Key}. This is likely a poison pill.", key);
                // 对于无法解析的消息,我们选择记录并跳过
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unexpected error occurred while processing message with key {Key}", key);
                // 对于其他瞬时错误 (如 Firestore 连接问题),函数运行时会自动重试。
                // 如果错误是持久的,需要配置死信队列 (DLQ)。
                throw;
            }
        }
    }
}

前端消费与挑战

前端应用由 Webpack 打包,使用 Apollo Client 或类似的 GraphQL 客户端。它现在连接到一个非常快速的 GraphQL 端点(可以由另一个 Cloud Function 或 Cloud Run 实例提供),该端点直接查询 Firestore 中的 order-read-models 集合。

查询变得极其简单:

query GetOrderDetails($orderId: ID!) {
  order(id: $orderId) {
    orderId
    customerName
    totalPrice
    status
    timestampUtc
  }
}

最大的挑战是处理最终一致性带来的用户体验问题。当用户创建了一个订单后,立即跳转到订单详情页,此时读取模型可能还没有被更新。

处理策略:

  1. UI 轮询: 最简单的方法。在详情页短时间内轮询 GraphQL 端点,直到获取到数据。体验较差。
  2. 乐观更新 (Optimistic UI): 在提交创建请求后,前端立即使用已知的数据(来自提交的表单)渲染一个“伪造”的详情页。然后,在后台进行轮询或等待一个信号来获取真实数据并替换。
  3. WebSocket / GraphQL Subscriptions: 最高级的方案。当订单创建成功后,前端可以订阅一个与该订单ID相关的通知。当 Cloud Function 更新完读取模型后,可以额外发布一个消息(例如到 Pub/Sub),触发一个推送通知给前端,告知数据已准备就绪。

一个常见的错误是,在选择了最终一致性的后端架构后,却期望前端能像在强一致性系统中那样工作。这要求前后端团队紧密协作,共同设计用户体验来优雅地处理数据延迟。

架构的扩展性与局限性

这个架构的美妙之处在于其扩展性。明天如果需要一个搜索功能,我们可以添加一个新的 Elasticsearch Sink Function,同样订阅 order-events 主题,将数据写入 Elasticsearch 集群,而无需对核心的 ASP.NET Core 服务做任何改动。

局限性也同样明显:

  1. 不适用于需要强一致性的读后写场景。 如果业务流程要求用户创建一个订单后,必须马上能读取到并进行下一步操作,那么这种架构就不适用。
  2. 调试和追踪变得复杂。 一个请求的完整生命周期跨越了多个分布式组件。必须依赖 OpenTelemetry 这样的分布式追踪工具来建立端到端的可观测性,否则排查问题将是一场噩梦。
  3. 事件模式(Schema)演进需要严格管理。 一旦事件被发布,就可能有多个消费者依赖其结构。对事件结构的任何更改都需要考虑向后兼容性,这需要引入 Avro 或 Protobuf 这样的模式注册中心(Schema Registry)来强制执行规则。

此架构并非万能药,它是一个权衡。它用最终一致性的复杂性换取了系统组件之间的高度解耦、独立的伸缩能力和长期的技术演进灵活性。对于那些读写压力和数据模型不匹配的系统来说,这是一个值得认真考虑的演进方向。


  目录