// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Order;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute(
typeof(GFramework.Cqrs.Benchmarks.Messaging.GeneratedStreamInvokerBenchmarkRegistry))]
namespace GFramework.Cqrs.Benchmarks.Messaging;
///
/// 对比 stream invoker 在 direct handler、GFramework 反射路径、GFramework generated invoker 路径与 MediatR 之间的开销差异。
///
///
/// 该矩阵只保留单一 handler 生命周期,避免把 invoker 路径差异与生命周期解析成本混在一起。
/// 用于近似观察建流到首个元素的瞬时成本,
/// 则保留原有完整枚举口径。
///
[Config(typeof(Config))]
public class StreamInvokerBenchmarks
{
private MicrosoftDiContainer _reflectionContainer = null!;
private ICqrsRuntime _reflectionRuntime = null!;
private MicrosoftDiContainer _generatedContainer = null!;
private ICqrsRuntime _generatedRuntime = null!;
private ServiceProvider _serviceProvider = null!;
private IMediator _mediatr = null!;
private ReflectionBenchmarkStreamHandler _baselineHandler = null!;
private ReflectionBenchmarkStreamRequest _reflectionRequest = null!;
private GeneratedBenchmarkStreamRequest _generatedRequest = null!;
private MediatRBenchmarkStreamRequest _mediatrRequest = null!;
///
/// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。
///
[Params(StreamObservation.FirstItem, StreamObservation.DrainAll)]
public StreamObservation Observation { get; set; }
///
/// 用于拆分 stream invoker 固定成本与后续枚举成本的观测模式。
///
public enum StreamObservation
{
///
/// 只推进到首个元素后立即释放枚举器。
///
FirstItem,
///
/// 完整枚举整个 stream,保留原有 benchmark 语义。
///
DrainAll
}
///
/// 配置 stream invoker benchmark 的公共输出格式。
///
private sealed class Config : ManualConfig
{
public Config()
{
AddJob(Job.Default);
AddColumnProvider(DefaultColumnProviders.Instance);
AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamInvoker"));
AddDiagnoser(MemoryDiagnoser.Default);
WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared));
}
}
///
/// 构建 reflection / generated / MediatR 三组 stream dispatch 对照宿主。
///
[GlobalSetup]
public void Setup()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider
{
MinLevel = LogLevel.Fatal
};
Fixture.Setup("StreamInvoker", handlerCount: 1, pipelineCount: 0);
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
_baselineHandler = new ReflectionBenchmarkStreamHandler();
_reflectionRequest = new ReflectionBenchmarkStreamRequest(Guid.NewGuid(), 3);
_generatedRequest = new GeneratedBenchmarkStreamRequest(Guid.NewGuid(), 3);
_mediatrRequest = new MediatRBenchmarkStreamRequest(Guid.NewGuid(), 3);
_reflectionContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(static container =>
{
container.RegisterTransient, ReflectionBenchmarkStreamHandler>();
});
_reflectionRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_reflectionContainer,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamInvokerBenchmarks) + ".Reflection"));
_generatedContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{
BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container);
});
_generatedRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_generatedContainer,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamInvokerBenchmarks) + ".Generated"));
_serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
configure: null,
typeof(StreamInvokerBenchmarks),
static candidateType => candidateType == typeof(MediatRBenchmarkStreamHandler),
ServiceLifetime.Transient);
_mediatr = _serviceProvider.GetRequiredService();
}
///
/// 释放 MediatR 对照组使用的 DI 宿主,并清理静态 dispatcher 缓存。
///
[GlobalCleanup]
public void Cleanup()
{
try
{
BenchmarkCleanupHelper.DisposeAll(_reflectionContainer, _generatedContainer, _serviceProvider);
}
finally
{
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
}
}
///
/// 直接调用最小 stream handler,并按当前观测模式消费 stream,作为 dispatch 额外开销 baseline。
///
[Benchmark(Baseline = true)]
public ValueTask Stream_Baseline()
{
return ObserveAsync(_baselineHandler.Handle(_reflectionRequest, CancellationToken.None), Observation);
}
///
/// 通过 GFramework.CQRS 反射 stream binding 路径创建 stream,并按当前观测模式消费。
///
[Benchmark]
public ValueTask Stream_GFrameworkReflection()
{
return ObserveAsync(
_reflectionRuntime.CreateStream(
BenchmarkContext.Instance,
_reflectionRequest,
CancellationToken.None),
Observation);
}
///
/// 通过 generated stream invoker provider 预热后的 GFramework.CQRS runtime 创建 stream,并按当前观测模式消费。
///
[Benchmark]
public ValueTask Stream_GFrameworkGenerated()
{
return ObserveAsync(
_generatedRuntime.CreateStream(
BenchmarkContext.Instance,
_generatedRequest,
CancellationToken.None),
Observation);
}
///
/// 通过 MediatR 创建 stream,并按当前观测模式消费,作为外部对照。
///
[Benchmark]
public ValueTask Stream_MediatR()
{
return ObserveAsync(_mediatr.CreateStream(_mediatrRequest, CancellationToken.None), Observation);
}
///
/// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。
///
/// 当前 stream 的响应类型。
/// 待观察的异步响应序列。
/// 当前 benchmark 选定的观测模式。
/// 异步消费完成后的等待句柄。
private static ValueTask ObserveAsync(
IAsyncEnumerable responses,
StreamObservation observation)
{
ArgumentNullException.ThrowIfNull(responses);
return observation switch
{
StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None),
StreamObservation.DrainAll => DrainAsync(responses),
_ => throw new ArgumentOutOfRangeException(
nameof(observation),
observation,
"Unsupported stream observation mode.")
};
}
///
/// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 MoveNextAsync 的固定成本。
///
/// 当前 stream 的响应类型。
/// 待观察的异步响应序列。
/// 用于向异步枚举器传播取消的令牌。
/// 消费首个元素后的等待句柄。
private static async ValueTask ConsumeFirstItemAsync(
IAsyncEnumerable responses,
CancellationToken cancellationToken)
{
var enumerator = responses.GetAsyncEnumerator(cancellationToken);
await using (enumerator.ConfigureAwait(false))
{
// 这里显式读取 Current,只为了让所有路径都完成首个元素的同等消费。
if (await enumerator.MoveNextAsync().ConfigureAwait(false))
{
_ = enumerator.Current;
}
}
}
///
/// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。
///
/// 当前 stream 的响应类型。
/// 待完整枚举的异步响应序列。
/// 完整枚举结束后的等待句柄。
private static async ValueTask DrainAsync(IAsyncEnumerable responses)
{
await foreach (var response in responses.ConfigureAwait(false))
{
_ = response;
}
}
///
/// Reflection runtime stream request。
///
/// 请求标识。
/// 返回元素数量。
public sealed record ReflectionBenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest;
///
/// Reflection runtime stream response。
///
/// 响应标识。
public sealed record ReflectionBenchmarkResponse(Guid Id);
///
/// Generated runtime stream request。
///
/// 请求标识。
/// 返回元素数量。
public sealed record GeneratedBenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest;
///
/// Generated runtime stream response。
///
/// 响应标识。
public sealed record GeneratedBenchmarkResponse(Guid Id);
///
/// MediatR stream request。
///
/// 请求标识。
/// 返回元素数量。
public sealed record MediatRBenchmarkStreamRequest(Guid Id, int ItemCount) :
MediatR.IStreamRequest;
///
/// MediatR stream response。
///
/// 响应标识。
public sealed record MediatRBenchmarkResponse(Guid Id);
///
/// Reflection runtime 的最小 stream request handler。
///
public sealed class ReflectionBenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler
{
///
/// 处理 reflection benchmark stream request。
///
/// 当前 reflection benchmark stream 请求。
/// 用于中断异步枚举的取消令牌。
/// 完整枚举所需的低噪声异步响应序列。
public IAsyncEnumerable Handle(
ReflectionBenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new ReflectionBenchmarkResponse(id),
cancellationToken);
}
}
///
/// Generated runtime 的最小 stream request handler。
///
public sealed class GeneratedBenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler
{
///
/// 处理 generated benchmark stream request。
///
/// 当前 generated benchmark stream 请求。
/// 用于中断异步枚举的取消令牌。
/// 完整枚举所需的低噪声异步响应序列。
public IAsyncEnumerable Handle(
GeneratedBenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new GeneratedBenchmarkResponse(id),
cancellationToken);
}
}
///
/// MediatR 对照组的最小 stream request handler。
///
public sealed class MediatRBenchmarkStreamHandler :
MediatR.IStreamRequestHandler
{
///
/// 处理 MediatR benchmark stream request。
///
/// 当前 MediatR benchmark stream 请求。
/// 用于中断异步枚举的取消令牌。
/// 完整枚举所需的低噪声异步响应序列。
public IAsyncEnumerable Handle(
MediatRBenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new MediatRBenchmarkResponse(id),
cancellationToken);
}
}
///
/// 为三组 stream benchmark 构造相同形状的低噪声异步枚举,避免枚举体差异干扰 invoker 对照。
///
/// 当前 stream 的响应类型。
/// 每个响应复用的稳定标识。
/// 待返回的响应元素数量。
/// 将稳定标识映射为响应对象的工厂。
/// 用于中断异步枚举的取消令牌。
/// 供各对照路径共享的低噪声异步响应序列。
private static async IAsyncEnumerable EnumerateAsync(
Guid id,
int itemCount,
Func responseFactory,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
for (var index = 0; index < itemCount; index++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return responseFactory(id);
await Task.CompletedTask.ConfigureAwait(false);
}
}
}