1. ThreadPoolExecutor并发控制概述

ThreadPoolExecutor是Java并发包中最重要的线程池实现,通过合理配置线程池参数可以有效控制并发写入数据库的线程数量,从而避免数据库连接池耗尽、系统资源过载等问题。在数据库写入场景中,合理使用ThreadPoolExecutor可以实现高性能、稳定的并发数据处理。本文将详细介绍ThreadPoolExecutor的各种配置参数、实现方法、性能优化技巧以及在数据库写入场景中的最佳实践。

1.1 ThreadPoolExecutor核心价值

  1. 并发控制: 精确控制同时执行的线程数量
  2. 资源管理: 合理管理线程生命周期和系统资源
  3. 性能优化: 通过线程复用提升系统性能
  4. 稳定性增强: 避免线程过多导致的系统崩溃
  5. 监控能力: 提供丰富的线程池监控指标

1.2 数据库写入并发控制场景

  • 批量数据导入: 大量数据分批并发写入数据库
  • 实时数据同步: 实时数据流并发处理
  • 数据迁移: 数据从一个系统迁移到另一个系统
  • 报表生成: 并发生成多个报表数据
  • 数据清洗: 并发处理数据清洗任务

2. ThreadPoolExecutor基础配置

2.1 线程池配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 数据库写入线程池配置类
* @author 运维实战
*/
@Configuration
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class ThreadPoolConfig {

@Autowired
private ThreadPoolProperties threadPoolProperties;

/**
* 数据库写入线程池
* @return 线程池执行器
*/
@Bean("databaseWriteExecutor")
public ThreadPoolExecutor databaseWriteExecutor() {
return new ThreadPoolExecutor(
threadPoolProperties.getCorePoolSize(), // 核心线程数
threadPoolProperties.getMaximumPoolSize(), // 最大线程数
threadPoolProperties.getKeepAliveTime(), // 线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(threadPoolProperties.getQueueCapacity()), // 工作队列
new ThreadFactory() { // 线程工厂
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "db-write-";

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}

/**
* 数据库批量写入线程池
* @return 线程池执行器
*/
@Bean("databaseBatchWriteExecutor")
public ThreadPoolExecutor databaseBatchWriteExecutor() {
return new ThreadPoolExecutor(
threadPoolProperties.getBatchCorePoolSize(), // 批量处理核心线程数
threadPoolProperties.getBatchMaximumPoolSize(), // 批量处理最大线程数
threadPoolProperties.getBatchKeepAliveTime(), // 批量处理线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(threadPoolProperties.getBatchQueueCapacity()), // 批量处理工作队列
new ThreadFactory() { // 线程工厂
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "db-batch-";

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
},
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
}

/**
* 线程池监控器
* @return 线程池监控器
*/
@Bean
public ThreadPoolMonitor threadPoolMonitor() {
return new ThreadPoolMonitor();
}
}

2.2 线程池配置属性类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 线程池配置属性类
* @author 运维实战
*/
@ConfigurationProperties(prefix = "threadpool.database")
@Data
public class ThreadPoolProperties {

/**
* 核心线程数
*/
private int corePoolSize = 5;

/**
* 最大线程数
*/
private int maximumPoolSize = 20;

/**
* 线程存活时间(秒)
*/
private long keepAliveTime = 60;

/**
* 队列容量
*/
private int queueCapacity = 1000;

/**
* 批量处理核心线程数
*/
private int batchCorePoolSize = 3;

/**
* 批量处理最大线程数
*/
private int batchMaximumPoolSize = 10;

/**
* 批量处理线程存活时间(秒)
*/
private long batchKeepAliveTime = 120;

/**
* 批量处理队列容量
*/
private int batchQueueCapacity = 500;

/**
* 是否允许核心线程超时
*/
private boolean allowCoreThreadTimeOut = true;

/**
* 是否预启动核心线程
*/
private boolean prestartAllCoreThreads = false;
}

2.3 数据库写入服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/**
* 数据库写入服务
* @author 运维实战
*/
@Service
public class DatabaseWriteService {

@Autowired
@Qualifier("databaseWriteExecutor")
private ThreadPoolExecutor databaseWriteExecutor;

@Autowired
@Qualifier("databaseBatchWriteExecutor")
private ThreadPoolExecutor databaseBatchWriteExecutor;

@Autowired
private JdbcTemplate jdbcTemplate;

@Autowired
private ThreadPoolMonitor threadPoolMonitor;

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteService.class);

/**
* 并发写入单条记录
* @param data 数据
* @return 写入结果
*/
public CompletableFuture<WriteResult> writeSingleRecord(Map<String, Object> data) {
logger.info("开始并发写入单条记录,线程池状态: {}", getThreadPoolStatus());

return CompletableFuture.supplyAsync(() -> {
try {
// 执行数据库写入
int rows = executeInsert(data);

WriteResult result = new WriteResult();
result.setSuccess(true);
result.setRowsAffected(rows);
result.setThreadName(Thread.currentThread().getName());
result.setExecutionTime(System.currentTimeMillis());

logger.info("单条记录写入成功,线程: {}, 影响行数: {}",
Thread.currentThread().getName(), rows);

return result;

} catch (Exception e) {
logger.error("单条记录写入失败,线程: {}", Thread.currentThread().getName(), e);

WriteResult result = new WriteResult();
result.setSuccess(false);
result.setError(e.getMessage());
result.setThreadName(Thread.currentThread().getName());
result.setExecutionTime(System.currentTimeMillis());

return result;
}
}, databaseWriteExecutor);
}

/**
* 并发写入多条记录
* @param dataList 数据列表
* @return 写入结果列表
*/
public CompletableFuture<List<WriteResult>> writeMultipleRecords(List<Map<String, Object>> dataList) {
logger.info("开始并发写入多条记录,数量: {}, 线程池状态: {}", dataList.size(), getThreadPoolStatus());

List<CompletableFuture<WriteResult>> futures = new ArrayList<>();

for (Map<String, Object> data : dataList) {
CompletableFuture<WriteResult> future = writeSingleRecord(data);
futures.add(future);
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List<WriteResult> results = new ArrayList<>();
for (CompletableFuture<WriteResult> future : futures) {
try {
results.add(future.get());
} catch (Exception e) {
logger.error("获取写入结果失败", e);
}
}
return results;
});
}

/**
* 批量并发写入
* @param dataList 数据列表
* @param batchSize 批次大小
* @return 写入结果
*/
public CompletableFuture<BatchWriteResult> batchWriteRecords(List<Map<String, Object>> dataList, int batchSize) {
logger.info("开始批量并发写入,总数量: {}, 批次大小: {}, 线程池状态: {}",
dataList.size(), batchSize, getThreadPoolStatus());

BatchWriteResult result = new BatchWriteResult();
result.setTotalCount(dataList.size());
result.setBatchSize(batchSize);
result.setStartTime(System.currentTimeMillis());

// 分批处理
List<List<Map<String, Object>>> batches = partitionList(dataList, batchSize);

List<CompletableFuture<WriteResult>> futures = new ArrayList<>();

for (int i = 0; i < batches.size(); i++) {
final int batchIndex = i;
final List<Map<String, Object>> batch = batches.get(i);

CompletableFuture<WriteResult> future = CompletableFuture.supplyAsync(() -> {
try {
// 执行批量插入
int rows = executeBatchInsert(batch);

WriteResult writeResult = new WriteResult();
writeResult.setSuccess(true);
writeResult.setRowsAffected(rows);
writeResult.setThreadName(Thread.currentThread().getName());
writeResult.setExecutionTime(System.currentTimeMillis());

logger.info("批次 {} 写入成功,线程: {}, 影响行数: {}",
batchIndex + 1, Thread.currentThread().getName(), rows);

return writeResult;

} catch (Exception e) {
logger.error("批次 {} 写入失败,线程: {}", batchIndex + 1, Thread.currentThread().getName(), e);

WriteResult writeResult = new WriteResult();
writeResult.setSuccess(false);
writeResult.setError(e.getMessage());
writeResult.setThreadName(Thread.currentThread().getName());
writeResult.setExecutionTime(System.currentTimeMillis());

return writeResult;
}
}, databaseBatchWriteExecutor);

futures.add(future);
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
int successCount = 0;
int failureCount = 0;
int totalRowsAffected = 0;

for (CompletableFuture<WriteResult> future : futures) {
try {
WriteResult writeResult = future.get();
if (writeResult.isSuccess()) {
successCount++;
totalRowsAffected += writeResult.getRowsAffected();
} else {
failureCount++;
}
} catch (Exception e) {
logger.error("获取批量写入结果失败", e);
failureCount++;
}
}

result.setSuccessCount(successCount);
result.setFailureCount(failureCount);
result.setTotalRowsAffected(totalRowsAffected);
result.setEndTime(System.currentTimeMillis());

logger.info("批量并发写入完成,成功: {}, 失败: {}, 总影响行数: {}, 耗时: {}ms",
successCount, failureCount, totalRowsAffected, result.getDuration());

return result;
});
}

/**
* 执行单条插入
* @param data 数据
* @return 影响行数
*/
private int executeInsert(Map<String, Object> data) {
String sql = "INSERT INTO user_data (id, name, email, age, created_at) VALUES (?, ?, ?, ?, ?)";

return jdbcTemplate.update(sql,
data.get("id"),
data.get("name"),
data.get("email"),
data.get("age"),
new Timestamp(System.currentTimeMillis())
);
}

/**
* 执行批量插入
* @param dataList 数据列表
* @return 影响行数
*/
private int executeBatchInsert(List<Map<String, Object>> dataList) {
String sql = "INSERT INTO user_data (id, name, email, age, created_at) VALUES (?, ?, ?, ?, ?)";

int[] results = jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Map<String, Object> data = dataList.get(i);
ps.setObject(1, data.get("id"));
ps.setObject(2, data.get("name"));
ps.setObject(3, data.get("email"));
ps.setObject(4, data.get("age"));
ps.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
}

@Override
public int getBatchSize() {
return dataList.size();
}
});

return Arrays.stream(results).sum();
}

/**
* 分割列表
* @param list 原始列表
* @param batchSize 批次大小
* @return 分割后的列表
*/
private <T> List<List<T>> partitionList(List<T> list, int batchSize) {
List<List<T>> partitions = new ArrayList<>();

for (int i = 0; i < list.size(); i += batchSize) {
int end = Math.min(i + batchSize, list.size());
partitions.add(list.subList(i, end));
}

return partitions;
}

/**
* 获取线程池状态
* @return 线程池状态
*/
private String getThreadPoolStatus() {
return String.format("核心线程数: %d, 最大线程数: %d, 当前线程数: %d, 活跃线程数: %d, 队列大小: %d",
databaseWriteExecutor.getCorePoolSize(),
databaseWriteExecutor.getMaximumPoolSize(),
databaseWriteExecutor.getPoolSize(),
databaseWriteExecutor.getActiveCount(),
databaseWriteExecutor.getQueue().size()
);
}
}

2.4 写入结果类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 写入结果类
* @author 运维实战
*/
@Data
public class WriteResult {

private boolean success;
private int rowsAffected;
private String error;
private String threadName;
private long executionTime;

public WriteResult() {
this.success = false;
this.rowsAffected = 0;
this.executionTime = System.currentTimeMillis();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 批量写入结果类
* @author 运维实战
*/
@Data
public class BatchWriteResult {

private int totalCount;
private int batchSize;
private int successCount;
private int failureCount;
private int totalRowsAffected;
private long startTime;
private long endTime;

public BatchWriteResult() {
this.successCount = 0;
this.failureCount = 0;
this.totalRowsAffected = 0;
}

/**
* 获取处理耗时
* @return 处理耗时(毫秒)
*/
public long getDuration() {
return endTime - startTime;
}

/**
* 获取成功率
* @return 成功率
*/
public double getSuccessRate() {
if (totalCount == 0) return 0.0;
return (double) successCount / totalCount * 100;
}

/**
* 是否全部成功
* @return 是否全部成功
*/
public boolean isAllSuccess() {
return failureCount == 0;
}
}

3. 线程池监控和管理

3.1 线程池监控器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* 线程池监控器
* @author 运维实战
*/
@Component
public class ThreadPoolMonitor {

@Autowired
@Qualifier("databaseWriteExecutor")
private ThreadPoolExecutor databaseWriteExecutor;

@Autowired
@Qualifier("databaseBatchWriteExecutor")
private ThreadPoolExecutor databaseBatchWriteExecutor;

private static final Logger logger = LoggerFactory.getLogger(ThreadPoolMonitor.class);

/**
* 获取线程池状态
* @param executor 线程池执行器
* @return 线程池状态
*/
public ThreadPoolStatus getThreadPoolStatus(ThreadPoolExecutor executor) {
ThreadPoolStatus status = new ThreadPoolStatus();
status.setCorePoolSize(executor.getCorePoolSize());
status.setMaximumPoolSize(executor.getMaximumPoolSize());
status.setCurrentPoolSize(executor.getPoolSize());
status.setActiveCount(executor.getActiveCount());
status.setQueueSize(executor.getQueue().size());
status.setCompletedTaskCount(executor.getCompletedTaskCount());
status.setTaskCount(executor.getTaskCount());
status.setLargestPoolSize(executor.getLargestPoolSize());

return status;
}

/**
* 获取数据库写入线程池状态
* @return 线程池状态
*/
public ThreadPoolStatus getDatabaseWriteThreadPoolStatus() {
return getThreadPoolStatus(databaseWriteExecutor);
}

/**
* 获取数据库批量写入线程池状态
* @return 线程池状态
*/
public ThreadPoolStatus getDatabaseBatchWriteThreadPoolStatus() {
return getThreadPoolStatus(databaseBatchWriteExecutor);
}

/**
* 获取所有线程池状态
* @return 所有线程池状态
*/
public Map<String, ThreadPoolStatus> getAllThreadPoolStatus() {
Map<String, ThreadPoolStatus> statusMap = new HashMap<>();
statusMap.put("databaseWrite", getDatabaseWriteThreadPoolStatus());
statusMap.put("databaseBatchWrite", getDatabaseBatchWriteThreadPoolStatus());
return statusMap;
}

/**
* 检查线程池健康状态
* @param executor 线程池执行器
* @return 健康状态
*/
public ThreadPoolHealthStatus checkThreadPoolHealth(ThreadPoolExecutor executor) {
ThreadPoolHealthStatus healthStatus = new ThreadPoolHealthStatus();

int activeCount = executor.getActiveCount();
int maximumPoolSize = executor.getMaximumPoolSize();
int queueSize = executor.getQueue().size();
int queueCapacity = executor.getQueue().remainingCapacity() + queueSize;

// 计算使用率
double threadUsageRate = (double) activeCount / maximumPoolSize * 100;
double queueUsageRate = (double) queueSize / queueCapacity * 100;

healthStatus.setThreadUsageRate(threadUsageRate);
healthStatus.setQueueUsageRate(queueUsageRate);

// 判断健康状态
if (threadUsageRate > 90 || queueUsageRate > 90) {
healthStatus.setStatus(ThreadPoolHealthStatus.HealthStatus.CRITICAL);
healthStatus.setMessage("线程池使用率过高,存在风险");
} else if (threadUsageRate > 70 || queueUsageRate > 70) {
healthStatus.setStatus(ThreadPoolHealthStatus.HealthStatus.WARNING);
healthStatus.setMessage("线程池使用率较高,需要关注");
} else {
healthStatus.setStatus(ThreadPoolHealthStatus.HealthStatus.HEALTHY);
healthStatus.setMessage("线程池状态健康");
}

return healthStatus;
}

/**
* 定期监控线程池状态
*/
@Scheduled(fixedRate = 30000) // 每30秒监控一次
public void monitorThreadPoolStatus() {
try {
ThreadPoolStatus writeStatus = getDatabaseWriteThreadPoolStatus();
ThreadPoolStatus batchWriteStatus = getDatabaseBatchWriteThreadPoolStatus();

logger.info("数据库写入线程池状态: 核心线程数={}, 最大线程数={}, 当前线程数={}, 活跃线程数={}, 队列大小={}",
writeStatus.getCorePoolSize(), writeStatus.getMaximumPoolSize(),
writeStatus.getCurrentPoolSize(), writeStatus.getActiveCount(), writeStatus.getQueueSize());

logger.info("数据库批量写入线程池状态: 核心线程数={}, 最大线程数={}, 当前线程数={}, 活跃线程数={}, 队列大小={}",
batchWriteStatus.getCorePoolSize(), batchWriteStatus.getMaximumPoolSize(),
batchWriteStatus.getCurrentPoolSize(), batchWriteStatus.getActiveCount(), batchWriteStatus.getQueueSize());

// 检查健康状态
ThreadPoolHealthStatus writeHealth = checkThreadPoolHealth(databaseWriteExecutor);
ThreadPoolHealthStatus batchWriteHealth = checkThreadPoolHealth(databaseBatchWriteExecutor);

if (writeHealth.getStatus() != ThreadPoolHealthStatus.HealthStatus.HEALTHY) {
logger.warn("数据库写入线程池健康状态: {}", writeHealth.getMessage());
}

if (batchWriteHealth.getStatus() != ThreadPoolHealthStatus.HealthStatus.HEALTHY) {
logger.warn("数据库批量写入线程池健康状态: {}", batchWriteHealth.getMessage());
}

} catch (Exception e) {
logger.error("监控线程池状态失败", e);
}
}

/**
* 动态调整线程池参数
* @param executor 线程池执行器
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
*/
public void adjustThreadPoolParameters(ThreadPoolExecutor executor, int corePoolSize, int maximumPoolSize) {
try {
executor.setCorePoolSize(corePoolSize);
executor.setMaximumPoolSize(maximumPoolSize);

logger.info("线程池参数调整完成: 核心线程数={}, 最大线程数={}", corePoolSize, maximumPoolSize);
} catch (Exception e) {
logger.error("调整线程池参数失败", e);
}
}
}

3.2 线程池状态类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 线程池状态类
* @author 运维实战
*/
@Data
public class ThreadPoolStatus {

private int corePoolSize;
private int maximumPoolSize;
private int currentPoolSize;
private int activeCount;
private int queueSize;
private long completedTaskCount;
private long taskCount;
private int largestPoolSize;

/**
* 获取线程使用率
* @return 线程使用率
*/
public double getThreadUsageRate() {
if (maximumPoolSize == 0) return 0.0;
return (double) activeCount / maximumPoolSize * 100;
}

/**
* 获取队列使用率
* @return 队列使用率
*/
public double getQueueUsageRate() {
if (queueSize == 0) return 0.0;
return (double) queueSize / (queueSize + 1000) * 100; // 假设队列容量为1000
}

/**
* 是否健康
* @return 是否健康
*/
public boolean isHealthy() {
return getThreadUsageRate() < 80 && getQueueUsageRate() < 80;
}
}

3.3 线程池健康状态类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 线程池健康状态类
* @author 运维实战
*/
@Data
public class ThreadPoolHealthStatus {

private HealthStatus status;
private String message;
private double threadUsageRate;
private double queueUsageRate;

/**
* 健康状态枚举
*/
public enum HealthStatus {
HEALTHY("健康"),
WARNING("警告"),
CRITICAL("严重");

private final String description;

HealthStatus(String description) {
this.description = description;
}

public String getDescription() {
return description;
}
}

public ThreadPoolHealthStatus() {
this.status = HealthStatus.HEALTHY;
this.message = "线程池状态正常";
}
}

4. 并发控制策略

4.1 并发控制服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/**
* 并发控制服务
* @author 运维实战
*/
@Service
public class ConcurrencyControlService {

@Autowired
@Qualifier("databaseWriteExecutor")
private ThreadPoolExecutor databaseWriteExecutor;

@Autowired
private ThreadPoolMonitor threadPoolMonitor;

private static final Logger logger = LoggerFactory.getLogger(ConcurrencyControlService.class);

/**
* 基于线程池状态的并发控制
* @param task 任务
* @return 执行结果
*/
public <T> CompletableFuture<T> executeWithConcurrencyControl(Supplier<T> task) {
return CompletableFuture.supplyAsync(() -> {
try {
// 检查线程池状态
ThreadPoolStatus status = threadPoolMonitor.getDatabaseWriteThreadPoolStatus();

// 如果线程池使用率过高,等待一段时间
if (status.getThreadUsageRate() > 80) {
logger.warn("线程池使用率过高: {}%, 等待执行", status.getThreadUsageRate());
Thread.sleep(1000); // 等待1秒
}

// 执行任务
return task.get();

} catch (Exception e) {
logger.error("并发控制执行失败", e);
throw new RuntimeException(e);
}
}, databaseWriteExecutor);
}

/**
* 基于信号量的并发控制
* @param task 任务
* @param semaphore 信号量
* @return 执行结果
*/
public <T> CompletableFuture<T> executeWithSemaphore(Supplier<T> task, Semaphore semaphore) {
return CompletableFuture.supplyAsync(() -> {
try {
// 获取信号量
semaphore.acquire();

try {
// 执行任务
return task.get();
} finally {
// 释放信号量
semaphore.release();
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断", e);
} catch (Exception e) {
logger.error("信号量控制执行失败", e);
throw new RuntimeException(e);
}
}, databaseWriteExecutor);
}

/**
* 基于计数器的并发控制
* @param task 任务
* @param maxConcurrency 最大并发数
* @return 执行结果
*/
public <T> CompletableFuture<T> executeWithCounter(Supplier<T> task, int maxConcurrency) {
return CompletableFuture.supplyAsync(() -> {
try {
// 检查当前并发数
int currentConcurrency = databaseWriteExecutor.getActiveCount();

if (currentConcurrency >= maxConcurrency) {
logger.warn("当前并发数: {} 已达到最大限制: {}, 等待执行", currentConcurrency, maxConcurrency);
Thread.sleep(500); // 等待500毫秒
}

// 执行任务
return task.get();

} catch (Exception e) {
logger.error("计数器控制执行失败", e);
throw new RuntimeException(e);
}
}, databaseWriteExecutor);
}

/**
* 动态调整并发数
* @param targetConcurrency 目标并发数
*/
public void adjustConcurrency(int targetConcurrency) {
try {
ThreadPoolStatus currentStatus = threadPoolMonitor.getDatabaseWriteThreadPoolStatus();

if (targetConcurrency != currentStatus.getMaximumPoolSize()) {
threadPoolMonitor.adjustThreadPoolParameters(databaseWriteExecutor,
Math.min(targetConcurrency, currentStatus.getCorePoolSize()), targetConcurrency);

logger.info("并发数调整完成: {} -> {}", currentStatus.getMaximumPoolSize(), targetConcurrency);
}

} catch (Exception e) {
logger.error("调整并发数失败", e);
}
}

/**
* 获取最优并发数
* @return 最优并发数
*/
public int getOptimalConcurrency() {
try {
ThreadPoolStatus status = threadPoolMonitor.getDatabaseWriteThreadPoolStatus();

// 基于CPU核心数和数据库连接池大小计算最优并发数
int cpuCores = Runtime.getRuntime().availableProcessors();
int dbConnectionPoolSize = 20; // 假设数据库连接池大小为20

// 最优并发数 = min(CPU核心数 * 2, 数据库连接池大小 * 0.8)
int optimalConcurrency = Math.min(cpuCores * 2, (int) (dbConnectionPoolSize * 0.8));

logger.info("计算最优并发数: CPU核心数={}, 数据库连接池大小={}, 最优并发数={}",
cpuCores, dbConnectionPoolSize, optimalConcurrency);

return optimalConcurrency;

} catch (Exception e) {
logger.error("计算最优并发数失败", e);
return 10; // 默认并发数
}
}
}

4.2 并发控制配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 并发控制配置
* @author 运维实战
*/
@Configuration
public class ConcurrencyControlConfig {

/**
* 数据库写入信号量
* @return 信号量
*/
@Bean("databaseWriteSemaphore")
public Semaphore databaseWriteSemaphore() {
return new Semaphore(10); // 最多允许10个并发写入
}

/**
* 数据库批量写入信号量
* @return 信号量
*/
@Bean("databaseBatchWriteSemaphore")
public Semaphore databaseBatchWriteSemaphore() {
return new Semaphore(5); // 最多允许5个并发批量写入
}

/**
* 并发控制属性
* @return 并发控制属性
*/
@Bean
@ConfigurationProperties(prefix = "concurrency.control")
public ConcurrencyControlProperties concurrencyControlProperties() {
return new ConcurrencyControlProperties();
}
}

4.3 并发控制属性类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 并发控制属性类
* @author 运维实战
*/
@Data
@ConfigurationProperties(prefix = "concurrency.control")
public class ConcurrencyControlProperties {

/**
* 最大并发写入数
*/
private int maxConcurrentWrites = 10;

/**
* 最大并发批量写入数
*/
private int maxConcurrentBatchWrites = 5;

/**
* 并发控制策略
*/
private String strategy = "THREAD_POOL"; // THREAD_POOL, SEMAPHORE, COUNTER

/**
* 是否启用动态调整
*/
private boolean enableDynamicAdjustment = true;

/**
* 调整间隔(秒)
*/
private long adjustmentInterval = 60;

/**
* 并发数调整阈值
*/
private double adjustmentThreshold = 0.8;
}

5. 实际应用示例

5.1 数据库写入控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
* 数据库写入控制器
* @author 运维实战
*/
@RestController
@RequestMapping("/api/database/write")
public class DatabaseWriteController {

@Autowired
private DatabaseWriteService databaseWriteService;

@Autowired
private ConcurrencyControlService concurrencyControlService;

@Autowired
@Qualifier("databaseWriteSemaphore")
private Semaphore databaseWriteSemaphore;

private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteController.class);

/**
* 单条记录写入
* @param request 写入请求
* @return 写入结果
*/
@PostMapping("/single")
public ResponseEntity<WriteResult> writeSingle(@RequestBody WriteRequest request) {
try {
logger.info("接收到单条记录写入请求");

CompletableFuture<WriteResult> future = concurrencyControlService.executeWithSemaphore(
() -> databaseWriteService.writeSingleRecord(request.getData()).join(),
databaseWriteSemaphore
);

WriteResult result = future.get(30, TimeUnit.SECONDS);
return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("单条记录写入失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 多条记录写入
* @param request 批量写入请求
* @return 写入结果
*/
@PostMapping("/multiple")
public ResponseEntity<List<WriteResult>> writeMultiple(@RequestBody BatchWriteRequest request) {
try {
logger.info("接收到多条记录写入请求,数量: {}", request.getDataList().size());

CompletableFuture<List<WriteResult>> future = databaseWriteService.writeMultipleRecords(request.getDataList());
List<WriteResult> results = future.get(60, TimeUnit.SECONDS);

return ResponseEntity.ok(results);

} catch (Exception e) {
logger.error("多条记录写入失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 批量写入
* @param request 批量写入请求
* @return 写入结果
*/
@PostMapping("/batch")
public ResponseEntity<BatchWriteResult> writeBatch(@RequestBody BatchWriteRequest request) {
try {
logger.info("接收到批量写入请求,数量: {}, 批次大小: {}",
request.getDataList().size(), request.getBatchSize());

CompletableFuture<BatchWriteResult> future = databaseWriteService.batchWriteRecords(
request.getDataList(), request.getBatchSize()
);

BatchWriteResult result = future.get(120, TimeUnit.SECONDS);
return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("批量写入失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 获取线程池状态
* @return 线程池状态
*/
@GetMapping("/threadpool/status")
public ResponseEntity<Map<String, Object>> getThreadPoolStatus() {
try {
Map<String, Object> status = new HashMap<>();
status.put("databaseWrite", databaseWriteService.getThreadPoolStatus());
status.put("availablePermits", databaseWriteSemaphore.availablePermits());
status.put("queueLength", databaseWriteSemaphore.getQueueLength());

return ResponseEntity.ok(status);

} catch (Exception e) {
logger.error("获取线程池状态失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 调整并发数
* @param request 调整请求
* @return 调整结果
*/
@PostMapping("/concurrency/adjust")
public ResponseEntity<Map<String, String>> adjustConcurrency(@RequestBody ConcurrencyAdjustRequest request) {
try {
logger.info("接收到并发数调整请求: {}", request.getTargetConcurrency());

concurrencyControlService.adjustConcurrency(request.getTargetConcurrency());

Map<String, String> response = new HashMap<>();
response.put("status", "SUCCESS");
response.put("message", "并发数调整完成");
response.put("targetConcurrency", String.valueOf(request.getTargetConcurrency()));

return ResponseEntity.ok(response);

} catch (Exception e) {
logger.error("调整并发数失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
}

5.2 请求类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 写入请求类
* @author 运维实战
*/
@Data
public class WriteRequest {

private Map<String, Object> data;

public WriteRequest() {}

public WriteRequest(Map<String, Object> data) {
this.data = data;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 批量写入请求类
* @author 运维实战
*/
@Data
public class BatchWriteRequest {

private List<Map<String, Object>> dataList;
private int batchSize = 100;

public BatchWriteRequest() {}

public BatchWriteRequest(List<Map<String, Object>> dataList) {
this.dataList = dataList;
}

public BatchWriteRequest(List<Map<String, Object>> dataList, int batchSize) {
this.dataList = dataList;
this.batchSize = batchSize;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 并发调整请求类
* @author 运维实战
*/
@Data
public class ConcurrencyAdjustRequest {

private int targetConcurrency;

public ConcurrencyAdjustRequest() {}

public ConcurrencyAdjustRequest(int targetConcurrency) {
this.targetConcurrency = targetConcurrency;
}
}

6. 总结

6.1 ThreadPoolExecutor最佳实践

  1. 合理配置参数: 根据系统资源和业务需求配置线程池参数
  2. 监控线程池状态: 实时监控线程池使用情况和健康状态
  3. 动态调整参数: 根据负载情况动态调整线程池参数
  4. 错误处理: 实现完善的错误处理和重试机制
  5. 资源管理: 合理管理线程生命周期和系统资源

6.2 性能优化建议

  • 核心线程数: 根据CPU核心数和业务特点设置
  • 最大线程数: 考虑数据库连接池大小和系统资源
  • 队列容量: 平衡内存使用和任务积压
  • 拒绝策略: 选择合适的拒绝策略处理任务溢出
  • 线程存活时间: 合理设置线程回收时间

6.3 运维管理要点

  • 实时监控: 监控线程池状态和性能指标
  • 动态调整: 根据负载情况动态调整并发数
  • 健康检查: 定期检查线程池健康状态
  • 异常处理: 建立异常处理和告警机制
  • 日志管理: 完善日志记录和分析

通过本文的ThreadPoolExecutor控制并发写入数据库的线程数Java实战指南,您可以掌握线程池配置、并发控制、性能优化以及在企业级应用中的最佳实践,构建高效、稳定的数据库并发写入系统!