feat(core): 添加架构上下文和CQRS运行时实现

- 实现ArchitectureContext提供系统、模型、工具等组件访问管理
- 添加CqrsDispatcher作为GFramework自有CQRS运行时分发器
- 集成Microsoft.Extensions.DependencyInjection作为IoC容器适配器
- 实现完整的命令、查询、事件处理机制
- 支持上下文感知处理器注入架构上下文
- 提供管道行为链处理机制
- 实现流式请求处理功能
- 添加服务实例缓存和优先级排序支持
This commit is contained in:
GeWuYou 2026-04-15 14:41:53 +08:00
parent 3dbe1053fb
commit 048f96c6cd
32 changed files with 416 additions and 82 deletions

View File

@ -0,0 +1,51 @@
using GFramework.Core.Abstractions.Architectures;
namespace GFramework.Core.Abstractions.Cqrs;
/// <summary>
/// 定义架构上下文使用的 CQRS runtime seam。
/// 该抽象把请求分发、通知发布与流式处理从具体实现中解耦,
/// 使 <see cref="IArchitectureContext" /> 不再直接依赖某个固定的 runtime 类型。
/// </summary>
public interface ICqrsRuntime
{
/// <summary>
/// 发送请求并返回响应。
/// </summary>
/// <typeparam name="TResponse">响应类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入与嵌套请求访问。</param>
/// <param name="request">要分发的请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>请求响应。</returns>
ValueTask<TResponse> SendAsync<TResponse>(
IArchitectureContext context,
IRequest<TResponse> request,
CancellationToken cancellationToken = default);
/// <summary>
/// 发布通知到所有已注册处理器。
/// </summary>
/// <typeparam name="TNotification">通知类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="notification">要发布的通知。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>表示通知分发完成的值任务。</returns>
ValueTask PublishAsync<TNotification>(
IArchitectureContext context,
TNotification notification,
CancellationToken cancellationToken = default)
where TNotification : INotification;
/// <summary>
/// 创建流式请求的异步响应序列。
/// </summary>
/// <typeparam name="TResponse">流元素类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="request">流式请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>按需生成的异步响应序列。</returns>
IAsyncEnumerable<TResponse> CreateStream<TResponse>(
IArchitectureContext context,
IStreamRequest<TResponse> request,
CancellationToken cancellationToken = default);
}

View File

@ -17,6 +17,9 @@
<ItemGroup> <ItemGroup>
<Using Include="GFramework.Core.Abstractions"/> <Using Include="GFramework.Core.Abstractions"/>
</ItemGroup> </ItemGroup>
<ItemGroup>
<ProjectReference Include="..\GFramework.Cqrs.Abstractions\GFramework.Cqrs.Abstractions.csproj"/>
</ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Update="Meziantou.Analyzer" Version="3.0.46"> <PackageReference Update="Meziantou.Analyzer" Version="3.0.46">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>

View File

@ -1,4 +1,5 @@
using System.Reflection; using System.Reflection;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging; using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures; using GFramework.Core.Architectures;
@ -38,6 +39,65 @@ internal static class CqrsTestRuntime
?? throw new InvalidOperationException( ?? throw new InvalidOperationException(
"Failed to locate CqrsHandlerRegistrar.RegisterHandlers."); "Failed to locate CqrsHandlerRegistrar.RegisterHandlers.");
private static readonly Type CqrsDispatcherType = typeof(ArchitectureContext).Assembly
.GetType(
"GFramework.Core.Cqrs.Internal.CqrsDispatcher",
throwOnError: true)!
?? throw new InvalidOperationException(
"Failed to locate CqrsDispatcher type.");
private static readonly ConstructorInfo CqrsDispatcherConstructor = CqrsDispatcherType.GetConstructor(
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic,
binder: null,
[
typeof(IIocContainer),
typeof(ILogger)
],
modifiers: null)
?? throw new InvalidOperationException(
"Failed to locate CqrsDispatcher constructor.");
private static readonly Type DefaultCqrsHandlerRegistrarType = typeof(ArchitectureContext).Assembly
.GetType(
"GFramework.Core.Cqrs.Internal.DefaultCqrsHandlerRegistrar",
throwOnError: true)!
?? throw new InvalidOperationException(
"Failed to locate DefaultCqrsHandlerRegistrar type.");
private static readonly ConstructorInfo DefaultCqrsHandlerRegistrarConstructor =
DefaultCqrsHandlerRegistrarType.GetConstructor(
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
binder: null,
[
typeof(IIocContainer),
typeof(ILogger)
],
modifiers: null)
?? throw new InvalidOperationException(
"Failed to locate DefaultCqrsHandlerRegistrar constructor.");
/// <summary>
/// 为裸测试容器补齐默认 CQRS runtime seam。
/// 这使仅使用 <see cref="MicrosoftDiContainer" /> 的测试环境也能观察与生产路径一致的 runtime 行为,
/// 而无需完整启动服务模块管理器。
/// </summary>
/// <param name="container">目标测试容器。</param>
internal static void RegisterInfrastructure(MicrosoftDiContainer container)
{
ArgumentNullException.ThrowIfNull(container);
var runtimeLogger = LoggerFactoryResolver.Provider.CreateLogger("CqrsDispatcher");
var registrarLogger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsTestRuntime));
var runtime = (ICqrsRuntime)CqrsDispatcherConstructor.Invoke([container, runtimeLogger]);
var registrar =
(ICqrsHandlerRegistrar)DefaultCqrsHandlerRegistrarConstructor.Invoke([container, registrarLogger]);
container.Register<ICqrsRuntime>(runtime);
container.Register<ICqrsHandlerRegistrar>(registrar);
}
/// <summary> /// <summary>
/// 通过与生产代码一致的注册入口扫描并注册指定程序集中的 CQRS 处理器。 /// 通过与生产代码一致的注册入口扫描并注册指定程序集中的 CQRS 处理器。
/// </summary> /// </summary>
@ -48,6 +108,8 @@ internal static class CqrsTestRuntime
ArgumentNullException.ThrowIfNull(container); ArgumentNullException.ThrowIfNull(container);
ArgumentNullException.ThrowIfNull(assemblies); ArgumentNullException.ThrowIfNull(assemblies);
RegisterInfrastructure(container);
var logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsTestRuntime)); var logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsTestRuntime));
RegisterHandlersMethod.Invoke( RegisterHandlersMethod.Invoke(
null, null,

View File

@ -1,6 +1,5 @@
using System.Reflection; using System.Reflection;
using GFramework.Core.Abstractions.Bases; using GFramework.Core.Abstractions.Bases;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Ioc; using GFramework.Core.Ioc;
using GFramework.Core.Logging; using GFramework.Core.Logging;
using GFramework.Core.Tests.Cqrs; using GFramework.Core.Tests.Cqrs;
@ -14,6 +13,8 @@ namespace GFramework.Core.Tests.Ioc;
[TestFixture] [TestFixture]
public class MicrosoftDiContainerTests public class MicrosoftDiContainerTests
{ {
private MicrosoftDiContainer _container = null!;
/// <summary> /// <summary>
/// 在每个测试方法执行前进行设置 /// 在每个测试方法执行前进行设置
/// </summary> /// </summary>
@ -29,9 +30,9 @@ public class MicrosoftDiContainerTests
BindingFlags.NonPublic | BindingFlags.Instance); BindingFlags.NonPublic | BindingFlags.Instance);
loggerField?.SetValue(_container, loggerField?.SetValue(_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(MicrosoftDiContainer))); LoggerFactoryResolver.Provider.CreateLogger(nameof(MicrosoftDiContainer)));
}
private MicrosoftDiContainer _container = null!; CqrsTestRuntime.RegisterInfrastructure(_container);
}
/// <summary> /// <summary>
/// 测试注册单例实例的功能 /// 测试注册单例实例的功能
@ -328,6 +329,8 @@ public class MicrosoftDiContainerTests
descriptor.ServiceType == typeof(INotificationHandler<DeterministicOrderNotification>)), descriptor.ServiceType == typeof(INotificationHandler<DeterministicOrderNotification>)),
Is.False); Is.False);
// Clear 会移除测试手工补齐的 CQRS seam需要先恢复基础设施再验证程序集去重状态是否已重置。
CqrsTestRuntime.RegisterInfrastructure(_container);
_container.RegisterCqrsHandlersFromAssembly(assembly); _container.RegisterCqrsHandlersFromAssembly(assembly);
Assert.That( Assert.That(

View File

@ -2,18 +2,13 @@ using System.Collections.Concurrent;
using GFramework.Core.Abstractions.Architectures; using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Command; using GFramework.Core.Abstractions.Command;
using GFramework.Core.Abstractions.Cqrs; using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Cqrs.Command;
using GFramework.Core.Abstractions.Cqrs.Query;
using GFramework.Core.Abstractions.Environment; using GFramework.Core.Abstractions.Environment;
using GFramework.Core.Abstractions.Events; using GFramework.Core.Abstractions.Events;
using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Abstractions.Model; using GFramework.Core.Abstractions.Model;
using GFramework.Core.Abstractions.Query; using GFramework.Core.Abstractions.Query;
using GFramework.Core.Abstractions.Systems; using GFramework.Core.Abstractions.Systems;
using GFramework.Core.Abstractions.Utility; using GFramework.Core.Abstractions.Utility;
using GFramework.Core.Cqrs.Internal;
using GFramework.Core.Logging;
using ICommand = GFramework.Core.Abstractions.Command.ICommand; using ICommand = GFramework.Core.Abstractions.Command.ICommand;
namespace GFramework.Core.Architectures; namespace GFramework.Core.Architectures;
@ -25,15 +20,15 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
{ {
private readonly IIocContainer _container = container ?? throw new ArgumentNullException(nameof(container)); private readonly IIocContainer _container = container ?? throw new ArgumentNullException(nameof(container));
private readonly ConcurrentDictionary<Type, object> _serviceCache = new(); private readonly ConcurrentDictionary<Type, object> _serviceCache = new();
private readonly ILogger _logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(ArchitectureContext)); private ICqrsRuntime? _cqrsRuntime;
private CqrsDispatcher? _cqrsDispatcher;
#region CQRS Integration #region CQRS Integration
/// <summary> /// <summary>
/// 获取 CQRS 运行时分发器(延迟初始化)。 /// 获取 CQRS runtime seam(延迟初始化)。
/// </summary> /// </summary>
private CqrsDispatcher CqrsDispatcher => _cqrsDispatcher ??= new CqrsDispatcher(_container, this, _logger); private ICqrsRuntime CqrsRuntime => _cqrsRuntime ??=
_container.Get<ICqrsRuntime>() ?? throw new InvalidOperationException("ICqrsRuntime not registered");
/// <summary> /// <summary>
/// 获取指定类型的服务实例,如果缓存中存在则直接返回,否则从容器中获取并缓存 /// 获取指定类型的服务实例,如果缓存中存在则直接返回,否则从容器中获取并缓存
@ -73,7 +68,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(request); ArgumentNullException.ThrowIfNull(request);
return await CqrsDispatcher.SendAsync(request, cancellationToken); return await CqrsRuntime.SendAsync(this, request, cancellationToken);
} }
/// <summary> /// <summary>
@ -100,7 +95,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
where TNotification : INotification where TNotification : INotification
{ {
ArgumentNullException.ThrowIfNull(notification); ArgumentNullException.ThrowIfNull(notification);
await CqrsDispatcher.PublishAsync(notification, cancellationToken); await CqrsRuntime.PublishAsync(this, notification, cancellationToken);
} }
/// <summary> /// <summary>
@ -115,7 +110,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(request); ArgumentNullException.ThrowIfNull(request);
return CqrsDispatcher.CreateStream(request, cancellationToken); return CqrsRuntime.CreateStream(this, request, cancellationToken);
} }
/// <summary> /// <summary>
@ -151,7 +146,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
/// <typeparam name="TResult">查询结果类型</typeparam> /// <typeparam name="TResult">查询结果类型</typeparam>
/// <param name="query">要发送的查询</param> /// <param name="query">要发送的查询</param>
/// <returns>查询结果</returns> /// <returns>查询结果</returns>
public TResult SendQuery<TResult>(Abstractions.Query.IQuery<TResult> query) public TResult SendQuery<TResult>(IQuery<TResult> query)
{ {
if (query == null) throw new ArgumentNullException(nameof(query)); if (query == null) throw new ArgumentNullException(nameof(query));
var queryBus = GetOrCache<IQueryExecutor>(); var queryBus = GetOrCache<IQueryExecutor>();
@ -165,7 +160,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
/// <typeparam name="TResponse">查询响应类型</typeparam> /// <typeparam name="TResponse">查询响应类型</typeparam>
/// <param name="query">要发送的查询对象</param> /// <param name="query">要发送的查询对象</param>
/// <returns>查询结果</returns> /// <returns>查询结果</returns>
public TResponse SendQuery<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query) public TResponse SendQuery<TResponse>(Abstractions.Cqrs.Query.IQuery<TResponse> query)
{ {
return SendQueryAsync(query).AsTask().GetAwaiter().GetResult(); return SendQueryAsync(query).AsTask().GetAwaiter().GetResult();
} }
@ -191,7 +186,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
/// <param name="query">要发送的查询对象</param> /// <param name="query">要发送的查询对象</param>
/// <param name="cancellationToken">取消令牌,用于取消操作</param> /// <param name="cancellationToken">取消令牌,用于取消操作</param>
/// <returns>包含查询结果的ValueTask</returns> /// <returns>包含查询结果的ValueTask</returns>
public async ValueTask<TResponse> SendQueryAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query, public async ValueTask<TResponse> SendQueryAsync<TResponse>(Abstractions.Cqrs.Query.IQuery<TResponse> query,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(query); ArgumentNullException.ThrowIfNull(query);
@ -327,7 +322,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
/// <param name="command">要发送的命令对象</param> /// <param name="command">要发送的命令对象</param>
/// <param name="cancellationToken">取消令牌,用于取消操作</param> /// <param name="cancellationToken">取消令牌,用于取消操作</param>
/// <returns>包含命令执行结果的ValueTask</returns> /// <returns>包含命令执行结果的ValueTask</returns>
public async ValueTask<TResponse> SendCommandAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command, public async ValueTask<TResponse> SendCommandAsync<TResponse>(Abstractions.Cqrs.Command.ICommand<TResponse> command,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(command); ArgumentNullException.ThrowIfNull(command);
@ -366,7 +361,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
/// <typeparam name="TResponse">命令响应类型</typeparam> /// <typeparam name="TResponse">命令响应类型</typeparam>
/// <param name="command">要发送的命令对象</param> /// <param name="command">要发送的命令对象</param>
/// <returns>命令执行结果</returns> /// <returns>命令执行结果</returns>
public TResponse SendCommand<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command) public TResponse SendCommand<TResponse>(Abstractions.Cqrs.Command.ICommand<TResponse> command)
{ {
return SendCommandAsync(command).AsTask().GetAwaiter().GetResult(); return SendCommandAsync(command).AsTask().GetAwaiter().GetResult();
} }
@ -388,7 +383,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
/// <typeparam name="TResult">命令执行结果类型</typeparam> /// <typeparam name="TResult">命令执行结果类型</typeparam>
/// <param name="command">要发送的命令</param> /// <param name="command">要发送的命令</param>
/// <returns>命令执行结果</returns> /// <returns>命令执行结果</returns>
public TResult SendCommand<TResult>(Abstractions.Command.ICommand<TResult> command) public TResult SendCommand<TResult>(ICommand<TResult> command)
{ {
ArgumentNullException.ThrowIfNull(command); ArgumentNullException.ThrowIfNull(command);
var commandBus = GetOrCache<ICommandExecutor>(); var commandBus = GetOrCache<ICommandExecutor>();

View File

@ -14,34 +14,70 @@ namespace GFramework.Core.Cqrs.Internal;
/// </summary> /// </summary>
internal sealed class CqrsDispatcher( internal sealed class CqrsDispatcher(
IIocContainer container, IIocContainer container,
IArchitectureContext context, ILogger logger) : ICqrsRuntime
ILogger logger)
{ {
private delegate ValueTask<object?> RequestInvoker(object handler, object request, CancellationToken cancellationToken); private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestInvoker>
private delegate ValueTask<object?> RequestPipelineInvoker( RequestInvokers = new();
object handler,
IReadOnlyList<object> behaviors, private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestPipelineInvoker>
object request, RequestPipelineInvokers = new();
CancellationToken cancellationToken);
private delegate ValueTask NotificationInvoker(object handler, object notification, CancellationToken cancellationToken);
private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken);
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestInvoker> RequestInvokers = new();
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestPipelineInvoker> RequestPipelineInvokers = new();
private static readonly ConcurrentDictionary<Type, NotificationInvoker> NotificationInvokers = new(); private static readonly ConcurrentDictionary<Type, NotificationInvoker> NotificationInvokers = new();
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers = new();
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers =
new();
/// <summary>
/// 发布通知到所有已注册处理器。
/// </summary>
/// <typeparam name="TNotification">通知类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="notification">通知对象。</param>
/// <param name="cancellationToken">取消令牌。</param>
public async ValueTask PublishAsync<TNotification>(
IArchitectureContext context,
TNotification notification,
CancellationToken cancellationToken = default)
where TNotification : INotification
{
ArgumentNullException.ThrowIfNull(context);
ArgumentNullException.ThrowIfNull(notification);
var notificationType = notification.GetType();
var handlerType = typeof(INotificationHandler<>).MakeGenericType(notificationType);
var handlers = container.GetAll(handlerType);
if (handlers.Count == 0)
{
logger.Debug($"No CQRS notification handler registered for {notificationType.FullName}.");
return;
}
var invoker = NotificationInvokers.GetOrAdd(
notificationType,
CreateNotificationInvoker);
foreach (var handler in handlers)
{
PrepareHandler(handler, context);
await invoker(handler, notification, cancellationToken);
}
}
/// <summary> /// <summary>
/// 发送请求并返回结果。 /// 发送请求并返回结果。
/// </summary> /// </summary>
/// <typeparam name="TResponse">响应类型。</typeparam> /// <typeparam name="TResponse">响应类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="request">请求对象。</param> /// <param name="request">请求对象。</param>
/// <param name="cancellationToken">取消令牌。</param> /// <param name="cancellationToken">取消令牌。</param>
/// <returns>请求响应。</returns> /// <returns>请求响应。</returns>
public async ValueTask<TResponse> SendAsync<TResponse>( public async ValueTask<TResponse> SendAsync<TResponse>(
IArchitectureContext context,
IRequest<TResponse> request, IRequest<TResponse> request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(context);
ArgumentNullException.ThrowIfNull(request); ArgumentNullException.ThrowIfNull(request);
var requestType = request.GetType(); var requestType = request.GetType();
@ -50,12 +86,12 @@ internal sealed class CqrsDispatcher(
?? throw new InvalidOperationException( ?? throw new InvalidOperationException(
$"No CQRS request handler registered for {requestType.FullName}."); $"No CQRS request handler registered for {requestType.FullName}.");
PrepareHandler(handler); PrepareHandler(handler, context);
var behaviorType = typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse)); var behaviorType = typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse));
var behaviors = container.GetAll(behaviorType); var behaviors = container.GetAll(behaviorType);
foreach (var behavior in behaviors) foreach (var behavior in behaviors)
PrepareHandler(behavior); PrepareHandler(behavior, context);
if (behaviors.Count == 0) if (behaviors.Count == 0)
{ {
@ -75,51 +111,20 @@ internal sealed class CqrsDispatcher(
return pipelineResult is null ? default! : (TResponse)pipelineResult; return pipelineResult is null ? default! : (TResponse)pipelineResult;
} }
/// <summary>
/// 发布通知到所有已注册处理器。
/// </summary>
/// <typeparam name="TNotification">通知类型。</typeparam>
/// <param name="notification">通知对象。</param>
/// <param name="cancellationToken">取消令牌。</param>
public async ValueTask PublishAsync<TNotification>(
TNotification notification,
CancellationToken cancellationToken = default)
where TNotification : INotification
{
ArgumentNullException.ThrowIfNull(notification);
var notificationType = notification.GetType();
var handlerType = typeof(INotificationHandler<>).MakeGenericType(notificationType);
var handlers = container.GetAll(handlerType);
if (handlers.Count == 0)
{
logger.Debug($"No CQRS notification handler registered for {notificationType.FullName}.");
return;
}
var invoker = NotificationInvokers.GetOrAdd(
notificationType,
CreateNotificationInvoker);
foreach (var handler in handlers)
{
PrepareHandler(handler);
await invoker(handler, notification, cancellationToken);
}
}
/// <summary> /// <summary>
/// 创建流式请求并返回异步响应序列。 /// 创建流式请求并返回异步响应序列。
/// </summary> /// </summary>
/// <typeparam name="TResponse">响应元素类型。</typeparam> /// <typeparam name="TResponse">响应元素类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="request">流式请求对象。</param> /// <param name="request">流式请求对象。</param>
/// <param name="cancellationToken">取消令牌。</param> /// <param name="cancellationToken">取消令牌。</param>
/// <returns>异步响应序列。</returns> /// <returns>异步响应序列。</returns>
public IAsyncEnumerable<TResponse> CreateStream<TResponse>( public IAsyncEnumerable<TResponse> CreateStream<TResponse>(
IArchitectureContext context,
IStreamRequest<TResponse> request, IStreamRequest<TResponse> request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(context);
ArgumentNullException.ThrowIfNull(request); ArgumentNullException.ThrowIfNull(request);
var requestType = request.GetType(); var requestType = request.GetType();
@ -128,7 +133,7 @@ internal sealed class CqrsDispatcher(
?? throw new InvalidOperationException( ?? throw new InvalidOperationException(
$"No CQRS stream handler registered for {requestType.FullName}."); $"No CQRS stream handler registered for {requestType.FullName}.");
PrepareHandler(handler); PrepareHandler(handler, context);
var invoker = StreamInvokers.GetOrAdd( var invoker = StreamInvokers.GetOrAdd(
(requestType, typeof(TResponse)), (requestType, typeof(TResponse)),
@ -141,7 +146,8 @@ internal sealed class CqrsDispatcher(
/// 为上下文感知处理器注入当前架构上下文。 /// 为上下文感知处理器注入当前架构上下文。
/// </summary> /// </summary>
/// <param name="handler">处理器实例。</param> /// <param name="handler">处理器实例。</param>
private void PrepareHandler(object handler) /// <param name="context">当前架构上下文。</param>
private static void PrepareHandler(object handler, IArchitectureContext context)
{ {
if (handler is IContextAware contextAware) if (handler is IContextAware contextAware)
contextAware.SetContext(context); contextAware.SetContext(context);
@ -260,4 +266,18 @@ internal sealed class CqrsDispatcher(
var typedRequest = (TRequest)request; var typedRequest = (TRequest)request;
return typedHandler.Handle(typedRequest, cancellationToken); return typedHandler.Handle(typedRequest, cancellationToken);
} }
private delegate ValueTask<object?> RequestInvoker(object handler, object request,
CancellationToken cancellationToken);
private delegate ValueTask<object?> RequestPipelineInvoker(
object handler,
IReadOnlyList<object> behaviors,
object request,
CancellationToken cancellationToken);
private delegate ValueTask NotificationInvoker(object handler, object notification,
CancellationToken cancellationToken);
private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken);
} }

View File

@ -0,0 +1,26 @@
using System.Reflection;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging;
namespace GFramework.Core.Cqrs.Internal;
/// <summary>
/// 默认的 CQRS 处理器注册器实现。
/// 该适配器把容器公开的 handler 接入入口转发到现有的注册流水线,
/// 使容器主路径只依赖 <see cref="ICqrsHandlerRegistrar" /> 抽象。
/// </summary>
internal sealed class DefaultCqrsHandlerRegistrar(IIocContainer container, ILogger logger) : ICqrsHandlerRegistrar
{
private readonly IIocContainer _container = container ?? throw new ArgumentNullException(nameof(container));
private readonly ILogger _logger = logger ?? throw new ArgumentNullException(nameof(logger));
/// <summary>
/// 按当前 runtime 约定扫描并注册处理器程序集。
/// </summary>
/// <param name="assemblies">要接入的程序集集合。</param>
public void RegisterHandlers(IEnumerable<Assembly> assemblies)
{
ArgumentNullException.ThrowIfNull(assemblies);
CqrsHandlerRegistrar.RegisterHandlers(_container, assemblies, _logger);
}
}

View File

@ -1,11 +1,9 @@
using System.ComponentModel; using System.ComponentModel;
using System.Reflection; using System.Reflection;
using GFramework.Core.Abstractions.Bases; using GFramework.Core.Abstractions.Bases;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Ioc; using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Logging; using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Abstractions.Systems; using GFramework.Core.Abstractions.Systems;
using GFramework.Core.Cqrs.Internal;
using GFramework.Core.Logging; using GFramework.Core.Logging;
using GFramework.Core.Rule; using GFramework.Core.Rule;
@ -424,7 +422,7 @@ public class MicrosoftDiContainer(IServiceCollection? serviceCollection = null)
continue; continue;
} }
CqrsHandlerRegistrar.RegisterHandlers(this, [assembly], _logger); ResolveCqrsHandlerRegistrar().RegisterHandlers([assembly]);
_registeredCqrsHandlerAssemblyKeys.Add(assemblyKey); _registeredCqrsHandlerAssemblyKeys.Add(assemblyKey);
} }
} }
@ -456,6 +454,27 @@ public class MicrosoftDiContainer(IServiceCollection? serviceCollection = null)
#region Get #region Get
/// <summary>
/// 获取当前容器中已注册的 CQRS 处理器注册器。
/// 该方法仅供容器内部在注册阶段使用,因此直接读取服务描述符中的实例绑定,
/// 避免在容器未冻结前依赖完整的服务提供者构建流程。
/// </summary>
/// <returns>已注册的 CQRS 处理器注册器实例。</returns>
/// <exception cref="InvalidOperationException">未找到可用的 CQRS 处理器注册器实例时抛出。</exception>
private ICqrsHandlerRegistrar ResolveCqrsHandlerRegistrar()
{
var descriptor = GetServicesUnsafe.LastOrDefault(static service =>
service.ServiceType == typeof(ICqrsHandlerRegistrar));
if (descriptor?.ImplementationInstance is ICqrsHandlerRegistrar registrar)
return registrar;
const string errorMessage =
"ICqrsHandlerRegistrar not registered. Ensure the CQRS runtime module has been installed before registering handlers.";
_logger.Error(errorMessage);
throw new InvalidOperationException(errorMessage);
}
/// <summary> /// <summary>
/// 获取指定泛型类型的服务实例 /// 获取指定泛型类型的服务实例
/// 返回第一个匹配的注册实例如果不存在则返回null /// 返回第一个匹配的注册实例如果不存在则返回null

View File

@ -0,0 +1,61 @@
using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Cqrs.Internal;
using GFramework.Core.Logging;
namespace GFramework.Core.Services.Modules;
/// <summary>
/// CQRS runtime 模块,用于把默认请求分发器与处理器注册器接入架构容器。
/// 该模块在架构初始化早期完成注册,保证用户初始化阶段即可使用 CQRS 入口与 handler 自动接入能力。
/// </summary>
public sealed class CqrsRuntimeModule : IServiceModule
{
/// <summary>
/// 获取模块名称。
/// </summary>
public string ModuleName => nameof(CqrsRuntimeModule);
/// <summary>
/// 获取模块优先级。
/// CQRS runtime 需要先于架构默认 handler 扫描路径可用,因此放在基础总线模块之后、用户初始化之前注册。
/// </summary>
public int Priority => 15;
/// <summary>
/// 获取模块启用状态,默认启用。
/// </summary>
public bool IsEnabled => true;
/// <summary>
/// 注册默认 CQRS runtime seam 实现。
/// </summary>
/// <param name="container">目标依赖注入容器。</param>
public void Register(IIocContainer container)
{
ArgumentNullException.ThrowIfNull(container);
var dispatcherLogger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsDispatcher));
var registrarLogger = LoggerFactoryResolver.Provider.CreateLogger(nameof(DefaultCqrsHandlerRegistrar));
container.Register<ICqrsRuntime>(new CqrsDispatcher(container, dispatcherLogger));
container.Register<ICqrsHandlerRegistrar>(new DefaultCqrsHandlerRegistrar(container, registrarLogger));
}
/// <summary>
/// 初始化模块。
/// </summary>
public void Initialize()
{
}
/// <summary>
/// 异步销毁模块。
/// </summary>
/// <returns>已完成的值任务。</returns>
public ValueTask DestroyAsync()
{
return ValueTask.CompletedTask;
}
}

View File

@ -42,7 +42,7 @@ public sealed class ServiceModuleManager : IServiceModuleManager
/// <summary> /// <summary>
/// 注册内置服务模块,并根据优先级排序后完成服务注册。 /// 注册内置服务模块,并根据优先级排序后完成服务注册。
/// 内置模块包括事件总线、命令执行器、查询执行器等核心模块。 /// 内置模块包括事件总线、命令执行器、CQRS runtime、查询执行器等核心模块。
/// 同时注册通过 ArchitectureModuleRegistry 自动注册的外部模块。 /// 同时注册通过 ArchitectureModuleRegistry 自动注册的外部模块。
/// </summary> /// </summary>
/// <param name="container">IoC容器实例用于模块服务注册。</param> /// <param name="container">IoC容器实例用于模块服务注册。</param>
@ -57,6 +57,7 @@ public sealed class ServiceModuleManager : IServiceModuleManager
// 注册内置模块 // 注册内置模块
RegisterModule(new EventBusModule()); RegisterModule(new EventBusModule());
RegisterModule(new CommandExecutorModule()); RegisterModule(new CommandExecutorModule());
RegisterModule(new CqrsRuntimeModule());
RegisterModule(new QueryExecutorModule()); RegisterModule(new QueryExecutorModule());
RegisterModule(new AsyncQueryExecutorModule()); RegisterModule(new AsyncQueryExecutorModule());
@ -148,4 +149,4 @@ public sealed class ServiceModuleManager : IServiceModuleManager
_builtInModulesRegistered = false; _builtInModulesRegistered = false;
_logger.Info("All service modules destroyed"); _logger.Info("All service modules destroyed");
} }
} }

View File

@ -4,4 +4,4 @@
/// 命令输入接口,定义命令模式中输入数据的契约 /// 命令输入接口,定义命令模式中输入数据的契约
/// 该接口作为标记接口使用,不包含任何成员定义 /// 该接口作为标记接口使用,不包含任何成员定义
/// </summary> /// </summary>
public interface ICommandInput : IInput; public interface ICommandInput : IInput;

View File

@ -0,0 +1,17 @@
using System.Reflection;
namespace GFramework.Core.Abstractions.Cqrs;
/// <summary>
/// 定义 CQRS 处理器程序集接入的 runtime seam。
/// 该抽象负责承接“生成注册器优先、反射扫描回退”的处理器注册流程,
/// 让容器与架构启动链不再直接依赖固定的注册实现类型。
/// </summary>
public interface ICqrsHandlerRegistrar
{
/// <summary>
/// 扫描并注册指定程序集集合中的 CQRS 处理器。
/// </summary>
/// <param name="assemblies">要接入的程序集集合。</param>
void RegisterHandlers(IEnumerable<Assembly> assemblies);
}

View File

@ -17,4 +17,4 @@ namespace GFramework.Core.Abstractions.Cqrs;
/// 表示输入数据的标记接口。 /// 表示输入数据的标记接口。
/// 该接口用于标识各类CQRS模式中的输入参数类型。 /// 该接口用于标识各类CQRS模式中的输入参数类型。
/// </summary> /// </summary>
public interface IInput; public interface IInput;

View File

@ -17,4 +17,4 @@ namespace GFramework.Core.Abstractions.Cqrs.Notification;
/// 表示通知输入数据的标记接口。 /// 表示通知输入数据的标记接口。
/// 该接口继承自 IInput用于标识CQRS模式中通知类型的输入参数。 /// 该接口继承自 IInput用于标识CQRS模式中通知类型的输入参数。
/// </summary> /// </summary>
public interface INotificationInput : IInput; public interface INotificationInput : IInput;

View File

@ -3,4 +3,4 @@
/// <summary> /// <summary>
/// 查询输入接口,定义了查询操作的输入规范 /// 查询输入接口,定义了查询操作的输入规范
/// </summary> /// </summary>
public interface IQueryInput : IInput; public interface IQueryInput : IInput;

View File

@ -17,4 +17,4 @@ namespace GFramework.Core.Abstractions.Cqrs.Request;
/// 表示请求输入数据的标记接口。 /// 表示请求输入数据的标记接口。
/// 该接口继承自 IInput用于标识CQRS模式中请求类型的输入参数。 /// 该接口继承自 IInput用于标识CQRS模式中请求类型的输入参数。
/// </summary> /// </summary>
public interface IRequestInput : IInput; public interface IRequestInput : IInput;

View File

@ -0,0 +1,18 @@
<Project>
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<LangVersion>preview</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Meziantou.Analyzer" Version="2.0.264">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
<PackageReference Include="Meziantou.Polyfill" Version="1.0.71">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>
</Project>

View File

@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>GeWuYou.$(AssemblyName)</PackageId>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<MeziantouPolyfill_IncludedPolyfills>T:System.Diagnostics.CodeAnalysis.NotNullWhenAttribute</MeziantouPolyfill_IncludedPolyfills>
<Nullable>enable</Nullable>
<EnableGFrameworkPackageTransitiveGlobalUsings>true</EnableGFrameworkPackageTransitiveGlobalUsings>
</PropertyGroup>
</Project>

View File

@ -0,0 +1,3 @@
global using System.Collections.Generic;
global using System.Threading;
global using System.Threading.Tasks;

View File

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>GeWuYou.$(AssemblyName)</PackageId>
<TargetFrameworks>net8.0;net9.0;net10.0</TargetFrameworks>
<ImplicitUsings>disable</ImplicitUsings>
<Nullable>enable</Nullable>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<EnableGFrameworkPackageTransitiveGlobalUsings>true</EnableGFrameworkPackageTransitiveGlobalUsings>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\GFramework.Cqrs.Abstractions\GFramework.Cqrs.Abstractions.csproj"/>
</ItemGroup>
</Project>

View File

@ -38,6 +38,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GFramework.Godot.SourceGene
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GFramework.Godot.Tests", "GFramework.Godot.Tests\GFramework.Godot.Tests.csproj", "{576119E2-13D0-4ACF-A012-D01C320E8BF3}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GFramework.Godot.Tests", "GFramework.Godot.Tests\GFramework.Godot.Tests.csproj", "{576119E2-13D0-4ACF-A012-D01C320E8BF3}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GFramework.Cqrs.Abstractions", "GFramework.Cqrs.Abstractions\GFramework.Cqrs.Abstractions.csproj", "{69C06523-98AA-49DE-95D4-4BF203716DD2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GFramework.Cqrs", "GFramework.Cqrs\GFramework.Cqrs.csproj", "{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -276,6 +280,30 @@ Global
{576119E2-13D0-4ACF-A012-D01C320E8BF3}.Release|x64.Build.0 = Release|Any CPU {576119E2-13D0-4ACF-A012-D01C320E8BF3}.Release|x64.Build.0 = Release|Any CPU
{576119E2-13D0-4ACF-A012-D01C320E8BF3}.Release|x86.ActiveCfg = Release|Any CPU {576119E2-13D0-4ACF-A012-D01C320E8BF3}.Release|x86.ActiveCfg = Release|Any CPU
{576119E2-13D0-4ACF-A012-D01C320E8BF3}.Release|x86.Build.0 = Release|Any CPU {576119E2-13D0-4ACF-A012-D01C320E8BF3}.Release|x86.Build.0 = Release|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Debug|x64.ActiveCfg = Debug|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Debug|x64.Build.0 = Debug|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Debug|x86.ActiveCfg = Debug|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Debug|x86.Build.0 = Debug|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Release|Any CPU.Build.0 = Release|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Release|x64.ActiveCfg = Release|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Release|x64.Build.0 = Release|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Release|x86.ActiveCfg = Release|Any CPU
{69C06523-98AA-49DE-95D4-4BF203716DD2}.Release|x86.Build.0 = Release|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Debug|x64.ActiveCfg = Debug|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Debug|x64.Build.0 = Debug|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Debug|x86.ActiveCfg = Debug|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Debug|x86.Build.0 = Debug|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Release|Any CPU.Build.0 = Release|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Release|x64.ActiveCfg = Release|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Release|x64.Build.0 = Release|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Release|x86.ActiveCfg = Release|Any CPU
{E7034F34-0D2B-4D99-B8E2-D149EF6C88F2}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE