一个45分钟的CI/CD流水线,在第40分钟的端到端测试阶段因为网络抖动而失败。在传统的单体Jenkinsfile
设计中,这意味着从头再来:重新拉取代码、重新编译、重新运行单元测试、重新打包镜像。这不仅仅是40分钟的计算资源浪费,更是对开发迭代效率的严重拖累。在真实项目中,这种脆弱性是不可接受的。
问题的根源在于紧耦合的、线性的流程编排。我们将整个软件交付过程塞进一个巨大的、有状态的脚本里,任何一个环节的暂时性故障都会导致整个链条的崩溃。初步的构想是拆分这个庞大的流水线。我们不应该追求一个能做所有事情的“超级流水线”,而是应该构建一系列小型的、专注的、可独立执行和重试的“任务单元”。例如,“构建”、“单元测试”、“集成测试”、“部署到预发”都应该是独立的单元。
但拆分后,如何串联这些单元?最直接的方式是链式触发,即Job A在成功后触发Job B。这在小规模下可行,但随着流程复杂度的增加,会迅速退化成一个难以维护的“触发器蜘蛛网”。每个Job都硬编码了其下游依赖,耦合性依然很高。
我们需要一个更优雅的模式:事件驱动。每个任务单元在完成自身工作后,不关心下一步该谁执行,而是向一个中心化的消息总线发布一个“事件”,宣告自己的成果。其他任务单元则按需“订阅”它们感兴趣的事件。例如,“单元测试”任务订阅“构建成功”事件,“部署”任务订阅“所有测试通过”事件。这种模式下,任务单元之间实现了彻底解耦,它们只关心事件,不关心事件的生产者或消费者。
为此,我们确定了技术选型:
- GCP GKE (Google Kubernetes Engine): 作为我们所有CI/CD任务的运行环境。其弹性的节点池和对动态Jenkins Agent的良好支持,是高效利用计算资源的基础。
- Jenkins: 继续作为任务执行引擎。我们不去替换它,而是改变使用它的方式。它不再是中央编排器,而是事件驱动的“函数执行器”。
- Google Cloud Pub/Sub: 扮演消息总线的角色。选择它的理由是:完全托管、水平扩展能力几乎无限、提供可靠的消息投递保证和内置的重试与死信队列机制。自己搭建和维护一套高可用的Kafka或RabbitMQ集群,其运维成本在很多场景下是不必要的。
我们的目标架构如下:
graph TD subgraph Git Repository A[Git Push] end subgraph Jenkins on GKE B[Job: Build & Push Image] D[Job: Run Unit & Integration Tests] F[Job: Deploy to Staging] end subgraph Google Cloud Pub/Sub C[Topic: build-artifacts-ready] E[Topic: tests-passed] G[Topic: deployed-to-staging] end A --> B B -- Publishes Message --> C C -- Delivers Message --> D D -- Publishes Message --> E E -- Delivers Message --> F F -- Publishes Message --> G
第一步:环境准备与IAM配置
在GKE上运行Jenkins并与GCP服务交互,权限管理是首要问题。一个常见的错误是将高权限的服务账号密钥直接存储在Jenkins中,这存在安全风险。正确的做法是使用Workload Identity,将GKE Service Account与GCP IAM Service Account进行绑定。
创建GCP服务账号 (IAM SA):
# 设置环境变量 export PROJECT_ID=$(gcloud config get-value project) export CLUSTER_NAME="ci-cd-cluster" export CLUSTER_ZONE="us-central1-c" export JENKINS_GSA="jenkins-worker@${PROJECT_ID}.iam.gserviceaccount.com" export JENKINS_KSA="jenkins-agent" # Kubernetes Service Account Name # 创建GCP服务账号 gcloud iam service-accounts create jenkins-worker \ --display-name="Jenkins GKE Worker" # 授予必要权限 # 允许读写GCS(存储构建产物) gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member="serviceAccount:${JENKINS_GSA}" \ --role="roles/storage.admin" # 允许读写Artifact Registry(存储Docker镜像) gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member="serviceAccount:${JENKINS_GSA}" \ --role="roles/artifactregistry.writer" # 允许发布到Pub/Sub gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member="serviceAccount:${JENKINS_GSA}" \ --role="roles/pubsub.publisher" # 允许从Pub/Sub订阅中拉取消息 gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member="serviceAccount:${JENKINS_GSA}" \ --role="roles/pubsub.subscriber" # 允许管理GKE集群资源(用于部署) gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member="serviceAccount:${JENKINS_GSA}" \ --role="roles/container.developer"
启用并配置Workload Identity:
# 确保GKE集群已启用Workload Identity gcloud container clusters update ${CLUSTER_NAME} \ --zone=${CLUSTER_ZONE} \ --workload-pool=${PROJECT_ID}.svc.id.goog # 允许KSA模拟GSA gcloud iam service-accounts add-iam-policy-binding ${JENKINS_GSA} \ --role="roles/iam.workloadIdentityUser" \ --member="serviceAccount:${PROJECT_ID}.svc.id.goog[default/${JENKINS_KSA}]" # 在Kubernetes中创建Service Account并注解 cat <<EOF | kubectl apply -f - apiVersion: v1 kind: ServiceAccount metadata: name: ${JENKINS_KSA} namespace: default annotations: iam.gke.io/gcp-service-account: ${JENKINS_GSA} EOF
现在,任何以
jenkins-agent
这个KSA身份运行的Pod,都将自动获得jenkins-worker
这个GSA的权限,无需管理任何JSON密钥。创建Pub/Sub主题与订阅:
# 创建主题 gcloud pubsub topics create build-artifacts-ready gcloud pubsub topics create tests-passed gcloud pubsub topics create deploy-to-staging-failed # 用于死信队列 # 为“测试”阶段创建订阅,并配置死信队列 # 如果一条消息处理失败(未被ACK)达到5次,它将被发送到死信主题 gcloud pubsub subscriptions create run-tests-subscription \ --topic=build-artifacts-ready \ --ack-deadline=600 \ --dead-letter-topic=deploy-to-staging-failed \ --max-delivery-attempts=5
这里的
ack-deadline=600
(10分钟)很重要,它需要比你的任务执行时间长,以防止消息在任务完成前被Pub/Sub重新投递。max-delivery-attempts=5
和死信队列的配置,为我们的流水线提供了原生的重试和故障隔离能力。
第二步:配置Jenkins作为事件生产者
第一个流水线单元是“构建与推送镜像”。它由代码提交触发,完成工作后,它的职责是发布一个带有上下文信息的消息到build-artifacts-ready
主题。
下面是一个生产级的Jenkinsfile
示例,用于执行此操作。它将在一个动态生成的GKE Pod中运行。
pipelines/build-service/Jenkinsfile
:
// 声明Pod模板,使用我们配置好的Workload Identity
podTemplate(
serviceAccount: 'jenkins-agent', // 这是我们上面创建的KSA
containers: [
containerTemplate(
name: 'gcloud',
image: 'gcr.io/google.com/cloudsdktool/cloud-sdk:slim',
command: 'cat',
ttyEnabled: true
),
containerTemplate(
name: 'kaniko',
image: 'gcr.io/kaniko-project/executor:v1.9.0-debug',
command: 'cat',
ttyEnabled: true
)
]
) {
node(POD_LABEL) {
stage('Checkout') {
checkout scm
}
stage('Build and Push Image') {
container('kaniko') {
script {
// 定义镜像信息
def projectId = sh(script: 'gcloud config get-value project', returnStdout: true).trim()
def registry = "us-central1-docker.pkg.dev/${projectId}/app-repo"
def imageName = "${registry}/my-service:${env.GIT_COMMIT.take(7)}"
// 使用Kaniko在无特权容器中构建镜像
// 注意:这里的--context和--dockerfile路径需要根据你的项目结构调整
sh """
/kaniko/executor \
--context `pwd` \
--dockerfile `pwd`/Dockerfile \
--destination ${imageName} \
--cache=true
"""
// 将镜像URI存为环境变量,供后续阶段使用
env.IMAGE_URI = imageName
}
}
}
stage('Publish Build Event') {
container('gcloud') {
script {
// 构造JSON消息体
def messagePayload = """
{
"source": "jenkins-build-job",
"commit_sha": "${env.GIT_COMMIT}",
"image_uri": "${env.IMAGE_URI}",
"build_url": "${env.BUILD_URL}",
"status": "SUCCESS"
}
"""
// 这里的关键是使用gcloud CLI发布消息
// Workload Identity确保了这个命令有权限执行
sh """
gcloud pubsub topics publish build-artifacts-ready \\
--message='${messagePayload}' \\
--attribute=commit_sha=${env.GIT_COMMIT},status=SUCCESS
"""
echo "Published build success event for commit ${env.GIT_COMMIT}"
}
}
}
}
}
关键点剖析:
-
podTemplate
: 我们没有使用静态的Jenkins Agent,而是为每次构建动态创建Pod。serviceAccount: 'jenkins-agent'
是实现免密认证的关键。 - Kaniko: 在Kubernetes中构建Docker镜像的最佳实践是使用Kaniko,它不依赖于Docker守护进程,更安全。
- 消息体设计: 消息体是一个结构化的JSON,包含了下游任务所需的所有上下文信息:代码的commit SHA,以及构建出的镜像URI。这是保证流程可追溯性的核心。
build_url
也很有用,便于快速定位到触发事件的原始构建。 - Pub/Sub Attributes: 除了消息体,我们还使用了
attribute
。Attribute是键值对,可以用于订阅过滤,而无需解析整个JSON消息体。例如,下游消费者可以设置一个过滤器,只接收status=SUCCESS
的消息。
第三步:实现Jenkins作为事件消费者
这是整个架构中最具挑战性的一环。如何让Jenkins Job“监听”Pub/Sub的订阅?
一个常见的错误是创建一个轮询任务,例如每分钟运行一次的Cron Job,去gcloud pubsub subscriptions pull
。这种方式延迟高、效率低,且可能在消息高峰期错过消息。
我们需要一个更主动的、长连接式的监听器。在Jenkins生态中,并没有一个现成的、完美的Pub/Sub触发器插件能很好地支持Workload Identity和复杂的消费逻辑。因此,我们自己动手实现一个“监听器”流水线。
这个特殊的流水线任务将是一个常驻任务(while(true)
),它会阻塞式地从订阅中拉取消息。一旦收到消息,它会解析内容,并以参数化的方式触发真正的业务Job(例如单元测试Job)。
pipelines/event-listener/Jenkinsfile
:
properties([pipelineTriggers([])]) // 确保这个Job不会被SCM等触发
pipeline {
agent any // 这个监听器很简单,不需要在特定Pod中运行
options {
// 不允许并发执行,我们只需要一个监听器实例
disableConcurrentBuilds()
// 设置超时,以防脚本卡死。Jenkins会终止它,并根据配置自动重启
timeout(time: 12, unit: 'HOURS')
}
stages {
stage('Listen for Build Events') {
steps {
script {
while(true) {
// 使用gcloud CLI以阻塞方式拉取一条消息
// --auto-ack 会在拉取成功后自动确认消息
// --format=json 使得输出易于解析
// 这里的 try-catch 至关重要,用于处理网络问题或无消息时的超时
try {
echo "Waiting for new message from subscription 'run-tests-subscription'..."
def messageJson = sh(
script: """
gcloud pubsub subscriptions pull run-tests-subscription \\
--auto-ack \\
--limit=1 \\
--format=json
""",
returnStdout: true
).trim()
// 如果没有消息,gcloud会返回空字符串
if (messageJson) {
// 使用Groovy的JsonSlurper解析消息
def message = new groovy.json.JsonSlurperClassic().parseText(messageJson)
// Pub/Sub返回的是一个数组,我们只拉取了一条
def payload = new String(message[0].message.data.decodeBase64())
def attributes = message[0].message.attributes
echo "Received message. Attributes: ${attributes}"
echo "Payload: ${payload}"
def payloadData = new groovy.json.JsonSlurperClassic().parseText(payload)
// 触发下游的测试任务
// 将消息内容作为参数传递
echo "Triggering unit-test job for image: ${payloadData.image_uri}"
build job: 'unit-and-integration-tests',
parameters: [
string(name: 'IMAGE_URI', value: payloadData.image_uri),
string(name: 'COMMIT_SHA', value: payloadData.commit_sha)
]
} else {
// 没有消息,短暂休眠避免空轮询
sleep(5)
}
} catch (Exception e) {
echo "Error pulling message or triggering job: ${e.message}. Retrying after 10 seconds."
sleep(10)
}
}
}
}
}
}
}
这种实现方式的权衡与思考:
- 简单性: 这个方案完全在Jenkins内部实现,不需要引入额外的服务(如Cloud Functions或Cloud Run)。
-
--auto-ack
的风险: 这里为了简化,使用了--auto-ack
。在生产环境中,这是一个潜在的风险点。如果在build job
调用成功但Jenkins Master在下游Job实际启动前崩溃,这条消息就丢失了。 - 更可靠的ACK机制: 一个更健壮的模式是手动ACK。流程应如下:
-
gcloud pubsub subscriptions pull
不带--auto-ack
。 - 获取消息和它的
ackId
。 - 触发下游任务 (
build job: '...', wait: false
)。 - 立即使用
gcloud pubsub subscriptions ack <subscription> --ack-ids=<ackId>
来确认消息。
这种方式将消息丢失的风险窗口缩到最小。如果下游任务失败,那是它自己的事,不应该影响消息流转。下游任务的失败重试应该由它自身逻辑或Pub/Sub的重投递机制(如果下游任务也负责ACK)来处理。
-
第四步:实现可重试的测试任务
现在,我们来实现被监听器触发的测试任务。这个任务的核心在于,它必须是幂等的,并且能够根据测试结果决定是否“拒绝”一条消息,从而触发Pub/Sub的重试机制。
为了实现这一点,我们的监听器需要做一些修改,它不能再--auto-ack
,而是要把ackId
传递给下游任务。
修改后的监听器 pipelines/event-listener/Jenkinsfile
(关键部分):
// ...
// 拉取消息,不自动ACK
def messageJson = sh(
script: """
gcloud pubsub subscriptions pull run-tests-subscription \\
--limit=1 \\
--format=json
""",
returnStdout: true
).trim()
if (messageJson) {
def message = new groovy.json.JsonSlurperClassic().parseText(messageJson)[0]
def ackId = message.ackId // 获取ackId
def payload = new String(message.message.data.decodeBase64())
def payloadData = new groovy.json.JsonSlurperClassic().parseText(payload)
// 将ackId也作为参数传递
build job: 'unit-and-integration-tests',
parameters: [
string(name: 'IMAGE_URI', value: payloadData.image_uri),
string(name: 'COMMIT_SHA', value: payloadData.commit_sha),
string(name: 'ACK_ID', value: ackId)
]
}
//...
测试任务 pipelines/unit-and-integration-tests/Jenkinsfile
:
// 参数化Job,接收来自监听器的信息
properties([
parameters([
string(name: 'IMAGE_URI', defaultValue: '', description: 'Docker image to test'),
string(name: 'COMMIT_SHA', defaultValue: '', description: 'Git commit SHA'),
string(name: 'ACK_ID', defaultValue: '', description: 'Pub/Sub message ACK ID')
])
])
podTemplate(serviceAccount: 'jenkins-agent', ...) {
node(POD_LABEL) {
// 使用try/catch/finally来确保ACK/NACK逻辑总能执行
try {
stage('Run Tests') {
container('gcloud') {
// 假设这里是运行测试的复杂逻辑
// 比如,使用kubectl apply一个临时的测试环境
// 然后运行测试脚本
sh """
echo "Running tests against image ${params.IMAGE_URI}..."
# ./run_tests.sh ${params.IMAGE_URI}
# 这里我们模拟一个可能失败的测试
if (( RANDOM % 10 < 3 )); then
echo "Simulating a flaky test failure!"
exit 1
fi
"""
}
}
// 如果所有阶段成功,发布“测试通过”事件
stage('Publish Success Event') {
container('gcloud') {
// ... 发布到 tests-passed 主题的逻辑,类似构建任务
}
}
} catch (err) {
// 捕获任何异常(例如测试失败)
echo "Caught error: ${err}. Not acknowledging the message to trigger a retry."
// 关键:不执行任何操作。不ACK也不NACK。
// Pub/Sub将在ack deadline之后自动重传消息。
currentBuild.result = 'FAILURE'
error("Test failed, letting Pub/Sub redeliver the message.")
} finally {
// 只有在try块成功执行完毕时,才ACK消息
if (currentBuild.result == null || currentBuild.result == 'SUCCESS') {
stage('Acknowledge Message') {
container('gcloud') {
echo "Tests passed. Acknowledging message ${params.ACK_ID}"
sh "gcloud pubsub subscriptions ack run-tests-subscription --ack-ids=${params.ACK_ID}"
}
}
}
}
}
}
这个设计的韧性所在:
- 当测试脚本因为代码bug而稳定失败时,Pub/Sub会重试5次(根据我们的订阅配置)。5次失败后,该消息会被自动发送到
deploy-to-staging-failed
这个死信主题。这会触发SRE团队的告警,但不会阻塞后续其他commit的CI/CD流程。 - 当测试脚本因为环境抖动(例如依赖的服务暂时不可用)而偶然失败时,Pub/Sub的自动重试机制有很大概率在后续的尝试中成功,从而让流水线自动“自愈”。
局限性与未来展望
这个基于Jenkins和Pub/Sub的事件驱动架构,极大地提升了CI/CD流水线的解耦度和韧性。但它并非银弹。
首先,系统的可观测性变得更加复杂。排查一个完整的端到端流程问题,需要跨越多个Jenkins构建日志和Pub/Sub消息。一个有效的改进是在第一阶段生成一个唯一的trace_id
,并将其作为Pub/Sub消息的attribute在整个流程中传递,最后将所有阶段的日志聚合到统一的日志平台(如Google Cloud Logging)中,通过trace_id
进行关联查询。
其次,Jenkins中的那个常驻“监听器”Job,其本质上是一个单点。虽然Jenkins的Job重启机制能提供一定的恢复能力,但它不够云原生。一个更理想的架构演进方向是,用一个轻量级的Cloud Run服务或GKE上的一个专用Deployment来代替这个监听器。这个服务唯一的工作就是订阅Pub/Sub,并在收到消息时通过Jenkins的REST API去触发相应的Job。这将触发逻辑与执行逻辑彻底分离,使得整个系统更加健壮和可扩展。
最后,状态管理变得分散。单体Jenkinsfile
的一个隐性好处是它有一个统一的状态视图。在这个事件驱动模型中,整个流程的“状态”分布在各个Pub/Sub主题和正在执行的Job中。对于需要复杂编排(例如扇出/扇入、条件分支)的流水线,可能需要引入一个外部的状态协调器,如Argo Workflows或一个简单的状态数据库,来跟踪跨多个事件的流程状态。