关于python:PythonIO效率提升面向hugendarray对象

1次阅读

共计 9718 个字符,预计需要花费 25 分钟才能阅读完成。

原文地址 - 我的博客

兴许你在数据迷信 /AI/ 机器学习的钻研中头疼于大型数据加载与落盘的速度问题,毕竟 IO 过程是最磨人工夫的。大家常调侃于 python 能优化的空间的不多,但事实上咱们能够尽量地做到更好。心愿本文对你的程序有点帮忙。

本文的 IO 效率晋升的探讨限定在数据迷信畛域内的以 numpy.ndarray 为代表的大型数组 (张量、矩阵) 数据对象的 IO 问题上。解决问题的伎俩是以多线程 / 多过程为根底的并行写入 / 读取。同网络 io 和一般的小数据量的 io 问题不同,数据迷信的大矩阵对象往往随同着矩阵的切片等操作,他们对于内存的占用(是否复制、挪动等)不明,更容易陷入内存冗余占用问题,这些都会影响 io 效率。本文探讨如下几个主题:

  • 基于多线程 / 过程的并行读写办法及性能比照
  • 并行 IO 中留神内存的冗余拷贝景象
  • 最佳实际总结

IO 情景

本文探讨的 IO 情景很简略,从磁盘上加载大数据进行解决,再将后果存储。这种状况常见于各类机器学习框架中,对数据的 load 和 dump 是最根本要解决的问题。下文中探讨的一些原理和技巧也在 pytorchtensorflow 等的 IO 接口中体现。

在数据迷信场景下,要优化读写的效率,能够从以下几个方向动手:

  • 从文件编码格局动手,采纳 pkl 等的二进制编码减速读写
  • 从读写接口优化动手,采纳 DirectIO/ 零拷贝等的优化
  • 分块、分批并行读写,适宜数据绝对独立情景

上述三种办法第一种操作简略,但编码的模式不不便与其余语言 / 工具兼容。第二种对于 Python 来讲有点小题大做,而且 Python 的 IO 接口不如动态语言那样显式,尽管也能间接采纳 os.open(CLONE_FLATS=...) 的最底层接口,但采纳 DirectIO[4] 或mmap之类的优化都须要减少设计老本。第三种办法虽波及多线程 / 过程,但不波及通信与同步,实际绝对简略。

多线程 / 多过程并行读写

并行根本逻辑

多过程导致的并行读写逻辑很简略,次要的开销在操作系统对过程的治理上。多线程对并行读写的实践撑持有必要再提一下 (针对 Cpython), 下图[1] 所示的是 GIL 针对线程 IO 情景的解决。

上图也显示了多线程的次要开销是各个线程 run 阶段的总和以及操作系统对线程的治理开销。

针对 Cpython 的多线程仍须要留神的是

  • Linux 下齐全是 POSIX-thread, 这意味着调度模式依然是 1:1 的用户 - 内核映射关系
  • Cpython 多线程默认共享解释器中的全局变量
  • 线程开释 GIL 的 IO 机会是进行底层根本的 IO 零碎调用后
  • 多线程对于调度通信应用信号量、条件变量等办法

规范库接口测评

咱们设计一个小试验对 CPython 规范库提供的多线程 / 过程后果的并行写文件效率进行测试:

import os
import numpy as np
import time
from multiprocessing import Process
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import Thread
from memory_profiler import profile
# Time calculator
class Benchmark:
    def __init__(self, text):
        self.text = text
    def __enter__(self):
        self.start = time.time()
    def __exit__(self, *args):
        self.end = time.time()
        print("%s: consume: %s" % (self.text, self.end - self.start))

# Base Task
def store_task(data: np.ndarray, output, index):
    fname = "%s_worker_%s.csv" % (output, index)
    np.savetxt(fname, data, delimiter='\t')

#main data source
worker_num = os.cpu_count()
big_data = np.random.rand(1000000, 10)
task_num = big_data.shape[0] // worker_num

# 1. multiprocessing.Porcess
@profile
def loop_mp():
    pool = []
    for i in range(worker_num):
        start = i * task_num
        end = (i+1) * task_num
        p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i))
        p.start()
        pool.append(p)
    for p in pool:
        p.join()

# 2. threading.Thread
@profile
def mt_thread():
    pool = []
    for i in range(worker_num):
        start = i * task_num
        end = (i+1) * task_num
        t = Thread(target=store_task, args=(big_data[start: end], 'testdata/thread', i))
        t.start()
        pool.append(t)
    for p in pool:
        p.join()

# 3. multiprocessing.Pool
@profile
def mp_pool():
    with Pool(processes=worker_num) as pool:
        tasks = []
        for i in range(worker_num):
            start = i * task_num
            end = (i+1) * task_num
            tasks.append(pool.apply_async(store_task_inner, (big_data[start: end], 'testdata/mp_pool', i)))
        pool.close()
        pool.join()

# 4. ProcessPoolExecutor
@profile
def loop_pool():
    with ProcessPoolExecutor(max_workers=worker_num) as exe:
        for i in range(worker_num):
            start = i * task_num
            end = (i+1) * task_num
            exe.submit(store_task, big_data[start: end], 'testdata/pool', i)

# 5. ThreadPoolExecutor
def loop_thread():
    with ThreadPoolExecutor(max_workers=worker_num) as exe:
        for i in range(worker_num):
            start = i * task_num
            end = (i+1) * task_num
            exe.submit(store_task, big_data[start: end], 'testdata/pool_thread', i)

# 6.  direct
@profile
def direct():
    store_task(big_data, 'testdata/all', 0)

if __name__ == '__main__':
    with Benchmark("loop mp"):
        loop_mp()
    with Benchmark("mt thread"):
        mt_thread()
    with Benchmark("mp pool"):
        mp_pool()
    with Benchmark("loop pool"):
        loop_pool()
    with Benchmark("direct"):
        direct()
    with Benchmark("Thread"):
        loop_thread()

从工夫耗费和内存上剖析下各个接口的效率(测试环境MacOS 2.2 GHz 四核 Intel Core i7):

接口 耗时 内存
multiprocessing.Process 5.14s p.start()产生额定开销,触发参数的复制
theading.Thread 10.34s 无额定开销
multiprocessing.Pool 4.18s Pool()构建额定开销, 参数未产生复制
ProcessPoolExecutor 3.69s 参数未产生复制
ThreadPoolExecutor 10.82s 无额定开销
direct 22.04s 无额定开销

工夫开销剖析

直观上看,多过程的接口减速了 4 -4.5x, 多线程减速了一半的工夫。多线程比多过程要慢的起因比较复杂,原则上切换的开销线程要小于过程,但此例中多线程还波及到线程间调度上的通信,而多过程则独立运行。当然有趣味的敌人也能够抉择 asyncio.tasks 基于多路复用的接口比照下,毛病是比拟难找到适宜的非阻塞读写接口。

值得注意的是,多过程的两个接口的速度也有很大差异,Process的模式比线程池的要慢很多,起因可能是数据拷贝的开销。下节探讨池技术为何防止了数据的拷贝。

内存开销剖析

因为 CPython 的数据类型的限度,对于多线程 threading 和多过程 multiprocessing 的数据是否复制不能显式地展示,从原理上讲 Thread() 是无需拷贝数据的,Process是须要拷贝数据的。然而上表中显示 multiprcocessing.PoolProcessPoolExecutor这两个基于线程池的办法未产生数据的拷贝。

代码中的 @profile 是一个内存剖析的三方库,但他的后果也不能充分说明实质。
其中 Process 的后果是

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    29    101.3 MiB    101.3 MiB           1   @profile
    30                                         def loop_mp():
    31    101.3 MiB      0.0 MiB           1       pool = []
    32    120.6 MiB      0.0 MiB           9       for i in range(worker_num):
    33    120.6 MiB      0.0 MiB           8           start = i * task_num
    34    120.6 MiB      0.0 MiB           8           end = (i+1) * task_num
    35    120.6 MiB      0.0 MiB           8           p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i))
    36    120.6 MiB     19.3 MiB           8           p.start()
    37    120.6 MiB      0.0 MiB           8           pool.append(p)
    38    120.6 MiB      0.0 MiB           9       for p in pool:
    39    120.6 MiB      0.0 MiB           8           p.join()

显著能够看出 p.start()产生了数据的拷贝,拷贝的就是 big_data[start: end] 理论大小。这与 fork 零碎调用差异很大,零碎调用要明确地传入 CLONE_FLAGS 来约定子过程与父过程的数据拷贝状况。再来看ProcessPoolExecutor

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
    68    121.1 MiB    121.1 MiB           1   @profile
    69                                         def loop_pool():
    70    121.1 MiB      0.0 MiB           1       with ProcessPoolExecutor(max_workers=worker_num) as exe:
    71    121.2 MiB     -0.0 MiB           9           for i in range(worker_num):
    72    121.2 MiB      0.0 MiB           8               start = i * task_num
    73    121.2 MiB      0.0 MiB           8               end = (i+1) * task_num
    74    121.2 MiB      0.1 MiB           8               exe.submit(store_task, big_data[start: end], 'testdata/pool', i)

外表上看没有产生拷贝,但事实如此吗?因为 exe.submit 毕竟不是间接触发了 Process() 的构建,想弄明确这个问题还得深究 Pool 技术的原理。

对于 Cpython 的源码解析,曾经不少 Pythonista 做了大量工作。从 [2] 的参考看到 ProcessPoolExecutor 的封装逻辑是

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+
−+----------+     +------------+     +--------+     +-----------+    +---------+

这个流程是否似曾相识?没错,他与之前文章 [[C++ 造轮子] 基于 pthread 线程池](http://zhikai.pro/post/103) 中 …:

  • 应用队列保护工作 task
  • Pool 随同着空过程的创立
  • 有专门的治理线程来负责 Pool 的治理与监控

那么具体到参数数据拷贝上便是 Queue.put()Queue.get()的操作是否产生数据拷贝了。multiporcessing.Queue是多过程通信的一种重要接口,他是基于共享内存的,参数数据的传递不产生拷贝,这对于大的 ndarray 对象而言是极其重要的。

ndarray的对象拷贝

Python 世界里所有皆对象。— Py 圈名言

面对企业级大数据时,Python 程序呈现的内存 / 显存占用率过高往往不是那么容易查明起因。动静援用类型 +gc 给 python 的内存治理带来了不便,但不必要的数据拷贝产生情景还是要尽量避免。

切片与组合

切片和组合是在以 numpy 为代表的向量 / 矩阵 / 张量运算库的罕用操作,他们底层是否产生复制很难剖析:

import numpy as np
A = np.random.rand(1 << 8, 1 << 8)
B = A[:16]
del A  ## can not release A's mem, for B's reference
print(A)  ## error, the ref A has not exist yet,however its mem still exist
C = np.random.rand(1 << 4, 1 << 8)
D = np.concatenate([B, C], axis=1) ## D is a copy of B+C memory

对于 concatenate 次要看内存散布决定是否产生复制[6]:

00    04    08    0C    10    14    18    1C    20    24    28    2C
|     |     |     |     |     |     |     |     |     |     |     |
[data1][foo][data2][bar][concat(data1, data2)  ]

data1 & data2 displayed in different place, concat them can only cover a new place.

切片同样是看内存散布,基于 row 和 column 的内存排列是不同的,具体的能够应用 order=['C', 'F'] 决定数组是按行在内存排列还是按列。[7] 还有一种方法是探索切片最终是否转换成 slice(start, offset, stride) 的模式,如是则为 view, 不能则大概率是 copy, 例如诸多的 fancy_index 模式都是 copy, [:] 其实就是slice(None, None, None), 它也是 copy.[8]

切片到底是 view 还是 copy 在小数据量时无需 care,但数据规模达到与内存下限时,大型的 ndarray 切片操作肯定要小心了.

过程创立时的复制

咱们心愿把数据切片后传递给子过程, 同时咱们心愿这份数据不产生复制,各个过程共享这一大型 ndarray。首先从上一章明确的是,采纳multiprocessing.Process(target=func, args=(ndarray[start:offset])) 创立子过程的形式是肯定会复制 ndarray 的。其实这里次要用到的技术是 multiprocessing 的共享内存办法。

Python3.8 之后新减少了shared_memeory, 给之前各种共享内存的形式做了一个对立的繁难应用接口。咱们应用 share_memory 革新一下上节的代码:

from multiprocessing import shared_memory
def store_task_sha_v2(start, end, output, index, sha_name, shape, dtype):
    fname = "%s_worker_%s.csv" % (output, index)
    exist_sham = shared_memory.SharedMemory(name=sha_name)
    data = np.ndarray(shape, dtype=dtype, buffer=exist_sham.buf)
    print(sha_name, data.shape, index)
    np.savetxt(fname, data[start: end], delimiter='\t')
    del data
    exist_sham.close()

@profile
def mp_pool_sha():
    shm = shared_memory.SharedMemory(create=True, size=big_data.nbytes)
    b = np.ndarray(big_data.shape, dtype=big_data.dtype, buffer=shm.buf)
    b[:] = big_data[:]
    print(b.shape)
    with ProcessPoolExecutor(max_workers=worker_num) as pool:
        tasks = []
        for i in range(worker_num):
            start = i * task_num
            end = (i+1) * task_num
            tasks.append(
                pool.submit(store_task_sha_v2, 
                    start, end, 'testdata/mp_pool_sha', i ,
                    shm.name, b.shape, b.dtype))
        for t in tasks:
            # Note! 在这里捕捉异样,ProcessPoolExecutor 举荐这么应用!
            try:
                print(t.result())
            except Exception as e:
                print(f'{e}')
    del b
    shm.close()
    shm.unlink() 

代码简单了不少,但逻辑很简略: 共享缓冲区申请 -> 映射 local-ndarray 对象 -> 放数据进入共享缓存区 -> 其余过程读写 -> 敞开缓存区。share_memeory的益处还有他能够随时申请 local-variable 进行共享。

最佳实际总结

并行读文件加载ndarray

退出你的训练数据很大,须要流解决(训练),间接应用 torch.datasets 等模块加载,他们封装好了并行流处理过程。

如果须要一次性载入 RAM 解决(如 KNN 等算法)则能够采纳分块并行读:

def parallize_load(file, total_num, worker_num):
    """Load embedding file parallelization
       @emb_file: source filename
       @total_num: total lines
       @worker_num: parallelize process num
    return: np.ndaary
    """
    def load_from_txt(emb, start, n_rows, arr_list):
        data = np.loadtxt(emb, skiprows=start, max_rows=n_rows)
        arr_list.append(data)

    worker_load_num = total_num // worker_num
    pool = []
    with Manager() as manager:
        arr_list = manager.list([])
        for index in range(worker_num):
            s = index * worker_load_num
            if index != worker_num - 1:
                e = worker_load_num
            else:
                e = total_num - (worker_load_num * index)
            p = Process(target=load_from_txt, args=(emb_file, s, e, arr_list))
            pool.append(p)
            p.start()
        for p in pool:
            p.join()
        arr = np.concatenate(arr_list)
    return arr
source_total_num = sum(1 for line in open("souce_big_file", "rb"))
source_emb_data = parallize_load("souce_big_file", source_total_num, worker_num)

这基本上是worker_numX 倍的减速。

并行写入实际

  • 尽量避免对 large-ndarray 对象的切片、组合操作。
  • 尽量避免应用for-loop, 多用矩阵运算
  • 写入文件多过程效率更高,逻辑更简洁,但要时刻留神过程间数据不要产生复制
  • 尽可能采纳三方库的 io 接口如 np.savetxt,df.to_csv 等,他们可能对异样、分 chunk 写入等方面都有优化
  • 写入字符串时,能尽量地拼接'\t'.join(List[]), 就不要应用for ele in List: fp.write("%s\t%s\n" % (ele))

More work

本文探讨的对象只局限于 host-device 的 RAM 和 disk, 对于更常见的 GPU-mem,对于 Python 诸多三方库的接口来讲可就太苦楚了,他们往往都省略了调配 - 申请 - 调度 - 通信 - 销毁的过程,呈现 OOM 异样后排查只能靠指标察看。于此,接下来能够持续钻研下显存的最佳实际。

最初,兴许本文的内容会让你很惊讶,因为对 Python 做优化是一件出力不讨好的事件。但不得不说这些方法在我目前的工作中,在肯定资源的 constrain 下解决了原程序的很多问题。当然目前支流的机器学习算法流程都基于流解决,一次性地过大占用很少呈现了,但也有存在 embedding 读写等须要用到手动读写的中央。

  • [1] Understanding GIL
  • [2] Lib/concurrent/futures/process.py
  • [3] Python 之路
  • [4] Direct IO in Python
  • [5] Python Doc: 17.2.1.5. Sharing state between processes
  • [6] In-place numpy array concatenation? #13279
  • [7] Numpy: views vs copy by slicing
  • [8] Views versus copies in NumPy
正文完
 0