using System.Threading.Channels; using GFramework.Core.Abstractions.Logging; namespace GFramework.Core.Logging.Appenders; /// /// 异步日志输出器,使用 Channel 实现非阻塞日志写入 /// public sealed class AsyncLogAppender : ILogAppender, IDisposable { private readonly Channel _channel; private readonly CancellationTokenSource _cts; private readonly SemaphoreSlim _flushSemaphore = new(0, 1); private readonly ILogAppender _innerAppender; private readonly Task _processingTask; private bool _disposed; private volatile bool _flushRequested; /// /// 创建异步日志输出器 /// /// 内部日志输出器 /// 缓冲区大小(默认 10000) public AsyncLogAppender(ILogAppender innerAppender, int bufferSize = 10000) { _innerAppender = innerAppender ?? throw new ArgumentNullException(nameof(innerAppender)); if (bufferSize <= 0) throw new ArgumentException("Buffer size must be greater than 0.", nameof(bufferSize)); // 创建有界 Channel var options = new BoundedChannelOptions(bufferSize) { FullMode = BoundedChannelFullMode.Wait, // 缓冲区满时等待 SingleReader = true, SingleWriter = false }; _channel = Channel.CreateBounded(options); _cts = new CancellationTokenSource(); // 启动后台处理任务 _processingTask = Task.Run(() => ProcessLogsAsync(_cts.Token)); } /// /// 获取当前缓冲区中的日志数量 /// public int PendingCount => _channel.Reader.Count; /// /// 获取是否已完成处理 /// public bool IsCompleted => _channel.Reader.Completion.IsCompleted; /// /// 释放资源 /// public void Dispose() { if (_disposed) return; // 标记 Channel 为完成状态 _channel.Writer.Complete(); // 等待处理任务完成(最多等待 5 秒) if (!_processingTask.Wait(TimeSpan.FromSeconds(5))) { _cts.Cancel(); } // 释放内部 Appender if (_innerAppender is IDisposable disposable) { disposable.Dispose(); } _cts.Dispose(); _flushSemaphore.Dispose(); _disposed = true; } /// /// 追加日志条目(非阻塞) /// /// 日志条目 public void Append(LogEntry entry) { if (_disposed) throw new ObjectDisposedException(nameof(AsyncLogAppender)); // 尝试非阻塞写入,如果失败则丢弃(避免阻塞调用线程) _channel.Writer.TryWrite(entry); } /// /// 刷新缓冲区(ILogAppender 接口实现) /// 注意:此方法会阻塞直到所有待处理日志写入完成,或超时(默认30秒) /// 超时结果可通过 OnFlushCompleted 事件观察 /// void ILogAppender.Flush() { var success = Flush(); OnFlushCompleted?.Invoke(success); } /// /// Flush 操作完成事件,参数指示是否成功(true)或超时(false) /// public event Action? OnFlushCompleted; /// /// 刷新缓冲区,等待所有日志写入完成 /// 使用信号量机制确保可靠的完成通知,避免竞态条件 /// /// 超时时间(默认30秒) /// 是否成功刷新所有日志(true=成功,false=超时) public bool Flush(TimeSpan? timeout = null) { if (_disposed) return false; var actualTimeout = timeout ?? TimeSpan.FromSeconds(30); // 请求刷新 _flushRequested = true; try { // 等待处理任务发出完成信号 var success = _flushSemaphore.Wait(actualTimeout); OnFlushCompleted?.Invoke(success); return success; } finally { _flushRequested = false; } } /// /// 后台处理日志的异步方法 /// private async Task ProcessLogsAsync(CancellationToken cancellationToken) { try { await foreach (var entry in _channel.Reader.ReadAllAsync(cancellationToken)) { try { _innerAppender.Append(entry); } catch (Exception ex) { // 记录内部错误到控制台(避免递归) await Console.Error.WriteLineAsync($"[AsyncLogAppender] Error processing log entry: {ex.Message}"); } // 检查是否有刷新请求且通道已空 if (_flushRequested && _channel.Reader.Count == 0) { _innerAppender.Flush(); // 发出完成信号 if (_flushSemaphore.CurrentCount == 0) { _flushSemaphore.Release(); } } } } catch (OperationCanceledException) { // 正常取消,忽略 } catch (Exception ex) { await Console.Error.WriteLineAsync($"[AsyncLogAppender] Fatal error in processing task: {ex}"); } finally { // 确保最后刷新 try { _innerAppender.Flush(); } catch { // 忽略刷新错误 } } } }