// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Order;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
namespace GFramework.Cqrs.Benchmarks.Messaging;
///
/// 对比 stream 完整枚举在不同 handler 生命周期下的额外开销。
///
///
/// 当前矩阵只覆盖 `Singleton` 与 `Transient`。
/// `Scoped` 仍依赖真实的显式作用域边界;在当前“单根容器最小宿主”模型下直接加入 scoped 会把枚举宿主成本与生命周期成本混在一起,
/// 因此保持与 request 生命周期矩阵相同的边界,留待后续 scoped host 基线具备后再扩展。
///
[Config(typeof(Config))]
public class StreamLifetimeBenchmarks
{
private MicrosoftDiContainer _container = null!;
private ICqrsRuntime _runtime = null!;
private ServiceProvider _serviceProvider = null!;
private IMediator _mediatr = null!;
private BenchmarkStreamHandler _baselineHandler = null!;
private BenchmarkStreamRequest _request = null!;
///
/// 控制当前 benchmark 使用的 handler 生命周期。
///
[Params(HandlerLifetime.Singleton, HandlerLifetime.Transient)]
public HandlerLifetime Lifetime { get; set; }
///
/// 可公平比较的 benchmark handler 生命周期集合。
///
public enum HandlerLifetime
{
///
/// 复用单个 handler 实例。
///
Singleton,
///
/// 每次建流都重新解析新的 handler 实例。
///
Transient
}
///
/// 配置 stream 生命周期 benchmark 的公共输出格式。
///
private sealed class Config : ManualConfig
{
public Config()
{
AddJob(Job.Default);
AddColumnProvider(DefaultColumnProviders.Instance);
AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamLifetime"));
AddDiagnoser(MemoryDiagnoser.Default);
WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared));
}
}
///
/// 构建当前生命周期下的 GFramework 与 MediatR stream 对照宿主。
///
[GlobalSetup]
public void Setup()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider
{
MinLevel = LogLevel.Fatal
};
Fixture.Setup($"StreamLifetime/{Lifetime}", handlerCount: 1, pipelineCount: 0);
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
_baselineHandler = new BenchmarkStreamHandler();
_request = new BenchmarkStreamRequest(Guid.NewGuid(), 3);
_container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{
BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container);
RegisterGFrameworkHandler(container, Lifetime);
});
// 容器内已提前保留默认 runtime 以支撑 generated registry 接线;
// 这里额外创建带生命周期后缀的 runtime,只是为了区分不同 benchmark 矩阵的 dispatcher 日志。
_runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + "." + Lifetime));
_serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
configure: null,
typeof(StreamLifetimeBenchmarks),
static candidateType => candidateType == typeof(BenchmarkStreamHandler),
ResolveMediatRLifetime(Lifetime));
_mediatr = _serviceProvider.GetRequiredService();
}
///
/// 释放当前生命周期矩阵持有的 benchmark 宿主资源,并清理 dispatcher 缓存。
///
[GlobalCleanup]
public void Cleanup()
{
try
{
BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider);
}
finally
{
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
}
}
///
/// 直接调用 handler 并完整枚举,作为不同生命周期矩阵下的 dispatch 额外开销 baseline。
///
[Benchmark(Baseline = true)]
public async ValueTask Stream_Baseline()
{
await foreach (var response in _baselineHandler.Handle(_request, CancellationToken.None).ConfigureAwait(false))
{
_ = response;
}
}
///
/// 通过 GFramework.CQRS runtime 创建并完整枚举 stream。
///
[Benchmark]
public async ValueTask Stream_GFrameworkCqrs()
{
await foreach (var response in _runtime.CreateStream(BenchmarkContext.Instance, _request, CancellationToken.None)
.ConfigureAwait(false))
{
_ = response;
}
}
///
/// 通过 MediatR 创建并完整枚举 stream,作为外部对照。
///
[Benchmark]
public async ValueTask Stream_MediatR()
{
await foreach (var response in _mediatr.CreateStream(_request, CancellationToken.None).ConfigureAwait(false))
{
_ = response;
}
}
///
/// 按生命周期把 benchmark stream handler 注册到 GFramework 容器。
///
/// 当前 benchmark 拥有并负责释放的容器。
/// 待比较的 handler 生命周期。
///
/// 先通过 generated registry 提供静态 descriptor,再显式覆盖 handler 生命周期,
/// 可以把比较变量收敛到 handler 解析成本,而不是 descriptor 发现路径本身。
///
private static void RegisterGFrameworkHandler(MicrosoftDiContainer container, HandlerLifetime lifetime)
{
ArgumentNullException.ThrowIfNull(container);
switch (lifetime)
{
case HandlerLifetime.Singleton:
container.RegisterSingleton<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
BenchmarkStreamHandler>();
return;
case HandlerLifetime.Transient:
container.RegisterTransient<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
BenchmarkStreamHandler>();
return;
default:
throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.");
}
}
///
/// 将 benchmark 生命周期映射为 MediatR 组装所需的 。
///
/// 待比较的 handler 生命周期。
/// 当前生命周期对应的 MediatR 注册方式。
private static ServiceLifetime ResolveMediatRLifetime(HandlerLifetime lifetime)
{
return lifetime switch
{
HandlerLifetime.Singleton => ServiceLifetime.Singleton,
HandlerLifetime.Transient => ServiceLifetime.Transient,
_ => throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.")
};
}
///
/// Benchmark stream request。
///
/// 请求标识。
/// 返回元素数量。
public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest,
MediatR.IStreamRequest;
///
/// Benchmark stream response。
///
/// 响应标识。
public sealed record BenchmarkResponse(Guid Id);
///
/// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。
///
public sealed class BenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
MediatR.IStreamRequestHandler
{
///
/// 处理 GFramework.CQRS stream request。
///
/// 当前 benchmark stream 请求。
/// 用于中断异步枚举的取消令牌。
/// 完整枚举所需的低噪声异步响应序列。
public IAsyncEnumerable Handle(
BenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(request, cancellationToken);
}
///
/// 处理 MediatR stream request。
///
/// 当前 benchmark stream 请求。
/// 用于中断异步枚举的取消令牌。
/// 完整枚举所需的低噪声异步响应序列。
IAsyncEnumerable MediatR.IStreamRequestHandler.Handle(
BenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(request, cancellationToken);
}
///
/// 为生命周期矩阵构造稳定、低噪声的异步响应序列。
///
/// 当前 benchmark 请求。
/// 用于中断异步枚举的取消令牌。
/// 按固定元素数量返回的异步响应序列。
private static async IAsyncEnumerable EnumerateAsync(
BenchmarkStreamRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
for (var index = 0; index < request.ItemCount; index++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return new BenchmarkResponse(request.Id);
await Task.CompletedTask.ConfigureAwait(false);
}
}
}
}