diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs new file mode 100644 index 00000000..658d7486 --- /dev/null +++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs @@ -0,0 +1,183 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Columns; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Order; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; +using MediatR; +using Microsoft.Extensions.DependencyInjection; + +namespace GFramework.Cqrs.Benchmarks.Messaging; + +/// +/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的完整枚举开销。 +/// +[Config(typeof(Config))] +public class StreamingBenchmarks +{ + private MicrosoftDiContainer _container = null!; + private ICqrsRuntime _runtime = null!; + private ServiceProvider _serviceProvider = null!; + private IMediator _mediatr = null!; + private BenchmarkStreamHandler _baselineHandler = null!; + private BenchmarkStreamRequest _request = null!; + + /// + /// 配置 stream benchmark 的公共输出格式。 + /// + private sealed class Config : ManualConfig + { + public Config() + { + AddJob(Job.Default); + AddColumnProvider(DefaultColumnProviders.Instance); + AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamRequest")); + AddDiagnoser(MemoryDiagnoser.Default); + WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared)); + } + } + + /// + /// 构建 stream dispatch 所需的最小 runtime 宿主和对照对象。 + /// + [GlobalSetup] + public void Setup() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider + { + MinLevel = LogLevel.Fatal + }; + Fixture.Setup("StreamRequest", handlerCount: 1, pipelineCount: 0); + + _container = new MicrosoftDiContainer(); + _baselineHandler = new BenchmarkStreamHandler(); + + _container.RegisterTransient, BenchmarkStreamHandler>(); + _runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( + _container, + LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamingBenchmarks))); + + var services = new ServiceCollection(); + services.AddSingleton, BenchmarkStreamHandler>(); + services.AddMediatR(static options => options.RegisterServicesFromAssembly(typeof(StreamingBenchmarks).Assembly)); + _serviceProvider = services.BuildServiceProvider(); + _mediatr = _serviceProvider.GetRequiredService(); + + _request = new BenchmarkStreamRequest(Guid.NewGuid(), 3); + } + + /// + /// 释放 MediatR 对照组使用的 DI 宿主。 + /// + [GlobalCleanup] + public void Cleanup() + { + _serviceProvider.Dispose(); + } + + /// + /// 直接调用 handler 并完整枚举响应序列,作为 stream dispatch 额外开销的 baseline。 + /// + [Benchmark(Baseline = true)] + public async ValueTask Stream_Baseline() + { + await foreach (var response in _baselineHandler.Handle(_request, CancellationToken.None).ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// 通过 GFramework.CQRS runtime 创建并完整枚举 stream。 + /// + [Benchmark] + public async ValueTask Stream_GFrameworkCqrs() + { + await foreach (var response in _runtime.CreateStream(BenchmarkContext.Instance, _request, CancellationToken.None) + .ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// 通过 MediatR 创建并完整枚举 stream,作为外部设计对照。 + /// + [Benchmark] + public async ValueTask Stream_MediatR() + { + await foreach (var response in _mediatr.CreateStream(_request, CancellationToken.None).ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// Benchmark stream request。 + /// + /// 请求标识。 + /// 返回元素数量。 + public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest, + MediatR.IStreamRequest; + + /// + /// 复用 request benchmark 的响应结构,保持跨场景可比性。 + /// + /// 响应标识。 + public sealed record BenchmarkResponse(Guid Id); + + /// + /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。 + /// + public sealed class BenchmarkStreamHandler : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, + MediatR.IStreamRequestHandler + { + /// + /// 处理 GFramework.CQRS stream request。 + /// + public IAsyncEnumerable Handle( + BenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync(request, cancellationToken); + } + + /// + /// 处理 MediatR stream request。 + /// + IAsyncEnumerable MediatR.IStreamRequestHandler.Handle( + BenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync(request, cancellationToken); + } + + /// + /// 为 benchmark 构造稳定、低噪声的异步响应序列。 + /// + private static async IAsyncEnumerable EnumerateAsync( + BenchmarkStreamRequest request, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + for (int index = 0; index < request.ItemCount; index++) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return new BenchmarkResponse(request.Id); + await Task.CompletedTask.ConfigureAwait(false); + } + } + } +} diff --git a/GFramework.Cqrs.Benchmarks/README.md b/GFramework.Cqrs.Benchmarks/README.md index 3ea73f65..8d281a50 100644 --- a/GFramework.Cqrs.Benchmarks/README.md +++ b/GFramework.Cqrs.Benchmarks/README.md @@ -18,6 +18,8 @@ - direct handler、`GFramework.Cqrs` runtime 与 `MediatR` 的 request steady-state dispatch 对比 - `Messaging/NotificationBenchmarks.cs` - `GFramework.Cqrs` runtime 与 `MediatR` 的单处理器 notification publish 对比 +- `Messaging/StreamingBenchmarks.cs` + - direct handler、`GFramework.Cqrs` runtime 与 `MediatR` 的 stream request 完整枚举对比 ## 最小使用方式 @@ -31,5 +33,4 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro - pipeline behavior 数量矩阵 - generated invoker provider 与纯反射 dispatch 对比 -- stream request benchmark - cold-start 与 registration 成本对比 diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index 17cf7319..40336c70 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -7,7 +7,7 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-084` +- 恢复点编号:`CQRS-REWRITE-RP-085` - 当前阶段:`Phase 8` - 当前 PR 锚点:`PR #323` - 当前结论: @@ -19,7 +19,8 @@ CQRS 迁移与收敛。 - `RP-081` 已继续补齐基础 generation gate 的 logging 与 DI runtime contract 缺失分支 - 当前 `RP-082` 已补齐基础 generation gate 的 request handler runtime contract 缺失分支 - `RP-083` 已补齐 mixed direct / reflected-implementation request 与 stream invoker provider 发射顺序回归 - - 当前 `RP-084` 已引入独立 `GFramework.Cqrs.Benchmarks` 项目,作为持续吸收 `Mediator` benchmark 组织方式的第一落点 + - `RP-084` 已引入独立 `GFramework.Cqrs.Benchmarks` 项目,作为持续吸收 `Mediator` benchmark 组织方式的第一落点 + - 当前 `RP-085` 已补齐 stream request benchmark,对齐 `Mediator` messaging benchmark 的第二个核心场景 - `ai-plan` active 入口现以 `PR #323` 和 `RP-082` 为唯一权威恢复锚点;`PR #307`、其他更早 PR 与阶段细节均以下方归档或说明为准 ## 当前活跃事实 @@ -51,6 +52,9 @@ CQRS 迁移与收敛。 - 结果:通过,`2/2` passed - `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` - 结果:通过,`0 warning / 0 error` +- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` + - 结果:通过,`0 warning / 0 error` + - 备注:包含新增 `StreamingBenchmarks` 后再次复核通过 - `GIT_DIR= GIT_WORK_TREE= python3 scripts/license-header.py --check` - 结果:通过 - `git diff --check` @@ -103,7 +107,7 @@ CQRS 迁移与收敛。 ## 下一推荐步骤 1. 继续处理 `PR #323` 的剩余 review 收尾,优先保持 `ai-plan` active 入口与 trace 的单一锚点一致 -2. 若继续推进“吸收 Mediator 设计哲学”的切片,优先扩展 benchmark 场景矩阵到 pipeline、stream、cold-start 与 generated invoker provider 对照 +2. 若继续推进“吸收 Mediator 设计哲学”的切片,优先扩展 benchmark 场景矩阵到 request pipeline 数量矩阵、cold-start / initialization 与 generated invoker provider 对照 3. 在进入下一批 runtime / generator 收敛前,保持最小 Release build、targeted test 或 benchmark project build 作为权威验证 ## 活跃文档 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index 5a8f4129..76680e79 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -296,3 +296,41 @@ 1. 继续扩展 `GFramework.Cqrs.Benchmarks`,优先补齐 pipeline、stream、cold-start 与 generated invoker provider 对照场景 2. 当后续有具体 runtime 优化切片时,用该 benchmark 项目验证是否真正吸收到了 `Mediator` 的低开销 dispatch 设计收益 + +### 阶段:stream request benchmark 对照(CQRS-REWRITE-RP-085) + +- 继续沿用 `$gframework-batch-boot 50`,当前 branch diff 相对 `origin/main` 仍明显低于阈值 +- 在 `RP-084` 已建立独立 benchmark 项目后,本轮优先补齐 `ai-libs/Mediator/benchmarks/Mediator.Benchmarks/Messaging/StreamingBenchmarks.cs` 对应的最小 stream 场景 +- 选择 stream 作为第二批 benchmark 的原因: + - 已有独立的 `CreateStream` runtime 路径和单独的 stream invoker provider 元数据契约 + - 与 `Mediator` 的 messaging benchmark 分层直接对应 + - 不需要像 pipeline / cold-start 那样先进一步澄清运行时或宿主边界 +- 本轮新增: + - `GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs` + - `GFramework.Cqrs.Benchmarks/README.md` 中的 stream 场景说明 +- 设计约束: + - 保持与前一批一致的三路对照:`Baseline`、`GFramework.Cqrs`、`MediatR` + - 基准测量“完整枚举 3 个元素”的全量消费成本,而不是只测创建异步枚举器 + - 使用最小 `ICqrsContext` marker,继续避免把完整 `ArchitectureContext` 初始化成本混入 steady-state stream dispatch +- 结论: + - 当前 benchmark 项目已经覆盖 `Request`、`Notification`、`StreamRequest` 三个核心 messaging steady-state 场景 + - 下一批更适合转向 request pipeline 数量矩阵或 cold-start / initialization,而不是继续扩同层次的 messaging 基线 + +### 验证(RP-085) + +- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` + - 结果:通过,`0 warning / 0 error` +- `GIT_DIR= GIT_WORK_TREE= python3 scripts/license-header.py --check` + - 结果:通过 +- `git diff --check` + - 结果:通过 + +### 当前 stop-condition 度量(RP-085) + +- primary metric:branch diff files vs `origin/main` +- 当前说明:新增 stream benchmark 后仍处于 `50` 文件阈值以内,适合继续下一批 request pipeline 或 cold-start 场景 + +### 当前下一步(RP-085) + +1. 继续扩展 `GFramework.Cqrs.Benchmarks`,优先补齐 request pipeline 数量矩阵,随后再评估 cold-start / initialization +2. 当需要验证 generated invoker provider 的实际收益时,把 request benchmark 扩展为 reflection / generated provider 对照,而不是只停留在框架间对比