前言:最近写了一个自动导入mysql中数据表的脚本,后来发现有一个to_sql()函数也可以实现类似的功能,测试了该函数后发现,无法实现对数据类型的自动准确判断。(有可能是我没理解到位,有经验的朋友请指出)不罗嗦,直接放示例文件和代码。
示例文件:

# -*- coding: utf-8 -*-"""Created on Tue Jun 23 11:39:34 2020@author: Ray"""import reimport sysimport pymysqlimport loggingimport argparseimport numpy as npimport pandas as pdfrom warnings import filterwarnings#--------------------------------------------------------------------------报错日志-----------------------------------------------------------------------------------------------#logging.basicConfig(filename = 'load.log', level=logging.DEBUG, format = "%(levelname)s %(asctime)s:%(name)s:%(message)s", datefmt = '%Y-%m-%d', filemode = 'w')filterwarnings("error", category = pymysql.Warning) # 捕获mysql错误信息。#--------------------------------------------------------------------------参数设置-----------------------------------------------------------------------------------------------#parser = argparse.ArgumentParser(description = 'Inofrmation of database table.')parser.add_argument('-data', action = 'store', help = 'Path of data source.', required = True) # 指定数据源或数据地址parser.add_argument('-database', help = 'Database name.', required = True) # 指定存储得库名parser.add_argument('-tablename', help = 'Table name.', required = True) # 指定存储的表名parser.add_argument('-droptable', help = 'If exists table, drop table', required = False, default = False, type = bool) # 如果建表时存在相同表名是否删除parser.add_argument('-primaryKey', nargs = '+', help = 'PrimaryKey column.(Example format: ID or ID Pos)', required = False, default = None) # 设定的主键parser.add_argument('-index', nargs = '+', help = 'Index column.(Example format: MAF or MAF Pos)', required = False, default = None) # 指定索引字段(默认为空)parser.add_argument('-index_union', help = 'Pass index_union dictionary.(Example format: MAF-Pos or MAF-Pos,Chrom-Gene)', required = False, default = None) # 指定索引字段(默认为空)parser.add_argument('-host', help = 'Host name.', required = True) # 主机名或IP地址parser.add_argument('-user', help = 'User name.', required = True) # 用户名parser.add_argument('-password', help = 'Password', required = True) # 密码args = parser.parse_args()    wide_connect = Nonewide_cursor = NonetypeList = [] # 建表语句priDict = dict() # 定义主键()        #--------------------------------------------------------------------------数据导入-----------------------------------------------------------------------------------------------## 判断原始数据是否含有重复名with open(args.data, 'r') as data:    for line in data:        li = line.strip().split('\t')        break        set_lst = set(li)        if len(set_lst) == len(li):    pass    else:    logging.warning("Duplicate column names.")    sys.exit()    data = pd.read_table(args.data, sep = '\t')if len(data.columsn) < 2:    logging.debug("列太少")    sys.exit()# 数据规范if data.isnull().any().any() == True: # 判断是否存在null值    for line, j in zip(data.values, range(len(data.values))): # 输出null的位置        for i in line:            if pd.isnull(i):                logging.error("The NULL in the {}".format(j + 1))                sys.exit()   if data.isna().any().any() == True:    for line, j in zip(data.values, range(len(data.values))): # 输出null的位置        for i in line:            if pd.isna(i):                logging.error("The NaN in the {}".format(j + 1))                sys.exit()if set(['MAF']).issubset(data.columns): # 判断是否存在MAF列,判断是否全小于0.5    MAF_max = max(i for i in data['MAF'])     if MAF_max > 0.5:        logging.error("MAF must be less than 0.5")        sys.exit()if set(['Gene']).issubset(data.columns): # 如果用户数据有Gene列,判断其格式    for i ,j in zip(data['Gene'], range(len(data['Gene']))):        if re.search(':', i) != None:            logging.error("The bug in the {} line of Gene column".format(j + 1))            sys.exit()        else:            continue#--------------------------------------------------------------------------MySQL链接----------------------------------------------------------------------------------------------#wide_connect = pymysql.connect(host = args.host,                               user = args.user,                               password = args.password,                               port = 3306, charset = 'utf8',                               local_infile = True) # ***loacl_infile允许导入本地文件***wide_cursor = wide_connect.cursor()#--------------------------------------------------------------------------数据类型-----------------------------------------------------------------------------------------------#      # 针对整数型数值定义其格式-----------------------------------------------------------------------------------------------------------------------------------------------------------int_fun = lambda x: 'TINYINT({})'.format(len(str(x))) if -128 <= x <= 127\            else 'SMALLINT({})'.format(len(str(x))) if -32768<= x <= 32767\            else 'MEDIUMINT({})'.format(len(str(x))) if -8388608 <= x <= 8388607\            else 'INT({})'.format(len(str(x))) if -2147483648 <= x <= 2147483647\            else 'BIGINT({})'.format(len(str(x)))int_fun_unsigned = lambda x: 'TINYINT({}) UNSIGNED '.format(len(str(x))) if x <= 255\            else 'SMALLINT({}) UNSIGNED '.format(len(str(x))) if x <= 65535\            else 'MEDIUMINT({}) UNSIGNED '.format(len(str(x))) if x <= 16777215\            else 'INT({}) UNSIGNED '.format(len(str(x))) if x <= 4294967295\            else 'BIGINT({}) UNSIGNED '.format(len(str(x)))                      # 针对浮点型数据定义其格式-----------------------------------------------------------------------------------------------------------------------------------------------------------def float_fun(column):    flo = [str(i).split('.') for i in data[column] if pd.notnull(i)]    m = max([len(i[0]) for i in flo if len(i) >= 2]) # 整数位最大长度    d = min([len(i[1]) for i in flo if len(i) >= 2]) # 小数位最大长度    return 'FLOAT ({m}, {d})'.format(m = m + d + 2, d = d + 2)#--------------------------------------------------------------------------建表语句-----------------------------------------------------------------------------------------------#typeList = []for column in data.columns:    # 如果字段是整数型    if data[column].dtypes in [np.dtype('int16'), np.dtype('int32'), np.dtype('int64'), np.dtype('uint16'), np.dtype('uint32'), np.dtype('uint64')]:        if -1 in np.sign(data[column]):            int_max = data[column].max() # 整数长度            typeList.append("`{}`".format(column) + int_fun(int_max) + "DEFAULT NULL")            continue        else:            int_max = data[column].max() # 整数长度            typeList.append("`{}`".format(column) + int_fun_unsigned(int_max) + "DEFAULT NULL")            continue    # 如果字段是浮点型    if data[column].dtype in [np.dtype('float16'), np.dtype('float32'), np.dtype('float64')]:        typeList.append("`{}`".format(column) + float_fun(column) + "DEFAULT NULL")        # 如果字段是文本    else:        str_len = max([len(i) for i in data[column] if pd.notnull(i)])        typeList.append("`{}`".format(column) + "VARCHAR({})".format(str_len) + "DEFAULT NULL")    # 如果定义了主键,则该字段默认不为空----------------------------------------------------------------------------------------------------------------------------------------------------for column, c in zip(data.columns, range(len(data.columns))):    if column in args.primaryKey:        typeList[c] = typeList[c].replace('DEFAULT NULL', 'NOT NULL')        priDict[column] = column                    #----------------------------------------------------------------------------建表-------------------------------------------------------------------------------------------------## 指定主键(我们一般定义ID为主键)if args.primaryKey != None:    if len(args.primaryKey) == 1: # 如果定义一个键是主键        primaryKey_statement = ["PRIMARY KEY (`{}`)".format(col) for col in data.columns if col in args.primaryKey][0]    else:        primaryKey_statement = "PRIMARY KEY ({})".format(','.join(['`' + col + '`'for col in data.columns if col in args.primaryKey]))        # 指定索引if args.index != None and args.index_union != None: # 如果创建联合索引,在将args.index中的每个元素单独创建索引的同时,将args.index_union中的每一个元素创建索引    index_statement = ','.join(["KEY `{}` (`{}`)".format(col, col) for col in data.columns if col in args.index])    index_statement =  index_statement + ',' + ','.join(["KEY `{}` ({})".format(index, ','.join('`' + k + '`' for k in index.split('-'))) for index in args.index_union.split(',')])elif args.index != None and args.index_union == None: # 如果不创建联合索引,将self.key中的每个元素创建索引    index_statement = ','.join(["KEY `{}` (`{}`)".format(col, col) for col in data.columns if col in args.index])elif args.index == None and args.index_union != None:    index_statement = ','.join(["KEY `{}` ({})".format(index, ','.join('`' + k + '`' for k in index.split('-'))) for index in args.index_union.split(',')])    # 执行建表语句----------------------------------------------------------------------------------------------------------------------------------------------------------------------wide_cursor.execute("USE {};".format(args.database)) # 选择数据库try:    if args.droptable: # 是否删除已经存在的表        wide_cursor.execute("DROP TABLE IF EXISTS {};".format(args.tablename))except pymysql.Warning:    logging.debug("不存在{}表".format(args.tablename))    if 'primaryKey_statement' in dir() and 'index_statement' in dir():     wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\                        {data_demand_statement_},\                        {primary_key_},\                        {index_statement_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\                        format(table_ = args.tablename, # 表名                               data_demand_statement_ = ','.join(typeList), # 字段格式语句                               primary_key_ = primaryKey_statement, # 主键语句                               index_statement_ = index_statement # 索引语句                               )                        )elif 'primaryKey_statement' not in dir() and 'index_statement' in dir():    wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\                        {data_demand_statement_},\                        {index_statement_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\                        format(table_ = args.tablename, # 表名                               data_demand_statement_ = ','.join(typeList), # 字段格式语句                               index_statement_ = index_statement # 索引语句                               )                        )elif 'primaryKey_statement' in dir() and 'index_statement' not in dir():    wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\                        {data_demand_statement_},\                        {primary_key_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\                        format(table_ = args.tablename, # 表名                               data_demand_statement_ = ','.join(typeList), # 字段格式语句                               primary_key_ = primaryKey_statement # 索引语句                               )                        )else:     wide_cursor.execute("CREATE TABLE IF NOT EXISTS `{table_}`(\                        {data_demand_statement_}) ENGINE=MyISAM DEFAULT CHARSET=utf8;".\                        format(table_ = args.tablename, # 表名                               data_demand_statement_ = ','.join(typeList), # 字段格式语句                               )                        )            #----------------------------------------------------------------------------存储-------------------------------------------------------------------------------------------------#wide_cursor.execute("USE {}".format(args.database))# 导入数据表语句------------------------------------------------------------------------------------------------------------------------------------------------------------try:    wide_cursor.execute("LOAD DATA LOCAL INFILE '{}' INTO TABLE {}.{} FIELDS TERMINATED BY '\\t' IGNORE 1 LINES".format(args.data, args.database, args.tablename))except pymysql.Warning:    logging.warning("Duplicate primarikey.")    wide_cursor.execute("drop table {}".format(args.tablename))        wide_connect.commit() # 数据提交wide_cursor.close() # 关闭链接

代码有很多不成熟的地方,大家在使用过程中发现有错误请指出。
在学习的过程中借鉴了这篇文章。(https://blog.csdn.net/weixin_38153458/article/details/85289433?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase)