一个棘手的架构需求摆在面前:一个高合规性的金融文档处理系统,其核心要求是任何文档的元数据入库操作,必须与一个不可篡改的审计日志条目实现原子性的绑定。如果元数据写入成功但审计日志失败,整个操作必须回滚,反之亦然。同时,系统需要在文档入库后,调用一个基于LlamaIndex的AI服务对文档内容进行深度分析和向量化,这是一个计算密集且耗时、可能失败的异步任务。
将所有操作打包进一个巨大的数据库事务是不可行的。AI处理过程可能耗时数分钟,长时间持有数据库锁会直接拖垮整个系统。而如果采用完全解耦的事件驱动架构,审计日志的写入就成了最终一致性,这无法满足“写入即存在”的强合规要求。
这里的核心矛盾是:系统内部存在两种截然不同的一致性需求。一部分需要强一致性(元数据与审计),另一部分则可以接受最终一致性(AI分析)。强行统一或忽略差异都会导致灾难。
方案A:Saga模式的诱惑与陷阱
Saga模式作为分布式事务的常见解决方案,首先进入了我们的视野。其核心思想是将一个长事务拆分为多个本地事务,通过事件或消息驱动,并为每个步骤提供补偿操作。
sequenceDiagram
participant Client
participant MetadataService
participant MessageQueue
participant AuditService
participant AIService
Client->>+MetadataService: 1. CreateDocument()
MetadataService->>MetadataService: a. Begin Local Tx
MetadataService->>MetadataService: b. INSERT INTO documents...
MetadataService->>MessageQueue: 2. Publish DocumentCreated Event
MetadataService->>MetadataService: c. Commit Local Tx
MetadataService-->>-Client: OK
MessageQueue-->>+AuditService: 3. Consume DocumentCreated
AuditService->>AuditService: INSERT INTO audit_logs...
AuditService-->>-MessageQueue: ACK
MessageQueue-->>+AIService: 4. Consume DocumentCreated
AIService->>AIService: Start LlamaIndex processing...
AIService-->>-MessageQueue: ACK
优势分析:
- 高可用性: 服务之间完全解耦。AuditService或AIService的临时故障不会阻塞MetadataService的写入。
- 短事务: 每个服务都只处理自己的本地事务,资源锁定时间极短,系统吞吐量高。
劣势分析:
- 一致性降级: 从
DocumentCreated事件发布到AuditService成功消费之间存在一个时间窗口。在这个窗口内,系统处于不一致状态:元数据存在,但审计日志缺失。对于需要强审计的场景,这是不可接受的。 - 补偿复杂性: 如果AuditService写入失败,我们需要触发一个补偿操作来删除已创建的元数据。补偿逻辑的开发和维护成本非常高,且难以保证100%的可靠性。
在真实项目中,依赖补偿逻辑来保证核心业务数据的完整性是一条充满风险的道路。对于我们这个场景,Saga模式牺牲了最关键的原子性保证。
方案B:两阶段提交(2PC)的回归与约束
既然Saga的最终一致性不满足要求,我们必须回头审视能提供原子性保证的经典协议:两阶段提交(2PC)。2PC通过引入一个协调者(Coordinator)来确保所有参与者(Participants)要么全部提交,要么全部回滚。
graph TD
subgraph "Transaction Scope (Strong Consistency)"
TC(Transaction Coordinator)
P1(Participant 1: Metadata Service)
P2(Participant 2: Audit Service)
TC -- 1. Prepare --> P1
TC -- 1. Prepare --> P2
P1 -- 2. Voted Yes --> TC
P2 -- 2. Voted Yes --> TC
TC -- 3. Commit --> P1
TC -- 3. Commit --> P2
end
subgraph "Asynchronous Scope (Eventual Consistency)"
TC -- 4. Trigger After Commit --> MQ(Message Queue)
MQ --> Worker(AI Worker: LlamaIndex)
end
优势分析:
- 原子性保证: 这是2PC的核心价值。它能确保Metadata和Audit两个操作作为一个原子单元被提交,完美契合我们的合规需求。
- 协议简单: 相较于Paxos或Raft,2PC的逻辑更直观,在参与者数量可控的情况下,自行实现是可行的。
劣势分析:
- 同步阻塞: 在第一阶段(Prepare)结束到第二阶段(Commit/Rollback)开始之间,所有参与者持有的资源(如数据库行锁)都会被锁定。如果协调者宕机,这些资源将永久锁定,需要人工干预。
- 协调者单点: 协调者是整个系统的单点故障。
- 性能开销: 两次网络往返通信带来了额外的延迟。
这里的关键权衡在于,我们是否愿意为了获得绝对的原子性,而去承担2PC带来的同步阻塞和单点风险。对于我们这个场景——核心数据写入操作,答案是肯定的。但前提是,必须将耗时且不可靠的AI处理剥离出事务边界。
最终决策:2PC + 异步消息的混合模型
我们的最终架构选择了一个混合模型:
- 使用2PC来保证
MetadataService和AuditService这两个核心服务的强一致性。 - 在2PC事务成功提交后,由协调者(Coordinator)向消息队列发送一个事件。
- 一个独立的AI Worker消费此事件,异步执行LlamaIndex的文档处理任务。
这个模型精确地将不同的一致性需求映射到了合适的技术方案上,既保证了核心数据的完整性,又避免了长事务对系统可用性的破坏。
核心实现概览
我们将使用PHP来实现这套体系。服务间通信采用HTTP RESTful API。OAuth 2.0的客户端凭证(Client Credentials)模式用于服务间的安全认证。
1. OAuth 2.0 服务间认证
在发起事务前,事务协调者(或其上游服务)必须先从认证服务器获取一个Access Token。
AuthClient.php (获取Token的客户端)
<?php
declare(strict_types=1);
namespace App\Clients;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Psr\Log\LoggerInterface;
class AuthClient
{
private Client $httpClient;
private string $tokenUrl;
private string $clientId;
private string $clientSecret;
private LoggerInterface $logger;
public function __construct(
string $tokenUrl,
string $clientId,
string $clientSecret,
LoggerInterface $logger
) {
$this->httpClient = new Client(['timeout' => 5.0]);
$this->tokenUrl = $tokenUrl;
$this->clientId = $clientId;
$this->clientSecret = $clientSecret;
$this->logger = $logger;
}
public function getServiceToken(): ?string
{
try {
$response = $this->httpClient->post($this->tokenUrl, [
'form_params' => [
'grant_type' => 'client_credentials',
'client_id' => $this->clientId,
'client_secret' => $this->clientSecret,
'scope' => 'transaction:manage', // 精细化权限控制
]
]);
if ($response->getStatusCode() !== 200) {
$this->logger->error('Failed to get auth token', [
'status' => $response->getStatusCode(),
'body' => $response->getBody()->getContents()
]);
return null;
}
$data = json_decode($response->getBody()->getContents(), true);
return $data['access_token'] ?? null;
} catch (GuzzleException $e) {
$this->logger->critical('Auth service request failed', [
'error' => $e->getMessage()
]);
return null;
}
}
}
2. 两阶段提交协调者 (Transaction Coordinator)
这是整个方案的大脑。它负责生成全局事务ID,并按顺序调用所有参与者的prepare和commit/rollback接口。
TransactionCoordinatorService.php
<?php
declare(strict_types=1);
namespace App\Service;
use GuzzleHttp\Client;
use GuzzleHttp\Promise;
use Psr\Http\Message\ResponseInterface;
use Psr\Log\LoggerInterface;
use Ramsey\Uuid\Uuid;
class TransactionCoordinatorService
{
private array $participants; // e.g., ['http://metadata-service', 'http://audit-service']
private Client $httpClient;
private LoggerInterface $logger;
private MessageQueueClient $mqClient;
private string $accessToken;
public function __construct(
array $participantEndpoints,
MessageQueueClient $mqClient,
LoggerInterface $logger,
string $accessToken
) {
$this->participants = $participantEndpoints;
$this->mqClient = $mqClient;
$this->logger = $logger;
$this->accessToken = $accessToken;
// 生产环境中,超时配置至关重要,防止无限期阻塞
$this->httpClient = new Client([
'timeout' => 10.0, // 总超时
'connect_timeout' => 3.0, // 连接超时
]);
}
public function executeTransaction(array $payload): bool
{
$transactionId = Uuid::uuid4()->toString();
$this->logger->info("Starting 2PC transaction", ['tx_id' => $transactionId]);
// --- Phase 1: Prepare ---
$prepareSuccess = $this->broadcastPrepare($transactionId, $payload);
if (!$prepareSuccess) {
$this->logger->warning("Prepare phase failed. Broadcasting rollback.", ['tx_id' => $transactionId]);
$this->broadcastRollback($transactionId);
return false;
}
// --- Phase 2: Commit ---
$commitSuccess = $this->broadcastCommit($transactionId);
if (!$commitSuccess) {
$this->logger->critical("Commit phase failed! Manual intervention required!", [
'tx_id' => $transactionId,
'participants' => $this->participants
]);
// 这是一个危险状态,部分节点可能已提交。
// 真实项目中需要复杂的恢复机制。
return false;
}
$this->logger->info("2PC transaction committed successfully.", ['tx_id' => $transactionId]);
// --- Post-Commit: Trigger Asynchronous Task ---
$this->triggerAsyncProcessing($payload);
return true;
}
private function broadcastPrepare(string $txId, array $payload): bool
{
$promises = [];
foreach ($this->participants as $endpoint) {
$promises[$endpoint] = $this->httpClient->postAsync("{$endpoint}/prepare", [
'headers' => $this->getAuthHeaders(),
'json' => ['tx_id' => $txId, 'data' => $payload]
]);
}
try {
// 并行发送所有prepare请求
$responses = Promise\Utils::settle($promises)->wait();
foreach ($responses as $endpoint => $result) {
if ($result['state'] === 'rejected' || $result['value']->getStatusCode() !== 200) {
$this->logger->error("Participant failed to prepare.", [
'tx_id' => $txId,
'endpoint' => $endpoint,
'reason' => $result['reason'] ?? $result['value']->getReasonPhrase()
]);
return false;
}
}
return true;
} catch (\Throwable $e) {
$this->logger->error("Error during prepare phase.", ['tx_id' => $txId, 'error' => $e->getMessage()]);
return false;
}
}
private function broadcastCommit(string $txId): bool
{
// 提交阶段必须保证所有参与者都收到指令,即使有网络错误也要重试
// 生产级实现会使用持久化队列或状态机来确保指令的最终送达
$allCommitted = true;
foreach ($this->participants as $endpoint) {
try {
$response = $this->httpClient->post("{$endpoint}/commit", [
'headers' => $this->getAuthHeaders(),
'json' => ['tx_id' => $txId]
]);
if ($response->getStatusCode() !== 200) {
$this->logger->error("Participant failed to commit.", ['tx_id' => $txId, 'endpoint' => $endpoint]);
$allCommitted = false; // 记录失败,但继续通知其他节点
}
} catch (\Exception $e) {
$this->logger->error("Error committing participant.", ['tx_id' => $txId, 'endpoint' => $endpoint, 'error' => $e->getMessage()]);
$allCommitted = false;
}
}
return $allCommitted;
}
private function broadcastRollback(string $txId): void
{
// 回滚也需要尽力而为地通知所有参与者
foreach ($this->participants as $endpoint) {
try {
$this->httpClient->post("{$endpoint}/rollback", [
'headers' => $this->getAuthHeaders(),
'json' => ['tx_id' => $txId]
]);
} catch (\Exception $e) {
$this->logger->error("Error rolling back participant.", ['tx_id' => $txId, 'endpoint' => $endpoint, 'error' => $e->getMessage()]);
}
}
}
private function triggerAsyncProcessing(array $payload): void
{
$jobPayload = [
'document_id' => $payload['id'],
's3_path' => $payload['path'],
];
$this->mqClient->enqueue('llama_index_queue', $jobPayload);
$this->logger->info("Enqueued job for async AI processing.", ['document_id' => $payload['id']]);
}
private function getAuthHeaders(): array
{
return [
'Authorization' => 'Bearer ' . $this->accessToken,
'Content-Type' => 'application/json',
'Accept' => 'application/json',
];
}
}
3. 事务参与者 (Transaction Participant)
每个参与者(MetadataService, AuditService)都需要实现三个接口:/prepare, /commit, /rollback。这里以MetadataService为例,它使用PDO来管理本地数据库事务。
ParticipantController.php (在一个简单的路由框架下)
<?php
declare(strict_types=1);
namespace App\Controller;
use Psr\Http\Message\ServerRequestInterface as Request;
use Psr\Http\Message\ResponseInterface as Response;
use PDO;
class ParticipantController
{
private PDO $pdo;
private array $preparedTransactions = []; // 内存中存储预备事务,真实项目需要持久化
public function __construct(PDO $pdo)
{
$this->pdo = $pdo;
}
public function prepare(Request $request, Response $response): Response
{
$body = json_decode((string)$request->getBody(), true);
$txId = $body['tx_id'] ?? null;
$data = $body['data'] ?? null;
if (!$txId || !$data) {
return $response->withStatus(400);
}
try {
// 关键:开启本地事务,但暂不提交
$this->pdo->beginTransaction();
// 执行业务逻辑
$stmt = $this->pdo->prepare(
"INSERT INTO documents (id, name, path, created_at) VALUES (?, ?, ?, NOW())"
);
$stmt->execute([$data['id'], $data['name'], $data['path']]);
// 记录此事务已准备好,并与本地PDO事务关联
// 在真实项目中,这里应该将 txId 和连接/会话信息持久化到
// 一个事务日志表中,以便服务重启后能恢复状态。
$this->preparedTransactions[$txId] = true;
// 返回成功,代表投票 "YES"
return $response->withStatus(200);
} catch (\Exception $e) {
// 如果准备阶段出错,立即回滚本地事务
if ($this->pdo->inTransaction()) {
$this->pdo->rollBack();
}
unset($this->preparedTransactions[$txId]);
// 返回失败,代表投票 "NO"
return $response->withStatus(500);
}
}
public function commit(Request $request, Response $response): Response
{
$body = json_decode((string)$request->getBody(), true);
$txId = $body['tx_id'] ?? null;
if (!isset($this->preparedTransactions[$txId])) {
// 可能因为网络问题重复收到commit指令,如果已处理则幂等返回成功
// 或者是一个未知的事务ID,返回错误
return $response->withStatus(404);
}
try {
// 提交之前保持的本地事务
$this->pdo->commit();
unset($this->preparedTransactions[$txId]);
return $response->withStatus(200);
} catch (\Exception $e) {
// Commit失败是灾难性的,本地数据可能已损坏
// 需要告警并人工介入
// Log a critical error
return $response->withStatus(500);
}
}
public function rollback(Request $request, Response $response): Response
{
$body = json_decode((string)$request->getBody(), true);
$txId = $body['tx_id'] ?? null;
if (!isset($this->preparedTransactions[$txId])) {
return $response->withStatus(404);
}
try {
$this->pdo->rollBack();
unset($this->preparedTransactions[$txId]);
return $response->withStatus(200);
} catch (\Exception $e) {
// Log error
return $response->withStatus(500);
}
}
}
单元测试思路:
- Coordinator: 模拟参与者返回不同状态码(200, 500)和网络异常,断言Coordinator是否能正确广播
commit或rollback。 - Participant: 模拟数据库操作成功/失败,检查
/prepare接口是否能正确开启事务并投票。测试/commit和/rollback接口的幂等性。
4. 异步AI Worker
这个Worker是一个独立的长时间运行的进程,它监听消息队列。这里我们只展示概念代码,实际项目中它可能是一个用Python编写的、充分利用LlamaIndex能力的服务。
LlamaIndexWorker.php (消费者)
<?php
declare(strict_types=1);
namespace App\Worker;
class LlamaIndexWorker
{
private MessageQueueClient $mqClient;
private LoggerInterface $logger;
public function run()
{
while (true) {
$job = $this->mqClient->dequeue('llama_index_queue');
if ($job) {
try {
$this->processDocument($job);
} catch (\Exception $e) {
$this->logger->error("Failed to process document for LlamaIndex", [
'job' => $job,
'error' => $e->getMessage()
]);
// 将任务移入死信队列或执行重试策略
}
}
sleep(1);
}
}
private function processDocument(array $jobData): void
{
$documentId = $jobData['document_id'];
$s3Path = $jobData['s3_path'];
$this->logger->info("Starting LlamaIndex processing", ['document_id' => $documentId]);
// 1. 从S3下载文档
$localPath = $this->downloadFromS3($s3Path);
// 2. 调用Python脚本执行LlamaIndex核心逻辑
// 生产环境应使用更健壮的IPC或RPC机制
$command = sprintf(
"python3 /app/scripts/llama_indexer.py --path %s --id %s",
escapeshellarg($localPath),
escapeshellarg($documentId)
);
exec($command, $output, $returnCode);
if ($returnCode !== 0) {
throw new \RuntimeException("LlamaIndex script failed: " . implode("\n", $output));
}
$this->logger->info("LlamaIndex processing finished", ['document_id' => $documentId]);
// 3. 清理本地文件
unlink($localPath);
}
// ... downloadFromS3 implementation ...
}
架构的局限性与未来路径
这套基于PHP自实现的2PC混合架构,虽然解决了眼前的强一致性与异步处理的矛盾,但它并非银弹。我们必须清醒地认识到其固有的局限性。
首先,协调者(TC)的实现是简化的。在生产环境中,TC必须是高可用的,并且需要持久化事务状态,以便在自身崩溃重启后能恢复事务,决定是继续提交还是回滚。目前的实现缺少这一点,协调者的崩溃可能导致参与者资源被永久锁定。
其次,2PC协议本身的阻塞特性决定了它不适用于高并发、低延迟的场景。当参与事务的服务增多,或者网络延迟变大时,整个系统的性能会急剧下降。它的适用范围被严格限定在参与者数量少、内部网络环境可靠、且对数据原子性有极高要求的关键业务流上。
最后,从事务成功到AI处理完成之间的数据延迟是客观存在的。业务方必须理解并接受这种最终一致性,相关的监控、告警和失败重试机制(如死信队列)是异步系统不可或-少-的配套设施。
对于未来的演进,如果系统的复杂度和对可用性的要求进一步提高,我们可能需要考虑放弃自研2PC,转而采用经过生产环境检验的、更成熟的分布式事务解决方案,例如引入Seata这样的分布式事务中间件,或者在架构层面转向基于Raft/Paxos协议的共识系统,但这通常也意味着需要引入更复杂的技术栈(如Java生态)。