构建基于 Tekton 的状态化模型晋级流水线


一个常见的工程误区是将 MLOps 流水线等同于常规的 CI/CD。模型训练与部署的生命周期远比构建一个二进制包复杂,其核心区别在于对“状态”的依赖。一个无状态的流水线只能执行重复性任务,而一个生产级的 MLOps 系统必须能够回答:当前生产环境的模型版本是什么?上一个候选模型的评估指标如何?这次训练结果是否满足自动晋级的阈值?这些问题都指向了一个核心挑战:如何为本质上无状态的 Tekton 流水线引入可靠的状态管理机制。

单纯使用 Tekton PipelineRunresults 传递数据在简单场景下尚可,但它无法跨 PipelineRun 实例持久化状态。每次运行都是一次孤立的事件。在模型迭代中,我们需要一个单一事实来源(Single Source of Truth)来记录每个模型的谱系、性能和部署状态。

方案权衡:流水线内状态 vs. 外部持久化状态

方案 A: Tekton 原生机制(Results 与 Artifacts)

这种方法尝试在 Tekton 的框架内解决状态传递。例如,评估任务(Task)可以将模型的准确率、F1 分数等指标作为 results 输出,后续的任务可以消费这些 results 来做决策。

  • 优势:

    • 实现简单,无需引入外部依赖。
    • 对于单次运行内的条件判断(例如,评估分数 > 0.9 才执行部署)足够。
  • 劣势:

    • 状态不持久: results 的生命周期与 PipelineRun 绑定,一旦 PipelineRun 被清理,历史信息就丢失了。无法进行长期的模型性能追踪。
    • 数据结构限制: results 通常是简单的键值对字符串,不适合存储复杂的结构化数据,如每个类别的详细指标、模型参数等。
    • 并发问题: 如果多个流水线并发运行,无法协调和获得一个全局一致的状态视图。

方案 B: 引入外部状态存储

此方案承认 Tekton 的无状态特性,并将其与一个专门的状态管理器解耦。这个管理器可以是简单的文件(存储在 PVC 上)、一个关系型数据库,甚至是专用的元数据存储(如 MLflow Tracking)。流水线中的任务通过标准接口(如 API 调用或文件读写)与该存储交互。

  • 优势:

    • 持久化与审计: 所有模型的元数据、指标和部署历史都被永久记录,可供审计和回溯。
    • 全局视图: 提供了一个跨所有流水线运行的单一事实来源,实现了真正的模型注册表(Model Registry)功能。
    • 解耦与扩展: 流水线专注于编排,状态管理逻辑则封装在外部系统中。未来可以轻易替换状态存储(从 JSON 文件升级到数据库)而无需大规模修改流水线定义。
  • 劣劣势:

    • 增加复杂性: 需要部署和维护一个额外的组件。
    • 访问控制: 需要为流水线任务配置访问外部存储的凭证和权限。

对于任何严肃的 MLOps 实践,方案 B 是唯一可行的路径。本文将实现一个精简版的方案 B,使用一个挂载在持久卷(Persistent Volume Claim, PVC)上的 JSON 文件作为我们的模型注册表和状态管理器,以此来驱动一个包含 Hugging Face 和 Scikit-learn 模型的自动化晋级流水线。

核心架构设计

我们的目标是构建一个能够自动训练、评估,并根据预设规则决定是否将新模型“晋级”到候选发布状态的流水线。

graph TD
    A[Start PipelineRun] --> B{1. train-model};
    B -- model.pkl & tokenizer --> C{2. evaluate-model};
    C -- evaluation_metrics.json --> D{3. promote-model};
    
    subgraph "State Management on PVC"
        E[model_registry.json]
    end

    B -- writes --> E;
    C -- writes --> E;
    D -- reads & writes --> E;

    D -- "if promotion_gate_passed" --> F[Mark as 'Staging'];
    D -- "else" --> G[Mark as 'Rejected'];

    subgraph "Tekton Workspace (PVC)"
        H[shared-workspace]
        H -- contains --> I[model.pkl]
        H -- contains --> J[tokenizer]
        H -- contains --> K[evaluation_metrics.json]
        H -- contains --> E
    end

    B -- uses --> H;
    C -- uses --> H;
    D -- uses --> H;

流水线流程解析:

  1. train-model 任务:

    • 使用 Scikit-learn 和 Hugging Face Transformers 训练一个文本分类模型。
    • 将训练好的模型 (model.pkl) 和分词器 (tokenizer/) 保存到共享工作空间。
    • 生成一个唯一的模型版本号(例如,基于时间戳和 Git commit hash)。
    • model_registry.json 中为这个新版本创建一个条目,状态为 TRAINING_COMPLETED
  2. evaluate-model 任务:

    • 从共享工作空间加载模型和分词器。
    • 在预留的测试数据集上评估模型性能,生成包含准确率、精确率、召回率等指标的 evaluation_metrics.json
    • 将评估结果更新到 model_registry.json 中对应模型版本的条目里。
  3. promote-model 任务:

    • 读取 model_registry.json,获取最新训练模型的评估指标。
    • 与预定义的晋级阈值(例如,accuracy > 0.9)进行比较。
    • 如果满足条件,将该模型版本的状态更新为 STAGING
    • 如果不满足条件,则更新为 REJECTED

这个设计的关键在于,model_registry.json 文件作为所有任务之间以及不同 PipelineRun 之间的通信和状态同步媒介。

项目结构与核心代码实现

为了保证流水线的可复现性,我们将所有脚本和依赖打包到一个自定义的 Docker 镜像中。

1. 项目文件结构

.
├── tekton/
│   ├── 00-pvc.yaml               # 持久卷声明
│   ├── 01-task-train-model.yaml  # 训练任务
│   ├── 02-task-evaluate-model.yaml # 评估任务
│   ├── 03-task-promote-model.yaml  # 晋级任务
│   ├── 04-pipeline.yaml          # 完整的流水线
│   └── 05-pipelinerun.yaml       # 触发一次运行
├── app/
│   ├── Dockerfile                # 用于构建任务镜像
│   ├── requirements.txt          # Python 依赖
│   ├── train.py                  # 训练脚本
│   ├── evaluate.py               # 评估脚本
│   └── promote.py                # 晋级逻辑脚本
└── data/                         # 模拟数据
    ├── train.csv
    └── test.csv

2. Dockerfile 与依赖

app/Dockerfile:

# 使用一个包含 Python 的基础镜像
FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 复制依赖文件并安装
COPY requirements.txt .
# --no-cache-dir 减小镜像体积
RUN pip install --no-cache-dir -r requirements.txt

# 复制所有应用代码
COPY . .

# 定义默认入口点,方便调试
ENTRYPOINT ["python"]

app/requirements.txt:

scikit-learn==1.2.2
transformers==4.28.1
torch==2.0.0
pandas==1.5.3

3. 状态管理核心:模型注册表 model_registry.json

这是一个示例文件,流水线会动态地创建和更新它。

{
  "models": {
    "20231027103000-a1b2c3d": {
      "version": "20231027103000-a1b2c3d",
      "status": "TRAINING_COMPLETED",
      "timestamp": "2023-10-27T10:30:00Z",
      "artifacts": {
        "model_path": "models/20231027103000-a1b2c3d/model.pkl",
        "tokenizer_path": "models/20231027103000-a1b2c3d/tokenizer"
      },
      "metrics": null
    },
    "20231027110000-f4e5d6c": {
      "version": "20231027110000-f4e5d6c",
      "status": "STAGING",
      "timestamp": "2023-10-27T11:00:00Z",
      "artifacts": {
        "model_path": "models/20231027110000-f4e5d6c/model.pkl",
        "tokenizer_path": "models/20231027110000-f4e5d6c/tokenizer"
      },
      "metrics": {
        "accuracy": 0.92,
        "precision": 0.91,
        "recall": 0.93
      }
    }
  },
  "latest_version": "20231027110000-f4e5d6c"
}

这个结构包含了模型列表和指向最新版本的指针,是状态管理的核心。

4. Python 脚本 (app/*.py)

这些脚本是流水线任务的实际执行者。它们被设计为从命令行接收参数,并与共享工作空间中的文件系统进行交互。

app/train.py (核心片段)

import os
import json
import time
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from transformers import BertTokenizer
import joblib
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def update_registry(registry_path, model_version, model_path, tokenizer_path):
    """
    更新模型注册表,添加新训练的模型信息
    这是一个关键的状态管理操作,需要处理文件锁以避免并发写入问题
    在生产环境中,应使用更健壮的机制,例如数据库事务
    """
    # 在简单场景下,我们假设流水线串行执行,暂不实现文件锁
    if not os.path.exists(registry_path):
        registry = {"models": {}, "latest_version": None}
    else:
        with open(registry_path, 'r') as f:
            registry = json.load(f)

    registry["models"][model_version] = {
        "version": model_version,
        "status": "TRAINING_COMPLETED",
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "artifacts": {
            "model_path": model_path,
            "tokenizer_path": tokenizer_path,
        },
        "metrics": None
    }
    registry["latest_version"] = model_version

    with open(registry_path, 'w') as f:
        json.dump(registry, f, indent=2)
    logging.info(f"Updated registry for model version: {model_version}")

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--workspace-path", type=str, required=True, help="Path to the shared workspace")
    parser.add_argument("--model-version", type=str, required=True, help="Unique version for this model run")
    args = parser.parse_args()

    # 定义路径
    model_dir = os.path.join(args.workspace_path, "models", args.model_version)
    model_save_path = os.path.join(model_dir, "model.pkl")
    tokenizer_save_path = os.path.join(model_dir, "tokenizer")
    registry_file_path = os.path.join(args.workspace_path, "model_registry.json")

    os.makedirs(model_dir, exist_ok=True)
    os.makedirs(tokenizer_save_path, exist_ok=True)

    # 模拟加载数据
    # 在真实项目中,这里会从数据源(如 S3, GCS)拉取
    data = {'text': ['I love this product', 'This is terrible', 'Hugging Face is great', 'I hate waiting'], 'label': [1, 0, 1, 0]}
    df = pd.DataFrame(data)

    # 使用 Hugging Face Tokenizer (虽然模型是 Sklearn,但可以混合使用生态工具)
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    tokenizer.save_pretrained(tokenizer_save_path)
    logging.info(f"Tokenizer saved to {tokenizer_save_path}")

    # 定义 Scikit-learn 管道
    pipeline = Pipeline([
        ('tfidf', TfidfVectorizer()),
        ('clf', LogisticRegression())
    ])

    # 训练模型
    logging.info("Starting model training...")
    pipeline.fit(df['text'], df['label'])
    logging.info("Model training completed.")

    # 保存模型
    joblib.dump(pipeline, model_save_path)
    logging.info(f"Model saved to {model_save_path}")

    # 更新状态管理器
    update_registry(
        registry_path=registry_file_path,
        model_version=args.model_version,
        model_path=os.path.relpath(model_save_path, args.workspace_path),
        tokenizer_path=os.path.relpath(tokenizer_save_path, args.workspace_path)
    )

if __name__ == "__main__":
    main()

关键点:

  • update_registry 函数是与状态管理器交互的核心。它读取、修改并写回 JSON 文件。
  • 脚本通过 argparse 接收来自 Tekton Task 的参数,如工作空间路径和模型版本,实现了脚本与编排层的解耦。

app/evaluate.pyapp/promote.py 的结构类似,它们同样接收 workspace-pathmodel-version,然后读取 model_registry.json,执行各自的逻辑(评估并更新 metrics,或比较 metrics 并更新 status),最后写回文件。

5. Tekton 资源定义

tekton/00-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mlops-pvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi

tekton/01-task-train-model.yaml

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: train-model
spec:
  description: Trains a model and registers it.
  workspaces:
    - name: source
      description: The shared workspace for artifacts and state.
  params:
    - name: model-version
      type: string
      description: The unique version for this model run.
    - name: image
      type: string
      description: The container image to use for this task.
      default: "your-repo/mlops-runner:latest" # 替换为你的镜像
  steps:
    - name: train
      image: $(params.image)
      script: |
        #!/usr/bin/env bash
        set -e
        python train.py \
          --workspace-path $(workspaces.source.path) \
          --model-version $(params.model-version)

关键点:

  • workspaces 定义了对 PVC 的挂载点,这是实现任务间数据共享和状态持久化的基础。
  • params 允许我们在 PipelineRun 中动态传入配置,如每次运行唯一的 model-version

02-task-evaluate-model.yaml03-task-promote-model.yaml 的结构与此非常相似,只是调用了不同的 Python 脚本。

tekton/04-pipeline.yaml

apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
  name: model-promotion-pipeline
spec:
  workspaces:
    - name: shared-workspace
  params:
    - name: model-version
      type: string
      description: "e.g., $(tasks.git-clone.results.commit)" # 在实际项目中,这通常来自 git
    - name: promotion-threshold
      type: string
      description: "Accuracy threshold for model promotion"
      default: "0.9"

  tasks:
    - name: train
      taskRef:
        name: train-model
      workspaces:
        - name: source
          workspace: shared-workspace
      params:
        - name: model-version
          value: $(params.model-version)

    - name: evaluate
      taskRef:
        name: evaluate-model
      runAfter: [train] # 保证执行顺序
      workspaces:
        - name: source
          workspace: shared-workspace
      params:
        - name: model-version
          value: $(params.model-version)

    - name: promote
      taskRef:
        name: promote-model
      runAfter: [evaluate]
      workspaces:
        - name: source
          workspace: shared-workspace
      params:
        - name: model-version
          value: $(params.model-version)
        - name: threshold
          value: $(params.promotion-threshold)

关键点:

  • runAfter 关键字定义了任务之间的依赖关系,构成了流水线的执行图。
  • 整个 Pipeline 共享同一个 workspace (shared-workspace),确保所有任务都读写同一个 PVC。

tekton/05-pipelinerun.yaml

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
  generateName: model-promotion-run-
spec:
  pipelineRef:
    name: model-promotion-pipeline
  workspaces:
    - name: shared-workspace
      persistentVolumeClaim:
        claimName: mlops-pvc # 绑定到我们创建的PVC
  params:
    - name: model-version
      value: "run-20231027-$(context.pipelineRun.name | slice 20)" # 生成一个动态的版本号
    - name: promotion-threshold
      value: "0.85"

通过 kubectl apply -f tekton/05-pipelinerun.yaml 即可触发一次完整的、状态化的模型训练与晋级流程。

局限性与未来迭代路径

当前基于 PVC 和 JSON 文件的状态管理方案虽然有效地解决了 Tekton 的无状态问题,但在生产环境中也暴露出了一些局限性。

首先,文件锁机制的缺失是最大的风险。当多个 PipelineRun 并发执行时,对 model_registry.json 的并发写操作可能导致数据损坏或状态不一致。一个直接的改进是引入一个简单的文件锁库(如 filelock for Python),在读写操作前后加锁和解锁,但这会引入锁竞争,降低流水线的并行度。

其次,JSON 文件作为状态数据库,在模型版本数量增多后,查询和更新性能会下降。每次更新都需要完整地读写整个文件,效率低下。

一个更健壮的迭代方向是将状态管理迁移到一个真正的数据库系统。使用 SQLite 可以在不引入外部服务依赖的情况下提供事务支持和结构化查询能力。对于更大规模的团队协作,PostgreSQL 或 MySQL 配合一个简单的 Flask/FastAPI 服务来暴露状态管理的 API 接口,会是更理想的架构。这将把状态管理彻底服务化,Tekton 任务仅作为 API 客户端,进一步强化了关注点分离。最终,可以考虑集成 MLflow Tracking Server 这类专用的 MLOps 元数据存储,它不仅解决了状态管理,还提供了模型谱系追踪、实验对比等更丰富的功能。


  目录