前言

千万级数据的全表更新作为大数据处理中的重要场景,直接影响着系统性能和业务连续性。通过合理的大数据更新策略和分批处理方案,能够在不影响业务运行的情况下完成大规模数据更新,确保系统的稳定运行。本文从大数据更新策略到分批处理方案,从基础实现到企业级应用,系统梳理千万级数据全表更新的完整解决方案。

一、大数据更新架构设计

1.1 大数据更新整体架构

1.2 大数据更新策略架构

二、大数据更新策略实现

2.1 分批更新策略

2.1.1 分批更新服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/**
 * Batched full-table update service.
 *
 * <p>Splits one large UPDATE into fixed-size batches, pausing between batches to
 * limit database pressure, and publishes progress after every batch.
 */
@Service
public class BatchUpdateService {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private BatchUpdateConfigService configService;

    @Autowired
    private BatchUpdateProgressService progressService;

    /** Rows per batch when the table has no explicit configuration. */
    private static final int DEFAULT_BATCH_SIZE = 1000;

    /** Hard cap on rows per batch regardless of configuration. */
    private static final int MAX_BATCH_SIZE = 10000;

    /**
     * Executes {@code updateSql} in batches until all matching rows are processed.
     *
     * <p>The statement's WHERE clause must exclude rows that were already updated
     * (e.g. {@code SET status = 1 ... WHERE status = 0}); each batch updates the
     * first N rows that still match — see {@link #buildPagedSql}.
     *
     * @param tableName  target table, used for config lookup and logging
     * @param updateSql  the UPDATE statement to run in batches
     * @param parameters placeholder values in placeholder order
     *                   (NOTE(review): plain HashMap iteration order is undefined —
     *                   callers should supply a LinkedHashMap)
     * @return aggregate result with totals, batch counts and final status
     * @throws DataUpdateException when counting fails, or a batch fails and the
     *                             configuration says to stop on error
     */
    public BatchUpdateResult batchUpdate(String tableName, String updateSql,
            Map<String, Object> parameters) {
        try {
            BatchUpdateResult result = new BatchUpdateResult();
            result.setTableName(tableName);
            result.setStartTime(new Date());

            // 1. Table-specific batching config, falling back to defaults.
            BatchUpdateConfig config = configService.getConfig(tableName);
            if (config == null) {
                config = createDefaultConfig(tableName);
            }

            // 2. How many rows the WHERE clause currently matches.
            long totalCount = getTotalCount(tableName, updateSql, parameters);
            result.setTotalCount(totalCount);

            if (totalCount == 0) {
                result.setStatus(BatchUpdateStatus.COMPLETED);
                result.setEndTime(new Date());
                return result;
            }

            // 3. Derive batch plan.
            int batchSize = Math.min(config.getBatchSize(), MAX_BATCH_SIZE);
            int totalBatches = (int) Math.ceil((double) totalCount / batchSize);
            result.setTotalBatches(totalBatches);

            // 4. Run the batch loop.
            executeBatchUpdate(tableName, updateSql, parameters, batchSize, totalBatches, result);

            result.setStatus(BatchUpdateStatus.COMPLETED);
            result.setEndTime(new Date());

            log.info("分批更新完成: 表={}, 总数量={}, 批次={}", tableName, totalCount, totalBatches);
            return result;

        } catch (Exception e) {
            log.error("分批更新失败", e);
            throw new DataUpdateException("分批更新失败", e);
        }
    }

    /**
     * Runs the batch loop, recording progress and failed batches.
     *
     * <p>A failed batch is either skipped or aborts the whole run, depending on
     * {@code configService.isStopOnError()}. Thread interruption aborts the run
     * and restores the interrupt flag (previously it was silently swallowed).
     */
    private void executeBatchUpdate(String tableName, String updateSql,
            Map<String, Object> parameters, int batchSize, int totalBatches,
            BatchUpdateResult result) {
        try {
            int processedBatches = 0;
            int processedCount = 0;

            while (processedBatches < totalBatches) {
                try {
                    // Execute one batch.
                    int batchCount = executeSingleBatch(tableName, updateSql, parameters,
                            processedBatches * batchSize, batchSize);

                    processedCount += batchCount;
                    processedBatches++;

                    // Publish progress after every batch.
                    result.setProcessedBatches(processedBatches);
                    result.setProcessedCount(processedCount);
                    result.setProgress((double) processedBatches / totalBatches * 100);
                    progressService.updateProgress(result);

                    // Back off between batches so the database can breathe.
                    Thread.sleep(configService.getSleepInterval());

                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and abort instead of swallowing it.
                    Thread.currentThread().interrupt();
                    throw new DataUpdateException("分批更新失败", ie);
                } catch (Exception e) {
                    log.error("批次更新失败: 批次={}", processedBatches, e);
                    result.addFailedBatch(processedBatches);

                    // Abort or skip the failed batch per configuration.
                    if (configService.isStopOnError()) {
                        throw new DataUpdateException("分批更新失败", e);
                    }
                    processedBatches++;
                }
            }

        } catch (Exception e) {
            log.error("执行分批更新失败", e);
            result.setStatus(BatchUpdateStatus.FAILED);
            result.setErrorMessage(e.getMessage());
            // Wrap instead of rethrowing the checked type (the original bare
            // `throw e` could not compile without a `throws` clause).
            throw new DataUpdateException("执行分批更新失败", e);
        }
    }

    /**
     * Updates one batch and returns the number of rows changed.
     *
     * @param offset retained for signature compatibility; see {@link #buildPagedSql}
     */
    private int executeSingleBatch(String tableName, String updateSql,
            Map<String, Object> parameters, int offset, int batchSize) {
        try (Connection connection = dataSource.getConnection()) {

            String pagedSql = buildPagedSql(updateSql, offset, batchSize);

            try (PreparedStatement statement = connection.prepareStatement(pagedSql)) {
                setParameters(statement, parameters);
                int updateCount = statement.executeUpdate();

                log.debug("批次更新完成: 表={}, 偏移={}, 大小={}, 更新数量={}",
                        tableName, offset, batchSize, updateCount);
                return updateCount;
            }

        } catch (SQLException e) {
            log.error("执行单批次更新失败", e);
            throw new DataUpdateException("执行单批次更新失败", e);
        }
    }

    /**
     * Appends the batch limiter to the UPDATE statement.
     *
     * <p>Bug fix: MySQL's UPDATE supports only {@code LIMIT row_count} — the
     * previous {@code LIMIT offset, size} form is a syntax error for UPDATE.
     * Each batch therefore updates the first {@code batchSize} rows still
     * matching the WHERE clause, which requires the clause to exclude rows
     * already updated. {@code offset} is intentionally unused but kept for
     * signature compatibility. MySQL-specific; adjust per database dialect.
     */
    private String buildPagedSql(String updateSql, int offset, int batchSize) {
        return updateSql + " LIMIT " + batchSize;
    }

    /** Binds all parameter values in map iteration order, one per placeholder. */
    private void setParameters(PreparedStatement statement, Map<String, Object> parameters)
            throws SQLException {
        int index = 1;
        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
            statement.setObject(index++, entry.getValue());
        }
    }

    /**
     * Counts rows the UPDATE's WHERE clause currently matches.
     *
     * <p>Only the trailing parameter values are bound, because the count SQL
     * drops the SET clause, whose placeholders come first in an UPDATE.
     */
    private long getTotalCount(String tableName, String updateSql, Map<String, Object> parameters) {
        try (Connection connection = dataSource.getConnection()) {

            String countSql = buildCountSql(updateSql);

            try (PreparedStatement statement = connection.prepareStatement(countSql)) {
                setTrailingParameters(statement, parameters, countPlaceholders(countSql));

                try (ResultSet resultSet = statement.executeQuery()) {
                    if (resultSet.next()) {
                        return resultSet.getLong(1);
                    }
                }
            }

        } catch (SQLException e) {
            log.error("获取总数据量失败", e);
            throw new DataUpdateException("获取总数据量失败", e);
        }

        return 0;
    }

    /**
     * Binds only the last {@code placeholderCount} values of {@code parameters}.
     * Assumes WHERE-clause values follow SET-clause values in the map's
     * iteration order — TODO confirm with callers.
     */
    private void setTrailingParameters(PreparedStatement statement,
            Map<String, Object> parameters, int placeholderCount) throws SQLException {
        int skip = Math.max(0, parameters.size() - placeholderCount);
        int seen = 0;
        int index = 1;
        for (Object value : parameters.values()) {
            if (seen++ >= skip) {
                statement.setObject(index++, value);
            }
        }
    }

    /** Counts JDBC '?' placeholders (naive: does not skip quoted literals). */
    private int countPlaceholders(String sql) {
        int count = 0;
        for (int i = 0; i < sql.length(); i++) {
            if (sql.charAt(i) == '?') {
                count++;
            }
        }
        return count;
    }

    /**
     * Rewrites {@code UPDATE t SET ... [WHERE ...]} into
     * {@code SELECT COUNT(*) FROM t [WHERE ...]}.
     *
     * <p>Bug fix: the previous version produced invalid SQL
     * ({@code SELECT * FROM t SET ...}). This remains a naive textual rewrite;
     * a real SQL parser is needed for subqueries or joins.
     */
    private String buildCountSql(String updateSql) {
        String upper = updateSql.toUpperCase();
        int updateIdx = upper.indexOf("UPDATE");
        int setIdx = upper.indexOf(" SET ");
        if (updateIdx == -1 || setIdx == -1) {
            throw new IllegalArgumentException("无法解析UPDATE语句: " + updateSql);
        }
        String table = updateSql.substring(updateIdx + "UPDATE".length(), setIdx).trim();
        int whereIdx = upper.indexOf(" WHERE ");
        String whereClause = whereIdx == -1 ? "" : updateSql.substring(whereIdx);
        return "SELECT COUNT(*) FROM " + table + whereClause;
    }

    /** Builds the fallback configuration for tables without explicit config. */
    private BatchUpdateConfig createDefaultConfig(String tableName) {
        BatchUpdateConfig config = new BatchUpdateConfig();
        config.setTableName(tableName);
        config.setBatchSize(DEFAULT_BATCH_SIZE);
        config.setSleepInterval(100);
        config.setStopOnError(false);
        config.setMaxRetryCount(3);
        return config;
    }
}

2.2 增量更新策略

2.2.1 增量更新服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/**
 * Incremental update service: restricts an UPDATE to rows whose timestamp
 * column changed after a given point in time.
 */
@Service
public class IncrementalUpdateService {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private IncrementalUpdateConfigService configService;

    @Autowired
    private IncrementalUpdateProgressService progressService;

    /**
     * Updates only rows where {@code timestampColumn > lastUpdateTime}.
     *
     * @param tableName       target table
     * @param updateSql       base UPDATE statement (with or without WHERE)
     * @param timestampColumn column holding the last-modified time
     * @param lastUpdateTime  lower bound; rows at or before it are skipped
     *                        (must be non-null, it is bound as a placeholder)
     * @return result carrying the affected-row count and status
     * @throws DataUpdateException on any failure
     */
    public IncrementalUpdateResult incrementalUpdate(String tableName, String updateSql,
            String timestampColumn, Date lastUpdateTime) {
        try {
            IncrementalUpdateResult result = new IncrementalUpdateResult();
            result.setTableName(tableName);
            result.setLastUpdateTime(lastUpdateTime);
            result.setStartTime(new Date());

            // 1. Table-specific config, falling back to defaults.
            IncrementalUpdateConfig config = configService.getConfig(tableName);
            if (config == null) {
                config = createDefaultConfig(tableName);
            }

            // 2. Append the timestamp condition as a bound placeholder.
            String incrementalSql = buildIncrementalSql(updateSql, timestampColumn, lastUpdateTime);

            // 3. Bug fix: pass lastUpdateTime down so the '?' added by
            // buildIncrementalSql is actually bound (previously the statement
            // ran with an unbound placeholder and failed at execution time).
            executeIncrementalUpdate(tableName, incrementalSql, lastUpdateTime, result);

            result.setStatus(IncrementalUpdateStatus.COMPLETED);
            result.setEndTime(new Date());

            log.info("增量更新完成: 表={}, 更新时间={}", tableName, lastUpdateTime);
            return result;

        } catch (Exception e) {
            log.error("增量更新失败", e);
            throw new DataUpdateException("增量更新失败", e);
        }
    }

    /**
     * Executes the incremental UPDATE, binding the timestamp lower bound to
     * the single placeholder appended by {@link #buildIncrementalSql}.
     */
    private void executeIncrementalUpdate(String tableName, String incrementalSql,
            Date lastUpdateTime, IncrementalUpdateResult result) {
        try (Connection connection = dataSource.getConnection()) {

            try (PreparedStatement statement = connection.prepareStatement(incrementalSql)) {

                // Bind the '?' added for the timestamp condition.
                statement.setTimestamp(1, new Timestamp(lastUpdateTime.getTime()));

                int updateCount = statement.executeUpdate();
                result.setUpdateCount(updateCount);

                log.info("增量更新执行完成: 表={}, 更新数量={}", tableName, updateCount);
            }

        } catch (SQLException e) {
            log.error("执行增量更新失败", e);
            throw new DataUpdateException("执行增量更新失败", e);
        }
    }

    /**
     * Appends {@code timestampColumn > ?} to the UPDATE, joining with AND when
     * a WHERE clause already exists. Naive check: a column or literal
     * containing the word "WHERE" would be misdetected.
     */
    private String buildIncrementalSql(String updateSql, String timestampColumn, Date lastUpdateTime) {
        String incrementalSql = updateSql;

        if (updateSql.toUpperCase().contains("WHERE")) {
            incrementalSql += " AND " + timestampColumn + " > ?";
        } else {
            incrementalSql += " WHERE " + timestampColumn + " > ?";
        }

        return incrementalSql;
    }

    /** Builds the fallback configuration for tables without explicit config. */
    private IncrementalUpdateConfig createDefaultConfig(String tableName) {
        IncrementalUpdateConfig config = new IncrementalUpdateConfig();
        config.setTableName(tableName);
        config.setBatchSize(1000);
        config.setSleepInterval(100);
        config.setMaxRetryCount(3);
        return config;
    }

    /**
     * Reads the newest value in {@code timestampColumn}, or null for an empty
     * table. Table/column names are concatenated — must come from trusted
     * configuration, never user input.
     */
    public Date getLastUpdateTime(String tableName, String timestampColumn) {
        try (Connection connection = dataSource.getConnection()) {

            String sql = "SELECT MAX(" + timestampColumn + ") FROM " + tableName;

            try (PreparedStatement statement = connection.prepareStatement(sql);
                 ResultSet resultSet = statement.executeQuery()) {

                if (resultSet.next()) {
                    return resultSet.getTimestamp(1);
                }
            }

        } catch (SQLException e) {
            log.error("获取最后更新时间失败", e);
            throw new DataUpdateException("获取最后更新时间失败", e);
        }

        return null;
    }

    /**
     * Stamps {@code updateTime} into rows whose timestamp column is NULL.
     *
     * <p>NOTE(review): this only back-fills NULL timestamps; it does not store
     * a per-table watermark. Confirm this matches the intended checkpointing
     * semantics for incremental runs.
     */
    public void updateLastUpdateTime(String tableName, String timestampColumn, Date updateTime) {
        try (Connection connection = dataSource.getConnection()) {

            String sql = "UPDATE " + tableName + " SET " + timestampColumn + " = ? WHERE " +
                    timestampColumn + " IS NULL";

            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setTimestamp(1, new Timestamp(updateTime.getTime()));
                statement.executeUpdate();
            }

        } catch (SQLException e) {
            log.error("更新最后更新时间失败", e);
            throw new DataUpdateException("更新最后更新时间失败", e);
        }
    }
}

2.3 并行更新策略

2.3.1 并行更新服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
/**
* 并行更新服务
*/
@Service
public class ParallelUpdateService {

@Autowired
private DataSource dataSource;

@Autowired
private ParallelUpdateConfigService configService;

@Autowired
private ParallelUpdateProgressService progressService;

private final ExecutorService executorService = Executors.newFixedThreadPool(10);

/**
* 并行更新数据
*/
public ParallelUpdateResult parallelUpdate(String tableName, String updateSql,
Map<String, Object> parameters) {
try {
ParallelUpdateResult result = new ParallelUpdateResult();
result.setTableName(tableName);
result.setStartTime(new Date());

// 1. 获取并行更新配置
ParallelUpdateConfig config = configService.getConfig(tableName);
if (config == null) {
config = createDefaultConfig(tableName);
}

// 2. 计算总数据量
long totalCount = getTotalCount(tableName, updateSql, parameters);
result.setTotalCount(totalCount);

if (totalCount == 0) {
result.setStatus(ParallelUpdateStatus.COMPLETED);
result.setEndTime(new Date());
return result;
}

// 3. 数据分片
List<DataShard> shards = createDataShards(tableName, totalCount, config.getShardCount());
result.setShardCount(shards.size());

// 4. 并行执行更新
executeParallelUpdate(tableName, updateSql, parameters, shards, result);

result.setStatus(ParallelUpdateStatus.COMPLETED);
result.setEndTime(new Date());

log.info("并行更新完成: 表={}, 总数量={}, 分片数={}", tableName, totalCount, shards.size());
return result;

} catch (Exception e) {
log.error("并行更新失败", e);
throw new DataUpdateException("并行更新失败", e);
}
}

/**
* 执行并行更新
*/
private void executeParallelUpdate(String tableName, String updateSql,
Map<String, Object> parameters, List<DataShard> shards, ParallelUpdateResult result) {
try {
List<Future<ShardUpdateResult>> futures = new ArrayList<>();

// 提交并行任务
for (DataShard shard : shards) {
Future<ShardUpdateResult> future = executorService.submit(() -> {
return executeShardUpdate(tableName, updateSql, parameters, shard);
});
futures.add(future);
}

// 等待所有任务完成
int processedShards = 0;
int processedCount = 0;

for (Future<ShardUpdateResult> future : futures) {
try {
ShardUpdateResult shardResult = future.get();
processedShards++;
processedCount += shardResult.getUpdateCount();

// 更新进度
result.setProcessedShards(processedShards);
result.setProcessedCount(processedCount);
result.setProgress((double) processedShards / shards.size() * 100);

progressService.updateProgress(result);

} catch (Exception e) {
log.error("分片更新失败", e);
result.addFailedShard(processedShards);
}
}

} catch (Exception e) {
log.error("执行并行更新失败", e);
result.setStatus(ParallelUpdateStatus.FAILED);
result.setErrorMessage(e.getMessage());
throw e;
}
}

/**
* 执行分片更新
*/
private ShardUpdateResult executeShardUpdate(String tableName, String updateSql,
Map<String, Object> parameters, DataShard shard) {
try {
ShardUpdateResult result = new ShardUpdateResult();
result.setShardId(shard.getShardId());
result.setStartTime(new Date());

try (Connection connection = dataSource.getConnection()) {

// 构建分片SQL
String shardSql = buildShardSql(updateSql, shard);

try (PreparedStatement statement = connection.prepareStatement(shardSql)) {

// 设置参数
setParameters(statement, parameters);

// 执行更新
int updateCount = statement.executeUpdate();

result.setUpdateCount(updateCount);
result.setStatus(ShardUpdateStatus.COMPLETED);
result.setEndTime(new Date());

log.debug("分片更新完成: 分片={}, 更新数量={}", shard.getShardId(), updateCount);
}
}

return result;

} catch (Exception e) {
log.error("分片更新失败: 分片={}", shard.getShardId(), e);
ShardUpdateResult result = new ShardUpdateResult();
result.setShardId(shard.getShardId());
result.setStatus(ShardUpdateStatus.FAILED);
result.setErrorMessage(e.getMessage());
return result;
}
}

/**
* 构建分片SQL
*/
private String buildShardSql(String updateSql, DataShard shard) {
// 在UPDATE语句中添加分片条件
String shardSql = updateSql;

if (updateSql.toUpperCase().contains("WHERE")) {
shardSql += " AND " + shard.getCondition();
} else {
shardSql += " WHERE " + shard.getCondition();
}

return shardSql;
}

/**
* 创建数据分片
*/
private List<DataShard> createDataShards(String tableName, long totalCount, int shardCount) {
List<DataShard> shards = new ArrayList<>();

long shardSize = totalCount / shardCount;
long remainder = totalCount % shardCount;

for (int i = 0; i < shardCount; i++) {
DataShard shard = new DataShard();
shard.setShardId(i);

long startId = i * shardSize;
long endId = startId + shardSize - 1;

// 处理余数
if (i == shardCount - 1) {
endId += remainder;
}

shard.setStartId(startId);
shard.setEndId(endId);
shard.setCondition("id BETWEEN " + startId + " AND " + endId);

shards.add(shard);
}

return shards;
}

/**
* 获取总数据量
*/
private long getTotalCount(String tableName, String updateSql, Map<String, Object> parameters) {
try (Connection connection = dataSource.getConnection()) {

String countSql = "SELECT COUNT(*) FROM " + tableName;

try (PreparedStatement statement = connection.prepareStatement(countSql);
ResultSet resultSet = statement.executeQuery()) {

if (resultSet.next()) {
return resultSet.getLong(1);
}
}

} catch (SQLException e) {
log.error("获取总数据量失败", e);
throw new DataUpdateException("获取总数据量失败", e);
}

return 0;
}

/**
* 设置参数
*/
private void setParameters(PreparedStatement statement, Map<String, Object> parameters)
throws SQLException {
int index = 1;
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
statement.setObject(index++, entry.getValue());
}
}

/**
* 创建默认配置
*/
private ParallelUpdateConfig createDefaultConfig(String tableName) {
ParallelUpdateConfig config = new ParallelUpdateConfig();
config.setTableName(tableName);
config.setShardCount(4);
config.setMaxRetryCount(3);

return config;
}
}

三、企业级数据更新方案

3.1 大数据更新管理服务

3.1.1 大数据更新管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/**
* 大数据更新管理服务
*/
@Service
public class BigDataUpdateManagementService {

@Autowired
private BatchUpdateService batchUpdateService;

@Autowired
private IncrementalUpdateService incrementalUpdateService;

@Autowired
private ParallelUpdateService parallelUpdateService;

@Autowired
private DataUpdateConfigService configService;

@Autowired
private DataUpdateProgressService progressService;

/**
* 执行大数据更新
*/
public DataUpdateResult executeBigDataUpdate(DataUpdateRequest request) {
try {
DataUpdateResult result = new DataUpdateResult();
result.setRequestId(request.getRequestId());
result.setTableName(request.getTableName());
result.setStartTime(new Date());

// 1. 验证请求
validateRequest(request);

// 2. 选择更新策略
UpdateStrategy strategy = selectUpdateStrategy(request);
result.setStrategy(strategy);

// 3. 执行更新
switch (strategy) {
case BATCH:
BatchUpdateResult batchResult = batchUpdateService.batchUpdate(
request.getTableName(), request.getUpdateSql(), request.getParameters());
result.setBatchResult(batchResult);
break;

case INCREMENTAL:
IncrementalUpdateResult incrementalResult = incrementalUpdateService.incrementalUpdate(
request.getTableName(), request.getUpdateSql(),
request.getTimestampColumn(), request.getLastUpdateTime());
result.setIncrementalResult(incrementalResult);
break;

case PARALLEL:
ParallelUpdateResult parallelResult = parallelUpdateService.parallelUpdate(
request.getTableName(), request.getUpdateSql(), request.getParameters());
result.setParallelResult(parallelResult);
break;

default:
throw new IllegalArgumentException("不支持的更新策略: " + strategy);
}

result.setStatus(DataUpdateStatus.COMPLETED);
result.setEndTime(new Date());

log.info("大数据更新完成: 表={}, 策略={}", request.getTableName(), strategy);
return result;

} catch (Exception e) {
log.error("大数据更新失败", e);
throw new DataUpdateException("大数据更新失败", e);
}
}

/**
* 验证请求
*/
private void validateRequest(DataUpdateRequest request) {
if (request.getTableName() == null || request.getTableName().trim().isEmpty()) {
throw new IllegalArgumentException("表名不能为空");
}

if (request.getUpdateSql() == null || request.getUpdateSql().trim().isEmpty()) {
throw new IllegalArgumentException("更新SQL不能为空");
}

if (request.getParameters() == null) {
request.setParameters(new HashMap<>());
}
}

/**
* 选择更新策略
*/
private UpdateStrategy selectUpdateStrategy(DataUpdateRequest request) {
try {
// 获取表配置
DataUpdateConfig config = configService.getConfig(request.getTableName());

if (config != null && config.getStrategy() != null) {
return config.getStrategy();
}

// 根据数据量选择策略
long dataCount = getDataCount(request.getTableName());

if (dataCount < 100000) { // 10万以下
return UpdateStrategy.BATCH;
} else if (dataCount < 1000000) { // 100万以下
return UpdateStrategy.PARALLEL;
} else { // 100万以上
return UpdateStrategy.INCREMENTAL;
}

} catch (Exception e) {
log.error("选择更新策略失败", e);
return UpdateStrategy.BATCH; // 默认策略
}
}

/**
* 获取数据量
*/
private long getDataCount(String tableName) {
try (Connection connection = dataSource.getConnection()) {

String sql = "SELECT COUNT(*) FROM " + tableName;

try (PreparedStatement statement = connection.prepareStatement(sql);
ResultSet resultSet = statement.executeQuery()) {

if (resultSet.next()) {
return resultSet.getLong(1);
}
}

} catch (SQLException e) {
log.error("获取数据量失败", e);
throw new DataUpdateException("获取数据量失败", e);
}

return 0;
}

/**
* 异步执行大数据更新
*/
@Async
public CompletableFuture<DataUpdateResult> executeBigDataUpdateAsync(DataUpdateRequest request) {
try {
DataUpdateResult result = executeBigDataUpdate(request);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

/**
* 获取更新进度
*/
public DataUpdateProgress getUpdateProgress(String requestId) {
try {
return progressService.getProgress(requestId);
} catch (Exception e) {
log.error("获取更新进度失败", e);
throw new DataUpdateException("获取更新进度失败", e);
}
}

/**
* 取消更新任务
*/
public boolean cancelUpdateTask(String requestId) {
try {
return progressService.cancelTask(requestId);
} catch (Exception e) {
log.error("取消更新任务失败", e);
throw new DataUpdateException("取消更新任务失败", e);
}
}

/**
* 获取更新历史
*/
public List<DataUpdateResult> getUpdateHistory(String tableName, Date startTime, Date endTime) {
try {
return progressService.getHistory(tableName, startTime, endTime);
} catch (Exception e) {
log.error("获取更新历史失败", e);
throw new DataUpdateException("获取更新历史失败", e);
}
}
}

3.2 数据更新优化服务

3.2.1 数据更新优化服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
/**
* 数据更新优化服务
*/
@Service
public class DataUpdateOptimizationService {

@Autowired
private DataSource dataSource;

@Autowired
private DataUpdateConfigService configService;

/**
* 优化更新性能
*/
public UpdateOptimizationResult optimizeUpdatePerformance(String tableName, String updateSql) {
try {
UpdateOptimizationResult result = new UpdateOptimizationResult();
result.setTableName(tableName);
result.setStartTime(new Date());

// 1. 分析SQL
SqlAnalysisResult sqlAnalysis = analyzeSql(updateSql);
result.setSqlAnalysis(sqlAnalysis);

// 2. 优化索引
IndexOptimizationResult indexOptimization = optimizeIndexes(tableName, sqlAnalysis);
result.setIndexOptimization(indexOptimization);

// 3. 优化SQL
String optimizedSql = optimizeSql(updateSql, sqlAnalysis);
result.setOptimizedSql(optimizedSql);

// 4. 优化配置
UpdateConfigOptimizationResult configOptimization = optimizeConfig(tableName, sqlAnalysis);
result.setConfigOptimization(configOptimization);

result.setStatus(OptimizationStatus.COMPLETED);
result.setEndTime(new Date());

log.info("更新性能优化完成: 表={}", tableName);
return result;

} catch (Exception e) {
log.error("更新性能优化失败", e);
throw new DataUpdateException("更新性能优化失败", e);
}
}

/**
* 分析SQL
*/
private SqlAnalysisResult analyzeSql(String updateSql) {
try {
SqlAnalysisResult result = new SqlAnalysisResult();

// 简单的SQL分析,实际应用中需要更复杂的解析
if (updateSql.toUpperCase().contains("WHERE")) {
result.setHasWhereClause(true);
}

if (updateSql.toUpperCase().contains("JOIN")) {
result.setHasJoin(true);
}

if (updateSql.toUpperCase().contains("ORDER BY")) {
result.setHasOrderBy(true);
}

// 分析更新字段
List<String> updateFields = extractUpdateFields(updateSql);
result.setUpdateFields(updateFields);

// 分析WHERE条件
List<String> whereConditions = extractWhereConditions(updateSql);
result.setWhereConditions(whereConditions);

return result;

} catch (Exception e) {
log.error("分析SQL失败", e);
throw new DataUpdateException("分析SQL失败", e);
}
}

/**
* 提取更新字段
*/
private List<String> extractUpdateFields(String updateSql) {
List<String> fields = new ArrayList<>();

// 简单的字段提取,实际应用中需要更复杂的解析
String upperSql = updateSql.toUpperCase();
int setIndex = upperSql.indexOf("SET");

if (setIndex != -1) {
int whereIndex = upperSql.indexOf("WHERE");
String setClause = upperSql.substring(setIndex + 3,
whereIndex != -1 ? whereIndex : upperSql.length());

String[] parts = setClause.split(",");
for (String part : parts) {
String field = part.trim().split("=")[0].trim();
fields.add(field);
}
}

return fields;
}

/**
* 提取WHERE条件
*/
private List<String> extractWhereConditions(String updateSql) {
List<String> conditions = new ArrayList<>();

// 简单的条件提取,实际应用中需要更复杂的解析
String upperSql = updateSql.toUpperCase();
int whereIndex = upperSql.indexOf("WHERE");

if (whereIndex != -1) {
String whereClause = upperSql.substring(whereIndex + 5);
String[] parts = whereClause.split("AND|OR");

for (String part : parts) {
String condition = part.trim();
if (!condition.isEmpty()) {
conditions.add(condition);
}
}
}

return conditions;
}

/**
* 优化索引
*/
private IndexOptimizationResult optimizeIndexes(String tableName, SqlAnalysisResult sqlAnalysis) {
try {
IndexOptimizationResult result = new IndexOptimizationResult();
result.setTableName(tableName);

List<String> recommendedIndexes = new ArrayList<>();

// 根据WHERE条件推荐索引
for (String condition : sqlAnalysis.getWhereConditions()) {
String indexField = extractIndexField(condition);
if (indexField != null) {
recommendedIndexes.add(indexField);
}
}

result.setRecommendedIndexes(recommendedIndexes);

// 检查现有索引
List<String> existingIndexes = getExistingIndexes(tableName);
result.setExistingIndexes(existingIndexes);

// 计算索引优化建议
List<String> optimizationSuggestions = calculateIndexOptimization(
recommendedIndexes, existingIndexes);
result.setOptimizationSuggestions(optimizationSuggestions);

return result;

} catch (Exception e) {
log.error("优化索引失败", e);
throw new DataUpdateException("优化索引失败", e);
}
}

/**
* 提取索引字段
*/
private String extractIndexField(String condition) {
// 简单的字段提取,实际应用中需要更复杂的解析
if (condition.contains("=")) {
return condition.split("=")[0].trim();
}

return null;
}

/**
* 获取现有索引
*/
private List<String> getExistingIndexes(String tableName) {
List<String> indexes = new ArrayList<>();

try (Connection connection = dataSource.getConnection()) {

String sql = "SHOW INDEX FROM " + tableName;

try (PreparedStatement statement = connection.prepareStatement(sql);
ResultSet resultSet = statement.executeQuery()) {

while (resultSet.next()) {
String indexName = resultSet.getString("Key_name");
if (!indexes.contains(indexName)) {
indexes.add(indexName);
}
}
}

} catch (SQLException e) {
log.error("获取现有索引失败", e);
}

return indexes;
}

/**
* 计算索引优化建议
*/
private List<String> calculateIndexOptimization(List<String> recommendedIndexes,
List<String> existingIndexes) {
List<String> suggestions = new ArrayList<>();

for (String recommendedIndex : recommendedIndexes) {
if (!existingIndexes.contains(recommendedIndex)) {
suggestions.add("建议创建索引: " + recommendedIndex);
}
}

return suggestions;
}

/**
* 优化SQL
*/
private String optimizeSql(String updateSql, SqlAnalysisResult sqlAnalysis) {
String optimizedSql = updateSql;

// 简单的SQL优化,实际应用中需要更复杂的优化
if (sqlAnalysis.isHasOrderBy()) {
// 移除不必要的ORDER BY
optimizedSql = optimizedSql.replaceAll("ORDER BY.*", "");
}

return optimizedSql;
}

/**
* 优化配置
*/
private UpdateConfigOptimizationResult optimizeConfig(String tableName, SqlAnalysisResult sqlAnalysis) {
try {
UpdateConfigOptimizationResult result = new UpdateConfigOptimizationResult();
result.setTableName(tableName);

// 根据SQL分析结果优化配置
DataUpdateConfig config = configService.getConfig(tableName);
if (config == null) {
config = new DataUpdateConfig();
config.setTableName(tableName);
}

// 优化批次大小
if (sqlAnalysis.isHasJoin()) {
config.setBatchSize(500); // 有JOIN时减小批次大小
} else {
config.setBatchSize(1000); // 无JOIN时增大批次大小
}

// 优化休眠时间
if (sqlAnalysis.isHasWhereClause()) {
config.setSleepInterval(50); // 有WHERE条件时减少休眠时间
} else {
config.setSleepInterval(100); // 无WHERE条件时增加休眠时间
}

result.setOptimizedConfig(config);

return result;

} catch (Exception e) {
log.error("优化配置失败", e);
throw new DataUpdateException("优化配置失败", e);
}
}
}

四、性能优化与监控

4.1 性能优化

4.1.1 数据更新性能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
/**
* 数据更新性能优化服务
*/
@Service
public class DataUpdatePerformanceOptimizationService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CaffeineCache localCache;

private final String UPDATE_CACHE_PREFIX = "update_cache:";

/**
* 缓存更新结果
*/
public void cacheUpdateResult(String tableName, String updateSql, Object result) {
String cacheKey = UPDATE_CACHE_PREFIX + tableName + ":" + DigestUtil.md5Hex(updateSql);

try {
// 写入本地缓存
localCache.put(cacheKey, result);

// 写入Redis缓存
String redisCacheKey = "redis_cache:" + cacheKey;
redisTemplate.opsForValue().set(redisCacheKey, result, Duration.ofHours(1));

} catch (Exception e) {
log.error("缓存更新结果失败", e);
}
}

/**
* 获取缓存的更新结果
*/
public Object getCachedUpdateResult(String tableName, String updateSql) {
String cacheKey = UPDATE_CACHE_PREFIX + tableName + ":" + DigestUtil.md5Hex(updateSql);

try {
// 从本地缓存获取
Object cachedResult = localCache.getIfPresent(cacheKey);
if (cachedResult != null) {
return cachedResult;
}

// 从Redis获取
String redisCacheKey = "redis_cache:" + cacheKey;
Object redisResult = redisTemplate.opsForValue().get(redisCacheKey);
if (redisResult != null) {
localCache.put(cacheKey, redisResult);
return redisResult;
}

} catch (Exception e) {
log.error("获取缓存的更新结果失败", e);
}

return null;
}

/**
* 批量更新优化
*/
public BatchUpdateOptimizationResult optimizeBatchUpdate(String tableName, String updateSql,
Map<String, Object> parameters) {
try {
BatchUpdateOptimizationResult result = new BatchUpdateOptimizationResult();
result.setTableName(tableName);
result.setStartTime(new Date());

// 1. 分析更新模式
UpdatePattern pattern = analyzeUpdatePattern(updateSql);
result.setPattern(pattern);

// 2. 优化批次大小
int optimalBatchSize = calculateOptimalBatchSize(tableName, pattern);
result.setOptimalBatchSize(optimalBatchSize);

// 3. 优化更新策略
UpdateStrategy optimalStrategy = calculateOptimalStrategy(tableName, pattern);
result.setOptimalStrategy(optimalStrategy);

// 4. 优化参数
Map<String, Object> optimizedParameters = optimizeParameters(parameters, pattern);
result.setOptimizedParameters(optimizedParameters);

result.setStatus(OptimizationStatus.COMPLETED);
result.setEndTime(new Date());

return result;

} catch (Exception e) {
log.error("批量更新优化失败", e);
throw new DataUpdateException("批量更新优化失败", e);
}
}

/**
 * Classifies an UPDATE statement by scanning its uppercased text for the
 * clauses that drive tuning: WHERE, JOIN, ORDER BY and LIMIT.
 *
 * Note: this is a substring scan, not a SQL parse — a keyword appearing
 * inside a literal would also match.
 */
private UpdatePattern analyzeUpdatePattern(String updateSql) {
    final UpdatePattern pattern = new UpdatePattern();
    final String sql = updateSql.toUpperCase();

    // Flags are only set when detected; absent clauses keep the
    // UpdatePattern defaults.
    if (sql.contains("WHERE")) {
        pattern.setHasWhereClause(true);
    }
    if (sql.contains("JOIN")) {
        pattern.setHasJoin(true);
    }
    if (sql.contains("ORDER BY")) {
        pattern.setHasOrderBy(true);
    }
    if (sql.contains("LIMIT")) {
        pattern.setHasLimit(true);
    }
    return pattern;
}

/**
 * Picks a batch size from the statement's shape. Heavier statements get
 * smaller batches: ORDER BY → 300, JOIN (without ORDER BY) → 500,
 * plain updates → 1000.
 *
 * @param tableName currently unused; kept for interface stability
 */
private int calculateOptimalBatchSize(String tableName, UpdatePattern pattern) {
    // ORDER BY dominates JOIN when both are present.
    if (pattern.isHasOrderBy()) {
        return 300;
    }
    if (pattern.isHasJoin()) {
        return 500;
    }
    return 1000;
}

/**
 * Chooses the update strategy: filtered updates (with WHERE) run in
 * batches; unfiltered full-table updates run in parallel.
 *
 * @param tableName currently unused; kept for interface stability
 */
private UpdateStrategy calculateOptimalStrategy(String tableName, UpdatePattern pattern) {
    return pattern.isHasWhereClause() ? UpdateStrategy.BATCH : UpdateStrategy.PARALLEL;
}

/**
 * Returns a copy of the caller's parameters augmented with optimization
 * hints derived from the statement pattern. The input map is never mutated.
 *
 * Fix: the original called {@code new HashMap<>(parameters)} directly,
 * which throws NullPointerException when the caller passes {@code null};
 * a null input now yields a map containing only the hint flags.
 *
 * @param parameters caller-supplied parameters; may be {@code null}
 * @param pattern    statement classification driving the hints
 * @return a new mutable map with any applicable hint flags added
 */
private Map<String, Object> optimizeParameters(Map<String, Object> parameters, UpdatePattern pattern) {
    Map<String, Object> optimizedParameters =
            (parameters == null) ? new HashMap<>() : new HashMap<>(parameters);

    // Hint flags consumed downstream by the update executor.
    if (pattern.isHasJoin()) {
        optimizedParameters.put("optimize_join", true);
    }
    if (pattern.isHasOrderBy()) {
        optimizedParameters.put("optimize_order", true);
    }

    return optimizedParameters;
}

/**
 * Seeds the update-result cache for a few frequently-updated tables at
 * application startup.
 *
 * NOTE(review): the cached value is a bare {@code new Object()} placeholder,
 * not a real update result — getCachedUpdateResult() would later serve it as
 * a hit, and a JDK-serializing RedisTemplate cannot serialize a
 * non-Serializable Object. Confirm this warm-up is intentional.
 */
@PostConstruct
public void warmupUpdateCache() {
    try {
        // Warm the cache for commonly updated tables.
        List<String> commonTables = Arrays.asList("user", "order", "product");

        for (String tableName : commonTables) {
            try {
                // Representative statement used only to derive the cache key.
                String updateSql = "UPDATE " + tableName + " SET status = ? WHERE id = ?";
                // Placeholder value — see NOTE(review) in the method Javadoc.
                Object result = new Object();
                cacheUpdateResult(tableName, updateSql, result);
            } catch (Exception e) {
                // A single table's failure must not abort the remaining warm-ups.
                log.error("预热更新缓存失败: {}", tableName, e);
            }
        }

    } catch (Exception e) {
        log.error("预热更新缓存失败", e);
    }
}

/**
 * Scheduled maintenance task: evicts expired entries from both cache
 * tiers. Runs every five minutes; any failure is logged and ignored so
 * the schedule keeps firing.
 */
@Scheduled(fixedRate = 300000) // every 5 minutes
public void cleanupExpiredCache() {
    try {
        localCache.cleanUp();       // tier 1: local cache housekeeping
        cleanupRedisExpiredCache(); // tier 2: Redis key sweep
    } catch (Exception e) {
        log.error("清理过期缓存失败", e);
    }
}

/**
 * Sweeps Redis for update-result keys whose TTL has elapsed or been lost.
 *
 * Fix: {@code redisTemplate.keys()} may return {@code null} (older Spring
 * Data Redis versions / certain configurations), which previously caused a
 * NullPointerException in the for-each loop; an empty or null key set is
 * now a no-op.
 *
 * NOTE(review): KEYS is an O(N) blocking command on the Redis server —
 * for large production instances prefer an incremental SCAN
 * (e.g. via redisTemplate.execute with ScanOptions).
 */
private void cleanupRedisExpiredCache() {
    try {
        Set<String> cacheKeys = redisTemplate.keys("redis_cache:" + UPDATE_CACHE_PREFIX + "*");
        if (cacheKeys == null || cacheKeys.isEmpty()) {
            return; // nothing to sweep
        }

        for (String key : cacheKeys) {
            // getExpire(): -2 = key already gone (delete is a harmless no-op),
            // -1 = key has no TTL — anomalous here, since every write sets a
            // 1-hour TTL — so both cases are removed.
            Long ttl = redisTemplate.getExpire(key);
            if (ttl != null && ttl <= 0) {
                redisTemplate.delete(key);
            }
        }

    } catch (Exception e) {
        log.error("清理Redis过期缓存失败", e);
    }
}
}

4.2 监控告警

4.2.1 监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
 * Micrometer metrics for data-update operations: counters for volumes and
 * exceptions, a timer for durations, and gauges for rates/throughput.
 *
 * Fix: the original gauge methods called
 * {@code Gauge.builder(name)...register(meterRegistry, value)}, which is not
 * a Micrometer API — {@code Gauge.builder} requires a state object plus a
 * value function, and {@code register} takes only the registry. Even with a
 * valid call, re-registering per invocation would pin the first value
 * forever (Micrometer ignores duplicate registrations). Gauges are now
 * backed by one {@code AtomicReference<Double>} per (metric, table),
 * registered once and updated in place.
 */
@Component
public class DataUpdateMetrics {

    private final MeterRegistry meterRegistry;

    // Backing store the gauges sample from; keyed by "metricName:tableName".
    // Fully-qualified types avoid touching the file's import block.
    private final java.util.concurrent.ConcurrentMap<String, java.util.concurrent.atomic.AtomicReference<Double>>
            gaugeValues = new java.util.concurrent.ConcurrentHashMap<>();

    public DataUpdateMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Increments the update-invocation counter for a table/strategy pair.
     */
    public void recordUpdateCount(String tableName, String strategy) {
        Counter.builder("data.update.count")
                .description("数据更新次数")
                .tag("table_name", tableName)
                .tag("strategy", strategy)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Records one update's wall-clock duration.
     *
     * @param duration elapsed time in milliseconds
     */
    public void recordUpdateTime(String tableName, String strategy, long duration) {
        Timer.builder("data.update.time")
                .description("数据更新时间")
                .tag("table_name", tableName)
                .tag("strategy", strategy)
                .register(meterRegistry)
                .record(duration, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds the number of rows touched by an update to the data counter.
     */
    public void recordUpdateDataCount(String tableName, long count) {
        Counter.builder("data.update.data.count")
                .description("数据更新数量")
                .tag("table_name", tableName)
                .register(meterRegistry)
                .increment(count);
    }

    /**
     * Publishes the latest update success rate for a table.
     */
    public void recordUpdateSuccessRate(String tableName, double successRate) {
        setGauge("data.update.success.rate", "数据更新成功率", tableName, successRate);
    }

    /**
     * Publishes the latest update failure rate for a table.
     */
    public void recordUpdateFailureRate(String tableName, double failureRate) {
        setGauge("data.update.failure.rate", "数据更新失败率", tableName, failureRate);
    }

    /**
     * Publishes the latest update throughput for a table.
     */
    public void recordUpdateThroughput(String tableName, double throughput) {
        setGauge("data.update.throughput", "数据更新吞吐量", tableName, throughput);
    }

    /**
     * Increments the exception counter for a table/exception-type pair.
     */
    public void recordUpdateExceptionCount(String tableName, String exceptionType) {
        Counter.builder("data.update.exception.count")
                .description("数据更新异常次数")
                .tag("table_name", tableName)
                .tag("exception_type", exceptionType)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Registers (once) and updates a gauge backed by a shared AtomicReference,
     * so repeated calls update the published value instead of being ignored.
     */
    private void setGauge(String metricName, String description, String tableName, double value) {
        gaugeValues.computeIfAbsent(metricName + ":" + tableName, key -> {
            java.util.concurrent.atomic.AtomicReference<Double> holder =
                    new java.util.concurrent.atomic.AtomicReference<>(value);
            Gauge.builder(metricName, holder, java.util.concurrent.atomic.AtomicReference::get)
                    .description(description)
                    .tag("table_name", tableName)
                    .register(meterRegistry);
            return holder;
        }).set(value);
    }
}

4.2.2 告警规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# prometheus-rules.yml
# Alerting rules for the data-update pipeline.
#
# NOTE(review): verify every expr against the actual /actuator/prometheus
# output before deploying. The Micrometer Prometheus registry renames
# meters: "data.update.time" is exported as data_update_time_seconds
# (unit: SECONDS, so a 5-minute P95 threshold is 300, not 300000), and
# counters gain a _total suffix (data_update_exception_count_total).
# rate() is per-SECOND: "> 5" means 5 exceptions/second, which does not
# match the "5次/分钟" wording in the description below.
groups:
- name: data_update_alerts
  rules:
  # Failure-rate gauge above 10% for 2 minutes.
  - alert: HighDataUpdateFailureRate
    expr: data_update_failure_rate > 0.1
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "数据更新失败率过高"
      description: "数据更新失败率超过10%,当前值: {{ $value }}"

  # P95 update latency threshold (see unit NOTE above).
  - alert: HighDataUpdateTime
    expr: data_update_time{quantile="0.95"} > 300000
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "数据更新时间过长"
      description: "数据更新时间P95超过5分钟,当前值: {{ $value }}ms"

  # Sustained throughput below 100 rows/second.
  - alert: LowDataUpdateThroughput
    expr: data_update_throughput < 100
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "数据更新吞吐量过低"
      description: "数据更新吞吐量低于100条/秒,当前值: {{ $value }}"

  # Exception rate threshold (see per-second NOTE above).
  - alert: HighDataUpdateExceptionCount
    expr: rate(data_update_exception_count[5m]) > 5
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "数据更新异常次数过多"
      description: "数据更新异常频率超过5次/分钟,当前值: {{ $value }}"

  # Progress counter flat for 10 minutes — likely a stalled job.
  - alert: DataUpdateProgressStalled
    expr: increase(data_update_progress[10m]) == 0
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "数据更新进度停滞"
      description: "数据更新进度在10分钟内无变化"

五、总结

千万级数据的全表更新作为大数据处理中的重要场景,通过合理的大数据更新策略和分批处理方案,能够在不影响业务运行的情况下完成大规模数据更新。本文从大数据更新策略到分批处理方案,从基础实现到企业级应用,系统梳理了千万级数据全表更新的完整解决方案。

5.1 关键要点

  1. 分批更新:将大规模更新分解为小批次处理,避免长时间锁表
  2. 增量更新:基于时间戳的增量更新,减少不必要的数据处理
  3. 并行更新:使用多线程并行处理,提高更新效率
  4. 性能优化:通过索引优化、SQL优化等手段提高更新性能
  5. 监控告警:建立完善的监控体系,及时发现和处理问题

5.2 最佳实践

  1. 策略选择:根据数据量、更新模式选择合适的更新策略
  2. 分批处理:合理设置批次大小,平衡性能和资源消耗
  3. 性能优化:优化索引、SQL和配置参数
  4. 监控告警:建立完善的监控体系,确保更新过程稳定
  5. 异常处理:完善的异常处理和重试机制

通过以上措施,可以构建一个高效、稳定、可扩展的千万级数据更新方案,为企业的各种业务场景提供数据更新支持。