diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs new file mode 100644 index 00000000..b747e733 --- /dev/null +++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs @@ -0,0 +1,527 @@ +// Copyright (c) 2025-2026 GeWuYou +// SPDX-License-Identifier: Apache-2.0 + +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Columns; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Diagnosers; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Order; +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; +using MediatR; +using Microsoft.Extensions.DependencyInjection; + +[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute( + typeof(GFramework.Cqrs.Benchmarks.Messaging.StreamPipelineBenchmarks.GeneratedStreamPipelineBenchmarkRegistry))] + +namespace GFramework.Cqrs.Benchmarks.Messaging; + +/// +/// 对比不同 stream pipeline 行为数量下,单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state dispatch 开销。 +/// +/// +/// 当前矩阵同时覆盖 0 / 1 / 4 个 stream pipeline 行为,以及 +/// 两种观测口径, +/// 以便把建流固定成本与完整枚举成本拆开观察。 +/// +[Config(typeof(Config))] +public class StreamPipelineBenchmarks +{ + private MicrosoftDiContainer _container = null!; + private ICqrsRuntime _runtime = null!; + private ServiceProvider _serviceProvider = null!; + private IMediator _mediatr = null!; + private BenchmarkStreamHandler _baselineHandler = null!; + private BenchmarkStreamRequest _request = null!; + + /// + /// 控制当前场景注册的 stream pipeline 行为数量,保持与 request pipeline benchmark 相同的 0 / 1 / 4 矩阵。 + /// + [Params(0, 1, 4)] + public int PipelineCount { get; set; } + + /// + /// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。 + /// + [Params(StreamObservation.FirstItem, StreamObservation.DrainAll)] + public StreamObservation Observation { get; set; } + + /// + /// 用于拆分 stream dispatch 固定成本与后续枚举成本的观测模式。 + /// + public enum StreamObservation + { + /// + /// 只推进到首个元素后立即释放枚举器。 + /// + FirstItem, + + /// + /// 完整枚举整个 stream,保留原有 benchmark 语义。 + /// + DrainAll + } + + /// + /// 配置 stream pipeline benchmark 的公共输出格式。 + /// + private sealed class Config : ManualConfig + { + public Config() + { + AddJob(Job.Default); + AddColumnProvider(DefaultColumnProviders.Instance); + AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamPipeline")); + AddDiagnoser(MemoryDiagnoser.Default); + WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared)); + } + } + + /// + /// 构建 stream pipeline dispatch 所需的最小 runtime 宿主和对照对象。 + /// + [GlobalSetup] + public void Setup() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider + { + MinLevel = LogLevel.Fatal + }; + Fixture.Setup("StreamPipeline", handlerCount: 1, pipelineCount: PipelineCount); + BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); + + _baselineHandler = new BenchmarkStreamHandler(); + _container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container => + { + BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container); + RegisterGFrameworkPipelineBehaviors(container, PipelineCount); + }); + _runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime( + _container, + LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamPipelineBenchmarks))); + + _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( + services => + { + RegisterMediatRStreamPipelineBehaviors(services, PipelineCount); + }, + typeof(StreamPipelineBenchmarks), + static candidateType => + candidateType == typeof(BenchmarkStreamHandler) || + candidateType == typeof(BenchmarkStreamPipelineBehavior1) || + candidateType == typeof(BenchmarkStreamPipelineBehavior2) || + candidateType == typeof(BenchmarkStreamPipelineBehavior3) || + candidateType == typeof(BenchmarkStreamPipelineBehavior4), + ServiceLifetime.Singleton); + _mediatr = _serviceProvider.GetRequiredService(); + + _request = new BenchmarkStreamRequest(Guid.NewGuid(), 3); + } + + /// + /// 释放 MediatR 对照组使用的 DI 宿主。 + /// + [GlobalCleanup] + public void Cleanup() + { + try + { + BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider); + } + finally + { + BenchmarkDispatcherCacheHelper.ClearDispatcherCaches(); + } + } + + /// + /// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream pipeline 编排之外的基线。 + /// + /// 按当前观测模式完成 stream 消费后的等待句柄。 + [Benchmark(Baseline = true)] + public ValueTask Stream_Baseline() + { + return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation); + } + + /// + /// 通过 GFramework.CQRS runtime 创建 stream,并按当前矩阵配置执行 stream pipeline。 + /// + /// 按当前观测模式完成 stream 消费后的等待句柄。 + [Benchmark] + public ValueTask Stream_GFrameworkCqrs() + { + return ObserveAsync( + _runtime.CreateStream( + BenchmarkContext.Instance, + _request, + CancellationToken.None), + Observation); + } + + /// + /// 通过 MediatR 创建 stream,并按当前矩阵配置执行 stream pipeline,作为外部设计对照。 + /// + /// 按当前观测模式完成 stream 消费后的等待句柄。 + [Benchmark] + public ValueTask Stream_MediatR() + { + return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation); + } + + /// + /// 按指定数量向 GFramework.CQRS 宿主注册最小 no-op stream pipeline 行为。 + /// + /// 当前 benchmark 使用的容器。 + /// 要注册的行为数量。 + /// 行为数量不在支持的矩阵内时抛出。 + private static void RegisterGFrameworkPipelineBehaviors(MicrosoftDiContainer container, int pipelineCount) + { + ArgumentNullException.ThrowIfNull(container); + + switch (pipelineCount) + { + case 0: + return; + case 1: + container.RegisterCqrsStreamPipelineBehavior(); + return; + case 4: + container.RegisterCqrsStreamPipelineBehavior(); + container.RegisterCqrsStreamPipelineBehavior(); + container.RegisterCqrsStreamPipelineBehavior(); + container.RegisterCqrsStreamPipelineBehavior(); + return; + default: + throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount, + "Only the 0/1/4 pipeline matrix is supported."); + } + } + + /// + /// 按指定数量向 MediatR 宿主注册最小 no-op stream pipeline 行为。 + /// + /// 当前 benchmark 使用的服务集合。 + /// 要注册的行为数量。 + /// 行为数量不在支持的矩阵内时抛出。 + private static void RegisterMediatRStreamPipelineBehaviors(IServiceCollection services, int pipelineCount) + { + ArgumentNullException.ThrowIfNull(services); + + switch (pipelineCount) + { + case 0: + return; + case 1: + services.AddSingleton, BenchmarkStreamPipelineBehavior1>(); + return; + case 4: + services.AddSingleton, BenchmarkStreamPipelineBehavior1>(); + services.AddSingleton, BenchmarkStreamPipelineBehavior2>(); + services.AddSingleton, BenchmarkStreamPipelineBehavior3>(); + services.AddSingleton, BenchmarkStreamPipelineBehavior4>(); + return; + default: + throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount, + "Only the 0/1/4 pipeline matrix is supported."); + } + } + + /// + /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。 + /// + /// 当前 stream 的响应类型。 + /// 待观察的异步响应序列。 + /// 当前 benchmark 选定的观测模式。 + /// 异步消费完成后的等待句柄。 + private static ValueTask ObserveAsync( + IAsyncEnumerable responses, + StreamObservation observation) + { + ArgumentNullException.ThrowIfNull(responses); + + return observation switch + { + StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None), + StreamObservation.DrainAll => DrainAsync(responses), + _ => throw new ArgumentOutOfRangeException( + nameof(observation), + observation, + "Unsupported stream observation mode.") + }; + } + + /// + /// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 MoveNextAsync 的固定成本。 + /// + /// 当前 stream 的响应类型。 + /// 待观察的异步响应序列。 + /// 用于向异步枚举器传播取消的令牌。 + /// 消费首个元素后的等待句柄。 + private static async ValueTask ConsumeFirstItemAsync( + IAsyncEnumerable responses, + CancellationToken cancellationToken) + { + var enumerator = responses.GetAsyncEnumerator(cancellationToken); + await using (enumerator.ConfigureAwait(false)) + { + if (await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + _ = enumerator.Current; + } + } + } + + /// + /// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。 + /// + /// 当前 stream 的响应类型。 + /// 待完整枚举的异步响应序列。 + /// 完整枚举结束后的等待句柄。 + private static async ValueTask DrainAsync(IAsyncEnumerable responses) + { + await foreach (var response in responses.ConfigureAwait(false)) + { + _ = response; + } + } + + /// + /// Benchmark stream request。 + /// + /// 请求标识。 + /// 返回元素数量。 + public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest, + MediatR.IStreamRequest; + + /// + /// 复用 stream benchmark 的响应结构,保持跨场景可比性。 + /// + /// 响应标识。 + public sealed record BenchmarkResponse(Guid Id); + + /// + /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。 + /// + public sealed class BenchmarkStreamHandler : + GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, + MediatR.IStreamRequestHandler + { + /// + /// 处理 GFramework.CQRS stream request。 + /// + /// 当前 benchmark stream 请求。 + /// 用于中断异步枚举的取消令牌。 + /// 低噪声、可重复的异步响应序列。 + public IAsyncEnumerable Handle( + BenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync(request, cancellationToken); + } + + /// + /// 处理 MediatR stream request。 + /// + /// 当前 benchmark stream 请求。 + /// 用于中断异步枚举的取消令牌。 + /// 低噪声、可重复的异步响应序列。 + IAsyncEnumerable MediatR.IStreamRequestHandler.Handle( + BenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return EnumerateAsync(request, cancellationToken); + } + + /// + /// 为 benchmark 构造稳定、低噪声的异步响应序列。 + /// + /// 决定元素数量和标识的 benchmark 请求。 + /// 用于中断异步枚举的取消令牌。 + /// 按请求数量生成的响应序列。 + private static async IAsyncEnumerable EnumerateAsync( + BenchmarkStreamRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + for (int index = 0; index < request.ItemCount; index++) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return new BenchmarkResponse(request.Id); + await Task.CompletedTask.ConfigureAwait(false); + } + } + } + + /// + /// 为 benchmark 提供统一的 no-op stream pipeline 行为实现,尽量把测量焦点保持在调度器与行为编排本身。 + /// + public abstract class BenchmarkStreamPipelineBehaviorBase : + GFramework.Cqrs.Abstractions.Cqrs.IStreamPipelineBehavior, + MediatR.IStreamPipelineBehavior + { + /// + /// 透传 GFramework.CQRS stream pipeline,避免引入额外业务逻辑噪音。 + /// + /// 当前 benchmark stream 请求。 + /// 继续向下执行的 stream pipeline 委托。 + /// 取消令牌。 + /// 下游 handler 产出的异步响应序列。 + public IAsyncEnumerable Handle( + BenchmarkStreamRequest message, + GFramework.Cqrs.Abstractions.Cqrs.StreamMessageHandlerDelegate next, + CancellationToken cancellationToken) + { + return next(message, cancellationToken); + } + + /// + /// 透传 MediatR stream pipeline,保持与 GFramework.CQRS 相同的 no-op 语义。 + /// + /// 当前 benchmark stream 请求。 + /// 继续向下执行的 MediatR stream pipeline 委托。 + /// 取消令牌。 + /// 下游 handler 产出的异步响应序列。 + IAsyncEnumerable MediatR.IStreamPipelineBehavior.Handle( + BenchmarkStreamRequest request, + MediatR.StreamHandlerDelegate next, + CancellationToken cancellationToken) + { + _ = request; + _ = cancellationToken; + return next(); + } + } + + /// + /// pipeline 矩阵中的第一个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior1 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// pipeline 矩阵中的第二个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior2 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// pipeline 矩阵中的第三个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior3 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// pipeline 矩阵中的第四个 no-op stream 行为。 + /// + public sealed class BenchmarkStreamPipelineBehavior4 : BenchmarkStreamPipelineBehaviorBase + { + } + + /// + /// 为 stream pipeline benchmark 提供 handwritten generated registry, + /// 让默认 pipeline 宿主也能走真实的 generated stream invoker provider 接线路径。 + /// + public sealed class GeneratedStreamPipelineBenchmarkRegistry : + GFramework.Cqrs.ICqrsHandlerRegistry, + GFramework.Cqrs.ICqrsStreamInvokerProvider, + GFramework.Cqrs.IEnumeratesCqrsStreamInvokerDescriptors + { + private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor = + new( + typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler< + BenchmarkStreamRequest, + BenchmarkResponse>), + typeof(GeneratedStreamPipelineBenchmarkRegistry).GetMethod( + nameof(InvokeBenchmarkStreamHandler), + BindingFlags.Public | BindingFlags.Static) + ?? throw new InvalidOperationException("Missing generated stream pipeline benchmark method.")); + + private static readonly IReadOnlyList Descriptors = + [ + new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry( + typeof(BenchmarkStreamRequest), + typeof(BenchmarkResponse), + Descriptor) + ]; + + /// + /// 把 stream pipeline benchmark handler 注册为单例,保持与当前矩阵宿主一致的生命周期语义。 + /// + /// 用于承载 generated handler 注册的服务集合。 + /// 记录 generated registry 接线结果的日志器。 + public void Register(IServiceCollection services, ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(logger); + + services.AddSingleton( + typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler), + typeof(BenchmarkStreamHandler)); + logger.Debug("Registered generated stream pipeline benchmark handler."); + } + + /// + /// 返回当前 provider 暴露的全部 generated stream invoker 描述符。 + /// + /// 当前 benchmark 的 generated stream invoker 描述符集合。 + public IReadOnlyList GetDescriptors() + { + return Descriptors; + } + + /// + /// 为目标流式请求/响应类型对返回 generated stream invoker 描述符。 + /// + /// 要匹配的 stream 请求类型。 + /// 要匹配的 stream 响应类型。 + /// 命中时返回的 generated stream invoker 描述符。 + /// 是否命中了当前 benchmark 的 stream 请求/响应类型对。 + public bool TryGetDescriptor( + Type requestType, + Type responseType, + out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor) + { + if (requestType == typeof(BenchmarkStreamRequest) && + responseType == typeof(BenchmarkResponse)) + { + descriptor = Descriptor; + return true; + } + + descriptor = null; + return false; + } + + /// + /// 模拟 generated stream invoker provider 为 stream pipeline benchmark 产出的开放静态调用入口。 + /// + /// 当前要调用的 stream handler 实例。 + /// 当前要分发的 stream 请求实例。 + /// 用于向 handler 传播的取消令牌。 + /// handler 产出的异步响应序列。 + public static object InvokeBenchmarkStreamHandler( + object handler, + object request, + CancellationToken cancellationToken) + { + var typedHandler = (GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler< + BenchmarkStreamRequest, + BenchmarkResponse>)handler; + var typedRequest = (BenchmarkStreamRequest)request; + return typedHandler.Handle(typedRequest, cancellationToken); + } + } +} diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs index bfb7d07e..bb3f634a 100644 --- a/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs +++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs @@ -17,6 +17,7 @@ using GFramework.Core.Logging; using GFramework.Cqrs.Abstractions.Cqrs; using MediatR; using Microsoft.Extensions.DependencyInjection; +using GeneratedMediator = Mediator.Mediator; [assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute( typeof(GFramework.Cqrs.Benchmarks.Messaging.GeneratedDefaultStreamingBenchmarkRegistry))] @@ -24,7 +25,7 @@ using Microsoft.Extensions.DependencyInjection; namespace GFramework.Cqrs.Benchmarks.Messaging; /// -/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state stream 开销。 +/// 对比单个 stream request 在直接调用、GFramework.CQRS runtime、NuGet `Mediator` 与 MediatR 之间的 steady-state stream 开销。 /// /// /// 默认 generated-provider stream 宿主同时暴露 与 @@ -36,8 +37,10 @@ public class StreamingBenchmarks { private MicrosoftDiContainer _container = null!; private ICqrsRuntime _runtime = null!; - private ServiceProvider _serviceProvider = null!; + private ServiceProvider _mediatrServiceProvider = null!; + private ServiceProvider _mediatorServiceProvider = null!; private IMediator _mediatr = null!; + private GeneratedMediator _mediator = null!; private BenchmarkStreamHandler _baselineHandler = null!; private BenchmarkStreamRequest _request = null!; @@ -100,25 +103,28 @@ public class StreamingBenchmarks _container, LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamingBenchmarks))); - _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( + _mediatrServiceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider( configure: null, typeof(StreamingBenchmarks), static candidateType => candidateType == typeof(BenchmarkStreamHandler), ServiceLifetime.Singleton); - _mediatr = _serviceProvider.GetRequiredService(); + _mediatr = _mediatrServiceProvider.GetRequiredService(); + + _mediatorServiceProvider = BenchmarkHostFactory.CreateMediatorServiceProvider(configure: null); + _mediator = _mediatorServiceProvider.GetRequiredService(); _request = new BenchmarkStreamRequest(Guid.NewGuid(), 3); } /// - /// 释放 MediatR 对照组使用的 DI 宿主。 + /// 释放 MediatR 与 `Mediator` 对照组使用的 DI 宿主。 /// [GlobalCleanup] public void Cleanup() { try { - BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider); + BenchmarkCleanupHelper.DisposeAll(_container, _mediatrServiceProvider, _mediatorServiceProvider); } finally { @@ -158,6 +164,16 @@ public class StreamingBenchmarks return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation); } + /// + /// 通过 `ai-libs/Mediator` 的 source-generated concrete mediator 创建 stream,并按当前观测模式消费。 + /// + /// 按当前观测模式完成 stream 消费后的等待句柄。 + [Benchmark] + public ValueTask Stream_Mediator() + { + return ObserveAsync(_mediator.CreateStream(_request, CancellationToken.None), Observation); + } + /// /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。 /// @@ -224,6 +240,7 @@ public class StreamingBenchmarks /// 返回元素数量。 public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest, + Mediator.IStreamRequest, MediatR.IStreamRequest; /// @@ -233,10 +250,11 @@ public class StreamingBenchmarks public sealed record BenchmarkResponse(Guid Id); /// - /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。 + /// 同时实现 GFramework.CQRS、NuGet `Mediator` 与 MediatR 契约的最小 stream handler。 /// public sealed class BenchmarkStreamHandler : GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler, + Mediator.IStreamRequestHandler, MediatR.IStreamRequestHandler { /// @@ -249,6 +267,16 @@ public class StreamingBenchmarks return EnumerateAsync(request, cancellationToken); } + /// + /// 处理 NuGet `Mediator` stream request。 + /// + IAsyncEnumerable Mediator.IStreamRequestHandler.Handle( + BenchmarkStreamRequest request, + CancellationToken cancellationToken) + { + return Handle(request, cancellationToken); + } + /// /// 处理 MediatR stream request。 /// diff --git a/GFramework.Cqrs.Benchmarks/README.md b/GFramework.Cqrs.Benchmarks/README.md index 69038b45..dc1e0d15 100644 --- a/GFramework.Cqrs.Benchmarks/README.md +++ b/GFramework.Cqrs.Benchmarks/README.md @@ -26,7 +26,7 @@ - `Initialization` 与 `ColdStart` 两组下,`GFramework.Cqrs`、NuGet `Mediator`、`MediatR` - stream steady-state - `Messaging/StreamingBenchmarks.cs` - - baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime 与 `MediatR` + - baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime、NuGet `Mediator` source-generated concrete path 与 `MediatR` - 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径 - `Messaging/StreamLifetimeBenchmarks.cs` - `Singleton / Scoped / Transient` 三类 handler 生命周期下,baseline、`GFramework.Cqrs` reflection stream binding、`GFramework.Cqrs` generated stream registry、`MediatR` @@ -34,6 +34,9 @@ - `Messaging/StreamInvokerBenchmarks.cs` - baseline、`GFramework.Cqrs` reflection stream binding、`GFramework.Cqrs` generated stream invoker、`MediatR` - 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径 + - `Messaging/StreamPipelineBenchmarks.cs` + - `0 / 1 / 4` 个 stream pipeline 行为下,baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime 与 `MediatR` + - 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径 - stream startup - `Messaging/StreamStartupBenchmarks.cs` - `Initialization` 与 `ColdStart` 两组下,`GFramework.Cqrs` reflection、`GFramework.Cqrs` generated、`MediatR` @@ -68,6 +71,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro ```bash dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestLifetimeBenchmarks.SendRequest_*" dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamLifetimeBenchmarks.Stream_*" +dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamPipelineBenchmarks.Stream_*" ``` ## 并发运行约束 @@ -87,7 +91,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro - `RequestLifetimeBenchmarks` 的 `Scoped` 场景会在每次 request 分发时显式创建并释放真实 DI 作用域;它观察的是 scoped handler 的解析与 dispatch 成本,不把 runtime 构造常量成本混入生命周期对照 - `NotificationLifetimeBenchmarks` 的 `Scoped` 场景也采用真实 DI 作用域;它比较的是 publish 路径上的生命周期额外开销,不是根容器解析退化后的近似值 -- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll` +- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks`、`StreamPipelineBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll` - `FirstItem` 适合观察“建流到首个元素”的固定成本 - `DrainAll` 适合观察完整枚举整个 stream 的总成本 - `StreamStartupBenchmarks` 的 `ColdStart` 只推进到首个元素,因此它回答的是“新宿主下首次建流命中”的边界,不回答完整枚举总成本 @@ -96,7 +100,6 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro ## 当前缺口 -- 当前没有 stream 版的 NuGet `Mediator` source-generated concrete path 对照;stream steady-state、lifetime、startup 现在都只覆盖 `GFramework.Cqrs` 与 `MediatR` +- 当前没有 stream 生命周期与 startup 版的 NuGet `Mediator` source-generated concrete path 对照;`StreamLifetimeBenchmarks` 与 `StreamStartupBenchmarks` 现在都只覆盖 `GFramework.Cqrs` 与 `MediatR` - 当前没有 request 生命周期下的 NuGet `Mediator` compile-time lifetime 矩阵;`RequestLifetimeBenchmarks` 只覆盖 `GFramework.Cqrs` 与 `MediatR` - 当前没有 notification fan-out 的生命周期矩阵;`NotificationFanOutBenchmarks` 只覆盖固定 `4 handler` 的已装配宿主 -- 当前没有 stream pipeline benchmark;现有 pipeline coverage 仅限 request diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs index ebfa6659..ffbe5196 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs @@ -138,6 +138,68 @@ internal sealed class CqrsHandlerRegistrarFallbackFailureTests }); } + /// + /// 验证当 generated registry 是抽象类型时,registrar 会记录告警并回退到反射扫描。 + /// + [Test] + public void RegisterHandlers_Should_Fall_Back_To_Reflection_When_Generated_Registry_Is_Abstract() + { + var generatedAssembly = CreateGeneratedRegistryAssembly( + "GFramework.Cqrs.Tests.Cqrs.AbstractGeneratedRegistryAssembly, Version=1.0.0.0", + typeof(AbstractGeneratedNotificationHandlerRegistry)); + generatedAssembly + .Setup(static assembly => assembly.GetTypes()) + .Returns([typeof(GeneratedRegistryNotificationHandler)]); + + var container = new MicrosoftDiContainer(); + CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object); + + Assert.Multiple(() => + { + Assert.That( + GetGeneratedRegistryNotificationHandlerTypes(container), + Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)])); + Assert.That( + GetWarningLogs().Any(log => + log.Message.Contains("because it is abstract", StringComparison.Ordinal)), + Is.True); + }); + + generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Once); + } + + /// + /// 验证当 generated registry 不暴露可访问无参构造器时,registrar 会记录告警并回退到反射扫描。 + /// + [Test] + public void RegisterHandlers_Should_Fall_Back_To_Reflection_When_Generated_Registry_Has_No_Parameterless_Constructor() + { + var generatedAssembly = CreateGeneratedRegistryAssembly( + "GFramework.Cqrs.Tests.Cqrs.NoParameterlessGeneratedRegistryAssembly, Version=1.0.0.0", + typeof(ConstructorArgumentNotificationHandlerRegistry)); + generatedAssembly + .Setup(static assembly => assembly.GetTypes()) + .Returns([typeof(GeneratedRegistryNotificationHandler)]); + + var container = new MicrosoftDiContainer(); + CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object); + + Assert.Multiple(() => + { + Assert.That( + GetGeneratedRegistryNotificationHandlerTypes(container), + Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)])); + Assert.That( + GetWarningLogs().Any(log => + log.Message.Contains( + "does not expose an accessible parameterless constructor", + StringComparison.Ordinal)), + Is.True); + }); + + generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Once); + } + /// /// 创建一个仅通过 generated registry 注册主 handler、并附带指定 fallback 元数据的程序集替身。 /// @@ -161,6 +223,24 @@ internal sealed class CqrsHandlerRegistrarFallbackFailureTests return generatedAssembly; } + /// + /// 创建一个只声明 generated registry attribute 的程序集替身,用于验证 registry 激活失败后的回退行为。 + /// + /// 用于日志与缓存键的程序集名。 + /// 要暴露给 registrar 的 generated registry 类型。 + /// 已完成基础接线的程序集 mock。 + private static Mock CreateGeneratedRegistryAssembly(string assemblyName, Type registryType) + { + var generatedAssembly = new Mock(); + generatedAssembly + .SetupGet(static assembly => assembly.FullName) + .Returns(assemblyName); + generatedAssembly + .Setup(static assembly => assembly.GetCustomAttributes(typeof(CqrsHandlerRegistryAttribute), false)) + .Returns([new CqrsHandlerRegistryAttribute(registryType)]); + return generatedAssembly; + } + /// /// 提取容器中针对 generated notification 注册的处理器实现类型。 /// @@ -259,4 +339,55 @@ internal sealed class CqrsHandlerRegistrarFallbackFailureTests .Where(static log => log.Level == LogLevel.Warning) .ToArray(); } + + /// + /// 模拟 generated registry 被错误声明为抽象类型时的激活失败场景。 + /// + private abstract class AbstractGeneratedNotificationHandlerRegistry : ICqrsHandlerRegistry + { + /// + /// 抽象 registry 即便具备注册逻辑,也不应被运行时实例化。 + /// + /// 承载处理器映射的服务集合。 + /// 记录注册诊断的日志器。 + public void Register(IServiceCollection services, ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(logger); + + services.AddTransient( + typeof(INotificationHandler), + typeof(GeneratedRegistryNotificationHandler)); + } + } + + /// + /// 模拟 generated registry 缺少可访问无参构造器时的激活失败场景。 + /// + private sealed class ConstructorArgumentNotificationHandlerRegistry : ICqrsHandlerRegistry + { + /// + /// 初始化一个只能通过额外参数构造的测试 registry。 + /// + /// 用于区分测试场景的占位参数。 + public ConstructorArgumentNotificationHandlerRegistry(string marker) + { + ArgumentNullException.ThrowIfNull(marker); + } + + /// + /// 此实现仅用于满足接口契约;本用例关注的是实例化失败前的回退行为。 + /// + /// 承载处理器映射的服务集合。 + /// 记录注册诊断的日志器。 + public void Register(IServiceCollection services, ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(logger); + + services.AddTransient( + typeof(INotificationHandler), + typeof(GeneratedRegistryNotificationHandler)); + } + } } diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs index 967fc6a8..4fa6b124 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs @@ -6,6 +6,7 @@ using GFramework.Core.Architectures; using GFramework.Core.Ioc; using GFramework.Core.Logging; using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Internal; using GFramework.Cqrs.Tests.Logging; namespace GFramework.Cqrs.Tests.Cqrs; @@ -170,6 +171,74 @@ internal sealed class CqrsHandlerRegistrarTests Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)])); } + /// + /// 验证 direct generated-registry 激活入口在 registry 为抽象类型时会抛出异常,并保留契约告警。 + /// + [Test] + public void RegisterGeneratedRegistry_Should_Throw_When_Generated_Registry_Is_Abstract() + { + var capturingProvider = new CapturingLoggerFactoryProvider(LogLevel.Warning); + var logger = capturingProvider.CreateLogger(nameof(CqrsHandlerRegistrarTests)); + var container = new MicrosoftDiContainer(); + + var exception = Assert.Throws(() => + CqrsHandlerRegistrar.RegisterGeneratedRegistry( + container, + typeof(AbstractGeneratedNotificationHandlerRegistry), + logger)); + + var warningLogs = capturingProvider.Loggers + .SelectMany(static createdLogger => createdLogger.Logs) + .Where(static log => log.Level == LogLevel.Warning) + .ToArray(); + + Assert.Multiple(() => + { + Assert.That(exception, Is.Not.Null); + Assert.That(exception!.Message, Does.Contain(typeof(AbstractGeneratedNotificationHandlerRegistry).FullName)); + Assert.That( + warningLogs.Any(log => + log.Message.Contains("because it is abstract", StringComparison.Ordinal)), + Is.True); + Assert.That(container.GetServicesUnsafe, Is.Empty); + }); + } + + /// + /// 验证 direct generated-registry 激活入口在 registry 缺少无参构造器时会抛出异常,并保留契约告警。 + /// + [Test] + public void RegisterGeneratedRegistry_Should_Throw_When_Generated_Registry_Has_No_Parameterless_Constructor() + { + var capturingProvider = new CapturingLoggerFactoryProvider(LogLevel.Warning); + var logger = capturingProvider.CreateLogger(nameof(CqrsHandlerRegistrarTests)); + var container = new MicrosoftDiContainer(); + + var exception = Assert.Throws(() => + CqrsHandlerRegistrar.RegisterGeneratedRegistry( + container, + typeof(ConstructorArgumentNotificationHandlerRegistry), + logger)); + + var warningLogs = capturingProvider.Loggers + .SelectMany(static createdLogger => createdLogger.Logs) + .Where(static log => log.Level == LogLevel.Warning) + .ToArray(); + + Assert.Multiple(() => + { + Assert.That(exception, Is.Not.Null); + Assert.That(exception!.Message, Does.Contain(typeof(ConstructorArgumentNotificationHandlerRegistry).FullName)); + Assert.That( + warningLogs.Any(log => + log.Message.Contains( + "does not expose an accessible parameterless constructor", + StringComparison.Ordinal)), + Is.True); + Assert.That(container.GetServicesUnsafe, Is.Empty); + }); + } + /// /// 验证当生成注册器元数据损坏时,运行时会记录告警并回退到反射扫描路径。 /// @@ -695,4 +764,55 @@ internal sealed class CqrsHandlerRegistrarTests return typeof(CqrsReflectionFallbackAttribute).Assembly .GetType("GFramework.Cqrs.Internal.CqrsHandlerRegistrar", throwOnError: true)!; } + + /// + /// 模拟被错误声明为抽象类型的 generated registry。 + /// + private abstract class AbstractGeneratedNotificationHandlerRegistry : ICqrsHandlerRegistry + { + /// + /// 抽象 registry 即便具备注册逻辑,也不应被 direct 激活入口实例化。 + /// + /// 承载处理器映射的服务集合。 + /// 记录注册诊断的日志器。 + public void Register(IServiceCollection services, ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(logger); + + services.AddTransient( + typeof(INotificationHandler), + typeof(GeneratedRegistryNotificationHandler)); + } + } + + /// + /// 模拟缺少无参构造器的 generated registry。 + /// + private sealed class ConstructorArgumentNotificationHandlerRegistry : ICqrsHandlerRegistry + { + /// + /// 初始化一个只能通过额外参数构造的测试 registry。 + /// + /// 用于区分测试场景的占位参数。 + public ConstructorArgumentNotificationHandlerRegistry(string marker) + { + ArgumentNullException.ThrowIfNull(marker); + } + + /// + /// 此实现仅用于满足接口契约;本用例关注的是构造阶段失败后的异常语义。 + /// + /// 承载处理器映射的服务集合。 + /// 记录注册诊断的日志器。 + public void Register(IServiceCollection services, ILogger logger) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(logger); + + services.AddTransient( + typeof(INotificationHandler), + typeof(GeneratedRegistryNotificationHandler)); + } + } } diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs index 485c11e7..25064cd2 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs @@ -155,6 +155,46 @@ internal sealed class CqrsNotificationPublisherTests Assert.That(secondPublisher.PublishCallCount, Is.Zero); } + /// + /// 验证当容器里没有任何通知发布器时,dispatcher 会回退到内置顺序发布器, + /// 并在首次解析后缓存该 fallback 结果而不是在后续发布时重新查询容器。 + /// + [Test] + public async Task PublishAsync_Should_Fallback_To_SequentialNotificationPublisher_And_Cache_It_When_None_Is_Registered() + { + var invocationOrder = new List(); + var notificationPublisherLookupCount = 0; + var runtime = CreateRuntime( + container => + { + container + .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler))) + .Returns( + [ + new RecordingNotificationHandler("first", invocationOrder), + new RecordingNotificationHandler("second", invocationOrder) + ]); + container + .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationPublisher))) + .Returns(() => + { + notificationPublisherLookupCount++; + if (notificationPublisherLookupCount == 1) + { + return Array.Empty(); + } + + throw new AssertionException("Notification publisher should not be resolved more than once."); + }); + }); + + await runtime.PublishAsync(new FakeCqrsContext(), new PublisherNotification()).ConfigureAwait(false); + await runtime.PublishAsync(new FakeCqrsContext(), new PublisherNotification()).ConfigureAwait(false); + + Assert.That(notificationPublisherLookupCount, Is.EqualTo(1)); + Assert.That(invocationOrder, Is.EqualTo(["first", "second", "first", "second"])); + } + /// /// 验证内置 `TaskWhenAll` 发布器会继续调度所有处理器,而不是沿用默认顺序发布器的失败即停语义。 /// diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs index 35f15736..82bf3253 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs @@ -13,6 +13,24 @@ namespace GFramework.Cqrs.Tests.Cqrs; [TestFixture] internal sealed class CqrsRegistrationServiceTests { + /// + /// 验证空程序集输入不会触发底层注册,也不会产生重复跳过日志。 + /// + [Test] + public void RegisterHandlers_Should_Not_Invoke_Registrar_When_Assemblies_Are_Empty() + { + var logger = new TestLogger("DefaultCqrsRegistrationService", LogLevel.Debug); + var registrar = new Mock(MockBehavior.Strict); + var service = CqrsRuntimeFactory.CreateRegistrationService(registrar.Object, logger); + + service.RegisterHandlers([]); + + registrar.Verify( + static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()), + Times.Never); + Assert.That(logger.Logs, Has.Count.EqualTo(0)); + } + /// /// 验证同一次调用内出现重复程序集键时,底层注册器只会接收到一次注册请求。 /// @@ -87,6 +105,82 @@ internal sealed class CqrsRegistrationServiceTests }); } + /// + /// 验证协调器会忽略空项,并按稳定程序集键排序后仅注册当前调用内的唯一程序集。 + /// + [Test] + public void RegisterHandlers_Should_Ignore_Null_Entries_And_Register_Unique_Assemblies_In_Stable_Key_Order() + { + var logger = new TestLogger("DefaultCqrsRegistrationService", LogLevel.Debug); + var registrar = new Mock(MockBehavior.Strict); + var assemblyC = CreateAssembly("GFramework.Cqrs.Tests.Sorting.C, Version=1.0.0.0"); + var assemblyA = CreateAssembly("GFramework.Cqrs.Tests.Sorting.A, Version=1.0.0.0"); + var duplicateAssemblyA = CreateAssembly("GFramework.Cqrs.Tests.Sorting.A, Version=1.0.0.0"); + var assemblyB = CreateAssembly("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0"); + var registeredAssemblies = new List(); + + registrar + .Setup(static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>())) + .Callback>(assemblies => registeredAssemblies.AddRange(assemblies)); + + var service = CqrsRuntimeFactory.CreateRegistrationService(registrar.Object, logger); + + service.RegisterHandlers([assemblyC.Object, null!, assemblyA.Object, duplicateAssemblyA.Object, assemblyB.Object, null!]); + + registrar.Verify( + static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()), + Times.Exactly(3)); + Assert.Multiple(() => + { + Assert.That( + registeredAssemblies, + Is.EqualTo([assemblyA.Object, assemblyB.Object, assemblyC.Object])); + Assert.That(logger.Logs, Has.Count.EqualTo(0)); + }); + } + + /// + /// 验证跨调用遇到已注册程序集键时,协调器会跳过重复项,同时继续按稳定程序集键顺序处理剩余新程序集。 + /// + [Test] + public void RegisterHandlers_Should_Skip_Previously_Registered_Keys_And_Keep_Stable_Order_For_Remaining_Assemblies() + { + var logger = new TestLogger("DefaultCqrsRegistrationService", LogLevel.Debug); + var registrar = new Mock(MockBehavior.Strict); + var firstAssembly = CreateAssembly("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0"); + var duplicateAssembly = CreateAssembly("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0"); + var assemblyC = CreateAssembly("GFramework.Cqrs.Tests.Sorting.C, Version=1.0.0.0"); + var assemblyA = CreateAssembly("GFramework.Cqrs.Tests.Sorting.A, Version=1.0.0.0"); + var registeredAssemblies = new List(); + + registrar + .Setup(static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>())) + .Callback>(assemblies => registeredAssemblies.AddRange(assemblies)); + + var service = CqrsRuntimeFactory.CreateRegistrationService(registrar.Object, logger); + + service.RegisterHandlers([firstAssembly.Object]); + service.RegisterHandlers([assemblyC.Object, duplicateAssembly.Object, assemblyA.Object]); + + registrar.Verify( + static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()), + Times.Exactly(3)); + Assert.Multiple(() => + { + Assert.That( + registeredAssemblies, + Is.EqualTo([firstAssembly.Object, assemblyA.Object, assemblyC.Object])); + var debugMessages = logger.Logs + .Where(static log => log.Level == LogLevel.Debug) + .Select(static log => log.Message) + .ToArray(); + Assert.That(debugMessages, Has.Length.EqualTo(1)); + Assert.That( + debugMessages[0], + Does.Contain("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0")); + }); + } + /// /// 验证当 缺失时,协调器会退化到 作为稳定程序集键。 /// diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index a019a586..9e6dafdc 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -12,111 +12,80 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-134` +- 恢复点编号:`CQRS-REWRITE-RP-136` - 当前阶段:`Phase 8` -- 当前 PR 锚点:`PR #348` +- 当前 PR 锚点:`PR #349` - 当前结论: - - 本轮按 `$gframework-batch-boot` 协调多波 non-conflicting subagent,基线固定为 - `origin/main @ 3b2e6899d5ffdcfb634b28f3846f57528fbf9196 (2026-05-11T12:25:00+08:00)`。 - - 本轮停止继续扩 batch 的主信号是 `reviewability / context-budget`,不是 `50` 文件阈值; - 自然停点时累计 branch diff 约为 `12 files`,仍明显低于阈值。 - - CQRS runtime / tests 侧已补齐并提交: - - `CqrsNotificationPublisherTests` 锁定“多 publisher 报错”与“单 dispatcher 内 publisher 缓存复用” - - `CqrsGeneratedRequestInvokerProviderTests` 与 `CqrsHandlerRegistrar` 收口 generated descriptor 的异常枚举、 - 坏元数据与重复 pair 回退契约 - - `CqrsDispatcherCacheTests` 锁定 request / stream pipeline presence、executor cache 与上下文重新注入组合分支 + - 本轮先按 `$gframework-pr-review` 重新确认当前分支最新 GitHub 上下文,确认 `feat/cqrs-optimization` 在 `2026-05-12` 已切到 `PR #349`,不再沿用旧 tracking 中的 `PR #348` 锚点。 + - 随后按 `$gframework-batch-boot 50` 持续协调多波 non-conflicting subagent,基线固定为 + `origin/main @ ef4d3d5d (2026-05-11 17:33:43 +0800)`。 + - 当前 branch 相对基线的累计 diff 约为 `9 files / 1111 lines`;本轮停点由 + `context-budget / reviewability` 决定,而不是 `50 files` 阈值。 + - `PR #349` latest-head review 当前确认仍成立的项只有: + - `StreamPipelineBenchmarks` 三个公开 benchmark 方法补齐 `` XML 契约 + - `StreamingBenchmarks.Stream_Mediator` 补齐 `` XML 契约 + - `CqrsNotificationPublisherTests` 中 fallback publisher 缓存回归测试去掉误导性的“第二次解析返回其它 publisher”分支 + - active tracking / trace 的当前 PR 锚点与下一步入口同步到 `PR #349` + - tests 侧已补齐并提交: + - `CqrsRegistrationServiceTests`:补空输入、空项过滤、稳定键排序与跨调用跳过边界 + - `CqrsHandlerRegistrarTests` 与 `CqrsHandlerRegistrarFallbackFailureTests`: + 补 abstract registry 与缺少无参构造器 registry 的回退 / 抛错覆盖 + - `CqrsNotificationPublisherTests`:补“零 publisher 回退到默认顺序发布器并缓存”回归 - benchmark 侧已补齐并提交: - - `RequestStartupBenchmarks` 的 `Mediator` startup 对照 - - `StreamStartupBenchmarks` - - `NotificationStartupBenchmarks` - - `GFramework.Cqrs.Benchmarks/README.md` 的 current coverage / gap 收口 - - 文档与恢复入口侧已补齐并提交: - - `GFramework.Cqrs/README.md` - - `docs/zh-CN/core/cqrs.md` - - `docs/zh-CN/source-generators/cqrs-handler-registry-generator.md` - - `ai-plan/public/cqrs-rewrite/archive/**` 顶部导航与跳转约定 - - 当前 `PR #348` latest-head review 再次复核后: - - 跳过 `NotificationLifetimeBenchmarks.HandlerLifetime` 的 `[GenerateEnumExtensions]` 建议,原因是仓库没有“所有枚举统一生成扩展”的约定,且 benchmark 局部枚举不在该能力的强制范围内 - - 接受并修复 `NotificationLifetimeBenchmarks` 的 scoped 容器释放与公开 XML 文档缺口 - - 接受并修复 `CqrsHandlerRegistrar` 对 generated descriptor 的“先去重后校验”缺陷,并补回归测试锁定“首条无效、后条有效”的同键场景 - - 接受并修复 generated descriptor 校验对 `MethodInfo` 使用 `ReferenceEquals` 的过严比较,改为按方法语义等价匹配 - - 当前尚未提交的收尾切片仅剩: - - `GFramework.Cqrs.Benchmarks/Messaging/NotificationLifetimeBenchmarks.cs` - - `GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs` - - `GFramework.Cqrs/README.md` - - `docs/zh-CN/core/command.md` - - `docs/zh-CN/core/query.md` - - 本 tracking / trace 文件本身 + - `StreamPipelineBenchmarks` + - `StreamingBenchmarks` 的 steady-state `Mediator` 对照 + - `GFramework.Cqrs.Benchmarks/README.md` 的 stream coverage / gap 同步 + - 本轮未修改 `GFramework.Cqrs` 运行时代码;notification fallback 与 generated registry 激活守卫均由新回归证明现有实现已满足预期。 ## 当前活跃事实 - 当前分支:`feat/cqrs-optimization` -- 当前 PR:`PR #348` +- 当前 PR:`PR #349` - 当前写面: + - `GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs` + - `GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs` - `GFramework.Cqrs.Benchmarks/README.md` - - `GFramework.Cqrs.Benchmarks/Messaging/NotificationLifetimeBenchmarks.cs` - - `GFramework.Cqrs.Benchmarks/Messaging/NotificationStartupBenchmarks.cs` - - `GFramework.Cqrs.Benchmarks/Messaging/RequestStartupBenchmarks.cs` - - `GFramework.Cqrs.Benchmarks/Messaging/StreamStartupBenchmarks.cs` - - `GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs` - - `GFramework.Cqrs.Tests/Cqrs/CqrsGeneratedRequestInvokerProviderTests.cs` + - `GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs` + - `GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs` - `GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs` - `GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs` - - `GFramework.Cqrs/Internal/CqrsHandlerRegistrar.cs` - - `GFramework.Cqrs/README.md` - `ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md` - `ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md` - - `ai-plan/public/cqrs-rewrite/archive/todos/cqrs-rewrite-migration-tracking-history-through-rp131.md` - - `ai-plan/public/cqrs-rewrite/archive/traces/cqrs-rewrite-migration-trace-history-through-rp131.md` - - `docs/zh-CN/core/command.md` - - `docs/zh-CN/core/cqrs.md` - - `docs/zh-CN/core/query.md` - - `docs/zh-CN/source-generators/cqrs-handler-registry-generator.md` - 当前基线: + - `origin/main @ ef4d3d5d (2026-05-11 17:33:43 +0800)` - 本轮 batch 启动前,分支相对基线的累计 diff 为 `0 files / 0 lines` - - 当前自然停点时,累计 diff 约为 `12 files` - - 本轮新增 benchmark smoke 结果: - - `RequestStartupBenchmarks` - - `ColdStart_GFrameworkCqrs 61.648 us / 25336 B` - - `ColdStart_Mediator 110.867 us / 57872 B` - - `ColdStart_MediatR 679.103 us / 606256 B` - - `StreamStartupBenchmarks` - - `ColdStart_GFrameworkReflection 71.13 us / 25504 B` - - `ColdStart_GFrameworkGenerated 82.12 us / 28280 B` - - `ColdStart_MediatR 933.87 us / 678992 B` - - `NotificationStartupBenchmarks` - - `ColdStart_GFrameworkCqrs 85.09 us / 24752 B` - - `ColdStart_Mediator 136.08 us / 62512 B` - - `ColdStart_MediatR 1.379 ms / 719056 B` + - 当前自然停点时,累计 diff 约为 `9 files / 1111 lines` +- 本轮提交: + - `ef3cfdc4` `test(cqrs): 补充注册服务边界测试` + - `bcfecd3c` `test(cqrs): 补充 registrar 激活失败分支测试` + - `59cab567` `test(cqrs-benchmarks): 新增 stream pipeline benchmark 覆盖` + - `010b7028` `test(cqrs): 补充通知回退回归覆盖` + - `ae1c3b89` `test(cqrs-benchmarks): 补齐 stream steady-state Mediator 对照` ## 当前风险 -- `NotificationLifetimeBenchmarks` 当前已跑完整默认作业,但还没并入提交;若继续新开 batch,未提交面会明显降低可审查性。 -- `RequestStartup` 的提交 `8990749d` 连带带入了 `CqrsDispatcherCacheTests.cs`;虽然两条切片均有效且已验证通过,但提交边界不再严格对应单个 ownership slice。 -- startup 与 lifetime benchmark 的默认作业结果已足以证明路径与相对量级,但 `Initialization_*` 与少量 short-run 结果仍不应直接当成稳定排序结论。 +- 分支已累积 5 个窄切片提交;若继续在同一 turn 扩 benchmark + docs,reviewability 会明显下降。 +- 新增 benchmark 目前只做了编译验证,尚未执行 `StreamPipelineBenchmarks` 或更新后的 `StreamingBenchmarks` 实际作业。 +- `ef3cfdc4` 的 commit body 含字面 `\n`;若后续要整理历史,需要在显式允许的前提下单独处理提交格式。 ## 最近权威验证 - `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` - 结果:通过,`0 warning / 0 error` -- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release` - - 结果:通过,`0 warning / 0 error` -- `dotnet build GFramework.Core/GFramework.Core.csproj -c Release` - - 结果:通过,`0 warning / 0 error` -- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsRegistrationServiceTests"` - - 结果:通过,`Passed: 4, Failed: 0` -- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --artifacts-suffix pr347-req-scoped --filter "*RequestLifetimeBenchmarks.SendRequest_GFrameworkCqrs*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1` +- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsNotificationPublisherTests"` + - 结果:通过,`Passed: 9, Failed: 0` +- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsRegistrationServiceTests|FullyQualifiedName~CqrsHandlerRegistrarTests|FullyQualifiedName~CqrsHandlerRegistrarFallbackFailureTests|FullyQualifiedName~CqrsNotificationPublisherTests"` + - 结果:通过,`Passed: 36, Failed: 0` +- `python3 scripts/license-header.py --check --paths GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs GFramework.Cqrs.Benchmarks/README.md GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md` - 结果:通过 - - 备注:`Singleton 52.69 ns / 32 B`、`Transient 57.88 ns / 56 B`、`Scoped 144.72 ns / 368 B` -- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --artifacts-suffix pr347-stream-scoped --filter "*StreamLifetimeBenchmarks.Stream_GFramework*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1` +- `git diff --check origin/main...HEAD` - 结果:通过 - - 备注:`Scoped + FirstItem` 约为 `266.7~267.0 ns / 792 B`,`Scoped + DrainAll` 约为 `331.6~332.2 ns / 856 B` ## 下一推荐步骤 -1. 先提交当前未提交的 `NotificationLifetime + registration fallback tests + CQRS/legacy docs` 收尾切片,回收工作树到干净状态。 -2. 再次运行 `$gframework-pr-review`,复核 `PR #348` latest-head open thread 是否已随着本轮多波 head 收敛。 -3. 若继续扩 benchmark,优先从 `GFramework.Cqrs.Benchmarks/README.md` 已明确列出的 gap 中选下一个单文件切片,而不是继续扩大 shared infra 改动面。 +1. 再次运行 `$gframework-pr-review`,复核 `PR #349` latest-head open thread 是否已随着当前修复提交收敛。 +2. 若继续扩 benchmark,优先在 `StreamLifetimeBenchmarks` 或 `StreamStartupBenchmarks` 中补单文件 `Mediator` parity,而不是并行扩多个矩阵。 +3. 若切回文档收尾,把 `GFramework.Cqrs/README.md`、`docs/zh-CN/core/command.md`、`docs/zh-CN/core/query.md` 作为单独一波 docs-only 切片处理。 ## 活跃文档 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index 0afbabfb..3523d2d0 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -5,6 +5,70 @@ SPDX-License-Identifier: Apache-2.0 # CQRS 重写迁移追踪 +## 2026-05-12 + +### 阶段:PR #349 latest-head review 收口(CQRS-REWRITE-RP-136) + +- 重新执行 `$gframework-pr-review`,按 GitHub 当前分支状态确认 `feat/cqrs-optimization` 在 `2026-05-12` 对应的是 `PR #349`,不再沿用 active tracking 中的 `PR #348` 锚点。 +- 本轮 latest-head open AI thread 复核结论: + - `StreamPipelineBenchmarks` 的 `Stream_Baseline`、`Stream_GFrameworkCqrs`、`Stream_MediatR` 缺少 `` XML 契约,接受修复 + - `StreamingBenchmarks.Stream_Mediator` 缺少 `` XML 契约,接受修复 + - `CqrsNotificationPublisherTests` 的 fallback publisher 缓存回归测试用“第二次解析返回另一个 publisher”充当安全网,和断言消息表达不一致,接受收口为“首次后任何再次解析都直接失败” + - active tracking / trace 的当前 PR 锚点与下一步入口仍停留在 `PR #348`,接受同步到 `PR #349` +- 本轮主线程实施: + - `StreamPipelineBenchmarks` + - 为 3 个公开 benchmark 方法补齐 `` XML 文档 + - `StreamingBenchmarks` + - 为 `Stream_Mediator()` 补齐 `` XML 文档 + - `CqrsNotificationPublisherTests` + - 把 fallback publisher 缓存回归测试改为“首次返回空数组,后续任何再次解析立即抛 `AssertionException`”,避免测试安全网与失败消息自相矛盾 + - `ai-plan/public/cqrs-rewrite/**` + - 将 active tracking / trace 的当前 PR 锚点与下一步入口同步到 `PR #349` +- 本轮权威验证: + - `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` + - 结果:通过,`0 warning / 0 error` + - `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsNotificationPublisherTests"` + - 结果:通过,`Passed: 9, Failed: 0` + - `python3 scripts/license-header.py --check --paths GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs GFramework.Cqrs.Benchmarks/Messaging/StreamingBenchmarks.cs GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md` + - 结果:通过 + +### 阶段:多波 batch 继续收口(CQRS-REWRITE-RP-135) + +- 按 `$gframework-batch-boot 50` 恢复当前 topic,并把基线固定为 + `origin/main @ ef4d3d5d (2026-05-11 17:33:43 +0800)`。 +- 启动时确认当前工作树干净,branch diff 为 `0 files / 0 lines`;旧 tracking 中“未提交收尾切片”已不再反映真实仓库状态。 +- 第 1 波 accepted delegated scope: + - `CqrsRegistrationServiceTests` + - 补空输入不触发 registrar、忽略空项后按稳定程序集键排序并去重、跨调用跳过已注册键时继续处理剩余新程序集 + - `CqrsHandlerRegistrarTests` + `CqrsHandlerRegistrarFallbackFailureTests` + - 补 abstract registry 与缺少无参构造器 registry 在程序集级回退路径和 direct activation 入口的告警 / 抛错覆盖 + - `StreamPipelineBenchmarks` + `GFramework.Cqrs.Benchmarks/README.md` + - 新增 `0 / 1 / 4` 个 stream pipeline 行为与 `FirstItem / DrainAll` 观测矩阵 + - README 补齐 stream pipeline coverage、运行示例与 gap 说明 +- 第 2 波 accepted delegated scope: + - `CqrsNotificationPublisherTests` + - 补“容器未注册 publisher 时回退到 `SequentialNotificationPublisher`,且首次解析后缓存结果”回归 + - `StreamingBenchmarks` + `GFramework.Cqrs.Benchmarks/README.md` + - 补 steady-state stream 的 `Mediator` 对照 + - README 将 stream steady-state gap 收口为“lifetime / startup 仍缺 `Mediator` parity” +- 主线程验收与修正: + - 审核 5 个 worker 提交均未越出 ownership 边界 + - 在 `StreamPipelineBenchmarks.cs` 修掉 `git diff --check` 报出的 1 处 trailing whitespace + - 更新 active tracking / trace 到当前 branch 事实,避免下次 `boot` 继续落到过期恢复点 +- 本轮权威验证: + - `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release` + - `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsRegistrationServiceTests|FullyQualifiedName~CqrsHandlerRegistrarTests|FullyQualifiedName~CqrsHandlerRegistrarFallbackFailureTests|FullyQualifiedName~CqrsNotificationPublisherTests"` + - `python3 scripts/license-header.py --check --paths ...` + - `git diff --check origin/main...HEAD` +- 当前停点判断: + - 当前 branch diff 约为 `9 files / 1111 lines` + - 明显低于 `50 files` 阈值 + - 本轮停止信号来自 `context-budget / reviewability`,不是文件预算耗尽 +- 当前下一步: + - 先按需要运行 `$gframework-pr-review`,确认 `PR #349` latest-head open thread 是否已随当前修复提交收敛 + - 若继续扩 benchmark,优先补 `StreamLifetimeBenchmarks` 或 `StreamStartupBenchmarks` 的单文件 `Mediator` parity + - 若切回文档收尾,把 `GFramework.Cqrs/README.md`、`docs/zh-CN/core/command.md`、`docs/zh-CN/core/query.md` 单独作为 docs-only 下一波 + ## 2026-05-11 ### 阶段:PR #348 latest-head review 再收口(CQRS-REWRITE-RP-134)