// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Order;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
namespace GFramework.Cqrs.Benchmarks.Messaging;
///
/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的完整枚举开销。
///
[Config(typeof(Config))]
public class StreamingBenchmarks
{
private MicrosoftDiContainer _container = null!;
private ICqrsRuntime _runtime = null!;
private ServiceProvider _serviceProvider = null!;
private IMediator _mediatr = null!;
private BenchmarkStreamHandler _baselineHandler = null!;
private BenchmarkStreamRequest _request = null!;
///
/// 配置 stream benchmark 的公共输出格式。
///
private sealed class Config : ManualConfig
{
public Config()
{
AddJob(Job.Default);
AddColumnProvider(DefaultColumnProviders.Instance);
AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamRequest"));
AddDiagnoser(MemoryDiagnoser.Default);
WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared));
}
}
///
/// 构建 stream dispatch 所需的最小 runtime 宿主和对照对象。
///
[GlobalSetup]
public void Setup()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider
{
MinLevel = LogLevel.Fatal
};
Fixture.Setup("StreamRequest", handlerCount: 1, pipelineCount: 0);
_container = new MicrosoftDiContainer();
_baselineHandler = new BenchmarkStreamHandler();
_container.RegisterTransient, BenchmarkStreamHandler>();
_runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamingBenchmarks)));
var services = new ServiceCollection();
services.AddLogging(static builder =>
Microsoft.Extensions.Logging.FilterLoggingBuilderExtensions.AddFilter(
builder,
"LuckyPennySoftware.MediatR.License",
Microsoft.Extensions.Logging.LogLevel.None));
services.AddSingleton, BenchmarkStreamHandler>();
services.AddMediatR(static options => options.RegisterServicesFromAssembly(typeof(StreamingBenchmarks).Assembly));
_serviceProvider = services.BuildServiceProvider();
_mediatr = _serviceProvider.GetRequiredService();
_request = new BenchmarkStreamRequest(Guid.NewGuid(), 3);
}
///
/// 释放 MediatR 对照组使用的 DI 宿主。
///
[GlobalCleanup]
public void Cleanup()
{
_serviceProvider.Dispose();
}
///
/// 直接调用 handler 并完整枚举响应序列,作为 stream dispatch 额外开销的 baseline。
///
[Benchmark(Baseline = true)]
public async ValueTask Stream_Baseline()
{
await foreach (var response in _baselineHandler.Handle(_request, CancellationToken.None).ConfigureAwait(false))
{
_ = response;
}
}
///
/// 通过 GFramework.CQRS runtime 创建并完整枚举 stream。
///
[Benchmark]
public async ValueTask Stream_GFrameworkCqrs()
{
await foreach (var response in _runtime.CreateStream(BenchmarkContext.Instance, _request, CancellationToken.None)
.ConfigureAwait(false))
{
_ = response;
}
}
///
/// 通过 MediatR 创建并完整枚举 stream,作为外部设计对照。
///
[Benchmark]
public async ValueTask Stream_MediatR()
{
await foreach (var response in _mediatr.CreateStream(_request, CancellationToken.None).ConfigureAwait(false))
{
_ = response;
}
}
///
/// Benchmark stream request。
///
/// 请求标识。
/// 返回元素数量。
public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest,
MediatR.IStreamRequest;
///
/// 复用 request benchmark 的响应结构,保持跨场景可比性。
///
/// 响应标识。
public sealed record BenchmarkResponse(Guid Id);
///
/// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。
///
public sealed class BenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
MediatR.IStreamRequestHandler
{
///
/// 处理 GFramework.CQRS stream request。
///
public IAsyncEnumerable Handle(
BenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(request, cancellationToken);
}
///
/// 处理 MediatR stream request。
///
IAsyncEnumerable MediatR.IStreamRequestHandler.Handle(
BenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(request, cancellationToken);
}
///
/// 为 benchmark 构造稳定、低噪声的异步响应序列。
///
private static async IAsyncEnumerable EnumerateAsync(
BenchmarkStreamRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
for (int index = 0; index < request.ItemCount; index++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return new BenchmarkResponse(request.Id);
await Task.CompletedTask.ConfigureAwait(false);
}
}
}
}