构建基于 Go 的 Kubernetes 准入控制器实现依赖漏洞的强制性扫描


CI/CD流水线中的依赖扫描是必要的,但这远远不够。一个常见的安全盲区是:已经构建并推送到镜像仓库的、甚至正在线上运行的镜像,其依赖库中新爆出的高危漏洞(例如Log4Shell)如何被及时发现并阻止新的实例被创建?流水线只保证了镜像在构建那一刻的安全性,却无法对存量资产和运行时环境提供持续的保护。我们团队面临的挑战正是如此:需要一种强制性机制,在Pod创建的最后一刻,对即将运行的容器镜像进行最终的安全审查,拒绝任何带有不可接受风险的镜像进入我们的生产集群。

初步的构想是利用Kubernetes原生的扩展点。一番调研后,我们很快锁定了ValidatingAdmissionWebhook。它能在API Server持久化对象之前,将请求(例如一个Pod的创建请求)发送到我们自己的服务进行校验。这正是我们需要的入口。如果我们的服务发现镜像存在高危漏洞,就可以直接拒绝该请求,从源头上阻止不安全的容器运行。

技术选型决策围绕着几个核心问题展开:

  1. 开发语言与框架: 鉴于这是为Kubernetes构建的核心组件,Go是毫无疑问的首选。为了避免从零开始处理API Server交互、证书管理等繁琐的细节,我们选择了Kubebuilder框架。它能为我们生成一个功能完备的Webhook项目骨架,让我们能专注于核心的校验逻辑。
  2. 漏洞扫描引擎: 我们需要在Webhook服务内部嵌入一个扫描器。在真实项目中,调用外部SaaS服务进行扫描会引入不可控的网络延迟和单点故障。我们对比了几个开源扫描器,最终选择了Trivy。它的优势在于:
    • 可以作为Go库直接集成,避免了进程调用的开销。
    • 扫描速度快,对准入控制这种要求低延迟的场景至关重要。
    • 漏洞数据库更新及时,且可以离线加载。
  3. 策略与配置: 策略不能硬编码。我们需要能够动态配置哪些漏洞等级(CRITICAL, HIGH等)需要被阻止,以及为某些特殊的镜像或漏洞提供一个“白名单”机制。最终决定使用ConfigMap来承载这些配置,并让Webhook动态加载。

整个流程的架构如下:

sequenceDiagram
    participant User
    participant K8s API Server
    participant Admission Webhook
    participant Image Registry

    User->>K8s API Server: kubectl apply -f pod.yaml
    K8s API Server->>Admission Webhook: POST /validate (AdmissionReview)
    Admission Webhook-->>K8s API Server: Parsing AdmissionReview request
    Admission Webhook->>Image Registry: Pull image layers (if not cached)
    Note right of Admission Webhook: Trivy scans image layers in-memory
    Admission Webhook-->>Admission Webhook: Generate vulnerability report
    alt Vulnerabilities found > threshold
        Admission Webhook->>K8s API Server: Respond with {allowed: false, reason: ...}
        K8s API Server->>User: Error: Admission webhook denied the request
    else No critical vulnerabilities
        Admission Webhook->>K8s API Server: Respond with {allowed: true}
        K8s API Server-->>K8s API Server: Persist Pod object to etcd
    end

步骤化实现:从骨架到生产级Webhook

1. 项目初始化

我们使用Kubebuilder来初始化项目。这会生成所有必要的CRD定义、控制器脚手架和部署清单。

# 安装 Kubebuilder (假设已安装)
# export KUBEBUILDER_ASSETS=$(setup-envtest use -p path <version>)

# 初始化项目
mkdir image-vulnerability-webhook
cd image-vulnerability-webhook
go mod init github.com/my-org/image-vulnerability-webhook
kubebuilder init --domain my.domain --repo github.com/my-org/image-vulnerability-webhook

# 创建 Webhook API
kubebuilder create webhook --group admission --version v1 --kind Pod --validating --defaulting=false

这个命令会在api/v1目录下生成pod_webhook.go,这是我们逻辑的入口。

2. 核心校验逻辑

我们需要修改pkg/webhooks/pod_validator.go (根据kubebuilder版本,文件名可能略有不同) 中的Handle方法。这是接收和处理AdmissionReview请求的核心。

// pkg/webhooks/pod_validator.go

package webhooks

import (
	"context"
	"encoding/json"
	"net/http"
	"sync"

	"github.com/my-org/image-vulnerability-webhook/pkg/scanner"
	"github.com/my-org/image-vulnerability-webhook/pkg/config"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// PodValidator validates Pods
type PodValidator struct {
	Client  client.Client
	decoder *admission.Decoder
	Config  *config.WebhookConfig
	Scanner *scanner.ImageScanner
	// 使用 sync.Map 作为并发安全的镜像扫描结果缓存
	scanCache sync.Map
}

var logger = log.Log.WithName("pod-validator")

func NewPodValidator(c client.Client, cfg *config.WebhookConfig, sc *scanner.ImageScanner) *PodValidator {
	return &PodValidator{
		Client:  c,
		Config:  cfg,
		Scanner: sc,
	}
}

// Handle is the core validation logic.
func (v *PodValidator) Handle(ctx context.Context, req admission.Request) admission.Response {
	pod := &corev1.Pod{}
	err := v.decoder.Decode(req, pod)
	if err != nil {
		logger.Error(err, "Failed to decode pod object")
		return admission.Errored(http.StatusBadRequest, err)
	}

	// 命名空间豁免检查
	if v.Config.IsNamespaceExempt(pod.Namespace) {
		logger.Info("Namespace is exempt, skipping validation", "namespace", pod.Namespace)
		return admission.Allowed("namespace is exempt")
	}

	images := collectImagesFromPod(pod)
	logger.Info("Starting validation for pod", "namespace", pod.Namespace, "name", pod.Name, "images", images)

	var wg sync.WaitGroup
	validationErrors := make(chan string, len(images))

	for _, image := range images {
		wg.Add(1)
		go func(img string) {
			defer wg.Done()
			
			// 检查缓存
			if cachedResult, found := v.scanCache.Load(img); found {
				if cachedResult.(bool) {
					logger.Info("Image already scanned and approved (cached)", "image", img)
					return // 允许
				}
				// 如果缓存结果是拒绝,则需要重新扫描,因为漏洞数据库可能更新
			}
			
			// 检查镜像是否在白名单中
			if v.Config.IsImageWhitelisted(img) {
				logger.Info("Image is whitelisted, skipping scan", "image", img)
				return
			}

			report, err := v.Scanner.Scan(ctx, img)
			if err != nil {
				// 这里的坑在于:如果扫描器本身失败(例如无法拉取镜像,或DB损坏),
				// 我们应该拒绝请求还是放行?根据`failurePolicy`,我们选择拒绝以保障安全。
				logger.Error(err, "Failed to scan image", "image", img)
				validationErrors <- "Failed to scan image " + img + ": " + err.Error()
				return
			}

			// 检查漏洞是否符合策略
			if validationMsg := v.Config.ValidateReport(img, report); validationMsg != "" {
				validationErrors <- validationMsg
				v.scanCache.Store(img, false) // 缓存拒绝结果
			} else {
				v.scanCache.Store(img, true) // 缓存允许结果
			}

		}(image)
	}

	wg.Wait()
	close(validationErrors)

	var errorMessages []string
	for msg := range validationErrors {
		errorMessages = append(errorMessages, msg)
	}

	if len(errorMessages) > 0 {
		fullMessage, _ := json.Marshal(errorMessages)
		logger.Info("Denying pod creation due to vulnerabilities", "pod", pod.Name, "errors", string(fullMessage))
		return admission.Denied(string(fullMessage))
	}
	
	logger.Info("Successfully validated pod, allowing creation", "pod", pod.Name)
	return admission.Allowed("all images are compliant with the security policy")
}

// InjectDecoder injects the decoder.
func (v *PodValidator) InjectDecoder(d *admission.Decoder) error {
	v.decoder = d
	return nil
}

// collectImagesFromPod extracts all container and init container images from a pod spec.
func collectImagesFromPod(pod *corev1.Pod) []string {
	images := map[string]struct{}{}
	for _, container := range pod.Spec.Containers {
		images[container.Image] = struct{}{}
	}
	for _, container := range pod.Spec.InitContainers {
		images[container.Image] = struct{}{}
	}
	
	imageList := make([]string, 0, len(images))
	for image := range images {
		imageList = append(imageList, image)
	}
	return imageList
}

3. 集成Trivy作为扫描引擎

这是最关键的部分。我们需要创建一个scanner包,封装Trivy的库调用。这比执行trivy命令行要高效和安全得多。

// pkg/scanner/trivy_scanner.go
package scanner

import (
	"context"
	"os"
	"time"

	"github.com/aquasecurity/fanal/analyzer"
	"github.com/aquasecurity/fanal/applier"
	"github.com/aquasecurity/fanal/artifact"
	"github.com/aquasecurity/fanal/cache"
	"github.com/aquasecurity/fanal/image"
	"github.com/aquasecurity/fanal/types"
	"github.com/aquasecurity/trivy/pkg/scanner"
	"github.com/aquasecurity/trivy/pkg/report"
	trivytypes "github.com/aquasecurity/trivy/pkg/types"

	"k8s.io/utils/clock"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

var logger = log.Log.WithName("trivy-scanner")

type ImageScanner struct {
	localCache cache.Cache
}

// NewImageScanner 初始化Trivy扫描器.
// 这是一个耗时操作,因为它需要下载漏洞数据库。
// 在真实项目中,应该确保这个数据库被持久化存储,例如使用PVC。
func NewImageScanner(ctx context.Context, cacheDir string) (*ImageScanner, error) {
	// 使用文件系统缓存,而不是默认的内存缓存
	fsCache, err := cache.NewFSCache(cacheDir)
	if err != nil {
		return nil, err
	}

	// 单元测试时可以使用 `clock.NewFakeClock(time.Now())`
	// 这里使用真实时钟
	c := clock.RealClock{}

	// 下载漏洞数据库
	// trivy/pkg/db.NewClient(...).Download(ctx, ...)
	// 为了简化示例,我们假设数据库已存在于cacheDir中。
	// 在生产环境中,你需要一个InitContainer或者一个定时任务来定期更新这个数据库。
	logger.Info("Initializing Trivy vulnerability database... (This may take a while)")
	// ... 此处省略DB下载和更新的复杂逻辑 ...
	logger.Info("Trivy database initialized.")

	return &ImageScanner{localCache: fsCache}, nil
}

// Scan 执行镜像扫描
func (s *ImageScanner) Scan(ctx context.Context, imageName string) (report.Results, error) {
	// 扫描选项,可以配置私有仓库的认证信息
	// os.Setenv("DOCKER_USER", "user")
	// os.Setenv("DOCKER_PASSWORD", "password")
	imageOptions := types.ImageOptions{
		DockerOptions: types.DockerOptions{
			// 可以配置私有仓库的认证信息
			// insecure a.k.a. http
			Insecure: false, 
		},
	}
	img, cleanup, err := image.NewContainerImage(ctx, imageName, imageOptions)
	if err != nil {
		return nil, err
	}
	defer cleanup()

	// 构件扫描的目标
	artifactOption := artifact.Option{
		// 禁用分析器组,例如许可分析器,以提高速度
		DisabledAnalyzers: []analyzer.Type{analyzer.License},
	}

	artifact, err := image.NewArtifact(img, s.localCache, artifactOption)
	if err != nil {
		return nil, err
	}
	
	// 创建扫描器实例
	scannerClient := scanner.NewScanner(applier.NewApplier(s.localCache), artifact)
	
	// 执行扫描
	results, err := scannerClient.ScanArtifact(ctx, trivytypes.ScanOptions{
		// 只扫描漏洞
		VulnType: []string{trivytypes.VulnTypeOS, trivytypes.VulnTypeLibrary},
		// 忽略未修复的漏洞可以减少噪音,但在安全策略中这可能是个风险点
		// IgnoreUnfixed: true,
	})
	if err != nil {
		return nil, err
	}
	
	return results, nil
}

注意: Trivy的内部API可能会随着版本更新而改变,上述代码基于其某个版本。在生产项目中,需要锁定具体的库版本。

4. 策略配置与动态加载

我们将配置存储在一个ConfigMap中,并在Webhook启动时加载,同时通过fsnotify监控其变化,实现配置热加载。

# deploy/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: image-validator-config
  namespace: image-validator-system # 和webhook部署在同一个namespace
data:
  config.yaml: |
    # 阻塞的最低漏洞等级: UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL
    blockSeverity: "CRITICAL"
    # 豁免的命名空间
    exemptNamespaces:
      - kube-system
      - image-validator-system
    # 镜像白名单,支持通配符
    whitelistImages:
      - "k8s.gcr.io/*"
      - "my-company.com/legacy-app:*"
    # 漏洞白名单,CVE-ID: [image1, image2, ...]
    # 如果一个CVE在所有镜像中都忽略,使用"*"
    whitelistVulnerabilities:
      "CVE-2022-1234":
        - "nginx:1.21.0"
      "CVE-2023-5678":
        - "*"

Go代码中需要一个config包来解析和使用这份配置。

// pkg/config/config.go
package config

import (
	"gopkg.in/yaml.v2"
	"io/ioutil"
	"strings"
	
	"github.com/aquasecurity/trivy/pkg/report"
	"github.com/aquasecurity/trivy/pkg/types"
	"path/filepath"
)

type WebhookConfig struct {
	BlockSeverity            string                       `yaml:"blockSeverity"`
	ExemptNamespaces         []string                     `yaml:"exemptNamespaces"`
	WhitelistImages          []string                     `yaml:"whitelistImages"`
	WhitelistVulnerabilities map[string][]string          `yaml:"whitelistVulnerabilities"`
	
	// 内部字段
	blockSeverityLevel int
}

func LoadConfig(path string) (*WebhookConfig, error) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	var cfg WebhookConfig
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		return nil, err
	}
	
	cfg.blockSeverityLevel = severityToLevel(cfg.BlockSeverity)
	return &cfg, nil
}

// ValidateReport 根据配置检查扫描报告
func (c *WebhookConfig) ValidateReport(imageName string, results report.Results) string {
	var violations []string
	for _, result := range results {
		for _, vuln := range result.Vulnerabilities {
			// 检查漏洞白名单
			if c.isVulnerabilityWhitelisted(vuln.VulnerabilityID, imageName) {
				continue
			}

			// 检查漏洞等级
			if severityToLevel(vuln.Severity) >= c.blockSeverityLevel {
				violation := "Image '" + imageName + "' has a forbidden vulnerability: " +
					vuln.VulnerabilityID + " (" + vuln.Severity + ") in package " + vuln.PkgName
				violations = append(violations, violation)
			}
		}
	}
	if len(violations) > 0 {
		return strings.Join(violations, "; ")
	}
	return ""
}

func (c *WebhookConfig) isVulnerabilityWhitelisted(cveID, imageName string) bool {
	images, ok := c.WhitelistVulnerabilities[cveID]
	if !ok {
		return false
	}
	for _, whitelistedImagePattern := range images {
		if whitelistedImagePattern == "*" {
			return true
		}
		// 使用filepath.Match支持简单的通配符
		if matched, _ := filepath.Match(whitelistedImagePattern, imageName); matched {
			return true
		}
	}
	return false
}

// 其他辅助函数,如 IsNamespaceExempt, IsImageWhitelisted...

// severityToLevel 将字符串严重性转换为可比较的整数
func severityToLevel(severity string) int {
	switch severity {
	case "UNKNOWN":
		return 0
	case "LOW":
		return 1
	case "MEDIUM":
		return 2
	case "HIGH":
		return 3
	case "CRITICAL":
		return 4
	default:
		return 0
	}
}

5. 部署到Kubernetes

Kubebuilder已经为我们生成了大部分部署清单。我们需要做一些调整:

  1. 挂载ConfigMap:config/default/manager_patches.yaml中,为manager容器添加volumevolumeMounts,将image-validator-config挂载到容器的/etc/config目录。
  2. 挂载缓存卷: 为了持久化Trivy的漏洞数据库和缓存,需要创建一个PersistentVolumeClaim并挂载到manager容器中。这能极大加快冷启动和后续扫描速度。
  3. 配置ValidatingWebhookConfiguration: config/webhook/manifests.yaml中的ValidatingWebhookConfiguration是关键。
    • rules: 确保它只拦截PodCREATE操作。
    • failurePolicy: 在生产环境中应设为Fail。这意味着如果我们的Webhook服务不可用,所有Pod的创建都会失败。这需要确保Webhook本身是高可用的(至少部署2个副本)。
    • caBundle: Kubebuildercert-manager会自动处理这部分的TLS证书注入。

部署后的ValidatingWebhookConfiguration看起来像这样:

apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: image-validator-webhook-configuration
webhooks:
- admissionReviewVersions:
  - v1
  clientConfig:
    service:
      name: image-validator-webhook-service
      namespace: image-validator-system
      path: /validate-v1-pod
  failurePolicy: Fail # 生产环境的关键配置
  name: vpod.kb.io
  rules:
  - apiGroups:
    - ""
    apiVersions:
    - v1
    operations:
    - CREATE
    resources:
    - pods
  sideEffects: None

最终成果与测试

部署完成后,我们可以进行测试。

  1. 尝试部署一个已知的漏洞镜像:

    # 使用一个带有已知高危漏洞的log4j版本
    kubectl run vulnerable-app --image=ghcr.io/christophetd/log4shell-vulnerable-app

    API Server会返回一个错误,信息来自我们的Webhook:

    Error from server: admission webhook "vpod.kb.io" denied the request: ["Image 'ghcr.io/christophetd/log4shell-vulnerable-app:latest' has a forbidden vulnerability: CVE-2021-44228 (CRITICAL) in package log4j-core"]
  2. 尝试部署一个干净的镜像:

    kubectl run clean-app --image=nginx:1.23.3-alpine

    Pod成功创建:

    pod/clean-app created
  3. 测试白名单:
    ghcr.io/christophetd/log4shell-vulnerable-app添加到ConfigMap的whitelistImages中,并等待配置热加载(或重启Webhook Pod),再次尝试部署,这次应该会成功。

局限性与未来迭代路径

这个方案虽然解决了核心问题,但在真实生产环境中,它仍然存在一些局限性,并为未来的迭代指明了方向:

  1. 同步扫描的延迟: 对于一个从未被扫描过的大镜像,同步扫描可能会花费数十秒,这会显著增加kubectl apply的等待时间,甚至可能因为超时而失败。一个更优的架构是异步扫描模型:创建一个ImageScanRequest之类的CRD,一个独立的控制器负责监听这些CRD,在后台扫描镜像并将结果(漏洞报告、是否合规)写回CRD的status字段。我们的准入控制器则从拦截Pod请求,变为查询对应镜像的ImageScanRequest CRD状态。这个查询非常快(毫秒级),从而解决了延迟问题。

  2. 漏洞数据库的更新: 当前设计中,Trivy数据库的更新需要手动触发或伴随Pod重启。一个健壮的系统需要一个后台任务(例如CronJob)来定期下载最新的漏洞数据库到PVC中,Webhook Pod则监视数据库文件的变化并重新加载。

  3. 策略引擎的复杂性: 使用YAML来管理复杂的白名单策略会变得非常笨拙。例如,“当且仅当一个CRITICAL漏洞的CVSS分数大于9.5且没有可用的修复补丁时才放行”,这种逻辑很难用简单的键值对表达。未来的方向是集成一个真正的策略引擎,如Open Policy Agent (OPA)。我们的Webhook可以将扫描出的JSON报告作为输入,交由一个Rego策略来做出最终的准入决策。

  4. 对已运行容器的审计: 准入控制只管新创建的Pod。对于已经运行的存量Pod,需要一个并行的审计机制。可以定期扫描集群中所有正在运行的镜像,并将发现的漏洞报告给监控告警系统。这与准入控制共同构成了一个完整的运行时安全闭环。


  目录