From 683034579e8c5f3d5c92a8a7f6ca0499705cd60b Mon Sep 17 00:00:00 2001
From: gewuyou <95328647+GeWuYou@users.noreply.github.com>
Date: Mon, 11 May 2026 12:47:45 +0800
Subject: [PATCH] =?UTF-8?q?test(cqrs-benchmarks):=20=E6=96=B0=E5=A2=9E=20s?=
=?UTF-8?q?tream=20startup=20=E5=9F=BA=E5=87=86=E5=9C=BA=E6=99=AF?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增 StreamStartupBenchmarks,补齐 stream 的 initialization 与 cold-start first-hit 对照
- 内嵌 generated registry 与最小宿主搭建,保持 benchmark 场景在单文件内自管理
---
.../Messaging/StreamStartupBenchmarks.cs | 430 ++++++++++++++++++
1 file changed, 430 insertions(+)
create mode 100644 GFramework.Cqrs.Benchmarks/Messaging/StreamStartupBenchmarks.cs
diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamStartupBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamStartupBenchmarks.cs
new file mode 100644
index 00000000..26947bbd
--- /dev/null
+++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamStartupBenchmarks.cs
@@ -0,0 +1,430 @@
+// Copyright (c) 2025-2026 GeWuYou
+// SPDX-License-Identifier: Apache-2.0
+
+using BenchmarkDotNet.Attributes;
+using BenchmarkDotNet.Columns;
+using BenchmarkDotNet.Configs;
+using BenchmarkDotNet.Diagnosers;
+using BenchmarkDotNet.Jobs;
+using BenchmarkDotNet.Order;
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using GFramework.Core.Abstractions.Logging;
+using GFramework.Core.Ioc;
+using GFramework.Core.Logging;
+using GFramework.Cqrs.Abstractions.Cqrs;
+using MediatR;
+using Microsoft.Extensions.DependencyInjection;
+
+[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute(
+ typeof(GFramework.Cqrs.Benchmarks.Messaging.StreamStartupBenchmarks.GeneratedRegistry))]
+
+namespace GFramework.Cqrs.Benchmarks.Messaging;
+
+///
+/// 对比 stream 宿主在 GFramework.CQRS reflection / generated 与 MediatR 之间的初始化与首次建流命中成本。
+///
+///
+/// 该场景与 保持相同的 `Initialization + ColdStart` 结构,
+/// 但 cold-start 边界改为“新宿主 + 首个元素命中”,因为 stream 的首个 MoveNextAsync
+/// 才会真正覆盖建流后的首次处理链路。
+///
+[Config(typeof(Config))]
+public class StreamStartupBenchmarks
+{
+ private static readonly ILogger ReflectionRuntimeLogger = CreateLogger(nameof(StreamStartupBenchmarks) + ".Reflection");
+ private static readonly ILogger GeneratedRuntimeLogger = CreateLogger(nameof(StreamStartupBenchmarks) + ".Generated");
+ private static readonly BenchmarkStreamRequest Request = new(Guid.NewGuid(), 3);
+
+ private MicrosoftDiContainer _reflectionContainer = null!;
+ private ICqrsRuntime _reflectionRuntime = null!;
+ private MicrosoftDiContainer _generatedContainer = null!;
+ private ICqrsRuntime _generatedRuntime = null!;
+ private ServiceProvider _serviceProvider = null!;
+ private IMediator _mediatr = null!;
+
+ ///
+ /// 配置 stream startup benchmark 的公共输出格式。
+ ///
+ private sealed class Config : ManualConfig
+ {
+ public Config()
+ {
+ AddJob(Job.Default
+ .WithId("ColdStart")
+ .WithInvocationCount(1)
+ .WithUnrollFactor(1));
+ AddColumnProvider(DefaultColumnProviders.Instance);
+ AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamStartup"), TargetMethodColumn.Method, CategoriesColumn.Default);
+ AddDiagnoser(MemoryDiagnoser.Default);
+ AddLogicalGroupRules(BenchmarkLogicalGroupRule.ByCategory);
+ WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared));
+ }
+ }
+
+ ///
+ /// 构建 startup benchmark 复用的 reflection / generated / MediatR 宿主对象。
+ ///
+ [GlobalSetup]
+ public void Setup()
+ {
+ Fixture.Setup("StreamStartup", handlerCount: 1, pipelineCount: 0);
+
+ _reflectionContainer = CreateReflectionContainer();
+ _reflectionRuntime = CreateRuntime(_reflectionContainer, ReflectionRuntimeLogger);
+
+ _generatedContainer = CreateGeneratedContainer();
+ _generatedRuntime = CreateRuntime(_generatedContainer, GeneratedRuntimeLogger);
+
+ _serviceProvider = CreateMediatRServiceProvider();
+ _mediatr = _serviceProvider.GetRequiredService();
+ }
+
+ ///
+ /// 在每次 cold-start 迭代前清空 dispatcher 静态缓存,确保首次绑定路径可重复观察。
+ ///
+ [IterationSetup]
+ public void ResetColdStartCaches()
+ {
+ BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
+ }
+
+ ///
+ /// 释放 startup benchmark 复用的宿主对象。
+ ///
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ BenchmarkCleanupHelper.DisposeAll(_reflectionContainer, _generatedContainer, _serviceProvider);
+ }
+
+ ///
+ /// 返回已构建宿主中的 MediatR mediator,作为 initialization 组的句柄解析 baseline。
+ ///
+ /// 当前 benchmark 复用的 MediatR mediator。
+ [Benchmark(Baseline = true)]
+ [BenchmarkCategory("Initialization")]
+ public IMediator Initialization_MediatR()
+ {
+ return _mediatr;
+ }
+
+ ///
+ /// 返回已构建宿主中的 GFramework.CQRS reflection runtime,观察默认 stream binding 宿主句柄解析成本。
+ ///
+ /// 当前 benchmark 复用的 reflection CQRS runtime。
+ [Benchmark]
+ [BenchmarkCategory("Initialization")]
+ public ICqrsRuntime Initialization_GFrameworkReflection()
+ {
+ return _reflectionRuntime;
+ }
+
+ ///
+ /// 返回已构建宿主中的 GFramework.CQRS generated runtime,观察 generated stream invoker 宿主句柄解析成本。
+ ///
+ /// 当前 benchmark 复用的 generated CQRS runtime。
+ [Benchmark]
+ [BenchmarkCategory("Initialization")]
+ public ICqrsRuntime Initialization_GFrameworkGenerated()
+ {
+ return _generatedRuntime;
+ }
+
+ ///
+ /// 在新宿主上首次创建并推进 stream,作为 MediatR 的 cold-start baseline。
+ ///
+ /// 首个 stream 响应元素。
+ [Benchmark(Baseline = true)]
+ [BenchmarkCategory("ColdStart")]
+ public async Task ColdStart_MediatR()
+ {
+ using var serviceProvider = CreateMediatRServiceProvider();
+ var mediator = serviceProvider.GetRequiredService();
+ return await ConsumeFirstItemAsync(mediator.CreateStream(Request, CancellationToken.None), CancellationToken.None).ConfigureAwait(false);
+ }
+
+ ///
+ /// 在新的 reflection runtime 上首次创建并推进 stream,量化默认 stream binding 的 first-hit 成本。
+ ///
+ /// 首个 stream 响应元素。
+ [Benchmark]
+ [BenchmarkCategory("ColdStart")]
+ public async ValueTask ColdStart_GFrameworkReflection()
+ {
+ using var container = CreateReflectionContainer();
+ var runtime = CreateRuntime(container, ReflectionRuntimeLogger);
+ return await ConsumeFirstItemAsync(
+ runtime.CreateStream(BenchmarkContext.Instance, Request, CancellationToken.None),
+ CancellationToken.None)
+ .ConfigureAwait(false);
+ }
+
+ ///
+ /// 在新的 generated runtime 上首次创建并推进 stream,量化 generated stream invoker 路径的 first-hit 成本。
+ ///
+ /// 首个 stream 响应元素。
+ [Benchmark]
+ [BenchmarkCategory("ColdStart")]
+ public async ValueTask ColdStart_GFrameworkGenerated()
+ {
+ using var container = CreateGeneratedContainer();
+ var runtime = CreateRuntime(container, GeneratedRuntimeLogger);
+ return await ConsumeFirstItemAsync(
+ runtime.CreateStream(BenchmarkContext.Instance, Request, CancellationToken.None),
+ CancellationToken.None)
+ .ConfigureAwait(false);
+ }
+
+ ///
+ /// 构建只承载当前 benchmark handler 的最小 reflection GFramework.CQRS 容器。
+ ///
+ private static MicrosoftDiContainer CreateReflectionContainer()
+ {
+ return BenchmarkHostFactory.CreateFrozenGFrameworkContainer(static container =>
+ {
+ container.RegisterTransient<
+ GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
+ BenchmarkStreamHandler>();
+ });
+ }
+
+ ///
+ /// 构建只承载当前 benchmark generated registry 的最小 generated GFramework.CQRS 容器。
+ ///
+ private static MicrosoftDiContainer CreateGeneratedContainer()
+ {
+ return BenchmarkHostFactory.CreateFrozenGFrameworkContainer(static container =>
+ {
+ BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container);
+ });
+ }
+
+ ///
+ /// 基于已冻结的 benchmark 容器构建最小 GFramework.CQRS runtime。
+ ///
+ /// 当前 benchmark 拥有并负责释放的容器。
+ /// 当前 runtime 使用的 benchmark logger。
+ private static ICqrsRuntime CreateRuntime(MicrosoftDiContainer container, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(container);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ return GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(container, logger);
+ }
+
+ ///
+ /// 构建只承载当前 benchmark handler 的最小 MediatR 对照宿主。
+ ///
+ private static ServiceProvider CreateMediatRServiceProvider()
+ {
+ return BenchmarkHostFactory.CreateMediatRServiceProvider(
+ configure: null,
+ typeof(StreamStartupBenchmarks),
+ static candidateType => candidateType == typeof(BenchmarkStreamHandler),
+ ServiceLifetime.Transient);
+ }
+
+ ///
+ /// 推进 stream 到首个元素,并返回该元素作为 cold-start 结果。
+ ///
+ /// 当前 stream 的响应类型。
+ /// 待推进的异步响应序列。
+ /// 用于向异步枚举器传播取消的令牌。
+ /// 首个元素。
+ /// stream 未产生任何元素。
+ private static async ValueTask ConsumeFirstItemAsync(
+ IAsyncEnumerable responses,
+ CancellationToken cancellationToken)
+ {
+ ArgumentNullException.ThrowIfNull(responses);
+
+ var enumerator = responses.GetAsyncEnumerator(cancellationToken);
+ await using (enumerator.ConfigureAwait(false))
+ {
+ if (await enumerator.MoveNextAsync().ConfigureAwait(false))
+ {
+ return enumerator.Current;
+ }
+ }
+
+ throw new InvalidOperationException("The benchmark stream must yield at least one response.");
+ }
+
+ ///
+ /// 为 benchmark 创建稳定的 fatal 级 logger,避免把日志成本混入 startup 测量。
+ ///
+ private static ILogger CreateLogger(string categoryName)
+ {
+ LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider
+ {
+ MinLevel = LogLevel.Fatal
+ };
+ return LoggerFactoryResolver.Provider.CreateLogger(categoryName);
+ }
+
+ ///
+ /// Benchmark stream request。
+ ///
+ /// 请求标识。
+ /// 返回元素数量。
+ public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) :
+ GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest,
+ MediatR.IStreamRequest;
+
+ ///
+ /// Benchmark stream response。
+ ///
+ /// 响应标识。
+ public sealed record BenchmarkResponse(Guid Id);
+
+ ///
+ /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。
+ ///
+ public sealed class BenchmarkStreamHandler :
+ GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
+ MediatR.IStreamRequestHandler
+ {
+ ///
+ /// 处理 GFramework.CQRS stream request。
+ ///
+ /// 当前 stream 请求。
+ /// 用于中断异步枚举的取消令牌。
+ /// 按请求元素数量延迟生成的异步响应序列。
+ public IAsyncEnumerable Handle(
+ BenchmarkStreamRequest request,
+ CancellationToken cancellationToken)
+ {
+ return EnumerateAsync(request, cancellationToken);
+ }
+
+ ///
+ /// 处理 MediatR stream request。
+ ///
+ /// 当前 stream 请求。
+ /// 用于中断异步枚举的取消令牌。
+ /// 按请求元素数量延迟生成的异步响应序列。
+ IAsyncEnumerable MediatR.IStreamRequestHandler.Handle(
+ BenchmarkStreamRequest request,
+ CancellationToken cancellationToken)
+ {
+ return EnumerateAsync(request, cancellationToken);
+ }
+
+ ///
+ /// 生成固定长度的 benchmark stream,确保 cold-start 与 steady-state 维度共用同一份响应形状。
+ ///
+ /// 当前 stream 请求。
+ /// 用于向异步枚举器传播取消的令牌。
+ /// 按请求数量生成的异步响应序列。
+ private static async IAsyncEnumerable EnumerateAsync(
+ BenchmarkStreamRequest request,
+ [EnumeratorCancellation] CancellationToken cancellationToken)
+ {
+ for (var index = 0; index < request.ItemCount; index++)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ yield return new BenchmarkResponse(request.Id);
+ await Task.CompletedTask.ConfigureAwait(false);
+ }
+ }
+ }
+
+ ///
+ /// 为 stream startup benchmark 提供 hand-written generated registry,
+ /// 以便独立比较 generated stream invoker 的初始化与首次命中成本。
+ ///
+ public sealed class GeneratedRegistry :
+ GFramework.Cqrs.ICqrsHandlerRegistry,
+ GFramework.Cqrs.ICqrsStreamInvokerProvider,
+ GFramework.Cqrs.IEnumeratesCqrsStreamInvokerDescriptors
+ {
+ private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor =
+ new(
+ typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<
+ BenchmarkStreamRequest,
+ BenchmarkResponse>),
+ typeof(GeneratedRegistry).GetMethod(
+ nameof(InvokeBenchmarkStreamHandler),
+ BindingFlags.Public | BindingFlags.Static)
+ ?? throw new InvalidOperationException("Missing generated stream startup benchmark method."));
+
+ private static readonly IReadOnlyList Descriptors =
+ [
+ new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry(
+ typeof(BenchmarkStreamRequest),
+ typeof(BenchmarkResponse),
+ Descriptor)
+ ];
+
+ ///
+ /// 把 startup benchmark handler 注册为 transient,保持与 cold-start 对照宿主一致的 handler 生命周期。
+ ///
+ /// 承载 generated handler 注册结果的目标服务集合。
+ /// 记录 generated registry 注册过程的日志器。
+ public void Register(IServiceCollection services, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ services.AddTransient(
+ typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<
+ BenchmarkStreamRequest,
+ BenchmarkResponse>),
+ typeof(BenchmarkStreamHandler));
+ logger.Debug("Registered generated stream startup benchmark handler.");
+ }
+
+ ///
+ /// 返回当前 provider 暴露的全部 generated stream invoker 描述符。
+ ///
+ /// 当前 startup benchmark 的 generated stream invoker 描述符集合。
+ public IReadOnlyList GetDescriptors()
+ {
+ return Descriptors;
+ }
+
+ ///
+ /// 为目标流式请求/响应类型对返回 generated stream invoker 描述符。
+ ///
+ /// 待匹配的 stream 请求类型。
+ /// 待匹配的 stream 响应类型。
+ /// 匹配成功时返回的 generated stream invoker 描述符。
+ /// 命中当前 benchmark 请求/响应类型对时返回 ;否则返回 。
+ public bool TryGetDescriptor(
+ Type requestType,
+ Type responseType,
+ out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor)
+ {
+ if (requestType == typeof(BenchmarkStreamRequest) &&
+ responseType == typeof(BenchmarkResponse))
+ {
+ descriptor = Descriptor;
+ return true;
+ }
+
+ descriptor = null;
+ return false;
+ }
+
+ ///
+ /// 模拟 generated stream invoker provider 为 startup benchmark 产出的开放静态调用入口。
+ ///
+ /// 当前 benchmark 注册的 stream handler 实例。
+ /// 当前 benchmark 的 stream 请求对象。
+ /// 用于中断异步枚举的取消令牌。
+ /// 由 handler 产生的异步响应序列。
+ public static object InvokeBenchmarkStreamHandler(object handler, object request, CancellationToken cancellationToken)
+ {
+ var typedHandler = (GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<
+ BenchmarkStreamRequest,
+ BenchmarkResponse>)handler;
+ var typedRequest = (BenchmarkStreamRequest)request;
+ return typedHandler.Handle(typedRequest, cancellationToken);
+ }
+ }
+}