我们的一个核心平台最近遇到了棘手的性能诊断问题。该平台为企业客户提供服务,身份认证全面采用 SAML 2.0 对接客户的 IdP (Identity Provider)。后端架构则是一套基于 OpenFaaS 的 Serverless 函数,通过 NATS 消息队列进行异步通信。问题在于,当有客户抱怨“登录慢”或“某个操作处理时间过长”时,我们几乎束手无策。日志是分散的,一次用户请求可能触发三到五个函数调用链,其中还夹杂着异步消息。要从海量日志中手动拼凑出一条完整的请求链路,不仅耗时,而且几乎不可能。
传统的 APM 工具对于这种 Serverless 架构的侵入性太强,配置复杂,而且成本高昂。我们的数据分析平台已经在使用 ClickHouse,它惊人的写入性能和分析能力让我们萌生了一个想法:能否构建一套轻量级的、定制化的分布式追踪管道,将所有追踪数据(Spans)直接泵入 ClickHouse,并利用其进行分析?
更进一步,我们能否将这套可观测性系统纳入自动化测试的范畴?在真实项目中,可观测性系统本身也可能出问题。如果 E2E 测试不仅能验证业务功能,还能同时验证追踪数据是否被正确、完整地记录,那么系统的可靠性将形成一个强大的闭环。这正是我们决定将 Cypress、SAML、OpenFaaS 和 ClickHouse 结合起来,解决这个复杂挑战的出发点。
第一步:为追踪数据设计 ClickHouse 表结构
目标是存储符合 OpenTelemetry 规范的 Span 数据。我们需要一个能够支撑高写入吞吐量和快速分析查询的表结构。在真实项目中,表结构的设计至关重要,它直接影响后续的查询性能和存储成本。
-- file: schema.sql
-- 在 ClickHouse 中创建用于存储分布式追踪 Span 的表
-- 我们选择 MergeTree 引擎,这是 ClickHouse 的核心引擎,非常适合高负载的数据写入和分析场景。
CREATE TABLE default.spans (
-- 核心标识符
`Timestamp` DateTime64(9) CODEC(Delta, ZSTD),
`TraceId` String CODEC(ZSTD),
`SpanId` String CODEC(ZSTD),
`ParentSpanId` String CODEC(ZSTD),
-- Span 描述信息
`ServiceName` LowCardinality(String) CODEC(ZSTD),
`SpanName` String CODEC(ZSTD),
`SpanKind` LowCardinality(String) CODEC(ZSTD), -- e.g., 'SERVER', 'CLIENT', 'PRODUCER'
`Duration` UInt64 CODEC(T64, ZSTD), -- 纳秒
-- 状态与事件
`StatusCode` LowCardinality(String) CODEC(ZSTD), -- 'OK', 'ERROR'
`StatusMessage` String CODEC(ZSTD),
-- 资源与属性 (键值对)
-- 这里是设计的关键。将所有标签存储在两个并行的数组中,
-- 相比于使用 Map 类型,查询性能更好,特别是当我们需要过滤特定标签时。
`ResourceAttributesKeys` Array(String) CODEC(ZSTD),
`ResourceAttributesValues` Array(String) CODEC(ZSTD),
`SpanAttributesKeys` Array(String) CODEC(ZSTD),
`SpanAttributesValues` Array(String) CODEC(ZSTD),
-- 用于 TTL (数据生命周期管理)
`IngestTimestamp` DateTime DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(Timestamp)
ORDER BY (ServiceName, SpanName, Timestamp, TraceId)
TTL IngestTimestamp + INTERVAL 30 DAY -- 根据业务需求保留30天数据
SETTINGS index_granularity = 8192;
设计考量:
-
ORDER BY
关键:(ServiceName, SpanName, Timestamp, TraceId)
是经过深思熟虑的。查询通常会先筛选服务名 (ServiceName
) 和操作名 (SpanName
),然后按时间范围查找。将这些高频过滤字段放在排序键的前面,可以极大地利用 ClickHouse 的稀疏索引,跳过大量无关的数据块。 -
LowCardinality
: 对于ServiceName
,SpanKind
,StatusCode
这类基数较低(即不同值的数量有限)的字段,使用LowCardinality
类型可以显著减少存储空间并提升查询性能。 -
CODEC
压缩: 对所有字段应用ZSTD
压缩,并对时间戳和时长等数值类型使用Delta
和T64
等专用编解码器,以达到最佳的压缩效果。 -
PARTITION BY toYYYYMM(Timestamp)
: 按月分区。这使得删除过期数据(通过TTL
)或进行大规模数据维护时效率极高,只需删除整个分区目录,而无需扫描和重写数据。 -
Attributes
的存储: 我们没有使用 ClickHouse 的Map
类型,而是选择了两个并行的Array
。这是因为在 ClickHouse 中,对Map
key 的过滤性能远不如对Array
元素的过滤。查询has(SpanAttributesKeys, 'http.status_code')
并找到对应SpanAttributesValues
的索引,通常比SpanAttributes['http.status_code']
更高效。
第二步:为 OpenFaaS 函数注入追踪能力
我们需要一个通用的方式来为所有 Go 语言编写的函数自动处理追踪上下文的提取、创建和传播。一个典型的做法是实现一个 HTTP 中间件,或者一个函数包装器。
// file: tracing/wrapper.go
package tracing
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/google/uuid"
)
// Span 代表一个简化的追踪 Span 结构,与 ClickHouse 表对应
type Span struct {
Timestamp time.Time `json:"Timestamp"`
TraceId string `json:"TraceId"`
SpanId string `json:"SpanId"`
ParentSpanId string `json:"ParentSpanId,omitempty"`
ServiceName string `json:"ServiceName"`
SpanName string `json:"SpanName"`
SpanKind string `json:"SpanKind"`
Duration uint64 `json:"Duration"`
StatusCode string `json:"StatusCode"`
StatusMessage string `json:"StatusMessage,omitempty"`
SpanAttributes map[string]interface{} `json:"-"` // 临时存储,序列化时展开
}
// CollectorEndpoint 是接收 Span 的 OpenFaaS 函数网关地址
var CollectorEndpoint = os.Getenv("TRACE_COLLECTOR_URL") // e.g., "http://gateway.openfaas:8080/async-function/trace-collector"
// spanBuffer 是一个简易的内存批处理缓冲区
var (
spanBuffer []*Span
bufferLock sync.Mutex
bufferSize = 20
)
// WithTracing 是一个函数包装器 (wrapper),为 OpenFaaS handler 注入追踪逻辑
func WithTracing(handler func(http.ResponseWriter, *http.Request) (string, int)) http.HandlerFunc {
serviceName := os.Getenv("faas_function_name")
if serviceName == "" {
serviceName = "unknown-openfaas-service"
}
return func(w http.ResponseWriter, r *http.Request) {
// 1. 提取上游追踪上下文
traceParent := r.Header.Get("traceparent")
traceId, parentSpanId := parseTraceParent(traceParent)
if traceId == "" {
traceId = uuid.NewString()
}
spanId := uuid.NewString()
startTime := time.Now()
// 创建新的追踪上下文并注入到 request context
ctx := r.Context()
newTraceParent := fmt.Sprintf("00-%s-%s-01", traceId, spanId)
ctx = context.WithValue(ctx, "traceparent", newTraceParent)
r = r.WithContext(ctx)
// 2. 执行业务逻辑
// 使用一个 response writer wrapper 来捕获状态码
resWrapper := &responseWriterWrapper{ResponseWriter: w, statusCode: http.StatusOK}
body, statusCode := handler(resWrapper, r)
duration := time.Since(startTime).Nanoseconds()
// 3. 创建 Span
span := &Span{
Timestamp: startTime,
TraceId: traceId,
SpanId: spanId,
ParentSpanId: parentSpanId,
ServiceName: serviceName,
SpanName: r.Method + " " + r.URL.Path,
SpanKind: "SERVER",
Duration: uint64(duration),
SpanAttributes: map[string]interface{}{
"http.method": r.Method,
"http.url": r.URL.String(),
"http.status_code": resWrapper.statusCode,
},
}
if resWrapper.statusCode >= 400 {
span.StatusCode = "ERROR"
span.StatusMessage = http.StatusText(resWrapper.statusCode)
} else {
span.StatusCode = "OK"
}
// 4. 异步发送 Span
go sendSpan(span)
// 5. 返回响应
w.WriteHeader(statusCode)
w.Write([]byte(body))
}
}
// sendSpan 将 Span 添加到缓冲区,并在达到阈值时批量发送
func sendSpan(span *Span) {
if CollectorEndpoint == "" {
log.Println("WARN: TRACE_COLLECTOR_URL not set, skipping trace export.")
return
}
bufferLock.Lock()
spanBuffer = append(spanBuffer, span)
if len(spanBuffer) >= bufferSize {
spansToSend := spanBuffer
spanBuffer = nil
bufferLock.Unlock()
flushSpans(spansToSend)
} else {
bufferLock.Unlock()
}
}
func flushSpans(spans []*Span) {
// 实际生产中,这里应该有重试和死信队列逻辑
// 为了简化,我们只做一次 POST
payload := formatSpansForClickHouse(spans)
body := bytes.NewReader(payload)
req, err := http.NewRequest(http.MethodPost, CollectorEndpoint, body)
if err != nil {
log.Printf("ERROR: failed to create trace collector request: %v\n", err)
return
}
req.Header.Set("Content-Type", "application/json")
// 使用异步调用,不关心 collector 的返回结果
client := &http.Client{Timeout: 2 * time.Second}
_, err = client.Do(req)
if err != nil {
log.Printf("ERROR: failed to send spans to collector: %v\n", err)
}
}
// ... 省略 parseTraceParent, responseWriterWrapper, formatSpansForClickHouse 等辅助函数的实现 ...
// formatSpansForClickHouse 会将 Span 结构体转换为 ClickHouse JSONEachRow 格式
这个包装器通过环境变量自动获取函数名,处理 W3C Trace Context 头,执行业务逻辑,最后异步地将 Span 发送到一个专用的收集器函数。这种方式对业务代码的侵入性极小。
第三步:实现高吞吐的 Trace Collector 函数
这个函数是整个管道的瓶颈所在,它的职责是接收来自各个业务函数的 Span 数据,并高效地批量写入 ClickHouse。
// file: trace-collector/handler.go
package function
import (
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
var (
conn driver.Conn
)
// 初始化 ClickHouse 连接。在 Serverless 环境中,连接管理至关重要。
// 我们在函数启动时建立连接,并在后续调用中复用它。
func init() {
var err error
conn, err = clickhouse.Open(&clickhouse.Options{
Addr: []string{os.Getenv("CLICKHOUSE_ADDR")}, // e.g., "my-clickhouse-server:9000"
Auth: clickhouse.Auth{
Database: os.Getenv("CLICKHOUSE_DATABASE"),
Username: os.Getenv("CLICKHOUSE_USERNAME"),
Password: os.Getenv("CLICKHOUSE_PASSWORD"),
},
Settings: clickhouse.Settings{
"async_insert": 1,
"wait_for_async_insert": 0,
},
// ... 其他 TLS, 压缩等配置
})
if err != nil {
log.Fatalf("FATAL: failed to connect to ClickHouse: %v", err)
}
}
// Handle 接收批量 Span 数据并写入 ClickHouse
func Handle(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusInternalServerError)
return
}
defer r.Body.Close()
// 这里的 body 已经是 ClickHouse 期望的 JSONEachRow 格式
// e.g., {"TraceId": "...", "SpanId": ...}\n{"TraceId": "...", "SpanId": ...}
err = conn.AsyncInsert(context.Background(), "INSERT INTO default.spans FORMAT JSONEachRow", body)
if err != nil {
log.Printf("ERROR: failed to async insert spans into ClickHouse: %v\n", err)
http.Error(w, "Failed to persist spans", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
fmt.Fprint(w, "Spans accepted")
}
关键优化:
- 连接复用:
init
函数确保了 ClickHouse 连接在函数容器的生命周期内只创建一次。 -
async_insert
: 这是一个 ClickHouse 的“杀手级”特性。启用后,服务端会立即确认接收,然后在后台异步地执行数据合并和写入。这极大地降低了客户端的等待时间,从而提升了 Collector 函数的吞吐量。 - 数据格式: 我们约定上游函数直接发送
JSONEachRow
格式的数据。这避免了 Collector 函数进行反序列化和再序列化,节省了大量的 CPU 资源。
第四步:SAML 认证流程与追踪的结合
在处理 SAML 响应的服务端点,我们可以创建整个请求链路的根 Span (Root Span),并将从 SAML断言中获得的用户信息作为标签(Attributes)附加进去。这对于后续按用户排查问题至关重要。
// file: saml-sp/handler.go (伪代码)
// ... SAML SP 逻辑,解析和验证 SAML Response ...
func (sp *ServiceProvider) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 假设 samlAssertion 已经成功解析和验证
samlAssertion := parseSAMLResponse(r)
// 创建根 Span
traceId := uuid.NewString()
spanId := uuid.NewString()
startTime := time.Now()
// ... 执行创建会话、颁发 token 等业务逻辑 ...
duration := time.Since(startTime).Nanoseconds()
// 创建并发送根 Span
rootSpan := &tracing.Span{
Timestamp: startTime,
TraceId: traceId,
SpanId: spanId,
ParentSpanId: "", // Root Span没有父Span
ServiceName: "saml-sp",
SpanName: "SAML Assertion Consume",
SpanKind: "SERVER",
Duration: uint64(duration),
StatusCode: "OK",
SpanAttributes: map[string]interface{}{
"user.id": samlAssertion.Subject.NameID,
"user.email": samlAssertion.GetAttribute("email"),
"tenant.id": samlAssertion.GetAttribute("tenant_id"),
"idp.issuer": samlAssertion.Issuer,
},
}
go tracing.sendSpan(rootSpan)
// 在颁发给前端的 session token 或 cookie 中携带 traceId
// 或者在重定向的 URL 中包含 traceId,以便前端后续请求可以携带它
session := createSession(samlAssertion.Subject.NameID)
session.Values["trace_id"] = traceId
session.Save(r, w)
// 重定向到应用主页
http.Redirect(w, r, "/app", http.StatusFound)
}
第五步:Cypress E2E 测试的革命性升级
这是整个方案的点睛之笔。我们将编写一个 Cypress 测试,它不仅仅是点击按钮和验证页面元素,而是要验证整个后端追踪链路是否按预期工作。
sequenceDiagram participant Cypress as Cypress Test Runner participant AppFE as Web App (Frontend) participant SamlSP as SAML Service Provider participant FunctionA as OpenFaaS Function A participant TraceCollector as Trace Collector participant ClickHouse as ClickHouse DB participant ValidationAPI as Trace Validation API Cypress->>+AppFE: 1. cy.startTrace() & cy.visit('/login') Note right of Cypress: Injects custom 'X-Trace-Id' header AppFE->>+SamlSP: 2. Redirect to IdP, then back with SAML Response SamlSP->>+TraceCollector: 3. Create Root Span (with user info), send async SamlSP-->>-AppFE: 4. Redirect to app dashboard AppFE->>+FunctionA: 5. User action triggers API call (with 'traceparent' header) FunctionA->>+TraceCollector: 6. Create Child Span, send async FunctionA-->>-AppFE: 7. API Response AppFE-->>-Cypress: 8. UI assertion passes Cypress->>+ValidationAPI: 9. cy.validateTrace(traceId) ValidationAPI->>+ClickHouse: 10. SELECT * FROM spans WHERE TraceId = ? ClickHouse-->>-ValidationAPI: 11. Returns found spans ValidationAPI-->>-Cypress: 12. Spans JSON Cypress->>Cypress: 13. Assert span count, parent-child relations, and tags
首先,在 Cypress 中定义自定义命令:
// file: cypress/support/commands.js
// 为测试生成并注入一个唯一的 Trace ID
Cypress.Commands.add('startTrace', () => {
const traceId = Cypress.env('TRACE_ID') || cy.helpers.generateTraceId();
Cypress.env('TRACE_ID', traceId);
// 使用 `cy.intercept` 来为测试期间的所有请求自动附加 traceparent 头
cy.intercept('*', (req) => {
// 仅为同源或我们关心的 API 域添加
if (req.url.startsWith(Cypress.config('baseUrl'))) {
const spanId = cy.helpers.generateSpanId();
req.headers['traceparent'] = `00-${traceId}-${spanId}-01`;
}
});
});
// 轮询验证 API,断言追踪数据是否已正确记录
Cypress.Commands.add('validateTrace', (options) => {
const traceId = Cypress.env('TRACE_ID');
if (!traceId) {
throw new Error('Trace ID not found. Did you forget to call cy.startTrace()?');
}
const {
expectedSpanCount,
expectedServices,
rootSpanTags,
timeout = 10000 // 默认10秒超时
} = options;
cy.log(`Validating trace: ${traceId}`);
cy.request({
method: 'GET',
url: `${Cypress.env('VALIDATION_API_URL')}/trace/${traceId}`,
retryOnStatusCodeFailure: true,
// Cypress 的重试机制天然适合轮询
retryOn: (res) => {
// 如果 body 是空的或者 span 数量不足,则继续重试
return !res.body || res.body.length < expectedSpanCount;
},
timeout: timeout
}).then((response) => {
expect(response.status).to.eq(200);
const spans = response.body;
cy.log(`Found ${spans.length} spans for trace ${traceId}`);
expect(spans).to.have.lengthOf(expectedSpanCount);
if (expectedServices) {
const serviceNames = spans.map(s => s.ServiceName);
expect(serviceNames).to.include.members(expectedServices);
}
if (rootSpanTags) {
const rootSpan = spans.find(s => s.ParentSpanId === "");
expect(rootSpan).to.not.be.undefined;
// 将 ClickHouse 的 Key-Value 数组转换回对象以便断言
const attributes = cy.helpers.convertArraysToMap(rootSpan.SpanAttributesKeys, rootSpan.SpanAttributesValues);
for (const key in rootSpanTags) {
expect(attributes).to.have.property(key, rootSpanTags[key]);
}
}
});
});
然后,编写一个 E2E 测试用例:
// file: cypress/e2e/saml_login_and_action.cy.js
describe('SAML Login and Core Action Trace Validation', () => {
it('should create a complete and correct trace for the user journey', () => {
// 步骤 1: 启动追踪
cy.startTrace();
// 步骤 2: 执行用户操作 - 这里是 SAML 登录流程
// 这部分会涉及与外部 IdP 的交互,在真实测试中需要 mock IdP 响应
cy.loginWithSAML('[email protected]', 'password123');
// 步骤 3: 登录后,在应用内执行一个会触发后端函数链的操作
cy.get('[data-cy="dashboard-title"]').should('be.visible');
cy.get('[data-cy="trigger-action-button"]').click();
cy.get('[data-cy="action-success-toast"]').should('be.visible');
// 步骤 4: 验证追踪数据
// 这是一个强大的断言:我们确信用户操作触发了3个后端服务,
// 并且根 Span (SAML登录) 必须包含正确的用户ID。
cy.validateTrace({
expectedSpanCount: 3, // saml-sp, function-a, function-b
expectedServices: ['saml-sp', 'user-profile-func', 'data-processor-func'],
rootSpanTags: {
'user.id': '[email protected]',
'tenant.id': 'tenant-123'
}
});
});
});
这个测试不仅确保了 SAML 登录和后续操作的功能正确性,还从根本上保证了系统的可观测性。如果某个开发人员在一次代码重构中破坏了追踪上下文的传递,这个测试将会失败。
局限性与未来展望
我们构建的这套系统虽然解决了核心痛点,但在真实生产环境中,它仍然存在一些局限性:
- 采样策略: 目前我们记录了 100% 的追踪。在高流量下,这会给 ClickHouse 带来巨大压力并增加成本。下一步需要引入采样策略,例如基于头部的采样(上游决定是否追踪)或更复杂的尾部采样(在数据收集端根据整个链路的情况决定是否保留)。
- 可视化: 直接在 ClickHouse 中查询数据对于日常排障来说效率不高。我们需要一个可视化前端,比如 Grafana,通过其 ClickHouse 数据源插件来展示链路火焰图和进行性能分析。
- 上下文传播: 我们手动实现了基于
traceparent
头的 HTTP 上下文传播。但对于 NATS 等其他协议,也需要手动实现消息头的注入和提取。采用像 OpenTelemetry SDK 这样的标准化库,可以自动处理多种协议的上下文传播,但这会增加函数的依赖和一定的复杂性。 - Trace Validation API 的健壮性: 当前的验证 API 比较简单。更复杂的验证可能需要检查 Span 之间的父子关系、耗时是否在预期范围内等,这会增加其查询逻辑的复杂度。
尽管存在这些局限,但这套方案以一种高度定制化、成本可控的方式,为一个复杂的 Serverless 架构提供了深度可观测性,并通过与 E2E 测试的创新性结合,将可观测性本身也纳入了质量保障体系。