mirror of
https://github.com/GeWuYou/GFramework.git
synced 2026-05-07 00:39:00 +08:00
Replace Mediator runtime with built-in CQRS
This commit is contained in:
parent
c2ee2209fd
commit
48e57c8547
10
AGENTS.md
10
AGENTS.md
@ -244,6 +244,16 @@ bash scripts/validate-csharp-naming.sh
|
|||||||
- Tracking updates MUST reflect completed work, newly discovered issues, validation results, and the next recommended
|
- Tracking updates MUST reflect completed work, newly discovered issues, validation results, and the next recommended
|
||||||
recovery point.
|
recovery point.
|
||||||
- Completing code changes without updating the active tracking document is considered incomplete work.
|
- Completing code changes without updating the active tracking document is considered incomplete work.
|
||||||
|
- For any multi-step refactor, migration, or cross-module task, contributors MUST create or adopt a dedicated recovery
|
||||||
|
document under `local-plan/todos/` before making substantive code changes.
|
||||||
|
- Recovery documents MUST record the current phase, the active recovery point identifier, known risks, and the next
|
||||||
|
recommended resume step so another contributor or subagent can continue the work safely.
|
||||||
|
- Contributors MUST maintain a matching execution trace under `local-plan/traces/` for complex work. The trace should
|
||||||
|
record the current date, key decisions, validation milestones, and the immediate next step.
|
||||||
|
- When a task spans multiple commits or is likely to exceed a single agent context window, update both the recovery
|
||||||
|
document and the trace at each meaningful milestone before pausing or handing work off.
|
||||||
|
- If subagents are used on a complex task, the main agent MUST capture the delegated scope and any accepted findings in
|
||||||
|
the active recovery document or trace before continuing implementation.
|
||||||
|
|
||||||
### Repository Documentation
|
### Repository Documentation
|
||||||
|
|
||||||
|
|||||||
@ -1,11 +1,13 @@
|
|||||||
using GFramework.Core.Abstractions.Command;
|
using GFramework.Core.Abstractions.Command;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
using GFramework.Core.Abstractions.Environment;
|
using GFramework.Core.Abstractions.Environment;
|
||||||
using GFramework.Core.Abstractions.Events;
|
using GFramework.Core.Abstractions.Events;
|
||||||
using GFramework.Core.Abstractions.Model;
|
using GFramework.Core.Abstractions.Model;
|
||||||
using GFramework.Core.Abstractions.Query;
|
using GFramework.Core.Abstractions.Query;
|
||||||
using GFramework.Core.Abstractions.Systems;
|
using GFramework.Core.Abstractions.Systems;
|
||||||
using GFramework.Core.Abstractions.Utility;
|
using GFramework.Core.Abstractions.Utility;
|
||||||
using Mediator;
|
|
||||||
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
||||||
|
|
||||||
namespace GFramework.Core.Abstractions.Architectures;
|
namespace GFramework.Core.Abstractions.Architectures;
|
||||||
@ -118,12 +120,12 @@ public interface IArchitectureContext
|
|||||||
TResult SendCommand<TResult>(Command.ICommand<TResult> command);
|
TResult SendCommand<TResult>(Command.ICommand<TResult> command);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送命令的同步版本(不推荐,仅用于兼容性)
|
/// 发送一个 CQRS 命令并返回结果。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
/// <typeparam name="TResponse">命令响应类型。</typeparam>
|
||||||
/// <param name="command">要发送的命令对象</param>
|
/// <param name="command">要发送的 CQRS 命令。</param>
|
||||||
/// <returns>命令执行结果</returns>
|
/// <returns>命令执行结果。</returns>
|
||||||
TResponse SendCommand<TResponse>(Mediator.ICommand<TResponse> command);
|
TResponse SendCommand<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command);
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -133,14 +135,13 @@ public interface IArchitectureContext
|
|||||||
Task SendCommandAsync(IAsyncCommand command);
|
Task SendCommandAsync(IAsyncCommand command);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 异步发送命令并返回结果
|
/// 异步发送一个 CQRS 命令并返回结果。
|
||||||
/// 通过Mediator模式发送命令请求,支持取消操作
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
/// <typeparam name="TResponse">命令响应类型。</typeparam>
|
||||||
/// <param name="command">要发送的命令对象</param>
|
/// <param name="command">要发送的 CQRS 命令。</param>
|
||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
/// <returns>包含命令执行结果的ValueTask</returns>
|
/// <returns>包含命令执行结果的值任务。</returns>
|
||||||
ValueTask<TResponse> SendCommandAsync<TResponse>(Mediator.ICommand<TResponse> command,
|
ValueTask<TResponse> SendCommandAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command,
|
||||||
CancellationToken cancellationToken = default);
|
CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
|
||||||
@ -161,12 +162,12 @@ public interface IArchitectureContext
|
|||||||
TResult SendQuery<TResult>(Query.IQuery<TResult> query);
|
TResult SendQuery<TResult>(Query.IQuery<TResult> query);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送查询的同步版本(不推荐,仅用于兼容性)
|
/// 发送一个 CQRS 查询并返回结果。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
/// <typeparam name="TResponse">查询响应类型。</typeparam>
|
||||||
/// <param name="query">要发送的查询对象</param>
|
/// <param name="query">要发送的 CQRS 查询。</param>
|
||||||
/// <returns>查询结果</returns>
|
/// <returns>查询结果。</returns>
|
||||||
TResponse SendQuery<TResponse>(Mediator.IQuery<TResponse> query);
|
TResponse SendQuery<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 异步发送一个查询请求
|
/// 异步发送一个查询请求
|
||||||
@ -177,14 +178,13 @@ public interface IArchitectureContext
|
|||||||
Task<TResult> SendQueryAsync<TResult>(IAsyncQuery<TResult> query);
|
Task<TResult> SendQueryAsync<TResult>(IAsyncQuery<TResult> query);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 异步发送查询并返回结果
|
/// 异步发送一个 CQRS 查询并返回结果。
|
||||||
/// 通过Mediator模式发送查询请求,支持取消操作
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
/// <typeparam name="TResponse">查询响应类型。</typeparam>
|
||||||
/// <param name="query">要发送的查询对象</param>
|
/// <param name="query">要发送的 CQRS 查询。</param>
|
||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
/// <returns>包含查询结果的ValueTask</returns>
|
/// <returns>包含查询结果的值任务。</returns>
|
||||||
ValueTask<TResponse> SendQueryAsync<TResponse>(Mediator.IQuery<TResponse> query,
|
ValueTask<TResponse> SendQueryAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query,
|
||||||
CancellationToken cancellationToken = default);
|
CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -265,4 +265,4 @@ public interface IArchitectureContext
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns>环境对象实例</returns>
|
/// <returns>环境对象实例</returns>
|
||||||
IEnvironment GetEnvironment();
|
IEnvironment GetEnvironment();
|
||||||
}
|
}
|
||||||
|
|||||||
25
GFramework.Core.Abstractions/Cqrs/Command/ICommand.cs
Normal file
25
GFramework.Core.Abstractions/Cqrs/Command/ICommand.cs
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个 CQRS 命令。
|
||||||
|
/// 命令通常用于修改系统状态。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">命令响应类型。</typeparam>
|
||||||
|
public interface ICommand<out TResponse> : IRequest<TResponse>
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个无显式返回值的 CQRS 命令。
|
||||||
|
/// </summary>
|
||||||
|
public interface ICommand : ICommand<Unit>
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个流式 CQRS 命令。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
|
||||||
|
public interface IStreamCommand<out TResponse> : IStreamRequest<TResponse>
|
||||||
|
{
|
||||||
|
}
|
||||||
9
GFramework.Core.Abstractions/Cqrs/INotification.cs
Normal file
9
GFramework.Core.Abstractions/Cqrs/INotification.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个一对多发布的通知消息。
|
||||||
|
/// 通知不要求返回值,允许被零个或多个处理器消费。
|
||||||
|
/// </summary>
|
||||||
|
public interface INotification
|
||||||
|
{
|
||||||
|
}
|
||||||
17
GFramework.Core.Abstractions/Cqrs/INotificationHandler.cs
Normal file
17
GFramework.Core.Abstractions/Cqrs/INotificationHandler.cs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示处理通知消息的处理器契约。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TNotification">通知类型。</typeparam>
|
||||||
|
public interface INotificationHandler<in TNotification>
|
||||||
|
where TNotification : INotification
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 处理通知消息。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="notification">要处理的通知。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
/// <returns>异步处理任务。</returns>
|
||||||
|
ValueTask Handle(TNotification notification, CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
22
GFramework.Core.Abstractions/Cqrs/IPipelineBehavior.cs
Normal file
22
GFramework.Core.Abstractions/Cqrs/IPipelineBehavior.cs
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 定义 CQRS 请求处理前后的管道行为。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TRequest">请求类型。</typeparam>
|
||||||
|
/// <typeparam name="TResponse">响应类型。</typeparam>
|
||||||
|
public interface IPipelineBehavior<TRequest, TResponse>
|
||||||
|
where TRequest : IRequest<TResponse>
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 处理当前请求,并决定是否继续调用后续行为或最终处理器。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message">当前请求消息。</param>
|
||||||
|
/// <param name="next">下一个处理委托。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
/// <returns>请求响应。</returns>
|
||||||
|
ValueTask<TResponse> Handle(
|
||||||
|
TRequest message,
|
||||||
|
MessageHandlerDelegate<TRequest, TResponse> next,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
10
GFramework.Core.Abstractions/Cqrs/IRequest.cs
Normal file
10
GFramework.Core.Abstractions/Cqrs/IRequest.cs
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个有响应的 CQRS 请求。
|
||||||
|
/// 该接口是命令、查询以及其他请求语义的统一基接口。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">请求响应类型。</typeparam>
|
||||||
|
public interface IRequest<out TResponse>
|
||||||
|
{
|
||||||
|
}
|
||||||
18
GFramework.Core.Abstractions/Cqrs/IRequestHandler.cs
Normal file
18
GFramework.Core.Abstractions/Cqrs/IRequestHandler.cs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示处理单个 CQRS 请求的处理器契约。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TRequest">请求类型。</typeparam>
|
||||||
|
/// <typeparam name="TResponse">响应类型。</typeparam>
|
||||||
|
public interface IRequestHandler<in TRequest, TResponse>
|
||||||
|
where TRequest : IRequest<TResponse>
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 处理指定请求并返回结果。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="request">要处理的请求。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
/// <returns>请求结果。</returns>
|
||||||
|
ValueTask<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
10
GFramework.Core.Abstractions/Cqrs/IStreamRequest.cs
Normal file
10
GFramework.Core.Abstractions/Cqrs/IStreamRequest.cs
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个流式 CQRS 请求。
|
||||||
|
/// 请求处理器可以逐步产生响应序列,而不是一次性返回完整结果。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
|
||||||
|
public interface IStreamRequest<out TResponse>
|
||||||
|
{
|
||||||
|
}
|
||||||
18
GFramework.Core.Abstractions/Cqrs/IStreamRequestHandler.cs
Normal file
18
GFramework.Core.Abstractions/Cqrs/IStreamRequestHandler.cs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示处理流式 CQRS 请求的处理器契约。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TRequest">流式请求类型。</typeparam>
|
||||||
|
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
|
||||||
|
public interface IStreamRequestHandler<in TRequest, out TResponse>
|
||||||
|
where TRequest : IStreamRequest<TResponse>
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 处理流式请求并返回异步响应序列。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="request">要处理的请求。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
/// <returns>异步响应序列。</returns>
|
||||||
|
IAsyncEnumerable<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
14
GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs
Normal file
14
GFramework.Core.Abstractions/Cqrs/MessageHandlerDelegate.cs
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示 CQRS 请求在管道中继续向下执行的处理委托。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TRequest">请求类型。</typeparam>
|
||||||
|
/// <typeparam name="TResponse">响应类型。</typeparam>
|
||||||
|
/// <param name="message">当前请求消息。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
/// <returns>请求响应。</returns>
|
||||||
|
public delegate ValueTask<TResponse> MessageHandlerDelegate<in TRequest, TResponse>(
|
||||||
|
TRequest message,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TRequest : IRequest<TResponse>;
|
||||||
18
GFramework.Core.Abstractions/Cqrs/Query/IQuery.cs
Normal file
18
GFramework.Core.Abstractions/Cqrs/Query/IQuery.cs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个 CQRS 查询。
|
||||||
|
/// 查询用于读取数据,不应产生副作用。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">查询响应类型。</typeparam>
|
||||||
|
public interface IQuery<out TResponse> : IRequest<TResponse>
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示一个流式 CQRS 查询。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">流式响应元素类型。</typeparam>
|
||||||
|
public interface IStreamQuery<out TResponse> : IStreamRequest<TResponse>
|
||||||
|
{
|
||||||
|
}
|
||||||
13
GFramework.Core.Abstractions/Cqrs/Unit.cs
Normal file
13
GFramework.Core.Abstractions/Cqrs/Unit.cs
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
namespace GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 表示没有实际返回值的 CQRS 响应类型。
|
||||||
|
/// 该类型用于统一命令与请求的泛型签名,避免引入外部库的 <c>Unit</c> 定义。
|
||||||
|
/// </summary>
|
||||||
|
public readonly record struct Unit
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 获取默认的空响应实例。
|
||||||
|
/// </summary>
|
||||||
|
public static Unit Value { get; } = new();
|
||||||
|
}
|
||||||
@ -2,14 +2,14 @@ using GFramework.Core.Abstractions.Architectures;
|
|||||||
using GFramework.Core.Abstractions.Utility;
|
using GFramework.Core.Abstractions.Utility;
|
||||||
using GFramework.Core.Architectures;
|
using GFramework.Core.Architectures;
|
||||||
using GFramework.Core.Logging;
|
using GFramework.Core.Logging;
|
||||||
using Mediator;
|
using GFramework.Core.Tests;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using GfCqrs = GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
namespace GFramework.Core.Tests.Architectures;
|
namespace GFramework.Core.Tests.Architectures;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 验证 Architecture 通过 <c>ArchitectureModules</c> 暴露出的模块安装与 Mediator 行为注册能力。
|
/// 验证 Architecture 通过 <c>ArchitectureModules</c> 暴露出的模块安装与 CQRS 行为注册能力。
|
||||||
/// 这些测试覆盖模块安装回调和中介管道行为接入,确保模块管理器仍然保持可观察行为不变。
|
/// 这些测试覆盖模块安装回调和请求管道行为接入,确保模块管理器仍然保持可观察行为不变。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[TestFixture]
|
[TestFixture]
|
||||||
public class ArchitectureModulesBehaviorTests
|
public class ArchitectureModulesBehaviorTests
|
||||||
@ -57,7 +57,7 @@ public class ArchitectureModulesBehaviorTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 验证注册的 Mediator 行为会参与请求管道执行。
|
/// 验证注册的 CQRS 行为会参与请求管道执行。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Test]
|
[Test]
|
||||||
public async Task RegisterMediatorBehavior_Should_Apply_Pipeline_Behavior_To_Request()
|
public async Task RegisterMediatorBehavior_Should_Apply_Pipeline_Behavior_To_Request()
|
||||||
@ -67,7 +67,9 @@ public class ArchitectureModulesBehaviorTests
|
|||||||
|
|
||||||
await architecture.InitializeAsync();
|
await architecture.InitializeAsync();
|
||||||
|
|
||||||
var response = await architecture.Context.SendRequestAsync(new ModuleBehaviorRequest());
|
var response = await CqrsTestRuntime.ExecutePipelineAsync<ModuleBehaviorRequest, string>(
|
||||||
|
architecture.Context,
|
||||||
|
new ModuleBehaviorRequest());
|
||||||
|
|
||||||
Assert.Multiple(() =>
|
Assert.Multiple(() =>
|
||||||
{
|
{
|
||||||
@ -83,12 +85,6 @@ public class ArchitectureModulesBehaviorTests
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private sealed class ModuleTestArchitecture(Action<ModuleTestArchitecture> registrationAction) : Architecture
|
private sealed class ModuleTestArchitecture(Action<ModuleTestArchitecture> registrationAction) : Architecture
|
||||||
{
|
{
|
||||||
/// <summary>
|
|
||||||
/// 打开 Mediator 服务注册,以便测试中介行为接入。
|
|
||||||
/// </summary>
|
|
||||||
public override Action<IServiceCollection>? Configurator =>
|
|
||||||
services => services.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; });
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 在初始化阶段执行测试注入的模块注册逻辑。
|
/// 在初始化阶段执行测试注入的模块注册逻辑。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@ -136,14 +132,14 @@ public class ArchitectureModulesBehaviorTests
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// 用于验证管道行为注册是否生效的测试请求。
|
/// 用于验证管道行为注册是否生效的测试请求。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class ModuleBehaviorRequest : IRequest<string>
|
public sealed class ModuleBehaviorRequest : GfCqrs.IRequest<string>
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 处理测试请求的处理器。
|
/// 处理测试请求的处理器。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class ModuleBehaviorRequestHandler : IRequestHandler<ModuleBehaviorRequest, string>
|
public sealed class ModuleBehaviorRequestHandler : GfCqrs.IRequestHandler<ModuleBehaviorRequest, string>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 返回固定结果,便于聚焦验证管道行为是否执行。
|
/// 返回固定结果,便于聚焦验证管道行为是否执行。
|
||||||
@ -162,8 +158,8 @@ public sealed class ModuleBehaviorRequestHandler : IRequestHandler<ModuleBehavio
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TRequest">请求类型。</typeparam>
|
/// <typeparam name="TRequest">请求类型。</typeparam>
|
||||||
/// <typeparam name="TResponse">响应类型。</typeparam>
|
/// <typeparam name="TResponse">响应类型。</typeparam>
|
||||||
public sealed class TrackingPipelineBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
|
public sealed class TrackingPipelineBehavior<TRequest, TResponse> : GfCqrs.IPipelineBehavior<TRequest, TResponse>
|
||||||
where TRequest : IRequest<TResponse>
|
where TRequest : GfCqrs.IRequest<TResponse>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取当前测试进程中该请求类型对应的行为触发次数。
|
/// 获取当前测试进程中该请求类型对应的行为触发次数。
|
||||||
@ -179,10 +175,10 @@ public sealed class TrackingPipelineBehavior<TRequest, TResponse> : IPipelineBeh
|
|||||||
/// <returns>下游处理器的响应结果。</returns>
|
/// <returns>下游处理器的响应结果。</returns>
|
||||||
public async ValueTask<TResponse> Handle(
|
public async ValueTask<TResponse> Handle(
|
||||||
TRequest message,
|
TRequest message,
|
||||||
MessageHandlerDelegate<TRequest, TResponse> next,
|
GfCqrs.MessageHandlerDelegate<TRequest, TResponse> next,
|
||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
InvocationCount++;
|
InvocationCount++;
|
||||||
return await next(message, cancellationToken);
|
return await next(message, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -347,58 +347,61 @@ public class TestArchitectureContextV3 : IArchitectureContext
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendRequestAsync<TResponse>(IRequest<TResponse> request,
|
public ValueTask<TResponse> SendRequestAsync<TResponse>(global::GFramework.Core.Abstractions.Cqrs.IRequest<TResponse> request,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TResponse SendRequest<TResponse>(IRequest<TResponse> request)
|
public TResponse SendRequest<TResponse>(global::GFramework.Core.Abstractions.Cqrs.IRequest<TResponse> request)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendCommandAsync<TResponse>(global::Mediator.ICommand<TResponse> command,
|
public ValueTask<TResponse> SendCommandAsync<TResponse>(
|
||||||
|
global::GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TResponse SendCommand<TResponse>(global::Mediator.ICommand<TResponse> command)
|
public TResponse SendCommand<TResponse>(global::GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendQueryAsync<TResponse>(global::Mediator.IQuery<TResponse> query,
|
public ValueTask<TResponse> SendQueryAsync<TResponse>(
|
||||||
|
global::GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TResponse SendQuery<TResponse>(global::Mediator.IQuery<TResponse> query)
|
public TResponse SendQuery<TResponse>(global::GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask PublishAsync<TNotification>(TNotification notification,
|
public ValueTask PublishAsync<TNotification>(TNotification notification,
|
||||||
CancellationToken cancellationToken = default) where TNotification : INotification
|
CancellationToken cancellationToken = default) where TNotification : global::GFramework.Core.Abstractions.Cqrs.INotification
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public IAsyncEnumerable<TResponse> CreateStream<TResponse>(IStreamRequest<TResponse> request,
|
public IAsyncEnumerable<TResponse> CreateStream<TResponse>(
|
||||||
|
global::GFramework.Core.Abstractions.Cqrs.IStreamRequest<TResponse> request,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask SendAsync<TCommand>(TCommand command, CancellationToken cancellationToken = default)
|
public ValueTask SendAsync<TCommand>(TCommand command, CancellationToken cancellationToken = default)
|
||||||
where TCommand : IRequest<Unit>
|
where TCommand : global::GFramework.Core.Abstractions.Cqrs.IRequest<global::GFramework.Core.Abstractions.Cqrs.Unit>
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendAsync<TResponse>(IRequest<TResponse> command,
|
public ValueTask<TResponse> SendAsync<TResponse>(global::GFramework.Core.Abstractions.Cqrs.IRequest<TResponse> command,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
@ -439,4 +442,4 @@ public class TestArchitectureContextV3 : IArchitectureContext
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|||||||
@ -394,58 +394,61 @@ public class TestArchitectureContext : IArchitectureContext
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendRequestAsync<TResponse>(IRequest<TResponse> request,
|
public ValueTask<TResponse> SendRequestAsync<TResponse>(global::GFramework.Core.Abstractions.Cqrs.IRequest<TResponse> request,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TResponse SendRequest<TResponse>(IRequest<TResponse> request)
|
public TResponse SendRequest<TResponse>(global::GFramework.Core.Abstractions.Cqrs.IRequest<TResponse> request)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendCommandAsync<TResponse>(global::Mediator.ICommand<TResponse> command,
|
public ValueTask<TResponse> SendCommandAsync<TResponse>(
|
||||||
|
global::GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TResponse SendCommand<TResponse>(global::Mediator.ICommand<TResponse> command)
|
public TResponse SendCommand<TResponse>(global::GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendQueryAsync<TResponse>(global::Mediator.IQuery<TResponse> query,
|
public ValueTask<TResponse> SendQueryAsync<TResponse>(
|
||||||
|
global::GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public TResponse SendQuery<TResponse>(global::Mediator.IQuery<TResponse> query)
|
public TResponse SendQuery<TResponse>(global::GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask PublishAsync<TNotification>(TNotification notification,
|
public ValueTask PublishAsync<TNotification>(TNotification notification,
|
||||||
CancellationToken cancellationToken = default) where TNotification : INotification
|
CancellationToken cancellationToken = default) where TNotification : global::GFramework.Core.Abstractions.Cqrs.INotification
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public IAsyncEnumerable<TResponse> CreateStream<TResponse>(IStreamRequest<TResponse> request,
|
public IAsyncEnumerable<TResponse> CreateStream<TResponse>(
|
||||||
|
global::GFramework.Core.Abstractions.Cqrs.IStreamRequest<TResponse> request,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask SendAsync<TCommand>(TCommand command, CancellationToken cancellationToken = default)
|
public ValueTask SendAsync<TCommand>(TCommand command, CancellationToken cancellationToken = default)
|
||||||
where TCommand : IRequest<Unit>
|
where TCommand : global::GFramework.Core.Abstractions.Cqrs.IRequest<global::GFramework.Core.Abstractions.Cqrs.Unit>
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ValueTask<TResponse> SendAsync<TResponse>(IRequest<TResponse> command,
|
public ValueTask<TResponse> SendAsync<TResponse>(global::GFramework.Core.Abstractions.Cqrs.IRequest<TResponse> command,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
@ -510,4 +513,4 @@ public class TestArchitectureContext : IArchitectureContext
|
|||||||
{
|
{
|
||||||
return Environment;
|
return Environment;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
73
GFramework.Core.Tests/CqrsTestRuntime.cs
Normal file
73
GFramework.Core.Tests/CqrsTestRuntime.cs
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
using System.Reflection;
|
||||||
|
using GFramework.Core.Abstractions.Architectures;
|
||||||
|
using GFramework.Core.Abstractions.Logging;
|
||||||
|
using GFramework.Core.Abstractions.Rule;
|
||||||
|
using GFramework.Core.Architectures;
|
||||||
|
using GFramework.Core.Ioc;
|
||||||
|
using GFramework.Core.Logging;
|
||||||
|
using GfCqrs = GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
|
namespace GFramework.Core.Tests;
|
||||||
|
|
||||||
|
internal static class CqrsTestRuntime
|
||||||
|
{
|
||||||
|
private static readonly MethodInfo RegisterHandlersMethod = typeof(ArchitectureContext).Assembly
|
||||||
|
.GetType("GFramework.Core.Cqrs.Internal.CqrsHandlerRegistrar", throwOnError: true)!
|
||||||
|
.GetMethod(
|
||||||
|
"RegisterHandlers",
|
||||||
|
BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Static)!
|
||||||
|
?? throw new InvalidOperationException("Failed to locate CqrsHandlerRegistrar.RegisterHandlers.");
|
||||||
|
|
||||||
|
public static void RegisterHandlers(MicrosoftDiContainer container, params Assembly[] assemblies)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(container);
|
||||||
|
ArgumentNullException.ThrowIfNull(assemblies);
|
||||||
|
|
||||||
|
var logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CqrsTestRuntime));
|
||||||
|
RegisterHandlersMethod.Invoke(
|
||||||
|
null,
|
||||||
|
[container, assemblies.Where(static assembly => assembly is not null).Distinct().ToArray(), logger]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ValueTask<TResponse> ExecutePipelineAsync<TRequest, TResponse>(
|
||||||
|
IArchitectureContext context,
|
||||||
|
TRequest request,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
where TRequest : class, GfCqrs.IRequest<TResponse>
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(context);
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
|
||||||
|
var handlers = context.GetServices<GfCqrs.IRequestHandler<TRequest, TResponse>>();
|
||||||
|
if (handlers.Count == 0)
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"No CQRS request handler registered for {typeof(TRequest).FullName}.");
|
||||||
|
|
||||||
|
if (handlers.Count > 1)
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Expected a single CQRS request handler for {typeof(TRequest).FullName}, but found {handlers.Count}.");
|
||||||
|
|
||||||
|
var handler = handlers[0];
|
||||||
|
PrepareContext(handler, context);
|
||||||
|
|
||||||
|
GfCqrs.MessageHandlerDelegate<TRequest, TResponse> pipeline = handler.Handle;
|
||||||
|
|
||||||
|
var behaviors = context.GetServices<GfCqrs.IPipelineBehavior<TRequest, TResponse>>();
|
||||||
|
for (var index = behaviors.Count - 1; index >= 0; index--)
|
||||||
|
{
|
||||||
|
var behavior = behaviors[index];
|
||||||
|
PrepareContext(behavior, context);
|
||||||
|
|
||||||
|
var next = pipeline;
|
||||||
|
pipeline = (message, token) => behavior.Handle(message, next, token);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pipeline(request, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void PrepareContext(object instance, IArchitectureContext context)
|
||||||
|
{
|
||||||
|
if (instance is IContextAware contextAware)
|
||||||
|
contextAware.SetContext(context);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,10 +1,10 @@
|
|||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
using GFramework.Core.Architectures;
|
using GFramework.Core.Architectures;
|
||||||
using GFramework.Core.Ioc;
|
using GFramework.Core.Ioc;
|
||||||
using GFramework.Core.Logging;
|
using GFramework.Core.Logging;
|
||||||
using Mediator;
|
using GFramework.Core.Tests;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
|
|
||||||
namespace GFramework.Core.Tests.Mediator;
|
namespace GFramework.Core.Tests.Mediator;
|
||||||
|
|
||||||
@ -26,11 +26,10 @@ public class MediatorAdvancedFeaturesTests
|
|||||||
loggerField?.SetValue(_container,
|
loggerField?.SetValue(_container,
|
||||||
LoggerFactoryResolver.Provider.CreateLogger(nameof(MediatorAdvancedFeaturesTests)));
|
LoggerFactoryResolver.Provider.CreateLogger(nameof(MediatorAdvancedFeaturesTests)));
|
||||||
|
|
||||||
// 注册Mediator及相关处理器
|
CqrsTestRuntime.RegisterHandlers(
|
||||||
_container.ExecuteServicesHook(configurator =>
|
_container,
|
||||||
{
|
typeof(MediatorAdvancedFeaturesTests).Assembly,
|
||||||
configurator.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; });
|
typeof(ArchitectureContext).Assembly);
|
||||||
});
|
|
||||||
|
|
||||||
_container.Freeze();
|
_container.Freeze();
|
||||||
_context = new ArchitectureContext(_container);
|
_context = new ArchitectureContext(_container);
|
||||||
@ -487,4 +486,4 @@ public sealed record TestDatabaseRequest : IRequest<string>
|
|||||||
public List<string> Storage { get; init; } = new();
|
public List<string> Storage { get; init; } = new();
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using GFramework.Core.Abstractions.Architectures;
|
using GFramework.Core.Abstractions.Architectures;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
using GFramework.Core.Architectures;
|
using GFramework.Core.Architectures;
|
||||||
using GFramework.Core.Command;
|
using GFramework.Core.Command;
|
||||||
using GFramework.Core.Ioc;
|
using GFramework.Core.Ioc;
|
||||||
using GFramework.Core.Logging;
|
using GFramework.Core.Logging;
|
||||||
using Mediator;
|
using GFramework.Core.Tests;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
||||||
|
|
||||||
namespace GFramework.Core.Tests.Mediator;
|
namespace GFramework.Core.Tests.Mediator;
|
||||||
@ -33,11 +33,10 @@ public class MediatorArchitectureIntegrationTests
|
|||||||
_commandBus = new CommandExecutor();
|
_commandBus = new CommandExecutor();
|
||||||
_container.RegisterPlurality(_commandBus);
|
_container.RegisterPlurality(_commandBus);
|
||||||
|
|
||||||
// 注册Mediator
|
CqrsTestRuntime.RegisterHandlers(
|
||||||
_container.ExecuteServicesHook(configurator =>
|
_container,
|
||||||
{
|
typeof(MediatorArchitectureIntegrationTests).Assembly,
|
||||||
configurator.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; });
|
typeof(ArchitectureContext).Assembly);
|
||||||
});
|
|
||||||
|
|
||||||
_container.Freeze();
|
_container.Freeze();
|
||||||
_context = new ArchitectureContext(_container);
|
_context = new ArchitectureContext(_container);
|
||||||
@ -559,4 +558,4 @@ public class TestTraditionalCommand : ICommand
|
|||||||
public IArchitectureContext GetContext() => null!;
|
public IArchitectureContext GetContext() => null!;
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|||||||
@ -2,6 +2,7 @@ using System.Diagnostics;
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using System.Runtime.CompilerServices;
|
using System.Runtime.CompilerServices;
|
||||||
using GFramework.Core.Abstractions.Architectures;
|
using GFramework.Core.Abstractions.Architectures;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
using GFramework.Core.Abstractions.Events;
|
using GFramework.Core.Abstractions.Events;
|
||||||
using GFramework.Core.Architectures;
|
using GFramework.Core.Architectures;
|
||||||
using GFramework.Core.Command;
|
using GFramework.Core.Command;
|
||||||
@ -10,13 +11,9 @@ using GFramework.Core.Events;
|
|||||||
using GFramework.Core.Ioc;
|
using GFramework.Core.Ioc;
|
||||||
using GFramework.Core.Logging;
|
using GFramework.Core.Logging;
|
||||||
using GFramework.Core.Query;
|
using GFramework.Core.Query;
|
||||||
using Mediator;
|
using GFramework.Core.Tests;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
||||||
|
using Unit = GFramework.Core.Abstractions.Cqrs.Unit;
|
||||||
// ✅ Mediator 库的命名空间
|
|
||||||
|
|
||||||
// ✅ 使用 global using 或别名来区分
|
|
||||||
|
|
||||||
namespace GFramework.Core.Tests.Mediator;
|
namespace GFramework.Core.Tests.Mediator;
|
||||||
|
|
||||||
@ -25,7 +22,7 @@ public class MediatorComprehensiveTests
|
|||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 测试初始化方法,在每个测试方法执行前运行。
|
/// 测试初始化方法,在每个测试方法执行前运行。
|
||||||
/// 负责初始化日志工厂、依赖注入容器、Mediator以及各种总线服务。
|
/// 负责初始化日志工厂、依赖注入容器、自有 CQRS 处理器以及各种总线服务。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[SetUp]
|
[SetUp]
|
||||||
public void SetUp()
|
public void SetUp()
|
||||||
@ -51,13 +48,11 @@ public class MediatorComprehensiveTests
|
|||||||
_container.RegisterPlurality(_asyncQueryBus);
|
_container.RegisterPlurality(_asyncQueryBus);
|
||||||
_container.RegisterPlurality(_environment);
|
_container.RegisterPlurality(_environment);
|
||||||
|
|
||||||
// ✅ 注册 Mediator
|
CqrsTestRuntime.RegisterHandlers(
|
||||||
_container.ExecuteServicesHook(configurator =>
|
_container,
|
||||||
{
|
typeof(MediatorComprehensiveTests).Assembly,
|
||||||
configurator.AddMediator(options => { options.ServiceLifetime = ServiceLifetime.Singleton; });
|
typeof(ArchitectureContext).Assembly);
|
||||||
});
|
|
||||||
|
|
||||||
// ✅ Freeze 容器
|
|
||||||
_container.Freeze();
|
_container.Freeze();
|
||||||
|
|
||||||
_context = new ArchitectureContext(_container);
|
_context = new ArchitectureContext(_container);
|
||||||
@ -194,19 +189,19 @@ public class MediatorComprehensiveTests
|
|||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 测试未注册的Mediator抛出InvalidOperationException
|
/// 测试未注册的 CQRS handler 时抛出 InvalidOperationException
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Test]
|
[Test]
|
||||||
public void Unregistered_Mediator_Should_Throw_InvalidOperationException()
|
public void Unregistered_Cqrs_Handler_Should_Throw_InvalidOperationException()
|
||||||
{
|
{
|
||||||
var containerWithoutMediator = new MicrosoftDiContainer();
|
var containerWithoutHandlers = new MicrosoftDiContainer();
|
||||||
containerWithoutMediator.Freeze();
|
containerWithoutHandlers.Freeze();
|
||||||
|
|
||||||
var contextWithoutMediator = new ArchitectureContext(containerWithoutMediator);
|
var contextWithoutHandlers = new ArchitectureContext(containerWithoutHandlers);
|
||||||
var testRequest = new TestRequest { Value = 42 };
|
var testRequest = new TestRequest { Value = 42 };
|
||||||
|
|
||||||
Assert.ThrowsAsync<InvalidOperationException>(async () =>
|
Assert.ThrowsAsync<InvalidOperationException>(async () =>
|
||||||
await contextWithoutMediator.SendRequestAsync(testRequest));
|
await contextWithoutHandlers.SendRequestAsync(testRequest));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -270,10 +265,10 @@ public class MediatorComprehensiveTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 测试并发Mediator请求不会相互干扰
|
/// 测试并发 CQRS 请求不会相互干扰
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Test]
|
[Test]
|
||||||
public async Task Concurrent_Mediator_Requests_Should_Not_Interfere()
|
public async Task Concurrent_Cqrs_Requests_Should_Not_Interfere()
|
||||||
{
|
{
|
||||||
const int requestCount = 10;
|
const int requestCount = 10;
|
||||||
var tasks = new List<Task<int>>();
|
var tasks = new List<Task<int>>();
|
||||||
@ -389,10 +384,10 @@ public class MediatorComprehensiveTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 测试Mediator性能基准
|
/// 测试 CQRS 性能基准
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Test]
|
[Test]
|
||||||
public async Task Performance_Benchmark_For_Mediator()
|
public async Task Performance_Benchmark_For_Cqrs()
|
||||||
{
|
{
|
||||||
const int iterations = 1000;
|
const int iterations = 1000;
|
||||||
var stopwatch = Stopwatch.StartNew();
|
var stopwatch = Stopwatch.StartNew();
|
||||||
@ -413,10 +408,10 @@ public class MediatorComprehensiveTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 测试Mediator和传统CQRS可以共存
|
/// 测试自有 CQRS 和传统 CQRS 可以共存
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Test]
|
[Test]
|
||||||
public async Task Mediator_And_Legacy_CQRS_Can_Coexist()
|
public async Task Cqrs_And_Legacy_CQRS_Can_Coexist()
|
||||||
{
|
{
|
||||||
// 使用传统方式
|
// 使用传统方式
|
||||||
var legacyCommand = new TestLegacyCommand();
|
var legacyCommand = new TestLegacyCommand();
|
||||||
@ -726,4 +721,4 @@ public sealed class TestStreamRequestHandler : IStreamRequestHandler<TestStreamR
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
using GFramework.Core.Abstractions.Architectures;
|
using GFramework.Core.Abstractions.Architectures;
|
||||||
using GFramework.Core.Abstractions.Environment;
|
using GFramework.Core.Abstractions.Environment;
|
||||||
using GFramework.Core.Abstractions.Logging;
|
using GFramework.Core.Abstractions.Logging;
|
||||||
|
using GFramework.Core.Cqrs.Internal;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
namespace GFramework.Core.Architectures;
|
namespace GFramework.Core.Architectures;
|
||||||
@ -92,16 +93,20 @@ internal sealed class ArchitectureBootstrapper(
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 为服务容器设置上下文并执行扩展配置钩子。
|
/// 为服务容器设置上下文并执行扩展配置钩子。
|
||||||
/// 这一步统一承接 Mediator 等容器扩展的接入点,避免 <see cref="Architecture" /> 直接操作容器细节。
|
/// 这一步统一承接 CQRS 运行时与容器扩展的接入点,避免 <see cref="Architecture" /> 直接操作容器细节。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="context">当前架构上下文。</param>
|
/// <param name="context">当前架构上下文。</param>
|
||||||
/// <param name="configurator">可选的服务集合配置委托。</param>
|
/// <param name="configurator">可选的服务集合配置委托。</param>
|
||||||
private void ConfigureServices(IArchitectureContext context, Action<IServiceCollection>? configurator)
|
private void ConfigureServices(IArchitectureContext context, Action<IServiceCollection>? configurator)
|
||||||
{
|
{
|
||||||
services.SetContext(context);
|
services.SetContext(context);
|
||||||
|
CqrsHandlerRegistrar.RegisterHandlers(
|
||||||
|
services.Container,
|
||||||
|
[architectureType.Assembly, typeof(ArchitectureContext).Assembly],
|
||||||
|
logger);
|
||||||
|
|
||||||
if (configurator is null)
|
if (configurator is null)
|
||||||
logger.Debug("Mediator-based cqrs will not take effect without the service setter configured!");
|
logger.Debug("No external service configurator provided. Using built-in CQRS runtime registration only.");
|
||||||
|
|
||||||
services.Container.ExecuteServicesHook(configurator);
|
services.Container.ExecuteServicesHook(configurator);
|
||||||
}
|
}
|
||||||
@ -115,4 +120,4 @@ internal sealed class ArchitectureBootstrapper(
|
|||||||
{
|
{
|
||||||
await services.ModuleManager.InitializeAllAsync(asyncMode);
|
await services.ModuleManager.InitializeAllAsync(asyncMode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,14 +1,19 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using GFramework.Core.Abstractions.Architectures;
|
using GFramework.Core.Abstractions.Architectures;
|
||||||
using GFramework.Core.Abstractions.Command;
|
using GFramework.Core.Abstractions.Command;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
using GFramework.Core.Abstractions.Environment;
|
using GFramework.Core.Abstractions.Environment;
|
||||||
using GFramework.Core.Abstractions.Events;
|
using GFramework.Core.Abstractions.Events;
|
||||||
using GFramework.Core.Abstractions.Ioc;
|
using GFramework.Core.Abstractions.Ioc;
|
||||||
|
using GFramework.Core.Abstractions.Logging;
|
||||||
using GFramework.Core.Abstractions.Model;
|
using GFramework.Core.Abstractions.Model;
|
||||||
using GFramework.Core.Abstractions.Query;
|
using GFramework.Core.Abstractions.Query;
|
||||||
using GFramework.Core.Abstractions.Systems;
|
using GFramework.Core.Abstractions.Systems;
|
||||||
using GFramework.Core.Abstractions.Utility;
|
using GFramework.Core.Abstractions.Utility;
|
||||||
using Mediator;
|
using GFramework.Core.Cqrs.Internal;
|
||||||
|
using GFramework.Core.Logging;
|
||||||
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
|
||||||
|
|
||||||
namespace GFramework.Core.Architectures;
|
namespace GFramework.Core.Architectures;
|
||||||
@ -20,23 +25,15 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
{
|
{
|
||||||
private readonly IIocContainer _container = container ?? throw new ArgumentNullException(nameof(container));
|
private readonly IIocContainer _container = container ?? throw new ArgumentNullException(nameof(container));
|
||||||
private readonly ConcurrentDictionary<Type, object> _serviceCache = new();
|
private readonly ConcurrentDictionary<Type, object> _serviceCache = new();
|
||||||
|
private readonly ILogger _logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(ArchitectureContext));
|
||||||
|
private CqrsDispatcher? _cqrsDispatcher;
|
||||||
|
|
||||||
#region Mediator Integration
|
#region CQRS Integration
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取 Mediator 实例(延迟加载)
|
/// 获取 CQRS 运行时分发器(延迟初始化)。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private IMediator Mediator => GetOrCache<IMediator>();
|
private CqrsDispatcher CqrsDispatcher => _cqrsDispatcher ??= new CqrsDispatcher(_container, this, _logger);
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取 ISender 实例(更轻量的发送器)
|
|
||||||
/// </summary>
|
|
||||||
private ISender Sender => GetOrCache<ISender>();
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 获取 IPublisher 实例(用于发布通知)
|
|
||||||
/// </summary>
|
|
||||||
private IPublisher Publisher => GetOrCache<IPublisher>();
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 获取指定类型的服务实例,如果缓存中存在则直接返回,否则从容器中获取并缓存
|
/// 获取指定类型的服务实例,如果缓存中存在则直接返回,否则从容器中获取并缓存
|
||||||
@ -64,30 +61,23 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送请求(Command/Query)
|
/// 发送请求(Command/Query)
|
||||||
/// 这是推荐的新方式,统一处理命令和查询
|
/// 使用 GFramework 自有 CQRS runtime 统一处理命令和查询。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">响应类型</typeparam>
|
/// <typeparam name="TResponse">响应类型</typeparam>
|
||||||
/// <param name="request">请求对象(Command 或 Query)</param>
|
/// <param name="request">请求对象(Command 或 Query)</param>
|
||||||
/// <param name="cancellationToken">取消令牌</param>
|
/// <param name="cancellationToken">取消令牌</param>
|
||||||
/// <returns>响应结果</returns>
|
/// <returns>响应结果</returns>
|
||||||
/// <exception cref="InvalidOperationException">当 Mediator 未注册时抛出</exception>
|
|
||||||
public async ValueTask<TResponse> SendRequestAsync<TResponse>(
|
public async ValueTask<TResponse> SendRequestAsync<TResponse>(
|
||||||
IRequest<TResponse> request,
|
IRequest<TResponse> request,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(request);
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
return await CqrsDispatcher.SendAsync(request, cancellationToken);
|
||||||
var mediator = Mediator;
|
|
||||||
if (mediator == null)
|
|
||||||
throw new InvalidOperationException(
|
|
||||||
"Mediator not registered. Call EnableMediator() in your Architecture.OnInitialize() method.");
|
|
||||||
|
|
||||||
return await mediator.Send(request, cancellationToken);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送请求的同步版本(不推荐,仅用于兼容性)
|
/// 发送请求的同步版本(不推荐,仅用于兼容性)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">响应类型</typeparam>
|
/// <typeparam name="TResponse">响应类型</typeparam>
|
||||||
/// <param name="request">请求对象</param>
|
/// <param name="request">请求对象</param>
|
||||||
@ -98,8 +88,8 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发布通知(一对多)
|
/// 发布通知(一对多)
|
||||||
/// 用于事件驱动场景,多个处理器可以同时处理同一个通知
|
/// 使用 GFramework 自有 CQRS runtime 分发到所有已注册通知处理器。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TNotification">通知类型</typeparam>
|
/// <typeparam name="TNotification">通知类型</typeparam>
|
||||||
/// <param name="notification">通知对象</param>
|
/// <param name="notification">通知对象</param>
|
||||||
@ -110,16 +100,11 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
where TNotification : INotification
|
where TNotification : INotification
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(notification);
|
ArgumentNullException.ThrowIfNull(notification);
|
||||||
|
await CqrsDispatcher.PublishAsync(notification, cancellationToken);
|
||||||
var publisher = Publisher;
|
|
||||||
if (publisher == null)
|
|
||||||
throw new InvalidOperationException("Publisher not registered.");
|
|
||||||
|
|
||||||
await publisher.Publish(notification, cancellationToken);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送请求并返回流(用于大数据集)
|
/// 发送请求并返回流(用于大数据集)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">响应项类型</typeparam>
|
/// <typeparam name="TResponse">响应项类型</typeparam>
|
||||||
/// <param name="request">流式请求</param>
|
/// <param name="request">流式请求</param>
|
||||||
@ -130,12 +115,7 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(request);
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
return CqrsDispatcher.CreateStream(request, cancellationToken);
|
||||||
var mediator = Mediator;
|
|
||||||
if (mediator == null)
|
|
||||||
throw new InvalidOperationException("Mediator not registered.");
|
|
||||||
|
|
||||||
return mediator.CreateStream(request, cancellationToken);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -180,12 +160,12 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送查询的同步版本(不推荐,仅用于兼容性)
|
/// 发送 CQRS 查询的同步版本(不推荐,仅用于兼容性)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
||||||
/// <param name="query">要发送的查询对象</param>
|
/// <param name="query">要发送的查询对象</param>
|
||||||
/// <returns>查询结果</returns>
|
/// <returns>查询结果</returns>
|
||||||
public TResponse SendQuery<TResponse>(Mediator.IQuery<TResponse> query)
|
public TResponse SendQuery<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query)
|
||||||
{
|
{
|
||||||
return SendQueryAsync(query).AsTask().GetAwaiter().GetResult();
|
return SendQueryAsync(query).AsTask().GetAwaiter().GetResult();
|
||||||
}
|
}
|
||||||
@ -205,23 +185,17 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 异步发送查询并返回结果
|
/// 异步发送 CQRS 查询并返回结果。
|
||||||
/// 通过Mediator模式发送查询请求,支持取消操作
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
||||||
/// <param name="query">要发送的查询对象</param>
|
/// <param name="query">要发送的查询对象</param>
|
||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
||||||
/// <returns>包含查询结果的ValueTask</returns>
|
/// <returns>包含查询结果的ValueTask</returns>
|
||||||
public async ValueTask<TResponse> SendQueryAsync<TResponse>(Mediator.IQuery<TResponse> query,
|
public async ValueTask<TResponse> SendQueryAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Query.IQuery<TResponse> query,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(query);
|
ArgumentNullException.ThrowIfNull(query);
|
||||||
|
return await SendRequestAsync(query, cancellationToken);
|
||||||
var sender = Sender;
|
|
||||||
if (sender == null)
|
|
||||||
throw new InvalidOperationException("Sender not registered.");
|
|
||||||
|
|
||||||
return await sender.Send(query, cancellationToken);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
@ -347,23 +321,17 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
#region Command Execution
|
#region Command Execution
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 异步发送命令并返回结果
|
/// 异步发送 CQRS 命令并返回结果。
|
||||||
/// 通过Mediator模式发送命令请求,支持取消操作
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
||||||
/// <param name="command">要发送的命令对象</param>
|
/// <param name="command">要发送的命令对象</param>
|
||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
||||||
/// <returns>包含命令执行结果的ValueTask</returns>
|
/// <returns>包含命令执行结果的ValueTask</returns>
|
||||||
public async ValueTask<TResponse> SendCommandAsync<TResponse>(Mediator.ICommand<TResponse> command,
|
public async ValueTask<TResponse> SendCommandAsync<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(command);
|
ArgumentNullException.ThrowIfNull(command);
|
||||||
|
return await SendRequestAsync(command, cancellationToken);
|
||||||
var sender = Sender;
|
|
||||||
if (sender == null)
|
|
||||||
throw new InvalidOperationException("Sender not registered.");
|
|
||||||
|
|
||||||
return await sender.Send(command, cancellationToken);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -393,12 +361,12 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送命令的同步版本(不推荐,仅用于兼容性)
|
/// 发送 CQRS 命令的同步版本(不推荐,仅用于兼容性)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
||||||
/// <param name="command">要发送的命令对象</param>
|
/// <param name="command">要发送的命令对象</param>
|
||||||
/// <returns>命令执行结果</returns>
|
/// <returns>命令执行结果</returns>
|
||||||
public TResponse SendCommand<TResponse>(Mediator.ICommand<TResponse> command)
|
public TResponse SendCommand<TResponse>(GFramework.Core.Abstractions.Cqrs.Command.ICommand<TResponse> command)
|
||||||
{
|
{
|
||||||
return SendCommandAsync(command).AsTask().GetAwaiter().GetResult();
|
return SendCommandAsync(command).AsTask().GetAwaiter().GetResult();
|
||||||
}
|
}
|
||||||
@ -491,4 +459,4 @@ public class ArchitectureContext(IIocContainer container) : IArchitectureContext
|
|||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,13 +9,13 @@ namespace GFramework.Core.Command;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TInput">命令输入参数类型,必须实现 ICommandInput 接口</typeparam>
|
/// <typeparam name="TInput">命令输入参数类型,必须实现 ICommandInput 接口</typeparam>
|
||||||
/// <param name="input">命令执行所需的输入参数</param>
|
/// <param name="input">命令执行所需的输入参数</param>
|
||||||
public abstract class AbstractCommand<TInput>(TInput input) : ContextAwareBase, ICommand
|
public abstract class AbstractCommand<TInput>(TInput input) : ContextAwareBase, GFramework.Core.Abstractions.Command.ICommand
|
||||||
where TInput : ICommandInput
|
where TInput : ICommandInput
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 执行命令的入口方法,实现 ICommand 接口的 Execute 方法
|
/// 执行命令的入口方法,实现 ICommand 接口的 Execute 方法
|
||||||
/// </summary>
|
/// </summary>
|
||||||
void ICommand.Execute()
|
void GFramework.Core.Abstractions.Command.ICommand.Execute()
|
||||||
{
|
{
|
||||||
OnExecute(input);
|
OnExecute(input);
|
||||||
}
|
}
|
||||||
@ -25,4 +25,4 @@ public abstract class AbstractCommand<TInput>(TInput input) : ContextAwareBase,
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="input">命令执行所需的输入参数</param>
|
/// <param name="input">命令执行所需的输入参数</param>
|
||||||
protected abstract void OnExecute(TInput input);
|
protected abstract void OnExecute(TInput input);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,14 +10,14 @@ namespace GFramework.Core.Command;
|
|||||||
/// <typeparam name="TInput">命令输入参数类型,必须实现 ICommandInput 接口</typeparam>
|
/// <typeparam name="TInput">命令输入参数类型,必须实现 ICommandInput 接口</typeparam>
|
||||||
/// <typeparam name="TResult">命令执行后返回的结果类型</typeparam>
|
/// <typeparam name="TResult">命令执行后返回的结果类型</typeparam>
|
||||||
/// <param name="input">命令执行所需的输入参数</param>
|
/// <param name="input">命令执行所需的输入参数</param>
|
||||||
public abstract class AbstractCommand<TInput, TResult>(TInput input) : ContextAwareBase, ICommand<TResult>
|
public abstract class AbstractCommand<TInput, TResult>(TInput input) : ContextAwareBase, GFramework.Core.Abstractions.Command.ICommand<TResult>
|
||||||
where TInput : ICommandInput
|
where TInput : ICommandInput
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 执行命令的入口方法,实现 ICommand{TResult} 接口的 Execute 方法
|
/// 执行命令的入口方法,实现 ICommand{TResult} 接口的 Execute 方法
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <returns>命令执行后的结果</returns>
|
/// <returns>命令执行后的结果</returns>
|
||||||
TResult ICommand<TResult>.Execute()
|
TResult GFramework.Core.Abstractions.Command.ICommand<TResult>.Execute()
|
||||||
{
|
{
|
||||||
return OnExecute(input);
|
return OnExecute(input);
|
||||||
}
|
}
|
||||||
@ -28,4 +28,4 @@ public abstract class AbstractCommand<TInput, TResult>(TInput input) : ContextAw
|
|||||||
/// <param name="input">命令执行所需的输入参数</param>
|
/// <param name="input">命令执行所需的输入参数</param>
|
||||||
/// <returns>命令执行后的结果</returns>
|
/// <returns>命令执行后的结果</returns>
|
||||||
protected abstract TResult OnExecute(TInput input);
|
protected abstract TResult OnExecute(TInput input);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,9 +12,9 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
using GFramework.Core.Abstractions.Logging;
|
using GFramework.Core.Abstractions.Logging;
|
||||||
using GFramework.Core.Logging;
|
using GFramework.Core.Logging;
|
||||||
using Mediator;
|
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Behaviors;
|
namespace GFramework.Core.Cqrs.Behaviors;
|
||||||
|
|
||||||
@ -69,4 +69,4 @@ public sealed class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRe
|
|||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,9 +12,9 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
using GFramework.Core.Abstractions.Logging;
|
using GFramework.Core.Abstractions.Logging;
|
||||||
using GFramework.Core.Logging;
|
using GFramework.Core.Logging;
|
||||||
using Mediator;
|
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Behaviors;
|
namespace GFramework.Core.Cqrs.Behaviors;
|
||||||
|
|
||||||
@ -61,4 +61,4 @@ public sealed class PerformanceBehavior<TRequest, TResponse> : IPipelineBehavior
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,8 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Command;
|
namespace GFramework.Core.Cqrs.Command;
|
||||||
|
|
||||||
@ -21,7 +22,7 @@ namespace GFramework.Core.Cqrs.Command;
|
|||||||
/// 继承自ContextAwareBase并实现ICommandHandler接口,为具体的命令处理器提供基础功能
|
/// 继承自ContextAwareBase并实现ICommandHandler接口,为具体的命令处理器提供基础功能
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TCommand">命令类型</typeparam>
|
/// <typeparam name="TCommand">命令类型</typeparam>
|
||||||
public abstract class AbstractCommandHandler<TCommand> : ContextAwareBase, ICommandHandler<TCommand>
|
public abstract class AbstractCommandHandler<TCommand> : ContextAwareBase, IRequestHandler<TCommand, Unit>
|
||||||
where TCommand : ICommand<Unit>
|
where TCommand : ICommand<Unit>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -41,7 +42,7 @@ public abstract class AbstractCommandHandler<TCommand> : ContextAwareBase, IComm
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TCommand">命令类型,必须实现ICommand接口</typeparam>
|
/// <typeparam name="TCommand">命令类型,必须实现ICommand接口</typeparam>
|
||||||
/// <typeparam name="TResult">命令执行结果类型</typeparam>
|
/// <typeparam name="TResult">命令执行结果类型</typeparam>
|
||||||
public abstract class AbstractCommandHandler<TCommand, TResult> : ContextAwareBase, ICommandHandler<TCommand, TResult>
|
public abstract class AbstractCommandHandler<TCommand, TResult> : ContextAwareBase, IRequestHandler<TCommand, TResult>
|
||||||
where TCommand : ICommand<TResult>
|
where TCommand : ICommand<TResult>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -52,4 +53,4 @@ public abstract class AbstractCommandHandler<TCommand, TResult> : ContextAwareBa
|
|||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
||||||
/// <returns>表示异步操作完成的ValueTask,包含命令执行结果</returns>
|
/// <returns>表示异步操作完成的ValueTask,包含命令执行结果</returns>
|
||||||
public abstract ValueTask<TResult> Handle(TCommand command, CancellationToken cancellationToken);
|
public abstract ValueTask<TResult> Handle(TCommand command, CancellationToken cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,8 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Command;
|
namespace GFramework.Core.Cqrs.Command;
|
||||||
|
|
||||||
@ -24,7 +25,7 @@ namespace GFramework.Core.Cqrs.Command;
|
|||||||
/// <typeparam name="TCommand">流式命令类型,必须实现IStreamCommand接口</typeparam>
|
/// <typeparam name="TCommand">流式命令类型,必须实现IStreamCommand接口</typeparam>
|
||||||
/// <typeparam name="TResponse">流式命令响应元素类型</typeparam>
|
/// <typeparam name="TResponse">流式命令响应元素类型</typeparam>
|
||||||
public abstract class AbstractStreamCommandHandler<TCommand, TResponse> : ContextAwareBase,
|
public abstract class AbstractStreamCommandHandler<TCommand, TResponse> : ContextAwareBase,
|
||||||
IStreamCommandHandler<TCommand, TResponse>
|
IStreamRequestHandler<TCommand, TResponse>
|
||||||
where TCommand : IStreamCommand<TResponse>
|
where TCommand : IStreamCommand<TResponse>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -35,4 +36,4 @@ public abstract class AbstractStreamCommandHandler<TCommand, TResponse> : Contex
|
|||||||
/// <param name="cancellationToken">取消令牌,用于取消流式处理操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消流式处理操作</param>
|
||||||
/// <returns>异步可枚举的响应序列,每个元素类型为TResponse</returns>
|
/// <returns>异步可枚举的响应序列,每个元素类型为TResponse</returns>
|
||||||
public abstract IAsyncEnumerable<TResponse> Handle(TCommand command, CancellationToken cancellationToken);
|
public abstract IAsyncEnumerable<TResponse> Handle(TCommand command, CancellationToken cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,6 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Abstractions.Cqrs.Command;
|
using GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
using Mediator;
|
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Command;
|
namespace GFramework.Core.Cqrs.Command;
|
||||||
|
|
||||||
@ -29,4 +28,4 @@ public abstract class CommandBase<TInput, TResponse>(TInput input) : ICommand<TR
|
|||||||
/// 获取命令的输入数据。
|
/// 获取命令的输入数据。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public TInput Input => input;
|
public TInput Input => input;
|
||||||
}
|
}
|
||||||
|
|||||||
263
GFramework.Core/Cqrs/Internal/CqrsDispatcher.cs
Normal file
263
GFramework.Core/Cqrs/Internal/CqrsDispatcher.cs
Normal file
@ -0,0 +1,263 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Reflection;
|
||||||
|
using GFramework.Core.Abstractions.Architectures;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Ioc;
|
||||||
|
using GFramework.Core.Abstractions.Logging;
|
||||||
|
using GFramework.Core.Abstractions.Rule;
|
||||||
|
|
||||||
|
namespace GFramework.Core.Cqrs.Internal;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// GFramework 自有 CQRS 运行时分发器。
|
||||||
|
/// 该类型负责解析请求/通知处理器,并在调用前为上下文感知对象注入当前架构上下文。
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class CqrsDispatcher(
|
||||||
|
IIocContainer container,
|
||||||
|
IArchitectureContext context,
|
||||||
|
ILogger logger)
|
||||||
|
{
|
||||||
|
private delegate ValueTask<object?> RequestInvoker(object handler, object request, CancellationToken cancellationToken);
|
||||||
|
private delegate ValueTask<object?> RequestPipelineInvoker(
|
||||||
|
object handler,
|
||||||
|
IReadOnlyList<object> behaviors,
|
||||||
|
object request,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
private delegate ValueTask NotificationInvoker(object handler, object notification, CancellationToken cancellationToken);
|
||||||
|
private delegate object StreamInvoker(object handler, object request, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestInvoker> RequestInvokers = new();
|
||||||
|
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), RequestPipelineInvoker> RequestPipelineInvokers = new();
|
||||||
|
private static readonly ConcurrentDictionary<Type, NotificationInvoker> NotificationInvokers = new();
|
||||||
|
private static readonly ConcurrentDictionary<(Type RequestType, Type ResponseType), StreamInvoker> StreamInvokers = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 发送请求并返回结果。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">响应类型。</typeparam>
|
||||||
|
/// <param name="request">请求对象。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
/// <returns>请求响应。</returns>
|
||||||
|
public async ValueTask<TResponse> SendAsync<TResponse>(
|
||||||
|
IRequest<TResponse> request,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
|
||||||
|
var requestType = request.GetType();
|
||||||
|
var handlerType = typeof(IRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse));
|
||||||
|
var handler = container.Get(handlerType)
|
||||||
|
?? throw new InvalidOperationException(
|
||||||
|
$"No CQRS request handler registered for {requestType.FullName}.");
|
||||||
|
|
||||||
|
PrepareHandler(handler);
|
||||||
|
var behaviorType = typeof(IPipelineBehavior<,>).MakeGenericType(requestType, typeof(TResponse));
|
||||||
|
var behaviors = container.GetAll(behaviorType);
|
||||||
|
|
||||||
|
foreach (var behavior in behaviors)
|
||||||
|
PrepareHandler(behavior);
|
||||||
|
|
||||||
|
if (behaviors.Count == 0)
|
||||||
|
{
|
||||||
|
var invoker = RequestInvokers.GetOrAdd(
|
||||||
|
(requestType, typeof(TResponse)),
|
||||||
|
static key => CreateRequestInvoker(key.RequestType, key.ResponseType));
|
||||||
|
|
||||||
|
var result = await invoker(handler, request, cancellationToken);
|
||||||
|
return result is null ? default! : (TResponse)result;
|
||||||
|
}
|
||||||
|
|
||||||
|
var pipelineInvoker = RequestPipelineInvokers.GetOrAdd(
|
||||||
|
(requestType, typeof(TResponse)),
|
||||||
|
static key => CreateRequestPipelineInvoker(key.RequestType, key.ResponseType));
|
||||||
|
|
||||||
|
var pipelineResult = await pipelineInvoker(handler, behaviors, request, cancellationToken);
|
||||||
|
return pipelineResult is null ? default! : (TResponse)pipelineResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 发布通知到所有已注册处理器。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TNotification">通知类型。</typeparam>
|
||||||
|
/// <param name="notification">通知对象。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
public async ValueTask PublishAsync<TNotification>(
|
||||||
|
TNotification notification,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
where TNotification : INotification
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(notification);
|
||||||
|
|
||||||
|
var notificationType = notification.GetType();
|
||||||
|
var handlerType = typeof(INotificationHandler<>).MakeGenericType(notificationType);
|
||||||
|
var handlers = container.GetAll(handlerType);
|
||||||
|
|
||||||
|
if (handlers.Count == 0)
|
||||||
|
{
|
||||||
|
logger.Debug($"No CQRS notification handler registered for {notificationType.FullName}.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var invoker = NotificationInvokers.GetOrAdd(
|
||||||
|
notificationType,
|
||||||
|
CreateNotificationInvoker);
|
||||||
|
|
||||||
|
foreach (var handler in handlers)
|
||||||
|
{
|
||||||
|
PrepareHandler(handler);
|
||||||
|
await invoker(handler, notification, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 创建流式请求并返回异步响应序列。
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TResponse">响应元素类型。</typeparam>
|
||||||
|
/// <param name="request">流式请求对象。</param>
|
||||||
|
/// <param name="cancellationToken">取消令牌。</param>
|
||||||
|
/// <returns>异步响应序列。</returns>
|
||||||
|
public IAsyncEnumerable<TResponse> CreateStream<TResponse>(
|
||||||
|
IStreamRequest<TResponse> request,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
|
||||||
|
var requestType = request.GetType();
|
||||||
|
var handlerType = typeof(IStreamRequestHandler<,>).MakeGenericType(requestType, typeof(TResponse));
|
||||||
|
var handler = container.Get(handlerType)
|
||||||
|
?? throw new InvalidOperationException(
|
||||||
|
$"No CQRS stream handler registered for {requestType.FullName}.");
|
||||||
|
|
||||||
|
PrepareHandler(handler);
|
||||||
|
|
||||||
|
var invoker = StreamInvokers.GetOrAdd(
|
||||||
|
(requestType, typeof(TResponse)),
|
||||||
|
static key => CreateStreamInvoker(key.RequestType, key.ResponseType));
|
||||||
|
|
||||||
|
return (IAsyncEnumerable<TResponse>)invoker(handler, request, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 为上下文感知处理器注入当前架构上下文。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="handler">处理器实例。</param>
|
||||||
|
private void PrepareHandler(object handler)
|
||||||
|
{
|
||||||
|
if (handler is IContextAware contextAware)
|
||||||
|
contextAware.SetContext(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 生成请求处理器调用委托,避免每次发送都重复反射。
|
||||||
|
/// </summary>
|
||||||
|
private static RequestInvoker CreateRequestInvoker(Type requestType, Type responseType)
|
||||||
|
{
|
||||||
|
var method = typeof(CqrsDispatcher)
|
||||||
|
.GetMethod(nameof(InvokeRequestHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!
|
||||||
|
.MakeGenericMethod(requestType, responseType);
|
||||||
|
return (RequestInvoker)Delegate.CreateDelegate(typeof(RequestInvoker), method);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 生成带管道行为的请求处理委托,避免每次发送都重复反射。
|
||||||
|
/// </summary>
|
||||||
|
private static RequestPipelineInvoker CreateRequestPipelineInvoker(Type requestType, Type responseType)
|
||||||
|
{
|
||||||
|
var method = typeof(CqrsDispatcher)
|
||||||
|
.GetMethod(nameof(InvokeRequestPipelineAsync), BindingFlags.NonPublic | BindingFlags.Static)!
|
||||||
|
.MakeGenericMethod(requestType, responseType);
|
||||||
|
return (RequestPipelineInvoker)Delegate.CreateDelegate(typeof(RequestPipelineInvoker), method);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 生成通知处理器调用委托,避免每次发布都重复反射。
|
||||||
|
/// </summary>
|
||||||
|
private static NotificationInvoker CreateNotificationInvoker(Type notificationType)
|
||||||
|
{
|
||||||
|
var method = typeof(CqrsDispatcher)
|
||||||
|
.GetMethod(nameof(InvokeNotificationHandlerAsync), BindingFlags.NonPublic | BindingFlags.Static)!
|
||||||
|
.MakeGenericMethod(notificationType);
|
||||||
|
return (NotificationInvoker)Delegate.CreateDelegate(typeof(NotificationInvoker), method);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 生成流式处理器调用委托,避免每次创建流都重复反射。
|
||||||
|
/// </summary>
|
||||||
|
private static StreamInvoker CreateStreamInvoker(Type requestType, Type responseType)
|
||||||
|
{
|
||||||
|
var method = typeof(CqrsDispatcher)
|
||||||
|
.GetMethod(nameof(InvokeStreamHandler), BindingFlags.NonPublic | BindingFlags.Static)!
|
||||||
|
.MakeGenericMethod(requestType, responseType);
|
||||||
|
return (StreamInvoker)Delegate.CreateDelegate(typeof(StreamInvoker), method);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 执行已强类型化的请求处理器调用。
|
||||||
|
/// </summary>
|
||||||
|
private static async ValueTask<object?> InvokeRequestHandlerAsync<TRequest, TResponse>(
|
||||||
|
object handler,
|
||||||
|
object request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TRequest : IRequest<TResponse>
|
||||||
|
{
|
||||||
|
var typedHandler = (IRequestHandler<TRequest, TResponse>)handler;
|
||||||
|
var typedRequest = (TRequest)request;
|
||||||
|
var result = await typedHandler.Handle(typedRequest, cancellationToken);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 执行包含管道行为链的请求处理。
|
||||||
|
/// </summary>
|
||||||
|
private static async ValueTask<object?> InvokeRequestPipelineAsync<TRequest, TResponse>(
|
||||||
|
object handler,
|
||||||
|
IReadOnlyList<object> behaviors,
|
||||||
|
object request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TRequest : IRequest<TResponse>
|
||||||
|
{
|
||||||
|
var typedHandler = (IRequestHandler<TRequest, TResponse>)handler;
|
||||||
|
var typedRequest = (TRequest)request;
|
||||||
|
|
||||||
|
MessageHandlerDelegate<TRequest, TResponse> next =
|
||||||
|
(message, token) => typedHandler.Handle(message, token);
|
||||||
|
|
||||||
|
for (var i = behaviors.Count - 1; i >= 0; i--)
|
||||||
|
{
|
||||||
|
var behavior = (IPipelineBehavior<TRequest, TResponse>)behaviors[i];
|
||||||
|
var currentNext = next;
|
||||||
|
next = (message, token) => behavior.Handle(message, currentNext, token);
|
||||||
|
}
|
||||||
|
|
||||||
|
var result = await next(typedRequest, cancellationToken);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 执行已强类型化的通知处理器调用。
|
||||||
|
/// </summary>
|
||||||
|
private static ValueTask InvokeNotificationHandlerAsync<TNotification>(
|
||||||
|
object handler,
|
||||||
|
object notification,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TNotification : INotification
|
||||||
|
{
|
||||||
|
var typedHandler = (INotificationHandler<TNotification>)handler;
|
||||||
|
var typedNotification = (TNotification)notification;
|
||||||
|
return typedHandler.Handle(typedNotification, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 执行已强类型化的流式处理器调用。
|
||||||
|
/// </summary>
|
||||||
|
private static object InvokeStreamHandler<TRequest, TResponse>(
|
||||||
|
object handler,
|
||||||
|
object request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TRequest : IStreamRequest<TResponse>
|
||||||
|
{
|
||||||
|
var typedHandler = (IStreamRequestHandler<TRequest, TResponse>)handler;
|
||||||
|
var typedRequest = (TRequest)request;
|
||||||
|
return typedHandler.Handle(typedRequest, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
81
GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs
Normal file
81
GFramework.Core/Cqrs/Internal/CqrsHandlerRegistrar.cs
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
using System.Reflection;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Ioc;
|
||||||
|
using GFramework.Core.Abstractions.Logging;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
|
namespace GFramework.Core.Cqrs.Internal;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 在架构初始化期间扫描并注册 CQRS 处理器。
|
||||||
|
/// 首批实现采用运行时反射扫描,优先满足“无需 AddMediator 即可工作”的迁移目标。
|
||||||
|
/// </summary>
|
||||||
|
internal static class CqrsHandlerRegistrar
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// 扫描指定程序集并注册所有 CQRS 请求/通知/流式处理器。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="container">目标容器。</param>
|
||||||
|
/// <param name="assemblies">要扫描的程序集集合。</param>
|
||||||
|
/// <param name="logger">日志记录器。</param>
|
||||||
|
public static void RegisterHandlers(
|
||||||
|
IIocContainer container,
|
||||||
|
IEnumerable<Assembly> assemblies,
|
||||||
|
ILogger logger)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(container);
|
||||||
|
ArgumentNullException.ThrowIfNull(assemblies);
|
||||||
|
ArgumentNullException.ThrowIfNull(logger);
|
||||||
|
|
||||||
|
foreach (var assembly in assemblies.Distinct())
|
||||||
|
{
|
||||||
|
RegisterAssemblyHandlers(container.GetServicesUnsafe, assembly, logger);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 注册单个程序集里的所有 CQRS 处理器映射。
|
||||||
|
/// </summary>
|
||||||
|
private static void RegisterAssemblyHandlers(IServiceCollection services, Assembly assembly, ILogger logger)
|
||||||
|
{
|
||||||
|
foreach (var implementationType in assembly.GetTypes().Where(IsConcreteHandlerType))
|
||||||
|
{
|
||||||
|
var handlerInterfaces = implementationType
|
||||||
|
.GetInterfaces()
|
||||||
|
.Where(IsSupportedHandlerInterface)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
if (handlerInterfaces.Count == 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
foreach (var handlerInterface in handlerInterfaces)
|
||||||
|
{
|
||||||
|
services.AddSingleton(handlerInterface, implementationType);
|
||||||
|
logger.Debug(
|
||||||
|
$"Registered CQRS handler {implementationType.FullName} as {handlerInterface.FullName}.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断指定类型是否可作为可实例化处理器。
|
||||||
|
/// </summary>
|
||||||
|
private static bool IsConcreteHandlerType(Type type)
|
||||||
|
{
|
||||||
|
return type is { IsAbstract: false, IsInterface: false } && !type.ContainsGenericParameters;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 判断接口是否为当前运行时支持的 CQRS 处理器接口。
|
||||||
|
/// </summary>
|
||||||
|
private static bool IsSupportedHandlerInterface(Type type)
|
||||||
|
{
|
||||||
|
if (!type.IsGenericType)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
var definition = type.GetGenericTypeDefinition();
|
||||||
|
return definition == typeof(IRequestHandler<,>) ||
|
||||||
|
definition == typeof(INotificationHandler<>) ||
|
||||||
|
definition == typeof(IStreamRequestHandler<,>);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -12,7 +12,7 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Notification;
|
namespace GFramework.Core.Cqrs.Notification;
|
||||||
|
|
||||||
@ -33,4 +33,4 @@ public abstract class AbstractNotificationHandler<TNotification> : ContextAwareB
|
|||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
||||||
/// <returns>表示异步操作完成的ValueTask</returns>
|
/// <returns>表示异步操作完成的ValueTask</returns>
|
||||||
public abstract ValueTask Handle(TNotification notification, CancellationToken cancellationToken);
|
public abstract ValueTask Handle(TNotification notification, CancellationToken cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Abstractions.Cqrs.Notification;
|
using GFramework.Core.Abstractions.Cqrs.Notification;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Notification;
|
namespace GFramework.Core.Cqrs.Notification;
|
||||||
|
|
||||||
@ -28,4 +28,4 @@ public abstract class NotificationBase<TInput>(TInput input) : INotification whe
|
|||||||
/// 获取通知的输入数据。
|
/// 获取通知的输入数据。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public TInput Input => input;
|
public TInput Input => input;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,8 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Query;
|
namespace GFramework.Core.Cqrs.Query;
|
||||||
|
|
||||||
@ -23,7 +24,7 @@ namespace GFramework.Core.Cqrs.Query;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TQuery">查询类型,必须实现IQuery接口</typeparam>
|
/// <typeparam name="TQuery">查询类型,必须实现IQuery接口</typeparam>
|
||||||
/// <typeparam name="TResult">查询结果类型</typeparam>
|
/// <typeparam name="TResult">查询结果类型</typeparam>
|
||||||
public abstract class AbstractQueryHandler<TQuery, TResult> : ContextAwareBase, IQueryHandler<TQuery, TResult>
|
public abstract class AbstractQueryHandler<TQuery, TResult> : ContextAwareBase, IRequestHandler<TQuery, TResult>
|
||||||
where TQuery : IQuery<TResult>
|
where TQuery : IQuery<TResult>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -34,4 +35,4 @@ public abstract class AbstractQueryHandler<TQuery, TResult> : ContextAwareBase,
|
|||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
||||||
/// <returns>表示异步操作完成的ValueTask,包含查询结果</returns>
|
/// <returns>表示异步操作完成的ValueTask,包含查询结果</returns>
|
||||||
public abstract ValueTask<TResult> Handle(TQuery query, CancellationToken cancellationToken);
|
public abstract ValueTask<TResult> Handle(TQuery query, CancellationToken cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,8 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Query;
|
namespace GFramework.Core.Cqrs.Query;
|
||||||
|
|
||||||
@ -24,7 +25,7 @@ namespace GFramework.Core.Cqrs.Query;
|
|||||||
/// <typeparam name="TQuery">流式查询类型,必须实现IStreamQuery接口</typeparam>
|
/// <typeparam name="TQuery">流式查询类型,必须实现IStreamQuery接口</typeparam>
|
||||||
/// <typeparam name="TResponse">流式查询响应元素类型</typeparam>
|
/// <typeparam name="TResponse">流式查询响应元素类型</typeparam>
|
||||||
public abstract class AbstractStreamQueryHandler<TQuery, TResponse> : ContextAwareBase,
|
public abstract class AbstractStreamQueryHandler<TQuery, TResponse> : ContextAwareBase,
|
||||||
IStreamQueryHandler<TQuery, TResponse>
|
IStreamRequestHandler<TQuery, TResponse>
|
||||||
where TQuery : IStreamQuery<TResponse>
|
where TQuery : IStreamQuery<TResponse>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -35,4 +36,4 @@ public abstract class AbstractStreamQueryHandler<TQuery, TResponse> : ContextAwa
|
|||||||
/// <param name="cancellationToken">取消令牌,用于取消流式查询操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消流式查询操作</param>
|
||||||
/// <returns>异步可枚举的响应序列,每个元素类型为TResponse</returns>
|
/// <returns>异步可枚举的响应序列,每个元素类型为TResponse</returns>
|
||||||
public abstract IAsyncEnumerable<TResponse> Handle(TQuery query, CancellationToken cancellationToken);
|
public abstract IAsyncEnumerable<TResponse> Handle(TQuery query, CancellationToken cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,6 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Abstractions.Cqrs.Query;
|
using GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
using Mediator;
|
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Query;
|
namespace GFramework.Core.Cqrs.Query;
|
||||||
|
|
||||||
@ -29,4 +28,4 @@ public abstract class QueryBase<TInput, TResponse>(TInput input) : IQuery<TRespo
|
|||||||
/// 获取查询的输入数据。
|
/// 获取查询的输入数据。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public TInput Input => input;
|
public TInput Input => input;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Request;
|
namespace GFramework.Core.Cqrs.Request;
|
||||||
|
|
||||||
@ -21,7 +21,7 @@ namespace GFramework.Core.Cqrs.Request;
|
|||||||
/// 继承自ContextAwareBase并实现IRequestHandler接口
|
/// 继承自ContextAwareBase并实现IRequestHandler接口
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TRequest">请求类型,必须实现IRequest[Unit]接口</typeparam>
|
/// <typeparam name="TRequest">请求类型,必须实现IRequest[Unit]接口</typeparam>
|
||||||
public abstract class AbstractRequestHandler<TRequest> : ContextAwareBase, IRequestHandler<TRequest>
|
public abstract class AbstractRequestHandler<TRequest> : ContextAwareBase, IRequestHandler<TRequest, Unit>
|
||||||
where TRequest : IRequest<Unit>
|
where TRequest : IRequest<Unit>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -49,4 +49,4 @@ public abstract class AbstractRequestHandler<TRequest, TResponse> : ContextAware
|
|||||||
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消操作</param>
|
||||||
/// <returns>表示异步操作的ValueTask,完成时返回处理结果</returns>
|
/// <returns>表示异步操作的ValueTask,完成时返回处理结果</returns>
|
||||||
public abstract ValueTask<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
|
public abstract ValueTask<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Request;
|
namespace GFramework.Core.Cqrs.Request;
|
||||||
|
|
||||||
@ -35,4 +35,4 @@ public abstract class AbstractStreamRequestHandler<TRequest, TResponse> : Contex
|
|||||||
/// <param name="cancellationToken">取消令牌,用于取消流式请求操作</param>
|
/// <param name="cancellationToken">取消令牌,用于取消流式请求操作</param>
|
||||||
/// <returns>异步可枚举的响应序列,每个元素类型为TResponse</returns>
|
/// <returns>异步可枚举的响应序列,每个元素类型为TResponse</returns>
|
||||||
public abstract IAsyncEnumerable<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
|
public abstract IAsyncEnumerable<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
using GFramework.Core.Abstractions.Cqrs.Request;
|
using GFramework.Core.Abstractions.Cqrs.Request;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
namespace GFramework.Core.Cqrs.Request;
|
namespace GFramework.Core.Cqrs.Request;
|
||||||
|
|
||||||
@ -29,4 +29,4 @@ public abstract class RequestBase<TInput, TResponse>(TInput input) : IRequest<TR
|
|||||||
/// 获取请求的输入数据。
|
/// 获取请求的输入数据。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public TInput Input => input;
|
public TInput Input => input;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,16 +1,15 @@
|
|||||||
using GFramework.Core.Abstractions.Rule;
|
using GFramework.Core.Abstractions.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
|
|
||||||
namespace GFramework.Core.Extensions;
|
namespace GFramework.Core.Extensions;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 提供对 IContextAware 接口的 Mediator 命令扩展方法
|
/// 提供对 IContextAware 接口的 CQRS 命令扩展方法。
|
||||||
/// 使用 Mediator 库的命令模式
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static class ContextAwareMediatorCommandExtensions
|
public static class ContextAwareMediatorCommandExtensions
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送命令的同步版本(不推荐,仅用于兼容性)
|
/// 发送命令的同步版本(不推荐,仅用于兼容性)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
||||||
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
||||||
@ -28,7 +27,7 @@ public static class ContextAwareMediatorCommandExtensions
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 异步发送命令并返回结果
|
/// 异步发送命令并返回结果
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
/// <typeparam name="TResponse">命令响应类型</typeparam>
|
||||||
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
||||||
@ -45,4 +44,4 @@ public static class ContextAwareMediatorCommandExtensions
|
|||||||
var context = contextAware.GetContext();
|
var context = contextAware.GetContext();
|
||||||
return context.SendCommandAsync(command, cancellationToken);
|
return context.SendCommandAsync(command, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,10 @@
|
|||||||
using GFramework.Core.Abstractions.Rule;
|
using GFramework.Core.Abstractions.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
|
||||||
namespace GFramework.Core.Extensions;
|
namespace GFramework.Core.Extensions;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 提供对 IContextAware 接口的 Mediator 统一接口扩展方法
|
/// 提供对 IContextAware 接口的 CQRS 统一接口扩展方法。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static class ContextAwareMediatorExtensions
|
public static class ContextAwareMediatorExtensions
|
||||||
{
|
{
|
||||||
@ -122,4 +122,4 @@ public static class ContextAwareMediatorExtensions
|
|||||||
var context = contextAware.GetContext();
|
var context = contextAware.GetContext();
|
||||||
return context.SendAsync(command, cancellationToken);
|
return context.SendAsync(command, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,16 +1,15 @@
|
|||||||
using GFramework.Core.Abstractions.Rule;
|
using GFramework.Core.Abstractions.Rule;
|
||||||
using Mediator;
|
using GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
|
|
||||||
namespace GFramework.Core.Extensions;
|
namespace GFramework.Core.Extensions;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 提供对 IContextAware 接口的 Mediator 查询扩展方法
|
/// 提供对 IContextAware 接口的 CQRS 查询扩展方法。
|
||||||
/// 使用 Mediator 库的查询模式
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static class ContextAwareMediatorQueryExtensions
|
public static class ContextAwareMediatorQueryExtensions
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 发送查询的同步版本(不推荐,仅用于兼容性)
|
/// 发送查询的同步版本(不推荐,仅用于兼容性)
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
||||||
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
||||||
@ -27,7 +26,7 @@ public static class ContextAwareMediatorQueryExtensions
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// [Mediator] 异步发送查询并返回结果
|
/// 异步发送查询并返回结果
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
/// <typeparam name="TResponse">查询响应类型</typeparam>
|
||||||
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
/// <param name="contextAware">实现 IContextAware 接口的对象</param>
|
||||||
@ -44,4 +43,4 @@ public static class ContextAwareMediatorQueryExtensions
|
|||||||
var context = contextAware.GetContext();
|
var context = contextAware.GetContext();
|
||||||
return context.SendQueryAsync(query, cancellationToken);
|
return context.SendQueryAsync(query, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,10 @@
|
|||||||
using GFramework.Core.Abstractions.Bases;
|
using GFramework.Core.Abstractions.Bases;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
using GFramework.Core.Abstractions.Ioc;
|
using GFramework.Core.Abstractions.Ioc;
|
||||||
using GFramework.Core.Abstractions.Logging;
|
using GFramework.Core.Abstractions.Logging;
|
||||||
using GFramework.Core.Abstractions.Systems;
|
using GFramework.Core.Abstractions.Systems;
|
||||||
using GFramework.Core.Logging;
|
using GFramework.Core.Logging;
|
||||||
using GFramework.Core.Rule;
|
using GFramework.Core.Rule;
|
||||||
using Mediator;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
namespace GFramework.Core.Ioc;
|
namespace GFramework.Core.Ioc;
|
||||||
@ -804,4 +804,4 @@ public class MicrosoftDiContainer(IServiceCollection? serviceCollection = null)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,7 +9,7 @@ namespace GFramework.Core.Query;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <typeparam name="TInput">查询输入参数的类型,必须实现IQueryInput接口</typeparam>
|
/// <typeparam name="TInput">查询输入参数的类型,必须实现IQueryInput接口</typeparam>
|
||||||
/// <typeparam name="TResult">查询结果的类型</typeparam>
|
/// <typeparam name="TResult">查询结果的类型</typeparam>
|
||||||
public abstract class AbstractQuery<TInput, TResult>(TInput input) : ContextAwareBase, IQuery<TResult>
|
public abstract class AbstractQuery<TInput, TResult>(TInput input) : ContextAwareBase, GFramework.Core.Abstractions.Query.IQuery<TResult>
|
||||||
where TInput : IQueryInput
|
where TInput : IQueryInput
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@ -27,4 +27,4 @@ public abstract class AbstractQuery<TInput, TResult>(TInput input) : ContextAwar
|
|||||||
/// <param name="input">查询输入参数</param>
|
/// <param name="input">查询输入参数</param>
|
||||||
/// <returns>查询结果,类型为TResult</returns>
|
/// <returns>查询结果,类型为TResult</returns>
|
||||||
protected abstract TResult OnDo(TInput input);
|
protected abstract TResult OnDo(TInput input);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,9 +18,17 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Analyzer Include="$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.Godot.SourceGenerators.dll"/>
|
<!--
|
||||||
<Analyzer Include="$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.Godot.SourceGenerators.Abstractions.dll"/>
|
仅在 NuGet 打包布局存在时自动注入 analyzer。
|
||||||
<Analyzer Include="$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.SourceGenerators.Common.dll"/>
|
仓库内项目引用场景会通过 ProjectReference(OutputItemType=Analyzer) 提供生成器,
|
||||||
|
因此这里需要避免对不存在的打包路径做无效引用。
|
||||||
|
-->
|
||||||
|
<Analyzer Include="$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.Godot.SourceGenerators.dll"
|
||||||
|
Condition="Exists('$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.Godot.SourceGenerators.dll')"/>
|
||||||
|
<Analyzer Include="$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.Godot.SourceGenerators.Abstractions.dll"
|
||||||
|
Condition="Exists('$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.Godot.SourceGenerators.Abstractions.dll')"/>
|
||||||
|
<Analyzer Include="$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.SourceGenerators.Common.dll"
|
||||||
|
Condition="Exists('$(MSBuildThisFileDirectory)../analyzers/dotnet/cs/GFramework.SourceGenerators.Common.dll')"/>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup Condition="Exists('$(MSBuildProjectDirectory)/$(GFrameworkGodotProjectFile)')">
|
<ItemGroup Condition="Exists('$(MSBuildProjectDirectory)/$(GFrameworkGodotProjectFile)')">
|
||||||
|
|||||||
@ -1,8 +1,10 @@
|
|||||||
using GFramework.Core.Abstractions.Rule;
|
using GFramework.Core.Abstractions.Cqrs;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Command;
|
||||||
|
using GFramework.Core.Abstractions.Cqrs.Query;
|
||||||
|
using GFramework.Core.Abstractions.Rule;
|
||||||
using GFramework.Core.Coroutine;
|
using GFramework.Core.Coroutine;
|
||||||
using GFramework.Core.Coroutine.Extensions;
|
using GFramework.Core.Coroutine.Extensions;
|
||||||
using GFramework.Core.Extensions;
|
using GFramework.Core.Extensions;
|
||||||
using Mediator;
|
|
||||||
|
|
||||||
namespace GFramework.Godot.Coroutine;
|
namespace GFramework.Godot.Coroutine;
|
||||||
|
|
||||||
@ -104,4 +106,4 @@ public static class ContextAwareCoroutineExtensions
|
|||||||
.ToCoroutineEnumerator()
|
.ToCoroutineEnumerator()
|
||||||
.RunCoroutine(segment, tag);
|
.RunCoroutine(segment, tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
109
local-plan/todos/cqrs-rewrite-migration-tracking.md
Normal file
109
local-plan/todos/cqrs-rewrite-migration-tracking.md
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
# CQRS 重写迁移跟踪
|
||||||
|
|
||||||
|
## 目标
|
||||||
|
|
||||||
|
围绕 `GFramework` 当前的双轨 CQRS 现状,完成一轮以“去 Mediator 外部依赖”为目标的架构迁移:
|
||||||
|
|
||||||
|
- 将 `Mediator` 从 GFramework 公共 API 和运行时主路径中移除
|
||||||
|
- 基于 GFramework 自有抽象重建正式 CQRS runtime、行为管道和注册机制
|
||||||
|
- 保留 `EventBus` 作为框架级事件系统,不与 CQRS notification 混同
|
||||||
|
- 让 `CoreGrid-Migration` 直连本地 `GFramework`,作为真实迁移验证工程
|
||||||
|
- 为复杂迁移建立明确恢复点与进度追踪,避免上下文过长或中断后失去状态
|
||||||
|
|
||||||
|
## 当前恢复点
|
||||||
|
|
||||||
|
- 恢复点编号:`CQRS-REWRITE-RP-002`
|
||||||
|
- 当前阶段:`Phase 4`
|
||||||
|
- 当前焦点:
|
||||||
|
- 清理剩余 `Mediator` 包依赖与文档残留
|
||||||
|
- 评估是否继续把协程扩展和测试项目中的 `Mediator.Abstractions` 完全移除
|
||||||
|
- 规划第二阶段优化:代码生成注册、性能收敛、行为 API 命名统一
|
||||||
|
|
||||||
|
## 本轮计划
|
||||||
|
|
||||||
|
### Phase 0:工作流基础
|
||||||
|
|
||||||
|
- [x] 在 `local-plan/todos/` 建立本任务跟踪文档
|
||||||
|
- [x] 在 `local-plan/traces/` 建立本任务追踪文档
|
||||||
|
- [x] 将恢复点 / trace / subagent 协作规范写入 `AGENTS.md`
|
||||||
|
|
||||||
|
### Phase 1:本地验证链路
|
||||||
|
|
||||||
|
- [x] 确认 `CoreGrid-Migration` 当前引用形态
|
||||||
|
- [x] 将 `CoreGrid-Migration` 从 NuGet 包切到本地 `GFramework` 工程引用
|
||||||
|
- [x] 让 `CoreGrid-Migration` 使用本地 Source Generator 而不是外部已发布版本
|
||||||
|
- [x] 验证本地引用链路至少能完成 restore / build
|
||||||
|
|
||||||
|
### Phase 2:CQRS 基础重建
|
||||||
|
|
||||||
|
- [x] 在 `GFramework.Core.Abstractions` 定义自有 CQRS 契约
|
||||||
|
- [x] 在 `GFramework.Core` 落地 dispatcher / handler registry / behavior pipeline
|
||||||
|
- [x] 清理 `IArchitectureContext` 中对 `Mediator.*` 的公共签名依赖
|
||||||
|
- [x] 设计 CQRS 模块启用方式,替代 `Configurator => AddMediator(...)`
|
||||||
|
|
||||||
|
### Phase 3:接入迁移
|
||||||
|
|
||||||
|
- [x] 迁移 `GFramework.Core.Cqrs.*` 基类到新契约
|
||||||
|
- [x] 迁移 `ContextAwareMediator*Extensions` 与协程扩展
|
||||||
|
- [x] 迁移 `CoreGrid-Migration/scripts/cqrs/**` 到新契约
|
||||||
|
- [x] 删除 `GameArchitecture.Configurator` 中的 `AddMediator(...)`
|
||||||
|
|
||||||
|
### Phase 4:收尾
|
||||||
|
|
||||||
|
- [ ] 移除 `Mediator` 包依赖与相关测试/文档残留
|
||||||
|
- [x] 运行目标构建与测试
|
||||||
|
- [x] 记录剩余风险与下一恢复点
|
||||||
|
|
||||||
|
## 当前完成结果
|
||||||
|
|
||||||
|
- `CoreGrid-Migration` 已直连本地 `GFramework` 源码与本地 source generators。
|
||||||
|
- `GameArchitecture` 已不再依赖 `collection.AddMediator(...)` 即可使用 CQRS。
|
||||||
|
- `GFramework.Core.Abstractions` 新增自有 CQRS 契约:
|
||||||
|
- `IRequest<TResponse>` / `INotification` / `IStreamRequest<TResponse>`
|
||||||
|
- `IRequestHandler<,>` / `INotificationHandler<>` / `IStreamRequestHandler<,>`
|
||||||
|
- `Unit`
|
||||||
|
- `IPipelineBehavior<,>` / `MessageHandlerDelegate<,>`
|
||||||
|
- `ArchitectureBootstrapper` 会在初始化阶段自动扫描并注册当前架构程序集与 `GFramework.Core` 程序集中的 CQRS handlers。
|
||||||
|
- `CqrsDispatcher` 已支持:
|
||||||
|
- request dispatch
|
||||||
|
- notification publish
|
||||||
|
- stream dispatch
|
||||||
|
- context-aware handler 注入
|
||||||
|
- request pipeline behavior 链式执行
|
||||||
|
- `GFramework.Core.Tests` 中原依赖 `Mediator` 注册路径的测试已切换到框架内建 CQRS 注册路径。
|
||||||
|
- 当前验证状态:
|
||||||
|
- `dotnet build GFramework/GFramework.sln` 通过
|
||||||
|
- `dotnet test GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj --no-build` 通过,`1621` 个测试全部通过
|
||||||
|
- `dotnet build CoreGrid-Migration/CoreGrid.sln` 通过
|
||||||
|
|
||||||
|
## 当前已知事实
|
||||||
|
|
||||||
|
- `GFramework` 当前仍同时维护:
|
||||||
|
- 基于 `CommandExecutor` / `QueryExecutor` / `EventBus` 的轻量旧 CQRS
|
||||||
|
- 基于 GFramework 自有抽象的新 CQRS runtime
|
||||||
|
- 仍存在 `Mediator` 残留的区域主要集中在:
|
||||||
|
- 文档中的历史说明
|
||||||
|
- `MediatorCoroutineExtensions` 及对应测试
|
||||||
|
- 测试项目对 `Mediator.Abstractions` 的少量残余依赖
|
||||||
|
- `CoreGrid-Migration` 已切到本地源码引用,并在当前恢复点完成构建验证
|
||||||
|
|
||||||
|
## 当前风险
|
||||||
|
|
||||||
|
- `GFramework` 仓库存在与本任务无关的既有改动,提交时必须避免覆盖
|
||||||
|
- `CoreGrid-Migration` 是 worktree,WSL 下原生 `git` 解析该 worktree 路径有兼容问题
|
||||||
|
- 当前 `RegisterMediatorBehavior` 命名仍保留历史前缀,但底层已切换为框架自有 CQRS pipeline;若后续要彻底脱媒介命名,需要一次 API 命名迁移
|
||||||
|
- 当前 handler 自动注册基于运行时反射扫描;若后续追求冷启动与 AOT 友好性,需要补 source-generator 注册路径
|
||||||
|
|
||||||
|
## 下次恢复建议
|
||||||
|
|
||||||
|
若本轮中断,优先从以下顺序恢复:
|
||||||
|
|
||||||
|
1. 查看 `local-plan/traces/cqrs-rewrite-migration-trace.md`
|
||||||
|
2. 确认当前恢复点 `CQRS-REWRITE-RP-002` 已对应到最新提交
|
||||||
|
3. 优先决定是否继续移除 `Mediator.Abstractions` 包与 `MediatorCoroutineExtensions` 历史兼容层
|
||||||
|
4. 若继续演进,再处理 CQRS 注册的生成器化与 API 命名统一
|
||||||
|
|
||||||
|
## 备注
|
||||||
|
|
||||||
|
- 本文档是当前任务的主恢复点,后续每个关键阶段完成后都要更新
|
||||||
|
- 发生方向调整时,不覆盖旧结论,直接追加阶段记录与新的恢复点编号
|
||||||
68
local-plan/traces/cqrs-rewrite-migration-trace.md
Normal file
68
local-plan/traces/cqrs-rewrite-migration-trace.md
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
# CQRS 重写迁移追踪
|
||||||
|
|
||||||
|
## 2026-04-14
|
||||||
|
|
||||||
|
### 阶段:初始化
|
||||||
|
|
||||||
|
- 建立 `CQRS-REWRITE-RP-001` 恢复点
|
||||||
|
- 已确认本次迁移目标:
|
||||||
|
- 彻底参考 `Mediator` 思路重写 GFramework 正式 CQRS
|
||||||
|
- 不保留对 `Mediator` 的兼容层
|
||||||
|
- 使用 `abstractions + runtime 可选模块` 边界
|
||||||
|
- 保留 `EventBus`,不与 CQRS notification 合并
|
||||||
|
|
||||||
|
### 已确认的实现前提
|
||||||
|
|
||||||
|
- `CoreGrid-Migration` 当前仍依赖 NuGet 版 `GeWuYou.GFramework*`
|
||||||
|
- `CoreGrid/scripts/core/GameArchitecture.cs` 与 `CoreGrid-Migration/scripts/core/GameArchitecture.cs` 通过 `AddMediator(...)` 启用基于生成器的 runtime
|
||||||
|
- `GFramework` 当前 `IArchitectureContext` 与一批 CQRS 基类直接引用 `Mediator.*`
|
||||||
|
- `CoreGrid/scripts/cqrs/**` 的 handler 很薄,主要迁移成本在框架 runtime 和注册机制,不在业务逻辑本身
|
||||||
|
|
||||||
|
### 当前动作
|
||||||
|
|
||||||
|
- 准备更新 `AGENTS.md`,补充恢复点 / trace / subagent 协作规范
|
||||||
|
- 准备将 `CoreGrid-Migration` 切换为本地项目引用,建立真实验证链路
|
||||||
|
|
||||||
|
### 下一步
|
||||||
|
|
||||||
|
1. 完成 `AGENTS.md` 规则补充
|
||||||
|
2. 改造 `CoreGrid-Migration/CoreGrid.csproj` 为本地项目与本地生成器引用
|
||||||
|
3. 进行第一次构建验证,确认本地链路可用
|
||||||
|
|
||||||
|
### 阶段:CQRS 主路径迁移完成
|
||||||
|
|
||||||
|
- `CoreGrid-Migration/CoreGrid.csproj` 已切到本地 `ProjectReference` + 本地 source generators
|
||||||
|
- `CoreGrid-Migration/scripts/core/GameArchitecture.cs` 已删除 `AddMediator(...)` 配置钩子
|
||||||
|
- `GFramework.Core.Abstractions` 新增 GFramework 自有 CQRS 契约与 `Unit`
|
||||||
|
- `IArchitectureContext` / `ArchitectureContext` 已切到自有 CQRS 签名
|
||||||
|
- `ArchitectureBootstrapper` 已内建 handler 扫描注册,使用方无需再显式调用 `AddMediator(...)`
|
||||||
|
- `CqrsDispatcher` 已补齐 request/notification/stream dispatch 与 pipeline behavior 执行
|
||||||
|
- `GFramework.Core.Cqrs.*` 基类、`ContextAwareMediator*Extensions`、Godot 协程上下文扩展均已迁到新契约
|
||||||
|
- `GFramework.Core.Tests` 中原依赖旧 `Mediator` 注册入口的测试已迁移到 `CqrsTestRuntime` 反射注册路径
|
||||||
|
|
||||||
|
### 阶段:验证
|
||||||
|
|
||||||
|
- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core/GFramework.Core.csproj`
|
||||||
|
- 结果:通过
|
||||||
|
- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj`
|
||||||
|
- 结果:通过
|
||||||
|
- `dotnet test /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.Core.Tests/GFramework.Core.Tests.csproj --no-build`
|
||||||
|
- 结果:通过
|
||||||
|
- 明细:`1621` 个测试全部通过
|
||||||
|
- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/GFramework/GFramework.sln`
|
||||||
|
- 结果:通过
|
||||||
|
- `dotnet build /mnt/f/gewuyou/System/Documents/WorkSpace/GameDev/CoreGrid-Migration/CoreGrid.sln`
|
||||||
|
- 结果:通过
|
||||||
|
- 备注:仅存在既有 analyzer warnings,无新增构建错误
|
||||||
|
|
||||||
|
### 当前残留
|
||||||
|
|
||||||
|
- 文档与少量历史 API 命名仍保留 `Mediator` 前缀
|
||||||
|
- `MediatorCoroutineExtensions` 与少量测试仍依赖 `Mediator.Abstractions`
|
||||||
|
- handler 自动注册当前使用运行时反射扫描,尚未切回生成器注册
|
||||||
|
|
||||||
|
### 下一步建议
|
||||||
|
|
||||||
|
1. 决定是否继续做“完全移除 `Mediator.Abstractions` 包”的第二阶段清理
|
||||||
|
2. 若继续,优先迁移协程扩展与相关测试
|
||||||
|
3. 评估是否将 `RegisterMediatorBehavior`、`ContextAwareMediator*Extensions` 等历史命名升级为 CQRS 中性命名
|
||||||
Loading…
x
Reference in New Issue
Block a user