构建基于动态密钥管理的混合消息网关以实现从 Google Pub/Sub 到 Pulsar 的平滑迁移


在真实项目中,技术选型很少是一次性的终点。随着业务规模的增长和成本结构的变化,原先看似完美的方案可能会成为新的瓶颈。我们面临的正是这样一个场景:核心业务严重依赖 Google Cloud Pub/Sub,其稳定性和易用性在早期阶段为我们提供了巨大价值。然而,当日均消息量突破百亿级别后,其成本已经攀升至一个无法忽视的水平,同时,单一云厂商的锁定也为我们未来的多云战略带来了风险。

经过评估,我们决定将消息系统逐步迁移至自建的 Apache Pulsar 集群。Pulsar 凭借其分层存储、多租户和强大的跨地域复制能力,为我们提供了更优的成本控制和架构灵活性。但挑战也随之而来:如何在一个持续处理高并发请求的生产环境中,实现从 Pub/Sub 到 Pulsar 的零停机、数据无损的平滑迁移?

直接切换是不可行的,任何形式的停机都将导致业务损失。唯一的出路是构建一个能够同时与两个系统交互的过渡性架构,在应用层实现双写和逐步切读,并确保整个过程的安全性。这意味着我们的核心应用需要一种机制,能够动态、安全地获取并管理两套完全不同的凭证体系:GCP 的服务账户密钥和 Pulsar 的认证 JWT。

架构决策:为何不是简单的双系统并存

方案A是维持现状,继续使用 Google Cloud Pub/Sub。其优势显而易见:完全托管,无需运维,与 GCP 生态无缝集成。但劣势也同样突出:成本与消息量强相关,在我们的场景下已成为第二大云开销;技术栈锁定在GCP,不利于后续的混合云部署;对于消息保留策略、多租户隔离等高级特性的控制力较弱。

方案B是迁移至自建 Apache Pulsar。优势在于:开源,无厂商锁定,长期来看总拥有成本(TCO)更低;架构先进,BookKeeper 提供的 IO 隔离和分层存储能力非常适合我们的业务场景;支持多租户和跨地域复制,为未来的架构扩展铺平了道路。其劣势在于:需要投入大量的运维精力来保障集群的稳定性、可扩展性和安全性;技术栈更复杂,团队需要时间来熟悉 ZooKeeper、BookKeeper 和 Broker 的协同工作原理。

我们的最终决策是选择方案B,但必须通过一个精心设计的过渡架构来对冲其风险。这个架构的核心目标是:

  1. 应用层无感知:上层业务逻辑不应关心当前消息是发往 Pub/Sub 还是 Pulsar。
  2. 安全性优先:密钥管理必须集中化、自动化,杜绝硬编码或静态配置文件中的凭证。短生命周期的动态密钥是基本要求。
  3. 可观测性:必须能够清晰地监控双写两端的消息延迟、成功率和失败率。
  4. 可控的切换:能够通过简单的配置变更,灵活控制双写的开启/关闭,以及消费端的数据源切换。

基于以上目标,我们设计的混合消息网关架构如下:

graph TD
    subgraph "客户端请求"
        UserRequest
    end

    subgraph "入口层"
        Nginx[Nginx Reverse Proxy]
    end

    subgraph "应用层 (Fastify Service)"
        App[Fastify App]
        App -- HTTP Request --> Controller
        Controller -- "publishMessage(data)" --> MessagingGateway
        MessagingGateway -- "getCredentials('gcp')" --> VaultClient
        MessagingGateway -- "getCredentials('pulsar')" --> VaultClient
        MessagingGateway -- "双写" --> PubSubClient
        MessagingGateway -- "双写" --> PulsarClient
    end

    subgraph "安全层"
        Vault[HashiCorp Vault]
        Vault -- "动态生成" --> GCP_SA_Key[GCP Service Account Key]
        Vault -- "动态生成" --> Pulsar_JWT[Pulsar JWT Token]
    end

    subgraph "消息系统"
        PubSub[Google Cloud Pub/Sub]
        Pulsar[Apache Pulsar Cluster]
    end

    UserRequest --> Nginx
    Nginx --> App
    VaultClient -- "API Call" --> Vault
    PubSubClient -- "Push Message" --> PubSub
    PulsarClient -- "Push Message" --> Pulsar

这个架构中,Fastify 应用是核心,它内部实现了一个 MessagingGateway,负责封装与两个消息系统的所有交互。而所有凭证的获取,都通过 VaultClient 向 HashiCorp Vault 请求动态生成的短期密钥。Nginx 则作为安全网关,负责 TLS 卸载和请求路由。

核心实现:动态密钥管理与消息网关

1. Vault 的配置:动态密钥的源头

在真实项目中,静态的、长期的密钥是巨大的安全隐患。我们使用 Vault 来根除这个问题。

首先,配置 Vault 的 GCP Secrets Engine,使其能够动态生成有时效性的 GCP 服务账户密钥。

# main.tf for Vault GCP Secret Engine configuration

# 启用 GCP secrets engine
resource "vault_mount" "gcp" {
  path        = "gcp"
  type        = "gcp"
  description = "Dynamic GCP credentials for Pub/Sub"
}

# 配置 GCP secrets engine
# 需要预先提供一个有权限创建和管理服务账户密钥的 Vault 服务账户
resource "vault_gcp_secret_backend" "config" {
  mount      = vault_mount.gcp.path
  credentials = file("path/to/your/vault-sa-credentials.json")
}

# 定义一个角色,应用通过这个角色来申请密钥
# 这个角色绑定的服务账户拥有对特定 Pub/Sub 主题的发布权限
resource "vault_gcp_secret_roleset" "pubsub_publisher" {
  backend      = vault_gcp_secret_backend.config.backend
  name         = "pubsub-publisher-role"
  project      = "your-gcp-project-id"
  secret_type  = "service_account_key"
  
  # 绑定到一个预先创建好的,权限受限的服务账户
  bindings = <<-EOT
    resource "//cloudresourcemanager.googleapis.com/projects/your-gcp-project-id" {
      roles = ["roles/pubsub.publisher"]
    }
  EOT
  
  token_scopes = [
    "https://www.googleapis.com/auth/cloud-platform",
  ]
  
  # 密钥的 TTL 设置为 1 小时,最大 4 小时
  ttl     = 3600
  max_ttl = 14400
}

应用将通过访问 Vault 的 gcp/token/pubsub-publisher-role 路径来获取一个有效期仅为1小时的服务账户私钥。

其次,为 Pulsar 配置 JWT/OIDC 认证。我们让 Vault 充当 OIDC Provider,为应用颁发携带特定角色的 JWT。

# main.tf for Vault JWT/OIDC configuration

# 启用 JWT auth method
resource "vault_auth_backend" "jwt" {
  type = "jwt"
  path = "jwt-pulsar"
}

# 配置 OIDC provider
resource "vault_jwt_auth_backend" "pulsar_provider" {
  backend              = vault_auth_backend.jwt.path
  oidc_discovery_url   = "http://127.0.0.1:8200" # Vault 的地址
  bound_issuer         = "http://127.0.0.1:8200"
}

# 创建一个角色,Fastify 应用将使用这个角色来获取 token
# 'bound_claims' 可以进一步限制哪些应用(基于其身份)可以请求
resource "vault_jwt_auth_backend_role" "app_role" {
  backend         = vault_auth_backend.jwt.path
  role_name       = "fastify-app-role"
  token_policies  = ["pulsar-publisher-policy"] # 关联一个能生成 Pulsar token 的策略
  
  # 定义生成的 token 的 claims
  token_type      = "OIDC"
  user_claim      = "sub"
  
  bound_claims = {
    "aud" = "pulsar-cluster"
  }

  # Token 有效期 1 小时
  token_ttl = 3600
}

# (Pulsar Broker 端需要配置 JWT 认证,并信任 Vault 的 OIDC key)

这样,Fastify 应用就能通过一个已认证的身份(例如 Kubernetes Service Account)向 Vault 请求一个携带 role: publisher 的 JWT,Pulsar Broker 收到后会验证该 JWT 的签名和声明,从而授予发布权限。

2. Nginx 反向代理配置

Nginx 扮演的角色不仅仅是流量转发,更是安全的第一道防线。这里的配置关注于 TLS 强化和代理的稳定性。

# /etc/nginx/conf.d/app.conf

upstream fastify_backend {
    # 假设 Fastify 应用运行在 3000 端口
    server 127.0.0.1:3000;
    # 可以配置多个后端实例
    # server app-instance-1:3000;
    # server app-instance-2:3000;
    
    # 启用长连接,减少三次握手的开销
    keepalive 32;
}

server {
    listen 443 ssl http2;
    listen [::]:443 ssl http2;

    server_name api.yourdomain.com;

    # SSL/TLS 安全配置
    ssl_certificate /etc/nginx/ssl/yourdomain.com.crt;
    ssl_certificate_key /etc/nginx/ssl/yourdomain.com.key;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_prefer_server_ciphers on;
    ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256';
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    ssl_session_tickets off;
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;

    # 代理设置
    location / {
        proxy_pass http://fastify_backend;

        # 传递真实客户端 IP 和协议信息
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # 连接超时设置
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;

        # 开启 buffering
        proxy_buffering on;
        proxy_buffer_size 16k;
        proxy_buffers 4 64k;
    }

    # 日志配置
    access_log /var/log/nginx/api.access.log;
    error_log /var/log/nginx/api.error.log;
}

3. Fastify 消息网关实现

这是整个架构的核心,所有复杂性都被封装在这一层。

a. 配置文件 (config.ts)
我们通过环境变量来控制迁移策略。

// src/config.ts
import 'dotenv/config';

export const config = {
  // 迁移总开关
  migration: {
    // 'pubsub_only': 只写 Pub/Sub
    // 'dual_write': 双写
    // 'pulsar_only': 只写 Pulsar
    writeMode: process.env.MESSAGING_WRITE_MODE || 'pubsub_only', 
  },
  pubsub: {
    projectId: process.env.GCP_PROJECT_ID,
    topicName: 'my-global-topic',
  },
  pulsar: {
    serviceUrl: process.env.PULSAR_SERVICE_URL || 'pulsar://localhost:6650',
    topicName: 'persistent://public/default/my-global-topic',
  },
  vault: {
    addr: process.env.VAULT_ADDR || 'http://127.0.0.1:8200',
    token: process.env.VAULT_TOKEN, // 用于开发,生产环境应使用 AppRole 或 K8s Auth
    gcpCredsPath: 'gcp/token/pubsub-publisher-role',
    pulsarTokenPath: 'identity/oidc/token/fastify-app-role', // Vault OIDC 路径
    pulsarClientId: 'fastify-app'
  },
  server: {
    port: parseInt(process.env.PORT || '3000', 10),
    host: '0.0.0.0'
  }
};

b. 凭证管理服务 (vault.service.ts)
这个服务负责与 Vault 通信,获取并缓存动态密钥。一个常见的错误是不做缓存,导致每次请求都访问 Vault,增加延迟和 Vault 的负载。

// src/services/vault.service.ts
import nodeVault from 'node-vault';

// ... (导入 config, logger 等)

interface GcpCredentials {
  private_key: string;
  client_email: string;
}

interface PulsarCredentials {
  token: string;
}

class VaultService {
  private vault: nodeVault.client;
  private gcpCredsCache: GcpCredentials | null = null;
  private pulsarCredsCache: PulsarCredentials | null = null;
  private gcpCredsExpiresAt: number = 0;
  private pulsarCredsExpiresAt: number = 0;

  constructor() {
    this.vault = nodeVault({
      apiVersion: 'v1',
      endpoint: config.vault.addr,
      token: config.vault.token,
    });
  }

  // 为 GCP 获取服务账户密钥
  public async getGcpCredentials(): Promise<GcpCredentials> {
    // 缓冲期,在密钥过期前 5 分钟就刷新
    if (this.gcpCredsCache && Date.now() < this.gcpCredsExpiresAt - 5 * 60 * 1000) {
      return this.gcpCredsCache;
    }

    try {
      const result = await this.vault.read(config.vault.gcpCredsPath);
      const privateKeyData = Buffer.from(result.data.private_key_data, 'base64').toString('utf-8');
      const credentials = JSON.parse(privateKeyData);
      
      this.gcpCredsCache = {
        private_key: credentials.private_key,
        client_email: credentials.client_email,
      };
      // result.lease_duration 单位是秒
      this.gcpCredsExpiresAt = Date.now() + result.lease_duration * 1000;

      logger.info('Successfully fetched new GCP credentials from Vault.');
      return this.gcpCredsCache;
    } catch (error) {
      logger.error({ err: error }, 'Failed to fetch GCP credentials from Vault.');
      throw new Error('Vault GCP credential fetch failed');
    }
  }

  // 为 Pulsar 获取 JWT
  public async getPulsarCredentials(): Promise<PulsarCredentials> {
    if (this.pulsarCredsCache && Date.now() < this.pulsarCredsExpiresAt - 5 * 60 * 1000) {
      return this.pulsarCredsCache;
    }

    try {
      const result = await this.vault.write(config.vault.pulsarTokenPath, {
        client_id: config.vault.pulsarClientId,
      });

      this.pulsarCredsCache = { token: result.data.token };
      this.pulsarCredsExpiresAt = Date.now() + result.lease_duration * 1000;

      logger.info('Successfully fetched new Pulsar JWT from Vault.');
      return this.pulsarCredsCache;
    } catch (error) {
      logger.error({ err: error }, 'Failed to fetch Pulsar JWT from Vault.');
      throw new Error('Vault Pulsar token fetch failed');
    }
  }
}

export const vaultService = new VaultService();

c. 消息网关 (messaging.gateway.ts)
这是核心调度逻辑,根据配置决定消息的流向。

// src/gateways/messaging.gateway.ts
import { PubSub } from '@google-cloud/pubsub';
import Pulsar from 'pulsar-client';
import { vaultService } from '../services/vault.service';
import { config } from '../config';
import { logger } from '../utils/logger';

// 统一接口
interface MessagingClient {
  initialize(): Promise<void>;
  publish(payload: Buffer): Promise<string>;
  close(): Promise<void>;
}

// Pub/Sub 客户端封装
class PubSubClientWrapper implements MessagingClient {
  private pubsub: PubSub | null = null;
  private topic: any;

  async initialize(): Promise<void> {
    const credentials = await vaultService.getGcpCredentials();
    this.pubsub = new PubSub({
      projectId: config.pubsub.projectId,
      credentials,
    });
    this.topic = this.pubsub.topic(config.pubsub.topicName);
    logger.info('Pub/Sub client initialized.');
  }

  async publish(payload: Buffer): Promise<string> {
    if (!this.topic) throw new Error('Pub/Sub client not initialized.');
    try {
      const messageId = await this.topic.publishMessage({ data: payload });
      return `pubsub:${messageId}`;
    } catch (error) {
      logger.error({ err: error }, 'Failed to publish to Pub/Sub');
      // 这里的错误处理至关重要,可能需要重试或告警
      throw error;
    }
  }

  async close(): Promise<void> {
    await this.pubsub?.close();
  }
}

// Pulsar 客户端封装
class PulsarClientWrapper implements MessagingClient {
  private client: Pulsar.Client | null = null;
  private producer: Pulsar.Producer | null = null;

  async initialize(): Promise<void> {
    const { token } = await vaultService.getPulsarCredentials();
    this.client = new Pulsar.Client({
      serviceUrl: config.pulsar.serviceUrl,
      authentication: new Pulsar.AuthenticationToken({ token }),
      // 生产环境应配置 TLS
    });
    this.producer = await this.client.createProducer({
      topic: config.pulsar.topicName,
      sendTimeoutMs: 30000,
      batchingEnabled: true,
    });
    logger.info('Pulsar client initialized.');
  }

  async publish(payload: Buffer): Promise<string> {
    if (!this.producer) throw new Error('Pulsar client not initialized.');
    try {
      const messageId = await this.producer.send({ data: payload });
      return `pulsar:${messageId.toString()}`;
    } catch (error) {
      logger.error({ err: error }, 'Failed to publish to Pulsar');
      throw error;
    }
  }

  async close(): Promise<void> {
    await this.producer?.close();
    await this.client?.close();
  }
}


// 核心网关
export class MessagingGateway {
  private pubsubClient: PubSubClientWrapper;
  private pulsarClient: PulsarClientWrapper;
  private writeMode: string;

  constructor() {
    this.pubsubClient = new PubSubClientWrapper();
    this.pulsarClient = new PulsarClientWrapper();
    this.writeMode = config.migration.writeMode;
  }
  
  // 初始化所有需要的客户端
  async initialize(): Promise<void> {
    const initializers: Promise<void>[] = [];
    if (this.writeMode === 'pubsub_only' || this.writeMode === 'dual_write') {
        initializers.push(this.pubsubClient.initialize());
    }
    if (this.writeMode === 'pulsar_only' || this.writeMode === 'dual_write') {
        initializers.push(this.pulsarClient.initialize());
    }
    await Promise.all(initializers);
    logger.info(`MessagingGateway initialized in '${this.writeMode}' mode.`);
  }

  // 发布逻辑,根据模式决定行为
  async publish(payload: object): Promise<string[]> {
    const dataBuffer = Buffer.from(JSON.stringify(payload));
    const publishPromises: Promise<string>[] = [];

    if (this.writeMode === 'pubsub_only' || this.writeMode === 'dual_write') {
      publishPromises.push(this.pubsubClient.publish(dataBuffer));
    }
    
    if (this.writeMode === 'pulsar_only' || this.writeMode === 'dual_write') {
      publishPromises.push(this.pulsarClient.publish(dataBuffer));
    }

    if (publishPromises.length === 0) {
      throw new Error(`Invalid writeMode: ${this.writeMode}`);
    }

    // 在双写模式下,我们必须处理其中一个失败的情况。
    // Promise.allSettled 让我们能知道每个 promise 的结果,无论成功或失败。
    const results = await Promise.allSettled(publishPromises);

    const successfulIds: string[] = [];
    const errors: any[] = [];
    
    results.forEach(result => {
        if (result.status === 'fulfilled') {
            successfulIds.push(result.value);
        } else {
            errors.push(result.reason);
        }
    });

    // 关键:如果存在任何错误,必须抛出,让上层业务决定如何处理。
    // 不能因为一个成功就认为整个操作成功,这会导致数据不一致。
    if (errors.length > 0) {
        logger.error({ errors, successfulIds }, 'Partial failure in dual_write mode.');
        throw new Error('Dual write failed for one or more targets.');
    }

    return successfulIds;
  }
  
  async close(): Promise<void> {
      await Promise.all([this.pubsubClient.close(), this.pulsarClient.close()]);
  }
}

局限性与未来迭代路径

这套架构成功地为我们提供了一个安全、可控的迁移路径,但它并非没有代价。最显著的代价是应用复杂性的增加。MessagingGateway 需要维护两套客户端连接,处理更复杂的错误场景,尤其是在双写模式下,一次发布请求的延迟是两个系统中较慢者的延迟,并且失败的概率也更高。

其次,运维 Pulsar 和 Vault 本身是一项重大的投入。虽然长期成本可控,但初期的人力、技术储备和硬件资源投入是必须正视的。在真实项目中,我们需要为这两个核心基础设施组件建立起完善的监控、告警和灾备预案。

未来的迭代方向很明确。一旦数据消费端完全切换到 Pulsar,并且经过一段时间的观察,确认数据一致性无误后,我们会将 MESSAGING_WRITE_MODE 切换为 pulsar_only。在确认系统稳定运行后,与 Pub/Sub 相关的所有代码、配置以及 Vault 中的 GCP Secrets Engine 都可以被安全地移除,从而简化应用架构,完成整个迁移生命周期。这个抽象层也为未来可能接入其他消息系统(如 Kafka)预留了扩展点,使我们的系统在面对未来的技术演进时能更加从容。


  目录