任何使用 PyTorch 模型代码的人都会开始提出相同的问题: 為什麼要這麼長時間?如何讓我的訓練循環更快? 為什麼要這麼長時間?如何讓我的訓練循環更快? 无论您是ML工程师,研究人员还是刚刚决定在周末玩随机ML存储库,您最终都将试图了解如何加速代码。 然而,在我们能够做到这一点之前,我们需要学习如何正确地测量性能,然后从这些测量中得出正确的结论。 流程执行 让我们在 H100 上使用矩阵倍数作为整个运行示例。 然而,在我们考虑测量之前,让我们用“内核”来指的是在GPU上运行的函数(或者实际上是任何操作)。 torch.zeros(十六、十六),装置=“cuda) a + b,其中 a 和 b 是 GPU 内存中的压缩器 A @ B flash_attention(q、k、v) 因此,在 尽管内核在GPU上运行,但其 如果我们以训练代码来想象一个Python文件,它会在“准备工作”之间进行交替,例如for-loops或 if-statements,以及实际的内核发射。由于这个原因,CPU在前进,每次它击中内核时,它会通过将其添加到队列中来安排该内核在GPU上运行。 发射 这允许GPU避免停留:它可以始终在该内核排列中找到工作(除非明确呼吁同步或执行与CPU有关)。 天真的方法和为什么它是错误的 当我们第一次想到在Python中测量时间时,自然的本能是达到时间模块并运行这样的东西: import time import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_naive(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) times = [] for _ in range(n_iters): start = time.perf_counter_ns() matmul(a, b) end = time.perf_counter_ns() times.append(end - start) # perf_counter_ns returns nanoseconds, convert to ms return sum(times) / n_iters / 1e6 small_shapes_time = benchmark_naive(16, 32, 16) large_shapes_time = benchmark_naive(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.01415 ms # (4096, 8192) by (8192, 4096) matmul time: 0.01350 ms 如果我们在两组不同的形状上运行此代码: 和 事实上,在我的运行中,实际上是更快的来倍增256倍大的矩阵,这个结果是非常可疑的,应该让我们思考这个代码实际上衡量了什么。 16x32 4096x8192 事实上,我们从不等 GPU matmul 完成,因为所有“准备”代码都在 CPU 上运行, 它实际上是衡量我们将matmul内核安排到GPU排队并继续我们的生活所需的时间。 time.perf_counter_ns() 我们可以做的就是添加 在 matmul 之后,迫使 CPU 等待 GPU. 但这仍然不是理想的,因为我们在 CPU 上测量过去的墙时钟时间,这包括计时。 torch.cuda.synchronize() 测量 CUDA 事件 测量 GPU 时间而不是 CPU 时间的正确方法是使用 CUDA 事件. 这些是我们可以直接插入 GPU 执行流的标记。 import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_cuda_events(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_cuda_events(16, 32, 16) large_shapes_time = benchmark_cuda_events(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.02093 ms # (4096, 8192) by (8192, 4096) matmul time: 0.34025 ms 事件被插入到与我们的 matmul 内核相同的流中。 确保时刻标记被记录。 方法告诉我们开始和结束时刻标签之间的区别。 enable_timing=True .elapsed_time 现在我们从天真的方法中获得了真实的数字,而不是荒谬的快速结果. 从这个基准来看,大形状的 matmul至少需要15倍的时间。 值得注意的是,最好在开始时包括加热迭代,因为通常在第一次发射中包含一次性成本,例如JIT编译。 L2 缓存 当我们用相同的数据重复运行相同的内核时(这正是我们正在做的),该数据仍然保留在GPU的L2缓存中。 在NVIDIA GPU中,存在一个内存等级:HBM是最大的和最慢的,其次是越来越小的和更快的内存单元(L2缓存 → L1缓存或共享内存 →注册记忆)。 GPU Generation L2 per GPU HBM per GPU V100 Volta 6MB 32GB A100 Ampere 40MB 80GB H100 Hopper 50MB 80GB B200 Blackwell 126MB 192GB 第100集 返回 六MB 32GB 的 阿100 阿姆珀 四十MB 80GB 的 H100的 霍普尔 50MB 80GB 的 B200 的 黑色 第126章 第192章 由于这种缓存,我们的测量结果可能比我们在生产中看到的更好,数据在迭代之间发生变化。为了获得更现实的测量,我们需要清洗缓存。 import torch def flush_l2_cache(): # On H100, the L2 cache is 50MB, so we allocate something a bit bigger cache_size = 60 * 1024 * 1024 buffer = torch.zeros(cache_size // 4, dtype=torch.float32, device="cuda") buffer += 1 del buffer def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_with_cache_flush(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) for _ in range(10): matmul(a, b) # warmup torch.cuda.synchronize() start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): flush_l2_cache() # flush the L2 between iterations start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_with_cache_flush(16, 32, 16) large_shapes_time = benchmark_with_cache_flush(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.00634 ms # (4096, 8192) by (8192, 4096) matmul time: 0.40575 ms 实际上,这次运行和上一次运行之间的结果存在显著差异,但实际上,重启代码几次会改变结果,我们的差异在预期的差异范围内 - 所以对于这个特定的问题,缓存没有显着影响。 内置解决方案 我们是否需要记住这一切,只是为了衡量某些东西?答案是没有。 ,最初是由OpenAI于2021年开发的,用于以Pythonic方式编写GPU内核,现在它有一个内置的测试模块,具有适当的基准工具,可以自行完成上述所有操作。 三角形 # Source: https://github.com/triton-lang/triton/blob/main/python/triton/testing.py#L127C1-L190C64 def do_bench(fn, warmup=25, rep=100, grad_to_none=None, quantiles=None, return_mode="mean"): """ Benchmark the runtime of the provided function. By default, return the median runtime of :code:`fn` along with the 20-th and 80-th performance percentile. :param fn: Function to benchmark :type fn: Callable :param warmup: Warmup time (in ms) :type warmup: int :param rep: Repetition time (in ms) :type rep: int :param grad_to_none: Reset the gradient of the provided tensor to None :type grad_to_none: torch.tensor, optional :param quantiles: Performance percentile to return in addition to the median. :type quantiles: list[float], optional :param return_mode: The statistical measure to return. Options are "min", "max", "mean", "median", or "all". Default is "mean". :type return_mode: str """ assert return_mode in ["min", "max", "mean", "median", "all"] di = runtime.driver.active.get_device_interface() fn() di.synchronize() cache = runtime.driver.active.get_empty_cache_for_benchmark() # Estimate the runtime of the function start_event = di.Event(enable_timing=True) end_event = di.Event(enable_timing=True) start_event.record() for _ in range(5): runtime.driver.active.clear_cache(cache) fn() end_event.record() di.synchronize() estimate_ms = start_event.elapsed_time(end_event) / 5 # compute number of warmup and repeat n_warmup = max(1, int(warmup / estimate_ms)) n_repeat = max(1, int(rep / estimate_ms)) start_event = [di.Event(enable_timing=True) for i in range(n_repeat)] end_event = [di.Event(enable_timing=True) for i in range(n_repeat)] # Warm-up for _ in range(n_warmup): fn() # Benchmark for i in range(n_repeat): # we don't want `fn` to accumulate gradient values # if it contains a backward pass. So we clear the # provided gradients if grad_to_none is not None: for x in grad_to_none: x.grad = None # we clear the L2 cache before each run runtime.driver.active.clear_cache(cache) # record time of `fn` start_event[i].record() fn() end_event[i].record() # Record clocks di.synchronize() times = [s.elapsed_time(e) for s, e in zip(start_event, end_event)] return _summarize_statistics(times, quantiles, return_mode) 除了我们所涵盖的, 允许从基准中获取几个统计数据,而不仅仅是平均值,例如,可以请求中间值或第99个百分位。 do_bench 为了使用这个功能,我们可以简单地调用: import triton bench_time = triton.testing.do_bench(lambda: matmul(a, b)) CPU 绑定执行和 CUDA 图表 对于运行时间非常短的内核,发射时间可能会变得如此之大,以至于超过内核本身的时间。 让我们将 looong for-loop 添加到我们的 matmul 内核中,因为我们知道在 Python 中,for-loops 在 CPU 上非常慢: def cpu_heavy_matmul(a, b): cnt = 0 for _ in range(100_000): cnt += 1 return a @ b 我們會看到,由 (8192, 4096) matmul 的 (4096, 8192) 時間從 0.4ms 增加到 2.2ms - 超過 5x! 现在,取决于你的实际训练运行看起来是什么样子,你可能要么从基准结果中消除CPU上限,要么不。例如,如果你的训练已经与CPU有联系,那么在生产过程中也会出现相同的问题。那么,最有可能的是,消除CPU时间对于我们的测量没有意义。然而,在正常的训练运行中,CPU不应该阻碍最佳训练性能 - 通常CPU在GPU之前运行。 为了做到这一点,我们可以使用 CUDA 图表。 CUDA 图表允许我们记录一系列内核一次,并以最小的 CPU 参与重新播放它们。 ,这提供完全相同的功能. 使用这个为我们的新 matmul,我们得到的初始内核时间(在CPU过头被引入之前)。 triton.testing.do_bench_cudagraph a, b = get_data(4096, 8192, 4096) triton.testing.do_bench_cudagraph(lambda: cpu_heavy_matmul(a, b)) # back to ~0.4ms! 当然,CUDA图表有自己的局限性(静态形状、静态控制流量和 它们最适合隔离一些GPU性能,而不是e2e训练循环。 这样 这样 不仅系统失败 - 查看数据 有时,即使是完美的基准方法也无法讲述整个故事,因为生产条件与我们运行基准的孤立环境不同。 例如,假设我们希望在专家混合层中对组合 gemm 内核进行比较。在典型的 MoE 层中,代币会根据路由器的概率动态地向不同的专家进行路由。 天真地说,我们会生成随机路由器概率,并基于它们人工路由代币,然而,在整个培训过程中,我们的路由器可能不平衡,利用几个专家比其他人更为辛苦。 让我们看看向每个专家路由的代币的底层分布如何影响我们的测量。 import numpy as np import torch import triton def sample_expert_assignments( seq_len: int, num_experts: int, top_k: int, use_beta: bool = True, alpha: float = 1.0, beta: float = 5.0, ) -> tuple[torch.Tensor, torch.Tensor]: if use_beta: expert_weights = np.random.beta(alpha, beta, num_experts) expert_weights = expert_weights / expert_weights.sum() else: expert_weights = np.ones(num_experts) / num_experts gumbel_noise = -np.log(-np.log(np.random.uniform(0, 1, (seq_len, num_experts)))) log_weights = np.log(expert_weights + 1e-10) scores = log_weights[None, :] + gumbel_noise expert_indices = torch.from_numpy(np.argsort(scores, axis=1)[:, -top_k:]) tokens_per_expert = torch.bincount( expert_indices.flatten(), minlength=num_experts ).to(torch.int32) return expert_indices, tokens_per_expert def get_tokens( seq_len: int, hidden: int, num_experts: int, top_k: int, expert_indices: torch.Tensor, tokens_per_expert: torch.Tensor, device: str = "cuda", ) -> torch.Tensor: x_original = torch.randn((seq_len, hidden), dtype=torch.bfloat16, device=device) token_indices = torch.arange(seq_len, device=device)[:, None].expand(-1, top_k) expert_flat = expert_indices.to(device).flatten() token_flat = token_indices.flatten() token_sorted = token_flat[torch.argsort(expert_flat)] return x_original[token_sorted] def get_tensors( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: expert_indices, tokens_per_expert = sample_expert_assignments( seq_len, num_experts, top_k, use_beta=use_beta, ) x = get_tokens( seq_len, hidden, num_experts, top_k, expert_indices, tokens_per_expert, device="cuda", ) w = torch.randn( (num_experts, hidden, intermediate), dtype=torch.bfloat16, device="cuda" ) offsets = torch.cumsum(tokens_per_expert, dim=0).to( dtype=torch.int32, device="cuda" ) return x, w, offsets def benchmark( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, num_iters: int = 250, ): x, w, offsets = get_tensors( seq_len, hidden, intermediate, num_experts, top_k, use_beta, ) return triton.testing.do_bench_cudagraph( lambda: torch._grouped_mm(x, w, offs=offsets), rep=num_iters ) params = { "seq_len": 4096, "hidden": 4096, "intermediate": 1536, "num_experts": 128, "top_k": 8, } uniform = benchmark(**params, use_beta=False) beta = benchmark(**params, use_beta=True) print(f"Uniform: {uniform:.5f} ms") print(f"Beta: {beta:.5f} ms") # Uniform: 0.96811 ms # Beta: 1.02422 ms 通过统一的代币分配,GroupedGEMM内核比我们从 beta 分布中采样时更快 6%(我们使用它来模拟不平衡的负载)!在实践中,这可能会导致我们在实际训练中表现与我们在基准测试中观察到的差异。 把一切都放在一起 总结一下: When writing the benchmarking code by hand, do not forget to: Flush the L2 cache Use CUDA events Use cuda.synchronize to wait for the completion of all GPU work Alternatively, use or for a built-in solution. triton.testing.do_bench triton.testing.do_bench_cudagraph Regardless of approach, do not forget about the underlying data you’re using. 总的来说,你的基准只有与我们实际上在生产中看到的相关性一样有用。如果我们关心CPU的重量级,不要去除它。