构建一个基于 Express 与 WebSockets 的可测试实时监控仪表盘


技术痛点:从轮询到实时推送的必然与挑战

在真实项目中,我们需要频繁地获取服务器的状态信息,比如 CPU 负载、内存使用率。最原始的方案是前端定时轮询(Polling)。但这种方式存在两个致命缺陷:首先是延迟,如果轮询间隔是5秒,那么数据的实时性最多会延迟5秒;其次是资源浪费,无论服务器状态是否变化,HTTP请求都会照常发出,对服务器和网络都造成不必要的压力。

WebSockets 是解决这个问题的标准答案。它提供了一个全双工的通信通道,允许服务器主动向客户端推送数据。然而,从一个简单的“Hello, WebSocket”示例到一个生产可用的实时系统,中间隔着一道鸿沟。我们需要解决的问题包括:如何将 WebSocket 服务优雅地集成到现有的 Express 应用中?如何设计一个健壮的数据推送服务?前端如何高效地处理和渲染这些实时数据流?以及最关键的——如何为这样一个动态、异步的系统编写可靠的自动化测试?

这个复盘日志将记录从零开始构建一个可测试、基于 Express、WebSockets 和 Chakra UI 的实时服务器监控仪表盘的全过程。

初步构想与架构设计

我们的目标是创建一个单页应用,实时显示服务器的 CPU 和内存使用情况。

  1. 后端 (Backend):

    • 使用 Express.js 作为基础 Web 框架,它将提供静态文件服务(我们的React前端应用)和一个 WebSocket 端点。
    • 使用 ws 库,这是 Node.js 生态中最流行和高性能的 WebSocket 实现。
    • 创建一个 MetricService,负责定期采集系统性能指标,并通过 WebSocket 广播给所有连接的客户端。
  2. 前端 (Frontend):

    • 使用 ReactVite 构建。
    • 使用 Chakra UI 来快速构建一个美观、响应式的仪表盘界面。Chakra 的组件化和样式系统能极大地提升开发效率。
    • 使用 recharts 库来将实时数据可视化为平滑的折线图。
  3. 测试 (Testing):

    • 这是项目的核心挑战。单元测试可以覆盖 MetricService 的逻辑,但这远远不够。我们需要验证从后端数据产生到前端UI更新的整个链路。因此,端到端(E2E)测试是必须的。
    • 选择 Playwright 作为E2E测试框架。它能以无头模式启动一个真实的浏览器,访问我们的应用,并验证DOM元素是否根据 WebSocket 推送的数据正确更新。

下面是整个系统的架构图:

graph TD
    subgraph "Browser"
        D(React App) --> E(Chakra UI Components);
        E --> F(Recharts Visualization);
        D -- "Manages Connection" --> G(WebSocket Client);
    end

    subgraph "Server (Node.js)"
        A[Express Server] -- "Serves Static Files" --> D;
        A -- "Upgrades Connection" --> B(WebSocket Server);
        C(MetricService) -- "Broadcasts Data" --> B;
        C -- "Collects os.freemem(), os.cpus()" --> H{System OS};
    end
    
    subgraph "E2E Testing"
        I[Playwright Test Runner] -- "Launches & Controls" --> A;
        I -- "Interacts with & Asserts" --> D;
    end

    G <.-> B;

步骤化实现:代码是最好的文档

1. 后端服务搭建

首先,初始化一个Node.js项目并安装必要的依赖:

npm init -y
npm install express ws os-utils

接下来是核心的服务端代码 server.js。这里的关键在于将 ws 服务附加到 Express 创建的 http.Server 实例上。

// server.js

const express = require('express');
const http = require('http');
const path = require('path');
const { WebSocketServer } = require('ws');
const os = require('os-utils');

const app = express();
const port = process.env.PORT || 3001;

// 服务前端静态文件
app.use(express.static(path.join(__dirname, 'client/dist')));

const server = http.createServer(app);
const wss = new WebSocketServer({ server });

// -------------------------------------------------------------
// MetricService: 负责收集和广播系统指标
// 在真实项目中, 这应该是一个独立的、可测试的模块
// -------------------------------------------------------------
class MetricService {
    constructor(webSocketServer) {
        this.wss = webSocketServer;
        this.intervalId = null;
        // 使用 Set 来存储客户端,确保唯一性且易于增删
        this.clients = new Set(); 
    }

    start() {
        console.log('Metric service started. Broadcasting every 2 seconds...');
        this.wss.on('connection', (ws) => {
            this.clients.add(ws);
            console.log(`Client connected. Total clients: ${this.clients.size}`);

            ws.on('close', () => {
                this.clients.delete(ws);
                console.log(`Client disconnected. Total clients: ${this.clients.size}`);
            });

            ws.on('error', (error) => {
                console.error('WebSocket error:', error);
                this.clients.delete(ws); // 出错时也移除
            });
        });

        // 每2秒采集一次数据
        this.intervalId = setInterval(() => {
            this.collectAndBroadcastMetrics();
        }, 2000);
    }

    stop() {
        if (this.intervalId) {
            clearInterval(this.intervalId);
            this.intervalId = null;
            console.log('Metric service stopped.');
        }
    }

    collectAndBroadcastMetrics() {
        // 使用 Promise.all 并行获取异步数据
        Promise.all([
            this.getCpuUsage(),
            this.getMemoryUsage()
        ]).then(([cpuUsage, memoryUsage]) => {
            const payload = JSON.stringify({
                cpu: (cpuUsage * 100).toFixed(2),
                memory: memoryUsage.toFixed(2),
                timestamp: new Date().toISOString(),
            });

            // 广播给所有连接的客户端
            if (this.clients.size > 0) {
                // console.log(`Broadcasting metrics to ${this.clients.size} clients: ${payload}`);
                this.clients.forEach(client => {
                    // 确保客户端仍然处于连接状态
                    if (client.readyState === client.OPEN) {
                        client.send(payload);
                    }
                });
            }
        }).catch(err => {
            console.error('Error collecting metrics:', err);
        });
    }

    // os.cpuUsage 是异步的,需要封装成 Promise
    getCpuUsage() {
        return new Promise(resolve => {
            os.cpuUsage(v => resolve(v));
        });
    }

    getMemoryUsage() {
        const totalMem = os.totalmem();
        const freeMem = os.freemem();
        const usedMem = totalMem - freeMem;
        return Promise.resolve((usedMem / totalMem) * 100);
    }
}

// 实例化并启动服务
const metricService = new MetricService(wss);
metricService.start();


server.listen(port, () => {
    console.log(`Server is listening on http://localhost:${port}`);
});

// 优雅地关闭
process.on('SIGINT', () => {
    console.log('Shutting down server...');
    metricService.stop();
    wss.close(() => {
        server.close(() => {
            console.log('Server shut down.');
            process.exit(0);
        });
    });
});

这份代码包含了基本的错误处理、连接管理和优雅停机逻辑,这在生产环境中至关重要。

2. 前端仪表盘构建

我们使用 Vite 初始化一个 React + TypeScript 项目,并安装前端依赖。

npm create vite@latest client -- --template react-ts
cd client
npm install chakra-ui @chakra-ui/react @emotion/react @emotion/styled framer-motion recharts

核心逻辑是创建一个自定义 Hook useWebSocketMetrics 来封装 WebSocket 的所有交互,使组件代码保持干净。

// client/src/hooks/useWebSocketMetrics.ts

import { useState, useEffect, useRef } from 'react';

export interface MetricData {
    cpu: string;
    memory: string;
    timestamp: string;
}

const MAX_DATA_POINTS = 30; // 图表上最多显示30个数据点

export const useWebSocketMetrics = (url: string) => {
    const [data, setData] = useState<MetricData[]>([]);
    const [isConnected, setIsConnected] = useState(false);
    const ws = useRef<WebSocket | null>(null);

    useEffect(() => {
        // 防止在开发模式下因 StrictMode 导致重复连接
        if (ws.current) return;

        ws.current = new WebSocket(url);

        ws.current.onopen = () => {
            console.log('WebSocket connected');
            setIsConnected(true);
        };

        ws.current.onclose = () => {
            console.log('WebSocket disconnected');
            setIsConnected(false);
        };

        ws.current.onerror = (error) => {
            console.error('WebSocket error:', error);
            setIsConnected(false);
        };

        ws.current.onmessage = (event) => {
            try {
                const metric: MetricData = JSON.parse(event.data);
                // 使用函数式更新,避免依赖旧的 data state
                setData(prevData => {
                    const newData = [...prevData, metric];
                    // 维持队列长度,防止内存无限增长
                    if (newData.length > MAX_DATA_POINTS) {
                        return newData.slice(newData.length - MAX_DATA_POINTS);
                    }
                    return newData;
                });
            } catch (error) {
                console.error('Failed to parse message:', event.data);
            }
        };

        // 清理函数:组件卸载时关闭连接
        return () => {
            if (ws.current && ws.current.readyState === WebSocket.OPEN) {
                ws.current.close();
            }
        };
    }, [url]);

    return { data, isConnected };
};

这个 Hook 考虑了连接状态管理、数据队列长度限制(防止浏览器内存溢出)和组件卸载时的资源清理。

然后,我们用 Chakra UI 和 Recharts 构建仪表盘组件。

// client/src/components/Dashboard.tsx

import { Box, Grid, GridItem, Heading, Stat, StatLabel, StatNumber, StatHelpText, useColorModeValue, Tag } from '@chakra-ui/react';
import { LineChart, Line, XAxis, YAxis, CartesianGrid, Tooltip, Legend, ResponsiveContainer } from 'recharts';
import { useWebSocketMetrics, MetricData } from '../hooks/useWebSocketMetrics';

const formatTimestamp = (timestamp: string) => {
    return new Date(timestamp).toLocaleTimeString();
};

const Dashboard = () => {
    const wsUrl = `ws://${window.location.host}`;
    const { data, isConnected } = useWebSocketMetrics(wsUrl);

    // 获取最新的数据点用于 Stat 显示
    const latestData: MetricData | undefined = data.length > 0 ? data[data.length - 1] : undefined;

    const chartBg = useColorModeValue('gray.50', 'gray.700');
    const textColor = useColorModeValue('gray.600', 'gray.200');

    return (
        <Box p={8} maxW="1200px" mx="auto">
            <Grid templateColumns="repeat(12, 1fr)" gap={6} alignItems="center" mb={8}>
                <GridItem colSpan={{ base: 12, md: 8 }}>
                    <Heading as="h1" size="xl">Server Performance Dashboard</Heading>
                </GridItem>
                <GridItem colSpan={{ base: 12, md: 4 }} justifySelf={{ base: 'start', md: 'end' }}>
                    <Tag size="lg" colorScheme={isConnected ? 'green' : 'red'}>
                        {isConnected ? 'Connected' : 'Disconnected'}
                    </Tag>
                </GridItem>
            </Grid>

            <Grid templateColumns={{ base: '1fr', md: '1fr 1fr' }} gap={6} mb={8}>
                <Stat p={5} borderWidth="1px" borderRadius="md">
                    <StatLabel>CPU Usage</StatLabel>
                    <StatNumber data-testid="cpu-stat">{latestData ? `${latestData.cpu}%` : 'N/A'}</StatNumber>
                    <StatHelpText>Real-time</StatHelpText>
                </Stat>
                <Stat p={5} borderWidth="1px" borderRadius="md">
                    <StatLabel>Memory Usage</StatLabel>
                    <StatNumber data-testid="memory-stat">{latestData ? `${latestData.memory}%` : 'N/A'}</StatNumber>
                    <StatHelpText>Real-time</StatHelpText>
                </Stat>
            </Grid>

            <Box p={5} borderWidth="1px" borderRadius="md" bg={chartBg} h="400px">
                <ResponsiveContainer width="100%" height="100%">
                    <LineChart data={data}>
                        <CartesianGrid strokeDasharray="3 3" />
                        <XAxis dataKey="timestamp" tickFormatter={formatTimestamp} stroke={textColor}/>
                        <YAxis unit="%" domain={[0, 100]} stroke={textColor}/>
                        <Tooltip />
                        <Legend />
                        <Line type="monotone" dataKey="cpu" stroke="#8884d8" name="CPU Usage (%)" isAnimationActive={false} />
                        <Line type="monotone" dataKey="memory" stroke="#82ca9d" name="Memory Usage (%)" isAnimationActive={false} />
                    </LineChart>
                </ResponsiveContainer>
            </Box>
        </Box>
    );
};

export default Dashboard;

3. 编写端到端测试

这是验证系统是否按预期工作的关键环节。安装 Playwright:

npm install -D @playwright/test
npx playwright install

我们创建一个测试文件 e2e/monitoring.spec.ts

// e2e/monitoring.spec.ts

import { test, expect, Page } from '@playwright/test';
import { spawn, ChildProcess } from 'child_process';

const SERVER_URL = 'http://localhost:3001';

// 使用一个辅助函数来包装服务器的启动和关闭
const withServer = (fn: (server: ChildProcess) => Promise<void>) => async () => {
    // 启动服务器进程。这里的路径需要根据你的项目结构调整
    const serverProcess = spawn('node', ['server.js'], {
        env: { ...process.env, PORT: '3001' },
        stdio: 'pipe', // 我们可以捕获 stdout/stderr
    });

    // 监听服务器输出,确保它已经启动
    await new Promise<void>((resolve, reject) => {
        serverProcess.stdout.on('data', (data) => {
            const output = data.toString();
            console.log(`[Server STDOUT]: ${output}`);
            if (output.includes('Server is listening on http://localhost:3001')) {
                resolve();
            }
        });
        serverProcess.stderr.on('data', (data) => {
            console.error(`[Server STDERR]: ${data}`);
            reject(new Error('Server failed to start'));
        });
    });

    try {
        await fn(serverProcess);
    } finally {
        // 测试结束后,确保服务器进程被杀死
        serverProcess.kill('SIGINT');
        console.log('Server process killed.');
    }
};

// 辅助函数,用于等待Stat组件的文本内容变化
async function waitForStatChange(page: Page, testId: string, initialValue: string | RegExp) {
    const locator = page.getByTestId(testId);
    // 等待文本内容不再是初始值 'N/A'
    await expect(locator).not.toHaveText(initialValue, { timeout: 10000 }); 
    const firstValue = await locator.textContent();
    console.log(`Initial value for ${testId} received: ${firstValue}`);
    
    // 再次等待,确保它会接收到第二次更新
    // 这是一个验证数据流持续性的关键断言
    await expect(async () => {
        const currentValue = await locator.textContent();
        expect(currentValue).not.toEqual(firstValue);
    }).toPass({ timeout: 10000 });

    const secondValue = await locator.textContent();
    console.log(`Second value for ${testId} received: ${secondValue}`);
}


test.describe('Real-time Monitoring Dashboard', () => {
    test('should connect to WebSocket and display real-time metric updates', withServer(async () => {
        const { chromium } = require('@playwright/test');
        const browser = await chromium.launch();
        const page = await browser.newPage();
        
        await page.goto(SERVER_URL);

        // 1. 验证初始状态
        await expect(page.getByText('Server Performance Dashboard')).toBeVisible();
        
        // 2. 验证 WebSocket 连接状态
        const connectionTag = page.getByText('Connected');
        await expect(connectionTag).toBeVisible({ timeout: 5000 }); // 给连接建立留出时间

        // 3. 验证实时数据更新
        // 这里的挑战在于断言一个动态变化的值。
        // 我们不能断言它等于某个具体值,而应该断言它从初始状态'N/A'发生了变化
        // 并且在一段时间后再次变化。
        await waitForStatChange(page, 'cpu-stat', 'N/A');
        await waitForStatChange(page, 'memory-stat', 'N/A');

        // 4. (可选)验证图表是否渲染
        // 由于图表是SVG,我们可以检查是否存在特定的SVG元素
        await expect(page.locator('.recharts-surface')).toBeVisible();
        await expect(page.getByText('CPU Usage (%)')).toBeVisible();

        await browser.close();
    }));
});

这个测试脚本有几个关键点:

  • 进程管理: 在测试开始前以子进程方式启动服务器,并在测试结束后可靠地关闭它。
  • 异步断言: waitForStatChange 是核心。它不检查具体数值,而是检查数值是否按预期从一个状态变为另一个状态,完美地契合了测试实时数据流的场景。一个常见的错误是只检查一次变化,但我们的实现检查了两次,以确保数据流是持续的,而不是一次性的。
  • 超时: 为异步操作(如WebSocket连接、数据接收)设置合理的超时是保证测试稳定性的必要条件。

遗留问题与未来迭代

尽管当前方案已经构成了一个完整的、可测试的系统,但在生产环境中,它还存在一些局限性,可以作为未来的迭代方向:

  1. 广播效率与扩展性: 当前的实现是将数据广播给所有客户端。当连接数成百上千时,这会给服务器造成巨大负担。可以引入 Redis Pub/Sub 或专门的消息队列(如 RabbitMQ)来解耦指标采集和消息推送,并支持更复杂的主题订阅模式,例如只向关心CPU指标的客户端推送CPU数据。

  2. 前端性能: 如果数据推送频率非常高(例如每秒10次),React 的频繁重渲染可能会成为瓶颈。可以引入 useMemoReact.memo 进行优化,或者在更极端的情况下,绕过 React 直接操作 Canvas 来绘制图表,但这会大大增加复杂性。

  3. 数据的持久化与历史查询: 当前系统只展示实时数据。一个完整的监控系统需要将指标数据存入时序数据库(Time Series Database, 如 Prometheus 或 InfluxDB),以便进行历史数据查询、趋势分析和告警。

  4. WebSocket 连接的韧性: 当前端在网络波动中断开连接后,不会自动重连。在生产级的 useWebSocket Hook 中,需要实现一套带有指数退避策略的自动重连机制,以提升用户体验。


  目录