Episode 242. Spring Boot Batch Task Processing Architecture in Practice: an Enterprise Solution for Chunked Processing, Asynchronous Execution, and Progress Monitoring

Introduction

In large enterprise applications, processing work in bulk is a common and important requirement: data migration, bulk imports, report generation, and system maintenance all involve handling large volumes of data or executing large numbers of operations. Traditional synchronous processing tends to exhaust system resources and cause request timeouts. A batch task processing architecture built on Spring Boot enables efficient chunked processing and adds enterprise-grade capabilities such as asynchronous execution, progress monitoring, and failure retry. As data volumes and business complexity keep growing, the ability to build an extensible, observable batch processing framework has become a core skill for enterprise architects.

This article takes a deep dive into the design and practical application of large-scale batch task processing in Spring Boot, from chunked processing and asynchronous execution to progress monitoring and performance optimization, and provides end-to-end technical guidance for building a stable, efficient batch processing solution.

1. Overview and Core Principles of Spring Boot Batch Task Processing

1.1 Architecture Design for Batch Task Processing

The Spring Boot batch task processing system uses a layered design: large data sets are split into batches, the batches are executed asynchronously, and execution is tracked by a progress monitor. Together these layers give the application efficient, observable management of large batch jobs.

1.2 Core Component Architecture

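The framework presented in the following sections is built from five collaborating components: BatchTaskManager (task orchestration and lifecycle), BatchProcessor (data chunking and per-batch execution), AsyncExecutor (asynchronous, delayed, and periodic execution), ProgressMonitor (progress tracking and alerting), and ResultCollector (result aggregation and analysis). They share a handful of supporting types that are used throughout the code but never listed in this article; the minimal sketch below shows one plausible shape for them, so the exact fields and Lombok annotations are assumptions rather than the original definitions.

import java.util.List;

import lombok.Builder;
import lombok.Data;

/** Callback applied to every item inside a batch. */
@FunctionalInterface
public interface BatchItemProcessor<T, R> {
    R process(T item);
}

/** Lifecycle states shared by the task manager and the progress monitor. */
enum TaskStatus {
    RUNNING, COMPLETED, FAILED, CANCELLED
}

/** Input of one batch task: the raw data, the chunk size, and the per-item processor. */
@Data
@Builder
class BatchTaskRequest<T> {
    private List<T> data;
    private int batchSize;
    // The result type is left open here; the manager casts when dispatching to the processor
    private BatchItemProcessor<T, ?> processor;
}
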
2. Designing the Enterprise-Grade Batch Task Manager

2.1 The Core Batch Task Manager

/**
* 企业级大批量任务管理器
* 提供完整的批量任务处理能力
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class BatchTaskManager {

private final BatchProcessor batchProcessor;
private final AsyncExecutor asyncExecutor;
private final ProgressMonitor progressMonitor;
private final ResultCollector resultCollector;
private final TaskScheduler taskScheduler;

private final Map<String, BatchTaskContext> taskContexts = new ConcurrentHashMap<>();
private final AtomicLong totalTaskCount = new AtomicLong(0);
private final AtomicLong completedTaskCount = new AtomicLong(0);
private final AtomicLong failedTaskCount = new AtomicLong(0);

/**
* 执行大批量任务
*/
public <T, R> BatchTaskResult<R> executeBatchTask(BatchTaskRequest<T> request) {
try {
String taskId = generateTaskId();
totalTaskCount.incrementAndGet();

// 创建任务上下文
BatchTaskContext context = createTaskContext(taskId, request);
taskContexts.put(taskId, context);

// 记录任务开始
progressMonitor.recordTaskStart(context);

// 执行批量处理
BatchTaskResult<R> result = executeBatchProcessing(context);

// 更新任务状态
context.setStatus(TaskStatus.COMPLETED);
context.setEndTime(System.currentTimeMillis());
context.setResult(result);

// 记录任务完成
progressMonitor.recordTaskComplete(context);
completedTaskCount.incrementAndGet();

log.info("大批量任务执行完成: {} - 处理数量: {}, 成功数量: {}, 失败数量: {}",
taskId, result.getTotalCount(), result.getSuccessCount(), result.getFailureCount());

return result;

} catch (Exception e) {
log.error("大批量任务执行失败", e);
failedTaskCount.incrementAndGet();
throw new BatchTaskException("大批量任务执行失败: " + e.getMessage());
}
}

/**
* 异步执行大批量任务
*/
public <T, R> CompletableFuture<BatchTaskResult<R>> executeBatchTaskAsync(BatchTaskRequest<T> request) {
try {
String taskId = generateTaskId();
totalTaskCount.incrementAndGet();

// 创建任务上下文
BatchTaskContext context = createTaskContext(taskId, request);
taskContexts.put(taskId, context);

// 记录任务开始
progressMonitor.recordTaskStart(context);

// 异步执行批量处理
CompletableFuture<BatchTaskResult<R>> future = asyncExecutor.executeAsync(() -> {
try {
BatchTaskResult<R> result = executeBatchProcessing(context);

// 更新任务状态
context.setStatus(TaskStatus.COMPLETED);
context.setEndTime(System.currentTimeMillis());
context.setResult(result);

// 记录任务完成
progressMonitor.recordTaskComplete(context);
completedTaskCount.incrementAndGet();

return result;

} catch (Exception e) {
// 更新任务状态
context.setStatus(TaskStatus.FAILED);
context.setEndTime(System.currentTimeMillis());
context.setException(e);

// 记录任务失败
progressMonitor.recordTaskFailed(context);
failedTaskCount.incrementAndGet();

throw new BatchTaskException("异步大批量任务执行失败: " + e.getMessage(), e);
}
});

return future;

} catch (Exception e) {
log.error("异步大批量任务执行失败", e);
failedTaskCount.incrementAndGet();
throw new BatchTaskException("异步大批量任务执行失败: " + e.getMessage());
}
}

/**
* 执行批量处理
*/
private <T, R> BatchTaskResult<R> executeBatchProcessing(BatchTaskContext context) {
try {
BatchTaskRequest<T> request = context.getRequest();

// 1. 数据分批
List<List<T>> batches = batchProcessor.splitData(request.getData(), request.getBatchSize());
context.setTotalBatches(batches.size());

// 2. 执行分批处理
List<BatchResult<R>> batchResults = new ArrayList<>();

for (int i = 0; i < batches.size(); i++) {
List<T> batch = batches.get(i);

try {
// 处理单个批次
BatchResult<R> batchResult = batchProcessor.processBatch(batch, request.getProcessor());

// 更新进度
context.setProcessedBatches(i + 1);
progressMonitor.updateProgress(context);

batchResults.add(batchResult);

} catch (Exception e) {
log.error("批次处理失败: {} - 批次: {}", context.getTaskId(), i, e);

// 创建失败的批次结果
BatchResult<R> failedResult = BatchResult.<R>builder()
.batchIndex(i)
.successCount(0)
.failureCount(batch.size())
.exception(e)
.build();

batchResults.add(failedResult);
}
}

// 3. 收集结果
BatchTaskResult<R> result = resultCollector.collectResults(batchResults);

return result;

} catch (Exception e) {
log.error("批量处理执行失败: {}", context.getTaskId(), e);
throw new BatchTaskException("批量处理执行失败: " + e.getMessage(), e);
}
}

/**
* 创建任务上下文
*/
private <T> BatchTaskContext createTaskContext(String taskId, BatchTaskRequest<T> request) {
return BatchTaskContext.builder()
.taskId(taskId)
.request(request)
.status(TaskStatus.RUNNING)
.startTime(System.currentTimeMillis())
.totalBatches(0)
.processedBatches(0)
.build();
}

/**
* 生成任务ID
*/
private String generateTaskId() {
return "BATCH_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8);
}

/**
* 获取任务状态
*/
public TaskStatus getTaskStatus(String taskId) {
BatchTaskContext context = taskContexts.get(taskId);
return context != null ? context.getStatus() : null;
}

/**
* 获取任务进度
*/
public TaskProgress getTaskProgress(String taskId) {
BatchTaskContext context = taskContexts.get(taskId);
if (context == null) {
return null;
}

return TaskProgress.builder()
.taskId(taskId)
.status(context.getStatus())
.totalBatches(context.getTotalBatches())
.processedBatches(context.getProcessedBatches())
.progressPercentage(context.getTotalBatches() > 0 ?
(double) context.getProcessedBatches() / context.getTotalBatches() * 100 : 0)
.startTime(context.getStartTime())
.endTime(context.getEndTime())
.build();
}

/**
* 取消任务
*/
public boolean cancelTask(String taskId) {
try {
BatchTaskContext context = taskContexts.get(taskId);
if (context != null && context.getStatus() == TaskStatus.RUNNING) {
context.setStatus(TaskStatus.CANCELLED);
context.setEndTime(System.currentTimeMillis());

// 记录任务取消
progressMonitor.recordTaskCancelled(context);

log.info("任务取消成功: {}", taskId);
return true;
}

return false;

} catch (Exception e) {
log.error("取消任务失败: {}", taskId, e);
return false;
}
}

/**
* 获取任务统计信息
*/
public BatchTaskStatistics getStatistics() {
try {
return BatchTaskStatistics.builder()
.totalTaskCount(totalTaskCount.get())
.completedTaskCount(completedTaskCount.get())
.failedTaskCount(failedTaskCount.get())
.runningTaskCount(taskContexts.values().stream()
.mapToLong(ctx -> ctx.getStatus() == TaskStatus.RUNNING ? 1 : 0)
.sum())
.cancelledTaskCount(taskContexts.values().stream()
.mapToLong(ctx -> ctx.getStatus() == TaskStatus.CANCELLED ? 1 : 0)
.sum())
.build();

} catch (Exception e) {
log.error("获取任务统计信息失败", e);
throw new BatchTaskException("获取任务统计信息失败: " + e.getMessage());
}
}

/**
* 清理已完成的任务
*/
public void cleanupCompletedTasks() {
try {
taskContexts.entrySet().removeIf(entry -> {
TaskStatus status = entry.getValue().getStatus();
return status == TaskStatus.COMPLETED || status == TaskStatus.FAILED || status == TaskStatus.CANCELLED;
});

log.info("清理已完成的任务完成");

} catch (Exception e) {
log.error("清理已完成的任务失败", e);
}
}
}
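
The manager above also relies on a few data carriers (BatchTaskContext, BatchTaskResult, TaskProgress, BatchTaskStatistics, and the BatchTaskException) whose definitions the article omits. Below is a minimal Lombok-based sketch of the two central ones, with field names inferred from the getters and builder calls used in the class; treat the exact shapes as assumptions.

import java.util.List;

import lombok.Builder;
import lombok.Data;

/** Mutable per-task state tracked by the manager and the progress monitor. */
@Data
@Builder
class BatchTaskContext {
    private String taskId;
    private BatchTaskRequest<?> request;
    private TaskStatus status;
    private long startTime;
    private long endTime;
    private int totalBatches;
    private int processedBatches;
    private Object result;
    private Exception exception;
}

/** Aggregated outcome of one batch task. */
@Data
@Builder
class BatchTaskResult<R> {
    private String taskId;
    private int totalCount;
    private int successCount;
    private int failureCount;
    private List<R> results;
    private List<Exception> exceptions;
}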

2.2 The Batch Processor

/**
* 分批处理器
* 负责将大批量数据分割成小批次进行处理
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class BatchProcessor {

private final DataSplitter dataSplitter;
private final ChunkProcessor chunkProcessor;
private final BatchValidator batchValidator;

private final AtomicLong totalBatchesProcessed = new AtomicLong(0);
private final AtomicLong totalItemsProcessed = new AtomicLong(0);
private final AtomicLong totalProcessingTime = new AtomicLong(0);

/**
* 分割数据
*/
public <T> List<List<T>> splitData(List<T> data, int batchSize) {
try {
if (data == null || data.isEmpty()) {
return new ArrayList<>();
}

if (batchSize <= 0) {
batchSize = 100; // 默认批次大小
}

List<List<T>> batches = dataSplitter.split(data, batchSize);

log.debug("数据分割完成: 总数量={}, 批次大小={}, 批次数量={}",
data.size(), batchSize, batches.size());

return batches;

} catch (Exception e) {
log.error("数据分割失败", e);
throw new BatchProcessorException("数据分割失败: " + e.getMessage());
}
}

/**
* 处理批次
*/
public <T, R> BatchResult<R> processBatch(List<T> batch, BatchItemProcessor<T, R> processor) {
try {
long startTime = System.currentTimeMillis();

// 验证批次
if (!batchValidator.validate(batch)) {
throw new BatchProcessorException("批次验证失败");
}

// 处理批次
List<R> results = new ArrayList<>();
List<Exception> exceptions = new ArrayList<>();
int successCount = 0;
int failureCount = 0;

for (int i = 0; i < batch.size(); i++) {
T item = batch.get(i);

try {
R result = processor.process(item);
results.add(result);
successCount++;

} catch (Exception e) {
log.error("处理单个项目失败: 批次索引={}, 项目索引={}", batch.hashCode(), i, e);
exceptions.add(e);
failureCount++;
}
}

long endTime = System.currentTimeMillis();
long processingTime = endTime - startTime;

// 更新统计信息
totalBatchesProcessed.incrementAndGet();
totalItemsProcessed.addAndGet(batch.size());
totalProcessingTime.addAndGet(processingTime);

BatchResult<R> result = BatchResult.<R>builder()
.batchIndex(batch.hashCode())
.successCount(successCount)
.failureCount(failureCount)
.results(results)
.exceptions(exceptions)
.processingTime(processingTime)
.build();

log.debug("批次处理完成: 批次大小={}, 成功数量={}, 失败数量={}, 处理时间={}ms",
batch.size(), successCount, failureCount, processingTime);

return result;

} catch (Exception e) {
log.error("批次处理失败", e);
throw new BatchProcessorException("批次处理失败: " + e.getMessage(), e);
}
}

/**
* 并行处理批次
*/
public <T, R> List<BatchResult<R>> processBatchesParallel(List<List<T>> batches,
BatchItemProcessor<T, R> processor, int parallelism) {
try {
if (batches == null || batches.isEmpty()) {
return new ArrayList<>();
}

if (parallelism <= 0) {
parallelism = Runtime.getRuntime().availableProcessors();
}

ExecutorService executor = Executors.newFixedThreadPool(parallelism);

try {
List<CompletableFuture<BatchResult<R>>> futures = batches.stream()
.map(batch -> CompletableFuture.supplyAsync(() ->
processBatch(batch, processor), executor))
.collect(Collectors.toList());

List<BatchResult<R>> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

log.info("并行批次处理完成: 批次数量={}, 并行度={}", batches.size(), parallelism);

return results;

} finally {
executor.shutdown();
}

} catch (Exception e) {
log.error("并行批次处理失败", e);
throw new BatchProcessorException("并行批次处理失败: " + e.getMessage(), e);
}
}

/**
* 流水线处理批次
*/
public <T, R> List<BatchResult<R>> processBatchesPipeline(List<List<T>> batches,
BatchItemProcessor<T, R> processor, int pipelineSize) {
try {
if (batches == null || batches.isEmpty()) {
return new ArrayList<>();
}

if (pipelineSize <= 0) {
pipelineSize = 3; // 默认流水线大小
}

List<BatchResult<R>> results = new ArrayList<>();
BlockingQueue<List<T>> inputQueue = new LinkedBlockingQueue<>(pipelineSize);
BlockingQueue<BatchResult<R>> outputQueue = new LinkedBlockingQueue<>(pipelineSize);

// 启动生产者线程
Thread producer = new Thread(() -> {
try {
for (List<T> batch : batches) {
inputQueue.put(batch);
}
inputQueue.put(new ArrayList<>()); // 结束标记
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// 启动消费者线程
Thread consumer = new Thread(() -> {
try {
List<T> batch;
while ((batch = inputQueue.take()) != null && !batch.isEmpty()) {
BatchResult<R> result = processBatch(batch, processor);
outputQueue.put(result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

producer.start();
consumer.start();

// 收集结果:结果数量与批次数量相同,按数量收集,避免take()在空队列上永久阻塞
for (int i = 0; i < batches.size(); i++) {
results.add(outputQueue.take());
}

producer.join();
consumer.join();

log.info("流水线批次处理完成: 批次数量={}, 流水线大小={}", batches.size(), pipelineSize);

return results;

} catch (Exception e) {
log.error("流水线批次处理失败", e);
throw new BatchProcessorException("流水线批次处理失败: " + e.getMessage(), e);
}
}

/**
* 获取处理统计信息
*/
public BatchProcessingStatistics getStatistics() {
try {
long totalBatches = totalBatchesProcessed.get();
long totalItems = totalItemsProcessed.get();
long totalTime = totalProcessingTime.get();

return BatchProcessingStatistics.builder()
.totalBatchesProcessed(totalBatches)
.totalItemsProcessed(totalItems)
.totalProcessingTime(totalTime)
.averageBatchSize(totalBatches > 0 ? (double) totalItems / totalBatches : 0)
.averageProcessingTime(totalBatches > 0 ? (double) totalTime / totalBatches : 0)
.build();

} catch (Exception e) {
log.error("获取处理统计信息失败", e);
throw new BatchProcessorException("获取处理统计信息失败: " + e.getMessage());
}
}
}
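
splitData delegates the actual partitioning to a DataSplitter collaborator that is not shown in the article. Here is a minimal sketch of such a splitter, assuming a simple subList-based partition; the class name and the split(data, batchSize) signature come from the usage above, while the implementation itself is illustrative.

import java.util.ArrayList;
import java.util.List;

import org.springframework.stereotype.Component;

/** Partitions a list into consecutive chunks of at most batchSize elements. */
@Component
public class DataSplitter {

    public <T> List<List<T>> split(List<T> data, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < data.size(); start += batchSize) {
            int end = Math.min(start + batchSize, data.size());
            // Copy the view so later changes to the source list cannot affect the batch
            batches.add(new ArrayList<>(data.subList(start, end)));
        }
        return batches;
    }
}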

2.3 The Asynchronous Executor

/**
* 异步执行器
* 负责异步执行大批量任务
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class AsyncExecutor {

private final ThreadPoolManager threadPoolManager;
private final TaskScheduler taskScheduler;
private final RetryManager retryManager;

private final Map<String, CompletableFuture<?>> runningTasks = new ConcurrentHashMap<>();
private final AtomicLong totalAsyncTasks = new AtomicLong(0);
private final AtomicLong completedAsyncTasks = new AtomicLong(0);
private final AtomicLong failedAsyncTasks = new AtomicLong(0);

/**
* 异步执行任务
*/
public <T> CompletableFuture<T> executeAsync(Callable<T> task) {
try {
totalAsyncTasks.incrementAndGet();

CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
completedAsyncTasks.incrementAndGet();
return result;
} catch (Exception e) {
failedAsyncTasks.incrementAndGet();
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
}
}, threadPoolManager.getDefaultExecutor());

return future;

} catch (Exception e) {
log.error("创建异步任务失败", e);
throw new AsyncExecutorException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 异步执行任务(带超时)
*/
public <T> CompletableFuture<T> executeAsyncWithTimeout(Callable<T> task, long timeout, TimeUnit timeUnit) {
try {
totalAsyncTasks.incrementAndGet();

CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
completedAsyncTasks.incrementAndGet();
return result;
} catch (Exception e) {
failedAsyncTasks.incrementAndGet();
throw new RuntimeException("异步任务执行失败: " + e.getMessage(), e);
}
}, threadPoolManager.getDefaultExecutor());

// 设置超时
future.orTimeout(timeout, timeUnit);

return future;

} catch (Exception e) {
log.error("创建异步任务失败", e);
throw new AsyncExecutorException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 异步执行任务(带重试)
*/
public <T> CompletableFuture<T> executeAsyncWithRetry(Callable<T> task, int maxRetries) {
try {
totalAsyncTasks.incrementAndGet();

CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
return retryManager.executeWithRetry(task, maxRetries);
}, threadPoolManager.getDefaultExecutor());

future.whenComplete((result, throwable) -> {
if (throwable == null) {
completedAsyncTasks.incrementAndGet();
} else {
failedAsyncTasks.incrementAndGet();
}
});

return future;

} catch (Exception e) {
log.error("创建异步任务失败", e);
throw new AsyncExecutorException("创建异步任务失败: " + e.getMessage());
}
}

/**
* 批量异步执行任务
*/
public <T> CompletableFuture<List<T>> executeBatchAsync(List<Callable<T>> tasks) {
try {
totalAsyncTasks.addAndGet(tasks.size());

List<CompletableFuture<T>> futures = tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> {
try {
T result = task.call();
completedAsyncTasks.incrementAndGet();
return result;
} catch (Exception e) {
failedAsyncTasks.incrementAndGet();
throw new RuntimeException("批量异步任务执行失败: " + e.getMessage(), e);
}
}, threadPoolManager.getDefaultExecutor()))
.collect(Collectors.toList());

CompletableFuture<List<T>> future = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));

return future;

} catch (Exception e) {
log.error("创建批量异步任务失败", e);
throw new AsyncExecutorException("创建批量异步任务失败: " + e.getMessage());
}
}

/**
* 延迟执行任务
*/
public <T> CompletableFuture<T> executeDelayed(Callable<T> task, long delay, TimeUnit timeUnit) {
try {
totalAsyncTasks.incrementAndGet();

CompletableFuture<T> future = new CompletableFuture<>();

taskScheduler.schedule(() -> {
try {
T result = task.call();
future.complete(result);
completedAsyncTasks.incrementAndGet();
} catch (Exception e) {
future.completeExceptionally(e);
failedAsyncTasks.incrementAndGet();
}
}, delay, timeUnit);

return future;

} catch (Exception e) {
log.error("创建延迟任务失败", e);
throw new AsyncExecutorException("创建延迟任务失败: " + e.getMessage());
}
}

/**
* 周期性执行任务
*/
public ScheduledFuture<?> executePeriodic(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
try {
return taskScheduler.scheduleAtFixedRate(() -> {
try {
task.run();
completedAsyncTasks.incrementAndGet();
} catch (Exception e) {
log.error("周期性任务执行失败", e);
failedAsyncTasks.incrementAndGet();
}
}, initialDelay, period, timeUnit);

} catch (Exception e) {
log.error("创建周期性任务失败", e);
throw new AsyncExecutorException("创建周期性任务失败: " + e.getMessage());
}
}

/**
* 获取异步执行统计信息
*/
public AsyncExecutionStatistics getStatistics() {
try {
long total = totalAsyncTasks.get();
long completed = completedAsyncTasks.get();
long failed = failedAsyncTasks.get();
double successRate = total > 0 ? (double) completed / total : 0;

return AsyncExecutionStatistics.builder()
.totalAsyncTasks(total)
.completedAsyncTasks(completed)
.failedAsyncTasks(failed)
.successRate(successRate)
.runningTasksCount(runningTasks.size())
.build();

} catch (Exception e) {
log.error("获取异步执行统计信息失败", e);
throw new AsyncExecutorException("获取异步执行统计信息失败: " + e.getMessage());
}
}

/**
* 取消所有运行中的任务
*/
public void cancelAllRunningTasks() {
try {
for (Map.Entry<String, CompletableFuture<?>> entry : runningTasks.entrySet()) {
String taskId = entry.getKey();
CompletableFuture<?> future = entry.getValue();

if (!future.isDone()) {
future.cancel(true);
log.info("取消任务: {}", taskId);
}
}

runningTasks.clear();

} catch (Exception e) {
log.error("取消所有运行中的任务失败", e);
}
}
}
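
The executor obtains its worker pool from a ThreadPoolManager and its retry logic from a RetryManager, neither of which is listed in the article. The sketch below shows one plausible implementation of each, assuming a bounded ThreadPoolExecutor with caller-runs back-pressure and a simple fixed-attempt retry loop; the pool sizes and queue capacity are illustrative values, not settings from the original.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
public class ThreadPoolManager {

    // Bounded pool: batch workloads should queue and apply back-pressure instead of growing without limit
    private final ThreadPoolExecutor defaultExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public Executor getDefaultExecutor() {
        return defaultExecutor;
    }
}

/** Attempts the task up to maxRetries times and rethrows the last failure. */
@Component
@Slf4j
class RetryManager {

    public <T> T executeWithRetry(Callable<T> task, int maxRetries) {
        int attempts = Math.max(1, maxRetries);
        RuntimeException last = null;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = new RuntimeException("attempt " + attempt + "/" + attempts + " failed: " + e.getMessage(), e);
                log.warn("task attempt {}/{} failed", attempt, attempts, e);
            }
        }
        throw last;
    }
}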

3. Progress Monitoring and Result Collection

3.1 The Progress Monitor

/**
* 进度监控器
* 负责监控大批量任务的执行进度
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class ProgressMonitor {

private final Map<String, TaskProgress> taskProgressMap = new ConcurrentHashMap<>();
private final Map<String, TaskMetrics> taskMetricsMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
private final AlertManager alertManager;

/**
* 记录任务开始
*/
public void recordTaskStart(BatchTaskContext context) {
try {
TaskProgress progress = TaskProgress.builder()
.taskId(context.getTaskId())
.status(TaskStatus.RUNNING)
.totalBatches(context.getTotalBatches())
.processedBatches(context.getProcessedBatches())
.progressPercentage(0.0)
.startTime(context.getStartTime())
.build();

taskProgressMap.put(context.getTaskId(), progress);

TaskMetrics metrics = TaskMetrics.builder()
.taskId(context.getTaskId())
.startTime(context.getStartTime())
.lastUpdateTime(System.currentTimeMillis())
.build();

taskMetricsMap.put(context.getTaskId(), metrics);

log.info("任务开始记录: {} - 总批次: {}", context.getTaskId(), context.getTotalBatches());

} catch (Exception e) {
log.error("记录任务开始失败: {}", context.getTaskId(), e);
}
}

/**
* 更新任务进度
*/
public void updateProgress(BatchTaskContext context) {
try {
TaskProgress progress = taskProgressMap.get(context.getTaskId());
if (progress != null) {
progress.setProcessedBatches(context.getProcessedBatches());
progress.setProgressPercentage(context.getTotalBatches() > 0 ?
(double) context.getProcessedBatches() / context.getTotalBatches() * 100 : 0);
progress.setLastUpdateTime(System.currentTimeMillis());

// 更新指标
TaskMetrics metrics = taskMetricsMap.get(context.getTaskId());
if (metrics != null) {
metrics.setLastUpdateTime(System.currentTimeMillis());
metrics.setProcessedBatches(context.getProcessedBatches());
}

log.debug("任务进度更新: {} - 进度: {}%", context.getTaskId(), progress.getProgressPercentage());
}

} catch (Exception e) {
log.error("更新任务进度失败: {}", context.getTaskId(), e);
}
}

/**
* 记录任务完成
*/
public void recordTaskComplete(BatchTaskContext context) {
try {
TaskProgress progress = taskProgressMap.get(context.getTaskId());
if (progress != null) {
progress.setStatus(TaskStatus.COMPLETED);
progress.setEndTime(context.getEndTime());
progress.setProgressPercentage(100.0);
progress.setLastUpdateTime(System.currentTimeMillis());
}

TaskMetrics metrics = taskMetricsMap.get(context.getTaskId());
if (metrics != null) {
metrics.setEndTime(context.getEndTime());
metrics.setDuration(context.getEndTime() - context.getStartTime());
metrics.setLastUpdateTime(System.currentTimeMillis());
}

log.info("任务完成记录: {} - 执行时间: {}ms",
context.getTaskId(), context.getEndTime() - context.getStartTime());

} catch (Exception e) {
log.error("记录任务完成失败: {}", context.getTaskId(), e);
}
}

/**
* 记录任务失败
*/
public void recordTaskFailed(BatchTaskContext context) {
try {
TaskProgress progress = taskProgressMap.get(context.getTaskId());
if (progress != null) {
progress.setStatus(TaskStatus.FAILED);
progress.setEndTime(context.getEndTime());
progress.setLastUpdateTime(System.currentTimeMillis());
}

TaskMetrics metrics = taskMetricsMap.get(context.getTaskId());
if (metrics != null) {
metrics.setEndTime(context.getEndTime());
metrics.setDuration(context.getEndTime() - context.getStartTime());
metrics.setLastUpdateTime(System.currentTimeMillis());
}

// 发送告警
alertManager.sendAlert("任务执行失败",
"任务ID: " + context.getTaskId() + ", 异常: " + context.getException().getMessage());

log.error("任务失败记录: {} - 异常: {}", context.getTaskId(), context.getException().getMessage());

} catch (Exception e) {
log.error("记录任务失败失败: {}", context.getTaskId(), e);
}
}

/**
* 记录任务取消
*/
public void recordTaskCancelled(BatchTaskContext context) {
try {
TaskProgress progress = taskProgressMap.get(context.getTaskId());
if (progress != null) {
progress.setStatus(TaskStatus.CANCELLED);
progress.setEndTime(context.getEndTime());
progress.setLastUpdateTime(System.currentTimeMillis());
}

TaskMetrics metrics = taskMetricsMap.get(context.getTaskId());
if (metrics != null) {
metrics.setEndTime(context.getEndTime());
metrics.setDuration(context.getEndTime() - context.getStartTime());
metrics.setLastUpdateTime(System.currentTimeMillis());
}

log.info("任务取消记录: {}", context.getTaskId());

} catch (Exception e) {
log.error("记录任务取消失败: {}", context.getTaskId(), e);
}
}

/**
* 获取任务进度
*/
public TaskProgress getTaskProgress(String taskId) {
return taskProgressMap.get(taskId);
}

/**
* 获取所有任务进度
*/
public Map<String, TaskProgress> getAllTaskProgress() {
return new HashMap<>(taskProgressMap);
}

/**
* 获取任务指标
*/
public TaskMetrics getTaskMetrics(String taskId) {
return taskMetricsMap.get(taskId);
}

/**
* 获取所有任务指标
*/
public Map<String, TaskMetrics> getAllTaskMetrics() {
return new HashMap<>(taskMetricsMap);
}

/**
* 启动进度监控
*/
@PostConstruct
public void startMonitoring() {
scheduler.scheduleAtFixedRate(() -> {
try {
monitorTaskProgress();
} catch (Exception e) {
log.error("进度监控失败", e);
}
}, 0, 30, TimeUnit.SECONDS);
}

/**
* 监控任务进度
*/
private void monitorTaskProgress() {
try {
for (Map.Entry<String, TaskProgress> entry : taskProgressMap.entrySet()) {
String taskId = entry.getKey();
TaskProgress progress = entry.getValue();

// 检查长时间运行的任务
if (progress.getStatus() == TaskStatus.RUNNING) {
long runningTime = System.currentTimeMillis() - progress.getStartTime();
if (runningTime > 3600000) { // 1小时
alertManager.sendAlert("任务运行时间过长",
"任务ID: " + taskId + ", 运行时间: " + runningTime + "ms");
}
}

// 检查进度停滞的任务
long lastUpdateTime = progress.getLastUpdateTime();
if (lastUpdateTime > 0) {
long timeSinceLastUpdate = System.currentTimeMillis() - lastUpdateTime;
if (timeSinceLastUpdate > 300000) { // 5分钟
alertManager.sendAlert("任务进度停滞",
"任务ID: " + taskId + ", 停滞时间: " + timeSinceLastUpdate + "ms");
}
}
}

} catch (Exception e) {
log.error("监控任务进度失败", e);
}
}

/**
* 清理进度监控器
*/
@PreDestroy
public void cleanup() {
try {
scheduler.shutdown();
taskProgressMap.clear();
taskMetricsMap.clear();

log.info("进度监控器清理完成");

} catch (Exception e) {
log.error("进度监控器清理失败", e);
}
}
}
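
Alerting goes through an AlertManager collaborator whose implementation is not part of the article; in a real system it might post to a webhook, send e-mail, or page an on-call rotation. Here is a log-based placeholder that only assumes the sendAlert(title, message) signature used above.

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class AlertManager {

    /** Placeholder alert channel: logs at WARN level; replace with e-mail/webhook/IM integration as needed. */
    public void sendAlert(String title, String message) {
        log.warn("[ALERT] {} - {}", title, message);
    }
}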

3.2 The Result Collector

/**
* 结果收集器
* 负责收集和聚合大批量任务的处理结果
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class ResultCollector {

private final ResultAggregator resultAggregator;
private final ResultStorage resultStorage;
private final ResultAnalyzer resultAnalyzer;

private final AtomicLong totalResultsCollected = new AtomicLong(0);
private final AtomicLong totalSuccessResults = new AtomicLong(0);
private final AtomicLong totalFailureResults = new AtomicLong(0);

/**
* 收集结果
*/
public <R> BatchTaskResult<R> collectResults(List<BatchResult<R>> batchResults) {
try {
if (batchResults == null || batchResults.isEmpty()) {
return BatchTaskResult.<R>builder()
.totalCount(0)
.successCount(0)
.failureCount(0)
.results(new ArrayList<>())
.exceptions(new ArrayList<>())
.build();
}

// 聚合结果
BatchTaskResult<R> aggregatedResult = resultAggregator.aggregate(batchResults);

// 存储结果
resultStorage.storeResult(aggregatedResult);

// 分析结果
resultAnalyzer.analyzeResult(aggregatedResult);

// 更新统计信息
totalResultsCollected.incrementAndGet();
totalSuccessResults.addAndGet(aggregatedResult.getSuccessCount());
totalFailureResults.addAndGet(aggregatedResult.getFailureCount());

log.info("结果收集完成: 总数量={}, 成功数量={}, 失败数量={}",
aggregatedResult.getTotalCount(),
aggregatedResult.getSuccessCount(),
aggregatedResult.getFailureCount());

return aggregatedResult;

} catch (Exception e) {
log.error("结果收集失败", e);
throw new ResultCollectionException("结果收集失败: " + e.getMessage());
}
}

/**
* 收集部分结果
*/
public <R> PartialResult<R> collectPartialResults(List<BatchResult<R>> batchResults, int maxResults) {
try {
if (batchResults == null || batchResults.isEmpty()) {
return PartialResult.<R>builder()
.totalCount(0)
.successCount(0)
.failureCount(0)
.results(new ArrayList<>())
.exceptions(new ArrayList<>())
.hasMore(false)
.build();
}

// 聚合结果
BatchTaskResult<R> aggregatedResult = resultAggregator.aggregate(batchResults);

// 限制结果数量
List<R> limitedResults = aggregatedResult.getResults().stream()
.limit(maxResults)
.collect(Collectors.toList());

List<Exception> limitedExceptions = aggregatedResult.getExceptions().stream()
.limit(maxResults)
.collect(Collectors.toList());

boolean hasMore = aggregatedResult.getResults().size() > maxResults ||
aggregatedResult.getExceptions().size() > maxResults;

PartialResult<R> partialResult = PartialResult.<R>builder()
.totalCount(aggregatedResult.getTotalCount())
.successCount(aggregatedResult.getSuccessCount())
.failureCount(aggregatedResult.getFailureCount())
.results(limitedResults)
.exceptions(limitedExceptions)
.hasMore(hasMore)
.build();

log.info("部分结果收集完成: 总数量={}, 返回数量={}, 还有更多={}",
partialResult.getTotalCount(),
partialResult.getResults().size(),
partialResult.isHasMore());

return partialResult;

} catch (Exception e) {
log.error("部分结果收集失败", e);
throw new ResultCollectionException("部分结果收集失败: " + e.getMessage());
}
}

/**
* 收集统计结果
*/
public <R> StatisticsResult collectStatistics(List<BatchResult<R>> batchResults) {
try {
if (batchResults == null || batchResults.isEmpty()) {
return StatisticsResult.builder()
.totalCount(0)
.successCount(0)
.failureCount(0)
.successRate(0.0)
.averageProcessingTime(0.0)
.totalProcessingTime(0)
.build();
}

// 计算统计信息
int totalCount = batchResults.stream()
.mapToInt(result -> result.getSuccessCount() + result.getFailureCount())
.sum();

int successCount = batchResults.stream()
.mapToInt(BatchResult::getSuccessCount)
.sum();

int failureCount = batchResults.stream()
.mapToInt(BatchResult::getFailureCount)
.sum();

double successRate = totalCount > 0 ? (double) successCount / totalCount : 0;

long totalProcessingTime = batchResults.stream()
.mapToLong(BatchResult::getProcessingTime)
.sum();

double averageProcessingTime = batchResults.size() > 0 ?
(double) totalProcessingTime / batchResults.size() : 0;

StatisticsResult statisticsResult = StatisticsResult.builder()
.totalCount(totalCount)
.successCount(successCount)
.failureCount(failureCount)
.successRate(successRate)
.averageProcessingTime(averageProcessingTime)
.totalProcessingTime(totalProcessingTime)
.build();

log.info("统计结果收集完成: 总数量={}, 成功率={}%, 平均处理时间={}ms",
totalCount, successRate * 100, averageProcessingTime);

return statisticsResult;

} catch (Exception e) {
log.error("统计结果收集失败", e);
throw new ResultCollectionException("统计结果收集失败: " + e.getMessage());
}
}

/**
* 收集异常结果
*/
public ExceptionResult collectExceptions(List<BatchResult<?>> batchResults) {
try {
if (batchResults == null || batchResults.isEmpty()) {
return ExceptionResult.builder()
.totalExceptions(0)
.exceptionTypes(new HashMap<>())
.exceptions(new ArrayList<>())
.build();
}

// 收集所有异常
List<Exception> allExceptions = batchResults.stream()
.flatMap(result -> result.getExceptions().stream())
.collect(Collectors.toList());

// 统计异常类型
Map<String, Integer> exceptionTypes = allExceptions.stream()
.collect(Collectors.groupingBy(
exception -> exception.getClass().getSimpleName(),
Collectors.collectingAndThen(Collectors.counting(), Math::toIntExact)
));

ExceptionResult exceptionResult = ExceptionResult.builder()
.totalExceptions(allExceptions.size())
.exceptionTypes(exceptionTypes)
.exceptions(allExceptions)
.build();

log.info("异常结果收集完成: 总异常数={}, 异常类型数={}",
allExceptions.size(), exceptionTypes.size());

return exceptionResult;

} catch (Exception e) {
log.error("异常结果收集失败", e);
throw new ResultCollectionException("异常结果收集失败: " + e.getMessage());
}
}

/**
* 获取结果收集统计信息
*/
public ResultCollectionStatistics getStatistics() {
try {
long total = totalResultsCollected.get();
long success = totalSuccessResults.get();
long failure = totalFailureResults.get();
double successRate = (success + failure) > 0 ? (double) success / (success + failure) : 0;

return ResultCollectionStatistics.builder()
.totalResultsCollected(total)
.totalSuccessResults(success)
.totalFailureResults(failure)
.successRate(successRate)
.build();

} catch (Exception e) {
log.error("获取结果收集统计信息失败", e);
throw new ResultCollectionException("获取结果收集统计信息失败: " + e.getMessage());
}
}
}
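
collectResults hands the per-batch results to a ResultAggregator that merges them into a single BatchTaskResult; its implementation is not shown. Below is a minimal sketch that sums the counters and concatenates results and exceptions, assuming the BatchTaskResult builder fields used earlier in this section.

import java.util.ArrayList;
import java.util.List;

import org.springframework.stereotype.Component;

@Component
public class ResultAggregator {

    public <R> BatchTaskResult<R> aggregate(List<BatchResult<R>> batchResults) {
        int successCount = 0;
        int failureCount = 0;
        List<R> results = new ArrayList<>();
        List<Exception> exceptions = new ArrayList<>();

        for (BatchResult<R> batch : batchResults) {
            successCount += batch.getSuccessCount();
            failureCount += batch.getFailureCount();
            if (batch.getResults() != null) {
                results.addAll(batch.getResults());
            }
            if (batch.getExceptions() != null) {
                exceptions.addAll(batch.getExceptions());
            }
        }

        return BatchTaskResult.<R>builder()
                .totalCount(successCount + failureCount)
                .successCount(successCount)
                .failureCount(failureCount)
                .results(results)
                .exceptions(exceptions)
                .build();
    }
}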

4. Practical Application and Best Practices

4.1 A Hands-On Batch Task Processing Example

/**
* 大批量任务处理实战示例
*/
@RestController
@RequestMapping("/api/batch-task")
@Slf4j
@RequiredArgsConstructor
public class BatchTaskExample {

private final BatchTaskManager batchTaskManager;
private final ProgressMonitor progressMonitor;

/**
* 演示大批量数据处理
*/
@PostMapping("/demonstrate-data-processing")
public ResponseEntity<BatchTaskDemoResult> demonstrateDataProcessing(@RequestBody DataProcessingRequest request) {
try {
// 1. 准备大批量数据
List<String> data = generateLargeData(request.getDataSize());

// 2. 创建批量任务请求
BatchTaskRequest<String> batchRequest = BatchTaskRequest.<String>builder()
.data(data)
.batchSize(request.getBatchSize())
.processor(new DataProcessor())
.build();

// 3. 执行大批量任务
BatchTaskResult<String> result = batchTaskManager.executeBatchTask(batchRequest);

BatchTaskDemoResult demoResult = BatchTaskDemoResult.builder()
.taskId(result.getTaskId())
.totalCount(result.getTotalCount())
.successCount(result.getSuccessCount())
.failureCount(result.getFailureCount())
.success(true)
.build();

return ResponseEntity.ok(demoResult);

} catch (Exception e) {
log.error("大批量数据处理演示失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 演示异步大批量任务
*/
@PostMapping("/demonstrate-async-processing")
public ResponseEntity<AsyncBatchTaskDemoResult> demonstrateAsyncProcessing(@RequestBody DataProcessingRequest request) {
try {
// 1. 准备大批量数据
List<String> data = generateLargeData(request.getDataSize());

// 2. 创建批量任务请求
BatchTaskRequest<String> batchRequest = BatchTaskRequest.<String>builder()
.data(data)
.batchSize(request.getBatchSize())
.processor(new DataProcessor())
.build();

// 3. 异步执行大批量任务
CompletableFuture<BatchTaskResult<String>> future = batchTaskManager.executeBatchTaskAsync(batchRequest);

AsyncBatchTaskDemoResult demoResult = AsyncBatchTaskDemoResult.builder()
.future(future)
.success(true)
.build();

return ResponseEntity.ok(demoResult);

} catch (Exception e) {
log.error("异步大批量任务演示失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 获取任务进度
*/
@GetMapping("/progress/{taskId}")
public ResponseEntity<TaskProgress> getTaskProgress(@PathVariable String taskId) {
try {
TaskProgress progress = progressMonitor.getTaskProgress(taskId);
if (progress != null) {
return ResponseEntity.ok(progress);
} else {
return ResponseEntity.notFound().build();
}
} catch (Exception e) {
log.error("获取任务进度失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 取消任务
*/
@PostMapping("/cancel/{taskId}")
public ResponseEntity<Void> cancelTask(@PathVariable String taskId) {
try {
boolean cancelled = batchTaskManager.cancelTask(taskId);
if (cancelled) {
return ResponseEntity.ok().build();
} else {
return ResponseEntity.notFound().build();
}
} catch (Exception e) {
log.error("取消任务失败", e);
return ResponseEntity.badRequest().build();
}
}

/**
* 生成大批量数据
*/
private List<String> generateLargeData(int size) {
List<String> data = new ArrayList<>();
for (int i = 0; i < size; i++) {
data.add("Data_" + i);
}
return data;
}

/**
* 数据处理器
*/
private static class DataProcessor implements BatchItemProcessor<String, String> {
@Override
public String process(String item) {
try {
// 模拟数据处理
Thread.sleep(10);
return "Processed_" + item;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("数据处理失败", e);
}
}
}
}
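
The demo endpoints bind their request body to a DataProcessingRequest whose shape is only implied by the getDataSize() and getBatchSize() calls above. A minimal sketch of that DTO, assumed for completeness:

import lombok.Data;

/** Request body for the demo endpoints: how many items to generate and how large each batch should be. */
@Data
public class DataProcessingRequest {
    private int dataSize;
    private int batchSize;
}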

4.2 Batch Task Processing Best Practices

/**
* 大批量任务处理最佳实践
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class BatchTaskBestPractices {

private final BatchTaskManager batchTaskManager;
private final BatchProcessor batchProcessor;
private final AsyncExecutor asyncExecutor;

/**
* 大批量任务处理最佳实践指南
*/
public void demonstrateBestPractices() {
log.info("=== 大批量任务处理最佳实践指南 ===");

// 1. 分批处理最佳实践
demonstrateBatchProcessingBestPractices();

// 2. 异步执行最佳实践
demonstrateAsyncExecutionBestPractices();

// 3. 进度监控最佳实践
demonstrateProgressMonitoringBestPractices();

// 4. 错误处理最佳实践
demonstrateErrorHandlingBestPractices();

// 5. 性能优化最佳实践
demonstratePerformanceOptimizationBestPractices();
}

/**
* 分批处理最佳实践
*/
private void demonstrateBatchProcessingBestPractices() {
log.info("--- 分批处理最佳实践 ---");

try {
// 1. 合理设置批次大小
List<String> data = generateTestData(1000);
int optimalBatchSize = calculateOptimalBatchSize(data.size());

List<List<String>> batches = batchProcessor.splitData(data, optimalBatchSize);
log.info("分批处理: 总数量={}, 批次大小={}, 批次数量={}",
data.size(), optimalBatchSize, batches.size());

// 2. 并行处理批次
List<BatchResult<String>> results = batchProcessor.processBatchesParallel(
batches, new TestProcessor(), 4);

log.info("并行批次处理完成: 批次数量={}", results.size());

// 3. 流水线处理批次
List<BatchResult<String>> pipelineResults = batchProcessor.processBatchesPipeline(
batches, new TestProcessor(), 3);

log.info("流水线批次处理完成: 批次数量={}", pipelineResults.size());

log.info("分批处理最佳实践示例完成");

} catch (Exception e) {
log.error("分批处理最佳实践示例失败", e);
}
}

/**
* 异步执行最佳实践
*/
private void demonstrateAsyncExecutionBestPractices() {
log.info("--- 异步执行最佳实践 ---");

try {
// 1. 异步执行任务
CompletableFuture<String> future = asyncExecutor.executeAsync(() -> {
// 模拟耗时操作
Thread.sleep(1000);
return "异步任务完成";
});

// 2. 异步执行任务(带超时)
CompletableFuture<String> timeoutFuture = asyncExecutor.executeAsyncWithTimeout(() -> {
// 模拟耗时操作
Thread.sleep(500);
return "超时任务完成";
}, 1, TimeUnit.SECONDS);

// 3. 异步执行任务(带重试)
CompletableFuture<String> retryFuture = asyncExecutor.executeAsyncWithRetry(() -> {
// 模拟可能失败的操作
if (Math.random() < 0.5) {
throw new RuntimeException("随机失败");
}
return "重试任务完成";
}, 3);

// 4. 批量异步执行任务
List<Callable<String>> tasks = Arrays.asList(
() -> "任务1",
() -> "任务2",
() -> "任务3"
);

CompletableFuture<List<String>> batchFuture = asyncExecutor.executeBatchAsync(tasks);

// 5. 等待所有任务完成
CompletableFuture.allOf(future, timeoutFuture, retryFuture, batchFuture).join();

log.info("异步执行最佳实践示例完成");

} catch (Exception e) {
log.error("异步执行最佳实践示例失败", e);
}
}

/**
* 进度监控最佳实践
*/
private void demonstrateProgressMonitoringBestPractices() {
log.info("--- 进度监控最佳实践 ---");

try {
// 1. 创建大批量任务
List<String> data = generateTestData(100);
BatchTaskRequest<String> request = BatchTaskRequest.<String>builder()
.data(data)
.batchSize(10)
.processor(new TestProcessor())
.build();

// 2. 异步执行任务
CompletableFuture<BatchTaskResult<String>> future = batchTaskManager.executeBatchTaskAsync(request);

// 3. 监控任务进度
// 注意:future.get()会阻塞到任务结束,因此不能先通过它拿到taskId再轮询;
// 这里先轮询future的完成状态,任务结束后再读取最终进度
while (!future.isDone()) {
Thread.sleep(1000);
log.info("任务仍在执行中...");
}

TaskProgress progress = batchTaskManager.getTaskProgress(future.get().getTaskId());
if (progress != null) {
log.info("任务进度: {}% - 状态: {}",
progress.getProgressPercentage(), progress.getStatus());
}

log.info("进度监控最佳实践示例完成");

} catch (Exception e) {
log.error("进度监控最佳实践示例失败", e);
}
}

/**
* 错误处理最佳实践
*/
private void demonstrateErrorHandlingBestPractices() {
log.info("--- 错误处理最佳实践 ---");

try {
// 1. 创建可能失败的任务
List<String> data = generateTestData(50);
BatchTaskRequest<String> request = BatchTaskRequest.<String>builder()
.data(data)
.batchSize(10)
.processor(new FailingProcessor())
.build();

// 2. 执行任务并处理异常
try {
BatchTaskResult<String> result = batchTaskManager.executeBatchTask(request);
log.info("任务执行结果: 成功={}, 失败={}",
result.getSuccessCount(), result.getFailureCount());
} catch (Exception e) {
log.error("任务执行失败", e);
}

// 3. 使用重试机制
CompletableFuture<String> retryFuture = asyncExecutor.executeAsyncWithRetry(() -> {
if (Math.random() < 0.7) {
throw new RuntimeException("模拟失败");
}
return "重试成功";
}, 5);

try {
String result = retryFuture.get();
log.info("重试任务结果: {}", result);
} catch (Exception e) {
log.error("重试任务失败", e);
}

log.info("错误处理最佳实践示例完成");

} catch (Exception e) {
log.error("错误处理最佳实践示例失败", e);
}
}

/**
* 性能优化最佳实践
*/
private void demonstratePerformanceOptimizationBestPractices() {
log.info("--- 性能优化最佳实践 ---");

try {
// 1. 使用合适的批次大小
List<String> data = generateTestData(1000);
int optimalBatchSize = calculateOptimalBatchSize(data.size());

// 2. 使用并行处理
List<List<String>> batches = batchProcessor.splitData(data, optimalBatchSize);
List<BatchResult<String>> results = batchProcessor.processBatchesParallel(
batches, new TestProcessor(), Runtime.getRuntime().availableProcessors());

// 3. 使用异步执行
CompletableFuture<BatchTaskResult<String>> future = batchTaskManager.executeBatchTaskAsync(
BatchTaskRequest.<String>builder()
.data(data)
.batchSize(optimalBatchSize)
.processor(new TestProcessor())
.build());

// 4. 监控性能指标
BatchProcessingStatistics statistics = batchProcessor.getStatistics();
log.info("处理统计: 总批次={}, 总项目={}, 平均处理时间={}ms",
statistics.getTotalBatchesProcessed(),
statistics.getTotalItemsProcessed(),
statistics.getAverageProcessingTime());

log.info("性能优化最佳实践示例完成");

} catch (Exception e) {
log.error("性能优化最佳实践示例失败", e);
}
}

/**
* 生成测试数据
*/
private List<String> generateTestData(int size) {
List<String> data = new ArrayList<>();
for (int i = 0; i < size; i++) {
data.add("TestData_" + i);
}
return data;
}

/**
* 计算最优批次大小
*/
private int calculateOptimalBatchSize(int totalSize) {
// 根据数据量计算最优批次大小
if (totalSize < 100) {
return 10;
} else if (totalSize < 1000) {
return 50;
} else if (totalSize < 10000) {
return 100;
} else {
return 200;
}
}

/**
* 测试处理器
*/
private static class TestProcessor implements BatchItemProcessor<String, String> {
@Override
public String process(String item) {
try {
Thread.sleep(10);
return "Processed_" + item;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("处理失败", e);
}
}
}

/**
* 失败处理器
*/
private static class FailingProcessor implements BatchItemProcessor<String, String> {
@Override
public String process(String item) {
if (Math.random() < 0.3) {
throw new RuntimeException("随机失败: " + item);
}
return "Processed_" + item;
}
}
}

5. Summary

This article has explored the architecture and practical application of large-scale batch task processing in Spring Boot. By building an enterprise-grade batch task framework, we achieved:

  1. Chunked processing: large data sets are split into small batches, avoiding out-of-memory errors and timeouts
  2. Asynchronous execution: async task execution with timeout control, retry, and bulk submission
  3. Progress monitoring: real-time tracking of task progress, with progress queries and alerting
  4. Result collection: unified collection and aggregation of processing results, with statistical analysis
  5. Performance optimization: parallel and pipelined batch processing to raise throughput

With this architecture in place, large volumes of data and complex jobs can be processed efficiently, with complete task management and monitoring. As data volumes and business complexity continue to grow, building an extensible, observable batch processing framework will remain a core skill for enterprise architects.

In practice, follow the best practices shown above: choose batch sizes deliberately, use asynchronous and parallel execution, monitor task progress continuously, and rely on error handling and retry to keep tasks running reliably. Only then does batch processing deliver its full value in a high-quality enterprise system.