We were facing a classic collision between technical debt and the need to innovate: a large, stable financial risk-control monolith built on C# and the .NET Framework needed to integrate a set of complex machine-learning models built on Python (PyTorch). The initial plan was to call the model service synchronously over a REST API, but load testing exposed fatal weaknesses: P99 inference latency was unacceptable, and any jitter in the model service directly blocked threads in the upstream .NET application, triggering cascading failures. We needed an architecture that was decoupled, asynchronous, and independently scalable.

That requirement pushed us down an unusual integration path. Our core system is .NET and cannot easily be displaced; model inference is Python's home turf; and one stretch of the data-preprocessing pipeline, involving deeply nested structural transformations under heavy concurrent load, was both slow and hard to maintain in Python. After several rounds of evaluation, we settled on a polyglot workflow composed of C#, Python/Celery, and Clojure, orchestrated end to end by Kubernetes.
## Architecture Pain Points and the Final Design
The initial idea was for C# to call a Python service, which in turn called another Python module for preprocessing. The problem was that preprocessing module: the financial transaction data it handles is structurally complex, full of conditional branches and recursive transformations, which is neither elegant in Python nor fast enough for our throughput requirements.

At that point, an engineer on the team with a functional-programming background proposed Clojure. Clojure runs on the JVM, has excellent concurrency support, and is built around immutable data, making it a near-perfect fit for pure data-transformation work. We ran a small PoC, rewrote the most complex preprocessing logic in Clojure, and saw close to a 4x performance improvement with 60% fewer lines of code.
The decision then became clear:

- C# Orchestrator Service: the .NET application initiates tasks, wraps the raw business data, and publishes it to the message queue.
- RabbitMQ: the decoupling buffer and task bus connecting all the heterogeneous services.
- Clojure Preprocessing Service: an independent microservice that receives raw data, performs high-performance data transformation and feature engineering, and exposes this over HTTP.
- Python Celery Worker: consumes tasks from RabbitMQ, calls the Clojure service for preprocessing, feeds the processed data into the PyTorch model for inference, and writes the result to another RabbitMQ queue.
- Kubernetes: the unified deployment and operations platform, managing every service's lifecycle, networking, configuration, and autoscaling.
The life cycle of the whole workflow looks like this:

```mermaid
sequenceDiagram
    participant CSharp as C# Orchestrator
    participant RabbitMQ
    participant CeleryWorker as Python Celery Worker
    participant ClojureService as Clojure Preprocessor
    participant MLModel as PyTorch Model
    CSharp->>+RabbitMQ: Publish(TaskRequest)
    RabbitMQ-->>-CeleryWorker: Consume(TaskRequest)
    CeleryWorker->>+ClojureService: HTTP POST /preprocess (raw_data)
    ClojureService-->>-CeleryWorker: Response (processed_features)
    CeleryWorker->>+MLModel: predict(processed_features)
    MLModel-->>-CeleryWorker: prediction_result
    CeleryWorker->>RabbitMQ: Publish(TaskResult)
    Note right of CSharp: C# Service can optionally<br/>consume results from another queue.
```
## Implementing the C# Task Publisher

Inside the .NET application, we need a robust RabbitMQ producer. It is not enough to simply send messages; we have to guarantee connection reliability, channel thread safety, and message persistence.
```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

// --- RabbitMQConnectionFactory.cs ---
// A singleton factory to manage a single, resilient connection.
// In a real project, use Polly for resilience policies (retry, circuit breaker).
public class RabbitMQConnectionFactory : IDisposable
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private static readonly object _lock = new object();
    private static RabbitMQConnectionFactory _instance;

    private RabbitMQConnectionFactory(string hostName, string userName, string password)
    {
        _factory = new ConnectionFactory()
        {
            HostName = hostName,
            UserName = userName,
            Password = password,
            DispatchConsumersAsync = true // Important for async consumers
        };
    }

    public static RabbitMQConnectionFactory GetInstance(string host, string user, string pass)
    {
        if (_instance == null)
        {
            lock (_lock)
            {
                if (_instance == null)
                {
                    _instance = new RabbitMQConnectionFactory(host, user, pass);
                }
            }
        }
        return _instance;
    }

    public IConnection GetConnection()
    {
        if (_connection == null || !_connection.IsOpen)
        {
            lock (_lock)
            {
                if (_connection == null || !_connection.IsOpen)
                {
                    // Basic retry logic. Production code should be more robust.
                    _connection = _factory.CreateConnection();
                }
            }
        }
        return _connection;
    }

    public void Dispose()
    {
        _connection?.Close();
        _connection?.Dispose();
    }
}

// --- InferenceTaskPublisher.cs ---
public class InferenceTaskPublisher : IInferenceTaskPublisher
{
    private readonly ILogger<InferenceTaskPublisher> _logger;
    private readonly RabbitMQConnectionFactory _connectionFactory;
    private const string ExchangeName = "inference_exchange";
    private const string RoutingKey = "inference.task.financial";
    private const string QueueName = "financial_inference_tasks";

    public InferenceTaskPublisher(ILogger<InferenceTaskPublisher> logger, RabbitMQConnectionFactory connectionFactory)
    {
        _logger = logger;
        _connectionFactory = connectionFactory;
        SetupRabbitMQ();
    }

    private void SetupRabbitMQ()
    {
        using (var channel = _connectionFactory.GetConnection().CreateModel())
        {
            // Declare a durable, direct exchange
            channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, durable: true);
            // Declare a durable queue
            channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            // Bind them
            channel.QueueBind(QueueName, ExchangeName, RoutingKey);
            _logger.LogInformation("RabbitMQ exchange '{Exchange}', queue '{Queue}', and binding are set up.", ExchangeName, QueueName);
        }
    }

    public Task PublishTaskAsync(InferenceRequest request)
    {
        try
        {
            // It's recommended to create a channel per thread/operation.
            // Channels are lightweight.
            using (var channel = _connectionFactory.GetConnection().CreateModel())
            {
                var messageBody = JsonSerializer.Serialize(request);
                var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // Mark messages as persistent
                properties.ContentType = "application/json";
                properties.MessageId = request.TaskId.ToString(); // Use a unique ID for tracing

                channel.BasicPublish(
                    exchange: ExchangeName,
                    routingKey: RoutingKey,
                    basicProperties: properties,
                    body: bodyBytes);
                _logger.LogInformation("Task {TaskId} published successfully.", request.TaskId);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish task {TaskId}.", request.TaskId);
            // In a real-world scenario, you might re-throw a custom exception
            // or implement a dead-lettering strategy.
            throw;
        }
        return Task.CompletedTask;
    }
}

// --- Example usage in a service ---
public class RiskAssessmentService
{
    private readonly IInferenceTaskPublisher _publisher;

    public RiskAssessmentService(IInferenceTaskPublisher publisher)
    {
        _publisher = publisher;
    }

    public async Task AssessRiskAsync(TransactionData data)
    {
        var request = new InferenceRequest
        {
            TaskId = Guid.NewGuid(),
            Timestamp = DateTime.UtcNow,
            RawData = data
            // ... other metadata
        };
        await _publisher.PublishTaskAsync(request);
    }
}
```
The key points here:

- Connection management: a singleton manages the `IConnection`, avoiding the cost of opening an expensive new TCP connection per request.
- Channel management: `IModel` (the channel) is lightweight and should be created on demand and disposed promptly. It is not thread-safe.
- Durability: the exchange, the queue, and the messages are all declared `durable`/`persistent`, so tasks survive a RabbitMQ restart. This is a baseline requirement for production.
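One integration detail is worth flagging: a Celery worker only dispatches messages that carry Celery's own protocol metadata, so a bare JSON body like the one above will sit in the queue without ever invoking the task. Below is a hedged sketch, based on our reading of Celery's documented message protocol v2, of how the body of `PublishTaskAsync` could be amended; verify the exact header set against the Celery version you deploy.

```csharp
// Sketch: publishing in Celery's message protocol v2 so the message is routed
// to the worker task defined later. Not verbatim from our codebase.
var taskId = request.TaskId.ToString();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.ContentType = "application/json";
properties.ContentEncoding = "utf-8";
properties.CorrelationId = taskId;
properties.Headers = new Dictionary<string, object>
{
    { "lang", "py" },
    { "task", "tasks.process_inference_request" }, // must match the Celery task name
    { "id", taskId },
    { "root_id", taskId },
    { "parent_id", null },
    { "group", null }
};

// Protocol v2 body: [args, kwargs, embed]. Our task takes task_data as its
// single positional argument.
var messageBody = JsonSerializer.Serialize(new object[]
{
    new object[] { request },             // args: (task_data,)
    new Dictionary<string, object>(),     // kwargs: {}
    new Dictionary<string, object>        // embed
    {
        { "callbacks", null },
        { "errbacks", null },
        { "chain", null },
        { "chord", null }
    }
});

channel.BasicPublish(
    exchange: ExchangeName,
    routingKey: RoutingKey,
    basicProperties: properties,
    body: Encoding.UTF8.GetBytes(messageBody));
```

The simpler alternative is to keep the plain JSON payload and run a thin Python shim that consumes it and calls `process_inference_request.delay()` itself; either way, the contract between the two sides has to be explicit.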
## The High-Performance Clojure Preprocessing Service
For the Clojure service, we chose `http-kit` to build a lightweight HTTP server. The core is purely functional data-transformation logic.

`project.clj`:

```clojure
(defproject preprocessor "0.1.0-SNAPSHOT"
  :description "High-performance data preprocessor"
  :dependencies [[org.clojure/clojure "1.11.1"]
                 [http-kit "2.6.0"]
                 [cheshire "5.11.0"] ; For JSON parsing/generation
                 [org.clojure/tools.logging "1.2.4"]]
  :main ^:skip-aot preprocessor.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})
```
`src/preprocessor/core.clj`:

```clojure
(ns preprocessor.core
  (:require [org.httpkit.server :as server]
            [cheshire.core :as json]
            [clojure.tools.logging :as log])
  (:gen-class))

(defn- complex-transformation
  "A placeholder for the actual complex, CPU-bound transformation logic.
   This is where Clojure's strengths in data manipulation shine.
   It's a pure function: given the same input, it always returns the same output."
  [data]
  (log/info "Performing transformation for transaction:" (:transactionId data))
  ;; Example: Flatten nested structures, calculate derived fields, one-hot encode categories.
  ;; Using `pmap` for parallelizing parts of the transformation can be a huge win.
  (let [transactions (:transactions data)
        enriched-transactions (pmap
                                (fn [tx]
                                  (assoc tx
                                         :risk_score (rand-int 100) ; Placeholder
                                         :is_high_value (> (:amount tx) 10000)))
                                transactions)]
    {:user_id (:userId data)
     :feature_vector (->> enriched-transactions
                          (map vals)
                          flatten
                          vec)}))

(defn- preprocess
  "Handler for POST /preprocess."
  [request]
  (try
    (let [body (-> request :body slurp (json/parse-string true))]
      (log/info "Received request for user:" (:userId body))
      {:status 200
       :headers {"Content-Type" "application/json"}
       :body (json/generate-string (complex-transformation body))})
    (catch Exception e
      (log/error e "Error processing request")
      {:status 500
       :headers {"Content-Type" "application/json"}
       :body (json/generate-string {:error "Internal Server Error"})})))

(defn- handle-request
  "A minimal router: /health for Kubernetes probes, /preprocess for the real work.
   Without the /health route, a probe hitting a bodyless GET would blow up in slurp
   and return 500, causing endless Pod restarts."
  [request]
  (case (:uri request)
    "/health" {:status 200
               :headers {"Content-Type" "application/json"}
               :body (json/generate-string {:status "ok"})}
    "/preprocess" (preprocess request)
    {:status 404
     :headers {"Content-Type" "application/json"}
     :body (json/generate-string {:error "Not Found"})}))

(defn -main [& args]
  (let [port (Integer/parseInt (or (System/getenv "APP_PORT") "8080"))]
    (log/info (str "Starting server on port " port))
    (server/run-server handle-request {:port port})))
```
The `Dockerfile` for the Clojure service. Note that Leiningen is not available in a plain JRE image, so we build in the official `clojure` image and run on a slim JRE:

```dockerfile
# Build stage: the official Clojure image ships with Leiningen
FROM clojure:temurin-11-lein AS build
WORKDIR /app
# First, copy project.clj and fetch dependencies to leverage Docker layer caching
COPY project.clj .
RUN lein deps
# Now, copy the source and build the uberjar
COPY src/ /app/src
RUN lein uberjar

# Runtime stage: a JRE is enough to run the standalone jar
FROM eclipse-temurin:11-jre
WORKDIR /app
COPY --from=build /app/target/uberjar/preprocessor-0.1.0-SNAPSHOT-standalone.jar app.jar
# Expose the port the app runs on
EXPOSE 8080
# Set the entrypoint
CMD ["java", "-jar", "app.jar"]
```
## Implementing the Python Celery Worker

The Celery worker is the hub of the whole pipeline: it consumes tasks, delegates preprocessing, and runs model inference.
`celery_app/tasks.py`:

```python
import os
import logging

import requests
import torch
from celery import Celery
from celery.exceptions import Reject
from requests.exceptions import RequestException

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Configuration ---
# In a real app, use environment variables for all configs.
RABBITMQ_BROKER_URL = os.getenv('RABBITMQ_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
PREPROCESSOR_URL = os.getenv('PREPROCESSOR_URL', 'http://localhost:8080/preprocess')
RESULT_EXCHANGE = 'inference_exchange'
RESULT_ROUTING_KEY = 'inference.result.financial'

# --- Celery App Initialization ---
# The backend is not strictly necessary if results are published back to RabbitMQ,
# but can be useful for state tracking. Redis is a common choice.
celery_app = Celery(
    'inference_worker',
    broker=RABBITMQ_BROKER_URL,
    backend='rpc://'  # Using RPC result backend as an example
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    task_track_started=True,
    # Ack only after the task finishes; Reject/requeue semantics depend on this
    task_acks_late=True,
    # Define the queue for our tasks
    task_queues={
        'financial_inference_tasks': {
            'exchange': 'inference_exchange',
            'routing_key': 'inference.task.financial',
        }
    },
    task_default_queue='financial_inference_tasks'
)

# --- Mock Model Loading ---
# In production, this would be a more sophisticated model loading and versioning mechanism.
# The model should be loaded ONCE when the worker starts, not per task.
try:
    # model = torch.load('path/to/your/model.pth')
    # model.eval()
    logger.info("Mock model loaded successfully.")
    model = lambda x: {"prediction": "low_risk", "confidence": 0.95}  # Placeholder
except Exception as e:
    logger.error(f"Failed to load model: {e}")
    model = None

# --- Celery Task Definition ---
@celery_app.task(
    bind=True,
    name='tasks.process_inference_request',
    # Automatic retry for transient errors like network issues
    autoretry_for=(RequestException, ConnectionError),
    retry_kwargs={'max_retries': 3, 'countdown': 5}  # Retry 3 times, with a 5s delay
)
def process_inference_request(self, task_data: dict):
    """
    The main task that orchestrates the inference pipeline.
    """
    task_id = task_data.get('TaskId', 'N/A')
    logger.info(f"[{task_id}] Received task. Starting preprocessing.")

    if not model:
        logger.error(f"[{task_id}] Model is not loaded. Rejecting task.")
        # Reject the task and requeue it if it's a transient issue.
        # Here, it's a permanent failure, so don't requeue.
        raise Reject('Model not loaded', requeue=False)

    # Step 1: Call Clojure preprocessor
    try:
        # Use a session for connection pooling
        with requests.Session() as session:
            response = session.post(PREPROCESSOR_URL, json=task_data['RawData'], timeout=10)
            response.raise_for_status()  # Raises HTTPError for bad responses (4xx or 5xx)
            processed_features = response.json()
        logger.info(f"[{task_id}] Preprocessing successful.")
    except RequestException as e:
        logger.error(f"[{task_id}] Failed to call preprocessor: {e}. Retrying task...")
        # This will trigger the automatic retry mechanism of Celery
        raise self.retry(exc=e)
    except Exception as e:
        logger.error(f"[{task_id}] An unexpected error occurred during preprocessing: {e}")
        # For non-transient errors, don't requeue.
        # Here we could send it to a dead-letter queue.
        raise Reject('Preprocessing failed permanently', requeue=False)

    # Step 2: Run model inference
    try:
        # Assuming the model expects a tensor
        # feature_tensor = torch.tensor(processed_features['feature_vector'], dtype=torch.float32)
        # with torch.no_grad():
        #     prediction = model(feature_tensor)
        prediction = model(processed_features['feature_vector'])  # Using mock model
        logger.info(f"[{task_id}] Inference successful.")
    except Exception as e:
        logger.error(f"[{task_id}] Model inference failed: {e}")
        raise Reject('Inference failed', requeue=False)

    # Step 3: Publish result back to RabbitMQ
    result_payload = {
        'taskId': task_id,
        'status': 'SUCCESS',
        'result': prediction,
        'original_request': task_data
    }
    try:
        with self.app.producer_pool.acquire(block=True) as producer:
            producer.publish(
                result_payload,
                serializer='json',
                exchange=RESULT_EXCHANGE,
                routing_key=RESULT_ROUTING_KEY,
                retry=True,
                retry_policy={
                    'interval_start': 0,
                    'interval_step': 2,
                    'interval_max': 30,
                    'max_retries': 3,
                }
            )
        logger.info(f"[{task_id}] Result published successfully.")
    except Exception as e:
        # If publishing the result fails, the task will still be marked as successful
        # by Celery. This is a critical point. A more robust solution might use a two-phase commit
        # or a transactional outbox pattern.
        logger.error(f"[{task_id}] CRITICAL: Task completed but failed to publish result: {e}")

    return result_payload  # This goes to the Celery backend
```
This worker bakes in several production-grade considerations:

- Error handling: retryable transient errors (such as network issues) are explicitly distinguished from permanent, non-retryable ones (such as malformed data). `autoretry_for` and `Reject` are the right tools for each case.
- Configuration: every setting is read from environment variables, following the twelve-factor app principles.
- Resource management: the model is loaded once at worker startup instead of per task, and HTTP requests go through a `Session` to reuse TCP connections.
- Result publishing: results are published back to RabbitMQ rather than relying solely on Celery's result backend. This keeps the system decoupled: any service that needs the results can subscribe to that queue, as the sketch below shows.
## Kubernetes Deployment Manifests

Finally, Kubernetes glues these heterogeneous services together.
`k8s/rabbitmq-statefulset.yaml`:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
spec:
  selector:
    app: rabbitmq
  ports:
    - name: amqp
      port: 5672
      targetPort: 5672
    - name: management
      port: 15672
      targetPort: 15672
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: "rabbitmq"
  replicas: 1 # For production, consider a clustered setup
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: rabbitmq:3.11-management
          ports:
            - containerPort: 5672
            - containerPort: 15672
          env:
            - name: RABBITMQ_DEFAULT_USER
              value: "user"
            - name: RABBITMQ_DEFAULT_PASS
              value: "password" # Use secrets in production
```
`k8s/deployments.yaml`:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: app-config
data:
  RABBITMQ_BROKER_URL: "amqp://user:password@rabbitmq:5672//"
  PREPROCESSOR_URL: "http://clojure-preprocessor:8080/preprocess"
---
apiVersion: v1
kind: Service
metadata:
  name: clojure-preprocessor
spec:
  selector:
    app: clojure-preprocessor
  ports:
    - protocol: TCP
      port: 8080
      targetPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clojure-preprocessor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: clojure-preprocessor
  template:
    metadata:
      labels:
        app: clojure-preprocessor
    spec:
      containers:
        - name: preprocessor
          image: your-repo/clojure-preprocessor:latest
          ports:
            - containerPort: 8080
          env:
            - name: APP_PORT
              value: "8080"
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              cpu: "1"
              memory: "1Gi"
          livenessProbe:
            httpGet:
              path: /health # Served by the router in core.clj above
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 20
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
spec:
  replicas: 3 # Can be scaled independently
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      containers:
        - name: worker
          image: your-repo/celery-worker:latest
          command: ["celery", "-A", "celery_app.tasks", "worker", "-l", "info", "-Q", "financial_inference_tasks", "-c", "4"]
          envFrom:
            - configMapRef:
                name: app-config
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "4Gi" # ML models can be memory hungry
# The C# service Deployment would be similar...
```
The highlights of this configuration:

- Service discovery: Kubernetes' built-in DNS (`rabbitmq`, `clojure-preprocessor`) lets the services reach one another by name.
- Configuration management: a `ConfigMap` externalizes configuration and injects it into the workers' environment variables.
- Independent scaling: `clojure-preprocessor` and `celery-worker` are separate `Deployment`s, so replica counts and resource limits can be tuned to each workload (CPU-bound transformation vs. waiting on ML inference).
- Health checks: the `livenessProbe` lets Kubernetes automatically restart unresponsive Pods, improving the system's ability to self-heal.
## Limitations and Future Work
This architecture solved the performance and stability problems we started with, but it is no silver bullet. The current bottleneck is the Celery worker's synchronous HTTP call to the Clojure service, which burns worker-process wait time under high concurrency. A clear optimization path is to turn the Clojure service into a RabbitMQ consumer and producer as well, making the pipeline fully asynchronous and event-driven.

Beyond that, distributed tracing across three languages becomes essential. We can pass trace IDs through message headers by hand, but adopting a standard such as OpenTelemetry, giving the entire call chain a unified observability view, is the next key step for maintainability. Defining fine-grained SLIs/SLOs per component and using KEDA (Kubernetes Event-driven Autoscaling) to scale more intelligently on RabbitMQ queue depth are also on our roadmap.
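Until that happens, the manual approach is small enough to show in full. A hypothetical sketch for the C# publisher, assuming `System.Diagnostics.Activity` carries the ambient trace:

```csharp
// Requires: using System.Diagnostics;
// Inside PublishTaskAsync, before BasicPublish: stamp the current trace id into
// the message headers. Downstream services can then copy the "trace_id" header
// onto their own outbound calls and log lines.
properties.Headers = properties.Headers ?? new Dictionary<string, object>();
properties.Headers["trace_id"] =
    Activity.Current?.TraceId.ToString() ?? Guid.NewGuid().ToString("N");
```

That hand-threading is precisely the plumbing OpenTelemetry automates, which is why it sits at the top of our list.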