在真实项目中,技术选型很少是一次性的终点。随着业务规模的增长和成本结构的变化,原先看似完美的方案可能会成为新的瓶颈。我们面临的正是这样一个场景:核心业务严重依赖 Google Cloud Pub/Sub,其稳定性和易用性在早期阶段为我们提供了巨大价值。然而,当日均消息量突破百亿级别后,其成本已经攀升至一个无法忽视的水平,同时,单一云厂商的锁定也为我们未来的多云战略带来了风险。
经过评估,我们决定将消息系统逐步迁移至自建的 Apache Pulsar 集群。Pulsar 凭借其分层存储、多租户和强大的跨地域复制能力,为我们提供了更优的成本控制和架构灵活性。但挑战也随之而来:如何在一个持续处理高并发请求的生产环境中,实现从 Pub/Sub 到 Pulsar 的零停机、数据无损的平滑迁移?
直接切换是不可行的,任何形式的停机都将导致业务损失。唯一的出路是构建一个能够同时与两个系统交互的过渡性架构,在应用层实现双写和逐步切读,并确保整个过程的安全性。这意味着我们的核心应用需要一种机制,能够动态、安全地获取并管理两套完全不同的凭证体系:GCP 的服务账户密钥和 Pulsar 的认证 JWT。
架构决策:为何不是简单的双系统并存
方案A是维持现状,继续使用 Google Cloud Pub/Sub。其优势显而易见:完全托管,无需运维,与 GCP 生态无缝集成。但劣势也同样突出:成本与消息量强相关,在我们的场景下已成为第二大云开销;技术栈锁定在GCP,不利于后续的混合云部署;对于消息保留策略、多租户隔离等高级特性的控制力较弱。
方案B是迁移至自建 Apache Pulsar。优势在于:开源,无厂商锁定,长期来看总拥有成本(TCO)更低;架构先进,BookKeeper 提供的 IO 隔离和分层存储能力非常适合我们的业务场景;支持多租户和跨地域复制,为未来的架构扩展铺平了道路。其劣势在于:需要投入大量的运维精力来保障集群的稳定性、可扩展性和安全性;技术栈更复杂,团队需要时间来熟悉 ZooKeeper、BookKeeper 和 Broker 的协同工作原理。
我们的最终决策是选择方案B,但必须通过一个精心设计的过渡架构来对冲其风险。这个架构的核心目标是:
- 应用层无感知:上层业务逻辑不应关心当前消息是发往 Pub/Sub 还是 Pulsar。
- 安全性优先:密钥管理必须集中化、自动化,杜绝硬编码或静态配置文件中的凭证。短生命周期的动态密钥是基本要求。
- 可观测性:必须能够清晰地监控双写两端的消息延迟、成功率和失败率。
- 可控的切换:能够通过简单的配置变更,灵活控制双写的开启/关闭,以及消费端的数据源切换。
基于以上目标,我们设计的混合消息网关架构如下:
graph TD
subgraph "客户端请求"
UserRequest
end
subgraph "入口层"
Nginx[Nginx Reverse Proxy]
end
subgraph "应用层 (Fastify Service)"
App[Fastify App]
App -- HTTP Request --> Controller
Controller -- "publishMessage(data)" --> MessagingGateway
MessagingGateway -- "getCredentials('gcp')" --> VaultClient
MessagingGateway -- "getCredentials('pulsar')" --> VaultClient
MessagingGateway -- "双写" --> PubSubClient
MessagingGateway -- "双写" --> PulsarClient
end
subgraph "安全层"
Vault[HashiCorp Vault]
Vault -- "动态生成" --> GCP_SA_Key[GCP Service Account Key]
Vault -- "动态生成" --> Pulsar_JWT[Pulsar JWT Token]
end
subgraph "消息系统"
PubSub[Google Cloud Pub/Sub]
Pulsar[Apache Pulsar Cluster]
end
UserRequest --> Nginx
Nginx --> App
VaultClient -- "API Call" --> Vault
PubSubClient -- "Push Message" --> PubSub
PulsarClient -- "Push Message" --> Pulsar
这个架构中,Fastify 应用是核心,它内部实现了一个 MessagingGateway,负责封装与两个消息系统的所有交互。而所有凭证的获取,都通过 VaultClient 向 HashiCorp Vault 请求动态生成的短期密钥。Nginx 则作为安全网关,负责 TLS 卸载和请求路由。
核心实现:动态密钥管理与消息网关
1. Vault 的配置:动态密钥的源头
在真实项目中,静态的、长期的密钥是巨大的安全隐患。我们使用 Vault 来根除这个问题。
首先,配置 Vault 的 GCP Secrets Engine,使其能够动态生成有时效性的 GCP 服务账户密钥。
# main.tf for Vault GCP Secret Engine configuration
# 启用 GCP secrets engine
resource "vault_mount" "gcp" {
path = "gcp"
type = "gcp"
description = "Dynamic GCP credentials for Pub/Sub"
}
# 配置 GCP secrets engine
# 需要预先提供一个有权限创建和管理服务账户密钥的 Vault 服务账户
resource "vault_gcp_secret_backend" "config" {
mount = vault_mount.gcp.path
credentials = file("path/to/your/vault-sa-credentials.json")
}
# 定义一个角色,应用通过这个角色来申请密钥
# 这个角色绑定的服务账户拥有对特定 Pub/Sub 主题的发布权限
resource "vault_gcp_secret_roleset" "pubsub_publisher" {
backend = vault_gcp_secret_backend.config.backend
name = "pubsub-publisher-role"
project = "your-gcp-project-id"
secret_type = "service_account_key"
# 绑定到一个预先创建好的,权限受限的服务账户
bindings = <<-EOT
resource "//cloudresourcemanager.googleapis.com/projects/your-gcp-project-id" {
roles = ["roles/pubsub.publisher"]
}
EOT
token_scopes = [
"https://www.googleapis.com/auth/cloud-platform",
]
# 密钥的 TTL 设置为 1 小时,最大 4 小时
ttl = 3600
max_ttl = 14400
}
应用将通过访问 Vault 的 gcp/token/pubsub-publisher-role 路径来获取一个有效期仅为1小时的服务账户私钥。
其次,为 Pulsar 配置 JWT/OIDC 认证。我们让 Vault 充当 OIDC Provider,为应用颁发携带特定角色的 JWT。
# main.tf for Vault JWT/OIDC configuration
# 启用 JWT auth method
resource "vault_auth_backend" "jwt" {
type = "jwt"
path = "jwt-pulsar"
}
# 配置 OIDC provider
resource "vault_jwt_auth_backend" "pulsar_provider" {
backend = vault_auth_backend.jwt.path
oidc_discovery_url = "http://127.0.0.1:8200" # Vault 的地址
bound_issuer = "http://127.0.0.1:8200"
}
# 创建一个角色,Fastify 应用将使用这个角色来获取 token
# 'bound_claims' 可以进一步限制哪些应用(基于其身份)可以请求
resource "vault_jwt_auth_backend_role" "app_role" {
backend = vault_auth_backend.jwt.path
role_name = "fastify-app-role"
token_policies = ["pulsar-publisher-policy"] # 关联一个能生成 Pulsar token 的策略
# 定义生成的 token 的 claims
token_type = "OIDC"
user_claim = "sub"
bound_claims = {
"aud" = "pulsar-cluster"
}
# Token 有效期 1 小时
token_ttl = 3600
}
# (Pulsar Broker 端需要配置 JWT 认证,并信任 Vault 的 OIDC key)
这样,Fastify 应用就能通过一个已认证的身份(例如 Kubernetes Service Account)向 Vault 请求一个携带 role: publisher 的 JWT,Pulsar Broker 收到后会验证该 JWT 的签名和声明,从而授予发布权限。
2. Nginx 反向代理配置
Nginx 扮演的角色不仅仅是流量转发,更是安全的第一道防线。这里的配置关注于 TLS 强化和代理的稳定性。
# /etc/nginx/conf.d/app.conf
upstream fastify_backend {
# 假设 Fastify 应用运行在 3000 端口
server 127.0.0.1:3000;
# 可以配置多个后端实例
# server app-instance-1:3000;
# server app-instance-2:3000;
# 启用长连接,减少三次握手的开销
keepalive 32;
}
server {
listen 443 ssl http2;
listen [::]:443 ssl http2;
server_name api.yourdomain.com;
# SSL/TLS 安全配置
ssl_certificate /etc/nginx/ssl/yourdomain.com.crt;
ssl_certificate_key /etc/nginx/ssl/yourdomain.com.key;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_prefer_server_ciphers on;
ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256';
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
ssl_session_tickets off;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# 代理设置
location / {
proxy_pass http://fastify_backend;
# 传递真实客户端 IP 和协议信息
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 连接超时设置
proxy_connect_timeout 5s;
proxy_send_timeout 10s;
proxy_read_timeout 10s;
# 开启 buffering
proxy_buffering on;
proxy_buffer_size 16k;
proxy_buffers 4 64k;
}
# 日志配置
access_log /var/log/nginx/api.access.log;
error_log /var/log/nginx/api.error.log;
}
3. Fastify 消息网关实现
这是整个架构的核心,所有复杂性都被封装在这一层。
a. 配置文件 (config.ts)
我们通过环境变量来控制迁移策略。
// src/config.ts
import 'dotenv/config';
export const config = {
// 迁移总开关
migration: {
// 'pubsub_only': 只写 Pub/Sub
// 'dual_write': 双写
// 'pulsar_only': 只写 Pulsar
writeMode: process.env.MESSAGING_WRITE_MODE || 'pubsub_only',
},
pubsub: {
projectId: process.env.GCP_PROJECT_ID,
topicName: 'my-global-topic',
},
pulsar: {
serviceUrl: process.env.PULSAR_SERVICE_URL || 'pulsar://localhost:6650',
topicName: 'persistent://public/default/my-global-topic',
},
vault: {
addr: process.env.VAULT_ADDR || 'http://127.0.0.1:8200',
token: process.env.VAULT_TOKEN, // 用于开发,生产环境应使用 AppRole 或 K8s Auth
gcpCredsPath: 'gcp/token/pubsub-publisher-role',
pulsarTokenPath: 'identity/oidc/token/fastify-app-role', // Vault OIDC 路径
pulsarClientId: 'fastify-app'
},
server: {
port: parseInt(process.env.PORT || '3000', 10),
host: '0.0.0.0'
}
};
b. 凭证管理服务 (vault.service.ts)
这个服务负责与 Vault 通信,获取并缓存动态密钥。一个常见的错误是不做缓存,导致每次请求都访问 Vault,增加延迟和 Vault 的负载。
// src/services/vault.service.ts
import nodeVault from 'node-vault';
// ... (导入 config, logger 等)
interface GcpCredentials {
private_key: string;
client_email: string;
}
interface PulsarCredentials {
token: string;
}
class VaultService {
private vault: nodeVault.client;
private gcpCredsCache: GcpCredentials | null = null;
private pulsarCredsCache: PulsarCredentials | null = null;
private gcpCredsExpiresAt: number = 0;
private pulsarCredsExpiresAt: number = 0;
constructor() {
this.vault = nodeVault({
apiVersion: 'v1',
endpoint: config.vault.addr,
token: config.vault.token,
});
}
// 为 GCP 获取服务账户密钥
public async getGcpCredentials(): Promise<GcpCredentials> {
// 缓冲期,在密钥过期前 5 分钟就刷新
if (this.gcpCredsCache && Date.now() < this.gcpCredsExpiresAt - 5 * 60 * 1000) {
return this.gcpCredsCache;
}
try {
const result = await this.vault.read(config.vault.gcpCredsPath);
const privateKeyData = Buffer.from(result.data.private_key_data, 'base64').toString('utf-8');
const credentials = JSON.parse(privateKeyData);
this.gcpCredsCache = {
private_key: credentials.private_key,
client_email: credentials.client_email,
};
// result.lease_duration 单位是秒
this.gcpCredsExpiresAt = Date.now() + result.lease_duration * 1000;
logger.info('Successfully fetched new GCP credentials from Vault.');
return this.gcpCredsCache;
} catch (error) {
logger.error({ err: error }, 'Failed to fetch GCP credentials from Vault.');
throw new Error('Vault GCP credential fetch failed');
}
}
// 为 Pulsar 获取 JWT
public async getPulsarCredentials(): Promise<PulsarCredentials> {
if (this.pulsarCredsCache && Date.now() < this.pulsarCredsExpiresAt - 5 * 60 * 1000) {
return this.pulsarCredsCache;
}
try {
const result = await this.vault.write(config.vault.pulsarTokenPath, {
client_id: config.vault.pulsarClientId,
});
this.pulsarCredsCache = { token: result.data.token };
this.pulsarCredsExpiresAt = Date.now() + result.lease_duration * 1000;
logger.info('Successfully fetched new Pulsar JWT from Vault.');
return this.pulsarCredsCache;
} catch (error) {
logger.error({ err: error }, 'Failed to fetch Pulsar JWT from Vault.');
throw new Error('Vault Pulsar token fetch failed');
}
}
}
export const vaultService = new VaultService();
c. 消息网关 (messaging.gateway.ts)
这是核心调度逻辑,根据配置决定消息的流向。
// src/gateways/messaging.gateway.ts
import { PubSub } from '@google-cloud/pubsub';
import Pulsar from 'pulsar-client';
import { vaultService } from '../services/vault.service';
import { config } from '../config';
import { logger } from '../utils/logger';
// 统一接口
interface MessagingClient {
initialize(): Promise<void>;
publish(payload: Buffer): Promise<string>;
close(): Promise<void>;
}
// Pub/Sub 客户端封装
class PubSubClientWrapper implements MessagingClient {
private pubsub: PubSub | null = null;
private topic: any;
async initialize(): Promise<void> {
const credentials = await vaultService.getGcpCredentials();
this.pubsub = new PubSub({
projectId: config.pubsub.projectId,
credentials,
});
this.topic = this.pubsub.topic(config.pubsub.topicName);
logger.info('Pub/Sub client initialized.');
}
async publish(payload: Buffer): Promise<string> {
if (!this.topic) throw new Error('Pub/Sub client not initialized.');
try {
const messageId = await this.topic.publishMessage({ data: payload });
return `pubsub:${messageId}`;
} catch (error) {
logger.error({ err: error }, 'Failed to publish to Pub/Sub');
// 这里的错误处理至关重要,可能需要重试或告警
throw error;
}
}
async close(): Promise<void> {
await this.pubsub?.close();
}
}
// Pulsar 客户端封装
class PulsarClientWrapper implements MessagingClient {
private client: Pulsar.Client | null = null;
private producer: Pulsar.Producer | null = null;
async initialize(): Promise<void> {
const { token } = await vaultService.getPulsarCredentials();
this.client = new Pulsar.Client({
serviceUrl: config.pulsar.serviceUrl,
authentication: new Pulsar.AuthenticationToken({ token }),
// 生产环境应配置 TLS
});
this.producer = await this.client.createProducer({
topic: config.pulsar.topicName,
sendTimeoutMs: 30000,
batchingEnabled: true,
});
logger.info('Pulsar client initialized.');
}
async publish(payload: Buffer): Promise<string> {
if (!this.producer) throw new Error('Pulsar client not initialized.');
try {
const messageId = await this.producer.send({ data: payload });
return `pulsar:${messageId.toString()}`;
} catch (error) {
logger.error({ err: error }, 'Failed to publish to Pulsar');
throw error;
}
}
async close(): Promise<void> {
await this.producer?.close();
await this.client?.close();
}
}
// 核心网关
export class MessagingGateway {
private pubsubClient: PubSubClientWrapper;
private pulsarClient: PulsarClientWrapper;
private writeMode: string;
constructor() {
this.pubsubClient = new PubSubClientWrapper();
this.pulsarClient = new PulsarClientWrapper();
this.writeMode = config.migration.writeMode;
}
// 初始化所有需要的客户端
async initialize(): Promise<void> {
const initializers: Promise<void>[] = [];
if (this.writeMode === 'pubsub_only' || this.writeMode === 'dual_write') {
initializers.push(this.pubsubClient.initialize());
}
if (this.writeMode === 'pulsar_only' || this.writeMode === 'dual_write') {
initializers.push(this.pulsarClient.initialize());
}
await Promise.all(initializers);
logger.info(`MessagingGateway initialized in '${this.writeMode}' mode.`);
}
// 发布逻辑,根据模式决定行为
async publish(payload: object): Promise<string[]> {
const dataBuffer = Buffer.from(JSON.stringify(payload));
const publishPromises: Promise<string>[] = [];
if (this.writeMode === 'pubsub_only' || this.writeMode === 'dual_write') {
publishPromises.push(this.pubsubClient.publish(dataBuffer));
}
if (this.writeMode === 'pulsar_only' || this.writeMode === 'dual_write') {
publishPromises.push(this.pulsarClient.publish(dataBuffer));
}
if (publishPromises.length === 0) {
throw new Error(`Invalid writeMode: ${this.writeMode}`);
}
// 在双写模式下,我们必须处理其中一个失败的情况。
// Promise.allSettled 让我们能知道每个 promise 的结果,无论成功或失败。
const results = await Promise.allSettled(publishPromises);
const successfulIds: string[] = [];
const errors: any[] = [];
results.forEach(result => {
if (result.status === 'fulfilled') {
successfulIds.push(result.value);
} else {
errors.push(result.reason);
}
});
// 关键:如果存在任何错误,必须抛出,让上层业务决定如何处理。
// 不能因为一个成功就认为整个操作成功,这会导致数据不一致。
if (errors.length > 0) {
logger.error({ errors, successfulIds }, 'Partial failure in dual_write mode.');
throw new Error('Dual write failed for one or more targets.');
}
return successfulIds;
}
async close(): Promise<void> {
await Promise.all([this.pubsubClient.close(), this.pulsarClient.close()]);
}
}
局限性与未来迭代路径
这套架构成功地为我们提供了一个安全、可控的迁移路径,但它并非没有代价。最显著的代价是应用复杂性的增加。MessagingGateway 需要维护两套客户端连接,处理更复杂的错误场景,尤其是在双写模式下,一次发布请求的延迟是两个系统中较慢者的延迟,并且失败的概率也更高。
其次,运维 Pulsar 和 Vault 本身是一项重大的投入。虽然长期成本可控,但初期的人力、技术储备和硬件资源投入是必须正视的。在真实项目中,我们需要为这两个核心基础设施组件建立起完善的监控、告警和灾备预案。
未来的迭代方向很明确。一旦数据消费端完全切换到 Pulsar,并且经过一段时间的观察,确认数据一致性无误后,我们会将 MESSAGING_WRITE_MODE 切换为 pulsar_only。在确认系统稳定运行后,与 Pub/Sub 相关的所有代码、配置以及 Vault 中的 GCP Secrets Engine 都可以被安全地移除,从而简化应用架构,完成整个迁移生命周期。这个抽象层也为未来可能接入其他消息系统(如 Kafka)预留了扩展点,使我们的系统在面对未来的技术演进时能更加从容。