前言

Cassandra微服务跨库查询作为NoSQL分布式系统中的重要挑战,直接影响着数据一致性和查询性能。通过合理的Cassandra跨库查询策略和数据聚合方案,能够在不破坏微服务边界的前提下实现复杂的数据查询需求,确保系统的稳定运行。本文从Cassandra跨库查询策略到数据聚合方案,从基础实现到企业级应用,系统梳理Cassandra微服务跨库查询的完整解决方案。

一、Cassandra微服务跨库查询架构设计

1.1 Cassandra微服务跨库查询整体架构

1.2 Cassandra跨库查询策略架构

二、Cassandra跨库查询策略实现

2.1 Cassandra多Keyspace查询策略

2.1.1 Cassandra多Keyspace查询服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/**
* Cassandra多Keyspace查询服务
*/
@Service
public class CassandraMultiKeyspaceQueryService {

@Autowired
private CassandraTemplate cassandraTemplate;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private final String CASSANDRA_QUERY_CACHE_PREFIX = "cassandra_query_cache:";
private final long CASSANDRA_QUERY_CACHE_EXPIRE = 1800; // 30分钟

/**
* 跨Keyspace查询用户订单信息
*/
public UserOrderQueryResult queryUserOrderInfo(Long userId) {
try {
// 从缓存获取
String cacheKey = CASSANDRA_QUERY_CACHE_PREFIX + "user_order:" + userId;
UserOrderQueryResult cachedResult = (UserOrderQueryResult) redisTemplate.opsForValue().get(cacheKey);

if (cachedResult != null) {
return cachedResult;
}

// 1. 查询用户信息
UserInfo userInfo = queryUserInfo(userId);

// 2. 查询用户订单信息
List<OrderInfo> orders = queryUserOrders(userId);

// 3. 查询订单商品信息
List<ProductInfo> products = queryOrderProducts(orders);

// 4. 查询支付信息
List<PaymentInfo> payments = queryOrderPayments(orders);

// 5. 聚合数据
UserOrderQueryResult result = aggregateUserOrderData(userInfo, orders, products, payments);

// 缓存结果
redisTemplate.opsForValue().set(cacheKey, result, Duration.ofSeconds(CASSANDRA_QUERY_CACHE_EXPIRE));

return result;

} catch (Exception e) {
log.error("跨Keyspace查询用户订单信息失败", e);
throw new CassandraCrossDatabaseQueryException("跨Keyspace查询用户订单信息失败", e);
}
}

/**
* 查询用户信息
*/
private UserInfo queryUserInfo(Long userId) {
try {
String cql = "SELECT * FROM users.users WHERE user_id = ?";
UserInfo userInfo = cassandraTemplate.selectOne(cql, UserInfo.class, userId);
return userInfo;
} catch (Exception e) {
log.error("查询用户信息失败", e);
throw new CassandraCrossDatabaseQueryException("查询用户信息失败", e);
}
}

/**
* 查询用户订单信息
*/
private List<OrderInfo> queryUserOrders(Long userId) {
try {
String cql = "SELECT * FROM orders.orders WHERE user_id = ?";
List<OrderInfo> orders = cassandraTemplate.select(cql, OrderInfo.class, userId);
return orders;
} catch (Exception e) {
log.error("查询用户订单信息失败", e);
throw new CassandraCrossDatabaseQueryException("查询用户订单信息失败", e);
}
}

/**
* 查询订单商品信息
*/
private List<ProductInfo> queryOrderProducts(List<OrderInfo> orders) {
try {
if (orders.isEmpty()) {
return new ArrayList<>();
}

// 提取商品ID
List<Long> productIds = orders.stream()
.map(OrderInfo::getProductId)
.distinct()
.collect(Collectors.toList());

// 批量查询商品信息
String cql = "SELECT * FROM products.products WHERE product_id IN ?";
List<ProductInfo> products = cassandraTemplate.select(cql, ProductInfo.class, productIds);
return products;

} catch (Exception e) {
log.error("查询订单商品信息失败", e);
throw new CassandraCrossDatabaseQueryException("查询订单商品信息失败", e);
}
}

/**
* 查询订单支付信息
*/
private List<PaymentInfo> queryOrderPayments(List<OrderInfo> orders) {
try {
if (orders.isEmpty()) {
return new ArrayList<>();
}

// 提取订单ID
List<Long> orderIds = orders.stream()
.map(OrderInfo::getOrderId)
.distinct()
.collect(Collectors.toList());

// 批量查询支付信息
String cql = "SELECT * FROM payments.payments WHERE order_id IN ?";
List<PaymentInfo> payments = cassandraTemplate.select(cql, PaymentInfo.class, orderIds);
return payments;

} catch (Exception e) {
log.error("查询订单支付信息失败", e);
throw new CassandraCrossDatabaseQueryException("查询订单支付信息失败", e);
}
}

/**
* 聚合用户订单数据
*/
private UserOrderQueryResult aggregateUserOrderData(UserInfo userInfo, List<OrderInfo> orders,
List<ProductInfo> products, List<PaymentInfo> payments) {
try {
UserOrderQueryResult result = new UserOrderQueryResult();
result.setUserId(userInfo.getUserId());
result.setUserName(userInfo.getUserName());
result.setUserEmail(userInfo.getUserEmail());

// 聚合订单信息
result.setTotalOrders(orders.size());
result.setTotalOrderAmount(orders.stream()
.mapToDouble(OrderInfo::getAmount)
.sum());

// 聚合商品信息
result.setTotalProducts(products.size());
result.setTotalProductAmount(products.stream()
.mapToDouble(ProductInfo::getPrice)
.sum());

// 聚合支付信息
result.setTotalPayments(payments.size());
result.setTotalPaymentAmount(payments.stream()
.mapToDouble(PaymentInfo::getAmount)
.sum());

// 计算统计信息
calculateUserOrderStatistics(result, orders, products, payments);

return result;

} catch (Exception e) {
log.error("聚合用户订单数据失败", e);
throw new CassandraCrossDatabaseQueryException("聚合用户订单数据失败", e);
}
}

/**
* 计算用户订单统计信息
*/
private void calculateUserOrderStatistics(UserOrderQueryResult result, List<OrderInfo> orders,
List<ProductInfo> products, List<PaymentInfo> payments) {
try {
UserOrderStatistics statistics = new UserOrderStatistics();

// 计算订单统计
statistics.setOrderCount(orders.size());
statistics.setOrderAmount(result.getTotalOrderAmount());
statistics.setAvgOrderAmount(orders.size() > 0 ?
result.getTotalOrderAmount() / orders.size() : 0.0);

// 计算商品统计
statistics.setProductCount(products.size());
statistics.setProductAmount(result.getTotalProductAmount());

// 计算支付统计
statistics.setPaymentCount(payments.size());
statistics.setPaymentAmount(result.getTotalPaymentAmount());

// 计算综合统计
statistics.setTotalAmount(result.getTotalOrderAmount() + result.getTotalProductAmount());

result.setStatistics(statistics);

} catch (Exception e) {
log.error("计算用户订单统计信息失败", e);
}
}

/**
* 并行查询多个Keyspace
*/
public CompletableFuture<MultiKeyspaceQueryResult> queryMultipleKeyspacesAsync(
MultiKeyspaceQueryRequest request) {
try {
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> {
return queryUserInfo(request.getUserId());
});

CompletableFuture<List<OrderInfo>> ordersFuture = CompletableFuture.supplyAsync(() -> {
return queryUserOrders(request.getUserId());
});

CompletableFuture<List<ProductInfo>> productsFuture = CompletableFuture.supplyAsync(() -> {
return queryUserProducts(request.getUserId());
});

CompletableFuture<List<PaymentInfo>> paymentsFuture = CompletableFuture.supplyAsync(() -> {
return queryUserPayments(request.getUserId());
});

// 等待所有查询完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
userFuture, ordersFuture, productsFuture, paymentsFuture);

return allFutures.thenApply(v -> {
MultiKeyspaceQueryResult result = new MultiKeyspaceQueryResult();
try {
result.setUserInfo(userFuture.get());
result.setOrders(ordersFuture.get());
result.setProducts(productsFuture.get());
result.setPayments(paymentsFuture.get());
result.setStatus(QueryStatus.SUCCESS);
} catch (Exception e) {
result.setStatus(QueryStatus.FAILED);
result.setErrorMessage(e.getMessage());
}
return result;
});

} catch (Exception e) {
log.error("并行查询多个Keyspace失败", e);
CompletableFuture<MultiKeyspaceQueryResult> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(e);
return failedFuture;
}
}

/**
* 查询用户商品信息
*/
private List<ProductInfo> queryUserProducts(Long userId) {
try {
String cql = "SELECT * FROM products.products WHERE user_id = ?";
List<ProductInfo> products = cassandraTemplate.select(cql, ProductInfo.class, userId);
return products;
} catch (Exception e) {
log.error("查询用户商品信息失败", e);
throw new CassandraCrossDatabaseQueryException("查询用户商品信息失败", e);
}
}

/**
* 查询用户支付信息
*/
private List<PaymentInfo> queryUserPayments(Long userId) {
try {
String cql = "SELECT * FROM payments.payments WHERE user_id = ?";
List<PaymentInfo> payments = cassandraTemplate.select(cql, PaymentInfo.class, userId);
return payments;
} catch (Exception e) {
log.error("查询用户支付信息失败", e);
throw new CassandraCrossDatabaseQueryException("查询用户支付信息失败", e);
}
}
}

2.2 Cassandra聚合查询策略

2.2.1 Cassandra聚合查询服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
/**
* Cassandra聚合查询服务
*/
@Service
public class CassandraAggregationQueryService {

@Autowired
private CassandraTemplate cassandraTemplate;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private final String CASSANDRA_AGGREGATION_CACHE_PREFIX = "cassandra_aggregation_cache:";
private final long CASSANDRA_AGGREGATION_CACHE_EXPIRE = 1800; // 30分钟

/**
* 聚合查询用户订单统计
*/
public UserOrderAggregationResult aggregateUserOrderStatistics(Long userId) {
try {
// 从缓存获取
String cacheKey = CASSANDRA_AGGREGATION_CACHE_PREFIX + "user_order_stats:" + userId;
UserOrderAggregationResult cachedResult = (UserOrderAggregationResult) redisTemplate.opsForValue().get(cacheKey);

if (cachedResult != null) {
return cachedResult;
}

// 1. 查询订单统计
OrderStatistics orderStats = queryOrderStatistics(userId);

// 2. 查询商品统计
ProductStatistics productStats = queryProductStatistics(userId);

// 3. 查询支付统计
PaymentStatistics paymentStats = queryPaymentStatistics(userId);

// 4. 聚合数据
UserOrderAggregationResult result = aggregateStatistics(userId, orderStats, productStats, paymentStats);

// 缓存结果
redisTemplate.opsForValue().set(cacheKey, result, Duration.ofSeconds(CASSANDRA_AGGREGATION_CACHE_EXPIRE));

return result;

} catch (Exception e) {
log.error("聚合查询用户订单统计失败", e);
throw new CassandraCrossDatabaseQueryException("聚合查询用户订单统计失败", e);
}
}

/**
* 查询订单统计
*/
private OrderStatistics queryOrderStatistics(Long userId) {
try {
// 查询订单总数
String countCql = "SELECT COUNT(*) FROM orders.orders WHERE user_id = ?";
Long totalOrders = cassandraTemplate.selectOne(countCql, Long.class, userId);

// 查询订单金额统计
String amountCql = "SELECT SUM(amount) as total_amount, AVG(amount) as avg_amount, " +
"MIN(amount) as min_amount, MAX(amount) as max_amount FROM orders.orders WHERE user_id = ?";
Map<String, Object> amountStats = cassandraTemplate.selectOne(amountCql, Map.class, userId);

// 查询订单状态统计
String statusCql = "SELECT status, COUNT(*) as count FROM orders.orders WHERE user_id = ? GROUP BY status";
List<Map<String, Object>> statusStats = cassandraTemplate.select(statusCql, Map.class, userId);

// 构建订单统计
OrderStatistics orderStats = new OrderStatistics();
orderStats.setUserId(userId);
orderStats.setTotalOrders(totalOrders);
orderStats.setTotalAmount((Double) amountStats.get("total_amount"));
orderStats.setAvgAmount((Double) amountStats.get("avg_amount"));
orderStats.setMinAmount((Double) amountStats.get("min_amount"));
orderStats.setMaxAmount((Double) amountStats.get("max_amount"));

// 构建状态统计
Map<String, Long> statusCount = new HashMap<>();
for (Map<String, Object> statusStat : statusStats) {
statusCount.put((String) statusStat.get("status"), (Long) statusStat.get("count"));
}
orderStats.setStatusCount(statusCount);

return orderStats;

} catch (Exception e) {
log.error("查询订单统计失败", e);
throw new CassandraCrossDatabaseQueryException("查询订单统计失败", e);
}
}

/**
* 查询商品统计
*/
private ProductStatistics queryProductStatistics(Long userId) {
try {
// 查询商品总数
String countCql = "SELECT COUNT(*) FROM products.products WHERE user_id = ?";
Long totalProducts = cassandraTemplate.selectOne(countCql, Long.class, userId);

// 查询商品价格统计
String priceCql = "SELECT SUM(price) as total_price, AVG(price) as avg_price, " +
"MIN(price) as min_price, MAX(price) as max_price FROM products.products WHERE user_id = ?";
Map<String, Object> priceStats = cassandraTemplate.selectOne(priceCql, Map.class, userId);

// 查询商品分类统计
String categoryCql = "SELECT category, COUNT(*) as count FROM products.products WHERE user_id = ? GROUP BY category";
List<Map<String, Object>> categoryStats = cassandraTemplate.select(categoryCql, Map.class, userId);

// 构建商品统计
ProductStatistics productStats = new ProductStatistics();
productStats.setUserId(userId);
productStats.setTotalProducts(totalProducts);
productStats.setTotalPrice((Double) priceStats.get("total_price"));
productStats.setAvgPrice((Double) priceStats.get("avg_price"));
productStats.setMinPrice((Double) priceStats.get("min_price"));
productStats.setMaxPrice((Double) priceStats.get("max_price"));

// 构建分类统计
Map<String, Long> categoryCount = new HashMap<>();
for (Map<String, Object> categoryStat : categoryStats) {
categoryCount.put((String) categoryStat.get("category"), (Long) categoryStat.get("count"));
}
productStats.setCategoryCount(categoryCount);

return productStats;

} catch (Exception e) {
log.error("查询商品统计失败", e);
throw new CassandraCrossDatabaseQueryException("查询商品统计失败", e);
}
}

/**
* 查询支付统计
*/
private PaymentStatistics queryPaymentStatistics(Long userId) {
try {
// 查询支付总数
String countCql = "SELECT COUNT(*) FROM payments.payments WHERE user_id = ?";
Long totalPayments = cassandraTemplate.selectOne(countCql, Long.class, userId);

// 查询支付金额统计
String amountCql = "SELECT SUM(amount) as total_amount, AVG(amount) as avg_amount, " +
"MIN(amount) as min_amount, MAX(amount) as max_amount FROM payments.payments WHERE user_id = ?";
Map<String, Object> amountStats = cassandraTemplate.selectOne(amountCql, Map.class, userId);

// 查询支付方式统计
String methodCql = "SELECT payment_method, COUNT(*) as count FROM payments.payments WHERE user_id = ? GROUP BY payment_method";
List<Map<String, Object>> methodStats = cassandraTemplate.select(methodCql, Map.class, userId);

// 构建支付统计
PaymentStatistics paymentStats = new PaymentStatistics();
paymentStats.setUserId(userId);
paymentStats.setTotalPayments(totalPayments);
paymentStats.setTotalAmount((Double) amountStats.get("total_amount"));
paymentStats.setAvgAmount((Double) amountStats.get("avg_amount"));
paymentStats.setMinAmount((Double) amountStats.get("min_amount"));
paymentStats.setMaxAmount((Double) amountStats.get("max_amount"));

// 构建支付方式统计
Map<String, Long> methodCount = new HashMap<>();
for (Map<String, Object> methodStat : methodStats) {
methodCount.put((String) methodStat.get("payment_method"), (Long) methodStat.get("count"));
}
paymentStats.setMethodCount(methodCount);

return paymentStats;

} catch (Exception e) {
log.error("查询支付统计失败", e);
throw new CassandraCrossDatabaseQueryException("查询支付统计失败", e);
}
}

/**
* 聚合统计信息
*/
private UserOrderAggregationResult aggregateStatistics(Long userId, OrderStatistics orderStats,
ProductStatistics productStats, PaymentStatistics paymentStats) {
try {
UserOrderAggregationResult result = new UserOrderAggregationResult();
result.setUserId(userId);

// 聚合订单统计
result.setTotalOrders(orderStats.getTotalOrders());
result.setTotalOrderAmount(orderStats.getTotalAmount());
result.setAvgOrderAmount(orderStats.getAvgAmount());
result.setOrderStatusCount(orderStats.getStatusCount());

// 聚合商品统计
result.setTotalProducts(productStats.getTotalProducts());
result.setTotalProductAmount(productStats.getTotalPrice());
result.setAvgProductPrice(productStats.getAvgPrice());
result.setProductCategoryCount(productStats.getCategoryCount());

// 聚合支付统计
result.setTotalPayments(paymentStats.getTotalPayments());
result.setTotalPaymentAmount(paymentStats.getTotalAmount());
result.setAvgPaymentAmount(paymentStats.getAvgAmount());
result.setPaymentMethodCount(paymentStats.getMethodCount());

// 计算综合统计
calculateComprehensiveStatistics(result);

return result;

} catch (Exception e) {
log.error("聚合统计信息失败", e);
throw new CassandraCrossDatabaseQueryException("聚合统计信息失败", e);
}
}

/**
* 计算综合统计
*/
private void calculateComprehensiveStatistics(UserOrderAggregationResult result) {
try {
ComprehensiveStatistics comprehensiveStats = new ComprehensiveStatistics();

// 计算综合金额
comprehensiveStats.setTotalAmount(result.getTotalOrderAmount() + result.getTotalProductAmount());

// 计算综合数量
comprehensiveStats.setTotalCount(result.getTotalOrders() + result.getTotalProducts() + result.getTotalPayments());

// 计算平均金额
comprehensiveStats.setAvgAmount(comprehensiveStats.getTotalCount() > 0 ?
comprehensiveStats.getTotalAmount() / comprehensiveStats.getTotalCount() : 0.0);

result.setComprehensiveStatistics(comprehensiveStats);

} catch (Exception e) {
log.error("计算综合统计失败", e);
}
}

/**
* 复杂聚合查询
*/
public ComplexAggregationResult executeComplexAggregation(ComplexAggregationRequest request) {
try {
ComplexAggregationResult result = new ComplexAggregationResult();
result.setRequestId(request.getRequestId());
result.setStartTime(new Date());

// 执行复杂聚合查询
List<Map<String, Object>> aggregationResults = new ArrayList<>();

for (AggregationQuery query : request.getQueries()) {
try {
List<Map<String, Object>> queryResults = cassandraTemplate.select(query.getCql(), Map.class, query.getParameters());
aggregationResults.addAll(queryResults);
} catch (Exception e) {
log.error("执行聚合查询失败: {}", query.getCql(), e);
}
}

// 处理聚合结果
result.setAggregationResults(aggregationResults);
result.setTotalCount(aggregationResults.size());
result.setEndTime(new Date());
result.setStatus(AggregationStatus.SUCCESS);

return result;

} catch (Exception e) {
log.error("执行复杂聚合查询失败", e);
throw new CassandraCrossDatabaseQueryException("执行复杂聚合查询失败", e);
}
}
}

2.3 Cassandra事件驱动策略

2.3.1 Cassandra事件驱动服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/**
* Cassandra事件驱动服务
*/
@Service
public class CassandraEventDrivenService {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CassandraAggregationQueryService aggregationQueryService;

private final String CASSANDRA_EVENT_QUEUE_PREFIX = "cassandra_cross_db_query.event.";
private final String CASSANDRA_EVENT_CACHE_PREFIX = "cassandra_event_cache:";

/**
* 发布Cassandra跨库查询事件
*/
public void publishCassandraCrossDatabaseQueryEvent(CassandraCrossDatabaseQueryEvent event) {
try {
// 设置事件ID
if (event.getEventId() == null) {
event.setEventId(UUID.randomUUID().toString());
}

// 设置时间戳
event.setTimestamp(new Date());

// 发布到消息队列
String queueName = CASSANDRA_EVENT_QUEUE_PREFIX + event.getEventType();
rabbitTemplate.convertAndSend(queueName, event);

log.info("发布Cassandra跨库查询事件成功: {}", event.getEventId());

} catch (Exception e) {
log.error("发布Cassandra跨库查询事件失败", e);
throw new CassandraCrossDatabaseQueryException("发布Cassandra跨库查询事件失败", e);
}
}

/**
* 处理用户信息更新事件
*/
@RabbitListener(queues = "cassandra_cross_db_query.event.user.updated")
public void handleUserUpdatedEvent(CassandraUserUpdatedEvent event) {
try {
log.info("处理Cassandra用户信息更新事件: {}", event.getUserId());

// 清除相关缓存
clearUserRelatedCache(event.getUserId());

// 更新聚合数据
updateUserAggregatedData(event.getUserId());

// 发布数据更新事件
CassandraDataUpdatedEvent dataUpdatedEvent = new CassandraDataUpdatedEvent();
dataUpdatedEvent.setEventId(UUID.randomUUID().toString());
dataUpdatedEvent.setEventType("CASSANDRA_USER_DATA_UPDATED");
dataUpdatedEvent.setUserId(event.getUserId());
dataUpdatedEvent.setTimestamp(new Date());

publishCassandraCrossDatabaseQueryEvent(dataUpdatedEvent);

} catch (Exception e) {
log.error("处理Cassandra用户信息更新事件失败", e);
}
}

/**
* 处理订单创建事件
*/
@RabbitListener(queues = "cassandra_cross_db_query.event.order.created")
public void handleOrderCreatedEvent(CassandraOrderCreatedEvent event) {
try {
log.info("处理Cassandra订单创建事件: {}", event.getOrderId());

// 清除相关缓存
clearOrderRelatedCache(event.getOrderId());

// 更新用户聚合数据
updateUserAggregatedData(event.getUserId());

// 发布数据更新事件
CassandraDataUpdatedEvent dataUpdatedEvent = new CassandraDataUpdatedEvent();
dataUpdatedEvent.setEventId(UUID.randomUUID().toString());
dataUpdatedEvent.setEventType("CASSANDRA_ORDER_DATA_UPDATED");
dataUpdatedEvent.setOrderId(event.getOrderId());
dataUpdatedEvent.setUserId(event.getUserId());
dataUpdatedEvent.setTimestamp(new Date());

publishCassandraCrossDatabaseQueryEvent(dataUpdatedEvent);

} catch (Exception e) {
log.error("处理Cassandra订单创建事件失败", e);
}
}

/**
* 处理支付完成事件
*/
@RabbitListener(queues = "cassandra_cross_db_query.event.payment.completed")
public void handlePaymentCompletedEvent(CassandraPaymentCompletedEvent event) {
try {
log.info("处理Cassandra支付完成事件: {}", event.getPaymentId());

// 清除相关缓存
clearPaymentRelatedCache(event.getPaymentId());

// 更新订单聚合数据
updateOrderAggregatedData(event.getOrderId());

// 更新用户聚合数据
updateUserAggregatedData(event.getUserId());

// 发布数据更新事件
CassandraDataUpdatedEvent dataUpdatedEvent = new CassandraDataUpdatedEvent();
dataUpdatedEvent.setEventId(UUID.randomUUID().toString());
dataUpdatedEvent.setEventType("CASSANDRA_PAYMENT_DATA_UPDATED");
dataUpdatedEvent.setPaymentId(event.getPaymentId());
dataUpdatedEvent.setOrderId(event.getOrderId());
dataUpdatedEvent.setUserId(event.getUserId());
dataUpdatedEvent.setTimestamp(new Date());

publishCassandraCrossDatabaseQueryEvent(dataUpdatedEvent);

} catch (Exception e) {
log.error("处理Cassandra支付完成事件失败", e);
}
}

/**
* 清除用户相关缓存
*/
private void clearUserRelatedCache(Long userId) {
try {
String pattern = CASSANDRA_EVENT_CACHE_PREFIX + "user_*:" + userId;
Set<String> keys = redisTemplate.keys(pattern);

if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}

log.info("清除Cassandra用户相关缓存成功: {}", userId);

} catch (Exception e) {
log.error("清除Cassandra用户相关缓存失败", e);
}
}

/**
* 清除订单相关缓存
*/
private void clearOrderRelatedCache(Long orderId) {
try {
String pattern = CASSANDRA_EVENT_CACHE_PREFIX + "order_*:" + orderId;
Set<String> keys = redisTemplate.keys(pattern);

if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}

log.info("清除Cassandra订单相关缓存成功: {}", orderId);

} catch (Exception e) {
log.error("清除Cassandra订单相关缓存失败", e);
}
}

/**
* 清除支付相关缓存
*/
private void clearPaymentRelatedCache(Long paymentId) {
try {
String pattern = CASSANDRA_EVENT_CACHE_PREFIX + "payment_*:" + paymentId;
Set<String> keys = redisTemplate.keys(pattern);

if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
}

log.info("清除Cassandra支付相关缓存成功: {}", paymentId);

} catch (Exception e) {
log.error("清除Cassandra支付相关缓存失败", e);
}
}

/**
* 更新用户聚合数据
*/
private void updateUserAggregatedData(Long userId) {
try {
// 异步更新用户聚合数据
CompletableFuture.runAsync(() -> {
try {
aggregationQueryService.aggregateUserOrderStatistics(userId);
log.info("更新Cassandra用户聚合数据成功: {}", userId);
} catch (Exception e) {
log.error("更新Cassandra用户聚合数据失败: {}", userId, e);
}
});

} catch (Exception e) {
log.error("更新Cassandra用户聚合数据失败", e);
}
}

/**
* 更新订单聚合数据
*/
private void updateOrderAggregatedData(Long orderId) {
try {
// 异步更新订单聚合数据
CompletableFuture.runAsync(() -> {
try {
// 这里可以添加订单聚合数据更新逻辑
log.info("更新Cassandra订单聚合数据成功: {}", orderId);
} catch (Exception e) {
log.error("更新Cassandra订单聚合数据失败: {}", orderId, e);
}
});

} catch (Exception e) {
log.error("更新Cassandra订单聚合数据失败", e);
}
}
}

三、企业级Cassandra微服务数据查询方案

3.1 Cassandra微服务数据查询管理服务

3.1.1 Cassandra微服务数据查询管理服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
/**
* Cassandra微服务数据查询管理服务
*/
@Service
public class CassandraMicroserviceDataQueryManagementService {

@Autowired
private CassandraMultiKeyspaceQueryService multiKeyspaceQueryService;

@Autowired
private CassandraAggregationQueryService aggregationQueryService;

@Autowired
private CassandraEventDrivenService eventDrivenService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private final String CASSANDRA_QUERY_CACHE_PREFIX = "cassandra_query_cache:";
private final long CASSANDRA_QUERY_CACHE_EXPIRE = 1800; // 30分钟

/**
* 执行Cassandra跨库查询
*/
public CassandraCrossDatabaseQueryResult executeCassandraCrossDatabaseQuery(CassandraCrossDatabaseQueryRequest request) {
try {
CassandraCrossDatabaseQueryResult result = new CassandraCrossDatabaseQueryResult();
result.setRequestId(request.getRequestId());
result.setStartTime(new Date());

// 验证请求
validateCassandraQueryRequest(request);

// 选择查询策略
CassandraQueryStrategy strategy = selectCassandraQueryStrategy(request);
result.setStrategy(strategy);

// 执行查询
switch (strategy) {
case MULTI_KEYSPACE_QUERY:
result = executeMultiKeyspaceQuery(request);
break;
case AGGREGATION_QUERY:
result = executeAggregationQuery(request);
break;
case EVENT_DRIVEN:
result = executeEventDrivenQuery(request);
break;
case HYBRID:
result = executeHybridQuery(request);
break;
default:
throw new IllegalArgumentException("不支持的Cassandra查询策略: " + strategy);
}

result.setStatus(CassandraQueryStatus.SUCCESS);
result.setEndTime(new Date());

log.info("执行Cassandra跨库查询成功: 请求ID={}, 策略={}", request.getRequestId(), strategy);
return result;

} catch (Exception e) {
log.error("执行Cassandra跨库查询失败", e);
throw new CassandraCrossDatabaseQueryException("执行Cassandra跨库查询失败", e);
}
}

/**
* 验证Cassandra查询请求
*/
private void validateCassandraQueryRequest(CassandraCrossDatabaseQueryRequest request) {
if (request.getRequestId() == null || request.getRequestId().trim().isEmpty()) {
throw new IllegalArgumentException("请求ID不能为空");
}

if (request.getQueryType() == null) {
throw new IllegalArgumentException("查询类型不能为空");
}

if (request.getParameters() == null) {
request.setParameters(new HashMap<>());
}
}

/**
* 选择Cassandra查询策略
*/
private CassandraQueryStrategy selectCassandraQueryStrategy(CassandraCrossDatabaseQueryRequest request) {
try {
// 根据查询类型选择策略
switch (request.getQueryType()) {
case USER_ORDER_QUERY:
return CassandraQueryStrategy.MULTI_KEYSPACE_QUERY;
case USER_ORDER_AGGREGATION:
return CassandraQueryStrategy.AGGREGATION_QUERY;
case COMPLEX_AGGREGATION:
return CassandraQueryStrategy.AGGREGATION_QUERY;
case SEARCH_RESULTS:
return CassandraQueryStrategy.HYBRID;
default:
return CassandraQueryStrategy.MULTI_KEYSPACE_QUERY;
}

} catch (Exception e) {
log.error("选择Cassandra查询策略失败", e);
return CassandraQueryStrategy.MULTI_KEYSPACE_QUERY; // 默认策略
}
}

/**
* 执行多Keyspace查询
*/
private CassandraCrossDatabaseQueryResult executeMultiKeyspaceQuery(CassandraCrossDatabaseQueryRequest request) {
try {
CassandraCrossDatabaseQueryResult result = new CassandraCrossDatabaseQueryResult();
result.setRequestId(request.getRequestId());
result.setStrategy(CassandraQueryStrategy.MULTI_KEYSPACE_QUERY);

// 根据查询类型执行不同的多Keyspace查询
switch (request.getQueryType()) {
case USER_ORDER_QUERY:
Long userId = Long.valueOf(request.getParameters().get("userId").toString());
UserOrderQueryResult userOrderQuery = multiKeyspaceQueryService.queryUserOrderInfo(userId);
result.setData(userOrderQuery);
break;

default:
throw new IllegalArgumentException("不支持的多Keyspace查询类型: " + request.getQueryType());
}

return result;

} catch (Exception e) {
log.error("执行多Keyspace查询失败", e);
throw new CassandraCrossDatabaseQueryException("执行多Keyspace查询失败", e);
}
}

/**
* 执行聚合查询
*/
private CassandraCrossDatabaseQueryResult executeAggregationQuery(CassandraCrossDatabaseQueryRequest request) {
try {
CassandraCrossDatabaseQueryResult result = new CassandraCrossDatabaseQueryResult();
result.setRequestId(request.getRequestId());
result.setStrategy(CassandraQueryStrategy.AGGREGATION_QUERY);

// 根据查询类型执行不同的聚合查询
switch (request.getQueryType()) {
case USER_ORDER_AGGREGATION:
Long userId = Long.valueOf(request.getParameters().get("userId").toString());
UserOrderAggregationResult userOrderAggregation = aggregationQueryService.aggregateUserOrderStatistics(userId);
result.setData(userOrderAggregation);
break;

case COMPLEX_AGGREGATION:
ComplexAggregationRequest complexRequest = (ComplexAggregationRequest) request.getParameters().get("complexRequest");
ComplexAggregationResult complexResult = aggregationQueryService.executeComplexAggregation(complexRequest);
result.setData(complexResult);
break;

default:
throw new IllegalArgumentException("不支持的聚合查询类型: " + request.getQueryType());
}

return result;

} catch (Exception e) {
log.error("执行聚合查询失败", e);
throw new CassandraCrossDatabaseQueryException("执行聚合查询失败", e);
}
}

/**
* 执行事件驱动查询
*/
private CassandraCrossDatabaseQueryResult executeEventDrivenQuery(CassandraCrossDatabaseQueryRequest request) {
try {
CassandraCrossDatabaseQueryResult result = new CassandraCrossDatabaseQueryResult();
result.setRequestId(request.getRequestId());
result.setStrategy(CassandraQueryStrategy.EVENT_DRIVEN);

// 发布查询事件
CassandraCrossDatabaseQueryEvent event = new CassandraCrossDatabaseQueryEvent();
event.setEventId(UUID.randomUUID().toString());
event.setEventType("CASSANDRA_CROSS_DATABASE_QUERY");
event.setRequestId(request.getRequestId());
event.setQueryType(request.getQueryType());
event.setParameters(request.getParameters());
event.setTimestamp(new Date());

eventDrivenService.publishCassandraCrossDatabaseQueryEvent(event);

// 等待事件处理完成
Thread.sleep(1000);

result.setData("Cassandra事件已发布,请稍后查询结果");

return result;

} catch (Exception e) {
log.error("执行事件驱动查询失败", e);
throw new CassandraCrossDatabaseQueryException("执行事件驱动查询失败", e);
}
}

/**
* 执行混合查询
*/
private CassandraCrossDatabaseQueryResult executeHybridQuery(CassandraCrossDatabaseQueryRequest request) {
try {
CassandraCrossDatabaseQueryResult result = new CassandraCrossDatabaseQueryResult();
result.setRequestId(request.getRequestId());
result.setStrategy(CassandraQueryStrategy.HYBRID);

// 根据查询类型执行混合查询
switch (request.getQueryType()) {
case SEARCH_RESULTS:
CassandraSearchRequest searchRequest = (CassandraSearchRequest) request.getParameters().get("searchRequest");
CassandraSearchResult searchResult = executeCassandraSearch(searchRequest);
result.setData(searchResult);
break;

default:
throw new IllegalArgumentException("不支持的混合查询类型: " + request.getQueryType());
}

return result;

} catch (Exception e) {
log.error("执行混合查询失败", e);
throw new CassandraCrossDatabaseQueryException("执行混合查询失败", e);
}
}

/**
* 执行Cassandra搜索
*/
private CassandraSearchResult executeCassandraSearch(CassandraSearchRequest request) {
try {
CassandraSearchResult result = new CassandraSearchResult();

// 构建搜索查询
String cql = "SELECT * FROM " + request.getKeyspace() + "." + request.getTable() + " WHERE ";

List<Object> parameters = new ArrayList<>();
List<String> conditions = new ArrayList<>();

if (request.getKeyword() != null && !request.getKeyword().trim().isEmpty()) {
conditions.add("name LIKE ?");
parameters.add("%" + request.getKeyword() + "%");
}

if (request.getCategory() != null) {
conditions.add("category = ?");
parameters.add(request.getCategory());
}

if (request.getPriceMin() != null) {
conditions.add("price >= ?");
parameters.add(request.getPriceMin());
}

if (request.getPriceMax() != null) {
conditions.add("price <= ?");
parameters.add(request.getPriceMax());
}

if (!conditions.isEmpty()) {
cql += String.join(" AND ", conditions);
} else {
cql += "1=1";
}

// 设置分页
cql += " LIMIT " + request.getSize() + " OFFSET " + request.getFrom();

// 执行搜索
List<Map<String, Object>> documents = cassandraTemplate.select(cql, Map.class, parameters.toArray());

// 构建结果
result.setDocuments(documents);
result.setTotalCount(documents.size());
result.setFrom(request.getFrom());
result.setSize(request.getSize());

return result;

} catch (Exception e) {
log.error("执行Cassandra搜索失败", e);
throw new CassandraCrossDatabaseQueryException("执行Cassandra搜索失败", e);
}
}

/**
* 异步执行Cassandra跨库查询
*/
@Async
public CompletableFuture<CassandraCrossDatabaseQueryResult> executeCassandraCrossDatabaseQueryAsync(
CassandraCrossDatabaseQueryRequest request) {
try {
CassandraCrossDatabaseQueryResult result = executeCassandraCrossDatabaseQuery(request);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}

/**
* 获取Cassandra查询结果
*/
public CassandraCrossDatabaseQueryResult getCassandraQueryResult(String requestId) {
try {
// 从缓存获取
String cacheKey = CASSANDRA_QUERY_CACHE_PREFIX + requestId;
CassandraCrossDatabaseQueryResult cachedResult = (CassandraCrossDatabaseQueryResult) redisTemplate.opsForValue().get(cacheKey);

if (cachedResult != null) {
return cachedResult;
}

// 从数据库获取
throw new CassandraCrossDatabaseQueryException("Cassandra查询结果不存在: " + requestId);

} catch (Exception e) {
log.error("获取Cassandra查询结果失败", e);
throw new CassandraCrossDatabaseQueryException("获取Cassandra查询结果失败", e);
}
}

/**
* 缓存Cassandra查询结果
*/
public void cacheCassandraQueryResult(String requestId, CassandraCrossDatabaseQueryResult result) {
try {
String cacheKey = CASSANDRA_QUERY_CACHE_PREFIX + requestId;
redisTemplate.opsForValue().set(cacheKey, result, Duration.ofSeconds(CASSANDRA_QUERY_CACHE_EXPIRE));

} catch (Exception e) {
log.error("缓存Cassandra查询结果失败", e);
}
}

/**
* 获取Cassandra查询统计信息
*/
public CassandraQueryStatistics getCassandraQueryStatistics(Date startTime, Date endTime) {
try {
CassandraQueryStatistics statistics = new CassandraQueryStatistics();
statistics.setStartTime(startTime);
statistics.setEndTime(endTime);

// 统计查询次数
statistics.setTotalQueries(600); // 实际应用中需要从数据库统计

// 统计查询策略使用情况
Map<CassandraQueryStrategy, Long> strategyCount = new HashMap<>();
strategyCount.put(CassandraQueryStrategy.MULTI_KEYSPACE_QUERY, 250L);
strategyCount.put(CassandraQueryStrategy.AGGREGATION_QUERY, 200L);
strategyCount.put(CassandraQueryStrategy.EVENT_DRIVEN, 50L);
strategyCount.put(CassandraQueryStrategy.HYBRID, 100L);
statistics.setStrategyCount(strategyCount);

// 统计查询类型使用情况
Map<CassandraQueryType, Long> typeCount = new HashMap<>();
typeCount.put(CassandraQueryType.USER_ORDER_QUERY, 200L);
typeCount.put(CassandraQueryType.USER_ORDER_AGGREGATION, 150L);
typeCount.put(CassandraQueryType.COMPLEX_AGGREGATION, 100L);
typeCount.put(CassandraQueryType.SEARCH_RESULTS, 150L);
statistics.setTypeCount(typeCount);

// 统计平均响应时间
statistics.setAverageResponseTime(150.0); // 150ms

// 统计成功率
statistics.setSuccessRate(0.995); // 99.5%

return statistics;

} catch (Exception e) {
log.error("获取Cassandra查询统计信息失败", e);
throw new CassandraCrossDatabaseQueryException("获取Cassandra查询统计信息失败", e);
}
}
}

四、性能优化与监控

4.1 性能优化

4.1.1 Cassandra跨库查询性能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/**
* Cassandra跨库查询性能优化服务
*/
@Service
public class CassandraCrossDatabaseQueryPerformanceOptimizationService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CaffeineCache localCache;

private final String CASSANDRA_PERFORMANCE_CACHE_PREFIX = "cassandra_performance_cache:";

/**
* 缓存Cassandra查询性能数据
*/
public void cacheCassandraQueryPerformanceData(String requestId, Object data) {
String cacheKey = CASSANDRA_PERFORMANCE_CACHE_PREFIX + requestId;

try {
// 写入本地缓存
localCache.put(cacheKey, data);

// 写入Redis缓存
String redisCacheKey = "redis_cache:" + cacheKey;
redisTemplate.opsForValue().set(redisCacheKey, data, Duration.ofHours(1));

} catch (Exception e) {
log.error("缓存Cassandra查询性能数据失败", e);
}
}

/**
* 获取缓存的Cassandra查询性能数据
*/
public Object getCachedCassandraQueryPerformanceData(String requestId) {
String cacheKey = CASSANDRA_PERFORMANCE_CACHE_PREFIX + requestId;

try {
// 从本地缓存获取
Object cachedData = localCache.getIfPresent(cacheKey);
if (cachedData != null) {
return cachedData;
}

// 从Redis获取
String redisCacheKey = "redis_cache:" + cacheKey;
Object redisData = redisTemplate.opsForValue().get(redisCacheKey);
if (redisData != null) {
localCache.put(cacheKey, redisData);
return redisData;
}

} catch (Exception e) {
log.error("获取缓存的Cassandra查询性能数据失败", e);
}

return null;
}

/**
* 批量处理Cassandra查询请求
*/
public List<CassandraCrossDatabaseQueryResult> batchProcessCassandraQueryRequests(
List<CassandraCrossDatabaseQueryRequest> requests) {
List<CassandraCrossDatabaseQueryResult> results = new ArrayList<>();

try {
// 按查询类型分组
Map<CassandraQueryType, List<CassandraCrossDatabaseQueryRequest>> typeGroups = requests.stream()
.collect(Collectors.groupingBy(CassandraCrossDatabaseQueryRequest::getQueryType));

// 并行处理各类型
typeGroups.entrySet().parallelStream().forEach(entry -> {
CassandraQueryType queryType = entry.getKey();
List<CassandraCrossDatabaseQueryRequest> typeRequests = entry.getValue();

try {
List<CassandraCrossDatabaseQueryResult> typeResults = processCassandraTypeRequests(queryType, typeRequests);

synchronized (results) {
results.addAll(typeResults);
}

} catch (Exception e) {
log.error("处理Cassandra查询类型失败: {}", queryType, e);
}
});

} catch (Exception e) {
log.error("批量处理Cassandra查询请求失败", e);
}

return results;
}

/**
* 处理Cassandra类型请求
*/
private List<CassandraCrossDatabaseQueryResult> processCassandraTypeRequests(CassandraQueryType queryType,
List<CassandraCrossDatabaseQueryRequest> requests) {
List<CassandraCrossDatabaseQueryResult> results = new ArrayList<>();

for (CassandraCrossDatabaseQueryRequest request : requests) {
try {
CassandraCrossDatabaseQueryResult result = processCassandraQueryRequest(request);
results.add(result);
} catch (Exception e) {
log.error("处理Cassandra查询请求失败: {}", request.getRequestId(), e);
CassandraCrossDatabaseQueryResult errorResult = new CassandraCrossDatabaseQueryResult();
errorResult.setRequestId(request.getRequestId());
errorResult.setStatus(CassandraQueryStatus.FAILED);
errorResult.setErrorMessage(e.getMessage());
results.add(errorResult);
}
}

return results;
}

/**
* 处理Cassandra查询请求
*/
private CassandraCrossDatabaseQueryResult processCassandraQueryRequest(CassandraCrossDatabaseQueryRequest request) {
// 实现Cassandra查询处理逻辑
CassandraCrossDatabaseQueryResult result = new CassandraCrossDatabaseQueryResult();
result.setRequestId(request.getRequestId());
result.setStatus(CassandraQueryStatus.SUCCESS);
result.setData("Cassandra查询成功");
return result;
}

/**
* 预热Cassandra查询性能缓存
*/
@PostConstruct
public void warmupCassandraQueryPerformanceCache() {
try {
// 预热常用Cassandra查询性能数据
List<String> commonRequestIds = Arrays.asList("cassandra_req_1", "cassandra_req_2", "cassandra_req_3");

for (String requestId : commonRequestIds) {
try {
String cacheKey = CASSANDRA_PERFORMANCE_CACHE_PREFIX + requestId;
Object performanceData = new Object();
cacheCassandraQueryPerformanceData(requestId, performanceData);
} catch (Exception e) {
log.error("预热Cassandra查询性能缓存失败: {}", requestId, e);
}
}

} catch (Exception e) {
log.error("预热Cassandra查询性能缓存失败", e);
}
}

/**
* 清理过期缓存
*/
@Scheduled(fixedRate = 300000) // 5分钟
public void cleanupExpiredCache() {
try {
// 清理本地缓存
localCache.cleanUp();

// 清理Redis过期缓存
cleanupRedisExpiredCache();

} catch (Exception e) {
log.error("清理过期缓存失败", e);
}
}

/**
* 清理Redis过期缓存
*/
private void cleanupRedisExpiredCache() {
try {
Set<String> cacheKeys = redisTemplate.keys("redis_cache:" + CASSANDRA_PERFORMANCE_CACHE_PREFIX + "*");

for (String key : cacheKeys) {
Long ttl = redisTemplate.getExpire(key);
if (ttl != null && ttl <= 0) {
redisTemplate.delete(key);
}
}

} catch (Exception e) {
log.error("清理Redis过期缓存失败", e);
}
}
}

4.2 监控告警

4.2.1 Cassandra监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* Cassandra跨库查询监控指标
*/
@Component
public class CassandraCrossDatabaseQueryMetrics {

private final MeterRegistry meterRegistry;

public CassandraCrossDatabaseQueryMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

/**
* 记录Cassandra跨库查询次数
*/
public void recordCassandraCrossDatabaseQueryCount(String queryType, String strategy) {
Counter.builder("cassandra_cross_db_query.count")
.description("Cassandra跨库查询次数")
.tag("query_type", queryType)
.tag("strategy", strategy)
.register(meterRegistry)
.increment();
}

/**
* 记录Cassandra跨库查询时间
*/
public void recordCassandraCrossDatabaseQueryTime(String queryType, String strategy, long duration) {
Timer.builder("cassandra_cross_db_query.time")
.description("Cassandra跨库查询时间")
.tag("query_type", queryType)
.tag("strategy", strategy)
.register(meterRegistry)
.record(duration, TimeUnit.MILLISECONDS);
}

/**
* 记录Cassandra跨库查询数据量
*/
public void recordCassandraCrossDatabaseQueryDataCount(String queryType, long count) {
Counter.builder("cassandra_cross_db_query.data.count")
.description("Cassandra跨库查询数据量")
.tag("query_type", queryType)
.register(meterRegistry)
.increment(count);
}

/**
* 记录Cassandra跨库查询成功率
*/
public void recordCassandraCrossDatabaseQuerySuccessRate(String queryType, double successRate) {
Gauge.builder("cassandra_cross_db_query.success.rate")
.description("Cassandra跨库查询成功率")
.tag("query_type", queryType)
.register(meterRegistry, successRate);
}

/**
* 记录Cassandra跨库查询失败率
*/
public void recordCassandraCrossDatabaseQueryFailureRate(String queryType, double failureRate) {
Gauge.builder("cassandra_cross_db_query.failure.rate")
.description("Cassandra跨库查询失败率")
.tag("query_type", queryType)
.register(meterRegistry, failureRate);
}

/**
* 记录Cassandra跨库查询吞吐量
*/
public void recordCassandraCrossDatabaseQueryThroughput(String queryType, double throughput) {
Gauge.builder("cassandra_cross_db_query.throughput")
.description("Cassandra跨库查询吞吐量")
.tag("query_type", queryType)
.register(meterRegistry, throughput);
}

/**
* 记录Cassandra跨库查询异常次数
*/
public void recordCassandraCrossDatabaseQueryExceptionCount(String queryType, String exceptionType) {
Counter.builder("cassandra_cross_db_query.exception.count")
.description("Cassandra跨库查询异常次数")
.tag("query_type", queryType)
.tag("exception_type", exceptionType)
.register(meterRegistry)
.increment();
}
}

4.2.2 Cassandra告警规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# prometheus-rules.yml
groups:
- name: cassandra_cross_db_query_alerts
rules:
- alert: HighCassandraCrossDatabaseQueryTime
expr: cassandra_cross_db_query_time{quantile="0.95"} > 1000
for: 2m
labels:
severity: warning
annotations:
summary: "Cassandra跨库查询时间过长"
description: "Cassandra跨库查询时间P95超过1秒,当前值: {{ $value }}ms"

- alert: HighCassandraCrossDatabaseQueryFailureRate
expr: cassandra_cross_db_query_failure_rate > 0.01
for: 2m
labels:
severity: warning
annotations:
summary: "Cassandra跨库查询失败率过高"
description: "Cassandra跨库查询失败率超过1%,当前值: {{ $value }}"

- alert: LowCassandraCrossDatabaseQueryThroughput
expr: cassandra_cross_db_query_throughput < 100
for: 5m
labels:
severity: warning
annotations:
summary: "Cassandra跨库查询吞吐量过低"
description: "Cassandra跨库查询吞吐量低于100次/秒,当前值: {{ $value }}"

- alert: HighCassandraCrossDatabaseQueryExceptionCount
expr: rate(cassandra_cross_db_query_exception_count[5m]) > 1
for: 2m
labels:
severity: critical
annotations:
summary: "Cassandra跨库查询异常次数过多"
description: "Cassandra跨库查询异常频率超过1次/分钟,当前值: {{ $value }}"

- alert: CassandraCrossDatabaseQueryServiceDown
expr: up{job="cassandra-cross-db-query-service"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Cassandra跨库查询服务宕机"
description: "Cassandra跨库查询服务已宕机超过1分钟"

五、总结

Cassandra微服务跨库查询作为NoSQL分布式系统中的重要挑战,通过合理的Cassandra跨库查询策略和数据聚合方案,能够在不破坏微服务边界的前提下实现复杂的数据查询需求。本文从Cassandra跨库查询策略到数据聚合方案,从基础实现到企业级应用,系统梳理了Cassandra微服务跨库查询的完整解决方案。

5.1 关键要点

  1. 多Keyspace查询:通过多Keyspace查询实现跨库数据整合
  2. 聚合查询:通过Cassandra聚合功能实现复杂的数据统计和分析
  3. 事件驱动:通过事件机制实现数据的异步更新和同步
  4. 性能优化:通过缓存、索引优化等手段优化查询性能
  5. 监控告警:建立完善的监控体系,及时发现和处理问题

5.2 最佳实践

  1. 策略选择:根据查询复杂度、数据量、响应时间要求选择合适的Cassandra查询策略
  2. Keyspace设计:合理设计Cassandra Keyspace结构,提高查询效率
  3. 缓存策略:合理使用缓存,减少Cassandra查询压力
  4. 事件驱动:通过事件机制保持数据一致性
  5. 监控告警:建立完善的监控体系,确保Cassandra查询服务稳定运行

通过以上措施,可以构建一个高效、稳定、可扩展的Cassandra微服务跨库查询系统,为企业的各种业务场景提供数据查询支持。