基于 Go 与 NATS JetStream 构建协调海量 Playwright 实例的分布式压测架构


问题起源于一次复盘。我们传统的后端性能压测,例如使用 JMeter 或 k6,虽然能精确模拟 API 负载,但完全忽略了一个关键环节:真实的用户体验。现代 Web 应用的性能瓶颈早已不单单是服务端响应时间,前端渲染、JavaScript 执行、资源加载等共同构成了用户感知的“慢”。单纯压测 API,得出的 P99 响应 50ms 的结论,在用户浏览器白屏 3 秒的现实面前显得毫无意义。

为了弥补这个鸿沟,我们最初尝试在 CI 流程中引入 Playwright 进行端到端测试,捕获核心用户旅程的性能指标,如 LCP (Largest Contentful Paint)。但这带来了新问题:单个 Playwright 实例的测试是串行的、缓慢的,无法形成“压力”。我们需要同时模拟成百上千个真实用户,这意味着要同时运行成百上千个独立的浏览器实例。这就将问题从“如何测试”转变成了“如何构建一个大规模浏览器调度系统”。

初步构想与技术选型风暴

我们的目标是建立一个能够按需启动、协调并回收数千个 Playwright 实例的平台。它需要一个控制平面来分发任务,一个执行单元(Worker)来运行浏览器,以及一个可靠的通信主干。

1. 控制平面 (Control Plane) 的编程语言选型

最初的讨论集中在 Node.js/TypeScript 上,因为 Playwright 本身就是基于 Node.js 生态。这似乎是“顺理成章”的选择。但在真实项目中,控制平面的核心职责是高并发调度与状态管理,而不是执行业务逻辑。它需要同时与成千上万的 Worker 建立长连接,处理任务分发、心跳、结果回收。

在这一点上,Go 的优势变得极为突出。它的 Goroutine 和 Channel 机制为构建高并发网络服务提供了语言级别的支持。为每个 Worker 连接创建一个 Goroutine 的开销极小,远低于 Node.js 中管理同等数量的并发连接所需的复杂性。此外,Go 编译后的静态二进制文件部署简单,无运行时依赖,这对于基础设施组件来说是个巨大的加分项。最终,我们决定使用 Go 来构建这个核心控制平面。

2. 通信中间件:HTTP, gRPC 还是消息队列?

  • HTTP/WebSocket: 让 Worker 反向连接 Master。这种模型在规模扩大时,Master 需要维护海量长连接,状态管理复杂,且 Master 成为单点瓶颈。
  • gRPC: 性能优越,但同样存在上述长连接管理问题。
  • 消息队列: 这是最适合的解耦方案。Master 作为生产者,将压测任务投递到队列中。Worker 作为消费者,按自身处理能力从队列中拉取任务。Master 无需关心 Worker 的具体位置、数量和存活状态。

在消息队列选型上,我们排除了 RabbitMQ 和 Kafka。RabbitMQ 过于重,而 Kafka 虽然性能卓越,但其基于分区日志的模式对于我们这种“任务分发-执行-丢弃”的场景来说,配置和运维都过于复杂。

最终,我们选择了 NATS,更具体地说是 NATS JetStream。NATS 本身是一个极其轻量和高性能的核心消息系统,而 JetStream 在其之上提供了持久化、至少一次送达保证和工作队列(Work-Queue)消费模型。这完美契合我们的需求:

  • 持久化流 (Stream): 即使所有 Worker 都离线,任务也不会丢失。
  • 消费者组 (Consumer Group): 多个 Worker 实例可以订阅同一个流,但每个任务消息只会被组内的一个 Worker 消费。这天然实现了任务的负载均衡。
  • ACK 机制: Worker 完成任务后发送确认,NATS 才会将消息从队列中删除。如果 Worker 中途崩溃,消息会在超时后被重新投递给另一个健康的 Worker,保证了任务的最终执行。

3. 结果存储:为什么是文档型 NoSQL

压测任务的配置(我们称之为 TestPlan)和压测产生的结果数据结构都比较复杂且可能演进。TestPlan 可能包含多个步骤、自定义的 JS 脚本、header 等。结果数据则包含各种性能指标(FCP, LCP, TTI)、截图、Trace 文件路径等。

使用关系型数据库需要设计复杂的表结构,每次调整 TestPlan 结构都可能引发痛苦的 schema migration。而文档型数据库,如 MongoDB,则非常适合。它可以直接存储 JSON 格式的 TestPlan 和结果,模式灵活,查询和聚合能力强大,便于后续对压测结果进行多维度分析。

架构图谱

经过选型,我们的系统架构逐渐清晰。

graph TD
    subgraph User Interaction
        A[CLI / API Client] -- HTTP POST --> B(Control Plane - Go Service)
    end

    subgraph Core Infrastructure
        B -- 1. Store TestPlan --> D[MongoDB]
        B -- 2. Publish Tasks --> C{NATS JetStream}
        C -- 3. Push Task --> E[Playwright Worker Fleet]
        E -- 4. Publish Result --> C
        F(Result Collector - Go Service) -- 5. Consume Result --> C
        F -- 6. Store Result --> D
    end

    subgraph Scalable Workers
        E -- Run in Docker/K8s --> P1[Worker 1: Node.js + Playwright]
        E -- Run in Docker/K8s --> P2[Worker 2: Node.js + Playwright]
        E -- Run in Docker/K8s --> Pn[Worker N: Node.js + Playwright]
    end

    style E fill:#f9f,stroke:#333,stroke-width:2px

流程如下:

  1. 用户通过 API 提交一个 TestPlan(例如:模拟 1000 个用户,在 10 分钟内完成注册流程)。
  2. Go 编写的控制平面接收请求,将 TestPlan 存入 MongoDB,并根据并发数和持续时间,生成数千个独立的 Task 消息。
  3. 控制平面将这些 Task 消息发布到 NATS JetStream 的 TASKS 流中。
  4. Playwright Worker 集群作为消费者组订阅 TASKS 流。NATS 自动将任务分发给空闲的 Worker。
  5. 每个 Worker 收到任务后,启动一个无头浏览器实例执行 Playwright 脚本,收集性能指标。
  6. 执行完毕后,Worker 将包含性能数据和状态的 Result 消息发布到 NATS 的 RESULTS 流中。
  7. 另一个 Go 服务(Result Collector)专门订阅 RESULTS 流,将结果持久化到 MongoDB 中进行聚合与分析。

核心实现:代码即架构

理论讲得再多,不如直接看代码。这里的代码是生产级的简化版,包含了完整的错误处理和关键逻辑。

1. 控制平面 (Go): 任务分发器

这个服务负责接收 HTTP 请求,并向 NATS JetStream 发布任务。

main.go

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	natsURL      = "nats://localhost:4222"
	mongoURI     = "mongodb://localhost:27017"
	streamName   = "PERF_TESTS"
	tasksSubject = "PERF_TESTS.tasks"
	dbName       = "perftest"
	planColl     = "test_plans"
)

// TestPlan defines the structure for a performance test run.
type TestPlan struct {
	ID            string   `json:"id" bson:"_id"`
	Name          string   `json:"name" bson:"name"`
	TargetURL     string   `json:"target_url" bson:"target_url"`
	Concurrency   int      `json:"concurrency" bson:"concurrency"` // Number of parallel users
	DurationSecs  int      `json:"duration_secs" bson:"duration_secs"`
	PlaywrightScript string `json:"playwright_script" bson:"playwright_script"` // Base64 encoded script
	CreatedAt     time.Time `json:"created_at" bson:"created_at"`
}

// Task is the individual work unit for a worker.
type Task struct {
	TaskID    string `json:"task_id"`
	PlanID    string `json:"plan_id"`
	TargetURL string `json:"target_url"`
	Script    string `json:"script"`
}

type APIServer struct {
	js     nats.JetStreamContext
	dbClient *mongo.Client
}

func main() {
	// --- NATS JetStream Connection ---
	nc, err := nats.Connect(natsURL)
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()

	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatalf("Failed to create JetStream context: %v", err)
	}

	// Ensure stream exists
	_, err = js.StreamInfo(streamName)
	if err != nil {
		log.Printf("Stream '%s' not found, creating it...", streamName)
		_, err = js.AddStream(&nats.StreamConfig{
			Name:     streamName,
			Subjects: []string{tasksSubject, "PERF_TESTS.results"}, // Subject for tasks and results
		})
		if err != nil {
			log.Fatalf("Failed to create stream: %v", err)
		}
	}
	
	// --- MongoDB Connection ---
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI))
	if err != nil {
		log.Fatalf("Failed to connect to MongoDB: %v", err)
	}
	defer client.Disconnect(ctx)

	server := &APIServer{js: js, dbClient: client}

	http.HandleFunc("/submit", server.handleSubmit)
	log.Println("API Server listening on :8080...")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

func (s *APIServer) handleSubmit(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var plan TestPlan
	if err := json.NewDecoder(r.Body).Decode(&plan); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	plan.ID = uuid.New().String()
	plan.CreatedAt = time.Now().UTC()

	// --- Persist the TestPlan ---
	collection := s.dbClient.Database(dbName).Collection(planColl)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err := collection.InsertOne(ctx, plan)
	if err != nil {
		log.Printf("ERROR: Failed to save TestPlan %s: %v", plan.ID, err)
		http.Error(w, "Failed to save test plan", http.StatusInternalServerError)
		return
	}

	log.Printf("Accepted TestPlan: %s (%s), dispatching %d tasks.", plan.ID, plan.Name, plan.Concurrency)

	// --- Dispatch Tasks ---
	// In a real-world scenario, you'd have more sophisticated logic for ramp-up, etc.
	// Here we dispatch all tasks at once for simplicity.
	for i := 0; i < plan.Concurrency; i++ {
		task := Task{
			TaskID:    uuid.New().String(),
			PlanID:    plan.ID,
			TargetURL: plan.TargetURL,
			Script:    plan.PlaywrightScript,
		}
		taskJSON, _ := json.Marshal(task)

		// Publish async for max throughput
		// NATS client library handles buffering and retries.
		_, err := s.js.Publish(tasksSubject, taskJSON)
		if err != nil {
			// This might happen if the publish buffer is full.
			log.Printf("WARN: Failed to publish task for plan %s: %v", plan.ID, err)
		}
	}

	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"plan_id": plan.ID})
}

关键点分析:

  • 幂等性与流创建: 我们在启动时检查 Stream 是否存在,不存在则创建。这使得服务可以无状态地重启。
  • 异步发布: js.Publish 是一个异步操作。为了达到高吞吐量,我们不等待每次发布的确认。NATS 客户端库在后台处理这些。这里的坑在于,如果 NATS 服务器短暂不可用,发布缓冲区可能会满,导致错误。生产环境中需要监控这种情况。
  • 任务粒度: 我们将一个大的 TestPlan 拆分成多个小的 Task。每个 Task 都是一个 Worker 可以独立执行的单元。这正是分布式处理的核心思想。

2. Playwright Worker (Node.js/TypeScript): 执行单元

Worker 是无状态的计算节点,可以水平扩展。它从 NATS 拉取任务,执行,然后上报结果。

worker.ts

import { connect, JSONCodec, NatsConnection, Subscription } from 'nats';
import { chromium, Browser, Page } from 'playwright';
import { v4 as uuidv4 } from 'uuid';

// --- Configuration ---
const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const TASKS_SUBJECT = "PERF_TESTS.tasks";
const RESULTS_SUBJECT = "PERF_TESTS.results";
const STREAM_NAME = "PERF_TESTS";
const CONSUMER_NAME = "playwright-worker-group"; // All workers share this name
const WORKER_ID = `worker-${uuidv4()}`;

// --- Types ---
interface Task {
    task_id: string;
    plan_id: string;
    target_url: string;
    script: string; // Base64 encoded Playwright script
}

interface Result {
    worker_id: string;
    task_id: string;
    plan_id: string;
    status: 'success' | 'failure';
    error_message?: string;
    metrics: {
        fcp?: number;
        lcp?: number;
        dom_load?: number;
        total_duration_ms: number;
    };
    timestamp: string;
}

const jc = JSONCodec();

async function main() {
    let browser: Browser | null = null;
    let nc: NatsConnection | null = null;
    
    console.log(`[${WORKER_ID}] Starting up...`);

    try {
        // --- Connect to NATS and Playwright ---
        nc = await connect({ servers: NATS_URL });
        const js = nc.jetstream();

        browser = await chromium.launch({ headless: true });
        console.log(`[${WORKER_ID}] Connected to NATS and browser launched.`);

        // --- Create a durable, pull-based consumer ---
        // This is key for a work-queue pattern.
        // It's "pull-based" in the sense that we control the rate of message fetching.
        const sub = await js.subscribe(TASKS_SUBJECT, {
            queue: CONSUMER_NAME, // This creates the consumer group
            durable: CONSUMER_NAME, // The consumer's state is persisted
            ack_policy: 'explicit', // We must explicitly ack messages
            ack_wait: 30000, // 30 seconds to process a message
        });

        console.log(`[${WORKER_ID}] Subscribed to subject '${TASKS_SUBJECT}' with consumer group '${CONSUMER_NAME}'. Waiting for tasks...`);

        // --- Main processing loop ---
        for await (const msg of sub) {
            const startTime = performance.now();
            const task = jc.decode(msg.data) as Task;
            console.log(`[${WORKER_ID}] Received task ${task.task_id} for plan ${task.plan_id}`);

            let page: Page | null = null;
            const result: Partial<Result> = {
                worker_id: WORKER_ID,
                task_id: task.task_id,
                plan_id: task.plan_id,
                timestamp: new Date().toISOString(),
                metrics: { total_duration_ms: 0 },
            };

            try {
                page = await browser.newPage();
                
                // A common mistake is to not handle navigation timeouts.
                // In a stress test, the target site WILL be slow.
                await page.goto(task.target_url, { waitUntil: 'domcontentloaded', timeout: 20000 });

                // In a real system, the script would be more complex.
                // Here we simulate getting performance metrics.
                const perfTimings = await page.evaluate(() => {
                    const paintTimings = performance.getEntriesByType('paint');
                    const fcp = paintTimings.find(entry => entry.name === 'first-contentful-paint')?.startTime;
                    const lcpEntry = performance.getEntriesByType('largest-contentful-paint').pop();
                    const lcp = lcpEntry?.startTime;
                    
                    const navTiming = performance.getEntriesByType('navigation')[0] as PerformanceNavigationTiming;
                    const domLoad = navTiming.domContentLoadedEventEnd - navTiming.fetchStart;

                    return { fcp, lcp, domLoad };
                });
                
                result.status = 'success';
                result.metrics = { ...perfTimings, total_duration_ms: 0 }; // duration will be set finally

            } catch (err: any) {
                console.error(`[${WORKER_ID}] Task ${task.task_id} failed:`, err.message);
                result.status = 'failure';
                result.error_message = err.message.substring(0, 500); // Truncate long errors
            } finally {
                if (page) {
                    await page.close();
                }
                result.metrics!.total_duration_ms = performance.now() - startTime;
                
                // Publish the result and then ACK the message.
                // This ensures "at-least-once" delivery. If publishing fails, we don't ACK.
                await js.publish(RESULTS_SUBJECT, jc.encode(result));
                msg.ack();
                console.log(`[${WORKER_ID}] Completed task ${task.task_id}, result published.`);
            }
        }
    } catch (err) {
        console.error(`[${WORKER_ID}] A critical error occurred:`, err);
    } finally {
        await browser?.close();
        await nc?.close();
        console.log(`[${WORKER_ID}] Shutting down.`);
        process.exit(1); // Exit to allow orchestrator (like K8s) to restart it.
    }
}

main();

package.json

{
  "name": "playwright-worker",
  "version": "1.0.0",
  "main": "worker.js",
  "scripts": {
    "start": "ts-node worker.ts"
  },
  "dependencies": {
    "nats": "^2.18.0",
    "playwright": "^1.40.0",
    "uuid": "^9.0.1"
  },
  "devDependencies": {
    "@types/node": "^20.10.0",
    "@types/uuid": "^9.0.7",
    "ts-node": "^10.9.1",
    "typescript": "^5.3.2"
  }
}

关键点分析:

  • 消费者组 (Consumer Group): queue: CONSUMER_NAME 是实现工作队列的核心。所有使用相同 queue 名的订阅者构成一个组,消息会被负载均衡到组内成员。
  • 持久消费者 (Durable Consumer): durable: CONSUMER_NAME 告诉 JetStream 记住这个消费者的状态(比如它处理到哪条消息了)。即使所有 Worker 都下线再上线,它们也能从上次中断的地方继续处理,而不是从头开始。
  • 显式 ACK: ack_policy: 'explicit'msg.ack() 构成了可靠性的基石。我们必须在处理完任务 并成功上报结果后,才告诉 NATS 这个消息可以删了。如果在 ack() 之前 Worker 崩溃,NATS 会在 ack_wait 超时后将该消息重新投递给组内的其他 Worker。
  • 资源管理: 浏览器实例 browser 在 Worker 启动时创建,并在 Worker 的生命周期内复用。为每个任务都 launch 一个新浏览器开销太大。page 则是每个任务隔离的。这是一个重要的性能优化。

3. 数据持久化 (Go): 结果收集器

这个服务很简单,它扮演着数据管道中最后一环的角色,将 NATS 中的临时结果数据转存到 MongoDB 长期存储。

collector.go

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"time"

	"github.com/nats-io/nats.go"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// Same consts as main.go
const (
    natsURL       = "nats://localhost:4222"
    mongoURI      = "mongodb://localhost:27017"
    streamName    = "PERF_TESTS"
    resultsSubject= "PERF_TESTS.results"
    dbName        = "perftest"
    resultColl    = "results"
)

type Result struct {
    // Define the same Result struct as in TypeScript
    // ...
}

func main() {
    // --- NATS Connection ---
    nc, err := nats.Connect(natsURL)
    if err != nil {
        log.Fatalf("Failed to connect to NATS: %v", err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatalf("Failed to create JetStream context: %v", err)
    }

    // --- MongoDB Connection ---
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI))
	if err != nil {
		log.Fatalf("Failed to connect to MongoDB: %v", err)
	}
	defer client.Disconnect(ctx)
    collection := client.Database(dbName).Collection(resultColl)

    log.Println("Result Collector starting...")

    // --- Create a durable consumer for results ---
    _, err = js.Subscribe(resultsSubject, func(msg *nats.Msg) {
        var res map[string]interface{} // Use a map for flexibility
        if err := json.Unmarshal(msg.Data, &res); err != nil {
            log.Printf("ERROR: Failed to unmarshal result message: %v", err)
            msg.Nak() // Tell NATS something went wrong, maybe redeliver later
            return
        }

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        // Insert into MongoDB
        _, err := collection.InsertOne(ctx, res)
        if err != nil {
            log.Printf("ERROR: Failed to insert result into MongoDB: %v. PlanID: %s", err, res["plan_id"])
            msg.Nak() // Failed to persist, ask NATS to redeliver
            return
        }

        msg.Ack() // Successfully processed
        log.Printf("INFO: Saved result for PlanID: %s, TaskID: %s", res["plan_id"], res["task_id"])

    }, nats.Durable("result-collector"), nats.AckExplicit())

    select{} // Block forever
}

局限性与未来展望

这个架构解决了一个核心问题:如何以分布式、可扩展的方式运行海量浏览器实例来执行性能测试。它足够健壮,能够处理 Worker 的动态增减和故障。然而,它并非银弹。

当前的局限性:

  1. 资源消耗: 浏览器是资源密集型应用。每个 Playwright Worker 都需要显著的 CPU 和内存。在 Kubernetes 上部署时,必须仔细配置 resource requests/limits,并利用 HPA (Horizontal Pod Autoscaler) 基于 NATS 队列积压的任务数进行弹性伸缩。
  2. 测试脚本的复杂性: 当前设计将脚本作为字符串传递,对于复杂的、多步骤的测试脚本,管理和调试会变得困难。一个改进方向是引入脚本版本管理,Worker 在启动时从代码仓库或对象存储中拉取指定版本的测试脚本。
  3. 实时反馈缺失: 这是一个批处理系统。我们提交任务,等待其完成,然后分析 MongoDB 中的结果。对于压测过程的实时监控,例如实时 QPS、P99 响应时间曲线,还需要构建一个数据流管道,将结果从 NATS 同时推送到像 InfluxDB 或 Prometheus 这样的时序数据库中。
  4. 成本问题: 在公有云上运行数千个浏览器实例一小时的成本不容忽视。必须有配套的成本控制和预算告警机制。

未来的迭代路径:
一个可行的演进方向是引入更智能的调度策略。例如,实现一个“校准”模式,系统自动增加并发数,直到观察到目标应用的性能指标(如 LCP 或错误率)达到预设阈值,从而自动发现系统的性能拐点。此外,可以将 eBPF 技术集成到 Worker 节点,以非侵入的方式收集更底层的网络和系统指标,将前端性能数据与后端基础设施表现关联起来,实现真正的“全链路”压测与分析。


  目录