团队的算法工程师扔给我一个 Git 仓库地址和一个 model.pkl 文件,说:“模型更新了,帮忙上线吧。”我问他这次更新的效果怎么样,他说:“效果嘎嘎好,在我笔记本上准确率提高了5个点。”我再问他,这次用的训练数据是哪一批?特征工程的逻辑改了什么?有没有做过回归测试?他开始支支吾吾。
这就是我们团队在引入 MLOps 之前的日常。模型迭代过程严重依赖于个人经验和本地环境,整个流程就像一个黑盒。没有版本控制,没有自动化测试,更没有可复现的构建过程。任何一次模型上线都像是一次赌博,我们祈祷它不会在线上搞出什么幺蛾子。这种混乱的状态必须终结。
我的初步构想是,必须将软件工程中成熟的 CI/CD 理念引入到机器学习流程中。每一次模型迭代,都应该像一次代码提交一样,被严格地、自动化地对待。我们需要一个流水线,它能做到:
- 代码驱动: 无论是训练脚本、模型定义还是特征工程的变更,都必须通过 Git 提交来触发。
- 环境隔离: 每次流水线运行都应该在干净、一致的环境中执行,杜绝“在我本地是好的”这类问题。
- 自动化验证: 新模型必须自动与线上基线模型进行性能比较,只有表现更优的模型才能进入下一阶段。
- 不可变制品: 通过验证的模型将被打包成一个带版本标签的、不可变的容器镜像,作为唯一的部署单元。
- 过程可追溯: 整个流程的每一步,从数据版本到代码提交哈希,再到最终生成的模型制品,都必须有清晰的记录。
在技术选型上,CI/CD 工具链是核心。Jenkins 过于笨重,其插件生态系统在长期维护中是个不小的负担。GitLab CI 虽然与 Git 集成良好,但其 Runner 的管理和 Kubernetes 的结合并不如我预期的那样原生。
最终,我将目光锁定在 Tekton。它是一个纯粹的 Kubernetes 原生流水线引擎。它的核心优势在于:
- 声明式: 流水线、任务、步骤都通过 YAML 以 CRD (Custom Resource Definitions) 的形式定义,天然契合 GitOps。
- 容器化: 流水线的每一个步骤(Step)都在一个独立的容器中运行。这意味着我可以为数据预处理、模型训练、模型打包等不同阶段指定最合适的镜像,彻底解决了环境和依赖的隔离问题。
- 无中心化: Tekton 没有一个中心化的 Master 节点,所有的执行都由 K8s 的控制器和 Pod 来完成,这让它更轻量、更具弹性。
模型训练部分,团队普遍使用 Python 技术栈,Scikit-learn 是最常用的库之一。它的 API 稳定,生态成熟,非常适合作为我们 MLOps 体系标准化的第一个试点对象。
所以,我们的目标明确了:基于 Tekton 和 Scikit-learn,搭建一套自动化的模型训练、验证和打包流水线。
步骤化实现:从零构建 MLOps 流水线
整个流水线需要一个共享的存储卷,用于在不同的任务(Task)之间传递数据,比如训练好的模型文件、评估指标等。在 Kubernetes 中,PersistentVolumeClaim (PVC) 是实现这一目标的标准方式。
# shared-workspace-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: mlops-shared-workspace
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
这个 PVC 将作为我们所有 Tekton Task 的共享工作区。
Task 1: 拉取代码与数据
这个任务很简单,就是从 Git 仓库中克隆我们的项目代码。Tekton 社区提供了现成的 git-clone ClusterTask,我们直接使用即可。项目结构如下:
.
├── src/
│ ├── train.py # 模型训练脚本
│ └── serve.py # 用于打包进镜像的API服务脚本
├── data/
│ └── iris.csv # 训练数据
├── Dockerfile # 用于打包模型服务的Dockerfile
└── tekton/
├── ... # Tekton YAML 文件
Task 2: 模型训练与初步评估 (train-model)
这是流水线的第一个核心任务。它负责执行 train.py 脚本,产出两个关键文件并存入共享工作区:
-
model.pkl: 训练好的模型文件。 -
metrics.json: 包含模型性能指标(如准确率)的 JSON 文件。
这个任务的实现需要自定义一个 Task CRD。
# train-model-task.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: train-model
spec:
workspaces:
- name: source
description: The workspace containing the source code and output artifacts.
params:
- name: model-name
description: "Name of the model file to be generated."
default: "model.pkl"
- name: metrics-name
description: "Name of the metrics file to be generated."
default: "metrics.json"
steps:
- name: train
# 在真实项目中,这里应该是一个预构建好的、包含所有依赖的镜像
# 为了演示方便,我们直接使用一个带 scikit-learn 的通用镜像,并临时安装依赖
image: python:3.9-slim
workingDir: $(workspaces.source.path)
script: |
#!/usr/bin/env bash
set -e # Exit immediately if a command exits with a non-zero status.
echo "-------------------------------------------"
echo "Installing dependencies..."
echo "-------------------------------------------"
pip install scikit-learn==1.2.2 pandas==1.5.3
echo "-------------------------------------------"
echo "Running training script..."
echo "-------------------------------------------"
python src/train.py \
--data-path ./data/iris.csv \
--model-path ./$(params.model-name) \
--metrics-path ./$(params.metrics-name)
echo "-------------------------------------------"
echo "Training finished. Artifacts produced:"
echo "-------------------------------------------"
ls -l $(params.model-name) $(params.metrics-name)
对应的 train.py 脚本必须是生产级的,包含参数解析、日志和错误处理。
# src/train.py
import argparse
import json
import logging
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import joblib
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def train(data_path: str, model_path: str, metrics_path: str):
"""
模型训练、评估和保存函数
"""
try:
logging.info(f"Loading data from {data_path}...")
df = pd.read_csv(data_path)
# 简单的特征和标签分离
X = df[['sepal.length', 'sepal.width', 'petal.length', 'petal.width']]
y = df['variety']
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
logging.info("Data successfully split into training and testing sets.")
# 模型训练
logging.info("Training Logistic Regression model...")
model = LogisticRegression(max_iter=200)
model.fit(X_train, y_train)
logging.info("Model training completed.")
# 模型评估
logging.info("Evaluating model...")
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
logging.info(f"Model accuracy on test set: {accuracy:.4f}")
# 保存模型
logging.info(f"Saving model to {model_path}...")
joblib.dump(model, model_path)
# 保存评估指标
logging.info(f"Saving metrics to {metrics_path}...")
metrics = {
"accuracy": accuracy
}
with open(metrics_path, 'w') as f:
json.dump(metrics, f, indent=4)
logging.info("Artifacts saved successfully.")
except FileNotFoundError:
logging.error(f"Error: Data file not found at {data_path}")
raise
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
raise
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Train a scikit-learn model.")
parser.add_argument("--data-path", type=str, required=True, help="Path to the training data CSV file.")
parser.add_argument("--model-path", type=str, required=True, help="Path to save the trained model.")
parser.add_argument("--metrics-path", type=str, required=True, help="Path to save the evaluation metrics.")
args = parser.parse_args()
train(args.data_path, args.model_path, args.metrics_path)
Task 3: 模型验证 (validate-model)
这是整个流水线的“质量门”。它的职责是比较新训练模型的性能和当前生产环境中的基线模型性能。只有当新模型的性能优于(或等于)基线时,流水线才能继续。
这里的坑在于,如何获取“基线性能”?在真实项目中,这通常会从一个模型注册表(Model Registry)如 MLflow 中获取。为了简化,我们这里模拟一个简单的模型注册表:用一个 Kubernetes ConfigMap 来存储当前生产模型的指标。
# model-baseline-cm.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: model-baseline-registry
data:
# 假设当前生产模型的准确率是 0.95
accuracy: "0.95"
现在,我们的 validate-model 任务需要读取这个 ConfigMap,并与上一步生成的 metrics.json 文件进行比较。
# validate-model-task.yaml
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: validate-model
spec:
workspaces:
- name: source
description: The workspace containing the metrics file.
params:
- name: metrics-name
description: "Name of the metrics file to be validated."
default: "metrics.json"
- name: baseline-accuracy-cm
description: "ConfigMap name for baseline accuracy."
default: "model-baseline-registry"
- name: baseline-accuracy-key
description: "ConfigMap key for baseline accuracy."
default: "accuracy"
steps:
- name: validate
# 我们需要一个安装了 jq (用于解析json) 和 bc (用于浮点数比较) 的镜像
image: alpine/k8s:1.24.12
workingDir: $(workspaces.source.path)
script: |
#!/usr/bin/env sh
set -e
echo "-------------------------------------------"
echo "Starting model validation..."
echo "-------------------------------------------"
METRICS_FILE="$(params.metrics-name)"
if [ ! -f "$METRICS_FILE" ]; then
echo "Error: Metrics file '$METRICS_FILE' not found!"
exit 1
fi
# 从文件中提取新模型的准确率
NEW_ACCURACY=$(jq -r '.accuracy' "$METRICS_FILE")
echo "New model accuracy: $NEW_ACCURACY"
# 从 ConfigMap 中获取基线模型的准确率
# 这里的 kubectl 命令能成功是因为我们使用的镜像 alpine/k8s 内置了它
# 并且 Tekton 会自动为 Pod 注入 ServiceAccount,使其拥有访问 K8s API 的权限
BASELINE_ACCURACY=$(kubectl get configmap $(params.baseline-accuracy-cm) -o jsonpath="{.data['$(params.baseline-accuracy-key)']}")
echo "Baseline model accuracy: $BASELINE_ACCURACY"
if [ -z "$BASELINE_ACCURACY" ]; then
echo "Warning: Baseline accuracy not found. Approving model by default."
exit 0
fi
# 使用 bc 进行浮点数比较
# 如果 new_accuracy >= baseline_accuracy,则返回 1,否则返回 0
COMPARISON_RESULT=$(echo "$NEW_ACCURACY >= $BASELINE_ACCURACY" | bc -l)
if [ "$COMPARISON_RESULT" -eq 1 ]; then
echo "Validation PASSED: New model performance is better or equal to the baseline."
exit 0
else
echo "Validation FAILED: New model performance ($NEW_ACCURACY) is worse than the baseline ($BASELINE_ACCURACY)."
exit 1
fi
这个任务的设计是关键。如果比较失败,exit 1 会导致整个 TaskRun 失败,进而使整个 PipelineRun 失败,有效地阻止了劣质模型进入生产环境。
Task 4: 构建并推送模型服务镜像 (build-and-push-image)
模型通过验证后,我们需要将其打包成一个可以独立部署的服务。最常见的方式是构建一个包含模型文件和 API Server(如 Flask 或 FastAPI)的 Docker 镜像。
Tekton 社区提供了 Kaniko 这个优秀的 ClusterTask,它可以在非特权容器内、无需 Docker-in-Docker 的方式构建镜像,这在多租户的 K8s 环境中是最佳实践。
我们需要准备一个简单的 Dockerfile 和一个 API 服务脚本 serve.py。
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/serve.py .
COPY model.pkl .
# 在真实项目中,这里应使用非 root 用户
# USER nonroot:nonroot
EXPOSE 8000
CMD ["uvicorn", "serve:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt 包含 fastapi, uvicorn, scikit-learn, joblib, pandas 等。
# src/serve.py
import joblib
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
app = FastAPI()
# 加载模型
try:
model = joblib.load('model.pkl')
logging.info("Model loaded successfully.")
except FileNotFoundError:
logging.error("Model file 'model.pkl' not found.")
model = None
class IrisFeatures(BaseModel):
sepal_length: float
sepal_width: float
petal_length: float
petal_width: float
@app.on_event("startup")
async def startup_event():
if model is None:
logging.error("Application starting without a loaded model. Predictions will fail.")
@app.get("/health")
def health_check():
return {"status": "ok" if model is not None else "error", "model_loaded": model is not None}
@app.post("/predict")
def predict(features: IrisFeatures):
if model is None:
return {"error": "Model is not loaded, cannot make predictions."}
try:
data = pd.DataFrame([[
features.sepal_length,
features.sepal_width,
features.petal_length,
features.petal_width
]], columns=['sepal.length', 'sepal.width', 'petal.length', 'petal.width'])
prediction = model.predict(data)
prediction_proba = model.predict_proba(data)
return {
"prediction": prediction[0],
"probability": prediction_proba.max()
}
except Exception as e:
logging.error(f"Error during prediction: {e}")
return {"error": "An error occurred during prediction."}
有了这些,我们就可以在流水线中调用 Kaniko 任务了。
将所有任务串联成一个流水线 (Pipeline)
现在,我们把上面定义的各个 Task 像乐高积木一样组合起来,形成一个完整的 Pipeline。
graph TD
A[Git Clone] --> B(Train Model);
B --> C{Validate Model};
C -- Succeeded --> D[Build & Push Image];
C -- Failed --> E[Stop];
# model-training-pipeline.yaml
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: sklearn-train-validate-build-pipeline
spec:
workspaces:
- name: shared-data
params:
- name: repo-url
type: string
description: The git repository URL.
- name: repo-revision
type: string
description: The git revision (branch, tag, or commit sha).
default: "main"
- name: image-url
type: string
description: The URL of the image to build and push.
tasks:
- name: fetch-source
taskRef:
name: git-clone
kind: ClusterTask
workspaces:
- name: output
workspace: shared-data
params:
- name: url
value: $(params.repo-url)
- name: revision
value: $(params.repo-revision)
- name: train-iris-model
taskRef:
name: train-model
runAfter:
- fetch-source
workspaces:
- name: source
workspace: shared-data
params:
- name: model-name
value: "model.pkl"
- name: metrics-name
value: "metrics.json"
- name: validate-iris-model
taskRef:
name: validate-model
runAfter:
- train-iris-model
workspaces:
- name: source
workspace: shared-data
params:
- name: metrics-name
value: "metrics.json"
- name: build-model-image
taskRef:
name: kaniko
kind: ClusterTask
runAfter:
- validate-iris-model
workspaces:
- name: source
workspace: shared-data
params:
- name: IMAGE
value: $(params.image-url):$(tasks.fetch-source.results.commit)
# 注意:Dockerfile 的路径是相对于工作空间的根目录
- name: DOCKERFILE
value: ./Dockerfile
# context 也是相对于工作空间的根目录
- name: CONTEXT
value: ./
这个 Pipeline 定义了任务的执行顺序 (runAfter) 和工作区、参数的传递。注意 image-url 的 Tag,我们巧妙地使用了 git-clone 任务输出的 commit 哈希作为镜像标签,这就实现了制品与代码版本的精确对应,可追溯性大大增强。
执行与最终成果
最后,我们通过创建一个 PipelineRun 实例来触发整个流水线。
# pipeline-run.yaml
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
generateName: sklearn-pipeline-run-
spec:
pipelineRef:
name: sklearn-train-validate-build-pipeline
workspaces:
- name: shared-data
persistentVolumeClaim:
claimName: mlops-shared-workspace
params:
- name: repo-url
value: "https://github.com/your-repo/your-ml-project.git" # 替换成你的仓库地址
- name: repo-revision
value: "main"
- name: image-url
value: "your-docker-registry/your-image-name" # 替换成你的镜像仓库地址
当这个 PipelineRun 被 kubectl apply 到集群后,Tekton Controller 会立即开始工作。我们可以通过 tkn pipelinerun logs <pr-name> -f 命令实时观察流水线的执行过程。如果一切顺利,一个新的、带有 Git Commit ID 标签的模型服务镜像就会被推送到我们的镜像仓库中。如果模型性能下降,流水线会在验证步骤就自动失败,并给出明确的错误信息。
我们成功地将一个混乱、手动的模型更新流程,改造成了一个自动、可靠、可追溯的 MLOps 管道。
遗留问题与未来迭代
当前这套方案只是一个起点,它解决了最核心的自动化和验证问题,但在生产环境中,还有很多可以改进的地方:
- 部署自动化: 目前流水线只到构建镜像为止,部署是缺失的环节。可以增加一个任务,使用
kubectl、kustomize或者与 ArgoCD 等 GitOps 工具联动,实现通过验证的镜像自动部署到预发或生产环境。 - 模型注册表: 使用
ConfigMap存储基线指标过于简陋。引入 MLflow 或类似工具,可以系统地管理模型的版本、参数、指标和生命周期。流水线的验证任务应该从 MLflow API 获取基线信息,并在成功后将新模型注册回去。 - 触发器 (Triggers): 当前
PipelineRun是手动创建的。在真实场景中,我们应该使用 Tekton Triggers 来创建一个事件监听器,当 Git 仓库有新的提交时,自动创建PipelineRun,实现真正的 CI/CD。 - 更复杂的验证: 仅比较准确率是不够的。验证步骤可以扩展为执行一个更全面的测试套件,比如检查模型预测的延迟、在不同数据切片上的性能表现、模型的公平性指标等。
- 特征存储 (Feature Store): 我们的数据源还是一个简单的 CSV 文件。对于大型团队,建立一个统一的特征存储是必要的,它可以确保训练和推理时使用一致的特征,避免线上线下不一致的问题。
尽管存在这些可迭代的点,但这套基于 Tekton 的 MLOps 流水线已经为我们团队的机器学习实践带来了质的飞跃。它为我们提供了一个坚实的、可扩展的自动化框架,让算法工程师可以更专注于模型和算法本身,而不是繁琐且易错的工程细节。