diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs index badd7490..b16995e4 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs @@ -7,14 +7,11 @@ using GFramework.Cqrs.Abstractions.Cqrs; namespace GFramework.Cqrs.Tests.Cqrs; /// -/// 验证 CQRS dispatcher 会缓存热路径中的服务类型与调用委托。 +/// 验证 CQRS dispatcher 会缓存热路径中的 dispatch binding。 /// [TestFixture] internal sealed class CqrsDispatcherCacheTests { - private MicrosoftDiContainer? _container; - private ArchitectureContext? _context; - /// /// 初始化测试上下文。 /// @@ -45,40 +42,31 @@ internal sealed class CqrsDispatcherCacheTests _container = null; } + private MicrosoftDiContainer? _container; + private ArchitectureContext? _context; + /// - /// 验证相同消息类型重复分发时,不会重复扩张服务类型与调用委托缓存。 + /// 验证相同消息类型重复分发时,不会重复扩张 dispatch binding 缓存。 /// [Test] - public async Task Dispatcher_Should_Cache_Service_Types_After_First_Dispatch() + public async Task Dispatcher_Should_Cache_Dispatch_Bindings_After_First_Dispatch() { - var notificationServiceTypes = GetCacheField("NotificationHandlerServiceTypes"); - var requestServiceTypes = GetCacheField("RequestServiceTypes"); - var streamServiceTypes = GetCacheField("StreamHandlerServiceTypes"); - var requestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers"); - var requestPipelineInvokers = GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers"); - var notificationInvokers = GetCacheField("NotificationInvokers"); - var streamInvokers = GetCacheField("StreamInvokers"); + var notificationBindings = GetCacheField("NotificationDispatchBindings"); + var requestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings"); + var streamBindings = GetCacheField("StreamDispatchBindings"); - var notificationBefore = notificationServiceTypes.Count; - var requestBefore = requestServiceTypes.Count; - var streamBefore = streamServiceTypes.Count; - var requestInvokersBefore = requestInvokers.Count; - var requestPipelineInvokersBefore = requestPipelineInvokers.Count; - var notificationInvokersBefore = notificationInvokers.Count; - var streamInvokersBefore = streamInvokers.Count; + var notificationBefore = notificationBindings.Count; + var requestBefore = requestBindings.Count; + var streamBefore = streamBindings.Count; await _context!.SendRequestAsync(new DispatcherCacheRequest()); await _context.SendRequestAsync(new DispatcherPipelineCacheRequest()); await _context.PublishAsync(new DispatcherCacheNotification()); await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest())); - var notificationAfterFirstDispatch = notificationServiceTypes.Count; - var requestAfterFirstDispatch = requestServiceTypes.Count; - var streamAfterFirstDispatch = streamServiceTypes.Count; - var requestInvokersAfterFirstDispatch = requestInvokers.Count; - var requestPipelineInvokersAfterFirstDispatch = requestPipelineInvokers.Count; - var notificationInvokersAfterFirstDispatch = notificationInvokers.Count; - var streamInvokersAfterFirstDispatch = streamInvokers.Count; + var notificationAfterFirstDispatch = notificationBindings.Count; + var requestAfterFirstDispatch = requestBindings.Count; + var streamAfterFirstDispatch = streamBindings.Count; await _context.SendRequestAsync(new DispatcherCacheRequest()); await _context.SendRequestAsync(new DispatcherPipelineCacheRequest()); @@ -90,38 +78,30 @@ internal sealed class CqrsDispatcherCacheTests Assert.That(notificationAfterFirstDispatch, Is.EqualTo(notificationBefore + 1)); Assert.That(requestAfterFirstDispatch, Is.EqualTo(requestBefore + 2)); Assert.That(streamAfterFirstDispatch, Is.EqualTo(streamBefore + 1)); - Assert.That(requestInvokersAfterFirstDispatch, Is.EqualTo(requestInvokersBefore + 1)); - Assert.That(requestPipelineInvokersAfterFirstDispatch, Is.EqualTo(requestPipelineInvokersBefore + 1)); - Assert.That(notificationInvokersAfterFirstDispatch, Is.EqualTo(notificationInvokersBefore + 1)); - Assert.That(streamInvokersAfterFirstDispatch, Is.EqualTo(streamInvokersBefore + 1)); - Assert.That(notificationServiceTypes.Count, Is.EqualTo(notificationAfterFirstDispatch)); - Assert.That(requestServiceTypes.Count, Is.EqualTo(requestAfterFirstDispatch)); - Assert.That(streamServiceTypes.Count, Is.EqualTo(streamAfterFirstDispatch)); - Assert.That(requestInvokers.Count, Is.EqualTo(requestInvokersAfterFirstDispatch)); - Assert.That(requestPipelineInvokers.Count, Is.EqualTo(requestPipelineInvokersAfterFirstDispatch)); - Assert.That(notificationInvokers.Count, Is.EqualTo(notificationInvokersAfterFirstDispatch)); - Assert.That(streamInvokers.Count, Is.EqualTo(streamInvokersAfterFirstDispatch)); + Assert.That(notificationBindings.Count, Is.EqualTo(notificationAfterFirstDispatch)); + Assert.That(requestBindings.Count, Is.EqualTo(requestAfterFirstDispatch)); + Assert.That(streamBindings.Count, Is.EqualTo(streamAfterFirstDispatch)); }); } /// - /// 验证 request 调用委托会按响应类型分别缓存,避免不同响应类型共用 object 结果桥接。 + /// 验证 request dispatch binding 会按响应类型分别缓存,避免不同响应类型共用 object 结果桥接。 /// [Test] - public async Task Dispatcher_Should_Cache_Request_Invokers_Per_Response_Type() + public async Task Dispatcher_Should_Cache_Request_Dispatch_Bindings_Per_Response_Type() { - var intRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers"); - var stringRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers"); + var intRequestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings"); + var stringRequestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(string), "Bindings"); - var intBefore = intRequestInvokers.Count; - var stringBefore = stringRequestInvokers.Count; + var intBefore = intRequestBindings.Count; + var stringBefore = stringRequestBindings.Count; await _context!.SendRequestAsync(new DispatcherCacheRequest()); await _context.SendRequestAsync(new DispatcherStringCacheRequest()); - var intAfterFirstDispatch = intRequestInvokers.Count; - var stringAfterFirstDispatch = stringRequestInvokers.Count; + var intAfterFirstDispatch = intRequestBindings.Count; + var stringAfterFirstDispatch = stringRequestBindings.Count; await _context.SendRequestAsync(new DispatcherCacheRequest()); await _context.SendRequestAsync(new DispatcherStringCacheRequest()); @@ -130,8 +110,8 @@ internal sealed class CqrsDispatcherCacheTests { Assert.That(intAfterFirstDispatch, Is.EqualTo(intBefore + 1)); Assert.That(stringAfterFirstDispatch, Is.EqualTo(stringBefore + 1)); - Assert.That(intRequestInvokers.Count, Is.EqualTo(intAfterFirstDispatch)); - Assert.That(stringRequestInvokers.Count, Is.EqualTo(stringAfterFirstDispatch)); + Assert.That(intRequestBindings.Count, Is.EqualTo(intAfterFirstDispatch)); + Assert.That(stringRequestBindings.Count, Is.EqualTo(stringAfterFirstDispatch)); }); } @@ -157,15 +137,10 @@ internal sealed class CqrsDispatcherCacheTests /// private static void ClearDispatcherCaches() { - GetCacheField("NotificationHandlerServiceTypes").Clear(); - GetCacheField("RequestServiceTypes").Clear(); - GetCacheField("StreamHandlerServiceTypes").Clear(); - GetCacheField("NotificationInvokers").Clear(); - GetCacheField("StreamInvokers").Clear(); - GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers").Clear(); - GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers").Clear(); - GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers").Clear(); - GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(string), "Invokers").Clear(); + GetCacheField("NotificationDispatchBindings").Clear(); + GetCacheField("StreamDispatchBindings").Clear(); + GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings").Clear(); + GetGenericCacheField("RequestDispatchBindingCache`1", typeof(string), "Bindings").Clear(); } /// diff --git a/GFramework.Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Cqrs/Internal/CqrsDispatcher.cs index a6d62f96..e396998c 100644 --- a/GFramework.Cqrs/Internal/CqrsDispatcher.cs +++ b/GFramework.Cqrs/Internal/CqrsDispatcher.cs @@ -15,24 +15,13 @@ internal sealed class CqrsDispatcher( IIocContainer container, ILogger logger) : ICqrsRuntime { - // 进程级缓存:缓存通知调用委托,复用并发安全字典以支撑多线程发布路径。 - private static readonly ConcurrentDictionary NotificationInvokers = new(); + // 进程级缓存:把通知服务类型与调用委托绑定到同一项,减少发布热路径上的重复字典查询。 + private static readonly ConcurrentDictionary + NotificationDispatchBindings = new(); - // 进程级缓存:缓存通知处理器服务类型,避免每次发布都重复 MakeGenericType。 - private static readonly ConcurrentDictionary NotificationHandlerServiceTypes = new(); - - // 进程级缓存:缓存流式请求调用委托,避免每次创建流时重复解析反射签名。 - private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers = - new(); - - // 进程级缓存:缓存请求处理器与 pipeline 行为的服务类型,减少热路径中的泛型类型构造。 - private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestServiceTypeSet> - RequestServiceTypes = new(); - - // 进程级缓存:缓存流式请求处理器服务类型,避免每次建流时重复 MakeGenericType。 - private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), Type> - StreamHandlerServiceTypes = - new(); + // 进程级缓存:把流式处理器服务类型与调用委托绑定到同一项,减少建流热路径上的重复字典查询。 + private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamDispatchBinding> + StreamDispatchBindings = new(); // 静态方法定义缓存:这些反射查找与消息类型无关,只需解析一次即可复用。 private static readonly MethodInfo RequestHandlerInvokerMethodDefinition = typeof(CqrsDispatcher) @@ -64,10 +53,10 @@ internal sealed class CqrsDispatcher( ArgumentNullException.ThrowIfNull(notification); var notificationType = notification.GetType(); - var handlerType = NotificationHandlerServiceTypes.GetOrAdd( + var dispatchBinding = NotificationDispatchBindings.GetOrAdd( notificationType, - static type => typeof(INotificationHandler<>).MakeGenericType(type)); - var handlers = container.GetAll(handlerType); + CreateNotificationDispatchBinding); + var handlers = container.GetAll(dispatchBinding.HandlerType); if (handlers.Count == 0) { @@ -75,14 +64,10 @@ internal sealed class CqrsDispatcher( return; } - var invoker = NotificationInvokers.GetOrAdd( - notificationType, - CreateNotificationInvoker); - foreach (var handler in handlers) { PrepareHandler(handler, context); - await invoker(handler, notification, cancellationToken); + await dispatchBinding.Invoker(handler, notification, cancellationToken); } } @@ -103,36 +88,23 @@ internal sealed class CqrsDispatcher( ArgumentNullException.ThrowIfNull(request); var requestType = request.GetType(); - var serviceTypes = RequestServiceTypes.GetOrAdd( - (requestType, typeof(TResponse)), - static key => new RequestServiceTypeSet( - typeof(IRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType), - typeof(IPipelineBehavior<,>).MakeGenericType(key.RequestType, key.ResponseType))); - var handlerType = serviceTypes.HandlerType; - var handler = container.Get(handlerType) + var dispatchBinding = RequestDispatchBindingCache.Bindings.GetOrAdd( + requestType, + CreateRequestDispatchBinding); + var handler = container.Get(dispatchBinding.HandlerType) ?? throw new InvalidOperationException( $"No CQRS request handler registered for {requestType.FullName}."); PrepareHandler(handler, context); - var behaviors = container.GetAll(serviceTypes.BehaviorType); + var behaviors = container.GetAll(dispatchBinding.BehaviorType); foreach (var behavior in behaviors) PrepareHandler(behavior, context); if (behaviors.Count == 0) - { - var invoker = RequestInvokerCache.Invokers.GetOrAdd( - requestType, - CreateRequestInvoker); + return await dispatchBinding.RequestInvoker(handler, request, cancellationToken); - return await invoker(handler, request, cancellationToken); - } - - var pipelineInvoker = RequestPipelineInvokerCache.Invokers.GetOrAdd( - requestType, - CreateRequestPipelineInvoker); - - return await pipelineInvoker(handler, behaviors, request, cancellationToken); + return await dispatchBinding.PipelineInvoker(handler, behaviors, request, cancellationToken); } /// @@ -152,20 +124,16 @@ internal sealed class CqrsDispatcher( ArgumentNullException.ThrowIfNull(request); var requestType = request.GetType(); - var handlerType = StreamHandlerServiceTypes.GetOrAdd( + var dispatchBinding = StreamDispatchBindings.GetOrAdd( (requestType, typeof(TResponse)), - static key => typeof(IStreamRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType)); - var handler = container.Get(handlerType) + static key => CreateStreamDispatchBinding(key.RequestType, key.ResponseType)); + var handler = container.Get(dispatchBinding.HandlerType) ?? throw new InvalidOperationException( $"No CQRS stream handler registered for {requestType.FullName}."); PrepareHandler(handler, context); - var invoker = StreamInvokers.GetOrAdd( - (requestType, typeof(TResponse)), - static key => CreateStreamInvoker(key.RequestType, key.ResponseType)); - - return (IAsyncEnumerable)invoker(handler, request, cancellationToken); + return (IAsyncEnumerable)dispatchBinding.Invoker(handler, request, cancellationToken); } /// @@ -185,6 +153,38 @@ internal sealed class CqrsDispatcher( } } + /// + /// 为指定请求类型构造完整分发绑定,把服务类型与强类型调用委托一次性收敛到同一缓存项。 + /// + private static RequestDispatchBinding CreateRequestDispatchBinding(Type requestType) + { + return new RequestDispatchBinding( + typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)), + typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)), + CreateRequestInvoker(requestType), + CreateRequestPipelineInvoker(requestType)); + } + + /// + /// 为指定通知类型构造完整分发绑定,把服务类型与调用委托聚合到同一缓存项。 + /// + private static NotificationDispatchBinding CreateNotificationDispatchBinding(Type notificationType) + { + return new NotificationDispatchBinding( + typeof(INotificationHandler<>).MakeGenericType(notificationType), + CreateNotificationInvoker(notificationType)); + } + + /// + /// 为指定流式请求类型构造完整分发绑定,把服务类型与调用委托聚合到同一缓存项。 + /// + private static StreamDispatchBinding CreateStreamDispatchBinding(Type requestType, Type responseType) + { + return new StreamDispatchBinding( + typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, responseType), + CreateStreamInvoker(requestType, responseType)); + } + /// /// 生成请求处理器调用委托,避免每次发送都重复反射。 /// @@ -312,22 +312,22 @@ internal sealed class CqrsDispatcher( private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken); /// - /// 按响应类型分层缓存 request 处理器调用委托,避免 value-type 响应在 object 桥接中产生装箱。 + /// 按响应类型分层缓存 request 分发绑定,既避免 value-type 响应走 object 桥接, + /// 也让 handler/pipeline 服务类型与调用委托在热路径上只命中一次缓存。 /// /// 请求响应类型。 - private static class RequestInvokerCache + private static class RequestDispatchBindingCache { - internal static readonly ConcurrentDictionary> Invokers = new(); + internal static readonly ConcurrentDictionary> Bindings = new(); } - /// - /// 按响应类型分层缓存带 pipeline 的 request 调用委托,避免 pipeline 热路径上的额外装箱。 - /// - /// 请求响应类型。 - private static class RequestPipelineInvokerCache - { - internal static readonly ConcurrentDictionary> Invokers = new(); - } + private readonly record struct NotificationDispatchBinding(Type HandlerType, NotificationInvoker Invoker); - private readonly record struct RequestServiceTypeSet(Type HandlerType, Type BehaviorType); + private readonly record struct StreamDispatchBinding(Type HandlerType, StreamInvoker Invoker); + + private readonly record struct RequestDispatchBinding( + Type HandlerType, + Type BehaviorType, + RequestInvoker RequestInvoker, + RequestPipelineInvoker PipelineInvoker); }