// Copyright (c) 2025-2026 GeWuYou // SPDX-License-Identifier: Apache-2.0 using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Columns; using BenchmarkDotNet.Configs; using BenchmarkDotNet.Diagnosers; using BenchmarkDotNet.Jobs; using BenchmarkDotNet.Order; using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using GFramework.Core.Abstractions.Logging; using GFramework.Core.Ioc; using GFramework.Core.Logging; using GFramework.Cqrs.Abstractions.Cqrs; using MediatR; using Microsoft.Extensions.DependencyInjection; [assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute( typeof(GFramework.Cqrs.Benchmarks.Messaging.GeneratedDefaultStreamingBenchmarkRegistry))] namespace GFramework.Cqrs.Benchmarks.Messaging; /// /// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state stream 开销。 /// /// /// 默认 generated-provider stream 宿主同时暴露 与 /// 两种观测口径, /// 以便把“建流到首个元素”的固定成本与“完整枚举整个 stream”的总成本拆开观察。 /// [Config(typeof(Config))] public class StreamingBenchmarks { private MicrosoftDiContainer _container = null!; private ICqrsRuntime _runtime = null!; private ServiceProvider _serviceProvider = null!; private IMediator _mediatr = null!; private BenchmarkStreamHandler _baselineHandler = null!; private BenchmarkStreamRequest _request = null!; /// /// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。 /// [Params(StreamObservation.FirstItem, StreamObservation.DrainAll)] public StreamObservation Observation { get; set; } /// /// 用于拆分 stream dispatch 与后续枚举成本的观测模式。 /// public enum StreamObservation { /// /// 只推进到首个元素后立即释放枚举器。 /// FirstItem, /// /// 完整枚举整个 stream,保留原有 benchmark 语义。 /// DrainAll } /// /// 配置 stream benchmark 的公共输出格式。 /// private sealed class Config : ManualConfig { public Config() { AddJob(Job.Default); AddColumnProvider(DefaultColumnProviders.Instance); AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamRequest")); AddDiagnoser(MemoryDiagnoser.Default); WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared)); } } /// /// 构建 stream dispatch 所需的最小 runtime 宿主和对照对象。 /// [GlobalSetup] public void Setup() { LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider { MinLevel = LogLevel.Fatal }; Fixture.Setup("StreamRequest", handlerCount: 1, pipelineCount: 0); BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); _baselineHandler = new BenchmarkStreamHandler(); _container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container => { BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container); }); _runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( _container, LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamingBenchmarks))); _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( configure: null, typeof(StreamingBenchmarks), static candidateType => candidateType == typeof(BenchmarkStreamHandler), ServiceLifetime.Singleton); _mediatr = _serviceProvider.GetRequiredService(); _request = new BenchmarkStreamRequest(Guid.NewGuid(), 3); } /// /// 释放 MediatR 对照组使用的 DI 宿主。 /// [GlobalCleanup] public void Cleanup() { try { BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider); } finally { BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); } } /// /// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream dispatch 额外开销的 baseline。 /// [Benchmark(Baseline = true)] public ValueTask Stream_Baseline() { return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation); } /// /// 通过 GFramework.CQRS runtime 创建 stream,并按当前观测模式消费。 /// [Benchmark] public ValueTask Stream_GFrameworkCqrs() { return ObserveAsync( _runtime.CreateStream( BenchmarkContext.Instance, _request, CancellationToken.None), Observation); } /// /// 通过 MediatR 创建 stream,并按当前观测模式消费,作为外部设计对照。 /// [Benchmark] public ValueTask Stream_MediatR() { return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation); } /// /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。 /// /// 当前 stream 的响应类型。 /// 待观察的异步响应序列。 /// 当前 benchmark 选定的观测模式。 /// 异步消费完成后的等待句柄。 private static ValueTask ObserveAsync( IAsyncEnumerable responses, StreamObservation observation) { ArgumentNullException.ThrowIfNull(responses); return observation switch { StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None), StreamObservation.DrainAll => DrainAsync(responses), _ => throw new ArgumentOutOfRangeException( nameof(observation), observation, "Unsupported stream observation mode.") }; } /// /// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 `MoveNextAsync` 的固定成本。 /// /// 当前 stream 的响应类型。 /// 待观察的异步响应序列。 /// 用于向异步枚举器传播取消的令牌。 /// 消费首个元素后的等待句柄。 private static async ValueTask ConsumeFirstItemAsync( IAsyncEnumerable responses, CancellationToken cancellationToken) { var enumerator = responses.GetAsyncEnumerator(cancellationToken); await using (enumerator.ConfigureAwait(false)) { if (await enumerator.MoveNextAsync().ConfigureAwait(false)) { _ = enumerator.Current; } } } /// /// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。 /// /// 当前 stream 的响应类型。 /// 待完整枚举的异步响应序列。 /// 完整枚举结束后的等待句柄。 private static async ValueTask DrainAsync(IAsyncEnumerable responses) { await foreach (var response in responses.ConfigureAwait(false)) { _ = response; } } /// /// Benchmark stream request。 /// /// 请求标识。 /// 返回元素数量。 public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest, MediatR.IStreamRequest; /// /// 复用 request benchmark 的响应结构,保持跨场景可比性。 /// /// 响应标识。 public sealed record BenchmarkResponse(Guid Id); /// /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。 /// public sealed class BenchmarkStreamHandler : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, MediatR.IStreamRequestHandler { /// /// 处理 GFramework.CQRS stream request。 /// public IAsyncEnumerable Handle( BenchmarkStreamRequest request, CancellationToken cancellationToken) { return EnumerateAsync(request, cancellationToken); } /// /// 处理 MediatR stream request。 /// IAsyncEnumerable MediatR.IStreamRequestHandler.Handle( BenchmarkStreamRequest request, CancellationToken cancellationToken) { return EnumerateAsync(request, cancellationToken); } /// /// 为 benchmark 构造稳定、低噪声的异步响应序列。 /// private static async IAsyncEnumerable EnumerateAsync( BenchmarkStreamRequest request, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) { for (int index = 0; index < request.ItemCount; index++) { cancellationToken.ThrowIfCancellationRequested(); yield return new BenchmarkResponse(request.Id); await Task.CompletedTask.ConfigureAwait(false); } } } }