using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Abstractions.Rule;
using GFramework.Cqrs.Abstractions.Cqrs;
using ICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime;

namespace GFramework.Cqrs.Internal;

/// <summary>
///     GFramework's own CQRS runtime dispatcher.
///     Resolves request, notification and stream handlers from the IoC container and, before invoking them,
///     injects the current CQRS dispatch context into every context-aware object.
/// </summary>
internal sealed class CqrsDispatcher(
    IIocContainer container,
    ILogger logger) : ICqrsRuntime
{
    // Process-wide cache of notification invoker delegates, keyed by the runtime notification type.
    // A concurrent dictionary is used so the publish path can be hit from multiple threads.
    private static readonly ConcurrentDictionary<Type, NotificationInvoker> NotificationInvokers = new();

    // Process-wide cache of closed INotificationHandler<> service types, so MakeGenericType is not
    // repeated on every publish.
    private static readonly ConcurrentDictionary<Type, Type> NotificationHandlerServiceTypes = new();

    // Process-wide cache of stream invoker delegates, so the reflection signature is not re-resolved
    // every time a stream is created.
    private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker>
        StreamInvokers = new();

    // Process-wide cache of the handler / pipeline-behavior service types for a request, reducing
    // generic type construction on the hot path.
    private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestServiceTypeSet>
        RequestServiceTypes = new();

    // Process-wide cache of closed IStreamRequestHandler<,> service types, so MakeGenericType is not
    // repeated on every stream creation.
    private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), Type>
        StreamHandlerServiceTypes = new();

    // Open generic method definitions. These lookups do not depend on the message type, so they are
    // resolved once and reused by the Create*Invoker factories below.
    private static readonly MethodInfo RequestHandlerInvokerMethodDefinition = typeof(CqrsDispatcher)
        .GetMethod(nameof(InvokeRequestHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!;

    private static readonly MethodInfo RequestPipelineInvokerMethodDefinition = typeof(CqrsDispatcher)
        .GetMethod(nameof(InvokeRequestPipelineAsync), BindingFlags.NonPublic | BindingFlags.Static)!;

    private static readonly MethodInfo NotificationHandlerInvokerMethodDefinition = typeof(CqrsDispatcher)
        .GetMethod(nameof(InvokeNotificationHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!;

    private static readonly MethodInfo StreamHandlerInvokerMethodDefinition = typeof(CqrsDispatcher)
        .GetMethod(nameof(InvokeStreamHandler), BindingFlags.NonPublic | BindingFlags.Static)!;

    /// <summary>
    ///     Publishes a notification to every registered handler, awaiting each handler in turn.
    ///     When no handler is registered a debug message is logged and the call completes without error.
    /// </summary>
    /// <typeparam name="TNotification">The notification type.</typeparam>
    /// <param name="context">The current CQRS dispatch context, injected into context-aware handlers.</param>
    /// <param name="notification">The notification object.</param>
    /// <param name="cancellationToken">The cancellation token passed to each handler.</param>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="context" /> or <paramref name="notification" /> is <c>null</c>.
    /// </exception>
    public async ValueTask PublishAsync<TNotification>(
        ICqrsContext context,
        TNotification notification,
        CancellationToken cancellationToken = default)
        where TNotification : INotification
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(notification);

        // Dispatch on the runtime type of the instance, not on the static TNotification.
        var notificationType = notification.GetType();
        var handlerType = NotificationHandlerServiceTypes.GetOrAdd(
            notificationType,
            static type => typeof(INotificationHandler<>).MakeGenericType(type));

        var handlers = container.GetAll(handlerType);

        if (handlers.Count == 0)
        {
            logger.Debug($"No CQRS notification handler registered for {notificationType.FullName}.");
            return;
        }

        var invoker = NotificationInvokers.GetOrAdd(
            notificationType,
            CreateNotificationInvoker);

        foreach (var handler in handlers)
        {
            PrepareHandler(handler, context);
            await invoker(handler, notification, cancellationToken);
        }
    }

    /// <summary>
    ///     Sends a request to its handler and returns the response. Registered pipeline behaviors, if any,
    ///     are wrapped around the handler call.
    /// </summary>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="context">The current CQRS dispatch context, injected into context-aware handlers.</param>
    /// <param name="request">The request object.</param>
    /// <param name="cancellationToken">The cancellation token passed to the handler and behaviors.</param>
    /// <returns>The response produced for the request.</returns>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="context" /> or <paramref name="request" /> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidOperationException">No handler is registered for the request type.</exception>
    public async ValueTask<TResponse> SendAsync<TResponse>(
        ICqrsContext context,
        IRequest<TResponse> request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(request);

        var requestType = request.GetType();
        var serviceTypes = RequestServiceTypes.GetOrAdd(
            (requestType, typeof(TResponse)),
            static key => new RequestServiceTypeSet(
                typeof(IRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType),
                typeof(IPipelineBehavior<,>).MakeGenericType(key.RequestType, key.ResponseType)));

        var handlerType = serviceTypes.HandlerType;
        var handler = container.Get(handlerType)
                      ?? throw new InvalidOperationException(
                          $"No CQRS request handler registered for {requestType.FullName}.");

        PrepareHandler(handler, context);

        var behaviors = container.GetAll(serviceTypes.BehaviorType);

        foreach (var behavior in behaviors)
        {
            PrepareHandler(behavior, context);
        }

        // Without behaviors, call the handler directly and skip building the delegate chain.
        if (behaviors.Count == 0)
        {
            var invoker = RequestInvokerCache<TResponse>.Invokers.GetOrAdd(
                requestType,
                CreateRequestInvoker<TResponse>);

            return await invoker(handler, request, cancellationToken);
        }

        var pipelineInvoker = RequestPipelineInvokerCache<TResponse>.Invokers.GetOrAdd(
            requestType,
            CreateRequestPipelineInvoker<TResponse>);

        return await pipelineInvoker(handler, behaviors, request, cancellationToken);
    }

    /// <summary>
    ///     Creates a stream for a streaming request and returns the asynchronous response sequence
    ///     produced by its handler.
    /// </summary>
    /// <typeparam name="TResponse">The response element type.</typeparam>
    /// <param name="context">The current CQRS dispatch context, injected into context-aware handlers.</param>
    /// <param name="request">The streaming request object.</param>
    /// <param name="cancellationToken">The cancellation token passed to the handler.</param>
    /// <returns>The asynchronous response sequence.</returns>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="context" /> or <paramref name="request" /> is <c>null</c>.
    /// </exception>
    /// <exception cref="InvalidOperationException">No stream handler is registered for the request type.</exception>
    public IAsyncEnumerable<TResponse> CreateStream<TResponse>(
        ICqrsContext context,
        IStreamRequest<TResponse> request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(request);

        var requestType = request.GetType();
        var handlerType = StreamHandlerServiceTypes.GetOrAdd(
            (requestType, typeof(TResponse)),
            static key => typeof(IStreamRequestHandler<,>).MakeGenericType(key.RequestType, key.ResponseType));

        var handler = container.Get(handlerType)
                      ?? throw new InvalidOperationException(
                          $"No CQRS stream handler registered for {requestType.FullName}.");

        PrepareHandler(handler, context);

        var invoker = StreamInvokers.GetOrAdd(
            (requestType, typeof(TResponse)),
            static key => CreateStreamInvoker(key.RequestType, key.ResponseType));

        // The cached invoker returns object; the handler contract guarantees the element type.
        return (IAsyncEnumerable<TResponse>)invoker(handler, request, cancellationToken);
    }

    /// <summary>
    ///     Injects the current CQRS dispatch context into a handler or behavior that implements
    ///     <see cref="IContextAware" />. Objects that do not implement it are left untouched.
    /// </summary>
    /// <param name="handler">The handler (or pipeline behavior) instance.</param>
    /// <param name="context">The current CQRS dispatch context.</param>
    /// <exception cref="InvalidOperationException">
    ///     The handler is context-aware but <paramref name="context" /> does not implement
    ///     <see cref="IArchitectureContext" />.
    /// </exception>
    private static void PrepareHandler(object handler, ICqrsContext context)
    {
        if (handler is not IContextAware contextAware)
        {
            return;
        }

        if (context is not IArchitectureContext architectureContext)
        {
            throw new InvalidOperationException(
                "The current CQRS context does not implement IArchitectureContext, so it cannot be injected into IContextAware handlers.");
        }

        contextAware.SetContext(architectureContext);
    }

    /// <summary>
    ///     Builds the request handler invoker delegate for a request type, so reflection is not repeated
    ///     on every send.
    /// </summary>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="requestType">The runtime request type.</param>
    /// <returns>A delegate bound to the closed <c>InvokeRequestHandlerAsync</c> method.</returns>
    private static RequestInvoker<TResponse> CreateRequestInvoker<TResponse>(Type requestType)
    {
        var method = RequestHandlerInvokerMethodDefinition
            .MakeGenericMethod(requestType, typeof(TResponse));

        return (RequestInvoker<TResponse>)Delegate.CreateDelegate(typeof(RequestInvoker<TResponse>), method);
    }

    /// <summary>
    ///     Builds the pipeline-aware request invoker delegate for a request type, so reflection is not
    ///     repeated on every send.
    /// </summary>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="requestType">The runtime request type.</param>
    /// <returns>A delegate bound to the closed <c>InvokeRequestPipelineAsync</c> method.</returns>
    private static RequestPipelineInvoker<TResponse> CreateRequestPipelineInvoker<TResponse>(Type requestType)
    {
        var method = RequestPipelineInvokerMethodDefinition
            .MakeGenericMethod(requestType, typeof(TResponse));

        return (RequestPipelineInvoker<TResponse>)Delegate.CreateDelegate(
            typeof(RequestPipelineInvoker<TResponse>),
            method);
    }

    /// <summary>
    ///     Builds the notification handler invoker delegate for a notification type, so reflection is not
    ///     repeated on every publish.
    /// </summary>
    /// <param name="notificationType">The runtime notification type.</param>
    /// <returns>A delegate bound to the closed <c>InvokeNotificationHandlerAsync</c> method.</returns>
    private static NotificationInvoker CreateNotificationInvoker(Type notificationType)
    {
        var method = NotificationHandlerInvokerMethodDefinition
            .MakeGenericMethod(notificationType);

        return (NotificationInvoker)Delegate.CreateDelegate(typeof(NotificationInvoker), method);
    }

    /// <summary>
    ///     Builds the stream handler invoker delegate for a request/response type pair, so reflection is
    ///     not repeated on every stream creation.
    /// </summary>
    /// <param name="requestType">The runtime stream request type.</param>
    /// <param name="responseType">The response element type.</param>
    /// <returns>A delegate bound to the closed <c>InvokeStreamHandler</c> method.</returns>
    private static StreamInvoker CreateStreamInvoker(Type requestType, Type responseType)
    {
        var method = StreamHandlerInvokerMethodDefinition
            .MakeGenericMethod(requestType, responseType);

        return (StreamInvoker)Delegate.CreateDelegate(typeof(StreamInvoker), method);
    }

    /// <summary>
    ///     Casts the untyped handler and request to their concrete types and calls the handler.
    /// </summary>
    /// <typeparam name="TRequest">The request type.</typeparam>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="handler">The request handler instance.</param>
    /// <param name="request">The request object.</param>
    /// <param name="cancellationToken">The cancellation token passed to the handler.</param>
    /// <returns>The value task returned by the handler.</returns>
    private static ValueTask<TResponse> InvokeRequestHandlerAsync<TRequest, TResponse>(
        object handler,
        object request,
        CancellationToken cancellationToken)
        where TRequest : IRequest<TResponse>
    {
        var typedHandler = (IRequestHandler<TRequest, TResponse>)handler;
        var typedRequest = (TRequest)request;

        return typedHandler.Handle(typedRequest, cancellationToken);
    }

    /// <summary>
    ///     Runs the request through the chain of pipeline behaviors, ending at the handler.
    /// </summary>
    /// <typeparam name="TRequest">The request type.</typeparam>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="handler">The request handler instance.</param>
    /// <param name="behaviors">The pipeline behavior instances, in registration order.</param>
    /// <param name="request">The request object.</param>
    /// <param name="cancellationToken">The cancellation token passed through the chain.</param>
    /// <returns>The value task returned by the outermost link of the chain.</returns>
    private static ValueTask<TResponse> InvokeRequestPipelineAsync<TRequest, TResponse>(
        object handler,
        // NOTE(review): element type reconstructed as object; confirm against IIocContainer.GetAll(Type).
        IReadOnlyList<object> behaviors,
        object request,
        CancellationToken cancellationToken)
        where TRequest : IRequest<TResponse>
    {
        var typedHandler = (IRequestHandler<TRequest, TResponse>)handler;
        var typedRequest = (TRequest)request;

        // Innermost link: the handler itself.
        MessageHandlerDelegate<TRequest, TResponse> next =
            (message, token) => typedHandler.Handle(message, token);

        // Wrap from the last behavior to the first, so behaviors[0] ends up outermost and runs first.
        for (var i = behaviors.Count - 1; i >= 0; i--)
        {
            var behavior = (IPipelineBehavior<TRequest, TResponse>)behaviors[i];

            // Capture the current link in its own local so each closure keeps its own successor.
            var currentNext = next;
            next = (message, token) => behavior.Handle(message, currentNext, token);
        }

        return next(typedRequest, cancellationToken);
    }

    /// <summary>
    ///     Casts the untyped handler and notification to their concrete types and calls the handler.
    /// </summary>
    /// <typeparam name="TNotification">The notification type.</typeparam>
    /// <param name="handler">The notification handler instance.</param>
    /// <param name="notification">The notification object.</param>
    /// <param name="cancellationToken">The cancellation token passed to the handler.</param>
    /// <returns>The value task returned by the handler.</returns>
    private static ValueTask InvokeNotificationHandlerAsync<TNotification>(
        object handler,
        object notification,
        CancellationToken cancellationToken)
        where TNotification : INotification
    {
        var typedHandler = (INotificationHandler<TNotification>)handler;
        var typedNotification = (TNotification)notification;

        return typedHandler.Handle(typedNotification, cancellationToken);
    }

    /// <summary>
    ///     Casts the untyped handler and request to their concrete types and calls the stream handler.
    /// </summary>
    /// <typeparam name="TRequest">The stream request type.</typeparam>
    /// <typeparam name="TResponse">The response element type.</typeparam>
    /// <param name="handler">The stream handler instance.</param>
    /// <param name="request">The stream request object.</param>
    /// <param name="cancellationToken">The cancellation token passed to the handler.</param>
    /// <returns>The handler's result, returned as <see cref="object" /> so one delegate type fits all streams.</returns>
    private static object InvokeStreamHandler<TRequest, TResponse>(
        object handler,
        object request,
        CancellationToken cancellationToken)
        where TRequest : IStreamRequest<TResponse>
    {
        var typedHandler = (IStreamRequestHandler<TRequest, TResponse>)handler;
        var typedRequest = (TRequest)request;

        return typedHandler.Handle(typedRequest, cancellationToken);
    }

    // Untyped entry point for a request handler call without pipeline behaviors.
    private delegate ValueTask<TResponse> RequestInvoker<TResponse>(
        object handler,
        object request,
        CancellationToken cancellationToken);

    // Untyped entry point for a request handler call wrapped in pipeline behaviors.
    private delegate ValueTask<TResponse> RequestPipelineInvoker<TResponse>(
        object handler,
        IReadOnlyList<object> behaviors,
        object request,
        CancellationToken cancellationToken);

    // Untyped entry point for a notification handler call.
    private delegate ValueTask NotificationInvoker(
        object handler,
        object notification,
        CancellationToken cancellationToken);

    // Untyped entry point for a stream handler call; the result is cast back by CreateStream.
    private delegate object StreamInvoker(
        object handler,
        object request,
        CancellationToken cancellationToken);

    /// <summary>
    ///     Caches request handler invoker delegates per response type, so value-type responses are not
    ///     boxed by going through an <see cref="object" /> bridge.
    /// </summary>
    /// <typeparam name="TResponse">The request response type.</typeparam>
    private static class RequestInvokerCache<TResponse>
    {
        internal static readonly ConcurrentDictionary<Type, RequestInvoker<TResponse>> Invokers = new();
    }

    /// <summary>
    ///     Caches pipeline-aware request invoker delegates per response type, avoiding extra boxing on the
    ///     pipeline hot path.
    /// </summary>
    /// <typeparam name="TResponse">The request response type.</typeparam>
    private static class RequestPipelineInvokerCache<TResponse>
    {
        internal static readonly ConcurrentDictionary<Type, RequestPipelineInvoker<TResponse>> Invokers = new();
    }

    // Pair of closed service types resolved for one (request, response) combination.
    private readonly record struct RequestServiceTypeSet(Type HandlerType, Type BehaviorType);
}