Python MySQL to MSSQL 資料匯出

mysql 使用 mysqldump 產生的 .sql 檔案, 要匯入到 mssql 之中, 由於匯出的資料量太大, 直接轉換 mysql sql command 為 mssql sql command, 如果使用 SSMS(SQL Server Management Studio) 會遇到記憶體不足的問題.

透過 sqlcmd 匯入資料庫 (不經過 SSMS):

sqlcmd -S localhost -d %db_name% -E -i %output_file%

這個也會遇到很多奇奇怪怪的問題, 明明在 SSMS 執行正常的 table, 在 sqlcmd 會遇到重覆插入 PK 值.

最佳解法是透過下列的python script 進行 export, 這是整合了「逐行讀取」、「自動切分 1000 筆」、「停用約束」以及修正後語法的完整版本:

import pyodbc
import re
import argparse
import sys

# MSSQL 連線設定
conn_config = {
    "DRIVER": "{ODBC Driver 18 for SQL Server}",
    "SERVER": "127.0.0.1",
    "DATABASE": "你的資料庫名稱",
    "UID": "帳號",
    "PWD": "密碼",
    "Encrypt": "yes",
    "TrustServerCertificate": "yes"
}

conn_str = ";".join([f"{k}={v}" for k, v in conn_config.items()])

def split_values(values_str):
    # 使用正規表示法找出所有 (v1, v2, ...) 結構,處理跨行與大資料
    rows = re.findall(r"(\((?:[^()]|\([^()]*\))*\))", values_str)
    return rows

def run_import(sql_file):
    conn = None
    try:
        # 關閉自動提交,手動控制 commit 頻率
        conn = pyodbc.connect(conn_str, autocommit=False)
        cursor = conn.cursor()
        
        # 1. 啟動前置作業:停用所有外鍵約束,避免 TRUNCATE 失敗
        print("正在暫時停用資料庫約束...")
        cursor.execute("EXEC sp_MSforeachtable 'ALTER TABLE ? NOCHECK CONSTRAINT ALL'")
        
        insert_pattern = re.compile(r"INSERT INTO `(.+?)` \((.+?)\) VALUES\s*(.+)", re.I | re.S)
        
        with open(sql_file, 'r', encoding='utf-8') as f:
            sql_buffer = []
            for line in f:
                clean_line = line.strip()
                if not clean_line or clean_line.startswith('--') or clean_line.startswith('/*'):
                    continue
                
                sql_buffer.append(line)
                
                # 偵測到分號代表一個完整的語句結束
                if clean_line.endswith(';'):
                    full_sql = "".join(sql_buffer).strip()
                    sql_buffer = []
                    
                    if full_sql.startswith("INSERT INTO"):
                        # 去掉結尾分號進行內容解析
                        match = insert_pattern.search(full_sql[:-1])
                        if match:
                            table_name = match.group(1)
                            columns = match.group(2).replace("`", "")
                            # 處理 MariaDB 跳脫字元,將 \' 轉為 MSSQL 的 ''
                            raw_values = match.group(3).replace("\\'", "''")
                            
                            print(f"正在處理資料表: {table_name}")
                            
                            try:
                                # 執行初始化作業
                                cursor.execute(f"TRUNCATE TABLE {table_name}")
                                cursor.execute(f"SET IDENTITY_INSERT {table_name} ON")
                                
                                # 將巨大的 VALUES 拆分為每 1000 筆一組
                                all_rows = split_values(raw_values)
                                chunk_size = 1000
                                for i in range(0, len(all_rows), chunk_size):
                                    chunk = all_rows[i:i + chunk_size]
                                    values_part = ",".join(chunk)
                                    cursor.execute(f"INSERT INTO {table_name} ({columns}) VALUES {values_part}")
                                
                                cursor.execute(f"SET IDENTITY_INSERT {table_name} OFF")
                                
                                # 每完成一個 table 才提交一次
                                conn.commit()
                                print(f"成功匯入 {table_name},共 {len(all_rows)} 筆。")
                                
                            except Exception as table_err:
                                conn.rollback()
                                print(f"資料表 {table_name} 匯入失敗: {table_err}")

        # 2. 結束作業:重新啟用約束檢查
        print("正在還原資料庫約束...")
        cursor.execute("EXEC sp_MSforeachtable 'ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL'")
        conn.commit()
        print("所有資料同步作業已完成")

    except Exception as e:
        print(f"程式執行異常: {e}")
    finally:
        if conn:
            conn.close()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MariaDB To MSSQL Migration Tool")
    parser.add_argument("input", help="Path to the .sql dump file")
    
    args = parser.parse_args()
    run_import(args.input)

如果執行過程中遇到記憶體不足的問題,通常是因為 split_values 處理了單一極其巨大的 INSERT 字串。若發生此情況,請告訴我,我們可以改用生成器(Generator)模式來進一步優化字串解析。

上面這個版本針對一般情況是可以使用,針對特定的資料庫欄位類型(例如 Blob 或特殊的日期格式)則需要再進一步的轉換處理。

執行結果:

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *