--- title: CQRS 与 Mediator description: CQRS 模式通过 Mediator 实现命令查询职责分离,提供清晰的业务逻辑组织方式。 --- # CQRS 与 Mediator ## 概述 CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种架构模式,将数据的读取(Query)和修改(Command)操作分离。GFramework 通过集成 Mediator 库实现了 CQRS 模式,提供了类型安全、解耦的业务逻辑处理方式。 通过 CQRS,你可以将复杂的业务逻辑拆分为独立的命令和查询处理器,每个处理器只负责单一职责,使代码更易于测试和维护。 **主要特性**: - 命令查询职责分离 - 基于 Mediator 模式的解耦设计 - 支持管道行为(Behaviors) - 异步处理支持 - 与架构系统深度集成 - 支持流式处理 ## 核心概念 ### Command(命令) 命令表示修改系统状态的操作,如创建、更新、删除: ```csharp using GFramework.Core.CQRS.Command; using GFramework.Core.Abstractions.CQRS.Command; // 定义命令输入 public class CreatePlayerInput : ICommandInput { public string Name { get; set; } public int Level { get; set; } } // 定义命令 public class CreatePlayerCommand : CommandBase { public CreatePlayerCommand(CreatePlayerInput input) : base(input) { } } ``` ### Query(查询) 查询表示读取系统状态的操作,不修改数据: ```csharp using GFramework.Core.CQRS.Query; using GFramework.Core.Abstractions.CQRS.Query; // 定义查询输入 public class GetPlayerInput : IQueryInput { public int PlayerId { get; set; } } // 定义查询 public class GetPlayerQuery : QueryBase { public GetPlayerQuery(GetPlayerInput input) : base(input) { } } ``` ### Handler(处理器) 处理器负责执行命令或查询的具体逻辑: ```csharp using GFramework.Core.CQRS.Command; using Mediator; // 命令处理器 public class CreatePlayerCommandHandler : AbstractCommandHandler { public override async ValueTask Handle( CreatePlayerCommand command, CancellationToken cancellationToken) { var input = command.Input; var playerModel = this.GetModel(); // 创建玩家 var playerId = playerModel.CreatePlayer(input.Name, input.Level); return playerId; } } ``` ### Mediator(中介者) Mediator 负责将命令/查询路由到对应的处理器: ```csharp // 通过 Mediator 发送命令 var command = new CreatePlayerCommand(new CreatePlayerInput { Name = "Player1", Level = 1 }); var playerId = await mediator.Send(command); ``` ## 基本用法 ### 定义和发送命令 ```csharp // 1. 定义命令输入 public class SaveGameInput : ICommandInput { public int SlotId { get; set; } public GameData Data { get; set; } } // 2. 定义命令 public class SaveGameCommand : CommandBase { public SaveGameCommand(SaveGameInput input) : base(input) { } } // 3. 实现命令处理器 public class SaveGameCommandHandler : AbstractCommandHandler { public override async ValueTask Handle( SaveGameCommand command, CancellationToken cancellationToken) { var input = command.Input; var saveSystem = this.GetSystem(); // 保存游戏 await saveSystem.SaveAsync(input.SlotId, input.Data); // 发送事件 this.SendEvent(new GameSavedEvent { SlotId = input.SlotId }); return Unit.Value; } } // 4. 发送命令 public async Task SaveGame() { var mediator = this.GetService(); var command = new SaveGameCommand(new SaveGameInput { SlotId = 1, Data = currentGameData }); await mediator.Send(command); } ``` ### 定义和发送查询 ```csharp // 1. 定义查询输入 public class GetHighScoresInput : IQueryInput { public int Count { get; set; } = 10; } // 2. 定义查询 public class GetHighScoresQuery : QueryBase> { public GetHighScoresQuery(GetHighScoresInput input) : base(input) { } } // 3. 实现查询处理器 public class GetHighScoresQueryHandler : AbstractQueryHandler> { public override async ValueTask> Handle( GetHighScoresQuery query, CancellationToken cancellationToken) { var input = query.Input; var scoreModel = this.GetModel(); // 查询高分榜 var scores = await scoreModel.GetTopScoresAsync(input.Count); return scores; } } // 4. 发送查询 public async Task> GetHighScores() { var mediator = this.GetService(); var query = new GetHighScoresQuery(new GetHighScoresInput { Count = 10 }); var scores = await mediator.Send(query); return scores; } ``` ### 注册处理器 在架构中注册 Mediator 和处理器: ```csharp public class GameArchitecture : Architecture { protected override void Init() { // 注册 Mediator 行为 RegisterMediatorBehavior(); RegisterMediatorBehavior(); // 处理器会自动通过依赖注入注册 } } ``` ## 高级用法 ### Request(请求) Request 是更通用的消息类型,可以用于任何场景: ```csharp using GFramework.Core.CQRS.Request; using GFramework.Core.Abstractions.CQRS.Request; // 定义请求输入 public class ValidatePlayerInput : IRequestInput { public string PlayerName { get; set; } } // 定义请求 public class ValidatePlayerRequest : RequestBase { public ValidatePlayerRequest(ValidatePlayerInput input) : base(input) { } } // 实现请求处理器 public class ValidatePlayerRequestHandler : AbstractRequestHandler { public override async ValueTask Handle( ValidatePlayerRequest request, CancellationToken cancellationToken) { var input = request.Input; var playerModel = this.GetModel(); // 验证玩家名称 var isValid = await playerModel.IsNameValidAsync(input.PlayerName); return isValid; } } ``` ### Notification(通知) Notification 用于一对多的消息广播: ```csharp using GFramework.Core.CQRS.Notification; using GFramework.Core.Abstractions.CQRS.Notification; // 定义通知输入 public class PlayerLevelUpInput : INotificationInput { public int PlayerId { get; set; } public int NewLevel { get; set; } } // 定义通知 public class PlayerLevelUpNotification : NotificationBase { public PlayerLevelUpNotification(PlayerLevelUpInput input) : base(input) { } } // 实现通知处理器 1 public class AchievementNotificationHandler : AbstractNotificationHandler { public override async ValueTask Handle( PlayerLevelUpNotification notification, CancellationToken cancellationToken) { var input = notification.Input; // 检查成就 CheckLevelAchievements(input.PlayerId, input.NewLevel); await Task.CompletedTask; } } // 实现通知处理器 2 public class RewardNotificationHandler : AbstractNotificationHandler { public override async ValueTask Handle( PlayerLevelUpNotification notification, CancellationToken cancellationToken) { var input = notification.Input; // 发放奖励 GiveRewards(input.PlayerId, input.NewLevel); await Task.CompletedTask; } } // 发布通知(所有处理器都会收到) var notification = new PlayerLevelUpNotification(new PlayerLevelUpInput { PlayerId = 1, NewLevel = 10 }); await mediator.Publish(notification); ``` ### Pipeline Behaviors(管道行为) Behaviors 可以在处理器执行前后添加横切关注点: ```csharp using Mediator; // 日志行为 public class LoggingBehavior : IPipelineBehavior where TMessage : IMessage { public async ValueTask Handle( TMessage message, CancellationToken cancellationToken, MessageHandlerDelegate next) { var messageName = message.GetType().Name; Console.WriteLine($"[开始] {messageName}"); var response = await next(message, cancellationToken); Console.WriteLine($"[完成] {messageName}"); return response; } } // 性能监控行为 public class PerformanceBehavior : IPipelineBehavior where TMessage : IMessage { public async ValueTask Handle( TMessage message, CancellationToken cancellationToken, MessageHandlerDelegate next) { var stopwatch = Stopwatch.StartNew(); var response = await next(message, cancellationToken); stopwatch.Stop(); var elapsed = stopwatch.ElapsedMilliseconds; if (elapsed > 100) { Console.WriteLine($"警告: {message.GetType().Name} 耗时 {elapsed}ms"); } return response; } } // 注册行为 RegisterMediatorBehavior>(); RegisterMediatorBehavior>(); ``` ### 验证行为 ```csharp public class ValidationBehavior : IPipelineBehavior where TMessage : IMessage { public async ValueTask Handle( TMessage message, CancellationToken cancellationToken, MessageHandlerDelegate next) { // 验证输入 if (message is IValidatable validatable) { var errors = validatable.Validate(); if (errors.Any()) { throw new ValidationException(errors); } } return await next(message, cancellationToken); } } ``` ### 流式处理 处理大量数据时使用流式处理: ```csharp // 流式查询 public class GetAllPlayersStreamQuery : QueryBase> { public GetAllPlayersStreamQuery() : base(new EmptyInput()) { } } // 流式查询处理器 public class GetAllPlayersStreamQueryHandler : AbstractStreamQueryHandler { public override async IAsyncEnumerable Handle( GetAllPlayersStreamQuery query, [EnumeratorCancellation] CancellationToken cancellationToken) { var playerModel = this.GetModel(); await foreach (var player in playerModel.GetAllPlayersAsync(cancellationToken)) { yield return player; } } } // 使用流式查询 var query = new GetAllPlayersStreamQuery(); var stream = await mediator.CreateStream(query); await foreach (var player in stream) { Console.WriteLine($"玩家: {player.Name}"); } ``` ## 最佳实践 1. **命令和查询分离**:严格区分修改和读取操作 ```csharp ✓ CreatePlayerCommand, GetPlayerQuery // 职责清晰 ✗ PlayerCommand // 职责不明确 ``` 2. **使用有意义的命名**:命令用动词,查询用 Get ```csharp ✓ CreatePlayerCommand, UpdateScoreCommand, GetHighScoresQuery ✗ PlayerCommand, ScoreCommand, ScoresQuery ``` 3. **输入验证**:在处理器中验证输入 ```csharp public override async ValueTask Handle(...) { if (string.IsNullOrEmpty(command.Input.Name)) throw new ArgumentException("Name is required"); // 处理逻辑 } ``` 4. **使用 Behaviors 处理横切关注点**:日志、性能、验证等 ```csharp RegisterMediatorBehavior>(); RegisterMediatorBehavior>(); ``` 5. **保持处理器简单**:一个处理器只做一件事 ```csharp ✓ 处理器只负责业务逻辑,通过架构组件访问数据 ✗ 处理器中包含复杂的数据访问和业务逻辑 ``` 6. **使用 CancellationToken**:支持操作取消 ```csharp public override async ValueTask Handle(..., CancellationToken cancellationToken) { await someAsyncOperation(cancellationToken); } ``` ## 常见问题 ### 问题:Command 和 Query 有什么区别? **解答**: - **Command**:修改系统状态,可能有副作用,通常返回 void 或简单结果 - **Query**:只读取数据,无副作用,返回查询结果 ```csharp // Command: 修改状态 CreatePlayerCommand -> 创建玩家 UpdateScoreCommand -> 更新分数 // Query: 读取数据 GetPlayerQuery -> 获取玩家信息 GetHighScoresQuery -> 获取高分榜 ``` ### 问题:什么时候使用 Request? **解答**: Request 是更通用的消息类型,当操作既不是纯命令也不是纯查询时使用: ```csharp // 验证操作:读取数据并返回结果,但不修改状态 ValidatePlayerRequest // 计算操作:基于输入计算结果 CalculateDamageRequest ``` ### 问题:Notification 和 Event 有什么区别? **解答**: - **Notification**:通过 Mediator 发送,处理器在同一请求上下文中执行 - **Event**:通过 EventBus 发送,监听器异步执行 ```csharp // Notification: 同步处理 await mediator.Publish(notification); // 等待所有处理器完成 // Event: 异步处理 this.SendEvent(event); // 立即返回,监听器异步执行 ``` ### 问题:如何处理命令失败? **解答**: 使用异常或返回 Result 类型: ```csharp // 方式 1: 抛出异常 public override async ValueTask Handle(...) { if (!IsValid()) throw new InvalidOperationException("Invalid operation"); return Unit.Value; } // 方式 2: 返回 Result public override async ValueTask Handle(...) { if (!IsValid()) return Result.Failure("Invalid operation"); return Result.Success(); } ``` ### 问题:处理器可以调用其他处理器吗? **解答**: 可以,通过 Mediator 发送新的命令或查询: ```csharp public override async ValueTask Handle(...) { var mediator = this.GetService(); // 调用其他命令 await mediator.Send(new AnotherCommand(...)); return Unit.Value; } ``` ### 问题:如何测试处理器? **解答**: 处理器是独立的类,易于单元测试: ```csharp [Test] public async Task CreatePlayer_ShouldReturnPlayerId() { // Arrange var handler = new CreatePlayerCommandHandler(); handler.SetContext(mockContext); var command = new CreatePlayerCommand(new CreatePlayerInput { Name = "Test", Level = 1 }); // Act var playerId = await handler.Handle(command, CancellationToken.None); // Assert Assert.That(playerId, Is.GreaterThan(0)); } ``` ## 相关文档 - [命令系统](/zh-CN/core/command) - 传统命令模式 - [查询系统](/zh-CN/core/query) - 传统查询模式 - [事件系统](/zh-CN/core/events) - 事件驱动架构 - [协程系统](/zh-CN/core/coroutine) - 在协程中使用 CQRS