diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs new file mode 100644 index 00000000..db62d513 --- /dev/null +++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs @@ -0,0 +1,524 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Columns; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Order; +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; +using MediatR; +using Microsoft.Extensions.DependencyInjection; + +[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute( + typeof(GFramework.Cqrs.Benchmarks.Messaging.StreamPipelineBenchmarks.GeneratedStreamPipelineBenchmarkRegistry))] + +namespace GFramework.Cqrs.Benchmarks.Messaging; + +/// +/// 对比不同 stream pipeline 行为数量下,单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state dispatch 开销。 +/// +/// +/// 当前矩阵同时覆盖 0 / 1 / 4 个 stream pipeline 行为,以及 +/// 两种观测口径, +/// 以便把建流固定成本与完整枚举成本拆开观察。 +/// +[Config(typeof(Config))] +public class StreamPipelineBenchmarks +{ + private MicrosoftDiContainer _container = null!; + private ICqrsRuntime _runtime = null!; + private ServiceProvider _serviceProvider = null!; + private IMediator _mediatr = null!; + private BenchmarkStreamHandler _baselineHandler = null!; + private BenchmarkStreamRequest _request = null!; + + /// + /// 控制当前场景注册的 stream pipeline 行为数量,保持与 request pipeline benchmark 相同的 0 / 1 / 4 矩阵。 + /// + [Params(0, 1, 4)] + public int PipelineCount { get; set; } + + /// + /// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。 + /// + [Params(StreamObservation.FirstItem, StreamObservation.DrainAll)] + public StreamObservation Observation { get; set; } + + /// + /// 用于拆分 stream dispatch 固定成本与后续枚举成本的观测模式。 + /// + public enum StreamObservation + { + /// + /// 只推进到首个元素后立即释放枚举器。 + /// + FirstItem, + + /// + /// 完整枚举整个 stream,保留原有 benchmark 语义。 + /// + DrainAll + } + + /// + /// 配置 stream pipeline benchmark 的公共输出格式。 + /// + private sealed class Config : ManualConfig + { + public Config() + { + AddJob(Job.Default); + AddColumnProvider(DefaultColumnProviders.Instance); + AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamPipeline")); + AddDiagnoser(MemoryDiagnoser.Default); + WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared)); + } + } + + /// + /// 构建 stream pipeline dispatch 所需的最小 runtime 宿主和对照对象。 + /// + [GlobalSetup] + public void Setup() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider + { + MinLevel = LogLevel.Fatal + }; + Fixture.Setup("StreamPipeline", handlerCount: 1, pipelineCount: PipelineCount); + BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); + + _baselineHandler = new BenchmarkStreamHandler(); + _container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container => + { + BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container); + RegisterGFrameworkPipelineBehaviors(container, PipelineCount); + }); + _runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( + _container, + LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamPipelineBenchmarks))); + + _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( + services => + { + RegisterMediatRStreamPipelineBehaviors(services, PipelineCount); + }, + typeof(StreamPipelineBenchmarks), + static candidateType => + candidateType == typeof(BenchmarkStreamHandler) || + candidateType == typeof(BenchmarkStreamPipelineBehavior1) || + candidateType == typeof(BenchmarkStreamPipelineBehavior2) || + candidateType == typeof(BenchmarkStreamPipelineBehavior3) || + candidateType == typeof(BenchmarkStreamPipelineBehavior4), + ServiceLifetime.Singleton); + _mediatr = _serviceProvider.GetRequiredService(); + + _request = new BenchmarkStreamRequest(Guid.NewGuid(), 3); + } + + /// + /// 释放 MediatR 对照组使用的 DI 宿主。 + /// + [GlobalCleanup] + public void Cleanup() + { + try + { + BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider); + } + finally + { + BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); + } + } + + /// + /// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream pipeline 编排之外的基线。 + /// + [Benchmark(Baseline = true)] + public ValueTask Stream_Baseline() + { + return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation); + } + + /// + /// 通过 GFramework.CQRS runtime 创建 stream,并按当前矩阵配置执行 stream pipeline。 + /// + [Benchmark] + public ValueTask Stream_GFrameworkCqrs() + { + return ObserveAsync( + _runtime.CreateStream( + BenchmarkContext.Instance, + _request, + CancellationToken.None), + Observation); + } + + /// + /// 通过 MediatR 创建 stream,并按当前矩阵配置执行 stream pipeline,作为外部设计对照。 + /// + [Benchmark] + public ValueTask Stream_MediatR() + { + return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation); + } + + /// + /// 按指定数量向 GFramework.CQRS 宿主注册最小 no-op stream pipeline 行为。 + /// + /// 当前 benchmark 使用的容器。 + /// 要注册的行为数量。 + /// 行为数量不在支持的矩阵内时抛出。 + private static void RegisterGFrameworkPipelineBehaviors(MicrosoftDiContainer container, int pipelineCount) + { + ArgumentNullException.ThrowIfNull(container); + + switch (pipelineCount) + { + case 0: + return; + case 1: + container.RegisterCqrsStreamPipelineBehavior(); + return; + case 4: + container.RegisterCqrsStreamPipelineBehavior(); + container.RegisterCqrsStreamPipelineBehavior(); + container.RegisterCqrsStreamPipelineBehavior(); + container.RegisterCqrsStreamPipelineBehavior(); + return; + default: + throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount, + "Only the 0/1/4 pipeline matrix is supported."); + } + } + + /// + /// 按指定数量向 MediatR 宿主注册最小 no-op stream pipeline 行为。 + /// + /// 当前 benchmark 使用的服务集合。 + /// 要注册的行为数量。 + /// 行为数量不在支持的矩阵内时抛出。 + private static void RegisterMediatRStreamPipelineBehaviors(IServiceCollection services, int pipelineCount) + { + ArgumentNullException.ThrowIfNull(services); + + switch (pipelineCount) + { + case 0: + return; + case 1: + services.AddSingleton, BenchmarkStreamPipelineBehavior1>(); + return; + case 4: + services.AddSingleton, BenchmarkStreamPipelineBehavior1>(); + services.AddSingleton, BenchmarkStreamPipelineBehavior2>(); + services.AddSingleton, BenchmarkStreamPipelineBehavior3>(); + services.AddSingleton, BenchmarkStreamPipelineBehavior4>(); + return; + default: + throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount, + "Only the 0/1/4 pipeline matrix is supported."); + } + } + + /// + /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。 + /// + /// 当前 stream 的响应类型。 + /// 待观察的异步响应序列。 + /// 当前 benchmark 选定的观测模式。 + /// 异步消费完成后的等待句柄。 + private static ValueTask ObserveAsync( + IAsyncEnumerable responses, + StreamObservation observation) + { + ArgumentNullException.ThrowIfNull(responses); + + return observation switch + { + StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None), + StreamObservation.DrainAll => DrainAsync(responses), + _ => throw new ArgumentOutOfRangeException( + nameof(observation), + observation, + "Unsupported stream observation mode.") + }; + } + + /// + /// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 MoveNextAsync 的固定成本。 + /// + /// 当前 stream 的响应类型。 + /// 待观察的异步响应序列。 + /// 用于向异步枚举器传播取消的令牌。 + /// 消费首个元素后的等待句柄。 + private static async ValueTask ConsumeFirstItemAsync( + IAsyncEnumerable responses, + CancellationToken cancellationToken) + { + var enumerator = responses.GetAsyncEnumerator(cancellationToken); + await using (enumerator.ConfigureAwait(false)) + { + if (await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + _ = enumerator.Current; + } + } + } + + /// + /// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。 + /// + /// 当前 stream 的响应类型。 + /// 待完整枚举的异步响应序列。 + /// 完整枚举结束后的等待句柄。 + private static async ValueTask DrainAsync(IAsyncEnumerable responses) + { + await foreach (var response in responses.ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// Benchmark stream request。 + /// + /// 请求标识。 + /// 返回元素数量。 + public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest, + MediatR.IStreamRequest; + + /// + /// 复用 stream benchmark 的响应结构,保持跨场景可比性。 + /// + /// 响应标识。 + public sealed record BenchmarkResponse(Guid Id); + + /// + /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。 + /// + public sealed class BenchmarkStreamHandler : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, + MediatR.IStreamRequestHandler + { + /// + /// 处理 GFramework.CQRS stream request。 + /// + /// 当前 benchmark stream 请求。 + /// 用于中断异步枚举的取消令牌。 + /// 低噪声、可重复的异步响应序列。 + public IAsyncEnumerable Handle( + BenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync(request, cancellationToken); + } + + /// + /// 处理 MediatR stream request。 + /// + /// 当前 benchmark stream 请求。 + /// 用于中断异步枚举的取消令牌。 + /// 低噪声、可重复的异步响应序列。 + IAsyncEnumerable MediatR.IStreamRequestHandler.Handle( + BenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync(request, cancellationToken); + } + + /// + /// 为 benchmark 构造稳定、低噪声的异步响应序列。 + /// + /// 决定元素数量和标识的 benchmark 请求。 + /// 用于中断异步枚举的取消令牌。 + /// 按请求数量生成的响应序列。 + private static async IAsyncEnumerable EnumerateAsync( + BenchmarkStreamRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + for (int index = 0; index < request.ItemCount; index++) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return new BenchmarkResponse(request.Id); + await Task.CompletedTask.ConfigureAwait(false); + } + } + } + + /// + /// 为 benchmark 提供统一的 no-op stream pipeline 行为实现,尽量把测量焦点保持在调度器与行为编排本身。 + /// + public abstract class BenchmarkStreamPipelineBehaviorBase : + GFramework.Cqrs.Abstractions.Cqrs.IStreamPipelineBehavior, + MediatR.IStreamPipelineBehavior + { + /// + /// 透传 GFramework.CQRS stream pipeline,避免引入额外业务逻辑噪音。 + /// + /// 当前 benchmark stream 请求。 + /// 继续向下执行的 stream pipeline 委托。 + /// 取消令牌。 + /// 下游 handler 产出的异步响应序列。 + public IAsyncEnumerable Handle( + BenchmarkStreamRequest message, + GFramework.Cqrs.Abstractions.Cqrs.StreamMessageHandlerDelegate next, + CancellationToken cancellationToken) + { + return next(message, cancellationToken); + } + + /// + /// 透传 MediatR stream pipeline,保持与 GFramework.CQRS 相同的 no-op 语义。 + /// + /// 当前 benchmark stream 请求。 + /// 继续向下执行的 MediatR stream pipeline 委托。 + /// 取消令牌。 + /// 下游 handler 产出的异步响应序列。 + IAsyncEnumerable MediatR.IStreamPipelineBehavior.Handle( + BenchmarkStreamRequest request, + MediatR.StreamHandlerDelegate next, + CancellationToken cancellationToken) + { + _ = request; + _ = cancellationToken; + return next(); + } + } + + /// + /// pipeline 矩阵中的第一个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior1 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// pipeline 矩阵中的第二个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior2 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// pipeline 矩阵中的第三个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior3 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// pipeline 矩阵中的第四个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior4 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// 为 stream pipeline benchmark 提供 handwritten generated registry, + /// 让默认 pipeline 宿主也能走真实的 generated stream invoker provider 接线路径。 + /// + public sealed class GeneratedStreamPipelineBenchmarkRegistry : + GFramework.Cqrs.ICqrsHandlerRegistry, + GFramework.Cqrs.ICqrsStreamInvokerProvider, + GFramework.Cqrs.IEnumeratesCqrsStreamInvokerDescriptors + { + private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor = + new( + typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler< + BenchmarkStreamRequest, + BenchmarkResponse>), + typeof(GeneratedStreamPipelineBenchmarkRegistry).GetMethod( + nameof(InvokeBenchmarkStreamHandler), + BindingFlags.Public | BindingFlags.Static) + ?? throw new InvalidOperationException("Missing generated stream pipeline benchmark method.")); + + private static readonly IReadOnlyList Descriptors = + [ + new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry( + typeof(BenchmarkStreamRequest), + typeof(BenchmarkResponse), + Descriptor) + ]; + + /// + /// 把 stream pipeline benchmark handler 注册为单例,保持与当前矩阵宿主一致的生命周期语义。 + /// + /// 用于承载 generated handler 注册的服务集合。 + /// 记录 generated registry 接线结果的日志器。 + public void Register(IServiceCollection services, ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(logger); + + services.AddSingleton( + typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler), + typeof(BenchmarkStreamHandler)); + logger.Debug("Registered generated stream pipeline benchmark handler."); + } + + /// + /// 返回当前 provider 暴露的全部 generated stream invoker 描述符。 + /// + /// 当前 benchmark 的 generated stream invoker 描述符集合。 + public IReadOnlyList GetDescriptors() + { + return Descriptors; + } + + /// + /// 为目标流式请求/响应类型对返回 generated stream invoker 描述符。 + /// + /// 要匹配的 stream 请求类型。 + /// 要匹配的 stream 响应类型。 + /// 命中时返回的 generated stream invoker 描述符。 + /// 是否命中了当前 benchmark 的 stream 请求/响应类型对。 + public bool TryGetDescriptor( + Type requestType, + Type responseType, + out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor) + { + if (requestType == typeof(BenchmarkStreamRequest) && + responseType == typeof(BenchmarkResponse)) + { + descriptor = Descriptor; + return true; + } + + descriptor = null; + return false; + } + + /// + /// 模拟 generated stream invoker provider 为 stream pipeline benchmark 产出的开放静态调用入口。 + /// + /// 当前要调用的 stream handler 实例。 + /// 当前要分发的 stream 请求实例。 + /// 用于向 handler 传播的取消令牌。 + /// handler 产出的异步响应序列。 + public static object InvokeBenchmarkStreamHandler( + object handler, + object request, + CancellationToken cancellationToken) + { + var typedHandler = (GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler< + BenchmarkStreamRequest, + BenchmarkResponse>)handler; + var typedRequest = (BenchmarkStreamRequest)request; + return typedHandler.Handle(typedRequest, cancellationToken); + } + } +} diff --git a/GFramework.Cqrs.Benchmarks/README.md b/GFramework.Cqrs.Benchmarks/README.md index 69038b45..9466c9bd 100644 --- a/GFramework.Cqrs.Benchmarks/README.md +++ b/GFramework.Cqrs.Benchmarks/README.md @@ -34,6 +34,9 @@ - `Messaging/StreamInvokerBenchmarks.cs` - baseline、`GFramework.Cqrs` reflection stream binding、`GFramework.Cqrs` generated stream invoker、`MediatR` - 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径 + - `Messaging/StreamPipelineBenchmarks.cs` + - `0 / 1 / 4` 个 stream pipeline 行为下,baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime 与 `MediatR` + - 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径 - stream startup - `Messaging/StreamStartupBenchmarks.cs` - `Initialization` 与 `ColdStart` 两组下,`GFramework.Cqrs` reflection、`GFramework.Cqrs` generated、`MediatR` @@ -68,6 +71,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro ```bash dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestLifetimeBenchmarks.SendRequest_*" dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamLifetimeBenchmarks.Stream_*" +dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamPipelineBenchmarks.Stream_*" ``` ## 并发运行约束 @@ -87,7 +91,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro - `RequestLifetimeBenchmarks` 的 `Scoped` 场景会在每次 request 分发时显式创建并释放真实 DI 作用域;它观察的是 scoped handler 的解析与 dispatch 成本,不把 runtime 构造常量成本混入生命周期对照 - `NotificationLifetimeBenchmarks` 的 `Scoped` 场景也采用真实 DI 作用域;它比较的是 publish 路径上的生命周期额外开销,不是根容器解析退化后的近似值 -- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll` +- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks`、`StreamPipelineBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll` - `FirstItem` 适合观察“建流到首个元素”的固定成本 - `DrainAll` 适合观察完整枚举整个 stream 的总成本 - `StreamStartupBenchmarks` 的 `ColdStart` 只推进到首个元素,因此它回答的是“新宿主下首次建流命中”的边界,不回答完整枚举总成本 @@ -99,4 +103,3 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro - 当前没有 stream 版的 NuGet `Mediator` source-generated concrete path 对照;stream steady-state、lifetime、startup 现在都只覆盖 `GFramework.Cqrs` 与 `MediatR` - 当前没有 request 生命周期下的 NuGet `Mediator` compile-time lifetime 矩阵;`RequestLifetimeBenchmarks` 只覆盖 `GFramework.Cqrs` 与 `MediatR` - 当前没有 notification fan-out 的生命周期矩阵;`NotificationFanOutBenchmarks` 只覆盖固定 `4 handler` 的已装配宿主 -- 当前没有 stream pipeline benchmark;现有 pipeline coverage 仅限 request