利用 gRPC-Go 与 Trino 构建混合模式 CV 特征管道并以 Cilium 实施网络隔离


项目初期的需求很明确:搭建一个能同时支撑在线实时推理和离线模型训练的计算机视觉(CV)特征平台。在线路径要求毫秒级延迟,用于服务直接调用的特征提取;离线路径则要处理 PB 级的历史图像数据,为算法团队生成训练数据集。整个系统部署在 Kubernetes 上,技术挑战在于,如何让这两种资源消耗模式截然不同的工作负载——IO 密集型的离线任务和延迟敏感的在线服务——在同一个集群里和谐共存,互不干扰。

第一阶段:构建低延迟的在线特征提取服务

在线路径是整个系统的门面,性能是第一指标。我们选择了 gRPC-Go 作为基础 RPC 框架,主要看中其基于 HTTP/2 的高性能、双向流能力以及 Protobuf 的高效序列化。

服务接口定义 (features.proto) 必须清晰且可扩展:

syntax = "proto3";

package features.v1;

option go_package = "github.com/my-org/cv-platform/gen/go/features/v1;featuresv1";

import "google/protobuf/timestamp.proto";

// FeatureExtractionService 定义了在线特征提取的核心功能
service FeatureExtractionService {
  // Extract real-time features from a single image
  rpc ExtractRealtime(ExtractRealtimeRequest) returns (ExtractRealtimeResponse);
}

// 请求体:包含图像原始数据和元信息
message ExtractRealtimeRequest {
  string request_id = 1;
  bytes image_data = 2; // JPEG or PNG encoded image
  enum ImageFormat {
    UNKNOWN = 0;
    JPEG = 1;
    PNG = 2;
  }
  ImageFormat image_format = 3;
}

// 响应体:包含提取的特征向量和处理元数据
message ExtractRealtimeResponse {
  string request_id = 1;
  string model_version = 2;
  repeated float feature_vector = 3; // The dense feature vector
  google.protobuf.Timestamp processed_at = 4;
  map<string, float> quality_scores = 5; // e.g., "blur", "brightness"
}

服务器端的 Go 实现关注点在于健壮性和性能。一个常见的错误是直接在 gRPC handler 中执行耗时的 CV 计算,这会阻塞 gRPC 的工作线程。在真实项目中,我们会使用一个独立的 worker pool 来处理这些计算密集型任务,gRPC handler 仅负责请求的编解码和任务分发。

// internal/server/grpc.go

package server

import (
	"context"
	"fmt"
	"image"
	_ "image/jpeg"
	_ "image/png"
	"io"
	"runtime"
	"sync"

	"github.com/my-org/cv-platform/internal/processor"
	"github.com/uber-go/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	featuresv1 "github.com/my-org/cv-platform/gen/go/features/v1"
)

// workerJob 封装了需要处理的任务
type workerJob struct {
	request *featuresv1.ExtractRealtimeRequest
	result  chan<- *featuresv1.ExtractRealtimeResponse
	err     chan<- error
}

// grpcServer 实现了 gRPC 服务接口
type grpcServer struct {
	featuresv1.UnimplementedFeatureExtractionServiceServer
	logger    *zap.Logger
	processor *processor.CVProcessor
	jobQueue  chan workerJob
	wg        sync.WaitGroup
}

// NewGRPCServer 创建并初始化 gRPC 服务器,包括 worker pool
func NewGRPCServer(logger *zap.Logger, proc *processor.CVProcessor, workerCount int) *grpcServer {
	s := &grpcServer{
		logger:    logger,
		processor: proc,
		// 使用带缓冲的 channel 作为任务队列,避免生产者阻塞
		jobQueue: make(chan workerJob, workerCount*2),
	}

	s.startWorkerPool(workerCount)
	return s
}

func (s *grpcServer) startWorkerPool(workerCount int) {
	s.logger.Info("Starting CV processor worker pool", zap.Int("count", workerCount))
	for i := 0; i < workerCount; i++ {
		s.wg.Add(1)
		go func(workerID int) {
			defer s.wg.Done()
			s.logger.Info("Worker started", zap.Int("id", workerID))
			for job := range s.jobQueue {
				// 核心处理逻辑
				img, format, err := image.Decode(bytes.NewReader(job.request.GetImageData()))
				if err != nil {
					job.err <- status.Errorf(codes.InvalidArgument, "failed to decode image: %v", err)
					continue
				}

				s.logger.Debug("Processing image",
					zap.String("request_id", job.request.GetRequestId()),
					zap.String("format", format),
				)

				// 这里的 processor.Extract 是真正耗时的 CV 操作
				vector, metadata, err := s.processor.Extract(job.request.Context(), img)
				if err != nil {
					job.err <- status.Errorf(codes.Internal, "feature extraction failed: %v", err)
					continue
				}

				// 将结果发送回 handler
				job.result <- &featuresv1.ExtractRealtimeResponse{
					RequestId:     job.request.GetRequestId(),
					ModelVersion:  metadata.ModelVersion,
					FeatureVector: vector,
					ProcessedAt:   timestamppb.Now(),
					QualityScores: metadata.QualityScores,
				}
			}
			s.logger.Info("Worker stopped", zap.Int("id", workerID))
		}(i)
	}
}

// ExtractRealtime 是 gRPC 的入口方法
func (s *grpcServer) ExtractRealtime(ctx context.Context, req *featuresv1.ExtractRealtimeRequest) (*featuresv1.ExtractRealtimeResponse, error) {
	if len(req.GetImageData()) == 0 {
		return nil, status.Error(codes.InvalidArgument, "image_data is required")
	}

	resultChan := make(chan *featuresv1.ExtractRealtimeResponse, 1)
	errChan := make(chan error, 1)

	job := workerJob{
		request: req,
		result:  resultChan,
		err:     errChan,
	}

	// 将任务推送到队列,如果队列满则等待或超时
	select {
	case s.jobQueue <- job:
		// 任务已提交
	case <-ctx.Done():
		return nil, status.Error(codes.Canceled, "request canceled by client")
	}

	// 等待 worker 处理结果
	select {
	case res := <-resultChan:
		return res, nil
	case err := <-errChan:
		return nil, err
	case <-ctx.Done():
		// 客户端可能在等待 worker 处理时取消了请求
		return nil, status.Error(codes.Canceled, "request canceled while processing")
	}
}

// Stop 优雅地关闭 worker pool
func (s *grpcServer) Stop() {
	s.logger.Info("Stopping gRPC server and worker pool...")
	close(s.jobQueue)
	s.wg.Wait()
	s.logger.Info("All workers have been stopped.")
}

这个实现在生产环境中是可靠的。它通过 jobQueue 将 gRPC I/O 线程与 CPU 密集型计算解耦,并且通过 context 传递实现了超时和取消的正确处理。

第二阶段:引入 Trino 进行大规模离线分析

算法团队需要定期基于全量历史数据(存储在 S3 数据湖中)重新训练模型。这些数据以 Parquet 格式组织,并通过 Hive Metastore 进行元数据管理。Trino(前身为 PrestoSQL)是这个场景下的理想选择,它允许分析师和数据科学家直接使用 SQL 对 S3 上的数据进行高性能的联邦查询。

在 Kubernetes 上部署 Trino 集群相对直接,但真正的挑战在于配置和查询优化。一个典型的离线特征生成 SQL 可能如下:

-- a_batch_feature_generation.sql
INSERT INTO hive.features.daily_feature_vectors
WITH latest_images AS (
    SELECT 
        image_id,
        raw_data_s3_path,
        -- 使用窗口函数获取每个实体的最新图像
        ROW_NUMBER() OVER(PARTITION BY entity_id ORDER BY event_timestamp DESC) as rn
    FROM hive.raw_data.image_events
    WHERE dt = '2023-10-26'
)
SELECT
    image_id,
    'model_v3.2' AS model_version,
    -- UDF_EXTRACT_FEATURES 是一个自定义函数,它会触发一个外部进程(如调用我们的 gRPC 服务或一个独立的 Python 脚本)
    -- 在生产中,这通常通过 Trino 的函数扩展机制或外部表功能实现
    -- 这里为了演示,我们假设存在一个可以处理 S3 路径的 UDF
    UDF_EXTRACT_FEATURES(read_file(raw_data_s3_path)) AS feature_vector,
    CURRENT_TIMESTAMP AS processing_timestamp,
    '2023-10-26' AS dt
FROM latest_images
WHERE rn = 1;

这里的坑在于 Trino worker 的资源消耗。当这样一个查询运行时,Trino 的 worker Pod 会并发地从 S3 拉取大量数据,进行解压和计算。这不仅会消耗大量的 CPU 和内存,更关键的是,它会产生巨大的网络流量,瞬间占满节点的网卡带宽。

第三阶段:网络风暴与服务降级

我们将在线服务和 Trino 集群部署到了同一个 Kubernetes 集群,以提高资源利用率。很快,问题就暴露了。每当离线训练的数据准备任务启动时,监控系统就会告警:在线 FeatureExtractionService 的 P99 延迟从稳定的 15ms 飙升到 500ms 以上,甚至出现大量超时错误。

graph TD
    subgraph "Kubernetes Cluster"
        subgraph "Node 1"
            A[gRPC Pod]
            B[Trino Worker Pod]
        end
        subgraph "Node 2"
            C[gRPC Pod]
            D[Trino Worker Pod]
        end
        subgraph "Node 3"
            E[API Gateway] --> A & C
            F[Trino Coordinator] --> B & D
        end
        S3[Data Lake on S3]
    end

    B -- "High-Throughput Read" --> S3
    D -- "High-Throughput Read" --> S3
    
    E -- "Low-Latency Request" --> A
    E -- "Low-Latency Request" --> C

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#f9f,stroke:#333,stroke-width:2px

    linkStyle 0 stroke-width:4px,stroke:red,fill:none;
    linkStyle 1 stroke-width:4px,stroke:red,fill:none;

问题根源在于网络资源的无差别争抢。Trino worker 为了最大化吞吐量,会尽可能多地建立与 S3 的连接,其产生的网络流量是典型的“突发”和“贪婪”模式。当 gRPC Pod 和 Trino Worker Pod 调度到同一个物理节点时,Trino 的流量洪峰会挤占节点网卡的全部带宽,导致 gRPC 服务处理外部请求的网络包被延迟甚至丢弃,从而引发服务质量的急剧下降。

使用标准的 Kubernetes NetworkPolicy 无法解决这个问题,因为它主要关注 L3/L4 的可达性控制(即谁可以访问谁),而不能对已建立连接的流量进行整形或优先级划分。

第四阶段:Cilium Network Policy 实现精细化隔离

我们的解决方案是引入 Cilium 作为 Kubernetes 的 CNI 插件。Cilium 基于 eBPF 技术,可以直接在内核中对网络包进行编程,这使得它能够实现比 iptables 更高效、更强大的网络策略。

我们的目标是创建一个策略,严格限制在线服务和离线任务的网络行为,确保它们即使在同一个节点上也能互不干扰。

第一步:为 gRPC 服务创建严格的入口和出口策略

我们只希望 gRPC 服务能从 API Gateway 接收流量,并且只能访问特定的内部服务(如模型管理服务)。

# cilium-policy-grpc-service.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "grpc-feature-extractor-policy"
  namespace: "online-services"
spec:
  endpointSelector:
    matchLabels:
      app: feature-extractor
  
  # Ingress: 只允许来自 API Gateway 的流量
  ingress:
  - fromEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": "infra"
        "k8s:app": "api-gateway"
    toPorts:
    - ports:
      - port: "8080"
        protocol: TCP
      rules:
        http:
        - method: "POST"
          path: "/features.v1.FeatureExtractionService/ExtractRealtime"

  # Egress: 默认禁止所有出口流量,只允许必要的 DNS 和对模型服务的访问
  egress:
  - toEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": "kube-system"
        "k8s:k8s-app": "kube-dns"
    toPorts:
    - ports:
      - port: "53"
        protocol: UDP
      rules:
        dns:
        - matchPattern: "*"
  - toEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": "ml-infra"
        "k8s:app": "model-registry"
    toPorts:
    - ports:
      - port: "9000"
        protocol: TCP

这个策略不仅限制了 IP 和端口,还通过 rules.http 实现了 L7 层的 gRPC 方法级别的访问控制,这是标准 NetworkPolicy 做不到的。

第二步:为 Trino Worker 创建隔离策略

对于 Trino Worker,我们希望它能自由访问数据湖(S3)和内部的 Hive Metastore,但绝对禁止它与在线服务的命名空间进行任何通信。

# cilium-policy-trino-worker.yaml
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "trino-worker-isolation"
  namespace: "data-processing"
spec:
  endpointSelector:
    matchLabels:
      app: trino
      role: worker

  # Egress: 只允许访问 S3, Metastore 和 Coordinator
  egress:
  - toCIDR:
    # 假设这是 S3 的 CIDR 范围,真实环境中应使用更精确的范围
    - "52.216.0.0/16" 
  - toEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": "data-processing"
        "k8s:app": "trino"
        "k8s:role": "coordinator"
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": "data-infra"
        "k8s:app": "hive-metastore"
  - toEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": "kube-system"
        "k8s:k8s-app": "kube-dns"
    toPorts:
    - ports:
      - port: "53"
        protocol: UDP
      rules:
        dns:
        - matchPattern: "*"

  # Ingress: 只允许来自 Coordinator 的流量
  ingress:
  - fromEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": "data-processing"
        "k8s:app": "trino"
        "k8s:role": "coordinator"

应用这些策略后,我们通过 Cilium 的监控工具 Hubble 观察流量拓扑,之前混乱的网络调用图变得清晰有序。Trino worker 和 gRPC 服务之间的意外通信路径被彻底切断。虽然这解决了安全和访问控制问题,但还没有完全解决网络带宽争抢。

最终架构与局限性

最终我们通过 Cilium Network Policy 实现了应用级别的网络隔离。

graph TD
    subgraph "Kubernetes Cluster with Cilium"
        subgraph "Node 1"
            A[gRPC Pod]
            B[Trino Worker Pod]
            CilA[Cilium eBPF]
            CilB[Cilium eBPF]
            A -- "gRPC Traffic" --> CilA
            B -- "S3 Traffic" --> CilB
        end
        
        subgraph "Node 2"
            C[gRPC Pod]
            D[Trino Worker Pod]
            CilC[Cilium eBPF]
            CilD[Cilium eBPF]
            C -- "gRPC Traffic" --> CilC
            D -- "S3 Traffic" --> CilD
        end
        
        E[API Gateway]
        F[Trino Coordinator]
        S3[Data Lake on S3]

        E -- "gRPC Request" --> A
        E -- "gRPC Request" --> C
        
        F -- "Task" --> B
        F -- "Task" --> D

        CilB -- "DENIED" --x A
        CilD -- "DENIED" --x C
        
        CilB -- "ALLOWED" --> S3
        CilD -- "ALLOWED" --> S3
    end

    style CilA fill:#bbf,stroke:#333,stroke-width:2px
    style CilB fill:#bbf,stroke:#333,stroke-width:2px
    style CilC fill:#bbf,stroke:#333,stroke-width:2px
    style CilD fill:#bbf,stroke:#333,stroke-width:2px

实施后,在离线任务运行时,在线服务的 P99 延迟恢复到了 20ms 以下,虽然仍有轻微抖动,但已在可接受的服务等级目标(SLO)范围内。Cilium eBPF 在内核层面执行策略,几乎没有性能损耗,同时提供了远超传统工具的控制粒度。

当前的方案有效隔离了不同工作负载的网络平面,防止了因意外访问或广播风暴导致的服务降级。然而,它并未从根本上解决物理网络带宽的争抢问题。当 Trino worker 的流量达到节点网卡物理上限时,同一节点上的其他 Pod 依然会受到影响。下一步的迭代方向是利用 Kubernetes 的 Node Taints 和 Tolerations,将在线服务和离线任务调度到物理隔离的节点组,实现更彻底的资源隔离。此外,Cilium 的带宽管理(Bandwidth Manager)功能也值得深入研究,通过为 Trino Pod 设置 egress 带宽上限,或许能在共享节点上实现更精细的服务质量(QoS)保障,但这需要更复杂的配置和对业务流量模式的精确建模。


  目录