Preface

In the big-data era, bulk data-loading throughput is a key measure of system performance. Traditional single-threaded inserts fall far short when faced with millions of rows, while multithreaded transaction processing can raise insert throughput dramatically. This article works through a high-performance solution for inserting a million rows in roughly 3 seconds, from architecture design down to code.

1. High-Performance Insert Architecture

1.1 Overall Architecture

At a high level, the pipeline validates and cleans the input, splits it into fixed-size batches, dispatches the batches to a thread pool, and has each worker write its batch to MySQL in its own short transaction, while a monitoring component tracks throughput and pool health.

1.2 Performance Optimization Strategies

1.2.1 Multithreading Strategy

  • Thread pool sizing: size the pool from the CPU core count and the database connection limit (see the sketch below)
  • Task sharding: split the large dataset into small batches that are processed in parallel
  • Load balancing: spread batches evenly across threads to avoid hot spots
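
A minimal sizing sketch. The heuristic below, which caps worker threads at the database pool size so threads never queue for connections, is a common rule of thumb rather than something prescribed by the original text:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class PoolSizing {

    /**
     * Batch inserts are I/O-bound, so allow more threads than cores,
     * but never more than the connections the database pool can hand out.
     */
    public static ThreadPoolExecutor sizedExecutor(int dbPoolSize) {
        int cores = Runtime.getRuntime().availableProcessors();
        int core = Math.min(cores * 2, dbPoolSize);
        int max = Math.min(cores * 4, dbPoolSize);
        return new ThreadPoolExecutor(
                core, max,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy()); // backpressure: caller runs the task
    }
}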

1.2.2 Batching Strategy

  • Batch size: tune the rows-per-batch to what the database handles best (typically 1,000-5,000)
  • Transaction control: keep transaction boundaries tight, balancing throughput against consistency
  • Memory management: bound the amount of in-flight data to avoid OutOfMemoryError

1.2.3 Database Optimization Strategy

  • Connection pool tuning: configure pool sizes and timeouts to match the workload
  • Batched statements: use multi-row inserts and JDBC batching
  • Index handling: temporarily suspend index maintenance during the load and rebuild afterwards (with the engine-specific caveats noted in section 4.2)

2. Core Implementation

2.1 High-Performance Batch Insert Service

@Service
@Slf4j
public class HighPerformanceBatchInsertService {

    @Autowired
    private DataMapper dataMapper;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private MeterRegistry meterRegistry;

    // Thread pool and shared counters
    private final ThreadPoolExecutor executor;
    // Replaced for every run in resetCounters(), so it must not be final
    private volatile CountDownLatch latch;
    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong failureCount = new AtomicLong(0);
    private final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());

    public HighPerformanceBatchInsertService() {
        // 1. Create the thread pool
        this.executor = new ThreadPoolExecutor(
                16, // core pool size
                32, // maximum pool size
                60L, TimeUnit.SECONDS, // idle keep-alive
                new LinkedBlockingQueue<>(1000), // work queue capacity
                new ThreadFactoryBuilder()
                        .setNameFormat("batch-insert-%d")
                        .setDaemon(false)
                        .build(),
                new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy: caller runs the task
        );

        // 2. Initialize the latch (replaced before every run)
        this.latch = new CountDownLatch(0);
    }

    /**
     * High-performance batch insert
     */
    public BatchInsertResult batchInsert(List<DataEntity> dataList, BatchInsertConfig config) {
        long startTime = System.currentTimeMillis();

        try {
            // 1. Validate parameters
            validateBatchInsertParams(dataList, config);

            // 2. Preprocess the data
            List<List<DataEntity>> batches = preprocessData(dataList, config);

            // 3. Reset counters
            resetCounters(batches.size());

            // 4. Process batches in parallel
            processBatchesInParallel(batches, config);

            // 5. Wait for completion
            waitForCompletion(config.getTimeoutSeconds());

            // 6. Build the result
            BatchInsertResult result = buildResult(startTime);

            log.info("Batch insert finished: total={}, success={}, failure={}, elapsed={}ms",
                    dataList.size(), successCount.get(), failureCount.get(),
                    System.currentTimeMillis() - startTime);

            return result;

        } catch (Exception e) {
            log.error("Batch insert failed", e);
            throw new BusinessException("Batch insert failed: " + e.getMessage());
        }
        // Note: the executor is shared across calls, so it is shut down once in the
        // @PreDestroy cleanup() method rather than after every run.
    }

    /**
     * Process batches in parallel
     */
    private void processBatchesInParallel(List<List<DataEntity>> batches, BatchInsertConfig config) {
        for (int i = 0; i < batches.size(); i++) {
            List<DataEntity> batch = batches.get(i);
            int batchIndex = i;

            executor.submit(() -> {
                try {
                    processBatch(batch, batchIndex, config);
                } catch (Exception e) {
                    log.error("Batch failed: batchIndex={}", batchIndex, e);
                    failureCount.addAndGet(batch.size());
                    exceptions.add(e);
                } finally {
                    // Count down exactly once per batch so waitForCompletion() can return
                    latch.countDown();
                }
            });
        }
    }

    /**
     * Process a single batch in its own transaction
     */
    private void processBatch(List<DataEntity> batch, int batchIndex, BatchInsertConfig config) {
        Connection connection = null;
        PreparedStatement statement = null;

        try {
            // 1. Obtain a connection and start a manual transaction
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);

            // 2. Prepare the parameterized insert statement
            String sql = buildBatchInsertSql(config.getTableName(), batch.get(0));
            statement = connection.prepareStatement(sql);

            // 3. Bind parameters for every row
            for (DataEntity entity : batch) {
                setBatchInsertParameters(statement, entity);
                statement.addBatch();
            }

            // 4. Execute the batch
            int[] results = statement.executeBatch();

            // 5. Commit the transaction
            connection.commit();

            // 6. Count successful rows; SUCCESS_NO_INFO (-2) means the driver executed
            // the statement but reports no row count (e.g. with rewriteBatchedStatements)
            int successCountInBatch = (int) Arrays.stream(results)
                    .filter(result -> result >= 0 || result == Statement.SUCCESS_NO_INFO)
                    .count();

            successCount.addAndGet(successCountInBatch);

            // 7. Record performance metrics
            recordPerformanceMetrics(batch.size(), batchIndex);

            log.debug("Batch done: batchIndex={}, size={}, success={}",
                    batchIndex, batch.size(), successCountInBatch);

        } catch (Exception e) {
            try {
                if (connection != null) {
                    connection.rollback();
                }
            } catch (SQLException rollbackException) {
                log.error("Transaction rollback failed", rollbackException);
            }

            log.error("Batch failed: batchIndex={}", batchIndex, e);
            failureCount.addAndGet(batch.size());
            exceptions.add(e);

        } finally {
            // 8. Release JDBC resources
            closeResources(connection, statement);
        }
    }

    /**
     * Preprocess the data: validate, clean, and split into batches
     */
    private List<List<DataEntity>> preprocessData(List<DataEntity> dataList, BatchInsertConfig config) {
        // 1. Validate
        List<DataEntity> validData = validateData(dataList);

        // 2. Clean
        List<DataEntity> cleanedData = cleanData(validData);

        // 3. Split into batches
        List<List<DataEntity>> batches = splitIntoBatches(cleanedData, config.getBatchSize());

        log.info("Preprocessing done: raw={}, valid={}, cleaned={}, batches={}",
                dataList.size(), validData.size(), cleanedData.size(), batches.size());

        return batches;
    }

    /**
     * Filter out invalid records
     */
    private List<DataEntity> validateData(List<DataEntity> dataList) {
        return dataList.stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
    }

    /**
     * Clean each record
     */
    private List<DataEntity> cleanData(List<DataEntity> dataList) {
        return dataList.stream()
                .map(this::cleanDataEntity)
                .collect(Collectors.toList());
    }

    /**
     * Split the list into fixed-size batches
     */
    private List<List<DataEntity>> splitIntoBatches(List<DataEntity> dataList, int batchSize) {
        List<List<DataEntity>> batches = new ArrayList<>();

        for (int i = 0; i < dataList.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, dataList.size());
            List<DataEntity> batch = dataList.subList(i, endIndex);
            // Copy the sublist so each batch is independent of the source list
            batches.add(new ArrayList<>(batch));
        }

        return batches;
    }

    /**
     * Build the parameterized insert SQL from the entity's fields.
     * Note: this assumes column names match the Java field names exactly;
     * map camelCase to snake_case here if your schema differs.
     */
    private String buildBatchInsertSql(String tableName, DataEntity entity) {
        StringBuilder sql = new StringBuilder();
        sql.append("INSERT INTO ").append(tableName).append(" (");

        // Column names from the entity's declared fields
        Field[] fields = entity.getClass().getDeclaredFields();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) sql.append(", ");
            sql.append(fields[i].getName());
        }

        sql.append(") VALUES (");

        // One placeholder per column
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) sql.append(", ");
            sql.append("?");
        }

        sql.append(")");

        return sql.toString();
    }

    /**
     * Bind one entity's field values to the prepared statement
     */
    private void setBatchInsertParameters(PreparedStatement statement, DataEntity entity) throws SQLException {
        Field[] fields = entity.getClass().getDeclaredFields();

        for (int i = 0; i < fields.length; i++) {
            Field field = fields[i];
            field.setAccessible(true);

            try {
                Object value = field.get(entity);
                statement.setObject(i + 1, value);
            } catch (IllegalAccessException e) {
                throw new SQLException("Failed to bind parameter: " + field.getName(), e);
            }
        }
    }

    /**
     * Record performance metrics
     */
    private void recordPerformanceMetrics(int batchSize, int batchIndex) {
        // 1. Count processed batches, tagged by batch size
        Counter.builder("batch.insert.count")
                .tag("batch_size", String.valueOf(batchSize))
                .register(meterRegistry)
                .increment();

        // 2. Expose the most recent batch index as a gauge
        Gauge.builder("batch.insert.index", () -> batchIndex)
                .register(meterRegistry);
    }

    /**
     * Wait for all batches to finish or time out
     */
    private void waitForCompletion(int timeoutSeconds) throws InterruptedException {
        if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
            log.warn("Batch insert timed out: timeout={}s", timeoutSeconds);
        }
    }

    /**
     * Build the result summary
     */
    private BatchInsertResult buildResult(long startTime) {
        BatchInsertResult result = new BatchInsertResult();
        result.setTotalCount(successCount.get() + failureCount.get());
        result.setSuccessCount(successCount.get());
        result.setFailureCount(failureCount.get());
        result.setSuccessRate((double) successCount.get() / result.getTotalCount());
        result.setDuration(System.currentTimeMillis() - startTime);
        result.setThroughput((double) result.getTotalCount() / result.getDuration() * 1000);
        result.setExceptions(new ArrayList<>(exceptions));

        return result;
    }

    /**
     * Reset counters before a run
     */
    private void resetCounters(int batchCount) {
        successCount.set(0);
        failureCount.set(0);
        exceptions.clear();
        latch = new CountDownLatch(batchCount);
    }

    /**
     * Shut down the thread pool once, when the service bean is destroyed
     */
    @PreDestroy
    private void cleanup() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Close JDBC resources quietly
     */
    private void closeResources(Connection connection, PreparedStatement statement) {
        try {
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException e) {
            log.warn("Failed to close PreparedStatement", e);
        }

        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            log.warn("Failed to close Connection", e);
        }
    }

    /**
     * Validate a single record
     */
    private boolean isValidData(DataEntity entity) {
        // Plug in your own validation logic here
        return entity != null && entity.getId() != null;
    }

    /**
     * Clean a single record
     */
    private DataEntity cleanDataEntity(DataEntity entity) {
        // Plug in your own cleaning logic here
        return entity;
    }

    /**
     * Validate input parameters
     */
    private void validateBatchInsertParams(List<DataEntity> dataList, BatchInsertConfig config) {
        if (dataList == null || dataList.isEmpty()) {
            throw new IllegalArgumentException("Data list must not be empty");
        }

        if (config == null) {
            throw new IllegalArgumentException("Config must not be null");
        }

        if (config.getBatchSize() <= 0) {
            throw new IllegalArgumentException("Batch size must be greater than 0");
        }
    }
}
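
For orientation, here is a hypothetical caller. BatchInsertConfig and BatchInsertResult are not defined in the original; the field names below mirror the getters used in the service above and are assumptions:

// Hypothetical usage; BatchInsertConfig fields inferred from the service code above
BatchInsertConfig config = new BatchInsertConfig();
config.setTableName("data_table");
config.setBatchSize(2000);        // within the 1,000-5,000 range recommended later
config.setTimeoutSeconds(30);     // upper bound passed to waitForCompletion()

List<DataEntity> rows = loadRows(); // hypothetical data source
BatchInsertResult result = batchInsertService.batchInsert(rows, config);
log.info("inserted {} of {} rows in {} ms",
        result.getSuccessCount(), result.getTotalCount(), result.getDuration());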

2.2 Thread Pool Configuration

@Configuration
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class ThreadPoolConfig {

    @Autowired
    private ThreadPoolProperties properties;

    /**
     * Thread pool for batch inserts
     */
    @Bean("batchInsertExecutor")
    public ThreadPoolExecutor batchInsertExecutor() {
        return new ThreadPoolExecutor(
                properties.getCorePoolSize(),
                properties.getMaxPoolSize(),
                properties.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(properties.getQueueCapacity()),
                new ThreadFactoryBuilder()
                        .setNameFormat("batch-insert-%d")
                        .setDaemon(false)
                        .setPriority(Thread.NORM_PRIORITY)
                        .build(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    /**
     * Thread pool for data preprocessing (sized from the dedicated properties)
     */
    @Bean("dataPreprocessExecutor")
    public ThreadPoolExecutor dataPreprocessExecutor() {
        return new ThreadPoolExecutor(
                properties.getPreprocessCoreSize(),
                properties.getPreprocessMaxSize(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadFactoryBuilder()
                        .setNameFormat("data-preprocess-%d")
                        .setDaemon(false)
                        .build(),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }

    /**
     * Scheduled pool for monitoring tasks
     */
    @Bean("monitorExecutor")
    public ScheduledExecutorService monitorExecutor() {
        return Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("monitor-%d")
                        .setDaemon(true)
                        .build());
    }
}

@ConfigurationProperties(prefix = "thread.pool")
@Data
public class ThreadPoolProperties {
    private int corePoolSize = 16;
    private int maxPoolSize = 32;
    private long keepAliveTime = 60L;
    private int queueCapacity = 1000;
    private int preprocessCoreSize = 8;
    private int preprocessMaxSize = 16;
}
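
With several executor beans defined, injection points must select one by bean name. A brief sketch; the consuming class is illustrative, not part of the original:

@Service
public class ImportTaskService {

    // Select the batch-insert pool among the multiple executor beans
    @Autowired
    @Qualifier("batchInsertExecutor")
    private ThreadPoolExecutor batchInsertExecutor;

    public void submit(Runnable task) {
        batchInsertExecutor.submit(task);
    }
}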

2.3 Database Connection Pool Tuning

@Configuration
public class DataSourceConfig {

    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.hikari")
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();

        // Basic settings (externalize credentials in real deployments)
        config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false");
        config.setUsername("root");
        config.setPassword("password");
        config.setDriverClassName("com.mysql.cj.jdbc.Driver");

        // Pool settings
        config.setMaximumPoolSize(50); // maximum connections
        config.setMinimumIdle(10); // minimum idle connections
        config.setConnectionTimeout(30000); // connection acquisition timeout (ms)
        config.setIdleTimeout(600000); // idle connection timeout (ms)
        config.setMaxLifetime(1800000); // maximum connection lifetime (ms)
        config.setLeakDetectionThreshold(60000); // leak detection threshold (ms)

        // Driver-level performance settings
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useLocalSessionState", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");

        // Batch-insert related settings
        // (useBulkStmts is a MariaDB Connector/J option; MySQL Connector/J ignores it)
        config.addDataSourceProperty("useBulkStmts", "true");
        config.addDataSourceProperty("allowMultiQueries", "true");

        return new HikariDataSource(config);
    }

    /**
     * Dedicated data source for batch inserts
     */
    @Bean("batchInsertDataSource")
    @ConfigurationProperties("spring.datasource.batch")
    public DataSource batchInsertDataSource() {
        HikariConfig config = new HikariConfig();

        // Basic settings
        config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false");
        config.setUsername("root");
        config.setPassword("password");
        config.setDriverClassName("com.mysql.cj.jdbc.Driver");

        // Sized for bulk loads
        config.setMaximumPoolSize(100); // larger pool
        config.setMinimumIdle(20);
        config.setConnectionTimeout(60000); // longer acquisition timeout
        config.setIdleTimeout(300000);
        config.setMaxLifetime(1800000);

        // Batch-insert specific driver settings
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("useBulkStmts", "true");
        config.addDataSourceProperty("allowMultiQueries", "true");
        config.addDataSourceProperty("useCompression", "false");
        config.addDataSourceProperty("autoReconnect", "true");
        config.addDataSourceProperty("failOverReadOnly", "false");

        return new HikariDataSource(config);
    }
}
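
Of these driver settings, rewriteBatchedStatements usually has the largest impact: with it enabled, MySQL Connector/J rewrites a JDBC batch of single-row inserts into a few multi-row INSERT ... VALUES (...),(...) statements instead of one round trip per row. A standalone sketch of the effect; the URL and credentials are placeholders:

// Plain-JDBC sketch: the URL parameter enables batch rewriting in the driver
String url = "jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true";
try (Connection conn = DriverManager.getConnection(url, "root", "password");
     PreparedStatement ps = conn.prepareStatement(
             "INSERT INTO data_table (id, name) VALUES (?, ?)")) {
    conn.setAutoCommit(false);
    for (long i = 0; i < 10_000; i++) {
        ps.setLong(1, i);
        ps.setString(2, "row-" + i);
        ps.addBatch();
    }
    ps.executeBatch(); // sent as multi-row INSERTs, not 10,000 round trips
    conn.commit();
} catch (SQLException e) {
    throw new RuntimeException(e);
}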

3. MyBatis Batch Insert Optimization

3.1 MyBatis Batch Insert Mapper

@Mapper
public interface BatchInsertMapper {

    /**
     * Batch insert (multi-row VALUES list; keep batches small enough to stay
     * under MySQL's max_allowed_packet)
     */
    @Insert("<script>" +
            "INSERT INTO data_table (id, name, value, create_time) VALUES " +
            "<foreach collection='list' item='item' separator=','>" +
            "(#{item.id}, #{item.name}, #{item.value}, #{item.createTime})" +
            "</foreach>" +
            "</script>")
    int batchInsert(@Param("list") List<DataEntity> list);

    /**
     * Batch insert, silently skipping duplicate keys
     */
    @Insert("<script>" +
            "INSERT IGNORE INTO data_table (id, name, value, create_time) VALUES " +
            "<foreach collection='list' item='item' separator=','>" +
            "(#{item.id}, #{item.name}, #{item.value}, #{item.createTime})" +
            "</foreach>" +
            "</script>")
    int batchInsertIgnore(@Param("list") List<DataEntity> list);

    /**
     * Batch insert, updating existing rows on duplicate keys
     */
    @Insert("<script>" +
            "INSERT INTO data_table (id, name, value, create_time) VALUES " +
            "<foreach collection='list' item='item' separator=','>" +
            "(#{item.id}, #{item.name}, #{item.value}, #{item.createTime})" +
            "</foreach>" +
            " ON DUPLICATE KEY UPDATE " +
            "name = VALUES(name), " +
            "value = VALUES(value), " +
            "update_time = NOW()" +
            "</script>")
    int batchInsertOnDuplicateKeyUpdate(@Param("list") List<DataEntity> list);

    /**
     * Batch insert using REPLACE (deletes then re-inserts on duplicate keys)
     */
    @Insert("<script>" +
            "REPLACE INTO data_table (id, name, value, create_time) VALUES " +
            "<foreach collection='list' item='item' separator=','>" +
            "(#{item.id}, #{item.name}, #{item.value}, #{item.createTime})" +
            "</foreach>" +
            "</script>")
    int batchReplace(@Param("list") List<DataEntity> list);
}
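
The mapper assumes a data_table with columns id, name, value, and create_time (plus update_time for the upsert variant). The original does not show the matching entity; a plausible minimal version, with field types assumed:

// Hypothetical entity matching the mapper's column list
@Data
public class DataEntity {
    private Long id;
    private String name;
    private String value;
    private LocalDateTime createTime;
}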

3.2 MyBatis Batch Insert Service

@Service
@Slf4j
public class MyBatisBatchInsertService {

    @Autowired
    private BatchInsertMapper batchInsertMapper;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Batch insert through the mapper, processing batches in parallel
     */
    public BatchInsertResult batchInsertWithMyBatis(List<DataEntity> dataList, BatchInsertConfig config) {
        long startTime = System.currentTimeMillis();

        try {
            // 1. Split into batches
            List<List<DataEntity>> batches = splitIntoBatches(dataList, config.getBatchSize());

            // 2. Insert batches in parallel (each mapper call runs independently)
            List<CompletableFuture<Integer>> futures = batches.stream()
                    .map(batch -> CompletableFuture.supplyAsync(() ->
                            batchInsertMapper.batchInsert(batch)))
                    .collect(Collectors.toList());

            // 3. Wait for all batches, bounded by the configured timeout
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0]));

            allFutures.get(config.getTimeoutSeconds(), TimeUnit.SECONDS);

            // 4. Sum up inserted rows
            int totalInserted = futures.stream()
                    .mapToInt(future -> {
                        try {
                            return future.get();
                        } catch (Exception e) {
                            log.error("Failed to get batch insert result", e);
                            return 0;
                        }
                    })
                    .sum();

            BatchInsertResult result = new BatchInsertResult();
            result.setTotalCount(dataList.size());
            result.setSuccessCount(totalInserted);
            result.setFailureCount(dataList.size() - totalInserted);
            result.setSuccessRate((double) totalInserted / dataList.size());
            result.setDuration(System.currentTimeMillis() - startTime);
            result.setThroughput((double) dataList.size() / result.getDuration() * 1000);

            log.info("MyBatis batch insert finished: total={}, success={}, failure={}, elapsed={}ms, throughput={}/s",
                    dataList.size(), totalInserted, dataList.size() - totalInserted,
                    result.getDuration(), result.getThroughput());

            return result;

        } catch (Exception e) {
            log.error("MyBatis batch insert failed", e);
            throw new BusinessException("MyBatis batch insert failed: " + e.getMessage());
        }
    }

    /**
     * Batch insert through a BATCH-mode SqlSession
     */
    public BatchInsertResult batchInsertWithSqlSession(List<DataEntity> dataList, BatchInsertConfig config) {
        long startTime = System.currentTimeMillis();

        try (SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
            BatchInsertMapper mapper = sqlSession.getMapper(BatchInsertMapper.class);

            int totalInserted = 0;
            int batchCount = 0;

            // Process in batches
            for (int i = 0; i < dataList.size(); i += config.getBatchSize()) {
                int endIndex = Math.min(i + config.getBatchSize(), dataList.size());
                List<DataEntity> batch = dataList.subList(i, endIndex);

                mapper.batchInsert(batch);
                batchCount++;
                totalInserted += batch.size();

                // Commit every 10 batches to keep transactions short and memory bounded
                if (batchCount % 10 == 0) {
                    sqlSession.commit();
                    log.debug("Committed batches: {}, rows so far: {}", batchCount, totalInserted);
                }
            }

            // Commit the remaining batches (an exception above would have aborted the method)
            sqlSession.commit();

            BatchInsertResult result = new BatchInsertResult();
            result.setTotalCount(dataList.size());
            result.setSuccessCount(totalInserted);
            result.setFailureCount(dataList.size() - totalInserted);
            result.setSuccessRate((double) totalInserted / dataList.size());
            result.setDuration(System.currentTimeMillis() - startTime);
            result.setThroughput((double) dataList.size() / result.getDuration() * 1000);

            log.info("SqlSession batch insert finished: total={}, success={}, failure={}, elapsed={}ms, throughput={}/s",
                    dataList.size(), totalInserted, dataList.size() - totalInserted,
                    result.getDuration(), result.getThroughput());

            return result;

        } catch (Exception e) {
            log.error("SqlSession batch insert failed", e);
            throw new BusinessException("SqlSession batch insert failed: " + e.getMessage());
        }
    }

    /**
     * Split the list into fixed-size batches
     */
    private List<List<DataEntity>> splitIntoBatches(List<DataEntity> dataList, int batchSize) {
        List<List<DataEntity>> batches = new ArrayList<>();

        for (int i = 0; i < dataList.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, dataList.size());
            List<DataEntity> batch = dataList.subList(i, endIndex);
            batches.add(new ArrayList<>(batch));
        }

        return batches;
    }
}
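
One caveat with ExecutorType.BATCH: statements are queued, so the mapper's return value is not a reliable row count; real counts only become available when the queue is flushed. A sketch of recovering them via flushStatements():

// Sketch: real row counts in BATCH mode come from flushStatements(), not the mapper
try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
    BatchInsertMapper mapper = session.getMapper(BatchInsertMapper.class);
    mapper.batchInsert(batch);                              // queued, not yet executed
    List<BatchResult> results = session.flushStatements();  // executes the queued batch
    int rows = results.stream()
            .flatMapToInt(r -> Arrays.stream(r.getUpdateCounts()))
            .filter(c -> c >= 0)  // drivers may report SUCCESS_NO_INFO (-2) instead
            .sum();
    session.commit();
    log.debug("rows inserted: {}", rows);
}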

4. Performance Monitoring and Tuning

4.1 Performance Monitoring Service

@Service
@Slf4j
public class PerformanceMonitoringService {

    @Autowired
    private MeterRegistry meterRegistry;

    @Autowired
    @Qualifier("batchInsertExecutor")
    private ThreadPoolExecutor batchInsertExecutor;

    // Needed below for HikariCP pool statistics; missing from the original listing
    @Autowired
    private DataSource dataSource;

    /**
     * Record batch insert metrics
     */
    public void recordBatchInsertMetrics(BatchInsertResult result) {
        try {
            // 1. Total rows processed
            Counter.builder("batch.insert.total.count")
                    .register(meterRegistry)
                    .increment(result.getTotalCount());

            // 2. Successful rows
            Counter.builder("batch.insert.success.count")
                    .register(meterRegistry)
                    .increment(result.getSuccessCount());

            // 3. Failed rows
            Counter.builder("batch.insert.failure.count")
                    .register(meterRegistry)
                    .increment(result.getFailureCount());

            // 4. Duration
            Timer.builder("batch.insert.duration")
                    .register(meterRegistry)
                    .record(result.getDuration(), TimeUnit.MILLISECONDS);

            // 5. Throughput
            Gauge.builder("batch.insert.throughput", result, BatchInsertResult::getThroughput)
                    .register(meterRegistry);

            // 6. Success rate
            Gauge.builder("batch.insert.success.rate", result, BatchInsertResult::getSuccessRate)
                    .register(meterRegistry);

        } catch (Exception e) {
            log.error("Failed to record batch insert metrics", e);
        }
    }

    /**
     * Monitor thread pool health
     */
    @Scheduled(fixedDelay = 30000) // every 30 seconds
    public void monitorThreadPoolStatus() {
        try {
            // 1. Register thread pool gauges (re-registering an existing meter is a no-op)
            Gauge.builder("thread.pool.core.size", batchInsertExecutor, ThreadPoolExecutor::getCorePoolSize)
                    .register(meterRegistry);

            Gauge.builder("thread.pool.max.size", batchInsertExecutor, ThreadPoolExecutor::getMaximumPoolSize)
                    .register(meterRegistry);

            Gauge.builder("thread.pool.active.size", batchInsertExecutor, ThreadPoolExecutor::getActiveCount)
                    .register(meterRegistry);

            Gauge.builder("thread.pool.queue.size", batchInsertExecutor, executor -> executor.getQueue().size())
                    .register(meterRegistry);

            Gauge.builder("thread.pool.completed.tasks", batchInsertExecutor, ThreadPoolExecutor::getCompletedTaskCount)
                    .register(meterRegistry);

            // 2. Health checks
            if (batchInsertExecutor.getQueue().size() > 800) {
                log.warn("Thread pool queue near capacity: queueSize={}", batchInsertExecutor.getQueue().size());
            }

            if (batchInsertExecutor.getActiveCount() == batchInsertExecutor.getMaximumPoolSize()) {
                log.warn("All thread pool workers busy: activeCount={}", batchInsertExecutor.getActiveCount());
            }

        } catch (Exception e) {
            log.error("Failed to monitor thread pool status", e);
        }
    }

    /**
     * Monitor database connection pool health
     */
    @Scheduled(fixedDelay = 60000) // every minute
    public void monitorDataSourceStatus() {
        try {
            HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
            HikariPoolMXBean poolBean = hikariDataSource.getHikariPoolMXBean();

            // 1. Register connection pool gauges
            Gauge.builder("datasource.pool.total.connections", poolBean, HikariPoolMXBean::getTotalConnections)
                    .register(meterRegistry);

            Gauge.builder("datasource.pool.active.connections", poolBean, HikariPoolMXBean::getActiveConnections)
                    .register(meterRegistry);

            Gauge.builder("datasource.pool.idle.connections", poolBean, HikariPoolMXBean::getIdleConnections)
                    .register(meterRegistry);

            Gauge.builder("datasource.pool.threads.awaiting.connection", poolBean, HikariPoolMXBean::getThreadsAwaitingConnection)
                    .register(meterRegistry);

            // 2. Health checks
            if (poolBean.getThreadsAwaitingConnection() > 10) {
                log.warn("Too many threads waiting for a connection: waitingThreads={}",
                        poolBean.getThreadsAwaitingConnection());
            }

            if (poolBean.getActiveConnections() == poolBean.getTotalConnections()) {
                log.warn("All pooled connections in use: activeConnections={}",
                        poolBean.getActiveConnections());
            }

        } catch (Exception e) {
            log.error("Failed to monitor connection pool status", e);
        }
    }

    /**
     * Generate a point-in-time performance report
     */
    public PerformanceReport generatePerformanceReport() {
        PerformanceReport report = new PerformanceReport();
        report.setTimestamp(LocalDateTime.now());

        // 1. Thread pool status
        ThreadPoolStatus threadPoolStatus = new ThreadPoolStatus();
        threadPoolStatus.setCorePoolSize(batchInsertExecutor.getCorePoolSize());
        threadPoolStatus.setMaxPoolSize(batchInsertExecutor.getMaximumPoolSize());
        threadPoolStatus.setActiveCount(batchInsertExecutor.getActiveCount());
        threadPoolStatus.setQueueSize(batchInsertExecutor.getQueue().size());
        threadPoolStatus.setCompletedTaskCount(batchInsertExecutor.getCompletedTaskCount());
        report.setThreadPoolStatus(threadPoolStatus);

        // 2. Connection pool status
        try {
            HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
            HikariPoolMXBean poolBean = hikariDataSource.getHikariPoolMXBean();

            DataSourceStatus dataSourceStatus = new DataSourceStatus();
            dataSourceStatus.setTotalConnections(poolBean.getTotalConnections());
            dataSourceStatus.setActiveConnections(poolBean.getActiveConnections());
            dataSourceStatus.setIdleConnections(poolBean.getIdleConnections());
            dataSourceStatus.setThreadsAwaitingConnection(poolBean.getThreadsAwaitingConnection());
            report.setDataSourceStatus(dataSourceStatus);

        } catch (Exception e) {
            log.error("Failed to read connection pool status", e);
        }

        return report;
    }
}

4.2 Performance Optimization Service

@Component
@Slf4j
public class PerformanceOptimizationService {

    @Autowired
    private DataSource dataSource;

    // Needed by adjustThreadPoolParameters below; missing from the original listing
    @Autowired
    @Qualifier("batchInsertExecutor")
    private ThreadPoolExecutor batchInsertExecutor;

    /**
     * Apply database-side optimizations before a bulk load.
     * Note: the SET statements below are per-session; in a pooled environment
     * they only affect the connection they happen to run on, so production code
     * should apply them on each worker connection (or via connection init SQL).
     */
    public void optimizeDatabasePerformance() {
        try {
            // 1. Disable auto-commit
            optimizeAutoCommit();

            // 2. Tune batch insert session settings
            optimizeBatchInsert();

            // 3. Suspend index maintenance
            optimizeIndexes();

            // 4. Tune the query cache (MySQL 5.7 and earlier only)
            optimizeQueryCache();

            log.info("Database optimizations applied");

        } catch (Exception e) {
            log.error("Failed to apply database optimizations", e);
        }
    }

    /**
     * Disable auto-commit and set the isolation level (per-connection settings)
     */
    private void optimizeAutoCommit() {
        try (Connection connection = dataSource.getConnection()) {
            // Manual commits amortize transaction overhead across a whole batch
            connection.setAutoCommit(false);

            // READ_COMMITTED is usually sufficient for insert-only workloads
            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

        } catch (SQLException e) {
            log.error("Failed to configure auto-commit", e);
        }
    }

    /**
     * Session-level settings that speed up bulk inserts
     */
    private void optimizeBatchInsert() {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {

            // Skip foreign key checks during the load
            statement.execute("SET FOREIGN_KEY_CHECKS = 0");

            // Skip unique checks during the load
            statement.execute("SET UNIQUE_CHECKS = 0");

            // Disable auto-commit at the session level
            statement.execute("SET AUTOCOMMIT = 0");

            // Enlarge the bulk insert buffer (used for MyISAM bulk loads)
            statement.execute("SET SESSION bulk_insert_buffer_size = 256*1024*1024");

        } catch (SQLException e) {
            log.error("Failed to tune batch insert settings", e);
        }
    }

    /**
     * Suspend index maintenance during the load.
     * Note: DISABLE KEYS only affects non-unique indexes on MyISAM tables;
     * InnoDB ignores it, where dropping and re-creating secondary indexes
     * is the equivalent technique.
     */
    private void optimizeIndexes() {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {

            statement.execute("ALTER TABLE data_table DISABLE KEYS");

        } catch (SQLException e) {
            log.error("Failed to suspend index maintenance", e);
        }
    }

    /**
     * Restore indexes and consistency checks after the load
     */
    public void restoreIndexes() {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {

            // Rebuild the disabled indexes
            statement.execute("ALTER TABLE data_table ENABLE KEYS");

            // Re-enable foreign key checks
            statement.execute("SET FOREIGN_KEY_CHECKS = 1");

            // Re-enable unique checks
            statement.execute("SET UNIQUE_CHECKS = 1");

        } catch (SQLException e) {
            log.error("Failed to restore indexes", e);
        }
    }

    /**
     * Tune the query cache.
     * Note: the query cache was removed in MySQL 8.0; these statements apply
     * to 5.7 and earlier, and SET GLOBAL requires the SUPER privilege.
     */
    private void optimizeQueryCache() {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {

            statement.execute("SET GLOBAL query_cache_size = 128*1024*1024");

            statement.execute("SET GLOBAL query_cache_type = ON");

        } catch (SQLException e) {
            log.error("Failed to tune the query cache", e);
        }
    }

    /**
     * Adjust thread pool parameters based on system load
     */
    public void adjustThreadPoolParameters() {
        try {
            // 1. Read the current per-core system load
            double systemLoad = getSystemLoad();

            // 2. Scale the pool with the load
            if (systemLoad > 0.8) {
                // Shrink the pool under high load (lower core before max)
                batchInsertExecutor.setCorePoolSize(8);
                batchInsertExecutor.setMaximumPoolSize(16);
                log.info("High system load, shrinking the thread pool");
            } else if (systemLoad < 0.3) {
                // Grow the pool under low load; raise max first so core never exceeds it
                batchInsertExecutor.setMaximumPoolSize(48);
                batchInsertExecutor.setCorePoolSize(24);
                log.info("Low system load, growing the thread pool");
            }

        } catch (Exception e) {
            log.error("Failed to adjust thread pool parameters", e);
        }
    }

    /**
     * Per-core system load, normalized so the thresholds above read as utilization
     */
    private double getSystemLoad() {
        try {
            OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
            double load = osBean.getSystemLoadAverage(); // -1 if unavailable on this platform
            if (load < 0) {
                return 0.5; // assume moderate load when the value is unavailable
            }
            return load / osBean.getAvailableProcessors();
        } catch (Exception e) {
            log.error("Failed to read system load", e);
            return 0.5; // assume moderate load
        }
    }
}
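
These tweaks only pay off when they bracket the actual load, and the restore step must run even if the load fails. A minimal orchestration sketch; the service wiring is assumed rather than shown in the original:

// Hypothetical orchestration: optimize, load, and always restore
public BatchInsertResult optimizedBulkLoad(List<DataEntity> dataList, BatchInsertConfig config) {
    performanceOptimizationService.optimizeDatabasePerformance(); // relax checks and keys
    try {
        return batchInsertService.batchInsert(dataList, config);
    } finally {
        performanceOptimizationService.restoreIndexes(); // re-enable keys and checks
    }
}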

5. Exception Handling and Retry

5.1 Exception Handling Service

@Service
@Slf4j
public class ExceptionHandlingService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Needed by retryBatchInsert below; missing from the original listing
    @Autowired
    private HighPerformanceBatchInsertService batchInsertService;

    /**
     * Handle a batch insert failure
     */
    public void handleBatchInsertException(Exception e, List<DataEntity> failedData) {
        try {
            // 1. Log the failure
            log.error("Batch insert exception: failedCount={}", failedData.size(), e);

            // 2. Persist the failed records
            saveFailedData(failedData);

            // 3. Send an alert
            sendExceptionAlert(e, failedData.size());

            // 4. Update exception statistics
            recordExceptionStatistics(e.getClass().getSimpleName());

        } catch (Exception handlingException) {
            log.error("Failed while handling batch insert exception", handlingException);
        }
    }

    /**
     * Persist failed records for later replay
     */
    private void saveFailedData(List<DataEntity> failedData) {
        try {
            String key = "failed:data:" + System.currentTimeMillis();
            redisTemplate.opsForValue().set(key, failedData, Duration.ofHours(24));

            log.info("Failed records saved: key={}, size={}", key, failedData.size());

        } catch (Exception e) {
            log.error("Failed to save failed records", e);
        }
    }

    /**
     * Send an exception alert
     */
    private void sendExceptionAlert(Exception e, int failedCount) {
        try {
            ExceptionAlert alert = new ExceptionAlert();
            alert.setExceptionType(e.getClass().getSimpleName());
            alert.setErrorMessage(e.getMessage());
            alert.setFailedCount(failedCount);
            alert.setTimestamp(LocalDateTime.now());

            // Hand off to the alerting channel of your choice
            log.warn("Exception alert: {}", alert);

        } catch (Exception alertException) {
            log.error("Failed to send exception alert", alertException);
        }
    }

    /**
     * Track per-day exception counts in Redis
     */
    private void recordExceptionStatistics(String exceptionType) {
        try {
            String key = "exception:count:" + exceptionType + ":" + LocalDate.now();
            redisTemplate.opsForValue().increment(key);
            redisTemplate.expire(key, Duration.ofDays(7));

        } catch (Exception e) {
            log.error("Failed to record exception statistics", e);
        }
    }

    /**
     * Retry with exponential backoff
     */
    @Retryable(value = {SQLException.class, DataAccessException.class},
            maxAttempts = 3,
            backoff = @Backoff(delay = 1000, multiplier = 2))
    public BatchInsertResult retryBatchInsert(List<DataEntity> dataList, BatchInsertConfig config) {
        try {
            return batchInsertService.batchInsert(dataList, config);
        } catch (Exception e) {
            log.warn("Batch insert retry: attempt={}, error={}",
                    RetrySynchronizationManager.getContext().getRetryCount(), e.getMessage());
            throw e;
        }
    }

    /**
     * Recovery callback once retries are exhausted
     */
    @Recover
    public BatchInsertResult recoverBatchInsert(Exception e, List<DataEntity> dataList, BatchInsertConfig config) {
        log.error("Batch insert retries exhausted, running recovery: error={}", e.getMessage());

        // 1. Save the failed records
        saveFailedData(dataList);

        // 2. Return a failure result
        BatchInsertResult result = new BatchInsertResult();
        result.setTotalCount(dataList.size());
        result.setSuccessCount(0);
        result.setFailureCount(dataList.size());
        result.setSuccessRate(0.0);
        result.setDuration(0);
        result.setThroughput(0.0);
        result.setExceptions(Arrays.asList(e));

        return result;
    }
}
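
Note that @Retryable and @Recover are inert unless Spring Retry is switched on. Assuming the spring-retry dependency is on the classpath, a minimal configuration:

@Configuration
@EnableRetry // activates @Retryable/@Recover processing application-wide
public class RetryConfig {
}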

6. Best Practices Summary

6.1 Performance Best Practices

  1. Thread pool sizing: configure the pool from the CPU core count and the database connection limit
  2. Batch size: tune to database performance, typically 1,000-5,000 rows per batch
  3. Transaction control: keep transaction boundaries tight and avoid long-running transactions
  4. Connection pool: configure pool parameters to match the workload

6.2 Exception Handling Best Practices

  1. Exception classification: distinguish retryable from non-retryable failures
  2. Retry strategy: use exponential backoff between attempts
  3. Failure handling: persist failed records so they can be replayed later
  4. Monitoring and alerting: back the pipeline with thorough exception monitoring and alerts

6.3 Data Consistency Best Practices

  1. Transaction management: choose an appropriate isolation level
  2. Concurrency control: avoid data races and deadlocks
  3. Data validation: validate records before inserting them
  4. Rollback: make sure failures roll back cleanly

7. Conclusion

Inserting a million rows in about 3 seconds with multithreaded transactions is a multi-dimensional performance problem:

  1. Architecture: a sound multithreaded design with a sensible task-sharding strategy
  2. Database tuning: connection pool configuration, batched inserts, and index management
  3. Monitoring: thorough performance metrics and alerting
  4. Exception handling: robust error handling and retry mechanics

Key takeaways:

  1. Multithreaded architecture: well-sized thread pools and balanced task sharding
  2. Batching: efficient multi-row inserts with tight transaction management
  3. Performance tuning: database parameter and index optimization
  4. Monitoring and alerting: complete performance monitoring and exception handling
  5. Data consistency: guarantees for consistency and integrity throughout the load

With the practices laid out in this article, readers can build an enterprise-grade, high-performance data import system capable of the 3-second million-row target.