From 48e57c85476a9df095c5af3c2c9d6705685fc105 Mon Sep 17 00:00:00 2001 From: gewuyou <95328647+GeWuYou@users.noreply.github.com> Date: Tue, 14 Apr 2026 20:46:59 +0800 Subject: [PATCH 1/4] Replace Mediator runtime with built-in CQRS --- AGENTS.md | 10 + .../Architectures/IArchitectureContext.cs | 52 ++-- .../Cqrs/Command/ICommand.cs | 25 ++ .../Cqrs/INotification.cs | 9 + .../Cqrs/INotificationHandler.cs | 17 ++ .../Cqrs/IPipelineBehavior.cs | 22 ++ GFramework.Core.Abstractions/Cqrs/IRequest.cs | 10 + .../Cqrs/IRequestHandler.cs | 18 ++ .../Cqrs/IStreamRequest.cs | 10 + .../Cqrs/IStreamRequestHandler.cs | 18 ++ .../Cqrs/MessageHandlerDelegate.cs | 14 + .../Cqrs/Query/IQuery.cs | 18 ++ GFramework.Core.Abstractions/Cqrs/Unit.cs | 13 + .../ArchitectureModulesBehaviorTests.cs | 32 +-- .../ArchitectureServicesTests.cs | 25 +- .../Architectures/GameContextTests.cs | 25 +- GFramework.Core.Tests/CqrsTestRuntime.cs | 73 +++++ .../Mediator/MediatorAdvancedFeaturesTests.cs | 15 +- .../MediatorArchitectureIntegrationTests.cs | 15 +- .../Mediator/MediatorComprehensiveTests.cs | 47 ++-- .../Architectures/ArchitectureBootstrapper.cs | 11 +- .../Architectures/ArchitectureContext.cs | 94 +++---- .../Command/AbstractCommandWithInput.cs | 6 +- .../Command/AbstractCommandWithResult.cs | 6 +- .../Cqrs/Behaviors/LoggingBehavior.cs | 4 +- .../Cqrs/Behaviors/PerformanceBehavior.cs | 4 +- .../Cqrs/Command/AbstractCommandHandler.cs | 9 +- .../Command/AbstractStreamCommandHandler.cs | 7 +- GFramework.Core/Cqrs/Command/CommandBase.cs | 3 +- .../Cqrs/Internal/CqrsDispatcher.cs | 263 ++++++++++++++++++ .../Cqrs/Internal/CqrsHandlerRegistrar.cs | 81 ++++++ .../AbstractNotificationHandler.cs | 4 +- .../Cqrs/Notification/NotificationBase.cs | 4 +- .../Cqrs/Query/AbstractQueryHandler.cs | 7 +- .../Cqrs/Query/AbstractStreamQueryHandler.cs | 7 +- GFramework.Core/Cqrs/Query/QueryBase.cs | 3 +- .../Cqrs/Request/AbstractRequestHandler.cs | 6 +- .../Request/AbstractStreamRequestHandler.cs | 4 +- GFramework.Core/Cqrs/Request/RequestBase.cs | 4 +- .../ContextAwareMediatorCommandExtensions.cs | 11 +- .../ContextAwareMediatorExtensions.cs | 6 +- .../ContextAwareMediatorQueryExtensions.cs | 11 +- GFramework.Core/Ioc/MicrosoftDiContainer.cs | 4 +- .../Query/AbstractQueryWithResult.cs | 4 +- ....GFramework.Godot.SourceGenerators.targets | 14 +- .../ContextAwareCoroutineExtensions.cs | 8 +- .../todos/cqrs-rewrite-migration-tracking.md | 109 ++++++++ .../traces/cqrs-rewrite-migration-trace.md | 68 +++++ 48 files changed, 993 insertions(+), 237 deletions(-) create mode 100644 GFramework.Core.Abstractions/Cqrs/Command/ICommand.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/INotification.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/INotificationHandler.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/IPipelineBehavior.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/IRequest.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/IRequestHandler.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/IStreamRequest.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/IStreamRequestHandler.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/Query/IQuery.cs create mode 100644 GFramework.Core.Abstractions/Cqrs/Unit.cs create mode 100644 GFramework.Core.Tests/CqrsTestRuntime.cs create mode 100644 GFramework.Core/Cqrs/Internal/CqrsDispatcher.cs create mode 100644 GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs create mode 100644 local-plan/todos/cqrs-rewrite-migration-tracking.md create mode 100644 local-plan/traces/cqrs-rewrite-migration-trace.md diff --git a/AGENTS.md b/AGENTS.md index c63a2bfa..720b53af 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -244,6 +244,16 @@ bash scripts/validate-csharp-naming.sh - Tracking updates MUST reflect completed work, newly discovered issues, validation results, and the next recommended recovery point. - Completing code changes without updating the active tracking document is considered incomplete work. +- For any multi-step refactor, migration, or cross-module task, contributors MUST create or adopt a dedicated recovery + document under `local-plan/todos/` before making substantive code changes. +- Recovery documents MUST record the current phase, the active recovery point identifier, known risks, and the next + recommended resume step so another contributor or subagent can continue the work safely. +- Contributors MUST maintain a matching execution trace under `local-plan/traces/` for complex work. The trace should + record the current date, key decisions, validation milestones, and the immediate next step. +- When a task spans multiple commits or is likely to exceed a single agent context window, update both the recovery + document and the trace at each meaningful milestone before pausing or handing work off. +- If subagents are used on a complex task, the main agent MUST capture the delegated scope and any accepted findings in + the active recovery document or trace before continuing implementation. ### Repository Documentation diff --git a/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs b/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs index 3469d8db..9a08dec6 100644 --- a/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs +++ b/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs @@ -1,11 +1,13 @@ using GFramework.Core.Abstractions.Command; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Cqrs.Command; +using GFramework.Core.Abstractions.Cqrs.Query; using GFramework.Core.Abstractions.Environment; using GFramework.Core.Abstractions.Events; using GFramework.Core.Abstractions.Model; using GFramework.Core.Abstractions.Query; using GFramework.Core.Abstractions.Systems; using GFramework.Core.Abstractions.Utility; -using Mediator; using ICommand = GFramework.Core.Abstractions.Command.ICommand; namespace GFramework.Core.Abstractions.Architectures; @@ -118,12 +120,12 @@ public interface IArchitectureContext TResult SendCommand(Command.ICommand command); /// - /// [Mediator] 发送命令的同步版本(不推荐,仅用于兼容性) + /// 发送一个 CQRS 命令并返回结果。 /// - /// 命令响应类型 - /// 要发送的命令对象 - /// 命令执行结果 - TResponse SendCommand(Mediator.ICommand command); + /// 命令响应类型。 + /// 要发送的 CQRS 命令。 + /// 命令执行结果。 + TResponse SendCommand(GFramework.Core.Abstractions.Cqrs.Command.ICommand command); /// @@ -133,14 +135,13 @@ public interface IArchitectureContext Task SendCommandAsync(IAsyncCommand command); /// - /// [Mediator] 异步发送命令并返回结果 - /// 通过Mediator模式发送命令请求,支持取消操作 + /// 异步发送一个 CQRS 命令并返回结果。 /// - /// 命令响应类型 - /// 要发送的命令对象 - /// 取消令牌,用于取消操作 - /// 包含命令执行结果的ValueTask - ValueTask SendCommandAsync(Mediator.ICommand command, + /// 命令响应类型。 + /// 要发送的 CQRS 命令。 + /// 取消令牌。 + /// 包含命令执行结果的值任务。 + ValueTask SendCommandAsync(GFramework.Core.Abstractions.Cqrs.Command.ICommand command, CancellationToken cancellationToken = default); @@ -161,12 +162,12 @@ public interface IArchitectureContext TResult SendQuery(Query.IQuery query); /// - /// [Mediator] 发送查询的同步版本(不推荐,仅用于兼容性) + /// 发送一个 CQRS 查询并返回结果。 /// - /// 查询响应类型 - /// 要发送的查询对象 - /// 查询结果 - TResponse SendQuery(Mediator.IQuery query); + /// 查询响应类型。 + /// 要发送的 CQRS 查询。 + /// 查询结果。 + TResponse SendQuery(GFramework.Core.Abstractions.Cqrs.Query.IQuery query); /// /// 异步发送一个查询请求 @@ -177,14 +178,13 @@ public interface IArchitectureContext Task SendQueryAsync(IAsyncQuery query); /// - /// [Mediator] 异步发送查询并返回结果 - /// 通过Mediator模式发送查询请求,支持取消操作 + /// 异步发送一个 CQRS 查询并返回结果。 /// - /// 查询响应类型 - /// 要发送的查询对象 - /// 取消令牌,用于取消操作 - /// 包含查询结果的ValueTask - ValueTask SendQueryAsync(Mediator.IQuery query, + /// 查询响应类型。 + /// 要发送的 CQRS 查询。 + /// 取消令牌。 + /// 包含查询结果的值任务。 + ValueTask SendQueryAsync(GFramework.Core.Abstractions.Cqrs.Query.IQuery query, CancellationToken cancellationToken = default); /// @@ -265,4 +265,4 @@ public interface IArchitectureContext /// /// 环境对象实例 IEnvironment GetEnvironment(); -} \ No newline at end of file +} diff --git a/GFramework.Core.Abstractions/Cqrs/Command/ICommand.cs b/GFramework.Core.Abstractions/Cqrs/Command/ICommand.cs new file mode 100644 index 00000000..ba04331e --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/Command/ICommand.cs @@ -0,0 +1,25 @@ +namespace GFramework.Core.Abstractions.Cqrs.Command; + +/// +/// 表示一个 CQRS 命令。 +/// 命令通常用于修改系统状态。 +/// +/// 命令响应类型。 +public interface ICommand : IRequest +{ +} + +/// +/// 表示一个无显式返回值的 CQRS 命令。 +/// +public interface ICommand : ICommand +{ +} + +/// +/// 表示一个流式 CQRS 命令。 +/// +/// 流式响应元素类型。 +public interface IStreamCommand : IStreamRequest +{ +} diff --git a/GFramework.Core.Abstractions/Cqrs/INotification.cs b/GFramework.Core.Abstractions/Cqrs/INotification.cs new file mode 100644 index 00000000..9d69e28e --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/INotification.cs @@ -0,0 +1,9 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示一个一对多发布的通知消息。 +/// 通知不要求返回值,允许被零个或多个处理器消费。 +/// +public interface INotification +{ +} diff --git a/GFramework.Core.Abstractions/Cqrs/INotificationHandler.cs b/GFramework.Core.Abstractions/Cqrs/INotificationHandler.cs new file mode 100644 index 00000000..23861d1d --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/INotificationHandler.cs @@ -0,0 +1,17 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示处理通知消息的处理器契约。 +/// +/// 通知类型。 +public interface INotificationHandler + where TNotification : INotification +{ + /// + /// 处理通知消息。 + /// + /// 要处理的通知。 + /// 取消令牌。 + /// 异步处理任务。 + ValueTask Handle(TNotification notification, CancellationToken cancellationToken); +} diff --git a/GFramework.Core.Abstractions/Cqrs/IPipelineBehavior.cs b/GFramework.Core.Abstractions/Cqrs/IPipelineBehavior.cs new file mode 100644 index 00000000..cd01aad3 --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/IPipelineBehavior.cs @@ -0,0 +1,22 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 定义 CQRS 请求处理前后的管道行为。 +/// +/// 请求类型。 +/// 响应类型。 +public interface IPipelineBehavior + where TRequest : IRequest +{ + /// + /// 处理当前请求,并决定是否继续调用后续行为或最终处理器。 + /// + /// 当前请求消息。 + /// 下一个处理委托。 + /// 取消令牌。 + /// 请求响应。 + ValueTask Handle( + TRequest message, + MessageHandlerDelegate next, + CancellationToken cancellationToken); +} diff --git a/GFramework.Core.Abstractions/Cqrs/IRequest.cs b/GFramework.Core.Abstractions/Cqrs/IRequest.cs new file mode 100644 index 00000000..26259fc4 --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/IRequest.cs @@ -0,0 +1,10 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示一个有响应的 CQRS 请求。 +/// 该接口是命令、查询以及其他请求语义的统一基接口。 +/// +/// 请求响应类型。 +public interface IRequest +{ +} diff --git a/GFramework.Core.Abstractions/Cqrs/IRequestHandler.cs b/GFramework.Core.Abstractions/Cqrs/IRequestHandler.cs new file mode 100644 index 00000000..2415e282 --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/IRequestHandler.cs @@ -0,0 +1,18 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示处理单个 CQRS 请求的处理器契约。 +/// +/// 请求类型。 +/// 响应类型。 +public interface IRequestHandler + where TRequest : IRequest +{ + /// + /// 处理指定请求并返回结果。 + /// + /// 要处理的请求。 + /// 取消令牌。 + /// 请求结果。 + ValueTask Handle(TRequest request, CancellationToken cancellationToken); +} diff --git a/GFramework.Core.Abstractions/Cqrs/IStreamRequest.cs b/GFramework.Core.Abstractions/Cqrs/IStreamRequest.cs new file mode 100644 index 00000000..05ffa5df --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/IStreamRequest.cs @@ -0,0 +1,10 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示一个流式 CQRS 请求。 +/// 请求处理器可以逐步产生响应序列,而不是一次性返回完整结果。 +/// +/// 流式响应元素类型。 +public interface IStreamRequest +{ +} diff --git a/GFramework.Core.Abstractions/Cqrs/IStreamRequestHandler.cs b/GFramework.Core.Abstractions/Cqrs/IStreamRequestHandler.cs new file mode 100644 index 00000000..1c6e02a7 --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/IStreamRequestHandler.cs @@ -0,0 +1,18 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示处理流式 CQRS 请求的处理器契约。 +/// +/// 流式请求类型。 +/// 流式响应元素类型。 +public interface IStreamRequestHandler + where TRequest : IStreamRequest +{ + /// + /// 处理流式请求并返回异步响应序列。 + /// + /// 要处理的请求。 + /// 取消令牌。 + /// 异步响应序列。 + IAsyncEnumerable Handle(TRequest request, CancellationToken cancellationToken); +} diff --git a/GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs b/GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs new file mode 100644 index 00000000..172f7f3b --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs @@ -0,0 +1,14 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示 CQRS 请求在管道中继续向下执行的处理委托。 +/// +/// 请求类型。 +/// 响应类型。 +/// 当前请求消息。 +/// 取消令牌。 +/// 请求响应。 +public delegate ValueTask MessageHandlerDelegate( + TRequest message, + CancellationToken cancellationToken) + where TRequest : IRequest; diff --git a/GFramework.Core.Abstractions/Cqrs/Query/IQuery.cs b/GFramework.Core.Abstractions/Cqrs/Query/IQuery.cs new file mode 100644 index 00000000..cbb1586e --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/Query/IQuery.cs @@ -0,0 +1,18 @@ +namespace GFramework.Core.Abstractions.Cqrs.Query; + +/// +/// 表示一个 CQRS 查询。 +/// 查询用于读取数据,不应产生副作用。 +/// +/// 查询响应类型。 +public interface IQuery : IRequest +{ +} + +/// +/// 表示一个流式 CQRS 查询。 +/// +/// 流式响应元素类型。 +public interface IStreamQuery : IStreamRequest +{ +} diff --git a/GFramework.Core.Abstractions/Cqrs/Unit.cs b/GFramework.Core.Abstractions/Cqrs/Unit.cs new file mode 100644 index 00000000..7dc3da14 --- /dev/null +++ b/GFramework.Core.Abstractions/Cqrs/Unit.cs @@ -0,0 +1,13 @@ +namespace GFramework.Core.Abstractions.Cqrs; + +/// +/// 表示没有实际返回值的 CQRS 响应类型。 +/// 该类型用于统一命令与请求的泛型签名,避免引入外部库的 Unit 定义。 +/// +public readonly record struct Unit +{ + /// + /// 获取默认的空响应实例。 + /// + public static Unit Value { get; } = new(); +} diff --git a/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs b/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs index 05601230..7485d978 100644 --- a/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs +++ b/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs @@ -2,14 +2,14 @@ using GFramework.Core.Abstractions.Architectures; using GFramework.Core.Abstractions.Utility; using GFramework.Core.Architectures; using GFramework.Core.Logging; -using Mediator; -using Microsoft.Extensions.DependencyInjection; +using GFramework.Core.Tests; +using GfCqrs = GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Tests.Architectures; /// -/// 验证 Architecture 通过 ArchitectureModules 暴露出的模块安装与 Mediator 行为注册能力。 -/// 这些测试覆盖模块安装回调和中介管道行为接入,确保模块管理器仍然保持可观察行为不变。 +/// 验证 Architecture 通过 ArchitectureModules 暴露出的模块安装与 CQRS 行为注册能力。 +/// 这些测试覆盖模块安装回调和请求管道行为接入,确保模块管理器仍然保持可观察行为不变。 /// [TestFixture] public class ArchitectureModulesBehaviorTests @@ -57,7 +57,7 @@ public class ArchitectureModulesBehaviorTests } /// - /// 验证注册的 Mediator 行为会参与请求管道执行。 + /// 验证注册的 CQRS 行为会参与请求管道执行。 /// [Test] public async Task RegisterMediatorBehavior_Should_Apply_Pipeline_Behavior_To_Request() @@ -67,7 +67,9 @@ public class ArchitectureModulesBehaviorTests await architecture.InitializeAsync(); - var response = await architecture.Context.SendRequestAsync(new ModuleBehaviorRequest()); + var response = await CqrsTestRuntime.ExecutePipelineAsync( + architecture.Context, + new ModuleBehaviorRequest()); Assert.Multiple(() => { @@ -83,12 +85,6 @@ public class ArchitectureModulesBehaviorTests /// private sealed class ModuleTestArchitecture(Action registrationAction) : Architecture { - /// - /// 打开 Mediator 服务注册,以便测试中介行为接入。 - /// - public override Action? Configurator => - services => services.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; }); - /// /// 在初始化阶段执行测试注入的模块注册逻辑。 /// @@ -136,14 +132,14 @@ public class ArchitectureModulesBehaviorTests /// /// 用于验证管道行为注册是否生效的测试请求。 /// -public sealed class ModuleBehaviorRequest : IRequest +public sealed class ModuleBehaviorRequest : GfCqrs.IRequest { } /// /// 处理测试请求的处理器。 /// -public sealed class ModuleBehaviorRequestHandler : IRequestHandler +public sealed class ModuleBehaviorRequestHandler : GfCqrs.IRequestHandler { /// /// 返回固定结果,便于聚焦验证管道行为是否执行。 @@ -162,8 +158,8 @@ public sealed class ModuleBehaviorRequestHandler : IRequestHandler /// 请求类型。 /// 响应类型。 -public sealed class TrackingPipelineBehavior : IPipelineBehavior - where TRequest : IRequest +public sealed class TrackingPipelineBehavior : GfCqrs.IPipelineBehavior + where TRequest : GfCqrs.IRequest { /// /// 获取当前测试进程中该请求类型对应的行为触发次数。 @@ -179,10 +175,10 @@ public sealed class TrackingPipelineBehavior : IPipelineBeh /// 下游处理器的响应结果。 public async ValueTask Handle( TRequest message, - MessageHandlerDelegate next, + GfCqrs.MessageHandlerDelegate next, CancellationToken cancellationToken) { InvocationCount++; return await next(message, cancellationToken); } -} \ No newline at end of file +} diff --git a/GFramework.Core.Tests/Architectures/ArchitectureServicesTests.cs b/GFramework.Core.Tests/Architectures/ArchitectureServicesTests.cs index 93244298..512e8a55 100644 --- a/GFramework.Core.Tests/Architectures/ArchitectureServicesTests.cs +++ b/GFramework.Core.Tests/Architectures/ArchitectureServicesTests.cs @@ -347,58 +347,61 @@ public class TestArchitectureContextV3 : IArchitectureContext { } - public ValueTask SendRequestAsync(IRequest request, + public ValueTask SendRequestAsync(global::GFramework.Core.Abstractions.Cqrs.IRequest request, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } - public TResponse SendRequest(IRequest request) + public TResponse SendRequest(global::GFramework.Core.Abstractions.Cqrs.IRequest request) { throw new NotImplementedException(); } - public ValueTask SendCommandAsync(global::Mediator.ICommand command, + public ValueTask SendCommandAsync( + global::GFramework.Core.Abstractions.Cqrs.Command.ICommand command, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } - public TResponse SendCommand(global::Mediator.ICommand command) + public TResponse SendCommand(global::GFramework.Core.Abstractions.Cqrs.Command.ICommand command) { throw new NotImplementedException(); } - public ValueTask SendQueryAsync(global::Mediator.IQuery query, + public ValueTask SendQueryAsync( + global::GFramework.Core.Abstractions.Cqrs.Query.IQuery query, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } - public TResponse SendQuery(global::Mediator.IQuery query) + public TResponse SendQuery(global::GFramework.Core.Abstractions.Cqrs.Query.IQuery query) { throw new NotImplementedException(); } public ValueTask PublishAsync(TNotification notification, - CancellationToken cancellationToken = default) where TNotification : INotification + CancellationToken cancellationToken = default) where TNotification : global::GFramework.Core.Abstractions.Cqrs.INotification { throw new NotImplementedException(); } - public IAsyncEnumerable CreateStream(IStreamRequest request, + public IAsyncEnumerable CreateStream( + global::GFramework.Core.Abstractions.Cqrs.IStreamRequest request, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } public ValueTask SendAsync(TCommand command, CancellationToken cancellationToken = default) - where TCommand : IRequest + where TCommand : global::GFramework.Core.Abstractions.Cqrs.IRequest { throw new NotImplementedException(); } - public ValueTask SendAsync(IRequest command, + public ValueTask SendAsync(global::GFramework.Core.Abstractions.Cqrs.IRequest command, CancellationToken cancellationToken = default) { throw new NotImplementedException(); @@ -439,4 +442,4 @@ public class TestArchitectureContextV3 : IArchitectureContext } } -#endregion \ No newline at end of file +#endregion diff --git a/GFramework.Core.Tests/Architectures/GameContextTests.cs b/GFramework.Core.Tests/Architectures/GameContextTests.cs index 8f1bea57..6c2670bc 100644 --- a/GFramework.Core.Tests/Architectures/GameContextTests.cs +++ b/GFramework.Core.Tests/Architectures/GameContextTests.cs @@ -394,58 +394,61 @@ public class TestArchitectureContext : IArchitectureContext { } - public ValueTask SendRequestAsync(IRequest request, + public ValueTask SendRequestAsync(global::GFramework.Core.Abstractions.Cqrs.IRequest request, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } - public TResponse SendRequest(IRequest request) + public TResponse SendRequest(global::GFramework.Core.Abstractions.Cqrs.IRequest request) { throw new NotImplementedException(); } - public ValueTask SendCommandAsync(global::Mediator.ICommand command, + public ValueTask SendCommandAsync( + global::GFramework.Core.Abstractions.Cqrs.Command.ICommand command, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } - public TResponse SendCommand(global::Mediator.ICommand command) + public TResponse SendCommand(global::GFramework.Core.Abstractions.Cqrs.Command.ICommand command) { throw new NotImplementedException(); } - public ValueTask SendQueryAsync(global::Mediator.IQuery query, + public ValueTask SendQueryAsync( + global::GFramework.Core.Abstractions.Cqrs.Query.IQuery query, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } - public TResponse SendQuery(global::Mediator.IQuery query) + public TResponse SendQuery(global::GFramework.Core.Abstractions.Cqrs.Query.IQuery query) { throw new NotImplementedException(); } public ValueTask PublishAsync(TNotification notification, - CancellationToken cancellationToken = default) where TNotification : INotification + CancellationToken cancellationToken = default) where TNotification : global::GFramework.Core.Abstractions.Cqrs.INotification { throw new NotImplementedException(); } - public IAsyncEnumerable CreateStream(IStreamRequest request, + public IAsyncEnumerable CreateStream( + global::GFramework.Core.Abstractions.Cqrs.IStreamRequest request, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } public ValueTask SendAsync(TCommand command, CancellationToken cancellationToken = default) - where TCommand : IRequest + where TCommand : global::GFramework.Core.Abstractions.Cqrs.IRequest { throw new NotImplementedException(); } - public ValueTask SendAsync(IRequest command, + public ValueTask SendAsync(global::GFramework.Core.Abstractions.Cqrs.IRequest command, CancellationToken cancellationToken = default) { throw new NotImplementedException(); @@ -510,4 +513,4 @@ public class TestArchitectureContext : IArchitectureContext { return Environment; } -} \ No newline at end of file +} diff --git a/GFramework.Core.Tests/CqrsTestRuntime.cs b/GFramework.Core.Tests/CqrsTestRuntime.cs new file mode 100644 index 00000000..ae1d3361 --- /dev/null +++ b/GFramework.Core.Tests/CqrsTestRuntime.cs @@ -0,0 +1,73 @@ +using System.Reflection; +using GFramework.Core.Abstractions.Architectures; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Abstractions.Rule; +using GFramework.Core.Architectures; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GfCqrs = GFramework.Core.Abstractions.Cqrs; + +namespace GFramework.Core.Tests; + +internal static class CqrsTestRuntime +{ + private static readonly MethodInfo RegisterHandlersMethod = typeof(ArchitectureContext).Assembly + .GetType("GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", throwOnError: true)! + .GetMethod( + "RegisterHandlers", + BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Static)! + ?? throw new InvalidOperationException("Failed to locate CqrsHandlerRegistrar.RegisterHandlers."); + + public static void RegisterHandlers(MicrosoftDiContainer container, params Assembly[] assemblies) + { + ArgumentNullException.ThrowIfNull(container); + ArgumentNullException.ThrowIfNull(assemblies); + + var logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsTestRuntime)); + RegisterHandlersMethod.Invoke( + null, + [container, assemblies.Where(static assembly => assembly is not null).Distinct().ToArray(), logger]); + } + + public static ValueTask ExecutePipelineAsync( + IArchitectureContext context, + TRequest request, + CancellationToken cancellationToken = default) + where TRequest : class, GfCqrs.IRequest + { + ArgumentNullException.ThrowIfNull(context); + ArgumentNullException.ThrowIfNull(request); + + var handlers = context.GetServices>(); + if (handlers.Count == 0) + throw new InvalidOperationException( + $"No CQRS request handler registered for {typeof(TRequest).FullName}."); + + if (handlers.Count > 1) + throw new InvalidOperationException( + $"Expected a single CQRS request handler for {typeof(TRequest).FullName}, but found {handlers.Count}."); + + var handler = handlers[0]; + PrepareContext(handler, context); + + GfCqrs.MessageHandlerDelegate pipeline = handler.Handle; + + var behaviors = context.GetServices>(); + for (var index = behaviors.Count - 1; index >= 0; index--) + { + var behavior = behaviors[index]; + PrepareContext(behavior, context); + + var next = pipeline; + pipeline = (message, token) => behavior.Handle(message, next, token); + } + + return pipeline(request, cancellationToken); + } + + private static void PrepareContext(object instance, IArchitectureContext context) + { + if (instance is IContextAware contextAware) + contextAware.SetContext(context); + } +} diff --git a/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs b/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs index bc5cd782..d3bf4041 100644 --- a/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs +++ b/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs @@ -1,10 +1,10 @@ using System.Diagnostics; using System.Reflection; +using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Architectures; using GFramework.Core.Ioc; using GFramework.Core.Logging; -using Mediator; -using Microsoft.Extensions.DependencyInjection; +using GFramework.Core.Tests; namespace GFramework.Core.Tests.Mediator; @@ -26,11 +26,10 @@ public class MediatorAdvancedFeaturesTests loggerField?.SetValue(_container, LoggerFactoryResolver.Provider.CreateLogger(nameof(MediatorAdvancedFeaturesTests))); - // 注册Mediator及相关处理器 - _container.ExecuteServicesHook(configurator => - { - configurator.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; }); - }); + CqrsTestRuntime.RegisterHandlers( + _container, + typeof(MediatorAdvancedFeaturesTests).Assembly, + typeof(ArchitectureContext).Assembly); _container.Freeze(); _context = new ArchitectureContext(_container); @@ -487,4 +486,4 @@ public sealed record TestDatabaseRequest : IRequest public List Storage { get; init; } = new(); } -#endregion \ No newline at end of file +#endregion diff --git a/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs b/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs index 85bbf3aa..056517f7 100644 --- a/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs +++ b/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs @@ -1,12 +1,12 @@ using System.Diagnostics; using System.Reflection; using GFramework.Core.Abstractions.Architectures; +using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Architectures; using GFramework.Core.Command; using GFramework.Core.Ioc; using GFramework.Core.Logging; -using Mediator; -using Microsoft.Extensions.DependencyInjection; +using GFramework.Core.Tests; using ICommand = GFramework.Core.Abstractions.Command.ICommand; namespace GFramework.Core.Tests.Mediator; @@ -33,11 +33,10 @@ public class MediatorArchitectureIntegrationTests _commandBus = new CommandExecutor(); _container.RegisterPlurality(_commandBus); - // 注册Mediator - _container.ExecuteServicesHook(configurator => - { - configurator.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; }); - }); + CqrsTestRuntime.RegisterHandlers( + _container, + typeof(MediatorArchitectureIntegrationTests).Assembly, + typeof(ArchitectureContext).Assembly); _container.Freeze(); _context = new ArchitectureContext(_container); @@ -559,4 +558,4 @@ public class TestTraditionalCommand : ICommand public IArchitectureContext GetContext() => null!; } -#endregion \ No newline at end of file +#endregion diff --git a/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs b/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs index 020bc26b..27522c16 100644 --- a/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs +++ b/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs @@ -2,6 +2,7 @@ using System.Diagnostics; using System.Reflection; using System.Runtime.CompilerServices; using GFramework.Core.Abstractions.Architectures; +using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Events; using GFramework.Core.Architectures; using GFramework.Core.Command; @@ -10,13 +11,9 @@ using GFramework.Core.Events; using GFramework.Core.Ioc; using GFramework.Core.Logging; using GFramework.Core.Query; -using Mediator; -using Microsoft.Extensions.DependencyInjection; +using GFramework.Core.Tests; using ICommand = GFramework.Core.Abstractions.Command.ICommand; - -// ✅ Mediator 库的命名空间 - -// ✅ 使用 global using 或别名来区分 +using Unit = GFramework.Core.Abstractions.Cqrs.Unit; namespace GFramework.Core.Tests.Mediator; @@ -25,7 +22,7 @@ public class MediatorComprehensiveTests { /// /// 测试初始化方法,在每个测试方法执行前运行。 - /// 负责初始化日志工厂、依赖注入容器、Mediator以及各种总线服务。 + /// 负责初始化日志工厂、依赖注入容器、自有 CQRS 处理器以及各种总线服务。 /// [SetUp] public void SetUp() @@ -51,13 +48,11 @@ public class MediatorComprehensiveTests _container.RegisterPlurality(_asyncQueryBus); _container.RegisterPlurality(_environment); - // ✅ 注册 Mediator - _container.ExecuteServicesHook(configurator => - { - configurator.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; }); - }); + CqrsTestRuntime.RegisterHandlers( + _container, + typeof(MediatorComprehensiveTests).Assembly, + typeof(ArchitectureContext).Assembly); - // ✅ Freeze 容器 _container.Freeze(); _context = new ArchitectureContext(_container); @@ -194,19 +189,19 @@ public class MediatorComprehensiveTests /// - /// 测试未注册的Mediator抛出InvalidOperationException + /// 测试未注册的 CQRS handler 时抛出 InvalidOperationException /// [Test] - public void Unregistered_Mediator_Should_Throw_InvalidOperationException() + public void Unregistered_Cqrs_Handler_Should_Throw_InvalidOperationException() { - var containerWithoutMediator = new MicrosoftDiContainer(); - containerWithoutMediator.Freeze(); + var containerWithoutHandlers = new MicrosoftDiContainer(); + containerWithoutHandlers.Freeze(); - var contextWithoutMediator = new ArchitectureContext(containerWithoutMediator); + var contextWithoutHandlers = new ArchitectureContext(containerWithoutHandlers); var testRequest = new TestRequest { Value = 42 }; Assert.ThrowsAsync(async () => - await contextWithoutMediator.SendRequestAsync(testRequest)); + await contextWithoutHandlers.SendRequestAsync(testRequest)); } /// @@ -270,10 +265,10 @@ public class MediatorComprehensiveTests } /// - /// 测试并发Mediator请求不会相互干扰 + /// 测试并发 CQRS 请求不会相互干扰 /// [Test] - public async Task Concurrent_Mediator_Requests_Should_Not_Interfere() + public async Task Concurrent_Cqrs_Requests_Should_Not_Interfere() { const int requestCount = 10; var tasks = new List>(); @@ -389,10 +384,10 @@ public class MediatorComprehensiveTests } /// - /// 测试Mediator性能基准 + /// 测试 CQRS 性能基准 /// [Test] - public async Task Performance_Benchmark_For_Mediator() + public async Task Performance_Benchmark_For_Cqrs() { const int iterations = 1000; var stopwatch = Stopwatch.StartNew(); @@ -413,10 +408,10 @@ public class MediatorComprehensiveTests } /// - /// 测试Mediator和传统CQRS可以共存 + /// 测试自有 CQRS 和传统 CQRS 可以共存 /// [Test] - public async Task Mediator_And_Legacy_CQRS_Can_Coexist() + public async Task Cqrs_And_Legacy_CQRS_Can_Coexist() { // 使用传统方式 var legacyCommand = new TestLegacyCommand(); @@ -726,4 +721,4 @@ public sealed class TestStreamRequestHandler : IStreamRequestHandler /// 为服务容器设置上下文并执行扩展配置钩子。 - /// 这一步统一承接 Mediator 等容器扩展的接入点,避免 直接操作容器细节。 + /// 这一步统一承接 CQRS 运行时与容器扩展的接入点,避免 直接操作容器细节。 /// /// 当前架构上下文。 /// 可选的服务集合配置委托。 private void ConfigureServices(IArchitectureContext context, Action? configurator) { services.SetContext(context); + CqrsHandlerRegistrar.RegisterHandlers( + services.Container, + [architectureType.Assembly, typeof(ArchitectureContext).Assembly], + logger); if (configurator is null) - logger.Debug("Mediator-based cqrs will not take effect without the service setter configured!"); + logger.Debug("No external service configurator provided. Using built-in CQRS runtime registration only."); services.Container.ExecuteServicesHook(configurator); } @@ -115,4 +120,4 @@ internal sealed class ArchitectureBootstrapper( { await services.ModuleManager.InitializeAllAsync(asyncMode); } -} \ No newline at end of file +} diff --git a/GFramework.Core/Architectures/ArchitectureContext.cs b/GFramework.Core/Architectures/ArchitectureContext.cs index 1e3d72a7..77c04fcf 100644 --- a/GFramework.Core/Architectures/ArchitectureContext.cs +++ b/GFramework.Core/Architectures/ArchitectureContext.cs @@ -1,14 +1,19 @@ using System.Collections.Concurrent; using GFramework.Core.Abstractions.Architectures; using GFramework.Core.Abstractions.Command; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Cqrs.Command; +using GFramework.Core.Abstractions.Cqrs.Query; using GFramework.Core.Abstractions.Environment; using GFramework.Core.Abstractions.Events; using GFramework.Core.Abstractions.Ioc; +using GFramework.Core.Abstractions.Logging; using GFramework.Core.Abstractions.Model; using GFramework.Core.Abstractions.Query; using GFramework.Core.Abstractions.Systems; using GFramework.Core.Abstractions.Utility; -using Mediator; +using GFramework.Core.Cqrs.Internal; +using GFramework.Core.Logging; using ICommand = GFramework.Core.Abstractions.Command.ICommand; namespace GFramework.Core.Architectures; @@ -20,23 +25,15 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext { private readonly IIocContainer _container = container ?? throw new ArgumentNullException(nameof(container)); private readonly ConcurrentDictionary _serviceCache = new(); + private readonly ILogger _logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(ArchitectureContext)); + private CqrsDispatcher? _cqrsDispatcher; - #region Mediator Integration + #region CQRS Integration /// - /// 获取 Mediator 实例(延迟加载) + /// 获取 CQRS 运行时分发器(延迟初始化)。 /// - private IMediator Mediator => GetOrCache(); - - /// - /// 获取 ISender 实例(更轻量的发送器) - /// - private ISender Sender => GetOrCache(); - - /// - /// 获取 IPublisher 实例(用于发布通知) - /// - private IPublisher Publisher => GetOrCache(); + private CqrsDispatcher CqrsDispatcher => _cqrsDispatcher ??= new CqrsDispatcher(_container, this, _logger); /// /// 获取指定类型的服务实例,如果缓存中存在则直接返回,否则从容器中获取并缓存 @@ -64,30 +61,23 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext } /// - /// [Mediator] 发送请求(Command/Query) - /// 这是推荐的新方式,统一处理命令和查询 + /// 发送请求(Command/Query) + /// 使用 GFramework 自有 CQRS runtime 统一处理命令和查询。 /// /// 响应类型 /// 请求对象(Command 或 Query) /// 取消令牌 /// 响应结果 - /// 当 Mediator 未注册时抛出 public async ValueTask SendRequestAsync( IRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); - - var mediator = Mediator; - if (mediator == null) - throw new InvalidOperationException( - "Mediator not registered. Call EnableMediator() in your Architecture.OnInitialize() method."); - - return await mediator.Send(request, cancellationToken); + return await CqrsDispatcher.SendAsync(request, cancellationToken); } /// - /// [Mediator] 发送请求的同步版本(不推荐,仅用于兼容性) + /// 发送请求的同步版本(不推荐,仅用于兼容性) /// /// 响应类型 /// 请求对象 @@ -98,8 +88,8 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext } /// - /// [Mediator] 发布通知(一对多) - /// 用于事件驱动场景,多个处理器可以同时处理同一个通知 + /// 发布通知(一对多) + /// 使用 GFramework 自有 CQRS runtime 分发到所有已注册通知处理器。 /// /// 通知类型 /// 通知对象 @@ -110,16 +100,11 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext where TNotification : INotification { ArgumentNullException.ThrowIfNull(notification); - - var publisher = Publisher; - if (publisher == null) - throw new InvalidOperationException("Publisher not registered."); - - await publisher.Publish(notification, cancellationToken); + await CqrsDispatcher.PublishAsync(notification, cancellationToken); } /// - /// [Mediator] 发送请求并返回流(用于大数据集) + /// 发送请求并返回流(用于大数据集) /// /// 响应项类型 /// 流式请求 @@ -130,12 +115,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); - - var mediator = Mediator; - if (mediator == null) - throw new InvalidOperationException("Mediator not registered."); - - return mediator.CreateStream(request, cancellationToken); + return CqrsDispatcher.CreateStream(request, cancellationToken); } /// @@ -180,12 +160,12 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext } /// - /// [Mediator] 发送查询的同步版本(不推荐,仅用于兼容性) + /// 发送 CQRS 查询的同步版本(不推荐,仅用于兼容性) /// /// 查询响应类型 /// 要发送的查询对象 /// 查询结果 - public TResponse SendQuery(Mediator.IQuery query) + public TResponse SendQuery(GFramework.Core.Abstractions.Cqrs.Query.IQuery query) { return SendQueryAsync(query).AsTask().GetAwaiter().GetResult(); } @@ -205,23 +185,17 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext } /// - /// [Mediator] 异步发送查询并返回结果 - /// 通过Mediator模式发送查询请求,支持取消操作 + /// 异步发送 CQRS 查询并返回结果。 /// /// 查询响应类型 /// 要发送的查询对象 /// 取消令牌,用于取消操作 /// 包含查询结果的ValueTask - public async ValueTask SendQueryAsync(Mediator.IQuery query, + public async ValueTask SendQueryAsync(GFramework.Core.Abstractions.Cqrs.Query.IQuery query, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(query); - - var sender = Sender; - if (sender == null) - throw new InvalidOperationException("Sender not registered."); - - return await sender.Send(query, cancellationToken); + return await SendRequestAsync(query, cancellationToken); } #endregion @@ -347,23 +321,17 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext #region Command Execution /// - /// [Mediator] 异步发送命令并返回结果 - /// 通过Mediator模式发送命令请求,支持取消操作 + /// 异步发送 CQRS 命令并返回结果。 /// /// 命令响应类型 /// 要发送的命令对象 /// 取消令牌,用于取消操作 /// 包含命令执行结果的ValueTask - public async ValueTask SendCommandAsync(Mediator.ICommand command, + public async ValueTask SendCommandAsync(GFramework.Core.Abstractions.Cqrs.Command.ICommand command, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(command); - - var sender = Sender; - if (sender == null) - throw new InvalidOperationException("Sender not registered."); - - return await sender.Send(command, cancellationToken); + return await SendRequestAsync(command, cancellationToken); } /// @@ -393,12 +361,12 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext } /// - /// [Mediator] 发送命令的同步版本(不推荐,仅用于兼容性) + /// 发送 CQRS 命令的同步版本(不推荐,仅用于兼容性) /// /// 命令响应类型 /// 要发送的命令对象 /// 命令执行结果 - public TResponse SendCommand(Mediator.ICommand command) + public TResponse SendCommand(GFramework.Core.Abstractions.Cqrs.Command.ICommand command) { return SendCommandAsync(command).AsTask().GetAwaiter().GetResult(); } @@ -491,4 +459,4 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext } #endregion -} \ No newline at end of file +} diff --git a/GFramework.Core/Command/AbstractCommandWithInput.cs b/GFramework.Core/Command/AbstractCommandWithInput.cs index 5326fe6e..7512c760 100644 --- a/GFramework.Core/Command/AbstractCommandWithInput.cs +++ b/GFramework.Core/Command/AbstractCommandWithInput.cs @@ -9,13 +9,13 @@ namespace GFramework.Core.Command; /// /// 命令输入参数类型,必须实现 ICommandInput 接口 /// 命令执行所需的输入参数 -public abstract class AbstractCommand(TInput input) : ContextAwareBase, ICommand +public abstract class AbstractCommand(TInput input) : ContextAwareBase, GFramework.Core.Abstractions.Command.ICommand where TInput : ICommandInput { /// /// 执行命令的入口方法,实现 ICommand 接口的 Execute 方法 /// - void ICommand.Execute() + void GFramework.Core.Abstractions.Command.ICommand.Execute() { OnExecute(input); } @@ -25,4 +25,4 @@ public abstract class AbstractCommand(TInput input) : ContextAwareBase, /// /// 命令执行所需的输入参数 protected abstract void OnExecute(TInput input); -} \ No newline at end of file +} diff --git a/GFramework.Core/Command/AbstractCommandWithResult.cs b/GFramework.Core/Command/AbstractCommandWithResult.cs index 67901821..7ecc9522 100644 --- a/GFramework.Core/Command/AbstractCommandWithResult.cs +++ b/GFramework.Core/Command/AbstractCommandWithResult.cs @@ -10,14 +10,14 @@ namespace GFramework.Core.Command; /// 命令输入参数类型,必须实现 ICommandInput 接口 /// 命令执行后返回的结果类型 /// 命令执行所需的输入参数 -public abstract class AbstractCommand(TInput input) : ContextAwareBase, ICommand +public abstract class AbstractCommand(TInput input) : ContextAwareBase, GFramework.Core.Abstractions.Command.ICommand where TInput : ICommandInput { /// /// 执行命令的入口方法,实现 ICommand{TResult} 接口的 Execute 方法 /// /// 命令执行后的结果 - TResult ICommand.Execute() + TResult GFramework.Core.Abstractions.Command.ICommand.Execute() { return OnExecute(input); } @@ -28,4 +28,4 @@ public abstract class AbstractCommand(TInput input) : ContextAw /// 命令执行所需的输入参数 /// 命令执行后的结果 protected abstract TResult OnExecute(TInput input); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Behaviors/LoggingBehavior.cs b/GFramework.Core/Cqrs/Behaviors/LoggingBehavior.cs index 4a56c775..4aaf797a 100644 --- a/GFramework.Core/Cqrs/Behaviors/LoggingBehavior.cs +++ b/GFramework.Core/Cqrs/Behaviors/LoggingBehavior.cs @@ -12,9 +12,9 @@ // limitations under the License. using System.Diagnostics; +using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Logging; using GFramework.Core.Logging; -using Mediator; namespace GFramework.Core.Cqrs.Behaviors; @@ -69,4 +69,4 @@ public sealed class LoggingBehavior : IPipelineBehavior : IPipelineBehavior } } } -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs b/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs index 8f7b6112..0ffc7424 100644 --- a/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs +++ b/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs @@ -12,7 +12,8 @@ // limitations under the License. using GFramework.Core.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Cqrs.Command; namespace GFramework.Core.Cqrs.Command; @@ -21,7 +22,7 @@ namespace GFramework.Core.Cqrs.Command; /// 继承自ContextAwareBase并实现ICommandHandler接口,为具体的命令处理器提供基础功能 /// /// 命令类型 -public abstract class AbstractCommandHandler : ContextAwareBase, ICommandHandler +public abstract class AbstractCommandHandler : ContextAwareBase, IRequestHandler where TCommand : ICommand { /// @@ -41,7 +42,7 @@ public abstract class AbstractCommandHandler : ContextAwareBase, IComm /// /// 命令类型,必须实现ICommand接口 /// 命令执行结果类型 -public abstract class AbstractCommandHandler : ContextAwareBase, ICommandHandler +public abstract class AbstractCommandHandler : ContextAwareBase, IRequestHandler where TCommand : ICommand { /// @@ -52,4 +53,4 @@ public abstract class AbstractCommandHandler : ContextAwareBa /// 取消令牌,用于取消操作 /// 表示异步操作完成的ValueTask,包含命令执行结果 public abstract ValueTask Handle(TCommand command, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs b/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs index 064cb279..0f86f36c 100644 --- a/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs +++ b/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs @@ -12,7 +12,8 @@ // limitations under the License. using GFramework.Core.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Cqrs.Command; namespace GFramework.Core.Cqrs.Command; @@ -24,7 +25,7 @@ namespace GFramework.Core.Cqrs.Command; /// 流式命令类型,必须实现IStreamCommand接口 /// 流式命令响应元素类型 public abstract class AbstractStreamCommandHandler : ContextAwareBase, - IStreamCommandHandler + IStreamRequestHandler where TCommand : IStreamCommand { /// @@ -35,4 +36,4 @@ public abstract class AbstractStreamCommandHandler : Contex /// 取消令牌,用于取消流式处理操作 /// 异步可枚举的响应序列,每个元素类型为TResponse public abstract IAsyncEnumerable Handle(TCommand command, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Command/CommandBase.cs b/GFramework.Core/Cqrs/Command/CommandBase.cs index ba0e56ad..78fa134e 100644 --- a/GFramework.Core/Cqrs/Command/CommandBase.cs +++ b/GFramework.Core/Cqrs/Command/CommandBase.cs @@ -12,7 +12,6 @@ // limitations under the License. using GFramework.Core.Abstractions.Cqrs.Command; -using Mediator; namespace GFramework.Core.Cqrs.Command; @@ -29,4 +28,4 @@ public abstract class CommandBase(TInput input) : ICommand public TInput Input => input; -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Core/Cqrs/Internal/CqrsDispatcher.cs new file mode 100644 index 00000000..69f6794d --- /dev/null +++ b/GFramework.Core/Cqrs/Internal/CqrsDispatcher.cs @@ -0,0 +1,263 @@ +using System.Collections.Concurrent; +using System.Reflection; +using GFramework.Core.Abstractions.Architectures; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Ioc; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Abstractions.Rule; + +namespace GFramework.Core.Cqrs.Internal; + +/// +/// GFramework 自有 CQRS 运行时分发器。 +/// 该类型负责解析请求/通知处理器,并在调用前为上下文感知对象注入当前架构上下文。 +/// +internal sealed class CqrsDispatcher( + IIocContainer container, + IArchitectureContext context, + ILogger logger) +{ + private delegate ValueTask RequestInvoker(object handler, object request, CancellationToken cancellationToken); + private delegate ValueTask RequestPipelineInvoker( + object handler, + IReadOnlyList behaviors, + object request, + CancellationToken cancellationToken); + private delegate ValueTask NotificationInvoker(object handler, object notification, CancellationToken cancellationToken); + private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken); + + private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestInvoker> RequestInvokers = new(); + private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestPipelineInvoker> RequestPipelineInvokers = new(); + private static readonly ConcurrentDictionary NotificationInvokers = new(); + private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers = new(); + + /// + /// 发送请求并返回结果。 + /// + /// 响应类型。 + /// 请求对象。 + /// 取消令牌。 + /// 请求响应。 + public async ValueTask SendAsync( + IRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + + var requestType = request.GetType(); + var handlerType = typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)); + var handler = container.Get(handlerType) + ?? throw new InvalidOperationException( + $"No CQRS request handler registered for {requestType.FullName}."); + + PrepareHandler(handler); + var behaviorType = typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)); + var behaviors = container.GetAll(behaviorType); + + foreach (var behavior in behaviors) + PrepareHandler(behavior); + + if (behaviors.Count == 0) + { + var invoker = RequestInvokers.GetOrAdd( + (requestType, typeof(TResponse)), + static key => CreateRequestInvoker(key.RequestType, key.ResponseType)); + + var result = await invoker(handler, request, cancellationToken); + return result is null ? default! : (TResponse)result; + } + + var pipelineInvoker = RequestPipelineInvokers.GetOrAdd( + (requestType, typeof(TResponse)), + static key => CreateRequestPipelineInvoker(key.RequestType, key.ResponseType)); + + var pipelineResult = await pipelineInvoker(handler, behaviors, request, cancellationToken); + return pipelineResult is null ? default! : (TResponse)pipelineResult; + } + + /// + /// 发布通知到所有已注册处理器。 + /// + /// 通知类型。 + /// 通知对象。 + /// 取消令牌。 + public async ValueTask PublishAsync( + TNotification notification, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + ArgumentNullException.ThrowIfNull(notification); + + var notificationType = notification.GetType(); + var handlerType = typeof(INotificationHandler<>).MakeGenericType(notificationType); + var handlers = container.GetAll(handlerType); + + if (handlers.Count == 0) + { + logger.Debug($"No CQRS notification handler registered for {notificationType.FullName}."); + return; + } + + var invoker = NotificationInvokers.GetOrAdd( + notificationType, + CreateNotificationInvoker); + + foreach (var handler in handlers) + { + PrepareHandler(handler); + await invoker(handler, notification, cancellationToken); + } + } + + /// + /// 创建流式请求并返回异步响应序列。 + /// + /// 响应元素类型。 + /// 流式请求对象。 + /// 取消令牌。 + /// 异步响应序列。 + public IAsyncEnumerable CreateStream( + IStreamRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + + var requestType = request.GetType(); + var handlerType = typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)); + var handler = container.Get(handlerType) + ?? throw new InvalidOperationException( + $"No CQRS stream handler registered for {requestType.FullName}."); + + PrepareHandler(handler); + + var invoker = StreamInvokers.GetOrAdd( + (requestType, typeof(TResponse)), + static key => CreateStreamInvoker(key.RequestType, key.ResponseType)); + + return (IAsyncEnumerable)invoker(handler, request, cancellationToken); + } + + /// + /// 为上下文感知处理器注入当前架构上下文。 + /// + /// 处理器实例。 + private void PrepareHandler(object handler) + { + if (handler is IContextAware contextAware) + contextAware.SetContext(context); + } + + /// + /// 生成请求处理器调用委托,避免每次发送都重复反射。 + /// + private static RequestInvoker CreateRequestInvoker(Type requestType, Type responseType) + { + var method = typeof(CqrsDispatcher) + .GetMethod(nameof(InvokeRequestHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)! + .MakeGenericMethod(requestType, responseType); + return (RequestInvoker)Delegate.CreateDelegate(typeof(RequestInvoker), method); + } + + /// + /// 生成带管道行为的请求处理委托,避免每次发送都重复反射。 + /// + private static RequestPipelineInvoker CreateRequestPipelineInvoker(Type requestType, Type responseType) + { + var method = typeof(CqrsDispatcher) + .GetMethod(nameof(InvokeRequestPipelineAsync), BindingFlags.NonPublic | BindingFlags.Static)! + .MakeGenericMethod(requestType, responseType); + return (RequestPipelineInvoker)Delegate.CreateDelegate(typeof(RequestPipelineInvoker), method); + } + + /// + /// 生成通知处理器调用委托,避免每次发布都重复反射。 + /// + private static NotificationInvoker CreateNotificationInvoker(Type notificationType) + { + var method = typeof(CqrsDispatcher) + .GetMethod(nameof(InvokeNotificationHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)! + .MakeGenericMethod(notificationType); + return (NotificationInvoker)Delegate.CreateDelegate(typeof(NotificationInvoker), method); + } + + /// + /// 生成流式处理器调用委托,避免每次创建流都重复反射。 + /// + private static StreamInvoker CreateStreamInvoker(Type requestType, Type responseType) + { + var method = typeof(CqrsDispatcher) + .GetMethod(nameof(InvokeStreamHandler), BindingFlags.NonPublic | BindingFlags.Static)! + .MakeGenericMethod(requestType, responseType); + return (StreamInvoker)Delegate.CreateDelegate(typeof(StreamInvoker), method); + } + + /// + /// 执行已强类型化的请求处理器调用。 + /// + private static async ValueTask InvokeRequestHandlerAsync( + object handler, + object request, + CancellationToken cancellationToken) + where TRequest : IRequest + { + var typedHandler = (IRequestHandler)handler; + var typedRequest = (TRequest)request; + var result = await typedHandler.Handle(typedRequest, cancellationToken); + return result; + } + + /// + /// 执行包含管道行为链的请求处理。 + /// + private static async ValueTask InvokeRequestPipelineAsync( + object handler, + IReadOnlyList behaviors, + object request, + CancellationToken cancellationToken) + where TRequest : IRequest + { + var typedHandler = (IRequestHandler)handler; + var typedRequest = (TRequest)request; + + MessageHandlerDelegate next = + (message, token) => typedHandler.Handle(message, token); + + for (var i = behaviors.Count - 1; i >= 0; i--) + { + var behavior = (IPipelineBehavior)behaviors[i]; + var currentNext = next; + next = (message, token) => behavior.Handle(message, currentNext, token); + } + + var result = await next(typedRequest, cancellationToken); + return result; + } + + /// + /// 执行已强类型化的通知处理器调用。 + /// + private static ValueTask InvokeNotificationHandlerAsync( + object handler, + object notification, + CancellationToken cancellationToken) + where TNotification : INotification + { + var typedHandler = (INotificationHandler)handler; + var typedNotification = (TNotification)notification; + return typedHandler.Handle(typedNotification, cancellationToken); + } + + /// + /// 执行已强类型化的流式处理器调用。 + /// + private static object InvokeStreamHandler( + object handler, + object request, + CancellationToken cancellationToken) + where TRequest : IStreamRequest + { + var typedHandler = (IStreamRequestHandler)handler; + var typedRequest = (TRequest)request; + return typedHandler.Handle(typedRequest, cancellationToken); + } +} diff --git a/GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs b/GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs new file mode 100644 index 00000000..9f69bc33 --- /dev/null +++ b/GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs @@ -0,0 +1,81 @@ +using System.Reflection; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Ioc; +using GFramework.Core.Abstractions.Logging; +using Microsoft.Extensions.DependencyInjection; + +namespace GFramework.Core.Cqrs.Internal; + +/// +/// 在架构初始化期间扫描并注册 CQRS 处理器。 +/// 首批实现采用运行时反射扫描,优先满足“无需 AddMediator 即可工作”的迁移目标。 +/// +internal static class CqrsHandlerRegistrar +{ + /// + /// 扫描指定程序集并注册所有 CQRS 请求/通知/流式处理器。 + /// + /// 目标容器。 + /// 要扫描的程序集集合。 + /// 日志记录器。 + public static void RegisterHandlers( + IIocContainer container, + IEnumerable assemblies, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(container); + ArgumentNullException.ThrowIfNull(assemblies); + ArgumentNullException.ThrowIfNull(logger); + + foreach (var assembly in assemblies.Distinct()) + { + RegisterAssemblyHandlers(container.GetServicesUnsafe, assembly, logger); + } + } + + /// + /// 注册单个程序集里的所有 CQRS 处理器映射。 + /// + private static void RegisterAssemblyHandlers(IServiceCollection services, Assembly assembly, ILogger logger) + { + foreach (var implementationType in assembly.GetTypes().Where(IsConcreteHandlerType)) + { + var handlerInterfaces = implementationType + .GetInterfaces() + .Where(IsSupportedHandlerInterface) + .ToList(); + + if (handlerInterfaces.Count == 0) + continue; + + foreach (var handlerInterface in handlerInterfaces) + { + services.AddSingleton(handlerInterface, implementationType); + logger.Debug( + $"Registered CQRS handler {implementationType.FullName} as {handlerInterface.FullName}."); + } + } + } + + /// + /// 判断指定类型是否可作为可实例化处理器。 + /// + private static bool IsConcreteHandlerType(Type type) + { + return type is { IsAbstract: false, IsInterface: false } && !type.ContainsGenericParameters; + } + + /// + /// 判断接口是否为当前运行时支持的 CQRS 处理器接口。 + /// + private static bool IsSupportedHandlerInterface(Type type) + { + if (!type.IsGenericType) + return false; + + var definition = type.GetGenericTypeDefinition(); + return definition == typeof(IRequestHandler<,>) || + definition == typeof(INotificationHandler<>) || + definition == typeof(IStreamRequestHandler<,>); + } +} diff --git a/GFramework.Core/Cqrs/Notification/AbstractNotificationHandler.cs b/GFramework.Core/Cqrs/Notification/AbstractNotificationHandler.cs index de4772fb..1b1157ab 100644 --- a/GFramework.Core/Cqrs/Notification/AbstractNotificationHandler.cs +++ b/GFramework.Core/Cqrs/Notification/AbstractNotificationHandler.cs @@ -12,7 +12,7 @@ // limitations under the License. using GFramework.Core.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Cqrs.Notification; @@ -33,4 +33,4 @@ public abstract class AbstractNotificationHandler : ContextAwareB /// 取消令牌,用于取消操作 /// 表示异步操作完成的ValueTask public abstract ValueTask Handle(TNotification notification, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Notification/NotificationBase.cs b/GFramework.Core/Cqrs/Notification/NotificationBase.cs index 96e26ea1..f04488b9 100644 --- a/GFramework.Core/Cqrs/Notification/NotificationBase.cs +++ b/GFramework.Core/Cqrs/Notification/NotificationBase.cs @@ -12,7 +12,7 @@ // limitations under the License. using GFramework.Core.Abstractions.Cqrs.Notification; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Cqrs.Notification; @@ -28,4 +28,4 @@ public abstract class NotificationBase(TInput input) : INotification whe /// 获取通知的输入数据。 /// public TInput Input => input; -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs b/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs index 4ce887cf..f861312c 100644 --- a/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs +++ b/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs @@ -12,7 +12,8 @@ // limitations under the License. using GFramework.Core.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Cqrs.Query; namespace GFramework.Core.Cqrs.Query; @@ -23,7 +24,7 @@ namespace GFramework.Core.Cqrs.Query; /// /// 查询类型,必须实现IQuery接口 /// 查询结果类型 -public abstract class AbstractQueryHandler : ContextAwareBase, IQueryHandler +public abstract class AbstractQueryHandler : ContextAwareBase, IRequestHandler where TQuery : IQuery { /// @@ -34,4 +35,4 @@ public abstract class AbstractQueryHandler : ContextAwareBase, /// 取消令牌,用于取消操作 /// 表示异步操作完成的ValueTask,包含查询结果 public abstract ValueTask Handle(TQuery query, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Query/AbstractStreamQueryHandler.cs b/GFramework.Core/Cqrs/Query/AbstractStreamQueryHandler.cs index 50cf1817..015da1da 100644 --- a/GFramework.Core/Cqrs/Query/AbstractStreamQueryHandler.cs +++ b/GFramework.Core/Cqrs/Query/AbstractStreamQueryHandler.cs @@ -12,7 +12,8 @@ // limitations under the License. using GFramework.Core.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Cqrs.Query; namespace GFramework.Core.Cqrs.Query; @@ -24,7 +25,7 @@ namespace GFramework.Core.Cqrs.Query; /// 流式查询类型,必须实现IStreamQuery接口 /// 流式查询响应元素类型 public abstract class AbstractStreamQueryHandler : ContextAwareBase, - IStreamQueryHandler + IStreamRequestHandler where TQuery : IStreamQuery { /// @@ -35,4 +36,4 @@ public abstract class AbstractStreamQueryHandler : ContextAwa /// 取消令牌,用于取消流式查询操作 /// 异步可枚举的响应序列,每个元素类型为TResponse public abstract IAsyncEnumerable Handle(TQuery query, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Query/QueryBase.cs b/GFramework.Core/Cqrs/Query/QueryBase.cs index 2ef0f34b..cb0d4782 100644 --- a/GFramework.Core/Cqrs/Query/QueryBase.cs +++ b/GFramework.Core/Cqrs/Query/QueryBase.cs @@ -12,7 +12,6 @@ // limitations under the License. using GFramework.Core.Abstractions.Cqrs.Query; -using Mediator; namespace GFramework.Core.Cqrs.Query; @@ -29,4 +28,4 @@ public abstract class QueryBase(TInput input) : IQuery public TInput Input => input; -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Request/AbstractRequestHandler.cs b/GFramework.Core/Cqrs/Request/AbstractRequestHandler.cs index 88e9efad..4ef6a270 100644 --- a/GFramework.Core/Cqrs/Request/AbstractRequestHandler.cs +++ b/GFramework.Core/Cqrs/Request/AbstractRequestHandler.cs @@ -12,7 +12,7 @@ // limitations under the License. using GFramework.Core.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Cqrs.Request; @@ -21,7 +21,7 @@ namespace GFramework.Core.Cqrs.Request; /// 继承自ContextAwareBase并实现IRequestHandler接口 /// /// 请求类型,必须实现IRequest[Unit]接口 -public abstract class AbstractRequestHandler : ContextAwareBase, IRequestHandler +public abstract class AbstractRequestHandler : ContextAwareBase, IRequestHandler where TRequest : IRequest { /// @@ -49,4 +49,4 @@ public abstract class AbstractRequestHandler : ContextAware /// 取消令牌,用于取消操作 /// 表示异步操作的ValueTask,完成时返回处理结果 public abstract ValueTask Handle(TRequest request, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Request/AbstractStreamRequestHandler.cs b/GFramework.Core/Cqrs/Request/AbstractStreamRequestHandler.cs index a6151b49..a15ed5d7 100644 --- a/GFramework.Core/Cqrs/Request/AbstractStreamRequestHandler.cs +++ b/GFramework.Core/Cqrs/Request/AbstractStreamRequestHandler.cs @@ -12,7 +12,7 @@ // limitations under the License. using GFramework.Core.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Cqrs.Request; @@ -35,4 +35,4 @@ public abstract class AbstractStreamRequestHandler : Contex /// 取消令牌,用于取消流式请求操作 /// 异步可枚举的响应序列,每个元素类型为TResponse public abstract IAsyncEnumerable Handle(TRequest request, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/GFramework.Core/Cqrs/Request/RequestBase.cs b/GFramework.Core/Cqrs/Request/RequestBase.cs index 8b878750..ce85784f 100644 --- a/GFramework.Core/Cqrs/Request/RequestBase.cs +++ b/GFramework.Core/Cqrs/Request/RequestBase.cs @@ -12,7 +12,7 @@ // limitations under the License. using GFramework.Core.Abstractions.Cqrs.Request; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Cqrs.Request; @@ -29,4 +29,4 @@ public abstract class RequestBase(TInput input) : IRequest public TInput Input => input; -} \ No newline at end of file +} diff --git a/GFramework.Core/Extensions/ContextAwareMediatorCommandExtensions.cs b/GFramework.Core/Extensions/ContextAwareMediatorCommandExtensions.cs index 6daccd26..881741e7 100644 --- a/GFramework.Core/Extensions/ContextAwareMediatorCommandExtensions.cs +++ b/GFramework.Core/Extensions/ContextAwareMediatorCommandExtensions.cs @@ -1,16 +1,15 @@ using GFramework.Core.Abstractions.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs.Command; namespace GFramework.Core.Extensions; /// -/// 提供对 IContextAware 接口的 Mediator 命令扩展方法 -/// 使用 Mediator 库的命令模式 +/// 提供对 IContextAware 接口的 CQRS 命令扩展方法。 /// public static class ContextAwareMediatorCommandExtensions { /// - /// [Mediator] 发送命令的同步版本(不推荐,仅用于兼容性) + /// 发送命令的同步版本(不推荐,仅用于兼容性) /// /// 命令响应类型 /// 实现 IContextAware 接口的对象 @@ -28,7 +27,7 @@ public static class ContextAwareMediatorCommandExtensions } /// - /// [Mediator] 异步发送命令并返回结果 + /// 异步发送命令并返回结果 /// /// 命令响应类型 /// 实现 IContextAware 接口的对象 @@ -45,4 +44,4 @@ public static class ContextAwareMediatorCommandExtensions var context = contextAware.GetContext(); return context.SendCommandAsync(command, cancellationToken); } -} \ No newline at end of file +} diff --git a/GFramework.Core/Extensions/ContextAwareMediatorExtensions.cs b/GFramework.Core/Extensions/ContextAwareMediatorExtensions.cs index fa0d699d..1661d87b 100644 --- a/GFramework.Core/Extensions/ContextAwareMediatorExtensions.cs +++ b/GFramework.Core/Extensions/ContextAwareMediatorExtensions.cs @@ -1,10 +1,10 @@ using GFramework.Core.Abstractions.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Extensions; /// -/// 提供对 IContextAware 接口的 Mediator 统一接口扩展方法 +/// 提供对 IContextAware 接口的 CQRS 统一接口扩展方法。 /// public static class ContextAwareMediatorExtensions { @@ -122,4 +122,4 @@ public static class ContextAwareMediatorExtensions var context = contextAware.GetContext(); return context.SendAsync(command, cancellationToken); } -} \ No newline at end of file +} diff --git a/GFramework.Core/Extensions/ContextAwareMediatorQueryExtensions.cs b/GFramework.Core/Extensions/ContextAwareMediatorQueryExtensions.cs index cbdb01b4..5a4bfeb6 100644 --- a/GFramework.Core/Extensions/ContextAwareMediatorQueryExtensions.cs +++ b/GFramework.Core/Extensions/ContextAwareMediatorQueryExtensions.cs @@ -1,16 +1,15 @@ using GFramework.Core.Abstractions.Rule; -using Mediator; +using GFramework.Core.Abstractions.Cqrs.Query; namespace GFramework.Core.Extensions; /// -/// 提供对 IContextAware 接口的 Mediator 查询扩展方法 -/// 使用 Mediator 库的查询模式 +/// 提供对 IContextAware 接口的 CQRS 查询扩展方法。 /// public static class ContextAwareMediatorQueryExtensions { /// - /// [Mediator] 发送查询的同步版本(不推荐,仅用于兼容性) + /// 发送查询的同步版本(不推荐,仅用于兼容性) /// /// 查询响应类型 /// 实现 IContextAware 接口的对象 @@ -27,7 +26,7 @@ public static class ContextAwareMediatorQueryExtensions } /// - /// [Mediator] 异步发送查询并返回结果 + /// 异步发送查询并返回结果 /// /// 查询响应类型 /// 实现 IContextAware 接口的对象 @@ -44,4 +43,4 @@ public static class ContextAwareMediatorQueryExtensions var context = contextAware.GetContext(); return context.SendQueryAsync(query, cancellationToken); } -} \ No newline at end of file +} diff --git a/GFramework.Core/Ioc/MicrosoftDiContainer.cs b/GFramework.Core/Ioc/MicrosoftDiContainer.cs index 2615ca52..dec4f267 100644 --- a/GFramework.Core/Ioc/MicrosoftDiContainer.cs +++ b/GFramework.Core/Ioc/MicrosoftDiContainer.cs @@ -1,10 +1,10 @@ using GFramework.Core.Abstractions.Bases; +using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Logging; using GFramework.Core.Abstractions.Systems; using GFramework.Core.Logging; using GFramework.Core.Rule; -using Mediator; using Microsoft.Extensions.DependencyInjection; namespace GFramework.Core.Ioc; @@ -804,4 +804,4 @@ public class MicrosoftDiContainer(IServiceCollection? serviceCollection = null) } #endregion -} \ No newline at end of file +} diff --git a/GFramework.Core/Query/AbstractQueryWithResult.cs b/GFramework.Core/Query/AbstractQueryWithResult.cs index cb46071a..2c87622a 100644 --- a/GFramework.Core/Query/AbstractQueryWithResult.cs +++ b/GFramework.Core/Query/AbstractQueryWithResult.cs @@ -9,7 +9,7 @@ namespace GFramework.Core.Query; /// /// 查询输入参数的类型,必须实现IQueryInput接口 /// 查询结果的类型 -public abstract class AbstractQuery(TInput input) : ContextAwareBase, IQuery +public abstract class AbstractQuery(TInput input) : ContextAwareBase, GFramework.Core.Abstractions.Query.IQuery where TInput : IQueryInput { /// @@ -27,4 +27,4 @@ public abstract class AbstractQuery(TInput input) : ContextAwar /// 查询输入参数 /// 查询结果,类型为TResult protected abstract TResult OnDo(TInput input); -} \ No newline at end of file +} diff --git a/GFramework.Godot.SourceGenerators/GeWuYou.GFramework.Godot.SourceGenerators.targets b/GFramework.Godot.SourceGenerators/GeWuYou.GFramework.Godot.SourceGenerators.targets index 5116e4da..fdcf958e 100644 --- a/GFramework.Godot.SourceGenerators/GeWuYou.GFramework.Godot.SourceGenerators.targets +++ b/GFramework.Godot.SourceGenerators/GeWuYou.GFramework.Godot.SourceGenerators.targets @@ -18,9 +18,17 @@ - - - + + + + diff --git a/GFramework.Godot/Coroutine/ContextAwareCoroutineExtensions.cs b/GFramework.Godot/Coroutine/ContextAwareCoroutineExtensions.cs index f0ccc21b..d97d08a7 100644 --- a/GFramework.Godot/Coroutine/ContextAwareCoroutineExtensions.cs +++ b/GFramework.Godot/Coroutine/ContextAwareCoroutineExtensions.cs @@ -1,8 +1,10 @@ -using GFramework.Core.Abstractions.Rule; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Cqrs.Command; +using GFramework.Core.Abstractions.Cqrs.Query; +using GFramework.Core.Abstractions.Rule; using GFramework.Core.Coroutine; using GFramework.Core.Coroutine.Extensions; using GFramework.Core.Extensions; -using Mediator; namespace GFramework.Godot.Coroutine; @@ -104,4 +106,4 @@ public static class ContextAwareCoroutineExtensions .ToCoroutineEnumerator() .RunCoroutine(segment, tag); } -} \ No newline at end of file +} diff --git a/local-plan/todos/cqrs-rewrite-migration-tracking.md b/local-plan/todos/cqrs-rewrite-migration-tracking.md new file mode 100644 index 00000000..55317f7c --- /dev/null +++ b/local-plan/todos/cqrs-rewrite-migration-tracking.md @@ -0,0 +1,109 @@ +# CQRS 重写迁移跟踪 + +## 目标 + +围绕 `GFramework` 当前的双轨 CQRS 现状,完成一轮以“去 Mediator 外部依赖”为目标的架构迁移: + +- 将 `Mediator` 从 GFramework 公共 API 和运行时主路径中移除 +- 基于 GFramework 自有抽象重建正式 CQRS runtime、行为管道和注册机制 +- 保留 `EventBus` 作为框架级事件系统,不与 CQRS notification 混同 +- 让 `CoreGrid-Migration` 直连本地 `GFramework`,作为真实迁移验证工程 +- 为复杂迁移建立明确恢复点与进度追踪,避免上下文过长或中断后失去状态 + +## 当前恢复点 + +- 恢复点编号:`CQRS-REWRITE-RP-002` +- 当前阶段:`Phase 4` +- 当前焦点: + - 清理剩余 `Mediator` 包依赖与文档残留 + - 评估是否继续把协程扩展和测试项目中的 `Mediator.Abstractions` 完全移除 + - 规划第二阶段优化:代码生成注册、性能收敛、行为 API 命名统一 + +## 本轮计划 + +### Phase 0:工作流基础 + +- [x] 在 `local-plan/todos/` 建立本任务跟踪文档 +- [x] 在 `local-plan/traces/` 建立本任务追踪文档 +- [x] 将恢复点 / trace / subagent 协作规范写入 `AGENTS.md` + +### Phase 1:本地验证链路 + +- [x] 确认 `CoreGrid-Migration` 当前引用形态 +- [x] 将 `CoreGrid-Migration` 从 NuGet 包切到本地 `GFramework` 工程引用 +- [x] 让 `CoreGrid-Migration` 使用本地 Source Generator 而不是外部已发布版本 +- [x] 验证本地引用链路至少能完成 restore / build + +### Phase 2:CQRS 基础重建 + +- [x] 在 `GFramework.Core.Abstractions` 定义自有 CQRS 契约 +- [x] 在 `GFramework.Core` 落地 dispatcher / handler registry / behavior pipeline +- [x] 清理 `IArchitectureContext` 中对 `Mediator.*` 的公共签名依赖 +- [x] 设计 CQRS 模块启用方式,替代 `Configurator => AddMediator(...)` + +### Phase 3:接入迁移 + +- [x] 迁移 `GFramework.Core.Cqrs.*` 基类到新契约 +- [x] 迁移 `ContextAwareMediator*Extensions` 与协程扩展 +- [x] 迁移 `CoreGrid-Migration/scripts/cqrs/**` 到新契约 +- [x] 删除 `GameArchitecture.Configurator` 中的 `AddMediator(...)` + +### Phase 4:收尾 + +- [ ] 移除 `Mediator` 包依赖与相关测试/文档残留 +- [x] 运行目标构建与测试 +- [x] 记录剩余风险与下一恢复点 + +## 当前完成结果 + +- `CoreGrid-Migration` 已直连本地 `GFramework` 源码与本地 source generators。 +- `GameArchitecture` 已不再依赖 `collection.AddMediator(...)` 即可使用 CQRS。 +- `GFramework.Core.Abstractions` 新增自有 CQRS 契约: + - `IRequest` / `INotification` / `IStreamRequest` + - `IRequestHandler<,>` / `INotificationHandler<>` / `IStreamRequestHandler<,>` + - `Unit` + - `IPipelineBehavior<,>` / `MessageHandlerDelegate<,>` +- `ArchitectureBootstrapper` 会在初始化阶段自动扫描并注册当前架构程序集与 `GFramework.Core` 程序集中的 CQRS handlers。 +- `CqrsDispatcher` 已支持: + - request dispatch + - notification publish + - stream dispatch + - context-aware handler 注入 + - request pipeline behavior 链式执行 +- `GFramework.Core.Tests` 中原依赖 `Mediator` 注册路径的测试已切换到框架内建 CQRS 注册路径。 +- 当前验证状态: + - `dotnet build GFramework/GFramework.sln` 通过 + - `dotnet test GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj --no-build` 通过,`1621` 个测试全部通过 + - `dotnet build CoreGrid-Migration/CoreGrid.sln` 通过 + +## 当前已知事实 + +- `GFramework` 当前仍同时维护: + - 基于 `CommandExecutor` / `QueryExecutor` / `EventBus` 的轻量旧 CQRS + - 基于 GFramework 自有抽象的新 CQRS runtime +- 仍存在 `Mediator` 残留的区域主要集中在: + - 文档中的历史说明 + - `MediatorCoroutineExtensions` 及对应测试 + - 测试项目对 `Mediator.Abstractions` 的少量残余依赖 +- `CoreGrid-Migration` 已切到本地源码引用,并在当前恢复点完成构建验证 + +## 当前风险 + +- `GFramework` 仓库存在与本任务无关的既有改动,提交时必须避免覆盖 +- `CoreGrid-Migration` 是 worktree,WSL 下原生 `git` 解析该 worktree 路径有兼容问题 +- 当前 `RegisterMediatorBehavior` 命名仍保留历史前缀,但底层已切换为框架自有 CQRS pipeline;若后续要彻底脱媒介命名,需要一次 API 命名迁移 +- 当前 handler 自动注册基于运行时反射扫描;若后续追求冷启动与 AOT 友好性,需要补 source-generator 注册路径 + +## 下次恢复建议 + +若本轮中断,优先从以下顺序恢复: + +1. 查看 `local-plan/traces/cqrs-rewrite-migration-trace.md` +2. 确认当前恢复点 `CQRS-REWRITE-RP-002` 已对应到最新提交 +3. 优先决定是否继续移除 `Mediator.Abstractions` 包与 `MediatorCoroutineExtensions` 历史兼容层 +4. 若继续演进,再处理 CQRS 注册的生成器化与 API 命名统一 + +## 备注 + +- 本文档是当前任务的主恢复点,后续每个关键阶段完成后都要更新 +- 发生方向调整时,不覆盖旧结论,直接追加阶段记录与新的恢复点编号 diff --git a/local-plan/traces/cqrs-rewrite-migration-trace.md b/local-plan/traces/cqrs-rewrite-migration-trace.md new file mode 100644 index 00000000..1df972fe --- /dev/null +++ b/local-plan/traces/cqrs-rewrite-migration-trace.md @@ -0,0 +1,68 @@ +# CQRS 重写迁移追踪 + +## 2026-04-14 + +### 阶段:初始化 + +- 建立 `CQRS-REWRITE-RP-001` 恢复点 +- 已确认本次迁移目标: + - 彻底参考 `Mediator` 思路重写 GFramework 正式 CQRS + - 不保留对 `Mediator` 的兼容层 + - 使用 `abstractions + runtime 可选模块` 边界 + - 保留 `EventBus`,不与 CQRS notification 合并 + +### 已确认的实现前提 + +- `CoreGrid-Migration` 当前仍依赖 NuGet 版 `GeWuYou.GFramework*` +- `CoreGrid/scripts/core/GameArchitecture.cs` 与 `CoreGrid-Migration/scripts/core/GameArchitecture.cs` 通过 `AddMediator(...)` 启用基于生成器的 runtime +- `GFramework` 当前 `IArchitectureContext` 与一批 CQRS 基类直接引用 `Mediator.*` +- `CoreGrid/scripts/cqrs/**` 的 handler 很薄,主要迁移成本在框架 runtime 和注册机制,不在业务逻辑本身 + +### 当前动作 + +- 准备更新 `AGENTS.md`,补充恢复点 / trace / subagent 协作规范 +- 准备将 `CoreGrid-Migration` 切换为本地项目引用,建立真实验证链路 + +### 下一步 + +1. 完成 `AGENTS.md` 规则补充 +2. 改造 `CoreGrid-Migration/CoreGrid.csproj` 为本地项目与本地生成器引用 +3. 进行第一次构建验证,确认本地链路可用 + +### 阶段:CQRS 主路径迁移完成 + +- `CoreGrid-Migration/CoreGrid.csproj` 已切到本地 `ProjectReference` + 本地 source generators +- `CoreGrid-Migration/scripts/core/GameArchitecture.cs` 已删除 `AddMediator(...)` 配置钩子 +- `GFramework.Core.Abstractions` 新增 GFramework 自有 CQRS 契约与 `Unit` +- `IArchitectureContext` / `ArchitectureContext` 已切到自有 CQRS 签名 +- `ArchitectureBootstrapper` 已内建 handler 扫描注册,使用方无需再显式调用 `AddMediator(...)` +- `CqrsDispatcher` 已补齐 request/notification/stream dispatch 与 pipeline behavior 执行 +- `GFramework.Core.Cqrs.*` 基类、`ContextAwareMediator*Extensions`、Godot 协程上下文扩展均已迁到新契约 +- `GFramework.Core.Tests` 中原依赖旧 `Mediator` 注册入口的测试已迁移到 `CqrsTestRuntime` 反射注册路径 + +### 阶段:验证 + +- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core/GFramework.Core.csproj` + - 结果:通过 +- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj` + - 结果:通过 +- `dotnet test /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj --no-build` + - 结果:通过 + - 明细:`1621` 个测试全部通过 +- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.sln` + - 结果:通过 +- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/CoreGrid-Migration/CoreGrid.sln` + - 结果:通过 + - 备注:仅存在既有 analyzer warnings,无新增构建错误 + +### 当前残留 + +- 文档与少量历史 API 命名仍保留 `Mediator` 前缀 +- `MediatorCoroutineExtensions` 与少量测试仍依赖 `Mediator.Abstractions` +- handler 自动注册当前使用运行时反射扫描,尚未切回生成器注册 + +### 下一步建议 + +1. 决定是否继续做“完全移除 `Mediator.Abstractions` 包”的第二阶段清理 +2. 若继续,优先迁移协程扩展与相关测试 +3. 评估是否将 `RegisterMediatorBehavior`、`ContextAwareMediator*Extensions` 等历史命名升级为 CQRS 中性命名 From 618f07369e14a8ea5afe604c7ffb915d38bac1aa Mon Sep 17 00:00:00 2001 From: GeWuYou <95328647+GeWuYou@users.noreply.github.com> Date: Tue, 14 Apr 2026 20:56:11 +0800 Subject: [PATCH 2/4] =?UTF-8?q?config(ci):=20=E9=85=8D=E7=BD=AECoderabbit?= =?UTF-8?q?=E4=BB=A5=E6=94=AF=E6=8C=81=E9=87=8D=E6=9E=84=E5=88=86=E6=94=AF?= =?UTF-8?q?=E7=9A=84=E8=87=AA=E5=8A=A8=E5=AE=A1=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 启用auto_review功能以进行代码审查 - 添加refactor/cqrs-architecture-decoupling作为基础分支 - 配置草稿PR时不进行审查的选项 - 设置聊天自动回复功能 --- .coderabbit.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.coderabbit.yaml b/.coderabbit.yaml index 8cb624ec..5fc95d9d 100644 --- a/.coderabbit.yaml +++ b/.coderabbit.yaml @@ -16,6 +16,8 @@ reviews: auto_review: enabled: true drafts: false # draft PR 不 review + base_branches: + - refactor/cqrs-architecture-decoupling chat: auto_reply: true From 195c8321a1224760bb98564a0cbf1b60408aa65d Mon Sep 17 00:00:00 2001 From: GeWuYou <95328647+GeWuYou@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:37:32 +0800 Subject: [PATCH 3/4] =?UTF-8?q?feat(cqrs):=20=E6=B7=BB=E5=8A=A0CQRS?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E6=9F=A5=E8=AF=A2=E8=B4=A3=E4=BB=BB=E5=88=86?= =?UTF-8?q?=E7=A6=BB=E6=9E=B6=E6=9E=84=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现抽象命令处理器基类支持命令处理 - 添加流式命令处理器基类支持异步流式响应 - 创建查询处理器基类提供统一查询处理接口 - 实现查询基类提供通用查询结构定义 - 扩展架构上下文接口集成CQRS运行时入口 - 定义消息处理器委托支持管道行为处理 - 实现CQRS处理器注册器扫描并注册处理器 - 添加架构模块行为测试验证模块安装功能 - 创建中介器高级特性测试覆盖边界场景 --- .../Architectures/IArchitectureContext.cs | 99 ++++++----- .../Cqrs/MessageHandlerDelegate.cs | 5 + .../ArchitectureModulesBehaviorTests.cs | 8 +- .../Cqrs/CqrsHandlerRegistrarTests.cs | 165 ++++++++++++++++++ GFramework.Core.Tests/CqrsTestRuntime.cs | 60 +------ .../Mediator/MediatorAdvancedFeaturesTests.cs | 29 +-- .../MediatorArchitectureIntegrationTests.cs | 67 ++++++- .../Mediator/MediatorComprehensiveTests.cs | 28 +-- .../Cqrs/Command/AbstractCommandHandler.cs | 9 +- .../Command/AbstractStreamCommandHandler.cs | 6 +- .../Cqrs/Internal/CqrsHandlerRegistrar.cs | 72 +++++++- .../Cqrs/Query/AbstractQueryHandler.cs | 6 +- GFramework.Core/Cqrs/Query/QueryBase.cs | 2 +- .../todos/cqrs-rewrite-migration-tracking.md | 109 ------------ .../traces/cqrs-rewrite-migration-trace.md | 68 -------- 15 files changed, 415 insertions(+), 318 deletions(-) create mode 100644 GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs delete mode 100644 local-plan/todos/cqrs-rewrite-migration-tracking.md delete mode 100644 local-plan/traces/cqrs-rewrite-migration-trace.md diff --git a/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs b/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs index 9a08dec6..b9b5dc9a 100644 --- a/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs +++ b/GFramework.Core.Abstractions/Architectures/IArchitectureContext.cs @@ -1,7 +1,5 @@ using GFramework.Core.Abstractions.Command; using GFramework.Core.Abstractions.Cqrs; -using GFramework.Core.Abstractions.Cqrs.Command; -using GFramework.Core.Abstractions.Cqrs.Query; using GFramework.Core.Abstractions.Environment; using GFramework.Core.Abstractions.Events; using GFramework.Core.Abstractions.Model; @@ -13,8 +11,13 @@ using ICommand = GFramework.Core.Abstractions.Command.ICommand; namespace GFramework.Core.Abstractions.Architectures; /// -/// 架构上下文接口,提供对系统、模型、工具类的访问以及命令、查询、事件的发送和注册功能 +/// 架构上下文接口,统一暴露框架组件访问、兼容旧命令/查询总线,以及当前推荐的 CQRS 运行时入口。 /// +/// +/// 旧的 GFramework.Core.Abstractions.CommandGFramework.Core.Abstractions.Query 契约会继续通过原有 Command/Query Executor 路径执行,以保证存量代码兼容。 +/// 新的 GFramework.Core.Abstractions.Cqrs 契约由内置 CQRS dispatcher 统一处理,支持 request pipeline、notification publish 与 stream request。 +/// 新功能优先使用 与对应的 CQRS Command/Query 重载;迁移旧代码时可先保留旧入口,再逐步替换为 CQRS 请求模型。 +/// public interface IArchitectureContext { /// @@ -106,85 +109,91 @@ public interface IArchitectureContext IReadOnlyList GetUtilitiesByPriority() where TUtility : class, IUtility; /// - /// 发送一个命令 + /// 发送一个旧版命令。 /// - /// 要发送的命令 + /// 要发送的旧版命令。 void SendCommand(ICommand command); /// - /// 发送一个带返回值的命令 + /// 发送一个旧版带返回值命令。 /// - /// 命令执行结果类型 - /// 要发送的命令 - /// 命令执行结果 - TResult SendCommand(Command.ICommand command); + /// 命令执行结果类型。 + /// 要发送的旧版命令。 + /// 命令执行结果。 + TResult SendCommand(ICommand command); /// - /// 发送一个 CQRS 命令并返回结果。 + /// 发送一个新版 CQRS 命令并返回结果。 /// /// 命令响应类型。 /// 要发送的 CQRS 命令。 /// 命令执行结果。 - TResponse SendCommand(GFramework.Core.Abstractions.Cqrs.Command.ICommand command); + /// + /// 这是迁移后的推荐命令入口。无返回值命令应实现 IRequest<Unit>,并优先通过 调用。 + /// + TResponse SendCommand(Cqrs.Command.ICommand command); /// - /// 发送并异步执行一个命令 + /// 异步发送一个旧版命令。 /// - /// 要发送的命令 + /// 要发送的旧版命令。 Task SendCommandAsync(IAsyncCommand command); /// - /// 异步发送一个 CQRS 命令并返回结果。 + /// 异步发送一个新版 CQRS 命令并返回结果。 /// /// 命令响应类型。 /// 要发送的 CQRS 命令。 /// 取消令牌。 /// 包含命令执行结果的值任务。 - ValueTask SendCommandAsync(GFramework.Core.Abstractions.Cqrs.Command.ICommand command, + ValueTask SendCommandAsync(Cqrs.Command.ICommand command, CancellationToken cancellationToken = default); /// - /// 发送并异步执行一个带返回值的命令 + /// 异步发送一个旧版带返回值命令。 /// - /// 命令执行结果类型 - /// 要发送的命令 - /// 命令执行结果 + /// 命令执行结果类型。 + /// 要发送的旧版命令。 + /// 命令执行结果。 Task SendCommandAsync(IAsyncCommand command); /// - /// 发送一个查询请求 + /// 发送一个旧版查询请求。 /// - /// 查询结果类型 - /// 要发送的查询 - /// 查询结果 - TResult SendQuery(Query.IQuery query); + /// 查询结果类型。 + /// 要发送的旧版查询。 + /// 查询结果。 + TResult SendQuery(IQuery query); /// - /// 发送一个 CQRS 查询并返回结果。 + /// 发送一个新版 CQRS 查询并返回结果。 /// /// 查询响应类型。 /// 要发送的 CQRS 查询。 /// 查询结果。 - TResponse SendQuery(GFramework.Core.Abstractions.Cqrs.Query.IQuery query); + /// + /// 这是迁移后的推荐查询入口。新查询应优先实现 GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse>。 + /// + TResponse SendQuery(Cqrs.Query.IQuery query); /// - /// 异步发送一个查询请求 + /// 异步发送一个旧版查询请求。 /// - /// 查询结果类型 - /// 要发送的异步查询 - /// 查询结果 + /// 查询结果类型。 + /// 要发送的旧版异步查询。 + /// 查询结果。 Task SendQueryAsync(IAsyncQuery query); /// - /// 异步发送一个 CQRS 查询并返回结果。 + /// 异步发送一个新版 CQRS 查询并返回结果。 /// /// 查询响应类型。 /// 要发送的 CQRS 查询。 /// 取消令牌。 /// 包含查询结果的值任务。 - ValueTask SendQueryAsync(GFramework.Core.Abstractions.Cqrs.Query.IQuery query, + ValueTask SendQueryAsync(Cqrs.Query.IQuery query, CancellationToken cancellationToken = default); /// @@ -216,28 +225,40 @@ public interface IArchitectureContext void UnRegisterEvent(Action onEvent); /// - /// 发送请求(统一处理 Command/Query) + /// 发送新版 CQRS 请求,并统一处理命令与查询。 /// + /// + /// 这是自有 CQRS 运行时的主入口。新代码应优先通过该方法或 进入 dispatcher。 + /// ValueTask SendRequestAsync( IRequest request, CancellationToken cancellationToken = default); /// - /// 发送请求(同步版本,不推荐) + /// 发送新版 CQRS 请求的同步包装版本。 /// + /// + /// 仅为兼容同步调用链保留;新代码应优先使用异步入口,避免阻塞当前线程。 + /// TResponse SendRequest(IRequest request); /// - /// 发布通知(一对多事件) + /// 发布新版 CQRS 通知。 /// + /// + /// 该入口用于一对多通知分发,与框架级 EventBus 事件系统并存,适合围绕请求处理过程传播领域通知。 + /// ValueTask PublishAsync( TNotification notification, CancellationToken cancellationToken = default) where TNotification : INotification; /// - /// 创建流式请求(用于大数据集) + /// 创建新版 CQRS 流式请求。 /// + /// + /// 适用于需要按序惰性产出大量结果的场景。调用方应消费返回的异步序列,而不是回退到旧版查询总线。 + /// IAsyncEnumerable CreateStream( IStreamRequest request, CancellationToken cancellationToken = default); @@ -245,7 +266,7 @@ public interface IArchitectureContext // === 便捷扩展方法 === /// - /// 发送命令(无返回值) + /// 发送一个无返回值的新版 CQRS 命令。 /// ValueTask SendAsync( TCommand command, @@ -253,7 +274,7 @@ public interface IArchitectureContext where TCommand : IRequest; /// - /// 发送命令(有返回值) + /// 发送一个有返回值的新版 CQRS 请求。 /// ValueTask SendAsync( IRequest command, diff --git a/GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs b/GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs index 172f7f3b..520f9fee 100644 --- a/GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs +++ b/GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs @@ -3,6 +3,11 @@ namespace GFramework.Core.Abstractions.Cqrs; /// /// 表示 CQRS 请求在管道中继续向下执行的处理委托。 /// +/// +/// 管道行为可以通过不调用该委托来短路请求处理。 +/// 除显式实现重试等高级语义外,行为通常应最多调用一次该委托,以维持单次请求分发的确定性。 +/// 调用方应传递当前收到的 ,确保取消信号沿整条管道一致传播。 +/// /// 请求类型。 /// 响应类型。 /// 当前请求消息。 diff --git a/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs b/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs index 7485d978..bf6566b4 100644 --- a/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs +++ b/GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs @@ -2,7 +2,6 @@ using GFramework.Core.Abstractions.Architectures; using GFramework.Core.Abstractions.Utility; using GFramework.Core.Architectures; using GFramework.Core.Logging; -using GFramework.Core.Tests; using GfCqrs = GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Tests.Architectures; @@ -67,9 +66,7 @@ public class ArchitectureModulesBehaviorTests await architecture.InitializeAsync(); - var response = await CqrsTestRuntime.ExecutePipelineAsync( - architecture.Context, - new ModuleBehaviorRequest()); + var response = await architecture.Context.SendRequestAsync(new ModuleBehaviorRequest()); Assert.Multiple(() => { @@ -174,8 +171,7 @@ public sealed class TrackingPipelineBehavior : GfCqrs.IPipe /// 取消令牌。 /// 下游处理器的响应结果。 public async ValueTask Handle( - TRequest message, - GfCqrs.MessageHandlerDelegate next, + TRequest message, GfCqrs.MessageHandlerDelegate next, CancellationToken cancellationToken) { InvocationCount++; diff --git a/GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs b/GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs new file mode 100644 index 00000000..43a6272a --- /dev/null +++ b/GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs @@ -0,0 +1,165 @@ +using System.Reflection; +using GFramework.Core.Abstractions.Cqrs; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Architectures; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Core.Tests.Logging; + +namespace GFramework.Core.Tests.Cqrs; + +/// +/// 验证 CQRS 处理器自动注册在顺序与容错层面的可观察行为。 +/// +[TestFixture] +internal sealed class CqrsHandlerRegistrarTests +{ + private static readonly MethodInfo RecoverLoadableTypesMethod = typeof(ArchitectureContext).Assembly + .GetType( + "GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", + throwOnError: true)! + .GetMethod("RecoverLoadableTypes", + BindingFlags.NonPublic | + BindingFlags.Static)! + ?? throw new InvalidOperationException( + "Failed to locate CqrsHandlerRegistrar.RecoverLoadableTypes."); + + private MicrosoftDiContainer? _container; + + private ArchitectureContext? _context; + + /// + /// 初始化测试容器并重置共享状态。 + /// + [SetUp] + public void SetUp() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); + DeterministicNotificationHandlerState.Reset(); + + _container = new MicrosoftDiContainer(); + CqrsTestRuntime.RegisterHandlers( + _container, + typeof(CqrsHandlerRegistrarTests).Assembly, + typeof(ArchitectureContext).Assembly); + + _container.Freeze(); + _context = new ArchitectureContext(_container); + } + + /// + /// 清理测试过程中创建的上下文与共享状态。 + /// + [TearDown] + public void TearDown() + { + _context = null; + _container = null; + DeterministicNotificationHandlerState.Reset(); + } + + /// + /// 验证自动扫描到的通知处理器会按稳定名称顺序执行,而不是依赖反射枚举顺序。 + /// + [Test] + public async Task PublishAsync_Should_Run_Notification_Handlers_In_Deterministic_Name_Order() + { + await _context!.PublishAsync(new DeterministicOrderNotification()); + + Assert.That( + DeterministicNotificationHandlerState.InvocationOrder, + Is.EqualTo( + [ + nameof(AlphaDeterministicNotificationHandler), + nameof(ZetaDeterministicNotificationHandler) + ])); + } + + /// + /// 验证部分类型加载失败时仍能保留可加载类型,并记录诊断日志。 + /// + [Test] + public void RecoverLoadableTypes_Should_Return_Loadable_Types_And_Log_Warnings() + { + var logger = new TestLogger(nameof(CqrsHandlerRegistrarTests), LogLevel.Warning); + var reflectionTypeLoadException = new ReflectionTypeLoadException( + [typeof(AlphaDeterministicNotificationHandler), null], + [new TypeLoadException("Missing optional dependency for registrar test.")]); + + var recoveredTypes = (IReadOnlyList)RecoverLoadableTypesMethod.Invoke( + null, + [typeof(CqrsHandlerRegistrarTests).Assembly, reflectionTypeLoadException, logger])!; + + Assert.Multiple(() => + { + Assert.That(recoveredTypes, Is.EqualTo([typeof(AlphaDeterministicNotificationHandler)])); + Assert.That(logger.Logs.Count(log => log.Level == LogLevel.Warning), Is.GreaterThanOrEqualTo(2)); + Assert.That( + logger.Logs.Any(log => log.Message.Contains("partially failed", StringComparison.Ordinal)), + Is.True); + Assert.That( + logger.Logs.Any(log => log.Message.Contains("Missing optional dependency", StringComparison.Ordinal)), + Is.True); + }); + } +} + +/// +/// 记录确定性通知处理器的实际执行顺序。 +/// +internal static class DeterministicNotificationHandlerState +{ + /// + /// 获取当前测试中的通知处理器执行顺序。 + /// + public static List InvocationOrder { get; } = []; + + /// + /// 重置共享的执行顺序状态。 + /// + public static void Reset() + { + InvocationOrder.Clear(); + } +} + +/// +/// 用于验证同一通知的多个处理器是否按稳定顺序执行。 +/// +internal sealed record DeterministicOrderNotification : INotification; + +/// +/// 故意放在 Alpha 之前声明,用于验证注册器不会依赖源码声明顺序。 +/// +internal sealed class ZetaDeterministicNotificationHandler : INotificationHandler +{ + /// + /// 记录当前处理器已执行。 + /// + /// 通知实例。 + /// 取消令牌。 + /// 已完成任务。 + public ValueTask Handle(DeterministicOrderNotification notification, CancellationToken cancellationToken) + { + DeterministicNotificationHandlerState.InvocationOrder.Add(nameof(ZetaDeterministicNotificationHandler)); + return ValueTask.CompletedTask; + } +} + +/// +/// 名称排序上应先于 Zeta 处理器执行的通知处理器。 +/// +internal sealed class AlphaDeterministicNotificationHandler : INotificationHandler +{ + /// + /// 记录当前处理器已执行。 + /// + /// 通知实例。 + /// 取消令牌。 + /// 已完成任务。 + public ValueTask Handle(DeterministicOrderNotification notification, CancellationToken cancellationToken) + { + DeterministicNotificationHandlerState.InvocationOrder.Add(nameof(AlphaDeterministicNotificationHandler)); + return ValueTask.CompletedTask; + } +} diff --git a/GFramework.Core.Tests/CqrsTestRuntime.cs b/GFramework.Core.Tests/CqrsTestRuntime.cs index ae1d3361..e2801cc8 100644 --- a/GFramework.Core.Tests/CqrsTestRuntime.cs +++ b/GFramework.Core.Tests/CqrsTestRuntime.cs @@ -1,22 +1,22 @@ using System.Reflection; -using GFramework.Core.Abstractions.Architectures; -using GFramework.Core.Abstractions.Logging; -using GFramework.Core.Abstractions.Rule; using GFramework.Core.Architectures; using GFramework.Core.Ioc; using GFramework.Core.Logging; -using GfCqrs = GFramework.Core.Abstractions.Cqrs; namespace GFramework.Core.Tests; internal static class CqrsTestRuntime { private static readonly MethodInfo RegisterHandlersMethod = typeof(ArchitectureContext).Assembly - .GetType("GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", throwOnError: true)! - .GetMethod( - "RegisterHandlers", - BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Static)! - ?? throw new InvalidOperationException("Failed to locate CqrsHandlerRegistrar.RegisterHandlers."); + .GetType( + "GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", + throwOnError: true)! + .GetMethod( + "RegisterHandlers", + BindingFlags.Public | BindingFlags.NonPublic | + BindingFlags.Static)! + ?? throw new InvalidOperationException( + "Failed to locate CqrsHandlerRegistrar.RegisterHandlers."); public static void RegisterHandlers(MicrosoftDiContainer container, params Assembly[] assemblies) { @@ -28,46 +28,4 @@ internal static class CqrsTestRuntime null, [container, assemblies.Where(static assembly => assembly is not null).Distinct().ToArray(), logger]); } - - public static ValueTask ExecutePipelineAsync( - IArchitectureContext context, - TRequest request, - CancellationToken cancellationToken = default) - where TRequest : class, GfCqrs.IRequest - { - ArgumentNullException.ThrowIfNull(context); - ArgumentNullException.ThrowIfNull(request); - - var handlers = context.GetServices>(); - if (handlers.Count == 0) - throw new InvalidOperationException( - $"No CQRS request handler registered for {typeof(TRequest).FullName}."); - - if (handlers.Count > 1) - throw new InvalidOperationException( - $"Expected a single CQRS request handler for {typeof(TRequest).FullName}, but found {handlers.Count}."); - - var handler = handlers[0]; - PrepareContext(handler, context); - - GfCqrs.MessageHandlerDelegate pipeline = handler.Handle; - - var behaviors = context.GetServices>(); - for (var index = behaviors.Count - 1; index >= 0; index--) - { - var behavior = behaviors[index]; - PrepareContext(behavior, context); - - var next = pipeline; - pipeline = (message, token) => behavior.Handle(message, next, token); - } - - return pipeline(request, cancellationToken); - } - - private static void PrepareContext(object instance, IArchitectureContext context) - { - if (instance is IContextAware contextAware) - contextAware.SetContext(context); - } } diff --git a/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs b/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs index d3bf4041..2dc2503a 100644 --- a/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs +++ b/GFramework.Core.Tests/Mediator/MediatorAdvancedFeaturesTests.cs @@ -4,7 +4,6 @@ using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Architectures; using GFramework.Core.Ioc; using GFramework.Core.Logging; -using GFramework.Core.Tests; namespace GFramework.Core.Tests.Mediator; @@ -15,11 +14,16 @@ namespace GFramework.Core.Tests.Mediator; [TestFixture] public class MediatorAdvancedFeaturesTests { + private MicrosoftDiContainer? _container; + + private ArchitectureContext? _context; + [SetUp] public void SetUp() { LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); _container = new MicrosoftDiContainer(); + TestCircuitBreakerHandler.Reset(); var loggerField = typeof(MicrosoftDiContainer).GetField("_logger", BindingFlags.NonPublic | BindingFlags.Instance); @@ -42,9 +46,6 @@ public class MediatorAdvancedFeaturesTests _container = null; } - private ArchitectureContext? _context; - private MicrosoftDiContainer? _container; - [Test] public async Task Request_With_Validation_Behavior_Should_Validate_Input() @@ -135,9 +136,6 @@ public class MediatorAdvancedFeaturesTests [Test] public async Task Circuit_Breaker_Should_Prevent_Cascading_Failures() { - TestCircuitBreakerHandler.FailureCount = 0; - TestCircuitBreakerHandler.SuccessCount = 0; - // 先触发几次失败 for (int i = 0; i < 5; i++) { @@ -275,12 +273,10 @@ public sealed class TestTransientErrorRequestHandler : IRequestHandler { - private static bool _circuitOpen = false; - public ValueTask Handle(TestCircuitBreakerRequest request, CancellationToken cancellationToken) { // 检查断路器状态 - if (_circuitOpen) + if (TestCircuitBreakerHandler.CircuitOpen) { throw new InvalidOperationException("Circuit breaker is open"); } @@ -292,7 +288,7 @@ public sealed class TestCircuitBreakerRequestHandler : IRequestHandler= 5) { - _circuitOpen = true; + TestCircuitBreakerHandler.CircuitOpen = true; } throw new InvalidOperationException("Service unavailable"); @@ -451,6 +447,17 @@ public static class TestCircuitBreakerHandler { public static int FailureCount { get; set; } public static int SuccessCount { get; set; } + public static bool CircuitOpen { get; set; } + + /// + /// 重置断路器测试状态,避免静态字段在测试之间互相污染。 + /// + public static void Reset() + { + FailureCount = 0; + SuccessCount = 0; + CircuitOpen = false; + } } public sealed record TestCircuitBreakerRequest : IRequest diff --git a/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs b/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs index 056517f7..e176cce5 100644 --- a/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs +++ b/GFramework.Core.Tests/Mediator/MediatorArchitectureIntegrationTests.cs @@ -6,7 +6,7 @@ using GFramework.Core.Architectures; using GFramework.Core.Command; using GFramework.Core.Ioc; using GFramework.Core.Logging; -using GFramework.Core.Tests; +using GFramework.Core.Rule; using ICommand = GFramework.Core.Abstractions.Command.ICommand; namespace GFramework.Core.Tests.Mediator; @@ -18,11 +18,17 @@ namespace GFramework.Core.Tests.Mediator; [TestFixture] public class MediatorArchitectureIntegrationTests { + private CommandExecutor? _commandBus; + private MicrosoftDiContainer? _container; + + private ArchitectureContext? _context; + [SetUp] public void SetUp() { LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); _container = new MicrosoftDiContainer(); + TestPerDispatchContextAwareHandler.Reset(); var loggerField = typeof(MicrosoftDiContainer).GetField("_logger", BindingFlags.NonPublic | BindingFlags.Instance); @@ -50,10 +56,6 @@ public class MediatorArchitectureIntegrationTests _commandBus = null; } - private ArchitectureContext? _context; - private MicrosoftDiContainer? _container; - private CommandExecutor? _commandBus; - [Test] public async Task Handler_Can_Access_Architecture_Context() { @@ -291,6 +293,20 @@ public class MediatorArchitectureIntegrationTests Assert.That(traditionalCommand.Executed, Is.True); Assert.That(result, Is.EqualTo(42)); } + + [Test] + public async Task ContextAware_Handler_Should_Use_A_Fresh_Instance_Per_Request() + { + var firstResult = await _context!.SendRequestAsync(new TestPerDispatchContextAwareRequest()); + var secondResult = await _context.SendRequestAsync(new TestPerDispatchContextAwareRequest()); + + Assert.Multiple(() => + { + Assert.That(firstResult, Is.Not.EqualTo(secondResult)); + Assert.That(TestPerDispatchContextAwareHandler.SeenInstanceIds, Is.EqualTo([firstResult, secondResult])); + Assert.That(TestPerDispatchContextAwareHandler.Contexts, Has.All.SameAs(_context)); + }); + } } #region Integration Test Classes @@ -444,6 +460,42 @@ public sealed class TestMediatorRequestHandler : IRequestHandler +/// 用于验证自动扫描到的上下文感知处理器会按请求创建新实例。 +/// +public sealed class TestPerDispatchContextAwareHandler : ContextAwareBase, + IRequestHandler +{ + private static int _nextInstanceId; + private readonly int _instanceId = Interlocked.Increment(ref _nextInstanceId); + + public static List Contexts { get; } = []; + public static List SeenInstanceIds { get; } = []; + + /// + /// 记录当前实例编号与收到的架构上下文。 + /// + /// 请求实例。 + /// 取消令牌。 + /// 当前处理器实例编号。 + public ValueTask Handle(TestPerDispatchContextAwareRequest request, CancellationToken cancellationToken) + { + Contexts.Add(Context); + SeenInstanceIds.Add(_instanceId); + return ValueTask.FromResult(_instanceId); + } + + /// + /// 重置跨测试共享的实例跟踪状态。 + /// + public static void Reset() + { + Contexts.Clear(); + SeenInstanceIds.Clear(); + _nextInstanceId = 0; + } +} + public sealed record TestContextAwareRequest : IRequest; public static class TestContextAwareHandler @@ -544,6 +596,11 @@ public sealed record TestMediatorRequest : IRequest public int Value { get; init; } } +/// +/// 用于验证每次请求分发都会获得新的上下文感知处理器实例。 +/// +public sealed record TestPerDispatchContextAwareRequest : IRequest; + // 传统命令用于混合测试 public class TestTraditionalCommand : ICommand { diff --git a/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs b/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs index 27522c16..27dfed5c 100644 --- a/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs +++ b/GFramework.Core.Tests/Mediator/MediatorComprehensiveTests.cs @@ -11,7 +11,6 @@ using GFramework.Core.Events; using GFramework.Core.Ioc; using GFramework.Core.Logging; using GFramework.Core.Query; -using GFramework.Core.Tests; using ICommand = GFramework.Core.Abstractions.Command.ICommand; using Unit = GFramework.Core.Abstractions.Cqrs.Unit; @@ -20,6 +19,15 @@ namespace GFramework.Core.Tests.Mediator; [TestFixture] public class MediatorComprehensiveTests { + private AsyncQueryExecutor? _asyncQueryBus; + private CommandExecutor? _commandBus; + private MicrosoftDiContainer? _container; + + private ArchitectureContext? _context; + private DefaultEnvironment? _environment; + private EventBus? _eventBus; + private QueryExecutor? _queryBus; + /// /// 测试初始化方法,在每个测试方法执行前运行。 /// 负责初始化日志工厂、依赖注入容器、自有 CQRS 处理器以及各种总线服务。 @@ -74,14 +82,6 @@ public class MediatorComprehensiveTests _environment = null; } - private ArchitectureContext? _context; - private MicrosoftDiContainer? _container; - private EventBus? _eventBus; - private CommandExecutor? _commandBus; - private QueryExecutor? _queryBus; - private AsyncQueryExecutor? _asyncQueryBus; - private DefaultEnvironment? _environment; - /// /// 测试SendRequestAsync方法在请求有效时返回结果 /// @@ -418,7 +418,7 @@ public class MediatorComprehensiveTests _context!.SendCommand(legacyCommand); Assert.That(legacyCommand.Executed, Is.True); - // 使用Mediator方式 + // 使用自有 CQRS 方式 var mediatorCommand = new TestCommandWithResult { ResultValue = 999 }; var result = await _context.SendAsync(mediatorCommand); Assert.That(result, Is.EqualTo(999)); @@ -429,7 +429,7 @@ public class MediatorComprehensiveTests } } -#region Advanced Test Classes for Mediator Features +#region Advanced Test Classes for CQRS Features public sealed record TestLongRunningRequest : IRequest { @@ -623,9 +623,9 @@ public class TestLegacyCommand : ICommand #endregion -#region Test Classes - Mediator (新实现) +#region Test Classes - CQRS Runtime -// ✅ 这些类使用 Mediator.IRequest +// ✅ 这些类使用自有 CQRS IRequest public sealed record TestRequest : IRequest { public int Value { get; init; } @@ -657,7 +657,7 @@ public sealed record TestStreamRequest : IStreamRequest public int[] Values { get; init; } = []; } -// ✅ 这些 Handler 使用 Mediator.IRequestHandler +// ✅ 这些 Handler 使用自有 CQRS IRequestHandler public sealed class TestRequestHandler : IRequestHandler { public ValueTask Handle(TestRequest request, CancellationToken cancellationToken) diff --git a/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs b/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs index 0ffc7424..528de106 100644 --- a/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs +++ b/GFramework.Core/Cqrs/Command/AbstractCommandHandler.cs @@ -11,15 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -using GFramework.Core.Rule; using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Cqrs.Command; +using GFramework.Core.Rule; namespace GFramework.Core.Cqrs.Command; /// /// 抽象命令处理器基类 -/// 继承自ContextAwareBase并实现ICommandHandler接口,为具体的命令处理器提供基础功能 +/// 继承自 ContextAwareBase 并实现 IRequestHandler 接口,为具体的命令处理器提供基础功能。 +/// 框架会在每次分发前注入当前架构上下文,因此派生类可以通过 Context 访问架构级服务。 /// /// 命令类型 public abstract class AbstractCommandHandler : ContextAwareBase, IRequestHandler @@ -37,8 +38,8 @@ public abstract class AbstractCommandHandler : ContextAwareBase, IRequ /// /// 抽象命令处理器基类(带返回值版本) -/// 继承自ContextAwareBase并实现ICommandHandler接口,为具体的命令处理器提供基础功能 -/// 支持泛型命令和结果类型,实现CQRS模式中的命令处理 +/// 继承自 ContextAwareBase 并实现 IRequestHandler 接口,为具体的命令处理器提供基础功能。 +/// 支持泛型命令和结果类型,框架会在每次分发前注入当前架构上下文。 /// /// 命令类型,必须实现ICommand接口 /// 命令执行结果类型 diff --git a/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs b/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs index 0f86f36c..84e8e029 100644 --- a/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs +++ b/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs @@ -11,16 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -using GFramework.Core.Rule; using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Cqrs.Command; +using GFramework.Core.Rule; namespace GFramework.Core.Cqrs.Command; /// /// 抽象流式命令处理器基类 -/// 继承自ContextAwareBase并实现IStreamCommandHandler接口,为具体的流式命令处理器提供基础功能 -/// 支持流式处理命令并产生异步可枚举的响应序列 +/// 继承自 ContextAwareBase 并实现 IStreamRequestHandler 接口,为具体的流式命令处理器提供基础功能。 +/// 支持流式处理命令并产生异步可枚举的响应序列,框架会在每次创建流前注入当前架构上下文。 /// /// 流式命令类型,必须实现IStreamCommand接口 /// 流式命令响应元素类型 diff --git a/GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs b/GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs index 9f69bc33..136b76a7 100644 --- a/GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs +++ b/GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs @@ -2,7 +2,6 @@ using System.Reflection; using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Logging; -using Microsoft.Extensions.DependencyInjection; namespace GFramework.Core.Cqrs.Internal; @@ -27,7 +26,10 @@ internal static class CqrsHandlerRegistrar ArgumentNullException.ThrowIfNull(assemblies); ArgumentNullException.ThrowIfNull(logger); - foreach (var assembly in assemblies.Distinct()) + foreach (var assembly in assemblies + .Where(static assembly => assembly is not null) + .Distinct() + .OrderBy(GetAssemblySortKey, StringComparer.Ordinal)) { RegisterAssemblyHandlers(container.GetServicesUnsafe, assembly, logger); } @@ -38,11 +40,12 @@ internal static class CqrsHandlerRegistrar /// private static void RegisterAssemblyHandlers(IServiceCollection services, Assembly assembly, ILogger logger) { - foreach (var implementationType in assembly.GetTypes().Where(IsConcreteHandlerType)) + foreach (var implementationType in GetLoadableTypes(assembly, logger).Where(IsConcreteHandlerType)) { var handlerInterfaces = implementationType .GetInterfaces() .Where(IsSupportedHandlerInterface) + .OrderBy(GetTypeSortKey, StringComparer.Ordinal) .ToList(); if (handlerInterfaces.Count == 0) @@ -50,13 +53,58 @@ internal static class CqrsHandlerRegistrar foreach (var handlerInterface in handlerInterfaces) { - services.AddSingleton(handlerInterface, implementationType); + // Request/notification handlers receive context injection before every dispatch. + // Transient registration avoids sharing mutable Context across concurrent requests. + services.AddTransient(handlerInterface, implementationType); logger.Debug( $"Registered CQRS handler {implementationType.FullName} as {handlerInterface.FullName}."); } } } + /// + /// 安全获取程序集中的可加载类型,并在部分类型加载失败时保留其余处理器注册能力。 + /// + private static IReadOnlyList GetLoadableTypes(Assembly assembly, ILogger logger) + { + try + { + return assembly.GetTypes() + .Where(static type => type is not null) + .OrderBy(GetTypeSortKey, StringComparer.Ordinal) + .ToList(); + } + catch (ReflectionTypeLoadException exception) + { + return RecoverLoadableTypes(assembly, exception, logger); + } + } + + /// + /// 记录部分类型加载失败,并返回仍然可用的类型集合。 + /// + private static IReadOnlyList RecoverLoadableTypes( + Assembly assembly, + ReflectionTypeLoadException exception, + ILogger logger) + { + var assemblyName = GetAssemblySortKey(assembly); + logger.Warn( + $"CQRS handler scan partially failed for assembly {assemblyName}. Continuing with loadable types."); + + foreach (var loaderException in exception.LoaderExceptions.Where(static ex => ex is not null)) + { + logger.Warn( + $"Failed to load one or more types while scanning {assemblyName}: {loaderException!.Message}"); + } + + return exception.Types + .Where(static type => type is not null) + .Cast() + .OrderBy(GetTypeSortKey, StringComparer.Ordinal) + .ToList(); + } + /// /// 判断指定类型是否可作为可实例化处理器。 /// @@ -78,4 +126,20 @@ internal static class CqrsHandlerRegistrar definition == typeof(INotificationHandler<>) || definition == typeof(IStreamRequestHandler<,>); } + + /// + /// 生成程序集排序键,保证跨运行环境的处理器注册顺序稳定。 + /// + private static string GetAssemblySortKey(Assembly assembly) + { + return assembly.FullName ?? assembly.GetName().Name ?? assembly.ToString(); + } + + /// + /// 生成类型排序键,保证同一程序集内的处理器与接口映射顺序稳定。 + /// + private static string GetTypeSortKey(Type type) + { + return type.FullName ?? type.Name; + } } diff --git a/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs b/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs index f861312c..e9a3795f 100644 --- a/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs +++ b/GFramework.Core/Cqrs/Query/AbstractQueryHandler.cs @@ -11,16 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -using GFramework.Core.Rule; using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Cqrs.Query; +using GFramework.Core.Rule; namespace GFramework.Core.Cqrs.Query; /// /// 抽象查询处理器基类 -/// 继承自ContextAwareBase并实现IQueryHandler接口,为具体的查询处理器提供基础功能 -/// 支持泛型查询和结果类型,实现CQRS模式中的查询处理 +/// 继承自 ContextAwareBase 并实现 IRequestHandler 接口,为具体的查询处理器提供基础功能。 +/// 框架会在每次分发前注入当前架构上下文,因此派生类可以通过 Context 访问架构级服务。 /// /// 查询类型,必须实现IQuery接口 /// 查询结果类型 diff --git a/GFramework.Core/Cqrs/Query/QueryBase.cs b/GFramework.Core/Cqrs/Query/QueryBase.cs index cb0d4782..6bccf549 100644 --- a/GFramework.Core/Cqrs/Query/QueryBase.cs +++ b/GFramework.Core/Cqrs/Query/QueryBase.cs @@ -17,7 +17,7 @@ namespace GFramework.Core.Cqrs.Query; /// /// 表示一个基础查询类,用于处理带有输入和响应的查询模式实现。 -/// 该类继承自 Mediator.IQuery<TResponse> 接口,提供了通用的查询结构。 +/// 该类实现 IQuery<TResponse> 接口,提供了通用的查询结构。 /// /// 查询输入数据的类型,必须实现 IQueryInput 接口 /// 查询执行后返回结果的类型 diff --git a/local-plan/todos/cqrs-rewrite-migration-tracking.md b/local-plan/todos/cqrs-rewrite-migration-tracking.md deleted file mode 100644 index 55317f7c..00000000 --- a/local-plan/todos/cqrs-rewrite-migration-tracking.md +++ /dev/null @@ -1,109 +0,0 @@ -# CQRS 重写迁移跟踪 - -## 目标 - -围绕 `GFramework` 当前的双轨 CQRS 现状,完成一轮以“去 Mediator 外部依赖”为目标的架构迁移: - -- 将 `Mediator` 从 GFramework 公共 API 和运行时主路径中移除 -- 基于 GFramework 自有抽象重建正式 CQRS runtime、行为管道和注册机制 -- 保留 `EventBus` 作为框架级事件系统,不与 CQRS notification 混同 -- 让 `CoreGrid-Migration` 直连本地 `GFramework`,作为真实迁移验证工程 -- 为复杂迁移建立明确恢复点与进度追踪,避免上下文过长或中断后失去状态 - -## 当前恢复点 - -- 恢复点编号:`CQRS-REWRITE-RP-002` -- 当前阶段:`Phase 4` -- 当前焦点: - - 清理剩余 `Mediator` 包依赖与文档残留 - - 评估是否继续把协程扩展和测试项目中的 `Mediator.Abstractions` 完全移除 - - 规划第二阶段优化:代码生成注册、性能收敛、行为 API 命名统一 - -## 本轮计划 - -### Phase 0:工作流基础 - -- [x] 在 `local-plan/todos/` 建立本任务跟踪文档 -- [x] 在 `local-plan/traces/` 建立本任务追踪文档 -- [x] 将恢复点 / trace / subagent 协作规范写入 `AGENTS.md` - -### Phase 1:本地验证链路 - -- [x] 确认 `CoreGrid-Migration` 当前引用形态 -- [x] 将 `CoreGrid-Migration` 从 NuGet 包切到本地 `GFramework` 工程引用 -- [x] 让 `CoreGrid-Migration` 使用本地 Source Generator 而不是外部已发布版本 -- [x] 验证本地引用链路至少能完成 restore / build - -### Phase 2:CQRS 基础重建 - -- [x] 在 `GFramework.Core.Abstractions` 定义自有 CQRS 契约 -- [x] 在 `GFramework.Core` 落地 dispatcher / handler registry / behavior pipeline -- [x] 清理 `IArchitectureContext` 中对 `Mediator.*` 的公共签名依赖 -- [x] 设计 CQRS 模块启用方式,替代 `Configurator => AddMediator(...)` - -### Phase 3:接入迁移 - -- [x] 迁移 `GFramework.Core.Cqrs.*` 基类到新契约 -- [x] 迁移 `ContextAwareMediator*Extensions` 与协程扩展 -- [x] 迁移 `CoreGrid-Migration/scripts/cqrs/**` 到新契约 -- [x] 删除 `GameArchitecture.Configurator` 中的 `AddMediator(...)` - -### Phase 4:收尾 - -- [ ] 移除 `Mediator` 包依赖与相关测试/文档残留 -- [x] 运行目标构建与测试 -- [x] 记录剩余风险与下一恢复点 - -## 当前完成结果 - -- `CoreGrid-Migration` 已直连本地 `GFramework` 源码与本地 source generators。 -- `GameArchitecture` 已不再依赖 `collection.AddMediator(...)` 即可使用 CQRS。 -- `GFramework.Core.Abstractions` 新增自有 CQRS 契约: - - `IRequest` / `INotification` / `IStreamRequest` - - `IRequestHandler<,>` / `INotificationHandler<>` / `IStreamRequestHandler<,>` - - `Unit` - - `IPipelineBehavior<,>` / `MessageHandlerDelegate<,>` -- `ArchitectureBootstrapper` 会在初始化阶段自动扫描并注册当前架构程序集与 `GFramework.Core` 程序集中的 CQRS handlers。 -- `CqrsDispatcher` 已支持: - - request dispatch - - notification publish - - stream dispatch - - context-aware handler 注入 - - request pipeline behavior 链式执行 -- `GFramework.Core.Tests` 中原依赖 `Mediator` 注册路径的测试已切换到框架内建 CQRS 注册路径。 -- 当前验证状态: - - `dotnet build GFramework/GFramework.sln` 通过 - - `dotnet test GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj --no-build` 通过,`1621` 个测试全部通过 - - `dotnet build CoreGrid-Migration/CoreGrid.sln` 通过 - -## 当前已知事实 - -- `GFramework` 当前仍同时维护: - - 基于 `CommandExecutor` / `QueryExecutor` / `EventBus` 的轻量旧 CQRS - - 基于 GFramework 自有抽象的新 CQRS runtime -- 仍存在 `Mediator` 残留的区域主要集中在: - - 文档中的历史说明 - - `MediatorCoroutineExtensions` 及对应测试 - - 测试项目对 `Mediator.Abstractions` 的少量残余依赖 -- `CoreGrid-Migration` 已切到本地源码引用,并在当前恢复点完成构建验证 - -## 当前风险 - -- `GFramework` 仓库存在与本任务无关的既有改动,提交时必须避免覆盖 -- `CoreGrid-Migration` 是 worktree,WSL 下原生 `git` 解析该 worktree 路径有兼容问题 -- 当前 `RegisterMediatorBehavior` 命名仍保留历史前缀,但底层已切换为框架自有 CQRS pipeline;若后续要彻底脱媒介命名,需要一次 API 命名迁移 -- 当前 handler 自动注册基于运行时反射扫描;若后续追求冷启动与 AOT 友好性,需要补 source-generator 注册路径 - -## 下次恢复建议 - -若本轮中断,优先从以下顺序恢复: - -1. 查看 `local-plan/traces/cqrs-rewrite-migration-trace.md` -2. 确认当前恢复点 `CQRS-REWRITE-RP-002` 已对应到最新提交 -3. 优先决定是否继续移除 `Mediator.Abstractions` 包与 `MediatorCoroutineExtensions` 历史兼容层 -4. 若继续演进,再处理 CQRS 注册的生成器化与 API 命名统一 - -## 备注 - -- 本文档是当前任务的主恢复点,后续每个关键阶段完成后都要更新 -- 发生方向调整时,不覆盖旧结论,直接追加阶段记录与新的恢复点编号 diff --git a/local-plan/traces/cqrs-rewrite-migration-trace.md b/local-plan/traces/cqrs-rewrite-migration-trace.md deleted file mode 100644 index 1df972fe..00000000 --- a/local-plan/traces/cqrs-rewrite-migration-trace.md +++ /dev/null @@ -1,68 +0,0 @@ -# CQRS 重写迁移追踪 - -## 2026-04-14 - -### 阶段:初始化 - -- 建立 `CQRS-REWRITE-RP-001` 恢复点 -- 已确认本次迁移目标: - - 彻底参考 `Mediator` 思路重写 GFramework 正式 CQRS - - 不保留对 `Mediator` 的兼容层 - - 使用 `abstractions + runtime 可选模块` 边界 - - 保留 `EventBus`,不与 CQRS notification 合并 - -### 已确认的实现前提 - -- `CoreGrid-Migration` 当前仍依赖 NuGet 版 `GeWuYou.GFramework*` -- `CoreGrid/scripts/core/GameArchitecture.cs` 与 `CoreGrid-Migration/scripts/core/GameArchitecture.cs` 通过 `AddMediator(...)` 启用基于生成器的 runtime -- `GFramework` 当前 `IArchitectureContext` 与一批 CQRS 基类直接引用 `Mediator.*` -- `CoreGrid/scripts/cqrs/**` 的 handler 很薄,主要迁移成本在框架 runtime 和注册机制,不在业务逻辑本身 - -### 当前动作 - -- 准备更新 `AGENTS.md`,补充恢复点 / trace / subagent 协作规范 -- 准备将 `CoreGrid-Migration` 切换为本地项目引用,建立真实验证链路 - -### 下一步 - -1. 完成 `AGENTS.md` 规则补充 -2. 改造 `CoreGrid-Migration/CoreGrid.csproj` 为本地项目与本地生成器引用 -3. 进行第一次构建验证,确认本地链路可用 - -### 阶段:CQRS 主路径迁移完成 - -- `CoreGrid-Migration/CoreGrid.csproj` 已切到本地 `ProjectReference` + 本地 source generators -- `CoreGrid-Migration/scripts/core/GameArchitecture.cs` 已删除 `AddMediator(...)` 配置钩子 -- `GFramework.Core.Abstractions` 新增 GFramework 自有 CQRS 契约与 `Unit` -- `IArchitectureContext` / `ArchitectureContext` 已切到自有 CQRS 签名 -- `ArchitectureBootstrapper` 已内建 handler 扫描注册,使用方无需再显式调用 `AddMediator(...)` -- `CqrsDispatcher` 已补齐 request/notification/stream dispatch 与 pipeline behavior 执行 -- `GFramework.Core.Cqrs.*` 基类、`ContextAwareMediator*Extensions`、Godot 协程上下文扩展均已迁到新契约 -- `GFramework.Core.Tests` 中原依赖旧 `Mediator` 注册入口的测试已迁移到 `CqrsTestRuntime` 反射注册路径 - -### 阶段:验证 - -- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core/GFramework.Core.csproj` - - 结果:通过 -- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj` - - 结果:通过 -- `dotnet test /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj --no-build` - - 结果:通过 - - 明细:`1621` 个测试全部通过 -- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.sln` - - 结果:通过 -- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/CoreGrid-Migration/CoreGrid.sln` - - 结果:通过 - - 备注:仅存在既有 analyzer warnings,无新增构建错误 - -### 当前残留 - -- 文档与少量历史 API 命名仍保留 `Mediator` 前缀 -- `MediatorCoroutineExtensions` 与少量测试仍依赖 `Mediator.Abstractions` -- handler 自动注册当前使用运行时反射扫描,尚未切回生成器注册 - -### 下一步建议 - -1. 决定是否继续做“完全移除 `Mediator.Abstractions` 包”的第二阶段清理 -2. 若继续,优先迁移协程扩展与相关测试 -3. 评估是否将 `RegisterMediatorBehavior`、`ContextAwareMediator*Extensions` 等历史命名升级为 CQRS 中性命名 From f8fa2a848116d3ebad1686628ea64d992ab3d3ca Mon Sep 17 00:00:00 2001 From: GeWuYou <95328647+GeWuYou@users.noreply.github.com> Date: Tue, 14 Apr 2026 22:05:20 +0800 Subject: [PATCH 4/4] =?UTF-8?q?feat(cqrs):=20=E6=B7=BB=E5=8A=A0=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=E5=91=BD=E4=BB=A4=E5=A4=84=E7=90=86=E5=99=A8=E5=92=8C?= =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=B3=A8=E5=86=8C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现 AbstractStreamCommandHandler 基类支持流式命令处理 - 创建 CqrsHandlerRegistrar 自动扫描注册 CQRS 处理器 - 添加流式处理器接口 IStreamRequestHandler 支持 - 实现处理器注册的容错机制和类型加载恢复 - 添加确定性排序确保跨环境稳定的处理器注册顺序 - 提供完整的单元测试验证注册行为和异常处理 --- .../Cqrs/CqrsHandlerRegistrarTests.cs | 114 +++++++++++++----- GFramework.Core.Tests/CqrsTestRuntime.cs | 37 +++++- .../Command/AbstractStreamCommandHandler.cs | 28 +++-- 3 files changed, 134 insertions(+), 45 deletions(-) diff --git a/GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs b/GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs index 43a6272a..28b1fb32 100644 --- a/GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs +++ b/GFramework.Core.Tests/Cqrs/CqrsHandlerRegistrarTests.cs @@ -14,18 +14,7 @@ namespace GFramework.Core.Tests.Cqrs; [TestFixture] internal sealed class CqrsHandlerRegistrarTests { - private static readonly MethodInfo RecoverLoadableTypesMethod = typeof(ArchitectureContext).Assembly - .GetType( - "GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", - throwOnError: true)! - .GetMethod("RecoverLoadableTypes", - BindingFlags.NonPublic | - BindingFlags.Static)! - ?? throw new InvalidOperationException( - "Failed to locate CqrsHandlerRegistrar.RecoverLoadableTypes."); - private MicrosoftDiContainer? _container; - private ArchitectureContext? _context; /// @@ -40,8 +29,7 @@ internal sealed class CqrsHandlerRegistrarTests _container = new MicrosoftDiContainer(); CqrsTestRuntime.RegisterHandlers( _container, - typeof(CqrsHandlerRegistrarTests).Assembly, - typeof(ArchitectureContext).Assembly); + typeof(CqrsHandlerRegistrarTests).Assembly); _container.Freeze(); _context = new ArchitectureContext(_container); @@ -79,28 +67,53 @@ internal sealed class CqrsHandlerRegistrarTests /// 验证部分类型加载失败时仍能保留可加载类型,并记录诊断日志。 /// [Test] - public void RecoverLoadableTypes_Should_Return_Loadable_Types_And_Log_Warnings() + public void RegisterHandlers_Should_Register_Loadable_Types_And_Log_Warnings_When_Assembly_Load_Partially_Fails() { - var logger = new TestLogger(nameof(CqrsHandlerRegistrarTests), LogLevel.Warning); + var originalProvider = LoggerFactoryResolver.Provider; + var capturingProvider = new CapturingLoggerFactoryProvider(LogLevel.Warning); var reflectionTypeLoadException = new ReflectionTypeLoadException( [typeof(AlphaDeterministicNotificationHandler), null], [new TypeLoadException("Missing optional dependency for registrar test.")]); + var partiallyLoadableAssembly = new Mock(); + partiallyLoadableAssembly + .SetupGet(static assembly => assembly.FullName) + .Returns("GFramework.Core.Tests.Cqrs.PartiallyLoadableAssembly, Version=1.0.0.0"); + partiallyLoadableAssembly + .Setup(static assembly => assembly.GetTypes()) + .Throws(reflectionTypeLoadException); - var recoveredTypes = (IReadOnlyList)RecoverLoadableTypesMethod.Invoke( - null, - [typeof(CqrsHandlerRegistrarTests).Assembly, reflectionTypeLoadException, logger])!; - - Assert.Multiple(() => + LoggerFactoryResolver.Provider = capturingProvider; + try { - Assert.That(recoveredTypes, Is.EqualTo([typeof(AlphaDeterministicNotificationHandler)])); - Assert.That(logger.Logs.Count(log => log.Level == LogLevel.Warning), Is.GreaterThanOrEqualTo(2)); - Assert.That( - logger.Logs.Any(log => log.Message.Contains("partially failed", StringComparison.Ordinal)), - Is.True); - Assert.That( - logger.Logs.Any(log => log.Message.Contains("Missing optional dependency", StringComparison.Ordinal)), - Is.True); - }); + var container = new MicrosoftDiContainer(); + CqrsTestRuntime.RegisterHandlers(container, partiallyLoadableAssembly.Object); + container.Freeze(); + + var handlers = container.GetAll>(); + var warningLogs = capturingProvider.Loggers + .SelectMany(static logger => logger.Logs) + .Where(static log => log.Level == LogLevel.Warning) + .ToList(); + + Assert.Multiple(() => + { + Assert.That( + handlers.Select(static handler => handler.GetType()), + Is.EqualTo([typeof(AlphaDeterministicNotificationHandler)])); + Assert.That(warningLogs.Count, Is.GreaterThanOrEqualTo(2)); + Assert.That( + warningLogs.Any(log => log.Message.Contains("partially failed", StringComparison.Ordinal)), + Is.True); + Assert.That( + warningLogs.Any(log => + log.Message.Contains("Missing optional dependency", StringComparison.Ordinal)), + Is.True); + }); + } + finally + { + LoggerFactoryResolver.Provider = originalProvider; + } } } @@ -163,3 +176,46 @@ internal sealed class AlphaDeterministicNotificationHandler : INotificationHandl return ValueTask.CompletedTask; } } + +/// +/// 为 CQRS 注册测试捕获真实启动路径中创建的日志记录器。 +/// +/// +/// 处理器注册入口会分别为测试运行时、容器和注册器创建日志器。 +/// 该提供程序统一保留这些测试日志器,以便断言警告是否经由公开入口真正发出。 +/// +internal sealed class CapturingLoggerFactoryProvider : ILoggerFactoryProvider +{ + private readonly List _loggers = []; + + /// + /// 使用指定的最小日志级别初始化一个新的捕获型日志工厂提供程序。 + /// + /// 要应用到新建测试日志器的最小日志级别。 + public CapturingLoggerFactoryProvider(LogLevel minLevel = LogLevel.Info) + { + MinLevel = minLevel; + } + + /// + /// 获取通过当前提供程序创建的全部测试日志器。 + /// + public IReadOnlyList Loggers => _loggers; + + /// + /// 获取或设置新建测试日志器的最小日志级别。 + /// + public LogLevel MinLevel { get; set; } + + /// + /// 创建一个测试日志器并将其纳入捕获集合。 + /// + /// 日志记录器名称。 + /// 用于后续断言的测试日志器。 + public ILogger CreateLogger(string name) + { + var logger = new TestLogger(name, MinLevel); + _loggers.Add(logger); + return logger; + } +} diff --git a/GFramework.Core.Tests/CqrsTestRuntime.cs b/GFramework.Core.Tests/CqrsTestRuntime.cs index e2801cc8..e9664925 100644 --- a/GFramework.Core.Tests/CqrsTestRuntime.cs +++ b/GFramework.Core.Tests/CqrsTestRuntime.cs @@ -1,24 +1,49 @@ using System.Reflection; +using GFramework.Core.Abstractions.Ioc; +using GFramework.Core.Abstractions.Logging; using GFramework.Core.Architectures; using GFramework.Core.Ioc; using GFramework.Core.Logging; namespace GFramework.Core.Tests; +/// +/// 为测试项目提供对 CQRS 处理器真实注册入口的受控访问。 +/// +/// +/// 测试应通过该入口驱动注册流程,而不是直接反射调用注册器的私有辅助方法, +/// 这样可以覆盖生产启动路径中的程序集去重、日志记录与容错恢复行为。 +/// internal static class CqrsTestRuntime { - private static readonly MethodInfo RegisterHandlersMethod = typeof(ArchitectureContext).Assembly - .GetType( - "GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", - throwOnError: true)! + private static readonly Type CqrsHandlerRegistrarType = typeof(ArchitectureContext).Assembly + .GetType( + "GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", + throwOnError: true)! + ?? throw new InvalidOperationException( + "Failed to locate CqrsHandlerRegistrar type."); + + private static readonly MethodInfo RegisterHandlersMethod = CqrsHandlerRegistrarType .GetMethod( "RegisterHandlers", BindingFlags.Public | BindingFlags.NonPublic | - BindingFlags.Static)! + BindingFlags.Static, + binder: null, + [ + typeof(IIocContainer), + typeof(IEnumerable), + typeof(ILogger) + ], + modifiers: null) ?? throw new InvalidOperationException( "Failed to locate CqrsHandlerRegistrar.RegisterHandlers."); - public static void RegisterHandlers(MicrosoftDiContainer container, params Assembly[] assemblies) + /// + /// 通过与生产代码一致的注册入口扫描并注册指定程序集中的 CQRS 处理器。 + /// + /// 承载处理器映射的测试容器。 + /// 要扫描的程序集集合。 + internal static void RegisterHandlers(MicrosoftDiContainer container, params Assembly[] assemblies) { ArgumentNullException.ThrowIfNull(container); ArgumentNullException.ThrowIfNull(assemblies); diff --git a/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs b/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs index 84e8e029..847563c4 100644 --- a/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs +++ b/GFramework.Core/Cqrs/Command/AbstractStreamCommandHandler.cs @@ -18,22 +18,30 @@ using GFramework.Core.Rule; namespace GFramework.Core.Cqrs.Command; /// -/// 抽象流式命令处理器基类 -/// 继承自 ContextAwareBase 并实现 IStreamRequestHandler 接口,为具体的流式命令处理器提供基础功能。 -/// 支持流式处理命令并产生异步可枚举的响应序列,框架会在每次创建流前注入当前架构上下文。 +/// 抽象流式命令处理器基类。 +/// 继承自 并实现 , +/// 为具体的流式命令处理器提供基础功能。 /// -/// 流式命令类型,必须实现IStreamCommand接口 -/// 流式命令响应元素类型 +/// 流式命令类型,必须实现 +/// 流式命令响应元素类型。 +/// +/// 框架会在每次调用 CreateStream 进入实际处理逻辑前,为当前处理器实例注入架构上下文, +/// 因此派生类只能在 执行期间及其返回的异步枚举序列内假定 Context 可用。 +/// 默认注册器会将流式命令处理器注册为瞬态服务,以避免同一个上下文感知实例在多个流或并发请求之间复用。 +/// 派生类不应缓存处理器实例,也不应把依赖当前上下文的可变状态泄漏到流外部。 +/// 传入 的取消令牌同时约束流的创建与后续枚举, +/// 派生类应在启动阶段和每次生成响应前尊重取消请求,避免在调用方停止枚举后继续执行后台工作。 +/// public abstract class AbstractStreamCommandHandler : ContextAwareBase, IStreamRequestHandler where TCommand : IStreamCommand { /// - /// 处理流式命令并返回异步可枚举的响应序列 - /// 由具体的流式命令处理器子类实现流式处理逻辑 + /// 处理流式命令并返回异步可枚举的响应序列。 + /// 由具体的流式命令处理器子类实现流式处理逻辑。 /// - /// 要处理的流式命令对象 - /// 取消令牌,用于取消流式处理操作 - /// 异步可枚举的响应序列,每个元素类型为TResponse + /// 要处理的流式命令对象。 + /// 取消令牌,用于取消流式处理操作。 + /// 异步可枚举的响应序列,每个元素类型为 public abstract IAsyncEnumerable Handle(TCommand command, CancellationToken cancellationToken); }