fix(cqrs): 缓存请求管道执行形状

- 优化 CqrsDispatcher 的 request pipeline 路径,按请求类型与行为数量缓存 typed executor 形状并在单次分发中绑定当前 handler 与 behaviors

- 补充 dispatcher 缓存回归测试,覆盖 pipeline executor 的首次创建、后续复用与行为顺序稳定
This commit is contained in:
gewuyou 2026-04-29 16:49:08 +08:00 committed by GeWuYou
parent 3b4eb3e40a
commit e81a43680d
7 changed files with 406 additions and 35 deletions

View File

@ -24,6 +24,8 @@ internal sealed class CqrsDispatcherCacheTests
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
_container = new MicrosoftDiContainer(); _container = new MicrosoftDiContainer();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineCacheBehavior>(); _container.RegisterCqrsPipelineBehavior<DispatcherPipelineCacheBehavior>();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderOuterBehavior>();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderInnerBehavior>();
CqrsTestRuntime.RegisterHandlers( CqrsTestRuntime.RegisterHandlers(
_container, _container,
@ -145,6 +147,103 @@ internal sealed class CqrsDispatcherCacheTests
}); });
} }
/// <summary>
/// 验证 request pipeline executor 会按行为数量在 binding 内首次创建并在后续分发中复用。
/// </summary>
[Test]
public async Task Dispatcher_Should_Cache_Request_Pipeline_Executors_Per_Behavior_Count()
{
var requestBindings = GetCacheField("RequestDispatchBindings");
Assert.Multiple(() =>
{
Assert.That(
GetRequestPipelineExecutorValue(
requestBindings,
typeof(DispatcherPipelineCacheRequest),
typeof(int),
1),
Is.Null);
Assert.That(
GetRequestPipelineExecutorValue(
requestBindings,
typeof(DispatcherPipelineOrderCacheRequest),
typeof(int),
2),
Is.Null);
});
await _context!.SendRequestAsync(new DispatcherPipelineCacheRequest());
await _context.SendRequestAsync(new DispatcherPipelineOrderCacheRequest());
var singleBehaviorExecutor = GetRequestPipelineExecutorValue(
requestBindings,
typeof(DispatcherPipelineCacheRequest),
typeof(int),
1);
var twoBehaviorExecutor = GetRequestPipelineExecutorValue(
requestBindings,
typeof(DispatcherPipelineOrderCacheRequest),
typeof(int),
2);
await _context.SendRequestAsync(new DispatcherPipelineCacheRequest());
await _context.SendRequestAsync(new DispatcherPipelineOrderCacheRequest());
Assert.Multiple(() =>
{
Assert.That(singleBehaviorExecutor, Is.Not.Null);
Assert.That(twoBehaviorExecutor, Is.Not.Null);
Assert.That(singleBehaviorExecutor, Is.Not.SameAs(twoBehaviorExecutor));
Assert.That(
GetRequestPipelineExecutorValue(
requestBindings,
typeof(DispatcherPipelineCacheRequest),
typeof(int),
1),
Is.SameAs(singleBehaviorExecutor));
Assert.That(
GetRequestPipelineExecutorValue(
requestBindings,
typeof(DispatcherPipelineOrderCacheRequest),
typeof(int),
2),
Is.SameAs(twoBehaviorExecutor));
});
}
/// <summary>
/// 验证复用缓存的 request pipeline executor 后,行为顺序和最终处理器顺序保持不变。
/// </summary>
[Test]
public async Task Dispatcher_Should_Preserve_Request_Pipeline_Order_When_Reusing_Cached_Executor()
{
DispatcherPipelineOrderState.Reset();
await _context!.SendRequestAsync(new DispatcherPipelineOrderCacheRequest());
var firstInvocation = DispatcherPipelineOrderState.Steps.ToArray();
DispatcherPipelineOrderState.Reset();
await _context.SendRequestAsync(new DispatcherPipelineOrderCacheRequest());
var secondInvocation = DispatcherPipelineOrderState.Steps.ToArray();
var expectedOrder = new[]
{
"Outer:Before",
"Inner:Before",
"Handler",
"Inner:After",
"Outer:After"
};
Assert.Multiple(() =>
{
Assert.That(firstInvocation, Is.EqualTo(expectedOrder));
Assert.That(secondInvocation, Is.EqualTo(expectedOrder));
});
}
/// <summary> /// <summary>
/// 通过反射读取 dispatcher 的静态缓存对象。 /// 通过反射读取 dispatcher 的静态缓存对象。
/// </summary> /// </summary>
@ -188,6 +287,21 @@ internal sealed class CqrsDispatcherCacheTests
return InvokeInstanceMethod(cache, "GetValueOrDefaultForTesting", primaryType, secondaryType); return InvokeInstanceMethod(cache, "GetValueOrDefaultForTesting", primaryType, secondaryType);
} }
/// <summary>
/// 读取 request dispatch binding 中指定行为数量的 pipeline executor 缓存项。
/// </summary>
private static object? GetRequestPipelineExecutorValue(
object requestBindings,
Type requestType,
Type responseType,
int behaviorCount)
{
var binding = GetRequestDispatchBindingValue(requestBindings, requestType, responseType);
return binding is null
? null
: InvokeInstanceMethod(binding, "GetPipelineExecutorForTesting", behaviorCount);
}
/// <summary> /// <summary>
/// 调用缓存实例上的无参清理方法。 /// 调用缓存实例上的无参清理方法。
/// </summary> /// </summary>
@ -210,6 +324,28 @@ internal sealed class CqrsDispatcherCacheTests
return method!.Invoke(target, arguments); return method!.Invoke(target, arguments);
} }
/// <summary>
/// 读取指定请求/响应类型对对应的强类型 request dispatch binding。
/// </summary>
private static object? GetRequestDispatchBindingValue(object requestBindings, Type requestType, Type responseType)
{
var bindingBox = GetPairCacheValue(requestBindings, requestType, responseType);
if (bindingBox is null)
{
return null;
}
var method = bindingBox.GetType().GetMethod(
"Get",
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
Assert.That(method, Is.Not.Null, $"Missing request binding accessor on {bindingBox.GetType().FullName}.");
return method!
.MakeGenericMethod(responseType)
.Invoke(bindingBox, Array.Empty<object>());
}
/// <summary> /// <summary>
/// 获取 CQRS dispatcher 运行时类型。 /// 获取 CQRS dispatcher 运行时类型。
/// </summary> /// </summary>

View File

@ -0,0 +1,8 @@
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 为双行为 pipeline 顺序回归提供最小请求。
/// </summary>
internal sealed record DispatcherPipelineOrderCacheRequest : IRequest<int>;

View File

@ -0,0 +1,21 @@
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 为双行为顺序回归提供最终请求处理器。
/// </summary>
internal sealed class DispatcherPipelineOrderCacheRequestHandler : IRequestHandler<DispatcherPipelineOrderCacheRequest, int>
{
/// <summary>
/// 记录处理器执行并返回固定结果。
/// </summary>
/// <param name="request">当前请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>固定整数结果。</returns>
public ValueTask<int> Handle(DispatcherPipelineOrderCacheRequest request, CancellationToken cancellationToken)
{
DispatcherPipelineOrderState.Steps.Add("Handler");
return ValueTask.FromResult(3);
}
}

View File

@ -0,0 +1,27 @@
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 作为内层行为验证缓存 executor 复用后仍保持注册顺序。
/// </summary>
internal sealed class DispatcherPipelineOrderInnerBehavior : IPipelineBehavior<DispatcherPipelineOrderCacheRequest, int>
{
/// <summary>
/// 记录内层行为的前后执行节点。
/// </summary>
/// <param name="request">当前请求。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理器结果。</returns>
public async ValueTask<int> Handle(
DispatcherPipelineOrderCacheRequest request,
MessageHandlerDelegate<DispatcherPipelineOrderCacheRequest, int> next,
CancellationToken cancellationToken)
{
DispatcherPipelineOrderState.Steps.Add("Inner:Before");
var result = await next(request, cancellationToken).ConfigureAwait(false);
DispatcherPipelineOrderState.Steps.Add("Inner:After");
return result;
}
}

View File

@ -0,0 +1,27 @@
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 作为外层行为验证缓存 executor 复用后仍保持注册顺序。
/// </summary>
internal sealed class DispatcherPipelineOrderOuterBehavior : IPipelineBehavior<DispatcherPipelineOrderCacheRequest, int>
{
/// <summary>
/// 记录外层行为的前后执行节点。
/// </summary>
/// <param name="request">当前请求。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理器结果。</returns>
public async ValueTask<int> Handle(
DispatcherPipelineOrderCacheRequest request,
MessageHandlerDelegate<DispatcherPipelineOrderCacheRequest, int> next,
CancellationToken cancellationToken)
{
DispatcherPipelineOrderState.Steps.Add("Outer:Before");
var result = await next(request, cancellationToken).ConfigureAwait(false);
DispatcherPipelineOrderState.Steps.Add("Outer:After");
return result;
}
}

View File

@ -0,0 +1,20 @@
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 记录双行为 pipeline 的实际执行顺序。
/// </summary>
internal static class DispatcherPipelineOrderState
{
/// <summary>
/// 获取按执行顺序追加的步骤名称。
/// </summary>
public static List<string> Steps { get; } = [];
/// <summary>
/// 清空当前记录,供下一次断言使用。
/// </summary>
public static void Reset()
{
Steps.Clear();
}
}

View File

@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using GFramework.Core.Abstractions.Architectures; using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging; using GFramework.Core.Abstractions.Logging;
@ -34,7 +35,7 @@ internal sealed class CqrsDispatcher(
.GetMethod(nameof(InvokeRequestHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!; .GetMethod(nameof(InvokeRequestHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!;
private static readonly MethodInfo RequestPipelineInvokerMethodDefinition = typeof(CqrsDispatcher) private static readonly MethodInfo RequestPipelineInvokerMethodDefinition = typeof(CqrsDispatcher)
.GetMethod(nameof(InvokeRequestPipelineAsync), BindingFlags.NonPublic | BindingFlags.Static)!; .GetMethod(nameof(InvokeRequestPipelineExecutorAsync), BindingFlags.NonPublic | BindingFlags.Static)!;
private static readonly MethodInfo NotificationHandlerInvokerMethodDefinition = typeof(CqrsDispatcher) private static readonly MethodInfo NotificationHandlerInvokerMethodDefinition = typeof(CqrsDispatcher)
.GetMethod(nameof(InvokeNotificationHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!; .GetMethod(nameof(InvokeNotificationHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!;
@ -108,7 +109,8 @@ internal sealed class CqrsDispatcher(
if (behaviors.Count == 0) if (behaviors.Count == 0)
return await dispatchBinding.RequestInvoker(handler, request, cancellationToken).ConfigureAwait(false); return await dispatchBinding.RequestInvoker(handler, request, cancellationToken).ConfigureAwait(false);
return await dispatchBinding.PipelineInvoker(handler, behaviors, request, cancellationToken) return await dispatchBinding.GetPipelineExecutor(behaviors.Count)
.Invoke(handler, behaviors, request, cancellationToken)
.ConfigureAwait(false); .ConfigureAwait(false);
} }
@ -168,7 +170,7 @@ internal sealed class CqrsDispatcher(
typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)), typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)),
typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)), typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)),
CreateRequestInvoker<TResponse>(requestType), CreateRequestInvoker<TResponse>(requestType),
CreateRequestPipelineInvoker<TResponse>(requestType)); requestType);
} }
/// <summary> /// <summary>
@ -227,18 +229,6 @@ internal sealed class CqrsDispatcher(
return (RequestInvoker<TResponse>)Delegate.CreateDelegate(typeof(RequestInvoker<TResponse>), method); return (RequestInvoker<TResponse>)Delegate.CreateDelegate(typeof(RequestInvoker<TResponse>), method);
} }
/// <summary>
/// 生成带管道行为的请求处理委托,避免每次发送都重复反射。
/// </summary>
private static RequestPipelineInvoker<TResponse> CreateRequestPipelineInvoker<TResponse>(Type requestType)
{
var method = RequestPipelineInvokerMethodDefinition
.MakeGenericMethod(requestType, typeof(TResponse));
return (RequestPipelineInvoker<TResponse>)Delegate.CreateDelegate(
typeof(RequestPipelineInvoker<TResponse>),
method);
}
/// <summary> /// <summary>
/// 生成通知处理器调用委托,避免每次发布都重复反射。 /// 生成通知处理器调用委托,避免每次发布都重复反射。
/// </summary> /// </summary>
@ -274,29 +264,20 @@ internal sealed class CqrsDispatcher(
} }
/// <summary> /// <summary>
/// 执行包含管道行为链的请求处理。 /// 执行指定行为数量的强类型 request pipeline executor。
/// 该入口本身是缓存的固定 executor 形状;每次分发只绑定当前 handler 与 behaviors 实例。
/// </summary> /// </summary>
private static ValueTask<TResponse> InvokeRequestPipelineAsync<TRequest, TResponse>( private static ValueTask<TResponse> InvokeRequestPipelineExecutorAsync<TRequest, TResponse>(
object handler, object handler,
IReadOnlyList<object> behaviors, IReadOnlyList<object> behaviors,
object request, object request,
CancellationToken cancellationToken) CancellationToken cancellationToken)
where TRequest : IRequest<TResponse> where TRequest : IRequest<TResponse>
{ {
var typedHandler = (IRequestHandler<TRequest, TResponse>)handler; var invocation = new RequestPipelineInvocation<TRequest, TResponse>(
var typedRequest = (TRequest)request; (IRequestHandler<TRequest, TResponse>)handler,
behaviors);
MessageHandlerDelegate<TRequest, TResponse> next = return invocation.InvokeAsync((TRequest)request, cancellationToken);
(message, token) => typedHandler.Handle(message, token);
for (var i = behaviors.Count - 1; i >= 0; i--)
{
var behavior = (IPipelineBehavior<TRequest, TResponse>)behaviors[i];
var currentNext = next;
next = (message, token) => behavior.Handle(message, currentNext, token);
}
return next(typedRequest, cancellationToken);
} }
/// <summary> /// <summary>
@ -424,15 +405,17 @@ internal sealed class CqrsDispatcher(
/// <summary> /// <summary>
/// 保存普通请求分发路径所需的 handler 服务类型、pipeline 服务类型与强类型调用委托。 /// 保存普通请求分发路径所需的 handler 服务类型、pipeline 服务类型与强类型调用委托。
/// 该绑定同时覆盖“直接请求处理”和“带 pipeline 的请求处理”两条路径。 /// 该绑定同时覆盖“直接请求处理”和“按行为数量缓存 pipeline executor 形状”的两条路径。
/// </summary> /// </summary>
/// <typeparam name="TResponse">请求响应类型。</typeparam> /// <typeparam name="TResponse">请求响应类型。</typeparam>
private sealed class RequestDispatchBinding<TResponse>( private sealed class RequestDispatchBinding<TResponse>(
Type handlerType, Type handlerType,
Type behaviorType, Type behaviorType,
RequestInvoker<TResponse> requestInvoker, RequestInvoker<TResponse> requestInvoker,
RequestPipelineInvoker<TResponse> pipelineInvoker) Type requestType)
{ {
private readonly ConcurrentDictionary<int, RequestPipelineExecutor<TResponse>> _pipelineExecutors = new();
/// <summary> /// <summary>
/// 获取请求处理器在容器中的服务类型。 /// 获取请求处理器在容器中的服务类型。
/// </summary> /// </summary>
@ -449,8 +432,157 @@ internal sealed class CqrsDispatcher(
public RequestInvoker<TResponse> RequestInvoker { get; } = requestInvoker; public RequestInvoker<TResponse> RequestInvoker { get; } = requestInvoker;
/// <summary> /// <summary>
/// 获取执行 pipeline 行为链的强类型委托。 /// 获取指定行为数量对应的 pipeline executor。
/// executor 形状会按请求/响应类型与行为数量缓存,但不会缓存 handler 或 behavior 实例。
/// </summary> /// </summary>
public RequestPipelineInvoker<TResponse> PipelineInvoker { get; } = pipelineInvoker; public RequestPipelineExecutor<TResponse> GetPipelineExecutor(int behaviorCount)
{
ArgumentOutOfRangeException.ThrowIfNegative(behaviorCount);
return _pipelineExecutors.GetOrAdd(
behaviorCount,
count => CreateRequestPipelineExecutor<TResponse>(requestType, count));
}
/// <summary>
/// 仅供测试读取指定行为数量是否已存在缓存 executor。
/// </summary>
public object? GetPipelineExecutorForTesting(int behaviorCount)
{
_pipelineExecutors.TryGetValue(behaviorCount, out var executor);
return executor;
}
}
/// <summary>
/// 为指定请求/响应类型与固定行为数量创建 pipeline executor。
/// 行为数量用于表达缓存形状,实际分发仍会消费本次容器解析出的 handler 与 behaviors 实例。
/// </summary>
private static RequestPipelineExecutor<TResponse> CreateRequestPipelineExecutor<TResponse>(
Type requestType,
int behaviorCount)
{
ArgumentOutOfRangeException.ThrowIfNegative(behaviorCount);
var method = RequestPipelineInvokerMethodDefinition
.MakeGenericMethod(requestType, typeof(TResponse));
var invoker = (RequestPipelineInvoker<TResponse>)Delegate.CreateDelegate(
typeof(RequestPipelineInvoker<TResponse>),
method);
return new RequestPipelineExecutor<TResponse>(behaviorCount, invoker);
}
/// <summary>
/// 保存固定行为数量下的 typed pipeline executor 形状。
/// 该对象自身可跨分发复用,但每次调用都只绑定当前 handler 与 behavior 实例。
/// </summary>
/// <typeparam name="TResponse">请求响应类型。</typeparam>
private sealed class RequestPipelineExecutor<TResponse>(
int behaviorCount,
RequestPipelineInvoker<TResponse> invoker)
{
/// <summary>
/// 获取此 executor 预期处理的行为数量。
/// </summary>
public int BehaviorCount { get; } = behaviorCount;
/// <summary>
/// 使用当前 handler / behaviors / request 执行缓存的 pipeline 形状。
/// </summary>
public ValueTask<TResponse> Invoke(
object handler,
IReadOnlyList<object> behaviors,
object request,
CancellationToken cancellationToken)
{
if (behaviors.Count != BehaviorCount)
{
throw new InvalidOperationException(
$"Cached request pipeline executor expected {BehaviorCount} behaviors, but received {behaviors.Count}.");
}
return invoker(handler, behaviors, request, cancellationToken);
}
}
/// <summary>
/// 保存单次 request pipeline 分发所需的当前 handler、behavior 列表和 continuation 缓存。
/// 该对象只存在于本次分发,不会跨请求保留容器解析出的实例。
/// </summary>
private sealed class RequestPipelineInvocation<TRequest, TResponse>(
IRequestHandler<TRequest, TResponse> handler,
IReadOnlyList<object> behaviors)
where TRequest : IRequest<TResponse>
{
private readonly IRequestHandler<TRequest, TResponse> _handler = handler;
private readonly IReadOnlyList<object> _behaviors = behaviors;
private readonly MessageHandlerDelegate<TRequest, TResponse>?[] _continuations =
new MessageHandlerDelegate<TRequest, TResponse>?[behaviors.Count + 1];
/// <summary>
/// 从 pipeline 起点执行当前请求。
/// </summary>
public ValueTask<TResponse> InvokeAsync(TRequest request, CancellationToken cancellationToken)
{
return GetContinuation(0)(request, cancellationToken);
}
/// <summary>
/// 获取指定阶段的 continuation并在首次请求时为该阶段绑定一次不可变调用入口。
/// 同一行为多次调用 <c>next</c> 时会命中相同 continuation保持与传统链式委托一致的语义。
/// </summary>
private MessageHandlerDelegate<TRequest, TResponse> GetContinuation(int index)
{
var continuation = _continuations[index];
if (continuation is not null)
{
return continuation;
}
continuation = index == _behaviors.Count
? InvokeHandlerAsync
: new RequestPipelineContinuation<TRequest, TResponse>(this, index).InvokeAsync;
_continuations[index] = continuation;
return continuation;
}
/// <summary>
/// 执行指定索引的 pipeline behavior。
/// </summary>
private ValueTask<TResponse> InvokeBehaviorAsync(
int index,
TRequest request,
CancellationToken cancellationToken)
{
var behavior = (IPipelineBehavior<TRequest, TResponse>)_behaviors[index];
return behavior.Handle(request, GetContinuation(index + 1), cancellationToken);
}
/// <summary>
/// 调用最终请求处理器。
/// </summary>
private ValueTask<TResponse> InvokeHandlerAsync(TRequest request, CancellationToken cancellationToken)
{
return _handler.Handle(request, cancellationToken);
}
/// <summary>
/// 将固定阶段索引绑定为标准 <see cref="MessageHandlerDelegate{TRequest,TResponse}" />。
/// 该包装只在单次分发生命周期内存在,用于把缓存 shape 套入当前实例。
/// </summary>
private sealed class RequestPipelineContinuation<TCurrentRequest, TCurrentResponse>(
RequestPipelineInvocation<TCurrentRequest, TCurrentResponse> invocation,
int index)
where TCurrentRequest : IRequest<TCurrentResponse>
{
/// <summary>
/// 执行当前阶段并跳转到下一个 continuation。
/// </summary>
public ValueTask<TCurrentResponse> InvokeAsync(
TCurrentRequest request,
CancellationToken cancellationToken)
{
return invocation.InvokeBehaviorAsync(index, request, cancellationToken);
}
}
} }
} }