Building a Self-Diagnosing Distributed Playwright Test Architecture with Vector Embeddings and Service Discovery


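Root-cause analysis of E2E test failures often takes longer than writing the tests themselves. When a CI pipeline turns red across the board because of a pixel-level UI shift, the whole team's productivity suffers. In large projects especially, screenshots from failed visual regression tests pile up, and many of them are duplicates of known rendering issues. Our challenge is not that we cannot fix these bugs; it is how to classify and deduplicate a flood of failure reports quickly and accurately before fixing them.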

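Traditional pixel diffing is too brittle: any tiny difference in anti-aliasing or font rendering produces a false alarm. We needed a system that can "understand" image content, which leads naturally to vector embeddings. The initial idea: when a Playwright assertion fails and captures a screenshot, we do not store the image itself; instead, a vision model (such as CLIP) converts it into a high-dimensional vector. Similar visual problems produce vectors that lie close together in vector space, so a new failure can be looked up in a vector database to quickly find the most similar historical failures.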
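To make "close together in vector space" concrete, here is a minimal sketch of the lookup the Coordinator later delegates to Qdrant, assuming cosine similarity as the metric (the Qdrant collection below is configured with `Cosine`). `KnownFailure` and `findMostSimilar` are illustrative names, and the 3-dimensional vectors stand in for real 512-dimensional CLIP embeddings.

```typescript
// Cosine similarity: dot(a, b) / (|a| * |b|), a value in [-1, 1]
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("dimension mismatch");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0; // treat zero vectors as unrelated
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// A previously seen failure: an identifier (e.g. a ticket key) plus its embedding
interface KnownFailure {
  id: string;
  vector: number[];
}

// Linear scan for the most similar known failure at or above `threshold`.
// A vector database answers the same question, but from an index instead of a full scan.
function findMostSimilar(
  query: number[],
  known: KnownFailure[],
  threshold: number,
): { id: string; score: number } | null {
  let best: { id: string; score: number } | null = null;
  for (const failure of known) {
    const score = cosineSimilarity(query, failure.vector);
    if (score >= threshold && (best === null || score > best.score)) {
      best = { id: failure.id, score };
    }
  }
  return best;
}
```

Qdrant replaces the linear scan with an approximate nearest-neighbour index (HNSW), which is what keeps lookups fast as the failure history grows.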

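A single-machine Playwright environment is nowhere near enough to implement this idea. We need a scalable, elastic cluster of test executors. These execution nodes (Workers) must be stateless so they can be added or removed dynamically. That raises the second core question: how does a central scheduler (the Coordinator) know which Workers are available and healthy? This is exactly the problem that service discovery solves.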

An architecture combining Playwright, service discovery, and vector embeddings gradually took shape:

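  1. Playwright Worker: a standalone gRPC service that executes Playwright tasks.
  2. Service discovery (Consul): each Worker registers itself with Consul at startup, and Consul health-checks it periodically. The Coordinator queries Consul for the list of available Workers.
  3. Vector database (Qdrant): stores the vector embeddings of all known failure screenshots and provides fast similarity search.
  4. Coordinator: receives test jobs, discovers Workers through Consul, and dispatches the jobs. When a job fails, it has the analysis module process the screenshot, queries Qdrant, and finally produces a test report with an "intelligent diagnosis".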

This is a typical distributed system; the focus is on service resilience and on processing failures intelligently.

graph TD
    subgraph "CI/CD Pipeline"
        A[Trigger Test Run]
    end

    subgraph "Coordinator Service"
        A -- gRPC Request --> B{Test Job Queue}
        B --> C[Job Dispatcher]
        C -- 1. Query available workers --> D[Consul Client]
        D -- 2. Returns healthy worker list --> C
        C -- 3. Dispatches job via gRPC --> E[Playwright Worker gRPC Client]
    end
    
    subgraph "Service Discovery"
        F[Consul Server]
    end

    subgraph "Vector Analysis Pipeline"
        G[Qdrant DB]
        H[Embedding Service]
    end

    subgraph "Playwright Worker Pool (Dynamic)"
        W1[Worker 1: gRPC Server + Playwright]
        W2[Worker 2: gRPC Server + Playwright]
        W3[Worker 3: gRPC Server + Playwright]
        
        W1 -- Registers/Health Check --> F
        W2 -- Registers/Health Check --> F
        W3 -- Registers/Health Check --> F
    end

    E -- 4. Executes test on worker --> W2
    W2 -- 5. Test Fails, returns screenshot bytes --> C
    C -- 6. Screenshot --> H
    H -- 7. Generates Vector Embedding --> C
    C -- 8. Search for similar vectors --> G
    G -- 9. Returns nearest neighbors --> C
    C -- 10. Augments test report --> R[Final Report]
    R -- "Failure: Similar to JIRA-123 (95%)" --> A

Step 1: Defining and implementing the Playwright Worker gRPC service

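First, Playwright's execution capability needs to be wrapped as a standardized remote service. For internal, long-lived, high-throughput communication like this, HTTP/REST is too heavyweight, and gRPC is a better fit. It runs over HTTP/2, serializes with Protocol Buffers, and offers excellent performance and cross-language support.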

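The service definition is simple: a single RunTest method that takes the target URL plus a few parameters and returns the test result. On failure, TestResult carries the screenshot as raw bytes.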

proto/worker.proto

syntax = "proto3";

package playwright_worker;

service PlaywrightWorker {
  rpc RunTest(TestRequest) returns (TestResult) {}
}

message TestRequest {
  string test_id = 1;
  string target_url = 2;
  // More parameters can be added here, e.g. browser type
  int32 viewport_width = 3;
  int32 viewport_height = 4;
}

message TestResult {
  string test_id = 1;
  bool success = 2;
  string message = 3;
  bytes screenshot_on_failure = 4; // the key field
  int64 duration_ms = 5;
}
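Two effects of the loader options used in server.ts below are easy to trip over on the consuming side: with `longs: String`, the int64 `duration_ms` arrives as a string, and with `defaults: true`, an unset `bytes` field decodes (in our reading of protobufjs' conversion) as an empty Buffer rather than `undefined`. The sketch below assumes the Coordinator loads the proto with the same options; `DecodedTestResult`, `hasScreenshot`, and `durationMs` are illustrative names, not part of any library.

```typescript
// Shape of a TestResult as decoded by @grpc/proto-loader with
// { keepCase: true, longs: String, defaults: true } (assumed loader options)
interface DecodedTestResult {
  test_id: string;
  success: boolean;
  message: string;
  screenshot_on_failure: Uint8Array; // a Buffer in Node; empty (not undefined) when unset
  duration_ms: string; // int64 arrives as a decimal string because of longs: String
}

// True only when the worker actually attached screenshot bytes.
// An empty Buffer is an object and therefore truthy, so `if (bytes)` is not enough.
function hasScreenshot(result: DecodedTestResult): boolean {
  return result.screenshot_on_failure.length > 0;
}

// Parse the int64 duration; Number is exact for any realistic millisecond count
function durationMs(result: DecodedTestResult): number {
  const ms = Number(result.duration_ms);
  if (!Number.isFinite(ms)) throw new Error(`bad duration_ms: ${result.duration_ms}`);
  return ms;
}
```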

Next comes the Node.js/TypeScript server implementation. The key point is not only invoking Playwright, but also managing the service lifecycle and integrating with Consul.

worker/src/server.ts

import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { chromium, Page } from 'playwright';
import { ConsulService } from './consul.service';
import { randomUUID } from 'crypto';

const PROTO_PATH = __dirname + '/../../proto/worker.proto';
const SERVICE_ID = `playwright-worker-${randomUUID()}`;
const SERVICE_NAME = 'playwright-worker';
const SERVICE_PORT = 50051;

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
});
const workerProto = grpc.loadPackageDefinition(packageDefinition).playwright_worker as any;

async function executePlaywrightTest(targetUrl: string, width: number, height: number): Promise<{ success: boolean; message: string; screenshot?: Buffer }> {
  const browser = await chromium.launch();
  // Kept outside the try block so the catch handler can still take a screenshot
  let page: Page | undefined;
  try {
    const context = await browser.newContext({ viewport: { width, height } });
    page = await context.newPage();
    await page.goto(targetUrl, { waitUntil: 'networkidle' });

    // Illustrative assertion only; a real project would run a full test script,
    // e.g. checking that an element exists or that its content is correct
    const title = await page.title();
    if (!title.includes('Expected Title')) {
      return {
        success: false,
        message: `Assertion failed: Title was "${title}"`,
        screenshot: await page.screenshot(),
      };
    }

    // More test steps...

    return { success: true, message: 'All assertions passed.' };
  } catch (error) {
    // Try to capture a screenshot even after an exception; the capture itself may
    // fail (e.g. the page crashed), and must not mask the original error
    const screenshot = page ? await page.screenshot().catch(() => undefined) : undefined;
    const message = error instanceof Error ? error.message : 'An unknown error occurred';
    return { success: false, message, screenshot };
  } finally {
    // Always release the browser, including on the assertion-failure path
    await browser.close();
  }
}

const runTest = async (call: any, callback: any) => {
  const { test_id, target_url, viewport_width, viewport_height } = call.request;
  console.log(`[${test_id}] Received test request for URL: ${target_url}`);
  const startTime = Date.now();

  try {
    const result = await executePlaywrightTest(target_url, viewport_width, viewport_height);
    const duration = Date.now() - startTime;
    
    callback(null, {
      test_id: test_id,
      success: result.success,
      message: result.message,
      screenshot_on_failure: result.screenshot,
      duration_ms: duration
    });
    console.log(`[${test_id}] Test finished in ${duration}ms. Success: ${result.success}`);
  } catch (e: any) {
    const duration = Date.now() - startTime;
    console.error(`[${test_id}] Test failed with internal error after ${duration}ms: ${e.message}`);
    callback({
      code: grpc.status.INTERNAL,
      details: e.message
    }, null);
  }
};

function main() {
  const server = new grpc.Server();
  server.addService(workerProto.PlaywrightWorker.service, { RunTest: runTest });

  const consulService = new ConsulService({
    serviceId: SERVICE_ID,
    serviceName: SERVICE_NAME,
    servicePort: SERVICE_PORT,
    // In a real project this should point at the Consul agent's address
    consulHost: 'localhost' 
  });

  server.bindAsync(`0.0.0.0:${SERVICE_PORT}`, grpc.ServerCredentials.createInsecure(), (err, port) => {
    if (err) {
      console.error(`Server bind failed: ${err}`);
      return;
    }
    server.start();
    console.log(`gRPC server listening on port ${port}`);

    // Register with Consul as soon as the server is listening
    consulService.register().catch(console.error);

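    // Graceful shutdown: deregister first so the Coordinator's health watch drops
    // this worker, then let in-flight RPCs finish before exiting
    const shutdown = (signal: string) => {
      console.log(`Received ${signal}. Shutting down gracefully...`);
      consulService.deregister().then(() => {
        server.tryShutdown(() => {
          console.log('gRPC server shut down.');
          process.exit(0);
        });
      }).catch(e => {
        console.error('Failed to deregister from Consul:', e);
        process.exit(1);
      });
    };
    // Container orchestrators usually send SIGTERM; SIGINT covers Ctrl+C
    process.once('SIGINT', () => shutdown('SIGINT'));
    process.once('SIGTERM', () => shutdown('SIGTERM'));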
  });
}

main();
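One gap in the shutdown handler above: `server.tryShutdown` waits for in-flight RPCs, and here an in-flight RPC can be an entire Playwright run, so a stuck test can block exit indefinitely. A minimal sketch of a bounded, ordered shutdown follows; `runShutdown` and `ShutdownStep` are hypothetical helpers, and the steps would wrap `consulService.deregister()` and a promisified `server.tryShutdown`.

```typescript
// One named step of a shutdown sequence (e.g. "deregister from Consul")
interface ShutdownStep {
  name: string;
  run: () => Promise<void>;
}

// Run steps strictly in order, but give up once `timeoutMs` has elapsed so a
// hung step (e.g. a long in-flight Playwright test) cannot block exit forever.
// Resolves with the names of the steps that completed.
async function runShutdown(steps: ShutdownStep[], timeoutMs: number): Promise<string[]> {
  const completed: string[] = [];
  const sequence = (async () => {
    for (const step of steps) {
      await step.run();
      completed.push(step.name);
    }
    return completed;
  })();
  // If the timeout wins, a later rejection of `sequence` must not go unhandled
  sequence.catch(() => undefined);

  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => {
      const done = completed.join(", ") || "none";
      reject(new Error(`shutdown timed out after ${timeoutMs}ms (completed: ${done})`));
    }, timeoutMs);
  });
  try {
    return await Promise.race([sequence, timeout]);
  } finally {
    clearTimeout(timer); // don't keep the event loop alive after a clean finish
  }
}
```

If the promise rejects with a timeout, the handler can fall back to `server.forceShutdown()`, which cancels remaining calls, and exit with a non-zero code.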

Step 2: Integrating service discovery (Consul)

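Workers must be discoverable by the Coordinator. Hard-coding addresses is a recipe for disaster, especially in a cloud environment where the number of Workers changes dynamically with load. Consul provides service registration, discovery, and health checking.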

server.ts above already references ConsulService; now we implement it. This class encapsulates the interaction with the Consul agent.

worker/src/consul.service.ts

import Consul from 'consul';

interface ConsulConfig {
  serviceName: string;
  serviceId: string;
  servicePort: number;
  consulHost: string;
  consulPort?: number;
}

export class ConsulService {
  private consul: Consul.Consul;
  private readonly serviceName: string;
  private readonly serviceId: string;
  private readonly servicePort: number;

  constructor(config: ConsulConfig) {
    this.consul = new Consul({
      host: config.consulHost,
      port: (config.consulPort || 8500).toString(),
      promisify: true, // use the Promise-based API
    });
    this.serviceName = config.serviceName;
    this.serviceId = config.serviceId;
    this.servicePort = config.servicePort;
  }

  public async register(): Promise<void> {
    const serviceDefinition = {
      id: this.serviceId,
      name: this.serviceName,
      port: this.servicePort,
      // tags: ['grpc', 'playwright', 'v1'], // tags enable finer-grained service discovery
      check: {
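        // The Consul agent periodically probes this endpoint with the standard gRPC
        // health checking protocol; "localhost" only works if the agent runs on the worker's host
        grpc: `localhost:${this.servicePort}`,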
        interval: '10s',
        timeout: '5s',
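        // A failing check marks the service critical; if it stays critical
        // for longer than 30s, Consul deregisters it automatically
        deregister_critical_service_after: '30s',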
      },
    };

    try {
      await this.consul.agent.service.register(serviceDefinition);
      console.log(`Service [${this.serviceId}] registered with Consul.`);
    } catch (error) {
      console.error(`Failed to register service with Consul:`, error);
      throw error;
    }
  }

  public async deregister(): Promise<void> {
    try {
      await this.consul.agent.service.deregister({id: this.serviceId});
      console.log(`Service [${this.serviceId}] deregistered from Consul.`);
    } catch (error) {
      console.error(`Failed to deregister service from Consul:`, error);
      throw error;
    }
  }
}
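Under the hood, node-consul sends the registration to the local agent's HTTP endpoint `PUT /v1/agent/service/register`. For reference, the sketch below builds that JSON body directly using Consul's own field names. `buildRegistration` and `advertiseAddress` are hypothetical; the sketch also sets `Address`, which `register()` above omits, so consumers do not receive an empty `Service.Address`.

```typescript
// Body for PUT /v1/agent/service/register, using Consul's HTTP API field names
interface ConsulServiceRegistration {
  ID: string;
  Name: string;
  Address: string;
  Port: number;
  Tags?: string[];
  Check: {
    GRPC: string; // "host:port" (or "host:port/service") probed by the agent
    GRPCUseTLS: boolean;
    Interval: string;
    Timeout: string;
    DeregisterCriticalServiceAfter: string;
  };
}

// `advertiseAddress` must be reachable both by the Consul agent (which runs the
// health check) and by the Coordinator (which dials Service.Address)
function buildRegistration(opts: {
  id: string;
  name: string;
  advertiseAddress: string;
  port: number;
  tags?: string[];
}): ConsulServiceRegistration {
  return {
    ID: opts.id,
    Name: opts.name,
    Address: opts.advertiseAddress,
    Port: opts.port,
    Tags: opts.tags, // dropped by JSON.stringify when undefined
    Check: {
      GRPC: `${opts.advertiseAddress}:${opts.port}`,
      GRPCUseTLS: false, // the worker listens with createInsecure()
      Interval: "10s",
      Timeout: "5s",
      DeregisterCriticalServiceAfter: "30s",
    },
  };
}
```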

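The catch here is the health check. gRPC defines a standard health checking protocol (the `grpc.health.v1.Health` service), and the Worker must implement it. Otherwise every Consul check fails, the Worker stays critical, the Coordinator's passing-only query never returns it, and Consul deregisters it after 30s. @grpc/grpc-js does not include this service itself, but the companion grpc-health-check package from the grpc-node project provides a ready-made implementation, so server.ts needs to be modified to register that health service alongside PlaywrightWorker.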

Step 3: Implementing the Coordinator and service discovery

The Coordinator is the brain of the whole system. Its core responsibilities are:

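  1. Discover healthy Worker nodes from Consul.
  2. Maintain a pool of gRPC client connections to those nodes.
  3. Take jobs from the job queue and pick an idle Worker to run each one.
  4. Process test results, especially the screenshots of failed tests.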
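Item 3 asks for an idle Worker, while `submitTest` below simply picks one at random, and Consul knows nothing about per-worker load. A minimal load-aware alternative, assuming the Coordinator counts its own in-flight RunTest calls: `LeastInFlightPicker`, `acquire`, and `release` are hypothetical names.

```typescript
// Picks the worker with the fewest tests in flight; ties are broken by a
// rotating start position, so an idle pool degrades to plain round-robin.
class LeastInFlightPicker {
  private readonly inFlight = new Map<string, number>();
  private cursor = 0;

  // Choose a worker from the currently healthy IDs and count one test against it
  acquire(workerIds: string[]): string | undefined {
    if (workerIds.length === 0) return undefined;
    const start = this.cursor % workerIds.length;
    let best: string | undefined;
    let bestLoad = Infinity;
    for (let i = 0; i < workerIds.length; i++) {
      const id = workerIds[(start + i) % workerIds.length];
      const load = this.inFlight.get(id) ?? 0;
      if (load < bestLoad) {
        best = id;
        bestLoad = load;
      }
    }
    this.cursor = start + 1;
    if (best !== undefined) this.inFlight.set(best, bestLoad + 1);
    return best;
  }

  // Call when the RunTest callback fires, whether the test passed, failed, or errored
  release(workerId: string): void {
    const n = (this.inFlight.get(workerId) ?? 0) - 1;
    if (n > 0) this.inFlight.set(workerId, n);
    else this.inFlight.delete(workerId);
  }
}
```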

coordinator/src/coordinator.ts

import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import Consul from 'consul';
import { EmbeddingService } from './embedding.service';
import { VectorDBService } from './vectordb.service';

const PROTO_PATH = __dirname + '/../../proto/worker.proto';

const packageDefinition = protoLoader.loadSync(PROTO_PATH, { /* ... options ... */ });
const workerProto = grpc.loadPackageDefinition(packageDefinition).playwright_worker as any;

class Coordinator {
  private consul: Consul.Consul;
  private availableWorkers: Map<string, any> = new Map(); // Map<serviceId, gRPCClient>
  private embeddingService: EmbeddingService;
  private vectorDBService: VectorDBService;

  constructor() {
    this.consul = new Consul({ host: 'localhost', port: '8500', promisify: true });
    this.embeddingService = new EmbeddingService(); // wraps the calls to the vision model
    this.vectorDBService = new VectorDBService('failed_tests_collection'); // wraps the interaction with Qdrant
    this.watchWorkers();
  }

  private watchWorkers() {
    const watch = this.consul.watch({
      method: this.consul.health.service,
      options: {
        service: 'playwright-worker',
        passing: true, // only instances whose checks are passing
      },
    });

    watch.on('change', (data) => {
      console.log('Worker pool changed:', data.length, 'workers available.');
      const currentWorkerIds = new Set(data.map(entry => entry.Service.ID));
      
      // Add new workers
      for (const entry of data) {
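        const service = entry.Service;
        if (!this.availableWorkers.has(service.ID)) {
          // Service.Address is empty if the worker registered without one; fall back to the node's address
          const address = `${service.Address || entry.Node.Address}:${service.Port}`;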
          const client = new workerProto.PlaywrightWorker(address, grpc.credentials.createInsecure());
          this.availableWorkers.set(service.ID, client);
          console.log(`Added worker: ${service.ID} at ${address}`);
        }
      }

      // Remove dead workers
      for (const workerId of this.availableWorkers.keys()) {
        if (!currentWorkerIds.has(workerId)) {
          // In a real project, make sure no test is still running on this client before closing it
          // this.availableWorkers.get(workerId)?.close();
          this.availableWorkers.delete(workerId);
          console.log(`Removed worker: ${workerId}`);
        }
      }
    });

    watch.on('error', (err) => {
      console.error('Consul watch error:', err);
    });
  }

  public async submitTest(targetUrl: string) {
    if (this.availableWorkers.size === 0) {
      console.error('No available workers to run the test.');
      return;
    }

    // Simple random selection; worker load is not taken into account
    const workerIds = Array.from(this.availableWorkers.keys());
    const workerId = workerIds[Math.floor(Math.random() * workerIds.length)];
    const client = this.availableWorkers.get(workerId);
    
    const request = {
      test_id: `test-${Date.now()}`,
      target_url: targetUrl,
      viewport_width: 1920,
      viewport_height: 1080,
    };

    console.log(`Submitting test [${request.test_id}] to worker [${workerId}]`);
    client.RunTest(request, async (err: any, result: any) => {
      if (err) {
        console.error(`gRPC call failed for test [${request.test_id}]:`, err.details);
        return;
      }

      if (!result.success) {
        console.log(`Test [${result.test_id}] failed: ${result.message}`);
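        // With proto-loader's `defaults: true`, an unset bytes field arrives as an
        // empty Buffer, which is truthy, so check the length as well
        if (result.screenshot_on_failure && result.screenshot_on_failure.length > 0) {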
          await this.analyzeFailure(result.screenshot_on_failure);
        }
      } else {
        console.log(`Test [${result.test_id}] passed.`);
      }
    });
  }
  
  private async analyzeFailure(screenshot: Buffer) {
    try {
      console.log("Analyzing failure screenshot...");
      const vector = await this.embeddingService.generateEmbedding(screenshot);

      // 0.95 is a deliberately high threshold: only near-duplicates count as a known issue
      const searchResult = await this.vectorDBService.searchSimilar(vector, 0.95);

      if (searchResult && searchResult.length > 0) {
        const mostSimilar = searchResult[0];
        console.log(`FAILURE DIAGNOSIS: This failure is ${Math.round(mostSimilar.score * 100)}% similar to a known issue.`);
        console.log(`Known issue payload:`, mostSimilar.payload);
      } else {
        console.log("FAILURE DIAGNOSIS: This appears to be a new, unique visual failure. Indexing for future reference.");
        await this.vectorDBService.indexNewFailure(vector, {
          timestamp: new Date().toISOString(),
          // In a real project this would carry the commit hash, test name, and other metadata
          source: 'CI build #5678'
        });
      }
    } catch (e) {
      console.error("Error during failure analysis:", e);
    }
  }
}

// Simulate CI triggering test runs
const coordinator = new Coordinator();
setInterval(() => {
    // Run a test every 20 seconds
    coordinator.submitTest('http://example.com'); 
}, 20000);
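The `watch` handler above removes a worker from the map immediately and leaves `close()` commented out, because a test may still be running on that connection. One way to handle this, assuming every RunTest call goes through a checkout/release pair, is a small draining pool; `DrainingPool` and its methods are hypothetical names. A `@grpc/grpc-js` client satisfies the `Closable` interface through its `close()` method.

```typescript
// Anything with a close() method; a @grpc/grpc-js Client qualifies
interface Closable {
  close(): void;
}

// One client per worker. When Consul drops a worker, its client stops receiving
// new work but is only closed after the last in-flight call returns.
class DrainingPool<C extends Closable> {
  private readonly clients = new Map<string, C>();
  private readonly inFlight = new Map<string, number>();
  private readonly retiring = new Set<string>();

  // Create a client only for unseen workers; a draining worker that reappears becomes active again
  add(id: string, create: () => C): void {
    if (this.clients.has(id)) {
      this.retiring.delete(id);
      return;
    }
    this.clients.set(id, create());
  }

  // IDs that may receive new work
  activeIds(): string[] {
    return Array.from(this.clients.keys()).filter((id) => !this.retiring.has(id));
  }

  // Borrow a client for one call; undefined for unknown or draining workers
  checkout(id: string): C | undefined {
    const client = this.clients.get(id);
    if (client === undefined || this.retiring.has(id)) return undefined;
    this.inFlight.set(id, (this.inFlight.get(id) ?? 0) + 1);
    return client;
  }

  // Return a client after its call completes (success or error)
  release(id: string): void {
    const n = (this.inFlight.get(id) ?? 0) - 1;
    if (n > 0) this.inFlight.set(id, n);
    else this.inFlight.delete(id);
    this.closeIfDrained(id);
  }

  // Consul no longer lists this worker as passing
  retire(id: string): void {
    if (!this.clients.has(id)) return;
    this.retiring.add(id);
    this.closeIfDrained(id);
  }

  private closeIfDrained(id: string): void {
    if (!this.retiring.has(id) || this.inFlight.has(id)) return;
    this.clients.get(id)?.close();
    this.clients.delete(id);
    this.retiring.delete(id);
  }
}
```

In the watch handler, `add(service.ID, () => new workerProto.PlaywrightWorker(address, grpc.credentials.createInsecure()))` would replace the `has`/`set` pair, and `retire(workerId)` would replace the `delete`.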

Step 4: Vector analysis and diagnosis

This is where the system's "intelligence" lives. When the Coordinator receives a failure screenshot, it calls two services:

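  1. EmbeddingService: converts the image into a vector. In practice this is often a standalone Python service, since it can use libraries such as transformers and PyTorch directly. For simplicity, we assume it is a service callable over HTTP or gRPC; the proof of concept below runs the model in-process instead.
  2. VectorDBService: talks to the Qdrant database to run similarity searches and insert new data.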
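If the embedding model lives in a separate service, as suggested above, the Coordinator only needs a thin client. This sketch assumes a hypothetical `POST {baseUrl}/embed` endpoint that accepts raw PNG bytes and returns `{ "embedding": number[] }`; the transport is injected as `HttpPost` so the client can be exercised without a running service. The dimension check matters because Qdrant rejects vectors whose size does not match the collection (512 below).

```typescript
// Transport abstraction: POST raw bytes, get back a status and a JSON body
type HttpPost = (
  url: string,
  body: Uint8Array,
) => Promise<{ status: number; json(): Promise<unknown> }>;

// Must match the `size` of the Qdrant collection (512 for CLIP ViT-B/32)
const EXPECTED_DIM = 512;

// Send a PNG screenshot to the (hypothetical) embedding service and validate the reply
async function embedScreenshot(post: HttpPost, baseUrl: string, png: Uint8Array): Promise<number[]> {
  const res = await post(`${baseUrl}/embed`, png);
  if (res.status !== 200) {
    throw new Error(`embedding service returned HTTP ${res.status}`);
  }
  const body = (await res.json()) as { embedding?: unknown } | null;
  const vector = body?.embedding;
  if (!Array.isArray(vector) || !vector.every((x) => typeof x === "number" && Number.isFinite(x))) {
    throw new Error("response has no numeric 'embedding' array");
  }
  if (vector.length !== EXPECTED_DIM) {
    throw new Error(`expected ${EXPECTED_DIM} dimensions, got ${vector.length}`);
  }
  return vector as number[];
}
```

On Node 18+, `HttpPost` can be backed by the global `fetch`, for example `(url, body) => fetch(url, { method: "POST", headers: { "content-type": "image/png" }, body })`.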

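coordinator/src/embedding.service.ts (proof of concept)

import { AutoProcessor, CLIPVisionModelWithProjection, RawImage } from '@xenova/transformers';

// Runs the model inside Node.js via ONNX Runtime.
// This is a proof of concept; in production a separate Python microservice is
// recommended for better performance and dependency management.
const MODEL_ID = 'Xenova/clip-vit-base-patch32';

export class EmbeddingService {
    private processor: any;
    private model: any;
    // Resolves once both the processor and the vision model have loaded
    private readonly ready: Promise<void>;

    constructor() {
        // Pre-trained Hugging Face weights; image embeddings only need the vision tower
        this.ready = Promise.all([
            AutoProcessor.from_pretrained(MODEL_ID),
            CLIPVisionModelWithProjection.from_pretrained(MODEL_ID),
        ]).then(([processor, model]) => {
            this.processor = processor;
            this.model = model;
            console.log("Embedding model loaded.");
        });
        // Log load failures here; generateEmbedding() still rejects with the same error
        this.ready.catch(e => console.error("Failed to load embedding model:", e));
    }

    public async generateEmbedding(imageBuffer: Buffer): Promise<number[]> {
        // Wait for the model instead of failing while it is still loading
        await this.ready;
        // The processor expects a RawImage, so decode the PNG bytes first
        const image = await RawImage.fromBlob(new Blob([imageBuffer]));
        const inputs = await this.processor(image);
        const { image_embeds } = await this.model(inputs);
        // Convert the TypedArray into a plain array (512 values for ViT-B/32)
        return Array.from(image_embeds.data as Float32Array);
    }
}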

coordinator/src/vectordb.service.ts

import { QdrantClient } from '@qdrant/js-client-rest';
import { randomUUID } from 'crypto';

export class VectorDBService {
    private client: QdrantClient;
    private collectionName: string;
    // Resolves once the collection is known to exist; every public method awaits it
    private readonly ready: Promise<void>;

    constructor(collectionName: string) {
        // Assumes Qdrant is running locally
        this.client = new QdrantClient({ url: 'http://localhost:6333' });
        this.collectionName = collectionName;
        // Constructors cannot await, so keep the promise instead of firing and forgetting it
        this.ready = this.ensureCollection();
        this.ready.catch(e => console.error('Failed to prepare Qdrant collection:', e));
    }

    private async ensureCollection() {
        const result = await this.client.getCollections();
        const collectionExists = result.collections.find(c => c.name === this.collectionName);
        if (!collectionExists) {
            console.log(`Collection ${this.collectionName} does not exist. Creating...`);
            await this.client.createCollection(this.collectionName, {
                vectors: {
                    size: 512, // embedding dimension of CLIP ViT-B/32
                    distance: 'Cosine',
                },
            });
        }
    }

    public async searchSimilar(vector: number[], threshold: number): Promise<any[]> {
        await this.ready;
        const result = await this.client.search(this.collectionName, {
            vector: vector,
            limit: 1,
            score_threshold: threshold,
            with_payload: true,
        });
        return result;
    }

    public async indexNewFailure(vector: number[], payload: Record<string, any>): Promise<void> {
        await this.ready;
        await this.client.upsert(this.collectionName, {
            wait: true,
            points: [
                {
                    // Qdrant point IDs must be unsigned integers or UUIDs
                    id: randomUUID(),
                    vector: vector,
                    payload: payload,
                },
            ],
        });
        console.log("New failure indexed in Qdrant.");
    }
}

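A common mistake when implementing vector search is to overlook normalization. Cosine similarity itself is scale-invariant; the trouble starts when similarity is computed as a plain dot product (a `Dot` distance, or hand-written comparison code), which equals cosine similarity only for unit vectors. Qdrant normalizes vectors automatically for a `Cosine` collection, and many model libraries can emit normalized embeddings, but in a real project this is a checkpoint you must verify, especially if you ever switch distance metrics or compare vectors outside the database.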
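The difference is easy to see numerically. In this sketch (the helper names are illustrative), two vectors pointing the same way have cosine similarity 1, yet their raw dot product is 50; after L2 normalization, the dot product recovers the cosine value.

```typescript
// Scale a vector to unit length (L2 norm = 1)
function l2Normalize(v: number[]): number[] {
  const norm = Math.sqrt(v.reduce((sum, x) => sum + x * x, 0));
  if (norm === 0) throw new Error("cannot normalize a zero vector");
  return v.map((x) => x / norm);
}

// Plain dot product: equals cosine similarity only when both inputs are unit vectors
function dot(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("dimension mismatch");
  return a.reduce((sum, x, i) => sum + x * b[i], 0);
}

const a = [3, 4];
const b = [6, 8]; // same direction as a, twice the length
const c = [4, 3];
console.log(dot(a, b)); // 50: grows with vector length, not a similarity score
console.log(dot(l2Normalize(a), l2Normalize(b))); // ~1: identical direction
console.log(dot(l2Normalize(a), l2Normalize(c))); // ~0.96: the cosine similarity of a and c
```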

Limitations and future work

This architecture solves the original problem, but it is no silver bullet and has some obvious limitations:

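  1. Scheduling is too simple: the current random (or round-robin) selection ignores Worker load, specific capabilities (such as Workers with a particular browser version installed), and data affinity. A more mature scheduler would implement more sophisticated algorithms, possibly including tag-based routing.
  2. Generality of the vision model: a general-purpose CLIP model works for most scenarios, but for complex applications with a lot of proprietary design language or icons, its "understanding" may fall short. A future direction is to fine-tune a base model on the project's own UI screenshots to improve recognition accuracy.
  3. Missing failure context: the system only analyzes the screenshot taken at the moment of failure. Many E2E failures are temporal, the accumulated result of errors in earlier steps. A more capable system would record the sequence of actions and state snapshots leading up to the failure, encode them into some vector representation as well, and perform multimodal failure analysis.
  4. Insufficient observability: although services are registered and discovered, there are no centralized metrics or monitoring for the lifecycle of each test job, the time spent in each stage, or Worker resource utilization. Integrating OpenTelemetry for distributed tracing and exposing metrics to Prometheus is a necessary step toward production.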
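Tag-based routing (item 1) can build on the `tags` field that is commented out in `register()`. A sketch, assuming workers register tags such as a browser name and version (the tag values below are made up): the Coordinator filters Consul's health entries before choosing a worker. `HealthServiceEntry` models only the fields used here, and `eligibleWorkers` is a hypothetical helper. Consul's health endpoint can also filter by tag on the server side (for example with its `tag` query parameter), which would leave only the address mapping on the client.

```typescript
// Subset of one entry returned by Consul's /v1/health/service/:service endpoint
interface HealthServiceEntry {
  Node: { Address: string };
  Service: { ID: string; Address: string; Port: number; Tags: string[] | null };
}

interface WorkerTarget {
  id: string;
  target: string; // "host:port" to dial with gRPC
}

// Keep only workers that carry every required tag, and fall back to the node
// address when a service registered without an address of its own
function eligibleWorkers(entries: HealthServiceEntry[], requiredTags: string[]): WorkerTarget[] {
  return entries
    .filter((e) => requiredTags.every((tag) => (e.Service.Tags ?? []).includes(tag)))
    .map((e) => ({
      id: e.Service.ID,
      target: `${e.Service.Address || e.Node.Address}:${e.Service.Port}`,
    }));
}
```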

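Despite these limitations, the architecture shows how seemingly unrelated technologies (browser automation, service discovery, vector search) can be combined creatively to address a widespread and costly pain point in engineering practice. It moves E2E testing one step beyond a passive quality gate that demands heavy manual intervention, giving it a rudimentary ability to diagnose itself.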

