diff --git a/GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs b/GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs new file mode 100644 index 00000000..7657754a --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs @@ -0,0 +1,143 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using GFramework.Core.Architectures; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Extensions; +using GFramework.Cqrs.Notification; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 验证 notification publisher 组合根注册扩展的关键行为。 +/// +[TestFixture] +internal sealed class NotificationPublisherRegistrationExtensionsTests +{ + /// + /// 验证显式注册内置 后, + /// 标准 runtime 基础设施会复用该策略并继续调度所有处理器。 + /// + [Test] + public async Task UseTaskWhenAllNotificationPublisher_Should_Be_Used_By_Default_Runtime_Infrastructure() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); + + var trailingHandler = new RecordingNotificationHandler(); + var container = new MicrosoftDiContainer(); + container.UseTaskWhenAllNotificationPublisher(); + container.Register>(new ThrowingNotificationHandler()); + container.Register>(trailingHandler); + CqrsTestRuntime.RegisterInfrastructure(container); + container.Freeze(); + + var context = new ArchitectureContext(container); + var publishTask = context.PublishAsync(new TestNotification()).AsTask(); + + try + { + await publishTask.ConfigureAwait(false); + } + catch (Exception) + { + // `TaskWhenAll` 策略会在所有处理器都结束后聚合失败;这里仅消费异常并继续断言第二个处理器已执行。 + } + + Assert.That(trailingHandler.WasInvoked, Is.True); + Assert.That(publishTask.Exception, Is.Not.Null); + } + + /// + /// 验证显式传入实例的组合根注册入口会把同一个 publisher 实例绑定到容器。 + /// + [Test] + public void UseNotificationPublisher_Instance_Overload_Should_Register_Same_Instance() + { + var container = new MicrosoftDiContainer(); + var publisher = new TrackingNotificationPublisher(); + + var returnedContainer = container.UseNotificationPublisher(publisher); + + Assert.That(returnedContainer, Is.SameAs(container)); + Assert.That(container.Get(), Is.SameAs(publisher)); + } + + /// + /// 验证组合根扩展会阻止重复 notification publisher 注册,避免 runtime 创建阶段才暴露歧义。 + /// + [Test] + public void UseNotificationPublisher_Should_Throw_When_NotificationPublisher_Already_Registered() + { + var container = new MicrosoftDiContainer(); + container.UseTaskWhenAllNotificationPublisher(); + + Assert.That( + () => container.UseNotificationPublisher(new TrackingNotificationPublisher()), + Throws.InvalidOperationException.With.Message.Contains(nameof(INotificationPublisher))); + } + + /// + /// 为本组测试提供最小 notification 类型。 + /// + private sealed record TestNotification : INotification; + + /// + /// 记录自己是否被执行的测试处理器。 + /// + private sealed class RecordingNotificationHandler : INotificationHandler + { + /// + /// 获取当前处理器是否至少执行过一次。 + /// + public bool WasInvoked { get; private set; } + + /// + /// 记录执行痕迹并立刻完成。 + /// + public ValueTask Handle(TestNotification notification, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(notification); + cancellationToken.ThrowIfCancellationRequested(); + WasInvoked = true; + return ValueTask.CompletedTask; + } + } + + /// + /// 始终抛出异常的测试处理器,用于验证并行策略不会因为首个失败而停止其余处理器。 + /// + private sealed class ThrowingNotificationHandler : INotificationHandler + { + /// + /// 始终抛出测试异常。 + /// + public ValueTask Handle(TestNotification notification, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(notification); + cancellationToken.ThrowIfCancellationRequested(); + throw new InvalidOperationException("boom"); + } + } + + /// + /// 用于验证实例注册重载是否保留原对象身份的测试发布器。 + /// + private sealed class TrackingNotificationPublisher : INotificationPublisher + { + /// + /// 直接完成当前 publish 调用。 + /// + public ValueTask PublishAsync( + NotificationPublishContext context, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + ArgumentNullException.ThrowIfNull(context); + cancellationToken.ThrowIfCancellationRequested(); + return ValueTask.CompletedTask; + } + } +} diff --git a/GFramework.Cqrs/Extensions/NotificationPublisherRegistrationExtensions.cs b/GFramework.Cqrs/Extensions/NotificationPublisherRegistrationExtensions.cs new file mode 100644 index 00000000..034e878b --- /dev/null +++ b/GFramework.Cqrs/Extensions/NotificationPublisherRegistrationExtensions.cs @@ -0,0 +1,95 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using GFramework.Core.Abstractions.Ioc; +using GFramework.Cqrs.Notification; + +namespace GFramework.Cqrs.Extensions; + +/// +/// 为 CQRS runtime 提供 notification publisher 策略的组合根注册入口。 +/// +/// +/// 默认 runtime 只会消费一个 实例,因此该扩展类把“选择哪种策略”显式收敛到容器配置阶段。 +/// 这些入口应在 runtime 创建前调用;对于走标准 GFramework.Core 启动路径的架构,它们会被 CqrsRuntimeModule 自动复用。 +/// +public static class NotificationPublisherRegistrationExtensions +{ + /// + /// 将指定的 notification publisher 实例注册为当前容器唯一的发布策略。 + /// + /// 目标依赖注入容器。 + /// 要复用的 notification publisher 实例。 + /// 同一个 ,便于在组合根中继续链式配置。 + /// + /// 。 + /// + /// + /// 当前容器已存在 注册,无法再切换为另一个策略。 + /// + public static IIocContainer UseNotificationPublisher( + this IIocContainer container, + INotificationPublisher notificationPublisher) + { + ArgumentNullException.ThrowIfNull(container); + ArgumentNullException.ThrowIfNull(notificationPublisher); + + ThrowIfNotificationPublisherAlreadyRegistered(container); + container.Register(notificationPublisher); + return container; + } + + /// + /// 将指定类型的 notification publisher 注册为当前容器唯一的发布策略。 + /// + /// 发布策略实现类型。 + /// 目标依赖注入容器。 + /// 同一个 ,便于在组合根中继续链式配置。 + /// + /// + /// 当前容器已存在 注册,无法再切换为另一个策略。 + /// + public static IIocContainer UseNotificationPublisher(this IIocContainer container) + where TNotificationPublisher : class, INotificationPublisher + { + ArgumentNullException.ThrowIfNull(container); + + ThrowIfNotificationPublisherAlreadyRegistered(container); + container.RegisterSingleton(); + return container; + } + + /// + /// 将内置 注册为当前容器唯一的 notification publisher 策略。 + /// + /// 目标依赖注入容器。 + /// 同一个 ,便于在组合根中继续链式配置。 + /// + /// + /// 当前容器已存在 注册,无法再切换为另一个策略。 + /// + /// + /// 该策略更适合“等待所有处理器完成并统一观察失败”的语义诉求; + /// 若只是为了降低 steady-state publish 开销,应先结合实际 benchmark 结果评估是否值得切换。 + /// + public static IIocContainer UseTaskWhenAllNotificationPublisher(this IIocContainer container) + { + return UseNotificationPublisher(container, new TaskWhenAllNotificationPublisher()); + } + + /// + /// 在组合根阶段阻止多个 notification publisher 策略同时注册,避免 runtime 创建时出现歧义。 + /// + /// 当前正在配置的依赖注入容器。 + /// 当前容器已存在 notification publisher 注册。 + private static void ThrowIfNotificationPublisherAlreadyRegistered(IIocContainer container) + { + if (!container.HasRegistration(typeof(INotificationPublisher))) + { + return; + } + + throw new InvalidOperationException( + $"An {typeof(INotificationPublisher).FullName} is already registered. Remove the existing notification publisher strategy before calling {nameof(UseNotificationPublisher)} again."); + } +} diff --git a/GFramework.Cqrs/README.md b/GFramework.Cqrs/README.md index cf67c5c8..6d5b5835 100644 --- a/GFramework.Cqrs/README.md +++ b/GFramework.Cqrs/README.md @@ -128,20 +128,27 @@ var playerId = await this.SendAsync(new CreatePlayerCommand(new CreatePlayerInpu - 默认通知发布器会按容器解析顺序逐个执行处理器,并在首个处理器抛出异常时立即停止后续分发。 - 若需要等待所有处理器并行完成,可以在创建 runtime 时显式传入 `TaskWhenAllNotificationPublisher`;该策略不保证执行顺序,并会在全部处理器结束后聚合异常或取消结果。 - 若容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器。 + - 若只是为了降低 fixed fan-out publish 的 steady-state 成本,当前 benchmark 并不表明 `TaskWhenAllNotificationPublisher` 会优于默认顺序发布器;它更适合你需要“等待全部处理器完成并统一观察失败”的场景。 -如果你需要切换到内置并行 notification publisher,可以在组合根里显式选择它: +如果你需要切换到内置并行 notification publisher,推荐在组合根里显式声明这条策略: ```csharp -using GFramework.Cqrs; +using GFramework.Cqrs.Extensions; using GFramework.Cqrs.Notification; -var runtime = CqrsRuntimeFactory.CreateRuntime( - container, - logger, - new TaskWhenAllNotificationPublisher()); +container.UseTaskWhenAllNotificationPublisher(); ``` -对于走标准 `GFramework.Core` 启动路径的架构,也可以在 runtime 创建前预先向容器注册 `INotificationPublisher`,让默认基础设施复用同一策略。 +如果你确实需要自定义 publisher 实例,也可以继续显式注册: + +```csharp +using GFramework.Cqrs.Extensions; +using GFramework.Cqrs.Notification; + +container.UseNotificationPublisher(new TaskWhenAllNotificationPublisher()); +``` + +对于走标准 `GFramework.Core` 启动路径的架构,这些组合根扩展会被默认基础设施自动复用;如果你直接调用 `CqrsRuntimeFactory.CreateRuntime(...)`,也仍然可以像以前一样显式传入 publisher 实例。 - 流式请求 - 通过 `IStreamRequest` 和 `IStreamRequestHandler<,>` 返回 `IAsyncEnumerable`。 - 当消费端程序集提供 generated stream invoker provider / descriptor 后,runtime 会优先消费这组 stream invoker 元数据;未命中时仍回退到既有反射 stream binding 创建路径。 diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index 9b1f576b..f0b458af 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -7,10 +7,13 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-114` +- 恢复点编号:`CQRS-REWRITE-RP-115` - 当前阶段:`Phase 8` - 当前 PR 锚点:`PR #341` - 当前结论: + - 当前 `RP-115` 已继续沿用 `$gframework-batch-boot 50`,并把 notification publisher 线从“已具备 seam + benchmark 事实”继续收口到组合根配置面:新增 `GFramework.Cqrs.Extensions.NotificationPublisherRegistrationExtensions`,提供 `UseNotificationPublisher(...)` / `UseNotificationPublisher()` / `UseTaskWhenAllNotificationPublisher()` 三个显式入口,避免用户再手写 `Register(new ...)` + - 这一批同时把重复策略注册前移到组合根阶段显式阻止,并在回归里确认 `UseTaskWhenAllNotificationPublisher()` 经过默认 runtime 基础设施后仍会命中“失败不阻断其余 handler”的并行语义;这让 notification publisher 的采用路径从“知道内部 seam 如何接线”收口为“知道该在容器里选哪条策略” + - 用户文档现同步写明 `TaskWhenAllNotificationPublisher` 更适合“并行完成 + 统一观察失败”的语义诉求,而不是 fixed fan-out steady-state publish 优化;这与 `RP-114` 的 benchmark 结论保持一致,减少使用者把它误解成默认的性能升级开关 - 当前 `RP-114` 已继续沿用 `$gframework-batch-boot 50`,并沿着 `RP-113` 刚落地的 notification publisher 能力切片继续补 benchmark:`NotificationFanOutBenchmarks` 现同时纳入 `GFramework.Cqrs` 默认顺序发布器与新内置 `TaskWhenAllNotificationPublisher`,用于量化“能力差距收口后,固定 4 handler fan-out 的成本变化” - `RP-114` 的 short-job 结果显示:fixed `4 handler` fan-out 下,默认顺序发布器约 `427.453 ns / 408 B`,内置 `TaskWhenAllNotificationPublisher` 约 `472.574 ns / 496 B`,`MediatR` 约 `225.940 ns / 1256 B`,NuGet `Mediator` concrete runtime 约 `3.854 ns / 0 B`;这说明当前内置并行 publisher 的主要价值是语义补齐,而不是 steady-state fan-out 性能收益 - 这一批保持 runtime 公开 API 与 notification 语义不变,只扩 benchmark 对照口径与恢复文档;原因是 `RP-113` 已经把并行 publisher 能力落到 production path,当前更高价值的是先证明它相对默认顺序发布器、`Mediator` 与 `MediatR` 的成本位置,而不是立即继续扩第二个 publisher strategy @@ -369,8 +372,8 @@ CQRS 迁移与收敛。 ## 下一推荐步骤 -1. 若 `RP-114` 的 fan-out benchmark 证实内置 `TaskWhenAllNotificationPublisher` 在固定多处理器场景下明显优于默认顺序发布器,可继续评估是否需要补单处理器 / 异常路径的对照或把这组结果吸收到用户文档 -2. 当前 benchmark 已证明 `TaskWhenAllNotificationPublisher` 的价值主要在并行完成与异常聚合语义,而不是吞吐收益;下一轮优先评估 notification publisher 配置面 / 文档边界,或回到 request dispatch 常量开销,而不是新增 generated notification invoker/provider 这类 steady-state 收益信号偏弱的 runtime seam +1. 既然 `RP-115` 已把 notification publisher 选择面收口到显式组合根扩展,下一轮若继续留在 notification 线,优先评估是否需要补第二个内置策略或更细的配置文档,而不是再让用户直接依赖裸 `INotificationPublisher` 注册细节 +2. 当前 benchmark 已证明 `TaskWhenAllNotificationPublisher` 的价值主要在并行完成与异常聚合语义,而不是吞吐收益;若 notification 配置面已经足够,下一轮优先回到 request dispatch 常量开销,而不是新增 generated notification invoker/provider 这类 steady-state 收益信号偏弱的 runtime seam 3. 若 benchmark 对照需要继续贴近 `Mediator` 官方设计,再评估 `Mediator` 的 compile-time lifetime / stream 对照矩阵,或给 stream 引入 scoped host 基线,而不是回头重试已被 benchmark 否决的 `GetAll(Type)` 零行为探测方案 ## 活跃文档 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index 35f8fc3e..c23bfdb6 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -2,6 +2,28 @@ ## 2026-05-08 +### 阶段:notification publisher 组合根配置面(CQRS-REWRITE-RP-115) + +- 延续 `$gframework-batch-boot 50`,本轮不再回到 benchmark 宿主,而是沿着 `RP-114` 已明确的性能/语义事实继续收口用户接入缺口: + - 当前分支相对 `origin/main`(`7ca21af9`, `2026-05-08 16:12:20 +0800`)的累计 branch diff 在启动时仍为 `9 files`,明显低于 `50` 文件阈值 + - `RP-113` / `RP-114` 已证明内置 `TaskWhenAllNotificationPublisher` 的价值主要是语义补齐,但当前用户若要采用它,仍需知道 `INotificationPublisher` 的底层注册细节 +- 本轮主线程决策: + - 新增 `GFramework.Cqrs/Extensions/NotificationPublisherRegistrationExtensions.cs`,提供 `UseNotificationPublisher(...)`、`UseNotificationPublisher()` 与 `UseTaskWhenAllNotificationPublisher()` 三个显式组合根入口 + - 在 `GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs` 补齐回归,确认默认 runtime 基础设施会复用 `UseTaskWhenAllNotificationPublisher()`,且重复策略注册会在组合根阶段被显式阻止 + - 更新 `GFramework.Cqrs/README.md` 与 `docs/zh-CN/core/cqrs.md`,把推荐用法改成组合根扩展,并把 `RP-114` 的 benchmark 结论翻译成用户可用的采用边界 +- 本轮权威验证: + - `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release` + - 结果:通过,`0 warning / 0 error` + - `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~NotificationPublisherRegistrationExtensionsTests|FullyQualifiedName~CqrsNotificationPublisherTests"` + - 结果:通过,`9/9` passed + - `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Extensions/NotificationPublisherRegistrationExtensions.cs GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs GFramework.Cqrs/README.md docs/zh-CN/core/cqrs.md ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md` + - 结果:通过 + - `git diff --check` + - 结果:通过 +- 本轮结论: + - 这一批已把 notification publisher 的采用路径从“理解内部 seam”收口成“在组合根里显式选择策略”,并让重复策略注册在配置阶段就得到清晰失败信号 + - 若后续仍继续 notification 线,更合理的下一刀会是补第二个内置策略或更细的采用文档,而不是继续要求用户手写容器底层注册 + ### 阶段:`TaskWhenAll` notification publisher fan-out benchmark(CQRS-REWRITE-RP-114) - 延续 `$gframework-batch-boot 50`,本轮不再扩新的 notification runtime 能力,而是沿着 `RP-113` 刚落地的内置并行 publisher 继续补验证口径: diff --git a/docs/zh-CN/core/cqrs.md b/docs/zh-CN/core/cqrs.md index 35dc0546..82331786 100644 --- a/docs/zh-CN/core/cqrs.md +++ b/docs/zh-CN/core/cqrs.md @@ -123,13 +123,10 @@ var playerId = await architecture.Context.SendRequestAsync( `TaskWhenAllNotificationPublisher`: ```csharp -using GFramework.Cqrs; +using GFramework.Cqrs.Extensions; using GFramework.Cqrs.Notification; -var runtime = CqrsRuntimeFactory.CreateRuntime( - container, - logger, - new TaskWhenAllNotificationPublisher()); +container.UseTaskWhenAllNotificationPublisher(); ``` 这条策略的边界也需要明确: @@ -137,6 +134,16 @@ var runtime = CqrsRuntimeFactory.CreateRuntime( - 不保证处理器执行顺序 - 不会在首个处理器失败时立即停止其余处理器 - 会在全部处理器结束后统一暴露异常或取消结果 +- 当前 fixed `4 handler` fan-out benchmark 中,它的 steady-state 成本也高于默认顺序发布器;因此它更适合“我要并行语义”,而不是“我要更快的 publish” + +如果你需要显式提供自定义 publisher 实例,而不是直接采用内置 `TaskWhenAll` 策略,也可以在组合根里写成: + +```csharp +using GFramework.Cqrs.Extensions; +using GFramework.Cqrs.Notification; + +container.UseNotificationPublisher(new TaskWhenAllNotificationPublisher()); +``` ## Request 与流式变体