From 7ff4b628a1a0bf49f3438c642d7d5b6d8a5cf1a1 Mon Sep 17 00:00:00 2001 From: gewuyou <95328647+GeWuYou@users.noreply.github.com> Date: Fri, 8 May 2026 17:25:42 +0800 Subject: [PATCH] =?UTF-8?q?feat(cqrs):=20=E6=96=B0=E5=A2=9E=E5=B9=B6?= =?UTF-8?q?=E8=A1=8C=E9=80=9A=E7=9F=A5=E5=8F=91=E5=B8=83=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 TaskWhenAllNotificationPublisher 内置并行通知发布器并保留默认顺序语义 - 补充通知发布策略回归测试与采用边界文档 - 更新 cqrs-rewrite 跟踪与执行追踪恢复点 --- .../Cqrs/CqrsNotificationPublisherTests.cs | 35 +++++++++ .../TaskWhenAllNotificationPublisher.cs | 71 +++++++++++++++++++ GFramework.Cqrs/README.md | 16 +++++ .../todos/cqrs-rewrite-migration-tracking.md | 10 ++- .../traces/cqrs-rewrite-migration-trace.md | 23 ++++++ docs/zh-CN/core/cqrs.md | 19 +++++ 6 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs index 3b4b66e6..6e674d30 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs @@ -91,6 +91,41 @@ internal sealed class CqrsNotificationPublisherTests Assert.That(handler.ObservedContext, Is.SameAs(architectureContext.Object)); } + /// + /// 验证内置 `TaskWhenAll` 发布器会继续调度所有处理器,而不是沿用默认顺序发布器的失败即停语义。 + /// + [Test] + public async Task PublishAsync_Should_Invoke_All_Handlers_When_Using_TaskWhenAll_NotificationPublisher() + { + var trailingHandler = new RecordingNotificationHandler("second", []); + var runtime = CreateRuntime( + container => + { + container + .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler))) + .Returns( + [ + new ThrowingNotificationHandler(), + trailingHandler + ]); + }, + new TaskWhenAllNotificationPublisher()); + + var publishTask = runtime.PublishAsync(new FakeCqrsContext(), new PublisherNotification()).AsTask(); + + try + { + await publishTask.ConfigureAwait(false); + } + catch (Exception) + { + // 并行发布会把处理器失败收敛到返回任务;这里仅消费异常并继续验证所有处理器都已被触发。 + } + + Assert.That(trailingHandler.Invoked, Is.True); + Assert.That(publishTask.Exception, Is.Not.Null); + } + /// /// 验证默认通知发布器在零处理器场景下会保持静默完成。 /// diff --git a/GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs b/GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs new file mode 100644 index 00000000..5ab6e53d --- /dev/null +++ b/GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs @@ -0,0 +1,71 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using System.Threading.Tasks; +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Notification; + +/// +/// 以内置 Task.WhenAll(...) 策略并行分发通知处理器。 +/// +/// +/// 该实现会先为当前发布调用中的每个处理器创建独立执行任务,再等待全部任务完成。 +/// 它不会保留默认顺序发布器的“首个异常立即停止”语义;如果多个处理器失败,返回任务会聚合这些异常。 +/// 适合处理器之间互不依赖,且调用方更关心总耗时而不是处理顺序的场景。 +/// +public sealed class TaskWhenAllNotificationPublisher : INotificationPublisher +{ + /// + /// 并行启动当前通知的所有处理器,并等待它们全部结束。 + /// + /// 通知类型。 + /// 当前发布调用的执行上下文。 + /// 取消令牌。 + /// 表示所有处理器都已完成的值任务。 + /// + public ValueTask PublishAsync( + NotificationPublishContext context, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + ArgumentNullException.ThrowIfNull(context); + + return context.Handlers.Count switch + { + 0 => ValueTask.CompletedTask, + 1 => context.InvokeHandlerAsync(context.Handlers[0], cancellationToken), + _ => PublishCoreAsync(context, cancellationToken) + }; + } + + /// + /// 为多处理器场景建立并行等待,确保单个处理器的同步异常也会被收敛到返回任务中。 + /// + private static async ValueTask PublishCoreAsync( + NotificationPublishContext context, + CancellationToken cancellationToken) + where TNotification : INotification + { + var tasks = new Task[context.Handlers.Count]; + + for (var index = 0; index < context.Handlers.Count; index++) + { + tasks[index] = InvokeHandlerSafelyAsync(context, context.Handlers[index], cancellationToken).AsTask(); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + } + + /// + /// 通过异步包装把同步抛出的处理器异常也转换成可聚合的任务结果。 + /// + private static async ValueTask InvokeHandlerSafelyAsync( + NotificationPublishContext context, + object handler, + CancellationToken cancellationToken) + where TNotification : INotification + { + await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false); + } +} diff --git a/GFramework.Cqrs/README.md b/GFramework.Cqrs/README.md index 1072d53c..cf67c5c8 100644 --- a/GFramework.Cqrs/README.md +++ b/GFramework.Cqrs/README.md @@ -50,6 +50,7 @@ - `CqrsRuntimeFactory.cs` - `Internal/CqrsDispatcher.cs` - `Notification/INotificationPublisher.cs` + - `Notification/TaskWhenAllNotificationPublisher.cs` - `Internal/CqrsHandlerRegistrar.cs` - `Internal/DefaultCqrsHandlerRegistrar.cs` - `Internal/DefaultCqrsRegistrationService.cs` @@ -125,7 +126,22 @@ var playerId = await this.SendAsync(new CreatePlayerCommand(new CreatePlayerInpu - 通知分发 - 通知会分发给所有已注册 `INotificationHandler<>`;零处理器时默认静默完成。 - 默认通知发布器会按容器解析顺序逐个执行处理器,并在首个处理器抛出异常时立即停止后续分发。 + - 若需要等待所有处理器并行完成,可以在创建 runtime 时显式传入 `TaskWhenAllNotificationPublisher`;该策略不保证执行顺序,并会在全部处理器结束后聚合异常或取消结果。 - 若容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器。 + +如果你需要切换到内置并行 notification publisher,可以在组合根里显式选择它: + +```csharp +using GFramework.Cqrs; +using GFramework.Cqrs.Notification; + +var runtime = CqrsRuntimeFactory.CreateRuntime( + container, + logger, + new TaskWhenAllNotificationPublisher()); +``` + +对于走标准 `GFramework.Core` 启动路径的架构,也可以在 runtime 创建前预先向容器注册 `INotificationPublisher`,让默认基础设施复用同一策略。 - 流式请求 - 通过 `IStreamRequest` 和 `IStreamRequestHandler<,>` 返回 `IAsyncEnumerable`。 - 当消费端程序集提供 generated stream invoker provider / descriptor 后,runtime 会优先消费这组 stream invoker 元数据;未命中时仍回退到既有反射 stream binding 创建路径。 diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index 16d106b8..c4b3b16f 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -7,10 +7,14 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-112` +- 恢复点编号:`CQRS-REWRITE-RP-113` - 当前阶段:`Phase 8` - 当前 PR 锚点:`PR #341` - 当前结论: + - 当前 `RP-113` 已继续沿用 `$gframework-batch-boot 50`,并把 notification 线从 benchmark 对照推进到实际 runtime 能力:新增公开内置 `TaskWhenAllNotificationPublisher`,让 `GFramework.Cqrs` 在保留默认顺序发布器的同时,提供与 `Mediator` `TaskWhenAllPublisher` 对齐的并行 notification publish 策略 + - `TaskWhenAllNotificationPublisher` 当前语义明确为:零处理器静默完成,单处理器直接透传,多处理器并行启动并等待全部结束;它不保留默认顺序发布器的“首个异常立即停止”语义,而是把全部处理器的失败/取消结果收敛到同一个返回任务 + - 本轮同时补齐 `CqrsNotificationPublisherTests` 对新内置策略的回归,并更新 `GFramework.Cqrs/README.md` 与 `docs/zh-CN/core/cqrs.md`,把切换方式和语义边界写回用户可见文档;当前已提交 branch diff 仍明显低于 `$gframework-batch-boot 50` 的停止阈值 + - 这一批选择真正落一个内置 publisher strategy,而不是继续加 notification benchmark 维度;原因是 `RP-111` / `RP-112` 已经把 notification gap 量化清楚,下一步更高价值的是开始收口“能力差距”而不是继续重复建立对照数据 - 当前 `RP-112` 已继续沿用 `$gframework-batch-boot 50`,并在 `RP-111` 的单处理器 notification 对照基础上补齐固定 `4 handler` 的 fan-out publish benchmark:新增 `NotificationFanOutBenchmarks`,对比 baseline、`GFramework.Cqrs`、NuGet `Mediator` concrete runtime 与 `MediatR` - `NotificationFanOutBenchmarks` 当前 short-job 基线约为 baseline `8.302 ns / 0 B`、`Mediator` `4.314 ns / 0 B`、`MediatR` `230.304 ns / 1256 B`、`GFramework.Cqrs` `434.413 ns / 408 B`;这说明 notification fan-out 的差距已经不只体现在单处理器 publish,而是在固定 4 处理器场景下依然保持相近量级 - 本轮仍然只扩 benchmark 对照口径,没有直接修改 notification runtime 或 publisher 策略语义;原因是当前更高价值的事实是先量化“单处理器”和“固定 fan-out”两条 notification 路径的外部差距,再决定下一批是否值得切进 publisher strategy 或 runtime 热点 @@ -361,8 +365,8 @@ CQRS 迁移与收敛。 ## 下一推荐步骤 -1. 当前 turn 仍可继续自动推进;若下一批继续沿用 `$gframework-batch-boot` 且优先处理 notification 线,先评估 publisher strategy 或异常/取消语义的对照,而不是继续机械扩充更多同层级 fan-out benchmark -2. 若下一轮要切回更高价值热点,优先重新审视 request dispatch 常量开销或 notification publisher strategy,而不是新增 generated notification invoker/provider 这一类 steady-state 收益信号偏弱的 runtime seam +1. 当前 turn 仍可继续自动推进;若下一批继续沿用 `$gframework-batch-boot 50` 且仍留在 notification 线,优先补 `TaskWhenAllNotificationPublisher` 的 benchmark / 异常聚合文档细化,或继续评估是否需要第二个内置 publisher strategy,而不是回头再加同层级 fan-out 对照 +2. 若下一轮要切回更高价值热点,优先重新审视 request dispatch 常量开销或 notification publisher 策略配置面,而不是新增 generated notification invoker/provider 这一类 steady-state 收益信号偏弱的 runtime seam 3. 若 benchmark 对照需要继续贴近 `Mediator` 官方设计,再评估 `Mediator` 的 compile-time lifetime / stream 对照矩阵,或给 stream 引入 scoped host 基线,而不是回头重试已被 benchmark 否决的 `GetAll(Type)` 零行为探测方案 ## 活跃文档 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index 35bc6a8b..f742d121 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -2,6 +2,29 @@ ## 2026-05-08 +### 阶段:内置 `TaskWhenAll` notification publisher(CQRS-REWRITE-RP-113) + +- 延续 `$gframework-batch-boot 50`,本轮不再继续堆 notification benchmark 维度,而是直接把上一批已经量化清楚的 capability gap 收口到 runtime: + - `RP-111` / `RP-112` 已证明当前 notification publish 无论单处理器还是固定 fan-out,都和 `Mediator` 的 publish strategy 能力差距相关,而不只是“缺 benchmark” + - 当前分支相对 `origin/main` 的累计 branch diff 仍明显低于 `50` 文件阈值,因此适合用一个单模块、可回归、可文档化的能力切片继续自动推进 +- 本轮主线程决策: + - 新增 `GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs`,提供公开内置并行 notification publisher,并把“同步抛出的处理器异常也收敛到返回任务中”作为实现约束 + - 在 `GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs` 补齐针对新策略的回归,确认它不会像默认顺序发布器那样在首个失败处停止其余处理器 + - 更新 `GFramework.Cqrs/README.md` 与 `docs/zh-CN/core/cqrs.md`,写明切换方式,以及“不保证顺序 / 等待全部处理器完成 / 统一暴露异常或取消结果”的采用边界 +- 本轮权威验证: + - `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release` + - 结果:通过,`0 warning / 0 error` + - `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsNotificationPublisherTests"` + - 结果:通过 + - `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs GFramework.Cqrs/README.md docs/zh-CN/core/cqrs.md` + - 结果:通过 + - `git diff --check` + - 结果:通过 +- 本轮结论: + - `GFramework.Cqrs` 现在不再只有“自定义 seam”这一种 notification publisher 扩展方式,而是先提供了一个仓库维护的内置并行策略,开始实质缩小和 `Mediator` 在 publisher strategy 上的能力差距 + - 这批改动保持默认顺序语义不变,因此风险主要落在“新策略的异常聚合和用户理解边界”,已通过测试和文档同步收口 + - 当前可以继续自动推进,但更合理的下一批应优先补新策略的 benchmark 或继续评估 notification publisher 配置面,而不是回头重复扩更多 fan-out benchmark + ### 阶段:notification fan-out publish benchmark(CQRS-REWRITE-RP-112) - 延续 `$gframework-batch-boot 50`,本轮没有直接切入 notification runtime 或 publisher strategy,而是先补齐固定 `4 handler` 的 fan-out publish 对照: diff --git a/docs/zh-CN/core/cqrs.md b/docs/zh-CN/core/cqrs.md index 7c5d2228..35dc0546 100644 --- a/docs/zh-CN/core/cqrs.md +++ b/docs/zh-CN/core/cqrs.md @@ -119,6 +119,25 @@ var playerId = await architecture.Context.SendRequestAsync( - 首个处理器抛出异常时立即停止后续分发 - 如果容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器 +如果你需要等待所有通知处理器并行完成,而不是沿用默认顺序语义,可以显式切换到内置 +`TaskWhenAllNotificationPublisher`: + +```csharp +using GFramework.Cqrs; +using GFramework.Cqrs.Notification; + +var runtime = CqrsRuntimeFactory.CreateRuntime( + container, + logger, + new TaskWhenAllNotificationPublisher()); +``` + +这条策略的边界也需要明确: + +- 不保证处理器执行顺序 +- 不会在首个处理器失败时立即停止其余处理器 +- 会在全部处理器结束后统一暴露异常或取消结果 + ## Request 与流式变体 除了最常见的 `Command` / `Query` / `Notification`,当前公开面还覆盖两类容易被忽略的入口: