refactor(state): 优化 Store 状态分发的并发控制机制

- 将 Store 类标记为 sealed 以防止继承
- 引入独立的 dispatch 门闩锁,将状态锁的保护范围缩小为仅保护临界区访问
- 实现 dispatch 过程中的快照机制,确保中间件和 reducer 在锁外执行稳定的不可变序列
- 重构 ExecuteDispatchPipeline 方法,接受快照参数并改为静态方法
- 添加 CreateReducerSnapshot 方法为每次分发创建 reducer 快照
- 更新 StoreBuilder 和 StoreSelection 类为 sealed
- 新增测试用例验证长时间运行的 middleware 不会阻塞状态读取和订阅操作
- 修复 dispatch 过程中状态锁占用时间过长的问题,提升并发性能
This commit is contained in:
GeWuYou 2026-03-23 20:11:10 +08:00
parent 79cebb95b5
commit b7c54743fa
4 changed files with 273 additions and 163 deletions

View File

@ -1,5 +1,3 @@
using GFramework.Core.Abstractions.Property;
using GFramework.Core.Abstractions.StateManagement;
using GFramework.Core.Extensions;
using GFramework.Core.Property;
using GFramework.Core.StateManagement;
@ -316,6 +314,40 @@ public class StoreTests
Assert.That(logs, Is.EqualTo(new[] { "builder:before", "builder:after", "builder:before", "builder:after" }));
}
/// <summary>
/// 测试长时间运行的 middleware 不会长时间占用状态锁,
/// 使读取状态和新增订阅仍能在 dispatch 进行期间完成。
/// </summary>
[Test]
public void Dispatch_Should_Not_Block_State_Read_Or_Subscribe_While_Middleware_Is_Running()
{
using var entered = new ManualResetEventSlim(false);
using var release = new ManualResetEventSlim(false);
var store = CreateStore();
store.UseMiddleware(new BlockingMiddleware(entered, release));
var dispatchTask = Task.Run(() => store.Dispatch(new IncrementAction(1)));
Assert.That(entered.Wait(TimeSpan.FromSeconds(2)), Is.True, "middleware 未按预期进入阻塞阶段");
var stateReadTask = Task.Run(() => store.State.Count);
Assert.That(stateReadTask.Wait(TimeSpan.FromMilliseconds(200)), Is.True, "State 读取被 dispatch 长时间阻塞");
Assert.That(stateReadTask.Result, Is.EqualTo(0), "middleware 执行期间应仍能读取到提交前的状态快照");
var subscribeTask = Task.Run(() =>
{
var unRegister = store.Subscribe(_ => { });
unRegister.UnRegister();
});
Assert.That(subscribeTask.Wait(TimeSpan.FromMilliseconds(200)), Is.True, "Subscribe 被 dispatch 长时间阻塞");
release.Set();
Assert.That(dispatchTask.Wait(TimeSpan.FromSeconds(2)), Is.True, "dispatch 未在释放 middleware 后完成");
Assert.That(store.State.Count, Is.EqualTo(1));
}
/// <summary>
/// 创建一个带有基础 reducer 的测试 Store。
/// </summary>
@ -455,6 +487,25 @@ public class StoreTests
}
}
/// <summary>
/// 用于验证 dispatch 管线在 middleware 执行期间不会占用状态锁的测试中间件。
/// </summary>
private sealed class BlockingMiddleware(ManualResetEventSlim entered, ManualResetEventSlim release)
: IStoreMiddleware<CounterState>
{
/// <summary>
/// 通知测试线程 middleware 已进入阻塞点,并等待释放信号后继续执行。
/// </summary>
/// <param name="context">当前分发上下文。</param>
/// <param name="next">后续处理节点。</param>
public void Invoke(StoreDispatchContext<CounterState> context, Action next)
{
entered.Set();
release.Wait(TimeSpan.FromSeconds(2));
next();
}
}
/// <summary>
/// 在中间件阶段尝试二次分发的测试中间件,用于验证重入保护。
/// </summary>

View File

@ -10,8 +10,15 @@ namespace GFramework.Core.StateManagement;
/// 或需要中间件/诊断能力的状态场景,而不是替代所有简单字段级响应式属性。
/// </summary>
/// <typeparam name="TState">状态树的根状态类型。</typeparam>
public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
public sealed class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
{
/// <summary>
/// Dispatch 串行化门闩。
/// 该锁保证任意时刻只有一个 action 管线在运行,从而保持状态演进顺序确定,
/// 同时避免让耗时 middleware / reducer 长时间占用状态锁。
/// </summary>
private readonly object _dispatchGate = new();
/// <summary>
/// 当前状态变化订阅者列表。
/// 使用显式订阅对象而不是委托链,便于处理原子初始化订阅、挂起补发和精确解绑。
@ -20,18 +27,20 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
/// <summary>
/// Store 内部所有可变状态的同步锁。
/// 该锁同时保护订阅集合、reducer 注册表和分发过程,确保状态演进是串行且可预测的
/// 该锁仅保护状态快照、订阅集合、缓存选择视图和注册表本身的短临界区访问
/// </summary>
private readonly object _lock = new();
/// <summary>
/// 已注册的中间件链,按添加顺序执行。
/// Dispatch 开始时会抓取快照,因此运行中的分发不会受到后续注册变化影响。
/// </summary>
private readonly List<IStoreMiddleware<TState>> _middlewares = [];
/// <summary>
/// 按 action 具体运行时类型组织的 reducer 注册表。
/// Store 采用精确类型匹配策略,保证 reducer 执行顺序和行为保持确定性。
/// Dispatch 开始时会抓取对应 action 类型的 reducer 快照。
/// </summary>
private readonly Dictionary<Type, List<IStoreReducerAdapter>> _reducers = [];
@ -83,6 +92,34 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
_stateComparer = comparer ?? EqualityComparer<TState>.Default;
}
/// <summary>
/// 获取最近一次分发的 action 类型。
/// </summary>
public Type? LastActionType
{
get
{
lock (_lock)
{
return _lastActionType;
}
}
}
/// <summary>
/// 获取最近一次真正改变状态的时间戳。
/// </summary>
public DateTimeOffset? LastStateChangedAt
{
get
{
lock (_lock)
{
return _lastStateChangedAt;
}
}
}
/// <summary>
/// 获取当前状态快照。
/// </summary>
@ -97,6 +134,112 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
}
}
/// <summary>
/// 分发一个 action 并按顺序执行匹配的 reducer。
/// </summary>
/// <typeparam name="TAction">action 的具体类型。</typeparam>
/// <param name="action">要分发的 action。</param>
/// <exception cref="ArgumentNullException">当 <paramref name="action"/> 为 <see langword="null"/> 时抛出。</exception>
/// <exception cref="InvalidOperationException">当同一 Store 发生重入分发时抛出。</exception>
public void Dispatch<TAction>(TAction action)
{
ArgumentNullException.ThrowIfNull(action);
Action<TState>[] listenersSnapshot = Array.Empty<Action<TState>>();
IStoreMiddleware<TState>[] middlewaresSnapshot = Array.Empty<IStoreMiddleware<TState>>();
IStoreReducerAdapter[] reducersSnapshot = Array.Empty<IStoreReducerAdapter>();
IEqualityComparer<TState> stateComparerSnapshot = _stateComparer;
StoreDispatchContext<TState>? context = null;
var enteredDispatchScope = false;
lock (_dispatchGate)
{
try
{
lock (_lock)
{
EnsureNotDispatching();
_isDispatching = true;
enteredDispatchScope = true;
context = new StoreDispatchContext<TState>(action!, _state);
stateComparerSnapshot = _stateComparer;
middlewaresSnapshot = _middlewares.Count > 0
? _middlewares.ToArray()
: Array.Empty<IStoreMiddleware<TState>>();
reducersSnapshot = CreateReducerSnapshot(context.ActionType);
}
// middleware 和 reducer 可能包含较重的同步逻辑,因此仅持有 dispatch 串行门,
// 不占用状态锁,让读取、订阅和注册操作只在需要访问共享状态时短暂阻塞。
ExecuteDispatchPipeline(context, middlewaresSnapshot, reducersSnapshot, stateComparerSnapshot);
lock (_lock)
{
_lastActionType = context.ActionType;
_lastDispatchRecord = new StoreDispatchRecord<TState>(
context.Action,
context.PreviousState,
context.NextState,
context.HasStateChanged,
context.DispatchedAt);
if (!context.HasStateChanged)
{
return;
}
_state = context.NextState;
_lastStateChangedAt = context.DispatchedAt;
listenersSnapshot = SnapshotListenersForNotification(context.NextState);
}
}
finally
{
if (enteredDispatchScope)
{
lock (_lock)
{
_isDispatching = false;
}
}
}
}
// 始终在锁外通知订阅者,避免监听器内部读取 Store 或执行额外逻辑时产生死锁。
foreach (var listener in listenersSnapshot)
{
listener(context!.NextState);
}
}
/// <summary>
/// 获取当前订阅者数量。
/// </summary>
public int SubscriberCount
{
get
{
lock (_lock)
{
return _listeners.Count;
}
}
}
/// <summary>
/// 获取最近一次分发记录。
/// </summary>
public StoreDispatchRecord<TState>? LastDispatchRecord
{
get
{
lock (_lock)
{
return _lastDispatchRecord;
}
}
}
/// <summary>
/// 订阅状态变化通知。
/// </summary>
@ -198,119 +341,6 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
}
}
/// <summary>
/// 分发一个 action 并按顺序执行匹配的 reducer。
/// </summary>
/// <typeparam name="TAction">action 的具体类型。</typeparam>
/// <param name="action">要分发的 action。</param>
/// <exception cref="ArgumentNullException">当 <paramref name="action"/> 为 <see langword="null"/> 时抛出。</exception>
/// <exception cref="InvalidOperationException">当同一 Store 发生重入分发时抛出。</exception>
public void Dispatch<TAction>(TAction action)
{
ArgumentNullException.ThrowIfNull(action);
Action<TState>[] listenersSnapshot = Array.Empty<Action<TState>>();
StoreDispatchContext<TState>? context = null;
lock (_lock)
{
EnsureNotDispatching();
_isDispatching = true;
try
{
context = new StoreDispatchContext<TState>(action!, _state);
// 在锁内串行执行完整分发流程,确保 reducer 与中间件看到的是一致的状态序列,
// 并且不会因为并发写入导致 reducer 顺序失效。
ExecuteDispatchPipeline(context);
_lastActionType = context.ActionType;
_lastDispatchRecord = new StoreDispatchRecord<TState>(
context.Action,
context.PreviousState,
context.NextState,
context.HasStateChanged,
context.DispatchedAt);
if (!context.HasStateChanged)
{
return;
}
_state = context.NextState;
_lastStateChangedAt = context.DispatchedAt;
listenersSnapshot = SnapshotListenersForNotification(context.NextState);
}
finally
{
_isDispatching = false;
}
}
// 始终在锁外通知订阅者,避免监听器内部读取 Store 或执行额外逻辑时产生死锁。
foreach (var listener in listenersSnapshot)
{
listener(context!.NextState);
}
}
/// <summary>
/// 获取当前订阅者数量。
/// </summary>
public int SubscriberCount
{
get
{
lock (_lock)
{
return _listeners.Count;
}
}
}
/// <summary>
/// 获取最近一次分发的 action 类型。
/// </summary>
public Type? LastActionType
{
get
{
lock (_lock)
{
return _lastActionType;
}
}
}
/// <summary>
/// 获取最近一次真正改变状态的时间戳。
/// </summary>
public DateTimeOffset? LastStateChangedAt
{
get
{
lock (_lock)
{
return _lastStateChangedAt;
}
}
}
/// <summary>
/// 获取最近一次分发记录。
/// </summary>
public StoreDispatchRecord<TState>? LastDispatchRecord
{
get
{
lock (_lock)
{
return _lastDispatchRecord;
}
}
}
/// <summary>
/// 创建一个用于当前状态类型的 Store 构建器。
/// </summary>
@ -435,13 +465,20 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
/// 执行一次完整分发管线。
/// </summary>
/// <param name="context">当前分发上下文。</param>
private void ExecuteDispatchPipeline(StoreDispatchContext<TState> context)
/// <param name="middlewares">本次分发使用的中间件快照。</param>
/// <param name="reducers">本次分发使用的 reducer 快照。</param>
/// <param name="stateComparer">本次分发使用的状态比较器快照。</param>
private static void ExecuteDispatchPipeline(
StoreDispatchContext<TState> context,
IReadOnlyList<IStoreMiddleware<TState>> middlewares,
IReadOnlyList<IStoreReducerAdapter> reducers,
IEqualityComparer<TState> stateComparer)
{
Action pipeline = () => ApplyReducers(context);
Action pipeline = () => ApplyReducers(context, reducers, stateComparer);
for (var i = _middlewares.Count - 1; i >= 0; i--)
for (var i = middlewares.Count - 1; i >= 0; i--)
{
var middleware = _middlewares[i];
var middleware = middlewares[i];
var next = pipeline;
pipeline = () => middleware.Invoke(context, next);
}
@ -454,9 +491,14 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
/// reducer 使用 action 的精确运行时类型进行查找,以保证匹配结果和执行顺序稳定。
/// </summary>
/// <param name="context">当前分发上下文。</param>
private void ApplyReducers(StoreDispatchContext<TState> context)
/// <param name="reducers">本次分发使用的 reducer 快照。</param>
/// <param name="stateComparer">本次分发使用的状态比较器快照。</param>
private static void ApplyReducers(
StoreDispatchContext<TState> context,
IReadOnlyList<IStoreReducerAdapter> reducers,
IEqualityComparer<TState> stateComparer)
{
if (!_reducers.TryGetValue(context.ActionType, out var reducers) || reducers.Count == 0)
if (reducers.Count == 0)
{
context.NextState = context.PreviousState;
context.HasStateChanged = false;
@ -473,7 +515,7 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
}
context.NextState = nextState;
context.HasStateChanged = !_stateComparer.Equals(context.PreviousState, nextState);
context.HasStateChanged = !stateComparer.Equals(context.PreviousState, nextState);
}
/// <summary>
@ -521,6 +563,22 @@ public class Store<TState> : IStore<TState>, IStoreDiagnostics<TState>
return activeListeners.Count > 0 ? activeListeners.ToArray() : Array.Empty<Action<TState>>();
}
/// <summary>
/// 为当前 action 类型创建 reducer 快照。
/// Dispatch 在离开状态锁前复制列表,以便后续在锁外执行稳定、不可变的 reducer 序列。
/// </summary>
/// <param name="actionType">当前分发的 action 类型。</param>
/// <returns>对应 action 类型的 reducer 快照;若未注册则返回空数组。</returns>
private IStoreReducerAdapter[] CreateReducerSnapshot(Type actionType)
{
if (!_reducers.TryGetValue(actionType, out var reducers) || reducers.Count == 0)
{
return Array.Empty<IStoreReducerAdapter>();
}
return reducers.ToArray();
}
/// <summary>
/// 解绑一个精确的订阅对象。
/// </summary>

View File

@ -7,7 +7,7 @@ namespace GFramework.Core.StateManagement;
/// 该类型用于在 Store 创建之前集中配置比较器、reducer 和中间件,适合模块安装和测试工厂场景。
/// </summary>
/// <typeparam name="TState">状态树的根状态类型。</typeparam>
public class StoreBuilder<TState> : IStoreBuilder<TState>
public sealed class StoreBuilder<TState> : IStoreBuilder<TState>
{
/// <summary>
/// 延迟应用到 Store 的配置操作列表。
@ -20,43 +20,6 @@ public class StoreBuilder<TState> : IStoreBuilder<TState>
/// </summary>
private IEqualityComparer<TState>? _comparer;
/// <summary>
/// 配置状态比较器。
/// </summary>
/// <param name="comparer">状态比较器。</param>
/// <returns>当前构建器实例。</returns>
public IStoreBuilder<TState> WithComparer(IEqualityComparer<TState> comparer)
{
_comparer = comparer ?? throw new ArgumentNullException(nameof(comparer));
return this;
}
/// <summary>
/// 添加一个强类型 reducer。
/// </summary>
/// <typeparam name="TAction">当前 reducer 处理的 action 类型。</typeparam>
/// <param name="reducer">要添加的 reducer。</param>
/// <returns>当前构建器实例。</returns>
public IStoreBuilder<TState> AddReducer<TAction>(IReducer<TState, TAction> reducer)
{
ArgumentNullException.ThrowIfNull(reducer);
_configurators.Add(store => store.RegisterReducer(reducer));
return this;
}
/// <summary>
/// 使用委托快速添加一个 reducer。
/// </summary>
/// <typeparam name="TAction">当前 reducer 处理的 action 类型。</typeparam>
/// <param name="reducer">执行归约的委托。</param>
/// <returns>当前构建器实例。</returns>
public IStoreBuilder<TState> AddReducer<TAction>(Func<TState, TAction, TState> reducer)
{
ArgumentNullException.ThrowIfNull(reducer);
_configurators.Add(store => store.RegisterReducer(reducer));
return this;
}
/// <summary>
/// 添加一个 Store 中间件。
/// </summary>
@ -84,4 +47,42 @@ public class StoreBuilder<TState> : IStoreBuilder<TState>
return store;
}
/// <summary>
/// 添加一个强类型 reducer。
/// </summary>
/// <typeparam name="TAction">当前 reducer 处理的 action 类型。</typeparam>
/// <param name="reducer">要添加的 reducer。</param>
/// <returns>当前构建器实例。</returns>
public IStoreBuilder<TState> AddReducer<TAction>(IReducer<TState, TAction> reducer)
{
ArgumentNullException.ThrowIfNull(reducer);
_configurators.Add(store => store.RegisterReducer(reducer));
return this;
}
/// <summary>
/// 配置状态比较器。
/// </summary>
/// <param name="comparer">状态比较器。</param>
/// <returns>当前构建器实例。</returns>
public IStoreBuilder<TState> WithComparer(IEqualityComparer<TState> comparer)
{
_comparer = comparer ?? throw new ArgumentNullException(nameof(comparer));
return this;
}
/// <summary>
/// 使用委托快速添加一个 reducer。
/// </summary>
/// <typeparam name="TAction">当前 reducer 处理的 action 类型。</typeparam>
/// <param name="reducer">执行归约的委托。</param>
/// <returns>当前构建器实例。</returns>
public IStoreBuilder<TState> AddReducer<TAction>(Func<TState, TAction, TState> reducer)
{
ArgumentNullException.ThrowIfNull(reducer);
_configurators.Add(store => store.RegisterReducer(reducer));
return this;
}
}

View File

@ -12,7 +12,7 @@ namespace GFramework.Core.StateManagement;
/// </summary>
/// <typeparam name="TState">源状态类型。</typeparam>
/// <typeparam name="TSelected">投影后的局部状态类型。</typeparam>
public class StoreSelection<TState, TSelected> : IReadonlyBindableProperty<TSelected>
public sealed class StoreSelection<TState, TSelected> : IReadonlyBindableProperty<TSelected>
{
/// <summary>
/// 用于判断选择结果是否真正变化的比较器。