mirror of
https://github.com/GeWuYou/GFramework.git
synced 2026-05-13 14:14:29 +08:00
test(cqrs-benchmarks): 拆分 streaming benchmark 观测口径
- 新增 FirstItem 与 DrainAll 两种 stream 观测模式,补齐 steady-state StreamingBenchmarks 的参数矩阵。 - 重构 stream 消费路径为共享 helper,分别覆盖首元素观测与完整枚举基线。
This commit is contained in:
parent
699d0b4896
commit
79ae5f0b5a
@ -24,8 +24,13 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
namespace GFramework.Cqrs.Benchmarks.Messaging;
|
namespace GFramework.Cqrs.Benchmarks.Messaging;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的完整枚举开销。
|
/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state stream 开销。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// 默认 generated-provider stream 宿主同时暴露 <see cref="StreamObservation.FirstItem" /> 与
|
||||||
|
/// <see cref="StreamObservation.DrainAll" /> 两种观测口径,
|
||||||
|
/// 以便把“建流到首个元素”的固定成本与“完整枚举整个 stream”的总成本拆开观察。
|
||||||
|
/// </remarks>
|
||||||
[Config(typeof(Config))]
|
[Config(typeof(Config))]
|
||||||
public class StreamingBenchmarks
|
public class StreamingBenchmarks
|
||||||
{
|
{
|
||||||
@ -36,6 +41,28 @@ public class StreamingBenchmarks
|
|||||||
private BenchmarkStreamHandler _baselineHandler = null!;
|
private BenchmarkStreamHandler _baselineHandler = null!;
|
||||||
private BenchmarkStreamRequest _request = null!;
|
private BenchmarkStreamRequest _request = null!;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。
|
||||||
|
/// </summary>
|
||||||
|
[Params(StreamObservation.FirstItem, StreamObservation.DrainAll)]
|
||||||
|
public StreamObservation Observation { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 用于拆分 stream dispatch 与后续枚举成本的观测模式。
|
||||||
|
/// </summary>
|
||||||
|
public enum StreamObservation
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 只推进到首个元素后立即释放枚举器。
|
||||||
|
/// </summary>
|
||||||
|
FirstItem,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 完整枚举整个 stream,保留原有 benchmark 语义。
|
||||||
|
/// </summary>
|
||||||
|
DrainAll
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 配置 stream benchmark 的公共输出格式。
|
/// 配置 stream benchmark 的公共输出格式。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -100,37 +127,91 @@ public class StreamingBenchmarks
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 直接调用 handler 并完整枚举响应序列,作为 stream dispatch 额外开销的 baseline。
|
/// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream dispatch 额外开销的 baseline。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Benchmark(Baseline = true)]
|
[Benchmark(Baseline = true)]
|
||||||
public async ValueTask Stream_Baseline()
|
public ValueTask Stream_Baseline()
|
||||||
{
|
{
|
||||||
await foreach (var response in _baselineHandler.Handle(_request, CancellationToken.None).ConfigureAwait(false))
|
return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 通过 GFramework.CQRS runtime 创建 stream,并按当前观测模式消费。
|
||||||
|
/// </summary>
|
||||||
|
[Benchmark]
|
||||||
|
public ValueTask Stream_GFrameworkCqrs()
|
||||||
|
{
|
||||||
|
return ObserveAsync(
|
||||||
|
_runtime.CreateStream(
|
||||||
|
BenchmarkContext.Instance,
|
||||||
|
_request,
|
||||||
|
CancellationToken.None),
|
||||||
|
Observation);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 通过 MediatR 创建 stream,并按当前观测模式消费,作为外部设计对照。
|
||||||
|
/// </summary>
|
||||||
|
[Benchmark]
|
||||||
|
public ValueTask Stream_MediatR()
|
||||||
|
{
|
||||||
|
return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
|
||||||
|
/// <param name="responses">待观察的异步响应序列。</param>
|
||||||
|
/// <param name="observation">当前 benchmark 选定的观测模式。</param>
|
||||||
|
/// <returns>异步消费完成后的等待句柄。</returns>
|
||||||
|
private static ValueTask ObserveAsync<TResponse>(
|
||||||
|
IAsyncEnumerable<TResponse> responses,
|
||||||
|
StreamObservation observation)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(responses);
|
||||||
|
|
||||||
|
return observation switch
|
||||||
{
|
{
|
||||||
_ = response;
|
StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None),
|
||||||
|
StreamObservation.DrainAll => DrainAsync(responses),
|
||||||
|
_ => throw new ArgumentOutOfRangeException(
|
||||||
|
nameof(observation),
|
||||||
|
observation,
|
||||||
|
"Unsupported stream observation mode.")
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 `MoveNextAsync` 的固定成本。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
|
||||||
|
/// <param name="responses">待观察的异步响应序列。</param>
|
||||||
|
/// <param name="cancellationToken">用于向异步枚举器传播取消的令牌。</param>
|
||||||
|
/// <returns>消费首个元素后的等待句柄。</returns>
|
||||||
|
private static async ValueTask ConsumeFirstItemAsync<TResponse>(
|
||||||
|
IAsyncEnumerable<TResponse> responses,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var enumerator = responses.GetAsyncEnumerator(cancellationToken);
|
||||||
|
await using (enumerator.ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
if (await enumerator.MoveNextAsync().ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
_ = enumerator.Current;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 通过 GFramework.CQRS runtime 创建并完整枚举 stream。
|
/// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Benchmark]
|
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
|
||||||
public async ValueTask Stream_GFrameworkCqrs()
|
/// <param name="responses">待完整枚举的异步响应序列。</param>
|
||||||
|
/// <returns>完整枚举结束后的等待句柄。</returns>
|
||||||
|
private static async ValueTask DrainAsync<TResponse>(IAsyncEnumerable<TResponse> responses)
|
||||||
{
|
{
|
||||||
await foreach (var response in _runtime.CreateStream(BenchmarkContext.Instance, _request, CancellationToken.None)
|
await foreach (var response in responses.ConfigureAwait(false))
|
||||||
.ConfigureAwait(false))
|
|
||||||
{
|
|
||||||
_ = response;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 通过 MediatR 创建并完整枚举 stream,作为外部设计对照。
|
|
||||||
/// </summary>
|
|
||||||
[Benchmark]
|
|
||||||
public async ValueTask Stream_MediatR()
|
|
||||||
{
|
|
||||||
await foreach (var response in _mediatr.CreateStream(_request, CancellationToken.None).ConfigureAwait(false))
|
|
||||||
{
|
{
|
||||||
_ = response;
|
_ = response;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user