利用Google Cloud Pub/Sub实现GKE上高韧性的解耦式Jenkins流水线


一个45分钟的CI/CD流水线,在第40分钟的端到端测试阶段因为网络抖动而失败。在传统的单体Jenkinsfile设计中,这意味着从头再来:重新拉取代码、重新编译、重新运行单元测试、重新打包镜像。这不仅仅是40分钟的计算资源浪费,更是对开发迭代效率的严重拖累。在真实项目中,这种脆弱性是不可接受的。

问题的根源在于紧耦合的、线性的流程编排。我们将整个软件交付过程塞进一个巨大的、有状态的脚本里,任何一个环节的暂时性故障都会导致整个链条的崩溃。初步的构想是拆分这个庞大的流水线。我们不应该追求一个能做所有事情的“超级流水线”,而是应该构建一系列小型的、专注的、可独立执行和重试的“任务单元”。例如,“构建”、“单元测试”、“集成测试”、“部署到预发”都应该是独立的单元。

但拆分后,如何串联这些单元?最直接的方式是链式触发,即Job A在成功后触发Job B。这在小规模下可行,但随着流程复杂度的增加,会迅速退化成一个难以维护的“触发器蜘蛛网”。每个Job都硬编码了其下游依赖,耦合性依然很高。

我们需要一个更优雅的模式:事件驱动。每个任务单元在完成自身工作后,不关心下一步该谁执行,而是向一个中心化的消息总线发布一个“事件”,宣告自己的成果。其他任务单元则按需“订阅”它们感兴趣的事件。例如,“单元测试”任务订阅“构建成功”事件,“部署”任务订阅“所有测试通过”事件。这种模式下,任务单元之间实现了彻底解耦,它们只关心事件,不关心事件的生产者或消费者。

为此,我们确定了技术选型:

  1. GCP GKE (Google Kubernetes Engine): 作为我们所有CI/CD任务的运行环境。其弹性的节点池和对动态Jenkins Agent的良好支持,是高效利用计算资源的基础。
  2. Jenkins: 继续作为任务执行引擎。我们不去替换它,而是改变使用它的方式。它不再是中央编排器,而是事件驱动的“函数执行器”。
  3. Google Cloud Pub/Sub: 扮演消息总线的角色。选择它的理由是:完全托管、水平扩展能力几乎无限、提供可靠的消息投递保证和内置的重试与死信队列机制。自己搭建和维护一套高可用的Kafka或RabbitMQ集群,其运维成本在很多场景下是不必要的。

我们的目标架构如下:

graph TD
    subgraph Git Repository
        A[Git Push]
    end

    subgraph Jenkins on GKE
        B[Job: Build & Push Image]
        D[Job: Run Unit & Integration Tests]
        F[Job: Deploy to Staging]
    end

    subgraph Google Cloud Pub/Sub
        C[Topic: build-artifacts-ready]
        E[Topic: tests-passed]
        G[Topic: deployed-to-staging]
    end

    A --> B
    B -- Publishes Message --> C
    C -- Delivers Message --> D
    D -- Publishes Message --> E
    E -- Delivers Message --> F
    F -- Publishes Message --> G

第一步:环境准备与IAM配置

在GKE上运行Jenkins并与GCP服务交互,权限管理是首要问题。一个常见的错误是将高权限的服务账号密钥直接存储在Jenkins中,这存在安全风险。正确的做法是使用Workload Identity,将GKE Service Account与GCP IAM Service Account进行绑定。

  1. 创建GCP服务账号 (IAM SA):

    # 设置环境变量
    export PROJECT_ID=$(gcloud config get-value project)
    export CLUSTER_NAME="ci-cd-cluster"
    export CLUSTER_ZONE="us-central1-c"
    export JENKINS_GSA="jenkins-worker@${PROJECT_ID}.iam.gserviceaccount.com"
    export JENKINS_KSA="jenkins-agent" # Kubernetes Service Account Name
    
    # 创建GCP服务账号
    gcloud iam service-accounts create jenkins-worker \
        --display-name="Jenkins GKE Worker"
    
    # 授予必要权限
    # 允许读写GCS(存储构建产物)
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member="serviceAccount:${JENKINS_GSA}" \
        --role="roles/storage.admin"
    
    # 允许读写Artifact Registry(存储Docker镜像)
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member="serviceAccount:${JENKINS_GSA}" \
        --role="roles/artifactregistry.writer"
    
    # 允许发布到Pub/Sub
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member="serviceAccount:${JENKINS_GSA}" \
        --role="roles/pubsub.publisher"
    
    # 允许从Pub/Sub订阅中拉取消息
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member="serviceAccount:${JENKINS_GSA}" \
        --role="roles/pubsub.subscriber"
    
    # 允许管理GKE集群资源(用于部署)
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member="serviceAccount:${JENKINS_GSA}" \
        --role="roles/container.developer"
  2. 启用并配置Workload Identity:

    # 确保GKE集群已启用Workload Identity
    gcloud container clusters update ${CLUSTER_NAME} \
        --zone=${CLUSTER_ZONE} \
        --workload-pool=${PROJECT_ID}.svc.id.goog
    
    # 允许KSA模拟GSA
    gcloud iam service-accounts add-iam-policy-binding ${JENKINS_GSA} \
        --role="roles/iam.workloadIdentityUser" \
        --member="serviceAccount:${PROJECT_ID}.svc.id.goog[default/${JENKINS_KSA}]"
    
    # 在Kubernetes中创建Service Account并注解
    cat <<EOF | kubectl apply -f -
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: ${JENKINS_KSA}
      namespace: default
      annotations:
        iam.gke.io/gcp-service-account: ${JENKINS_GSA}
    EOF

    现在,任何以jenkins-agent这个KSA身份运行的Pod,都将自动获得jenkins-worker这个GSA的权限,无需管理任何JSON密钥。

  3. 创建Pub/Sub主题与订阅:

    # 创建主题
    gcloud pubsub topics create build-artifacts-ready
    gcloud pubsub topics create tests-passed
    gcloud pubsub topics create deploy-to-staging-failed # 用于死信队列
    
    # 为“测试”阶段创建订阅,并配置死信队列
    # 如果一条消息处理失败(未被ACK)达到5次,它将被发送到死信主题
    gcloud pubsub subscriptions create run-tests-subscription \
        --topic=build-artifacts-ready \
        --ack-deadline=600 \
        --dead-letter-topic=deploy-to-staging-failed \
        --max-delivery-attempts=5

    这里的ack-deadline=600(10分钟)很重要,它需要比你的任务执行时间长,以防止消息在任务完成前被Pub/Sub重新投递。max-delivery-attempts=5和死信队列的配置,为我们的流水线提供了原生的重试和故障隔离能力。

第二步:配置Jenkins作为事件生产者

第一个流水线单元是“构建与推送镜像”。它由代码提交触发,完成工作后,它的职责是发布一个带有上下文信息的消息到build-artifacts-ready主题。

下面是一个生产级的Jenkinsfile示例,用于执行此操作。它将在一个动态生成的GKE Pod中运行。

pipelines/build-service/Jenkinsfile:

// 声明Pod模板,使用我们配置好的Workload Identity
podTemplate(
  serviceAccount: 'jenkins-agent', // 这是我们上面创建的KSA
  containers: [
    containerTemplate(
      name: 'gcloud',
      image: 'gcr.io/google.com/cloudsdktool/cloud-sdk:slim',
      command: 'cat',
      ttyEnabled: true
    ),
    containerTemplate(
      name: 'kaniko',
      image: 'gcr.io/kaniko-project/executor:v1.9.0-debug',
      command: 'cat',
      ttyEnabled: true
    )
  ]
) {
    node(POD_LABEL) {
        stage('Checkout') {
            checkout scm
        }

        stage('Build and Push Image') {
            container('kaniko') {
                script {
                    // 定义镜像信息
                    def projectId = sh(script: 'gcloud config get-value project', returnStdout: true).trim()
                    def registry = "us-central1-docker.pkg.dev/${projectId}/app-repo"
                    def imageName = "${registry}/my-service:${env.GIT_COMMIT.take(7)}"

                    // 使用Kaniko在无特权容器中构建镜像
                    // 注意:这里的--context和--dockerfile路径需要根据你的项目结构调整
                    sh """
                    /kaniko/executor \
                        --context `pwd` \
                        --dockerfile `pwd`/Dockerfile \
                        --destination ${imageName} \
                        --cache=true
                    """
                    // 将镜像URI存为环境变量,供后续阶段使用
                    env.IMAGE_URI = imageName
                }
            }
        }

        stage('Publish Build Event') {
            container('gcloud') {
                script {
                    // 构造JSON消息体
                    def messagePayload = """
                    {
                        "source": "jenkins-build-job",
                        "commit_sha": "${env.GIT_COMMIT}",
                        "image_uri": "${env.IMAGE_URI}",
                        "build_url": "${env.BUILD_URL}",
                        "status": "SUCCESS"
                    }
                    """

                    // 这里的关键是使用gcloud CLI发布消息
                    // Workload Identity确保了这个命令有权限执行
                    sh """
                    gcloud pubsub topics publish build-artifacts-ready \\
                        --message='${messagePayload}' \\
                        --attribute=commit_sha=${env.GIT_COMMIT},status=SUCCESS
                    """
                    echo "Published build success event for commit ${env.GIT_COMMIT}"
                }
            }
        }
    }
}

关键点剖析:

  • podTemplate: 我们没有使用静态的Jenkins Agent,而是为每次构建动态创建Pod。serviceAccount: 'jenkins-agent'是实现免密认证的关键。
  • Kaniko: 在Kubernetes中构建Docker镜像的最佳实践是使用Kaniko,它不依赖于Docker守护进程,更安全。
  • 消息体设计: 消息体是一个结构化的JSON,包含了下游任务所需的所有上下文信息:代码的commit SHA,以及构建出的镜像URI。这是保证流程可追溯性的核心。build_url也很有用,便于快速定位到触发事件的原始构建。
  • Pub/Sub Attributes: 除了消息体,我们还使用了attribute。Attribute是键值对,可以用于订阅过滤,而无需解析整个JSON消息体。例如,下游消费者可以设置一个过滤器,只接收status=SUCCESS的消息。

第三步:实现Jenkins作为事件消费者

这是整个架构中最具挑战性的一环。如何让Jenkins Job“监听”Pub/Sub的订阅?

一个常见的错误是创建一个轮询任务,例如每分钟运行一次的Cron Job,去gcloud pubsub subscriptions pull。这种方式延迟高、效率低,且可能在消息高峰期错过消息。

我们需要一个更主动的、长连接式的监听器。在Jenkins生态中,并没有一个现成的、完美的Pub/Sub触发器插件能很好地支持Workload Identity和复杂的消费逻辑。因此,我们自己动手实现一个“监听器”流水线。

这个特殊的流水线任务将是一个常驻任务(while(true)),它会阻塞式地从订阅中拉取消息。一旦收到消息,它会解析内容,并以参数化的方式触发真正的业务Job(例如单元测试Job)。

pipelines/event-listener/Jenkinsfile:

properties([pipelineTriggers([])]) // 确保这个Job不会被SCM等触发

pipeline {
    agent any // 这个监听器很简单,不需要在特定Pod中运行

    options {
        // 不允许并发执行,我们只需要一个监听器实例
        disableConcurrentBuilds()
        // 设置超时,以防脚本卡死。Jenkins会终止它,并根据配置自动重启
        timeout(time: 12, unit: 'HOURS')
    }

    stages {
        stage('Listen for Build Events') {
            steps {
                script {
                    while(true) {
                        // 使用gcloud CLI以阻塞方式拉取一条消息
                        // --auto-ack 会在拉取成功后自动确认消息
                        // --format=json 使得输出易于解析
                        // 这里的 try-catch 至关重要,用于处理网络问题或无消息时的超时
                        try {
                            echo "Waiting for new message from subscription 'run-tests-subscription'..."
                            def messageJson = sh(
                                script: """
                                gcloud pubsub subscriptions pull run-tests-subscription \\
                                    --auto-ack \\
                                    --limit=1 \\
                                    --format=json
                                """,
                                returnStdout: true
                            ).trim()

                            // 如果没有消息,gcloud会返回空字符串
                            if (messageJson) {
                                // 使用Groovy的JsonSlurper解析消息
                                def message = new groovy.json.JsonSlurperClassic().parseText(messageJson)
                                
                                // Pub/Sub返回的是一个数组,我们只拉取了一条
                                def payload = new String(message[0].message.data.decodeBase64())
                                def attributes = message[0].message.attributes
                                
                                echo "Received message. Attributes: ${attributes}"
                                echo "Payload: ${payload}"

                                def payloadData = new groovy.json.JsonSlurperClassic().parseText(payload)

                                // 触发下游的测试任务
                                // 将消息内容作为参数传递
                                echo "Triggering unit-test job for image: ${payloadData.image_uri}"
                                build job: 'unit-and-integration-tests',
                                      parameters: [
                                          string(name: 'IMAGE_URI', value: payloadData.image_uri),
                                          string(name: 'COMMIT_SHA', value: payloadData.commit_sha)
                                      ]
                            } else {
                                // 没有消息,短暂休眠避免空轮询
                                sleep(5)
                            }
                        } catch (Exception e) {
                            echo "Error pulling message or triggering job: ${e.message}. Retrying after 10 seconds."
                            sleep(10)
                        }
                    }
                }
            }
        }
    }
}

这种实现方式的权衡与思考:

  • 简单性: 这个方案完全在Jenkins内部实现,不需要引入额外的服务(如Cloud Functions或Cloud Run)。
  • --auto-ack的风险: 这里为了简化,使用了--auto-ack。在生产环境中,这是一个潜在的风险点。如果在build job调用成功但Jenkins Master在下游Job实际启动前崩溃,这条消息就丢失了。
  • 更可靠的ACK机制: 一个更健壮的模式是手动ACK。流程应如下:
    1. gcloud pubsub subscriptions pull 不带 --auto-ack
    2. 获取消息和它的ackId
    3. 触发下游任务 (build job: '...', wait: false)。
    4. 立即使用gcloud pubsub subscriptions ack <subscription> --ack-ids=<ackId>来确认消息。
      这种方式将消息丢失的风险窗口缩到最小。如果下游任务失败,那是它自己的事,不应该影响消息流转。下游任务的失败重试应该由它自身逻辑或Pub/Sub的重投递机制(如果下游任务也负责ACK)来处理。

第四步:实现可重试的测试任务

现在,我们来实现被监听器触发的测试任务。这个任务的核心在于,它必须是幂等的,并且能够根据测试结果决定是否“拒绝”一条消息,从而触发Pub/Sub的重试机制。

为了实现这一点,我们的监听器需要做一些修改,它不能再--auto-ack,而是要把ackId传递给下游任务。

修改后的监听器 pipelines/event-listener/Jenkinsfile (关键部分):

// ...
// 拉取消息,不自动ACK
def messageJson = sh(
    script: """
    gcloud pubsub subscriptions pull run-tests-subscription \\
        --limit=1 \\
        --format=json
    """,
    returnStdout: true
).trim()

if (messageJson) {
    def message = new groovy.json.JsonSlurperClassic().parseText(messageJson)[0]
    def ackId = message.ackId // 获取ackId
    def payload = new String(message.message.data.decodeBase64())
    def payloadData = new groovy.json.JsonSlurperClassic().parseText(payload)

    // 将ackId也作为参数传递
    build job: 'unit-and-integration-tests',
          parameters: [
              string(name: 'IMAGE_URI', value: payloadData.image_uri),
              string(name: 'COMMIT_SHA', value: payloadData.commit_sha),
              string(name: 'ACK_ID', value: ackId)
          ]
}
//...

测试任务 pipelines/unit-and-integration-tests/Jenkinsfile:

// 参数化Job,接收来自监听器的信息
properties([
    parameters([
        string(name: 'IMAGE_URI', defaultValue: '', description: 'Docker image to test'),
        string(name: 'COMMIT_SHA', defaultValue: '', description: 'Git commit SHA'),
        string(name: 'ACK_ID', defaultValue: '', description: 'Pub/Sub message ACK ID')
    ])
])

podTemplate(serviceAccount: 'jenkins-agent', ...) {
    node(POD_LABEL) {
        // 使用try/catch/finally来确保ACK/NACK逻辑总能执行
        try {
            stage('Run Tests') {
                container('gcloud') {
                    // 假设这里是运行测试的复杂逻辑
                    // 比如,使用kubectl apply一个临时的测试环境
                    // 然后运行测试脚本
                    sh """
                    echo "Running tests against image ${params.IMAGE_URI}..."
                    # ./run_tests.sh ${params.IMAGE_URI}
                    # 这里我们模拟一个可能失败的测试
                    if (( RANDOM % 10 < 3 )); then
                        echo "Simulating a flaky test failure!"
                        exit 1
                    fi
                    """
                }
            }

            // 如果所有阶段成功,发布“测试通过”事件
            stage('Publish Success Event') {
                container('gcloud') {
                    // ... 发布到 tests-passed 主题的逻辑,类似构建任务
                }
            }
        } catch (err) {
            // 捕获任何异常(例如测试失败)
            echo "Caught error: ${err}. Not acknowledging the message to trigger a retry."
            // 关键:不执行任何操作。不ACK也不NACK。
            // Pub/Sub将在ack deadline之后自动重传消息。
            currentBuild.result = 'FAILURE'
            error("Test failed, letting Pub/Sub redeliver the message.")
        } finally {
            // 只有在try块成功执行完毕时,才ACK消息
            if (currentBuild.result == null || currentBuild.result == 'SUCCESS') {
                stage('Acknowledge Message') {
                    container('gcloud') {
                        echo "Tests passed. Acknowledging message ${params.ACK_ID}"
                        sh "gcloud pubsub subscriptions ack run-tests-subscription --ack-ids=${params.ACK_ID}"
                    }
                }
            }
        }
    }
}

这个设计的韧性所在:

  • 当测试脚本因为代码bug而稳定失败时,Pub/Sub会重试5次(根据我们的订阅配置)。5次失败后,该消息会被自动发送到deploy-to-staging-failed这个死信主题。这会触发SRE团队的告警,但不会阻塞后续其他commit的CI/CD流程。
  • 当测试脚本因为环境抖动(例如依赖的服务暂时不可用)而偶然失败时,Pub/Sub的自动重试机制有很大概率在后续的尝试中成功,从而让流水线自动“自愈”。

局限性与未来展望

这个基于Jenkins和Pub/Sub的事件驱动架构,极大地提升了CI/CD流水线的解耦度和韧性。但它并非银弹。

首先,系统的可观测性变得更加复杂。排查一个完整的端到端流程问题,需要跨越多个Jenkins构建日志和Pub/Sub消息。一个有效的改进是在第一阶段生成一个唯一的trace_id,并将其作为Pub/Sub消息的attribute在整个流程中传递,最后将所有阶段的日志聚合到统一的日志平台(如Google Cloud Logging)中,通过trace_id进行关联查询。

其次,Jenkins中的那个常驻“监听器”Job,其本质上是一个单点。虽然Jenkins的Job重启机制能提供一定的恢复能力,但它不够云原生。一个更理想的架构演进方向是,用一个轻量级的Cloud Run服务或GKE上的一个专用Deployment来代替这个监听器。这个服务唯一的工作就是订阅Pub/Sub,并在收到消息时通过Jenkins的REST API去触发相应的Job。这将触发逻辑与执行逻辑彻底分离,使得整个系统更加健壮和可扩展。

最后,状态管理变得分散。单体Jenkinsfile的一个隐性好处是它有一个统一的状态视图。在这个事件驱动模型中,整个流程的“状态”分布在各个Pub/Sub主题和正在执行的Job中。对于需要复杂编排(例如扇出/扇入、条件分支)的流水线,可能需要引入一个外部的状态协调器,如Argo Workflows或一个简单的状态数据库,来跟踪跨多个事件的流程状态。


  目录