共计 1525 个字符,预计需要花费 4 分钟才能阅读完成。
代码如下:
import pymysql
import logging
import traceback
import threading
if __name__ == '__main__':
from config import mysql_conf
else:
from config.config import mysql_conf
from dbutils.pooled_db import PooledDB
# MySQL 连接池
class MySQLPool(object):
# 类变量
pool = PooledDB(creator=pymysql, **mysql_conf)
print("创立数据库连接池 >>>", id(pool))
# with 上下文
def __enter__(self):
self.conn = self.pool.connection()
self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
# 记得 return self
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# 敞开连接池
self.cursor.close()
self.conn.close()
# 插入或批改操作
def insert_or_update(self, sql):
try:
self.cursor.execute(sql)
rowid = self.cursor.lastrowid
self.conn.commit()
return rowid
except Exception as error:
print(traceback.format_exc())
# 回滚
self.conn.rollback()
# 简略的日志解决
print(error)
# logging.error("=======ERROR=======\n%s\nsql:%s" % (error, sql))
raise
# 插入或批改操作
def insert_many(self, sql, data):
try:
self.cursor.executemany(sql, data)
self.conn.commit()
return 1
except Exception as error:
print(traceback.format_exc())
# 回滚
self.conn.rollback()
# 简略的日志解决
print(error)
# logging.error("=======ERROR=======\n%s\nsql:%s" % (error, sql))
raise
# 查问操作
def query(self, sql):
try:
self.cursor.execute(sql)
results = self.cursor.fetchall()
return results
except Exception as error:
# 简略的日志解决
print(error)
# logging.error("=======ERROR=======:\n%s\nsql:%s" % (error, sql))
raise
if __name__ == '__main__':
mysql = MySQLPool()
间接调用即可
with MySQLPool() as poolHandler:
# 插入 position_pool
poolHandler.insert_many(sql_insert_pool, data)
# 插入 position_increase
poolHandler.insert_many(sql_insert_increase, data)
print(“ 创立数据库连接池 >>>”, id(pool)),打印这一行代码其实就晓得是不是单个连接池了
正文完