feat(coroutine): 添加协程调度器和相关功能实现

- 实现了 CoroutineScheduler 类,支持协程的运行、暂停、恢复和终止管理
- 添加了协程槽位管理机制 CoroutineSlot,用于跟踪单个协程状态
- 实现了协程的优先级、标签和分组功能,支持批量操作
- 集成了等待指令系统,包括 WaitForSecondsRealtime、WaitForFixedUpdate 等
- 添加了协程统计功能和异常处理机制
- 实现了 Godot 平台的时间源适配器 GodotTimeSource
- 创建了协程调度器的高级功能测试用例
- 添加了 Timing 节点用于在 Godot 引擎中管理协程生命周期
This commit is contained in:
GeWuYou 2026-04-06 00:33:37 +08:00
parent 6cac882fb4
commit 2053451185
6 changed files with 152 additions and 14 deletions

View File

@ -171,4 +171,63 @@ public sealed class CoroutineSchedulerAdvancedTests
Assert.That(status, Is.EqualTo(CoroutineCompletionStatus.Faulted));
}
/// <summary>
/// 验证完成状态缓存有固定上限,避免无限增长。
/// </summary>
[Test]
public void CompletionStatusHistory_Should_Be_Bounded()
{
var timeSource = new FakeTimeSource();
var scheduler = new CoroutineScheduler(timeSource);
var handles = new List<CoroutineHandle>();
IEnumerator<IYieldInstruction> ImmediateCoroutine()
{
yield break;
}
for (var i = 0; i < 1100; i++)
{
handles.Add(scheduler.Run(ImmediateCoroutine()));
}
Assert.That(scheduler.TryGetCompletionStatus(handles[0], out _), Is.False);
Assert.That(scheduler.TryGetCompletionStatus(handles[^1], out var latestStatus), Is.True);
Assert.That(latestStatus, Is.EqualTo(CoroutineCompletionStatus.Completed));
}
/// <summary>
/// 验证作为首个等待指令的 WaitForCoroutine 会立即启动子协程,并沿用父协程取消令牌。
/// </summary>
[Test]
public async Task WaitForCoroutine_Should_Start_Child_During_Prewarm_And_Propagate_Cancellation()
{
var timeSource = new FakeTimeSource();
var scheduler = new CoroutineScheduler(timeSource);
using var cancellationTokenSource = new CancellationTokenSource();
IEnumerator<IYieldInstruction> ChildCoroutine()
{
yield return new Delay(10);
}
IEnumerator<IYieldInstruction> ParentCoroutine()
{
yield return new WaitForCoroutine(ChildCoroutine());
}
var handle = scheduler.Run(ParentCoroutine(), cancellationToken: cancellationTokenSource.Token);
Assert.That(scheduler.ActiveCoroutineCount, Is.EqualTo(2));
cancellationTokenSource.Cancel();
timeSource.Advance(0.1);
scheduler.Update();
var status = await scheduler.WaitForCompletionAsync(handle);
Assert.That(status, Is.EqualTo(CoroutineCompletionStatus.Cancelled));
Assert.That(scheduler.ActiveCoroutineCount, Is.EqualTo(0));
}
}

View File

@ -34,10 +34,13 @@ public sealed class CoroutineScheduler(
ITimeSource? realtimeTimeSource = null,
CoroutineExecutionStage executionStage = CoroutineExecutionStage.Update)
{
private const int CompletionStatusHistoryLimit = 1024;
private readonly Dictionary<CoroutineHandle, TaskCompletionSource<CoroutineCompletionStatus>> _completionSources =
new();
private readonly Dictionary<CoroutineHandle, CoroutineCompletionStatus> _completionStatuses = new();
private readonly Queue<CoroutineHandle> _completionStatusOrder = new();
private readonly Dictionary<string, HashSet<CoroutineHandle>> _grouped = new();
private readonly ILogger _logger = LoggerFactoryResolver.Provider.CreateLogger(nameof(CoroutineScheduler));
private readonly Dictionary<CoroutineHandle, CoroutineMetadata> _metadata = new();
@ -218,6 +221,7 @@ public sealed class CoroutineScheduler(
var slot = new CoroutineSlot
{
CancellationToken = cancellationToken,
Enumerator = coroutine,
State = CoroutineState.Running,
Handle = handle,
@ -386,7 +390,14 @@ public sealed class CoroutineScheduler(
{
case WaitForCoroutine waitForCoroutine:
{
var targetHandle = Run(waitForCoroutine.Coroutine);
var targetHandle = Run(waitForCoroutine.Coroutine, cancellationToken: slot.CancellationToken);
if (!targetHandle.IsValid)
{
waitForCoroutine.Complete();
slot.Waiting = null;
break;
}
slot.Waiting = waitForCoroutine;
WaitForCoroutine(slot.Handle, targetHandle);
break;
@ -596,6 +607,8 @@ public sealed class CoroutineScheduler(
_tagged.Clear();
_grouped.Clear();
_waiting.Clear();
_completionStatuses.Clear();
_completionStatusOrder.Clear();
_nextSlot = 0;
ActiveCoroutineCount = 0;
@ -629,7 +642,7 @@ public sealed class CoroutineScheduler(
}
else
{
slot.Waiting = slot.Enumerator.Current;
HandleYieldInstruction(slot, slot.Enumerator.Current);
}
}
catch (Exception ex)
@ -712,7 +725,7 @@ public sealed class CoroutineScheduler(
source.TrySetResult(completionStatus);
}
_completionStatuses[handle] = completionStatus;
RecordCompletionStatus(handle, completionStatus);
OnCoroutineFinished?.Invoke(handle, completionStatus, exception);
}
@ -892,6 +905,23 @@ public sealed class CoroutineScheduler(
_statistics.PausedCount = _pausedCount;
}
/// <summary>
/// 记录协程最终状态,并对历史缓存施加固定上限,避免完成状态字典无限增长。
/// </summary>
/// <param name="handle">已结束的协程句柄。</param>
/// <param name="completionStatus">协程最终状态。</param>
private void RecordCompletionStatus(CoroutineHandle handle, CoroutineCompletionStatus completionStatus)
{
_completionStatuses[handle] = completionStatus;
_completionStatusOrder.Enqueue(handle);
while (_completionStatusOrder.Count > CompletionStatusHistoryLimit)
{
var expiredHandle = _completionStatusOrder.Dequeue();
_completionStatuses.Remove(expiredHandle);
}
}
/// <summary>
/// 为协程添加标签。
/// </summary>

View File

@ -13,6 +13,12 @@ internal sealed class CoroutineSlot
/// </summary>
public CancellationTokenRegistration CancellationRegistration;
/// <summary>
/// 创建该协程时传入的取消令牌。
/// 当协程启动子协程时,会把同一个取消令牌继续传递下去,以保持父子协程的取消语义一致。
/// </summary>
public CancellationToken CancellationToken;
/// <summary>
/// 协程枚举器,包含协程的执行逻辑
/// </summary>

View File

@ -1,6 +1,5 @@
using System.Collections.Generic;
using GFramework.Godot.Coroutine;
using NUnit.Framework;
namespace GFramework.Godot.Tests.Coroutine;
@ -49,4 +48,25 @@ public sealed class GodotTimeSourceTests
Assert.That(timeSource.DeltaTime, Is.EqualTo(0.75).Within(0.0001));
Assert.That(timeSource.CurrentTime, Is.EqualTo(2.0).Within(0.0001));
}
/// <summary>
/// 验证绝对时间源在回拨时仍保持单调,不会把 CurrentTime 拉回去。
/// </summary>
[Test]
public void Update_Should_Keep_Absolute_Time_Monotonic_When_Provider_Goes_Backwards()
{
var values = new Queue<double>([5.0, 4.0, 6.5]);
var timeSource = new GodotTimeSource(() => values.Dequeue(), useAbsoluteTime: true);
timeSource.Update();
timeSource.Update();
Assert.That(timeSource.DeltaTime, Is.EqualTo(0).Within(0.0001));
Assert.That(timeSource.CurrentTime, Is.EqualTo(5.0).Within(0.0001));
timeSource.Update();
Assert.That(timeSource.DeltaTime, Is.EqualTo(1.5).Within(0.0001));
Assert.That(timeSource.CurrentTime, Is.EqualTo(6.5).Within(0.0001));
}
}

View File

@ -1,5 +1,4 @@
using GFramework.Core.Abstractions.Coroutine;
using Godot;
namespace GFramework.Godot.Coroutine;
@ -47,9 +46,11 @@ public sealed class GodotTimeSource(Func<double> timeProvider, bool useAbsoluteT
return;
}
DeltaTime = Math.Max(0, value - _lastAbsoluteTime);
_lastAbsoluteTime = value;
CurrentTime = value;
// 对绝对时间源做单调钳制,避免 provider 回拨后把 CurrentTime 也拉回去。
var nextTime = Math.Max(value, _lastAbsoluteTime);
DeltaTime = nextTime - _lastAbsoluteTime;
_lastAbsoluteTime = nextTime;
CurrentTime = nextTime;
return;
}

View File

@ -1,9 +1,5 @@
using System.Reflection;
using GFramework.Core.Abstractions.Coroutine;
using GFramework.Core.Coroutine;
using GFramework.Core.Coroutine.Instructions;
using GFramework.Godot.Extensions;
using Godot;
namespace GFramework.Godot.Coroutine;
@ -470,6 +466,11 @@ public partial class Timing : Node
return handle;
}
if (!GetScheduler(segment).IsCoroutineAlive(handle))
{
return handle;
}
RegisterOwnedCoroutine(owner, handle);
return handle;
}
@ -515,7 +516,13 @@ public partial class Timing : Node
/// <returns>被终止的协程数量。</returns>
public static int KillCoroutines(Node owner)
{
return Instance.KillOwnedCoroutinesOnInstance(owner);
var count = 0;
foreach (var timing in EnumerateActiveInstances())
{
count += timing.KillOwnedCoroutinesOnInstance(owner);
}
return count;
}
/// <summary>
@ -562,7 +569,13 @@ public partial class Timing : Node
/// <returns>该节点当前归属的活跃协程数量。</returns>
public static int GetOwnedCoroutineCount(Node owner)
{
return Instance.GetOwnedCoroutineCountOnInstance(owner);
var count = 0;
foreach (var timing in EnumerateActiveInstances())
{
count += timing.GetOwnedCoroutineCountOnInstance(owner);
}
return count;
}
/// <summary>
@ -683,6 +696,15 @@ public partial class Timing : Node
return id < ActiveInstances.Length ? ActiveInstances[id] : null;
}
/// <summary>
/// 枚举所有当前已注册的 Timing 实例。
/// </summary>
/// <returns>活跃 Timing 实例序列。</returns>
private static IEnumerable<Timing> EnumerateActiveInstances()
{
return ActiveInstances.Where(static timing => timing is not null).Select(static timing => timing!);
}
/// <summary>
/// 检查节点是否处于有效状态。
/// </summary>