构建连接Jupyter探索与Flink生产的自动化实时特征工程平台


在任何依赖机器学习的业务中,数据科学家与工程团队之间的鸿沟都是一个普遍存在的痛点。数据科学家在 Jupyter Notebook 中使用 Python 灵活地探索数据、验证假设、构建特征;而工程团队则需要将这些经过验证的特征逻辑,用 Java 或 Scala 重新实现,部署到稳定、可扩展的 Apache Flink 集群上。这个手动翻译的过程不仅耗时,而且极易引入错误,严重拖慢了从模型构思到上线的迭代速度。

我们面临的正是这样一个挑战:如何构建一个平台,既能保留数据科学家在 Jupyter 环境中的探索自由度,又能实现特征逻辑到生产级 Flink 作业的一键式、自动化部署,同时保证整个基础设施的一致性和可维护性。

定义问题:两种方案的架构权衡

问题的核心是弥合探索环境(动态、灵活的 Python)与生产环境(静态、高性能的 JVM)之间的裂痕。我们评估了两种截然不同的架构方案。

方案 A:基于代码转换的“胶水”方案

初步构想是建立一个“智能”转换层。数据科学家仍然编写 Python UDF,平台后端尝试解析 Python 代码(或其 AST),将其转换为等效的 Java/Scala 代码,再动态编译并注入到一个模板化的 Flink 作业中。

优势:

  • 对数据科学家来说,学习成本极低,他们可以继续使用熟悉的 Pandas-like API。

劣势:

  1. 脆弱性: Python 的动态特性使得静态代码分析和转换极其困难且不可靠。一个细微的库版本差异或一个冷门的 Python 语法特性都可能导致转换失败。
  2. 性能黑盒: 自动转换的代码很难保证其在 Flink 上的执行效率。内存管理、序列化、状态访问等都可能成为性能瓶颈,且难以调试。
  3. 依赖地狱: 管理 Python UDF 的依赖(如 aiohttp, numpy, scikit-learn 等)在 Flink 的 Java 环境中是一个噩梦。虽然 Flink 提供了 Python API (PyFlink),但在我们的场景中,需要与大量现存的 Java 连接器和库进行深度集成,混合部署的运维成本很高。
  4. 维护成本: 代码转换逻辑本身会成为一个极其复杂的项目,充满了各种边界情况和补丁,最终变得无法维护。在真实项目中,这种方案往往会演变成一个技术债的无底洞。

方案 B:基于声明式定义的平台化方案

这个方案的核心思想是“约定优于配置”,我们不转换代码,而是定义一种标准的、声明式的“特征语言”(Feature Definition Language, FDL)。数据科学家不再提交任意的 Python 代码,而是通过一个我们提供的 Python 库在 Jupyter 中生成这种 FDL,FDL 本质上是一个描述了数据源、转换逻辑和输出格式的结构化文档(如 JSON 或 YAML)。

核心组件:

  1. 特征定义 (FDL): 一种与语言无关的规范,用于描述特征的计算逻辑。例如,一个 FDL 可以定义“过去5分钟内用户点击商品A的次数”。
  2. Jupyter SDK: 一个 Python 库,提供流畅的 API 让数据科学家在 Notebook 中构建、测试和验证 FDL。SDK 可以在本地模式下使用 Pandas DataFrame 模拟 Flink 的流式计算,提供即时反馈。
  3. 通用 Flink 执行器: 一个单一的、高度可配置的 Flink 作业。它的唯一职责是启动时加载一个或多个 FDL,并根据其内容动态构建 Flink DataStream 的计算图。
  4. Feature Hub (管理中心): 一个 Web 应用,作为 FDL 的注册、版本管理、部署和监控中心。数据科学家通过 Jupyter SDK 将最终确定的 FDL 发布到 Feature Hub。
  5. 基础设施自动化 (Chef): 使用 Chef 来管理整个平台的所有组件,包括 Flink 集群、JupyterHub、Feature Hub 应用服务器及其数据库。

优势:

  1. 解耦与稳定: 特征逻辑(FDL)与执行引擎(Flink)完全解耦。我们可以独立升级 Flink 执行器,优化性能,而无需改动任何已上线的特征。
  2. 确定性与可预测性: FDL 描述的是“什么”,而不是“如何”。由于所有特征都在同一个通用 Flink 执行器中运行,它们的性能模式、资源消耗和故障行为都是可预测和可管理的。
  3. 跨语言能力: FDL 是语言无关的。虽然我们首先提供 Python SDK,但未来可以轻易地为 R 或其他语言提供同样的 SDK。
  4. 易于治理与审计: 所有生产中的特征逻辑都有结构化的、版本化的记录,易于审计、回溯和理解。

劣势:

  • 初期投入大: 需要设计 FDL、开发 SDK、通用 Flink 执行器和 Feature Hub,前期开发成本较高。
  • 表达能力限制: FDL 不可能覆盖所有天马行空的计算逻辑。对于极其复杂的特征,可能需要提供“逃生舱”机制,例如允许注册自定义的 Java UDF Jar包。

经过权衡,我们选择了方案 B。虽然初期投入更大,但它提供了一个可扩展、可维护且长期来看总拥有成本更低的解决方案。一个常见的错误是低估了“胶水”方案在规模化后带来的隐形成本。方案 B 是一个真正的平台工程思路,旨在赋能用户,而不是被动地修补流程。

架构实现概览

我们最终的架构由五个核心部分协同工作,通过 Chef 实现统一的部署与管理。

graph TD
    subgraph "探索环境 (Data Scientist)"
        A[Jupyter Notebook] -- 使用 SDK --> B(Feature Definition SDK - Python)
        B -- 1. 本地模拟执行 --> C[Pandas DataFrame]
        B -- 2. 发布FDL --> D[Feature Hub API]
    end

    subgraph "管理与控制平面"
        D -- 写入/读取 --> E[PostgreSQL - 存储FDL]
        F[Feature Hub UI - Dart/Flutter Web] -- 与API交互 --> D
        style F fill:#cde4ff
    end

    subgraph "生产执行环境"
        G[Generic Flink Executor] -- 启动时拉取FDL --> D
        G -- 根据FDL构建 --> H(Flink DataStream DAG)
        H -- 读取 --> I[Kafka - 输入源]
        H -- 写入 --> J[Kafka/Redis - 输出结果]
    end

    subgraph "基础设施即代码"
        K[Chef Server] -- 管理配置 --> L[Flink Cluster Nodes]
        K -- 管理配置 --> M[JupyterHub Node]
        K -- 管理配置 --> N[Feature Hub App Node]
    end

    F -- 用户操作 --> D
    A --> F

    linkStyle 0 stroke-width:2px,fill:none,stroke:green;
    linkStyle 1 stroke-width:1px,fill:none,stroke:grey,stroke-dasharray: 3 3;
    linkStyle 2 stroke-width:2px,fill:none,stroke:blue;
    linkStyle 3 stroke-width:2px,fill:none,stroke:blue;
    linkStyle 4 stroke-width:2px,fill:none,stroke:red;
    linkStyle 5 stroke-width:1px,fill:none,stroke:purple;
    linkStyle 6 stroke-width:1px,fill:none,stroke:purple;
    linkStyle 7 stroke-width:1px,fill:none,stroke:purple;
    linkStyle 8 stroke-width:1px,fill:none,stroke:grey;
    linkStyle 9 stroke-width:1px,fill:none,stroke:grey;

核心实现:代码与细节解析

基础设施的稳定性是平台成功的基石。我们使用 Chef 来确保 Flink 集群的每个节点配置完全一致且可追溯。这里的坑在于,手动配置 Flink 集群很容易出现节点间 flink-conf.yaml 不一致的问题,导致难以诊断的运行时错误。

下面是一个简化的 Chef Recipe 示例,用于配置一个 Flink TaskManager 节点。它负责处理用户、下载、解压、配置生成和系统服务管理。

# cookbooks/flink_platform/recipes/taskmanager.rb

# 定义 Flink 版本和下载 URL,集中管理
flink_version = node['flink_platform']['version']
flink_base_url = node['flink_platform']['base_url']
flink_archive = "flink-#{flink_version}-bin-scala_2.12.tgz"
flink_url = "#{flink_base_url}/flink-#{flink_version}/#{flink_archive}"
install_dir = node['flink_platform']['install_dir']
flink_home = "#{install_dir}/flink-#{flink_version}"

# 1. 创建专用的 flink 用户和组,遵循最小权限原则
group 'flink' do
  action :create
end

user 'flink' do
  gid 'flink'
  shell '/bin/bash'
  home '/home/flink'
  manage_home true
  action :create
end

# 2. 使用 ark cookbook 下载和解压 Flink,这是 Chef 的标准实践
ark 'flink' do
  url flink_url
  version flink_version
  checksum node['flink_platform']['checksum'] # 校验文件完整性,防止篡改
  path install_dir
  owner 'flink'
  group 'flink'
  action :put
end

# 3. 动态生成 flink-conf.yaml 配置文件
# 这是 Chef 最核心的价值,根据属性动态生成配置,确保一致性
# JobManager 的地址通过 Chef Search 或属性动态发现
jobmanager_ip = search(:node, "roles:flink_jobmanager").first['ipaddress']

template "#{flink_home}/conf/flink-conf.yaml" do
  source 'flink-conf.yaml.erb'
  owner 'flink'
  group 'flink'
  mode '0644'
  variables(
    jobmanager_rpc_address: jobmanager_ip,
    taskmanager_heap_size: node['flink_platform']['taskmanager']['heap_size'],
    taskmanager_slots: node['flink_platform']['taskmanager']['number_of_slots'],
    # ... 其他数十个配置项
  )
  # 当配置改变时,通知服务重启
  notifies :restart, 'systemd_unit[flink-taskmanager.service]', :delayed
end

# 4. 创建并启用 systemd 服务,确保 Flink 进程由系统管理
systemd_unit 'flink-taskmanager.service' do
  content({
    Unit: {
      Description: 'Apache Flink TaskManager',
      After: 'network.target',
    },
    Service: {
      Type: 'simple',
      User: 'flink',
      Group: 'flink',
      ExecStart: "#{flink_home}/bin/taskmanager.sh start-foreground",
      Restart: 'on-failure',
      WorkingDirectory: flink_home,
    },
    Install: {
      WantedBy: 'multi-user.target',
    },
  })
  action [:create, :enable, :start]
end

这段 Recipe 保证了部署的幂等性。无论运行多少次,TaskManager 节点的状态都会收敛到我们定义的状态。这种确定性在规模化运维中至关重要。

这是整个平台的技术核心。它是一个标准的 Flink 流处理作业,但其内部的计算图并非硬编码,而是根据从 Feature Hub 拉取的 FDL 动态构建的。

假设我们的 FDL JSON 格式如下:

{
  "featureName": "user_5min_click_count",
  "version": 1,
  "source": {
    "type": "kafka",
    "topic": "user_events",
    "schema": {"userId": "STRING", "eventId": "STRING", "timestamp": "BIGINT"}
  },
  "transformations": [
    {
      "type": "filter",
      "expression": "eventId == 'click'"
    },
    {
      "type": "keyBy",
      "key": "userId"
    },
    {
      "type": "tumbleWindow",
      "size": "5m"
    },
    {
      "type": "aggregate",
      "function": "COUNT"
    }
  ],
  "sink": {
    "type": "redis",
    "host": "redis.prod",
    "keyPrefix": "feature:"
  }
}

我们的 Flink 作业需要一个解析器和构建器。

// Simplified Flink Job Main Class

public class GenericFeatureExecutor {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 生产级配置: Checkpointing, StateBackend, Restart Strategy
        env.enableCheckpointing(60000); // 每60秒一次 checkpoint
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // 从参数或配置中心获取 Feature Name
        final String featureName = "user_5min_click_count"; // In real world, from args

        // 1. 从 Feature Hub 拉取 FDL
        FeatureDefinition fdl = fetchFeatureDefinition(featureName);

        // 2. 根据 FDL 构建数据源
        DataStream<Row> sourceStream = buildSource(env, fdl.getSource());

        // 3. 链式应用转换逻辑
        DataStream<?> transformedStream = applyTransformations(sourceStream, fdl.getTransformations());

        // 4. 构建数据汇
        buildSink(transformedStream, fdl.getSink());

        // 命名作业以便于监控
        env.execute("FeatureEngine - " + featureName);
    }

    private static DataStream<Row> buildSource(StreamExecutionEnvironment env, SourceDef sourceDef) {
        // Factory pattern to build different sources
        if ("kafka".equals(sourceDef.getType())) {
            // ... Kafka Connector setup logic ...
            // 返回一个 DataStream<Row>,Row 的类型信息来自 sourceDef.getSchema()
        }
        throw new IllegalArgumentException("Unsupported source type: " + sourceDef.getType());
    }

    private static DataStream<?> applyTransformations(DataStream<Row> stream, List<TransformationDef> defs) {
        DataStream<?> currentStream = stream;
        for (TransformationDef def : defs) {
            switch (def.getType()) {
                case "filter":
                    // 注意:这里的表达式需要被解析和执行。
                    // 可以使用 Janino 或其他表达式引擎来实现动态过滤。
                    // 这是一个难点,但比转换整个Python UDF要可控得多。
                    currentStream = ((DataStream<Row>)currentStream).filter(new DynamicFilterFunction(def.getExpression()));
                    break;
                case "keyBy":
                    // KeySelector 同样需要是动态的
                    currentStream = ((DataStream<Row>)currentStream).keyBy(new DynamicKeySelector(def.getKey()));
                    break;
                case "tumbleWindow":
                    // 这里需要将窗口操作应用在 KeyedStream 上
                    // 需要维护流的类型信息,例如从 DataStream 切换到 KeyedStream
                    // ... Windowing logic ...
                    break;
                // ... other transformation types
            }
        }
        return currentStream;
    }

    // ... buildSink and other helper methods
}

这个执行器的挑战在于如何安全、高效地执行动态逻辑(如 filter 表达式),以及如何管理不同转换步骤之间流类型的变化(DataStream -> KeyedStream -> WindowedStream)。但好处是,所有这些复杂性都被封装在单一的、由工程团队维护的作业中。

3. Dart 与样式方案:构建清晰的 Feature Hub UI

Feature Hub 是数据科学家与平台交互的主要界面。选择 Dart (配合 Flutter Web) 是因为我们需要快速构建一个高性能、体验一致的前端应用。后端 API 可以使用 Dart Frog 或 Shelf。一个统一的、精心设计的样式方案在此至关重要,它能降低用户的认知负荷。

样式方案核心原则:

  • 状态可视化: 特征的不同状态(草稿、已部署、失败、已归档)必须有明确的颜色和图标标识。
  • 信息密度: 在列表页清晰展示特征名称、版本、创建者、源 Topic、Sink 类型和健康状态。
  • 操作引导: “部署”、“回滚”、“监控”等核心操作按钮必须在界面上最显眼的位置。

下面是一个 Dart 后端 (使用 Shelf 框架) 处理“部署”请求的伪代码。

// main.dart in a Dart backend application

import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_router/shelf_router.dart';
import 'dart:convert';
import 'db_service.dart'; // 数据库服务
import 'flink_api_client.dart'; // Flink API 客户端

void main() async {
  final app = Router();

  // API: 部署一个特定版本的特征
  // PUT /api/features/{name}/versions/{version}/deploy
  app.put('/api/features/<name>/versions/<version>/deploy', 
    (Request request, String name, String versionStr) async {
      final version = int.tryParse(versionStr);
      if (version == null) {
        return Response.badRequest(body: 'Invalid version format');
      }

      try {
        // 1. 数据库操作:将 FDL 状态更新为 "DEPLOYING"
        await dbService.updateFeatureStatus(name, version, 'DEPLOYING');
        
        // 2. 与 Flink API 交互,提交或更新作业
        // 这里的关键是:我们是停止旧作业再启动新作业,还是使用 Savepoint 进行升级?
        // Savepoint 是更优选,能保证状态不丢失。
        final fdl = await dbService.getFdl(name, version);
        final savepointPath = await flinkApiClient.stopJobWithSavepoint(name);

        // 提交新作业,并从 savepoint 恢复
        final jobId = await flinkApiClient.submitJob(
          name, 
          fdl, 
          savepointPath
        );

        // 3. 数据库操作:更新状态为 "ACTIVE",并记录 Flink Job ID
        await dbService.updateFeatureStatus(name, version, 'ACTIVE', jobId: jobId);

        return Response.ok(json.encode({'status': 'ok', 'jobId': jobId}));

      } catch (e, s) {
        // 完整的错误处理和日志记录
        print('Deployment failed: $e\n$s');
        await dbService.updateFeatureStatus(name, version, 'FAILED', error: e.toString());
        return Response.internalServerError(body: 'Deployment failed');
      }
  });

  await io.serve(app, '0.0.0.0', 8080);
}

这个 Dart 后端负责编排整个部署流程,体现了后端即服务(BaaS)的思想。它将复杂的 Flink API 操作封装成一个简单的 RESTful 接口,供前端调用。

架构的局限性与未来迭代

尽管该方案解决了核心痛点,但它并非银弹。

  1. FDL 的表达能力: 目前的 FDL 主要支持无状态和简单的窗口聚合操作。对于需要机器学习模型调用、或复杂会话窗口(Session Window)的特征,FDL 需要进一步扩展。一个可能的方向是允许用户在 FDL 中引用一个已注册的、经过审核的 Java UDF Jar。
  2. 测试的完备性: Jupyter SDK 中的本地模拟执行无法完全复现 Flink 分布式环境下的行为,尤其是在处理事件时间和状态一致性方面。未来的迭代需要构建一个“准生产”的测试环境,允许数据科学家将他们的 FDL 部署到这个环境中,用真实的流量进行验证。
  3. 对 Chef 的依赖: 虽然 Chef 在虚拟机环境中表现出色,但在一个完全拥抱 Kubernetes 的云原生环境中,使用 Kubernetes Operator 来管理 Flink 作业生命周期会是更自然的选择。将部署逻辑从 Chef Recipe 迁移到一个专用的 Flink Feature Operator,将是平台演进的下一个重要步骤。这个 Operator 可以直接监听 Feature Hub 中 FDL 的变化,并自动调谐(reconcile)线上的 Flink Job,实现真正的 GitOps 式特征管理。

  目录