构建一个支持多人实时协作的白板系统,技术上的核心挑战在于如何处理高并发的写入操作(用户绘制、移动图形)并以极低的延迟将状态变更广播给所有在线用户。一个用户每秒可能产生数十个操作,当有几十个用户在同一块画布上协作时,系统的状态变更事件将达到每秒上千次。
传统的基于RESTful API和关系型数据库的CRUD模型在这种场景下会迅速遭遇瓶颈。对同一数据行的频繁更新会导致锁竞争,数据库CPU飙升,而轮询或简单的WebSocket广播会造成巨大的网络开销和状态同步难题。这里的核心矛盾是:写操作(Commands)和读操作(Queries/Subscriptions)在性能、一致性要求和扩展性上完全不同。
方案A:传统单体架构与WebSocket
一个直接的思路是采用一个Node.js后端,通过WebSocket与客户端保持长连接,数据库使用PostgreSQL存储白板上所有元素的位置、颜色等状态。
优势:
- 实现简单: 对于多数开发者而言,技术栈熟悉,上手快。
- 强一致性: 数据库事务能保证每次操作的原子性,状态是强一致的。
劣势:
- 性能瓶颈: 所有操作都集中在数据库的几张核心表上。当多人同时移动一个元素时,
UPDATE
语句会产生行级锁,高并发下将导致大量请求等待,响应延迟急剧上升。 - 扩展性差: 后端服务与数据库是紧耦合的。数据库成为整个系统的单点瓶颈,垂直扩展成本高昂,水平扩展则需要引入复杂的分库分表方案。
- 状态管理的复杂性: 后端需要维护每个WebSocket连接的状态,并计算出最小化的状态变更(diff)推送给客户端,这个逻辑本身就非常复杂且容易出错。客户端接收到全量状态或无序的增量状态时,渲染逻辑也会变得混乱。
在真实项目中,这种架构在用户数超过一个小规模时就会出现明显的卡顿,无法满足生产环境的要求。
方案B:CQRS与事件溯源架构
命令查询职责分离(Command Query Responsibility Segregation, CQRS)模式的核心思想是将系统的写操作(Command)和读操作(Query)模型分离。事件溯源(Event Sourcing)则是一种持久化策略,它不存储实体的最终状态,而是存储导致该状态的所有事件序列。
组合优势:
- 高性能写入: 命令只负责验证业务规则并生成一个不可变的事件,然后将其追加到事件日志中。这是一个纯粹的append-only操作,速度极快,天然适合Cassandra这类为高吞吐量写入而设计的数据库。
- 灵活的读取模型: 事件日志可以被多个消费者订阅,根据不同查询需求生成不同的、高度优化的读取模型(物化视图)。例如,一个模型用于渲染当前画布,另一个用于历史回放。
- 天然的审计与调试能力: 完整的事件日志提供了系统状态变化的完整历史,极大地简化了审计和问题排查。
- 高可扩展性: 写入服务(Command Side)和读取服务(Query Side)可以独立部署和扩展。写入压力大时,增加命令处理节点和Cassandra节点;读取压力大时,增加查询服务节点或优化读取模型。
面临的挑战:
- 最终一致性: 读取模型是异步更新的,存在短暂的数据延迟。客户端必须能处理这种最终一致性带来的UI/UX问题。
- 架构复杂性: 引入了事件总线、事件处理器、多套数据模型等概念,对团队的技术能力要求更高。
对于我们面临的高并发协作场景,方案B的优势远大于其复杂性带来的成本。它直接解决了核心的性能和扩展性问题。
最终技术选型与理由
我们决定采用方案B,并确定了具体的技术栈:
- 命令传输: gRPC。基于HTTP/2,使用Protobuf进行序列化,性能远超JSON/REST。其双向流(Bi-directional streaming)特性是实现状态实时推送的完美选择。
- 事件存储: Apache Cassandra。其无主(masterless)架构、优秀的水平扩展能力和专为高写入吞吐量优化的LSM-Tree存储引擎,是事件溯源的理想数据库。
- 前端全局状态: Zustand。用于管理非协作核心的全局状态,如用户信息、UI模式(暗黑/明亮)、连接状态等。其API简洁,心智负担小。
- 前端协作状态: Valtio。这是关键。Valtio基于Proxy实现状态管理,能够做到真正的“像素级”精确更新。当协作数据(一个包含成百上千个图形对象的嵌套结构)中只有一个对象的颜色发生变化时,只有订阅了该对象颜色的组件会重新渲染,这对保证复杂画布的渲染性能至关重要。
以下是整个架构的流程图:
graph TD subgraph Client A[React UI] -->|User Action| B(Dispatch Command); B --> C{gRPC-Web Client}; C -->|Unary RPC| D[gRPC Gateway]; F[State Update Listener] -->|Update Store| G[Valtio/Zustand Store]; G --> A; end subgraph Backend D --> E[Command Service]; E -->|Validate & Create Event| H[Command Handler]; H -->|Append Event| I[Cassandra Event Store]; subgraph Event Processor J[CDC Connector / Poller] -->|Read Events| I; J --> K[Event Bus - Kafka/NATS]; L[Query Model Projector] -->|Consume Events| K; L -->|Update Read Model| M[Optimized Read Store - e.g., Redis/Elastic]; end subgraph Query & Push Service N[gRPC Stream Service] -->|Subscribe to Updates| K; N -->|Push State Changes| D; D -->|Server Stream| C; C --> F; end end style I fill:#f9f,stroke:#333,stroke-width:2px style K fill:#ccf,stroke:#333,stroke-width:2px
在我们的简化实现中,为了聚焦核心技术,我们会将Event Processor
和Query & Push Service
合并,直接从Cassandra轮询新事件并推送到客户端。
核心实现概览
1. gRPC定义 (.proto)
我们定义命令(如创建、移动图形)和从服务器到客户端的事件流。
proto/whiteboard.proto
:
syntax = "proto3";
package whiteboard;
option go_package = ".;whiteboard";
// 服务定义
service WhiteboardService {
// 发送一个命令到服务器
rpc ExecuteCommand(CommandRequest) returns (CommandResponse);
// 订阅白板的状态变更事件
rpc SubscribeToEvents(SubscriptionRequest) returns (stream Event);
}
// 命令请求体
message CommandRequest {
string whiteboard_id = 1;
oneof command {
CreateShapeCommand create_shape = 2;
MoveShapeCommand move_shape = 3;
ChangeShapeColorCommand change_shape_color = 4;
}
}
// 命令响应
message CommandResponse {
bool success = 1;
string error_message = 2;
string event_id = 3; // 该命令产生的事件ID
}
// 订阅请求
message SubscriptionRequest {
string whiteboard_id = 1;
string last_seen_event_id = 2; // 用于断线重连,告诉服务器从哪个事件开始推
}
// --- 命令详情 ---
message CreateShapeCommand {
string shape_id = 1;
string shape_type = 2; // "rectangle", "circle"
float x = 3;
float y = 4;
float width = 5;
float height = 6;
string color = 7;
}
message MoveShapeCommand {
string shape_id = 1;
float new_x = 2;
float new_y = 3;
}
message ChangeShapeColorCommand {
string shape_id = 1;
string new_color = 2;
}
// --- 事件定义,这是推送到客户端的核心数据结构 ---
message Event {
string event_id = 1; // UUID, e.g., TimeUUID
string whiteboard_id = 2;
int64 timestamp = 3; // UTC timestamp in milliseconds
oneof payload {
ShapeCreatedEvent shape_created = 4;
ShapeMovedEvent shape_moved = 5;
ShapeColorChangedEvent shape_color_changed = 6;
}
}
// --- 事件详情 ---
message ShapeCreatedEvent {
string shape_id = 1;
string shape_type = 2;
float x = 3;
float y = 4;
float width = 5;
float height = 6;
string color = 7;
}
message ShapeMovedEvent {
string shape_id = 1;
float new_x = 2;
float new_y = 3;
}
message ShapeColorChangedEvent {
string shape_id = 1;
string new_color = 2;
}
2. 后端实现 (Go, gRPC & Cassandra)
Cassandra 表结构设计:
事件存储是核心。我们使用whiteboard_id
作为分区键,保证同一个白板的所有事件都在同一个节点上,便于顺序读取。event_id
使用timeuuid
类型,它自带时间戳,可以按时间自然排序。这是事件溯源在Cassandra中的经典建模方式。
CREATE KEYSPACE IF NOT EXISTS whiteboard_events WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
USE whiteboard_events;
CREATE TABLE events (
whiteboard_id text,
event_id timeuuid,
event_type text,
payload blob, // 存储序列化后的 Protobuf 事件体
PRIMARY KEY (whiteboard_id, event_id)
) WITH CLUSTERING ORDER BY (event_id ASC);
gRPC 服务端核心逻辑:
server/server.go
:
package main
import (
"context"
"fmt"
"log"
"net"
"time"
"github.com/gocql/gocql"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
pb "path/to/your/proto/whiteboard" // 替换为你的 proto 包路径
)
type server struct {
pb.UnimplementedWhiteboardServiceServer
dbSession *gocql.Session
}
// ExecuteCommand 实现了处理命令的 gRPC 方法
func (s *server) ExecuteCommand(ctx context.Context, req *pb.CommandRequest) (*pb.CommandResponse, error) {
whiteboardID := req.GetWhiteboardId()
if whiteboardID == "" {
return &pb.CommandResponse{Success: false, ErrorMessage: "whiteboard_id is required"}, nil
}
// 1. 根据命令类型,生成对应的事件
var event pb.Event
event.WhiteboardId = whiteboardID
event.Timestamp = time.Now().UnixMilli()
var eventType string
var eventPayload proto.Message
switch cmd := req.Command.(type) {
case *pb.CommandRequest_CreateShape:
eventType = "ShapeCreated"
eventPayload = &pb.ShapeCreatedEvent{
ShapeId: cmd.CreateShape.ShapeId,
ShapeType: cmd.CreateShape.ShapeType,
X: cmd.CreateShape.X,
Y: cmd.CreateShape.Y,
Width: cmd.CreateShape.Width,
Height: cmd.CreateShape.Height,
Color: cmd.CreateShape.Color,
}
event.Payload = &pb.Event_ShapeCreated{ShapeCreated: eventPayload.(*pb.ShapeCreatedEvent)}
case *pb.CommandRequest_MoveShape:
// ... 其他命令类型的处理
eventType = "ShapeMoved"
eventPayload = &pb.ShapeMovedEvent{
ShapeId: cmd.MoveShape.ShapeId,
NewX: cmd.MoveShape.NewX,
NewY: cmd.MoveShape.NewY,
}
event.Payload = &pb.Event_ShapeMoved{ShapeMoved: eventPayload.(*pb.ShapeMovedEvent)}
default:
return &pb.CommandResponse{Success: false, ErrorMessage: "unsupported command type"}, nil
}
// 2. 序列化事件负载
payloadBytes, err := proto.Marshal(eventPayload)
if err != nil {
log.Printf("Failed to marshal event payload: %v", err)
return &pb.CommandResponse{Success: false, ErrorMessage: "internal server error"}, nil
}
// 3. 生成 TimeUUID 并持久化到 Cassandra
eventID := gocql.TimeUUID()
event.EventId = eventID.String()
if err := s.dbSession.Query(`
INSERT INTO events (whiteboard_id, event_id, event_type, payload) VALUES (?, ?, ?, ?)`,
whiteboardID, eventID, eventType, payloadBytes).Exec(); err != nil {
log.Printf("Failed to insert event into Cassandra: %v", err)
return &pb.CommandResponse{Success: false, ErrorMessage: "failed to save command"}, nil
}
log.Printf("Successfully processed command and stored event %s for whiteboard %s", eventID.String(), whiteboardID)
// 在真实项目中,这里会通过消息队列(如Kafka/NATS)将事件发布出去
// 订阅服务会监听消息队列,而不是直接耦合在这里
// 为了简化,我们暂时忽略这一步,在订阅服务中直接轮询数据库
return &pb.CommandResponse{Success: true, EventId: eventID.String()}, nil
}
// SubscribeToEvents 实现了事件流的 gRPC 方法
func (s *server) SubscribeToEvents(req *pb.SubscriptionRequest, stream pb.WhiteboardService_SubscribeToEventsServer) error {
whiteboardID := req.GetWhiteboardId()
log.Printf("Client subscribed to whiteboard: %s", whiteboardID)
ticker := time.NewTicker(500 * time.Millisecond) // 每500ms轮询一次数据库
defer ticker.Stop()
lastSeenUUID, err := gocql.ParseUUID(req.GetLastSeenEventId())
if err != nil {
// 如果客户端没有提供有效的UUID,我们从一个很早的时间点开始查询
// 在生产环境中,可能需要一个更健壮的初始同步策略
lastSeenUUID = gocql.MinTimeUUID(time.Now().Add(-24 * time.Hour))
}
for {
select {
case <-stream.Context().Done():
log.Printf("Client for whiteboard %s disconnected.", whiteboardID)
return stream.Context().Err()
case <-ticker.C:
iter := s.dbSession.Query(`
SELECT event_id, event_type, payload FROM events WHERE whiteboard_id = ? AND event_id > ?`,
whiteboardID, lastSeenUUID).Iter()
var eventID gocql.UUID
var eventType string
var payloadBytes []byte
for iter.Scan(&eventID, &eventType, &payloadBytes) {
event := &pb.Event{
EventId: eventID.String(),
WhiteboardId: whiteboardID,
Timestamp: gocql.UUIDTimestamp(eventID).UnixMilli(),
}
// 根据 event_type 反序列化 payload
err := unmarshalPayload(eventType, payloadBytes, event)
if err != nil {
log.Printf("Error unmarshalling payload for event %s: %v", eventID.String(), err)
continue // 跳过损坏的事件
}
if err := stream.Send(event); err != nil {
log.Printf("Error sending event to client: %v", err)
return err
}
lastSeenUUID = eventID
}
if err := iter.Close(); err != nil {
log.Printf("Error closing iterator: %v", err)
}
}
}
}
// ... (main函数和其他辅助函数)
// unmarshalPayload 辅助函数,用于将 blob 数据反序列化为具体的 protobuf 消息
func unmarshalPayload(eventType string, data []byte, event *pb.Event) error {
var msg proto.Message
switch eventType {
case "ShapeCreated":
payload := &pb.ShapeCreatedEvent{}
msg = payload
event.Payload = &pb.Event_ShapeCreated{ShapeCreated: payload}
case "ShapeMoved":
payload := &pb.ShapeMovedEvent{}
msg = payload
event.Payload = &pb.Event_ShapeMoved{ShapeMoved: payload}
// ... 其他事件类型
default:
return fmt.Errorf("unknown event type: %s", eventType)
}
return proto.Unmarshal(data, msg)
}
func main() {
// Cassandra 连接设置
cluster := gocql.NewCluster("127.0.0.1")
cluster.Keyspace = "whiteboard_events"
cluster.Consistency = gocql.Quorum
session, err := cluster.CreateSession()
if err != nil {
log.Fatalf("Failed to connect to Cassandra: %v", err)
}
defer session.Close()
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterWhiteboardServiceServer(s, &server{dbSession: session})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
单元测试思路:
-
ExecuteCommand
: 模拟gRPC请求,验证Cassandra中是否正确插入了对应类型的事件。可以使用mocking库替换dbSession
,断言Query
和Exec
的调用参数是否正确。 -
SubscribeToEvents
: 这个比较复杂。可以启动一个内存中的gRPC服务器,在一个goroutine中向Cassandra插入事件,在另一个goroutine中调用SubscribeToEvents
并验证接收到的事件流是否完整、有序。
3. 前端实现 (React, gRPC-Web, Zustand, Valtio)
状态管理分工:
- Zustand Store (
useGlobalStore.ts
): 管理与协作画布本身无关的状态。import { create } from 'zustand'; interface GlobalState { userId: string | null; connectionStatus: 'connecting' | 'connected' | 'disconnected'; setConnectionStatus: (status: GlobalState['connectionStatus']) => void; } export const useGlobalStore = create<GlobalState>((set) => ({ userId: `user-${Math.random().toString(36).substr(2, 9)}`, connectionStatus: 'disconnected', setConnectionStatus: (status) => set({ connectionStatus: status }), }));
- Valtio Store (
whiteboardStore.ts
): 管理画布上的所有图形对象。这是性能关键点。import { proxy } from 'valtio'; interface Shape { id: string; type: string; x: number; y: number; width: number; height: number; color: string; } // 使用 Record<string, Shape> 而不是 Shape[] 是为了 O(1) 的查找和更新效率 interface WhiteboardState { shapes: Record<string, Shape>; lastEventId: string | null; } // proxy() 创建一个可被 Valtio 追踪的可变状态对象 export const whiteboardStore = proxy<WhiteboardState>({ shapes: {}, lastEventId: null, }); // 可以在这里定义一些操作 state 的 action,虽然 Valtio 允许直接修改 export const eventProcessor = { applyShapeCreated(event: ShapeCreatedEvent) { whiteboardStore.shapes[event.shapeId] = { id: event.shapeId, type: event.shapeType, x: event.x, y: event.y, width: event.width, height: event.height, color: event.color, }; }, applyShapeMoved(event: ShapeMovedEvent) { const shape = whiteboardStore.shapes[event.shapeId]; if (shape) { shape.x = event.newX; shape.y = event.newY; } }, // ...其他事件处理器 };
核心组件 (WhiteboardCanvas.tsx
):
这个组件负责监听gRPC流,并根据事件更新Valtio store。它会渲染出所有的Shape
子组件。
import React, { useEffect } from 'react';
import { useProxy } from 'valtio';
import { whiteboardStore, eventProcessor } from './whiteboardStore';
import { useGlobalStore } from './useGlobalStore';
import { grpcClient } from './grpcClient'; // 假设的gRPC客户端实例
import { Event } from './proto/whiteboard_pb'; // 从proto生成的类型
// 单个图形组件
// 使用 useProxy 订阅 whiteboardStore.shapes 中特定id的变化
const ShapeComponent = React.memo(({ shapeId }: { shapeId: string }) => {
const shapes = useProxy(whiteboardStore.shapes);
const shape = shapes[shapeId];
// 这里的关键是:只有当 `shapes[shapeId]` 自身或其属性变化时,此组件才会重渲染
// 如果另一个 shape 变化,此组件不会受到影响,这就是Valtio的威力
if (!shape) return null;
console.log(`Rendering shape: ${shape.id}`);
return (
<div
style={{
position: 'absolute',
left: shape.x,
top: shape.y,
width: shape.width,
height: shape.height,
backgroundColor: shape.color,
border: '1px solid black',
}}
>
{shape.id.substring(0, 5)}
</div>
);
});
// 画布主组件
export const WhiteboardCanvas = () => {
const { shapes } = useProxy(whiteboardStore);
const { setConnectionStatus } = useGlobalStore();
useEffect(() => {
const whiteboardId = 'main-board';
const request = new SubscriptionRequest();
request.setWhiteboardId(whiteboardId);
request.setLastSeenEventId(whiteboardStore.lastEventId || '');
const stream = grpcClient.subscribeToEvents(request, {});
setConnectionStatus('connecting');
stream.on('data', (event: Event) => {
// 这是事件处理的核心逻辑
const payload = event.getPayloadCase();
switch(payload) {
case Event.PayloadCase.SHAPE_CREATED:
eventProcessor.applyShapeCreated(event.getShapeCreated().toObject());
break;
case Event.PayloadCase.SHAPE_MOVED:
eventProcessor.applyShapeMoved(event.getShapeMoved().toObject());
break;
// ... 其他事件
}
whiteboardStore.lastEventId = event.getEventId();
});
stream.on('status', (status) => {
if (status.code === 0) {
setConnectionStatus('connected');
} else {
setConnectionStatus('disconnected');
console.error('gRPC stream error:', status.details);
// 在这里实现重连逻辑,使用 whiteboardStore.lastEventId
}
});
return () => {
stream.cancel();
};
}, [setConnectionStatus]);
const shapeIds = Object.keys(shapes);
return (
<div style={{ position: 'relative', width: '100vw', height: '100vh' }}>
{shapeIds.map(id => (
<ShapeComponent key={id} shapeId={id} />
))}
</div>
);
};
这里的性能优势在于,当gRPC流传来一个ShapeMovedEvent
时,eventProcessor.applyShapeMoved
只会修改whiteboardStore.shapes
中一个特定shape
对象的x
和y
属性。Valtio的useProxy
能够精确地知道只有订阅了这个特定shape
的ShapeComponent
实例需要重新渲染,而WhiteboardCanvas
本身以及其他上百个ShapeComponent
都不会触发渲染。如果使用传统的Redux或Zustand,整个shapes
对象被新对象替换,会导致所有ShapeComponent
都进行diff比较,在高频更新下开销巨大。
架构的扩展性与局限性
此架构的扩展性非常强。
- 读写分离: 命令处理服务可以独立于事件推送服务进行扩缩容。
- 多读取模型: 我们可以轻易地增加新的事件消费者,将事件数据投影到Elasticsearch中以支持复杂查询,或者投影到时序数据库中进行性能分析,而无需改动核心的写入逻辑。
- 容错性: Cassandra的分布式特性提供了高可用性。事件流的中断可以通过
last_seen_event_id
进行恢复,保证消息不丢失。
然而,这个架构也存在局限性:
- 最终一致性延迟: 从命令执行到客户端状态更新,中间存在网络延迟、数据库轮询延迟。对于需要强实时性的场景(如在线游戏),可能需要更低延迟的消息队列(如NATS)或直接在内存中处理。
- 事件模型演进: 一旦事件被持久化,其结构(schema)就很难修改。对事件的任何变更都需要考虑版本控制和向后兼容性,这对长期维护是个挑战。
- 开发心智模型: 团队成员需要理解CQRS、事件溯源和最终一致性等概念,这比传统的CRUD模型陡峭。一个常见的错误是试图在命令处理器中进行查询,这破坏了模式的初衷。
总的来说,对于高并发、写密集型的协作应用,这套基于gRPC、Cassandra、Valtio和Zustand的CQRS架构,尽管实现复杂,但它在性能、可扩展性和系统韧性上提供的优势,是传统架构无法比拟的。它的适用边界是那些对性能和扩展性有极高要求的复杂系统,对于简单的CRUD应用则是一种过度设计。