前言

MySQL数据同步到Elasticsearch作为现代企业级应用的核心需求,直接影响着搜索性能、数据一致性和系统稳定性。通过深入理解MySQL到ES的数据同步策略,掌握性能优化技巧,能够构建一个高效、稳定、可扩展的数据同步系统,确保企业级应用的搜索性能和数据处理能力。本文从数据同步策略分析到性能优化,从基础原理到企业级应用,系统梳理MySQL到ES数据同步的完整解决方案。

一、MySQL到ES数据同步架构设计

1.1 数据同步整体架构

1.2 核心组件设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Core orchestrator for the MySQL-to-Elasticsearch sync pipeline.
 *
 * <p>Holds no state of its own; it simply drives the lifecycle of the three
 * pipeline stages — change capture, queued message processing and ES writing.
 */
@Component
public class MySQLToESSyncComponent {

    @Autowired
    private DataChangeCaptureService captureService;

    @Autowired
    private MessageQueueService queueService;

    @Autowired
    private DataTransformService transformService;

    @Autowired
    private ESWriteService esWriteService;

    /**
     * Starts the pipeline stages in data-flow order:
     * capture first, then queue processing, then ES writing.
     */
    public void startSync() {
        captureService.startCapture();
        queueService.startProcessing();
        esWriteService.startWriting();
    }

    /**
     * Stops the pipeline stages, mirroring the start order.
     */
    public void stopSync() {
        captureService.stopCapture();
        queueService.stopProcessing();
        esWriteService.stopWriting();
    }
}

二、数据同步策略分析

2.1 实时同步策略

2.1.1 基于Binlog的实时同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
 * Real-time MySQL-to-Elasticsearch synchronization driven by the binlog.
 *
 * <p>Subscribes to row-level binlog events and forwards each change to the
 * downstream message queue, transforming row data into an ES document first.
 */
@Component
public class BinlogSyncStrategy {

    @Autowired
    private BinlogReader binlogReader;

    @Autowired
    private DataChangeProcessor changeProcessor;

    /**
     * Registers the binlog listener; each event type is dispatched to its
     * dedicated handler.
     */
    @EventListener
    public void startBinlogSync() {
        binlogReader.readBinlog(new BinlogEventListener() {
            @Override
            public void onInsert(InsertEvent event) {
                processInsertEvent(event);
            }

            @Override
            public void onUpdate(UpdateEvent event) {
                processUpdateEvent(event);
            }

            @Override
            public void onDelete(DeleteEvent event) {
                processDeleteEvent(event);
            }
        });
    }

    /**
     * Handles a row insert: parse, transform to an ES document, enqueue.
     */
    private void processInsertEvent(InsertEvent event) {
        try {
            Map<String, Object> data = parseInsertData(event);
            Map<String, Object> esDoc = transformToESDocument(data);
            sendToQueue("insert", esDoc);
        } catch (Exception e) {
            log.error("处理插入事件失败", e);
            handleSyncError(event, e);
        }
    }

    /**
     * Handles a row update. Only the after-image is needed here, because
     * indexing into ES is a full-document overwrite; the original version
     * also parsed the before-image and then discarded it unused, which has
     * been removed.
     */
    private void processUpdateEvent(UpdateEvent event) {
        try {
            Map<String, Object> afterData = parseData(event.getAfter());
            Map<String, Object> esDoc = transformToESDocument(afterData);
            sendToQueue("update", esDoc);
        } catch (Exception e) {
            log.error("处理更新事件失败", e);
            handleSyncError(event, e);
        }
    }

    /**
     * Handles a row delete: forwards just enough data to identify the
     * document, tagged as a delete instruction.
     */
    private void processDeleteEvent(DeleteEvent event) {
        try {
            Map<String, Object> data = parseDeleteData(event);
            sendToQueue("delete", data);
        } catch (Exception e) {
            log.error("处理删除事件失败", e);
            handleSyncError(event, e);
        }
    }
}

2.1.2 基于Canal的实时同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
 * Real-time MySQL-to-Elasticsearch synchronization based on Alibaba Canal.
 *
 * <p>Pulls batches from the Canal server on a dedicated worker thread,
 * dispatches row changes by event type, and acknowledges a batch only after
 * it has been fully processed — rolling back on failure so no batch is lost.
 *
 * <p>Fixes two defects of the original version: the consumer loop could
 * never be stopped, and {@code InterruptedException} was swallowed by the
 * broad catch without restoring the interrupt flag.
 */
@Component
public class CanalSyncStrategy {

    @Autowired
    private CanalConnector canalConnector;

    @Autowired
    private DataSyncProcessor syncProcessor;

    // Loop guard; volatile so a stop request from another thread is seen.
    private volatile boolean running = false;

    /**
     * Connects to Canal, subscribes to every schema/table, rewinds to the
     * last un-acked position and starts the consumer thread.
     */
    public void startCanalSync() {
        canalConnector.connect();
        canalConnector.subscribe(".*\\..*");
        // Rewind to the last position that was not acknowledged.
        canalConnector.rollback();

        running = true;
        startSyncThread();
    }

    /**
     * Signals the consumer thread to exit after the batch in flight.
     */
    public void stopCanalSync() {
        running = false;
    }

    /**
     * Consumer loop: fetch up to 1000 entries without ack, process them,
     * then ack; on failure roll back so the batch is redelivered.
     */
    private void startSyncThread() {
        Thread syncThread = new Thread(() -> {
            while (running) {
                try {
                    // Fetch the next batch without acknowledging it yet.
                    Message message = canalConnector.getWithoutAck(1000);

                    long batchId = message.getBatchId();
                    int size = message.getEntries().size();

                    if (batchId == -1 || size == 0) {
                        // Nothing available; back off briefly.
                        Thread.sleep(1000);
                        continue;
                    }

                    processEntries(message.getEntries());

                    // Ack only after successful processing.
                    canalConnector.ack(batchId);

                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and exit the loop cleanly.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("Canal同步异常", e);
                    // Roll back so the failed batch is delivered again.
                    canalConnector.rollback();
                }
            }
        });

        syncThread.setName("CanalSyncThread");
        syncThread.start();
    }

    /**
     * Dispatches each ROWDATA entry to the insert/update/delete handler.
     * Non-row entries (transaction markers, DDL, …) are skipped.
     */
    private void processEntries(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                log.error("解析RowChange失败", e);
                continue;
            }

            EventType eventType = rowChange.getEventType();

            for (RowData rowData : rowChange.getRowDatasList()) {
                switch (eventType) {
                    case INSERT:
                        processInsert(rowData.getAfterColumnsList());
                        break;
                    case UPDATE:
                        processUpdate(rowData.getBeforeColumnsList(),
                                rowData.getAfterColumnsList());
                        break;
                    case DELETE:
                        processDelete(rowData.getBeforeColumnsList());
                        break;
                    default:
                        // Other event types are intentionally not synced.
                        break;
                }
            }
        }
    }
}

2.2 批量同步策略

2.2.1 全量数据同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* 全量数据同步策略
*/
@Component
public class FullSyncStrategy {

@Autowired
private MySQLDataSource mysqlDataSource;

@Autowired
private ESClient esClient;

@Autowired
private DataTransformService transformService;

/**
* 执行全量同步
*/
public void executeFullSync(String tableName, String indexName) {
try {
// 1. 创建ES索引
createESIndex(indexName);

// 2. 分批读取MySQL数据
int batchSize = 1000;
int offset = 0;

while (true) {
List<Map<String, Object>> batchData =
readBatchData(tableName, offset, batchSize);

if (batchData.isEmpty()) {
break;
}

// 3. 数据转换
List<Map<String, Object>> esDocs =
transformService.transformBatch(batchData);

// 4. 批量写入ES
bulkWriteToES(indexName, esDocs);

offset += batchSize;

// 5. 记录同步进度
recordSyncProgress(tableName, offset);
}

log.info("全量同步完成: {} -> {}", tableName, indexName);

} catch (Exception e) {
log.error("全量同步失败", e);
throw new SyncException("全量同步失败", e);
}
}

/**
* 分批读取数据
*/
private List<Map<String, Object>> readBatchData(String tableName,
int offset, int batchSize) {
String sql = String.format(
"SELECT * FROM %s ORDER BY id LIMIT %d OFFSET %d",
tableName, batchSize, offset
);

return mysqlDataSource.queryForList(sql);
}

/**
* 批量写入ES
*/
private void bulkWriteToES(String indexName, List<Map<String, Object>> docs) {
BulkRequest bulkRequest = new BulkRequest();

for (Map<String, Object> doc : docs) {
IndexRequest indexRequest = new IndexRequest(indexName)
.source(doc);
bulkRequest.add(indexRequest);
}

try {
BulkResponse bulkResponse = esClient.bulk(bulkRequest);

if (bulkResponse.hasFailures()) {
log.error("批量写入ES失败: {}", bulkResponse.buildFailureMessage());
}

} catch (Exception e) {
log.error("批量写入ES异常", e);
throw new SyncException("批量写入ES失败", e);
}
}
}

2.2.2 增量数据同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* 增量数据同步策略
*/
@Component
public class IncrementalSyncStrategy {

@Autowired
private MySQLDataSource mysqlDataSource;

@Autowired
private SyncStateService syncStateService;

@Autowired
private ESClient esClient;

/**
* 执行增量同步
*/
public void executeIncrementalSync(String tableName, String indexName) {
try {
// 1. 获取上次同步时间
Date lastSyncTime = syncStateService.getLastSyncTime(tableName);

// 2. 查询增量数据
List<Map<String, Object>> incrementalData =
queryIncrementalData(tableName, lastSyncTime);

if (incrementalData.isEmpty()) {
log.info("没有增量数据需要同步");
return;
}

// 3. 数据转换
List<Map<String, Object>> esDocs =
transformService.transformBatch(incrementalData);

// 4. 批量写入ES
bulkWriteToES(indexName, esDocs);

// 5. 更新同步状态
syncStateService.updateLastSyncTime(tableName, new Date());

log.info("增量同步完成: {} -> {}, 同步数据量: {}",
tableName, indexName, incrementalData.size());

} catch (Exception e) {
log.error("增量同步失败", e);
throw new SyncException("增量同步失败", e);
}
}

/**
* 查询增量数据
*/
private List<Map<String, Object>> queryIncrementalData(String tableName,
Date lastSyncTime) {
String sql = String.format(
"SELECT * FROM %s WHERE update_time > ? ORDER BY update_time",
tableName
);

return mysqlDataSource.queryForList(sql, lastSyncTime);
}
}

三、数据转换与映射

3.1 数据转换服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
 * Transforms MySQL row maps into Elasticsearch documents.
 *
 * <p>Applies per-table field renaming, per-field type conversion, and then
 * stamps sync metadata onto the resulting document.
 */
@Service
public class DataTransformService {

    @Autowired
    private FieldMappingService fieldMappingService;

    @Autowired
    private DataTypeConverter dataTypeConverter;

    /**
     * Converts one MySQL row into an ES document.
     *
     * @param mysqlData column name -> value as read from MySQL
     * @param tableName source table; selects the field-mapping rules
     * @return the transformed document, including sync metadata
     * @throws TransformException if any step of the conversion fails
     */
    public Map<String, Object> transform(Map<String, Object> mysqlData,
                                         String tableName) {
        try {
            // 1. Rules that rename MySQL columns to ES field names.
            Map<String, String> mappingRules =
                    fieldMappingService.getFieldMapping(tableName);

            // 2 & 3. Rename each column and convert its value type.
            Map<String, Object> document = new HashMap<>();
            mysqlData.forEach((columnName, rawValue) -> {
                // Columns without a rule keep their original name.
                String targetField = mappingRules.getOrDefault(columnName, columnName);
                document.put(targetField,
                        dataTypeConverter.convert(rawValue, targetField));
            });

            // 4. Provenance metadata.
            addMetadata(document, tableName);

            return document;

        } catch (Exception e) {
            log.error("数据转换失败", e);
            throw new TransformException("数据转换失败", e);
        }
    }

    /**
     * Converts a list of rows; each row goes through {@link #transform}.
     */
    public List<Map<String, Object>> transformBatch(List<Map<String, Object>> mysqlDataList,
                                                    String tableName) {
        List<Map<String, Object>> documents = new ArrayList<>(mysqlDataList.size());
        for (Map<String, Object> row : mysqlDataList) {
            documents.add(transform(row, tableName));
        }
        return documents;
    }

    /**
     * Stamps provenance fields (source table, sync time, schema version)
     * onto the document.
     */
    private void addMetadata(Map<String, Object> data, String tableName) {
        data.put("_source_table", tableName);
        data.put("_sync_time", new Date());
        data.put("_sync_version", "1.0");
    }
}

3.2 字段映射配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * Holds MySQL-column -> ES-field renaming rules, keyed by table name.
 *
 * <p>Uses {@link ConcurrentHashMap} (the original used plain HashMap):
 * {@link #addFieldMapping} is public and may be called at runtime while
 * sync worker threads concurrently read mappings, so both the outer and
 * the per-table maps must be safe under concurrent access.
 */
@Service
public class FieldMappingService {

    // table name -> (mysql column -> es field)
    private final Map<String, Map<String, String>> fieldMappings = new ConcurrentHashMap<>();

    /**
     * Seeds the built-in mappings for the user, order and product tables.
     */
    @PostConstruct
    public void initFieldMappings() {
        // User table mapping.
        Map<String, String> userMapping = new ConcurrentHashMap<>();
        userMapping.put("user_id", "id");
        userMapping.put("user_name", "name");
        userMapping.put("user_email", "email");
        userMapping.put("create_time", "createdAt");
        userMapping.put("update_time", "updatedAt");
        fieldMappings.put("user", userMapping);

        // Order table mapping.
        Map<String, String> orderMapping = new ConcurrentHashMap<>();
        orderMapping.put("order_id", "id");
        orderMapping.put("order_no", "orderNumber");
        orderMapping.put("user_id", "userId");
        orderMapping.put("order_amount", "amount");
        orderMapping.put("order_status", "status");
        orderMapping.put("create_time", "createdAt");
        fieldMappings.put("order", orderMapping);

        // Product table mapping.
        Map<String, String> productMapping = new ConcurrentHashMap<>();
        productMapping.put("product_id", "id");
        productMapping.put("product_name", "name");
        productMapping.put("product_price", "price");
        productMapping.put("product_category", "category");
        productMapping.put("product_description", "description");
        fieldMappings.put("product", productMapping);
    }

    /**
     * Returns the mapping for a table, or an empty map for unknown tables
     * (same fallback behavior as the original).
     */
    public Map<String, String> getFieldMapping(String tableName) {
        return fieldMappings.getOrDefault(tableName, new HashMap<>());
    }

    /**
     * Registers (or overrides) a single column -> field rule at runtime.
     */
    public void addFieldMapping(String tableName, String mysqlField, String esField) {
        fieldMappings.computeIfAbsent(tableName, k -> new ConcurrentHashMap<>())
                .put(mysqlField, esField);
    }
}

3.3 数据类型转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
 * Converts raw MySQL values into ES-friendly Java types, choosing the
 * conversion from the (already-mapped) target field name.
 */
@Component
public class DataTypeConverter {

    // Pattern of "yyyy-MM-dd HH:mm:ss" strings coming from MySQL.
    private final DateTimeFormatter mysqlDateTimeFormat =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Dispatches on the field name, in priority order: *time* fields become
     * ISO date-times, *date* fields ISO dates, amount/price fields doubles,
     * status/type fields ints; any other string is trimmed (empty becomes
     * null); everything else passes through unchanged.
     *
     * @param value     raw value from MySQL, may be null
     * @param fieldName target ES field name used to pick the conversion
     * @return the converted value, or the input unchanged when no rule applies
     */
    public Object convert(Object value, String fieldName) {
        if (value == null) {
            return null;
        }

        if (fieldName.contains("time") || fieldName.contains("Time")) {
            return toIsoDateTime(value);
        }
        if (fieldName.contains("date") || fieldName.contains("Date")) {
            return toIsoDate(value);
        }
        if (fieldName.contains("amount") || fieldName.contains("price")) {
            return toDouble(value);
        }
        if (fieldName.contains("status") || fieldName.contains("type")) {
            return toInt(value);
        }
        if (value instanceof String) {
            return normalizeString((String) value);
        }
        return value;
    }

    /**
     * java.util.Date -> ISO instant string; parseable date-time strings ->
     * ISO local date-time string; anything else passes through.
     */
    private Object toIsoDateTime(Object value) {
        if (value instanceof Date) {
            return ((Date) value).toInstant().toString();
        }
        if (value instanceof String) {
            try {
                return LocalDateTime.parse(value.toString(), mysqlDateTimeFormat).toString();
            } catch (Exception e) {
                // Unparseable strings pass through unchanged.
                return value;
            }
        }
        return value;
    }

    /**
     * java.util.Date -> "yyyy-MM-dd" (first 10 chars of the ISO instant);
     * anything else passes through.
     */
    private Object toIsoDate(Object value) {
        if (value instanceof Date) {
            return ((Date) value).toInstant().toString().substring(0, 10);
        }
        return value;
    }

    /**
     * Numbers and numeric strings -> double; unparseable strings -> 0.0.
     */
    private Object toDouble(Object value) {
        if (value instanceof BigDecimal) {
            return ((BigDecimal) value).doubleValue();
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        if (value instanceof String) {
            try {
                return Double.parseDouble(value.toString());
            } catch (NumberFormatException e) {
                return 0.0;
            }
        }
        return value;
    }

    /**
     * Numbers and numeric strings -> int; unparseable strings -> 0.
     */
    private Object toInt(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt(value.toString());
            } catch (NumberFormatException e) {
                return 0;
            }
        }
        return value;
    }

    /**
     * Trims surrounding whitespace; an empty result becomes null.
     */
    private Object normalizeString(String value) {
        String trimmed = value.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }
}

四、ES索引设计与优化

4.1 索引映射设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/**
 * Creates Elasticsearch index mappings for the synced entities.
 *
 * <p>Field definitions shared between indexes (keyword/double/integer,
 * dates, IK-analyzed text) are produced by small private factory methods,
 * replacing the copy-pasted map construction of the original version.
 */
@Component
public class ESIndexMappingService {

    @Autowired
    private ESClient esClient;

    // Date formats accepted by every date field.
    private static final String DATE_FORMATS =
            "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis";

    /**
     * Creates the "user" index: keyword id/email, IK-analyzed name with a
     * raw keyword sub-field, and created/updated timestamps.
     */
    public void createUserIndexMapping() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("id", simpleField("keyword"));
        properties.put("name", analyzedTextField());
        properties.put("email", simpleField("keyword"));
        properties.put("createdAt", dateField());
        properties.put("updatedAt", dateField());

        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", properties);

        CreateIndexRequest request = new CreateIndexRequest("user")
                .mapping(mapping)
                .settings(createIndexSettings());

        try {
            CreateIndexResponse response = esClient.indices().create(request);
            log.info("用户索引创建成功: {}", response.isAcknowledged());
        } catch (Exception e) {
            log.error("用户索引创建失败", e);
            throw new IndexException("用户索引创建失败", e);
        }
    }

    /**
     * Creates the "order" index: keyword identifiers, double amount,
     * integer status and a created timestamp.
     */
    public void createOrderIndexMapping() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("id", simpleField("keyword"));
        properties.put("orderNumber", simpleField("keyword"));
        properties.put("userId", simpleField("keyword"));
        properties.put("amount", simpleField("double"));
        properties.put("status", simpleField("integer"));
        properties.put("createdAt", dateField());

        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", properties);

        CreateIndexRequest request = new CreateIndexRequest("order")
                .mapping(mapping)
                .settings(createIndexSettings());

        try {
            CreateIndexResponse response = esClient.indices().create(request);
            log.info("订单索引创建成功: {}", response.isAcknowledged());
        } catch (Exception e) {
            log.error("订单索引创建失败", e);
            throw new IndexException("订单索引创建失败", e);
        }
    }

    /** Builds a one-entry field mapping of the given ES type. */
    private Map<String, Object> simpleField(String type) {
        Map<String, Object> field = new HashMap<>();
        field.put("type", type);
        return field;
    }

    /** Builds a date field accepting the shared {@link #DATE_FORMATS}. */
    private Map<String, Object> dateField() {
        Map<String, Object> field = simpleField("date");
        field.put("format", DATE_FORMATS);
        return field;
    }

    /**
     * Builds an IK-analyzed text field (ik_max_word at index time, ik_smart
     * at search time) with a "keyword" sub-field for exact match, sorting
     * and aggregations.
     */
    private Map<String, Object> analyzedTextField() {
        Map<String, Object> field = simpleField("text");
        field.put("analyzer", "ik_max_word");
        field.put("search_analyzer", "ik_smart");

        Map<String, Object> subFields = new HashMap<>();
        subFields.put("keyword", simpleField("keyword"));
        field.put("fields", subFields);
        return field;
    }

    /**
     * Common index settings: 3 primary shards, 1 replica, a relaxed 30s
     * refresh interval and custom IK analyzers.
     */
    private Map<String, Object> createIndexSettings() {
        Map<String, Object> indexSettings = new HashMap<>();
        indexSettings.put("number_of_shards", 3);
        indexSettings.put("number_of_replicas", 1);
        indexSettings.put("refresh_interval", "30s");

        Map<String, Object> ikMaxWord = new HashMap<>();
        ikMaxWord.put("type", "custom");
        ikMaxWord.put("tokenizer", "ik_max_word");

        Map<String, Object> ikSmart = new HashMap<>();
        ikSmart.put("type", "custom");
        ikSmart.put("tokenizer", "ik_smart");

        Map<String, Object> analyzer = new HashMap<>();
        analyzer.put("ik_max_word", ikMaxWord);
        analyzer.put("ik_smart", ikSmart);

        Map<String, Object> analysis = new HashMap<>();
        analysis.put("analyzer", analyzer);
        indexSettings.put("analysis", analysis);

        Map<String, Object> settings = new HashMap<>();
        settings.put("index", indexSettings);
        return settings;
    }
}

4.2 索引优化策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
 * Maintenance operations for ES indexes.
 *
 * <p>Fixes a defect of the original flow, which disabled automatic refresh
 * ({@code refresh_interval = -1}) as its LAST step and never restored it,
 * leaving the index permanently un-refreshed (new documents would never
 * become searchable). Refresh is now disabled only for the duration of the
 * merge and restored afterwards.
 */
@Service
public class ESIndexOptimizationService {

    // Refresh interval restored after optimization; matches the 30s used
    // by the index settings elsewhere in this pipeline.
    private static final String NORMAL_REFRESH_INTERVAL = "30s";

    @Autowired
    private ESClient esClient;

    /**
     * Optimizes an index: temporarily disables refresh, force-merges
     * segments, applies the merge policy, restores the refresh interval,
     * then performs one explicit refresh so merged data is searchable.
     *
     * @throws OptimizationException when the optimization flow fails
     */
    public void optimizeIndex(String indexName) {
        try {
            // 1. Disable refresh while merging to avoid churn.
            updateIndexSettings(indexName, singleSetting("refresh_interval", "-1"));

            // 2. Merge segments down to one.
            forceMergeSegments(indexName);

            // 3. Apply the tiered merge policy.
            applyMergePolicy(indexName);

            // 4. Restore refresh — the original left it disabled forever.
            updateIndexSettings(indexName,
                    singleSetting("refresh_interval", NORMAL_REFRESH_INTERVAL));

            // 5. Make the merged data visible immediately.
            refreshIndex(indexName);

            log.info("索引优化完成: {}", indexName);

        } catch (Exception e) {
            log.error("索引优化失败", e);
            throw new OptimizationException("索引优化失败", e);
        }
    }

    /**
     * Force-merges the index to a single segment and flushes. Failures are
     * logged but not fatal — optimization is best-effort.
     */
    private void forceMergeSegments(String indexName) {
        ForceMergeRequest request = new ForceMergeRequest(indexName);
        request.maxNumSegments(1);
        request.flush(true);

        try {
            ForceMergeResponse response = esClient.indices().forcemerge(request);
            log.info("段合并完成: {}, 耗时: {}ms", indexName, response.getTotalMillis());
        } catch (Exception e) {
            log.error("段合并失败", e);
        }
    }

    /**
     * Issues an explicit refresh so pending documents become searchable.
     */
    private void refreshIndex(String indexName) {
        RefreshRequest request = new RefreshRequest(indexName);

        try {
            RefreshResponse response = esClient.indices().refresh(request);
            log.info("索引刷新完成: {}", indexName);
        } catch (Exception e) {
            log.error("索引刷新失败", e);
        }
    }

    /**
     * Applies a conservative tiered merge policy (few segments per tier,
     * small merges).
     */
    private void applyMergePolicy(String indexName) {
        Map<String, Object> tiered = new HashMap<>();
        tiered.put("segments_per_tier", 2);
        tiered.put("max_merge_at_once", 2);

        Map<String, Object> merge = new HashMap<>();
        merge.put("policy", "tiered");
        merge.put("tiered", tiered);

        Map<String, Object> settings = new HashMap<>();
        settings.put("merge", merge);

        updateIndexSettings(indexName, settings);
    }

    /** Convenience: a one-entry settings map. */
    private Map<String, Object> singleSetting(String key, Object value) {
        Map<String, Object> settings = new HashMap<>();
        settings.put(key, value);
        return settings;
    }

    /**
     * Pushes a dynamic settings update; failures are logged but not fatal.
     */
    private void updateIndexSettings(String indexName, Map<String, Object> settings) {
        UpdateSettingsRequest request = new UpdateSettingsRequest(indexName)
                .settings(settings);

        try {
            UpdateSettingsResponse response = esClient.indices().putSettings(request);
            log.info("索引设置优化完成: {}", indexName);
        } catch (Exception e) {
            log.error("索引设置优化失败", e);
        }
    }
}

五、性能优化与监控

5.1 批量写入优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
 * Optimized bulk writer for Elasticsearch.
 *
 * <p>Fixes two defects of the original version: (1) {@code processBatch}
 * caught its own failure and called {@code retryBatch}, which in turn
 * called {@code processBatch} again — so a persistently failing batch
 * spawned nested, multiplying retries; the single-shot write and the retry
 * loop are now strictly separated. (2) It invoked an undefined
 * {@code flushPendingRequests} method and carried two pending-request maps
 * that nothing ever populated; both have been removed.
 */
@Service
public class ESBulkWriteOptimizationService {

    @Autowired
    private ESClient esClient;

    // Documents per bulk request.
    private static final int BATCH_SIZE = 1000;
    // Retries per batch after the initial attempt.
    private static final int MAX_RETRIES = 3;

    /**
     * Splits the documents into fixed-size batches and writes each one,
     * retrying failed batches with linear backoff.
     *
     * @param indexName target index
     * @param documents documents to index
     * @throws BulkWriteException when the overall write flow fails
     */
    public void optimizedBulkWrite(String indexName, List<Map<String, Object>> documents) {
        try {
            List<List<Map<String, Object>>> batches =
                    Lists.partition(documents, BATCH_SIZE);

            // Batches are independent, so they can be written in parallel.
            batches.parallelStream().forEach(batch ->
                    writeBatchWithRetry(indexName, batch));

        } catch (Exception e) {
            log.error("批量写入优化失败", e);
            throw new BulkWriteException("批量写入优化失败", e);
        }
    }

    /**
     * Writes one batch, retrying up to MAX_RETRIES times with linearly
     * increasing backoff. Retrying is driven ONLY from this loop —
     * {@link #writeBatch} never re-enters the retry path itself.
     */
    private void writeBatchWithRetry(String indexName, List<Map<String, Object>> batch) {
        for (int attempt = 0; ; attempt++) {
            try {
                writeBatch(indexName, batch);
                return;
            } catch (Exception e) {
                log.error("批次处理失败", e);

                if (attempt >= MAX_RETRIES) {
                    log.error("批次重试失败,达到最大重试次数");
                    return;
                }
                log.warn("重试批次失败,第{}次重试", attempt + 1);

                try {
                    // Linear backoff: 1s, 2s, 3s.
                    Thread.sleep(1000L * (attempt + 1));
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and give up on this batch.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Performs a single bulk request; throws on transport failure so the
     * caller decides whether to retry. Item-level failures are logged and
     * recorded but do not trigger a full-batch retry (re-sending the whole
     * batch would duplicate the items that succeeded).
     */
    private void writeBatch(String indexName, List<Map<String, Object>> batch) throws Exception {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String, Object> doc : batch) {
            bulkRequest.add(new IndexRequest(indexName).source(doc));
        }

        // Generous timeout; refresh deferred for indexing throughput.
        bulkRequest.timeout(TimeValue.timeValueMinutes(2));
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);

        BulkResponse bulkResponse = esClient.bulk(bulkRequest);
        if (bulkResponse.hasFailures()) {
            handleBulkFailures(bulkResponse, indexName);
        }
    }

    /**
     * Logs each failed item and records it for later replay.
     */
    private void handleBulkFailures(BulkResponse bulkResponse, String indexName) {
        for (BulkItemResponse item : bulkResponse.getItems()) {
            if (item.isFailed()) {
                BulkItemResponse.Failure failure = item.getFailure();
                log.error("批量写入失败: {}, 原因: {}",
                        failure.getId(), failure.getMessage());

                // Keep the failed document for a later targeted retry.
                recordFailedDocument(indexName, item);
            }
        }
    }
}

5.2 同步性能监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
 * Micrometer-based performance metrics for the sync pipeline.
 *
 * <p>Fixes the gauge registration of the original version, which called
 * {@code Gauge.builder(name).register(registry, obj, fn)} — a signature
 * that does not exist in Micrometer. The correct form is
 * {@code Gauge.builder(name, obj, fn).register(registry)}.
 */
@Service
public class SyncPerformanceMonitorService {

    private final MeterRegistry meterRegistry;
    private final Timer syncTimer;
    private final Counter syncCounter;
    private final Gauge syncGauge;

    public SyncPerformanceMonitorService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.syncTimer = Timer.builder("sync.duration")
                .description("数据同步耗时")
                .register(meterRegistry);
        this.syncCounter = Counter.builder("sync.count")
                .description("数据同步次数")
                .register(meterRegistry);
        // Gauge.builder takes (name, stateObject, valueFunction);
        // register() takes only the registry.
        this.syncGauge = Gauge.builder("sync.queue.size", this,
                        SyncPerformanceMonitorService::getQueueSize)
                .description("同步队列大小")
                .register(meterRegistry);
    }

    /**
     * Records one sync run: its duration, the run counter, and per-table
     * record and rate meters. Micrometer identifies meters by name+tags,
     * so re-invoking a builder returns the already-registered meter rather
     * than creating a duplicate.
     *
     * @param tableName   table that was synced (used as a metric tag)
     * @param duration    wall-clock duration of the run in milliseconds
     * @param recordCount number of records synced in the run
     */
    public void recordSyncMetrics(String tableName, long duration, int recordCount) {
        // Overall duration distribution.
        syncTimer.record(duration, TimeUnit.MILLISECONDS);

        // Overall run count.
        syncCounter.increment();

        // Per-table record throughput.
        Counter.builder("sync.records")
                .tag("table", tableName)
                .register(meterRegistry)
                .increment(recordCount);

        // Per-table duration distribution.
        Timer.builder("sync.rate")
                .tag("table", tableName)
                .register(meterRegistry)
                .record(duration, TimeUnit.MILLISECONDS);
    }

    /**
     * Current depth of the sync queue; backing source not wired up yet.
     */
    private double getQueueSize() {
        return 0.0;
    }

    /**
     * Builds a snapshot report from the registered meters.
     */
    public SyncPerformanceReport generatePerformanceReport() {
        SyncPerformanceReport report = new SyncPerformanceReport();

        // Mean duration of all recorded runs.
        report.setAverageSyncDuration(syncTimer.mean(TimeUnit.MILLISECONDS));

        // Total number of runs.
        report.setTotalSyncCount(syncCounter.count());

        report.setSuccessRate(calculateSuccessRate());
        report.setSyncRate(calculateSyncRate());

        return report;
    }

    /**
     * Placeholder until success/failure counters are wired in.
     */
    private double calculateSuccessRate() {
        return 0.95; // sample value
    }

    /**
     * Placeholder: records synced per second.
     */
    private double calculateSyncRate() {
        return 1000.0; // sample value: 1000 records per second
    }
}

5.3 异常处理与重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
 * Handles failures raised during MySQL-to-ES data synchronization.
 *
 * Retryable (transient connect/timeout/bulk-write) failures are re-executed
 * through the injected RetryTemplate; everything else is parked in a
 * dead-letter queue. An alert is emitted in both cases.
 *
 * NOTE(review): despite its name, addToRetryQueue executes the retry
 * synchronously via retryTemplate.execute() rather than enqueueing anything;
 * and retrySyncTask both re-queues manually AND rethrows into the
 * RetryTemplate, so retries can multiply (template policy x manual
 * re-queues). Confirm the intended retry semantics against the
 * RetryTemplate configuration before relying on the retry counts.
 */
@Service
public class SyncExceptionHandlerService {

@Autowired
private RetryTemplate retryTemplate;

@Autowired
private DeadLetterQueueService deadLetterQueueService;

/**
 * Entry point: classify the failure, route it to retry or dead-letter
 * handling, and raise an alert. Never throws — failures of the handler
 * itself are logged and swallowed (best effort).
 *
 * @param tableName source table whose sync failed
 * @param data      the payload that failed to sync
 * @param exception the failure cause
 */
public void handleSyncException(String tableName, Object data, Exception exception) {
try {
// 1. Log the failure with its context.
log.error("数据同步异常: 表={}, 数据={}", tableName, data, exception);

// 2. Route by whether the failure looks transient.
if (isRetryableException(exception)) {
// Retryable: hand off to the retry path.
addToRetryQueue(tableName, data, exception);
} else {
// Non-retryable: park for manual inspection.
addToDeadLetterQueue(tableName, data, exception);
}

// 3. Notify operators in either case.
sendAlertNotification(tableName, exception);

} catch (Exception e) {
// Exception handling must never propagate — log and give up.
log.error("异常处理失败", e);
}
}

/**
 * A failure is considered transient (retryable) when it is a network
 * connect/timeout error or a bulk-write failure.
 */
private boolean isRetryableException(Exception exception) {
return exception instanceof ConnectException ||
exception instanceof SocketTimeoutException ||
exception instanceof ReadTimeoutException ||
exception instanceof BulkWriteException;
}

/**
 * Builds a retry task (3 attempts max, exponential backoff schedule) and
 * runs it through the RetryTemplate.
 *
 * NOTE(review): executes synchronously despite the "queue" naming — the
 * nextRetryTime field is computed but nothing here waits for it.
 */
private void addToRetryQueue(String tableName, Object data, Exception exception) {
RetryTask retryTask = new RetryTask();
retryTask.setTableName(tableName);
retryTask.setData(data);
retryTask.setException(exception);
retryTask.setRetryCount(0);
retryTask.setMaxRetries(3);
retryTask.setNextRetryTime(calculateNextRetryTime(0));

// Execute through the RetryTemplate (synchronous).
retryTemplate.execute(context -> {
// Retry body.
return retrySyncTask(retryTask);
});
}

/**
 * Wraps the failed payload in a dead-letter message and hands it to the
 * dead-letter queue service for manual review/replay.
 */
private void addToDeadLetterQueue(String tableName, Object data, Exception exception) {
DeadLetterMessage message = new DeadLetterMessage();
message.setTableName(tableName);
message.setData(data);
message.setException(exception);
message.setTimestamp(new Date());

deadLetterQueueService.addMessage(message);
}

/**
 * Exponential backoff: 2^retryCount seconds from now.
 */
private Date calculateNextRetryTime(int retryCount) {
// 1s, 2s, 4s, ... in milliseconds.
long delay = (long) Math.pow(2, retryCount) * 1000; // milliseconds
return new Date(System.currentTimeMillis() + delay);
}

/**
 * Executes one retry attempt. On failure it increments the attempt
 * counter; once maxRetries is reached the task goes to the dead-letter
 * queue, otherwise it is re-queued — and the exception is ALSO rethrown
 * to the surrounding RetryTemplate (see the class-level review note on
 * double-retry semantics).
 */
private Object retrySyncTask(RetryTask retryTask) {
try {
// Run the actual sync attempt.
return executeSyncTask(retryTask);
} catch (Exception e) {
retryTask.setRetryCount(retryTask.getRetryCount() + 1);

if (retryTask.getRetryCount() >= retryTask.getMaxRetries()) {
// Exhausted: park in the dead-letter queue.
addToDeadLetterQueue(retryTask.getTableName(),
retryTask.getData(), e);
} else {
// Not exhausted: schedule and re-queue.
retryTask.setNextRetryTime(calculateNextRetryTime(retryTask.getRetryCount()));
addToRetryQueue(retryTask.getTableName(), retryTask.getData(), e);
}

throw e;
}
}

/**
 * Placeholder for the actual per-table sync logic.
 */
private Object executeSyncTask(RetryTask retryTask) {
// Concrete sync logic to be implemented.
return null;
}

/**
 * Builds an ERROR-severity alert; actual delivery is not wired up yet.
 */
private void sendAlertNotification(String tableName, Exception exception) {
AlertMessage alert = new AlertMessage();
alert.setTableName(tableName);
alert.setException(exception);
alert.setTimestamp(new Date());
alert.setSeverity(AlertSeverity.ERROR);

// Delivery to be wired up:
// alertService.sendAlert(alert);
}
}

六、企业级数据同步架构

6.1 微服务架构设计

6.2 配置管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Manages per-table synchronization configuration: lookup, validated
 * update, and change notification.
 */
@Service
public class SyncConfigManagementService {

    @Autowired
    private ConfigRepository configRepository;

    /**
     * Looks up the sync configuration for a table.
     *
     * @throws ConfigNotFoundException when no configuration exists
     */
    public SyncConfig getSyncConfig(String tableName) {
        return configRepository.findByTableName(tableName)
                .orElseThrow(() -> new ConfigNotFoundException("同步配置不存在: " + tableName));
    }

    /**
     * Validates, persists and broadcasts an updated configuration.
     *
     * @throws ConfigUpdateException when any step fails
     */
    public void updateSyncConfig(String tableName, SyncConfig config) {
        try {
            // Reject invalid configs before touching storage.
            validateSyncConfig(config);

            configRepository.save(config);

            // Let running sync workers pick up the change.
            notifyConfigChange(tableName, config);

            log.info("同步配置更新成功: {}", tableName);

        } catch (Exception e) {
            log.error("同步配置更新失败", e);
            throw new ConfigUpdateException("同步配置更新失败", e);
        }
    }

    /**
     * Rejects configurations with a missing table or index name, a missing
     * sync mode, or a non-positive batch size.
     */
    private void validateSyncConfig(SyncConfig config) {
        requireText(config.getTableName(), "表名不能为空");
        requireText(config.getIndexName(), "索引名不能为空");

        if (config.getSyncMode() == null) {
            throw new ConfigValidationException("同步模式不能为空");
        }
        if (config.getBatchSize() <= 0) {
            throw new ConfigValidationException("批次大小必须大于0");
        }
    }

    /** Throws the given validation error when the value is null or empty. */
    private void requireText(String value, String message) {
        if (value == null || value.isEmpty()) {
            throw new ConfigValidationException(message);
        }
    }

    /**
     * Builds a timestamped change event; publication is not wired up yet.
     */
    private void notifyConfigChange(String tableName, SyncConfig config) {
        ConfigChangeEvent event = new ConfigChangeEvent();
        event.setTableName(tableName);
        event.setConfig(config);
        event.setTimestamp(new Date());

        // Publication to be wired up:
        // eventPublisher.publishEvent(event);
    }
}

6.3 数据一致性保障

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
 * Verifies and repairs consistency between a MySQL table and its ES index.
 *
 * <p>Fixes a defect of the original version: {@code getESRecordCount}
 * swallowed errors and returned 0, so a transient ES outage produced a
 * report claiming the index was empty — a misleading "inconsistency" that
 * could trigger a pointless full repair. Failures now propagate and fail
 * the check loudly instead.
 */
@Service
public class DataConsistencyService {

    @Autowired
    private MySQLDataSource mysqlDataSource;

    @Autowired
    private ESClient esClient;

    @Autowired
    private ConsistencyCheckService consistencyCheckService;

    /**
     * Compares record counts and a data sample between MySQL and ES.
     *
     * @param tableName source MySQL table
     * @param indexName corresponding ES index
     * @return a report with both counts, sample results, the sample match
     *         ratio, and an overall verdict (consistent when the ratio is
     *         at least 95%)
     * @throws ConsistencyCheckException when either side cannot be queried
     */
    public ConsistencyReport checkDataConsistency(String tableName, String indexName) {
        ConsistencyReport report = new ConsistencyReport();

        try {
            // 1. Total record counts on both sides.
            long mysqlCount = getMySQLRecordCount(tableName);
            long esCount = getESRecordCount(indexName);

            report.setMysqlCount(mysqlCount);
            report.setEsCount(esCount);
            report.setCountConsistent(mysqlCount == esCount);

            // 2. Content comparison on a sample of records.
            List<ConsistencyCheckResult> sampleResults =
                    consistencyCheckService.checkSampleData(tableName, indexName);
            report.setSampleResults(sampleResults);

            // 3. Overall verdict from the sample match ratio.
            double consistencyRatio = calculateConsistencyRatio(sampleResults);
            report.setConsistencyRatio(consistencyRatio);
            report.setConsistent(consistencyRatio >= 0.95);
            report.setCheckTime(new Date());

            return report;

        } catch (Exception e) {
            log.error("数据一致性检查失败", e);
            throw new ConsistencyCheckException("数据一致性检查失败", e);
        }
    }

    /**
     * Counts rows in the MySQL table. The table name is taken from internal
     * sync configuration, never from user input; SQL identifiers cannot be
     * bound as statement parameters, hence the concatenation.
     */
    private long getMySQLRecordCount(String tableName) {
        String sql = "SELECT COUNT(*) FROM " + tableName;
        return mysqlDataSource.queryForObject(sql, Long.class);
    }

    /**
     * Counts documents in the ES index. Unlike the original version —
     * which returned 0 on error — failures are wrapped and rethrown so
     * the consistency check fails instead of producing a false report.
     */
    private long getESRecordCount(String indexName) {
        CountRequest countRequest = new CountRequest(indexName);

        try {
            CountResponse countResponse = esClient.count(countRequest);
            return countResponse.getCount();
        } catch (Exception e) {
            log.error("获取ES记录数失败", e);
            throw new ConsistencyCheckException("获取ES记录数失败", e);
        }
    }

    /**
     * Fraction of sampled records that match; 0.0 for an empty sample.
     */
    private double calculateConsistencyRatio(List<ConsistencyCheckResult> results) {
        if (results.isEmpty()) {
            return 0.0;
        }

        long consistentCount = results.stream()
                .filter(ConsistencyCheckResult::isConsistent)
                .count();

        return (double) consistentCount / results.size();
    }

    /**
     * Finds and repairs all inconsistent records for one table/index pair.
     *
     * @throws ConsistencyRepairException when identification or repair fails
     */
    public void repairDataInconsistency(String tableName, String indexName) {
        try {
            // 1. Identify the mismatched records.
            List<InconsistentData> inconsistentDataList =
                    identifyInconsistentData(tableName, indexName);

            // 2. Repair them one by one.
            for (InconsistentData data : inconsistentDataList) {
                repairInconsistentData(data);
            }

            log.info("数据不一致修复完成: {} -> {}", tableName, indexName);

        } catch (Exception e) {
            log.error("数据不一致修复失败", e);
            throw new ConsistencyRepairException("数据不一致修复失败", e);
        }
    }

    /**
     * Placeholder: per-table comparison strategy to be implemented.
     */
    private List<InconsistentData> identifyInconsistentData(String tableName, String indexName) {
        return new ArrayList<>();
    }

    /**
     * Placeholder: re-sync a single inconsistent record.
     */
    private void repairInconsistentData(InconsistentData data) {
    }
}

七、最佳实践与总结

7.1 数据同步最佳实践

  1. 选择合适的同步策略

    • 实时性要求高的场景使用Binlog同步
    • 批量处理场景使用定时同步
    • 混合使用实时同步和批量同步
  2. 优化ES写入性能

    • 使用批量写入减少网络开销
    • 合理设置刷新间隔
    • 优化索引映射和设置
  3. 保障数据一致性

    • 实现数据一致性检查机制
    • 建立数据修复流程
    • 监控数据同步状态
  4. 异常处理与监控

    • 建立完善的异常处理机制
    • 实现重试和死信队列
    • 监控同步性能和状态

7.2 架构设计要点

  1. 高可用性设计

    • 多节点部署
    • 故障转移机制
    • 数据备份策略
  2. 可扩展性设计

    • 微服务架构
    • 水平扩展能力
    • 负载均衡
  3. 可维护性设计

    • 配置化管理
    • 监控告警
    • 日志记录

7.3 性能优化建议

  1. MySQL优化

    • 合理设计索引
    • 优化查询语句
    • 配置合适的参数
  2. ES优化

    • 优化索引映射
    • 调整分片和副本
    • 使用合适的分析器
  3. 网络优化

    • 使用批量操作
    • 压缩传输数据
    • 优化网络配置

总结

MySQL数据同步到Elasticsearch是企业级应用的核心需求,通过合理的数据同步策略、性能优化技巧和企业级架构设计,能够构建一个高效、稳定、可扩展的数据同步系统。本文从数据同步策略分析到性能优化,从基础原理到企业级应用,系统梳理了MySQL到ES数据同步的完整解决方案。

关键要点:

  1. 多种同步策略:实时同步、批量同步、混合同步
  2. 性能优化:批量写入、索引优化、网络优化
  3. 数据一致性:一致性检查、数据修复、监控告警
  4. 企业级架构:微服务设计、配置管理、异常处理

通过深入理解这些技术要点,架构师能够设计出符合企业需求的数据同步系统,确保数据的实时性、一致性和系统的高可用性。