最强总结!数据库优化完全指南!!

科技   2024-11-20 14:00   广东  
点击关注公众号,SQL干货及时获取
后台回复:1024,获取500G视频教程
推荐阅读
转行成为数据分析师
牛逼,OpenAI新模型 o1 国内直接连!
《SQL145题第2版》正式发布!

大家好,我是岳哥。

这是《最强总结》系列第三篇,之前分享的还没阅读的可以看这篇:

最强总结!数据库开窗函数完全指南!!

最强总结!SQL Server/MySQL/Oracle函数完全指南!!

数据库优化是提升应用性能的关键环节。本文将从多个维度系统地介绍数据库优化的方法和实践经验。

这些总结都是此前整理好后保存的,最近集中发布,觉得有帮助,记得三连(点赞+转发+在看),岳哥才会更有动力继续发布。此外,大家也可以留言需要哪方面的总结。

一、SQL查询优化
二、数据库配置优化
三、表结构优化
四、分区和分表优化
五、缓存优化
六、监控和维护
七、安全优化

需要PDF版本的同学,可以在公众号后台回复:数据库优化

一、SQL查询优化

1. 索引优化

-- 1. 创建合适的索引
CREATE INDEX idx_user_email ON users(email);
CREATE INDEX idx_order_status_date ON orders(status, created_at);

-- 2. 联合索引优化
-- 不推荐:创建多个单列索引
CREATE INDEX idx_name ON users(name);
CREATE INDEX idx_email ON users(email);
CREATE INDEX idx_phone ON users(phone);

-- 推荐:创建合适的联合索引
CREATE INDEX idx_users_search ON users(name, email, phone);

-- 3. 索引排序优化
-- 考虑排序方向的索引
CREATE INDEX idx_products_category_price ON products(category ASC, price DESC);

-- 4. 前缀索引
-- 对于长字符串字段,使用前缀索引
CREATE INDEX idx_description ON products(description(50));

-- 5. 覆盖索引
-- 创建包含所有查询所需字段的索引
CREATE INDEX idx_orders_covering ON orders(order_id, status, created_at, total_amount);

-- 使用覆盖索引的查询
EXPLAIN SELECT order_id, status, created_at, total_amount 
FROM orders 
WHERE status = 'pending' 
ORDER BY created_at DESC;

-- 6. 避免索引失效
-- 不推荐:使用函数导致索引失效
SELECT * FROM users WHERE YEAR(created_at) = 2024;

-- 推荐:改写为范围查询
SELECT * FROM users 
WHERE created_at >= '2024-01-01' 
AND created_at < '2025-01-01';

2. 查询重写

-- 1. 避免SELECT *
-- 不推荐
SELECT * FROM orders JOIN users ON orders.user_id = users.id;

-- 推荐
SELECT o.order_id, o.status, u.name, u.email 
FROM orders o 
JOIN users u ON o.user_id = u.id;

-- 2. 使用EXISTS替代IN
-- 不推荐
SELECT * FROM orders 
WHERE user_id IN (SELECT id FROM users WHERE status = 'active');

-- 推荐
SELECT * FROM orders o 
WHERE EXISTS (
    SELECT 1 FROM users u 
    WHERE u.id = o.user_id AND u.status = 'active'
);

-- 3. 使用UNION ALL替代UNION
-- 不推荐
SELECT order_id FROM completed_orders
UNION
SELECT order_id FROM pending_orders;

-- 推荐
SELECT order_id FROM completed_orders
UNION ALL
SELECT order_id FROM pending_orders;

-- 4. 使用JOIN替代子查询
-- 不推荐
SELECT *,
    (SELECT name FROM users WHERE users.id = orders.user_id) as user_name
FROM orders;

-- 推荐
SELECT o.*, u.name as user_name
FROM orders o
LEFT JOIN users u ON o.user_id = u.id;

-- 5. 分页优化
-- 不推荐
SELECT * FROM orders ORDER BY created_at DESC LIMIT 100000010;

-- 推荐
SELECT * FROM orders 
WHERE created_at < (
    SELECT created_at FROM orders 
    ORDER BY created_at DESC 
    LIMIT 1 OFFSET 999999
)
ORDER BY created_at DESC 
LIMIT 10;

-- 6. 使用临时表优化复杂查询
-- 创建临时表存储中间结果
CREATE TEMPORARY TABLE tmp_order_stats AS
SELECT user_id, COUNT(*) as order_count, SUM(amount) as total_amount
FROM orders
GROUP BY user_id;

-- 使用临时表
SELECT u.*, t.order_count, t.total_amount
FROM users u
LEFT JOIN tmp_order_stats t ON u.id = t.user_id;

二、数据库配置优化

1. 内存配置

# MySQL配置文件 my.cnf

# 1. InnoDB缓冲池配置
innodb_buffer_pool_size = 12G        # 总内存的50-75%
innodb_buffer_pool_instances = 8     # CPU核心数相当
innodb_buffer_pool_chunk_size = 128M # 默认值,通常不需要修改

# 2. 查询缓存配置(MySQL 8.0已移除)
query_cache_type = 0                 # 禁用查询缓存
query_cache_size = 0                 # 设置为0

# 3. 排序缓冲区
sort_buffer_size = 2M               # 每个会话的排序缓冲区
join_buffer_size = 2M               # 每个会话的连接缓冲区

# 4. 临时表配置
tmp_table_size = 64M                # 内存临时表大小
max_heap_table_size = 64M           # 用户创建的内存表大小

# 5. 连接缓冲区
max_connections = 1000              # 最大连接数
thread_cache_size = 128             # 线程缓存大小

# 6. InnoDB日志配置
innodb_log_file_size = 1G          # 重做日志文件大小
innodb_log_files_in_group = 2      # 重做日志文件组数
innodb_log_buffer_size = 16M       # 日志缓冲区大小

# 7. 性能监控配置
performance_schema = ON
performance_schema_max_table_instances = 1000

# 查看当前内存使用情况的SQL
SELECT 
    event_name AS memory_type,
    current_alloc AS current_bytes,
    high_alloc AS max_bytes
FROM performance_schema.memory_summary_global_by_event_name
WHERE event_name LIKE '%memory%'
ORDER BY current_alloc DESC;

# 查看InnoDB缓冲池状态
SHOW ENGINE INNODB STATUS;

# 查看全局变量
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';

# 检查缓冲池命中率
SELECT 
    (1 - (
        SELECT variable_value 
        FROM performance_schema.global_status 
        WHERE variable_name = 'Innodb_buffer_pool_reads'
    ) / (
        SELECT variable_value 
        FROM performance_schema.global_status 
        WHERE variable_name = 'Innodb_buffer_pool_read_requests'
    )) * 100 as buffer_pool_hit_ratio;

2. 磁盘I/O优化

# MySQL配置文件 my.cnf

# 1. InnoDB文件I/O配置
innodb_file_per_table = ON          # 每个表使用独立的表空间文件
innodb_flush_method = O_DIRECT      # 使用直接I/O
innodb_io_capacity = 2000           # SSD磁盘I/O能力
innodb_io_capacity_max = 4000       # 最大I/O能力

# 2. 日志写入配置
innodb_flush_log_at_trx_commit = 1  # 事务提交时写入磁盘
sync_binlog = 1                     # 每次事务同步二进制日志

# 3. 双写缓冲区配置
innodb_doublewrite = ON             # 启用双写缓冲
innodb_doublewrite_batch_size = 32  # 批处理大小

# 4. 读写分离配置
innodb_read_io_threads = 8          # 读线程数
innodb_write_io_threads = 8         # 写线程数

# 5. 预读配置
innodb_read_ahead_threshold = 56    # 预读阈值
innodb_random_read_ahead = OFF      # 随机预读

# 监控I/O性能的SQL查询
SELECT 
    event_name,
    count_star as count,
    sum_timer_wait/1000000000 as total_ms,
    avg_timer_wait/1000000000 as avg_ms,
    max_timer_wait/1000000000 as max_ms
FROM performance_schema.events_waits_summary_global_by_event_name
WHERE event_name LIKE '%io%'
ORDER BY sum_timer_wait DESC;

# 检查InnoDB I/O状态
SHOW ENGINE INNODB STATUS;

# 查看文件I/O情况
SELECT 
    file_name,
    event_name,
    count_read,
    count_write,
    sum_number_of_bytes_read,
    sum_number_of_bytes_write
FROM performance_schema.file_summary_by_instance
WHERE event_name LIKE '%innodb%'
ORDER BY sum_number_of_bytes_read + sum_number_of_bytes_write DESC;

# 检查慢查询
SELECT 
    query,
    exec_count,
    total_latency,
    avg_latency,
    rows_sent_avg,
    rows_examined_avg
FROM sys.statements_with_runtimes_in_95th_percentile;

3. 并发配置

# MySQL并发配置

# 1. 连接管理
max_connections = 1000                 # 最大连接数
max_user_connections = 800             # 每个用户最大连接数
thread_cache_size = 128                # 线程缓存大小
wait_timeout = 600                     # 非交互连接超时时间
interactive_timeout = 1800             # 交互连接超时时间

# 2. 锁配置
innodb_lock_wait_timeout = 50          # 锁等待超时时间
innodb_deadlock_detect = ON            # 死锁检测
innodb_autoinc_lock_mode = 2           # 自增锁模式

# 3. 事务配置
transaction_isolation = REPEATABLE-READ # 事务隔离级别
innodb_rollback_segments = 128         # 回滚段数量

# 监控并发情况的SQL
# 检查当前连接数
SELECT COUNT(*) AS connections,
       USER AS user,
       HOST AS host
FROM INFORMATION_SCHEMA.PROCESSLIST
GROUP BY user, host;

# 检查锁等待情况
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_state waiting_trx_state,
    b.trx_id blocking_trx_id,
    b.trx_state blocking_trx_state,
    TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) wait_time
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

# 检查事务状态
SELECT * FROM information_schema.innodb_trx
WHERE trx_state != 'RUNNING';

# 检查表锁情况
SHOW OPEN TABLES WHERE in_use > 0;

三、表结构优化

1. 数据类型优化

-- 1. 整数类型优化
-- 不推荐
CREATE TABLE products (
    id INT,                    -- 范围过大
    status TINYINT,           -- 未声明是否有符号
    quantity INT,             -- 范围过大
    category_id INT           -- 范围过大
);

-- 推荐
CREATE TABLE products (
    id MEDIUMINT UNSIGNED AUTO_INCREMENT,  -- 适中范围
    status TINYINT UNSIGNED,               -- 明确无符号
    quantity SMALLINT UNSIGNED,            -- 适合范围
    category_id SMALLINT UNSIGNED          -- 适合范围
);

-- 2. 字符串类型优化
-- 不推荐
CREATE TABLE users (
    id INT,
    name VARCHAR(255),         -- 长度过长
    email VARCHAR(255),        -- 长度过长
    phone CHAR(20),           -- 使用固定长度
    description TEXT          -- 未限制长度
);

-- 推荐
CREATE TABLE users (
    id INT,
    name VARCHAR(50),          -- 适合长度
    email VARCHAR(100),        -- 适合长度
    phone VARCHAR(20),         -- 可变长度
    description TEXT(16383)    -- MEDIUMTEXT替代
);

-- 3. 时间类型优化
-- 不推荐
CREATE TABLE orders (
    id INT,
    created_at VARCHAR(30),    -- 使用字符串存储时间
    updated_at VARCHAR(30),    -- 使用字符串存储时间
    delivery_date DATETIME     -- 不需要时间精度
);

-- 推荐
CREATE TABLE orders (
    id INT,
    created_at TIMESTAMP,      -- 自动更新时间戳
    updated_at TIMESTAMP,      -- 自动更新时间戳
    delivery_date DATE         -- 只需要日期
);

-- 4. 小数类型优化
-- 不推荐
CREATE TABLE financial_records (
    id INT,
    amount FLOAT,              -- 存在精度问题
    price DOUBLE,             -- 存在精度问题
    tax_rate DECIMAL(10,2)    -- 范围过大
);

-- 推荐
CREATE TABLE financial_records (
    id INT,
    amount DECIMAL(8,2),       -- 精确定点数
    price DECIMAL(8,2),        -- 精确定点数
    tax_rate DECIMAL(5,2)      -- 适合范围
);

-- 5. 枚举类型优化
-- 不推荐
CREATE TABLE tickets (
    id INT,
    status VARCHAR(20),        -- 使用字符串存储固定选项
    priority VARCHAR(20)       -- 使用字符串存储固定选项
);

-- 推荐
CREATE TABLE tickets (
    id INT,
    status ENUM('open''closed''pending'),  -- 使用枚举
    priority ENUM('low''medium''high')     -- 使用枚举
);

-- 6. JSON类型优化(MySQL 5.7+)
-- 不推荐
CREATE TABLE product_attributes (
    id INT,
    attributes VARCHAR(4000)   -- 使用字符串存储JSON
);

-- 推荐
CREATE TABLE product_attributes (
    id INT,
    attributes JSON,           -- 使用JSON类型
    INDEX idx_attr ((CAST(attributes->>'$.color' AS CHAR(20))))  -- JSON索引
);

2. 范式和反范式优化

-- 1. 范式设计示例
-- 第三范式设计
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    user_id INT,
    order_date DATE,
    status VARCHAR(20),
    FOREIGN KEY (user_id) REFERENCES users(id)
);

CREATE TABLE order_items (
    order_item_id INT PRIMARY KEY,
    order_id INT,
    product_id INT,
    quantity INT,
    unit_price DECIMAL(10,2),
    FOREIGN KEY (order_id) REFERENCES orders(order_id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);

-- 2. 反范式设计示例
-- 为了查询效率,在orders表中冗余一些常用信息
CREATE TABLE orders_denormalized (
    order_id INT PRIMARY KEY,
    user_id INT,
    user_name VARCHAR(50),         -- 冗余用户名
    user_email VARCHAR(100),       -- 冗余用户邮箱
    order_date DATE,
    status VARCHAR(20),
    total_amount DECIMAL(10,2),    -- 冗余订单总额
    total_items INT,               -- 冗余商品总数
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- 3. 混合设计示例
-- 保持基本范式,但添加汇总表
CREATE TABLE order_summaries (
    order_id INT PRIMARY KEY,
    total_amount DECIMAL(10,2),
    total_items INT,
    last_updated TIMESTAMP,
    FOREIGN KEY (order_id) REFERENCES orders(order_id)
);

-- 创建触发器自动更新汇总信息
DELIMITER $$

CREATE TRIGGER after_order_item_insert
AFTER INSERT ON order_items
FOR EACH ROW
BEGIN
    INSERT INTO order_summaries (order_id, total_amount, total_items)
    SELECT 
        NEW.order_id,
        SUM(quantity * unit_price),
        SUM(quantity)
    FROM order_items
    WHERE order_id = NEW.order_id
    ON DUPLICATE KEY UPDATE
        total_amount = VALUES(total_amount),
        total_items = VALUES(total_items),
        last_updated = CURRENT_TIMESTAMP;
END $$

DELIMITER ;

-- 4. 垂直分表示例
-- 将大字段拆分到单独的表
CREATE TABLE products (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL(10,2),
    category_id INT
);

CREATE TABLE product_details (
    product_id INT PRIMARY KEY,
    description TEXT,
    specifications JSON,
    images JSON,
    FOREIGN KEY (product_id) REFERENCES products(id)
);

-- 5. 水平分表示例
-- 按时间范围分表
CREATE TABLE orders_2023 (
    order_id INT PRIMARY KEY,
    user_id INT,
    order_date DATE,
    CHECK (YEAR(order_date) = 2023)
);

CREATE TABLE orders_2024 (
    order_id INT PRIMARY KEY,
    user_id INT,
    order_date DATE,
    CHECK (YEAR(order_date) = 2024)
);

四、分区和分表优化

1. 分区策略

-- 1. 范围分区(RANGE)
CREATE TABLE orders (
    order_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    customer_id INT
)
PARTITION BY RANGE (YEAR(order_date)) (
    PARTITION p2022 VALUES LESS THAN (2023),
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- 2. 列表分区(LIST)
CREATE TABLE sales (
    id INT,
    amount DECIMAL(10,2),
    region VARCHAR(20)
)
PARTITION BY LIST COLUMNS(region) (
    PARTITION p_east VALUES IN ('east''northeast'),
    PARTITION p_west VALUES IN ('west''northwest'),
    PARTITION p_cent VALUES IN ('central'),
    PARTITION p_south VALUES IN ('south''southeast')
);

-- 3. 哈希分区(HASH)
CREATE TABLE products (
    id INT,
    name VARCHAR(100),
    created_at DATE
)
PARTITION BY HASH(id)
PARTITIONS 4;

-- 4. 复合分区策略
CREATE TABLE customer_orders (
    order_id INT,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2)
)
PARTITION BY RANGE(YEAR(order_date))
SUBPARTITION BY HASH(customer_id)
SUBPARTITIONS 4 (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pmax VALUES LESS THAN MAXVALUE
);

-- 5. 分区维护
-- 添加新分区
ALTER TABLE orders ADD PARTITION (
    PARTITION p2025 VALUES LESS THAN (2026)
);

-- 删除分区
ALTER TABLE orders DROP PARTITION p2022;

-- 重组分区
ALTER TABLE orders REORGANIZE PARTITION p2023 INTO (
    PARTITION p2023_h1 VALUES LESS THAN (2023-07-01),
    PARTITION p2023_h2 VALUES LESS THAN (2024)
);

-- 6. 分区查询优化
-- 使用分区键进行查询
EXPLAIN SELECT * 
FROM orders 
WHERE order_date BETWEEN '2023-01-01' AND '2023-12-31';

-- 分区信息查看
SELECT 
    PARTITION_NAME,
    PARTITION_ORDINAL_POSITION,
    TABLE_ROWS
FROM information_schema.PARTITIONS
WHERE TABLE_NAME = 'orders';

-- 7. 分区表维护
-- 检查分区
ALTER TABLE orders CHECK PARTITION p2023, p2024;

-- 优化分区
ALTER TABLE orders OPTIMIZE PARTITION p2023, p2024;

-- 分析分区
ALTER TABLE orders ANALYZE PARTITION p2023, p2024;

-- 重建分区
ALTER TABLE orders REBUILD PARTITION p2023, p2024;

2. 分表策略

-- 1. 水平分表策略
-- 按用户ID范围分表
CREATE TABLE orders_0000_0999 (
    order_id BIGINT PRIMARY KEY,
    user_id INT CHECK (user_id BETWEEN 0 AND 999),
    order_date TIMESTAMP,
    amount DECIMAL(10,2)
);

CREATE TABLE orders_1000_1999 (
    order_id BIGINT PRIMARY KEY,
    user_id INT CHECK (user_id BETWEEN 1000 AND 1999),
    order_date TIMESTAMP,
    amount DECIMAL(10,2)
);

-- 创建路由视图
CREATE VIEW orders_all AS
    SELECT * FROM orders_0000_0999
    UNION ALL
    SELECT * FROM orders_1000_1999;

-- 2. 垂直分表策略
-- 基础信息表
CREATE TABLE products_base (
    product_id INT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL(10,2),
    category_id INT
);

-- 详细信息表
CREATE TABLE products_details (
    product_id INT PRIMARY KEY,
    description TEXT,
    specifications JSON,
    created_at TIMESTAMP,
    updated_at TIMESTAMP,
    FOREIGN KEY (product_id) REFERENCES products_base(product_id)
);

-- 3. 时间分表策略
-- 按月分表
CREATE TABLE orders_202401 (
    order_id BIGINT PRIMARY KEY,
    user_id INT,
    order_date TIMESTAMP CHECK (order_date >= '2024-01-01' AND order_date < '2024-02-01'),
    amount DECIMAL(10,2)
);

CREATE TABLE orders_202402 (
    order_id BIGINT PRIMARY KEY,
    user_id INT,
    order_date TIMESTAMP CHECK (order_date >= '2024-02-01' AND order_date < '2024-03-01'),
    amount DECIMAL(10,2)
);

-- 4. 分表管理存储过程
DELIMITER $$

CREATE PROCEDURE create_monthly_order_table(IN p_year INTIN p_month INT)
BEGIN
    SET @table_name = CONCAT('orders_', p_year, LPAD(p_month, 2'0'));
    SET @start_date = DATE_FORMAT(CONCAT(p_year, '-', p_month, '-01'), '%Y-%m-%d');
    SET @end_date = DATE_FORMAT(DATE_ADD(@start_date, INTERVAL 1 MONTH), '%Y-%m-%d');
    
    SET @sql = CONCAT(
        'CREATE TABLE ', @table_name, ' (
            order_id BIGINT PRIMARY KEY,
            user_id INT,
            order_date TIMESTAMP CHECK (order_date >= '''
, @start_date, ''' AND order_date < ''', @end_date, '''),
            amount DECIMAL(10,2),
            INDEX idx_user_id (user_id),
            INDEX idx_order_date (order_date)
        )'

    );
    
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END $$

DELIMITER ;

-- 5. 分表查询优化
-- 创建分表汇总视图
CREATE VIEW orders_summary AS
    SELECT 'orders_202401' as table_name, COUNT(*) as record_count, 
           MIN(order_date) as min_date, MAX(order_date) as max_date
    FROM orders_202401
    UNION ALL
    SELECT 'orders_202402' as table_name, COUNT(*) as record_count,
           MIN(order_date) as min_date, MAX(order_date) as max_date
    FROM orders_202402;

-- 6. 分表数据迁移
-- 创建数据迁移存储过程
DELIMITER $$

CREATE PROCEDURE migrate_orders_data(
    IN source_table VARCHAR(50),
    IN target_table VARCHAR(50),
    IN batch_size INT
)
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE batch_start BIGINT;
    
    -- 获取最小ID
    SET batch_start = (SELECT MIN(order_id) FROM source_table);
    
    REPEAT
        -- 批量迁移数据
        INSERT INTO target_table
        SELECT * FROM source_table
        WHERE order_id >= batch_start 
        AND order_id < batch_start + batch_size;
        
        -- 更新起始ID
        SET batch_start = batch_start + batch_size;
        
        -- 检查是否完成
        SET done = NOT EXISTS(
            SELECT 1 FROM source_table 
            WHERE order_id >= batch_start LIMIT 1
        );
        
        -- 提交事务并等待一小段时间
        COMMIT;
        DO SLEEP(0.1);
        
    UNTIL done END REPEAT;
END $$

DELIMITER ;

五、缓存优化

1. 查询缓存

-- 1. MySQL查询缓存配置(适用于MySQL 5.7及以下版本)
SET GLOBAL query_cache_type = 1;
SET GLOBAL query_cache_size = 134217728;  -- 128MB

-- 2. 应用层查询缓存实现
-- 创建缓存表
CREATE TABLE query_cache (
    cache_key VARCHAR(255) PRIMARY KEY,
    cache_value TEXT,
    expires_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- 创建缓存管理存储过程
DELIMITER $$

CREATE PROCEDURE cache_query(
    IN p_cache_key VARCHAR(255),
    IN p_query TEXT,
    IN p_expire_minutes INT
)
BEGIN
    DECLARE v_cache_value TEXT;
    DECLARE v_is_expired BOOLEAN;
    
    -- 检查缓存是否存在且未过期
    SELECT 
        cache_value,
        expires_at < NOW() INTO v_cache_value, v_is_expired
    FROM query_cache
    WHERE cache_key = p_cache_key;
    
    -- 如果缓存不存在或已过期,执行查询并更新缓存
    IF v_cache_value IS NULL OR v_is_expired THEN
        SET @sql = p_query;
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        
        -- 将结果存入缓存
        -- 注意:这里假设查询结果可以转换为JSON格式
        SET @result = (SELECT JSON_ARRAYAGG(JSON_OBJECT(
            'column1', column1,
            'column2', column2
            -- 添加更多列
        )) FROM (SELECT * FROM tmp_result) t);
        
        -- 更新缓存
        INSERT INTO query_cache (cache_key, cache_value, expires_at)
        VALUES (
            p_cache_key,
            @result,
            DATE_ADD(NOW(), INTERVAL p_expire_minutes MINUTE)
        )
        ON DUPLICATE KEY UPDATE
            cache_value = VALUES(cache_value),
            expires_at = VALUES(expires_at);
            
        DEALLOCATE PREPARE stmt;
    ELSE
        -- 返回缓存的结果
        SELECT JSON_EXTRACT(v_cache_value, '$[*]');
    END IF;
END $$

DELIMITER ;

-- 3. 缓存预热
DELIMITER $$

CREATE PROCEDURE warm_up_cache()
BEGIN
    -- 预热热门商品缓存
    CALL cache_query(
        'hot_products',
        'SELECT * FROM products WHERE sales_count > 1000 ORDER BY sales_count DESC LIMIT 100',
        60  -- 缓存60分钟
    );
    
    -- 预热分类统计缓存
    CALL cache_query(
        'category_stats',
        'SELECT category_id, COUNT(*) as product_count, AVG(price) as avg_price FROM products GROUP BY category_id',
        120  -- 缓存120分钟
    );
END $$

DELIMITER ;

-- 4. 缓存清理
DELIMITER $$

CREATE PROCEDURE clean_expired_cache()
BEGIN
    -- 删除过期缓存
    DELETE FROM query_cache WHERE expires_at < NOW();
    
    -- 压缩缓存表
    OPTIMIZE TABLE query_cache;
END $$

DELIMITER ;

-- 创建定时任务
CREATE EVENT clean_cache_event
ON SCHEDULE EVERY 1 HOUR
DO CALL clean_expired_cache();

-- 5. 缓存监控
CREATE VIEW cache_statistics AS
SELECT 
    COUNT(*) as total_cache_entries,
    COUNT(CASE WHEN expires_at < NOW() THEN 1 ENDas expired_entries,
    AVG(TIMESTAMPDIFF(MINUTE, created_at, expires_at)) as avg_cache_duration,
    MAX(updated_at) as last_cache_update
FROM query_cache;

2. 缓存策略

-- 1. 多级缓存实现
CREATE TABLE cache_levels (
    id INT PRIMARY KEY,
    level_name VARCHAR(50),
    priority INT,
    max_size BIGINT,
    expire_time INT  -- 秒数
);

INSERT INTO cache_levels VALUES
(1'memory'11073741824300),      -- 1GB, 5分钟
(2'redis'2107374182403600),     -- 10GB, 1小时
(3'disk'310737418240086400);    -- 100GB, 1天

-- 2. 缓存队列表
CREATE TABLE cache_queue (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    cache_key VARCHAR(255),
    cache_level INT,
    priority INT,
    status ENUM('pending''processing''completed''failed'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (cache_level) REFERENCES cache_levels(id)
);

-- 3. 缓存淘汰策略实现
DELIMITER $$

CREATE PROCEDURE evict_cache(IN p_level_id INT)
BEGIN
    DECLARE v_max_size BIGINT;
    DECLARE v_current_size BIGINT;
    
    -- 获取缓存级别的最大大小
    SELECT max_size INTO v_max_size
    FROM cache_levels
    WHERE id = p_level_id;
    
    -- 获取当前缓存大小
    SELECT SUM(LENGTH(cache_value)) INTO v_current_size
    FROM query_cache
    WHERE cache_level = p_level_id;
    
    -- 如果超过最大大小,执行LRU淘汰
    IF v_current_size > v_max_size THEN
        DELETE FROM query_cache 
        WHERE cache_level = p_level_id
        AND id IN (
            SELECT id 
            FROM (
                SELECT id
                FROM query_cache
                WHERE cache_level = p_level_id
                ORDER BY updated_at ASC
                LIMIT 100
            ) tmp
        );
    END IF;
END $$

DELIMITER ;

-- 4. 缓存预加载策略
DELIMITER $$

CREATE PROCEDURE preload_cache()
BEGIN
    -- 预加载热门商品
    INSERT INTO cache_queue (cache_key, cache_level, prioritystatus)
    SELECT 
        CONCAT('product:'id),
        1,  -- memory缓存级别
        10-- 高优先级
        'pending'
    FROM products 
    WHERE views_count > 1000
    ORDER BY views_count DESC
    LIMIT 100;
    
    -- 预加载分类统计
    INSERT INTO cache_queue (cache_key, cache_level, prioritystatus)
    SELECT 
        CONCAT('category_stats:'id),
        2,  -- redis缓存级别
        5,  -- 中等优先级
        'pending'
    FROM categories;
END $$

DELIMITER ;

-- 5. 缓存一致性维护
DELIMITER $$

CREATE TRIGGER after_product_update
AFTER UPDATE ON products
FOR EACH ROW
BEGIN
    -- 清除相关缓存
    DELETE FROM query_cache 
    WHERE cache_key LIKE CONCAT('product:', NEW.id, '%');
    
    -- 添加缓存重建任务
    INSERT INTO cache_queue (cache_key, cache_level, prioritystatus)
    VALUES (
        CONCAT('product:', NEW.id),
        1,  -- memory缓存级别
        10-- 高优先级
        'pending'
    );
END $$

DELIMITER ;

-- 6. 缓存监控和统计
CREATE VIEW cache_statistics AS
SELECT 
    cl.level_name,
    COUNT(*) as entry_count,
    SUM(LENGTH(qc.cache_value)) as total_size,
    AVG(TIMESTAMPDIFF(SECOND, qc.created_at, qc.expires_at)) as avg_ttl,
    COUNT(CASE WHEN qc.expires_at < NOW() THEN 1 ENDas expired_count
FROM query_cache qc
JOIN cache_levels cl ON qc.cache_level = cl.id
GROUP BY cl.level_name;

六、监控和维护

1. 性能监控

-- 1. 慢查询监控
-- 配置慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow-query.log';
SET GLOBAL long_query_time = 2;  -- 设置慢查询阈值为2秒

-- 创建慢查询分析视图
CREATE VIEW slow_query_analysis AS
SELECT 
    db_name,
    SUBSTRING_INDEX(SUBSTRING_INDEX(query_text, '  '1), '\n'1as query_pattern,
    COUNT(*) as execution_count,
    AVG(query_time) as avg_time,
    MAX(query_time) as max_time,
    SUM(rows_examined) as total_rows_examined,
    SUM(rows_sent) as total_rows_sent
FROM mysql.slow_log
GROUP BY db_name, query_pattern
ORDER BY avg_time DESC;

-- 2. 连接数监控
CREATE TABLE connection_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    total_connections INT,
    active_connections INT,
    waiting_connections INT
);

DELIMITER $$

CREATE PROCEDURE monitor_connections()
BEGIN
    INSERT INTO connection_log (total_connections, active_connections, waiting_connections)
    SELECT 
        COUNT(*) as total_connections,
        COUNT(CASE WHEN command != 'Sleep' THEN 1 ENDas active_connections,
        COUNT(CASE WHEN state = 'Waiting for table lock' THEN 1 ENDas waiting_connections
    FROM information_schema.processlist;
END $$

DELIMITER ;

-- 创建定时任务
CREATE EVENT monitor_connections_event
ON SCHEDULE EVERY 1 MINUTE
DO CALL monitor_connections();

-- 3. 资源使用监控
CREATE TABLE resource_usage_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    cpu_usage FLOAT,
    memory_usage BIGINT,
    disk_usage BIGINT,
    iops INT
);

-- 4. 查询性能监控
CREATE TABLE query_performance_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32),
    query_text TEXT,
    execution_time DECIMAL(10,4),
    rows_affected INT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    explain_plan JSON
);

DELIMITER $$

CREATE TRIGGER before_query_execution
BEFORE INSERT ON query_performance_log
FOR EACH ROW
BEGIN
    -- 获取执行计划
    SET @explain_json = (
        SELECT JSON_ARRAYAGG(
            JSON_OBJECT(
                'select_type', select_type,
                'table', table_name,
                'type', access_type,
                'rows', rows_examined_per_scan
            )
        )
        FROM information_schema.processlist
        WHERE info = NEW.query_text
    );
    SET NEW.explain_plan = @explain_json;
END $$

DELIMITER ;

-- 5. 性能报告生成
CREATE VIEW performance_summary AS
WITH query_stats AS (
    SELECT 
        DATE(timestampas stat_date,
        COUNT(*) as total_queries,
        AVG(execution_time) as avg_execution_time,
        MAX(execution_time) as max_execution_time,
        SUM(CASE WHEN execution_time > 1 THEN 1 ELSE 0 ENDas slow_queries
    FROM query_performance_log
    GROUP BY DATE(timestamp)
),
resource_stats AS (
    SELECT 
        DATE(timestampas stat_date,
        AVG(cpu_usage) as avg_cpu_usage,
        MAX(cpu_usage) as max_cpu_usage,
        AVG(memory_usage) as avg_memory_usage,
        MAX(memory_usage) as max_memory_usage
    FROM resource_usage_log
    GROUP BY DATE(timestamp)
)
SELECT 
    qs.stat_date,
    qs.total_queries,
    qs.avg_execution_time,
    qs.slow_queries,
    rs.avg_cpu_usage,
    rs.avg_memory_usage
FROM query_stats qs
JOIN resource_stats rs ON qs.stat_date = rs.stat_date;

-- 6. 性能预警系统
CREATE TABLE performance_alerts (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    alert_type ENUM('slow_query''high_cpu''high_memory''high_connections'),
    alert_message TEXT,
    severity ENUM('info''warning''critical'),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

DELIMITER $$

CREATE PROCEDURE check_performance_alerts()
BEGIN
    -- 检查慢查询
    INSERT INTO performance_alerts (alert_type, alert_message, severity)
    SELECT 
        'slow_query',
        CONCAT('Query took ', execution_time, ' seconds: 'LEFT(query_text, 100)),
        CASE 
            WHEN execution_time > 10 THEN 'critical'
            WHEN execution_time > 5 THEN 'warning'
            ELSE 'info'
        END
    FROM query_performance_log
    WHERE execution_time > 2
    AND timestamp > DATE_SUB(NOW(), INTERVAL 5 MINUTE);

    -- 检查资源使用
    INSERT INTO performance_alerts (alert_type, alert_message, severity)
    SELECT 
        'high_cpu',
        CONCAT('CPU usage at 'ROUND(cpu_usage, 2), '%'),
        CASE 
            WHEN cpu_usage > 90 THEN 'critical'
            WHEN cpu_usage > 70 THEN 'warning'
            ELSE 'info'
        END
    FROM resource_usage_log
    WHERE timestamp > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
    AND cpu_usage > 70;
END $$

DELIMITER ;

-- 创建定时检查任务
CREATE EVENT check_performance_alerts_event
ON SCHEDULE EVERY 5 MINUTE
DO CALL check_performance_alerts();

2. 定期维护

-- 1. 表维护程序
DELIMITER $$

CREATE PROCEDURE maintain_tables()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE table_name VARCHAR(255);
    DECLARE cur_tables CURSOR FOR 
        SELECT TABLE_NAME 
        FROM information_schema.TABLES 
        WHERE TABLE_SCHEMA = DATABASE();
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    -- 创建维护日志
    CREATE TABLE IF NOT EXISTS maintenance_log (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        table_name VARCHAR(255),
        operation VARCHAR(50),
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        status VARCHAR(20),
        error_message TEXT
    );

    OPEN cur_tables;
    
    read_loop: LOOP
        FETCH cur_tables INTO table_name;
        IF done THEN
            LEAVE read_loop;
        END IF;

        -- 记录开始时间
        INSERT INTO maintenance_log (table_name, operation, start_time, status)
        VALUES (table_name, 'OPTIMIZE'NOW(), 'RUNNING');
        
        -- 执行维护操作
        SET @sql = CONCAT('OPTIMIZE TABLE ', table_name);
        BEGIN
            DECLARE CONTINUE HANDLER FOR SQLEXCEPTION
            BEGIN
                UPDATE maintenance_log 
                SET status = 'ERROR',
                    error_message = 'Error during optimization',
                    end_time = NOW()
                WHERE table_name = table_name 
                AND status = 'RUNNING';
            END;
            
            PREPARE stmt FROM @sql;
            EXECUTE stmt;
            DEALLOCATE PREPARE stmt;
            
            -- 更新完成状态
            UPDATE maintenance_log 
            SET status = 'COMPLETED',
                end_time = NOW()
            WHERE table_name = table_name 
            AND status = 'RUNNING';
        END;
    END LOOP;

    CLOSE cur_tables;
END $$

DELIMITER ;

-- 2. 统计信息更新
CREATE PROCEDURE update_statistics()
BEGIN
    -- 更新表统计信息
    ANALYZE TABLE users, orders, products;
    
    -- 更新索引统计信息
    ANALYZE TABLE users PERSISTENT FOR COLUMNS (email), INDEXES (idx_email);
END;

-- 3. 数据清理
DELIMITER $$

CREATE PROCEDURE cleanup_old_data()
BEGIN
    -- 设置清理时间阈值
    SET @cleanup_date = DATE_SUB(CURRENT_DATEINTERVAL 1 YEAR);
    
    -- 开始事务
    START TRANSACTION;
    
    -- 清理旧日志
    DELETE FROM system_logs 
    WHERE created_at < @cleanup_date;
    
    -- 清理旧会话
    DELETE FROM user_sessions 
    WHERE last_activity < @cleanup_date;
    
    -- 归档旧订单
    INSERT INTO orders_archive
    SELECT * FROM orders 
    WHERE order_date < @cleanup_date;
    
    DELETE FROM orders 
    WHERE order_date < @cleanup_date;
    
    -- 提交事务
    COMMIT;
END $$

DELIMITER ;

-- 4. 备份管理
DELIMITER $$

CREATE PROCEDURE manage_backups()
BEGIN
    -- 记录备份开始
    INSERT INTO backup_log (backup_type, start_time, status)
    VALUES ('FULL'NOW(), 'RUNNING');
    
    SET @backup_id = LAST_INSERT_ID();
    
    -- 执行备份
    SET @backup_file = CONCAT('/backups/full_'DATE_FORMAT(NOW(), '%Y%m%d_%H%i%s'), '.sql');
    SET @backup_command = CONCAT(
        'mysqldump -u root -p --all-databases --events --routines ',
        '> ', @backup_file
    );
    
    -- 执行系统命令(需要适当的权限)
    SET @result = sys_exec(@backup_command);
    
    -- 更新备份状态
    UPDATE backup_log
    SET end_time = NOW(),
        status = IF(@result = 0'COMPLETED''FAILED'),
        file_path = @backup_file
    WHERE id = @backup_id;
    
    -- 清理旧备份
    SET @cleanup_date = DATE_SUB(CURRENT_DATEINTERVAL 30 DAY);
    DELETE FROM backup_log 
    WHERE start_time < @cleanup_date;
END $$

DELIMITER ;

-- 5. 创建维护计划
CREATE EVENT daily_maintenance
ON SCHEDULE EVERY 1 DAY
STARTS CURRENT_DATE + INTERVAL 1 DAY + INTERVAL 1 HOUR
DO
BEGIN
    CALL maintain_tables();
    CALL update_statistics();
    CALL cleanup_old_data();
END;

CREATE EVENT weekly_backup
ON SCHEDULE EVERY 1 WEEK
STARTS CURRENT_DATE + INTERVAL 1 WEEK
DO
CALL manage_backups();

-- 6. 维护监控
CREATE VIEW maintenance_status AS
SELECT 
    DATE(start_time) as maintenance_date,
    operation,
    COUNT(*) as total_operations,
    COUNT(CASE WHEN status = 'COMPLETED' THEN 1 ENDas successful_operations,
    COUNT(CASE WHEN status = 'ERROR' THEN 1 ENDas failed_operations,
    AVG(TIMESTAMPDIFF(SECOND, start_time, end_time)) as avg_duration_seconds
FROM maintenance_log
GROUP BY DATE(start_time), operation;

3. 故障恢复

-- 1. 故障检测和记录
CREATE TABLE system_failures (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    failure_type VARCHAR(50),
    failure_description TEXT,
    detection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    resolution_time TIMESTAMP,
    resolution_description TEXT,
    affected_tables TEXT,
    severity ENUM('low''medium''high''critical'),
    status ENUM('detected''investigating''resolving''resolved')
);

-- 2. 自动故障检测程序
DELIMITER $$

CREATE PROCEDURE detect_database_issues()
BEGIN
    -- 检查表损坏
    INSERT INTO system_failures (
        failure_type,
        failure_description,
        affected_tables,
        severity,
        status
    )
    SELECT 
        'TABLE_CORRUPTION',
        CONCAT('Table ', table_name, ' may be corrupted'),
        table_name,
        'high',
        'detected'
    FROM information_schema.TABLES
    WHERE CHECK_TABLE = 'CHECK_FAILED';
    
    -- 检查连接问题
    IF (
        SELECT COUNT(*) 
        FROM information_schema.PROCESSLIST 
        WHERE command = 'Sleep'
    ) > 1000 THEN
        INSERT INTO system_failures (
            failure_type,
            failure_description,
            severity,
            status
        )
        VALUES (
            'CONNECTION_OVERLOAD',
            'Too many sleeping connections',
            'medium',
            'detected'
        );
    END IF;
    
    -- 检查磁盘空间
    IF (
        SELECT DATA_FREE 
        FROM information_schema.TABLES 
        WHERE TABLE_SCHEMA = DATABASE()
    ) < 1073741824 THEN -- 1GB
        INSERT INTO system_failures (
            failure_type,
            failure_description,
            severity,
            status
        )
        VALUES (
            'LOW_DISK_SPACE',
            'Database is running low on disk space',
            'critical',
            'detected'
        );
    END IF;
END $$

DELIMITER ;

-- 3. 自动恢复程序
DELIMITER $$

CREATE PROCEDURE attempt_recovery(IN p_failure_id BIGINT)
BEGIN
    DECLARE v_failure_type VARCHAR(50);
    DECLARE v_affected_tables TEXT;
    
    -- 获取故障信息
    SELECT failure_type, affected_tables 
    INTO v_failure_type, v_affected_tables
    FROM system_failures
    WHERE id = p_failure_id;
    
    -- 更新状态为处理中
    UPDATE system_failures 
    SET status = 'investigating'
    WHERE id = p_failure_id;
    
    -- 根据故障类型执行恢复操作
    CASE v_failure_type
        WHEN 'TABLE_CORRUPTION' THEN
            BEGIN
                -- 尝试修复表
                SET @repair_sql = CONCAT('REPAIR TABLE ', v_affected_tables);
                PREPARE stmt FROM @repair_sql;
                EXECUTE stmt;
                DEALLOCATE PREPARE stmt;
                
                -- 验证修复结果
                SET @check_sql = CONCAT('CHECK TABLE ', v_affected_tables);
                PREPARE stmt FROM @check_sql;
                EXECUTE stmt;
                DEALLOCATE PREPARE stmt;
            END;
            
        WHEN 'CONNECTION_OVERLOAD' THEN
            BEGIN
                -- 清理空闲连接
                KILL CONNECTION (
                    SELECT id 
                    FROM information_schema.PROCESSLIST 
                    WHERE command = 'Sleep' 
                    AND time > 3600
                );
            END;
            
        WHEN 'LOW_DISK_SPACE' THEN
            BEGIN
                -- 执行紧急清理
                CALL cleanup_old_data();
                -- 优化表空间
                OPTIMIZE TABLE users, orders, products;
            END;
    END CASE;
    
    -- 更新恢复状态
    UPDATE system_failures 
    SET status = 'resolved',
        resolution_time = NOW(),
        resolution_description = CONCAT('Automatic recovery for ', v_failure_type, ' completed')
    WHERE id = p_failure_id;
END $$

DELIMITER ;

-- 4. 数据恢复程序
DELIMITER $$

CREATE PROCEDURE recover_data(
    IN p_table_name VARCHAR(255),
    IN p_backup_date DATETIME,
    IN p_where_clause TEXT
)
BEGIN
    -- 记录恢复操作
    INSERT INTO recovery_log (
        table_name,
        backup_date,
        where_clause,
        start_time,
        status
    )
    VALUES (
        p_table_name,
        p_backup_date,
        p_where_clause,
        NOW(),
        'STARTED'
    );
    
    SET @recovery_id = LAST_INSERT_ID();
    
    -- 创建临时表
    SET @create_tmp = CONCAT(
        'CREATE TABLE tmp_recovery_', @recovery_id,
        ' LIKE ', p_table_name
    );
    PREPARE stmt FROM @create_tmp;
    EXECUTE stmt;
    
    -- 从备份恢复数据
    SET @restore_data = CONCAT(
        'INSERT INTO tmp_recovery_', @recovery_id,
        ' SELECT * FROM ', p_table_name, '_backup',
        ' WHERE backup_date = ', QUOTE(p_backup_date),
        ' AND ', p_where_clause
    );
    PREPARE stmt FROM @restore_data;
    EXECUTE stmt;
    
    -- 合并恢复的数据
    SET @merge_data = CONCAT(
        'REPLACE INTO ', p_table_name,
        ' SELECT * FROM tmp_recovery_', @recovery_id
    );
    PREPARE stmt FROM @merge_data;
    EXECUTE stmt;
    
    -- 清理临时表
    SET @drop_tmp = CONCAT('DROP TABLE tmp_recovery_', @recovery_id);
    PREPARE stmt FROM @drop_tmp;
    EXECUTE stmt;
    
    DEALLOCATE PREPARE stmt;
    
    -- 更新恢复日志
    UPDATE recovery_log
    SET end_time = NOW(),
        status = 'COMPLETED',
        rows_recovered = ROW_COUNT()
    WHERE id = @recovery_id;
END $$

DELIMITER ;

-- 5. 创建恢复点
CREATE PROCEDURE create_recovery_point(
    IN p_description VARCHAR(255)
)
BEGIN
    -- 记录恢复点
    INSERT INTO recovery_points (
        description,
        created_at,
        status
    )
    VALUES (
        p_description,
        NOW(),
        'CREATING'
    );
    
    SET @recovery_point_id = LAST_INSERT_ID();
    
    -- 创建表快照
    CREATE TABLE recovery_point_data (
        recovery_point_id BIGINT,
        table_name VARCHAR(255),
        data_snapshot LONGBLOB,
        created_at TIMESTAMP
    );
    
    -- 保存主要表的状态
    INSERT INTO recovery_point_data
    SELECT 
        @recovery_point_id,
        table_name,
        snapshot,
        NOW()
    FROM (
        SELECT 
            'users' as table_name,
            user_data as snapshot 
        FROM users
        UNION ALL
        SELECT 
            'orders' as table_name,
            order_data as snapshot
        FROM orders
    ) as snapshots;
    
    -- 更新恢复点状态
    UPDATE recovery_points
    SET status = 'CREATED'
    WHERE id = @recovery_point_id;
END;

七、安全优化

1. 权限管理

-- 1. 创建角色和权限系统
CREATE TABLE roles (
    role_id INT PRIMARY KEY AUTO_INCREMENT,
    role_name VARCHAR(50UNIQUE,
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE permissions (
    permission_id INT PRIMARY KEY AUTO_INCREMENT,
    permission_name VARCHAR(100UNIQUE,
    description TEXT,
    resource_type VARCHAR(50),
    access_level ENUM('READ''WRITE''ADMIN')
);

CREATE TABLE role_permissions (
    role_id INT,
    permission_id INT,
    granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    granted_by VARCHAR(100),
    PRIMARY KEY (role_id, permission_id),
    FOREIGN KEY (role_id) REFERENCES roles(role_id),
    FOREIGN KEY (permission_id) REFERENCES permissions(permission_id)
);

CREATE TABLE user_roles (
    user_id INT,
    role_id INT,
    granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NULL,
    granted_by VARCHAR(100),
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (role_id) REFERENCES roles(role_id)
);

-- 2. 创建权限管理存储过程
DELIMITER $$

CREATE PROCEDURE grant_user_role(
    IN p_user_id INT,
    IN p_role_name VARCHAR(50),
    IN p_granted_by VARCHAR(100)
)
BEGIN
    DECLARE v_role_id INT;
    
    -- 获取角色ID
    SELECT role_id INTO v_role_id
    FROM roles
    WHERE role_name = p_role_name;
    
    -- 授予角色
    INSERT INTO user_roles (user_id, role_id, granted_by)
    VALUES (p_user_id, v_role_id, p_granted_by)
    ON DUPLICATE KEY UPDATE
        granted_at = CURRENT_TIMESTAMP,
        granted_by = p_granted_by;
        
    -- 记录审计日志
    INSERT INTO security_audit_log (
        action_type,
        user_id,
        role_id,
        performed_by,
        action_description
    )
    VALUES (
        'GRANT_ROLE',
        p_user_id,
        v_role_id,
        p_granted_by,
        CONCAT('Granted role ', p_role_name, ' to user ', p_user_id)
    );
END $$

CREATE PROCEDURE revoke_user_role(
    IN p_user_id INT,
    IN p_role_name VARCHAR(50),
    IN p_revoked_by VARCHAR(100)
)
BEGIN
    DECLARE v_role_id INT;
    
    -- 获取角色ID
    SELECT role_id INTO v_role_id
    FROM roles
    WHERE role_name = p_role_name;
    
    -- 撤销角色
    DELETE FROM user_roles
    WHERE user_id = p_user_id
    AND role_id = v_role_id;
    
    -- 记录审计日志
    INSERT INTO security_audit_log (
        action_type,
        user_id,
        role_id,
        performed_by,
        action_description
    )
    VALUES (
        'REVOKE_ROLE',
        p_user_id,
        v_role_id,
        p_revoked_by,
        CONCAT('Revoked role ', p_role_name, ' from user ', p_user_id)
    );
END $$

DELIMITER ;

-- 3. 创建安全视图
CREATE VIEW user_permissions AS
SELECT 
    u.user_id,
    r.role_name,
    p.permission_name,
    p.resource_type,
    p.access_level
FROM user_roles ur
JOIN roles r ON ur.role_id = r.role_id
JOIN role_permissions rp ON r.role_id = rp.role_id
JOIN permissions p ON rp.permission_id = p.permission_id
JOIN users u ON ur.user_id = u.id
WHERE (ur.expires_at IS NULL OR ur.expires_at > NOW());

-- 4. 创建权限检查函数
DELIMITER $$

CREATE FUNCTION check_user_permission(
    p_user_id INT,
    p_permission_name VARCHAR(100)
RETURNS BOOLEAN
DETERMINISTIC
BEGIN
    DECLARE has_permission BOOLEAN;
    
    SELECT EXISTS (
        SELECT 1
        FROM user_permissions
        WHERE user_id = p_user_id
        AND permission_name = p_permission_name
    ) INTO has_permission;
    
    RETURN has_permission;
END $$

DELIMITER ;

-- 5. 设置默认安全策略
-- 创建基本角色
INSERT INTO roles (role_name, description) VALUES
('admin''Full system access'),
('developer''Development and testing access'),
('analyst''Read-only access to analytics'),
('user''Basic user access');

-- 创建基本权限
INSERT INTO permissions (permission_name, resource_type, access_level) VALUES
('READ_USER_DATA''USER''READ'),
('WRITE_USER_DATA''USER''WRITE'),
('MANAGE_USERS''USER''ADMIN'),
('READ_ANALYTICS''ANALYTICS''READ'),
('GENERATE_REPORTS''REPORTS''WRITE');

-- 分配角色权限
INSERT INTO role_permissions (role_id, permission_id) 
SELECT 
    r.role_id,
    p.permission_id
FROM roles r
CROSS JOIN permissions p
WHERE r.role_name = 'admin';

-- 6. 审计日志
CREATE TABLE security_audit_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    action_type VARCHAR(50),
    user_id INT,
    role_id INT,
    permission_id INT,
    performed_by VARCHAR(100),
    action_description TEXT,
    action_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address VARCHAR(45),
    success BOOLEAN
);

2. 加密和脱敏

-- 1. 创建加密函数
DELIMITER $$

CREATE FUNCTION encrypt_data(p_data TEXT, p_key VARCHAR(32))
RETURNS VARBINARY(1000)
DETERMINISTIC
BEGIN
    RETURN AES_ENCRYPT(p_data, p_key);
END $$

CREATE FUNCTION decrypt_data(p_data VARBINARY(1000), p_key VARCHAR(32))
RETURNS TEXT
DETERMINISTIC
BEGIN
    RETURN AES_DECRYPT(p_data, p_key);
END $$

-- 2. 创建脱敏函数
CREATE FUNCTION mask_email(p_email VARCHAR(255))
RETURNS VARCHAR(255)
DETERMINISTIC
BEGIN
    DECLARE v_local_part VARCHAR(255);
    DECLARE v_domain VARCHAR(255);
    
    SET v_local_part = SUBSTRING_INDEX(p_email, '@'1);
    SET v_domain = SUBSTRING_INDEX(p_email, '@'-1);
    
    RETURN CONCAT(
        LEFT(v_local_part, 2),
        REPEAT('*', LENGTH(v_local_part) - 2),
        '@',
        v_domain
    );
END $$

CREATE FUNCTION mask_phone(p_phone VARCHAR(20))
RETURNS VARCHAR(20)
DETERMINISTIC
BEGIN
    RETURN CONCAT(
        LEFT(p_phone, 3),
        REPEAT('*'LENGTH(p_phone) - 7),
        RIGHT(p_phone, 4)
    );
END $$

CREATE FUNCTION mask_card(p_card VARCHAR(20))
RETURNS VARCHAR(20)
DETERMINISTIC
BEGIN
    RETURN CONCAT(
        'XXXX-XXXX-XXXX-',
        RIGHT(p_card, 4)
    );
END $$

DELIMITER ;

-- 3. 创建敏感数据表
CREATE TABLE sensitive_data (
    id INT PRIMARY KEY AUTO_INCREMENT,
    data_type VARCHAR(50),
    raw_data VARBINARY(1000),
    encryption_key_id INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_data_type (data_type)
);

-- 4. 创建加密密钥管理
CREATE TABLE encryption_keys (
    id INT PRIMARY KEY AUTO_INCREMENT,
    key_name VARCHAR(100),
    key_value VARBINARY(1000),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP,
    status ENUM('active''expired''revoked'),
    INDEX idx_status (status)
);

-- 5. 创建数据访问控制
DELIMITER $$

CREATE PROCEDURE access_sensitive_data(
    IN p_user_id INT,
    IN p_data_type VARCHAR(50),
    IN p_data_id INT
)
BEGIN
    DECLARE v_has_permission BOOLEAN;
    DECLARE v_encrypted_data VARBINARY(1000);
    DECLARE v_key_value VARCHAR(32);
    
    -- 检查权限
    SELECT EXISTS (
        SELECT 1 FROM user_permissions
        WHERE user_id = p_user_id
        AND permission_name = CONCAT('READ_', p_data_type)
    ) INTO v_has_permission;
    
    IF v_has_permission THEN
        -- 获取加密数据和密钥
        SELECT 
            sd.raw_data,
            ek.key_value
        INTO 
            v_encrypted_data,
            v_key_value
        FROM sensitive_data sd
        JOIN encryption_keys ek ON sd.encryption_key_id = ek.id
        WHERE sd.id = p_data_id
        AND sd.data_type = p_data_type
        AND ek.status = 'active';
        
        -- 解密并返回数据
        SELECT decrypt_data(v_encrypted_data, v_key_value) as decrypted_data;
        
        -- 记录访问日志
        INSERT INTO data_access_log (
            user_id,
            data_type,
            data_id,
            access_time,
            access_type
        )
        VALUES (
            p_user_id,
            p_data_type,
            p_data_id,
            NOW(),
            'READ'
        );
    ELSE
        SIGNAL SQLSTATE '45000'
        SET MESSAGE_TEXT = 'Permission denied';
    END IF;
END $$

DELIMITER ;

-- 6. 创建数据脱敏视图
CREATE VIEW masked_user_data AS
SELECT 
    id,
    username,
    mask_email(email) as email,
    mask_phone(phone) as phone,
    city,
    country,
    created_at
FROM users;

-- 7. 创建加密数据备份程序
DELIMITER $$

CREATE PROCEDURE backup_sensitive_data()
BEGIN
    DECLARE v_backup_key VARCHAR(32);
    
    -- 生成备份密钥
    SET v_backup_key = SHA2(CONCAT(UUID(), RAND()), 256);
    
    -- 创建加密备份
    CREATE TABLE sensitive_data_backup AS
    SELECT 
        id,
        data_type,
        encrypt_data(raw_data, v_backup_key) as encrypted_backup,
        NOW() as backup_time
    FROM sensitive_data;
    
    -- 安全存储备份密钥
    INSERT INTO encryption_keys (
        key_name,
        key_value,
        expires_at,
        status
    )
    VALUES (
        CONCAT('BACKUP_KEY_'DATE_FORMAT(NOW(), '%Y%m%d')),
        v_backup_key,
        DATE_ADD(NOW(), INTERVAL 30 DAY),
        'active'
    );
END $$

DELIMITER ;

-- 8. 创建密钥轮换程序
DELIMITER $$

CREATE PROCEDURE rotate_encryption_keys()
BEGIN
    DECLARE v_new_key VARCHAR(32);
    DECLARE v_old_key_id INT;
    
    START TRANSACTION;
    
    -- 生成新密钥
    SET v_new_key = SHA2(CONCAT(UUID(), RAND()), 256);
    
    -- 保存新密钥
    INSERT INTO encryption_keys (
        key_name,
        key_value,
        status
    )
    VALUES (
        CONCAT('KEY_'DATE_FORMAT(NOW(), '%Y%m%d')),
        v_new_key,
        'active'
    );
    
    SET v_old_key_id = (
        SELECT id 
        FROM encryption_keys 
        WHERE status = 'active'
        ORDER BY created_at DESC 
        LIMIT 1
    );
    
    -- 重新加密数据
    UPDATE sensitive_data
    SET raw_data = encrypt_data(
        decrypt_data(raw_data, (
            SELECT key_value 
            FROM encryption_keys 
            WHERE id = encryption_key_id
        )),
        v_new_key
    ),
    encryption_key_id = LAST_INSERT_ID()
    WHERE encryption_key_id = v_old_key_id;
    
    -- 废除旧密钥
    UPDATE encryption_keys
    SET status = 'expired'
    WHERE id = v_old_key_id;
    
    COMMIT;
END $$

DELIMITER ;

3. 审计跟踪

-- 1. 创建审计日志表
CREATE TABLE audit_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    action_type ENUM('INSERT''UPDATE''DELETE''SELECT''LOGIN''LOGOUT'),
    table_name VARCHAR(100),
    record_id BIGINT,
    old_value JSON,
    new_value JSON,
    user_id INT,
    application_user VARCHAR(100),
    action_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    client_ip VARCHAR(45),
    user_agent VARCHAR(255)
);

-- 2. 创建审计触发器
DELIMITER $$

-- 为用户表创建审计触发器
CREATE TRIGGER users_audit_insert
AFTER INSERT ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log (
        action_type,
        table_name,
        record_id,
        new_value,
        user_id,
        application_user
    )
    VALUES (
        'INSERT',
        'users',
        NEW.id,
        JSON_OBJECT(
            'username', NEW.username,
            'email', NEW.email,
            'status', NEW.status
        ),
        @current_user_id,
        CURRENT_USER()
    );
END $$

CREATE TRIGGER users_audit_update
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log (
        action_type,
        table_name,
        record_id,
        old_value,
        new_value,
        user_id,
        application_user
    )
    VALUES (
        'UPDATE',
        'users',
        NEW.id,
        JSON_OBJECT(
            'username', OLD.username,
            'email', OLD.email,
            'status', OLD.status
        ),
        JSON_OBJECT(
            'username', NEW.username,
            'email', NEW.email,
            'status', NEW.status
        ),
        @current_user_id,
        CURRENT_USER()
    );
END $$

CREATE TRIGGER users_audit_delete
AFTER DELETE ON users
FOR EACH ROW
BEGIN
    INSERT INTO audit_log (
        action_type,
        table_name,
        record_id,
        old_value,
        user_id,
        application_user
    )
    VALUES (
        'DELETE',
        'users',
        OLD.id,
        JSON_OBJECT(
            'username', OLD.username,
            'email', OLD.email,
            'status', OLD.status
        ),
        @current_user_id,
        CURRENT_USER()
    );
END $$

DELIMITER ;

-- 3. 创建审计查询视图
CREATE VIEW audit_summary AS
SELECT 
    DATE(action_time) as audit_date,
    action_type,
    table_name,
    COUNT(*) as action_count,
    COUNT(DISTINCT user_id) as unique_users
FROM audit_log
GROUP BY DATE(action_time), action_type, table_name;

-- 4. 创建审计报告程序
DELIMITER $$

CREATE PROCEDURE generate_audit_report(
    IN p_start_date DATE,
    IN p_end_date DATE
)
BEGIN
    -- 总体统计
    SELECT 
        action_type,
        COUNT(*) as total_actions,
        COUNT(DISTINCT user_id) as unique_users,
        MIN(action_time) as first_action,
        MAX(action_time) as last_action
    FROM audit_log
    WHERE DATE(action_time) BETWEEN p_start_date AND p_end_date
    GROUP BY action_type;
    
    -- 用户活动分析
    SELECT 
        user_id,
        application_user,
        COUNT(*) as total_actions,
        GROUP_CONCAT(DISTINCT action_type) as action_types,
        COUNT(DISTINCT table_name) as affected_tables
    FROM audit_log
    WHERE DATE(action_time) BETWEEN p_start_date AND p_end_date
    GROUP BY user_id, application_user
    ORDER BY total_actions DESC;
    
    -- 可疑活动检测
    SELECT 
        action_time,
        action_type,
        table_name,
        user_id,
        application_user,
        client_ip
    FROM audit_log
    WHERE DATE(action_time) BETWEEN p_start_date AND p_end_date
    AND (
        -- 大量删除操作
        action_type = 'DELETE'
        -- 非工作时间的活动
        OR HOUR(action_time) NOT BETWEEN 9 AND 18
        -- 异常IP访问
        OR client_ip NOT LIKE '192.168.%'
    );
END $$

-- 5. 创建审计清理程序
CREATE PROCEDURE clean_audit_logs(IN p_days_to_keep INT)
BEGIN
    -- 将旧日志归档
    INSERT INTO audit_log_archive
    SELECT *
    FROM audit_log
    WHERE action_time < DATE_SUB(CURRENT_DATEINTERVAL p_days_to_keep DAY);
    
    -- 删除旧日志
    DELETE FROM audit_log
    WHERE action_time < DATE_SUB(CURRENT_DATEINTERVAL p_days_to_keep DAY);
END $$

DELIMITER ;

-- 6. 创建审计警报程序
DELIMITER $$

CREATE PROCEDURE check_audit_alerts()
BEGIN
    -- 检查可疑活动
    INSERT INTO security_alerts (
        alert_type,
        description,
        severity,
        created_at
    )
    SELECT 
        'SUSPICIOUS_ACTIVITY',
        CONCAT(
            'Multiple delete operations (',
            COUNT(*),
            ') by user ',
            user_id,
            ' in the last hour'
        ),
        'HIGH',
        NOW()
    FROM audit_log
    WHERE action_type = 'DELETE'
    AND action_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
    GROUP BY user_id
    HAVING COUNT(*) > 100;
    
    -- 检查异常登录
    INSERT INTO security_alerts (
        alert_type,
        description,
        severity,
        created_at
    )
    SELECT 
        'ABNORMAL_LOGIN',
        CONCAT(
            'Multiple failed login attempts (',
            COUNT(*),
            ') from IP ',
            client_ip
        ),
        'HIGH',
        NOW()
    FROM audit_log
    WHERE action_type = 'LOGIN'
    AND action_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
    AND new_value->>'$.success' = 'false'
    GROUP BY client_ip
    HAVING COUNT(*) > 5;
END $$

DELIMITER ;


最后

给大家推荐一下GPT 4.0系统,一次性买了200多个Plus会员放在这个系统的池子里,无需梯子即可直连,费用还比官网便宜一半,包售后。更多介绍点击这里每月仅需88元!
我是岳哥欢迎关注,下期见~

SQL数据库开发
8年开发,5年管理,一个懂职场和AI的数据人。专注数据,Ai和职场等领域。回复「1024」,领取500G技术教程
 最新文章