GeWuYou fb14d7122c docs(style): 更新文档中的命名空间导入格式
- 将所有小写的命名空间导入更正为首字母大写格式
- 统一 GFramework 框架的命名空间引用规范
- 修复 core、ecs、godot 等模块的命名空间导入错误
- 标准化文档示例代码中的 using 语句格式
- 确保所有文档中的命名空间引用保持一致性
- 更新 global using 语句以匹配正确的命名空间格式
2026-03-10 07:18:49 +08:00

15 KiB
Raw Blame History

title, description
title description
CQRS 与 Mediator CQRS 模式通过 Mediator 实现命令查询职责分离,提供清晰的业务逻辑组织方式。

CQRS 与 Mediator

概述

CQRSCommand Query Responsibility Segregation命令查询职责分离是一种架构模式将数据的读取Query和修改Command操作分离。GFramework 通过集成 Mediator 库实现了 CQRS 模式,提供了类型安全、解耦的业务逻辑处理方式。

通过 CQRS你可以将复杂的业务逻辑拆分为独立的命令和查询处理器每个处理器只负责单一职责使代码更易于测试和维护。

主要特性

  • 命令查询职责分离
  • 基于 Mediator 模式的解耦设计
  • 支持管道行为Behaviors
  • 异步处理支持
  • 与架构系统深度集成
  • 支持流式处理

核心概念

Command命令

命令表示修改系统状态的操作,如创建、更新、删除:

using GFramework.Core.CQRS.Command;
using GFramework.Core.Abstractions.CQRS.Command;

// 定义命令输入
public class CreatePlayerInput : ICommandInput
{
    public string Name { get; set; }
    public int Level { get; set; }
}

// 定义命令
public class CreatePlayerCommand : CommandBase<CreatePlayerInput, int>
{
    public CreatePlayerCommand(CreatePlayerInput input) : base(input) { }
}

Query查询

查询表示读取系统状态的操作,不修改数据:

using GFramework.Core.CQRS.Query;
using GFramework.Core.Abstractions.CQRS.Query;

// 定义查询输入
public class GetPlayerInput : IQueryInput
{
    public int PlayerId { get; set; }
}

// 定义查询
public class GetPlayerQuery : QueryBase<GetPlayerInput, PlayerData>
{
    public GetPlayerQuery(GetPlayerInput input) : base(input) { }
}

Handler处理器

处理器负责执行命令或查询的具体逻辑:

using GFramework.Core.CQRS.Command;
using Mediator;

// 命令处理器
public class CreatePlayerCommandHandler : AbstractCommandHandler<CreatePlayerCommand, int>
{
    public override async ValueTask<int> Handle(
        CreatePlayerCommand command,
        CancellationToken cancellationToken)
    {
        var input = command.Input;
        var playerModel = this.GetModel<PlayerModel>();

        // 创建玩家
        var playerId = playerModel.CreatePlayer(input.Name, input.Level);

        return playerId;
    }
}

Mediator中介者

Mediator 负责将命令/查询路由到对应的处理器:

// 通过 Mediator 发送命令
var command = new CreatePlayerCommand(new CreatePlayerInput
{
    Name = "Player1",
    Level = 1
});

var playerId = await mediator.Send(command);

基本用法

定义和发送命令

// 1. 定义命令输入
public class SaveGameInput : ICommandInput
{
    public int SlotId { get; set; }
    public GameData Data { get; set; }
}

// 2. 定义命令
public class SaveGameCommand : CommandBase<SaveGameInput, Unit>
{
    public SaveGameCommand(SaveGameInput input) : base(input) { }
}

// 3. 实现命令处理器
public class SaveGameCommandHandler : AbstractCommandHandler<SaveGameCommand>
{
    public override async ValueTask<Unit> Handle(
        SaveGameCommand command,
        CancellationToken cancellationToken)
    {
        var input = command.Input;
        var saveSystem = this.GetSystem<SaveSystem>();

        // 保存游戏
        await saveSystem.SaveAsync(input.SlotId, input.Data);

        // 发送事件
        this.SendEvent(new GameSavedEvent { SlotId = input.SlotId });

        return Unit.Value;
    }
}

// 4. 发送命令
public async Task SaveGame()
{
    var mediator = this.GetService<IMediator>();

    var command = new SaveGameCommand(new SaveGameInput
    {
        SlotId = 1,
        Data = currentGameData
    });

    await mediator.Send(command);
}

定义和发送查询

// 1. 定义查询输入
public class GetHighScoresInput : IQueryInput
{
    public int Count { get; set; } = 10;
}

// 2. 定义查询
public class GetHighScoresQuery : QueryBase<GetHighScoresInput, List<ScoreData>>
{
    public GetHighScoresQuery(GetHighScoresInput input) : base(input) { }
}

// 3. 实现查询处理器
public class GetHighScoresQueryHandler : AbstractQueryHandler<GetHighScoresQuery, List<ScoreData>>
{
    public override async ValueTask<List<ScoreData>> Handle(
        GetHighScoresQuery query,
        CancellationToken cancellationToken)
    {
        var input = query.Input;
        var scoreModel = this.GetModel<ScoreModel>();

        // 查询高分榜
        var scores = await scoreModel.GetTopScoresAsync(input.Count);

        return scores;
    }
}

// 4. 发送查询
public async Task<List<ScoreData>> GetHighScores()
{
    var mediator = this.GetService<IMediator>();

    var query = new GetHighScoresQuery(new GetHighScoresInput
    {
        Count = 10
    });

    var scores = await mediator.Send(query);
    return scores;
}

注册处理器

在架构中注册 Mediator 和处理器:

public class GameArchitecture : Architecture
{
    protected override void Init()
    {
        // 注册 Mediator 行为
        RegisterMediatorBehavior<LoggingBehavior>();
        RegisterMediatorBehavior<PerformanceBehavior>();

        // 处理器会自动通过依赖注入注册
    }
}

高级用法

Request请求

Request 是更通用的消息类型,可以用于任何场景:

using GFramework.Core.CQRS.Request;
using GFramework.Core.Abstractions.CQRS.Request;

// 定义请求输入
public class ValidatePlayerInput : IRequestInput
{
    public string PlayerName { get; set; }
}

// 定义请求
public class ValidatePlayerRequest : RequestBase<ValidatePlayerInput, bool>
{
    public ValidatePlayerRequest(ValidatePlayerInput input) : base(input) { }
}

// 实现请求处理器
public class ValidatePlayerRequestHandler : AbstractRequestHandler<ValidatePlayerRequest, bool>
{
    public override async ValueTask<bool> Handle(
        ValidatePlayerRequest request,
        CancellationToken cancellationToken)
    {
        var input = request.Input;
        var playerModel = this.GetModel<PlayerModel>();

        // 验证玩家名称
        var isValid = await playerModel.IsNameValidAsync(input.PlayerName);

        return isValid;
    }
}

Notification通知

Notification 用于一对多的消息广播:

using GFramework.Core.CQRS.Notification;
using GFramework.Core.Abstractions.CQRS.Notification;

// 定义通知输入
public class PlayerLevelUpInput : INotificationInput
{
    public int PlayerId { get; set; }
    public int NewLevel { get; set; }
}

// 定义通知
public class PlayerLevelUpNotification : NotificationBase<PlayerLevelUpInput>
{
    public PlayerLevelUpNotification(PlayerLevelUpInput input) : base(input) { }
}

// 实现通知处理器 1
public class AchievementNotificationHandler : AbstractNotificationHandler<PlayerLevelUpNotification>
{
    public override async ValueTask Handle(
        PlayerLevelUpNotification notification,
        CancellationToken cancellationToken)
    {
        var input = notification.Input;
        // 检查成就
        CheckLevelAchievements(input.PlayerId, input.NewLevel);
        await Task.CompletedTask;
    }
}

// 实现通知处理器 2
public class RewardNotificationHandler : AbstractNotificationHandler<PlayerLevelUpNotification>
{
    public override async ValueTask Handle(
        PlayerLevelUpNotification notification,
        CancellationToken cancellationToken)
    {
        var input = notification.Input;
        // 发放奖励
        GiveRewards(input.PlayerId, input.NewLevel);
        await Task.CompletedTask;
    }
}

// 发布通知(所有处理器都会收到)
var notification = new PlayerLevelUpNotification(new PlayerLevelUpInput
{
    PlayerId = 1,
    NewLevel = 10
});

await mediator.Publish(notification);

Pipeline Behaviors管道行为

Behaviors 可以在处理器执行前后添加横切关注点:

using Mediator;

// 日志行为
public class LoggingBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IMessage
{
    public async ValueTask<TResponse> Handle(
        TMessage message,
        CancellationToken cancellationToken,
        MessageHandlerDelegate<TMessage, TResponse> next)
    {
        var messageName = message.GetType().Name;
        Console.WriteLine($"[开始] {messageName}");

        var response = await next(message, cancellationToken);

        Console.WriteLine($"[完成] {messageName}");

        return response;
    }
}

// 性能监控行为
public class PerformanceBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IMessage
{
    public async ValueTask<TResponse> Handle(
        TMessage message,
        CancellationToken cancellationToken,
        MessageHandlerDelegate<TMessage, TResponse> next)
    {
        var stopwatch = Stopwatch.StartNew();

        var response = await next(message, cancellationToken);

        stopwatch.Stop();
        var elapsed = stopwatch.ElapsedMilliseconds;

        if (elapsed > 100)
        {
            Console.WriteLine($"警告: {message.GetType().Name} 耗时 {elapsed}ms");
        }

        return response;
    }
}

// 注册行为
RegisterMediatorBehavior<LoggingBehavior<,>>();
RegisterMediatorBehavior<PerformanceBehavior<,>>();

验证行为

public class ValidationBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : IMessage
{
    public async ValueTask<TResponse> Handle(
        TMessage message,
        CancellationToken cancellationToken,
        MessageHandlerDelegate<TMessage, TResponse> next)
    {
        // 验证输入
        if (message is IValidatable validatable)
        {
            var errors = validatable.Validate();
            if (errors.Any())
            {
                throw new ValidationException(errors);
            }
        }

        return await next(message, cancellationToken);
    }
}

流式处理

处理大量数据时使用流式处理:

// 流式查询
public class GetAllPlayersStreamQuery : QueryBase<EmptyInput, IAsyncEnumerable<PlayerData>>
{
    public GetAllPlayersStreamQuery() : base(new EmptyInput()) { }
}

// 流式查询处理器
public class GetAllPlayersStreamQueryHandler : AbstractStreamQueryHandler<GetAllPlayersStreamQuery, PlayerData>
{
    public override async IAsyncEnumerable<PlayerData> Handle(
        GetAllPlayersStreamQuery query,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var playerModel = this.GetModel<PlayerModel>();

        await foreach (var player in playerModel.GetAllPlayersAsync(cancellationToken))
        {
            yield return player;
        }
    }
}

// 使用流式查询
var query = new GetAllPlayersStreamQuery();
var stream = await mediator.CreateStream(query);

await foreach (var player in stream)
{
    Console.WriteLine($"玩家: {player.Name}");
}

最佳实践

  1. 命令和查询分离:严格区分修改和读取操作

     CreatePlayerCommand, GetPlayerQuery // 职责清晰
     PlayerCommand // 职责不明确
    
  2. 使用有意义的命名:命令用动词,查询用 Get

     CreatePlayerCommand, UpdateScoreCommand, GetHighScoresQuery
     PlayerCommand, ScoreCommand, ScoresQuery
    
  3. 输入验证:在处理器中验证输入

    public override async ValueTask<int> Handle(...)
    {
        if (string.IsNullOrEmpty(command.Input.Name))
            throw new ArgumentException("Name is required");
    
        // 处理逻辑
    }
    
  4. 使用 Behaviors 处理横切关注点:日志、性能、验证等

    RegisterMediatorBehavior<LoggingBehavior<,>>();
    RegisterMediatorBehavior<ValidationBehavior<,>>();
    
  5. 保持处理器简单:一个处理器只做一件事

     处理器只负责业务逻辑,通过架构组件访问数据
     处理器中包含复杂的数据访问和业务逻辑
    
  6. 使用 CancellationToken:支持操作取消

    public override async ValueTask<T> Handle(..., CancellationToken cancellationToken)
    {
        await someAsyncOperation(cancellationToken);
    }
    

常见问题

问题Command 和 Query 有什么区别?

解答

  • Command:修改系统状态,可能有副作用,通常返回 void 或简单结果
  • Query:只读取数据,无副作用,返回查询结果
// Command: 修改状态
CreatePlayerCommand -> 创建玩家
UpdateScoreCommand -> 更新分数

// Query: 读取数据
GetPlayerQuery -> 获取玩家信息
GetHighScoresQuery -> 获取高分榜

问题:什么时候使用 Request

解答 Request 是更通用的消息类型,当操作既不是纯命令也不是纯查询时使用:

// 验证操作:读取数据并返回结果,但不修改状态
ValidatePlayerRequest

// 计算操作:基于输入计算结果
CalculateDamageRequest

问题Notification 和 Event 有什么区别?

解答

  • Notification:通过 Mediator 发送,处理器在同一请求上下文中执行
  • Event:通过 EventBus 发送,监听器异步执行
// Notification: 同步处理
await mediator.Publish(notification); // 等待所有处理器完成

// Event: 异步处理
this.SendEvent(event); // 立即返回,监听器异步执行

问题:如何处理命令失败?

解答 使用异常或返回 Result 类型:

// 方式 1: 抛出异常
public override async ValueTask<Unit> Handle(...)
{
    if (!IsValid())
        throw new InvalidOperationException("Invalid operation");

    return Unit.Value;
}

// 方式 2: 返回 Result
public override async ValueTask<Result> Handle(...)
{
    if (!IsValid())
        return Result.Failure("Invalid operation");

    return Result.Success();
}

问题:处理器可以调用其他处理器吗?

解答 可以,通过 Mediator 发送新的命令或查询:

public override async ValueTask<Unit> Handle(...)
{
    var mediator = this.GetService<IMediator>();

    // 调用其他命令
    await mediator.Send(new AnotherCommand(...));

    return Unit.Value;
}

问题:如何测试处理器?

解答 处理器是独立的类,易于单元测试:

[Test]
public async Task CreatePlayer_ShouldReturnPlayerId()
{
    // Arrange
    var handler = new CreatePlayerCommandHandler();
    handler.SetContext(mockContext);

    var command = new CreatePlayerCommand(new CreatePlayerInput
    {
        Name = "Test",
        Level = 1
    });

    // Act
    var playerId = await handler.Handle(command, CancellationToken.None);

    // Assert
    Assert.That(playerId, Is.GreaterThan(0));
}

相关文档