代码如下:
import pymysqlimport loggingimport tracebackimport threadingif __name__ == '__main__': from config import mysql_confelse: from config.config import mysql_conffrom dbutils.pooled_db import PooledDB# MySQL连接池class MySQLPool(object): # 类变量 pool = PooledDB(creator=pymysql, **mysql_conf) print("创立数据库连接池 >>>", id(pool)) # with上下文 def __enter__(self): self.conn = self.pool.connection() self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) # 记得return self return self def __exit__(self, exc_type, exc_val, exc_tb): # 敞开连接池 self.cursor.close() self.conn.close() # 插入或批改操作 def insert_or_update(self, sql): try: self.cursor.execute(sql) rowid = self.cursor.lastrowid self.conn.commit() return rowid except Exception as error: print(traceback.format_exc()) # 回滚 self.conn.rollback() # 简略的日志解决 print(error) # logging.error("=======ERROR=======\n%s\nsql:%s" % (error, sql)) raise # 插入或批改操作 def insert_many(self, sql, data): try: self.cursor.executemany(sql, data) self.conn.commit() return 1 except Exception as error: print(traceback.format_exc()) # 回滚 self.conn.rollback() # 简略的日志解决 print(error) # logging.error("=======ERROR=======\n%s\nsql:%s" % (error, sql)) raise # 查问操作 def query(self, sql): try: self.cursor.execute(sql) results = self.cursor.fetchall() return results except Exception as error: # 简略的日志解决 print(error) # logging.error("=======ERROR=======:\n%s\nsql:%s" % (error, sql)) raiseif __name__ == '__main__': mysql = MySQLPool()
间接调用即可
with MySQLPool() as poolHandler: # 插入position_pool poolHandler.insert_many(sql_insert_pool, data) # 插入position_increase poolHandler.insert_many(sql_insert_increase, data)
print("创立数据库连接池 >>>", id(pool)),打印这一行代码其实就晓得是不是单个连接池了