团队的算法工程师们开始频繁抱怨提交 Ray 分布式任务的流程。最初,我们只是提供了标准的 KubeRay Operator YAML 模板,让他们手动修改并使用 kubectl apply
提交。这种方式在实验阶段尚可接受,但随着业务规模扩大,问题暴露无遗:
- 认知负加载体: 并非所有算法人员都精通 Kubernetes,他们需要理解
RayJob
、RayCluster
的 spec、资源请求(requests/limits)、节点亲和性等,这偏离了他们的核心工作。 - 配置蔓延与不一致: 每个人的本地机器上都散落着各种版本的 YAML 文件,
latest
标签的滥用导致了难以复现的“在我这里是好的”问题。 - 缺乏审计与管控: 谁、在何时、提交了什么参数的任务,完全没有记录。资源误用(例如申请过多 GPU)也难以追溯。
我的第一反应是构建一个简单的后端服务,提供一个 API 接口,前端页面调用这个接口,后端直接执行 kubectl
命令。这个方案很快被我否决了。这种命令式的、由服务端直接操作集群状态的方式,本质上只是将 kubectl
封装了一层,它脆弱、难以审计,是通往运维噩梦的捷径。我们需要一个声明式的、可追溯的、与 Kubernetes 生态融为一体的方案。这自然而然地导向了 GitOps。
初步构想:一个 GitOps 驱动的提交通道
我们的目标是创建一个内部开发者平台(IDP)的雏形,让用户通过一个友好的 Web 界面来定义和提交 Ray 任务,而系统的核心驱动力是 Git。
整个工作流被设计成这样:
- 用户接口 (UI): 使用 Ant Design 构建一个清晰的表单,用户在此填写任务名称、执行脚本、计算资源(worker 数量、CPU、内存、GPU)等核心参数。
- 提交网关 (Gateway): UI 提交的不再是直接执行的命令,而是一个结构化的数据。一个后端服务(网关)接收这些数据。
- 声明式物化: 网关的核心职责不是调用
kubectl
,而是根据接收到的参数,渲染出一个标准的RayJob
Kubernetes YAML 清单。 - GitOps 核心: 网关将生成的 YAML 文件作为一个新的 commit 推送到一个专门用于存放 Kubernetes 配置的 Git 仓库中。
- 持续交付: Argo CD 持续监控该 Git 仓库。一旦检测到新的 commit,它会自动将新的
RayJob
同步到 Kubernetes 集群中。 - 任务执行: KubeRay Operator 在集群中监听到新的
RayJob
资源后,会创建相应的RayCluster
并执行任务。
这个架构的优势是显而易见的:
- 单一可信源: Git 仓库成为所有任务的唯一真实来源。
- 完全可审计: Git 的提交历史记录了每一次任务的变更。
- 声明式与幂等: Argo CD 保证了集群的状态始终与 Git 仓库中声明的状态一致。
- 关注点分离: 算法工程师专注于业务逻辑参数,平台工程师专注于底层基础设施的稳定与自动化。
sequenceDiagram participant User as 用户 participant Frontend as Ant Design UI participant Gateway as 提交网关 (Python/Flask) participant GitRepo as 配置 Git 仓库 participant ArgoCD as Argo CD participant K8s as Kubernetes API Server participant KubeRay as KubeRay Operator User->>Frontend: 填写 Ray 任务表单并点击提交 Frontend->>Gateway: 发送 POST 请求 (包含表单 JSON) Gateway->>Gateway: 验证参数并渲染 RayJob YAML 模板 Gateway->>GitRepo: Clone/Pull, 写入新 YAML, Commit & Push Note right of GitRepo: 触发 Webhook (可选) ArgoCD->>GitRepo: 定期 Fetch, 发现新的 Commit ArgoCD->>K8s: Sync 操作, Apply RayJob manifest K8s-->>KubeRay: 通知 RayJob 资源变更 KubeRay->>K8s: 创建 RayCluster Pods KubeRay->>K8s: 创建 RayJob Pod (执行用户脚本)
技术选型与环境准备
- Kubernetes 集群: 一个已经正常运行的集群。
- Argo CD: 已安装并配置好,指向我们的应用配置仓库。
- KubeRay Operator: 已通过 Helm 或 YAML 安装在集群中。
- 配置仓库 (Git): 一个专门用于存放 RayJob YAML 的 Git 仓库。
- 提交网关: 选择 Python + Flask,因为它在数据科学领域生态良好,处理 YAML 和 Git 操作的库很成熟。
- 前端: React + Ant Design,用于快速构建高质量的内部工具界面。
在 Argo CD 中,我们采用 App of Apps 模式。一个顶层的 Application
负责同步我们所有 Ray 任务的目录。
# argocd/apps/ray-jobs-app.yaml
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: ray-jobs
namespace: argocd
finalizers:
- resources-finalizer.argocd.argoproj.io
spec:
project: default
source:
repoURL: 'https://github.com/your-org/k8s-ray-jobs.git' # 存放 RayJob 的仓库
targetRevision: HEAD
path: jobs # 所有 RayJob YAML 存放在此目录下
destination:
server: https://kubernetes.default.svc
namespace: ray-jobs # 任务运行的命名空间
syncPolicy:
automated:
prune: true
selfHeal: true
syncOptions:
- CreateNamespace=true
这个配置告诉 Argo CD 去监控 k8s-ray-jobs.git
仓库的 jobs/
目录,并将其中的所有 YAML 文件同步到集群的 ray-jobs
命名空间。
核心实现:从表单到 Git Commit
1. 前端交互界面 (Ant Design)
前端的核心是一个表单,它必须直观地捕获创建 Ray 任务所需的所有关键信息。在真实项目中,这个表单会复杂得多,可能包括挂载存储卷、设置环境变量、选择镜像等。这里我们简化为一个核心版本。
src/components/RayJobForm.tsx
import React, { useState } from 'react';
import { Form, Input, Button, Slider, InputNumber, message, Spin } from 'antd';
import axios from 'axios';
const RayJobForm: React.FC = () => {
const [form] = Form.useForm();
const [loading, setLoading] = useState(false);
const onFinish = async (values: any) => {
setLoading(true);
try {
// 后端网关的地址
const gatewayEndpoint = '/api/v1/submit-ray-job';
// 构造请求体
const payload = {
jobName: values.jobName.trim(),
entrypoint: values.entrypoint.trim(),
numWorkers: values.numWorkers,
cpuPerWorker: values.cpuPerWorker,
// 其他参数...
};
// 在真实项目中,你需要处理认证和授权
await axios.post(gatewayEndpoint, payload);
message.success(`任务 '${payload.jobName}' 已成功提交到 GitOps 流程,Argo CD 将在几分钟内开始部署。`);
form.resetFields();
} catch (error: any) {
console.error("Job submission failed:", error);
const errorMessage = error.response?.data?.error || '任务提交失败,请检查网关服务日志。';
message.error(errorMessage);
} finally {
setLoading(false);
}
};
return (
<Spin spinning={loading} tip="正在提交任务至 Git 仓库...">
<Form
form={form}
layout="vertical"
onFinish={onFinish}
initialValues={{ numWorkers: 2, cpuPerWorker: 0.5 }}
style={{ maxWidth: 600, margin: 'auto', padding: '24px', border: '1px solid #f0f0f0', borderRadius: '8px' }}
>
<Form.Item
name="jobName"
label="任务名称"
rules={[
{ required: true, message: '请输入任务名称' },
{ pattern: /^[a-z0-9]([-a-z0-9]*[a-z0-9])?$/, message: '名称必须符合 Kubernetes 命名规范 (小写字母, 数字, -)' }
]}
>
<Input placeholder="例如: my-model-training-job-v1" />
</Form.Item>
<Form.Item
name="entrypoint"
label="执行入口命令"
rules={[{ required: true, message: '请输入执行命令' }]}
>
<Input.TextArea rows={2} placeholder="例如: python /path/to/your/script.py --data /data" />
</Form.Item>
<Form.Item label="Worker 节点数量">
<Form.Item name="numWorkers" noStyle>
<InputNumber min={1} max={16} />
</Form.Item>
<span className="ant-form-text"> 个节点</span>
</Form.Item>
<Form.Item label="每个 Worker 的 CPU 核数" help="建议设置为 0.5 到 2 之间的小数值">
<Form.Item name="cpuPerWorker" noStyle>
<InputNumber min={0.1} max={8} step={0.1} />
</Form.Item>
<span className="ant-form-text"> 核</span>
</Form.Item>
{/* 可以在这里添加内存、GPU等更多资源选项 */}
<Form.Item>
<Button type="primary" htmlType="submit" block>
通过 GitOps 提交任务
</Button>
</Form.Item>
</Form>
</Spin>
);
};
export default RayJobForm;
这里的重点是 onFinish
函数。它不执行任何魔法,只是将表单数据打包成 JSON,然后 POST 到我们的后端网关。错误处理很重要,需要给用户明确的反馈。
2. 后端提交网关 (Python/Flask & GitPython)
这是整个系统的粘合剂。它接收前端请求,执行 Git 操作。
gateway/app.py
import os
import uuid
import logging
from flask import Flask, request, jsonify
from jinja2 import Environment, FileSystemLoader
from git import Repo, Actor
from git.exc import GitCommandError
import threading
# --- 配置 ---
# 在生产环境中, 这些配置应该来自环境变量或配置文件
GIT_REPO_PATH = "/tmp/k8s-ray-jobs" # 本地克隆仓库的路径
GIT_REPO_URL = os.environ.get("GIT_REPO_URL", "https://your-user:[email protected]/your-org/k8s-ray-jobs.git")
GIT_BRANCH = "main"
RAY_JOBS_SUBDIR = "jobs"
TEMPLATE_DIR = "templates"
AUTHOR_NAME = "RayJob Gateway"
AUTHOR_EMAIL = "[email protected]"
# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Flask 应用 ---
app = Flask(__name__)
env = Environment(loader=FileSystemLoader(TEMPLATE_DIR))
# --- Git 操作锁 ---
# 这是一个关键点:防止多个请求同时对 Git 仓库进行写操作,导致冲突。
# 在生产级应用中,可能需要使用更健壮的分布式锁,例如 Redis。
git_lock = threading.Lock()
def _clone_or_pull_repo():
"""克隆或更新本地仓库"""
if not os.path.exists(GIT_REPO_PATH):
logging.info(f"Cloning repo from {GIT_REPO_URL} to {GIT_REPO_PATH}")
Repo.clone_from(GIT_REPO_URL, GIT_REPO_PATH, branch=GIT_BRANCH)
else:
logging.info(f"Pulling latest changes for repo in {GIT_REPO_PATH}")
repo = Repo(GIT_REPO_PATH)
origin = repo.remotes.origin
origin.pull()
@app.route("/api/v1/submit-ray-job", methods=["POST"])
def submit_ray_job():
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON payload"}), 400
# --- 参数校验 ---
job_name = data.get("jobName")
entrypoint = data.get("entrypoint")
# ... 其他参数校验
if not all([job_name, entrypoint]):
return jsonify({"error": "Missing required fields: jobName, entrypoint"}), 400
# 这里的坑在于,jobName 必须是 DNS-1123 兼容的,前端虽然做了校验,后端必须再次强制校验。
# 这里省略了复杂的正则,但在生产代码中是必须的。
# --- Git 操作 ---
with git_lock:
try:
_clone_or_pull_repo()
repo = Repo(GIT_REPO_PATH)
# --- 渲染 YAML 模板 ---
template = env.get_template('rayjob_template.yaml.j2')
# 传递给模板的上下文
context = {
"job_name": job_name,
"job_id": str(uuid.uuid4())[:8], # 添加一个唯一 ID 防止元数据冲突
"entrypoint": entrypoint,
"num_workers": data.get("numWorkers", 1),
"cpu_per_worker": data.get("cpuPerWorker", 0.5),
"head_cpu": 0.5, # 可以根据需要配置
"ray_image": "rayproject/ray:2.7.0" # 镜像版本也应该可配置
}
rendered_yaml = template.render(context)
# --- 写入文件并提交 ---
# 文件名与 job name 保持一致,方便管理
job_file_path = os.path.join(GIT_REPO_PATH, RAY_JOBS_SUBDIR, f"{job_name}.yaml")
# 一个常见的错误是,如果用户提交同名任务,会覆盖旧文件。
# 这符合 GitOps 的更新逻辑,但需要明确告知用户。
if os.path.exists(job_file_path):
logging.warning(f"Job file {job_file_path} already exists. Overwriting.")
with open(job_file_path, "w") as f:
f.write(rendered_yaml)
repo.index.add([job_file_path])
commit_message = f"feat(ray): Add/Update RayJob {job_name}"
author = Actor(AUTHOR_NAME, AUTHOR_EMAIL)
repo.index.commit(commit_message, author=author, committer=author)
logging.info(f"Pushing commit '{commit_message}' to origin/{GIT_BRANCH}")
origin = repo.remotes.origin
origin.push()
except GitCommandError as e:
logging.error(f"Git operation failed: {e}")
# 这里的错误处理很关键, 要返回给前端有意义的信息
return jsonify({"error": f"Git command failed: {e.stderr}"}), 500
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
return jsonify({"error": "An internal server error occurred"}), 500
return jsonify({"message": "Job submitted successfully", "jobName": job_name}), 201
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)
这个网关服务有几个关键点:
- 配置管理: 敏感信息(如 Git Token)必须通过环境变量注入,而不是硬编码。
- 线程锁:
git_lock
至关重要。如果没有它,在高并发提交下,本地 Git 仓库的状态会损坏。对于多实例部署的网关,需要升级为分布式锁(如基于 Redis 的 Redlock)。 - 错误处理: 对
GitCommandError
的捕获可以向前端返回具体的失败原因(如认证失败、合并冲突等),这对于调试非常有价值。
3. RayJob YAML 模板 (Jinja2)
这是连接用户输入和 Kubernetes 资源的桥梁。
gateway/templates/rayjob_template.yaml.j2
apiVersion: ray.io/v1alpha1
kind: RayJob
metadata:
name: {{ job_name }}
# Argo CD 会自动管理 namespace,但在这里明确指定也是一种好习惯
namespace: ray-jobs
labels:
# 标签对于后续的监控和筛选至关重要
submitter: "gitops-gateway"
jobId: "{{ job_id }}"
spec:
# suspend: false # 控制任务是否立即执行
entrypoint: "{{ entrypoint }}"
runtimeEnvYAML: |
# 可以在这里定义 pip 依赖或环境变量
pip:
- pandas
- scikit-learn
# Ray 集群的配置
rayClusterSpec:
# Head 节点配置
headGroupSpec:
rayStartParams:
dashboard-host: '0.0.0.0'
template:
spec:
containers:
- name: ray-head
image: {{ ray_image }}
ports:
- containerPort: 6379 # Ray GCS
name: gcs-server
- containerPort: 8265 # Dashboard
name: dashboard
- containerPort: 10001
name: client
resources:
requests:
cpu: "{{ head_cpu }}"
memory: "1Gi"
limits:
cpu: "1"
memory: "2Gi"
# Worker 节点配置
workerGroupSpecs:
- replicas: {{ num_workers }}
minReplicas: {{ num_workers }}
maxReplicas: {{ num_workers }}
groupName: small-group # 组名
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: {{ ray_image }}
lifecycle:
preStop:
exec:
command: ["/bin/sh","-c","ray stop"]
resources:
requests:
cpu: "{{ cpu_per_worker }}"
memory: "1Gi"
limits:
cpu: "{{ cpu_per_worker|float + 0.5 }}" # 给予一些 buffer
memory: "2Gi"
# 当任务完成后,集群的 TTL (秒)
clusterSelector: {}
shutdownAfterJobFinishes: true
ttlSecondsAfterFinished: 600
闭环:状态反馈与未来迭代
目前,我们已经实现了一个单向的提交通道。用户提交任务后,只能去 Argo CD 的界面或者使用 kubectl
查看任务状态。这在生产环境中是不够的。一个完整的 IDP 需要形成闭环,将任务的状态反馈到提交界面上。
当前的方案存在一些局限性,也是下一步优化的方向:
状态反馈缺失: UI 无法实时展示任务是
Pending
,Running
,Succeeded
还是Failed
。实现这一点有几种路径:- 轮询 K8s API: 网关可以增加一个
/status
接口,接收jobName
,然后直接查询 K8s API Server 获取RayJob
的状态。这是最直接但效率较低的方式。 - 利用 Argo CD API: Argo CD 提供了丰富的 API,可以查询
Application
的同步状态和健康状况。网关可以通过查询 Argo CD 来间接获取任务状态。 - 事件驱动: 这是一个更优雅的方案。可以部署一个 K8s 控制器(或使用像 Argo Events 这样的工具)来监听
RayJob
的状态变化,并将这些事件推送到一个消息队列(如 Kafka)或 WebSocket 服务,前端实时订阅这些状态更新。
- 轮询 K8s API: 网关可以增加一个
日志查看: 用户需要一种简便的方式查看任务日志,而不是ssh到机器或者执行
kubectl logs
。平台需要提供日志聚合和展示功能,可以将 Pod 日志通过 Fluentd/Logstash 收集到 Elasticsearch 中,并通过统一的界面展示。Git 冲突管理: 当前的
git_lock
只能解决单实例网关的并发问题。如果部署多个网关实例,需要分布式锁。更进一步,可以引入一个任务队列(如 Celery + Redis),所有 Git 操作都由一个单线程的 worker 来串行处理,从根本上消除冲突。动态模板与参数化: 当前的 YAML 模板是固定的。一个更强大的平台应该允许用户上传或选择不同的任务模板,支持更复杂的参数,例如存储卷挂载、Secrets 注入、节点选择器等。这需要对模板渲染逻辑和前端表单进行更复杂的抽象设计。
尽管存在这些待办事项,但这个基于 Ant Design、Argo CD 和 Ray 的 GitOps 提交通道已经构建了一个坚实的基础。它将混乱的手动操作转变为一个自动化的、可审计的、声明式的流程,极大地提升了 MLOps 的效率和规范性,让算法工程师能重新聚焦于他们的模型和数据,而不是底层的 Kubernetes 细节。