// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Order;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute(
typeof(GFramework.Cqrs.Benchmarks.Messaging.StreamPipelineBenchmarks.GeneratedStreamPipelineBenchmarkRegistry))]
namespace GFramework.Cqrs.Benchmarks.Messaging;
///
/// 对比不同 stream pipeline 行为数量下,单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state dispatch 开销。
///
///
/// 当前矩阵同时覆盖 0 / 1 / 4 个 stream pipeline 行为,以及
/// 与 两种观测口径,
/// 以便把建流固定成本与完整枚举成本拆开观察。
///
[Config(typeof(Config))]
public class StreamPipelineBenchmarks
{
private MicrosoftDiContainer _container = null!;
private ICqrsRuntime _runtime = null!;
private ServiceProvider _serviceProvider = null!;
private IMediator _mediatr = null!;
private BenchmarkStreamHandler _baselineHandler = null!;
private BenchmarkStreamRequest _request = null!;
///
/// 控制当前场景注册的 stream pipeline 行为数量,保持与 request pipeline benchmark 相同的 0 / 1 / 4 矩阵。
///
[Params(0, 1, 4)]
public int PipelineCount { get; set; }
///
/// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。
///
[Params(StreamObservation.FirstItem, StreamObservation.DrainAll)]
public StreamObservation Observation { get; set; }
///
/// 用于拆分 stream dispatch 固定成本与后续枚举成本的观测模式。
///
public enum StreamObservation
{
///
/// 只推进到首个元素后立即释放枚举器。
///
FirstItem,
///
/// 完整枚举整个 stream,保留原有 benchmark 语义。
///
DrainAll
}
///
/// 配置 stream pipeline benchmark 的公共输出格式。
///
private sealed class Config : ManualConfig
{
public Config()
{
AddJob(Job.Default);
AddColumnProvider(DefaultColumnProviders.Instance);
AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamPipeline"));
AddDiagnoser(MemoryDiagnoser.Default);
WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared));
}
}
///
/// 构建 stream pipeline dispatch 所需的最小 runtime 宿主和对照对象。
///
[GlobalSetup]
public void Setup()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider
{
MinLevel = LogLevel.Fatal
};
Fixture.Setup("StreamPipeline", handlerCount: 1, pipelineCount: PipelineCount);
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
_baselineHandler = new BenchmarkStreamHandler();
_container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{
BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container);
RegisterGFrameworkPipelineBehaviors(container, PipelineCount);
});
_runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamPipelineBenchmarks)));
_serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
services =>
{
RegisterMediatRStreamPipelineBehaviors(services, PipelineCount);
},
typeof(StreamPipelineBenchmarks),
static candidateType =>
candidateType == typeof(BenchmarkStreamHandler) ||
candidateType == typeof(BenchmarkStreamPipelineBehavior1) ||
candidateType == typeof(BenchmarkStreamPipelineBehavior2) ||
candidateType == typeof(BenchmarkStreamPipelineBehavior3) ||
candidateType == typeof(BenchmarkStreamPipelineBehavior4),
ServiceLifetime.Singleton);
_mediatr = _serviceProvider.GetRequiredService();
_request = new BenchmarkStreamRequest(Guid.NewGuid(), 3);
}
///
/// 释放 MediatR 对照组使用的 DI 宿主。
///
[GlobalCleanup]
public void Cleanup()
{
try
{
BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider);
}
finally
{
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
}
}
///
/// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream pipeline 编排之外的基线。
///
[Benchmark(Baseline = true)]
public ValueTask Stream_Baseline()
{
return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation);
}
///
/// 通过 GFramework.CQRS runtime 创建 stream,并按当前矩阵配置执行 stream pipeline。
///
[Benchmark]
public ValueTask Stream_GFrameworkCqrs()
{
return ObserveAsync(
_runtime.CreateStream(
BenchmarkContext.Instance,
_request,
CancellationToken.None),
Observation);
}
///
/// 通过 MediatR 创建 stream,并按当前矩阵配置执行 stream pipeline,作为外部设计对照。
///
[Benchmark]
public ValueTask Stream_MediatR()
{
return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation);
}
///
/// 按指定数量向 GFramework.CQRS 宿主注册最小 no-op stream pipeline 行为。
///
/// 当前 benchmark 使用的容器。
/// 要注册的行为数量。
/// 行为数量不在支持的矩阵内时抛出。
private static void RegisterGFrameworkPipelineBehaviors(MicrosoftDiContainer container, int pipelineCount)
{
ArgumentNullException.ThrowIfNull(container);
switch (pipelineCount)
{
case 0:
return;
case 1:
container.RegisterCqrsStreamPipelineBehavior();
return;
case 4:
container.RegisterCqrsStreamPipelineBehavior();
container.RegisterCqrsStreamPipelineBehavior();
container.RegisterCqrsStreamPipelineBehavior();
container.RegisterCqrsStreamPipelineBehavior();
return;
default:
throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount,
"Only the 0/1/4 pipeline matrix is supported.");
}
}
///
/// 按指定数量向 MediatR 宿主注册最小 no-op stream pipeline 行为。
///
/// 当前 benchmark 使用的服务集合。
/// 要注册的行为数量。
/// 行为数量不在支持的矩阵内时抛出。
private static void RegisterMediatRStreamPipelineBehaviors(IServiceCollection services, int pipelineCount)
{
ArgumentNullException.ThrowIfNull(services);
switch (pipelineCount)
{
case 0:
return;
case 1:
services.AddSingleton, BenchmarkStreamPipelineBehavior1>();
return;
case 4:
services.AddSingleton, BenchmarkStreamPipelineBehavior1>();
services.AddSingleton, BenchmarkStreamPipelineBehavior2>();
services.AddSingleton, BenchmarkStreamPipelineBehavior3>();
services.AddSingleton, BenchmarkStreamPipelineBehavior4>();
return;
default:
throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount,
"Only the 0/1/4 pipeline matrix is supported.");
}
}
///
/// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。
///
/// 当前 stream 的响应类型。
/// 待观察的异步响应序列。
/// 当前 benchmark 选定的观测模式。
/// 异步消费完成后的等待句柄。
private static ValueTask ObserveAsync(
IAsyncEnumerable responses,
StreamObservation observation)
{
ArgumentNullException.ThrowIfNull(responses);
return observation switch
{
StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None),
StreamObservation.DrainAll => DrainAsync(responses),
_ => throw new ArgumentOutOfRangeException(
nameof(observation),
observation,
"Unsupported stream observation mode.")
};
}
///
/// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 MoveNextAsync 的固定成本。
///
/// 当前 stream 的响应类型。
/// 待观察的异步响应序列。
/// 用于向异步枚举器传播取消的令牌。
/// 消费首个元素后的等待句柄。
private static async ValueTask ConsumeFirstItemAsync(
IAsyncEnumerable responses,
CancellationToken cancellationToken)
{
var enumerator = responses.GetAsyncEnumerator(cancellationToken);
await using (enumerator.ConfigureAwait(false))
{
if (await enumerator.MoveNextAsync().ConfigureAwait(false))
{
_ = enumerator.Current;
}
}
}
///
/// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。
///
/// 当前 stream 的响应类型。
/// 待完整枚举的异步响应序列。
/// 完整枚举结束后的等待句柄。
private static async ValueTask DrainAsync(IAsyncEnumerable responses)
{
await foreach (var response in responses.ConfigureAwait(false))
{
_ = response;
}
}
///
/// Benchmark stream request。
///
/// 请求标识。
/// 返回元素数量。
public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest,
MediatR.IStreamRequest;
///
/// 复用 stream benchmark 的响应结构,保持跨场景可比性。
///
/// 响应标识。
public sealed record BenchmarkResponse(Guid Id);
///
/// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。
///
public sealed class BenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
MediatR.IStreamRequestHandler
{
///
/// 处理 GFramework.CQRS stream request。
///
/// 当前 benchmark stream 请求。
/// 用于中断异步枚举的取消令牌。
/// 低噪声、可重复的异步响应序列。
public IAsyncEnumerable Handle(
BenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(request, cancellationToken);
}
///
/// 处理 MediatR stream request。
///
/// 当前 benchmark stream 请求。
/// 用于中断异步枚举的取消令牌。
/// 低噪声、可重复的异步响应序列。
IAsyncEnumerable MediatR.IStreamRequestHandler.Handle(
BenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(request, cancellationToken);
}
///
/// 为 benchmark 构造稳定、低噪声的异步响应序列。
///
/// 决定元素数量和标识的 benchmark 请求。
/// 用于中断异步枚举的取消令牌。
/// 按请求数量生成的响应序列。
private static async IAsyncEnumerable EnumerateAsync(
BenchmarkStreamRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
for (int index = 0; index < request.ItemCount; index++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return new BenchmarkResponse(request.Id);
await Task.CompletedTask.ConfigureAwait(false);
}
}
}
///
/// 为 benchmark 提供统一的 no-op stream pipeline 行为实现,尽量把测量焦点保持在调度器与行为编排本身。
///
public abstract class BenchmarkStreamPipelineBehaviorBase :
GFramework.Cqrs.Abstractions.Cqrs.IStreamPipelineBehavior,
MediatR.IStreamPipelineBehavior
{
///
/// 透传 GFramework.CQRS stream pipeline,避免引入额外业务逻辑噪音。
///
/// 当前 benchmark stream 请求。
/// 继续向下执行的 stream pipeline 委托。
/// 取消令牌。
/// 下游 handler 产出的异步响应序列。
public IAsyncEnumerable Handle(
BenchmarkStreamRequest message,
GFramework.Cqrs.Abstractions.Cqrs.StreamMessageHandlerDelegate next,
CancellationToken cancellationToken)
{
return next(message, cancellationToken);
}
///
/// 透传 MediatR stream pipeline,保持与 GFramework.CQRS 相同的 no-op 语义。
///
/// 当前 benchmark stream 请求。
/// 继续向下执行的 MediatR stream pipeline 委托。
/// 取消令牌。
/// 下游 handler 产出的异步响应序列。
IAsyncEnumerable MediatR.IStreamPipelineBehavior.Handle(
BenchmarkStreamRequest request,
MediatR.StreamHandlerDelegate next,
CancellationToken cancellationToken)
{
_ = request;
_ = cancellationToken;
return next();
}
}
///
/// pipeline 矩阵中的第一个 no-op stream 行为。
///
public sealed class BenchmarkStreamPipelineBehavior1 : BenchmarkStreamPipelineBehaviorBase
{
}
///
/// pipeline 矩阵中的第二个 no-op stream 行为。
///
public sealed class BenchmarkStreamPipelineBehavior2 : BenchmarkStreamPipelineBehaviorBase
{
}
///
/// pipeline 矩阵中的第三个 no-op stream 行为。
///
public sealed class BenchmarkStreamPipelineBehavior3 : BenchmarkStreamPipelineBehaviorBase
{
}
///
/// pipeline 矩阵中的第四个 no-op stream 行为。
///
public sealed class BenchmarkStreamPipelineBehavior4 : BenchmarkStreamPipelineBehaviorBase
{
}
///
/// 为 stream pipeline benchmark 提供 handwritten generated registry,
/// 让默认 pipeline 宿主也能走真实的 generated stream invoker provider 接线路径。
///
public sealed class GeneratedStreamPipelineBenchmarkRegistry :
GFramework.Cqrs.ICqrsHandlerRegistry,
GFramework.Cqrs.ICqrsStreamInvokerProvider,
GFramework.Cqrs.IEnumeratesCqrsStreamInvokerDescriptors
{
private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor =
new(
typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<
BenchmarkStreamRequest,
BenchmarkResponse>),
typeof(GeneratedStreamPipelineBenchmarkRegistry).GetMethod(
nameof(InvokeBenchmarkStreamHandler),
BindingFlags.Public | BindingFlags.Static)
?? throw new InvalidOperationException("Missing generated stream pipeline benchmark method."));
private static readonly IReadOnlyList Descriptors =
[
new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry(
typeof(BenchmarkStreamRequest),
typeof(BenchmarkResponse),
Descriptor)
];
///
/// 把 stream pipeline benchmark handler 注册为单例,保持与当前矩阵宿主一致的生命周期语义。
///
/// 用于承载 generated handler 注册的服务集合。
/// 记录 generated registry 接线结果的日志器。
public void Register(IServiceCollection services, ILogger logger)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(logger);
services.AddSingleton(
typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler),
typeof(BenchmarkStreamHandler));
logger.Debug("Registered generated stream pipeline benchmark handler.");
}
///
/// 返回当前 provider 暴露的全部 generated stream invoker 描述符。
///
/// 当前 benchmark 的 generated stream invoker 描述符集合。
public IReadOnlyList GetDescriptors()
{
return Descriptors;
}
///
/// 为目标流式请求/响应类型对返回 generated stream invoker 描述符。
///
/// 要匹配的 stream 请求类型。
/// 要匹配的 stream 响应类型。
/// 命中时返回的 generated stream invoker 描述符。
/// 是否命中了当前 benchmark 的 stream 请求/响应类型对。
public bool TryGetDescriptor(
Type requestType,
Type responseType,
out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor)
{
if (requestType == typeof(BenchmarkStreamRequest) &&
responseType == typeof(BenchmarkResponse))
{
descriptor = Descriptor;
return true;
}
descriptor = null;
return false;
}
///
/// 模拟 generated stream invoker provider 为 stream pipeline benchmark 产出的开放静态调用入口。
///
/// 当前要调用的 stream handler 实例。
/// 当前要分发的 stream 请求实例。
/// 用于向 handler 传播的取消令牌。
/// handler 产出的异步响应序列。
public static object InvokeBenchmarkStreamHandler(
object handler,
object request,
CancellationToken cancellationToken)
{
var typedHandler = (GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<
BenchmarkStreamRequest,
BenchmarkResponse>)handler;
var typedRequest = (BenchmarkStreamRequest)request;
return typedHandler.Handle(typedRequest, cancellationToken);
}
}
}