构建由 Apache Spark 驱动并基于 Qwik 与 PostgreSQL 的大规模机器学习特征可观测性平台


我们的机器学习平台每天处理数TB的数据,生成数千个特征。这些特征是模型的命脉,但长期以来,数据科学家和工程师都在一个黑盒中操作。特征生成管道由 Apache Spark 驱动,将结果存入 PostgreSQL,但要验证一批新生成的特征是否有数据漂移、分布是否异常、空值率是否飙升,唯一的方法是运行一系列笨重、缓慢的 ad-hoc SQL 查询。现有的BI工具无法应对这种交互性与数据规模的需求,它们要么在加载时就崩溃,要么交互延迟高到无法忍受。我们需要一个内部工具,一个为性能而生的特征可观测性平台,它必须能够即时加载,并对海量特征统计数据进行流畅的交互式探索。

最初的构想是构建一个标准的 React SPA。但我们很快意识到,一个页面可能需要展示数百个特征的时间序列分布图、直方图和关键统计指标。一个典型的 React 应用在加载这样一个仪表盘时,需要下载庞大的 JavaScript 包,执行漫长的水合(Hydration)过程,最终导致数秒甚至更长的“可交互前等待时间”(Time to Interactive, TTI)。这在我们的场景中是不可接受的。

这就是 Qwik 进入我们视野的原因。Qwik 的可恢复性(Resumability)承诺了近乎为零的 JavaScript 启动成本。它不在客户端重新执行任何代码,而是从服务端暂停的地方恢复。对于我们这种数据密集、交互点繁多的仪表盘来说,这意味着用户几乎可以立即与页面交互,而相关功能的代码只有在用户实际操作时才会被渐进式地下载和执行。这是一个根本性的改变。

技术栈选型决策如下:

  • 数据处理层 (Batch Processing): Apache Spark。这是现有技术栈,负责大规模离线计算特征的统计元数据(均值、标准差、分位数、空值率、基数等)。
  • 存储层 (Storage): PostgreSQL。用于存储 Spark 计算出的时间序列特征统计数据。我们将利用其分区表功能来优化时间范围查询。
  • API层 (Data Service): Node.js + Apollo Server + GraphQL。GraphQL 提供了强大的查询能力,允许前端精确地请求其所需的数据,避免了 RESTful API 常见的过度获取或获取不足的问题。
  • 前端层 (Presentation): Qwik + Vite + Apollo Client。Qwik 负责极致的启动性能和交互体验,Vite 提供闪电般的开发服务器和优化的构建输出,Apollo Client 则作为连接 Qwik 与 GraphQL 服务的桥梁。

整个系统的数据流设计如下:

graph TD
    subgraph "批处理集群"
        A[Spark Job: Feature Statistics] -- JDBC Write --> B[PostgreSQL Staging Table]
    end

    subgraph "数据库服务"
        B -- Trigger/Scheduled Procedure --> C{PostgreSQL Partitioned Tables}
        style C fill:#f9f,stroke:#333,stroke-width:2px
    end

    subgraph "API 服务"
        D[Node.js Server] -- Queries --> C
        E[Apollo GraphQL Server] -- Resolvers --> D
    end

    subgraph "客户端"
        F[Browser] -- HTTP/GraphQL Requests --> E
        G[Qwik UI Components] -- useResource$ / Apollo Client --> F
        style G fill:#ccf,stroke:#333,stroke-width:2px
    end

第一步: Spark 特征统计与 PostgreSQL 存储

我们的 Spark 作业是整个流程的起点。它不是简单地转换数据,而是为每个特征、每个批次计算出一套丰富的统计指标。在真实项目中,这通常使用 Deequ 或类似的库来完成,但为了清晰起见,我们用 Spark SQL 来展示核心逻辑。

假设我们有一个包含特征的 Parquet 数据集:

// FeatureStatsJob.scala
import org.apache.spark.sql.{SparkSession, DataFrame, SaveMode}
import org.apache.spark.sql.functions._
import java.util.Properties

object FeatureStatsJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Feature Statistics Computation")
      .master("local[*]") // In production, this would be yarn or kubernetes
      .getOrCreate()

    // Assuming input data is partitioned by processing_date
    val processingDate = "2023-10-27"
    val inputDf = spark.read.parquet(s"/path/to/feature/data/processing_date=$processingDate")

    // A list of features we want to analyze
    val featureColumns = Seq("user_age", "purchase_amount", "login_frequency_7d")

    val statsExpressions = featureColumns.flatMap(c => Seq(
      // Basic stats
      lit(c).alias("feature_name"),
      count(col(c)).alias(s"${c}_count"),
      count(when(col(c).isNull, 1)).alias(s"${c}_null_count"),
      mean(col(c)).alias(s"${c}_mean"),
      stddev(col(c)).alias(s"${c}_stddev"),
      min(col(c)).alias(s"${c}_min"),
      max(col(c)).alias(s"${c}_max"),
      // Approximate quantiles for distribution analysis
      expr(s"percentile_approx(${c}, array(0.0, 0.25, 0.5, 0.75, 1.0))").alias(s"${c}_quantiles")
    ))

    // This is a simplified aggregation. In a real scenario, you'd unpivot the data.
    // For this example, we'll process and format it post-aggregation.
    val aggregatedDf = inputDf.agg(
        statsExpressions.head,
        statsExpressions.tail: _*
    )
    
    // Unpivot logic would transform the wide DF into a long format suitable for storage
    // For brevity, let's assume a function `unpivotStats` does this transformation.
    // val finalStatsDf = unpivotStats(aggregatedDf, processingDate)
    // The final schema should be: (processing_date, feature_name, stat_name, stat_value)
    
    // For demonstration, let's manually create a DF with the target schema
    import spark.implicits._
    val finalStatsDf = Seq(
      ("2023-10-27", "user_age", "mean", "35.2"),
      ("2023-10-27", "user_age", "stddev", "8.1"),
      ("2023-10-27", "user_age", "null_rate", "0.01"),
      ("2023-10-27", "purchase_amount", "mean", "120.5"),
      ("2023-10-27", "purchase_amount", "stddev", "45.3"),
      ("2023-10-27", "purchase_amount", "null_rate", "0.05")
    ).toDF("processing_date", "feature_name", "metric_name", "metric_value")


    // Database connection properties
    val pgUrl = "jdbc:postgresql://your-pg-host:5432/features"
    val pgProperties = new Properties()
    pgProperties.setProperty("user", "spark_user")
    pgProperties.setProperty("password", "your_password")
    pgProperties.setProperty("driver", "org.postgresql.Driver")

    // Writing to a dedicated table for feature statistics
    // In a real-world scenario, you write to a staging table first.
    finalStatsDf.write
      .mode(SaveMode.Append)
      .jdbc(pgUrl, "public.feature_time_series_stats", pgProperties)
      
    spark.stop()
  }
}

PostgreSQL 中的表结构至关重要。我们使用按日期分区的表来确保时间范围查询的高效性。

-- DDL for PostgreSQL
CREATE TABLE feature_time_series_stats (
    processing_date DATE NOT NULL,
    feature_name VARCHAR(255) NOT NULL,
    metric_name VARCHAR(100) NOT NULL,
    metric_value DOUBLE PRECISION,
    -- Add a unique constraint for idempotency
    PRIMARY KEY (processing_date, feature_name, metric_name)
) PARTITION BY RANGE (processing_date);

-- Example partition for a specific month. This should be managed by an automation script.
CREATE TABLE feature_time_series_stats_2023_10 PARTITION OF feature_time_series_stats
    FOR VALUES FROM ('2023-10-01') TO ('2023-11-01');

-- Indexes are critical for query performance
CREATE INDEX idx_feature_name_date ON feature_time_series_stats (feature_name, processing_date DESC);

这里的坑在于分区管理。必须有自动化的脚本或数据库扩展(如 pg_partman)来创建未来的分区并归档旧的分区,否则写入或查询会失败。

第二步: 搭建 GraphQL API 服务

我们使用 Node.js、Fastify 和 mercurius (一个用于 Fastify 的高性能 GraphQL 适配器) 来构建 API。

# Project setup
npm init -y
npm install fastify mercurius graphql pg
// server.js
const fastify = require('fastify')({ logger: true });
const mercurius = require('mercurius');
const { Pool } = require('pg');

// Production-ready PG Pool configuration
const pool = new Pool({
  user: 'api_user',
  host: 'your-pg-host',
  database: 'features',
  password: 'api_password',
  max: 20, // Max clients in pool
  idleTimeoutMillis: 30000, // Close idle clients after 30s
  connectionTimeoutMillis: 5000, // Fail if cannot connect in 5s
});

const schema = `
  type Query {
    getFeatureNames: [String!]!
    getFeatureStats(featureName: String!, startDate: String!, endDate: String!): [FeatureStatPoint!]
  }

  type FeatureStatPoint {
    date: String!
    metrics: [Metric!]!
  }

  type Metric {
    name: String!
    value: Float
  }
`;

const resolvers = {
  Query: {
    getFeatureNames: async (_, __, { db }) => {
      try {
        const res = await db.query('SELECT DISTINCT feature_name FROM feature_time_series_stats ORDER BY feature_name');
        return res.rows.map(row => row.feature_name);
      } catch (err) {
        fastify.log.error({ err }, 'Failed to fetch feature names');
        throw new mercurius.ErrorWithProps('DATABASE_QUERY_FAILED', {
          details: 'Could not retrieve distinct feature names.',
          originalError: err.message,
        });
      }
    },
    getFeatureStats: async (_, { featureName, startDate, endDate }, { db }) => {
      // Input validation is crucial in a real application
      if (!featureName || !startDate || !endDate) {
        throw new mercurius.ErrorWithProps('VALIDATION_ERROR', {
          details: 'featureName, startDate, and endDate are required.',
        });
      }

      const query = {
        text: `
          SELECT 
            processing_date,
            json_agg(json_build_object('name', metric_name, 'value', metric_value)) as metrics
          FROM feature_time_series_stats
          WHERE feature_name = $1 AND processing_date >= $2 AND processing_date <= $3
          GROUP BY processing_date
          ORDER BY processing_date ASC;
        `,
        values: [featureName, startDate, endDate],
      };

      try {
        const res = await db.query(query);
        // Transform data to match GraphQL schema
        return res.rows.map(row => ({
          date: new Date(row.processing_date).toISOString().split('T')[0],
          metrics: row.metrics,
        }));
      } catch (err) {
        fastify.log.error({ err, query: query.text, params: query.values }, `Failed to fetch stats for ${featureName}`);
        throw new mercurius.ErrorWithProps('DATABASE_QUERY_FAILED', {
          details: `Could not retrieve stats for feature: ${featureName}`,
          originalError: err.message,
        });
      }
    }
  },
};

fastify.register(mercurius, {
  schema,
  resolvers,
  context: async (request, reply) => {
    // This creates a per-request database client, essential for proper transaction management
    // though for read-only queries, the pool is sufficient.
    return { db: pool };
  },
  graphiql: true, // Enable GraphiQL UI for development
});

const start = async () => {
  try {
    await fastify.listen({ port: 4000 });
  } catch (err) {
    fastify.log.error(err);
    process.exit(1);
  }
};

start();

这个 API 服务的设计考虑了生产环境的需求:使用连接池、详细的错误日志记录以及通过 ErrorWithProps 向客户端返回结构化的错误。

第三步: Qwik 前端实现

现在是核心部分。我们将创建一个 Qwik 应用来消费这个 GraphQL API。

# Create a new Qwik app with Vite
npm create qwik@latest
# Select "Empty App"

cd my-qwik-app
npm install @apollo/client graphql

我们需要配置 Apollo Client。一个常见的错误是在 Qwik 组件中直接实例化 ApolloClient,这会破坏 Qwik 的可恢复性。正确的做法是将其封装在一个可以被序列化的上下文中,或在根组件中一次性创建。

// src/apollo/client.ts
import { ApolloClient, InMemoryCache, createHttpLink } from '@apollo/client/core';

export const getClient = () => {
  const httpLink = createHttpLink({
    // IMPORTANT: Use the server's internal address during SSR
    // and the public address on the client.
    uri: import.meta.env.SSR
      ? 'http://localhost:4000/graphql' // Server-to-server
      : '/graphql', // Client-side proxy
  });

  return new ApolloClient({
    link: httpLink,
    cache: new InMemoryCache(),
    // This is crucial for Qwik: disable SSR fetching in the client itself
    // as Qwik's `useResource$` will handle the data transfer.
    ssrMode: import.meta.env.SSR,
  });
};

在 Vite 配置中,我们需要设置一个代理来解决开发环境下的跨域问题。

// vite.config.ts
import { defineConfig } from 'vite';
import { qwikVite } from '@builder.io/qwik/optimizer';

export default defineConfig(() => {
  return {
    plugins: [qwikVite()],
    server: {
      proxy: {
        '/graphql': {
          target: 'http://localhost:4000',
          changeOrigin: true,
          rewrite: (path) => path.replace(/^\/graphql/, '/graphql'),
        },
      },
    },
  };
});

现在,我们来构建核心的特征仪表盘组件。

// src/routes/index.tsx
import { component$, useStore, useResource$, Resource } from '@builder.io/qwik';
import { gql } from '@apollo/client/core';
import { getClient } from '~/apollo/client';

// Define the GraphQL query
const GET_FEATURE_STATS_QUERY = gql`
  query GetFeatureStats($featureName: String!, $startDate: String!, $endDate: String!) {
    getFeatureStats(featureName: $featureName, startDate: $startDate, endDate: $endDate) {
      date
      metrics {
        name
        value
      }
    }
  }
`;

// A simple chart component to visualize data
export const TimeSeriesChart = component$<{ data: any[], metric: string }>(({ data, metric }) => {
  // In a real app, this would use D3, Chart.js, or a dedicated charting library.
  // This is a simplified SVG representation.
  const points = data.map((d, i) => {
    const metricValue = d.metrics.find((m: any) => m.name === metric)?.value ?? 0;
    return `${i * 30},${100 - metricValue}`;
  }).join(' ');

  return (
    <div style={{ border: '1px solid #ccc', padding: '10px', margin: '10px 0' }}>
      <h4>{metric} over time</h4>
      <svg width="100%" height="100" viewBox="0 0 300 100" preserveAspectRatio="none">
        <polyline fill="none" stroke="steelblue" stroke-width="2" points={points} />
      </svg>
    </div>
  );
});

export default component$(() => {
  const state = useStore({
    featureNames: [] as string[],
    selectedFeature: 'user_age', // Default feature
    startDate: '2023-10-01',
    endDate: '2023-10-27',
  });
  
  // This resource fetches the list of available features once.
  const featureNamesResource = useResource$<string[]>(async ({ cleanup }) => {
    const controller = new AbortController();
    cleanup(() => controller.abort());
    
    const client = getClient();
    const { data } = await client.query({
      query: gql`query GetFeatureNames { getFeatureNames }`,
      signal: controller.signal,
    });
    
    state.featureNames = data.getFeatureNames; // Populate the store
    return data.getFeatureNames;
  });

  // This resource re-fetches whenever the selected feature or date range changes.
  // This is the core of Qwik's reactivity model for data fetching.
  const featureStatsResource = useResource$<any[]>(async ({ track, cleanup }) => {
    // track() tells Qwik to re-run this function when these state properties change.
    track(() => state.selectedFeature);
    track(() => state.startDate);
    track(() => state.endDate);

    const controller = new AbortController();
    cleanup(() => controller.abort());

    // Prevent fetching if no feature is selected
    if (!state.selectedFeature) {
      return [];
    }

    const client = getClient();
    const { data, errors } = await client.query({
      query: GET_FEATURE_STATS_QUERY,
      variables: {
        featureName: state.selectedFeature,
        startDate: state.startDate,
        endDate: state.endDate,
      },
      signal: controller.signal,
    });

    if (errors) {
      console.error('GraphQL Errors:', errors);
      // This allows the Resource component to render the 'onRejected' slot.
      throw new Error(errors.map(e => e.message).join('\n'));
    }

    return data.getFeatureStats;
  });

  return (
    <div style={{ padding: '20px' }}>
      <h1>Feature Observability Platform</h1>
      
      <div style={{ marginBottom: '20px', display: 'flex', gap: '15px' }}>
        <Resource
          value={featureNamesResource}
          onPending={() => <p>Loading feature list...</p>}
          onRejected={(error) => <p>Error loading features: {error.message}</p>}
          onResolved={(names) => (
            <select
              value={state.selectedFeature}
              onChange$={(e) => {
                state.selectedFeature = (e.target as HTMLSelectElement).value;
              }}
            >
              {names.map(name => <option key={name} value={name}>{name}</option>)}
            </select>
          )}
        />
        
        <input type="date" value={state.startDate} onChange$={(e) => state.startDate = (e.target as HTMLInputElement).value} />
        <input type="date" value={state.endDate} onChange$={(e) => state.endDate = (e.target as HTMLInputElement).value} />
      </div>

      <hr />

      <Resource
        value={featureStatsResource}
        onPending={() => <div class="loader">Analyzing feature data...</div>}
        onRejected={(error) => <div class="error">Failed to load feature stats: {error.message}</div>}
        onResolved={(stats) => (
          stats.length > 0 ? (
            <div>
              <h2>Statistics for: {state.selectedFeature}</h2>
              {/* Here we visualize different metrics */}
              <TimeSeriesChart data={stats} metric="mean" />
              <TimeSeriesChart data={stats} metric="stddev" />
              <TimeSeriesChart data={stats} metric="null_rate" />
            </div>
          ) : (
            <p>No data available for the selected feature and date range.</p>
          )
        )}
      />
    </div>
  );
});

这段代码的美妙之处在于其声明式的本质。useResource$ 完美地封装了异步数据获取的生命周期。通过 track 函数,我们精确地告诉 Qwik,featureStatsResource 依赖于 selectedFeature 和日期范围。当用户通过 onChange$ 事件处理器改变这些 state 的值时,Qwik 的运行时会检测到变化,并自动重新执行 useResource$ 的回调函数,触发一次新的 GraphQL 查询。整个过程无需手写 useEffect 依赖数组,也避免了传统框架中可能出现的重复请求或状态不一致的陷阱。

最重要的是,这一切都遵循 Qwik 的可恢复性原则。初始的 HTML 由服务器生成,包含了 featureNamesResourcefeatureStatsResource 的初始数据。浏览器接收到 HTML 后可以立即渲染,无需执行任何 JavaScript。当用户点击下拉菜单时,只有那个 onChange$ 闭包的代码,以及更新视图所需的最少量代码,才会被下载和执行。这就是我们追求的极致性能。

方案的局限性与未来展望

这个架构虽然高性能且可扩展,但并非没有局限。首先,它的数据新鲜度受限于 Apache Spark 批处理作业的运行频率。对于需要秒级延迟的场景,这套方案并不适用。未来的迭代方向是引入流处理组件,例如 Apache Flink 或 Spark Streaming,通过 CDC (Change Data Capture) 从源系统捕获数据,实时计算统计指标并推送到前端,这可能需要通过 WebSocket 协议来完成。

其次,前端的可视化目前非常基础。对于更复杂、更高密度的图表(如热力图、散点图矩阵),直接操作 SVG 可能会遇到性能瓶颈。届时需要引入基于 Canvas 或 WebGL 的高性能图表库,并仔细管理其与 Qwik 的集成,确保不会破坏 Qwik 的性能模型。

最后,随着特征数量增长到数万级别,一次性加载所有特征名称到下拉菜单中将不再现实。需要实现服务器端的分页与搜索功能,进一步优化用户体验。这也会对 GraphQL schema 和 Qwik 组件的交互逻辑提出新的要求。


  目录