前言

海量数据统计是现代大数据应用的核心功能之一,直接影响业务决策和系统性能。通过合理的统计策略和性能优化,可以构建高效、稳定、可扩展的海量数据统计系统。本文从统计策略到性能优化、从基础实现到企业级应用,系统梳理海量数据统计性能优化的完整解决方案。

一、海量数据统计架构设计

1.1 海量数据统计整体架构

1.2 海量数据统计策略架构

二、海量数据统计策略实现

2.1 实时统计策略

2.1.1 实时统计服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
/**
* 实时统计服务
*/
@Service
public class RealTimeStatisticsService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;

@Autowired
private StatisticsRepository statisticsRepository;

private final String REAL_TIME_STATISTICS_CACHE_PREFIX = "real_time_statistics:";
private final long REAL_TIME_STATISTICS_CACHE_EXPIRE = 300; // 5分钟

/**
 * Runs the real-time pipeline for one record: validate, preprocess, compute the
 * per-record result, merge it into the cache, publish an event and write the log entry.
 *
 * <p>Every failure is logged and swallowed; this method never propagates an exception.
 *
 * @param data the incoming record; {@code null} is rejected by validation and logged
 */
public void processRealTimeStatistics(StatisticsData data) {
    try {
        // 1. Reject null records and records missing an id or timestamp
        validateStatisticsData(data);

        // 2. Clean, transform and range-check the record
        StatisticsData processedData = preprocessStatisticsData(data);

        // 3. Compute the single-record statistics
        StatisticsResult result = calculateRealTimeStatistics(processedData);

        // 4. Merge into the cached aggregate
        updateRealTimeStatisticsCache(result);

        // 5. Publish the statistics event
        sendStatisticsEvent(result);

        // 6. Persist the log entry (uses the raw input record, not the processed one)
        recordStatisticsLog(data, result);

        log.info("实时统计处理完成: {}", data.getDataId());

    } catch (Exception e) {
        // Validation throws when data is null, so data must not be dereferenced
        // unconditionally here; otherwise an NPE would escape this handler.
        Object dataId = (data != null) ? data.getDataId() : null;
        log.error("实时统计处理失败: {}", dataId, e);
    }
}

/**
* 验证统计数据
*/
private void validateStatisticsData(StatisticsData data) {
if (data == null) {
throw new IllegalArgumentException("统计数据不能为空");
}

if (data.getDataId() == null) {
throw new IllegalArgumentException("数据ID不能为空");
}

if (data.getTimestamp() == null) {
throw new IllegalArgumentException("时间戳不能为空");
}
}

/**
* 数据预处理
*/
private StatisticsData preprocessStatisticsData(StatisticsData data) {
try {
// 1. 数据清洗
StatisticsData cleanedData = cleanStatisticsData(data);

// 2. 数据转换
StatisticsData transformedData = transformStatisticsData(cleanedData);

// 3. 数据验证
StatisticsData validatedData = validateProcessedData(transformedData);

return validatedData;

} catch (Exception e) {
log.error("数据预处理失败", e);
throw new StatisticsException("数据预处理失败", e);
}
}

/**
* 数据清洗
*/
private StatisticsData cleanStatisticsData(StatisticsData data) {
try {
StatisticsData cleanedData = new StatisticsData();
cleanedData.setDataId(data.getDataId());
cleanedData.setTimestamp(data.getTimestamp());

// 清洗数值字段
if (data.getNumericValue() != null) {
cleanedData.setNumericValue(data.getNumericValue());
}

// 清洗字符串字段
if (data.getStringValue() != null) {
cleanedData.setStringValue(data.getStringValue().trim());
}

// 清洗分类字段
if (data.getCategory() != null) {
cleanedData.setCategory(data.getCategory().trim().toLowerCase());
}

return cleanedData;

} catch (Exception e) {
log.error("数据清洗失败", e);
throw new StatisticsException("数据清洗失败", e);
}
}

/**
* 数据转换
*/
private StatisticsData transformStatisticsData(StatisticsData data) {
try {
StatisticsData transformedData = new StatisticsData();
transformedData.setDataId(data.getDataId());
transformedData.setTimestamp(data.getTimestamp());
transformedData.setNumericValue(data.getNumericValue());
transformedData.setStringValue(data.getStringValue());
transformedData.setCategory(data.getCategory());

// 添加计算字段
if (data.getNumericValue() != null) {
transformedData.setSquaredValue(data.getNumericValue() * data.getNumericValue());
transformedData.setLogValue(Math.log(Math.max(data.getNumericValue(), 1.0)));
}

return transformedData;

} catch (Exception e) {
log.error("数据转换失败", e);
throw new StatisticsException("数据转换失败", e);
}
}

/**
* 验证处理后的数据
*/
private StatisticsData validateProcessedData(StatisticsData data) {
try {
// 验证数值范围
if (data.getNumericValue() != null) {
if (data.getNumericValue() < 0 || data.getNumericValue() > 1000000) {
throw new IllegalArgumentException("数值超出有效范围");
}
}

// 验证字符串长度
if (data.getStringValue() != null && data.getStringValue().length() > 1000) {
throw new IllegalArgumentException("字符串长度超出限制");
}

return data;

} catch (Exception e) {
log.error("数据验证失败", e);
throw new StatisticsException("数据验证失败", e);
}
}

/**
* 实时统计计算
*/
private StatisticsResult calculateRealTimeStatistics(StatisticsData data) {
try {
StatisticsResult result = new StatisticsResult();
result.setDataId(data.getDataId());
result.setTimestamp(data.getTimestamp());
result.setCategory(data.getCategory());

// 计算基础统计指标
if (data.getNumericValue() != null) {
result.setCount(1L);
result.setSum(data.getNumericValue());
result.setAverage(data.getNumericValue());
result.setMin(data.getNumericValue());
result.setMax(data.getNumericValue());
result.setVariance(0.0);
result.setStandardDeviation(0.0);
}

// 计算分类统计
result.setCategoryCount(1L);
result.setCategorySum(data.getNumericValue());

return result;

} catch (Exception e) {
log.error("实时统计计算失败", e);
throw new StatisticsException("实时统计计算失败", e);
}
}

/**
 * Merges a per-record result into the per-category aggregate stored in Redis.
 *
 * <p>Count, sum, average, min, max, variance and standard deviation are updated and the
 * entry is written back with the configured expiry. Results that carry no numeric
 * statistics (no count/sum/min/max) are skipped. Failures are logged and swallowed.
 *
 * <p>NOTE(review): the get/modify/set sequence is not atomic; concurrent writers for the
 * same category can overwrite each other's update.
 *
 * @param result the statistics computed for a single record
 */
private void updateRealTimeStatisticsCache(StatisticsResult result) {
    try {
        Long incomingCount = result.getCount();
        Double incomingSum = result.getSum();
        Double incomingMin = result.getMin();
        Double incomingMax = result.getMax();
        if (incomingCount == null || incomingCount <= 0
                || incomingSum == null || incomingMin == null || incomingMax == null) {
            // calculateRealTimeStatistics fills these only when the record has a numeric
            // value; there is nothing to merge otherwise.
            return;
        }

        String cacheKey = REAL_TIME_STATISTICS_CACHE_PREFIX + result.getCategory();

        // Load the current aggregate, if any
        StatisticsResult existingResult = (StatisticsResult) redisTemplate.opsForValue().get(cacheKey);

        long previousCount = 0L;
        double previousSum = 0.0;
        double previousVariance = 0.0;
        Double previousMin = null;
        Double previousMax = null;
        if (existingResult == null) {
            existingResult = new StatisticsResult();
            existingResult.setCategory(result.getCategory());
        } else {
            Long cachedCount = existingResult.getCount();
            Double cachedSum = existingResult.getSum();
            if (cachedCount != null && cachedCount > 0 && cachedSum != null) {
                previousCount = cachedCount;
                previousSum = cachedSum;
                Double cachedVariance = existingResult.getVariance();
                previousVariance = (cachedVariance != null) ? cachedVariance : 0.0;
                previousMin = existingResult.getMin();
                previousMax = existingResult.getMax();
            }
        }

        long totalCount = previousCount + incomingCount;
        double totalSum = previousSum + incomingSum;
        double previousMean = previousCount > 0 ? previousSum / previousCount : 0.0;
        double incomingMean = incomingSum / incomingCount;
        Double reportedVariance = result.getVariance();
        double incomingVariance = (reportedVariance != null) ? reportedVariance : 0.0;

        // Variance cannot be derived from sum and count alone (sum - mean * count is
        // always 0), so combine the two partial variances instead.
        double variance = mergeVariance(previousCount, previousMean, previousVariance,
                incomingCount, incomingMean, incomingVariance);

        existingResult.setCount(totalCount);
        existingResult.setSum(totalSum);
        existingResult.setAverage(totalSum / totalCount);
        // No sentinel values: Double.MIN_VALUE is the smallest POSITIVE double and would
        // be reported as the max when every sample is 0.
        existingResult.setMin(previousMin != null ? Math.min(previousMin, incomingMin) : incomingMin);
        existingResult.setMax(previousMax != null ? Math.max(previousMax, incomingMax) : incomingMax);
        existingResult.setVariance(variance);
        existingResult.setStandardDeviation(Math.sqrt(variance));

        // Write back; every write restarts the expiry window
        redisTemplate.opsForValue().set(cacheKey, existingResult, Duration.ofSeconds(REAL_TIME_STATISTICS_CACHE_EXPIRE));

    } catch (Exception e) {
        log.error("更新实时统计缓存失败", e);
    }
}

/**
 * Combines the population variances of two disjoint sample sets (pairwise update).
 *
 * @param countA    number of samples in the first set
 * @param meanA     mean of the first set
 * @param varianceA population variance of the first set
 * @param countB    number of samples in the second set
 * @param meanB     mean of the second set
 * @param varianceB population variance of the second set
 * @return population variance of the union, or 0.0 when both sets are empty
 */
private static double mergeVariance(long countA, double meanA, double varianceA,
                                    long countB, double meanB, double varianceB) {
    long total = countA + countB;
    if (total <= 0) {
        return 0.0;
    }
    double delta = meanB - meanA;
    double sumSquaredDiff = varianceA * countA
            + varianceB * countB
            + delta * delta * ((double) countA * countB / total);
    // Clamp tiny negative results caused by floating-point rounding
    return Math.max(sumSquaredDiff / total, 0.0);
}

/**
* 发送统计事件
*/
private void sendStatisticsEvent(StatisticsResult result) {
try {
StatisticsEvent event = new StatisticsEvent();
event.setDataId(result.getDataId());
event.setCategory(result.getCategory());
event.setCount(result.getCount());
event.setSum(result.getSum());
event.setAverage(result.getAverage());
event.setEventTime(LocalDateTime.now());
event.setEventType("REAL_TIME_STATISTICS");

kafkaTemplate.send("statistics.event.topic", event);

} catch (Exception e) {
log.error("发送统计事件失败", e);
}
}

/**
 * Builds a log entry from the input record and its computed result and saves it
 * asynchronously through the repository.
 *
 * <p>The save runs via {@code CompletableFuture.runAsync} without an explicit executor.
 * Failures while building or saving the entry are logged and swallowed.
 *
 * @param data   the original input record
 * @param result the statistics computed for that record
 */
private void recordStatisticsLog(StatisticsData data, StatisticsResult result) {
    try {
        // Must not be named "log": that would shadow the logger, and the log.error(...)
        // call inside the lambda below would resolve to the entity.
        StatisticsLog logEntry = new StatisticsLog();
        logEntry.setDataId(data.getDataId());
        logEntry.setCategory(data.getCategory());
        logEntry.setNumericValue(data.getNumericValue());
        logEntry.setStringValue(data.getStringValue());
        logEntry.setCount(result.getCount());
        logEntry.setSum(result.getSum());
        logEntry.setAverage(result.getAverage());
        logEntry.setProcessTime(LocalDateTime.now());

        // Persist off the calling thread
        CompletableFuture.runAsync(() -> {
            try {
                statisticsRepository.saveStatisticsLog(logEntry);
            } catch (Exception e) {
                log.error("保存统计日志失败", e);
            }
        });

    } catch (Exception e) {
        log.error("记录统计日志失败", e);
    }
}
}

2.2 离线统计策略

2.2.1 离线统计服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
/**
* 离线统计服务
*/
@Service
public class OfflineStatisticsService {

@Autowired
private StatisticsRepository statisticsRepository;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private ThreadPoolTaskExecutor taskExecutor;

private final String OFFLINE_STATISTICS_CACHE_PREFIX = "offline_statistics:";
private final int BATCH_SIZE = 1000; // 批量处理大小

/**
 * Runs an offline statistics job: validate the request, load the matching records,
 * process them in batches and generate the report.
 *
 * <p>Every failure is logged and swallowed; this method never propagates an exception.
 *
 * @param request the job description; {@code null} is rejected by validation and logged
 */
public void processOfflineStatistics(OfflineStatisticsRequest request) {
    try {
        // 1. Validate the request (non-null, has id, has a well-ordered time range)
        validateOfflineStatisticsRequest(request);

        // 2. Load and preprocess the records
        List<StatisticsData> dataList = queryStatisticsData(request);

        // 3. Process the records in batches
        batchProcessStatisticsData(dataList, request);

        // 4. Generate the report from the accumulated result
        generateStatisticsReport(request);

        log.info("离线统计处理完成: {}", request.getRequestId());

    } catch (Exception e) {
        // Validation throws when request is null, so request must not be dereferenced
        // unconditionally here; otherwise an NPE would escape this handler.
        Object requestId = (request != null) ? request.getRequestId() : null;
        log.error("离线统计处理失败: {}", requestId, e);
    }
}

/**
* 验证离线统计请求
*/
private void validateOfflineStatisticsRequest(OfflineStatisticsRequest request) {
if (request == null) {
throw new IllegalArgumentException("离线统计请求不能为空");
}

if (request.getRequestId() == null) {
throw new IllegalArgumentException("请求ID不能为空");
}

if (request.getStartTime() == null || request.getEndTime() == null) {
throw new IllegalArgumentException("时间范围不能为空");
}

if (request.getStartTime().isAfter(request.getEndTime())) {
throw new IllegalArgumentException("开始时间不能晚于结束时间");
}
}

/**
* 查询统计数据
*/
private List<StatisticsData> queryStatisticsData(OfflineStatisticsRequest request) {
try {
// 1. 构建查询条件
StatisticsQuery query = buildStatisticsQuery(request);

// 2. 执行查询
List<StatisticsData> dataList = statisticsRepository.findByQuery(query);

// 3. 数据预处理
List<StatisticsData> processedDataList = preprocessDataList(dataList);

return processedDataList;

} catch (Exception e) {
log.error("查询统计数据失败", e);
throw new StatisticsException("查询统计数据失败", e);
}
}

/**
* 构建统计查询
*/
private StatisticsQuery buildStatisticsQuery(OfflineStatisticsRequest request) {
try {
StatisticsQuery query = new StatisticsQuery();
query.setStartTime(request.getStartTime());
query.setEndTime(request.getEndTime());
query.setCategory(request.getCategory());
query.setLimit(request.getLimit());
query.setOffset(request.getOffset());

return query;

} catch (Exception e) {
log.error("构建统计查询失败", e);
throw new StatisticsException("构建统计查询失败", e);
}
}

/**
* 数据预处理
*/
private List<StatisticsData> preprocessDataList(List<StatisticsData> dataList) {
try {
List<StatisticsData> processedList = new ArrayList<>();

for (StatisticsData data : dataList) {
try {
// 数据清洗
StatisticsData cleanedData = cleanData(data);

// 数据验证
if (validateData(cleanedData)) {
processedList.add(cleanedData);
}

} catch (Exception e) {
log.error("数据预处理失败: {}", data.getDataId(), e);
}
}

return processedList;

} catch (Exception e) {
log.error("数据预处理失败", e);
throw new StatisticsException("数据预处理失败", e);
}
}

/**
* 数据清洗
*/
private StatisticsData cleanData(StatisticsData data) {
try {
StatisticsData cleanedData = new StatisticsData();
cleanedData.setDataId(data.getDataId());
cleanedData.setTimestamp(data.getTimestamp());

// 清洗数值字段
if (data.getNumericValue() != null) {
cleanedData.setNumericValue(data.getNumericValue());
}

// 清洗字符串字段
if (data.getStringValue() != null) {
cleanedData.setStringValue(data.getStringValue().trim());
}

// 清洗分类字段
if (data.getCategory() != null) {
cleanedData.setCategory(data.getCategory().trim().toLowerCase());
}

return cleanedData;

} catch (Exception e) {
log.error("数据清洗失败", e);
throw new StatisticsException("数据清洗失败", e);
}
}

/**
* 数据验证
*/
private boolean validateData(StatisticsData data) {
try {
// 验证必要字段
if (data.getDataId() == null || data.getTimestamp() == null) {
return false;
}

// 验证数值范围
if (data.getNumericValue() != null) {
if (data.getNumericValue() < 0 || data.getNumericValue() > 1000000) {
return false;
}
}

// 验证字符串长度
if (data.getStringValue() != null && data.getStringValue().length() > 1000) {
return false;
}

return true;

} catch (Exception e) {
log.error("数据验证失败", e);
return false;
}
}

/**
 * Splits the records into chunks of {@code BATCH_SIZE}, processes each chunk on
 * {@code taskExecutor}, and blocks until every chunk has finished.
 *
 * <p>Waiting matters because the caller generates the report from the accumulated
 * result immediately after this method returns; without it the report could be built
 * from a partial (or empty) aggregate. Failures are logged and swallowed.
 *
 * @param dataList the preprocessed records to aggregate
 * @param request  the job the records belong to
 */
private void batchProcessStatisticsData(List<StatisticsData> dataList, OfflineStatisticsRequest request) {
    try {
        List<CompletableFuture<Void>> pending = new ArrayList<>();

        for (int start = 0; start < dataList.size(); start += BATCH_SIZE) {
            int end = Math.min(start + BATCH_SIZE, dataList.size());
            List<StatisticsData> batch = dataList.subList(start, end);

            // Submit each chunk to the injected executor
            pending.add(CompletableFuture.runAsync(
                    () -> processBatchStatisticsData(batch, request), taskExecutor));
        }

        // Block until all chunks are done so the aggregate is complete on return
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

    } catch (Exception e) {
        log.error("批量处理统计数据失败", e);
    }
}

/**
* 处理一批统计数据
*/
private void processBatchStatisticsData(List<StatisticsData> batch, OfflineStatisticsRequest request) {
try {
// 1. 计算统计指标
StatisticsResult result = calculateBatchStatistics(batch);

// 2. 更新统计结果
updateOfflineStatisticsResult(result, request);

// 3. 记录处理日志
recordBatchProcessingLog(batch, result, request);

} catch (Exception e) {
log.error("处理批量统计数据失败", e);
}
}

/**
 * Computes count, sum, average, min, max, variance and standard deviation for one batch.
 *
 * <p>{@code count} is the number of records in the batch. Sum, average, min and max are
 * computed over the records that have a numeric value; when none has one, average, min
 * and max are reported as 0.0. An empty batch yields a result with only count set.
 *
 * @param batch the records of one batch
 * @return the statistics for the batch
 * @throws StatisticsException if the computation fails
 */
private StatisticsResult calculateBatchStatistics(List<StatisticsData> batch) {
    try {
        StatisticsResult result = new StatisticsResult();
        result.setCount((long) batch.size());

        if (batch.isEmpty()) {
            return result;
        }

        double sum = 0.0;
        // Infinities are safe identity values for min/max. Double.MIN_VALUE is the
        // smallest POSITIVE double, not the most negative one, so it is not a valid
        // starting point for a running maximum.
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        int numericCount = 0;

        for (StatisticsData data : batch) {
            if (data.getNumericValue() != null) {
                double value = data.getNumericValue();
                sum += value;
                min = Math.min(min, value);
                max = Math.max(max, value);
                numericCount++;
            }
        }

        boolean hasNumeric = numericCount > 0;
        result.setSum(sum);
        // Divide by the number of numeric values: calculateVariance skips null values
        // and divides by that same count, so both must refer to the same population.
        result.setAverage(hasNumeric ? sum / numericCount : 0.0);
        result.setMin(hasNumeric ? min : 0.0);
        result.setMax(hasNumeric ? max : 0.0);

        // Variance and standard deviation around the mean computed above
        double variance = calculateVariance(batch, result.getAverage());
        result.setVariance(variance);
        result.setStandardDeviation(Math.sqrt(variance));

        return result;

    } catch (Exception e) {
        log.error("计算批量统计指标失败", e);
        throw new StatisticsException("计算批量统计指标失败", e);
    }
}

/**
* 计算方差
*/
private double calculateVariance(List<StatisticsData> batch, double mean) {
try {
double sumSquaredDiff = 0.0;
int count = 0;

for (StatisticsData data : batch) {
if (data.getNumericValue() != null) {
double diff = data.getNumericValue() - mean;
sumSquaredDiff += diff * diff;
count++;
}
}

return count > 0 ? sumSquaredDiff / count : 0.0;

} catch (Exception e) {
log.error("计算方差失败", e);
return 0.0;
}
}

/**
 * Merges one batch result into the per-request aggregate stored in Redis (24-hour expiry).
 *
 * <p>Batches are processed concurrently on {@code taskExecutor}, so the
 * get/modify/set sequence is serialized on this instance to avoid lost updates.
 * Failures are logged and swallowed.
 *
 * <p>NOTE(review): the lock only covers this JVM; several application instances
 * updating the same request id would still race. Variance and standard deviation are
 * not merged here although the report reads them from the same cached object.
 *
 * @param result  the statistics of one batch
 * @param request the job whose aggregate is updated
 */
private void updateOfflineStatisticsResult(StatisticsResult result, OfflineStatisticsRequest request) {
    try {
        String cacheKey = OFFLINE_STATISTICS_CACHE_PREFIX + request.getRequestId();

        synchronized (this) {
            // Load the current aggregate, if any
            StatisticsResult existingResult = (StatisticsResult) redisTemplate.opsForValue().get(cacheKey);

            boolean firstBatch = (existingResult == null);
            if (firstBatch) {
                existingResult = new StatisticsResult();
                existingResult.setCount(0L);
                existingResult.setSum(0.0);
            }

            existingResult.setCount(existingResult.getCount() + result.getCount());
            existingResult.setSum(existingResult.getSum() + result.getSum());
            existingResult.setAverage(existingResult.getSum() / existingResult.getCount());

            // Take the first batch's min/max as-is instead of seeding with sentinels:
            // Double.MIN_VALUE is the smallest POSITIVE double and would survive as the
            // max when every value is 0.
            if (firstBatch) {
                existingResult.setMin(result.getMin());
                existingResult.setMax(result.getMax());
            } else {
                existingResult.setMin(Math.min(existingResult.getMin(), result.getMin()));
                existingResult.setMax(Math.max(existingResult.getMax(), result.getMax()));
            }

            // Write back
            redisTemplate.opsForValue().set(cacheKey, existingResult, Duration.ofHours(24));
        }

    } catch (Exception e) {
        log.error("更新离线统计结果失败", e);
    }
}

/**
 * Builds a processing-log entry for one batch and saves it asynchronously through
 * the repository.
 *
 * <p>Failures while building or saving the entry are logged and swallowed.
 *
 * @param batch   the records that were processed
 * @param result  the statistics computed for the batch
 * @param request the job the batch belongs to
 */
private void recordBatchProcessingLog(List<StatisticsData> batch, StatisticsResult result, OfflineStatisticsRequest request) {
    try {
        // Must not be named "log": that would shadow the logger, and the log.error(...)
        // call inside the lambda below would resolve to the entity.
        BatchProcessingLog logEntry = new BatchProcessingLog();
        logEntry.setRequestId(request.getRequestId());
        logEntry.setBatchSize(batch.size());
        logEntry.setCount(result.getCount());
        logEntry.setSum(result.getSum());
        logEntry.setAverage(result.getAverage());
        logEntry.setProcessTime(LocalDateTime.now());

        // Persist off the calling thread
        CompletableFuture.runAsync(() -> {
            try {
                statisticsRepository.saveBatchProcessingLog(logEntry);
            } catch (Exception e) {
                log.error("保存批量处理日志失败", e);
            }
        });

    } catch (Exception e) {
        log.error("记录批量处理日志失败", e);
    }
}

/**
* 生成统计报告
*/
private void generateStatisticsReport(OfflineStatisticsRequest request) {
try {
String cacheKey = OFFLINE_STATISTICS_CACHE_PREFIX + request.getRequestId();
StatisticsResult result = (StatisticsResult) redisTemplate.opsForValue().get(cacheKey);

if (result != null) {
StatisticsReport report = new StatisticsReport();
report.setRequestId(request.getRequestId());
report.setStartTime(request.getStartTime());
report.setEndTime(request.getEndTime());
report.setCount(result.getCount());
report.setSum(result.getSum());
report.setAverage(result.getAverage());
report.setMin(result.getMin());
report.setMax(result.getMax());
report.setVariance(result.getVariance());
report.setStandardDeviation(result.getStandardDeviation());
report.setGenerateTime(LocalDateTime.now());

// 保存统计报告
statisticsRepository.saveStatisticsReport(report);

log.info("统计报告生成完成: {}", request.getRequestId());
}

} catch (Exception e) {
log.error("生成统计报告失败", e);
}
}
}

2.3 增量统计策略

2.3.1 增量统计服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
/**
* 增量统计服务
*/
@Service
public class IncrementalStatisticsService {

@Autowired
private StatisticsRepository statisticsRepository;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

private final String INCREMENTAL_STATISTICS_CACHE_PREFIX = "incremental_statistics:";
private final String INCREMENTAL_STATISTICS_OFFSET_PREFIX = "incremental_offset:";

/**
 * Runs one incremental statistics pass: validate the request, fetch the next slice of
 * data, aggregate it and advance the stored offset.
 *
 * <p>Every failure is logged and swallowed; this method never propagates an exception.
 *
 * <p>NOTE(review): processIncrementalData swallows its own failures, so the offset is
 * advanced even when aggregation failed — confirm whether that data loss is acceptable.
 *
 * @param request the pass description; {@code null} is rejected by validation and logged
 */
public void processIncrementalStatistics(IncrementalStatisticsRequest request) {
    try {
        // 1. Validate the request (non-null, has id, has data source)
        validateIncrementalStatisticsRequest(request);

        // 2. Fetch and preprocess the next slice
        List<StatisticsData> incrementalData = getIncrementalData(request);

        // 3. Aggregate the slice
        processIncrementalData(incrementalData, request);

        // 4. Advance the offset
        updateOffset(request);

        log.info("增量统计处理完成: {}", request.getRequestId());

    } catch (Exception e) {
        // Validation throws when request is null, so request must not be dereferenced
        // unconditionally here; otherwise an NPE would escape this handler.
        Object requestId = (request != null) ? request.getRequestId() : null;
        log.error("增量统计处理失败: {}", requestId, e);
    }
}

/**
* 验证增量统计请求
*/
private void validateIncrementalStatisticsRequest(IncrementalStatisticsRequest request) {
if (request == null) {
throw new IllegalArgumentException("增量统计请求不能为空");
}

if (request.getRequestId() == null) {
throw new IllegalArgumentException("请求ID不能为空");
}

if (request.getDataSource() == null) {
throw new IllegalArgumentException("数据源不能为空");
}
}

/**
* 获取增量数据
*/
private List<StatisticsData> getIncrementalData(IncrementalStatisticsRequest request) {
try {
// 1. 获取偏移量
Long offset = getOffset(request);

// 2. 查询增量数据
List<StatisticsData> incrementalData = statisticsRepository.findIncrementalData(
request.getDataSource(), offset, request.getBatchSize());

// 3. 数据预处理
List<StatisticsData> processedData = preprocessIncrementalData(incrementalData);

return processedData;

} catch (Exception e) {
log.error("获取增量数据失败", e);
throw new StatisticsException("获取增量数据失败", e);
}
}

/**
 * Reads the stored offset for the request's data source.
 *
 * <p>The cached value is read as a generic object because a
 * {@code RedisTemplate<String, Object>} may hand back an {@code Integer}, a {@code Long}
 * or a {@code String} depending on the configured value serializer; a hard cast to
 * {@code Long} would fail for the other types and silently reset the offset to 0.
 *
 * @param request the pass whose data source identifies the offset
 * @return the stored offset, or {@code 0L} when none is stored or reading fails
 */
private Long getOffset(IncrementalStatisticsRequest request) {
    try {
        String offsetKey = INCREMENTAL_STATISTICS_OFFSET_PREFIX + request.getDataSource();
        Object cached = redisTemplate.opsForValue().get(offsetKey);

        if (cached == null) {
            return 0L;
        }
        if (cached instanceof Number) {
            return ((Number) cached).longValue();
        }
        // Fall back to the textual form (e.g. a string serializer)
        return Long.parseLong(cached.toString().trim());

    } catch (Exception e) {
        // NOTE(review): falling back to 0 restarts from the beginning; confirm this is
        // the intended behavior when Redis is unavailable.
        log.error("获取偏移量失败", e);
        return 0L;
    }
}

/**
* 预处理增量数据
*/
private List<StatisticsData> preprocessIncrementalData(List<StatisticsData> dataList) {
try {
List<StatisticsData> processedList = new ArrayList<>();

for (StatisticsData data : dataList) {
try {
// 数据清洗
StatisticsData cleanedData = cleanIncrementalData(data);

// 数据验证
if (validateIncrementalData(cleanedData)) {
processedList.add(cleanedData);
}

} catch (Exception e) {
log.error("预处理增量数据失败: {}", data.getDataId(), e);
}
}

return processedList;

} catch (Exception e) {
log.error("预处理增量数据失败", e);
throw new StatisticsException("预处理增量数据失败", e);
}
}

/**
* 清洗增量数据
*/
private StatisticsData cleanIncrementalData(StatisticsData data) {
try {
StatisticsData cleanedData = new StatisticsData();
cleanedData.setDataId(data.getDataId());
cleanedData.setTimestamp(data.getTimestamp());

// 清洗数值字段
if (data.getNumericValue() != null) {
cleanedData.setNumericValue(data.getNumericValue());
}

// 清洗字符串字段
if (data.getStringValue() != null) {
cleanedData.setStringValue(data.getStringValue().trim());
}

// 清洗分类字段
if (data.getCategory() != null) {
cleanedData.setCategory(data.getCategory().trim().toLowerCase());
}

return cleanedData;

} catch (Exception e) {
log.error("清洗增量数据失败", e);
throw new StatisticsException("清洗增量数据失败", e);
}
}

/**
* 验证增量数据
*/
private boolean validateIncrementalData(StatisticsData data) {
try {
// 验证必要字段
if (data.getDataId() == null || data.getTimestamp() == null) {
return false;
}

// 验证数值范围
if (data.getNumericValue() != null) {
if (data.getNumericValue() < 0 || data.getNumericValue() > 1000000) {
return false;
}
}

// 验证字符串长度
if (data.getStringValue() != null && data.getStringValue().length() > 1000) {
return false;
}

return true;

} catch (Exception e) {
log.error("验证增量数据失败", e);
return false;
}
}

/**
* 处理增量数据
*/
private void processIncrementalData(List<StatisticsData> incrementalData, IncrementalStatisticsRequest request) {
try {
if (incrementalData.isEmpty()) {
return;
}

// 1. 计算增量统计指标
StatisticsResult result = calculateIncrementalStatistics(incrementalData);

// 2. 更新统计结果
updateIncrementalStatisticsResult(result, request);

// 3. 发送增量统计事件
sendIncrementalStatisticsEvent(result, request);

// 4. 记录处理日志
recordIncrementalProcessingLog(incrementalData, result, request);

} catch (Exception e) {
log.error("处理增量数据失败", e);
}
}

/**
 * Computes count, sum, average, min, max, variance and standard deviation for one
 * slice of incremental data.
 *
 * <p>{@code count} is the number of records in the slice. Sum, average, min and max are
 * computed over the records that have a numeric value; when none has one, average, min
 * and max are reported as 0.0. An empty slice yields a result with only count set.
 *
 * @param incrementalData the records of the slice
 * @return the statistics for the slice
 * @throws StatisticsException if the computation fails
 */
private StatisticsResult calculateIncrementalStatistics(List<StatisticsData> incrementalData) {
    try {
        StatisticsResult result = new StatisticsResult();
        result.setCount((long) incrementalData.size());

        if (incrementalData.isEmpty()) {
            return result;
        }

        double sum = 0.0;
        // Infinities are safe identity values for min/max. Double.MIN_VALUE is the
        // smallest POSITIVE double, not the most negative one, so it is not a valid
        // starting point for a running maximum.
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        int numericCount = 0;

        for (StatisticsData data : incrementalData) {
            if (data.getNumericValue() != null) {
                double value = data.getNumericValue();
                sum += value;
                min = Math.min(min, value);
                max = Math.max(max, value);
                numericCount++;
            }
        }

        boolean hasNumeric = numericCount > 0;
        result.setSum(sum);
        // Divide by the number of numeric values: calculateIncrementalVariance skips
        // null values and divides by that same count, so both must refer to the same
        // population.
        result.setAverage(hasNumeric ? sum / numericCount : 0.0);
        result.setMin(hasNumeric ? min : 0.0);
        result.setMax(hasNumeric ? max : 0.0);

        // Variance and standard deviation around the mean computed above
        double variance = calculateIncrementalVariance(incrementalData, result.getAverage());
        result.setVariance(variance);
        result.setStandardDeviation(Math.sqrt(variance));

        return result;

    } catch (Exception e) {
        log.error("计算增量统计指标失败", e);
        throw new StatisticsException("计算增量统计指标失败", e);
    }
}

/**
* 计算增量方差
*/
private double calculateIncrementalVariance(List<StatisticsData> incrementalData, double mean) {
try {
double sumSquaredDiff = 0.0;
int count = 0;

for (StatisticsData data : incrementalData) {
if (data.getNumericValue() != null) {
double diff = data.getNumericValue() - mean;
sumSquaredDiff += diff * diff;
count++;
}
}

return count > 0 ? sumSquaredDiff / count : 0.0;

} catch (Exception e) {
log.error("计算增量方差失败", e);
return 0.0;
}
}

/**
 * Merges one slice result into the per-data-source aggregate stored in Redis
 * (24-hour expiry). Failures are logged and swallowed.
 *
 * <p>NOTE(review): the get/modify/set sequence is not atomic; concurrent passes for the
 * same data source can overwrite each other's update.
 *
 * @param result  the statistics of one slice
 * @param request the pass whose data source identifies the aggregate
 */
private void updateIncrementalStatisticsResult(StatisticsResult result, IncrementalStatisticsRequest request) {
    try {
        String cacheKey = INCREMENTAL_STATISTICS_CACHE_PREFIX + request.getDataSource();

        // Load the current aggregate, if any
        StatisticsResult existingResult = (StatisticsResult) redisTemplate.opsForValue().get(cacheKey);

        boolean firstSlice = (existingResult == null);
        if (firstSlice) {
            existingResult = new StatisticsResult();
            existingResult.setCount(0L);
            existingResult.setSum(0.0);
        }

        existingResult.setCount(existingResult.getCount() + result.getCount());
        existingResult.setSum(existingResult.getSum() + result.getSum());
        existingResult.setAverage(existingResult.getSum() / existingResult.getCount());

        // Take the first slice's min/max as-is instead of seeding with sentinels:
        // Double.MIN_VALUE is the smallest POSITIVE double and would survive as the
        // max when every value is 0.
        if (firstSlice) {
            existingResult.setMin(result.getMin());
            existingResult.setMax(result.getMax());
        } else {
            existingResult.setMin(Math.min(existingResult.getMin(), result.getMin()));
            existingResult.setMax(Math.max(existingResult.getMax(), result.getMax()));
        }

        // Write back
        redisTemplate.opsForValue().set(cacheKey, existingResult, Duration.ofHours(24));

    } catch (Exception e) {
        log.error("更新增量统计结果失败", e);
    }
}

/**
* 发送增量统计事件
*/
private void sendIncrementalStatisticsEvent(StatisticsResult result, IncrementalStatisticsRequest request) {
try {
IncrementalStatisticsEvent event = new IncrementalStatisticsEvent();
event.setRequestId(request.getRequestId());
event.setDataSource(request.getDataSource());
event.setCount(result.getCount());
event.setSum(result.getSum());
event.setAverage(result.getAverage());
event.setEventTime(LocalDateTime.now());
event.setEventType("INCREMENTAL_STATISTICS");

kafkaTemplate.send("incremental.statistics.event.topic", event);

} catch (Exception e) {
log.error("发送增量统计事件失败", e);
}
}

/**
 * Builds a processing-log entry for one incremental pass and saves it asynchronously
 * through the repository.
 *
 * <p>Failures while building or saving the entry are logged and swallowed.
 *
 * @param incrementalData the records that were processed
 * @param result          the statistics computed for the slice
 * @param request         the pass the slice belongs to
 */
private void recordIncrementalProcessingLog(List<StatisticsData> incrementalData, StatisticsResult result, IncrementalStatisticsRequest request) {
    try {
        // Must not be named "log": that would shadow the logger, and the log.error(...)
        // call inside the lambda below would resolve to the entity.
        IncrementalProcessingLog logEntry = new IncrementalProcessingLog();
        logEntry.setRequestId(request.getRequestId());
        logEntry.setDataSource(request.getDataSource());
        logEntry.setDataCount(incrementalData.size());
        logEntry.setCount(result.getCount());
        logEntry.setSum(result.getSum());
        logEntry.setAverage(result.getAverage());
        logEntry.setProcessTime(LocalDateTime.now());

        // Persist off the calling thread
        CompletableFuture.runAsync(() -> {
            try {
                statisticsRepository.saveIncrementalProcessingLog(logEntry);
            } catch (Exception e) {
                log.error("保存增量处理日志失败", e);
            }
        });

    } catch (Exception e) {
        log.error("记录增量处理日志失败", e);
    }
}

/**
* Advances the stored offset for the request's data source by the request's batch size
* and writes it back to Redis with a 7-day expiry. Failures are logged and swallowed.
*
* NOTE(review): the offset moves by the REQUESTED batch size, not by the number of rows
* actually fetched; if a fetch returns fewer rows than requested, rows appended later
* inside the skipped range would presumably never be read — confirm against
* findIncrementalData's semantics.
* NOTE(review): the read (getOffset) and the write below are separate Redis calls, so
* concurrent passes for the same data source can overwrite each other's offset.
*
* @param request the pass whose data source and batch size drive the update
*/
private void updateOffset(IncrementalStatisticsRequest request) {
try {
String offsetKey = INCREMENTAL_STATISTICS_OFFSET_PREFIX + request.getDataSource();

// Read the current offset (getOffset returns 0 when none is stored or on failure)
Long currentOffset = getOffset(request);

// Advance by the requested batch size and persist
Long newOffset = currentOffset + request.getBatchSize();
redisTemplate.opsForValue().set(offsetKey, newOffset, Duration.ofDays(7));

} catch (Exception e) {
log.error("更新偏移量失败", e);
}
}
}

三、企业级海量数据统计方案

3.1 数据统计管理服务

3.1.1 数据统计管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/**
* 数据统计管理服务
*/
@Service
public class DataStatisticsManagementService {

@Autowired
private RealTimeStatisticsService realTimeService;

@Autowired
private OfflineStatisticsService offlineService;

@Autowired
private IncrementalStatisticsService incrementalService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private final String DATA_STATISTICS_MANAGEMENT_CACHE_PREFIX = "data_statistics_management:";
private final long DATA_STATISTICS_MANAGEMENT_CACHE_EXPIRE = 3600; // 1小时

/**
 * Entry point that validates the request, picks a statistics strategy, runs it and
 * records usage metrics for the run.
 *
 * @param request the statistics request
 * @return the result produced by the selected strategy
 * @throws StatisticsException if validation, strategy selection or execution fails
 *         (the original exception is kept as the cause)
 */
public StatisticsResult executeDataStatistics(DataStatisticsRequest request) {
    try {
        // 1. Validate the request (non-null, has id, has statistics type)
        validateDataStatisticsRequest(request);

        // 2. Choose the strategy
        StatisticsStrategy strategy = selectStatisticsStrategy(request);

        // 3. Run the strategy
        StatisticsResult result = executeStatisticsStrategy(request, strategy);

        // 4. Record metrics about this statistics run
        recordStatisticsStatistics(request, strategy);

        return result;

    } catch (Exception e) {
        // Validation throws when request is null, so request must not be dereferenced
        // unconditionally here; otherwise an NPE would replace the StatisticsException.
        Object requestId = (request != null) ? request.getRequestId() : null;
        log.error("执行数据统计失败: {}", requestId, e);
        throw new StatisticsException("执行数据统计失败", e);
    }
}

/**
* 验证数据统计请求
*/
private void validateDataStatisticsRequest(DataStatisticsRequest request) {
if (request == null) {
throw new IllegalArgumentException("数据统计请求不能为空");
}

if (request.getRequestId() == null) {
throw new IllegalArgumentException("请求ID不能为空");
}

if (request.getStatisticsType() == null) {
throw new IllegalArgumentException("统计类型不能为空");
}
}

/**
* 选择统计策略
*/
private StatisticsStrategy selectStatisticsStrategy(DataStatisticsRequest request) {
try {
if (request.getStrategy() != null) {
return request.getStrategy();
}

// 根据统计类型选择策略
switch (request.getStatisticsType()) {
case REAL_TIME:
return StatisticsStrategy.REAL_TIME;
case OFFLINE:
return StatisticsStrategy.OFFLINE;
case INCREMENTAL:
return StatisticsStrategy.INCREMENTAL;
default:
return StatisticsStrategy.REAL_TIME; // 默认策略
}

} catch (Exception e) {
log.error("选择统计策略失败", e);
return StatisticsStrategy.REAL_TIME; // 默认策略
}
}

/**
* 执行统计策略
*/
private StatisticsResult executeStatisticsStrategy(DataStatisticsRequest request, StatisticsStrategy strategy) {
try {
switch (strategy) {
case REAL_TIME:
return executeRealTimeStrategy(request);
case OFFLINE:
return executeOfflineStrategy(request);
case INCREMENTAL:
return executeIncrementalStrategy(request);
default:
throw new IllegalArgumentException("不支持的统计策略: " + strategy);
}

} catch (Exception e) {
log.error("执行统计策略失败", e);
throw new StatisticsException("执行统计策略失败", e);
}
}

/**
* 执行实时策略
*/
private StatisticsResult executeRealTimeStrategy(DataStatisticsRequest request) {
try {
// 构建实时统计请求
StatisticsData data = buildStatisticsData(request);

// 执行实时统计
realTimeService.processRealTimeStatistics(data);

// 返回统计结果
return getRealTimeStatisticsResult(request);

} catch (Exception e) {
log.error("执行实时策略失败", e);
throw new StatisticsException("执行实时策略失败", e);
}
}

/**
* 执行离线策略
*/
private StatisticsResult executeOfflineStrategy(DataStatisticsRequest request) {
try {
// 构建离线统计请求
OfflineStatisticsRequest offlineRequest = buildOfflineStatisticsRequest(request);

// 执行离线统计
offlineService.processOfflineStatistics(offlineRequest);

// 返回统计结果
return getOfflineStatisticsResult(request);

} catch (Exception e) {
log.error("执行离线策略失败", e);
throw new StatisticsException("执行离线策略失败", e);
}
}

/**
* 执行增量策略
*/
private StatisticsResult executeIncrementalStrategy(DataStatisticsRequest request) {
try {
// 构建增量统计请求
IncrementalStatisticsRequest incrementalRequest = buildIncrementalStatisticsRequest(request);

// 执行增量统计
incrementalService.processIncrementalStatistics(incrementalRequest);

// 返回统计结果
return getIncrementalStatisticsResult(request);

} catch (Exception e) {
log.error("执行增量策略失败", e);
throw new StatisticsException("执行增量策略失败", e);
}
}

/**
* 构建统计数据
*/
private StatisticsData buildStatisticsData(DataStatisticsRequest request) {
try {
StatisticsData data = new StatisticsData();
data.setDataId(request.getRequestId());
data.setTimestamp(LocalDateTime.now());
data.setNumericValue(request.getNumericValue());
data.setStringValue(request.getStringValue());
data.setCategory(request.getCategory());

return data;

} catch (Exception e) {
log.error("构建统计数据失败", e);
throw new StatisticsException("构建统计数据失败", e);
}
}

/**
* 构建离线统计请求
*/
private OfflineStatisticsRequest buildOfflineStatisticsRequest(DataStatisticsRequest request) {
try {
OfflineStatisticsRequest offlineRequest = new OfflineStatisticsRequest();
offlineRequest.setRequestId(request.getRequestId());
offlineRequest.setStartTime(request.getStartTime());
offlineRequest.setEndTime(request.getEndTime());
offlineRequest.setCategory(request.getCategory());
offlineRequest.setLimit(request.getLimit());
offlineRequest.setOffset(request.getOffset());

return offlineRequest;

} catch (Exception e) {
log.error("构建离线统计请求失败", e);
throw new StatisticsException("构建离线统计请求失败", e);
}
}

/**
* 构建增量统计请求
*/
private IncrementalStatisticsRequest buildIncrementalStatisticsRequest(DataStatisticsRequest request) {
try {
IncrementalStatisticsRequest incrementalRequest = new IncrementalStatisticsRequest();
incrementalRequest.setRequestId(request.getRequestId());
incrementalRequest.setDataSource(request.getDataSource());
incrementalRequest.setBatchSize(request.getBatchSize());

return incrementalRequest;

} catch (Exception e) {
log.error("构建增量统计请求失败", e);
throw new StatisticsException("构建增量统计请求失败", e);
}
}

/**
* 获取实时统计结果
*/
private StatisticsResult getRealTimeStatisticsResult(DataStatisticsRequest request) {
try {
String cacheKey = REAL_TIME_STATISTICS_CACHE_PREFIX + request.getCategory();
return (StatisticsResult) redisTemplate.opsForValue().get(cacheKey);

} catch (Exception e) {
log.error("获取实时统计结果失败", e);
return new StatisticsResult();
}
}

/**
* 获取离线统计结果
*/
private StatisticsResult getOfflineStatisticsResult(DataStatisticsRequest request) {
try {
String cacheKey = OFFLINE_STATISTICS_CACHE_PREFIX + request.getRequestId();
return (StatisticsResult) redisTemplate.opsForValue().get(cacheKey);

} catch (Exception e) {
log.error("获取离线统计结果失败", e);
return new StatisticsResult();
}
}

/**
* 获取增量统计结果
*/
private StatisticsResult getIncrementalStatisticsResult(DataStatisticsRequest request) {
try {
String cacheKey = INCREMENTAL_STATISTICS_CACHE_PREFIX + request.getDataSource();
return (StatisticsResult) redisTemplate.opsForValue().get(cacheKey);

} catch (Exception e) {
log.error("获取增量统计结果失败", e);
return new StatisticsResult();
}
}

/**
* 记录统计统计
*/
private void recordStatisticsStatistics(DataStatisticsRequest request, StatisticsStrategy strategy) {
try {
StatisticsStatistics statistics = new StatisticsStatistics();
statistics.setRequestId(request.getRequestId());
statistics.setStrategy(strategy);
statistics.setStatisticsType(request.getStatisticsType());
statistics.setExecutionTime(LocalDateTime.now());

// 异步记录统计
CompletableFuture.runAsync(() -> {
try {
saveStatisticsStatistics(statistics);
} catch (Exception e) {
log.error("保存统计统计失败", e);
}
});

} catch (Exception e) {
log.error("记录统计统计失败", e);
}
}

/**
* 保存统计统计
*/
private void saveStatisticsStatistics(StatisticsStatistics statistics) {
// 实现统计保存逻辑
log.info("保存统计统计: {}", statistics.getRequestId());
}

/**
* 获取数据统计统计
*/
public DataStatisticsStatisticsResult getDataStatisticsStatistics(Date startTime, Date endTime) {
try {
DataStatisticsStatisticsResult result = new DataStatisticsStatisticsResult();
result.setStartTime(startTime);
result.setEndTime(endTime);

// 统计统计次数
result.setTotalStatistics(10000L); // 实际应用中需要从数据库统计

// 统计策略使用情况
Map<StatisticsStrategy, Long> strategyCount = new HashMap<>();
strategyCount.put(StatisticsStrategy.REAL_TIME, 5000L);
strategyCount.put(StatisticsStrategy.OFFLINE, 3000L);
strategyCount.put(StatisticsStrategy.INCREMENTAL, 2000L);
result.setStrategyCount(strategyCount);

// 统计成功率
result.setSuccessRate(0.99); // 99%

// 统计平均执行时间
result.setAverageExecutionTime(1000.0); // 1秒

return result;

} catch (Exception e) {
log.error("获取数据统计统计失败", e);
throw new StatisticsException("获取数据统计统计失败", e);
}
}
}

四、性能优化与监控

4.1 性能优化

4.1.1 海量数据统计性能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/**
 * Performance optimization service for massive-data statistics.
 *
 * <p>Produces storage and computation recommendations from an analysis of the data volume and
 * the query pattern. The analysis figures are fixed sample values.
 */
@Service
public class DataStatisticsPerformanceOptimizationService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private CaffeineCache localCache;

    private final String DATA_STATISTICS_PERFORMANCE_CACHE_PREFIX = "data_statistics_performance:";

    /**
     * Runs the full optimization pipeline: pattern analysis, then storage and computation
     * recommendations.
     *
     * @param request the optimization request
     * @return the completed optimization result, stamped with start and end times
     * @throws StatisticsException if any stage fails
     */
    public DataStatisticsOptimizationResult optimizeDataStatisticsPerformance(DataStatisticsOptimizationRequest request) {
        try {
            DataStatisticsOptimizationResult outcome = new DataStatisticsOptimizationResult();
            outcome.setRequestId(request.getRequestId());
            outcome.setStartTime(new Date());

            // Stage 1: analyze the statistics pattern; later stages consume it
            DataStatisticsPatternAnalysis pattern = analyzeDataStatisticsPattern(request);
            outcome.setPatternAnalysis(pattern);

            // Stage 2: storage recommendations
            outcome.setStorageOptimization(optimizeStorageStrategy(request, pattern));

            // Stage 3: computation recommendations
            outcome.setComputationOptimization(optimizeComputationStrategy(request, pattern));

            outcome.setStatus(DataStatisticsOptimizationStatus.COMPLETED);
            outcome.setEndTime(new Date());
            return outcome;
        } catch (Exception e) {
            log.error("优化海量数据统计性能失败", e);
            throw new StatisticsException("优化海量数据统计性能失败", e);
        }
    }

    /**
     * Builds the pattern analysis (data volume plus query pattern) for a request.
     *
     * @throws StatisticsException if the analysis cannot be built
     */
    private DataStatisticsPatternAnalysis analyzeDataStatisticsPattern(DataStatisticsOptimizationRequest request) {
        try {
            DataStatisticsPatternAnalysis summary = new DataStatisticsPatternAnalysis();
            summary.setRequestId(request.getRequestId());

            // Data volume of the request's data source
            DataVolume volume = analyzeDataVolume(request.getDataSource());
            summary.setDataVolume(volume);

            // Query pattern of the request's query type
            QueryPattern queries = analyzeQueryPattern(request.getQueryType());
            summary.setQueryPattern(queries);

            return summary;
        } catch (Exception e) {
            log.error("分析数据统计模式失败", e);
            throw new StatisticsException("分析数据统计模式失败", e);
        }
    }

    /**
     * Describes the data volume of a data source using fixed sample figures.
     *
     * @return the volume description, or an empty one if building it fails
     */
    private DataVolume analyzeDataVolume(String dataSource) {
        try {
            DataVolume sample = new DataVolume();
            sample.setDataSource(dataSource);
            sample.setDailyVolume(1000000L);  // 1,000,000 records per day
            sample.setHourlyVolume(50000L);   // 50,000 records per hour
            sample.setMinuteVolume(1000L);    // 1,000 records per minute
            return sample;
        } catch (Exception e) {
            log.error("分析数据量失败", e);
            return new DataVolume();
        }
    }

    /**
     * Describes the query pattern of a query type using fixed sample figures.
     *
     * @return the pattern description, or an empty one if building it fails
     */
    private QueryPattern analyzeQueryPattern(String queryType) {
        try {
            QueryPattern sample = new QueryPattern();
            sample.setQueryType(queryType);
            sample.setAverageQueryTime(100.0);  // 100 ms
            sample.setConcurrentQueries(100);   // 100 concurrent queries
            sample.setQueryFrequency(1000.0);   // 1000 queries per second
            return sample;
        } catch (Exception e) {
            log.error("分析查询模式失败", e);
            return new QueryPattern();
        }
    }

    /**
     * Recommends a storage layout based on the daily data volume.
     *
     * @throws StatisticsException if the recommendation cannot be built
     */
    private StorageOptimizationResult optimizeStorageStrategy(DataStatisticsOptimizationRequest request,
            DataStatisticsPatternAnalysis analysis) {
        try {
            StorageOptimizationResult plan = new StorageOptimizationResult();
            plan.setRequestId(request.getRequestId());

            if (analysis.getDataVolume().getDailyVolume() > 10000000) {
                // more than 10 million records per day
                applyStorageProfile(plan, "DISTRIBUTED", 100, 3, true);
            } else if (analysis.getDataVolume().getDailyVolume() > 1000000) {
                // more than 1 million records per day
                applyStorageProfile(plan, "CLUSTERED", 50, 2, true);
            } else {
                applyStorageProfile(plan, "SINGLE", 10, 1, false);
            }

            return plan;
        } catch (Exception e) {
            log.error("优化存储策略失败", e);
            throw new StatisticsException("优化存储策略失败", e);
        }
    }

    /** Writes one storage recommendation profile onto the given result. */
    private void applyStorageProfile(StorageOptimizationResult plan, String storageType,
            int partitionCount, int replicationFactor, boolean compressionEnabled) {
        plan.setRecommendedStorageType(storageType);
        plan.setRecommendedPartitionCount(partitionCount);
        plan.setRecommendedReplicationFactor(replicationFactor);
        plan.setRecommendedCompressionEnabled(compressionEnabled);
    }

    /**
     * Recommends a computation setup based on the number of concurrent queries.
     *
     * @throws StatisticsException if the recommendation cannot be built
     */
    private ComputationOptimizationResult optimizeComputationStrategy(DataStatisticsOptimizationRequest request,
            DataStatisticsPatternAnalysis analysis) {
        try {
            ComputationOptimizationResult plan = new ComputationOptimizationResult();
            plan.setRequestId(request.getRequestId());

            if (analysis.getQueryPattern().getConcurrentQueries() > 1000) {
                // more than 1000 concurrent queries
                applyComputationProfile(plan, "DISTRIBUTED", 20, 10000, true);
            } else if (analysis.getQueryPattern().getConcurrentQueries() > 100) {
                // more than 100 concurrent queries
                applyComputationProfile(plan, "PARALLEL", 10, 5000, true);
            } else {
                applyComputationProfile(plan, "SERIAL", 5, 1000, false);
            }

            return plan;
        } catch (Exception e) {
            log.error("优化计算策略失败", e);
            throw new StatisticsException("优化计算策略失败", e);
        }
    }

    /** Writes one computation recommendation profile onto the given result. */
    private void applyComputationProfile(ComputationOptimizationResult plan, String computationType,
            int workerCount, int batchSize, boolean cacheEnabled) {
        plan.setRecommendedComputationType(computationType);
        plan.setRecommendedWorkerCount(workerCount);
        plan.setRecommendedBatchSize(batchSize);
        plan.setRecommendedCacheEnabled(cacheEnabled);
    }
}

4.2 监控告警

4.2.1 海量数据统计监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Monitoring metrics for massive-data statistics, published through Micrometer.
 *
 * <p>Counters and the timer are looked up (or created) in the registry on every call. Gauges
 * are backed by a mutable holder kept per metric name and statistics type, so each call updates
 * the value the gauge reports.
 */
@Component
public class DataStatisticsMetrics {

    private final MeterRegistry meterRegistry;

    // Latest value per gauge, keyed by "<metric name>|<statistics type>". A gauge samples a
    // state object rather than storing a value, and the registry only references that object
    // weakly by default, so the holders are retained here.
    private final java.util.concurrent.ConcurrentMap<String, java.util.concurrent.atomic.AtomicReference<Double>>
            gaugeValues = new java.util.concurrent.ConcurrentHashMap<>();

    public DataStatisticsMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Increments the execution counter.
     *
     * @param statisticsType value of the {@code statistics_type} tag
     * @param status         value of the {@code status} tag
     */
    public void recordDataStatisticsExecutionCount(String statisticsType, String status) {
        Counter.builder("data_statistics.execution.count")
                .description("数据统计执行次数")
                .tag("statistics_type", statisticsType)
                .tag("status", status)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Records one execution duration.
     *
     * @param statisticsType value of the {@code statistics_type} tag
     * @param status         value of the {@code status} tag
     * @param duration       execution time in milliseconds
     */
    public void recordDataStatisticsExecutionTime(String statisticsType, String status, long duration) {
        Timer.builder("data_statistics.execution.time")
                .description("数据统计执行时间")
                .tag("statistics_type", statisticsType)
                .tag("status", status)
                // Publish P95 so a quantile="0.95" series exists for alerting.
                .publishPercentiles(0.95)
                .register(meterRegistry)
                .record(duration, TimeUnit.MILLISECONDS);
    }

    /**
     * Sets the success-rate gauge.
     *
     * @param statisticsType value of the {@code statistics_type} tag
     * @param successRate    the current success rate
     */
    public void recordDataStatisticsSuccessRate(String statisticsType, double successRate) {
        updateGauge("data_statistics.success.rate", "数据统计成功率", statisticsType, successRate);
    }

    /**
     * Sets the failure-rate gauge.
     *
     * @param statisticsType value of the {@code statistics_type} tag
     * @param failureRate    the current failure rate
     */
    public void recordDataStatisticsFailureRate(String statisticsType, double failureRate) {
        updateGauge("data_statistics.failure.rate", "数据统计失败率", statisticsType, failureRate);
    }

    /**
     * Sets the throughput gauge.
     *
     * @param statisticsType value of the {@code statistics_type} tag
     * @param throughput     the current throughput
     */
    public void recordDataStatisticsThroughput(String statisticsType, double throughput) {
        updateGauge("data_statistics.throughput", "数据统计吞吐量", statisticsType, throughput);
    }

    /**
     * Increments the exception counter.
     *
     * @param statisticsType value of the {@code statistics_type} tag
     * @param exceptionType  value of the {@code exception_type} tag
     */
    public void recordDataStatisticsExceptionCount(String statisticsType, String exceptionType) {
        Counter.builder("data_statistics.exception.count")
                .description("数据统计异常次数")
                .tag("statistics_type", statisticsType)
                .tag("exception_type", exceptionType)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Stores the latest value for a gauge, registering the gauge the first time the
     * name/type pair is seen.
     */
    private void updateGauge(String name, String description, String statisticsType, double value) {
        String key = name + "|" + statisticsType;
        gaugeValues.computeIfAbsent(key, ignored -> {
            java.util.concurrent.atomic.AtomicReference<Double> holder =
                    new java.util.concurrent.atomic.AtomicReference<>(value);
            Gauge.builder(name, holder, java.util.concurrent.atomic.AtomicReference::get)
                    .description(description)
                    .tag("statistics_type", statisticsType)
                    .register(meterRegistry);
            return holder;
        }).set(value);
    }
}

4.2.2 海量数据统计告警规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# prometheus-rules.yml
# Metric names follow Micrometer's Prometheus naming convention: dots become underscores,
# timers are exported in seconds with a `_seconds` suffix, counters get a `_total` suffix.
groups:
  - name: data_statistics_alerts
    rules:
      - alert: HighDataStatisticsExecutionTime
        # Timer values are in seconds, so 30 here means 30 s. The quantile series only exists
        # when the timer publishes percentiles (e.g. publishPercentiles(0.95)).
        expr: data_statistics_execution_time_seconds{quantile="0.95"} > 30
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "数据统计执行时间过长"
          description: "数据统计执行时间P95超过30秒,当前值: {{ $value }}s"

      - alert: HighDataStatisticsFailureRate
        expr: data_statistics_failure_rate > 0.05
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "数据统计失败率过高"
          description: "数据统计失败率超过5%,当前值: {{ $value }}"

      - alert: LowDataStatisticsThroughput
        expr: data_statistics_throughput < 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "数据统计吞吐量过低"
          description: "数据统计吞吐量低于100次/秒,当前值: {{ $value }}"

      - alert: HighDataStatisticsExceptionCount
        # rate() is per second; multiply by 60 to compare against a per-minute threshold.
        expr: rate(data_statistics_exception_count_total[5m]) * 60 > 10
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "数据统计异常次数过多"
          description: "数据统计异常频率超过10次/分钟,当前值: {{ $value }}"

      - alert: DataStatisticsServiceDown
        expr: up{job="data-statistics-service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "数据统计服务宕机"
          description: "数据统计服务已宕机超过1分钟"

五、总结

海量数据统计作为现代大数据应用的核心功能之一,通过合理的大数据统计策略和性能优化,能够构建一个高效、稳定、可扩展的海量数据统计系统。本文从大数据统计策略到性能优化,从基础实现到企业级应用,系统梳理了海量数据统计性能优化的完整解决方案。

5.1 关键要点

  1. 统计策略:通过多种统计策略实现不同场景下的数据统计需求
  2. 性能优化:通过存储优化、计算优化等手段优化统计性能
  3. 数据处理:通过数据清洗、数据转换、数据聚合等手段提高数据质量
  4. 监控告警:建立完善的监控体系,及时发现和处理问题
  5. 企业级方案:提供完整的企业级部署和监控方案

5.2 最佳实践

  1. 策略选择:根据数据量、查询模式、系统负载选择合适的统计策略
  2. 存储优化:合理选择存储类型、分区策略、压缩策略
  3. 计算优化:合理配置计算资源、批处理大小、缓存策略
  4. 监控告警:建立完善的监控体系,确保海量数据统计服务稳定运行
  5. 性能调优:通过索引优化、分区优化、缓存优化等手段优化系统性能

通过以上措施,可以构建一个高效、稳定、可扩展的海量数据统计系统,为企业的各种业务场景提供数据统计支持。