共计 5121 个字符,预计需要花费 13 分钟才能阅读完成。
在数据生产利用部门,取数剖析是一个很常见的需要,实际上业务人员需要时刻变动,最高效的形式是让业务部门本人来取,缩小不必要的重复劳动,个别状况下,业务部门数据库表构造个别是固定的,依据理论业务将取数需要做成 sql 脚本,疾速实现数据获取 — 授人以渔的形式,提供平台或工具
那如何实现一个自助取数查问工具?
基于底层数据来开发不难,无非是将用户输出变量作为筛选条件,将参数映射到 sql 语句,并生成一个 sql 语句而后再去数据库执行
最初再利用 QT 开发一个 GUI 界面,用户界面的点击和筛选条件,信号触发对应按钮与绑定的传参槽函数执行
具体思路:
一、数据库连贯类
此处利用 pandas 读写操作 oracle 数据库
二、主函数模块
1)输出参数模块,内部输出条件参数,建设数据库关键字段映射
– 注:读取内部 txt 文件,将筛选字段可能须要进行键值对转换
2)sql 语句汇合模块,将待执行的业务 sql 语句对立寄存到这里
3)数据处理函数工厂
4)应用多线程提取数据
一、数据库连贯类
cx_Oracle 是一个 Python 扩大模块,相当于 python 的 Oracle 数据库的驱动,通过应用所有数据库拜访模块通用的数据库 API 来实现 Oracle 数据库的查问和更新
Pandas 是基于 NumPy 开发,为了解决数据分析工作的模块,Pandas 引入了大量库和一些规范的数据模型,提供了高效地操作大型数据集所需的办法类和函数
pandas 调用数据库次要有 read_sql_table,read_sql_query,read_sql 三种形式
本文次要介绍一下 Pandas 中 read_sql_query 办法的应用
1:pd.read_sql_query()
读取自定义数据,返还 DataFrame 格局,通过 SQL 查问脚本包含增删改查。pd.read_sql_query(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None,chunksize=None)
sql:要执行的 sql 脚本,文本类型
con:数据库连贯
index_col: 抉择返回后果集索引的列, 文本 / 文本列表
coerce_float: 十分有用,将数字模式的字符串间接以 float 型读入
parse_dates: 将某一列日期型字符串转换为 datetime 型数据,与 pd.to_datetime 函数性能相似。params: 向 sql 脚本中传入的参数,官网类型有列表,元组和字典。用于传递参数的语法是数据库驱动程序相干的。chunksize:如果提供了一个整数值,那么就会返回一个 generator,每次输入的行数就是提供的值的大小
read_sql_query()中能够承受 SQL 语句,DELETE,INSERT INTO、UPDATE 操作没有返回值(然而会在数据库中执行),程序会抛出 SourceCodeCloseError,并终止程序。SELECT 会返回后果。如果想持续运行,能够 try 捕获此异样。2:pd.read_sql_table()
读取数据库中的表,返还 DataFrame 格局(通过表名)import pandas as pd
pd.read_sql_table(table_name, con, schema=None,index_col=None, coerce_float=True, parse_dates=None, columns=None,chunksize=None)
3:pd.read_sql()
读数据库通过 SQL 脚本或者表名
import pandas as pd
pd.read_sql(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)
以下创立连贯 oracel 数据库的连贯类 Oracle_DB
次要提供 2 种操作数据的函数办法。
import cx_Oracle
# Pandas 读写操作 Oracle 数据库
import pandas as pd
# 防止编码问题带来的乱码
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
class Oracle_DB(object):
def __init__(self):
try:
# 连贯 oracle
# 办法 1:sqlalchemy 提供的 create_engine()
# from sqlalchemy import create_engine
# engine = create_engine('oracle+cx_oracle://username:password@ip:1521/ORCL')
# #办法 2:cx_Oracle.connect()
self.engine = cx_Oracle.connect('username', 'password', 'ip:1521/database')
except cx_Oracle.Error as e:
print("Error %d:%s" % (e.args[0], e.args[1]))
exit()
# 查问局部信息
def search_one(self, sql,sparm):
try:
# #查问获取数据用 sql 语句
# 代传参数:sparm-- 查问指定字段参数
df = pd.read_sql_query(sql, self.engine,params=sparm)
self.engine.close()
except Exception as e:
return "Error" + e.args[0]
return df
# 查问全副信息
def search_all(self, sql):
try:
# #查问获取数据用 sql 语句
df = pd.read_sql_query(sql, self.engine)
self.engine.close()
except Exception as e:
return "Error" + e.args[0]
return df
二、数据提取主函数模块
cx_Oracle 是一个 Python 扩大模块,相当于 python 的 Oracle 数据库的驱动,通过应用所有数据库拜访模块通用的数据库 API 来实现 Oracle 数据库的查问和更新。
1)内部输出参数模块
txt 文本中,就蕴含一列数据,第一行列名,读取的时候疏忽第一行
# 建设 ID——编号字典
def buildid():
sqlid = """select * from b_build_info"""
db = Oracle_DB() # 实例化一个对象
b_build_info = db.search_all(sqlid)
ID_bUILDCODE = b_build_info.set_index("BUILDCODE")["ID"].to_dict()
return ID_bUILDCODE
#通过文本传入待导出数据清单
def read_task_list():
build_code=buildid()
tasklist=[]
is_first_line=True
with open("./b_lst.txt") as lst:
for line in lst:
if is_first_line:
is_first_line=False
continue
tasklist.append(build_code.get(line.strip('\n'))) #键值对转换
return tasklist
2)业务 sql 语句汇合
留神 in 前面 {0} 不要加引号,这里传入为元组,params 参数传入 sparm
= {‘Start_time’:’2021-04-01′,’End_time’:’2021-05-01′},此处参数可依据须要扭转
def sql_d(lst):
# 逐月数据
sql_d_energy_item_month = """select * from d_energy_item_month
where recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and recorddate < to_date(:End_time, 'yyyy-MM-dd')
and buildid in {0}
order by recorddate asc""".format(lst)
# 逐月数据
sql_d_energy_month = """select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id
where d.recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and d.recorddate < to_date(:End_time, 'yyyy-MM-dd')
and d.buildid = '{0}'
order by d.recorddate asc""".format(lst)
# 查问当日数据
sql_energy_item_hour_cheak = """select * from d_energy_item_hour
where trunc(sysdate)=trunc(recorddate)
order by recorddate asc""".format(lst)
sql_collection = [sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,
sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]
#此处省略局部 sql 语句
return sql_collection
3)业务数据处理
业务数据处理流程,原始数据后处理,这里不作介绍:
def db_extranction(lst,sparm,sql_type):
"""sql_type-- 输出须要操作的 sql 业务序号"""
sql_=sql_d(lst)[sql_type] #输入 sql 语句
db = Oracle_DB() # 实例化一个对象
res=db.search_one(sql_,sparm)
# 数据处理加工
RES=Data_item_factory(res) #此处省略
# res = db.search_all(sql_d_energy_item_month)
print(RES)
return RES
多线程提取数据局部,这里 tasklist 列表多线程提取数据
import threading
# Pandas 读写操作 Oracle 数据库
from tools.Data_Update_oracle import Oracle_DB
import pandas as pd
from concurrent import futures
if __name__ == '__main__':
#内部传入
tasklist= read_task_list()
print(tasklist)
# 输出工夫查找范畴参数,可手动批改
sparm = {'Start_time':'2021-04-01','End_time':'2021-05-01'}
lst = tuple(list(tasklist))
#业务类型序号,可手动批改
sql_type=0
#全副提取
db_extranction(lst,sparm,sql_type)
#多线程按字段分批提取
办法一:应用 threading 模块的 Thread 类的结构器创立线程
#threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]
# [threads[i].start() for i in range(len(threads))]
办法二:应用 python 的 concurrent 库,这是官网基于 threading 封装,先装置该库
# with futures.ThreadPoolExecutor(len(tasklist)) as executor:
# executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)
到此整个数据库取数工具开发流程介绍结束,就差最初一步分享给小伙伴应用了,做成 GUI 利用此处不做具体介绍,构建独立的 python 环境,疾速公布你的利用
最近整顿了几百 G 的 Python 学习材料,蕴含新手入门电子书、教程、源码等等,收费分享给大家!想要的返回“Python 编程学习圈”,发送“J”即可收费取得