diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs index 5ae794ad..6d6e6a00 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs @@ -7,7 +7,7 @@ using GFramework.Cqrs.Abstractions.Cqrs; namespace GFramework.Cqrs.Tests.Cqrs; /// -/// 验证 CQRS dispatcher 会缓存热路径中的服务类型构造结果。 +/// 验证 CQRS dispatcher 会缓存热路径中的服务类型与调用委托。 /// [TestFixture] internal sealed class CqrsDispatcherCacheTests @@ -29,6 +29,7 @@ internal sealed class CqrsDispatcherCacheTests _container.Freeze(); _context = new ArchitectureContext(_container); + ClearDispatcherCaches(); } /// @@ -45,7 +46,7 @@ internal sealed class CqrsDispatcherCacheTests private ArchitectureContext? _context; /// - /// 验证相同消息类型重复分发时,不会重复扩张服务类型缓存。 + /// 验证相同消息类型重复分发时,不会重复扩张服务类型与调用委托缓存。 /// [Test] public async Task Dispatcher_Should_Cache_Service_Types_After_First_Dispatch() @@ -53,8 +54,8 @@ internal sealed class CqrsDispatcherCacheTests var notificationServiceTypes = GetCacheField("NotificationHandlerServiceTypes"); var requestServiceTypes = GetCacheField("RequestServiceTypes"); var streamServiceTypes = GetCacheField("StreamHandlerServiceTypes"); - var requestInvokers = GetCacheField("RequestInvokers"); - var requestPipelineInvokers = GetCacheField("RequestPipelineInvokers"); + var requestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers"); + var requestPipelineInvokers = GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers"); var notificationInvokers = GetCacheField("NotificationInvokers"); var streamInvokers = GetCacheField("StreamInvokers"); @@ -104,14 +105,42 @@ internal sealed class CqrsDispatcherCacheTests }); } + /// + /// 验证 request 调用委托会按响应类型分别缓存,避免不同响应类型共用 object 结果桥接。 + /// + [Test] + public async Task Dispatcher_Should_Cache_Request_Invokers_Per_Response_Type() + { + var intRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers"); + var stringRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers"); + + var intBefore = intRequestInvokers.Count; + var stringBefore = stringRequestInvokers.Count; + + await _context!.SendRequestAsync(new DispatcherCacheRequest()); + await _context.SendRequestAsync(new DispatcherStringCacheRequest()); + + var intAfterFirstDispatch = intRequestInvokers.Count; + var stringAfterFirstDispatch = stringRequestInvokers.Count; + + await _context.SendRequestAsync(new DispatcherCacheRequest()); + await _context.SendRequestAsync(new DispatcherStringCacheRequest()); + + Assert.Multiple(() => + { + Assert.That(intAfterFirstDispatch, Is.EqualTo(intBefore + 1)); + Assert.That(stringAfterFirstDispatch, Is.EqualTo(stringBefore + 1)); + Assert.That(intRequestInvokers.Count, Is.EqualTo(intAfterFirstDispatch)); + Assert.That(stringRequestInvokers.Count, Is.EqualTo(stringAfterFirstDispatch)); + }); + } + /// /// 通过反射读取 dispatcher 的静态缓存字典。 /// private static IDictionary GetCacheField(string fieldName) { - var dispatcherType = typeof(CqrsReflectionFallbackAttribute).Assembly - .GetType("GFramework.Cqrs.Internal.CqrsDispatcher", throwOnError: true)!; - + var dispatcherType = GetDispatcherType(); var field = dispatcherType.GetField( fieldName, BindingFlags.NonPublic | BindingFlags.Static); @@ -123,6 +152,56 @@ internal sealed class CqrsDispatcherCacheTests $"Dispatcher cache field {fieldName} does not implement IDictionary."); } + /// + /// 清空本测试依赖的 dispatcher 静态缓存,避免跨用例共享进程级状态导致断言漂移。 + /// + private static void ClearDispatcherCaches() + { + GetCacheField("NotificationHandlerServiceTypes").Clear(); + GetCacheField("RequestServiceTypes").Clear(); + GetCacheField("StreamHandlerServiceTypes").Clear(); + GetCacheField("NotificationInvokers").Clear(); + GetCacheField("StreamInvokers").Clear(); + GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers").Clear(); + GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers").Clear(); + GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers").Clear(); + } + + /// + /// 通过反射读取 dispatcher 嵌套泛型缓存类型上的静态缓存字典。 + /// + private static IDictionary GetGenericCacheField(string nestedTypeName, Type genericTypeArgument, string fieldName) + { + var nestedGenericType = GetDispatcherType().GetNestedType( + nestedTypeName, + BindingFlags.NonPublic); + + Assert.That(nestedGenericType, Is.Not.Null, $"Missing dispatcher nested cache type {nestedTypeName}."); + + var closedNestedType = nestedGenericType!.MakeGenericType(genericTypeArgument); + var field = closedNestedType.GetField( + fieldName, + BindingFlags.NonPublic | BindingFlags.Static); + + Assert.That( + field, + Is.Not.Null, + $"Missing dispatcher nested cache field {nestedTypeName}.{fieldName} for {genericTypeArgument.FullName}."); + + return field!.GetValue(null) as IDictionary + ?? throw new InvalidOperationException( + $"Dispatcher nested cache field {nestedTypeName}.{fieldName} does not implement IDictionary."); + } + + /// + /// 获取 CQRS dispatcher 运行时类型。 + /// + private static Type GetDispatcherType() + { + return typeof(CqrsReflectionFallbackAttribute).Assembly + .GetType("GFramework.Cqrs.Internal.CqrsDispatcher", throwOnError: true)!; + } + /// /// 消费整个异步流,确保建流路径被真实执行。 /// @@ -154,6 +233,11 @@ internal sealed record DispatcherCacheStreamRequest : IStreamRequest; /// internal sealed record DispatcherPipelineCacheRequest : IRequest; +/// +/// 用于验证按响应类型分层 request invoker 缓存的测试请求。 +/// +internal sealed record DispatcherStringCacheRequest : IRequest; + /// /// 处理 。 /// @@ -213,6 +297,20 @@ internal sealed class DispatcherPipelineCacheRequestHandler : IRequestHandler +/// 处理 。 +/// +internal sealed class DispatcherStringCacheRequestHandler : IRequestHandler +{ + /// + /// 返回固定字符串,供按响应类型缓存测试验证 string 路径。 + /// + public ValueTask Handle(DispatcherStringCacheRequest request, CancellationToken cancellationToken) + { + return ValueTask.FromResult("dispatcher-cache"); + } +} + /// /// 为 提供最小 pipeline 行为, /// 用于命中 dispatcher 的 pipeline invoker 缓存分支。 diff --git a/GFramework.Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Cqrs/Internal/CqrsDispatcher.cs index 002b7edc..a6d62f96 100644 --- a/GFramework.Cqrs/Internal/CqrsDispatcher.cs +++ b/GFramework.Cqrs/Internal/CqrsDispatcher.cs @@ -15,16 +15,6 @@ internal sealed class CqrsDispatcher( IIocContainer container, ILogger logger) : ICqrsRuntime { - // 进程级缓存:按请求/响应类型缓存直接处理器调用委托,避免热路径重复反射。 - // 线程安全依赖 ConcurrentDictionary;缓存与进程同寿命,默认假设请求类型集合有限且稳定。 - private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestInvoker> - RequestInvokers = new(); - - // 进程级缓存:缓存带 pipeline 的请求调用委托,减少每次分发时的反射与表达式重建开销。 - // 若后续引入动态生成请求类型,需要重新评估该缓存的增长边界。 - private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestPipelineInvoker> - RequestPipelineInvokers = new(); - // 进程级缓存:缓存通知调用委托,复用并发安全字典以支撑多线程发布路径。 private static readonly ConcurrentDictionary NotificationInvokers = new(); @@ -131,20 +121,18 @@ internal sealed class CqrsDispatcher( if (behaviors.Count == 0) { - var invoker = RequestInvokers.GetOrAdd( - (requestType, typeof(TResponse)), - static key => CreateRequestInvoker(key.RequestType, key.ResponseType)); + var invoker = RequestInvokerCache.Invokers.GetOrAdd( + requestType, + CreateRequestInvoker); - var result = await invoker(handler, request, cancellationToken); - return result is null ? default! : (TResponse)result; + return await invoker(handler, request, cancellationToken); } - var pipelineInvoker = RequestPipelineInvokers.GetOrAdd( - (requestType, typeof(TResponse)), - static key => CreateRequestPipelineInvoker(key.RequestType, key.ResponseType)); + var pipelineInvoker = RequestPipelineInvokerCache.Invokers.GetOrAdd( + requestType, + CreateRequestPipelineInvoker); - var pipelineResult = await pipelineInvoker(handler, behaviors, request, cancellationToken); - return pipelineResult is null ? default! : (TResponse)pipelineResult; + return await pipelineInvoker(handler, behaviors, request, cancellationToken); } /// @@ -200,21 +188,23 @@ internal sealed class CqrsDispatcher( /// /// 生成请求处理器调用委托,避免每次发送都重复反射。 /// - private static RequestInvoker CreateRequestInvoker(Type requestType, Type responseType) + private static RequestInvoker CreateRequestInvoker(Type requestType) { var method = RequestHandlerInvokerMethodDefinition - .MakeGenericMethod(requestType, responseType); - return (RequestInvoker)Delegate.CreateDelegate(typeof(RequestInvoker), method); + .MakeGenericMethod(requestType, typeof(TResponse)); + return (RequestInvoker)Delegate.CreateDelegate(typeof(RequestInvoker), method); } /// /// 生成带管道行为的请求处理委托,避免每次发送都重复反射。 /// - private static RequestPipelineInvoker CreateRequestPipelineInvoker(Type requestType, Type responseType) + private static RequestPipelineInvoker CreateRequestPipelineInvoker(Type requestType) { var method = RequestPipelineInvokerMethodDefinition - .MakeGenericMethod(requestType, responseType); - return (RequestPipelineInvoker)Delegate.CreateDelegate(typeof(RequestPipelineInvoker), method); + .MakeGenericMethod(requestType, typeof(TResponse)); + return (RequestPipelineInvoker)Delegate.CreateDelegate( + typeof(RequestPipelineInvoker), + method); } /// @@ -240,7 +230,7 @@ internal sealed class CqrsDispatcher( /// /// 执行已强类型化的请求处理器调用。 /// - private static async ValueTask InvokeRequestHandlerAsync( + private static ValueTask InvokeRequestHandlerAsync( object handler, object request, CancellationToken cancellationToken) @@ -248,14 +238,13 @@ internal sealed class CqrsDispatcher( { var typedHandler = (IRequestHandler)handler; var typedRequest = (TRequest)request; - var result = await typedHandler.Handle(typedRequest, cancellationToken); - return result; + return typedHandler.Handle(typedRequest, cancellationToken); } /// /// 执行包含管道行为链的请求处理。 /// - private static async ValueTask InvokeRequestPipelineAsync( + private static ValueTask InvokeRequestPipelineAsync( object handler, IReadOnlyList behaviors, object request, @@ -275,8 +264,7 @@ internal sealed class CqrsDispatcher( next = (message, token) => behavior.Handle(message, currentNext, token); } - var result = await next(typedRequest, cancellationToken); - return result; + return next(typedRequest, cancellationToken); } /// @@ -307,10 +295,12 @@ internal sealed class CqrsDispatcher( return typedHandler.Handle(typedRequest, cancellationToken); } - private delegate ValueTask RequestInvoker(object handler, object request, + private delegate ValueTask RequestInvoker( + object handler, + object request, CancellationToken cancellationToken); - private delegate ValueTask RequestPipelineInvoker( + private delegate ValueTask RequestPipelineInvoker( object handler, IReadOnlyList behaviors, object request, @@ -321,5 +311,23 @@ internal sealed class CqrsDispatcher( private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken); + /// + /// 按响应类型分层缓存 request 处理器调用委托,避免 value-type 响应在 object 桥接中产生装箱。 + /// + /// 请求响应类型。 + private static class RequestInvokerCache + { + internal static readonly ConcurrentDictionary> Invokers = new(); + } + + /// + /// 按响应类型分层缓存带 pipeline 的 request 调用委托,避免 pipeline 热路径上的额外装箱。 + /// + /// 请求响应类型。 + private static class RequestPipelineInvokerCache + { + internal static readonly ConcurrentDictionary> Invokers = new(); + } + private readonly record struct RequestServiceTypeSet(Type HandlerType, Type BehaviorType); }