我们团队在一个文档协作项目中落地了事件溯源(Event Sourcing)架构。选择它的初衷很明确:完整的审计日志、强大的业务追溯能力以及灵活构建任意时间点状态的能力。技术选型上,我们没有引入专门的事件存储,而是选择用团队最熟悉的 PostgreSQL,利用其强大的 JSONB 类型和事务保证来构建事件存储。初期一切顺利,敏捷开发的节奏下,我们快速迭代,业务功能也稳步上线。
然而,当一个新需求出现时,这个看似稳固的架构开始暴露出裂缝。需求是在用户每次保存文档时,对文档内容进行深度的自然语言处理(NLP),提取实体、情感倾向和关键词,并生成一份分析报告。这个报告本身就是一个读模型(Read Model / Projection)。
最初的实现思路简单直接:在处理保存命令的同一个事务里,追加写入事件后,同步运行一个投影器(Projector),调用 spaCy 库完成NLP分析,再将结果写入报告表中。这是我们提交的第一个版本:
# WARNING: 这是错误的、会引发严重性能问题的实现
import spacy
import psycopg2
# 假设 nlp 是一个已加载的 spaCy 模型
nlp = spacy.load("en_core_web_sm")
def handle_document_saved_synchronously(event_data: dict, db_cursor):
"""
同步处理文档保存事件,并立即进行NLP分析。
这是一个典型的反模式。
"""
document_id = event_data['document_id']
document_content = event_data['content']
# 1. 对文档内容进行耗时的NLP处理
# 这里的操作是CPU密集型的,会阻塞整个处理流程
doc = nlp(document_content)
entities = [(ent.text, ent.label_) for ent in doc.ents]
keywords = [token.lemma_ for token in doc if not token.is_stop and not token.is_punct]
# 2. 将分析结果写入一个读模型表
# 这个操作发生在同一个事务中,延长了锁的持有时间
try:
db_cursor.execute(
"""
INSERT INTO document_analysis_reports (document_id, entities, keywords, version)
VALUES (%s, %s, %s, %s)
ON CONFLICT (document_id) DO UPDATE
SET entities = EXCLUDED.entities,
keywords = EXCLUDED.keywords,
version = EXCLUDED.version;
""",
(document_id, json.dumps(entities), json.dumps(keywords), event_data['version'])
)
print(f"Successfully generated report for document {document_id}")
except psycopg2.Error as e:
# 在真实项目中,这里需要有完善的日志记录
print(f"Error updating report for document {document_id}: {e}")
raise # 向上抛出异常,以便回滚整个事务
# 在主应用逻辑中...
# with connection.cursor() as cursor:
# # ... 写入 DocumentSavedEvent 到 events 表 ...
# handle_document_saved_synchronously(event_payload, cursor)
# # ... 提交事务 ...
这段代码在开发环境运行良好。但在预生产环境的压力测试中,灾难降临了。API 的响应时间从几十毫秒飙升到几秒甚至十几秒。spaCy 的模型加载和计算是CPU密集型操作,对于一篇几千字的文档,处理时间可能达到数百毫-秒到数秒。将这个过程放在同步的请求/响应循环中,直接阻塞了API工作线程,导致吞吐量急剧下降。数据库侧,由于整个过程在一个长事务中,对 events 表和 document_analysis_reports 表的锁持有时间变得不可接受。
这次失败是敏捷开发过程中的一个典型教训:快速实现的原型验证了功能,但也暴露了架构的性能瓶颈。我们必须重构。
初步构想与技术选型决策
痛点已经明确:CPU密集型的投影计算任务必须与主业务流程异步解耦。
第一个想法是使用一个简单的后台任务队列,比如 Celery。在写入事件后,向 RabbitMQ 或 Redis 发送一个任务消息,由 Celery worker 异步消费并处理。这是一个非常成熟的模式,但在我们的场景下,我们预见到了几个潜在的坑:
- 双写一致性问题: 如何保证写入事件到PostgreSQL和发送消息到消息队列这两个操作的原子性?如果数据库事务提交成功,但消息发送失败,这个NLP分析任务就永远丢失了。反之,如果消息发送成功但数据库事务回滚,我们就会处理一个不存在的事件。
- 资源隔离与调度: NLP任务对CPU和内存的消耗都很大。如果将这些
Celeryworker 与我们的API服务部署在同一批机器上,很可能因为资源竞争而相互影响。更重要的是,spaCy的模型很大,每个worker进程都加载一份模型会消耗大量内存。理想情况下,我们希望有一个更懂计算任务调度的系统。
这两个问题,特别是第一个,促使我们排除了“DB + 外部MQ”的方案。转而采用事务性发件箱模式 (Transactional Outbox Pattern)**。该模式的核心思想是:将待发送的消息(或待执行的任务)作为一条记录,与业务数据的变更(即写入新事件)存储在同一数据库的同一事务**中。这样,我们就借助数据库的ACID特性保证了操作的原子性。
一旦任务被原子地写入数据库,一个独立的分发器 (Dispatcher) 进程就可以安全地轮询这张任务表,将任务分发给真正的计算集群。
那么,计算集群用什么?Celery 依然是备选。但考虑到 spaCy 和未来可能引入的更复杂的机器学习模型(比如 scikit-learn 或 PyTorch),我们最终将目光投向了 Dask。Dask 是一个为分析计算而生的并行计算库。它的优势在于:
- 原生支持复杂计算图: Dask不仅仅是任务队列,它能理解任务间的依赖关系,优化执行计划。
- 高效的数据本地化: Dask调度器会尝试将计算任务调度到数据所在的节点,减少数据传输开销。虽然我们场景下主要是传递事件payload,但这个特性对未来的扩展很有价值。
- 与Python科学计算生态无缝集成:
Dask与NumPy,pandas,scikit-learn等库深度集成,对于我们这种NLP/ML密集型应用来说,它是比通用任务队列更自然的选择。
最终的技术方案定格在:**PostgreSQL (Event Store + Outbox Table) + Polling Dispatcher + Dask Cluster (for spaCy computation)**。
步骤化实现
第一阶段:加固事件存储与引入发件箱
首先,我们定义了 events 表和新的 projection_jobs 表的结构。
-- events 表用于存储所有事件
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
stream_id UUID NOT NULL,
version INT NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT stream_version_unique UNIQUE (stream_id, version)
);
CREATE INDEX idx_events_stream_id ON events (stream_id);
CREATE INDEX idx_events_event_type ON events (event_type);
-- projection_jobs 表,即我们的“发件箱”
CREATE TABLE projection_jobs (
id BIGSERIAL PRIMARY KEY,
event_id BIGINT NOT NULL REFERENCES events(id),
projector_name VARCHAR(255) NOT NULL, -- 哪个投影器要处理
status VARCHAR(50) NOT NULL DEFAULT 'pending', -- pending, processing, completed, failed
attempts INT NOT NULL DEFAULT 0,
last_error TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT event_projector_unique UNIQUE (event_id, projector_name)
);
CREATE INDEX idx_projection_jobs_status ON projection_jobs (status);
关键在于,追加事件和创建投影任务必须在同一个事务中完成。我们重写了事件追加的逻辑:
import uuid
import json
import psycopg2
from psycopg2.extras import Json
class EventStore:
def __init__(self, connection):
self.conn = connection
def append_event(self, stream_id: uuid.UUID, expected_version: int, event_type: str, payload: dict):
"""
原子地追加事件并为需要异步计算的投影创建任务。
"""
with self.conn.cursor() as cursor:
try:
# 1. 检查当前版本,实现乐观并发控制
cursor.execute(
"SELECT MAX(version) FROM events WHERE stream_id = %s",
(str(stream_id),)
)
current_version = cursor.fetchone()[0] or 0
if current_version != expected_version:
raise ConcurrencyException(f"Expected version {expected_version}, but got {current_version}")
new_version = current_version + 1
# 2. 插入新事件
cursor.execute(
"""
INSERT INTO events (stream_id, version, event_type, payload)
VALUES (%s, %s, %s, %s) RETURNING id;
""",
(str(stream_id), new_version, event_type, Json(payload))
)
event_id = cursor.fetchone()[0]
# 3. 如果事件需要NLP处理,在同一个事务中插入job
if event_type == 'DocumentSavedEvent':
cursor.execute(
"""
INSERT INTO projection_jobs (event_id, projector_name)
VALUES (%s, %s);
""",
(event_id, 'nlp_analysis_projector')
)
self.conn.commit()
return event_id
except (psycopg2.Error, ConcurrencyException) as e:
self.conn.rollback()
# 关键:在这里记录详细的错误日志
print(f"Failed to append event: {e}")
raise
class ConcurrencyException(Exception):
pass
现在,主业务流程变得非常快。它只做一件有保障的事情:将事件和任务写入数据库。整个API请求的响应时间恢复到了几十毫秒。
第二阶段:构建分发器与 Dask Worker
分发器是一个独立的、长期运行的进程。它的逻辑很简单:定期查询 projection_jobs 表中 pending 状态的任务,并将其提交给Dask。
一个常见的错误是直接 SELECT ... WHERE status = 'pending',这在多个分发器实例下会导致任务被重复分发。正确的做法是使用 SELECT ... FOR UPDATE SKIP LOCKED,这是一个原子操作,能确保每个任务只被一个工作线程锁定和获取。
# dispatcher.py
import time
import logging
from dask.distributed import Client, as_completed
import psycopg2
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
DASK_SCHEDULER_ADDRESS = 'tcp://127.0.0.1:8786'
DB_CONN_STRING = "dbname=events_db user=user password=pass host=localhost"
def fetch_and_dispatch_jobs(db_conn, dask_client):
"""
查询并分发任务。使用了 FOR UPDATE SKIP LOCKED 来保证多实例安全。
"""
fetch_sql = """
SELECT id, event_id FROM projection_jobs
WHERE status = 'pending'
ORDER BY created_at
LIMIT 10
FOR UPDATE SKIP LOCKED;
"""
update_to_processing_sql = "UPDATE projection_jobs SET status = 'processing', updated_at = NOW() WHERE id = %s;"
pending_jobs = []
with db_conn.cursor() as cursor:
try:
cursor.execute(fetch_sql)
jobs_to_process = cursor.fetchall()
if not jobs_to_process:
return [] # 没有待处理任务
for job_id, event_id in jobs_to_process:
cursor.execute(update_to_processing_sql, (job_id,))
pending_jobs.append({'job_id': job_id, 'event_id': event_id})
db_conn.commit()
logging.info(f"Dispatched {len(pending_jobs)} jobs.")
except psycopg2.Error as e:
db_conn.rollback()
logging.error(f"Database error while fetching jobs: {e}")
return []
# 将任务提交到 Dask
futures = []
for job in pending_jobs:
# dask_task.run_nlp_projection 是在 worker 端定义的函数
future = dask_client.submit(dask_task.run_nlp_projection, job['event_id'], job['job_id'])
futures.append(future)
return futures
def main_loop():
dask_client = Client(DASK_SCHEDULER_ADDRESS)
db_conn = psycopg2.connect(DB_CONN_STRING)
db_conn.autocommit = False # 手动管理事务
logging.info("Dispatcher started. Connecting to Dask and PostgreSQL.")
while True:
try:
futures = fetch_and_dispatch_jobs(db_conn, dask_client)
# 这里可以对 futures 做一些管理,但为简化,我们仅轮询
if not futures:
time.sleep(5) # 轮询间隔
except Exception as e:
logging.critical(f"Unhandled error in dispatcher main loop: {e}", exc_info=True)
# 在生产环境中,这里应该有重连逻辑
time.sleep(15)
# 这是一个简化的示例,并未包含 Dask task 的定义
# import dask_task # 假设任务定义在 dask_task.py 中
接下来是 Dask worker 端执行的实际任务。这里的关键挑战是 spaCy 模型的管理。模型文件很大(en_core_web_sm 约几十MB,大的模型几百MB),不能在每次任务执行时都去加载。一个好的实践是利用 Dask worker 的生命周期,在 worker 启动时预加载模型,或者使用一个单例模式来确保每个 worker 进程只加载一次。
# dask_task.py
import os
import spacy
import psycopg2
import json
from psycopg2.extras import Json
DB_CONN_STRING = "dbname=events_db user=user password=pass host=localhost"
# 全局变量,用于在 worker 进程中缓存 spaCy 模型
# 这是一个简化处理,在真实项目中可能需要更复杂的管理类
NLP_MODEL = None
def get_nlp_model():
"""
确保每个 Dask worker 进程只加载一次模型。
"""
global NLP_MODEL
if NLP_MODEL is None:
model_name = "en_core_web_sm"
print(f"Worker {os.getpid()}: Loading spaCy model '{model_name}'...")
NLP_MODEL = spacy.load(model_name)
print(f"Worker {os.getpid()}: Model loaded.")
return NLP_MODEL
def run_nlp_projection(event_id: int, job_id: int):
"""
Dask worker 执行的函数。
它必须是幂等的,并且处理所有可能的异常。
"""
db_conn = None
try:
db_conn = psycopg2.connect(DB_CONN_STRING)
with db_conn.cursor() as cursor:
# 1. 获取事件内容
cursor.execute("SELECT payload FROM events WHERE id = %s", (event_id,))
event_row = cursor.fetchone()
if not event_row:
raise ValueError(f"Event with id {event_id} not found.")
payload = event_row[0]
document_id = payload['document_id']
content = payload['content']
version = payload['version']
# 2. 执行CPU密集型计算
nlp = get_nlp_model()
doc = nlp(content)
entities = [{'text': ent.text, 'label': ent.label_} for ent in doc.ents]
# 这里可以加入更多分析...
# 3. 结果写入读模型(幂等操作)
# 使用 ON CONFLICT 确保重复执行任务不会导致错误或重复数据
upsert_sql = """
INSERT INTO document_analysis_reports (document_id, entities, version)
VALUES (%s, %s, %s)
ON CONFLICT (document_id) DO UPDATE
SET entities = EXCLUDED.entities,
version = EXCLUDED.version
WHERE document_analysis_reports.version < EXCLUDED.version;
"""
cursor.execute(upsert_sql, (document_id, Json(entities), version))
# 4. 更新 job 状态为 completed
cursor.execute(
"UPDATE projection_jobs SET status = 'completed', updated_at = NOW(), last_error = NULL, attempts = attempts + 1 WHERE id = %s",
(job_id,)
)
db_conn.commit()
return {"status": "success", "job_id": job_id}
except Exception as e:
error_message = str(e)
if db_conn:
db_conn.rollback()
# 5. 记录错误并更新 job 状态为 failed
# 在真实项目中,这里应该有重试逻辑,比如当 attempts < 5 时,状态设回 pending
with db_conn.cursor() as error_cursor:
error_cursor.execute(
"UPDATE projection_jobs SET status = 'failed', last_error = %s, updated_at = NOW(), attempts = attempts + 1 WHERE id = %s",
(error_message, job_id)
)
db_conn.commit()
return {"status": "error", "job_id": job_id, "error": error_message}
finally:
if db_conn:
db_conn.close()
这个 run_nlp_projection 函数体现了生产级代码的几个要点:
- 幂等性:
INSERT ... ON CONFLICT保证了即使任务被重复执行,读模型的状态也是正确的。我们还加入了版本检查,防止旧事件覆盖新状态。 - 独立的数据库连接: 每个Dask任务都创建自己的数据库连接,避免了连接共享带来的并发问题。
- 详尽的错误处理: 任何异常都会被捕获,任务状态会被更新为
failed,并记录错误信息,便于后续排查。分发器可以根据attempts字段实现重试策略。
最终成果与架构图
经过重构,整个系统的数据流变得清晰且健壮。
sequenceDiagram
participant Client
participant API Service
participant PostgreSQL_DB as PostgreSQL DB
participant Dispatcher
participant Dask_Scheduler as Dask Scheduler
participant Dask_Worker as Dask Worker
Client->>+API Service: POST /documents/{id} (save command)
activate API Service
Note over API Service, PostgreSQL_DB: Start Transaction
API Service->>PostgreSQL_DB: INSERT INTO events (...)
API Service->>PostgreSQL_DB: INSERT INTO projection_jobs (..., status='pending')
Note over API Service, PostgreSQL_DB: Commit Transaction
API Service-->>-Client: 202 Accepted
deactivate API Service
loop Poll for jobs
Dispatcher->>+PostgreSQL_DB: SELECT ... FROM projection_jobs WHERE status='pending' FOR UPDATE SKIP LOCKED
PostgreSQL_DB-->>-Dispatcher: Job[id=123, event_id=456]
Dispatcher->>+PostgreSQL_DB: UPDATE projection_jobs SET status='processing' WHERE id=123
PostgreSQL_DB-->>-Dispatcher: OK
end
Dispatcher->>+Dask_Scheduler: Submit task(run_nlp_projection, event_id=456)
Dask_Scheduler->>+Dask_Worker: Execute task
activate Dask_Worker
Dask_Worker->>PostgreSQL_DB: SELECT payload FROM events WHERE id=456
Note over Dask_Worker: Run spaCy NLP analysis (CPU Intensive)
Dask_Worker->>PostgreSQL_DB: INSERT ... ON CONFLICT INTO document_analysis_reports
Dask_Worker->>PostgreSQL_DB: UPDATE projection_jobs SET status='completed' WHERE id=123
deactivate Dask_Worker
Dask_Worker-->>-Dask_Scheduler: Task finished
Dask_Scheduler-->>-Dispatcher: Future is done
这个架构彻底解决了性能瓶颈。API服务的职责变得单一,只负责快速、可靠地接收命令并持久化事件。所有耗时的、复杂的计算都被推到隔离的、可水平扩展的Dask集群中。敏捷开发的迭代过程并未受阻,反而因为架构的解耦,前后端的功能开发可以更独立地进行。
遗留问题与未来迭代路径
这套方案并非一劳永逸,在真实项目中,它仍有几个需要持续关注和改进的地方:
- 分发器的可用性: 当前的分发器是单点。虽然
SKIP LOCKED保证了多实例的数据安全,但我们还需要一套部署和监控机制(如Kubernetes Deployment)来确保至少有一个分发器实例在运行。 - 读模型的最终一致性: 从事件产生到NLP分析报告生成,中间存在延迟。UI/UX设计必须考虑到这种最终一致性,比如可以显示“报告生成中…”的状态。监控这个延迟(
projection_jobs.created_at与updated_at的差值)是关键的SLO指标。 - Dask集群的运维成本:
Dask虽好,但它是一个分布式系统,需要专门的运维。在云原生环境中,可以使用Dask Kubernetes Operator来简化部署和弹性伸缩,但这本身也带来了新的技术复杂性。 - 死信队列与手动干预: 对于反复失败的任务(例如,由于一个持久性的bug或脏数据),简单的重试会无济于事。需要引入一个“死信”状态,并将这些任务放入一个专门的队列中,等待工程师手动分析和干预。
- 事件回放(Replay)的挑战: 事件溯源的一大优势是能够通过回放所有事件来重建或创建新的读模型。对于这种计算密集型的投影,完整的回放将是一项计算量巨大的任务。我们需要设计一个高效的回放机制,也许是直接批量从
events表读取数据并提交给Dask,绕过projection_jobs表。