feat(cqrs): 补齐流式管道行为接缝

- 新增 stream pipeline 契约、dispatcher executor 缓存与 generated invoker 兼容路径

- 补充 Architecture 与 IOC 的流式管道注册入口及对应回归测试

- 更新 CQRS 文档和 cqrs-rewrite 的 active tracking/trace
This commit is contained in:
gewuyou 2026-05-08 08:20:48 +08:00
parent 02a60df718
commit aebf1e974d
31 changed files with 1176 additions and 15 deletions

View File

@ -84,6 +84,15 @@ public interface IArchitecture : IAsyncInitializable, IAsyncDestroyable, IInitia
void RegisterCqrsPipelineBehavior<TBehavior>()
where TBehavior : class;
/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 既支持实现 <c>IStreamPipelineBehavior&lt;,&gt;</c> 的开放泛型行为类型,
/// 也支持绑定到单一流式请求/响应对的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
void RegisterCqrsStreamPipelineBehavior<TBehavior>()
where TBehavior : class;
/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 当处理器位于默认架构程序集之外的模块或扩展程序集中时,可在初始化阶段调用该入口接入对应程序集。

View File

@ -105,6 +105,13 @@ public interface IIocContainer : IContextAware, IDisposable
void RegisterCqrsPipelineBehavior<TBehavior>()
where TBehavior : class;
/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
void RegisterCqrsStreamPipelineBehavior<TBehavior>()
where TBehavior : class;
/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口适用于处理器不位于默认架构程序集中的场景,例如扩展包、模块程序集或拆分后的业务程序集。

View File

@ -28,6 +28,7 @@ public class ArchitectureModulesBehaviorTests
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
}
/// <summary>
@ -38,6 +39,7 @@ public class ArchitectureModulesBehaviorTests
{
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
LegacyBridgePipelineTracker.Reset();
}
@ -92,6 +94,34 @@ public class ArchitectureModulesBehaviorTests
}
}
/// <summary>
/// 验证注册的 CQRS stream 行为会参与建流处理流程。
/// </summary>
[Test]
public async Task RegisterCqrsStreamPipelineBehavior_Should_Apply_Pipeline_Behavior_To_Stream_Request()
{
var architecture = new ModuleTestArchitecture(target =>
target.RegisterCqrsStreamPipelineBehavior<TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>>());
await architecture.InitializeAsync();
try
{
var response = await DrainAsync(architecture.Context.CreateStream(new ModuleStreamBehaviorRequest()));
Assert.Multiple(() =>
{
Assert.That(response, Is.EqualTo([7]));
Assert.That(
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount,
Is.EqualTo(1));
});
}
finally
{
await architecture.DestroyAsync();
}
}
/// <summary>
/// 验证默认架构初始化路径会自动扫描 Core 程序集里的 legacy bridge handler
/// 使旧 <c>SendCommand</c> / <c>SendQuery</c> 入口也能进入统一 CQRS pipeline。
@ -194,4 +224,22 @@ public class ArchitectureModulesBehaviorTests
private sealed class InstalledByModuleUtility : IUtility
{
}
/// <summary>
/// 物化异步流为只读列表,便于断言 stream pipeline 行为的最终可观察结果。
/// </summary>
/// <typeparam name="T">流元素类型。</typeparam>
/// <param name="stream">要物化的异步流。</param>
/// <returns>按枚举顺序收集的元素列表。</returns>
private static async Task<IReadOnlyList<T>> DrainAsync<T>(IAsyncEnumerable<T> stream)
{
var results = new List<T>();
await foreach (var item in stream.ConfigureAwait(false))
{
results.Add(item);
}
return results;
}
}

View File

@ -0,0 +1,13 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Core.Tests.Architectures;
/// <summary>
/// 用于验证架构公开 stream pipeline 行为注册入口的最小流式请求。
/// </summary>
public sealed class ModuleStreamBehaviorRequest : IStreamRequest<int>
{
}

View File

@ -0,0 +1,28 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Core.Tests.Architectures;
/// <summary>
/// 处理 <see cref="ModuleStreamBehaviorRequest" /> 并返回一个固定元素。
/// </summary>
public sealed class ModuleStreamBehaviorRequestHandler : IStreamRequestHandler<ModuleStreamBehaviorRequest, int>
{
/// <summary>
/// 返回一个固定元素,供架构 stream pipeline 行为回归断言使用。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>包含一个固定元素的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
ModuleStreamBehaviorRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return 7;
await ValueTask.CompletedTask.ConfigureAwait(false);
}
}

View File

@ -65,6 +65,16 @@ public class TestArchitectureWithRegistry : IArchitecture
throw new NotSupportedException();
}
/// <summary>
/// 测试替身未实现 CQRS 流式管道行为注册。
/// </summary>
/// <typeparam name="TBehavior">行为类型。</typeparam>
/// <exception cref="NotSupportedException">该测试替身不参与 CQRS 流式管道配置验证。</exception>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
throw new NotSupportedException();
}
/// <summary>
/// 测试替身未实现显式程序集 CQRS 处理器接入入口。
/// </summary>

View File

@ -63,6 +63,16 @@ public class TestArchitectureWithoutRegistry : IArchitecture
throw new NotSupportedException();
}
/// <summary>
/// 测试替身未实现 CQRS 流式管道行为注册。
/// </summary>
/// <typeparam name="TBehavior">行为类型。</typeparam>
/// <exception cref="NotSupportedException">该测试替身不参与 CQRS 流式管道配置验证。</exception>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
throw new NotSupportedException();
}
/// <summary>
/// 测试替身未实现显式程序集 CQRS 处理器接入入口。
/// </summary>

View File

@ -0,0 +1,44 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Core.Tests.Architectures;
/// <summary>
/// 记录流式请求通过管道次数的测试行为。
/// </summary>
/// <typeparam name="TRequest">流式请求类型。</typeparam>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
public sealed class TrackingStreamPipelineBehavior<TRequest, TResponse> : IStreamPipelineBehavior<TRequest, TResponse>
where TRequest : IStreamRequest<TResponse>
{
private static int _invocationCount;
/// <summary>
/// 获取当前测试进程中该流式请求类型对应的行为触发次数。
/// 该计数器是按泛型闭包共享的静态状态,测试需要在每次运行前显式重置。
/// </summary>
public static int InvocationCount
{
get => Volatile.Read(ref _invocationCount);
set => Volatile.Write(ref _invocationCount, value);
}
/// <summary>
/// 以线程安全方式记录一次行为执行,然后继续执行下一个处理阶段。
/// </summary>
/// <param name="message">当前流式请求消息。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public IAsyncEnumerable<TResponse> Handle(
TRequest message,
StreamMessageHandlerDelegate<TRequest, TResponse> next,
CancellationToken cancellationToken)
{
Interlocked.Increment(ref _invocationCount);
return next(message, cancellationToken);
}
}

View File

@ -177,6 +177,16 @@ public abstract class Architecture : IArchitecture
_modules.RegisterCqrsPipelineBehavior<TBehavior>();
}
/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 可以传入开放泛型行为类型,也可以传入绑定到特定流式请求的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
_modules.RegisterCqrsStreamPipelineBehavior<TBehavior>();
}
/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口适用于把拆分到其他模块或扩展包程序集中的 handlers 接入当前架构。

View File

@ -27,6 +27,17 @@ internal sealed class ArchitectureModules(
services.Container.RegisterCqrsPipelineBehavior<TBehavior>();
}
/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 支持开放泛型行为类型和针对单一流式请求的封闭行为类型。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
logger.Debug($"Registering CQRS stream pipeline behavior: {typeof(TBehavior).Name}");
services.Container.RegisterCqrsStreamPipelineBehavior<TBehavior>();
}
/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// 该入口用于把默认架构程序集之外的扩展处理器接入当前架构容器。

View File

@ -522,6 +522,56 @@ public class MicrosoftDiContainer(IServiceCollection? serviceCollection = null)
}
}
/// <summary>
/// 注册 CQRS 流式请求管道行为。
/// 同时支持开放泛型行为类型和已闭合的具体行为类型,
/// 以兼容通用行为和针对单一流式请求的专用行为两种注册方式。
/// </summary>
/// <typeparam name="TBehavior">行为类型,必须是引用类型</typeparam>
public void RegisterCqrsStreamPipelineBehavior<TBehavior>() where TBehavior : class
{
ThrowIfDisposed();
EnterWriteLockOrThrowDisposed();
try
{
ThrowIfFrozen();
var behaviorType = typeof(TBehavior);
if (behaviorType.IsGenericTypeDefinition)
{
GetServicesUnsafe.AddSingleton(typeof(IStreamPipelineBehavior<,>), behaviorType);
}
else
{
var pipelineInterfaces = behaviorType
.GetInterfaces()
.Where(type => type.IsGenericType &&
type.GetGenericTypeDefinition() == typeof(IStreamPipelineBehavior<,>))
.ToList();
if (pipelineInterfaces.Count == 0)
{
var errorMessage = $"{behaviorType.Name} does not implement IStreamPipelineBehavior<,>";
_logger.Error(errorMessage);
throw new InvalidOperationException(errorMessage);
}
// 为每个已闭合的流式管道接口建立显式映射,支持针对特定流式请求/响应的专用行为。
foreach (var pipelineInterface in pipelineInterfaces)
{
GetServicesUnsafe.AddSingleton(pipelineInterface, behaviorType);
}
}
_logger.Debug($"CQRS stream pipeline behavior registered: {behaviorType.Name}");
}
finally
{
_lock.ExitWriteLock();
}
}
/// <summary>
/// 从指定程序集显式注册 CQRS 处理器。
/// </summary>

View File

@ -0,0 +1,25 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
namespace GFramework.Cqrs.Abstractions.Cqrs;
/// <summary>
/// 定义流式 CQRS 请求在建流阶段使用的管道行为。
/// </summary>
/// <typeparam name="TRequest">流式请求类型。</typeparam>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
public interface IStreamPipelineBehavior<TRequest, TResponse>
where TRequest : IStreamRequest<TResponse>
{
/// <summary>
/// 处理当前流式请求,并决定是否继续调用后续行为或最终处理器。
/// </summary>
/// <param name="message">当前流式请求消息。</param>
/// <param name="next">下一个处理委托。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>异步响应序列。</returns>
IAsyncEnumerable<TResponse> Handle(
TRequest message,
StreamMessageHandlerDelegate<TRequest, TResponse> next,
CancellationToken cancellationToken);
}

View File

@ -0,0 +1,22 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
namespace GFramework.Cqrs.Abstractions.Cqrs;
/// <summary>
/// 表示流式 CQRS 请求在管道中继续向下执行的处理委托。
/// </summary>
/// <remarks>
/// <para>stream 行为可以通过不调用该委托来短路整个流式处理链。</para>
/// <para>除显式实现重试、回放或分支等高级语义外,行为通常应最多调用一次该委托,以维持单次建流的确定性。</para>
/// <para>调用方应传递当前收到的 <paramref name="cancellationToken" />,确保取消信号沿建流入口与后续枚举链路一致传播。</para>
/// </remarks>
/// <typeparam name="TRequest">流式请求类型。</typeparam>
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
/// <param name="message">当前流式请求消息。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>异步响应序列。</returns>
public delegate IAsyncEnumerable<TResponse> StreamMessageHandlerDelegate<in TRequest, out TResponse>(
TRequest message,
CancellationToken cancellationToken)
where TRequest : IStreamRequest<TResponse>;

View File

@ -19,7 +19,7 @@
推荐按职责引用:
- `GeWuYou.GFramework.Cqrs.Abstractions`
- 提供 `IRequest<TResponse>``INotification``IStreamRequest<TResponse>``IRequestHandler<,>``INotificationHandler<>``IPipelineBehavior<,>``ICqrsRuntime`、`ICqrsContext``Unit` 等基础契约。
- 提供 `IRequest<TResponse>``INotification``IStreamRequest<TResponse>``IRequestHandler<,>``INotificationHandler<>``IPipelineBehavior<,>``IStreamPipelineBehavior<,>`、`ICqrsRuntime`、`ICqrsContext``Unit` 等基础契约。
- `GeWuYou.GFramework.Cqrs`
- 引用本包,并提供默认 runtime、处理器注册、消息基类、处理器基类、上下文扩展方法。
- `GeWuYou.GFramework.Cqrs.SourceGenerators`
@ -38,7 +38,7 @@
- 运行时协作接口
- `ICqrsRuntime``ICqrsContext``ICqrsHandlerRegistrar`
- 管道与辅助类型
- `IPipelineBehavior<,>``MessageHandlerDelegate<,>`、`Unit`
- `IPipelineBehavior<,>``IStreamPipelineBehavior<,>`、`MessageHandlerDelegate<,>``StreamMessageHandlerDelegate<,>`、`Unit`
## 最小接入路径

View File

@ -31,6 +31,10 @@ internal sealed class CqrsDispatcherCacheTests
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineContextRefreshBehavior>();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderOuterBehavior>();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderInnerBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineCacheBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineContextRefreshBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderOuterBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderInnerBehavior>();
CqrsTestRuntime.RegisterHandlers(
_container,
@ -41,6 +45,7 @@ internal sealed class CqrsDispatcherCacheTests
_context = new ArchitectureContext(_container);
DispatcherNotificationContextRefreshState.Reset();
DispatcherPipelineContextRefreshState.Reset();
DispatcherStreamPipelineOrderState.Reset();
DispatcherStreamContextRefreshState.Reset();
ClearDispatcherCaches();
}
@ -220,6 +225,71 @@ internal sealed class CqrsDispatcherCacheTests
});
}
/// <summary>
/// 验证 stream pipeline executor 会按行为数量在 binding 内首次创建并在后续建流中复用。
/// </summary>
[Test]
public async Task Dispatcher_Should_Cache_Stream_Pipeline_Executors_Per_Behavior_Count()
{
var streamBindings = GetCacheField("StreamDispatchBindings");
Assert.Multiple(() =>
{
Assert.That(
GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherCacheStreamRequest),
typeof(int),
1),
Is.Null);
Assert.That(
GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherStreamPipelineOrderRequest),
typeof(int),
2),
Is.Null);
});
await DrainAsync(_context!.CreateStream(new DispatcherCacheStreamRequest()));
await DrainAsync(_context.CreateStream(new DispatcherStreamPipelineOrderRequest()));
var singleBehaviorExecutor = GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherCacheStreamRequest),
typeof(int),
1);
var twoBehaviorExecutor = GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherStreamPipelineOrderRequest),
typeof(int),
2);
await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest()));
await DrainAsync(_context.CreateStream(new DispatcherStreamPipelineOrderRequest()));
Assert.Multiple(() =>
{
Assert.That(singleBehaviorExecutor, Is.Not.Null);
Assert.That(twoBehaviorExecutor, Is.Not.Null);
Assert.That(singleBehaviorExecutor, Is.Not.SameAs(twoBehaviorExecutor));
Assert.That(
GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherCacheStreamRequest),
typeof(int),
1),
Is.SameAs(singleBehaviorExecutor));
Assert.That(
GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherStreamPipelineOrderRequest),
typeof(int),
2),
Is.SameAs(twoBehaviorExecutor));
});
}
/// <summary>
/// 验证复用缓存的 request pipeline executor 后,行为顺序和最终处理器顺序保持不变。
/// </summary>
@ -252,6 +322,38 @@ internal sealed class CqrsDispatcherCacheTests
});
}
/// <summary>
/// 验证复用缓存的 stream pipeline executor 后,行为顺序和最终处理器顺序保持不变。
/// </summary>
[Test]
public async Task Dispatcher_Should_Preserve_Stream_Pipeline_Order_When_Reusing_Cached_Executor()
{
DispatcherStreamPipelineOrderState.Reset();
await DrainAsync(_context!.CreateStream(new DispatcherStreamPipelineOrderRequest()));
var firstInvocation = DispatcherStreamPipelineOrderState.Steps.ToArray();
DispatcherStreamPipelineOrderState.Reset();
await DrainAsync(_context.CreateStream(new DispatcherStreamPipelineOrderRequest()));
var secondInvocation = DispatcherStreamPipelineOrderState.Steps.ToArray();
var expectedOrder = new[]
{
"Outer:Before",
"Inner:Before",
"Handler",
"Inner:After",
"Outer:After"
};
Assert.Multiple(() =>
{
Assert.That(firstInvocation, Is.EqualTo(expectedOrder));
Assert.That(secondInvocation, Is.EqualTo(expectedOrder));
});
}
/// <summary>
/// 验证缓存的 request pipeline executor 在重复分发时仍会重新解析 handler/behavior
/// 并为当次实例重新注入当前架构上下文。
@ -392,6 +494,60 @@ internal sealed class CqrsDispatcherCacheTests
});
}
/// <summary>
/// 验证缓存的 stream pipeline executor 在重复建流时仍会重新解析 behavior/handler
/// 并为当次实例重新注入当前架构上下文。
/// </summary>
[Test]
public async Task Dispatcher_Should_Reinject_Current_Context_When_Reusing_Cached_Stream_Pipeline_Executor()
{
DispatcherStreamContextRefreshState.Reset();
var streamBindings = GetCacheField("StreamDispatchBindings");
var firstContext = new ArchitectureContext(_container!);
var secondContext = new ArchitectureContext(_container!);
await DrainAsync(firstContext.CreateStream(new DispatcherStreamContextRefreshRequest("first")));
var executorAfterFirstDispatch = GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherStreamContextRefreshRequest),
typeof(int),
1);
await DrainAsync(secondContext.CreateStream(new DispatcherStreamContextRefreshRequest("second")));
var executorAfterSecondDispatch = GetStreamPipelineExecutorValue(
streamBindings,
typeof(DispatcherStreamContextRefreshRequest),
typeof(int),
1);
var behaviorSnapshots = DispatcherStreamContextRefreshState.BehaviorSnapshots.ToArray();
var handlerSnapshots = DispatcherStreamContextRefreshState.HandlerSnapshots.ToArray();
Assert.Multiple(() =>
{
Assert.That(executorAfterFirstDispatch, Is.Not.Null);
Assert.That(executorAfterSecondDispatch, Is.SameAs(executorAfterFirstDispatch));
Assert.That(behaviorSnapshots, Has.Length.EqualTo(2));
Assert.That(handlerSnapshots, Has.Length.EqualTo(2));
Assert.That(behaviorSnapshots[0].DispatchId, Is.EqualTo("first"));
Assert.That(behaviorSnapshots[0].Context, Is.SameAs(firstContext));
Assert.That(behaviorSnapshots[1].DispatchId, Is.EqualTo("second"));
Assert.That(behaviorSnapshots[1].Context, Is.SameAs(secondContext));
Assert.That(behaviorSnapshots[1].Context, Is.Not.SameAs(behaviorSnapshots[0].Context));
Assert.That(handlerSnapshots[0].DispatchId, Is.EqualTo("first"));
Assert.That(handlerSnapshots[0].Context, Is.SameAs(firstContext));
Assert.That(handlerSnapshots[1].DispatchId, Is.EqualTo("second"));
Assert.That(handlerSnapshots[1].Context, Is.SameAs(secondContext));
Assert.That(handlerSnapshots[1].Context, Is.Not.SameAs(handlerSnapshots[0].Context));
Assert.That(handlerSnapshots[1].InstanceId, Is.Not.EqualTo(handlerSnapshots[0].InstanceId));
});
}
/// <summary>
/// 通过反射读取 dispatcher 的静态缓存对象。
/// </summary>
@ -455,6 +611,26 @@ internal sealed class CqrsDispatcherCacheTests
: InvokeInstanceMethod(binding, "GetPipelineExecutorForTesting", behaviorCount);
}
/// <summary>
/// 读取 stream dispatch binding 中指定行为数量的 pipeline executor 缓存项。
/// </summary>
/// <param name="streamBindings">dispatcher 内部的 stream binding 缓存对象。</param>
/// <param name="requestType">要读取的流式请求运行时类型。</param>
/// <param name="responseType">要读取的响应元素类型。</param>
/// <param name="behaviorCount">目标 executor 对应的行为数量。</param>
/// <returns>已缓存的 executor若 binding 或 executor 尚未建立则返回 <see langword="null" />。</returns>
private static object? GetStreamPipelineExecutorValue(
object streamBindings,
Type requestType,
Type responseType,
int behaviorCount)
{
var binding = GetPairCacheValue(streamBindings, requestType, responseType);
return binding is null
? null
: InvokeInstanceMethod(binding, "GetPipelineExecutorForTesting", behaviorCount);
}
/// <summary>
/// 调用缓存实例上的无参清理方法。
/// </summary>

View File

@ -73,6 +73,33 @@ internal sealed class CqrsDispatcherContextValidationTests
container
.Setup(currentContainer => currentContainer.Get(typeof(IStreamRequestHandler<ContextAwareStreamRequest, int>)))
.Returns(new ContextAwareStreamHandler());
container
.Setup(currentContainer => currentContainer.GetAll(typeof(IStreamPipelineBehavior<ContextAwareStreamRequest, int>)))
.Returns(Array.Empty<object>());
});
Assert.That(
() => runtime.CreateStream(new FakeCqrsContext(), new ContextAwareStreamRequest()),
Throws.InvalidOperationException.With.Message.Contains("does not implement IArchitectureContext"));
}
/// <summary>
/// 验证当 stream pipeline behavior 需要上下文注入、但当前 CQRS 上下文不实现
/// <see cref="GFramework.Core.Abstractions.Architectures.IArchitectureContext" /> 时,
/// dispatcher 会在建流前显式失败。
/// </summary>
[Test]
public void CreateStream_Should_Throw_When_Stream_Pipeline_Behavior_Context_Does_Not_Implement_IArchitectureContext()
{
var runtime = CreateRuntime(
container =>
{
container
.Setup(currentContainer => currentContainer.Get(typeof(IStreamRequestHandler<ContextAwareStreamRequest, int>)))
.Returns(new PassthroughStreamHandler());
container
.Setup(currentContainer => currentContainer.GetAll(typeof(IStreamPipelineBehavior<ContextAwareStreamRequest, int>)))
.Returns([new ContextAwareStreamBehavior()]);
});
Assert.That(
@ -174,4 +201,47 @@ internal sealed class CqrsDispatcherContextValidationTests
await ValueTask.CompletedTask.ConfigureAwait(false);
}
}
/// <summary>
/// 为 stream behavior 上下文校验提供不依赖上下文注入的最小 handler。
/// </summary>
private sealed class PassthroughStreamHandler : IStreamRequestHandler<ContextAwareStreamRequest, int>
{
/// <summary>
/// 返回一个最小流;当前测试只关心 behavior 注入前的上下文校验。
/// </summary>
/// <param name="request">当前流请求。</param>
/// <param name="cancellationToken">取消枚举时使用的取消令牌。</param>
/// <returns>包含单个固定元素的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
ContextAwareStreamRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
yield return 1;
await ValueTask.CompletedTask.ConfigureAwait(false);
}
}
/// <summary>
/// 为 stream behavior 上下文校验提供需要注入架构上下文的最小 behavior。
/// </summary>
private sealed class ContextAwareStreamBehavior
: CqrsContextAwareHandlerBase,
IStreamPipelineBehavior<ContextAwareStreamRequest, int>
{
/// <summary>
/// 直接转发到下一个处理阶段;当前测试只关心调用前的上下文校验。
/// </summary>
/// <param name="message">当前流式请求。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public IAsyncEnumerable<int> Handle(
ContextAwareStreamRequest message,
StreamMessageHandlerDelegate<ContextAwareStreamRequest, int> next,
CancellationToken cancellationToken)
{
return next(message, cancellationToken);
}
}
}

View File

@ -27,6 +27,7 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
{
_previousLoggerFactoryProvider = LoggerFactoryResolver.Provider;
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GeneratedStreamPipelineTrackingBehavior.InvocationCount = 0;
ClearRegistrarCaches();
ClearDispatcherCaches();
}
@ -38,6 +39,7 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
public void TearDown()
{
LoggerFactoryResolver.Provider = _previousLoggerFactoryProvider ?? new ConsoleLoggerFactoryProvider();
GeneratedStreamPipelineTrackingBehavior.InvocationCount = 0;
ClearRegistrarCaches();
ClearDispatcherCaches();
}
@ -169,6 +171,30 @@ internal sealed class CqrsGeneratedRequestInvokerProviderTests
Assert.That(results, Is.EqualTo([30, 31]));
}
/// <summary>
/// 验证 generated stream invoker 与 stream pipeline 行为同时存在时,
/// dispatcher 仍会保持 generated invoker 优先,并正确包裹到行为链内。
/// </summary>
[Test]
public async Task CreateStream_Should_Use_Generated_Stream_Invoker_Inside_Stream_Pipeline()
{
var generatedAssembly = CreateGeneratedStreamInvokerAssembly();
var container = new MicrosoftDiContainer();
container.RegisterCqrsStreamPipelineBehavior<GeneratedStreamPipelineTrackingBehavior>();
CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);
container.Freeze();
var context = new ArchitectureContext(container);
var results = await DrainAsync(context.CreateStream(new GeneratedStreamInvokerRequest(3))).ConfigureAwait(false);
Assert.Multiple(() =>
{
Assert.That(results, Is.EqualTo([30, 31]));
Assert.That(GeneratedStreamPipelineTrackingBehavior.InvocationCount, Is.EqualTo(1));
});
}
/// <summary>
/// 验证当实现类型隐藏、但 stream handler interface 仍可直接表达时,
/// dispatcher 仍会消费 generated stream invoker descriptor。

View File

@ -12,9 +12,27 @@ namespace GFramework.Cqrs.Tests.Cqrs;
internal static class DispatcherStreamContextRefreshState
{
private static readonly Lock _syncRoot = new();
private static int _nextBehaviorInstanceId;
private static int _nextHandlerInstanceId;
private static readonly List<DispatcherPipelineContextSnapshot> _behaviorSnapshots = [];
private static readonly List<DispatcherPipelineContextSnapshot> _handlerSnapshots = [];
/// <summary>
/// 获取每次建流时记录的 behavior 快照副本。
/// </summary>
/// <returns>当前已记录的 behavior 上下文快照副本。</returns>
/// <remarks>共享状态通过 <c>_syncRoot</c> 串行化,避免并行测试写入抖动。</remarks>
public static IReadOnlyList<DispatcherPipelineContextSnapshot> BehaviorSnapshots
{
get
{
lock (_syncRoot)
{
return _behaviorSnapshots.ToArray();
}
}
}
/// <summary>
/// 获取每次建流时记录的快照副本。
/// </summary>
@ -31,6 +49,15 @@ internal static class DispatcherStreamContextRefreshState
}
}
/// <summary>
/// 为新的 behavior 测试实例分配稳定编号。
/// </summary>
/// <returns>单调递增的 behavior 实例编号。</returns>
public static int AllocateBehaviorInstanceId()
{
return Interlocked.Increment(ref _nextBehaviorInstanceId);
}
/// <summary>
/// 为新的 handler 测试实例分配稳定编号。
/// </summary>
@ -40,6 +67,21 @@ internal static class DispatcherStreamContextRefreshState
return Interlocked.Increment(ref _nextHandlerInstanceId);
}
/// <summary>
/// 记录 behavior 在当前建流中观察到的上下文。
/// </summary>
/// <param name="dispatchId">触发本次记录的稳定分发标识。</param>
/// <param name="instanceId">观察到该上下文的 behavior 实例编号。</param>
/// <param name="context">当前分发注入到 behavior 的架构上下文。</param>
/// <remarks>写入过程通过 <c>_syncRoot</c> 串行化,确保快照列表保持稳定顺序。</remarks>
public static void RecordBehavior(string dispatchId, int instanceId, IArchitectureContext context)
{
lock (_syncRoot)
{
_behaviorSnapshots.Add(new DispatcherPipelineContextSnapshot(dispatchId, instanceId, context));
}
}
/// <summary>
/// 记录 handler 在当前建流中观察到的上下文。
/// </summary>
@ -63,7 +105,9 @@ internal static class DispatcherStreamContextRefreshState
{
lock (_syncRoot)
{
_nextBehaviorInstanceId = 0;
_nextHandlerInstanceId = 0;
_behaviorSnapshots.Clear();
_handlerSnapshots.Clear();
}
}

View File

@ -0,0 +1,29 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 为 <see cref="DispatcherCacheStreamRequest" /> 提供最小 stream pipeline 行为,
/// 用于命中 dispatcher 的 stream pipeline invoker 缓存分支。
/// </summary>
internal sealed class DispatcherStreamPipelineCacheBehavior : IStreamPipelineBehavior<DispatcherCacheStreamRequest, int>
{
/// <summary>
/// 直接转发到下一个处理阶段。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public IAsyncEnumerable<int> Handle(
DispatcherCacheStreamRequest request,
StreamMessageHandlerDelegate<DispatcherCacheStreamRequest, int> next,
CancellationToken cancellationToken)
{
return next(request, cancellationToken);
}
}

View File

@ -0,0 +1,42 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 记录缓存 stream pipeline executor 复用场景下每次建流注入到 behavior 的上下文与实例身份。
/// </summary>
internal sealed class DispatcherStreamPipelineContextRefreshBehavior
: CqrsContextAwareHandlerBase,
IStreamPipelineBehavior<DispatcherStreamContextRefreshRequest, int>
{
private readonly int _instanceId = DispatcherStreamContextRefreshState.AllocateBehaviorInstanceId();
/// <summary>
/// 记录当前 behavior 实例实际收到的上下文,然后继续执行下游处理阶段。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
DispatcherStreamContextRefreshRequest request,
StreamMessageHandlerDelegate<DispatcherStreamContextRefreshRequest, int> next,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
DispatcherStreamContextRefreshState.RecordBehavior(request.DispatchId, _instanceId, Context);
await foreach (var item in next(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
yield return item;
}
}
}

View File

@ -0,0 +1,31 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 处理 <see cref="DispatcherStreamPipelineOrderRequest" /> 并记录最终 handler 执行位置。
/// </summary>
internal sealed class DispatcherStreamPipelineOrderHandler : IStreamRequestHandler<DispatcherStreamPipelineOrderRequest, int>
{
/// <summary>
/// 记录 handler 执行步骤并返回稳定元素。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>包含一个固定元素的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
DispatcherStreamPipelineOrderRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
DispatcherStreamPipelineOrderState.Record("Handler");
yield return 21;
await Task.CompletedTask.ConfigureAwait(false);
}
}

View File

@ -0,0 +1,39 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 记录双 stream pipeline 的内层行为顺序。
/// </summary>
internal sealed class DispatcherStreamPipelineOrderInnerBehavior : IStreamPipelineBehavior<DispatcherStreamPipelineOrderRequest, int>
{
/// <summary>
/// 在进入和离开下游阶段时记录顺序。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
DispatcherStreamPipelineOrderRequest request,
StreamMessageHandlerDelegate<DispatcherStreamPipelineOrderRequest, int> next,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
DispatcherStreamPipelineOrderState.Record("Inner:Before");
await foreach (var item in next(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
yield return item;
}
DispatcherStreamPipelineOrderState.Record("Inner:After");
}
}

View File

@ -0,0 +1,39 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 记录双 stream pipeline 的外层行为顺序。
/// </summary>
internal sealed class DispatcherStreamPipelineOrderOuterBehavior : IStreamPipelineBehavior<DispatcherStreamPipelineOrderRequest, int>
{
/// <summary>
/// 在进入和离开下游阶段时记录顺序。
/// </summary>
/// <param name="request">当前流式请求。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public async IAsyncEnumerable<int> Handle(
DispatcherStreamPipelineOrderRequest request,
StreamMessageHandlerDelegate<DispatcherStreamPipelineOrderRequest, int> next,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
DispatcherStreamPipelineOrderState.Record("Outer:Before");
await foreach (var item in next(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
yield return item;
}
DispatcherStreamPipelineOrderState.Record("Outer:After");
}
}

View File

@ -0,0 +1,11 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 用于验证双 stream pipeline 行为执行顺序的最小流式请求。
/// </summary>
internal sealed record DispatcherStreamPipelineOrderRequest : IStreamRequest<int>;

View File

@ -0,0 +1,53 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Threading;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 记录双 stream pipeline 行为的实际执行顺序。
/// </summary>
internal static class DispatcherStreamPipelineOrderState
{
private static readonly Lock SyncRoot = new();
private static readonly List<string> _steps = [];
/// <summary>
/// 获取按执行顺序追加的步骤快照。
/// 共享状态通过 <c>SyncRoot</c> 串行化,避免并行 stream 行为测试互相污染步骤列表。
/// </summary>
public static IReadOnlyList<string> Steps
{
get
{
lock (SyncRoot)
{
return _steps.ToArray();
}
}
}
/// <summary>
/// 记录一个新的 stream pipeline 执行步骤。
/// </summary>
/// <param name="step">要追加的步骤名称。</param>
public static void Record(string step)
{
lock (SyncRoot)
{
_steps.Add(step);
}
}
/// <summary>
/// 清空当前记录,供下一次断言使用。
/// </summary>
public static void Reset()
{
lock (SyncRoot)
{
_steps.Clear();
}
}
}

View File

@ -0,0 +1,41 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Threading;
using GFramework.Cqrs.Abstractions.Cqrs;
namespace GFramework.Cqrs.Tests.Cqrs;
/// <summary>
/// 记录 generated stream invoker 与 stream pipeline 行为组合时的命中次数。
/// </summary>
internal sealed class GeneratedStreamPipelineTrackingBehavior
: IStreamPipelineBehavior<GeneratedStreamInvokerRequest, int>
{
private static int _invocationCount;
/// <summary>
/// 获取或重置当前测试进程中的行为触发次数。
/// </summary>
public static int InvocationCount
{
get => Volatile.Read(ref _invocationCount);
set => Volatile.Write(ref _invocationCount, value);
}
/// <summary>
/// 记录一次行为执行,然后继续执行 generated stream invoker。
/// </summary>
/// <param name="message">当前流式请求消息。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的异步流。</returns>
public IAsyncEnumerable<int> Handle(
GeneratedStreamInvokerRequest message,
StreamMessageHandlerDelegate<GeneratedStreamInvokerRequest, int> next,
CancellationToken cancellationToken)
{
Interlocked.Increment(ref _invocationCount);
return next(message, cancellationToken);
}
}

View File

@ -58,6 +58,9 @@ internal sealed class CqrsDispatcher(
private static readonly MethodInfo StreamHandlerInvokerMethodDefinition = typeof(CqrsDispatcher)
.GetMethod(nameof(InvokeStreamHandler), BindingFlags.NonPublic | BindingFlags.Static)!;
private static readonly MethodInfo StreamPipelineInvokerMethodDefinition = typeof(CqrsDispatcher)
.GetMethod(nameof(InvokeStreamPipelineExecutor), BindingFlags.NonPublic | BindingFlags.Static)!;
private readonly INotificationPublisher _notificationPublisher = notificationPublisher
?? throw new ArgumentNullException(
nameof(notificationPublisher));
@ -156,8 +159,16 @@ internal sealed class CqrsDispatcher(
$"No CQRS stream handler registered for {requestType.FullName}.");
PrepareHandler(handler, context);
var behaviors = container.GetAll(dispatchBinding.BehaviorType);
return (IAsyncEnumerable<TResponse>)dispatchBinding.Invoker(handler, request, cancellationToken);
foreach (var behavior in behaviors)
PrepareHandler(behavior, context);
if (behaviors.Count == 0)
return (IAsyncEnumerable<TResponse>)dispatchBinding.StreamInvoker(handler, request, cancellationToken);
return (IAsyncEnumerable<TResponse>)dispatchBinding.GetPipelineExecutor(behaviors.Count)
.Invoke(handler, behaviors, dispatchBinding.StreamInvoker, request, cancellationToken);
}
/// <summary>
@ -299,11 +310,17 @@ internal sealed class CqrsDispatcher(
var resolvedGeneratedDescriptor = generatedDescriptor.Value;
return new StreamDispatchBinding(
resolvedGeneratedDescriptor.HandlerType,
typeof(IStreamPipelineBehavior<,>).MakeGenericType(requestType, responseType),
requestType,
responseType,
resolvedGeneratedDescriptor.Invoker);
}
return new StreamDispatchBinding(
typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, responseType),
typeof(IStreamPipelineBehavior<,>).MakeGenericType(requestType, responseType),
requestType,
responseType,
CreateStreamInvoker(requestType, responseType));
}
@ -491,6 +508,25 @@ internal sealed class CqrsDispatcher(
return typedHandler.Handle(typedRequest, cancellationToken);
}
/// <summary>
/// 执行指定行为数量的强类型 stream pipeline executor。
/// 该入口本身是缓存的固定 executor 形状;每次建流只绑定当前 handler 与 behaviors 实例。
/// </summary>
private static object InvokeStreamPipelineExecutor<TRequest, TResponse>(
object handler,
IReadOnlyList<object> behaviors,
StreamInvoker streamInvoker,
object request,
CancellationToken cancellationToken)
where TRequest : IStreamRequest<TResponse>
{
var invocation = new StreamPipelineInvocation<TRequest, TResponse>(
(IStreamRequestHandler<TRequest, TResponse>)handler,
streamInvoker,
behaviors);
return invocation.Invoke((TRequest)request, cancellationToken);
}
private delegate ValueTask<TResponse> RequestInvoker<TResponse>(
object handler,
object request,
@ -507,6 +543,13 @@ internal sealed class CqrsDispatcher(
private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken);
private delegate object StreamPipelineInvoker(
object handler,
IReadOnlyList<object> behaviors,
StreamInvoker streamInvoker,
object request,
CancellationToken cancellationToken);
/// <summary>
/// 将不同响应类型的 request dispatch binding 包装到统一弱缓存值中,
/// 同时保留强类型委托,避免值类型响应退化为 object 桥接。
@ -582,17 +625,54 @@ internal sealed class CqrsDispatcher(
/// 保存流式请求分发路径所需的服务类型与调用委托。
/// 该绑定让建流热路径只需一次缓存命中即可获得解析与调用所需元数据。
/// </summary>
private sealed class StreamDispatchBinding(Type handlerType, StreamInvoker invoker)
private sealed class StreamDispatchBinding(
Type handlerType,
Type behaviorType,
Type requestType,
Type responseType,
StreamInvoker streamInvoker)
{
// 线程安全:该缓存按 behaviorCount 复用 stream pipeline executor 形状,缓存项只保存委托与数量信息,
// 不会跨建流缓存 handler 或 behavior 实例。若不同请求持续出现新的行为数量组合,字典会随之增长。
private readonly ConcurrentDictionary<int, StreamPipelineExecutor> _pipelineExecutors = new();
private readonly StreamPipelineInvoker _pipelineInvoker = CreateStreamPipelineInvoker(requestType, responseType);
/// <summary>
/// 获取流式请求处理器在容器中的服务类型。
/// </summary>
public Type HandlerType { get; } = handlerType;
/// <summary>
/// 获取 stream pipeline 行为在容器中的服务类型。
/// </summary>
public Type BehaviorType { get; } = behaviorType;
/// <summary>
/// 获取执行流式请求处理器的调用委托。
/// </summary>
public StreamInvoker Invoker { get; } = invoker;
public StreamInvoker StreamInvoker { get; } = streamInvoker;
/// <summary>
/// 获取指定行为数量对应的 stream pipeline executor。
/// executor 形状会按行为数量缓存,但不会缓存 handler 或 behavior 实例。
/// </summary>
public StreamPipelineExecutor GetPipelineExecutor(int behaviorCount)
{
ArgumentOutOfRangeException.ThrowIfNegative(behaviorCount);
return _pipelineExecutors.GetOrAdd(
behaviorCount,
static (count, state) => new StreamPipelineExecutor(count, state.PipelineInvoker),
new StreamPipelineExecutorFactoryState(_pipelineInvoker));
}
/// <summary>
/// 仅供测试读取指定行为数量是否已存在缓存 executor。
/// </summary>
public object? GetPipelineExecutorForTesting(int behaviorCount)
{
_pipelineExecutors.TryGetValue(behaviorCount, out var executor);
return executor;
}
}
/// <summary>
@ -749,6 +829,55 @@ internal sealed class CqrsDispatcher(
Type HandlerType,
StreamInvoker Invoker);
/// <summary>
/// 为指定流式请求类型创建可跨多个 behaviorCount 复用的 typed pipeline invoker。
/// </summary>
private static StreamPipelineInvoker CreateStreamPipelineInvoker(Type requestType, Type responseType)
{
var method = StreamPipelineInvokerMethodDefinition
.MakeGenericMethod(requestType, responseType);
return (StreamPipelineInvoker)Delegate.CreateDelegate(typeof(StreamPipelineInvoker), method);
}
/// <summary>
/// 保存固定行为数量下的 typed stream pipeline executor 形状。
/// 该对象自身可跨建流复用,但每次调用都只绑定当前 handler 与 behavior 实例。
/// </summary>
private sealed class StreamPipelineExecutor(
int behaviorCount,
StreamPipelineInvoker invoker)
{
/// <summary>
/// 获取此 executor 预期处理的行为数量。
/// </summary>
public int BehaviorCount { get; } = behaviorCount;
/// <summary>
/// 使用当前 handler / behaviors / request 执行缓存的 pipeline 形状。
/// </summary>
public object Invoke(
object handler,
IReadOnlyList<object> behaviors,
StreamInvoker streamInvoker,
object request,
CancellationToken cancellationToken)
{
if (behaviors.Count != BehaviorCount)
{
throw new InvalidOperationException(
$"Cached stream pipeline executor expected {BehaviorCount} behaviors, but received {behaviors.Count}.");
}
return invoker(handler, behaviors, streamInvoker, request, cancellationToken);
}
}
/// <summary>
/// 为 stream pipeline executor 缓存携带 typed pipeline invoker避免按行为数量建缓存时创建闭包。
/// </summary>
private readonly record struct StreamPipelineExecutorFactoryState(
StreamPipelineInvoker PipelineInvoker);
/// <summary>
/// 供 registrar 在 generated registry 激活后登记 request invoker 元数据。
/// </summary>
@ -878,4 +1007,88 @@ internal sealed class CqrsDispatcher(
}
}
}
/// <summary>
/// 保存单次 stream pipeline 分发所需的当前 handler、behavior 列表和 continuation 缓存。
/// 该对象只存在于本次建流,不会跨请求保留容器解析出的实例。
/// </summary>
private sealed class StreamPipelineInvocation<TRequest, TResponse>(
IStreamRequestHandler<TRequest, TResponse> handler,
StreamInvoker streamInvoker,
IReadOnlyList<object> behaviors)
where TRequest : IStreamRequest<TResponse>
{
private readonly IStreamRequestHandler<TRequest, TResponse> _handler = handler;
private readonly StreamInvoker _streamInvoker = streamInvoker;
private readonly IReadOnlyList<object> _behaviors = behaviors;
private readonly StreamMessageHandlerDelegate<TRequest, TResponse>?[] _continuations =
new StreamMessageHandlerDelegate<TRequest, TResponse>?[behaviors.Count + 1];
/// <summary>
/// 从 stream pipeline 起点开始创建异步响应序列。
/// </summary>
public IAsyncEnumerable<TResponse> Invoke(TRequest request, CancellationToken cancellationToken)
{
return GetContinuation(0)(request, cancellationToken);
}
/// <summary>
/// 获取指定阶段的 continuation并在首次请求时为该阶段绑定一次不可变调用入口。
/// 同一行为多次调用 <c>next</c> 时会命中相同 continuation保持与 request pipeline 一致的链式语义。
/// </summary>
private StreamMessageHandlerDelegate<TRequest, TResponse> GetContinuation(int index)
{
var continuation = _continuations[index];
if (continuation is not null)
{
return continuation;
}
continuation = index == _behaviors.Count
? InvokeHandler
: new StreamPipelineContinuation<TRequest, TResponse>(this, index).Invoke;
_continuations[index] = continuation;
return continuation;
}
/// <summary>
/// 执行指定索引的 stream pipeline behavior。
/// </summary>
private IAsyncEnumerable<TResponse> InvokeBehavior(
int index,
TRequest request,
CancellationToken cancellationToken)
{
var behavior = (IStreamPipelineBehavior<TRequest, TResponse>)_behaviors[index];
return behavior.Handle(request, GetContinuation(index + 1), cancellationToken);
}
/// <summary>
/// 调用最终流式请求处理器。
/// </summary>
private IAsyncEnumerable<TResponse> InvokeHandler(TRequest request, CancellationToken cancellationToken)
{
return (IAsyncEnumerable<TResponse>)_streamInvoker(_handler, request, cancellationToken);
}
/// <summary>
/// 将固定阶段索引绑定为标准 <see cref="StreamMessageHandlerDelegate{TRequest,TResponse}" />。
/// 该包装只在单次建流生命周期内存在,用于把缓存 shape 套入当前实例。
/// </summary>
private sealed class StreamPipelineContinuation<TCurrentRequest, TCurrentResponse>(
StreamPipelineInvocation<TCurrentRequest, TCurrentResponse> invocation,
int index)
where TCurrentRequest : IStreamRequest<TCurrentResponse>
{
/// <summary>
/// 执行当前阶段并跳转到下一个 continuation。
/// </summary>
public IAsyncEnumerable<TCurrentResponse> Invoke(
TCurrentRequest request,
CancellationToken cancellationToken)
{
return invocation.InvokeBehavior(index, request, cancellationToken);
}
}
}
}

View File

@ -129,12 +129,15 @@ var playerId = await this.SendAsync(new CreatePlayerCommand(new CreatePlayerInpu
- 流式请求
- 通过 `IStreamRequest<TResponse>``IStreamRequestHandler<,>` 返回 `IAsyncEnumerable<TResponse>`
- 当消费端程序集提供 generated stream invoker provider / descriptor 后runtime 会优先消费这组 stream invoker 元数据;未命中时仍回退到既有反射 stream binding 创建路径。
- 所有已注册 `IStreamPipelineBehavior<TRequest, TResponse>` 会在建流阶段包裹对应 stream handler默认实现不拦截每个元素而是围绕单次 `CreateStream(...)` 调用编排行为链。
- 上下文注入
- 处理器基类继承 `CqrsContextAwareHandlerBase`runtime 会在分发前注入当前 `IArchitectureContext`
- 如果处理器或行为需要上下文注入,而当前 `ICqrsContext` 不是 `IArchitectureContext`,默认实现会抛出异常。
- 管道行为
- 所有已注册 `IPipelineBehavior<TRequest, TResponse>` 会包裹请求处理器执行。
- 当前包内提供了 `LoggingBehavior``PerformanceBehavior` 两个可复用行为。
- 所有已注册 `IStreamPipelineBehavior<TRequest, TResponse>` 会包裹流式请求处理器执行。
- 注册入口分别为 `RegisterCqrsPipelineBehavior<TBehavior>()``RegisterCqrsStreamPipelineBehavior<TBehavior>()`
- 当前包内提供了 `LoggingBehavior``PerformanceBehavior` 两个可复用 request 行为stream 行为需要按业务需求自行实现。
## 处理器注册与程序集接入

View File

@ -7,7 +7,7 @@ CQRS 迁移与收敛。
## 当前恢复点
- 恢复点编号:`CQRS-REWRITE-RP-098`
- 恢复点编号:`CQRS-REWRITE-RP-099`
- 当前阶段:`Phase 8`
- 当前 PR 锚点:`PR #334`
- 当前结论:
@ -34,13 +34,14 @@ CQRS 迁移与收敛。
- `RP-096` 已再次使用 `$gframework-pr-review` 复核 `PR #334` latest-head review确认仍显示为 open 的 AI threads 在本地代码中已无新增仍成立的运行时 / 测试 / 文档缺陷,剩余差异主要是 GitHub thread 未 resolve 的状态滞后
- `RP-097` 已继续收口 `PR #334` latest-head nitpick`AsyncQueryExecutorTests` / `CommandExecutorTests` 补齐可观察的上下文保留断言,并让 `RecordingCqrsRuntime` 在测试替身返回错误响应类型时抛出带请求/类型信息的诊断异常
- 当前 `RP-098` 已再次使用 `$gframework-pr-review` 复核 `PR #334` latest-head review并收口 `LegacyCqrsDispatchHelper.TryResolveDispatchContext(...)` 过宽吞掉 `InvalidOperationException` 的真实运行时诊断退化问题;现在仅把“上下文尚未就绪”视为允许 fallback 的信号,并为 fallback / 异常冒泡分别补齐回归测试
- `ai-plan` active 入口现以 `RP-098` 为最新恢复锚点;`PR #334``PR #331``PR #326``PR #323``PR #307` 与其他更早阶段细节均以下方归档或说明为准
- 当前 `RP-099` 已补齐 `GFramework.Cqrs` 的最小 stream pipeline seam新增 `IStreamPipelineBehavior<,>` / `StreamMessageHandlerDelegate<,>``RegisterCqrsStreamPipelineBehavior<TBehavior>()`、dispatcher 侧 stream pipeline executor 缓存与 generated stream invoker 兼容回归,以及 `Architecture` 公开注册入口与对应文档说明
- `ai-plan` active 入口现以 `RP-099` 为最新恢复锚点;`PR #334``PR #331``PR #326``PR #323``PR #307` 与其他更早阶段细节均以下方归档或说明为准
## 当前活跃事实
- 当前分支为 `feat/cqrs-optimization`
- 本轮 `$gframework-batch-boot 50``origin/main` (`2c58d8b6`, 2026-05-07 13:24:46 +0800) 为基线;本地 `main` (`c2d22285`) 已落后,不作为 branch diff 基线
- 当前分支相对 `origin/main` 的累计 branch diff 为 `4 files / 303 lines`,仍明显低于 `$gframework-batch-boot 50` 的文件阈值
- 当前分支相对 `origin/main` 的累计 branch diff 为 `0 files / 0 lines`,仍明显低于 `$gframework-batch-boot 50` 的文件 / 行数阈值
- `GFramework.Cqrs.Benchmarks` 作为 benchmark 基础设施项目,必须持续排除在 NuGet / GitHub Packages 发布集合之外
- `GFramework.Cqrs.Benchmarks` 现已覆盖 request steady-state、pipeline 数量矩阵、startup、request/stream generated invoker以及 request handler `Singleton / Transient` 生命周期矩阵
- `GFramework.Core` 当前已通过内部 bridge request / handler 把 legacy `ICommand``IAsyncCommand``IQuery``IAsyncQuery` 接到统一 `ICqrsRuntime`
@ -53,7 +54,7 @@ CQRS 迁移与收敛。
- `ArchitectureContextTests.CreateFrozenBridgeContext(...)` 现把冻结容器所有权显式交回调用方,并在每个 bridge 用例的 `finally` 中释放
- `CommandExecutorModule``QueryExecutorModule``AsyncQueryExecutorModule` 现改为 `GetRequired<ICqrsRuntime>()` 并在 XML 文档里显式声明注册顺序契约,避免 runtime 缺失时静默回退
- `LegacyAsyncQueryDispatchRequestHandler``LegacyAsyncCommandResultDispatchRequestHandler``LegacyAsyncCommandDispatchRequestHandler` 现都通过 `ThrowIfCancellationRequested()` + `WaitAsync(cancellationToken)` 显式保留调用方取消可见性
- 相对 `ai-libs/Mediator`,当前仍未完全吸收的能力集中在六类facade 公开入口、telemetry、stream pipeline、notification publisher 策略、生成器配置与诊断、生命周期/缓存公开配置面
- 相对 `ai-libs/Mediator`,当前仍未完全吸收的能力集中在五类facade 公开入口、telemetry、notification publisher 策略、生成器配置与诊断、生命周期/缓存公开配置面
- 发布工作流已有 packed modules 校验,但 PR 工作流此前没有等价的 solution pack 产物名单校验
- 本地 `dotnet pack GFramework.sln -c Release --no-restore -o <temp-dir>` 当前只产出 14 个预期包,未复现 benchmark `.nupkg`
- `PR #334``2026-05-07` 的 latest-head review 当前显示 `CodeRabbit 10` / `Greptile 5` 个 open thread本轮再次复核后确认其中大部分仍是已实质修复但未 resolve 的 stale thread`LegacyCqrsDispatchHelper.TryResolveDispatchContext(...)` 的异常边界仍需要继续收口
@ -79,6 +80,7 @@ CQRS 迁移与收敛。
- 若继续扩大 generated invoker 覆盖面,需要持续区分“可静态表达的合同”与 `PreciseReflectedRegistrationSpec` 等仍需保守回退的场景
- legacy bridge 当前只为已有 `Command` / `Query` 兼容入口接到统一 request pipeline若后续要继续对齐 `Mediator`,仍需要单独设计 stream pipeline、telemetry 与 facade 公开面,而不是把这次 bridge 当成“全部收口完成”
- `LegacyBridgePipelineTracker` 仍是进程级静态测试辅助;虽然现在已在相关 fixture 清理阶段重置并补充线程安全说明,但若将来扩大并行 bridge fixture 数量,仍要继续控制共享状态扩散
- stream pipeline 当前只在“单次建流”层面包裹 handler 调用;若后续需要 per-item 拦截、元素级重试或流内 metrics 聚合,仍需额外设计更细粒度 contract而不是把本轮 seam 直接等同于元素级 middleware
## 最近权威验证
@ -175,9 +177,9 @@ CQRS 迁移与收敛。
## 下一推荐步骤
1. 在 GitHub 上 resolve / reply 已被当前分支实质吸收的 `PR #334` stale review threads尤其是仍停留在旧 head 上的 CodeRabbit / Greptile open thread若 head 更新后线程数量继续变化,再用 `$gframework-pr-review` 复核
2. 若继续沿用 `$gframework-batch-boot 50` 且优先处理 `Mediator` 能力吸收,下一批建议从 `stream pipeline``notification publisher` 策略中选择一个独立切片推进
3. 若继续收敛 legacy Core CQRS可评估是否补一个 `IMediator` 风格 facade而不是继续扩大 `ArchitectureContext` 兼容入口的职责
1. 若继续沿用 `$gframework-batch-boot 50` 且优先处理 `Mediator` 能力吸收,下一批建议从 `notification publisher` 策略或 facade 公开入口中选择一个独立切片推进
2. 若后续要增强 stream observability优先评估是否需要元素级 hook而不是直接复用当前建流级 seam 承载更多语义
3. 在 GitHub 上 resolve / reply 已被当前分支实质吸收的 `PR #334` stale review threads若 head 更新后线程数量继续变化,再用 `$gframework-pr-review` 复核
## 活跃文档

View File

@ -2,6 +2,50 @@
## 2026-05-07
### 阶段stream pipeline seam 收口CQRS-REWRITE-RP-099
- 延续 `$gframework-batch-boot 50`,主线程先按 `origin/main` 评估 branch diff 容量,并在 `stream pipeline``notification publisher` 两个独立切片中选择更贴近 active gap 的下一批目标
- 只读 subagent 结论已被接受:
- `notification publisher` 已有稳定 seam、默认顺序实现与专门回归缺口主要在“更多内置策略”
- `stream pipeline` 仍缺独立 contract、注册入口与 runtime executor对应缺口在 public docs 与 active tracking 中都已显式列出
- 本轮主线程决策:
- 为 `GFramework.Cqrs.Abstractions` 新增 `IStreamPipelineBehavior<,>``StreamMessageHandlerDelegate<,>`
- 为 `IIocContainer``IArchitecture``Architecture``ArchitectureModules``MicrosoftDiContainer` 新增 `RegisterCqrsStreamPipelineBehavior<TBehavior>()`
- 为 `CqrsDispatcher``CreateStream(...)` 路径补齐 stream behavior 解析、上下文注入,以及按 behaviorCount 缓存的 stream pipeline executor 形状
- 保持语义边界清晰:本轮 stream pipeline 只包裹单次 `CreateStream(...)` 建流,不扩展到每个元素的逐项 middleware 语义
- 让 generated stream invoker provider 与 stream pipeline seam 共存并补齐“generated invoker 仍命中、行为链仍生效”的回归
- 本轮新增 / 更新的测试方向:
- `CqrsDispatcherCacheTests`stream pipeline executor 缓存、顺序稳定性、上下文重新注入
- `CqrsDispatcherContextValidationTests`stream behavior 需要 `IArchitectureContext` 时的显式失败语义
- `CqrsGeneratedRequestInvokerProviderTests`generated stream invoker 与 stream behavior 并存时仍优先消费 generated descriptor
- `ArchitectureModulesBehaviorTests`:公开 `RegisterCqrsStreamPipelineBehavior<TBehavior>()` 冒烟回归
- 文档收口:
- `GFramework.Cqrs/README.md` 现在显式说明 stream behavior 的建流级作用域
- `docs/zh-CN/core/cqrs.md` 现在区分 request pipeline 与 stream pipeline 两个注册入口,并从“仍缺 stream pipeline seam”的能力差距列表中移除该项
- 当前立即下一步:
- 运行 `GFramework.Cqrs` / `GFramework.Cqrs.Tests` / `GFramework.Core.Tests` 的 Release build 与 targeted tests
- 刷新 `origin/main...HEAD` 的 branch diff files / lines 指标
- 若验证通过,再补 license / diff check 与自动提交
- 本轮权威验证:
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet build GFramework.Core/GFramework.Core.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet build GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet build GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --no-build --filter "FullyQualifiedName~CqrsDispatcherCacheTests|FullyQualifiedName~CqrsGeneratedRequestInvokerProviderTests|FullyQualifiedName~CqrsDispatcherContextValidationTests"`
- 结果:通过,`31/31` passed
- `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --no-build --filter "FullyQualifiedName~ArchitectureModulesBehaviorTests"`
- 结果:通过,`4/4` passed
- `env GIT_DIR=... GIT_WORK_TREE=... python3 scripts/license-header.py --check`
- 结果:通过
- `git diff --check`
- 结果:通过
- `origin/main...HEAD`
- 结果:`0 files / 0 lines`
### 阶段PR #334 latest-head helper 异常边界收口CQRS-REWRITE-RP-098
- 再次使用 `$gframework-pr-review` 抓取 `feat/cqrs-optimization` 对应的 `PR #334` latest-head review并重新核对 `/tmp/current-pr-review.json` 中最新 open thread

View File

@ -207,7 +207,19 @@ RegisterCqrsPipelineBehavior<LoggingBehavior<,>>();
- 审计
- 重试或统一异常封装
当前公开入口只有 `RegisterCqrsPipelineBehavior<TBehavior>()`
如果你需要围绕 `CreateStream(...)` 的建流过程插入横切逻辑,实现 `IStreamPipelineBehavior<TRequest, TResponse>`,并在初始化阶段注册:
```csharp
RegisterCqrsStreamPipelineBehavior<MyStreamBehavior<,>>();
```
这里有两个需要明确的边界:
- `RegisterCqrsPipelineBehavior<TBehavior>()`
- 只作用于 `SendRequestAsync(...)` / `SendAsync(...)` / `SendQueryAsync(...)` 这类单次请求分发
- `RegisterCqrsStreamPipelineBehavior<TBehavior>()`
- 作用于 `CreateStream(...)` 的建流阶段
- 默认实现围绕单次建流调用编排行为链,不会自动把行为扩展成“每个流元素都单独拦截一次”
## 和旧 Command / Query 的关系
@ -237,7 +249,6 @@ RegisterCqrsPipelineBehavior<LoggingBehavior<,>>();
- `IMediator` / `ISender` / `IPublisher` 风格的一等 facade 公开入口
- telemetry / tracing / metrics 的运行时与生成器配置面
- 独立的 stream pipeline 行为体系
- 更丰富的 notification publisher 策略
- 更强的生成器配置与诊断公开面
- 生命周期 / 缓存策略的显式公开配置面