问题起源于一次复盘。我们传统的后端性能压测,例如使用 JMeter 或 k6,虽然能精确模拟 API 负载,但完全忽略了一个关键环节:真实的用户体验。现代 Web 应用的性能瓶颈早已不单单是服务端响应时间,前端渲染、JavaScript 执行、资源加载等共同构成了用户感知的“慢”。单纯压测 API,得出的 P99 响应 50ms 的结论,在用户浏览器白屏 3 秒的现实面前显得毫无意义。
为了弥补这个鸿沟,我们最初尝试在 CI 流程中引入 Playwright 进行端到端测试,捕获核心用户旅程的性能指标,如 LCP (Largest Contentful Paint)。但这带来了新问题:单个 Playwright 实例的测试是串行的、缓慢的,无法形成“压力”。我们需要同时模拟成百上千个真实用户,这意味着要同时运行成百上千个独立的浏览器实例。这就将问题从“如何测试”转变成了“如何构建一个大规模浏览器调度系统”。
初步构想与技术选型风暴
我们的目标是建立一个能够按需启动、协调并回收数千个 Playwright 实例的平台。它需要一个控制平面来分发任务,一个执行单元(Worker)来运行浏览器,以及一个可靠的通信主干。
1. 控制平面 (Control Plane) 的编程语言选型
最初的讨论集中在 Node.js/TypeScript 上,因为 Playwright 本身就是基于 Node.js 生态。这似乎是“顺理成章”的选择。但在真实项目中,控制平面的核心职责是高并发调度与状态管理,而不是执行业务逻辑。它需要同时与成千上万的 Worker 建立长连接,处理任务分发、心跳、结果回收。
在这一点上,Go 的优势变得极为突出。它的 Goroutine 和 Channel 机制为构建高并发网络服务提供了语言级别的支持。为每个 Worker 连接创建一个 Goroutine 的开销极小,远低于 Node.js 中管理同等数量的并发连接所需的复杂性。此外,Go 编译后的静态二进制文件部署简单,无运行时依赖,这对于基础设施组件来说是个巨大的加分项。最终,我们决定使用 Go 来构建这个核心控制平面。
2. 通信中间件:HTTP, gRPC 还是消息队列?
- HTTP/WebSocket: 让 Worker 反向连接 Master。这种模型在规模扩大时,Master 需要维护海量长连接,状态管理复杂,且 Master 成为单点瓶颈。
- gRPC: 性能优越,但同样存在上述长连接管理问题。
- 消息队列: 这是最适合的解耦方案。Master 作为生产者,将压测任务投递到队列中。Worker 作为消费者,按自身处理能力从队列中拉取任务。Master 无需关心 Worker 的具体位置、数量和存活状态。
在消息队列选型上,我们排除了 RabbitMQ 和 Kafka。RabbitMQ 过于重,而 Kafka 虽然性能卓越,但其基于分区日志的模式对于我们这种“任务分发-执行-丢弃”的场景来说,配置和运维都过于复杂。
最终,我们选择了 NATS,更具体地说是 NATS JetStream。NATS 本身是一个极其轻量和高性能的核心消息系统,而 JetStream 在其之上提供了持久化、至少一次送达保证和工作队列(Work-Queue)消费模型。这完美契合我们的需求:
- 持久化流 (Stream): 即使所有 Worker 都离线,任务也不会丢失。
- 消费者组 (Consumer Group): 多个 Worker 实例可以订阅同一个流,但每个任务消息只会被组内的一个 Worker 消费。这天然实现了任务的负载均衡。
- ACK 机制: Worker 完成任务后发送确认,NATS 才会将消息从队列中删除。如果 Worker 中途崩溃,消息会在超时后被重新投递给另一个健康的 Worker,保证了任务的最终执行。
3. 结果存储:为什么是文档型 NoSQL
压测任务的配置(我们称之为 TestPlan)和压测产生的结果数据结构都比较复杂且可能演进。TestPlan 可能包含多个步骤、自定义的 JS 脚本、header 等。结果数据则包含各种性能指标(FCP, LCP, TTI)、截图、Trace 文件路径等。
使用关系型数据库需要设计复杂的表结构,每次调整 TestPlan 结构都可能引发痛苦的 schema migration。而文档型数据库,如 MongoDB,则非常适合。它可以直接存储 JSON 格式的 TestPlan 和结果,模式灵活,查询和聚合能力强大,便于后续对压测结果进行多维度分析。
架构图谱
经过选型,我们的系统架构逐渐清晰。
graph TD
subgraph User Interaction
A[CLI / API Client] -- HTTP POST --> B(Control Plane - Go Service)
end
subgraph Core Infrastructure
B -- 1. Store TestPlan --> D[MongoDB]
B -- 2. Publish Tasks --> C{NATS JetStream}
C -- 3. Push Task --> E[Playwright Worker Fleet]
E -- 4. Publish Result --> C
F(Result Collector - Go Service) -- 5. Consume Result --> C
F -- 6. Store Result --> D
end
subgraph Scalable Workers
E -- Run in Docker/K8s --> P1[Worker 1: Node.js + Playwright]
E -- Run in Docker/K8s --> P2[Worker 2: Node.js + Playwright]
E -- Run in Docker/K8s --> Pn[Worker N: Node.js + Playwright]
end
style E fill:#f9f,stroke:#333,stroke-width:2px
流程如下:
- 用户通过 API 提交一个
TestPlan(例如:模拟 1000 个用户,在 10 分钟内完成注册流程)。 - Go 编写的控制平面接收请求,将
TestPlan存入 MongoDB,并根据并发数和持续时间,生成数千个独立的Task消息。 - 控制平面将这些
Task消息发布到 NATS JetStream 的TASKS流中。 - Playwright Worker 集群作为消费者组订阅
TASKS流。NATS 自动将任务分发给空闲的 Worker。 - 每个 Worker 收到任务后,启动一个无头浏览器实例执行 Playwright 脚本,收集性能指标。
- 执行完毕后,Worker 将包含性能数据和状态的
Result消息发布到 NATS 的RESULTS流中。 - 另一个 Go 服务(Result Collector)专门订阅
RESULTS流,将结果持久化到 MongoDB 中进行聚合与分析。
核心实现:代码即架构
理论讲得再多,不如直接看代码。这里的代码是生产级的简化版,包含了完整的错误处理和关键逻辑。
1. 控制平面 (Go): 任务分发器
这个服务负责接收 HTTP 请求,并向 NATS JetStream 发布任务。
main.go
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const (
natsURL = "nats://localhost:4222"
mongoURI = "mongodb://localhost:27017"
streamName = "PERF_TESTS"
tasksSubject = "PERF_TESTS.tasks"
dbName = "perftest"
planColl = "test_plans"
)
// TestPlan defines the structure for a performance test run.
type TestPlan struct {
ID string `json:"id" bson:"_id"`
Name string `json:"name" bson:"name"`
TargetURL string `json:"target_url" bson:"target_url"`
Concurrency int `json:"concurrency" bson:"concurrency"` // Number of parallel users
DurationSecs int `json:"duration_secs" bson:"duration_secs"`
PlaywrightScript string `json:"playwright_script" bson:"playwright_script"` // Base64 encoded script
CreatedAt time.Time `json:"created_at" bson:"created_at"`
}
// Task is the individual work unit for a worker.
type Task struct {
TaskID string `json:"task_id"`
PlanID string `json:"plan_id"`
TargetURL string `json:"target_url"`
Script string `json:"script"`
}
type APIServer struct {
js nats.JetStreamContext
dbClient *mongo.Client
}
func main() {
// --- NATS JetStream Connection ---
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
log.Fatalf("Failed to create JetStream context: %v", err)
}
// Ensure stream exists
_, err = js.StreamInfo(streamName)
if err != nil {
log.Printf("Stream '%s' not found, creating it...", streamName)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{tasksSubject, "PERF_TESTS.results"}, // Subject for tasks and results
})
if err != nil {
log.Fatalf("Failed to create stream: %v", err)
}
}
// --- MongoDB Connection ---
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI))
if err != nil {
log.Fatalf("Failed to connect to MongoDB: %v", err)
}
defer client.Disconnect(ctx)
server := &APIServer{js: js, dbClient: client}
http.HandleFunc("/submit", server.handleSubmit)
log.Println("API Server listening on :8080...")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func (s *APIServer) handleSubmit(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var plan TestPlan
if err := json.NewDecoder(r.Body).Decode(&plan); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
plan.ID = uuid.New().String()
plan.CreatedAt = time.Now().UTC()
// --- Persist the TestPlan ---
collection := s.dbClient.Database(dbName).Collection(planColl)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := collection.InsertOne(ctx, plan)
if err != nil {
log.Printf("ERROR: Failed to save TestPlan %s: %v", plan.ID, err)
http.Error(w, "Failed to save test plan", http.StatusInternalServerError)
return
}
log.Printf("Accepted TestPlan: %s (%s), dispatching %d tasks.", plan.ID, plan.Name, plan.Concurrency)
// --- Dispatch Tasks ---
// In a real-world scenario, you'd have more sophisticated logic for ramp-up, etc.
// Here we dispatch all tasks at once for simplicity.
for i := 0; i < plan.Concurrency; i++ {
task := Task{
TaskID: uuid.New().String(),
PlanID: plan.ID,
TargetURL: plan.TargetURL,
Script: plan.PlaywrightScript,
}
taskJSON, _ := json.Marshal(task)
// Publish async for max throughput
// NATS client library handles buffering and retries.
_, err := s.js.Publish(tasksSubject, taskJSON)
if err != nil {
// This might happen if the publish buffer is full.
log.Printf("WARN: Failed to publish task for plan %s: %v", plan.ID, err)
}
}
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]string{"plan_id": plan.ID})
}
关键点分析:
- 幂等性与流创建: 我们在启动时检查 Stream 是否存在,不存在则创建。这使得服务可以无状态地重启。
- 异步发布:
js.Publish是一个异步操作。为了达到高吞吐量,我们不等待每次发布的确认。NATS 客户端库在后台处理这些。这里的坑在于,如果 NATS 服务器短暂不可用,发布缓冲区可能会满,导致错误。生产环境中需要监控这种情况。 - 任务粒度: 我们将一个大的
TestPlan拆分成多个小的Task。每个Task都是一个 Worker 可以独立执行的单元。这正是分布式处理的核心思想。
2. Playwright Worker (Node.js/TypeScript): 执行单元
Worker 是无状态的计算节点,可以水平扩展。它从 NATS 拉取任务,执行,然后上报结果。
worker.ts
import { connect, JSONCodec, NatsConnection, Subscription } from 'nats';
import { chromium, Browser, Page } from 'playwright';
import { v4 as uuidv4 } from 'uuid';
// --- Configuration ---
const NATS_URL = process.env.NATS_URL || "nats://localhost:4222";
const TASKS_SUBJECT = "PERF_TESTS.tasks";
const RESULTS_SUBJECT = "PERF_TESTS.results";
const STREAM_NAME = "PERF_TESTS";
const CONSUMER_NAME = "playwright-worker-group"; // All workers share this name
const WORKER_ID = `worker-${uuidv4()}`;
// --- Types ---
interface Task {
task_id: string;
plan_id: string;
target_url: string;
script: string; // Base64 encoded Playwright script
}
interface Result {
worker_id: string;
task_id: string;
plan_id: string;
status: 'success' | 'failure';
error_message?: string;
metrics: {
fcp?: number;
lcp?: number;
dom_load?: number;
total_duration_ms: number;
};
timestamp: string;
}
const jc = JSONCodec();
async function main() {
let browser: Browser | null = null;
let nc: NatsConnection | null = null;
console.log(`[${WORKER_ID}] Starting up...`);
try {
// --- Connect to NATS and Playwright ---
nc = await connect({ servers: NATS_URL });
const js = nc.jetstream();
browser = await chromium.launch({ headless: true });
console.log(`[${WORKER_ID}] Connected to NATS and browser launched.`);
// --- Create a durable, pull-based consumer ---
// This is key for a work-queue pattern.
// It's "pull-based" in the sense that we control the rate of message fetching.
const sub = await js.subscribe(TASKS_SUBJECT, {
queue: CONSUMER_NAME, // This creates the consumer group
durable: CONSUMER_NAME, // The consumer's state is persisted
ack_policy: 'explicit', // We must explicitly ack messages
ack_wait: 30000, // 30 seconds to process a message
});
console.log(`[${WORKER_ID}] Subscribed to subject '${TASKS_SUBJECT}' with consumer group '${CONSUMER_NAME}'. Waiting for tasks...`);
// --- Main processing loop ---
for await (const msg of sub) {
const startTime = performance.now();
const task = jc.decode(msg.data) as Task;
console.log(`[${WORKER_ID}] Received task ${task.task_id} for plan ${task.plan_id}`);
let page: Page | null = null;
const result: Partial<Result> = {
worker_id: WORKER_ID,
task_id: task.task_id,
plan_id: task.plan_id,
timestamp: new Date().toISOString(),
metrics: { total_duration_ms: 0 },
};
try {
page = await browser.newPage();
// A common mistake is to not handle navigation timeouts.
// In a stress test, the target site WILL be slow.
await page.goto(task.target_url, { waitUntil: 'domcontentloaded', timeout: 20000 });
// In a real system, the script would be more complex.
// Here we simulate getting performance metrics.
const perfTimings = await page.evaluate(() => {
const paintTimings = performance.getEntriesByType('paint');
const fcp = paintTimings.find(entry => entry.name === 'first-contentful-paint')?.startTime;
const lcpEntry = performance.getEntriesByType('largest-contentful-paint').pop();
const lcp = lcpEntry?.startTime;
const navTiming = performance.getEntriesByType('navigation')[0] as PerformanceNavigationTiming;
const domLoad = navTiming.domContentLoadedEventEnd - navTiming.fetchStart;
return { fcp, lcp, domLoad };
});
result.status = 'success';
result.metrics = { ...perfTimings, total_duration_ms: 0 }; // duration will be set finally
} catch (err: any) {
console.error(`[${WORKER_ID}] Task ${task.task_id} failed:`, err.message);
result.status = 'failure';
result.error_message = err.message.substring(0, 500); // Truncate long errors
} finally {
if (page) {
await page.close();
}
result.metrics!.total_duration_ms = performance.now() - startTime;
// Publish the result and then ACK the message.
// This ensures "at-least-once" delivery. If publishing fails, we don't ACK.
await js.publish(RESULTS_SUBJECT, jc.encode(result));
msg.ack();
console.log(`[${WORKER_ID}] Completed task ${task.task_id}, result published.`);
}
}
} catch (err) {
console.error(`[${WORKER_ID}] A critical error occurred:`, err);
} finally {
await browser?.close();
await nc?.close();
console.log(`[${WORKER_ID}] Shutting down.`);
process.exit(1); // Exit to allow orchestrator (like K8s) to restart it.
}
}
main();
package.json
{
"name": "playwright-worker",
"version": "1.0.0",
"main": "worker.js",
"scripts": {
"start": "ts-node worker.ts"
},
"dependencies": {
"nats": "^2.18.0",
"playwright": "^1.40.0",
"uuid": "^9.0.1"
},
"devDependencies": {
"@types/node": "^20.10.0",
"@types/uuid": "^9.0.7",
"ts-node": "^10.9.1",
"typescript": "^5.3.2"
}
}
关键点分析:
- 消费者组 (Consumer Group):
queue: CONSUMER_NAME是实现工作队列的核心。所有使用相同queue名的订阅者构成一个组,消息会被负载均衡到组内成员。 - 持久消费者 (Durable Consumer):
durable: CONSUMER_NAME告诉 JetStream 记住这个消费者的状态(比如它处理到哪条消息了)。即使所有 Worker 都下线再上线,它们也能从上次中断的地方继续处理,而不是从头开始。 - 显式 ACK:
ack_policy: 'explicit'和msg.ack()构成了可靠性的基石。我们必须在处理完任务 并成功上报结果后,才告诉 NATS 这个消息可以删了。如果在ack()之前 Worker 崩溃,NATS 会在ack_wait超时后将该消息重新投递给组内的其他 Worker。 - 资源管理: 浏览器实例
browser在 Worker 启动时创建,并在 Worker 的生命周期内复用。为每个任务都launch一个新浏览器开销太大。page则是每个任务隔离的。这是一个重要的性能优化。
3. 数据持久化 (Go): 结果收集器
这个服务很简单,它扮演着数据管道中最后一环的角色,将 NATS 中的临时结果数据转存到 MongoDB 长期存储。
collector.go
package main
import (
"context"
"encoding/json"
"log"
"os"
"time"
"github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Same consts as main.go
const (
natsURL = "nats://localhost:4222"
mongoURI = "mongodb://localhost:27017"
streamName = "PERF_TESTS"
resultsSubject= "PERF_TESTS.results"
dbName = "perftest"
resultColl = "results"
)
type Result struct {
// Define the same Result struct as in TypeScript
// ...
}
func main() {
// --- NATS Connection ---
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatalf("Failed to create JetStream context: %v", err)
}
// --- MongoDB Connection ---
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoURI))
if err != nil {
log.Fatalf("Failed to connect to MongoDB: %v", err)
}
defer client.Disconnect(ctx)
collection := client.Database(dbName).Collection(resultColl)
log.Println("Result Collector starting...")
// --- Create a durable consumer for results ---
_, err = js.Subscribe(resultsSubject, func(msg *nats.Msg) {
var res map[string]interface{} // Use a map for flexibility
if err := json.Unmarshal(msg.Data, &res); err != nil {
log.Printf("ERROR: Failed to unmarshal result message: %v", err)
msg.Nak() // Tell NATS something went wrong, maybe redeliver later
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Insert into MongoDB
_, err := collection.InsertOne(ctx, res)
if err != nil {
log.Printf("ERROR: Failed to insert result into MongoDB: %v. PlanID: %s", err, res["plan_id"])
msg.Nak() // Failed to persist, ask NATS to redeliver
return
}
msg.Ack() // Successfully processed
log.Printf("INFO: Saved result for PlanID: %s, TaskID: %s", res["plan_id"], res["task_id"])
}, nats.Durable("result-collector"), nats.AckExplicit())
select{} // Block forever
}
局限性与未来展望
这个架构解决了一个核心问题:如何以分布式、可扩展的方式运行海量浏览器实例来执行性能测试。它足够健壮,能够处理 Worker 的动态增减和故障。然而,它并非银弹。
当前的局限性:
- 资源消耗: 浏览器是资源密集型应用。每个 Playwright Worker 都需要显著的 CPU 和内存。在 Kubernetes 上部署时,必须仔细配置 resource requests/limits,并利用 HPA (Horizontal Pod Autoscaler) 基于 NATS 队列积压的任务数进行弹性伸缩。
- 测试脚本的复杂性: 当前设计将脚本作为字符串传递,对于复杂的、多步骤的测试脚本,管理和调试会变得困难。一个改进方向是引入脚本版本管理,Worker 在启动时从代码仓库或对象存储中拉取指定版本的测试脚本。
- 实时反馈缺失: 这是一个批处理系统。我们提交任务,等待其完成,然后分析 MongoDB 中的结果。对于压测过程的实时监控,例如实时 QPS、P99 响应时间曲线,还需要构建一个数据流管道,将结果从 NATS 同时推送到像 InfluxDB 或 Prometheus 这样的时序数据库中。
- 成本问题: 在公有云上运行数千个浏览器实例一小时的成本不容忽视。必须有配套的成本控制和预算告警机制。
未来的迭代路径:
一个可行的演进方向是引入更智能的调度策略。例如,实现一个“校准”模式,系统自动增加并发数,直到观察到目标应用的性能指标(如 LCP 或错误率)达到预设阈值,从而自动发现系统的性能拐点。此外,可以将 eBPF 技术集成到 Worker 节点,以非侵入的方式收集更底层的网络和系统指标,将前端性能数据与后端基础设施表现关联起来,实现真正的“全链路”压测与分析。