diff --git a/GFramework.Cqrs.Benchmarks/Messaging/GeneratedStreamInvokerBenchmarkRegistry.cs b/GFramework.Cqrs.Benchmarks/Messaging/GeneratedStreamInvokerBenchmarkRegistry.cs new file mode 100644 index 00000000..433da49f --- /dev/null +++ b/GFramework.Cqrs.Benchmarks/Messaging/GeneratedStreamInvokerBenchmarkRegistry.cs @@ -0,0 +1,94 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading; +using GFramework.Core.Abstractions.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; +using Microsoft.Extensions.DependencyInjection; + +namespace GFramework.Cqrs.Benchmarks.Messaging; + +/// +/// 为 benchmark 手写一个“生成后等价物” stream registry,用于驱动真实的 generated stream invoker provider 运行时接线路径。 +/// +public sealed class GeneratedStreamInvokerBenchmarkRegistry : + GFramework.Cqrs.ICqrsHandlerRegistry, + GFramework.Cqrs.ICqrsStreamInvokerProvider, + GFramework.Cqrs.IEnumeratesCqrsStreamInvokerDescriptors +{ + private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor = + new( + typeof(IStreamRequestHandler< + StreamInvokerBenchmarks.GeneratedBenchmarkStreamRequest, + StreamInvokerBenchmarks.GeneratedBenchmarkResponse>), + typeof(GeneratedStreamInvokerBenchmarkRegistry).GetMethod( + nameof(InvokeGeneratedStreamHandler), + BindingFlags.Public | BindingFlags.Static) + ?? throw new InvalidOperationException("Missing generated stream invoker benchmark method.")); + + private static readonly IReadOnlyList Descriptors = + [ + new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry( + typeof(StreamInvokerBenchmarks.GeneratedBenchmarkStreamRequest), + typeof(StreamInvokerBenchmarks.GeneratedBenchmarkResponse), + Descriptor) + ]; + + /// + /// 将 generated benchmark stream handler 注册到目标服务集合。 + /// + public void Register(IServiceCollection services, ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(logger); + + services.AddTransient( + typeof(IStreamRequestHandler< + StreamInvokerBenchmarks.GeneratedBenchmarkStreamRequest, + StreamInvokerBenchmarks.GeneratedBenchmarkResponse>), + typeof(StreamInvokerBenchmarks.GeneratedBenchmarkStreamHandler)); + logger.Debug("Registered generated stream invoker benchmark handler."); + } + + /// + /// 返回当前 provider 暴露的全部 generated stream invoker 描述符。 + /// + public IReadOnlyList GetDescriptors() + { + return Descriptors; + } + + /// + /// 为目标流式请求/响应类型对返回 generated stream invoker 描述符。 + /// + public bool TryGetDescriptor( + Type requestType, + Type responseType, + out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor) + { + if (requestType == typeof(StreamInvokerBenchmarks.GeneratedBenchmarkStreamRequest) && + responseType == typeof(StreamInvokerBenchmarks.GeneratedBenchmarkResponse)) + { + descriptor = Descriptor; + return true; + } + + descriptor = null; + return false; + } + + /// + /// 模拟 generated stream invoker provider 产出的开放静态调用入口。 + /// + public static object InvokeGeneratedStreamHandler(object handler, object request, CancellationToken cancellationToken) + { + var typedHandler = (IStreamRequestHandler< + StreamInvokerBenchmarks.GeneratedBenchmarkStreamRequest, + StreamInvokerBenchmarks.GeneratedBenchmarkResponse>)handler; + var typedRequest = (StreamInvokerBenchmarks.GeneratedBenchmarkStreamRequest)request; + return typedHandler.Handle(typedRequest, cancellationToken); + } +} diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamInvokerBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamInvokerBenchmarks.cs new file mode 100644 index 00000000..90c80b92 --- /dev/null +++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamInvokerBenchmarks.cs @@ -0,0 +1,307 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Columns; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Order; +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; +using MediatR; +using Microsoft.Extensions.DependencyInjection; + +[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute( + typeof(GFramework.Cqrs.Benchmarks.Messaging.GeneratedStreamInvokerBenchmarkRegistry))] + +namespace GFramework.Cqrs.Benchmarks.Messaging; + +/// +/// 对比 stream 完整枚举在 direct handler、GFramework 反射路径、GFramework generated invoker 路径与 MediatR 之间的开销差异。 +/// +[Config(typeof(Config))] +public class StreamInvokerBenchmarks +{ + private MicrosoftDiContainer _reflectionContainer = null!; + private ICqrsRuntime _reflectionRuntime = null!; + private MicrosoftDiContainer _generatedContainer = null!; + private ICqrsRuntime _generatedRuntime = null!; + private ServiceProvider _serviceProvider = null!; + private IMediator _mediatr = null!; + private ReflectionBenchmarkStreamHandler _baselineHandler = null!; + private ReflectionBenchmarkStreamRequest _reflectionRequest = null!; + private GeneratedBenchmarkStreamRequest _generatedRequest = null!; + private MediatRBenchmarkStreamRequest _mediatrRequest = null!; + + /// + /// 配置 stream invoker benchmark 的公共输出格式。 + /// + private sealed class Config : ManualConfig + { + public Config() + { + AddJob(Job.Default); + AddColumnProvider(DefaultColumnProviders.Instance); + AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamInvoker")); + AddDiagnoser(MemoryDiagnoser.Default); + WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared)); + } + } + + /// + /// 构建 reflection / generated / MediatR 三组 stream dispatch 对照宿主。 + /// + [GlobalSetup] + public void Setup() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider + { + MinLevel = LogLevel.Fatal + }; + Fixture.Setup("StreamInvoker", handlerCount: 1, pipelineCount: 0); + ClearDispatcherCaches(); + + _baselineHandler = new ReflectionBenchmarkStreamHandler(); + _reflectionRequest = new ReflectionBenchmarkStreamRequest(Guid.NewGuid(), 3); + _generatedRequest = new GeneratedBenchmarkStreamRequest(Guid.NewGuid(), 3); + _mediatrRequest = new MediatRBenchmarkStreamRequest(Guid.NewGuid(), 3); + + _reflectionContainer = new MicrosoftDiContainer(); + _reflectionContainer.RegisterTransient, ReflectionBenchmarkStreamHandler>(); + _reflectionRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( + _reflectionContainer, + LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamInvokerBenchmarks) + ".Reflection")); + + _generatedContainer = new MicrosoftDiContainer(); + _generatedContainer.RegisterCqrsHandlersFromAssembly(typeof(StreamInvokerBenchmarks).Assembly); + _generatedRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( + _generatedContainer, + LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamInvokerBenchmarks) + ".Generated")); + + var services = new ServiceCollection(); + services.AddSingleton, MediatRBenchmarkStreamHandler>(); + services.AddMediatR(static options => options.RegisterServicesFromAssembly(typeof(StreamInvokerBenchmarks).Assembly)); + _serviceProvider = services.BuildServiceProvider(); + _mediatr = _serviceProvider.GetRequiredService(); + } + + /// + /// 释放 MediatR 对照组使用的 DI 宿主,并清理静态 dispatcher 缓存。 + /// + [GlobalCleanup] + public void Cleanup() + { + _serviceProvider.Dispose(); + ClearDispatcherCaches(); + } + + /// + /// 直接调用最小 stream handler 并完整枚举,作为 dispatch 额外开销 baseline。 + /// + [Benchmark(Baseline = true)] + public async ValueTask Stream_Baseline() + { + await foreach (var response in _baselineHandler.Handle(_reflectionRequest, CancellationToken.None).ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// 通过 GFramework.CQRS 反射 stream binding 路径创建并完整枚举 stream。 + /// + [Benchmark] + public async ValueTask Stream_GFrameworkReflection() + { + await foreach (var response in _reflectionRuntime.CreateStream(BenchmarkContext.Instance, _reflectionRequest, CancellationToken.None) + .ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// 通过 generated stream invoker provider 预热后的 GFramework.CQRS runtime 创建并完整枚举 stream。 + /// + [Benchmark] + public async ValueTask Stream_GFrameworkGenerated() + { + await foreach (var response in _generatedRuntime.CreateStream(BenchmarkContext.Instance, _generatedRequest, CancellationToken.None) + .ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// 通过 MediatR 创建并完整枚举 stream,作为外部对照。 + /// + [Benchmark] + public async ValueTask Stream_MediatR() + { + await foreach (var response in _mediatr.CreateStream(_mediatrRequest, CancellationToken.None).ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// 清空 dispatcher 静态缓存,避免上一轮基准残留的 generated metadata 影响当前对照。 + /// + private static void ClearDispatcherCaches() + { + ClearDispatcherCache("NotificationDispatchBindings"); + ClearDispatcherCache("RequestDispatchBindings"); + ClearDispatcherCache("StreamDispatchBindings"); + ClearDispatcherCache("GeneratedRequestInvokers"); + ClearDispatcherCache("GeneratedStreamInvokers"); + } + + /// + /// 通过反射定位并清空 dispatcher 的指定缓存字段。 + /// + /// 要清理的静态缓存字段名。 + private static void ClearDispatcherCache(string fieldName) + { + var field = typeof(GFramework.Cqrs.CqrsRuntimeFactory).Assembly + .GetType("GFramework.Cqrs.Internal.CqrsDispatcher", throwOnError: true)! + .GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static) + ?? throw new InvalidOperationException($"Missing dispatcher cache field {fieldName}."); + var cache = field.GetValue(null) + ?? throw new InvalidOperationException($"Dispatcher cache field {fieldName} returned null."); + var clearMethod = cache.GetType().GetMethod("Clear", BindingFlags.Public | BindingFlags.Instance) + ?? throw new InvalidOperationException( + $"Dispatcher cache field {fieldName} does not expose a Clear method."); + _ = clearMethod.Invoke(cache, null); + } + + /// + /// Reflection runtime stream request。 + /// + /// 请求标识。 + /// 返回元素数量。 + public sealed record ReflectionBenchmarkStreamRequest(Guid Id, int ItemCount) : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest; + + /// + /// Reflection runtime stream response。 + /// + /// 响应标识。 + public sealed record ReflectionBenchmarkResponse(Guid Id); + + /// + /// Generated runtime stream request。 + /// + /// 请求标识。 + /// 返回元素数量。 + public sealed record GeneratedBenchmarkStreamRequest(Guid Id, int ItemCount) : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest; + + /// + /// Generated runtime stream response。 + /// + /// 响应标识。 + public sealed record GeneratedBenchmarkResponse(Guid Id); + + /// + /// MediatR stream request。 + /// + /// 请求标识。 + /// 返回元素数量。 + public sealed record MediatRBenchmarkStreamRequest(Guid Id, int ItemCount) : + MediatR.IStreamRequest; + + /// + /// MediatR stream response。 + /// + /// 响应标识。 + public sealed record MediatRBenchmarkResponse(Guid Id); + + /// + /// Reflection runtime 的最小 stream request handler。 + /// + public sealed class ReflectionBenchmarkStreamHandler : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler + { + /// + /// 处理 reflection benchmark stream request。 + /// + public IAsyncEnumerable Handle( + ReflectionBenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync( + request.Id, + request.ItemCount, + static id => new ReflectionBenchmarkResponse(id), + cancellationToken); + } + } + + /// + /// Generated runtime 的最小 stream request handler。 + /// + public sealed class GeneratedBenchmarkStreamHandler : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler + { + /// + /// 处理 generated benchmark stream request。 + /// + public IAsyncEnumerable Handle( + GeneratedBenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync( + request.Id, + request.ItemCount, + static id => new GeneratedBenchmarkResponse(id), + cancellationToken); + } + } + + /// + /// MediatR 对照组的最小 stream request handler。 + /// + public sealed class MediatRBenchmarkStreamHandler : + MediatR.IStreamRequestHandler + { + /// + /// 处理 MediatR benchmark stream request。 + /// + public IAsyncEnumerable Handle( + MediatRBenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync( + request.Id, + request.ItemCount, + static id => new MediatRBenchmarkResponse(id), + cancellationToken); + } + } + + /// + /// 为三组 stream benchmark 构造相同形状的低噪声异步枚举,避免枚举体差异干扰 invoker 对照。 + /// + private static async IAsyncEnumerable EnumerateAsync( + Guid id, + int itemCount, + Func responseFactory, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + for (var index = 0; index < itemCount; index++) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return responseFactory(id); + await Task.CompletedTask.ConfigureAwait(false); + } + } +} diff --git a/GFramework.Cqrs.Benchmarks/README.md b/GFramework.Cqrs.Benchmarks/README.md index c98a11fb..3ac5fccb 100644 --- a/GFramework.Cqrs.Benchmarks/README.md +++ b/GFramework.Cqrs.Benchmarks/README.md @@ -22,6 +22,8 @@ - `Initialization` 与 `ColdStart` 两组 request startup 成本对比,补齐与 `Mediator` comparison benchmark 更接近的 startup 维度 - `Messaging/RequestInvokerBenchmarks.cs` - direct handler、`GFramework.Cqrs` reflection runtime、handwritten generated-invoker runtime 与 `MediatR` 的 request steady-state dispatch 对比 +- `Messaging/StreamInvokerBenchmarks.cs` + - direct handler、`GFramework.Cqrs` reflection runtime、handwritten generated-invoker runtime 与 `MediatR` 的 stream 完整枚举对比 - `Messaging/NotificationBenchmarks.cs` - `GFramework.Cqrs` runtime 与 `MediatR` 的单处理器 notification publish 对比 - `Messaging/StreamingBenchmarks.cs` @@ -38,5 +40,6 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro ## 后续扩展方向 - generated invoker provider 与纯反射 dispatch 对比 +- generated stream invoker provider 与纯反射建流对比 - registration / service lifetime 矩阵 - request / stream 的真实 source-generator 产物与 handwritten generated provider 对照 diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index cc0fff4b..4f8c6f36 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -7,7 +7,7 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-088` +- 恢复点编号:`CQRS-REWRITE-RP-089` - 当前阶段:`Phase 8` - 当前 PR 锚点:`PR #323` - 当前结论: @@ -24,6 +24,7 @@ CQRS 迁移与收敛。 - `RP-086` 已补齐 request pipeline `0 / 1 / 4` 数量矩阵,开始把 benchmark 关注点从单纯 messaging steady-state 扩展到行为编排开销 - `RP-087` 已补齐 request startup benchmark,把 initialization 与 cold-start 维度正式纳入 `GFramework.Cqrs.Benchmarks` - 当前 `RP-088` 已补齐 request invoker reflection / generated-provider 对照,开始直接量化 dispatcher 预热 generated descriptor 的收益 + - 当前 `RP-089` 已补齐 stream invoker reflection / generated-provider 对照,使 generated descriptor 预热收益从 request 扩展到 stream 路径 - `ai-plan` active 入口现以 `PR #323` 和 `RP-082` 为唯一权威恢复锚点;`PR #307`、其他更早 PR 与阶段细节均以下方归档或说明为准 ## 当前活跃事实 @@ -67,6 +68,13 @@ CQRS 迁移与收敛。 - `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` - 结果:通过,`0 warning / 0 error` - 备注:包含新增 `RequestInvokerBenchmarks` 与 handwritten generated registry/provider 后再次复核通过 +- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` + - 结果:通过,`0 warning / 0 error` + - 备注:包含新增 `StreamInvokerBenchmarks` 与 handwritten generated stream registry/provider 后再次复核通过 +- `GIT_DIR= GIT_WORK_TREE= python3 scripts/license-header.py --check` + - 结果:通过 +- `git diff --check` + - 结果:通过 - `GIT_DIR= GIT_WORK_TREE= python3 scripts/license-header.py --check` - 结果:通过 - `git diff --check` @@ -119,7 +127,7 @@ CQRS 迁移与收敛。 ## 下一推荐步骤 1. 继续处理 `PR #323` 的剩余 review 收尾,优先保持 `ai-plan` active 入口与 trace 的单一锚点一致 -2. 若继续推进“吸收 Mediator 设计哲学”的切片,优先扩展 benchmark 场景矩阵到 registration / service lifetime、stream generated provider 与更贴近 `Mediator` concrete runtime 的对照 +2. 若继续推进“吸收 Mediator 设计哲学”的切片,优先扩展 benchmark 场景矩阵到 registration / service lifetime、notification publish strategy 或更贴近 `Mediator` concrete runtime 的对照 3. 在进入下一批 runtime / generator 收敛前,保持最小 Release build、targeted test 或 benchmark project build 作为权威验证 ## 活跃文档 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index 602f5154..1889617e 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -429,4 +429,38 @@ ### 当前下一步(RP-088) 1. 提交本轮 request invoker benchmark 后,继续扩展 `GFramework.Cqrs.Benchmarks`,优先评估 registration / service lifetime 或 stream generated provider + +### 阶段:stream invoker reflection / generated 对照(CQRS-REWRITE-RP-089) + +- 使用 `$gframework-batch-boot 30` 继续 `feat/cqrs-optimization` 的 CQRS 收口批次 +- 本轮基线选择: + - `origin/main c01abac0`,committer date `2026-05-06 09:40:08 +0800` + - `main a8c6c11e`,committer date `2026-05-05 13:14:24 +0800` +- 启动时 branch diff vs `origin/main` 为 `18` files / `2100` lines,低于 `30` 文件阈值,因此继续选择单模块、低风险 benchmark 切片 +- 复核 `GFramework.Cqrs.Benchmarks` 与 `ai-libs/Mediator/benchmarks` 后确认: + - `RP-088` 已把 generated descriptor 预热收益量化到 request dispatch 路径 + - stream benchmark 仍停留在 direct handler / reflection runtime / `MediatR` 三路对照,尚未量化 generated stream invoker provider 的收益 + - 虽然 `Mediator` 参考基准大量使用 service lifetime 矩阵,但当前 `GFramework.Cqrs.Benchmarks` 尚未建立对称的 scoped host 模式;直接扩 lifetime 会引入超出本批风险预算的宿主语义变化 +- 本轮因此优先选择 request 对称切片,而不是 service lifetime 扩展: + - 新增 `Messaging/StreamInvokerBenchmarks.cs` + - 新增 `Messaging/GeneratedStreamInvokerBenchmarkRegistry.cs` + - 更新 `GFramework.Cqrs.Benchmarks/README.md` +- 设计约束: + - 继续沿用 handwritten generated registry/provider 模式,避免把 benchmark 基础设施与真实 source-generator 输出耦合 + - 复用与 `RP-088` 相同的 dispatcher 缓存清理策略,确保 reflection / generated 路径对照不受静态缓存残留污染 + - 使用统一的异步枚举体工厂,让三组 stream handler 共享同一枚举成本基线,把变量收敛到 invoker/provider 接线路径 + +### 当前下一步(RP-089) + +1. 完成本轮 benchmark 项目 Release build、license header 检查与 diff 校验后,更新 active tracking 的权威验证列表 +2. 若 branch diff 仍明显低于 `30` 文件阈值,可继续评估 notification publish strategy 或更贴近 `Mediator` concrete runtime 的单批对照 + +### 验证(RP-089) + +- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` + - 结果:通过,`0 warning / 0 error` +- `GIT_DIR= GIT_WORK_TREE= python3 scripts/license-header.py --check` + - 结果:通过 +- `git diff --check` + - 结果:通过 2. 若要继续贴近 `Mediator` 的 comparison benchmark 设计哲学,评估是否把 `Mediator` concrete runtime 本身接入 benchmark 项目,而不是长期只保留 `MediatR`