GeWuYou d94d8deb29 fix(logging): 修复异步日志追加器中的操作取消异常处理
- 添加对 OperationCanceledException 的特殊处理,避免将其报告为后台处理错误
- 在 ReportProcessingError 方法中检查并过滤掉操作取消异常
- 添加单元测试验证当内部追加器抛出 OperationCanceledException 时不报告错误
- 创建 CancellationAppender 测试辅助类来模拟取消异常场景
- 确保取消相关的异常不会触发错误处理逻辑
2026-03-21 21:59:51 +08:00

238 lines
7.5 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using GFramework.Core.Abstractions.Logging;
namespace GFramework.Core.Logging.Appenders;
/// <summary>
/// 异步日志输出器,使用 <see cref="Channel{T}" /> 将调用线程与慢速日志目标解耦。
/// </summary>
/// <remarks>
/// <para>
/// 该输出器在后台线程中顺序消费日志条目,因此调用方不会因为文件 IO 或其他慢速输出目标而阻塞。
/// </para>
/// <para>
/// 内部输出器抛出的异常不会重新抛回调用线程;如需观察后台处理失败,请在构造函数中提供
/// <c>processingErrorHandler</c> 回调。
/// </para>
/// </remarks>
public sealed class AsyncLogAppender : ILogAppender
{
private readonly Channel<LogEntry> _channel;
private readonly CancellationTokenSource _cts;
private readonly SemaphoreSlim _flushSemaphore = new(0, 1);
private readonly ILogAppender _innerAppender;
private readonly Action<Exception>? _processingErrorHandler;
private readonly Task _processingTask;
private bool _disposed;
private volatile bool _flushRequested;
/// <summary>
/// 创建异步日志输出器
/// </summary>
/// <param name="innerAppender">内部日志输出器</param>
/// <param name="bufferSize">缓冲区大小(默认 10000</param>
/// <param name="processingErrorHandler">
/// 后台处理日志时的错误回调。
/// 默认值为 <see langword="null" />,表示吞掉内部异常以避免污染宿主标准错误输出。
/// </param>
public AsyncLogAppender(
ILogAppender innerAppender,
int bufferSize = 10000,
Action<Exception>? processingErrorHandler = null)
{
_innerAppender = innerAppender ?? throw new ArgumentNullException(nameof(innerAppender));
_processingErrorHandler = processingErrorHandler;
if (bufferSize <= 0)
throw new ArgumentException("Buffer size must be greater than 0.", nameof(bufferSize));
// 创建有界 Channel
var options = new BoundedChannelOptions(bufferSize)
{
FullMode = BoundedChannelFullMode.Wait, // 缓冲区满时等待
SingleReader = true,
SingleWriter = false
};
_channel = Channel.CreateBounded<LogEntry>(options);
_cts = new CancellationTokenSource();
// 启动后台处理任务
_processingTask = Task.Run(() => ProcessLogsAsync(_cts.Token));
}
/// <summary>
/// 获取当前缓冲区中的日志数量
/// </summary>
public int PendingCount => _channel.Reader.Count;
/// <summary>
/// 获取是否已完成处理
/// </summary>
public bool IsCompleted => _channel.Reader.Completion.IsCompleted;
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (_disposed) return;
// 标记 Channel 为完成状态
_channel.Writer.Complete();
// 等待处理任务完成(最多等待 5 秒)
if (!_processingTask.Wait(TimeSpan.FromSeconds(5)))
{
_cts.Cancel();
}
// 释放内部 Appender
if (_innerAppender is IDisposable disposable)
{
disposable.Dispose();
}
_cts.Dispose();
_flushSemaphore.Dispose();
_disposed = true;
}
/// <summary>
/// 追加日志条目(非阻塞)
/// </summary>
/// <param name="entry">日志条目</param>
public void Append(LogEntry entry)
{
if (_disposed)
throw new ObjectDisposedException(nameof(AsyncLogAppender));
// 尝试非阻塞写入,如果失败则丢弃(避免阻塞调用线程)
_channel.Writer.TryWrite(entry);
}
/// <summary>
/// 刷新缓冲区ILogAppender 接口实现)
/// 注意此方法会阻塞直到所有待处理日志写入完成或超时默认30秒
/// 超时结果可通过 OnFlushCompleted 事件观察
/// </summary>
void ILogAppender.Flush()
{
var success = Flush();
OnFlushCompleted?.Invoke(success);
}
/// <summary>
/// Flush 操作完成事件参数指示是否成功true或超时false
/// </summary>
public event Action<bool>? OnFlushCompleted;
/// <summary>
/// 刷新缓冲区,等待所有日志写入完成
/// 使用信号量机制确保可靠的完成通知,避免竞态条件
/// </summary>
/// <param name="timeout">超时时间默认30秒</param>
/// <returns>是否成功刷新所有日志true=成功false=超时)</returns>
public bool Flush(TimeSpan? timeout = null)
{
if (_disposed) return false;
var actualTimeout = timeout ?? TimeSpan.FromSeconds(30);
// 请求刷新
_flushRequested = true;
try
{
// 等待处理任务发出完成信号
var success = _flushSemaphore.Wait(actualTimeout);
OnFlushCompleted?.Invoke(success);
return success;
}
finally
{
_flushRequested = false;
}
}
/// <summary>
/// 后台处理日志的异步方法。
/// 该循环必须始终保持存活,因此所有内部异常都通过回调上报并被吞掉。
/// </summary>
private async Task ProcessLogsAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var entry in _channel.Reader.ReadAllAsync(cancellationToken))
{
try
{
_innerAppender.Append(entry);
}
catch (Exception ex)
{
// 后台消费失败只通过显式回调暴露,避免测试宿主将 stderr 误判为测试告警。
ReportProcessingError(ex);
}
// 检查是否有刷新请求且通道已空
if (_flushRequested && _channel.Reader.Count == 0)
{
_innerAppender.Flush();
// 发出完成信号
if (_flushSemaphore.CurrentCount == 0)
{
_flushSemaphore.Release();
}
}
}
}
catch (OperationCanceledException)
{
// 正常取消,忽略
}
catch (Exception ex)
{
ReportProcessingError(ex);
}
finally
{
// 确保最后刷新
try
{
_innerAppender.Flush();
}
catch (Exception ex)
{
ReportProcessingError(ex);
}
}
}
/// <summary>
/// 上报后台处理异常,同时隔离观察者自身抛出的错误,避免终止处理循环。
/// 取消相关异常表示关闭流程中的预期控制流,不应被视为后台处理失败。
/// </summary>
/// <param name="exception">后台处理中捕获到的异常。</param>
private void ReportProcessingError(Exception exception)
{
if (exception is OperationCanceledException)
{
return;
}
if (_processingErrorHandler is null)
{
return;
}
try
{
_processingErrorHandler(exception);
}
catch
{
// 错误观察者只用于诊断,绝不能反向影响日志处理线程的生命周期。
}
}
}