diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs index ee8c549a..13ffb761 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs @@ -24,6 +24,8 @@ internal sealed class CqrsDispatcherCacheTests LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); _container = new MicrosoftDiContainer(); _container.RegisterCqrsPipelineBehavior(); + _container.RegisterCqrsPipelineBehavior(); + _container.RegisterCqrsPipelineBehavior(); CqrsTestRuntime.RegisterHandlers( _container, @@ -145,6 +147,103 @@ internal sealed class CqrsDispatcherCacheTests }); } + /// + /// 验证 request pipeline executor 会按行为数量在 binding 内首次创建并在后续分发中复用。 + /// + [Test] + public async Task Dispatcher_Should_Cache_Request_Pipeline_Executors_Per_Behavior_Count() + { + var requestBindings = GetCacheField("RequestDispatchBindings"); + + Assert.Multiple(() => + { + Assert.That( + GetRequestPipelineExecutorValue( + requestBindings, + typeof(DispatcherPipelineCacheRequest), + typeof(int), + 1), + Is.Null); + Assert.That( + GetRequestPipelineExecutorValue( + requestBindings, + typeof(DispatcherPipelineOrderCacheRequest), + typeof(int), + 2), + Is.Null); + }); + + await _context!.SendRequestAsync(new DispatcherPipelineCacheRequest()); + await _context.SendRequestAsync(new DispatcherPipelineOrderCacheRequest()); + + var singleBehaviorExecutor = GetRequestPipelineExecutorValue( + requestBindings, + typeof(DispatcherPipelineCacheRequest), + typeof(int), + 1); + var twoBehaviorExecutor = GetRequestPipelineExecutorValue( + requestBindings, + typeof(DispatcherPipelineOrderCacheRequest), + typeof(int), + 2); + + await _context.SendRequestAsync(new DispatcherPipelineCacheRequest()); + await _context.SendRequestAsync(new DispatcherPipelineOrderCacheRequest()); + + Assert.Multiple(() => + { + Assert.That(singleBehaviorExecutor, Is.Not.Null); + Assert.That(twoBehaviorExecutor, Is.Not.Null); + Assert.That(singleBehaviorExecutor, Is.Not.SameAs(twoBehaviorExecutor)); + Assert.That( + GetRequestPipelineExecutorValue( + requestBindings, + typeof(DispatcherPipelineCacheRequest), + typeof(int), + 1), + Is.SameAs(singleBehaviorExecutor)); + Assert.That( + GetRequestPipelineExecutorValue( + requestBindings, + typeof(DispatcherPipelineOrderCacheRequest), + typeof(int), + 2), + Is.SameAs(twoBehaviorExecutor)); + }); + } + + /// + /// 验证复用缓存的 request pipeline executor 后,行为顺序和最终处理器顺序保持不变。 + /// + [Test] + public async Task Dispatcher_Should_Preserve_Request_Pipeline_Order_When_Reusing_Cached_Executor() + { + DispatcherPipelineOrderState.Reset(); + + await _context!.SendRequestAsync(new DispatcherPipelineOrderCacheRequest()); + var firstInvocation = DispatcherPipelineOrderState.Steps.ToArray(); + + DispatcherPipelineOrderState.Reset(); + + await _context.SendRequestAsync(new DispatcherPipelineOrderCacheRequest()); + var secondInvocation = DispatcherPipelineOrderState.Steps.ToArray(); + + var expectedOrder = new[] + { + "Outer:Before", + "Inner:Before", + "Handler", + "Inner:After", + "Outer:After" + }; + + Assert.Multiple(() => + { + Assert.That(firstInvocation, Is.EqualTo(expectedOrder)); + Assert.That(secondInvocation, Is.EqualTo(expectedOrder)); + }); + } + /// /// 通过反射读取 dispatcher 的静态缓存对象。 /// @@ -188,6 +287,21 @@ internal sealed class CqrsDispatcherCacheTests return InvokeInstanceMethod(cache, "GetValueOrDefaultForTesting", primaryType, secondaryType); } + /// + /// 读取 request dispatch binding 中指定行为数量的 pipeline executor 缓存项。 + /// + private static object? GetRequestPipelineExecutorValue( + object requestBindings, + Type requestType, + Type responseType, + int behaviorCount) + { + var binding = GetRequestDispatchBindingValue(requestBindings, requestType, responseType); + return binding is null + ? null + : InvokeInstanceMethod(binding, "GetPipelineExecutorForTesting", behaviorCount); + } + /// /// 调用缓存实例上的无参清理方法。 /// @@ -210,6 +324,28 @@ internal sealed class CqrsDispatcherCacheTests return method!.Invoke(target, arguments); } + /// + /// 读取指定请求/响应类型对对应的强类型 request dispatch binding。 + /// + private static object? GetRequestDispatchBindingValue(object requestBindings, Type requestType, Type responseType) + { + var bindingBox = GetPairCacheValue(requestBindings, requestType, responseType); + if (bindingBox is null) + { + return null; + } + + var method = bindingBox.GetType().GetMethod( + "Get", + BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + + Assert.That(method, Is.Not.Null, $"Missing request binding accessor on {bindingBox.GetType().FullName}."); + + return method! + .MakeGenericMethod(responseType) + .Invoke(bindingBox, Array.Empty()); + } + /// /// 获取 CQRS dispatcher 运行时类型。 /// diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderCacheRequest.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderCacheRequest.cs new file mode 100644 index 00000000..9528a060 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderCacheRequest.cs @@ -0,0 +1,8 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 为双行为 pipeline 顺序回归提供最小请求。 +/// +internal sealed record DispatcherPipelineOrderCacheRequest : IRequest; diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderCacheRequestHandler.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderCacheRequestHandler.cs new file mode 100644 index 00000000..00224ff8 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderCacheRequestHandler.cs @@ -0,0 +1,21 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 为双行为顺序回归提供最终请求处理器。 +/// +internal sealed class DispatcherPipelineOrderCacheRequestHandler : IRequestHandler +{ + /// + /// 记录处理器执行并返回固定结果。 + /// + /// 当前请求。 + /// 取消令牌。 + /// 固定整数结果。 + public ValueTask Handle(DispatcherPipelineOrderCacheRequest request, CancellationToken cancellationToken) + { + DispatcherPipelineOrderState.Steps.Add("Handler"); + return ValueTask.FromResult(3); + } +} diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderInnerBehavior.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderInnerBehavior.cs new file mode 100644 index 00000000..a4b7def6 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderInnerBehavior.cs @@ -0,0 +1,27 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 作为内层行为验证缓存 executor 复用后仍保持注册顺序。 +/// +internal sealed class DispatcherPipelineOrderInnerBehavior : IPipelineBehavior +{ + /// + /// 记录内层行为的前后执行节点。 + /// + /// 当前请求。 + /// 下一个处理阶段。 + /// 取消令牌。 + /// 下游处理器结果。 + public async ValueTask Handle( + DispatcherPipelineOrderCacheRequest request, + MessageHandlerDelegate next, + CancellationToken cancellationToken) + { + DispatcherPipelineOrderState.Steps.Add("Inner:Before"); + var result = await next(request, cancellationToken).ConfigureAwait(false); + DispatcherPipelineOrderState.Steps.Add("Inner:After"); + return result; + } +} diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderOuterBehavior.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderOuterBehavior.cs new file mode 100644 index 00000000..b9ba2315 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderOuterBehavior.cs @@ -0,0 +1,27 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 作为外层行为验证缓存 executor 复用后仍保持注册顺序。 +/// +internal sealed class DispatcherPipelineOrderOuterBehavior : IPipelineBehavior +{ + /// + /// 记录外层行为的前后执行节点。 + /// + /// 当前请求。 + /// 下一个处理阶段。 + /// 取消令牌。 + /// 下游处理器结果。 + public async ValueTask Handle( + DispatcherPipelineOrderCacheRequest request, + MessageHandlerDelegate next, + CancellationToken cancellationToken) + { + DispatcherPipelineOrderState.Steps.Add("Outer:Before"); + var result = await next(request, cancellationToken).ConfigureAwait(false); + DispatcherPipelineOrderState.Steps.Add("Outer:After"); + return result; + } +} diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderState.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderState.cs new file mode 100644 index 00000000..2673371e --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherPipelineOrderState.cs @@ -0,0 +1,20 @@ +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 记录双行为 pipeline 的实际执行顺序。 +/// +internal static class DispatcherPipelineOrderState +{ + /// + /// 获取按执行顺序追加的步骤名称。 + /// + public static List Steps { get; } = []; + + /// + /// 清空当前记录,供下一次断言使用。 + /// + public static void Reset() + { + Steps.Clear(); + } +} diff --git a/GFramework.Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Cqrs/Internal/CqrsDispatcher.cs index 108f6c03..505299c5 100644 --- a/GFramework.Cqrs/Internal/CqrsDispatcher.cs +++ b/GFramework.Cqrs/Internal/CqrsDispatcher.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using GFramework.Core.Abstractions.Architectures; using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Logging; @@ -34,7 +35,7 @@ internal sealed class CqrsDispatcher( .GetMethod(nameof(InvokeRequestHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!; private static readonly MethodInfo RequestPipelineInvokerMethodDefinition = typeof(CqrsDispatcher) - .GetMethod(nameof(InvokeRequestPipelineAsync), BindingFlags.NonPublic | BindingFlags.Static)!; + .GetMethod(nameof(InvokeRequestPipelineExecutorAsync), BindingFlags.NonPublic | BindingFlags.Static)!; private static readonly MethodInfo NotificationHandlerInvokerMethodDefinition = typeof(CqrsDispatcher) .GetMethod(nameof(InvokeNotificationHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!; @@ -108,7 +109,8 @@ internal sealed class CqrsDispatcher( if (behaviors.Count == 0) return await dispatchBinding.RequestInvoker(handler, request, cancellationToken).ConfigureAwait(false); - return await dispatchBinding.PipelineInvoker(handler, behaviors, request, cancellationToken) + return await dispatchBinding.GetPipelineExecutor(behaviors.Count) + .Invoke(handler, behaviors, request, cancellationToken) .ConfigureAwait(false); } @@ -168,7 +170,7 @@ internal sealed class CqrsDispatcher( typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)), typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)), CreateRequestInvoker(requestType), - CreateRequestPipelineInvoker(requestType)); + requestType); } /// @@ -227,18 +229,6 @@ internal sealed class CqrsDispatcher( return (RequestInvoker)Delegate.CreateDelegate(typeof(RequestInvoker), method); } - /// - /// 生成带管道行为的请求处理委托,避免每次发送都重复反射。 - /// - private static RequestPipelineInvoker CreateRequestPipelineInvoker(Type requestType) - { - var method = RequestPipelineInvokerMethodDefinition - .MakeGenericMethod(requestType, typeof(TResponse)); - return (RequestPipelineInvoker)Delegate.CreateDelegate( - typeof(RequestPipelineInvoker), - method); - } - /// /// 生成通知处理器调用委托,避免每次发布都重复反射。 /// @@ -274,29 +264,20 @@ internal sealed class CqrsDispatcher( } /// - /// 执行包含管道行为链的请求处理。 + /// 执行指定行为数量的强类型 request pipeline executor。 + /// 该入口本身是缓存的固定 executor 形状;每次分发只绑定当前 handler 与 behaviors 实例。 /// - private static ValueTask InvokeRequestPipelineAsync( + private static ValueTask InvokeRequestPipelineExecutorAsync( object handler, IReadOnlyList behaviors, object request, CancellationToken cancellationToken) where TRequest : IRequest { - var typedHandler = (IRequestHandler)handler; - var typedRequest = (TRequest)request; - - MessageHandlerDelegate next = - (message, token) => typedHandler.Handle(message, token); - - for (var i = behaviors.Count - 1; i >= 0; i--) - { - var behavior = (IPipelineBehavior)behaviors[i]; - var currentNext = next; - next = (message, token) => behavior.Handle(message, currentNext, token); - } - - return next(typedRequest, cancellationToken); + var invocation = new RequestPipelineInvocation( + (IRequestHandler)handler, + behaviors); + return invocation.InvokeAsync((TRequest)request, cancellationToken); } /// @@ -424,15 +405,17 @@ internal sealed class CqrsDispatcher( /// /// 保存普通请求分发路径所需的 handler 服务类型、pipeline 服务类型与强类型调用委托。 - /// 该绑定同时覆盖“直接请求处理”和“带 pipeline 的请求处理”两条路径。 + /// 该绑定同时覆盖“直接请求处理”和“按行为数量缓存 pipeline executor 形状”的两条路径。 /// /// 请求响应类型。 private sealed class RequestDispatchBinding( Type handlerType, Type behaviorType, RequestInvoker requestInvoker, - RequestPipelineInvoker pipelineInvoker) + Type requestType) { + private readonly ConcurrentDictionary> _pipelineExecutors = new(); + /// /// 获取请求处理器在容器中的服务类型。 /// @@ -449,8 +432,157 @@ internal sealed class CqrsDispatcher( public RequestInvoker RequestInvoker { get; } = requestInvoker; /// - /// 获取执行 pipeline 行为链的强类型委托。 + /// 获取指定行为数量对应的 pipeline executor。 + /// executor 形状会按请求/响应类型与行为数量缓存,但不会缓存 handler 或 behavior 实例。 /// - public RequestPipelineInvoker PipelineInvoker { get; } = pipelineInvoker; + public RequestPipelineExecutor GetPipelineExecutor(int behaviorCount) + { + ArgumentOutOfRangeException.ThrowIfNegative(behaviorCount); + return _pipelineExecutors.GetOrAdd( + behaviorCount, + count => CreateRequestPipelineExecutor(requestType, count)); + } + + /// + /// 仅供测试读取指定行为数量是否已存在缓存 executor。 + /// + public object? GetPipelineExecutorForTesting(int behaviorCount) + { + _pipelineExecutors.TryGetValue(behaviorCount, out var executor); + return executor; + } + } + + /// + /// 为指定请求/响应类型与固定行为数量创建 pipeline executor。 + /// 行为数量用于表达缓存形状,实际分发仍会消费本次容器解析出的 handler 与 behaviors 实例。 + /// + private static RequestPipelineExecutor CreateRequestPipelineExecutor( + Type requestType, + int behaviorCount) + { + ArgumentOutOfRangeException.ThrowIfNegative(behaviorCount); + + var method = RequestPipelineInvokerMethodDefinition + .MakeGenericMethod(requestType, typeof(TResponse)); + var invoker = (RequestPipelineInvoker)Delegate.CreateDelegate( + typeof(RequestPipelineInvoker), + method); + return new RequestPipelineExecutor(behaviorCount, invoker); + } + + /// + /// 保存固定行为数量下的 typed pipeline executor 形状。 + /// 该对象自身可跨分发复用,但每次调用都只绑定当前 handler 与 behavior 实例。 + /// + /// 请求响应类型。 + private sealed class RequestPipelineExecutor( + int behaviorCount, + RequestPipelineInvoker invoker) + { + /// + /// 获取此 executor 预期处理的行为数量。 + /// + public int BehaviorCount { get; } = behaviorCount; + + /// + /// 使用当前 handler / behaviors / request 执行缓存的 pipeline 形状。 + /// + public ValueTask Invoke( + object handler, + IReadOnlyList behaviors, + object request, + CancellationToken cancellationToken) + { + if (behaviors.Count != BehaviorCount) + { + throw new InvalidOperationException( + $"Cached request pipeline executor expected {BehaviorCount} behaviors, but received {behaviors.Count}."); + } + + return invoker(handler, behaviors, request, cancellationToken); + } + } + + /// + /// 保存单次 request pipeline 分发所需的当前 handler、behavior 列表和 continuation 缓存。 + /// 该对象只存在于本次分发,不会跨请求保留容器解析出的实例。 + /// + private sealed class RequestPipelineInvocation( + IRequestHandler handler, + IReadOnlyList behaviors) + where TRequest : IRequest + { + private readonly IRequestHandler _handler = handler; + private readonly IReadOnlyList _behaviors = behaviors; + private readonly MessageHandlerDelegate?[] _continuations = + new MessageHandlerDelegate?[behaviors.Count + 1]; + + /// + /// 从 pipeline 起点执行当前请求。 + /// + public ValueTask InvokeAsync(TRequest request, CancellationToken cancellationToken) + { + return GetContinuation(0)(request, cancellationToken); + } + + /// + /// 获取指定阶段的 continuation,并在首次请求时为该阶段绑定一次不可变调用入口。 + /// 同一行为多次调用 next 时会命中相同 continuation,保持与传统链式委托一致的语义。 + /// + private MessageHandlerDelegate GetContinuation(int index) + { + var continuation = _continuations[index]; + if (continuation is not null) + { + return continuation; + } + + continuation = index == _behaviors.Count + ? InvokeHandlerAsync + : new RequestPipelineContinuation(this, index).InvokeAsync; + _continuations[index] = continuation; + return continuation; + } + + /// + /// 执行指定索引的 pipeline behavior。 + /// + private ValueTask InvokeBehaviorAsync( + int index, + TRequest request, + CancellationToken cancellationToken) + { + var behavior = (IPipelineBehavior)_behaviors[index]; + return behavior.Handle(request, GetContinuation(index + 1), cancellationToken); + } + + /// + /// 调用最终请求处理器。 + /// + private ValueTask InvokeHandlerAsync(TRequest request, CancellationToken cancellationToken) + { + return _handler.Handle(request, cancellationToken); + } + + /// + /// 将固定阶段索引绑定为标准 。 + /// 该包装只在单次分发生命周期内存在,用于把缓存 shape 套入当前实例。 + /// + private sealed class RequestPipelineContinuation( + RequestPipelineInvocation invocation, + int index) + where TCurrentRequest : IRequest + { + /// + /// 执行当前阶段并跳转到下一个 continuation。 + /// + public ValueTask InvokeAsync( + TCurrentRequest request, + CancellationToken cancellationToken) + { + return invocation.InvokeBehaviorAsync(index, request, cancellationToken); + } + } } }