mirror of
https://github.com/GeWuYou/GFramework.git
synced 2026-05-09 01:54:30 +08:00
feat(cqrs): 新增并行通知发布策略
- 新增 TaskWhenAllNotificationPublisher 内置并行通知发布器并保留默认顺序语义 - 补充通知发布策略回归测试与采用边界文档 - 更新 cqrs-rewrite 跟踪与执行追踪恢复点
This commit is contained in:
parent
c7af175f2e
commit
7ff4b628a1
@ -91,6 +91,41 @@ internal sealed class CqrsNotificationPublisherTests
|
||||
Assert.That(handler.ObservedContext, Is.SameAs(architectureContext.Object));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 验证内置 `TaskWhenAll` 发布器会继续调度所有处理器,而不是沿用默认顺序发布器的失败即停语义。
|
||||
/// </summary>
|
||||
[Test]
|
||||
public async Task PublishAsync_Should_Invoke_All_Handlers_When_Using_TaskWhenAll_NotificationPublisher()
|
||||
{
|
||||
var trailingHandler = new RecordingNotificationHandler("second", []);
|
||||
var runtime = CreateRuntime(
|
||||
container =>
|
||||
{
|
||||
container
|
||||
.Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler<PublisherNotification>)))
|
||||
.Returns(
|
||||
[
|
||||
new ThrowingNotificationHandler(),
|
||||
trailingHandler
|
||||
]);
|
||||
},
|
||||
new TaskWhenAllNotificationPublisher());
|
||||
|
||||
var publishTask = runtime.PublishAsync(new FakeCqrsContext(), new PublisherNotification()).AsTask();
|
||||
|
||||
try
|
||||
{
|
||||
await publishTask.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// 并行发布会把处理器失败收敛到返回任务;这里仅消费异常并继续验证所有处理器都已被触发。
|
||||
}
|
||||
|
||||
Assert.That(trailingHandler.Invoked, Is.True);
|
||||
Assert.That(publishTask.Exception, Is.Not.Null);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 验证默认通知发布器在零处理器场景下会保持静默完成。
|
||||
/// </summary>
|
||||
|
||||
@ -0,0 +1,71 @@
|
||||
// Copyright (c) 2025-2026 GeWuYou
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
using System.Threading.Tasks;
|
||||
using GFramework.Cqrs.Abstractions.Cqrs;
|
||||
|
||||
namespace GFramework.Cqrs.Notification;
|
||||
|
||||
/// <summary>
|
||||
/// 以内置 <c>Task.WhenAll(...)</c> 策略并行分发通知处理器。
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>该实现会先为当前发布调用中的每个处理器创建独立执行任务,再等待全部任务完成。</para>
|
||||
/// <para>它不会保留默认顺序发布器的“首个异常立即停止”语义;如果多个处理器失败,返回任务会聚合这些异常。</para>
|
||||
/// <para>适合处理器之间互不依赖,且调用方更关心总耗时而不是处理顺序的场景。</para>
|
||||
/// </remarks>
|
||||
public sealed class TaskWhenAllNotificationPublisher : INotificationPublisher
|
||||
{
|
||||
/// <summary>
|
||||
/// 并行启动当前通知的所有处理器,并等待它们全部结束。
|
||||
/// </summary>
|
||||
/// <typeparam name="TNotification">通知类型。</typeparam>
|
||||
/// <param name="context">当前发布调用的执行上下文。</param>
|
||||
/// <param name="cancellationToken">取消令牌。</param>
|
||||
/// <returns>表示所有处理器都已完成的值任务。</returns>
|
||||
/// <exception cref="ArgumentNullException"><paramref name="context" /> 为 <see langword="null" />。</exception>
|
||||
public ValueTask PublishAsync<TNotification>(
|
||||
NotificationPublishContext<TNotification> context,
|
||||
CancellationToken cancellationToken = default)
|
||||
where TNotification : INotification
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
|
||||
return context.Handlers.Count switch
|
||||
{
|
||||
0 => ValueTask.CompletedTask,
|
||||
1 => context.InvokeHandlerAsync(context.Handlers[0], cancellationToken),
|
||||
_ => PublishCoreAsync(context, cancellationToken)
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 为多处理器场景建立并行等待,确保单个处理器的同步异常也会被收敛到返回任务中。
|
||||
/// </summary>
|
||||
private static async ValueTask PublishCoreAsync<TNotification>(
|
||||
NotificationPublishContext<TNotification> context,
|
||||
CancellationToken cancellationToken)
|
||||
where TNotification : INotification
|
||||
{
|
||||
var tasks = new Task[context.Handlers.Count];
|
||||
|
||||
for (var index = 0; index < context.Handlers.Count; index++)
|
||||
{
|
||||
tasks[index] = InvokeHandlerSafelyAsync(context, context.Handlers[index], cancellationToken).AsTask();
|
||||
}
|
||||
|
||||
await Task.WhenAll(tasks).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 通过异步包装把同步抛出的处理器异常也转换成可聚合的任务结果。
|
||||
/// </summary>
|
||||
private static async ValueTask InvokeHandlerSafelyAsync<TNotification>(
|
||||
NotificationPublishContext<TNotification> context,
|
||||
object handler,
|
||||
CancellationToken cancellationToken)
|
||||
where TNotification : INotification
|
||||
{
|
||||
await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
@ -50,6 +50,7 @@
|
||||
- `CqrsRuntimeFactory.cs`
|
||||
- `Internal/CqrsDispatcher.cs`
|
||||
- `Notification/INotificationPublisher.cs`
|
||||
- `Notification/TaskWhenAllNotificationPublisher.cs`
|
||||
- `Internal/CqrsHandlerRegistrar.cs`
|
||||
- `Internal/DefaultCqrsHandlerRegistrar.cs`
|
||||
- `Internal/DefaultCqrsRegistrationService.cs`
|
||||
@ -125,7 +126,22 @@ var playerId = await this.SendAsync(new CreatePlayerCommand(new CreatePlayerInpu
|
||||
- 通知分发
|
||||
- 通知会分发给所有已注册 `INotificationHandler<>`;零处理器时默认静默完成。
|
||||
- 默认通知发布器会按容器解析顺序逐个执行处理器,并在首个处理器抛出异常时立即停止后续分发。
|
||||
- 若需要等待所有处理器并行完成,可以在创建 runtime 时显式传入 `TaskWhenAllNotificationPublisher`;该策略不保证执行顺序,并会在全部处理器结束后聚合异常或取消结果。
|
||||
- 若容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器。
|
||||
|
||||
如果你需要切换到内置并行 notification publisher,可以在组合根里显式选择它:
|
||||
|
||||
```csharp
|
||||
using GFramework.Cqrs;
|
||||
using GFramework.Cqrs.Notification;
|
||||
|
||||
var runtime = CqrsRuntimeFactory.CreateRuntime(
|
||||
container,
|
||||
logger,
|
||||
new TaskWhenAllNotificationPublisher());
|
||||
```
|
||||
|
||||
对于走标准 `GFramework.Core` 启动路径的架构,也可以在 runtime 创建前预先向容器注册 `INotificationPublisher`,让默认基础设施复用同一策略。
|
||||
- 流式请求
|
||||
- 通过 `IStreamRequest<TResponse>` 和 `IStreamRequestHandler<,>` 返回 `IAsyncEnumerable<TResponse>`。
|
||||
- 当消费端程序集提供 generated stream invoker provider / descriptor 后,runtime 会优先消费这组 stream invoker 元数据;未命中时仍回退到既有反射 stream binding 创建路径。
|
||||
|
||||
@ -7,10 +7,14 @@ CQRS 迁移与收敛。
|
||||
|
||||
## 当前恢复点
|
||||
|
||||
- 恢复点编号:`CQRS-REWRITE-RP-112`
|
||||
- 恢复点编号:`CQRS-REWRITE-RP-113`
|
||||
- 当前阶段:`Phase 8`
|
||||
- 当前 PR 锚点:`PR #341`
|
||||
- 当前结论:
|
||||
- 当前 `RP-113` 已继续沿用 `$gframework-batch-boot 50`,并把 notification 线从 benchmark 对照推进到实际 runtime 能力:新增公开内置 `TaskWhenAllNotificationPublisher`,让 `GFramework.Cqrs` 在保留默认顺序发布器的同时,提供与 `Mediator` `TaskWhenAllPublisher` 对齐的并行 notification publish 策略
|
||||
- `TaskWhenAllNotificationPublisher` 当前语义明确为:零处理器静默完成,单处理器直接透传,多处理器并行启动并等待全部结束;它不保留默认顺序发布器的“首个异常立即停止”语义,而是把全部处理器的失败/取消结果收敛到同一个返回任务
|
||||
- 本轮同时补齐 `CqrsNotificationPublisherTests` 对新内置策略的回归,并更新 `GFramework.Cqrs/README.md` 与 `docs/zh-CN/core/cqrs.md`,把切换方式和语义边界写回用户可见文档;当前已提交 branch diff 仍明显低于 `$gframework-batch-boot 50` 的停止阈值
|
||||
- 这一批选择真正落一个内置 publisher strategy,而不是继续加 notification benchmark 维度;原因是 `RP-111` / `RP-112` 已经把 notification gap 量化清楚,下一步更高价值的是开始收口“能力差距”而不是继续重复建立对照数据
|
||||
- 当前 `RP-112` 已继续沿用 `$gframework-batch-boot 50`,并在 `RP-111` 的单处理器 notification 对照基础上补齐固定 `4 handler` 的 fan-out publish benchmark:新增 `NotificationFanOutBenchmarks`,对比 baseline、`GFramework.Cqrs`、NuGet `Mediator` concrete runtime 与 `MediatR`
|
||||
- `NotificationFanOutBenchmarks` 当前 short-job 基线约为 baseline `8.302 ns / 0 B`、`Mediator` `4.314 ns / 0 B`、`MediatR` `230.304 ns / 1256 B`、`GFramework.Cqrs` `434.413 ns / 408 B`;这说明 notification fan-out 的差距已经不只体现在单处理器 publish,而是在固定 4 处理器场景下依然保持相近量级
|
||||
- 本轮仍然只扩 benchmark 对照口径,没有直接修改 notification runtime 或 publisher 策略语义;原因是当前更高价值的事实是先量化“单处理器”和“固定 fan-out”两条 notification 路径的外部差距,再决定下一批是否值得切进 publisher strategy 或 runtime 热点
|
||||
@ -361,8 +365,8 @@ CQRS 迁移与收敛。
|
||||
|
||||
## 下一推荐步骤
|
||||
|
||||
1. 当前 turn 仍可继续自动推进;若下一批继续沿用 `$gframework-batch-boot` 且优先处理 notification 线,先评估 publisher strategy 或异常/取消语义的对照,而不是继续机械扩充更多同层级 fan-out benchmark
|
||||
2. 若下一轮要切回更高价值热点,优先重新审视 request dispatch 常量开销或 notification publisher strategy,而不是新增 generated notification invoker/provider 这一类 steady-state 收益信号偏弱的 runtime seam
|
||||
1. 当前 turn 仍可继续自动推进;若下一批继续沿用 `$gframework-batch-boot 50` 且仍留在 notification 线,优先补 `TaskWhenAllNotificationPublisher` 的 benchmark / 异常聚合文档细化,或继续评估是否需要第二个内置 publisher strategy,而不是回头再加同层级 fan-out 对照
|
||||
2. 若下一轮要切回更高价值热点,优先重新审视 request dispatch 常量开销或 notification publisher 策略配置面,而不是新增 generated notification invoker/provider 这一类 steady-state 收益信号偏弱的 runtime seam
|
||||
3. 若 benchmark 对照需要继续贴近 `Mediator` 官方设计,再评估 `Mediator` 的 compile-time lifetime / stream 对照矩阵,或给 stream 引入 scoped host 基线,而不是回头重试已被 benchmark 否决的 `GetAll(Type)` 零行为探测方案
|
||||
|
||||
## 活跃文档
|
||||
|
||||
@ -2,6 +2,29 @@
|
||||
|
||||
## 2026-05-08
|
||||
|
||||
### 阶段:内置 `TaskWhenAll` notification publisher(CQRS-REWRITE-RP-113)
|
||||
|
||||
- 延续 `$gframework-batch-boot 50`,本轮不再继续堆 notification benchmark 维度,而是直接把上一批已经量化清楚的 capability gap 收口到 runtime:
|
||||
- `RP-111` / `RP-112` 已证明当前 notification publish 无论单处理器还是固定 fan-out,都和 `Mediator` 的 publish strategy 能力差距相关,而不只是“缺 benchmark”
|
||||
- 当前分支相对 `origin/main` 的累计 branch diff 仍明显低于 `50` 文件阈值,因此适合用一个单模块、可回归、可文档化的能力切片继续自动推进
|
||||
- 本轮主线程决策:
|
||||
- 新增 `GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs`,提供公开内置并行 notification publisher,并把“同步抛出的处理器异常也收敛到返回任务中”作为实现约束
|
||||
- 在 `GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs` 补齐针对新策略的回归,确认它不会像默认顺序发布器那样在首个失败处停止其余处理器
|
||||
- 更新 `GFramework.Cqrs/README.md` 与 `docs/zh-CN/core/cqrs.md`,写明切换方式,以及“不保证顺序 / 等待全部处理器完成 / 统一暴露异常或取消结果”的采用边界
|
||||
- 本轮权威验证:
|
||||
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
|
||||
- 结果:通过,`0 warning / 0 error`
|
||||
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsNotificationPublisherTests"`
|
||||
- 结果:通过
|
||||
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Notification/TaskWhenAllNotificationPublisher.cs GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs GFramework.Cqrs/README.md docs/zh-CN/core/cqrs.md`
|
||||
- 结果:通过
|
||||
- `git diff --check`
|
||||
- 结果:通过
|
||||
- 本轮结论:
|
||||
- `GFramework.Cqrs` 现在不再只有“自定义 seam”这一种 notification publisher 扩展方式,而是先提供了一个仓库维护的内置并行策略,开始实质缩小和 `Mediator` 在 publisher strategy 上的能力差距
|
||||
- 这批改动保持默认顺序语义不变,因此风险主要落在“新策略的异常聚合和用户理解边界”,已通过测试和文档同步收口
|
||||
- 当前可以继续自动推进,但更合理的下一批应优先补新策略的 benchmark 或继续评估 notification publisher 配置面,而不是回头重复扩更多 fan-out benchmark
|
||||
|
||||
### 阶段:notification fan-out publish benchmark(CQRS-REWRITE-RP-112)
|
||||
|
||||
- 延续 `$gframework-batch-boot 50`,本轮没有直接切入 notification runtime 或 publisher strategy,而是先补齐固定 `4 handler` 的 fan-out publish 对照:
|
||||
|
||||
@ -119,6 +119,25 @@ var playerId = await architecture.Context.SendRequestAsync(
|
||||
- 首个处理器抛出异常时立即停止后续分发
|
||||
- 如果容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器
|
||||
|
||||
如果你需要等待所有通知处理器并行完成,而不是沿用默认顺序语义,可以显式切换到内置
|
||||
`TaskWhenAllNotificationPublisher`:
|
||||
|
||||
```csharp
|
||||
using GFramework.Cqrs;
|
||||
using GFramework.Cqrs.Notification;
|
||||
|
||||
var runtime = CqrsRuntimeFactory.CreateRuntime(
|
||||
container,
|
||||
logger,
|
||||
new TaskWhenAllNotificationPublisher());
|
||||
```
|
||||
|
||||
这条策略的边界也需要明确:
|
||||
|
||||
- 不保证处理器执行顺序
|
||||
- 不会在首个处理器失败时立即停止其余处理器
|
||||
- 会在全部处理器结束后统一暴露异常或取消结果
|
||||
|
||||
## Request 与流式变体
|
||||
|
||||
除了最常见的 `Command` / `Query` / `Notification`,当前公开面还覆盖两类容易被忽略的入口:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user