using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures;
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Cqrs.Tests.Cqrs;

/// <summary>
///     Verifies that the CQRS dispatcher caches the service types it constructs on its hot path.
/// </summary>
[TestFixture]
internal sealed class CqrsDispatcherCacheTests
{
    private MicrosoftDiContainer? _container;
    private ArchitectureContext? _context;

    /// <summary>
    ///     Initializes the test context: installs a console logger provider, registers the CQRS
    ///     handlers found in the test assembly and the <see cref="ArchitectureContext" /> assembly,
    ///     freezes the container and wraps it in a fresh <see cref="ArchitectureContext" />.
    /// </summary>
    [SetUp]
    public void SetUp()
    {
        LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();

        _container = new MicrosoftDiContainer();

        CqrsTestRuntime.RegisterHandlers(
            _container,
            typeof(CqrsDispatcherCacheTests).Assembly,
            typeof(ArchitectureContext).Assembly);

        _container.Freeze();
        _context = new ArchitectureContext(_container);
    }

    /// <summary>
    ///     Clears the test context references.
    /// </summary>
    [TearDown]
    public void TearDown()
    {
        _context = null;
        _container = null;
    }

    /// <summary>
    ///     Verifies that dispatching the same message types repeatedly does not grow the
    ///     service-type caches a second time.
    /// </summary>
    [Test]
    public async Task Dispatcher_Should_Cache_Service_Types_After_First_Dispatch()
    {
        // Arrange: grab the dispatcher's static caches and record their sizes before any dispatch.
        var notificationServiceTypes = GetCacheField("NotificationHandlerServiceTypes");
        var requestServiceTypes = GetCacheField("RequestServiceTypes");
        var streamServiceTypes = GetCacheField("StreamHandlerServiceTypes");

        var notificationBefore = notificationServiceTypes.Count;
        var requestBefore = requestServiceTypes.Count;
        var streamBefore = streamServiceTypes.Count;

        // Act 1: the first dispatch of each message kind is expected to add one cache entry.
        await _context!.SendRequestAsync(new DispatcherCacheRequest());
        await _context.PublishAsync(new DispatcherCacheNotification());
        await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest()));

        var notificationAfterFirstDispatch = notificationServiceTypes.Count;
        var requestAfterFirstDispatch = requestServiceTypes.Count;
        var streamAfterFirstDispatch = streamServiceTypes.Count;

        // Act 2: dispatching the same message types again must hit the caches.
        await _context.SendRequestAsync(new DispatcherCacheRequest());
        await _context.PublishAsync(new DispatcherCacheNotification());
        await DrainAsync(_context.CreateStream(new DispatcherCacheStreamRequest()));

        // Assert: exactly one new entry per cache after the first round, none after the second.
        Assert.Multiple(() =>
        {
            Assert.That(notificationAfterFirstDispatch, Is.EqualTo(notificationBefore + 1));
            Assert.That(requestAfterFirstDispatch, Is.EqualTo(requestBefore + 1));
            Assert.That(streamAfterFirstDispatch, Is.EqualTo(streamBefore + 1));

            Assert.That(notificationServiceTypes.Count, Is.EqualTo(notificationAfterFirstDispatch));
            Assert.That(requestServiceTypes.Count, Is.EqualTo(requestAfterFirstDispatch));
            Assert.That(streamServiceTypes.Count, Is.EqualTo(streamAfterFirstDispatch));
        });
    }

    /// <summary>
    ///     Reads one of the dispatcher's static cache dictionaries via reflection.
    /// </summary>
    /// <param name="fieldName">Name of the non-public static field on the dispatcher type.</param>
    /// <returns>The field value viewed as a non-generic <see cref="IDictionary" />.</returns>
    /// <exception cref="InvalidOperationException">
    ///     The field value does not implement <see cref="IDictionary" />.
    /// </exception>
    private static IDictionary GetCacheField(string fieldName)
    {
        // The dispatcher type is looked up by name in the assembly that declares
        // CqrsReflectionFallbackAttribute; throwOnError surfaces a rename immediately.
        var dispatcherType = typeof(CqrsReflectionFallbackAttribute).Assembly
            .GetType("GFramework.Cqrs.Internal.CqrsDispatcher", throwOnError: true)!;

        var field = dispatcherType.GetField(
            fieldName,
            BindingFlags.NonPublic | BindingFlags.Static);

        Assert.That(field, Is.Not.Null, $"Missing dispatcher cache field {fieldName}.");

        return field!.GetValue(null) as IDictionary
               ?? throw new InvalidOperationException(
                   $"Dispatcher cache field {fieldName} does not implement IDictionary.");
    }

    /// <summary>
    ///     Consumes the whole async stream so that the stream-creation path is actually executed.
    /// </summary>
    /// <typeparam name="T">Element type of the stream; the elements themselves are discarded.</typeparam>
    /// <param name="stream">The stream to enumerate to completion.</param>
    private static async Task DrainAsync<T>(IAsyncEnumerable<T> stream)
    {
        await foreach (var _ in stream)
        {
        }
    }
}

/// <summary>
///     Test request used to verify the request service-type cache.
/// </summary>
internal sealed record DispatcherCacheRequest : IRequest<int>;

/// <summary>
///     Test notification used to verify the notification service-type cache.
/// </summary>
internal sealed record DispatcherCacheNotification : INotification;

/// <summary>
///     Test stream request used to verify the stream service-type cache.
/// </summary>
internal sealed record DispatcherCacheStreamRequest : IStreamRequest<int>;

/// <summary>
///     Handles <see cref="DispatcherCacheRequest" />.
/// </summary>
internal sealed class DispatcherCacheRequestHandler : IRequestHandler<DispatcherCacheRequest, int>
{
    /// <summary>
    ///     Returns a fixed result so the cache test can exercise the dispatcher's request path.
    /// </summary>
    public ValueTask<int> Handle(DispatcherCacheRequest request, CancellationToken cancellationToken)
    {
        return ValueTask.FromResult(1);
    }
}

/// <summary>
///     Handles <see cref="DispatcherCacheNotification" />.
/// </summary>
internal sealed class DispatcherCacheNotificationHandler : INotificationHandler<DispatcherCacheNotification>
{
    /// <summary>
    ///     Consumes the notification without performing any additional side effects.
    /// </summary>
    public ValueTask Handle(DispatcherCacheNotification notification, CancellationToken cancellationToken)
    {
        return ValueTask.CompletedTask;
    }
}

/// <summary>
///     Handles <see cref="DispatcherCacheStreamRequest" />.
/// </summary>
internal sealed class DispatcherCacheStreamHandler : IStreamRequestHandler<DispatcherCacheStreamRequest, int>
{
    /// <summary>
    ///     Returns a minimal single-element stream so the cache test hits the stream dispatch path.
    /// </summary>
    public async IAsyncEnumerable<int> Handle(
        DispatcherCacheStreamRequest request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        yield return 1;

        // Keeps the iterator genuinely asynchronous; it completes immediately.
        await Task.CompletedTask;
    }
}