结合DDD与事件溯源构建基于gRPC的幂等命令处理层


在分布式系统中,一个看似简单的API调用背后,网络延迟、瞬时故障或客户端重试逻辑都可能导致同一请求被多次发送。对于一个普通的读请求,这通常无伤大雅。但对于一个修改系统状态的写操作,例如“为用户账户充值100元”,重复执行将直接导致数据不一致甚至资金损失。在真实项目中,保证命令操作的幂等性(Idempotency)并非可选项,而是构建可靠系统的基石。

当系统架构采用领域驱动设计(DDD)与事件溯源(Event Sourcing)模式时,这个挑战会呈现出新的形态。与传统CRUD直接修改状态不同,事件溯源通过追加一系列不可变的事件来记录状态的每一次变迁。这意味着我们不能简单地通过检查最终状态来判断一个操作是否已执行。例如,我们无法通过查询库存余量来确定一次“出库”操作是否已发生,因为多次出库操作可能导致相同的余量。幂等性的保证必须深入到命令处理与事件产生的核心流程中。

本文将围绕一个具体的库存管理领域,从零开始构建一个基于gRPC-Go的、支持幂等性保证的命令处理服务。我们将看到DDD如何帮助我们划分边界、事件溯源如何提供事实真相的唯一来源,以及gRPC如何作为高效、强类型的API层将这一切串联起来。

架构蓝图:幂等性作为一等公民

在动手编写代码前,我们必须先明确处理流程。一个携带幂等键(Idempotency Key)的命令请求,其完整的生命周期应该如下:

sequenceDiagram
    participant Client
    participant gRPC Server
    participant Interceptor as Idempotency Interceptor
    participant Handler as Command Handler
    participant Aggregate as Inventory Aggregate
    participant EventStore

    Client->>gRPC Server: SendCommand(IdempotencyKey, ...)
    gRPC Server->>Interceptor: Intercept Request
    Interceptor->>Handler: Forward if new key
    Handler->>EventStore: Load Aggregate Events
    EventStore-->>Handler: Return Event Stream
    Handler->>Aggregate: Rehydrate from Events
    Handler->>Aggregate: ExecuteCommand(IdempotencyKey, ...)
    alt Command not processed
        Aggregate->>Handler: Return New Events
        Handler->>EventStore: Persist Events (atomically)
        EventStore-->>Handler: Success
        Handler-->>gRPC Server: Return Success Response
    else Command already processed
        Aggregate->>Handler: Return nil (no new events)
        Handler-->>gRPC Server: Return Success Response (cached or generic)
    end
    gRPC Server-->>Client: Response

这个流程的核心在于:

  1. gRPC API层: 负责接收命令。我们将设计.proto文件,强制要求所有变更状态的RPC都必须携带幂等键。
  2. 幂等性拦截器 (Interceptor): 这是一个可选但推荐的优化层。它可以拦截请求,对幂等键进行快速检查,例如利用Redis进行存在性判断,避免对已完成的请求进行重复的业务逻辑处理。为了聚焦核心逻辑,本文将暂时省略独立的拦截器,而将幂等性检查的职责完全放在聚合根内部。在真实项目中,两者结合效果更佳。
  3. 命令处理器 (Command Handler): 负责协调,它从事件存储中加载指定聚合的历史事件,重建聚合状态,然后将命令委派给聚合根处理。
  4. 聚合根 (Aggregate Root): 这是DDD的核心,封装了所有业务规则。幂等性检查是聚合必须遵守的业务规则之一。它需要记录下所有已处理的命令ID,并在执行新命令时进行校验。
  5. 事件存储 (Event Store): 持久化事件流的唯一事实来源。它的设计需要保证事件追加的原子性。

API 定义先行:用 Protobuf 契约强制幂等性

一切从API的契约开始。使用gRPC,我们的契约就是.proto文件。我们将为库存服务定义两个核心命令:ReserveStock(预留库存)和Restock(补充库存)。

proto/inventory/v1/inventory.proto:

syntax = "proto3";

package inventory.v1;

import "google/protobuf/empty.proto";

option go_package = "github.com/your-repo/inventory/gen/go/inventory/v1;inventoryv1";

// InventoryService 定义了库存管理相关的命令
service InventoryService {
  // 预留库存
  rpc ReserveStock(ReserveStockRequest) returns (google.protobuf.Empty);
  // 补充库存
  rpc Restock(RestockRequest) returns (google.protobuf.Empty);
}

// 通用的命令元数据,强制包含幂等键
message CommandMetadata {
  string idempotency_key = 1; // UUIDv4, 由客户端生成并保证在重试时不变
}

message ReserveStockRequest {
  CommandMetadata metadata = 1;
  string product_id = 2;
  int32 quantity = 3;
}

message RestockRequest {
  CommandMetadata metadata = 1;
  string product_id = 2;
  int32 quantity = 3;
}

这里的关键设计是CommandMetadata。我们没有直接在每个Request中放一个idempotency_key字段,而是将其封装在一个公共的CommandMetadata消息中。这是一种良好的实践,它清晰地分离了业务载荷(如product_id)和执行上下文元数据。未来如果需要添加如causation_idcorrelation_id等用于分布式追踪的字段,都可以平滑地扩展到CommandMetadata中,而无需修改大量的RPC定义。

领域模型:将幂等性检查内化为业务规则

现在进入核心的DDD建模部分。我们的主角是Inventory聚合。在事件溯源中,聚合的状态是通过应用(Apply)一系列历史事件计算得出的,它本身不直接持久化。

internal/domain/inventory.go:

package domain

import (
	"errors"
	"fmt"
)

var (
	ErrInsufficientStock     = errors.New("insufficient stock")
	ErrCommandAlreadyProcessed = errors.New("command already processed")
)

// Event 是系统中所有事件的基础接口
type Event interface {
	isEvent()
}

// Inventory 聚合根
type Inventory struct {
	ProductID         string
	AvailableStock    int32
	version           int
	changes           []Event // 本次操作产生的新事件
	processedCommands map[string]bool // 用于幂等性检查
}

// NewInventory 创建一个干净的库存聚合实例
func NewInventory(productID string) *Inventory {
	return &Inventory{
		ProductID:         productID,
		processedCommands: make(map[string]bool),
	}
}

// RehydrateFromEvents 从事件历史中重建聚合状态
func (i *Inventory) RehydrateFromEvents(events []Event) {
	for _, e := range events {
		i.Apply(e)
		i.version++
	}
}

// --- 命令处理方法 ---

// ReserveStock 处理预留库存命令
func (i *Inventory) ReserveStock(commandID string, quantity int32) error {
	// 规则1:幂等性检查
	if i.processedCommands[commandID] {
		// 这是幂等性的体现:对于已处理的命令,直接返回成功,但不产生任何新事件。
		return nil
	}

	// 规则2:业务逻辑检查 - 库存是否充足
	if i.AvailableStock < quantity {
		return fmt.Errorf("%w: requested %d, available %d", ErrInsufficientStock, quantity, i.AvailableStock)
	}

	// 如果所有规则通过,产生新事件
	// 注意:此时状态尚未改变
	i.trackChange(&StockReserved{
		ProductID: i.ProductID,
		Quantity:  quantity,
	})

	// 记录命令已被处理
	i.trackChange(&CommandProcessed{
		CommandID: commandID,
	})

	return nil
}

// Restock 处理补货命令
func (i *Inventory) Restock(commandID string, quantity int32) error {
	if i.processedCommands[commandID] {
		return nil
	}

	i.trackChange(&StockRestocked{
		ProductID: i.ProductID,
		Quantity:  quantity,
	})

	i.trackChange(&CommandProcessed{
		CommandID: commandID,
	})

	return nil
}


// --- 状态变更方法 ---

// Apply 根据事件类型,修改聚合的内部状态
func (i *Inventory) Apply(event Event) {
	switch e := event.(type) {
	case *StockReserved:
		i.AvailableStock -= e.Quantity
	case *StockRestocked:
		i.AvailableStock += e.Quantity
	case *CommandProcessed:
		i.processedCommands[e.CommandID] = true
	}
}

// trackChange 记录一个新产生的事件
func (i *Inventory) trackChange(event Event) {
	i.Apply(event)
	i.changes = append(i.changes, event)
}

// GetChanges 返回本次操作产生的所有新事件
func (i *Inventory) GetChanges() []Event {
	return i.changes
}

// ClearChanges 清空待提交事件列表
func (i *Inventory) ClearChanges() {
	i.changes = nil
}

// Version 返回当前聚合的版本号
func (i *Inventory) Version() int {
	return i.version
}

// --- 事件定义 ---

type StockReserved struct {
	ProductID string
	Quantity  int32
}
func (e *StockReserved) isEvent() {}

type StockRestocked struct {
	ProductID string
	Quantity  int32
}
func (e *StockRestocked) isEvent() {}

// CommandProcessed 是一个关键的技术事件,专门用于记录幂等键
type CommandProcessed struct {
	CommandID string
}
func (e *CommandProcessed) isEvent() {}

这段代码蕴含了几个关键设计决策:

  1. processedCommands map: 这是实现幂等性的核心数据结构。聚合在从事件流重建时,会把所有CommandProcessed事件中的CommandID加载到这个map中。
  2. CommandProcessed 事件: 我们没有将幂等键附加到业务事件(如StockReserved)上,而是创建了一个独立的CommandProcessed事件。这是一个重要的解耦。业务事件应该只关心业务状态的变更,而命令是否被处理过是一个技术层面的保障。这样做让业务事件更纯粹。
  3. 幂等性作为业务规则: ReserveStock方法的第一步就是检查processedCommands。这明确地将幂等性检查提升为领域逻辑的一部分。如果命令已处理,方法直接返回nil,表示操作成功(因为从客户端视角看,其意图已经达成),但不会产生任何新的业务事件。
  4. RehydrateFromEventsApply: 这是事件溯源模式的经典实现。RehydrateFromEvents负责重放历史,Apply则负责根据事件更新内部状态。注意,Apply只做状态变更,不包含任何业务逻辑。所有决策都在命令处理方法中完成。

事件存储与资源库:持久化与重建的桥梁

聚合根和事件是内存中的对象,我们需要一个机制将它们持久化到数据库,并在需要时重新加载。这就是资源库(Repository)和事件存储(Event Store)的职责。

为了简单起见,我们先实现一个内存版的事件存储。在生产环境中,这通常由PostgreSQL、EventStoreDB或类似的支持事务和乐观并发控制的数据库实现。

internal/eventstore/memory.go:

package eventstore

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"github.com/your-repo/inventory/internal/domain"
)

var (
	ErrConcurrency = errors.New("concurrency conflict: aggregate has been modified")
)

// InMemoryEventStore 是一个简单的线程安全的内存事件存储
type InMemoryEventStore struct {
	sync.RWMutex
	streams map[string][]domain.Event
}

func NewInMemoryEventStore() *InMemoryEventStore {
	return &InMemoryEventStore{
		streams: make(map[string][]domain.Event),
	}
}

// GetEventsForAggregate 获取指定聚合的所有事件
func (s *InMemoryEventStore) GetEventsForAggregate(ctx context.Context, aggregateID string) ([]domain.Event, error) {
	s.RLock()
	defer s.RUnlock()

	stream, ok := s.streams[aggregateID]
	if !ok {
		return nil, nil // Not an error, just no events yet
	}
	return stream, nil
}

// SaveEvents 原子地保存事件
func (s *InMemoryEventStore) SaveEvents(ctx context.Context, aggregateID string, expectedVersion int, events []domain.Event) error {
	s.Lock()
	defer s.Unlock()

	currentStream := s.streams[aggregateID]
	currentVersion := len(currentStream)

	// 关键:乐观并发控制检查
	if currentVersion != expectedVersion {
		return fmt.Errorf("%w: expected version %d, but was %d", ErrConcurrency, expectedVersion, currentVersion)
	}

	s.streams[aggregateID] = append(currentStream, events...)
	return nil
}

// InventoryRepository 封装了与库存聚合相关的加载和保存逻辑
type InventoryRepository struct {
	store *InMemoryEventStore
}

func NewInventoryRepository(store *InMemoryEventStore) *InventoryRepository {
	return &InventoryRepository{store: store}
}

// Load 根据产品ID加载库存聚合
func (r *InventoryRepository) Load(ctx context.Context, productID string) (*domain.Inventory, error) {
	events, err := r.store.GetEventsForAggregate(ctx, productID)
	if err != nil {
		return nil, fmt.Errorf("failed to get events for aggregate %s: %w", productID, err)
	}

	inventory := domain.NewInventory(productID)
	if len(events) > 0 {
		inventory.RehydrateFromEvents(events)
	}
	return inventory, nil
}

// Save 保存库存聚合的变更
func (r *InventoryRepository) Save(ctx context.Context, inv *domain.Inventory) error {
	changes := inv.GetChanges()
	if len(changes) == 0 {
		return nil // No changes to save
	}

	err := r.store.SaveEvents(ctx, inv.ProductID, inv.Version(), changes)
	if err != nil {
		return fmt.Errorf("failed to save events for aggregate %s: %w", inv.ProductID, err)
	}

	// 成功保存后,清空聚合的待变更列表
	inv.ClearChanges()
	return nil
}

这段代码的重点是SaveEvents中的乐观并发控制。在加载聚合时,我们记录其版本号(即事件数量)。在保存时,我们检查当前数据库中的版本号是否与我们加载时的一致。如果不一致,说明在我们处理命令的期间,有另一个进程已经修改了该聚合。此时,我们的操作必须失败并回滚,由上层逻辑(例如客户端)决定是否重试。这是保证高并发环境下数据一致性的关键手段。

应用层:粘合 gRPC 和领域逻辑

应用层是所有组件的粘合剂。它实现了由.proto文件生成的gRPC服务接口,并负责编排整个命令处理流程。

internal/application/grpc_server.go:

package application

import (
	"context"
	"errors"
	"log/slog"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/your-repo/inventory/internal/domain"
	"github.com/your-repo/inventory/internal/eventstore"
	inventoryv1 "github.com/your-repo/inventory/gen/go/inventory/v1"
)

// GrpcServer 实现了 InventoryServiceServer 接口
type GrpcServer struct {
	inventoryv1.UnimplementedInventoryServiceServer
	repo   *eventstore.InventoryRepository
	logger *slog.Logger
}

func NewGrpcServer(repo *eventstore.InventoryRepository, logger *slog.Logger) *GrpcServer {
	return &GrpcServer{repo: repo, logger: logger}
}

func (s *GrpcServer) Register(grpcServer *grpc.Server) {
	inventoryv1.RegisterInventoryServiceServer(grpcServer, s)
}

// ReserveStock gRPC 方法的实现
func (s *GrpcServer) ReserveStock(ctx context.Context, req *inventoryv1.ReserveStockRequest) (*empty.Empty, error) {
    // 1. 输入校验
	if req.GetMetadata().GetIdempotencyKey() == "" || req.GetProductId() == "" || req.GetQuantity() <= 0 {
		return nil, status.Error(codes.InvalidArgument, "metadata.idempotency_key, product_id and quantity are required")
	}

	logger := s.logger.With(
		"rpc", "ReserveStock",
		"product_id", req.GetProductId(),
		"idempotency_key", req.GetMetadata().GetIdempotencyKey(),
	)

	// 2. 命令处理委托给一个通用的处理函数
	err := s.handleCommand(ctx, req.GetProductId(), func(inv *domain.Inventory) error {
		return inv.ReserveStock(req.GetMetadata().GetIdempotencyKey(), req.GetQuantity())
	})

	if err != nil {
		logger.Error("failed to handle ReserveStock command", "error", err)
		if errors.Is(err, domain.ErrInsufficientStock) {
			return nil, status.Error(codes.FailedPrecondition, err.Error())
		}
		if errors.Is(err, eventstore.ErrConcurrency) {
			// 对于并发冲突,通常建议客户端进行重试
			return nil, status.Error(codes.Aborted, err.Error())
		}
		return nil, status.Error(codes.Internal, "internal server error")
	}

	logger.Info("ReserveStock command processed successfully")
	return &empty.Empty{}, nil
}

// Restock gRPC 方法的实现
func (s *GrpcServer) Restock(ctx context.Context, req *inventoryv1.RestockRequest) (*empty.Empty, error) {
	// ... 实现与 ReserveStock 类似 ...
    // 只是委托给 inv.Restock(...)
    return &empty.Empty{}, nil // 简化示例
}

// handleCommand 是一个通用的命令处理流程,封装了加载-执行-保存的模式
func (s *GrpcServer) handleCommand(ctx context.Context, productID string, handlerFunc func(inv *domain.Inventory) error) error {
	// 加载聚合
	inventory, err := s.repo.Load(ctx, productID)
	if err != nil {
		return fmt.Errorf("failed to load inventory: %w", err)
	}

	// 执行业务逻辑
	if err := handlerFunc(inventory); err != nil {
		return err // 直接返回业务错误,如库存不足
	}

	// 保存变更
	if err := s.repo.Save(ctx, inventory); err != nil {
		return fmt.Errorf("failed to save inventory: %w", err)
	}
	return nil
}

handleCommand函数是这里的核心抽象。它体现了事件溯源中命令处理的固定模式:加载聚合 -> 执行命令 -> 保存事件。通过将这个模式提取出来,每个具体的gRPC方法(如ReserveStock)的实现就变得非常简洁,只需关注参数校验和调用正确的聚合方法。同时,错误处理也变得集中和清晰。例如,我们将领域层的ErrInsufficientStock映射为gRPC的FailedPrecondition状态码,将存储层的ErrConcurrency映射为Aborted状态码,为客户端提供了明确的错误语义。

常见误区与最佳实践的再思考

  1. 数据库层面的最终保障: 我们在聚合根的processedCommands map中实现了幂等性检查。这在单节点服务中是可靠的。但在一个真实的、持久化的事件存储(例如基于PostgreSQL)中,我们应该更进一步。可以在events表中,对aggregate_id和事件载荷中的command_id建立一个联合唯一索引。这样,当尝试插入一个重复的CommandProcessed事件时,数据库会直接拒绝,提供了最强的一致性保证,防止了任何可能绕过应用层逻辑的并发写入问题。

  2. 幂等键的生命周期: 客户端生成的幂等键应该存储多久?如果一个命令在1年后被重试,我们还需要保证幂等性吗?通常不需要。幂等性保证窗口一般是24小时到几天。对于基于事件溯源的聚合检查,只要聚合本身存在,检查就有效。但如果事件流会归档或快照会覆盖早期历史,那么依赖事件流中的CommandProcessed事件可能会失效。在这种情况下,一个独立的、带TTL的幂等键存储(如Redis)作为第一道防线就显得尤为重要。

  3. 幂等性与响应缓存: 我们的实现保证了命令的“至多一次执行”。但如果客户端第一次请求超时,然后重试并成功,它如何获取第一次执行的结果?完整的幂等性方案不仅要记录命令已处理,还应该缓存第一次执行的响应。当检测到重复请求时,直接返回缓存的响应。这在gRPC拦截器层实现比在领域层更合适,因为它不属于核心业务逻辑。

方案的局限性与演进方向

我们构建的这套幂等性命令处理层,虽然健壮,但并非银弹。它的主要开销在于每次命令处理都需要从事件存储中加载一个聚合的全部历史事件。对于事件流非常长(例如,一个产品的库存变更历史长达数百万条)的聚合,重建成本会变得不可接受。此时,引入快照(Snapshotting)机制是必然的演进路径。系统可以定期为聚合状态创建一个快照,在加载时,只需加载最新的快照,并重放快照之后发生的事件即可,大大降低了重建聚合的I/O和计算成本。

此外,本方案聚焦于命令端的幂等性。在完整的CQRS(命令查询职责分离)架构中,事件最终会被投射到各种读模型(Read Models)中以优化查询性能。这个投射过程是异步的,存在最终一致性。如何确保下游消费者(比如更新搜索索引的服务)也能幂等地处理这些事件,是另一个需要深入探讨的课题,通常会涉及到在事件本身或消息中间件层面进行去重。


  目录