Hands-On Guide: Efficient Local Restoration of Large Alibaba Cloud MySQL Snapshot Data

For large Alibaba Cloud RDS MySQL snapshot backup files, this article walks through how tuned MySQL configuration and a multi-process import script enable fast local restoration together with automated disk-space cleanup.

Background

I was handed a forensics-related task that required analyzing a database file. Once the file actually arrived, it turned out not to be a conventional physical data file but an Alibaba Cloud RDS MySQL snapshot backup. The first step was therefore not to analyze the data directly, but to fully restore the snapshot into a usable MySQL database instance.

After consulting the official Alibaba Cloud documentation "RDS MySQL 快照备份文件恢复到自建数据库" (restoring RDS MySQL snapshot backup files to a self-managed database), and weighing the actual data volume against the performance bottlenecks I ran into, the restore and optimization approach recorded in this article gradually took shape.

Snapshot Directory Structure

After extracting the snapshot files, the directory structure looks roughly like this:

├── db1/
│   ├── table_a/
│   │   ├── structure.sql
│   │   └── data/
│   │       ├── data-001.sql
│   │       └── data-002.sql
│   └── table_b/
│       ├── structure.sql
│       └── data/
│           └── data-001.sql
├── db2/
│   ├── users/
│   │   ├── structure.sql
│   │   └── data/
│   │       └── data_0001.sql
│   └── orders/
│       ├── structure.sql
│       └── data/
│           ├── data_0001.sql
│           └── data_0002.sql
└── db3/
    └── empty_table/
        └── structure.sql   # structure only; restored as an empty table

The layout confirms the following points (a quick programmatic check of the layout is sketched after this list):

  • Each database maps to one top-level directory
  • Each table maps to one subdirectory
  • structure.sql: the structure definition of the table (or database)
  • data/ directory: the table data (.sql or .csv files)
  • If only structure.sql exists, the table is restored as an empty table
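
Before restoring anything, it is worth confirming this layout programmatically and estimating how much data each database holds. The following is a minimal sketch along those lines; the backup path is a placeholder and the file-naming conventions are assumed to match the structure shown above.

Layout check and size estimate (sketch)
#!/usr/bin/python3
# Minimal sketch: verify the snapshot layout and estimate the data volume per database.
# Assumptions: the directory layout shown above; the backup path below is a placeholder.
import os

STRUCTURE_FILE_NAME = "structure.sql"
DATA_DIR_NAME = "data"

def summarize(root_dir):
    for db in sorted(os.listdir(root_dir)):
        db_path = os.path.join(root_dir, db)
        if not os.path.isdir(db_path):
            continue
        total_bytes, tables, empty_tables = 0, 0, 0
        for table in sorted(os.listdir(db_path)):
            t_path = os.path.join(db_path, table)
            if not os.path.isdir(t_path):
                continue
            tables += 1
            data_dir = os.path.join(t_path, DATA_DIR_NAME)
            if os.path.isdir(data_dir):
                total_bytes += sum(
                    os.path.getsize(os.path.join(data_dir, f))
                    for f in os.listdir(data_dir)
                    if f.endswith((".sql", ".csv"))
                )
            elif os.path.exists(os.path.join(t_path, STRUCTURE_FILE_NAME)):
                empty_tables += 1  # structure only: will be restored as an empty table
        print(f"{db}: {tables} tables ({empty_tables} structure-only), "
              f"~{total_bytes / 1024**3:.1f} GB of data files")

if __name__ == "__main__":
    summarize("/data/rds_backup")  # placeholder path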

Initial Restore Attempt

The first restore test followed the official Python sample script provided by Alibaba Cloud. In practice, however, several problems surfaced:

  • Huge data volume: the .sql files totaled more than 500 GB
  • Extremely slow import: no visible progress over long stretches
  • Low resource utilization: neither CPU nor I/O came anywhere near saturation

The core problems with the official script are:

  1. Single-threaded, sequential import
  2. Complete reliance on the default MySQL configuration
  3. Each SQL file is executed as a separate mysql invocation, incurring heavy per-file transaction and flush overhead (a simplified contrast is sketched just below)
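
To make point 3 concrete, the snippet below contrasts the official per-file approach with piping every file of a table through a single mysql session with relaxed checks and explicit commits. Host, credentials, and file names are placeholders; the production version of the streaming approach appears in the rewritten script later in this article.

Per-file invocation vs. a single streamed session (sketch)
#!/usr/bin/python3
# Illustrative only: connection details and file names are placeholders.
import shutil
import subprocess

sql_files = ["data-001.sql", "data-002.sql"]  # placeholder file list for one table

# Official approach: a new mysql process (new connection, default autocommit, full flushing) per file.
for f in sql_files:
    subprocess.run(f"mysql -h127.0.0.1 -P3306 -uroot -p'***' mydb < {f}", shell=True, check=True)

# Streamed alternative: one mysql process, one connection, relaxed checks, one commit per file.
proc = subprocess.Popen(
    ["mysql", "-h127.0.0.1", "-P3306", "-uroot", "-p***", "-Dmydb"],
    stdin=subprocess.PIPE, text=True,
)
proc.stdin.write("SET unique_checks=0; SET foreign_key_checks=0; SET autocommit=0;\n")
for f in sql_files:
    with open(f, "r", encoding="utf-8", errors="replace") as fh:
        shutil.copyfileobj(fh, proc.stdin)
    proc.stdin.write("\nCOMMIT;\n")
proc.stdin.close()
proc.wait()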

The official sample script is shown below, reproduced verbatim:

Official Python script source
#!/usr/bin/python
# ****************************************************************#
# ScriptName: restore_from_downloads
# Author: @alibaba-inc.com
# Function:
# ***************************************************************#

import os
import sys

STRUCTURE_FILE_NAME = "structure.sql"
DATA_PATH_PREFIX = "data"


def create_database(db_host, db_port, db_user, db_pass, create_stmt_file):
    cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + \
        db_user + " -p" + db_pass + " <" + create_stmt_file
    if os.system(cmd) != 0:
        print("[ERROR]: execute SQL failed. command: " + cmd)
        exit(1)


def create_table(db_host, db_port, db_user, db_pass, db_name, create_stmt_file):
    cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + \
        " -p" + db_pass + " -D" + db_name + " <" + create_stmt_file
    if os.system(cmd) != 0:
        print("[ERROR]: execute SQL failed. command: " + cmd)
        exit(1)


def import_file_csv_with_header(db_host, db_port, db_user, db_pass, csv_file, db, table, with_header=False):
    if with_header:
        in_file = open(csv_file)
        schema = in_file.readline().strip('\n')
        load_cmd = "load data local infile \"" + csv_file + "\" into table `" + db + "`.`" + table + "` character set utf8mb4 FIELDS TERMINATED BY \",\" enclosed by \"\\\"\" lines terminated by \"\\n\" ignore 1 lines" + \
            " (" + schema + ")"
        in_file.close()
    else:
        load_cmd = "load data local infile \"" + csv_file + "\" into table `" + db + "`.`" + table + "` character set utf8mb4 FIELDS TERMINATED BY \",\" enclosed by \"\\\"\""
    
    cmd = "mysql --local_infile=1 -h" + db_host + " -P" + db_port + " -u" + \
        db_user + " -p" + db_pass + " -e '" + load_cmd + "'"

    print("[INFO]: trying to exec: " + cmd)
    if os.system(cmd) != 0:
        print("[ERROR]: execute SQL failed. command: " + cmd)
        exit(1)


def import_file_sql(db_host, db_port, db_user, db_pass, sql_file):
    cmd = "mysql -h" + db_host + " -P" + db_port + \
        " -u" + db_user + " -p" + db_pass + " <" + sql_file
    print("[INFO]: trying to exec: " + cmd)
    if os.system(cmd) != 0:
        print("[ERROR]: execute SQL failed. command: " + cmd)
        exit(1)


def print_usage():
    print(
        "Usage: python ./restore_mysql.py [backupset_directory] [database_host] [database_port] [database_username] [database_password]")


enable_foreign_key_check = None

def read_db_foreign_key_enable():
    global enable_foreign_key_check
    cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + " -p" + db_pass + " -e " + "'SELECT @@FOREIGN_KEY_CHECKS'" # + " | awk '{print $1}' | sed -n '2,1p' "
    from subprocess import check_output
    try:
        cmd_output = check_output(cmd, shell=True)
        cmd_output = cmd_output.decode()
        foreign_check_enable = cmd_output.split("\n")[1]
        if foreign_check_enable == "1":
            print("[INFO]: foreign key check is on")
            enable_foreign_key_check = True
        elif foreign_check_enable == "0":
            print("[INFO]: foreign key check is off")
            enable_foreign_key_check = False
    except Exception:
        print("[WARN] try to get foreign key config failed. won't change foreign key config for MySQL")
        # do nothing


def do_disable_foreign_key_check():
    global enable_foreign_key_check
    if enable_foreign_key_check is True:
        print("[INFO]: try to disable foreign key check before importing data...")
        cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + " -p" + db_pass + " -e " + "'SET GLOBAL FOREIGN_KEY_CHECKS=0'"
        if os.system(cmd) != 0:
            print("[ERROR]: execute SQL failed. command: " + cmd)
            exit(1)
        print("[INFO]: success disable foreign key check")
    else:
        print("[INFO]: no need to disable foreign key check before importing data")

def do_enable_foreign_key_check():
    global enable_foreign_key_check
    if enable_foreign_key_check is True:
        print("[INFO]: try to enable foreign key check after importing data...")
        cmd = "mysql -h" + db_host + " -P" + db_port + " -u" + db_user + " -p" + db_pass + " -e " + "'SET GLOBAL FOREIGN_KEY_CHECKS=1'"
        if os.system(cmd) != 0:
            print("[ERROR]: execute SQL failed. command: " + cmd)
            exit(1)
        print("[INFO]: success enable foreign key check")
    else:
        print("[INFO]: no need to enable foreign key check after importing data")

if __name__ == '__main__':
    if len(sys.argv) != 6:
        print_usage()
        exit()

    root_dir = os.path.abspath(sys.argv[1])
    db_host = sys.argv[2]
    db_port = sys.argv[3]
    db_user = sys.argv[4]
    db_pass = sys.argv[5]
    print("[INFO]: restore data from " + root_dir +
          " to " + db_host + ":" + db_port)

    read_db_foreign_key_enable()

    do_disable_foreign_key_check()

    try:
        db_dirs = os.listdir(root_dir)
        for db_dir in db_dirs:
            dir_path = os.path.join(root_dir, db_dir)
            if not os.path.isdir(dir_path):
                continue
            db_structure_file = os.path.join(dir_path, STRUCTURE_FILE_NAME)
            create_database(db_host, db_port, db_user, db_pass, db_structure_file)
            print("[INFO]: restore structure database: " + db_dir + " ends")

            table_dirs = os.listdir(dir_path)
            for table_dir in table_dirs:
                table_dir_path = os.path.join(dir_path, table_dir)
                if not os.path.isdir(table_dir_path):
                    continue
                table_structure_file = os.path.join(table_dir_path, STRUCTURE_FILE_NAME)
                create_table(db_host, db_port, db_user, db_pass, db_dir, table_structure_file)
                print("[INFO]: restore structure table: " + table_dir + " ends")

                table_data_dir_path = os.path.join(table_dir_path, DATA_PATH_PREFIX)
                if not os.path.isdir(table_data_dir_path):
                    continue
                
                filename_slices = os.listdir(table_data_dir_path)[0].split(".")
                files_format = filename_slices[-1]
                if files_format == "csv":
                    with_header = len(filename_slices) > 1 and filename_slices[-2]=="wh" # .wh.csv is csv with header, .csv is csv without header
                    csv_files = os.listdir(table_data_dir_path)
                    csv_count = 0
                    for csv_file in csv_files:
                        csv_file_path = os.path.join(table_data_dir_path, csv_file)
                        file_size = os.path.getsize(csv_file_path)
                        if file_size > 0:
                            import_file_csv_with_header(db_host, db_port, db_user,
                                            db_pass, csv_file_path, db_dir, table_dir, with_header)
                            csv_count = csv_count + 1
                            print("[INFO]: restore data [" + str(csv_count) + "/" + str(
                                len(csv_files)) + "] of table " + db_dir + "." + table_dir)
                elif files_format == "sql":
                    sql_files = os.listdir(table_data_dir_path)
                    sql_count = 0
                    for sql_file in sql_files:
                        sql_file_path = os.path.join(table_data_dir_path, sql_file)
                        file_size = os.path.getsize(sql_file_path)
                        if file_size > 0:
                            import_file_sql(db_host, db_port,
                                            db_user, db_pass, sql_file_path)
                            sql_count = sql_count + 1
                            print("[INFO]: restore data [" + str(sql_count) + "/" + str(
                                len(sql_files)) + "] of table " + db_dir + "." + table_dir)
    finally:
        do_enable_foreign_key_check()

Performance Bottleneck Analysis

Taken together, the bottlenecks fall into two areas:

  1. The MySQL server parameters are not tuned for large-scale imports
  2. The import logic is single-threaded and cannot exploit multiple CPU cores or concurrent disk I/O

The optimization therefore targets two things:

  • Dedicated MySQL configuration tuning
  • A rewritten import script with parallel, high-throughput loading

MySQL Configuration Tuning

For a one-off bulk import, my.cnf is adjusted around the following principles:

  • Maximize memory and I/O utilization
  • Temporarily disable durability and consistency features that hurt write throughput
  • Restore the regular production configuration once the import finishes (a verification sketch follows the configuration block below)

The main changes include, but are not limited to:

  • Disable the binlog and the doublewrite buffer
  • Relax transaction flushing at commit
  • Increase innodb_buffer_pool_size
  • Increase max_allowed_packet
  • Turn off performance and monitoring instrumentation
Suggested my.cnf configuration for large-file imports
## MySQL 5.7 configuration; adjust for other versions as needed
[client]
port = 3306
socket = /tmp/mysql.sock
default-character-set = utf8mb4

[mysql]
no-auto-rehash
default-character-set = utf8mb4

[mysqld]
# ============================================
# Basic settings
# ============================================
user = mysql
port = 3306
basedir = /usr/local/mysql
datadir = /usr/local/mysql/data
socket = /tmp/mysql.sock
pid-file = /usr/local/mysql/data/mysql.pid
tmpdir = /tmp

character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
default-storage-engine = InnoDB

# ============================================
# Network tuning (large-file imports)
# ============================================
max_connections = 200
max_allowed_packet = 1G
connect_timeout = 600
wait_timeout = 28800
interactive_timeout = 28800
net_read_timeout = 600
net_write_timeout = 600
net_buffer_length = 16K
back_log = 300

# ============================================
# Security settings
# ============================================
skip-name-resolve
local_infile = ON
explicit_defaults_for_timestamp = ON
lower_case_table_names = 0
sql_mode = NO_ENGINE_SUBSTITUTION

# ============================================
# Logging (minimized during import)
# ============================================
log-error = /usr/local/mysql/data/mysql-error.log
log_warnings = 2
slow_query_log = OFF
long_query_time = 2
slow_query_log_file = /usr/local/mysql/data/mysql-slow.log
general_log = OFF

# Binary log (disabled during import)
skip-log-bin
# log-bin = /usr/local/mysql/data/mysql-bin
# expire_logs_days = 7
# max_binlog_size = 1G
# binlog_format = ROW
# sync_binlog = 0

# ============================================
# Cache tuning (8-core CPU)
# ============================================
thread_cache_size = 100
table_open_cache = 4096
table_definition_cache = 2048
table_open_cache_instances = 8

# Query cache (disabled)
query_cache_type = 0
query_cache_size = 0

# Sort and join buffers
sort_buffer_size = 16M
join_buffer_size = 16M
read_buffer_size = 8M
read_rnd_buffer_size = 16M

# MyISAM key cache
key_buffer_size = 128M

# Temporary tables (for a 16 GB RAM host)
tmp_table_size = 1G
max_heap_table_size = 1G

# ============================================
# InnoDB core settings (16 GB RAM + ~50k IOPS)
# ============================================

# Buffer pool (~70% of 16 GB = 11 GB)
innodb_buffer_pool_size = 11G
innodb_buffer_pool_instances = 8
innodb_buffer_pool_chunk_size = 128M

# Redo log
innodb_log_buffer_size = 128M
innodb_log_file_size = 2G
innodb_log_files_in_group = 2

# Flush policy (0 is fastest for the import; change back to 2 afterwards)
innodb_flush_log_at_trx_commit = 0

# Flush method (O_DIRECT recommended on SSD)
innodb_flush_method = O_DIRECT

# Data files
innodb_data_file_path = ibdata1:1G:autoextend
innodb_autoextend_increment = 256

# I/O threads (8-core CPU)
innodb_read_io_threads = 8
innodb_write_io_threads = 8
innodb_purge_threads = 4
innodb_page_cleaners = 4

# I/O capacity (for a ~50k IOPS disk)
innodb_io_capacity = 10000
innodb_io_capacity_max = 20000

# Dirty-page thresholds (raised during import)
innodb_max_dirty_pages_pct = 90
innodb_max_dirty_pages_pct_lwm = 10

# SSD optimization
innodb_flush_neighbors = 0

# Doublewrite buffer (disabled during import; re-enable afterwards)
innodb_doublewrite = OFF

# Locking
innodb_lock_wait_timeout = 300
innodb_rollback_on_timeout = ON
innodb_autoinc_lock_mode = 2

# Adaptive hash index (disabled during import)
innodb_adaptive_hash_index = OFF

# Change buffering (maximized)
innodb_change_buffering = all
innodb_change_buffer_max_size = 50

# Undo logs
innodb_undo_tablespaces = 3
innodb_undo_logs = 128
innodb_undo_log_truncate = ON
innodb_max_undo_log_size = 1G

# Concurrency control (0 = automatic)
innodb_thread_concurrency = 0
innodb_concurrency_tickets = 5000

# Read-ahead (disabled during import)
innodb_read_ahead_threshold = 0
innodb_random_read_ahead = OFF

# File settings
innodb_file_per_table = ON
innodb_open_files = 4096

# Monitoring
innodb_print_all_deadlocks = OFF

# Sync tuning
innodb_sync_array_size = 8
innodb_sync_spin_loops = 30

# LRU scan depth
innodb_lru_scan_depth = 512

# Statistics
innodb_stats_persistent = ON
innodb_stats_auto_recalc = ON
innodb_stats_persistent_sample_pages = 20
innodb_stats_on_metadata = OFF

# Compressed-page redo logging
innodb_log_compressed_pages = OFF

# Index sort buffer
innodb_sort_buffer_size = 64M

# Online DDL
innodb_online_alter_log_max_size = 1G

# Buffer pool dump/reload
innodb_buffer_pool_dump_at_shutdown = ON
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_pct = 25

# Fast shutdown
innodb_fast_shutdown = 1

# Strict mode (off during import)
innodb_strict_mode = OFF

# ============================================
# Performance
# ============================================
performance_schema = OFF
symbolic-links = 0
skip-external-locking
open_files_limit = 65535
transaction_isolation = READ-COMMITTED
autocommit = ON

# Optimizer
eq_range_index_dive_limit = 200
optimizer_switch = 'index_merge=on,index_merge_union=on,index_merge_sort_union=on,index_merge_intersection=on,engine_condition_pushdown=on,index_condition_pushdown=on,mrr=on,mrr_cost_based=on,block_nested_loop=on,batched_key_access=on,materialization=on,semijoin=on,loosescan=on,firstmatch=on,duplicateweedout=on,subquery_materialization_cost_based=on,use_index_extensions=on,condition_fanout_filter=on,derived_merge=on'

# On-disk temporary table engine
internal_tmp_disk_storage_engine = InnoDB

# ============================================
# Import-specific settings
# ============================================
bulk_insert_buffer_size = 128M
delay_key_write = ALL
myisam_sort_buffer_size = 128M
myisam_max_sort_file_size = 10G

# ============================================
# Miscellaneous
# ============================================
max_connect_errors = 1000
max_prepared_stmt_count = 16382
thread_stack = 256K
max_sort_length = 1024
max_length_for_sort_data = 4096
lock_wait_timeout = 31536000
host_cache_size = 0
event_scheduler = OFF
secure_file_priv = ''

[mysqldump]
quick
quote-names
max_allowed_packet = 1G
default-character-set = utf8mb4
single-transaction
skip-lock-tables

[myisamchk]
key_buffer_size = 256M
sort_buffer_size = 256M
read_buffer = 128M
write_buffer = 128M

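Before pushing hundreds of gigabytes through the server, it is worth confirming that the import-time settings above actually took effect, and remembering to revert the risky ones (binlog, doublewrite, and flush settings in particular) once the import is done. The following is a minimal verification sketch; the connection parameters are placeholders and the expected values simply mirror the my.cnf above.

Configuration sanity check (sketch)
#!/usr/bin/python3
# Minimal sketch: verify that the import-oriented settings are active before loading data.
# Connection parameters are placeholders; expected values mirror the my.cnf above.
import subprocess

CHECKS = {
    "innodb_buffer_pool_size": "11811160064",    # 11G in bytes
    "innodb_flush_log_at_trx_commit": "0",
    "innodb_doublewrite": "OFF",
    "log_bin": "OFF",
    "local_infile": "ON",
    "max_allowed_packet": "1073741824",          # 1G in bytes
}

def current_value(var):
    # -N suppresses column headers, -B produces tab-separated "name<TAB>value" output
    out = subprocess.check_output(
        ["mysql", "-h127.0.0.1", "-P3306", "-uroot", "-p***", "-N", "-B",
         "-e", f"SHOW GLOBAL VARIABLES LIKE '{var}'"],
        text=True,
    )
    return out.strip().split("\t")[1] if out.strip() else "(not found)"

for var, expected in CHECKS.items():
    actual = current_value(var)
    status = "OK  " if actual == expected else "WARN"
    print(f"[{status}] {var} = {actual} (expected {expected})")
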
Rewriting the Import Script for Multi-Process Loading

Even with MySQL itself tuned, single-threaded import still could not meet the efficiency requirements, so the official script was rewritten from scratch with the following core goals:

Design Goals

  • Multi-process parallel import
  • Automatic detection of structure/data completeness
  • Support for mixed SQL and CSV formats
  • Automatic deletion of source files after import to reclaim disk space
  • An import process that is visible, controllable, and interruptible

Core Capabilities of the Script

  • Pre-scan the backup directory and produce an import report
  • Automatically classify tables into:
    • Structure + data complete (normal import)
    • Structure only (empty table)
    • Data without structure (serious error, needs manual handling)
  • Scale concurrency dynamically with the number of CPU cores
  • Schedule at table granularity for load balancing
  • Delete .sql / .csv files automatically once their data is imported

Concurrency Strategy

  • Structure import phase: all cores in parallel (this phase is very short)
  • Data import phase: reserve 2 cores for MySQL, use the rest for import workers
  • Use imap_unordered so that no worker process sits idle
Multi-process import script (Python 3)
#!/usr/bin/python3
# ****************************************************************#
# ScriptName: auto_clean_restore.py (v6.0 CN + AutoClean)
# Function: pre-check -> confirm -> import -> auto-delete source files
# ***************************************************************#

import os
import sys
import subprocess
import shutil
from multiprocessing import Pool, cpu_count

STRUCTURE_FILE_NAME = "structure.sql"
DATA_PATH_PREFIX = "data"

# --- Helper: build the mysql client command ---
def get_cmd(db_host, db_port, db_user, db_pass, db_name=None):
    cmd = [
        'mysql',
        f'-h{db_host}', f'-P{db_port}', f'-u{db_user}', f'-p{db_pass}',
        '--default-character-set=utf8mb4',
        '--net_buffer_length=1M',
        '--max_allowed_packet=1G'
    ]
    if db_name:
        cmd.append(f'-D{db_name}')
    return cmd

# --- Scan logic ---
def scan_backup_directory(root_dir):
    """Scan the backup directory and return the classified task list."""
    valid_tasks = []
    
    report = {
        "total_tables": 0,
        "ready_full": [],      # 结构 + 数据 (完美)
        "ready_struct": [],    # 仅结构 (空表)
        "missing_struct": [],  # 有数据但无结构 (严重错误)
        "useless": []          # 啥都没 (废弃目录)
    }

    # Sort for readable output
    db_dirs = sorted([d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))])
    
    for db in db_dirs:
        db_path = os.path.join(root_dir, db)
        tables = sorted([t for t in os.listdir(db_path) if os.path.isdir(os.path.join(db_path, t))])
        
        for t in tables:
            t_path = os.path.join(db_path, t)
            report["total_tables"] += 1
            identifier = f"{db}.{t}"

            # Check for the structure file
            struc_file = os.path.join(t_path, STRUCTURE_FILE_NAME)
            has_struct = os.path.exists(struc_file) and os.path.getsize(struc_file) > 0

            # Check for data files
            data_dir = os.path.join(t_path, DATA_PATH_PREFIX)
            has_data = False
            if os.path.isdir(data_dir):
                # Count only non-empty .sql or .csv files
                valid_files = [f for f in os.listdir(data_dir) 
                               if (f.endswith('.sql') or f.endswith('.csv')) 
                               and os.path.getsize(os.path.join(data_dir, f)) > 0]
                if valid_files:
                    has_data = True

            # Classify
            task_info = (db, t, t_path) 

            if has_struct and has_data:
                report["ready_full"].append(identifier)
                valid_tasks.append(task_info)
            elif has_struct and not has_data:
                report["ready_struct"].append(identifier)
                valid_tasks.append(task_info)  # structure only
            elif not has_struct and has_data:
                report["missing_struct"].append(identifier)
                # Cannot be imported; not added to the task list
            else:
                report["useless"].append(identifier)

    return report, valid_tasks

# --- Worker 1: import table structure (with auto-cleanup) ---
def restore_structure_worker(args):
    db_name, table_name, table_path, db_host, db_port, db_user, db_pass = args
    struc_file = os.path.join(table_path, STRUCTURE_FILE_NAME)
    
    if not os.path.exists(struc_file): return True

    try:
        cmd = get_cmd(db_host, db_port, db_user, db_pass, db_name)
        with open(struc_file, 'r', encoding='utf-8', errors='replace') as f:
            res = subprocess.run(cmd, stdin=f, capture_output=True, text=True)
        
        if res.returncode != 0:
            if "already exists" in res.stderr: 
                # An existing table also counts as success; still try to delete the file
                pass 
            else:
                raise Exception(res.stderr.strip())
        
        # [Auto-clean] Import succeeded: delete structure.sql
        try:
            os.remove(struc_file)
        except OSError:
            pass

        return True
    except Exception as e:
        print(f"[结构失败] `{db_name}`.`{table_name}`: {e}", flush=True)
        return False

# --- Worker 2: import table data (with auto-cleanup) ---
def restore_data_worker(args):
    db_name, table_name, table_path, db_host, db_port, db_user, db_pass = args
    
    data_dir = os.path.join(table_path, DATA_PATH_PREFIX)
    if not os.path.isdir(data_dir): 
        # No data directory: try to remove the table directory (if empty) to tidy up
        try: os.rmdir(table_path)
        except: pass
        return True
    
    files = sorted([os.path.join(data_dir, f) for f in os.listdir(data_dir) if os.path.getsize(os.path.join(data_dir, f)) > 0])
    if not files:
        try: os.rmdir(data_dir)
        except: pass
        return True

    is_csv = files[0].endswith('.csv')
    process = None

    try:
        print(f"[数据导入-{os.getpid()}]: 🚀 开始 `{db_name}`.`{table_name}` (共 {len(files)} 个文件)", flush=True)
        cmd = get_cmd(db_host, db_port, db_user, db_pass, db_name)
        if is_csv: cmd.append('--local-infile=1')

        process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1024*1024)

        # Quick connection check
        try:
            process.wait(timeout=0.1)
            out, err = process.communicate()
            raise Exception(f"数据库连接断开: {err.strip()}")
        except subprocess.TimeoutExpired:
            pass

        # --- Strategy A: streamed SQL import ---
        if not is_csv:
            process.stdin.write("SET unique_checks=0; SET foreign_key_checks=0; SET sql_log_bin=0; SET autocommit=0;\n")
            for fpath in files:
                with open(fpath, 'r', encoding='utf-8', errors='replace', buffering=1024*1024) as f:
                    shutil.copyfileobj(f, process.stdin)
                # Commit once per file
                process.stdin.write("\nCOMMIT;\nSET autocommit=0;\n")
            process.stdin.write("COMMIT;\n")
            process.stdin.close()
        
        # --- Strategy B: CSV import ---
        else:
            csv_cmds = ["SET unique_checks=0; SET foreign_key_checks=0; SET sql_log_bin=0; SET autocommit=0;"]
            schema = ""
            parts = os.path.basename(files[0]).split('.')
            if len(parts) > 1 and parts[-2] == "wh":
                with open(files[0], 'r', encoding='utf-8') as f: schema = f.readline().strip()
            for fpath in files:
                path_str = os.path.abspath(fpath).replace('\\', '/')
                load = f"LOAD DATA LOCAL INFILE '{path_str}' INTO TABLE `{table_name}` CHARACTER SET utf8mb4 FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\n'"
                if schema: load += f" IGNORE 1 LINES ({schema})"
                csv_cmds.append(load + "; COMMIT; SET autocommit=0;")
            csv_cmds.append("COMMIT;")
            out, err = process.communicate(input="\n".join(csv_cmds))
            if process.returncode != 0: raise Exception(err)

        # Wait for completion
        if process.poll() is None: process.wait()
        if process.returncode != 0:
            _, err = process.communicate()
            raise Exception(err.strip())

        # [Auto-clean] Import succeeded: delete the data files
        for fpath in files:
            try: os.remove(fpath)
            except OSError: pass
        
        # Try to remove the now-empty data directory
        try: os.rmdir(data_dir)
        except OSError: pass

        # Try to remove the table directory (once structure and data files are all gone)
        try: os.rmdir(table_path)
        except OSError: pass

        print(f"[数据完成-{os.getpid()}]: ✅ 已清理 `{db_name}`.`{table_name}`", flush=True)
        return True

    except Exception as e:
        print(f"[数据失败] `{db_name}`.`{table_name}`: {e}", flush=True)
        if process and process.poll() is None: process.terminate()
        return False

# --- Main ---
def main():
    if len(sys.argv) != 6:
        print("用法: python3 auto_clean_restore.py [目录] [主机] [端口] [用户] [密码]")
        sys.exit(1)

    root, host, port, user, password = os.path.abspath(sys.argv[1]), sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5]

    print(f"--- 🔍 阶段 0: 正在扫描备份目录: {root} ---")
    print("文件扫描中... 请稍候...")
    
    report, valid_tasks = scan_backup_directory(root)

    # --- Print the pre-check report ---
    print("\n" + "="*60)
    print(f"📑 Import pre-check report (tables scanned: {report['total_tables']})")
    print("="*60)

    # 1. Serious problems
    if report['missing_struct']:
        print(f"\n🚨 [SERIOUS] Missing table structure (data present but no CREATE TABLE):")
        print(f"   Count: {len(report['missing_struct'])}")
        print("-" * 30)
        for item in report['missing_struct']:
            print(f"   ❌ {item}")
        print("-" * 30)
        print("   👉 These tables cannot be restored automatically and need manual handling!")
    else:
        print("\n✅ No missing-structure problems found.")

    # 2. Empty tables
    if report['ready_struct']:
        print(f"\n⚠️  [NOTE] Structure only (empty tables):")
        print(f"   Count: {len(report['ready_struct'])}")
        if len(report['ready_struct']) < 10:
            for item in report['ready_struct']: print(f"   - {item}")
        else:
            print(f"   - (list too long; {len(report['ready_struct'])} empty tables in total)")

    # 3. Ready for full import
    print(f"\n📦 [READY] Full import (structure + data):")
    print(f"   Count: {len(report['ready_full'])}")

    # 4. Dead directories
    if report['useless']:
        print(f"\n🗑️  [IGNORED] No structure and no data (dead directories):")
        print(f"   Count: {len(report['useless'])}")

    print("="*60)
    print(f"\n有效导入任务总数: {len(valid_tasks)}")
    print("⚠️  警告: 导入成功后,源文件 (.sql/.csv) 将被【永久删除】以释放空间!")
    
    # --- Confirm before running ---
    if not valid_tasks:
        print("❌ No tables to import. Exiting.")
        sys.exit(0)

    user_input = input("\n🚀 Start the import with automatic cleanup? (type y to confirm): ").strip().lower()
    if user_input != 'y':
        print("⛔️ Cancelled by user.")
        sys.exit(0)

    # --- Run ---
    # Prepare worker arguments
    restore_args = [ (t[0], t[1], t[2], host, port, user, password) for t in valid_tasks ]
    
    # Phase 1: create the databases
    print(f"\n>>> [Phase 1] Creating databases...")
    unique_dbs = set([t[0] for t in valid_tasks])
    for db in unique_dbs:
        subprocess.run(get_cmd(host, port, user, password), input=f"CREATE DATABASE IF NOT EXISTS `{db}`;", text=True, stderr=subprocess.DEVNULL)

    # Phase 2: import table structures
    print(f"\n>>> [Phase 2] Importing table structures in parallel...")
    # Structure import is fast; use every core
    with Pool(cpu_count()) as pool:
        pool.map(restore_structure_worker, restore_args)

    # Phase 3: import data
    print(f"\n>>> [Phase 3] Importing data in parallel (source files deleted on success)...")
    # Data import is heavy; keep 2 cores free for MySQL itself
    workers = max(1, cpu_count() - 2)
    print(f"Starting {workers} worker processes (load-balanced)...")
    with Pool(workers) as pool:
        # chunksize=1: each worker takes one table, finishes it, then immediately grabs the next, so no process idles
        list(pool.imap_unordered(restore_data_worker, restore_args, chunksize=1))

    print("\n--- 🎉 全部完成!源文件已清理 ---")

if __name__ == '__main__':
    main()
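
With the tuned my.cnf in place, the script is run against the extracted backup directory using the same arguments as its usage string, for example (placeholder path and credentials): python3 auto_clean_restore.py /data/rds_backup 127.0.0.1 3306 root '***'. It prints the pre-check report first and only begins importing, and deleting source files, after an explicit y confirmation.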

Results

Under identical hardware conditions, the comparison is as follows:

Metric              Official script        Optimized approach
CPU utilization     < 20%                  80%+
I/O throughput      Very low               Close to the disk's limit
Import speed        A few GB per hour      Tens of GB per hour
Disk usage          Keeps growing          Freed in real time

For very large snapshots (hundreds of gigabytes and more), this approach shortens the recovery time significantly and avoids having the job aborted by running out of disk space.

Summary

When restoring very large database snapshots, the performance bottleneck is usually not the SQL itself but the server configuration and the concurrency model of the import.