test(cqrs-benchmarks): 扩展流式基准观测并补齐 scoped 生命周期矩阵

- 新增 StreamInvokerBenchmarks 的 FirstItem 与 DrainAll 双观测口径,并补齐相关 XML 注释。

- 引入 ScopedBenchmarkContainer 与 scoped request helper,为 RequestLifetimeBenchmarks 建立真实作用域边界下的 Scoped 生命周期矩阵。

- 更新 cqrs-rewrite 的 active tracking 与 trace,记录 RP-129 的多 worker 波次、串行化 smoke 验证与新的恢复入口。
This commit is contained in:
gewuyou 2026-05-11 08:03:07 +08:00
parent 79ae5f0b5a
commit 11a6b6abe4
6 changed files with 648 additions and 62 deletions

View File

@ -3,6 +3,8 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Ioc;
using GFramework.Cqrs.Abstractions.Cqrs;
@ -158,6 +160,64 @@ internal static class BenchmarkHostFactory
return services.BuildServiceProvider();
}
/// <summary>
/// 在真实的 request 级作用域内执行一次 GFramework.CQRS request 分发。
/// </summary>
/// <typeparam name="TResponse">请求响应类型。</typeparam>
/// <param name="rootContainer">冻结后的 benchmark 根容器,用于创建 request 作用域并提供注册元数据。</param>
/// <param name="runtimeLogger">当前 request 级 runtime 复用的日志器。</param>
/// <param name="context">当前 CQRS 分发上下文。</param>
/// <param name="request">要发送的 request。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>当前 request 的响应结果。</returns>
/// <remarks>
/// 该入口只服务 request lifetime benchmark每次调用都会显式创建并释放一个新的 DI 作用域,
/// 让 `Scoped` handler 在真实 request 边界内解析,而不是退化为根容器解析。
/// </remarks>
internal static async ValueTask<TResponse> SendScopedGFrameworkRequestAsync<TResponse>(
MicrosoftDiContainer rootContainer,
ILogger runtimeLogger,
ICqrsContext context,
GFramework.Cqrs.Abstractions.Cqrs.IRequest<TResponse> request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(rootContainer);
ArgumentNullException.ThrowIfNull(runtimeLogger);
ArgumentNullException.ThrowIfNull(context);
ArgumentNullException.ThrowIfNull(request);
using var scope = rootContainer.CreateScope();
var scopedContainer = new ScopedBenchmarkContainer(rootContainer, scope);
var runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
scopedContainer,
runtimeLogger);
return await runtime.SendAsync(context, request, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// 在真实的 request 级作用域内执行一次 MediatR request 分发。
/// </summary>
/// <typeparam name="TResponse">请求响应类型。</typeparam>
/// <param name="rootServiceProvider">当前 benchmark 的根 <see cref="ServiceProvider" />。</param>
/// <param name="request">要发送的 request。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>当前 request 的响应结果。</returns>
/// <remarks>
/// 这里显式从新的 scope 解析 <see cref="IMediator" />,确保 `Scoped` handler 与其依赖绑定到 request 边界。
/// </remarks>
internal static async Task<TResponse> SendScopedMediatRRequestAsync<TResponse>(
ServiceProvider rootServiceProvider,
MediatR.IRequest<TResponse> request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(rootServiceProvider);
ArgumentNullException.ThrowIfNull(request);
using var scope = rootServiceProvider.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
return await mediator.Send(request, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// 创建承载 NuGet `Mediator` source-generated concrete mediator 的最小对照宿主。
/// </summary>

View File

@ -23,24 +23,25 @@ namespace GFramework.Cqrs.Benchmarks.Messaging;
/// 对比 request steady-state dispatch 在不同 handler 生命周期下的额外开销。
/// </summary>
/// <remarks>
/// 当前矩阵覆盖 `Singleton` 与 `Transient`。
/// `Scoped` 在两个 runtime 中都依赖显式作用域边界,而当前 benchmark 宿主故意保持“单根容器最小宿主”模型
/// 直接把 scoped 解析压到根作用域会让对照语义失真,因此留到未来有真实 scoped host 基线时再扩展
/// 当前矩阵覆盖 `Singleton`、`Scoped` 与 `Transient`。
/// 其中 `Scoped` 会在每次 request 分发时显式创建并释放真实的 DI 作用域
/// 避免把 scoped handler 错误地压到根容器解析而扭曲生命周期对照
/// </remarks>
[Config(typeof(Config))]
public class RequestLifetimeBenchmarks
{
private MicrosoftDiContainer _container = null!;
private ICqrsRuntime _runtime = null!;
private ICqrsRuntime? _runtime;
private ServiceProvider _serviceProvider = null!;
private IMediator _mediatr = null!;
private IMediator? _mediatr;
private BenchmarkRequestHandler _baselineHandler = null!;
private BenchmarkRequest _request = null!;
private ILogger _runtimeLogger = null!;
/// <summary>
/// 控制当前 benchmark 使用的 handler 生命周期。
/// </summary>
[Params(HandlerLifetime.Singleton, HandlerLifetime.Transient)]
[Params(HandlerLifetime.Singleton, HandlerLifetime.Scoped, HandlerLifetime.Transient)]
public HandlerLifetime Lifetime { get; set; }
/// <summary>
@ -53,6 +54,11 @@ public class RequestLifetimeBenchmarks
/// </summary>
Singleton,
/// <summary>
/// 每次 request 在显式作用域内解析并复用 handler 实例。
/// </summary>
Scoped,
/// <summary>
/// 每次分发都重新解析新的 handler 实例。
/// </summary>
@ -90,6 +96,8 @@ public class RequestLifetimeBenchmarks
_baselineHandler = new BenchmarkRequestHandler();
_request = new BenchmarkRequest(Guid.NewGuid());
_runtimeLogger = LoggerFactoryResolver.Provider.CreateLogger(nameof(RequestLifetimeBenchmarks) + "." + Lifetime);
_container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
{
BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry<GeneratedRequestLifetimeBenchmarkRegistry>(container);
@ -97,16 +105,22 @@ public class RequestLifetimeBenchmarks
});
// 容器内已提前保留默认 runtime 以支撑 generated registry 接线;
// 这里额外创建带生命周期后缀的 runtime只是为了区分不同 benchmark 矩阵的 dispatcher 日志。
_runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(RequestLifetimeBenchmarks) + "." + Lifetime));
if (Lifetime != HandlerLifetime.Scoped)
{
_runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
_container,
_runtimeLogger);
}
_serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
configure: null,
typeof(RequestLifetimeBenchmarks),
static candidateType => candidateType == typeof(BenchmarkRequestHandler),
ResolveMediatRLifetime(Lifetime));
_mediatr = _serviceProvider.GetRequiredService<IMediator>();
if (Lifetime != HandlerLifetime.Scoped)
{
_mediatr = _serviceProvider.GetRequiredService<IMediator>();
}
}
/// <summary>
@ -140,7 +154,17 @@ public class RequestLifetimeBenchmarks
[Benchmark]
public ValueTask<BenchmarkResponse> SendRequest_GFrameworkCqrs()
{
return _runtime.SendAsync(BenchmarkContext.Instance, _request, CancellationToken.None);
if (Lifetime == HandlerLifetime.Scoped)
{
return BenchmarkHostFactory.SendScopedGFrameworkRequestAsync(
_container,
_runtimeLogger,
BenchmarkContext.Instance,
_request,
CancellationToken.None);
}
return _runtime!.SendAsync(BenchmarkContext.Instance, _request, CancellationToken.None);
}
/// <summary>
@ -149,7 +173,15 @@ public class RequestLifetimeBenchmarks
[Benchmark]
public Task<BenchmarkResponse> SendRequest_MediatR()
{
return _mediatr.Send(_request, CancellationToken.None);
if (Lifetime == HandlerLifetime.Scoped)
{
return BenchmarkHostFactory.SendScopedMediatRRequestAsync(
_serviceProvider,
_request,
CancellationToken.None);
}
return _mediatr!.Send(_request, CancellationToken.None);
}
/// <summary>
@ -171,6 +203,10 @@ public class RequestLifetimeBenchmarks
container.RegisterSingleton<GFramework.Cqrs.Abstractions.Cqrs.IRequestHandler<BenchmarkRequest, BenchmarkResponse>, BenchmarkRequestHandler>();
return;
case HandlerLifetime.Scoped:
container.RegisterScoped<GFramework.Cqrs.Abstractions.Cqrs.IRequestHandler<BenchmarkRequest, BenchmarkResponse>, BenchmarkRequestHandler>();
return;
case HandlerLifetime.Transient:
container.RegisterTransient<GFramework.Cqrs.Abstractions.Cqrs.IRequestHandler<BenchmarkRequest, BenchmarkResponse>, BenchmarkRequestHandler>();
return;
@ -189,6 +225,7 @@ public class RequestLifetimeBenchmarks
return lifetime switch
{
HandlerLifetime.Singleton => ServiceLifetime.Singleton,
HandlerLifetime.Scoped => ServiceLifetime.Scoped,
HandlerLifetime.Transient => ServiceLifetime.Transient,
_ => throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Unsupported benchmark handler lifetime.")
};

View File

@ -0,0 +1,361 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System;
using System.Collections.Generic;
using System.Linq;
using GFramework.Core.Abstractions.Bases;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Rule;
using GFramework.Core.Abstractions.Systems;
using GFramework.Core.Ioc;
using Microsoft.Extensions.DependencyInjection;
namespace GFramework.Cqrs.Benchmarks.Messaging;
/// <summary>
/// 把冻结后的 benchmark 根容器与单个 <see cref="IServiceScope" /> 组合成 request 级解析视图。
/// </summary>
/// <remarks>
/// `CqrsDispatcher` 会直接依赖 <see cref="IIocContainer" /> 做 handler / pipeline 解析,
/// 因此 request lifetime benchmark 需要一个既保留根容器注册元数据,又把实例解析切换到显式作用域 provider
/// 的最小适配层。该类型只覆盖 benchmark 当前 request 路径会使用到的解析相关入口;
/// 任何注册、清空或冻结修改操作都应继续发生在根容器构建阶段,因此这里统一拒绝可变更 API。
/// </remarks>
internal sealed class ScopedBenchmarkContainer : IIocContainer
{
private readonly MicrosoftDiContainer _rootContainer;
private readonly IServiceProvider _scopedProvider;
/// <summary>
/// 初始化一个绑定到单个 request 作用域的 benchmark 容器适配器。
/// </summary>
/// <param name="rootContainer">已冻结的 benchmark 根容器。</param>
/// <param name="scope">当前 request 独占的作用域实例。</param>
internal ScopedBenchmarkContainer(MicrosoftDiContainer rootContainer, IServiceScope scope)
{
_rootContainer = rootContainer ?? throw new ArgumentNullException(nameof(rootContainer));
ArgumentNullException.ThrowIfNull(scope);
_scopedProvider = scope.ServiceProvider;
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterSingleton<T>(T instance)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterSingleton<TService, TImpl>()
where TImpl : class, TService
where TService : class
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterTransient<TService, TImpl>()
where TImpl : class, TService
where TService : class
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterScoped<TService, TImpl>()
where TImpl : class, TService
where TService : class
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterPlurality(object instance)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterPlurality<T>() where T : class
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterSystem(ISystem system)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void Register<T>(T instance)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void Register(Type type, object instance)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterFactory<TService>(Func<IServiceProvider, TService> factory) where TService : class
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterCqrsPipelineBehavior<TBehavior>() where TBehavior : class
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterCqrsHandlersFromAssembly(System.Reflection.Assembly assembly)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持在 request 作用域内追加注册。
/// </summary>
public void RegisterCqrsHandlersFromAssemblies(IEnumerable<System.Reflection.Assembly> assemblies)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持执行额外的服务配置钩子。
/// </summary>
public void ExecuteServicesHook(Action<IServiceCollection>? configurator = null)
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 从当前 request 作用域解析单个服务实例。
/// </summary>
public T? Get<T>() where T : class
{
return _scopedProvider.GetService<T>();
}
/// <summary>
/// 从当前 request 作用域解析单个服务实例。
/// </summary>
public object? Get(Type type)
{
ArgumentNullException.ThrowIfNull(type);
return _scopedProvider.GetService(type);
}
/// <summary>
/// 从当前 request 作用域解析必需的单个服务实例。
/// </summary>
public T GetRequired<T>() where T : class
{
return _scopedProvider.GetRequiredService<T>();
}
/// <summary>
/// 从当前 request 作用域解析必需的单个服务实例。
/// </summary>
public object GetRequired(Type type)
{
ArgumentNullException.ThrowIfNull(type);
return _scopedProvider.GetRequiredService(type);
}
/// <summary>
/// 从当前 request 作用域解析全部服务实例。
/// </summary>
public IReadOnlyList<T> GetAll<T>() where T : class
{
return _scopedProvider.GetServices<T>().ToList();
}
/// <summary>
/// 从当前 request 作用域解析全部服务实例。
/// </summary>
public IReadOnlyList<object> GetAll(Type type)
{
ArgumentNullException.ThrowIfNull(type);
return _scopedProvider.GetServices(type).Where(static service => service is not null).Cast<object>().ToList();
}
/// <summary>
/// 从当前 request 作用域解析全部服务实例,并按调用方比较器排序。
/// </summary>
public IReadOnlyList<T> GetAllSorted<T>(Comparison<T> comparison) where T : class
{
ArgumentNullException.ThrowIfNull(comparison);
var services = GetAll<T>().ToList();
services.Sort(comparison);
return services;
}
/// <summary>
/// 从当前 request 作用域解析全部服务实例,并按优先级排序。
/// </summary>
public IReadOnlyList<T> GetAllByPriority<T>() where T : class
{
return SortByPriority(GetAll<T>());
}
/// <summary>
/// 从当前 request 作用域解析全部服务实例,并按优先级排序。
/// </summary>
public IReadOnlyList<object> GetAllByPriority(Type type)
{
ArgumentNullException.ThrowIfNull(type);
return SortByPriority(GetAll(type));
}
/// <summary>
/// 判断根容器是否声明了目标服务键。
/// </summary>
/// <remarks>
/// `CqrsDispatcher` 在热路径上先做注册存在性判断,再决定是否枚举 pipeline这里沿用根容器冻结后的注册视图
/// 避免把“当前 scope 还未物化实例”误判成“没有注册该行为”。
/// </remarks>
public bool HasRegistration(Type type)
{
ArgumentNullException.ThrowIfNull(type);
return _rootContainer.HasRegistration(type);
}
/// <summary>
/// 判断根容器是否声明了目标服务键。
/// </summary>
public bool Contains<T>() where T : class
{
return _rootContainer.Contains<T>();
}
/// <summary>
/// 当前 request 作用域适配器不追踪实例归属。
/// </summary>
public bool ContainsInstance(object instance)
{
return _rootContainer.ContainsInstance(instance);
}
/// <summary>
/// 当前适配器不支持清空注册。
/// </summary>
public void Clear()
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 当前适配器不支持重新冻结。
/// </summary>
public void Freeze()
{
throw CreateMutationNotSupportedException();
}
/// <summary>
/// 继续暴露根容器底层服务集合,仅用于接口兼容。
/// </summary>
public IServiceCollection GetServicesUnsafe => _rootContainer.GetServicesUnsafe;
/// <summary>
/// 基于当前 request 作用域继续创建嵌套作用域。
/// </summary>
public IServiceScope CreateScope()
{
return _scopedProvider.CreateScope();
}
/// <summary>
/// 将上下文转发给根容器,保持与 request 生命周期无关的上下文缓存行为一致。
/// </summary>
public void SetContext(GFramework.Core.Abstractions.Architectures.IArchitectureContext context)
{
((IContextAware)_rootContainer).SetContext(context);
}
/// <summary>
/// 读取根容器当前持有的架构上下文。
/// </summary>
public GFramework.Core.Abstractions.Architectures.IArchitectureContext GetContext()
{
return ((IContextAware)_rootContainer).GetContext();
}
/// <summary>
/// 释放当前 request 适配器时不拥有作用域;外层 benchmark 调度入口负责统一释放。
/// </summary>
public void Dispose()
{
}
/// <summary>
/// 生成统一的只读适配器异常,避免 benchmark 误把 request 级容器当成可变组合根。
/// </summary>
private static InvalidOperationException CreateMutationNotSupportedException()
{
return new InvalidOperationException(
"Scoped benchmark containers are read-only request views. Mutate registrations on the root benchmark host before freezing it.");
}
/// <summary>
/// 复用与根容器一致的优先级排序语义。
/// </summary>
/// <typeparam name="T">服务实例类型。</typeparam>
/// <param name="services">待排序服务集合。</param>
/// <returns>按优先级稳定排序后的服务列表。</returns>
private static IReadOnlyList<T> SortByPriority<T>(IReadOnlyList<T> services) where T : class
{
if (services.Count <= 1)
{
return services;
}
return services
.Select((service, index) => new { Service = service, Index = index })
.OrderBy(static x =>
{
var priority = x.Service is IPrioritized prioritized ? prioritized.Priority : 0;
return (priority, x.Index);
})
.Select(static x => x.Service)
.ToList();
}
}

View File

@ -24,8 +24,13 @@ using Microsoft.Extensions.DependencyInjection;
namespace GFramework.Cqrs.Benchmarks.Messaging;
/// <summary>
/// 对比 stream 完整枚举在 direct handler、GFramework 反射路径、GFramework generated invoker 路径与 MediatR 之间的开销差异。
/// 对比 stream invoker 在 direct handler、GFramework 反射路径、GFramework generated invoker 路径与 MediatR 之间的开销差异。
/// </summary>
/// <remarks>
/// 该矩阵只保留单一 handler 生命周期,避免把 invoker 路径差异与生命周期解析成本混在一起。
/// <see cref="StreamObservation.FirstItem" /> 用于近似观察建流到首个元素的瞬时成本,
/// <see cref="StreamObservation.DrainAll" /> 则保留原有完整枚举口径。
/// </remarks>
[Config(typeof(Config))]
public class StreamInvokerBenchmarks
{
@ -40,6 +45,28 @@ public class StreamInvokerBenchmarks
private GeneratedBenchmarkStreamRequest _generatedRequest = null!;
private MediatRBenchmarkStreamRequest _mediatrRequest = null!;
/// <summary>
/// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。
/// </summary>
[Params(StreamObservation.FirstItem, StreamObservation.DrainAll)]
public StreamObservation Observation { get; set; }
/// <summary>
/// 用于拆分 stream invoker 固定成本与后续枚举成本的观测模式。
/// </summary>
public enum StreamObservation
{
/// <summary>
/// 只推进到首个元素后立即释放枚举器。
/// </summary>
FirstItem,
/// <summary>
/// 完整枚举整个 stream保留原有 benchmark 语义。
/// </summary>
DrainAll
}
/// <summary>
/// 配置 stream invoker benchmark 的公共输出格式。
/// </summary>
@ -114,50 +141,106 @@ public class StreamInvokerBenchmarks
}
/// <summary>
/// 直接调用最小 stream handler 并完整枚举,作为 dispatch 额外开销 baseline。
/// 直接调用最小 stream handler,并按当前观测模式消费 stream,作为 dispatch 额外开销 baseline。
/// </summary>
[Benchmark(Baseline = true)]
public async ValueTask Stream_Baseline()
public ValueTask Stream_Baseline()
{
await foreach (var response in _baselineHandler.Handle(_reflectionRequest, CancellationToken.None).ConfigureAwait(false))
return ObserveAsync(_baselineHandler.Handle(_reflectionRequest, CancellationToken.None), Observation);
}
/// <summary>
/// 通过 GFramework.CQRS 反射 stream binding 路径创建 stream并按当前观测模式消费。
/// </summary>
[Benchmark]
public ValueTask Stream_GFrameworkReflection()
{
return ObserveAsync(
_reflectionRuntime.CreateStream(
BenchmarkContext.Instance,
_reflectionRequest,
CancellationToken.None),
Observation);
}
/// <summary>
/// 通过 generated stream invoker provider 预热后的 GFramework.CQRS runtime 创建 stream并按当前观测模式消费。
/// </summary>
[Benchmark]
public ValueTask Stream_GFrameworkGenerated()
{
return ObserveAsync(
_generatedRuntime.CreateStream(
BenchmarkContext.Instance,
_generatedRequest,
CancellationToken.None),
Observation);
}
/// <summary>
/// 通过 MediatR 创建 stream并按当前观测模式消费作为外部对照。
/// </summary>
[Benchmark]
public ValueTask Stream_MediatR()
{
return ObserveAsync(_mediatr.CreateStream(_mediatrRequest, CancellationToken.None), Observation);
}
/// <summary>
/// 按观测模式消费 stream便于把“建流/首个元素”和“完整枚举”分开观察。
/// </summary>
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
/// <param name="responses">待观察的异步响应序列。</param>
/// <param name="observation">当前 benchmark 选定的观测模式。</param>
/// <returns>异步消费完成后的等待句柄。</returns>
private static ValueTask ObserveAsync<TResponse>(
IAsyncEnumerable<TResponse> responses,
StreamObservation observation)
{
ArgumentNullException.ThrowIfNull(responses);
return observation switch
{
_ = response;
StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None),
StreamObservation.DrainAll => DrainAsync(responses),
_ => throw new ArgumentOutOfRangeException(
nameof(observation),
observation,
"Unsupported stream observation mode.")
};
}
/// <summary>
/// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 <c>MoveNextAsync</c> 的固定成本。
/// </summary>
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
/// <param name="responses">待观察的异步响应序列。</param>
/// <param name="cancellationToken">用于向异步枚举器传播取消的令牌。</param>
/// <returns>消费首个元素后的等待句柄。</returns>
private static async ValueTask ConsumeFirstItemAsync<TResponse>(
IAsyncEnumerable<TResponse> responses,
CancellationToken cancellationToken)
{
var enumerator = responses.GetAsyncEnumerator(cancellationToken);
await using (enumerator.ConfigureAwait(false))
{
// 这里显式读取 Current只为了让所有路径都完成首个元素的同等消费。
if (await enumerator.MoveNextAsync().ConfigureAwait(false))
{
_ = enumerator.Current;
}
}
}
/// <summary>
/// 通过 GFramework.CQRS 反射 stream binding 路径创建并完整枚举 stream。
/// 完整枚举整个 stream保留原 benchmark 的总成本观测口径
/// </summary>
[Benchmark]
public async ValueTask Stream_GFrameworkReflection()
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
/// <param name="responses">待完整枚举的异步响应序列。</param>
/// <returns>完整枚举结束后的等待句柄。</returns>
private static async ValueTask DrainAsync<TResponse>(IAsyncEnumerable<TResponse> responses)
{
await foreach (var response in _reflectionRuntime.CreateStream(BenchmarkContext.Instance, _reflectionRequest, CancellationToken.None)
.ConfigureAwait(false))
{
_ = response;
}
}
/// <summary>
/// 通过 generated stream invoker provider 预热后的 GFramework.CQRS runtime 创建并完整枚举 stream。
/// </summary>
[Benchmark]
public async ValueTask Stream_GFrameworkGenerated()
{
await foreach (var response in _generatedRuntime.CreateStream(BenchmarkContext.Instance, _generatedRequest, CancellationToken.None)
.ConfigureAwait(false))
{
_ = response;
}
}
/// <summary>
/// 通过 MediatR 创建并完整枚举 stream作为外部对照。
/// </summary>
[Benchmark]
public async ValueTask Stream_MediatR()
{
await foreach (var response in _mediatr.CreateStream(_mediatrRequest, CancellationToken.None).ConfigureAwait(false))
await foreach (var response in responses.ConfigureAwait(false))
{
_ = response;
}
@ -214,6 +297,9 @@ public class StreamInvokerBenchmarks
/// <summary>
/// 处理 reflection benchmark stream request。
/// </summary>
/// <param name="request">当前 reflection benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns>
public IAsyncEnumerable<ReflectionBenchmarkResponse> Handle(
ReflectionBenchmarkStreamRequest request,
CancellationToken cancellationToken)
@ -235,6 +321,9 @@ public class StreamInvokerBenchmarks
/// <summary>
/// 处理 generated benchmark stream request。
/// </summary>
/// <param name="request">当前 generated benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns>
public IAsyncEnumerable<GeneratedBenchmarkResponse> Handle(
GeneratedBenchmarkStreamRequest request,
CancellationToken cancellationToken)
@ -256,6 +345,9 @@ public class StreamInvokerBenchmarks
/// <summary>
/// 处理 MediatR benchmark stream request。
/// </summary>
/// <param name="request">当前 MediatR benchmark stream 请求。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>完整枚举所需的低噪声异步响应序列。</returns>
public IAsyncEnumerable<MediatRBenchmarkResponse> Handle(
MediatRBenchmarkStreamRequest request,
CancellationToken cancellationToken)
@ -271,6 +363,12 @@ public class StreamInvokerBenchmarks
/// <summary>
/// 为三组 stream benchmark 构造相同形状的低噪声异步枚举,避免枚举体差异干扰 invoker 对照。
/// </summary>
/// <typeparam name="TResponse">当前 stream 的响应类型。</typeparam>
/// <param name="id">每个响应复用的稳定标识。</param>
/// <param name="itemCount">待返回的响应元素数量。</param>
/// <param name="responseFactory">将稳定标识映射为响应对象的工厂。</param>
/// <param name="cancellationToken">用于中断异步枚举的取消令牌。</param>
/// <returns>供各对照路径共享的低噪声异步响应序列。</returns>
private static async IAsyncEnumerable<TResponse> EnumerateAsync<TResponse>(
Guid id,
int itemCount,

View File

@ -7,23 +7,21 @@ CQRS 迁移与收敛。
## 当前恢复点
- 恢复点编号:`CQRS-REWRITE-RP-128`
- 恢复点编号:`CQRS-REWRITE-RP-129`
- 当前阶段:`Phase 8`
- 当前 PR 锚点:`PR #345`
- 当前 PR 锚点:`待重新抓取`
- 当前结论:
- 当前 `RP-128``$gframework-pr-review` 复核 `PR #345` latest-head review body确认没有新的 unresolved thread但仍有 `4` 条 CodeRabbit actionable comments 需要回到本地代码逐条验真
- 本轮已收口仍成立的 review 输入:为 `AGENTS.md` 补充 multi-agent budget 术语释义;为 `StreamLifetimeBenchmarks` 补齐 `CancellationToken` 传播、`[EnumeratorCancellation]` 短名引用与类级 `<remarks>` 说明;为 benchmark `README` 去掉治理型 `RP-127` 表述;并把 `ai-plan/public/cqrs-rewrite/**` 的 PR 锚点与 branch diff 数字更新到当前 head
- 最新权威 diff 基线已统一为当前分支相对 `origin/main``d85828c5`, `2026-05-09 12:25:41 +0800`)的 `21 files / 1344 insertions / 194 deletions`active tracking、active trace 与后续 review triage 均以该组数字为准
- 当前 `RP-127` 延续 `$gframework-batch-boot 50`,在 `RP-126` 已补齐 stream lifetime 四方口径后,再用 `b7fa3eee``CqrsDispatcher.CreateStream(...)` 的 stream dispatch binding 改为按 `TResponse` 强类型缓存,同时为 `StreamLifetimeBenchmarks` 增加 `FirstItem / DrainAll` 观测维度,并把新的结果回填到公开可恢复文档
- 本轮写面落在 `GFramework.Cqrs/Internal/CqrsDispatcher.cs``GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs``GFramework.Cqrs.Tests/Cqrs/CqrsGeneratedRequestInvokerProviderTests.cs``GFramework.Cqrs.Benchmarks/Messaging/StreamLifetimeBenchmarks.cs`,以及 `GFramework.Cqrs.Benchmarks/README.md` / `ai-plan/public/cqrs-rewrite/**` 恢复文档;没有扩散到 request runtime、notification runtime 或额外文档模块
- `f9c9561f` 为 request lifetime 补入 handwritten generated registry并在 setup/cleanup 清理 dispatcher cache这样 `Singleton / Transient` 生命周期矩阵继续只比较 handler 生命周期与 dispatch 常量路径,不再混入旧宿主差异
- `9107e232` 将 stream lifetime 的 reflection、generated 与 `MediatR` 请求/响应/handler 彻底拆开,并限制 generated registry 只绑定 generated lane避免静态 dispatcher cache 把不同 stream 对照口径污染到一起
- 当前 request lifetime benchmark 已用新宿主重新验证:`Singleton` 下 baseline / `GFramework.Cqrs` / `MediatR` 约为 `5.012 ns / 32 B``49.612 ns / 32 B``51.796 ns / 232 B``Transient` 下约为 `3.962 ns / 32 B``50.480 ns / 56 B``50.284 ns / 232 B`
- 当前 stream lifetime benchmark 已更新为 `Observation=FirstItem / DrainAll` 双口径:`Singleton + FirstItem` 下 baseline / generated / reflection / `MediatR` 约为 `48.704 ns / 216 B``94.629 ns / 216 B``95.417 ns / 216 B``152.886 ns / 608 B``Singleton + DrainAll` 下约为 `73.335 ns / 280 B``118.860 ns / 280 B``119.632 ns / 280 B``205.629 ns / 672 B`
- `Transient + FirstItem` 下 baseline / reflection / generated / `MediatR` 约为 `48.293 ns / 216 B``97.628 ns / 240 B``100.011 ns / 240 B``154.149 ns / 608 B``Transient + DrainAll` 下约为 `78.466 ns / 280 B``124.174 ns / 304 B``116.780 ns / 304 B``220.040 ns / 672 B`
- 现阶段可恢复结论收口为三点:一是 stream lifetime 已具备四方口径加 `FirstItem / DrainAll` 双观测维度;二是 `b7fa3eee` 已让 generated lane 在 `DrainAll` 口径下重新领先 reflection三是 `Transient + FirstItem` 仍保留约 `2.4 ns` 的小幅反向差值,更像建流到首个元素之间的瞬时成本,而不是完整枚举阶段退化
- 当前已提交分支相对 `origin/main``d85828c5`, `2026-05-09 12:25:41 +0800`)的累计 branch diff 已到 `21 files``1344 insertions / 194 deletions`),仍明显低于 `$gframework-batch-boot 50``50 files` stop condition
- 下一推荐步骤:若继续 benchmark 线,优先从 `StreamLifetimeBenchmarks``Transient + FirstItem` 小幅差值继续恢复,并用 `StreamInvokerBenchmarks` 复核 generated lane 的常量成本收益是否能在更窄口径下复现;若差值不再稳定,再决定是否转去 `Mediator` concrete runtime 的 stream lifetime 对照批次
- 当前 `RP-129` 继续沿用 `$gframework-batch-boot 50`,但本轮按 `gframework-multi-agent-batch` 的职责边界组织了一波 `3` 路互不冲突的 benchmark worker`StreamingBenchmarks` 观测口径拆分、`StreamInvokerBenchmarks` 观测口径拆分、`RequestLifetimeBenchmarks` 的真实 scoped-host 生命周期矩阵
- 本轮启动前已重新按 skill 规则复核基线:`origin/main` 与本地 `main` 当前都在 `699d0b48``2026-05-09 18:39:38 +0800`),且启动时 `origin/main...HEAD` 的累计 branch diff 为 `0 files / 0 lines`;旧 active 入口里 `21 files` 的数字已不再可作为当前波次基线
- 本轮写面限定在 benchmark 子系统:`GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs``StreamInvokerBenchmarks.cs``RequestLifetimeBenchmarks.cs``BenchmarkHostFactory.cs`,以及新增的 `ScopedBenchmarkContainer.cs`;没有扩散到 `GFramework.Cqrs` runtime、测试项目或公共文档
- `StreamingBenchmarks` 已从单一完整枚举口径扩成 `FirstItem / DrainAll` 双观测模式worker smoke 结果表明默认 generated-provider steady-state stream 宿主在两种口径下都能稳定跑通,当前 short-job 约为 `FirstItem: baseline 62.14 ns / GFramework 118.83 ns / MediatR 182.16 ns``DrainAll: baseline 94.34 ns / GFramework 149.57 ns / MediatR 280.77 ns`
- `StreamInvokerBenchmarks` 现也具备 `FirstItem / DrainAll` 双观测模式,并已完成串行 smoke 运行;当前 short-job 下,`FirstItem` 口径里 generated lane 约 `59.44 ns`、reflection 约 `52.90 ns`,说明 tracking 里提到的 “generated 在首项前略慢于 reflection” 信号在更窄的 invoker 场景里仍可见
- `RequestLifetimeBenchmarks` 当前矩阵已从 `Singleton / Transient` 扩到 `Singleton / Scoped / Transient`,且 `Scoped` 不再退化为根容器解析,而是通过 `BenchmarkHostFactory` + `ScopedBenchmarkContainer` 在每次 request 分发时显式创建并释放真实作用域;本轮 short-job 下 `Scoped` 口径约为 baseline `5.60 ns / 32 B``MediatR` `170.94 ns / 648 B``GFramework.Cqrs` `575.92 ns / 3400 B`
- 本轮首次并行运行两个 BenchmarkDotNet `dotnet run --no-build` 过滤命令时触发自动生成目录争用;已按仓库规则改为串行重跑同一命令,并以串行结果作为权威 smoke 验证
- 当前可恢复结论收口为两点:一是 stream benchmark 线已从 `StreamLifetimeBenchmarks` 继续下探到 default steady-state 与 invoker 两个更窄口径;二是 request lifetime 线已经拥有真实 scoped-host 基线,后续若继续扩 `StreamLifetimeBenchmarks` 的 scoped 口径,不必再先做宿主适配
- 下一推荐步骤:
- 先回到 `ai-plan/public/cqrs-rewrite/**``GFramework.Cqrs.Benchmarks/README.md`,把本轮基线从旧的 `21 files / PR #345` 状态更新到当前 `699d0b48` 基线和新的 benchmark 结论
- 然后优先复核 `StreamInvokerBenchmarks` 当前 short-job 输出里 `DrainAll` 口径的异常排序是否只是 smoke 配置噪音,必要时把下一批切回更稳定的 benchmark job 或补更窄的 helper-level 对照,而不是直接据此下结论
- 更早的 `RP-123` 及之前阶段细节以下方 trace 与归档为准active 入口不再重复展开旧阶段流水。
- 当前分支相对 `origin/main` 的累计 branch diff 启动时为 `9 files`,仍明显低于 `$gframework-batch-boot 50` 的停止阈值;这一批继续保持单模块、低风险、可直接评审的 benchmark 边界
- 当前 `RP-113` 已继续沿用 `$gframework-batch-boot 50`,并把 notification 线从 benchmark 对照推进到实际 runtime 能力:新增公开内置 `TaskWhenAllNotificationPublisher`,让 `GFramework.Cqrs` 在保留默认顺序发布器的同时,提供与 `Mediator` `TaskWhenAllPublisher` 对齐的并行 notification publish 策略

View File

@ -1,5 +1,37 @@
# CQRS 重写迁移追踪
## 2026-05-11
### 阶段benchmark 多 worker 波次与 scoped-host 基线CQRS-REWRITE-RP-129
- 本轮从 `$gframework-batch-boot 50` 启动,但按 `gframework-multi-agent-batch` 规则把非阻塞工作拆成三条互不冲突的 benchmark 切片:
- `StreamingBenchmarks.cs`:默认 steady-state stream 宿主补 `FirstItem / DrainAll`
- `StreamInvokerBenchmarks.cs`generated/reflection/MediatR invoker 对照补 `FirstItem / DrainAll`
- `RequestLifetimeBenchmarks.cs` + `BenchmarkHostFactory.cs` + `ScopedBenchmarkContainer.cs`:为 request lifetime 引入真实 scoped-host 作用域边界
- 启动前已重新复核基线:`origin/main` 与本地 `main` 当前同为 `699d0b48`,启动时 `origin/main...HEAD``0 files / 0 lines`;因此先前 active 入口里的 `21 files` 已视为历史恢复信息,不再作为本轮 stop-condition 计数基线
- worker 输出验收:
- `StreamingBenchmarks` worker 仅触达 `StreamingBenchmarks.cs`,并完成 `dotnet build ... -c Release``dotnet run ... --filter "*StreamingBenchmarks*"`;本轮保留其实现与 smoke 结果
- `StreamInvokerBenchmarks` worker 只触达 `StreamInvokerBenchmarks.cs`,主线程保留其观测模式拆分;后续 smoke 验证改由主线程串行执行
- `RequestLifetimeBenchmarks` worker 在 `BenchmarkHostFactory.cs` 与新建 `ScopedBenchmarkContainer.cs` 留下未编译完的 scoped-helper 草稿;主线程接手补齐 `using``IContextAware` 转发与 `IPrioritized` 命名空间,然后完成 `RequestLifetimeBenchmarks.cs` 的 scoped 接线
- 本轮主线程关键修正:
- `ScopedBenchmarkContainer` 采用“根容器注册可见性 + 作用域 provider 实例解析”的只读适配层,避免在 request benchmark 里把 scoped handler 错误解析到根容器
- `BenchmarkHostFactory.SendScopedGFrameworkRequestAsync(...)``SendScopedMediatRRequestAsync(...)` 统一封装每次 request 的显式 scope 创建/释放逻辑
- 首次把 `StreamInvokerBenchmarks``RequestLifetimeBenchmarks` 并行跑 smoke 时触发 BenchmarkDotNet 自动生成目录争用;主线程按仓库规则改为串行重跑相同命令,并以串行结果为权威
- 本轮验证:
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs.Benchmarks/Messaging/BenchmarkHostFactory.cs GFramework.Cqrs.Benchmarks/Messaging/RequestLifetimeBenchmarks.cs GFramework.Cqrs.Benchmarks/Messaging/ScopedBenchmarkContainer.cs GFramework.Cqrs.Benchmarks/Messaging/StreamInvokerBenchmarks.cs GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs`
- 结果:通过
- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamInvokerBenchmarks*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1`
- 结果:通过(串行权威结果)
- 备注:`FirstItem` 下 baseline / reflection / generated 约为 `5.84 ns / 52.90 ns / 59.44 ns``DrainAll` 报告当前呈现异常排序,应只把本次结果视作 smoke 运行通过信号,不直接当成稳定性能结论
- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestLifetimeBenchmarks.SendRequest_*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1`
- 结果:通过(串行权威结果)
- 备注:`Singleton` 下 baseline / `GFramework.Cqrs` / `MediatR` 约为 `5.23 ns / 53.43 ns / 56.35 ns``Scoped` 下约为 `5.60 ns / 575.92 ns / 170.94 ns``Transient` 下约为 `6.00 ns / 58.70 ns / 60.91 ns`
- 下一恢复点:
- 先把 `GFramework.Cqrs.Benchmarks/README.md` 与本 active tracking / trace 从旧 `PR #345 / 21 files` 状态刷新到当前 `699d0b48` 基线和本轮新矩阵
- 然后判断 `StreamInvokerBenchmarks``DrainAll` smoke 输出是否需要单开一批稳定性复核;若只是 short-job 配置噪音,再考虑继续把 scoped host 扩到 stream lifetime而不是现在就解读该组数字
## 2026-05-09
### 阶段PR #345 latest-head review 收口CQRS-REWRITE-RP-128