feat(context): 添加协程环境中的 MDC 上下文传播支持- 新增 CoroutineMdcWebFilter Bean,用于在协程环境中传播 MDC 上下文信息

This commit is contained in:
gewuyou 2025-07-17 10:30:40 +00:00
parent fa40833dfe
commit 3cda514cfc
23 changed files with 517 additions and 135 deletions

View File

@ -97,7 +97,10 @@ subprojects {
apply { apply {
// plugin(libs.plugins.java.get().pluginId) // plugin(libs.plugins.java.get().pluginId)
// plugin(libs.plugins.javaLibrary.get().pluginId) // plugin(libs.plugins.javaLibrary.get().pluginId)
if (!project.name.contains("demo")) {
plugin(libs.plugins.maven.publish.get().pluginId) plugin(libs.plugins.maven.publish.get().pluginId)
}
plugin(libs.plugins.kotlin.plugin.spring.get().pluginId)
plugin(libs.plugins.kotlin.jvm.get().pluginId) plugin(libs.plugins.kotlin.jvm.get().pluginId)
plugin(libs.plugins.axionRelease.get().pluginId) plugin(libs.plugins.axionRelease.get().pluginId)
plugin(libs.plugins.kotlin.kapt.get().pluginId) plugin(libs.plugins.kotlin.kapt.get().pluginId)
@ -111,6 +114,7 @@ subprojects {
// signAllPublications() // signAllPublications()
// } // }
// 发布配置 // 发布配置
if (!project.name.contains("demo")) {
publishing { publishing {
repositories { repositories {
// 本地仓库 // 本地仓库
@ -189,7 +193,7 @@ subprojects {
} }
} }
} }
}
// 依赖配置 // 依赖配置
dependencies { dependencies {
@ -217,8 +221,6 @@ subprojects {
tasks.named<Test>("test") { tasks.named<Test>("test") {
useJUnitPlatform() useJUnitPlatform()
} }
} }
/** /**
* 注册一个 Gradle 任务用于清理项目中的无用文件。 * 注册一个 Gradle 任务用于清理项目中的无用文件。

View File

@ -117,4 +117,7 @@ object Modules {
"${AUTHORIZE}:forgeboot-security-authorize-autoconfigure" "${AUTHORIZE}:forgeboot-security-authorize-autoconfigure"
} }
} }
object Demo{
const val ROOT = ":forgeboot-demo"
}
} }

View File

@ -1,4 +1,4 @@
dependencies { dependencies {
compileOnly(libs.kotlinxCoroutines.reactor)
} }

View File

@ -47,11 +47,16 @@ abstract class AbstractContext<K, V>: Context<K, V> {
} }
/** /**
* 从上下文中移除指定的键值对 * 从上下文中移除指定的键值对并返回被移除的值
* *
* @param key 要移除的键 * 此方法用于在上下文中删除与指定键关联的条目如果该键存在
* 则将其从上下文中移除并返回与之关联的值如果该键不存在
* 则返回 null
*
* @param key 要移除的键不能为空
* @return 与指定键关联的值如果键不存在则返回 null
*/ */
override fun remove(key: K) { override fun remove(key: K) : V? {
local.get().remove(key) return local.get().remove(key)
} }
} }

View File

@ -45,9 +45,14 @@ interface Context<K, V> {
fun clear() fun clear()
/** /**
* 从上下文中移除指定的键值对 * 从上下文中移除指定的键值对并返回被移除的值
* *
* @param key 要移除的键 * 此方法用于在上下文中删除与指定键关联的条目如果该键存在
* 则将其从上下文中移除并返回与之关联的值如果该键不存在
* 则返回 null
*
* @param key 要移除的键不能为空
* @return 与指定键关联的值如果键不存在则返回 null
*/ */
fun remove(key: K) fun remove(key: K): V?
} }

View File

@ -3,6 +3,7 @@ package com.gewuyou.forgeboot.context.autoconfigure
import com.gewuyou.forgeboot.context.api.ContextProcessor import com.gewuyou.forgeboot.context.api.ContextProcessor
import com.gewuyou.forgeboot.context.impl.ContextHolder import com.gewuyou.forgeboot.context.impl.ContextHolder
import com.gewuyou.forgeboot.context.impl.filter.ContextWebFilter import com.gewuyou.forgeboot.context.impl.filter.ContextWebFilter
import com.gewuyou.forgeboot.context.impl.filter.CoroutineMdcWebFilter
import com.gewuyou.forgeboot.context.impl.processor.ReactorProcessor import com.gewuyou.forgeboot.context.impl.processor.ReactorProcessor
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass import org.springframework.boot.autoconfigure.condition.ConditionalOnClass
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
@ -46,4 +47,17 @@ class ContextWebFluxAutoConfiguration {
reactorProcessor: ReactorProcessor, reactorProcessor: ReactorProcessor,
contextHolder: ContextHolder contextHolder: ContextHolder
) = ContextWebFilter(chain, reactorProcessor, contextHolder) ) = ContextWebFilter(chain, reactorProcessor, contextHolder)
/**
* 注册 CoroutineMdcWebFilter Bean用于在协程环境中传播 MDC 上下文信息
*
* MDCMapped Diagnostic Context通常用于存储线程上下文数据在异步或协程编程模型中
* 需要特殊的处理以确保上下文能够在不同协程之间正确传递
* 该过滤器通过注册为 Spring WebFlux WebFilter确保请求链路中的 MDC 数据一致性
*
* @return 构建完成的 CoroutineMdcWebFilter 实例
*/
@Bean
@ConditionalOnMissingBean
@Order(Ordered.HIGHEST_PRECEDENCE + 11) // 稍晚于 ContextWebFilter 执行
fun coroutineMdcWebFilter(contextHolder: ContextHolder): CoroutineMdcWebFilter = CoroutineMdcWebFilter(contextHolder)
} }

View File

@ -33,4 +33,12 @@ open class ContextHolder(
override fun <T> retrieveByType(key: String, type: Class<T>): T? { override fun <T> retrieveByType(key: String, type: Class<T>): T? {
return retrieve(key)?.let { valueSerializer.deserialize(it, type) } return retrieve(key)?.let { valueSerializer.deserialize(it, type) }
} }
/**
* 将指定的映射中的所有键值对存储到上下文中
*
* @param map 包含键值对的映射键为字符串类型值可以为任意类型或 null
*/
fun putAll(map: Map<String, Any?>) {
map.forEach { (k, v) -> put(k, v) }
}
} }

View File

@ -0,0 +1,102 @@
package com.gewuyou.forgeboot.context.impl.coroutine
import com.gewuyou.forgeboot.context.api.ContextProcessor
import com.gewuyou.forgeboot.context.impl.ContextHolder
import com.gewuyou.forgeboot.context.impl.element.CoroutineContextMapElement
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
/**
* 上下文感知的协程作用域实现类
* 用于在协程中保持上下文信息的传递和一致性
*
* @property contextHolder 上下文持有者实例用于获取和存储上下文快照
* @property processors 上下文处理器列表用于在协程中注入和清理上下文数据
* @since 2025-07-17 17:31:31
* @author gewuyou
*/
class ContextAwareCoroutineScope(
private val contextHolder: ContextHolder,
private val processors: List<ContextProcessor>,
) : CoroutineScope {
/**
* 协程上下文配置包含SupervisorJob和默认调度器
* SupervisorJob确保子协程的独立生命周期
* Dispatchers.Default适用于计算密集型任务
*/
override val coroutineContext: CoroutineContext = SupervisorJob() + Dispatchers.Default
/**
* 启动一个带有上下文传播的协程
*
* @param dispatcher 协程调度器默认使用Dispatchers.Default
* @param block 协程执行的具体逻辑
* @return 返回Job对象用于管理协程生命周期
*/
fun launchWithContext(
dispatcher: CoroutineDispatcher = Dispatchers.Default,
block: suspend CoroutineScope.() -> Unit,
): Job {
val snapshot = contextHolder.snapshot().toMutableMap()
return launch(dispatcher + CoroutineContextMapElement(contextHolder, snapshot)) {
try {
// 注入上下文数据到当前协程
processors.forEach { it.inject(Unit, snapshot) }
block()
} finally {
// 清理上下文数据并释放资源
processors.forEach { it.inject(Unit, mutableMapOf()) }
contextHolder.clear()
}
}
}
/**
* 异步启动一个带有上下文传播的协程
*
* @param dispatcher 协程调度器默认使用Dispatchers.Default
* @param block 协程执行的具体逻辑
* @return 返回Deferred对象用于获取异步执行结果
*/
fun <T> asyncWithContext(
dispatcher: CoroutineDispatcher = Dispatchers.Default,
block: suspend CoroutineScope.() -> T,
): Deferred<T> {
val snapshot = contextHolder.snapshot().toMutableMap()
return async(dispatcher + CoroutineContextMapElement(contextHolder, snapshot)) {
try {
// 注入上下文数据到当前协程
processors.forEach { it.inject(Unit, snapshot) }
block()
} finally {
// 清理上下文数据并释放资源
processors.forEach { it.inject(Unit, mutableMapOf()) }
contextHolder.clear()
}
}
}
/**
* 在指定上下文中执行挂起代码块
*
* @param block 需要执行的挂起代码块
* @return 返回执行结果
*/
suspend fun <T> runWithContext(
block: suspend () -> T,
): T {
val snapshot = contextHolder.snapshot().toMutableMap()
return withContext(CoroutineContextMapElement(contextHolder, snapshot)) {
try {
// 注入上下文数据到当前协程
processors.forEach { it.inject(Unit, snapshot) }
block()
} finally {
// 清理上下文数据并释放资源
processors.forEach { it.inject(Unit, mutableMapOf()) }
contextHolder.clear()
}
}
}
}

View File

@ -0,0 +1,80 @@
package com.gewuyou.forgeboot.context.impl.element
import com.gewuyou.forgeboot.context.impl.ContextHolder
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
/**
* 协程上下文键值对元素用于在协程上下文中保存和恢复一个Map结构的数据
*
* @property contextMap 保存协程上下文数据的Map键为字符串值为可空的任意类型对象
* @since 2025-07-17 14:27:05
* @author gewuyou
*/
class CoroutineContextMapElement(
private val contextHolder: ContextHolder,
private val contextMap: Map<String, Any?>,
) : ThreadContextElement<Map<String, Any?>>,
AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<CoroutineContextMapElement> {
/**
* 在统一的协程上下文快照中执行指定的挂起代码块
*
* 该方法会将给定的上下文快照设置为新的协程上下文并在其中执行传入的代码块
* 使用 [withContext] 切换到新的协程上下文同时通过 [ContextHolder.putAll] 更新上下文数据
*
* @param contextHolder 用于存储和更新上下文数据的容器
* @param snapshot 提供初始上下文数据的键值对映射
* @param block 需要在新上下文中执行的挂起函数
* @return 返回执行完成后的结果
*/
suspend fun <T> withUnifiedContextSnapshot(
contextHolder: ContextHolder,
snapshot: Map<String, Any?>,
block: suspend () -> T,
): T {
return withContext(CoroutineContextMapElement(contextHolder, snapshot)) {
contextHolder.putAll(snapshot)
block()
}
}
}
/**
* 更新当前协程上下文时调用保存当前上下文数据
*
* @param context 当前协程上下文
* @return 返回一个空Map表示保存的上下文状态
*/
override fun updateThreadContext(context: CoroutineContext): Map<String, Any?> {
val oldContext = contextHolder.snapshot() // 原始 ThreadLocal 快照
contextMap.forEach { (k, v) ->
if (v != null) contextHolder.put(k, v.toString())
}
return oldContext
}
/**
* 恢复协程上下文时调用用于将之前保存的上下文状态还原
*
* @param context 当前协程上下文
* @param oldState 之前保存的上下文状态
*/
override fun restoreThreadContext(context: CoroutineContext, oldState: Map<String, Any?>) {
contextHolder.clear()
oldState.forEach { (k, v): Map.Entry<String, Any?> -> contextHolder.put(k, v) }
}
/**
* 获取当前协程上下文中保存的Map数据
*
* @return 返回包含当前上下文数据的Map
*/
fun getContext(): Map<String, Any?> = contextMap
}

View File

@ -1,53 +0,0 @@
package com.gewuyou.forgeboot.context.impl.element
import kotlinx.coroutines.ThreadContextElement
import org.slf4j.MDC
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
/**
* MDC上下文元素用于在协程中传播MDCMapped Diagnostic Context上下文
*
* MDC SLF4J 提供的一个日志诊断工具允许将特定信息绑定到当前线程的上下文中
* 以便在日志输出时能够包含这些信息由于协程可能在线程间切换因此需要此实现来保证上下文一致性
*
* @param contextMap 包含 MDC 上下文键值对的不可变 Map用于保存和恢复诊断上下文
* @since 2025-07-16 11:05:47
* @author gewuyou
*/
class MdcContextElement(
private val contextMap: Map<String, String>
) : ThreadContextElement<Map<String, String>>,
AbstractCoroutineContextElement(Key) {
/**
* 协程上下文键对象用于标识此类上下文元素的唯一性
*/
companion object Key : CoroutineContext.Key<MdcContextElement>
/**
* 更新当前线程的 MDC 上下文为协程指定的上下文并返回旧的上下文状态
*
* 此方法会在协程切换至新线程时调用以确保目标线程的 MDC 上下文与协程一致
*
* @param context 当前协程的上下文不直接使用但保留用于扩展
* @return 返回更新前的 MDC 上下文状态用于后续恢复
*/
override fun updateThreadContext(context: CoroutineContext): Map<String, String> {
val oldState = MDC.getCopyOfContextMap() ?: emptyMap()
MDC.setContextMap(contextMap)
return oldState
}
/**
* 恢复当前线程的 MDC 上下文至先前保存的状态
*
* 此方法在协程完成执行并释放线程资源时调用确保线程可以还原其原始 MDC 上下文
*
* @param context 当前协程的上下文不直接使用但保留用于扩展
* @param oldState 需要恢复的先前 MDC 上下文状态
*/
override fun restoreThreadContext(context: CoroutineContext, oldState: Map<String, String>) {
MDC.setContextMap(oldState)
}
}

View File

@ -1,6 +1,7 @@
package com.gewuyou.forgeboot.context.impl.filter package com.gewuyou.forgeboot.context.impl.filter
import com.gewuyou.forgeboot.context.impl.element.MdcContextElement import com.gewuyou.forgeboot.context.impl.ContextHolder
import com.gewuyou.forgeboot.context.impl.element.CoroutineContextMapElement
import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.mono import kotlinx.coroutines.reactor.mono
import org.springframework.web.server.ServerWebExchange import org.springframework.web.server.ServerWebExchange
@ -15,21 +16,32 @@ import java.util.stream.Collectors
* 用于在响应式编程环境下将上下文 MDC Reactor Context 传递到 Kotlin 协程中 * 用于在响应式编程环境下将上下文 MDC Reactor Context 传递到 Kotlin 协程中
* 确保日志上下文信息能够正确传播 * 确保日志上下文信息能够正确传播
* *
* @property contextHolder 上下文持有者实例用于存储和管理 MDC 上下文数据
*
* @since 2025-07-16 11:07:44 * @since 2025-07-16 11:07:44
* @author gewuyou * @author gewuyou
*/ */
class CoroutineMdcWebFilter : WebFilter { class CoroutineMdcWebFilter(
private val contextHolder: ContextHolder
) : WebFilter {
/** /**
* 执行过滤操作的方法 * 执行过滤操作的方法
* *
* 在响应式流执行过程中拦截并提取 Reactor Context 中的 MDC 数据
* 将其封装为协程上下文并在新的协程环境中继续执行过滤链
* 以确保日志上下文信息在异步非阻塞处理流程中正确传播
*
* @param exchange 表示当前的服务器 Web 交换信息包含请求和响应 * @param exchange 表示当前的服务器 Web 交换信息包含请求和响应
* @param chain 当前的过滤器链用于继续执行后续的过滤器或目标处理器 * @param chain 当前的过滤器链用于继续执行后续的过滤器或目标处理器
* @return 返回一个 Mono<Void>表示异步完成的过滤操作 * @return 返回一个 Mono<Void>表示异步完成的过滤操作
*/ */
override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> { override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
return Mono.deferContextual { ctxView -> return Mono.deferContextual { ctxView ->
// 从 Reactor 上下文中提取键值对,筛选出 key 和 value 均为 String 类型的条目 /**
* Reactor 上下文中提取键值对筛选出 key value 均为 String 类型的条目
* 构建 MDC 数据映射表用于传递日志上下文信息
*/
val mdcMap = ctxView.stream() val mdcMap = ctxView.stream()
.filter { it.key is String && it.value is String } .filter { it.key is String && it.value is String }
.collect(Collectors.toMap( .collect(Collectors.toMap(
@ -37,8 +49,14 @@ class CoroutineMdcWebFilter : WebFilter {
{ it.value as String } { it.value as String }
)) ))
// 在带有 MDC 上下文的协程中执行过滤链 /**
mono(MdcContextElement(mdcMap)) { * 创建带有 MDC 上下文的协程环境并执行过滤链
* 1. MDC 数据注入协程上下文
* 2. 将上下文数据同步到 ContextHolder
* 3. 执行后续过滤器链并等待结果
*/
mono(CoroutineContextMapElement(contextHolder,mdcMap)) {
contextHolder.putAll(mdcMap)
chain.filter(exchange).awaitFirstOrNull() chain.filter(exchange).awaitFirstOrNull()
} }
} }

3
forgeboot-demo/.gitattributes vendored Normal file
View File

@ -0,0 +1,3 @@
/gradlew text eol=lf
*.bat text eol=crlf
*.jar binary

40
forgeboot-demo/.gitignore vendored Normal file
View File

@ -0,0 +1,40 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
### Kotlin ###
.kotlin

View File

@ -0,0 +1,5 @@
dependencies {
}

View File

@ -0,0 +1,3 @@
/gradlew text eol=lf
*.bat text eol=crlf
*.jar binary

View File

@ -0,0 +1,40 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
### Kotlin ###
.kotlin

View File

@ -0,0 +1,9 @@
dependencies {
implementation(libs.springBootStarter.web)
implementation(project(Modules.TRACE.STARTER))
implementation(project(Modules.Context.STARTER))
implementation(libs.kotlinxCoroutines.reactor)
implementation(libs.kotlinxCoroutines.core)
}

View File

@ -0,0 +1,11 @@
package com.gewuyou.forgeboot.trace.demo
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class ForgebootTraceDemoApplication
fun main(args: Array<String>) {
runApplication<ForgebootTraceDemoApplication>(*args)
}

View File

@ -0,0 +1,44 @@
package com.gewuyou.forgeboot.trace.demo.controller
import com.gewuyou.forgeboot.context.api.ContextProcessor
import com.gewuyou.forgeboot.context.impl.ContextHolder
import com.gewuyou.forgeboot.context.impl.coroutine.ContextAwareCoroutineScope
import com.gewuyou.forgeboot.core.extension.log
import com.gewuyou.forgeboot.trace.api.RequestIdProvider
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
/**
*MVC控制器
*
* @since 2025-07-17 17:13:44
* @author gewuyou
*/
@RestController
@RequestMapping("/trace")
class TraceTestController(
private val requestIdProvider: RequestIdProvider,
private val contextHolder: ContextHolder,
private val processors: List<ContextProcessor>,
) {
@GetMapping("/coroutine")
suspend fun coroutine(): String {
val requestId = requestIdProvider.getRequestId()
log.info("→ Controller RequestId: $requestId")
// 模拟内部异步任务3秒后完成
val scope = ContextAwareCoroutineScope(contextHolder,processors)
scope.launchWithContext {
log.info("RID: ${requestIdProvider.getRequestId()}")
}
return "Main coroutine returned immediately with requestId: $requestId"
}
@GetMapping("/servlet")
fun servlet(): String {
val requestId = requestIdProvider.getRequestId()
log.info("Servlet requestId: $requestId")
return "Servlet OK: $requestId"
}
}

View File

@ -0,0 +1,5 @@
spring:
application:
name: forgeboot-trace-demo
profiles:
active: dev

View File

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true">
<!-- 公共属性 -->
<springProperty name="APP_NAME" source="spring.application.name" defaultValue="unknow-app"/>
<property name="LOG_PATH" value="logs/${APP_NAME}"/>
<property name="MAX_FILE_SIZE" value="10MB"/>
<property name="MAX_HISTORY" value="10"/>
<!--彩色日志格式-->
<property name="CONSOLE_PATTERN"
value="时间:[%red(%d{yyyy-MM-dd HH:mm:ss})] 请求ID:[%highlight(%X{requestId})] 线程:[%green(%thread)] 日志级别:[%highlight(%-5level)] 调用位置:[%boldMagenta(%logger{50}) 参见:%blue(\(%F:%L\))] 日志信息:[%cyan(%msg%n)]"/>
<!--文件日志格式-->
<property name="FILE_PATTERN"
value="时间:[%d{yyyy-MM-dd HH:mm:ss}] 请求ID:[%X{requestId}] 线程:[%thread] 日志级别:[%-5level] 调用位置:[%logger{50} 参见:(%F:%L)] 日志信息:[%msg%n]"/>
<!-- ========== dev & test双轨Pattern + JSON ========== -->
<springProfile name="dev">
<!-- 彩色行日志 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_PATTERN}</pattern>
</encoder>
</appender>
<!-- <root level="DEBUG">-->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</springProfile>
</configuration>

View File

@ -136,6 +136,14 @@ project(":forgeboot-trace:forgeboot-trace-impl").name = "forgeboot-trace-impl"
project(":forgeboot-trace:forgeboot-trace-autoconfigure").name = "forgeboot-trace-autoconfigure" project(":forgeboot-trace:forgeboot-trace-autoconfigure").name = "forgeboot-trace-autoconfigure"
//endregion //endregion
////region module demo
include(
"forgeboot-demo",
":forgeboot-demo:forgeboot-trace-demo"
)
////region module security ////region module security
///** ///**
// * Includes and configures projects related to 'forgeboot-security' // * Includes and configures projects related to 'forgeboot-security'