diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs index 919a15d7..3d2040ab 100644 --- a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs @@ -35,7 +35,9 @@ internal sealed class CqrsDispatcherCacheTests _container.Freeze(); _context = new ArchitectureContext(_container); + DispatcherNotificationContextRefreshState.Reset(); DispatcherPipelineContextRefreshState.Reset(); + DispatcherStreamContextRefreshState.Reset(); ClearDispatcherCaches(); } @@ -300,6 +302,92 @@ internal sealed class CqrsDispatcherCacheTests }); } + /// + /// 验证缓存的 notification dispatch binding 在重复分发时仍会重新解析 handler, + /// 并为当次实例重新注入当前架构上下文。 + /// + [Test] + public async Task Dispatcher_Should_Reinject_Current_Context_When_Reusing_Cached_Notification_Dispatch_Binding() + { + DispatcherNotificationContextRefreshState.Reset(); + + var notificationBindings = GetCacheField("NotificationDispatchBindings"); + var firstContext = new ArchitectureContext(_container!); + var secondContext = new ArchitectureContext(_container!); + + await firstContext.PublishAsync(new DispatcherNotificationContextRefreshNotification("first")); + + var bindingAfterFirstDispatch = GetSingleKeyCacheValue( + notificationBindings, + typeof(DispatcherNotificationContextRefreshNotification)); + + await secondContext.PublishAsync(new DispatcherNotificationContextRefreshNotification("second")); + + var bindingAfterSecondDispatch = GetSingleKeyCacheValue( + notificationBindings, + typeof(DispatcherNotificationContextRefreshNotification)); + var handlerSnapshots = DispatcherNotificationContextRefreshState.HandlerSnapshots.ToArray(); + + Assert.Multiple(() => + { + Assert.That(bindingAfterFirstDispatch, Is.Not.Null); + Assert.That(bindingAfterSecondDispatch, Is.SameAs(bindingAfterFirstDispatch)); + + Assert.That(handlerSnapshots, Has.Length.EqualTo(2)); + Assert.That(handlerSnapshots[0].DispatchId, Is.EqualTo("first")); + Assert.That(handlerSnapshots[0].Context, Is.SameAs(firstContext)); + Assert.That(handlerSnapshots[1].DispatchId, Is.EqualTo("second")); + Assert.That(handlerSnapshots[1].Context, Is.SameAs(secondContext)); + Assert.That(handlerSnapshots[1].Context, Is.Not.SameAs(handlerSnapshots[0].Context)); + Assert.That(handlerSnapshots[1].InstanceId, Is.Not.EqualTo(handlerSnapshots[0].InstanceId)); + }); + } + + /// + /// 验证缓存的 stream dispatch binding 在重复建流时仍会重新解析 handler, + /// 并为当次实例重新注入当前架构上下文。 + /// + [Test] + public async Task Dispatcher_Should_Reinject_Current_Context_When_Reusing_Cached_Stream_Dispatch_Binding() + { + DispatcherStreamContextRefreshState.Reset(); + + var streamBindings = GetCacheField("StreamDispatchBindings"); + var firstContext = new ArchitectureContext(_container!); + var secondContext = new ArchitectureContext(_container!); + + var firstStream = firstContext.CreateStream(new DispatcherStreamContextRefreshRequest("first")); + await DrainAsync(firstStream); + + var bindingAfterFirstDispatch = GetPairCacheValue( + streamBindings, + typeof(DispatcherStreamContextRefreshRequest), + typeof(int)); + + var secondStream = secondContext.CreateStream(new DispatcherStreamContextRefreshRequest("second")); + await DrainAsync(secondStream); + + var bindingAfterSecondDispatch = GetPairCacheValue( + streamBindings, + typeof(DispatcherStreamContextRefreshRequest), + typeof(int)); + var handlerSnapshots = DispatcherStreamContextRefreshState.HandlerSnapshots.ToArray(); + + Assert.Multiple(() => + { + Assert.That(bindingAfterFirstDispatch, Is.Not.Null); + Assert.That(bindingAfterSecondDispatch, Is.SameAs(bindingAfterFirstDispatch)); + + Assert.That(handlerSnapshots, Has.Length.EqualTo(2)); + Assert.That(handlerSnapshots[0].DispatchId, Is.EqualTo("first")); + Assert.That(handlerSnapshots[0].Context, Is.SameAs(firstContext)); + Assert.That(handlerSnapshots[1].DispatchId, Is.EqualTo("second")); + Assert.That(handlerSnapshots[1].Context, Is.SameAs(secondContext)); + Assert.That(handlerSnapshots[1].Context, Is.Not.SameAs(handlerSnapshots[0].Context)); + Assert.That(handlerSnapshots[1].InstanceId, Is.Not.EqualTo(handlerSnapshots[0].InstanceId)); + }); + } + /// /// 通过反射读取 dispatcher 的静态缓存对象。 /// diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshHandler.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshHandler.cs new file mode 100644 index 00000000..61a07ccc --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshHandler.cs @@ -0,0 +1,29 @@ +using System.Threading; +using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 记录缓存 notification binding 复用场景下每次分发注入到 handler 的上下文与实例身份。 +/// +internal sealed class DispatcherNotificationContextRefreshHandler + : CqrsContextAwareHandlerBase, + INotificationHandler +{ + private readonly int _instanceId = DispatcherNotificationContextRefreshState.AllocateHandlerInstanceId(); + + /// + /// 记录当前 handler 实例收到的上下文。 + /// + /// 当前通知。 + /// 取消令牌。 + /// 已完成任务。 + public ValueTask Handle( + DispatcherNotificationContextRefreshNotification notification, + CancellationToken cancellationToken) + { + DispatcherNotificationContextRefreshState.Record(notification.DispatchId, _instanceId, Context); + return ValueTask.CompletedTask; + } +} diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshNotification.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshNotification.cs new file mode 100644 index 00000000..f1f54dd2 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshNotification.cs @@ -0,0 +1,8 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 为 notification dispatch binding 上下文刷新回归提供带分发标识的最小通知。 +/// +internal sealed record DispatcherNotificationContextRefreshNotification(string DispatchId) : INotification; diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshState.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshState.cs new file mode 100644 index 00000000..43c32531 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherNotificationContextRefreshState.cs @@ -0,0 +1,42 @@ +using System.Threading; +using GFramework.Core.Abstractions.Architectures; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 记录 notification dispatch binding 缓存回归中每次分发实际使用的上下文与实例身份。 +/// +internal static class DispatcherNotificationContextRefreshState +{ + private static int _nextHandlerInstanceId; + + /// + /// 获取每次 notification 分发时记录的快照。 + /// + public static List HandlerSnapshots { get; } = []; + + /// + /// 为新的 handler 测试实例分配稳定编号。 + /// + public static int AllocateHandlerInstanceId() + { + return Interlocked.Increment(ref _nextHandlerInstanceId); + } + + /// + /// 记录 handler 在当前分发中观察到的上下文。 + /// + public static void Record(string dispatchId, int instanceId, IArchitectureContext context) + { + HandlerSnapshots.Add(new DispatcherPipelineContextSnapshot(dispatchId, instanceId, context)); + } + + /// + /// 清空历史记录与实例编号,避免跨测试污染断言。 + /// + public static void Reset() + { + _nextHandlerInstanceId = 0; + HandlerSnapshots.Clear(); + } +} diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshHandler.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshHandler.cs new file mode 100644 index 00000000..95d7c998 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshHandler.cs @@ -0,0 +1,32 @@ +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading; +using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 记录缓存 stream binding 复用场景下每次分发注入到 handler 的上下文与实例身份。 +/// +internal sealed class DispatcherStreamContextRefreshHandler + : CqrsContextAwareHandlerBase, + IStreamRequestHandler +{ + private readonly int _instanceId = DispatcherStreamContextRefreshState.AllocateHandlerInstanceId(); + + /// + /// 记录当前 handler 实例收到的上下文,并返回稳定元素。 + /// + /// 当前流请求。 + /// 取消令牌。 + /// 包含一个固定元素的异步流。 + public async IAsyncEnumerable Handle( + DispatcherStreamContextRefreshRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + DispatcherStreamContextRefreshState.Record(request.DispatchId, _instanceId, Context); + yield return 11; + await ValueTask.CompletedTask.ConfigureAwait(false); + } +} diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshRequest.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshRequest.cs new file mode 100644 index 00000000..c8d69cfe --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshRequest.cs @@ -0,0 +1,8 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 为 stream dispatch binding 上下文刷新回归提供带分发标识的最小流请求。 +/// +internal sealed record DispatcherStreamContextRefreshRequest(string DispatchId) : IStreamRequest; diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshState.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshState.cs new file mode 100644 index 00000000..a315451d --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherStreamContextRefreshState.cs @@ -0,0 +1,42 @@ +using System.Threading; +using GFramework.Core.Abstractions.Architectures; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 记录 stream dispatch binding 缓存回归中每次分发实际使用的上下文与实例身份。 +/// +internal static class DispatcherStreamContextRefreshState +{ + private static int _nextHandlerInstanceId; + + /// + /// 获取每次建流时记录的快照。 + /// + public static List HandlerSnapshots { get; } = []; + + /// + /// 为新的 handler 测试实例分配稳定编号。 + /// + public static int AllocateHandlerInstanceId() + { + return Interlocked.Increment(ref _nextHandlerInstanceId); + } + + /// + /// 记录 handler 在当前建流中观察到的上下文。 + /// + public static void Record(string dispatchId, int instanceId, IArchitectureContext context) + { + HandlerSnapshots.Add(new DispatcherPipelineContextSnapshot(dispatchId, instanceId, context)); + } + + /// + /// 清空历史记录与实例编号,避免跨测试污染断言。 + /// + public static void Reset() + { + _nextHandlerInstanceId = 0; + HandlerSnapshots.Clear(); + } +} diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index ec7fcc6b..fec9f710 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -7,7 +7,7 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-057` +- 恢复点编号:`CQRS-REWRITE-RP-059` - 当前阶段:`Phase 8` - 当前焦点: - 当前功能历史已归档,active 跟踪仅保留 `Phase 8` 主线的恢复入口 @@ -18,6 +18,7 @@ CQRS 迁移与收敛。 - 已完成 request pipeline executor 形状缓存:`CqrsDispatcher` 现会在单个 request binding 内按 `behaviorCount` 复用强类型 pipeline executor,而不是每次 `SendAsync` 都重建整条 `next` 委托链 - 已补充 dispatcher pipeline executor 缓存与双行为顺序回归,锁定缓存复用后仍保持现有行为执行顺序 - 已补充 cached request pipeline executor 的上下文刷新回归,锁定 executor 复用时仍会为当次 handler / singleton behavior 重新注入当前 `ArchitectureContext` + - 已补充 cached notification / stream dispatch binding 的上下文刷新回归,锁定 binding 复用时仍会为当次 handler 重新注入当前 `ArchitectureContext` - 已完成 generated registry 激活路径收敛:`CqrsHandlerRegistrar` 现优先复用缓存工厂委托,避免重复 `ConstructorInfo.Invoke` - 已补充私有无参构造 generated registry 的回归测试,确保兼容现有生成器产物 - 已修正 pointer / function pointer 泛型合同的错误覆盖:生成器不再为这两类类型发射 precise runtime type 重建代码 @@ -25,6 +26,7 @@ CQRS 迁移与收敛。 - 已为 registrar 的 reflection 注册路径补充 handler-interface 元数据缓存,减少跨容器重复注册时的 `GetInterfaces()` 反射 - 已将 registrar 的重复映射判定从线性扫描 `IServiceCollection` 收敛为本地映射索引,减少 fallback 注册路径的重复查找 - 已完成一轮 `static lambda + state` 微收敛:`CqrsDispatcher` 与 `CqrsHandlerRegistrar` 现会在弱缓存 / 并发缓存入口优先使用无捕获工厂,继续压低热路径上的额外闭包分配 + - 已补充 `CqrsReflectionFallbackAttribute` 叶子级合同测试,锁定空 marker、字符串 fallback 名称归一化、直接 `Type` fallback 归一化与空参数防御语义 - 中期上继续 `Phase 8` 主线:参考 `ai-libs/Mediator`,继续扩大 generator 覆盖,并选择下一个收益明确的 dispatch / invoker 反射收敛点 ## 当前状态摘要 @@ -95,6 +97,13 @@ CQRS 迁移与收敛。 - `GFramework.Cqrs.Tests` 已新增 `DispatcherPipelineContextRefresh*` 测试替身,分别记录 request handler 与 pipeline behavior 在每次分发中实际观察到的实例身份与 `ArchitectureContext` - `CqrsDispatcherCacheTests` 现明确断言:同一个 cached request pipeline executor 在重复分发时会继续命中同一 executor 形状,但不会跨分发保留旧上下文 - 本轮定向测试未暴露新的 runtime 缺口,因此没有改动 `GFramework.Cqrs/Internal/CqrsDispatcher.cs` +- `2026-04-29` 已完成一轮 cached notification / stream binding 上下文刷新回归补强: + - `GFramework.Cqrs.Tests` 已新增 `DispatcherNotificationContextRefresh*` 与 `DispatcherStreamContextRefresh*` 测试替身,分别记录 notification handler 与 stream handler 在重复分发时观察到的实例身份与 `ArchitectureContext` + - `CqrsDispatcherCacheTests` 现明确断言:同一个 cached notification / stream dispatch binding 在重复分发时会继续命中同一 binding,但不会跨分发保留旧上下文 + - 本轮定向测试未暴露新的 runtime 缺口,因此没有改动 `GFramework.Cqrs/Internal/CqrsDispatcher.cs` +- `2026-04-29` 已接受一轮 delegated 叶子级 fallback 合同测试: + - `GFramework.Cqrs.Tests/Cqrs/CqrsReflectionFallbackAttributeTests.cs` 已锁定空 marker、字符串 fallback 名称去空/去重/排序、直接 `Type` fallback 去空/去重/排序与空参数数组防御语义 + - 当前 runtime 读取程序集级 fallback 元数据时所依赖的 attribute 归一化合同,现已有独立叶子级测试文件覆盖 - `2026-04-29` 已完成一轮 CQRS 入口文档对齐: - `GFramework.Cqrs/README.md`、`docs/zh-CN/core/cqrs.md` 与 `docs/zh-CN/api-reference/index.md` 现已明确 generated registry 优先、targeted fallback 补齐剩余 handler 的当前语义 - `2026-04-29` 已完成一轮 generator pointer runtime-reconstruction 残留清理: @@ -125,6 +134,12 @@ CQRS 迁移与收敛。 - `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~GFramework.Cqrs.Tests.Cqrs.CqrsDispatcherCacheTests"` - 结果:通过 - 备注:`5/5` 测试通过;本轮新增 cached executor 上下文刷新回归,确认 executor 复用时仍按当次分发重新注入上下文 +- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~GFramework.Cqrs.Tests.Cqrs.CqrsReflectionFallbackAttributeTests"` + - 结果:通过 + - 备注:`5/5` 测试通过;本轮锁定 fallback attribute 的公开归一化合同与空参数防御语义 +- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~GFramework.Cqrs.Tests.Cqrs.CqrsDispatcherCacheTests"` + - 结果:通过 + - 备注:`7/7` 测试通过;本轮新增 cached notification / stream binding 上下文刷新回归,确认 binding 复用时仍按当次分发重新注入上下文 - `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --no-restore -p:RestoreFallbackFolders= -m:1 -nodeReuse:false` - 结果:通过 - 备注:`63/63` 测试通过;当前沙箱限制了 MSBuild named pipe,验证需在提权环境下运行 @@ -194,6 +209,6 @@ CQRS 迁移与收敛。 ## 下一步 -1. 继续 `Phase 8` 主线,优先再找一个收益明确且写集独立的 generator 或 registrar/dispatcher 热点;在下一次提交后重新计算相对 `origin/main` 的累计 diff,并继续朝 `50 files` stop condition 推进 +1. 继续 `Phase 8` 主线,优先再找一个收益明确且写集独立的 generator 或 registrar/dispatcher 热点;当前工作区若提交主线程 notification / stream 回归批次,相对 `origin/main` 的累计 diff 将达到 `29 files`,仍低于本轮 `gframework-batch-boot 50` 的主要 stop condition 2. 若继续文档主线,优先再扫教程入口页与 API 参考中的 CQRS 采用说明,确认是否还有旧 Command / Query 迁移口径残留 3. 若后续再出现新的 PR review 或 review thread 变化,再重新执行 `$gframework-pr-review` 作为独立验证步骤 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index 7cf929c4..41a14f44 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -2,6 +2,33 @@ ## 2026-04-29 +### 阶段:notification / stream binding 上下文刷新回归(CQRS-REWRITE-RP-059) + +- 延续 `gframework-batch-boot 50` 的 `Phase 8` 主线,本轮继续沿着上一批 dispatcher cached executor 上下文回归往外扩一圈,但只覆盖 notification / stream 两条非 request 路径 +- 主线程先复核 `CqrsDispatcher` 当前实现后确认: + - `PublishAsync(...)` 与 `CreateStream(...)` 都会在命中缓存 binding 后重新解析 handler,并在调用前执行 `PrepareHandler(...)` + - 因此本轮最稳妥的切片仍是测试补强,而不是继续改 runtime +- 已完成的测试补强: + - 在 `GFramework.Cqrs.Tests/Cqrs/` 新增 `DispatcherNotificationContextRefresh*` 与 `DispatcherStreamContextRefresh*` 测试替身,记录重复分发时 handler 实例身份与 `ArchitectureContext` + - `CqrsDispatcherCacheTests` 新增 `Dispatcher_Should_Reinject_Current_Context_When_Reusing_Cached_Notification_Dispatch_Binding` + - `CqrsDispatcherCacheTests` 新增 `Dispatcher_Should_Reinject_Current_Context_When_Reusing_Cached_Stream_Dispatch_Binding` +- 定向验证已通过: + - `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~GFramework.Cqrs.Tests.Cqrs.CqrsDispatcherCacheTests"` + - `7/7` passed +- 结果: + - 本轮未暴露新的 runtime 实现缺口,因此没有改动 `GFramework.Cqrs/Internal/CqrsDispatcher.cs` + - 若连同当前工作区一起计算,当前分支相对 `origin/main` 的累计 diff 将达到 `29 files`,继续低于 `gframework-batch-boot 50` 的主要 stop condition + +### 阶段:delegated fallback attribute 合同测试(CQRS-REWRITE-RP-058) + +- 本轮按 `gframework-batch-boot 50` 的并行约束,把一个与主线程写集完全独立的叶子级测试文件交给 worker: + - delegated scope:`GFramework.Cqrs.Tests/Cqrs/CqrsReflectionFallbackAttributeTests.cs` + - delegated objective:锁定 `CqrsReflectionFallbackAttribute` 的公开归一化合同,而不扩张到 registrar / generator / dispatcher 实现 +- 已接受的 worker 结果: + - 新增 `CqrsReflectionFallbackAttributeTests`,覆盖空 marker、字符串 fallback 名称的去空/去重/排序、直接 `Type` fallback 的去空/去重/排序,以及两个重载对空参数数组的防御行为 + - worker 已独立验证 `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~GFramework.Cqrs.Tests.Cqrs.CqrsReflectionFallbackAttributeTests"`,结果为 `5/5` passed + - 该叶子级测试批次已作为独立提交落地:`86a24e00` `test(cqrs): 新增 ReflectionFallbackAttribute 合同测试` + ### 阶段:cached executor 上下文刷新回归(CQRS-REWRITE-RP-057) - 延续 `gframework-batch-boot 50` 的 `Phase 8` 主线,本轮只处理一个窄写集测试批次:为 cached request pipeline executor 增加“重复分发仍重新注入上下文”的回归