基于 Go 与 Multi-Paxos 协议构建阿里云环境下的高可用分布式锁服务


我们团队的一个核心 Node.js 应用,部署在阿里云几台 ECS 上,最近在处理订单支付回调时频繁出现双重处理的问题。初步排查定位是并发写入导致的状态不一致。最直接的解决方案是引入分布式锁,但用 Redis 的 SETNX 实现的锁并不完全可靠——业务进程如果持有锁并异常崩溃,锁无法自动释放,会导致死锁;而为锁设置过期时间,又可能因为业务执行时间超过预期而导致锁被错误地释放。我们需要一个真正能保证互斥性 (Safety) 的锁。

调研了 ZooKeeper 和 etcd,它们确实能解决问题,但对于我们当前这个单一场景来说,引入一个如此重的组件显得有些过度设计,并且增加了运维的复杂性。更重要的是,团队希望借此机会深入理解分布式一致性的底层原理。因此,我们决定自己动手,用 Go 语言和经典的 Paxos 协议,构建一个轻量级、高可用的分布式锁服务。

技术选型决策:为什么是 Go + Paxos?

选择 Go 作为实现语言几乎是下意识的决定。它的并发模型(goroutine 和 channel)非常适合构建网络服务和处理并发请求,静态类型和编译型特性也为构建可靠的底层服务提供了保障。

选择 Paxos 而非更易于理解的 Raft,是一个更具挑战性的决定。Raft 通过强制指定一个 Leader 来简化了共识过程,工程上更容易实现。但 Paxos 是所有共识算法的理论基石,它将角色(Proposer, Acceptor, Learner)解耦,不依赖一个稳定的 Leader。在某些极端网络分区场景下,理论上 Paxos 的可用性更高。我们希望通过实现这个“更原始”的协议,来彻底搞懂分布式系统中最核心的共识问题。

整个服务将部署在阿里云的 VPC 内,Node.js 业务应用作为客户端,通过 gRPC 与锁服务集群通信。

graph TD
    subgraph 阿里云 VPC
        subgraph "业务应用 (Node.js)"
            App1[ECS: Node App 1]
            App2[ECS: Node App 2]
        end

        subgraph "分布式锁服务 (Go)"
            P1[ECS: Paxos Node 1]
            P2[ECS: Paxos Node 2]
            P3[ECS: Paxos Node 3]
        end

        App1 -->|gRPC| P1
        App1 -->|gRPC| P2
        App1 -->|gRPC| P3
        App2 -->|gRPC| P1
        App2 -->|gRPC| P2
        App2 -->|gRPC| P3

        P1 <--> P2
        P1 <--> P3
        P2 <--> P3
    end

从 Basic Paxos 到 Multi-Paxos 日志状态机

一个标准的 Paxos 实例只能就一个值达成一次共识。而一个锁服务需要处理一系列的操作序列(Acquire, Release)。这就需要 Multi-Paxos。

Multi-Paxos 的核心思想是将一系列操作组织成一个日志(Log),并为日志中的每一个槽位(slot/index)运行一次独立的 Basic Paxos 实例来确定该槽位的操作内容。这样,所有节点最终会拥有一个完全一致的日志副本,从而构成一个复制状态机(Replicated State Machine, RSM)。

核心数据结构

我们的 Go 服务需要为每个 Paxos 节点(我们称之为 Server)维护状态。

package main

import (
	"log"
	"sync"
)

// Paxos 消息体
type Message struct {
	Type      string // "prepare", "promise", "propose", "accepted"
	From      string // 发送方地址
	To        string // 接收方地址
	InstanceID int    // Paxos 实例 ID, 对应于日志索引
	ProposalN int    // 提案编号
	Value     string // 提案值,例如 "LOCK order_123"
}

// Acceptor 状态
type AcceptorState struct {
	PromisedN int    // 已响应的最高提案编号
	AcceptedN int    // 已接受的最高提案编号
	AcceptedValue string // 已接受的提案值
}

// Server 代表一个 Paxos 节点
type Server struct {
	mu         sync.Mutex
	addr       string       // 节点地址
	peers      []string     // 其他节点地址
	majority   int          // 集群多数派数量
	
	// 每个日志索引(Paxos 实例)都对应一个 Acceptor 状态
	// key: InstanceID, value: AcceptorState
	acceptorStates map[int]*AcceptorState

	// Proposer 状态
	nextProposalN int
	log           map[int]string // 已确认的日志
	nextLogIndex  int            // 下一个待确定的日志索引
}

func NewServer(addr string, peers []string) *Server {
	s := &Server{
		addr:           addr,
		peers:          peers,
		majority:       len(peers)/2 + 1,
		acceptorStates: make(map[int]*AcceptorState),
		nextProposalN:  1,
		log:            make(map[int]string),
		nextLogIndex:   0,
	}
	log.Printf("[%s] Server initialized with peers: %v, majority: %d", addr, peers, s.majority)
	return s
}

// 获取或初始化指定实例的 Acceptor 状态
func (s *Server) getAcceptorState(instanceID int) *AcceptorState {
	if _, ok := s.acceptorStates[instanceID]; !ok {
		s.acceptorStates[instanceID] = &AcceptorState{}
	}
	return s.acceptorStates[instanceID]
}

这里的 Server 结构体同时扮演了 Proposer 和 Acceptor 的角色。在真实项目中,这些角色是逻辑上的,都运行在同一个进程内。acceptorStates 是关键,它用一个 map 将日志索引(InstanceID)与该索引对应的 Paxos 实例状态分离开,这是实现 Multi-Paxos 的基础。

实现 Paxos 的两个阶段

Phase 1: Prepare-Promise

Proposer 发送一个带有提案号 N 的 Prepare 请求。Acceptor 收到后,如果 N 大于它之前响应过的所有提案号,它就承诺不再响应任何小于 N 的提案,并将自己之前接受过的提案(如果有的话)返回给 Proposer。

// handlePrepare 处理 Prepare 请求
func (s *Server) handlePrepare(msg Message) {
	s.mu.Lock()
	defer s.mu.Unlock()

	state := s.getAcceptorState(msg.InstanceID)

	if msg.ProposalN > state.PromisedN {
		log.Printf("[%s] Instance %d: Promised for proposal N=%d (previous promised N=%d)", s.addr, msg.InstanceID, msg.ProposalN, state.PromisedN)
		state.PromisedN = msg.ProposalN
		
		// 响应 Promise
		promiseMsg := Message{
			Type:      "promise",
			From:      s.addr,
			To:        msg.From,
			InstanceID: msg.InstanceID,
			ProposalN: state.PromisedN, // 返回自己承诺的N
			Value:     state.AcceptedValue, // 返回之前接受过的值
		}
		// 在实际项目中,这里会通过网络发送消息
		// sendMessage(promiseMsg)
		log.Printf("[%s] Instance %d: Sent promise for N=%d", s.addr, msg.InstanceID, promiseMsg.ProposalN)
	} else {
		log.Printf("[%s] Instance %d: Rejected prepare for N=%d (already promised N=%d)", s.addr, msg.InstanceID, msg.ProposalN, state.PromisedN)
	}
}

Phase 2: Propose-Accepted

当 Proposer 收到来自多数派 Acceptors 的 Promise 响应后,它就可以发起 Propose 请求。提案的值需要遵循一个规则:如果收到的 Promise 响应中包含了之前被接受过的值,那么 Proposer 必须选择其中提案号最高的那个值作为本次提案的值。否则,它可以自由选择自己的值。

Acceptor 收到 Propose 请求后,如果请求的提案号 N 不小于它已承诺的提案号,就接受该提案。

// handlePropose 处理 Propose 请求
func (s *Server) handlePropose(msg Message) {
	s.mu.Lock()
	defer s.mu.Unlock()

	state := s.getAcceptorState(msg.InstanceID)
	
	// 只有当提案号不小于已承诺的提案号时,才接受
	if msg.ProposalN >= state.PromisedN {
		log.Printf("[%s] Instance %d: Accepted proposal N=%d with value '%s'", s.addr, msg.InstanceID, msg.ProposalN, msg.Value)
		state.PromisedN = msg.ProposalN
		state.AcceptedN = msg.ProposalN
		state.AcceptedValue = msg.Value

		// 广播 Accepted 消息给所有节点(包括 Learner)
		acceptedMsg := Message{
			Type:       "accepted",
			From:       s.addr,
			To:         "broadcast", // 广播
			InstanceID: msg.InstanceID,
			ProposalN:  msg.ProposalN,
			Value:      msg.Value,
		}
		// broadcastMessage(acceptedMsg)
		log.Printf("[%s] Instance %d: Broadcasted 'accepted' for value '%s'", s.addr, msg.InstanceID, acceptedMsg.Value)

	} else {
		log.Printf("[%s] Instance %d: Rejected propose for N=%d (already promised N=%d)", s.addr, msg.InstanceID, msg.ProposalN, state.PromisedN)
	}
}

// Proposer 逻辑:尝试为一个值达成共识
func (s *Server) propose(value string) {
    s.mu.Lock()
    instanceID := s.nextLogIndex
    proposalN := s.nextProposalN
    s.nextProposalN++
    s.mu.Unlock()

    log.Printf("[%s] Starting proposal for instance %d with value '%s' and N=%d", s.addr, instanceID, value, proposalN)

    // Phase 1a: 发送 Prepare 请求
    prepareMsg := Message{
        Type:       "prepare",
        From:       s.addr,
        To:         "broadcast",
        InstanceID: instanceID,
        ProposalN:  proposalN,
    }
    // responses := broadcastAndWaitForMajority(prepareMsg)
    // 这里的 broadcastAndWaitForMajority 是一个伪代码,代表了网络通信和等待多数派响应的逻辑
    
    // 假设我们收到了多数派的 Promise 响应
    // Phase 1b: 检查响应并决定提案值
    highestAcceptedN := -1
    valueToPropose := value
    // for _, resp := range responses {
    //     if resp.AcceptedN > highestAcceptedN {
    //         highestAcceptedN = resp.AcceptedN
    //         valueToPropose = resp.AcceptedValue
    //     }
    // }
    
    // Phase 2a: 发送 Propose 请求
    proposeMsg := Message{
        Type:       "propose",
        From:       s.addr,
        To:         "broadcast",
        InstanceID: instanceID,
        ProposalN:  proposalN,
        Value:      valueToPropose,
    }
    // broadcastMessage(proposeMsg)
    log.Printf("[%s] Sent propose for instance %d with value '%s'", s.addr, instanceID, valueToPropose)
}

// handleAccepted 学习到一个值
func (s *Server) handleAccepted(msg Message) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // 实际项目中需要统计收到的 Accepted 消息数量,达到多数派才确认
    // 为简化,这里直接学习
    if _, exists := s.log[msg.InstanceID]; !exists {
        s.log[msg.InstanceID] = msg.Value
        log.Printf("[%s] Learned value '%s' for instance %d. Log: %v", s.addr, msg.Value, msg.InstanceID, s.log)

        // 推进日志索引
        for {
            if _, ok := s.log[s.nextLogIndex]; ok {
                s.nextLogIndex++
            } else {
                break
            }
        }
    }
}

这里的代码只是骨架,一个生产级的实现需要处理网络通信(gRPC 或其他 RPC 框架)、超时、重试、以及保证提案号的唯一性(通常是 timestamp + server_id 的组合)。

在 Multi-Paxos 之上构建锁服务

我们的复制状态机(日志)现在可以对一系列操作达成共识了。接下来就是定义锁操作。

  • Acquire(lockName, clientID, leaseID): 客户端请求获取锁。这个操作会被 Proposer 序列化成一个字符串,例如 "ACQUIRE order_123 client_A lease_XYZ",然后提交给 Multi-Paxos 进行共识。
  • Release(lockName, clientID, leaseID): 客户端请求释放锁。同理,序列化成 "RELEASE order_123 client_A lease_XYZ"

每个 Paxos 节点在学习到一条新的日志条目后,都会在本地应用这个操作,更新一个内存中的锁状态表。

// lock_service.go

type LockState struct {
	Name    string
	Locked  bool
	OwnerID string // clientID
	LeaseID string
}

type LockService struct {
	mu         sync.Mutex
	locks      map[string]*LockState // key: lockName
	paxosServer *Server // 内嵌的 Paxos 服务实例
}

// 当 Paxos 日志有更新时,此方法被调用
func (ls *LockService) applyLogEntry(entry string) {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	parts := strings.Split(entry, " ")
	if len(parts) < 2 {
		return
	}
	op, lockName := parts[0], parts[1]
	
	state, ok := ls.locks[lockName]
	if !ok {
		state = &LockState{Name: lockName}
		ls.locks[lockName] = state
	}

	switch op {
	case "ACQUIRE":
		if !state.Locked {
			state.Locked = true
			state.OwnerID = parts[2]
			state.LeaseID = parts[3]
			log.Printf("[LockService] Lock '%s' acquired by '%s'", lockName, state.OwnerID)
		}
	case "RELEASE":
		// 必须是锁的持有者才能释放
		if state.Locked && state.OwnerID == parts[2] && state.LeaseID == parts[3] {
			state.Locked = false
			state.OwnerID = ""
			state.LeaseID = ""
			log.Printf("[LockService] Lock '%s' released by '%s'", lockName, parts[2])
		}
	}
}

暴露 gRPC 接口并构建 Node.js 客户端

为了让 Node.js 应用能使用我们的服务,我们使用 gRPC 定义接口。

lock.proto:

syntax = "proto3";

package lock;

service LockService {
  rpc Acquire(AcquireRequest) returns (AcquireResponse);
  rpc Release(ReleaseRequest) returns (ReleaseResponse);
}

message AcquireRequest {
  string lock_name = 1;
  string client_id = 2;
}

message AcquireResponse {
  bool acquired = 1;
  string lease_id = 2; // 租约ID,用于安全释放
}

message ReleaseRequest {
  string lock_name = 1;
  string client_id = 2;
  string lease_id = 3;
}

message ReleaseResponse {
  bool released = 1;
}

Go 服务实现这些 gRPC 方法。Acquire 方法会生成一个唯一的 leaseID,然后将 ACQUIRE 命令通过 paxosServer.propose() 提交。这里的坑在于,propose 是异步的,我们需要一种机制来等待共识结果。一个常见的模式是使用一个 channel,在提交提案时将 channel 存入一个 map,key 是日志索引。当该索引的日志被学习到后,再通过 channel 通知 RPC 处理协程。

Node.js 客户端的实现就相对直接了:

// lockClient.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const { randomUUID } = require('crypto');

const PROTO_PATH = './lock.proto';
const packageDefinition = protoLoader.loadSync(PROTO_PATH);
const lockProto = grpc.loadPackageDefinition(packageDefinition).lock;

// 假设我们的Go服务集群有3个节点,客户端可以随机或轮询连接
const LOCK_SERVICE_TARGET = '172.18.0.101:50051'; // 阿里云内网IP

class DistributedLock {
  constructor(lockName, clientId) {
    this.lockName = lockName;
    this.clientId = clientId || `node-client-${randomUUID()}`;
    this.leaseId = null;
    this.client = new lockProto.LockService(LOCK_SERVICE_TARGET, grpc.credentials.createInsecure());
  }

  async acquire() {
    return new Promise((resolve, reject) => {
      this.client.acquire({ lock_name: this.lockName, client_id: this.clientId }, (err, response) => {
        if (err) {
          console.error(`Error acquiring lock ${this.lockName}:`, err);
          return reject(err);
        }
        if (response.acquired) {
          this.leaseId = response.lease_id;
          console.log(`Lock ${this.lockName} acquired by ${this.clientId} with lease ${this.leaseId}`);
          resolve(true);
        } else {
          console.log(`Failed to acquire lock ${this.lockName} for ${this.clientId}`);
          resolve(false);
        }
      });
    });
  }

  async release() {
    if (!this.leaseId) {
      console.warn(`Cannot release lock ${this.lockName} without a lease.`);
      return false;
    }
    return new Promise((resolve, reject) => {
      this.client.release({ lock_name: this.lockName, client_id: this.clientId, lease_id: this.leaseId }, (err, response) => {
        if (err) {
          console.error(`Error releasing lock ${this.lockName}:`, err);
          return reject(err);
        }
        if (response.released) {
          console.log(`Lock ${this.lockName} released by ${this.clientId}`);
          this.leaseId = null;
          resolve(true);
        } else {
          console.log(`Failed to release lock ${this.lockName} for ${this.clientId}`);
          resolve(false);
        }
      });
    });
  }
}

// 使用示例
async function criticalSection() {
    const lock = new DistributedLock('payment_processing_lock');
    try {
        const acquired = await lock.acquire();
        if (acquired) {
            console.log('Inside critical section. Processing payment...');
            // 模拟业务处理
            await new Promise(resolve => setTimeout(resolve, 2000));
            console.log('Payment processed.');
        } else {
            console.log('Could not acquire lock, another process is working.');
        }
    } catch (e) {
        console.error('An error occurred during critical section:', e);
    } finally {
        if (lock.leaseId) {
            await lock.release();
        }
    }
}

criticalSection();

部署到阿里云的注意事项

  1. 网络规划: 将所有 Go 服务节点和 Node.js 应用节点放置在同一个 VPC 下的同一个可用区内,使用内网 IP 通信,以降低延迟和网络成本。
  2. 安全组: 为 Go 服务所在的 ECS 实例配置安全组规则,仅允许来自 Node.js 应用 ECS 实例所在安全组的流量访问 gRPC 端口(例如 50051),同时允许集群内部节点之间互相通信。这是最小权限原则的体现。
  3. 服务发现: 在 Node.js 客户端中硬编码 Go 服务地址是不灵活的。在生产环境中,应该使用阿里云的服务发现产品(如 MSE)或自建一个简单的服务注册中心。
  4. 持久化: 我们的实现中,Paxos 日志是纯内存的。这意味着如果整个集群重启,所有锁的状态都会丢失。生产级系统必须将日志持久化到磁盘。可以使用 LevelDB 或者直接写文件,但需要注意 fsync 的调用时机,确保数据落盘,这是性能和可靠性之间的一个重要权衡。

局限性与未来迭代方向

这个从零构建的分布式锁服务成功解决了我们最初的问题,并且让我们对 Paxos 协议有了深刻的工程理解。但它离一个真正通用的生产级组件还有距离。

首先,这个 Multi-Paxos 实现为了简化,省略了 Leader 选举优化。在没有稳定 Leader 的情况下,每次写操作都可能发生提案冲突,导致多轮 Paxos 协商,延迟较高。实际的 Paxos 工程实现(如 Google 的 Chubby)都会选举出一个稳定的 Leader 来专门处理写请求,只有在 Leader 失效时才需要重新执行 Paxos 来选举新 Leader。这大大降低了正常情况下的延迟。

其次,成员变更(增删节点)的功能没有实现。当前的集群成员是静态配置的,无法动态扩缩容。实现动态成员变更需要通过 Paxos 协议本身来对成员变更的配置达成共识,这是一个相当复杂的话题。

最后,日志压缩也没有实现。随着服务运行,操作日志会无限增长,占用大量内存和磁盘空间,并拖慢新节点的启动恢复速度。需要定期对日志进行快照(Snapshot),丢弃旧的日志条目。

尽管存在这些局限,但这次实践的价值在于,它为我们构建更复杂的分布式系统打下了坚实的基础。我们不再将 etcd 或 ZooKeeper 视为黑盒,而是能够理解它们内部的工作原理、性能权衡以及在特定场景下的适用性。


  目录