项目初期的需求很明确:搭建一个能同时支撑在线实时推理和离线模型训练的计算机视觉(CV)特征平台。在线路径要求毫秒级延迟,用于服务直接调用的特征提取;离线路径则要处理 PB 级的历史图像数据,为算法团队生成训练数据集。整个系统部署在 Kubernetes 上,技术挑战在于,如何让这两种资源消耗模式截然不同的工作负载——IO 密集型的离线任务和延迟敏感的在线服务——在同一个集群里和谐共存,互不干扰。
第一阶段:构建低延迟的在线特征提取服务
在线路径是整个系统的门面,性能是第一指标。我们选择了 gRPC-Go 作为基础 RPC 框架,主要看中其基于 HTTP/2 的高性能、双向流能力以及 Protobuf 的高效序列化。
服务接口定义 (features.proto) 必须清晰且可扩展:
syntax = "proto3";
package features.v1;
option go_package = "github.com/my-org/cv-platform/gen/go/features/v1;featuresv1";
import "google/protobuf/timestamp.proto";
// FeatureExtractionService 定义了在线特征提取的核心功能
service FeatureExtractionService {
// Extract real-time features from a single image
rpc ExtractRealtime(ExtractRealtimeRequest) returns (ExtractRealtimeResponse);
}
// 请求体:包含图像原始数据和元信息
message ExtractRealtimeRequest {
string request_id = 1;
bytes image_data = 2; // JPEG or PNG encoded image
enum ImageFormat {
UNKNOWN = 0;
JPEG = 1;
PNG = 2;
}
ImageFormat image_format = 3;
}
// 响应体:包含提取的特征向量和处理元数据
message ExtractRealtimeResponse {
string request_id = 1;
string model_version = 2;
repeated float feature_vector = 3; // The dense feature vector
google.protobuf.Timestamp processed_at = 4;
map<string, float> quality_scores = 5; // e.g., "blur", "brightness"
}
服务器端的 Go 实现关注点在于健壮性和性能。一个常见的错误是直接在 gRPC handler 中执行耗时的 CV 计算,这会阻塞 gRPC 的工作线程。在真实项目中,我们会使用一个独立的 worker pool 来处理这些计算密集型任务,gRPC handler 仅负责请求的编解码和任务分发。
// internal/server/grpc.go
package server
import (
"context"
"fmt"
"image"
_ "image/jpeg"
_ "image/png"
"io"
"runtime"
"sync"
"github.com/my-org/cv-platform/internal/processor"
"github.com/uber-go/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
featuresv1 "github.com/my-org/cv-platform/gen/go/features/v1"
)
// workerJob 封装了需要处理的任务
type workerJob struct {
request *featuresv1.ExtractRealtimeRequest
result chan<- *featuresv1.ExtractRealtimeResponse
err chan<- error
}
// grpcServer 实现了 gRPC 服务接口
type grpcServer struct {
featuresv1.UnimplementedFeatureExtractionServiceServer
logger *zap.Logger
processor *processor.CVProcessor
jobQueue chan workerJob
wg sync.WaitGroup
}
// NewGRPCServer 创建并初始化 gRPC 服务器,包括 worker pool
func NewGRPCServer(logger *zap.Logger, proc *processor.CVProcessor, workerCount int) *grpcServer {
s := &grpcServer{
logger: logger,
processor: proc,
// 使用带缓冲的 channel 作为任务队列,避免生产者阻塞
jobQueue: make(chan workerJob, workerCount*2),
}
s.startWorkerPool(workerCount)
return s
}
func (s *grpcServer) startWorkerPool(workerCount int) {
s.logger.Info("Starting CV processor worker pool", zap.Int("count", workerCount))
for i := 0; i < workerCount; i++ {
s.wg.Add(1)
go func(workerID int) {
defer s.wg.Done()
s.logger.Info("Worker started", zap.Int("id", workerID))
for job := range s.jobQueue {
// 核心处理逻辑
img, format, err := image.Decode(bytes.NewReader(job.request.GetImageData()))
if err != nil {
job.err <- status.Errorf(codes.InvalidArgument, "failed to decode image: %v", err)
continue
}
s.logger.Debug("Processing image",
zap.String("request_id", job.request.GetRequestId()),
zap.String("format", format),
)
// 这里的 processor.Extract 是真正耗时的 CV 操作
vector, metadata, err := s.processor.Extract(job.request.Context(), img)
if err != nil {
job.err <- status.Errorf(codes.Internal, "feature extraction failed: %v", err)
continue
}
// 将结果发送回 handler
job.result <- &featuresv1.ExtractRealtimeResponse{
RequestId: job.request.GetRequestId(),
ModelVersion: metadata.ModelVersion,
FeatureVector: vector,
ProcessedAt: timestamppb.Now(),
QualityScores: metadata.QualityScores,
}
}
s.logger.Info("Worker stopped", zap.Int("id", workerID))
}(i)
}
}
// ExtractRealtime 是 gRPC 的入口方法
func (s *grpcServer) ExtractRealtime(ctx context.Context, req *featuresv1.ExtractRealtimeRequest) (*featuresv1.ExtractRealtimeResponse, error) {
if len(req.GetImageData()) == 0 {
return nil, status.Error(codes.InvalidArgument, "image_data is required")
}
resultChan := make(chan *featuresv1.ExtractRealtimeResponse, 1)
errChan := make(chan error, 1)
job := workerJob{
request: req,
result: resultChan,
err: errChan,
}
// 将任务推送到队列,如果队列满则等待或超时
select {
case s.jobQueue <- job:
// 任务已提交
case <-ctx.Done():
return nil, status.Error(codes.Canceled, "request canceled by client")
}
// 等待 worker 处理结果
select {
case res := <-resultChan:
return res, nil
case err := <-errChan:
return nil, err
case <-ctx.Done():
// 客户端可能在等待 worker 处理时取消了请求
return nil, status.Error(codes.Canceled, "request canceled while processing")
}
}
// Stop 优雅地关闭 worker pool
func (s *grpcServer) Stop() {
s.logger.Info("Stopping gRPC server and worker pool...")
close(s.jobQueue)
s.wg.Wait()
s.logger.Info("All workers have been stopped.")
}
这个实现在生产环境中是可靠的。它通过 jobQueue 将 gRPC I/O 线程与 CPU 密集型计算解耦,并且通过 context 传递实现了超时和取消的正确处理。
第二阶段:引入 Trino 进行大规模离线分析
算法团队需要定期基于全量历史数据(存储在 S3 数据湖中)重新训练模型。这些数据以 Parquet 格式组织,并通过 Hive Metastore 进行元数据管理。Trino(前身为 PrestoSQL)是这个场景下的理想选择,它允许分析师和数据科学家直接使用 SQL 对 S3 上的数据进行高性能的联邦查询。
在 Kubernetes 上部署 Trino 集群相对直接,但真正的挑战在于配置和查询优化。一个典型的离线特征生成 SQL 可能如下:
-- a_batch_feature_generation.sql
INSERT INTO hive.features.daily_feature_vectors
WITH latest_images AS (
SELECT
image_id,
raw_data_s3_path,
-- 使用窗口函数获取每个实体的最新图像
ROW_NUMBER() OVER(PARTITION BY entity_id ORDER BY event_timestamp DESC) as rn
FROM hive.raw_data.image_events
WHERE dt = '2023-10-26'
)
SELECT
image_id,
'model_v3.2' AS model_version,
-- UDF_EXTRACT_FEATURES 是一个自定义函数,它会触发一个外部进程(如调用我们的 gRPC 服务或一个独立的 Python 脚本)
-- 在生产中,这通常通过 Trino 的函数扩展机制或外部表功能实现
-- 这里为了演示,我们假设存在一个可以处理 S3 路径的 UDF
UDF_EXTRACT_FEATURES(read_file(raw_data_s3_path)) AS feature_vector,
CURRENT_TIMESTAMP AS processing_timestamp,
'2023-10-26' AS dt
FROM latest_images
WHERE rn = 1;
这里的坑在于 Trino worker 的资源消耗。当这样一个查询运行时,Trino 的 worker Pod 会并发地从 S3 拉取大量数据,进行解压和计算。这不仅会消耗大量的 CPU 和内存,更关键的是,它会产生巨大的网络流量,瞬间占满节点的网卡带宽。
第三阶段:网络风暴与服务降级
我们将在线服务和 Trino 集群部署到了同一个 Kubernetes 集群,以提高资源利用率。很快,问题就暴露了。每当离线训练的数据准备任务启动时,监控系统就会告警:在线 FeatureExtractionService 的 P99 延迟从稳定的 15ms 飙升到 500ms 以上,甚至出现大量超时错误。
graph TD
subgraph "Kubernetes Cluster"
subgraph "Node 1"
A[gRPC Pod]
B[Trino Worker Pod]
end
subgraph "Node 2"
C[gRPC Pod]
D[Trino Worker Pod]
end
subgraph "Node 3"
E[API Gateway] --> A & C
F[Trino Coordinator] --> B & D
end
S3[Data Lake on S3]
end
B -- "High-Throughput Read" --> S3
D -- "High-Throughput Read" --> S3
E -- "Low-Latency Request" --> A
E -- "Low-Latency Request" --> C
style B fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#f9f,stroke:#333,stroke-width:2px
linkStyle 0 stroke-width:4px,stroke:red,fill:none;
linkStyle 1 stroke-width:4px,stroke:red,fill:none;
问题根源在于网络资源的无差别争抢。Trino worker 为了最大化吞吐量,会尽可能多地建立与 S3 的连接,其产生的网络流量是典型的“突发”和“贪婪”模式。当 gRPC Pod 和 Trino Worker Pod 调度到同一个物理节点时,Trino 的流量洪峰会挤占节点网卡的全部带宽,导致 gRPC 服务处理外部请求的网络包被延迟甚至丢弃,从而引发服务质量的急剧下降。
使用标准的 Kubernetes NetworkPolicy 无法解决这个问题,因为它主要关注 L3/L4 的可达性控制(即谁可以访问谁),而不能对已建立连接的流量进行整形或优先级划分。
第四阶段:Cilium Network Policy 实现精细化隔离
我们的解决方案是引入 Cilium 作为 Kubernetes 的 CNI 插件。Cilium 基于 eBPF 技术,可以直接在内核中对网络包进行编程,这使得它能够实现比 iptables 更高效、更强大的网络策略。
我们的目标是创建一个策略,严格限制在线服务和离线任务的网络行为,确保它们即使在同一个节点上也能互不干扰。
第一步:为 gRPC 服务创建严格的入口和出口策略
我们只希望 gRPC 服务能从 API Gateway 接收流量,并且只能访问特定的内部服务(如模型管理服务)。
# cilium-policy-grpc-service.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
name: "grpc-feature-extractor-policy"
namespace: "online-services"
spec:
endpointSelector:
matchLabels:
app: feature-extractor
# Ingress: 只允许来自 API Gateway 的流量
ingress:
- fromEndpoints:
- matchLabels:
"k8s:io.kubernetes.pod.namespace": "infra"
"k8s:app": "api-gateway"
toPorts:
- ports:
- port: "8080"
protocol: TCP
rules:
http:
- method: "POST"
path: "/features.v1.FeatureExtractionService/ExtractRealtime"
# Egress: 默认禁止所有出口流量,只允许必要的 DNS 和对模型服务的访问
egress:
- toEndpoints:
- matchLabels:
"k8s:io.kubernetes.pod.namespace": "kube-system"
"k8s:k8s-app": "kube-dns"
toPorts:
- ports:
- port: "53"
protocol: UDP
rules:
dns:
- matchPattern: "*"
- toEndpoints:
- matchLabels:
"k8s:io.kubernetes.pod.namespace": "ml-infra"
"k8s:app": "model-registry"
toPorts:
- ports:
- port: "9000"
protocol: TCP
这个策略不仅限制了 IP 和端口,还通过 rules.http 实现了 L7 层的 gRPC 方法级别的访问控制,这是标准 NetworkPolicy 做不到的。
第二步:为 Trino Worker 创建隔离策略
对于 Trino Worker,我们希望它能自由访问数据湖(S3)和内部的 Hive Metastore,但绝对禁止它与在线服务的命名空间进行任何通信。
# cilium-policy-trino-worker.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
name: "trino-worker-isolation"
namespace: "data-processing"
spec:
endpointSelector:
matchLabels:
app: trino
role: worker
# Egress: 只允许访问 S3, Metastore 和 Coordinator
egress:
- toCIDR:
# 假设这是 S3 的 CIDR 范围,真实环境中应使用更精确的范围
- "52.216.0.0/16"
- toEndpoints:
- matchLabels:
"k8s:io.kubernetes.pod.namespace": "data-processing"
"k8s:app": "trino"
"k8s:role": "coordinator"
- matchLabels:
"k8s:io.kubernetes.pod.namespace": "data-infra"
"k8s:app": "hive-metastore"
- toEndpoints:
- matchLabels:
"k8s:io.kubernetes.pod.namespace": "kube-system"
"k8s:k8s-app": "kube-dns"
toPorts:
- ports:
- port: "53"
protocol: UDP
rules:
dns:
- matchPattern: "*"
# Ingress: 只允许来自 Coordinator 的流量
ingress:
- fromEndpoints:
- matchLabels:
"k8s:io.kubernetes.pod.namespace": "data-processing"
"k8s:app": "trino"
"k8s:role": "coordinator"
应用这些策略后,我们通过 Cilium 的监控工具 Hubble 观察流量拓扑,之前混乱的网络调用图变得清晰有序。Trino worker 和 gRPC 服务之间的意外通信路径被彻底切断。虽然这解决了安全和访问控制问题,但还没有完全解决网络带宽争抢。
最终架构与局限性
最终我们通过 Cilium Network Policy 实现了应用级别的网络隔离。
graph TD
subgraph "Kubernetes Cluster with Cilium"
subgraph "Node 1"
A[gRPC Pod]
B[Trino Worker Pod]
CilA[Cilium eBPF]
CilB[Cilium eBPF]
A -- "gRPC Traffic" --> CilA
B -- "S3 Traffic" --> CilB
end
subgraph "Node 2"
C[gRPC Pod]
D[Trino Worker Pod]
CilC[Cilium eBPF]
CilD[Cilium eBPF]
C -- "gRPC Traffic" --> CilC
D -- "S3 Traffic" --> CilD
end
E[API Gateway]
F[Trino Coordinator]
S3[Data Lake on S3]
E -- "gRPC Request" --> A
E -- "gRPC Request" --> C
F -- "Task" --> B
F -- "Task" --> D
CilB -- "DENIED" --x A
CilD -- "DENIED" --x C
CilB -- "ALLOWED" --> S3
CilD -- "ALLOWED" --> S3
end
style CilA fill:#bbf,stroke:#333,stroke-width:2px
style CilB fill:#bbf,stroke:#333,stroke-width:2px
style CilC fill:#bbf,stroke:#333,stroke-width:2px
style CilD fill:#bbf,stroke:#333,stroke-width:2px
实施后,在离线任务运行时,在线服务的 P99 延迟恢复到了 20ms 以下,虽然仍有轻微抖动,但已在可接受的服务等级目标(SLO)范围内。Cilium eBPF 在内核层面执行策略,几乎没有性能损耗,同时提供了远超传统工具的控制粒度。
当前的方案有效隔离了不同工作负载的网络平面,防止了因意外访问或广播风暴导致的服务降级。然而,它并未从根本上解决物理网络带宽的争抢问题。当 Trino worker 的流量达到节点网卡物理上限时,同一节点上的其他 Pod 依然会受到影响。下一步的迭代方向是利用 Kubernetes 的 Node Taints 和 Tolerations,将在线服务和离线任务调度到物理隔离的节点组,实现更彻底的资源隔离。此外,Cilium 的带宽管理(Bandwidth Manager)功能也值得深入研究,通过为 Trino Pod 设置 egress 带宽上限,或许能在共享节点上实现更精细的服务质量(QoS)保障,但这需要更复杂的配置和对业务流量模式的精确建模。