老男孩 (Old Boy Education) Big-Tech DBA Expert Hands-On Fast Track, Sessions 2/3


Course link: 666it.top/13971/

# A Must-Learn for DBAs Advancing to Expert! 老男孩 10-Billion-PV Hands-On Class (Session 2): Solving Data Skew + Designing Disaster Recovery

In today's data-driven era, database administrators (DBAs) face unprecedented challenges. As business scale keeps growing, page-view (PV) volumes in the tens of billions place extreme demands on database systems. The 老男孩 10-Billion-PV Hands-On Class (Session 2) focuses on two core problems of production environments, data skew and disaster recovery design, and helps working DBAs make the leap from competent to expert.

## Data Skew: The "Invisible Killer" of Distributed Databases

### Causes and Impact of Data Skew

Data skew is a common problem in distributed systems: when data is distributed unevenly, some nodes carry a disproportionate load and become system bottlenecks. Typical skew scenarios include:

- Hot data that is accessed far more often than the rest
- A poorly chosen hash sharding key
- The natural skew of time-series data
- Uneven distribution caused by business characteristics

```sql
-- Create a test table and data to simulate a data skew scenario
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    behavior_type INT,
    city VARCHAR(50),
    province VARCHAR(50),
    timestamp DATETIME,
    INDEX idx_user (user_id),
    INDEX idx_item (item_id),
    INDEX idx_city (city)
) ENGINE=InnoDB
PARTITION BY HASH(user_id % 10) PARTITIONS 10;

-- Insert simulated data and deliberately create hot users
DELIMITER $$
CREATE PROCEDURE GenerateUserBehaviorData()
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE j INT DEFAULT 0;
    DECLARE current_user BIGINT;
    DECLARE hot_user_count INT DEFAULT 100;  -- number of hot users

    -- Insert hot-user rows (1% of the users, but 90% of the traffic)
    WHILE i < hot_user_count DO
        SET current_user = 1000000 + i;      -- hot-user ID range
        SET j = 0;
        WHILE j < 10000 DO                   -- 10,000 rows per hot user
            INSERT INTO user_behavior VALUES (
                current_user,
                FLOOR(RAND() * 1000000),
                FLOOR(RAND() * 5) + 1,
                ELT(FLOOR(RAND() * 10) + 1, '北京', '上海', '广州', '深圳', '杭州',
                    '成都', '武汉', '南京', '西安', '重庆'),
                ELT(FLOOR(RAND() * 5) + 1, '华北', '华东', '华南', '华中', '西部'),
                DATE_ADD('2024-01-01', INTERVAL FLOOR(RAND() * 365) DAY)
            );
            SET j = j + 1;
        END WHILE;
        SET i = i + 1;
    END WHILE;

    -- Insert ordinary-user rows
    SET i = 0;
    WHILE i < 100000 DO                      -- 100,000 ordinary users
        SET current_user = 2000000 + i;
        INSERT INTO user_behavior VALUES (
            current_user,
            FLOOR(RAND() * 1000000),
            FLOOR(RAND() * 5) + 1,
            ELT(FLOOR(RAND() * 10) + 1, '北京', '上海', '广州', '深圳', '杭州',
                '成都', '武汉', '南京', '西安', '重庆'),
            ELT(FLOOR(RAND() * 5) + 1, '华北', '华东', '华南', '华中', '西部'),
            DATE_ADD('2024-01-01', INTERVAL FLOOR(RAND() * 365) DAY)
        );
        SET i = i + 1;
    END WHILE;
END$$
DELIMITER ;

CALL GenerateUserBehaviorData();

-- Analyze how skewed the data is across partitions
SELECT
    PARTITION_NAME,
    TABLE_ROWS AS row_count,
    ROUND(TABLE_ROWS * 100.0 / (SELECT COUNT(*) FROM user_behavior), 2) AS percentage
FROM information_schema.PARTITIONS
WHERE TABLE_NAME = 'user_behavior'
ORDER BY TABLE_ROWS DESC;
```
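The partition-level counts tell you which shards are oversized, but not which keys caused it. A minimal follow-up sketch, assuming the `user_behavior` table and the local MySQL credentials used above, that lists the heaviest `user_id` values directly; on the synthetic data the 100 hot users (IDs starting at 1000000) should dominate the output:

```python
#!/usr/bin/env python3
"""Hypothetical helper: list the hottest user_ids in user_behavior."""
import pymysql

# Assumed connection settings; adjust for your environment.
conn = pymysql.connect(host="localhost", user="root", password="password",
                       database="test_db", charset="utf8mb4")

TOP_N = 20
with conn.cursor() as cursor:
    # Rank users by row count; the few IDs at the top are the skew suspects.
    cursor.execute(
        "SELECT user_id, COUNT(*) AS cnt "
        "FROM user_behavior GROUP BY user_id "
        "ORDER BY cnt DESC LIMIT %s", (TOP_N,))
    for user_id, cnt in cursor.fetchall():
        print(f"user_id={user_id} rows={cnt}")
conn.close()
```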
### Detecting and Diagnosing Data Skew

```python
#!/usr/bin/env python3
"""
Data skew detection tool.
Works against distributed setups such as MySQL/Redis/ClickHouse.
"""
import json
import logging
from datetime import datetime
from typing import Dict, List, Tuple

import numpy as np
import pymysql
import redis


class DataSkewDetector:
    """Data skew detector."""

    def __init__(self, db_config: Dict):
        self.db_config = db_config
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure logging."""
        logger = logging.getLogger('DataSkewDetector')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def detect_mysql_table_skew(self, table_name: str, shard_key: str = None) -> Dict:
        """Detect data skew for a MySQL table."""
        try:
            conn = pymysql.connect(**self.db_config)

            # Overall table size statistics
            size_query = f"""
                SELECT
                    TABLE_NAME,
                    TABLE_ROWS,
                    DATA_LENGTH,
                    INDEX_LENGTH,
                    ROUND(TABLE_ROWS * 100.0 / (SELECT SUM(TABLE_ROWS) FROM information_schema.TABLES
                          WHERE TABLE_SCHEMA = DATABASE()), 2) AS row_percentage,
                    ROUND(DATA_LENGTH * 100.0 / (SELECT SUM(DATA_LENGTH) FROM information_schema.TABLES
                          WHERE TABLE_SCHEMA = DATABASE()), 2) AS size_percentage
                FROM information_schema.TABLES
                WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = '{table_name}'
            """

            # Per-partition distribution (if the table is partitioned)
            partition_query = f"""
                SELECT
                    PARTITION_NAME,
                    TABLE_ROWS,
                    ROUND(TABLE_ROWS * 100.0 / (SELECT SUM(TABLE_ROWS) FROM information_schema.PARTITIONS
                          WHERE TABLE_NAME = '{table_name}' AND TABLE_SCHEMA = DATABASE()), 2) AS percentage
                FROM information_schema.PARTITIONS
                WHERE TABLE_NAME = '{table_name}' AND TABLE_SCHEMA = DATABASE()
                ORDER BY TABLE_ROWS DESC
            """

            with conn.cursor() as cursor:
                cursor.execute(size_query)
                table_stats = cursor.fetchall()
                cursor.execute(partition_query)
                partition_stats = cursor.fetchall()

            # Compute skew metrics
            skew_metrics = self._calculate_skew_metrics(partition_stats)

            result = {
                'table_name': table_name,
                'table_statistics': table_stats,
                'partition_distribution': partition_stats,
                'skew_metrics': skew_metrics,
                'detection_time': datetime.now().isoformat(),
                'recommendations': self._generate_recommendations(skew_metrics)
            }

            self.logger.info(
                f"Skew detection for table {table_name} finished, "
                f"skew ratio: {skew_metrics.get('skew_ratio', 0):.2f}"
            )
            return result
        except Exception as e:
            self.logger.error(f"Error while detecting skew for table {table_name}: {str(e)}")
            raise

    def _calculate_skew_metrics(self, partition_data: List[Tuple]) -> Dict:
        """Compute skew metrics."""
        if not partition_data:
            return {}

        rows = [row[1] for row in partition_data if row[1] is not None]
        total_rows = sum(rows)
        if total_rows == 0:
            return {}

        # Standard deviation and coefficient of variation
        mean = total_rows / len(rows)
        std_dev = np.std(rows)
        cv = std_dev / mean if mean > 0 else 0

        # Max/min ratio
        max_rows = max(rows)
        min_rows = min(rows)
        max_min_ratio = max_rows / min_rows if min_rows > 0 else float('inf')

        # 80/20 ratio
        sorted_rows = sorted(rows, reverse=True)
        top_20_percent = sum(sorted_rows[:int(len(rows) * 0.2)])
        bottom_80_percent = sum(sorted_rows[int(len(rows) * 0.2):])
        eighty_twenty_ratio = (top_20_percent / bottom_80_percent
                               if bottom_80_percent > 0 else float('inf'))

        return {
            'total_partitions': len(rows),
            'total_rows': total_rows,
            'mean_rows_per_partition': mean,
            'std_deviation': std_dev,
            'coefficient_of_variation': cv,
            'max_min_ratio': max_min_ratio,
            'eighty_twenty_ratio': eighty_twenty_ratio,
            'skew_ratio': cv * 100,  # skew expressed as a percentage
            'max_partition_rows': max_rows,
            'min_partition_rows': min_rows
        }

    def _generate_recommendations(self, skew_metrics: Dict) -> List[str]:
        """Generate tuning recommendations."""
        recommendations = []
        skew_ratio = skew_metrics.get('skew_ratio', 0)

        if skew_ratio > 50:
            recommendations.append("⚠️ Severe data skew; redesign the sharding strategy")
            recommendations.append("🔧 Consider a composite shard key or range sharding")
            recommendations.append("📊 Profile the hot data and rebalance its distribution")
        elif skew_ratio > 20:
            recommendations.append("⚠️ Moderate data skew; keep monitoring and optimizing")
            recommendations.append("🔧 Consider data redistribution or dynamic sharding")
        else:
            recommendations.append("✅ Data distribution is reasonably balanced")

        if skew_metrics.get('max_min_ratio', 0) > 10:
            recommendations.append("📈 Largest and smallest partitions differ too much; rebalance the data")

        return recommendations

    def detect_redis_cluster_skew(self, redis_config: Dict) -> Dict:
        """Detect key-count skew across Redis keyspaces."""
        try:
            redis_client = redis.Redis(**redis_config)
            cluster_info = redis_client.info('keyspace')

            # Key distribution per logical database
            key_distribution = {}
            total_keys = 0
            for db, info in cluster_info.items():
                if db.startswith('db'):
                    keys = int(info['keys'])
                    key_distribution[db] = keys
                    total_keys += keys

            result = {
                'key_distribution': key_distribution,
                'total_keys': total_keys,
                'skew_metrics': {},
                'detection_time': datetime.now().isoformat()
            }

            # Only compute skew metrics when there is something to measure
            if key_distribution:
                result['skew_metrics'] = self._calculate_skew_metrics(
                    [(k, v) for k, v in key_distribution.items()]
                )
            return result
        except Exception as e:
            self.logger.error(f"Error while detecting Redis skew: {str(e)}")
            raise


# Usage example
if __name__ == "__main__":
    # MySQL configuration
    mysql_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'test_db',
        'charset': 'utf8mb4'
    }

    detector = DataSkewDetector(mysql_config)

    # Detect table skew
    result = detector.detect_mysql_table_skew('user_behavior')
    # default=str so Decimal values from information_schema serialize cleanly
    print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
```
### Solutions for Data Skew

#### 1. Optimizing the Sharding Strategy

```sql
-- Improved sharding strategy: use a composite shard key
CREATE TABLE user_behavior_optimized (
    user_id BIGINT,
    item_id BIGINT,
    behavior_type INT,
    city VARCHAR(50),
    province VARCHAR(50),
    timestamp DATETIME,
    -- Add a composite shard key
    shard_key VARCHAR(100) GENERATED ALWAYS AS (CONCAT(user_id, '_', city)) STORED,
    INDEX idx_user (user_id),
    INDEX idx_item (item_id),
    INDEX idx_city (city),
    INDEX idx_shard (shard_key)
) ENGINE=InnoDB
PARTITION BY KEY(shard_key) PARTITIONS 20;

-- Range sharding: partition by month
CREATE TABLE user_behavior_range (
    user_id BIGINT,
    item_id BIGINT,
    behavior_type INT,
    city VARCHAR(50),
    province VARCHAR(50),
    timestamp DATETIME,
    INDEX idx_user (user_id),
    INDEX idx_timestamp (timestamp)
) ENGINE=InnoDB
PARTITION BY RANGE (YEAR(timestamp) * 100 + MONTH(timestamp)) (
    PARTITION p202401 VALUES LESS THAN (202402),
    PARTITION p202402 VALUES LESS THAN (202403),
    PARTITION p202403 VALUES LESS THAN (202404),
    PARTITION p202404 VALUES LESS THAN (202405),
    PARTITION p202405 VALUES LESS THAN (202406),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
```

#### 2. Spreading Out Hot Data

```python
#!/usr/bin/env python3
"""
Hot-key spreading (hotspot dispersal) utilities.
"""
import hashlib
import threading
from typing import Dict, List, Tuple


class HotspotDistributor:
    """Spreads hot keys across nodes."""

    def __init__(self, virtual_nodes: int = 1000):
        self.virtual_nodes = virtual_nodes
        self.node_weights = {}

    def consistent_hash(self, key: str, nodes: List[str]) -> str:
        """Baseline placement with simple modulo hashing."""
        if not nodes:
            raise ValueError("Node list must not be empty")
        hash_value = self._hash_function(key)
        node_index = hash_value % len(nodes)
        return nodes[node_index]

    def virtual_node_hash(self, key: str, physical_nodes: List[str]) -> str:
        """Consistent hashing with virtual nodes."""
        virtual_to_physical = {}

        # Create virtual nodes for every physical node
        for physical_node in physical_nodes:
            for i in range(self.virtual_nodes):
                virtual_node = f"{physical_node}_vnode_{i}"
                virtual_hash = self._hash_function(virtual_node)
                virtual_to_physical[virtual_hash] = physical_node

        # Walk the ring clockwise to the nearest virtual node
        key_hash = self._hash_function(key)
        sorted_hashes = sorted(virtual_to_physical.keys())
        for node_hash in sorted_hashes:
            if node_hash >= key_hash:
                return virtual_to_physical[node_hash]

        # Wrap around to the first node on the ring
        return virtual_to_physical[sorted_hashes[0]]

    def _hash_function(self, key: str) -> int:
        """Hash function (first 8 hex chars of MD5)."""
        return int(hashlib.md5(key.encode()).hexdigest()[:8], 16)

    def distribute_hot_keys(self, hot_keys: List[str], nodes: List[str]) -> Dict[str, str]:
        """Spread hot keys by salting them before placement."""
        distribution = {}
        for key in hot_keys:
            # Append a deterministic suffix ("salt") to the hot key
            salted_key = f"{key}_{self._hash_function(key) % 1000}"
            target_node = self.virtual_node_hash(salted_key, nodes)
            distribution[key] = target_node
        return distribution


class ReadWriteSplitting:
    """Read/write splitting."""

    def __init__(self, master_config: Dict, slave_configs: List[Dict]):
        self.master = self._create_connection(master_config)
        self.slaves = [self._create_connection(config) for config in slave_configs]
        self.slave_round_robin = 0
        self.lock = threading.Lock()

    def _create_connection(self, config: Dict):
        """Create a database connection (a real project would connect here)."""
        return config

    def get_read_connection(self):
        """Pick a read replica (round-robin load balancing)."""
        with self.lock:
            slave = self.slaves[self.slave_round_robin]
            self.slave_round_robin = (self.slave_round_robin + 1) % len(self.slaves)
            return slave

    def execute_write(self, query: str, params: Tuple = None):
        """Execute a write (a real project would run it on the master)."""
        print(f"Write to master: {query}")
        return True

    def execute_read(self, query: str, params: Tuple = None):
        """Execute a read (a real project would run it on a replica)."""
        slave = self.get_read_connection()
        print(f"Read from slave {slave['host']}: {query}")
        return []


# Usage example
if __name__ == "__main__":
    # Hot-key spreading example
    distributor = HotspotDistributor()
    nodes = ["node1", "node2", "node3", "node4", "node5"]
    hot_keys = ["user_12345", "item_67890", "order_11111"]
    distribution = distributor.distribute_hot_keys(hot_keys, nodes)
    print("Hot-key placement:", distribution)
```
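Salting only solves the write side: once a hot counter is split across several salted copies, readers have to fan out over all of the copies and merge the results. A minimal sketch of that read path, assuming a Redis counter split over `SALT_COUNT` suffixed keys (the key names and salt count are illustrative, not from the course material):

```python
#!/usr/bin/env python3
"""Hypothetical write/read path for a salted hot counter in Redis."""
import random
import redis

SALT_COUNT = 8  # assumed number of salted copies per hot key
client = redis.Redis(host="localhost", port=6379)  # assumed local Redis

def incr_hot_counter(base_key: str, amount: int = 1) -> None:
    # Each write goes to one randomly chosen salted copy, spreading the load.
    salt = random.randrange(SALT_COUNT)
    client.incrby(f"{base_key}:{salt}", amount)

def read_hot_counter(base_key: str) -> int:
    # Reads fan out over every salted copy and sum the parts.
    parts = client.mget([f"{base_key}:{i}" for i in range(SALT_COUNT)])
    return sum(int(p) for p in parts if p is not None)

if __name__ == "__main__":
    for _ in range(100):
        incr_hot_counter("pv:item_67890")
    print(read_hot_counter("pv:item_67890"))  # expected: 100
```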
## Disaster Recovery Design: Keeping the Business Running

### Multi-Active Data Center Architecture

```python
#!/usr/bin/env python3
"""
Multi-active data center disaster recovery framework.
"""
import json
import logging
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class DataCenterStatus(Enum):
    ACTIVE = "active"
    STANDBY = "standby"
    FAILOVER = "failover"
    DEGRADED = "degraded"
    OFFLINE = "offline"


@dataclass
class DataCenter:
    id: str
    name: str
    region: str
    status: DataCenterStatus
    priority: int
    endpoints: Dict[str, str]
    health_check_url: str


class DisasterRecoveryManager:
    """Disaster recovery manager."""

    def __init__(self, data_centers: List[DataCenter]):
        self.data_centers = {dc.id: dc for dc in data_centers}
        self.active_dc = None
        self.standby_dcs = []
        self.health_check_interval = 30   # seconds
        self.failover_threshold = 3       # consecutive failures before failover
        self.health_stats = {}
        self.logger = self._setup_logger()
        self._initialize_dc_roles()

    def _setup_logger(self):
        """Configure logging."""
        logger = logging.getLogger('DisasterRecoveryManager')
        logger.setLevel(logging.INFO)
        return logger

    def _initialize_dc_roles(self):
        """Assign initial roles to the data centers."""
        # Sort by priority (lower number = higher priority)
        sorted_dcs = sorted(self.data_centers.values(), key=lambda x: x.priority)
        if sorted_dcs:
            self.active_dc = sorted_dcs[0]
            self.standby_dcs = sorted_dcs[1:]

        # Initialize health bookkeeping
        for dc in self.data_centers.values():
            self.health_stats[dc.id] = {
                'consecutive_failures': 0,
                'last_check': None,
                'response_time': None,
                'overall_health': 100
            }

    def start_health_monitoring(self):
        """Start the background health-check loop."""
        def monitor_loop():
            while True:
                self._perform_health_checks()
                time.sleep(self.health_check_interval)

        monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        monitor_thread.start()
        self.logger.info("Health monitoring started")

    def _perform_health_checks(self):
        """Run one round of health checks."""
        for dc in self.data_centers.values():
            health = self._check_dc_health(dc)
            if not health['healthy']:
                self.health_stats[dc.id]['consecutive_failures'] += 1
                self.logger.warning(f"Health check for data center {dc.name} failed: {health['error']}")

                # Fail over if the active DC has failed too many times in a row
                if (self.health_stats[dc.id]['consecutive_failures'] >= self.failover_threshold
                        and dc.id == self.active_dc.id):
                    self._initiate_failover(dc)
            else:
                self.health_stats[dc.id]['consecutive_failures'] = 0
                self.health_stats[dc.id]['response_time'] = health['response_time']
                self.health_stats[dc.id]['overall_health'] = health['health_score']

    def _check_dc_health(self, dc: DataCenter) -> Dict:
        """Check the health of one data center (simulated).

        A real implementation would check database connectivity,
        network latency, service status, and so on.
        """
        try:
            start_time = time.time()
            time.sleep(0.1)  # simulate network latency
            response_time = (time.time() - start_time) * 1000  # milliseconds

            # Simulated health score
            health_score = max(0, 100 - response_time / 10)
            return {
                'healthy': health_score > 80,
                'response_time': response_time,
                'health_score': health_score,
                'error': None if health_score > 80 else "Response time too high"
            }
        except Exception as e:
            return {
                'healthy': False,
                'response_time': None,
                'health_score': 0,
                'error': str(e)
            }

    def _initiate_failover(self, failed_dc: DataCenter):
        """Start a failover away from a failed data center."""
        self.logger.info(f"Initiating failover, failed data center: {failed_dc.name}")

        if not self.standby_dcs:
            self.logger.error("No standby data center available!")
            return

        # Pick the highest-priority standby
        new_active = self.standby_dcs[0]

        # Execute the failover
        self._execute_failover(failed_dc, new_active)

        # Update roles
        self.active_dc = new_active
        self.standby_dcs = [dc for dc in self.standby_dcs if dc.id != new_active.id]
        self.standby_dcs.append(failed_dc)
        self.logger.info(f"Failover finished, new active data center: {new_active.name}")

    def _execute_failover(self, from_dc: DataCenter, to_dc: DataCenter):
        """Execute the failover steps.

        A real implementation would include:
        1. Stopping traffic to the failed center
        2. Verifying data synchronization
        3. Switching over to the new center
        4. Updating DNS / load balancer configuration
        """
        self.logger.info(f"Switching from {from_dc.name} to {to_dc.name}")

        # Simulated switchover steps
        steps = [
            "Stop traffic to the failed center",
            "Verify data consistency",
            "Switch database connections",
            "Update routing configuration",
            "Verify service availability"
        ]
        for step in steps:
            self.logger.info(f"Executing step: {step}")
            time.sleep(0.5)  # simulate the time each step takes

    def get_current_topology(self) -> Dict:
        """Return the current disaster recovery topology."""
        return {
            'active_dc': {
                'id': self.active_dc.id,
                'name': self.active_dc.name,
                'status': self.active_dc.status.value
            },
            'standby_dcs': [
                {
                    'id': dc.id,
                    'name': dc.name,
                    'status': dc.status.value,
                    'priority': dc.priority
                }
                for dc in self.standby_dcs
            ],
            'health_stats': self.health_stats,
            'last_updated': time.time()
        }


# Backup and restore strategies
class BackupStrategy(ABC):
    """Base class for backup strategies."""

    def __init__(self, retention_days: int = 30):
        self.retention_days = retention_days

    @abstractmethod
    def perform_backup(self, database: str, backup_path: str) -> bool:
        """Run a backup."""

    @abstractmethod
    def perform_restore(self, backup_file: str, target_database: str) -> bool:
        """Run a restore."""


class MySQLBackupStrategy(BackupStrategy):
    """MySQL backup strategy."""

    def perform_backup(self, database: str, backup_path: str) -> bool:
        """Back up a MySQL database."""
        try:
            # A real project would use mysqldump or XtraBackup
            backup_file = f"{backup_path}/{database}_{time.strftime('%Y%m%d_%H%M%S')}.sql"

            # Simulated backup steps
            self._execute_mysqldump(database, backup_file)
            self._verify_backup(backup_file)
            self._cleanup_old_backups(backup_path)
            return True
        except Exception as e:
            logging.error(f"Backup failed: {str(e)}")
            return False

    def perform_restore(self, backup_file: str, target_database: str) -> bool:
        """Restore a MySQL database."""
        try:
            # Simulated restore steps
            self._create_database_if_not_exists(target_database)
            self._execute_mysql_restore(backup_file, target_database)
            self._verify_restore(target_database)
            return True
        except Exception as e:
            logging.error(f"Restore failed: {str(e)}")
            return False

    def _execute_mysqldump(self, database: str, output_file: str):
        """Run mysqldump (real command: mysqldump -u user -p database > output_file)."""
        logging.info(f"Running mysqldump: {database} -> {output_file}")
        time.sleep(1)  # simulate backup time

    def _execute_mysql_restore(self, backup_file: str, database: str):
        """Run the restore (real command: mysql -u user -p database < backup_file)."""
        logging.info(f"Running mysql restore: {backup_file} -> {database}")
        time.sleep(2)  # simulate restore time

    def _verify_backup(self, backup_file: str):
        """Verify the backup file."""
        logging.info(f"Verifying backup file: {backup_file}")

    def _verify_restore(self, database: str):
        """Verify the restored database."""
        logging.info(f"Verifying restored database: {database}")

    def _create_database_if_not_exists(self, database: str):
        """Create the database if it does not exist."""
        logging.info(f"Checking/creating database: {database}")

    def _cleanup_old_backups(self, backup_path: str):
        """Remove backups older than the retention window."""
        logging.info(f"Cleaning up old backups under {backup_path}")


# Usage example
if __name__ == "__main__":
    # Data center configuration
    data_centers = [
        DataCenter(
            id="dc1",
            name="Beijing primary data center",
            region="cn-north-1",
            status=DataCenterStatus.ACTIVE,
            priority=1,
            endpoints={
                "mysql": "mysql-dc1.internal.com:3306",
                "redis": "redis-dc1.internal.com:6379",
                "api": "api-dc1.internal.com:8080"
            },
            health_check_url="http://health-dc1.internal.com/status"
        ),
        DataCenter(
            id="dc2",
            name="Shanghai standby data center",
            region="cn-east-1",
            status=DataCenterStatus.STANDBY,
            priority=2,
            endpoints={
                "mysql": "mysql-dc2.internal.com:3306",
                "redis": "redis-dc2.internal.com:6379",
                "api": "api-dc2.internal.com:8080"
            },
            health_check_url="http://health-dc2.internal.com/status"
        ),
        DataCenter(
            id="dc3",
            name="Guangzhou disaster recovery data center",
            region="cn-south-1",
            status=DataCenterStatus.STANDBY,
            priority=3,
            endpoints={
                "mysql": "mysql-dc3.internal.com:3306",
                "redis": "redis-dc3.internal.com:6379",
                "api": "api-dc3.internal.com:8080"
            },
            health_check_url="http://health-dc3.internal.com/status"
        )
    ]

    # Initialize the disaster recovery manager
    dr_manager = DisasterRecoveryManager(data_centers)
    dr_manager.start_health_monitoring()

    # Show the current topology
    topology = dr_manager.get_current_topology()
    print("Current disaster recovery topology:")
    print(json.dumps(topology, indent=2, ensure_ascii=False))

    # Backup example
    backup_strategy = MySQLBackupStrategy()
    backup_strategy.perform_backup("user_db", "/backup/mysql")
```
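The `_execute_mysqldump` stub above only logs. For reference, a hedged sketch of what a real logical backup call could look like with `subprocess` and `mysqldump --single-transaction`; the credentials, paths, and gzip step are assumptions, not the course's exact script:

```python
#!/usr/bin/env python3
"""Hypothetical mysqldump wrapper; adjust credentials and paths for your environment."""
import gzip
import shutil
import subprocess
import time

def mysqldump_backup(database: str, backup_path: str,
                     user: str = "backup", password: str = "secret") -> str:
    ts = time.strftime("%Y%m%d_%H%M%S")
    sql_file = f"{backup_path}/{database}_{ts}.sql"

    # --single-transaction takes a consistent InnoDB snapshot without blocking reads.
    cmd = [
        "mysqldump",
        f"--user={user}", f"--password={password}",
        "--single-transaction", "--routines", "--triggers",
        database,
    ]
    with open(sql_file, "w") as out:
        subprocess.run(cmd, stdout=out, check=True)

    # Compress the dump to save space on the backup volume.
    with open(sql_file, "rb") as src, gzip.open(sql_file + ".gz", "wb") as dst:
        shutil.copyfileobj(src, dst)
    return sql_file + ".gz"

if __name__ == "__main__":
    print(mysqldump_backup("user_db", "/backup/mysql"))
```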
### Database Monitoring and Alerting

```sql
-- Monitoring table
CREATE TABLE database_monitoring (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    metric_name VARCHAR(100) NOT NULL,
    metric_value DECIMAL(15,4),
    metric_unit VARCHAR(50),
    instance_host VARCHAR(100),
    instance_port INT,
    database_name VARCHAR(100),
    collect_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    severity ENUM('INFO', 'WARNING', 'CRITICAL') DEFAULT 'INFO',
    tags JSON,
    INDEX idx_metric_time (metric_name, collect_time),
    INDEX idx_severity_time (severity, collect_time)
);

-- Stored procedure that collects the metrics
DELIMITER $$
CREATE PROCEDURE CollectDatabaseMetrics()
BEGIN
    -- Connection count
    INSERT INTO database_monitoring (metric_name, metric_value, metric_unit, instance_host, database_name)
    SELECT
        'threads_connected' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'connections' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name
    FROM information_schema.GLOBAL_STATUS
    WHERE VARIABLE_NAME = 'Threads_connected';

    -- Query counter (used to derive QPS)
    INSERT INTO database_monitoring (metric_name, metric_value, metric_unit, instance_host, database_name)
    SELECT
        'queries_per_second' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'queries/sec' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name
    FROM information_schema.GLOBAL_STATUS
    WHERE VARIABLE_NAME = 'Queries';

    -- Slow queries (check CRITICAL before WARNING so the higher severity wins)
    INSERT INTO database_monitoring (metric_name, metric_value, metric_unit, instance_host, database_name, severity)
    SELECT
        'slow_queries' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'queries' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name,
        CASE
            WHEN VARIABLE_VALUE > 50 THEN 'CRITICAL'
            WHEN VARIABLE_VALUE > 10 THEN 'WARNING'
            ELSE 'INFO'
        END AS severity
    FROM information_schema.GLOBAL_STATUS
    WHERE VARIABLE_NAME = 'Slow_queries';

    -- InnoDB row-lock waits
    INSERT INTO database_monitoring (metric_name, metric_value, metric_unit, instance_host, database_name, severity)
    SELECT
        'innodb_row_lock_waits' AS metric_name,
        VARIABLE_VALUE AS metric_value,
        'waits' AS metric_unit,
        @@hostname AS instance_host,
        DATABASE() AS database_name,
        CASE
            WHEN VARIABLE_VALUE > 500 THEN 'CRITICAL'
            WHEN VARIABLE_VALUE > 100 THEN 'WARNING'
            ELSE 'INFO'
        END AS severity
    FROM information_schema.GLOBAL_STATUS
    WHERE VARIABLE_NAME = 'Innodb_row_lock_waits';
END$$
DELIMITER ;

-- Alert rule table
CREATE TABLE alert_rules (
    id INT AUTO_INCREMENT PRIMARY KEY,
    rule_name VARCHAR(200) NOT NULL,
    metric_name VARCHAR(100) NOT NULL,
    condition_operator ENUM('>', '>=', '<', '<=', '=', '!='),
    condition_value DECIMAL(15,4),
    severity ENUM('INFO', 'WARNING', 'CRITICAL'),
    enabled BOOLEAN DEFAULT TRUE,
    check_interval INT DEFAULT 60,  -- seconds
    last_check_time DATETIME,
    created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_rule_metric (rule_name, metric_name)
);

-- Default alert rules
INSERT INTO alert_rules (rule_name, metric_name, condition_operator, condition_value, severity, check_interval) VALUES
('High connection count', 'threads_connected', '>', 100, 'WARNING', 60),
('Critical connection count', 'threads_connected', '>', 200, 'CRITICAL', 60),
('Slow query growth', 'slow_queries', '>', 50, 'WARNING', 300),
('Critical lock waits', 'innodb_row_lock_waits', '>', 500, 'CRITICAL', 60);
```
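The tables above only store samples and rules; something still has to evaluate them. A minimal evaluator sketch, assuming the `database_monitoring` and `alert_rules` tables above and local MySQL credentials; the notification step is just a print:

```python
#!/usr/bin/env python3
"""Hypothetical alert evaluator for the alert_rules / database_monitoring tables."""
import operator
import pymysql

OPS = {'>': operator.gt, '>=': operator.ge, '<': operator.lt,
       '<=': operator.le, '=': operator.eq, '!=': operator.ne}

conn = pymysql.connect(host="localhost", user="root", password="password",
                       database="test_db", charset="utf8mb4")

with conn.cursor() as cur:
    # Load the enabled rules.
    cur.execute("SELECT rule_name, metric_name, condition_operator, condition_value, severity "
                "FROM alert_rules WHERE enabled = TRUE")
    rules = cur.fetchall()

    for rule_name, metric, op, threshold, severity in rules:
        # Latest sample for the metric this rule watches.
        cur.execute("SELECT metric_value FROM database_monitoring "
                    "WHERE metric_name = %s ORDER BY collect_time DESC LIMIT 1", (metric,))
        row = cur.fetchone()
        if row is None:
            continue
        value = row[0]
        if OPS[op](value, threshold):
            # Real code would page or notify here instead of printing.
            print(f"[{severity}] {rule_name}: {metric}={value} (threshold {op} {threshold})")
conn.close()
```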
## Case Study: Optimizing a 10-Billion-PV E-Commerce System

### Scenario

An e-commerce platform faces the following challenges (a quick capacity sanity check follows below):

- Daily PV above 10 billion
- Peak QPS of 500,000+
- Data volume at the PB level
- Required availability of 99.99%
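These figures imply a heavy peak-to-average ratio. A back-of-envelope check, pure arithmetic on the numbers above rather than course material:

```python
#!/usr/bin/env python3
"""Back-of-envelope capacity check for the scenario figures."""
daily_pv = 10_000_000_000           # 10 billion page views per day
seconds_per_day = 24 * 3600
avg_qps = daily_pv / seconds_per_day
peak_qps = 500_000                  # stated peak

print(f"average QPS   ~ {avg_qps:,.0f}")             # ~ 115,741
print(f"peak/average  ~ {peak_qps / avg_qps:.1f}x")  # ~ 4.3x

# 99.99% availability leaves roughly 52.6 minutes of downtime per year.
downtime_minutes_per_year = (1 - 0.9999) * 365 * 24 * 60
print(f"allowed downtime ~ {downtime_minutes_per_year:.1f} min/year")
```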
### Solution Architecture

```python
#!/usr/bin/env python3
"""
Database architecture for a 10-billion-PV e-commerce system.
"""

class ECommerceDatabaseArchitecture:
    """E-commerce database architecture."""

    def __init__(self):
        self.sharding_strategy = {}
        self.cache_strategy = {}
        self.backup_strategy = {}

    def design_sharding_architecture(self):
        """Design the sharding architecture."""
        architecture = {
            'user_data': {
                'shard_key': 'user_id',
                'shard_count': 1024,
                'strategy': 'range_based',
                'backup_interval': 'hourly'
            },
            'order_data': {
                'shard_key': 'order_id',
                'shard_count': 512,
                'strategy': 'hash_based',
                'backup_interval': 'real_time'
            },
            'product_data': {
                'shard_key': 'product_id',
                'shard_count': 256,
                'strategy': 'consistent_hash',
                'backup_interval': 'daily'
            },
            'inventory_data': {
                'shard_key': 'sku_id',
                'shard_count': 128,
                'strategy': 'hotspot_aware',
                'backup_interval': 'real_time'
            }
        }
        return architecture

    def implement_cache_strategy(self):
        """Define the cache layers."""
        cache_layers = {
            'L1': {
                'type': 'local_cache',
                'max_size': '10GB',
                'ttl': '5 minutes',
                'eviction_policy': 'LRU'
            },
            'L2': {
                'type': 'redis_cluster',
                'nodes': 12,
                'memory_per_node': '64GB',
                'strategy': 'read_through'
            },
            'L3': {
                'type': 'CDN_cache',
                'coverage': 'global',
                'strategy': 'edge_caching'
            }
        }
        return cache_layers

    def design_disaster_recovery(self):
        """Design the disaster recovery plan."""
        dr_plan = {
            'rto': '5 minutes',   # recovery time objective
            'rpo': '30 seconds',  # recovery point objective
            'backup_strategy': {
                'full_backup': 'daily at 02:00',
                'incremental_backup': 'hourly',
                'binlog_backup': 'real_time'
            },
            'replication_strategy': {
                'sync_replication': 'intra-city
```
