关于连接池:mysql连接池实现

30次阅读

共计 1525 个字符,预计需要花费 4 分钟才能阅读完成。

代码如下:

import pymysql
import logging
import traceback
import threading

if __name__ == '__main__':
    from config import mysql_conf
else:
    from config.config import mysql_conf
from dbutils.pooled_db import PooledDB


# MySQL 连接池
class MySQLPool(object):
    # 类变量
    pool = PooledDB(creator=pymysql, **mysql_conf)

    print("创立数据库连接池 >>>", id(pool))

    # with 上下文
    def __enter__(self):
        self.conn = self.pool.connection()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
        # 记得 return self
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # 敞开连接池
        self.cursor.close()
        self.conn.close()

    # 插入或批改操作
    def insert_or_update(self, sql):
        try:
            self.cursor.execute(sql)
            rowid = self.cursor.lastrowid
            self.conn.commit()
            return rowid
        except Exception as error:
            print(traceback.format_exc())
            # 回滚
            self.conn.rollback()
            # 简略的日志解决
            print(error)
            # logging.error("=======ERROR=======\n%s\nsql:%s" % (error, sql))
            raise

    # 插入或批改操作
    def insert_many(self, sql, data):
        try:
            self.cursor.executemany(sql, data)
            self.conn.commit()
            return 1
        except Exception as error:
            print(traceback.format_exc())
            # 回滚
            self.conn.rollback()
            # 简略的日志解决
            print(error)
            # logging.error("=======ERROR=======\n%s\nsql:%s" % (error, sql))
            raise

    # 查问操作
    def query(self, sql):
        try:
            self.cursor.execute(sql)
            results = self.cursor.fetchall()
            return results
        except Exception as error:
            # 简略的日志解决
            print(error)
            # logging.error("=======ERROR=======:\n%s\nsql:%s" % (error, sql))
            raise


if __name__ == '__main__':
    mysql = MySQLPool()

间接调用即可

 with MySQLPool() as poolHandler:
        # 插入 position_pool
        poolHandler.insert_many(sql_insert_pool, data)
        # 插入 position_increase
        poolHandler.insert_many(sql_insert_increase, data)

print(“ 创立数据库连接池 >>>”, id(pool)),打印这一行代码其实就晓得是不是单个连接池了

正文完
 0