第147集限制每秒允许写入数据库的请求数量Java实战
|字数总计:6.5k|阅读时长:30分钟|阅读量:
1. 数据库写入限流概述
数据库写入限流是保护数据库系统稳定性的重要手段,通过限制每秒允许写入数据库的请求数量,可以有效防止数据库过载、连接池耗尽、锁竞争等问题。在Java应用中,合理使用数据库写入限流可以实现数据库保护、性能优化、系统稳定性提升等功能。本文将详细介绍数据库写入限流的原理、实现方法、性能优化技巧以及在Java实战中的应用。
1.1 数据库写入限流核心价值
- 数据库保护: 防止数据库过载和连接池耗尽
- 性能优化: 通过限流提升数据库性能
- 系统稳定性: 保证系统稳定运行
- 资源管理: 合理管理数据库资源
- 监控告警: 提供丰富的监控指标
1.2 数据库写入限流场景
- 高并发写入: 大量并发写入请求
- 批量数据导入: 批量数据导入操作
- 实时数据同步: 实时数据同步写入
- 数据迁移: 数据迁移过程中的写入
- 系统保护: 防止系统过载
1.3 限流算法对比
- 令牌桶算法: 支持突发流量,平滑限流
- 漏桶算法: 固定速率输出,严格限流
- 滑动窗口: 基于时间窗口的限流
- 计数器算法: 简单计数限流
2. 数据库写入限流基础实现
2.1 数据库写入限流配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Configuration @EnableConfigurationProperties(DatabaseWriteLimitProperties.class) public class DatabaseWriteLimitConfig { @Autowired private DatabaseWriteLimitProperties properties;
@Bean("databaseWriteLimiter") public RateLimiter databaseWriteLimiter() { RateLimiter limiter = RateLimiter.create(properties.getQpsLimit()); logger.info("数据库写入限流器初始化完成,QPS限制: {}", properties.getQpsLimit()); return limiter; }
@Bean public DatabaseWriteLimitService databaseWriteLimitService() { return new DatabaseWriteLimitService(); }
@Bean public DatabaseWriteMonitorService databaseWriteMonitorService() { return new DatabaseWriteMonitorService(); } private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitConfig.class); }
|
2.2 数据库写入限流属性配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
@Data @ConfigurationProperties(prefix = "database.write.limit") public class DatabaseWriteLimitProperties {
private double qpsLimit = 100.0;
private boolean enabled = true;
private String mode = "TOKEN_BUCKET";
private boolean burstEnabled = true;
private int burstSize = 50;
private long warmupPeriod = 10;
private boolean warmupEnabled = false;
private long timeoutMs = 1000;
private boolean timeoutEnabled = true;
private long monitorInterval = 30000;
private boolean monitorEnabled = true; }
|
2.3 基础数据库写入限流服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
|
@Service public class DatabaseWriteLimitService { @Autowired @Qualifier("databaseWriteLimiter") private RateLimiter rateLimiter; @Autowired private DatabaseWriteLimitProperties properties; @Autowired private DatabaseWriteMonitorService monitorService; private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitService.class);
public DatabaseWriteResult writeWithLimit(Map<String, Object> data) { if (!properties.isEnabled()) { return executeWrite(data); } try { boolean acquired = acquireToken(); if (acquired) { DatabaseWriteResult result = executeWrite(data); monitorService.recordSuccess(); logger.debug("数据库写入成功,数据: {}", data); return result; } else { monitorService.recordLimited(); logger.warn("数据库写入被限流,数据: {}", data); return DatabaseWriteResult.limited("写入请求被限流"); } } catch (Exception e) { monitorService.recordError(); logger.error("数据库写入异常,数据: {}", data, e); return DatabaseWriteResult.error("写入异常: " + e.getMessage()); } }
public BatchWriteResult writeBatchWithLimit(List<Map<String, Object>> dataList) { if (!properties.isEnabled()) { return executeBatchWrite(dataList); } BatchWriteResult result = new BatchWriteResult(); result.setTotalCount(dataList.size()); result.setStartTime(System.currentTimeMillis()); int successCount = 0; int limitedCount = 0; int errorCount = 0; for (Map<String, Object> data : dataList) { try { boolean acquired = acquireToken(); if (acquired) { DatabaseWriteResult writeResult = executeWrite(data); if (writeResult.isSuccess()) { successCount++; monitorService.recordSuccess(); } else { errorCount++; monitorService.recordError(); } } else { limitedCount++; monitorService.recordLimited(); } } catch (Exception e) { errorCount++; monitorService.recordError(); logger.error("批量写入异常,数据: {}", data, e); } } result.setSuccessCount(successCount); result.setLimitedCount(limitedCount); result.setErrorCount(errorCount); result.setEndTime(System.currentTimeMillis()); logger.info("批量写入完成,成功: {}, 限流: {}, 错误: {}, 总耗时: {}ms", successCount, limitedCount, errorCount, result.getDuration()); return result; }
private boolean acquireToken() { if (properties.isTimeoutEnabled()) { return rateLimiter.tryAcquire(properties.getTimeoutMs(), TimeUnit.MILLISECONDS); } else { return rateLimiter.tryAcquire(); } }
private DatabaseWriteResult executeWrite(Map<String, Object> data) { try { Thread.sleep(50); DatabaseWriteResult result = new DatabaseWriteResult(); result.setSuccess(true); result.setRowsAffected(1); result.setExecutionTime(System.currentTimeMillis()); result.setData(data); return result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return DatabaseWriteResult.error("写入被中断"); } catch (Exception e) { return DatabaseWriteResult.error("写入失败: " + e.getMessage()); } }
private BatchWriteResult executeBatchWrite(List<Map<String, Object>> dataList) { BatchWriteResult result = new BatchWriteResult(); result.setTotalCount(dataList.size()); result.setStartTime(System.currentTimeMillis()); try { Thread.sleep(dataList.size() * 10); result.setSuccessCount(dataList.size()); result.setLimitedCount(0); result.setErrorCount(0); result.setEndTime(System.currentTimeMillis()); return result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); result.setErrorCount(dataList.size()); result.setEndTime(System.currentTimeMillis()); return result; } catch (Exception e) { result.setErrorCount(dataList.size()); result.setEndTime(System.currentTimeMillis()); return result; } }
public RateLimiterStatus getRateLimiterStatus() { RateLimiterStatus status = new RateLimiterStatus(); status.setQpsLimit(properties.getQpsLimit()); status.setAvailablePermits(rateLimiter.availablePermits()); status.setCurrentTime(System.currentTimeMillis()); return status; }
public void adjustQpsLimit(double newQpsLimit) { rateLimiter.setRate(newQpsLimit); properties.setQpsLimit(newQpsLimit); logger.info("数据库写入QPS限制调整: {} -> {}", properties.getQpsLimit(), newQpsLimit); } }
|
2.4 数据库写入结果类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
@Data public class DatabaseWriteResult { private boolean success; private int rowsAffected; private String error; private long executionTime; private Map<String, Object> data; public DatabaseWriteResult() { this.success = false; this.rowsAffected = 0; this.executionTime = System.currentTimeMillis(); }
public static DatabaseWriteResult success(int rowsAffected) { DatabaseWriteResult result = new DatabaseWriteResult(); result.setSuccess(true); result.setRowsAffected(rowsAffected); return result; }
public static DatabaseWriteResult limited(String message) { DatabaseWriteResult result = new DatabaseWriteResult(); result.setSuccess(false); result.setError(message); return result; }
public static DatabaseWriteResult error(String error) { DatabaseWriteResult result = new DatabaseWriteResult(); result.setSuccess(false); result.setError(error); return result; }
public boolean isLimited() { return !success && error != null && error.contains("限流"); } }
|
2.5 批量写入结果类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
@Data public class BatchWriteResult { private int totalCount; private int successCount; private int limitedCount; private int errorCount; private long startTime; private long endTime; public BatchWriteResult() { this.successCount = 0; this.limitedCount = 0; this.errorCount = 0; }
public long getDuration() { return endTime - startTime; }
public double getSuccessRate() { if (totalCount == 0) return 0.0; return (double) successCount / totalCount * 100; }
public double getLimitedRate() { if (totalCount == 0) return 0.0; return (double) limitedCount / totalCount * 100; }
public double getErrorRate() { if (totalCount == 0) return 0.0; return (double) errorCount / totalCount * 100; }
public boolean isAllSuccess() { return limitedCount == 0 && errorCount == 0; } }
|
3. 高级限流功能
3.1 滑动窗口限流服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
@Service public class SlidingWindowLimitService { private final Map<String, SlidingWindow> windows = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLimitService.class);
public DatabaseWriteResult writeWithSlidingWindow(String key, int qpsLimit, Map<String, Object> data) { try { SlidingWindow window = windows.computeIfAbsent(key, k -> new SlidingWindow(qpsLimit, 1000)); if (window.tryAcquire()) { DatabaseWriteResult result = executeWrite(data); logger.debug("滑动窗口限流写入成功,key: {}, data: {}", key, data); return result; } else { logger.warn("滑动窗口限流写入被限流,key: {}, qpsLimit: {}", key, qpsLimit); return DatabaseWriteResult.limited("滑动窗口限流:请求频率过高"); } } catch (Exception e) { logger.error("滑动窗口限流写入异常,key: {}, data: {}", key, data, e); return DatabaseWriteResult.error("写入异常: " + e.getMessage()); } }
public SlidingWindowStatus getSlidingWindowStatus(String key) { SlidingWindow window = windows.get(key); if (window == null) { return null; } SlidingWindowStatus status = new SlidingWindowStatus(); status.setKey(key); status.setQpsLimit(window.getQpsLimit()); status.setCurrentQps(window.getCurrentQps()); status.setWindowSize(window.getWindowSize()); status.setCurrentTime(System.currentTimeMillis()); return status; }
private DatabaseWriteResult executeWrite(Map<String, Object> data) { try { Thread.sleep(50); DatabaseWriteResult result = new DatabaseWriteResult(); result.setSuccess(true); result.setRowsAffected(1); result.setExecutionTime(System.currentTimeMillis()); result.setData(data); return result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return DatabaseWriteResult.error("写入被中断"); } catch (Exception e) { return DatabaseWriteResult.error("写入失败: " + e.getMessage()); } } }
|
3.2 滑动窗口实现类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
public class SlidingWindow { private final int qpsLimit; private final long windowSize; private final Queue<Long> requests; private final Object lock = new Object(); public SlidingWindow(int qpsLimit, long windowSize) { this.qpsLimit = qpsLimit; this.windowSize = windowSize; this.requests = new LinkedList<>(); }
public boolean tryAcquire() { synchronized (lock) { long currentTime = System.currentTimeMillis(); cleanExpiredRequests(currentTime); if (requests.size() >= qpsLimit) { return false; } requests.offer(currentTime); return true; } }
private void cleanExpiredRequests(long currentTime) { long expireTime = currentTime - windowSize; while (!requests.isEmpty() && requests.peek() < expireTime) { requests.poll(); } }
public int getCurrentQps() { synchronized (lock) { long currentTime = System.currentTimeMillis(); cleanExpiredRequests(currentTime); return requests.size(); } }
public int getQpsLimit() { return qpsLimit; }
public long getWindowSize() { return windowSize; } }
|
3.3 滑动窗口状态类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
@Data public class SlidingWindowStatus { private String key; private int qpsLimit; private int currentQps; private long windowSize; private long currentTime; public SlidingWindowStatus() { this.currentTime = System.currentTimeMillis(); }
public double getUsageRate() { if (qpsLimit == 0) return 0.0; return (double) currentQps / qpsLimit * 100; }
public boolean isNearLimit() { return getUsageRate() > 80; }
public boolean isLimited() { return currentQps >= qpsLimit; } }
|
3.4 数据库写入监控服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
@Service public class DatabaseWriteMonitorService { private final AtomicLong successCount = new AtomicLong(0); private final AtomicLong limitedCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); private final AtomicLong totalCount = new AtomicLong(0); private long lastResetTime = System.currentTimeMillis(); private final long resetInterval = 60000; private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteMonitorService.class);
public void recordSuccess() { successCount.incrementAndGet(); totalCount.incrementAndGet(); }
public void recordLimited() { limitedCount.incrementAndGet(); totalCount.incrementAndGet(); }
public void recordError() { errorCount.incrementAndGet(); totalCount.incrementAndGet(); }
public DatabaseWriteMetrics getMetrics() { if (System.currentTimeMillis() - lastResetTime > resetInterval) { resetMetrics(); } DatabaseWriteMetrics metrics = new DatabaseWriteMetrics(); metrics.setSuccessCount(successCount.get()); metrics.setLimitedCount(limitedCount.get()); metrics.setErrorCount(errorCount.get()); metrics.setTotalCount(totalCount.get()); metrics.setTimestamp(System.currentTimeMillis()); return metrics; }
private void resetMetrics() { successCount.set(0); limitedCount.set(0); errorCount.set(0); totalCount.set(0); lastResetTime = System.currentTimeMillis(); logger.info("数据库写入监控指标重置"); }
@Scheduled(fixedRate = 30000) public void monitorDatabaseWriteStatus() { try { DatabaseWriteMetrics metrics = getMetrics(); logger.info("数据库写入监控: 总请求数={}, 成功数={}, 限流数={}, 错误数={}, 成功率={}%, 限流率={}%", metrics.getTotalCount(), metrics.getSuccessCount(), metrics.getLimitedCount(), metrics.getErrorCount(), String.format("%.2f", metrics.getSuccessRate()), String.format("%.2f", metrics.getLimitedRate())); if (metrics.getLimitedRate() > 20) { logger.warn("数据库写入限流率过高: {}%", String.format("%.2f", metrics.getLimitedRate())); } if (metrics.getErrorRate() > 10) { logger.warn("数据库写入错误率过高: {}%", String.format("%.2f", metrics.getErrorRate())); } } catch (Exception e) { logger.error("数据库写入状态监控失败", e); } } }
|
3.5 数据库写入指标类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
@Data public class DatabaseWriteMetrics { private long successCount; private long limitedCount; private long errorCount; private long totalCount; private long timestamp; public DatabaseWriteMetrics() { this.timestamp = System.currentTimeMillis(); }
public double getSuccessRate() { if (totalCount == 0) return 0.0; return (double) successCount / totalCount * 100; }
public double getLimitedRate() { if (totalCount == 0) return 0.0; return (double) limitedCount / totalCount * 100; }
public double getErrorRate() { if (totalCount == 0) return 0.0; return (double) errorCount / totalCount * 100; }
public boolean isHealthy() { return getLimitedRate() < 20 && getErrorRate() < 10; } }
|
3.6 限流器状态类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
@Data public class RateLimiterStatus { private double qpsLimit; private double availablePermits; private long currentTime; public RateLimiterStatus() { this.currentTime = System.currentTimeMillis(); }
public double getUsageRate() { if (qpsLimit == 0) return 0.0; return (qpsLimit - availablePermits) / qpsLimit * 100; }
public boolean isNearLimit() { return availablePermits < qpsLimit * 0.1; }
public boolean isLimited() { return availablePermits <= 0; } }
|
4. 数据库写入限流控制器
4.1 数据库写入限流REST控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
@RestController @RequestMapping("/api/database/write/limit") public class DatabaseWriteLimitController { @Autowired private DatabaseWriteLimitService databaseWriteLimitService; @Autowired private SlidingWindowLimitService slidingWindowLimitService; @Autowired private DatabaseWriteMonitorService monitorService; private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitController.class);
@PostMapping("/single") public ResponseEntity<DatabaseWriteResult> writeSingle(@RequestBody DatabaseWriteRequest request) { try { logger.info("接收到限流写入单条记录请求"); DatabaseWriteResult result = databaseWriteLimitService.writeWithLimit(request.getData()); return ResponseEntity.ok(result); } catch (Exception e) { logger.error("限流写入单条记录失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@PostMapping("/batch") public ResponseEntity<BatchWriteResult> writeBatch(@RequestBody BatchWriteRequest request) { try { logger.info("接收到限流写入多条记录请求,数量: {}", request.getDataList().size()); BatchWriteResult result = databaseWriteLimitService.writeBatchWithLimit(request.getDataList()); return ResponseEntity.ok(result); } catch (Exception e) { logger.error("限流写入多条记录失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@PostMapping("/sliding-window") public ResponseEntity<DatabaseWriteResult> writeWithSlidingWindow(@RequestBody SlidingWindowWriteRequest request) { try { logger.info("接收到滑动窗口限流写入请求,key: {}, qpsLimit: {}", request.getKey(), request.getQpsLimit()); DatabaseWriteResult result = slidingWindowLimitService.writeWithSlidingWindow( request.getKey(), request.getQpsLimit(), request.getData()); return ResponseEntity.ok(result); } catch (Exception e) { logger.error("滑动窗口限流写入失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/status") public ResponseEntity<RateLimiterStatus> getRateLimiterStatus() { try { RateLimiterStatus status = databaseWriteLimitService.getRateLimiterStatus(); return ResponseEntity.ok(status); } catch (Exception e) { logger.error("获取限流器状态失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/sliding-window/status/{key}") public ResponseEntity<SlidingWindowStatus> getSlidingWindowStatus(@PathVariable String key) { try { SlidingWindowStatus status = slidingWindowLimitService.getSlidingWindowStatus(key); if (status != null) { return ResponseEntity.ok(status); } else { return ResponseEntity.notFound().build(); } } catch (Exception e) { logger.error("获取滑动窗口状态失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/metrics") public ResponseEntity<DatabaseWriteMetrics> getMetrics() { try { DatabaseWriteMetrics metrics = monitorService.getMetrics(); return ResponseEntity.ok(metrics); } catch (Exception e) { logger.error("获取监控指标失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@PostMapping("/adjust") public ResponseEntity<Map<String, String>> adjustQpsLimit(@RequestBody QpsAdjustRequest request) { try { logger.info("接收到QPS限制调整请求: {}", request.getQpsLimit()); databaseWriteLimitService.adjustQpsLimit(request.getQpsLimit()); Map<String, String> response = new HashMap<>(); response.put("status", "SUCCESS"); response.put("message", "QPS限制调整成功"); response.put("newQpsLimit", String.valueOf(request.getQpsLimit())); return ResponseEntity.ok(response); } catch (Exception e) { logger.error("调整QPS限制失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } } }
|
4.2 请求类定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Data public class DatabaseWriteRequest { private Map<String, Object> data; public DatabaseWriteRequest() {} public DatabaseWriteRequest(Map<String, Object> data) { this.data = data; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Data public class BatchWriteRequest { private List<Map<String, Object>> dataList; public BatchWriteRequest() {} public BatchWriteRequest(List<Map<String, Object>> dataList) { this.dataList = dataList; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@Data public class SlidingWindowWriteRequest { private String key; private int qpsLimit; private Map<String, Object> data; public SlidingWindowWriteRequest() {} public SlidingWindowWriteRequest(String key, int qpsLimit, Map<String, Object> data) { this.key = key; this.qpsLimit = qpsLimit; this.data = data; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Data public class QpsAdjustRequest { private double qpsLimit; public QpsAdjustRequest() {} public QpsAdjustRequest(double qpsLimit) { this.qpsLimit = qpsLimit; } }
|
5. 数据库写入限流注解和AOP
5.1 数据库写入限流注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DatabaseWriteLimit {
double qpsLimit() default 100.0;
String key() default "";
String mode() default "TOKEN_BUCKET";
long timeoutMs() default 1000;
boolean timeoutEnabled() default true;
String message() default "数据库写入请求被限流,请稍后重试";
int statusCode() default 429; }
|
5.2 数据库写入限流AOP切面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
|
@Aspect @Component public class DatabaseWriteLimitAspect { @Autowired private DatabaseWriteLimitService databaseWriteLimitService; @Autowired private SlidingWindowLimitService slidingWindowLimitService; @Autowired private DatabaseWriteMonitorService monitorService; private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitAspect.class);
@Pointcut("@annotation(databaseWriteLimit)") public void databaseWriteLimitPointcut(DatabaseWriteLimit databaseWriteLimit) {}
@Around("databaseWriteLimitPointcut(databaseWriteLimit)") public Object around(ProceedingJoinPoint joinPoint, DatabaseWriteLimit databaseWriteLimit) throws Throwable { String key = generateKey(joinPoint, databaseWriteLimit); try { DatabaseWriteResult result; if ("SLIDING_WINDOW".equals(databaseWriteLimit.mode())) { result = slidingWindowLimitService.writeWithSlidingWindow( key, (int) databaseWriteLimit.qpsLimit(), null); } else { result = databaseWriteLimitService.writeWithLimit(null); } if (result.isSuccess()) { logger.debug("数据库写入限流检查通过: key={}", key); return joinPoint.proceed(); } else { logger.warn("数据库写入限流检查失败: key={}, message={}", key, result.getError()); throw new DatabaseWriteLimitException(databaseWriteLimit.message(), databaseWriteLimit.statusCode()); } } catch (DatabaseWriteLimitException e) { throw e; } catch (Exception e) { logger.error("数据库写入限流处理异常: key={}", key, e); throw new DatabaseWriteLimitException("数据库写入限流处理异常", 500); } }
private String generateKey(ProceedingJoinPoint joinPoint, DatabaseWriteLimit databaseWriteLimit) { if (!databaseWriteLimit.key().isEmpty()) { return databaseWriteLimit.key(); } MethodSignature signature = (MethodSignature) joinPoint.getSignature(); String className = signature.getDeclaringType().getSimpleName(); String methodName = signature.getName(); return className + "." + methodName; } }
|
5.3 数据库写入限流异常类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
public class DatabaseWriteLimitException extends RuntimeException { private final int statusCode; public DatabaseWriteLimitException(String message) { super(message); this.statusCode = 429; } public DatabaseWriteLimitException(String message, int statusCode) { super(message); this.statusCode = statusCode; } public DatabaseWriteLimitException(String message, Throwable cause) { super(message, cause); this.statusCode = 429; } public DatabaseWriteLimitException(String message, Throwable cause, int statusCode) { super(message, cause); this.statusCode = statusCode; } public int getStatusCode() { return statusCode; } }
|
5.4 数据库写入限流异常处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@ControllerAdvice public class DatabaseWriteLimitExceptionHandler { private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitExceptionHandler.class);
@ExceptionHandler(DatabaseWriteLimitException.class) public ResponseEntity<Map<String, Object>> handleDatabaseWriteLimitException(DatabaseWriteLimitException e) { logger.warn("数据库写入限流异常: {}", e.getMessage()); Map<String, Object> response = new HashMap<>(); response.put("error", "DATABASE_WRITE_LIMIT_EXCEEDED"); response.put("message", e.getMessage()); response.put("timestamp", System.currentTimeMillis()); return ResponseEntity.status(e.getStatusCode()).body(response); } }
|
6. 实际应用示例
6.1 使用数据库写入限流注解的服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
@Service public class DatabaseWriteLimitExampleService { private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitExampleService.class);
@DatabaseWriteLimit(qpsLimit = 50.0, key = "example.basic", message = "基础限流:数据库写入请求被限流") public String basicWriteLimit(String data) { logger.info("执行基础限流写入示例: {}", data); try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "基础限流写入完成: " + data; }
@DatabaseWriteLimit(qpsLimit = 30.0, key = "example.sliding", mode = "SLIDING_WINDOW", message = "滑动窗口限流:数据库写入请求被限流") public String slidingWindowWriteLimit(String data) { logger.info("执行滑动窗口限流写入示例: {}", data); try { Thread.sleep(120); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "滑动窗口限流写入完成: " + data; }
@DatabaseWriteLimit(qpsLimit = 200.0, key = "example.highQps", timeoutMs = 500, message = "高QPS限流:数据库写入请求被限流") public String highQpsWriteLimit(String data) { logger.info("执行高QPS限流写入示例: {}", data); try { Thread.sleep(80); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "高QPS限流写入完成: " + data; }
@DatabaseWriteLimit(qpsLimit = 10.0, key = "example.lowQps", timeoutMs = 2000, message = "低QPS限流:数据库写入请求被限流") public String lowQpsWriteLimit(String data) { logger.info("执行低QPS限流写入示例: {}", data); try { Thread.sleep(150); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "低QPS限流写入完成: " + data; } }
|
6.2 数据库写入限流测试控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
@RestController @RequestMapping("/api/database/write/limit/test") public class DatabaseWriteLimitTestController { @Autowired private DatabaseWriteLimitExampleService exampleService; @Autowired private DatabaseWriteLimitService databaseWriteLimitService; @Autowired private DatabaseWriteMonitorService monitorService; private static final Logger logger = LoggerFactory.getLogger(DatabaseWriteLimitTestController.class);
@GetMapping("/basic") public ResponseEntity<Map<String, String>> testBasicWriteLimit(@RequestParam String data) { try { String result = exampleService.basicWriteLimit(data); Map<String, String> response = new HashMap<>(); response.put("status", "SUCCESS"); response.put("result", result); response.put("timestamp", String.valueOf(System.currentTimeMillis())); return ResponseEntity.ok(response); } catch (DatabaseWriteLimitException e) { logger.warn("基础限流写入测试被限流: {}", e.getMessage()); return ResponseEntity.status(e.getStatusCode()).build(); } catch (Exception e) { logger.error("基础限流写入测试失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/sliding-window") public ResponseEntity<Map<String, String>> testSlidingWindowWriteLimit(@RequestParam String data) { try { String result = exampleService.slidingWindowWriteLimit(data); Map<String, String> response = new HashMap<>(); response.put("status", "SUCCESS"); response.put("result", result); response.put("timestamp", String.valueOf(System.currentTimeMillis())); return ResponseEntity.ok(response); } catch (DatabaseWriteLimitException e) { logger.warn("滑动窗口限流写入测试被限流: {}", e.getMessage()); return ResponseEntity.status(e.getStatusCode()).build(); } catch (Exception e) { logger.error("滑动窗口限流写入测试失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/high-qps") public ResponseEntity<Map<String, String>> testHighQpsWriteLimit(@RequestParam String data) { try { String result = exampleService.highQpsWriteLimit(data); Map<String, String> response = new HashMap<>(); response.put("status", "SUCCESS"); response.put("result", result); response.put("timestamp", String.valueOf(System.currentTimeMillis())); return ResponseEntity.ok(response); } catch (DatabaseWriteLimitException e) { logger.warn("高QPS限流写入测试被限流: {}", e.getMessage()); return ResponseEntity.status(e.getStatusCode()).build(); } catch (Exception e) { logger.error("高QPS限流写入测试失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/low-qps") public ResponseEntity<Map<String, String>> testLowQpsWriteLimit(@RequestParam String data) { try { String result = exampleService.lowQpsWriteLimit(data); Map<String, String> response = new HashMap<>(); response.put("status", "SUCCESS"); response.put("result", result); response.put("timestamp", String.valueOf(System.currentTimeMillis())); return ResponseEntity.ok(response); } catch (DatabaseWriteLimitException e) { logger.warn("低QPS限流写入测试被限流: {}", e.getMessage()); return ResponseEntity.status(e.getStatusCode()).build(); } catch (Exception e) { logger.error("低QPS限流写入测试失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/status") public ResponseEntity<RateLimiterStatus> getStatus() { try { RateLimiterStatus status = databaseWriteLimitService.getRateLimiterStatus(); return ResponseEntity.ok(status); } catch (Exception e) { logger.error("获取限流器状态失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
@GetMapping("/metrics") public ResponseEntity<DatabaseWriteMetrics> getMetrics() { try { DatabaseWriteMetrics metrics = monitorService.getMetrics(); return ResponseEntity.ok(metrics); } catch (Exception e) { logger.error("获取监控指标失败", e); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } } }
|
7. 总结
7.1 数据库写入限流最佳实践
- 合理设置QPS限制: 根据数据库性能和业务需求设置QPS限制
- 选择合适的限流算法: 根据场景选择令牌桶、滑动窗口等算法
- 监控限流状态: 实时监控限流器状态和性能指标
- 动态调整参数: 根据负载情况动态调整限流参数
- 异常处理: 实现完善的异常处理和用户友好提示
7.2 性能优化建议
- 预热机制: 使用预热机制平滑启动限流器
- 滑动窗口: 使用滑动窗口实现精确限流
- 监控告警: 建立完善的监控和告警机制
- 异步处理: 使用异步处理提升系统响应性能
- 缓存优化: 合理使用缓存减少限流检查开销
7.3 运维管理要点
- 实时监控: 监控限流器状态和性能指标
- 动态调整: 根据负载情况动态调整限流参数
- 异常处理: 建立异常处理和告警机制
- 日志管理: 完善日志记录和分析
- 性能调优: 根据监控数据优化限流参数
通过本文的限制每秒允许写入数据库的请求数量Java实战指南,您可以掌握数据库写入限流的原理、实现方法、性能优化技巧以及在企业级应用中的最佳实践,构建高效、稳定的数据库写入限流系统!