GFramework/GFramework.Cqrs/Internal/CqrsDispatcher.cs
GeWuYou be59dc7f27 refactor(cqrs): 优化调度器缓存结构提升性能
- 将通知、请求和流式处理的缓存从多个独立字典合并为统一的分发绑定结构
- 减少热路径上的重复字典查询操作,提升分发性能
- 重构请求分发绑定缓存,按响应类型分层避免值类型装箱开销
- 添加完整的调度绑定单元测试验证缓存行为正确性
- 简化通知和流式处理的缓存查找逻辑,统一调用模式
2026-04-17 14:21:56 +08:00

334 lines
14 KiB
C#

using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Abstractions.Rule;
using GFramework.Cqrs.Abstractions.Cqrs;
using ICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime;
namespace GFramework.Cqrs.Internal;
/// <summary>
/// GFramework's own CQRS runtime dispatcher.
/// Resolves request, notification and stream handlers from the IoC container and, before invoking them,
/// injects the current CQRS dispatch context into every context-aware handler or behavior.
/// </summary>
internal sealed class CqrsDispatcher(
    IIocContainer container,
    ILogger logger) : ICqrsRuntime
{
    // Static cache shared by all dispatcher instances: the notification handler service type and its
    // invoker delegate live in one entry, so publishing needs a single dictionary lookup.
    private static readonly ConcurrentDictionary<Type, NotificationDispatchBinding>
        NotificationDispatchBindings = new();

    // Static cache shared by all dispatcher instances: the stream handler service type and its
    // invoker delegate live in one entry, so creating a stream needs a single dictionary lookup.
    private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamDispatchBinding>
        StreamDispatchBindings = new();

    // Open generic method definitions resolved once via reflection. They do not depend on any
    // message type, so they are closed over concrete types later with MakeGenericMethod.
    private static readonly MethodInfo RequestHandlerInvokerMethodDefinition =
        typeof(CqrsDispatcher).GetMethod(
            nameof(InvokeRequestHandlerAsync),
            BindingFlags.NonPublic | BindingFlags.Static)!;

    private static readonly MethodInfo RequestPipelineInvokerMethodDefinition =
        typeof(CqrsDispatcher).GetMethod(
            nameof(InvokeRequestPipelineAsync),
            BindingFlags.NonPublic | BindingFlags.Static)!;

    private static readonly MethodInfo NotificationHandlerInvokerMethodDefinition =
        typeof(CqrsDispatcher).GetMethod(
            nameof(InvokeNotificationHandlerAsync),
            BindingFlags.NonPublic | BindingFlags.Static)!;

    private static readonly MethodInfo StreamHandlerInvokerMethodDefinition =
        typeof(CqrsDispatcher).GetMethod(
            nameof(InvokeStreamHandler),
            BindingFlags.NonPublic | BindingFlags.Static)!;

    /// <summary>
    /// Publishes a notification to every registered handler, awaiting each handler in turn.
    /// When no handler is registered, a debug message is logged and the call completes.
    /// </summary>
    /// <typeparam name="TNotification">The notification type.</typeparam>
    /// <param name="context">The current CQRS dispatch context, injected into context-aware handlers.</param>
    /// <param name="notification">The notification instance.</param>
    /// <param name="cancellationToken">The cancellation token forwarded to each handler.</param>
    public async ValueTask PublishAsync<TNotification>(
        ICqrsContext context,
        TNotification notification,
        CancellationToken cancellationToken = default)
        where TNotification : INotification
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(notification);

        // The binding is keyed by the runtime type of the instance, not by TNotification.
        var runtimeType = notification.GetType();
        var binding = NotificationDispatchBindings.GetOrAdd(runtimeType, CreateNotificationDispatchBinding);
        var registeredHandlers = container.GetAll(binding.HandlerType);

        if (registeredHandlers.Count == 0)
        {
            logger.Debug($"No CQRS notification handler registered for {runtimeType.FullName}.");
            return;
        }

        foreach (var registeredHandler in registeredHandlers)
        {
            PrepareHandler(registeredHandler, context);
            await binding.Invoker(registeredHandler, notification, cancellationToken);
        }
    }

    /// <summary>
    /// Sends a request to its handler and returns the response. Registered pipeline behaviors,
    /// if any, are wrapped around the handler call.
    /// </summary>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="context">The current CQRS dispatch context, injected into context-aware handlers.</param>
    /// <param name="request">The request instance.</param>
    /// <param name="cancellationToken">The cancellation token forwarded to the handler and behaviors.</param>
    /// <returns>The response produced for the request.</returns>
    /// <exception cref="InvalidOperationException">No handler is registered for the request type.</exception>
    public async ValueTask<TResponse> SendAsync<TResponse>(
        ICqrsContext context,
        IRequest<TResponse> request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(request);

        var requestType = request.GetType();
        var binding = RequestDispatchBindingCache<TResponse>.Bindings.GetOrAdd(
            requestType,
            CreateRequestDispatchBinding<TResponse>);

        var handler = container.Get(binding.HandlerType);
        if (handler is null)
        {
            throw new InvalidOperationException(
                $"No CQRS request handler registered for {requestType.FullName}.");
        }

        PrepareHandler(handler, context);

        // Behaviors receive the context too, whether or not the pipeline path ends up being used.
        var behaviors = container.GetAll(binding.BehaviorType);
        foreach (var behavior in behaviors)
        {
            PrepareHandler(behavior, context);
        }

        if (behaviors.Count == 0)
        {
            // No behaviors registered: call the handler directly and skip building the pipeline chain.
            return await binding.RequestInvoker(handler, request, cancellationToken);
        }

        return await binding.PipelineInvoker(handler, behaviors, request, cancellationToken);
    }

    /// <summary>
    /// Creates a stream for the given stream request and returns its asynchronous response sequence.
    /// </summary>
    /// <typeparam name="TResponse">The element type of the response sequence.</typeparam>
    /// <param name="context">The current CQRS dispatch context, injected into context-aware handlers.</param>
    /// <param name="request">The stream request instance.</param>
    /// <param name="cancellationToken">The cancellation token forwarded to the handler.</param>
    /// <returns>The asynchronous response sequence returned by the handler.</returns>
    /// <exception cref="InvalidOperationException">No handler is registered for the request type.</exception>
    public IAsyncEnumerable<TResponse> CreateStream<TResponse>(
        ICqrsContext context,
        IStreamRequest<TResponse> request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(request);

        var requestType = request.GetType();
        var binding = StreamDispatchBindings.GetOrAdd(
            (requestType, typeof(TResponse)),
            static cacheKey => CreateStreamDispatchBinding(cacheKey.RequestType, cacheKey.ResponseType));

        var handler = container.Get(binding.HandlerType);
        if (handler is null)
        {
            throw new InvalidOperationException(
                $"No CQRS stream handler registered for {requestType.FullName}.");
        }

        PrepareHandler(handler, context);

        // The cached invoker is not generic, so it returns object; cast back to the typed sequence.
        var stream = binding.Invoker(handler, request, cancellationToken);
        return (IAsyncEnumerable<TResponse>)stream;
    }

    /// <summary>
    /// Injects the current CQRS dispatch context into a context-aware handler.
    /// Objects that do not implement <see cref="IContextAware"/> are left untouched.
    /// </summary>
    /// <param name="handler">The handler (or behavior) instance.</param>
    /// <param name="context">The current CQRS dispatch context.</param>
    /// <exception cref="InvalidOperationException">
    /// The handler is context-aware but <paramref name="context"/> is not an <see cref="IArchitectureContext"/>.
    /// </exception>
    private static void PrepareHandler(object handler, ICqrsContext context)
    {
        if (handler is not IContextAware contextAware)
        {
            return;
        }

        if (context is not IArchitectureContext architectureContext)
        {
            throw new InvalidOperationException(
                "The current CQRS context does not implement IArchitectureContext, so it cannot be injected into IContextAware handlers.");
        }

        contextAware.SetContext(architectureContext);
    }

    /// <summary>
    /// Builds the complete dispatch binding for a request type: the handler and behavior service
    /// types together with the strongly typed invoker delegates, all in one cache entry.
    /// </summary>
    private static RequestDispatchBinding<TResponse> CreateRequestDispatchBinding<TResponse>(Type requestType)
    {
        var responseType = typeof(TResponse);
        var handlerType = typeof(IRequestHandler<,>).MakeGenericType(requestType, responseType);
        var behaviorType = typeof(IPipelineBehavior<,>).MakeGenericType(requestType, responseType);
        var requestInvoker = CreateRequestInvoker<TResponse>(requestType);
        var pipelineInvoker = CreateRequestPipelineInvoker<TResponse>(requestType);

        return new RequestDispatchBinding<TResponse>(handlerType, behaviorType, requestInvoker, pipelineInvoker);
    }

    /// <summary>
    /// Builds the complete dispatch binding for a notification type: the handler service type
    /// together with its invoker delegate, in one cache entry.
    /// </summary>
    private static NotificationDispatchBinding CreateNotificationDispatchBinding(Type notificationType)
    {
        var handlerType = typeof(INotificationHandler<>).MakeGenericType(notificationType);
        var invoker = CreateNotificationInvoker(notificationType);

        return new NotificationDispatchBinding(handlerType, invoker);
    }

    /// <summary>
    /// Builds the complete dispatch binding for a stream request type: the handler service type
    /// together with its invoker delegate, in one cache entry.
    /// </summary>
    private static StreamDispatchBinding CreateStreamDispatchBinding(Type requestType, Type responseType)
    {
        var handlerType = typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, responseType);
        var invoker = CreateStreamInvoker(requestType, responseType);

        return new StreamDispatchBinding(handlerType, invoker);
    }

    /// <summary>
    /// Creates the delegate that calls a request handler, so that sending does not repeat reflection.
    /// </summary>
    private static RequestInvoker<TResponse> CreateRequestInvoker<TResponse>(Type requestType)
    {
        var closedMethod = RequestHandlerInvokerMethodDefinition.MakeGenericMethod(
            requestType,
            typeof(TResponse));
        var invoker = Delegate.CreateDelegate(typeof(RequestInvoker<TResponse>), closedMethod);

        return (RequestInvoker<TResponse>)invoker;
    }

    /// <summary>
    /// Creates the delegate that runs a request through its pipeline behaviors, so that sending
    /// does not repeat reflection.
    /// </summary>
    private static RequestPipelineInvoker<TResponse> CreateRequestPipelineInvoker<TResponse>(Type requestType)
    {
        var closedMethod = RequestPipelineInvokerMethodDefinition.MakeGenericMethod(
            requestType,
            typeof(TResponse));
        var invoker = Delegate.CreateDelegate(typeof(RequestPipelineInvoker<TResponse>), closedMethod);

        return (RequestPipelineInvoker<TResponse>)invoker;
    }

    /// <summary>
    /// Creates the delegate that calls a notification handler, so that publishing does not repeat reflection.
    /// </summary>
    private static NotificationInvoker CreateNotificationInvoker(Type notificationType)
    {
        var closedMethod = NotificationHandlerInvokerMethodDefinition.MakeGenericMethod(notificationType);
        var invoker = Delegate.CreateDelegate(typeof(NotificationInvoker), closedMethod);

        return (NotificationInvoker)invoker;
    }

    /// <summary>
    /// Creates the delegate that calls a stream handler, so that stream creation does not repeat reflection.
    /// </summary>
    private static StreamInvoker CreateStreamInvoker(Type requestType, Type responseType)
    {
        var closedMethod = StreamHandlerInvokerMethodDefinition.MakeGenericMethod(requestType, responseType);
        var invoker = Delegate.CreateDelegate(typeof(StreamInvoker), closedMethod);

        return (StreamInvoker)invoker;
    }

    /// <summary>
    /// Calls a request handler after casting the untyped handler and request to their concrete types.
    /// </summary>
    private static ValueTask<TResponse> InvokeRequestHandlerAsync<TRequest, TResponse>(
        object handler,
        object request,
        CancellationToken cancellationToken)
        where TRequest : IRequest<TResponse>
        => ((IRequestHandler<TRequest, TResponse>)handler).Handle((TRequest)request, cancellationToken);

    /// <summary>
    /// Runs a request through its chain of pipeline behaviors, ending at the request handler.
    /// </summary>
    private static ValueTask<TResponse> InvokeRequestPipelineAsync<TRequest, TResponse>(
        object handler,
        IReadOnlyList<object> behaviors,
        object request,
        CancellationToken cancellationToken)
        where TRequest : IRequest<TResponse>
    {
        var typedHandler = (IRequestHandler<TRequest, TResponse>)handler;
        var typedRequest = (TRequest)request;

        // The innermost link of the chain is the handler itself.
        MessageHandlerDelegate<TRequest, TResponse> pipeline =
            (message, token) => typedHandler.Handle(message, token);

        // Wrap from the last behavior back to the first, so that behaviors[0] ends up outermost
        // and is the first one to run.
        var position = behaviors.Count;
        while (position > 0)
        {
            position--;
            var behavior = (IPipelineBehavior<TRequest, TResponse>)behaviors[position];
            var inner = pipeline;
            pipeline = (message, token) => behavior.Handle(message, inner, token);
        }

        return pipeline(typedRequest, cancellationToken);
    }

    /// <summary>
    /// Calls a notification handler after casting the untyped handler and notification to their concrete types.
    /// </summary>
    private static ValueTask InvokeNotificationHandlerAsync<TNotification>(
        object handler,
        object notification,
        CancellationToken cancellationToken)
        where TNotification : INotification
        => ((INotificationHandler<TNotification>)handler).Handle((TNotification)notification, cancellationToken);

    /// <summary>
    /// Calls a stream handler after casting the untyped handler and request to their concrete types.
    /// The handler's result is returned as <see cref="object"/> so one non-generic delegate type
    /// can serve every response type.
    /// </summary>
    private static object InvokeStreamHandler<TRequest, TResponse>(
        object handler,
        object request,
        CancellationToken cancellationToken)
        where TRequest : IStreamRequestHandlerRequestMarker<TResponse>
        => ((IStreamRequestHandler<TRequest, TResponse>)handler).Handle((TRequest)request, cancellationToken);

    /// <summary>Untyped entry point for invoking a request handler directly.</summary>
    private delegate ValueTask<TResponse> RequestInvoker<TResponse>(
        object handler,
        object request,
        CancellationToken cancellationToken);

    /// <summary>Untyped entry point for invoking a request handler through its pipeline behaviors.</summary>
    private delegate ValueTask<TResponse> RequestPipelineInvoker<TResponse>(
        object handler,
        IReadOnlyList<object> behaviors,
        object request,
        CancellationToken cancellationToken);

    /// <summary>Untyped entry point for invoking a notification handler.</summary>
    private delegate ValueTask NotificationInvoker(
        object handler,
        object notification,
        CancellationToken cancellationToken);

    /// <summary>Untyped entry point for invoking a stream handler.</summary>
    private delegate object StreamInvoker(
        object handler,
        object request,
        CancellationToken cancellationToken);

    /// <summary>
    /// Caches request dispatch bindings in a separate layer per response type. This keeps value-type
    /// responses from going through an object bridge, and lets the handler/pipeline service types
    /// and the invoker delegates be fetched with a single cache hit on the hot path.
    /// </summary>
    /// <typeparam name="TResponse">The request's response type.</typeparam>
    private static class RequestDispatchBindingCache<TResponse>
    {
        internal static readonly ConcurrentDictionary<Type, RequestDispatchBinding<TResponse>> Bindings = new();
    }

    /// <summary>Cache entry pairing a notification handler service type with its invoker.</summary>
    private readonly record struct NotificationDispatchBinding(
        Type HandlerType,
        NotificationInvoker Invoker);

    /// <summary>Cache entry pairing a stream handler service type with its invoker.</summary>
    private readonly record struct StreamDispatchBinding(
        Type HandlerType,
        StreamInvoker Invoker);

    /// <summary>
    /// Cache entry holding the handler and behavior service types of a request together with
    /// the direct and pipeline invokers.
    /// </summary>
    private readonly record struct RequestDispatchBinding<TResponse>(
        Type HandlerType,
        Type BehaviorType,
        RequestInvoker<TResponse> RequestInvoker,
        RequestPipelineInvoker<TResponse> PipelineInvoker);
}