perf(cqrs): 优化 stream 建流常量成本

- 优化 generated stream dispatch binding 为按响应类型缓存强类型 invoker 与 pipeline executor,压缩 CreateStream 热路径桥接开销

- 保持 stream 异常契约与行为缓存语义不变,并补齐相关 XML 注释与必要内联说明

- 补充 generated stream binding 与 pipeline executor 复用回归,覆盖 generated invoker 与 stream pipeline 组合场景
This commit is contained in:
gewuyou 2026-05-09 16:14:16 +08:00
parent 228e954d2d
commit b7fa3eee29
3 changed files with 313 additions and 64 deletions

View File

@ -903,7 +903,7 @@ internal sealed class CqrsDispatcherCacheTests
Type responseType,
int behaviorCount)
{
var binding = GetPairCacheValue(streamBindings, requestType, responseType);
var binding = GetStreamDispatchBindingValue(streamBindings, requestType, responseType);
return binding is null
? null
: InvokeInstanceMethod(binding, "GetPipelineExecutorForTesting", behaviorCount);
@ -957,6 +957,32 @@ internal sealed class CqrsDispatcherCacheTests
.Invoke(bindingBox, Array.Empty<object>());
}
/// <summary>
/// 读取指定流式请求/响应类型对对应的强类型 stream dispatch binding。
/// </summary>
/// <param name="streamBindings">dispatcher 内部的 stream binding 缓存对象。</param>
/// <param name="requestType">要读取的流式请求运行时类型。</param>
/// <param name="responseType">要读取的响应元素类型。</param>
/// <returns>强类型 binding若缓存尚未建立则返回 <see langword="null" />。</returns>
private static object? GetStreamDispatchBindingValue(object streamBindings, Type requestType, Type responseType)
{
var bindingBox = GetPairCacheValue(streamBindings, requestType, responseType);
if (bindingBox is null)
{
return null;
}
var method = bindingBox.GetType().GetMethod(
"Get",
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
Assert.That(method, Is.Not.Null, $"Missing stream binding accessor on {bindingBox.GetType().FullName}.");
return method!
.MakeGenericMethod(responseType)
.Invoke(bindingBox, Array.Empty<object>());
}
/// <summary>
/// 获取 CQRS dispatcher 运行时类型。
/// </summary>

View File

@ -222,6 +222,49 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
});
}
/// <summary>
/// 验证 generated stream binding 与对应的 pipeline executor 在首次建流后会被缓存并复用,
/// 同时保持 generated invoker 的结果与行为执行语义不变。
/// </summary>
[Test]
public async Task CreateStream_Should_Reuse_Cached_Generated_Stream_Binding_And_Pipeline_Executor()
{
var generatedAssembly = CreateGeneratedStreamInvokerAssembly();
var container = new MicrosoftDiContainer();
container.RegisterCqrsStreamPipelineBehavior<GeneratedStreamPipelineTrackingBehavior>();
CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);
container.Freeze();
var streamBindings = GetDispatcherCacheField("StreamDispatchBindings");
var requestType = typeof(GeneratedStreamInvokerRequest);
var responseType = typeof(int);
Assert.That(
GetStreamPipelineExecutorValue(streamBindings, requestType, responseType, 1),
Is.Null);
var context = new ArchitectureContext(container);
var firstResults = await DrainAsync(context.CreateStream(new GeneratedStreamInvokerRequest(3))).ConfigureAwait(false);
var bindingAfterFirstDispatch = GetPairCacheValue(streamBindings, requestType, responseType);
var executorAfterFirstDispatch = GetStreamPipelineExecutorValue(streamBindings, requestType, responseType, 1);
var secondResults = await DrainAsync(context.CreateStream(new GeneratedStreamInvokerRequest(3))).ConfigureAwait(false);
var bindingAfterSecondDispatch = GetPairCacheValue(streamBindings, requestType, responseType);
var executorAfterSecondDispatch = GetStreamPipelineExecutorValue(streamBindings, requestType, responseType, 1);
Assert.Multiple(() =>
{
Assert.That(firstResults, Is.EqualTo([30, 31]));
Assert.That(secondResults, Is.EqualTo([30, 31]));
Assert.That(bindingAfterFirstDispatch, Is.Not.Null);
Assert.That(bindingAfterSecondDispatch, Is.SameAs(bindingAfterFirstDispatch));
Assert.That(executorAfterFirstDispatch, Is.Not.Null);
Assert.That(executorAfterSecondDispatch, Is.SameAs(executorAfterFirstDispatch));
Assert.That(GeneratedStreamPipelineTrackingBehavior.InvocationCount, Is.EqualTo(2));
});
}
/// <summary>
/// 验证当实现类型隐藏、但 stream handler interface 仍可直接表达时,
/// dispatcher 仍会消费 generated stream invoker descriptor。
@ -959,6 +1002,65 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
.Invoke(cache, Array.Empty<object>());
}
/// <summary>
/// 读取双键缓存中当前保存的对象。
/// </summary>
private static object? GetPairCacheValue(object cache, Type primaryType, Type secondaryType)
{
return InvokeInstanceMethod(cache, "GetValueOrDefaultForTesting", primaryType, secondaryType);
}
/// <summary>
/// 读取指定 stream dispatch binding 中当前缓存的 pipeline executor。
/// </summary>
private static object? GetStreamPipelineExecutorValue(
object streamBindings,
Type requestType,
Type responseType,
int behaviorCount)
{
var binding = GetStreamDispatchBindingValue(streamBindings, requestType, responseType);
return binding is null
? null
: InvokeInstanceMethod(binding, "GetPipelineExecutorForTesting", behaviorCount);
}
/// <summary>
/// 读取指定流式请求/响应类型对对应的强类型 stream dispatch binding。
/// </summary>
private static object? GetStreamDispatchBindingValue(object streamBindings, Type requestType, Type responseType)
{
var bindingBox = GetPairCacheValue(streamBindings, requestType, responseType);
if (bindingBox is null)
{
return null;
}
var method = bindingBox.GetType().GetMethod(
"Get",
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
Assert.That(method, Is.Not.Null, $"Missing stream binding accessor on {bindingBox.GetType().FullName}.");
return method!
.MakeGenericMethod(responseType)
.Invoke(bindingBox, Array.Empty<object>());
}
/// <summary>
/// 调用目标对象上的实例方法。
/// </summary>
private static object? InvokeInstanceMethod(object target, string methodName, params object[] arguments)
{
var method = target.GetType().GetMethod(
methodName,
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
Assert.That(method, Is.Not.Null, $"Missing method {target.GetType().FullName}.{methodName}.");
return method!.Invoke(target, arguments);
}
/// <summary>
/// 枚举并收集当前异步流中的全部元素,便于断言 generated stream invoker 的输出。
/// </summary>

View File

@ -45,8 +45,9 @@ internal sealed class CqrsDispatcher(
private static readonly WeakKeyCache<Type, NotificationDispatchBinding>
NotificationDispatchBindings = new();
// 卸载安全的进程级缓存:请求/响应类型对采用弱键缓存,避免流式消息类型被静态字典永久保留。
private static readonly WeakTypePairCache<StreamDispatchBinding>
// 卸载安全的进程级缓存:流式请求/响应类型对命中后复用强类型 dispatch binding 盒子,
// 避免 stream 响应元素在热路径上退化为 object 桥接,同时仍保持弱键卸载安全语义。
private static readonly WeakTypePairCache<StreamDispatchBindingBox>
StreamDispatchBindings = new();
// 卸载安全的进程级缓存:请求/响应类型对命中后复用强类型 dispatch binding
@ -190,10 +191,7 @@ internal sealed class CqrsDispatcher(
ArgumentNullException.ThrowIfNull(request);
var requestType = request.GetType();
var dispatchBinding = StreamDispatchBindings.GetOrAdd(
requestType,
typeof(TResponse),
static (requestType, responseType) => CreateStreamDispatchBinding(requestType, responseType));
var dispatchBinding = GetStreamDispatchBinding<TResponse>(requestType);
var handler = container.Get(dispatchBinding.HandlerType)
?? throw new InvalidOperationException(
$"No CQRS stream handler registered for {requestType.FullName}.");
@ -201,7 +199,7 @@ internal sealed class CqrsDispatcher(
PrepareHandler(handler, context);
if (!HasStreamBehaviorRegistration(dispatchBinding.BehaviorType))
{
return (IAsyncEnumerable<TResponse>)dispatchBinding.StreamInvoker(handler, request, cancellationToken);
return dispatchBinding.StreamInvoker(handler, request, cancellationToken);
}
var behaviors = container.GetAll(dispatchBinding.BehaviorType);
@ -211,7 +209,7 @@ internal sealed class CqrsDispatcher(
PrepareHandler(behavior, context);
}
return (IAsyncEnumerable<TResponse>)dispatchBinding.GetPipelineExecutor(behaviors.Count)
return dispatchBinding.GetPipelineExecutor(behaviors.Count)
.Invoke(handler, behaviors, dispatchBinding.StreamInvoker, request, cancellationToken);
}
@ -399,75 +397,114 @@ internal sealed class CqrsDispatcher(
/// <summary>
/// 为指定流式请求类型构造完整分发绑定,把服务类型与调用委托聚合到同一缓存项。
/// </summary>
private static StreamDispatchBinding CreateStreamDispatchBinding(Type requestType, Type responseType)
private static StreamDispatchBinding<TResponse> CreateStreamDispatchBinding<TResponse>(Type requestType)
{
var generatedDescriptor = TryGetGeneratedStreamInvokerDescriptor(requestType, responseType);
var generatedDescriptor = TryGetGeneratedStreamInvokerDescriptor<TResponse>(requestType);
if (generatedDescriptor is not null)
{
var resolvedGeneratedDescriptor = generatedDescriptor.Value;
return new StreamDispatchBinding(
return new StreamDispatchBinding<TResponse>(
resolvedGeneratedDescriptor.HandlerType,
typeof(IStreamPipelineBehavior<,>).MakeGenericType(requestType, responseType),
typeof(IStreamPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)),
requestType,
responseType,
resolvedGeneratedDescriptor.Invoker);
}
return new StreamDispatchBinding(
typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, responseType),
typeof(IStreamPipelineBehavior<,>).MakeGenericType(requestType, responseType),
return new StreamDispatchBinding<TResponse>(
typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse)),
typeof(IStreamPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)),
requestType,
responseType,
CreateStreamInvoker(requestType, responseType));
CreateStreamInvoker<TResponse>(requestType));
}
/// <summary>
/// 获取指定流式请求/响应类型对的 dispatch binding若缓存未命中则按当前加载状态创建。
/// </summary>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
/// <param name="requestType">流式请求运行时类型。</param>
/// <returns>当前请求/响应类型对对应的强类型 stream dispatch binding。</returns>
private static StreamDispatchBinding<TResponse> GetStreamDispatchBinding<TResponse>(Type requestType)
{
var bindingBox = StreamDispatchBindings.GetOrAdd(
requestType,
typeof(TResponse),
static (cachedRequestType, cachedResponseType) =>
CreateStreamDispatchBindingBox<TResponse>(cachedRequestType, cachedResponseType));
return bindingBox.Get<TResponse>();
}
/// <summary>
/// 为弱键流式请求缓存创建强类型 binding 盒子,避免响应元素走 object 结果桥接。
/// </summary>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
/// <param name="requestType">流式请求运行时类型。</param>
/// <param name="responseType">缓存命中的响应运行时类型。</param>
/// <returns>可放入弱键缓存的强类型 binding 盒子。</returns>
private static StreamDispatchBindingBox CreateStreamDispatchBindingBox<TResponse>(
Type requestType,
Type responseType)
{
if (responseType != typeof(TResponse))
{
throw new InvalidOperationException(
$"Stream dispatch binding cache expected response type {typeof(TResponse).FullName}, but received {responseType.FullName}.");
}
return StreamDispatchBindingBox.Create(CreateStreamDispatchBinding<TResponse>(requestType));
}
/// <summary>
/// 尝试从容器已注册的 generated stream invoker provider 中获取指定流式请求/响应类型对的元数据。
/// </summary>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
/// <param name="requestType">流式请求运行时类型。</param>
/// <param name="responseType">流式响应元素类型。</param>
/// <returns>命中时返回强类型化后的描述符;否则返回 <see langword="null" />。</returns>
private static StreamInvokerDescriptor? TryGetGeneratedStreamInvokerDescriptor(Type requestType, Type responseType)
private static StreamInvokerDescriptor<TResponse>? TryGetGeneratedStreamInvokerDescriptor<TResponse>(Type requestType)
{
return GeneratedStreamInvokers.TryGetValue(requestType, responseType, out var metadata) &&
return GeneratedStreamInvokers.TryGetValue(requestType, typeof(TResponse), out var metadata) &&
metadata is not null
? CreateStreamInvokerDescriptor(requestType, responseType, metadata)
? CreateStreamInvokerDescriptor<TResponse>(requestType, metadata)
: null;
}
/// <summary>
/// 把 provider 返回的弱类型描述符转换为 dispatcher 内部使用的 stream invoker 描述符。
/// </summary>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
/// <param name="requestType">流式请求运行时类型。</param>
/// <param name="responseType">流式响应元素类型。</param>
/// <param name="descriptor">provider 返回的弱类型描述符。</param>
/// <returns>可直接用于创建 stream dispatch binding 的描述符。</returns>
/// <exception cref="InvalidOperationException">当 provider 返回的委托签名与当前流式请求/响应类型对不匹配时抛出。</exception>
private static StreamInvokerDescriptor CreateStreamInvokerDescriptor(
private static StreamInvokerDescriptor<TResponse> CreateStreamInvokerDescriptor<TResponse>(
Type requestType,
Type responseType,
GeneratedStreamInvokerMetadata descriptor)
{
if (!descriptor.InvokerMethod.IsStatic)
{
throw new InvalidOperationException(
$"Generated CQRS stream invoker provider returned a non-static invoker method for request type {requestType.FullName} and response type {responseType.FullName}.");
$"Generated CQRS stream invoker provider returned a non-static invoker method for request type {requestType.FullName} and response type {typeof(TResponse).FullName}.");
}
try
{
if (Delegate.CreateDelegate(typeof(StreamInvoker), descriptor.InvokerMethod) is not StreamInvoker invoker)
if (Delegate.CreateDelegate(typeof(WeakStreamInvoker), descriptor.InvokerMethod) is not
WeakStreamInvoker weakInvoker)
{
throw new InvalidOperationException(
$"Generated CQRS stream invoker provider returned an incompatible invoker for request type {requestType.FullName} and response type {responseType.FullName}.");
$"Generated CQRS stream invoker provider returned an incompatible invoker for request type {requestType.FullName} and response type {typeof(TResponse).FullName}.");
}
return new StreamInvokerDescriptor(descriptor.HandlerType, invoker);
// generated stream descriptor 的公开契约仍以 object 返回值暴露异步流;
// 这里在 binding 创建时只做一次适配,把后续 CreateStream 热路径保持为强类型调用。
var adapter = new GeneratedStreamInvokerAdapter<TResponse>(weakInvoker);
StreamInvoker<TResponse> invoker = (handler, request, cancellationToken) =>
adapter.Invoke(handler, request, cancellationToken);
return new StreamInvokerDescriptor<TResponse>(descriptor.HandlerType, invoker);
}
catch (ArgumentException exception)
{
throw new InvalidOperationException(
$"Generated CQRS stream invoker provider returned an incompatible invoker for request type {requestType.FullName} and response type {responseType.FullName}.",
$"Generated CQRS stream invoker provider returned an incompatible invoker for request type {requestType.FullName} and response type {typeof(TResponse).FullName}.",
exception);
}
}
@ -539,11 +576,11 @@ internal sealed class CqrsDispatcher(
/// <summary>
/// 生成流式处理器调用委托,避免每次创建流都重复反射。
/// </summary>
private static StreamInvoker CreateStreamInvoker(Type requestType, Type responseType)
private static StreamInvoker<TResponse> CreateStreamInvoker<TResponse>(Type requestType)
{
var method = StreamHandlerInvokerMethodDefinition
.MakeGenericMethod(requestType, responseType);
return (StreamInvoker)Delegate.CreateDelegate(typeof(StreamInvoker), method);
.MakeGenericMethod(requestType, typeof(TResponse));
return (StreamInvoker<TResponse>)Delegate.CreateDelegate(typeof(StreamInvoker<TResponse>), method);
}
/// <summary>
@ -594,7 +631,7 @@ internal sealed class CqrsDispatcher(
/// <summary>
/// 执行已强类型化的流式处理器调用。
/// </summary>
private static object InvokeStreamHandler<TRequest, TResponse>(
private static IAsyncEnumerable<TResponse> InvokeStreamHandler<TRequest, TResponse>(
object handler,
object request,
CancellationToken cancellationToken)
@ -609,10 +646,10 @@ internal sealed class CqrsDispatcher(
/// 执行指定行为数量的强类型 stream pipeline executor。
/// 该入口本身是缓存的固定 executor 形状;每次建流只绑定当前 handler 与 behaviors 实例。
/// </summary>
private static object InvokeStreamPipelineExecutor<TRequest, TResponse>(
private static IAsyncEnumerable<TResponse> InvokeStreamPipelineExecutor<TRequest, TResponse>(
object handler,
IReadOnlyList<object> behaviors,
StreamInvoker streamInvoker,
StreamInvoker<TResponse> streamInvoker,
object request,
CancellationToken cancellationToken)
where TRequest : IStreamRequest<TResponse>
@ -638,12 +675,17 @@ internal sealed class CqrsDispatcher(
private delegate ValueTask NotificationInvoker(object handler, object notification,
CancellationToken cancellationToken);
private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken);
private delegate IAsyncEnumerable<TResponse> StreamInvoker<TResponse>(
object handler,
object request,
CancellationToken cancellationToken);
private delegate object StreamPipelineInvoker(
private delegate object WeakStreamInvoker(object handler, object request, CancellationToken cancellationToken);
private delegate IAsyncEnumerable<TResponse> StreamPipelineInvoker<TResponse>(
object handler,
IReadOnlyList<object> behaviors,
StreamInvoker streamInvoker,
StreamInvoker<TResponse> streamInvoker,
object request,
CancellationToken cancellationToken);
@ -692,6 +734,72 @@ internal sealed class CqrsDispatcher(
}
}
/// <summary>
/// 将不同响应类型的 stream dispatch binding 包装到统一弱缓存值中,
/// 同时保留强类型流式委托,避免响应元素退化为 object 桥接。
/// </summary>
private abstract class StreamDispatchBindingBox
{
/// <summary>
/// 创建一个新的强类型 stream dispatch binding 盒子。
/// </summary>
public static StreamDispatchBindingBox Create<TResponse>(StreamDispatchBinding<TResponse> binding)
{
ArgumentNullException.ThrowIfNull(binding);
return new StreamDispatchBindingBox<TResponse>(binding);
}
/// <summary>
/// 读取指定响应类型的 stream dispatch binding。
/// </summary>
public abstract StreamDispatchBinding<TResponse> Get<TResponse>();
}
/// <summary>
/// 保存特定响应类型的 stream dispatch binding。
/// </summary>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
private sealed class StreamDispatchBindingBox<TResponse>(StreamDispatchBinding<TResponse> binding)
: StreamDispatchBindingBox
{
private readonly StreamDispatchBinding<TResponse> _binding = binding;
/// <summary>
/// 以原始强类型返回当前 binding若请求的响应类型不匹配则抛出异常。
/// </summary>
public override StreamDispatchBinding<TRequestedResponse> Get<TRequestedResponse>()
{
if (typeof(TRequestedResponse) != typeof(TResponse))
{
throw new InvalidOperationException(
$"Cached stream dispatch binding for {typeof(TResponse).FullName} cannot be used as {typeof(TRequestedResponse).FullName}.");
}
return (StreamDispatchBinding<TRequestedResponse>)(object)_binding;
}
}
/// <summary>
/// 将 generated stream provider 的弱类型开放静态入口适配为 dispatcher 内部的强类型流式委托。
/// 适配对象与 binding 同生命周期缓存,避免在每次建流时重复创建桥接闭包。
/// </summary>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
private sealed class GeneratedStreamInvokerAdapter<TResponse>(WeakStreamInvoker invoker)
{
private readonly WeakStreamInvoker _invoker = invoker;
/// <summary>
/// 调用 generated provider 暴露的弱类型入口,并把返回结果物化为当前响应类型的异步流。
/// </summary>
public IAsyncEnumerable<TResponse> Invoke(
object handler,
object request,
CancellationToken cancellationToken)
{
return (IAsyncEnumerable<TResponse>)_invoker(handler, request, cancellationToken);
}
}
/// <summary>
/// 保存通知分发路径所需的服务类型与强类型调用委托。
/// 该绑定把“容器解析哪个服务类型”与“如何调用处理器”聚合到同一缓存项中。
@ -722,17 +830,16 @@ internal sealed class CqrsDispatcher(
/// 保存流式请求分发路径所需的服务类型与调用委托。
/// 该绑定让建流热路径只需一次缓存命中即可获得解析与调用所需元数据。
/// </summary>
private sealed class StreamDispatchBinding(
private sealed class StreamDispatchBinding<TResponse>(
Type handlerType,
Type behaviorType,
Type requestType,
Type responseType,
StreamInvoker streamInvoker)
StreamInvoker<TResponse> streamInvoker)
{
// 线程安全:该缓存按 behaviorCount 复用 stream pipeline executor 形状,缓存项只保存委托与数量信息,
// 不会跨建流缓存 handler 或 behavior 实例。若不同请求持续出现新的行为数量组合,字典会随之增长。
private readonly ConcurrentDictionary<int, StreamPipelineExecutor> _pipelineExecutors = new();
private readonly StreamPipelineInvoker _pipelineInvoker = CreateStreamPipelineInvoker(requestType, responseType);
private readonly ConcurrentDictionary<int, StreamPipelineExecutor<TResponse>> _pipelineExecutors = new();
private readonly StreamPipelineInvoker<TResponse> _pipelineInvoker = CreateStreamPipelineInvoker<TResponse>(requestType);
/// <summary>
/// 获取流式请求处理器在容器中的服务类型。
@ -747,19 +854,19 @@ internal sealed class CqrsDispatcher(
/// <summary>
/// 获取执行流式请求处理器的调用委托。
/// </summary>
public StreamInvoker StreamInvoker { get; } = streamInvoker;
public StreamInvoker<TResponse> StreamInvoker { get; } = streamInvoker;
/// <summary>
/// 获取指定行为数量对应的 stream pipeline executor。
/// executor 形状会按行为数量缓存,但不会缓存 handler 或 behavior 实例。
/// </summary>
public StreamPipelineExecutor GetPipelineExecutor(int behaviorCount)
public StreamPipelineExecutor<TResponse> GetPipelineExecutor(int behaviorCount)
{
ArgumentOutOfRangeException.ThrowIfNegative(behaviorCount);
return _pipelineExecutors.GetOrAdd(
return _pipelineExecutors.GetOrAdd<StreamPipelineExecutorFactoryState<TResponse>>(
behaviorCount,
static (count, state) => new StreamPipelineExecutor(count, state.PipelineInvoker),
new StreamPipelineExecutorFactoryState(_pipelineInvoker));
static (count, state) => CreateStreamPipelineExecutor(count, state.PipelineInvoker),
new StreamPipelineExecutorFactoryState<TResponse>(_pipelineInvoker));
}
/// <summary>
@ -922,27 +1029,41 @@ internal sealed class CqrsDispatcher(
/// </summary>
/// <param name="HandlerType">流式请求处理器在容器中的服务类型。</param>
/// <param name="Invoker">执行流式请求处理器的调用委托。</param>
private readonly record struct StreamInvokerDescriptor(
private readonly record struct StreamInvokerDescriptor<TResponse>(
Type HandlerType,
StreamInvoker Invoker);
StreamInvoker<TResponse> Invoker);
/// <summary>
/// 为指定流式请求类型创建可跨多个 behaviorCount 复用的 typed pipeline invoker。
/// </summary>
private static StreamPipelineInvoker CreateStreamPipelineInvoker(Type requestType, Type responseType)
private static StreamPipelineInvoker<TResponse> CreateStreamPipelineInvoker<TResponse>(Type requestType)
{
var method = StreamPipelineInvokerMethodDefinition
.MakeGenericMethod(requestType, responseType);
return (StreamPipelineInvoker)Delegate.CreateDelegate(typeof(StreamPipelineInvoker), method);
.MakeGenericMethod(requestType, typeof(TResponse));
return (StreamPipelineInvoker<TResponse>)Delegate.CreateDelegate(
typeof(StreamPipelineInvoker<TResponse>),
method);
}
/// <summary>
/// 为指定流式请求/响应类型与固定行为数量创建 pipeline executor。
/// 行为数量用于表达缓存形状,实际建流仍会消费本次容器解析出的 handler 与 behaviors 实例。
/// </summary>
private static StreamPipelineExecutor<TResponse> CreateStreamPipelineExecutor<TResponse>(
int behaviorCount,
StreamPipelineInvoker<TResponse> invoker)
{
ArgumentOutOfRangeException.ThrowIfNegative(behaviorCount);
return new StreamPipelineExecutor<TResponse>(behaviorCount, invoker);
}
/// <summary>
/// 保存固定行为数量下的 typed stream pipeline executor 形状。
/// 该对象自身可跨建流复用,但每次调用都只绑定当前 handler 与 behavior 实例。
/// </summary>
private sealed class StreamPipelineExecutor(
private sealed class StreamPipelineExecutor<TResponse>(
int behaviorCount,
StreamPipelineInvoker invoker)
StreamPipelineInvoker<TResponse> invoker)
{
/// <summary>
/// 获取此 executor 预期处理的行为数量。
@ -952,10 +1073,10 @@ internal sealed class CqrsDispatcher(
/// <summary>
/// 使用当前 handler / behaviors / request 执行缓存的 pipeline 形状。
/// </summary>
public object Invoke(
public IAsyncEnumerable<TResponse> Invoke(
object handler,
IReadOnlyList<object> behaviors,
StreamInvoker streamInvoker,
StreamInvoker<TResponse> streamInvoker,
object request,
CancellationToken cancellationToken)
{
@ -972,8 +1093,8 @@ internal sealed class CqrsDispatcher(
/// <summary>
/// 为 stream pipeline executor 缓存携带 typed pipeline invoker避免按行为数量建缓存时创建闭包。
/// </summary>
private readonly record struct StreamPipelineExecutorFactoryState(
StreamPipelineInvoker PipelineInvoker);
private readonly record struct StreamPipelineExecutorFactoryState<TResponse>(
StreamPipelineInvoker<TResponse> PipelineInvoker);
/// <summary>
/// 供 registrar 在 generated registry 激活后登记 request invoker 元数据。
@ -1111,12 +1232,12 @@ internal sealed class CqrsDispatcher(
/// </summary>
private sealed class StreamPipelineInvocation<TRequest, TResponse>(
IStreamRequestHandler<TRequest, TResponse> handler,
StreamInvoker streamInvoker,
StreamInvoker<TResponse> streamInvoker,
IReadOnlyList<object> behaviors)
where TRequest : IStreamRequest<TResponse>
{
private readonly IStreamRequestHandler<TRequest, TResponse> _handler = handler;
private readonly StreamInvoker _streamInvoker = streamInvoker;
private readonly StreamInvoker<TResponse> _streamInvoker = streamInvoker;
private readonly IReadOnlyList<object> _behaviors = behaviors;
private readonly StreamMessageHandlerDelegate<TRequest, TResponse>?[] _continuations =
new StreamMessageHandlerDelegate<TRequest, TResponse>?[behaviors.Count + 1];
@ -1167,7 +1288,7 @@ internal sealed class CqrsDispatcher(
/// </summary>
private IAsyncEnumerable<TResponse> InvokeHandler(TRequest request, CancellationToken cancellationToken)
{
return (IAsyncEnumerable<TResponse>)_streamInvoker(_handler, request, cancellationToken);
return _streamInvoker(_handler, request, cancellationToken);
}
/// <summary>