标准的 RUM (Real User Monitoring) 工具总让人觉得少了点什么。它们能告诉我 LCP 是多少、FID 有多长、有没有 JS 错误,但无法回答一个更根本的问题:“用户到底做了什么才导致了这次糟糕的体验?” 孤立的性能指标就像一张张静态的照片,而我需要的是一部完整的、能够回溯因果的电影。当线上出现性能抖动时,我不想只看到一个 API 耗时 5 秒的告警,我想知道是哪个用户,在哪条业务路径下,点击了哪个按钮,才触发了这个缓慢的 API 调用,以及这个调用最终又影响了哪些后续的渲染。
要实现这一点,我们需要捕获的不仅仅是指标,更是事件之间的关系。用户的每一次点击、每一次路由跳转、每一次 API 请求,都构成了一个有向无环图。这个想法催生了一个自建可观测性管道的构想:在客户端利用 Service Worker 捕获高保真事件,通过一个轻量级的 Express.js 网关进行数据接收,将原始日志流推送到 Loki 进行低成本存储与查询,同时,将事件间的关系建模并存入图数据库,用于深度、上下文感知的故障排查。
第一步:设计数据采集器 - 一个不影响性能的 Service Worker
采集端的首要原则是低侵入性和高性能。如果监控本身就拖慢了应用,那就本末倒置了。Service Worker 是这个角色的最佳选择。它独立于主线程,生命周期也与页面无关,即使在页面跳转或关闭的瞬间,它仍然有时间将缓存的事件发送出去。
我们采集的核心是 fetch
事件。拦截所有出站请求,我们就能捕alahkan API 调用的完整生命周期。
// public/sw.js
// 全局唯一的会话ID,在 Service Worker 激活时生成
let sessionId = self.crypto.randomUUID();
let eventQueue = [];
let flushTimeout = null;
const FLUSH_INTERVAL = 5000; // 每5秒批量上报一次
const MAX_QUEUE_SIZE = 100; // 或者队列达到100个事件时上报
const INGESTION_ENDPOINT = '/api/telemetry';
self.addEventListener('install', (event) => {
// 强制新的 Service Worker 立即取代旧的
event.waitUntil(self.skipWaiting());
});
self.addEventListener('activate', (event) => {
// 立即控制所有客户端
event.waitUntil(self.clients.claim().then(() => {
// 当SW激活时,重置sessionId,代表一个新的用户会话开始
sessionId = self.crypto.randomUUID();
console.log(`Service Worker activated. New session ID: ${sessionId}`);
}));
});
// 核心:拦截网络请求
self.addEventListener('fetch', (event) => {
const { request } = event;
// 只监控我们自己的API调用和关键资源,避免采集第三方脚本
if (!request.url.startsWith(self.location.origin) || request.url.includes(INGESTION_ENDPOINT)) {
return;
}
const eventId = self.crypto.randomUUID();
const startTime = performance.now();
const recordFetchEvent = (response) => {
const duration = performance.now() - startTime;
const isError = response instanceof Error;
const status = isError ? 599 : response.status; // 599作为客户端网络错误代码
const eventData = {
type: 'API_CALL',
id: eventId,
sessionId,
timestamp: Date.now(),
payload: {
method: request.method,
url: request.url,
status,
duration: parseFloat(duration.toFixed(2)),
// 在真实项目中,需要谨慎处理header和body,可能包含敏感信息
// requestHeaders: Object.fromEntries(request.headers.entries()),
// responseHeaders: isError ? {} : Object.fromEntries(response.headers.entries()),
error: isError ? response.message : null,
initiator: request.referrer, // 调用来源
},
};
pushToQueue(eventData);
};
event.respondWith(
fetch(request.clone())
.then((response) => {
recordFetchEvent(response.clone());
return response;
})
.catch((err) => {
recordFetchEvent(err);
// 必须抛出错误,否则原始请求将无法感知到失败
throw err;
})
);
});
// 接收来自主线程的消息,例如UI交互事件
self.addEventListener('message', (event) => {
const { data } = event;
if (data && data.type) {
const eventData = {
...data,
id: self.crypto.randomUUID(),
sessionId,
timestamp: Date.now(),
};
pushToQueue(eventData);
}
});
function pushToQueue(eventData) {
eventQueue.push(eventData);
if (eventQueue.length >= MAX_QUEUE_SIZE) {
// 队列满,立即发送
flushQueue();
} else if (!flushTimeout) {
// 队列未满,设置定时器
flushTimeout = setTimeout(flushQueue, FLUSH_INTERVAL);
}
}
async function flushQueue() {
if (flushTimeout) {
clearTimeout(flushTimeout);
flushTimeout = null;
}
if (eventQueue.length === 0) {
return;
}
const batch = [...eventQueue];
eventQueue = [];
try {
// 使用 keepalive 确保在页面卸载时请求也能成功发送
const response = await fetch(INGESTION_ENDPOINT, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(batch),
keepalive: true,
});
if (!response.ok) {
// 在真实项目中,这里需要实现重试机制,例如将失败的batch存入IndexedDB
console.error('Failed to send telemetry data:', response.statusText);
// 将发送失败的数据重新放回队列前端
eventQueue.unshift(...batch);
}
} catch (error) {
console.error('Error sending telemetry data:', error);
eventQueue.unshift(...batch);
}
}
客户端页面需要注册此 Service Worker,并使用 postMessage
发送 UI 事件:
// client-app.js
// 注册 Service Worker
if ('serviceWorker' in navigator) {
window.addEventListener('load', () => {
navigator.serviceWorker.register('/sw.js').then(registration => {
console.log('ServiceWorker registration successful with scope: ', registration.scope);
}).catch(error => {
console.log('ServiceWorker registration failed: ', error);
});
});
}
// 封装一个函数来向SW发送事件
function sendTelemetryEvent(type, payload) {
if (navigator.serviceWorker && navigator.serviceWorker.controller) {
navigator.serviceWorker.controller.postMessage({
type,
payload,
});
}
}
// 示例:监听按钮点击
document.getElementById('checkout-button').addEventListener('click', () => {
sendTelemetryEvent('UI_CLICK', {
elementId: 'checkout-button',
elementText: '确认支付',
path: window.location.pathname,
});
// ... 触发API调用等
});
// 示例:路由变化
window.addEventListener('popstate', () => {
sendTelemetryEvent('ROUTE_CHANGE', {
from: previousPath, // 需要自己维护 previousPath
to: window.location.pathname,
});
});
这里的关键在于批处理(flushQueue
)和 keepalive
标志。批处理避免了为每个事件都发起网络请求,极大地降低了客户端和服务端的负载。keepalive
确保了即使用户关闭了标签页,这批数据的发送请求也会被浏览器继续处理。
第二步:构建数据接收端 - 高吞吐的 Express.js 服务
接收端的角色很简单:接收数据,快速响应,然后异步地将数据分发到下游。Express.js 加上一些优化,完全可以胜任。
// server.js
const express = require('express');
const compression = require('compression');
const neo4j = require('neo4j-driver');
const { createLogger, transports } = require('winston');
const LokiTransport = require('winston-loki');
// --- 配置 ---
const PORT = process.env.PORT || 3001;
const LOKI_HOST = process.env.LOKI_HOST || 'http://localhost:3100';
const NEO4J_URI = process.env.NEO4J_URI || 'bolt://localhost:7687';
const NEO4J_USER = process.env.NEO4J_USER || 'neo4j';
const NEO4J_PASSWORD = process.env.NEO4J_PASSWORD || 'password';
// --- 初始化连接 ---
const app = express();
const driver = neo4j.driver(NEO4J_URI, neo4j.auth.basic(NEO4J_USER, NEO4J_PASSWORD));
const lokiLogger = createLogger({
transports: [
new LokiTransport({
host: LOKI_HOST,
json: true,
labels: { job: 'frontend-telemetry' },
// 避免将所有元数据作为Loki标签,防止索引爆炸
// 在真实项目中,应该只把低基数的维度如 'env', 'app_version' 作为标签
replaceTimestamp: true,
onConnectionError: (err) => console.error('Loki connection error:', err),
}),
],
});
// --- 中间件 ---
// 启用 Gzip 压缩,对于 JSON 负载效果显著
app.use(compression());
// 解析 JSON body,设置一个较大的限制,因为我们是批量接收
app.use(express.json({ limit: '2mb' }));
// --- 核心路由 ---
app.post('/api/telemetry', (req, res) => {
const events = req.body;
if (!Array.isArray(events) || events.length === 0) {
return res.status(400).send('Bad Request: Expected a non-empty array of events.');
}
// 立即响应客户端,表示已接收
res.status(202).send('Accepted');
// 异步处理,不阻塞响应
processEvents(events).catch(err => {
// 这里的日志记录至关重要,用于排查数据处理失败的问题
console.error('Failed to process event batch:', err, 'Batch size:', events.length);
});
});
async function processEvents(events) {
// 1. 将所有原始事件推送到 Loki
events.forEach(event => {
// Loki 的日志消息是字符串,我们将整个事件对象序列化
lokiLogger.info({
message: JSON.stringify(event),
labels: {
sessionId: event.sessionId,
eventType: event.type,
}
});
});
// 2. 将事件关系写入图数据库
const session = driver.session();
try {
// 在一个事务中处理整个批次,保证原子性
await session.executeWrite(async tx => {
for (const event of events) {
// 创建会话节点(如果不存在)
await tx.run(
'MERGE (s:Session {id: $sessionId}) ON CREATE SET s.createdAt = timestamp()',
{ sessionId: event.sessionId }
);
// 根据事件类型创建不同的节点和关系
switch (event.type) {
case 'UI_CLICK':
await tx.run(
`
MERGE (e:UIElement {id: $elementId}) ON CREATE SET e.text = $elementText
MATCH (s:Session {id: $sessionId})
CREATE (s)-[:PERFORMED]->(a:Action:Click {id: $id, timestamp: $timestamp, path: $path})
CREATE (a)-[:TARGETS]->(e)
`,
{ ...event, ...event.payload }
);
break;
case 'API_CALL':
await tx.run(
`
MERGE (api:APIEndpoint {url: $url, method: $method})
MATCH (s:Session {id: $sessionId})
// 找到这个API调用之前的最后一个动作
OPTIONAL MATCH (s)-[:PERFORMED]->(prevAction)
WITH s, api, prevAction ORDER BY prevAction.timestamp DESC LIMIT 1
CREATE (call:APICall {id: $id, timestamp: $timestamp, status: $status, duration: $duration, error: $error})
CREATE (s)-[:TRIGGERED]->(call)
CREATE (call)-[:CALLS]->(api)
// 如果能找到前一个动作,就建立因果关系
// 这是一个简化的逻辑,真实世界需要更复杂的关联逻辑
// 比如通过一个明确的 correlationId
FOREACH (_ IN CASE WHEN prevAction IS NOT NULL THEN [1] ELSE [] END |
CREATE (prevAction)-[:LEADS_TO]->(call)
)
`,
{ ...event, ...event.payload }
);
break;
// 此处可以添加更多事件类型的处理逻辑,如 ROUTE_CHANGE, ERROR 等
}
}
});
} finally {
await session.close();
}
}
// --- 启动服务器 ---
app.listen(PORT, () => {
console.log(`Telemetry server listening on port ${PORT}`);
});
// 优雅关闭
process.on('SIGINT', async () => {
console.log('Closing connections...');
await driver.close();
// 等待 Loki logger 刷新
await new Promise(resolve => lokiLogger.on('finish', resolve));
process.exit(0);
});
这个服务器有几个关键设计:
- **快速响应 (
202 Accepted
)**:接收到数据后立即响应,然后异步处理。这避免了客户端因等待后端处理而超时。 - 双写模式:数据被分发到两个目的地。Loki 接收全量的、结构化的原始日志,用于搜索和聚合。Neo4j (或任何图数据库) 接收经过建模的关系数据。
- 原子性写入:对图数据库的写入操作被包裹在一个事务中。这意味着一个批次中的所有事件要么全部成功写入,要么全部失败回滚,避免了数据不一致的状态。
- 连接管理:数据库和日志服务的连接被妥善管理,并在进程退出时优雅关闭。
第三步:数据建模与查询 - 图数据库的威力
这套系统的核心价值在于图数据库。我们将用户旅程建模为一系列节点和关系:
graph TD subgraph "Session: session-123" A[Action: Click checkout-btn] -->|LEADS_TO| B(APICall: POST /api/orders); B -->|LEADS_TO| C(APICall: GET /api/payment/status); C -->|FAILED_WITH_STATUS_500| C; C -->|LEADS_TO| D[Action: Render ErrorModal]; end U(User: user-abc) -->|HAS_SESSION| A; A -->|TARGETS| E(UIElement: checkout-btn); B -->|CALLS| F(APIEndpoint: /api/orders); C -->|CALLS| G(APIEndpoint: /api/payment/status);
有了这样的模型,我们可以提出传统时序数据库难以回答的问题。
查询场景1:找到所有导致支付失败的用户会话
MATCH (s:Session)-[:TRIGGERED]->(call:APICall)-[:CALLS]->(api:APIEndpoint)
WHERE api.url CONTAINS '/api/payment' AND call.status >= 500
// 找到该会话中的所有事件,并按时间排序
WITH s
MATCH (s)-[r]->(event)
RETURN s.id AS sessionId, event.id AS eventId, event.timestamp AS eventTimestamp, labels(event) AS eventType, properties(event) AS eventPayload
ORDER BY eventTimestamp
这个查询能立即返回所有失败会话的完整事件流,让我们可以一步步复盘用户的操作,直到出错的那个 API 调用。
查询场景2:分析某个特定 UI 元素的点击后性能
MATCH (e:UIElement {id: 'checkout-button'})<-[:TARGETS]-(click:Click)-[:LEADS_TO*1..3]->(apiCall:APICall)
// 找到点击 'checkout-button' 后,3步之内触发的所有API调用
RETURN
e.id AS element,
avg(apiCall.duration) AS average_duration,
percentileCont(apiCall.duration, 0.95) AS p95_duration,
count(apiCall) AS total_calls
这个查询能精确分析出点击某个按钮后,下游 API 的性能表现。这对于定位由前端交互触发的后端性能瓶颈非常有用。
局限性与未来迭代路径
这套自建管道提供了极大的灵活性和深度洞察力,但它并非没有成本。首先,运维成本是存在的。你需要维护 Express 服务、Loki 集群和图数据库实例。在数据量激增时,对这些组件的性能调优和扩容会成为新的挑战。
其次,客户端采集的准确性是一个持续优化的过程。比如,如何更精确地将 UI 交互和它触发的异步 API 调用关联起来?当前模型中的 LEADS_TO
关系是基于时间的简单推断,一个更健壮的方案是在发起请求时注入一个由交互事件生成的唯一 correlationId
,从而建立明确的因果链。
最后,数据的可视化也是关键一环。虽然 Loki 与 Grafana 无缝集成,可以很好地展示原始日志和聚合指标,但图数据的可视化需要额外的工作。可以开发一个简单的内部工具,输入 session ID,就能渲染出该会话的用户旅程图,这将是故障排查的终极利器。这个管道不是一个一劳永逸的解决方案,而是一个可观测性平台的地基,未来可以在其上构建更复杂的异常检测、性能回归分析和业务漏斗分析等高级功能。