feat(cqrs): 收口 notification publisher 配置入口

- 新增 notification publisher 组合根注册扩展,提供 TaskWhenAll 与自定义策略入口

- 补充通知发布策略配置回归测试,并更新 CQRS 文档与恢复点记录
This commit is contained in:
gewuyou 2026-05-08 17:53:27 +08:00
parent b0102b5206
commit 310791db5a
6 changed files with 292 additions and 15 deletions

View File

@ -0,0 +1,143 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using GFramework.Core.Architectures;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Extensions;
using GFramework.Cqrs.Notification;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 验证 notification publisher 组合根注册扩展的关键行为。
/// </summary>
[TestFixture]
internal sealed class NotificationPublisherRegistrationExtensionsTests
{
/// <summary>
/// 验证显式注册内置 <see cref="TaskWhenAllNotificationPublisher" /> 后,
/// 标准 runtime 基础设施会复用该策略并继续调度所有处理器。
/// </summary>
[Test]
public async Task UseTaskWhenAllNotificationPublisher_Should_Be_Used_By_Default_Runtime_Infrastructure()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
var trailingHandler = new RecordingNotificationHandler();
var container = new MicrosoftDiContainer();
container.UseTaskWhenAllNotificationPublisher();
container.Register<INotificationHandler<TestNotification>>(new ThrowingNotificationHandler());
container.Register<INotificationHandler<TestNotification>>(trailingHandler);
CqrsTestRuntime.RegisterInfrastructure(container);
container.Freeze();
var context = new ArchitectureContext(container);
var publishTask = context.PublishAsync(new TestNotification()).AsTask();
try
{
await publishTask.ConfigureAwait(false);
}
catch (Exception)
{
// `TaskWhenAll` 策略会在所有处理器都结束后聚合失败;这里仅消费异常并继续断言第二个处理器已执行。
}
Assert.That(trailingHandler.WasInvoked, Is.True);
Assert.That(publishTask.Exception, Is.Not.Null);
}
/// <summary>
/// 验证显式传入实例的组合根注册入口会把同一个 publisher 实例绑定到容器。
/// </summary>
[Test]
public void UseNotificationPublisher_Instance_Overload_Should_Register_Same_Instance()
{
var container = new MicrosoftDiContainer();
var publisher = new TrackingNotificationPublisher();
var returnedContainer = container.UseNotificationPublisher(publisher);
Assert.That(returnedContainer, Is.SameAs(container));
Assert.That(container.Get<INotificationPublisher>(), Is.SameAs(publisher));
}
/// <summary>
/// 验证组合根扩展会阻止重复 notification publisher 注册,避免 runtime 创建阶段才暴露歧义。
/// </summary>
[Test]
public void UseNotificationPublisher_Should_Throw_When_NotificationPublisher_Already_Registered()
{
var container = new MicrosoftDiContainer();
container.UseTaskWhenAllNotificationPublisher();
Assert.That(
() => container.UseNotificationPublisher(new TrackingNotificationPublisher()),
Throws.InvalidOperationException.With.Message.Contains(nameof(INotificationPublisher)));
}
/// <summary>
/// 为本组测试提供最小 notification 类型。
/// </summary>
private sealed record TestNotification : INotification;
/// <summary>
/// 记录自己是否被执行的测试处理器。
/// </summary>
private sealed class RecordingNotificationHandler : INotificationHandler<TestNotification>
{
/// <summary>
/// 获取当前处理器是否至少执行过一次。
/// </summary>
public bool WasInvoked { get; private set; }
/// <summary>
/// 记录执行痕迹并立刻完成。
/// </summary>
public ValueTask Handle(TestNotification notification, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(notification);
cancellationToken.ThrowIfCancellationRequested();
WasInvoked = true;
return ValueTask.CompletedTask;
}
}
/// <summary>
/// 始终抛出异常的测试处理器,用于验证并行策略不会因为首个失败而停止其余处理器。
/// </summary>
private sealed class ThrowingNotificationHandler : INotificationHandler<TestNotification>
{
/// <summary>
/// 始终抛出测试异常。
/// </summary>
public ValueTask Handle(TestNotification notification, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(notification);
cancellationToken.ThrowIfCancellationRequested();
throw new InvalidOperationException("boom");
}
}
/// <summary>
/// 用于验证实例注册重载是否保留原对象身份的测试发布器。
/// </summary>
private sealed class TrackingNotificationPublisher : INotificationPublisher
{
/// <summary>
/// 直接完成当前 publish 调用。
/// </summary>
public ValueTask PublishAsync<TNotification>(
NotificationPublishContext<TNotification> context,
CancellationToken cancellationToken = default)
where TNotification : INotification
{
ArgumentNullException.ThrowIfNull(context);
cancellationToken.ThrowIfCancellationRequested();
return ValueTask.CompletedTask;
}
}
}

View File

@ -0,0 +1,95 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using GFramework.Core.Abstractions.Ioc;
using GFramework.Cqrs.Notification;
namespace GFramework.Cqrs.Extensions;
/// <summary>
/// 为 CQRS runtime 提供 notification publisher 策略的组合根注册入口。
/// </summary>
/// <remarks>
/// <para>默认 runtime 只会消费一个 <see cref="INotificationPublisher" /> 实例,因此该扩展类把“选择哪种策略”显式收敛到容器配置阶段。</para>
/// <para>这些入口应在 runtime 创建前调用;对于走标准 <c>GFramework.Core</c> 启动路径的架构,它们会被 <c>CqrsRuntimeModule</c> 自动复用。</para>
/// </remarks>
public static class NotificationPublisherRegistrationExtensions
{
/// <summary>
/// 将指定的 notification publisher 实例注册为当前容器唯一的发布策略。
/// </summary>
/// <param name="container">目标依赖注入容器。</param>
/// <param name="notificationPublisher">要复用的 notification publisher 实例。</param>
/// <returns>同一个 <paramref name="container" />,便于在组合根中继续链式配置。</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="container" /> 或 <paramref name="notificationPublisher" /> 为 <see langword="null" />。
/// </exception>
/// <exception cref="InvalidOperationException">
/// 当前容器已存在 <see cref="INotificationPublisher" /> 注册,无法再切换为另一个策略。
/// </exception>
public static IIocContainer UseNotificationPublisher(
this IIocContainer container,
INotificationPublisher notificationPublisher)
{
ArgumentNullException.ThrowIfNull(container);
ArgumentNullException.ThrowIfNull(notificationPublisher);
ThrowIfNotificationPublisherAlreadyRegistered(container);
container.Register(notificationPublisher);
return container;
}
/// <summary>
/// 将指定类型的 notification publisher 注册为当前容器唯一的发布策略。
/// </summary>
/// <typeparam name="TNotificationPublisher">发布策略实现类型。</typeparam>
/// <param name="container">目标依赖注入容器。</param>
/// <returns>同一个 <paramref name="container" />,便于在组合根中继续链式配置。</returns>
/// <exception cref="ArgumentNullException"><paramref name="container" /> 为 <see langword="null" />。</exception>
/// <exception cref="InvalidOperationException">
/// 当前容器已存在 <see cref="INotificationPublisher" /> 注册,无法再切换为另一个策略。
/// </exception>
public static IIocContainer UseNotificationPublisher<TNotificationPublisher>(this IIocContainer container)
where TNotificationPublisher : class, INotificationPublisher
{
ArgumentNullException.ThrowIfNull(container);
ThrowIfNotificationPublisherAlreadyRegistered(container);
container.RegisterSingleton<INotificationPublisher, TNotificationPublisher>();
return container;
}
/// <summary>
/// 将内置 <see cref="TaskWhenAllNotificationPublisher" /> 注册为当前容器唯一的 notification publisher 策略。
/// </summary>
/// <param name="container">目标依赖注入容器。</param>
/// <returns>同一个 <paramref name="container" />,便于在组合根中继续链式配置。</returns>
/// <exception cref="ArgumentNullException"><paramref name="container" /> 为 <see langword="null" />。</exception>
/// <exception cref="InvalidOperationException">
/// 当前容器已存在 <see cref="INotificationPublisher" /> 注册,无法再切换为另一个策略。
/// </exception>
/// <remarks>
/// 该策略更适合“等待所有处理器完成并统一观察失败”的语义诉求;
/// 若只是为了降低 steady-state publish 开销,应先结合实际 benchmark 结果评估是否值得切换。
/// </remarks>
public static IIocContainer UseTaskWhenAllNotificationPublisher(this IIocContainer container)
{
return UseNotificationPublisher(container, new TaskWhenAllNotificationPublisher());
}
/// <summary>
/// 在组合根阶段阻止多个 notification publisher 策略同时注册,避免 runtime 创建时出现歧义。
/// </summary>
/// <param name="container">当前正在配置的依赖注入容器。</param>
/// <exception cref="InvalidOperationException">当前容器已存在 notification publisher 注册。</exception>
private static void ThrowIfNotificationPublisherAlreadyRegistered(IIocContainer container)
{
if (!container.HasRegistration(typeof(INotificationPublisher)))
{
return;
}
throw new InvalidOperationException(
$"An {typeof(INotificationPublisher).FullName} is already registered. Remove the existing notification publisher strategy before calling {nameof(UseNotificationPublisher)} again.");
}
}

View File

@ -128,20 +128,27 @@ var playerId = await this.SendAsync(new CreatePlayerCommand(new CreatePlayerInpu
- 默认通知发布器会按容器解析顺序逐个执行处理器,并在首个处理器抛出异常时立即停止后续分发。
- 若需要等待所有处理器并行完成,可以在创建 runtime 时显式传入 `TaskWhenAllNotificationPublisher`;该策略不保证执行顺序,并会在全部处理器结束后聚合异常或取消结果。
- 若容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器。
- 若只是为了降低 fixed fan-out publish 的 steady-state 成本,当前 benchmark 并不表明 `TaskWhenAllNotificationPublisher` 会优于默认顺序发布器;它更适合你需要“等待全部处理器完成并统一观察失败”的场景。
如果你需要切换到内置并行 notification publisher可以在组合根里显式选择它
如果你需要切换到内置并行 notification publisher推荐在组合根里显式声明这条策略
```csharp
using GFramework.Cqrs;
using GFramework.Cqrs.Extensions;
using GFramework.Cqrs.Notification;
var runtime = CqrsRuntimeFactory.CreateRuntime(
container,
logger,
new TaskWhenAllNotificationPublisher());
container.UseTaskWhenAllNotificationPublisher();
```
对于走标准 `GFramework.Core` 启动路径的架构,也可以在 runtime 创建前预先向容器注册 `INotificationPublisher`,让默认基础设施复用同一策略。
如果你确实需要自定义 publisher 实例,也可以继续显式注册:
```csharp
using GFramework.Cqrs.Extensions;
using GFramework.Cqrs.Notification;
container.UseNotificationPublisher(new TaskWhenAllNotificationPublisher());
```
对于走标准 `GFramework.Core` 启动路径的架构,这些组合根扩展会被默认基础设施自动复用;如果你直接调用 `CqrsRuntimeFactory.CreateRuntime(...)`,也仍然可以像以前一样显式传入 publisher 实例。
- 流式请求
- 通过 `IStreamRequest<TResponse>``IStreamRequestHandler<,>` 返回 `IAsyncEnumerable<TResponse>`
- 当消费端程序集提供 generated stream invoker provider / descriptor 后runtime 会优先消费这组 stream invoker 元数据;未命中时仍回退到既有反射 stream binding 创建路径。

View File

@ -7,10 +7,13 @@ CQRS 迁移与收敛。
## 当前恢复点
- 恢复点编号:`CQRS-REWRITE-RP-114`
- 恢复点编号:`CQRS-REWRITE-RP-115`
- 当前阶段:`Phase 8`
- 当前 PR 锚点:`PR #341`
- 当前结论:
- 当前 `RP-115` 已继续沿用 `$gframework-batch-boot 50`,并把 notification publisher 线从“已具备 seam + benchmark 事实”继续收口到组合根配置面:新增 `GFramework.Cqrs.Extensions.NotificationPublisherRegistrationExtensions`,提供 `UseNotificationPublisher(...)` / `UseNotificationPublisher<TPublisher>()` / `UseTaskWhenAllNotificationPublisher()` 三个显式入口,避免用户再手写 `Register<INotificationPublisher>(new ...)`
- 这一批同时把重复策略注册前移到组合根阶段显式阻止,并在回归里确认 `UseTaskWhenAllNotificationPublisher()` 经过默认 runtime 基础设施后仍会命中“失败不阻断其余 handler”的并行语义这让 notification publisher 的采用路径从“知道内部 seam 如何接线”收口为“知道该在容器里选哪条策略”
- 用户文档现同步写明 `TaskWhenAllNotificationPublisher` 更适合“并行完成 + 统一观察失败”的语义诉求,而不是 fixed fan-out steady-state publish 优化;这与 `RP-114` 的 benchmark 结论保持一致,减少使用者把它误解成默认的性能升级开关
- 当前 `RP-114` 已继续沿用 `$gframework-batch-boot 50`,并沿着 `RP-113` 刚落地的 notification publisher 能力切片继续补 benchmark`NotificationFanOutBenchmarks` 现同时纳入 `GFramework.Cqrs` 默认顺序发布器与新内置 `TaskWhenAllNotificationPublisher`,用于量化“能力差距收口后,固定 4 handler fan-out 的成本变化”
- `RP-114` 的 short-job 结果显示fixed `4 handler` fan-out 下,默认顺序发布器约 `427.453 ns / 408 B`,内置 `TaskWhenAllNotificationPublisher``472.574 ns / 496 B``MediatR``225.940 ns / 1256 B`NuGet `Mediator` concrete runtime 约 `3.854 ns / 0 B`;这说明当前内置并行 publisher 的主要价值是语义补齐,而不是 steady-state fan-out 性能收益
- 这一批保持 runtime 公开 API 与 notification 语义不变,只扩 benchmark 对照口径与恢复文档;原因是 `RP-113` 已经把并行 publisher 能力落到 production path当前更高价值的是先证明它相对默认顺序发布器、`Mediator``MediatR` 的成本位置,而不是立即继续扩第二个 publisher strategy
@ -369,8 +372,8 @@ CQRS 迁移与收敛。
## 下一推荐步骤
1. `RP-114` 的 fan-out benchmark 证实内置 `TaskWhenAllNotificationPublisher` 在固定多处理器场景下明显优于默认顺序发布器,可继续评估是否需要补单处理器 / 异常路径的对照或把这组结果吸收到用户文档
2. 当前 benchmark 已证明 `TaskWhenAllNotificationPublisher` 的价值主要在并行完成与异常聚合语义,而不是吞吐收益;下一轮优先评估 notification publisher 配置面 / 文档边界,或回到 request dispatch 常量开销,而不是新增 generated notification invoker/provider 这类 steady-state 收益信号偏弱的 runtime seam
1. 既然 `RP-115` 已把 notification publisher 选择面收口到显式组合根扩展,下一轮若继续留在 notification 线,优先评估是否需要补第二个内置策略或更细的配置文档,而不是再让用户直接依赖裸 `INotificationPublisher` 注册细节
2. 当前 benchmark 已证明 `TaskWhenAllNotificationPublisher` 的价值主要在并行完成与异常聚合语义,而不是吞吐收益;若 notification 配置面已经足够,下一轮优先回到 request dispatch 常量开销,而不是新增 generated notification invoker/provider 这类 steady-state 收益信号偏弱的 runtime seam
3. 若 benchmark 对照需要继续贴近 `Mediator` 官方设计,再评估 `Mediator` 的 compile-time lifetime / stream 对照矩阵,或给 stream 引入 scoped host 基线,而不是回头重试已被 benchmark 否决的 `GetAll(Type)` 零行为探测方案
## 活跃文档

View File

@ -2,6 +2,28 @@
## 2026-05-08
### 阶段notification publisher 组合根配置面CQRS-REWRITE-RP-115
- 延续 `$gframework-batch-boot 50`,本轮不再回到 benchmark 宿主,而是沿着 `RP-114` 已明确的性能/语义事实继续收口用户接入缺口:
- 当前分支相对 `origin/main``7ca21af9`, `2026-05-08 16:12:20 +0800`)的累计 branch diff 在启动时仍为 `9 files`,明显低于 `50` 文件阈值
- `RP-113` / `RP-114` 已证明内置 `TaskWhenAllNotificationPublisher` 的价值主要是语义补齐,但当前用户若要采用它,仍需知道 `INotificationPublisher` 的底层注册细节
- 本轮主线程决策:
- 新增 `GFramework.Cqrs/Extensions/NotificationPublisherRegistrationExtensions.cs`,提供 `UseNotificationPublisher(...)``UseNotificationPublisher<TPublisher>()``UseTaskWhenAllNotificationPublisher()` 三个显式组合根入口
- 在 `GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs` 补齐回归,确认默认 runtime 基础设施会复用 `UseTaskWhenAllNotificationPublisher()`,且重复策略注册会在组合根阶段被显式阻止
- 更新 `GFramework.Cqrs/README.md``docs/zh-CN/core/cqrs.md`,把推荐用法改成组合根扩展,并把 `RP-114` 的 benchmark 结论翻译成用户可用的采用边界
- 本轮权威验证:
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~NotificationPublisherRegistrationExtensionsTests|FullyQualifiedName~CqrsNotificationPublisherTests"`
- 结果:通过,`9/9` passed
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Extensions/NotificationPublisherRegistrationExtensions.cs GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs GFramework.Cqrs/README.md docs/zh-CN/core/cqrs.md ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md`
- 结果:通过
- `git diff --check`
- 结果:通过
- 本轮结论:
- 这一批已把 notification publisher 的采用路径从“理解内部 seam”收口成“在组合根里显式选择策略”并让重复策略注册在配置阶段就得到清晰失败信号
- 若后续仍继续 notification 线,更合理的下一刀会是补第二个内置策略或更细的采用文档,而不是继续要求用户手写容器底层注册
### 阶段:`TaskWhenAll` notification publisher fan-out benchmarkCQRS-REWRITE-RP-114
- 延续 `$gframework-batch-boot 50`,本轮不再扩新的 notification runtime 能力,而是沿着 `RP-113` 刚落地的内置并行 publisher 继续补验证口径:

View File

@ -123,13 +123,10 @@ var playerId = await architecture.Context.SendRequestAsync(
`TaskWhenAllNotificationPublisher`
```csharp
using GFramework.Cqrs;
using GFramework.Cqrs.Extensions;
using GFramework.Cqrs.Notification;
var runtime = CqrsRuntimeFactory.CreateRuntime(
container,
logger,
new TaskWhenAllNotificationPublisher());
container.UseTaskWhenAllNotificationPublisher();
```
这条策略的边界也需要明确:
@ -137,6 +134,16 @@ var runtime = CqrsRuntimeFactory.CreateRuntime(
- 不保证处理器执行顺序
- 不会在首个处理器失败时立即停止其余处理器
- 会在全部处理器结束后统一暴露异常或取消结果
- 当前 fixed `4 handler` fan-out benchmark 中,它的 steady-state 成本也高于默认顺序发布器;因此它更适合“我要并行语义”,而不是“我要更快的 publish”
如果你需要显式提供自定义 publisher 实例,而不是直接采用内置 `TaskWhenAll` 策略,也可以在组合根里写成:
```csharp
using GFramework.Cqrs.Extensions;
using GFramework.Cqrs.Notification;
container.UseNotificationPublisher(new TaskWhenAllNotificationPublisher());
```
## Request 与流式变体