Building a Heterogeneous Model-Inference Workflow on Kubernetes with C#, Celery, and Clojure


Our situation was a classic collision between technical debt and the need to innovate: a large, stable financial risk-control monolith built on C# and the .NET Framework needed to integrate a set of complex machine-learning models based on Python (PyTorch). The initial plan was to call the model service synchronously over a REST API, but load testing exposed fatal weaknesses: P99 inference latency was unacceptable, and any jitter in the model service blocked threads in the upstream .NET application, triggering cascading failures. We needed a decoupled, asynchronous architecture whose components could scale independently.

That requirement pushed us down an unconventional integration path. Our core system is .NET and could not easily be displaced; model inference is Python's home turf; and one stage of the data-preprocessing pipeline involved extremely complex, deeply nested structural transformations under high concurrency, which performed poorly in Python and was becoming unmaintainable. After several rounds of evaluation, we settled on a polyglot workflow composed of C#, Python/Celery, and Clojure, orchestrated uniformly by Kubernetes.

Architectural Pain Points and the Final Design

The initial idea was for C# to call a Python service, which in turn would call another Python module for preprocessing. The problem was that preprocessing module: the financial transaction data it handles is structurally complex, full of conditional branches and recursive transformations, which was neither elegant in Python nor fast enough for our throughput requirements.

At that point an engineer on the team with a functional-programming background proposed Clojure. Running on the JVM, with excellent concurrency support and a philosophy built around immutable data, Clojure is a near-perfect fit for this kind of pure data transformation. We ran a small proof of concept, rewriting the most complex preprocessing logic in Clojure: performance improved nearly 4x and the line count dropped by 60%.

The decision then became clear:

  1. C# Orchestrator Service: the .NET application initiates tasks, wrapping the raw business data and publishing it to the message queue as a task.
  2. RabbitMQ: the decoupling buffer and task bus that connects all of the heterogeneous services.
  3. Clojure Preprocessing Service: a standalone microservice that receives raw data, performs high-performance transformation and feature engineering, and serves results over an HTTP interface.
  4. Python Celery Worker: consumes tasks from RabbitMQ, calls the Clojure service for preprocessing, feeds the processed data into the PyTorch model for inference, and finally writes the result to another RabbitMQ queue.
  5. Kubernetes: the unified deployment and operations platform, managing lifecycle, networking, configuration, and elastic scaling for every service.

The lifecycle of the whole workflow looks like this:

sequenceDiagram
    participant CSharp as C# Orchestrator
    participant RabbitMQ
    participant CeleryWorker as Python Celery Worker
    participant ClojureService as Clojure Preprocessor
    participant MLModel as PyTorch Model

    CSharp->>+RabbitMQ: Publish(TaskRequest)
    RabbitMQ-->>-CeleryWorker: Consume(TaskRequest)
    CeleryWorker->>+ClojureService: HTTP POST /preprocess (raw_data)
    ClojureService-->>-CeleryWorker: Response (processed_features)
    CeleryWorker->>+MLModel: predict(processed_features)
    MLModel-->>-CeleryWorker: prediction_result
    CeleryWorker->>RabbitMQ: Publish(TaskResult)
    Note right of CSharp: The C# service can optionally<br/>consume results from another queue.
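
For concreteness, here is roughly what a TaskRequest looks like on the wire, shown as the Python dict the Celery worker sees after JSON deserialization. The field names mirror the InferenceRequest type the C# publisher serializes below (System.Text.Json keeps PascalCase property names by default); the values are made up for illustration:

# Hypothetical example of a deserialized TaskRequest payload.
task_request = {
    "TaskId": "6f1c2a9e-8c1d-4c1f-9b1a-2f3e4d5c6b7a",
    "Timestamp": "2024-05-14T09:30:00Z",
    "RawData": {
        "userId": "u-1001",
        "transactions": [
            {"amount": 12500.0, "currency": "USD"},
            {"amount": 80.0, "currency": "USD"},
        ],
    },
}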

Implementing the C# Task Publisher

In the .NET application we need a robust RabbitMQ producer. We are not just firing off messages: we need a reliable connection, thread-safe channel usage, and message persistence.

// --- RabbitMQConnectionFactory.cs ---
// A singleton factory to manage a single, resilient connection.
// In a real project, use Polly for resilience policies (retry, circuit breaker).
using System;
using RabbitMQ.Client;

public class RabbitMQConnectionFactory : IDisposable
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private static readonly object _lock = new object();
    private static RabbitMQConnectionFactory _instance;

    private RabbitMQConnectionFactory(string hostName, string userName, string password)
    {
        _factory = new ConnectionFactory()
        {
            HostName = hostName,
            UserName = userName,
            Password = password,
            DispatchConsumersAsync = true // Important for async consumers
        };
    }

    public static RabbitMQConnectionFactory GetInstance(string host, string user, string pass)
    {
        if (_instance == null)
        {
            lock (_lock)
            {
                if (_instance == null)
                {
                    _instance = new RabbitMQConnectionFactory(host, user, pass);
                }
            }
        }
        return _instance;
    }

    public IConnection GetConnection()
    {
        if (_connection == null || !_connection.IsOpen)
        {
            lock (_lock)
            {
                if (_connection == null || !_connection.IsOpen)
                {
                    // Basic retry logic. Production code should be more robust.
                    _connection = _factory.CreateConnection();
                }
            }
        }
        return _connection;
    }

    public void Dispose()
    {
        _connection?.Close();
        _connection?.Dispose();
    }
}

// --- InferenceTaskPublisher.cs ---
using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

public interface IInferenceTaskPublisher
{
    Task PublishTaskAsync(InferenceRequest request);
}

// Message contract (only the fields used below; the real type carries more metadata).
public class InferenceRequest
{
    public Guid TaskId { get; set; }
    public DateTime Timestamp { get; set; }
    public TransactionData RawData { get; set; }
}

public class InferenceTaskPublisher : IInferenceTaskPublisher
{
    private readonly ILogger<InferenceTaskPublisher> _logger;
    private readonly RabbitMQConnectionFactory _connectionFactory;
    private const string ExchangeName = "inference_exchange";
    private const string RoutingKey = "inference.task.financial";
    private const string QueueName = "financial_inference_tasks";

    public InferenceTaskPublisher(ILogger<InferenceTaskPublisher> logger, RabbitMQConnectionFactory connectionFactory)
    {
        _logger = logger;
        _connectionFactory = connectionFactory;
        SetupRabbitMQ();
    }

    private void SetupRabbitMQ()
    {
        using (var channel = _connectionFactory.GetConnection().CreateModel())
        {
            // Declare a durable, direct exchange
            channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, durable: true);
            // Declare a durable queue
            channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            // Bind them
            channel.QueueBind(QueueName, ExchangeName, RoutingKey);
            _logger.LogInformation("RabbitMQ exchange '{Exchange}', queue '{Queue}', and binding are set up.", ExchangeName, QueueName);
        }
    }

    public Task PublishTaskAsync(InferenceRequest request)
    {
        try
        {
            // It's recommended to create a channel per thread/operation.
            // Channels are lightweight.
            using (var channel = _connectionFactory.GetConnection().CreateModel())
            {
                var messageBody = JsonSerializer.Serialize(request);
                var bodyBytes = Encoding.UTF8.GetBytes(messageBody);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // Mark messages as persistent
                properties.ContentType = "application/json";
                properties.MessageId = request.TaskId.ToString(); // Use a unique ID for tracing

                channel.BasicPublish(
                    exchange: ExchangeName,
                    routingKey: RoutingKey,
                    basicProperties: properties,
                    body: bodyBytes);

                _logger.LogInformation("Task {TaskId} published successfully.", request.TaskId);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish task {TaskId}.", request.TaskId);
            // In a real-world scenario, you might re-throw a custom exception
            // or implement a dead-lettering strategy.
            throw;
        }

        return Task.CompletedTask;
    }
}

// --- Example usage in a service ---
public class RiskAssessmentService
{
    private readonly IInferenceTaskPublisher _publisher;

    public RiskAssessmentService(IInferenceTaskPublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task AssessRiskAsync(TransactionData data)
    {
        var request = new InferenceRequest
        {
            TaskId = Guid.NewGuid(),
            Timestamp = DateTime.UtcNow,
            RawData = data
            // ... other metadata
        };
        await _publisher.PublishTaskAsync(request);
    }
}

The key points here:

  1. Connection management: a singleton manages the IConnection, avoiding the cost of opening a new TCP connection for every request.
  2. Channel management: IModel (the channel) is lightweight and should be created on demand and disposed promptly. It is not thread-safe.
  3. Durability: the exchange, the queue, and the messages are all declared durable/persistent, so tasks survive a RabbitMQ restart. This is a baseline requirement for production.

The Clojure High-Performance Preprocessing Service

For the Clojure service we chose http-kit to build a lightweight HTTP server. The core is purely functional data-transformation logic.

project.clj:

(defproject preprocessor "0.1.0-SNAPSHOT"
  :description "High-performance data preprocessor"
  :dependencies [[org.clojure/clojure "1.11.1"]
                 [http-kit "2.6.0"]
                 [cheshire "5.11.0"] ; For JSON parsing/generation
                 [org.clojure/tools.logging "1.2.4"]]
  :main ^:skip-aot preprocessor.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})

src/preprocessor/core.clj:

(ns preprocessor.core
  (:require [org.httpkit.server :as server]
            [cheshire.core :as json]
            [clojure.tools.logging :as log])
  (:gen-class))

(defn- complex-transformation
  "A placeholder for the actual complex, CPU-bound transformation logic.
  This is where Clojure's strengths in data manipulation shine.
  It's a pure function: given the same input, it always returns the same output."
  [data]
  (log/info "Performing transformation for transaction:" (:transactionId data))
  ;; Example: Flatten nested structures, calculate derived fields, one-hot encode categories.
  ;; Using `pmap` for parallelizing parts of the transformation can be a huge win.
  (let [transactions (:transactions data)
        enriched-transactions (pmap
                                (fn [tx]
                                  (assoc tx
                                         :risk_score (rand-int 100) ; Placeholder
                                         :is_high_value (> (:amount tx) 10000)))
                                transactions)]
    {:user_id (:userId data)
     :feature_vector (->> enriched-transactions
                          (map vals)
                          flatten
                          vec)}))

(defn- handle-request
  "HTTP request handler. Serves a trivial /healthz route for liveness probes;
  every other request is treated as a preprocessing call."
  [request]
  (if (= (:uri request) "/healthz")
    {:status 200 :headers {"Content-Type" "text/plain"} :body "ok"}
    (try
      (let [body (-> request :body slurp (json/parse-string true))]
        (log/info "Received request for user:" (:userId body))
        {:status 200
         :headers {"Content-Type" "application/json"}
         :body (json/generate-string (complex-transformation body))})
      (catch Exception e
        (log/error e "Error processing request")
        {:status 500
         :headers {"Content-Type" "application/json"}
         :body (json/generate-string {:error "Internal Server Error"})}))))

(defn -main [& args]
  (let [port (Integer/parseInt (or (System/getenv "APP_PORT") "8080"))]
    (log/info (str "Starting server on port " port))
    (server/run-server handle-request {:port port})))
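
Before containerizing, it is worth smoke-testing the endpoint. A minimal check from the Python side might look like this; the payload is a hypothetical sample consistent with the handler above, which expects userId and transactions keys:

# Minimal smoke test against a locally running preprocessor.
import requests

sample = {
    "userId": "u-1001",
    "transactions": [
        {"amount": 12500.0, "currency": "USD"},
        {"amount": 80.0, "currency": "USD"},
    ],
}
resp = requests.post("http://localhost:8080/preprocess", json=sample, timeout=5)
resp.raise_for_status()
print(resp.json())  # e.g. {"user_id": "u-1001", "feature_vector": [...]}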

Dockerfile for the Clojure service:

# Build stage: the official Clojure image ships with Leiningen
# (a JRE-only image cannot run `lein`); the JDK version is pinned
# to match the JRE in the runtime stage below.
FROM clojure:temurin-11-lein AS build

WORKDIR /app

# First, copy project.clj and fetch dependencies to leverage Docker layer caching
COPY project.clj .
RUN lein deps

# Now, copy the source and build the uberjar
COPY src/ /app/src
RUN lein uberjar

# Runtime stage: a plain JRE is all that's needed to run the standalone jar
FROM eclipse-temurin:11-jre

WORKDIR /app
COPY --from=build /app/target/uberjar/preprocessor-0.1.0-SNAPSHOT-standalone.jar app.jar

# Expose the port the app runs on
EXPOSE 8080

# Set the entrypoint
CMD ["java", "-jar", "app.jar"]

Implementing the Python Celery Worker

The Celery worker is the hub of the whole pipeline. It consumes tasks, delegates preprocessing, and runs model inference.

celery_app/tasks.py:

import os
import logging
import requests
import torch
from celery import Celery
from celery.exceptions import Reject
from requests.exceptions import RequestException

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Configuration ---
# In a real app, use environment variables for all configs.
RABBITMQ_BROKER_URL = os.getenv('RABBITMQ_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
PREPROCESSOR_URL = os.getenv('PREPROCESSOR_URL', 'http://localhost:8080/preprocess')
RESULT_EXCHANGE = 'inference_exchange'
RESULT_ROUTING_KEY = 'inference.result.financial'

# --- Celery App Initialization ---
# The backend is not strictly necessary if results are published back to RabbitMQ,
# but can be useful for state tracking. Redis is a common choice.
celery_app = Celery(
    'inference_worker',
    broker=RABBITMQ_BROKER_URL,
    backend='rpc://' # Using RPC result backend as an example
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    task_track_started=True,
    # Define the queue for our tasks
    task_queues={
        'financial_inference_tasks': {
            'exchange': 'inference_exchange',
            'routing_key': 'inference.task.financial',
        }
    },
    task_default_queue='financial_inference_tasks'
)

# --- Mock Model Loading ---
# In production, this would be a more sophisticated model loading and versioning mechanism.
# The model should be loaded ONCE when the worker starts, not per task.
try:
    # model = torch.load('path/to/your/model.pth')
    # model.eval()
    logger.info("Mock model loaded successfully.")
    model = lambda x: {"prediction": "low_risk", "confidence": 0.95} # Placeholder
except Exception as e:
    logger.error(f"Failed to load model: {e}")
    model = None

# --- Celery Task Definition ---
@celery_app.task(
    bind=True,
    name='tasks.process_inference_request',
    # Automatic retry for transient errors like network issues
    autoretry_for=(RequestException, ConnectionError),
    retry_kwargs={'max_retries': 3, 'countdown': 5} # Retry 3 times, with a 5s delay
)
def process_inference_request(self, task_data: dict):
    """
    The main task that orchestrates the inference pipeline.
    """
    task_id = task_data.get('TaskId', 'N/A')
    logger.info(f"[{task_id}] Received task. Starting preprocessing.")

    if not model:
        logger.error(f"[{task_id}] Model is not loaded. Rejecting task.")
        # Reject the task and requeue it if it's a transient issue.
        # Here, it's a permanent failure, so don't requeue.
        raise Reject('Model not loaded', requeue=False)

    # Step 1: Call Clojure preprocessor
    try:
        # Use a session for connection pooling
        with requests.Session() as session:
            response = session.post(PREPROCESSOR_URL, json=task_data['RawData'], timeout=10)
            response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
            processed_features = response.json()
        logger.info(f"[{task_id}] Preprocessing successful.")
    except RequestException as e:
        logger.error(f"[{task_id}] Failed to call preprocessor: {e}. Retrying task...")
        # This will trigger the automatic retry mechanism of Celery
        raise self.retry(exc=e)
    except Exception as e:
        logger.error(f"[{task_id}] An unexpected error occurred during preprocessing: {e}")
        # For non-transient errors, don't requeue.
        # Here we could send it to a dead-letter queue.
        raise Reject('Preprocessing failed permanently', requeue=False)


    # Step 2: Run model inference
    try:
        # Assuming the model expects a tensor
        # feature_tensor = torch.tensor(processed_features['feature_vector'], dtype=torch.float32)
        # with torch.no_grad():
        #     prediction = model(feature_tensor)
        prediction = model(processed_features['feature_vector']) # Using mock model
        logger.info(f"[{task_id}] Inference successful.")
    except Exception as e:
        logger.error(f"[{task_id}] Model inference failed: {e}")
        raise Reject('Inference failed', requeue=False)

    # Step 3: Publish result back to RabbitMQ
    result_payload = {
        'taskId': task_id,
        'status': 'SUCCESS',
        'result': prediction,
        'original_request': task_data
    }
    try:
        with self.app.producer_pool.acquire(block=True) as producer:
            producer.publish(
                result_payload,
                serializer='json',
                exchange=RESULT_EXCHANGE,
                routing_key=RESULT_ROUTING_KEY,
                retry=True,
                retry_policy={
                    'interval_start': 0,
                    'interval_step': 2,
                    'interval_max': 30,
                    'max_retries': 3,
                }
            )
        logger.info(f"[{task_id}] Result published successfully.")
    except Exception as e:
        # If publishing the result fails, the task will still be marked as successful
        # by Celery. This is a critical point. A more robust solution might use a two-phase commit
        # or a transactional outbox pattern.
        logger.error(f"[{task_id}] CRITICAL: Task completed but failed to publish result: {e}")

    return result_payload # This goes to the Celery backend

This worker implementation bakes in production-grade concerns:

  • Error handling: transient, retryable errors (such as network hiccups) are explicitly distinguished from permanent, non-retryable ones (such as malformed data). autoretry_for and Reject are the right tools for those two cases.
  • Configuration: every setting is read from environment variables, following twelve-factor principles.
  • Resource management: the model is loaded once at worker startup, avoiding per-task loading overhead. HTTP requests go through a Session so TCP connections are reused.
  • Result publishing: results are published back to RabbitMQ rather than relying solely on Celery's result backend. This keeps the system decoupled: any service that needs the results can subscribe to that queue, as the consumer sketch after this list shows.
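
As a sketch of such a downstream subscriber, here is a minimal consumer built on kombu (the messaging library Celery itself uses). The exchange and routing key match the worker configuration above; the queue name financial_inference_results and the handler body are assumptions for illustration:

# A minimal result-consumer sketch using kombu.
from kombu import Connection, Exchange, Queue

result_exchange = Exchange('inference_exchange', type='direct', durable=True)
result_queue = Queue(
    'financial_inference_results',  # assumed name for the result queue
    exchange=result_exchange,
    routing_key='inference.result.financial',
    durable=True,
)

def on_result(body, message):
    # Placeholder: persist the prediction, notify the C# side, etc.
    print(f"task {body.get('taskId')} -> {body.get('result')}")
    message.ack()

with Connection('amqp://user:password@rabbitmq:5672//') as conn:
    with conn.Consumer(result_queue, callbacks=[on_result], accept=['json']):
        while True:
            conn.drain_events()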

Kubernetes Deployment Manifests

Finally, Kubernetes glues these heterogeneous services together.

k8s/rabbitmq-statefulset.yaml:

apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
    - name: amqp
      port: 5672
      targetPort: 5672
    - name: management
      port: 15672
      targetPort: 15672
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: "rabbitmq"
  replicas: 1 # For production, consider a clustered setup
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
      - name: rabbitmq
        image: rabbitmq:3.11-management
        ports:
        - containerPort: 5672
        - containerPort: 15672
        env:
        - name: RABBITMQ_DEFAULT_USER
          value: "user"
        - name: RABBITMQ_DEFAULT_PASS
          value: "password" # Use secrets in production

k8s/deployments.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
data:
  RABBITMQ_BROKER_URL: "amqp://user:password@rabbitmq:5672//"
  PREPROCESSOR_URL: "http://clojure-preprocessor:8080/preprocess"
---
apiVersion: v1
kind: Service
metadata:
  name: clojure-preprocessor
spec:
  selector:
    app: clojure-preprocessor
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clojure-preprocessor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: clojure-preprocessor
  template:
    metadata:
      labels:
        app: clojure-preprocessor
    spec:
      containers:
      - name: preprocessor
        image: your-repo/clojure-preprocessor:latest
        ports:
        - containerPort: 8080
        env:
          - name: APP_PORT
            value: "8080"
        resources:
          requests:
            cpu: "250m"
            memory: "512Mi"
          limits:
            cpu: "1"
            memory: "1Gi"
        livenessProbe:
          httpGet:
            path: /healthz # Served by the health route in the Clojure handler above
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 20
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
spec:
  replicas: 3 # Can be scaled independently
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      containers:
      - name: worker
        image: your-repo/celery-worker:latest
        command: ["celery", "-A", "celery_app.tasks", "worker", "-l", "info", "-Q", "financial_inference_tasks", "-c", "4"]
        envFrom:
        - configMapRef:
            name: app-config
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "2"
            memory: "4Gi" # ML models can be memory hungry
# C# Service deployment would be similar...

The key points of this configuration:

  1. Service discovery: services reach one another through Kubernetes' built-in DNS names (rabbitmq, clojure-preprocessor).
  2. Configuration management: a ConfigMap externalizes configuration and injects it into the worker's environment variables.
  3. Independent scaling: clojure-preprocessor and celery-worker are separate Deployments, so replica counts and resource limits can be tuned to each one's load profile (CPU-bound transformation vs. waiting on ML inference). A sketch of reading the queue depth that would drive worker scaling follows this list.
  4. Health checks: the livenessProbe lets Kubernetes automatically restart a Pod that stops responding, improving the system's capacity to self-heal.
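
On point 3: for the workers, the natural scaling signal is queue depth rather than CPU. As a hedged sketch of how that signal can be read (it is the same metric KEDA's RabbitMQ scaler consumes), here is a query against the RabbitMQ management API; credentials and names match the manifests above, and %2F is the URL-encoded default vhost "/":

# Read the backlog of the task queue via the RabbitMQ management plugin.
import requests

MGMT_URL = "http://rabbitmq:15672/api/queues/%2F/financial_inference_tasks"

resp = requests.get(MGMT_URL, auth=("user", "password"), timeout=5)
resp.raise_for_status()
depth = resp.json()["messages"]  # ready + unacknowledged messages
print(f"queue depth: {depth}")   # input for a scaling decision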

Limitations and Future Work

This architecture solved our original performance and stability problems, but it is no silver bullet. The current bottleneck is the Celery worker's synchronous HTTP call into the Clojure service, which burns a great deal of worker time waiting under high concurrency. One clear optimization path is to turn the Clojure service into a RabbitMQ consumer and producer as well, making the pipeline fully asynchronous and event-driven.

Beyond that, distributed tracing across three languages becomes essential. We can pass trace IDs by hand in message headers, but adopting a standard such as OpenTelemetry, giving the whole call chain a unified observability view, is the next key step for maintainability. Defining fine-grained SLIs/SLOs per component, and using KEDA (Kubernetes Event-driven Autoscaling) to scale more intelligently on RabbitMQ queue length, are also on our roadmap.

