第240集SpringBoot TransmittableThreadLocal架构实战:线程池传递、TTL装饰器、跨线程上下文的企业级解决方案

前言

在复杂的多线程应用中,线程池的使用已经成为常态,但传统的ThreadLocal和InheritableThreadLocal都无法很好地支持线程池场景下的上下文传递。TransmittableThreadLocal(TTL)作为阿里巴巴开源的高性能线程本地变量传递框架,能够完美解决线程池场景下的上下文传递问题。基于SpringBoot的TransmittableThreadLocal架构,不仅能够实现线程池间的上下文传递,还能通过TTL装饰器模式提供跨线程的上下文管理能力。随着微服务架构和异步编程的普及,构建可扩展、高性能的TransmittableThreadLocal框架,已成为企业级架构师必须掌握的核心技能。

本文将深入探讨SpringBoot中TransmittableThreadLocal的架构设计与实战应用,从线程池传递到TTL装饰器,从跨线程上下文到性能优化,为企业构建稳定、高效的TransmittableThreadLocal解决方案提供全面的技术指导。

一、SpringBoot TransmittableThreadLocal架构概述与核心原理

1.1 TransmittableThreadLocal架构设计

SpringBoot TransmittableThreadLocal系统采用分层架构设计,通过线程池传递、TTL装饰器、跨线程上下文等技术,实现高效的线程间上下文传递能力。

1.2 核心组件架构

二、企业级TransmittableThreadLocal管理器设计

2.1 TransmittableThreadLocal核心管理器

(以下为 TransmittableThreadLocalManager 的完整实现代码)
/**
 * Enterprise-level TransmittableThreadLocal manager.
 * <p>
 * Keeps a registry of named {@link TransmittableThreadLocal} instances and forwards
 * task/executor decoration to {@link TTLDecoratorManager} and asynchronous execution to
 * {@link AsyncTaskTransmitter}.
 * <p>
 * NOTE(review): wrapped exceptions carry only {@code e.getMessage()}; the original cause is
 * logged but not chained. Chain it if the project exception types offer a
 * {@code (String, Throwable)} constructor.
 */
@Component
@Slf4j
public class TransmittableThreadLocalManager {

    /** Registry of managed thread-locals, keyed by their logical name. */
    private final Map<String, TransmittableThreadLocal<?>> ttlMap = new ConcurrentHashMap<>();
    private final TTLDecoratorManager ttlDecoratorManager;
    private final ThreadPoolTransmitter threadPoolTransmitter;
    private final AsyncTaskTransmitter asyncTaskTransmitter;
    private final PerformanceMonitor performanceMonitor;

    /**
     * Creates the manager with its collaborators.
     * <p>
     * The collaborator fields are {@code final}, so they must be assigned here; without this
     * constructor the class does not compile.
     *
     * @param ttlDecoratorManager   performs context copying and task/executor decoration
     * @param threadPoolTransmitter source of transmission metrics for {@link #getStatistics()}
     * @param asyncTaskTransmitter  runs asynchronous tasks
     * @param performanceMonitor    records per-operation metrics
     */
    public TransmittableThreadLocalManager(TTLDecoratorManager ttlDecoratorManager,
                                           ThreadPoolTransmitter threadPoolTransmitter,
                                           AsyncTaskTransmitter asyncTaskTransmitter,
                                           PerformanceMonitor performanceMonitor) {
        this.ttlDecoratorManager = ttlDecoratorManager;
        this.threadPoolTransmitter = threadPoolTransmitter;
        this.asyncTaskTransmitter = asyncTaskTransmitter;
        this.performanceMonitor = performanceMonitor;
    }

    /**
     * Creates and registers a new instrumented TransmittableThreadLocal.
     *
     * @param name unique registry name
     * @param type value type; its no-arg constructor supplies the initial value
     * @return the newly registered thread-local
     * @throws TransmittableThreadLocalException if the name is already taken or creation fails
     */
    public <T> TransmittableThreadLocal<T> createTransmittableThreadLocal(String name, Class<T> type) {
        try {
            TransmittableThreadLocal<T> ttl = new TransmittableThreadLocal<T>() {
                @Override
                protected T initialValue() {
                    try {
                        // Class.newInstance() is deprecated; go through the no-arg constructor.
                        return type.getDeclaredConstructor().newInstance();
                    } catch (Exception e) {
                        log.error("TransmittableThreadLocal初始值创建失败: {}", name, e);
                        return null;
                    }
                }

                // Declared public: widening is legal when the library method is protected.
                // NOTE(review): newer TTL releases are believed to expose copy(...) as public
                // (TtlCopier) — confirm against the TTL version in use.
                @Override
                public T copy(T parentValue) {
                    try {
                        performanceMonitor.recordCopy(name, parentValue);
                        return ttlDecoratorManager.copyContext(name, parentValue);
                    } catch (Exception e) {
                        log.error("TransmittableThreadLocal复制失败: {}", name, e);
                        // Best effort: fall back to the parent's value (same reference).
                        return parentValue;
                    }
                }

                @Override
                protected void beforeExecute() {
                    try {
                        performanceMonitor.recordBeforeExecute(name);
                        ttlDecoratorManager.beforeExecute(name);
                    } catch (Exception e) {
                        log.error("TransmittableThreadLocal执行前操作失败: {}", name, e);
                    }
                }

                @Override
                protected void afterExecute() {
                    try {
                        performanceMonitor.recordAfterExecute(name);
                        ttlDecoratorManager.afterExecute(name);
                    } catch (Exception e) {
                        log.error("TransmittableThreadLocal执行后操作失败: {}", name, e);
                    }
                }

                @Override
                public void set(T value) {
                    super.set(value);
                    performanceMonitor.recordOperation(name, "set");
                }

                @Override
                public T get() {
                    T value = super.get();
                    performanceMonitor.recordOperation(name, "get");
                    return value;
                }

                @Override
                public void remove() {
                    // Do not call get() first: it would create the initial value just to
                    // discard it and would record a spurious "get" operation.
                    super.remove();
                    performanceMonitor.recordOperation(name, "remove");
                }
            };

            // Atomic registration: a separate containsKey + put would let two concurrent
            // callers both register the same name.
            if (ttlMap.putIfAbsent(name, ttl) != null) {
                throw new TransmittableThreadLocalException("TransmittableThreadLocal已存在: " + name);
            }

            try {
                ttlDecoratorManager.registerTransmittableThreadLocal(name, ttl);
            } catch (RuntimeException e) {
                // Keep the two registries consistent when the second registration fails.
                ttlMap.remove(name, ttl);
                throw e;
            }

            log.info("TransmittableThreadLocal创建成功: {}", name);
            return ttl;

        } catch (Exception e) {
            log.error("TransmittableThreadLocal创建失败: {}", name, e);
            throw new TransmittableThreadLocalException("TransmittableThreadLocal创建失败: " + e.getMessage());
        }
    }

    /**
     * Looks up a registered TransmittableThreadLocal.
     *
     * @param name registry name
     * @return the registered instance, cast to the caller's expected value type (unchecked)
     * @throws TransmittableThreadLocalException if no instance is registered under {@code name}
     */
    @SuppressWarnings("unchecked")
    public <T> TransmittableThreadLocal<T> getTransmittableThreadLocal(String name) {
        TransmittableThreadLocal<?> ttl = ttlMap.get(name);
        if (ttl == null) {
            throw new TransmittableThreadLocalException("TransmittableThreadLocal不存在: " + name);
        }
        return (TransmittableThreadLocal<T>) ttl;
    }

    /**
     * Sets the calling thread's value of the named thread-local.
     *
     * @throws TransmittableThreadLocalException if the name is unknown or setting fails
     */
    public <T> void setValue(String name, T value) {
        try {
            TransmittableThreadLocal<T> ttl = getTransmittableThreadLocal(name);
            ttl.set(value);

            log.debug("TransmittableThreadLocal值设置成功: {} = {}", name, value);

        } catch (Exception e) {
            log.error("TransmittableThreadLocal值设置失败: {}", name, e);
            throw new TransmittableThreadLocalException("TransmittableThreadLocal值设置失败: " + e.getMessage());
        }
    }

    /**
     * Returns the calling thread's value of the named thread-local.
     *
     * @throws TransmittableThreadLocalException if the name is unknown or reading fails
     */
    public <T> T getValue(String name) {
        try {
            TransmittableThreadLocal<T> ttl = getTransmittableThreadLocal(name);
            return ttl.get();

        } catch (Exception e) {
            log.error("TransmittableThreadLocal值获取失败: {}", name, e);
            throw new TransmittableThreadLocalException("TransmittableThreadLocal值获取失败: " + e.getMessage());
        }
    }

    /**
     * Removes the calling thread's value of the named thread-local.
     *
     * @throws TransmittableThreadLocalException if the name is unknown or removal fails
     */
    public <T> void removeValue(String name) {
        try {
            TransmittableThreadLocal<T> ttl = getTransmittableThreadLocal(name);
            ttl.remove();

            log.debug("TransmittableThreadLocal值移除成功: {}", name);

        } catch (Exception e) {
            log.error("TransmittableThreadLocal值移除失败: {}", name, e);
            throw new TransmittableThreadLocalException("TransmittableThreadLocal值移除失败: " + e.getMessage());
        }
    }

    /**
     * Decorates a Runnable via the decorator manager.
     *
     * @param taskName label used in log output
     * @throws TransmittableThreadLocalException if decoration fails
     */
    public Runnable decorateRunnable(String taskName, Runnable runnable) {
        try {
            return ttlDecoratorManager.decorateRunnable(taskName, runnable);

        } catch (Exception e) {
            log.error("装饰Runnable失败: {}", taskName, e);
            throw new TransmittableThreadLocalException("装饰Runnable失败: " + e.getMessage());
        }
    }

    /**
     * Decorates a Callable via the decorator manager.
     *
     * @param taskName label used in log output
     * @throws TransmittableThreadLocalException if decoration fails
     */
    public <T> Callable<T> decorateCallable(String taskName, Callable<T> callable) {
        try {
            return ttlDecoratorManager.decorateCallable(taskName, callable);

        } catch (Exception e) {
            log.error("装饰Callable失败: {}", taskName, e);
            throw new TransmittableThreadLocalException("装饰Callable失败: " + e.getMessage());
        }
    }

    /**
     * Decorates an Executor via the decorator manager.
     *
     * @param executorName label used in log output
     * @throws TransmittableThreadLocalException if decoration fails
     */
    public Executor decorateExecutor(String executorName, Executor executor) {
        try {
            return ttlDecoratorManager.decorateExecutor(executorName, executor);

        } catch (Exception e) {
            log.error("装饰Executor失败: {}", executorName, e);
            throw new TransmittableThreadLocalException("装饰Executor失败: " + e.getMessage());
        }
    }

    /**
     * Decorates an ExecutorService via the decorator manager.
     *
     * @param executorName label used in log output
     * @throws TransmittableThreadLocalException if decoration fails
     */
    public ExecutorService decorateExecutorService(String executorName, ExecutorService executorService) {
        try {
            return ttlDecoratorManager.decorateExecutorService(executorName, executorService);

        } catch (Exception e) {
            log.error("装饰ExecutorService失败: {}", executorName, e);
            throw new TransmittableThreadLocalException("装饰ExecutorService失败: " + e.getMessage());
        }
    }

    /**
     * Decorates a ScheduledExecutorService via the decorator manager.
     *
     * @param executorName label used in log output
     * @throws TransmittableThreadLocalException if decoration fails
     */
    public ScheduledExecutorService decorateScheduledExecutorService(String executorName, ScheduledExecutorService scheduledExecutorService) {
        try {
            return ttlDecoratorManager.decorateScheduledExecutorService(executorName, scheduledExecutorService);

        } catch (Exception e) {
            log.error("装饰ScheduledExecutorService失败: {}", executorName, e);
            throw new TransmittableThreadLocalException("装饰ScheduledExecutorService失败: " + e.getMessage());
        }
    }

    /**
     * Runs a task asynchronously through the async task transmitter.
     *
     * @throws TransmittableThreadLocalException if the task cannot be submitted
     */
    public <T> CompletableFuture<T> executeAsyncTask(String taskName, Callable<T> task) {
        try {
            return asyncTaskTransmitter.executeAsyncTask(taskName, task);

        } catch (Exception e) {
            log.error("执行异步任务失败: {}", taskName, e);
            throw new TransmittableThreadLocalException("执行异步任务失败: " + e.getMessage());
        }
    }

    /**
     * Runs a task asynchronously with a timeout through the async task transmitter.
     *
     * @throws TransmittableThreadLocalException if the task cannot be submitted
     */
    public <T> CompletableFuture<T> executeAsyncTaskWithTimeout(String taskName, Callable<T> task, long timeout, TimeUnit timeUnit) {
        try {
            return asyncTaskTransmitter.executeAsyncTaskWithTimeout(taskName, task, timeout, timeUnit);

        } catch (Exception e) {
            log.error("执行异步任务失败: {}", taskName, e);
            throw new TransmittableThreadLocalException("执行异步任务失败: " + e.getMessage());
        }
    }

    /**
     * Runs a batch of tasks asynchronously through the async task transmitter.
     *
     * @throws TransmittableThreadLocalException if the batch cannot be submitted
     */
    public <T> CompletableFuture<List<T>> executeBatchAsyncTasks(String taskName, List<Callable<T>> tasks) {
        try {
            return asyncTaskTransmitter.executeBatchAsyncTasks(taskName, tasks);

        } catch (Exception e) {
            log.error("批量执行异步任务失败: {}", taskName, e);
            throw new TransmittableThreadLocalException("批量执行异步任务失败: " + e.getMessage());
        }
    }

    /**
     * Calls {@code remove()} on every registered thread-local from the calling thread.
     * The registry itself is left untouched; a failure on one entry is logged and does not
     * stop the others.
     */
    public void clearAll() {
        try {
            for (Map.Entry<String, TransmittableThreadLocal<?>> entry : ttlMap.entrySet()) {
                String name = entry.getKey();
                try {
                    entry.getValue().remove();
                    log.debug("TransmittableThreadLocal清理成功: {}", name);
                } catch (Exception e) {
                    log.error("TransmittableThreadLocal清理失败: {}", name, e);
                }
            }

            log.info("所有TransmittableThreadLocal清理完成");

        } catch (Exception e) {
            log.error("TransmittableThreadLocal清理失败", e);
            throw new TransmittableThreadLocalException("TransmittableThreadLocal清理失败: " + e.getMessage());
        }
    }

    /**
     * Builds a statistics snapshot covering the registry and all collaborators' metrics.
     * <p>
     * NOTE(review): for thread-locals created by this manager, {@code ttl.get()} goes through
     * the instrumented {@code get()}, so collecting statistics records a "get" operation and
     * may create the initial value for the calling thread. Also, {@code getSimpleName()} of an
     * anonymous subclass is the empty string.
     *
     * @throws TransmittableThreadLocalException if any metric source fails
     */
    public TransmittableThreadLocalStatistics getStatistics() {
        try {
            Map<String, TransmittableThreadLocalInfo> ttlInfos = new HashMap<>();

            for (Map.Entry<String, TransmittableThreadLocal<?>> entry : ttlMap.entrySet()) {
                String name = entry.getKey();
                TransmittableThreadLocal<?> ttl = entry.getValue();

                TransmittableThreadLocalInfo info = TransmittableThreadLocalInfo.builder()
                        .name(name)
                        .type(ttl.getClass().getSimpleName())
                        .hasValue(ttl.get() != null)
                        .build();

                ttlInfos.put(name, info);
            }

            return TransmittableThreadLocalStatistics.builder()
                    .totalCount(ttlMap.size())
                    .transmittableThreadLocalInfos(ttlInfos)
                    .decoratorMetrics(ttlDecoratorManager.getDecoratorMetrics())
                    .transmissionMetrics(threadPoolTransmitter.getTransmissionMetrics())
                    .asyncTaskMetrics(asyncTaskTransmitter.getAsyncTaskMetrics())
                    .performanceMetrics(performanceMonitor.getMetrics())
                    .build();

        } catch (Exception e) {
            log.error("获取TransmittableThreadLocal统计信息失败", e);
            throw new TransmittableThreadLocalException("获取TransmittableThreadLocal统计信息失败: " + e.getMessage());
        }
    }

    /**
     * Lifecycle hook: clears the calling thread's values and empties the registry.
     * Failures are logged and never propagated.
     */
    @PreDestroy
    public void destroy() {
        try {
            clearAll();
            ttlMap.clear();

            log.info("TransmittableThreadLocal管理器销毁完成");

        } catch (Exception e) {
            log.error("TransmittableThreadLocal管理器销毁失败", e);
        }
    }
}

2.2 TTL装饰器管理器

(以下为 TTLDecoratorManager 的完整实现代码)
/**
* TTL装饰器管理器
* 负责管理各种TTL装饰器
*/
@Component
@Slf4j
public class TTLDecoratorManager {

private final Map<String, TransmittableThreadLocal<?>> registeredTTLs = new ConcurrentHashMap<>();
private final AtomicLong totalDecorations = new AtomicLong(0);
private final AtomicLong successfulDecorations = new AtomicLong(0);
private final AtomicLong failedDecorations = new AtomicLong(0);

/**
* 注册TransmittableThreadLocal
*/
public void registerTransmittableThreadLocal(String name, TransmittableThreadLocal<?> ttl) {
try {
registeredTTLs.put(name, ttl);

log.info("TransmittableThreadLocal注册成功: {}", name);

} catch (Exception e) {
log.error("TransmittableThreadLocal注册失败: {}", name, e);
throw new TTLDecoratorException("TransmittableThreadLocal注册失败: " + e.getMessage());
}
}

/**
* 复制上下文
*/
public <T> T copyContext(String name, T parentValue) {
try {
if (parentValue == null) {
return null;
}

// 根据上下文类型执行不同的复制策略
switch (name) {
case "userContext":
return copyUserContext(parentValue);
case "requestContext":
return copyRequestContext(parentValue);
case "businessContext":
return copyBusinessContext(parentValue);
case "securityContext":
return copySecurityContext(parentValue);
case "logContext":
return copyLogContext(parentValue);
default:
return copyDefaultContext(parentValue);
}

} catch (Exception e) {
log.error("复制上下文失败: {}", name, e);
return parentValue;
}
}

/**
* 复制用户上下文
*/
@SuppressWarnings("unchecked")
private <T> T copyUserContext(T parentValue) {
try {
if (parentValue instanceof UserContext) {
UserContext parentUserContext = (UserContext) parentValue;

// 创建新的用户上下文,复制父线程的信息
UserContext copiedUserContext = UserContext.builder()
.userId(parentUserContext.getUserId())
.username(parentUserContext.getUsername())
.email(parentUserContext.getEmail())
.roles(new ArrayList<>(parentUserContext.getRoles()))
.permissions(new ArrayList<>(parentUserContext.getPermissions()))
.loginTime(parentUserContext.getLoginTime())
.threadId(Thread.currentThread().getId())
.copiedFrom(Thread.currentThread().getName())
.build();

return (T) copiedUserContext;
}

return parentValue;

} catch (Exception e) {
log.error("复制用户上下文失败", e);
return parentValue;
}
}

/**
* 复制请求上下文
*/
@SuppressWarnings("unchecked")
private <T> T copyRequestContext(T parentValue) {
try {
if (parentValue instanceof RequestContext) {
RequestContext parentRequestContext = (RequestContext) parentValue;

// 创建新的请求上下文,复制父线程的信息
RequestContext copiedRequestContext = RequestContext.builder()
.requestId(parentRequestContext.getRequestId())
.requestTime(parentRequestContext.getRequestTime())
.clientIp(parentRequestContext.getClientIp())
.userAgent(parentRequestContext.getUserAgent())
.threadId(Thread.currentThread().getId())
.copiedFrom(Thread.currentThread().getName())
.build();

return (T) copiedRequestContext;
}

return parentValue;

} catch (Exception e) {
log.error("复制请求上下文失败", e);
return parentValue;
}
}

/**
* 复制业务上下文
*/
@SuppressWarnings("unchecked")
private <T> T copyBusinessContext(T parentValue) {
try {
if (parentValue instanceof Map) {
Map<String, Object> parentBusinessContext = (Map<String, Object>) parentValue;

// 创建新的业务上下文,深度复制父线程的信息
Map<String, Object> copiedBusinessContext = new HashMap<>();
for (Map.Entry<String, Object> entry : parentBusinessContext.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();

// 深度复制值
Object copiedValue = deepCopyValue(value);
copiedBusinessContext.put(key, copiedValue);
}

// 添加复制信息
copiedBusinessContext.put("threadId", Thread.currentThread().getId());
copiedBusinessContext.put("copiedFrom", Thread.currentThread().getName());

return (T) copiedBusinessContext;
}

return parentValue;

} catch (Exception e) {
log.error("复制业务上下文失败", e);
return parentValue;
}
}

/**
* 复制安全上下文
*/
@SuppressWarnings("unchecked")
private <T> T copySecurityContext(T parentValue) {
try {
if (parentValue instanceof SecurityContext) {
SecurityContext parentSecurityContext = (SecurityContext) parentValue;

// 创建新的安全上下文,复制父线程的信息
SecurityContext copiedSecurityContext = SecurityContext.builder()
.principal(parentSecurityContext.getPrincipal())
.authorities(new ArrayList<>(parentSecurityContext.getAuthorities()))
.authenticated(parentSecurityContext.isAuthenticated())
.threadId(Thread.currentThread().getId())
.copiedFrom(Thread.currentThread().getName())
.build();

return (T) copiedSecurityContext;
}

return parentValue;

} catch (Exception e) {
log.error("复制安全上下文失败", e);
return parentValue;
}
}

/**
* 复制日志上下文
*/
@SuppressWarnings("unchecked")
private <T> T copyLogContext(T parentValue) {
try {
if (parentValue instanceof LogContext) {
LogContext parentLogContext = (LogContext) parentValue;

// 创建新的日志上下文,复制父线程的信息
LogContext copiedLogContext = LogContext.builder()
.traceId(parentLogContext.getTraceId())
.spanId(parentLogContext.getSpanId())
.parentSpanId(parentLogContext.getParentSpanId())
.threadId(Thread.currentThread().getId())
.copiedFrom(Thread.currentThread().getName())
.build();

return (T) copiedLogContext;
}

return parentValue;

} catch (Exception e) {
log.error("复制日志上下文失败", e);
return parentValue;
}
}

/**
* 复制默认上下文
*/
private <T> T copyDefaultContext(T parentValue) {
try {
// 对于其他类型的上下文,执行浅度复制
if (parentValue instanceof Cloneable) {
// 如果对象实现了Cloneable接口,尝试克隆
try {
Method cloneMethod = parentValue.getClass().getMethod("clone");
return (T) cloneMethod.invoke(parentValue);
} catch (Exception e) {
log.debug("对象克隆失败,使用原始值: {}", e.getMessage());
}
}

// 返回原始值
return parentValue;

} catch (Exception e) {
log.error("复制默认上下文失败", e);
return parentValue;
}
}

/**
* 深度复制值
*/
private Object deepCopyValue(Object value) {
try {
if (value == null) {
return null;
}

if (value instanceof String || value instanceof Number || value instanceof Boolean) {
return value; // 基本类型直接返回
}

if (value instanceof List) {
List<?> list = (List<?>) value;
List<Object> copiedList = new ArrayList<>();
for (Object item : list) {
copiedList.add(deepCopyValue(item));
}
return copiedList;
}

if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
Map<Object, Object> copiedMap = new HashMap<>();
for (Map.Entry<?, ?> entry : map.entrySet()) {
Object key = deepCopyValue(entry.getKey());
Object val = deepCopyValue(entry.getValue());
copiedMap.put(key, val);
}
return copiedMap;
}

// 对于其他对象,尝试序列化反序列化
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(value);
oos.close();

ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bais);
Object copiedValue = ois.readObject();
ois.close();

return copiedValue;

} catch (Exception e) {
log.debug("序列化复制失败,使用原始值: {}", e.getMessage());
return value;
}

} catch (Exception e) {
log.error("深度复制值失败", e);
return value;
}
}

/**
* 执行前操作
*/
public void beforeExecute(String name) {
try {
// 记录执行前操作
log.debug("TransmittableThreadLocal执行前操作: {}", name);

} catch (Exception e) {
log.error("执行前操作失败: {}", name, e);
}
}

/**
* 执行后操作
*/
public void afterExecute(String name) {
try {
// 记录执行后操作
log.debug("TransmittableThreadLocal执行后操作: {}", name);

} catch (Exception e) {
log.error("执行后操作失败: {}", name, e);
}
}

/**
* 装饰Runnable
*/
public Runnable decorateRunnable(String taskName, Runnable runnable) {
try {
totalDecorations.incrementAndGet();

Runnable decoratedRunnable = TtlRunnable.get(runnable);

successfulDecorations.incrementAndGet();
log.debug("Runnable装饰成功: {}", taskName);

return decoratedRunnable;

} catch (Exception e) {
failedDecorations.incrementAndGet();
log.error("Runnable装饰失败: {}", taskName, e);
throw new TTLDecoratorException("Runnable装饰失败: " + e.getMessage());
}
}

/**
* 装饰Callable
*/
public <T> Callable<T> decorateCallable(String taskName, Callable<T> callable) {
try {
totalDecorations.incrementAndGet();

Callable<T> decoratedCallable = TtlCallable.get(callable);

successfulDecorations.incrementAndGet();
log.debug("Callable装饰成功: {}", taskName);

return decoratedCallable;

} catch (Exception e) {
failedDecorations.incrementAndGet();
log.error("Callable装饰失败: {}", taskName, e);
throw new TTLDecoratorException("Callable装饰失败: " + e.getMessage());
}
}

/**
* 装饰Executor
*/
public Executor decorateExecutor(String executorName, Executor executor) {
try {
totalDecorations.incrementAndGet();

Executor decoratedExecutor = TtlExecutors.getTtlExecutor(executor);

successfulDecorations.incrementAndGet();
log.debug("Executor装饰成功: {}", executorName);

return decoratedExecutor;

} catch (Exception e) {
failedDecorations.incrementAndGet();
log.error("Executor装饰失败: {}", executorName, e);
throw new TTLDecoratorException("Executor装饰失败: " + e.getMessage());
}
}

/**
* 装饰ExecutorService
*/
public ExecutorService decorateExecutorService(String executorName, ExecutorService executorService) {
try {
totalDecorations.incrementAndGet();

ExecutorService decoratedExecutorService = TtlExecutors.getTtlExecutorService(executorService);

successfulDecorations.incrementAndGet();
log.debug("ExecutorService装饰成功: {}", executorName);

return decoratedExecutorService;

} catch (Exception e) {
failedDecorations.incrementAndGet();
log.error("ExecutorService装饰失败: {}", executorName, e);
throw new TTLDecoratorException("ExecutorService装饰失败: " + e.getMessage());
}
}

/**
* 装饰ScheduledExecutorService
*/
public ScheduledExecutorService decorateScheduledExecutorService(String executorName, ScheduledExecutorService scheduledExecutorService) {
try {
totalDecorations.incrementAndGet();

ScheduledExecutorService decoratedScheduledExecutorService = TtlExecutors.getTtlScheduledExecutorService(scheduledExecutorService);

successfulDecorations.incrementAndGet();
log.debug("ScheduledExecutorService装饰成功: {}", executorName);

return decoratedScheduledExecutorService;

} catch (Exception e) {
failedDecorations.incrementAndGet();
log.error("ScheduledExecutorService装饰失败: {}", executorName, e);
throw new TTLDecoratorException("ScheduledExecutorService装饰失败: " + e.getMessage());
}
}

/**
* 获取装饰器指标
*/
public DecoratorMetrics getDecoratorMetrics() {
try {
long total = totalDecorations.get();
long successful = successfulDecorations.get();
long failed = failedDecorations.get();
double successRate = total > 0 ? (double) successful / total : 0;

return DecoratorMetrics.builder()
.totalDecorations(total)
.successfulDecorations(successful)
.failedDecorations(failed)
.successRate(successRate)
.registeredTTLCount(registeredTTLs.size())
.build();

} catch (Exception e) {
log.error("获取装饰器指标失败", e);
throw new TTLDecoratorException("获取装饰器指标失败: " + e.getMessage());
}
}
}

三、线程池传递器

3.1 线程池传递器实现

(以下为 ThreadPoolTransmitter 的完整实现代码)
/**
* 线程池传递器
* 负责管理线程池间的上下文传递
*/
@Component
@Slf4j
public class ThreadPoolTransmitter {

private final Map<String, ExecutorService> decoratedExecutors = new ConcurrentHashMap<>();
private final Map<String, ScheduledExecutorService> decoratedScheduledExecutors = new ConcurrentHashMap<>();

private final AtomicLong totalTransmissions = new AtomicLong(0);
private final AtomicLong successfulTransmissions = new AtomicLong(0);
private final AtomicLong failedTransmissions = new AtomicLong(0);

/**
* 创建装饰的线程池
*/
public ExecutorService createDecoratedThreadPool(String poolName, int corePoolSize, int maximumPoolSize) {
try {
// 检查是否已存在
if (decoratedExecutors.containsKey(poolName)) {
throw new ThreadPoolTransmitterException("装饰的线程池已存在: " + poolName);
}

// 创建原始线程池
ThreadPoolExecutor originalExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = poolName + "-";

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);

// 装饰线程池
ExecutorService decoratedExecutor = TtlExecutors.getTtlExecutorService(originalExecutor);

// 注册装饰的线程池
decoratedExecutors.put(poolName, decoratedExecutor);

log.info("装饰的线程池创建成功: {}", poolName);
return decoratedExecutor;

} catch (Exception e) {
log.error("装饰的线程池创建失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("装饰的线程池创建失败: " + e.getMessage());
}
}

/**
* 创建装饰的定时线程池
*/
public ScheduledExecutorService createDecoratedScheduledThreadPool(String poolName, int corePoolSize) {
try {
// 检查是否已存在
if (decoratedScheduledExecutors.containsKey(poolName)) {
throw new ThreadPoolTransmitterException("装饰的定时线程池已存在: " + poolName);
}

// 创建原始定时线程池
ScheduledThreadPoolExecutor originalExecutor = new ScheduledThreadPoolExecutor(
corePoolSize,
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = poolName + "-";

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);

// 装饰定时线程池
ScheduledExecutorService decoratedExecutor = TtlExecutors.getTtlScheduledExecutorService(originalExecutor);

// 注册装饰的定时线程池
decoratedScheduledExecutors.put(poolName, decoratedExecutor);

log.info("装饰的定时线程池创建成功: {}", poolName);
return decoratedExecutor;

} catch (Exception e) {
log.error("装饰的定时线程池创建失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("装饰的定时线程池创建失败: " + e.getMessage());
}
}

/**
* 获取装饰的线程池
*/
public ExecutorService getDecoratedExecutor(String poolName) {
ExecutorService executor = decoratedExecutors.get(poolName);
if (executor == null) {
throw new ThreadPoolTransmitterException("装饰的线程池不存在: " + poolName);
}
return executor;
}

/**
* 获取装饰的定时线程池
*/
public ScheduledExecutorService getDecoratedScheduledExecutor(String poolName) {
ScheduledExecutorService executor = decoratedScheduledExecutors.get(poolName);
if (executor == null) {
throw new ThreadPoolTransmitterException("装饰的定时线程池不存在: " + poolName);
}
return executor;
}

/**
* 执行任务
*/
public <T> CompletableFuture<T> executeTask(String poolName, Callable<T> task) {
try {
ExecutorService executor = getDecoratedExecutor(poolName);

totalTransmissions.incrementAndGet();

CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
successfulTransmissions.incrementAndGet();
return result;
} catch (Exception e) {
failedTransmissions.incrementAndGet();
log.error("任务执行失败: {}", poolName, e);
throw new RuntimeException("任务执行失败: " + e.getMessage(), e);
}
}, executor);

return future;

} catch (Exception e) {
log.error("执行任务失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("执行任务失败: " + e.getMessage());
}
}

/**
* 批量执行任务
*/
public <T> CompletableFuture<List<T>> executeBatchTasks(String poolName, List<Callable<T>> tasks) {
try {
ExecutorService executor = getDecoratedExecutor(poolName);

totalTransmissions.addAndGet(tasks.size());

List<CompletableFuture<T>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
successfulTransmissions.incrementAndGet();
return result;
} catch (Exception e) {
failedTransmissions.incrementAndGet();
log.error("批量任务中的单个任务执行失败: {}", poolName, e);
throw new RuntimeException("任务执行失败: " + e.getMessage(), e);
}
}, executor))
.collect(Collectors.toList());

CompletableFuture<List<T>> future = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));

return future;

} catch (Exception e) {
log.error("批量执行任务失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("批量执行任务失败: " + e.getMessage());
}
}

/**
* 执行定时任务
*/
public ScheduledFuture<?> scheduleTask(String poolName, Runnable task, long delay, TimeUnit unit) {
try {
ScheduledExecutorService executor = getDecoratedScheduledExecutor(poolName);

totalTransmissions.incrementAndGet();

ScheduledFuture<?> future = executor.schedule(() -> {
try {
task.run();
successfulTransmissions.incrementAndGet();
} catch (Exception e) {
failedTransmissions.incrementAndGet();
log.error("定时任务执行失败: {}", poolName, e);
}
}, delay, unit);

return future;

} catch (Exception e) {
log.error("执行定时任务失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("执行定时任务失败: " + e.getMessage());
}
}

/**
* 执行周期性任务
*/
public ScheduledFuture<?> schedulePeriodicTask(String poolName, Runnable task, long initialDelay, long period, TimeUnit unit) {
try {
ScheduledExecutorService executor = getDecoratedScheduledExecutor(poolName);

totalTransmissions.incrementAndGet();

ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
try {
task.run();
successfulTransmissions.incrementAndGet();
} catch (Exception e) {
failedTransmissions.incrementAndGet();
log.error("周期性任务执行失败: {}", poolName, e);
}
}, initialDelay, period, unit);

return future;

} catch (Exception e) {
log.error("执行周期性任务失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("执行周期性任务失败: " + e.getMessage());
}
}

/**
* 关闭线程池
*/
public void shutdownExecutor(String poolName) {
try {
ExecutorService executor = decoratedExecutors.get(poolName);
if (executor != null) {
executor.shutdown();
decoratedExecutors.remove(poolName);

log.info("线程池关闭成功: {}", poolName);
}

} catch (Exception e) {
log.error("关闭线程池失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("关闭线程池失败: " + e.getMessage());
}
}

/**
* 关闭定时线程池
*/
public void shutdownScheduledExecutor(String poolName) {
try {
ScheduledExecutorService executor = decoratedScheduledExecutors.get(poolName);
if (executor != null) {
executor.shutdown();
decoratedScheduledExecutors.remove(poolName);

log.info("定时线程池关闭成功: {}", poolName);
}

} catch (Exception e) {
log.error("关闭定时线程池失败: {}", poolName, e);
throw new ThreadPoolTransmitterException("关闭定时线程池失败: " + e.getMessage());
}
}

/**
 * Shuts down every registered pool, regular pools first and scheduled pools second.
 *
 * <p>Two changes from the previous version:
 * <ul>
 *   <li>The pool names are copied into an array before iterating, because
 *       {@code shutdownExecutor} / {@code shutdownScheduledExecutor} remove entries
 *       from the same maps while this loop is running.</li>
 *   <li>Each pool is closed inside its own try/catch, so one pool that fails to close
 *       no longer prevents the remaining pools from being shut down.</li>
 * </ul>
 *
 * <p>This method never throws; failures are logged. The completion message is logged
 * only when every pool closed without error, as before.
 */
public void shutdownAllExecutors() {
    boolean allClosed = true;

    // Close all regular pools.
    for (String poolName : decoratedExecutors.keySet().toArray(new String[0])) {
        try {
            shutdownExecutor(poolName);
        } catch (Exception e) {
            allClosed = false;
            log.error("关闭所有线程池失败", e);
        }
    }

    // Close all scheduled pools.
    for (String poolName : decoratedScheduledExecutors.keySet().toArray(new String[0])) {
        try {
            shutdownScheduledExecutor(poolName);
        } catch (Exception e) {
            allClosed = false;
            log.error("关闭所有线程池失败", e);
        }
    }

    if (allClosed) {
        log.info("所有线程池关闭完成");
    }
}

/**
 * Returns a snapshot of the transmission counters and registry sizes.
 *
 * <p>The snapshot contains the current values of the total / successful / failed
 * transmission counters and the number of entries in the decorated pool and decorated
 * scheduled pool registries. The values are read one after another, not atomically as
 * a group, so they may be slightly inconsistent with each other under concurrent use.
 *
 * @return a newly built {@code TransmissionMetrics}
 * @throws ThreadPoolTransmitterException if building the snapshot fails; only the
 *         original exception's message is carried over
 */
public TransmissionMetrics getTransmissionMetrics() {
try {
return TransmissionMetrics.builder()
.totalTransmissions(totalTransmissions.get())
.successfulTransmissions(successfulTransmissions.get())
.failedTransmissions(failedTransmissions.get())
.decoratedExecutorCount(decoratedExecutors.size())
.decoratedScheduledExecutorCount(decoratedScheduledExecutors.size())
.build();

} catch (Exception e) {
log.error("获取传递指标失败", e);
throw new ThreadPoolTransmitterException("获取传递指标失败: " + e.getMessage());
}
}

/**
 * Lifecycle hook that releases the pools owned by this transmitter.
 *
 * <p>Annotated with {@code @PreDestroy}; delegates to {@code shutdownAllExecutors()}.
 * Any exception is logged and swallowed so that this hook itself never throws.
 */
@PreDestroy
public void cleanup() {
try {
shutdownAllExecutors();
log.info("线程池传递器清理完成");

} catch (Exception e) {
log.error("线程池传递器清理失败", e);
}
}
}

四、异步任务传递器

4.1 异步任务传递器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/**
* 异步任务传递器
* 负责管理异步任务的上下文传递
*/
@Component
@Slf4j
public class AsyncTaskTransmitter {

private final ExecutorService defaultExecutor;
private final Map<String, ExecutorService> customExecutors = new ConcurrentHashMap<>();

private final AtomicLong totalTaskCount = new AtomicLong(0);
private final AtomicLong completedTaskCount = new AtomicLong(0);
private final AtomicLong failedTaskCount = new AtomicLong(0);

/**
* 执行异步任务
*/
public <T> CompletableFuture<T> executeAsyncTask(String taskName, Callable<T> task) {
try {
totalTaskCount.incrementAndGet();

// 装饰Callable
Callable<T> decoratedTask = TtlCallable.get(task);

// 创建异步任务
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T result = decoratedTask.call();
completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {}", taskName);
return result;
} catch (Exception e) {
failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {}", taskName, e);
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
}
}, defaultExecutor);

return future;

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskTransmitterException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 执行异步任务(带超时)
*/
public <T> CompletableFuture<T> executeAsyncTaskWithTimeout(String taskName, Callable<T> task, long timeout, TimeUnit timeUnit) {
try {
totalTaskCount.incrementAndGet();

// 装饰Callable
Callable<T> decoratedTask = TtlCallable.get(task);

// 创建异步任务
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T result = decoratedTask.call();
completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {}", taskName);
return result;
} catch (Exception e) {
failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {}", taskName, e);
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
}
}, defaultExecutor);

// 设置超时
future.orTimeout(timeout, timeUnit);

return future;

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskTransmitterException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 批量执行异步任务
*/
public <T> CompletableFuture<List<T>> executeBatchAsyncTasks(String taskName, List<Callable<T>> tasks) {
try {
totalTaskCount.addAndGet(tasks.size());

// 装饰所有Callable
List<Callable<T>> decoratedTasks = tasks.stream()
.map(TtlCallable::get)
.collect(Collectors.toList());

// 创建批量异步任务
CompletableFuture<List<T>> future = CompletableFuture.supplyAsync(() -> {
try {
List<T> results = new ArrayList<>();
for (Callable<T> task : decoratedTasks) {
try {
T result = task.call();
results.add(result);
completedTaskCount.incrementAndGet();
} catch (Exception e) {
log.error("批量任务中的单个任务执行失败: {}", taskName, e);
failedTaskCount.incrementAndGet();
throw e;
}
}

log.debug("批量异步任务执行成功: {}", taskName);
return results;

} catch (Exception e) {
log.error("批量异步任务执行失败: {}", taskName, e);
throw new RuntimeException("批量异步任务执行失败: " + e.getMessage(), e);
}
}, defaultExecutor);

return future;

} catch (Exception e) {
log.error("创建批量异步任务失败: {}", taskName, e);
throw new AsyncTaskTransmitterException("创建批量异步任务失败: " + e.getMessage());
}
}

/**
* 执行异步任务(带重试)
*/
public <T> CompletableFuture<T> executeAsyncTaskWithRetry(String taskName, Callable<T> task, int maxRetries) {
try {
totalTaskCount.incrementAndGet();

// 装饰Callable
Callable<T> decoratedTask = TtlCallable.get(task);

// 创建异步任务
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
Exception lastException = null;

for (int attempt = 0; attempt <= maxRetries; attempt++) {
try {
T result = decoratedTask.call();
completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {} (尝试次数: {})", taskName, attempt + 1);
return result;

} catch (Exception e) {
lastException = e;
log.warn("异步任务执行失败: {} (尝试次数: {})", taskName, attempt + 1, e);

if (attempt < maxRetries) {
// 等待一段时间后重试
try {
Thread.sleep(1000 * (attempt + 1));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}

failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {} (最大重试次数: {})", taskName, maxRetries, lastException);
throw new RuntimeException("异步任务执行失败: " + lastException.getMessage(), lastException);

}, defaultExecutor);

return future;

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskTransmitterException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 执行异步任务(带自定义执行器)
*/
public <T> CompletableFuture<T> executeAsyncTaskWithExecutor(String taskName, Callable<T> task, String executorName) {
try {
totalTaskCount.incrementAndGet();

// 获取自定义执行器
ExecutorService executor = getCustomExecutor(executorName);

// 装饰Callable
Callable<T> decoratedTask = TtlCallable.get(task);

// 创建异步任务
CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T result = decoratedTask.call();
completedTaskCount.incrementAndGet();
log.debug("异步任务执行成功: {} (执行器: {})", taskName, executorName);
return result;
} catch (Exception e) {
failedTaskCount.incrementAndGet();
log.error("异步任务执行失败: {} (执行器: {})", taskName, executorName, e);
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
}
}, executor);

return future;

} catch (Exception e) {
log.error("创建异步任务失败: {}", taskName, e);
throw new AsyncTaskTransmitterException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 创建自定义执行器
*/
public ExecutorService createCustomExecutor(String executorName, int corePoolSize, int maximumPoolSize) {
try {
// 检查是否已存在
if (customExecutors.containsKey(executorName)) {
throw new AsyncTaskTransmitterException("自定义执行器已存在: " + executorName);
}

// 创建原始执行器
ThreadPoolExecutor originalExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = executorName + "-";

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);

// 装饰执行器
ExecutorService decoratedExecutor = TtlExecutors.getTtlExecutorService(originalExecutor);

// 注册自定义执行器
customExecutors.put(executorName, decoratedExecutor);

log.info("自定义执行器创建成功: {}", executorName);
return decoratedExecutor;

} catch (Exception e) {
log.error("自定义执行器创建失败: {}", executorName, e);
throw new AsyncTaskTransmitterException("自定义执行器创建失败: " + e.getMessage());
}
}

/**
* 获取自定义执行器
*/
public ExecutorService getCustomExecutor(String executorName) {
ExecutorService executor = customExecutors.get(executorName);
if (executor == null) {
throw new AsyncTaskTransmitterException("自定义执行器不存在: " + executorName);
}
return executor;
}

/**
* 关闭自定义执行器
*/
public void shutdownCustomExecutor(String executorName) {
try {
ExecutorService executor = customExecutors.get(executorName);
if (executor != null) {
executor.shutdown();
customExecutors.remove(executorName);

log.info("自定义执行器关闭成功: {}", executorName);
}

} catch (Exception e) {
log.error("关闭自定义执行器失败: {}", executorName, e);
throw new AsyncTaskTransmitterException("关闭自定义执行器失败: " + e.getMessage());
}
}

/**
* 关闭所有自定义执行器
*/
public void shutdownAllCustomExecutors() {
try {
for (String executorName : customExecutors.keySet()) {
shutdownCustomExecutor(executorName);
}

log.info("所有自定义执行器关闭完成");

} catch (Exception e) {
log.error("关闭所有自定义执行器失败", e);
}
}

/**
* 获取异步任务指标
*/
public AsyncTaskMetrics getAsyncTaskMetrics() {
try {
long total = totalTaskCount.get();
long completed = completedTaskCount.get();
long failed = failedTaskCount.get();
double successRate = total > 0 ? (double) completed / total : 0;

return AsyncTaskMetrics.builder()
.totalTaskCount(total)
.completedTaskCount(completed)
.failedTaskCount(failed)
.successRate(successRate)
.customExecutorCount(customExecutors.size())
.build();

} catch (Exception e) {
log.error("获取异步任务指标失败", e);
throw new AsyncTaskTransmitterException("获取异步任务指标失败: " + e.getMessage());
}
}

/**
* 清理异步任务传递器
*/
@PreDestroy
public void cleanup() {
try {
// 关闭默认执行器
defaultExecutor.shutdown();

// 关闭所有自定义执行器
shutdownAllCustomExecutors();

log.info("异步任务传递器清理完成");

} catch (Exception e) {
log.error("异步任务传递器清理失败", e);
}
}
}

五、性能监控与配置管理

5.1 性能监控器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/**
 * Collects per-name and overall counters for TransmittableThreadLocal operations and
 * logs them once a minute.
 *
 * <p>Per-name metrics live in a concurrent map. Each update to a metrics object is a
 * read-modify-write through its getters and setters, so updates are performed while
 * holding that object's monitor to avoid lost increments when several threads record
 * under the same name. Readers do not lock and may observe slightly stale values.
 */
@Component
@Slf4j
public class PerformanceMonitor {

    private final Map<String, PerformanceMetrics> metricsMap = new ConcurrentHashMap<>();
    private final AtomicLong totalOperations = new AtomicLong(0);
    private final AtomicLong totalCopyOperations = new AtomicLong(0);
    private final AtomicLong totalLatency = new AtomicLong(0);

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /**
     * Records one {@code set} / {@code get} / {@code remove} operation for {@code name}.
     *
     * <p>The latency that is recorded is the time spent updating the counter inside
     * this method, measured with {@code System.nanoTime()}.
     *
     * <p>An unrecognised {@code operation} leaves the per-operation counters untouched.
     * The average is only recomputed when at least one operation has been counted;
     * previously an unrecognised operation on a fresh entry divided by zero and the
     * rest of the update was lost.
     *
     * <p>Never throws; failures are logged.
     *
     * @param name      metrics key
     * @param operation {@code "set"}, {@code "get"} or {@code "remove"}
     */
    public void recordOperation(String name, String operation) {
        try {
            totalOperations.incrementAndGet();

            PerformanceMetrics metrics = metricsFor(name);

            synchronized (metrics) {
                long startTime = System.nanoTime();

                updateMetrics(metrics, operation);

                long latency = System.nanoTime() - startTime;

                totalLatency.addAndGet(latency);
                metrics.setTotalLatency(metrics.getTotalLatency() + latency);

                long operationCount = getTotalOperationCount(metrics);
                if (operationCount > 0) {
                    metrics.setAverageLatency(metrics.getTotalLatency() / operationCount);
                }

                metrics.setMaxLatency(Math.max(metrics.getMaxLatency(), latency));
                metrics.setMinLatency(Math.min(metrics.getMinLatency(), latency));
                metrics.setLastOperationTime(System.currentTimeMillis());
            }

        } catch (Exception e) {
            log.error("记录操作失败: {} - {}", name, operation, e);
        }
    }

    /**
     * Records one copy operation for {@code name}. Never throws; failures are logged.
     *
     * @param name  metrics key
     * @param value the copied value; not inspected
     */
    public void recordCopy(String name, Object value) {
        try {
            totalCopyOperations.incrementAndGet();

            PerformanceMetrics metrics = metricsFor(name);

            synchronized (metrics) {
                metrics.setCopyCount(metrics.getCopyCount() + 1);
                metrics.setLastOperationTime(System.currentTimeMillis());
            }

        } catch (Exception e) {
            log.error("记录复制操作失败: {}", name, e);
        }
    }

    /**
     * Records one before-execute callback for {@code name}. Never throws; failures are logged.
     *
     * @param name metrics key
     */
    public void recordBeforeExecute(String name) {
        try {
            PerformanceMetrics metrics = metricsFor(name);

            synchronized (metrics) {
                metrics.setBeforeExecuteCount(metrics.getBeforeExecuteCount() + 1);
                metrics.setLastOperationTime(System.currentTimeMillis());
            }

        } catch (Exception e) {
            log.error("记录执行前操作失败: {}", name, e);
        }
    }

    /**
     * Records one after-execute callback for {@code name}. Never throws; failures are logged.
     *
     * @param name metrics key
     */
    public void recordAfterExecute(String name) {
        try {
            PerformanceMetrics metrics = metricsFor(name);

            synchronized (metrics) {
                metrics.setAfterExecuteCount(metrics.getAfterExecuteCount() + 1);
                metrics.setLastOperationTime(System.currentTimeMillis());
            }

        } catch (Exception e) {
            log.error("记录执行后操作失败: {}", name, e);
        }
    }

    /**
     * Returns the metrics entry for {@code name}, creating a zeroed entry on first use.
     *
     * <p>A new entry starts with all counters at 0, {@code minLatency} at
     * {@code Long.MAX_VALUE} and {@code lastOperationTime} set to the current time.
     */
    private PerformanceMetrics metricsFor(String name) {
        return metricsMap.computeIfAbsent(name, key ->
                PerformanceMetrics.builder()
                        .name(key)
                        .setCount(0)
                        .getCount(0)
                        .removeCount(0)
                        .copyCount(0)
                        .beforeExecuteCount(0)
                        .afterExecuteCount(0)
                        .totalLatency(0)
                        .averageLatency(0)
                        .maxLatency(0)
                        .minLatency(Long.MAX_VALUE)
                        .lastOperationTime(System.currentTimeMillis())
                        .build()
        );
    }

    /** Increments the counter matching {@code operation}; unknown operations are ignored. */
    private void updateMetrics(PerformanceMetrics metrics, String operation) {
        switch (operation) {
            case "set":
                metrics.setSetCount(metrics.getSetCount() + 1);
                break;
            case "get":
                metrics.setGetCount(metrics.getGetCount() + 1);
                break;
            case "remove":
                metrics.setRemoveCount(metrics.getRemoveCount() + 1);
                break;
            default:
                // Not one of the tracked operations: nothing to count.
                break;
        }
    }

    /** Sums the six per-name counters (set, get, remove, copy, before-execute, after-execute). */
    private long getTotalOperationCount(PerformanceMetrics metrics) {
        return metrics.getSetCount() + metrics.getGetCount() + metrics.getRemoveCount() +
                metrics.getCopyCount() + metrics.getBeforeExecuteCount() + metrics.getAfterExecuteCount();
    }

    /**
     * Returns a copy of the name-to-metrics map.
     *
     * <p>The map is a new {@code HashMap}; the metrics objects inside it are the live
     * instances, not copies.
     *
     * @return shallow copy of the metrics registry
     */
    public Map<String, PerformanceMetrics> getMetrics() {
        return new HashMap<>(metricsMap);
    }

    /**
     * Returns the aggregate counters across all names.
     *
     * <p>The average latency is {@code totalLatency / totalOperations}, or 0 when no
     * operation has been recorded.
     *
     * @return a newly built {@code OverallPerformanceMetrics}
     * @throws PerformanceException if building the snapshot fails
     */
    public OverallPerformanceMetrics getOverallMetrics() {
        try {
            long totalOps = totalOperations.get();
            long totalCopyOps = totalCopyOperations.get();
            long totalLat = totalLatency.get();
            double averageLatency = totalOps > 0 ? (double) totalLat / totalOps : 0;

            return OverallPerformanceMetrics.builder()
                    .totalOperations(totalOps)
                    .totalCopyOperations(totalCopyOps)
                    .totalLatency(totalLat)
                    .averageLatency(averageLatency)
                    .transmittableThreadLocalCount(metricsMap.size())
                    .build();

        } catch (Exception e) {
            log.error("获取总体性能指标失败", e);
            throw new PerformanceException("获取总体性能指标失败: " + e.getMessage());
        }
    }

    /**
     * Starts the periodic metrics log: first run immediately, then every 60 seconds.
     *
     * <p>Exceptions are caught inside the scheduled task because a task that throws
     * would not be run again by {@code scheduleAtFixedRate}.
     */
    @PostConstruct
    public void startMonitoring() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                logPerformanceMetrics();
            } catch (Exception e) {
                log.error("性能监控失败", e);
            }
        }, 0, 60, TimeUnit.SECONDS);
    }

    /** Logs the overall metrics at INFO and each per-name entry at DEBUG. Never throws. */
    private void logPerformanceMetrics() {
        try {
            OverallPerformanceMetrics overallMetrics = getOverallMetrics();

            log.info("TransmittableThreadLocal性能指标 - 总操作数: {}, 复制操作数: {}, 平均延迟: {}ns, TransmittableThreadLocal数量: {}",
                    overallMetrics.getTotalOperations(),
                    overallMetrics.getTotalCopyOperations(),
                    overallMetrics.getAverageLatency(),
                    overallMetrics.getTransmittableThreadLocalCount());

            for (PerformanceMetrics metrics : metricsMap.values()) {
                log.debug("TransmittableThreadLocal性能指标 - {}: 设置={}, 获取={}, 移除={}, 复制={}, 执行前={}, 执行后={}, 平均延迟={}ns",
                        metrics.getName(),
                        metrics.getSetCount(),
                        metrics.getGetCount(),
                        metrics.getRemoveCount(),
                        metrics.getCopyCount(),
                        metrics.getBeforeExecuteCount(),
                        metrics.getAfterExecuteCount(),
                        metrics.getAverageLatency());
            }

        } catch (Exception e) {
            log.error("记录性能指标失败", e);
        }
    }

    /** Lifecycle hook: stops the scheduler and drops all collected metrics. Never throws. */
    @PreDestroy
    public void cleanup() {
        try {
            scheduler.shutdown();
            metricsMap.clear();

            log.info("性能监控器清理完成");

        } catch (Exception e) {
            log.error("性能监控器清理失败", e);
        }
    }
}

六、实战应用与最佳实践

6.1 线程池上下文传递实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
 * REST endpoints that demonstrate passing context from the request thread into
 * pooled worker threads through {@code TransmittableThreadLocalManager}.
 *
 * <p>NOTE(review): the values stored with {@code ttlManager.setValue} are not removed
 * at the end of each request, and the pools obtained from
 * {@code ttlManager.createDecoratedThreadPool} are not released here. The manager's
 * removal/shutdown API is not visible from this class - confirm and add the cleanup.
 */
@RestController
@RequestMapping("/api/thread-pool-transmission")
@Slf4j
public class ThreadPoolTransmissionExample {

    private final TransmittableThreadLocalManager ttlManager;

    /**
     * Constructor injection for the manager. The original listing left the final field
     * unassigned, which does not compile.
     *
     * @param ttlManager manager used to store context and build decorated pools
     */
    public ThreadPoolTransmissionExample(TransmittableThreadLocalManager ttlManager) {
        this.ttlManager = ttlManager;
    }

    /**
     * Stores a user context on the request thread, reads it back from a task running
     * on a decorated pool and reports what the task saw.
     *
     * @return 200 with the demo result, or 500 when the demo itself fails
     */
    @PostMapping("/demonstrate-transmission")
    public ResponseEntity<TransmissionDemoResult> demonstrateTransmission() {
        try {
            // 1. Set the context on the parent (request) thread.
            UserContext userContext = UserContext.builder()
                    .userId("12345")
                    .username("testuser")
                    .email("test@example.com")
                    .roles(Arrays.asList("USER", "ADMIN"))
                    .permissions(Arrays.asList("READ", "WRITE"))
                    .loginTime(System.currentTimeMillis())
                    .build();

            ttlManager.setValue("userContext", userContext);

            // 2. Obtain a decorated pool.
            ExecutorService decoratedExecutor = ttlManager.createDecoratedThreadPool(
                    "transmission-demo-pool", 2, 4);

            // 3. Run a task on the pool and read the context from the worker thread.
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                UserContext inheritedContext = ttlManager.getValue("userContext");

                if (inheritedContext != null) {
                    log.info("子线程成功获取用户上下文: {}", inheritedContext.getUserId());
                    return "传递成功: " + inheritedContext.getUsername();
                } else {
                    log.warn("子线程未能获取用户上下文");
                    return "传递失败";
                }
            }, decoratedExecutor);

            // 4. Wait for the task.
            String result = future.get(10, TimeUnit.SECONDS);

            TransmissionDemoResult demoResult = TransmissionDemoResult.builder()
                    .parentThreadId(Thread.currentThread().getId())
                    .parentThreadName(Thread.currentThread().getName())
                    .result(result)
                    .success(true)
                    .build();

            return ResponseEntity.ok(demoResult);

        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("线程池上下文传递演示失败", e);
            // A failure here is a server-side problem, not a malformed request.
            return ResponseEntity.status(500).build();
        }
    }

    /**
     * Stores a request context, wraps a {@code Runnable} with the manager's decorator
     * and runs it on a plain (undecorated) fixed pool.
     *
     * <p>The pool is created for this call only, so it is shut down in a
     * {@code finally} block; previously every request leaked two non-daemon threads.
     * Completion is awaited on the task's future instead of sleeping for a fixed second,
     * so a task failure or timeout is now reported as a failed demo.
     *
     * @return 200 with the demo result, or 500 when the demo itself fails
     */
    @PostMapping("/demonstrate-decorator")
    public ResponseEntity<DecoratorDemoResult> demonstrateDecorator() {
        try {
            // 1. Set the context on the parent (request) thread.
            RequestContext requestContext = RequestContext.builder()
                    .requestId(UUID.randomUUID().toString())
                    .requestTime(System.currentTimeMillis())
                    .clientIp("192.168.1.1")
                    .userAgent("Mozilla/5.0")
                    .build();

            ttlManager.setValue("requestContext", requestContext);

            // 2. The undecorated task.
            Runnable originalRunnable = () -> {
                RequestContext context = ttlManager.getValue("requestContext");
                if (context != null) {
                    log.info("装饰的Runnable获取到请求上下文: {}", context.getRequestId());
                } else {
                    log.warn("装饰的Runnable未能获取到请求上下文");
                }
            };

            // 3. Decorate it.
            Runnable decoratedRunnable = ttlManager.decorateRunnable("decorator-demo", originalRunnable);

            // 4./5. Run it on a throw-away pool and wait for it to finish.
            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                executor.submit(decoratedRunnable).get(10, TimeUnit.SECONDS);
            } finally {
                executor.shutdown();
            }

            DecoratorDemoResult demoResult = DecoratorDemoResult.builder()
                    .parentThreadId(Thread.currentThread().getId())
                    .parentThreadName(Thread.currentThread().getName())
                    .decorated(true)
                    .success(true)
                    .build();

            return ResponseEntity.ok(demoResult);

        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("TTL装饰器演示失败", e);
            return ResponseEntity.status(500).build();
        }
    }

    /**
     * Stores a business context map and reads it back from an async task started
     * through the manager.
     *
     * <p>The context is declared as {@code Map<String, Object>}: the original declared
     * it as {@code BusinessContext} while assigning a {@code HashMap} and reading it
     * back as a map, which does not type-check.
     *
     * @return 200 with the demo result, or 500 when the demo itself fails
     */
    @PostMapping("/demonstrate-async-transmission")
    public ResponseEntity<AsyncTransmissionDemoResult> demonstrateAsyncTransmission() {
        try {
            // 1. Set the context on the parent (request) thread.
            Map<String, Object> businessContext = new HashMap<>();
            businessContext.put("orderId", "ORDER-12345");
            businessContext.put("amount", 100.0);
            businessContext.put("currency", "USD");

            ttlManager.setValue("businessContext", businessContext);

            // 2. Run the async task.
            CompletableFuture<String> future = ttlManager.executeAsyncTask(
                    "async-transmission-demo", () -> {
                        Map<String, Object> context = ttlManager.getValue("businessContext");

                        if (context != null) {
                            log.info("异步任务获取到业务上下文: {}", context.get("orderId"));
                            return "异步传递成功: " + context.get("orderId");
                        } else {
                            log.warn("异步任务未能获取到业务上下文");
                            return "异步传递失败";
                        }
                    });

            // 3. Wait for the task.
            String result = future.get(10, TimeUnit.SECONDS);

            AsyncTransmissionDemoResult demoResult = AsyncTransmissionDemoResult.builder()
                    .parentThreadId(Thread.currentThread().getId())
                    .parentThreadName(Thread.currentThread().getName())
                    .result(result)
                    .success(true)
                    .build();

            return ResponseEntity.ok(demoResult);

        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("异步任务传递演示失败", e);
            return ResponseEntity.status(500).build();
        }
    }

    /** Re-sets the thread's interrupt flag when the caught exception was an interruption. */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

6.2 TransmittableThreadLocal最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
/**
* TransmittableThreadLocal最佳实践
*/
@Component
@Slf4j
public class TransmittableThreadLocalBestPractices {

private final TransmittableThreadLocalManager ttlManager;

/**
* TransmittableThreadLocal最佳实践指南
*/
public void demonstrateBestPractices() {
log.info("=== TransmittableThreadLocal最佳实践指南 ===");

// 1. 正确使用TransmittableThreadLocal
demonstrateCorrectUsage();

// 2. TTL装饰器使用
demonstrateDecoratorUsage();

// 3. 线程池传递
demonstrateThreadPoolTransmission();

// 4. 异步任务处理
demonstrateAsyncTaskHandling();

// 5. 性能优化
demonstratePerformanceOptimization();
}

/**
* 正确使用TransmittableThreadLocal
*/
private void demonstrateCorrectUsage() {
log.info("--- 正确使用TransmittableThreadLocal ---");

try {
// 1. 使用静态变量
TransmittableThreadLocal<String> staticTTL = ttlManager.createTransmittableThreadLocal(
"staticTTL", String.class);

// 2. 在父线程中设置值
staticTTL.set("父线程的值");

// 3. 创建装饰的线程池
ExecutorService decoratedExecutor = ttlManager.createDecoratedThreadPool(
"correct-usage-pool", 2, 4);

// 4. 在线程池中执行任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 在子线程中获取值
String value = staticTTL.get();
log.info("子线程获取到的值: {}", value);
return value;
}, decoratedExecutor);

// 5. 等待子线程完成
String result = future.get(5, TimeUnit.SECONDS);
log.info("子线程返回结果: {}", result);

// 6. 清理资源
staticTTL.remove();

log.info("TransmittableThreadLocal使用示例完成");

} catch (Exception e) {
log.error("TransmittableThreadLocal使用示例失败", e);
}
}

/**
* TTL装饰器使用
*/
private void demonstrateDecoratorUsage() {
log.info("--- TTL装饰器使用 ---");

try {
// 1. 设置上下文
UserContext userContext = UserContext.builder()
.userId("12345")
.username("testuser")
.email("test@example.com")
.roles(Arrays.asList("USER", "ADMIN"))
.permissions(Arrays.asList("READ", "WRITE"))
.loginTime(System.currentTimeMillis())
.build();

ttlManager.setValue("userContext", userContext);

// 2. 创建原始Callable
Callable<String> originalCallable = () -> {
UserContext context = ttlManager.getValue("userContext");
if (context != null) {
log.info("装饰的Callable获取到用户上下文: {}", context.getUserId());
return "装饰成功: " + context.getUsername();
} else {
return "装饰失败";
}
};

// 3. 装饰Callable
Callable<String> decoratedCallable = ttlManager.decorateCallable("decorator-demo", originalCallable);

// 4. 创建线程池并执行
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(decoratedCallable);

// 5. 等待任务完成
String result = future.get(5, TimeUnit.SECONDS);
log.info("装饰器结果: {}", result);

log.info("TTL装饰器使用示例完成");

} catch (Exception e) {
log.error("TTL装饰器使用示例失败", e);
}
}

/**
* 线程池传递
*/
private void demonstrateThreadPoolTransmission() {
log.info("--- 线程池传递 ---");

try {
// 1. 设置上下文
RequestContext requestContext = RequestContext.builder()
.requestId(UUID.randomUUID().toString())
.requestTime(System.currentTimeMillis())
.clientIp("192.168.1.1")
.userAgent("Mozilla/5.0")
.build();

ttlManager.setValue("requestContext", requestContext);

// 2. 创建装饰的线程池
ExecutorService decoratedExecutor = ttlManager.createDecoratedThreadPool(
"transmission-pool", 2, 4);

// 3. 在线程池中执行任务
List<Callable<String>> tasks = Arrays.asList(
() -> {
RequestContext context = ttlManager.getValue("requestContext");
return "任务1: " + (context != null ? context.getRequestId() : "无上下文");
},
() -> {
RequestContext context = ttlManager.getValue("requestContext");
return "任务2: " + (context != null ? context.getRequestId() : "无上下文");
},
() -> {
RequestContext context = ttlManager.getValue("requestContext");
return "任务3: " + (context != null ? context.getRequestId() : "无上下文");
}
);

// 4. 批量执行任务
CompletableFuture<List<String>> future = CompletableFuture.allOf(
tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, decoratedExecutor))
.toArray(CompletableFuture[]::new)
).thenApply(v -> tasks.stream()
.map(task -> {
try {
return task.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList()));

// 5. 等待任务完成
List<String> results = future.get(10, TimeUnit.SECONDS);
log.info("线程池传递结果: {}", results);

log.info("线程池传递示例完成");

} catch (Exception e) {
log.error("线程池传递示例失败", e);
}
}

/**
* 异步任务处理
*/
private void demonstrateAsyncTaskHandling() {
log.info("--- 异步任务处理 ---");

try {
// 1. 设置上下文
LogContext logContext = LogContext.builder()
.traceId(UUID.randomUUID().toString())
.spanId(UUID.randomUUID().toString())
.parentSpanId(null)
.build();

ttlManager.setValue("logContext", logContext);

// 2. 执行异步任务(带超时)
CompletableFuture<String> future = ttlManager.executeAsyncTaskWithTimeout(
"async-task-demo", () -> {
LogContext context = ttlManager.getValue("logContext");
if (context != null) {
log.info("异步任务获取到日志上下文: {}", context.getTraceId());
return "任务完成: " + context.getTraceId();
} else {
return "任务完成: 无上下文";
}
}, 5, TimeUnit.SECONDS);

// 3. 等待任务完成
String result = future.get(10, TimeUnit.SECONDS);
log.info("异步任务结果: {}", result);

// 4. 执行异步任务(带重试)
CompletableFuture<String> retryFuture = ttlManager.executeAsyncTaskWithRetry(
"async-task-retry-demo", () -> {
LogContext context = ttlManager.getValue("logContext");
if (context != null) {
log.info("重试任务获取到日志上下文: {}", context.getTraceId());
return "重试任务完成: " + context.getTraceId();
} else {
return "重试任务完成: 无上下文";
}
}, 3);

// 5. 等待重试任务完成
String retryResult = retryFuture.get(10, TimeUnit.SECONDS);
log.info("重试任务结果: {}", retryResult);

log.info("异步任务处理示例完成");

} catch (Exception e) {
log.error("异步任务处理示例失败", e);
}
}

/**
* 性能优化
*/
private void demonstratePerformanceOptimization() {
log.info("--- 性能优化 ---");

try {
// 1. 使用缓存
TransmittableThreadLocal<Map<String, Object>> cacheTTL = ttlManager.createTransmittableThreadLocal(
"cacheTTL", Map.class);

// 2. 在父线程中设置缓存
Map<String, Object> cache = new HashMap<>();
cache.put("key1", "value1");
cache.put("key2", "value2");
cacheTTL.set(cache);

// 3. 在子线程中使用缓存
CompletableFuture<Map<String, Object>> future = ttlManager.executeAsyncTask(
"performance-optimization-demo", () -> {
Map<String, Object> inheritedCache = cacheTTL.get();
if (inheritedCache == null) {
inheritedCache = new HashMap<>();
cacheTTL.set(inheritedCache);
}

// 使用缓存
inheritedCache.put("key3", "value3");
inheritedCache.put("key4", "value4");

return inheritedCache;
});

// 4. 等待任务完成
Map<String, Object> result = future.get(5, TimeUnit.SECONDS);
log.info("性能优化结果: {}", result);

// 5. 清理资源
cacheTTL.remove();

log.info("性能优化示例完成");

} catch (Exception e) {
log.error("性能优化示例失败", e);
}
}
}

七、总结

本文深入探讨了SpringBoot中TransmittableThreadLocal的架构设计与实战应用。通过构建企业级的TransmittableThreadLocal管理框架,我们实现了:

  1. 线程池传递:完美解决线程池场景下的上下文传递问题,支持ThreadPoolExecutor、ScheduledExecutorService等
  2. TTL装饰器:通过装饰器模式提供Runnable、Callable、Executor等组件的TTL支持
  3. 跨线程上下文:实现线程池间的上下文传递,支持异步任务和定时任务
  4. 性能监控与优化:提供全面的性能监控和优化机制
  5. 企业级管理:提供完整的配置管理、监控告警、报告导出等功能

通过这套企业级的TransmittableThreadLocal架构,我们能够安全高效地管理线程池场景下的上下文传递,实现跨线程的上下文管理。随着微服务架构和异步编程的不断发展,构建可扩展、高性能的TransmittableThreadLocal框架将成为企业级架构师必须掌握的核心技能。

在实际应用中,建议遵循TransmittableThreadLocal的最佳实践,合理使用TTL装饰器,及时清理资源,并通过监控和调试工具确保系统的稳定运行。只有这样,才能真正发挥TransmittableThreadLocal的价值,构建高质量的企业级应用系统。