1. ES批次处理概述

Elasticsearch批次处理是处理大量数据的高效方式,通过将数据分批处理可以显著提升性能、减少内存占用、提高系统稳定性。在Java应用中,合理使用ES批次处理可以实现高效的数据导入、更新、删除等操作。本文将详细介绍ES批次处理的各种场景、实现方法、性能优化技巧以及在Java实战中的应用。

1.1 批次处理的核心价值

  1. 性能提升: 通过批量操作减少网络开销
  2. 内存优化: 分批处理避免内存溢出
  3. 稳定性增强: 降低单次操作失败的影响
  4. 可监控性: 便于监控处理进度和性能
  5. 错误处理: 支持部分失败和重试机制

1.2 批次处理场景

  • 数据导入: 大量数据从外部系统导入到ES
  • 数据更新: 批量更新现有文档
  • 数据删除: 批量删除不需要的文档
  • 数据迁移: 从一个索引迁移到另一个索引
  • 数据同步: 与外部系统保持数据同步

2. ES批次处理基础实现

2.1 基础批次处理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/**
 * Basic Elasticsearch batch processing service.
 *
 * <p>Splits a document list into fixed-size batches and indexes each batch
 * through the bulk API, pausing briefly between batches to limit cluster load.
 *
 * @author 运维实战
 */
@Service
public class ESBatchProcessingService {

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    private static final Logger logger = LoggerFactory.getLogger(ESBatchProcessingService.class);

    /** Pause between consecutive batches, in milliseconds. */
    private static final long INTER_BATCH_DELAY_MS = 100L;

    /**
     * Bulk-inserts documents into the given index in batches.
     *
     * @param indexName target index name
     * @param documents documents to index; may be empty
     * @param batchSize number of documents per bulk request; must be positive
     * @return aggregated result: per-item success/failure counts, timing, first fatal error
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public BatchProcessingResult batchInsert(String indexName, List<Map<String, Object>> documents, int batchSize) {
        if (batchSize <= 0) {
            // A non-positive batch size would make partitionList loop forever.
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        logger.info("开始批量插入文档,索引: {}, 总数量: {}, 批次大小: {}", indexName, documents.size(), batchSize);

        BatchProcessingResult result = new BatchProcessingResult();
        result.setTotalCount(documents.size());
        result.setBatchSize(batchSize);
        result.setStartTime(System.currentTimeMillis());

        try {
            List<List<Map<String, Object>>> batches = partitionList(documents, batchSize);

            for (int i = 0; i < batches.size(); i++) {
                List<Map<String, Object>> batch = batches.get(i);

                try {
                    BulkResponse bulkResponse = executeBatchInsert(indexName, batch);
                    processBulkResponse(bulkResponse, result, i + 1);

                    logger.info("批次 {} 处理完成,成功: {}, 失败: {}",
                            i + 1, result.getSuccessCount(), result.getFailureCount());

                } catch (Exception e) {
                    // A failed batch counts all of its documents as failures;
                    // processing continues with the next batch.
                    logger.error("批次 {} 处理失败", i + 1, e);
                    result.addFailure(batch.size());
                }

                // Brief pause between batches to avoid overloading the cluster.
                if (i < batches.size() - 1) {
                    Thread.sleep(INTER_BATCH_DELAY_MS);
                }
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("批量插入失败", e);
            result.setError(e.getMessage());
        } catch (Exception e) {
            logger.error("批量插入失败", e);
            result.setError(e.getMessage());
        }

        result.setEndTime(System.currentTimeMillis());
        logger.info("批量插入完成,总耗时: {}ms, 成功: {}, 失败: {}",
                result.getDuration(), result.getSuccessCount(), result.getFailureCount());

        return result;
    }

    /**
     * Builds and executes one bulk index request for a single batch.
     *
     * @param indexName target index name
     * @param documents documents of this batch
     * @return the bulk response from Elasticsearch
     * @throws IOException if the bulk call fails at the transport level
     */
    private BulkResponse executeBatchInsert(String indexName, List<Map<String, Object>> documents) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();

        for (Map<String, Object> document : documents) {
            IndexRequest indexRequest = new IndexRequest(indexName)
                    .source(document);
            bulkRequest.add(indexRequest);
        }

        // NONE: do not force a refresh per batch; rely on the index refresh interval.
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);

        return elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

    /**
     * Folds a bulk response into the aggregate result, counting each item
     * individually when the response contains failures.
     *
     * @param bulkResponse response of one bulk request
     * @param result       aggregate result to update
     * @param batchNumber  1-based batch number, used only for logging
     */
    private void processBulkResponse(BulkResponse bulkResponse, BatchProcessingResult result, int batchNumber) {
        if (bulkResponse.hasFailures()) {
            logger.warn("批次 {} 存在失败项", batchNumber);

            for (BulkItemResponse item : bulkResponse.getItems()) {
                if (item.isFailed()) {
                    logger.error("文档处理失败: {}", item.getFailureMessage());
                    result.addFailure(1);
                } else {
                    result.addSuccess(1);
                }
            }
        } else {
            result.addSuccess(bulkResponse.getItems().length);
        }
    }

    /**
     * Splits a list into consecutive sublists of at most {@code batchSize} elements.
     * The returned sublists are views of the original list.
     *
     * @param list      list to split
     * @param batchSize maximum size of each partition; must be positive
     * @return list of partitions, empty if the input list is empty
     */
    private <T> List<List<T>> partitionList(List<T> list, int batchSize) {
        List<List<T>> partitions = new ArrayList<>();

        for (int i = 0; i < list.size(); i += batchSize) {
            int end = Math.min(i + batchSize, list.size());
            partitions.add(list.subList(i, end));
        }

        return partitions;
    }
}

2.2 批次处理结果类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
 * Aggregate result of a batch processing run: counts, timing, and errors.
 *
 * <p>Thread-safety: the mutator methods and count getters are synchronized
 * because this object is updated concurrently by the asynchronous batch
 * processing service's worker tasks.
 *
 * @author 运维实战
 */
public class BatchProcessingResult {

    private int totalCount;
    private int batchSize;
    private int successCount;
    private int failureCount;
    private long startTime;
    private long endTime;
    private String error;
    private List<String> errorMessages;

    public BatchProcessingResult() {
        this.successCount = 0;
        this.failureCount = 0;
        this.errorMessages = new ArrayList<>();
    }

    /**
     * Adds to the success counter.
     *
     * @param count number of newly succeeded items
     */
    public synchronized void addSuccess(int count) {
        this.successCount += count;
    }

    /**
     * Adds to the failure counter.
     *
     * @param count number of newly failed items
     */
    public synchronized void addFailure(int count) {
        this.failureCount += count;
    }

    /**
     * Records an error message.
     *
     * @param errorMessage error message to record
     */
    public synchronized void addError(String errorMessage) {
        this.errorMessages.add(errorMessage);
    }

    /**
     * Returns the total processing duration in milliseconds
     * (meaningful only after both start and end time are set).
     *
     * @return elapsed time in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }

    /**
     * Returns the success rate as a percentage of the total count.
     *
     * @return success rate in [0, 100]; 0.0 when the total count is 0
     */
    public double getSuccessRate() {
        if (totalCount == 0) return 0.0;
        return (double) getSuccessCount() / totalCount * 100;
    }

    /**
     * Indicates whether no failures were recorded.
     *
     * @return true if the failure counter is zero
     */
    public boolean isAllSuccess() {
        return getFailureCount() == 0;
    }

    // getters and setters
    public int getTotalCount() { return totalCount; }
    public void setTotalCount(int totalCount) { this.totalCount = totalCount; }

    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }

    public synchronized int getSuccessCount() { return successCount; }
    public synchronized void setSuccessCount(int successCount) { this.successCount = successCount; }

    public synchronized int getFailureCount() { return failureCount; }
    public synchronized void setFailureCount(int failureCount) { this.failureCount = failureCount; }

    public long getStartTime() { return startTime; }
    public void setStartTime(long startTime) { this.startTime = startTime; }

    public long getEndTime() { return endTime; }
    public void setEndTime(long endTime) { this.endTime = endTime; }

    public String getError() { return error; }
    public void setError(String error) { this.error = error; }

    public synchronized List<String> getErrorMessages() { return errorMessages; }
    public synchronized void setErrorMessages(List<String> errorMessages) { this.errorMessages = errorMessages; }
}

2.3 批量更新服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
 * Elasticsearch batch update service.
 *
 * <p>Splits a list of document updates into fixed-size batches and applies
 * each batch through the bulk API, pausing briefly between batches.
 *
 * @author 运维实战
 */
@Service
public class ESBatchUpdateService {

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    private static final Logger logger = LoggerFactory.getLogger(ESBatchUpdateService.class);

    /** Pause between consecutive batches, in milliseconds. */
    private static final long INTER_BATCH_DELAY_MS = 100L;

    /**
     * Bulk-updates documents in the given index in batches.
     *
     * @param indexName target index name
     * @param updates   updates to apply; may be empty
     * @param batchSize number of updates per bulk request; must be positive
     * @return aggregated result: per-item success/failure counts, timing, first fatal error
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public BatchProcessingResult batchUpdate(String indexName, List<DocumentUpdate> updates, int batchSize) {
        if (batchSize <= 0) {
            // A non-positive batch size would make partitionList loop forever.
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        logger.info("开始批量更新文档,索引: {}, 总数量: {}, 批次大小: {}", indexName, updates.size(), batchSize);

        BatchProcessingResult result = new BatchProcessingResult();
        result.setTotalCount(updates.size());
        result.setBatchSize(batchSize);
        result.setStartTime(System.currentTimeMillis());

        try {
            List<List<DocumentUpdate>> batches = partitionList(updates, batchSize);

            for (int i = 0; i < batches.size(); i++) {
                List<DocumentUpdate> batch = batches.get(i);

                try {
                    BulkResponse bulkResponse = executeBatchUpdate(indexName, batch);
                    processBulkResponse(bulkResponse, result, i + 1);

                    logger.info("批次 {} 处理完成,成功: {}, 失败: {}",
                            i + 1, result.getSuccessCount(), result.getFailureCount());

                } catch (Exception e) {
                    // A failed batch counts all of its updates as failures;
                    // processing continues with the next batch.
                    logger.error("批次 {} 处理失败", i + 1, e);
                    result.addFailure(batch.size());
                }

                // Brief pause between batches to avoid overloading the cluster.
                if (i < batches.size() - 1) {
                    Thread.sleep(INTER_BATCH_DELAY_MS);
                }
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("批量更新失败", e);
            result.setError(e.getMessage());
        } catch (Exception e) {
            logger.error("批量更新失败", e);
            result.setError(e.getMessage());
        }

        result.setEndTime(System.currentTimeMillis());
        logger.info("批量更新完成,总耗时: {}ms, 成功: {}, 失败: {}",
                result.getDuration(), result.getSuccessCount(), result.getFailureCount());

        return result;
    }

    /**
     * Builds and executes one bulk update request for a single batch.
     *
     * @param indexName target index name
     * @param updates   updates of this batch
     * @return the bulk response from Elasticsearch
     * @throws IOException if the bulk call fails at the transport level
     */
    private BulkResponse executeBatchUpdate(String indexName, List<DocumentUpdate> updates) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();

        for (DocumentUpdate update : updates) {
            UpdateRequest updateRequest = new UpdateRequest(indexName, update.getId())
                    .doc(update.getDocument())
                    .docAsUpsert(update.isUpsert());
            bulkRequest.add(updateRequest);
        }

        // NONE: do not force a refresh per batch; rely on the index refresh interval.
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);

        return elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

    /**
     * Folds a bulk response into the aggregate result, counting each item
     * individually when the response contains failures.
     *
     * @param bulkResponse response of one bulk request
     * @param result       aggregate result to update
     * @param batchNumber  1-based batch number, used only for logging
     */
    private void processBulkResponse(BulkResponse bulkResponse, BatchProcessingResult result, int batchNumber) {
        if (bulkResponse.hasFailures()) {
            logger.warn("批次 {} 存在失败项", batchNumber);

            for (BulkItemResponse item : bulkResponse.getItems()) {
                if (item.isFailed()) {
                    logger.error("文档更新失败: {}", item.getFailureMessage());
                    result.addFailure(1);
                } else {
                    result.addSuccess(1);
                }
            }
        } else {
            result.addSuccess(bulkResponse.getItems().length);
        }
    }

    /**
     * Splits a list into consecutive sublists of at most {@code batchSize} elements.
     * The returned sublists are views of the original list.
     *
     * @param list      list to split
     * @param batchSize maximum size of each partition; must be positive
     * @return list of partitions, empty if the input list is empty
     */
    private <T> List<List<T>> partitionList(List<T> list, int batchSize) {
        List<List<T>> partitions = new ArrayList<>();

        for (int i = 0; i < list.size(); i += batchSize) {
            int end = Math.min(i + batchSize, list.size());
            partitions.add(list.subList(i, end));
        }

        return partitions;
    }
}

2.4 文档更新类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Value object describing one document update: the document id, the partial
 * document to apply, and whether the update should act as an upsert.
 *
 * @author 运维实战
 */
public class DocumentUpdate {

    private String id;
    private Map<String, Object> document;
    private boolean upsert;

    /** Creates an empty update (upsert disabled by default). */
    public DocumentUpdate() {
        this(null, null, false);
    }

    /** Creates an update for the given id and partial document (no upsert). */
    public DocumentUpdate(String id, Map<String, Object> document) {
        this(id, document, false);
    }

    /** Creates an update for the given id and partial document with explicit upsert flag. */
    public DocumentUpdate(String id, Map<String, Object> document, boolean upsert) {
        this.id = id;
        this.document = document;
        this.upsert = upsert;
    }

    // getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Map<String, Object> getDocument() { return document; }
    public void setDocument(Map<String, Object> document) { this.document = document; }

    public boolean isUpsert() { return upsert; }
    public void setUpsert(boolean upsert) { this.upsert = upsert; }
}

3. 高级批次处理功能

3.1 异步批次处理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/**
 * Asynchronous Elasticsearch batch processing service.
 *
 * <p>Splits a document list into batches and submits each batch to the
 * injected {@link TaskExecutor}; the returned future completes when every
 * batch has been processed. The shared result object is updated from
 * multiple worker threads, so all updates to it are synchronized.
 *
 * @author 运维实战
 */
@Service
public class ESAsyncBatchProcessingService {

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    @Autowired
    private TaskExecutor taskExecutor;

    private static final Logger logger = LoggerFactory.getLogger(ESAsyncBatchProcessingService.class);

    /**
     * Asynchronously bulk-inserts documents into the given index in batches.
     *
     * @param indexName target index name
     * @param documents documents to index; may be empty
     * @param batchSize number of documents per bulk request; must be positive
     * @return future completing with the aggregated result once all batches finish
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public CompletableFuture<BatchProcessingResult> asyncBatchProcess(String indexName,
                                                                      List<Map<String, Object>> documents,
                                                                      int batchSize) {
        if (batchSize <= 0) {
            // A non-positive batch size would make partitionList loop forever.
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        logger.info("开始异步批量处理,索引: {}, 总数量: {}, 批次大小: {}", indexName, documents.size(), batchSize);

        BatchProcessingResult result = new BatchProcessingResult();
        result.setTotalCount(documents.size());
        result.setBatchSize(batchSize);
        result.setStartTime(System.currentTimeMillis());

        List<List<Map<String, Object>>> batches = partitionList(documents, batchSize);

        // One asynchronous task per batch, all running on the injected executor.
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (int i = 0; i < batches.size(); i++) {
            final int batchIndex = i;
            final List<Map<String, Object>> batch = batches.get(i);

            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    BulkResponse bulkResponse = executeBatchInsert(indexName, batch);

                    // The result object is shared across worker threads;
                    // guard all counter updates to avoid lost increments.
                    synchronized (result) {
                        processBulkResponse(bulkResponse, result, batchIndex + 1);
                    }

                    logger.info("异步批次 {} 处理完成,成功: {}, 失败: {}",
                            batchIndex + 1, result.getSuccessCount(), result.getFailureCount());

                } catch (Exception e) {
                    logger.error("异步批次 {} 处理失败", batchIndex + 1, e);
                    synchronized (result) {
                        result.addFailure(batch.size());
                    }
                }
            }, taskExecutor);

            futures.add(future);
        }

        // Complete the returned future only after every batch task has finished.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    result.setEndTime(System.currentTimeMillis());
                    logger.info("异步批量处理完成,总耗时: {}ms, 成功: {}, 失败: {}",
                            result.getDuration(), result.getSuccessCount(), result.getFailureCount());
                    return result;
                });
    }

    /**
     * Builds and executes one bulk index request for a single batch.
     *
     * @param indexName target index name
     * @param documents documents of this batch
     * @return the bulk response from Elasticsearch
     * @throws IOException if the bulk call fails at the transport level
     */
    private BulkResponse executeBatchInsert(String indexName, List<Map<String, Object>> documents) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();

        for (Map<String, Object> document : documents) {
            IndexRequest indexRequest = new IndexRequest(indexName)
                    .source(document);
            bulkRequest.add(indexRequest);
        }

        // NONE: do not force a refresh per batch; rely on the index refresh interval.
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);

        return elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

    /**
     * Folds a bulk response into the aggregate result. Callers must hold the
     * result's monitor when invoking this from concurrent tasks.
     *
     * @param bulkResponse response of one bulk request
     * @param result       aggregate result to update
     * @param batchNumber  1-based batch number, used only for logging
     */
    private void processBulkResponse(BulkResponse bulkResponse, BatchProcessingResult result, int batchNumber) {
        if (bulkResponse.hasFailures()) {
            logger.warn("批次 {} 存在失败项", batchNumber);

            for (BulkItemResponse item : bulkResponse.getItems()) {
                if (item.isFailed()) {
                    logger.error("文档处理失败: {}", item.getFailureMessage());
                    result.addFailure(1);
                } else {
                    result.addSuccess(1);
                }
            }
        } else {
            result.addSuccess(bulkResponse.getItems().length);
        }
    }

    /**
     * Splits a list into consecutive sublists of at most {@code batchSize} elements.
     * The returned sublists are views of the original list.
     *
     * @param list      list to split
     * @param batchSize maximum size of each partition; must be positive
     * @return list of partitions, empty if the input list is empty
     */
    private <T> List<List<T>> partitionList(List<T> list, int batchSize) {
        List<List<T>> partitions = new ArrayList<>();

        for (int i = 0; i < list.size(); i += batchSize) {
            int end = Math.min(i + batchSize, list.size());
            partitions.add(list.subList(i, end));
        }

        return partitions;
    }
}

3.2 批次处理监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
 * In-memory monitor for batch processing tasks: tracks per-task progress,
 * throughput, and final status keyed by task id.
 *
 * <p>The progress map is a {@link ConcurrentHashMap}, so tasks may be
 * registered and updated from different threads.
 *
 * @author 运维实战
 */
@Service
public class ESBatchProcessingMonitorService {

    private static final Logger logger = LoggerFactory.getLogger(ESBatchProcessingMonitorService.class);

    /** taskId -> progress; concurrent because callbacks may run on worker threads. */
    private final Map<String, BatchProcessingProgress> progressMap = new ConcurrentHashMap<>();

    /**
     * Registers a new task and marks it RUNNING.
     *
     * @param taskId     unique task id
     * @param totalCount total number of items the task will process
     * @param batchSize  batch size used by the task
     * @return the freshly created progress object
     */
    public BatchProcessingProgress startMonitoring(String taskId, int totalCount, int batchSize) {
        BatchProcessingProgress progress = new BatchProcessingProgress();
        progress.setTaskId(taskId);
        progress.setTotalCount(totalCount);
        progress.setBatchSize(batchSize);
        progress.setStartTime(System.currentTimeMillis());
        progress.setStatus(BatchProcessingStatus.RUNNING);

        progressMap.put(taskId, progress);

        logger.info("开始监控批次处理任务: {}, 总数量: {}, 批次大小: {}", taskId, totalCount, batchSize);

        return progress;
    }

    /**
     * Updates a task's progress counters and derives percentage and throughput.
     * Unknown task ids are ignored.
     *
     * @param taskId         task id
     * @param processedCount number of items processed so far
     * @param successCount   number of successful items so far
     * @param failureCount   number of failed items so far
     */
    public void updateProgress(String taskId, int processedCount, int successCount, int failureCount) {
        BatchProcessingProgress progress = progressMap.get(taskId);
        if (progress != null) {
            progress.setProcessedCount(processedCount);
            progress.setSuccessCount(successCount);
            progress.setFailureCount(failureCount);
            progress.setLastUpdateTime(System.currentTimeMillis());

            // Guard against division by zero: a task registered with
            // totalCount == 0 would otherwise produce NaN/Infinity here.
            double percentage = progress.getTotalCount() > 0
                    ? (double) processedCount / progress.getTotalCount() * 100
                    : 0.0;
            progress.setPercentage(percentage);

            // Throughput in items per second since the task started.
            long duration = progress.getLastUpdateTime() - progress.getStartTime();
            if (duration > 0) {
                double speed = (double) processedCount / duration * 1000;
                progress.setProcessingSpeed(speed);
            }

            logger.info("任务 {} 进度更新: {}/{} ({}%), 成功: {}, 失败: {}, 速度: {}/s",
                    taskId, processedCount, progress.getTotalCount(),
                    String.format("%.2f", percentage), successCount, failureCount,
                    String.format("%.2f", progress.getProcessingSpeed()));
        }
    }

    /**
     * Marks a task COMPLETED and stores its final result.
     * Unknown task ids are ignored.
     *
     * @param taskId task id
     * @param result final processing result
     */
    public void completeTask(String taskId, BatchProcessingResult result) {
        BatchProcessingProgress progress = progressMap.get(taskId);
        if (progress != null) {
            progress.setStatus(BatchProcessingStatus.COMPLETED);
            progress.setEndTime(System.currentTimeMillis());
            progress.setResult(result);

            logger.info("任务 {} 完成,总耗时: {}ms, 成功: {}, 失败: {}, 成功率: {}%",
                    taskId, progress.getDuration(), result.getSuccessCount(),
                    result.getFailureCount(), String.format("%.2f", result.getSuccessRate()));
        }
    }

    /**
     * Marks a task FAILED with the given error message.
     * Unknown task ids are ignored.
     *
     * @param taskId task id
     * @param error  error description
     */
    public void failTask(String taskId, String error) {
        BatchProcessingProgress progress = progressMap.get(taskId);
        if (progress != null) {
            progress.setStatus(BatchProcessingStatus.FAILED);
            progress.setEndTime(System.currentTimeMillis());
            progress.setError(error);

            logger.error("任务 {} 失败: {}", taskId, error);
        }
    }

    /**
     * Looks up the progress of one task.
     *
     * @param taskId task id
     * @return progress, or null if the task is unknown
     */
    public BatchProcessingProgress getProgress(String taskId) {
        return progressMap.get(taskId);
    }

    /**
     * Returns a snapshot of every tracked task's progress.
     *
     * @return copy of the progress map (mutations do not affect the monitor)
     */
    public Map<String, BatchProcessingProgress> getAllProgress() {
        return new HashMap<>(progressMap);
    }

    /**
     * Removes a task from the monitor (typically after completion).
     *
     * @param taskId task id
     */
    public void cleanupTask(String taskId) {
        progressMap.remove(taskId);
        logger.info("清理任务: {}", taskId);
    }
}

3.3 批次处理进度类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/**
 * Mutable snapshot of one batch processing task's progress: counters,
 * derived percentage and throughput, timing, status, and final result.
 *
 * @author 运维实战
 */
public class BatchProcessingProgress {

    private String taskId;
    private int totalCount;
    private int batchSize;
    private int processedCount;
    private int successCount;
    private int failureCount;
    private double percentage;
    private double processingSpeed;
    private long startTime;
    private long lastUpdateTime;
    private long endTime;
    private BatchProcessingStatus status;
    private BatchProcessingResult result;
    private String error;

    /** Creates a fresh progress record in PENDING state with zeroed counters. */
    public BatchProcessingProgress() {
        this.processedCount = 0;
        this.successCount = 0;
        this.failureCount = 0;
        this.percentage = 0.0;
        this.processingSpeed = 0.0;
        this.status = BatchProcessingStatus.PENDING;
    }

    /**
     * Returns the elapsed time in milliseconds: up to the end time if the
     * task finished, otherwise up to the last update, otherwise 0.
     *
     * @return elapsed time in milliseconds
     */
    public long getDuration() {
        long reference = endTime > 0 ? endTime : lastUpdateTime;
        return reference > 0 ? reference - startTime : 0;
    }

    /**
     * Indicates whether the task finished successfully.
     *
     * @return true if status is COMPLETED
     */
    public boolean isCompleted() {
        return status == BatchProcessingStatus.COMPLETED;
    }

    /**
     * Indicates whether the task failed.
     *
     * @return true if status is FAILED
     */
    public boolean isFailed() {
        return status == BatchProcessingStatus.FAILED;
    }

    /**
     * Indicates whether the task is still in progress.
     *
     * @return true if status is RUNNING
     */
    public boolean isRunning() {
        return status == BatchProcessingStatus.RUNNING;
    }

    // getters and setters
    public String getTaskId() { return taskId; }
    public void setTaskId(String taskId) { this.taskId = taskId; }

    public int getTotalCount() { return totalCount; }
    public void setTotalCount(int totalCount) { this.totalCount = totalCount; }

    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }

    public int getProcessedCount() { return processedCount; }
    public void setProcessedCount(int processedCount) { this.processedCount = processedCount; }

    public int getSuccessCount() { return successCount; }
    public void setSuccessCount(int successCount) { this.successCount = successCount; }

    public int getFailureCount() { return failureCount; }
    public void setFailureCount(int failureCount) { this.failureCount = failureCount; }

    public double getPercentage() { return percentage; }
    public void setPercentage(double percentage) { this.percentage = percentage; }

    public double getProcessingSpeed() { return processingSpeed; }
    public void setProcessingSpeed(double processingSpeed) { this.processingSpeed = processingSpeed; }

    public long getStartTime() { return startTime; }
    public void setStartTime(long startTime) { this.startTime = startTime; }

    public long getLastUpdateTime() { return lastUpdateTime; }
    public void setLastUpdateTime(long lastUpdateTime) { this.lastUpdateTime = lastUpdateTime; }

    public long getEndTime() { return endTime; }
    public void setEndTime(long endTime) { this.endTime = endTime; }

    public BatchProcessingStatus getStatus() { return status; }
    public void setStatus(BatchProcessingStatus status) { this.status = status; }

    public BatchProcessingResult getResult() { return result; }
    public void setResult(BatchProcessingResult result) { this.result = result; }

    public String getError() { return error; }
    public void setError(String error) { this.error = error; }
}

3.4 批次处理状态枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Lifecycle states of a batch processing task, each carrying a
 * human-readable (Chinese) description.
 *
 * @author 运维实战
 */
public enum BatchProcessingStatus {

    PENDING("待处理"),
    RUNNING("运行中"),
    COMPLETED("已完成"),
    FAILED("失败"),
    CANCELLED("已取消");

    /** Human-readable description of this state. */
    private final String description;

    BatchProcessingStatus(String description) {
        this.description = description;
    }

    /** Returns the human-readable description of this state. */
    public String getDescription() {
        return description;
    }
}

4. 批次处理控制器

4.1 批次处理REST控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
 * REST endpoints for Elasticsearch batch processing: synchronous insert and
 * update, asynchronous processing with task-id tracking, and progress
 * querying/cleanup.
 *
 * @author 运维实战
 */
@RestController
@RequestMapping("/api/es/batch")
public class ESBatchProcessingController {

    @Autowired
    private ESBatchProcessingService batchProcessingService;

    @Autowired
    private ESBatchUpdateService batchUpdateService;

    @Autowired
    private ESAsyncBatchProcessingService asyncBatchProcessingService;

    @Autowired
    private ESBatchProcessingMonitorService monitorService;

    private static final Logger logger = LoggerFactory.getLogger(ESBatchProcessingController.class);

    /**
     * Synchronously bulk-inserts the request's documents.
     *
     * @param request index name, documents, and batch size
     * @return 200 with the processing result, or 500 on error
     */
    @PostMapping("/insert")
    public ResponseEntity<BatchProcessingResult> batchInsert(@RequestBody BatchInsertRequest request) {
        try {
            logger.info("接收到批量插入请求,索引: {}, 数量: {}", request.getIndexName(), request.getDocuments().size());

            return ResponseEntity.ok(batchProcessingService.batchInsert(
                    request.getIndexName(), request.getDocuments(), request.getBatchSize()));
        } catch (Exception e) {
            logger.error("批量插入失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Synchronously bulk-updates the request's documents.
     *
     * @param request index name, updates, and batch size
     * @return 200 with the processing result, or 500 on error
     */
    @PostMapping("/update")
    public ResponseEntity<BatchProcessingResult> batchUpdate(@RequestBody BatchUpdateRequest request) {
        try {
            logger.info("接收到批量更新请求,索引: {}, 数量: {}", request.getIndexName(), request.getUpdates().size());

            return ResponseEntity.ok(batchUpdateService.batchUpdate(
                    request.getIndexName(), request.getUpdates(), request.getBatchSize()));
        } catch (Exception e) {
            logger.error("批量更新失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Starts an asynchronous batch insert and returns immediately with a task
     * id that can be polled via {@code GET /progress/{taskId}}.
     *
     * @param request index name, documents, and batch size
     * @return 200 with the task id, or 500 on error
     */
    @PostMapping("/async")
    public ResponseEntity<Map<String, String>> asyncBatchProcess(@RequestBody BatchInsertRequest request) {
        try {
            logger.info("接收到异步批量处理请求,索引: {}, 数量: {}", request.getIndexName(), request.getDocuments().size());

            final String taskId = UUID.randomUUID().toString();

            // Register the task with the monitor before kicking off the work.
            monitorService.startMonitoring(taskId, request.getDocuments().size(), request.getBatchSize());

            asyncBatchProcessingService
                    .asyncBatchProcess(request.getIndexName(), request.getDocuments(), request.getBatchSize())
                    .whenComplete((result, throwable) -> {
                        // Reflect the async outcome in the monitor when the future settles.
                        if (throwable != null) {
                            monitorService.failTask(taskId, throwable.getMessage());
                        } else {
                            monitorService.completeTask(taskId, result);
                        }
                    });

            Map<String, String> response = new HashMap<>();
            response.put("taskId", taskId);
            response.put("status", "STARTED");
            return ResponseEntity.ok(response);
        } catch (Exception e) {
            logger.error("异步批量处理失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Returns the progress of one task.
     *
     * @param taskId task id returned by the async endpoint
     * @return 200 with progress, 404 if unknown, or 500 on error
     */
    @GetMapping("/progress/{taskId}")
    public ResponseEntity<BatchProcessingProgress> getProgress(@PathVariable String taskId) {
        try {
            BatchProcessingProgress progress = monitorService.getProgress(taskId);
            return progress == null
                    ? ResponseEntity.notFound().build()
                    : ResponseEntity.ok(progress);
        } catch (Exception e) {
            logger.error("获取处理进度失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Returns the progress of every tracked task.
     *
     * @return 200 with a taskId-to-progress map, or 500 on error
     */
    @GetMapping("/progress")
    public ResponseEntity<Map<String, BatchProcessingProgress>> getAllProgress() {
        try {
            return ResponseEntity.ok(monitorService.getAllProgress());
        } catch (Exception e) {
            logger.error("获取所有任务进度失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Removes a task from the monitor.
     *
     * @param taskId task id to remove
     * @return 200 with a confirmation payload, or 500 on error
     */
    @DeleteMapping("/progress/{taskId}")
    public ResponseEntity<Map<String, String>> cleanupTask(@PathVariable String taskId) {
        try {
            monitorService.cleanupTask(taskId);

            Map<String, String> response = new HashMap<>();
            response.put("taskId", taskId);
            response.put("status", "CLEANED");
            return ResponseEntity.ok(response);
        } catch (Exception e) {
            logger.error("清理任务失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}

4.2 请求类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Request payload for the batch insert endpoints: target index, documents,
 * and an optional batch size (defaults to 1000).
 *
 * @author 运维实战
 */
public class BatchInsertRequest {

    /** Default number of documents per bulk request. */
    private int batchSize = 1000;
    private String indexName;
    private List<Map<String, Object>> documents;

    public BatchInsertRequest() {}

    /** Creates a request with the default batch size. */
    public BatchInsertRequest(String indexName, List<Map<String, Object>> documents) {
        this.indexName = indexName;
        this.documents = documents;
    }

    /** Creates a request with an explicit batch size. */
    public BatchInsertRequest(String indexName, List<Map<String, Object>> documents, int batchSize) {
        this(indexName, documents);
        this.batchSize = batchSize;
    }

    // getters and setters
    public String getIndexName() { return indexName; }
    public void setIndexName(String indexName) { this.indexName = indexName; }

    public List<Map<String, Object>> getDocuments() { return documents; }
    public void setDocuments(List<Map<String, Object>> documents) { this.documents = documents; }

    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Request payload for the batch update endpoint: target index, document
 * updates, and an optional batch size (defaults to 1000).
 *
 * @author 运维实战
 */
public class BatchUpdateRequest {

    /** Default number of updates per bulk request. */
    private int batchSize = 1000;
    private String indexName;
    private List<DocumentUpdate> updates;

    public BatchUpdateRequest() {}

    /** Creates a request with the default batch size. */
    public BatchUpdateRequest(String indexName, List<DocumentUpdate> updates) {
        this.indexName = indexName;
        this.updates = updates;
    }

    /** Creates a request with an explicit batch size. */
    public BatchUpdateRequest(String indexName, List<DocumentUpdate> updates, int batchSize) {
        this(indexName, updates);
        this.batchSize = batchSize;
    }

    // getters and setters
    public String getIndexName() { return indexName; }
    public void setIndexName(String indexName) { this.indexName = indexName; }

    public List<DocumentUpdate> getUpdates() { return updates; }
    public void setUpdates(List<DocumentUpdate> updates) { this.updates = updates; }

    public int getBatchSize() { return batchSize; }
    public void setBatchSize(int batchSize) { this.batchSize = batchSize; }
}

5. 总结

5.1 ES批次处理最佳实践

  1. 合理设置批次大小: 根据数据大小和网络条件调整批次大小
  2. 异步处理: 使用异步处理提升系统响应性能
  3. 进度监控: 建立完善的进度监控和告警机制
  4. 错误处理: 实现部分失败和重试机制
  5. 性能优化: 优化批量操作参数和网络配置

5.2 性能优化建议

  • 批次大小: 根据数据大小调整,通常1000-5000条
  • 并发控制: 控制并发批次数量,避免过度压力
  • 网络优化: 使用压缩和连接池优化网络性能
  • 内存管理: 合理管理内存使用,避免内存溢出
  • 监控告警: 建立性能监控和异常告警机制

5.3 运维管理要点

  • 任务管理: 建立任务生命周期管理机制
  • 进度跟踪: 实时跟踪处理进度和性能指标
  • 错误处理: 建立错误处理和重试机制
  • 资源监控: 监控系统资源使用情况
  • 日志管理: 完善日志记录和分析

通过本文的ES批次处理Java实战指南,您可以掌握Elasticsearch批次处理的各种实现方法、性能优化技巧以及在企业级应用中的最佳实践,构建高效、稳定的ES批次处理系统!