1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
|
import pymysql import sys import time from tqdm import tqdm
class MySQLMigrator:
    """Copy tables (structure and rows) from a source MySQL database to a target one.

    Only base tables that do not already exist in the target database are
    migrated. Tables are created in foreign-key dependency order, and
    ``FOREIGN_KEY_CHECKS`` is disabled on the target session while structure
    and data are copied. Views, routines, triggers and events are not copied.

    Args:
        source_config: keyword arguments passed to ``pymysql.connect`` for the
            source server; ``'database'`` is required.
        target_config: keyword arguments passed to ``pymysql.connect`` for the
            target server; ``'database'`` is required.
    """

    def __init__(self, source_config: dict, target_config: dict):
        self.source_config = source_config
        self.target_config = target_config
        # Live connections; opened by connect_databases() and closed at the
        # end of migrate_database().
        self.source_conn = None
        self.target_conn = None

    @staticmethod
    def _quote_identifier(name) -> str:
        """Return *name* as a backtick-quoted MySQL identifier (embedded backticks doubled)."""
        return "`" + str(name).replace("`", "``") + "`"

    def connect_databases(self):
        """Open the source and target connections.

        If the target database does not exist (MySQL error 1049, "Unknown
        database"), try to create it via _create_target_database_via_mysql(),
        which also opens the target connection.

        Raises:
            Exception: wrapping any connection or creation failure.
        """
        try:
            self.source_conn = pymysql.connect(**self.source_config)
            print("源数据库连接成功")

            try:
                self.target_conn = pymysql.connect(**self.target_config)
                print(f"成功连接到目标业务数据库 `{self.target_config['database']}`")
            except pymysql.err.OperationalError as e:
                error_code = e.args[0] if e.args else None
                if error_code != 1049:
                    raise
                print(f"目标数据库 `{self.target_config['database']}` 不存在,尝试创建...")
                self._create_target_database_via_mysql()
        except Exception as e:
            raise Exception(f"数据库连接失败: {str(e)}") from e

    def _create_target_database_via_mysql(self):
        """Create the target database with the source's charset/collation, then connect to it.

        A temporary server-level connection is used to issue
        ``CREATE DATABASE IF NOT EXISTS``; it is always closed afterwards.

        Raises:
            Exception: with a permission hint when the server answers 1044
                (access denied to database), otherwise wrapping the failure.
        """
        target_db = self.target_config['database']
        try:
            charset, collation = self.get_database_charset()

            # Connect without selecting any schema. Selecting the `mysql`
            # system schema would additionally require privileges on it, and
            # the resulting 1044 would be misreported as "cannot create".
            temp_target_config = self.target_config.copy()
            temp_target_config.pop('database', None)
            temp_target_config.pop('db', None)

            temp_conn = pymysql.connect(**temp_target_config)
            try:
                with temp_conn.cursor() as cursor:
                    cursor.execute(
                        f"CREATE DATABASE IF NOT EXISTS {self._quote_identifier(target_db)} "
                        f"CHARACTER SET {charset} COLLATE {collation}"
                    )
                temp_conn.commit()
            finally:
                # Release the temporary connection even if CREATE fails.
                temp_conn.close()
            print(f"目标数据库 `{target_db}` 创建成功。")

            self.target_conn = pymysql.connect(**self.target_config)
            print(f"成功连接到新创建的目标数据库 `{target_db}`")
        except pymysql.err.OperationalError as e:
            error_code = e.args[0] if e.args else None
            if error_code == 1044:
                raise Exception(
                    f"无法创建目标数据库。当前用户 `{self.target_config.get('user')}` 没有创建数据库的权限。\n"
                    f"请先使用管理员账户(如root)在目标MySQL服务器上手动创建数据库 `{self.target_config['database']}`,\n"
                    f"或者授予当前用户创建数据库的权限(如 GRANT CREATE ON *.* TO ...)。"
                ) from e
            raise
        except Exception as e:
            raise Exception(f"通过'mysql'系统库创建目标数据库失败: {str(e)}") from e

    def get_database_charset(self) -> tuple:
        """Return ``(default_charset, default_collation)`` of the source database.

        Raises:
            Exception: wrapping any query failure.
        """
        try:
            with self.source_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME "
                    "FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = %s",
                    (self.source_config['database'],)
                )
                result = cursor.fetchone()
            return result[0], result[1]
        except Exception as e:
            raise Exception(f"获取数据库字符集失败: {str(e)}") from e

    def get_all_tables(self) -> list:
        """Return the names of all base tables (views excluded) in the source database.

        Raises:
            Exception: wrapping any query failure.
        """
        try:
            with self.source_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM information_schema.TABLES "
                    "WHERE TABLE_SCHEMA = %s AND TABLE_TYPE = 'BASE TABLE'",
                    (self.source_config['database'],)
                )
                return [row[0] for row in cursor.fetchall()]
        except Exception as e:
            raise Exception(f"获取表列表失败: {str(e)}") from e

    def table_exists_in_target(self, table_name: str) -> bool:
        """Return True if *table_name* already exists in the target database.

        Raises:
            Exception: wrapping any query failure.
        """
        try:
            with self.target_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT COUNT(*) FROM information_schema.TABLES "
                    "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s",
                    (self.target_config['database'], table_name)
                )
                return cursor.fetchone()[0] > 0
        except Exception as e:
            raise Exception(f"检查表存在性失败: {str(e)}") from e

    def get_table_creation_order(self) -> list:
        """Return all source tables ordered so referenced tables precede referencing ones.

        Self-referencing foreign keys are ignored (they do not constrain the
        order). Only foreign keys whose referenced table lives in the same
        source schema are considered.

        Raises:
            Exception: if a foreign-key cycle spanning two or more tables is
                found, or on any query failure.
        """
        try:
            schema = self.source_config['database']
            with self.source_conn.cursor() as cursor:
                cursor.execute("""
                    SELECT TABLE_NAME, REFERENCED_TABLE_NAME
                    FROM information_schema.KEY_COLUMN_USAGE
                    WHERE CONSTRAINT_SCHEMA = %s
                      AND REFERENCED_TABLE_SCHEMA = %s
                      AND REFERENCED_TABLE_NAME IS NOT NULL
                """, (schema, schema))
                fk_rows = cursor.fetchall()

            # table -> referenced tables, de-duplicated (composite FKs yield
            # one row per column) with insertion order kept for determinism.
            dependencies = {}
            for table, referenced_table in fk_rows:
                if referenced_table == table:
                    continue  # e.g. parent_id -> id; not a real ordering constraint
                refs = dependencies.setdefault(table, [])
                if referenced_table not in refs:
                    refs.append(referenced_table)

            in_progress, finished = 1, 2
            state = {}
            creation_order = []

            def visit(table_name):
                # Depth-first post-order: dependencies are appended first.
                status = state.get(table_name)
                if status == finished:
                    return
                if status == in_progress:
                    raise Exception(f"检测到循环外键依赖,涉及表: {table_name}")
                state[table_name] = in_progress
                for ref_table in dependencies.get(table_name, ()):
                    visit(ref_table)
                state[table_name] = finished
                creation_order.append(table_name)

            for table in self.get_all_tables():
                visit(table)

            return creation_order
        except Exception as e:
            raise Exception(f"分析表依赖关系失败: {str(e)}") from e

    def migrate_table_structure(self, table_name: str):
        """Recreate *table_name* in the target from the source's SHOW CREATE TABLE.

        Any existing target table of that name is dropped first.

        Raises:
            Exception: wrapping any failure.
        """
        try:
            quoted_table = self._quote_identifier(table_name)
            with self.source_conn.cursor() as source_cursor:
                source_cursor.execute(f"SHOW CREATE TABLE {quoted_table}")
                create_table_sql = source_cursor.fetchone()[1]

            with self.target_conn.cursor() as target_cursor:
                target_cursor.execute(f"DROP TABLE IF EXISTS {quoted_table}")
                target_cursor.execute(create_table_sql)

            self.target_conn.commit()
        except Exception as e:
            raise Exception(f"迁移表结构失败: {str(e)}") from e

    def migrate_table_data(self, table_name: str, batch_size: int = 1000) -> int:
        """Copy every row of *table_name* to the target, committing every *batch_size* rows.

        Rows are read with one unbuffered (streaming) SELECT, so memory use is
        bounded by *batch_size* and each row is read exactly once. Generated
        (VIRTUAL/STORED) columns are skipped because MySQL rejects explicit
        values for them; the target recomputes them.

        Returns:
            The number of rows inserted into the target.

        Raises:
            Exception: wrapping any failure, including a non-positive *batch_size*.
        """
        try:
            if batch_size <= 0:
                raise ValueError(f"batch_size must be positive, got {batch_size}")

            quoted_table = self._quote_identifier(table_name)
            with self.source_conn.cursor() as meta_cursor:
                meta_cursor.execute(
                    "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
                    "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s "
                    "AND COALESCE(EXTRA, '') NOT LIKE %s "
                    "AND COALESCE(EXTRA, '') NOT LIKE %s "
                    "ORDER BY ORDINAL_POSITION",
                    (self.source_config['database'], table_name,
                     '%VIRTUAL GENERATED%', '%STORED GENERATED%')
                )
                columns = [row[0] for row in meta_cursor.fetchall()]

            columns_str = ', '.join(self._quote_identifier(col) for col in columns)
            placeholders = ', '.join(['%s'] * len(columns))
            insert_sql = f"INSERT INTO {quoted_table} ({columns_str}) VALUES ({placeholders})"

            rows_migrated = 0
            # A single streaming SELECT replaces LIMIT/OFFSET paging: without
            # ORDER BY, successive pages are not guaranteed to be disjoint, and
            # each OFFSET rescans the skipped rows (quadratic total cost).
            with self.source_conn.cursor(pymysql.cursors.SSCursor) as source_cursor:
                source_cursor.execute(f"SELECT {columns_str} FROM {quoted_table}")
                with self.target_conn.cursor() as target_cursor:
                    while True:
                        batch = source_cursor.fetchmany(batch_size)
                        if not batch:
                            break
                        target_cursor.executemany(insert_sql, batch)
                        self.target_conn.commit()
                        rows_migrated += len(batch)

            return rows_migrated
        except Exception as e:
            raise Exception(f"迁移表数据失败: {str(e)}") from e

    def _set_foreign_key_checks(self, enabled: bool):
        """Set FOREIGN_KEY_CHECKS on the target session; no-op when not connected."""
        if self.target_conn is None:
            return
        with self.target_conn.cursor() as cursor:
            cursor.execute(f"SET FOREIGN_KEY_CHECKS = {1 if enabled else 0}")
        self.target_conn.commit()

    def _count_table_rows(self, table_names) -> dict:
        """Return ``{table: COUNT(*)}`` from the source, printing each count."""
        counts = {}
        with self.source_conn.cursor() as count_cursor:
            for table_name in table_names:
                count_cursor.execute(f"SELECT COUNT(*) FROM {self._quote_identifier(table_name)}")
                counts[table_name] = count_cursor.fetchone()[0]
                print(f" 表 `{table_name}`: {counts[table_name]} 行")
        return counts

    def migrate_database(self):
        """Run the full migration with a row-based progress bar.

        Steps: connect; select source tables missing from the target; order
        them by foreign-key dependencies; count rows; create all structures;
        copy data table by table. FOREIGN_KEY_CHECKS is disabled on the target
        session during the copy and re-enabled afterwards.

        On any failure the error is printed, FK checks are re-enabled on a
        best-effort basis and the process exits with status 1 (``sys.exit``).
        Both connections are always closed.
        """
        try:
            self.connect_databases()

            pending = {t for t in self.get_all_tables() if not self.table_exists_in_target(t)}
            if not pending:
                print("没有需要迁移的表")
                return

            tables_to_migrate_ordered = [
                t for t in self.get_table_creation_order() if t in pending
            ]

            print(f"需要迁移的表数量: {len(tables_to_migrate_ordered)}")
            print("正在计算总数据量...")

            table_row_counts = self._count_table_rows(tables_to_migrate_ordered)
            total_rows_to_migrate = sum(table_row_counts.values())

            print(f"预估总数据行数: {total_rows_to_migrate}")
            print("-" * 50)

            self._set_foreign_key_checks(False)

            print("正在迁移表结构...", end="", flush=True)
            for table_name in tables_to_migrate_ordered:
                self.migrate_table_structure(table_name)
            print(" 完成")

            migrated_rows_so_far = 0
            start_time = time.perf_counter()

            with tqdm(total=total_rows_to_migrate, desc="总体进度", unit='row',
                      unit_scale=True, ncols=100) as pbar:
                for table_name in tables_to_migrate_ordered:
                    expected_rows = table_row_counts[table_name]
                    pbar.set_description(f"正在迁移: {table_name} ({expected_rows}行)")

                    try:
                        rows_migrated_this_table = self.migrate_table_data(table_name)
                    except Exception as e:
                        # tqdm.write clears the bar line before printing.
                        tqdm.write(f"❌ 迁移表 `{table_name}` 时出错: {str(e)}")
                        raise

                    migrated_rows_so_far += rows_migrated_this_table
                    pbar.update(rows_migrated_this_table)
                    tqdm.write(f"✅ 表 `{table_name}` 迁移完成 ({rows_migrated_this_table}/{expected_rows} 行)")

            self._set_foreign_key_checks(True)

            print("-" * 50)
            total_time = time.perf_counter() - start_time
            print("🎉 数据库迁移完成!")
            # Report rows actually copied, not the pre-migration estimate.
            if migrated_rows_so_far > 0:
                print(f"📊 统计信息: 共迁移 {len(tables_to_migrate_ordered)} 张表,{migrated_rows_so_far} 行数据,耗时 {total_time:.2f} 秒")
                if total_time > 0:
                    print(f"📈 平均速率: {migrated_rows_so_far / total_time:.2f} 行/秒")

        except Exception as e:
            print("\r" + " " * 100 + "\r", end="", flush=True)
            print(f"❌ 迁移过程中出错: {str(e)}")
            try:
                self._set_foreign_key_checks(True)
            except Exception:
                pass  # best effort: the connection may already be broken
            sys.exit(1)
        finally:
            if self.source_conn is not None:
                self.source_conn.close()
                self.source_conn = None
            if self.target_conn is not None:
                self.target_conn.close()
                self.target_conn = None
if __name__ == "__main__":
    # Connection settings: fill in host/user/password/database before running.
    source_config = {
        'host': 'ip',
        'port': 3306,
        'user': '',
        'password': '',
        'database': '',
        'charset': 'utf8mb4',
    }

    target_config = {
        'host': 'ip',
        'port': 3306,
        'user': '',
        'password': '',
        'database': '',
        'charset': 'utf8mb4',
    }

    # Run the migration.
    migrator = MySQLMigrator(source_config, target_config)
    migrator.migrate_database()
|