using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Events;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures;
using GFramework.Core.Command;
using GFramework.Core.Environment;
using GFramework.Core.Events;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Core.Query;
using GFramework.Cqrs.Abstractions.Cqrs;
using ICommand = GFramework.Core.Abstractions.Command.ICommand;
namespace GFramework.Cqrs.Tests.Mediator;
[TestFixture]
public class MediatorComprehensiveTests
{
///
/// 测试初始化方法,在每个测试方法执行前运行。
/// 负责初始化日志工厂、依赖注入容器、自有 CQRS 处理器以及各种总线服务。
///
[SetUp]
public void SetUp()
{
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
_container = new MicrosoftDiContainer();
var loggerField = typeof(MicrosoftDiContainer).GetField("_logger",
BindingFlags.NonPublic | BindingFlags.Instance);
loggerField?.SetValue(_container,
LoggerFactoryResolver.Provider.CreateLogger(nameof(MediatorComprehensiveTests)));
// 注册基础服务(Legacy CQRS)
_eventBus = new EventBus();
_commandBus = new CommandExecutor();
_queryBus = new QueryExecutor();
_asyncQueryBus = new AsyncQueryExecutor();
_environment = new DefaultEnvironment();
_container.RegisterPlurality(_eventBus);
_container.RegisterPlurality(_commandBus);
_container.RegisterPlurality(_queryBus);
_container.RegisterPlurality(_asyncQueryBus);
_container.RegisterPlurality(_environment);
CqrsTestRuntime.RegisterHandlers(
_container,
typeof(MediatorComprehensiveTests).Assembly,
typeof(ArchitectureContext).Assembly);
_container.Freeze();
_context = new ArchitectureContext(_container);
}
///
/// 测试清理方法,在每个测试方法执行后运行。
/// 负责释放容器和上下文资源。
///
[TearDown]
public void TearDown()
{
_context = null;
_container = null;
_eventBus = null;
_commandBus = null;
_queryBus = null;
_asyncQueryBus = null;
_environment = null;
}
private AsyncQueryExecutor? _asyncQueryBus;
private CommandExecutor? _commandBus;
private MicrosoftDiContainer? _container;
private ArchitectureContext? _context;
private DefaultEnvironment? _environment;
private EventBus? _eventBus;
private QueryExecutor? _queryBus;
///
/// 测试SendRequestAsync方法在请求有效时返回结果
///
[Test]
public async Task SendRequestAsync_Should_ReturnResult_When_Request_IsValid()
{
var testRequest = new TestRequest { Value = 42 };
var result = await _context!.SendRequestAsync(testRequest);
Assert.That(result, Is.EqualTo(42));
}
///
/// 测试SendRequestAsync方法在请求为null时抛出ArgumentNullException
///
[Test]
public void SendRequestAsync_Should_ThrowArgumentNullException_When_Request_IsNull()
{
Assert.ThrowsAsync(async () =>
await _context!.SendRequestAsync(null!));
}
///
/// 测试SendRequest方法在请求有效时返回结果
///
[Test]
public void SendRequest_Should_ReturnResult_When_Request_IsValid()
{
var testRequest = new TestRequest { Value = 123 };
var result = _context!.SendRequest(testRequest);
Assert.That(result, Is.EqualTo(123));
}
///
/// 测试PublishAsync方法在通知有效时发布通知
///
[Test]
public async Task PublishAsync_Should_PublishNotification_When_Notification_IsValid()
{
TestNotificationHandler.LastReceivedMessage = null;
var notification = new TestNotification { Message = "test" };
await _context!.PublishAsync(notification);
await Task.Delay(100);
Assert.That(TestNotificationHandler.LastReceivedMessage, Is.EqualTo("test"));
}
///
/// 测试CreateStream方法在流请求有效时返回流
///
[Test]
public async Task CreateStream_Should_ReturnStream_When_StreamRequest_IsValid()
{
var testStreamRequest = new TestStreamRequest { Values = [1, 2, 3, 4, 5] };
var stream = _context!.CreateStream(testStreamRequest);
var results = new List();
await foreach (var item in stream)
{
results.Add(item);
}
Assert.That(results, Is.EqualTo(new[] { 1, 2, 3, 4, 5 }));
}
///
/// 测试SendAsync方法(无返回值命令)在命令有效时执行
///
[Test]
public async Task SendAsync_CommandWithoutResult_Should_Execute_When_Command_IsValid()
{
var testCommand = new TestCommand { ShouldExecute = true };
await _context!.SendAsync(testCommand);
Assert.That(testCommand.Executed, Is.True);
}
///
/// 测试SendAsync方法(带返回值命令)在命令有效时返回结果
///
[Test]
public async Task SendAsync_CommandWithResult_Should_ReturnResult_When_Command_IsValid()
{
var testCommand = new TestCommandWithResult { ResultValue = 42 };
var result = await _context!.SendAsync(testCommand);
Assert.That(result, Is.EqualTo(42));
}
///
/// 测试GetService方法使用缓存
///
[Test]
public void GetService_Should_Use_Cache()
{
var firstResult = _context!.GetService();
Assert.That(firstResult, Is.Not.Null);
Assert.That(firstResult, Is.SameAs(_eventBus));
var secondResult = _context.GetService();
Assert.That(secondResult, Is.SameAs(firstResult));
}
///
/// 测试未注册的 CQRS handler 时抛出 InvalidOperationException
///
[Test]
public void Unregistered_Cqrs_Handler_Should_Throw_InvalidOperationException()
{
var containerWithoutHandlers = new MicrosoftDiContainer();
containerWithoutHandlers.Freeze();
var contextWithoutHandlers = new ArchitectureContext(containerWithoutHandlers);
var testRequest = new TestRequest { Value = 42 };
Assert.ThrowsAsync(async () =>
await contextWithoutHandlers.SendRequestAsync(testRequest));
}
///
/// 测试多个通知处理器都被调用
///
[Test]
public async Task Multiple_Notification_Handlers_Should_All_Be_Invoked()
{
// 重置静态字段
TestNotificationHandler.LastReceivedMessage = null;
TestNotificationHandler2.LastReceivedMessage = null;
TestNotificationHandler3.LastReceivedMessage = null;
var notification = new TestNotification { Message = "multi-handler test" };
await _context!.PublishAsync(notification);
await Task.Delay(100);
// 验证所有处理器都被调用
Assert.That(TestNotificationHandler.LastReceivedMessage, Is.EqualTo("multi-handler test"));
Assert.That(TestNotificationHandler2.LastReceivedMessage, Is.EqualTo("multi-handler test"));
Assert.That(TestNotificationHandler3.LastReceivedMessage, Is.EqualTo("multi-handler test"));
}
///
/// 测试CancellationToken取消长时间运行的请求
///
[Test]
public async Task CancellationToken_Should_Cancel_Long_Running_Request()
{
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
var longRequest = new TestLongRunningRequest { DelayMs = 1000 };
// 应该在50ms后被取消
Assert.ThrowsAsync(async () =>
await _context!.SendRequestAsync(longRequest, cts.Token));
}
///
/// 测试CancellationToken取消流请求
///
[Test]
public async Task CancellationToken_Should_Cancel_Stream_Request()
{
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
var longStreamRequest = new TestLongStreamRequest { ItemCount = 1000 };
var stream = _context!.CreateStream(longStreamRequest, cts.Token);
var results = new List();
// 流应该在100ms后被取消(TaskCanceledException 继承自 OperationCanceledException)
Assert.CatchAsync(async () =>
{
await foreach (var item in stream)
{
results.Add(item);
}
});
// 验证只处理了部分数据
Assert.That(results.Count, Is.LessThan(1000));
}
///
/// 测试并发 CQRS 请求不会相互干扰
///
[Test]
public async Task Concurrent_Cqrs_Requests_Should_Not_Interfere()
{
const int requestCount = 10;
var tasks = new List>();
// 并发发送多个请求
for (int i = 0; i < requestCount; i++)
{
var request = new TestRequest { Value = i };
tasks.Add(_context!.SendRequestAsync(request).AsTask());
}
var results = await Task.WhenAll(tasks);
// 验证所有结果都正确返回
Assert.That(results.Length, Is.EqualTo(requestCount));
Assert.That(results.OrderBy(x => x), Is.EqualTo(Enumerable.Range(0, requestCount)));
}
///
/// 测试处理器异常被正确传播
///
[Test]
public async Task Handler_Exception_Should_Be_Propagated()
{
var faultyRequest = new TestFaultyRequest();
Assert.ThrowsAsync(async () =>
await _context!.SendRequestAsync(faultyRequest));
}
///
/// 测试多个命令处理器可以修改同一对象
///
[Test]
public async Task Multiple_Command_Handlers_Can_Modify_Same_Object()
{
var sharedData = new SharedData();
var command1 = new TestModifyDataCommand { Data = sharedData, Value = 10 };
var command2 = new TestModifyDataCommand { Data = sharedData, Value = 20 };
await _context!.SendAsync(command1);
await _context.SendAsync(command2);
// 验证数据被正确修改
Assert.That(sharedData.Value, Is.EqualTo(30)); // 10 + 20
}
///
/// 测试通知顺序被保留
///
[Test]
public async Task Notification_Ordering_Should_Be_Preserved()
{
var receivedOrder = new List();
TestOrderedNotificationHandler.ReceivedMessages = receivedOrder;
var notifications = new[]
{
new TestOrderedNotification { Order = 1, Message = "First" },
new TestOrderedNotification { Order = 2, Message = "Second" },
new TestOrderedNotification { Order = 3, Message = "Third" }
};
foreach (var notification in notifications)
{
await _context!.PublishAsync(notification);
}
await Task.Delay(200); // 等待所有处理完成
// 验证接收顺序与发送顺序一致
Assert.That(receivedOrder.Count, Is.EqualTo(3));
Assert.That(receivedOrder[0], Is.EqualTo("First"));
Assert.That(receivedOrder[1], Is.EqualTo("Second"));
Assert.That(receivedOrder[2], Is.EqualTo("Third"));
}
///
/// 测试流请求带过滤功能
///
[Test]
public async Task Stream_Request_With_Filtering()
{
var filterRequest = new TestFilterStreamRequest
{
Values = Enumerable.Range(1, 10).ToArray(),
FilterEven = true
};
var stream = _context!.CreateStream(filterRequest);
var results = new List();
await foreach (var item in stream)
{
results.Add(item);
}
// 验证只返回偶数
Assert.That(results.All(x => x % 2 == 0), Is.True);
Assert.That(results, Is.EqualTo(new[] { 2, 4, 6, 8, 10 }));
}
///
/// 测试请求验证使用Behaviors
///
[Test]
public async Task Request_Validation_With_Behaviors()
{
var invalidCommand = new TestValidatedCommand { Name = "" }; // 无效:空字符串
Assert.ThrowsAsync(async () =>
await _context!.SendAsync(invalidCommand));
}
///
/// 测试 CQRS 性能基准
///
[Test]
public async Task Performance_Benchmark_For_Cqrs()
{
const int iterations = 1000;
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
{
var request = new TestRequest { Value = i };
var result = await _context!.SendRequestAsync(request);
Assert.That(result, Is.EqualTo(i));
}
stopwatch.Stop();
var avgTime = stopwatch.ElapsedMilliseconds / (double)iterations;
// 验证性能在合理范围内(平均每个请求不超过10ms)
Assert.That(avgTime, Is.LessThan(10.0));
Console.WriteLine($"Average time per request: {avgTime:F2}ms");
}
///
/// 测试自有 CQRS 和传统 CQRS 可以共存
///
[Test]
public async Task Cqrs_And_Legacy_CQRS_Can_Coexist()
{
// 使用传统方式
var legacyCommand = new TestLegacyCommand();
_context!.SendCommand(legacyCommand);
Assert.That(legacyCommand.Executed, Is.True);
// 使用自有 CQRS 方式
var mediatorCommand = new TestCommandWithResult { ResultValue = 999 };
var result = await _context.SendAsync(mediatorCommand);
Assert.That(result, Is.EqualTo(999));
// 验证两者可以同时工作
Assert.That(legacyCommand.Executed, Is.True);
Assert.That(result, Is.EqualTo(999));
}
}
#region Advanced Test Classes for CQRS Features
public sealed record TestLongRunningRequest : IRequest
{
public int DelayMs { get; init; }
}
public sealed class TestLongRunningRequestHandler : IRequestHandler
{
public async ValueTask Handle(TestLongRunningRequest request, CancellationToken cancellationToken)
{
await Task.Delay(request.DelayMs, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
return "Completed";
}
}
public sealed record TestLongStreamRequest : IStreamRequest
{
public int ItemCount { get; init; }
}
public sealed class TestLongStreamRequestHandler : IStreamRequestHandler
{
public async IAsyncEnumerable Handle(
TestLongStreamRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
for (int i = 0; i < request.ItemCount; i++)
{
cancellationToken.ThrowIfCancellationRequested();
yield return i;
await Task.Delay(10, cancellationToken); // 模拟处理延迟
}
}
}
public sealed record TestFaultyRequest : IRequest;
public sealed class TestFaultyRequestHandler : IRequestHandler
{
public ValueTask Handle(TestFaultyRequest request, CancellationToken cancellationToken)
{
throw new InvalidOperationException("Handler failed intentionally");
}
}
public class SharedData
{
public int Value { get; set; }
}
public sealed record TestModifyDataCommand : IRequest
{
public SharedData Data { get; init; } = null!;
public int Value { get; init; }
}
public sealed class TestModifyDataCommandHandler : IRequestHandler
{
public ValueTask Handle(TestModifyDataCommand request, CancellationToken cancellationToken)
{
request.Data.Value += request.Value;
return ValueTask.FromResult(Unit.Value);
}
}
public sealed record TestCachingQuery : IRequest
{
public string Key { get; init; } = string.Empty;
public Dictionary Cache { get; init; } = new();
}
public sealed class TestCachingQueryHandler : IRequestHandler
{
public ValueTask Handle(TestCachingQuery request, CancellationToken cancellationToken)
{
if (request.Cache.TryGetValue(request.Key, out var cachedValue))
{
return new ValueTask(cachedValue);
}
var newValue = $"Value_for_{request.Key}";
request.Cache[request.Key] = newValue;
return new ValueTask(newValue);
}
}
public sealed record TestOrderedNotification : INotification
{
public int Order { get; init; }
public string Message { get; init; } = string.Empty;
}
public sealed class TestOrderedNotificationHandler : INotificationHandler
{
public static List ReceivedMessages { get; set; } = new();
public ValueTask Handle(TestOrderedNotification notification, CancellationToken cancellationToken)
{
ReceivedMessages.Add(notification.Message);
return ValueTask.CompletedTask;
}
}
// 额外的通知处理器来测试多处理器场景
public sealed class TestNotificationHandler2 : INotificationHandler
{
public static string? LastReceivedMessage { get; set; }
public ValueTask Handle(TestNotification notification, CancellationToken cancellationToken)
{
LastReceivedMessage = notification.Message;
return ValueTask.CompletedTask;
}
}
public sealed class TestNotificationHandler3 : INotificationHandler
{
public static string? LastReceivedMessage { get; set; }
public ValueTask Handle(TestNotification notification, CancellationToken cancellationToken)
{
LastReceivedMessage = notification.Message;
return ValueTask.CompletedTask;
}
}
public sealed record TestFilterStreamRequest : IStreamRequest
{
public int[] Values { get; init; } = [];
public bool FilterEven { get; init; }
}
public sealed class TestFilterStreamRequestHandler : IStreamRequestHandler
{
public async IAsyncEnumerable Handle(
TestFilterStreamRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (var value in request.Values)
{
cancellationToken.ThrowIfCancellationRequested();
if (request.FilterEven && value % 2 != 0)
continue;
yield return value;
await Task.Yield();
}
}
}
public sealed record TestValidatedCommand : IRequest
{
public string Name { get; init; } = string.Empty;
}
public sealed class TestValidatedCommandHandler : IRequestHandler
{
public ValueTask Handle(TestValidatedCommand request, CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(request.Name))
{
throw new ArgumentException($"Name cannot be empty {nameof(request.Name)}");
}
return ValueTask.FromResult(Unit.Value);
}
}
// 传统命令用于共存测试
public class TestLegacyCommand : ICommand
{
public bool Executed { get; private set; }
public void Execute()
{
Executed = true;
}
public void SetContext(IArchitectureContext context)
{
// 不需要实现
}
public IArchitectureContext GetContext()
{
return null!;
}
}
#endregion
#region Test Classes - CQRS Runtime
// ✅ 这些类使用自有 CQRS IRequest
public sealed record TestRequest : IRequest
{
public int Value { get; init; }
}
public sealed record TestCommand : IRequest
{
public bool ShouldExecute { get; init; }
public bool Executed { get; set; }
}
public sealed record TestCommandWithResult : IRequest
{
public int ResultValue { get; init; }
}
public sealed record TestQuery : IRequest
{
public string QueryResult { get; init; } = string.Empty;
}
public sealed record TestNotification : INotification
{
public string Message { get; init; } = string.Empty;
}
public sealed record TestStreamRequest : IStreamRequest
{
public int[] Values { get; init; } = [];
}
// ✅ 这些 Handler 使用自有 CQRS IRequestHandler
public sealed class TestRequestHandler : IRequestHandler
{
public ValueTask Handle(TestRequest request, CancellationToken cancellationToken)
{
return new ValueTask(request.Value);
}
}
public sealed class TestCommandHandler : IRequestHandler
{
public ValueTask Handle(TestCommand request, CancellationToken cancellationToken)
{
if (request.ShouldExecute)
{
request.Executed = true;
}
return ValueTask.FromResult(Unit.Value);
}
}
public sealed class TestCommandWithResultHandler : IRequestHandler
{
public ValueTask Handle(TestCommandWithResult request, CancellationToken cancellationToken)
{
return new ValueTask(request.ResultValue);
}
}
public sealed class TestQueryHandler : IRequestHandler
{
public ValueTask Handle(TestQuery request, CancellationToken cancellationToken)
{
return new ValueTask(request.QueryResult);
}
}
public sealed class TestNotificationHandler : INotificationHandler
{
public static string? LastReceivedMessage { get; set; }
public ValueTask Handle(TestNotification notification, CancellationToken cancellationToken)
{
LastReceivedMessage = notification.Message;
return ValueTask.CompletedTask;
}
}
public sealed class TestStreamRequestHandler : IStreamRequestHandler
{
public async IAsyncEnumerable Handle(
TestStreamRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (var value in request.Values)
{
cancellationToken.ThrowIfCancellationRequested();
yield return value;
await Task.Yield();
}
}
}
#endregion