1. 车辆GPS数据处理概述

车辆GPS数据处理是智能交通系统的核心技术,通过SpringBoot + Redis + Kafka + MongoDB的技术栈,可以实现高稳定、高性能的车辆定位数据处理系统。系统具备实时数据接收、流式处理、分布式存储、缓存优化、监控告警等功能。本文将详细介绍车辆GPS数据处理的原理、实现方法、性能优化技巧以及在Java实战中的应用。

1.1 车辆GPS数据处理核心价值

  1. 实时处理: 实时接收和处理车辆GPS定位数据
  2. 高稳定性: 保证系统7x24小时稳定运行
  3. 高性能: 支持大规模车辆数据的并发处理
  4. 数据一致性: 确保GPS数据的准确性和完整性
  5. 可扩展性: 支持业务规模的快速扩展

1.2 车辆GPS数据处理场景

  • 车辆实时定位: 车辆当前位置的实时监控
  • 轨迹分析: 车辆行驶轨迹的分析和统计
  • 异常检测: 车辆异常行为的检测和告警
  • 路径规划: 基于历史数据的路径优化
  • 车队管理: 车队车辆的集中管理

1.3 技术架构组件

  • SpringBoot: 应用框架和微服务基础
  • Redis: 车辆最新位置缓存(按车辆ID存取,带过期时间)
  • Kafka: 消息队列和流式处理
  • MongoDB: GPS数据存储和查询
  • 监控系统: 系统监控和告警

2. 车辆GPS数据处理基础实现

2.1 GPS数据处理配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * Spring configuration for the GPS data processing module.
 *
 * <p>Enables binding of {@link GPSDataProcessingProperties} and exposes the GPS
 * processing, receiver, storage and monitor services as beans.
 *
 * <p>NOTE(review): the four service classes returned below also carry
 * {@code @Service} on their own declarations. If they are picked up by component
 * scanning, each service would be registered twice (once by the scan, once by the
 * {@code @Bean} method here) -- confirm which registration is intended.
 *
 * @author 运维实战
 */
@Configuration
@EnableConfigurationProperties(GPSDataProcessingProperties.class)
public class GPSDataProcessingConfig {

// NOTE(review): injected but not read anywhere in this class.
@Autowired
private GPSDataProcessingProperties properties;

/**
 * Creates the GPS data processing service.
 *
 * @return a new GPS data processing service instance
 */
@Bean
public GPSDataProcessingService gpsDataProcessingService() {
return new GPSDataProcessingService();
}

/**
 * Creates the GPS data receiver service.
 *
 * @return a new GPS data receiver service instance
 */
@Bean
public GPSDataReceiverService gpsDataReceiverService() {
return new GPSDataReceiverService();
}

/**
 * Creates the GPS data storage service.
 *
 * @return a new GPS data storage service instance
 */
@Bean
public GPSDataStorageService gpsDataStorageService() {
return new GPSDataStorageService();
}

/**
 * Creates the GPS data monitor service.
 *
 * @return a new GPS data monitor service instance
 */
@Bean
public GPSDataMonitorService gpsDataMonitorService() {
return new GPSDataMonitorService();
}

// NOTE(review): declared but not used anywhere in this class.
private static final Logger logger = LoggerFactory.getLogger(GPSDataProcessingConfig.class);
}

2.2 GPS数据处理属性配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/**
 * Configuration properties for GPS data processing, bound from the
 * {@code gps.data.processing} prefix. Every field carries the default shown in
 * its initializer.
 *
 * @author 运维实战
 */
@Data
@ConfigurationProperties(prefix = "gps.data.processing")
public class GPSDataProcessingProperties {

/**
 * Whether GPS data processing is enabled.
 */
private boolean enableGPSDataProcessing = true;

/**
 * Port on which GPS data is received.
 */
private int receiverPort = 8080;

/**
 * Receive timeout for GPS data, in milliseconds.
 */
private long receiverTimeoutMs = 30000;

/**
 * Number of GPS records per processing batch.
 */
private int batchSize = 1000;

/**
 * Size of the GPS data processing thread pool.
 */
private int processingThreadPoolSize = 10;

/**
 * Size of the GPS data storage thread pool.
 */
private int storageThreadPoolSize = 5;

/**
 * Kafka topic name.
 */
private String kafkaTopic = "gps-data-topic";

/**
 * Kafka consumer group.
 */
private String kafkaConsumerGroup = "gps-data-group";

/**
 * Expiry of Redis cache entries, in seconds.
 */
private long redisCacheExpireSeconds = 3600;

/**
 * MongoDB collection name.
 */
private String mongoCollectionName = "gps_data";

/**
 * Number of documents per MongoDB bulk insert.
 */
private int mongoBatchInsertSize = 1000;

/**
 * Whether GPS data validation is enabled.
 */
private boolean enableGPSDataValidation = true;

/**
 * Range rules applied when validating GPS data.
 */
private GPSDataValidationRules validationRules = new GPSDataValidationRules();

/**
 * Whether GPS data monitoring is enabled.
 */
private boolean enableGPSDataMonitoring = true;

/**
 * Monitoring interval, in milliseconds.
 */
private long monitorInterval = 30000;

/**
 * Whether GPS data alerting is enabled.
 */
private boolean enableGPSDataAlert = true;

/**
 * Alert threshold for abnormal GPS data.
 */
private int gpsDataAbnormalAlertThreshold = 100;
}

/**
 * Value ranges used to validate a GPS record. Each bound has the default shown
 * in its initializer.
 *
 * @author 运维实战
 */
@Data
public class GPSDataValidationRules {

/**
 * Minimum longitude.
 */
private double minLongitude = -180.0;

/**
 * Maximum longitude.
 */
private double maxLongitude = 180.0;

/**
 * Minimum latitude.
 */
private double minLatitude = -90.0;

/**
 * Maximum latitude.
 */
private double maxLatitude = 90.0;

/**
 * Minimum speed (km/h).
 */
private double minSpeed = 0.0;

/**
 * Maximum speed (km/h).
 */
private double maxSpeed = 200.0;

/**
 * Minimum heading angle.
 */
private double minDirection = 0.0;

/**
 * Maximum heading angle.
 */
private double maxDirection = 360.0;
}

2.3 GPS数据模型类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/**
 * GPS record for one vehicle position fix, mapped to the {@code gps_data}
 * MongoDB collection.
 *
 * <p>Besides the stored fields, the class offers great-circle distance and
 * initial-bearing helpers between two records.
 *
 * @author 运维实战
 */
@Data
@Document(collection = "gps_data")
public class GPSData {

    /** Earth radius in metres used by the haversine distance. */
    private static final double EARTH_RADIUS_METERS = 6371000;

    @Id
    private String id;

    /** Vehicle identifier. */
    @Field("vehicle_id")
    private String vehicleId;

    /** Longitude in degrees. */
    @Field("longitude")
    private Double longitude;

    /** Latitude in degrees. */
    @Field("latitude")
    private Double latitude;

    /** Altitude. */
    @Field("altitude")
    private Double altitude;

    /** Speed (km/h). */
    @Field("speed")
    private Double speed;

    /** Heading angle. */
    @Field("direction")
    private Double direction;

    /** Timestamp reported with the GPS fix. */
    @Field("gps_timestamp")
    private Long gpsTimestamp;

    /** Timestamp at which the record was received. */
    @Field("receive_timestamp")
    private Long receiveTimestamp;

    /** Data quality score. */
    @Field("data_quality")
    private Integer dataQuality;

    /** Number of satellites. */
    @Field("satellite_count")
    private Integer satelliteCount;

    /** Whether the record is considered valid. */
    @Field("is_valid")
    private Boolean isValid;

    /** Creation time. */
    @Field("created_at")
    private Date createdAt;

    /** Last update time. */
    @Field("updated_at")
    private Date updatedAt;

    /**
     * Creates a record marked valid, with creation and update times set to the
     * same instant.
     */
    public GPSData() {
        // Read the clock once so both timestamps are equal; keep two Date objects
        // because Date is mutable and the fields must not alias each other.
        long now = System.currentTimeMillis();
        this.createdAt = new Date(now);
        this.updatedAt = new Date(now);
        this.isValid = true;
    }

    /**
     * Computes the haversine great-circle distance to another GPS point.
     *
     * @param other the other GPS point; may be {@code null}
     * @return the distance in metres, or {@code 0.0} when {@code other} is
     *         {@code null} or either point lacks a longitude or latitude
     */
    public double calculateDistance(GPSData other) {
        if (!hasCoordinates(this) || !hasCoordinates(other)) {
            return 0.0;
        }

        double lat1Rad = Math.toRadians(this.latitude);
        double lat2Rad = Math.toRadians(other.latitude);
        double halfDeltaLat = Math.toRadians(other.latitude - this.latitude) / 2;
        double halfDeltaLon = Math.toRadians(other.longitude - this.longitude) / 2;

        double a = Math.sin(halfDeltaLat) * Math.sin(halfDeltaLat)
                + Math.cos(lat1Rad) * Math.cos(lat2Rad)
                * Math.sin(halfDeltaLon) * Math.sin(halfDeltaLon);
        // Rounding can push a marginally outside [0, 1] (e.g. near-antipodal
        // points), which would turn sqrt(1 - a) into NaN.
        a = Math.min(1.0, Math.max(0.0, a));
        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));

        return EARTH_RADIUS_METERS * c;
    }

    /**
     * Computes the initial bearing from this point towards another GPS point.
     *
     * @param other the other GPS point; may be {@code null}
     * @return the bearing in degrees normalised to {@code [0, 360)}, or
     *         {@code 0.0} when {@code other} is {@code null} or either point
     *         lacks a longitude or latitude
     */
    public double calculateBearing(GPSData other) {
        if (!hasCoordinates(this) || !hasCoordinates(other)) {
            return 0.0;
        }

        double lat1Rad = Math.toRadians(this.latitude);
        double lat2Rad = Math.toRadians(other.latitude);
        double deltaLonRad = Math.toRadians(other.longitude - this.longitude);

        double y = Math.sin(deltaLonRad) * Math.cos(lat2Rad);
        double x = Math.cos(lat1Rad) * Math.sin(lat2Rad)
                - Math.sin(lat1Rad) * Math.cos(lat2Rad) * Math.cos(deltaLonRad);

        double bearing = Math.toDegrees(Math.atan2(y, x));
        return (bearing + 360) % 360;
    }

    /** Returns whether the point is non-null and has both longitude and latitude. */
    private static boolean hasCoordinates(GPSData point) {
        return point != null && point.longitude != null && point.latitude != null;
    }
}

2.4 基础GPS数据处理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/**
 * Core GPS data processing service: validates a record, publishes it to Kafka,
 * caches it in Redis and persists it to MongoDB.
 *
 * <p>This class deliberately does not inject the receiver service: the receiver
 * already depends on this service, and a back-reference would form a circular
 * bean dependency.
 *
 * @author 运维实战
 */
@Service
public class GPSDataProcessingService {

    private static final Logger logger = LoggerFactory.getLogger(GPSDataProcessingService.class);

    @Autowired
    private GPSDataProcessingProperties properties;

    @Autowired
    private GPSDataMonitorService gpsDataMonitorService;

    @Autowired
    private KafkaTemplate<String, GPSData> kafkaTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Processes a single GPS record.
     *
     * <p>Steps: optional validation, stamping the receive time, Kafka publish,
     * Redis cache write (best effort) and MongoDB save. A Kafka or MongoDB
     * failure fails the record; a Redis failure is only logged.
     *
     * @param gpsData the GPS record; a {@code null} record is rejected as a
     *                validation failure instead of throwing
     * @return the processing result, never {@code null}
     */
    public GPSDataProcessingResult processGPSData(GPSData gpsData) {
        GPSDataProcessingResult result = new GPSDataProcessingResult();
        result.setStartTime(System.currentTimeMillis());

        if (gpsData == null) {
            logger.warn("GPS数据为空,跳过处理");
            result.setSuccess(false);
            result.setError("GPS数据验证失败: GPS数据为空");
            result.setEndTime(System.currentTimeMillis());
            return result;
        }

        logger.info("开始处理GPS数据,车辆ID: {}, 经度: {}, 纬度: {}",
                gpsData.getVehicleId(), gpsData.getLongitude(), gpsData.getLatitude());
        result.setVehicleId(gpsData.getVehicleId());

        try {
            if (properties.isEnableGPSDataValidation()) {
                GPSDataValidationResult validationResult = validateGPSData(gpsData);
                if (!validationResult.isValid()) {
                    result.setSuccess(false);
                    result.setError("GPS数据验证失败: " + validationResult.getErrorMessage());
                    result.setEndTime(System.currentTimeMillis());
                    return result;
                }
            }

            gpsData.setReceiveTimestamp(System.currentTimeMillis());

            sendToKafka(gpsData);
            storeToRedisCache(gpsData);
            storeToMongoDB(gpsData);

            gpsDataMonitorService.recordGPSDataProcessing(gpsData.getVehicleId(), true);

            result.setSuccess(true);
            result.setEndTime(System.currentTimeMillis());

            logger.info("GPS数据处理完成,车辆ID: {}, 成功: {}, 耗时: {}ms",
                    gpsData.getVehicleId(), result.isSuccess(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("GPS数据处理异常,车辆ID: {}", gpsData.getVehicleId(), e);
            result.setSuccess(false);
            result.setError("GPS数据处理异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataProcessing(gpsData.getVehicleId(), false);

            return result;
        }
    }

    /**
     * Processes a list of GPS records in batches of the configured batch size.
     * Records inside one batch are processed concurrently.
     *
     * <p>The returned {@code success} flag means the batch run itself completed
     * without an unexpected exception; per-record outcomes are reported through
     * the success and failure counts.
     *
     * @param gpsDataList the GPS records to process
     * @return the aggregated batch result
     */
    public GPSDataBatchProcessingResult batchProcessGPSData(List<GPSData> gpsDataList) {
        logger.info("开始批量处理GPS数据,数量: {}", gpsDataList.size());

        GPSDataBatchProcessingResult result = new GPSDataBatchProcessingResult();
        result.setTotalCount(gpsDataList.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            for (List<GPSData> batch : partitionList(gpsDataList, properties.getBatchSize())) {
                // NOTE(review): supplyAsync without an executor runs on the common
                // ForkJoinPool although the work is blocking I/O; consider a
                // dedicated executor sized from the configured pool size.
                List<CompletableFuture<GPSDataProcessingResult>> futures = batch.stream()
                        .map(gpsData -> CompletableFuture.supplyAsync(() -> processGPSData(gpsData)))
                        .collect(Collectors.toList());

                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();

                for (CompletableFuture<GPSDataProcessingResult> future : futures) {
                    // All futures are complete here, so join() returns immediately.
                    if (future.join().isSuccess()) {
                        result.incrementSuccessCount();
                    } else {
                        result.incrementFailureCount();
                    }
                }
            }

            // Without this the flag keeps its default of false and callers treat
            // every batch as failed.
            result.setSuccess(true);
            result.setEndTime(System.currentTimeMillis());

            logger.info("批量GPS数据处理完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("批量GPS数据处理异常", e);
            result.setSuccess(false);
            result.setError("批量GPS数据处理异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Validates longitude, latitude, speed and heading against the configured
     * ranges. Longitude and latitude are mandatory; speed and heading are only
     * checked when present. NaN values are rejected.
     *
     * @param gpsData the GPS record
     * @return the validation result carrying the first violation found
     */
    private GPSDataValidationResult validateGPSData(GPSData gpsData) {
        GPSDataValidationRules rules = properties.getValidationRules();

        if (gpsData.getLongitude() == null
                || isOutOfRange(gpsData.getLongitude(), rules.getMinLongitude(), rules.getMaxLongitude())) {
            return invalid("经度超出有效范围");
        }

        if (gpsData.getLatitude() == null
                || isOutOfRange(gpsData.getLatitude(), rules.getMinLatitude(), rules.getMaxLatitude())) {
            return invalid("纬度超出有效范围");
        }

        if (gpsData.getSpeed() != null
                && isOutOfRange(gpsData.getSpeed(), rules.getMinSpeed(), rules.getMaxSpeed())) {
            return invalid("速度超出有效范围");
        }

        if (gpsData.getDirection() != null
                && isOutOfRange(gpsData.getDirection(), rules.getMinDirection(), rules.getMaxDirection())) {
            return invalid("方向角超出有效范围");
        }

        GPSDataValidationResult result = new GPSDataValidationResult();
        result.setValid(true);
        return result;
    }

    /**
     * Returns whether the value lies outside {@code [min, max]}. Written as a
     * negated in-range test so that NaN counts as out of range.
     */
    private static boolean isOutOfRange(double value, double min, double max) {
        return !(value >= min && value <= max);
    }

    /** Builds a failed validation result with the given message. */
    private static GPSDataValidationResult invalid(String message) {
        GPSDataValidationResult result = new GPSDataValidationResult();
        result.setValid(false);
        result.setErrorMessage(message);
        return result;
    }

    /**
     * Publishes the GPS record to the configured Kafka topic, keyed by vehicle ID.
     *
     * @param gpsData the GPS record
     * @throws RuntimeException if the send call itself throws
     */
    private void sendToKafka(GPSData gpsData) {
        try {
            // NOTE(review): the value returned by send() is ignored, so only
            // failures thrown synchronously by the call are detected here.
            kafkaTemplate.send(properties.getKafkaTopic(), gpsData.getVehicleId(), gpsData);
            logger.debug("GPS数据发送到Kafka成功,车辆ID: {}", gpsData.getVehicleId());
        } catch (Exception e) {
            logger.error("GPS数据发送到Kafka失败,车辆ID: {}", gpsData.getVehicleId(), e);
            throw new RuntimeException("GPS数据发送到Kafka失败", e);
        }
    }

    /**
     * Caches the GPS record in Redis under {@code gps:vehicle:<vehicleId>} with
     * the configured expiry. Failures are logged and deliberately not propagated
     * so that a cache problem does not fail the main flow.
     *
     * @param gpsData the GPS record
     */
    private void storeToRedisCache(GPSData gpsData) {
        try {
            String key = "gps:vehicle:" + gpsData.getVehicleId();
            redisTemplate.opsForValue().set(key, gpsData, Duration.ofSeconds(properties.getRedisCacheExpireSeconds()));
            logger.debug("GPS数据存储到Redis缓存成功,车辆ID: {}", gpsData.getVehicleId());
        } catch (Exception e) {
            logger.error("GPS数据存储到Redis缓存失败,车辆ID: {}", gpsData.getVehicleId(), e);
        }
    }

    /**
     * Saves the GPS record to the configured MongoDB collection.
     *
     * @param gpsData the GPS record
     * @throws RuntimeException if the save fails
     */
    private void storeToMongoDB(GPSData gpsData) {
        try {
            mongoTemplate.save(gpsData, properties.getMongoCollectionName());
            logger.debug("GPS数据存储到MongoDB成功,车辆ID: {}", gpsData.getVehicleId());
        } catch (Exception e) {
            logger.error("GPS数据存储到MongoDB失败,车辆ID: {}", gpsData.getVehicleId(), e);
            throw new RuntimeException("GPS数据存储到MongoDB失败", e);
        }
    }

    /**
     * Splits a list into consecutive sub-list views of at most {@code size}
     * elements.
     *
     * @param list the source list
     * @param size the maximum partition size; must be positive
     * @return the partitions, empty when {@code list} is empty
     * @throws IllegalArgumentException if {@code size} is not positive (a
     *         non-positive step would otherwise never advance the loop)
     */
    private <T> List<List<T>> partitionList(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("partition size must be positive: " + size);
        }
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            partitions.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return partitions;
    }
}

2.5 GPS数据处理结果类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
 * Outcome of processing a single GPS record: success flag, vehicle ID, error
 * text and the start/end timestamps of the run.
 *
 * @author 运维实战
 */
@Data
public class GPSDataProcessingResult {

    // A fresh result starts out as "not successful".
    private boolean success = false;
    private String vehicleId;
    private String error;
    private long startTime;
    private long endTime;

    /**
     * Returns the elapsed processing time.
     *
     * @return end time minus start time, in milliseconds
     */
    public long getDuration() {
        long elapsed = this.endTime - this.startTime;
        return elapsed;
    }

    /**
     * Tells whether processing succeeded.
     *
     * @return {@code true} if processing succeeded
     */
    public boolean isSuccess() {
        return this.success;
    }
}

/**
 * Outcome of a batch processing run: overall success flag, total/success/failure
 * counters, error text and the start/end timestamps of the run.
 *
 * @author 运维实战
 */
@Data
public class GPSDataBatchProcessingResult {

    // A fresh result starts out unsuccessful with both counters at zero.
    private boolean success = false;
    private int totalCount;
    private int successCount = 0;
    private int failureCount = 0;
    private String error;
    private long startTime;
    private long endTime;

    /**
     * Returns the elapsed processing time.
     *
     * @return end time minus start time, in milliseconds
     */
    public long getDuration() {
        long elapsed = this.endTime - this.startTime;
        return elapsed;
    }

    /**
     * Adds one to the success counter.
     */
    public void incrementSuccessCount() {
        this.successCount += 1;
    }

    /**
     * Adds one to the failure counter.
     */
    public void incrementFailureCount() {
        this.failureCount += 1;
    }

    /**
     * Tells whether the batch run succeeded.
     *
     * @return {@code true} if the batch run succeeded
     */
    public boolean isSuccess() {
        return this.success;
    }
}

/**
 * Outcome of validating a GPS record: a validity flag plus an error message
 * describing the violation.
 *
 * @author 运维实战
 */
@Data
public class GPSDataValidationResult {

    // A fresh result is valid until a check marks it otherwise.
    private boolean valid = true;
    private String errorMessage;

    /**
     * Tells whether the record passed validation.
     *
     * @return {@code true} if the record is valid
     */
    public boolean isValid() {
        return this.valid;
    }
}

3. 高级功能实现

3.1 GPS数据接收服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
 * Entry point for incoming GPS records: fills in missing fields, delegates to
 * the processing service and records receive metrics.
 *
 * @author 运维实战
 */
@Service
public class GPSDataReceiverService {

    private static final Logger logger = LoggerFactory.getLogger(GPSDataReceiverService.class);

    @Autowired
    private GPSDataProcessingProperties properties;

    @Autowired
    private GPSDataProcessingService gpsDataProcessingService;

    @Autowired
    private GPSDataMonitorService gpsDataMonitorService;

    /**
     * Receives and processes a single GPS record.
     *
     * @param gpsData the GPS record
     * @return the receive result; processing failures and exceptions are reported
     *         through the result rather than thrown
     */
    public GPSDataReceiveResult receiveGPSData(GPSData gpsData) {
        logger.info("接收到GPS数据,车辆ID: {}, 经度: {}, 纬度: {}",
                gpsData.getVehicleId(), gpsData.getLongitude(), gpsData.getLatitude());

        GPSDataReceiveResult result = new GPSDataReceiveResult();
        result.setVehicleId(gpsData.getVehicleId());
        result.setStartTime(System.currentTimeMillis());

        try {
            GPSData preprocessedData = preprocessGPSData(gpsData);
            GPSDataProcessingResult processingResult = gpsDataProcessingService.processGPSData(preprocessedData);

            boolean succeeded = processingResult.isSuccess();
            result.setSuccess(succeeded);
            if (!succeeded) {
                result.setError(processingResult.getError());
            }
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataReceive(gpsData.getVehicleId(), succeeded);

            if (succeeded) {
                logger.info("GPS数据接收成功,车辆ID: {}, 耗时: {}ms",
                        gpsData.getVehicleId(), result.getDuration());
            } else {
                logger.warn("GPS数据接收失败,车辆ID: {}, 错误: {}", gpsData.getVehicleId(), processingResult.getError());
            }

            return result;

        } catch (Exception e) {
            logger.error("GPS数据接收异常,车辆ID: {}", gpsData.getVehicleId(), e);
            result.setSuccess(false);
            result.setError("GPS数据接收异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataReceive(gpsData.getVehicleId(), false);

            return result;
        }
    }

    /**
     * Receives and processes a list of GPS records.
     *
     * <p>The per-record success and failure counts reported by the processing
     * service are always copied into the result, independent of the overall
     * success flag, so that the result and the receive metrics reflect what was
     * actually processed.
     *
     * @param gpsDataList the GPS records
     * @return the batch receive result
     */
    public GPSDataBatchReceiveResult batchReceiveGPSData(List<GPSData> gpsDataList) {
        logger.info("批量接收GPS数据,数量: {}", gpsDataList.size());

        GPSDataBatchReceiveResult result = new GPSDataBatchReceiveResult();
        result.setTotalCount(gpsDataList.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            List<GPSData> preprocessedDataList = gpsDataList.stream()
                    .map(this::preprocessGPSData)
                    .collect(Collectors.toList());

            GPSDataBatchProcessingResult processingResult =
                    gpsDataProcessingService.batchProcessGPSData(preprocessedDataList);

            result.setSuccess(processingResult.isSuccess());
            result.setSuccessCount(processingResult.getSuccessCount());
            result.setFailureCount(processingResult.getFailureCount());
            if (!processingResult.isSuccess()) {
                result.setError(processingResult.getError());
            }

            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataBatchReceive(gpsDataList.size(), result.getSuccessCount());

            logger.info("批量GPS数据接收完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("批量GPS数据接收异常", e);
            result.setSuccess(false);
            result.setError("批量GPS数据接收异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            // Mirror the single-record path, which also records a metric when an
            // exception interrupts the receive.
            gpsDataMonitorService.recordGPSDataBatchReceive(gpsDataList.size(), result.getSuccessCount());

            return result;
        }
    }

    /**
     * Fills in fields the sender left empty: receive timestamp, data quality
     * score and validity flag. The record is modified in place.
     *
     * @param gpsData the raw GPS record
     * @return the same record instance after defaulting
     */
    private GPSData preprocessGPSData(GPSData gpsData) {
        if (gpsData.getReceiveTimestamp() == null) {
            gpsData.setReceiveTimestamp(System.currentTimeMillis());
        }

        if (gpsData.getDataQuality() == null) {
            gpsData.setDataQuality(calculateDataQuality(gpsData));
        }

        if (gpsData.getIsValid() == null) {
            gpsData.setIsValid(true);
        }

        return gpsData;
    }

    /**
     * Derives a quality score starting from 100: minus 20 for fewer than 4
     * satellites, minus 10 for fewer than 6, and minus 15 when the speed exceeds
     * 150. Missing satellite count or speed leaves the score untouched.
     *
     * @param gpsData the GPS record
     * @return the quality score, never below 0
     */
    private Integer calculateDataQuality(GPSData gpsData) {
        int quality = 100;

        Integer satellites = gpsData.getSatelliteCount();
        if (satellites != null) {
            if (satellites < 4) {
                quality -= 20;
            } else if (satellites < 6) {
                quality -= 10;
            }
        }

        Double speed = gpsData.getSpeed();
        if (speed != null && speed > 150) {
            quality -= 15;
        }

        return Math.max(0, quality);
    }
}

3.2 GPS数据存储服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/**
 * Storage service for GPS records: MongoDB persistence and querying, plus a
 * Redis cache keyed by vehicle ID.
 *
 * @author 运维实战
 */
@Service
public class GPSDataStorageService {

    private static final Logger logger = LoggerFactory.getLogger(GPSDataStorageService.class);

    @Autowired
    private GPSDataProcessingProperties properties;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private GPSDataMonitorService gpsDataMonitorService;

    /**
     * Set once all index definitions were submitted successfully, so the index
     * calls are not repeated on every write.
     */
    private volatile boolean indexesEnsured;

    /**
     * Saves one GPS record to MongoDB.
     *
     * @param gpsData the GPS record
     * @return the storage result; failures are reported through the result
     */
    public GPSDataStorageResult storeToMongoDB(GPSData gpsData) {
        logger.info("存储GPS数据到MongoDB,车辆ID: {}", gpsData.getVehicleId());

        GPSDataStorageResult result = new GPSDataStorageResult();
        result.setVehicleId(gpsData.getVehicleId());
        result.setStorageType("MONGODB");
        result.setStartTime(System.currentTimeMillis());

        try {
            createIndexIfNotExists();

            GPSData savedData = mongoTemplate.save(gpsData, properties.getMongoCollectionName());

            result.setSuccess(true);
            result.setStoredData(savedData);
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataStorage(gpsData.getVehicleId(), "MONGODB", true);

            logger.info("GPS数据存储到MongoDB成功,车辆ID: {}, 耗时: {}ms",
                    gpsData.getVehicleId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("GPS数据存储到MongoDB失败,车辆ID: {}", gpsData.getVehicleId(), e);
            result.setSuccess(false);
            result.setError("GPS数据存储到MongoDB失败: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataStorage(gpsData.getVehicleId(), "MONGODB", false);

            return result;
        }
    }

    /**
     * Inserts GPS records into MongoDB in chunks of the configured bulk-insert
     * size.
     *
     * @param gpsDataList the GPS records
     * @return the batch storage result; marked successful when every chunk was
     *         inserted without an exception
     */
    public GPSDataBatchStorageResult batchStoreToMongoDB(List<GPSData> gpsDataList) {
        logger.info("批量存储GPS数据到MongoDB,数量: {}", gpsDataList.size());

        GPSDataBatchStorageResult result = new GPSDataBatchStorageResult();
        result.setTotalCount(gpsDataList.size());
        result.setStorageType("MONGODB");
        result.setStartTime(System.currentTimeMillis());

        try {
            createIndexIfNotExists();

            for (List<GPSData> batch : partitionList(gpsDataList, properties.getMongoBatchInsertSize())) {
                Collection<GPSData> savedData = mongoTemplate.insert(batch, properties.getMongoCollectionName());
                result.addSuccessCount(savedData.size());
            }

            // The success path previously never set this flag.
            result.setSuccess(true);
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataBatchStorage(gpsDataList.size(), result.getSuccessCount());

            logger.info("批量GPS数据存储到MongoDB完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("批量GPS数据存储到MongoDB失败", e);
            result.setSuccess(false);
            result.setError("批量GPS数据存储到MongoDB失败: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Caches one GPS record in Redis under {@code gps:vehicle:<vehicleId>} with
     * the configured expiry.
     *
     * @param gpsData the GPS record
     * @return the storage result; failures are reported through the result
     */
    public GPSDataStorageResult storeToRedisCache(GPSData gpsData) {
        logger.info("存储GPS数据到Redis缓存,车辆ID: {}", gpsData.getVehicleId());

        GPSDataStorageResult result = new GPSDataStorageResult();
        result.setVehicleId(gpsData.getVehicleId());
        result.setStorageType("REDIS");
        result.setStartTime(System.currentTimeMillis());

        try {
            String key = "gps:vehicle:" + gpsData.getVehicleId();
            redisTemplate.opsForValue().set(key, gpsData, Duration.ofSeconds(properties.getRedisCacheExpireSeconds()));

            result.setSuccess(true);
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataStorage(gpsData.getVehicleId(), "REDIS", true);

            logger.info("GPS数据存储到Redis缓存成功,车辆ID: {}, 耗时: {}ms",
                    gpsData.getVehicleId(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("GPS数据存储到Redis缓存失败,车辆ID: {}", gpsData.getVehicleId(), e);
            result.setSuccess(false);
            result.setError("GPS数据存储到Redis缓存失败: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            gpsDataMonitorService.recordGPSDataStorage(gpsData.getVehicleId(), "REDIS", false);

            return result;
        }
    }

    /**
     * Reads the cached GPS record of a vehicle from Redis.
     *
     * @param vehicleId the vehicle ID
     * @return the cached record, or {@code null} when nothing is cached or the
     *         lookup fails
     */
    public GPSData getFromRedisCache(String vehicleId) {
        try {
            String key = "gps:vehicle:" + vehicleId;
            return (GPSData) redisTemplate.opsForValue().get(key);
        } catch (Exception e) {
            logger.error("从Redis缓存获取GPS数据失败,车辆ID: {}", vehicleId, e);
            return null;
        }
    }

    /**
     * Queries the GPS records of a vehicle from MongoDB, ordered by GPS
     * timestamp ascending.
     *
     * @param vehicleId the vehicle ID
     * @param startTime inclusive lower bound on the GPS timestamp, or
     *                  {@code null} for no lower bound
     * @param endTime   inclusive upper bound on the GPS timestamp, or
     *                  {@code null} for no upper bound
     * @return the matching records, or an empty list when the query fails
     */
    public List<GPSData> queryFromMongoDB(String vehicleId, Long startTime, Long endTime) {
        try {
            Query query = new Query();
            query.addCriteria(Criteria.where("vehicle_id").is(vehicleId));

            // Apply whichever bounds are present; a one-sided range used to be
            // dropped silently.
            if (startTime != null || endTime != null) {
                Criteria timeRange = Criteria.where("gps_timestamp");
                if (startTime != null) {
                    timeRange = timeRange.gte(startTime);
                }
                if (endTime != null) {
                    timeRange = timeRange.lte(endTime);
                }
                query.addCriteria(timeRange);
            }

            query.with(Sort.by(Sort.Direction.ASC, "gps_timestamp"));

            return mongoTemplate.find(query, GPSData.class, properties.getMongoCollectionName());
        } catch (Exception e) {
            logger.error("从MongoDB查询GPS数据失败,车辆ID: {}", vehicleId, e);
            return new ArrayList<>();
        }
    }

    /**
     * Submits the index definitions for the GPS collection. After one fully
     * successful run the call becomes a no-op; after a failure it is retried on
     * the next write. Failures are logged and do not stop the write.
     */
    private void createIndexIfNotExists() {
        if (indexesEnsured) {
            return;
        }
        try {
            String collection = properties.getMongoCollectionName();
            Index[] indexes = {
                // vehicle ID
                new Index().on("vehicle_id", Sort.Direction.ASC),
                // GPS timestamp
                new Index().on("gps_timestamp", Sort.Direction.ASC),
                // longitude + latitude (plain ascending compound index)
                new Index().on("longitude", Sort.Direction.ASC).on("latitude", Sort.Direction.ASC),
                // vehicle ID + GPS timestamp
                new Index().on("vehicle_id", Sort.Direction.ASC).on("gps_timestamp", Sort.Direction.ASC)
            };
            for (Index index : indexes) {
                mongoTemplate.indexOps(collection).ensureIndex(index);
            }
            indexesEnsured = true;
        } catch (Exception e) {
            logger.error("创建MongoDB索引失败", e);
        }
    }

    /**
     * Splits a list into consecutive sub-list views of at most {@code size}
     * elements.
     *
     * @param list the source list
     * @param size the maximum partition size; must be positive
     * @return the partitions, empty when {@code list} is empty
     * @throws IllegalArgumentException if {@code size} is not positive (a
     *         non-positive step would otherwise never advance the loop)
     */
    private <T> List<List<T>> partitionList(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("partition size must be positive: " + size);
        }
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            partitions.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return partitions;
    }
}

3.3 GPS数据监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/**
 * In-memory monitoring counters for GPS receive, processing and storage
 * operations, with a scheduled task that logs the current figures.
 *
 * <p>Counters are reset when metrics are read and more than five minutes have
 * passed since the previous reset.
 *
 * @author 运维实战
 */
@Service
public class GPSDataMonitorService {

    private static final Logger logger = LoggerFactory.getLogger(GPSDataMonitorService.class);

    /** Counters are reset once this many milliseconds have elapsed (5 minutes). */
    private static final long RESET_INTERVAL_MS = 300000;

    private final AtomicLong totalGPSDataReceived = new AtomicLong(0);
    private final AtomicLong totalGPSDataProcessed = new AtomicLong(0);
    private final AtomicLong totalGPSDataStored = new AtomicLong(0);
    private final AtomicLong totalSuccessfulOperations = new AtomicLong(0);
    private final AtomicLong totalFailedOperations = new AtomicLong(0);

    // volatile: read by every getMetrics() caller, written by whichever thread resets.
    private volatile long lastResetTime = System.currentTimeMillis();

    /**
     * Records one receive operation.
     *
     * @param vehicleId the vehicle ID
     * @param success   whether the operation succeeded
     */
    public void recordGPSDataReceive(String vehicleId, boolean success) {
        totalGPSDataReceived.incrementAndGet();
        recordOutcome(success);

        logger.debug("记录GPS数据接收: 车辆ID={}, 成功={}", vehicleId, success);
    }

    /**
     * Records one processing operation.
     *
     * @param vehicleId the vehicle ID
     * @param success   whether the operation succeeded
     */
    public void recordGPSDataProcessing(String vehicleId, boolean success) {
        totalGPSDataProcessed.incrementAndGet();
        recordOutcome(success);

        logger.debug("记录GPS数据处理: 车辆ID={}, 成功={}", vehicleId, success);
    }

    /**
     * Records one storage operation.
     *
     * @param vehicleId   the vehicle ID
     * @param storageType the storage type
     * @param success     whether the operation succeeded
     */
    public void recordGPSDataStorage(String vehicleId, String storageType, boolean success) {
        totalGPSDataStored.incrementAndGet();
        recordOutcome(success);

        logger.debug("记录GPS数据存储: 车辆ID={}, 存储类型={}, 成功={}", vehicleId, storageType, success);
    }

    /**
     * Records a batch receive.
     *
     * @param totalCount   number of records in the batch
     * @param successCount number of records that succeeded
     */
    public void recordGPSDataBatchReceive(int totalCount, int successCount) {
        totalGPSDataReceived.addAndGet(totalCount);
        totalSuccessfulOperations.addAndGet(successCount);
        totalFailedOperations.addAndGet(totalCount - successCount);

        logger.debug("记录GPS数据批量接收: 总数={}, 成功={}", totalCount, successCount);
    }

    /**
     * Records a batch store.
     *
     * @param totalCount   number of records in the batch
     * @param successCount number of records that succeeded
     */
    public void recordGPSDataBatchStorage(int totalCount, int successCount) {
        totalGPSDataStored.addAndGet(totalCount);
        totalSuccessfulOperations.addAndGet(successCount);
        totalFailedOperations.addAndGet(totalCount - successCount);

        logger.debug("记录GPS数据批量存储: 总数={}, 成功={}", totalCount, successCount);
    }

    /**
     * Returns a snapshot of the current counters. When the reset interval has
     * elapsed, the counters are reset first, so the snapshot then starts from
     * zero.
     *
     * @return the metrics snapshot
     */
    public GPSDataMetrics getMetrics() {
        if (System.currentTimeMillis() - lastResetTime > RESET_INTERVAL_MS) {
            resetMetrics();
        }

        GPSDataMetrics metrics = new GPSDataMetrics();
        metrics.setTotalGPSDataReceived(totalGPSDataReceived.get());
        metrics.setTotalGPSDataProcessed(totalGPSDataProcessed.get());
        metrics.setTotalGPSDataStored(totalGPSDataStored.get());
        metrics.setTotalSuccessfulOperations(totalSuccessfulOperations.get());
        metrics.setTotalFailedOperations(totalFailedOperations.get());
        metrics.setTimestamp(System.currentTimeMillis());

        return metrics;
    }

    /** Bumps the success or failure counter. */
    private void recordOutcome(boolean success) {
        if (success) {
            totalSuccessfulOperations.incrementAndGet();
        } else {
            totalFailedOperations.incrementAndGet();
        }
    }

    /**
     * Resets all counters. Synchronized and re-checked so that concurrent
     * readers crossing the interval together trigger a single reset.
     */
    private synchronized void resetMetrics() {
        if (System.currentTimeMillis() - lastResetTime <= RESET_INTERVAL_MS) {
            return;
        }
        totalGPSDataReceived.set(0);
        totalGPSDataProcessed.set(0);
        totalGPSDataStored.set(0);
        totalSuccessfulOperations.set(0);
        totalFailedOperations.set(0);
        lastResetTime = System.currentTimeMillis();

        logger.info("GPS数据监控指标重置");
    }

    /**
     * Scheduled every 30 seconds: logs the current metrics and warns when the
     * success rate drops below 90% or more than 100 operations have failed.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorGPSDataProcessingStatus() {
        try {
            GPSDataMetrics metrics = getMetrics();

            logger.info("GPS数据处理监控: 接收={}, 处理={}, 存储={}, 成功={}, 失败={}, 成功率={}%",
                    metrics.getTotalGPSDataReceived(), metrics.getTotalGPSDataProcessed(),
                    metrics.getTotalGPSDataStored(), metrics.getTotalSuccessfulOperations(),
                    metrics.getTotalFailedOperations(), String.format("%.2f", metrics.getSuccessRate()));

            // With no recorded operations the success rate reads 0, which is not
            // a real failure signal; only evaluate the rate once there is data.
            long operations = metrics.getTotalSuccessfulOperations() + metrics.getTotalFailedOperations();
            if (operations > 0 && metrics.getSuccessRate() < 90) {
                logger.warn("GPS数据处理成功率过低: {}%", String.format("%.2f", metrics.getSuccessRate()));
            }

            if (metrics.getTotalFailedOperations() > 100) {
                logger.warn("GPS数据处理失败次数过多: {}", metrics.getTotalFailedOperations());
            }

        } catch (Exception e) {
            logger.error("GPS数据处理状态监控失败", e);
        }
    }
}

3.4 GPS数据指标类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Snapshot of GPS data processing counters together with derived percentage figures.
 *
 * <p>All derived rates are expressed as percentages and are 0.0 when their denominator is zero.
 */
@Data
public class GPSDataMetrics {

    private long totalGPSDataReceived;
    private long totalGPSDataProcessed;
    private long totalGPSDataStored;
    private long totalSuccessfulOperations;
    private long totalFailedOperations;
    private long timestamp;

    /** Creates an empty snapshot stamped with the current time in milliseconds. */
    public GPSDataMetrics() {
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Successful operations as a percentage of all recorded operations.
     *
     * @return the success rate, or 0.0 when no operation has been recorded
     */
    public double getSuccessRate() {
        return percentage(totalSuccessfulOperations, totalSuccessfulOperations + totalFailedOperations);
    }

    /**
     * Failed operations as a percentage of all recorded operations.
     *
     * @return the failure rate, or 0.0 when no operation has been recorded
     */
    public double getFailureRate() {
        return percentage(totalFailedOperations, totalSuccessfulOperations + totalFailedOperations);
    }

    /**
     * Processed records as a percentage of received records.
     *
     * @return the processing efficiency, or 0.0 when nothing has been received
     */
    public double getProcessingEfficiency() {
        return percentage(totalGPSDataProcessed, totalGPSDataReceived);
    }

    /**
     * Stored records as a percentage of processed records.
     *
     * @return the storage efficiency, or 0.0 when nothing has been processed
     */
    public double getStorageEfficiency() {
        return percentage(totalGPSDataStored, totalGPSDataProcessed);
    }

    /**
     * Reports whether all three health thresholds are exceeded: success rate above 90,
     * processing efficiency above 95 and storage efficiency above 95.
     *
     * @return {@code true} only when every threshold is exceeded
     */
    public boolean isHealthy() {
        if (!(getSuccessRate() > 90)) {
            return false;
        }
        if (!(getProcessingEfficiency() > 95)) {
            return false;
        }
        return getStorageEfficiency() > 95;
    }

    /** Returns {@code part / whole} as a percentage, or 0.0 when {@code whole} is zero. */
    private static double percentage(long part, long whole) {
        if (whole == 0) {
            return 0.0;
        }
        return (double) part / whole * 100;
    }
}

4. GPS数据控制器

4.1 GPS数据REST控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/**
 * REST controller exposing GPS data receive, store, query and metrics endpoints under
 * {@code /api/gps/data}.
 *
 * <p>Every endpoint delegates to a service and answers HTTP 500 with an empty body when the
 * delegate throws an exception.
 */
@RestController
@RequestMapping("/api/gps/data")
public class GPSDataController {

    @Autowired
    private GPSDataReceiverService gpsDataReceiverService;

    @Autowired
    private GPSDataStorageService gpsDataStorageService;

    @Autowired
    private GPSDataMonitorService gpsDataMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(GPSDataController.class);

    /**
     * Receives a single GPS record.
     *
     * @param gpsData the GPS record from the request body
     * @return the receive result, or HTTP 500 on failure
     */
    @PostMapping("/receive")
    public ResponseEntity<GPSDataReceiveResult> receiveGPSData(@RequestBody GPSData gpsData) {
        return respond("GPS数据接收失败", () -> {
            logger.info("接收到GPS数据接收请求,车辆ID: {}", gpsData.getVehicleId());
            return ResponseEntity.ok(gpsDataReceiverService.receiveGPSData(gpsData));
        });
    }

    /**
     * Receives a batch of GPS records.
     *
     * @param gpsDataList the GPS records from the request body
     * @return the batch receive result, or HTTP 500 on failure
     */
    @PostMapping("/batch-receive")
    public ResponseEntity<GPSDataBatchReceiveResult> batchReceiveGPSData(@RequestBody List<GPSData> gpsDataList) {
        return respond("GPS数据批量接收失败", () -> {
            logger.info("接收到GPS数据批量接收请求,数量: {}", gpsDataList.size());
            return ResponseEntity.ok(gpsDataReceiverService.batchReceiveGPSData(gpsDataList));
        });
    }

    /**
     * Stores a single GPS record in MongoDB.
     *
     * @param gpsData the GPS record from the request body
     * @return the storage result, or HTTP 500 on failure
     */
    @PostMapping("/store/mongodb")
    public ResponseEntity<GPSDataStorageResult> storeToMongoDB(@RequestBody GPSData gpsData) {
        return respond("GPS数据存储到MongoDB失败", () -> {
            logger.info("接收到GPS数据存储到MongoDB请求,车辆ID: {}", gpsData.getVehicleId());
            return ResponseEntity.ok(gpsDataStorageService.storeToMongoDB(gpsData));
        });
    }

    /**
     * Stores a batch of GPS records in MongoDB.
     *
     * @param gpsDataList the GPS records from the request body
     * @return the batch storage result, or HTTP 500 on failure
     */
    @PostMapping("/batch-store/mongodb")
    public ResponseEntity<GPSDataBatchStorageResult> batchStoreToMongoDB(@RequestBody List<GPSData> gpsDataList) {
        return respond("GPS数据批量存储到MongoDB失败", () -> {
            logger.info("接收到GPS数据批量存储到MongoDB请求,数量: {}", gpsDataList.size());
            return ResponseEntity.ok(gpsDataStorageService.batchStoreToMongoDB(gpsDataList));
        });
    }

    /**
     * Stores a single GPS record in the Redis cache.
     *
     * @param gpsData the GPS record from the request body
     * @return the storage result, or HTTP 500 on failure
     */
    @PostMapping("/store/redis")
    public ResponseEntity<GPSDataStorageResult> storeToRedisCache(@RequestBody GPSData gpsData) {
        return respond("GPS数据存储到Redis缓存失败", () -> {
            logger.info("接收到GPS数据存储到Redis缓存请求,车辆ID: {}", gpsData.getVehicleId());
            return ResponseEntity.ok(gpsDataStorageService.storeToRedisCache(gpsData));
        });
    }

    /**
     * Looks up the cached GPS record of a vehicle in Redis.
     *
     * @param vehicleId the vehicle identifier from the path
     * @return the cached record, HTTP 404 when the service returns {@code null},
     *         or HTTP 500 on failure
     */
    @GetMapping("/get/redis/{vehicleId}")
    public ResponseEntity<GPSData> getFromRedisCache(@PathVariable String vehicleId) {
        return respond("从Redis缓存获取GPS数据失败", () -> {
            GPSData cached = gpsDataStorageService.getFromRedisCache(vehicleId);
            if (cached == null) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.ok(cached);
        });
    }

    /**
     * Queries GPS records of a vehicle from MongoDB.
     *
     * @param vehicleId the vehicle identifier from the path
     * @param startTime optional start time request parameter, passed through to the service
     * @param endTime   optional end time request parameter, passed through to the service
     * @return the matching records, or HTTP 500 on failure
     */
    @GetMapping("/query/mongodb/{vehicleId}")
    public ResponseEntity<List<GPSData>> queryFromMongoDB(
            @PathVariable String vehicleId,
            @RequestParam(required = false) Long startTime,
            @RequestParam(required = false) Long endTime) {
        return respond("从MongoDB查询GPS数据失败",
                () -> ResponseEntity.ok(gpsDataStorageService.queryFromMongoDB(vehicleId, startTime, endTime)));
    }

    /**
     * Returns the current GPS data monitoring metrics.
     *
     * @return the metrics snapshot, or HTTP 500 on failure
     */
    @GetMapping("/metrics")
    public ResponseEntity<GPSDataMetrics> getGPSDataMetrics() {
        return respond("获取GPS数据监控指标失败",
                () -> ResponseEntity.ok(gpsDataMonitorService.getMetrics()));
    }

    /**
     * Runs an endpoint body and converts any exception into an empty HTTP 500 response.
     *
     * @param failureLogMessage message logged at error level together with the exception
     * @param action            the endpoint body producing the success response
     * @return the response produced by {@code action}, or an empty HTTP 500 response
     */
    private <T> ResponseEntity<T> respond(String failureLogMessage,
            java.util.concurrent.Callable<ResponseEntity<T>> action) {
        try {
            return action.call();
        } catch (Exception e) {
            logger.error(failureLogMessage, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}

4.2 Kafka消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
 * Kafka consumer that persists incoming GPS data to MongoDB.
 *
 * <p>Storage failures and exceptions are logged and not rethrown.
 *
 * <p>NOTE(review): both listener methods subscribe to the same topic with the same consumer
 * group, one taking a single record and the other a {@code List}. Confirm that the listener
 * container configuration supports both signatures.
 */
@Component
public class GPSDataKafkaConsumer {

    @Autowired
    private GPSDataStorageService gpsDataStorageService;

    @Autowired
    private GPSDataMonitorService gpsDataMonitorService;

    @Autowired
    private GPSDataProcessingProperties properties;

    private static final Logger logger = LoggerFactory.getLogger(GPSDataKafkaConsumer.class);

    /**
     * Consumes a single GPS record and stores it in MongoDB.
     *
     * @param gpsData the GPS record delivered by Kafka
     */
    @KafkaListener(topics = "${gps.data.processing.kafka-topic}", groupId = "${gps.data.processing.kafka-consumer-group}")
    public void consumeGPSData(GPSData gpsData) {
        logger.info("接收到Kafka GPS数据,车辆ID: {}", gpsData.getVehicleId());

        try {
            GPSDataStorageResult outcome = gpsDataStorageService.storeToMongoDB(gpsData);

            if (!outcome.isSuccess()) {
                logger.error("Kafka GPS数据存储失败,车辆ID: {}, 错误: {}",
                        gpsData.getVehicleId(), outcome.getError());
                return;
            }

            logger.info("Kafka GPS数据存储成功,车辆ID: {}", gpsData.getVehicleId());

        } catch (Exception e) {
            logger.error("Kafka GPS数据消费异常,车辆ID: {}", gpsData.getVehicleId(), e);
        }
    }

    /**
     * Consumes a batch of GPS records and stores them in MongoDB in one call.
     *
     * @param gpsDataList the GPS records delivered by Kafka
     */
    @KafkaListener(topics = "${gps.data.processing.kafka-topic}", groupId = "${gps.data.processing.kafka-consumer-group}")
    public void batchConsumeGPSData(List<GPSData> gpsDataList) {
        logger.info("接收到Kafka批量GPS数据,数量: {}", gpsDataList.size());

        try {
            GPSDataBatchStorageResult outcome = gpsDataStorageService.batchStoreToMongoDB(gpsDataList);

            if (!outcome.isSuccess()) {
                logger.error("Kafka批量GPS数据存储失败,错误: {}", outcome.getError());
                return;
            }

            logger.info("Kafka批量GPS数据存储成功,总数: {}, 成功: {}",
                    outcome.getTotalCount(), outcome.getSuccessCount());

        } catch (Exception e) {
            logger.error("Kafka批量GPS数据消费异常", e);
        }
    }
}

5. GPS数据注解和AOP

5.1 GPS数据处理注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Marks a method as a GPS data processing operation.
 *
 * <p>GPSDataProcessingAspect binds this annotation through an {@code @annotation(...)}
 * pointcut and reads its elements when the annotated method is invoked.
 *
 * <p>NOTE(review): the target list also allows types, while the aspect's pointcut is
 * written with {@code @annotation(...)}; confirm whether type-level usage is intended.
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GPSDataProcessing {

/**
 * Vehicle identifier associated with the operation. Defaults to the empty string, in which
 * case the aspect skips its start log line and metric recording.
 */
String vehicleId() default "";

/**
 * Processing type label; included in the aspect's start log line. Defaults to "RECEIVE".
 */
String processingType() default "RECEIVE";

/**
 * Whether GPS data processing is enabled.
 * NOTE(review): confirm which component consumes this flag.
 */
boolean enableGPSDataProcessing() default true;

/**
 * Whether data validation is enabled.
 * NOTE(review): confirm which component consumes this flag.
 */
boolean enableDataValidation() default true;

/**
 * Whether monitoring is enabled.
 * NOTE(review): confirm which component consumes this flag.
 */
boolean enableMonitoring() default true;

/**
 * Message of the GPSDataProcessingException the aspect throws when the annotated method
 * fails with an exception.
 */
String message() default "GPS数据处理失败,请稍后重试";

/**
 * HTTP status code carried by the GPSDataProcessingException the aspect throws when the
 * annotated method fails with an exception. Defaults to 500.
 */
int statusCode() default 500;
}

5.2 GPS数据处理AOP切面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * AOP aspect that wraps methods annotated with {@link GPSDataProcessing}.
 *
 * <p>For each intercepted call the aspect logs the start of processing, records the real
 * outcome (success or failure) with the monitor service, and converts any {@link Exception}
 * thrown by the target into a {@link GPSDataProcessingException} that keeps the original
 * exception as its cause.
 */
@Aspect
@Component
public class GPSDataProcessingAspect {

    @Autowired
    private GPSDataMonitorService gpsDataMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(GPSDataProcessingAspect.class);

    /**
     * Pointcut matching methods annotated with {@link GPSDataProcessing}; binds the annotation
     * instance so the advice can read its elements.
     *
     * @param gpsDataProcessing the annotation found on the intercepted method
     */
    @Pointcut("@annotation(gpsDataProcessing)")
    public void gpsDataProcessingPointcut(GPSDataProcessing gpsDataProcessing) {}

    /**
     * Around advice for GPS data processing methods.
     *
     * @param joinPoint         the intercepted invocation
     * @param gpsDataProcessing the annotation on the intercepted method
     * @return whatever the intercepted method returns
     * @throws GPSDataProcessingException when the intercepted method throws an {@link Exception};
     *         carries the annotation's message and status code and the original exception as cause
     * @throws Throwable any non-{@code Exception} throwable raised by the intercepted method
     */
    @Around("gpsDataProcessingPointcut(gpsDataProcessing)")
    public Object around(ProceedingJoinPoint joinPoint, GPSDataProcessing gpsDataProcessing) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        String vehicleId = gpsDataProcessing.vehicleId();
        boolean hasVehicleId = vehicleId != null && !vehicleId.isEmpty();
        // Metrics need a vehicle id and are skipped when the annotation opts out of monitoring.
        boolean monitored = hasVehicleId && gpsDataProcessing.enableMonitoring();

        if (hasVehicleId) {
            logger.info("GPS数据处理开始: method={}, vehicleId={}, processingType={}",
                    methodName, vehicleId, gpsDataProcessing.processingType());
        }

        Object result;
        try {
            result = joinPoint.proceed();
        } catch (Exception e) {
            logger.error("GPS数据处理异常: method={}", methodName, e);
            recordOutcome(monitored, vehicleId, false);
            // Keep the original exception as the cause so the root failure stays diagnosable.
            throw new GPSDataProcessingException(
                    gpsDataProcessing.message(), e, gpsDataProcessing.statusCode());
        }

        // Record success only after the target method has actually completed.
        recordOutcome(monitored, vehicleId, true);
        return result;
    }

    /** Reports the outcome of one intercepted call to the monitor service when monitoring applies. */
    private void recordOutcome(boolean monitored, String vehicleId, boolean success) {
        if (monitored) {
            gpsDataMonitorService.recordGPSDataProcessing(vehicleId, success);
        }
    }
}

5.3 GPS数据处理异常类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Unchecked exception signalling a GPS data processing failure, carrying the HTTP status
 * code that should be reported to the client.
 */
public class GPSDataProcessingException extends RuntimeException {

    /** Status code used by the constructors that do not take one explicitly. */
    private static final int DEFAULT_STATUS_CODE = 500;

    private final int statusCode;

    /**
     * Creates an exception with the default status code 500.
     *
     * @param message the detail message
     */
    public GPSDataProcessingException(String message) {
        this(message, DEFAULT_STATUS_CODE);
    }

    /**
     * Creates an exception with an explicit status code.
     *
     * @param message    the detail message
     * @param statusCode the HTTP status code to report
     */
    public GPSDataProcessingException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }

    /**
     * Creates an exception with a cause and the default status code 500.
     *
     * @param message the detail message
     * @param cause   the underlying cause
     */
    public GPSDataProcessingException(String message, Throwable cause) {
        this(message, cause, DEFAULT_STATUS_CODE);
    }

    /**
     * Creates an exception with a cause and an explicit status code.
     *
     * @param message    the detail message
     * @param cause      the underlying cause
     * @param statusCode the HTTP status code to report
     */
    public GPSDataProcessingException(String message, Throwable cause, int statusCode) {
        super(message, cause);
        this.statusCode = statusCode;
    }

    /**
     * @return the HTTP status code associated with this failure
     */
    public int getStatusCode() {
        return statusCode;
    }
}

5.4 GPS数据处理异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Controller advice that turns {@link GPSDataProcessingException} into a structured
 * error response.
 */
@ControllerAdvice
public class GPSDataProcessingExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(GPSDataProcessingExceptionHandler.class);

    /**
     * Builds the error response for a GPS data processing failure.
     *
     * <p>The body contains the keys {@code error}, {@code message} and {@code timestamp}; the
     * HTTP status is taken from the exception.
     *
     * @param e the exception raised during request handling
     * @return the error response
     */
    @ExceptionHandler(GPSDataProcessingException.class)
    public ResponseEntity<Map<String, Object>> handleGPSDataProcessingException(GPSDataProcessingException e) {
        final String reason = e.getMessage();
        logger.warn("GPS数据处理异常: {}", reason);

        final Map<String, Object> payload = new HashMap<>();
        payload.put("error", "GPS_DATA_PROCESSING_FAILED");
        payload.put("message", reason);
        payload.put("timestamp", System.currentTimeMillis());

        return ResponseEntity
                .status(e.getStatusCode())
                .body(payload);
    }
}

6. 实际应用示例

6.1 使用GPS数据处理注解的服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * Example service demonstrating the {@link GPSDataProcessing} annotation.
 *
 * <p>Each method only simulates work by sleeping for a fixed time and then returns a
 * description of what was "processed".
 */
@Service
public class GPSDataProcessingExampleService {

    private static final Logger logger = LoggerFactory.getLogger(GPSDataProcessingExampleService.class);

    /**
     * Basic GPS data processing example (simulated with a 100 ms pause).
     *
     * @param gpsData the GPS record to process
     * @return a completion message containing the vehicle id
     */
    @GPSDataProcessing(vehicleId = "test_vehicle", processingType = "RECEIVE",
            enableGPSDataProcessing = true, enableDataValidation = true, message = "基础GPS数据处理:操作失败")
    public String basicGPSDataProcessing(GPSData gpsData) {
        logger.info("执行基础GPS数据处理示例,车辆ID: {}", gpsData.getVehicleId());

        simulateWork(100);

        final String summary = "基础GPS数据处理完成,车辆ID: " + gpsData.getVehicleId();
        return summary;
    }

    /**
     * Large batch GPS data processing example (simulated with a 500 ms pause).
     *
     * @param gpsDataList the GPS records to process
     * @return a completion message containing the batch size
     */
    @GPSDataProcessing(vehicleId = "batch_vehicle", processingType = "BATCH_PROCESS",
            enableGPSDataProcessing = true, enableDataValidation = true, message = "大批量GPS数据处理:操作失败")
    public String largeGPSDataProcessing(List<GPSData> gpsDataList) {
        logger.info("执行大批量GPS数据处理示例,数量: {}", gpsDataList.size());

        simulateWork(500);

        final String summary = "大批量GPS数据处理完成,数量: " + gpsDataList.size();
        return summary;
    }

    /**
     * Real-time GPS data processing example (simulated with a 50 ms pause).
     *
     * @param gpsData the GPS record to process
     * @return a completion message containing the vehicle id
     */
    @GPSDataProcessing(vehicleId = "realtime_vehicle", processingType = "REALTIME_PROCESS",
            enableGPSDataProcessing = true, enableDataValidation = true, message = "实时GPS数据处理:操作失败")
    public String realtimeGPSDataProcessing(GPSData gpsData) {
        logger.info("执行实时GPS数据处理示例,车辆ID: {}", gpsData.getVehicleId());

        simulateWork(50);

        final String summary = "实时GPS数据处理完成,车辆ID: " + gpsData.getVehicleId();
        return summary;
    }

    /**
     * Pauses the current thread to stand in for real processing work. If the thread is
     * interrupted, the interrupt flag is restored and the pause ends early.
     *
     * @param millis how long to pause, in milliseconds
     */
    private static void simulateWork(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

6.2 GPS数据处理测试控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/**
 * Test controller that drives the example GPS data processing service with generated
 * data, exposed under {@code /api/gps/data/test}.
 */
@RestController
@RequestMapping("/api/gps/data/test")
public class GPSDataProcessingTestController {

    @Autowired
    private GPSDataProcessingExampleService exampleService;

    @Autowired
    private GPSDataReceiverService gpsDataReceiverService;

    @Autowired
    private GPSDataStorageService gpsDataStorageService;

    @Autowired
    private GPSDataMonitorService gpsDataMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(GPSDataProcessingTestController.class);

    /**
     * Runs the basic processing example on one generated record.
     *
     * @param vehicleId vehicle identifier used for the generated record
     * @return a map with {@code status}, {@code result} and {@code timestamp}, or an empty
     *         error response
     */
    @GetMapping("/basic")
    public ResponseEntity<Map<String, String>> testBasicGPSDataProcessing(@RequestParam String vehicleId) {
        return runTest("基础GPS数据处理测试失败",
                () -> exampleService.basicGPSDataProcessing(generateTestGPSData(vehicleId)));
    }

    /**
     * Runs the large batch processing example on a generated list of records.
     *
     * @param vehicleId vehicle identifier prefix used for the generated records
     * @param count     number of records to generate
     * @return a map with {@code status}, {@code result} and {@code timestamp}, or an empty
     *         error response
     */
    @GetMapping("/large")
    public ResponseEntity<Map<String, String>> testLargeGPSDataProcessing(
            @RequestParam String vehicleId, @RequestParam int count) {
        return runTest("大批量GPS数据处理测试失败",
                () -> exampleService.largeGPSDataProcessing(generateTestGPSDataList(vehicleId, count)));
    }

    /**
     * Runs the real-time processing example on one generated record.
     *
     * @param vehicleId vehicle identifier used for the generated record
     * @return a map with {@code status}, {@code result} and {@code timestamp}, or an empty
     *         error response
     */
    @GetMapping("/realtime")
    public ResponseEntity<Map<String, String>> testRealtimeGPSDataProcessing(@RequestParam String vehicleId) {
        return runTest("实时GPS数据处理测试失败",
                () -> exampleService.realtimeGPSDataProcessing(generateTestGPSData(vehicleId)));
    }

    /**
     * Returns the current GPS data monitoring metrics.
     *
     * @return the metrics snapshot, or HTTP 500 on failure
     */
    @GetMapping("/metrics")
    public ResponseEntity<GPSDataMetrics> getGPSDataMetrics() {
        final GPSDataMetrics snapshot;
        try {
            snapshot = gpsDataMonitorService.getMetrics();
        } catch (Exception e) {
            logger.error("获取GPS数据监控指标失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        return ResponseEntity.ok(snapshot);
    }

    /**
     * Executes one test scenario and wraps its outcome in a response.
     *
     * <p>A {@link GPSDataProcessingException} is answered with its own status code and an
     * empty body; any other exception is answered with HTTP 500 and an empty body.
     *
     * @param failureMessage log message used when the scenario fails
     * @param scenario       the scenario to run; its return value becomes the {@code result} entry
     * @return the success map or an empty error response
     */
    private ResponseEntity<Map<String, String>> runTest(String failureMessage,
            java.util.function.Supplier<String> scenario) {
        try {
            String outcome = scenario.get();

            Map<String, String> body = new HashMap<>();
            body.put("status", "SUCCESS");
            body.put("result", outcome);
            body.put("timestamp", String.valueOf(System.currentTimeMillis()));

            return ResponseEntity.ok(body);

        } catch (GPSDataProcessingException e) {
            logger.warn(failureMessage + ": {}", e.getMessage());
            return ResponseEntity.status(e.getStatusCode()).build();
        } catch (Exception e) {
            logger.error(failureMessage, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * Generates one random GPS record for the given vehicle.
     *
     * @param vehicleId vehicle identifier to set on the record
     * @return the generated record
     */
    private GPSData generateTestGPSData(String vehicleId) {
        GPSData sample = new GPSData();
        sample.setVehicleId(vehicleId);
        // Position: a base coordinate plus a random offset of up to 0.01 degrees.
        sample.setLongitude(116.3974 + Math.random() * 0.01);
        sample.setLatitude(39.9093 + Math.random() * 0.01);
        sample.setAltitude(50.0 + Math.random() * 100);
        sample.setSpeed(Math.random() * 80);
        sample.setDirection(Math.random() * 360);
        sample.setGpsTimestamp(System.currentTimeMillis());
        sample.setSatelliteCount(8 + (int) (Math.random() * 4));
        sample.setDataQuality(80 + (int) (Math.random() * 20));
        return sample;
    }

    /**
     * Generates {@code count} random GPS records whose vehicle ids are
     * {@code vehicleId + "_" + index}.
     *
     * @param vehicleId vehicle identifier prefix
     * @param count     number of records to generate
     * @return the generated records
     */
    private List<GPSData> generateTestGPSDataList(String vehicleId, int count) {
        List<GPSData> samples = new ArrayList<>();
        int index = 0;
        while (index < count) {
            samples.add(generateTestGPSData(vehicleId + "_" + index));
            index++;
        }
        return samples;
    }
}

7. 总结

7.1 SpringBoot + Redis + Kafka + MongoDB高稳定车辆GPS数据处理最佳实践

  1. 合理设计系统架构: 使用SpringBoot + Redis + Kafka + MongoDB构建高稳定系统
  2. 选择合适的处理策略: 根据业务需求选择合适的GPS数据处理策略
  3. 监控GPS数据处理: 实时监控GPS数据处理性能和稳定性
  4. 动态调整参数: 根据监控数据动态调整处理参数
  5. 异常处理: 实现完善的异常处理和用户友好提示

7.2 性能优化建议

  • Redis缓存优化: 合理使用Redis缓存提升查询性能
  • Kafka流式处理: 使用Kafka实现异步流式处理
  • MongoDB存储优化: 优化MongoDB索引和查询性能
  • 批量处理: 使用批量处理提升处理效率
  • 并发控制: 合理控制并发处理数量

7.3 运维管理要点

  • 实时监控: 监控GPS数据处理性能和稳定性
  • 动态调整: 根据负载情况动态调整处理参数
  • 异常处理: 建立异常处理和告警机制
  • 日志管理: 完善日志记录和分析
  • 性能调优: 根据监控数据优化系统性能

通过本文的SpringBoot + Redis + Kafka + MongoDB高稳定车辆GPS数据处理实战指南,您可以掌握车辆GPS数据处理的原理、实现方法、性能优化技巧以及在企业级应用中的最佳实践,构建高效、稳定的车辆定位系统!