构建面向 Ray 作业的声明式提交通道 集成 Ant Design 与 Argo CD 的 GitOps 实践


团队的算法工程师们开始频繁抱怨提交 Ray 分布式任务的流程。最初,我们只是提供了标准的 KubeRay Operator YAML 模板,让他们手动修改并使用 kubectl apply 提交。这种方式在实验阶段尚可接受,但随着业务规模扩大,问题暴露无遗:

  1. 认知负加载体: 并非所有算法人员都精通 Kubernetes,他们需要理解 RayJobRayCluster 的 spec、资源请求(requests/limits)、节点亲和性等,这偏离了他们的核心工作。
  2. 配置蔓延与不一致: 每个人的本地机器上都散落着各种版本的 YAML 文件,latest 标签的滥用导致了难以复现的“在我这里是好的”问题。
  3. 缺乏审计与管控: 谁、在何时、提交了什么参数的任务,完全没有记录。资源误用(例如申请过多 GPU)也难以追溯。

我的第一反应是构建一个简单的后端服务,提供一个 API 接口,前端页面调用这个接口,后端直接执行 kubectl 命令。这个方案很快被我否决了。这种命令式的、由服务端直接操作集群状态的方式,本质上只是将 kubectl 封装了一层,它脆弱、难以审计,是通往运维噩梦的捷径。我们需要一个声明式的、可追溯的、与 Kubernetes 生态融为一体的方案。这自然而然地导向了 GitOps。

初步构想:一个 GitOps 驱动的提交通道

我们的目标是创建一个内部开发者平台(IDP)的雏形,让用户通过一个友好的 Web 界面来定义和提交 Ray 任务,而系统的核心驱动力是 Git。

整个工作流被设计成这样:

  1. 用户接口 (UI): 使用 Ant Design 构建一个清晰的表单,用户在此填写任务名称、执行脚本、计算资源(worker 数量、CPU、内存、GPU)等核心参数。
  2. 提交网关 (Gateway): UI 提交的不再是直接执行的命令,而是一个结构化的数据。一个后端服务(网关)接收这些数据。
  3. 声明式物化: 网关的核心职责不是调用 kubectl,而是根据接收到的参数,渲染出一个标准的 RayJob Kubernetes YAML 清单。
  4. GitOps 核心: 网关将生成的 YAML 文件作为一个新的 commit 推送到一个专门用于存放 Kubernetes 配置的 Git 仓库中。
  5. 持续交付: Argo CD 持续监控该 Git 仓库。一旦检测到新的 commit,它会自动将新的 RayJob 同步到 Kubernetes 集群中。
  6. 任务执行: KubeRay Operator 在集群中监听到新的 RayJob 资源后,会创建相应的 RayCluster 并执行任务。

这个架构的优势是显而易见的:

  • 单一可信源: Git 仓库成为所有任务的唯一真实来源。
  • 完全可审计: Git 的提交历史记录了每一次任务的变更。
  • 声明式与幂等: Argo CD 保证了集群的状态始终与 Git 仓库中声明的状态一致。
  • 关注点分离: 算法工程师专注于业务逻辑参数,平台工程师专注于底层基础设施的稳定与自动化。
sequenceDiagram
    participant User as 用户
    participant Frontend as Ant Design UI
    participant Gateway as 提交网关 (Python/Flask)
    participant GitRepo as 配置 Git 仓库
    participant ArgoCD as Argo CD
    participant K8s as Kubernetes API Server
    participant KubeRay as KubeRay Operator

    User->>Frontend: 填写 Ray 任务表单并点击提交
    Frontend->>Gateway: 发送 POST 请求 (包含表单 JSON)
    Gateway->>Gateway: 验证参数并渲染 RayJob YAML 模板
    Gateway->>GitRepo: Clone/Pull, 写入新 YAML, Commit & Push
    Note right of GitRepo: 触发 Webhook (可选)
    ArgoCD->>GitRepo: 定期 Fetch, 发现新的 Commit
    ArgoCD->>K8s: Sync 操作, Apply RayJob manifest
    K8s-->>KubeRay: 通知 RayJob 资源变更
    KubeRay->>K8s: 创建 RayCluster Pods
    KubeRay->>K8s: 创建 RayJob Pod (执行用户脚本)

技术选型与环境准备

  • Kubernetes 集群: 一个已经正常运行的集群。
  • Argo CD: 已安装并配置好,指向我们的应用配置仓库。
  • KubeRay Operator: 已通过 Helm 或 YAML 安装在集群中。
  • 配置仓库 (Git): 一个专门用于存放 RayJob YAML 的 Git 仓库。
  • 提交网关: 选择 Python + Flask,因为它在数据科学领域生态良好,处理 YAML 和 Git 操作的库很成熟。
  • 前端: React + Ant Design,用于快速构建高质量的内部工具界面。

在 Argo CD 中,我们采用 App of Apps 模式。一个顶层的 Application 负责同步我们所有 Ray 任务的目录。

# argocd/apps/ray-jobs-app.yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: ray-jobs
  namespace: argocd
  finalizers:
    - resources-finalizer.argocd.argoproj.io
spec:
  project: default
  source:
    repoURL: 'https://github.com/your-org/k8s-ray-jobs.git' # 存放 RayJob 的仓库
    targetRevision: HEAD
    path: jobs # 所有 RayJob YAML 存放在此目录下
  destination:
    server: https://kubernetes.default.svc
    namespace: ray-jobs # 任务运行的命名空间
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
    - CreateNamespace=true

这个配置告诉 Argo CD 去监控 k8s-ray-jobs.git 仓库的 jobs/ 目录,并将其中的所有 YAML 文件同步到集群的 ray-jobs 命名空间。

核心实现:从表单到 Git Commit

1. 前端交互界面 (Ant Design)

前端的核心是一个表单,它必须直观地捕获创建 Ray 任务所需的所有关键信息。在真实项目中,这个表单会复杂得多,可能包括挂载存储卷、设置环境变量、选择镜像等。这里我们简化为一个核心版本。

src/components/RayJobForm.tsx

import React, { useState } from 'react';
import { Form, Input, Button, Slider, InputNumber, message, Spin } from 'antd';
import axios from 'axios';

const RayJobForm: React.FC = () => {
    const [form] = Form.useForm();
    const [loading, setLoading] = useState(false);

    const onFinish = async (values: any) => {
        setLoading(true);
        try {
            // 后端网关的地址
            const gatewayEndpoint = '/api/v1/submit-ray-job';
            
            // 构造请求体
            const payload = {
                jobName: values.jobName.trim(),
                entrypoint: values.entrypoint.trim(),
                numWorkers: values.numWorkers,
                cpuPerWorker: values.cpuPerWorker,
                // 其他参数...
            };

            // 在真实项目中,你需要处理认证和授权
            await axios.post(gatewayEndpoint, payload);

            message.success(`任务 '${payload.jobName}' 已成功提交到 GitOps 流程,Argo CD 将在几分钟内开始部署。`);
            form.resetFields();
        } catch (error: any) {
            console.error("Job submission failed:", error);
            const errorMessage = error.response?.data?.error || '任务提交失败,请检查网关服务日志。';
            message.error(errorMessage);
        } finally {
            setLoading(false);
        }
    };

    return (
        <Spin spinning={loading} tip="正在提交任务至 Git 仓库...">
            <Form
                form={form}
                layout="vertical"
                onFinish={onFinish}
                initialValues={{ numWorkers: 2, cpuPerWorker: 0.5 }}
                style={{ maxWidth: 600, margin: 'auto', padding: '24px', border: '1px solid #f0f0f0', borderRadius: '8px' }}
            >
                <Form.Item
                    name="jobName"
                    label="任务名称"
                    rules={[
                        { required: true, message: '请输入任务名称' },
                        { pattern: /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/, message: '名称必须符合 Kubernetes 命名规范 (小写字母, 数字, -)' }
                    ]}
                >
                    <Input placeholder="例如: my-model-training-job-v1" />
                </Form.Item>

                <Form.Item
                    name="entrypoint"
                    label="执行入口命令"
                    rules={[{ required: true, message: '请输入执行命令' }]}
                >
                    <Input.TextArea rows={2} placeholder="例如: python /path/to/your/script.py --data /data" />
                </Form.Item>
                
                <Form.Item label="Worker 节点数量">
                    <Form.Item name="numWorkers" noStyle>
                        <InputNumber min={1} max={16} />
                    </Form.Item>
                    <span className="ant-form-text"> 个节点</span>
                </Form.Item>

                <Form.Item label="每个 Worker 的 CPU 核数" help="建议设置为 0.5 到 2 之间的小数值">
                     <Form.Item name="cpuPerWorker" noStyle>
                        <InputNumber min={0.1} max={8} step={0.1} />
                    </Form.Item>
                     <span className="ant-form-text"></span>
                </Form.Item>
                
                {/* 可以在这里添加内存、GPU等更多资源选项 */}

                <Form.Item>
                    <Button type="primary" htmlType="submit" block>
                        通过 GitOps 提交任务
                    </Button>
                </Form.Item>
            </Form>
        </Spin>
    );
};

export default RayJobForm;

这里的重点是 onFinish 函数。它不执行任何魔法,只是将表单数据打包成 JSON,然后 POST 到我们的后端网关。错误处理很重要,需要给用户明确的反馈。

2. 后端提交网关 (Python/Flask & GitPython)

这是整个系统的粘合剂。它接收前端请求,执行 Git 操作。

gateway/app.py

import os
import uuid
import logging
from flask import Flask, request, jsonify
from jinja2 import Environment, FileSystemLoader
from git import Repo, Actor
from git.exc import GitCommandError
import threading

# --- 配置 ---
# 在生产环境中, 这些配置应该来自环境变量或配置文件
GIT_REPO_PATH = "/tmp/k8s-ray-jobs" # 本地克隆仓库的路径
GIT_REPO_URL = os.environ.get("GIT_REPO_URL", "https://your-user:[email protected]/your-org/k8s-ray-jobs.git")
GIT_BRANCH = "main"
RAY_JOBS_SUBDIR = "jobs"
TEMPLATE_DIR = "templates"
AUTHOR_NAME = "RayJob Gateway"
AUTHOR_EMAIL = "[email protected]"

# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask 应用 ---
app = Flask(__name__)
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))

# --- Git 操作锁 ---
# 这是一个关键点:防止多个请求同时对 Git 仓库进行写操作,导致冲突。
# 在生产级应用中,可能需要使用更健壮的分布式锁,例如 Redis。
git_lock = threading.Lock()

def _clone_or_pull_repo():
    """克隆或更新本地仓库"""
    if not os.path.exists(GIT_REPO_PATH):
        logging.info(f"Cloning repo from {GIT_REPO_URL} to {GIT_REPO_PATH}")
        Repo.clone_from(GIT_REPO_URL, GIT_REPO_PATH, branch=GIT_BRANCH)
    else:
        logging.info(f"Pulling latest changes for repo in {GIT_REPO_PATH}")
        repo = Repo(GIT_REPO_PATH)
        origin = repo.remotes.origin
        origin.pull()

@app.route("/api/v1/submit-ray-job", methods=["POST"])
def submit_ray_job():
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid JSON payload"}), 400

    # --- 参数校验 ---
    job_name = data.get("jobName")
    entrypoint = data.get("entrypoint")
    # ... 其他参数校验
    if not all([job_name, entrypoint]):
         return jsonify({"error": "Missing required fields: jobName, entrypoint"}), 400

    # 这里的坑在于,jobName 必须是 DNS-1123 兼容的,前端虽然做了校验,后端必须再次强制校验。
    # 这里省略了复杂的正则,但在生产代码中是必须的。

    # --- Git 操作 ---
    with git_lock:
        try:
            _clone_or_pull_repo()
            repo = Repo(GIT_REPO_PATH)

            # --- 渲染 YAML 模板 ---
            template = env.get_template('rayjob_template.yaml.j2')
            # 传递给模板的上下文
            context = {
                "job_name": job_name,
                "job_id": str(uuid.uuid4())[:8], # 添加一个唯一 ID 防止元数据冲突
                "entrypoint": entrypoint,
                "num_workers": data.get("numWorkers", 1),
                "cpu_per_worker": data.get("cpuPerWorker", 0.5),
                "head_cpu": 0.5, # 可以根据需要配置
                "ray_image": "rayproject/ray:2.7.0" # 镜像版本也应该可配置
            }
            rendered_yaml = template.render(context)
            
            # --- 写入文件并提交 ---
            # 文件名与 job name 保持一致,方便管理
            job_file_path = os.path.join(GIT_REPO_PATH, RAY_JOBS_SUBDIR, f"{job_name}.yaml")
            
            # 一个常见的错误是,如果用户提交同名任务,会覆盖旧文件。
            # 这符合 GitOps 的更新逻辑,但需要明确告知用户。
            if os.path.exists(job_file_path):
                logging.warning(f"Job file {job_file_path} already exists. Overwriting.")

            with open(job_file_path, "w") as f:
                f.write(rendered_yaml)
            
            repo.index.add([job_file_path])

            commit_message = f"feat(ray): Add/Update RayJob {job_name}"
            author = Actor(AUTHOR_NAME, AUTHOR_EMAIL)
            repo.index.commit(commit_message, author=author, committer=author)
            
            logging.info(f"Pushing commit '{commit_message}' to origin/{GIT_BRANCH}")
            origin = repo.remotes.origin
            origin.push()

        except GitCommandError as e:
            logging.error(f"Git operation failed: {e}")
            # 这里的错误处理很关键, 要返回给前端有意义的信息
            return jsonify({"error": f"Git command failed: {e.stderr}"}), 500
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e}")
            return jsonify({"error": "An internal server error occurred"}), 500

    return jsonify({"message": "Job submitted successfully", "jobName": job_name}), 201

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5001)

这个网关服务有几个关键点:

  • 配置管理: 敏感信息(如 Git Token)必须通过环境变量注入,而不是硬编码。
  • 线程锁: git_lock 至关重要。如果没有它,在高并发提交下,本地 Git 仓库的状态会损坏。对于多实例部署的网关,需要升级为分布式锁(如基于 Redis 的 Redlock)。
  • 错误处理:GitCommandError 的捕获可以向前端返回具体的失败原因(如认证失败、合并冲突等),这对于调试非常有价值。

3. RayJob YAML 模板 (Jinja2)

这是连接用户输入和 Kubernetes 资源的桥梁。

gateway/templates/rayjob_template.yaml.j2

apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
  name: {{ job_name }}
  # Argo CD 会自动管理 namespace,但在这里明确指定也是一种好习惯
  namespace: ray-jobs 
  labels:
    # 标签对于后续的监控和筛选至关重要
    submitter: "gitops-gateway"
    jobId: "{{ job_id }}"
spec:
  # suspend: false # 控制任务是否立即执行
  entrypoint: "{{ entrypoint }}"
  runtimeEnvYAML: |
    # 可以在这里定义 pip 依赖或环境变量
    pip:
      - pandas
      - scikit-learn
  
  # Ray 集群的配置
  rayClusterSpec:
    # Head 节点配置
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: {{ ray_image }}
              ports:
                - containerPort: 6379 # Ray GCS
                  name: gcs-server
                - containerPort: 8265 # Dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                requests:
                  cpu: "{{ head_cpu }}"
                  memory: "1Gi"
                limits:
                  cpu: "1"
                  memory: "2Gi"
    
    # Worker 节点配置
    workerGroupSpecs:
      - replicas: {{ num_workers }}
        minReplicas: {{ num_workers }}
        maxReplicas: {{ num_workers }}
        groupName: small-group # 组名
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: {{ ray_image }}
                lifecycle:
                  preStop:
                    exec:
                      command: ["/bin/sh","-c","ray stop"]
                resources:
                  requests:
                    cpu: "{{ cpu_per_worker }}"
                    memory: "1Gi"
                  limits:
                    cpu: "{{ cpu_per_worker|float + 0.5 }}" # 给予一些 buffer
                    memory: "2Gi"
  # 当任务完成后,集群的 TTL (秒)
  clusterSelector: {}
  shutdownAfterJobFinishes: true
  ttlSecondsAfterFinished: 600

闭环:状态反馈与未来迭代

目前,我们已经实现了一个单向的提交通道。用户提交任务后,只能去 Argo CD 的界面或者使用 kubectl 查看任务状态。这在生产环境中是不够的。一个完整的 IDP 需要形成闭环,将任务的状态反馈到提交界面上。

当前的方案存在一些局限性,也是下一步优化的方向:

  1. 状态反馈缺失: UI 无法实时展示任务是 Pending, Running, Succeeded 还是 Failed。实现这一点有几种路径:

    • 轮询 K8s API: 网关可以增加一个 /status 接口,接收 jobName,然后直接查询 K8s API Server 获取 RayJob 的状态。这是最直接但效率较低的方式。
    • 利用 Argo CD API: Argo CD 提供了丰富的 API,可以查询 Application 的同步状态和健康状况。网关可以通过查询 Argo CD 来间接获取任务状态。
    • 事件驱动: 这是一个更优雅的方案。可以部署一个 K8s 控制器(或使用像 Argo Events 这样的工具)来监听 RayJob 的状态变化,并将这些事件推送到一个消息队列(如 Kafka)或 WebSocket 服务,前端实时订阅这些状态更新。
  2. 日志查看: 用户需要一种简便的方式查看任务日志,而不是ssh到机器或者执行 kubectl logs。平台需要提供日志聚合和展示功能,可以将 Pod 日志通过 Fluentd/Logstash 收集到 Elasticsearch 中,并通过统一的界面展示。

  3. Git 冲突管理: 当前的 git_lock 只能解决单实例网关的并发问题。如果部署多个网关实例,需要分布式锁。更进一步,可以引入一个任务队列(如 Celery + Redis),所有 Git 操作都由一个单线程的 worker 来串行处理,从根本上消除冲突。

  4. 动态模板与参数化: 当前的 YAML 模板是固定的。一个更强大的平台应该允许用户上传或选择不同的任务模板,支持更复杂的参数,例如存储卷挂载、Secrets 注入、节点选择器等。这需要对模板渲染逻辑和前端表单进行更复杂的抽象设计。

尽管存在这些待办事项,但这个基于 Ant Design、Argo CD 和 Ray 的 GitOps 提交通道已经构建了一个坚实的基础。它将混乱的手动操作转变为一个自动化的、可审计的、声明式的流程,极大地提升了 MLOps 的效率和规范性,让算法工程师能重新聚焦于他们的模型和数据,而不是底层的 Kubernetes 细节。


  目录