fix(cqrs): 修正request管道中的generated invoker回退

- 修正 request pipeline 末端继续复用缓存 RequestInvoker 的运行时语义
- 补充 generated request invoker 与 stream 缺失 handler 的回归测试覆盖
- 更新 benchmark reader-facing 注释与 cqrs-rewrite 恢复入口记录
This commit is contained in:
gewuyou 2026-05-13 09:04:13 +08:00
parent 2bc6502297
commit e5b173c29a
9 changed files with 198 additions and 30 deletions

View File

@ -22,8 +22,8 @@ using GeneratedMediator = Mediator.Mediator;
namespace GFramework.Cqrs.Benchmarks.Messaging;
/// <summary>
/// 对比固定 4 个处理器的 notification fan-out publish 在 baseline、GFramework.CQRS、NuGet `Mediator`
/// 与 MediatR 之间的开销。
/// 对比固定 4 个处理器的 notification fan-out publish 在 baseline、GFramework.CQRS 默认顺序发布器、
/// GFramework.CQRS 内置 <c>TaskWhenAllNotificationPublisher</c>、NuGet `Mediator` 与 MediatR 之间的开销。
/// </summary>
[Config(typeof(Config))]
public class NotificationFanOutBenchmarks

View File

@ -132,7 +132,7 @@ public class RequestBenchmarks
}
/// <summary>
/// 通过 `ai-libs/Mediator` 的 source-generated concrete mediator 发送 request作为高性能对照组。
/// 通过 NuGet `Mediator` 的 source-generated concrete mediator 发送 request作为高性能对照组。
/// </summary>
/// <returns>代表当前 `Mediator` request dispatch 完成的值任务。</returns>
[Benchmark]

View File

@ -201,7 +201,7 @@ public class RequestLifetimeBenchmarks
}
/// <summary>
/// 按生命周期把 benchmark request handler 注册到 GFramework 容器。
/// 按生命周期把 benchmark request handler 注册到 GFramework 容器。
/// </summary>
/// <param name="container">当前 benchmark 拥有并负责释放的容器。</param>
/// <param name="lifetime">待比较的 handler 生命周期。</param>
@ -248,7 +248,7 @@ public class RequestLifetimeBenchmarks
}
/// <summary>
/// Benchmark request。
/// Benchmark request。
/// </summary>
/// <param name="Id">请求标识。</param>
public sealed record BenchmarkRequest(Guid Id) :

View File

@ -168,7 +168,7 @@ public class StreamingBenchmarks
}
/// <summary>
/// 通过 `ai-libs/Mediator` 的 source-generated concrete mediator 创建 stream并按当前观测模式消费。
/// 通过 NuGet `Mediator` 的 source-generated concrete mediator 创建 stream并按当前观测模式消费。
/// </summary>
/// <returns>按当前观测模式完成 stream 消费后的等待句柄。</returns>
[Benchmark]

View File

@ -147,6 +147,29 @@ internal sealed class CqrsDispatcherContextValidationTests
Throws.InvalidOperationException.With.Message.Contains("does not implement IArchitectureContext"));
}
/// <summary>
/// 验证 stream handler 缺失时dispatcher 会在建流调用点同步抛出异常,
/// 而不是返回一个延迟到枚举阶段才失败的异步流。
/// </summary>
[Test]
public void CreateStream_Should_Throw_When_Handler_Is_Missing()
{
var runtime = CreateRuntime(
container =>
{
container
.Setup(currentContainer => currentContainer.Get(typeof(IStreamRequestHandler<ContextAwareStreamRequest, int>)))
.Returns((object?)null);
container
.Setup(currentContainer => currentContainer.HasRegistration(typeof(IStreamPipelineBehavior<ContextAwareStreamRequest, int>)))
.Returns(false);
});
Assert.That(
() => runtime.CreateStream(new FakeCqrsContext(), new ContextAwareStreamRequest()),
Throws.InvalidOperationException.With.Message.Contains("No CQRS stream handler registered"));
}
/// <summary>
/// 验证当 stream pipeline behavior 需要上下文注入、但当前 CQRS 上下文不实现
/// <see cref="GFramework.Core.Abstractions.Architectures.IArchitectureContext" /> 时,

View File

@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
using System.Reflection;
using System.Threading;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures;
using GFramework.Core.Ioc;
@ -28,6 +29,7 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
{
_previousLoggerFactoryProvider = LoggerFactoryResolver.Provider;
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GeneratedRequestPipelineTrackingBehavior.InvocationCount = 0;
GeneratedStreamPipelineTrackingBehavior.InvocationCount = 0;
ClearRegistrarCaches();
ClearDispatcherCaches();
@ -40,6 +42,7 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
public void TearDown()
{
LoggerFactoryResolver.Provider = _previousLoggerFactoryProvider ?? new ConsoleLoggerFactoryProvider();
GeneratedRequestPipelineTrackingBehavior.InvocationCount = 0;
GeneratedStreamPipelineTrackingBehavior.InvocationCount = 0;
ClearRegistrarCaches();
ClearDispatcherCaches();
@ -181,6 +184,30 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
Assert.That(response, Is.EqualTo("generated-hidden:payload"));
}
/// <summary>
/// 验证 generated request invoker 与 request pipeline 行为同时存在时,
/// dispatcher 仍会保持 generated invoker 优先,并正确复用现有 request 执行链。
/// </summary>
[Test]
public async Task SendAsync_Should_Use_Generated_Request_Invoker_Inside_Request_Pipeline()
{
var generatedAssembly = CreateGeneratedRequestInvokerAssembly();
var container = new MicrosoftDiContainer();
container.RegisterCqrsPipelineBehavior<GeneratedRequestPipelineTrackingBehavior>();
CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);
container.Freeze();
var context = new ArchitectureContext(container);
var response = await context.SendRequestAsync(new GeneratedRequestInvokerRequest("payload")).ConfigureAwait(false);
Assert.Multiple(() =>
{
Assert.That(response, Is.EqualTo("generated:payload"));
Assert.That(GeneratedRequestPipelineTrackingBehavior.InvocationCount, Is.EqualTo(1));
});
}
/// <summary>
/// 验证 dispatcher 在首次创建 stream binding 时,会优先消费 generated stream invoker provider。
/// </summary>
@ -608,6 +635,40 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
Assert.That(results, Is.EqualTo([30, 31]));
}
/// <summary>
/// 记录 generated request invoker 与 request pipeline 行为组合时的命中次数。
/// </summary>
private sealed class GeneratedRequestPipelineTrackingBehavior
: IPipelineBehavior<GeneratedRequestInvokerRequest, string>
{
private static int _invocationCount;
/// <summary>
/// 获取或重置当前测试进程中的行为触发次数。
/// </summary>
public static int InvocationCount
{
get => Volatile.Read(ref _invocationCount);
set => Volatile.Write(ref _invocationCount, value);
}
/// <summary>
/// 记录一次行为执行,然后继续执行 generated request invoker。
/// </summary>
/// <param name="message">当前请求消息。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的响应。</returns>
public ValueTask<string> Handle(
GeneratedRequestInvokerRequest message,
MessageHandlerDelegate<GeneratedRequestInvokerRequest, string> next,
CancellationToken cancellationToken)
{
Interlocked.Increment(ref _invocationCount);
return next(message, cancellationToken);
}
}
/// <summary>
/// 模拟返回实例 request invoker 方法的 generated registry。
/// </summary>

View File

@ -150,7 +150,7 @@ internal sealed class CqrsDispatcher(
}
return dispatchBinding.GetPipelineExecutor(behaviors.Count)
.Invoke(handler, behaviors, request, cancellationToken);
.Invoke(handler, behaviors, dispatchBinding.RequestInvoker, request, cancellationToken);
}
catch (Exception exception)
{
@ -604,12 +604,14 @@ internal sealed class CqrsDispatcher(
private static ValueTask<TResponse> InvokeRequestPipelineExecutorAsync<TRequest, TResponse>(
object handler,
IReadOnlyList<object> behaviors,
RequestInvoker<TResponse> requestInvoker,
object request,
CancellationToken cancellationToken)
where TRequest : IRequest<TResponse>
{
var invocation = new RequestPipelineInvocation<TRequest, TResponse>(
(IRequestHandler<TRequest, TResponse>)handler,
handler,
requestInvoker,
behaviors);
return invocation.InvokeAsync((TRequest)request, cancellationToken);
}
@ -669,6 +671,7 @@ internal sealed class CqrsDispatcher(
private delegate ValueTask<TResponse> RequestPipelineInvoker<TResponse>(
object handler,
IReadOnlyList<object> behaviors,
RequestInvoker<TResponse> requestInvoker,
object request,
CancellationToken cancellationToken);
@ -978,6 +981,7 @@ internal sealed class CqrsDispatcher(
public ValueTask<TResponse> Invoke(
object handler,
IReadOnlyList<object> behaviors,
RequestInvoker<TResponse> requestInvoker,
object request,
CancellationToken cancellationToken)
{
@ -987,7 +991,7 @@ internal sealed class CqrsDispatcher(
$"Cached request pipeline executor expected {BehaviorCount} behaviors, but received {behaviors.Count}.");
}
return invoker(handler, behaviors, request, cancellationToken);
return invoker(handler, behaviors, requestInvoker, request, cancellationToken);
}
}
@ -1147,11 +1151,13 @@ internal sealed class CqrsDispatcher(
/// 该对象只存在于本次分发,不会跨请求保留容器解析出的实例。
/// </summary>
private sealed class RequestPipelineInvocation<TRequest, TResponse>(
IRequestHandler<TRequest, TResponse> handler,
object handler,
RequestInvoker<TResponse> requestInvoker,
IReadOnlyList<object> behaviors)
where TRequest : IRequest<TResponse>
{
private readonly IRequestHandler<TRequest, TResponse> _handler = handler;
private readonly object _handler = handler;
private readonly RequestInvoker<TResponse> _requestInvoker = requestInvoker;
private readonly IReadOnlyList<object> _behaviors = behaviors;
private readonly MessageHandlerDelegate<TRequest, TResponse>?[] _continuations =
new MessageHandlerDelegate<TRequest, TResponse>?[behaviors.Count + 1];
@ -1198,11 +1204,16 @@ internal sealed class CqrsDispatcher(
}
/// <summary>
/// 调用最终请求处理
/// 调用最终请求处理入口
/// </summary>
/// <remarks>
/// request pipeline 末端必须继续复用当前 binding 上缓存的 <see cref="RequestInvoker{TResponse}" />
/// 这样 generated request invoker provider 才能在接入 pipeline 后保持与无 pipeline 路径一致的调用语义,
/// 而不是退回到接口虚调用路径。
/// </remarks>
private ValueTask<TResponse> InvokeHandlerAsync(TRequest request, CancellationToken cancellationToken)
{
return _handler.Handle(request, cancellationToken);
return _requestInvoker(_handler, request, cancellationToken);
}
/// <summary>

View File

@ -12,21 +12,32 @@ CQRS 迁移与收敛。
## 当前恢复点
- 恢复点编号:`CQRS-REWRITE-RP-142`
- 恢复点编号:`CQRS-REWRITE-RP-143`
- 当前阶段:`Phase 8`
- 当前 PR 锚点:`PR #350MERGED2026-05-13`
- 当前结论:
- 本轮按 `$gframework-batch-boot 50` 恢复后,先核对本地仓库真值,确认 `feat/cqrs-optimization` 已与
`origin/main` 指向同一合并提交 `4837aa2a`,此前 tracking 中的 `PR #350OPEN``14 files` 等事实已过期。
- 当前 topic 的 benchmark runtime 修正、XML 文档补齐与 `README` 边界收口已经随 `PR #350` 合并进入
`origin/main`,不再存在可继续扩批的活动写面。
- 这轮收口不再继续新增 benchmark 或测试切片,而是把 public recovery 入口刷新为“已合并、无 branch diff、等待下一轮新任务”的状态
避免后续 `boot` 落回已完成的 PR 上下文。
- recovery 刷新提交落地后,当前 branch-wide 停止原因仍不是 `50 files` 阈值,而是语义边界已经完成:
- 刷新开始前,`origin/main...HEAD` 的评估结果为 `0 files / 0 lines`
- 当前工作树干净
- 当前唯一新增 diff 只来自 `cqrs-rewrite` 的 public recovery 文档刷新
- 继续在同一 topic 上机械扩批不会产生新的低风险、可验证切片
- 在用户允许 subagent 后,本轮按 `context-budget 优先、reviewability 次之、50 files 仅作粗阈值` 重开了一波小型 multi-agent 批处理,
只接受单文件或窄文件组的 docs/test 切片。
- 已接受的低风险切片:
- `GFramework.Cqrs.Benchmarks/Messaging/RequestLifetimeBenchmarks.cs`
- 修正两处 XML 文档缩进异常
- `GFramework.Cqrs.Benchmarks/Messaging/NotificationFanOutBenchmarks.cs`
- 类级摘要明确当前 `GFramework.CQRS` fan-out 对照包含默认顺序发布器与内置 `TaskWhenAllNotificationPublisher`
- `GFramework.Cqrs.Benchmarks/Messaging/RequestBenchmarks.cs`
- `GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs`
- 将 `Mediator` reader-facing 注释统一为 “NuGet `Mediator` 的 source-generated concrete mediator”
- `GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherContextValidationTests.cs`
- 补一条 stream 缺失 handler 的失败语义回归,固定当前分支在调用点同步抛 `InvalidOperationException`
- worker 在 `CqrsGeneratedRequestInvokerProviderTests.cs` 补 request/generated + pipeline 对称测试后,主线程确认这不是环境噪音,而是命中了真实运行时缺口:
- request 路径在接入 `IPipelineBehavior<,>` 后,会退回 `_handler.Handle(...)`
- 因此 generated request invoker 无法像 stream 路径那样在 pipeline 末端继续保持优先
- 主线程已在 `GFramework.Cqrs/Internal/CqrsDispatcher.cs` 收口该缺口:
- request pipeline executor 现在显式复用当前 binding 缓存的 `RequestInvoker`
- generated request invoker provider 在接入 pipeline 后保持与无 pipeline 路径一致的调用语义
- 当前 stop decision
- 不再继续下一波
- 原因不是 `50 files` 阈值耗尽;当前 accepted scope 仍然很小
- 停止原因是当前上下文已接近本轮安全预算,而剩余候选只剩 descriptor / factory / publisher 类小测试,继续扩批收益明显下降
## 当前活跃事实
@ -34,12 +45,19 @@ CQRS 迁移与收敛。
- 当前 HEAD / 基线:`origin/main @ 4837aa2a (2026-05-12 20:37:56 +0800)`
- 当前 PR`PR #350已合并到 origin/main`
- 当前写面:
- `GFramework.Cqrs.Benchmarks/Messaging/NotificationFanOutBenchmarks.cs`
- `GFramework.Cqrs.Benchmarks/Messaging/RequestBenchmarks.cs`
- `GFramework.Cqrs.Benchmarks/Messaging/RequestLifetimeBenchmarks.cs`
- `GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs`
- `GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherContextValidationTests.cs`
- `GFramework.Cqrs.Tests/Cqrs/CqrsGeneratedRequestInvokerProviderTests.cs`
- `GFramework.Cqrs/Internal/CqrsDispatcher.cs`
- `ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md`
- `ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md`
- 当前基线:
- `origin/main @ 4837aa2a (2026-05-12 20:37:56 +0800)`
- 当前已提交 branch diff 只涉及 `2``ai-plan/public/cqrs-rewrite/**` recovery 文档
- 当前工作面已收口为 public recovery 文档刷新当前工作树干净CQRS benchmark 代码不再处于活跃修改状态
- 当前 batch working-tree diff`7` 个源码 / 测试文件
- 当前工作面已收口为 docs/test 小切片 + 1 处 request pipeline runtime 修正;没有重新打开 benchmark 工程设计级改造
- 最近已合并提交:
- `2dd9435c` `fix(cqrs-benchmarks): 修正Mediator基准运行时配置`
- `e3532fc2` `feat(cqrs-benchmarks): 补齐request生命周期的Mediator对照`
@ -51,6 +69,7 @@ CQRS 迁移与收敛。
或独立 benchmark 工程,而不是在同一份 source-generated 产物上切换 runtime `ServiceLifetime`
- 如果 `feat/cqrs-optimization` 继续承载新的 CQRS 任务而不先分支public recovery 入口会把“已合并的 PR #350”与下一轮新工作混在一起。
- 若未来再开 benchmark XML / docs 波次,仍需要主线程先抽样核对代表文件,避免重复接受误报 inventory。
- 剩余低风险候选主要是 `NotificationPublisher` / invoker descriptor / `CqrsRuntimeFactory` 的单文件测试;它们不是当前上下文预算下的高收益下一波。
## 最近权威验证
@ -62,13 +81,22 @@ CQRS 迁移与收敛。
- 备注:确认当前 branch diff 始终只覆盖两份 `ai-plan` recovery 文档,没有重新打开 CQRS benchmark 代码写面
- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- 备注:作为当前 recovery 刷新任务的最小 build validation继续确认 benchmark 工程在合并后保持可编译
- 备注:确认 benchmark reader-facing 文档收口后工程仍保持 `Release` 可编译
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- 备注:确认 request pipeline 复用 generated invoker 的 runtime 修正未引入编译回归
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsGeneratedRequestInvokerProviderTests"`
- 结果:通过,`Passed: 28, Failed: 0`
- 备注:确认 generated request invoker 在接入 request pipeline 后仍保持优先,并通过新增回归测试锁定
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsDispatcherContextValidationTests"`
- 结果:通过,`Passed: 7, Failed: 0`
- 备注:确认 stream 缺失 handler 的失败语义回归与既有上下文校验测试仍全部通过
## 下一推荐步骤
1. 若要继续 CQRS 主题的新一轮实现,先把当前 recovery 刷新提交合并或重放到新的 topic branch再从最新 `origin/main` 继续,而不是复用已完成的 `PR #350` 上下文。
1. 若继续 CQRS 主题的低风险硬化,优先从 `NotificationPublisher`、invoker descriptor、`CqrsRuntimeFactory` 三类单文件测试候选中再挑一条,不要重新打开 benchmark 设计级议题
2. 若后续重新打开 `Mediator` 生命周期 parity 工作,优先设计独立 compile-time config / 独立 benchmark 工程,并把该设计单独记录到新的 tracking phase。
3. 若只是恢复本 worktree 继续其他 topic可把 `cqrs-rewrite` 视为“当前已在自然停点完成”的历史入口,不再默认把 benchmark README / XML 清扫当作活跃批处理目标。
3. 若只是恢复其他 topic可把当前 `cqrs-rewrite` active 入口视为“本轮已在上下文预算前的自然停点完成”
## 活跃文档

View File

@ -7,6 +7,51 @@ SPDX-License-Identifier: Apache-2.0
## 2026-05-13
### 阶段:允许 subagent 后的低风险多 Agent 收口CQRS-REWRITE-RP-143
- 在用户允许 subagent 后,主线程切到小型 multi-agent 协调模式,但仍把 critical path 保留在本地:
- 主线程负责候选筛选、runtime 缺口判断、最终验证与 `ai-plan`
- explorer 只做只读盘点
- worker 只接单文件或窄文件组 ownership
- explorer 结论汇总:
- 没有值得继续扩成“实现型 benchmark 波次”的大切片
- 低风险候选主要是 reader-facing 术语对齐与单文件回归测试
- `Mediator` 生命周期 parity、notification fan-out 生命周期矩阵、benchmark 工程级拆分仍判定为高风险
- accepted worker / 主线程 scope
- `RequestLifetimeBenchmarks.cs`
- 主线程修正 2 处 XML 文档缩进异常
- `NotificationFanOutBenchmarks.cs`
- `RequestBenchmarks.cs`
- `StreamingBenchmarks.cs`
- worker 收口 benchmark reader-facing 术语build 通过
- `CqrsDispatcherContextValidationTests.cs`
- worker 补 stream 缺失 handler 的同步抛错回归targeted test 通过
- `CqrsGeneratedRequestInvokerProviderTests.cs`
- worker 补 generated request + pipeline 对称测试
- 首次运行失败,暴露 request pipeline 路径在接入 behavior 后退回 `_handler.Handle(...)`
- `CqrsDispatcher.cs`
- 主线程将 request pipeline 末端改为继续复用当前 binding 的 `RequestInvoker`
- 使 generated request invoker provider 在 pipeline 存在时保持与无 pipeline 路径一致
- 本轮权威验证:
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsGeneratedRequestInvokerProviderTests"`
- 结果:通过,`Passed: 28, Failed: 0`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsDispatcherContextValidationTests"`
- 结果:通过,`Passed: 7, Failed: 0`
- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- 当前 stop decision
- 不再继续下一波
- 原因不是 branch-size当前变更仍远低于 `$gframework-batch-boot 50``
- 停止原因是主线程上下文已接近本轮安全预算,而剩余候选只剩收益较低的单文件测试硬化
- 当前下一步:
- 更新 `ai-plan/public/cqrs-rewrite/**`
- 运行 license-header / `git diff --check`
- 提交本轮多 Agent 小波次收口
## 2026-05-13
### 阶段PR #350 合并后的 recovery 入口刷新CQRS-REWRITE-RP-142
- 继续按 `$gframework-batch-boot 50` 恢复当前 topic但启动时先核对分支真值而不是沿用 active tracking 中的旧 PR 状态。