GFramework/GFramework.Cqrs.Benchmarks/Messaging/StreamLifetimeBenchmarks.cs
gewuyou 6d5d4be20b docs(cqrs): 收口PR345评审反馈
- 更新 AGENTS 多智能体预算术语说明,明确 worker 波次停止边界

- 修复 StreamLifetimeBenchmarks 的取消传播、观测维度注释与枚举器特性写法

- 调整 benchmark README 与 CQRS ai-plan 恢复文档,移除过期 PR 锚点和旧 diff 统计
2026-05-09 17:10:54 +08:00

480 lines
20 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Order;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
using MediatR;
using Microsoft.Extensions.DependencyInjection;
namespace GFramework.Cqrs.Benchmarks.Messaging;
/// <summary>
/// 对比 stream 在不同 handler 生命周期与观测方式下的额外开销。
/// </summary>
/// <remarks>
/// 当前矩阵只覆盖 `Singleton` 与 `Transient`。
/// `Scoped` 仍依赖真实的显式作用域边界;在当前“单根容器最小宿主”模型下直接加入 scoped 会把枚举宿主成本与生命周期成本混在一起,
/// 因此保持与 request 生命周期矩阵相同的边界,留待后续 scoped host 基线具备后再扩展。
/// <see cref="StreamObservation" /> 当前只保留 <see cref="StreamObservation.FirstItem" /> 与
/// <see cref="StreamObservation.DrainAll" /> 两种模式,分别用于观察建流到首个元素的固定成本与完整枚举的总成本,
/// 以避免把更多观测策略与 <see cref="StreamLifetimeBenchmarks" /> 的生命周期对照目标混在一起。
/// </remarks>
[Config(typeof(Config))]
public class StreamLifetimeBenchmarks
{
private MicrosoftDiContainer _reflectionContainer = null!;
private ICqrsRuntime _reflectionRuntime = null!;
private MicrosoftDiContainer _generatedContainer = null!;
private ICqrsRuntime _generatedRuntime = null!;
private ServiceProvider _serviceProvider = null!;
private IMediator _mediatr = null!;
private ReflectionBenchmarkStreamHandler _baselineHandler = null!;
private ReflectionBenchmarkStreamRequest _reflectionRequest = null!;
private GeneratedBenchmarkStreamRequest _generatedRequest = null!;
private MediatRBenchmarkStreamRequest _mediatrRequest = null!;
/// <summary>
/// 控制当前 benchmark 使用的 handler 生命周期。
/// </summary>
[Params(HandlerLifetime.Singleton, HandlerLifetime.Transient)]
public HandlerLifetime Lifetime { get; set; }
/// <summary>
/// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。
/// </summary>
[Params(StreamObservation.FirstItem, StreamObservation.DrainAll)]
public StreamObservation Observation { get; set; }
/// <summary>
/// 可公平比较的 benchmark handler 生命周期集合。
/// </summary>
public enum HandlerLifetime
{
/// <summary>
/// 复用单个 handler 实例。
/// </summary>
Singleton,
/// <summary>
/// 每次建流都重新解析新的 handler 实例。
/// </summary>
Transient
}
/// <summary>
/// 用于拆分 stream dispatch 与后续枚举成本的观测模式。
/// </summary>
public enum StreamObservation
{
/// <summary>
/// 只推进到首个元素后立即释放枚举器。
/// </summary>
FirstItem,
/// <summary>
/// 完整枚举整个 stream保留原有 benchmark 语义。
/// </summary>
DrainAll
}
/// <summary>
/// 配置 stream 生命周期 benchmark 的公共输出格式。
/// </summary>
private sealed class Config : ManualConfig
{
public Config()
{
AddJob(Job.Default);
AddColumnProvider(DefaultColumnProviders.Instance);
AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamLifetime"));
AddDiagnoser(MemoryDiagnoser.Default);
WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared));
}
}
/// <summary>
/// 构建当前生命周期下的 GFramework reflection、GFramework generated 与 MediatR stream 对照宿主。
/// </summary>
[GlobalSetup]
public void Setup()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider
{
MinLevel = LogLevel.Fatal
};
Fixture.Setup($"StreamLifetime/{Lifetime}", handlerCount: 1, pipelineCount: 0);
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
_baselineHandler = new ReflectionBenchmarkStreamHandler();
_reflectionRequest = new ReflectionBenchmarkStreamRequest(Guid.NewGuid(), 3);
_generatedRequest = new GeneratedBenchmarkStreamRequest(Guid.NewGuid(), 3);
_mediatrRequest = new MediatRBenchmarkStreamRequest(Guid.NewGuid(), 3);
_reflectionContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{
RegisterReflectionHandler(container, Lifetime);
});
_reflectionRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_reflectionContainer,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + ".Reflection." + Lifetime));
_generatedContainer = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{
BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry<GeneratedStreamLifetimeBenchmarkRegistry>(container);
RegisterGeneratedHandler(container, Lifetime);
});
// 容器内已提前保留默认 runtime 以支撑 generated registry 接线;
// 这里额外创建带生命周期后缀的 runtime只是为了区分不同 benchmark 矩阵的 dispatcher 日志。
_generatedRuntime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_generatedContainer,
LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamLifetimeBenchmarks) + ".Generated." + Lifetime));
_serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
configure: null,
typeof(StreamLifetimeBenchmarks),
static candidateType => candidateType == typeof(MediatRBenchmarkStreamHandler),
ResolveMediatRLifetime(Lifetime));
_mediatr = _serviceProvider.GetRequiredService<IMediator>();
}
/// <summary>
/// 释放当前生命周期矩阵持有的 benchmark 宿主资源,并清理 dispatcher 缓存。
/// </summary>
[GlobalCleanup]
public void Cleanup()
{
try
{
BenchmarkCleanupHelper.DisposeAll(_reflectionContainer, _generatedContainer, _serviceProvider);
}
finally
{
BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
}
}
/// <summary>
/// 直接调用 handler并按当前观测模式消费 stream作为不同生命周期矩阵下的 dispatch 额外开销 baseline。
/// </summary>
[Benchmark(Baseline = true)]
public ValueTask Stream_Baseline()
{
return ObserveAsync(_baselineHandler.Handle(_reflectionRequest, CancellationToken.None), Observation);
}
/// <summary>
/// 通过 GFramework.CQRS reflection stream binding 路径创建 stream并按当前观测模式消费。
/// </summary>
[Benchmark]
public ValueTask Stream_GFrameworkReflection()
{
return ObserveAsync(
_reflectionRuntime.CreateStream(
BenchmarkContext.Instance,
_reflectionRequest,
CancellationToken.None),
Observation);
}
/// <summary>
/// 通过 generated stream invoker provider 预热后的 GFramework.CQRS runtime 创建 stream并按当前观测模式消费。
/// </summary>
[Benchmark]
public ValueTask Stream_GFrameworkGenerated()
{
return ObserveAsync(
_generatedRuntime.CreateStream(
BenchmarkContext.Instance,
_generatedRequest,
CancellationToken.None),
Observation);
}
/// <summary>
/// 通过 MediatR 创建 stream并按当前观测模式消费作为外部对照。
/// </summary>
[Benchmark]
public ValueTask Stream_MediatR()
{
return ObserveAsync(_mediatr.CreateStream(_mediatrRequest, CancellationToken.None), Observation);
}
/// <summary>
/// 按生命周期把 reflection benchmark stream handler 注册到 GFramework 容器。
/// </summary>
/// <param name="container">当前 benchmark 拥有并负责释放的容器。</param>
/// <param name="lifetime">待比较的 handler 生命周期。</param>
private static void RegisterReflectionHandler(MicrosoftDiContainer container, HandlerLifetime lifetime)
{
ArgumentNullException.ThrowIfNull(container);
switch (lifetime)
{
case HandlerLifetime.Singleton:
container.RegisterSingleton<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<ReflectionBenchmarkStreamRequest, ReflectionBenchmarkResponse>,
ReflectionBenchmarkStreamHandler>();
return;
case HandlerLifetime.Transient:
container.RegisterTransient<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<ReflectionBenchmarkStreamRequest, ReflectionBenchmarkResponse>,
ReflectionBenchmarkStreamHandler>();
return;
default:
throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.");
}
}
/// <summary>
/// 按生命周期把 generated benchmark stream handler 注册到 GFramework 容器。
/// </summary>
/// <param name="container">当前 benchmark 拥有并负责释放的容器。</param>
/// <param name="lifetime">待比较的 handler 生命周期。</param>
/// <remarks>
/// generated registry 只负责暴露静态 descriptor
/// 生命周期矩阵仍由 benchmark 主体显式覆盖 handler 注册,避免把 descriptor 发现与实例解析混在一起。
/// </remarks>
private static void RegisterGeneratedHandler(MicrosoftDiContainer container, HandlerLifetime lifetime)
{
ArgumentNullException.ThrowIfNull(container);
switch (lifetime)
{
case HandlerLifetime.Singleton:
container.RegisterSingleton<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<GeneratedBenchmarkStreamRequest, GeneratedBenchmarkResponse>,
GeneratedBenchmarkStreamHandler>();
return;
case HandlerLifetime.Transient:
container.RegisterTransient<
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<GeneratedBenchmarkStreamRequest, GeneratedBenchmarkResponse>,
GeneratedBenchmarkStreamHandler>();
return;
default:
throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.");
}
}
/// <summary>
/// 将 benchmark 生命周期映射为 MediatR 组装所需的 <see cref="ServiceLifetime" />。
/// </summary>
/// <param name="lifetime">待比较的 handler 生命周期。</param>
/// <returns>当前生命周期对应的 MediatR 注册方式。</returns>
private static ServiceLifetime ResolveMediatRLifetime(HandlerLifetime lifetime)
{
return lifetime switch
{
HandlerLifetime.Singleton => ServiceLifetime.Singleton,
HandlerLifetime.Transient => ServiceLifetime.Transient,
_ => throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.")
};
}
/// <summary>
/// 按观测模式消费 stream便于把“建流/首个元素”和“完整枚举”分开观察。
/// </summary>
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
/// <param name="responses">待观察的异步响应序列。</param>
/// <param name="observation">当前 benchmark 选定的观测模式。</param>
/// <returns>异步消费完成后的等待句柄。</returns>
private static ValueTask ObserveAsync<TResponse>(
IAsyncEnumerable<TResponse> responses,
StreamObservation observation)
{
ArgumentNullException.ThrowIfNull(responses);
return observation switch
{
StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None),
StreamObservation.DrainAll => DrainAsync(responses),
_ => throw new ArgumentOutOfRangeException(
nameof(observation),
observation,
"Unsupported stream observation mode.")
};
}
/// <summary>
/// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 `MoveNextAsync` 的固定成本。
/// </summary>
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
/// <param name="responses">待观察的异步响应序列。</param>
/// <param name="cancellationToken">用于向异步枚举器传播取消的令牌。</param>
/// <returns>消费首个元素后的等待句柄。</returns>
private static async ValueTask ConsumeFirstItemAsync<TResponse>(
IAsyncEnumerable<TResponse> responses,
CancellationToken cancellationToken)
{
var enumerator = responses.GetAsyncEnumerator(cancellationToken);
await using (enumerator.ConfigureAwait(false))
{
if (await enumerator.MoveNextAsync().ConfigureAwait(false))
{
_ = enumerator.Current;
}
}
}
/// <summary>
/// 完整枚举整个 stream保留原 benchmark 的总成本观测口径。
/// </summary>
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
/// <param name="responses">待完整枚举的异步响应序列。</param>
/// <returns>完整枚举结束后的等待句柄。</returns>
private static async ValueTask DrainAsync<TResponse>(IAsyncEnumerable<TResponse> responses)
{
await foreach (var response in responses.ConfigureAwait(false))
{
_ = response;
}
}
/// <summary>
/// Reflection runtime stream request。
/// </summary>
/// <param name="Id">请求标识。</param>
/// <param name="ItemCount">返回元素数量。</param>
public sealed record ReflectionBenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest<ReflectionBenchmarkResponse>;
/// <summary>
/// Reflection runtime stream response。
/// </summary>
/// <param name="Id">响应标识。</param>
public sealed record ReflectionBenchmarkResponse(Guid Id);
/// <summary>
/// Generated runtime stream request。
/// </summary>
/// <param name="Id">请求标识。</param>
/// <param name="ItemCount">返回元素数量。</param>
public sealed record GeneratedBenchmarkStreamRequest(Guid Id, int ItemCount) :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest<GeneratedBenchmarkResponse>;
/// <summary>
/// Generated runtime stream response。
/// </summary>
/// <param name="Id">响应标识。</param>
public sealed record GeneratedBenchmarkResponse(Guid Id);
/// <summary>
/// MediatR stream request。
/// </summary>
/// <param name="Id">请求标识。</param>
/// <param name="ItemCount">返回元素数量。</param>
public sealed record MediatRBenchmarkStreamRequest(Guid Id, int ItemCount) :
MediatR.IStreamRequest<MediatRBenchmarkResponse>;
/// <summary>
/// MediatR stream response。
/// </summary>
/// <param name="Id">响应标识。</param>
public sealed record MediatRBenchmarkResponse(Guid Id);
/// <summary>
/// Reflection runtime 的最小 stream request handler。
/// </summary>
public sealed class ReflectionBenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<ReflectionBenchmarkStreamRequest, ReflectionBenchmarkResponse>
{
/// <summary>
/// 处理 reflection benchmark stream request。
/// </summary>
/// <param name="request">当前 reflection benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns>
public IAsyncEnumerable<ReflectionBenchmarkResponse> Handle(
ReflectionBenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new ReflectionBenchmarkResponse(id),
cancellationToken);
}
}
/// <summary>
/// Generated runtime 的最小 stream request handler。
/// </summary>
public sealed class GeneratedBenchmarkStreamHandler :
GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<GeneratedBenchmarkStreamRequest, GeneratedBenchmarkResponse>
{
/// <summary>
/// 处理 generated benchmark stream request。
/// </summary>
/// <param name="request">当前 generated benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns>
public IAsyncEnumerable<GeneratedBenchmarkResponse> Handle(
GeneratedBenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new GeneratedBenchmarkResponse(id),
cancellationToken);
}
}
/// <summary>
/// MediatR 对照组的最小 stream request handler。
/// </summary>
public sealed class MediatRBenchmarkStreamHandler :
MediatR.IStreamRequestHandler<MediatRBenchmarkStreamRequest, MediatRBenchmarkResponse>
{
/// <summary>
/// 处理 MediatR benchmark stream request。
/// </summary>
/// <param name="request">当前 MediatR benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns>
public IAsyncEnumerable<MediatRBenchmarkResponse> Handle(
MediatRBenchmarkStreamRequest request,
CancellationToken cancellationToken)
{
return EnumerateAsync(
request.Id,
request.ItemCount,
static id => new MediatRBenchmarkResponse(id),
cancellationToken);
}
}
/// <summary>
/// 为生命周期矩阵构造相同形状的低噪声异步枚举,避免不同口径的枚举体差异干扰 dispatch 对照。
/// </summary>
private static async IAsyncEnumerable<TResponse> EnumerateAsync<TResponse>(
Guid id,
int itemCount,
Func<Guid, TResponse> responseFactory,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
for (var index = 0; index < itemCount; index++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return responseFactory(id);
await Task.CompletedTask.ConfigureAwait(false);
}
}
}