GeWuYou f984f4a600 refactor(core): 优化核心组件的线程安全性和错误处理
- 重构 AsyncLogAppender 的 Flush 方法,添加超时控制和 SpinWait 优化
- 为 BindableProperty 添加线程安全锁保护,确保并发访问的安全性
- 在 BindableProperty 中实现回调外部调用以避免死锁问题
- 为 EasyEvents 使用 ConcurrentDictionary 替代 Dictionary 提高并发性能
- 添加协程调度器异常处理回调机制,防止异常传播导致调度器崩溃
- 为 FileAppender 添加初始化异常处理和资源清理逻辑
- 补充完整的单元测试覆盖并发场景下的线程安全性验证
2026-03-04 11:04:59 +08:00

170 lines
4.8 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Threading.Channels;
using GFramework.Core.Abstractions.logging;
namespace GFramework.Core.logging.appenders;
/// <summary>
/// 异步日志输出器,使用 Channel 实现非阻塞日志写入
/// </summary>
public sealed class AsyncLogAppender : ILogAppender, IDisposable
{
private readonly Channel<LogEntry> _channel;
private readonly CancellationTokenSource _cts;
private readonly ILogAppender _innerAppender;
private readonly Task _processingTask;
private bool _disposed;
/// <summary>
/// 创建异步日志输出器
/// </summary>
/// <param name="innerAppender">内部日志输出器</param>
/// <param name="bufferSize">缓冲区大小(默认 10000</param>
public AsyncLogAppender(ILogAppender innerAppender, int bufferSize = 10000)
{
_innerAppender = innerAppender ?? throw new ArgumentNullException(nameof(innerAppender));
if (bufferSize <= 0)
throw new ArgumentException("Buffer size must be greater than 0.", nameof(bufferSize));
// 创建有界 Channel
var options = new BoundedChannelOptions(bufferSize)
{
FullMode = BoundedChannelFullMode.Wait, // 缓冲区满时等待
SingleReader = true,
SingleWriter = false
};
_channel = Channel.CreateBounded<LogEntry>(options);
_cts = new CancellationTokenSource();
// 启动后台处理任务
_processingTask = Task.Run(() => ProcessLogsAsync(_cts.Token));
}
/// <summary>
/// 获取当前缓冲区中的日志数量
/// </summary>
public int PendingCount => _channel.Reader.Count;
/// <summary>
/// 获取是否已完成处理
/// </summary>
public bool IsCompleted => _channel.Reader.Completion.IsCompleted;
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
if (_disposed) return;
// 标记 Channel 为完成状态
_channel.Writer.Complete();
// 等待处理任务完成(最多等待 5 秒)
if (!_processingTask.Wait(TimeSpan.FromSeconds(5)))
{
_cts.Cancel();
}
// 释放内部 Appender
if (_innerAppender is IDisposable disposable)
{
disposable.Dispose();
}
_cts.Dispose();
_disposed = true;
}
/// <summary>
/// 追加日志条目(非阻塞)
/// </summary>
/// <param name="entry">日志条目</param>
public void Append(LogEntry entry)
{
if (_disposed)
throw new ObjectDisposedException(nameof(AsyncLogAppender));
// 尝试非阻塞写入,如果失败则丢弃(避免阻塞调用线程)
_channel.Writer.TryWrite(entry);
}
/// <summary>
/// 刷新缓冲区ILogAppender 接口实现)
/// </summary>
void ILogAppender.Flush()
{
Flush();
}
/// <summary>
/// 刷新缓冲区,等待所有日志写入完成
/// </summary>
/// <param name="timeout">超时时间默认30秒</param>
/// <returns>是否成功刷新所有日志</returns>
public bool Flush(TimeSpan? timeout = null)
{
if (_disposed) return false;
var actualTimeout = timeout ?? TimeSpan.FromSeconds(30);
var startTime = DateTime.UtcNow;
// 使用 SpinWait 替代忙轮询,更高效
var spinWait = new SpinWait();
while (_channel.Reader.Count > 0)
{
if (DateTime.UtcNow - startTime > actualTimeout)
{
return false; // 超时
}
spinWait.SpinOnce();
}
_innerAppender.Flush();
return true;
}
/// <summary>
/// 后台处理日志的异步方法
/// </summary>
private async Task ProcessLogsAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var entry in _channel.Reader.ReadAllAsync(cancellationToken))
{
try
{
_innerAppender.Append(entry);
}
catch (Exception ex)
{
// 记录内部错误到控制台(避免递归)
await Console.Error.WriteLineAsync($"[AsyncLogAppender] Error processing log entry: {ex.Message}");
}
}
}
catch (OperationCanceledException)
{
// 正常取消,忽略
}
catch (Exception ex)
{
await Console.Error.WriteLineAsync($"[AsyncLogAppender] Fatal error in processing task: {ex}");
}
finally
{
// 确保最后刷新
try
{
_innerAppender.Flush();
}
catch
{
// 忽略刷新错误
}
}
}
}