1. MongoDB存储原始轨迹数据概述

使用MongoDB存储原始轨迹数据是智能交通和地理信息系统中的常见方案:借助MongoDB的无模式(Schema Free)文档模型以及对时间序列数据的良好支持,可以灵活、高效地存储轨迹数据。本文实现的系统包含原始数据存储、时间序列优化、地理索引、Schema Free设计、性能优化和监控告警等功能。下文将依次介绍MongoDB存储原始轨迹数据的原理、实现方法、性能优化技巧,以及在Java项目中的实战应用。

1.1 MongoDB存储原始轨迹数据核心价值

  1. Schema Free: 灵活的数据结构适应不同轨迹数据格式
  2. 时间序列友好: 优化的时间序列数据存储和查询
  3. 地理索引: 高效的地理位置查询和空间分析
  4. 高性能: 支持大规模轨迹数据的快速存储和检索
  5. 可扩展性: 支持水平扩展和分片存储

1.2 MongoDB存储原始轨迹数据场景

  • 车辆轨迹存储: 车辆行驶轨迹的原始数据存储
  • 人员轨迹分析: 人员移动轨迹的分析和统计
  • 地理信息管理: 地理信息数据的存储和管理
  • 轨迹数据挖掘: 轨迹数据的挖掘和分析
  • 实时轨迹查询: 实时轨迹数据的查询和展示

1.3 MongoDB技术特性

  • 文档数据库: 基于文档的NoSQL数据库
  • Schema Free: 无需预定义数据结构
  • 地理索引: 支持2d(平面坐标)和2dsphere(球面坐标,适用于经纬度数据)两种地理索引
  • 时间序列: 优化的时间序列数据存储
  • 分片集群: 支持水平扩展和分片

2. MongoDB存储原始轨迹数据基础实现

2.1 MongoDB轨迹数据配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * Spring configuration for the MongoDB trajectory module.
 *
 * <p>Enables binding of {@link MongoDBTrajectoryProperties} and declares one
 * bean per trajectory service. Every bean method simply returns a newly
 * constructed service instance.
 *
 * <p>NOTE(review): {@code MongoDBTrajectoryService} and
 * {@code MongoDBTrajectoryStorageService} are also annotated with
 * {@code @Service} in this file. If those classes are component-scanned, the
 * bean methods below declare the same beans a second time - confirm which
 * registration is intended.
 *
 * @author 运维实战
 */
@Configuration
@EnableConfigurationProperties(MongoDBTrajectoryProperties.class)
public class MongoDBTrajectoryConfig {

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryConfig.class);

    /** Bound trajectory settings; injected here but not read by any method of this class. */
    @Autowired
    private MongoDBTrajectoryProperties properties;

    /**
     * Declares the facade service for trajectory storage and queries.
     *
     * @return a new {@link MongoDBTrajectoryService}
     */
    @Bean
    public MongoDBTrajectoryService mongoDBTrajectoryService() {
        return new MongoDBTrajectoryService();
    }

    /**
     * Declares the service that writes trajectory data to MongoDB.
     *
     * @return a new {@link MongoDBTrajectoryStorageService}
     */
    @Bean
    public MongoDBTrajectoryStorageService mongoDBTrajectoryStorageService() {
        return new MongoDBTrajectoryStorageService();
    }

    /**
     * Declares the service that reads trajectory data from MongoDB.
     *
     * @return a new {@link MongoDBTrajectoryQueryService}
     */
    @Bean
    public MongoDBTrajectoryQueryService mongoDBTrajectoryQueryService() {
        return new MongoDBTrajectoryQueryService();
    }

    /**
     * Declares the service that records storage and query metrics.
     *
     * @return a new {@link MongoDBTrajectoryMonitorService}
     */
    @Bean
    public MongoDBTrajectoryMonitorService mongoDBTrajectoryMonitorService() {
        return new MongoDBTrajectoryMonitorService();
    }
}

2.2 MongoDB轨迹数据属性配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
 * Settings for the MongoDB trajectory module, bound from configuration keys
 * under the {@code mongodb.trajectory} prefix.
 *
 * <p>Each field carries the default that applies when the corresponding key
 * is not configured. Accessors are generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
@ConfigurationProperties(prefix = "mongodb.trajectory")
public class MongoDBTrajectoryProperties {

    /** Master switch for MongoDB trajectory storage. */
    private boolean enableMongoDBTrajectoryStorage = true;

    /** Name of the collection that holds trajectory records. */
    private String trajectoryCollectionName = "trajectory_data";

    /** Name of the collection that holds raw, unparsed trajectory payloads. */
    private String rawTrajectoryCollectionName = "raw_trajectory_data";

    /** Name of the collection used for time-series trajectory records. */
    private String timeSeriesCollectionName = "trajectory_timeseries";

    /** Number of records per batch insert. */
    private int batchInsertSize = 1_000;

    /** Number of records per batch update. */
    private int batchUpdateSize = 500;

    /** Whether a geospatial index should be created. */
    private boolean enableGeospatialIndex = true;

    /** Geospatial index type; the index-creation code compares this against {@code "2dsphere"}. */
    private String geospatialIndexType = "2dsphere";

    /** Whether an index on the time field should be created. */
    private boolean enableTimeSeriesIndex = true;

    /** Document field the time index is built on. */
    private String timeSeriesIndexField = "timestamp";

    /** Whether schema-free storage is enabled. */
    private boolean enableSchemaFree = true;

    /** Whether data compression is enabled. */
    private boolean enableDataCompression = true;

    /** Name of the compression algorithm. */
    private String compressionAlgorithm = "snappy";

    /** Whether monitoring is enabled. */
    private boolean enableMonitoring = true;

    /** Monitoring interval in milliseconds. */
    private long monitorInterval = 30_000L;

    /** Whether alerting is enabled. */
    private boolean enableAlert = true;

    /** Alert threshold for storage exceptions. */
    private int storageExceptionAlertThreshold = 100;

    /** Alert threshold for query latency, in milliseconds. */
    private long queryPerformanceAlertThreshold = 1_000L;
}

2.3 轨迹数据模型类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/**
 * One trajectory sample: a positioned, timestamped observation of a tracked
 * object, mapped to the {@code trajectory_data} collection.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
@Document(collection = "trajectory_data")
public class TrajectoryData {

    /** Document id; the constructor assigns a random UUID string. */
    @Id
    private String id;

    /** Trajectory identifier. */
    @Field("trajectory_id")
    private String trajectoryId;

    /** Identifier of the tracked object (vehicle id, person id, ...). */
    @Field("object_id")
    private String objectId;

    /** Kind of tracked object. */
    @Field("object_type")
    private String objectType;

    /** Longitude in degrees; {@link #validate()} accepts -180..180. */
    @Field("longitude")
    private Double longitude;

    /** Latitude in degrees; {@link #validate()} accepts -90..90. */
    @Field("latitude")
    private Double latitude;

    /** Altitude. */
    @Field("altitude")
    private Double altitude;

    /** Speed in km/h. */
    @Field("speed")
    private Double speed;

    /** Heading angle. */
    @Field("direction")
    private Double direction;

    /** Sample timestamp; must be positive to pass {@link #validate()}. Presumably epoch milliseconds - confirm. */
    @Field("timestamp")
    private Long timestamp;

    /** GPS timestamp in epoch milliseconds (it is compared against {@code System.currentTimeMillis()}). */
    @Field("gps_timestamp")
    private Long gpsTimestamp;

    /** Time this object was created, in epoch milliseconds. */
    @Field("receive_timestamp")
    private Long receiveTimestamp;

    /** Data quality score. */
    @Field("data_quality")
    private Integer dataQuality;

    /** Number of satellites used for the fix. */
    @Field("satellite_count")
    private Integer satelliteCount;

    /** Validity flag; defaults to {@code true}. */
    @Field("is_valid")
    private Boolean isValid;

    /** Raw key/value payload. */
    @Field("raw_data")
    private Map<String, Object> rawData;

    /** Free-form extension attributes. */
    @Field("extended_properties")
    private Map<String, Object> extendedProperties;

    /** Creation time. */
    @Field("created_at")
    private Date createdAt;

    /** Last update time. */
    @Field("updated_at")
    private Date updatedAt;

    /**
     * Creates a sample with a random id, the validity flag set, empty maps,
     * and all creation-time fields set to the current time.
     */
    public TrajectoryData() {
        // Read the clock once so createdAt, updatedAt and receiveTimestamp agree.
        long nowMillis = System.currentTimeMillis();
        this.id = UUID.randomUUID().toString();
        // Date is mutable, so each field gets its own instance.
        this.createdAt = new Date(nowMillis);
        this.updatedAt = new Date(nowMillis);
        this.isValid = true;
        this.receiveTimestamp = nowMillis;
        this.rawData = new HashMap<>();
        this.extendedProperties = new HashMap<>();
    }

    /**
     * Builds a trajectory id for this sample.
     *
     * @return {@code objectId + "_" + timestamp} when both are set, otherwise
     *         a random UUID string
     */
    public String generateTrajectoryId() {
        if (objectId != null && timestamp != null) {
            return objectId + "_" + timestamp;
        }
        return UUID.randomUUID().toString();
    }

    /**
     * Checks that the sample has usable coordinates and a timestamp.
     *
     * @return {@code true} when longitude is within -180..180, latitude is
     *         within -90..90 and the timestamp is positive; {@code false}
     *         when any of them is missing, out of range or NaN
     */
    public boolean validate() {
        if (longitude == null || latitude == null) {
            return false;
        }

        // Phrased as "not inside the range" so NaN, for which every
        // comparison is false, is rejected instead of slipping through.
        if (!(longitude >= -180 && longitude <= 180)) {
            return false;
        }

        if (!(latitude >= -90 && latitude <= 90)) {
            return false;
        }

        return timestamp != null && timestamp > 0;
    }

    /**
     * Scores the sample from 100 downwards, with penalties for few
     * satellites, high speed and a GPS timestamp far from the current time.
     * Fields that are {@code null} incur no penalty.
     *
     * @return the score, never below 0
     */
    public int calculateDataQuality() {
        int quality = 100;

        // Fewer satellites means a larger penalty.
        if (satelliteCount != null) {
            if (satelliteCount < 4) {
                quality -= 30;
            } else if (satelliteCount < 6) {
                quality -= 15;
            } else if (satelliteCount < 8) {
                quality -= 5;
            }
        }

        // Speeds above 120 and 150 are penalised as implausible.
        if (speed != null) {
            if (speed > 150) {
                quality -= 20;
            } else if (speed > 120) {
                quality -= 10;
            }
        }

        // Penalise a GPS timestamp more than 1 or 5 minutes away from now.
        if (gpsTimestamp != null) {
            long timeDiff = Math.abs(System.currentTimeMillis() - gpsTimestamp);
            if (timeDiff > 300000) {
                quality -= 25;
            } else if (timeDiff > 60000) {
                quality -= 10;
            }
        }

        return Math.max(0, quality);
    }

    /**
     * Stores one entry in the raw-data map, creating the map if it is {@code null}.
     *
     * @param key   entry key
     * @param value entry value
     */
    public void addRawData(String key, Object value) {
        if (rawData == null) {
            rawData = new HashMap<>();
        }
        rawData.put(key, value);
    }

    /**
     * Stores one entry in the extended-properties map, creating the map if it
     * is {@code null}.
     *
     * @param key   entry key
     * @param value entry value
     */
    public void addExtendedProperty(String key, Object value) {
        if (extendedProperties == null) {
            extendedProperties = new HashMap<>();
        }
        extendedProperties.put(key, value);
    }

    /**
     * Returns the position of this sample as a new {@link GeoLocation}.
     *
     * @return a new location built from longitude, latitude and altitude
     */
    public GeoLocation getGeoLocation() {
        return new GeoLocation(longitude, latitude, altitude);
    }

    /**
     * Copies longitude, latitude and altitude from the given location.
     *
     * @param geoLocation source location; must not be {@code null}
     * @throws NullPointerException if {@code geoLocation} is {@code null}
     */
    public void setGeoLocation(GeoLocation geoLocation) {
        this.longitude = geoLocation.getLongitude();
        this.latitude = geoLocation.getLatitude();
        this.altitude = geoLocation.getAltitude();
    }
}

/**
 * A geographic position given as longitude, latitude and optional altitude.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
public class GeoLocation {

    /** Longitude in degrees. */
    private Double longitude;

    /** Latitude in degrees. */
    private Double latitude;

    /** Altitude; not used by the distance or bearing calculations. */
    private Double altitude;

    /** Creates a location with all components unset. */
    public GeoLocation() {}

    /**
     * Creates a location without altitude.
     *
     * @param longitude longitude in degrees
     * @param latitude  latitude in degrees
     */
    public GeoLocation(Double longitude, Double latitude) {
        this.longitude = longitude;
        this.latitude = latitude;
    }

    /**
     * Creates a location with altitude.
     *
     * @param longitude longitude in degrees
     * @param latitude  latitude in degrees
     * @param altitude  altitude
     */
    public GeoLocation(Double longitude, Double latitude, Double altitude) {
        this.longitude = longitude;
        this.latitude = latitude;
        this.altitude = altitude;
    }

    /**
     * Computes the great-circle distance to another location with the
     * haversine formula, using an earth radius of 6,371,000 m.
     *
     * @param other the other location
     * @return the distance in metres, or {@code 0.0} when {@code other} is
     *         {@code null} or either location lacks longitude or latitude
     */
    public double calculateDistance(GeoLocation other) {
        if (other == null || this.longitude == null || this.latitude == null ||
                other.longitude == null || other.latitude == null) {
            return 0.0;
        }

        final double earthRadiusMeters = 6371000;
        double lat1Rad = Math.toRadians(this.latitude);
        double lat2Rad = Math.toRadians(other.latitude);
        double deltaLatRad = Math.toRadians(other.latitude - this.latitude);
        double deltaLonRad = Math.toRadians(other.longitude - this.longitude);

        double sinHalfLat = Math.sin(deltaLatRad / 2);
        double sinHalfLon = Math.sin(deltaLonRad / 2);
        double a = sinHalfLat * sinHalfLat +
                Math.cos(lat1Rad) * Math.cos(lat2Rad) * sinHalfLon * sinHalfLon;
        // Rounding can push a marginally outside [0, 1] for near-antipodal
        // points; without the clamp sqrt(1 - a) would be NaN.
        a = Math.min(1.0, Math.max(0.0, a));
        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));

        return earthRadiusMeters * c;
    }

    /**
     * Computes the initial bearing from this location towards another one.
     *
     * @param other the other location
     * @return the bearing in degrees, normalised to the range [0, 360), or
     *         {@code 0.0} when {@code other} is {@code null} or either
     *         location lacks longitude or latitude
     */
    public double calculateBearing(GeoLocation other) {
        if (other == null || this.longitude == null || this.latitude == null ||
                other.longitude == null || other.latitude == null) {
            return 0.0;
        }

        double lat1Rad = Math.toRadians(this.latitude);
        double lat2Rad = Math.toRadians(other.latitude);
        double deltaLonRad = Math.toRadians(other.longitude - this.longitude);

        double y = Math.sin(deltaLonRad) * Math.cos(lat2Rad);
        double x = Math.cos(lat1Rad) * Math.sin(lat2Rad) -
                Math.sin(lat1Rad) * Math.cos(lat2Rad) * Math.cos(deltaLonRad);

        // atan2 yields (-180, 180]; shift into [0, 360).
        double bearing = Math.toDegrees(Math.atan2(y, x));
        return (bearing + 360) % 360;
    }
}

2.4 基础MongoDB轨迹数据服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/**
 * Facade for trajectory storage, queries and index creation.
 *
 * <p>Validates and prepares trajectory records, delegates the MongoDB work to
 * the storage and query services, and reports each operation to the monitor
 * service. Methods return a result object describing success or failure
 * instead of propagating exceptions raised inside their {@code try} blocks.
 *
 * @author 运维实战
 */
@Service
public class MongoDBTrajectoryService {

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryService.class);

    @Autowired
    private MongoDBTrajectoryProperties properties;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private MongoDBTrajectoryStorageService mongoDBTrajectoryStorageService;

    @Autowired
    private MongoDBTrajectoryQueryService mongoDBTrajectoryQueryService;

    @Autowired
    private MongoDBTrajectoryMonitorService mongoDBTrajectoryMonitorService;

    /**
     * Validates, prepares and stores a single trajectory record.
     *
     * @param trajectoryData the record to store; must not be {@code null}
     * @return the storage result; a record that fails validation yields a
     *         failed result without touching the storage service
     */
    public MongoDBTrajectoryStorageResult storeTrajectoryData(TrajectoryData trajectoryData) {
        logger.info("存储轨迹数据,轨迹ID: {}, 对象ID: {}", trajectoryData.getTrajectoryId(), trajectoryData.getObjectId());

        MongoDBTrajectoryStorageResult result = new MongoDBTrajectoryStorageResult();
        result.setTrajectoryId(trajectoryData.getTrajectoryId());
        result.setObjectId(trajectoryData.getObjectId());
        result.setStartTime(System.currentTimeMillis());

        try {
            if (!trajectoryData.validate()) {
                result.setSuccess(false);
                result.setError("轨迹数据验证失败");
                result.setEndTime(System.currentTimeMillis());
                return result;
            }

            prepareForStorage(trajectoryData);

            result = mongoDBTrajectoryStorageService.storeTrajectoryData(trajectoryData);

            // The storage service reports failures through the result rather
            // than by throwing, so the metric must follow the result flag.
            // NOTE(review): the storage service in this file records the same
            // metric itself - confirm whether the double count is intended.
            mongoDBTrajectoryMonitorService.recordTrajectoryStorage(trajectoryData.getObjectId(), result.isSuccess());

            logger.info("轨迹数据存储完成,轨迹ID: {}, 成功: {}, 耗时: {}ms",
                    trajectoryData.getTrajectoryId(), result.isSuccess(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("轨迹数据存储异常,轨迹ID: {}", trajectoryData.getTrajectoryId(), e);
            result.setSuccess(false);
            result.setError("轨迹数据存储异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            mongoDBTrajectoryMonitorService.recordTrajectoryStorage(trajectoryData.getObjectId(), false);

            return result;
        }
    }

    /**
     * Validates a list of records and stores the valid ones in bulk.
     *
     * <p>Records that fail validation are counted as failures and skipped.
     * The result is marked successful only when every input record was stored.
     *
     * @param trajectoryDataList the records to store; must not be {@code null}
     * @return the batch result with total, success and failure counts
     */
    public MongoDBTrajectoryBatchStorageResult batchStoreTrajectoryData(List<TrajectoryData> trajectoryDataList) {
        logger.info("批量存储轨迹数据,数量: {}", trajectoryDataList.size());

        MongoDBTrajectoryBatchStorageResult result = new MongoDBTrajectoryBatchStorageResult();
        result.setTotalCount(trajectoryDataList.size());
        result.setStartTime(System.currentTimeMillis());

        try {
            List<TrajectoryData> validDataList = new ArrayList<>();

            for (TrajectoryData trajectoryData : trajectoryDataList) {
                if (trajectoryData.validate()) {
                    prepareForStorage(trajectoryData);
                    validDataList.add(trajectoryData);
                } else {
                    logger.warn("轨迹数据验证失败,对象ID: {}", trajectoryData.getObjectId());
                }
            }

            int storedCount = 0;
            if (!validDataList.isEmpty()) {
                MongoDBTrajectoryBatchStorageResult storageResult =
                        mongoDBTrajectoryStorageService.batchStoreTrajectoryData(validDataList);
                storedCount = storageResult.getSuccessCount();
                if (storageResult.getError() != null) {
                    result.setError(storageResult.getError());
                }
            }

            // Derive failures from the totals: covers both invalid records and
            // records the storage service did not manage to insert.
            result.setSuccessCount(storedCount);
            result.setFailureCount(result.getTotalCount() - storedCount);
            result.setSuccess(result.getFailureCount() == 0);
            result.setEndTime(System.currentTimeMillis());

            mongoDBTrajectoryMonitorService.recordTrajectoryBatchStorage(trajectoryDataList.size(), result.getSuccessCount());

            logger.info("批量轨迹数据存储完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
                    result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("批量轨迹数据存储异常", e);
            result.setSuccess(false);
            result.setError("批量轨迹数据存储异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /**
     * Runs an attribute query through the query service.
     *
     * @param query the query conditions; must not be {@code null}
     * @return the result produced by the query service, or a failed result
     *         when an exception is thrown
     */
    public MongoDBTrajectoryQueryResult queryTrajectoryData(TrajectoryQuery query) {
        logger.info("查询轨迹数据,对象ID: {}, 时间范围: {} - {}",
                query.getObjectId(), query.getStartTime(), query.getEndTime());

        MongoDBTrajectoryQueryResult result = new MongoDBTrajectoryQueryResult();
        result.setQuery(query);
        result.setStartTime(System.currentTimeMillis());

        try {
            result = mongoDBTrajectoryQueryService.queryTrajectoryData(query);

            // Report the outcome the query service actually returned.
            mongoDBTrajectoryMonitorService.recordTrajectoryQuery(
                    query.getObjectId(), result.getTrajectoryDataList().size(), result.isSuccess());

            logger.info("轨迹数据查询完成,对象ID: {}, 结果数量: {}, 耗时: {}ms",
                    query.getObjectId(), result.getTrajectoryDataList().size(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("轨迹数据查询异常,对象ID: {}", query.getObjectId(), e);
            result.setSuccess(false);
            result.setError("轨迹数据查询异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            mongoDBTrajectoryMonitorService.recordTrajectoryQuery(query.getObjectId(), 0, false);

            return result;
        }
    }

    /**
     * Runs a radius query around a centre point through the query service.
     *
     * @param geoQuery the geographic query conditions; must not be {@code null}
     * @return the result produced by the query service, or a failed result
     *         when an exception is thrown
     */
    public MongoDBTrajectoryQueryResult geoQueryTrajectoryData(GeoTrajectoryQuery geoQuery) {
        logger.info("地理范围查询轨迹数据,中心点: ({}, {}), 半径: {}m",
                geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), geoQuery.getRadiusMeters());

        MongoDBTrajectoryQueryResult result = new MongoDBTrajectoryQueryResult();
        result.setStartTime(System.currentTimeMillis());

        try {
            result = mongoDBTrajectoryQueryService.geoQueryTrajectoryData(geoQuery);

            // Report the outcome the query service actually returned.
            mongoDBTrajectoryMonitorService.recordTrajectoryGeoQuery(geoQuery.getCenterLongitude(),
                    geoQuery.getCenterLatitude(), result.getTrajectoryDataList().size(), result.isSuccess());

            logger.info("地理范围查询完成,中心点: ({}, {}), 结果数量: {}, 耗时: {}ms",
                    geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(),
                    result.getTrajectoryDataList().size(), result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("地理范围查询异常,中心点: ({}, {})",
                    geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), e);
            result.setSuccess(false);
            result.setError("地理范围查询异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());

            mongoDBTrajectoryMonitorService.recordTrajectoryGeoQuery(geoQuery.getCenterLongitude(),
                    geoQuery.getCenterLatitude(), 0, false);

            return result;
        }
    }

    /**
     * Ensures the indexes of the trajectory collection: an optional location
     * index, an optional time index, and a compound index on
     * {@code object_id} + {@code timestamp}.
     *
     * <p>NOTE(review): the "geo" index below is built with
     * {@code new Index().on(field, ASC)}, which defines an ordinary ascending
     * index, not a {@code 2d}/{@code 2dsphere} one. In addition,
     * {@code TrajectoryData} as declared in this file has no {@code location}
     * field. Geospatial queries on {@code location} will therefore not be
     * served by this index - a geospatial index definition plus a stored
     * location field are needed.
     *
     * @return the index creation result
     */
    public MongoDBIndexCreateResult createIndexes() {
        logger.info("创建MongoDB轨迹数据索引");

        MongoDBIndexCreateResult result = new MongoDBIndexCreateResult();
        result.setStartTime(System.currentTimeMillis());

        try {
            if (properties.isEnableGeospatialIndex()) {
                Index geoIndex = new Index().on("longitude", Sort.Direction.ASC).on("latitude", Sort.Direction.ASC);
                if ("2dsphere".equals(properties.getGeospatialIndexType())) {
                    geoIndex = new Index().on("location", Sort.Direction.ASC);
                }
                mongoTemplate.indexOps(properties.getTrajectoryCollectionName()).ensureIndex(geoIndex);
            }

            if (properties.isEnableTimeSeriesIndex()) {
                Index timeIndex = new Index().on(properties.getTimeSeriesIndexField(), Sort.Direction.ASC);
                mongoTemplate.indexOps(properties.getTrajectoryCollectionName()).ensureIndex(timeIndex);
            }

            // Compound index on object_id + timestamp, always created.
            Index compoundIndex = new Index()
                    .on("object_id", Sort.Direction.ASC)
                    .on("timestamp", Sort.Direction.ASC);
            mongoTemplate.indexOps(properties.getTrajectoryCollectionName()).ensureIndex(compoundIndex);

            result.setSuccess(true);
            result.setEndTime(System.currentTimeMillis());

            logger.info("MongoDB轨迹数据索引创建成功,耗时: {}ms", result.getDuration());

            return result;

        } catch (Exception e) {
            logger.error("MongoDB轨迹数据索引创建异常", e);
            result.setSuccess(false);
            result.setError("MongoDB轨迹数据索引创建异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
            return result;
        }
    }

    /** Assigns a trajectory id when none is set and stores the computed quality score. */
    private void prepareForStorage(TrajectoryData trajectoryData) {
        if (trajectoryData.getTrajectoryId() == null) {
            trajectoryData.setTrajectoryId(trajectoryData.generateTrajectoryId());
        }
        trajectoryData.setDataQuality(trajectoryData.calculateDataQuality());
    }
}

2.5 MongoDB轨迹数据查询条件类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
/**
 * Filter, sort and paging conditions for an attribute-based trajectory query.
 *
 * <p>Every filter is optional; a {@code null} value means "do not filter on
 * this attribute". Accessors are generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
public class TrajectoryQuery {

    /** Identifier of the tracked object. */
    private String objectId;

    /** Kind of tracked object. */
    private String objectType;

    /** Inclusive lower bound on {@code timestamp}. */
    private Long startTime;

    /** Inclusive upper bound on {@code timestamp}. */
    private Long endTime;

    /** Inclusive lower bound on {@code longitude}. */
    private Double minLongitude;

    /** Inclusive upper bound on {@code longitude}. */
    private Double maxLongitude;

    /** Inclusive lower bound on {@code latitude}. */
    private Double minLatitude;

    /** Inclusive upper bound on {@code latitude}. */
    private Double maxLatitude;

    /** Inclusive lower bound on {@code speed}. */
    private Double minSpeed;

    /** Inclusive upper bound on {@code speed}. */
    private Double maxSpeed;

    /** Inclusive lower bound on {@code data_quality}. */
    private Integer minDataQuality;

    /** Required value of {@code is_valid}. */
    private Boolean isValid;

    /** Page size. */
    private Integer pageSize = 1000;

    /** Zero-based page number (the skip offset is {@code pageNumber * pageSize}). */
    private Integer pageNumber = 0;

    /** Field to sort by. */
    private String sortField = "timestamp";

    /** Sort direction; {@code "DESC"} (case-insensitive) sorts descending, anything else ascending. */
    private String sortDirection = "ASC";

    /** Creates a query with no filters and the default sort and paging. */
    public TrajectoryQuery() {}

    /**
     * Creates a query for one object within a time range.
     *
     * @param objectId  identifier of the tracked object
     * @param startTime inclusive lower bound on the timestamp
     * @param endTime   inclusive upper bound on the timestamp
     */
    public TrajectoryQuery(String objectId, Long startTime, Long endTime) {
        this.objectId = objectId;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /**
     * Translates these conditions into a Spring Data MongoDB {@link Query}.
     *
     * <p>Range filters accept a lower bound, an upper bound, or both; each
     * bound is inclusive.
     *
     * @return the query with filter criteria, sort and paging applied
     */
    public Query buildMongoQuery() {
        Query query = new Query();

        if (objectId != null) {
            query.addCriteria(Criteria.where("object_id").is(objectId));
        }

        if (objectType != null) {
            query.addCriteria(Criteria.where("object_type").is(objectType));
        }

        // All ranges honour a single bound, the way the timestamp range
        // always did; previously a lone min or max was silently ignored for
        // longitude, latitude and speed.
        addRangeCriteria(query, "timestamp", startTime, endTime);
        addRangeCriteria(query, "longitude", minLongitude, maxLongitude);
        addRangeCriteria(query, "latitude", minLatitude, maxLatitude);
        addRangeCriteria(query, "speed", minSpeed, maxSpeed);

        if (minDataQuality != null) {
            query.addCriteria(Criteria.where("data_quality").gte(minDataQuality));
        }

        if (isValid != null) {
            query.addCriteria(Criteria.where("is_valid").is(isValid));
        }

        if (sortField != null) {
            Sort.Direction direction = "DESC".equalsIgnoreCase(sortDirection) ?
                    Sort.Direction.DESC : Sort.Direction.ASC;
            query.with(Sort.by(direction, sortField));
        }

        if (pageSize != null && pageNumber != null) {
            // Multiply as long: pageNumber * pageSize overflows int for deep pages.
            query.skip((long) pageNumber * pageSize).limit(pageSize);
        }

        return query;
    }

    /** Adds an inclusive range criterion on {@code field} for whichever of the two bounds is non-null. */
    private static void addRangeCriteria(Query query, String field, Object min, Object max) {
        if (min == null && max == null) {
            return;
        }
        Criteria range = Criteria.where(field);
        if (min != null) {
            range = range.gte(min);
        }
        if (max != null) {
            range = range.lte(max);
        }
        query.addCriteria(range);
    }
}

/**
 * Conditions for a radius query around a centre point, with optional object
 * type and time filters and paging.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
public class GeoTrajectoryQuery {

    /** Mean earth radius in metres, used to convert a distance to radians. */
    private static final double EARTH_RADIUS_METERS = 6371000.0;

    /** Longitude of the centre point. */
    private Double centerLongitude;

    /** Latitude of the centre point. */
    private Double centerLatitude;

    /** Search radius in metres. */
    private Double radiusMeters;

    /** Inclusive lower bound on {@code timestamp}. */
    private Long startTime;

    /** Inclusive upper bound on {@code timestamp}. */
    private Long endTime;

    /** Kind of tracked object. */
    private String objectType;

    /** Page size. */
    private Integer pageSize = 1000;

    /** Zero-based page number (the skip offset is {@code pageNumber * pageSize}). */
    private Integer pageNumber = 0;

    /** Creates a query with no conditions and the default paging. */
    public GeoTrajectoryQuery() {}

    /**
     * Creates a radius query.
     *
     * @param centerLongitude longitude of the centre point
     * @param centerLatitude  latitude of the centre point
     * @param radiusMeters    search radius in metres
     */
    public GeoTrajectoryQuery(Double centerLongitude, Double centerLatitude, Double radiusMeters) {
        this.centerLongitude = centerLongitude;
        this.centerLatitude = centerLatitude;
        this.radiusMeters = radiusMeters;
    }

    /**
     * Translates these conditions into a Spring Data MongoDB {@link Query}.
     *
     * <p>The geographic criterion is added only when centre longitude, centre
     * latitude and radius are all set. Results are sorted by ascending
     * {@code timestamp}.
     *
     * @return the query with criteria, sort and paging applied
     */
    public Query buildMongoGeoQuery() {
        Query query = new Query();

        if (centerLongitude != null && centerLatitude != null && radiusMeters != null) {
            // $nearSphere on a legacy coordinate pair takes $maxDistance in
            // radians, i.e. metres divided by the earth radius. The former
            // divisor of 111000 produced degrees, widening the search radius
            // roughly 57-fold.
            // NOTE(review): assumes Point is org.springframework.data.geo.Point
            // (a legacy coordinate pair, not a GeoJSON point) - confirm the import.
            query.addCriteria(Criteria.where("location").nearSphere(
                    new Point(centerLongitude, centerLatitude)).maxDistance(radiusMeters / EARTH_RADIUS_METERS));
        }

        if (objectType != null) {
            query.addCriteria(Criteria.where("object_type").is(objectType));
        }

        // Honour a single-sided time range instead of silently ignoring it.
        if (startTime != null || endTime != null) {
            Criteria timeRange = Criteria.where("timestamp");
            if (startTime != null) {
                timeRange = timeRange.gte(startTime);
            }
            if (endTime != null) {
                timeRange = timeRange.lte(endTime);
            }
            query.addCriteria(timeRange);
        }

        query.with(Sort.by(Sort.Direction.ASC, "timestamp"));

        if (pageSize != null && pageNumber != null) {
            // Multiply as long: pageNumber * pageSize overflows int for deep pages.
            query.skip((long) pageNumber * pageSize).limit(pageSize);
        }

        return query;
    }
}

2.6 MongoDB轨迹数据结果类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/**
 * Outcome of storing a single trajectory record.
 *
 * <p>Accessors not written out below are generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
public class MongoDBTrajectoryStorageResult {

    /** Whether the store operation succeeded; starts as {@code false}. */
    private boolean success = false;

    /** Trajectory id of the stored record. */
    private String trajectoryId;

    /** Object id of the stored record. */
    private String objectId;

    /** The record as returned by the save call. */
    private TrajectoryData storedData;

    /** Error description when the operation failed. */
    private String error;

    /** Start of the operation, as set by the caller. */
    private long startTime;

    /** End of the operation, as set by the caller. */
    private long endTime;

    /** Creates a result that is not yet successful. */
    public MongoDBTrajectoryStorageResult() {
    }

    /**
     * Returns the elapsed time of the operation.
     *
     * @return {@code endTime - startTime}
     */
    public long getDuration() {
        final long elapsed = endTime - startTime;
        return elapsed;
    }

    /**
     * Reports whether the operation succeeded.
     *
     * @return the success flag
     */
    public boolean isSuccess() {
        return this.success;
    }
}

/**
 * Outcome of a bulk store operation, with per-record counts.
 *
 * <p>Accessors not written out below are generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
public class MongoDBTrajectoryBatchStorageResult {

    /** Whether the batch operation succeeded; starts as {@code false}. */
    private boolean success = false;

    /** Number of records submitted. */
    private int totalCount;

    /** Number of records stored; starts at 0. */
    private int successCount = 0;

    /** Number of records not stored; starts at 0. */
    private int failureCount = 0;

    /** Error description when the operation failed. */
    private String error;

    /** Start of the operation, as set by the caller. */
    private long startTime;

    /** End of the operation, as set by the caller. */
    private long endTime;

    /** Creates a result that is not yet successful and has zero counts. */
    public MongoDBTrajectoryBatchStorageResult() {
    }

    /**
     * Returns the elapsed time of the operation.
     *
     * @return {@code endTime - startTime}
     */
    public long getDuration() {
        final long elapsed = endTime - startTime;
        return elapsed;
    }

    /**
     * Reports whether the operation succeeded.
     *
     * @return the success flag
     */
    public boolean isSuccess() {
        return this.success;
    }
}

/**
 * Outcome of a trajectory query, carrying the matched records.
 *
 * <p>Accessors not written out below are generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
public class MongoDBTrajectoryQueryResult {

    /** Whether the query succeeded; starts as {@code false}. */
    private boolean success = false;

    /** The query conditions this result belongs to. */
    private TrajectoryQuery query;

    /** Matched records; starts as an empty list. */
    private List<TrajectoryData> trajectoryDataList = new ArrayList<>();

    /** Total count associated with the query; starts at 0. */
    private int totalCount = 0;

    /** Error description when the query failed. */
    private String error;

    /** Start of the operation, as set by the caller. */
    private long startTime;

    /** End of the operation, as set by the caller. */
    private long endTime;

    /** Creates a result that is not yet successful and has no records. */
    public MongoDBTrajectoryQueryResult() {
    }

    /**
     * Returns the elapsed time of the query.
     *
     * @return {@code endTime - startTime}
     */
    public long getDuration() {
        final long elapsed = endTime - startTime;
        return elapsed;
    }

    /**
     * Reports whether the query succeeded.
     *
     * @return the success flag
     */
    public boolean isSuccess() {
        return this.success;
    }
}

/**
 * Outcome of an index creation run.
 *
 * <p>Accessors not written out below are generated by Lombok's {@code @Data}.
 *
 * @author 运维实战
 */
@Data
public class MongoDBIndexCreateResult {

    /** Whether index creation succeeded; starts as {@code false}. */
    private boolean success = false;

    /** Informational message. */
    private String message;

    /** Error description when index creation failed. */
    private String error;

    /** Start of the operation, as set by the caller. */
    private long startTime;

    /** End of the operation, as set by the caller. */
    private long endTime;

    /** Creates a result that is not yet successful. */
    public MongoDBIndexCreateResult() {
    }

    /**
     * Returns the elapsed time of the index creation.
     *
     * @return {@code endTime - startTime}
     */
    public long getDuration() {
        final long elapsed = endTime - startTime;
        return elapsed;
    }

    /**
     * Reports whether index creation succeeded.
     *
     * @return the success flag
     */
    public boolean isSuccess() {
        return this.success;
    }
}

3. 高级功能实现

3.1 MongoDB轨迹数据存储服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/**
* MongoDB轨迹数据存储服务
* @author 运维实战
*/
@Service
public class MongoDBTrajectoryStorageService {

@Autowired
private MongoDBTrajectoryProperties properties;

@Autowired
private MongoTemplate mongoTemplate;

@Autowired
private MongoDBTrajectoryMonitorService mongoDBTrajectoryMonitorService;

private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryStorageService.class);

/**
 * Saves one trajectory record into the configured trajectory collection.
 *
 * <p>Indexes are ensured before the save. Both outcomes are reported to the
 * monitor service, and an exception is converted into a failed result
 * rather than rethrown.
 *
 * @param trajectoryData the record to save; must not be {@code null}
 * @return a result holding the saved record on success, or the error text
 *         on failure
 */
public MongoDBTrajectoryStorageResult storeTrajectoryData(TrajectoryData trajectoryData) {
    logger.info("存储轨迹数据到MongoDB,轨迹ID: {}", trajectoryData.getTrajectoryId());

    final MongoDBTrajectoryStorageResult outcome = new MongoDBTrajectoryStorageResult();
    outcome.setTrajectoryId(trajectoryData.getTrajectoryId());
    outcome.setObjectId(trajectoryData.getObjectId());
    outcome.setStartTime(System.currentTimeMillis());

    try {
        createIndexesIfNotExists();

        final String collection = properties.getTrajectoryCollectionName();
        final TrajectoryData persisted = mongoTemplate.save(trajectoryData, collection);

        outcome.setSuccess(true);
        outcome.setStoredData(persisted);
        outcome.setEndTime(System.currentTimeMillis());

        mongoDBTrajectoryMonitorService.recordTrajectoryStorage(trajectoryData.getObjectId(), true);

        logger.info("轨迹数据存储到MongoDB成功,轨迹ID: {}, 耗时: {}ms",
                trajectoryData.getTrajectoryId(), outcome.getDuration());
    } catch (Exception e) {
        logger.error("轨迹数据存储到MongoDB失败,轨迹ID: {}", trajectoryData.getTrajectoryId(), e);
        outcome.setSuccess(false);
        outcome.setError("轨迹数据存储到MongoDB失败: " + e.getMessage());
        outcome.setEndTime(System.currentTimeMillis());

        mongoDBTrajectoryMonitorService.recordTrajectoryStorage(trajectoryData.getObjectId(), false);
    }

    return outcome;
}

/**
 * Inserts trajectory records into the configured trajectory collection in
 * chunks of {@code batchInsertSize}.
 *
 * <p>The result is marked successful only when every record was inserted.
 * If an exception aborts the run, records inserted by earlier chunks stay
 * counted as successes and all remaining records are counted as failures.
 *
 * @param trajectoryDataList the records to insert; must not be {@code null}
 * @return the batch result with total, success and failure counts
 */
public MongoDBTrajectoryBatchStorageResult batchStoreTrajectoryData(List<TrajectoryData> trajectoryDataList) {
    logger.info("批量存储轨迹数据到MongoDB,数量: {}", trajectoryDataList.size());

    MongoDBTrajectoryBatchStorageResult result = new MongoDBTrajectoryBatchStorageResult();
    result.setTotalCount(trajectoryDataList.size());
    result.setStartTime(System.currentTimeMillis());

    try {
        createIndexesIfNotExists();

        List<List<TrajectoryData>> batches = partitionList(trajectoryDataList, properties.getBatchInsertSize());

        for (List<TrajectoryData> batch : batches) {
            Collection<TrajectoryData> savedData = mongoTemplate.insert(batch, properties.getTrajectoryCollectionName());
            result.setSuccessCount(result.getSuccessCount() + savedData.size());
        }

        result.setFailureCount(result.getTotalCount() - result.getSuccessCount());
        // Previously the flag was never raised, so every batch looked failed.
        result.setSuccess(result.getFailureCount() == 0);
        result.setEndTime(System.currentTimeMillis());

        mongoDBTrajectoryMonitorService.recordTrajectoryBatchStorage(trajectoryDataList.size(), result.getSuccessCount());

        logger.info("批量轨迹数据存储到MongoDB完成,总数: {}, 成功: {}, 失败: {}, 耗时: {}ms",
                result.getTotalCount(), result.getSuccessCount(), result.getFailureCount(), result.getDuration());

        return result;

    } catch (Exception e) {
        logger.error("批量轨迹数据存储到MongoDB失败", e);
        result.setSuccess(false);
        // Keep the counts consistent: whatever was not inserted has failed.
        result.setFailureCount(result.getTotalCount() - result.getSuccessCount());
        result.setError("批量轨迹数据存储到MongoDB失败: " + e.getMessage());
        result.setEndTime(System.currentTimeMillis());
        return result;
    }
}

/**
 * Inserts a raw key/value payload into the configured raw-trajectory
 * collection.
 *
 * <p>An exception is converted into a failed result rather than rethrown.
 *
 * @param rawData the raw payload to insert
 * @return a successful result, or a failed one carrying the error text
 */
public MongoDBTrajectoryStorageResult storeRawTrajectoryData(Map<String, Object> rawData) {
    logger.info("存储原始轨迹数据到MongoDB");

    final MongoDBTrajectoryStorageResult outcome = new MongoDBTrajectoryStorageResult();
    outcome.setStartTime(System.currentTimeMillis());

    try {
        // Ensures the indexes of the trajectory collection before the insert.
        createIndexesIfNotExists();

        final String rawCollection = properties.getRawTrajectoryCollectionName();
        mongoTemplate.insert(rawData, rawCollection);

        outcome.setSuccess(true);
        outcome.setEndTime(System.currentTimeMillis());

        logger.info("原始轨迹数据存储到MongoDB成功,耗时: {}ms", outcome.getDuration());
    } catch (Exception e) {
        logger.error("原始轨迹数据存储到MongoDB失败", e);
        outcome.setSuccess(false);
        outcome.setError("原始轨迹数据存储到MongoDB失败: " + e.getMessage());
        outcome.setEndTime(System.currentTimeMillis());
    }

    return outcome;
}

/**
 * Saves one trajectory record into the configured time-series collection.
 *
 * <p>The time-series indexes are ensured before the save. An exception is
 * converted into a failed result rather than rethrown.
 *
 * @param trajectoryData the record to save; must not be {@code null}
 * @return a result holding the saved record on success, or the error text
 *         on failure
 */
public MongoDBTrajectoryStorageResult storeTimeSeriesTrajectoryData(TrajectoryData trajectoryData) {
    logger.info("存储时间序列轨迹数据到MongoDB,轨迹ID: {}", trajectoryData.getTrajectoryId());

    final MongoDBTrajectoryStorageResult outcome = new MongoDBTrajectoryStorageResult();
    outcome.setTrajectoryId(trajectoryData.getTrajectoryId());
    outcome.setObjectId(trajectoryData.getObjectId());
    outcome.setStartTime(System.currentTimeMillis());

    try {
        createTimeSeriesIndexesIfNotExists();

        final String seriesCollection = properties.getTimeSeriesCollectionName();
        final TrajectoryData persisted = mongoTemplate.save(trajectoryData, seriesCollection);

        outcome.setSuccess(true);
        outcome.setStoredData(persisted);
        outcome.setEndTime(System.currentTimeMillis());

        logger.info("时间序列轨迹数据存储到MongoDB成功,轨迹ID: {}, 耗时: {}ms",
                trajectoryData.getTrajectoryId(), outcome.getDuration());
    } catch (Exception e) {
        logger.error("时间序列轨迹数据存储到MongoDB失败,轨迹ID: {}", trajectoryData.getTrajectoryId(), e);
        outcome.setSuccess(false);
        outcome.setError("时间序列轨迹数据存储到MongoDB失败: " + e.getMessage());
        outcome.setEndTime(System.currentTimeMillis());
    }

    return outcome;
}

/**
 * Ensures the indexes on the trajectory collection exist.
 *
 * <p>Creates, in this order: the optional ascending index on the configured time field, the
 * compound {@code (object_id, timestamp)} index, and the optional geospatial index. When the
 * configured geospatial index type is {@code "2dsphere"}, a real 2dsphere index is declared on
 * {@code location}; a plain ascending index on that field would not serve geospatial operators.
 *
 * <p>Index creation is best-effort: any exception is logged and swallowed so that the calling
 * write is not aborted.
 */
private void createIndexesIfNotExists() {
    try {
        String collectionName = properties.getTrajectoryCollectionName();

        // Ascending index on the configured time field.
        if (properties.isEnableTimeSeriesIndex()) {
            Index timeIndex = new Index().on(properties.getTimeSeriesIndexField(), Sort.Direction.ASC);
            mongoTemplate.indexOps(collectionName).ensureIndex(timeIndex);
        }

        // Compound index for "one object over a time range" lookups.
        Index compoundIndex = new Index()
                .on("object_id", Sort.Direction.ASC)
                .on("timestamp", Sort.Direction.ASC);
        mongoTemplate.indexOps(collectionName).ensureIndex(compoundIndex);

        // The geospatial index is created last: if MongoDB rejects it (for example because
        // existing documents hold a malformed location), the ordinary indexes above are
        // already in place.
        if (properties.isEnableGeospatialIndex()) {
            if ("2dsphere".equals(properties.getGeospatialIndexType())) {
                // Fully qualified so that no additional import is required.
                mongoTemplate.indexOps(collectionName).ensureIndex(
                        new org.springframework.data.mongodb.core.index.GeospatialIndex("location")
                                .typed(org.springframework.data.mongodb.core.index.GeoSpatialIndexType.GEO_2DSPHERE));
            } else {
                // NOTE(review): for any other configured type this remains an ordinary compound
                // ascending index on the two coordinate fields, not a MongoDB "2d" index --
                // confirm whether a legacy 2d index on a coordinate-pair field is wanted here.
                Index coordinateIndex = new Index()
                        .on("longitude", Sort.Direction.ASC)
                        .on("latitude", Sort.Direction.ASC);
                mongoTemplate.indexOps(collectionName).ensureIndex(coordinateIndex);
            }
        }

    } catch (Exception e) {
        logger.error("创建MongoDB索引失败", e);
    }
}

/**
 * Ensures the indexes on the time-series collection exist.
 *
 * <p>Three ascending indexes are requested: one on the configured time field, one on
 * {@code object_id}, and a compound one on {@code object_id} followed by the time field.
 * Any exception is logged and swallowed.
 */
private void createTimeSeriesIndexesIfNotExists() {
    try {
        final String timeField = properties.getTimeSeriesIndexField();
        final String collection = properties.getTimeSeriesCollectionName();

        // Single-field index on the time field.
        mongoTemplate.indexOps(collection)
                .ensureIndex(new Index().on(timeField, Sort.Direction.ASC));

        // Single-field index on the object id.
        mongoTemplate.indexOps(collection)
                .ensureIndex(new Index().on("object_id", Sort.Direction.ASC));

        // Compound index: object id first, then the time field.
        Index objectThenTime = new Index();
        objectThenTime.on("object_id", Sort.Direction.ASC);
        objectThenTime.on(timeField, Sort.Direction.ASC);
        mongoTemplate.indexOps(collection).ensureIndex(objectThenTime);

    } catch (Exception ex) {
        logger.error("创建MongoDB时间序列索引失败", ex);
    }
}

/**
 * Splits a list into consecutive chunks of at most {@code size} elements.
 *
 * <p>The chunks are {@link List#subList(int, int)} views backed by {@code list}; they are not
 * copies. The last chunk may be shorter than {@code size}. An empty list yields an empty result.
 *
 * @param list list to split; must not be null
 * @param size maximum number of elements per chunk; must be greater than zero
 * @return the chunks in their original order
 * @throws IllegalArgumentException if {@code size} is zero or negative (a zero step would
 *         otherwise never advance and the loop would not terminate)
 */
private <T> List<List<T>> partitionList(List<T> list, int size) {
    if (size <= 0) {
        throw new IllegalArgumentException("size must be greater than 0, but was " + size);
    }

    List<List<T>> partitions = new ArrayList<>();
    int total = list.size();
    int start = 0;
    while (start < total) {
        // The chunk length is derived by subtraction so that start + size cannot overflow int.
        int end = start + Math.min(size, total - start);
        partitions.add(list.subList(start, end));
        start = end;
    }
    return partitions;
}
}

3.2 MongoDB轨迹数据查询服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/**
 * Read-side service for trajectory documents stored in MongoDB.
 *
 * <p>Every public method returns a result object instead of throwing: an exception raised while
 * building or executing the query is logged and reported through the result's success flag and
 * error text.
 *
 * @author 运维实战
 */
@Service
public class MongoDBTrajectoryQueryService {

    @Autowired
    private MongoDBTrajectoryProperties properties;

    @Autowired
    private MongoTemplate mongoTemplate;

    // Injected, but not referenced by any method of this class.
    @Autowired
    private MongoDBTrajectoryMonitorService mongoDBTrajectoryMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryQueryService.class);

    /**
     * Runs the query built by {@code query.buildMongoQuery()} against the trajectory collection.
     *
     * @param query query conditions; dereferenced before the try block, so it must not be null
     * @return result holding the matching documents, or the error text on failure
     */
    public MongoDBTrajectoryQueryResult queryTrajectoryData(TrajectoryQuery query) {
        logger.info("查询轨迹数据,对象ID: {}", query.getObjectId());

        MongoDBTrajectoryQueryResult result = new MongoDBTrajectoryQueryResult();
        result.setQuery(query);
        result.setStartTime(System.currentTimeMillis());

        try {
            Query mongoQuery = query.buildMongoQuery();
            List<TrajectoryData> trajectoryDataList =
                    mongoTemplate.find(mongoQuery, TrajectoryData.class, properties.getTrajectoryCollectionName());

            markSucceeded(result, trajectoryDataList);

            logger.info("轨迹数据查询成功,对象ID: {}, 结果数量: {}, 耗时: {}ms",
                    query.getObjectId(), result.getTotalCount(), result.getDuration());
        } catch (Exception e) {
            logger.error("轨迹数据查询异常,对象ID: {}", query.getObjectId(), e);
            markFailed(result, "轨迹数据查询异常: ", e);
        }
        return result;
    }

    /**
     * Runs the query built by {@code geoQuery.buildMongoGeoQuery()} against the trajectory collection.
     *
     * @param geoQuery geographic query conditions; dereferenced before the try block, so it must not be null
     * @return result holding the matching documents, or the error text on failure
     */
    public MongoDBTrajectoryQueryResult geoQueryTrajectoryData(GeoTrajectoryQuery geoQuery) {
        logger.info("地理范围查询轨迹数据,中心点: ({}, {})", geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude());

        MongoDBTrajectoryQueryResult result = new MongoDBTrajectoryQueryResult();
        result.setStartTime(System.currentTimeMillis());

        try {
            Query mongoQuery = geoQuery.buildMongoGeoQuery();
            List<TrajectoryData> trajectoryDataList =
                    mongoTemplate.find(mongoQuery, TrajectoryData.class, properties.getTrajectoryCollectionName());

            markSucceeded(result, trajectoryDataList);

            logger.info("地理范围查询成功,中心点: ({}, {}), 结果数量: {}, 耗时: {}ms",
                    geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), result.getTotalCount(), result.getDuration());
        } catch (Exception e) {
            logger.error("地理范围查询异常,中心点: ({}, {})", geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude(), e);
            markFailed(result, "地理范围查询异常: ", e);
        }
        return result;
    }

    /**
     * Reads the points of one object from the time-series collection, ordered ascending by the
     * configured time field.
     *
     * @param objectId  value matched against the {@code object_id} field
     * @param startTime inclusive lower bound on the time field; the range filter is applied only
     *                  when both bounds are non-null
     * @param endTime   inclusive upper bound on the time field
     * @return result holding the matching documents, or the error text on failure
     */
    public MongoDBTrajectoryQueryResult queryTimeSeriesTrajectoryData(String objectId, Long startTime, Long endTime) {
        logger.info("查询时间序列轨迹数据,对象ID: {}, 时间范围: {} - {}", objectId, startTime, endTime);

        MongoDBTrajectoryQueryResult result = new MongoDBTrajectoryQueryResult();
        result.setStartTime(System.currentTimeMillis());

        try {
            Query query = new Query();
            query.addCriteria(Criteria.where("object_id").is(objectId));

            if (startTime != null && endTime != null) {
                query.addCriteria(Criteria.where(properties.getTimeSeriesIndexField()).gte(startTime).lte(endTime));
            }

            query.with(Sort.by(Sort.Direction.ASC, properties.getTimeSeriesIndexField()));

            List<TrajectoryData> trajectoryDataList =
                    mongoTemplate.find(query, TrajectoryData.class, properties.getTimeSeriesCollectionName());

            markSucceeded(result, trajectoryDataList);

            logger.info("时间序列轨迹数据查询成功,对象ID: {}, 结果数量: {}, 耗时: {}ms",
                    objectId, result.getTotalCount(), result.getDuration());
        } catch (Exception e) {
            logger.error("时间序列轨迹数据查询异常,对象ID: {}", objectId, e);
            markFailed(result, "时间序列轨迹数据查询异常: ", e);
        }
        return result;
    }

    /**
     * Aggregates count, average/max/min {@code speed} and average {@code data_quality} for one
     * object over the trajectory collection.
     *
     * <p>The time-range filter is applied only when both bounds are non-null, the same rule
     * {@link #queryTimeSeriesTrajectoryData(String, Long, Long)} uses. Aggregated values that come
     * back as null or non-numeric are reported as {@code 0} instead of failing the whole call.
     *
     * @param objectId  value matched against the {@code object_id} field
     * @param startTime inclusive lower bound on the configured time field, or null
     * @param endTime   inclusive upper bound on the configured time field, or null
     * @return aggregation result; {@code totalCount} is 0 when nothing matched
     */
    public MongoDBTrajectoryAggregationResult aggregateTrajectoryData(String objectId, Long startTime, Long endTime) {
        logger.info("聚合查询轨迹数据,对象ID: {}, 时间范围: {} - {}", objectId, startTime, endTime);

        MongoDBTrajectoryAggregationResult result = new MongoDBTrajectoryAggregationResult();
        result.setObjectId(objectId);
        result.setStartTime(System.currentTimeMillis());

        try {
            Criteria criteria = Criteria.where("object_id").is(objectId);
            if (startTime != null && endTime != null) {
                // Only constrain the time field when a complete range was supplied; comparing
                // against null bounds would not express "no time filter".
                criteria = criteria.and(properties.getTimeSeriesIndexField()).gte(startTime).lte(endTime);
            }

            Aggregation aggregation = Aggregation.newAggregation(
                    Aggregation.match(criteria),
                    Aggregation.group("object_id")
                            .count().as("totalCount")
                            .avg("speed").as("avgSpeed")
                            .max("speed").as("maxSpeed")
                            .min("speed").as("minSpeed")
                            .avg("data_quality").as("avgDataQuality")
            );

            // Map.class forces a raw element type on the results; rows are read through a
            // wildcard view so no unchecked assignment is needed.
            AggregationResults<Map> aggregationResults = mongoTemplate.aggregate(
                    aggregation, properties.getTrajectoryCollectionName(), Map.class);

            List<Map> results = aggregationResults.getMappedResults();

            result.setSuccess(true);
            if (!results.isEmpty()) {
                Map<?, ?> data = results.get(0);
                result.setTotalCount(toInt(data.get("totalCount")));
                result.setAvgSpeed(toDouble(data.get("avgSpeed")));
                result.setMaxSpeed(toDouble(data.get("maxSpeed")));
                result.setMinSpeed(toDouble(data.get("minSpeed")));
                result.setAvgDataQuality(toDouble(data.get("avgDataQuality")));
            } else {
                result.setTotalCount(0);
            }

            result.setEndTime(System.currentTimeMillis());

            logger.info("轨迹数据聚合查询成功,对象ID: {}, 总数量: {}, 耗时: {}ms",
                    objectId, result.getTotalCount(), result.getDuration());
        } catch (Exception e) {
            logger.error("轨迹数据聚合查询异常,对象ID: {}", objectId, e);
            result.setSuccess(false);
            result.setError("轨迹数据聚合查询异常: " + e.getMessage());
            result.setEndTime(System.currentTimeMillis());
        }
        return result;
    }

    /** Fills a query result for the success path and stamps its end time. */
    private static void markSucceeded(MongoDBTrajectoryQueryResult result, List<TrajectoryData> rows) {
        result.setSuccess(true);
        result.setTrajectoryDataList(rows);
        result.setTotalCount(rows.size());
        result.setEndTime(System.currentTimeMillis());
    }

    /** Fills a query result for the failure path and stamps its end time. */
    private static void markFailed(MongoDBTrajectoryQueryResult result, String messagePrefix, Exception cause) {
        result.setSuccess(false);
        result.setError(messagePrefix + cause.getMessage());
        result.setEndTime(System.currentTimeMillis());
    }

    /** Converts an aggregated value to int, treating null or non-numeric values as 0. */
    private static int toInt(Object value) {
        return value instanceof Number ? ((Number) value).intValue() : 0;
    }

    /** Converts an aggregated value to double, treating null or non-numeric values as 0.0. */
    private static double toDouble(Object value) {
        return value instanceof Number ? ((Number) value).doubleValue() : 0.0;
    }
}

3.3 MongoDB轨迹数据监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
 * In-memory counters for trajectory storage and query operations, with periodic logging.
 *
 * <p>The counters are {@link AtomicLong}s and are cleared once the reset interval has elapsed; the
 * clearing is triggered from {@link #getMetrics()}. The time of the last reset is also kept in an
 * {@code AtomicLong} and claimed with compare-and-set, so that when several threads notice an
 * expired window at the same moment only one of them performs the reset.
 *
 * @author 运维实战
 */
@Service
public class MongoDBTrajectoryMonitorService {

    private final AtomicLong totalTrajectoryStored = new AtomicLong(0);
    private final AtomicLong totalTrajectoryQueried = new AtomicLong(0);
    private final AtomicLong totalSuccessfulStorages = new AtomicLong(0);
    private final AtomicLong totalFailedStorages = new AtomicLong(0);
    private final AtomicLong totalSuccessfulQueries = new AtomicLong(0);
    private final AtomicLong totalFailedQueries = new AtomicLong(0);

    private final AtomicLong lastResetTime = new AtomicLong(System.currentTimeMillis());
    private final long resetInterval = 300000; // counters are cleared every 5 minutes

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryMonitorService.class);

    /**
     * Records one storage operation.
     *
     * @param objectId object id, used only in the debug log line
     * @param success  whether the operation succeeded
     */
    public void recordTrajectoryStorage(String objectId, boolean success) {
        totalTrajectoryStored.incrementAndGet();

        if (success) {
            totalSuccessfulStorages.incrementAndGet();
        } else {
            totalFailedStorages.incrementAndGet();
        }

        logger.debug("记录轨迹数据存储: 对象ID={}, 成功={}", objectId, success);
    }

    /**
     * Records one query operation.
     *
     * @param objectId    object id, used only in the debug log line
     * @param resultCount number of results, used only in the debug log line
     * @param success     whether the query succeeded
     */
    public void recordTrajectoryQuery(String objectId, int resultCount, boolean success) {
        totalTrajectoryQueried.incrementAndGet();

        if (success) {
            totalSuccessfulQueries.incrementAndGet();
        } else {
            totalFailedQueries.incrementAndGet();
        }

        logger.debug("记录轨迹数据查询: 对象ID={}, 结果数量={}, 成功={}", objectId, resultCount, success);
    }

    /**
     * Records a batch storage operation; the failure count is {@code totalCount - successCount}.
     *
     * @param totalCount   number of items in the batch
     * @param successCount number of items stored successfully
     */
    public void recordTrajectoryBatchStorage(int totalCount, int successCount) {
        totalTrajectoryStored.addAndGet(totalCount);
        totalSuccessfulStorages.addAndGet(successCount);
        totalFailedStorages.addAndGet(totalCount - successCount);

        logger.debug("记录轨迹数据批量存储: 总数={}, 成功={}", totalCount, successCount);
    }

    /**
     * Records one geographic query; it is counted in the same counters as ordinary queries.
     *
     * @param longitude   longitude, used only in the debug log line
     * @param latitude    latitude, used only in the debug log line
     * @param resultCount number of results, used only in the debug log line
     * @param success     whether the query succeeded
     */
    public void recordTrajectoryGeoQuery(Double longitude, Double latitude, int resultCount, boolean success) {
        totalTrajectoryQueried.incrementAndGet();

        if (success) {
            totalSuccessfulQueries.incrementAndGet();
        } else {
            totalFailedQueries.incrementAndGet();
        }

        logger.debug("记录轨迹数据地理查询: 经度={}, 纬度={}, 结果数量={}, 成功={}", longitude, latitude, resultCount, success);
    }

    /**
     * Returns a snapshot of the counters.
     *
     * <p>If the reset interval has elapsed, the counters are cleared before the snapshot is taken,
     * so the snapshot returned by that call starts from the cleared values.
     *
     * @return the current counter values and the time the snapshot was taken
     */
    public MongoDBTrajectoryMetrics getMetrics() {
        long now = System.currentTimeMillis();
        long previousReset = lastResetTime.get();
        // compareAndSet lets exactly one of several concurrent callers claim the reset.
        if (now - previousReset > resetInterval && lastResetTime.compareAndSet(previousReset, now)) {
            resetMetrics();
        }

        MongoDBTrajectoryMetrics metrics = new MongoDBTrajectoryMetrics();
        metrics.setTotalTrajectoryStored(totalTrajectoryStored.get());
        metrics.setTotalTrajectoryQueried(totalTrajectoryQueried.get());
        metrics.setTotalSuccessfulStorages(totalSuccessfulStorages.get());
        metrics.setTotalFailedStorages(totalFailedStorages.get());
        metrics.setTotalSuccessfulQueries(totalSuccessfulQueries.get());
        metrics.setTotalFailedQueries(totalFailedQueries.get());
        metrics.setTimestamp(System.currentTimeMillis());

        return metrics;
    }

    /**
     * Clears all counters. The caller has already advanced {@code lastResetTime}.
     */
    private void resetMetrics() {
        totalTrajectoryStored.set(0);
        totalTrajectoryQueried.set(0);
        totalSuccessfulStorages.set(0);
        totalFailedStorages.set(0);
        totalSuccessfulQueries.set(0);
        totalFailedQueries.set(0);

        logger.info("MongoDB轨迹数据监控指标重置");
    }

    /**
     * Logs the current counters every 30 seconds and warns when a success rate is too low.
     *
     * <p>A warning is emitted only when at least one operation of that kind was recorded: with no
     * samples the rate is reported as 0, which would otherwise raise an alarm on every idle window.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorMongoDBTrajectoryProcessingStatus() {
        try {
            MongoDBTrajectoryMetrics metrics = getMetrics();

            logger.info("MongoDB轨迹数据处理监控: 存储={}, 查询={}, 存储成功={}, 存储失败={}, 查询成功={}, 查询失败={}, 存储成功率={}%, 查询成功率={}%",
                    metrics.getTotalTrajectoryStored(), metrics.getTotalTrajectoryQueried(),
                    metrics.getTotalSuccessfulStorages(), metrics.getTotalFailedStorages(),
                    metrics.getTotalSuccessfulQueries(), metrics.getTotalFailedQueries(),
                    String.format("%.2f", metrics.getStorageSuccessRate()),
                    String.format("%.2f", metrics.getQuerySuccessRate()));

            long storageSamples = metrics.getTotalSuccessfulStorages() + metrics.getTotalFailedStorages();
            if (storageSamples > 0 && metrics.getStorageSuccessRate() < 95) {
                logger.warn("MongoDB轨迹数据存储成功率过低: {}%", String.format("%.2f", metrics.getStorageSuccessRate()));
            }

            long querySamples = metrics.getTotalSuccessfulQueries() + metrics.getTotalFailedQueries();
            if (querySamples > 0 && metrics.getQuerySuccessRate() < 90) {
                logger.warn("MongoDB轨迹数据查询成功率过低: {}%", String.format("%.2f", metrics.getQuerySuccessRate()));
            }

        } catch (Exception e) {
            logger.error("MongoDB轨迹数据处理状态监控失败", e);
        }
    }
}

3.4 MongoDB轨迹数据指标类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
 * Snapshot of the trajectory storage/query counters, with derived percentage rates.
 *
 * <p>All rates are percentages in the range 0-100 and are reported as {@code 0.0} when their
 * denominator is zero.
 *
 * @author 运维实战
 */
@Data
public class MongoDBTrajectoryMetrics {

    private long totalTrajectoryStored;
    private long totalTrajectoryQueried;
    private long totalSuccessfulStorages;
    private long totalFailedStorages;
    private long totalSuccessfulQueries;
    private long totalFailedQueries;
    private long timestamp;

    /** Creates an empty snapshot stamped with the current time. */
    public MongoDBTrajectoryMetrics() {
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * @return successful storages as a percentage of successful plus failed storages
     */
    public double getStorageSuccessRate() {
        return percentage(totalSuccessfulStorages, totalSuccessfulStorages + totalFailedStorages);
    }

    /**
     * @return failed storages as a percentage of successful plus failed storages
     */
    public double getStorageFailureRate() {
        return percentage(totalFailedStorages, totalSuccessfulStorages + totalFailedStorages);
    }

    /**
     * @return successful queries as a percentage of successful plus failed queries
     */
    public double getQuerySuccessRate() {
        return percentage(totalSuccessfulQueries, totalSuccessfulQueries + totalFailedQueries);
    }

    /**
     * @return failed queries as a percentage of successful plus failed queries
     */
    public double getQueryFailureRate() {
        return percentage(totalFailedQueries, totalSuccessfulQueries + totalFailedQueries);
    }

    /**
     * @return successful storages as a percentage of {@code totalTrajectoryStored}
     */
    public double getStorageEfficiency() {
        return percentage(totalSuccessfulStorages, totalTrajectoryStored);
    }

    /**
     * @return successful queries as a percentage of {@code totalTrajectoryQueried}
     */
    public double getQueryEfficiency() {
        return percentage(totalSuccessfulQueries, totalTrajectoryQueried);
    }

    /**
     * Reports whether both success rates are above their thresholds (95% for storage, 90% for
     * queries).
     *
     * <p>A category with no recorded operations is not counted against health: its rate is
     * reported as 0 only because there is nothing to divide by, not because anything failed.
     *
     * @return {@code true} when neither category with recorded operations is below its threshold
     */
    public boolean isHealthy() {
        boolean storageHealthy =
                totalSuccessfulStorages + totalFailedStorages == 0 || getStorageSuccessRate() > 95;
        boolean queryHealthy =
                totalSuccessfulQueries + totalFailedQueries == 0 || getQuerySuccessRate() > 90;
        return storageHealthy && queryHealthy;
    }

    /** Returns {@code part / total} as a percentage, or 0.0 when {@code total} is zero. */
    private static double percentage(long part, long total) {
        if (total == 0) {
            return 0.0;
        }
        return (double) part / total * 100;
    }
}

/**
 * Outcome of a trajectory aggregation: record count, speed statistics, average data quality,
 * plus the success flag, error text and timing of the call.
 *
 * @author 运维实战
 */
@Data
public class MongoDBTrajectoryAggregationResult {

    // Outcome of the call.
    private boolean success = false;
    private String error;

    // Aggregated values; all start at zero.
    private String objectId;
    private int totalCount = 0;
    private double avgSpeed = 0.0;
    private double maxSpeed = 0.0;
    private double minSpeed = 0.0;
    private double avgDataQuality = 0.0;

    // Timing, in epoch milliseconds.
    private long startTime;
    private long endTime;

    /** Creates a result that is not successful and has zeroed aggregates. */
    public MongoDBTrajectoryAggregationResult() {
    }

    /**
     * @return elapsed time in milliseconds, computed as {@code endTime - startTime}
     */
    public long getDuration() {
        final long elapsed = endTime - startTime;
        return elapsed;
    }

    /**
     * @return the success flag
     */
    public boolean isSuccess() {
        return this.success;
    }
}

4. MongoDB轨迹数据控制器

4.1 MongoDB轨迹数据REST控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
 * REST endpoints for storing and querying trajectory data, under {@code /api/mongodb/trajectory}.
 *
 * <p>Every handler responds 200 with the service result, or 500 with an empty body when an
 * unexpected exception escapes the service. A {@link MongoDBTrajectoryException} is deliberately
 * not converted here: it is rethrown so that {@code MongoDBTrajectoryExceptionHandler} can build
 * the response from the exception's own message and status code.
 *
 * @author 运维实战
 */
@RestController
@RequestMapping("/api/mongodb/trajectory")
public class MongoDBTrajectoryController {

    @Autowired
    private MongoDBTrajectoryService mongoDBTrajectoryService;

    @Autowired
    private MongoDBTrajectoryMonitorService mongoDBTrajectoryMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryController.class);

    /**
     * Stores a single trajectory point.
     *
     * @param trajectoryData trajectory point from the request body
     * @return 200 with the storage result, or 500 on an unexpected error
     */
    @PostMapping("/store")
    public ResponseEntity<MongoDBTrajectoryStorageResult> storeTrajectoryData(@RequestBody TrajectoryData trajectoryData) {
        return respond("轨迹数据存储失败", () -> {
            logger.info("接收到轨迹数据存储请求,轨迹ID: {}", trajectoryData.getTrajectoryId());
            return mongoDBTrajectoryService.storeTrajectoryData(trajectoryData);
        });
    }

    /**
     * Stores a list of trajectory points.
     *
     * @param trajectoryDataList trajectory points from the request body
     * @return 200 with the batch storage result, or 500 on an unexpected error
     */
    @PostMapping("/batch-store")
    public ResponseEntity<MongoDBTrajectoryBatchStorageResult> batchStoreTrajectoryData(@RequestBody List<TrajectoryData> trajectoryDataList) {
        return respond("轨迹数据批量存储失败", () -> {
            logger.info("接收到轨迹数据批量存储请求,数量: {}", trajectoryDataList.size());
            return mongoDBTrajectoryService.batchStoreTrajectoryData(trajectoryDataList);
        });
    }

    /**
     * Queries trajectory data.
     *
     * @param query query conditions from the request body
     * @return 200 with the query result, or 500 on an unexpected error
     */
    @PostMapping("/query")
    public ResponseEntity<MongoDBTrajectoryQueryResult> queryTrajectoryData(@RequestBody TrajectoryQuery query) {
        return respond("轨迹数据查询失败", () -> {
            logger.info("接收到轨迹数据查询请求,对象ID: {}", query.getObjectId());
            return mongoDBTrajectoryService.queryTrajectoryData(query);
        });
    }

    /**
     * Queries trajectory data by geographic conditions.
     *
     * @param geoQuery geographic query conditions from the request body
     * @return 200 with the query result, or 500 on an unexpected error
     */
    @PostMapping("/geo-query")
    public ResponseEntity<MongoDBTrajectoryQueryResult> geoQueryTrajectoryData(@RequestBody GeoTrajectoryQuery geoQuery) {
        return respond("轨迹数据地理查询失败", () -> {
            logger.info("接收到轨迹数据地理查询请求,中心点: ({}, {})", geoQuery.getCenterLongitude(), geoQuery.getCenterLatitude());
            return mongoDBTrajectoryService.geoQueryTrajectoryData(geoQuery);
        });
    }

    /**
     * Triggers index creation.
     *
     * @return 200 with the index creation result, or 500 on an unexpected error
     */
    @PostMapping("/create-indexes")
    public ResponseEntity<MongoDBIndexCreateResult> createIndexes() {
        return respond("索引创建失败", () -> {
            logger.info("接收到索引创建请求");
            return mongoDBTrajectoryService.createIndexes();
        });
    }

    /**
     * Returns the current monitoring counters.
     *
     * @return 200 with the metrics snapshot, or 500 on an unexpected error
     */
    @GetMapping("/metrics")
    public ResponseEntity<MongoDBTrajectoryMetrics> getMongoDBTrajectoryMetrics() {
        return respond("获取MongoDB轨迹数据监控指标失败", () -> mongoDBTrajectoryMonitorService.getMetrics());
    }

    /**
     * Runs a handler body and maps its outcome to an HTTP response.
     *
     * @param failureLogMessage message logged when an unexpected exception is caught
     * @param action            handler body producing the response payload
     * @return 200 with the payload, or 500 with an empty body on an unexpected exception
     * @throws MongoDBTrajectoryException rethrown unchanged so the exception handler can translate it
     */
    private <T> ResponseEntity<T> respond(String failureLogMessage, java.util.concurrent.Callable<T> action) {
        try {
            return ResponseEntity.ok(action.call());
        } catch (MongoDBTrajectoryException e) {
            // Swallowing this into a bare 500 would discard the status code it carries.
            throw e;
        } catch (Exception e) {
            logger.error(failureLogMessage, e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}

5. MongoDB轨迹数据注解和AOP

5.1 MongoDB轨迹数据注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * Marks a method (or type) as a MongoDB trajectory operation and carries its settings.
 *
 * <p>Retained at runtime so that it can be read reflectively.
 *
 * @author 运维实战
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface MongoDBTrajectory {

    /** Object id associated with the operation; empty by default. */
    String objectId() default "";

    /** Operation type; {@code "STORE"} by default. */
    String operationType() default "STORE";

    /** Whether MongoDB trajectory storage is enabled; on by default. */
    boolean enableMongoDBTrajectoryStorage() default true;

    /** Whether schema-free storage is enabled; on by default. */
    boolean enableSchemaFree() default true;

    /** Whether time-series optimization is enabled; on by default. */
    boolean enableTimeSeriesOptimization() default true;

    /** Whether the geospatial index is enabled; on by default. */
    boolean enableGeospatialIndex() default true;

    /** Whether monitoring is enabled; on by default. */
    boolean enableMonitoring() default true;

    /** Message to use when the operation fails. */
    String message() default "MongoDB轨迹数据操作失败,请稍后重试";

    /** HTTP status code to use when the operation fails; 500 by default. */
    int statusCode() default 500;
}

5.2 MongoDB轨迹数据AOP切面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * Around-advice for methods annotated with {@link MongoDBTrajectory}.
 *
 * <p>Logs the start of the operation, records its outcome in the monitor service, and converts
 * any exception thrown by the target method into a {@link MongoDBTrajectoryException} carrying
 * the annotation's message and status code.
 *
 * @author 运维实战
 */
@Aspect
@Component
public class MongoDBTrajectoryAspect {

    @Autowired
    private MongoDBTrajectoryMonitorService mongoDBTrajectoryMonitorService;

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryAspect.class);

    /**
     * Matches methods carrying the {@link MongoDBTrajectory} annotation and binds it.
     */
    @Pointcut("@annotation(mongoDBTrajectory)")
    public void mongoDBTrajectoryPointcut(MongoDBTrajectory mongoDBTrajectory) {}

    /**
     * Runs the target method and records whether it succeeded.
     *
     * <p>The outcome is recorded only after the method has actually run: success when it returns,
     * failure when it throws. Recording is skipped when the annotation has an empty
     * {@code objectId} or {@code enableMonitoring = false}.
     *
     * @param joinPoint         the intercepted invocation
     * @param mongoDBTrajectory the annotation found on the target method
     * @return whatever the target method returns
     * @throws MongoDBTrajectoryException when the target method throws an {@link Exception};
     *         the original exception is attached as the cause
     * @throws Throwable any {@link Error} raised by the target method, unchanged
     */
    @Around("mongoDBTrajectoryPointcut(mongoDBTrajectory)")
    public Object around(ProceedingJoinPoint joinPoint, MongoDBTrajectory mongoDBTrajectory) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        String objectId = mongoDBTrajectory.objectId();
        String operationType = mongoDBTrajectory.operationType();
        boolean identified = objectId != null && !objectId.isEmpty();
        boolean monitored = identified && mongoDBTrajectory.enableMonitoring();

        if (identified) {
            logger.info("MongoDB轨迹数据操作开始: method={}, objectId={}, operationType={}",
                    methodName, objectId, operationType);
        }

        Object outcome;
        try {
            outcome = joinPoint.proceed();
        } catch (Exception e) {
            if (monitored) {
                recordOutcome(objectId, operationType, false);
            }
            logger.error("MongoDB轨迹数据操作异常: method={}", methodName, e);
            // Keep the original exception as the cause so the root failure stays diagnosable.
            throw new MongoDBTrajectoryException(mongoDBTrajectory.message(), e, mongoDBTrajectory.statusCode());
        }

        if (monitored) {
            recordOutcome(objectId, operationType, true);
        }
        return outcome;
    }

    /** Forwards the outcome to the counter matching the operation type; other types are ignored. */
    private void recordOutcome(String objectId, String operationType, boolean success) {
        if ("STORE".equals(operationType)) {
            mongoDBTrajectoryMonitorService.recordTrajectoryStorage(objectId, success);
        } else if ("QUERY".equals(operationType)) {
            mongoDBTrajectoryMonitorService.recordTrajectoryQuery(objectId, 0, success);
        }
    }
}

5.3 MongoDB轨迹数据异常类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Unchecked exception for failed MongoDB trajectory operations, carrying an HTTP status code.
 *
 * <p>Constructors that take no status code use 500.
 *
 * @author 运维实战
 */
public class MongoDBTrajectoryException extends RuntimeException {

    /** Status code applied when the caller does not supply one. */
    private static final int DEFAULT_STATUS_CODE = 500;

    private final int statusCode;

    /**
     * @param message detail message
     */
    public MongoDBTrajectoryException(String message) {
        this(message, DEFAULT_STATUS_CODE);
    }

    /**
     * @param message    detail message
     * @param statusCode HTTP status code to report
     */
    public MongoDBTrajectoryException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }

    /**
     * @param message detail message
     * @param cause   underlying failure
     */
    public MongoDBTrajectoryException(String message, Throwable cause) {
        this(message, cause, DEFAULT_STATUS_CODE);
    }

    /**
     * @param message    detail message
     * @param cause      underlying failure
     * @param statusCode HTTP status code to report
     */
    public MongoDBTrajectoryException(String message, Throwable cause, int statusCode) {
        super(message, cause);
        this.statusCode = statusCode;
    }

    /**
     * @return the HTTP status code attached to this exception
     */
    public int getStatusCode() {
        return this.statusCode;
    }
}

5.4 MongoDB轨迹数据异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Translates {@link MongoDBTrajectoryException} into an HTTP error response.
 *
 * @author 运维实战
 */
@ControllerAdvice
public class MongoDBTrajectoryExceptionHandler {

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryExceptionHandler.class);

    /**
     * Builds the error response for a trajectory exception.
     *
     * <p>The body has three entries: a fixed {@code error} code, the exception's {@code message},
     * and the current {@code timestamp}. The HTTP status is the exception's status code.
     *
     * @param e the exception being handled
     * @return the error response
     */
    @ExceptionHandler(MongoDBTrajectoryException.class)
    public ResponseEntity<Map<String, Object>> handleMongoDBTrajectoryException(MongoDBTrajectoryException e) {
        logger.warn("MongoDB轨迹数据异常: {}", e.getMessage());

        final Map<String, Object> errorBody = new HashMap<>();
        errorBody.put("error", "MONGODB_TRAJECTORY_FAILED");
        errorBody.put("message", e.getMessage());
        errorBody.put("timestamp", System.currentTimeMillis());

        final int httpStatus = e.getStatusCode();
        return ResponseEntity.status(httpStatus).body(errorBody);
    }
}

6. 实际应用示例

6.1 使用MongoDB轨迹数据注解的服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
 * Example service showing how the {@link MongoDBTrajectory} annotation is applied.
 *
 * <p>The methods do not touch MongoDB; each one only logs, sleeps for a fixed time to simulate
 * work, and returns a descriptive string.
 *
 * @author 运维实战
 */
@Service
public class MongoDBTrajectoryExampleService {

    private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryExampleService.class);

    /**
     * Basic storage example; simulates 100 ms of work.
     *
     * @param trajectoryData trajectory point whose object id is logged and echoed back
     * @return completion message containing the object id
     */
    @MongoDBTrajectory(objectId = "test_object", operationType = "STORE",
            enableMongoDBTrajectoryStorage = true, enableSchemaFree = true,
            enableTimeSeriesOptimization = true, enableGeospatialIndex = true,
            message = "基础MongoDB轨迹数据存储:操作失败")
    public String basicMongoDBTrajectoryStorage(TrajectoryData trajectoryData) {
        logger.info("执行基础MongoDB轨迹数据存储示例,对象ID: {}", trajectoryData.getObjectId());
        simulateWork(100);
        return "基础MongoDB轨迹数据存储完成,对象ID: " + trajectoryData.getObjectId();
    }

    /**
     * Bulk storage example; simulates 500 ms of work.
     *
     * @param trajectoryDataList trajectory points whose count is logged and echoed back
     * @return completion message containing the number of points
     */
    @MongoDBTrajectory(objectId = "batch_object", operationType = "STORE",
            enableMongoDBTrajectoryStorage = true, enableSchemaFree = true,
            enableTimeSeriesOptimization = true, enableGeospatialIndex = true,
            message = "大批量MongoDB轨迹数据存储:操作失败")
    public String largeMongoDBTrajectoryStorage(List<TrajectoryData> trajectoryDataList) {
        logger.info("执行大批量MongoDB轨迹数据存储示例,数量: {}", trajectoryDataList.size());
        simulateWork(500);
        return "大批量MongoDB轨迹数据存储完成,数量: " + trajectoryDataList.size();
    }

    /**
     * Schema-free storage example; simulates 150 ms of work.
     *
     * @param rawData raw document; not read by this example
     * @return fixed completion message
     */
    @MongoDBTrajectory(objectId = "schema_free_object", operationType = "STORE",
            enableMongoDBTrajectoryStorage = true, enableSchemaFree = true,
            enableTimeSeriesOptimization = true, enableGeospatialIndex = true,
            message = "Schema Free轨迹数据存储:操作失败")
    public String schemaFreeTrajectoryStorage(Map<String, Object> rawData) {
        logger.info("执行Schema Free轨迹数据存储示例");
        simulateWork(150);
        return "Schema Free轨迹数据存储完成";
    }

    /**
     * Sleeps for the given time; if interrupted, restores the interrupt flag and returns early.
     */
    private static void simulateWork(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

6.2 MongoDB轨迹数据测试控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/**
* MongoDB轨迹数据测试控制器
* @author 运维实战
*/
@RestController
@RequestMapping("/api/mongodb/trajectory/test")
public class MongoDBTrajectoryTestController {

@Autowired
private MongoDBTrajectoryExampleService exampleService;

@Autowired
private MongoDBTrajectoryService mongoDBTrajectoryService;

@Autowired
private MongoDBTrajectoryMonitorService mongoDBTrajectoryMonitorService;

private static final Logger logger = LoggerFactory.getLogger(MongoDBTrajectoryTestController.class);

/**
 * Test endpoint for the basic storage example.
 *
 * <p>Generates one test trajectory point for the given object id, passes it to the example
 * service and wraps the returned text in a small status map.
 *
 * @param objectId object id to put on the generated point
 * @return 200 with status/result/timestamp entries; the exception's status code for a
 *         {@code MongoDBTrajectoryException}; 500 for any other exception (both with empty body)
 */
@GetMapping("/basic")
public ResponseEntity<Map<String, String>> testBasicMongoDBTrajectoryStorage(@RequestParam String objectId) {
    try {
        final TrajectoryData sample = generateTestTrajectoryData(objectId);
        final String outcome = exampleService.basicMongoDBTrajectoryStorage(sample);

        final Map<String, String> payload = new HashMap<>();
        payload.put("status", "SUCCESS");
        payload.put("result", outcome);
        payload.put("timestamp", String.valueOf(System.currentTimeMillis()));

        return ResponseEntity.ok(payload);
    } catch (MongoDBTrajectoryException ex) {
        logger.warn("基础MongoDB轨迹数据存储测试失败: {}", ex.getMessage());
        return ResponseEntity.status(ex.getStatusCode()).build();
    } catch (Exception ex) {
        logger.error("基础MongoDB轨迹数据存储测试失败", ex);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
    }
}

/**
 * Test endpoint for the bulk storage example.
 *
 * <p>Generates {@code count} test trajectory points, passes them to the example service and
 * wraps the returned text in a small status map.
 *
 * @param objectId base object id for the generated points
 * @param count    number of points to generate
 * @return 200 with status/result/timestamp entries; the exception's status code for a
 *         {@code MongoDBTrajectoryException}; 500 for any other exception (both with empty body)
 */
@GetMapping("/large")
public ResponseEntity<Map<String, String>> testLargeMongoDBTrajectoryStorage(
        @RequestParam String objectId, @RequestParam int count) {
    try {
        final List<TrajectoryData> samples = generateTestTrajectoryDataList(objectId, count);
        final String outcome = exampleService.largeMongoDBTrajectoryStorage(samples);

        final Map<String, String> payload = new HashMap<>();
        payload.put("status", "SUCCESS");
        payload.put("result", outcome);
        payload.put("timestamp", String.valueOf(System.currentTimeMillis()));

        return ResponseEntity.ok(payload);
    } catch (MongoDBTrajectoryException ex) {
        logger.warn("大批量MongoDB轨迹数据存储测试失败: {}", ex.getMessage());
        return ResponseEntity.status(ex.getStatusCode()).build();
    } catch (Exception ex) {
        logger.error("大批量MongoDB轨迹数据存储测试失败", ex);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
    }
}

/**
 * Test endpoint for the schema-free storage example.
 *
 * <p>Generates a raw test document, passes it to the example service and wraps the returned
 * text in a small status map.
 *
 * @return 200 with status/result/timestamp entries; the exception's status code for a
 *         {@code MongoDBTrajectoryException}; 500 for any other exception (both with empty body)
 */
@GetMapping("/schema-free")
public ResponseEntity<Map<String, String>> testSchemaFreeTrajectoryStorage() {
    try {
        final Map<String, Object> rawDocument = generateTestRawData();
        final String outcome = exampleService.schemaFreeTrajectoryStorage(rawDocument);

        final Map<String, String> payload = new HashMap<>();
        payload.put("status", "SUCCESS");
        payload.put("result", outcome);
        payload.put("timestamp", String.valueOf(System.currentTimeMillis()));

        return ResponseEntity.ok(payload);
    } catch (MongoDBTrajectoryException ex) {
        logger.warn("Schema Free轨迹数据存储测试失败: {}", ex.getMessage());
        return ResponseEntity.status(ex.getStatusCode()).build();
    } catch (Exception ex) {
        logger.error("Schema Free轨迹数据存储测试失败", ex);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
    }
}

/**
 * Returns the current monitoring counters from the monitor service.
 *
 * @return 200 with the metrics snapshot, or 500 with an empty body if reading them throws
 */
@GetMapping("/metrics")
public ResponseEntity<MongoDBTrajectoryMetrics> getMongoDBTrajectoryMetrics() {
    try {
        final MongoDBTrajectoryMetrics snapshot = mongoDBTrajectoryMonitorService.getMetrics();
        return ResponseEntity.ok(snapshot);
    } catch (Exception ex) {
        logger.error("获取MongoDB轨迹数据监控指标失败", ex);
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
    }
}

/**
 * Builds one randomized test trajectory point for a vehicle.
 *
 * <p>Coordinates are drawn within 0.01 degrees of (116.3974, 39.9093); the other numeric
 * fields are randomized within fixed ranges and the timestamp is the current time. Two raw-data
 * entries and two extended properties are attached.
 *
 * @param objectId object id to put on the point
 * @return the populated test point
 */
private TrajectoryData generateTestTrajectoryData(String objectId) {
    final TrajectoryData sample = new TrajectoryData();

    // Identity.
    sample.setObjectId(objectId);
    sample.setObjectType("vehicle");

    // Position and motion; the random draws are made in the same order as the setters.
    final double longitude = 116.3974 + Math.random() * 0.01;
    sample.setLongitude(longitude);
    final double latitude = 39.9093 + Math.random() * 0.01;
    sample.setLatitude(latitude);
    final double altitude = 50.0 + Math.random() * 100;
    sample.setAltitude(altitude);
    final double speed = Math.random() * 80;
    sample.setSpeed(speed);
    final double direction = Math.random() * 360;
    sample.setDirection(direction);

    // Time and signal quality.
    sample.setTimestamp(System.currentTimeMillis());
    sample.setSatelliteCount(8 + (int)(Math.random() * 4));
    sample.setDataQuality(80 + (int)(Math.random() * 20));

    // Free-form raw data entries.
    sample.addRawData("device_id", "device_" + Math.random());
    sample.addRawData("signal_strength", Math.random() * 100);

    // Extended properties.
    sample.addExtendedProperty("weather", "sunny");
    sample.addExtendedProperty("road_type", "highway");

    return sample;
}

/**
 * Builds a list of randomized trajectory points.
 *
 * <p>Element {@code i} is generated with the object id
 * {@code objectId + "_" + i}, for {@code i} from 0 to {@code count - 1}.
 * A {@code count} of zero or less yields an empty list.
 *
 * @param objectId prefix used for every generated object id
 * @param count number of points to generate
 * @return a new mutable list holding the generated points
 */
private List<TrajectoryData> generateTestTrajectoryDataList(String objectId, int count) {
    List<TrajectoryData> samples = new ArrayList<>();
    int index = 0;
    while (index < count) {
        samples.add(generateTestTrajectoryData(objectId + "_" + index));
        index++;
    }
    return samples;
}

/**
 * Builds one randomized raw record as a free-form map.
 *
 * <p>The map carries eight entries: {@code object_id}, {@code timestamp}
 * (current epoch milliseconds), {@code longitude}, {@code latitude},
 * {@code speed}, {@code direction}, {@code raw_data} and a nested
 * {@code device_info} map with {@code device_id} and {@code version}.
 * The nested map is created with {@link Map#of} and is therefore unmodifiable;
 * the outer map is a mutable {@link HashMap}.
 *
 * @return the generated raw record
 */
private Map<String, Object> generateTestRawData() {
    Map<String, Object> record = new HashMap<>();

    String generatedObjectId = "raw_object_" + Math.random();
    record.put("object_id", generatedObjectId);

    long capturedAt = System.currentTimeMillis();
    record.put("timestamp", capturedAt);

    // Same coordinate window as the typed trajectory generator uses.
    double longitude = 116.3974 + Math.random() * 0.01;
    double latitude = 39.9093 + Math.random() * 0.01;
    record.put("longitude", longitude);
    record.put("latitude", latitude);

    double speed = Math.random() * 80;
    double direction = Math.random() * 360;
    record.put("speed", speed);
    record.put("direction", direction);

    // A free-form field plus a nested document.
    record.put("raw_data", "custom_field_" + Math.random());
    Map<String, String> deviceInfo = Map.of(
            "device_id", "device_" + Math.random(),
            "version", "1.0");
    record.put("device_info", deviceInfo);

    return record;
}
}

7. 总结

7.1 MongoDB存储原始轨迹数据最佳实践

  1. 合理设计数据结构: 利用Schema Free特性设计灵活的轨迹数据结构
  2. 优化时间序列存储: 使用时间序列索引优化时间相关查询
  3. 创建地理索引: 创建2dsphere索引支持地理范围查询
  4. 监控轨迹数据存储: 实时监控轨迹数据存储和查询性能
  5. 异常处理: 实现完善的异常处理和用户友好提示

7.2 性能优化建议

  • 索引优化: 创建合适的索引提升查询性能
  • 批量操作: 使用批量插入和更新提升存储性能
  • 分片策略: 根据业务需求设计分片策略
  • 压缩优化: 使用数据压缩减少存储空间
  • 查询优化: 优化查询语句和聚合管道

7.3 运维管理要点

  • 实时监控: 监控轨迹数据存储和查询性能
  • 动态调整: 根据负载情况动态调整MongoDB配置
  • 异常处理: 建立异常处理和告警机制
  • 日志管理: 完善日志记录和分析
  • 性能调优: 根据监控数据优化MongoDB性能

通过本文的MongoDB存储原始轨迹数据(Schema Free+时间序列数据友好)Java实战指南,您可以掌握MongoDB轨迹数据存储的原理、实现方法、性能优化技巧以及在企业级应用中的最佳实践,构建高效、灵活的轨迹数据存储系统!