使用 Caddy 和 Lit 构建 PyTorch 模型的自动化金丝雀发布工作流


模型上线后的迭代是个棘手问题。直接替换线上正在运行的 v1 模型为 v2 版本,无异于一场赌博。新模型在离线评估中表现再好,也无法保证它在真实生产流量下的性能、延迟或业务指标不会出现衰退。我们需要一个受控的、可观测的、能快速回滚的发布流程。传统的蓝绿部署虽然能实现零停机,但它是全量切换,一旦 v2 有问题,影响面就是100%。金丝雀发布是更优解,它允许我们将一小部分实时流量引导至新版本,在真实环境中验证其表现,然后再逐步扩大流量比例。

问题的核心在于如何优雅地实现动态、可编程的流量切分。在 Kubernetes 环境中,这通常由 Service Mesh (如 Istio, Linkerd) 的 TrafficSplitting 功能完成,但这对于中小型项目或非 K8s 环境而言过于笨重。我们需要一个更轻量、配置简单且具备强大 API 管控能力的反向代理。这正是 Caddy 的用武之地。它的 Admin API 允许我们通过一个简单的 HTTP 请求,在毫秒内原子化地更新整个路由配置,且不会中断任何现有连接。

本次构建的目标是:

  1. 将一个 PyTorch 模型包装成两个独立的 FastAPI 服务:model-v1 (稳定版) 和 model-v2 (金丝雀版)。
  2. 使用 Caddy 作为前端网关,初始时将 100% 流量代理到 model-v1
  3. 通过一个独立的 Python 控制脚本,调用 Caddy Admin API,动态调整流量分配策略,例如将 10% 的流量分配给 model-v2
  4. 构建一个基于 Lit 的极简前端,用于上传图片进行推理,并清晰地展示响应来自哪个版本的模型,从而直观验证金丝雀策略的有效性。

整个架构如下:

graph TD
    subgraph "用户端"
        User[用户浏览器] --> |上传图片| LitApp[Lit Web Component]
    end

    LitApp --> |/api/predict| Caddy[Caddy Server]

    subgraph "后端服务"
        Caddy -- 90% 流量 --> ModelV1[PyTorch + FastAPI v1.0]
        Caddy -- 10% 流量 (金丝雀) --> ModelV2[PyTorch + FastAPI v2.0]
    end

    subgraph "控制平面"
        Admin[运维/CI/CD] --> |HTTP PATCH| ControlScript[deploy.py]
        ControlScript --> |更新JSON配置| CaddyAdminAPI[Caddy Admin API :2019]
    end

第一步:构建可运行的 PyTorch 推理服务

我们需要一个基础的推理服务。这里使用 FastAPI 作为 Web 框架,因为它性能高且易于使用。模型选用预训练的 ResNet-18。为了模拟版本迭代,v2 版本将返回一个稍有不同的标签格式,并明确标识自己是 v2

项目结构:

.
├── model_service
│   ├── main.py
│   ├── Dockerfile
│   └── requirements.txt
├── control
│   └── deploy.py
└── frontend
    └── index.html
    └── lit-app.js
└── docker-compose.yml
└── Caddyfile

model_service/requirements.txt

fastapi==0.104.1
uvicorn[standard]==0.23.2
torch==2.1.0
torchvision==0.16.0
Pillow==10.1.0
python-multipart==0.0.6

model_service/main.py
这个文件是核心的推理服务代码。注意 MODEL_VERSION 环境变量,我们将通过它来区分 v1v2 实例。

import os
import io
import logging
from contextlib import asynccontextmanager

import torch
from torchvision import models, transforms
from PIL import Image
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware

# --- 配置与日志 ---
# 在生产环境中,日志应该输出为 JSON 格式并由采集器收集
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 从环境变量获取模型版本,这是区分v1和v2的关键
MODEL_VERSION = os.getenv("MODEL_VERSION", "1.0.0")
# 模拟v2模型的变化,例如返回更详细的标签
IS_V2 = MODEL_VERSION.startswith("2.")

# --- 模型加载与管理 ---
# 使用一个字典来持有模型,方便未来扩展多个模型
models_registry = {}

# 使用 FastAPI 的 lifespan 事件来管理模型生命周期
# 这确保模型在应用启动时加载,在关闭时清理,而不是在每次请求时加载
@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info(f"Model service v{MODEL_VERSION} starting up...")
    try:
        models_registry['resnet18'] = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        models_registry['resnet18'].eval()
        # 加载ImageNet标签
        with open(os.path.join(os.path.dirname(__file__), "imagenet_classes.txt")) as f:
            models_registry['labels'] = [line.strip() for line in f.readlines()]
        logger.info("ResNet-18 model and labels loaded successfully.")
    except Exception as e:
        logger.error(f"Failed to load model: {e}")
        # 如果模型加载失败,服务不应该启动
        raise RuntimeError("Model loading failed")
    yield
    # 清理资源
    models_registry.clear()
    logger.info("Model service shutting down and resources cleaned up.")

app = FastAPI(lifespan=lifespan)

# --- CORS 中间件 ---
# 允许前端 Lit 应用跨域访问
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # 生产环境应配置为具体的前端域名
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- 图像预处理 ---
# 这是标准的 PyTorch 图像预处理流程
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

def transform_image(image_bytes: bytes) -> torch.Tensor:
    """将上传的图片字节流转换为模型输入张量"""
    try:
        image = Image.open(io.BytesIO(image_bytes))
        # 确保图片是RGB格式
        if image.mode != "RGB":
            image = image.convert("RGB")
        return preprocess(image).unsqueeze(0)
    except Exception as e:
        logger.error(f"Image processing failed: {e}")
        raise ValueError("Invalid image file")

# --- API 端点 ---
@app.get("/version")
async def get_version():
    """一个简单的健康检查/版本检查端点"""
    return {"model_version": MODEL_VERSION}

@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """接收图片,执行推理并返回结果"""
    if 'resnet18' not in models_registry:
        raise HTTPException(status_code=503, detail="Model is not available.")

    try:
        image_bytes = await file.read()
        tensor = transform_image(image_bytes)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to read or process image.")

    with torch.no_grad():
        outputs = models_registry['resnet18'](tensor)
        _, y_hat = outputs.max(1)
        predicted_idx = y_hat.item()
        
        labels = models_registry['labels']
        prediction = labels[predicted_idx]

    # 模拟 v2 模型的返回格式变化
    if IS_V2:
        # v2 返回更丰富的结构
        response_data = {
            "prediction": {
                "label": prediction,
                "index": predicted_idx
            },
            "model_version": MODEL_VERSION,
            "engine": "PyTorch"
        }
    else:
        # v1 返回简单格式
        response_data = {
            "prediction": prediction,
            "model_version": MODEL_VERSION
        }

    return JSONResponse(content=response_data)

# 在真实项目中,这里应该用 gunicorn 等生产级服务器启动,而不是 uvicorn 开发服务器
# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)

(你需要自行下载一份 imagenet_classes.txt 文件并放在 model_service 目录下。)

model_service/Dockerfile
一个标准的多阶段构建 Dockerfile,以减小最终镜像体积。

# --- Builder Stage ---
FROM python:3.10-slim as builder

WORKDIR /app

# 安装构建依赖
RUN pip install --no-cache-dir --upgrade pip
COPY requirements.txt .
# 这里只下载包,不安装,以便在下一阶段利用缓存
RUN pip wheel --no-cache-dir --wheel-dir /wheels -r requirements.txt


# --- Final Stage ---
FROM python:3.10-slim

WORKDIR /app

# 从 builder 阶段复制 wheel 文件并安装
COPY --from=builder /wheels /wheels
RUN pip install --no-cache-dir /wheels/* && rm -rf /wheels

# 复制应用代码
COPY main.py .
COPY imagenet_classes.txt .

# 暴露端口并设置启动命令
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

第二步:配置 Caddy 作为动态反向代理

Caddy 的配置是整个工作流的核心。我们将使用它的 JSON 配置格式,因为它比 Caddyfile 更适合通过 API 进行编程控制。

Caddyfile
我们依然使用 Caddyfile 作为初始配置文件,它更易读。Caddy 启动时会把它转换成 JSON。

# Caddyfile
{
    # 启用 Admin API,这是动态配置的关键
    admin 0.0.0.0:2019 {
        # 生产环境必须加强安全措施,例如限制来源 IP
        # origin localhost:2019
    }
}

# 前端服务
:8080 {
    # 开启 gzip 压缩
    encode zstd gzip

    # 文件服务器,托管我们的 Lit 前端应用
    file_server {
        root /usr/share/caddy
    }

    # API 路由
    # 所有 /api/* 的请求都会被代理到后端服务集群
    handle /api/* {
        reverse_proxy /* {
            # 定义一个名为 "models" 的上游服务集群
            # Caddy 启动时会寻找名为 "model-v1" 的主机
            to model-v1:8000
            
            # 使用 round_robin 负载均衡策略
            lb_policy round_robin
        }
    }
}

这个初始配置将所有 API 请求都发送到 model-v1:8000。我们的控制脚本将通过 Admin API 修改这部分 reverse_proxy 的配置。

第三步:实现流量控制脚本

这个脚本是金丝雀发布的指挥官。它生成 Caddy 需要的 JSON 配置片段,并通过 HTTP PATCH 请求发送给 Caddy Admin API。

control/deploy.py

import requests
import json
import argparse
import sys

# Caddy Admin API 地址
CADDY_ADMIN_API = "http://localhost:2019"
# 需要更新的配置路径
# 我们要修改的是 :8080 服务器中,处理 /api/* 的那个 handle 里的 reverse_proxy
CONFIG_PATH = "/config/apps/http/servers/srv0/routes/1/handle/0/routes/0/handle/0"

def get_caddy_config_fragment(v1_weight: int, v2_weight: int) -> dict:
    """
    根据权重生成 Caddy reverse_proxy 的 JSON 配置片段。
    Caddy 的 round_robin 策略并不直接支持权重,我们通过重复上游地址来模拟。
    例如,9:1 的权重就意味着在列表中添加9个v1地址和1个v2地址。
    这是一个简单但有效的实现方式。
    """
    if v1_weight < 0 or v2_weight < 0:
        raise ValueError("Weights must be non-negative.")
    
    # 如果 v2 权重为0,则完全关闭金丝雀
    if v2_weight == 0 and v1_weight > 0:
        upstreams = [{"dial": "model-v1:8000"}]
    # 如果 v1 权重为0,则完成发布,v2 成为新的稳定版
    elif v1_weight == 0 and v2_weight > 0:
        upstreams = [{"dial": "model-v2:8000"}]
    else:
        # 按比例创建上游列表
        upstreams = [
            {"dial": "model-v1:8000"} for _ in range(v1_weight)
        ] + [
            {"dial": "model-v2:8000"} for _ in range(v2_weight)
        ]

    config_fragment = {
        "handler": "reverse_proxy",
        "upstreams": upstreams,
        "load_balancing": {
            "policy": "round_robin"
        }
    }
    return config_fragment


def update_caddy_config(config_fragment: dict):
    """
    通过 Caddy Admin API 发送 PATCH 请求更新配置。
    """
    headers = {"Content-Type": "application/json"}
    url = CADDY_ADMIN_API + CONFIG_PATH
    
    print(f"Targeting Caddy API endpoint: {url}")
    print("Sending configuration patch:")
    print(json.dumps(config_fragment, indent=2))

    try:
        response = requests.patch(url, headers=headers, data=json.dumps(config_fragment))
        response.raise_for_status()  # 如果 HTTP 状态码不是 2xx,则抛出异常
        print("\nCaddy configuration updated successfully.")
        print(f"Status Code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"\nError updating Caddy configuration: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Control Canary Release via Caddy Admin API.")
    parser.add_argument(
        "--v1-weight", type=int, default=9,
        help="Weight for the stable version (v1). Default is 9."
    )
    parser.add_argument(
        "--v2-weight", type=int, default=1,
        help="Weight for the canary version (v2). Default is 1."
    )
    
    args = parser.parse_args()
    
    if args.v1_weight == 0 and args.v2_weight == 0:
        print("Error: At least one weight must be greater than zero.", file=sys.stderr)
        sys.exit(1)

    total_weight = args.v1_weight + args.v2_weight
    v2_percentage = (args.v2_weight / total_weight) * 100
    print(f"Setting traffic split: v1 ~{100-v2_percentage:.1f}%, v2 (canary) ~{v2_percentage:.1f}%")

    new_config = get_caddy_config_fragment(args.v1_weight, args.v2_weight)
    update_caddy_config(new_config)

这个脚本的核心在于 get_caddy_config_fragment 函数。它通过构造一个包含不同数量 v1v2 服务地址的列表,巧妙地利用 round_robin 策略实现了加权负载均衡。

第四步:构建 Lit 前端进行验证

前端非常简单,只包含一个文件上传组件和一个结果展示区。关键在于它会显示返回结果中的 model_version 字段,让我们能肉眼观察到流量分配。

frontend/lit-app.js

import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js';

class ImageClassifier extends LitElement {
  static styles = css`
    :host {
      display: block;
      font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
      max-width: 600px;
      margin: 40px auto;
      padding: 20px;
      border: 1px solid #ddd;
      border-radius: 8px;
      box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    h1 {
      color: #333;
      text-align: center;
    }
    .uploader {
      display: flex;
      flex-direction: column;
      align-items: center;
      margin-bottom: 20px;
    }
    input[type="file"] {
      border: 1px solid #ccc;
      padding: 10px;
      border-radius: 4px;
    }
    button {
      margin-top: 10px;
      padding: 10px 20px;
      font-size: 16px;
      background-color: #007bff;
      color: white;
      border: none;
      border-radius: 4px;
      cursor: pointer;
      transition: background-color 0.2s;
    }
    button:disabled {
      background-color: #aaa;
      cursor: not-allowed;
    }
    button:hover:not(:disabled) {
      background-color: #0056b3;
    }
    .result {
      padding: 15px;
      border-radius: 4px;
      text-align: center;
      font-size: 18px;
      min-height: 50px;
      line-height: 50px;
    }
    .result.loading {
      background-color: #f0f0f0;
      color: #555;
    }
    .result.error {
      background-color: #f8d7da;
      color: #721c24;
    }
    .result.v1 {
      background-color: #d4edda;
      color: #155724;
      border: 1px solid #c3e6cb;
    }
    .result.v2 {
      background-color: #fff3cd;
      color: #856404;
      border: 1px solid #ffeeba;
    }
    .version-tag {
      font-weight: bold;
      padding: 2px 6px;
      border-radius: 3px;
    }
  `;

  static properties = {
    isLoading: { type: Boolean },
    result: { type: Object },
    error: { type: String },
    file: { type: Object },
  };

  constructor() {
    super();
    this.isLoading = false;
    this.result = null;
    this.error = null;
    this.file = null;
  }

  handleFileChange(e) {
    this.file = e.target.files[0];
  }

  async handleSubmit() {
    if (!this.file) {
      this.error = 'Please select a file first.';
      return;
    }

    this.isLoading = true;
    this.result = null;
    this.error = null;

    const formData = new FormData();
    formData.append('file', this.file);

    try {
      const response = await fetch('/api/predict', {
        method: 'POST',
        body: formData,
      });

      if (!response.ok) {
        const errData = await response.json();
        throw new Error(errData.detail || `HTTP error! status: ${response.status}`);
      }

      this.result = await response.json();
    } catch (e) {
      this.error = e.message;
    } finally {
      this.isLoading = false;
    }
  }

  renderResult() {
    if (this.isLoading) {
      return html`<div class="result loading">Predicting...</div>`;
    }
    if (this.error) {
      return html`<div class="result error">Error: ${this.error}</div>`;
    }
    if (this.result) {
      const version = this.result.model_version;
      const isV2 = version.startsWith('2.');
      const predictionLabel = isV2 ? this.result.prediction.label : this.result.prediction;
      
      return html`
        <div class="result ${isV2 ? 'v2' : 'v1'}">
          Prediction: <strong>${predictionLabel}</strong> 
          (Served by <span class="version-tag">${version}</span>)
        </div>
      `;
    }
    return html`<div class="result">Upload an image to see the prediction.</div>`;
  }

  render() {
    return html`
      <h1>PyTorch Canary Deployment Demo</h1>
      <div class="uploader">
        <input type="file" @change=${this.handleFileChange} accept="image/*">
        <button @click=${this.handleSubmit} .disabled=${!this.file || this.isLoading}>
          ${this.isLoading ? 'Processing...' : 'Classify Image'}
        </button>
      </div>
      ${this.renderResult()}
    `;
  }
}

customElements.define('image-classifier', ImageClassifier);

frontend/index.html
一个简单的 HTML 容器。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Image Classifier</title>
    <script type="module" src="/lit-app.js"></script>
</head>
<body>
    <image-classifier></image-classifier>
</body>
</html>

第五步:使用 Docker Compose 编排一切

docker-compose.yml 文件将所有服务粘合在一起,定义了它们的构建方式、依赖关系和网络。

docker-compose.yml

version: '3.8'

services:
  model-v1:
    build:
      context: ./model_service
    image: pytorch-model-service:1.0.0
    container_name: model-v1
    environment:
      - MODEL_VERSION=1.0.0
    # 在生产中,需要配置重启策略
    # restart: always
    networks:
      - modelnet

  model-v2:
    build:
      context: ./model_service
    image: pytorch-model-service:2.0.0
    container_name: model-v2
    environment:
      - MODEL_VERSION=2.0.0-canary
    networks:
      - modelnet

  caddy:
    image: caddy:2.7.5
    container_name: caddy-gateway
    ports:
      - "8080:8080"  # 前端和 API
      - "2019:2019"  # Admin API
    volumes:
      - ./Caddyfile:/etc/caddy/Caddyfile
      - ./frontend:/usr/share/caddy
    depends_on:
      - model-v1
      - model-v2
    networks:
      - modelnet

networks:
  modelnet:
    driver: bridge

执行与验证

  1. 启动所有服务:

    docker-compose up --build -d
  2. 初始状态验证:
    打开浏览器访问 http://localhost:8080。上传一张图片(例如猫或狗的图片),反复提交。你会发现每次结果都来自 model_version: 1.0.0,背景是绿色的。

  3. 开启金丝雀发布 (10% 流量):
    在主机上运行控制脚本。

    # 确保 control/deploy.py 和 model_service/ 在同一目录下
    pip install requests
    python control/deploy.py --v1-weight 9 --v2-weight 1

    脚本会输出它发送给 Caddy 的 JSON 配置。现在回到浏览器,继续上传图片。大约每10次请求中,你就会看到一次结果来自 model_version: 2.0.0-canary,背景变为黄色。这证明流量切分已生效。

    此时,金丝雀版本的性能和准确率就可以在真实流量下进行监控了。

  4. 扩大金丝雀流量 (50%):

    python control/deploy.py --v1-weight 1 --v2-weight 1

    现在,两个版本的响应应该会以大致相等的频率出现。

  5. 完成发布 (100% 流量到 v2):
    假设金丝雀版本表现良好,我们可以将所有流量都切换过去。

    python control/deploy.py --v1-weight 0 --v2-weight 1

    现在所有请求都由 v2 处理。v1 服务实例仍然在运行,可以随时用于回滚,直到我们确认 v2 完全稳定后,再将其下线。

  6. 快速回滚:
    如果发现 v2 有问题,只需一步即可回滚所有流量。

    python control/deploy.py --v1-weight 1 --v2-weight 0

    所有流量立即切回 v1,实现了快速止损。

方案局限性与未来展望

这个方案提供了一个轻量级、与平台无关的金丝雀发布实现,尤其适用于没有引入 Kubernetes 或 Service Mesh 的环境。然而,它也存在一些局限性:

  1. 控制脚本的健壮性: 当前的 deploy.py 是一个基础实现。在生产环境中,它需要更强的错误处理、重试逻辑,并且应该从 Caddy API 读取当前配置以进行更智能的更新,而不是硬编码配置路径。
  2. 监控与自动化: 真正的金丝雀发布依赖于自动化的指标分析。此工作流需要与监控系统(如 Prometheus/Grafana)集成。CI/CD 流水线应自动执行流量切换,并根据 SLI/SLO(服务等级指标/目标)的健康状况自动决定是继续推进发布还是执行回滚。
  3. 流量切分的精细度: 基于权重的轮询适用于无状态服务。对于需要会话保持的场景,或者希望基于用户特征(如内部员工、特定地区用户)进行灰度发布的场景,需要利用 Caddy 更高级的请求匹配器(matchers),例如匹配请求头或 Cookie,来实现更复杂的路由逻辑。
  4. 有状态服务: 此方案完美适用于无状态的推理服务。如果服务更新涉及到数据库 schema 变更等有状态的修改,金丝雀发布会变得异常复杂,需要考虑数据向后兼容和迁移策略,这超出了流量路由本身能解决的范畴。

未来的迭代方向可以聚焦于将 deploy.py 脚本演变成一个更通用的部署控制器,集成到 CI/CD 平台中,并基于监控告警实现全自动的、渐进式的交付流程。


  目录