我们团队的一个核心 Node.js 应用,部署在阿里云几台 ECS 上,最近在处理订单支付回调时频繁出现双重处理的问题。初步排查定位是并发写入导致的状态不一致。最直接的解决方案是引入分布式锁,但用 Redis 的 SETNX
实现的锁并不完全可靠——业务进程如果持有锁并异常崩溃,锁无法自动释放,会导致死锁;而为锁设置过期时间,又可能因为业务执行时间超过预期而导致锁被错误地释放。我们需要一个真正能保证互斥性 (Safety) 的锁。
调研了 ZooKeeper 和 etcd,它们确实能解决问题,但对于我们当前这个单一场景来说,引入一个如此重的组件显得有些过度设计,并且增加了运维的复杂性。更重要的是,团队希望借此机会深入理解分布式一致性的底层原理。因此,我们决定自己动手,用 Go 语言和经典的 Paxos 协议,构建一个轻量级、高可用的分布式锁服务。
技术选型决策:为什么是 Go + Paxos?
选择 Go 作为实现语言几乎是下意识的决定。它的并发模型(goroutine 和 channel)非常适合构建网络服务和处理并发请求,静态类型和编译型特性也为构建可靠的底层服务提供了保障。
选择 Paxos 而非更易于理解的 Raft,是一个更具挑战性的决定。Raft 通过强制指定一个 Leader 来简化了共识过程,工程上更容易实现。但 Paxos 是所有共识算法的理论基石,它将角色(Proposer, Acceptor, Learner)解耦,不依赖一个稳定的 Leader。在某些极端网络分区场景下,理论上 Paxos 的可用性更高。我们希望通过实现这个“更原始”的协议,来彻底搞懂分布式系统中最核心的共识问题。
整个服务将部署在阿里云的 VPC 内,Node.js 业务应用作为客户端,通过 gRPC 与锁服务集群通信。
graph TD subgraph 阿里云 VPC subgraph "业务应用 (Node.js)" App1[ECS: Node App 1] App2[ECS: Node App 2] end subgraph "分布式锁服务 (Go)" P1[ECS: Paxos Node 1] P2[ECS: Paxos Node 2] P3[ECS: Paxos Node 3] end App1 -->|gRPC| P1 App1 -->|gRPC| P2 App1 -->|gRPC| P3 App2 -->|gRPC| P1 App2 -->|gRPC| P2 App2 -->|gRPC| P3 P1 <--> P2 P1 <--> P3 P2 <--> P3 end
从 Basic Paxos 到 Multi-Paxos 日志状态机
一个标准的 Paxos 实例只能就一个值达成一次共识。而一个锁服务需要处理一系列的操作序列(Acquire, Release)。这就需要 Multi-Paxos。
Multi-Paxos 的核心思想是将一系列操作组织成一个日志(Log),并为日志中的每一个槽位(slot/index)运行一次独立的 Basic Paxos 实例来确定该槽位的操作内容。这样,所有节点最终会拥有一个完全一致的日志副本,从而构成一个复制状态机(Replicated State Machine, RSM)。
核心数据结构
我们的 Go 服务需要为每个 Paxos 节点(我们称之为 Server)维护状态。
package main
import (
"log"
"sync"
)
// Paxos 消息体
type Message struct {
Type string // "prepare", "promise", "propose", "accepted"
From string // 发送方地址
To string // 接收方地址
InstanceID int // Paxos 实例 ID, 对应于日志索引
ProposalN int // 提案编号
Value string // 提案值,例如 "LOCK order_123"
}
// Acceptor 状态
type AcceptorState struct {
PromisedN int // 已响应的最高提案编号
AcceptedN int // 已接受的最高提案编号
AcceptedValue string // 已接受的提案值
}
// Server 代表一个 Paxos 节点
type Server struct {
mu sync.Mutex
addr string // 节点地址
peers []string // 其他节点地址
majority int // 集群多数派数量
// 每个日志索引(Paxos 实例)都对应一个 Acceptor 状态
// key: InstanceID, value: AcceptorState
acceptorStates map[int]*AcceptorState
// Proposer 状态
nextProposalN int
log map[int]string // 已确认的日志
nextLogIndex int // 下一个待确定的日志索引
}
func NewServer(addr string, peers []string) *Server {
s := &Server{
addr: addr,
peers: peers,
majority: len(peers)/2 + 1,
acceptorStates: make(map[int]*AcceptorState),
nextProposalN: 1,
log: make(map[int]string),
nextLogIndex: 0,
}
log.Printf("[%s] Server initialized with peers: %v, majority: %d", addr, peers, s.majority)
return s
}
// 获取或初始化指定实例的 Acceptor 状态
func (s *Server) getAcceptorState(instanceID int) *AcceptorState {
if _, ok := s.acceptorStates[instanceID]; !ok {
s.acceptorStates[instanceID] = &AcceptorState{}
}
return s.acceptorStates[instanceID]
}
这里的 Server
结构体同时扮演了 Proposer 和 Acceptor 的角色。在真实项目中,这些角色是逻辑上的,都运行在同一个进程内。acceptorStates
是关键,它用一个 map 将日志索引(InstanceID
)与该索引对应的 Paxos 实例状态分离开,这是实现 Multi-Paxos 的基础。
实现 Paxos 的两个阶段
Phase 1: Prepare-Promise
Proposer 发送一个带有提案号 N
的 Prepare 请求。Acceptor 收到后,如果 N
大于它之前响应过的所有提案号,它就承诺不再响应任何小于 N
的提案,并将自己之前接受过的提案(如果有的话)返回给 Proposer。
// handlePrepare 处理 Prepare 请求
func (s *Server) handlePrepare(msg Message) {
s.mu.Lock()
defer s.mu.Unlock()
state := s.getAcceptorState(msg.InstanceID)
if msg.ProposalN > state.PromisedN {
log.Printf("[%s] Instance %d: Promised for proposal N=%d (previous promised N=%d)", s.addr, msg.InstanceID, msg.ProposalN, state.PromisedN)
state.PromisedN = msg.ProposalN
// 响应 Promise
promiseMsg := Message{
Type: "promise",
From: s.addr,
To: msg.From,
InstanceID: msg.InstanceID,
ProposalN: state.PromisedN, // 返回自己承诺的N
Value: state.AcceptedValue, // 返回之前接受过的值
}
// 在实际项目中,这里会通过网络发送消息
// sendMessage(promiseMsg)
log.Printf("[%s] Instance %d: Sent promise for N=%d", s.addr, msg.InstanceID, promiseMsg.ProposalN)
} else {
log.Printf("[%s] Instance %d: Rejected prepare for N=%d (already promised N=%d)", s.addr, msg.InstanceID, msg.ProposalN, state.PromisedN)
}
}
Phase 2: Propose-Accepted
当 Proposer 收到来自多数派 Acceptors 的 Promise 响应后,它就可以发起 Propose 请求。提案的值需要遵循一个规则:如果收到的 Promise 响应中包含了之前被接受过的值,那么 Proposer 必须选择其中提案号最高的那个值作为本次提案的值。否则,它可以自由选择自己的值。
Acceptor 收到 Propose 请求后,如果请求的提案号 N
不小于它已承诺的提案号,就接受该提案。
// handlePropose 处理 Propose 请求
func (s *Server) handlePropose(msg Message) {
s.mu.Lock()
defer s.mu.Unlock()
state := s.getAcceptorState(msg.InstanceID)
// 只有当提案号不小于已承诺的提案号时,才接受
if msg.ProposalN >= state.PromisedN {
log.Printf("[%s] Instance %d: Accepted proposal N=%d with value '%s'", s.addr, msg.InstanceID, msg.ProposalN, msg.Value)
state.PromisedN = msg.ProposalN
state.AcceptedN = msg.ProposalN
state.AcceptedValue = msg.Value
// 广播 Accepted 消息给所有节点(包括 Learner)
acceptedMsg := Message{
Type: "accepted",
From: s.addr,
To: "broadcast", // 广播
InstanceID: msg.InstanceID,
ProposalN: msg.ProposalN,
Value: msg.Value,
}
// broadcastMessage(acceptedMsg)
log.Printf("[%s] Instance %d: Broadcasted 'accepted' for value '%s'", s.addr, msg.InstanceID, acceptedMsg.Value)
} else {
log.Printf("[%s] Instance %d: Rejected propose for N=%d (already promised N=%d)", s.addr, msg.InstanceID, msg.ProposalN, state.PromisedN)
}
}
// Proposer 逻辑:尝试为一个值达成共识
func (s *Server) propose(value string) {
s.mu.Lock()
instanceID := s.nextLogIndex
proposalN := s.nextProposalN
s.nextProposalN++
s.mu.Unlock()
log.Printf("[%s] Starting proposal for instance %d with value '%s' and N=%d", s.addr, instanceID, value, proposalN)
// Phase 1a: 发送 Prepare 请求
prepareMsg := Message{
Type: "prepare",
From: s.addr,
To: "broadcast",
InstanceID: instanceID,
ProposalN: proposalN,
}
// responses := broadcastAndWaitForMajority(prepareMsg)
// 这里的 broadcastAndWaitForMajority 是一个伪代码,代表了网络通信和等待多数派响应的逻辑
// 假设我们收到了多数派的 Promise 响应
// Phase 1b: 检查响应并决定提案值
highestAcceptedN := -1
valueToPropose := value
// for _, resp := range responses {
// if resp.AcceptedN > highestAcceptedN {
// highestAcceptedN = resp.AcceptedN
// valueToPropose = resp.AcceptedValue
// }
// }
// Phase 2a: 发送 Propose 请求
proposeMsg := Message{
Type: "propose",
From: s.addr,
To: "broadcast",
InstanceID: instanceID,
ProposalN: proposalN,
Value: valueToPropose,
}
// broadcastMessage(proposeMsg)
log.Printf("[%s] Sent propose for instance %d with value '%s'", s.addr, instanceID, valueToPropose)
}
// handleAccepted 学习到一个值
func (s *Server) handleAccepted(msg Message) {
s.mu.Lock()
defer s.mu.Unlock()
// 实际项目中需要统计收到的 Accepted 消息数量,达到多数派才确认
// 为简化,这里直接学习
if _, exists := s.log[msg.InstanceID]; !exists {
s.log[msg.InstanceID] = msg.Value
log.Printf("[%s] Learned value '%s' for instance %d. Log: %v", s.addr, msg.Value, msg.InstanceID, s.log)
// 推进日志索引
for {
if _, ok := s.log[s.nextLogIndex]; ok {
s.nextLogIndex++
} else {
break
}
}
}
}
这里的代码只是骨架,一个生产级的实现需要处理网络通信(gRPC 或其他 RPC 框架)、超时、重试、以及保证提案号的唯一性(通常是 timestamp + server_id
的组合)。
在 Multi-Paxos 之上构建锁服务
我们的复制状态机(日志)现在可以对一系列操作达成共识了。接下来就是定义锁操作。
- Acquire(lockName, clientID, leaseID): 客户端请求获取锁。这个操作会被 Proposer 序列化成一个字符串,例如
"ACQUIRE order_123 client_A lease_XYZ"
,然后提交给 Multi-Paxos 进行共识。 - Release(lockName, clientID, leaseID): 客户端请求释放锁。同理,序列化成
"RELEASE order_123 client_A lease_XYZ"
。
每个 Paxos 节点在学习到一条新的日志条目后,都会在本地应用这个操作,更新一个内存中的锁状态表。
// lock_service.go
type LockState struct {
Name string
Locked bool
OwnerID string // clientID
LeaseID string
}
type LockService struct {
mu sync.Mutex
locks map[string]*LockState // key: lockName
paxosServer *Server // 内嵌的 Paxos 服务实例
}
// 当 Paxos 日志有更新时,此方法被调用
func (ls *LockService) applyLogEntry(entry string) {
ls.mu.Lock()
defer ls.mu.Unlock()
parts := strings.Split(entry, " ")
if len(parts) < 2 {
return
}
op, lockName := parts[0], parts[1]
state, ok := ls.locks[lockName]
if !ok {
state = &LockState{Name: lockName}
ls.locks[lockName] = state
}
switch op {
case "ACQUIRE":
if !state.Locked {
state.Locked = true
state.OwnerID = parts[2]
state.LeaseID = parts[3]
log.Printf("[LockService] Lock '%s' acquired by '%s'", lockName, state.OwnerID)
}
case "RELEASE":
// 必须是锁的持有者才能释放
if state.Locked && state.OwnerID == parts[2] && state.LeaseID == parts[3] {
state.Locked = false
state.OwnerID = ""
state.LeaseID = ""
log.Printf("[LockService] Lock '%s' released by '%s'", lockName, parts[2])
}
}
}
暴露 gRPC 接口并构建 Node.js 客户端
为了让 Node.js 应用能使用我们的服务,我们使用 gRPC 定义接口。
lock.proto
:
syntax = "proto3";
package lock;
service LockService {
rpc Acquire(AcquireRequest) returns (AcquireResponse);
rpc Release(ReleaseRequest) returns (ReleaseResponse);
}
message AcquireRequest {
string lock_name = 1;
string client_id = 2;
}
message AcquireResponse {
bool acquired = 1;
string lease_id = 2; // 租约ID,用于安全释放
}
message ReleaseRequest {
string lock_name = 1;
string client_id = 2;
string lease_id = 3;
}
message ReleaseResponse {
bool released = 1;
}
Go 服务实现这些 gRPC 方法。Acquire
方法会生成一个唯一的 leaseID
,然后将 ACQUIRE
命令通过 paxosServer.propose()
提交。这里的坑在于,propose
是异步的,我们需要一种机制来等待共识结果。一个常见的模式是使用一个 channel,在提交提案时将 channel 存入一个 map,key 是日志索引。当该索引的日志被学习到后,再通过 channel 通知 RPC 处理协程。
Node.js 客户端的实现就相对直接了:
// lockClient.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const { randomUUID } = require('crypto');
const PROTO_PATH = './lock.proto';
const packageDefinition = protoLoader.loadSync(PROTO_PATH);
const lockProto = grpc.loadPackageDefinition(packageDefinition).lock;
// 假设我们的Go服务集群有3个节点,客户端可以随机或轮询连接
const LOCK_SERVICE_TARGET = '172.18.0.101:50051'; // 阿里云内网IP
class DistributedLock {
constructor(lockName, clientId) {
this.lockName = lockName;
this.clientId = clientId || `node-client-${randomUUID()}`;
this.leaseId = null;
this.client = new lockProto.LockService(LOCK_SERVICE_TARGET, grpc.credentials.createInsecure());
}
async acquire() {
return new Promise((resolve, reject) => {
this.client.acquire({ lock_name: this.lockName, client_id: this.clientId }, (err, response) => {
if (err) {
console.error(`Error acquiring lock ${this.lockName}:`, err);
return reject(err);
}
if (response.acquired) {
this.leaseId = response.lease_id;
console.log(`Lock ${this.lockName} acquired by ${this.clientId} with lease ${this.leaseId}`);
resolve(true);
} else {
console.log(`Failed to acquire lock ${this.lockName} for ${this.clientId}`);
resolve(false);
}
});
});
}
async release() {
if (!this.leaseId) {
console.warn(`Cannot release lock ${this.lockName} without a lease.`);
return false;
}
return new Promise((resolve, reject) => {
this.client.release({ lock_name: this.lockName, client_id: this.clientId, lease_id: this.leaseId }, (err, response) => {
if (err) {
console.error(`Error releasing lock ${this.lockName}:`, err);
return reject(err);
}
if (response.released) {
console.log(`Lock ${this.lockName} released by ${this.clientId}`);
this.leaseId = null;
resolve(true);
} else {
console.log(`Failed to release lock ${this.lockName} for ${this.clientId}`);
resolve(false);
}
});
});
}
}
// 使用示例
async function criticalSection() {
const lock = new DistributedLock('payment_processing_lock');
try {
const acquired = await lock.acquire();
if (acquired) {
console.log('Inside critical section. Processing payment...');
// 模拟业务处理
await new Promise(resolve => setTimeout(resolve, 2000));
console.log('Payment processed.');
} else {
console.log('Could not acquire lock, another process is working.');
}
} catch (e) {
console.error('An error occurred during critical section:', e);
} finally {
if (lock.leaseId) {
await lock.release();
}
}
}
criticalSection();
部署到阿里云的注意事项
- 网络规划: 将所有 Go 服务节点和 Node.js 应用节点放置在同一个 VPC 下的同一个可用区内,使用内网 IP 通信,以降低延迟和网络成本。
- 安全组: 为 Go 服务所在的 ECS 实例配置安全组规则,仅允许来自 Node.js 应用 ECS 实例所在安全组的流量访问 gRPC 端口(例如 50051),同时允许集群内部节点之间互相通信。这是最小权限原则的体现。
- 服务发现: 在 Node.js 客户端中硬编码 Go 服务地址是不灵活的。在生产环境中,应该使用阿里云的服务发现产品(如 MSE)或自建一个简单的服务注册中心。
- 持久化: 我们的实现中,Paxos 日志是纯内存的。这意味着如果整个集群重启,所有锁的状态都会丢失。生产级系统必须将日志持久化到磁盘。可以使用 LevelDB 或者直接写文件,但需要注意
fsync
的调用时机,确保数据落盘,这是性能和可靠性之间的一个重要权衡。
局限性与未来迭代方向
这个从零构建的分布式锁服务成功解决了我们最初的问题,并且让我们对 Paxos 协议有了深刻的工程理解。但它离一个真正通用的生产级组件还有距离。
首先,这个 Multi-Paxos 实现为了简化,省略了 Leader 选举优化。在没有稳定 Leader 的情况下,每次写操作都可能发生提案冲突,导致多轮 Paxos 协商,延迟较高。实际的 Paxos 工程实现(如 Google 的 Chubby)都会选举出一个稳定的 Leader 来专门处理写请求,只有在 Leader 失效时才需要重新执行 Paxos 来选举新 Leader。这大大降低了正常情况下的延迟。
其次,成员变更(增删节点)的功能没有实现。当前的集群成员是静态配置的,无法动态扩缩容。实现动态成员变更需要通过 Paxos 协议本身来对成员变更的配置达成共识,这是一个相当复杂的话题。
最后,日志压缩也没有实现。随着服务运行,操作日志会无限增长,占用大量内存和磁盘空间,并拖慢新节点的启动恢复速度。需要定期对日志进行快照(Snapshot),丢弃旧的日志条目。
尽管存在这些局限,但这次实践的价值在于,它为我们构建更复杂的分布式系统打下了坚实的基础。我们不再将 etcd 或 ZooKeeper 视为黑盒,而是能够理解它们内部的工作原理、性能权衡以及在特定场景下的适用性。