构建基于 Vector 和 Pulsar 的高吞吐解耦式全链路追踪管道


直接将应用链路追踪数据推向 Zipkin 服务,在开发环境或小流量场景下看似便捷,但在生产环境中,这是一种脆弱的架构。当后端服务实例扩容到几十上百个,前端用户流量洪峰到来,海量的 trace-span 数据会瞬间冲击 Zipkin 服务。这不仅可能导致 Zipkin 实例过载、数据丢失,更严重的是,观测系统的瓶颈会反向影响到业务应用的性能,例如因网络阻塞或 Agent 内存溢出导致的应用延迟升高。我们需要一个更具弹性和韧性的方案。

问题定义:观测数据管道的耦合与瓶颈

一个典型的、耦合紧密的链路追踪架构如下:

graph TD
    subgraph Client
        Frontend[前端应用]
    end
    subgraph Server
        Backend_A[后端服务A]
        Backend_B[后端服务B]
    end
    subgraph Observability
        Zipkin[Zipkin Server]
        Storage[Elasticsearch/Cassandra]
    end

    Frontend -- HTTP Request with Trace Context --> Backend_A
    Backend_A -- RPC Call with Trace Context --> Backend_B
    Backend_A -- Spans --> Zipkin
    Backend_B -- Spans --> Zipkin
    Zipkin -- Writes --> Storage

这种直连模式的核心问题在于:

  1. 可用性耦合:Zipkin Server 的任何抖动或不可用,都会直接影响到所有后端服务的数据发送逻辑,可能导致内存积压甚至应用崩溃。
  2. 性能瓶颈:所有服务的追踪数据汇聚于一个或少数几个入口点,缺乏削峰填谷的能力。
  3. 扩展性受限:如果想增加一个新的观测后端(如ClickHouse用于分析),或对数据进行预处理(如采样、脱敏),需要修改所有后端服务的 Agent 配置,运维成本极高。

方案一:引入本地日志代理 (Log Agent)

一个常见的改进是引入本地代理,例如在每个节点上部署一个 agent,负责收集数据并转发。这在一定程度上实现了应用与后端的解耦。

  • 优点
    • 应用只需将数据发送到 localhost,降低了网络依赖的复杂性。
    • Agent 可以提供简单的本地缓存。
  • 缺点
    • 传统的日志 Agent(如 Logstash)在处理结构化的 Trace 数据时并非原生支持,性能和效率不高。
    • 本地缓存容量有限,无法应对长时间的后端不可用。
    • 数据处理和路由逻辑依然集中在 Agent 配置中,缺乏灵活性。

方案二:构建基于 Vector 和 Pulsar 的解耦管道

为了彻底解决上述问题,我们设计一个基于 Vector 和 Apache Pulsar 的高吞吐、解耦式管道。

  • Vector: 一个用 Rust 编写的高性能可观测性数据管道。它能以极低的资源消耗接收、转换和路由日志、指标和追踪数据。它的设计哲学就是作为观测数据领域的“瑞士军刀”。
  • Apache Pulsar: 一个云原生的分布式消息流平台。其分层存储、多租户、统一消息模型等特性,使其非常适合作为高流量数据的持久化缓冲区。

新的架构图如下:

graph TD
    subgraph Client
        Frontend[前端应用]
    end

    subgraph App Servers
        Backend_A[后端服务A] -->|Spans| Vector_Agent_A[Vector Agent]
        Backend_B[后端服务B] -->|Spans| Vector_Agent_B[Vector Agent]
    end

    subgraph Data Pipeline
        Vector_Agent_A --> Pulsar
        Vector_Agent_B --> Pulsar
        Pulsar[Apache Pulsar Topic] --> Vector_Aggregator[Vector Aggregator]
    end

    subgraph Observability Backend
        Vector_Aggregator -- Spans --> Zipkin[Zipkin Server]
        Vector_Aggregator -- Raw Data (Optional) --> S3[对象存储/归档]
        Zipkin -- Writes --> Storage[Elasticsearch]
    end

    Frontend -- HTTP Request --> Backend_A
    Backend_A -- RPC --> Backend_B
  • 优点

    • 彻底解耦:后端服务只需与本地 Vector Agent 通信。Vector Agent 也只与 Pulsar 通信。Pulsar 与下游消费者(Vector Aggregator, Zipkin)完全解耦。任何一个组件的故障都不会立即传导到上游。
    • 高可用与持久化:Pulsar 提供了企业级的持久性保证。即使 Zipkin 后端宕机数小时,追踪数据也会安全地存储在 Pulsar 中,待后端恢复后继续消费,实现数据零丢失。
    • 削峰填谷:流量洪峰被 Pulsar 的 Topic 吸收,下游的 Vector Aggregator 可以根据自身处理能力平稳消费,保护了 Zipkin Server。
    • 灵活性与可扩展性:Vector Aggregator 成为数据路由中心。增加新的数据目的地,只需在 Aggregator 上修改配置,无需触碰任何业务服务。Vector 强大的 VRL (Vector Remap Language) 还能实现动态采样、数据丰富、格式转换等复杂逻辑。
  • 缺点

    • 架构复杂度增加:引入了 Vector 和 Pulsar 两个新组件,增加了部署和维护的成本。
    • 数据延迟:数据经过 Pulsar 中转,其可见性会比直连模式有一定延迟。但这对于大部分链路追踪场景是可以接受的。

在真实项目中,为了系统的长期稳定性和可扩展性,方案二带来的好处远大于其复杂性成本。接下来,我们将逐步实现这个架构。

核心实现概览

我们将使用 Docker Compose 在本地模拟整个环境,包括:

  • 一个 React 前端应用
  • 一个 Node.js (Express) 后端服务
  • 一个本地 Vector Agent (与后端服务在同一个 pod/container group 中)
  • 一个 Apache Pulsar 实例
  • 一个中央 Vector Aggregator
  • 一个 Zipkin Server

1. 前端应用:生成并传递 Trace Context

我们使用 zipkin-js 库来为前端的 API 请求注入追踪上下文。

frontend/src/api.js

import { Tracer, BatchRecorder, jsonEncoder } from 'zipkin';
import { HttpLogger } from 'zipkin-transport-http';
import wrapFetch from 'zipkin-instrumentation-fetch';

// 在真实项目中, Zipkin Server的地址应该是配置化的,
// 但在我们这个架构中,前端的trace实际上是后端生成的,前端仅需传递上下文。
// 所以这里配置一个假的logger即可,关键是生成Trace ID。
const recorder = new BatchRecorder({
  logger: new HttpLogger({
    endpoint: 'http://localhost:9411/api/v2/spans', // This won't be used directly by the browser
    jsonEncoder: jsonEncoder.JSON_V2
  })
});

const tracer = new Tracer({
  ctxImpl: new (await import('zipkin-context-cls-hooked')).CLSContext(), // or any other context implementation
  recorder: recorder,
  localServiceName: 'frontend-app'
});

// 使用zipkin-instrumentation-fetch包装原生的fetch
const remoteFetch = wrapFetch(fetch, {
  tracer,
  remoteServiceName: 'backend-api' // 调用的后端服务名
});

export const callApi = () => {
  // 发起一个带追踪上下文的请求
  // zipkin-js会自动在HTTP Headers中添加 B3 propagation headers
  // (e.g., X-B3-TraceId, X-B3-SpanId, X-B3-Sampled)
  return remoteFetch('http://localhost:8080/api/data');
};

这里的核心是 wrapFetch,它会拦截 fetch 请求,并自动注入 X-B3-* 的 HTTP 头。前端本身不直接上报 Span,它的主要职责是创建 Trace 的起点并将上下文传递给第一个后端服务。

2. 后端服务:接收、继续并上报 Trace

后端使用 Express 框架,并集成 zipkin-instrumentation-express 中间件。

backend/server.js

const express = require('express');
const cors = require('cors');
const { Tracer, BatchRecorder, jsonEncoder } = require('zipkin');
const { HttpLogger } = require('zipkin-transport-http');
const zipkinMiddleware = require('zipkin-instrumentation-express').expressMiddleware;

const app = express();
const PORT = 8080;

// 关键点:这里的HttpLogger指向的是本地的Vector Agent,而不是远端的Zipkin Server。
// Vector Agent会监听在9411端口,接收Zipkin格式的数据。
const zipkinEndpoint = process.env.ZIPKIN_ENDPOINT || 'http://localhost:9411/api/v2/spans';
console.log(`Reporting spans to: ${zipkinEndpoint}`);

const recorder = new BatchRecorder({
  logger: new HttpLogger({
    endpoint: zipkinEndpoint,
    jsonEncoder: jsonEncoder.JSON_V2,
    // 超时和agent配置对于生产环境至关重要
    timeout: 5000, 
  })
});

const tracer = new Tracer({
  ctxImpl: new (require('zipkin-context-cls-hooked')).CLSContext(),
  recorder: recorder,
  localServiceName: 'backend-api'
});

app.use(cors());
app.use(zipkinMiddleware({ tracer }));

app.get('/api/data', (req, res) => {
  // 中间件已经从请求头中提取了trace context,并创建了一个新的server-side span
  tracer.local('process-request', () => {
    // 模拟一些业务逻辑耗时
    const processingTime = Math.random() * 200;
    setTimeout(() => {
      res.json({ message: 'Hello from backend!', traceId: tracer.id.traceId });
    }, processingTime);
  });
});

app.listen(PORT, () => {
  console.log(`Backend server running on port ${PORT}`);
});

后端服务的配置非常直接。它接收带有 B3 头的请求,继续该 Trace,并在请求处理完成后,将生成的 Span 发送到 http://localhost:9411,也就是本地 Vector Agent 的监听地址。

3. Vector Agent 配置:接收并推送到 Pulsar

这是数据管道的第一站。Vector Agent 与后端服务部署在一起,它的配置 vector-agent.toml 如下:

# vector-agent.toml

# 1. 定义数据源 (Source)
# 监听 Zipkin 格式的追踪数据
[sources.zipkin_traces]
  type = "zipkin"
  address = "0.0.0.0:9411" # 监听所有网络接口的9411端口

# 2. 定义数据出口 (Sink)
# 将数据推送到 Pulsar topic
[sinks.pulsar_buffer]
  type = "pulsar"
  inputs = ["zipkin_traces"] # 从 zipkin_traces source 接收数据
  
  # Pulsar 连接配置
  # 在Docker Compose中, 'pulsar' 是服务名
  endpoint = "pulsar://pulsar:6650" 
  topic = "persistent://public/default/zipkin-spans"

  # 编码: 将追踪数据编码为原始 JSON 字节流
  # Pulsar sink 需要知道如何序列化数据
  encoding.codec = "json"

  # 性能和可靠性调优
  # 批量发送可以显著提高吞吐量
  batch.max_bytes = 1048576 # 1MB, 权衡延迟和吞吐量
  batch.timeout_secs = 5   # 每5秒或达到1MB时发送一次

  # Pulsar 生产者设置
  # 使用 Gzip 压缩来减少网络传输量和存储成本
  compression = "gzip"

  # 错误处理: 如果 Pulsar 不可用, Vector 会在内存中缓冲
  # 这里的配置需要根据可用内存谨慎设置
[sinks.pulsar_buffer.buffer]
  type = "memory"
  max_events = 5000 # 最多缓冲5000个事件
  when_full = "block" # 当缓冲区满时,阻塞上游,提供背压

这个配置的核心是定义了一个 zipkin source 和一个 pulsar sink。它将从 9411 端口接收到的所有 Zipkin spans 打包、压缩,然后发送到 Pulsar 的 zipkin-spans topic。buffer 配置提供了关键的背压机制,防止在 Pulsar 短暂不可用时数据丢失。

4. Vector Aggregator 配置:从 Pulsar 消费并转发到 Zipkin

中央聚合器负责从 Pulsar 读取数据并将其安全地发送到最终目的地。

vector-aggregator.toml

# vector-aggregator.toml

# 1. 定义数据源 (Source)
# 从 Pulsar topic 消费数据
[sources.pulsar_stream]
  type = "pulsar"
  endpoint = "pulsar://pulsar:6650"
  subscription = "vector-aggregator-sub" # 定义一个持久订阅
  subscription_type = "Shared"          # 允许多个aggregator实例并行消费
  topic = "persistent://public/default/zipkin-spans"
  
  # 解码: 因为上游 sink 使用了 json 编码
  decoding.codec = "json"

# 2. (可选) 定义数据转换 (Transform)
# 在这里可以添加采样、过滤或丰富数据的逻辑
# 例如: 只保留包含错误的trace
# [transforms.sampler]
#   type = "filter"
#   inputs = ["pulsar_stream"]
#   condition = '.tags."error" == "true"'

# 3. 定义数据出口 (Sink)
# 将数据发送到 Zipkin Server
[sinks.zipkin_backend]
  type = "zipkin"
  inputs = ["pulsar_stream"] # 直接从pulsar source接收
  # inputs = ["sampler"] # 如果使用transform, 则从transform接收
  endpoint = "http://zipkin:9411/api/v2/spans"
  
  # 生产环境中的重要配置
  request.concurrency = 10 # 并发请求数, 提高发送效率
  request.timeout_secs = 30 # 较长的超时以应对网络抖动

  # 同样配置缓冲区以应对Zipkin后端不可用
[sinks.zipkin_backend.buffer]
  type = "memory"
  max_size = 536870912 # 512MB
  when_full = "block"

这个配置从 Pulsar 的 zipkin-spans topic 消费数据,然后通过 zipkin sink 将其转发到 Zipkin 服务器。这里的 Shared 订阅类型非常重要,它允许我们水平扩展 Vector Aggregator 实例以提高处理能力。

5. Docker Compose 编排

最后,我们用 docker-compose.yml 把所有服务串联起来。

version: '3.8'

services:
  frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    depends_on:
      - backend

  backend:
    build: ./backend
    ports:
      - "8080:8080"
    environment:
      # 后端将traces发送到本地vector-agent
      - ZIPKIN_ENDPOINT=http://vector-agent:9411/api/v2/spans
    depends_on:
      - vector-agent

  vector-agent:
    image: timberio/vector:0.35.0-alpine
    volumes:
      - ./vector-agent.toml:/etc/vector/vector.toml:ro
    ports:
      # 对外暴露9411, 让后端可以访问
      - "9411:9411" 
    depends_on:
      - pulsar

  pulsar:
    image: apachepulsar/pulsar:3.1.1
    ports:
      - "6650:6650"
      - "8081:8080" # Pulsar admin port
    command: >
      bash -c "bin/apply-config-from-env.sh conf/standalone.conf && 
               bin/pulsar standalone"

  vector-aggregator:
    image: timberio/vector:0.35.0-alpine
    volumes:
      - ./vector-aggregator.toml:/etc/vector/vector.toml:ro
    depends_on:
      - pulsar
      - zipkin

  zipkin:
    image: openzipkin/zipkin:2
    ports:
      # Zipkin UI and API port
      - "9412:9411" 

注意端口映射:后端连接到 vector-agent:9411vector-agent9411 端口被暴露出来,但为了避免与 zipkin 端口冲突,我们将主机的 9411 映射到 vector-agent,而将主机的 9412 映射到 zipkin9411 端口。这样,我们访问 http://localhost:9412 就能看到 Zipkin UI。

启动所有服务后 (docker-compose up),访问前端页面并触发 API 调用,稍等片刻,就能在 Zipkin UI 中看到完整的、跨越前端和后端的链路追踪信息。数据流经了 Backend -> Vector Agent -> Pulsar -> Vector Aggregator -> Zipkin 这一整条我们设计的弹性管道。

架构的扩展性与局限性

这个架构的真正威力在于其扩展性。在 Vector Aggregator 层面,我们可以轻松地增加新的 Sink。例如,增加一个 s3 sink,将所有原始追踪数据备份到对象存储中进行长期归档和离线分析,而无需对现有流程做任何改动。同样,使用 Vector 的 vrl 功能,可以实现非常复杂的动态采样策略,比如对包含特定 HTTP 状态码(如 5xx)的 Trace 进行 100% 采样,而对正常的 200 Trace 只采样 1%,从而在不丢失关键错误信息的前提下,大幅降低存储成本。

当然,该方案也存在局限性。最明显的是增加了系统的运维复杂度,团队需要同时掌握 Vector 和 Pulsar 的知识。其次,数据链路的增长引入了额外的延迟,虽然通常在秒级,但对于需要亚秒级追踪数据可见性的场景可能不适用。最后,整个管道的资源消耗高于直连模式,需要在成本和系统韧性之间做出权衡。对于大多数需要处理大规模流量、且视可观测性为核心能力的系统而言,这种投入是完全值得的。


  目录