构建连接无服务器与传统基础设施的反应式控制平面:Vercel Functions、NATS与Puppet的异构集成


在管理一个横跨公有云 Serverless 应用和私有数据中心虚拟机的混合环境时,一个核心挑战是如何实现状态的快速、可靠同步。传统的 Puppet agent pull 模式(默认每30分钟一次)在稳定性与可预测性上表现出色,但无法满足某些业务对近实时配置变更的需求,例如响应安全警报动态封禁IP、根据流量洪峰事件调整应用池配置,或是实现开发者通过 ChatOps 立即触发特定环境的部署。等待下一个30分钟的收敛周期在这些场景下是不可接受的。

直接暴露 Puppet Master API 或为其创建自定义 webhook 入口点,是最初会想到的方案,但这会带来严重的安全风险和架构耦合。它不仅将内部基础设施的核心控制面暴露于潜在威胁之下,还破坏了 Puppet 模型中 agent 作为唯一发起者的安全原则。我们需要一个既能响应外部事件,又能保持基础设施安全边界和声明式配置管理优势的架构。

核心问题可以归结为:如何构建一个安全、解耦、可扩展的事件总线,将来自云端 Serverless 计算层(如 Vercel Functions)的指令,可靠地传递给由 Puppet 管理的传统基础设施节点,并触发其进行即时配置收敛。

方案权衡:从直接暴露到消息队列

方案A:API网关直通Puppet Master

这个方案试图在 Puppet Master 上开启一个特定的API端点,通过API网关(如 AWS API Gateway 或自建 Nginx)代理,并由 Vercel Function 调用。

  • 优势:
    • 逻辑直接,Vercel Function 发起一个 HTTPS 请求,Puppet Master 接收并处理。
  • 劣势:
    • 安全灾难: 任何对 Puppet Master 的直接网络暴露都是高风险操作。即使有复杂的认证授权,攻击面也显著增大。
    • 状态管理混乱: Puppet Master 的核心职责是编译 catalog,而不是处理实时命令。这种模式容易退化为命令式的远程执行,违背了 IaC 的声明式初衷。
    • 扩展性瓶颈: Puppet Master 成为单点瓶颈,处理大量实时请求会严重影响其为 agent 提供 catalog 的核心功能。

方案B:Vercel Function 直连各 Puppet Agent

该方案为每个 Puppet agent 节点部署一个轻量级 HTTP 服务,监听特定端口。Vercel Function 根据需要调用的节点,直接向其发起请求。

  • 优势:
    • 去中心化,避免了 Puppet Master 的瓶颈。
  • 劣劣势:
    • 网络与安全噩梦: 需要为成百上千个节点配置防火墙规则、TLS证书和身份验证。这是一个巨大的运维负担且极不安全。
    • 服务发现复杂: Vercel Function 需要维护一个所有 agent 节点的地址列表,并处理其动态变化。
    • 可靠性差: 如果 agent 节点当时离线或网络不通,请求就会失败,且没有重试或缓冲机制。

最终选择:以NATS为核心的解耦控制平面

为了解决上述所有问题,我们选择引入一个高性能消息中间件——NATS。NATS 将作为连接 Serverless 事件源和传统基础设施节点的神经中枢。

graph TD
    subgraph 外部事件源
        A[Webhook / API Call / Git Push]
    end

    subgraph Vercel Edge Network
        B(Vercel Function API Endpoint)
    end

    subgraph 高可用NATS集群
        C{NATS Cluster / JetStream}
    end

    subgraph 数据中心 / VPC
        D1[Puppet Agent Node 1]
        D2[Puppet Agent Node 2]
        D3[Puppet Agent Node ...]
    end

    subgraph Puppet Agent内部
        E[NATS Listener Service] --> F{/etc/puppetlabs/facter/facts.d/nats_event.json}
        E --> G[Exec 'puppet agent -t']
    end

    subgraph Puppet自身流程
        H[Puppet Run] --> I[Reads Hiera Data]
        I --> F
        H --> J[Converges Node State]
    end

    A --> B
    B -- "Publish: infra.config.update" --> C
    C -- "Subscribe: infra.config.update" --> D1
    C -- "Subscribe: infra.config.update" --> D2
    C -- "Subscribe: infra.config.update" --> D3
    
    D1 --- E

这个架构的优势显而易见:

  1. 安全: 所有 Puppet agent 节点只需向 NATS 集群建立出站连接。数据中心无需为 agent 开放任何入站端口,安全边界清晰。NATS 本身支持强大的 NKEY 和 JWT 认证。
  2. 解耦: Vercel Function 只与 NATS 交互,它不需要知道下游有多少消费者,也不关心它们的网络位置或在线状态。同样,Puppet agent 节点只关心来自 NATS 的消息,不关心消息的来源。
  3. 可靠性: 通过启用 NATS JetStream,我们可以获得消息持久化、至少一次(at-least-once)投递保证和消息重放能力。即使 agent 节点离线,当它重新上线时也能收到错过的指令。
  4. 可扩展性: 增加新的 agent 节点只需让它连接并订阅 NATS 主题即可。Pub/Sub 模型也允许未来加入其他类型的消费者(如 Ansible、监控系统)来响应同一事件。

核心实现:代码与配置

我们将围绕一个具体场景展开:通过 Vercel Function 接收一个 webhook,动态更新一组 Web 服务器的防火墙规则,为其上游负载均衡器开放一个新的端口。

1. Vercel Function: 事件发布者

这是控制平面的入口。它负责验证输入、构造消息并将其安全地发布到 NATS。

项目结构:

/api
  /trigger-firewall-update.ts
/package.json
/tsconfig.json
/vercel.json

代码 (/api/trigger-firewall-update.ts):

// /api/trigger-firewall-update.ts
import { VercelRequest, VercelResponse } from '@vercel/node';
import { connect, StringCodec, JSONCodec, NatsConnection } from 'nats';
import { z } from 'zod';

// 使用 Zod 定义输入负载的 schema,确保类型安全和数据验证
const firewallUpdateSchema = z.object({
  targetGroup: z.string().regex(/^[a-zA-Z0-9_-]+$/), // e.g., 'webservers-prod'
  port: z.number().int().min(1024).max(65535),
  protocol: z.enum(['tcp', 'udp']).default('tcp'),
  action: z.enum(['allow', 'deny']),
  // 用于追踪和审计的元数据
  metadata: z.object({
    source: z.string(),
    correlationId: z.string().uuid(),
  }),
});

type FirewallUpdatePayload = z.infer<typeof firewallUpdateSchema>;

// 在函数外部缓存 NATS 连接,避免每次请求都重新建立
let natsConnection: NatsConnection | null = null;

async function getNatsConnection(): Promise<NatsConnection> {
  if (natsConnection) {
    return natsConnection;
  }
  
  // 从环境变量安全地获取 NATS 连接信息
  // NATS_URL: 'nats://user:[email protected]:4222'
  // NATS_SEED: NKEY SEED, e.g., 'SUA**************************************'
  const { NATS_URL, NATS_SEED } = process.env;

  if (!NATS_URL || !NATS_SEED) {
    throw new Error('NATS_URL and NATS_SEED environment variables are required.');
  }

  try {
    natsConnection = await connect({
      servers: NATS_URL,
      authenticator: seedAuthenticator(Buffer.from(NATS_SEED)),
    });
    console.log(`Connected to NATS server at ${natsConnection.getServer()}`);
    return natsConnection;
  } catch (err) {
    console.error('Failed to connect to NATS', err);
    throw err; // 抛出错误以便 Vercel 捕获并记录
  }
}

// 这是一个辅助函数,实际应使用 nats.js 库提供的 nkeyAuthenticator
// 这里为清晰起见展示其概念
import { createOperator, createAccount, createUser, fromSeed } from 'nkeys.js';
function seedAuthenticator(seed: Buffer) {
    const kp = fromSeed(seed);
    return (nonce: Uint8Array) => kp.sign(nonce);
}

export default async function handler(
  req: VercelRequest,
  res: VercelResponse
) {
  // 仅接受 POST 请求
  if (req.method !== 'POST') {
    res.setHeader('Allow', 'POST');
    return res.status(405).send('Method Not Allowed');
  }

  try {
    // 1. 验证输入
    const validationResult = firewallUpdateSchema.safeParse(req.body);
    if (!validationResult.success) {
      return res.status(400).json({
        message: 'Invalid request body.',
        errors: validationResult.error.flatten(),
      });
    }

    const payload: FirewallUpdatePayload = validationResult.data;

    // 2. 连接到 NATS
    const nc = await getNatsConnection();
    
    // 使用 JSON Codec 自动处理序列化
    const jc = JSONCodec<FirewallUpdatePayload>();

    // 3. 定义 NATS 主题,并发布消息
    // 主题设计:`domain.action.target`,便于细粒度订阅
    const subject = `infra.firewall.update.${payload.targetGroup}`;
    
    // 使用 JetStream context 发布,以获得持久化和确认
    const js = nc.jetstream();
    const pubAck = await js.publish(subject, jc.encode(payload));

    console.log(
      `Published message to subject '${subject}' on stream ${pubAck.stream}, sequence: ${pubAck.seq}`
    );

    // 4. 返回成功响应
    return res.status(202).json({
      message: 'Firewall update event accepted and queued.',
      stream: pubAck.stream,
      sequence: pubAck.seq,
      correlationId: payload.metadata.correlationId,
    });

  } catch (error) {
    console.error('An unexpected error occurred:', error);
    // 生产环境中应有更完善的错误日志系统
    return res.status(500).json({ message: 'Internal Server Error' });
  }
}

关键考量:

  • 环境变量: NATS 的凭证等敏感信息必须通过 Vercel 的环境变量注入,而不是硬编码。
  • 连接管理: NATS 连接是昂贵的,应在函数调用之间复用。
  • 输入验证: 绝对不能信任任何外部输入。使用 zod 这样的库进行严格的 schema 验证是生产级代码的必要条件。
  • 幂等性: 虽然 Puppet 本身是幂等的,但在 API 层设计时也应考虑。correlationId 可用于未来的去重或追踪。
  • JetStream: 我们明确使用 nc.jetstream().publish() 而不是 nc.publish(),这是为了确保消息在 agent 离线时不会丢失。这需要在 NATS 服务端预先配置好一个名为 infra 的 stream,并使其捕获 infra.> 主题。

2. Puppet Agent: 事件消费者

在每个被管理的节点上,我们需要一个常驻服务来监听 NATS 消息,并在接收到消息后触发一次 Puppet agent 运行。使用 Puppet 来管理这个监听服务本身,是实现自举和一致性管理的最佳实践。

Puppet 模块结构:

puppet_nats_listener/
  manifests/
    init.pp
    install.pp
    config.pp
    service.pp
  templates/
    nats_listener.service.epp
    listener_config.json.epp
  files/
    nats_puppet_trigger.rb

主清单 (manifests/init.pp):

# manifests/init.pp
class puppet_nats_listener (
  String $nats_servers,
  String $nats_nkey_seed_content,
  Array[String] $subscribe_subjects,
) {
  # 确保模块各部分按正确顺序执行
  contain puppet_nats_listener::install
  contain puppet_nats_listener::config
  contain puppet_nats_listener::service

  Class['puppet_nats_listener::install']
  -> Class['puppet_nats_listener::config']
  ~> Class['puppet_nats_listener::service']
}

安装 (manifests/install.pp):

# manifests/install.pp
class puppet_nats_listener::install {
  # 确保 ruby 和 nats gem 已安装
  # 在生产环境中,最好使用内部的 gem 仓库
  package { ['ruby', 'build-essential']:
    ensure => installed,
  }

  package { 'nats':
    ensure   => '2.3.0', # 固定版本以保证稳定性
    provider => 'gem',
    require  => Package['ruby'],
  }
}

配置 (manifests/config.pp):
这个类负责部署监听器脚本、它的 systemd 服务单元文件以及包含 NATS 连接信息的配置文件。

# manifests/config.pp
class puppet_nats_listener::config {
  $config_dir = '/etc/puppet_nats_listener'
  $fact_dir = '/etc/puppetlabs/facter/facts.d'
  
  # 创建所需目录
  file { [$config_dir, $fact_dir]:
    ensure => directory,
    owner  => 'root',
    group  => 'root',
    mode   => '0755',
  }

  # 部署监听器脚本
  file { "${config_dir}/nats_puppet_trigger.rb":
    ensure => file,
    owner  => 'root',
    group  => 'root',
    mode   => '0750',
    source => 'puppet:///modules/puppet_nats_listener/nats_puppet_trigger.rb',
  }

  # 部署监听器配置文件 (使用模板)
  file { "${config_dir}/config.json":
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0640',
    content => epp('puppet_nats_listener/listener_config.json.epp'),
  }

  # 部署 systemd 服务单元文件
  file { '/etc/systemd/system/nats-listener.service':
    ensure  => file,
    owner   => 'root',
    group   => 'root',
    mode    => '0644',
    content => epp('puppet_nats_listener/nats_listener.service.epp'),
  }
}

核心监听器脚本 (files/nats_puppet_trigger.rb):

这是整个方案在 agent 端的粘合剂。它是一个简单的 Ruby 脚本,负责连接 NATS、处理消息、写入 fact 并触发 Puppet。

#!/usr/bin/env ruby
# /etc/puppet_nats_listener/nats_puppet_trigger.rb

require 'nats/client'
require 'json'
require 'logger'

# 配置
CONFIG_PATH = '/etc/puppet_nats_listener/config.json'
FACT_PATH = '/etc/puppetlabs/facter/facts.d/nats_event.json'
LOG_PATH = '/var/log/nats_puppet_trigger.log'

# 日志设置
logger = Logger.new(LOG_PATH)
logger.level = Logger::INFO

# 加载配置
begin
  config = JSON.parse(File.read(CONFIG_PATH))
rescue => e
  logger.fatal("Could not load config file at #{CONFIG_PATH}: #{e.message}")
  exit 1
end

def trigger_puppet_run(logger)
  logger.info("Triggering puppet agent run...")
  # 使用 --no-splay 禁用随机延迟,立即执行
  # 捕获输出用于日志记录
  output = `/opt/puppetlabs/bin/puppet agent -t --no-splay`
  exit_status = $?.exitstatus

  if exit_status == 0 || exit_status == 2 # 0: no changes, 2: changes applied
    logger.info("Puppet run completed successfully. Exit code: #{exit_status}")
  else
    logger.error("Puppet run failed. Exit code: #{exit_status}")
    logger.error("Puppet output:\n#{output}")
  end
end

begin
  NATS.start(
    servers: config['nats_servers'],
    nkeys_seed: config['nats_nkey_seed_path'] # 直接从文件加载 seed
  ) do |nc|
    logger.info("Connected to NATS at #{nc.connected_server}")

    # 订阅多个主题
    config['subscribe_subjects'].each do |subject|
      nc.subscribe(subject, queue: 'puppet_agent_workers') do |msg|
        logger.info("Received message on subject '#{msg.subject}'")
        
        begin
          # 1. 解析和验证消息 (简化版,生产应有更强的 schema 验证)
          payload = JSON.parse(msg.data)
          logger.info("Payload: #{payload.inspect}")

          # 2. 将消息内容写入一个 structured fact
          # Puppet 在运行时会自动加载这个 JSON 文件作为顶级 fact
          File.write(FACT_PATH, JSON.pretty_generate(payload))
          logger.info("Wrote event payload to fact file: #{FACT_PATH}")

          # 3. 触发 Puppet run
          # 使用线程以避免阻塞 NATS 的回调
          Thread.new { trigger_puppet_run(logger) }

        rescue JSON::ParserError => e
          logger.error("Failed to parse message data as JSON: #{e.message}")
        rescue => e
          logger.error("Error processing message: #{e.message}\n#{e.backtrace.join("\n")}")
        end
      end
      logger.info("Subscribed to '#{subject}' with queue group 'puppet_agent_workers'")
    end

    # 监听终止信号以优雅关闭
    trap('TERM') { nc.close }
    trap('INT') { nc.close }
  end
rescue NATS::Error => e
  logger.fatal("NATS connection error: #{e.message}")
  exit 1
end

关键设计点:

  • Queue Group (queue: 'puppet_agent_workers'): 这是一个 NATS 的核心特性。当多个订阅者属于同一个队列组时,对于发布的每一条消息,只有一个订阅者会收到它。在这里,我们不使用这个特性,因为我们希望所有订阅了该 targetGroup 的节点都收到消息并更新自己。如果场景是处理任务,只需要一个 worker 执行,则队列组非常有用。对于配置分发,我们省略 queue 选项,让其成为广播。
  • 数据传递机制: 脚本直接执行命令式操作。它将接收到的 payload 写入一个 well-known location (/etc/puppetlabs/facter/facts.d/)。这使得事件数据可以作为标准的 Facter fact 被 Puppet 的 catalog 编译过程使用。这保持了声明式的纯粹性。
  • 触发机制: 在写入 fact 后,脚本执行 puppet agent -t。Puppet run 启动后,会收集所有 facts(包括我们刚写入的 nats_event.json),发送给 Master,Master 基于这些新的 fact 编译出新的 catalog,然后 agent 应用它。
  • 幂等性保障: 即使 NATS 由于某种原因重发了消息,脚本会覆盖 nats_event.json 文件,并再次触发 Puppet run。因为 Puppet 的资源定义是幂等的,第二次运行不会产生任何变更(unless an error occurred in the first run)。

3. 将数据与Puppet代码连接

最后一步,我们需要修改 Puppet 代码,使其能够使用来自 NATS 事件的 fact。

Hiera 配置 (hiera.yaml):

通常不需要修改 Hiera 配置,因为 Facter facts 默认就是最高优先级的数据源之一。Puppet 会自动将 nats_event.json 的内容解析为 $facts['nats_event'] 这个 hash。

使用 fact 的 Puppet Manifest (例如,防火墙模块):

# modules/my_firewall/manifests/init.pp
class my_firewall {
  # 引入标准的 puppetlabs-firewall 模块
  include firewall

  # 从 fact 中获取动态端口配置
  # 使用 hiera() 或直接访问 $facts hash
  # 添加默认值以保证在没有事件触发时的编译也能通过
  $dynamic_rule = $facts.get('nats_event', {})
  
  if $dynamic_rule and $dynamic_rule['targetGroup'] == 'webservers-prod' {
    if $dynamic_rule['action'] == 'allow' {
      firewall { "150 allow dynamic port ${dynamic_rule['port']}":
        dport   => $dynamic_rule['port'],
        proto   => $dynamic_rule['protocol'],
        action  => 'accept',
        # 这里可以加入源IP限制等更多逻辑
        # source  => '10.0.0.0/8',
      }
    }
    # 此处可添加处理 'deny' 动作的逻辑
  }
}

当 Puppet 运行时,$facts['nats_event'] 会包含 Vercel Function 发送的完整 JSON payload。我们的 firewall 资源就可以根据这些动态数据来声明状态,实现了从云端事件到基础设施状态的闭环。

架构的局限性与未来展望

这个架构虽然强大,但也引入了新的复杂性。NATS 集群本身成为一个需要高可用运维的关键组件。对运维团队的技能要求也相应提高。

其次,这是一个最终一致性的模型。从事件发布到所有节点收敛完成,中间存在网络延迟、NATS 投递延迟和 Puppet run 的执行时间。对于需要硬实时、同步响应的场景,此架构可能不适用。

一个潜在的风险是“惊群效应”(thundering herd)。如果一个事件被发布到订阅者非常多的主题,可能会导致成千上万个节点同时开始执行 Puppet run,对 Puppet Master 造成巨大压力。在实践中,Vercel Function 发布端可能需要实现某种形式的批处理或交错发布策略,或者 Puppet Master 需要有相应的容量规划和缓存优化。

未来的演进方向可能包括:在 NATS listener 中增加更复杂的逻辑,例如基于消息内容决定是否需要触发 Puppet run;或者构建一个更通用的事件处理器,它不仅能触发 Puppet,还能调用 Ansible、执行脚本,或与 Kubernetes Operator 交互,真正成为管理混合云资源的统一控制平面。


  目录