Merge pull request #127 from GeWuYou/feat/async-log-appender-error-handler

feat(logging): 添加异步日志输出器的错误处理回调功能
This commit is contained in:
gewuyou 2026-03-21 22:09:06 +08:00 committed by GitHub
commit 63b1d71a0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 128 additions and 14 deletions

View File

@ -1,6 +1,5 @@
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Logging.Appenders;
using NUnit.Framework;
namespace GFramework.Core.Tests.Logging;
@ -152,8 +151,12 @@ public class AsyncLogAppenderTests
[Test]
public void Append_WhenInnerAppenderThrows_ShouldNotCrash()
{
var reportedExceptions = new List<Exception>();
var innerAppender = new ThrowingAppender();
using var asyncAppender = new AsyncLogAppender(innerAppender, bufferSize: 1000);
using var asyncAppender = new AsyncLogAppender(
innerAppender,
bufferSize: 1000,
processingErrorHandler: reportedExceptions.Add);
// 即使内部 Appender 抛出异常,也不应该影响调用线程
Assert.DoesNotThrow(() =>
@ -165,7 +168,56 @@ public class AsyncLogAppenderTests
}
});
Thread.Sleep(100); // 等待后台处理
asyncAppender.Flush();
Assert.That(reportedExceptions, Has.Count.EqualTo(10));
Assert.That(reportedExceptions, Has.All.TypeOf<InvalidOperationException>());
Assert.That(reportedExceptions.Select(static exception => exception.Message),
Has.All.EqualTo("Test exception"));
}
[Test]
public void Append_WhenProcessingErrorHandlerThrows_ShouldStillNotCrash()
{
var innerAppender = new ThrowingAppender();
using var asyncAppender = new AsyncLogAppender(
innerAppender,
bufferSize: 1000,
processingErrorHandler: static _ => throw new InvalidOperationException("Observer failure"));
Assert.DoesNotThrow(() =>
{
for (int i = 0; i < 10; i++)
{
var entry = new LogEntry(DateTime.UtcNow, LogLevel.Info, "TestLogger", $"Message {i}", null, null);
asyncAppender.Append(entry);
}
});
Assert.That(asyncAppender.Flush(), Is.True);
}
[Test]
public void Append_WhenInnerAppenderThrowsOperationCanceledException_ShouldNotReportError()
{
var reportedExceptions = new List<Exception>();
var innerAppender = new CancellationAppender();
using var asyncAppender = new AsyncLogAppender(
innerAppender,
bufferSize: 1000,
processingErrorHandler: reportedExceptions.Add);
Assert.DoesNotThrow(() =>
{
for (int i = 0; i < 10; i++)
{
var entry = new LogEntry(DateTime.UtcNow, LogLevel.Info, "TestLogger", $"Message {i}", null, null);
asyncAppender.Append(entry);
}
});
Assert.That(asyncAppender.Flush(), Is.True);
Assert.That(reportedExceptions, Is.Empty);
}
// 辅助测试类
@ -228,4 +280,20 @@ public class AsyncLogAppenderTests
{
}
}
private class CancellationAppender : ILogAppender
{
public void Append(LogEntry entry)
{
throw new OperationCanceledException("Simulated cancellation");
}
public void Flush()
{
}
public void Dispose()
{
}
}
}

View File

@ -15,4 +15,5 @@ global using System;
global using System.Collections.Generic;
global using System.Linq;
global using System.Threading;
global using System.Threading.Tasks;
global using System.Threading.Tasks;
global using System.Threading.Channels;

View File

@ -1,17 +1,26 @@
using System.Threading.Channels;
using GFramework.Core.Abstractions.Logging;
namespace GFramework.Core.Logging.Appenders;
/// <summary>
/// 异步日志输出器,使用 Channel 实现非阻塞日志写入
/// 异步日志输出器,使用 <see cref="Channel" /> 将调用线程与慢速日志目标解耦。
/// </summary>
public sealed class AsyncLogAppender : ILogAppender, IDisposable
/// <remarks>
/// <para>
/// 该输出器在后台线程中顺序消费日志条目,因此调用方不会因为文件 IO 或其他慢速输出目标而阻塞。
/// </para>
/// <para>
/// 内部输出器抛出的异常不会重新抛回调用线程;如需观察后台处理失败,请在构造函数中提供
/// <c>processingErrorHandler</c> 回调。
/// </para>
/// </remarks>
public sealed class AsyncLogAppender : ILogAppender
{
private readonly Channel<LogEntry> _channel;
private readonly CancellationTokenSource _cts;
private readonly SemaphoreSlim _flushSemaphore = new(0, 1);
private readonly ILogAppender _innerAppender;
private readonly Action<Exception>? _processingErrorHandler;
private readonly Task _processingTask;
private bool _disposed;
private volatile bool _flushRequested;
@ -21,9 +30,17 @@ public sealed class AsyncLogAppender : ILogAppender, IDisposable
/// </summary>
/// <param name="innerAppender">内部日志输出器</param>
/// <param name="bufferSize">缓冲区大小(默认 10000</param>
public AsyncLogAppender(ILogAppender innerAppender, int bufferSize = 10000)
/// <param name="processingErrorHandler">
/// 后台处理日志时的错误回调。
/// 默认值为 <see langword="null" />,表示吞掉内部异常以避免污染宿主标准错误输出。
/// </param>
public AsyncLogAppender(
ILogAppender innerAppender,
int bufferSize = 10000,
Action<Exception>? processingErrorHandler = null)
{
_innerAppender = innerAppender ?? throw new ArgumentNullException(nameof(innerAppender));
_processingErrorHandler = processingErrorHandler;
if (bufferSize <= 0)
throw new ArgumentException("Buffer size must be greater than 0.", nameof(bufferSize));
@ -138,7 +155,8 @@ public sealed class AsyncLogAppender : ILogAppender, IDisposable
}
/// <summary>
/// 后台处理日志的异步方法
/// 后台处理日志的异步方法。
/// 该循环必须始终保持存活,因此所有内部异常都通过回调上报并被吞掉。
/// </summary>
private async Task ProcessLogsAsync(CancellationToken cancellationToken)
{
@ -152,8 +170,8 @@ public sealed class AsyncLogAppender : ILogAppender, IDisposable
}
catch (Exception ex)
{
// 记录内部错误到控制台(避免递归)
await Console.Error.WriteLineAsync($"[AsyncLogAppender] Error processing log entry: {ex.Message}");
// 后台消费失败只通过显式回调暴露,避免测试宿主将 stderr 误判为测试告警。
ReportProcessingError(ex);
}
// 检查是否有刷新请求且通道已空
@ -175,7 +193,7 @@ public sealed class AsyncLogAppender : ILogAppender, IDisposable
}
catch (Exception ex)
{
await Console.Error.WriteLineAsync($"[AsyncLogAppender] Fatal error in processing task: {ex}");
ReportProcessingError(ex);
}
finally
{
@ -184,10 +202,37 @@ public sealed class AsyncLogAppender : ILogAppender, IDisposable
{
_innerAppender.Flush();
}
catch
catch (Exception ex)
{
// 忽略刷新错误
ReportProcessingError(ex);
}
}
}
/// <summary>
/// 上报后台处理异常,同时隔离观察者自身抛出的错误,避免终止处理循环。
/// 取消相关异常表示关闭流程中的预期控制流,不应被视为后台处理失败。
/// </summary>
/// <param name="exception">后台处理中捕获到的异常。</param>
private void ReportProcessingError(Exception exception)
{
if (exception is OperationCanceledException)
{
return;
}
if (_processingErrorHandler is null)
{
return;
}
try
{
_processingErrorHandler(exception);
}
catch
{
// 错误观察者只用于诊断,绝不能反向影响日志处理线程的生命周期。
}
}
}