共计 9718 个字符,预计需要花费 25 分钟才能阅读完成。
原文地址 - 我的博客
兴许你在数据迷信 /AI/ 机器学习的钻研中头疼于大型数据加载与落盘的速度问题,毕竟 IO 过程是最磨人工夫的。大家常调侃于 python 能优化的空间的不多,但事实上咱们能够尽量地做到更好。心愿本文对你的程序有点帮忙。
本文的 IO 效率晋升的探讨限定在数据迷信畛域内的以 numpy.ndarray
为代表的大型数组 (张量、矩阵) 数据对象的 IO 问题上。解决问题的伎俩是以多线程 / 多过程为根底的并行写入 / 读取。同网络 io 和一般的小数据量的 io 问题不同,数据迷信的大矩阵对象往往随同着矩阵的切片等操作,他们对于内存的占用(是否复制、挪动等)不明,更容易陷入内存冗余占用问题,这些都会影响 io 效率。本文探讨如下几个主题:
- 基于多线程 / 过程的并行读写办法及性能比照
- 并行 IO 中留神内存的冗余拷贝景象
- 最佳实际总结
IO 情景
本文探讨的 IO 情景很简略,从磁盘上加载大数据进行解决,再将后果存储。这种状况常见于各类机器学习框架中,对数据的 load 和 dump 是最根本要解决的问题。下文中探讨的一些原理和技巧也在 pytorch
、tensorflow
等的 IO 接口中体现。
在数据迷信场景下,要优化读写的效率,能够从以下几个方向动手:
- 从文件编码格局动手,采纳
pkl
等的二进制编码减速读写 - 从读写接口优化动手,采纳 DirectIO/ 零拷贝等的优化
- 分块、分批并行读写,适宜数据绝对独立情景
上述三种办法第一种操作简略,但编码的模式不不便与其余语言 / 工具兼容。第二种对于 Python 来讲有点小题大做,而且 Python 的 IO 接口不如动态语言那样显式,尽管也能间接采纳 os.open(CLONE_FLATS=...)
的最底层接口,但采纳 DirectIO
[4] 或mmap
之类的优化都须要减少设计老本。第三种办法虽波及多线程 / 过程,但不波及通信与同步,实际绝对简略。
多线程 / 多过程并行读写
并行根本逻辑
多过程导致的并行读写逻辑很简略,次要的开销在操作系统对过程的治理上。多线程对并行读写的实践撑持有必要再提一下 (针对 Cpython), 下图[1] 所示的是 GIL 针对线程 IO 情景的解决。
上图也显示了多线程的次要开销是各个线程 run
阶段的总和以及操作系统对线程的治理开销。
针对 Cpython 的多线程仍须要留神的是
- Linux 下齐全是 POSIX-thread, 这意味着调度模式依然是 1:1 的用户 - 内核映射关系
- Cpython 多线程默认共享解释器中的全局变量
- 线程开释 GIL 的 IO 机会是进行底层根本的 IO 零碎调用后
- 多线程对于调度通信应用信号量、条件变量等办法
规范库接口测评
咱们设计一个小试验对 CPython 规范库提供的多线程 / 过程后果的并行写文件效率进行测试:
import os
import numpy as np
import time
from multiprocessing import Process
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import Thread
from memory_profiler import profile
# Time calculator
class Benchmark:
def __init__(self, text):
self.text = text
def __enter__(self):
self.start = time.time()
def __exit__(self, *args):
self.end = time.time()
print("%s: consume: %s" % (self.text, self.end - self.start))
# Base Task
def store_task(data: np.ndarray, output, index):
fname = "%s_worker_%s.csv" % (output, index)
np.savetxt(fname, data, delimiter='\t')
#main data source
worker_num = os.cpu_count()
big_data = np.random.rand(1000000, 10)
task_num = big_data.shape[0] // worker_num
# 1. multiprocessing.Porcess
@profile
def loop_mp():
pool = []
for i in range(worker_num):
start = i * task_num
end = (i+1) * task_num
p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i))
p.start()
pool.append(p)
for p in pool:
p.join()
# 2. threading.Thread
@profile
def mt_thread():
pool = []
for i in range(worker_num):
start = i * task_num
end = (i+1) * task_num
t = Thread(target=store_task, args=(big_data[start: end], 'testdata/thread', i))
t.start()
pool.append(t)
for p in pool:
p.join()
# 3. multiprocessing.Pool
@profile
def mp_pool():
with Pool(processes=worker_num) as pool:
tasks = []
for i in range(worker_num):
start = i * task_num
end = (i+1) * task_num
tasks.append(pool.apply_async(store_task_inner, (big_data[start: end], 'testdata/mp_pool', i)))
pool.close()
pool.join()
# 4. ProcessPoolExecutor
@profile
def loop_pool():
with ProcessPoolExecutor(max_workers=worker_num) as exe:
for i in range(worker_num):
start = i * task_num
end = (i+1) * task_num
exe.submit(store_task, big_data[start: end], 'testdata/pool', i)
# 5. ThreadPoolExecutor
def loop_thread():
with ThreadPoolExecutor(max_workers=worker_num) as exe:
for i in range(worker_num):
start = i * task_num
end = (i+1) * task_num
exe.submit(store_task, big_data[start: end], 'testdata/pool_thread', i)
# 6. direct
@profile
def direct():
store_task(big_data, 'testdata/all', 0)
if __name__ == '__main__':
with Benchmark("loop mp"):
loop_mp()
with Benchmark("mt thread"):
mt_thread()
with Benchmark("mp pool"):
mp_pool()
with Benchmark("loop pool"):
loop_pool()
with Benchmark("direct"):
direct()
with Benchmark("Thread"):
loop_thread()
从工夫耗费和内存上剖析下各个接口的效率(测试环境MacOS 2.2 GHz 四核 Intel Core i7
):
接口 | 耗时 | 内存 |
---|---|---|
multiprocessing.Process |
5.14s | p.start() 产生额定开销,触发参数的复制 |
theading.Thread |
10.34s | 无额定开销 |
multiprocessing.Pool |
4.18s | Pool() 构建额定开销, 参数未产生复制 |
ProcessPoolExecutor |
3.69s | 参数未产生复制 |
ThreadPoolExecutor |
10.82s | 无额定开销 |
direct | 22.04s | 无额定开销 |
工夫开销剖析
直观上看,多过程的接口减速了 4 -4.5x, 多线程减速了一半的工夫。多线程比多过程要慢的起因比较复杂,原则上切换的开销线程要小于过程,但此例中多线程还波及到线程间调度上的通信,而多过程则独立运行。当然有趣味的敌人也能够抉择 asyncio.tasks
基于多路复用的接口比照下,毛病是比拟难找到适宜的非阻塞读写接口。
值得注意的是,多过程的两个接口的速度也有很大差异,Process
的模式比线程池的要慢很多,起因可能是数据拷贝的开销。下节探讨池技术为何防止了数据的拷贝。
内存开销剖析
因为 CPython 的数据类型的限度,对于多线程 threading
和多过程 multiprocessing
的数据是否复制不能显式地展示,从原理上讲 Thread()
是无需拷贝数据的,Process
是须要拷贝数据的。然而上表中显示 multiprcocessing.Pool
和ProcessPoolExecutor
这两个基于线程池的办法未产生数据的拷贝。
代码中的 @profile
是一个内存剖析的三方库,但他的后果也不能充分说明实质。
其中 Process
的后果是
Line # Mem usage Increment Occurences Line Contents
============================================================
29 101.3 MiB 101.3 MiB 1 @profile
30 def loop_mp():
31 101.3 MiB 0.0 MiB 1 pool = []
32 120.6 MiB 0.0 MiB 9 for i in range(worker_num):
33 120.6 MiB 0.0 MiB 8 start = i * task_num
34 120.6 MiB 0.0 MiB 8 end = (i+1) * task_num
35 120.6 MiB 0.0 MiB 8 p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i))
36 120.6 MiB 19.3 MiB 8 p.start()
37 120.6 MiB 0.0 MiB 8 pool.append(p)
38 120.6 MiB 0.0 MiB 9 for p in pool:
39 120.6 MiB 0.0 MiB 8 p.join()
显著能够看出 p.start()
产生了数据的拷贝,拷贝的就是 big_data[start: end]
理论大小。这与 fork
零碎调用差异很大,零碎调用要明确地传入 CLONE_FLAGS
来约定子过程与父过程的数据拷贝状况。再来看ProcessPoolExecutor
Line # Mem usage Increment Occurences Line Contents
============================================================
68 121.1 MiB 121.1 MiB 1 @profile
69 def loop_pool():
70 121.1 MiB 0.0 MiB 1 with ProcessPoolExecutor(max_workers=worker_num) as exe:
71 121.2 MiB -0.0 MiB 9 for i in range(worker_num):
72 121.2 MiB 0.0 MiB 8 start = i * task_num
73 121.2 MiB 0.0 MiB 8 end = (i+1) * task_num
74 121.2 MiB 0.1 MiB 8 exe.submit(store_task, big_data[start: end], 'testdata/pool', i)
外表上看没有产生拷贝,但事实如此吗?因为 exe.submit
毕竟不是间接触发了 Process()
的构建,想弄明确这个问题还得深究 Pool
技术的原理。
对于 Cpython 的源码解析,曾经不少 Pythonista 做了大量工作。从 [2] 的参考看到 ProcessPoolExecutor
的封装逻辑是
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+
−+----------+ +------------+ +--------+ +-----------+ +---------+
这个流程是否似曾相识?没错,他与之前文章 [[C++ 造轮子] 基于 pthread 线程池](http://zhikai.pro/post/103) 中 …:
- 应用队列保护工作 task
- Pool 随同着空过程的创立
- 有专门的治理线程来负责 Pool 的治理与监控
那么具体到参数数据拷贝上便是 Queue.put()
与Queue.get()
的操作是否产生数据拷贝了。multiporcessing.Queue
是多过程通信的一种重要接口,他是基于共享内存的,参数数据的传递不产生拷贝,这对于大的 ndarray
对象而言是极其重要的。
ndarray
的对象拷贝
Python 世界里所有皆对象。— Py 圈名言
面对企业级大数据时,Python 程序呈现的内存 / 显存占用率过高往往不是那么容易查明起因。动静援用类型 +gc 给 python 的内存治理带来了不便,但不必要的数据拷贝产生情景还是要尽量避免。
切片与组合
切片和组合是在以 numpy
为代表的向量 / 矩阵 / 张量运算库的罕用操作,他们底层是否产生复制很难剖析:
import numpy as np
A = np.random.rand(1 << 8, 1 << 8)
B = A[:16]
del A ## can not release A's mem, for B's reference
print(A) ## error, the ref A has not exist yet,however its mem still exist
C = np.random.rand(1 << 4, 1 << 8)
D = np.concatenate([B, C], axis=1) ## D is a copy of B+C memory
对于 concatenate
次要看内存散布决定是否产生复制[6]:
00 04 08 0C 10 14 18 1C 20 24 28 2C
| | | | | | | | | | | |
[data1][foo][data2][bar][concat(data1, data2) ]
data1 & data2 displayed in different place, concat them can only cover a new place.
切片同样是看内存散布,基于 row 和 column 的内存排列是不同的,具体的能够应用 order=['C', 'F']
决定数组是按行在内存排列还是按列。[7] 还有一种方法是探索切片最终是否转换成 slice(start, offset, stride)
的模式,如是则为 view, 不能则大概率是 copy, 例如诸多的 fancy_index
模式都是 copy, [:]
其实就是slice(None, None, None)
, 它也是 copy.[8]
切片到底是 view 还是 copy 在小数据量时无需 care,但数据规模达到与内存下限时,大型的 ndarray 切片操作肯定要小心了.
过程创立时的复制
咱们心愿把数据切片后传递给子过程, 同时咱们心愿这份数据不产生复制,各个过程共享这一大型 ndarray
。首先从上一章明确的是,采纳multiprocessing.Process(target=func, args=(ndarray[start:offset]))
创立子过程的形式是肯定会复制 ndarray 的。其实这里次要用到的技术是 multiprocessing
的共享内存办法。
Python3.8 之后新减少了shared_memeory
, 给之前各种共享内存的形式做了一个对立的繁难应用接口。咱们应用 share_memory 革新一下上节的代码:
from multiprocessing import shared_memory
def store_task_sha_v2(start, end, output, index, sha_name, shape, dtype):
fname = "%s_worker_%s.csv" % (output, index)
exist_sham = shared_memory.SharedMemory(name=sha_name)
data = np.ndarray(shape, dtype=dtype, buffer=exist_sham.buf)
print(sha_name, data.shape, index)
np.savetxt(fname, data[start: end], delimiter='\t')
del data
exist_sham.close()
@profile
def mp_pool_sha():
shm = shared_memory.SharedMemory(create=True, size=big_data.nbytes)
b = np.ndarray(big_data.shape, dtype=big_data.dtype, buffer=shm.buf)
b[:] = big_data[:]
print(b.shape)
with ProcessPoolExecutor(max_workers=worker_num) as pool:
tasks = []
for i in range(worker_num):
start = i * task_num
end = (i+1) * task_num
tasks.append(
pool.submit(store_task_sha_v2,
start, end, 'testdata/mp_pool_sha', i ,
shm.name, b.shape, b.dtype))
for t in tasks:
# Note! 在这里捕捉异样,ProcessPoolExecutor 举荐这么应用!
try:
print(t.result())
except Exception as e:
print(f'{e}')
del b
shm.close()
shm.unlink()
代码简单了不少,但逻辑很简略: 共享缓冲区申请 -> 映射 local-ndarray 对象 -> 放数据进入共享缓存区 -> 其余过程读写 -> 敞开缓存区。share_memeory
的益处还有他能够随时申请 local-variable 进行共享。
最佳实际总结
并行读文件加载ndarray
退出你的训练数据很大,须要流解决(训练),间接应用 torch.datasets
等模块加载,他们封装好了并行流处理过程。
如果须要一次性载入 RAM 解决(如 KNN 等算法)则能够采纳分块并行读:
def parallize_load(file, total_num, worker_num):
"""Load embedding file parallelization
@emb_file: source filename
@total_num: total lines
@worker_num: parallelize process num
return: np.ndaary
"""
def load_from_txt(emb, start, n_rows, arr_list):
data = np.loadtxt(emb, skiprows=start, max_rows=n_rows)
arr_list.append(data)
worker_load_num = total_num // worker_num
pool = []
with Manager() as manager:
arr_list = manager.list([])
for index in range(worker_num):
s = index * worker_load_num
if index != worker_num - 1:
e = worker_load_num
else:
e = total_num - (worker_load_num * index)
p = Process(target=load_from_txt, args=(emb_file, s, e, arr_list))
pool.append(p)
p.start()
for p in pool:
p.join()
arr = np.concatenate(arr_list)
return arr
source_total_num = sum(1 for line in open("souce_big_file", "rb"))
source_emb_data = parallize_load("souce_big_file", source_total_num, worker_num)
这基本上是worker_num
X 倍的减速。
并行写入实际
- 尽量避免对 large-ndarray 对象的切片、组合操作。
- 尽量避免应用
for-loop
, 多用矩阵运算 - 写入文件多过程效率更高,逻辑更简洁,但要时刻留神过程间数据不要产生复制
- 尽可能采纳三方库的 io 接口如
np.savetxt
,df.to_csv
等,他们可能对异样、分 chunk 写入等方面都有优化 - 写入字符串时,能尽量地拼接
'\t'.join(List[])
, 就不要应用for ele in List: fp.write("%s\t%s\n" % (ele))
More work
本文探讨的对象只局限于 host-device
的 RAM 和 disk, 对于更常见的 GPU-mem,对于 Python 诸多三方库的接口来讲可就太苦楚了,他们往往都省略了调配 - 申请 - 调度 - 通信 - 销毁的过程,呈现 OOM 异样后排查只能靠指标察看。于此,接下来能够持续钻研下显存的最佳实际。
最初,兴许本文的内容会让你很惊讶,因为对 Python 做优化是一件出力不讨好的事件。但不得不说这些方法在我目前的工作中,在肯定资源的 constrain 下解决了原程序的很多问题。当然目前支流的机器学习算法流程都基于流解决,一次性地过大占用很少呈现了,但也有存在 embedding 读写等须要用到手动读写的中央。
- [1] Understanding GIL
- [2] Lib/concurrent/futures/process.py
- [3] Python 之路
- [4] Direct IO in Python
- [5] Python Doc: 17.2.1.5. Sharing state between processes
- [6] In-place numpy array concatenation? #13279
- [7] Numpy: views vs copy by slicing
- [8] Views versus copies in NumPy