In the previous two articles we covered how to use GPU programming for simple tasks such as embarrassingly parallel problems, reductions using shared memory, and device functions. To take our parallel processing further, this article introduces CUDA events and how to use them. But before diving in, we first need to discuss CUDA streams.

Setup

Import and load the libraries, and make sure a GPU is available.

import warnings
from time import perf_counter, sleep

import numpy as np

import numba
from numba import cuda
from numba.core.errors import NumbaPerformanceWarning

print(np.__version__)
print(numba.__version__)

# Ignore NumbaPerformanceWarning
warnings.simplefilter("ignore", category=NumbaPerformanceWarning)

# 1.21.6
# 0.55.2

cuda.detect()  # Report the attached GPU(s)

# Found 1 CUDA devices
# id 0             b'Tesla T4'                              [SUPPORTED]
#                       Compute Capability: 7.5
#                            PCI Device ID: 4
#                               PCI Bus ID: 0
#                                     UUID: GPU-eaab966c-a15b-15f7-94b1-a2d4932bac5f
#                                 Watchdog: Disabled
#              FP32/FP64 Performance Ratio: 32
# Summary:
#     1/1 devices are supported
# True

Streams

When we launch a kernel (function), it is queued on the GPU, and the GPU executes our kernels sequentially, in the order they were launched. Many tasks launched on the device depend on earlier tasks, so "putting them in the same queue" makes sense. For example, if we asynchronously copy data to the GPU in order to process it with a kernel, that copy must complete before the kernel runs.

But what if we have two kernels that are independent of each other? Does it make sense to put them in the same queue? Not necessarily! CUDA handles this situation through streams. We can think of streams as independent queues that run independently of one another, and they can also run concurrently. When running many independent tasks, this can significantly reduce the total runtime.

Streams in Numba

Let's demonstrate with a simple task. Given an array a, we will overwrite it with its normalized version:

 a ← a / ∑a[i]
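For reference, here is a host-side NumPy sketch of that operation (not from the original article; the GPU version below does the same thing with three kernels):

# Host-side NumPy equivalent of the normalization, for reference only
import numpy as np

a = np.ones(10_000_000, dtype=np.float32)
a /= a.sum()        # each element divided by the total sum
print(a.sum())      # approximately 1.0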

Solving this simple task requires three kernels. The first kernel, partial_reduce, is the partial-reduction code from the previous article. It returns a blocks_per_grid-sized array (one partial sum per block), which is passed to a second kernel, single_thread_sum, which further reduces it to a singleton array (size 1). This kernel runs on a single block with a single thread. Finally, divide_by divides the original array by the sum we computed, giving the final result. All of these operations take place on the GPU, and they should run one after the other.

threads_per_block = 256
blocks_per_grid = 32 * 40

@cuda.jit
def partial_reduce(array, partial_reduction):
    i_start = cuda.grid(1)
    threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
    s_thread = 0.0
    for i_arr in range(i_start, array.size, threads_per_grid):
        s_thread += array[i_arr]

    s_block = cuda.shared.array((threads_per_block,), numba.float32)
    tid = cuda.threadIdx.x
    s_block[tid] = s_thread
    cuda.syncthreads()

    i = cuda.blockDim.x // 2
    while (i > 0):
        if (tid < i):
            s_block[tid] += s_block[tid + i]
        cuda.syncthreads()
        i //= 2

    if tid == 0:
        partial_reduction[cuda.blockIdx.x] = s_block[0]


@cuda.jit
def single_thread_sum(partial_reduction, sum):
    sum[0] = 0.0
    for element in partial_reduction:
        sum[0] += element


@cuda.jit
def divide_by(array, val_array):
    i_start = cuda.grid(1)
    threads_per_grid = cuda.gridsize(1)
    for i in range(i_start, array.size, threads_per_grid):
        array[i] /= val_array[0]

When kernel launches and other operations are not given a stream, they run on the default stream. The default stream is a special stream whose behavior depends on whether it is configured as a legacy stream or a per-thread stream. For our purposes it is enough to know that we want our tasks to run in non-default streams. Let's see how to run our three kernels:

# Define host array
a = np.ones(10_000_000, dtype=np.float32)
print(f"Old sum: {a.sum():.2f}")
# Old sum: 10000000.00

# Example 3.1: Numba CUDA Stream Semantics

# Pin memory
with cuda.pinned(a):
    # Create a CUDA stream
    stream = cuda.stream()

    # Array copy to device and creation in the device. With Numba, you pass the
    # stream as an additional argument to API functions.
    dev_a = cuda.to_device(a, stream=stream)
    dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
    dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)

    # When launching kernels, stream is passed to the kernel launcher ("dispatcher")
    # configuration, and it comes after the block dimension (`threads_per_block`)
    partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
    single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
    divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)

    # Array copy to host: like the copy to device, when a stream is passed, the copy
    # is asynchronous. Note: the printed output will probably be nonsensical since
    # the write has not been synchronized yet.
    dev_a.copy_to_host(a, stream=stream)

# Whenever we want to ensure that all operations in a stream are finished from
# the point of view of the host, we call:
stream.synchronize()

# After that call, we can be sure that `a` has been overwritten with its
# normalized version
print(f"New sum: {a.sum():.2f}")
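A small aside on the default stream mentioned above (a sketch, not part of Example 3.1): Numba also exposes the default stream objects directly, and they can be passed around like any other stream.

# Handles to the default stream(s); these are regular stream objects and can be
# passed to kernel launches and copies just like a stream created with cuda.stream()
default_stream = cuda.default_stream()
legacy_stream = cuda.legacy_default_stream()
per_thread_stream = cuda.per_thread_default_stream()

print(default_stream, legacy_stream, per_thread_stream)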

One other thing worth highlighting in Example 3.1 is cuda.pinned. This context manager creates a special kind of memory, called page-locked or pinned memory, which CUDA uses to speed up transfers between the host and the device.

Memory that lives in host RAM can be paged at any time, meaning the operating system can quietly move objects from RAM to the hard disk. It does this to move rarely used objects to slower storage, keeping the fast RAM available for objects that need it more. Because of this, CUDA does not allow asynchronous transfers from pageable memory to the GPU: the disk (paging) → RAM → GPU transfer path would be very slow.

To transfer data asynchronously, we must somehow prevent the operating system from sneaking the data off to disk, so that it is guaranteed to stay in RAM. That is what cuda.pinned does: it creates a context within which its argument is "pinned", that is, forced to remain in RAM. See Figure 3.2.
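As a side note (a supplementary sketch, not part of the original example): instead of pinning an existing NumPy array with the cuda.pinned context manager, Numba can also allocate host memory that stays pinned for its whole lifetime via cuda.pinned_array.

# Allocate a pinned host array directly; it remains pinned until it is freed.
# Reuses numpy/cuda imported at the top of the article.
a_pinned = cuda.pinned_array(10_000_000, dtype=np.float32)
a_pinned[...] = 1.0

stream = cuda.stream()
dev_a = cuda.to_device(a_pinned, stream=stream)  # this copy can run asynchronously
stream.synchronize()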

With that, the code is quite simple: create a stream, then pass it to every CUDA function that should operate on that stream. In Numba, the stream goes into the kernel launch configuration (the square brackets) as the third entry, right after the block dimension.

In general, passing a stream to a Numba CUDA API function does not change what it does, only the stream it runs on. One important exception is the copy from device to host. When device_array.copy_to_host() is called without arguments, the copy is synchronous. When device_array.copy_to_host(stream=stream) is called with a stream, the copy is still synchronous if the host memory is not pinned. Only when the host memory is pinned and a stream is passed does the copy run asynchronously. A minimal sketch of these three cases follows.
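Here is a sketch of the three cases described above (not from the original article; it reuses `a`, `dev_a`, and `stream` from Example 3.1):

# 1. No stream: synchronous copy, returns a fresh host array.
result = dev_a.copy_to_host()

# 2. Stream given, but the destination is ordinary (pageable) host memory:
#    the call still behaves synchronously.
pageable = np.empty_like(a)
dev_a.copy_to_host(pageable, stream=stream)

# 3. Stream given and the destination is pinned: the copy is asynchronous,
#    so we must synchronize before reading the result on the host.
with cuda.pinned(a):
    dev_a.copy_to_host(a, stream=stream)
    stream.synchronize()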

A useful tip: Numba provides a handy context manager that queues all operations submitted within it and synchronizes them, including memory transfers, when the context exits. Example 3.1 can therefore also be written as:

with cuda.pinned(a):
    stream = cuda.stream()
    with stream.auto_synchronize():
        dev_a = cuda.to_device(a, stream=stream)
        dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
        dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
        partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
        single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
        divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
        dev_a.copy_to_host(a, stream=stream)

Decoupling Independent Kernels with Streams

Now suppose we want to normalize several arrays. Normalizing each individual array is completely independent of the others. However, the GPU would wait for one normalization to finish before starting the next, so we would gain nothing from that independence. The fix is to split these tasks across different streams.

Let's look at an example that normalizes 10 arrays, each using its own stream.

# Example 3.2: Multiple streams

N_streams = 10

# Do not memory-collect (deallocate arrays) within this context
with cuda.defer_cleanup():
    # Create 10 streams
    streams = [cuda.stream() for _ in range(1, N_streams + 1)]

    # Create base arrays
    arrays = [
        i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)
    ]

    for i, arr in enumerate(arrays):
        print(f"Old sum (array {i}): {arr.sum():12.2f}")

    tics = []  # Launch start times
    for i, (stream, arr) in enumerate(zip(streams, arrays)):
        tic = perf_counter()
        with cuda.pinned(arr):
            dev_a = cuda.to_device(arr, stream=stream)
            dev_a_reduce = cuda.device_array(
                (blocks_per_grid,), dtype=dev_a.dtype, stream=stream
            )
            dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)

            partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
            single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
            divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)

            dev_a.copy_to_host(arr, stream=stream)

        toc = perf_counter()  # Stop time of launches
        print(f"Launched processing {i} in {1e3 * (toc - tic):.2f} ms")

        # Ensure that the references to the GPU arrays are deleted; this will
        # ensure garbage collection at the exit of the context.
        del dev_a, dev_a_reduce, dev_a_sum

        tics.append(tic)

    tocs = []
    for i, (stream, arr) in enumerate(zip(streams, arrays)):
        stream.synchronize()
        toc = perf_counter()  # Stop time of sync
        tocs.append(toc)
        print(f"New sum (array {i}): {arr.sum():12.2f}")
    for i in range(4):
        print(f"Performed processing {i} in {1e3 * (tocs[i] - tics[i]):.2f} ms")

    print(f"Total time {1e3 * (tocs[-1] - tics[0]):.2f} ms")

# Old sum (array 0):  10000000.00
# Old sum (array 1):  20000000.00
# Old sum (array 2):  30000000.00
# Old sum (array 3):  40000000.00
# Old sum (array 4):  50000000.00
# Old sum (array 5):  60000000.00
# Old sum (array 6):  70000000.00
# Old sum (array 7):  80000000.00
# Old sum (array 8):  90000000.00
# Old sum (array 9): 100000000.00
# Launched processing 0 in 12.99 ms
# Launched processing 1 in 11.55 ms
# Launched processing 2 in 11.53 ms
# Launched processing 3 in 11.98 ms
# Launched processing 4 in 11.09 ms
# Launched processing 5 in 11.22 ms
# Launched processing 6 in 12.16 ms
# Launched processing 7 in 11.59 ms
# Launched processing 8 in 11.85 ms
# Launched processing 9 in 11.20 ms
# New sum (array 0):         1.00
# New sum (array 1):         1.00
# New sum (array 2):         1.00
# New sum (array 3):         1.00
# New sum (array 4):         1.00
# New sum (array 5):         1.00
# New sum (array 6):         1.00
# New sum (array 7):         1.00
# New sum (array 8):         1.00
# New sum (array 9):         1.00
# Performed processing 0 in 118.77 ms
# Performed processing 1 in 110.17 ms
# Performed processing 2 in 102.25 ms
# Performed processing 3 in 94.43 ms
# Total time 158.13 ms

The code below runs the same workload with a single stream for comparison:

# Example 3.3: Single stream

# Do not memory-collect (deallocate arrays) within this context
with cuda.defer_cleanup():
    # Create 1 stream, shared by all launches
    streams = [cuda.stream()] * N_streams

    # Create base arrays
    arrays = [
        i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)
    ]

    for i, arr in enumerate(arrays):
        print(f"Old sum (array {i}): {arr.sum():12.2f}")

    tics = []  # Launch start times
    for i, (stream, arr) in enumerate(zip(streams, arrays)):
        tic = perf_counter()

        with cuda.pinned(arr):
            dev_a = cuda.to_device(arr, stream=stream)
            dev_a_reduce = cuda.device_array(
                (blocks_per_grid,), dtype=dev_a.dtype, stream=stream
            )
            dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)

            partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
            single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
            divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)

            dev_a.copy_to_host(arr, stream=stream)

        toc = perf_counter()  # Stop time of launches
        print(f"Launched processing {i} in {1e3 * (toc - tic):.2f} ms")

        # Ensure that the references to the GPU arrays are deleted; this will
        # ensure garbage collection at the exit of the context.
        del dev_a, dev_a_reduce, dev_a_sum

        tics.append(tic)

    tocs = []
    for i, (stream, arr) in enumerate(zip(streams, arrays)):
        stream.synchronize()
        toc = perf_counter()  # Stop time of sync
        tocs.append(toc)
        print(f"New sum (array {i}): {arr.sum():12.2f}")
    for i in range(4):
        print(f"Performed processing {i} in {1e3 * (tocs[i] - tics[i]):.2f} ms")

    print(f"Total time {1e3 * (tocs[-1] - tics[0]):.2f} ms")


# Old sum (array 0):  10000000.00
# Old sum (array 1):  20000000.00
# Old sum (array 2):  30000000.00
# Old sum (array 3):  40000000.00
# Old sum (array 4):  50000000.00
# Old sum (array 5):  60000000.00
# Old sum (array 6):  70000000.00
# Old sum (array 7):  80000000.00
# Old sum (array 8):  90000000.00
# Old sum (array 9): 100000000.00
# Launched processing 0 in 13.42 ms
# Launched processing 1 in 12.62 ms
# Launched processing 2 in 16.10 ms
# Launched processing 3 in 13.74 ms
# Launched processing 4 in 17.59 ms
# Launched processing 5 in 12.57 ms
# Launched processing 6 in 12.44 ms
# Launched processing 7 in 12.32 ms
# Launched processing 8 in 12.54 ms
# Launched processing 9 in 13.54 ms
# New sum (array 0):         1.00
# New sum (array 1):         1.00
# New sum (array 2):         1.00
# New sum (array 3):         1.00
# New sum (array 4):         1.00
# New sum (array 5):         1.00
# New sum (array 6):         1.00
# New sum (array 7):         1.00
# New sum (array 8):         1.00
# New sum (array 9):         1.00
# Performed processing 0 in 143.38 ms
# Performed processing 1 in 140.16 ms
# Performed processing 2 in 135.72 ms
# Performed processing 3 in 126.30 ms
# Total time 208.43 ms

So which one is faster? In these runs, using multiple streams did not always yield a consistent improvement in total time. There can be many reasons for this; for example, for streams to run concurrently there must be enough local memory available. NVIDIA provides several tools for debugging CUDA, including for debugging CUDA streams. Check out Nsight Systems for more information.

Events

One problem with the timings above is that they were measured from the CPU, and the CPU timeline contains many more operations than just the ones running on the GPU.

Fortunately, CUDA lets us record timings directly from the GPU using events. An event is simply a register of a point in time at which something happened on the GPU. In a way it is similar to time.time or time.perf_counter, but unlike them, we have to deal with the fact that we program from the CPU while the events are timed on the GPU.

So besides creating the timestamps ("recording" the events), we also need to make sure the events are synchronized with the CPU before we can access their values. Let's look at a simple example.

Timing kernel execution with events

# Example 3.4: Simple events

# Events need to be initialized, but this does not start the timing.
# We create two events, one at the start of computations, and one at the end.
event_beg = cuda.event()
event_end = cuda.event()

# Create CUDA stream
stream = cuda.stream()

with cuda.pinned(arr):
    # Queue array copy/create in `stream`
    dev_a = cuda.to_device(arr, stream=stream)
    dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)

    # Here we issue our first event recording. `event_beg` from this line onwards
    # will contain the time referring to this moment in the GPU.
    event_beg.record(stream=stream)

    # Launch kernel asynchronously
    partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)

    # Launch a "record" which will be triggered when the kernel run ends
    event_end.record(stream=stream)

    # Future tasks submitted to the stream will wait until `event_end` completes.
    event_end.wait(stream=stream)

    # Synchronize this event with the CPU, so we can use its value.
    event_end.synchronize()

# Now we calculate the time it took to execute the kernel. Note that we do not
# need to wait/synchronize `event_beg` because its execution is contingent upon
# event_end having waited/synchronized
timing_ms = event_beg.elapsed_time(event_end)  # in milliseconds

print(f"Elapsed time {timing_ms:.2f} ms")
# Elapsed time 0.57 ms
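For contrast, here is a sketch of timing the same kernel from the host with perf_counter (not from the original article; it reuses `arr`, the kernels, and the launch configuration defined above). The host-side number is typically larger and noisier, since it includes launch overhead and needs an explicit synchronization:

# Host-side timing of the same kernel launch
stream = cuda.stream()
dev_a = cuda.to_device(arr, stream=stream)
dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)

tic = perf_counter()
partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
stream.synchronize()  # without this we would only time the asynchronous launch
toc = perf_counter()

print(f"Host-timed elapsed time {1e3 * (toc - tic):.2f} ms")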

A convenient way to time GPU operations is to wrap the events in a context manager:

# Example 3.5: Context Manager for CUDA Timer using Events
class CUDATimer:
    def __init__(self, stream):
        self.stream = stream
        self.elapsed = None  # in ms

    def __enter__(self):
        self.event_beg = cuda.event()
        self.event_end = cuda.event()
        self.event_beg.record(stream=self.stream)
        return self

    def __exit__(self, type, value, traceback):
        self.event_end.record(stream=self.stream)
        self.event_end.wait(stream=self.stream)
        self.event_end.synchronize()
        self.elapsed = self.event_beg.elapsed_time(self.event_end)


stream = cuda.stream()
dev_a = cuda.to_device(arrays[0], stream=stream)
dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
with CUDATimer(stream) as cudatimer:
    partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
print(f"Elapsed time {cudatimer.elapsed:.2f} ms")
# Elapsed time 0.53 ms

Timing events in streams

Finally, let's combine event-based timing with CUDA streams, which is the ultimate goal of this article:

# Example 3.6: Timing a single stream with events

N_streams = 10

# Do not memory-collect (deallocate arrays) within this context
with cuda.defer_cleanup():
    # Create 1 stream
    streams = [cuda.stream()] * N_streams

    # Create base arrays
    arrays = [
        i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)
    ]

    events_beg = []  # Launch start times
    events_end = []  # End start times
    for i, (stream, arr) in enumerate(zip(streams, arrays)):
        with cuda.pinned(arr):
            # Declare events and record start
            event_beg = cuda.event()
            event_end = cuda.event()
            event_beg.record(stream=stream)

            # Do all CUDA operations
            dev_a = cuda.to_device(arr, stream=stream)
            dev_a_reduce = cuda.device_array(
                (blocks_per_grid,), dtype=dev_a.dtype, stream=stream
            )
            dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
            partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
            single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
            divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
            dev_a.copy_to_host(arr, stream=stream)

            # Record end
            event_end.record(stream=stream)

        events_beg.append(event_beg)
        events_end.append(event_end)

        del dev_a, dev_a_reduce, dev_a_sum

sleep(5)

# Wait for all events to finish, does not affect GPU timing
for event_end in events_end:
    event_end.synchronize()

# The first `event_beg` launched is the earliest event. But the last `event_end`
# is not known a priori. We find which event that is with:
elapsed_times = [events_beg[0].elapsed_time(event_end) for event_end in events_end]
i_stream_last = np.argmax(elapsed_times)

print(f"Last stream: {i_stream_last}")
print(f"Total time {elapsed_times[i_stream_last]:.2f} ms")
# Last stream: 9
# Total time 113.16 ms


# Example 3.7: Timing multiple streams with events

# Do not memory-collect (deallocate arrays) within this context
with cuda.defer_cleanup():
    # Create 10 streams
    streams = [cuda.stream() for _ in range(1, N_streams + 1)]

    # Create base arrays
    arrays = [
        i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)
    ]

    events_beg = []  # Launch start times
    events_end = []  # End start times
    for i, (stream, arr) in enumerate(zip(streams, arrays)):
        with cuda.pinned(arr):
            # Declare events and record start
            event_beg = cuda.event()
            event_end = cuda.event()
            event_beg.record(stream=stream)

            # Do all CUDA operations
            dev_a = cuda.to_device(arr, stream=stream)
            dev_a_reduce = cuda.device_array(
                (blocks_per_grid,), dtype=dev_a.dtype, stream=stream
            )
            dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)
            partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
            single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
            divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)
            dev_a.copy_to_host(arr, stream=stream)

            # Record end
            event_end.record(stream=stream)

        events_beg.append(event_beg)
        events_end.append(event_end)

        del dev_a, dev_a_reduce, dev_a_sum

sleep(5)

# Wait for all events to finish, does not affect GPU timing
for event_end in events_end:
    event_end.synchronize()

# The first `event_beg` launched is the earliest event. But the last `event_end`
# is not known a priori. We find which event that is with:
elapsed_times = [events_beg[0].elapsed_time(event_end) for event_end in events_end]
i_stream_last = np.argmax(elapsed_times)

print(f"Last stream: {i_stream_last}")
print(f"Total time {elapsed_times[i_stream_last]:.2f} ms")
# Last stream: 9
# Total time 108.50 ms

Summary

CUDA is all about performance. In this tutorial, you learned how to use events to accurately measure the execution time of kernels, a technique that can be used to profile your code. You also learned about streams, how they can be used to keep the GPU busy, and how pinned (page-locked) memory improves memory transfers. The full source code for this article is available at:

https://avoid.overfit.cn/post/fd3454303b9b4a7e8a2898b7d24b41ec

Author: Carlos Costa, Ph.D.