第247集SpringBoot读写分离多节点架构实战:主从切换、负载均衡与企业级高可用设计
|字数总计:2.9k|阅读时长:13分钟|阅读量:
前言
随着业务规模增长,单数据库节点已成为系统瓶颈。读写分离多节点架构通过将读操作分散到多个从节点,写操作集中在主节点,显著提升系统并发能力与可用性。本文从SpringBoot配置到企业级治理,系统梳理可落地的多节点架构方案。
一、架构设计原则
- 读写分离:写操作路由到主库,读操作路由到从库,降低主库压力。
- 负载均衡:多个从库间轮询/权重分配,提升读性能。
- 故障转移:主库故障时自动切换,从库故障时剔除并恢复。
- 数据一致性:主从同步延迟处理,最终一致性保障。
- 监控告警:实时监控各节点状态,异常及时告警。
二、核心组件选型
2.1 数据源管理
- HikariCP:高性能连接池,支持多数据源配置。
- Druid:阿里开源连接池,提供监控与SQL防火墙。
- ShardingSphere:分库分表中间件,支持读写分离。
2.2 负载均衡策略
- 轮询(Round Robin):均匀分配请求。
- 权重轮询(Weighted Round Robin):按性能分配权重。
- 最少连接(Least Connections):选择连接数最少的节点。
- 一致性哈希(Consistent Hash):保证相同请求路由到同一节点。
三、基础配置实现
3.1 多数据源配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Configuration @EnableTransactionManagement public class DataSourceConfig {
@Bean @Primary @ConfigurationProperties("spring.datasource.master") public DataSource masterDataSource() { return DruidDataSourceBuilder.create().build(); }
@Bean @ConfigurationProperties("spring.datasource.slave1") public DataSource slave1DataSource() { return DruidDataSourceBuilder.create().build(); }
@Bean @ConfigurationProperties("spring.datasource.slave2") public DataSource slave2DataSource() { return DruidDataSourceBuilder.create().build(); }
@Bean public DataSource routingDataSource() { Map<Object, Object> dataSourceMap = new HashMap<>(); dataSourceMap.put("master", masterDataSource()); dataSourceMap.put("slave1", slave1DataSource()); dataSourceMap.put("slave2", slave2DataSource());
DynamicDataSource routingDataSource = new DynamicDataSource(); routingDataSource.setDefaultTargetDataSource(masterDataSource()); routingDataSource.setTargetDataSources(dataSourceMap); return routingDataSource; } }
|
3.2 动态数据源切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class DynamicDataSource extends AbstractRoutingDataSource {
@Override protected Object determineCurrentLookupKey() { return DataSourceContextHolder.getDataSourceType(); } }
public class DataSourceContextHolder { private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) { CONTEXT_HOLDER.set(dataSourceType); }
public static String getDataSourceType() { return CONTEXT_HOLDER.get(); }
public static void clearDataSourceType() { CONTEXT_HOLDER.remove(); } }
|
四、读写分离实现
4.1 AOP切面实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Aspect @Component @Order(1) public class DataSourceAspect {
@Pointcut("@annotation(org.springframework.transaction.annotation.Transactional)") public void transactionalPointcut() {}
@Pointcut("execution(* com.example.service.*.save*(..)) || " + "execution(* com.example.service.*.insert*(..)) || " + "execution(* com.example.service.*.update*(..)) || " + "execution(* com.example.service.*.delete*(..))") public void writePointcut() {}
@Pointcut("execution(* com.example.service.*.find*(..)) || " + "execution(* com.example.service.*.get*(..)) || " + "execution(* com.example.service.*.list*(..)) || " + "execution(* com.example.service.*.query*(..))") public void readPointcut() {}
@Before("writePointcut()") public void setWriteDataSource() { DataSourceContextHolder.setDataSourceType("master"); }
@Before("readPointcut()") public void setReadDataSource() { String slave = LoadBalancer.selectSlave(); DataSourceContextHolder.setDataSourceType(slave); }
@After("transactionalPointcut()") public void clearDataSource() { DataSourceContextHolder.clearDataSourceType(); } }
|
4.2 负载均衡器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Component public class LoadBalancer {
private static final List<String> SLAVE_NODES = Arrays.asList("slave1", "slave2"); private static final AtomicInteger COUNTER = new AtomicInteger(0);
public static String selectSlave() { int index = COUNTER.getAndIncrement() % SLAVE_NODES.size(); return SLAVE_NODES.get(index); }
public static String selectSlaveByWeight() { Map<String, Integer> weights = Map.of("slave1", 3, "slave2", 1); int totalWeight = weights.values().stream().mapToInt(Integer::intValue).sum(); int random = ThreadLocalRandom.current().nextInt(totalWeight);
int currentWeight = 0; for (Map.Entry<String, Integer> entry : weights.entrySet()) { currentWeight += entry.getValue(); if (random < currentWeight) { return entry.getKey(); } } return "slave1"; } }
|
五、主从切换与故障转移
5.1 健康检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Component @Slf4j public class DataSourceHealthChecker {
@Autowired private Map<String, DataSource> dataSourceMap;
private final Map<String, Boolean> healthStatus = new ConcurrentHashMap<>();
@Scheduled(fixedDelay = 5000) public void checkHealth() { dataSourceMap.forEach((name, dataSource) -> { try { try (Connection conn = dataSource.getConnection()) { boolean isValid = conn.isValid(3); healthStatus.put(name, isValid); if (!isValid) { log.warn("数据源 {} 健康检查失败", name); } } } catch (SQLException e) { healthStatus.put(name, false); log.error("数据源 {} 健康检查异常", name, e); } }); }
public boolean isHealthy(String dataSourceName) { return healthStatus.getOrDefault(dataSourceName, false); }
public List<String> getHealthySlaves() { return healthStatus.entrySet().stream() .filter(entry -> entry.getKey().startsWith("slave") && entry.getValue()) .map(Map.Entry::getKey) .collect(Collectors.toList()); } }
|
5.2 故障转移策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Component public class FailoverStrategy {
@Autowired private DataSourceHealthChecker healthChecker;
public String selectDataSource(String operation) { if ("write".equals(operation)) { return selectMaster(); } else { return selectSlave(); } }
private String selectMaster() { if (healthChecker.isHealthy("master")) { return "master"; } List<String> healthySlaves = healthChecker.getHealthySlaves(); if (!healthySlaves.isEmpty()) { log.warn("主库故障,切换到从库: {}", healthySlaves.get(0)); return healthySlaves.get(0); } throw new RuntimeException("所有数据源都不可用"); }
private String selectSlave() { List<String> healthySlaves = healthChecker.getHealthySlaves(); if (healthySlaves.isEmpty()) { log.warn("所有从库故障,降级到主库"); return "master"; } return LoadBalancer.selectSlaveByWeight(); } }
|
六、ShardingSphere集成
6.1 配置读写分离
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| spring: shardingsphere: datasource: names: master,slave1,slave2 master: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: jdbc:mysql://master-host:3306/db?useUnicode=true&characterEncoding=utf8 username: root password: password slave1: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: jdbc:mysql://slave1-host:3306/db?useUnicode=true&characterEncoding=utf8 username: root password: password slave2: type: com.zaxxer.hikari.HikariDataSource driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: jdbc:mysql://slave2-host:3306/db?useUnicode=true&characterEncoding=utf8 username: root password: password
rules: readwrite-splitting: data-sources: readwrite_ds: static-strategy: write-data-source-name: master read-data-source-names: slave1,slave2 load-balancer-name: round_robin
load-balancers: round_robin: type: ROUND_ROBIN
|
6.2 强制路由
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component public class ShardingSphereRouter {
@Autowired private ShardingSphereDataSource dataSource;
public void forceMaster() { HintManager.getInstance().setMasterRouteOnly(); }
public void forceSlave() { HintManager.getInstance().setMasterRouteOnly(false); }
public void clearHint() { HintManager.clear(); } }
|
七、连接池优化
7.1 HikariCP配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| spring: datasource: master: hikari: maximum-pool-size: 20 minimum-idle: 5 idle-timeout: 300000 max-lifetime: 1200000 connection-timeout: 30000 validation-timeout: 5000 leak-detection-threshold: 60000 slave1: hikari: maximum-pool-size: 15 minimum-idle: 3 idle-timeout: 300000 max-lifetime: 1200000 connection-timeout: 30000 slave2: hikari: maximum-pool-size: 15 minimum-idle: 3 idle-timeout: 300000 max-lifetime: 1200000 connection-timeout: 30000
|
7.2 连接池监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component @Slf4j public class ConnectionPoolMonitor {
@Autowired private Map<String, DataSource> dataSourceMap;
@Scheduled(fixedDelay = 30000) public void monitorConnectionPools() { dataSourceMap.forEach((name, dataSource) -> { if (dataSource instanceof HikariDataSource) { HikariDataSource hikariDataSource = (HikariDataSource) dataSource; HikariPoolMXBean poolBean = hikariDataSource.getHikariPoolMXBean();
log.info("数据源 {} - 活跃连接: {}, 空闲连接: {}, 总连接: {}, 等待线程: {}", name, poolBean.getActiveConnections(), poolBean.getIdleConnections(), poolBean.getTotalConnections(), poolBean.getThreadsAwaitingConnection()); } }); } }
|
八、事务管理
8.1 分布式事务处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Service @Transactional public class UserService {
@Autowired private UserMapper userMapper;
@Transactional(rollbackFor = Exception.class) public void createUser(User user) { userMapper.insert(user);
try { HintManager.getInstance().setMasterRouteOnly(); User savedUser = userMapper.selectById(user.getId()); log.info("用户创建成功: {}", savedUser); } finally { HintManager.clear(); } }
@Transactional(readOnly = true) public List<User> findUsers(String keyword) { return userMapper.findByKeyword(keyword); } }
|
8.2 读写分离事务边界
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Service public class OrderService {
@Autowired private OrderMapper orderMapper;
@Autowired private UserMapper userMapper;
public void processOrder(Order order) { User user = userMapper.selectById(order.getUserId());
orderMapper.insert(order);
userMapper.updateBalance(user.getId(), order.getAmount()); } }
|
九、监控与告警
9.1 数据源监控指标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Component public class DataSourceMetrics {
private final MeterRegistry meterRegistry; private final Map<String, DataSource> dataSourceMap;
public DataSourceMetrics(MeterRegistry meterRegistry, Map<String, DataSource> dataSourceMap) { this.meterRegistry = meterRegistry; this.dataSourceMap = dataSourceMap; }
@EventListener @Async public void handleDataSourceEvent(DataSourceEvent event) { Counter.builder("datasource.operation.count") .tag("datasource", event.getDataSourceName()) .tag("operation", event.getOperation()) .register(meterRegistry) .increment(); }
@Scheduled(fixedDelay = 10000) public void recordConnectionMetrics() { dataSourceMap.forEach((name, dataSource) -> { if (dataSource instanceof HikariDataSource) { HikariDataSource hikariDataSource = (HikariDataSource) dataSource; HikariPoolMXBean poolBean = hikariDataSource.getHikariPoolMXBean();
Gauge.builder("datasource.connections.active") .tag("datasource", name) .register(meterRegistry, poolBean, HikariPoolMXBean::getActiveConnections);
Gauge.builder("datasource.connections.idle") .tag("datasource", name) .register(meterRegistry, poolBean, HikariPoolMXBean::getIdleConnections); } }); } }
|
9.2 告警规则配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| management: endpoints: web: exposure: include: health,metrics,prometheus metrics: export: prometheus: enabled: true
groups: - name: datasource-alerts rules: - alert: DataSourceDown expr: datasource_health_status == 0 for: 30s labels: severity: critical annotations: summary: "数据源 {{ $labels.datasource }} 不可用"
- alert: HighConnectionUsage expr: datasource_connections_active / datasource_connections_total > 0.8 for: 1m labels: severity: warning annotations: summary: "数据源 {{ $labels.datasource }} 连接使用率过高"
|
十、性能优化策略
10.1 读写分离优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Component public class ReadWriteOptimizer {
@Transactional(readOnly = true) public List<User> batchFindUsers(List<Long> userIds) { return userMapper.selectBatchIds(userIds); }
@Transactional public void batchInsertUsers(List<User> users) { userMapper.insertBatch(users); }
public void createUserWithDelayRead(User user) { userMapper.insert(user);
CompletableFuture.runAsync(() -> { try { Thread.sleep(100); User savedUser = userMapper.selectById(user.getId()); log.info("延迟读取用户: {}", savedUser); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } }
|
10.2 缓存集成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Service public class CachedUserService {
@Autowired private UserMapper userMapper;
@Autowired private RedisTemplate<String, Object> redisTemplate;
@Cacheable(value = "users", key = "#id") @Transactional(readOnly = true) public User findById(Long id) { return userMapper.selectById(id); }
@CacheEvict(value = "users", key = "#user.id") @Transactional public void updateUser(User user) { userMapper.updateById(user); }
@CacheEvict(value = "users", allEntries = true) @Transactional public void clearUserCache() { } }
|
十一、最佳实践总结
11.1 配置最佳实践
- 连接池配置:根据业务量调整最大连接数,避免连接泄漏。
- 超时设置:合理设置连接超时和查询超时,避免长时间阻塞。
- 健康检查:定期检查数据源健康状态,及时发现问题。
- 监控告警:建立完善的监控体系,异常及时告警。
11.2 开发最佳实践
- 事务边界:明确读写操作的事务边界,避免跨库事务。
- 强制路由:必要时使用强制路由,确保数据一致性。
- 异常处理:妥善处理数据源切换异常,提供降级方案。
- 性能测试:定期进行压力测试,验证架构性能。
11.3 运维最佳实践
- 备份策略:建立完善的数据备份和恢复策略。
- 扩容方案:制定数据源扩容和缩容方案。
- 故障演练:定期进行故障演练,验证高可用方案。
- 文档维护:维护详细的架构文档和操作手册。
十二、常见问题与解决方案
12.1 主从延迟问题
问题:主从同步延迟导致读取到旧数据。
解决方案:
- 使用强制路由到主库读取关键数据。
- 实现延迟读取机制。
- 监控主从延迟,设置告警阈值。
12.2 连接池耗尽
问题:高并发下连接池连接耗尽。
解决方案:
- 调整连接池大小配置。
- 优化SQL查询性能。
- 实现连接池监控和告警。
12.3 数据源切换失败
问题:数据源切换时出现异常。
解决方案:
- 实现降级策略,切换到可用数据源。
- 增加重试机制。
- 完善异常处理和日志记录。
十三、总结
SpringBoot读写分离多节点架构通过合理的数据源配置、负载均衡策略、故障转移机制和监控告警,能够显著提升系统的并发能力和可用性。在实际应用中,需要根据业务特点选择合适的中间件和配置参数,并建立完善的监控和运维体系,确保架构的稳定运行。
通过本文的实践指导,读者可以快速搭建企业级的读写分离多节点架构,为业务发展提供强有力的技术支撑。