// Copyright (c) 2025-2026 GeWuYou // SPDX-License-Identifier: Apache-2.0 using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Columns; using BenchmarkDotNet.Configs; using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Jobs; using BenchmarkDotNet.Order; using System; using System.Collections.Generic; using System.Reflection; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using GFramework.Core.Abstractions.Logging; using GFramework.Core.Ioc; using GFramework.Core.Logging; using GFramework.Cqrs.Abstractions.Cqrs; using MediatR; using Microsoft.Extensions.DependencyInjection; [assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute( typeof(GFramework.Cqrs.Benchmarks.Messaging.StreamPipelineBenchmarks.GeneratedStreamPipelineBenchmarkRegistry))] namespace GFramework.Cqrs.Benchmarks.Messaging; /// /// 对比不同 stream pipeline 行为数量下,单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state dispatch 开销。 /// /// /// 当前矩阵同时覆盖 0 / 1 / 4 个 stream pipeline 行为,以及 /// 两种观测口径, /// 以便把建流固定成本与完整枚举成本拆开观察。 /// [Config(typeof(Config))] public class StreamPipelineBenchmarks { private MicrosoftDiContainer _container = null!; private ICqrsRuntime _runtime = null!; private ServiceProvider _serviceProvider = null!; private IMediator _mediatr = null!; private BenchmarkStreamHandler _baselineHandler = null!; private BenchmarkStreamRequest _request = null!; /// /// 控制当前场景注册的 stream pipeline 行为数量,保持与 request pipeline benchmark 相同的 0 / 1 / 4 矩阵。 /// [Params(0, 1, 4)] public int PipelineCount { get; set; } /// /// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。 /// [Params(StreamObservation.FirstItem, StreamObservation.DrainAll)] public StreamObservation Observation { get; set; } /// /// 用于拆分 stream dispatch 固定成本与后续枚举成本的观测模式。 /// public enum StreamObservation { /// /// 只推进到首个元素后立即释放枚举器。 /// FirstItem, /// /// 完整枚举整个 stream,保留原有 benchmark 语义。 /// DrainAll } /// /// 配置 stream pipeline benchmark 的公共输出格式。 /// private sealed class Config : ManualConfig { public Config() { AddJob(Job.Default); AddColumnProvider(DefaultColumnProviders.Instance); AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamPipeline")); AddDiagnoser(MemoryDiagnoser.Default); WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared)); } } /// /// 构建 stream pipeline dispatch 所需的最小 runtime 宿主和对照对象。 /// [GlobalSetup] public void Setup() { LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider { MinLevel = LogLevel.Fatal }; Fixture.Setup("StreamPipeline", handlerCount: 1, pipelineCount: PipelineCount); BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); _baselineHandler = new BenchmarkStreamHandler(); _container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container => { BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container); RegisterGFrameworkPipelineBehaviors(container, PipelineCount); }); _runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( _container, LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamPipelineBenchmarks))); _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( services => { RegisterMediatRStreamPipelineBehaviors(services, PipelineCount); }, typeof(StreamPipelineBenchmarks), static candidateType => candidateType == typeof(BenchmarkStreamHandler) || candidateType == typeof(BenchmarkStreamPipelineBehavior1) || candidateType == typeof(BenchmarkStreamPipelineBehavior2) || candidateType == typeof(BenchmarkStreamPipelineBehavior3) || candidateType == typeof(BenchmarkStreamPipelineBehavior4), ServiceLifetime.Singleton); _mediatr = _serviceProvider.GetRequiredService(); _request = new BenchmarkStreamRequest(Guid.NewGuid(), 3); } /// /// 释放 MediatR 对照组使用的 DI 宿主。 /// [GlobalCleanup] public void Cleanup() { try { BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider); } finally { BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); } } /// /// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream pipeline 编排之外的基线。 /// /// 按当前观测模式完成 stream 消费后的等待句柄。 [Benchmark(Baseline = true)] public ValueTask Stream_Baseline() { return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation); } /// /// 通过 GFramework.CQRS runtime 创建 stream,并按当前矩阵配置执行 stream pipeline。 /// /// 按当前观测模式完成 stream 消费后的等待句柄。 [Benchmark] public ValueTask Stream_GFrameworkCqrs() { return ObserveAsync( _runtime.CreateStream( BenchmarkContext.Instance, _request, CancellationToken.None), Observation); } /// /// 通过 MediatR 创建 stream,并按当前矩阵配置执行 stream pipeline,作为外部设计对照。 /// /// 按当前观测模式完成 stream 消费后的等待句柄。 [Benchmark] public ValueTask Stream_MediatR() { return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation); } /// /// 按指定数量向 GFramework.CQRS 宿主注册最小 no-op stream pipeline 行为。 /// /// 当前 benchmark 使用的容器。 /// 要注册的行为数量。 /// 行为数量不在支持的矩阵内时抛出。 private static void RegisterGFrameworkPipelineBehaviors(MicrosoftDiContainer container, int pipelineCount) { ArgumentNullException.ThrowIfNull(container); switch (pipelineCount) { case 0: return; case 1: container.RegisterCqrsStreamPipelineBehavior(); return; case 4: container.RegisterCqrsStreamPipelineBehavior(); container.RegisterCqrsStreamPipelineBehavior(); container.RegisterCqrsStreamPipelineBehavior(); container.RegisterCqrsStreamPipelineBehavior(); return; default: throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount, "Only the 0/1/4 pipeline matrix is supported."); } } /// /// 按指定数量向 MediatR 宿主注册最小 no-op stream pipeline 行为。 /// /// 当前 benchmark 使用的服务集合。 /// 要注册的行为数量。 /// 行为数量不在支持的矩阵内时抛出。 private static void RegisterMediatRStreamPipelineBehaviors(IServiceCollection services, int pipelineCount) { ArgumentNullException.ThrowIfNull(services); switch (pipelineCount) { case 0: return; case 1: services.AddSingleton, BenchmarkStreamPipelineBehavior1>(); return; case 4: services.AddSingleton, BenchmarkStreamPipelineBehavior1>(); services.AddSingleton, BenchmarkStreamPipelineBehavior2>(); services.AddSingleton, BenchmarkStreamPipelineBehavior3>(); services.AddSingleton, BenchmarkStreamPipelineBehavior4>(); return; default: throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount, "Only the 0/1/4 pipeline matrix is supported."); } } /// /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。 /// /// 当前 stream 的响应类型。 /// 待观察的异步响应序列。 /// 当前 benchmark 选定的观测模式。 /// 异步消费完成后的等待句柄。 private static ValueTask ObserveAsync( IAsyncEnumerable responses, StreamObservation observation) { ArgumentNullException.ThrowIfNull(responses); return observation switch { StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None), StreamObservation.DrainAll => DrainAsync(responses), _ => throw new ArgumentOutOfRangeException( nameof(observation), observation, "Unsupported stream observation mode.") }; } /// /// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 MoveNextAsync 的固定成本。 /// /// 当前 stream 的响应类型。 /// 待观察的异步响应序列。 /// 用于向异步枚举器传播取消的令牌。 /// 消费首个元素后的等待句柄。 private static async ValueTask ConsumeFirstItemAsync( IAsyncEnumerable responses, CancellationToken cancellationToken) { var enumerator = responses.GetAsyncEnumerator(cancellationToken); await using (enumerator.ConfigureAwait(false)) { if (await enumerator.MoveNextAsync().ConfigureAwait(false)) { _ = enumerator.Current; } } } /// /// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。 /// /// 当前 stream 的响应类型。 /// 待完整枚举的异步响应序列。 /// 完整枚举结束后的等待句柄。 private static async ValueTask DrainAsync(IAsyncEnumerable responses) { await foreach (var response in responses.ConfigureAwait(false)) { _ = response; } } /// /// Benchmark stream request。 /// /// 请求标识。 /// 返回元素数量。 public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest, MediatR.IStreamRequest; /// /// 复用 stream benchmark 的响应结构,保持跨场景可比性。 /// /// 响应标识。 public sealed record BenchmarkResponse(Guid Id); /// /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。 /// public sealed class BenchmarkStreamHandler : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, MediatR.IStreamRequestHandler { /// /// 处理 GFramework.CQRS stream request。 /// /// 当前 benchmark stream 请求。 /// 用于中断异步枚举的取消令牌。 /// 低噪声、可重复的异步响应序列。 public IAsyncEnumerable Handle( BenchmarkStreamRequest request, CancellationToken cancellationToken) { return EnumerateAsync(request, cancellationToken); } /// /// 处理 MediatR stream request。 /// /// 当前 benchmark stream 请求。 /// 用于中断异步枚举的取消令牌。 /// 低噪声、可重复的异步响应序列。 IAsyncEnumerable MediatR.IStreamRequestHandler.Handle( BenchmarkStreamRequest request, CancellationToken cancellationToken) { return EnumerateAsync(request, cancellationToken); } /// /// 为 benchmark 构造稳定、低噪声的异步响应序列。 /// /// 决定元素数量和标识的 benchmark 请求。 /// 用于中断异步枚举的取消令牌。 /// 按请求数量生成的响应序列。 private static async IAsyncEnumerable EnumerateAsync( BenchmarkStreamRequest request, [EnumeratorCancellation] CancellationToken cancellationToken) { for (int index = 0; index < request.ItemCount; index++) { cancellationToken.ThrowIfCancellationRequested(); yield return new BenchmarkResponse(request.Id); await Task.CompletedTask.ConfigureAwait(false); } } } /// /// 为 benchmark 提供统一的 no-op stream pipeline 行为实现,尽量把测量焦点保持在调度器与行为编排本身。 /// public abstract class BenchmarkStreamPipelineBehaviorBase : GFramework.Cqrs.Abstractions.Cqrs.IStreamPipelineBehavior, MediatR.IStreamPipelineBehavior { /// /// 透传 GFramework.CQRS stream pipeline,避免引入额外业务逻辑噪音。 /// /// 当前 benchmark stream 请求。 /// 继续向下执行的 stream pipeline 委托。 /// 取消令牌。 /// 下游 handler 产出的异步响应序列。 public IAsyncEnumerable Handle( BenchmarkStreamRequest message, GFramework.Cqrs.Abstractions.Cqrs.StreamMessageHandlerDelegate next, CancellationToken cancellationToken) { return next(message, cancellationToken); } /// /// 透传 MediatR stream pipeline,保持与 GFramework.CQRS 相同的 no-op 语义。 /// /// 当前 benchmark stream 请求。 /// 继续向下执行的 MediatR stream pipeline 委托。 /// 取消令牌。 /// 下游 handler 产出的异步响应序列。 IAsyncEnumerable MediatR.IStreamPipelineBehavior.Handle( BenchmarkStreamRequest request, MediatR.StreamHandlerDelegate next, CancellationToken cancellationToken) { _ = request; _ = cancellationToken; return next(); } } /// /// pipeline 矩阵中的第一个 no-op stream 行为。 /// public sealed class BenchmarkStreamPipelineBehavior1 : BenchmarkStreamPipelineBehaviorBase { } /// /// pipeline 矩阵中的第二个 no-op stream 行为。 /// public sealed class BenchmarkStreamPipelineBehavior2 : BenchmarkStreamPipelineBehaviorBase { } /// /// pipeline 矩阵中的第三个 no-op stream 行为。 /// public sealed class BenchmarkStreamPipelineBehavior3 : BenchmarkStreamPipelineBehaviorBase { } /// /// pipeline 矩阵中的第四个 no-op stream 行为。 /// public sealed class BenchmarkStreamPipelineBehavior4 : BenchmarkStreamPipelineBehaviorBase { } /// /// 为 stream pipeline benchmark 提供 handwritten generated registry, /// 让默认 pipeline 宿主也能走真实的 generated stream invoker provider 接线路径。 /// public sealed class GeneratedStreamPipelineBenchmarkRegistry : GFramework.Cqrs.ICqrsHandlerRegistry, GFramework.Cqrs.ICqrsStreamInvokerProvider, GFramework.Cqrs.IEnumeratesCqrsStreamInvokerDescriptors { private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor = new( typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler< BenchmarkStreamRequest, BenchmarkResponse>), typeof(GeneratedStreamPipelineBenchmarkRegistry).GetMethod( nameof(InvokeBenchmarkStreamHandler), BindingFlags.Public | BindingFlags.Static) ?? throw new InvalidOperationException("Missing generated stream pipeline benchmark method.")); private static readonly IReadOnlyList Descriptors = [ new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry( typeof(BenchmarkStreamRequest), typeof(BenchmarkResponse), Descriptor) ]; /// /// 把 stream pipeline benchmark handler 注册为单例,保持与当前矩阵宿主一致的生命周期语义。 /// /// 用于承载 generated handler 注册的服务集合。 /// 记录 generated registry 接线结果的日志器。 public void Register(IServiceCollection services, ILogger logger) { ArgumentNullException.ThrowIfNull(services); ArgumentNullException.ThrowIfNull(logger); services.AddSingleton( typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler), typeof(BenchmarkStreamHandler)); logger.Debug("Registered generated stream pipeline benchmark handler."); } /// /// 返回当前 provider 暴露的全部 generated stream invoker 描述符。 /// /// 当前 benchmark 的 generated stream invoker 描述符集合。 public IReadOnlyList GetDescriptors() { return Descriptors; } /// /// 为目标流式请求/响应类型对返回 generated stream invoker 描述符。 /// /// 要匹配的 stream 请求类型。 /// 要匹配的 stream 响应类型。 /// 命中时返回的 generated stream invoker 描述符。 /// 是否命中了当前 benchmark 的 stream 请求/响应类型对。 public bool TryGetDescriptor( Type requestType, Type responseType, out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor) { if (requestType == typeof(BenchmarkStreamRequest) && responseType == typeof(BenchmarkResponse)) { descriptor = Descriptor; return true; } descriptor = null; return false; } /// /// 模拟 generated stream invoker provider 为 stream pipeline benchmark 产出的开放静态调用入口。 /// /// 当前要调用的 stream handler 实例。 /// 当前要分发的 stream 请求实例。 /// 用于向 handler 传播的取消令牌。 /// handler 产出的异步响应序列。 public static object InvokeBenchmarkStreamHandler( object handler, object request, CancellationToken cancellationToken) { var typedHandler = (GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler< BenchmarkStreamRequest, BenchmarkResponse>)handler; var typedRequest = (BenchmarkStreamRequest)request; return typedHandler.Handle(typedRequest, cancellationToken); } } }