From be59dc7f27ea1b78294cfb0a0b9b7192ad2d22e4 Mon Sep 17 00:00:00 2001
From: GeWuYou <95328647+GeWuYou@users.noreply.github.com>
Date: Fri, 17 Apr 2026 14:21:56 +0800
Subject: [PATCH] =?UTF-8?q?refactor(cqrs):=20=E4=BC=98=E5=8C=96=E8=B0=83?=
=?UTF-8?q?=E5=BA=A6=E5=99=A8=E7=BC=93=E5=AD=98=E7=BB=93=E6=9E=84=E6=8F=90?=
=?UTF-8?q?=E5=8D=87=E6=80=A7=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 将通知、请求和流式处理的缓存从多个独立字典合并为统一的分发绑定结构
- 减少热路径上的重复字典查询操作,提升分发性能
- 重构请求分发绑定缓存,按响应类型分层避免值类型装箱开销
- 添加完整的调度绑定单元测试验证缓存行为正确性
- 简化通知和流式处理的缓存查找逻辑,统一调用模式
---
.../Cqrs/CqrsDispatcherCacheTests.cs | 89 +++++-------
GFramework.Cqrs/Internal/CqrsDispatcher.cs | 130 +++++++++---------
2 files changed, 97 insertions(+), 122 deletions(-)
diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs
index badd7490..b16995e4 100644
--- a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs
+++ b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs
@@ -7,14 +7,11 @@ using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
///
-/// 验证 CQRS dispatcher 会缓存热路径中的服务类型与调用委托。
+/// 验证 CQRS dispatcher 会缓存热路径中的 dispatch binding。
///
[TestFixture]
internal sealed class CqrsDispatcherCacheTests
{
- private MicrosoftDiContainer? _container;
- private ArchitectureContext? _context;
-
///
/// 初始化测试上下文。
///
@@ -45,40 +42,31 @@ internal sealed class CqrsDispatcherCacheTests
_container = null;
}
+ private MicrosoftDiContainer? _container;
+ private ArchitectureContext? _context;
+
///
- /// 验证相同消息类型重复分发时,不会重复扩张服务类型与调用委托缓存。
+ /// 验证相同消息类型重复分发时,不会重复扩张 dispatch binding 缓存。
///
[Test]
- public async Task Dispatcher_Should_Cache_Service_Types_After_First_Dispatch()
+ public async Task Dispatcher_Should_Cache_Dispatch_Bindings_After_First_Dispatch()
{
- var notificationServiceTypes = GetCacheField("NotificationHandlerServiceTypes");
- var requestServiceTypes = GetCacheField("RequestServiceTypes");
- var streamServiceTypes = GetCacheField("StreamHandlerServiceTypes");
- var requestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers");
- var requestPipelineInvokers = GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers");
- var notificationInvokers = GetCacheField("NotificationInvokers");
- var streamInvokers = GetCacheField("StreamInvokers");
+ var notificationBindings = GetCacheField("NotificationDispatchBindings");
+ var requestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings");
+ var streamBindings = GetCacheField("StreamDispatchBindings");
- var notificationBefore = notificationServiceTypes.Count;
- var requestBefore = requestServiceTypes.Count;
- var streamBefore = streamServiceTypes.Count;
- var requestInvokersBefore = requestInvokers.Count;
- var requestPipelineInvokersBefore = requestPipelineInvokers.Count;
- var notificationInvokersBefore = notificationInvokers.Count;
- var streamInvokersBefore = streamInvokers.Count;
+ var notificationBefore = notificationBindings.Count;
+ var requestBefore = requestBindings.Count;
+ var streamBefore = streamBindings.Count;
await _context!.SendRequestAsync(new DispatcherCacheRequest());
await _context.SendRequestAsync(new DispatcherPipelineCacheRequest());
await _context.PublishAsync(new DispatcherCacheNotification());
await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest()));
- var notificationAfterFirstDispatch = notificationServiceTypes.Count;
- var requestAfterFirstDispatch = requestServiceTypes.Count;
- var streamAfterFirstDispatch = streamServiceTypes.Count;
- var requestInvokersAfterFirstDispatch = requestInvokers.Count;
- var requestPipelineInvokersAfterFirstDispatch = requestPipelineInvokers.Count;
- var notificationInvokersAfterFirstDispatch = notificationInvokers.Count;
- var streamInvokersAfterFirstDispatch = streamInvokers.Count;
+ var notificationAfterFirstDispatch = notificationBindings.Count;
+ var requestAfterFirstDispatch = requestBindings.Count;
+ var streamAfterFirstDispatch = streamBindings.Count;
await _context.SendRequestAsync(new DispatcherCacheRequest());
await _context.SendRequestAsync(new DispatcherPipelineCacheRequest());
@@ -90,38 +78,30 @@ internal sealed class CqrsDispatcherCacheTests
Assert.That(notificationAfterFirstDispatch, Is.EqualTo(notificationBefore + 1));
Assert.That(requestAfterFirstDispatch, Is.EqualTo(requestBefore + 2));
Assert.That(streamAfterFirstDispatch, Is.EqualTo(streamBefore + 1));
- Assert.That(requestInvokersAfterFirstDispatch, Is.EqualTo(requestInvokersBefore + 1));
- Assert.That(requestPipelineInvokersAfterFirstDispatch, Is.EqualTo(requestPipelineInvokersBefore + 1));
- Assert.That(notificationInvokersAfterFirstDispatch, Is.EqualTo(notificationInvokersBefore + 1));
- Assert.That(streamInvokersAfterFirstDispatch, Is.EqualTo(streamInvokersBefore + 1));
- Assert.That(notificationServiceTypes.Count, Is.EqualTo(notificationAfterFirstDispatch));
- Assert.That(requestServiceTypes.Count, Is.EqualTo(requestAfterFirstDispatch));
- Assert.That(streamServiceTypes.Count, Is.EqualTo(streamAfterFirstDispatch));
- Assert.That(requestInvokers.Count, Is.EqualTo(requestInvokersAfterFirstDispatch));
- Assert.That(requestPipelineInvokers.Count, Is.EqualTo(requestPipelineInvokersAfterFirstDispatch));
- Assert.That(notificationInvokers.Count, Is.EqualTo(notificationInvokersAfterFirstDispatch));
- Assert.That(streamInvokers.Count, Is.EqualTo(streamInvokersAfterFirstDispatch));
+ Assert.That(notificationBindings.Count, Is.EqualTo(notificationAfterFirstDispatch));
+ Assert.That(requestBindings.Count, Is.EqualTo(requestAfterFirstDispatch));
+ Assert.That(streamBindings.Count, Is.EqualTo(streamAfterFirstDispatch));
});
}
///
- /// 验证 request 调用委托会按响应类型分别缓存,避免不同响应类型共用 object 结果桥接。
+ /// 验证 request dispatch binding 会按响应类型分别缓存,避免不同响应类型共用 object 结果桥接。
///
[Test]
- public async Task Dispatcher_Should_Cache_Request_Invokers_Per_Response_Type()
+ public async Task Dispatcher_Should_Cache_Request_Dispatch_Bindings_Per_Response_Type()
{
- var intRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers");
- var stringRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers");
+ var intRequestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings");
+ var stringRequestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(string), "Bindings");
- var intBefore = intRequestInvokers.Count;
- var stringBefore = stringRequestInvokers.Count;
+ var intBefore = intRequestBindings.Count;
+ var stringBefore = stringRequestBindings.Count;
await _context!.SendRequestAsync(new DispatcherCacheRequest());
await _context.SendRequestAsync(new DispatcherStringCacheRequest());
- var intAfterFirstDispatch = intRequestInvokers.Count;
- var stringAfterFirstDispatch = stringRequestInvokers.Count;
+ var intAfterFirstDispatch = intRequestBindings.Count;
+ var stringAfterFirstDispatch = stringRequestBindings.Count;
await _context.SendRequestAsync(new DispatcherCacheRequest());
await _context.SendRequestAsync(new DispatcherStringCacheRequest());
@@ -130,8 +110,8 @@ internal sealed class CqrsDispatcherCacheTests
{
Assert.That(intAfterFirstDispatch, Is.EqualTo(intBefore + 1));
Assert.That(stringAfterFirstDispatch, Is.EqualTo(stringBefore + 1));
- Assert.That(intRequestInvokers.Count, Is.EqualTo(intAfterFirstDispatch));
- Assert.That(stringRequestInvokers.Count, Is.EqualTo(stringAfterFirstDispatch));
+ Assert.That(intRequestBindings.Count, Is.EqualTo(intAfterFirstDispatch));
+ Assert.That(stringRequestBindings.Count, Is.EqualTo(stringAfterFirstDispatch));
});
}
@@ -157,15 +137,10 @@ internal sealed class CqrsDispatcherCacheTests
///
private static void ClearDispatcherCaches()
{
- GetCacheField("NotificationHandlerServiceTypes").Clear();
- GetCacheField("RequestServiceTypes").Clear();
- GetCacheField("StreamHandlerServiceTypes").Clear();
- GetCacheField("NotificationInvokers").Clear();
- GetCacheField("StreamInvokers").Clear();
- GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers").Clear();
- GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers").Clear();
- GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers").Clear();
- GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(string), "Invokers").Clear();
+ GetCacheField("NotificationDispatchBindings").Clear();
+ GetCacheField("StreamDispatchBindings").Clear();
+ GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings").Clear();
+ GetGenericCacheField("RequestDispatchBindingCache`1", typeof(string), "Bindings").Clear();
}
///
diff --git a/GFramework.Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Cqrs/Internal/CqrsDispatcher.cs
index a6d62f96..e396998c 100644
--- a/GFramework.Cqrs/Internal/CqrsDispatcher.cs
+++ b/GFramework.Cqrs/Internal/CqrsDispatcher.cs
@@ -15,24 +15,13 @@ internal sealed class CqrsDispatcher(
IIocContainer container,
ILogger logger) : ICqrsRuntime
{
- // 进程级缓存:缓存通知调用委托,复用并发安全字典以支撑多线程发布路径。
- private static readonly ConcurrentDictionary NotificationInvokers = new();
+ // 进程级缓存:把通知服务类型与调用委托绑定到同一项,减少发布热路径上的重复字典查询。
+ private static readonly ConcurrentDictionary
+ NotificationDispatchBindings = new();
- // 进程级缓存:缓存通知处理器服务类型,避免每次发布都重复 MakeGenericType。
- private static readonly ConcurrentDictionary NotificationHandlerServiceTypes = new();
-
- // 进程级缓存:缓存流式请求调用委托,避免每次创建流时重复解析反射签名。
- private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers =
- new();
-
- // 进程级缓存:缓存请求处理器与 pipeline 行为的服务类型,减少热路径中的泛型类型构造。
- private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestServiceTypeSet>
- RequestServiceTypes = new();
-
- // 进程级缓存:缓存流式请求处理器服务类型,避免每次建流时重复 MakeGenericType。
- private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), Type>
- StreamHandlerServiceTypes =
- new();
+ // 进程级缓存:把流式处理器服务类型与调用委托绑定到同一项,减少建流热路径上的重复字典查询。
+ private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamDispatchBinding>
+ StreamDispatchBindings = new();
// 静态方法定义缓存:这些反射查找与消息类型无关,只需解析一次即可复用。
private static readonly MethodInfo RequestHandlerInvokerMethodDefinition = typeof(CqrsDispatcher)
@@ -64,10 +53,10 @@ internal sealed class CqrsDispatcher(
ArgumentNullException.ThrowIfNull(notification);
var notificationType = notification.GetType();
- var handlerType = NotificationHandlerServiceTypes.GetOrAdd(
+ var dispatchBinding = NotificationDispatchBindings.GetOrAdd(
notificationType,
- static type => typeof(INotificationHandler<>).MakeGenericType(type));
- var handlers = container.GetAll(handlerType);
+ CreateNotificationDispatchBinding);
+ var handlers = container.GetAll(dispatchBinding.HandlerType);
if (handlers.Count == 0)
{
@@ -75,14 +64,10 @@ internal sealed class CqrsDispatcher(
return;
}
- var invoker = NotificationInvokers.GetOrAdd(
- notificationType,
- CreateNotificationInvoker);
-
foreach (var handler in handlers)
{
PrepareHandler(handler, context);
- await invoker(handler, notification, cancellationToken);
+ await dispatchBinding.Invoker(handler, notification, cancellationToken);
}
}
@@ -103,36 +88,23 @@ internal sealed class CqrsDispatcher(
ArgumentNullException.ThrowIfNull(request);
var requestType = request.GetType();
- var serviceTypes = RequestServiceTypes.GetOrAdd(
- (requestType, typeof(TResponse)),
- static key => new RequestServiceTypeSet(
- typeof(IRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType),
- typeof(IPipelineBehavior<,>).MakeGenericType(key.RequestType, key.ResponseType)));
- var handlerType = serviceTypes.HandlerType;
- var handler = container.Get(handlerType)
+ var dispatchBinding = RequestDispatchBindingCache.Bindings.GetOrAdd(
+ requestType,
+ CreateRequestDispatchBinding);
+ var handler = container.Get(dispatchBinding.HandlerType)
?? throw new InvalidOperationException(
$"No CQRS request handler registered for {requestType.FullName}.");
PrepareHandler(handler, context);
- var behaviors = container.GetAll(serviceTypes.BehaviorType);
+ var behaviors = container.GetAll(dispatchBinding.BehaviorType);
foreach (var behavior in behaviors)
PrepareHandler(behavior, context);
if (behaviors.Count == 0)
- {
- var invoker = RequestInvokerCache.Invokers.GetOrAdd(
- requestType,
- CreateRequestInvoker);
+ return await dispatchBinding.RequestInvoker(handler, request, cancellationToken);
- return await invoker(handler, request, cancellationToken);
- }
-
- var pipelineInvoker = RequestPipelineInvokerCache.Invokers.GetOrAdd(
- requestType,
- CreateRequestPipelineInvoker);
-
- return await pipelineInvoker(handler, behaviors, request, cancellationToken);
+ return await dispatchBinding.PipelineInvoker(handler, behaviors, request, cancellationToken);
}
///
@@ -152,20 +124,16 @@ internal sealed class CqrsDispatcher(
ArgumentNullException.ThrowIfNull(request);
var requestType = request.GetType();
- var handlerType = StreamHandlerServiceTypes.GetOrAdd(
+ var dispatchBinding = StreamDispatchBindings.GetOrAdd(
(requestType, typeof(TResponse)),
- static key => typeof(IStreamRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType));
- var handler = container.Get(handlerType)
+ static key => CreateStreamDispatchBinding(key.RequestType, key.ResponseType));
+ var handler = container.Get(dispatchBinding.HandlerType)
?? throw new InvalidOperationException(
$"No CQRS stream handler registered for {requestType.FullName}.");
PrepareHandler(handler, context);
- var invoker = StreamInvokers.GetOrAdd(
- (requestType, typeof(TResponse)),
- static key => CreateStreamInvoker(key.RequestType, key.ResponseType));
-
- return (IAsyncEnumerable)invoker(handler, request, cancellationToken);
+ return (IAsyncEnumerable)dispatchBinding.Invoker(handler, request, cancellationToken);
}
///
@@ -185,6 +153,38 @@ internal sealed class CqrsDispatcher(
}
}
+ ///
+ /// 为指定请求类型构造完整分发绑定,把服务类型与强类型调用委托一次性收敛到同一缓存项。
+ ///
+ private static RequestDispatchBinding CreateRequestDispatchBinding(Type requestType)
+ {
+ return new RequestDispatchBinding(
+ typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)),
+ typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)),
+ CreateRequestInvoker(requestType),
+ CreateRequestPipelineInvoker(requestType));
+ }
+
+ ///
+ /// 为指定通知类型构造完整分发绑定,把服务类型与调用委托聚合到同一缓存项。
+ ///
+ private static NotificationDispatchBinding CreateNotificationDispatchBinding(Type notificationType)
+ {
+ return new NotificationDispatchBinding(
+ typeof(INotificationHandler<>).MakeGenericType(notificationType),
+ CreateNotificationInvoker(notificationType));
+ }
+
+ ///
+ /// 为指定流式请求类型构造完整分发绑定,把服务类型与调用委托聚合到同一缓存项。
+ ///
+ private static StreamDispatchBinding CreateStreamDispatchBinding(Type requestType, Type responseType)
+ {
+ return new StreamDispatchBinding(
+ typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, responseType),
+ CreateStreamInvoker(requestType, responseType));
+ }
+
///
/// 生成请求处理器调用委托,避免每次发送都重复反射。
///
@@ -312,22 +312,22 @@ internal sealed class CqrsDispatcher(
private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken);
///
- /// 按响应类型分层缓存 request 处理器调用委托,避免 value-type 响应在 object 桥接中产生装箱。
+ /// 按响应类型分层缓存 request 分发绑定,既避免 value-type 响应走 object 桥接,
+ /// 也让 handler/pipeline 服务类型与调用委托在热路径上只命中一次缓存。
///
/// 请求响应类型。
- private static class RequestInvokerCache
+ private static class RequestDispatchBindingCache
{
- internal static readonly ConcurrentDictionary> Invokers = new();
+ internal static readonly ConcurrentDictionary> Bindings = new();
}
- ///
- /// 按响应类型分层缓存带 pipeline 的 request 调用委托,避免 pipeline 热路径上的额外装箱。
- ///
- /// 请求响应类型。
- private static class RequestPipelineInvokerCache
- {
- internal static readonly ConcurrentDictionary> Invokers = new();
- }
+ private readonly record struct NotificationDispatchBinding(Type HandlerType, NotificationInvoker Invoker);
- private readonly record struct RequestServiceTypeSet(Type HandlerType, Type BehaviorType);
+ private readonly record struct StreamDispatchBinding(Type HandlerType, StreamInvoker Invoker);
+
+ private readonly record struct RequestDispatchBinding(
+ Type HandlerType,
+ Type BehaviorType,
+ RequestInvoker RequestInvoker,
+ RequestPipelineInvoker PipelineInvoker);
}