使用 Crossplane Composition 声明式构建包含 ClickHouse 与 Python 服务的实时分析平台


在不断扩张的组织中,不同业务团队对实时数据分析能力的需求呈指数级增长。传统的应对方式——通过工单系统向平台工程团队(Platform Engineering)申请资源——很快会成为瓶颈。这个流程不仅缓慢、易错,而且将平台团队变成了单纯的资源交付者,而非价值创造者。我们面临的挑战是:如何构建一个平台,让应用开发者能够以自助、声明式的方式,快速获取一套隔离、完整、生产可用的实时分析堆栈?

这个堆栈不仅仅是一个数据库实例,它必须包含数据接收服务、数据存储以及相应的网络策略和凭证管理。我们的目标是,开发者只需提交一个简单的 YAML 文件,就能得到所有这一切。

方案权衡:命令式的脚本 vs. 声明式的控制平面

在设计这个自助化平台时,我们评估了两种主流的实现路径。

方案A:基于 Terraform/Ansible 的 GitOps 流程

这是一种非常成熟的模式。平台团队负责编写和维护一系列 Terraform 模块或 Ansible Playbook,用于创建 ClickHouse 集群、部署 Python 应用的 Kubernetes 资源等。应用团队通过修改一个 tfvars 文件或变量清单,并通过 CI/CD 流水线触发执行来申请资源。

优势:

  • 生态成熟,工具链完备。
  • 对于熟悉传统 IaC 的团队来说,上手成本较低。

劣势:

  • 流程驱动而非 API 驱动: 核心动作是“执行一个流水线”,而不是“创建一个资源”。这意味着状态的同步是周期性的、被动的。如果有人在流水线之外手动修改了资源,就会产生配置漂移,直到下一次流水线运行才可能被纠正。
  • 状态管理的复杂性: 随着成百上千个分析堆栈被创建,集中的 Terraform state 文件会成为一个巨大的管理负担和潜在的单点故障。
  • 抽象泄漏: 应用开发者仍然需要理解 IaC 模块的输入变量,而不是一个干净、内聚的业务层 API。这种抽象是不彻底的。

方案B:基于 Crossplane 的 Kubernetes 原生控制平面

Crossplane 将基础设施和外部服务变成了 Kubernetes 的一等公民。我们可以定义自己的 CRD (Custom Resource Definition),比如一个叫做 AnalyticsStack 的资源。平台团队则通过 Composition 来定义一个 AnalyticsStack 究竟由哪些底层资源(例如一个 ClickHouse 集群、一个 Kubernetes Deployment、一个 Service 等)组成。

优势:

  • 真正的声明式 API: 开发者通过 kubectl apply 创建一个 AnalyticsStack 对象。Kubernetes 的控制循环和 Crossplane 的 Controller 会持续不断地将真实世界调整为 YAML 中声明的期望状态,从根本上解决了配置漂移问题。
  • 高度抽象和封装: 平台团队可以定义一个非常简洁的 AnalyticsStack API,将所有实现细节(用哪个 ClickHouse Operator、如何配置副本、如何管理密钥)封装在 Composition 中。开发者只需要关心业务参数,如 tenantIdstorageTier: "high-performance"
  • 与 Kubernetes 生态无缝集成: 权限控制(RBAC)、事件、监控等都可以复用现有的 Kubernetes 工具链。

最终选择与理由

我们最终选择了 Crossplane。虽然它的学习曲线比 Terraform 更陡峭,但它提供了一个真正的“平台即服务”模型。它将基础设施的管理从“运行脚本”的思维模式转变为“操作 API”的云原生模式。这种转变带来的长期收益——包括更高的自动化程度、更强的系统自愈能力和更纯粹的开发者体验——是传统 IaC 工具无法比拟的。在真实项目中,运维效率的提升和故障率的降低,将远远超过初期的学习成本。

核心实现概览

我们将使用 Crossplane 构建一个名为 AnalyticsStack 的复合资源。当用户创建一个 AnalyticsStack 实例时,Crossplane 会自动完成以下操作:

  1. 利用 Helm Provider 部署一个 ClickHouse Operator 并创建一个 ClickHouseInstallation 实例。
  2. 利用 Kubernetes Provider 部署一个 Python FastAPI 应用作为数据采集端点。
  3. 动态地将创建好的 ClickHouse 连接信息注入到 Python 应用的环境变量中。
  4. 将数据采集端点的服务地址写回到 AnalyticsStack 的状态中,供应用开发者使用。

下面是这个架构的简化视图:

graph TD
    subgraph Kubernetes API Server
        A[User: kubectl apply -f my-stack.yaml]
    end

    subgraph Crossplane Control Plane
        A -- Creates --> B(AnalyticsStack Claim)
        B -- Selects --> C(Composition)
        C -- Composes --> D{Composite Resource}
    end

    subgraph Managed Resources in Cluster
        D -- Reconciles --> E[HelmRelease: ClickHouse Operator]
        E -- Deploys --> F[ClickHouseInstallation]
        D -- Reconciles --> G[Deployment: Python Ingestor]
        D -- Reconciles --> H[Service: Python Ingestor]
        D -- Reconciles --> I[Secret: Connection Details]
    end

    F -- Provides Connection Details --> I
    I -- Injects as Env Vars --> G
    
    style User fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:2px

1. 定义 AnalyticsStack 的 API (CompositeResourceDefinition)

这是我们暴露给开发者的 API 契约。它定义了 AnalyticsStack 资源有哪些可配置的参数。

# xrds/analytics-stack.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: xanalyticsstacks.platform.acme.io
spec:
  group: platform.acme.io
  names:
    kind: XAnalyticsStack
    plural: xanalyticsstacks
  claimNames:
    kind: AnalyticsStack
    plural: analyticsstacks
  versions:
  - name: v1alpha1
    served: true
    referenceable: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              parameters:
                type: object
                properties:
                  # 租户ID,用于资源命名和隔离
                  tenantId:
                    type: string
                    description: "Unique identifier for the tenant, used for naming and isolation."
                  # ClickHouse 数据存储容量
                  storageGb:
                    type: integer
                    description: "Storage size in GB for the ClickHouse cluster."
                    default: 10
                  # Python 采集服务的镜像
                  ingestorImage:
                    type: string
                    description: "Container image for the Python data ingestor service."
                    default: "acme/default-ingestor:latest"
                required:
                - tenantId
            required:
            - parameters
          status:
            type: object
            properties:
              # 采集服务的内部访问地址
              ingestorEndpoint:
                type: string
                description: "The internal cluster endpoint for the data ingestor service."

2. 实现 AnalyticsStackComposition

这是平台工程的核心。它像一个蓝图,告诉 Crossplane 如何将一个 AnalyticsStack 资源实例化为一组具体的底层资源。

# composition/composition.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: analytics-stack.aws
  labels:
    provider: aws
spec:
  compositeTypeRef:
    apiVersion: platform.acme.io/v1alpha1
    kind: XAnalyticsStack
  resources:
    # 资源1: 使用 Helm Provider 部署 ClickHouse Operator 和实例
    - name: clickhouse-cluster
      base:
        apiVersion: helm.crossplane.io/v1beta1
        kind: Release
        spec:
          forProvider:
            namespace: default # 在真实项目中,应该为每个租户创建独立的 namespace
            chart:
              name: clickhouse-operator
              repository: https://altinity.github.io/clickhouse-operator-helm-chart/
              version: "0.23.1"
            values:
              clickhouseOperator:
                install: true
              # 注意:这里直接嵌入了 ClickHouseInstallation 的定义
              # 这是一个简化的做法,更健壮的方式是分两步,先装 Operator,再创建 CR
              chInstallations:
                - name: "chi-template" # 模板名称
                  spec:
                    defaults:
                      templates:
                        dataVolumeClaimTemplate: default
                    configuration:
                      clusters:
                        - name: "placeholder" # 将被 patch 覆盖
                          layout:
                            shardsCount: 1
                            replicasCount: 1
                    templates:
                      volumeClaimTemplates:
                        - name: default
                          spec:
                            accessModes:
                              - ReadWriteOnce
                            resources:
                              requests:
                                storage: "10Gi" # 将被 patch 覆盖
      patches:
        # 从 AnalyticsStack 的 spec.parameters 中取值,填充到 Helm values
        - fromFieldPath: "spec.parameters.tenantId"
          toFieldPath: "metadata.name"
          transforms:
            - type: string
              string:
                fmt: "clickhouse-%s"
        - fromFieldPath: "spec.parameters.tenantId"
          toFieldPath: "spec.forProvider.values.chInstallations[0].spec.configuration.clusters[0].name"
        - fromFieldPath: "spec.parameters.storageGb"
          toFieldPath: "spec.forProvider.values.chInstallations[0].spec.templates.volumeClaimTemplates[0].spec.resources.requests.storage"
          transforms:
            - type: string
              string:
                fmt: "%dGi"
      # 这是关键一步:定义从创建的资源中提取哪些连接信息
      connectionDetails:
        - fromConnectionSecretKey: host
        - fromConnectionSecretKey: tcp_port
          name: port
        - fromConnectionSecretKey: user
        - fromConnectionSecretKey: password

    # 资源2: 部署 Python 采集服务
    - name: python-ingestor
      base:
        apiVersion: kubernetes.crossplane.io/v1alpha1
        kind: Object
        spec:
          forProvider:
            manifest:
              apiVersion: apps/v1
              kind: Deployment
              spec:
                replicas: 1
                selector:
                  matchLabels:
                    app: "placeholder" # 将被 patch 覆盖
                template:
                  metadata:
                    labels:
                      app: "placeholder" # 将被 patch 覆盖
                  spec:
                    containers:
                      - name: ingestor
                        ports:
                        - containerPort: 8000
                        env:
                          - name: CLICKHOUSE_HOST
                            valueFrom:
                              secretKeyRef:
                                name: "placeholder" # 将被 patch 覆盖
                                key: host
                          - name: CLICKHOUSE_PORT
                            valueFrom:
                              secretKeyRef:
                                name: "placeholder" # 将被 patch 覆盖
                                key: port
                          - name: CLICKHOUSE_USER
                            valueFrom:
                              secretKeyRef:
                                name: "placeholder" # 将被 patch 覆盖
                                key: user
                          - name: CLICKHOUSE_PASSWORD
                            valueFrom:
                              secretKeyRef:
                                name: "placeholder" # 将被 patch 覆盖
                                key: password
      patches:
        - fromFieldPath: "spec.parameters.tenantId"
          toFieldPath: "spec.forProvider.manifest.metadata.name"
          transforms:
            - type: string
              string:
                fmt: "ingestor-%s"
        # ... 其他 patch 用于设置 label, image 等 ...
        - fromFieldPath: "spec.parameters.ingestorImage"
          toFieldPath: "spec.forProvider.manifest.spec.template.spec.containers[0].image"
        # 关键: 将 ClickHouse 的 connection secret 名称注入到 Deployment 中
        - fromFieldPath: "metadata.name"
          toFieldPath: "spec.forProvider.manifest.spec.template.spec.containers[0].env[0].valueFrom.secretKeyRef.name"
          transforms:
            - type: string
              string:
                fmt: "%s-clickhouse-cluster" # 对应上面资源1的 connection secret 名称
        # ... 对其他环境变量重复此操作 ...

    # 资源3: 为 Python 服务创建 Service
    - name: ingestor-service
      base:
        apiVersion: kubernetes.crossplane.io/v1alpha1
        kind: Object
        spec:
          forProvider:
            manifest:
              apiVersion: v1
              kind: Service
              spec:
                type: ClusterIP
                selector:
                  app: "placeholder" # 将被 patch 覆盖
                ports:
                  - protocol: TCP
                    port: 80
                    targetPort: 8000
      patches:
        - fromFieldPath: "spec.parameters.tenantId"
          toFieldPath: "spec.forProvider.manifest.metadata.name"
          transforms:
            - type: string
              string:
                fmt: "ingestor-svc-%s"
        # ... 其他 patch ...
      
      # 关键: 将 Service 的地址写回到 AnalyticsStack 的 status 中
      - fromFieldPath: "spec.forProvider.manifest.metadata.name"
        toFieldPath: "status.ingestorEndpoint"
        transforms:
          - type: string
            string:
              # 构造完整的 K8s DNS 名称
              fmt: "%s.default.svc.cluster.local" 

3. Python 数据采集服务

这是一个简单的 FastAPI 应用,它从环境变量中读取 ClickHouse 的连接信息,并提供一个 /ingest 接口用于写入数据。

# ingestor/app.py
import os
import logging
from contextlib import asynccontextmanager
from typing import List

from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, Field

from clickhouse_driver import Client

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从环境变量读取配置,这是云原生应用的最佳实践
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST")
CLICKHOUSE_PORT = os.getenv("CLICKHOUSE_PORT", 9000)
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")

# 生产级代码需要更健壮的配置检查
if not CLICKHOUSE_HOST:
    raise ValueError("CLICKHOUSE_HOST environment variable not set.")

db_connection = {
    'host': CLICKHOUSE_HOST,
    'port': CLICKHOUSE_PORT,
    'user': CLICKHOUSE_USER,
    'password': CLICKHOUSE_PASSWORD,
    'database': 'default',
    # 生产环境中必须配置超时和重连策略
    'connect_timeout': 10,
    'send_receive_timeout': 300,
    'sync_request_timeout': 10,
}

# 全局的 ClickHouse client
ch_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global ch_client
    logging.info("Application startup: connecting to ClickHouse...")
    try:
        ch_client = Client(**db_connection)
        # 检查连接并创建表(如果不存在)
        ch_client.execute("CREATE DATABASE IF NOT EXISTS analytics")
        ch_client.execute("""
        CREATE TABLE IF NOT EXISTS analytics.events (
            timestamp DateTime,
            event_type String,
            user_id String,
            payload String
        ) ENGINE = MergeTree()
        PARTITION BY toYYYYMM(timestamp)
        ORDER BY (timestamp, event_type)
        """)
        logging.info("Successfully connected to ClickHouse and ensured table exists.")
    except Exception as e:
        # 启动失败是严重问题,直接抛出异常让容器重启
        logging.error(f"Failed to connect to ClickHouse on startup: {e}")
        raise
    yield
    # 清理资源
    logging.info("Application shutdown: disconnecting from ClickHouse.")
    if ch_client:
        ch_client.disconnect()

app = FastAPI(lifespan=lifespan)

class Event(BaseModel):
    timestamp: int
    event_type: str = Field(..., max_length=50)
    user_id: str = Field(..., max_length=100)
    payload: str

@app.post("/ingest")
async def ingest_events(events: List[Event]):
    """
    接收事件列表并批量写入 ClickHouse。
    批量写入是 ClickHouse 的关键性能优化点。
    """
    if not ch_client:
        raise HTTPException(status_code=503, detail="Database connection not available.")
    if not events:
        return {"status": "ok", "inserted": 0}

    try:
        # 准备批量插入的数据
        data_to_insert = [
            (e.timestamp, e.event_type, e.user_id, e.payload) for e in events
        ]
        
        # ClickHouse 的 client 支持直接插入 list of tuples/dicts
        inserted_rows = ch_client.execute(
            'INSERT INTO analytics.events (timestamp, event_type, user_id, payload) VALUES',
            data_to_insert,
            types_check=True # 在客户端进行类型检查,可以提前发现问题
        )
        logging.info(f"Successfully inserted {len(data_to_insert)} events.")
        return {"status": "ok", "inserted": len(data_to_insert)}
    except Exception as e:
        logging.error(f"Error inserting data into ClickHouse: {e}")
        # 在真实项目中,这里可能需要将失败的数据推送到死信队列进行重试
        raise HTTPException(status_code=500, detail="Failed to ingest data.")

@app.get("/health")
def health_check():
    # 健康检查端点,用于 Kubernetes liveness/readiness probes
    try:
        ch_client.execute('SELECT 1')
        return {"status": "ok"}
    except Exception as e:
        logging.warning(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail="Cannot connect to ClickHouse")

4. 终端消费者:Jetpack Compose 移动端

移动端应用(或其他服务)现在可以通过平台获取到 ingestorEndpoint,并开始上报和查询数据。下面是一个 Android ViewModel 的示例,展示如何与这个后端交互。

// AnalyticsViewModel.kt
// 假设已使用 Retrofit 或 Ktor 配置好网络客户端
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.launch

// 数据模型需与 Python 服务中的 Event 对应
data class AnalyticsEvent(
    val timestamp: Long,
    val eventType: String,
    val userId: String,
    val payload: String
)

class AnalyticsViewModel(private val analyticsApi: AnalyticsApiService) : ViewModel() {

    // 缓存待发送事件,实现批量上报以优化性能和电量
    private val eventBuffer = mutableListOf<AnalyticsEvent>()

    fun trackEvent(eventType: String, userId: String, payload: String) {
        val event = AnalyticsEvent(
            timestamp = System.currentTimeMillis() / 1000, // 使用 Unix 时间戳 (秒)
            eventType = eventType,
            userId = userId,
            payload = payload
        )
        synchronized(eventBuffer) {
            eventBuffer.add(event)
        }
        
        // 当缓冲区达到一定大小时触发上报,这是一个常见的优化策略
        if (eventBuffer.size >= 50) {
            flushEvents()
        }
    }

    // 主动或被动触发上报
    fun flushEvents() {
        val eventsToFlush: List<AnalyticsEvent>
        synchronized(eventBuffer) {
            if (eventBuffer.isEmpty()) return
            eventsToFlush = ArrayList(eventBuffer)
            eventBuffer.clear()
        }

        viewModelScope.launch {
            try {
                // 调用 Python 服务的 /ingest 接口
                val response = analyticsApi.ingestEvents(eventsToFlush)
                if (response.isSuccessful) {
                    // Log success
                } else {
                    // 失败处理:在真实应用中,应将事件存入本地数据库以便稍后重试
                    // 这是一个关键的容错设计
                    handleFailedIngestion(eventsToFlush)
                }
            } catch (e: Exception) {
                // 网络或其他异常
                handleFailedIngestion(eventsToFlush)
            }
        }
    }

    private fun handleFailedIngestion(events: List<AnalyticsEvent>) {
        // 实现将失败事件持久化到本地 Room 数据库或文件中
        // 并设置一个后台任务(如 WorkManager)在网络恢复时重试
        // ...
    }
}

5. 平台 UI 的可靠性保障:Vitest

这个自助平台本身也需要一个管理界面(Web UI),让平台工程师和开发者能查看所有 AnalyticsStack 的状态。这个 UI 的可靠性至关重要。我们使用 Vite + Vue/React 构建前端,并采用 Vitest 进行单元和组件测试。

假设我们有一个 Vue 组件 AnalyticsStackStatus.vue 用于显示某个 AnalyticsStack 的状态。

// components/__tests__/AnalyticsStackStatus.spec.ts
import { describe, it, expect } from 'vitest';
import { mount } from '@vue/test-utils';
import AnalyticsStackStatus from '../AnalyticsStackStatus.vue';

describe('AnalyticsStackStatus.vue', () => {

    // 测试用例 1: 当资源正在同步时,显示正确的状态
    it('renders syncing status correctly', () => {
        const mockStack = {
            status: {
                conditions: [
                    { type: 'Ready', status: 'False', reason: 'Syncing' }
                ]
            },
            spec: {
                parameters: { tenantId: 'tenant-a' }
            }
        };

        const wrapper = mount(AnalyticsStackStatus, {
            props: { stack: mockStack }
        });

        // 断言组件渲染出了 "Syncing" 文本
        expect(wrapper.text()).toContain('Syncing');
        // 断言应用了正确的 CSS class 用于视觉提示
        const statusIndicator = wrapper.find('.status-indicator');
        expect(statusIndicator.classes()).toContain('bg-yellow-500');
    });

    // 测试用例 2: 当资源就绪时,显示 ingestor endpoint
    it('renders ready status and endpoint when available', () => {
        const mockStack = {
            status: {
                conditions: [
                    { type: 'Ready', status: 'True', reason: 'Available' }
                ],
                ingestorEndpoint: 'ingestor-svc-tenant-b.default.svc.cluster.local'
            },
            spec: {
                parameters: { tenantId: 'tenant-b' }
            }
        };

        const wrapper = mount(AnalyticsStackStatus, {
            props: { stack: mockStack }
        });

        expect(wrapper.text()).toContain('Ready');
        expect(wrapper.text()).toContain('ingestor-svc-tenant-b.default.svc.cluster.local');
        const statusIndicator = wrapper.find('.status-indicator');
        expect(statusIndicator.classes()).toContain('bg-green-500');
    });

    // 测试用例 3: 当 props 为空或无效时,优雅地处理
    it('handles missing status gracefully', () => {
        const mockStack = {
             spec: {
                parameters: { tenantId: 'tenant-c' }
            }
             // status 字段缺失
        };

        const wrapper = mount(AnalyticsStackStatus, {
            props: { stack: mockStack }
        });

        // 一个常见的错误是在这种边界条件下组件崩溃
        // 我们期望它能显示一个未知或等待的状态
        expect(wrapper.text()).toContain('Unknown');
        const statusIndicator = wrapper.find('.status-indicator');
        expect(statusIndicator.classes()).toContain('bg-gray-500');
    });
});

这个测试文件确保了平台 UI 的核心组件在不同数据状态下都能按预期工作,这对于维护一个可靠的内部开发者平台至关重要。

架构的扩展性与局限性

这个基于 Crossplane 的声明式平台为我们提供了一个强大的基础,但它并非没有局限性。

扩展路径:

  • 多环境部署: 我们可以为不同的环境(dev, staging, prod)创建不同的 Composition,它们可能指向不同的云提供商、使用不同的资源规格,但共享同一个 AnalyticsStack API。
  • 增加组件: 可以在 Composition 中轻松添加更多资源,例如一个用于数据缓冲的 Kafka Topic、一个预置了仪表盘的 Grafana 实例,或者一个用于数据查询的 API 网关路由。
  • 策略和配额: 可以集成 OPA Gatekeeper 等策略引擎,对 AnalyticsStack 的创建施加约束,例如限制 storageGb 的最大值,或者只允许特定的 ingestorImage

潜在陷阱与局限:

  • Day-2 操作的复杂性: 资源的创建(Day-1)被极大地简化了,但升级、备份、故障恢复等 Day-2 操作需要更深入的设计。例如,升级 ClickHouse 版本需要底层 ClickHouse Operator 的支持,并通过修改 CompositionAnalyticsStack CR 来触发。这个过程必须经过严格测试。
  • Provider 的质量和成熟度: 整个系统的可靠性强依赖于我们使用的 Crossplane Provider(如 provider-helm, provider-kubernetes)的质量。选择社区活跃、有良好支持的 Provider 至关重要。一个有 bug 的 Provider 可能会导致资源泄漏或状态不一致。
  • 调试挑战: 当一个 AnalyticsStack 卡在 Syncing 状态时,排查问题需要深入理解 Crossplane 的工作原理、Provider 的日志以及底层云平台或 Kubernetes 的事件。这比调试一个失败的 Terraform apply 更具挑战性。
  • 抽象的代价: 高度抽象在简化开发者体验的同时,也可能隐藏了底层的重要细节。当出现性能问题或需要精细调优时,开发者可能需要穿透这层抽象,这要求平台团队提供良好的文档和支持渠道。当前的实现也未考虑跨云厂商的兼容性,要实现真正可移植的 Composition 需要处理大量云厂商之间的差异。

  目录