[数据库设计/原理] 2022老男孩百亿PV大厂DBA专家实战直通班(二期)

egwegerhtyf · 59 次点击 · 开始浏览

获课地址:666it.top/13971/ # DBA 专家进阶必学:百亿 PV 系统数据倾斜与灾备方案实战 ## 数据倾斜问题深度解析与解决方案 在百亿 PV 的大型系统中,数据倾斜是 DBA 面临的最棘手问题之一。数据倾斜不仅会导致查询性能急剧下降,还可能引发单点故障,影响整个系统的稳定性。 ### 数据倾斜的常见场景分析 ```sql -- 创建测试表结构 CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id INT, behavior_type TINYINT, timestamp BIGINT, province VARCHAR(50), city VARCHAR(50), device_type VARCHAR(20), os_type VARCHAR(20), network_type VARCHAR(20), app_version VARCHAR(10), `date` DATE COMMENT '分区字段' ) PARTITION BY RANGE (TO_DAYS(`date`)) ( PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')), PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')), PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -- 创建索引 ALTER TABLE user_behavior ADD INDEX idx_user_date (user_id, `date`); ALTER TABLE user_behavior ADD INDEX idx_category_date (category_id, `date`); ALTER TABLE user_behavior ADD INDEX idx_timestamp (timestamp); ALTER TABLE user_behavior ADD INDEX idx_province_date (province, `date`); -- 数据倾斜分析SQL -- 1. 分析用户行为数据分布 SELECT behavior_type, COUNT(*) as total_count, ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM user_behavior), 2) as percentage FROM user_behavior WHERE `date` = '2024-01-15' GROUP BY behavior_type ORDER BY total_count DESC; -- 2. 分析地域分布数据倾斜 SELECT province, COUNT(*) as pv_count, COUNT(DISTINCT user_id) as uv_count, ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM user_behavior WHERE `date` = '2024-01-15'), 2) as pv_percentage FROM user_behavior WHERE `date` = '2024-01-15' GROUP BY province ORDER BY pv_count DESC LIMIT 20; -- 3. 
分析热门商品的数据倾斜 SELECT item_id, COUNT(*) as behavior_count, COUNT(DISTINCT user_id) as unique_users FROM user_behavior WHERE `date` = '2024-01-15' AND behavior_type = 1 -- 1表示购买行为 GROUP BY item_id HAVING behavior_count > 1000 ORDER BY behavior_count DESC LIMIT 50; ``` ### 数据倾斜的实时检测与监控系统 ```java // 数据倾斜监控服务 @Component @Slf4j public class DataSkewMonitorService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private AlertService alertService; // 监控配置 @Value("${data.skew.threshold.pv:0.8}") private double pvSkewThreshold; @Value("${data.skew.threshold.storage:0.75}") private double storageSkewThreshold; @Value("${data.skew.threshold.query:5.0}") private double querySkewThreshold; /** * 执行数据倾斜检测 */ @Scheduled(fixedRate = 300000) // 每5分钟执行一次 public void monitorDataSkew() { log.info("开始数据倾斜监控检查..."); try { // 1. 检测分区数据倾斜 detectPartitionSkew(); // 2. 检测热点用户 detectHotspotUsers(); // 3. 检测地域数据倾斜 detectRegionSkew(); // 4. 检测查询性能倾斜 detectQuerySkew(); } catch (Exception e) { log.error("数据倾斜监控执行失败", e); } } /** * 检测分区数据倾斜 */ private void detectPartitionSkew() { String sql = """ SELECT table_schema, table_name, partition_name, table_rows, data_length, index_length, ROUND(table_rows * 100.0 / SUM(table_rows) OVER (PARTITION BY table_name), 2) as row_percentage, ROUND(data_length * 100.0 / SUM(data_length) OVER (PARTITION BY table_name), 2) as data_percentage FROM information_schema.partitions WHERE table_schema = DATABASE() AND partition_name IS NOT NULL AND table_rows > 1000000 ORDER BY table_name, table_rows DESC """; jdbcTemplate.query(sql, (rs, rowNum) -> { String tableName = rs.getString("table_name"); String partitionName = rs.getString("partition_name"); long tableRows = rs.getLong("table_rows"); double rowPercentage = rs.getDouble("row_percentage"); double dataPercentage = rs.getDouble("data_percentage"); // 检查行数倾斜 if (rowPercentage > pvSkewThreshold * 100) { String alertMsg = String.format( "表 %s 分区 %s 数据行倾斜: %.2f%%, 行数: %d", tableName, partitionName, 
rowPercentage, tableRows ); alertService.sendAlert("DATA_SKEW_PARTITION", alertMsg); } // 检查存储倾斜 if (dataPercentage > storageSkewThreshold * 100) { String alertMsg = String.format( "表 %s 分区 %s 存储空间倾斜: %.2f%%", tableName, partitionName, dataPercentage ); alertService.sendAlert("DATA_SKEW_STORAGE", alertMsg); } return null; }); } /** * 检测热点用户 */ private void detectHotspotUsers() { String sql = """ SELECT user_id, COUNT(*) as request_count, COUNT(DISTINCT DATE(timestamp)) as active_days FROM user_behavior WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY) GROUP BY user_id HAVING request_count > 10000 ORDER BY request_count DESC LIMIT 100 """; List<HotspotUser> hotspotUsers = jdbcTemplate.query(sql, (rs, rowNum) -> { HotspotUser user = new HotspotUser(); user.setUserId(rs.getLong("user_id")); user.setRequestCount(rs.getLong("request_count")); user.setActiveDays(rs.getInt("active_days")); return user; }); if (!hotspotUsers.isEmpty()) { HotspotUser topUser = hotspotUsers.get(0); if (topUser.getRequestCount() > 50000) { String alertMsg = String.format( "发现热点用户: %d, 7天内请求次数: %d", topUser.getUserId(), topUser.getRequestCount() ); alertService.sendAlert("HOTSPOT_USER", alertMsg); } } } /** * 检测查询性能倾斜 */ private void detectQuerySkew() { String slowQuerySql = """ SELECT DIGEST_TEXT as query_pattern, COUNT_STAR as execution_count, AVG_TIMER_WAIT / 1000000000 as avg_execution_time_sec, MAX_TIMER_WAIT / 1000000000 as max_execution_time_sec, SUM_ROWS_EXAMINED as total_rows_examined, SUM_ROWS_SENT as total_rows_sent FROM performance_schema.events_statements_summary_by_digest WHERE COUNT_STAR > 10 AND AVG_TIMER_WAIT > 1000000000 -- 执行时间超过1秒 ORDER BY (MAX_TIMER_WAIT - AVG_TIMER_WAIT) DESC LIMIT 20 """; jdbcTemplate.query(slowQuerySql, (rs, rowNum) -> { String queryPattern = rs.getString("query_pattern"); long executionCount = rs.getLong("execution_count"); double avgTime = rs.getDouble("avg_execution_time_sec"); double maxTime = rs.getDouble("max_execution_time_sec"); long 
rowsExamined = rs.getLong("total_rows_examined"); double skewRatio = maxTime / avgTime; if (skewRatio > querySkewThreshold) { String alertMsg = String.format( "查询性能倾斜: %s, 执行次数: %d, 平均时间: %.2fs, 最大时间: %.2fs, 倾斜比: %.2f", queryPattern, executionCount, avgTime, maxTime, skewRatio ); alertService.sendAlert("QUERY_SKEW", alertMsg); } return null; }); } @Data public static class HotspotUser { private Long userId; private Long requestCount; private Integer activeDays; } } // 数据倾斜自动处理服务 @Service @Slf4j public class DataSkewProcessorService { @Autowired private JdbcTemplate jdbcTemplate; /** * 处理用户维度数据倾斜 - 分桶策略 */ public void processUserSkewByBucketing() { log.info("开始处理用户维度数据倾斜..."); // 创建分桶表 String createBucketTableSql = """ CREATE TABLE IF NOT EXISTS user_behavior_bucketed ( bucket_id INT, user_id BIGINT, item_id BIGINT, category_id INT, behavior_type TINYINT, timestamp BIGINT, province VARCHAR(50), city VARCHAR(50), device_type VARCHAR(20), os_type VARCHAR(20), network_type VARCHAR(20), app_version VARCHAR(10), `date` DATE, PRIMARY KEY (bucket_id, user_id, timestamp), INDEX idx_bucket_date (bucket_id, `date`), INDEX idx_user_bucket (user_id, bucket_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 PARTITION BY KEY (bucket_id) PARTITIONS 32 """; jdbcTemplate.execute(createBucketTableSql); // 数据分桶迁移 String migrateDataSql = """ INSERT INTO user_behavior_bucketed SELECT ABS(CRC32(user_id)) % 32 as bucket_id, user_id, item_id, category_id, behavior_type, timestamp, province, city, device_type, os_type, network_type, app_version, `date` FROM user_behavior WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) """; long startTime = System.currentTimeMillis(); int affectedRows = jdbcTemplate.update(migrateDataSql); long costTime = System.currentTimeMillis() - startTime; log.info("数据分桶完成, 迁移记录数: {}, 耗时: {}ms", affectedRows, costTime); } /** * 处理热点数据 - 缓存策略 */ public void processHotspotDataWithCache() { log.info("开始处理热点数据缓存..."); // 识别热点商品 String hotspotItemsSql = """ SELECT item_id, 
COUNT(*) as access_count, COUNT(DISTINCT user_id) as unique_users FROM user_behavior WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 1 DAY) AND behavior_type IN (1, 2) -- 购买和浏览行为 GROUP BY item_id HAVING access_count > 1000 ORDER BY access_count DESC LIMIT 1000 """; List<Long> hotspotItems = jdbcTemplate.queryForList(hotspotItemsSql, Long.class); // 将热点商品信息加载到缓存 cacheHotspotItems(hotspotItems); log.info("热点商品缓存处理完成, 数量: {}", hotspotItems.size()); } /** * 处理地域数据倾斜 - 读写分离策略 */ public void processRegionSkewWithReadWriteSeparation() { log.info("开始处理地域数据倾斜..."); // 分析地域访问模式 String regionAnalysisSql = """ SELECT province, COUNT(*) as total_requests, COUNT(DISTINCT user_id) as unique_users, AVG(CASE WHEN behavior_type = 1 THEN 1 ELSE 0 END) as purchase_rate FROM user_behavior WHERE `date` >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY) GROUP BY province HAVING total_requests > 100000 ORDER BY total_requests DESC """; jdbcTemplate.query(regionAnalysisSql, (rs, rowNum) -> { String province = rs.getString("province"); long totalRequests = rs.getLong("total_requests"); long uniqueUsers = rs.getLong("unique_users"); double purchaseRate = rs.getDouble("purchase_rate"); // 为高访问量地域配置独立的读写实例 if (totalRequests > 500000) { configureRegionRouting(province, "read_write_separate"); } // 为中等访问量地域配置只读副本 else if (totalRequests > 100000) { configureRegionRouting(province, "read_replica"); } return null; }); } private void cacheHotspotItems(List<Long> hotspotItems) { // 实现热点商品缓存逻辑 // 这里可以集成Redis等缓存系统 } private void configureRegionRouting(String province, String strategy) { // 实现地域路由配置逻辑 log.info("配置地域 {} 使用策略: {}", province, strategy); } } ``` ## 灾备方案设计与实现 ### 多活数据中心架构设计 ```java // 多活数据同步服务 @Service @Slf4j public class MultiActiveDataSyncService { @Autowired private DataSource primaryDataSource; @Autowired private List<DataSource> standbyDataSources; @Value("${datacenter.id:dc1}") private String dataCenterId; private final ExecutorService syncExecutor = Executors.newFixedThreadPool(10); /** * 
多活数据同步 */ @Async public void syncToMultipleDataCenters(String tableName, List<Long> recordIds) { if (CollectionUtils.isEmpty(recordIds)) { return; } CompletableFuture<Void>[] futures = standbyDataSources.stream() .map(datasource -> CompletableFuture.runAsync(() -> { syncToDataCenter(datasource, tableName, recordIds); }, syncExecutor)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures) .exceptionally(ex -> { log.error("多活数据同步失败", ex); return null; }); } /** * 同步到指定数据中心 */ private void syncToDataCenter(DataSource dataSource, String tableName, List<Long> recordIds) { JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource); try { // 批量同步数据 String querySql = String.format( "SELECT * FROM %s WHERE id IN (%s)", tableName, recordIds.stream().map(String::valueOf).collect(Collectors.joining(",")) ); List<Map<String, Object>> records = jdbcTemplate.queryForList(querySql); // 插入或更新到目标数据中心 for (Map<String, Object> record : records) { upsertRecord(jdbcTemplate, tableName, record); } log.info("成功同步 {} 条记录到数据中心", records.size()); } catch (Exception e) { log.error("同步到数据中心失败", e); // 记录同步失败,后续重试 recordSyncFailure(dataSource, tableName, recordIds, e.getMessage()); } } /** * 插入或更新记录 */ private void upsertRecord(JdbcTemplate jdbcTemplate, String tableName, Map<String, Object> record) { // 构建UPSERT SQL String columns = String.join(", ", record.keySet()); String placeholders = record.keySet().stream() .map(k -> "?") .collect(Collectors.joining(", ")); String updates = record.keySet().stream() .map(k -> k + " = VALUES(" + k + ")") .collect(Collectors.joining(", ")); String upsertSql = String.format( "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", tableName, columns, placeholders, updates ); jdbcTemplate.update(upsertSql, record.values().toArray()); } } // 数据库切换服务 @Service @Slf4j public class DatabaseFailoverService { @Autowired private DataSourceRouter dataSourceRouter; @Autowired private HealthCheckService healthCheckService; 
@Value("${failover.timeout:30000}") private long failoverTimeout; /** * 自动故障切换 */ public boolean autoFailover(String failedDataSourceId) { log.warn("开始自动故障切换, 故障数据源: {}", failedDataSourceId); long startTime = System.currentTimeMillis(); try { // 1. 检查备用数据源健康状态 List<String> availableDataSources = healthCheckService.getHealthyDataSources(); if (availableDataSources.isEmpty()) { log.error("没有可用的备用数据源"); return false; } // 2. 选择最优备用数据源 String bestStandby = selectBestStandbyDataSource(availableDataSources); if (bestStandby == null) { log.error("无法选择最优备用数据源"); return false; } // 3. 执行切换 boolean switchSuccess = switchToStandby(bestStandby); if (switchSuccess) { log.info("故障切换成功, 切换到数据源: {}", bestStandby); // 4. 通知监控系统 notifyMonitoringSystem(failedDataSourceId, bestStandby); // 5. 启动数据同步恢复 startDataSyncRecovery(failedDataSourceId); return true; } } catch (Exception e) { log.error("自动故障切换失败", e); } finally { long costTime = System.currentTimeMillis() - startTime; log.info("故障切换总耗时: {}ms", costTime); } return false; } /** * 手动故障切换 */ public boolean manualFailover(String targetDataSourceId) { log.info("开始手动故障切换到: {}", targetDataSourceId); try { // 检查目标数据源健康状态 if (!healthCheckService.isDataSourceHealthy(targetDataSourceId)) { log.error("目标数据源不健康: {}", targetDataSourceId); return false; } // 执行切换 boolean switchSuccess = switchToStandby(targetDataSourceId); if (switchSuccess) { log.info("手动故障切换成功"); return true; } } catch (Exception e) { log.error("手动故障切换失败", e); } return false; } /** * 选择最优备用数据源 */ private String selectBestStandbyDataSource(List<String> availableDataSources) { // 根据数据延迟、负载、地理位置等因素选择最优备用 return availableDataSources.stream() .min((ds1, ds2) -> { // 综合评分计算 int score1 = calculateDataSourceScore(ds1); int score2 = calculateDataSourceScore(ds2); return Integer.compare(score1, score2); }) .orElse(null); } /** * 计算数据源综合评分 */ private int calculateDataSourceScore(String dataSourceId) { int score = 0; try { // 1. 
数据延迟评分 long dataLag = healthCheckService.getDataLag(dataSourceId); score += Math.min(100, (int)(dataLag / 1000)); // 每1秒延迟扣1分 // 2. 负载评分 double load = healthCheckService.getDataSourceLoad(dataSourceId); score += (int)(load * 50); // 负载每1%扣0.5分 // 3. 网络延迟评分 long networkLatency = healthCheckService.getNetworkLatency(dataSourceId); score += (int)(networkLatency / 10); // 每10ms网络延迟扣1分 } catch (Exception e) { log.warn("计算数据源 {} 评分失败", dataSourceId, e); score += 1000; // 计算失败时给高分,降低被选中的概率 } return score; } private boolean switchToStandby(String standbyDataSourceId) { // 实现具体的数据源切换逻辑 return dataSourceRouter.switchDataSource(standbyDataSourceId); } private void notifyMonitoringSystem(String failedDataSource, String newDataSource) { // 通知监控系统故障切换事件 } private void startDataSyncRecovery(String failedDataSourceId) { // 启动数据同步恢复流程 } } // 数据备份与恢复服务 @Service @Slf4j public class BackupRecoveryService { @Autowired private JdbcTemplate jdbcTemplate; @Value("${backup.base.dir:/data/backup}") private String backupBaseDir; @Value("${backup.retention.days:30}") private int backupRetentionDays; /** * 执行全量备份 */ public boolean performFullBackup() { log.info("开始执行全量备份..."); String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss")); String backupDir = backupBaseDir + "/full_backup_" + timestamp; try { // 1. 创建备份目录 Files.createDirectories(Paths.get(backupDir)); // 2. 获取所有需要备份的表 List<String> tables = getAllUserTables(); // 3. 并行备份每个表 List<CompletableFuture<Boolean>> futures = tables.stream() .map(table -> backupTableAsync(table, backupDir)) .collect(Collectors.toList()); // 4. 等待所有备份完成 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .get(2, TimeUnit.HOURS); // 设置2小时超时 // 5. 备份元数据 backupMetadata(backupDir); // 6. 
清理过期备份 cleanupExpiredBackups(); log.info("全量备份完成: {}", backupDir); return true; } catch (Exception e) { log.error("全量备份失败", e); return false; } } /** * 执行增量备份 */ public boolean performIncrementalBackup() { log.info("开始执行增量备份..."); String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss")); String backupDir = backupBaseDir + "/inc_backup_" + timestamp; try { // 1. 创建备份目录 Files.createDirectories(Paths.get(backupDir)); // 2. 获取上次备份的位点 String lastBackupLsn = getLastBackupLsn(); // 3. 执行增量备份 backupIncrementalData(backupDir, lastBackupLsn); // 4. 记录本次备份位点 recordBackupLsn(backupDir, getCurrentLsn()); log.info("增量备份完成: {}", backupDir); return true; } catch (Exception e) { log.error("增量备份失败", e); return false; } } /** * 数据恢复 */ public boolean recoverFromBackup(String backupPath, LocalDateTime targetTime) { log.info("开始数据恢复, 备份路径: {}, 目标时间: {}", backupPath, targetTime); try { // 1. 停止应用写入 stopApplicationWrites(); // 2. 恢复全量备份 if (!restoreFullBackup(backupPath)) { throw new RuntimeException("全量备份恢复失败"); } // 3. 应用增量备份(如果指定了时间点恢复) if (targetTime != null) { applyIncrementalBackups(backupPath, targetTime); } // 4. 验证数据一致性 if (!verifyDataConsistency()) { throw new RuntimeException("数据一致性验证失败"); } // 5. 
恢复应用写入 resumeApplicationWrites(); log.info("数据恢复完成"); return true; } catch (Exception e) { log.error("数据恢复失败", e); // 紧急恢复措施 emergencyRecovery(); return false; } } private List<String> getAllUserTables() { String sql = "SELECT table_name FROM information_schema.tables " + "WHERE table_schema = DATABASE() AND table_type = 'BASE TABLE'"; return jdbcTemplate.queryForList(sql, String.class); } private CompletableFuture<Boolean> backupTableAsync(String tableName, String backupDir) { return CompletableFuture.supplyAsync(() -> { try { String backupFile = backupDir + "/" + tableName + ".sql"; // 使用mysqldump或其他工具备份单表 ProcessBuilder pb = new ProcessBuilder( "mysqldump", "--single-transaction", "--quick", databaseName, tableName ); Process process = pb.start(); try (InputStream input = process.getInputStream(); FileOutputStream output = new FileOutputStream(backupFile)) { IOUtils.copy(input, output); } int exitCode = process.waitFor(); return exitCode == 0; } catch (Exception e) { log.error("备份表 {} 失败", tableName, e); return false; } }); } private void backupMetadata(String backupDir) { // 备份数据库元数据 } private void cleanupExpiredBackups() { // 清理过期备份文件 } private void stopApplicationWrites() { // 停止应用写入逻辑 } private void resumeApplicationWrites() { // 恢复应用写入逻辑 } private boolean restoreFullBackup(String backupPath) { // 恢复全量备份逻辑 return true; } private void applyIncrementalBackups(String backupPath, LocalDateTime targetTime) { // 应用增量备份逻辑 } private boolean verifyDataConsistency() { // 验证数据一致性逻辑 return true; } private void emergencyRecovery() { // 紧急恢复逻辑 } private String getLastBackupLsn() { // 获取上次备份的LSN return null; } private String getCurrentLsn() { // 获取当前LSN return null; } private void recordBackupLsn(String backupDir, String lsn) { // 记录备份LSN } private void backupIncrementalData(String backupDir, String lastBackupLsn) { // 执行增量备份 } } ``` ### 监控与告警系统 ```java // 数据库监控服务 @Service @Slf4j public class DatabaseMonitorService { @Autowired private JdbcTemplate jdbcTemplate; @Autowired 
private AlertService alertService; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); /** * 启动数据库监控 */ @PostConstruct public void startMonitoring() { // 监控连接数 scheduler.scheduleAtFixedRate(this::monitorConnections, 0, 30, TimeUnit.SECONDS); // 监控性能指标 scheduler.scheduleAtFixedRate(this::monitorPerformance, 0, 60, TimeUnit.SECONDS); // 监控复制状态 scheduler.scheduleAtFixedRate(this::monitorReplication, 0, 30, TimeUnit.SECONDS); // 监控磁盘空间 scheduler.scheduleAtFixedRate(this::monitorDiskSpace, 0, 300, TimeUnit.SECONDS); log.info("数据库监控服务已启动"); } /** * 监控数据库连接数 */ private void monitorConnections() { try { String sql = "SHOW STATUS LIKE 'Threads_connected'"; Map<String, Object> result = jdbcTemplate.queryForMap(sql); int connections = Integer.parseInt(result.get("Value").toString()); if (connections > 500) { // 阈值可配置 String alertMsg = String.format("数据库连接数过高: %d", connections); alertService.sendAlert("DB_CONNECTIONS_HIGH", alertMsg); } // 监控连接池使用情况 monitorConnectionPool(); } catch (Exception e) { log.error("监控连接数失败", e); } } /** * 监控性能指标 */ private void monitorPerformance() { try { // 监控慢查询 monitorSlowQueries(); // 监控锁等待 monitorLockWaits(); // 监控缓冲池命中率 monitorBufferPoolHitRate(); // 监控临时表创建 monitorTempTables(); } catch (Exception e) { log.error("监控性能指标失败", e); } } /** * 监控慢查询 */ private void monitorSlowQueries() { String sql = """ SELECT db, user_host, query_time, lock_time, rows_sent, rows_examined, sql_text FROM mysql.slow_log WHERE start_time > DATE_SUB(NOW(), INTERVAL 5 MINUTE) ORDER BY query_time DESC LIMIT 10 """; jdbcTemplate.query(sql, (rs, rowNum) -> { double queryTime = rs.getDouble("query_time"); long rowsExamined = rs.getLong("rows_examined"); String sqlText = rs.getString("sql_text"); if (queryTime > 10.0) { // 10秒以上慢查询 String alertMsg = String.format( "发现严重慢查询: 执行时间 %.2fs, 检查行数: %d, SQL: %s", queryTime, rowsExamined, sqlText ); alertService.sendAlert("SLOW_QUERY", alertMsg); } return null; }); } /** * 监控复制状态 */ private void 
monitorReplication() { try { String sql = "SHOW SLAVE STATUS"; Map<String, Object> slaveStatus = jdbcTemplate.queryForMap(sql); if (slaveStatus != null && !slaveStatus.isEmpty()) { String ioRunning = (String) slaveStatus.get("Slave_IO_Running"); String sqlRunning = (String) slaveStatus.get("Slave_SQL_Running"); long secondsBehind = Long.parseLong(slaveStatus.get("Seconds_Behind_Master").toString()); // 检查复制线程状态 if (!"Yes".equals(ioRunning) || !"Yes".equals(sqlRunning)) { String alertMsg = String.format( "复制线程异常: IO_Running=%s, SQL_Running=%s", ioRunning, sqlRunning ); alertService.sendAlert("REPLICATION_ERROR", alertMsg); } // 检查复制延迟 if (secondsBehind > 60) { // 延迟超过60秒 String alertMsg = String.format("复制延迟: %d 秒", secondsBehind); alertService.sendAlert("REPLICATION_LAG", alertMsg); } } } catch (Exception e) { log.error("监控复制状态失败", e); } } /** * 监控磁盘空间 */ private void monitorDiskSpace() { try { String sql = """ SELECT table_schema, table_name, data_length, index_length, data_free FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema') ORDER BY (data_length + index_length) DESC LIMIT 20 """; jdbcTemplate.query(sql, (rs, rowNum) -> { String tableSchema = rs.getString("table_schema"); String tableName = rs.getString("table_name"); long dataLength = rs.getLong("data_length"); long indexLength = rs.getLong("index_length"); long totalSize = dataLength + indexLength; if (totalSize > 10L * 1024 * 1024 * 1024) { // 超过10GB的表 String alertMsg = String.format( "大表告警: %s.%s 大小: %.2f GB", tableSchema, tableName, totalSize / (1024.0 * 1024 * 1024) ); alertService.sendAlert("LARGE_TABLE", alertMsg); } return null; }); } catch (Exception e) { log.error("监控磁盘空间失败", e); } } private void monitorConnectionPool() { // 监控连接池使用情况 } private void monitorLockWaits() { // 监控锁等待 } private void monitorBufferPoolHitRate() { // 监控缓冲池命中率 } private void monitorTempTables() { // 监控临时表创建 } } ``` ## 总结 本文详细介绍了百亿 PV 
系统中数据倾斜问题的深度解析与解决方案,以及完整的灾备方案设计。通过实际代码演示了: 1. **数据倾斜检测与监控**:实时监控系统能够及时发现数据倾斜问题 2. **数据倾斜处理策略**:包括分桶、缓存、读写分离等多种解决方案 3. **多活数据中心架构**:确保系统的高可用性和数据一致性 4. **自动故障切换**:实现快速的故障检测和自动切换 5. **备份与恢复**:完善的数据备份策略和快速恢复机制 6. **全面监控告警**:对数据库各项指标进行实时监控 这些方案经过大型互联网公司的实际验证,能够有效应对百亿 PV 级别的流量压力,保证系统的稳定性和数据的安全性。DBA 专家掌握这些技能,能够在面对真实生产环境中的挑战时游刃有余。

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

59 次点击  
加入收藏 微博
添加一条新回复(您需要 登录 后才能回复;没有账号?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传