From f9dd105bccb8184fc9201ca0e901fff9f4717d74 Mon Sep 17 00:00:00 2001
From: gewuyou <95328647+GeWuYou@users.noreply.github.com>
Date: Sat, 9 May 2026 12:42:03 +0800
Subject: [PATCH] =?UTF-8?q?perf(cqrs):=20=E7=BC=93=E5=AD=98=20stream=20pip?=
=?UTF-8?q?eline=20=E5=AD=98=E5=9C=A8=E6=80=A7=E5=88=A4=E5=AE=9A?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 优化 CqrsDispatcher 的 CreateStream 热路径,按 dispatcher 实例缓存 stream pipeline behavior 的服务可见性
- 新增 stream presence cache 回归与最小测试桩,锁住同容器共享、跨容器隔离的缓存语义
- 更新 cqrs-rewrite 恢复文档并补充本轮 stream benchmark 验证结果
---
.../Cqrs/CqrsDispatcherCacheTests.cs | 115 ++++++++++++++++++
.../DispatcherZeroPipelineStreamHandler.cs | 32 +++++
.../DispatcherZeroPipelineStreamRequest.cs | 11 ++
GFramework.Cqrs/Internal/CqrsDispatcher.cs | 21 +++-
.../todos/cqrs-rewrite-migration-tracking.md | 35 +++++-
.../traces/cqrs-rewrite-migration-trace.md | 27 ++++
6 files changed, 235 insertions(+), 6 deletions(-)
create mode 100644 GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamHandler.cs
create mode 100644 GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamRequest.cs
diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs
index 3001d0c1..82208f85 100644
--- a/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs
+++ b/GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs
@@ -195,6 +195,55 @@ internal sealed class CqrsDispatcherCacheTests
false);
}
+ ///
+ /// 验证 stream 的“是否存在 pipeline behavior”判定会按 dispatcher 实例缓存,
+ /// 并与当前容器的实际服务可见性保持一致,同时不同 dispatcher 不共享该实例级状态。
+ ///
+ [Test]
+ public async Task Dispatcher_Should_Cache_Stream_Behavior_Presence_Per_Dispatcher_Instance()
+ {
+ var firstContext = new ArchitectureContext(_container!);
+ var secondContext = new ArchitectureContext(_container!);
+ var firstDispatcher = GetDispatcherFromContext(firstContext);
+ var secondDispatcher = GetDispatcherFromContext(secondContext);
+ using var isolatedContainer = CreateFrozenContainer();
+ var isolatedContext = new ArchitectureContext(isolatedContainer);
+ var isolatedDispatcher = GetDispatcherFromContext(isolatedContext);
+ var zeroPipelineBehaviorType = typeof(IStreamPipelineBehavior);
+ var twoPipelineBehaviorType = typeof(IStreamPipelineBehavior);
+ var expectedZeroPipelinePresence = _container!.HasRegistration(zeroPipelineBehaviorType);
+ var expectedTwoPipelinePresence = _container.HasRegistration(twoPipelineBehaviorType);
+
+ AssertStreamBehaviorPresenceIsUnset(firstDispatcher, zeroPipelineBehaviorType);
+ AssertStreamBehaviorPresenceIsUnset(secondDispatcher, zeroPipelineBehaviorType);
+ AssertStreamBehaviorPresenceIsUnset(isolatedDispatcher, zeroPipelineBehaviorType);
+ AssertStreamBehaviorPresenceIsUnset(firstDispatcher, twoPipelineBehaviorType);
+
+ await DrainAsync(firstContext.CreateStream(new DispatcherZeroPipelineStreamRequest()));
+ await DrainAsync(firstContext.CreateStream(new DispatcherStreamPipelineOrderRequest()));
+
+ var zeroPipelinePresence = GetStreamBehaviorPresenceCacheValue(
+ firstDispatcher,
+ zeroPipelineBehaviorType);
+ var twoPipelinePresence = GetStreamBehaviorPresenceCacheValue(
+ firstDispatcher,
+ twoPipelineBehaviorType);
+
+ AssertSharedStreamDispatcherCacheState(
+ firstDispatcher,
+ secondDispatcher,
+ isolatedDispatcher,
+ zeroPipelinePresence,
+ twoPipelinePresence,
+ zeroPipelineBehaviorType,
+ expectedZeroPipelinePresence,
+ expectedTwoPipelinePresence);
+
+ await DrainAsync(isolatedContext.CreateStream(new DispatcherZeroPipelineStreamRequest()));
+
+ AssertStreamBehaviorPresenceEquals(isolatedDispatcher, zeroPipelineBehaviorType, expectedZeroPipelinePresence);
+ }
+
///
/// 验证 request pipeline executor 会按行为数量在 binding 内首次创建并在后续分发中复用。
///
@@ -713,6 +762,33 @@ internal sealed class CqrsDispatcherCacheTests
return found ? arguments[1] : null;
}
+ ///
+ /// 读取指定 dispatcher 实例中当前保存的 stream behavior presence 缓存项。
+ ///
+ private static object? GetStreamBehaviorPresenceCacheValue(object dispatcher, Type behaviorType)
+ {
+ var field = dispatcher.GetType().GetField(
+ "_streamBehaviorPresenceCache",
+ BindingFlags.Instance | BindingFlags.NonPublic);
+
+ Assert.That(field, Is.Not.Null, "Missing dispatcher stream behavior presence cache field.");
+
+ var cache = field!.GetValue(dispatcher)
+ ?? throw new InvalidOperationException(
+ "Dispatcher stream behavior presence cache returned null.");
+ var tryGetValueMethod = cache.GetType().GetMethod(
+ "TryGetValue",
+ BindingFlags.Instance | BindingFlags.Public);
+
+ Assert.That(tryGetValueMethod, Is.Not.Null, "Missing ConcurrentDictionary.TryGetValue accessor.");
+
+ object?[] arguments = [behaviorType, null];
+ var found = (bool)(tryGetValueMethod!.Invoke(cache, arguments)
+ ?? throw new InvalidOperationException(
+ "ConcurrentDictionary.TryGetValue returned null."));
+ return found ? arguments[1] : null;
+ }
+
///
/// 断言指定 dispatcher 上某个 request behavior presence 缓存项尚未建立。
///
@@ -729,6 +805,22 @@ internal sealed class CqrsDispatcherCacheTests
Assert.That(GetRequestBehaviorPresenceCacheValue(dispatcher, behaviorType), Is.EqualTo(expected));
}
+ ///
+ /// 断言指定 dispatcher 上某个 stream behavior presence 缓存项尚未建立。
+ ///
+ private static void AssertStreamBehaviorPresenceIsUnset(object dispatcher, Type behaviorType)
+ {
+ Assert.That(GetStreamBehaviorPresenceCacheValue(dispatcher, behaviorType), Is.Null);
+ }
+
+ ///
+ /// 断言指定 dispatcher 上某个 stream behavior presence 缓存项等于预期值。
+ ///
+ private static void AssertStreamBehaviorPresenceEquals(object dispatcher, Type behaviorType, bool expected)
+ {
+ Assert.That(GetStreamBehaviorPresenceCacheValue(dispatcher, behaviorType), Is.EqualTo(expected));
+ }
+
///
/// 断言同一容器解析出的 dispatcher 会共享实例级缓存,而另一独立容器的 dispatcher 不会提前命中。
///
@@ -754,6 +846,29 @@ internal sealed class CqrsDispatcherCacheTests
});
}
+ ///
+ /// 断言同一容器解析出的 dispatcher 会共享 stream 的实例级缓存,而另一独立容器的 dispatcher 不会提前命中。
+ ///
+ private static void AssertSharedStreamDispatcherCacheState(
+ object firstDispatcher,
+ object secondDispatcher,
+ object isolatedDispatcher,
+ object? zeroPipelinePresence,
+ object? twoPipelinePresence,
+ Type zeroPipelineBehaviorType,
+ bool expectedZeroPipelinePresence,
+ bool expectedTwoPipelinePresence)
+ {
+ Assert.Multiple(() =>
+ {
+ Assert.That(secondDispatcher, Is.SameAs(firstDispatcher));
+ Assert.That(zeroPipelinePresence, Is.EqualTo(expectedZeroPipelinePresence));
+ Assert.That(twoPipelinePresence, Is.EqualTo(expectedTwoPipelinePresence));
+ AssertStreamBehaviorPresenceEquals(secondDispatcher, zeroPipelineBehaviorType, expectedZeroPipelinePresence);
+ AssertStreamBehaviorPresenceIsUnset(isolatedDispatcher, zeroPipelineBehaviorType);
+ });
+ }
+
///
/// 读取 request dispatch binding 中指定行为数量的 pipeline executor 缓存项。
///
diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamHandler.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamHandler.cs
new file mode 100644
index 00000000..94c73ac1
--- /dev/null
+++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamHandler.cs
@@ -0,0 +1,32 @@
+// Copyright (c) 2025-2026 GeWuYou
+// SPDX-License-Identifier: Apache-2.0
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using GFramework.Cqrs.Abstractions.Cqrs;
+
+namespace GFramework.Cqrs.Tests.Cqrs;
+
+///
+/// 处理 ,用于验证零管道 stream 的 dispatcher 缓存路径。
+///
+internal sealed class DispatcherZeroPipelineStreamHandler : IStreamRequestHandler
+{
+ ///
+ /// 返回一个单元素异步流,便于在缓存测试中最小化处理噪音。
+ ///
+ /// 当前零管道 stream 请求。
+ /// 用于终止异步枚举的取消令牌。
+ /// 只包含一个元素的异步响应流。
+ public async IAsyncEnumerable Handle(
+ DispatcherZeroPipelineStreamRequest request,
+ [EnumeratorCancellation] CancellationToken cancellationToken)
+ {
+ ArgumentNullException.ThrowIfNull(request);
+ cancellationToken.ThrowIfCancellationRequested();
+
+ yield return 1;
+ }
+}
diff --git a/GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamRequest.cs b/GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamRequest.cs
new file mode 100644
index 00000000..cb2bb73d
--- /dev/null
+++ b/GFramework.Cqrs.Tests/Cqrs/DispatcherZeroPipelineStreamRequest.cs
@@ -0,0 +1,11 @@
+// Copyright (c) 2025-2026 GeWuYou
+// SPDX-License-Identifier: Apache-2.0
+
+using GFramework.Cqrs.Abstractions.Cqrs;
+
+namespace GFramework.Cqrs.Tests.Cqrs;
+
+///
+/// 表示未注册任何 stream pipeline behavior 的最小缓存验证请求。
+///
+internal sealed record DispatcherZeroPipelineStreamRequest : IStreamRequest;
diff --git a/GFramework.Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Cqrs/Internal/CqrsDispatcher.cs
index 52cb730e..a4891f1b 100644
--- a/GFramework.Cqrs/Internal/CqrsDispatcher.cs
+++ b/GFramework.Cqrs/Internal/CqrsDispatcher.cs
@@ -26,6 +26,10 @@ internal sealed class CqrsDispatcher(
// 每次 SendAsync 都重复询问容器。缓存值只反映当前 dispatcher 持有容器的注册可见性,不跨 runtime 共享。
private readonly ConcurrentDictionary _requestBehaviorPresenceCache = new();
+ // 与 request 路径相同,stream 的 behavior 注册可见性在当前 dispatcher 生命周期内保持稳定。
+ // 这里缓存 “CreateStream(...) 对应 behaviorType 是否存在注册”,避免零管道 stream 每次建流都重复询问容器。
+ private readonly ConcurrentDictionary _streamBehaviorPresenceCache = new();
+
// 卸载安全的进程级缓存:当 generated registry 提供 request invoker 元数据时,
// registrar 会按请求/响应类型对把它们写入这里;若类型被卸载,条目会自然失效。
private static readonly WeakTypePairCache
@@ -195,7 +199,7 @@ internal sealed class CqrsDispatcher(
$"No CQRS stream handler registered for {requestType.FullName}.");
PrepareHandler(handler, context);
- if (!container.HasRegistration(dispatchBinding.BehaviorType))
+ if (!HasStreamBehaviorRegistration(dispatchBinding.BehaviorType))
{
return (IAsyncEnumerable)dispatchBinding.StreamInvoker(handler, request, cancellationToken);
}
@@ -211,6 +215,21 @@ internal sealed class CqrsDispatcher(
.Invoke(handler, behaviors, dispatchBinding.StreamInvoker, request, cancellationToken);
}
+ ///
+ /// 读取当前 dispatcher 容器里是否存在指定 stream pipeline 行为注册,并在首次命中后缓存结果。
+ ///
+ /// 目标 stream pipeline 行为服务类型。
+ /// 存在注册时返回 ;否则返回 。
+ private bool HasStreamBehaviorRegistration(Type behaviorType)
+ {
+ ArgumentNullException.ThrowIfNull(behaviorType);
+
+ return _streamBehaviorPresenceCache.GetOrAdd(
+ behaviorType,
+ static (cachedBehaviorType, currentContainer) => currentContainer.HasRegistration(cachedBehaviorType),
+ container);
+ }
+
///
/// 为上下文感知处理器注入当前 CQRS 分发上下文。
///
diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md
index 732804fe..310820ca 100644
--- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md
+++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md
@@ -7,11 +7,18 @@ CQRS 迁移与收敛。
## 当前恢复点
-- 恢复点编号:`CQRS-REWRITE-RP-123`
+- 恢复点编号:`CQRS-REWRITE-RP-124`
- 当前阶段:`Phase 8`
- 当前 PR 锚点:`PR #344`
- 当前结论:
- - 当前 `RP-123` 通过 `$gframework-pr-review` 重新复核 `feat/cqrs-optimization` 的 latest-head review,确认 `PR #344` 仍成立且值得在本轮一起收口的问题共有四类:`CqrsDispatcher.ResolveNotificationPublisher()` 默认路径每次 publish 都重复查容器并在零注册分支分配新的 `SequentialNotificationPublisher`;`CqrsDispatcherContextValidationTests` 与 `CqrsNotificationPublisherTests` 的 strict `IIocContainer` helper 缺少 `GetAll(typeof(INotificationPublisher))` 默认装配,导致 CI 在真正断言前就被 mock 异常短路;`NotificationPublisherRegistrationExtensionsTests` 缺少“唯一注册”断言;`CqrsDispatcherCacheTests` 的隔离容器构建复制了 `SetUp()` 的注册形状,存在后续漂移风险
+- 当前 `RP-124` 延续 `$gframework-batch-boot 50`,在 `RP-123` 收口 latest-head review 对 notification publisher 的四类问题后,回到 stream steady-state 热路径,选择与 `RP-122` request cache 对称、且仍保持小写面的一刀:把 `CreateStream(...)` 上的 behavior presence 判定收口为 dispatcher 实例级缓存
+- 本轮改动面只落在 `GFramework.Cqrs/Internal/CqrsDispatcher.cs`、`GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs`、新增的 `DispatcherZeroPipelineStreamRequest/Handler` 测试桩,以及 `ai-plan/public/cqrs-rewrite` 恢复文档,不扩到新的公开 API、文档页或额外 benchmark 宿主代码
+- `CqrsDispatcher` 现新增 `_streamBehaviorPresenceCache`,按闭合 `IStreamPipelineBehavior<,>` 服务类型缓存当前 dispatcher 容器中的服务可见性;这样 `CreateStream(...)` 在 steady-state 下不会每次都重复执行 `HasRegistration(Type)`,并继续保持“同一 runtime 实例内缓存、不同容器不共享”的边界
+- `CqrsDispatcherCacheTests` 现新增 `Dispatcher_Should_Cache_Stream_Behavior_Presence_Per_Dispatcher_Instance()`,锁住两件事:同一容器解析出的多个 `ArchitectureContext` 会共享同一个 dispatcher 实例及其 stream presence cache;另一独立冻结容器创建的 dispatcher 不会提前共享该实例级状态。为避免夹具里已有 stream behavior 注册干扰零管道建流路径,本轮补入 `DispatcherZeroPipelineStreamRequest` 与 `DispatcherZeroPipelineStreamHandler` 作为最小测试桩
+- 本轮 benchmark 仅复跑最小 stream 生命周期矩阵中的 `StreamLifetimeBenchmarks.Stream_GFrameworkCqrs`,确认 benchmark 宿主在这刀之后仍能稳定产出:`Singleton` 约 `107.241 ns / 240 B`,`Transient` 约 `119.434 ns / 264 B`
+- 当前已提交分支相对 `origin/main`(`d85828c5`, `2026-05-09 12:25:41 +0800`)的累计 branch diff 仍为 `0 files / 0 changed lines`;本批待提交工作树共触达 `4 files`,其中已跟踪 diff 为 `135 insertions / 1 deletion`,另有 `2` 个新测试文件共 `43` 行,明显低于 `$gframework-batch-boot 50` 的文件阈值
+- 下一推荐步骤:提交并推送本轮 commit 后,再次运行 `$gframework-pr-review` 复核 `PR #344` latest-head thread 是否已收敛;若 review 已清空,则下一批优先比较完整 `StreamLifetimeBenchmarks` 三方对照是否仍优于 `MediatR`,再决定继续压 stream 零管道常量路径还是切回 request `Transient` 热点
+- 当前 `RP-123` 通过 `$gframework-pr-review` 重新复核 `feat/cqrs-optimization` 的 latest-head review,确认 `PR #344` 仍成立且值得在本轮一起收口的问题共有四类:`CqrsDispatcher.ResolveNotificationPublisher()` 默认路径每次 publish 都重复查容器并在零注册分支分配新的 `SequentialNotificationPublisher`;`CqrsDispatcherContextValidationTests` 与 `CqrsNotificationPublisherTests` 的 strict `IIocContainer` helper 缺少 `GetAll(typeof(INotificationPublisher))` 默认装配,导致 CI 在真正断言前就被 mock 异常短路;`NotificationPublisherRegistrationExtensionsTests` 缺少“唯一注册”断言;`CqrsDispatcherCacheTests` 的隔离容器构建复制了 `SetUp()` 的注册形状,存在后续漂移风险
- 本轮保持改动面只落在 `GFramework.Cqrs`、`GFramework.Cqrs.Tests` 与 `ai-plan/public/cqrs-rewrite`,不扩散到新的 benchmark 宿主或额外 notification API;其中 `CqrsDispatcher` 新增 dispatcher 实例级 `_resolvedNotificationPublisher` 缓存,并在首次解析后通过线程安全比较交换固定最终策略实例,继续保持“显式实例优先、容器内唯一注册次之、默认顺序发布器兜底”的既有契约
- 两个 strict mock runtime helper 现统一预设 `IIocContainer.GetAll(typeof(INotificationPublisher)) => Array.Empty