mirror of
https://github.com/GeWuYou/GFramework.git
synced 2026-03-22 10:34:30 +08:00
- 将所有小写的命名空间导入更正为首字母大写格式 - 统一 GFramework 框架的命名空间引用规范 - 修复 core、ecs、godot 等模块的命名空间导入错误 - 标准化文档示例代码中的 using 语句格式 - 确保所有文档中的命名空间引用保持一致性 - 更新 global using 语句以匹配正确的命名空间格式
15 KiB
15 KiB
title, description
| title | description |
|---|---|
| CQRS 与 Mediator | CQRS 模式通过 Mediator 实现命令查询职责分离,提供清晰的业务逻辑组织方式。 |
CQRS 与 Mediator
概述
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种架构模式,将数据的读取(Query)和修改(Command)操作分离。GFramework 通过集成 Mediator 库实现了 CQRS 模式,提供了类型安全、解耦的业务逻辑处理方式。
通过 CQRS,你可以将复杂的业务逻辑拆分为独立的命令和查询处理器,每个处理器只负责单一职责,使代码更易于测试和维护。
主要特性:
- 命令查询职责分离
- 基于 Mediator 模式的解耦设计
- 支持管道行为(Behaviors)
- 异步处理支持
- 与架构系统深度集成
- 支持流式处理
核心概念
Command(命令)
命令表示修改系统状态的操作,如创建、更新、删除:
using GFramework.Core.CQRS.Command;
using GFramework.Core.Abstractions.CQRS.Command;
// 定义命令输入
public class CreatePlayerInput : ICommandInput
{
public string Name { get; set; }
public int Level { get; set; }
}
// 定义命令
public class CreatePlayerCommand : CommandBase<CreatePlayerInput, int>
{
public CreatePlayerCommand(CreatePlayerInput input) : base(input) { }
}
Query(查询)
查询表示读取系统状态的操作,不修改数据:
using GFramework.Core.CQRS.Query;
using GFramework.Core.Abstractions.CQRS.Query;
// 定义查询输入
public class GetPlayerInput : IQueryInput
{
public int PlayerId { get; set; }
}
// 定义查询
public class GetPlayerQuery : QueryBase<GetPlayerInput, PlayerData>
{
public GetPlayerQuery(GetPlayerInput input) : base(input) { }
}
Handler(处理器)
处理器负责执行命令或查询的具体逻辑:
using GFramework.Core.CQRS.Command;
using Mediator;
// 命令处理器
public class CreatePlayerCommandHandler : AbstractCommandHandler<CreatePlayerCommand, int>
{
public override async ValueTask<int> Handle(
CreatePlayerCommand command,
CancellationToken cancellationToken)
{
var input = command.Input;
var playerModel = this.GetModel<PlayerModel>();
// 创建玩家
var playerId = playerModel.CreatePlayer(input.Name, input.Level);
return playerId;
}
}
Mediator(中介者)
Mediator 负责将命令/查询路由到对应的处理器:
// 通过 Mediator 发送命令
var command = new CreatePlayerCommand(new CreatePlayerInput
{
Name = "Player1",
Level = 1
});
var playerId = await mediator.Send(command);
基本用法
定义和发送命令
// 1. 定义命令输入
public class SaveGameInput : ICommandInput
{
public int SlotId { get; set; }
public GameData Data { get; set; }
}
// 2. 定义命令
public class SaveGameCommand : CommandBase<SaveGameInput, Unit>
{
public SaveGameCommand(SaveGameInput input) : base(input) { }
}
// 3. 实现命令处理器
public class SaveGameCommandHandler : AbstractCommandHandler<SaveGameCommand>
{
public override async ValueTask<Unit> Handle(
SaveGameCommand command,
CancellationToken cancellationToken)
{
var input = command.Input;
var saveSystem = this.GetSystem<SaveSystem>();
// 保存游戏
await saveSystem.SaveAsync(input.SlotId, input.Data);
// 发送事件
this.SendEvent(new GameSavedEvent { SlotId = input.SlotId });
return Unit.Value;
}
}
// 4. 发送命令
public async Task SaveGame()
{
var mediator = this.GetService<IMediator>();
var command = new SaveGameCommand(new SaveGameInput
{
SlotId = 1,
Data = currentGameData
});
await mediator.Send(command);
}
定义和发送查询
// 1. 定义查询输入
public class GetHighScoresInput : IQueryInput
{
public int Count { get; set; } = 10;
}
// 2. 定义查询
public class GetHighScoresQuery : QueryBase<GetHighScoresInput, List<ScoreData>>
{
public GetHighScoresQuery(GetHighScoresInput input) : base(input) { }
}
// 3. 实现查询处理器
public class GetHighScoresQueryHandler : AbstractQueryHandler<GetHighScoresQuery, List<ScoreData>>
{
public override async ValueTask<List<ScoreData>> Handle(
GetHighScoresQuery query,
CancellationToken cancellationToken)
{
var input = query.Input;
var scoreModel = this.GetModel<ScoreModel>();
// 查询高分榜
var scores = await scoreModel.GetTopScoresAsync(input.Count);
return scores;
}
}
// 4. 发送查询
public async Task<List<ScoreData>> GetHighScores()
{
var mediator = this.GetService<IMediator>();
var query = new GetHighScoresQuery(new GetHighScoresInput
{
Count = 10
});
var scores = await mediator.Send(query);
return scores;
}
注册处理器
在架构中注册 Mediator 和处理器:
public class GameArchitecture : Architecture
{
protected override void Init()
{
// 注册 Mediator 行为
RegisterMediatorBehavior<LoggingBehavior>();
RegisterMediatorBehavior<PerformanceBehavior>();
// 处理器会自动通过依赖注入注册
}
}
高级用法
Request(请求)
Request 是更通用的消息类型,可以用于任何场景:
using GFramework.Core.CQRS.Request;
using GFramework.Core.Abstractions.CQRS.Request;
// 定义请求输入
public class ValidatePlayerInput : IRequestInput
{
public string PlayerName { get; set; }
}
// 定义请求
public class ValidatePlayerRequest : RequestBase<ValidatePlayerInput, bool>
{
public ValidatePlayerRequest(ValidatePlayerInput input) : base(input) { }
}
// 实现请求处理器
public class ValidatePlayerRequestHandler : AbstractRequestHandler<ValidatePlayerRequest, bool>
{
public override async ValueTask<bool> Handle(
ValidatePlayerRequest request,
CancellationToken cancellationToken)
{
var input = request.Input;
var playerModel = this.GetModel<PlayerModel>();
// 验证玩家名称
var isValid = await playerModel.IsNameValidAsync(input.PlayerName);
return isValid;
}
}
Notification(通知)
Notification 用于一对多的消息广播:
using GFramework.Core.CQRS.Notification;
using GFramework.Core.Abstractions.CQRS.Notification;
// 定义通知输入
public class PlayerLevelUpInput : INotificationInput
{
public int PlayerId { get; set; }
public int NewLevel { get; set; }
}
// 定义通知
public class PlayerLevelUpNotification : NotificationBase<PlayerLevelUpInput>
{
public PlayerLevelUpNotification(PlayerLevelUpInput input) : base(input) { }
}
// 实现通知处理器 1
public class AchievementNotificationHandler : AbstractNotificationHandler<PlayerLevelUpNotification>
{
public override async ValueTask Handle(
PlayerLevelUpNotification notification,
CancellationToken cancellationToken)
{
var input = notification.Input;
// 检查成就
CheckLevelAchievements(input.PlayerId, input.NewLevel);
await Task.CompletedTask;
}
}
// 实现通知处理器 2
public class RewardNotificationHandler : AbstractNotificationHandler<PlayerLevelUpNotification>
{
public override async ValueTask Handle(
PlayerLevelUpNotification notification,
CancellationToken cancellationToken)
{
var input = notification.Input;
// 发放奖励
GiveRewards(input.PlayerId, input.NewLevel);
await Task.CompletedTask;
}
}
// 发布通知(所有处理器都会收到)
var notification = new PlayerLevelUpNotification(new PlayerLevelUpInput
{
PlayerId = 1,
NewLevel = 10
});
await mediator.Publish(notification);
Pipeline Behaviors(管道行为)
Behaviors 可以在处理器执行前后添加横切关注点:
using Mediator;
// 日志行为
public class LoggingBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
where TMessage : IMessage
{
public async ValueTask<TResponse> Handle(
TMessage message,
CancellationToken cancellationToken,
MessageHandlerDelegate<TMessage, TResponse> next)
{
var messageName = message.GetType().Name;
Console.WriteLine($"[开始] {messageName}");
var response = await next(message, cancellationToken);
Console.WriteLine($"[完成] {messageName}");
return response;
}
}
// 性能监控行为
public class PerformanceBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
where TMessage : IMessage
{
public async ValueTask<TResponse> Handle(
TMessage message,
CancellationToken cancellationToken,
MessageHandlerDelegate<TMessage, TResponse> next)
{
var stopwatch = Stopwatch.StartNew();
var response = await next(message, cancellationToken);
stopwatch.Stop();
var elapsed = stopwatch.ElapsedMilliseconds;
if (elapsed > 100)
{
Console.WriteLine($"警告: {message.GetType().Name} 耗时 {elapsed}ms");
}
return response;
}
}
// 注册行为
RegisterMediatorBehavior<LoggingBehavior<,>>();
RegisterMediatorBehavior<PerformanceBehavior<,>>();
验证行为
public class ValidationBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
where TMessage : IMessage
{
public async ValueTask<TResponse> Handle(
TMessage message,
CancellationToken cancellationToken,
MessageHandlerDelegate<TMessage, TResponse> next)
{
// 验证输入
if (message is IValidatable validatable)
{
var errors = validatable.Validate();
if (errors.Any())
{
throw new ValidationException(errors);
}
}
return await next(message, cancellationToken);
}
}
流式处理
处理大量数据时使用流式处理:
// 流式查询
public class GetAllPlayersStreamQuery : QueryBase<EmptyInput, IAsyncEnumerable<PlayerData>>
{
public GetAllPlayersStreamQuery() : base(new EmptyInput()) { }
}
// 流式查询处理器
public class GetAllPlayersStreamQueryHandler : AbstractStreamQueryHandler<GetAllPlayersStreamQuery, PlayerData>
{
public override async IAsyncEnumerable<PlayerData> Handle(
GetAllPlayersStreamQuery query,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var playerModel = this.GetModel<PlayerModel>();
await foreach (var player in playerModel.GetAllPlayersAsync(cancellationToken))
{
yield return player;
}
}
}
// 使用流式查询
var query = new GetAllPlayersStreamQuery();
var stream = await mediator.CreateStream(query);
await foreach (var player in stream)
{
Console.WriteLine($"玩家: {player.Name}");
}
最佳实践
-
命令和查询分离:严格区分修改和读取操作
✓ CreatePlayerCommand, GetPlayerQuery // 职责清晰 ✗ PlayerCommand // 职责不明确 -
使用有意义的命名:命令用动词,查询用 Get
✓ CreatePlayerCommand, UpdateScoreCommand, GetHighScoresQuery ✗ PlayerCommand, ScoreCommand, ScoresQuery -
输入验证:在处理器中验证输入
public override async ValueTask<int> Handle(...) { if (string.IsNullOrEmpty(command.Input.Name)) throw new ArgumentException("Name is required"); // 处理逻辑 } -
使用 Behaviors 处理横切关注点:日志、性能、验证等
RegisterMediatorBehavior<LoggingBehavior<,>>(); RegisterMediatorBehavior<ValidationBehavior<,>>(); -
保持处理器简单:一个处理器只做一件事
✓ 处理器只负责业务逻辑,通过架构组件访问数据 ✗ 处理器中包含复杂的数据访问和业务逻辑 -
使用 CancellationToken:支持操作取消
public override async ValueTask<T> Handle(..., CancellationToken cancellationToken) { await someAsyncOperation(cancellationToken); }
常见问题
问题:Command 和 Query 有什么区别?
解答:
- Command:修改系统状态,可能有副作用,通常返回 void 或简单结果
- Query:只读取数据,无副作用,返回查询结果
// Command: 修改状态
CreatePlayerCommand -> 创建玩家
UpdateScoreCommand -> 更新分数
// Query: 读取数据
GetPlayerQuery -> 获取玩家信息
GetHighScoresQuery -> 获取高分榜
问题:什么时候使用 Request?
解答: Request 是更通用的消息类型,当操作既不是纯命令也不是纯查询时使用:
// 验证操作:读取数据并返回结果,但不修改状态
ValidatePlayerRequest
// 计算操作:基于输入计算结果
CalculateDamageRequest
问题:Notification 和 Event 有什么区别?
解答:
- Notification:通过 Mediator 发送,处理器在同一请求上下文中执行
- Event:通过 EventBus 发送,监听器异步执行
// Notification: 同步处理
await mediator.Publish(notification); // 等待所有处理器完成
// Event: 异步处理
this.SendEvent(event); // 立即返回,监听器异步执行
问题:如何处理命令失败?
解答: 使用异常或返回 Result 类型:
// 方式 1: 抛出异常
public override async ValueTask<Unit> Handle(...)
{
if (!IsValid())
throw new InvalidOperationException("Invalid operation");
return Unit.Value;
}
// 方式 2: 返回 Result
public override async ValueTask<Result> Handle(...)
{
if (!IsValid())
return Result.Failure("Invalid operation");
return Result.Success();
}
问题:处理器可以调用其他处理器吗?
解答: 可以,通过 Mediator 发送新的命令或查询:
public override async ValueTask<Unit> Handle(...)
{
var mediator = this.GetService<IMediator>();
// 调用其他命令
await mediator.Send(new AnotherCommand(...));
return Unit.Value;
}
问题:如何测试处理器?
解答: 处理器是独立的类,易于单元测试:
[Test]
public async Task CreatePlayer_ShouldReturnPlayerId()
{
// Arrange
var handler = new CreatePlayerCommandHandler();
handler.SetContext(mockContext);
var command = new CreatePlayerCommand(new CreatePlayerInput
{
Name = "Test",
Level = 1
});
// Act
var playerId = await handler.Handle(command, CancellationToken.None);
// Assert
Assert.That(playerId, Is.GreaterThan(0));
}