diff --git a/GFramework.Core.Tests/Architectures/ArchitectureContextTests.cs b/GFramework.Core.Tests/Architectures/ArchitectureContextTests.cs index e0f204c7..9652f63b 100644 --- a/GFramework.Core.Tests/Architectures/ArchitectureContextTests.cs +++ b/GFramework.Core.Tests/Architectures/ArchitectureContextTests.cs @@ -365,7 +365,165 @@ public class ArchitectureContextTests Times.Exactly(requests.Length)); } + /// + /// 测试 CQRS runtime 在并发首次发布通知时只会从容器解析一次。 + /// + [Test] + public async Task PublishAsync_Should_ResolveCqrsRuntime_OnlyOnce_When_AccessedConcurrently() + { + const int workerCount = 8; + var workerStartupTimeout = TimeSpan.FromSeconds(5); + var firstResolutionTimeout = TimeSpan.FromSeconds(5); + using var startGate = new ManualResetEventSlim(false); + using var allowResolutionToComplete = new ManualResetEventSlim(false); + using var workersReady = new CountdownEvent(workerCount); + var resolutionCallCount = 0; + var runtime = new Mock(MockBehavior.Strict); + var container = new Mock(MockBehavior.Strict); + + runtime.Setup(mockRuntime => mockRuntime.PublishAsync( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(ValueTask.CompletedTask); + + container.Setup(mockContainer => mockContainer.Get()) + .Returns(() => + { + Interlocked.Increment(ref resolutionCallCount); + allowResolutionToComplete.Wait(); + return runtime.Object; + }); + + var context = new ArchitectureContext(container.Object); + var notifications = Enumerable.Range(0, workerCount) + .Select(_ => Task.Run(async () => + { + workersReady.Signal(); + startGate.Wait(); + await context.PublishAsync(new TestCqrsNotification()).ConfigureAwait(false); + })) + .ToArray(); + + Assert.That( + workersReady.Wait(workerStartupTimeout), + Is.True, + "Expected all workers to be ready before releasing start gate."); + startGate.Set(); + + Assert.That( + SpinWait.SpinUntil(() => Volatile.Read(ref resolutionCallCount) > 0, firstResolutionTimeout), + Is.True, + "Expected at least one CQRS runtime resolution attempt."); + + allowResolutionToComplete.Set(); + + await Task.WhenAll(notifications).ConfigureAwait(false); + + Assert.That(resolutionCallCount, Is.EqualTo(1)); + container.Verify(mockContainer => mockContainer.Get(), Times.Once); + runtime.Verify( + mockRuntime => mockRuntime.PublishAsync( + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Exactly(notifications.Length)); + } + + /// + /// 测试 CQRS runtime 在并发首次创建流时只会从容器解析一次。 + /// + [Test] + public async Task CreateStream_Should_ResolveCqrsRuntime_OnlyOnce_When_AccessedConcurrently() + { + const int workerCount = 8; + var workerStartupTimeout = TimeSpan.FromSeconds(5); + var firstResolutionTimeout = TimeSpan.FromSeconds(5); + using var startGate = new ManualResetEventSlim(false); + using var allowResolutionToComplete = new ManualResetEventSlim(false); + using var workersReady = new CountdownEvent(workerCount); + var resolutionCallCount = 0; + var runtime = new Mock(MockBehavior.Strict); + var container = new Mock(MockBehavior.Strict); + + runtime.Setup(mockRuntime => mockRuntime.CreateStream( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(static () => CreateTestCqrsStream()); + + container.Setup(mockContainer => mockContainer.Get()) + .Returns(() => + { + Interlocked.Increment(ref resolutionCallCount); + allowResolutionToComplete.Wait(); + return runtime.Object; + }); + + var context = new ArchitectureContext(container.Object); + var streamTasks = Enumerable.Range(0, workerCount) + .Select(_ => Task.Run(async () => + { + workersReady.Signal(); + startGate.Wait(); + await DrainAsync(context.CreateStream(new TestCqrsStreamRequest())).ConfigureAwait(false); + })) + .ToArray(); + + Assert.That( + workersReady.Wait(workerStartupTimeout), + Is.True, + "Expected all workers to be ready before releasing start gate."); + startGate.Set(); + + Assert.That( + SpinWait.SpinUntil(() => Volatile.Read(ref resolutionCallCount) > 0, firstResolutionTimeout), + Is.True, + "Expected at least one CQRS runtime resolution attempt."); + + allowResolutionToComplete.Set(); + + await Task.WhenAll(streamTasks).ConfigureAwait(false); + + Assert.That(resolutionCallCount, Is.EqualTo(1)); + container.Verify(mockContainer => mockContainer.Get(), Times.Once); + runtime.Verify( + mockRuntime => mockRuntime.CreateStream( + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Exactly(streamTasks.Length)); + } + + /// + /// 枚举完整个测试流,确保 `CreateStream` 路径真正执行到底。 + /// + /// 要消费的异步流。 + /// 表示消费完成的任务。 + private static async Task DrainAsync(IAsyncEnumerable stream) + { + ArgumentNullException.ThrowIfNull(stream); + + await foreach (var _ in stream.ConfigureAwait(false)) + { + } + } + + /// + /// 为 `CreateStream` 并发解析测试提供最小异步流。 + /// + /// 只包含单个元素的异步流。 + private static async IAsyncEnumerable CreateTestCqrsStream() + { + yield return 42; + await Task.CompletedTask.ConfigureAwait(false); + } + private sealed class TestCqrsRequest : IRequest { } + + private sealed record TestCqrsNotification : INotification; + + private sealed record TestCqrsStreamRequest : IStreamRequest; } diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index fa5a809d..0d4a6e66 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -7,7 +7,7 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-064` +- 恢复点编号:`CQRS-REWRITE-RP-065` - 当前阶段:`Phase 8` - 当前焦点: - 已完成一轮 `CQRS vs Mediator` 只读评估归档,结论已沉淀到 `archive/todos/cqrs-vs-mediator-assessment-rp063.md` @@ -26,6 +26,14 @@ CQRS 迁移与收敛。 容器里已显式注册的 `INotificationPublisher` - 已补充 `CqrsNotificationPublisherTests`,覆盖自定义 publisher 接管、上下文注入、零处理器静默完成、首错即停,以及 `RegisterInfrastructure` 默认接线复用预注册 publisher 的回归 + - 已完成一轮 `Mediator` 测试命名收口: + - `MediatorAdvancedFeaturesTests` -> `CqrsArchitectureContextAdvancedFeaturesTests` + - `MediatorArchitectureIntegrationTests` -> `CqrsArchitectureContextIntegrationTests` + - `MediatorComprehensiveTests` -> `ArchitectureContextComprehensiveTests` + - `GFramework.Cqrs.Tests` 中这三份历史测试现已统一迁入 `Cqrs/` 目录,并将命名空间、类名、中文注释与嵌套测试类型中的 + `Mediator` 语义收口为 `CQRS` / `ArchitectureContext` + - 已补充 `ArchitectureContextTests` 并发 lazy-resolution 回归,锁定 `PublishAsync(...)` 与 `CreateStream(...)` + 在并发首次访问时也只会解析一次 `ICqrsRuntime` - 已将 mixed fallback 场景进一步收敛:当 runtime 允许同一程序集声明多个 `CqrsReflectionFallbackAttribute` 实例时,generator 现会把可直接引用的 fallback handlers 与仅能按名称恢复的 fallback handlers 拆分发射 - `CqrsReflectionFallbackAttribute` 现允许多实例,以承载 `Type[]` 与字符串 fallback 元数据的组合输出 - 已将 generator 的程序集级 fallback 元数据进一步收敛:当全部 fallback handlers 都可直接引用且 runtime 暴露 `params Type[]` 合同时,生成器现优先发射 `typeof(...)` 形式的 fallback 元数据 @@ -158,6 +166,10 @@ CQRS 迁移与收敛。 - 当前 seam 刻意保持在默认 runtime 内部:`ICqrsRuntime.PublishAsync(...)` 外形不变,dispatcher 仍负责 handler 解析与 `IContextAware` 上下文注入 - 用户若需替换通知发布策略,只需在 runtime 创建前向容器显式注册 `INotificationPublisher` +- `2026-04-30` 已接受三条 worker 切片并完成一轮测试命名收口: + - 三个 worker 分别独立拥有一份 `GFramework.Cqrs.Tests/Mediator/*.cs` 文件,主线程只做集成验证与后续追踪更新 + - 当前分支已不再保留 `GFramework.Cqrs.Tests/Mediator/` 目录下的生产内涵测试,相关文件均迁移到 `GFramework.Cqrs.Tests/Cqrs/` + - 本轮没有修改测试行为,只收口命名、注释、局部变量与嵌套测试类型语义 - 当前主线优先级: - dispatch/invoker 反射占比继续下降,并优先评估生成前移方案 - 基于已落地 publisher seam,继续评估是否需要公开配置面、并行策略或 telemetry decorator @@ -204,9 +216,15 @@ CQRS 迁移与收敛。 - `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --filter "FullyQualifiedName~MicrosoftDiContainerTests"` - 结果:通过 - 备注:`41/41` 通过;确认 CQRS 基础设施默认接线与容器行为未回归 +- `dotnet build GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release` + - 结果:通过 + - 备注:`0 warning / 0 error`;确认三份 `Mediator` 命名收口后的 CQRS 测试项目构建仍然干净 +- `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --filter "FullyQualifiedName~ArchitectureContextTests"` + - 结果:通过 + - 备注:`22/22` 通过;新增 `PublishAsync` / `CreateStream` 并发首次访问只解析一次 `ICqrsRuntime` 的回归 ## 下一步 1. 基于已落地的 notification publisher seam,评估是否需要第二阶段公开配置面、并行 publisher 或 telemetry decorator 2. 继续以 `dispatch/invoker` 生成前移为优先对象,补一轮面向实现的设计评估 -3. 单独规划旧 `Command` / `Query` API、`LegacyICqrsRuntime` 与 `Mediator` 测试命名的收口顺序,避免与 runtime 微优化混做 +3. 单独规划旧 `Command` / `Query` API 与 `LegacyICqrsRuntime` 的收口顺序;`Mediator` 测试命名收口已完成,可移出该子问题 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index cffece52..f07bc614 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -2,6 +2,33 @@ ## 2026-04-30 +### 阶段:测试命名收口与 ArchitectureContext lazy-resolution 回归(CQRS-REWRITE-RP-065) + +- 继续按 `gframework-batch-boot 50` 执行,基线仍为本地现有 `origin/main` +- `22f608eb` 之后复算 branch diff,相对 `origin/main` 已达到 `18 files`,仍明显低于 `50 files` stop condition,因此继续下一批 +- 本轮拆成四个互不冲突切片: + - worker 1:`MediatorAdvancedFeaturesTests.cs` + - worker 2:`MediatorArchitectureIntegrationTests.cs` + - worker 3:`MediatorComprehensiveTests.cs` + - 主线程:`GFramework.Core.Tests/Architectures/ArchitectureContextTests.cs` +- 三个 worker 均只收口单文件命名与注释语义,并把测试文件迁移到 `GFramework.Cqrs.Tests/Cqrs/` +- 主线程新增 `ArchitectureContextTests` 并发 lazy-resolution 回归,锁定: + - `PublishAsync(...)` 在并发首次访问时只解析一次 `ICqrsRuntime` + - `CreateStream(...)` 在并发首次访问时只解析一次 `ICqrsRuntime` +- 集成后已确认三份测试文件中不再残留 `GFramework.Cqrs.Tests.Mediator` 命名空间或 `Mediator` 语义命名 + +### 验证 + +- `dotnet build GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release` + - 结果:通过,`0 warning / 0 error` +- `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --filter "FullyQualifiedName~ArchitectureContextTests"` + - 结果:通过,`22/22` passed + +### 当前下一步 + +1. 继续 `Phase 8` 主线,回到 `dispatch/invoker` 生成前移或 `LegacyICqrsRuntime` 收口的下一个低风险切片 +2. 在下一次 batch 结束后复算 branch diff,确认距 `50 files` stop condition 的剩余 headroom + ### 阶段:notification publisher seam 最小落地(CQRS-REWRITE-RP-064) - 本轮按 `gframework-batch-boot 50` 继续 `cqrs-rewrite`,基线使用本地现有 `origin/main`