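Episode 239: Spring Boot InheritableThreadLocal Architecture in Practice: Parent-Child Thread Propagation, Context Inheritance, and Asynchronous Task Handling for Enterprise Systems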

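Preface

In complex multithreaded applications, passing context between parent and child threads is a key challenge. A plain ThreadLocal is visible only to the thread that set it and is not passed to child threads, so important context is lost in asynchronous task handling, thread pool execution, and similar scenarios. InheritableThreadLocal, a subclass of ThreadLocal, copies the parent thread's values into a child thread at the moment the child is created, which gives the child an inherited context. Because the copy happens only at thread creation, threads reused by a pool do not pick up later changes, and the architecture has to handle that case explicitly. A Spring Boot architecture built around InheritableThreadLocal can manage parent-child context propagation safely and efficiently while supporting asynchronous task handling, context inheritance, and performance tuning. As microservices and asynchronous programming become widespread, building an extensible, maintainable InheritableThreadLocal framework is a core skill for enterprise architects.

This article examines the architecture and practical use of InheritableThreadLocal in Spring Boot, from parent-child propagation to context inheritance and from asynchronous task handling to performance tuning, as technical guidance for building a stable and efficient InheritableThreadLocal solution.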
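
To make the difference between the two classes concrete, here is a minimal sketch that sets the same value in a ThreadLocal and an InheritableThreadLocal and then reads both from a freshly created child thread. The class name `InheritanceDemo`, the method `readFromChild`, and the value `trace-42` are illustrative assumptions and do not come from the article's framework.

```java
import java.util.concurrent.atomic.AtomicReference;

final class InheritanceDemo {
    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    static final InheritableThreadLocal<String> INHERITED = new InheritableThreadLocal<>();

    /** Sets both variables in the calling thread, then reads them from a new child thread. */
    static String[] readFromChild(String value) {
        PLAIN.set(value);
        INHERITED.set(value);
        AtomicReference<String> plainSeen = new AtomicReference<>();
        AtomicReference<String> inheritedSeen = new AtomicReference<>();
        try {
            // The child is created by the thread that holds the values
            Thread child = new Thread(() -> {
                plainSeen.set(PLAIN.get());
                inheritedSeen.set(INHERITED.get());
            });
            child.start();
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always clean up the calling thread's values
            PLAIN.remove();
            INHERITED.remove();
        }
        return new String[] { plainSeen.get(), inheritedSeen.get() };
    }

    public static void main(String[] args) {
        String[] seen = readFromChild("trace-42");
        System.out.println("ThreadLocal in child: " + seen[0]);            // null
        System.out.println("InheritableThreadLocal in child: " + seen[1]); // trace-42
    }
}
```

The child sees nothing in the plain ThreadLocal but does see the inherited value, because the JVM copies inheritable values when the child Thread object is constructed.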

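1. Spring Boot InheritableThreadLocal Architecture Overview and Core Principles

1.1 InheritableThreadLocal Architecture Design

The Spring Boot InheritableThreadLocal system uses a layered architecture. It combines parent-child thread propagation, context inheritance, and asynchronous task handling to pass context between threads efficiently.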
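
One design constraint is worth seeing in code before the components are introduced: inheritance happens once, when a thread is created, so a pooled worker keeps whatever it inherited at creation. The sketch below assumes a single-thread pool so that the same worker serves both tasks; `PooledInheritanceDemo`, `submitTwice`, and the request IDs are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PooledInheritanceDemo {
    static final InheritableThreadLocal<String> REQUEST_ID = new InheritableThreadLocal<>();

    /** Submits two tasks to a one-thread pool, changing the caller's value in between. */
    static String[] submitTwice() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> read = REQUEST_ID::get;
        try {
            REQUEST_ID.set("request-1");
            // The worker thread is created during this submit and inherits "request-1"
            String first = pool.submit(read).get();

            REQUEST_ID.set("request-2");
            // The same worker is reused; nothing is inherited again
            String second = pool.submit(read).get();

            return new String[] { first, second };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = submitTwice();
        System.out.println(seen[0] + " / " + seen[1]); // request-1 / request-1
    }
}
```

The second task still reads the stale first value. This is why pool-based execution needs explicit capture and restore of context around each task rather than relying on inheritance alone.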

1.2 Core Component Architecture

2. Enterprise InheritableThreadLocal Manager Design

2.1 Core InheritableThreadLocal Manager

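/**
* Enterprise InheritableThreadLocal manager.
* Manages context propagation and inheritance between parent and child threads.
*/
@Component
@Slf4j
@RequiredArgsConstructor // Lombok: generates the constructor Spring uses to inject the uninitialized final fields below
public class InheritableThreadLocalManager {

// Registry of named InheritableThreadLocal instances
private final Map<String, InheritableThreadLocal<?>> inheritableThreadLocalMap = new ConcurrentHashMap<>();
private final ContextInheritanceManager contextInheritanceManager;
private final AsyncTaskManager asyncTaskManager;
private final ThreadPoolManager threadPoolManager;
private final PerformanceMonitor performanceMonitor;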

/**
* 创建InheritableThreadLocal实例
*/
public <T> InheritableThreadLocal<T> createInheritableThreadLocal(String name, Class<T> type) {
try {
// 检查是否已存在
if (inheritableThreadLocalMap.containsKey(name)) {
throw new InheritableThreadLocalException("InheritableThreadLocal已存在: " + name);
}

// Create the InheritableThreadLocal instance
InheritableThreadLocal<T> inheritableThreadLocal = new InheritableThreadLocal<T>() {
@Override
protected T childValue(T parentValue) {
// The JVM calls childValue in the parent thread, while the child Thread is being constructed
try {
// Record the inheritance
performanceMonitor.recordInheritance(name, parentValue);

// Apply the context inheritance logic
return contextInheritanceManager.inheritContext(name, parentValue);

} catch (Exception e) {
log.error("Child thread failed to inherit context: {}", name, e);
return parentValue; // fall back to sharing the parent's value
}
}

@Override
protected T initialValue() {
try {
// Class.newInstance() is deprecated since Java 9; call the no-arg constructor explicitly
return type.getDeclaredConstructor().newInstance();
} catch (Exception e) {
log.error("Failed to create initial value for InheritableThreadLocal: {}", name, e);
return null;
}
}

@Override
public void set(T value) {
super.set(value);
// Record the set operation
performanceMonitor.recordOperation(name, "set");
}

@Override
public T get() {
T value = super.get();
// Record the get operation
performanceMonitor.recordOperation(name, "get");
return value;
}

@Override
public void remove() {
// Do not call get() first: it would run initialValue() and record a spurious "get"
super.remove();
// Record the remove operation
performanceMonitor.recordOperation(name, "remove");
}
};

// 注册InheritableThreadLocal
inheritableThreadLocalMap.put(name, inheritableThreadLocal);

// 注册到上下文继承管理器
contextInheritanceManager.registerInheritableThreadLocal(name, inheritableThreadLocal);

log.info("InheritableThreadLocal创建成功: {}", name);
return inheritableThreadLocal;

} catch (Exception e) {
log.error("InheritableThreadLocal创建失败: {}", name, e);
throw new InheritableThreadLocalException("InheritableThreadLocal创建失败: " + e.getMessage());
}
}

/**
* 获取InheritableThreadLocal实例
*/
@SuppressWarnings("unchecked")
public <T> InheritableThreadLocal<T> getInheritableThreadLocal(String name) {
InheritableThreadLocal<?> inheritableThreadLocal = inheritableThreadLocalMap.get(name);
if (inheritableThreadLocal == null) {
throw new InheritableThreadLocalException("InheritableThreadLocal不存在: " + name);
}
return (InheritableThreadLocal<T>) inheritableThreadLocal;
}

/**
* 设置InheritableThreadLocal值
*/
public <T> void setValue(String name, T value) {
try {
InheritableThreadLocal<T> inheritableThreadLocal = getInheritableThreadLocal(name);
inheritableThreadLocal.set(value);

// 记录上下文信息
contextInheritanceManager.recordContext(name, value);

log.debug("InheritableThreadLocal值设置成功: {} = {}", name, value);

} catch (Exception e) {
log.error("InheritableThreadLocal值设置失败: {}", name, e);
throw new InheritableThreadLocalException("InheritableThreadLocal值设置失败: " + e.getMessage());
}
}

/**
* 获取InheritableThreadLocal值
*/
public <T> T getValue(String name) {
try {
InheritableThreadLocal<T> inheritableThreadLocal = getInheritableThreadLocal(name);
T value = inheritableThreadLocal.get();

// 记录上下文信息
contextInheritanceManager.recordContext(name, value);

return value;

} catch (Exception e) {
log.error("InheritableThreadLocal值获取失败: {}", name, e);
throw new InheritableThreadLocalException("InheritableThreadLocal值获取失败: " + e.getMessage());
}
}

/**
* 移除InheritableThreadLocal值
*/
public <T> void removeValue(String name) {
try {
InheritableThreadLocal<T> inheritableThreadLocal = getInheritableThreadLocal(name);
inheritableThreadLocal.remove();

// 清理上下文信息
contextInheritanceManager.clearContext(name);

log.debug("InheritableThreadLocal值移除成功: {}", name);

} catch (Exception e) {
log.error("InheritableThreadLocal值移除失败: {}", name, e);
throw new InheritableThreadLocalException("InheritableThreadLocal值移除失败: " + e.getMessage());
}
}

/**
* 执行异步任务
*/
public <T> CompletableFuture<T> executeAsyncTask(String taskName, Callable<T> task) {
try {
// 获取当前线程的上下文
Map<String, Object> currentContext = contextInheritanceManager.getCurrentContext();

// 创建异步任务
CompletableFuture<T> future = asyncTaskManager.executeAsyncTask(taskName, task, currentContext);

log.debug("异步任务执行成功: {}", taskName);
return future;

} catch (Exception e) {
log.error("异步任务执行失败: {}", taskName, e);
throw new InheritableThreadLocalException("异步任务执行失败: " + e.getMessage());
}
}

/**
* 执行异步任务(带返回值处理)
*/
public <T, R> CompletableFuture<R> executeAsyncTaskWithResult(String taskName, Callable<T> task, Function<T, R> resultProcessor) {
try {
// 获取当前线程的上下文
Map<String, Object> currentContext = contextInheritanceManager.getCurrentContext();

// 创建异步任务
CompletableFuture<R> future = asyncTaskManager.executeAsyncTaskWithResult(taskName, task, resultProcessor, currentContext);

log.debug("异步任务执行成功: {}", taskName);
return future;

} catch (Exception e) {
log.error("异步任务执行失败: {}", taskName, e);
throw new InheritableThreadLocalException("异步任务执行失败: " + e.getMessage());
}
}

/**
* 批量执行异步任务
*/
public <T> CompletableFuture<List<T>> executeBatchAsyncTasks(String taskName, List<Callable<T>> tasks) {
try {
// 获取当前线程的上下文
Map<String, Object> currentContext = contextInheritanceManager.getCurrentContext();

// 创建批量异步任务
CompletableFuture<List<T>> future = asyncTaskManager.executeBatchAsyncTasks(taskName, tasks, currentContext);

log.debug("批量异步任务执行成功: {}", taskName);
return future;

} catch (Exception e) {
log.error("批量异步任务执行失败: {}", taskName, e);
throw new InheritableThreadLocalException("批量异步任务执行失败: " + e.getMessage());
}
}

/**
* 创建自定义线程池
*/
public ExecutorService createCustomThreadPool(String poolName, int corePoolSize, int maximumPoolSize) {
try {
ExecutorService threadPool = threadPoolManager.createCustomThreadPool(poolName, corePoolSize, maximumPoolSize);

log.info("自定义线程池创建成功: {}", poolName);
return threadPool;

} catch (Exception e) {
log.error("自定义线程池创建失败: {}", poolName, e);
throw new InheritableThreadLocalException("自定义线程池创建失败: " + e.getMessage());
}
}

/**
* 清理所有InheritableThreadLocal
*/
public void clearAll() {
try {
for (Map.Entry<String, InheritableThreadLocal<?>> entry : inheritableThreadLocalMap.entrySet()) {
String name = entry.getKey();
InheritableThreadLocal<?> inheritableThreadLocal = entry.getValue();

try {
inheritableThreadLocal.remove();
log.debug("InheritableThreadLocal清理成功: {}", name);
} catch (Exception e) {
log.error("InheritableThreadLocal清理失败: {}", name, e);
}
}

// 清理上下文信息
contextInheritanceManager.clearAllContext();

log.info("所有InheritableThreadLocal清理完成");

} catch (Exception e) {
log.error("InheritableThreadLocal清理失败", e);
throw new InheritableThreadLocalException("InheritableThreadLocal清理失败: " + e.getMessage());
}
}

/**
* 获取InheritableThreadLocal统计信息
*/
public InheritableThreadLocalStatistics getStatistics() {
try {
Map<String, InheritableThreadLocalInfo> inheritableThreadLocalInfos = new HashMap<>();

for (Map.Entry<String, InheritableThreadLocal<?>> entry : inheritableThreadLocalMap.entrySet()) {
String name = entry.getKey();
InheritableThreadLocal<?> inheritableThreadLocal = entry.getValue();

InheritableThreadLocalInfo info = InheritableThreadLocalInfo.builder()
.name(name)
.type(inheritableThreadLocal.getClass().getSimpleName())
.hasValue(inheritableThreadLocal.get() != null)
.build();

inheritableThreadLocalInfos.put(name, info);
}

return InheritableThreadLocalStatistics.builder()
.totalCount(inheritableThreadLocalMap.size())
.inheritableThreadLocalInfos(inheritableThreadLocalInfos)
.inheritanceMetrics(contextInheritanceManager.getInheritanceMetrics())
.asyncTaskMetrics(asyncTaskManager.getAsyncTaskMetrics())
.threadPoolMetrics(threadPoolManager.getThreadPoolMetrics())
.performanceMetrics(performanceMonitor.getMetrics())
.build();

} catch (Exception e) {
log.error("获取InheritableThreadLocal统计信息失败", e);
throw new InheritableThreadLocalException("获取InheritableThreadLocal统计信息失败: " + e.getMessage());
}
}

/**
* 销毁InheritableThreadLocal管理器
*/
@PreDestroy
public void destroy() {
try {
// 清理所有InheritableThreadLocal
clearAll();

// 清理资源
inheritableThreadLocalMap.clear();

log.info("InheritableThreadLocal管理器销毁完成");

} catch (Exception e) {
log.error("InheritableThreadLocal管理器销毁失败", e);
}
}
}
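
The manager above routes every inheritance through `childValue`. As a minimal sketch of why that hook matters, the example below overrides `childValue` to hand the child a copy of a mutable map, so writes in the child do not reach the parent's map. `CopyOnInheritDemo`, `parentMapAfterChildWrite`, and the `tenant` key are assumptions made for illustration; the article's manager delegates to `ContextInheritanceManager` instead.

```java
import java.util.HashMap;
import java.util.Map;

final class CopyOnInheritDemo {
    static final InheritableThreadLocal<Map<String, String>> CONTEXT =
            new InheritableThreadLocal<Map<String, String>>() {
                @Override
                protected Map<String, String> childValue(Map<String, String> parentValue) {
                    // Without this override the child would share the parent's map instance
                    return parentValue == null ? null : new HashMap<>(parentValue);
                }
            };

    /** Lets a child thread modify its inherited map and returns the parent's map afterwards. */
    static Map<String, String> parentMapAfterChildWrite() {
        Map<String, String> parent = new HashMap<>();
        parent.put("tenant", "acme");
        CONTEXT.set(parent);
        try {
            Thread child = new Thread(() -> CONTEXT.get().put("tenant", "changed-by-child"));
            child.start();
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            CONTEXT.remove();
        }
        return parent;
    }

    public static void main(String[] args) {
        System.out.println(parentMapAfterChildWrite()); // {tenant=acme}
    }
}
```

The copy here is shallow; values that are themselves mutable would need the kind of deep copy the context inheritance manager attempts.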

2.2 Context Inheritance Manager

/**
* Context inheritance manager.
* Manages how context is inherited between parent and child threads.
*/
@Component
@Slf4j
public class ContextInheritanceManager {

private final Map<String, InheritableThreadLocal<?>> registeredInheritableThreadLocals = new ConcurrentHashMap<>();
private final Map<String, Object> contextMap = new ConcurrentHashMap<>();
private final ThreadLocal<Map<String, Object>> threadLocalContext = new ThreadLocal<>();

private final AtomicLong totalInheritanceCount = new AtomicLong(0);
private final AtomicLong successfulInheritanceCount = new AtomicLong(0);
private final AtomicLong failedInheritanceCount = new AtomicLong(0);

/**
* 注册InheritableThreadLocal
*/
public void registerInheritableThreadLocal(String name, InheritableThreadLocal<?> inheritableThreadLocal) {
try {
registeredInheritableThreadLocals.put(name, inheritableThreadLocal);

log.info("InheritableThreadLocal注册成功: {}", name);

} catch (Exception e) {
log.error("InheritableThreadLocal注册失败: {}", name, e);
throw new ContextInheritanceException("InheritableThreadLocal注册失败: " + e.getMessage());
}
}

/**
* 继承上下文
*/
public <T> T inheritContext(String name, T parentValue) {
try {
totalInheritanceCount.incrementAndGet();

// 执行上下文继承逻辑
T inheritedValue = performContextInheritance(name, parentValue);

if (inheritedValue != null) {
successfulInheritanceCount.incrementAndGet();
log.debug("上下文继承成功: {} = {}", name, inheritedValue);
} else {
failedInheritanceCount.incrementAndGet();
log.warn("上下文继承失败: {}", name);
}

return inheritedValue;

} catch (Exception e) {
failedInheritanceCount.incrementAndGet();
log.error("上下文继承失败: {}", name, e);
return parentValue; // 返回父线程的值
}
}

/**
* 执行上下文继承
*/
private <T> T performContextInheritance(String name, T parentValue) {
try {
if (parentValue == null) {
return null;
}

// 根据上下文类型执行不同的继承策略
switch (name) {
case "userContext":
return inheritUserContext(parentValue);
case "requestContext":
return inheritRequestContext(parentValue);
case "businessContext":
return inheritBusinessContext(parentValue);
case "securityContext":
return inheritSecurityContext(parentValue);
case "logContext":
return inheritLogContext(parentValue);
default:
return inheritDefaultContext(parentValue);
}

} catch (Exception e) {
log.error("执行上下文继承失败: {}", name, e);
return parentValue;
}
}

/**
* 继承用户上下文
*/
@SuppressWarnings("unchecked")
private <T> T inheritUserContext(T parentValue) {
try {
if (parentValue instanceof UserContext) {
UserContext parentUserContext = (UserContext) parentValue;

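// Build a new user context that copies the parent thread's information.
// This code is reached from childValue, which runs in the parent thread, so
// Thread.currentThread() below is the parent: threadId and inheritedFrom both identify the parent.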
UserContext inheritedUserContext = UserContext.builder()
.userId(parentUserContext.getUserId())
.username(parentUserContext.getUsername())
.email(parentUserContext.getEmail())
.roles(new ArrayList<>(parentUserContext.getRoles()))
.permissions(new ArrayList<>(parentUserContext.getPermissions()))
.loginTime(parentUserContext.getLoginTime())
.threadId(Thread.currentThread().getId())
.inheritedFrom(Thread.currentThread().getName())
.build();

return (T) inheritedUserContext;
}

return parentValue;

} catch (Exception e) {
log.error("继承用户上下文失败", e);
return parentValue;
}
}

/**
* 继承请求上下文
*/
@SuppressWarnings("unchecked")
private <T> T inheritRequestContext(T parentValue) {
try {
if (parentValue instanceof RequestContext) {
RequestContext parentRequestContext = (RequestContext) parentValue;

// 创建新的请求上下文,继承父线程的信息
RequestContext inheritedRequestContext = RequestContext.builder()
.requestId(parentRequestContext.getRequestId())
.requestTime(parentRequestContext.getRequestTime())
.clientIp(parentRequestContext.getClientIp())
.userAgent(parentRequestContext.getUserAgent())
.threadId(Thread.currentThread().getId())
.inheritedFrom(Thread.currentThread().getName())
.build();

return (T) inheritedRequestContext;
}

return parentValue;

} catch (Exception e) {
log.error("继承请求上下文失败", e);
return parentValue;
}
}

/**
* 继承业务上下文
*/
@SuppressWarnings("unchecked")
private <T> T inheritBusinessContext(T parentValue) {
try {
if (parentValue instanceof Map) {
Map<String, Object> parentBusinessContext = (Map<String, Object>) parentValue;

// 创建新的业务上下文,深度复制父线程的信息
Map<String, Object> inheritedBusinessContext = new HashMap<>();
for (Map.Entry<String, Object> entry : parentBusinessContext.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();

// 深度复制值
Object copiedValue = deepCopyValue(value);
inheritedBusinessContext.put(key, copiedValue);
}

// 添加继承信息
inheritedBusinessContext.put("threadId", Thread.currentThread().getId());
inheritedBusinessContext.put("inheritedFrom", Thread.currentThread().getName());

return (T) inheritedBusinessContext;
}

return parentValue;

} catch (Exception e) {
log.error("继承业务上下文失败", e);
return parentValue;
}
}

/**
* 继承安全上下文
*/
@SuppressWarnings("unchecked")
private <T> T inheritSecurityContext(T parentValue) {
try {
if (parentValue instanceof SecurityContext) {
SecurityContext parentSecurityContext = (SecurityContext) parentValue;

// 创建新的安全上下文,继承父线程的信息
SecurityContext inheritedSecurityContext = SecurityContext.builder()
.principal(parentSecurityContext.getPrincipal())
.authorities(new ArrayList<>(parentSecurityContext.getAuthorities()))
.authenticated(parentSecurityContext.isAuthenticated())
.threadId(Thread.currentThread().getId())
.inheritedFrom(Thread.currentThread().getName())
.build();

return (T) inheritedSecurityContext;
}

return parentValue;

} catch (Exception e) {
log.error("继承安全上下文失败", e);
return parentValue;
}
}

/**
* 继承日志上下文
*/
@SuppressWarnings("unchecked")
private <T> T inheritLogContext(T parentValue) {
try {
if (parentValue instanceof LogContext) {
LogContext parentLogContext = (LogContext) parentValue;

// 创建新的日志上下文,继承父线程的信息
LogContext inheritedLogContext = LogContext.builder()
.traceId(parentLogContext.getTraceId())
.spanId(parentLogContext.getSpanId())
.parentSpanId(parentLogContext.getParentSpanId())
.threadId(Thread.currentThread().getId())
.inheritedFrom(Thread.currentThread().getName())
.build();

return (T) inheritedLogContext;
}

return parentValue;

} catch (Exception e) {
log.error("继承日志上下文失败", e);
return parentValue;
}
}

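/**
* Inherit any other context type (shallow copy via clone() when available)
*/
@SuppressWarnings("unchecked")
private <T> T inheritDefaultContext(T parentValue) {
try {
// For other context types, attempt a shallow copy
if (parentValue instanceof Cloneable) {
// getMethod only finds a public clone(); classes that keep clone() protected,
// and arrays, fall through to the original value
try {
Method cloneMethod = parentValue.getClass().getMethod("clone");
return (T) cloneMethod.invoke(parentValue);
} catch (Exception e) {
log.debug("Clone failed, using the original value: {}", e.getMessage());
}
}

// Fall back to sharing the parent's object
return parentValue;

} catch (Exception e) {
log.error("Failed to inherit default context", e);
return parentValue;
}
}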

/**
* 深度复制值
*/
private Object deepCopyValue(Object value) {
try {
if (value == null) {
return null;
}

if (value instanceof String || value instanceof Number || value instanceof Boolean) {
return value; // 基本类型直接返回
}

if (value instanceof List) {
List<?> list = (List<?>) value;
List<Object> copiedList = new ArrayList<>();
for (Object item : list) {
copiedList.add(deepCopyValue(item));
}
return copiedList;
}

if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
Map<Object, Object> copiedMap = new HashMap<>();
for (Map.Entry<?, ?> entry : map.entrySet()) {
Object key = deepCopyValue(entry.getKey());
Object val = deepCopyValue(entry.getValue());
copiedMap.put(key, val);
}
return copiedMap;
}

// For other objects, copy through Java serialization.
// Values that are not Serializable fail here and fall back to the original reference.
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// try-with-resources closes the streams even when writing or reading throws
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(value);
}
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
return ois.readObject();
}

} catch (Exception e) {
log.debug("Serialization copy failed, using the original value: {}", e.getMessage());
return value;
}

} catch (Exception e) {
log.error("深度复制值失败", e);
return value;
}
}

/**
* 记录上下文信息
*/
public void recordContext(String name, Object value) {
try {
String key = "record_" + name;
setThreadLocalContext(key, value);

} catch (Exception e) {
log.error("记录上下文信息失败: {}", name, e);
}
}

/**
* 清理上下文信息
*/
public void clearContext(String name) {
try {
String key = "record_" + name;
Map<String, Object> threadContext = threadLocalContext.get();
if (threadContext != null) {
threadContext.remove(key);
}

} catch (Exception e) {
log.error("清理上下文信息失败: {}", name, e);
}
}

/**
* 清理所有上下文
*/
public void clearAllContext() {
try {
contextMap.clear();
threadLocalContext.remove();

log.info("所有上下文清理完成");

} catch (Exception e) {
log.error("清理所有上下文失败", e);
}
}

/**
* 获取当前上下文
*/
public Map<String, Object> getCurrentContext() {
try {
Map<String, Object> currentContext = new HashMap<>();

// 获取ThreadLocal上下文
Map<String, Object> threadContext = threadLocalContext.get();
if (threadContext != null) {
currentContext.putAll(threadContext);
}

// 获取全局上下文
currentContext.putAll(contextMap);

return currentContext;

} catch (Exception e) {
log.error("获取当前上下文失败", e);
return new HashMap<>();
}
}

/**
* 设置ThreadLocal上下文
*/
private void setThreadLocalContext(String key, Object value) {
Map<String, Object> threadContext = threadLocalContext.get();
if (threadContext == null) {
threadContext = new HashMap<>();
threadLocalContext.set(threadContext);
}
threadContext.put(key, value);
}

/**
* 获取继承指标
*/
public InheritanceMetrics getInheritanceMetrics() {
try {
long total = totalInheritanceCount.get();
long successful = successfulInheritanceCount.get();
long failed = failedInheritanceCount.get();
double successRate = total > 0 ? (double) successful / total : 0;

return InheritanceMetrics.builder()
.totalInheritanceCount(total)
.successfulInheritanceCount(successful)
.failedInheritanceCount(failed)
.successRate(successRate)
.registeredInheritableThreadLocalCount(registeredInheritableThreadLocals.size())
.build();

} catch (Exception e) {
log.error("获取继承指标失败", e);
throw new ContextInheritanceException("获取继承指标失败: " + e.getMessage());
}
}
}
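
The inheritance methods above stamp `threadId` and `inheritedFrom` using `Thread.currentThread()`. The following sketch checks which thread that actually is inside `childValue`; per the `InheritableThreadLocal` Javadoc, the method is called from within the parent thread before the child is started. `ChildValueThreadDemo`, `observe`, and the thread name `child-worker` are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ChildValueThreadDemo {
    /** Returns { name of the creating thread, name of the thread that ran childValue }. */
    static String[] observe() {
        AtomicReference<String> childValueThread = new AtomicReference<>();
        InheritableThreadLocal<String> context = new InheritableThreadLocal<String>() {
            @Override
            protected String childValue(String parentValue) {
                // Record which thread is executing the inheritance hook
                childValueThread.set(Thread.currentThread().getName());
                return parentValue;
            }
        };
        context.set("value");
        String creator = Thread.currentThread().getName();
        try {
            // childValue is invoked while this Thread object is constructed
            Thread child = new Thread(() -> { }, "child-worker");
            child.start();
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            context.remove();
        }
        return new String[] { creator, childValueThread.get() };
    }

    public static void main(String[] args) {
        String[] names = observe();
        System.out.println("created by: " + names[0] + ", childValue ran in: " + names[1]);
    }
}
```

Both names are the creating thread's. A context that needs the child's own ID therefore has to fill it in from code that runs inside the child, for example on first access.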

2.3 Asynchronous Task Manager

/**
* Asynchronous task manager.
* Manages asynchronous task execution and context propagation.
*/
@Component
@Slf4j
@RequiredArgsConstructor // Lombok: injects defaultExecutor; assumes an ExecutorService bean is defined elsewhere
public class AsyncTaskManager {

private final ExecutorService defaultExecutor;
private final Map<String, ExecutorService> customExecutors = new ConcurrentHashMap<>();
private final AtomicLong totalTaskCount = new AtomicLong(0);
private final AtomicLong completedTaskCount = new AtomicLong(0);
private final AtomicLong failedTaskCount = new AtomicLong(0);

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

/**
* 执行异步任务
*/
public <T> CompletableFuture<T> executeAsyncTask(String taskName, Callable<T> task, Map<String, Object> context) {
try {
totalTaskCount.incrementAndGet();

// 创建异步任务
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
// 设置上下文
setTaskContext(context);

// 执行任务
T result = task.call();

completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {}", taskName);

return result;

} catch (Exception e) {
failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {}", taskName, e);
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
} finally {
// 清理上下文
clearTaskContext();
}
}, defaultExecutor);

return future;

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 执行异步任务(带返回值处理)
*/
public <T, R> CompletableFuture<R> executeAsyncTaskWithResult(String taskName, Callable<T> task,
Function<T, R> resultProcessor, Map<String, Object> context) {
try {
totalTaskCount.incrementAndGet();

// 创建异步任务
CompletableFuture<R> future = CompletableFuture.supplyAsync(() -> {
try {
// 设置上下文
setTaskContext(context);

// 执行任务
T taskResult = task.call();

// 处理结果
R processedResult = resultProcessor.apply(taskResult);

completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {}", taskName);

return processedResult;

} catch (Exception e) {
failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {}", taskName, e);
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
} finally {
// 清理上下文
clearTaskContext();
}
}, defaultExecutor);

return future;

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 批量执行异步任务
*/
public <T> CompletableFuture<List<T>> executeBatchAsyncTasks(String taskName, List<Callable<T>> tasks,
Map<String, Object> context) {
try {
totalTaskCount.addAndGet(tasks.size());

// 创建批量异步任务
CompletableFuture<List<T>> future = CompletableFuture.supplyAsync(() -> {
try {
// 设置上下文
setTaskContext(context);

// 执行批量任务
List<T> results = new ArrayList<>();
for (Callable<T> task : tasks) {
try {
T result = task.call();
results.add(result);
completedTaskCount.incrementAndGet();
} catch (Exception e) {
log.error("批量任务中的单个任务执行失败: {}", taskName, e);
failedTaskCount.incrementAndGet();
throw e;
}
}

log.debug("批量异步任务执行成功: {}", taskName);
return results;

} catch (Exception e) {
log.error("批量异步任务执行失败: {}", taskName, e);
throw new RuntimeException("批量异步任务执行失败: " + e.getMessage(), e);
} finally {
// 清理上下文
clearTaskContext();
}
}, defaultExecutor);

return future;

} catch (Exception e) {
log.error("创建批量异步任务失败: {}", taskName, e);
throw new AsyncTaskException("创建批量异步任务失败: " + e.getMessage());
}
}

/**
* 执行异步任务(带超时)
*/
public <T> CompletableFuture<T> executeAsyncTaskWithTimeout(String taskName, Callable<T> task,
Map<String, Object> context, long timeout, TimeUnit timeUnit) {
try {
totalTaskCount.incrementAndGet();

// 创建异步任务
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
// 设置上下文
setTaskContext(context);

// 执行任务
T result = task.call();

completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {}", taskName);

return result;

} catch (Exception e) {
failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {}", taskName, e);
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
} finally {
// 清理上下文
clearTaskContext();
}
}, defaultExecutor);

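// Apply the timeout (CompletableFuture.orTimeout requires Java 9+).
// On timeout the future completes exceptionally with TimeoutException; the running task is not interrupted.
return future.orTimeout(timeout, timeUnit);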

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 执行异步任务(带重试)
*/
public <T> CompletableFuture<T> executeAsyncTaskWithRetry(String taskName, Callable<T> task,
Map<String, Object> context, int maxRetries) {
try {
totalTaskCount.incrementAndGet();

// 创建异步任务
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
Exception lastException = null;

for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
// 设置上下文
setTaskContext(context);

// 执行任务
T result = task.call();

completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {} (尝试次数: {})", taskName, attempt + 1);

return result;

} catch (Exception e) {
lastException = e;
log.warn("异步任务执行失败: {} (尝试次数: {})", taskName, attempt + 1, e);

if (attempt < maxRetries) {
// 等待一段时间后重试
try {
Thread.sleep(1000 * (attempt + 1));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
} finally {
// 清理上下文
clearTaskContext();
}
}

failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {} (最大重试次数: {})", taskName, maxRetries, lastException);
throw new RuntimeException("异步任务执行失败: " + lastException.getMessage(), lastException);

}, defaultExecutor);

return future;

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskException("创建异步任务失败: " + e.getMessage());
}
}

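/**
* Set the task context (placeholder: currently only logs each entry)
*/
private void setTaskContext(Map<String, Object> context) {
try {
if (context != null && !context.isEmpty()) {
for (Map.Entry<String, Object> entry : context.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();

// A full implementation would set each entry on the matching InheritableThreadLocal here.
// This simplified version only logs it, so the worker thread does not actually receive the context.
log.debug("Setting task context: {} = {}", key, value);
}
}

} catch (Exception e) {
log.error("Failed to set task context", e);
}
}

/**
* Clear the task context (placeholder: currently only logs)
*/
private void clearTaskContext() {
try {
// A full implementation would remove the values set in setTaskContext here.
log.debug("Clearing task context");

} catch (Exception e) {
log.error("Failed to clear task context", e);
}
}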

/**
* 获取异步任务指标
*/
public AsyncTaskMetrics getAsyncTaskMetrics() {
try {
long total = totalTaskCount.get();
long completed = completedTaskCount.get();
long failed = failedTaskCount.get();
double successRate = total > 0 ? (double) completed / total : 0;

return AsyncTaskMetrics.builder()
.totalTaskCount(total)
.completedTaskCount(completed)
.failedTaskCount(failed)
.successRate(successRate)
.customExecutorCount(customExecutors.size())
.build();

} catch (Exception e) {
log.error("获取异步任务指标失败", e);
throw new AsyncTaskException("获取异步任务指标失败: " + e.getMessage());
}
}

/**
* 创建自定义执行器
*/
public ExecutorService createCustomExecutor(String executorName, int corePoolSize, int maximumPoolSize) {
try {
ExecutorService executor = Executors.newFixedThreadPool(maximumPoolSize, r -> {
Thread thread = new Thread(r);
thread.setName(executorName + "-" + thread.getId());
thread.setDaemon(true);
return thread;
});

customExecutors.put(executorName, executor);

log.info("自定义执行器创建成功: {}", executorName);
return executor;

} catch (Exception e) {
log.error("自定义执行器创建失败: {}", executorName, e);
throw new AsyncTaskException("自定义执行器创建失败: " + e.getMessage());
}
}

/**
* 清理异步任务管理器
*/
@PreDestroy
public void cleanup() {
try {
// 关闭默认执行器
defaultExecutor.shutdown();

// 关闭自定义执行器
for (ExecutorService executor : customExecutors.values()) {
executor.shutdown();
}

// 关闭调度器
scheduler.shutdown();

log.info("异步任务管理器清理完成");

} catch (Exception e) {
log.error("异步任务管理器清理失败", e);
}
}
}
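
In the listing above, `setTaskContext` and `clearTaskContext` only log. One common way to make the propagation real is to capture the submitter's value when the task is wrapped and install it around the task on the worker. The sketch below shows that capture-and-restore pattern for a single `ThreadLocal<String>`; `ContextPropagation`, `withCurrentContext`, `TRACE_ID`, and `runThree` are hypothetical names, not part of the article's API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextPropagation {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's value now and installs it around the task when it runs. */
    static <T> Callable<T> withCurrentContext(Callable<T> task) {
        final String captured = TRACE_ID.get(); // read in the submitting thread
        return () -> {
            String previous = TRACE_ID.get();   // whatever the worker held before
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                // Restore the worker's previous state so nothing leaks to the next task
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    /** Runs two wrapped tasks and one unwrapped task on the same pooled worker. */
    static String[] runThree() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> read = TRACE_ID::get;
        try {
            TRACE_ID.set("trace-1");
            String first = pool.submit(withCurrentContext(read)).get();
            TRACE_ID.set("trace-2");
            String second = pool.submit(withCurrentContext(read)).get();
            String unwrapped = pool.submit(read).get(); // no wrapper, no context
            return new String[] { first, second, unwrapped };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = runThree();
        System.out.println(seen[0] + " / " + seen[1] + " / " + seen[2]); // trace-1 / trace-2 / null
    }
}
```

Each wrapped task sees the value that was current at submission time, and the unwrapped task confirms that the worker was left clean. The same wrapper shape extends to a map of several context values.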

3. Thread Pool Manager

3.1 Thread Pool Manager Implementation

/**
* Thread pool manager.
* Manages custom thread pools and their InheritableThreadLocal support.
*/
@Component
@Slf4j
public class ThreadPoolManager {

private final Map<String, ExecutorService> threadPools = new ConcurrentHashMap<>();
private final Map<String, ThreadPoolInfo> threadPoolInfos = new ConcurrentHashMap<>();

private final AtomicLong totalTaskCount = new AtomicLong(0);
private final AtomicLong completedTaskCount = new AtomicLong(0);
private final AtomicLong rejectedTaskCount = new AtomicLong(0);

/**
* Create a custom thread pool.
*/
public ExecutorService createCustomThreadPool(String poolName, int corePoolSize, int maximumPoolSize) {
// Fail fast before allocating anything
if (threadPools.containsKey(poolName)) {
throw new ThreadPoolException("Thread pool already exists: " + poolName);
}

ThreadPoolExecutor threadPool;
try {
// Thread factory that names the workers after the pool
ThreadFactory threadFactory = new InheritableThreadFactory(poolName);

// Rejection handler
RejectedExecutionHandler rejectedHandler = new InheritableRejectedExecutionHandler(poolName);

// Bounded queue, so a slow consumer cannot grow the backlog without limit
threadPool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
threadFactory,
rejectedHandler
);
} catch (Exception e) {
log.error("Failed to create custom thread pool: {}", poolName, e);
throw new ThreadPoolException("Failed to create custom thread pool: " + e.getMessage());
}

// Register atomically: a concurrent caller may have registered the same name
// between the check above and this point
if (threadPools.putIfAbsent(poolName, threadPool) != null) {
threadPool.shutdown();
throw new ThreadPoolException("Thread pool already exists: " + poolName);
}

// Initial snapshot; getThreadPoolInfo refreshes the live counters on each call
ThreadPoolInfo info = ThreadPoolInfo.builder()
.name(poolName)
.corePoolSize(corePoolSize)
.maximumPoolSize(maximumPoolSize)
.currentPoolSize(threadPool.getPoolSize())
.activeCount(threadPool.getActiveCount())
.taskCount(threadPool.getTaskCount())
.completedTaskCount(threadPool.getCompletedTaskCount())
.build();

threadPoolInfos.put(poolName, info);

log.info("Custom thread pool created: {}", poolName);
return threadPool;
}

/**
* 获取线程池
*/
public ExecutorService getThreadPool(String poolName) {
ExecutorService threadPool = threadPools.get(poolName);
if (threadPool == null) {
throw new ThreadPoolException("线程池不存在: " + poolName);
}
return threadPool;
}

/**
* 执行任务
*/
public <T> CompletableFuture<T> executeTask(String poolName, Callable<T> task) {
try {
ExecutorService threadPool = getThreadPool(poolName);

totalTaskCount.incrementAndGet();

CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
completedTaskCount.incrementAndGet();
return result;
} catch (Exception e) {
log.error("任务执行失败: {}", poolName, e);
throw new RuntimeException("任务执行失败: " + e.getMessage(), e);
}
}, threadPool);

return future;

} catch (Exception e) {
log.error("执行任务失败: {}", poolName, e);
throw new ThreadPoolException("执行任务失败: " + e.getMessage());
}
}

/**
* 批量执行任务
*/
public <T> CompletableFuture<List<T>> executeBatchTasks(String poolName, List<Callable<T>> tasks) {
try {
ExecutorService threadPool = getThreadPool(poolName);

totalTaskCount.addAndGet(tasks.size());

List<CompletableFuture<T>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
completedTaskCount.incrementAndGet();
return result;
} catch (Exception e) {
log.error("批量任务中的单个任务执行失败: {}", poolName, e);
throw new RuntimeException("任务执行失败: " + e.getMessage(), e);
}
}, threadPool))
.collect(Collectors.toList());

CompletableFuture<List<T>> future = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));

return future;

} catch (Exception e) {
log.error("批量执行任务失败: {}", poolName, e);
throw new ThreadPoolException("批量执行任务失败: " + e.getMessage());
}
}

/**
* 获取线程池信息
*/
public ThreadPoolInfo getThreadPoolInfo(String poolName) {
ThreadPoolInfo info = threadPoolInfos.get(poolName);
if (info == null) {
throw new ThreadPoolException("线程池信息不存在: " + poolName);
}

// 更新实时信息
ExecutorService threadPool = threadPools.get(poolName);
if (threadPool instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) threadPool;
info.setCurrentPoolSize(tpe.getPoolSize());
info.setActiveCount(tpe.getActiveCount());
info.setTaskCount(tpe.getTaskCount());
info.setCompletedTaskCount(tpe.getCompletedTaskCount());
}

return info;
}

/**
* 获取所有线程池信息
*/
public Map<String, ThreadPoolInfo> getAllThreadPoolInfos() {
Map<String, ThreadPoolInfo> allInfos = new HashMap<>();

for (String poolName : threadPools.keySet()) {
try {
ThreadPoolInfo info = getThreadPoolInfo(poolName);
allInfos.put(poolName, info);
} catch (Exception e) {
log.error("获取线程池信息失败: {}", poolName, e);
}
}

return allInfos;
}

/**
* Shut down a thread pool.
*/
public void shutdownThreadPool(String poolName) {
try {
// Remove first, so that only one caller ever shuts a given pool down
ExecutorService threadPool = threadPools.remove(poolName);
if (threadPool != null) {
// shutdown() stops new submissions; tasks already queued still run
threadPool.shutdown();
threadPoolInfos.remove(poolName);

log.info("Thread pool shut down: {}", poolName);
}

} catch (Exception e) {
log.error("Failed to shut down thread pool: {}", poolName, e);
throw new ThreadPoolException("Failed to shut down thread pool: " + e.getMessage());
}
}

/**
* 关闭所有线程池
*/
public void shutdownAllThreadPools() {
try {
for (String poolName : threadPools.keySet()) {
shutdownThreadPool(poolName);
}

log.info("所有线程池关闭完成");

} catch (Exception e) {
log.error("关闭所有线程池失败", e);
}
}

/**
* 获取线程池指标
*/
public ThreadPoolMetrics getThreadPoolMetrics() {
try {
return ThreadPoolMetrics.builder()
.totalThreadPoolCount(threadPools.size())
.totalTaskCount(totalTaskCount.get())
.completedTaskCount(completedTaskCount.get())
.rejectedTaskCount(rejectedTaskCount.get())
.build();

} catch (Exception e) {
log.error("获取线程池指标失败", e);
throw new ThreadPoolException("获取线程池指标失败: " + e.getMessage());
}
}

/**
* 清理线程池管理器
*/
@PreDestroy
public void cleanup() {
try {
shutdownAllThreadPools();
log.info("线程池管理器清理完成");

} catch (Exception e) {
log.error("线程池管理器清理失败", e);
}
}
}
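
The shutdown methods above call shutdown() and return immediately, so tasks may still be running when the bean is destroyed. A minimal sketch of the usual two-phase pattern follows; the class name GracefulShutdown and the demo method are illustrative assumptions, not part of the manager above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the article's ThreadPoolManager.
final class GracefulShutdown {

    /**
     * Stops accepting new tasks, waits up to the timeout for queued and running
     * tasks, then cancels whatever is left. Returns true if the pool terminated.
     */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // reject new submissions, let queued tasks finish
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow(); // interrupt workers and drop queued tasks
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Submits four small tasks, then shuts down and checks that all of them ran. */
    static boolean demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            pool.execute(done::incrementAndGet);
        }
        boolean terminated = shutdownGracefully(pool, 5, TimeUnit.SECONDS);
        return terminated && done.get() == 4;
    }
}
```

In a manager like the one above, such a helper could be called from the @PreDestroy method for each registered pool; the timeout is a deployment decision and is left open here.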

3.2 Inheritable Thread Factory

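/**
 * Inheritable thread factory.
 * Creates the pool's worker threads. Note that an InheritableThreadLocal value is copied
 * into a worker only once, when the worker is constructed, from the thread that triggers
 * its creation. Tasks that later reuse the worker do not see the submitter's current values.
 */
public class InheritableThreadFactory implements ThreadFactory {

    private final String poolName;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final ThreadGroup group;

    public InheritableThreadFactory(String poolName) {
        this.poolName = poolName;
        SecurityManager s = System.getSecurityManager();
        this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
    }

    @Override
    public Thread newThread(Runnable r) {
        // Workers are named "<poolName>-1", "<poolName>-2", ...
        Thread thread = new InheritableThread(group, r, poolName + "-" + threadNumber.getAndIncrement(), 0);

        // Daemon status and priority are inherited from the creating thread; normalize both
        if (thread.isDaemon()) {
            thread.setDaemon(false);
        }

        if (thread.getPriority() != Thread.NORM_PRIORITY) {
            thread.setPriority(Thread.NORM_PRIORITY);
        }

        return thread;
    }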
}
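
A thread factory alone does not make pooled tasks see the submitter's context: the JDK copies InheritableThreadLocal values when a thread is constructed, and a pool reuses its workers. The sketch below illustrates the stale-value effect and one common workaround, capturing the value at submission time and installing it around the task. ContextPropagation, CTX, and wrap are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration, not part of the article's classes.
final class ContextPropagation {

    static final InheritableThreadLocal<String> CTX = new InheritableThreadLocal<>();

    /** Captures the submitter's value now and installs it while the task runs. */
    static <T> Callable<T> wrap(Callable<T> task) {
        final String captured = CTX.get(); // read on the submitting thread
        return () -> {
            String previous = CTX.get();   // whatever the worker held before
            CTX.set(captured);
            try {
                return task.call();
            } finally {
                if (previous == null) {
                    CTX.remove();
                } else {
                    CTX.set(previous);
                }
            }
        };
    }

    /** Returns { value seen by a plain task, value seen by a wrapped task }. */
    static String[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> read = CTX::get;
        try {
            CTX.set("request-A");
            pool.submit(read).get();       // the single worker is created here and inherits request-A
            CTX.set("request-B");
            String plain = pool.submit(read).get();         // reused worker still holds request-A
            String wrapped = pool.submit(wrap(read)).get(); // wrapper installs request-B
            return new String[] {plain, wrapped};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CTX.remove();
            pool.shutdownNow();
        }
    }
}
```

With a single worker, the plain task reports request-A even though the submitter has moved on to request-B, while the wrapped task reports request-B. Libraries such as Alibaba's TransmittableThreadLocal package the same capture-and-replay idea.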

3.3 Inheritable Rejection Policy

/**
 * Inheritable rejection policy.
 * Counts rejected tasks and falls back to running them on the submitting thread,
 * where the submitter's thread-local values are directly visible.
 */
@Slf4j
public class InheritableRejectedExecutionHandler implements RejectedExecutionHandler {

    private final String poolName;
    private final AtomicLong rejectedCount = new AtomicLong(0);

    public InheritableRejectedExecutionHandler(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejectedCount.incrementAndGet();

        log.warn("Task rejected: {} - pool: {}, active threads: {}, queue size: {}",
                r.getClass().getSimpleName(),
                poolName,
                executor.getActiveCount(),
                executor.getQueue().size());

        // Try to run the task on the calling thread instead
        if (!executor.isShutdown()) {
            try {
                r.run();
            } catch (Exception e) {
                log.error("Failed to run rejected task on the calling thread", e);
            }
        }
    }

    public long getRejectedCount() {
        return rejectedCount.get();
    }
}
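
Running a rejected task on the submitting thread is what the JDK's ThreadPoolExecutor.CallerRunsPolicy already does, so a counting handler can delegate to it. The following is a sketch under that assumption; CountingCallerRunsHandler and its demo are hypothetical and use a deliberately tiny pool to force one rejection.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical variant of the handler above, built on the JDK policy.
final class CountingCallerRunsHandler implements RejectedExecutionHandler {

    private final AtomicLong rejected = new AtomicLong();
    private final RejectedExecutionHandler delegate = new ThreadPoolExecutor.CallerRunsPolicy();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
        delegate.rejectedExecution(r, executor); // runs r on the caller unless the pool is shut down
    }

    long getRejectedCount() {
        return rejected.get();
    }

    /** One worker and no queue capacity: the second task must be rejected. */
    static long demo() {
        CountingCallerRunsHandler handler = new CountingCallerRunsHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> { });                   // rejected, so it runs on this thread
            return handler.getRejectedCount();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Caller-runs also acts as back-pressure: while the submitter is busy running the rejected task, it cannot submit more work.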

4. Performance Monitoring and Configuration Management

4.1 Performance Monitor

/**
* Performance monitor.
* Tracks performance metrics for InheritableThreadLocal operations.
*/
@Component
@Slf4j
public class PerformanceMonitor {

private final Map<String, PerformanceMetrics> metricsMap = new ConcurrentHashMap<>();
private final AtomicLong totalOperations = new AtomicLong(0);
private final AtomicLong totalInheritanceOperations = new AtomicLong(0);
private final AtomicLong totalLatency = new AtomicLong(0);

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

/**
* Record one operation.
*/
public void recordOperation(String name, String operation) {
try {
totalOperations.incrementAndGet();

PerformanceMetrics metrics = metricsMap.computeIfAbsent(name, k ->
PerformanceMetrics.builder()
.name(k)
.setCount(0)
.getCount(0)
.removeCount(0)
.inheritanceCount(0)
.totalLatency(0)
.averageLatency(0)
.maxLatency(0)
.minLatency(Long.MAX_VALUE)
.lastOperationTime(System.currentTimeMillis())
.build()
);

long startTime = System.nanoTime();

// PerformanceMetrics is a plain mutable object, so guard the
// read-modify-write sequence below against concurrent callers
synchronized (metrics) {
// Update the counters
updateMetrics(metrics, operation);

// Note: this interval covers only the bookkeeping above, not the
// underlying set/get/remove call on the InheritableThreadLocal
long latency = System.nanoTime() - startTime;

totalLatency.addAndGet(latency);
metrics.setTotalLatency(metrics.getTotalLatency() + latency);

// An unrecognized operation name leaves every counter at zero; avoid dividing by zero
long operationCount = getTotalOperationCount(metrics);
if (operationCount > 0) {
metrics.setAverageLatency(metrics.getTotalLatency() / operationCount);
}
metrics.setMaxLatency(Math.max(metrics.getMaxLatency(), latency));
metrics.setMinLatency(Math.min(metrics.getMinLatency(), latency));
metrics.setLastOperationTime(System.currentTimeMillis());
}

} catch (Exception e) {
log.error("Failed to record operation: {} - {}", name, operation, e);
}
}

/**
* Record one inheritance operation.
*/
public void recordInheritance(String name, Object value) {
try {
totalInheritanceOperations.incrementAndGet();

PerformanceMetrics metrics = metricsMap.computeIfAbsent(name, k ->
PerformanceMetrics.builder()
.name(k)
.setCount(0)
.getCount(0)
.removeCount(0)
.inheritanceCount(0)
.totalLatency(0)
.averageLatency(0)
.maxLatency(0)
.minLatency(Long.MAX_VALUE)
.lastOperationTime(System.currentTimeMillis())
.build()
);

// Guard the increment: several child threads may be created at the same time
synchronized (metrics) {
metrics.setInheritanceCount(metrics.getInheritanceCount() + 1);
metrics.setLastOperationTime(System.currentTimeMillis());
}

} catch (Exception e) {
log.error("Failed to record inheritance operation: {}", name, e);
}
}

/**
* 更新指标
*/
private void updateMetrics(PerformanceMetrics metrics, String operation) {
switch (operation) {
case "set":
metrics.setSetCount(metrics.getSetCount() + 1);
break;
case "get":
metrics.setGetCount(metrics.getGetCount() + 1);
break;
case "remove":
metrics.setRemoveCount(metrics.getRemoveCount() + 1);
break;
}
}

/**
* 获取总操作次数
*/
private long getTotalOperationCount(PerformanceMetrics metrics) {
return metrics.getSetCount() + metrics.getGetCount() + metrics.getRemoveCount() + metrics.getInheritanceCount();
}

/**
* 获取性能指标
*/
public Map<String, PerformanceMetrics> getMetrics() {
return new HashMap<>(metricsMap);
}

/**
* 获取总体性能指标
*/
public OverallPerformanceMetrics getOverallMetrics() {
try {
long totalOps = totalOperations.get();
long totalInheritanceOps = totalInheritanceOperations.get();
long totalLat = totalLatency.get();
double averageLatency = totalOps > 0 ? (double) totalLat / totalOps : 0;

return OverallPerformanceMetrics.builder()
.totalOperations(totalOps)
.totalInheritanceOperations(totalInheritanceOps)
.totalLatency(totalLat)
.averageLatency(averageLatency)
.inheritableThreadLocalCount(metricsMap.size())
.build();

} catch (Exception e) {
log.error("获取总体性能指标失败", e);
throw new PerformanceException("获取总体性能指标失败: " + e.getMessage());
}
}

/**
* 启动性能监控
*/
@PostConstruct
public void startMonitoring() {
scheduler.scheduleAtFixedRate(() -> {
try {
logPerformanceMetrics();
} catch (Exception e) {
log.error("性能监控失败", e);
}
}, 0, 60, TimeUnit.SECONDS);
}

/**
* 记录性能指标
*/
private void logPerformanceMetrics() {
try {
OverallPerformanceMetrics overallMetrics = getOverallMetrics();

log.info("InheritableThreadLocal性能指标 - 总操作数: {}, 继承操作数: {}, 平均延迟: {}ns, InheritableThreadLocal数量: {}",
overallMetrics.getTotalOperations(),
overallMetrics.getTotalInheritanceOperations(),
overallMetrics.getAverageLatency(),
overallMetrics.getInheritableThreadLocalCount());

// 记录每个InheritableThreadLocal的性能指标
for (PerformanceMetrics metrics : metricsMap.values()) {
log.debug("InheritableThreadLocal性能指标 - {}: 设置={}, 获取={}, 移除={}, 继承={}, 平均延迟={}ns",
metrics.getName(),
metrics.getSetCount(),
metrics.getGetCount(),
metrics.getRemoveCount(),
metrics.getInheritanceCount(),
metrics.getAverageLatency());
}

} catch (Exception e) {
log.error("记录性能指标失败", e);
}
}

/**
* 清理性能监控器
*/
@PreDestroy
public void cleanup() {
try {
scheduler.shutdown();
metricsMap.clear();

log.info("性能监控器清理完成");

} catch (Exception e) {
log.error("性能监控器清理失败", e);
}
}
}
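
The recordOperation method above starts its timer after the thread-local call has already happened, so the recorded latency reflects only the bookkeeping. One way to time the operation itself is to pass it in as a callback. The sketch below assumes a hypothetical OperationTimer class and uses LongAdder counters, which tolerate concurrent updates without explicit locking.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

// Hypothetical alternative to the article's PerformanceMonitor bookkeeping.
final class OperationTimer {

    /** Per-name counters; LongAdder keeps concurrent increments cheap. */
    static final class Stats {
        final LongAdder count = new LongAdder();
        final LongAdder totalNanos = new LongAdder();

        double averageNanos() {
            long n = count.sum();
            return n == 0 ? 0.0 : (double) totalNanos.sum() / n;
        }
    }

    private final Map<String, Stats> stats = new ConcurrentHashMap<>();

    /** Runs the operation, records its duration under the given name, and returns its result. */
    <T> T time(String name, Supplier<T> operation) {
        long start = System.nanoTime();
        try {
            return operation.get();
        } finally {
            Stats s = stats.computeIfAbsent(name, k -> new Stats());
            s.count.increment();
            s.totalNanos.add(System.nanoTime() - start);
        }
    }

    long count(String name) {
        Stats s = stats.get(name);
        return s == null ? 0 : s.count.sum();
    }

    double averageNanos(String name) {
        Stats s = stats.get(name);
        return s == null ? 0.0 : s.averageNanos();
    }
}
```

A caller would wrap the real access, for example `timer.time("userContext.get", threadLocal::get)`, so that the measured interval includes the operation being monitored.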

4.2 InheritableThreadLocal Configuration

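/**
* InheritableThreadLocal configuration.
*/
@Configuration
@EnableConfigurationProperties(InheritableThreadLocalProperties.class)
@Slf4j
public class InheritableThreadLocalConfiguration {

private final InheritableThreadLocalProperties properties;

// A final field needs a constructor; Spring injects the bound properties here
public InheritableThreadLocalConfiguration(InheritableThreadLocalProperties properties) {
this.properties = properties;
}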

/**
* InheritableThreadLocal管理器
*/
@Bean
@ConditionalOnMissingBean
public InheritableThreadLocalManager inheritableThreadLocalManager(
ContextInheritanceManager contextInheritanceManager,
AsyncTaskManager asyncTaskManager,
ThreadPoolManager threadPoolManager,
PerformanceMonitor performanceMonitor) {

return new InheritableThreadLocalManager(
contextInheritanceManager,
asyncTaskManager,
threadPoolManager,
performanceMonitor
);
}

/**
* 上下文继承管理器
*/
@Bean
@ConditionalOnMissingBean
public ContextInheritanceManager contextInheritanceManager() {
return new ContextInheritanceManager();
}

/**
* 异步任务管理器
*/
@Bean
@ConditionalOnMissingBean
public AsyncTaskManager asyncTaskManager() {
return new AsyncTaskManager();
}

/**
* 线程池管理器
*/
@Bean
@ConditionalOnMissingBean
public ThreadPoolManager threadPoolManager() {
return new ThreadPoolManager();
}

/**
* 性能监控器
*/
@Bean
@ConditionalOnMissingBean
public PerformanceMonitor performanceMonitor() {
return new PerformanceMonitor();
}

/**
* InheritableThreadLocal管理端点
*/
@Bean
@ConditionalOnProperty(name = "inheritable-thread-local.management.endpoints.enabled", havingValue = "true", matchIfMissing = true)
public InheritableThreadLocalManagementEndpoint inheritableThreadLocalManagementEndpoint(
InheritableThreadLocalManager inheritableThreadLocalManager) {

return new InheritableThreadLocalManagementEndpoint(inheritableThreadLocalManager);
}
}

4.3 InheritableThreadLocal Configuration Properties

/**
 * InheritableThreadLocal configuration properties.
 */
@ConfigurationProperties(prefix = "inheritable-thread-local")
@Data
@Slf4j
public class InheritableThreadLocalProperties {

    /** Whether InheritableThreadLocal management is enabled. */
    private boolean enabled = true;

    /** Whether context inheritance is enabled. */
    private boolean enableContextInheritance = true;

    /** Whether async task management is enabled. */
    private boolean enableAsyncTaskManagement = true;

    /** Whether thread pool management is enabled. */
    private boolean enableThreadPoolManagement = true;

    /** Whether performance monitoring is enabled. */
    private boolean enablePerformanceMonitoring = true;

    /** Context inheritance settings. */
    private ContextInheritanceConfig contextInheritance = new ContextInheritanceConfig();

    /** Async task settings. */
    private AsyncTaskConfig asyncTask = new AsyncTaskConfig();

    /** Thread pool settings. */
    private ThreadPoolConfig threadPool = new ThreadPoolConfig();

    /** Performance monitoring settings. */
    private PerformanceMonitoringConfig performanceMonitoring = new PerformanceMonitoringConfig();

    /**
     * Context inheritance settings.
     */
    @Data
    public static class ContextInheritanceConfig {
        /** Whether the user context is inherited. */
        private boolean enableUserContextInheritance = true;

        /** Whether the request context is inherited. */
        private boolean enableRequestContextInheritance = true;

        /** Whether the business context is inherited. */
        private boolean enableBusinessContextInheritance = true;

        /** Whether the security context is inherited. */
        private boolean enableSecurityContextInheritance = true;

        /** Whether the log context is inherited. */
        private boolean enableLogContextInheritance = true;

        /** Inheritance strategy. */
        private InheritanceStrategy inheritanceStrategy = InheritanceStrategy.DEEP_COPY;

        /** Inheritance timeout, in milliseconds. */
        private long inheritanceTimeout = 5000;
    }

    /**
     * Async task settings.
     */
    @Data
    public static class AsyncTaskConfig {
        /** Whether async task management is enabled. */
        private boolean enabled = true;

        /** Default thread pool size. */
        private int defaultPoolSize = 10;

        /** Task timeout, in seconds. */
        private int taskTimeout = 30;

        /** Maximum number of retries. */
        private int maxRetries = 3;

        /** Whether task monitoring is enabled. */
        private boolean enableTaskMonitoring = true;

        /** Monitoring interval, in seconds. */
        private int monitoringInterval = 60;
    }

    /**
     * Thread pool settings.
     */
    @Data
    public static class ThreadPoolConfig {
        /** Whether thread pool management is enabled. */
        private boolean enabled = true;

        /** Default core pool size. */
        private int defaultCorePoolSize = 5;

        /** Default maximum pool size. */
        private int defaultMaximumPoolSize = 20;

        /** Idle thread keep-alive time, in seconds. */
        private long keepAliveTime = 60;

        /** Work queue capacity. */
        private int queueCapacity = 1000;

        /** Whether thread pool monitoring is enabled. */
        private boolean enableThreadPoolMonitoring = true;

        /** Monitoring interval, in seconds. */
        private int monitoringInterval = 30;
    }

    /**
     * Performance monitoring settings.
     */
    @Data
    public static class PerformanceMonitoringConfig {
        /** Whether performance monitoring is enabled. */
        private boolean enabled = true;

        /** Monitoring interval, in seconds. */
        private int monitoringInterval = 60;

        /** Whether latency monitoring is enabled. */
        private boolean enableLatencyMonitoring = true;

        /** Whether inheritance monitoring is enabled. */
        private boolean enableInheritanceMonitoring = true;

        /** Latency threshold, in nanoseconds. */
        private long latencyThreshold = 1000000; // 1 ms
    }

    /**
     * Inheritance strategies.
     */
    public enum InheritanceStrategy {
        /** The child receives a shallow copy of the parent's value. */
        SHALLOW_COPY,

        /** The child receives a deep copy of the parent's value. */
        DEEP_COPY,

        /** The child shares the parent's object by reference. */
        REFERENCE_PASS
    }
}
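
The InheritanceStrategy values above name a choice that the JDK exposes through InheritableThreadLocal.childValue: by default the child receives the same reference as the parent, and overriding childValue lets the child receive a copy instead. The sketch below shows the shallow-copy case for a map-valued context. CopyingContext is a hypothetical class, and how the article's manager maps the enum onto this hook is not shown in the source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a shallow-copy inheritance strategy.
final class CopyingContext extends InheritableThreadLocal<Map<String, Object>> {

    /** Called by the JDK while the child thread is constructed. */
    @Override
    protected Map<String, Object> childValue(Map<String, Object> parentValue) {
        // Shallow copy: the child gets its own map, but the entries are shared objects
        return parentValue == null ? null : new HashMap<>(parentValue);
    }

    /** Returns { child saw the parent's entry, parent's map untouched by the child }. */
    static boolean[] demo() {
        CopyingContext ctx = new CopyingContext();
        Map<String, Object> parent = new HashMap<>();
        parent.put("orderId", "ORDER-12345");
        ctx.set(parent);

        boolean[] result = new boolean[2];
        Thread child = new Thread(() -> {
            Map<String, Object> inherited = ctx.get();
            result[0] = "ORDER-12345".equals(inherited.get("orderId"));
            inherited.put("childOnly", true); // mutates the child's copy only
        });
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        result[1] = !parent.containsKey("childOnly");
        ctx.remove();
        return result;
    }
}
```

A deep-copy strategy would additionally clone each value inside childValue, and reference passing corresponds to the default behavior with no override.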

5. InheritableThreadLocal Management Endpoint

5.1 Management Endpoint Implementation

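/**
* InheritableThreadLocal management endpoint.
* Exposes a RESTful API for managing InheritableThreadLocal instances.
*/
@RestController
@RequestMapping("/api/inheritable-thread-local")
@Slf4j
public class InheritableThreadLocalManagementEndpoint {

private final InheritableThreadLocalManager inheritableThreadLocalManager;

// Matches the constructor call in InheritableThreadLocalConfiguration
public InheritableThreadLocalManagementEndpoint(InheritableThreadLocalManager inheritableThreadLocalManager) {
this.inheritableThreadLocalManager = inheritableThreadLocalManager;
}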

/**
* 创建InheritableThreadLocal
*/
@PostMapping("/create")
public ResponseEntity<InheritableThreadLocalInfo> createInheritableThreadLocal(
@Valid @RequestBody CreateInheritableThreadLocalRequest request) {

try {
InheritableThreadLocal<?> inheritableThreadLocal = inheritableThreadLocalManager.createInheritableThreadLocal(
request.getName(), request.getType());

InheritableThreadLocalInfo info = InheritableThreadLocalInfo.builder()
.name(request.getName())
.type(request.getType().getSimpleName())
.hasValue(inheritableThreadLocal.get() != null)
.build();

return ResponseEntity.ok(info);
} catch (Exception e) {
log.error("创建InheritableThreadLocal失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 设置InheritableThreadLocal值
*/
@PostMapping("/set")
public ResponseEntity<Void> setInheritableThreadLocalValue(
@Valid @RequestBody SetInheritableThreadLocalValueRequest request) {

try {
inheritableThreadLocalManager.setValue(request.getName(), request.getValue());
return ResponseEntity.ok().build();
} catch (Exception e) {
log.error("设置InheritableThreadLocal值失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 获取InheritableThreadLocal值
*/
@GetMapping("/get/{name}")
public ResponseEntity<Object> getInheritableThreadLocalValue(@PathVariable String name) {
try {
Object value = inheritableThreadLocalManager.getValue(name);
return ResponseEntity.ok(value);
} catch (Exception e) {
log.error("获取InheritableThreadLocal值失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 移除InheritableThreadLocal值
*/
@DeleteMapping("/remove/{name}")
public ResponseEntity<Void> removeInheritableThreadLocalValue(@PathVariable String name) {
try {
inheritableThreadLocalManager.removeValue(name);
return ResponseEntity.ok().build();
} catch (Exception e) {
log.error("移除InheritableThreadLocal值失败", e);
return ResponseEntity.badRequest().build();
}
}

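/**
* Execute an async task.
*/
@PostMapping("/execute-async")
public CompletableFuture<ResponseEntity<Object>> executeAsyncTask(
@Valid @RequestBody ExecuteAsyncTaskRequest request) {

try {
CompletableFuture<Object> future = inheritableThreadLocalManager.executeAsyncTask(
request.getTaskName(), request.getTask());
// Return the future itself so that Spring MVC completes the response when the task
// finishes, instead of trying to serialize a CompletableFuture as the response body
return future.thenApply(result -> ResponseEntity.ok(result));
} catch (Exception e) {
log.error("Failed to execute async task", e);
return CompletableFuture.completedFuture(ResponseEntity.badRequest().build());
}
}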

/**
* 创建自定义线程池
*/
@PostMapping("/create-thread-pool")
public ResponseEntity<ThreadPoolInfo> createCustomThreadPool(
@Valid @RequestBody CreateThreadPoolRequest request) {

try {
ExecutorService threadPool = inheritableThreadLocalManager.createCustomThreadPool(
request.getPoolName(), request.getCorePoolSize(), request.getMaximumPoolSize());

ThreadPoolInfo info = ThreadPoolInfo.builder()
.name(request.getPoolName())
.corePoolSize(request.getCorePoolSize())
.maximumPoolSize(request.getMaximumPoolSize())
.currentPoolSize(0)
.activeCount(0)
.taskCount(0)
.completedTaskCount(0)
.build();

return ResponseEntity.ok(info);
} catch (Exception e) {
log.error("创建自定义线程池失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 清理所有InheritableThreadLocal
*/
@PostMapping("/clear-all")
public ResponseEntity<Void> clearAllInheritableThreadLocals() {
try {
inheritableThreadLocalManager.clearAll();
return ResponseEntity.ok().build();
} catch (Exception e) {
log.error("清理所有InheritableThreadLocal失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 获取InheritableThreadLocal统计信息
*/
@GetMapping("/statistics")
public ResponseEntity<InheritableThreadLocalStatistics> getInheritableThreadLocalStatistics() {
try {
InheritableThreadLocalStatistics statistics = inheritableThreadLocalManager.getStatistics();
return ResponseEntity.ok(statistics);
} catch (Exception e) {
log.error("获取InheritableThreadLocal统计信息失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 获取性能指标
*/
@GetMapping("/performance")
public ResponseEntity<OverallPerformanceMetrics> getPerformanceMetrics() {
try {
OverallPerformanceMetrics metrics = inheritableThreadLocalManager.getPerformanceMetrics();
return ResponseEntity.ok(metrics);
} catch (Exception e) {
log.error("获取性能指标失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* Export the InheritableThreadLocal report.
*/
@GetMapping("/export")
public ResponseEntity<byte[]> exportInheritableThreadLocalReport(
@RequestParam(defaultValue = "pdf") String format) {

try {
byte[] report = inheritableThreadLocalManager.exportInheritableThreadLocalReport(format);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
// setContentDispositionFormData would emit a "form-data" disposition;
// a file download needs "attachment"
headers.setContentDisposition(ContentDisposition.builder("attachment")
.filename("inheritable-thread-local-report." + format)
.build());
return ResponseEntity.ok().headers(headers).body(report);
} catch (Exception e) {
log.error("Failed to export InheritableThreadLocal report", e);
return ResponseEntity.badRequest().build();
}
}
}

6. Practical Applications and Best Practices

6.1 Parent-Child Thread Context Propagation in Practice

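/**
* Worked example of parent-child thread context propagation.
*/
@RestController
@RequestMapping("/api/context-inheritance")
@Slf4j
public class ContextInheritanceExample {

private final InheritableThreadLocalManager inheritableThreadLocalManager;

// Constructor injection; the final field cannot be left unassigned
public ContextInheritanceExample(InheritableThreadLocalManager inheritableThreadLocalManager) {
this.inheritableThreadLocalManager = inheritableThreadLocalManager;
}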

/**
* 演示父子线程上下文传递
*/
@PostMapping("/demonstrate-inheritance")
public ResponseEntity<InheritanceDemoResult> demonstrateInheritance() {
try {
// 1. 在父线程中设置上下文
UserContext userContext = UserContext.builder()
.userId("12345")
.username("testuser")
.email("test@example.com")
.roles(Arrays.asList("USER", "ADMIN"))
.permissions(Arrays.asList("READ", "WRITE"))
.loginTime(System.currentTimeMillis())
.build();

inheritableThreadLocalManager.setValue("userContext", userContext);

// 2. 创建异步任务
CompletableFuture<String> future = inheritableThreadLocalManager.executeAsyncTask(
"inheritance-demo", () -> {
// 在子线程中获取上下文
UserContext inheritedContext = inheritableThreadLocalManager.getValue("userContext");

if (inheritedContext != null) {
log.info("子线程成功继承用户上下文: {}", inheritedContext.getUserId());
return "继承成功: " + inheritedContext.getUsername();
} else {
log.warn("子线程未能继承用户上下文");
return "继承失败";
}
});

// 3. 等待任务完成
String result = future.get(10, TimeUnit.SECONDS);

InheritanceDemoResult demoResult = InheritanceDemoResult.builder()
.parentThreadId(Thread.currentThread().getId())
.parentThreadName(Thread.currentThread().getName())
.result(result)
.success(true)
.build();

return ResponseEntity.ok(demoResult);

} catch (Exception e) {
log.error("父子线程上下文传递演示失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 演示批量异步任务上下文传递
*/
@PostMapping("/demonstrate-batch-inheritance")
public ResponseEntity<BatchInheritanceDemoResult> demonstrateBatchInheritance() {
try {
// 1. 在父线程中设置上下文
RequestContext requestContext = RequestContext.builder()
.requestId(UUID.randomUUID().toString())
.requestTime(System.currentTimeMillis())
.clientIp("192.168.1.1")
.userAgent("Mozilla/5.0")
.build();

inheritableThreadLocalManager.setValue("requestContext", requestContext);

// 2. 创建批量异步任务
List<Callable<String>> tasks = Arrays.asList(
() -> {
RequestContext context = inheritableThreadLocalManager.getValue("requestContext");
return "任务1: " + (context != null ? context.getRequestId() : "无上下文");
},
() -> {
RequestContext context = inheritableThreadLocalManager.getValue("requestContext");
return "任务2: " + (context != null ? context.getRequestId() : "无上下文");
},
() -> {
RequestContext context = inheritableThreadLocalManager.getValue("requestContext");
return "任务3: " + (context != null ? context.getRequestId() : "无上下文");
}
);

CompletableFuture<List<String>> future = inheritableThreadLocalManager.executeBatchAsyncTasks(
"batch-inheritance-demo", tasks);

// 3. 等待任务完成
List<String> results = future.get(10, TimeUnit.SECONDS);

BatchInheritanceDemoResult demoResult = BatchInheritanceDemoResult.builder()
.parentThreadId(Thread.currentThread().getId())
.parentThreadName(Thread.currentThread().getName())
.taskResults(results)
.success(true)
.build();

return ResponseEntity.ok(demoResult);

} catch (Exception e) {
log.error("批量异步任务上下文传递演示失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 演示自定义线程池上下文传递
*/
@PostMapping("/demonstrate-thread-pool-inheritance")
public ResponseEntity<ThreadPoolInheritanceDemoResult> demonstrateThreadPoolInheritance() {
try {
// 1. Set the context on the parent thread
Map<String, Object> businessContext = new HashMap<>();
businessContext.put("orderId", "ORDER-12345");
businessContext.put("amount", 100.0);
businessContext.put("currency", "USD");

inheritableThreadLocalManager.setValue("businessContext", businessContext);

// 2. 创建自定义线程池
ExecutorService threadPool = inheritableThreadLocalManager.createCustomThreadPool(
"inheritance-demo-pool", 2, 4);

// 3. 在线程池中执行任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Map<String, Object> context = inheritableThreadLocalManager.getValue("businessContext");

if (context != null) {
log.info("线程池任务成功继承业务上下文: {}", context.get("orderId"));
return "继承成功: " + context.get("orderId");
} else {
log.warn("线程池任务未能继承业务上下文");
return "继承失败";
}
}, threadPool);

// 4. 等待任务完成
String result = future.get(10, TimeUnit.SECONDS);

ThreadPoolInheritanceDemoResult demoResult = ThreadPoolInheritanceDemoResult.builder()
.parentThreadId(Thread.currentThread().getId())
.parentThreadName(Thread.currentThread().getName())
.threadPoolName("inheritance-demo-pool")
.result(result)
.success(true)
.build();

return ResponseEntity.ok(demoResult);

} catch (Exception e) {
log.error("自定义线程池上下文传递演示失败", e);
return ResponseEntity.badRequest().build();
}
}
}
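
The example endpoints above set context values on the request thread and never remove them. Servlet containers reuse request threads, so a value left behind can surface in an unrelated request. A minimal sketch of scoping a value with try/finally follows; ScopedContext, REQUEST_ID, and withRequestId are hypothetical names, and the sketch uses a bare InheritableThreadLocal rather than the article's manager.

```java
import java.util.function.Supplier;

// Hypothetical helper showing set/remove pairing around a unit of work.
final class ScopedContext {

    static final InheritableThreadLocal<String> REQUEST_ID = new InheritableThreadLocal<>();

    /** Installs the id for the duration of body and always clears it afterwards. */
    static <T> T withRequestId(String id, Supplier<T> body) {
        REQUEST_ID.set(id);
        try {
            return body.get();
        } finally {
            REQUEST_ID.remove(); // runs on success and on exception alike
        }
    }
}
```

In a Spring MVC application the same pairing is often placed in a servlet filter or a HandlerInterceptor, so that individual handlers do not have to remember it.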

6.2 InheritableThreadLocal Best Practices

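/**
* InheritableThreadLocal best practices.
*/
@Component
@Slf4j
public class InheritableThreadLocalBestPractices {

private final InheritableThreadLocalManager inheritableThreadLocalManager;

// Constructor injection; the final field cannot be left unassigned
public InheritableThreadLocalBestPractices(InheritableThreadLocalManager inheritableThreadLocalManager) {
this.inheritableThreadLocalManager = inheritableThreadLocalManager;
}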

/**
* InheritableThreadLocal最佳实践指南
*/
public void demonstrateBestPractices() {
log.info("=== InheritableThreadLocal最佳实践指南 ===");

// 1. 正确使用InheritableThreadLocal
demonstrateCorrectUsage();

// 2. 上下文继承策略
demonstrateInheritanceStrategies();

// 3. 异步任务处理
demonstrateAsyncTaskHandling();

// 4. 线程池管理
demonstrateThreadPoolManagement();

// 5. 性能优化
demonstratePerformanceOptimization();
}

/**
* 正确使用InheritableThreadLocal
*/
private void demonstrateCorrectUsage() {
log.info("--- 正确使用InheritableThreadLocal ---");

try {
// 1. 使用静态变量
InheritableThreadLocal<String> staticInheritableThreadLocal = inheritableThreadLocalManager.createInheritableThreadLocal(
"staticInheritableThreadLocal", String.class);

// 2. 在父线程中设置值
staticInheritableThreadLocal.set("父线程的值");

// 3. 创建子线程
CompletableFuture<String> future = inheritableThreadLocalManager.executeAsyncTask(
"correct-usage-demo", () -> {
// 在子线程中获取值
String value = staticInheritableThreadLocal.get();
log.info("子线程获取到的值: {}", value);
return value;
});

// 4. 等待子线程完成
String result = future.get(5, TimeUnit.SECONDS);
log.info("子线程返回结果: {}", result);

// 5. 清理资源
staticInheritableThreadLocal.remove();

log.info("InheritableThreadLocal使用示例完成");

} catch (Exception e) {
log.error("InheritableThreadLocal使用示例失败", e);
}
}

/**
* Context inheritance strategies
*/
private void demonstrateInheritanceStrategies() {
log.info("--- Context inheritance strategies ---");

try {
// 1. User context inheritance
UserContext userContext = UserContext.builder()
.userId("12345")
.username("testuser")
.email("test@example.com")
.roles(Arrays.asList("USER", "ADMIN"))
.permissions(Arrays.asList("READ", "WRITE"))
.loginTime(System.currentTimeMillis())
.build();

inheritableThreadLocalManager.setValue("userContext", userContext);

// 2. Request context inheritance
RequestContext requestContext = RequestContext.builder()
.requestId(UUID.randomUUID().toString())
.requestTime(System.currentTimeMillis())
.clientIp("192.168.1.1")
.userAgent("Mozilla/5.0")
.build();

inheritableThreadLocalManager.setValue("requestContext", requestContext);

// 3. Business context inheritance
Map<String, Object> businessContext = new HashMap<>();
businessContext.put("orderId", "ORDER-12345");
businessContext.put("amount", 100.0);
businessContext.put("currency", "USD");

inheritableThreadLocalManager.setValue("businessContext", businessContext);

// 4. Verify the inheritance in a child thread
CompletableFuture<Map<String, Object>> future = inheritableThreadLocalManager.executeAsyncTask(
"inheritance-strategy-demo", () -> {
Map<String, Object> inheritedContext = new HashMap<>();

// Read the inherited user context
UserContext inheritedUserContext = inheritableThreadLocalManager.getValue("userContext");
if (inheritedUserContext != null) {
inheritedContext.put("userContext", inheritedUserContext);
}

// Read the inherited request context
RequestContext inheritedRequestContext = inheritableThreadLocalManager.getValue("requestContext");
if (inheritedRequestContext != null) {
inheritedContext.put("requestContext", inheritedRequestContext);
}

// Read the inherited business context
Map<String, Object> inheritedBusinessContext = inheritableThreadLocalManager.getValue("businessContext");
if (inheritedBusinessContext != null) {
inheritedContext.put("businessContext", inheritedBusinessContext);
}

return inheritedContext;
});

// 5. Wait for the child thread to finish
Map<String, Object> inheritedContext = future.get(5, TimeUnit.SECONDS);
log.info("Context inherited by the child thread: {}", inheritedContext);

// Note: the three values set above stay bound to the calling thread until
// they are explicitly cleared; clear them once the request is finished.
log.info("Context inheritance strategy example finished");

} catch (Exception e) {
log.error("Context inheritance strategy example failed", e);
}
}

/**
* Asynchronous task handling
*/
private void demonstrateAsyncTaskHandling() {
log.info("--- Asynchronous task handling ---");

try {
// 1. Set the context
LogContext logContext = LogContext.builder()
.traceId(UUID.randomUUID().toString())
.spanId(UUID.randomUUID().toString())
.parentSpanId(null)
.build();

inheritableThreadLocalManager.setValue("logContext", logContext);

// 2. Run an asynchronous task with a 5-second timeout
CompletableFuture<String> future = inheritableThreadLocalManager.executeAsyncTaskWithTimeout(
"async-task-demo", () -> {
LogContext context = inheritableThreadLocalManager.getValue("logContext");
if (context != null) {
log.info("Async task read the log context: {}", context.getTraceId());
return "Task finished: " + context.getTraceId();
} else {
return "Task finished: no context";
}
}, 5, TimeUnit.SECONDS);

// 3. Wait for the task; the 10-second wait is longer than the task timeout,
// so the task's own timeout is the one that fires first
String result = future.get(10, TimeUnit.SECONDS);
log.info("Async task result: {}", result);

// 4. Run an asynchronous task with up to 3 retries
CompletableFuture<String> retryFuture = inheritableThreadLocalManager.executeAsyncTaskWithRetry(
"async-task-retry-demo", () -> {
LogContext context = inheritableThreadLocalManager.getValue("logContext");
if (context != null) {
log.info("Retry task read the log context: {}", context.getTraceId());
return "Retry task finished: " + context.getTraceId();
} else {
return "Retry task finished: no context";
}
}, 3);

// 5. Wait for the retry task to finish
String retryResult = retryFuture.get(10, TimeUnit.SECONDS);
log.info("Retry task result: {}", retryResult);

log.info("Asynchronous task handling example finished");

} catch (Exception e) {
log.error("Asynchronous task handling example failed", e);
}
}

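/**
* Thread pool management
*/
private void demonstrateThreadPoolManagement() {
log.info("--- Thread pool management ---");

ExecutorService threadPool = null;
try {
// 1. Set the context
SecurityContext securityContext = SecurityContext.builder()
.principal("testuser")
.authorities(Arrays.asList("ROLE_USER", "ROLE_ADMIN"))
.authenticated(true)
.build();

inheritableThreadLocalManager.setValue("securityContext", securityContext);

// 2. Create a custom thread pool
threadPool = inheritableThreadLocalManager.createCustomThreadPool(
"demo-pool", 2, 4);

// 3. Define the tasks. Pool workers inherit values only at the moment they
// are created, so a reused worker may see a stale context or none at all;
// each task therefore handles the null case.
List<Callable<String>> tasks = Arrays.asList(
() -> {
SecurityContext context = inheritableThreadLocalManager.getValue("securityContext");
return "Task 1: " + (context != null ? context.getPrincipal() : "no context");
},
() -> {
SecurityContext context = inheritableThreadLocalManager.getValue("securityContext");
return "Task 2: " + (context != null ? context.getPrincipal() : "no context");
},
() -> {
SecurityContext context = inheritableThreadLocalManager.getValue("securityContext");
return "Task 3: " + (context != null ? context.getPrincipal() : "no context");
}
);

// threadPool is not passed to this call, so the tasks run on whichever
// executor the manager uses internally. To run them on the custom pool,
// submit them directly, for example with threadPool.invokeAll(tasks).
CompletableFuture<List<String>> future = inheritableThreadLocalManager.executeBatchAsyncTasks(
"thread-pool-demo", tasks);

// 4. Wait for the tasks to finish
List<String> results = future.get(10, TimeUnit.SECONDS);
log.info("Thread pool task results: {}", results);

log.info("Thread pool management example finished");

} catch (Exception e) {
log.error("Thread pool management example failed", e);
} finally {
// 5. Shut the pool down even if a task failed or timed out
if (threadPool != null) {
threadPool.shutdown();
}
}
}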

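/**
* Performance optimization
*/
private void demonstratePerformanceOptimization() {
log.info("--- Performance optimization ---");

// 1. Create the cache holder. Map.class is a Class<Map> (raw type), so it is
// cast here; this assumes createInheritableThreadLocal takes a Class<T>.
@SuppressWarnings("unchecked")
Class<Map<String, Object>> mapType = (Class<Map<String, Object>>) (Class<?>) Map.class;
InheritableThreadLocal<Map<String, Object>> cacheInheritableThreadLocal = inheritableThreadLocalManager.createInheritableThreadLocal(
"cacheInheritableThreadLocal", mapType);

try {
// 2. Set the cache in the parent thread
Map<String, Object> cache = new HashMap<>();
cache.put("key1", "value1");
cache.put("key2", "value2");
cacheInheritableThreadLocal.set(cache);

// 3. Use the cache in a child thread. By default the child inherits a
// reference to the same HashMap, not a copy, so its writes are visible to
// the parent. This is safe here only because the parent blocks on
// future.get() while the child writes; HashMap is not thread-safe.
CompletableFuture<Map<String, Object>> future = inheritableThreadLocalManager.executeAsyncTask(
"performance-optimization-demo", () -> {
Map<String, Object> inheritedCache = cacheInheritableThreadLocal.get();
if (inheritedCache == null) {
inheritedCache = new HashMap<>();
cacheInheritableThreadLocal.set(inheritedCache);
}

// Use the cache
inheritedCache.put("key3", "value3");
inheritedCache.put("key4", "value4");

return inheritedCache;
});

// 4. Wait for the task to finish
Map<String, Object> result = future.get(5, TimeUnit.SECONDS);
log.info("Performance optimization result: {}", result);

log.info("Performance optimization example finished");

} catch (Exception e) {
log.error("Performance optimization example failed", e);
} finally {
// 5. Clean up in finally so the cache is released even on failure
cacheInheritableThreadLocal.remove();
}
}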
}
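
The cache example above hands the child thread a reference to the parent's map. If each child should instead start from its own snapshot, `InheritableThreadLocal.childValue` can be overridden to copy the value at thread creation. The following is a minimal sketch using only the JDK; the class name `CopyOnInheritSketch` and the `CACHE` field are hypothetical and are not part of the article's manager.

```java
import java.util.HashMap;
import java.util.Map;

class CopyOnInheritSketch {

    // Hypothetical holder: childValue runs while the child Thread is being
    // constructed and returns the value the child will start with.
    static final InheritableThreadLocal<Map<String, Object>> CACHE =
            new InheritableThreadLocal<Map<String, Object>>() {
                @Override
                protected Map<String, Object> childValue(Map<String, Object> parentValue) {
                    // Shallow copy: the child gets its own map, the entries are shared.
                    return parentValue == null ? null : new HashMap<>(parentValue);
                }
            };

    // Returns {size of the parent's map, size of the child's map}.
    static int[] run() throws InterruptedException {
        Map<String, Object> parentMap = new HashMap<>();
        parentMap.put("key1", "value1");
        CACHE.set(parentMap);
        int[] sizes = new int[2];
        try {
            Thread child = new Thread(() -> {
                Map<String, Object> inherited = CACHE.get();
                inherited.put("key2", "value2"); // modifies the child's copy only
                sizes[1] = inherited.size();
            });
            child.start();
            child.join(); // join makes the child's write to sizes[1] visible
            sizes[0] = CACHE.get().size();
            return sizes;
        } finally {
            CACHE.remove();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = run();
        System.out.println("parent=" + sizes[0] + ", child=" + sizes[1]); // parent=1, child=2
    }
}
```

The copy is shallow, so mutable objects stored as map values are still shared between the threads; a deep copy would be needed to isolate those as well.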

7. Summary

This article examined the architecture and practical use of InheritableThreadLocal in SpringBoot. The enterprise-grade InheritableThreadLocal management framework built here provides:

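  1. Parent-to-child context passing: values set in the parent thread are copied to a child thread when that child thread is created, so the child inherits the parent's context
  2. Context inheritance management: user, request, business, and other context types can all be inherited
  3. Asynchronous task handling: asynchronous tasks can be executed with their context passed along, including timeout and retry variants
  4. Thread pool management: custom thread pools can be combined with InheritableThreadLocal
  5. Performance monitoring and optimization: monitoring and optimization mechanisms are provided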
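
The thread pool item deserves one caveat: a pooled worker inherits values only once, when the worker thread is created, so later tasks on the same worker do not see the submitter's current value. A common workaround is to capture the value at submission time and install it around the task. The sketch below illustrates this with plain JDK classes; `PooledContextSketch`, `TRACE_ID`, and `withContext` are hypothetical names and do not describe how the article's manager is implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PooledContextSketch {

    // Hypothetical context holder used only for this illustration.
    static final InheritableThreadLocal<String> TRACE_ID = new InheritableThreadLocal<>();

    // Captures the submitter's value now and installs it around the task later.
    static <T> Callable<T> withContext(Callable<T> task) {
        final String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                // Restore the worker's earlier state so nothing leaks to the next task.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Returns {value seen by a plain task, value seen by a wrapped task}.
    static String[] run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<String> readTraceId = TRACE_ID::get;
        try {
            TRACE_ID.set("request-1");
            // The first submit creates the single worker, which inherits "request-1".
            pool.submit(readTraceId).get();

            TRACE_ID.set("request-2");
            String plain = pool.submit(readTraceId).get();                // stale value
            String wrapped = pool.submit(withContext(readTraceId)).get(); // current value
            return new String[] {plain, wrapped};
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = run();
        System.out.println("plain=" + seen[0] + ", wrapped=" + seen[1]);
        // plain=request-1, wrapped=request-2
    }
}
```

The same wrapping idea applies to `Runnable` tasks; the important part is that the value is read on the submitting thread and restored on the worker in a `finally` block.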

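With this architecture, context can be passed between parent and child threads safely and efficiently, and asynchronous task handling and thread pool management build on the same mechanism. As microservice architectures and asynchronous programming continue to spread, building an extensible and maintainable InheritableThreadLocal framework is becoming a core skill for enterprise architects.

In practice, follow InheritableThreadLocal best practices: choose the context inheritance strategy deliberately, clean up values as soon as they are no longer needed, and use monitoring and debugging tools to confirm that the system runs stably. Applied this way, InheritableThreadLocal helps in building high-quality enterprise applications.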
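
One way to make timely cleanup hard to forget is to bind a value for the duration of a try-with-resources block. The sketch below assumes a hypothetical `Scope` helper and `USER` holder; it is an illustration of the pattern, not an API provided by the framework described in this article.

```java
class ScopedContextSketch {

    // Hypothetical context holder used only for this illustration.
    static final InheritableThreadLocal<String> USER = new InheritableThreadLocal<>();

    // Binds a value on construction and restores the previous state on close.
    static final class Scope implements AutoCloseable {
        private final String previous;

        Scope(String value) {
            this.previous = USER.get();
            USER.set(value);
        }

        @Override
        public void close() {
            if (previous == null) {
                USER.remove();
            } else {
                USER.set(previous);
            }
        }
    }

    // Returns {value inside the scope, value after the scope has closed}.
    static String[] run() {
        String inside;
        try (Scope ignored = new Scope("alice")) {
            inside = USER.get();
        }
        return new String[] {inside, USER.get()};
    }

    public static void main(String[] args) {
        String[] values = run();
        System.out.println("inside=" + values[0] + ", after=" + values[1]);
        // inside=alice, after=null
    }
}
```

Because `close()` runs even when the body throws, the value cannot outlive the block, which matters most on pooled threads that are reused across requests.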