如果咱们须要解决一个文本文件,外面有 100 万行数据,须要对每条数据做解决,比方将每行数据的数字做一个运算,放入到另一个文件里。
最简略的方法就是关上文件,逐行读取,每读取一行,对这一行做下解决,增加到指标文件中,再回来读取下一行。
这就是线性解决形式,如果解决一行数据须要 0.1 秒,那么用线性解决形式就须要:
10 万秒,即大略 28 个小时
显然对咱们来说,这个工夫有点长,有没用方法缩短呢?
当然有方法,那就是用 多线程 解决!
为什么呢?是因为多线程是提高效率,实现更无效程序的必然状态。
比方,须要解决大量的数据,须要响应多样的申请,须要与慢速的处理过程交互等等,都须要用到线程编程。
然而,线程这个概念不太好了解,用起来也总是不不便,而且容易出错,一方面是因为,咱们的思路是线性的,另一方面是多线程自身有很多须要把握的概念,学习了解难度比拟高。
明天我将分享一下我在工作中是如何利用多线程技术,提速增效的。
对于后面那个例子,能够将原来的一个解决流程,合成为多个,例如之前的解决能够合成为:
读取行、做运算、存文件 三个自流程。
这样的话,相当于将只能一个人做的工作,能够让更多的人来做,从而造成相似的流水线效应
那么用多线程,就能够使咱们的三个工作呈现同时运行的状态,晋升效率,比方先读取一行,而后再解决数据的同时,读取下一行,如此往返。
是不是感觉很好?
先别着急,首先须要解决一个问题 ——
如何防止反复读和跳读
反复读指的是,一个以上线程读取到了同一条数据;
跳读指的是,有些数据行没有任何线程解决。
这里介绍一个帮忙我解决了很多多线程问题的办法,一个数据源类。
多线程数据源类
数据源类,就是将数据集中管理,而后以线程平安的形式为多线程程序提供数据。
留神:并非最佳办法,但很实用
废话不多说,间接看代码:
import threading
class DataSource:
def __init__(self, dataFileName, startLine=0, maxcount=None):
self.dataFileName = dataFileName
self.startLine = startLine # 第一行行号为 1
self.line_index = startLine # 以后读取地位
self.maxcount = maxcount # 读取最大行数
self.lock = threading.RLock() # 同步锁
self.__data__ = open(self.dataFileName, 'r', encoding= 'utf-8')
for i in range(self.startLine):
l = self.__data__.readline()
def getLine(self):
self.lock.acquire()
try:
if self.maxcount is None or self.line_index < (self.startLine + self.maxcount):
line = self.__data__.readline()
if line:
self.line_index += 1
return True, line
else:
return False, None
else:
return False, None
except Exception as e:
return False, "解决出错:" + e.args
finally:
self.lock.release()
def __del__(self):
if not self.__data__.closed:
self.__data__.close()
print("敞开数据源:", self.dataFileName)
- init 初始化办法,承受 3 个参数
lock 属性是一个同步锁,以便在多线程读取不呈现抵触
- dataFileName 是数据文件门路
- startLine 开始读取行,对于大文件须要调配解决时特地有用,
- maxcount 读取最大行数,通过和 startLine 配合能够读取指定局部的数据,默认为全副读取
getLine 办法,每次调用会返回一个元组,蕴含状态和失去的,数据
del 办法会在对象销毁时调用,在此记录以后解决地位
这样就是能够利用在多线程程序中,承当读取待处理记录的工作了。
业务解决
例如外围处理程序如下:
import time
def process(worker_id, datasource):
count = 0
while True:
status, data = datasource.getLine()
if status:
print(">>> 线程[%d] 取得数据,正在解决……" % worker_id)
time.sleep(3) # 期待 3 秒模仿处理过程
print(">>> 线程[%d] 解决数据 实现" % worker_id)
count += 1
else:
break # 退出循环
print(">>> 线程 [%d] 完结,共解决[%d] 条数据" % (worker_id, count))
- 参数 worker_id 是线程号,用于辨别输入音讯
- 参数 datasource 是 DataSource 的实例,作为各线程的共享数据源
- count 用于记录以后线程解决的记录数
- 用一个死循环,驱动重复解决,直到读取没数据可读
组装
线程组装局部就也很简略:
import threading
def main():
datasource = DataSource('data.txt')
workercount = 10 # 开启的线程数,留神:并非越多越快哦
workers = []
for i in range(workercount):
worker = threading.Thread(target=process, args=(i+1, datasource))
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
- 先初始化一个 DataSource
- workercount 为须要创立的线程数,在理论利用中能够通过配置或者参数提供,另外不是线程越多越好,个别设置为 CPU 外围数的两倍即可
- threading.Thread 是线程类,能够实例化一个线程,target 参数是线程解决办法,这里就是后面定义的 process 办法,args 为提供给解决办法的参数
- 线程的 start 办法是启动线程,因为创立不等于启动,start 是个异步办法,调用会霎时实现
- join 办法是期待线程解决实现,是同步办法,只有线程真正解决实现才会完结
扩大
通过这样的形式,帮我解决了很多理论的业务,比方爬取关键字信息,合并数据等等。
如果解决的数据不是文本文件,只有批改一下 DataSource 的 getLine 实现就能够了,比方数据源来自数据库等。
另外,下面的 DataSource 并非最优的,只是起到了标准读取接口,避免数据误读的作用,齐全谈不上性能最优。
那么如何实现更优呢,这里提供一个思路就是,应用生产者消费者模型,利用 队列,以及 预读取 技术来实现更优的数据源类。
例如,DataSource 中,是逐行读取的,能够采纳预读取,即提前读取一些数据,当线程须要数据时,先给出预读取的,等预读取的数据生产到一定量时,再异步读取一部分。
这样的益处是,各个线程不用期待 IO 工夫(简略了解为从文件或者网络读取的等待时间)。
如何实现呢,能够理解一下队列(queue)的概念,Python 中提供了两种队列,同步队列 queue 和 队列集。
想想具体应该怎么做呢?欢送在留言区写下你的办法和倡议。
总结
明天分享了一个在理论工作中用到的,多线程解决数据的例子,例子尽管简略,但很实用,曾经帮忙我解决了很多重要的工作。
谈一些感悟,Python 的利用并不仅限于数据分析、AI 畛域等热门畛域,更多的能够利用在于解决日常生活工作中,比方解决数据,代替手工操作,简略运算等。
咱们晓得,学会一个货色最好的形式是应用,对于 Python 技能来说,也是一样的,多在日常工作中用,多去解决理论问题,不必卯足了劲儿,憋个大招。
以上就是本次分享的所有内容,想要理解更多 python 常识欢送返回公众号:Python 编程学习圈,每日干货分享