feat(cqrs): 添加流式命令处理器和自动注册功能

- 实现 AbstractStreamCommandHandler 基类支持流式命令处理
- 创建 CqrsHandlerRegistrar 自动扫描注册 CQRS 处理器
- 添加流式处理器接口 IStreamRequestHandler 支持
- 实现处理器注册的容错机制和类型加载恢复
- 添加确定性排序确保跨环境稳定的处理器注册顺序
- 提供完整的单元测试验证注册行为和异常处理
This commit is contained in:
GeWuYou 2026-04-14 22:05:20 +08:00
parent 195c8321a1
commit f8fa2a8481
3 changed files with 134 additions and 45 deletions

View File

@ -14,18 +14,7 @@ namespace GFramework.Core.Tests.Cqrs;
[TestFixture] [TestFixture]
internal sealed class CqrsHandlerRegistrarTests internal sealed class CqrsHandlerRegistrarTests
{ {
private static readonly MethodInfo RecoverLoadableTypesMethod = typeof(ArchitectureContext).Assembly
.GetType(
"GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar",
throwOnError: true)!
.GetMethod("RecoverLoadableTypes",
BindingFlags.NonPublic |
BindingFlags.Static)!
?? throw new InvalidOperationException(
"Failed to locate CqrsHandlerRegistrar.RecoverLoadableTypes.");
private MicrosoftDiContainer? _container; private MicrosoftDiContainer? _container;
private ArchitectureContext? _context; private ArchitectureContext? _context;
/// <summary> /// <summary>
@ -40,8 +29,7 @@ internal sealed class CqrsHandlerRegistrarTests
_container = new MicrosoftDiContainer(); _container = new MicrosoftDiContainer();
CqrsTestRuntime.RegisterHandlers( CqrsTestRuntime.RegisterHandlers(
_container, _container,
typeof(CqrsHandlerRegistrarTests).Assembly, typeof(CqrsHandlerRegistrarTests).Assembly);
typeof(ArchitectureContext).Assembly);
_container.Freeze(); _container.Freeze();
_context = new ArchitectureContext(_container); _context = new ArchitectureContext(_container);
@ -79,28 +67,53 @@ internal sealed class CqrsHandlerRegistrarTests
/// 验证部分类型加载失败时仍能保留可加载类型,并记录诊断日志。 /// 验证部分类型加载失败时仍能保留可加载类型,并记录诊断日志。
/// </summary> /// </summary>
[Test] [Test]
public void RecoverLoadableTypes_Should_Return_Loadable_Types_And_Log_Warnings() public void RegisterHandlers_Should_Register_Loadable_Types_And_Log_Warnings_When_Assembly_Load_Partially_Fails()
{ {
var logger = new TestLogger(nameof(CqrsHandlerRegistrarTests), LogLevel.Warning); var originalProvider = LoggerFactoryResolver.Provider;
var capturingProvider = new CapturingLoggerFactoryProvider(LogLevel.Warning);
var reflectionTypeLoadException = new ReflectionTypeLoadException( var reflectionTypeLoadException = new ReflectionTypeLoadException(
[typeof(AlphaDeterministicNotificationHandler), null], [typeof(AlphaDeterministicNotificationHandler), null],
[new TypeLoadException("Missing optional dependency for registrar test.")]); [new TypeLoadException("Missing optional dependency for registrar test.")]);
var partiallyLoadableAssembly = new Mock<Assembly>();
partiallyLoadableAssembly
.SetupGet(static assembly => assembly.FullName)
.Returns("GFramework.Core.Tests.Cqrs.PartiallyLoadableAssembly, Version=1.0.0.0");
partiallyLoadableAssembly
.Setup(static assembly => assembly.GetTypes())
.Throws(reflectionTypeLoadException);
var recoveredTypes = (IReadOnlyList<Type>)RecoverLoadableTypesMethod.Invoke( LoggerFactoryResolver.Provider = capturingProvider;
null, try
[typeof(CqrsHandlerRegistrarTests).Assembly, reflectionTypeLoadException, logger])!;
Assert.Multiple(() =>
{ {
Assert.That(recoveredTypes, Is.EqualTo([typeof(AlphaDeterministicNotificationHandler)])); var container = new MicrosoftDiContainer();
Assert.That(logger.Logs.Count(log => log.Level == LogLevel.Warning), Is.GreaterThanOrEqualTo(2)); CqrsTestRuntime.RegisterHandlers(container, partiallyLoadableAssembly.Object);
Assert.That( container.Freeze();
logger.Logs.Any(log => log.Message.Contains("partially failed", StringComparison.Ordinal)),
Is.True); var handlers = container.GetAll<INotificationHandler<DeterministicOrderNotification>>();
Assert.That( var warningLogs = capturingProvider.Loggers
logger.Logs.Any(log => log.Message.Contains("Missing optional dependency", StringComparison.Ordinal)), .SelectMany(static logger => logger.Logs)
Is.True); .Where(static log => log.Level == LogLevel.Warning)
}); .ToList();
Assert.Multiple(() =>
{
Assert.That(
handlers.Select(static handler => handler.GetType()),
Is.EqualTo([typeof(AlphaDeterministicNotificationHandler)]));
Assert.That(warningLogs.Count, Is.GreaterThanOrEqualTo(2));
Assert.That(
warningLogs.Any(log => log.Message.Contains("partially failed", StringComparison.Ordinal)),
Is.True);
Assert.That(
warningLogs.Any(log =>
log.Message.Contains("Missing optional dependency", StringComparison.Ordinal)),
Is.True);
});
}
finally
{
LoggerFactoryResolver.Provider = originalProvider;
}
} }
} }
@ -163,3 +176,46 @@ internal sealed class AlphaDeterministicNotificationHandler : INotificationHandl
return ValueTask.CompletedTask; return ValueTask.CompletedTask;
} }
} }
/// <summary>
/// 为 CQRS 注册测试捕获真实启动路径中创建的日志记录器。
/// </summary>
/// <remarks>
/// 处理器注册入口会分别为测试运行时、容器和注册器创建日志器。
/// 该提供程序统一保留这些测试日志器,以便断言警告是否经由公开入口真正发出。
/// </remarks>
internal sealed class CapturingLoggerFactoryProvider : ILoggerFactoryProvider
{
private readonly List<TestLogger> _loggers = [];
/// <summary>
/// 使用指定的最小日志级别初始化一个新的捕获型日志工厂提供程序。
/// </summary>
/// <param name="minLevel">要应用到新建测试日志器的最小日志级别。</param>
public CapturingLoggerFactoryProvider(LogLevel minLevel = LogLevel.Info)
{
MinLevel = minLevel;
}
/// <summary>
/// 获取通过当前提供程序创建的全部测试日志器。
/// </summary>
public IReadOnlyList<TestLogger> Loggers => _loggers;
/// <summary>
/// 获取或设置新建测试日志器的最小日志级别。
/// </summary>
public LogLevel MinLevel { get; set; }
/// <summary>
/// 创建一个测试日志器并将其纳入捕获集合。
/// </summary>
/// <param name="name">日志记录器名称。</param>
/// <returns>用于后续断言的测试日志器。</returns>
public ILogger CreateLogger(string name)
{
var logger = new TestLogger(name, MinLevel);
_loggers.Add(logger);
return logger;
}
}

View File

@ -1,24 +1,49 @@
using System.Reflection; using System.Reflection;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures; using GFramework.Core.Architectures;
using GFramework.Core.Ioc; using GFramework.Core.Ioc;
using GFramework.Core.Logging; using GFramework.Core.Logging;
namespace GFramework.Core.Tests; namespace GFramework.Core.Tests;
/// <summary>
/// 为测试项目提供对 CQRS 处理器真实注册入口的受控访问。
/// </summary>
/// <remarks>
/// 测试应通过该入口驱动注册流程,而不是直接反射调用注册器的私有辅助方法,
/// 这样可以覆盖生产启动路径中的程序集去重、日志记录与容错恢复行为。
/// </remarks>
internal static class CqrsTestRuntime internal static class CqrsTestRuntime
{ {
private static readonly MethodInfo RegisterHandlersMethod = typeof(ArchitectureContext).Assembly private static readonly Type CqrsHandlerRegistrarType = typeof(ArchitectureContext).Assembly
.GetType( .GetType(
"GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", "GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar",
throwOnError: true)! throwOnError: true)!
?? throw new InvalidOperationException(
"Failed to locate CqrsHandlerRegistrar type.");
private static readonly MethodInfo RegisterHandlersMethod = CqrsHandlerRegistrarType
.GetMethod( .GetMethod(
"RegisterHandlers", "RegisterHandlers",
BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.NonPublic |
BindingFlags.Static)! BindingFlags.Static,
binder: null,
[
typeof(IIocContainer),
typeof(IEnumerable<Assembly>),
typeof(ILogger)
],
modifiers: null)
?? throw new InvalidOperationException( ?? throw new InvalidOperationException(
"Failed to locate CqrsHandlerRegistrar.RegisterHandlers."); "Failed to locate CqrsHandlerRegistrar.RegisterHandlers.");
public static void RegisterHandlers(MicrosoftDiContainer container, params Assembly[] assemblies) /// <summary>
/// 通过与生产代码一致的注册入口扫描并注册指定程序集中的 CQRS 处理器。
/// </summary>
/// <param name="container">承载处理器映射的测试容器。</param>
/// <param name="assemblies">要扫描的程序集集合。</param>
internal static void RegisterHandlers(MicrosoftDiContainer container, params Assembly[] assemblies)
{ {
ArgumentNullException.ThrowIfNull(container); ArgumentNullException.ThrowIfNull(container);
ArgumentNullException.ThrowIfNull(assemblies); ArgumentNullException.ThrowIfNull(assemblies);

View File

@ -18,22 +18,30 @@ using GFramework.Core.Rule;
namespace GFramework.Core.Cqrs.Command; namespace GFramework.Core.Cqrs.Command;
/// <summary> /// <summary>
/// 抽象流式命令处理器基类 /// 抽象流式命令处理器基类
/// 继承自 ContextAwareBase 并实现 IStreamRequestHandler 接口,为具体的流式命令处理器提供基础功能。 /// 继承自 <see cref="ContextAwareBase" /> 并实现 <see cref="IStreamRequestHandler{TRequest,TResponse}" />
/// 支持流式处理命令并产生异步可枚举的响应序列,框架会在每次创建流前注入当前架构上下文 /// 为具体的流式命令处理器提供基础功能
/// </summary> /// </summary>
/// <typeparam name="TCommand">流式命令类型必须实现IStreamCommand接口</typeparam> /// <typeparam name="TCommand">流式命令类型,必须实现 <see cref="IStreamCommand{TResponse}" />。</typeparam>
/// <typeparam name="TResponse">流式命令响应元素类型</typeparam> /// <typeparam name="TResponse">流式命令响应元素类型。</typeparam>
/// <remarks>
/// 框架会在每次调用 <c>CreateStream</c> 进入实际处理逻辑前,为当前处理器实例注入架构上下文,
/// 因此派生类只能在 <see cref="Handle" /> 执行期间及其返回的异步枚举序列内假定 <c>Context</c> 可用。
/// 默认注册器会将流式命令处理器注册为瞬态服务,以避免同一个上下文感知实例在多个流或并发请求之间复用。
/// 派生类不应缓存处理器实例,也不应把依赖当前上下文的可变状态泄漏到流外部。
/// 传入 <see cref="Handle" /> 的取消令牌同时约束流的创建与后续枚举,
/// 派生类应在启动阶段和每次生成响应前尊重取消请求,避免在调用方停止枚举后继续执行后台工作。
/// </remarks>
public abstract class AbstractStreamCommandHandler<TCommand, TResponse> : ContextAwareBase, public abstract class AbstractStreamCommandHandler<TCommand, TResponse> : ContextAwareBase,
IStreamRequestHandler<TCommand, TResponse> IStreamRequestHandler<TCommand, TResponse>
where TCommand : IStreamCommand<TResponse> where TCommand : IStreamCommand<TResponse>
{ {
/// <summary> /// <summary>
/// 处理流式命令并返回异步可枚举的响应序列 /// 处理流式命令并返回异步可枚举的响应序列
/// 由具体的流式命令处理器子类实现流式处理逻辑 /// 由具体的流式命令处理器子类实现流式处理逻辑
/// </summary> /// </summary>
/// <param name="command">要处理的流式命令对象</param> /// <param name="command">要处理的流式命令对象</param>
/// <param name="cancellationToken">取消令牌,用于取消流式处理操作</param> /// <param name="cancellationToken">取消令牌,用于取消流式处理操作</param>
/// <returns>异步可枚举的响应序列,每个元素类型为TResponse</returns> /// <returns>异步可枚举的响应序列,每个元素类型为 <typeparamref name="TResponse" />。</returns>
public abstract IAsyncEnumerable<TResponse> Handle(TCommand command, CancellationToken cancellationToken); public abstract IAsyncEnumerable<TResponse> Handle(TCommand command, CancellationToken cancellationToken);
} }