From 22f608eb4db4be82f87b747d9291cb427fa38fed Mon Sep 17 00:00:00 2001 From: gewuyou <95328647+GeWuYou@users.noreply.github.com> Date: Thu, 30 Apr 2026 11:07:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(cqrs):=20=E6=96=B0=E5=A2=9E=E9=80=9A?= =?UTF-8?q?=E7=9F=A5=E5=8F=91=E5=B8=83=E7=AD=96=E7=95=A5=E6=8E=A5=E7=BC=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 notification publisher seam 与默认顺序发布器,保持零处理器静默完成与首错即停语义 - 调整 dispatcher、runtime factory 与测试基础设施,支持复用容器中预注册的通知发布策略 - 补充 publisher 回归测试并更新 CQRS 文档与 ai-plan 恢复点 --- .../Services/Modules/CqrsRuntimeModule.cs | 4 +- .../Cqrs/CqrsNotificationPublisherTests.cs | 326 ++++++++++++++++++ GFramework.Cqrs/CqrsRuntimeFactory.cs | 24 +- GFramework.Cqrs/Internal/CqrsDispatcher.cs | 68 +++- .../SequentialNotificationPublisher.cs | 35 ++ .../DelegatingNotificationPublishContext.cs | 52 +++ .../Notification/INotificationPublisher.cs | 26 ++ .../NotificationHandlerExecutor.cs | 20 ++ .../NotificationPublishContext.cs | 50 +++ GFramework.Cqrs/README.md | 3 + GFramework.Tests.Common/CqrsTestRuntime.cs | 4 +- .../todos/cqrs-rewrite-migration-tracking.md | 34 +- .../traces/cqrs-rewrite-migration-trace.md | 35 ++ docs/zh-CN/core/cqrs.md | 9 +- 14 files changed, 675 insertions(+), 15 deletions(-) create mode 100644 GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs create mode 100644 GFramework.Cqrs/Internal/SequentialNotificationPublisher.cs create mode 100644 GFramework.Cqrs/Notification/DelegatingNotificationPublishContext.cs create mode 100644 GFramework.Cqrs/Notification/INotificationPublisher.cs create mode 100644 GFramework.Cqrs/Notification/NotificationHandlerExecutor.cs create mode 100644 GFramework.Cqrs/Notification/NotificationPublishContext.cs diff --git a/GFramework.Core/Services/Modules/CqrsRuntimeModule.cs b/GFramework.Core/Services/Modules/CqrsRuntimeModule.cs index 1da8f684..64076362 100644 --- a/GFramework.Core/Services/Modules/CqrsRuntimeModule.cs +++ b/GFramework.Core/Services/Modules/CqrsRuntimeModule.cs @@ -3,6 +3,7 @@ using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Logging; using GFramework.Cqrs; using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Notification; using LegacyICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime; namespace GFramework.Core.Services.Modules; @@ -40,7 +41,8 @@ public sealed class CqrsRuntimeModule : IServiceModule var dispatcherLogger = LoggerFactoryResolver.Provider.CreateLogger("CqrsDispatcher"); var registrarLogger = LoggerFactoryResolver.Provider.CreateLogger("DefaultCqrsHandlerRegistrar"); var registrationLogger = LoggerFactoryResolver.Provider.CreateLogger("DefaultCqrsRegistrationService"); - var runtime = CqrsRuntimeFactory.CreateRuntime(container, dispatcherLogger); + var notificationPublisher = container.Get(); + var runtime = CqrsRuntimeFactory.CreateRuntime(container, dispatcherLogger, notificationPublisher); var registrar = CqrsRuntimeFactory.CreateHandlerRegistrar(container, registrarLogger); container.Register(runtime); diff --git a/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs new file mode 100644 index 00000000..b069be94 --- /dev/null +++ b/GFramework.Cqrs.Tests/Cqrs/CqrsNotificationPublisherTests.cs @@ -0,0 +1,326 @@ +using GFramework.Core.Abstractions.Architectures; +using GFramework.Core.Abstractions.Ioc; +using GFramework.Core.Abstractions.Logging; +using GFramework.Core.Architectures; +using GFramework.Core.Ioc; +using GFramework.Core.Logging; +using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Cqrs; +using GFramework.Cqrs.Notification; +using GFramework.Cqrs.Tests.Logging; + +namespace GFramework.Cqrs.Tests.Cqrs; + +/// +/// 验证默认 CQRS runtime 的通知发布策略接缝。 +/// +[TestFixture] +internal sealed class CqrsNotificationPublisherTests +{ + /// + /// 验证当调用方显式提供自定义通知发布器时,dispatcher 会按该发布器定义的顺序执行处理器。 + /// + [Test] + public async Task PublishAsync_Should_Use_Custom_NotificationPublisher_When_Runtime_Is_Created_With_It() + { + var invocationOrder = new List(); + var handlers = new object[] + { + new RecordingNotificationHandler("first", invocationOrder), + new RecordingNotificationHandler("second", invocationOrder) + }; + var runtime = CreateRuntime( + container => + { + container + .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler))) + .Returns(handlers); + }, + new ReverseOrderNotificationPublisher()); + + await runtime.PublishAsync(new FakeCqrsContext(), new PublisherNotification()).ConfigureAwait(false); + + Assert.That(invocationOrder, Is.EqualTo(["second", "first"])); + } + + /// + /// 验证当容器在 runtime 创建前已显式注册自定义通知发布器时, + /// `RegisterInfrastructure` 这条默认接线会复用该策略。 + /// + [Test] + public async Task RegisterInfrastructure_Should_Use_PreRegistered_NotificationPublisher() + { + LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider(); + + var container = new MicrosoftDiContainer(); + var publisher = new TrackingNotificationPublisher(); + container.Register(publisher); + container.Register>(new RecordingNotificationHandler("only", [])); + CqrsTestRuntime.RegisterInfrastructure(container); + container.Freeze(); + + var context = new ArchitectureContext(container); + + await context.PublishAsync(new PublisherNotification()).ConfigureAwait(false); + + Assert.That(publisher.WasCalled, Is.True); + } + + /// + /// 验证自定义通知发布器通过发布上下文回调执行处理器时,dispatcher 仍会在调用前注入当前架构上下文。 + /// + [Test] + public async Task PublishAsync_Should_Prepare_Context_Before_Custom_Publisher_Invokes_Handler() + { + var handler = new ContextAwarePublisherTestHandler(); + var architectureContext = new Mock(MockBehavior.Strict); + var runtime = CreateRuntime( + container => + { + container + .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler))) + .Returns([handler]); + }, + new PassthroughNotificationPublisher()); + + await runtime.PublishAsync(architectureContext.Object, new PublisherNotification()).ConfigureAwait(false); + + Assert.That(handler.ObservedContext, Is.SameAs(architectureContext.Object)); + } + + /// + /// 验证默认通知发布器在零处理器场景下会保持静默完成。 + /// + [Test] + public void PublishAsync_Should_Complete_When_No_Handlers_Are_Registered() + { + var runtime = CreateRuntime( + container => + { + container + .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler))) + .Returns(Array.Empty()); + }); + + Assert.That( + async () => await runtime.PublishAsync(new FakeCqrsContext(), new PublisherNotification()).ConfigureAwait(false), + Throws.Nothing); + } + + /// + /// 验证默认通知发布器会保持“首个异常立即中断后续处理器”的既有语义。 + /// + [Test] + public void PublishAsync_Should_Stop_After_First_Handler_Exception_When_Using_Default_Publisher() + { + var trailingHandler = new RecordingNotificationHandler("second", []); + var runtime = CreateRuntime( + container => + { + container + .Setup(currentContainer => currentContainer.GetAll(typeof(INotificationHandler))) + .Returns( + [ + new ThrowingNotificationHandler(), + trailingHandler + ]); + }); + + Assert.That( + async () => await runtime.PublishAsync(new FakeCqrsContext(), new PublisherNotification()).ConfigureAwait(false), + Throws.InvalidOperationException.With.Message.EqualTo("boom")); + Assert.That(trailingHandler.Invoked, Is.False); + } + + /// + /// 创建一个只满足当前测试最小依赖面的 dispatcher runtime。 + /// + /// 对容器 mock 的额外配置。 + /// 要注入的自定义通知发布器;若为 则使用默认发布器。 + /// 默认 CQRS runtime。 + private static GFramework.Cqrs.Abstractions.Cqrs.ICqrsRuntime CreateRuntime( + Action> configureContainer, + INotificationPublisher? notificationPublisher = null) + { + var container = new Mock(MockBehavior.Strict); + var logger = new TestLogger(nameof(CqrsNotificationPublisherTests), LogLevel.Debug); + + configureContainer(container); + return CqrsRuntimeFactory.CreateRuntime(container.Object, logger, notificationPublisher); + } + + /// + /// 为当前测试提供最小的 CQRS 上下文标记。 + /// + private sealed class FakeCqrsContext : ICqrsContext + { + } + + /// + /// 为通知发布器测试提供最小通知类型。 + /// + private sealed record PublisherNotification : INotification; + + /// + /// 按传入顺序直接执行处理器的测试发布器。 + /// + private sealed class PassthroughNotificationPublisher : INotificationPublisher + { + /// + /// 按当前处理器集合顺序执行所有处理器。 + /// + /// 通知类型。 + /// 当前发布上下文。 + /// 取消令牌。 + /// 表示通知发布完成的值任务。 + public async ValueTask PublishAsync( + NotificationPublishContext context, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + foreach (var handler in context.Handlers) + { + await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false); + } + } + } + + /// + /// 按逆序执行处理器的测试发布器,用于证明 dispatcher 已真正委托给自定义策略。 + /// + private sealed class ReverseOrderNotificationPublisher : INotificationPublisher + { + /// + /// 按逆序执行当前发布上下文中的所有处理器。 + /// + /// 通知类型。 + /// 当前发布上下文。 + /// 取消令牌。 + /// 表示通知发布完成的值任务。 + public async ValueTask PublishAsync( + NotificationPublishContext context, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + for (var index = context.Handlers.Count - 1; index >= 0; index--) + { + await context.InvokeHandlerAsync(context.Handlers[index], cancellationToken).ConfigureAwait(false); + } + } + } + + /// + /// 仅记录自身是否被调用的测试发布器,用于验证默认接线是否已接管到自定义策略。 + /// + private sealed class TrackingNotificationPublisher : INotificationPublisher + { + /// + /// 获取当前发布器是否至少执行过一次发布。 + /// + public bool WasCalled { get; private set; } + + /// + /// 记录当前发布器已被调用,并继续按当前顺序执行所有处理器。 + /// + /// 通知类型。 + /// 当前发布上下文。 + /// 取消令牌。 + /// 表示通知发布完成的值任务。 + public async ValueTask PublishAsync( + NotificationPublishContext context, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + WasCalled = true; + + foreach (var handler in context.Handlers) + { + await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false); + } + } + } + + /// + /// 记录调用顺序的最小通知处理器。 + /// + private sealed class RecordingNotificationHandler : INotificationHandler + { + private readonly List _invocationOrder; + private readonly string _name; + + /// + /// 初始化一个记录调用顺序的测试处理器。 + /// + /// 当前处理器对应的名称。 + /// 承载调用顺序的列表。 + public RecordingNotificationHandler(string name, List invocationOrder) + { + ArgumentNullException.ThrowIfNull(name); + ArgumentNullException.ThrowIfNull(invocationOrder); + + _name = name; + _invocationOrder = invocationOrder; + } + + /// + /// 获取当前处理器是否已被调用。 + /// + public bool Invoked { get; private set; } + + /// + /// 把当前处理器名称追加到调用顺序列表。 + /// + /// 当前通知。 + /// 取消令牌。 + /// 已完成的值任务。 + public ValueTask Handle(PublisherNotification notification, CancellationToken cancellationToken) + { + Invoked = true; + _invocationOrder.Add(_name); + return ValueTask.CompletedTask; + } + } + + /// + /// 在被调用时主动抛出异常的测试处理器。 + /// + private sealed class ThrowingNotificationHandler : INotificationHandler + { + /// + /// 抛出固定异常,验证默认发布器的失败即停语义。 + /// + /// 当前通知。 + /// 取消令牌。 + /// 不会成功返回。 + /// 始终抛出,表示当前处理器失败。 + public ValueTask Handle(PublisherNotification notification, CancellationToken cancellationToken) + { + throw new InvalidOperationException("boom"); + } + } + + /// + /// 记录 dispatcher 是否在自定义发布器路径中完成上下文注入的测试处理器。 + /// + private sealed class ContextAwarePublisherTestHandler + : CqrsContextAwareHandlerBase, + INotificationHandler + { + /// + /// 获取当前处理器在执行时观察到的架构上下文。 + /// + public IArchitectureContext? ObservedContext { get; private set; } + + /// + /// 记录当前执行时观察到的架构上下文。 + /// + /// 当前通知。 + /// 取消令牌。 + /// 已完成的值任务。 + public ValueTask Handle(PublisherNotification notification, CancellationToken cancellationToken) + { + ObservedContext = Context; + return ValueTask.CompletedTask; + } + } +} diff --git a/GFramework.Cqrs/CqrsRuntimeFactory.cs b/GFramework.Cqrs/CqrsRuntimeFactory.cs index cbed68aa..32433f88 100644 --- a/GFramework.Cqrs/CqrsRuntimeFactory.cs +++ b/GFramework.Cqrs/CqrsRuntimeFactory.cs @@ -2,6 +2,7 @@ using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Logging; using GFramework.Cqrs.Abstractions.Cqrs; using GFramework.Cqrs.Internal; +using GFramework.Cqrs.Notification; namespace GFramework.Cqrs; @@ -24,11 +25,32 @@ public static class CqrsRuntimeFactory /// 。 /// public static ICqrsRuntime CreateRuntime(IIocContainer container, ILogger logger) + { + return CreateRuntime(container, logger, notificationPublisher: null); + } + + /// + /// 创建默认 CQRS runtime 分发器,并允许调用方指定通知发布策略。 + /// + /// 目标依赖注入容器。 + /// 用于 runtime 诊断的日志器。 + /// 可选的通知发布策略;若为 则使用默认顺序发布器。 + /// 默认 CQRS runtime。 + /// + /// 。 + /// + public static ICqrsRuntime CreateRuntime( + IIocContainer container, + ILogger logger, + INotificationPublisher? notificationPublisher) { ArgumentNullException.ThrowIfNull(container); ArgumentNullException.ThrowIfNull(logger); - return new CqrsDispatcher(container, logger); + return new CqrsDispatcher( + container, + logger, + notificationPublisher ?? new SequentialNotificationPublisher()); } /// diff --git a/GFramework.Cqrs/Internal/CqrsDispatcher.cs b/GFramework.Cqrs/Internal/CqrsDispatcher.cs index d604beac..334186db 100644 --- a/GFramework.Cqrs/Internal/CqrsDispatcher.cs +++ b/GFramework.Cqrs/Internal/CqrsDispatcher.cs @@ -4,6 +4,7 @@ using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Logging; using GFramework.Core.Abstractions.Rule; using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Notification; using ICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime; namespace GFramework.Cqrs.Internal; @@ -14,7 +15,8 @@ namespace GFramework.Cqrs.Internal; /// internal sealed class CqrsDispatcher( IIocContainer container, - ILogger logger) : ICqrsRuntime + ILogger logger, + INotificationPublisher notificationPublisher) : ICqrsRuntime { // 卸载安全的进程级缓存:通知类型只以弱键语义保留。 // 若插件/热重载程序集中的通知类型被卸载,对应分发绑定会自然失效,下次命中时再重新计算。 @@ -43,6 +45,10 @@ internal sealed class CqrsDispatcher( private static readonly MethodInfo StreamHandlerInvokerMethodDefinition = typeof(CqrsDispatcher) .GetMethod(nameof(InvokeStreamHandler), BindingFlags.NonPublic | BindingFlags.Static)!; + private readonly INotificationPublisher _notificationPublisher = notificationPublisher + ?? throw new ArgumentNullException( + nameof(notificationPublisher)); + /// /// 发布通知到所有已注册处理器。 /// @@ -71,11 +77,8 @@ internal sealed class CqrsDispatcher( return; } - foreach (var handler in handlers) - { - PrepareHandler(handler, context); - await dispatchBinding.Invoker(handler, notification, cancellationToken).ConfigureAwait(false); - } + var publishContext = CreateNotificationPublishContext(notification, handlers, context, dispatchBinding.Invoker); + await _notificationPublisher.PublishAsync(publishContext, cancellationToken).ConfigureAwait(false); } /// @@ -240,6 +243,50 @@ internal sealed class CqrsDispatcher( return (NotificationInvoker)Delegate.CreateDelegate(typeof(NotificationInvoker), method); } + /// + /// 为当前通知发布调用创建发布上下文,把处理器集合与执行入口收敛到同一对象。 + /// + /// 通知类型。 + /// 当前通知。 + /// 当前发布调用已解析到的处理器集合。 + /// 当前 CQRS 分发上下文。 + /// 执行单个通知处理器时复用的强类型调用委托。 + /// 供通知发布器消费的执行上下文。 + private static NotificationPublishContext CreateNotificationPublishContext( + TNotification notification, + IReadOnlyList handlers, + ICqrsContext context, + NotificationInvoker invoker) + where TNotification : INotification + { + return new DelegatingNotificationPublishContext( + notification, + handlers, + new NotificationDispatchState(context, invoker), + static (handler, currentNotification, state, currentCancellationToken) => + InvokePublishedNotificationHandlerAsync(handler, currentNotification, state, currentCancellationToken)); + } + + /// + /// 执行通知发布器选中的单个处理器,并在调用前注入当前分发上下文。 + /// + /// 通知类型。 + /// 要执行的处理器实例。 + /// 当前通知。 + /// 当前处理器执行所需的 dispatcher 状态。 + /// 取消令牌。 + /// 表示当前处理器执行完成的值任务。 + private static ValueTask InvokePublishedNotificationHandlerAsync( + object handler, + TNotification notification, + NotificationDispatchState state, + CancellationToken cancellationToken) + where TNotification : INotification + { + PrepareHandler(handler, state.Context); + return state.Invoker(handler, notification!, cancellationToken); + } + /// /// 生成流式处理器调用委托,避免每次创建流都重复反射。 /// @@ -387,6 +434,15 @@ internal sealed class CqrsDispatcher( public NotificationInvoker Invoker { get; } = invoker; } + /// + /// 保存通知发布器执行单个 handler 时需要复用的 dispatcher 状态。 + /// + /// 当前 CQRS 分发上下文。 + /// 执行单个通知处理器的强类型调用委托。 + private readonly record struct NotificationDispatchState( + ICqrsContext Context, + NotificationInvoker Invoker); + /// /// 保存流式请求分发路径所需的服务类型与调用委托。 /// 该绑定让建流热路径只需一次缓存命中即可获得解析与调用所需元数据。 diff --git a/GFramework.Cqrs/Internal/SequentialNotificationPublisher.cs b/GFramework.Cqrs/Internal/SequentialNotificationPublisher.cs new file mode 100644 index 00000000..2f40789f --- /dev/null +++ b/GFramework.Cqrs/Internal/SequentialNotificationPublisher.cs @@ -0,0 +1,35 @@ +using GFramework.Cqrs.Abstractions.Cqrs; +using GFramework.Cqrs.Notification; + +namespace GFramework.Cqrs.Internal; + +/// +/// 默认的通知发布器实现。 +/// +/// +/// 该实现完整保留当前 CQRS runtime 的既有通知语义:按已解析顺序逐个执行处理器, +/// 并在首个处理器抛出异常时立即停止后续发布。 +/// +internal sealed class SequentialNotificationPublisher : INotificationPublisher +{ + /// + /// 按既定顺序逐个执行当前通知的处理器。 + /// + /// 通知类型。 + /// 当前发布调用的执行上下文。 + /// 取消令牌。 + /// 表示通知发布完成的值任务。 + /// + public async ValueTask PublishAsync( + NotificationPublishContext context, + CancellationToken cancellationToken = default) + where TNotification : INotification + { + ArgumentNullException.ThrowIfNull(context); + + foreach (var handler in context.Handlers) + { + await context.InvokeHandlerAsync(handler, cancellationToken).ConfigureAwait(false); + } + } +} diff --git a/GFramework.Cqrs/Notification/DelegatingNotificationPublishContext.cs b/GFramework.Cqrs/Notification/DelegatingNotificationPublishContext.cs new file mode 100644 index 00000000..ee78517a --- /dev/null +++ b/GFramework.Cqrs/Notification/DelegatingNotificationPublishContext.cs @@ -0,0 +1,52 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Notification; + +/// +/// 通过内部回调桥接 dispatcher 执行逻辑的通知发布上下文。 +/// +/// 通知类型。 +/// 执行单个处理器所需的内部状态类型。 +internal sealed class DelegatingNotificationPublishContext : NotificationPublishContext + where TNotification : INotification +{ + private readonly NotificationHandlerExecutor _handlerExecutor; + private readonly TState _state; + + /// + /// 初始化一个委托驱动的通知发布上下文。 + /// + /// 当前通知。 + /// 当前发布调用已解析到的处理器集合。 + /// 执行处理器时需要的内部状态。 + /// 执行单个处理器时调用的内部回调。 + /// + /// 。 + /// + internal DelegatingNotificationPublishContext( + TNotification notification, + IReadOnlyList handlers, + TState state, + NotificationHandlerExecutor handlerExecutor) + : base(notification, handlers) + { + ArgumentNullException.ThrowIfNull(handlerExecutor); + + _state = state; + _handlerExecutor = handlerExecutor; + } + + /// + /// 通过默认 dispatcher 提供的内部回调执行单个处理器。 + /// + /// 要执行的处理器实例。 + /// 取消令牌。 + /// 表示当前处理器执行完成的值任务。 + /// + public override ValueTask InvokeHandlerAsync(object handler, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(handler); + + return _handlerExecutor(handler, Notification, _state, cancellationToken); + } +} diff --git a/GFramework.Cqrs/Notification/INotificationPublisher.cs b/GFramework.Cqrs/Notification/INotificationPublisher.cs new file mode 100644 index 00000000..85f4238b --- /dev/null +++ b/GFramework.Cqrs/Notification/INotificationPublisher.cs @@ -0,0 +1,26 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Notification; + +/// +/// 定义默认 CQRS runtime 的通知发布策略。 +/// +/// +/// dispatcher 会先解析当前通知对应的处理器集合,再把本次发布上下文交给该抽象决定执行顺序。 +/// 实现应把 视为当前发布调用的瞬时数据, +/// 不要跨发布缓存处理器实例或假设它们已经脱离当前上下文。 +/// +public interface INotificationPublisher +{ + /// + /// 执行一次通知发布。 + /// + /// 通知类型。 + /// 当前发布调用的处理器集合与执行入口。 + /// 取消令牌。 + /// 表示通知发布完成的值任务。 + ValueTask PublishAsync( + NotificationPublishContext context, + CancellationToken cancellationToken = default) + where TNotification : INotification; +} diff --git a/GFramework.Cqrs/Notification/NotificationHandlerExecutor.cs b/GFramework.Cqrs/Notification/NotificationHandlerExecutor.cs new file mode 100644 index 00000000..9dcf28ee --- /dev/null +++ b/GFramework.Cqrs/Notification/NotificationHandlerExecutor.cs @@ -0,0 +1,20 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Notification; + +/// +/// 表示默认 dispatcher 执行单个通知处理器时使用的内部回调。 +/// +/// 通知类型。 +/// 执行当前处理器所需的内部状态类型。 +/// 要执行的处理器实例。 +/// 当前通知。 +/// 当前处理器执行所需的内部状态。 +/// 取消令牌。 +/// 表示当前处理器执行完成的值任务。 +internal delegate ValueTask NotificationHandlerExecutor( + object handler, + TNotification notification, + TState state, + CancellationToken cancellationToken) + where TNotification : INotification; diff --git a/GFramework.Cqrs/Notification/NotificationPublishContext.cs b/GFramework.Cqrs/Notification/NotificationPublishContext.cs new file mode 100644 index 00000000..0ddeda81 --- /dev/null +++ b/GFramework.Cqrs/Notification/NotificationPublishContext.cs @@ -0,0 +1,50 @@ +using GFramework.Cqrs.Abstractions.Cqrs; + +namespace GFramework.Cqrs.Notification; + +/// +/// 表示一次通知发布调用的执行上下文。 +/// +/// 通知类型。 +/// +/// 该上下文把“当前通知”“已解析处理器集合”和“执行单个处理器”的入口收敛到同一对象中, +/// 使发布策略只需决定遍历、排序或并发方式,而无需了解 dispatcher 的上下文注入细节。 +/// +public abstract class NotificationPublishContext + where TNotification : INotification +{ + /// + /// 初始化一次通知发布上下文。 + /// + /// 当前通知。 + /// 当前发布调用已解析到的处理器集合。 + /// + /// 。 + /// + protected NotificationPublishContext(TNotification notification, IReadOnlyList handlers) + { + ArgumentNullException.ThrowIfNull(notification); + ArgumentNullException.ThrowIfNull(handlers); + + Notification = notification; + Handlers = handlers; + } + + /// + /// 获取当前要发布的通知。 + /// + public TNotification Notification { get; } + + /// + /// 获取当前发布调用已解析到的处理器集合。 + /// + public IReadOnlyList Handlers { get; } + + /// + /// 执行单个通知处理器。 + /// + /// 要执行的处理器实例。 + /// 取消令牌。 + /// 表示当前处理器执行完成的值任务。 + public abstract ValueTask InvokeHandlerAsync(object handler, CancellationToken cancellationToken); +} diff --git a/GFramework.Cqrs/README.md b/GFramework.Cqrs/README.md index 62547a39..65a92db5 100644 --- a/GFramework.Cqrs/README.md +++ b/GFramework.Cqrs/README.md @@ -49,6 +49,7 @@ - 默认 runtime 与注册入口 - `CqrsRuntimeFactory.cs` - `Internal/CqrsDispatcher.cs` + - `Notification/INotificationPublisher.cs` - `Internal/CqrsHandlerRegistrar.cs` - `Internal/DefaultCqrsHandlerRegistrar.cs` - `Internal/DefaultCqrsRegistrationService.cs` @@ -122,6 +123,8 @@ var playerId = await this.SendAsync(new CreatePlayerCommand(new CreatePlayerInpu - `CqrsDispatcher` 按请求实际类型解析 `IRequestHandler<,>`,未找到处理器会抛出异常。 - 通知分发 - 通知会分发给所有已注册 `INotificationHandler<>`;零处理器时默认静默完成。 + - 默认通知发布器会按容器解析顺序逐个执行处理器,并在首个处理器抛出异常时立即停止后续分发。 + - 若容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器。 - 流式请求 - 通过 `IStreamRequest` 和 `IStreamRequestHandler<,>` 返回 `IAsyncEnumerable`。 - 上下文注入 diff --git a/GFramework.Tests.Common/CqrsTestRuntime.cs b/GFramework.Tests.Common/CqrsTestRuntime.cs index 6109bd54..f3ca2c7c 100644 --- a/GFramework.Tests.Common/CqrsTestRuntime.cs +++ b/GFramework.Tests.Common/CqrsTestRuntime.cs @@ -7,6 +7,7 @@ using GFramework.Core.Ioc; using GFramework.Cqrs; using GFramework.Cqrs.Abstractions.Cqrs; using GFramework.Cqrs.Command; +using GFramework.Cqrs.Notification; using LegacyICqrsRuntime = GFramework.Core.Abstractions.Cqrs.ICqrsRuntime; namespace GFramework.Tests.Common; @@ -59,7 +60,8 @@ public static class CqrsTestRuntime if (container.Get() is null) { var runtimeLogger = LoggerFactoryResolver.Provider.CreateLogger("CqrsDispatcher"); - var runtime = CqrsRuntimeFactory.CreateRuntime(container, runtimeLogger); + var notificationPublisher = container.Get(); + var runtime = CqrsRuntimeFactory.CreateRuntime(container, runtimeLogger, notificationPublisher); container.Register(runtime); container.Register((LegacyICqrsRuntime)runtime); } diff --git a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md index 78545e85..fa5a809d 100644 --- a/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md +++ b/ai-plan/public/cqrs-rewrite/todos/cqrs-rewrite-migration-tracking.md @@ -7,7 +7,7 @@ CQRS 迁移与收敛。 ## 当前恢复点 -- 恢复点编号:`CQRS-REWRITE-RP-063` +- 恢复点编号:`CQRS-REWRITE-RP-064` - 当前阶段:`Phase 8` - 当前焦点: - 已完成一轮 `CQRS vs Mediator` 只读评估归档,结论已沉淀到 `archive/todos/cqrs-vs-mediator-assessment-rp063.md` @@ -18,6 +18,14 @@ CQRS 迁移与收敛。 - 下一阶段建议优先级已收敛为:`notification publisher seam`、`dispatch/invoker 生成前移`、`pipeline 分层扩展`、 `可观测性 seam` 与 `benchmark / allocation baseline` - 当前功能历史已归档,active 跟踪仅保留 `Phase 8` 主线的恢复入口 + - 已完成一轮 notification publisher seam 最小落地:`GFramework.Cqrs` 新增 `INotificationPublisher`、 + `NotificationPublishContext` 与默认 `SequentialNotificationPublisher` + - `CqrsDispatcher` 现会在解析当前通知处理器集合后,把执行顺序委托给 publisher seam;默认行为仍保持 + “零处理器静默完成、顺序执行、首错即停” + - `CqrsRuntimeFactory`、`CqrsRuntimeModule` 与 `GFramework.Tests.Common.CqrsTestRuntime` 现支持在 runtime 创建前复用 + 容器里已显式注册的 `INotificationPublisher` + - 已补充 `CqrsNotificationPublisherTests`,覆盖自定义 publisher 接管、上下文注入、零处理器静默完成、首错即停,以及 + `RegisterInfrastructure` 默认接线复用预注册 publisher 的回归 - 已将 mixed fallback 场景进一步收敛:当 runtime 允许同一程序集声明多个 `CqrsReflectionFallbackAttribute` 实例时,generator 现会把可直接引用的 fallback handlers 与仅能按名称恢复的 fallback handlers 拆分发射 - `CqrsReflectionFallbackAttribute` 现允许多实例,以承载 `Type[]` 与字符串 fallback 元数据的组合输出 - 已将 generator 的程序集级 fallback 元数据进一步收敛:当全部 fallback handlers 都可直接引用且 runtime 暴露 `params Type[]` 合同时,生成器现优先发射 `typeof(...)` 形式的 fallback 元数据 @@ -144,9 +152,15 @@ CQRS 迁移与收敛。 - 设计吸收层面,当前已吸收统一消息模型、generator 优先注册与反射收敛思路;仍未完整吸收 publisher 策略抽象、 stream / exception pipeline、telemetry / diagnostics / benchmark 体系与 runtime 主体生成 - 详细结论与证据已归档到 `archive/todos/cqrs-vs-mediator-assessment-rp063.md` +- `2026-04-30` 已接受两条只读 subagent 结论并完成 notification publisher seam 最小实现: + - 相对 `ai-libs/Mediator`,本轮只吸收 notification publisher 的策略接缝,不照搬 `NotificationHandlers` 包装、 + 并行 publisher 或异常聚合语义 + - 当前 seam 刻意保持在默认 runtime 内部:`ICqrsRuntime.PublishAsync(...)` 外形不变,dispatcher 仍负责 handler 解析与 + `IContextAware` 上下文注入 + - 用户若需替换通知发布策略,只需在 runtime 创建前向容器显式注册 `INotificationPublisher` - 当前主线优先级: - - `notification publisher seam` 评估与设计优先 - dispatch/invoker 反射占比继续下降,并优先评估生成前移方案 + - 基于已落地 publisher seam,继续评估是否需要公开配置面、并行策略或 telemetry decorator - package / facade / 兼容层继续收口 - pipeline 分层扩展、可观测性 seam 与 benchmark baseline 进入中期候选 @@ -180,9 +194,19 @@ CQRS 迁移与收敛。 - 备注:使用显式 `GIT_DIR` / `GIT_WORK_TREE` 绑定重跑后,`1045` 个 tracked C# 文件的命名校验全部通过;本轮 `_syncRoot` 改名未引入命名规则回归 - `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release` - 结果:通过 - - 备注:`0 warning / 0 error`;本轮确认 `ai-plan` 评估与恢复文档更新未影响 `GFramework.Cqrs` 的最小 Release 构建 + - 备注:`0 warning / 0 error`;本轮确认 notification publisher seam、README 与文档更新未引入 `GFramework.Cqrs` 构建告警 +- `dotnet build GFramework.Core/GFramework.Core.csproj -c Release` + - 结果:通过 + - 备注:`0 warning / 0 error`;确认 `CqrsRuntimeModule` 接线变更未引入 `GFramework.Core` 模块构建问题 +- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsNotificationPublisherTests"` + - 结果:通过 + - 备注:`5/5` 通过;覆盖自定义 publisher 顺序、上下文注入、零处理器、首错即停与默认接线复用 +- `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --filter "FullyQualifiedName~MicrosoftDiContainerTests"` + - 结果:通过 + - 备注:`41/41` 通过;确认 CQRS 基础设施默认接线与容器行为未回归 ## 下一步 -1. 以 `notification publisher seam` 与 `dispatch/invoker` 生成前移为优先对象,补一轮面向实现的设计评估 -2. 单独规划旧 `Command` / `Query` API、`LegacyICqrsRuntime` 与 `Mediator` 测试命名的收口顺序,避免与 runtime 微优化混做 +1. 基于已落地的 notification publisher seam,评估是否需要第二阶段公开配置面、并行 publisher 或 telemetry decorator +2. 继续以 `dispatch/invoker` 生成前移为优先对象,补一轮面向实现的设计评估 +3. 单独规划旧 `Command` / `Query` API、`LegacyICqrsRuntime` 与 `Mediator` 测试命名的收口顺序,避免与 runtime 微优化混做 diff --git a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md index a54aa34d..cffece52 100644 --- a/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md +++ b/ai-plan/public/cqrs-rewrite/traces/cqrs-rewrite-migration-trace.md @@ -2,6 +2,41 @@ ## 2026-04-30 +### 阶段:notification publisher seam 最小落地(CQRS-REWRITE-RP-064) + +- 本轮按 `gframework-batch-boot 50` 继续 `cqrs-rewrite`,基线使用本地现有 `origin/main` +- 当前 branch diff 相对 `origin/main` 开始时仅 `3 files / 164 lines`,远低于 `50 files` stop condition,因此继续推进真实代码切片 +- 主线程锁定 `notification publisher seam` 为本轮最低风险高收益切片,并保持关键路径在本地实现 +- 接受两条只读 subagent 结论: + - 对照 `ai-libs/Mediator` 后,只吸收 notification publisher 策略接缝,不在本轮引入并行 publisher、异常聚合或公开配置面 + - 现有仓库测试需要锁定的兼容语义是:零处理器静默完成、顺序执行、首错即停、上下文逐次注入 +- 已完成实现: + - `GFramework.Cqrs` 新增 `INotificationPublisher`、`NotificationPublishContext`、 + `DelegatingNotificationPublishContext` 与默认 `SequentialNotificationPublisher` + - `CqrsDispatcher.PublishAsync(...)` 改为解析 handlers 后构造发布上下文,并委托给 publisher seam 执行 + - `CqrsRuntimeFactory`、`CqrsRuntimeModule` 与 `GFramework.Tests.Common.CqrsTestRuntime` 现会在 runtime 创建前复用容器里已注册的 `INotificationPublisher` + - `GFramework.Cqrs.Tests` 新增 `CqrsNotificationPublisherTests`,覆盖自定义 publisher、上下文注入、零处理器、首错即停与默认接线复用 + - `GFramework.Cqrs/README.md` 与 `docs/zh-CN/core/cqrs.md` 已同步说明默认通知语义与可替换 seam +- 中途验证曾因并行 .NET 构建产生输出文件锁噪音;已改为串行重跑并获取干净结果 + +### 验证 + +- `dotnet build GFramework.Cqrs/GFramework.Cqrs.csproj -c Release` + - 结果:通过,`0 warning / 0 error` +- `dotnet build GFramework.Core/GFramework.Core.csproj -c Release` + - 结果:通过,`0 warning / 0 error` +- `dotnet test GFramework.Cqrs.Tests/GFramework.Cqrs.Tests.csproj -c Release --filter "FullyQualifiedName~CqrsNotificationPublisherTests"` + - 结果:通过,`5/5` passed +- `dotnet test GFramework.Core.Tests/GFramework.Core.Tests.csproj -c Release --filter "FullyQualifiedName~MicrosoftDiContainerTests"` + - 结果:通过,`41/41` passed +- `GIT_DIR=/mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/.git/worktrees/GFramework-cqrs GIT_WORK_TREE=/mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework-WorkTree/GFramework-cqrs bash scripts/validate-csharp-naming.sh` + - 结果:通过 + +### 当前下一步 + +1. 评估 notification publisher seam 的第二阶段是否需要公开配置面、并行 publisher 或 telemetry decorator +2. 把 `dispatch/invoker` 生成前移重新拉回 `Phase 8` 主线,作为下一个实现切片 + ### 阶段:CQRS vs Mediator 评估归档(CQRS-REWRITE-RP-063) - 本轮按用户要求使用 `gframework-boot` 启动上下文后,先完成 `cqrs-rewrite` 现状核对,再并行对照 diff --git a/docs/zh-CN/core/cqrs.md b/docs/zh-CN/core/cqrs.md index 655f0b66..8ef81840 100644 --- a/docs/zh-CN/core/cqrs.md +++ b/docs/zh-CN/core/cqrs.md @@ -18,7 +18,7 @@ description: Cqrs 模块族的运行时、契约层、生成器入口,以及 | 模块 | 角色 | 何时安装 | | --- | --- | --- | | `GeWuYou.GFramework.Cqrs.Abstractions` | 纯契约层,定义 request、notification、stream、handler、pipeline、runtime seam | 需要把消息契约放到更稳定的共享层,或只依赖接口做解耦 | -| `GeWuYou.GFramework.Cqrs` | 默认 runtime,提供 dispatcher、handler 基类、上下文扩展和程序集注册流程 | 大多数直接消费 CQRS 的业务模块 | +| `GeWuYou.GFramework.Cqrs` | 默认 runtime,提供 dispatcher、notification publisher seam、handler 基类、上下文扩展和程序集注册流程 | 大多数直接消费 CQRS 的业务模块 | | `GeWuYou.GFramework.Cqrs.SourceGenerators` | 编译期生成 `ICqrsHandlerRegistry`,让运行时先走生成注册器,再只对剩余 handler 做定向 fallback | handler 较多,想把注册映射前移到编译期 | ## 最小接入路径 @@ -109,6 +109,13 @@ var playerId = await architecture.Context.SendRequestAsync( 新代码通常不需要再分别设计“命令总线”“查询总线”和另一套通知分发语义。 +当前通知分发默认仍保持顺序语义: + +- 零处理器时静默完成 +- 已解析处理器按容器顺序逐个执行 +- 首个处理器抛出异常时立即停止后续分发 +- 如果容器在 runtime 创建前已显式注册 `INotificationPublisher`,默认 runtime 会复用该策略;未注册时回退到内置顺序发布器 + ## Request 与流式变体 除了最常见的 `Command` / `Query` / `Notification`,当前公开面还覆盖两类容易被忽略的入口: