
Deep Learning: CUDA Programming from Scratch: Streams and Events

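The previous two articles showed how to use GPU programming for simple tasks: embarrassingly parallel workloads, reductions using shared memory, and device functions. To improve our parallel processing capability further, this article introduces CUDA events and how to use them. Before diving in, however, we first discuss CUDA streams.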

Preparation

Import and load the libraries, and make sure a GPU is available.

 import warnings
 from time import perf_counter, sleep
 
 import numpy as np
 
 import numba
 from numba import cuda
 from numba.core.errors import NumbaPerformanceWarning
 
 print(np.__version__)
 print(numba.__version__)
 
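 # Ignore NumbaPerformanceWarning
 warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
 
 # Make sure a CUDA GPU is visible: cuda.detect() prints the device list below and returns True
 print(cuda.detect())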
 
 # 1.21.6
 # 0.55.2
 
 # Found 1 CUDA devices
 # id 0             b'Tesla T4'                              [SUPPORTED]
 #                       Compute Capability: 7.5
 #                            PCI Device ID: 4
 #                               PCI Bus ID: 0
 #                                     UUID: GPU-eaab966c-a15b-15f7-94b1-a2d4932bac5f
 #                                 Watchdog: Disabled
 #              FP32/FP64 Performance Ratio: 32
 # Summary:
 # 1/1 devices are supported
 # True

Streams

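When we launch a kernel (function), it is queued for execution on the GPU. The GPU runs our kernels sequentially, in launch order. Many tasks launched on the device may depend on earlier tasks, so it makes sense to put them in the same queue. For example, suppose data is copied asynchronously to the GPU so that a kernel can process it. The copy must finish before the kernel runs.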

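But what if we have two kernels that are independent of each other? Does it make sense to put them in the same queue? Not necessarily! CUDA handles this case with streams. We can think of streams as separate queues that run independently of each other and may also run concurrently. When running many independent tasks, this can greatly reduce the total runtime.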
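
To make the scheduling argument concrete, here is a deliberately idealized model in plain Python (not CUDA). Each stream is a FIFO list of task durations, and tasks inside a stream run back to back. The model assumes different streams overlap perfectly. The function name `makespan` and the durations are hypothetical. Real overlap is limited by copy engines and SM resources, so treat the multi-stream figure as a best case.

```python
from typing import Dict, List


def makespan(streams: Dict[str, List[float]], concurrent: bool) -> float:
    """Idealized total runtime (ms) of several work queues.

    Within a stream, tasks always run in order, one after another.
    If `concurrent` is False, all tasks share one queue and run serially;
    if True, streams are assumed to overlap perfectly (no resource limits).
    """
    per_stream = [sum(tasks) for tasks in streams.values()]
    return max(per_stream) if concurrent else sum(per_stream)


# Hypothetical durations (ms) for copy-in, kernel and copy-out of two independent arrays
work = {"stream_0": [4.0, 2.0, 4.0], "stream_1": [4.0, 2.0, 4.0]}
serial_ms = makespan(work, concurrent=False)   # one queue: 20.0
overlap_ms = makespan(work, concurrent=True)   # two streams, ideal overlap: 10.0
print(serial_ms, overlap_ms)
```

The ordering inside each list is what a single stream guarantees. The `max` over streams is only reachable when the hardware can actually run the streams side by side.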

Streams in Numba

We demonstrate this with a simple task. Given an array a, we overwrite it with its normalized version:

 a ← a / ∑a[i]

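Solving this simple task takes three kernels.

- The first kernel, partial_reduce, is the reduction code from the previous article. It returns an array of size blocks_per_grid, with one partial sum per block.
- That array is passed to a second kernel, single_thread_sum, which reduces it further to a singleton array (size 1). This kernel runs on a single block with a single thread.
- Finally, divide_by divides the original array by the sum we computed, giving our result.

All of these operations take place on the GPU and must run one after another.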

 threads_per_block = 256
 blocks_per_grid = 32 * 40
 
 @cuda.jit
 def partial_reduce(array, partial_reduction):
     i_start = cuda.grid(1)
     threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
     s_thread = 0.0
     for i_arr in range(i_start, array.size, threads_per_grid):
         s_thread += array[i_arr]
 
     s_block = cuda.shared.array((threads_per_block,), numba.float32)
     tid = cuda.threadIdx.x
     s_block[tid] = s_thread
     cuda.syncthreads()
 
     i = cuda.blockDim.x // 2
     while (i > 0):
         if (tid < i):
             s_block[tid] += s_block[tid + i]
         cuda.syncthreads()
         i //= 2
 
     if tid == 0:
         partial_reduction[cuda.blockIdx.x] = s_block[0]
 
 @cuda.jit
 def single_thread_sum(partial_reduction, sum):
     sum[0] = 0.0
     for element in partial_reduction:
         sum[0] += element
 
 
 @cuda.jit
 def divide_by(array, val_array):
     i_start = cuda.grid(1)
     threads_per_grid = cuda.gridsize(1)
     for i in range(i_start, array.size, threads_per_grid):
         array[i] /= val_array[0]
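
Before moving to streams, it helps to have a CPU reference to check the GPU result against. The NumPy sketch below mirrors the three kernels above; the helper name `normalize_reference` is hypothetical.

- Each of the `threads_per_block * blocks_per_grid` threads sums a grid-strided slice.
- Each block collapses its threads into one entry of `partial_reduction`.
- One "thread" adds those `blocks_per_grid` entries.
- The array is divided by the total.

The sketch reproduces the data flow, not the exact floating-point summation order of the GPU. It assumes a 1-D input, like the arrays in this article.

```python
import numpy as np

# Same launch geometry as the kernels above
threads_per_block = 256
blocks_per_grid = 32 * 40


def normalize_reference(a: np.ndarray):
    """CPU sketch of partial_reduce -> single_thread_sum -> divide_by for a 1-D array.

    Returns (normalized copy of `a`, per-block partial sums).
    """
    threads_per_grid = threads_per_block * blocks_per_grid
    # partial_reduce, step 1: grid-stride loop. Global thread t sums
    # a[t], a[t + threads_per_grid], a[t + 2 * threads_per_grid], ...
    n_rows = -(-a.size // threads_per_grid)  # ceiling division
    padded = np.zeros(n_rows * threads_per_grid, dtype=a.dtype)
    padded[: a.size] = a
    s_thread = padded.reshape(n_rows, threads_per_grid).sum(axis=0)
    # partial_reduce, step 2: each block reduces its threads to one value
    partial_reduction = s_thread.reshape(blocks_per_grid, threads_per_block).sum(axis=1)
    # single_thread_sum: collapse the blocks_per_grid partial sums to one number
    total = partial_reduction.sum()
    # divide_by: scale every element by the total
    return a / total, partial_reduction


normalized, partial = normalize_reference(np.ones(1_000_000, dtype=np.float32))
print(partial.shape, normalized.sum())
```

Because the kernels overwrite their input in place, keep a copy of the original array. Then compare the GPU output with `np.allclose(gpu_result, normalize_reference(original)[0])`.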

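When kernel launches and other operations do not specify a stream, they run in the default stream. The default stream is a special stream whose behavior depends on whether it runs in "legacy" or "per-thread" mode. For our purposes, running tasks in non-default streams is enough. Let's see how to run our three kernels: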

 # Define host array
 a = np.ones(10_000_000, dtype=np.float32)
 print(f"Old sum: {a.sum():.2f}")
 # Old sum: 10000000.00
 
 # Example 3.1: Numba CUDA Stream Semantics
 
 # Pin memory
 with cuda.pinned(a):
     # Create a CUDA stream
     stream = cuda.stream()
 
     # Array copy to device and creation in the device. With Numba, you pass the
     # stream as an additional argument to API functions.
     dev_a = cuda.to_device(a, stream=stream)
     dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
     dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
 
     # When launching kernels, stream is passed to the kernel launcher ("dispatcher")
     # configuration, and it comes after the block dimension (`threads_per_block`)
     partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
     single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
     divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
 
     # Array copy to host: like the copy to device, when a stream is passed, the copy
     # is asynchronous. Note: the printed output will probably be nonsensical since
     # the write has not been synchronized yet.
     dev_a.copy_to_host(a, stream=stream)
 
 # Whenever we want to ensure that all operations in a stream are finished from
 # the point of view of the host, we call:
 stream.synchronize()
 
 # After that call, we can be sure that `a` has been overwritten with its
 # normalized version
 print(f"New sum: {a.sum():.2f}")

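One more thing worth highlighting here is cuda.pinned. This context manager creates a special type of memory called page-locked, or pinned, memory. CUDA uses it to speed up transfers from host to device.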

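Memory residing in host RAM can be paged at any time. That is, the operating system may silently move objects from RAM to the hard disk. It does this to move rarely used objects to slower storage, leaving fast RAM for objects that need it more. However, CUDA does not allow asynchronous transfers from pageable objects to the GPU, because disk (paged) → RAM → GPU is a very slow transfer path.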

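To transfer data asynchronously, we must somehow prevent the operating system from quietly stashing the data somewhere on disk, so that the data is guaranteed to stay in RAM. This is what cuda.pinned does: it creates a context in which its arguments are "locked", that is, forced to reside in RAM. See Figure 3.2.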

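With that, the code is quite simple: create a stream, then pass it to every CUDA function that should operate on that stream. In Numba, the CUDA kernel launch configuration (the square brackets) takes the stream as its third argument, after the block dimension.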

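In general, passing a stream to a Numba CUDA API function does not change its behavior, only the stream it runs in. The exception is copying from device to host:

- device_array.copy_to_host() (with no arguments) copies synchronously.
- device_array.copy_to_host(stream=stream) (with a stream) still copies synchronously if the host destination array is not pinned.
- The copy is asynchronous only when the host array is pinned and a stream is passed.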

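A useful tip: Numba provides a handy context manager that enqueues all operations inside it. When the context exits, the operations are synchronized, including memory transfers. So Example 3.1 can also be written as: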

 with cuda.pinned(a):
     stream = cuda.stream()
     with stream.auto_synchronize():
         dev_a = cuda.to_device(a, stream=stream)
         dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
         dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
         partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
         single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
         divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
         dev_a.copy_to_host(a, stream=stream)

Separating independent kernels with streams

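Suppose we want to normalize multiple arrays. Normalizing one array is completely independent of normalizing the others. However, in a single queue the GPU waits for one normalization to finish before starting the next, so we gain nothing from parallelism. Instead, we can split these tasks into different streams.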

Let's look at an example that normalizes 10 arrays, each in its own stream.

 # Example 3.2: Multiple streams
 
 N_streams = 10
 # Do not memory-collect (deallocate arrays) within this context
 with cuda.defer_cleanup():
     # Create 10 streams
     streams = [cuda.stream() for _ in range(1, N_streams + 1)]
 
     # Create base arrays
     arrays = [i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)]
 
     for i, arr in enumerate(arrays):
         print(f"Old sum (array {i}): {arr.sum():12.2f}")
 
     tics = []  # Launch start times
     for i, (stream, arr) in enumerate(zip(streams, arrays)):
         tic = perf_counter()
         with cuda.pinned(arr):
             dev_a = cuda.to_device(arr, stream=stream)
             dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
             dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
 
             partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
             single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
             divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
 
             dev_a.copy_to_host(arr, stream=stream)
 
         toc = perf_counter()  # Stop time of launches
         print(f"Launched processing {i} in {1e3 * (toc - tic):.2f} ms")
 
         # Delete the references to the GPU arrays so that they can be
         # garbage-collected when the `defer_cleanup` context exits.
         del dev_a, dev_a_reduce, dev_a_sum
 
         tics.append(tic)
 
     tocs = []
     for i, (stream, arr) in enumerate(zip(streams, arrays)):
         stream.synchronize()
         toc = perf_counter()  # Stop time of sync
         tocs.append(toc)
         print(f"New sum (array {i}): {arr.sum():12.2f}")
     for i in range(4):
         print(f"Performed processing {i} in {1e3 * (tocs[i] - tics[i]):.2f} ms")
 
     print(f"Total time {1e3 * (tocs[-1] - tics[0]):.2f} ms")
 
 # Old sum (array 0):  10000000.00
 # Old sum (array 1):  20000000.00
 # Old sum (array 2):  30000000.00
 # Old sum (array 3):  40000000.00
 # Old sum (array 4):  50000000.00
 # Old sum (array 5):  60000000.00
 # Old sum (array 6):  70000000.00
 # Old sum (array 7):  80000000.00
 # Old sum (array 8):  90000000.00
 # Old sum (array 9): 100000000.00
 # Launched processing 0 in 12.99 ms
 # Launched processing 1 in 11.55 ms
 # Launched processing 2 in 11.53 ms
 # Launched processing 3 in 11.98 ms
 # Launched processing 4 in 11.09 ms
 # Launched processing 5 in 11.22 ms
 # Launched processing 6 in 12.16 ms
 # Launched processing 7 in 11.59 ms
 # Launched processing 8 in 11.85 ms
 # Launched processing 9 in 11.20 ms
 # New sum (array 0):         1.00
 # New sum (array 1):         1.00
 # New sum (array 2):         1.00
 # New sum (array 3):         1.00
 # New sum (array 4):         1.00
 # New sum (array 5):         1.00
 # New sum (array 6):         1.00
 # New sum (array 7):         1.00
 # New sum (array 8):         1.00
 # New sum (array 9):         1.00
 # Performed processing 0 in 118.77 ms
 # Performed processing 1 in 110.17 ms
 # Performed processing 2 in 102.25 ms
 # Performed processing 3 in 94.43 ms
 # Total time 158.13 ms

Compare this with the following code, which uses a single stream:

 # Example 3.3: Single stream
 
 # Do not memory-collect (deallocate arrays) within this context
 with cuda.defer_cleanup():
     # Create 1 stream and reuse it for every array
     streams = [cuda.stream()] * N_streams
 
     # Create base arrays
     arrays = [i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)]
 
     for i, arr in enumerate(arrays):
         print(f"Old sum (array {i}): {arr.sum():12.2f}")
 
     tics = []  # Launch start times
     for i, (stream, arr) in enumerate(zip(streams, arrays)):
         tic = perf_counter()
         
         with cuda.pinned(arr):
             dev_a = cuda.to_device(arr, stream=stream)
             dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
             dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
 
             partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
             single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
             divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
 
             dev_a.copy_to_host(arr, stream=stream)
 
         toc = perf_counter()  # Stop time of launches
         print(f"Launched processing {i} in {1e3 * (toc - tic):.2f} ms")
 
         # Delete the references to the GPU arrays so that they can be
         # garbage-collected when the `defer_cleanup` context exits.
         del dev_a, dev_a_reduce, dev_a_sum
 
         tics.append(tic)
 
     tocs = []
     for i, (stream, arr) in enumerate(zip(streams, arrays)):
         stream.synchronize()
         toc = perf_counter()  # Stop time of sync
         tocs.append(toc)
         print(f"New sum (array {i}): {arr.sum():12.2f}")
     for i in range(4):
         print(f"Performed processing {i} in {1e3 * (tocs[i] - tics[i]):.2f} ms")
 
     print(f"Total time {1e3 * (tocs[-1] - tics[0]):.2f} ms")
 
 
 # Old sum (array 0):  10000000.00
 # Old sum (array 1):  20000000.00
 # Old sum (array 2):  30000000.00
 # Old sum (array 3):  40000000.00
 # Old sum (array 4):  50000000.00
 # Old sum (array 5):  60000000.00
 # Old sum (array 6):  70000000.00
 # Old sum (array 7):  80000000.00
 # Old sum (array 8):  90000000.00
 # Old sum (array 9): 100000000.00
 # Launched processing 0 in 13.42 ms
 # Launched processing 1 in 12.62 ms
 # Launched processing 2 in 16.10 ms
 # Launched processing 3 in 13.74 ms
 # Launched processing 4 in 17.59 ms
 # Launched processing 5 in 12.57 ms
 # Launched processing 6 in 12.44 ms
 # Launched processing 7 in 12.32 ms
 # Launched processing 8 in 12.54 ms
 # Launched processing 9 in 13.54 ms
 # New sum (array 0):         1.00
 # New sum (array 1):         1.00
 # New sum (array 2):         1.00
 # New sum (array 3):         1.00
 # New sum (array 4):         1.00
 # New sum (array 5):         1.00
 # New sum (array 6):         1.00
 # New sum (array 7):         1.00
 # New sum (array 8):         1.00
 # New sum (array 9):         1.00
 # Performed processing 0 in 143.38 ms
 # Performed processing 1 in 140.16 ms
 # Performed processing 2 in 135.72 ms
 # Performed processing 3 in 126.30 ms
 # Total time 208.43 ms

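Which one is faster? In these runs, using multiple streams did not give a consistent improvement in total time. There can be many reasons for this. For example, streams can only run concurrently if there is enough space in local memory. NVIDIA provides several tools for debugging CUDA, including CUDA streams; see Nsight Systems for more information.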

Events

One problem with timing from the CPU is that the measurement includes many more operations than just those running on the GPU.
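
Here is a CPU-only analogy in plain Python, not CUDA, of why host-side timing is hard to interpret. Submitting work to a worker returns almost immediately, like a kernel launch. Only waiting for the result, like `stream.synchronize()`, captures the work itself, together with anything else the host did in between. `fake_kernel` is a hypothetical stand-in that just sleeps.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def fake_kernel(duration_s: float) -> str:
    """Hypothetical stand-in for GPU work: it only sleeps."""
    sleep(duration_s)
    return "done"


with ThreadPoolExecutor(max_workers=1) as pool:
    tic = perf_counter()
    future = pool.submit(fake_kernel, 0.2)  # like a kernel launch: returns immediately
    launch_ms = 1e3 * (perf_counter() - tic)
    future.result()                         # like stream.synchronize(): blocks until done
    total_ms = 1e3 * (perf_counter() - tic)

print(f"launch: {launch_ms:.2f} ms, launch + wait: {total_ms:.2f} ms")
```

The "Launched processing" and "Performed processing" lines printed by Examples 3.2 and 3.3 follow the same pattern.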

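Instead, we can use CUDA to time operations directly on the GPU. An event is simply a register that records the time at which something happened on the GPU. In a sense, it is similar to time.time and time.perf_counter. Unlike them, though, we have to deal with programming from the CPU while timing events on the GPU.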

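So besides creating timestamps ("recording" events), we also need to synchronize the event with the CPU before we can access its value. Let's look at a simple example.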

Timing kernel execution with events

 # Example 3.4: Simple events
 
 # Events need to be initialized, but this does not start timing.
 # We create two events, one at the start of computations, and one at the end.
 event_beg = cuda.event()
 event_end = cuda.event()
 
 # Create CUDA stream
 stream = cuda.stream()
 
 with cuda.pinned(arr):
     # Queue array copy/create in `stream`
     dev_a = cuda.to_device(arr, stream=stream)
     dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
 
     # Here we issue our first event recording. `event_beg` from this line onwards
     # will contain the time referring to this moment in the GPU.
     event_beg.record(stream=stream)
 
     # Launch kernel asynchronously
     partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
 
     # Launch a "record" which will be triggered when the kernel run ends
     event_end.record(stream=stream)
 
     # Future tasks submitted to the stream will wait until `event_end` completes.
     event_end.wait(stream=stream)
 
     # Synchronize this event with the CPU, so we can use its value.
     event_end.synchronize()
 
 # Now we calculate the time it took to execute the kernel. Note that we do not
 # need to wait/synchronize `event_beg` because its execution is contingent upon
 # event_end having waited/synchronized
 timing_ms = event_beg.elapsed_time(event_end)  # in milliseconds
 
 print(f"Elapsed time {timing_ms:.2f} ms")
 # Elapsed time 0.57 ms
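
A common next step after timing a kernel with events is to convert the time into an effective memory bandwidth and compare it with the peak bandwidth in the GPU's specifications. The original text does not do this. Below is a minimal sketch; the function name is hypothetical. The byte count assumes `partial_reduce` reads each input element once and ignores the 1280 partial sums it writes. It uses the single 0.57 ms measurement above, so the result is only indicative.

```python
def effective_bandwidth_gbs(n_elements: int, bytes_per_element: int, elapsed_ms: float) -> float:
    """Bytes moved divided by the event-measured time, in GB/s (1 GB = 1e9 bytes)."""
    seconds = elapsed_ms * 1e-3
    return n_elements * bytes_per_element / seconds / 1e9


# partial_reduce reads 10_000_000 float32 values (4 bytes each) in the 0.57 ms measured above
bw = effective_bandwidth_gbs(10_000_000, 4, 0.57)
print(f"partial_reduce: {bw:.1f} GB/s")  # → partial_reduce: 70.2 GB/s
```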

A convenient way to time GPU operations is with a context manager:

 # Example 3.5: Context Manager for CUDA Timer using Events
 class CUDATimer:
     def __init__(self, stream):
         self.stream = stream
         self.elapsed = None  # elapsed time in ms, set in __exit__
 
     def __enter__(self):
         self.event_beg = cuda.event()
         self.event_end = cuda.event()
         self.event_beg.record(stream=self.stream)
         return self
 
     def __exit__(self, type, value, traceback):
         self.event_end.record(stream=self.stream)
         self.event_end.wait(stream=self.stream)
         self.event_end.synchronize()
         self.elapsed = self.event_beg.elapsed_time(self.event_end)
 
 
 stream = cuda.stream()
 dev_a = cuda.to_device(arrays[0], stream=stream)
 dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
 with CUDATimer(stream) as cudatimer:
     partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
 print(f"Elapsed time {cudatimer.elapsed:.2f} ms")
 # Elapsed time 0.53 ms

Timing streams with events

Finally, we combine event timing with CUDA streams to reach the goal of this article:

 # Example 3.6: Timing a single stream with events
 
 N_streams = 10
 
 # Do not memory-collect (deallocate arrays) within this context
 with cuda.defer_cleanup():
     # Create 1 stream
     streams = [cuda.stream()] * N_streams
 
     # Create base arrays
     arrays = [i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)]
 
     events_beg = []  # Launch start times
     events_end = []  # End times
     for i, (stream, arr) in enumerate(zip(streams, arrays)):
         with cuda.pinned(arr):
             # Declare events and record start
             event_beg = cuda.event()
             event_end = cuda.event()
             event_beg.record(stream=stream)
 
             # Do all CUDA operations
             dev_a = cuda.to_device(arr, stream=stream)
             dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
             dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
             partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
             single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
             divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
             dev_a.copy_to_host(arr, stream=stream)
 
             # Record end
             event_end.record(stream=stream)
 
         events_beg.append(event_beg)
         events_end.append(event_end)
 
         del dev_a, dev_a_reduce, dev_a_sum
 
 sleep(5)  # Wait for all events to finish, does not affect GPU timing
 for event_end in events_end:
     event_end.synchronize()
 
 # The first `event_beg` launched is the earliest event. But the last `event_end`
 # is not known a priori. We find which event that is with:
 elapsed_times = [events_beg[0].elapsed_time(event_end) for event_end in events_end]
 i_stream_last = np.argmax(elapsed_times)
 
 print(f"Last stream: {i_stream_last}")
 print(f"Total time {elapsed_times[i_stream_last]:.2f} ms")
 # Last stream: 9
 # Total time 113.16 ms
 
 # Example 3.7: Timing multiple streams with events
 
 # Do not memory-collect (deallocate arrays) within this context
 with cuda.defer_cleanup():
     # Create 10 streams
     streams = [cuda.stream() for _ in range(1, N_streams + 1)]
 
     # Create base arrays
     arrays = [i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)]
 
     events_beg = []  # Launch start times
     events_end = []  # End times
     for i, (stream, arr) in enumerate(zip(streams, arrays)):
         with cuda.pinned(arr):
             # Declare events and record start
             event_beg = cuda.event()
             event_end = cuda.event()
             event_beg.record(stream=stream)
 
             # Do all CUDA operations
             dev_a = cuda.to_device(arr, stream=stream)
             dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
             dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
             partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
             single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
             divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
             dev_a.copy_to_host(arr, stream=stream)
 
             # Record end
             event_end.record(stream=stream)
 
         events_beg.append(event_beg)
         events_end.append(event_end)
 
         del dev_a, dev_a_reduce, dev_a_sum
 
 sleep(5)  # Wait for all events to finish, does not affect GPU timing
 for event_end in events_end:
     event_end.synchronize()
 
 # The first `event_beg` launched is the earliest event. But the last `event_end`
 # is not known a priori. We find which event that is with:
 elapsed_times = [events_beg[0].elapsed_time(event_end) for event_end in events_end]
 i_stream_last = np.argmax(elapsed_times)
 
 print(f"Last stream: {i_stream_last}")
 print(f"Total time {elapsed_times[i_stream_last]:.2f} ms")
 # Last stream: 9
 # Total time 108.50 ms
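
As a quick check on the totals printed in this article, we can compute the ratio of single-stream to multi-stream time for both timing methods. These are single runs, so the ratios are only indicative.

- The CPU-side totals of Examples 3.2 and 3.3 also include host work inside the timed loop, such as pinning memory and launching kernels.
- The event-based totals of Examples 3.6 and 3.7 are measured on the GPU's own clock.

```python
# Totals printed above, in ms (single runs on a Tesla T4)
cpu_single, cpu_multi = 208.43, 158.13  # Examples 3.3 and 3.2, timed with perf_counter
gpu_single, gpu_multi = 113.16, 108.50  # Examples 3.6 and 3.7, timed with CUDA events

cpu_ratio = cpu_single / cpu_multi  # > 1 means the multi-stream version took less time
gpu_ratio = gpu_single / gpu_multi
print(f"CPU-timed: {cpu_ratio:.2f}x, event-timed: {gpu_ratio:.2f}x")
# → CPU-timed: 1.32x, event-timed: 1.04x
```

Measured on the GPU's clock, the two versions differ by only about 4% in these runs. That small gap fits the observation that multiple streams did not give a consistent improvement here.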

Summary

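CUDA is all about performance. This tutorial showed how to use events to accurately measure the execution time of kernels, a technique that can also be used to profile code. It also introduced streams and how to use them to keep the GPU busy at all times, as well as how pinned (page-locked) arrays improve memory transfers. The source code for this article is available at: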

https://avoid.overfit.cn/post/fd3454303b9b4a7e8a2898b7d24b41ec

Author: Carlos Costa, Ph.D.
