From 75e7785592ac5600700f28f00411a39583d01d57 Mon Sep 17 00:00:00 2001
From: gewuyou <95328647+GeWuYou@users.noreply.github.com>
Date: Tue, 12 May 2026 08:36:18 +0800
Subject: [PATCH 1/9] =?UTF-8?q?test(cqrs):=20=E8=A1=A5=E5=85=85=E6=B3=A8?=
=?UTF-8?q?=E5=86=8C=E6=9C=8D=E5=8A=A1=E8=BE=B9=E7=95=8C=E6=B5=8B=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 补充空程序集输入时不触发 registrar 的回归测试
- 验证忽略空项后按稳定程序集键排序并去重的注册顺序
- 验证跨调用跳过已注册程序集键时仍继续处理剩余新程序集
---
.../Cqrs/CqrsRegistrationServiceTests.cs | 94 +++++++++++++++++++
1 file changed, 94 insertions(+)
diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs
index 35f15736..82bf3253 100644
--- a/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs
+++ b/GFramework.Cqrs.Tests/Cqrs/CqrsRegistrationServiceTests.cs
@@ -13,6 +13,24 @@ namespace GFramework.Cqrs.Tests.Cqrs;
[TestFixture]
internal sealed class CqrsRegistrationServiceTests
{
+ ///
+ /// 验证空程序集输入不会触发底层注册,也不会产生重复跳过日志。
+ ///
+ [Test]
+ public void RegisterHandlers_Should_Not_Invoke_Registrar_When_Assemblies_Are_Empty()
+ {
+ var logger = new TestLogger("DefaultCqrsRegistrationService", LogLevel.Debug);
+ var registrar = new Mock(MockBehavior.Strict);
+ var service = CqrsRuntimeFactory.CreateRegistrationService(registrar.Object, logger);
+
+ service.RegisterHandlers([]);
+
+ registrar.Verify(
+ static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()),
+ Times.Never);
+ Assert.That(logger.Logs, Has.Count.EqualTo(0));
+ }
+
///
/// 验证同一次调用内出现重复程序集键时,底层注册器只会接收到一次注册请求。
///
@@ -87,6 +105,82 @@ internal sealed class CqrsRegistrationServiceTests
});
}
+ ///
+ /// 验证协调器会忽略空项,并按稳定程序集键排序后仅注册当前调用内的唯一程序集。
+ ///
+ [Test]
+ public void RegisterHandlers_Should_Ignore_Null_Entries_And_Register_Unique_Assemblies_In_Stable_Key_Order()
+ {
+ var logger = new TestLogger("DefaultCqrsRegistrationService", LogLevel.Debug);
+ var registrar = new Mock(MockBehavior.Strict);
+ var assemblyC = CreateAssembly("GFramework.Cqrs.Tests.Sorting.C, Version=1.0.0.0");
+ var assemblyA = CreateAssembly("GFramework.Cqrs.Tests.Sorting.A, Version=1.0.0.0");
+ var duplicateAssemblyA = CreateAssembly("GFramework.Cqrs.Tests.Sorting.A, Version=1.0.0.0");
+ var assemblyB = CreateAssembly("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0");
+ var registeredAssemblies = new List();
+
+ registrar
+ .Setup(static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()))
+ .Callback>(assemblies => registeredAssemblies.AddRange(assemblies));
+
+ var service = CqrsRuntimeFactory.CreateRegistrationService(registrar.Object, logger);
+
+ service.RegisterHandlers([assemblyC.Object, null!, assemblyA.Object, duplicateAssemblyA.Object, assemblyB.Object, null!]);
+
+ registrar.Verify(
+ static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()),
+ Times.Exactly(3));
+ Assert.Multiple(() =>
+ {
+ Assert.That(
+ registeredAssemblies,
+ Is.EqualTo([assemblyA.Object, assemblyB.Object, assemblyC.Object]));
+ Assert.That(logger.Logs, Has.Count.EqualTo(0));
+ });
+ }
+
+ ///
+ /// 验证跨调用遇到已注册程序集键时,协调器会跳过重复项,同时继续按稳定程序集键顺序处理剩余新程序集。
+ ///
+ [Test]
+ public void RegisterHandlers_Should_Skip_Previously_Registered_Keys_And_Keep_Stable_Order_For_Remaining_Assemblies()
+ {
+ var logger = new TestLogger("DefaultCqrsRegistrationService", LogLevel.Debug);
+ var registrar = new Mock(MockBehavior.Strict);
+ var firstAssembly = CreateAssembly("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0");
+ var duplicateAssembly = CreateAssembly("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0");
+ var assemblyC = CreateAssembly("GFramework.Cqrs.Tests.Sorting.C, Version=1.0.0.0");
+ var assemblyA = CreateAssembly("GFramework.Cqrs.Tests.Sorting.A, Version=1.0.0.0");
+ var registeredAssemblies = new List();
+
+ registrar
+ .Setup(static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()))
+ .Callback>(assemblies => registeredAssemblies.AddRange(assemblies));
+
+ var service = CqrsRuntimeFactory.CreateRegistrationService(registrar.Object, logger);
+
+ service.RegisterHandlers([firstAssembly.Object]);
+ service.RegisterHandlers([assemblyC.Object, duplicateAssembly.Object, assemblyA.Object]);
+
+ registrar.Verify(
+ static currentRegistrar => currentRegistrar.RegisterHandlers(It.IsAny>()),
+ Times.Exactly(3));
+ Assert.Multiple(() =>
+ {
+ Assert.That(
+ registeredAssemblies,
+ Is.EqualTo([firstAssembly.Object, assemblyA.Object, assemblyC.Object]));
+ var debugMessages = logger.Logs
+ .Where(static log => log.Level == LogLevel.Debug)
+ .Select(static log => log.Message)
+ .ToArray();
+ Assert.That(debugMessages, Has.Length.EqualTo(1));
+ Assert.That(
+ debugMessages[0],
+ Does.Contain("GFramework.Cqrs.Tests.Sorting.B, Version=1.0.0.0"));
+ });
+ }
+
///
/// 验证当 缺失时,协调器会退化到 作为稳定程序集键。
///
From 5e9b903d0f468acb7f7fc5bafa9785a6afc7b05a Mon Sep 17 00:00:00 2001
From: gewuyou <95328647+GeWuYou@users.noreply.github.com>
Date: Tue, 12 May 2026 08:39:00 +0800
Subject: [PATCH 2/9] =?UTF-8?q?test(cqrs):=20=E8=A1=A5=E5=85=85=20registra?=
=?UTF-8?q?r=20=E6=BF=80=E6=B4=BB=E5=A4=B1=E8=B4=A5=E5=88=86=E6=94=AF?=
=?UTF-8?q?=E6=B5=8B=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 补充 generated registry 为抽象类型时的回退与抛错覆盖
- 补充 generated registry 缺少无参构造器时的回退与抛错覆盖
---
...qrsHandlerRegistrarFallbackFailureTests.cs | 131 ++++++++++++++++++
.../Cqrs/CqrsHandlerRegistrarTests.cs | 120 ++++++++++++++++
2 files changed, 251 insertions(+)
diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs
index ebfa6659..ffbe5196 100644
--- a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs
+++ b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarFallbackFailureTests.cs
@@ -138,6 +138,68 @@ internal sealed class CqrsHandlerRegistrarFallbackFailureTests
});
}
+ ///
+ /// 验证当 generated registry 是抽象类型时,registrar 会记录告警并回退到反射扫描。
+ ///
+ [Test]
+ public void RegisterHandlers_Should_Fall_Back_To_Reflection_When_Generated_Registry_Is_Abstract()
+ {
+ var generatedAssembly = CreateGeneratedRegistryAssembly(
+ "GFramework.Cqrs.Tests.Cqrs.AbstractGeneratedRegistryAssembly, Version=1.0.0.0",
+ typeof(AbstractGeneratedNotificationHandlerRegistry));
+ generatedAssembly
+ .Setup(static assembly => assembly.GetTypes())
+ .Returns([typeof(GeneratedRegistryNotificationHandler)]);
+
+ var container = new MicrosoftDiContainer();
+ CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(
+ GetGeneratedRegistryNotificationHandlerTypes(container),
+ Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)]));
+ Assert.That(
+ GetWarningLogs().Any(log =>
+ log.Message.Contains("because it is abstract", StringComparison.Ordinal)),
+ Is.True);
+ });
+
+ generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Once);
+ }
+
+ ///
+ /// 验证当 generated registry 不暴露可访问无参构造器时,registrar 会记录告警并回退到反射扫描。
+ ///
+ [Test]
+ public void RegisterHandlers_Should_Fall_Back_To_Reflection_When_Generated_Registry_Has_No_Parameterless_Constructor()
+ {
+ var generatedAssembly = CreateGeneratedRegistryAssembly(
+ "GFramework.Cqrs.Tests.Cqrs.NoParameterlessGeneratedRegistryAssembly, Version=1.0.0.0",
+ typeof(ConstructorArgumentNotificationHandlerRegistry));
+ generatedAssembly
+ .Setup(static assembly => assembly.GetTypes())
+ .Returns([typeof(GeneratedRegistryNotificationHandler)]);
+
+ var container = new MicrosoftDiContainer();
+ CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(
+ GetGeneratedRegistryNotificationHandlerTypes(container),
+ Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)]));
+ Assert.That(
+ GetWarningLogs().Any(log =>
+ log.Message.Contains(
+ "does not expose an accessible parameterless constructor",
+ StringComparison.Ordinal)),
+ Is.True);
+ });
+
+ generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Once);
+ }
+
///
/// 创建一个仅通过 generated registry 注册主 handler、并附带指定 fallback 元数据的程序集替身。
///
@@ -161,6 +223,24 @@ internal sealed class CqrsHandlerRegistrarFallbackFailureTests
return generatedAssembly;
}
+ ///
+ /// 创建一个只声明 generated registry attribute 的程序集替身,用于验证 registry 激活失败后的回退行为。
+ ///
+ /// 用于日志与缓存键的程序集名。
+ /// 要暴露给 registrar 的 generated registry 类型。
+ /// 已完成基础接线的程序集 mock。
+ private static Mock CreateGeneratedRegistryAssembly(string assemblyName, Type registryType)
+ {
+ var generatedAssembly = new Mock();
+ generatedAssembly
+ .SetupGet(static assembly => assembly.FullName)
+ .Returns(assemblyName);
+ generatedAssembly
+ .Setup(static assembly => assembly.GetCustomAttributes(typeof(CqrsHandlerRegistryAttribute), false))
+ .Returns([new CqrsHandlerRegistryAttribute(registryType)]);
+ return generatedAssembly;
+ }
+
///
/// 提取容器中针对 generated notification 注册的处理器实现类型。
///
@@ -259,4 +339,55 @@ internal sealed class CqrsHandlerRegistrarFallbackFailureTests
.Where(static log => log.Level == LogLevel.Warning)
.ToArray();
}
+
+ ///
+ /// 模拟 generated registry 被错误声明为抽象类型时的激活失败场景。
+ ///
+ private abstract class AbstractGeneratedNotificationHandlerRegistry : ICqrsHandlerRegistry
+ {
+ ///
+ /// 抽象 registry 即便具备注册逻辑,也不应被运行时实例化。
+ ///
+ /// 承载处理器映射的服务集合。
+ /// 记录注册诊断的日志器。
+ public void Register(IServiceCollection services, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ services.AddTransient(
+ typeof(INotificationHandler),
+ typeof(GeneratedRegistryNotificationHandler));
+ }
+ }
+
+ ///
+ /// 模拟 generated registry 缺少可访问无参构造器时的激活失败场景。
+ ///
+ private sealed class ConstructorArgumentNotificationHandlerRegistry : ICqrsHandlerRegistry
+ {
+ ///
+ /// 初始化一个只能通过额外参数构造的测试 registry。
+ ///
+ /// 用于区分测试场景的占位参数。
+ public ConstructorArgumentNotificationHandlerRegistry(string marker)
+ {
+ ArgumentNullException.ThrowIfNull(marker);
+ }
+
+ ///
+ /// 此实现仅用于满足接口契约;本用例关注的是实例化失败前的回退行为。
+ ///
+ /// 承载处理器映射的服务集合。
+ /// 记录注册诊断的日志器。
+ public void Register(IServiceCollection services, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ services.AddTransient(
+ typeof(INotificationHandler),
+ typeof(GeneratedRegistryNotificationHandler));
+ }
+ }
}
diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs
index 967fc6a8..4fa6b124 100644
--- a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs
+++ b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs
@@ -6,6 +6,7 @@ using GFramework.Core.Architectures;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
+using GFramework.Cqrs.Internal;
using GFramework.Cqrs.Tests.Logging;
namespace GFramework.Cqrs.Tests.Cqrs;
@@ -170,6 +171,74 @@ internal sealed class CqrsHandlerRegistrarTests
Is.EqualTo([typeof(GeneratedRegistryNotificationHandler)]));
}
+ ///
+ /// 验证 direct generated-registry 激活入口在 registry 为抽象类型时会抛出异常,并保留契约告警。
+ ///
+ [Test]
+ public void RegisterGeneratedRegistry_Should_Throw_When_Generated_Registry_Is_Abstract()
+ {
+ var capturingProvider = new CapturingLoggerFactoryProvider(LogLevel.Warning);
+ var logger = capturingProvider.CreateLogger(nameof(CqrsHandlerRegistrarTests));
+ var container = new MicrosoftDiContainer();
+
+ var exception = Assert.Throws(() =>
+ CqrsHandlerRegistrar.RegisterGeneratedRegistry(
+ container,
+ typeof(AbstractGeneratedNotificationHandlerRegistry),
+ logger));
+
+ var warningLogs = capturingProvider.Loggers
+ .SelectMany(static createdLogger => createdLogger.Logs)
+ .Where(static log => log.Level == LogLevel.Warning)
+ .ToArray();
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(exception, Is.Not.Null);
+ Assert.That(exception!.Message, Does.Contain(typeof(AbstractGeneratedNotificationHandlerRegistry).FullName));
+ Assert.That(
+ warningLogs.Any(log =>
+ log.Message.Contains("because it is abstract", StringComparison.Ordinal)),
+ Is.True);
+ Assert.That(container.GetServicesUnsafe, Is.Empty);
+ });
+ }
+
+ ///
+ /// 验证 direct generated-registry 激活入口在 registry 缺少无参构造器时会抛出异常,并保留契约告警。
+ ///
+ [Test]
+ public void RegisterGeneratedRegistry_Should_Throw_When_Generated_Registry_Has_No_Parameterless_Constructor()
+ {
+ var capturingProvider = new CapturingLoggerFactoryProvider(LogLevel.Warning);
+ var logger = capturingProvider.CreateLogger(nameof(CqrsHandlerRegistrarTests));
+ var container = new MicrosoftDiContainer();
+
+ var exception = Assert.Throws(() =>
+ CqrsHandlerRegistrar.RegisterGeneratedRegistry(
+ container,
+ typeof(ConstructorArgumentNotificationHandlerRegistry),
+ logger));
+
+ var warningLogs = capturingProvider.Loggers
+ .SelectMany(static createdLogger => createdLogger.Logs)
+ .Where(static log => log.Level == LogLevel.Warning)
+ .ToArray();
+
+ Assert.Multiple(() =>
+ {
+ Assert.That(exception, Is.Not.Null);
+ Assert.That(exception!.Message, Does.Contain(typeof(ConstructorArgumentNotificationHandlerRegistry).FullName));
+ Assert.That(
+ warningLogs.Any(log =>
+ log.Message.Contains(
+ "does not expose an accessible parameterless constructor",
+ StringComparison.Ordinal)),
+ Is.True);
+ Assert.That(container.GetServicesUnsafe, Is.Empty);
+ });
+ }
+
///
/// 验证当生成注册器元数据损坏时,运行时会记录告警并回退到反射扫描路径。
///
@@ -695,4 +764,55 @@ internal sealed class CqrsHandlerRegistrarTests
return typeof(CqrsReflectionFallbackAttribute).Assembly
.GetType("GFramework.Cqrs.Internal.CqrsHandlerRegistrar", throwOnError: true)!;
}
+
+ ///
+ /// 模拟被错误声明为抽象类型的 generated registry。
+ ///
+ private abstract class AbstractGeneratedNotificationHandlerRegistry : ICqrsHandlerRegistry
+ {
+ ///
+ /// 抽象 registry 即便具备注册逻辑,也不应被 direct 激活入口实例化。
+ ///
+ /// 承载处理器映射的服务集合。
+ /// 记录注册诊断的日志器。
+ public void Register(IServiceCollection services, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ services.AddTransient(
+ typeof(INotificationHandler),
+ typeof(GeneratedRegistryNotificationHandler));
+ }
+ }
+
+ ///
+ /// 模拟缺少无参构造器的 generated registry。
+ ///
+ private sealed class ConstructorArgumentNotificationHandlerRegistry : ICqrsHandlerRegistry
+ {
+ ///
+ /// 初始化一个只能通过额外参数构造的测试 registry。
+ ///
+ /// 用于区分测试场景的占位参数。
+ public ConstructorArgumentNotificationHandlerRegistry(string marker)
+ {
+ ArgumentNullException.ThrowIfNull(marker);
+ }
+
+ ///
+ /// 此实现仅用于满足接口契约;本用例关注的是构造阶段失败后的异常语义。
+ ///
+ /// 承载处理器映射的服务集合。
+ /// 记录注册诊断的日志器。
+ public void Register(IServiceCollection services, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ services.AddTransient(
+ typeof(INotificationHandler),
+ typeof(GeneratedRegistryNotificationHandler));
+ }
+ }
}
From ee8d65279e0f99dc29830d5d3c4a5cfc8dd413f2 Mon Sep 17 00:00:00 2001
From: gewuyou <95328647+GeWuYou@users.noreply.github.com>
Date: Tue, 12 May 2026 08:41:20 +0800
Subject: [PATCH 3/9] =?UTF-8?q?test(cqrs-benchmarks):=20=E6=96=B0=E5=A2=9E?=
=?UTF-8?q?=20stream=20pipeline=20benchmark=20=E8=A6=86=E7=9B=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增 StreamPipelineBenchmarks,覆盖 0/1/4 个 stream pipeline 行为与 FirstItem/DrainAll 观测矩阵
- 内联 GeneratedStreamPipelineBenchmarkRegistry,保持默认 generated-provider benchmark 宿主接线口径
- 更新 benchmark README,补齐 stream pipeline coverage、运行示例与缺口说明
---
.../Messaging/StreamPipelineBenchmarks.cs | 524 ++++++++++++++++++
GFramework.Cqrs.Benchmarks/README.md | 7 +-
2 files changed, 529 insertions(+), 2 deletions(-)
create mode 100644 GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs
diff --git a/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs b/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs
new file mode 100644
index 00000000..db62d513
--- /dev/null
+++ b/GFramework.Cqrs.Benchmarks/Messaging/StreamPipelineBenchmarks.cs
@@ -0,0 +1,524 @@
+// Copyright (c) 2025-2026 GeWuYou
+// SPDX-License-Identifier: Apache-2.0
+
+using BenchmarkDotNet.Attributes;
+using BenchmarkDotNet.Columns;
+using BenchmarkDotNet.Configs;
+using BenchmarkDotNet.Diagnosers;
+using BenchmarkDotNet.Jobs;
+using BenchmarkDotNet.Order;
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using GFramework.Core.Abstractions.Logging;
+using GFramework.Core.Ioc;
+using GFramework.Core.Logging;
+using GFramework.Cqrs.Abstractions.Cqrs;
+using MediatR;
+using Microsoft.Extensions.DependencyInjection;
+
+[assembly: GFramework.Cqrs.CqrsHandlerRegistryAttribute(
+ typeof(GFramework.Cqrs.Benchmarks.Messaging.StreamPipelineBenchmarks.GeneratedStreamPipelineBenchmarkRegistry))]
+
+namespace GFramework.Cqrs.Benchmarks.Messaging;
+
+///
+/// 对比不同 stream pipeline 行为数量下,单个 stream request 在直接调用、GFramework.CQRS runtime 与 MediatR 之间的 steady-state dispatch 开销。
+///
+///
+/// 当前矩阵同时覆盖 0 / 1 / 4 个 stream pipeline 行为,以及
+/// 与 两种观测口径,
+/// 以便把建流固定成本与完整枚举成本拆开观察。
+///
+[Config(typeof(Config))]
+public class StreamPipelineBenchmarks
+{
+ private MicrosoftDiContainer _container = null!;
+ private ICqrsRuntime _runtime = null!;
+ private ServiceProvider _serviceProvider = null!;
+ private IMediator _mediatr = null!;
+ private BenchmarkStreamHandler _baselineHandler = null!;
+ private BenchmarkStreamRequest _request = null!;
+
+ ///
+ /// 控制当前场景注册的 stream pipeline 行为数量,保持与 request pipeline benchmark 相同的 0 / 1 / 4 矩阵。
+ ///
+ [Params(0, 1, 4)]
+ public int PipelineCount { get; set; }
+
+ ///
+ /// 控制当前 benchmark 观察“只推进首个元素”还是“完整枚举整个 stream”。
+ ///
+ [Params(StreamObservation.FirstItem, StreamObservation.DrainAll)]
+ public StreamObservation Observation { get; set; }
+
+ ///
+ /// 用于拆分 stream dispatch 固定成本与后续枚举成本的观测模式。
+ ///
+ public enum StreamObservation
+ {
+ ///
+ /// 只推进到首个元素后立即释放枚举器。
+ ///
+ FirstItem,
+
+ ///
+ /// 完整枚举整个 stream,保留原有 benchmark 语义。
+ ///
+ DrainAll
+ }
+
+ ///
+ /// 配置 stream pipeline benchmark 的公共输出格式。
+ ///
+ private sealed class Config : ManualConfig
+ {
+ public Config()
+ {
+ AddJob(Job.Default);
+ AddColumnProvider(DefaultColumnProviders.Instance);
+ AddColumn(new CustomColumn("Scenario", static (_, _) => "StreamPipeline"));
+ AddDiagnoser(MemoryDiagnoser.Default);
+ WithOrderer(new DefaultOrderer(SummaryOrderPolicy.FastestToSlowest, MethodOrderPolicy.Declared));
+ }
+ }
+
+ ///
+ /// 构建 stream pipeline dispatch 所需的最小 runtime 宿主和对照对象。
+ ///
+ [GlobalSetup]
+ public void Setup()
+ {
+ LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider
+ {
+ MinLevel = LogLevel.Fatal
+ };
+ Fixture.Setup("StreamPipeline", handlerCount: 1, pipelineCount: PipelineCount);
+ BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
+
+ _baselineHandler = new BenchmarkStreamHandler();
+ _container = BenchmarkHostFactory.CreateFrozenGFrameworkContainer(container =>
+ {
+ BenchmarkHostFactory.RegisterGeneratedBenchmarkRegistry(container);
+ RegisterGFrameworkPipelineBehaviors(container, PipelineCount);
+ });
+ _runtime = GFramework.Cqrs.CqrsRuntimeFactory.CreateRuntime(
+ _container,
+ LoggerFactoryResolver.Provider.CreateLogger(nameof(StreamPipelineBenchmarks)));
+
+ _serviceProvider = BenchmarkHostFactory.CreateMediatRServiceProvider(
+ services =>
+ {
+ RegisterMediatRStreamPipelineBehaviors(services, PipelineCount);
+ },
+ typeof(StreamPipelineBenchmarks),
+ static candidateType =>
+ candidateType == typeof(BenchmarkStreamHandler) ||
+ candidateType == typeof(BenchmarkStreamPipelineBehavior1) ||
+ candidateType == typeof(BenchmarkStreamPipelineBehavior2) ||
+ candidateType == typeof(BenchmarkStreamPipelineBehavior3) ||
+ candidateType == typeof(BenchmarkStreamPipelineBehavior4),
+ ServiceLifetime.Singleton);
+ _mediatr = _serviceProvider.GetRequiredService();
+
+ _request = new BenchmarkStreamRequest(Guid.NewGuid(), 3);
+ }
+
+ ///
+ /// 释放 MediatR 对照组使用的 DI 宿主。
+ ///
+ [GlobalCleanup]
+ public void Cleanup()
+ {
+ try
+ {
+ BenchmarkCleanupHelper.DisposeAll(_container, _serviceProvider);
+ }
+ finally
+ {
+ BenchmarkDispatcherCacheHelper.ClearDispatcherCaches();
+ }
+ }
+
+ ///
+ /// 直接调用 handler,并按当前观测模式消费响应序列,作为 stream pipeline 编排之外的基线。
+ ///
+ [Benchmark(Baseline = true)]
+ public ValueTask Stream_Baseline()
+ {
+ return ObserveAsync(_baselineHandler.Handle(_request, CancellationToken.None), Observation);
+ }
+
+ ///
+ /// 通过 GFramework.CQRS runtime 创建 stream,并按当前矩阵配置执行 stream pipeline。
+ ///
+ [Benchmark]
+ public ValueTask Stream_GFrameworkCqrs()
+ {
+ return ObserveAsync(
+ _runtime.CreateStream(
+ BenchmarkContext.Instance,
+ _request,
+ CancellationToken.None),
+ Observation);
+ }
+
+ ///
+ /// 通过 MediatR 创建 stream,并按当前矩阵配置执行 stream pipeline,作为外部设计对照。
+ ///
+ [Benchmark]
+ public ValueTask Stream_MediatR()
+ {
+ return ObserveAsync(_mediatr.CreateStream(_request, CancellationToken.None), Observation);
+ }
+
+ ///
+ /// 按指定数量向 GFramework.CQRS 宿主注册最小 no-op stream pipeline 行为。
+ ///
+ /// 当前 benchmark 使用的容器。
+ /// 要注册的行为数量。
+ /// 行为数量不在支持的矩阵内时抛出。
+ private static void RegisterGFrameworkPipelineBehaviors(MicrosoftDiContainer container, int pipelineCount)
+ {
+ ArgumentNullException.ThrowIfNull(container);
+
+ switch (pipelineCount)
+ {
+ case 0:
+ return;
+ case 1:
+ container.RegisterCqrsStreamPipelineBehavior();
+ return;
+ case 4:
+ container.RegisterCqrsStreamPipelineBehavior();
+ container.RegisterCqrsStreamPipelineBehavior();
+ container.RegisterCqrsStreamPipelineBehavior();
+ container.RegisterCqrsStreamPipelineBehavior();
+ return;
+ default:
+ throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount,
+ "Only the 0/1/4 pipeline matrix is supported.");
+ }
+ }
+
+ ///
+ /// 按指定数量向 MediatR 宿主注册最小 no-op stream pipeline 行为。
+ ///
+ /// 当前 benchmark 使用的服务集合。
+ /// 要注册的行为数量。
+ /// 行为数量不在支持的矩阵内时抛出。
+ private static void RegisterMediatRStreamPipelineBehaviors(IServiceCollection services, int pipelineCount)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+
+ switch (pipelineCount)
+ {
+ case 0:
+ return;
+ case 1:
+ services.AddSingleton, BenchmarkStreamPipelineBehavior1>();
+ return;
+ case 4:
+ services.AddSingleton, BenchmarkStreamPipelineBehavior1>();
+ services.AddSingleton, BenchmarkStreamPipelineBehavior2>();
+ services.AddSingleton, BenchmarkStreamPipelineBehavior3>();
+ services.AddSingleton, BenchmarkStreamPipelineBehavior4>();
+ return;
+ default:
+ throw new ArgumentOutOfRangeException(nameof(pipelineCount), pipelineCount,
+ "Only the 0/1/4 pipeline matrix is supported.");
+ }
+ }
+
+ ///
+ /// 按观测模式消费 stream,便于把“建流/首个元素”和“完整枚举”分开观察。
+ ///
+ /// 当前 stream 的响应类型。
+ /// 待观察的异步响应序列。
+ /// 当前 benchmark 选定的观测模式。
+ /// 异步消费完成后的等待句柄。
+ private static ValueTask ObserveAsync(
+ IAsyncEnumerable responses,
+ StreamObservation observation)
+ {
+ ArgumentNullException.ThrowIfNull(responses);
+
+ return observation switch
+ {
+ StreamObservation.FirstItem => ConsumeFirstItemAsync(responses, CancellationToken.None),
+ StreamObservation.DrainAll => DrainAsync(responses),
+ _ => throw new ArgumentOutOfRangeException(
+ nameof(observation),
+ observation,
+ "Unsupported stream observation mode.")
+ };
+ }
+
+ ///
+ /// 只推进到首个元素后立即释放枚举器,用来近似隔离建流与首个 MoveNextAsync 的固定成本。
+ ///
+ /// 当前 stream 的响应类型。
+ /// 待观察的异步响应序列。
+ /// 用于向异步枚举器传播取消的令牌。
+ /// 消费首个元素后的等待句柄。
+ private static async ValueTask ConsumeFirstItemAsync(
+ IAsyncEnumerable responses,
+ CancellationToken cancellationToken)
+ {
+ var enumerator = responses.GetAsyncEnumerator(cancellationToken);
+ await using (enumerator.ConfigureAwait(false))
+ {
+ if (await enumerator.MoveNextAsync().ConfigureAwait(false))
+ {
+ _ = enumerator.Current;
+ }
+ }
+ }
+
+ ///
+ /// 完整枚举整个 stream,保留原 benchmark 的总成本观测口径。
+ ///
+ /// 当前 stream 的响应类型。
+ /// 待完整枚举的异步响应序列。
+ /// 完整枚举结束后的等待句柄。
+ private static async ValueTask DrainAsync(IAsyncEnumerable responses)
+ {
+ await foreach (var response in responses.ConfigureAwait(false))
+ {
+ _ = response;
+ }
+ }
+
+ ///
+ /// Benchmark stream request。
+ ///
+ /// 请求标识。
+ /// 返回元素数量。
+ public sealed record BenchmarkStreamRequest(Guid Id, int ItemCount) :
+ GFramework.Cqrs.Abstractions.Cqrs.IStreamRequest,
+ MediatR.IStreamRequest;
+
+ ///
+ /// 复用 stream benchmark 的响应结构,保持跨场景可比性。
+ ///
+ /// 响应标识。
+ public sealed record BenchmarkResponse(Guid Id);
+
+ ///
+ /// 同时实现 GFramework.CQRS 与 MediatR 契约的最小 stream handler。
+ ///
+ public sealed class BenchmarkStreamHandler :
+ GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler,
+ MediatR.IStreamRequestHandler
+ {
+ ///
+ /// 处理 GFramework.CQRS stream request。
+ ///
+ /// 当前 benchmark stream 请求。
+ /// 用于中断异步枚举的取消令牌。
+ /// 低噪声、可重复的异步响应序列。
+ public IAsyncEnumerable Handle(
+ BenchmarkStreamRequest request,
+ CancellationToken cancellationToken)
+ {
+ return EnumerateAsync(request, cancellationToken);
+ }
+
+ ///
+ /// 处理 MediatR stream request。
+ ///
+ /// 当前 benchmark stream 请求。
+ /// 用于中断异步枚举的取消令牌。
+ /// 低噪声、可重复的异步响应序列。
+ IAsyncEnumerable MediatR.IStreamRequestHandler.Handle(
+ BenchmarkStreamRequest request,
+ CancellationToken cancellationToken)
+ {
+ return EnumerateAsync(request, cancellationToken);
+ }
+
+ ///
+ /// 为 benchmark 构造稳定、低噪声的异步响应序列。
+ ///
+ /// 决定元素数量和标识的 benchmark 请求。
+ /// 用于中断异步枚举的取消令牌。
+ /// 按请求数量生成的响应序列。
+ private static async IAsyncEnumerable EnumerateAsync(
+ BenchmarkStreamRequest request,
+ [EnumeratorCancellation] CancellationToken cancellationToken)
+ {
+ for (int index = 0; index < request.ItemCount; index++)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ yield return new BenchmarkResponse(request.Id);
+ await Task.CompletedTask.ConfigureAwait(false);
+ }
+ }
+ }
+
+ ///
+ /// 为 benchmark 提供统一的 no-op stream pipeline 行为实现,尽量把测量焦点保持在调度器与行为编排本身。
+ ///
+ public abstract class BenchmarkStreamPipelineBehaviorBase :
+ GFramework.Cqrs.Abstractions.Cqrs.IStreamPipelineBehavior,
+ MediatR.IStreamPipelineBehavior
+ {
+ ///
+ /// 透传 GFramework.CQRS stream pipeline,避免引入额外业务逻辑噪音。
+ ///
+ /// 当前 benchmark stream 请求。
+ /// 继续向下执行的 stream pipeline 委托。
+ /// 取消令牌。
+ /// 下游 handler 产出的异步响应序列。
+ public IAsyncEnumerable Handle(
+ BenchmarkStreamRequest message,
+ GFramework.Cqrs.Abstractions.Cqrs.StreamMessageHandlerDelegate next,
+ CancellationToken cancellationToken)
+ {
+ return next(message, cancellationToken);
+ }
+
+ ///
+ /// 透传 MediatR stream pipeline,保持与 GFramework.CQRS 相同的 no-op 语义。
+ ///
+ /// 当前 benchmark stream 请求。
+ /// 继续向下执行的 MediatR stream pipeline 委托。
+ /// 取消令牌。
+ /// 下游 handler 产出的异步响应序列。
+ IAsyncEnumerable MediatR.IStreamPipelineBehavior.Handle(
+ BenchmarkStreamRequest request,
+ MediatR.StreamHandlerDelegate next,
+ CancellationToken cancellationToken)
+ {
+ _ = request;
+ _ = cancellationToken;
+ return next();
+ }
+ }
+
+ ///
+ /// pipeline 矩阵中的第一个 no-op stream 行为。
+ ///
+ public sealed class BenchmarkStreamPipelineBehavior1 : BenchmarkStreamPipelineBehaviorBase
+ {
+ }
+
+ ///
+ /// pipeline 矩阵中的第二个 no-op stream 行为。
+ ///
+ public sealed class BenchmarkStreamPipelineBehavior2 : BenchmarkStreamPipelineBehaviorBase
+ {
+ }
+
+ ///
+ /// pipeline 矩阵中的第三个 no-op stream 行为。
+ ///
+ public sealed class BenchmarkStreamPipelineBehavior3 : BenchmarkStreamPipelineBehaviorBase
+ {
+ }
+
+ ///
+ /// pipeline 矩阵中的第四个 no-op stream 行为。
+ ///
+ public sealed class BenchmarkStreamPipelineBehavior4 : BenchmarkStreamPipelineBehaviorBase
+ {
+ }
+
+ ///
+ /// 为 stream pipeline benchmark 提供 handwritten generated registry,
+ /// 让默认 pipeline 宿主也能走真实的 generated stream invoker provider 接线路径。
+ ///
+ public sealed class GeneratedStreamPipelineBenchmarkRegistry :
+ GFramework.Cqrs.ICqrsHandlerRegistry,
+ GFramework.Cqrs.ICqrsStreamInvokerProvider,
+ GFramework.Cqrs.IEnumeratesCqrsStreamInvokerDescriptors
+ {
+ private static readonly GFramework.Cqrs.CqrsStreamInvokerDescriptor Descriptor =
+ new(
+ typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<
+ BenchmarkStreamRequest,
+ BenchmarkResponse>),
+ typeof(GeneratedStreamPipelineBenchmarkRegistry).GetMethod(
+ nameof(InvokeBenchmarkStreamHandler),
+ BindingFlags.Public | BindingFlags.Static)
+ ?? throw new InvalidOperationException("Missing generated stream pipeline benchmark method."));
+
+ private static readonly IReadOnlyList Descriptors =
+ [
+ new GFramework.Cqrs.CqrsStreamInvokerDescriptorEntry(
+ typeof(BenchmarkStreamRequest),
+ typeof(BenchmarkResponse),
+ Descriptor)
+ ];
+
+ ///
+ /// 把 stream pipeline benchmark handler 注册为单例,保持与当前矩阵宿主一致的生命周期语义。
+ ///
+ /// 用于承载 generated handler 注册的服务集合。
+ /// 记录 generated registry 接线结果的日志器。
+ public void Register(IServiceCollection services, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ services.AddSingleton(
+ typeof(GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler),
+ typeof(BenchmarkStreamHandler));
+ logger.Debug("Registered generated stream pipeline benchmark handler.");
+ }
+
+ ///
+ /// 返回当前 provider 暴露的全部 generated stream invoker 描述符。
+ ///
+ /// 当前 benchmark 的 generated stream invoker 描述符集合。
+ public IReadOnlyList GetDescriptors()
+ {
+ return Descriptors;
+ }
+
+ ///
+ /// 为目标流式请求/响应类型对返回 generated stream invoker 描述符。
+ ///
+ /// 要匹配的 stream 请求类型。
+ /// 要匹配的 stream 响应类型。
+ /// 命中时返回的 generated stream invoker 描述符。
+ /// 是否命中了当前 benchmark 的 stream 请求/响应类型对。
+ public bool TryGetDescriptor(
+ Type requestType,
+ Type responseType,
+ out GFramework.Cqrs.CqrsStreamInvokerDescriptor? descriptor)
+ {
+ if (requestType == typeof(BenchmarkStreamRequest) &&
+ responseType == typeof(BenchmarkResponse))
+ {
+ descriptor = Descriptor;
+ return true;
+ }
+
+ descriptor = null;
+ return false;
+ }
+
+ ///
+ /// 模拟 generated stream invoker provider 为 stream pipeline benchmark 产出的开放静态调用入口。
+ ///
+ /// 当前要调用的 stream handler 实例。
+ /// 当前要分发的 stream 请求实例。
+ /// 用于向 handler 传播的取消令牌。
+ /// handler 产出的异步响应序列。
+ public static object InvokeBenchmarkStreamHandler(
+ object handler,
+ object request,
+ CancellationToken cancellationToken)
+ {
+ var typedHandler = (GFramework.Cqrs.Abstractions.Cqrs.IStreamRequestHandler<
+ BenchmarkStreamRequest,
+ BenchmarkResponse>)handler;
+ var typedRequest = (BenchmarkStreamRequest)request;
+ return typedHandler.Handle(typedRequest, cancellationToken);
+ }
+ }
+}
diff --git a/GFramework.Cqrs.Benchmarks/README.md b/GFramework.Cqrs.Benchmarks/README.md
index 69038b45..9466c9bd 100644
--- a/GFramework.Cqrs.Benchmarks/README.md
+++ b/GFramework.Cqrs.Benchmarks/README.md
@@ -34,6 +34,9 @@
- `Messaging/StreamInvokerBenchmarks.cs`
- baseline、`GFramework.Cqrs` reflection stream binding、`GFramework.Cqrs` generated stream invoker、`MediatR`
- 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径
+ - `Messaging/StreamPipelineBenchmarks.cs`
+ - `0 / 1 / 4` 个 stream pipeline 行为下,baseline、默认 generated-provider 宿主接线的 `GFramework.Cqrs` runtime 与 `MediatR`
+ - 同时提供 `FirstItem` 与 `DrainAll` 两种观测口径
- stream startup
- `Messaging/StreamStartupBenchmarks.cs`
- `Initialization` 与 `ColdStart` 两组下,`GFramework.Cqrs` reflection、`GFramework.Cqrs` generated、`MediatR`
@@ -68,6 +71,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro
```bash
dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestLifetimeBenchmarks.SendRequest_*"
dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamLifetimeBenchmarks.Stream_*"
+dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*StreamPipelineBenchmarks.Stream_*"
```
## 并发运行约束
@@ -87,7 +91,7 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro
- `RequestLifetimeBenchmarks` 的 `Scoped` 场景会在每次 request 分发时显式创建并释放真实 DI 作用域;它观察的是 scoped handler 的解析与 dispatch 成本,不把 runtime 构造常量成本混入生命周期对照
- `NotificationLifetimeBenchmarks` 的 `Scoped` 场景也采用真实 DI 作用域;它比较的是 publish 路径上的生命周期额外开销,不是根容器解析退化后的近似值
-- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll`
+- `StreamingBenchmarks`、`StreamLifetimeBenchmarks`、`StreamInvokerBenchmarks`、`StreamPipelineBenchmarks` 同时暴露 `FirstItem` 与 `DrainAll`
- `FirstItem` 适合观察“建流到首个元素”的固定成本
- `DrainAll` 适合观察完整枚举整个 stream 的总成本
- `StreamStartupBenchmarks` 的 `ColdStart` 只推进到首个元素,因此它回答的是“新宿主下首次建流命中”的边界,不回答完整枚举总成本
@@ -99,4 +103,3 @@ dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.cspro
- 当前没有 stream 版的 NuGet `Mediator` source-generated concrete path 对照;stream steady-state、lifetime、startup 现在都只覆盖 `GFramework.Cqrs` 与 `MediatR`
- 当前没有 request 生命周期下的 NuGet `Mediator` compile-time lifetime 矩阵;`RequestLifetimeBenchmarks` 只覆盖 `GFramework.Cqrs` 与 `MediatR`
- 当前没有 notification fan-out 的生命周期矩阵;`NotificationFanOutBenchmarks` 只覆盖固定 `4 handler` 的已装配宿主
-- 当前没有 stream pipeline benchmark;现有 pipeline coverage 仅限 request
From 4cc7060fcf423b6b74c1c4c574f73f8228b649e1 Mon Sep 17 00:00:00 2001
From: gewuyou <95328647+GeWuYou@users.noreply.github.com>
Date: Tue, 12 May 2026 08:46:48 +0800
Subject: [PATCH 4/9] =?UTF-8?q?test(cqrs):=20=E8=A1=A5=E5=85=85=E9=80=9A?=
=?UTF-8?q?=E7=9F=A5=E5=9B=9E=E9=80=80=E5=9B=9E=E5=BD=92=E8=A6=86=E7=9B=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增默认 notification publisher fallback 与缓存行为的回归测试
- 验证容器无 publisher 时继续使用顺序发布语义且不重复查容器
---
.../Cqrs/CqrsNotificationPublisherTests.cs | 40 +++++++++++++++++++
1 file changed, 40 insertions(+)
diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs
index 485c11e7..21a77a40 100644
--- a/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs
+++ b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs
@@ -155,6 +155,46 @@ internal sealed class CqrsNotificationPublisherTests
Assert.That(secondPublisher.PublishCallCount, Is.Zero);
}
+ ///
+ /// 验证当容器里没有任何通知发布器时,dispatcher 会回退到内置顺序发布器,
+ /// 并在首次解析后缓存该 fallback 结果而不是在后续发布时重新查询容器。
+ ///
+ [Test]
+ public async Task PublishAsync_Should_Fallback_To_SequentialNotificationPublisher_And_Cache_It_When_None_Is_Registered()
+ {
+ var invocationOrder = new List();
+ var notificationPublisherLookupCount = 0;
+ var runtime = CreateRuntime(
+ container =>
+ {
+ container
+ .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler)))
+ .Returns(
+ [
+ new RecordingNotificationHandler("first", invocationOrder),
+ new RecordingNotificationHandler("second", invocationOrder)
+ ]);
+ container
+ .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationPublisher)))
+ .Returns(() =>
+ {
+ notificationPublisherLookupCount++;
+ return notificationPublisherLookupCount switch
+ {
+ 1 => Array.Empty