构建基于 NATS 事件驱动的异构微服务架构以绞杀 Ruby on Rails 单体


一个庞大、陈旧的 Ruby on Rails 单体应用正成为团队迭代的瓶颈。它的数据库模式复杂,代码盘根错节,任何微小的改动都可能引发雪崩效应,部署周期以周为单位。直接用 ASP.NET Core 重写整个系统,风险高、周期长,业务无法接受。另一种常见的方案是引入 API 网关做请求路由,逐步将功能模块迁移到新的 ASP.NET Core 服务中。但这治标不治本,核心问题——紧耦合的数据库——依然存在,两个系统同时读写同一张表会引入灾难性的数据一致性与锁竞争问题。

最终的技术决策是采用事件驱动的绞杀者模式(Event-Driven Strangler Fig)。核心思路是:将 Rails 单体视为一个事件源,当其内部发生核心业务实体的状态变更时(如用户更新、订单创建),它不再仅仅是写入自己的数据库,而是向 NATS 消息总线发布一个领域事件。新的 ASP.NET Core 微服务订阅这些事件,构建自己的数据副本(读模型),并在此基础上提供新的查询 API。前端(Gatsby)则通过一个后端换前端(BFF)层聚合新旧两个系统的数据,对用户呈现统一视图。

这个方案的优势在于实现了真正的解耦。新服务拥有独立的数据存储和演进路径,与单体之间通过一层异步、持久化的消息总线隔离,数据库层面的竞争被彻底消除。随着时间推移,越来越多的读请求会从新服务满足,最终,写操作也可以被迁移到新的服务中,单体的职责范围被逐步“绞杀”,直至完全下线。

架构概览

整个系统的交互流程可以通过以下模型来理解:

graph TD
    subgraph "用户端"
        Gatsby[Gatsby 静态站点]
    end

    subgraph "BFF (Backend for Frontend)"
        BFF_API[ASP.NET Core API]
    end

    subgraph "新微服务域"
        UserService[ASP.NET Core 用户服务]
        UserServiceDB[(PostgreSQL)]
    end

    subgraph "消息总线"
        NATS[NATS JetStream]
    end

    subgraph "遗留单体域"
        RailsApp[Ruby on Rails Monolith]
        RailsDB[(MySQL)]
    end

    Gatsby -->|GraphQL/REST 请求| BFF_API
    BFF_API -->|查询新数据| UserService
    BFF_API -->|查询遗留数据| RailsApp
    UserService --> UserServiceDB

    RailsApp -- "after_commit: publish event" --> NATS
    RailsDB -- "读写" <--> RailsApp
    NATS -- "push event" --> UserService

遗留系统改造:在 Ruby on Rails 中发布 NATS 事件

改造的第一步是在 Rails 单体中植入事件发布逻辑。这里的关键是确保事件发布与数据库事务的原子性。我们只希望在数据库事务成功提交后才发布事件,避免发布了事件但数据回滚的脏数据情况。ActiveRecord 的 after_commit 回调是实现这一点的理想选择。

我们首先需要一个健壮的 NATS 客户端。在 Ruby 生态中,nats-pure 是一个不错的选择。

Gemfile

gem 'nats-pure'
gem 'activesupport' # for concerns

接下来,我们创建一个 NatsPublishable Concern,以便在任何需要发布事件的 Model 中轻松复用。

app/models/concerns/nats_publishable.rb

# frozen_string_literal: true

require 'nats/client'
require 'json'
require 'logging'

module NatsPublishable
  extend ActiveSupport::Concern

  included do
    # 在数据库事务成功提交后触发事件发布
    # on: [:create, :update] 意味着只有在创建和更新时才触发
    after_commit :publish_event_to_nats, on: %i[create update], if: :should_publish?
  end

  # 一个抽象方法,子类必须实现它来定义事件的主题 (subject)
  # 例如:"users.updated", "orders.created"
  def nats_subject
    raise NotImplementedError, 'Models including NatsPublishable must implement #nats_subject'
  end

  # 一个抽象方法,子类必须实现它来构建事件负载 (payload)
  def nats_payload
    raise NotImplementedError, 'Models including NatsPublishable must implement #nats_payload'
  end

  private

  # 获取一个单例的 NATS 连接客户端
  # 在生产环境中,配置应该来自环境变量
  def nats_client
    @nats_client ||= NatsClientFactory.client
  end

  # 实际的事件发布逻辑
  def publish_event_to_nats
    subject = nats_subject
    payload = nats_payload.to_json

    # JetStream 的 publish 方法是异步的,但提供了 ack 确认机制
    # 这确保了消息确实被 NATS 服务器持久化了
    ack = nats_client.jetstream.publish(subject, payload)
    
    # 生产级的日志记录是必须的
    logger.info "[NATS] Published event to subject '#{subject}'. Stream: #{ack.stream}, Seq: #{ack.seq}"
  rescue NATS::IO::ConnectError => e
    logger.error "[NATS] Connection failed while publishing to '#{subject}': #{e.message}"
    # 在这里可以加入重试逻辑或告警机制
  rescue NATS::JetStream::Error::Timeout => e
    logger.error "[NATS] Timeout while waiting for ACK for subject '#{subject}': #{e.message}"
    # ACK 超时意味着消息可能没有被持久化,需要告警
  rescue => e
    logger.error "[NATS] An unexpected error occurred while publishing to '#{subject}': #{e.message}"
    # 捕获所有其他异常
  end
  
  # 提供一个开关,允许在某些场景下(如数据迁移)禁用事件发布
  def should_publish?
    # 在真实项目中,这可能是一个功能开关或环境变量
    true
  end

  def logger
    @logger ||= Logging.logger[self]
  end
end

# 单例工厂,负责创建和管理 NATS 连接
class NatsClientFactory
  def self.client
    @client ||= begin
      logger = Logging.logger[self]
      nats_url = ENV.fetch('NATS_URL', 'nats://localhost:4222')
      
      nc = NATS.connect(
        servers: [nats_url],
        # 在断线后自动重连
        reconnect: true,
        max_reconnect_attempts: 10,
        reconnect_time_wait: 2
      )
      
      # 优雅关闭处理
      Signal.trap("TERM") { nc.close }
      Signal.trap("INT") { nc.close }

      logger.info "[NATS] Connected to NATS server at #{nats_url}"
      nc
    end
  end
end

现在,将这个 Concern 应用到 User 模型中。

app/models/user.rb

class User < ApplicationRecord
  include NatsPublishable

  # 实现 Concern 中定义的抽象方法
  def nats_subject
    # 根据是创建还是更新,动态确定 subject
    # 'previously_new_record?' 是 ActiveRecord 的一个技巧,用于判断是否是 create 操作
    action = previously_new_record? ? 'created' : 'updated'
    "users.#{action}"
  end

  def nats_payload
    # 定义事件的数据结构,这应该是一个稳定的契约
    {
      event_id: SecureRandom.uuid,
      event_timestamp: Time.now.utc.iso8601,
      event_source: 'rails-monolith',
      event_version: '1.0',
      data: {
        id: self.id,
        email: self.email,
        full_name: self.full_name,
        status: self.status,
        updated_at: self.updated_at.utc.iso8601
      }
    }
  end
end

通过这种方式,每当一个 User 记录被创建或更新并成功提交到数据库后,一个 users.createdusers.updated 事件就会被发布到 NATS。

NATS JetStream 配置

为了保证事件不丢失,我们必须使用 NATS JetStream 的持久化能力。我们需要创建一个 Stream 来捕获所有与用户相关的事件。

可以使用 NATS CLI 来配置:

# 创建一个名为 'USER_EVENTS' 的流,捕获所有 'users.*' 主题的消息
# --retention Limits: 限制流中消息的数量
# --storage File: 将消息持久化到文件系统
# --ack: 要求消息被确认
nats stream add USER_EVENTS --subjects "users.*" --ack --max-msgs=-1 --max-bytes=-1 --storage file --retention limits --discard new

# 确认流已创建
nats stream info USER_EVENTS

新服务构建:ASP.NET Core 订阅者

现在轮到新的 ASP.NET Core 用户服务。它的核心职责是作为一个后台服务,持续监听 NATS USER_EVENTS 流,并将接收到的数据同步到自己的数据库中。

首先,安装 NATS .NET 客户端:

dotnet add package NATS.Net
dotnet add package Microsoft.Extensions.Hosting

接下来,我们创建一个 NatsEventSubscriber 后台服务。

Services/NatsEventSubscriber.cs

using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NATS.Client.Core;
using NATS.Client.JetStream;
using UserSvc.Data; // 假设这是 EF Core DbContext
using UserSvc.Models; // 假设这是领域模型

namespace UserSvc.Services;

public class NatsEventSubscriber : BackgroundService
{
    private readonly ILogger<NatsEventSubscriber> _logger;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly NatsOpts _natsOpts;
    private readonly NatsConnection _natsConnection;

    // 使用 IOptions 来注入配置,IServiceScopeFactory 用于在后台服务中创建独立的依赖注入作用域
    public NatsEventSubscriber(
        ILogger<NatsEventSubscriber> logger,
        IServiceScopeFactory scopeFactory,
        IOptions<NatsConfig> natsConfig)
    {
        _logger = logger;
        _scopeFactory = scopeFactory;
        _natsOpts = NatsOpts.Default with
        {
            Url = natsConfig.Value.Url,
            Name = "aspnetcore-user-subscriber",
            // 配置连接事件的回调,用于日志和监控
            ReconnectHandler = (_, args) => _logger.LogWarning("Reconnecting to NATS... Attempt {Attempt}", args.Attempt),
            DisconnectedHandler = (_, args) => _logger.LogError(args.Error, "Disconnected from NATS server"),
        };
        _natsConnection = new NatsConnection(_natsOpts);
    }
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("NATS event subscriber starting.");
        await _natsConnection.ConnectAsync();
        var js = new NatsJSContext(_natsConnection);

        // 创建一个持久化的消费者,'user-svc-consumer' 是消费者的名字
        // NATS 会记住这个消费者处理到了哪条消息,即使服务重启也能从断点处继续
        var consumer = await js.CreateOrUpdateConsumerAsync(
            "USER_EVENTS",
            new NatsJSConsumerOpts("user-svc-consumer")
            {
                // DeliverPolicy = NatsJSDeliverPolicy.All: 从流的第一条消息开始消费
                // AckPolicy = NatsJSAckPolicy.Explicit: 需要手动确认消息
                DeliverPolicy = NatsJSDeliverPolicy.All,
                AckPolicy = NatsJSAckPolicy.Explicit,
            },
            stoppingToken
        );
        
        _logger.LogInformation("NATS consumer '{ConsumerName}' is ready to receive messages.", consumer.Info.Name);

        // 循环消费消息,直到服务被取消
        await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken))
        {
            try
            {
                // 在每次处理消息时创建一个新的依赖注入作用域
                // 这是一个最佳实践,可以避免 DbContext 等 Scoped 服务生命周期过长的问题
                using var scope = _scopeFactory.CreateScope();
                var dbContext = scope.ServiceProvider.GetRequiredService<UserDbContext>();
                
                await ProcessMessageAsync(msg, dbContext);

                // 处理成功后,向 NATS 发送 ACK 确认
                await msg.AckAsync(cancellationToken: stoppingToken);
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "Failed to deserialize message. Subject: {Subject}. Moving to dead-letter queue.", msg.Subject);
                // 对于无法解析的 "毒消息",我们选择终止处理 (Term),NATS 会根据策略不再投递
                await msg.TermAsync(cancellationToken: stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unhandled exception occurred while processing message. Subject: {Subject}. Retrying...", msg.Subject);
                // 对于其他异常(如数据库暂时不可用),我们不发送 ACK
                // NATS 会在 AckWait 时间后重新投递该消息
            }
        }
    }

    private async Task ProcessMessageAsync(NatsJSMsg msg, UserDbContext dbContext)
    {
        var messageBody = Encoding.UTF8.GetString(msg.Data);
        _logger.LogInformation("Received message on subject {Subject}", msg.Subject);
        
        var userEvent = JsonSerializer.Deserialize<UserEvent>(messageBody, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
        
        if (userEvent?.Data == null)
        {
            throw new JsonException("Event data is null or invalid.");
        }

        var userData = userEvent.Data;
        var existingUser = await dbContext.Users.FindAsync(userData.Id);

        if (existingUser != null)
        {
            // 更新逻辑:只在接收到的事件比现有数据新时才更新
            if (userData.UpdatedAt > existingUser.UpdatedAt)
            {
                existingUser.Email = userData.Email;
                existingUser.FullName = userData.FullName;
                existingUser.Status = userData.Status;
                existingUser.UpdatedAt = userData.UpdatedAt;
                _logger.LogInformation("Updating user {UserId}", existingUser.Id);
            }
        }
        else
        {
            // 创建逻辑
            var newUser = new User
            {
                Id = userData.Id,
                Email = userData.Email,
                FullName = userData.FullName,
                Status = userData.Status,
                CreatedAt = userData.UpdatedAt, // 近似处理
                UpdatedAt = userData.UpdatedAt
            };
            dbContext.Users.Add(newUser);
            _logger.LogInformation("Creating new user {UserId}", newUser.Id);
        }

        await dbContext.SaveChangesAsync();
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("NATS event subscriber stopping.");
        await _natsConnection.DisposeAsync();
        await base.StopAsync(cancellationToken);
    }
}

// 定义事件的数据传输对象 (DTO)
public record UserEventData(int Id, string Email, string FullName, string Status, DateTime UpdatedAt);
public record UserEvent(Guid EventId, DateTime EventTimestamp, string EventSource, string EventVersion, UserEventData Data);

然后在 Program.cs 中注册这个后台服务。

Program.cs

// ...
builder.Services.AddOptions<NatsConfig>().Bind(builder.Configuration.GetSection("Nats"));
builder.Services.AddDbContext<UserDbContext>(options => 
    options.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection")));

builder.Services.AddHostedService<NatsEventSubscriber>();
// ...

这个 BackgroundService 实现了一个健壮的消费者。它使用了持久化消费者来保证消息不丢失,通过显式 ACK 来确保处理成功,并且正确处理了依赖注入作用域和异常情况。

前端整合与 BFF

现在,新的用户服务已经有了自己独立、高效的只读数据副本。最后一步是让前端应用 Gatsby 来消费这些数据。直接让前端同时请求 Rails 单体和新服务会导致复杂的认证和跨域问题。更好的模式是引入一个 BFF (Backend for Frontend)。

这个 BFF 可以是另一个 ASP.NET Core Web API 项目。它的作用是作为一个统一的 API 入口,为 Gatsby 前端量身定做数据。

Bff.Controllers/UserProfileController.cs

[ApiController]
[Route("api/v1/profile")]
public class UserProfileController : ControllerBase
{
    private readonly HttpClient _legacyRailsApiClient;
    private readonly UserDbContext _newUserDbContext; // 直接查询新服务的数据库

    public UserProfileController(IHttpClientFactory httpClientFactory, UserDbContext newUserDbContext)
    {
        _legacyRailsApiClient = httpClientFactory.CreateClient("RailsAPI");
        _newUserDbContext = newUserDbContext;
    }

    [HttpGet("{userId}")]
    public async Task<IActionResult> GetUserProfile(int userId)
    {
        // 1. 从新的、快速的只读副本中获取核心用户信息
        var user = await _newUserDbContext.Users.AsNoTracking().FirstOrDefaultAsync(u => u.Id == userId);
        if (user == null)
        {
            return NotFound();
        }

        // 2. 从遗留的 Rails API 中获取尚未迁移的复杂数据,例如订单历史
        // 这里的真实实现会更复杂,需要处理认证和错误
        var ordersResponse = await _legacyRailsApiClient.GetAsync($"/api/internal/users/{userId}/orders");
        ordersResponse.EnsureSuccessStatusCode();
        var orders = await ordersResponse.Content.ReadFromJsonAsync<List<OrderDto>>();

        // 3. 聚合数据,返回给前端
        var profile = new UserProfileViewModel
        {
            Id = user.Id,
            Email = user.Email,
            FullName = user.FullName,
            Status = user.Status,
            RecentOrders = orders ?? new List<OrderDto>()
        };

        return Ok(profile);
    }
}

Gatsby 应用现在只需要与这个 BFF API 进行交互。它不需要知道后端是单体还是微服务,实现了前后端的彻底解耦。当未来订单功能也迁移到新的微服务后,只需要修改 BFF 的 GetUserProfile 方法,将其数据源从 _legacyRailsApiClient 切换到新的订单微服务即可,前端代码完全不需要改动。

当前方案的局限性与展望

这个架构并非银弹。最显著的代价是引入了最终一致性。从 Rails 单体更新数据到新服务能够查询到最新数据之间,存在一个毫秒到秒级的延迟。所有依赖此数据的业务场景必须能够容忍这种延迟。对于需要强一致性的场景,例如支付,仍然需要直接调用单体的同步 API,或者设计更复杂的分布式事务方案如 Saga 模式。

另一个挑战是数据初始化。当一个新的微服务上线时,它需要获取存量的历史数据。这通常需要开发一个一次性的数据迁移脚本,从 Rails 数据库中批量读取数据,并将其转换为事件注入到 NATS 中,供新服务消费。这个过程需要仔细规划以避免影响线上业务。

未来的演进方向是明确的:逐步将更多的领域和写操作迁移到新的 ASP.NET Core 服务中。例如,可以创建一个新的 UserWriteService,负责处理用户的创建和更新请求。它在完成数据库操作后,同样发布 users.createdusers.updated 事件。此时,Rails 单体和其他服务将成为这些事件的消费者,而不是生产者。通过这种方式,单体的功能被一步步蚕食,最终可以被安全地移除,完成整个现代化改造。


  目录