Merge pull request #344 from GeWuYou/feat/cqrs-optimization

Feat/cqrs optimization
This commit is contained in:
gewuyou 2026-05-09 12:25:41 +08:00 committed by GitHub
commit d85828c533
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 694 additions and 38 deletions

View File

@ -1,12 +1,15 @@
// Copyright (c) 2025-2026 GeWuYou
// SPDX-License-Identifier: Apache-2.0
using System.Reflection;
using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Abstractions.Utility;
using GFramework.Core.Architectures;
using GFramework.Core.Logging;
using GFramework.Cqrs;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Notification;
using Microsoft.Extensions.DependencyInjection;
namespace GFramework.Core.Tests.Architectures;
@ -27,6 +30,7 @@ public class ArchitectureModulesBehaviorTests
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GameContext.Clear();
AdditionalAssemblyNotificationHandlerState.Reset();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
}
@ -37,6 +41,7 @@ public class ArchitectureModulesBehaviorTests
[TearDown]
public void TearDown()
{
AdditionalAssemblyNotificationHandlerState.Reset();
GameContext.Clear();
TrackingPipelineBehavior<ModuleBehaviorRequest, string>.InvocationCount = 0;
TrackingStreamPipelineBehavior<ModuleStreamBehaviorRequest, int>.InvocationCount = 0;
@ -156,6 +161,35 @@ public class ArchitectureModulesBehaviorTests
}
}
/// <summary>
/// 验证标准架构启动路径会复用通过 <see cref="Architecture.Configurator" /> 声明的自定义 notification publisher
/// 而不是在 <see cref="GFramework.Core.Services.Modules.CqrsRuntimeModule" /> 创建 runtime 时提前固化默认顺序策略。
/// </summary>
[Test]
public async Task InitializeAsync_Should_Reuse_Custom_NotificationPublisher_From_Configurator()
{
var generatedAssembly = CreateGeneratedHandlerAssembly();
var architecture = new ConfiguredNotificationPublisherArchitecture(generatedAssembly.Object);
await architecture.InitializeAsync();
try
{
var probe = architecture.Context.GetService<ArchitectureNotificationPublisherProbe>();
await architecture.Context.PublishAsync(new AdditionalAssemblyNotification());
Assert.Multiple(() =>
{
Assert.That(probe.WasCalled, Is.True);
Assert.That(AdditionalAssemblyNotificationHandlerState.InvocationCount, Is.EqualTo(1));
});
}
finally
{
await architecture.DestroyAsync();
}
}
/// <summary>
/// 用于测试模块行为的最小架构实现。
/// </summary>
@ -191,6 +225,31 @@ public class ArchitectureModulesBehaviorTests
}
}
/// <summary>
/// 通过标准架构启动路径声明自定义 notification publisher 的最小架构。
/// </summary>
private sealed class ConfiguredNotificationPublisherArchitecture(Assembly generatedAssembly) : Architecture
{
/// <summary>
/// 在服务钩子阶段注册 probe 与自定义 publisher
/// 以模拟真实项目在组合根里通过 <see cref="IServiceCollection" /> 覆盖默认策略的路径。
/// </summary>
public override Action<IServiceCollection>? Configurator => services =>
{
services.AddSingleton<ArchitectureNotificationPublisherProbe>();
services.AddSingleton<INotificationPublisher, ArchitectureTrackingNotificationPublisher>();
};
/// <summary>
/// 在用户初始化阶段显式接入额外程序集里的 notification handler
/// 让测试聚焦“publisher 是否被复用”,而不是依赖当前测试文件自己的 handler 扫描形状。
/// </summary>
protected override void OnInitialize()
{
RegisterCqrsHandlersFromAssembly(generatedAssembly);
}
}
/// <summary>
/// 记录模块安装调用情况的测试模块。
/// </summary>
@ -225,6 +284,69 @@ public class ArchitectureModulesBehaviorTests
{
}
/// <summary>
/// 创建一个仅暴露程序集级 CQRS registry 元数据的 mocked Assembly。
/// 该测试替身模拟扩展程序集已经提供 notification handler registry而架构只需在初始化时显式接入该程序集。
/// </summary>
/// <returns>包含程序集级 notification handler registry 元数据的 mocked Assembly。</returns>
private static Mock<Assembly> CreateGeneratedHandlerAssembly()
{
var generatedAssembly = new Mock<Assembly>();
generatedAssembly
.SetupGet(static assembly => assembly.FullName)
.Returns("GFramework.Core.Tests.Architectures.ExplicitAdditionalHandlers, Version=1.0.0.0");
generatedAssembly
.Setup(static assembly => assembly.GetCustomAttributes(typeof(CqrsHandlerRegistryAttribute), false))
.Returns([new CqrsHandlerRegistryAttribute(typeof(AdditionalAssemblyNotificationHandlerRegistry))]);
return generatedAssembly;
}
/// <summary>
/// 记录自定义 notification publisher 是否真正参与了标准架构启动路径下的 publish 调用。
/// </summary>
private sealed class ArchitectureNotificationPublisherProbe
{
/// <summary>
/// 获取 probe 是否已被 publisher 标记为执行过。
/// </summary>
public bool WasCalled { get; private set; }
/// <summary>
/// 记录当前 publish 调用已经命中了自定义 publisher。
/// </summary>
public void MarkCalled()
{
WasCalled = true;
}
}
/// <summary>
/// 依赖容器内 probe 的自定义 notification publisher。
/// 该类型通过显式标记 + 正常转发处理器执行,验证标准架构启动路径不会把自定义策略短路成默认顺序发布器。
/// </summary>
private sealed class ArchitectureTrackingNotificationPublisher(
ArchitectureNotificationPublisherProbe probe) : INotificationPublisher
{
/// <summary>
/// 记录自定义 publisher 已参与当前发布调用,并继续按处理器解析顺序转发执行。
/// </summary>
public async ValueTask PublishAsync<TNotification>(
NotificationPublishContext<TNotification> context,
CancellationToken cancellationToken = default)
where TNotification : INotification
{
ArgumentNullException.ThrowIfNull(context);
cancellationToken.ThrowIfCancellationRequested();
probe.MarkCalled();
foreach (var handler in context.Handlers)
{
await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false);
}
}
}
/// <summary>
/// 物化异步流为只读列表,便于断言 stream pipeline 行为的最终可观察结果。
/// </summary>

View File

@ -6,7 +6,6 @@ using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Cqrs;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Notification;
using LegacyICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime;
namespace GFramework.Core.Services.Modules;
@ -46,8 +45,7 @@ public sealed class CqrsRuntimeModule : IServiceModule
var dispatcherLogger = LoggerFactoryResolver.Provider.CreateLogger("CqrsDispatcher");
var registrarLogger = LoggerFactoryResolver.Provider.CreateLogger("DefaultCqrsHandlerRegistrar");
var registrationLogger = LoggerFactoryResolver.Provider.CreateLogger("DefaultCqrsRegistrationService");
var notificationPublisher = container.Get<INotificationPublisher>();
var runtime = CqrsRuntimeFactory.CreateRuntime(container, dispatcherLogger, notificationPublisher);
var runtime = CqrsRuntimeFactory.CreateRuntime(container, dispatcherLogger);
var registrar = CqrsRuntimeFactory.CreateHandlerRegistrar(container, registrarLogger);
container.Register(runtime);

View File

@ -27,19 +27,7 @@ internal sealed class CqrsDispatcherCacheTests
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
_container = new MicrosoftDiContainer();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineCacheBehavior>();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineContextRefreshBehavior>();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderOuterBehavior>();
_container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderInnerBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineCacheBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineContextRefreshBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderOuterBehavior>();
_container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderInnerBehavior>();
CqrsTestRuntime.RegisterHandlers(
_container,
typeof(CqrsDispatcherCacheTests).Assembly,
typeof(ArchitectureContext).Assembly);
ConfigureDispatcherCacheFixture(_container);
_container.Freeze();
_context = new ArchitectureContext(_container);
@ -160,6 +148,53 @@ internal sealed class CqrsDispatcherCacheTests
});
}
/// <summary>
/// 验证 request 的“是否存在 pipeline behavior”判定会按 dispatcher 实例缓存,
/// 让零行为请求在首次分发后不再重复查询容器,同时不同 dispatcher 不共享该实例级状态。
/// </summary>
[Test]
public async Task Dispatcher_Should_Cache_Zero_Pipeline_Request_Presence_Per_Dispatcher_Instance()
{
var firstContext = new ArchitectureContext(_container!);
var secondContext = new ArchitectureContext(_container!);
var firstDispatcher = GetDispatcherFromContext(firstContext);
var secondDispatcher = GetDispatcherFromContext(secondContext);
using var isolatedContainer = CreateFrozenContainer();
var isolatedContext = new ArchitectureContext(isolatedContainer);
var isolatedDispatcher = GetDispatcherFromContext(isolatedContext);
AssertRequestBehaviorPresenceIsUnset(firstDispatcher, typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
AssertRequestBehaviorPresenceIsUnset(secondDispatcher, typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
AssertRequestBehaviorPresenceIsUnset(isolatedDispatcher, typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
AssertRequestBehaviorPresenceIsUnset(
firstDispatcher,
typeof(IPipelineBehavior<DispatcherPipelineCacheRequest, int>));
await firstContext.SendRequestAsync(new DispatcherCacheRequest());
await firstContext.SendRequestAsync(new DispatcherPipelineCacheRequest());
var zeroPipelinePresence = GetRequestBehaviorPresenceCacheValue(
firstDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
var onePipelinePresence = GetRequestBehaviorPresenceCacheValue(
firstDispatcher,
typeof(IPipelineBehavior<DispatcherPipelineCacheRequest, int>));
AssertSharedDispatcherCacheState(
firstDispatcher,
secondDispatcher,
isolatedDispatcher,
zeroPipelinePresence,
onePipelinePresence);
await isolatedContext.SendRequestAsync(new DispatcherCacheRequest());
AssertRequestBehaviorPresenceEquals(
isolatedDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>),
false);
}
/// <summary>
/// 验证 request pipeline executor 会按行为数量在 binding 内首次创建并在后续分发中复用。
/// </summary>
@ -565,6 +600,66 @@ internal sealed class CqrsDispatcherCacheTests
$"Dispatcher cache field {fieldName} returned null.");
}
/// <summary>
/// 从架构上下文中解析当前延迟创建的 dispatcher 实例,便于验证其实例级热路径缓存。
/// </summary>
private static object GetDispatcherFromContext(ArchitectureContext context)
{
ArgumentNullException.ThrowIfNull(context);
var lazyRuntimeField = typeof(ArchitectureContext).GetField(
"_cqrsRuntime",
BindingFlags.Instance | BindingFlags.NonPublic);
Assert.That(lazyRuntimeField, Is.Not.Null, "Missing ArchitectureContext._cqrsRuntime field.");
var lazyRuntime = lazyRuntimeField!.GetValue(context)
?? throw new InvalidOperationException(
"ArchitectureContext._cqrsRuntime returned null.");
var lazyValueProperty = lazyRuntime.GetType().GetProperty(
"Value",
BindingFlags.Instance | BindingFlags.Public);
Assert.That(lazyValueProperty, Is.Not.Null, "Missing Lazy<ICqrsRuntime>.Value accessor.");
return lazyValueProperty!.GetValue(lazyRuntime)
?? throw new InvalidOperationException("Resolved CQRS runtime instance was null.");
}
/// <summary>
/// 创建与当前 fixture 注册形状一致、但拥有独立 runtime 实例的冻结容器,
/// 用于验证 dispatcher 的实例级缓存不会跨容器共享。
/// </summary>
private static MicrosoftDiContainer CreateFrozenContainer()
{
var container = new MicrosoftDiContainer();
ConfigureDispatcherCacheFixture(container);
container.Freeze();
return container;
}
/// <summary>
/// 组装当前 fixture 依赖的 CQRS 容器注册形状,确保默认上下文与隔离容器复用同一份装配基线。
/// </summary>
/// <param name="container">待补齐 CQRS 注册的目标容器。</param>
private static void ConfigureDispatcherCacheFixture(MicrosoftDiContainer container)
{
container.RegisterCqrsPipelineBehavior<DispatcherPipelineCacheBehavior>();
container.RegisterCqrsPipelineBehavior<DispatcherPipelineContextRefreshBehavior>();
container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderOuterBehavior>();
container.RegisterCqrsPipelineBehavior<DispatcherPipelineOrderInnerBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineCacheBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineContextRefreshBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderOuterBehavior>();
container.RegisterCqrsStreamPipelineBehavior<DispatcherStreamPipelineOrderInnerBehavior>();
CqrsTestRuntime.RegisterHandlers(
container,
typeof(CqrsDispatcherCacheTests).Assembly,
typeof(ArchitectureContext).Assembly);
}
/// <summary>
/// 清空本测试依赖的 dispatcher 静态缓存,避免跨用例共享进程级状态导致断言漂移。
/// </summary>
@ -591,6 +686,74 @@ internal sealed class CqrsDispatcherCacheTests
return InvokeInstanceMethod(cache, "GetValueOrDefaultForTesting", primaryType, secondaryType);
}
/// <summary>
/// 读取指定 dispatcher 实例中当前保存的 request behavior presence 缓存项。
/// </summary>
private static object? GetRequestBehaviorPresenceCacheValue(object dispatcher, Type behaviorType)
{
var field = dispatcher.GetType().GetField(
"_requestBehaviorPresenceCache",
BindingFlags.Instance | BindingFlags.NonPublic);
Assert.That(field, Is.Not.Null, "Missing dispatcher request behavior presence cache field.");
var cache = field!.GetValue(dispatcher)
?? throw new InvalidOperationException(
"Dispatcher request behavior presence cache returned null.");
var tryGetValueMethod = cache.GetType().GetMethod(
"TryGetValue",
BindingFlags.Instance | BindingFlags.Public);
Assert.That(tryGetValueMethod, Is.Not.Null, "Missing ConcurrentDictionary.TryGetValue accessor.");
object?[] arguments = [behaviorType, null];
var found = (bool)(tryGetValueMethod!.Invoke(cache, arguments)
?? throw new InvalidOperationException(
"ConcurrentDictionary.TryGetValue returned null."));
return found ? arguments[1] : null;
}
/// <summary>
/// 断言指定 dispatcher 上某个 request behavior presence 缓存项尚未建立。
/// </summary>
private static void AssertRequestBehaviorPresenceIsUnset(object dispatcher, Type behaviorType)
{
Assert.That(GetRequestBehaviorPresenceCacheValue(dispatcher, behaviorType), Is.Null);
}
/// <summary>
/// 断言指定 dispatcher 上某个 request behavior presence 缓存项等于预期值。
/// </summary>
private static void AssertRequestBehaviorPresenceEquals(object dispatcher, Type behaviorType, bool expected)
{
Assert.That(GetRequestBehaviorPresenceCacheValue(dispatcher, behaviorType), Is.EqualTo(expected));
}
/// <summary>
/// 断言同一容器解析出的 dispatcher 会共享实例级缓存,而另一独立容器的 dispatcher 不会提前命中。
/// </summary>
private static void AssertSharedDispatcherCacheState(
object firstDispatcher,
object secondDispatcher,
object isolatedDispatcher,
object? zeroPipelinePresence,
object? onePipelinePresence)
{
Assert.Multiple(() =>
{
Assert.That(secondDispatcher, Is.SameAs(firstDispatcher));
Assert.That(zeroPipelinePresence, Is.EqualTo(false));
Assert.That(onePipelinePresence, Is.EqualTo(true));
AssertRequestBehaviorPresenceEquals(
secondDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>),
false);
AssertRequestBehaviorPresenceIsUnset(
isolatedDispatcher,
typeof(IPipelineBehavior<DispatcherCacheRequest, int>));
});
}
/// <summary>
/// 读取 request dispatch binding 中指定行为数量的 pipeline executor 缓存项。
/// </summary>

View File

@ -7,6 +7,7 @@ using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Cqrs;
using GFramework.Cqrs.Notification;
using GFramework.Cqrs.Tests.Logging;
namespace GFramework.Cqrs.Tests.Cqrs;
@ -184,6 +185,11 @@ internal sealed class CqrsDispatcherContextValidationTests
var container = new Mock<IIocContainer>(MockBehavior.Strict);
var logger = new TestLogger("CqrsDispatcherContextValidationTests", LogLevel.Debug);
// PublishAsync 的默认路径会在真正发布时查询通知发布器注册strict mock 需显式覆盖空注册分支。
container
.Setup(currentContainer => currentContainer.GetAll(typeof(INotificationPublisher)))
.Returns(Array.Empty<object>());
configureContainer(container);
return CqrsRuntimeFactory.CreateRuntime(container.Object, logger);
}

View File

@ -183,6 +183,11 @@ internal sealed class CqrsNotificationPublisherTests
var container = new Mock<IIocContainer>(MockBehavior.Strict);
var logger = new TestLogger(nameof(CqrsNotificationPublisherTests), LogLevel.Debug);
// 默认 runtime 会延迟解析通知发布器strict mock 需要声明“未注册自定义 publisher”的空集合返回。
container
.Setup(currentContainer => currentContainer.GetAll(typeof(INotificationPublisher)))
.Returns(Array.Empty<object>());
configureContainer(container);
return CqrsRuntimeFactory.CreateRuntime(container.Object, logger, notificationPublisher);
}

View File

@ -33,6 +33,8 @@ internal sealed class NotificationPublisherRegistrationExtensionsTests
container.Register<INotificationHandler<TestNotification>>(trailingHandler);
CqrsTestRuntime.RegisterInfrastructure(container);
container.Freeze();
Assert.That(container.GetAll(typeof(INotificationHandler<TestNotification>)), Has.Count.EqualTo(2));
Assert.That(container.GetAll(typeof(INotificationPublisher)), Has.Count.EqualTo(1));
var context = new ArchitectureContext(container);
var publishTask = context.PublishAsync(new TestNotification()).AsTask();
@ -66,6 +68,8 @@ internal sealed class NotificationPublisherRegistrationExtensionsTests
container.Register<INotificationHandler<TestNotification>>(trailingHandler);
CqrsTestRuntime.RegisterInfrastructure(container);
container.Freeze();
Assert.That(container.GetAll(typeof(INotificationHandler<TestNotification>)), Has.Count.EqualTo(2));
Assert.That(container.GetAll(typeof(INotificationPublisher)), Has.Count.EqualTo(1));
var context = new ArchitectureContext(container);
@ -91,6 +95,52 @@ internal sealed class NotificationPublisherRegistrationExtensionsTests
Assert.That(container.Get<INotificationPublisher>(), Is.SameAs(publisher));
}
/// <summary>
/// 验证泛型组合根注册入口会把指定的 publisher 类型注册为容器内唯一的单例策略。
/// </summary>
[Test]
public void UseNotificationPublisher_Generic_Overload_Should_Register_Configured_Type()
{
var container = new MicrosoftDiContainer();
var returnedContainer = container.UseNotificationPublisher<TrackingNotificationPublisher>();
container.Freeze();
Assert.That(returnedContainer, Is.SameAs(container));
Assert.That(container.HasRegistration(typeof(INotificationPublisher)), Is.True);
Assert.That(container.GetAll(typeof(INotificationPublisher)), Has.Count.EqualTo(1));
Assert.That(container.GetRequired<INotificationPublisher>(), Is.TypeOf<TrackingNotificationPublisher>());
Assert.That(container.GetRequired<INotificationPublisher>(), Is.SameAs(container.GetRequired<INotificationPublisher>()));
}
/// <summary>
/// 验证当自定义 publisher 依赖其他容器服务时,泛型组合根入口仍会被默认 runtime 基础设施正确复用。
/// </summary>
[Test]
public async Task UseNotificationPublisher_Generic_Overload_Should_Be_Used_By_Default_Runtime_Infrastructure()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
var probe = new NotificationPublisherProbe();
var handler = new RecordingNotificationHandler();
var container = new MicrosoftDiContainer();
container.Register(probe);
container.UseNotificationPublisher<DependencyAwareNotificationPublisher>();
container.Register<INotificationHandler<TestNotification>>(handler);
CqrsTestRuntime.RegisterInfrastructure(container);
container.Freeze();
Assert.That(container.GetAll(typeof(INotificationHandler<TestNotification>)), Has.Count.EqualTo(1));
Assert.That(container.GetAll(typeof(INotificationPublisher)), Has.Count.EqualTo(1));
var context = new ArchitectureContext(container);
await context.PublishAsync(new TestNotification()).ConfigureAwait(false);
Assert.That(probe.WasCalled, Is.True);
Assert.That(handler.WasInvoked, Is.True);
Assert.That(container.GetRequired<INotificationPublisher>(), Is.TypeOf<DependencyAwareNotificationPublisher>());
}
/// <summary>
/// 验证组合根扩展会阻止重复 notification publisher 注册,避免 runtime 创建阶段才暴露歧义。
/// </summary>
@ -105,6 +155,20 @@ internal sealed class NotificationPublisherRegistrationExtensionsTests
Throws.InvalidOperationException.With.Message.Contains(nameof(INotificationPublisher)));
}
/// <summary>
/// 验证当容器已存在 notification publisher 注册时,泛型组合根入口也会拒绝重复策略声明。
/// </summary>
[Test]
public void UseNotificationPublisher_Generic_Overload_Should_Throw_When_NotificationPublisher_Already_Registered()
{
var container = new MicrosoftDiContainer();
container.UseSequentialNotificationPublisher();
Assert.That(
() => container.UseNotificationPublisher<TrackingNotificationPublisher>(),
Throws.InvalidOperationException.With.Message.Contains(nameof(INotificationPublisher)));
}
/// <summary>
/// 为本组测试提供最小 notification 类型。
/// </summary>
@ -166,4 +230,48 @@ internal sealed class NotificationPublisherRegistrationExtensionsTests
return ValueTask.CompletedTask;
}
}
/// <summary>
/// 记录泛型 publisher 是否真正参与了 publish 调用的探针。
/// </summary>
private sealed class NotificationPublisherProbe
{
/// <summary>
/// 获取探针是否已被自定义 publisher 标记为执行过。
/// </summary>
public bool WasCalled { get; private set; }
/// <summary>
/// 记录当前自定义 publisher 已接管本次通知发布。
/// </summary>
public void MarkCalled()
{
WasCalled = true;
}
}
/// <summary>
/// 依赖容器内探针服务的自定义 publisher用于验证泛型重载确实走过了 provider 构造路径。
/// </summary>
private sealed class DependencyAwareNotificationPublisher(NotificationPublisherProbe probe) : INotificationPublisher
{
/// <summary>
/// 记录 publisher 已参与调用,再按当前处理器顺序继续执行。
/// </summary>
public async ValueTask PublishAsync<TNotification>(
NotificationPublishContext<TNotification> context,
CancellationToken cancellationToken = default)
where TNotification : INotification
{
ArgumentNullException.ThrowIfNull(context);
cancellationToken.ThrowIfCancellationRequested();
probe.MarkCalled();
foreach (var handler in context.Handlers)
{
await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false);
}
}
}
}

View File

@ -24,6 +24,10 @@ public static class CqrsRuntimeFactory
/// <param name="container">目标依赖注入容器。</param>
/// <param name="logger">用于 runtime 诊断的日志器。</param>
/// <returns>默认 CQRS runtime。</returns>
/// <remarks>
/// 若调用方未显式传入 notification publisherruntime 会在真正发布通知时优先复用容器里声明的
/// <see cref="INotificationPublisher" />;若仍未声明,则回退到默认顺序发布器。
/// </remarks>
/// <exception cref="ArgumentNullException">
/// <paramref name="container" /> 或 <paramref name="logger" /> 为 <see langword="null" />。
/// </exception>
@ -37,7 +41,10 @@ public static class CqrsRuntimeFactory
/// </summary>
/// <param name="container">目标依赖注入容器。</param>
/// <param name="logger">用于 runtime 诊断的日志器。</param>
/// <param name="notificationPublisher">可选的通知发布策略;若为 <see langword="null" /> 则使用默认顺序发布器。</param>
/// <param name="notificationPublisher">
/// 可选的通知发布策略;若为 <see langword="null" />runtime 会在发布时优先尝试解析容器中已声明的
/// <see cref="INotificationPublisher" />,否则再回退到默认顺序发布器。
/// </param>
/// <returns>默认 CQRS runtime。</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="container" /> 或 <paramref name="logger" /> 为 <see langword="null" />。
@ -53,7 +60,7 @@ public static class CqrsRuntimeFactory
return new CqrsDispatcher(
container,
logger,
notificationPublisher ?? new SequentialNotificationPublisher());
notificationPublisher);
}
/// <summary>

View File

@ -19,8 +19,13 @@ namespace GFramework.Cqrs.Internal;
internal sealed class CqrsDispatcher(
IIocContainer container,
ILogger logger,
INotificationPublisher notificationPublisher) : ICqrsRuntime
INotificationPublisher? notificationPublisher) : ICqrsRuntime
{
// 实例级热路径缓存:默认 runtime 在容器冻结前创建,但请求/stream 行为注册在架构生命周期内保持稳定。
// 因此这里按 behavior service type 记住“当前 dispatcher 对应容器里是否存在该行为”,避免 0-pipeline steady-state
// 每次 SendAsync 都重复询问容器。缓存值只反映当前 dispatcher 持有容器的注册可见性,不跨 runtime 共享。
private readonly ConcurrentDictionary<Type, bool> _requestBehaviorPresenceCache = new();
// 卸载安全的进程级缓存:当 generated registry 提供 request invoker 元数据时,
// registrar 会按请求/响应类型对把它们写入这里;若类型被卸载,条目会自然失效。
private static readonly WeakTypePairCache<GeneratedRequestInvokerMetadata>
@ -61,9 +66,14 @@ internal sealed class CqrsDispatcher(
private static readonly MethodInfo StreamPipelineInvokerMethodDefinition = typeof(CqrsDispatcher)
.GetMethod(nameof(InvokeStreamPipelineExecutor), BindingFlags.NonPublic | BindingFlags.Static)!;
private readonly INotificationPublisher _notificationPublisher = notificationPublisher
?? throw new ArgumentNullException(
nameof(notificationPublisher));
// runtime 通常会在容器冻结前创建;此时通过实现类型注册的 notification publisher
// 还没有被底层 provider 物化,因此不能只在构造阶段抓取一次。
// 显式传入实例时仍优先复用该实例;否则在真正 publish 时再尝试从容器解析。
private readonly INotificationPublisher? _notificationPublisher = notificationPublisher;
// 容器冻结后 notification publisher 解析结果在当前 dispatcher 生命周期内保持稳定;
// 因此首次 publish 后缓存最终策略实例,避免后续热路径重复查容器和重复分配默认 publisher。
private INotificationPublisher? _resolvedNotificationPublisher;
/// <summary>
/// 发布通知到所有已注册处理器。
@ -94,7 +104,7 @@ internal sealed class CqrsDispatcher(
}
var publishContext = CreateNotificationPublishContext(notification, handlers, context, dispatchBinding.Invoker);
await _notificationPublisher.PublishAsync(publishContext, cancellationToken).ConfigureAwait(false);
await ResolveNotificationPublisher().PublishAsync(publishContext, cancellationToken).ConfigureAwait(false);
}
/// <summary>
@ -122,7 +132,7 @@ internal sealed class CqrsDispatcher(
$"No CQRS request handler registered for {requestType.FullName}.");
PrepareHandler(handler, context);
if (!container.HasRegistration(dispatchBinding.BehaviorType))
if (!HasRequestBehaviorRegistration(dispatchBinding.BehaviorType))
{
return dispatchBinding.RequestInvoker(handler, request, cancellationToken);
}
@ -144,6 +154,21 @@ internal sealed class CqrsDispatcher(
}
}
/// <summary>
/// 读取当前 dispatcher 容器里是否存在指定 request pipeline 行为注册,并在首次命中后缓存结果。
/// </summary>
/// <param name="behaviorType">目标 pipeline 行为服务类型。</param>
/// <returns>存在注册时返回 <see langword="true" />;否则返回 <see langword="false" />。</returns>
private bool HasRequestBehaviorRegistration(Type behaviorType)
{
ArgumentNullException.ThrowIfNull(behaviorType);
return _requestBehaviorPresenceCache.GetOrAdd(
behaviorType,
static (cachedBehaviorType, currentContainer) => currentContainer.HasRegistration(cachedBehaviorType),
container);
}
/// <summary>
/// 创建流式请求并返回异步响应序列。
/// </summary>
@ -203,6 +228,44 @@ internal sealed class CqrsDispatcher(
}
}
/// <summary>
/// 解析当前 publish 调用应使用的 notification publisher。
/// </summary>
/// <remarks>
/// 显式传入实例的路径优先;若调用方只在组合根里声明了 <see cref="INotificationPublisher" /> 类型映射,
/// 则在容器冻结后的首次 publish 才能拿到底层 provider 构造出来的实例。
/// 若容器中仍未声明任何策略,则回退到默认顺序发布器。
/// </remarks>
private INotificationPublisher ResolveNotificationPublisher()
{
if (_notificationPublisher is not null)
{
return _notificationPublisher;
}
var resolvedNotificationPublisher = _resolvedNotificationPublisher;
if (resolvedNotificationPublisher is not null)
{
return resolvedNotificationPublisher;
}
var registeredPublishers = container.GetAll(typeof(INotificationPublisher));
resolvedNotificationPublisher = registeredPublishers.Count switch
{
0 => new SequentialNotificationPublisher(),
1 => (INotificationPublisher)registeredPublishers[0],
_ => throw new InvalidOperationException(
$"Multiple {typeof(INotificationPublisher).FullName} instances are registered. Remove duplicate notification publisher strategies before publishing notifications.")
};
Interlocked.CompareExchange(
ref _resolvedNotificationPublisher,
resolvedNotificationPublisher,
comparand: null);
return _resolvedNotificationPublisher;
}
/// <summary>
/// 为指定请求类型构造完整分发绑定,把服务类型与强类型调用委托一次性收敛到同一缓存项。
/// </summary>

View File

@ -132,7 +132,7 @@ var playerId = await this.SendAsync(new CreatePlayerCommand(new CreatePlayerInpu
| --- | --- | --- | --- | --- |
| `SequentialNotificationPublisher` | 需要保持容器顺序,且希望首个失败立即停止后续分发 | 保证按容器解析顺序逐个执行 | 首个处理器抛出异常时立即停止 | 也是默认回退策略 |
| `TaskWhenAllNotificationPublisher` | 需要让全部处理器并行完成,并在结束后统一观察失败或取消 | 不保证顺序 | 不会在首个失败时停止其余处理器;会聚合最终异常或取消结果 | 更适合语义补齐,不是性能开关 |
| `UseNotificationPublisher(...)` 自定义实例 | 需要接入仓库外的自定义策略或第三方策略 | 取决于具体实现 | 取决于具体实现 | 仅在内置顺序 / 并行策略都不满足时使用 |
| `UseNotificationPublisher(...)` / `UseNotificationPublisher<TPublisher>()` | 需要接入仓库外的自定义策略或第三方策略 | 取决于具体实现 | 取决于具体实现 | 前者复用现成实例,后者让容器负责单例生命周期 |
- 若只是为了降低 fixed fan-out publish 的 steady-state 成本,当前 benchmark 并不表明 `TaskWhenAllNotificationPublisher` 会优于默认顺序发布器;它更适合你需要“等待全部处理器完成并统一观察失败”的场景。
@ -161,6 +161,14 @@ using GFramework.Cqrs.Notification;
container.UseNotificationPublisher(new TaskWhenAllNotificationPublisher());
```
如果你希望由容器负责创建并长期复用自定义 publisher也可以改用泛型重载
```csharp
using GFramework.Cqrs.Extensions;
container.UseNotificationPublisher<MyCustomNotificationPublisher>();
```
对于走标准 `GFramework.Core` 启动路径的架构,这些组合根扩展会被默认基础设施自动复用;如果你直接调用 `CqrsRuntimeFactory.CreateRuntime(...)`,也仍然可以像以前一样显式传入 publisher 实例。
- 流式请求
- 通过 `IStreamRequest<TResponse>``IStreamRequestHandler<,>` 返回 `IAsyncEnumerable<TResponse>`

View File

@ -10,7 +10,6 @@ using GFramework.Core.Ioc;
using GFramework.Cqrs;
using GFramework.Cqrs.Abstractions.Cqrs;
using GFramework.Cqrs.Command;
using GFramework.Cqrs.Notification;
using LegacyICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime;
namespace GFramework.Tests.Common;
@ -65,8 +64,7 @@ public static class CqrsTestRuntime
if (container.Get<ICqrsRuntime>() is null)
{
var runtimeLogger = LoggerFactoryResolver.Provider.CreateLogger("CqrsDispatcher");
var notificationPublisher = container.Get<INotificationPublisher>();
var runtime = CqrsRuntimeFactory.CreateRuntime(container, runtimeLogger, notificationPublisher);
var runtime = CqrsRuntimeFactory.CreateRuntime(container, runtimeLogger);
container.Register(runtime);
RegisterLegacyRuntimeAlias(container, runtime);
}

View File

@ -7,10 +7,28 @@ CQRS 迁移与收敛。
## 当前恢复点
- 恢复点编号:`CQRS-REWRITE-RP-118`
- 恢复点编号:`CQRS-REWRITE-RP-123`
- 当前阶段:`Phase 8`
- 当前 PR 锚点:`PR #342`
- 当前 PR 锚点:`PR #344`
- 当前结论:
- 当前 `RP-123` 通过 `$gframework-pr-review` 重新复核 `feat/cqrs-optimization` 的 latest-head review确认 `PR #344` 仍成立且值得在本轮一起收口的问题共有四类:`CqrsDispatcher.ResolveNotificationPublisher()` 默认路径每次 publish 都重复查容器并在零注册分支分配新的 `SequentialNotificationPublisher``CqrsDispatcherContextValidationTests``CqrsNotificationPublisherTests` 的 strict `IIocContainer` helper 缺少 `GetAll(typeof(INotificationPublisher))` 默认装配,导致 CI 在真正断言前就被 mock 异常短路;`NotificationPublisherRegistrationExtensionsTests` 缺少“唯一注册”断言;`CqrsDispatcherCacheTests` 的隔离容器构建复制了 `SetUp()` 的注册形状,存在后续漂移风险
- 本轮保持改动面只落在 `GFramework.Cqrs``GFramework.Cqrs.Tests``ai-plan/public/cqrs-rewrite`,不扩散到新的 benchmark 宿主或额外 notification API其中 `CqrsDispatcher` 新增 dispatcher 实例级 `_resolvedNotificationPublisher` 缓存,并在首次解析后通过线程安全比较交换固定最终策略实例,继续保持“显式实例优先、容器内唯一注册次之、默认顺序发布器兜底”的既有契约
- 两个 strict mock runtime helper 现统一预设 `IIocContainer.GetAll(typeof(INotificationPublisher)) => Array.Empty<object>()`,把“未注册自定义 publisher 时回退到默认顺序发布器”这条默认路径显式纳入测试装配,避免后续相同语义再次被环境性 mock 配置遗漏掩盖
- `NotificationPublisherRegistrationExtensionsTests` 现在对泛型组合根重载补上 `container.GetAll(typeof(INotificationPublisher))` 的唯一注册断言,防止实现未来意外追加重复 descriptor 却仍因 `GetRequired<INotificationPublisher>()` 返回单个实例而误通过
- `CqrsDispatcherCacheTests` 新增 `ConfigureDispatcherCacheFixture(MicrosoftDiContainer)` 共享装配 helper`SetUp()``CreateFrozenContainer()` 复用同一份 CQRS 注册形状,消除 latest-head nitpick 指出的夹具/隔离容器漂移风险
- 本轮本地权威验证已通过:许可证头检查通过,`GFramework.Cqrs``GFramework.Cqrs.Tests` 的 Release build 通过;目标回归 `CqrsDispatcherContextValidationTests``CqrsNotificationPublisherTests``NotificationPublisherRegistrationExtensionsTests``CqrsDispatcherCacheTests` 合计 `30/30` passed
- `GFramework.Cqrs` 首轮与测试项目并行构建时曾出现 `MSB3026` 单次复制重试;串行重跑同一 `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release` 后稳定为 `0 warning / 0 error`,因此该信号判定为并行输出目录竞争噪音而非代码问题
- 当前 `RP-122` 继续沿用 `$gframework-batch-boot 50`,并在 `RP-121` 收口 notification 线阶段性闭环后切回 request steady-state 热点;本轮不再继续压 `HasRegistration(Type)` 内部实现,而是把“是否存在 request pipeline behavior”从每次 `SendAsync(...)` 都查询容器,收口为 `CqrsDispatcher` 实例级的首次判定缓存
- `GFramework.Cqrs/Internal/CqrsDispatcher.cs` 现新增 `_requestBehaviorPresenceCache`,按 `IPipelineBehavior<,>` 的闭合服务类型记住当前 dispatcher 持有容器里该行为是否存在注册;零管道 request 在首次命中后会直接走缓存分支,不再重复询问容器
- `GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs` 现新增 `Dispatcher_Should_Cache_Zero_Pipeline_Request_Presence_Per_Dispatcher_Instance()`:该回归同时锁住两件事,一是同一容器解析出的多个 `ArchitectureContext` 共享同一个 runtime/dispatcher因此会复用同一实例级缓存二是另一套独立容器创建的 dispatcher 不会提前共享该缓存
- 本轮 short-job benchmark 表明这刀继续有效:默认 request steady-state 当前约为 baseline `5.876 ns / 32 B``Mediator` `5.275 ns / 32 B``GFramework.Cqrs` `51.717 ns / 32 B``MediatR` `56.108 ns / 232 B`request lifetime 下 `Singleton``52.490 ns / 32 B` vs `MediatR` `56.890 ns / 232 B``Transient``57.746 ns / 56 B` vs `MediatR` `55.545 ns / 232 B`
- 当前已提交分支相对 `origin/main``d389eb36`, `2026-05-08 20:08:33 +0800`)的累计 branch diff 为 `10 files / 377 changed lines`;本批待提交工作树只新增 `2 files / 187 changed lines`,即使提交后也仍明显低于 `$gframework-batch-boot 50` 的文件阈值
- request 线经过这批后已经从“direct-return ValueTask”“generated provider 宿主吸收”“零管道 presence cache”三层继续下探`Transient` 仍未稳定快于 `MediatR`;因此下一轮若继续压 request 热点,应继续选择真正减少 steady-state 常量路径的切片,而不是回头重试已被否决的 `IContextAware` 类型判定缓存
- 当前 `RP-119` 继续沿用 `$gframework-batch-boot 50`,并在分支已与 `origin/main` 对齐(`d389eb36`, `2026-05-08 20:08:33 +0800`)后,重新选择 notification publisher 线上一个更小的采用面切片:补齐 `UseNotificationPublisher<TPublisher>()` 的组合根采用说明与回归,而不是提前切回 request dispatch 热路径
- 本轮不修改 `GFramework.Cqrs` runtime 语义,只收口“泛型组合根入口是否真的可用、以及读者是否知道该在什么情况下选它”这两个采用缺口
- `NotificationPublisherRegistrationExtensionsTests` 现额外覆盖两条行为:泛型重载会把指定 publisher 类型注册为容器内唯一的单例策略;当容器里已存在 `INotificationPublisher` 注册时,泛型重载也会像实例重载一样在组合根阶段拒绝重复声明
- `GFramework.Cqrs/README.md``docs/zh-CN/core/cqrs.md` 现在把自定义策略入口统一写成 `UseNotificationPublisher(...)` / `UseNotificationPublisher<TPublisher>()`,并明确前者复用现成实例、后者让容器负责单例生命周期,避免用户误以为只能手写实例注册
- 当前批次提交前的工作树 diff 为 `5 files / 77 lines`,仍远低于 `$gframework-batch-boot 50` 的文件阈值;但这一轮的主停止依据仍是上下文预算与自然评审边界,因此本批完成后应直接收口,而不是顺手再开启新的 runtime 热点实验
- 当前 `RP-118` 已使用 `$gframework-pr-review` 复核 `PR #342` latest-head reviewCodeRabbit 当前仍成立的是 `NotificationFanOutBenchmarks` 中 MediatR 分支绕过共享 `HandleCore(...)``GFramework.Cqrs/README.md` 的 MD058 表格空行、以及恢复文档的 PR 锚点与 fan-out 历史值表述Greptile 额外指出的 `UseTaskWhenAllNotificationPublisher()` 示例多余 `using GFramework.Cqrs.Notification;` 也在本轮一并收口
- 本轮不改 `GFramework.Cqrs` runtime 语义,只让 benchmark 的 MediatR handler 与其余对照分支共用同一组空值 / 取消检查,并把 README、中文文档与 `cqrs-rewrite` 恢复文档同步到当前 PR #342 上下文
- 本轮按 `NotificationFanOutBenchmarks` short-job 复跑确认,对称化 MediatR handler 后当前 fixed `4 handler` fan-out 结果约为 `Mediator` `3.598 ns / 0 B`、baseline `7.033 ns / 0 B``MediatR` `257.533 ns / 1256 B``GFramework.Cqrs` 顺序 `409.557 ns / 408 B``TaskWhenAll` `484.531 ns / 496 B`
@ -74,14 +92,14 @@ CQRS 迁移与收敛。
- 当前 `RP-106` 已把同一套 generated-provider 宿主收口扩展到 `RequestPipelineBenchmarks`:新增 handwritten `GeneratedRequestPipelineBenchmarkRegistry`,并让 `RequestPipelineBenchmarks` 改走 `RegisterCqrsHandlersFromAssembly(...)` + benchmark CQRS 基础设施预接线;本轮 benchmark 表明 `0 pipeline` steady-state 进一步收敛到约 `64.755 ns / 32 B``1 pipeline``353.141 ns / 536 B``4 pipeline` 在短跑噪音下维持约 `555.083 ns / 896 B`
- 当前 `RP-107` 已把默认 stream steady-state 宿主也切到 generated-provider 路径:新增 handwritten `GeneratedDefaultStreamingBenchmarkRegistry`,让 `StreamingBenchmarks` 改走 `RegisterCqrsHandlersFromAssembly(...)` 并在 setup/cleanup 清理 dispatcher cache同时将 `gframework-boot` / `gframework-batch-boot` 的默认停止规则改为“AI 上下文预算优先,建议在预计接近约 80% 安全上下文占用前收口”,不再把 changed files 误当作唯一阈值
- 当前 `RP-108` 已补齐 stream handler `Singleton / Transient` 生命周期矩阵 benchmark新增 `StreamLifetimeBenchmarks``GeneratedStreamLifetimeBenchmarkRegistry`,让 stream 生命周期对照沿用 generated-provider 宿主接线而不是退回纯反射路径;本轮 benchmark 表明 `Singleton` 下 baseline / `GFramework.Cqrs` / `MediatR``80.144 ns / 137.515 ns / 229.242 ns``Transient` 下约 `77.198 ns / 144.998 ns / 228.185 ns`
- `ai-plan` active 入口现以 `RP-108` 为最新恢复锚点;`PR #340``PR #339``PR #334``PR #331``PR #326``PR #323``PR #307` 与其他更早阶段细节均以下方归档或说明为准
- `ai-plan` active 入口现以 `RP-122` 为最新恢复锚点;`PR #340``PR #339``PR #334``PR #331``PR #326``PR #323``PR #307` 与其他更早阶段细节均以下方归档或说明为准
## 当前活跃事实
- 当前分支为 `feat/cqrs-optimization`
- 本轮 `$gframework-batch-boot 50``origin/main` (`4d6dbba6`, 2026-05-08 11:13:33 +0800) 为基线;本地 `main` 仍落后,不作为 branch diff 基线
- 当前已提交分支相对 `origin/main` 的累计 branch diff 为 `14 files / 507 lines`
- 本批待提交工作树集中在 `GFramework.Cqrs.Benchmarks/Messaging/StreamLifetimeBenchmarks.cs`、`GFramework.Cqrs.Benchmarks/Messaging/GeneratedStreamLifetimeBenchmarkRegistry.cs``GFramework.Cqrs.Benchmarks/README.md`
- 本轮 `$gframework-batch-boot 50``origin/main` (`d389eb36`, 2026-05-08 20:08:33 +0800) 为基线;本地 `main` 仍落后,不作为 branch diff 基线
- 当前已提交分支相对 `origin/main` 的累计 branch diff 为 `10 files / 377 changed lines`
- 本批待提交工作树集中在 `GFramework.Cqrs/Internal/CqrsDispatcher.cs` 与 `GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs`
- 当前批次后的默认停止依据已改为 AI 上下文预算:若下一轮预计会让活动对话、已加载 recovery 文档、验证输出与当前 diff 接近约 `80%` 安全上下文占用,应在当前自然批次边界停止,即使 branch diff 仍有余量
- `GFramework.Cqrs.Benchmarks` 作为 benchmark 基础设施项目,必须持续排除在 NuGet / GitHub Packages 发布集合之外
- `GFramework.Cqrs.Benchmarks` 现已覆盖 request steady-state、pipeline 数量矩阵、startup、request/stream generated invoker以及 request handler `Singleton / Transient` 生命周期矩阵
@ -137,6 +155,8 @@ CQRS 迁移与收敛。
## 当前风险
- 当前 `_requestBehaviorPresenceCache` 依赖“同一 dispatcher 生命周期内request pipeline 行为注册在容器冻结后保持稳定”这一约束;若未来引入运行时动态增删 request behavior 的模型,需要重新评估这类实例级 presence cache 的失效策略
- 标准架构启动路径现在已经有“自定义 notification publisher 不被默认顺序策略短路”的集成回归;但若后续再引入第三种仓库内置策略或新的启动快捷入口,仍需要同步补这条生产路径验证,不能只看 `CqrsTestRuntime` 测试宿主
- 顶层 `GFramework.sln` / `GFramework.csproj` 在 WSL 下仍可能受 Windows NuGet fallback 配置影响,完整 solution 级验证成本高于模块级验证
- 若后续新增 benchmark / example / tooling 项目但未同步校验发布面solution 级 `dotnet pack` 仍可能在 tag 发布前才暴露异常包
- `RequestStartupBenchmarks` 为了量化真正的单次 cold-start引入了 `InvocationCount=1` / `UnrollFactor=1` 的专用 job该配置会触发 BenchmarkDotNet 的 `MinIterationTime` 提示,后续若要做稳定基线比较,还需要决定是否引入批量外层循环或自定义 cold-start harness
@ -152,9 +172,42 @@ CQRS 迁移与收敛。
- stream pipeline 当前只在“单次建流”层面包裹 handler 调用;若后续需要 per-item 拦截、元素级重试或流内 metrics 聚合,仍需额外设计更细粒度 contract而不是把本轮 seam 直接等同于元素级 middleware
- `PR #339` 在 GitHub 上仍有 1 个已本地失效但未 resolve 的 stale test-thread若后续 head 再次变化,需要重新抓取 latest-head review 确认未解决线程是否收敛
- 若后续继续依赖 `HasRegistration(Type)` 做热路径短路,新增测试替身或 strict mock 时必须同步配置该调用,否则容易在真正业务断言之前被 mock 框架短路成环境性失败
- `PR #344` 当前 latest-head review 仍需等待新 commit 推送后的 GitHub 重新索引;在远端 thread 状态刷新前,不应仅凭现有 open-thread 计数判断本轮修复未生效
## 最近权威验证
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Internal/CqrsDispatcher.cs GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherContextValidationTests.cs GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md`
- 结果:通过
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- 备注:首轮与 `GFramework.Cqrs.Tests` 并行构建时曾出现 `MSB3026` 单次复制重试;串行重跑同一命令后稳定通过
- `dotnet build GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --no-build --filter "FullyQualifiedName~CqrsDispatcherContextValidationTests|FullyQualifiedName~CqrsNotificationPublisherTests|FullyQualifiedName~NotificationPublisherRegistrationExtensionsTests|FullyQualifiedName~CqrsDispatcherCacheTests"`
- 结果:通过,`30/30` passed
- `git diff --check`
- 结果:通过
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsDispatcherCacheTests"`
- 结果:通过,`11/11` passed
- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestBenchmarks.SendRequest_*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1`
- 结果:通过
- 备注:默认 request steady-state 当前约为 baseline `5.876 ns / 32 B``Mediator` `5.275 ns / 32 B``GFramework.Cqrs` `51.717 ns / 32 B``MediatR` `56.108 ns / 232 B`
- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestLifetimeBenchmarks.SendRequest_*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1`
- 结果:通过
- 备注:`Singleton` 下 baseline / `GFramework.Cqrs` / `MediatR``5.720 ns / 52.490 ns / 56.890 ns``Transient` 下约 `5.814 ns / 57.746 ns / 55.545 ns`
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Internal/CqrsDispatcher.cs GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs`
- 结果:通过
- `dotnet build GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --no-build --filter "FullyQualifiedName~ArchitectureModulesBehaviorTests"`
- 结果:通过,`5/5` passed
- `python3 scripts/license-header.py --check --paths GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs`
- 结果:通过
- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*NotificationFanOutBenchmarks*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1`
@ -392,9 +445,9 @@ CQRS 迁移与收敛。
## 下一推荐步骤
1. 既然 `RP-117` 已把 notification publisher 的采用路径收口成显式策略矩阵,下一轮若继续留在 notification 线,优先评估是否需要补第三种仓库内置策略或更贴近示例代码的采用文档,而不是再重复翻写同一套边界说明
2. 当前 benchmark 仍证明 `TaskWhenAllNotificationPublisher` 的价值主要在并行完成与异常聚合语义,而不是吞吐收益;若 notification 文档已经足够,下一轮再回到 request dispatch 常量开销时,应先避开“类型级 `IContextAware` 判定缓存”这条已验证无收益的热点假设
3. 若 benchmark 对照需要继续贴近 `Mediator` 官方设计,再评估 `Mediator` 的 compile-time lifetime / stream 对照矩阵,或给 stream 引入 scoped host 基线,而不是回头重试已被 benchmark 否决的 `GetAll(Type)` 零行为探测方案
1. 若下一轮继续压 request steady-state优先挑选仍能减少常量热路径查询/分支的切片;继续避开“类型级 `IContextAware` 判定缓存”这条已验证无收益的热点假设
2. 若下一轮转向 benchmark 对齐,优先评估 `request scoped host + compile-time lifetime` 对照,而不是继续并行跑多个 BenchmarkDotNet 任务去争用同一自动生成目录
3. 若下一轮回到 notification 线,应把问题重新收敛到“是否值得公开第三种仓库内置 publisher strategy”或“是否需要 `IServiceCollection` 版本的公开入口”,而不是继续重复扩同层级回归
## 活跃文档

View File

@ -1,5 +1,122 @@
# CQRS 重写迁移追踪
## 2026-05-09
### 阶段PR #344 latest-head review 收尾CQRS-REWRITE-RP-123
- 使用 `$gframework-pr-review` 重新抓取当前分支 PR确认当前 worktree 对应 `PR #344`latest-head 仍有 `CodeRabbit 2` / `Greptile 1` open thread
- 主线程逐条复核后确认仍成立的问题:
- `CodeRabbit``NotificationPublisherRegistrationExtensionsTests` 的“唯一注册”断言建议仍有效
- `CodeRabbit` 对 strict `IIocContainer` mock 缺少 `GetAll(typeof(INotificationPublisher))` 默认装配的 CI 失败结论仍有效,且更适合在两个测试 helper 层统一兜底
- `CodeRabbit``CqrsDispatcherCacheTests` 的共享装配 helper 建议仍有效,属于真实维护性风险而非纯样式问题
- `Greptile` 指出的 `ResolveNotificationPublisher()` 热路径重复 `GetAll(...)` 与默认 publisher 重复分配也成立;由于容器在 publish 前已冻结dispatcher 生命周期内可以安全缓存最终解析结果
- 本轮决策:
- 为 `CqrsDispatcher` 增加 dispatcher 实例级 `_resolvedNotificationPublisher` 缓存,并使用线程安全比较交换固定首次解析出的最终策略实例
- 在 `CqrsDispatcherContextValidationTests``CqrsNotificationPublisherTests` 的 strict mock runtime helper 中统一预设 `GetAll(typeof(INotificationPublisher))` 返回空集合
- 在 `NotificationPublisherRegistrationExtensionsTests` 为泛型组合根重载补上 `INotificationPublisher` 唯一注册断言
- 在 `CqrsDispatcherCacheTests` 提取共享的 `ConfigureDispatcherCacheFixture(...)`,消除 `SetUp()``CreateFrozenContainer()` 的注册漂移风险
- 本轮验证:
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Internal/CqrsDispatcher.cs GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherContextValidationTests.cs GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md`
- 结果:通过
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- 备注:首轮与 `GFramework.Cqrs.Tests` 并行构建时出现 `MSB3026` 单次复制重试;串行重跑后稳定通过,判定为输出目录竞争噪音
- `dotnet build GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --no-build --filter "FullyQualifiedName~CqrsDispatcherContextValidationTests|FullyQualifiedName~CqrsNotificationPublisherTests|FullyQualifiedName~NotificationPublisherRegistrationExtensionsTests|FullyQualifiedName~CqrsDispatcherCacheTests"`
- 结果:通过,`30/30` passed
- `git diff --check`
- 结果:通过
- 下一恢复点:
- 推送本轮 commit 后,再次运行 `$gframework-pr-review` 复核 `PR #344` latest-head open thread 是否已随新 head 收敛;若仍残留 open thread再区分 stale 状态与新增 review
### 阶段request 零管道 behavior presence cacheCQRS-REWRITE-RP-122
- 延续 `$gframework-batch-boot 50`,本轮在 `RP-121` 把 notification 线阶段性收口后,重新回到 request steady-state 常量开销,并接受并行 explorer 的共同结论:下一刀应继续减少每次 `SendAsync(...)` 必经的通用查询,而不是回头优化 `HasRegistration(Type)` 内部实现或重试已证伪的 `IContextAware` 类型缓存
- 本轮主线程决策:
- 只改 `GFramework.Cqrs/Internal/CqrsDispatcher.cs``GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs`,不同时打开 scoped benchmark 宿主或 notification 新公开 API 两条线
- 为 `CqrsDispatcher` 新增 `_requestBehaviorPresenceCache`,按闭合 `IPipelineBehavior<,>` 服务类型缓存“当前 dispatcher 的容器里是否存在该 request behavior 注册”
- 保持优化面只覆盖 request `0 pipeline` 热路径stream 对称缓存与 scoped host benchmark 继续留到后续独立批次
- 在 `CqrsDispatcherCacheTests` 新增实例级回归,明确“同容器多个 `ArchitectureContext` 解析到同一个 runtime/dispatcher会共享该缓存另一独立容器创建的 dispatcher 不共享该缓存”
- 本轮权威验证:
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsDispatcherCacheTests"`
- 结果:通过,`11/11` passed
- 备注:新增回归首轮曾因错误假设“不同 `ArchitectureContext` 必定对应不同 dispatcher”而失败修正为“同容器共享 runtime、独立容器不共享缓存”后稳定通过
- `dotnet build GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestBenchmarks.SendRequest_*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1`
- 结果:通过
- 备注:默认 request steady-state 当前约为 baseline `5.876 ns / 32 B``Mediator` `5.275 ns / 32 B``GFramework.Cqrs` `51.717 ns / 32 B``MediatR` `56.108 ns / 232 B`
- `dotnet run --project GFramework.Cqrs.Benchmarks/GFramework.Cqrs.Benchmarks.csproj -c Release --no-build -- --filter "*RequestLifetimeBenchmarks.SendRequest_*" --job short --warmupCount 1 --iterationCount 1 --launchCount 1`
- 结果:通过
- 备注:首次与 `RequestBenchmarks` 并行触发时BenchmarkDotNet 自动生成项目目录发生 `.nuget.g.props already exists` 冲突;改为串行重跑同一命令后,`Singleton` 下 baseline / `GFramework.Cqrs` / `MediatR``5.720 ns / 52.490 ns / 56.890 ns``Transient` 下约 `5.814 ns / 57.746 ns / 55.545 ns`
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Internal/CqrsDispatcher.cs GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherCacheTests.cs`
- 结果:通过
- 本轮结论:
- request `0 pipeline` 常量路径再次被压短,默认 steady-state request 与 `Singleton` lifetime 均继续快于当前 `MediatR` short-job 基线
- `Transient` 仍略慢于 `MediatR`,但相较更早轮次已明显收敛;下一轮若继续 request 热点,更值得继续减少 steady-state 必经路径,或切到 explorer 建议的 `request scoped host + compile-time lifetime` 对齐线,而不是继续打磨已收益有限的 `HasRegistration(Type)` 内部细节
### 阶段:标准架构启动路径 notification publisher 回归CQRS-REWRITE-RP-121
- 延续 `$gframework-batch-boot 50`,本轮没有继续扩 notification runtime 语义,而是先给 `RP-120` 刚修复的默认接线补一条更贴近生产的架构启动回归
- 本轮主线程决策:
- 保持写面只落在 `GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs`,不再改动 `GFramework.Cqrs` / `GFramework.Core` 运行时代码
- 通过 `Architecture.Configurator` 注册依赖容器 probe 的自定义 `INotificationPublisher`,并在 `OnInitialize()` 显式接入额外程序集 notification handler验证默认 `Architecture.InitializeAsync()` 路径最终 publish 时不会退回默认顺序策略
- 用现有 `AdditionalAssemblyNotificationHandlerRegistry` 测试桩承载 handler 执行观察,把本轮信号收敛到“标准架构启动路径是否真正复用自定义 publisher”
- 本轮权威验证:
- `dotnet build GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --no-build --filter "FullyQualifiedName~ArchitectureModulesBehaviorTests"`
- 结果:通过,`5/5` passed
- `python3 scripts/license-header.py --check --paths GFramework.Core.Tests/Architectures/ArchitectureModulesBehaviorTests.cs`
- 结果:通过
- 本轮结论:
- 标准 `Architecture.InitializeAsync()` 启动路径现在也被回归锁住:通过 `Configurator` 声明的自定义 `INotificationPublisher` 会在真实 publish 路径里被复用,不会再被 `CqrsRuntimeModule` 创建 runtime 时静默短路成默认顺序发布器
- notification 线当前已形成“组合根入口 -> 默认接线修复 -> 标准架构启动回归”的闭环;下一轮若继续留在该方向,更合理的是重新评估产品面是否真的需要第三种仓库内置策略,而不是继续堆同层级回归
### 阶段notification publisher 默认接线修复CQRS-REWRITE-RP-120
- 延续 `$gframework-batch-boot 50`,本轮沿着 `RP-119` 的 notification publisher 组合根回归继续向下追,发现这不是单纯的文档或测试补洞,而是默认 runtime 接线存在真实时序缺陷
- 本轮主线程决策:
- 保持修复面收敛在 notification publisher 单线,不把问题扩散到 request dispatch 热路径或无关模块
- 让 `CqrsRuntimeFactory.CreateRuntime(...)` 不再在工厂层把 `null` publisher 立即替换成 `SequentialNotificationPublisher`,改由 `CqrsDispatcher` 在真正 publish 时优先复用显式实例或容器内唯一注册策略,最后才回退到默认顺序发布器
- 同步移除 `CqrsRuntimeModule``GFramework.Tests.Common/CqrsTestRuntime` 里对 `container.Get<INotificationPublisher>()` 的预解析,避免冻结前可见性再次把策略短路掉
- 在 `NotificationPublisherRegistrationExtensionsTests` 新增“publisher 依赖容器内探针服务”的真实采用回归,并重新验证 `UseTaskWhenAllNotificationPublisher()` 在默认基础设施路径里会继续调度所有处理器
- 本轮权威验证:
- `dotnet build GFramework.Core/GFramework.Core.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~NotificationPublisherRegistrationExtensionsTests"`
- 结果:通过,`7/7` passed
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs/Internal/CqrsDispatcher.cs GFramework.Cqrs/CqrsRuntimeFactory.cs GFramework.Core/Services/Modules/CqrsRuntimeModule.cs GFramework.Tests.Common/CqrsTestRuntime.cs GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs`
- 结果:通过
- `git diff --check`
- 结果:通过
- 本轮结论:
- `UseTaskWhenAllNotificationPublisher()``UseNotificationPublisher<TPublisher>()` 现在不再只是“能注册进容器”,而是能真正穿过默认 runtime 基础设施参与 publish 路径
- 本轮属于完整的语义修复批次,应在提交后再决定是否继续 notification 线或切回 request steady-state 热点
### 阶段notification publisher 泛型组合根入口收口CQRS-REWRITE-RP-119
- 延续 `$gframework-batch-boot 50`,本轮在 `feat/cqrs-optimization` 已与 `origin/main` 对齐后,没有直接重开 request dispatch 热路径实验,而是先选择 notification publisher 线上一个更小、可直接评审的采用面切片
- 本轮主线程决策:
- 保持 `GFramework.Cqrs` runtime 代码不变,只补 `UseNotificationPublisher<TPublisher>()` 的组合根回归与用户文档说明
- 在 `NotificationPublisherRegistrationExtensionsTests` 新增两条 targeted 回归,确认泛型重载会注册唯一单例策略,且在容器已存在 `INotificationPublisher` 时同样会拒绝重复声明
- 在 `GFramework.Cqrs/README.md``docs/zh-CN/core/cqrs.md` 把自定义入口统一写成 `UseNotificationPublisher(...)` / `UseNotificationPublisher<TPublisher>()`,并明确实例重载与泛型重载的生命周期边界
- 本轮权威验证:
- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release`
- 结果:通过,`0 warning / 0 error`
- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~NotificationPublisherRegistrationExtensionsTests"`
- 结果:通过,`6/6` passed
- `python3 scripts/license-header.py --check --paths GFramework.Cqrs.Tests/Cqrs/NotificationPublisherRegistrationExtensionsTests.cs GFramework.Cqrs/README.md docs/zh-CN/core/cqrs.md ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md`
- 结果:通过
- `git diff --check`
- 结果:通过
- 本轮结论:
- notification publisher 的组合根采用面现在不再默认读者只能“手里先有一个实例”;文档与回归都已明确容器托管型自定义 publisher 的标准入口
- 这批仍然保持在低风险、单模块、易评审边界内,适合在完成验证后直接收口为新的恢复点
## 2026-05-08
### 阶段PR #342 latest-head review 收口CQRS-REWRITE-RP-118

View File

@ -125,7 +125,7 @@ var playerId = await architecture.Context.SendRequestAsync(
| --- | --- | --- | --- | --- |
| `UseSequentialNotificationPublisher()` | 需要保持容器顺序,且希望首个失败立即停止 | 保证按容器顺序执行 | 首个处理器异常会中断后续处理器 | 这也是默认回退策略 |
| `UseTaskWhenAllNotificationPublisher()` | 需要让全部处理器并行完成,再统一观察异常或取消 | 不保证顺序 | 不会在首个失败时中断其余处理器;全部结束后统一暴露结果 | 更适合语义补齐,不是性能优化开关 |
| `UseNotificationPublisher(...)` | 需要接入自定义或第三方 publisher 策略 | 取决于实现 | 取决于实现 | 仅在内置顺序 / 并行策略都不满足时使用 |
| `UseNotificationPublisher(...)` / `UseNotificationPublisher<TPublisher>()` | 需要接入自定义或第三方 publisher 策略 | 取决于实现 | 取决于实现 | 前者复用现成实例,后者让容器负责单例生命周期 |
如果你想在组合根里显式保留默认顺序语义,也可以直接写成:
@ -160,6 +160,14 @@ using GFramework.Cqrs.Notification;
container.UseNotificationPublisher(new TaskWhenAllNotificationPublisher());
```
如果你的自定义 publisher 需要继续由容器构造和托管,也可以改用泛型注册入口:
```csharp
using GFramework.Cqrs.Extensions;
container.UseNotificationPublisher<MyCustomNotificationPublisher>();
```
## Request 与流式变体
除了最常见的 `Command` / `Query` / `Notification`,当前公开面还覆盖两类容易被忽略的入口: