diff --git a/GFramework.Core/Ioc/MicrosoftDiContainer.cs b/GFramework.Core/Ioc/MicrosoftDiContainer.cs index 6152366f..d1a0576d 100644 --- a/GFramework.Core/Ioc/MicrosoftDiContainer.cs +++ b/GFramework.Core/Ioc/MicrosoftDiContainer.cs @@ -400,7 +400,7 @@ public class MicrosoftDiContainer(IServiceCollection? serviceCollection = null) /// 要接入的程序集集合。 /// /// 中存在 元素。 - /// 容器已冻结,无法继续注册 CQRS 处理器。 + /// 容器已冻结,无法继续注册 CQRS 处理器。 public void RegisterCqrsHandlersFromAssemblies(IEnumerable assemblies) { ArgumentNullException.ThrowIfNull(assemblies); diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs new file mode 100644 index 00000000..6c52a910 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs @@ -0,0 +1,172 @@ +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Architectures; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 验证 CQRS dispatcher 会缓存热路径中的服务类型构造结果。 +/// +[TestFixture] +internal sealed class CqrsDispatcherCacheTests +{ + private MicrosoftDiContainer? _container; + private ArchitectureContext? _context; + + /// + /// 初始化测试上下文。 + /// + [SetUp] + public void SetUp() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); + _container = new MicrosoftDiContainer(); + + CqrsTestRuntime.RegisterHandlers( + _container, + typeof(CqrsDispatcherCacheTests).Assembly, + typeof(ArchitectureContext).Assembly); + + _container.Freeze(); + _context = new ArchitectureContext(_container); + } + + /// + /// 清理测试上下文引用。 + /// + [TearDown] + public void TearDown() + { + _context = null; + _container = null; + } + + /// + /// 验证相同消息类型重复分发时,不会重复扩张服务类型缓存。 + /// + [Test] + public async Task Dispatcher_Should_Cache_Service_Types_After_First_Dispatch() + { + var notificationServiceTypes = GetCacheField("NotificationHandlerServiceTypes"); + var requestServiceTypes = GetCacheField("RequestServiceTypes"); + var streamServiceTypes = GetCacheField("StreamHandlerServiceTypes"); + + var notificationBefore = notificationServiceTypes.Count; + var requestBefore = requestServiceTypes.Count; + var streamBefore = streamServiceTypes.Count; + + await _context!.SendRequestAsync(new DispatcherCacheRequest()); + await _context.PublishAsync(new DispatcherCacheNotification()); + await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest())); + + var notificationAfterFirstDispatch = notificationServiceTypes.Count; + var requestAfterFirstDispatch = requestServiceTypes.Count; + var streamAfterFirstDispatch = streamServiceTypes.Count; + + await _context.SendRequestAsync(new DispatcherCacheRequest()); + await _context.PublishAsync(new DispatcherCacheNotification()); + await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest())); + + Assert.Multiple(() => + { + Assert.That(notificationAfterFirstDispatch, Is.EqualTo(notificationBefore + 1)); + Assert.That(requestAfterFirstDispatch, Is.EqualTo(requestBefore + 1)); + Assert.That(streamAfterFirstDispatch, Is.EqualTo(streamBefore + 1)); + + Assert.That(notificationServiceTypes.Count, Is.EqualTo(notificationAfterFirstDispatch)); + Assert.That(requestServiceTypes.Count, Is.EqualTo(requestAfterFirstDispatch)); + Assert.That(streamServiceTypes.Count, Is.EqualTo(streamAfterFirstDispatch)); + }); + } + + /// + /// 通过反射读取 dispatcher 的静态缓存字典。 + /// + private static IDictionary GetCacheField(string fieldName) + { + var dispatcherType = typeof(CqrsReflectionFallbackAttribute).Assembly + .GetType("GFramework.Cqrs.Internal.CqrsDispatcher", throwOnError: true)!; + + var field = dispatcherType.GetField( + fieldName, + BindingFlags.NonPublic | BindingFlags.Static); + + Assert.That(field, Is.Not.Null, $"Missing dispatcher cache field {fieldName}."); + + return field!.GetValue(null) as IDictionary + ?? throw new InvalidOperationException( + $"Dispatcher cache field {fieldName} does not implement IDictionary."); + } + + /// + /// 消费整个异步流,确保建流路径被真实执行。 + /// + private static async Task DrainAsync(IAsyncEnumerable stream) + { + await foreach (var _ in stream) + { + } + } +} + +/// +/// 用于验证 request 服务类型缓存的测试请求。 +/// +internal sealed record DispatcherCacheRequest : IRequest; + +/// +/// 用于验证 notification 服务类型缓存的测试通知。 +/// +internal sealed record DispatcherCacheNotification : INotification; + +/// +/// 用于验证 stream 服务类型缓存的测试请求。 +/// +internal sealed record DispatcherCacheStreamRequest : IStreamRequest; + +/// +/// 处理 。 +/// +internal sealed class DispatcherCacheRequestHandler : IRequestHandler +{ + /// + /// 返回固定结果,供缓存测试验证 dispatcher 请求路径。 + /// + public ValueTask Handle(DispatcherCacheRequest request, CancellationToken cancellationToken) + { + return ValueTask.FromResult(1); + } +} + +/// +/// 处理 。 +/// +internal sealed class DispatcherCacheNotificationHandler : INotificationHandler +{ + /// + /// 消费通知,不执行额外副作用。 + /// + public ValueTask Handle(DispatcherCacheNotification notification, CancellationToken cancellationToken) + { + return ValueTask.CompletedTask; + } +} + +/// +/// 处理 。 +/// +internal sealed class DispatcherCacheStreamHandler : IStreamRequestHandler +{ + /// + /// 返回一个最小流,供缓存测试命中 stream 分发路径。 + /// + public async IAsyncEnumerable Handle( + DispatcherCacheStreamRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + yield return 1; + await Task.CompletedTask; + } +} diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs index 95afa92f..b44b0bb1 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsHandlerRegistrarTests.cs @@ -243,6 +243,55 @@ internal sealed class CqrsHandlerRegistrarTests Times.Once); generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Never); } + + /// + /// 验证手写 fallback metadata 直接提供 handler 类型时,运行时会复用这些类型, + /// 而不会再通过程序集名称查找或整程序集扫描补齐映射。 + /// + [Test] + public void RegisterHandlers_Should_Use_Direct_Fallback_Types_Without_GetType_Or_GetTypes() + { + var generatedAssembly = new Mock(); + generatedAssembly + .SetupGet(static assembly => assembly.FullName) + .Returns(ReflectionFallbackNotificationContainer.ReflectionOnlyHandlerType.Assembly.FullName); + generatedAssembly + .Setup(static assembly => assembly.GetCustomAttributes(typeof(CqrsHandlerRegistryAttribute), false)) + .Returns([new CqrsHandlerRegistryAttribute(typeof(PartialGeneratedNotificationHandlerRegistry))]); + generatedAssembly + .Setup(static assembly => assembly.GetCustomAttributes(typeof(CqrsReflectionFallbackAttribute), false)) + .Returns( + [ + new CqrsReflectionFallbackAttribute( + ReflectionFallbackNotificationContainer.ReflectionOnlyHandlerType) + ]); + + var container = new MicrosoftDiContainer(); + CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object); + + var registrations = container.GetServicesUnsafe + .Where(static descriptor => + descriptor.ServiceType == typeof(INotificationHandler) && + descriptor.ImplementationType is not null) + .Select(static descriptor => descriptor.ImplementationType!) + .ToList(); + + Assert.That( + registrations, + Is.EqualTo( + [ + typeof(GeneratedRegistryNotificationHandler), + ReflectionFallbackNotificationContainer.ReflectionOnlyHandlerType + ])); + + generatedAssembly.Verify( + static assembly => assembly.GetType( + ReflectionFallbackNotificationContainer.ReflectionOnlyHandlerType.FullName!, + false, + false), + Times.Never); + generatedAssembly.Verify(static assembly => assembly.GetTypes(), Times.Never); + } } /// diff --git a/GFramework.Cqrs/CqrsReflectionFallbackAttribute.cs b/GFramework.Cqrs/CqrsReflectionFallbackAttribute.cs index 9d3c21bf..da557d84 100644 --- a/GFramework.Cqrs/CqrsReflectionFallbackAttribute.cs +++ b/GFramework.Cqrs/CqrsReflectionFallbackAttribute.cs @@ -11,6 +11,15 @@ namespace GFramework.Cqrs; [AttributeUsage(AttributeTargets.Assembly)] public sealed class CqrsReflectionFallbackAttribute : Attribute { + /// + /// 初始化 ,保留旧版“仅标记需要补扫”的语义。 + /// + public CqrsReflectionFallbackAttribute() + { + FallbackHandlerTypeNames = []; + FallbackHandlerTypes = []; + } + /// /// 初始化 。 /// @@ -27,10 +36,36 @@ public sealed class CqrsReflectionFallbackAttribute : Attribute .Distinct(StringComparer.Ordinal) .OrderBy(static typeName => typeName, StringComparer.Ordinal) .ToArray(); + FallbackHandlerTypes = []; + } + + /// + /// 初始化 。 + /// + /// + /// 需要运行时补充反射注册的处理器类型。 + /// 该重载适合手写或第三方程序集显式声明可直接引用的 fallback handlers, + /// 避免再通过字符串名称回查程序集元数据。 + /// + public CqrsReflectionFallbackAttribute(params Type[] fallbackHandlerTypes) + { + ArgumentNullException.ThrowIfNull(fallbackHandlerTypes); + + FallbackHandlerTypeNames = []; + FallbackHandlerTypes = fallbackHandlerTypes + .Where(static type => type is not null) + .Distinct() + .OrderBy(static type => type.FullName ?? type.Name, StringComparer.Ordinal) + .ToArray(); } /// /// 获取需要运行时补充反射注册的处理器类型全名集合。 /// public IReadOnlyList FallbackHandlerTypeNames { get; } + + /// + /// 获取可直接供运行时补充反射注册的处理器类型集合。 + /// + public IReadOnlyList FallbackHandlerTypes { get; } } diff --git a/GFramework.Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Cqrs/Internal/CqrsDispatcher.cs index 9a125789..91532e17 100644 --- a/GFramework.Cqrs/Internal/CqrsDispatcher.cs +++ b/GFramework.Cqrs/Internal/CqrsDispatcher.cs @@ -1,5 +1,3 @@ -using System.Collections.Concurrent; -using System.Reflection; using GFramework.Core.Abstractions.Architectures; using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Logging; @@ -30,10 +28,22 @@ internal sealed class CqrsDispatcher( // 进程级缓存:缓存通知调用委托,复用并发安全字典以支撑多线程发布路径。 private static readonly ConcurrentDictionary NotificationInvokers = new(); + // 进程级缓存:缓存通知处理器服务类型,避免每次发布都重复 MakeGenericType。 + private static readonly ConcurrentDictionary NotificationHandlerServiceTypes = new(); + // 进程级缓存:缓存流式请求调用委托,避免每次创建流时重复解析反射签名。 private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers = new(); + // 进程级缓存:缓存请求处理器与 pipeline 行为的服务类型,减少热路径中的泛型类型构造。 + private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestServiceTypeSet> + RequestServiceTypes = new(); + + // 进程级缓存:缓存流式请求处理器服务类型,避免每次建流时重复 MakeGenericType。 + private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), Type> + StreamHandlerServiceTypes = + new(); + /// /// 发布通知到所有已注册处理器。 /// @@ -51,7 +61,9 @@ internal sealed class CqrsDispatcher( ArgumentNullException.ThrowIfNull(notification); var notificationType = notification.GetType(); - var handlerType = typeof(INotificationHandler<>).MakeGenericType(notificationType); + var handlerType = NotificationHandlerServiceTypes.GetOrAdd( + notificationType, + static type => typeof(INotificationHandler<>).MakeGenericType(type)); var handlers = container.GetAll(handlerType); if (handlers.Count == 0) @@ -88,14 +100,18 @@ internal sealed class CqrsDispatcher( ArgumentNullException.ThrowIfNull(request); var requestType = request.GetType(); - var handlerType = typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)); + var serviceTypes = RequestServiceTypes.GetOrAdd( + (requestType, typeof(TResponse)), + static key => new RequestServiceTypeSet( + typeof(IRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType), + typeof(IPipelineBehavior<,>).MakeGenericType(key.RequestType, key.ResponseType))); + var handlerType = serviceTypes.HandlerType; var handler = container.Get(handlerType) ?? throw new InvalidOperationException( $"No CQRS request handler registered for {requestType.FullName}."); PrepareHandler(handler, context); - var behaviorType = typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)); - var behaviors = container.GetAll(behaviorType); + var behaviors = container.GetAll(serviceTypes.BehaviorType); foreach (var behavior in behaviors) PrepareHandler(behavior, context); @@ -135,7 +151,9 @@ internal sealed class CqrsDispatcher( ArgumentNullException.ThrowIfNull(request); var requestType = request.GetType(); - var handlerType = typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)); + var handlerType = StreamHandlerServiceTypes.GetOrAdd( + (requestType, typeof(TResponse)), + static key => typeof(IStreamRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType)); var handler = container.Get(handlerType) ?? throw new InvalidOperationException( $"No CQRS stream handler registered for {requestType.FullName}."); @@ -293,4 +311,6 @@ internal sealed class CqrsDispatcher( CancellationToken cancellationToken); private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken); + + private readonly record struct RequestServiceTypeSet(Type HandlerType, Type BehaviorType); } diff --git a/GFramework.Cqrs/Internal/CqrsHandlerRegistrar.cs b/GFramework.Cqrs/Internal/CqrsHandlerRegistrar.cs index 968424b7..3604de83 100644 --- a/GFramework.Cqrs/Internal/CqrsHandlerRegistrar.cs +++ b/GFramework.Cqrs/Internal/CqrsHandlerRegistrar.cs @@ -40,7 +40,7 @@ internal static class CqrsHandlerRegistrar container.GetServicesUnsafe, assembly, logger, - generatedRegistrationResult.ReflectionFallbackTypeNames); + generatedRegistrationResult.ReflectionFallbackMetadata); } } @@ -106,13 +106,13 @@ internal static class CqrsHandlerRegistrar registry.Register(services, logger); } - var reflectionFallbackTypeNames = GetReflectionFallbackTypeNames(assembly); - if (reflectionFallbackTypeNames is not null) + var reflectionFallbackMetadata = GetReflectionFallbackMetadata(assembly, logger); + if (reflectionFallbackMetadata is not null) { - if (reflectionFallbackTypeNames.Count > 0) + if (reflectionFallbackMetadata.HasExplicitTypes) { logger.Debug( - $"Generated CQRS registry for assembly {assemblyName} requested targeted reflection fallback for {reflectionFallbackTypeNames.Count} unsupported handler type(s)."); + $"Generated CQRS registry for assembly {assemblyName} requested targeted reflection fallback for {reflectionFallbackMetadata.Types.Count} unsupported handler type(s)."); } else { @@ -120,7 +120,7 @@ internal static class CqrsHandlerRegistrar $"Generated CQRS registry for assembly {assemblyName} requested full reflection fallback for unsupported handlers."); } - return GeneratedRegistrationResult.WithReflectionFallback(reflectionFallbackTypeNames); + return GeneratedRegistrationResult.WithReflectionFallback(reflectionFallbackMetadata); } return GeneratedRegistrationResult.FullyHandled(); @@ -142,9 +142,9 @@ internal static class CqrsHandlerRegistrar IServiceCollection services, Assembly assembly, ILogger logger, - IReadOnlyList? reflectionFallbackTypeNames) + ReflectionFallbackMetadata? reflectionFallbackMetadata) { - foreach (var implementationType in GetCandidateHandlerTypes(assembly, logger, reflectionFallbackTypeNames) + foreach (var implementationType in GetCandidateHandlerTypes(assembly, logger, reflectionFallbackMetadata) .Where(IsConcreteHandlerType)) { var handlerInterfaces = implementationType @@ -180,24 +180,51 @@ internal static class CqrsHandlerRegistrar private static IReadOnlyList GetCandidateHandlerTypes( Assembly assembly, ILogger logger, - IReadOnlyList? reflectionFallbackTypeNames) + ReflectionFallbackMetadata? reflectionFallbackMetadata) { - return reflectionFallbackTypeNames is { Count: > 0 } - ? GetNamedFallbackTypes(assembly, reflectionFallbackTypeNames, logger) + return reflectionFallbackMetadata is { HasExplicitTypes: true } + ? reflectionFallbackMetadata.Types : GetLoadableTypes(assembly, logger); } /// - /// 根据生成器记录的类型全名,精确解析仍需运行时补充注册的处理器类型。 + /// 获取生成注册器要求运行时继续补充反射扫描的 handler 元数据。 /// - private static IReadOnlyList GetNamedFallbackTypes( + private static ReflectionFallbackMetadata? GetReflectionFallbackMetadata( Assembly assembly, - IReadOnlyList reflectionFallbackTypeNames, ILogger logger) { var assemblyName = GetAssemblySortKey(assembly); - var resolvedTypes = new List(reflectionFallbackTypeNames.Count); - foreach (var typeName in reflectionFallbackTypeNames + var fallbackAttributes = assembly + .GetCustomAttributes(typeof(CqrsReflectionFallbackAttribute), inherit: false) + .OfType() + .ToList(); + + if (fallbackAttributes.Count == 0) + return null; + + var resolvedTypes = new List(); + foreach (var fallbackType in fallbackAttributes + .SelectMany(static attribute => attribute.FallbackHandlerTypes) + .Where(static type => type is not null) + .Distinct() + .OrderBy(GetTypeSortKey, StringComparer.Ordinal)) + { + if (!string.Equals( + GetAssemblySortKey(fallbackType.Assembly), + assemblyName, + StringComparison.Ordinal)) + { + logger.Warn( + $"Generated CQRS reflection fallback type {fallbackType.FullName} was declared on assembly {assemblyName} but belongs to assembly {GetAssemblySortKey(fallbackType.Assembly)}. Skipping mismatched fallback entry."); + continue; + } + + resolvedTypes.Add(fallbackType); + } + + foreach (var typeName in fallbackAttributes + .SelectMany(static attribute => attribute.FallbackHandlerTypeNames) .Where(static name => !string.IsNullOrWhiteSpace(name)) .Distinct(StringComparer.Ordinal) .OrderBy(static name => name, StringComparer.Ordinal)) @@ -221,9 +248,11 @@ internal static class CqrsHandlerRegistrar } } - return resolvedTypes - .OrderBy(GetTypeSortKey, StringComparer.Ordinal) - .ToList(); + return new ReflectionFallbackMetadata( + resolvedTypes + .Distinct() + .OrderBy(GetTypeSortKey, StringComparer.Ordinal) + .ToArray()); } /// @@ -291,27 +320,6 @@ internal static class CqrsHandlerRegistrar definition == typeof(IStreamRequestHandler<,>); } - /// - /// 获取生成注册器要求运行时继续补充反射扫描的 handler 类型名清单。 - /// - private static IReadOnlyList? GetReflectionFallbackTypeNames(Assembly assembly) - { - var fallbackAttributes = assembly - .GetCustomAttributes(typeof(CqrsReflectionFallbackAttribute), inherit: false) - .OfType() - .ToList(); - - if (fallbackAttributes.Count == 0) - return null; - - return fallbackAttributes - .SelectMany(static attribute => attribute.FallbackHandlerTypeNames) - .Where(static typeName => !string.IsNullOrWhiteSpace(typeName)) - .Distinct(StringComparer.Ordinal) - .OrderBy(static typeName => typeName, StringComparer.Ordinal) - .ToArray(); - } - /// /// 判断同一 handler 映射是否已经由生成注册器或先前扫描步骤写入服务集合。 /// @@ -346,14 +354,14 @@ internal static class CqrsHandlerRegistrar private readonly record struct GeneratedRegistrationResult( bool UsedGeneratedRegistry, bool RequiresReflectionFallback, - IReadOnlyList? ReflectionFallbackTypeNames) + ReflectionFallbackMetadata? ReflectionFallbackMetadata) { public static GeneratedRegistrationResult NoGeneratedRegistry() { return new GeneratedRegistrationResult( UsedGeneratedRegistry: false, RequiresReflectionFallback: false, - ReflectionFallbackTypeNames: null); + ReflectionFallbackMetadata: null); } public static GeneratedRegistrationResult FullyHandled() @@ -361,18 +369,25 @@ internal static class CqrsHandlerRegistrar return new GeneratedRegistrationResult( UsedGeneratedRegistry: true, RequiresReflectionFallback: false, - ReflectionFallbackTypeNames: null); + ReflectionFallbackMetadata: null); } public static GeneratedRegistrationResult WithReflectionFallback( - IReadOnlyList reflectionFallbackTypeNames) + ReflectionFallbackMetadata reflectionFallbackMetadata) { - ArgumentNullException.ThrowIfNull(reflectionFallbackTypeNames); + ArgumentNullException.ThrowIfNull(reflectionFallbackMetadata); return new GeneratedRegistrationResult( UsedGeneratedRegistry: true, RequiresReflectionFallback: true, - ReflectionFallbackTypeNames: reflectionFallbackTypeNames); + ReflectionFallbackMetadata: reflectionFallbackMetadata); } } + + private sealed class ReflectionFallbackMetadata(IReadOnlyList types) + { + public IReadOnlyList Types { get; } = types ?? throw new ArgumentNullException(nameof(types)); + + public bool HasExplicitTypes => Types.Count > 0; + } }