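1. Overview of Elasticsearch Bulk Operations

The Bulk API is Elasticsearch's interface for batched writes. It combines multiple index, create, update, and delete operations into a single request, which cuts the number of network round trips and usually raises write throughput considerably. In a Java application it is the standard tool for loading data in batches and for reducing per-document request overhead. This article covers how the Bulk API works, how to implement it, performance tuning techniques, and its practical use in Java.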

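1.1 Core Value of Bulk Operations

  1. Performance: batching processes far more documents per unit of time than single-document requests
  2. Network efficiency: fewer requests and less per-request overhead
  3. Resource savings: lower connection and thread usage on both client and cluster
  4. Throughput: higher overall system throughput
  5. Simpler monitoring: bulk operations can be monitored and managed per batch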

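1.2 Bulk Use Cases

  • Bulk import: loading large volumes of data into Elasticsearch
  • Bulk update: updating many documents in an index
  • Bulk delete: deleting many documents from an index
  • Data migration: moving data from one index to another
  • Real-time processing: writing streaming data in small batches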

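1.3 Bulk Operation Types

  • INDEX: index operation (creates the document, or replaces it if the ID already exists)
  • CREATE: create operation (creates a new document only; fails if the ID already exists)
  • UPDATE: update operation (partially updates an existing document)
  • DELETE: delete operation (removes a document)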
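
On the wire, each of these operation types becomes an action line in the newline-delimited JSON body of a `_bulk` request, followed by a source line for every type except delete. The following is a minimal sketch of that layout using plain string building rather than a client library. The class name `BulkBodySketch`, the index name `products`, and the sample fields are illustrative assumptions, and the sketch does no JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: assembles the NDJSON body of a _bulk request by hand. */
class BulkBodySketch {

    /** Builds the action line that precedes every operation. */
    static String action(String op, String index, String id) {
        return "{\"" + op + "\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}";
    }

    /** Returns a body holding one index, one create, one update and one delete. */
    static String sampleBody() {
        List<String> lines = new ArrayList<>();
        lines.add(action("index", "products", "1"));
        lines.add("{\"name\":\"keyboard\"}");                 // full document source
        lines.add(action("create", "products", "2"));
        lines.add("{\"name\":\"mouse\"}");                    // full document source
        lines.add(action("update", "products", "1"));
        lines.add("{\"doc\":{\"name\":\"mechanical keyboard\"}}"); // partial document
        lines.add(action("delete", "products", "2"));         // delete has no source line
        // The bulk body must end with a newline character
        return String.join("\n", lines) + "\n";
    }
}
```

Printing `BulkBodySketch.sampleBody()` shows seven lines: three action/source pairs and a single action line for the delete.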

2. Basic Bulk Implementation

2.1 Bulk Configuration Class

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Configuration class for ES bulk operations.
 * @author 运维实战
 */
@Configuration
@EnableConfigurationProperties(ESBulkProperties.class)
@EnableScheduling // needed for the @Scheduled monitoring task in ESBulkMonitorService
public class ESBulkConfig {

    // ESBulkService, ESBulkMonitorService and ESBulkOptimizeService are annotated
    // with @Service and are registered by component scanning. Declaring them again
    // through @Bean methods here would define each bean twice under the same name,
    // which fails at startup when bean definition overriding is disabled (the
    // default since Spring Boot 2.1), so no @Bean methods are declared.
}

2.2 Bulk Configuration Properties

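import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Configuration properties for ES bulk operations.
 * @author 运维实战
 */
@Data
@ConfigurationProperties(prefix = "elasticsearch.bulk")
public class ESBulkProperties {

    /** Maximum number of operations per bulk request */
    private int maxBatchSize = 1000;

    /** Bulk request timeout in milliseconds */
    private long bulkTimeoutMs = 30000;

    /** Whether bulk operations are enabled */
    private boolean enableBulk = true;

    /** Whether bulk index is enabled */
    private boolean enableBulkIndex = true;

    /** Whether bulk update is enabled */
    private boolean enableBulkUpdate = true;

    /** Whether bulk delete is enabled */
    private boolean enableBulkDelete = true;

    /** Whether bulk create is enabled */
    private boolean enableBulkCreate = true;

    /** Whether concurrent bulk operations are enabled */
    private boolean enableConcurrentBulk = true;

    /** Number of threads for concurrent bulk operations */
    private int concurrentBulkThreads = 5;

    /** Whether failed bulk operations are retried */
    private boolean enableBulkRetry = true;

    /** Number of retries for a failed bulk operation */
    private int bulkRetryCount = 3;

    /** Interval between retries in milliseconds */
    private long bulkRetryIntervalMs = 1000;

    /** Whether monitoring is enabled */
    private boolean enableMonitor = true;

    /** Monitoring interval in milliseconds */
    private long monitorInterval = 30000;
}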
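
The retry settings `bulkRetryCount` and `bulkRetryIntervalMs` only hold values; something still has to apply them. One way this might look is a small wrapper that re-runs a task a fixed number of times with a fixed pause in between. This is a sketch under assumptions, not part of the service in this article: the names `RetrySketch` and `callWithRetry` are hypothetical, and it retries on any `RuntimeException`, whereas real code would probably retry only transient failures.

```java
import java.util.function.Supplier;

/** Illustrative only: fixed-interval retry driven by a retry count and an interval. */
class RetrySketch {

    /**
     * Runs the task once and, on failure, up to retryCount more times,
     * sleeping intervalMs between attempts. Rethrows the last failure.
     */
    static <T> T callWithRetry(Supplier<T> task, int retryCount, long intervalMs) {
        int retries = Math.max(0, retryCount);
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and fall through to the pause
            }
            if (attempt < retries) {
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        throw last;
    }
}
```

A caller could wrap a bulk call as `RetrySketch.callWithRetry(() -> sendBatch(batch), properties.getBulkRetryCount(), properties.getBulkRetryIntervalMs())`, where `sendBatch` stands for whatever method actually submits the request.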

2.3 Basic Bulk Service

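import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue; // org.elasticsearch.core.TimeValue in later 7.x clients
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Basic ES bulk service.
 * @author 运维实战
 */
@Service
public class ESBulkService {

    @Autowired
    private RestHighLevelClient elasticsearchClient;

    @Autowired
    private ESBulkProperties properties;

    @Autowired
    private ESBulkMonitorService esBulkMonitorService;

    @Autowired
    private ESBulkOptimizeService esBulkOptimizeService;

    private static final Logger logger = LoggerFactory.getLogger(ESBulkService.class);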

    /**
     * Bulk index operation.
     * @param indexName index name
     * @param documents documents to index
     * @return bulk operation result
     */
    public ESBulkResult bulkIndex(String indexName, List<Map<String, Object>> documents) {
        logger.info("Starting bulk index, index: {}, documents: {}", indexName, documents.size());

        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("INDEX");
        result.setTotalCount(documents.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Look up the bulk strategy for this operation type and size
            ESBulkStrategy strategy = esBulkOptimizeService.getStrategy("INDEX", documents.size());

            // Execute the bulk index request
            result = executeBulkIndex(indexName, documents, strategy);

            // Record bulk operation metrics
            esBulkMonitorService.recordBulkOperation(indexName, "INDEX", documents.size(), result.getSuccessCount());

            logger.info("Bulk index finished, index: {}, succeeded: {}, failed: {}, duration: {}ms",
                    indexName, result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Bulk index failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk index failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Bulk update operation.
     * @param indexName index name
     * @param updateDocuments documents to update
     * @return bulk operation result
     */
    public ESBulkResult bulkUpdate(String indexName, List<Map<String, Object>> updateDocuments) {
        logger.info("Starting bulk update, index: {}, documents: {}", indexName, updateDocuments.size());

        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("UPDATE");
        result.setTotalCount(updateDocuments.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Look up the bulk strategy for this operation type and size
            ESBulkStrategy strategy = esBulkOptimizeService.getStrategy("UPDATE", updateDocuments.size());

            // Execute the bulk update request
            result = executeBulkUpdate(indexName, updateDocuments, strategy);

            // Record bulk operation metrics
            esBulkMonitorService.recordBulkOperation(indexName, "UPDATE", updateDocuments.size(), result.getSuccessCount());

            logger.info("Bulk update finished, index: {}, succeeded: {}, failed: {}, duration: {}ms",
                    indexName, result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Bulk update failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk update failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Bulk delete operation.
     * @param indexName index name
     * @param documentIds IDs of the documents to delete
     * @return bulk operation result
     */
    public ESBulkResult bulkDelete(String indexName, List<String> documentIds) {
        logger.info("Starting bulk delete, index: {}, document IDs: {}", indexName, documentIds.size());

        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("DELETE");
        result.setTotalCount(documentIds.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Look up the bulk strategy for this operation type and size
            ESBulkStrategy strategy = esBulkOptimizeService.getStrategy("DELETE", documentIds.size());

            // Execute the bulk delete request
            result = executeBulkDelete(indexName, documentIds, strategy);

            // Record bulk operation metrics
            esBulkMonitorService.recordBulkOperation(indexName, "DELETE", documentIds.size(), result.getSuccessCount());

            logger.info("Bulk delete finished, index: {}, succeeded: {}, failed: {}, duration: {}ms",
                    indexName, result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Bulk delete failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk delete failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Bulk create operation.
     * @param indexName index name
     * @param documents documents to create
     * @return bulk operation result
     */
    public ESBulkResult bulkCreate(String indexName, List<Map<String, Object>> documents) {
        logger.info("Starting bulk create, index: {}, documents: {}", indexName, documents.size());

        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("CREATE");
        result.setTotalCount(documents.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Look up the bulk strategy for this operation type and size
            ESBulkStrategy strategy = esBulkOptimizeService.getStrategy("CREATE", documents.size());

            // Execute the bulk create request
            result = executeBulkCreate(indexName, documents, strategy);

            // Record bulk operation metrics
            esBulkMonitorService.recordBulkOperation(indexName, "CREATE", documents.size(), result.getSuccessCount());

            logger.info("Bulk create finished, index: {}, succeeded: {}, failed: {}, duration: {}ms",
                    indexName, result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Bulk create failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk create failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Mixed bulk operation.
     * @param indexName index name
     * @param bulkOperations operations to execute
     * @return bulk operation result
     */
    public ESBulkResult bulkMixed(String indexName, List<ESBulkOperation> bulkOperations) {
        logger.info("Starting mixed bulk operation, index: {}, operations: {}", indexName, bulkOperations.size());

        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("MIXED");
        result.setTotalCount(bulkOperations.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Look up the bulk strategy for this operation type and size
            ESBulkStrategy strategy = esBulkOptimizeService.getStrategy("MIXED", bulkOperations.size());

            // Execute the mixed bulk request
            result = executeBulkMixed(indexName, bulkOperations, strategy);

            // Record bulk operation metrics
            esBulkMonitorService.recordBulkOperation(indexName, "MIXED", bulkOperations.size(), result.getSuccessCount());

            logger.info("Mixed bulk operation finished, index: {}, succeeded: {}, failed: {}, duration: {}ms",
                    indexName, result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("Mixed bulk operation failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Mixed bulk operation failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Executes a bulk index request.
     * @param indexName index name
     * @param documents documents to index
     * @param strategy strategy (informational; not used to split the request)
     * @return bulk operation result
     */
    private ESBulkResult executeBulkIndex(String indexName, List<Map<String, Object>> documents, ESBulkStrategy strategy) {
        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("INDEX");
        result.setTotalCount(documents.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the bulk request
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueMillis(properties.getBulkTimeoutMs()));

            for (Map<String, Object> document : documents) {
                // Copy the map so the caller's document is not mutated, then take
                // "id" out of the source so it is used as _id and not stored twice
                Map<String, Object> source = new HashMap<>(document);
                Object documentId = source.remove("id");

                IndexRequest indexRequest = new IndexRequest(indexName)
                        .id(documentId == null ? null : documentId.toString())
                        .source(source);

                bulkRequest.add(indexRequest);
            }

            // Execute the bulk request
            BulkResponse bulkResponse = elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Process the response
            result = processBulkResponse(bulkResponse, result);

            result.setEndTime(System.currentTimeMillis());
            return result;

        } catch (Exception e) {
            logger.error("Bulk index execution failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk index execution failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Executes a bulk update request.
     * @param indexName index name
     * @param updateDocuments documents to update
     * @param strategy strategy (informational; not used to split the request)
     * @return bulk operation result
     */
    private ESBulkResult executeBulkUpdate(String indexName, List<Map<String, Object>> updateDocuments, ESBulkStrategy strategy) {
        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("UPDATE");
        result.setTotalCount(updateDocuments.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the bulk request
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueMillis(properties.getBulkTimeoutMs()));

            for (Map<String, Object> updateDocument : updateDocuments) {
                // Copy the map so the caller's document is not mutated
                Map<String, Object> doc = new HashMap<>(updateDocument);
                Object documentId = doc.remove("id");

                UpdateRequest updateRequest = new UpdateRequest(indexName,
                        documentId == null ? null : documentId.toString())
                        .doc(doc);

                bulkRequest.add(updateRequest);
            }

            // Execute the bulk request
            BulkResponse bulkResponse = elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Process the response
            result = processBulkResponse(bulkResponse, result);

            result.setEndTime(System.currentTimeMillis());
            return result;

        } catch (Exception e) {
            logger.error("Bulk update execution failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk update execution failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Executes a bulk delete request.
     * @param indexName index name
     * @param documentIds IDs of the documents to delete
     * @param strategy strategy (informational; not used to split the request)
     * @return bulk operation result
     */
    private ESBulkResult executeBulkDelete(String indexName, List<String> documentIds, ESBulkStrategy strategy) {
        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("DELETE");
        result.setTotalCount(documentIds.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the bulk request
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueMillis(properties.getBulkTimeoutMs()));

            for (String documentId : documentIds) {
                DeleteRequest deleteRequest = new DeleteRequest(indexName, documentId);
                bulkRequest.add(deleteRequest);
            }

            // Execute the bulk request
            BulkResponse bulkResponse = elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Process the response
            result = processBulkResponse(bulkResponse, result);

            result.setEndTime(System.currentTimeMillis());
            return result;

        } catch (Exception e) {
            logger.error("Bulk delete execution failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk delete execution failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Executes a bulk create request.
     * @param indexName index name
     * @param documents documents to create
     * @param strategy strategy (informational; not used to split the request)
     * @return bulk operation result
     */
    private ESBulkResult executeBulkCreate(String indexName, List<Map<String, Object>> documents, ESBulkStrategy strategy) {
        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("CREATE");
        result.setTotalCount(documents.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the bulk request
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueMillis(properties.getBulkTimeoutMs()));

            for (Map<String, Object> document : documents) {
                // Copy the map so the caller's document is not mutated
                Map<String, Object> source = new HashMap<>(document);
                Object documentId = source.remove("id");

                IndexRequest indexRequest = new IndexRequest(indexName)
                        .id(documentId == null ? null : documentId.toString())
                        .source(source)
                        .opType(DocWriteRequest.OpType.CREATE); // fail if the ID already exists

                bulkRequest.add(indexRequest);
            }

            // Execute the bulk request
            BulkResponse bulkResponse = elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Process the response
            result = processBulkResponse(bulkResponse, result);

            result.setEndTime(System.currentTimeMillis());
            return result;

        } catch (Exception e) {
            logger.error("Bulk create execution failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Bulk create execution failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Executes a mixed bulk request.
     * @param indexName index name
     * @param bulkOperations operations to execute
     * @param strategy strategy (informational; not used to split the request)
     * @return bulk operation result
     */
    private ESBulkResult executeBulkMixed(String indexName, List<ESBulkOperation> bulkOperations, ESBulkStrategy strategy) {
        ESBulkResult result = new ESBulkResult();
        result.setIndexName(indexName);
        result.setOperationType("MIXED");
        result.setTotalCount(bulkOperations.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            // Build the bulk request
            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.timeout(TimeValue.timeValueMillis(properties.getBulkTimeoutMs()));

            for (ESBulkOperation operation : bulkOperations) {
                switch (operation.getType()) {
                    case "INDEX":
                        IndexRequest indexRequest = new IndexRequest(indexName)
                                .id(operation.getDocumentId())
                                .source(operation.getDocument());
                        bulkRequest.add(indexRequest);
                        break;
                    case "UPDATE":
                        UpdateRequest updateRequest = new UpdateRequest(indexName, operation.getDocumentId())
                                .doc(operation.getDocument());
                        bulkRequest.add(updateRequest);
                        break;
                    case "DELETE":
                        DeleteRequest deleteRequest = new DeleteRequest(indexName, operation.getDocumentId());
                        bulkRequest.add(deleteRequest);
                        break;
                    case "CREATE":
                        IndexRequest createRequest = new IndexRequest(indexName)
                                .id(operation.getDocumentId())
                                .source(operation.getDocument())
                                .opType(DocWriteRequest.OpType.CREATE);
                        bulkRequest.add(createRequest);
                        break;
                    default:
                        // Unknown types are skipped; log them so they do not vanish silently
                        logger.warn("Skipping unsupported bulk operation type: {}", operation.getType());
                        break;
                }
            }

            // Execute the bulk request
            BulkResponse bulkResponse = elasticsearchClient.bulk(bulkRequest, RequestOptions.DEFAULT);

            // Process the response
            result = processBulkResponse(bulkResponse, result);

            result.setEndTime(System.currentTimeMillis());
            return result;

        } catch (Exception e) {
            logger.error("Mixed bulk execution failed, index: {}", indexName, e);
            result.setSuccess(false);
            result.setError("Mixed bulk execution failed: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Processes a bulk response.
     * @param bulkResponse bulk response
     * @param result result object to fill in
     * @return the updated result
     */
    private ESBulkResult processBulkResponse(BulkResponse bulkResponse, ESBulkResult result) {
        int successCount = 0;
        int failureCount = 0;
        List<String> errors = new ArrayList<>();

        // A bulk request can succeed as a whole while individual items fail,
        // so every item response has to be checked
        for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
            if (itemResponse.isFailed()) {
                failureCount++;
                errors.add(itemResponse.getFailureMessage());
            } else {
                successCount++;
            }
        }

        result.setSuccessCount(successCount);
        result.setFailureCount(failureCount);
        // "success" means at least one item succeeded; use isAllSuccess() to
        // check that no item failed
        result.setSuccess(successCount > 0);
        result.setErrors(errors);

        return result;
    }
}
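
The service above sends every document it receives in one request, whatever `maxBatchSize` is set to. If the limit is meant to be enforced, the input can be split into chunks first and each chunk submitted as its own bulk request. The following is a minimal sketch of that splitting step in plain Java; the names `BatchPartitionSketch` and `partition` are illustrative and do not appear in the service.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: splits a list into consecutive batches of at most batchSize items. */
class BatchPartitionSketch {

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so each batch is independent of the input list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

With 2,500 documents and a batch size of 1,000 this yields three batches of 1,000, 1,000 and 500 items, each of which could be passed to a method such as `bulkIndex`.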

2.4 Bulk Operation Result Class

import java.util.ArrayList;
import java.util.List;

import lombok.Data;

/**
 * Result of an ES bulk operation.
 * @author 运维实战
 */
@Data
public class ESBulkResult {

    private boolean success;
    private String indexName;
    private String operationType;
    private int totalCount;
    private int successCount;
    private int failureCount;
    private List<String> errors;
    private String error;
    private long startTime;
    private long endTime;

    public ESBulkResult() {
        this.success = false;
        this.successCount = 0;
        this.failureCount = 0;
        this.errors = new ArrayList<>();
    }

    /**
     * Returns the processing time.
     * @return processing time in milliseconds
     */
    public long getDuration() {
        return endTime - startTime;
    }

    /**
     * Returns the success rate.
     * @return success rate as a percentage (0 when there were no operations)
     */
    public double getSuccessRate() {
        if (totalCount == 0) return 0.0;
        return (double) successCount / totalCount * 100;
    }

    /**
     * Returns the failure rate.
     * @return failure rate as a percentage (0 when there were no operations)
     */
    public double getFailureRate() {
        if (totalCount == 0) return 0.0;
        return (double) failureCount / totalCount * 100;
    }

    /**
     * Whether every item succeeded.
     * @return true if no item failed
     */
    public boolean isAllSuccess() {
        return failureCount == 0;
    }
}

2.5 Bulk Operation Strategy Class

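import lombok.Data;

/**
 * Strategy for an ES bulk operation.
 * @author 运维实战
 */
@Data
public class ESBulkStrategy {

    private String strategy;
    private String description;
    private int operationCount;
    private int recommendedBatchSize;
    private long estimatedDuration;
    private long timestamp;

    public ESBulkStrategy() {
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Returns the performance improvement rate: the share of requests saved by
     * sending operationCount operations in one bulk request instead of one
     * request per operation. This is a rough heuristic, not a measurement.
     * @return improvement rate as a percentage
     */
    public double getPerformanceImprovementRate() {
        if (operationCount == 0) return 0.0;
        return (double) (operationCount - 1) / operationCount * 100;
    }
}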

2.6 Bulk Operation Class

import java.util.Map;

import lombok.Data;

/**
 * A single operation inside a mixed bulk request.
 * @author 运维实战
 */
@Data
public class ESBulkOperation {

    private String type; // INDEX, CREATE, UPDATE or DELETE
    private String documentId;
    private Map<String, Object> document;

    public ESBulkOperation() {}

    public ESBulkOperation(String type, String documentId, Map<String, Object> document) {
        this.type = type;
        this.documentId = documentId;
        this.document = document;
    }
}

3. Advanced Features

3.1 Bulk Monitoring Service

import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/**
 * Monitoring service for ES bulk operations.
 * @author 运维实战
 */
@Service
public class ESBulkMonitorService {

    private final AtomicLong totalBulkOperations = new AtomicLong(0);
    private final AtomicLong totalSuccessfulOperations = new AtomicLong(0);
    private final AtomicLong totalFailedOperations = new AtomicLong(0);
    private final AtomicLong totalDocumentsProcessed = new AtomicLong(0);

    // volatile: written by the scheduler thread, read by request threads
    private volatile long lastResetTime = System.currentTimeMillis();
    private final long resetInterval = 300000; // reset every 5 minutes

    private static final Logger logger = LoggerFactory.getLogger(ESBulkMonitorService.class);

    /**
     * Records one bulk operation.
     * @param indexName index name
     * @param operationType operation type
     * @param documentCount number of documents in the batch
     * @param successCount number of documents that succeeded
     */
    public void recordBulkOperation(String indexName, String operationType, int documentCount, int successCount) {
        totalBulkOperations.incrementAndGet();
        totalDocumentsProcessed.addAndGet(documentCount);
        totalSuccessfulOperations.addAndGet(successCount);
        totalFailedOperations.addAndGet(documentCount - successCount);

        logger.debug("Recorded bulk operation: index={}, type={}, documents={}, succeeded={}",
                indexName, operationType, documentCount, successCount);
    }

    /**
     * Returns the current monitoring metrics.
     * @return monitoring metrics
     */
    public ESBulkMetrics getMetrics() {
        // Reset the counters if the window has elapsed
        if (System.currentTimeMillis() - lastResetTime > resetInterval) {
            resetMetrics();
        }

        ESBulkMetrics metrics = new ESBulkMetrics();
        metrics.setTotalBulkOperations(totalBulkOperations.get());
        metrics.setTotalSuccessfulOperations(totalSuccessfulOperations.get());
        metrics.setTotalFailedOperations(totalFailedOperations.get());
        metrics.setTotalDocumentsProcessed(totalDocumentsProcessed.get());
        metrics.setTimestamp(System.currentTimeMillis());

        return metrics;
    }

    /**
     * Resets all counters.
     */
    private void resetMetrics() {
        totalBulkOperations.set(0);
        totalSuccessfulOperations.set(0);
        totalFailedOperations.set(0);
        totalDocumentsProcessed.set(0);
        lastResetTime = System.currentTimeMillis();

        logger.info("ES bulk monitoring metrics reset");
    }

    /**
     * Periodically logs the state of ES bulk operations.
     */
    @Scheduled(fixedRate = 30000) // every 30 seconds
    public void monitorESBulkStatus() {
        try {
            ESBulkMetrics metrics = getMetrics();

            logger.info("ES bulk monitor: bulk requests={}, succeeded={}, failed={}, documents={}, success rate={}%, avg documents per batch={}",
                    metrics.getTotalBulkOperations(), metrics.getTotalSuccessfulOperations(),
                    metrics.getTotalFailedOperations(), metrics.getTotalDocumentsProcessed(),
                    String.format("%.2f", metrics.getSuccessRate()),
                    String.format("%.2f", metrics.getAverageDocumentsPerBatch()));

            // Check for anomalies. The success rate is 0 when nothing was processed,
            // so only warn when the window actually contains documents.
            if (metrics.getTotalDocumentsProcessed() > 0 && metrics.getSuccessRate() < 90) {
                logger.warn("ES bulk success rate too low: {}%", String.format("%.2f", metrics.getSuccessRate()));
            }

            if (metrics.getAverageDocumentsPerBatch() > 2000) {
                logger.warn("ES bulk average batch size too large: {}", String.format("%.2f", metrics.getAverageDocumentsPerBatch()));
            }

        } catch (Exception e) {
            logger.error("ES bulk status monitoring failed", e);
        }
    }
}
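
Note that `getMetrics()` resets the counters before reading them once the window has elapsed, so the snapshot taken at that moment is empty and the totals of the finished window are never returned. One possible alternative is to read and clear each counter in a single step with `AtomicLong.getAndSet(0)`. The sketch below shows the idea with two counters; the class `WindowCounterSketch` and its methods are hypothetical, and the two counters are still drained one after the other rather than as one atomic unit.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: windowed counters that are read and reset in one step. */
class WindowCounterSketch {

    private final AtomicLong documents = new AtomicLong();
    private final AtomicLong successes = new AtomicLong();

    /** Adds one batch to the current window. */
    void record(int documentCount, int successCount) {
        documents.addAndGet(documentCount);
        successes.addAndGet(successCount);
    }

    /** Returns {documents, successes} for the finished window and starts a new one. */
    long[] drain() {
        return new long[] { documents.getAndSet(0), successes.getAndSet(0) };
    }
}
```

A scheduled task could call `drain()` once per window and log or export the returned totals, leaving the counters at zero for the next window.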

3.2 Bulk Operation Metrics Class

import lombok.Data;

/**
 * Metrics for ES bulk operations.
 * @author 运维实战
 */
@Data
public class ESBulkMetrics {

    private long totalBulkOperations;
    private long totalSuccessfulOperations;
    private long totalFailedOperations;
    private long totalDocumentsProcessed;
    private long timestamp;

    public ESBulkMetrics() {
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Returns the success rate.
     * @return success rate as a percentage (0 when nothing was processed)
     */
    public double getSuccessRate() {
        if (totalDocumentsProcessed == 0) return 0.0;
        return (double) totalSuccessfulOperations / totalDocumentsProcessed * 100;
    }

    /**
     * Returns the failure rate.
     * @return failure rate as a percentage (0 when nothing was processed)
     */
    public double getFailureRate() {
        if (totalDocumentsProcessed == 0) return 0.0;
        return (double) totalFailedOperations / totalDocumentsProcessed * 100;
    }

    /**
     * Returns the average number of documents per batch.
     * @return average documents per batch
     */
    public double getAverageDocumentsPerBatch() {
        if (totalBulkOperations == 0) return 0.0;
        return (double) totalDocumentsProcessed / totalBulkOperations;
    }

    /**
     * Whether the bulk pipeline looks healthy.
     * @return true if the success rate is at least 90% and batches are not oversized
     */
    public boolean isHealthy() {
        // An idle window (nothing processed) is not a failure
        if (totalDocumentsProcessed == 0) return true;
        return getSuccessRate() >= 90 && getAverageDocumentsPerBatch() <= 2000;
    }
}

3.3 Bulk Optimization Service

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Optimization service for ES bulk operations.
 * @author 运维实战
 */
@Service
public class ESBulkOptimizeService {

    @Autowired
    private ESBulkProperties properties;

    private static final Logger logger = LoggerFactory.getLogger(ESBulkOptimizeService.class);

    /**
     * Returns the bulk strategy for an operation.
     * @param operationType operation type
     * @param operationCount number of operations
     * @return bulk strategy
     */
    public ESBulkStrategy getStrategy(String operationType, int operationCount) {
        logger.info("Selecting ES bulk strategy, type: {}, operations: {}", operationType, operationCount);

        ESBulkStrategy strategy = new ESBulkStrategy();
        strategy.setOperationCount(operationCount);
        strategy.setTimestamp(System.currentTimeMillis());

        // Pick a strategy by operation count; the per-operation durations
        // below are rough estimates in milliseconds
        if (operationCount <= 100) {
            strategy.setStrategy("SMALL_BATCH");
            strategy.setDescription("Small batch, limited performance gain");
            strategy.setRecommendedBatchSize(operationCount);
            strategy.setEstimatedDuration(operationCount * 50L);
        } else if (operationCount <= 500) {
            strategy.setStrategy("MEDIUM_BATCH");
            strategy.setDescription("Medium batch, noticeable performance gain");
            strategy.setRecommendedBatchSize(Math.min(500, operationCount));
            strategy.setEstimatedDuration(operationCount * 30L);
        } else if (operationCount <= 1000) {
            strategy.setStrategy("LARGE_BATCH");
            strategy.setDescription("Large batch, significant performance gain");
            strategy.setRecommendedBatchSize(Math.min(1000, operationCount));
            strategy.setEstimatedDuration(operationCount * 20L);
        } else {
            strategy.setStrategy("HUGE_BATCH");
            strategy.setDescription("Very large batch, largest performance gain");
            strategy.setRecommendedBatchSize(Math.min(properties.getMaxBatchSize(), operationCount));
            strategy.setEstimatedDuration(operationCount * 15L);
        }

        logger.info("ES bulk strategy selected: strategy={}, operations={}, recommended batch size={}, improvement rate={}%",
                strategy.getStrategy(), operationCount, strategy.getRecommendedBatchSize(),
                String.format("%.2f", strategy.getPerformanceImprovementRate()));

        return strategy;
    }

    /**
     * Calculates the recommended batch size.
     * @param totalDocumentCount total number of documents
     * @return recommended batch size
     */
    public int calculateOptimalBatchSize(int totalDocumentCount) {
        if (totalDocumentCount <= 100) {
            return totalDocumentCount;
        } else if (totalDocumentCount <= 1000) {
            return Math.min(500, totalDocumentCount);
        } else if (totalDocumentCount <= 10000) {
            return Math.min(1000, totalDocumentCount);
        } else {
            return Math.min(properties.getMaxBatchSize(), totalDocumentCount);
        }
    }

    /**
     * Estimates the processing time.
     * @param documentCount number of documents
     * @param batchSize batch size (must be positive)
     * @return estimated processing time in milliseconds
     */
    public long calculateEstimatedProcessingTime(int documentCount, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batchCount = (int) Math.ceil((double) documentCount / batchSize);
        return batchCount * 1000L; // assumes roughly 1 second per batch
    }

    /**
     * Returns optimization advice for a bulk operation.
     * @param operationType operation type
     * @param documentCount number of documents
     * @return optimization advice
     */
    public ESBulkOptimizationAdvice getOptimizationAdvice(String operationType, int documentCount) {
        ESBulkOptimizationAdvice advice = new ESBulkOptimizationAdvice();
        advice.setOperationType(operationType);
        advice.setDocumentCount(documentCount);
        advice.setTimestamp(System.currentTimeMillis());

        if (documentCount <= 100) {
            advice.setAdvice("SMALL_BATCH");
            advice.setDescription("Small batch, execute directly");
            advice.setRecommendedBatchSize(documentCount);
        } else if (documentCount <= 1000) {
            advice.setAdvice("MEDIUM_BATCH");
            advice.setDescription("Medium batch, splitting into batches is recommended");
            advice.setRecommendedBatchSize(500);
        } else if (documentCount <= 10000) {
            advice.setAdvice("LARGE_BATCH");
            advice.setDescription("Large batch, splitting into batches is strongly recommended");
            advice.setRecommendedBatchSize(1000);
        } else {
            advice.setAdvice("HUGE_BATCH");
            advice.setDescription("Very large batch, must be split into batches");
            advice.setRecommendedBatchSize(properties.getMaxBatchSize());
        }

        return advice;
    }
}
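
The estimates above all rest on the same arithmetic: the number of batches is the document count divided by the batch size, rounded up, and the time estimate multiplies that by an assumed cost per batch. A worked sketch of that calculation in integer arithmetic follows. The class `BatchMathSketch` and its method names are illustrative, and the one-second cost per batch is the article's assumption rather than a measured value.

```java
/** Illustrative only: batch count and time estimate using integer ceiling division. */
class BatchMathSketch {

    /** Returns ceil(documentCount / batchSize) without going through double. */
    static int batchCount(int documentCount, int batchSize) {
        if (documentCount < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("documentCount must be >= 0 and batchSize > 0");
        }
        // Widen to long so documentCount + batchSize - 1 cannot overflow
        return (int) (((long) documentCount + batchSize - 1) / batchSize);
    }

    /** Returns the estimated time for all batches at millisPerBatch each. */
    static long estimatedMillis(int documentCount, int batchSize, long millisPerBatch) {
        return batchCount(documentCount, batchSize) * millisPerBatch;
    }
}
```

For 2,500 documents in batches of 1,000, `batchCount` gives 3, and at 1,000 ms per batch `estimatedMillis` gives 3,000 ms.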

3.4 Bulk Optimization Advice Class

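import lombok.Data;

/**
 * Optimization advice for an ES bulk operation.
 * @author 运维实战
 */
@Data
public class ESBulkOptimizationAdvice {

    private String operationType;
    private int documentCount;
    private String advice;
    private String description;
    private int recommendedBatchSize;
    private long timestamp;

    public ESBulkOptimizationAdvice() {
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Returns the estimated number of batches.
     * @return estimated batch count (0 when no batch size is set)
     */
    public int getEstimatedBatchCount() {
        if (recommendedBatchSize <= 0) return 0; // avoid division by zero
        return (int) Math.ceil((double) documentCount / recommendedBatchSize);
    }

    /**
     * Returns the estimated processing time.
     * @return estimated processing time in milliseconds
     */
    public long getEstimatedProcessingTime() {
        return getEstimatedBatchCount() * 1000L; // assumes roughly 1 second per batch
    }

    /**
     * Returns the performance improvement rate: the share of requests saved
     * compared with sending one request per document.
     * @return improvement rate as a percentage
     */
    public double getPerformanceImprovementRate() {
        if (documentCount == 0) return 0.0;
        return (double) (documentCount - getEstimatedBatchCount()) / documentCount * 100;
    }
}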

4. Bulk Controller

4.1 Bulk REST Controller

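import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

/**
 * REST controller for ES bulk operations.
 * @author 运维实战
 */
@RestController
@RequestMapping("/api/elasticsearch/bulk")
public class ESBulkController {

    @Autowired
    private ESBulkService esBulkService;

    @Autowired
    private ESBulkMonitorService esBulkMonitorService;

    @Autowired
    private ESBulkOptimizeService esBulkOptimizeService;

    private static final Logger logger = LoggerFactory.getLogger(ESBulkController.class);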

/**
* 批量索引操作
* @param request 批量索引请求
* @return 批量操作结果
*/
@PostMapping("/index")
public ResponseEntity<ESBulkResult> bulkIndex(@RequestBody ESBulkIndexRequest request) {
try {
logger.info("接收到批量索引请求,索引: {}, 文档数量: {}", request.getIndexName(), request.getDocuments().size());

ESBulkResult result = esBulkService.bulkIndex(request.getIndexName(), request.getDocuments());

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("批量索引操作失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 批量更新操作
* @param request 批量更新请求
* @return 批量操作结果
*/
@PostMapping("/update")
public ResponseEntity<ESBulkResult> bulkUpdate(@RequestBody ESBulkUpdateRequest request) {
try {
logger.info("接收到批量更新请求,索引: {}, 文档数量: {}", request.getIndexName(), request.getDocuments().size());

ESBulkResult result = esBulkService.bulkUpdate(request.getIndexName(), request.getDocuments());

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("批量更新操作失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

/**
* 批量删除操作
* @param request 批量删除请求
* @return 批量操作结果
*/
@PostMapping("/delete")
public ResponseEntity<ESBulkResult> bulkDelete(@RequestBody ESBulkDeleteRequest request) {
try {
logger.info("接收到批量删除请求,索引: {}, 文档ID数量: {}", request.getIndexName(), request.getDocumentIds().size());

ESBulkResult result = esBulkService.bulkDelete(request.getIndexName(), request.getDocumentIds());

return ResponseEntity.ok(result);

} catch (Exception e) {
logger.error("批量删除操作失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}

    /**
     * Bulk create operation.
     * @param request bulk create request
     * @return bulk operation result
     */
    @PostMapping("/create")
    public ResponseEntity<ESBulkResult> bulkCreate(@RequestBody ESBulkCreateRequest request) {
        try {
            logger.info("Received bulk create request, index: {}, document count: {}", request.getIndexName(), request.getDocuments().size());

            ESBulkResult result = esBulkService.bulkCreate(request.getIndexName(), request.getDocuments());

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Bulk create operation failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Mixed bulk operation (index, create, update, and delete in one request).
     * @param request mixed bulk request
     * @return bulk operation result
     */
    @PostMapping("/mixed")
    public ResponseEntity<ESBulkResult> bulkMixed(@RequestBody ESBulkMixedRequest request) {
        try {
            logger.info("Received mixed bulk request, index: {}, operation count: {}", request.getIndexName(), request.getOperations().size());

            ESBulkResult result = esBulkService.bulkMixed(request.getIndexName(), request.getOperations());

            return ResponseEntity.ok(result);

        } catch (Exception e) {
            logger.error("Mixed bulk operation failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get the bulk operation strategy.
     * @param operationType operation type
     * @param operationCount number of operations
     * @return bulk operation strategy
     */
    @GetMapping("/strategy/{operationType}/{operationCount}")
    public ResponseEntity<ESBulkStrategy> getBulkStrategy(
            @PathVariable String operationType, @PathVariable int operationCount) {
        try {
            ESBulkStrategy strategy = esBulkOptimizeService.getStrategy(operationType, operationCount);
            return ResponseEntity.ok(strategy);
        } catch (Exception e) {
            logger.error("Failed to get bulk operation strategy", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get optimization advice for a bulk operation.
     * @param operationType operation type
     * @param documentCount number of documents
     * @return optimization advice
     */
    @GetMapping("/advice/{operationType}/{documentCount}")
    public ResponseEntity<ESBulkOptimizationAdvice> getBulkOptimizationAdvice(
            @PathVariable String operationType, @PathVariable int documentCount) {
        try {
            ESBulkOptimizationAdvice advice = esBulkOptimizeService.getOptimizationAdvice(operationType, documentCount);
            return ResponseEntity.ok(advice);
        } catch (Exception e) {
            logger.error("Failed to get bulk optimization advice", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get bulk operation monitoring metrics.
     * @return monitoring metrics
     */
    @GetMapping("/metrics")
    public ResponseEntity<ESBulkMetrics> getBulkMetrics() {
        try {
            ESBulkMetrics metrics = esBulkMonitorService.getMetrics();
            return ResponseEntity.ok(metrics);
        } catch (Exception e) {
            logger.error("Failed to get bulk monitoring metrics", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}
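
The handlers above call `request.getDocuments().size()` directly, so a request body without a `documents` field ends in a `NullPointerException` and an HTTP 500. A minimal sketch of a pre-check that a handler could run first is shown below. `BulkRequestValidator`, its `validate` method, and the size limit parameter are hypothetical names used only for illustration; they are not part of the classes in this article.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of the article's code base.
final class BulkRequestValidator {

    private BulkRequestValidator() {}

    /**
     * Returns an error message when the request cannot be processed,
     * or Optional.empty() when it looks valid.
     */
    static Optional<String> validate(String indexName, List<?> documents, int maxBatchSize) {
        if (indexName == null || indexName.trim().isEmpty()) {
            return Optional.of("indexName must not be empty");
        }
        if (documents == null || documents.isEmpty()) {
            return Optional.of("documents must not be empty");
        }
        if (documents.size() > maxBatchSize) {
            return Optional.of("too many documents: " + documents.size() + " > " + maxBatchSize);
        }
        return Optional.empty();
    }
}
```

In a handler, a non-empty result could be mapped to `ResponseEntity.badRequest().build()` before `esBulkService` is called, which keeps client errors apart from genuine server failures.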

4.2 Request Class Definitions

/**
 * ES bulk index request.
 * @author 运维实战
 */
@Data
public class ESBulkIndexRequest {

    private String indexName;
    private List<Map<String, Object>> documents;

    public ESBulkIndexRequest() {}

    public ESBulkIndexRequest(String indexName, List<Map<String, Object>> documents) {
        this.indexName = indexName;
        this.documents = documents;
    }
}

/**
 * ES bulk update request.
 * @author 运维实战
 */
@Data
public class ESBulkUpdateRequest {

    private String indexName;
    private List<Map<String, Object>> documents;

    public ESBulkUpdateRequest() {}

    public ESBulkUpdateRequest(String indexName, List<Map<String, Object>> documents) {
        this.indexName = indexName;
        this.documents = documents;
    }
}

/**
 * ES bulk delete request.
 * @author 运维实战
 */
@Data
public class ESBulkDeleteRequest {

    private String indexName;
    private List<String> documentIds;

    public ESBulkDeleteRequest() {}

    public ESBulkDeleteRequest(String indexName, List<String> documentIds) {
        this.indexName = indexName;
        this.documentIds = documentIds;
    }
}

/**
 * ES bulk create request.
 * @author 运维实战
 */
@Data
public class ESBulkCreateRequest {

    private String indexName;
    private List<Map<String, Object>> documents;

    public ESBulkCreateRequest() {}

    public ESBulkCreateRequest(String indexName, List<Map<String, Object>> documents) {
        this.indexName = indexName;
        this.documents = documents;
    }
}

/**
 * ES mixed bulk request.
 * @author 运维实战
 */
@Data
public class ESBulkMixedRequest {

    private String indexName;
    private List<ESBulkOperation> operations;

    public ESBulkMixedRequest() {}

    public ESBulkMixedRequest(String indexName, List<ESBulkOperation> operations) {
        this.indexName = indexName;
        this.operations = operations;
    }
}
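
For orientation: the request classes above carry an index name plus documents or IDs, while the Elasticsearch `_bulk` REST endpoint itself expects newline-delimited JSON, in which each action line is followed by a source line (except for `delete`) and the body ends with a newline. The sketch below assembles such a body from already-serialized JSON strings. `BulkBodySketch` and its methods are hypothetical, and the code assumes that index names and IDs contain no characters that need JSON escaping; a real implementation would more likely go through a JSON library or the official client.

```java
// Hypothetical helper that builds an NDJSON body for the _bulk endpoint.
final class BulkBodySketch {

    private BulkBodySketch() {}

    /** Action line plus document source; overwrites an existing document with the same ID. */
    static String indexAction(String index, String id, String sourceJson) {
        return actionLine("index", index, id) + sourceJson + "\n";
    }

    /** Action line plus a partial document wrapped in "doc". */
    static String updateAction(String index, String id, String partialDocJson) {
        return actionLine("update", index, id) + "{\"doc\":" + partialDocJson + "}\n";
    }

    /** Delete has an action line only, with no source line. */
    static String deleteAction(String index, String id) {
        return actionLine("delete", index, id);
    }

    private static String actionLine(String action, String index, String id) {
        return "{\"" + action + "\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n";
    }

    public static void main(String[] args) {
        String body = indexAction("user_data", "1", "{\"name\":\"alice\"}")
                + deleteAction("user_data", "2");
        System.out.print(body);
    }
}
```

A `create` action has the same shape as `index`; the difference is that it fails for an ID that already exists instead of overwriting the document.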

5. ES Bulk Annotation and AOP

5.1 ES Bulk Annotation

/**
 * ES Bulk annotation.
 * @author 运维实战
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ESBulk {

    /**
     * Index name.
     */
    String indexName() default "";

    /**
     * Operation type: INDEX, CREATE, UPDATE, or DELETE.
     */
    String operationType() default "INDEX";

    /**
     * Maximum batch size.
     */
    int maxBatchSize() default 1000;

    /**
     * Whether bulk operations are enabled.
     */
    boolean enableBulk() default true;

    /**
     * Whether concurrent bulk operations are enabled.
     */
    boolean enableConcurrentBulk() default true;

    /**
     * Number of threads for concurrent bulk operations.
     */
    int concurrentBulkThreads() default 5;

    /**
     * Bulk operation timeout in milliseconds.
     */
    long bulkTimeoutMs() default 30000;

    /**
     * Message returned when the operation fails.
     */
    String message() default "ES bulk operation failed, please try again later";

    /**
     * HTTP status code returned when the operation fails.
     */
    int statusCode() default 500;
}
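
The annotation attributes are metadata only; something has to read them at runtime. In this article that is the aspect in section 5.2, which receives the annotation instance from Spring AOP. Independently of Spring, the sketch below shows the same lookup with plain reflection. The nested `ESBulk` here is a trimmed copy with three attributes so the example stays self-contained, and `Demo` and `describe` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class ESBulkReflectionSketch {

    // Trimmed copy of the article's annotation (assumption: only three attributes kept).
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME) // RUNTIME retention is what makes reflection possible
    @interface ESBulk {
        String indexName() default "";
        String operationType() default "INDEX";
        int maxBatchSize() default 1000;
    }

    // Hypothetical class with one annotated and one plain method.
    static class Demo {
        @ESBulk(indexName = "user_data", maxBatchSize = 500)
        public void save() {}

        public void plain() {}
    }

    /** Returns "index/type/batchSize" for an annotated no-arg method, else "not annotated". */
    static String describe(Class<?> type, String methodName) {
        try {
            Method method = type.getDeclaredMethod(methodName);
            ESBulk bulk = method.getAnnotation(ESBulk.class);
            if (bulk == null) {
                return "not annotated";
            }
            // operationType was not set on save(), so its default "INDEX" applies
            return bulk.indexName() + "/" + bulk.operationType() + "/" + bulk.maxBatchSize();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Demo.class, "save"));
        System.out.println(describe(Demo.class, "plain"));
    }
}
```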

5.2 ES Bulk AOP Aspect

/**
 * ES Bulk AOP aspect.
 * @author 运维实战
 */
@Aspect
@Component
public class ESBulkAspect {

    @Autowired
    private ESBulkService esBulkService;

    @Autowired
    private ESBulkMonitorService esBulkMonitorService;

    @Autowired
    private ESBulkOptimizeService esBulkOptimizeService;

    private static final Logger logger = LoggerFactory.getLogger(ESBulkAspect.class);

    /**
     * Pointcut for methods annotated with @ESBulk; binds the annotation instance.
     */
    @Pointcut("@annotation(esBulk)")
    public void esBulkPointcut(ESBulk esBulk) {}

    /**
     * Around advice for ES bulk operations.
     * @param joinPoint join point
     * @param esBulk ES bulk annotation
     * @return result of the intercepted method
     * @throws Throwable on failure
     */
    @Around("esBulkPointcut(esBulk)")
    public Object around(ProceedingJoinPoint joinPoint, ESBulk esBulk) throws Throwable {
        String methodName = joinPoint.getSignature().getName();

        try {
            // Get the method arguments
            Object[] args = joinPoint.getArgs();

            // Look for the document list among the arguments
            List<Map<String, Object>> documents = findDocuments(args);
            String indexName = esBulk.indexName();
            String operationType = esBulk.operationType();

            if (documents != null && !documents.isEmpty()) {
                // Get the bulk operation strategy
                ESBulkStrategy strategy = esBulkOptimizeService.getStrategy(operationType, documents.size());

                logger.info("ES bulk operation started: method={}, indexName={}, operationType={}, strategy={}",
                        methodName, indexName, operationType, strategy.getStrategy());

                // Record bulk operation metrics
                esBulkMonitorService.recordBulkOperation(indexName, operationType, documents.size(), documents.size());
            }

            // Invoke the original method
            return joinPoint.proceed();

        } catch (Exception e) {
            logger.error("ES bulk operation failed: method={}", methodName, e);
            // Keep the original exception as the cause so the stack trace is not lost
            throw new ESBulkException(esBulk.message(), e, esBulk.statusCode());
        }
    }

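    /**
     * Find the document list among the method arguments.
     * @param args method arguments
     * @return document list, or null if none is found
     */
    private List<Map<String, Object>> findDocuments(Object[] args) {
        for (Object arg : args) {
            // Accept only lists whose first element is a Map, so that a
            // List<String> of document IDs is not mistaken for documents
            if (arg instanceof List && !((List<?>) arg).isEmpty()
                    && ((List<?>) arg).get(0) instanceof Map) {
                @SuppressWarnings("unchecked")
                List<Map<String, Object>> list = (List<Map<String, Object>>) arg;
                return list;
            }
        }
        return null;
    }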
}
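
The aspect reads `indexName` and `operationType` from the annotation but does not act on `maxBatchSize`. One possible way to honor that attribute is to split the document list into sub-lists before sending each one as its own bulk request. The sketch below shows only the splitting step; `BatchPartitioner` is a hypothetical helper and not part of the article's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits a list into consecutive batches.
final class BatchPartitioner {

    private BatchPartitioner() {}

    /**
     * Splits items into consecutive batches of at most maxBatchSize elements.
     * The batches are views on the original list (List.subList), so no elements are copied.
     */
    static <T> List<List<T>> partition(List<T> items, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxBatchSize) {
            int end = Math.min(start + maxBatchSize, items.size());
            batches.add(items.subList(start, end));
        }
        return batches;
    }
}
```

With 2,500 documents and `maxBatchSize = 1000`, this yields three batches of 1,000, 1,000, and 500 documents.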

5.3 ES Bulk Exception Class

/**
 * ES Bulk exception.
 * @author 运维实战
 */
public class ESBulkException extends RuntimeException {

    private final int statusCode;

    public ESBulkException(String message) {
        super(message);
        this.statusCode = 500;
    }

    public ESBulkException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }

    public ESBulkException(String message, Throwable cause) {
        super(message, cause);
        this.statusCode = 500;
    }

    public ESBulkException(String message, Throwable cause, int statusCode) {
        super(message, cause);
        this.statusCode = statusCode;
    }

    public int getStatusCode() {
        return statusCode;
    }
}

5.4 ES Bulk Exception Handler

/**
 * ES Bulk exception handler.
 * @author 运维实战
 */
@ControllerAdvice
public class ESBulkExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(ESBulkExceptionHandler.class);

    /**
     * Handle ES bulk operation exceptions.
     * @param e the exception
     * @return error response
     */
    @ExceptionHandler(ESBulkException.class)
    public ResponseEntity<Map<String, Object>> handleESBulkException(ESBulkException e) {
        logger.warn("ES bulk operation exception: {}", e.getMessage());

        Map<String, Object> response = new HashMap<>();
        response.put("error", "ES_BULK_OPERATION_FAILED");
        response.put("message", e.getMessage());
        response.put("timestamp", System.currentTimeMillis());

        return ResponseEntity.status(e.getStatusCode()).body(response);
    }
}

6. Practical Examples

6.1 A Service Using the ES Bulk Annotation

/**
 * Service that uses the ES Bulk annotation.
 * @author 运维实战
 */
@Service
public class ESBulkExampleService {

    private static final Logger logger = LoggerFactory.getLogger(ESBulkExampleService.class);

    /**
     * Basic bulk index example.
     * @param documents document list
     * @return processing result
     */
    @ESBulk(indexName = "user_data", operationType = "INDEX", maxBatchSize = 500, enableBulk = true,
            message = "Basic bulk index: operation failed")
    public String basicBulkIndex(List<Map<String, Object>> documents) {
        logger.info("Running basic bulk index example, document count: {}", documents.size());

        // Simulate an ES bulk index operation
        try {
            Thread.sleep(documents.size() * 10L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return "Basic bulk index completed, document count: " + documents.size();
    }

    /**
     * Large bulk index example.
     * @param documents document list
     * @return processing result
     */
    @ESBulk(indexName = "order_data", operationType = "INDEX", maxBatchSize = 2000, enableBulk = true,
            message = "Large bulk index: operation failed")
    public String largeBulkIndex(List<Map<String, Object>> documents) {
        logger.info("Running large bulk index example, document count: {}", documents.size());

        // Simulate a large ES bulk index operation
        try {
            Thread.sleep(documents.size() * 5L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return "Large bulk index completed, document count: " + documents.size();
    }

    /**
     * Concurrent bulk index example.
     * @param documents document list
     * @return processing result
     */
    @ESBulk(indexName = "product_data", operationType = "INDEX", maxBatchSize = 1000, enableConcurrentBulk = true,
            concurrentBulkThreads = 10, message = "Concurrent bulk index: operation failed")
    public String concurrentBulkIndex(List<Map<String, Object>> documents) {
        logger.info("Running concurrent bulk index example, document count: {}", documents.size());

        // Simulate a concurrent ES bulk index operation
        try {
            Thread.sleep(documents.size() * 3L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        return "Concurrent bulk index completed, document count: " + documents.size();
    }
}
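
The example methods above only simulate work with `Thread.sleep`; `enableConcurrentBulk` and `concurrentBulkThreads` express intent but nothing in the example runs in parallel. As a rough sketch of what concurrent submission could look like, the code below sends pre-split batches through a fixed thread pool and sums the results. `ConcurrentBulkSketch`, `submitAll`, and the `sender` callback (a stand-in for whatever sends one batch to ES and returns the number of accepted documents) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToIntFunction;

// Hypothetical helper, not part of the article's code base.
final class ConcurrentBulkSketch {

    private ConcurrentBulkSketch() {}

    /**
     * Sends every batch on a pool of `threads` workers and returns the
     * total number of documents the sender reported as accepted.
     */
    static <T> int submitAll(List<List<T>> batches, int threads, ToIntFunction<List<T>> sender) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<T> batch : batches) {
                Callable<Integer> task = () -> sender.applyAsInt(batch);
                futures.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get(); // blocks until this batch has finished
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for bulk batches", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A bulk batch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Creating a pool per call keeps the sketch short; a long-running service would more likely share one executor and size it against the cluster's write capacity rather than the client's CPU count.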

6.2 ES Bulk Test Controller

/**
 * ES Bulk test controller.
 * @author 运维实战
 */
@RestController
@RequestMapping("/api/elasticsearch/bulk/test")
public class ESBulkTestController {

    @Autowired
    private ESBulkExampleService exampleService;

    @Autowired
    private ESBulkService esBulkService;

    @Autowired
    private ESBulkMonitorService esBulkMonitorService;

    @Autowired
    private ESBulkOptimizeService esBulkOptimizeService;

    private static final Logger logger = LoggerFactory.getLogger(ESBulkTestController.class);

    /**
     * Basic bulk index test.
     * @param documentCount number of documents
     * @return test result
     */
    @GetMapping("/basic-index")
    public ResponseEntity<Map<String, String>> testBasicBulkIndex(@RequestParam int documentCount) {
        try {
            // Generate test documents
            List<Map<String, Object>> documents = generateTestDocuments(documentCount);

            String result = exampleService.basicBulkIndex(documents);

            Map<String, String> response = new HashMap<>();
            response.put("status", "SUCCESS");
            response.put("result", result);
            response.put("timestamp", String.valueOf(System.currentTimeMillis()));

            return ResponseEntity.ok(response);

        } catch (ESBulkException e) {
            logger.warn("Basic bulk index test failed: {}", e.getMessage());
            return ResponseEntity.status(e.getStatusCode()).build();
        } catch (Exception e) {
            logger.error("Basic bulk index test failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Large bulk index test.
     * @param documentCount number of documents
     * @return test result
     */
    @GetMapping("/large-index")
    public ResponseEntity<Map<String, String>> testLargeBulkIndex(@RequestParam int documentCount) {
        try {
            // Generate test documents
            List<Map<String, Object>> documents = generateTestDocuments(documentCount);

            String result = exampleService.largeBulkIndex(documents);

            Map<String, String> response = new HashMap<>();
            response.put("status", "SUCCESS");
            response.put("result", result);
            response.put("timestamp", String.valueOf(System.currentTimeMillis()));

            return ResponseEntity.ok(response);

        } catch (ESBulkException e) {
            logger.warn("Large bulk index test failed: {}", e.getMessage());
            return ResponseEntity.status(e.getStatusCode()).build();
        } catch (Exception e) {
            logger.error("Large bulk index test failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Concurrent bulk index test.
     * @param documentCount number of documents
     * @return test result
     */
    @GetMapping("/concurrent-index")
    public ResponseEntity<Map<String, String>> testConcurrentBulkIndex(@RequestParam int documentCount) {
        try {
            // Generate test documents
            List<Map<String, Object>> documents = generateTestDocuments(documentCount);

            String result = exampleService.concurrentBulkIndex(documents);

            Map<String, String> response = new HashMap<>();
            response.put("status", "SUCCESS");
            response.put("result", result);
            response.put("timestamp", String.valueOf(System.currentTimeMillis()));

            return ResponseEntity.ok(response);

        } catch (ESBulkException e) {
            logger.warn("Concurrent bulk index test failed: {}", e.getMessage());
            return ResponseEntity.status(e.getStatusCode()).build();
        } catch (Exception e) {
            logger.error("Concurrent bulk index test failed", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get the bulk operation strategy.
     * @param operationType operation type
     * @param operationCount number of operations
     * @return bulk operation strategy
     */
    @GetMapping("/strategy/{operationType}/{operationCount}")
    public ResponseEntity<ESBulkStrategy> getBulkStrategy(
            @PathVariable String operationType, @PathVariable int operationCount) {
        try {
            ESBulkStrategy strategy = esBulkOptimizeService.getStrategy(operationType, operationCount);
            return ResponseEntity.ok(strategy);
        } catch (Exception e) {
            logger.error("Failed to get bulk operation strategy", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Get bulk operation monitoring metrics.
     * @return monitoring metrics
     */
    @GetMapping("/metrics")
    public ResponseEntity<ESBulkMetrics> getBulkMetrics() {
        try {
            ESBulkMetrics metrics = esBulkMonitorService.getMetrics();
            return ResponseEntity.ok(metrics);
        } catch (Exception e) {
            logger.error("Failed to get bulk monitoring metrics", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Generate test documents.
     * @param count number of documents
     * @return test documents
     */
    private List<Map<String, Object>> generateTestDocuments(int count) {
        List<Map<String, Object>> documents = new ArrayList<>();

        for (int i = 0; i < count; i++) {
            Map<String, Object> document = new HashMap<>();
            document.put("id", "test_id_" + i);
            document.put("name", "test_name_" + i);
            document.put("email", "test_email_" + i + "@example.com");
            document.put("age", 20 + (i % 50));
            document.put("created_at", new Timestamp(System.currentTimeMillis()));
            documents.add(document);
        }

        return documents;
    }
}

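7. Summary

7.1 ES Bulk Best Practices

  1. Set a reasonable batch size: choose a batch size that fits the data volume and the capacity of the system
  2. Choose the right operation type: pick the Bulk operation type (INDEX, CREATE, UPDATE, DELETE) that matches the business requirement
  3. Monitor bulk operations: track the performance and success rate of bulk operations in real time
  4. Adjust parameters dynamically: tune bulk parameters based on monitoring data
  5. Exception handling: implement thorough exception handling with user-friendly error messages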
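
As an illustration of the monitoring point, the sketch below keeps the two figures mentioned there, success rate and latency, in thread-safe counters. It is independent of the article's `ESBulkMonitorService`; `BulkMetricsSketch` and its method names are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, minimal metrics holder; LongAdder keeps updates cheap under contention.
final class BulkMetricsSketch {

    private final LongAdder attemptedDocs = new LongAdder();
    private final LongAdder succeededDocs = new LongAdder();
    private final LongAdder totalLatencyMs = new LongAdder();
    private final LongAdder requests = new LongAdder();

    /** Records the outcome of one bulk request. */
    void record(int attempted, int succeeded, long latencyMs) {
        attemptedDocs.add(attempted);
        succeededDocs.add(succeeded);
        totalLatencyMs.add(latencyMs);
        requests.increment();
    }

    /** Fraction of documents that succeeded; 1.0 when nothing has been recorded yet. */
    double successRate() {
        long attempted = attemptedDocs.sum();
        return attempted == 0 ? 1.0 : (double) succeededDocs.sum() / attempted;
    }

    /** Mean latency per bulk request in milliseconds; 0.0 when nothing has been recorded yet. */
    double averageLatencyMs() {
        long count = requests.sum();
        return count == 0 ? 0.0 : (double) totalLatencyMs.sum() / count;
    }
}
```

Counting documents rather than requests matters because a bulk request can succeed as a whole while individual items in it fail.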

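7.2 Performance Optimization Recommendations

  • Batch size tuning: adjust the batch size according to data volume
  • Concurrent processing: use concurrent bulk operations judiciously to raise throughput
  • Monitoring and alerting: put thorough monitoring and alerting in place
  • Cache optimization: use caching where appropriate to reduce load on ES
  • Asynchronous processing: use asynchronous processing to improve response times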
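
For the asynchronous point, one common JDK building block is `CompletableFuture`. The sketch below hands a batch to an executor and returns immediately, so the caller can respond before the bulk request finishes. `AsyncBulkSketch`, `sendAsync`, and the `sender` callback are hypothetical, and mapping a failed batch to zero accepted documents is a simplification chosen for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.ToIntFunction;

// Hypothetical helper, not part of the article's code base.
final class AsyncBulkSketch {

    private AsyncBulkSketch() {}

    /**
     * Runs sender on the given executor and completes with the number of
     * accepted documents, or with 0 if the sender throws.
     */
    static <T> CompletableFuture<Integer> sendAsync(
            List<T> batch, ToIntFunction<List<T>> sender, Executor executor) {
        return CompletableFuture
                .<Integer>supplyAsync(() -> sender.applyAsInt(batch), executor)
                // Simplification: a real service would log the error or retry here
                .exceptionally(error -> 0);
    }
}
```

Passing an explicit executor, rather than relying on the common fork-join pool, keeps blocking network calls away from threads shared with the rest of the application.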

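7.3 Operations and Maintenance Essentials

  • Real-time monitoring: monitor the performance and success rate of ES bulk operations
  • Dynamic adjustment: adjust bulk parameters according to current load
  • Exception handling: establish exception handling and alerting mechanisms
  • Log management: keep logging and log analysis thorough
  • Performance tuning: optimize bulk parameters based on monitoring data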
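
One concrete form of adjusting to load is to back off before retrying when the cluster pushes back; Elasticsearch can reject bulk work with HTTP 429 when its write queue is full. The sketch below computes a capped exponential delay for such retries. `BackoffSketch`, `delayMs`, and the parameter values in the usage note are assumptions for illustration, not settings taken from this article.

```java
// Hypothetical helper that computes capped exponential backoff delays.
final class BackoffSketch {

    private BackoffSketch() {}

    /**
     * Delay before retry number `attempt` (0-based):
     * baseDelayMs * 2^attempt, capped at maxDelayMs.
     */
    static long delayMs(int attempt, long baseDelayMs, long maxDelayMs) {
        if (attempt < 0 || baseDelayMs <= 0 || maxDelayMs < baseDelayMs) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = baseDelayMs;
        for (int i = 0; i < attempt; i++) {
            // Stop doubling once the next step would exceed the cap (also avoids overflow)
            if (delay > maxDelayMs / 2) {
                return maxDelayMs;
            }
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }
}
```

With a base of 100 ms and a cap of 5,000 ms, the first retries wait 100, 200, 400, 800, 1,600, and 3,200 ms, and every later retry waits 5,000 ms.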

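This guide has covered the principles of ES Bulk operations, how to implement them in Java, performance optimization techniques, and best practices for enterprise applications, as a basis for building an efficient and stable Elasticsearch bulk processing system.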