dbstructsync -mysql表、字段、索引差異解析工具(原創)

最近寫了一個工具(比較兩套測試環境mysql數據庫中表、表字段、索引的差異,基於python)通過文章簡單介紹下工具的相關內容

  1. 工具名稱
  2. 主要功能
  3. 具體使用方法
  4. 部分實現代碼
  5. 後續

一、工具名稱:

dbstructsync (python庫)

二、主要功能:

比較兩套環境中mysql指定庫中表、表字段及索引的差異,返回同步的sql ,裡面包含建表,修改索引,修改字段的sql .

A環境的數據庫db 作為sourcedb, B環境的數據庫db targetdb ,程序邏輯比較的是 sourcedb 與targetdb 的差異,然後返回一個list的數據類型 

list中包含新建表sql,修改、增加字段sql, 刪除、新增索引sql 

現在總共有3個版本,0.0.1 和0.0.2 存在一定的bug, 所以請使用最新的0.0.3版本

 

 

其他說明:由於是剛完成不久的程序,所以暫時不對最終結果sql進行執行,避免對使用過程中產生不好的影響,這個版本大家可以通過python 自行選擇需要執行哪些操作;隨着之後程序的逐步深入修改和演變,會將執行sql這一步也都加進去

同時也會優化使用方式,讓使用這個工具的小夥伴更容易操作

三、具體使用方法:

1、 pip install  -i https://pypi.python.org/pypi  dbstructsync   

在代碼里引入使用,

from DbStructSync import cli 

result=cli.db_sync(sourcedb, targetdb) 
#sourcedb,targetdb是兩個dict的參數,具體參數看下面
# 這裏得到的    result = ['use 庫;', 
#             'CREATE TABLE `test_async` (\n  `test_async` #varchar(30) NOT NULL,\n  `aa` varchar(400) DEFAULT NULL,\n  #PRIMARY KEY (`test_async`)\n) ENGINE=InnoDB DEFAULT #CHARSET=utf8;',
#             'drop index `index_chaxx` on chanxx_auto_puxx_conf;',
#             'create index `index_chaxx` on #chanxx_auto_puxx_conf(`channel_nxx`,`channel_prxx`) USING #BTREE;']
#result 中包含  use 庫;
#              如果有少的表,會有 create table的數據; 如果有不同的索引,會#存在drop index 和create index的sql;
#              如果有不同的字段,會有alter table的sql ;
#只需要對這個結果,再通過pymysql的一些數據庫操作就可以保證 sourcedb #的內容與taragetdb一致。

2、同時還支持命令行操作,代碼寫入到x.py代碼中

result = cli.db_sync_commandline()
python x.py --source  host=10.1.1.x,port=3306,user=root,passwd=root,db=investx --target host=10.1.1.x,port=3306,user=root,passwd=root,db=investx

 

命令行中  –source  key=value;key2=value2;key3=value3  –target key=value;key2=value2;key3=value3

–source, –target 是兩給必輸的參數,後續的值會作為一個dict類型傳入程序。 –source是源庫的信息, –target是目標庫的信息

還包括其他幾個命令參數 –only-index , –only-fields ; –only-index 只比較索引差異, –only-fields 只比較字段差異, 非必填,默認都為False 

四、部分實現代碼:

 

def diff_tables(sourcetable, targettable):
    '''

    :param sourcetable:  源數據庫的表名列表
    :param targettable:  目的數據庫的表名列表
    :return: 返回dict,包含三種結果,源庫多的表,目標庫多的表,相同的表
    '''
    logger.info('開始比較兩個庫中表的差異,源庫表{},目標庫表{}'.format(sourcetable, targettable))
    table_result={}
    if not isinstance(sourcetable, list) or not isinstance(targettable, list):
         raise  TypeError('sourcetable , targettable的類型不是list')
    source_diff = set(sourcetable) - set(targettable)
    target_diff = set(targettable) - set(sourcetable)
    same_tables = set(sourcetable)& set(targettable)
    table_result['source'] = source_diff
    table_result['target'] = target_diff
    table_result['same'] = same_tables
    logger.info('兩個庫中表的差異結果{}'.format(table_result))
    return  table_result
 

def  diff_indexs_fields(sourcesql, targetsql, type=1):
    '''
    :param sourcesql: 源數據庫表的創建sql
    :param targetsql: 目標數據表的創建sql
    :return: 記錄下索引不一致的地方
    '''
    result = {}
    logger.info('解析語句中的索引字段,並進行比較索引')
    sourcesql = parse_str(sourcesql)  # 從括號中提取需要的內容
    #logger.info('從括號中提取出來的信息數據{}'.format(sourcesql))
    sourcesql = lists2str(sourcesql)  #將list轉換為str,並對數據的空格數據進行處理
    logger.info('解析完的數據的信息{}'.format(sourcesql))
    sourcesql = sourcesql.split('\n') #將str按照'\\n'進行分割
    logger.info('解析完數據之後的信息{}'.format(sourcesql))
    targetsql = parse_str(targetsql)
    targetsql = lists2str(targetsql)
    targetsql = targetsql.split('\n')
    if type ==1:
        source_index = parse_fields(sourcesql,type)
        target_index = parse_fields(targetsql,type)

        result= compare_indexs_field(source_index, target_index, type)
    elif type ==2:
        source_field_sql = parse_fields(sourcesql, type)
        target_field_sql = parse_fields(targetsql, type)
        result = compare_indexs_field(source_field_sql, target_field_sql, type)
    return  result

def dict2sql(dict_str):
    '''
    將解析完成的數據轉換為對應的可執行sql
    :param dict_str:
    :return:
    '''
    dict_str = copy.deepcopy(dict_str) # 做一個深度copy,可以放心的進行相關數據處理

    if not isinstance(dict_str, dict):
        raise  TypeError('調用方法{}參數不是dict類型,請確認'.format('dict2sql'))
    #獲取db名字
    for key ,value in dict_str.items():
        dbname = key
        logger.info('數據庫名{}'.format(dbname))
        for table, table_desc  in  value.get('source').items():
               if table =='create_table':
                   #create_table_sql = lists2str(table_desc)
                   dict_str[dbname]['source'][table] = table_desc
                   #其他的都是table的名字
                   logger.info('數據庫的修改語句:{}'.format(table_desc))
               else:
                  logger.info('對於索引和字段的解析原始數據{}'.format(table_desc))
                  if table_desc.get('index'):
                      create_index_sql_lists=[]
                      #create_index_sql_lists.append('use {};'.format(dbname))
                      index_lists= (table_desc.get('index'))
                      result_index= parse_comma_split(str(index_lists)[1:-1])
                      for i in result_index:
                           if i.strip().startswith('\'KEY'):
                               #print(i.strip())
                               index_values = parse_space_split(i.strip())
                               drop_index_sql= 'drop index {} on {}'.format(index_values[1],table )
                               if len(index_values)<=3:
                                  create_index_sql='create index {} on {}{} '.format(index_values[1], table, index_values[2])
                               else:
                                  create_index_sql='create index {} on {}{} {}'.format(index_values[1], table, index_values[2], ' '.join(index_values[3:]))
                               create_index_sql_lists.append(drop_index_sql)
                               create_index_sql_lists.append(create_index_sql)

                           if i.strip().startswith('\'UNIQUE KEY'):
                                 index_values = parse_space_split(i.strip())
                                 drop_index_sql = 'drop index {} on {}'.format(index_values[2], table)
                                 if len(index_values) <= 4:
                                     create_index_sql = 'create unique index {} on {}{} '.format(index_values[2], table,
                                                                                          index_values[3])
                                 else:
                                     create_index_sql = 'create unique index {} on {}{} {}'.format(index_values[2], table,
                                                                                               index_values[3],
                                                                                               ' '.join(index_values[4:]),
                                                                                               )
                                 create_index_sql_lists.append(drop_index_sql)
                                 create_index_sql_lists.append(create_index_sql)
                      logger.info('表{}解析出來的索引的修改sql{}'.format(table, create_index_sql_lists))
                      dict_str[dbname]['source'][table]['index'] = create_index_sql_lists
                  if table_desc.get('fields'):
                      create_fields_sql_lists=[]
                      #create_fields_sql_lists.append('use {};'.format(dbname))
                      modify_field_sqls = table_desc.get('fields').get('modify',None)
                      create_field_sqls=table_desc.get('fields').get('lose',None)

                      if modify_field_sqls:
                           for modify_field_sql in modify_field_sqls:
                               sql_indexs = parse_space_split(str(modify_field_sql)[0:-1])
                               #print(sql_indexs)
                               alter_fields_sql='alter table {} modify column {} {} {}'.format(table, sql_indexs[0],sql_indexs[1],' '.join(sql_indexs[2:]))
                               create_fields_sql_lists.append(alter_fields_sql)
                      if create_field_sqls:
                           for  create_field_sql in create_field_sqls:
                                sql_indexs = parse_space_split(str(create_field_sql)[0:-1])
                                create_fields_sql='alter table {} add column {} {}'.format(table, sql_indexs[0],' '.join(sql_indexs[2:]))
                                create_fields_sql_lists.append(create_fields_sql)
                      logger.info('表{}解析出來的字段的修改sql{}'.format(table,create_fields_sql_lists))
                      dict_str[dbname]['source'][table]['fields'] = create_fields_sql_lists

    return  dict_str  # 返回給一個全部是可執行sql的dict

五、後續:

1、對使用過程中遇到對bug進行修復

2、對代碼進行優化

3、增加其他相關功能,讓工具越來越好用

4、希望使用的小夥伴多提意見,未來成為一個好用的小工具

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象