// Copyright (c) 2025-2026 GeWuYou // SPDX-License-Identifier: Apache-2.0 using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Columns; using BenchmarkDotNet.Configs; using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Jobs; using BenchmarkDotNet.Order; using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using GFramework.Core.Abstractions.Logging; using GFramework.Core.Ioc; using GFramework.Core.Logging; using GFramework.Cqrs.Abstractions.Cqrs; using MediatR; using Microsoft.Extensions.DependencyInjection; namespace GFramework.Cqrs.Benchmarks.Messaging; /// /// 对比 stream 在不同 handler 生命周期与观测方式下的额外开销。 /// /// /// 当前矩阵只覆盖 `Singleton` 与 `Transient`。 /// `Scoped` 仍依赖真实的显式作用域边界;在当前“单根容器最小宿主”模型下直接加入 scoped 会把枚举宿主成本与生命周期成本混在一起, /// 因此保持与 request 生命周期矩阵相同的边界,留待后续 scoped host 基线具备后再扩展。 /// [Config(typeof(Config))] public class StreamLifetimeBenchmarks { private MicrosoftDiContainer _reflectionContainer = null!; private ICqrsRuntime _reflectionRuntime = null!; private MicrosoftDiContainer _generatedContainer = null!; private ICqrsRuntime _generatedRuntime = null!; private ServiceProvider _serviceProvider = null!; private IMediator _mediatr = null!; private ReflectionBenchmarkStreamHandler _baselineHandler = null!; private ReflectionBenchmarkStreamRequest _reflectionRequest = null!; private GeneratedBenchmarkStreamRequest _generatedRequest = null!; private MediatRBenchmarkStreamRequest _mediatrRequest = null!; /// /// 控制当前 benchmark 使用的 handler 生命周期。 /// [Params(HandlerLifetime.Singleton, HandlerLifetime.Transient)] public HandlerLifetime Lifetime { get; set; } /// /// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。 /// [Params(StreamObservation.FirstItem, StreamObservation.DrainAll)] public StreamObservation Observation { get; set; } /// /// 可公平比较的 benchmark handler 生命周期集合。 /// public enum HandlerLifetime { /// /// 复用单个 handler 实例。 /// Singleton, /// /// 每次建流都重新解析新的 handler 实例。 /// Transient } /// /// 用于拆分 stream dispatch 与后续枚举成本的观测模式。 /// public enum StreamObservation { /// /// 只推进到首个元素后立即释放枚举器。 /// FirstItem, /// /// 完整枚举整个 stream,保留原有 benchmark 语义。 /// DrainAll } /// /// 配置 stream 生命周期 benchmark 的公共输出格式。 /// private sealed class Config : ManualConfig { public Config() { AddJob(Job.Default); AddColumnProvider(DefaultColumnProviders.Instance); AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamLifetime")); AddDiagnoser(MemoryDiagnoser.Default); WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared)); } } /// /// 构建当前生命周期下的 GFramework reflection、GFramework generated 与 MediatR stream 对照宿主。 /// [GlobalSetup] public void Setup() { LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider { MinLevel = LogLevel.Fatal }; Fixture.Setup($"StreamLifetime/{Lifetime}", handlerCount: 1, pipelineCount: 0); BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); _baselineHandler = new ReflectionBenchmarkStreamHandler(); _reflectionRequest = new ReflectionBenchmarkStreamRequest(Guid.NewGuid(), 3); _generatedRequest = new GeneratedBenchmarkStreamRequest(Guid.NewGuid(), 3); _mediatrRequest = new MediatRBenchmarkStreamRequest(Guid.NewGuid(), 3); _reflectionContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container => { RegisterReflectionHandler(container, Lifetime); }); _reflectionRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( _reflectionContainer, LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + ".Reflection." + Lifetime)); _generatedContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container => { BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container); RegisterGeneratedHandler(container, Lifetime); }); // 容器内已提前保留默认 runtime 以支撑 generated registry 接线; // 这里额外创建带生命周期后缀的 runtime,只是为了区分不同 benchmark 矩阵的 dispatcher 日志。 _generatedRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( _generatedContainer, LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + ".Generated." + Lifetime)); _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( configure: null, typeof(StreamLifetimeBenchmarks), static candidateType => candidateType == typeof(MediatRBenchmarkStreamHandler), ResolveMediatRLifetime(Lifetime)); _mediatr = _serviceProvider.GetRequiredService(); } /// /// 释放当前生命周期矩阵持有的 benchmark 宿主资源,并清理 dispatcher 缓存。 /// [GlobalCleanup] public void Cleanup() { try { BenchmarkCleanupHelper.DisposeAll(_reflectionContainer, _generatedContainer, _serviceProvider); } finally { BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); } } /// /// 直接调用 handler,并按当前观测模式消费 stream,作为不同生命周期矩阵下的 dispatch 额外开销 baseline。 /// [Benchmark(Baseline = true)] public ValueTask Stream_Baseline() { return ObserveAsync(_baselineHandler.Handle(_reflectionRequest, CancellationToken.None), Observation); } /// /// 通过 GFramework.CQRS reflection stream binding 路径创建 stream,并按当前观测模式消费。 /// [Benchmark] public ValueTask Stream_GFrameworkReflection() { return ObserveAsync( _reflectionRuntime.CreateStream( BenchmarkContext.Instance, _reflectionRequest, CancellationToken.None), Observation); } /// /// 通过 generated stream invoker provider 预热后的 GFramework.CQRS runtime 创建 stream,并按当前观测模式消费。 /// [Benchmark] public ValueTask Stream_GFrameworkGenerated() { return ObserveAsync( _generatedRuntime.CreateStream( BenchmarkContext.Instance, _generatedRequest, CancellationToken.None), Observation); } /// /// 通过 MediatR 创建 stream,并按当前观测模式消费,作为外部对照。 /// [Benchmark] public ValueTask Stream_MediatR() { return ObserveAsync(_mediatr.CreateStream(_mediatrRequest, CancellationToken.None), Observation); } /// /// 按生命周期把 reflection benchmark stream handler 注册到 GFramework 容器。 /// /// 当前 benchmark 拥有并负责释放的容器。 /// 待比较的 handler 生命周期。 private static void RegisterReflectionHandler(MicrosoftDiContainer container, HandlerLifetime lifetime) { ArgumentNullException.ThrowIfNull(container); switch (lifetime) { case HandlerLifetime.Singleton: container.RegisterSingleton< GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, ReflectionBenchmarkStreamHandler>(); return; case HandlerLifetime.Transient: container.RegisterTransient< GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, ReflectionBenchmarkStreamHandler>(); return; default: throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime."); } } /// /// 按生命周期把 generated benchmark stream handler 注册到 GFramework 容器。 /// /// 当前 benchmark 拥有并负责释放的容器。 /// 待比较的 handler 生命周期。 /// /// generated registry 只负责暴露静态 descriptor; /// 生命周期矩阵仍由 benchmark 主体显式覆盖 handler 注册,避免把 descriptor 发现与实例解析混在一起。 /// private static void RegisterGeneratedHandler(MicrosoftDiContainer container, HandlerLifetime lifetime) { ArgumentNullException.ThrowIfNull(container); switch (lifetime) { case HandlerLifetime.Singleton: container.RegisterSingleton< GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, GeneratedBenchmarkStreamHandler>(); return; case HandlerLifetime.Transient: container.RegisterTransient< GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, GeneratedBenchmarkStreamHandler>(); return; default: throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime."); } } /// /// 将 benchmark 生命周期映射为 MediatR 组装所需的 。 /// /// 待比较的 handler 生命周期。 /// 当前生命周期对应的 MediatR 注册方式。 private static ServiceLifetime ResolveMediatRLifetime(HandlerLifetime lifetime) { return lifetime switch { HandlerLifetime.Singleton => ServiceLifetime.Singleton, HandlerLifetime.Transient => ServiceLifetime.Transient, _ => throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.") }; } /// /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。 /// /// 当前 stream 的响应类型。 /// 待观察的异步响应序列。 /// 当前 benchmark 选定的观测模式。 /// 异步消费完成后的等待句柄。 private static ValueTask ObserveAsync( IAsyncEnumerable responses, StreamObservation observation) { ArgumentNullException.ThrowIfNull(responses); return observation switch { StreamObservation.FirstItem => ConsumeFirstItemAsync(responses), StreamObservation.DrainAll => DrainAsync(responses), _ => throw new ArgumentOutOfRangeException( nameof(observation), observation, "Unsupported stream observation mode.") }; } /// /// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 `MoveNextAsync` 的固定成本。 /// /// 当前 stream 的响应类型。 /// 待观察的异步响应序列。 /// 消费首个元素后的等待句柄。 private static async ValueTask ConsumeFirstItemAsync(IAsyncEnumerable responses) { var enumerator = responses.GetAsyncEnumerator(); await using (enumerator.ConfigureAwait(false)) { if (await enumerator.MoveNextAsync().ConfigureAwait(false)) { _ = enumerator.Current; } } } /// /// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。 /// /// 当前 stream 的响应类型。 /// 待完整枚举的异步响应序列。 /// 完整枚举结束后的等待句柄。 private static async ValueTask DrainAsync(IAsyncEnumerable responses) { await foreach (var response in responses.ConfigureAwait(false)) { _ = response; } } /// /// Reflection runtime stream request。 /// /// 请求标识。 /// 返回元素数量。 public sealed record ReflectionBenchmarkStreamRequest(Guid Id, int ItemCount) : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest; /// /// Reflection runtime stream response。 /// /// 响应标识。 public sealed record ReflectionBenchmarkResponse(Guid Id); /// /// Generated runtime stream request。 /// /// 请求标识。 /// 返回元素数量。 public sealed record GeneratedBenchmarkStreamRequest(Guid Id, int ItemCount) : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest; /// /// Generated runtime stream response。 /// /// 响应标识。 public sealed record GeneratedBenchmarkResponse(Guid Id); /// /// MediatR stream request。 /// /// 请求标识。 /// 返回元素数量。 public sealed record MediatRBenchmarkStreamRequest(Guid Id, int ItemCount) : MediatR.IStreamRequest; /// /// MediatR stream response。 /// /// 响应标识。 public sealed record MediatRBenchmarkResponse(Guid Id); /// /// Reflection runtime 的最小 stream request handler。 /// public sealed class ReflectionBenchmarkStreamHandler : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler { /// /// 处理 reflection benchmark stream request。 /// /// 当前 reflection benchmark stream 请求。 /// 用于中断异步枚举的取消令牌。 /// 完整枚举所需的低噪声异步响应序列。 public IAsyncEnumerable Handle( ReflectionBenchmarkStreamRequest request, CancellationToken cancellationToken) { return EnumerateAsync( request.Id, request.ItemCount, static id => new ReflectionBenchmarkResponse(id), cancellationToken); } } /// /// Generated runtime 的最小 stream request handler。 /// public sealed class GeneratedBenchmarkStreamHandler : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler { /// /// 处理 generated benchmark stream request。 /// /// 当前 generated benchmark stream 请求。 /// 用于中断异步枚举的取消令牌。 /// 完整枚举所需的低噪声异步响应序列。 public IAsyncEnumerable Handle( GeneratedBenchmarkStreamRequest request, CancellationToken cancellationToken) { return EnumerateAsync( request.Id, request.ItemCount, static id => new GeneratedBenchmarkResponse(id), cancellationToken); } } /// /// MediatR 对照组的最小 stream request handler。 /// public sealed class MediatRBenchmarkStreamHandler : MediatR.IStreamRequestHandler { /// /// 处理 MediatR benchmark stream request。 /// /// 当前 MediatR benchmark stream 请求。 /// 用于中断异步枚举的取消令牌。 /// 完整枚举所需的低噪声异步响应序列。 public IAsyncEnumerable Handle( MediatRBenchmarkStreamRequest request, CancellationToken cancellationToken) { return EnumerateAsync( request.Id, request.ItemCount, static id => new MediatRBenchmarkResponse(id), cancellationToken); } } /// /// 为生命周期矩阵构造相同形状的低噪声异步枚举,避免不同口径的枚举体差异干扰 dispatch 对照。 /// private static async IAsyncEnumerable EnumerateAsync( Guid id, int itemCount, Func responseFactory, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) { for (var index = 0; index < itemCount; index++) { cancellationToken.ThrowIfCancellationRequested(); yield return responseFactory(id); await Task.CompletedTask.ConfigureAwait(false); } } }