test(cqrs-benchmarks): 补齐 stream lifetime 三方对照口径

- 拆分 GFramework stream lifetime benchmark 的 reflection、generated 与 MediatR 独立请求和 handler 类型

- 调整 generated stream registry 仅绑定 generated 口径,避免静态 dispatcher 缓存污染对照结果

- 验证 StreamLifetimeBenchmarks 在 Singleton 与 Transient 下均产出完整四方对照结果
This commit is contained in:
gewuyou 2026-05-09 12:56:27 +08:00
parent f9c9561f40
commit 9107e23268
2 changed files with 195 additions and 70 deletions

View File

@ -26,8 +26,8 @@ public sealed class GeneratedStreamLifetimeBenchmarkRegistry :
private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor = private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor =
new( new(
typeof(IStreamRequestHandler< typeof(IStreamRequestHandler<
StreamLifetimeBenchmarks.BenchmarkStreamRequest, StreamLifetimeBenchmarks.GeneratedBenchmarkStreamRequest,
StreamLifetimeBenchmarks.BenchmarkResponse>), StreamLifetimeBenchmarks.GeneratedBenchmarkResponse>),
typeof(GeneratedStreamLifetimeBenchmarkRegistry).GetMethod( typeof(GeneratedStreamLifetimeBenchmarkRegistry).GetMethod(
nameof(InvokeBenchmarkStreamHandler), nameof(InvokeBenchmarkStreamHandler),
BindingFlags.Public | BindingFlags.Static) BindingFlags.Public | BindingFlags.Static)
@ -36,8 +36,8 @@ public sealed class GeneratedStreamLifetimeBenchmarkRegistry :
private static readonly IReadOnlyList<GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry> Descriptors = private static readonly IReadOnlyList<GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry> Descriptors =
[ [
new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry( new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry(
typeof(StreamLifetimeBenchmarks.BenchmarkStreamRequest), typeof(StreamLifetimeBenchmarks.GeneratedBenchmarkStreamRequest),
typeof(StreamLifetimeBenchmarks.BenchmarkResponse), typeof(StreamLifetimeBenchmarks.GeneratedBenchmarkResponse),
Descriptor) Descriptor)
]; ];
@ -78,8 +78,8 @@ public sealed class GeneratedStreamLifetimeBenchmarkRegistry :
Type responseType, Type responseType,
out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor) out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor)
{ {
if (requestType == typeof(StreamLifetimeBenchmarks.BenchmarkStreamRequest) && if (requestType == typeof(StreamLifetimeBenchmarks.GeneratedBenchmarkStreamRequest) &&
responseType == typeof(StreamLifetimeBenchmarks.BenchmarkResponse)) responseType == typeof(StreamLifetimeBenchmarks.GeneratedBenchmarkResponse))
{ {
descriptor = Descriptor; descriptor = Descriptor;
return true; return true;
@ -102,9 +102,9 @@ public sealed class GeneratedStreamLifetimeBenchmarkRegistry :
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
var typedHandler = (IStreamRequestHandler< var typedHandler = (IStreamRequestHandler<
StreamLifetimeBenchmarks.BenchmarkStreamRequest, StreamLifetimeBenchmarks.GeneratedBenchmarkStreamRequest,
StreamLifetimeBenchmarks.BenchmarkResponse>)handler; StreamLifetimeBenchmarks.GeneratedBenchmarkResponse>)handler;
var typedRequest = (StreamLifetimeBenchmarks.BenchmarkStreamRequest)request; var typedRequest = (StreamLifetimeBenchmarks.GeneratedBenchmarkStreamRequest)request;
return typedHandler.Handle(typedRequest, cancellationToken); return typedHandler.Handle(typedRequest, cancellationToken);
} }
} }

View File

@ -31,12 +31,16 @@ namespace GFramework.Cqrs.Benchmarks.Messaging;
[Config(typeof(Config))] [Config(typeof(Config))]
public class StreamLifetimeBenchmarks public class StreamLifetimeBenchmarks
{ {
private MicrosoftDiContainer _container = null!; private MicrosoftDiContainer _reflectionContainer = null!;
private ICqrsRuntime _runtime = null!; private ICqrsRuntime _reflectionRuntime = null!;
private MicrosoftDiContainer _generatedContainer = null!;
private ICqrsRuntime _generatedRuntime = null!;
private ServiceProvider _serviceProvider = null!; private ServiceProvider _serviceProvider = null!;
private IMediator _mediatr = null!; private IMediator _mediatr = null!;
private BenchmarkStreamHandler _baselineHandler = null!; private ReflectionBenchmarkStreamHandler _baselineHandler = null!;
private BenchmarkStreamRequest _request = null!; private ReflectionBenchmarkStreamRequest _reflectionRequest = null!;
private GeneratedBenchmarkStreamRequest _generatedRequest = null!;
private MediatRBenchmarkStreamRequest _mediatrRequest = null!;
/// <summary> /// <summary>
/// 控制当前 benchmark 使用的 handler 生命周期。 /// 控制当前 benchmark 使用的 handler 生命周期。
@ -76,7 +80,7 @@ public class StreamLifetimeBenchmarks
} }
/// <summary> /// <summary>
/// 构建当前生命周期下的 GFramework 与 MediatR stream 对照宿主。 /// 构建当前生命周期下的 GFramework reflection、GFramework generated 与 MediatR stream 对照宿主。
/// </summary> /// </summary>
[GlobalSetup] [GlobalSetup]
public void Setup() public void Setup()
@ -88,24 +92,34 @@ public class StreamLifetimeBenchmarks
Fixture.Setup($"StreamLifetime/{Lifetime}", handlerCount: 1, pipelineCount: 0); Fixture.Setup($"StreamLifetime/{Lifetime}", handlerCount: 1, pipelineCount: 0);
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
_baselineHandler = new BenchmarkStreamHandler(); _baselineHandler = new ReflectionBenchmarkStreamHandler();
_request = new BenchmarkStreamRequest(Guid.NewGuid(), 3); _reflectionRequest = new ReflectionBenchmarkStreamRequest(Guid.NewGuid(), 3);
_generatedRequest = new GeneratedBenchmarkStreamRequest(Guid.NewGuid(), 3);
_mediatrRequest = new MediatRBenchmarkStreamRequest(Guid.NewGuid(), 3);
_container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container => _reflectionContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{
RegisterReflectionHandler(container, Lifetime);
});
_reflectionRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_reflectionContainer,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + ".Reflection." + Lifetime));
_generatedContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{ {
BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry<GeneratedStreamLifetimeBenchmarkRegistry>(container); BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry<GeneratedStreamLifetimeBenchmarkRegistry>(container);
RegisterGFrameworkHandler(container, Lifetime); RegisterGeneratedHandler(container, Lifetime);
}); });
// 容器内已提前保留默认 runtime 以支撑 generated registry 接线; // 容器内已提前保留默认 runtime 以支撑 generated registry 接线;
// 这里额外创建带生命周期后缀的 runtime只是为了区分不同 benchmark 矩阵的 dispatcher 日志。 // 这里额外创建带生命周期后缀的 runtime只是为了区分不同 benchmark 矩阵的 dispatcher 日志。
_runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( _generatedRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_container, _generatedContainer,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + "." + Lifetime)); LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + ".Generated." + Lifetime));
_serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
configure: null, configure: null,
typeof(StreamLifetimeBenchmarks), typeof(StreamLifetimeBenchmarks),
static candidateType => candidateType == typeof(BenchmarkStreamHandler), static candidateType => candidateType == typeof(MediatRBenchmarkStreamHandler),
ResolveMediatRLifetime(Lifetime)); ResolveMediatRLifetime(Lifetime));
_mediatr = _serviceProvider.GetRequiredService<IMediator>(); _mediatr = _serviceProvider.GetRequiredService<IMediator>();
} }
@ -118,7 +132,7 @@ public class StreamLifetimeBenchmarks
{ {
try try
{ {
BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider); BenchmarkCleanupHelper.DisposeAll(_reflectionContainer, _generatedContainer, _serviceProvider);
} }
finally finally
{ {
@ -132,19 +146,38 @@ public class StreamLifetimeBenchmarks
[Benchmark(Baseline = true)] [Benchmark(Baseline = true)]
public async ValueTask Stream_Baseline() public async ValueTask Stream_Baseline()
{ {
await foreach (var response in _baselineHandler.Handle(_request, CancellationToken.None).ConfigureAwait(false)) await foreach (var response in _baselineHandler.Handle(_reflectionRequest, CancellationToken.None).ConfigureAwait(false))
{ {
_ = response; _ = response;
} }
} }
/// <summary> /// <summary>
/// 通过 GFramework.CQRS runtime 创建并完整枚举 stream。 /// 通过 GFramework.CQRS reflection stream binding 路径创建并完整枚举 stream。
/// </summary> /// </summary>
[Benchmark] [Benchmark]
public async ValueTask Stream_GFrameworkCqrs() public async ValueTask Stream_GFrameworkReflection()
{ {
await foreach (var response in _runtime.CreateStream(BenchmarkContext.Instance, _request, CancellationToken.None) await foreach (var response in _reflectionRuntime.CreateStream(
BenchmarkContext.Instance,
_reflectionRequest,
CancellationToken.None)
.ConfigureAwait(false))
{
_ = response;
}
}
/// <summary>
/// 通过 generated stream invoker provider 预热后的 GFramework.CQRS runtime 创建并完整枚举 stream。
/// </summary>
[Benchmark]
public async ValueTask Stream_GFrameworkGenerated()
{
await foreach (var response in _generatedRuntime.CreateStream(
BenchmarkContext.Instance,
_generatedRequest,
CancellationToken.None)
.ConfigureAwait(false)) .ConfigureAwait(false))
{ {
_ = response; _ = response;
@ -157,22 +190,18 @@ public class StreamLifetimeBenchmarks
[Benchmark] [Benchmark]
public async ValueTask Stream_MediatR() public async ValueTask Stream_MediatR()
{ {
await foreach (var response in _mediatr.CreateStream(_request, CancellationToken.None).ConfigureAwait(false)) await foreach (var response in _mediatr.CreateStream(_mediatrRequest, CancellationToken.None).ConfigureAwait(false))
{ {
_ = response; _ = response;
} }
} }
/// <summary> /// <summary>
/// 按生命周期把 benchmark stream handler 注册到 GFramework 容器。 /// 按生命周期把 reflection benchmark stream handler 注册到 GFramework 容器。
/// </summary> /// </summary>
/// <param name="container">当前 benchmark 拥有并负责释放的容器。</param> /// <param name="container">当前 benchmark 拥有并负责释放的容器。</param>
/// <param name="lifetime">待比较的 handler 生命周期。</param> /// <param name="lifetime">待比较的 handler 生命周期。</param>
/// <remarks> private static void RegisterReflectionHandler(MicrosoftDiContainer container, HandlerLifetime lifetime)
/// 先通过 generated registry 提供静态 descriptor再显式覆盖 handler 生命周期,
/// 可以把比较变量收敛到 handler 解析成本,而不是 descriptor 发现路径本身。
/// </remarks>
private static void RegisterGFrameworkHandler(MicrosoftDiContainer container, HandlerLifetime lifetime)
{ {
ArgumentNullException.ThrowIfNull(container); ArgumentNullException.ThrowIfNull(container);
@ -180,14 +209,46 @@ public class StreamLifetimeBenchmarks
{ {
case HandlerLifetime.Singleton: case HandlerLifetime.Singleton:
container.RegisterSingleton< container.RegisterSingleton<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>, GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<ReflectionBenchmarkStreamRequest, ReflectionBenchmarkResponse>,
BenchmarkStreamHandler>(); ReflectionBenchmarkStreamHandler>();
return; return;
case HandlerLifetime.Transient: case HandlerLifetime.Transient:
container.RegisterTransient< container.RegisterTransient<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>, GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<ReflectionBenchmarkStreamRequest, ReflectionBenchmarkResponse>,
BenchmarkStreamHandler>(); ReflectionBenchmarkStreamHandler>();
return;
default:
throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.");
}
}
/// <summary>
/// 按生命周期把 generated benchmark stream handler 注册到 GFramework 容器。
/// </summary>
/// <param name="container">当前 benchmark 拥有并负责释放的容器。</param>
/// <param name="lifetime">待比较的 handler 生命周期。</param>
/// <remarks>
/// generated registry 只负责暴露静态 descriptor
/// 生命周期矩阵仍由 benchmark 主体显式覆盖 handler 注册,避免把 descriptor 发现与实例解析混在一起。
/// </remarks>
private static void RegisterGeneratedHandler(MicrosoftDiContainer container, HandlerLifetime lifetime)
{
ArgumentNullException.ThrowIfNull(container);
switch (lifetime)
{
case HandlerLifetime.Singleton:
container.RegisterSingleton<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<GeneratedBenchmarkStreamRequest, GeneratedBenchmarkResponse>,
GeneratedBenchmarkStreamHandler>();
return;
case HandlerLifetime.Transient:
container.RegisterTransient<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<GeneratedBenchmarkStreamRequest, GeneratedBenchmarkResponse>,
GeneratedBenchmarkStreamHandler>();
return; return;
default: default:
@ -211,69 +272,133 @@ public class StreamLifetimeBenchmarks
} }
/// <summary> /// <summary>
/// Benchmark stream request。 /// Reflection runtime stream request。
/// </summary> /// </summary>
/// <param name="Id">请求标识。</param> /// <param name="Id">请求标识。</param>
/// <param name="ItemCount">返回元素数量。</param> /// <param name="ItemCount">返回元素数量。</param>
public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) : public sealed record ReflectionBenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest<BenchmarkResponse>, GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest<ReflectionBenchmarkResponse>;
MediatR.IStreamRequest<BenchmarkResponse>;
/// <summary> /// <summary>
/// Benchmark stream response。 /// Reflection runtime stream response。
/// </summary> /// </summary>
/// <param name="Id">响应标识。</param> /// <param name="Id">响应标识。</param>
public sealed record BenchmarkResponse(Guid Id); public sealed record ReflectionBenchmarkResponse(Guid Id);
/// <summary> /// <summary>
/// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler /// Generated runtime stream request
/// </summary> /// </summary>
public sealed class BenchmarkStreamHandler : /// <param name="Id">请求标识。</param>
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>, /// <param name="ItemCount">返回元素数量。</param>
MediatR.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse> public sealed record GeneratedBenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest<GeneratedBenchmarkResponse>;
/// <summary>
/// Generated runtime stream response。
/// </summary>
/// <param name="Id">响应标识。</param>
public sealed record GeneratedBenchmarkResponse(Guid Id);
/// <summary>
/// MediatR stream request。
/// </summary>
/// <param name="Id">请求标识。</param>
/// <param name="ItemCount">返回元素数量。</param>
public sealed record MediatRBenchmarkStreamRequest(Guid Id, int ItemCount) :
MediatR.IStreamRequest<MediatRBenchmarkResponse>;
/// <summary>
/// MediatR stream response。
/// </summary>
/// <param name="Id">响应标识。</param>
public sealed record MediatRBenchmarkResponse(Guid Id);
/// <summary>
/// Reflection runtime 的最小 stream request handler。
/// </summary>
public sealed class ReflectionBenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<ReflectionBenchmarkStreamRequest, ReflectionBenchmarkResponse>
{ {
/// <summary> /// <summary>
/// 处理 GFramework.CQRS stream request。 /// 处理 reflection benchmark stream request。
/// </summary> /// </summary>
/// <param name="request">当前 benchmark stream 请求。</param> /// <param name="request">当前 reflection benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param> /// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns> /// <returns>完整枚举所需的低噪声异步响应序列。</returns>
public IAsyncEnumerable<BenchmarkResponse> Handle( public IAsyncEnumerable<ReflectionBenchmarkResponse> Handle(
BenchmarkStreamRequest request, ReflectionBenchmarkStreamRequest request,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
return EnumerateAsync(request, cancellationToken); return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new ReflectionBenchmarkResponse(id),
cancellationToken);
} }
}
/// <summary>
/// Generated runtime 的最小 stream request handler。
/// </summary>
public sealed class GeneratedBenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<GeneratedBenchmarkStreamRequest, GeneratedBenchmarkResponse>
{
/// <summary> /// <summary>
/// 处理 MediatR stream request。 /// 处理 generated benchmark stream request。
/// </summary> /// </summary>
/// <param name="request">当前 benchmark stream 请求。</param> /// <param name="request">当前 generated benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param> /// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns> /// <returns>完整枚举所需的低噪声异步响应序列。</returns>
IAsyncEnumerable<BenchmarkResponse> MediatR.IStreamRequestHandler<BenchmarkStreamRequest, BenchmarkResponse>.Handle( public IAsyncEnumerable<GeneratedBenchmarkResponse> Handle(
BenchmarkStreamRequest request, GeneratedBenchmarkStreamRequest request,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
return EnumerateAsync(request, cancellationToken); return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new GeneratedBenchmarkResponse(id),
cancellationToken);
} }
}
/// <summary>
/// MediatR 对照组的最小 stream request handler。
/// </summary>
public sealed class MediatRBenchmarkStreamHandler :
MediatR.IStreamRequestHandler<MediatRBenchmarkStreamRequest, MediatRBenchmarkResponse>
{
/// <summary> /// <summary>
/// 为生命周期矩阵构造稳定、低噪声的异步响应序列。 /// 处理 MediatR benchmark stream request
/// </summary> /// </summary>
/// <param name="request">当前 benchmark 请求。</param> /// <param name="request">当前 MediatR benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param> /// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>按固定元素数量返回的异步响应序列。</returns> /// <returns>完整枚举所需的低噪声异步响应序列。</returns>
private static async IAsyncEnumerable<BenchmarkResponse> EnumerateAsync( public IAsyncEnumerable<MediatRBenchmarkResponse> Handle(
BenchmarkStreamRequest request, MediatRBenchmarkStreamRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
for (var index = 0; index < request.ItemCount; index++) return EnumerateAsync(
{ request.Id,
cancellationToken.ThrowIfCancellationRequested(); request.ItemCount,
yield return new BenchmarkResponse(request.Id); static id => new MediatRBenchmarkResponse(id),
await Task.CompletedTask.ConfigureAwait(false); cancellationToken);
} }
}
/// <summary>
/// 为生命周期矩阵构造相同形状的低噪声异步枚举,避免不同口径的枚举体差异干扰 dispatch 对照。
/// </summary>
private static async IAsyncEnumerable<TResponse> EnumerateAsync<TResponse>(
Guid id,
int itemCount,
Func<Guid, TResponse> responseFactory,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
for (var index = 0; index < itemCount; index++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return responseFactory(id);
await Task.CompletedTask.ConfigureAwait(false);
} }
} }
} }