基于gRPC与Cassandra构建CQRS架构的实时协作系统后端与前端状态同步实践


构建一个支持多人实时协作的白板系统,技术上的核心挑战在于如何处理高并发的写入操作(用户绘制、移动图形)并以极低的延迟将状态变更广播给所有在线用户。一个用户每秒可能产生数十个操作,当有几十个用户在同一块画布上协作时,系统的状态变更事件将达到每秒上千次。

传统的基于RESTful API和关系型数据库的CRUD模型在这种场景下会迅速遭遇瓶颈。对同一数据行的频繁更新会导致锁竞争,数据库CPU飙升,而轮询或简单的WebSocket广播会造成巨大的网络开销和状态同步难题。这里的核心矛盾是:写操作(Commands)和读操作(Queries/Subscriptions)在性能、一致性要求和扩展性上完全不同。

方案A:传统单体架构与WebSocket

一个直接的思路是采用一个Node.js后端,通过WebSocket与客户端保持长连接,数据库使用PostgreSQL存储白板上所有元素的位置、颜色等状态。

优势:

  • 实现简单: 对于多数开发者而言,技术栈熟悉,上手快。
  • 强一致性: 数据库事务能保证每次操作的原子性,状态是强一致的。

劣势:

  • 性能瓶颈: 所有操作都集中在数据库的几张核心表上。当多人同时移动一个元素时,UPDATE语句会产生行级锁,高并发下将导致大量请求等待,响应延迟急剧上升。
  • 扩展性差: 后端服务与数据库是紧耦合的。数据库成为整个系统的单点瓶颈,垂直扩展成本高昂,水平扩展则需要引入复杂的分库分表方案。
  • 状态管理的复杂性: 后端需要维护每个WebSocket连接的状态,并计算出最小化的状态变更(diff)推送给客户端,这个逻辑本身就非常复杂且容易出错。客户端接收到全量状态或无序的增量状态时,渲染逻辑也会变得混乱。

在真实项目中,这种架构在用户数超过一个小规模时就会出现明显的卡顿,无法满足生产环境的要求。

方案B:CQRS与事件溯源架构

命令查询职责分离(Command Query Responsibility Segregation, CQRS)模式的核心思想是将系统的写操作(Command)和读操作(Query)模型分离。事件溯源(Event Sourcing)则是一种持久化策略,它不存储实体的最终状态,而是存储导致该状态的所有事件序列。

组合优势:

  • 高性能写入: 命令只负责验证业务规则并生成一个不可变的事件,然后将其追加到事件日志中。这是一个纯粹的append-only操作,速度极快,天然适合Cassandra这类为高吞吐量写入而设计的数据库。
  • 灵活的读取模型: 事件日志可以被多个消费者订阅,根据不同查询需求生成不同的、高度优化的读取模型(物化视图)。例如,一个模型用于渲染当前画布,另一个用于历史回放。
  • 天然的审计与调试能力: 完整的事件日志提供了系统状态变化的完整历史,极大地简化了审计和问题排查。
  • 高可扩展性: 写入服务(Command Side)和读取服务(Query Side)可以独立部署和扩展。写入压力大时,增加命令处理节点和Cassandra节点;读取压力大时,增加查询服务节点或优化读取模型。

面临的挑战:

  • 最终一致性: 读取模型是异步更新的,存在短暂的数据延迟。客户端必须能处理这种最终一致性带来的UI/UX问题。
  • 架构复杂性: 引入了事件总线、事件处理器、多套数据模型等概念,对团队的技术能力要求更高。

对于我们面临的高并发协作场景,方案B的优势远大于其复杂性带来的成本。它直接解决了核心的性能和扩展性问题。

最终技术选型与理由

我们决定采用方案B,并确定了具体的技术栈:

  • 命令传输: gRPC。基于HTTP/2,使用Protobuf进行序列化,性能远超JSON/REST。其双向流(Bi-directional streaming)特性是实现状态实时推送的完美选择。
  • 事件存储: Apache Cassandra。其无主(masterless)架构、优秀的水平扩展能力和专为高写入吞吐量优化的LSM-Tree存储引擎,是事件溯源的理想数据库。
  • 前端全局状态: Zustand。用于管理非协作核心的全局状态,如用户信息、UI模式(暗黑/明亮)、连接状态等。其API简洁,心智负担小。
  • 前端协作状态: Valtio。这是关键。Valtio基于Proxy实现状态管理,能够做到真正的“像素级”精确更新。当协作数据(一个包含成百上千个图形对象的嵌套结构)中只有一个对象的颜色发生变化时,只有订阅了该对象颜色的组件会重新渲染,这对保证复杂画布的渲染性能至关重要。

以下是整个架构的流程图:

graph TD
    subgraph Client
        A[React UI] -->|User Action| B(Dispatch Command);
        B --> C{gRPC-Web Client};
        C -->|Unary RPC| D[gRPC Gateway];
        F[State Update Listener] -->|Update Store| G[Valtio/Zustand Store];
        G --> A;
    end

    subgraph Backend
        D --> E[Command Service];
        E -->|Validate & Create Event| H[Command Handler];
        H -->|Append Event| I[Cassandra Event Store];
        
        subgraph Event Processor
            J[CDC Connector / Poller] -->|Read Events| I;
            J --> K[Event Bus - Kafka/NATS];
            L[Query Model Projector] -->|Consume Events| K;
            L -->|Update Read Model| M[Optimized Read Store - e.g., Redis/Elastic];
        end

        subgraph Query & Push Service
            N[gRPC Stream Service] -->|Subscribe to Updates| K;
            N -->|Push State Changes| D;
            D -->|Server Stream| C;
            C --> F;
        end
    end

    style I fill:#f9f,stroke:#333,stroke-width:2px
    style K fill:#ccf,stroke:#333,stroke-width:2px

在我们的简化实现中,为了聚焦核心技术,我们会将Event ProcessorQuery & Push Service合并,直接从Cassandra轮询新事件并推送到客户端。

核心实现概览

1. gRPC定义 (.proto)

我们定义命令(如创建、移动图形)和从服务器到客户端的事件流。

proto/whiteboard.proto:

syntax = "proto3";

package whiteboard;

option go_package = ".;whiteboard";

// 服务定义
service WhiteboardService {
  // 发送一个命令到服务器
  rpc ExecuteCommand(CommandRequest) returns (CommandResponse);

  // 订阅白板的状态变更事件
  rpc SubscribeToEvents(SubscriptionRequest) returns (stream Event);
}

// 命令请求体
message CommandRequest {
  string whiteboard_id = 1;
  oneof command {
    CreateShapeCommand create_shape = 2;
    MoveShapeCommand move_shape = 3;
    ChangeShapeColorCommand change_shape_color = 4;
  }
}

// 命令响应
message CommandResponse {
  bool success = 1;
  string error_message = 2;
  string event_id = 3; // 该命令产生的事件ID
}

// 订阅请求
message SubscriptionRequest {
  string whiteboard_id = 1;
  string last_seen_event_id = 2; // 用于断线重连,告诉服务器从哪个事件开始推
}

// --- 命令详情 ---
message CreateShapeCommand {
  string shape_id = 1;
  string shape_type = 2; // "rectangle", "circle"
  float x = 3;
  float y = 4;
  float width = 5;
  float height = 6;
  string color = 7;
}

message MoveShapeCommand {
  string shape_id = 1;
  float new_x = 2;
  float new_y = 3;
}

message ChangeShapeColorCommand {
  string shape_id = 1;
  string new_color = 2;
}


// --- 事件定义,这是推送到客户端的核心数据结构 ---
message Event {
  string event_id = 1;      // UUID, e.g., TimeUUID
  string whiteboard_id = 2;
  int64 timestamp = 3;      // UTC timestamp in milliseconds
  oneof payload {
    ShapeCreatedEvent shape_created = 4;
    ShapeMovedEvent shape_moved = 5;
    ShapeColorChangedEvent shape_color_changed = 6;
  }
}

// --- 事件详情 ---
message ShapeCreatedEvent {
  string shape_id = 1;
  string shape_type = 2;
  float x = 3;
  float y = 4;
  float width = 5;
  float height = 6;
  string color = 7;
}

message ShapeMovedEvent {
  string shape_id = 1;
  float new_x = 2;
  float new_y = 3;
}

message ShapeColorChangedEvent {
  string shape_id = 1;
  string new_color = 2;
}

2. 后端实现 (Go, gRPC & Cassandra)

Cassandra 表结构设计:

事件存储是核心。我们使用whiteboard_id作为分区键,保证同一个白板的所有事件都在同一个节点上,便于顺序读取。event_id使用timeuuid类型,它自带时间戳,可以按时间自然排序。这是事件溯源在Cassandra中的经典建模方式。

CREATE KEYSPACE IF NOT EXISTS whiteboard_events WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

USE whiteboard_events;

CREATE TABLE events (
    whiteboard_id text,
    event_id timeuuid,
    event_type text,
    payload blob,  // 存储序列化后的 Protobuf 事件体
    PRIMARY KEY (whiteboard_id, event_id)
) WITH CLUSTERING ORDER BY (event_id ASC);

gRPC 服务端核心逻辑:

server/server.go:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"time"

	"github.com/gocql/gocql"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"

	pb "path/to/your/proto/whiteboard" // 替换为你的 proto 包路径
)

type server struct {
	pb.UnimplementedWhiteboardServiceServer
	dbSession *gocql.Session
}

// ExecuteCommand 实现了处理命令的 gRPC 方法
func (s *server) ExecuteCommand(ctx context.Context, req *pb.CommandRequest) (*pb.CommandResponse, error) {
	whiteboardID := req.GetWhiteboardId()
	if whiteboardID == "" {
		return &pb.CommandResponse{Success: false, ErrorMessage: "whiteboard_id is required"}, nil
	}

	// 1. 根据命令类型,生成对应的事件
	var event pb.Event
	event.WhiteboardId = whiteboardID
	event.Timestamp = time.Now().UnixMilli()

	var eventType string
	var eventPayload proto.Message

	switch cmd := req.Command.(type) {
	case *pb.CommandRequest_CreateShape:
		eventType = "ShapeCreated"
		eventPayload = &pb.ShapeCreatedEvent{
			ShapeId:   cmd.CreateShape.ShapeId,
			ShapeType: cmd.CreateShape.ShapeType,
			X:         cmd.CreateShape.X,
			Y:         cmd.CreateShape.Y,
			Width:     cmd.CreateShape.Width,
			Height:    cmd.CreateShape.Height,
			Color:     cmd.CreateShape.Color,
		}
		event.Payload = &pb.Event_ShapeCreated{ShapeCreated: eventPayload.(*pb.ShapeCreatedEvent)}
	case *pb.CommandRequest_MoveShape:
		// ... 其他命令类型的处理
		eventType = "ShapeMoved"
		eventPayload = &pb.ShapeMovedEvent{
			ShapeId: cmd.MoveShape.ShapeId,
			NewX:    cmd.MoveShape.NewX,
			NewY:    cmd.MoveShape.NewY,
		}
		event.Payload = &pb.Event_ShapeMoved{ShapeMoved: eventPayload.(*pb.ShapeMovedEvent)}
	default:
		return &pb.CommandResponse{Success: false, ErrorMessage: "unsupported command type"}, nil
	}
	
	// 2. 序列化事件负载
	payloadBytes, err := proto.Marshal(eventPayload)
	if err != nil {
		log.Printf("Failed to marshal event payload: %v", err)
		return &pb.CommandResponse{Success: false, ErrorMessage: "internal server error"}, nil
	}

	// 3. 生成 TimeUUID 并持久化到 Cassandra
	eventID := gocql.TimeUUID()
	event.EventId = eventID.String()

	if err := s.dbSession.Query(`
		INSERT INTO events (whiteboard_id, event_id, event_type, payload) VALUES (?, ?, ?, ?)`,
		whiteboardID, eventID, eventType, payloadBytes).Exec(); err != nil {
		log.Printf("Failed to insert event into Cassandra: %v", err)
		return &pb.CommandResponse{Success: false, ErrorMessage: "failed to save command"}, nil
	}

	log.Printf("Successfully processed command and stored event %s for whiteboard %s", eventID.String(), whiteboardID)

	// 在真实项目中,这里会通过消息队列(如Kafka/NATS)将事件发布出去
	// 订阅服务会监听消息队列,而不是直接耦合在这里
	// 为了简化,我们暂时忽略这一步,在订阅服务中直接轮询数据库

	return &pb.CommandResponse{Success: true, EventId: eventID.String()}, nil
}

// SubscribeToEvents 实现了事件流的 gRPC 方法
func (s *server) SubscribeToEvents(req *pb.SubscriptionRequest, stream pb.WhiteboardService_SubscribeToEventsServer) error {
	whiteboardID := req.GetWhiteboardId()
	log.Printf("Client subscribed to whiteboard: %s", whiteboardID)

	ticker := time.NewTicker(500 * time.Millisecond) // 每500ms轮询一次数据库
	defer ticker.Stop()

	lastSeenUUID, err := gocql.ParseUUID(req.GetLastSeenEventId())
	if err != nil {
		// 如果客户端没有提供有效的UUID,我们从一个很早的时间点开始查询
		// 在生产环境中,可能需要一个更健壮的初始同步策略
		lastSeenUUID = gocql.MinTimeUUID(time.Now().Add(-24 * time.Hour))
	}

	for {
		select {
		case <-stream.Context().Done():
			log.Printf("Client for whiteboard %s disconnected.", whiteboardID)
			return stream.Context().Err()
		case <-ticker.C:
			iter := s.dbSession.Query(`
				SELECT event_id, event_type, payload FROM events WHERE whiteboard_id = ? AND event_id > ?`,
				whiteboardID, lastSeenUUID).Iter()

			var eventID gocql.UUID
			var eventType string
			var payloadBytes []byte

			for iter.Scan(&eventID, &eventType, &payloadBytes) {
				event := &pb.Event{
					EventId:      eventID.String(),
					WhiteboardId: whiteboardID,
					Timestamp:    gocql.UUIDTimestamp(eventID).UnixMilli(),
				}

				// 根据 event_type 反序列化 payload
				err := unmarshalPayload(eventType, payloadBytes, event)
				if err != nil {
					log.Printf("Error unmarshalling payload for event %s: %v", eventID.String(), err)
					continue // 跳过损坏的事件
				}

				if err := stream.Send(event); err != nil {
					log.Printf("Error sending event to client: %v", err)
					return err
				}

				lastSeenUUID = eventID
			}

			if err := iter.Close(); err != nil {
				log.Printf("Error closing iterator: %v", err)
			}
		}
	}
}

// ... (main函数和其他辅助函数)
// unmarshalPayload 辅助函数,用于将 blob 数据反序列化为具体的 protobuf 消息
func unmarshalPayload(eventType string, data []byte, event *pb.Event) error {
	var msg proto.Message
	switch eventType {
	case "ShapeCreated":
		payload := &pb.ShapeCreatedEvent{}
		msg = payload
		event.Payload = &pb.Event_ShapeCreated{ShapeCreated: payload}
	case "ShapeMoved":
		payload := &pb.ShapeMovedEvent{}
		msg = payload
		event.Payload = &pb.Event_ShapeMoved{ShapeMoved: payload}
	// ... 其他事件类型
	default:
		return fmt.Errorf("unknown event type: %s", eventType)
	}
	return proto.Unmarshal(data, msg)
}

func main() {
    // Cassandra 连接设置
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.Keyspace = "whiteboard_events"
	cluster.Consistency = gocql.Quorum
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatalf("Failed to connect to Cassandra: %v", err)
	}
	defer session.Close()

	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterWhiteboardServiceServer(s, &server{dbSession: session})
	log.Printf("server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

单元测试思路:

  • ExecuteCommand: 模拟gRPC请求,验证Cassandra中是否正确插入了对应类型的事件。可以使用mocking库替换dbSession,断言QueryExec的调用参数是否正确。
  • SubscribeToEvents: 这个比较复杂。可以启动一个内存中的gRPC服务器,在一个goroutine中向Cassandra插入事件,在另一个goroutine中调用SubscribeToEvents并验证接收到的事件流是否完整、有序。

3. 前端实现 (React, gRPC-Web, Zustand, Valtio)

状态管理分工:

  • Zustand Store (useGlobalStore.ts): 管理与协作画布本身无关的状态。
    import { create } from 'zustand';
    
    interface GlobalState {
      userId: string | null;
      connectionStatus: 'connecting' | 'connected' | 'disconnected';
      setConnectionStatus: (status: GlobalState['connectionStatus']) => void;
    }
    
    export const useGlobalStore = create<GlobalState>((set) => ({
      userId: `user-${Math.random().toString(36).substr(2, 9)}`,
      connectionStatus: 'disconnected',
      setConnectionStatus: (status) => set({ connectionStatus: status }),
    }));
  • Valtio Store (whiteboardStore.ts): 管理画布上的所有图形对象。这是性能关键点。
    import { proxy } from 'valtio';
    
    interface Shape {
      id: string;
      type: string;
      x: number;
      y: number;
      width: number;
      height: number;
      color: string;
    }
    
    // 使用 Record<string, Shape> 而不是 Shape[] 是为了 O(1) 的查找和更新效率
    interface WhiteboardState {
      shapes: Record<string, Shape>;
      lastEventId: string | null;
    }
    
    // proxy() 创建一个可被 Valtio 追踪的可变状态对象
    export const whiteboardStore = proxy<WhiteboardState>({
      shapes: {},
      lastEventId: null,
    });
    
    // 可以在这里定义一些操作 state 的 action,虽然 Valtio 允许直接修改
    export const eventProcessor = {
        applyShapeCreated(event: ShapeCreatedEvent) {
            whiteboardStore.shapes[event.shapeId] = {
                id: event.shapeId,
                type: event.shapeType,
                x: event.x,
                y: event.y,
                width: event.width,
                height: event.height,
                color: event.color,
            };
        },
        applyShapeMoved(event: ShapeMovedEvent) {
            const shape = whiteboardStore.shapes[event.shapeId];
            if (shape) {
                shape.x = event.newX;
                shape.y = event.newY;
            }
        },
        // ...其他事件处理器
    };

核心组件 (WhiteboardCanvas.tsx):

这个组件负责监听gRPC流,并根据事件更新Valtio store。它会渲染出所有的Shape子组件。

import React, { useEffect } from 'react';
import { useProxy } from 'valtio';
import { whiteboardStore, eventProcessor } from './whiteboardStore';
import { useGlobalStore } from './useGlobalStore';
import { grpcClient } from './grpcClient'; // 假设的gRPC客户端实例
import { Event } from './proto/whiteboard_pb'; // 从proto生成的类型

// 单个图形组件
// 使用 useProxy 订阅 whiteboardStore.shapes 中特定id的变化
const ShapeComponent = React.memo(({ shapeId }: { shapeId: string }) => {
  const shapes = useProxy(whiteboardStore.shapes);
  const shape = shapes[shapeId];

  // 这里的关键是:只有当 `shapes[shapeId]` 自身或其属性变化时,此组件才会重渲染
  // 如果另一个 shape 变化,此组件不会受到影响,这就是Valtio的威力
  if (!shape) return null;

  console.log(`Rendering shape: ${shape.id}`);
  
  return (
    <div
      style={{
        position: 'absolute',
        left: shape.x,
        top: shape.y,
        width: shape.width,
        height: shape.height,
        backgroundColor: shape.color,
        border: '1px solid black',
      }}
    >
      {shape.id.substring(0, 5)}
    </div>
  );
});

// 画布主组件
export const WhiteboardCanvas = () => {
  const { shapes } = useProxy(whiteboardStore);
  const { setConnectionStatus } = useGlobalStore();

  useEffect(() => {
    const whiteboardId = 'main-board';
    const request = new SubscriptionRequest();
    request.setWhiteboardId(whiteboardId);
    request.setLastSeenEventId(whiteboardStore.lastEventId || '');
    
    const stream = grpcClient.subscribeToEvents(request, {});
    setConnectionStatus('connecting');

    stream.on('data', (event: Event) => {
      // 这是事件处理的核心逻辑
      const payload = event.getPayloadCase();
      switch(payload) {
        case Event.PayloadCase.SHAPE_CREATED:
          eventProcessor.applyShapeCreated(event.getShapeCreated().toObject());
          break;
        case Event.PayloadCase.SHAPE_MOVED:
          eventProcessor.applyShapeMoved(event.getShapeMoved().toObject());
          break;
        // ... 其他事件
      }
      whiteboardStore.lastEventId = event.getEventId();
    });

    stream.on('status', (status) => {
      if (status.code === 0) {
        setConnectionStatus('connected');
      } else {
        setConnectionStatus('disconnected');
        console.error('gRPC stream error:', status.details);
        // 在这里实现重连逻辑,使用 whiteboardStore.lastEventId
      }
    });

    return () => {
      stream.cancel();
    };
  }, [setConnectionStatus]);

  const shapeIds = Object.keys(shapes);

  return (
    <div style={{ position: 'relative', width: '100vw', height: '100vh' }}>
      {shapeIds.map(id => (
        <ShapeComponent key={id} shapeId={id} />
      ))}
    </div>
  );
};

这里的性能优势在于,当gRPC流传来一个ShapeMovedEvent时,eventProcessor.applyShapeMoved只会修改whiteboardStore.shapes中一个特定shape对象的xy属性。Valtio的useProxy能够精确地知道只有订阅了这个特定shapeShapeComponent实例需要重新渲染,而WhiteboardCanvas本身以及其他上百个ShapeComponent都不会触发渲染。如果使用传统的Redux或Zustand,整个shapes对象被新对象替换,会导致所有ShapeComponent都进行diff比较,在高频更新下开销巨大。

架构的扩展性与局限性

此架构的扩展性非常强。

  1. 读写分离: 命令处理服务可以独立于事件推送服务进行扩缩容。
  2. 多读取模型: 我们可以轻易地增加新的事件消费者,将事件数据投影到Elasticsearch中以支持复杂查询,或者投影到时序数据库中进行性能分析,而无需改动核心的写入逻辑。
  3. 容错性: Cassandra的分布式特性提供了高可用性。事件流的中断可以通过last_seen_event_id进行恢复,保证消息不丢失。

然而,这个架构也存在局限性:

  • 最终一致性延迟: 从命令执行到客户端状态更新,中间存在网络延迟、数据库轮询延迟。对于需要强实时性的场景(如在线游戏),可能需要更低延迟的消息队列(如NATS)或直接在内存中处理。
  • 事件模型演进: 一旦事件被持久化,其结构(schema)就很难修改。对事件的任何变更都需要考虑版本控制和向后兼容性,这对长期维护是个挑战。
  • 开发心智模型: 团队成员需要理解CQRS、事件溯源和最终一致性等概念,这比传统的CRUD模型陡峭。一个常见的错误是试图在命令处理器中进行查询,这破坏了模式的初衷。

总的来说,对于高并发、写密集型的协作应用,这套基于gRPC、Cassandra、Valtio和Zustand的CQRS架构,尽管实现复杂,但它在性能、可扩展性和系统韧性上提供的优势,是传统架构无法比拟的。它的适用边界是那些对性能和扩展性有极高要求的复杂系统,对于简单的CRUD应用则是一种过度设计。


  目录