构建基于 Service Worker 与图数据库的客户端用户旅程可观测性管道


标准的 RUM (Real User Monitoring) 工具总让人觉得少了点什么。它们能告诉我 LCP 是多少、FID 有多长、有没有 JS 错误,但无法回答一个更根本的问题:“用户到底做了什么才导致了这次糟糕的体验?” 孤立的性能指标就像一张张静态的照片,而我需要的是一部完整的、能够回溯因果的电影。当线上出现性能抖动时,我不想只看到一个 API 耗时 5 秒的告警,我想知道是哪个用户,在哪条业务路径下,点击了哪个按钮,才触发了这个缓慢的 API 调用,以及这个调用最终又影响了哪些后续的渲染。

要实现这一点,我们需要捕获的不仅仅是指标,更是事件之间的关系。用户的每一次点击、每一次路由跳转、每一次 API 请求,都构成了一个有向无环图。这个想法催生了一个自建可观测性管道的构想:在客户端利用 Service Worker 捕获高保真事件,通过一个轻量级的 Express.js 网关进行数据接收,将原始日志流推送到 Loki 进行低成本存储与查询,同时,将事件间的关系建模并存入图数据库,用于深度、上下文感知的故障排查。

第一步:设计数据采集器 - 一个不影响性能的 Service Worker

采集端的首要原则是低侵入性高性能。如果监控本身就拖慢了应用,那就本末倒置了。Service Worker 是这个角色的最佳选择。它独立于主线程,生命周期也与页面无关,即使在页面跳转或关闭的瞬间,它仍然有时间将缓存的事件发送出去。

我们采集的核心是 fetch 事件。拦截所有出站请求,我们就能捕alahkan API 调用的完整生命周期。

// public/sw.js

// 全局唯一的会话ID,在 Service Worker 激活时生成
let sessionId = self.crypto.randomUUID();
let eventQueue = [];
let flushTimeout = null;
const FLUSH_INTERVAL = 5000; // 每5秒批量上报一次
const MAX_QUEUE_SIZE = 100; // 或者队列达到100个事件时上报
const INGESTION_ENDPOINT = '/api/telemetry';

self.addEventListener('install', (event) => {
  // 强制新的 Service Worker 立即取代旧的
  event.waitUntil(self.skipWaiting());
});

self.addEventListener('activate', (event) => {
  // 立即控制所有客户端
  event.waitUntil(self.clients.claim().then(() => {
    // 当SW激活时,重置sessionId,代表一个新的用户会话开始
    sessionId = self.crypto.randomUUID();
    console.log(`Service Worker activated. New session ID: ${sessionId}`);
  }));
});

// 核心:拦截网络请求
self.addEventListener('fetch', (event) => {
  const { request } = event;

  // 只监控我们自己的API调用和关键资源,避免采集第三方脚本
  if (!request.url.startsWith(self.location.origin) || request.url.includes(INGESTION_ENDPOINT)) {
    return;
  }

  const eventId = self.crypto.randomUUID();
  const startTime = performance.now();

  const recordFetchEvent = (response) => {
    const duration = performance.now() - startTime;
    const isError = response instanceof Error;
    const status = isError ? 599 : response.status; // 599作为客户端网络错误代码
    
    const eventData = {
      type: 'API_CALL',
      id: eventId,
      sessionId,
      timestamp: Date.now(),
      payload: {
        method: request.method,
        url: request.url,
        status,
        duration: parseFloat(duration.toFixed(2)),
        // 在真实项目中,需要谨慎处理header和body,可能包含敏感信息
        // requestHeaders: Object.fromEntries(request.headers.entries()),
        // responseHeaders: isError ? {} : Object.fromEntries(response.headers.entries()),
        error: isError ? response.message : null,
        initiator: request.referrer, // 调用来源
      },
    };
    pushToQueue(eventData);
  };

  event.respondWith(
    fetch(request.clone())
      .then((response) => {
        recordFetchEvent(response.clone());
        return response;
      })
      .catch((err) => {
        recordFetchEvent(err);
        // 必须抛出错误,否则原始请求将无法感知到失败
        throw err;
      })
  );
});

// 接收来自主线程的消息,例如UI交互事件
self.addEventListener('message', (event) => {
  const { data } = event;
  if (data && data.type) {
    const eventData = {
      ...data,
      id: self.crypto.randomUUID(),
      sessionId,
      timestamp: Date.now(),
    };
    pushToQueue(eventData);
  }
});

function pushToQueue(eventData) {
  eventQueue.push(eventData);
  if (eventQueue.length >= MAX_QUEUE_SIZE) {
    // 队列满,立即发送
    flushQueue();
  } else if (!flushTimeout) {
    // 队列未满,设置定时器
    flushTimeout = setTimeout(flushQueue, FLUSH_INTERVAL);
  }
}

async function flushQueue() {
  if (flushTimeout) {
    clearTimeout(flushTimeout);
    flushTimeout = null;
  }
  
  if (eventQueue.length === 0) {
    return;
  }

  const batch = [...eventQueue];
  eventQueue = [];

  try {
    // 使用 keepalive 确保在页面卸载时请求也能成功发送
    const response = await fetch(INGESTION_ENDPOINT, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(batch),
      keepalive: true,
    });
    if (!response.ok) {
        // 在真实项目中,这里需要实现重试机制,例如将失败的batch存入IndexedDB
        console.error('Failed to send telemetry data:', response.statusText);
        // 将发送失败的数据重新放回队列前端
        eventQueue.unshift(...batch);
    }
  } catch (error) {
    console.error('Error sending telemetry data:', error);
    eventQueue.unshift(...batch);
  }
}

客户端页面需要注册此 Service Worker,并使用 postMessage 发送 UI 事件:

// client-app.js

// 注册 Service Worker
if ('serviceWorker' in navigator) {
  window.addEventListener('load', () => {
    navigator.serviceWorker.register('/sw.js').then(registration => {
      console.log('ServiceWorker registration successful with scope: ', registration.scope);
    }).catch(error => {
      console.log('ServiceWorker registration failed: ', error);
    });
  });
}

// 封装一个函数来向SW发送事件
function sendTelemetryEvent(type, payload) {
  if (navigator.serviceWorker && navigator.serviceWorker.controller) {
    navigator.serviceWorker.controller.postMessage({
      type,
      payload,
    });
  }
}

// 示例:监听按钮点击
document.getElementById('checkout-button').addEventListener('click', () => {
  sendTelemetryEvent('UI_CLICK', {
    elementId: 'checkout-button',
    elementText: '确认支付',
    path: window.location.pathname,
  });
  // ... 触发API调用等
});

// 示例:路由变化
window.addEventListener('popstate', () => {
    sendTelemetryEvent('ROUTE_CHANGE', {
        from: previousPath, // 需要自己维护 previousPath
        to: window.location.pathname,
    });
});

这里的关键在于批处理(flushQueue)和 keepalive 标志。批处理避免了为每个事件都发起网络请求,极大地降低了客户端和服务端的负载。keepalive 确保了即使用户关闭了标签页,这批数据的发送请求也会被浏览器继续处理。

第二步:构建数据接收端 - 高吞吐的 Express.js 服务

接收端的角色很简单:接收数据,快速响应,然后异步地将数据分发到下游。Express.js 加上一些优化,完全可以胜任。

// server.js

const express = require('express');
const compression = require('compression');
const neo4j = require('neo4j-driver');
const { createLogger, transports } = require('winston');
const LokiTransport = require('winston-loki');

// --- 配置 ---
const PORT = process.env.PORT || 3001;
const LOKI_HOST = process.env.LOKI_HOST || 'http://localhost:3100';
const NEO4J_URI = process.env.NEO4J_URI || 'bolt://localhost:7687';
const NEO4J_USER = process.env.NEO4J_USER || 'neo4j';
const NEO4J_PASSWORD = process.env.NEO4J_PASSWORD || 'password';

// --- 初始化连接 ---
const app = express();
const driver = neo4j.driver(NEO4J_URI, neo4j.auth.basic(NEO4J_USER, NEO4J_PASSWORD));
const lokiLogger = createLogger({
  transports: [
    new LokiTransport({
      host: LOKI_HOST,
      json: true,
      labels: { job: 'frontend-telemetry' },
      // 避免将所有元数据作为Loki标签,防止索引爆炸
      // 在真实项目中,应该只把低基数的维度如 'env', 'app_version' 作为标签
      replaceTimestamp: true,
      onConnectionError: (err) => console.error('Loki connection error:', err),
    }),
  ],
});

// --- 中间件 ---
// 启用 Gzip 压缩,对于 JSON 负载效果显著
app.use(compression());
// 解析 JSON body,设置一个较大的限制,因为我们是批量接收
app.use(express.json({ limit: '2mb' }));

// --- 核心路由 ---
app.post('/api/telemetry', (req, res) => {
  const events = req.body;

  if (!Array.isArray(events) || events.length === 0) {
    return res.status(400).send('Bad Request: Expected a non-empty array of events.');
  }

  // 立即响应客户端,表示已接收
  res.status(202).send('Accepted');

  // 异步处理,不阻塞响应
  processEvents(events).catch(err => {
    // 这里的日志记录至关重要,用于排查数据处理失败的问题
    console.error('Failed to process event batch:', err, 'Batch size:', events.length);
  });
});

async function processEvents(events) {
  // 1. 将所有原始事件推送到 Loki
  events.forEach(event => {
    // Loki 的日志消息是字符串,我们将整个事件对象序列化
    lokiLogger.info({
      message: JSON.stringify(event),
      labels: {
        sessionId: event.sessionId,
        eventType: event.type,
      }
    });
  });

  // 2. 将事件关系写入图数据库
  const session = driver.session();
  try {
    // 在一个事务中处理整个批次,保证原子性
    await session.executeWrite(async tx => {
      for (const event of events) {
        // 创建会话节点(如果不存在)
        await tx.run(
          'MERGE (s:Session {id: $sessionId}) ON CREATE SET s.createdAt = timestamp()',
          { sessionId: event.sessionId }
        );

        // 根据事件类型创建不同的节点和关系
        switch (event.type) {
          case 'UI_CLICK':
            await tx.run(
              `
                MERGE (e:UIElement {id: $elementId}) ON CREATE SET e.text = $elementText
                MATCH (s:Session {id: $sessionId})
                CREATE (s)-[:PERFORMED]->(a:Action:Click {id: $id, timestamp: $timestamp, path: $path})
                CREATE (a)-[:TARGETS]->(e)
              `,
              { ...event, ...event.payload }
            );
            break;
          case 'API_CALL':
            await tx.run(
              `
                MERGE (api:APIEndpoint {url: $url, method: $method})
                MATCH (s:Session {id: $sessionId})
                // 找到这个API调用之前的最后一个动作
                OPTIONAL MATCH (s)-[:PERFORMED]->(prevAction)
                WITH s, api, prevAction ORDER BY prevAction.timestamp DESC LIMIT 1
                
                CREATE (call:APICall {id: $id, timestamp: $timestamp, status: $status, duration: $duration, error: $error})
                CREATE (s)-[:TRIGGERED]->(call)
                CREATE (call)-[:CALLS]->(api)
                
                // 如果能找到前一个动作,就建立因果关系
                // 这是一个简化的逻辑,真实世界需要更复杂的关联逻辑
                // 比如通过一个明确的 correlationId
                FOREACH (_ IN CASE WHEN prevAction IS NOT NULL THEN [1] ELSE [] END |
                    CREATE (prevAction)-[:LEADS_TO]->(call)
                )
              `,
              { ...event, ...event.payload }
            );
            break;
          // 此处可以添加更多事件类型的处理逻辑,如 ROUTE_CHANGE, ERROR 等
        }
      }
    });
  } finally {
    await session.close();
  }
}

// --- 启动服务器 ---
app.listen(PORT, () => {
  console.log(`Telemetry server listening on port ${PORT}`);
});

// 优雅关闭
process.on('SIGINT', async () => {
  console.log('Closing connections...');
  await driver.close();
  // 等待 Loki logger 刷新
  await new Promise(resolve => lokiLogger.on('finish', resolve));
  process.exit(0);
});

这个服务器有几个关键设计:

  1. **快速响应 (202 Accepted)**:接收到数据后立即响应,然后异步处理。这避免了客户端因等待后端处理而超时。
  2. 双写模式:数据被分发到两个目的地。Loki 接收全量的、结构化的原始日志,用于搜索和聚合。Neo4j (或任何图数据库) 接收经过建模的关系数据。
  3. 原子性写入:对图数据库的写入操作被包裹在一个事务中。这意味着一个批次中的所有事件要么全部成功写入,要么全部失败回滚,避免了数据不一致的状态。
  4. 连接管理:数据库和日志服务的连接被妥善管理,并在进程退出时优雅关闭。

第三步:数据建模与查询 - 图数据库的威力

这套系统的核心价值在于图数据库。我们将用户旅程建模为一系列节点和关系:

graph TD
    subgraph "Session: session-123"
        A[Action: Click checkout-btn] -->|LEADS_TO| B(APICall: POST /api/orders);
        B -->|LEADS_TO| C(APICall: GET /api/payment/status);
        C -->|FAILED_WITH_STATUS_500| C;
        C -->|LEADS_TO| D[Action: Render ErrorModal];
    end
    
    U(User: user-abc) -->|HAS_SESSION| A;
    A -->|TARGETS| E(UIElement: checkout-btn);
    B -->|CALLS| F(APIEndpoint: /api/orders);
    C -->|CALLS| G(APIEndpoint: /api/payment/status);

有了这样的模型,我们可以提出传统时序数据库难以回答的问题。

查询场景1:找到所有导致支付失败的用户会话

MATCH (s:Session)-[:TRIGGERED]->(call:APICall)-[:CALLS]->(api:APIEndpoint)
WHERE api.url CONTAINS '/api/payment' AND call.status >= 500
// 找到该会话中的所有事件,并按时间排序
WITH s
MATCH (s)-[r]->(event)
RETURN s.id AS sessionId, event.id AS eventId, event.timestamp AS eventTimestamp, labels(event) AS eventType, properties(event) AS eventPayload
ORDER BY eventTimestamp

这个查询能立即返回所有失败会话的完整事件流,让我们可以一步步复盘用户的操作,直到出错的那个 API 调用。

查询场景2:分析某个特定 UI 元素的点击后性能

MATCH (e:UIElement {id: 'checkout-button'})<-[:TARGETS]-(click:Click)-[:LEADS_TO*1..3]->(apiCall:APICall)
// 找到点击 'checkout-button' 后,3步之内触发的所有API调用
RETURN 
    e.id AS element,
    avg(apiCall.duration) AS average_duration,
    percentileCont(apiCall.duration, 0.95) AS p95_duration,
    count(apiCall) AS total_calls

这个查询能精确分析出点击某个按钮后,下游 API 的性能表现。这对于定位由前端交互触发的后端性能瓶颈非常有用。

局限性与未来迭代路径

这套自建管道提供了极大的灵活性和深度洞察力,但它并非没有成本。首先,运维成本是存在的。你需要维护 Express 服务、Loki 集群和图数据库实例。在数据量激增时,对这些组件的性能调优和扩容会成为新的挑战。

其次,客户端采集的准确性是一个持续优化的过程。比如,如何更精确地将 UI 交互和它触发的异步 API 调用关联起来?当前模型中的 LEADS_TO 关系是基于时间的简单推断,一个更健壮的方案是在发起请求时注入一个由交互事件生成的唯一 correlationId,从而建立明确的因果链。

最后,数据的可视化也是关键一环。虽然 Loki 与 Grafana 无缝集成,可以很好地展示原始日志和聚合指标,但图数据的可视化需要额外的工作。可以开发一个简单的内部工具,输入 session ID,就能渲染出该会话的用户旅程图,这将是故障排查的终极利器。这个管道不是一个一劳永逸的解决方案,而是一个可观测性平台的地基,未来可以在其上构建更复杂的异常检测、性能回归分析和业务漏斗分析等高级功能。


  目录