PHP构建混合事务模型 融合2PC强一致性与LlamaIndex异步AI处理


一个棘手的架构需求摆在面前:一个高合规性的金融文档处理系统,其核心要求是任何文档的元数据入库操作,必须与一个不可篡改的审计日志条目实现原子性的绑定。如果元数据写入成功但审计日志失败,整个操作必须回滚,反之亦然。同时,系统需要在文档入库后,调用一个基于LlamaIndex的AI服务对文档内容进行深度分析和向量化,这是一个计算密集且耗时、可能失败的异步任务。

将所有操作打包进一个巨大的数据库事务是不可行的。AI处理过程可能耗时数分钟,长时间持有数据库锁会直接拖垮整个系统。而如果采用完全解耦的事件驱动架构,审计日志的写入就成了最终一致性,这无法满足“写入即存在”的强合规要求。

这里的核心矛盾是:系统内部存在两种截然不同的一致性需求。一部分需要强一致性(元数据与审计),另一部分则可以接受最终一致性(AI分析)。强行统一或忽略差异都会导致灾难。

方案A:Saga模式的诱惑与陷阱

Saga模式作为分布式事务的常见解决方案,首先进入了我们的视野。其核心思想是将一个长事务拆分为多个本地事务,通过事件或消息驱动,并为每个步骤提供补偿操作。

sequenceDiagram
    participant Client
    participant MetadataService
    participant MessageQueue
    participant AuditService
    participant AIService

    Client->>+MetadataService: 1. CreateDocument()
    MetadataService->>MetadataService: a. Begin Local Tx
    MetadataService->>MetadataService: b. INSERT INTO documents...
    MetadataService->>MessageQueue: 2. Publish DocumentCreated Event
    MetadataService->>MetadataService: c. Commit Local Tx
    MetadataService-->>-Client: OK
    
    MessageQueue-->>+AuditService: 3. Consume DocumentCreated
    AuditService->>AuditService: INSERT INTO audit_logs...
    AuditService-->>-MessageQueue: ACK

    MessageQueue-->>+AIService: 4. Consume DocumentCreated
    AIService->>AIService: Start LlamaIndex processing...
    AIService-->>-MessageQueue: ACK

优势分析:

  1. 高可用性: 服务之间完全解耦。AuditService或AIService的临时故障不会阻塞MetadataService的写入。
  2. 短事务: 每个服务都只处理自己的本地事务,资源锁定时间极短,系统吞吐量高。

劣势分析:

  1. 一致性降级:DocumentCreated事件发布到AuditService成功消费之间存在一个时间窗口。在这个窗口内,系统处于不一致状态:元数据存在,但审计日志缺失。对于需要强审计的场景,这是不可接受的。
  2. 补偿复杂性: 如果AuditService写入失败,我们需要触发一个补偿操作来删除已创建的元数据。补偿逻辑的开发和维护成本非常高,且难以保证100%的可靠性。

在真实项目中,依赖补偿逻辑来保证核心业务数据的完整性是一条充满风险的道路。对于我们这个场景,Saga模式牺牲了最关键的原子性保证。

方案B:两阶段提交(2PC)的回归与约束

既然Saga的最终一致性不满足要求,我们必须回头审视能提供原子性保证的经典协议:两阶段提交(2PC)。2PC通过引入一个协调者(Coordinator)来确保所有参与者(Participants)要么全部提交,要么全部回滚。

graph TD
    subgraph "Transaction Scope (Strong Consistency)"
        TC(Transaction Coordinator)
        P1(Participant 1: Metadata Service)
        P2(Participant 2: Audit Service)
        TC -- 1. Prepare --> P1
        TC -- 1. Prepare --> P2
        P1 -- 2. Voted Yes --> TC
        P2 -- 2. Voted Yes --> TC
        TC -- 3. Commit --> P1
        TC -- 3. Commit --> P2
    end

    subgraph "Asynchronous Scope (Eventual Consistency)"
        TC -- 4. Trigger After Commit --> MQ(Message Queue)
        MQ --> Worker(AI Worker: LlamaIndex)
    end

优势分析:

  1. 原子性保证: 这是2PC的核心价值。它能确保Metadata和Audit两个操作作为一个原子单元被提交,完美契合我们的合规需求。
  2. 协议简单: 相较于Paxos或Raft,2PC的逻辑更直观,在参与者数量可控的情况下,自行实现是可行的。

劣势分析:

  1. 同步阻塞: 在第一阶段(Prepare)结束到第二阶段(Commit/Rollback)开始之间,所有参与者持有的资源(如数据库行锁)都会被锁定。如果协调者宕机,这些资源将永久锁定,需要人工干预。
  2. 协调者单点: 协调者是整个系统的单点故障。
  3. 性能开销: 两次网络往返通信带来了额外的延迟。

这里的关键权衡在于,我们是否愿意为了获得绝对的原子性,而去承担2PC带来的同步阻塞和单点风险。对于我们这个场景——核心数据写入操作,答案是肯定的。但前提是,必须将耗时且不可靠的AI处理剥离出事务边界。

最终决策:2PC + 异步消息的混合模型

我们的最终架构选择了一个混合模型:

  1. 使用2PC来保证MetadataServiceAuditService这两个核心服务的强一致性。
  2. 在2PC事务成功提交后,由协调者(Coordinator)向消息队列发送一个事件。
  3. 一个独立的AI Worker消费此事件,异步执行LlamaIndex的文档处理任务。

这个模型精确地将不同的一致性需求映射到了合适的技术方案上,既保证了核心数据的完整性,又避免了长事务对系统可用性的破坏。

核心实现概览

我们将使用PHP来实现这套体系。服务间通信采用HTTP RESTful API。OAuth 2.0的客户端凭证(Client Credentials)模式用于服务间的安全认证。

1. OAuth 2.0 服务间认证

在发起事务前,事务协调者(或其上游服务)必须先从认证服务器获取一个Access Token。

AuthClient.php (获取Token的客户端)

<?php
declare(strict_types=1);

namespace App\Clients;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Psr\Log\LoggerInterface;

class AuthClient
{
    private Client $httpClient;
    private string $tokenUrl;
    private string $clientId;
    private string $clientSecret;
    private LoggerInterface $logger;

    public function __construct(
        string $tokenUrl,
        string $clientId,
        string $clientSecret,
        LoggerInterface $logger
    ) {
        $this->httpClient = new Client(['timeout' => 5.0]);
        $this->tokenUrl = $tokenUrl;
        $this->clientId = $clientId;
        $this->clientSecret = $clientSecret;
        $this->logger = $logger;
    }

    public function getServiceToken(): ?string
    {
        try {
            $response = $this->httpClient->post($this->tokenUrl, [
                'form_params' => [
                    'grant_type' => 'client_credentials',
                    'client_id' => $this->clientId,
                    'client_secret' => $this->clientSecret,
                    'scope' => 'transaction:manage', // 精细化权限控制
                ]
            ]);

            if ($response->getStatusCode() !== 200) {
                $this->logger->error('Failed to get auth token', [
                    'status' => $response->getStatusCode(),
                    'body' => $response->getBody()->getContents()
                ]);
                return null;
            }

            $data = json_decode($response->getBody()->getContents(), true);
            return $data['access_token'] ?? null;
        } catch (GuzzleException $e) {
            $this->logger->critical('Auth service request failed', [
                'error' => $e->getMessage()
            ]);
            return null;
        }
    }
}

2. 两阶段提交协调者 (Transaction Coordinator)

这是整个方案的大脑。它负责生成全局事务ID,并按顺序调用所有参与者的preparecommit/rollback接口。

TransactionCoordinatorService.php

<?php
declare(strict_types=1);

namespace App\Service;

use GuzzleHttp\Client;
use GuzzleHttp\Promise;
use Psr\Http\Message\ResponseInterface;
use Psr\Log\LoggerInterface;
use Ramsey\Uuid\Uuid;

class TransactionCoordinatorService
{
    private array $participants; // e.g., ['http://metadata-service', 'http://audit-service']
    private Client $httpClient;
    private LoggerInterface $logger;
    private MessageQueueClient $mqClient;
    private string $accessToken;

    public function __construct(
        array $participantEndpoints,
        MessageQueueClient $mqClient,
        LoggerInterface $logger,
        string $accessToken
    ) {
        $this->participants = $participantEndpoints;
        $this->mqClient = $mqClient;
        $this->logger = $logger;
        $this->accessToken = $accessToken;
        
        // 生产环境中,超时配置至关重要,防止无限期阻塞
        $this->httpClient = new Client([
            'timeout' => 10.0, // 总超时
            'connect_timeout' => 3.0, // 连接超时
        ]);
    }

    public function executeTransaction(array $payload): bool
    {
        $transactionId = Uuid::uuid4()->toString();
        $this->logger->info("Starting 2PC transaction", ['tx_id' => $transactionId]);

        // --- Phase 1: Prepare ---
        $prepareSuccess = $this->broadcastPrepare($transactionId, $payload);
        
        if (!$prepareSuccess) {
            $this->logger->warning("Prepare phase failed. Broadcasting rollback.", ['tx_id' => $transactionId]);
            $this->broadcastRollback($transactionId);
            return false;
        }

        // --- Phase 2: Commit ---
        $commitSuccess = $this->broadcastCommit($transactionId);

        if (!$commitSuccess) {
            $this->logger->critical("Commit phase failed! Manual intervention required!", [
                'tx_id' => $transactionId,
                'participants' => $this->participants
            ]);
            // 这是一个危险状态,部分节点可能已提交。
            // 真实项目中需要复杂的恢复机制。
            return false;
        }

        $this->logger->info("2PC transaction committed successfully.", ['tx_id' => $transactionId]);

        // --- Post-Commit: Trigger Asynchronous Task ---
        $this->triggerAsyncProcessing($payload);

        return true;
    }

    private function broadcastPrepare(string $txId, array $payload): bool
    {
        $promises = [];
        foreach ($this->participants as $endpoint) {
            $promises[$endpoint] = $this->httpClient->postAsync("{$endpoint}/prepare", [
                'headers' => $this->getAuthHeaders(),
                'json' => ['tx_id' => $txId, 'data' => $payload]
            ]);
        }
        
        try {
            // 并行发送所有prepare请求
            $responses = Promise\Utils::settle($promises)->wait();
            
            foreach ($responses as $endpoint => $result) {
                if ($result['state'] === 'rejected' || $result['value']->getStatusCode() !== 200) {
                    $this->logger->error("Participant failed to prepare.", [
                        'tx_id' => $txId,
                        'endpoint' => $endpoint,
                        'reason' => $result['reason'] ?? $result['value']->getReasonPhrase()
                    ]);
                    return false;
                }
            }
            return true;
        } catch (\Throwable $e) {
            $this->logger->error("Error during prepare phase.", ['tx_id' => $txId, 'error' => $e->getMessage()]);
            return false;
        }
    }

    private function broadcastCommit(string $txId): bool
    {
        // 提交阶段必须保证所有参与者都收到指令,即使有网络错误也要重试
        // 生产级实现会使用持久化队列或状态机来确保指令的最终送达
        $allCommitted = true;
        foreach ($this->participants as $endpoint) {
            try {
                $response = $this->httpClient->post("{$endpoint}/commit", [
                    'headers' => $this->getAuthHeaders(),
                    'json' => ['tx_id' => $txId]
                ]);
                if ($response->getStatusCode() !== 200) {
                     $this->logger->error("Participant failed to commit.", ['tx_id' => $txId, 'endpoint' => $endpoint]);
                     $allCommitted = false; // 记录失败,但继续通知其他节点
                }
            } catch (\Exception $e) {
                 $this->logger->error("Error committing participant.", ['tx_id' => $txId, 'endpoint' => $endpoint, 'error' => $e->getMessage()]);
                 $allCommitted = false;
            }
        }
        return $allCommitted;
    }

    private function broadcastRollback(string $txId): void
    {
        // 回滚也需要尽力而为地通知所有参与者
        foreach ($this->participants as $endpoint) {
             try {
                $this->httpClient->post("{$endpoint}/rollback", [
                    'headers' => $this->getAuthHeaders(),
                    'json' => ['tx_id' => $txId]
                ]);
            } catch (\Exception $e) {
                 $this->logger->error("Error rolling back participant.", ['tx_id' => $txId, 'endpoint' => $endpoint, 'error' => $e->getMessage()]);
            }
        }
    }
    
    private function triggerAsyncProcessing(array $payload): void
    {
        $jobPayload = [
            'document_id' => $payload['id'],
            's3_path' => $payload['path'],
        ];
        $this->mqClient->enqueue('llama_index_queue', $jobPayload);
        $this->logger->info("Enqueued job for async AI processing.", ['document_id' => $payload['id']]);
    }

    private function getAuthHeaders(): array
    {
        return [
            'Authorization' => 'Bearer ' . $this->accessToken,
            'Content-Type' => 'application/json',
            'Accept' => 'application/json',
        ];
    }
}

3. 事务参与者 (Transaction Participant)

每个参与者(MetadataService, AuditService)都需要实现三个接口:/prepare, /commit, /rollback。这里以MetadataService为例,它使用PDO来管理本地数据库事务。

ParticipantController.php (在一个简单的路由框架下)

<?php
declare(strict_types=1);

namespace App\Controller;

use Psr\Http\Message\ServerRequestInterface as Request;
use Psr\Http\Message\ResponseInterface as Response;
use PDO;

class ParticipantController
{
    private PDO $pdo;
    private array $preparedTransactions = []; // 内存中存储预备事务,真实项目需要持久化

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }

    public function prepare(Request $request, Response $response): Response
    {
        $body = json_decode((string)$request->getBody(), true);
        $txId = $body['tx_id'] ?? null;
        $data = $body['data'] ?? null;

        if (!$txId || !$data) {
            return $response->withStatus(400);
        }

        try {
            // 关键:开启本地事务,但暂不提交
            $this->pdo->beginTransaction();
            
            // 执行业务逻辑
            $stmt = $this->pdo->prepare(
                "INSERT INTO documents (id, name, path, created_at) VALUES (?, ?, ?, NOW())"
            );
            $stmt->execute([$data['id'], $data['name'], $data['path']]);
            
            // 记录此事务已准备好,并与本地PDO事务关联
            // 在真实项目中,这里应该将 txId 和连接/会话信息持久化到
            // 一个事务日志表中,以便服务重启后能恢复状态。
            $this->preparedTransactions[$txId] = true;
            
            // 返回成功,代表投票 "YES"
            return $response->withStatus(200);

        } catch (\Exception $e) {
            // 如果准备阶段出错,立即回滚本地事务
            if ($this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }
            unset($this->preparedTransactions[$txId]);
            
            // 返回失败,代表投票 "NO"
            return $response->withStatus(500);
        }
    }

    public function commit(Request $request, Response $response): Response
    {
        $body = json_decode((string)$request->getBody(), true);
        $txId = $body['tx_id'] ?? null;

        if (!isset($this->preparedTransactions[$txId])) {
            // 可能因为网络问题重复收到commit指令,如果已处理则幂等返回成功
            // 或者是一个未知的事务ID,返回错误
            return $response->withStatus(404);
        }

        try {
            // 提交之前保持的本地事务
            $this->pdo->commit();
            unset($this->preparedTransactions[$txId]);
            return $response->withStatus(200);
        } catch (\Exception $e) {
            // Commit失败是灾难性的,本地数据可能已损坏
            // 需要告警并人工介入
            // Log a critical error
            return $response->withStatus(500);
        }
    }
    
    public function rollback(Request $request, Response $response): Response
    {
        $body = json_decode((string)$request->getBody(), true);
        $txId = $body['tx_id'] ?? null;

        if (!isset($this->preparedTransactions[$txId])) {
            return $response->withStatus(404);
        }
        
        try {
            $this->pdo->rollBack();
            unset($this->preparedTransactions[$txId]);
            return $response->withStatus(200);
        } catch (\Exception $e) {
            // Log error
            return $response->withStatus(500);
        }
    }
}

单元测试思路:

  • Coordinator: 模拟参与者返回不同状态码(200, 500)和网络异常,断言Coordinator是否能正确广播commitrollback
  • Participant: 模拟数据库操作成功/失败,检查/prepare接口是否能正确开启事务并投票。测试/commit/rollback接口的幂等性。

4. 异步AI Worker

这个Worker是一个独立的长时间运行的进程,它监听消息队列。这里我们只展示概念代码,实际项目中它可能是一个用Python编写的、充分利用LlamaIndex能力的服务。

LlamaIndexWorker.php (消费者)

<?php
declare(strict_types=1);

namespace App\Worker;

class LlamaIndexWorker 
{
    private MessageQueueClient $mqClient;
    private LoggerInterface $logger;

    public function run()
    {
        while (true) {
            $job = $this->mqClient->dequeue('llama_index_queue');
            if ($job) {
                try {
                    $this->processDocument($job);
                } catch (\Exception $e) {
                    $this->logger->error("Failed to process document for LlamaIndex", [
                        'job' => $job,
                        'error' => $e->getMessage()
                    ]);
                    // 将任务移入死信队列或执行重试策略
                }
            }
            sleep(1);
        }
    }
    
    private function processDocument(array $jobData): void
    {
        $documentId = $jobData['document_id'];
        $s3Path = $jobData['s3_path'];
        $this->logger->info("Starting LlamaIndex processing", ['document_id' => $documentId]);
        
        // 1. 从S3下载文档
        $localPath = $this->downloadFromS3($s3Path);

        // 2. 调用Python脚本执行LlamaIndex核心逻辑
        // 生产环境应使用更健壮的IPC或RPC机制
        $command = sprintf(
            "python3 /app/scripts/llama_indexer.py --path %s --id %s",
            escapeshellarg($localPath),
            escapeshellarg($documentId)
        );
        
        exec($command, $output, $returnCode);

        if ($returnCode !== 0) {
            throw new \RuntimeException("LlamaIndex script failed: " . implode("\n", $output));
        }

        $this->logger->info("LlamaIndex processing finished", ['document_id' => $documentId]);
        
        // 3. 清理本地文件
        unlink($localPath);
    }
    // ... downloadFromS3 implementation ...
}

架构的局限性与未来路径

这套基于PHP自实现的2PC混合架构,虽然解决了眼前的强一致性与异步处理的矛盾,但它并非银弹。我们必须清醒地认识到其固有的局限性。

首先,协调者(TC)的实现是简化的。在生产环境中,TC必须是高可用的,并且需要持久化事务状态,以便在自身崩溃重启后能恢复事务,决定是继续提交还是回滚。目前的实现缺少这一点,协调者的崩溃可能导致参与者资源被永久锁定。

其次,2PC协议本身的阻塞特性决定了它不适用于高并发、低延迟的场景。当参与事务的服务增多,或者网络延迟变大时,整个系统的性能会急剧下降。它的适用范围被严格限定在参与者数量少、内部网络环境可靠、且对数据原子性有极高要求的关键业务流上。

最后,从事务成功到AI处理完成之间的数据延迟是客观存在的。业务方必须理解并接受这种最终一致性,相关的监控、告警和失败重试机制(如死信队列)是异步系统不可或-少-的配套设施。

对于未来的演进,如果系统的复杂度和对可用性的要求进一步提高,我们可能需要考虑放弃自研2PC,转而采用经过生产环境检验的、更成熟的分布式事务解决方案,例如引入Seata这样的分布式事务中间件,或者在架构层面转向基于Raft/Paxos协议的共识系统,但这通常也意味着需要引入更复杂的技术栈(如Java生态)。


  目录