构建 OpenFaaS 与 ClickHouse 驱动的 Serverless 可观测性管道并集成 Cypress 进行追踪验证


我们的一个核心平台最近遇到了棘手的性能诊断问题。该平台为企业客户提供服务,身份认证全面采用 SAML 2.0 对接客户的 IdP (Identity Provider)。后端架构则是一套基于 OpenFaaS 的 Serverless 函数,通过 NATS 消息队列进行异步通信。问题在于,当有客户抱怨“登录慢”或“某个操作处理时间过长”时,我们几乎束手无策。日志是分散的,一次用户请求可能触发三到五个函数调用链,其中还夹杂着异步消息。要从海量日志中手动拼凑出一条完整的请求链路,不仅耗时,而且几乎不可能。

传统的 APM 工具对于这种 Serverless 架构的侵入性太强,配置复杂,而且成本高昂。我们的数据分析平台已经在使用 ClickHouse,它惊人的写入性能和分析能力让我们萌生了一个想法:能否构建一套轻量级的、定制化的分布式追踪管道,将所有追踪数据(Spans)直接泵入 ClickHouse,并利用其进行分析?

更进一步,我们能否将这套可观测性系统纳入自动化测试的范畴?在真实项目中,可观测性系统本身也可能出问题。如果 E2E 测试不仅能验证业务功能,还能同时验证追踪数据是否被正确、完整地记录,那么系统的可靠性将形成一个强大的闭环。这正是我们决定将 Cypress、SAML、OpenFaaS 和 ClickHouse 结合起来,解决这个复杂挑战的出发点。

第一步:为追踪数据设计 ClickHouse 表结构

目标是存储符合 OpenTelemetry 规范的 Span 数据。我们需要一个能够支撑高写入吞吐量和快速分析查询的表结构。在真实项目中,表结构的设计至关重要,它直接影响后续的查询性能和存储成本。

-- file: schema.sql
-- 在 ClickHouse 中创建用于存储分布式追踪 Span 的表
-- 我们选择 MergeTree 引擎,这是 ClickHouse 的核心引擎,非常适合高负载的数据写入和分析场景。

CREATE TABLE default.spans (
    -- 核心标识符
    `Timestamp` DateTime64(9) CODEC(Delta, ZSTD),
    `TraceId` String CODEC(ZSTD),
    `SpanId` String CODEC(ZSTD),
    `ParentSpanId` String CODEC(ZSTD),

    -- Span 描述信息
    `ServiceName` LowCardinality(String) CODEC(ZSTD),
    `SpanName` String CODEC(ZSTD),
    `SpanKind` LowCardinality(String) CODEC(ZSTD), -- e.g., 'SERVER', 'CLIENT', 'PRODUCER'
    `Duration` UInt64 CODEC(T64, ZSTD), -- 纳秒

    -- 状态与事件
    `StatusCode` LowCardinality(String) CODEC(ZSTD), -- 'OK', 'ERROR'
    `StatusMessage` String CODEC(ZSTD),

    -- 资源与属性 (键值对)
    -- 这里是设计的关键。将所有标签存储在两个并行的数组中,
    -- 相比于使用 Map 类型,查询性能更好,特别是当我们需要过滤特定标签时。
    `ResourceAttributesKeys` Array(String) CODEC(ZSTD),
    `ResourceAttributesValues` Array(String) CODEC(ZSTD),
    `SpanAttributesKeys` Array(String) CODEC(ZSTD),
    `SpanAttributesValues` Array(String) CODEC(ZSTD),

    -- 用于 TTL (数据生命周期管理)
    `IngestTimestamp` DateTime DEFAULT now()

) ENGINE = MergeTree()
PARTITION BY toYYYYMM(Timestamp)
ORDER BY (ServiceName, SpanName, Timestamp, TraceId)
TTL IngestTimestamp + INTERVAL 30 DAY -- 根据业务需求保留30天数据
SETTINGS index_granularity = 8192;

设计考量:

  1. ORDER BY 关键: (ServiceName, SpanName, Timestamp, TraceId) 是经过深思熟虑的。查询通常会先筛选服务名 (ServiceName) 和操作名 (SpanName),然后按时间范围查找。将这些高频过滤字段放在排序键的前面,可以极大地利用 ClickHouse 的稀疏索引,跳过大量无关的数据块。
  2. LowCardinality: 对于 ServiceName, SpanKind, StatusCode 这类基数较低(即不同值的数量有限)的字段,使用 LowCardinality 类型可以显著减少存储空间并提升查询性能。
  3. CODEC 压缩: 对所有字段应用 ZSTD 压缩,并对时间戳和时长等数值类型使用 DeltaT64 等专用编解码器,以达到最佳的压缩效果。
  4. PARTITION BY toYYYYMM(Timestamp): 按月分区。这使得删除过期数据(通过 TTL)或进行大规模数据维护时效率极高,只需删除整个分区目录,而无需扫描和重写数据。
  5. Attributes 的存储: 我们没有使用 ClickHouse 的 Map 类型,而是选择了两个并行的 Array。这是因为在 ClickHouse 中,对 Map key 的过滤性能远不如对 Array 元素的过滤。查询 has(SpanAttributesKeys, 'http.status_code') 并找到对应 SpanAttributesValues 的索引,通常比 SpanAttributes['http.status_code'] 更高效。

第二步:为 OpenFaaS 函数注入追踪能力

我们需要一个通用的方式来为所有 Go 语言编写的函数自动处理追踪上下文的提取、创建和传播。一个典型的做法是实现一个 HTTP 中间件,或者一个函数包装器。

// file: tracing/wrapper.go
package tracing

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/google/uuid"
)

// Span 代表一个简化的追踪 Span 结构,与 ClickHouse 表对应
type Span struct {
	Timestamp      time.Time `json:"Timestamp"`
	TraceId        string    `json:"TraceId"`
	SpanId         string    `json:"SpanId"`
	ParentSpanId   string    `json:"ParentSpanId,omitempty"`
	ServiceName    string    `json:"ServiceName"`
	SpanName       string    `json:"SpanName"`
	SpanKind       string    `json:"SpanKind"`
	Duration       uint64    `json:"Duration"`
	StatusCode     string    `json:"StatusCode"`
	StatusMessage  string    `json:"StatusMessage,omitempty"`
	SpanAttributes map[string]interface{} `json:"-"` // 临时存储,序列化时展开
}

// CollectorEndpoint 是接收 Span 的 OpenFaaS 函数网关地址
var CollectorEndpoint = os.Getenv("TRACE_COLLECTOR_URL") // e.g., "http://gateway.openfaas:8080/async-function/trace-collector"

// spanBuffer 是一个简易的内存批处理缓冲区
var (
	spanBuffer []*Span
	bufferLock sync.Mutex
	bufferSize = 20
)

// WithTracing 是一个函数包装器 (wrapper),为 OpenFaaS handler 注入追踪逻辑
func WithTracing(handler func(http.ResponseWriter, *http.Request) (string, int)) http.HandlerFunc {
	serviceName := os.Getenv("faas_function_name")
	if serviceName == "" {
		serviceName = "unknown-openfaas-service"
	}

	return func(w http.ResponseWriter, r *http.Request) {
		// 1. 提取上游追踪上下文
		traceParent := r.Header.Get("traceparent")
		traceId, parentSpanId := parseTraceParent(traceParent)
		if traceId == "" {
			traceId = uuid.NewString()
		}

		spanId := uuid.NewString()
		startTime := time.Now()

		// 创建新的追踪上下文并注入到 request context
		ctx := r.Context()
		newTraceParent := fmt.Sprintf("00-%s-%s-01", traceId, spanId)
		ctx = context.WithValue(ctx, "traceparent", newTraceParent)
		r = r.WithContext(ctx)

		// 2. 执行业务逻辑
		// 使用一个 response writer wrapper 来捕获状态码
		resWrapper := &responseWriterWrapper{ResponseWriter: w, statusCode: http.StatusOK}
		body, statusCode := handler(resWrapper, r)
		
		duration := time.Since(startTime).Nanoseconds()

		// 3. 创建 Span
		span := &Span{
			Timestamp:   startTime,
			TraceId:     traceId,
			SpanId:      spanId,
			ParentSpanId: parentSpanId,
			ServiceName: serviceName,
			SpanName:    r.Method + " " + r.URL.Path,
			SpanKind:    "SERVER",
			Duration:    uint64(duration),
			SpanAttributes: map[string]interface{}{
				"http.method": r.Method,
				"http.url": r.URL.String(),
				"http.status_code": resWrapper.statusCode,
			},
		}

		if resWrapper.statusCode >= 400 {
			span.StatusCode = "ERROR"
			span.StatusMessage = http.StatusText(resWrapper.statusCode)
		} else {
			span.StatusCode = "OK"
		}
		
		// 4. 异步发送 Span
		go sendSpan(span)

		// 5. 返回响应
		w.WriteHeader(statusCode)
		w.Write([]byte(body))
	}
}


// sendSpan 将 Span 添加到缓冲区,并在达到阈值时批量发送
func sendSpan(span *Span) {
    if CollectorEndpoint == "" {
        log.Println("WARN: TRACE_COLLECTOR_URL not set, skipping trace export.")
        return
    }

	bufferLock.Lock()
	spanBuffer = append(spanBuffer, span)
	if len(spanBuffer) >= bufferSize {
		spansToSend := spanBuffer
		spanBuffer = nil
		bufferLock.Unlock()
		flushSpans(spansToSend)
	} else {
		bufferLock.Unlock()
	}
}

func flushSpans(spans []*Span) {
    // 实际生产中,这里应该有重试和死信队列逻辑
    // 为了简化,我们只做一次 POST
    payload := formatSpansForClickHouse(spans)
    body := bytes.NewReader(payload)
    
    req, err := http.NewRequest(http.MethodPost, CollectorEndpoint, body)
    if err != nil {
        log.Printf("ERROR: failed to create trace collector request: %v\n", err)
        return
    }
    req.Header.Set("Content-Type", "application/json")

    // 使用异步调用,不关心 collector 的返回结果
    client := &http.Client{Timeout: 2 * time.Second}
    _, err = client.Do(req)
    if err != nil {
        log.Printf("ERROR: failed to send spans to collector: %v\n", err)
    }
}

// ... 省略 parseTraceParent, responseWriterWrapper, formatSpansForClickHouse 等辅助函数的实现 ...
// formatSpansForClickHouse 会将 Span 结构体转换为 ClickHouse JSONEachRow 格式

这个包装器通过环境变量自动获取函数名,处理 W3C Trace Context 头,执行业务逻辑,最后异步地将 Span 发送到一个专用的收集器函数。这种方式对业务代码的侵入性极小。

第三步:实现高吞吐的 Trace Collector 函数

这个函数是整个管道的瓶颈所在,它的职责是接收来自各个业务函数的 Span 数据,并高效地批量写入 ClickHouse。

// file: trace-collector/handler.go
package function

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

var (
	conn driver.Conn
)

// 初始化 ClickHouse 连接。在 Serverless 环境中,连接管理至关重要。
// 我们在函数启动时建立连接,并在后续调用中复用它。
func init() {
	var err error
	conn, err = clickhouse.Open(&clickhouse.Options{
		Addr: []string{os.Getenv("CLICKHOUSE_ADDR")}, // e.g., "my-clickhouse-server:9000"
		Auth: clickhouse.Auth{
			Database: os.Getenv("CLICKHOUSE_DATABASE"),
			Username: os.Getenv("CLICKHOUSE_USERNAME"),
			Password: os.Getenv("CLICKHOUSE_PASSWORD"),
		},
		Settings: clickhouse.Settings{
			"async_insert": 1,
			"wait_for_async_insert": 0,
		},
		// ... 其他 TLS, 压缩等配置
	})
	if err != nil {
		log.Fatalf("FATAL: failed to connect to ClickHouse: %v", err)
	}
}

// Handle 接收批量 Span 数据并写入 ClickHouse
func Handle(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Failed to read request body", http.StatusInternalServerError)
		return
	}
	defer r.Body.Close()

	// 这里的 body 已经是 ClickHouse 期望的 JSONEachRow 格式
	// e.g., {"TraceId": "...", "SpanId": ...}\n{"TraceId": "...", "SpanId": ...}
	err = conn.AsyncInsert(context.Background(), "INSERT INTO default.spans FORMAT JSONEachRow", body)
	if err != nil {
		log.Printf("ERROR: failed to async insert spans into ClickHouse: %v\n", err)
		http.Error(w, "Failed to persist spans", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusAccepted)
	fmt.Fprint(w, "Spans accepted")
}

关键优化:

  • 连接复用: init 函数确保了 ClickHouse 连接在函数容器的生命周期内只创建一次。
  • async_insert: 这是一个 ClickHouse 的“杀手级”特性。启用后,服务端会立即确认接收,然后在后台异步地执行数据合并和写入。这极大地降低了客户端的等待时间,从而提升了 Collector 函数的吞吐量。
  • 数据格式: 我们约定上游函数直接发送 JSONEachRow 格式的数据。这避免了 Collector 函数进行反序列化和再序列化,节省了大量的 CPU 资源。

第四步:SAML 认证流程与追踪的结合

在处理 SAML 响应的服务端点,我们可以创建整个请求链路的根 Span (Root Span),并将从 SAML断言中获得的用户信息作为标签(Attributes)附加进去。这对于后续按用户排查问题至关重要。

// file: saml-sp/handler.go (伪代码)

// ... SAML SP 逻辑,解析和验证 SAML Response ...

func (sp *ServiceProvider) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // 假设 samlAssertion 已经成功解析和验证
    samlAssertion := parseSAMLResponse(r)

    // 创建根 Span
    traceId := uuid.NewString()
    spanId := uuid.NewString()
    startTime := time.Now()

    // ... 执行创建会话、颁发 token 等业务逻辑 ...
    
    duration := time.Since(startTime).Nanoseconds()

    // 创建并发送根 Span
    rootSpan := &tracing.Span{
        Timestamp:    startTime,
        TraceId:      traceId,
        SpanId:       spanId,
        ParentSpanId: "", // Root Span没有父Span
        ServiceName:  "saml-sp",
        SpanName:     "SAML Assertion Consume",
        SpanKind:     "SERVER",
        Duration:     uint64(duration),
        StatusCode:   "OK",
        SpanAttributes: map[string]interface{}{
            "user.id": samlAssertion.Subject.NameID,
            "user.email": samlAssertion.GetAttribute("email"),
            "tenant.id": samlAssertion.GetAttribute("tenant_id"),
            "idp.issuer": samlAssertion.Issuer,
        },
    }
    go tracing.sendSpan(rootSpan)
    
    // 在颁发给前端的 session token 或 cookie 中携带 traceId
    // 或者在重定向的 URL 中包含 traceId,以便前端后续请求可以携带它
    session := createSession(samlAssertion.Subject.NameID)
    session.Values["trace_id"] = traceId
    session.Save(r, w)

    // 重定向到应用主页
    http.Redirect(w, r, "/app", http.StatusFound)
}

第五步:Cypress E2E 测试的革命性升级

这是整个方案的点睛之笔。我们将编写一个 Cypress 测试,它不仅仅是点击按钮和验证页面元素,而是要验证整个后端追踪链路是否按预期工作。

sequenceDiagram
    participant Cypress as Cypress Test Runner
    participant AppFE as Web App (Frontend)
    participant SamlSP as SAML Service Provider
    participant FunctionA as OpenFaaS Function A
    participant TraceCollector as Trace Collector
    participant ClickHouse as ClickHouse DB
    participant ValidationAPI as Trace Validation API

    Cypress->>+AppFE: 1. cy.startTrace() & cy.visit('/login')
    Note right of Cypress: Injects custom 'X-Trace-Id' header
    AppFE->>+SamlSP: 2. Redirect to IdP, then back with SAML Response
    SamlSP->>+TraceCollector: 3. Create Root Span (with user info), send async
    SamlSP-->>-AppFE: 4. Redirect to app dashboard
    AppFE->>+FunctionA: 5. User action triggers API call (with 'traceparent' header)
    FunctionA->>+TraceCollector: 6. Create Child Span, send async
    FunctionA-->>-AppFE: 7. API Response
    AppFE-->>-Cypress: 8. UI assertion passes
    
    Cypress->>+ValidationAPI: 9. cy.validateTrace(traceId)
    ValidationAPI->>+ClickHouse: 10. SELECT * FROM spans WHERE TraceId = ?
    ClickHouse-->>-ValidationAPI: 11. Returns found spans
    ValidationAPI-->>-Cypress: 12. Spans JSON
    Cypress->>Cypress: 13. Assert span count, parent-child relations, and tags

首先,在 Cypress 中定义自定义命令:

// file: cypress/support/commands.js

// 为测试生成并注入一个唯一的 Trace ID
Cypress.Commands.add('startTrace', () => {
  const traceId = Cypress.env('TRACE_ID') || cy.helpers.generateTraceId();
  Cypress.env('TRACE_ID', traceId);

  // 使用 `cy.intercept` 来为测试期间的所有请求自动附加 traceparent 头
  cy.intercept('*', (req) => {
    // 仅为同源或我们关心的 API 域添加
    if (req.url.startsWith(Cypress.config('baseUrl'))) {
        const spanId = cy.helpers.generateSpanId();
        req.headers['traceparent'] = `00-${traceId}-${spanId}-01`;
    }
  });
});

// 轮询验证 API,断言追踪数据是否已正确记录
Cypress.Commands.add('validateTrace', (options) => {
  const traceId = Cypress.env('TRACE_ID');
  if (!traceId) {
    throw new Error('Trace ID not found. Did you forget to call cy.startTrace()?');
  }

  const {
    expectedSpanCount,
    expectedServices,
    rootSpanTags,
    timeout = 10000 // 默认10秒超时
  } = options;

  cy.log(`Validating trace: ${traceId}`);

  cy.request({
    method: 'GET',
    url: `${Cypress.env('VALIDATION_API_URL')}/trace/${traceId}`,
    retryOnStatusCodeFailure: true,
    // Cypress 的重试机制天然适合轮询
    retryOn: (res) => {
      // 如果 body 是空的或者 span 数量不足,则继续重试
      return !res.body || res.body.length < expectedSpanCount;
    },
    timeout: timeout
  }).then((response) => {
    expect(response.status).to.eq(200);
    const spans = response.body;

    cy.log(`Found ${spans.length} spans for trace ${traceId}`);
    expect(spans).to.have.lengthOf(expectedSpanCount);

    if (expectedServices) {
      const serviceNames = spans.map(s => s.ServiceName);
      expect(serviceNames).to.include.members(expectedServices);
    }
    
    if (rootSpanTags) {
        const rootSpan = spans.find(s => s.ParentSpanId === "");
        expect(rootSpan).to.not.be.undefined;

        // 将 ClickHouse 的 Key-Value 数组转换回对象以便断言
        const attributes = cy.helpers.convertArraysToMap(rootSpan.SpanAttributesKeys, rootSpan.SpanAttributesValues);
        
        for (const key in rootSpanTags) {
            expect(attributes).to.have.property(key, rootSpanTags[key]);
        }
    }
  });
});

然后,编写一个 E2E 测试用例:

// file: cypress/e2e/saml_login_and_action.cy.js

describe('SAML Login and Core Action Trace Validation', () => {
  it('should create a complete and correct trace for the user journey', () => {
    // 步骤 1: 启动追踪
    cy.startTrace();

    // 步骤 2: 执行用户操作 - 这里是 SAML 登录流程
    // 这部分会涉及与外部 IdP 的交互,在真实测试中需要 mock IdP 响应
    cy.loginWithSAML('[email protected]', 'password123');

    // 步骤 3: 登录后,在应用内执行一个会触发后端函数链的操作
    cy.get('[data-cy="dashboard-title"]').should('be.visible');
    cy.get('[data-cy="trigger-action-button"]').click();
    cy.get('[data-cy="action-success-toast"]').should('be.visible');

    // 步骤 4: 验证追踪数据
    // 这是一个强大的断言:我们确信用户操作触发了3个后端服务,
    // 并且根 Span (SAML登录) 必须包含正确的用户ID。
    cy.validateTrace({
      expectedSpanCount: 3, // saml-sp, function-a, function-b
      expectedServices: ['saml-sp', 'user-profile-func', 'data-processor-func'],
      rootSpanTags: {
        'user.id': '[email protected]',
        'tenant.id': 'tenant-123'
      }
    });
  });
});

这个测试不仅确保了 SAML 登录和后续操作的功能正确性,还从根本上保证了系统的可观测性。如果某个开发人员在一次代码重构中破坏了追踪上下文的传递,这个测试将会失败。

局限性与未来展望

我们构建的这套系统虽然解决了核心痛点,但在真实生产环境中,它仍然存在一些局限性:

  1. 采样策略: 目前我们记录了 100% 的追踪。在高流量下,这会给 ClickHouse 带来巨大压力并增加成本。下一步需要引入采样策略,例如基于头部的采样(上游决定是否追踪)或更复杂的尾部采样(在数据收集端根据整个链路的情况决定是否保留)。
  2. 可视化: 直接在 ClickHouse 中查询数据对于日常排障来说效率不高。我们需要一个可视化前端,比如 Grafana,通过其 ClickHouse 数据源插件来展示链路火焰图和进行性能分析。
  3. 上下文传播: 我们手动实现了基于 traceparent 头的 HTTP 上下文传播。但对于 NATS 等其他协议,也需要手动实现消息头的注入和提取。采用像 OpenTelemetry SDK 这样的标准化库,可以自动处理多种协议的上下文传播,但这会增加函数的依赖和一定的复杂性。
  4. Trace Validation API 的健壮性: 当前的验证 API 比较简单。更复杂的验证可能需要检查 Span 之间的父子关系、耗时是否在预期范围内等,这会增加其查询逻辑的复杂度。

尽管存在这些局限,但这套方案以一种高度定制化、成本可控的方式,为一个复杂的 Serverless 架构提供了深度可观测性,并通过与 E2E 测试的创新性结合,将可观测性本身也纳入了质量保障体系。


  目录