mirror of
https://github.com/GeWuYou/GFramework.git
synced 2026-05-06 16:16:44 +08:00
refactor(cqrs): 优化调度器缓存结构提升性能
- 将通知、请求和流式处理的缓存从多个独立字典合并为统一的分发绑定结构 - 减少热路径上的重复字典查询操作,提升分发性能 - 重构请求分发绑定缓存,按响应类型分层避免值类型装箱开销 - 添加完整的调度绑定单元测试验证缓存行为正确性 - 简化通知和流式处理的缓存查找逻辑,统一调用模式
This commit is contained in:
parent
59dfb68add
commit
be59dc7f27
@ -7,14 +7,11 @@ using GFramework.Cqrs.Abstractions.Cqrs;
|
||||
namespace GFramework.Cqrs.Tests.Cqrs;
|
||||
|
||||
/// <summary>
|
||||
/// 验证 CQRS dispatcher 会缓存热路径中的服务类型与调用委托。
|
||||
/// 验证 CQRS dispatcher 会缓存热路径中的 dispatch binding。
|
||||
/// </summary>
|
||||
[TestFixture]
|
||||
internal sealed class CqrsDispatcherCacheTests
|
||||
{
|
||||
private MicrosoftDiContainer? _container;
|
||||
private ArchitectureContext? _context;
|
||||
|
||||
/// <summary>
|
||||
/// 初始化测试上下文。
|
||||
/// </summary>
|
||||
@ -45,40 +42,31 @@ internal sealed class CqrsDispatcherCacheTests
|
||||
_container = null;
|
||||
}
|
||||
|
||||
private MicrosoftDiContainer? _container;
|
||||
private ArchitectureContext? _context;
|
||||
|
||||
/// <summary>
|
||||
/// 验证相同消息类型重复分发时,不会重复扩张服务类型与调用委托缓存。
|
||||
/// 验证相同消息类型重复分发时,不会重复扩张 dispatch binding 缓存。
|
||||
/// </summary>
|
||||
[Test]
|
||||
public async Task Dispatcher_Should_Cache_Service_Types_After_First_Dispatch()
|
||||
public async Task Dispatcher_Should_Cache_Dispatch_Bindings_After_First_Dispatch()
|
||||
{
|
||||
var notificationServiceTypes = GetCacheField("NotificationHandlerServiceTypes");
|
||||
var requestServiceTypes = GetCacheField("RequestServiceTypes");
|
||||
var streamServiceTypes = GetCacheField("StreamHandlerServiceTypes");
|
||||
var requestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers");
|
||||
var requestPipelineInvokers = GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers");
|
||||
var notificationInvokers = GetCacheField("NotificationInvokers");
|
||||
var streamInvokers = GetCacheField("StreamInvokers");
|
||||
var notificationBindings = GetCacheField("NotificationDispatchBindings");
|
||||
var requestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings");
|
||||
var streamBindings = GetCacheField("StreamDispatchBindings");
|
||||
|
||||
var notificationBefore = notificationServiceTypes.Count;
|
||||
var requestBefore = requestServiceTypes.Count;
|
||||
var streamBefore = streamServiceTypes.Count;
|
||||
var requestInvokersBefore = requestInvokers.Count;
|
||||
var requestPipelineInvokersBefore = requestPipelineInvokers.Count;
|
||||
var notificationInvokersBefore = notificationInvokers.Count;
|
||||
var streamInvokersBefore = streamInvokers.Count;
|
||||
var notificationBefore = notificationBindings.Count;
|
||||
var requestBefore = requestBindings.Count;
|
||||
var streamBefore = streamBindings.Count;
|
||||
|
||||
await _context!.SendRequestAsync(new DispatcherCacheRequest());
|
||||
await _context.SendRequestAsync(new DispatcherPipelineCacheRequest());
|
||||
await _context.PublishAsync(new DispatcherCacheNotification());
|
||||
await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest()));
|
||||
|
||||
var notificationAfterFirstDispatch = notificationServiceTypes.Count;
|
||||
var requestAfterFirstDispatch = requestServiceTypes.Count;
|
||||
var streamAfterFirstDispatch = streamServiceTypes.Count;
|
||||
var requestInvokersAfterFirstDispatch = requestInvokers.Count;
|
||||
var requestPipelineInvokersAfterFirstDispatch = requestPipelineInvokers.Count;
|
||||
var notificationInvokersAfterFirstDispatch = notificationInvokers.Count;
|
||||
var streamInvokersAfterFirstDispatch = streamInvokers.Count;
|
||||
var notificationAfterFirstDispatch = notificationBindings.Count;
|
||||
var requestAfterFirstDispatch = requestBindings.Count;
|
||||
var streamAfterFirstDispatch = streamBindings.Count;
|
||||
|
||||
await _context.SendRequestAsync(new DispatcherCacheRequest());
|
||||
await _context.SendRequestAsync(new DispatcherPipelineCacheRequest());
|
||||
@ -90,38 +78,30 @@ internal sealed class CqrsDispatcherCacheTests
|
||||
Assert.That(notificationAfterFirstDispatch, Is.EqualTo(notificationBefore + 1));
|
||||
Assert.That(requestAfterFirstDispatch, Is.EqualTo(requestBefore + 2));
|
||||
Assert.That(streamAfterFirstDispatch, Is.EqualTo(streamBefore + 1));
|
||||
Assert.That(requestInvokersAfterFirstDispatch, Is.EqualTo(requestInvokersBefore + 1));
|
||||
Assert.That(requestPipelineInvokersAfterFirstDispatch, Is.EqualTo(requestPipelineInvokersBefore + 1));
|
||||
Assert.That(notificationInvokersAfterFirstDispatch, Is.EqualTo(notificationInvokersBefore + 1));
|
||||
Assert.That(streamInvokersAfterFirstDispatch, Is.EqualTo(streamInvokersBefore + 1));
|
||||
|
||||
Assert.That(notificationServiceTypes.Count, Is.EqualTo(notificationAfterFirstDispatch));
|
||||
Assert.That(requestServiceTypes.Count, Is.EqualTo(requestAfterFirstDispatch));
|
||||
Assert.That(streamServiceTypes.Count, Is.EqualTo(streamAfterFirstDispatch));
|
||||
Assert.That(requestInvokers.Count, Is.EqualTo(requestInvokersAfterFirstDispatch));
|
||||
Assert.That(requestPipelineInvokers.Count, Is.EqualTo(requestPipelineInvokersAfterFirstDispatch));
|
||||
Assert.That(notificationInvokers.Count, Is.EqualTo(notificationInvokersAfterFirstDispatch));
|
||||
Assert.That(streamInvokers.Count, Is.EqualTo(streamInvokersAfterFirstDispatch));
|
||||
Assert.That(notificationBindings.Count, Is.EqualTo(notificationAfterFirstDispatch));
|
||||
Assert.That(requestBindings.Count, Is.EqualTo(requestAfterFirstDispatch));
|
||||
Assert.That(streamBindings.Count, Is.EqualTo(streamAfterFirstDispatch));
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 验证 request 调用委托会按响应类型分别缓存,避免不同响应类型共用 object 结果桥接。
|
||||
/// 验证 request dispatch binding 会按响应类型分别缓存,避免不同响应类型共用 object 结果桥接。
|
||||
/// </summary>
|
||||
[Test]
|
||||
public async Task Dispatcher_Should_Cache_Request_Invokers_Per_Response_Type()
|
||||
public async Task Dispatcher_Should_Cache_Request_Dispatch_Bindings_Per_Response_Type()
|
||||
{
|
||||
var intRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers");
|
||||
var stringRequestInvokers = GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers");
|
||||
var intRequestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings");
|
||||
var stringRequestBindings = GetGenericCacheField("RequestDispatchBindingCache`1", typeof(string), "Bindings");
|
||||
|
||||
var intBefore = intRequestInvokers.Count;
|
||||
var stringBefore = stringRequestInvokers.Count;
|
||||
var intBefore = intRequestBindings.Count;
|
||||
var stringBefore = stringRequestBindings.Count;
|
||||
|
||||
await _context!.SendRequestAsync(new DispatcherCacheRequest());
|
||||
await _context.SendRequestAsync(new DispatcherStringCacheRequest());
|
||||
|
||||
var intAfterFirstDispatch = intRequestInvokers.Count;
|
||||
var stringAfterFirstDispatch = stringRequestInvokers.Count;
|
||||
var intAfterFirstDispatch = intRequestBindings.Count;
|
||||
var stringAfterFirstDispatch = stringRequestBindings.Count;
|
||||
|
||||
await _context.SendRequestAsync(new DispatcherCacheRequest());
|
||||
await _context.SendRequestAsync(new DispatcherStringCacheRequest());
|
||||
@ -130,8 +110,8 @@ internal sealed class CqrsDispatcherCacheTests
|
||||
{
|
||||
Assert.That(intAfterFirstDispatch, Is.EqualTo(intBefore + 1));
|
||||
Assert.That(stringAfterFirstDispatch, Is.EqualTo(stringBefore + 1));
|
||||
Assert.That(intRequestInvokers.Count, Is.EqualTo(intAfterFirstDispatch));
|
||||
Assert.That(stringRequestInvokers.Count, Is.EqualTo(stringAfterFirstDispatch));
|
||||
Assert.That(intRequestBindings.Count, Is.EqualTo(intAfterFirstDispatch));
|
||||
Assert.That(stringRequestBindings.Count, Is.EqualTo(stringAfterFirstDispatch));
|
||||
});
|
||||
}
|
||||
|
||||
@ -157,15 +137,10 @@ internal sealed class CqrsDispatcherCacheTests
|
||||
/// </summary>
|
||||
private static void ClearDispatcherCaches()
|
||||
{
|
||||
GetCacheField("NotificationHandlerServiceTypes").Clear();
|
||||
GetCacheField("RequestServiceTypes").Clear();
|
||||
GetCacheField("StreamHandlerServiceTypes").Clear();
|
||||
GetCacheField("NotificationInvokers").Clear();
|
||||
GetCacheField("StreamInvokers").Clear();
|
||||
GetGenericCacheField("RequestInvokerCache`1", typeof(int), "Invokers").Clear();
|
||||
GetGenericCacheField("RequestInvokerCache`1", typeof(string), "Invokers").Clear();
|
||||
GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(int), "Invokers").Clear();
|
||||
GetGenericCacheField("RequestPipelineInvokerCache`1", typeof(string), "Invokers").Clear();
|
||||
GetCacheField("NotificationDispatchBindings").Clear();
|
||||
GetCacheField("StreamDispatchBindings").Clear();
|
||||
GetGenericCacheField("RequestDispatchBindingCache`1", typeof(int), "Bindings").Clear();
|
||||
GetGenericCacheField("RequestDispatchBindingCache`1", typeof(string), "Bindings").Clear();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@ -15,24 +15,13 @@ internal sealed class CqrsDispatcher(
|
||||
IIocContainer container,
|
||||
ILogger logger) : ICqrsRuntime
|
||||
{
|
||||
// 进程级缓存:缓存通知调用委托,复用并发安全字典以支撑多线程发布路径。
|
||||
private static readonly ConcurrentDictionary<Type, NotificationInvoker> NotificationInvokers = new();
|
||||
// 进程级缓存:把通知服务类型与调用委托绑定到同一项,减少发布热路径上的重复字典查询。
|
||||
private static readonly ConcurrentDictionary<Type, NotificationDispatchBinding>
|
||||
NotificationDispatchBindings = new();
|
||||
|
||||
// 进程级缓存:缓存通知处理器服务类型,避免每次发布都重复 MakeGenericType。
|
||||
private static readonly ConcurrentDictionary<Type, Type> NotificationHandlerServiceTypes = new();
|
||||
|
||||
// 进程级缓存:缓存流式请求调用委托,避免每次创建流时重复解析反射签名。
|
||||
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers =
|
||||
new();
|
||||
|
||||
// 进程级缓存:缓存请求处理器与 pipeline 行为的服务类型,减少热路径中的泛型类型构造。
|
||||
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestServiceTypeSet>
|
||||
RequestServiceTypes = new();
|
||||
|
||||
// 进程级缓存:缓存流式请求处理器服务类型,避免每次建流时重复 MakeGenericType。
|
||||
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), Type>
|
||||
StreamHandlerServiceTypes =
|
||||
new();
|
||||
// 进程级缓存:把流式处理器服务类型与调用委托绑定到同一项,减少建流热路径上的重复字典查询。
|
||||
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamDispatchBinding>
|
||||
StreamDispatchBindings = new();
|
||||
|
||||
// 静态方法定义缓存:这些反射查找与消息类型无关,只需解析一次即可复用。
|
||||
private static readonly MethodInfo RequestHandlerInvokerMethodDefinition = typeof(CqrsDispatcher)
|
||||
@ -64,10 +53,10 @@ internal sealed class CqrsDispatcher(
|
||||
ArgumentNullException.ThrowIfNull(notification);
|
||||
|
||||
var notificationType = notification.GetType();
|
||||
var handlerType = NotificationHandlerServiceTypes.GetOrAdd(
|
||||
var dispatchBinding = NotificationDispatchBindings.GetOrAdd(
|
||||
notificationType,
|
||||
static type => typeof(INotificationHandler<>).MakeGenericType(type));
|
||||
var handlers = container.GetAll(handlerType);
|
||||
CreateNotificationDispatchBinding);
|
||||
var handlers = container.GetAll(dispatchBinding.HandlerType);
|
||||
|
||||
if (handlers.Count == 0)
|
||||
{
|
||||
@ -75,14 +64,10 @@ internal sealed class CqrsDispatcher(
|
||||
return;
|
||||
}
|
||||
|
||||
var invoker = NotificationInvokers.GetOrAdd(
|
||||
notificationType,
|
||||
CreateNotificationInvoker);
|
||||
|
||||
foreach (var handler in handlers)
|
||||
{
|
||||
PrepareHandler(handler, context);
|
||||
await invoker(handler, notification, cancellationToken);
|
||||
await dispatchBinding.Invoker(handler, notification, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
@ -103,36 +88,23 @@ internal sealed class CqrsDispatcher(
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
var requestType = request.GetType();
|
||||
var serviceTypes = RequestServiceTypes.GetOrAdd(
|
||||
(requestType, typeof(TResponse)),
|
||||
static key => new RequestServiceTypeSet(
|
||||
typeof(IRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType),
|
||||
typeof(IPipelineBehavior<,>).MakeGenericType(key.RequestType, key.ResponseType)));
|
||||
var handlerType = serviceTypes.HandlerType;
|
||||
var handler = container.Get(handlerType)
|
||||
var dispatchBinding = RequestDispatchBindingCache<TResponse>.Bindings.GetOrAdd(
|
||||
requestType,
|
||||
CreateRequestDispatchBinding<TResponse>);
|
||||
var handler = container.Get(dispatchBinding.HandlerType)
|
||||
?? throw new InvalidOperationException(
|
||||
$"No CQRS request handler registered for {requestType.FullName}.");
|
||||
|
||||
PrepareHandler(handler, context);
|
||||
var behaviors = container.GetAll(serviceTypes.BehaviorType);
|
||||
var behaviors = container.GetAll(dispatchBinding.BehaviorType);
|
||||
|
||||
foreach (var behavior in behaviors)
|
||||
PrepareHandler(behavior, context);
|
||||
|
||||
if (behaviors.Count == 0)
|
||||
{
|
||||
var invoker = RequestInvokerCache<TResponse>.Invokers.GetOrAdd(
|
||||
requestType,
|
||||
CreateRequestInvoker<TResponse>);
|
||||
return await dispatchBinding.RequestInvoker(handler, request, cancellationToken);
|
||||
|
||||
return await invoker(handler, request, cancellationToken);
|
||||
}
|
||||
|
||||
var pipelineInvoker = RequestPipelineInvokerCache<TResponse>.Invokers.GetOrAdd(
|
||||
requestType,
|
||||
CreateRequestPipelineInvoker<TResponse>);
|
||||
|
||||
return await pipelineInvoker(handler, behaviors, request, cancellationToken);
|
||||
return await dispatchBinding.PipelineInvoker(handler, behaviors, request, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -152,20 +124,16 @@ internal sealed class CqrsDispatcher(
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
var requestType = request.GetType();
|
||||
var handlerType = StreamHandlerServiceTypes.GetOrAdd(
|
||||
var dispatchBinding = StreamDispatchBindings.GetOrAdd(
|
||||
(requestType, typeof(TResponse)),
|
||||
static key => typeof(IStreamRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType));
|
||||
var handler = container.Get(handlerType)
|
||||
static key => CreateStreamDispatchBinding(key.RequestType, key.ResponseType));
|
||||
var handler = container.Get(dispatchBinding.HandlerType)
|
||||
?? throw new InvalidOperationException(
|
||||
$"No CQRS stream handler registered for {requestType.FullName}.");
|
||||
|
||||
PrepareHandler(handler, context);
|
||||
|
||||
var invoker = StreamInvokers.GetOrAdd(
|
||||
(requestType, typeof(TResponse)),
|
||||
static key => CreateStreamInvoker(key.RequestType, key.ResponseType));
|
||||
|
||||
return (IAsyncEnumerable<TResponse>)invoker(handler, request, cancellationToken);
|
||||
return (IAsyncEnumerable<TResponse>)dispatchBinding.Invoker(handler, request, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -185,6 +153,38 @@ internal sealed class CqrsDispatcher(
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 为指定请求类型构造完整分发绑定,把服务类型与强类型调用委托一次性收敛到同一缓存项。
|
||||
/// </summary>
|
||||
private static RequestDispatchBinding<TResponse> CreateRequestDispatchBinding<TResponse>(Type requestType)
|
||||
{
|
||||
return new RequestDispatchBinding<TResponse>(
|
||||
typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)),
|
||||
typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)),
|
||||
CreateRequestInvoker<TResponse>(requestType),
|
||||
CreateRequestPipelineInvoker<TResponse>(requestType));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 为指定通知类型构造完整分发绑定,把服务类型与调用委托聚合到同一缓存项。
|
||||
/// </summary>
|
||||
private static NotificationDispatchBinding CreateNotificationDispatchBinding(Type notificationType)
|
||||
{
|
||||
return new NotificationDispatchBinding(
|
||||
typeof(INotificationHandler<>).MakeGenericType(notificationType),
|
||||
CreateNotificationInvoker(notificationType));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 为指定流式请求类型构造完整分发绑定,把服务类型与调用委托聚合到同一缓存项。
|
||||
/// </summary>
|
||||
private static StreamDispatchBinding CreateStreamDispatchBinding(Type requestType, Type responseType)
|
||||
{
|
||||
return new StreamDispatchBinding(
|
||||
typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, responseType),
|
||||
CreateStreamInvoker(requestType, responseType));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 生成请求处理器调用委托,避免每次发送都重复反射。
|
||||
/// </summary>
|
||||
@ -312,22 +312,22 @@ internal sealed class CqrsDispatcher(
|
||||
private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>
|
||||
/// 按响应类型分层缓存 request 处理器调用委托,避免 value-type 响应在 object 桥接中产生装箱。
|
||||
/// 按响应类型分层缓存 request 分发绑定,既避免 value-type 响应走 object 桥接,
|
||||
/// 也让 handler/pipeline 服务类型与调用委托在热路径上只命中一次缓存。
|
||||
/// </summary>
|
||||
/// <typeparam name="TResponse">请求响应类型。</typeparam>
|
||||
private static class RequestInvokerCache<TResponse>
|
||||
private static class RequestDispatchBindingCache<TResponse>
|
||||
{
|
||||
internal static readonly ConcurrentDictionary<Type, RequestInvoker<TResponse>> Invokers = new();
|
||||
internal static readonly ConcurrentDictionary<Type, RequestDispatchBinding<TResponse>> Bindings = new();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 按响应类型分层缓存带 pipeline 的 request 调用委托,避免 pipeline 热路径上的额外装箱。
|
||||
/// </summary>
|
||||
/// <typeparam name="TResponse">请求响应类型。</typeparam>
|
||||
private static class RequestPipelineInvokerCache<TResponse>
|
||||
{
|
||||
internal static readonly ConcurrentDictionary<Type, RequestPipelineInvoker<TResponse>> Invokers = new();
|
||||
}
|
||||
private readonly record struct NotificationDispatchBinding(Type HandlerType, NotificationInvoker Invoker);
|
||||
|
||||
private readonly record struct RequestServiceTypeSet(Type HandlerType, Type BehaviorType);
|
||||
private readonly record struct StreamDispatchBinding(Type HandlerType, StreamInvoker Invoker);
|
||||
|
||||
private readonly record struct RequestDispatchBinding<TResponse>(
|
||||
Type HandlerType,
|
||||
Type BehaviorType,
|
||||
RequestInvoker<TResponse> RequestInvoker,
|
||||
RequestPipelineInvoker<TResponse> PipelineInvoker);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user