From 79ae5f0b5ae44f81f49c79d85d61f0606909c98c Mon Sep 17 00:00:00 2001
From: gewuyou <95328647+GeWuYou@users.noreply.github.com>
Date: Mon, 11 May 2026 07:50:04 +0800
Subject: [PATCH] =?UTF-8?q?test(cqrs-benchmarks):=20=E6=8B=86=E5=88=86=20s?=
=?UTF-8?q?treaming=20benchmark=20=E8=A7=82=E6=B5=8B=E5=8F=A3=E5=BE=84?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增 FirstItem 与 DrainAll 两种 stream 观测模式,补齐 steady-state StreamingBenchmarks 的参数矩阵。
- 重构 stream 消费路径为共享 helper,分别覆盖首元素观测与完整枚举基线。
---
.../Messaging/StreamingBenchmarks.cs | 125 +++++++++++++++---
1 file changed, 103 insertions(+), 22 deletions(-)
diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs
index 8b886d29..bfb7d07e 100644
--- a/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs
+++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs
@@ -24,8 +24,13 @@ using Microsoft.Extensions.DependencyInjection;
namespace GFramework.Cqrs.Benchmarks.Messaging;
///
-/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的完整枚举开销。
+/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state stream 开销。
///
+///
+/// 默认 generated-provider stream 宿主同时暴露 与
+/// 两种观测口径,
+/// 以便把“建流到首个元素”的固定成本与“完整枚举整个 stream”的总成本拆开观察。
+///
[Config(typeof(Config))]
public class StreamingBenchmarks
{
@@ -36,6 +41,28 @@ public class StreamingBenchmarks
private BenchmarkStreamHandler _baselineHandler = null!;
private BenchmarkStreamRequest _request = null!;
+ ///
+ /// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。
+ ///
+ [Params(StreamObservation.FirstItem, StreamObservation.DrainAll)]
+ public StreamObservation Observation { get; set; }
+
+ ///
+ /// 用于拆分 stream dispatch 与后续枚举成本的观测模式。
+ ///
+ public enum StreamObservation
+ {
+ ///
+ /// 只推进到首个元素后立即释放枚举器。
+ ///
+ FirstItem,
+
+ ///
+ /// 完整枚举整个 stream,保留原有 benchmark 语义。
+ ///
+ DrainAll
+ }
+
///
/// 配置 stream benchmark 的公共输出格式。
///
@@ -100,37 +127,91 @@ public class StreamingBenchmarks
}
///
- /// 直接调用 handler 并完整枚举响应序列,作为 stream dispatch 额外开销的 baseline。
+ /// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream dispatch 额外开销的 baseline。
///
[Benchmark(Baseline = true)]
- public async ValueTask Stream_Baseline()
+ public ValueTask Stream_Baseline()
{
- await foreach (var response in _baselineHandler.Handle(_request, CancellationToken.None).ConfigureAwait(false))
+ return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation);
+ }
+
+ ///
+ /// 通过 GFramework.CQRS runtime 创建 stream,并按当前观测模式消费。
+ ///
+ [Benchmark]
+ public ValueTask Stream_GFrameworkCqrs()
+ {
+ return ObserveAsync(
+ _runtime.CreateStream(
+ BenchmarkContext.Instance,
+ _request,
+ CancellationToken.None),
+ Observation);
+ }
+
+ ///
+ /// 通过 MediatR 创建 stream,并按当前观测模式消费,作为外部设计对照。
+ ///
+ [Benchmark]
+ public ValueTask Stream_MediatR()
+ {
+ return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation);
+ }
+
+ ///
+ /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。
+ ///
+ /// 当前 stream 的响应类型。
+ /// 待观察的异步响应序列。
+ /// 当前 benchmark 选定的观测模式。
+ /// 异步消费完成后的等待句柄。
+ private static ValueTask ObserveAsync(
+ IAsyncEnumerable responses,
+ StreamObservation observation)
+ {
+ ArgumentNullException.ThrowIfNull(responses);
+
+ return observation switch
{
- _ = response;
+ StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None),
+ StreamObservation.DrainAll => DrainAsync(responses),
+ _ => throw new ArgumentOutOfRangeException(
+ nameof(observation),
+ observation,
+ "Unsupported stream observation mode.")
+ };
+ }
+
+ ///
+ /// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 `MoveNextAsync` 的固定成本。
+ ///
+ /// 当前 stream 的响应类型。
+ /// 待观察的异步响应序列。
+ /// 用于向异步枚举器传播取消的令牌。
+ /// 消费首个元素后的等待句柄。
+ private static async ValueTask ConsumeFirstItemAsync(
+ IAsyncEnumerable responses,
+ CancellationToken cancellationToken)
+ {
+ var enumerator = responses.GetAsyncEnumerator(cancellationToken);
+ await using (enumerator.ConfigureAwait(false))
+ {
+ if (await enumerator.MoveNextAsync().ConfigureAwait(false))
+ {
+ _ = enumerator.Current;
+ }
}
}
///
- /// 通过 GFramework.CQRS runtime 创建并完整枚举 stream。
+ /// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。
///
- [Benchmark]
- public async ValueTask Stream_GFrameworkCqrs()
+ /// 当前 stream 的响应类型。
+ /// 待完整枚举的异步响应序列。
+ /// 完整枚举结束后的等待句柄。
+ private static async ValueTask DrainAsync(IAsyncEnumerable responses)
{
- await foreach (var response in _runtime.CreateStream(BenchmarkContext.Instance, _request, CancellationToken.None)
- .ConfigureAwait(false))
- {
- _ = response;
- }
- }
-
- ///
- /// 通过 MediatR 创建并完整枚举 stream,作为外部设计对照。
- ///
- [Benchmark]
- public async ValueTask Stream_MediatR()
- {
- await foreach (var response in _mediatr.CreateStream(_request, CancellationToken.None).ConfigureAwait(false))
+ await foreach (var response in responses.ConfigureAwait(false))
{
_ = response;
}