基于BentoML、Iceberg与SSG构建不可变机器学习模型评估报告流水线


团队的模型性能追踪一度陷入混乱。评估指标散落在不同的Jupyter Notebook输出、CSV文件和临时的监控图表中。当需要回溯三个月前某个特定模型版本(比如 v0.7.1)为何在特定数据切片上表现优于当前版本(v1.2.0)时,整个过程就变成了一场灾难性的数据考古。我们需要的是一套具备版本控制、可审计且完全不可变的模型评估报告系统,而不是一个需要持续维护的动态Web服务。

最初的构想是搭建一个常规的Web应用,后端连接一个SQL数据库来存储所有评估结果。但这很快被否决。这种方案引入了状态管理、数据库维护和应用部署的复杂性。更重要的是,它无法保证报告的不可变性——数据库中的数据可能被无意修改,应用逻辑的变更也可能导致同一份原始数据呈现出不同的结果。我们追求的是一种更接近“代码即报告”的理念,每一份报告都应该是评估流程在某个时间点的精确快照,像一个Git commit一样永恒不变。

经过几轮讨论和原型验证,我们确定了一套非主流但极为契合我们目标的架构:利用BentoML标准化模型服务,将详尽的评估结果原子性地写入基于Apache Iceberg的数据湖,并最终通过静态站点生成器(SSG)生成完全独立的、可永久归档的HTML报告。

技术选型决策:为何是BentoML, Iceberg和SSG的组合

在真实项目中,每一个技术决策都源于对具体问题的权衡。

  1. BentoML:解耦模型与评估逻辑。 我们需要一个标准化的方式来调用任何模型,无论它是用TensorFlow、PyTorch还是Scikit-learn构建的。BentoML让我们能将模型及其依赖打包成一个标准的、可预测的REST API服务(一个Bento)。这使得我们的评估运行器(Evaluation Runner)无需关心模型的内部实现,只需向一个固定的HTTP端点发送请求。这种解耦极大地简化了整个流水线的复杂性。

  2. Apache Iceberg on Data Lake:保证评估结果的原子性和可追溯性。 简单地将评估结果(比如每个样本的预测值、真实值、置信度)写入S3上的Parquet文件是脆弱的。如果评估任务中途失败,我们可能会得到一个不完整或损坏的结果集。Iceberg通过其事务性提交机制解决了这个问题。一次完整的评估运行对应着对Iceberg表的一次原子性提交。这不仅保证了数据的完整性,其强大的时间旅行(Time-travel)能力也意味着我们可以精确查询到任何一次历史提交时的完整数据快照,这是实现“可审计”目标的核心。

  3. Static Site Generator (SSG):实现报告的不可变性与低成本。 动态仪表盘(如Grafana或自研Web应用)需要持续运行的服务和对数据源的实时查询,这既带来了成本,也引入了可变性。而SSG(我们选择了Hugo)则是在构建时一次性地从数据源(此场景下是我们从Iceberg查询并预处理好的JSON文件)拉取数据,并生成纯静态的HTML/CSS/JS文件。这些文件一旦生成便不再改变。每个模型版本的报告都可以被部署到S3、Cloudflare Pages这类廉价的对象存储上,访问速度快、安全性高,且维护成本几乎为零。这完美契合了我们对“不可变报告”的定义。

架构与工作流概览

整个流水线由事件驱动,以下是其核心流程的Mermaid图示:

graph TD
    subgraph "CI/CD Pipeline"
        A[Git Push: New model code or training config] --> B{Build & Test};
        B --> C[Package Model with BentoML];
        C --> D[Push Bento to Registry];
    end

    subgraph "Evaluation & Reporting Pipeline (e.g., Airflow DAG)"
        E[Trigger: New Bento available] --> F[Start Evaluation Runner];
        F -- 1. Load golden dataset --> F;
        F -- 2. Pull & run BentoML service --> G(BentoML Model Server);
        F -- 3. Send requests & collect results --> G;
        F -- 4. Batch results --> H[Write to Iceberg Table];
        H -- Atomic Commit on S3 Data Lake --> I{Apache Iceberg Table};
        H -- on success --> J[Trigger Report Generator];
        J -- 1. Query Iceberg Table for latest run --> I;
        J -- 2. Aggregate metrics & generate JSON --> K[report-data.json];
        J -- 3. Run SSG Build --> L[Hugo Build Process];
        L -- uses --> K;
        L --> M[Static HTML Report];
        M --> N[Deploy to Object Storage];
    end

    style G fill:#f9f,stroke:#333,stroke-width:2px
    style I fill:#ccf,stroke:#333,stroke-width:2px
    style N fill:#bbf,stroke:#333,stroke-width:2px

第一阶段:标准化模型服务 - BentoML实现

我们以一个简单的文本分类模型为例。关键在于定义清晰的输入/输出模式,并打包所有依赖。

service.py

import bentoml
import numpy as np
import pandas as pd
from bentoml.io import JSON, PandasDataFrame

# 假设我们有一个预训练好的模型,这里用一个tag来引用它
# 在真实的CI流程中,这个模型会被训练、保存并打上唯一的标签
MODEL_TAG = "text_classifier:latest" 

# 定义模型的runner
classifier_runner = bentoml.sklearn.get(MODEL_TAG).to_runner()

# 创建BentoML服务
# 这里的服务名和runner名会在后续的配置和调用中使用
svc = bentoml.Service(
    name="sentiment_analysis_service",
    runners=[classifier_runner]
)

# 定义API的输入和输出格式
# 使用PandasDataFrame可以方便地处理批量请求
INPUT_SPEC = PandasDataFrame.from_sample(pd.DataFrame({"text": ["some example text"]}))

@svc.api(input=INPUT_SPEC, output=JSON())
async def classify(input_df: pd.DataFrame) -> dict:
    """
    接收包含'text'列的DataFrame,返回一个包含预测结果和置信度的JSON。
    在生产环境中,这里应该包含更健壮的输入验证和错误处理。
    """
    try:
        # 实际的模型调用
        # `async_run` 适用于IO密集型或需要高并发的场景
        results = await classifier_runner.predict_proba.async_run(input_df["text"])
        
        # 结果处理
        predictions = np.argmax(results, axis=1)
        confidences = np.max(results, axis=1)
        
        # 转换为对客户端友好的格式
        output = {
            "predictions": predictions.tolist(),
            "confidences": confidences.tolist()
        }
        return output
    except Exception as e:
        # 简单的错误处理和日志记录
        # 在生产代码中,应使用结构化日志
        print(f"Error during prediction: {e}")
        # 返回一个明确的错误响应
        return {"error": "Prediction failed", "details": str(e)}

bentofile.yaml
这个文件是BentoML项目的核心,定义了服务的元数据、依赖和包含的模型。

service: "service:svc"
labels:
  owner: ml-platform-team
  stage: production
description: "Sentiment analysis model for production use."
include:
  - "*.py" # 包含所有Python文件
python:
  packages:
    - scikit-learn==1.2.2
    - pandas==1.5.3
    - numpy==1.24.2
models:
  - "text_classifier:latest" # 声明依赖的模型

在CI流程中,我们通过 bentoml build 命令来构建Bento,然后通过 bentoml push 将其推送到BentoML的中央仓库(如Yatai或自建的S3存储)。

第二阶段:原子化结果持久化 - 评估运行器与Iceberg

这是整个流水线的核心。评估运行器负责编排整个评估流程,并将结果可靠地写入数据湖。

import os
import uuid
import subprocess
import time
import requests
import pandas as pd
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from datetime import datetime

# --- 配置项 ---
# 在生产中,这些应该来自配置文件或环境变量
BENTO_TAG = os.getenv("BENTO_TAG", "sentiment_analysis_service:latest")
GOLDEN_DATASET_PATH = os.getenv("GOLDEN_DATASET_PATH", "s3a://ml-datasets/golden/sentiment-v2.parquet")
ICEBERG_CATALOG_URI = os.getenv("ICEBERG_CATALOG_URI", "http://rest-catalog:8181")
ICEBERG_WAREHOUSE = os.getenv("ICEBERG_WAREHOUSE", "s3a://ml-datalake/warehouse")
ICEBERG_TABLE_NAME = "ml_eval_results.sentiment_model_predictions"
MODEL_VERSION = os.getenv("MODEL_VERSION", "v1.2.0") # 这个版本号至关重要

class EvaluationRunner:
    def __init__(self, run_id: str):
        self.run_id = run_id
        self.bento_server_process = None
        self.spark = self._init_spark()

    def _init_spark(self) -> SparkSession:
        """初始化Spark Session以支持Iceberg"""
        return SparkSession.builder \
            .appName(f"MLEvaluationRun-{self.run_id}") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config(f"spark.sql.catalog.ml_eval_results", "org.apache.iceberg.spark.SparkCatalog") \
            .config(f"spark.sql.catalog.ml_eval_results.type", "rest") \
            .config(f"spark.sql.catalog.ml_eval_results.uri", ICEBERG_CATALOG_URI) \
            .config(f"spark.sql.catalog.ml_eval_results.warehouse", ICEBERG_WAREHOUSE) \
            .getOrCreate()

    def _start_bento_server(self):
        """在子进程中启动BentoML服务"""
        print(f"Starting BentoML server for {BENTO_TAG}...")
        # 使用bentoml serve-http-grpc可以同时暴露HTTP和gRPC,但这里只用HTTP
        self.bento_server_process = subprocess.Popen(
            ["bentoml", "serve", BENTO_TAG, "--production"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        # 等待服务启动。生产中需要更可靠的健康检查。
        time.sleep(15) 
        print("BentoML server started.")

    def _stop_bento_server(self):
        """停止BentoML服务"""
        if self.bento_server_process:
            self.bento_server_process.terminate()
            self.bento_server_process.wait()
            print("BentoML server stopped.")

    def run(self):
        try:
            self._start_bento_server()
            
            # 1. 加载黄金数据集
            golden_df = pd.read_parquet(GOLDEN_DATASET_PATH)
            
            # 2. 发起预测请求
            # 这里的批量大小需要根据模型和服务器资源进行调优
            response = requests.post(
                "http://127.0.0.1:3000/classify",
                headers={"content-type": "application/json"},
                data=golden_df[['text']].to_json(orient="split")
            ).json()

            # 3. 合并结果
            golden_df['prediction'] = response['predictions']
            golden_df['confidence'] = response['confidences']

            # 4. 转换为Spark DataFrame并写入Iceberg
            self._write_to_iceberg(golden_df)

        except Exception as e:
            print(f"Evaluation run {self.run_id} failed: {e}")
            # 这里的错误处理至关重要,确保不会有部分结果写入
            raise
        finally:
            self._stop_bento_server()

    def _write_to_iceberg(self, results_df: pd.DataFrame):
        """将评估结果原子性地写入Iceberg表"""
        print("Preparing data for Iceberg write...")
        spark_df = self.spark.createDataFrame(results_df)

        # 添加元数据列,这对于后续的查询和报告至关重要
        final_df = spark_df.withColumn("run_id", lit(self.run_id)) \
                           .withColumn("model_version", lit(MODEL_VERSION)) \
                           .withColumn("eval_timestamp", lit(datetime.utcnow()))
        
        print(f"Writing {final_df.count()} records to {ICEBERG_TABLE_NAME}...")
        
        # 这里的mode("append")和Iceberg的事务性保证了操作的原子性
        # 如果写入过程中Spark任务失败,Iceberg表不会有任何变更
        final_df.writeTo(ICEBERG_TABLE_NAME).append()

        print("Successfully wrote evaluation results to Iceberg.")

if __name__ == "__main__":
    # 每次运行都有一个唯一的ID
    current_run_id = str(uuid.uuid4())
    print(f"Starting evaluation run with ID: {current_run_id}")
    runner = EvaluationRunner(run_id=current_run_id)
    runner.run()

这个脚本的核心在于 _write_to_iceberg 方法。writeTo(...).append() 是一个原子操作。如果Spark集群中的任何一个task失败,整个写入事务都会回滚,数据湖中的表状态保持不变,从而避免了数据污染。

第三阶段:生成不可变报告 - SSG构建流程

当评估结果成功写入Iceberg后,下游的报告生成任务被触发。这个任务分为两步:查询和聚合数据,然后调用SSG构建站点。

report_generator.py

import json
import os
import subprocess
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, sum, count

# --- 配置项 ---
ICEBERG_CATALOG_URI = os.getenv("ICEBERG_CATALOG_URI", "http://rest-catalog:8181")
ICEBERG_WAREHOUSE = os.getenv("ICEBERG_WAREHOUSE", "s3a://ml-datalake/warehouse")
ICEBERG_TABLE_NAME = "ml_eval_results.sentiment_model_predictions"
# 在真实的流水线中,这个版本号会由上游任务传递过来
MODEL_VERSION_TO_REPORT = os.getenv("MODEL_VERSION", "v1.2.0") 
HUGO_PROJECT_PATH = "./hugo-report-site"
OUTPUT_DATA_PATH = os.path.join(HUGO_PROJECT_PATH, "data", "metrics.json")

class ReportGenerator:
    def __init__(self, model_version: str):
        self.model_version = model_version
        self.spark = self._init_spark()

    def _init_spark(self):
        # 初始化与评估运行器相同的Spark Session
        # ... (代码同上) ...
        return SparkSession.builder \
            .appName(f"ReportGenerator-{self.model_version}") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config(f"spark.sql.catalog.ml_eval_results", "org.apache.iceberg.spark.SparkCatalog") \
            .config(f"spark.sql.catalog.ml_eval_results.type", "rest") \
            .config(f"spark.sql.catalog.ml_eval_results.uri", ICEBERG_CATALOG_URI) \
            .config(f"spark.sql.catalog.ml_eval_results.warehouse", ICEBERG_WAREHOUSE) \
            .getOrCreate()
            
    def generate(self):
        print(f"Generating report for model version: {self.model_version}")
        
        # 1. 从Iceberg读取特定版本的数据
        try:
            results_df = self.spark.read.table(ICEBERG_TABLE_NAME) \
                .filter(col("model_version") == self.model_version)
            
            # 一个常见的错误是在这里加载全表数据,过滤必须下推
            # Iceberg的分区和元数据可以极大地加速这个查询
            if results_df.count() == 0:
                raise ValueError(f"No data found for model version {self.model_version}")

        except Exception as e:
            print(f"Failed to read data from Iceberg: {e}")
            raise

        # 2. 计算关键指标 (例如,准确率,混淆矩阵)
        metrics = self._calculate_metrics(results_df)

        # 3. 将指标写入Hugo项目中的data目录
        os.makedirs(os.path.dirname(OUTPUT_DATA_PATH), exist_ok=True)
        with open(OUTPUT_DATA_PATH, 'w') as f:
            json.dump(metrics, f, indent=2)
        print(f"Metrics written to {OUTPUT_DATA_PATH}")

        # 4. 调用Hugo构建静态站点
        self._build_static_site()

    def _calculate_metrics(self, df):
        """使用Spark计算性能指标"""
        total_count = df.count()
        
        correct_predictions = df.withColumn("is_correct", when(col("prediction") == col("ground_truth"), 1).otherwise(0)) \
                                .agg(sum("is_correct").alias("correct_count")).collect()[0]['correct_count']
        
        accuracy = (correct_predictions / total_count) if total_count > 0 else 0

        # ... 此处可以添加更复杂的指标计算,如Precision, Recall, F1, Confusion Matrix等
        
        return {
            "modelVersion": self.model_version,
            "evaluationTimestamp": datetime.utcnow().isoformat(),
            "totalSamples": total_count,
            "overallAccuracy": accuracy,
            # 可以在此添加更多聚合数据,供前端图表使用
        }
        
    def _build_static_site(self):
        """执行Hugo build命令"""
        print("Building static site with Hugo...")
        try:
            # 目标输出目录可以根据版本号来定
            output_dir = f"public/{self.model_version}"
            result = subprocess.run(
                ["hugo", "-s", HUGO_PROJECT_PATH, "-d", output_dir],
                check=True,
                capture_output=True,
                text=True
            )
            print("Hugo build successful.")
            print(result.stdout)
        except subprocess.CalledProcessError as e:
            print("Hugo build failed!")
            print(e.stderr)
            raise

if __name__ == "__main__":
    generator = ReportGenerator(model_version=MODEL_VERSION_TO_REPORT)
    generator.generate()

Hugo模板 (hugo-report-site/layouts/_default/single.html)
这个模板会读取 data/metrics.json 文件并渲染页面。

<!DOCTYPE html>
<html>
<head>
    <title>Model Evaluation Report - {{ .Site.Data.metrics.modelVersion }}</title>
    <!-- 引入图表库,例如Chart.js -->
</head>
<body>
    <h1>Model Evaluation Report</h1>
    <h2>Version: <strong>{{ .Site.Data.metrics.modelVersion }}</strong></h2>
    <p>Generated on: {{ .Site.Data.metrics.evaluationTimestamp }}</p>

    <div class="metrics-summary">
        <h3>Key Metrics</h3>
        <ul>
            <li><strong>Total Samples Evaluated:</strong> {{ .Site.Data.metrics.totalSamples }}</li>
            <li><strong>Overall Accuracy:</strong> {{ printf "%.4f" .Site.Data.metrics.overallAccuracy }}</li>
        </ul>
    </div>
    
    <!-- 此处可以利用JavaScript和.Site.Data.metrics中的数据渲染更复杂的图表 -->
    <canvas id="confusionMatrixChart"></canvas>

    <script>
        // JS代码读取 {{ .Site.Data.metrics | safeJS }} 来渲染图表
        const metrics = JSON.parse('{{ .Site.Data.metrics | safeJS }}');
        // ... Chart.js渲染逻辑 ...
    </script>
</body>
</html>

最终,public/v1.2.0/ 目录下会生成一套完整的、自包含的HTML报告。这个目录可以被直接上传到任何对象存储服务,并提供一个永久链接。

当前方案的局限性与未来展望

这套架构虽然解决了我们对不可变性和可审计性的核心诉求,但它并非万能。首先,它完全为离线批量评估设计,不适用于需要实时监控线上模型性能的场景。实时监控仍需依赖传统的Metrics/Logging/Tracing方案。

其次,随着模型版本数量的爆炸性增长,SSG的全量构建时间可能会成为瓶颈。一个可行的优化路径是,流水线只构建最新版本的报告,并将其部署到特定的版本目录下,而不是每次都重新生成所有历史报告。

最后,报告之间的对比分析目前还比较原始,需要用户手动打开两个版本的报告进行比较。未来的一个迭代方向是开发一个“对比报告生成器”,它可以接收两个模型版本号作为参数,从Iceberg中查询这两次运行的数据,然后生成一个专门用于并排比较的静态diff页面。这可以进一步提升模型迭代的分析效率。


  目录