PyTorch モデルコードで働く人なら誰でも同じ質問を始めます。 なんでこんなに時間がかかるの? どうやったらスピードアップできるの? なんでこんなに時間がかかるの? どうやったらスピードアップできるの? あなたがMLエンジニア、研究者であろうと、週末にランダムMLリポジトリで遊ぶことを決めたばかりであろうと、あなたは最終的にコードを加速させる方法を理解しようとします。 しかし、それを行う前に、私たちはパフォーマンスを正しく測定する方法を学び、それからこれらの測定から正しい結論を出す必要があります。この記事は、CUDAまたはPyTorchコードの適切なベンチマークについてです。 実行フロー すべての実行例として H100 でマトリックスの倍数を使用しましょう。 しかしながら、測定について考える前に、用語を正確にしましょう. 「カーネル」では、GPU上で実行される機能(または実際にはどの操作も)を意味します. あなたのコードには、トレーニングループの各回転中に何十から何千もの操作から、これらの多くがあるかもしれません。 torch.zeros(16、16、デバイス=”cuda) a + b, where a and b are tensors in GPU memory. a + b, where a and b are tensors in GPU memory. A @ B flash_attention(q、k、v) and so on カーネルが GPU で実行されているにもかかわらず、その トレーニングコードを含むPythonファイルを想像すると、For-loopsやIf-statementsなどの「準備」作業と実際のカーネルの打ち上げの間を交替します。このため、CPUは前進し、カーネルに打たれるたびに、次のカーネルが列に追加されると、前のカーネルがすでに完成しているという保証はありません。 打ち上げ これにより、GPU は休憩を避けることができます: それは常にそのカーネル列で仕事を見つけることができます(同期が明示的に呼ばれる場合を除き、または実行が CPU に縛られている場合を除く)。 The Naïve Approach and Why It's Wrong Python で時間を測定することを最初に考えると、自然な本能は、タイムモジュールに到達し、このようなものを実行することです。 import time import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_naive(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) times = [] for _ in range(n_iters): start = time.perf_counter_ns() matmul(a, b) end = time.perf_counter_ns() times.append(end - start) # perf_counter_ns returns nanoseconds, convert to ms return sum(times) / n_iters / 1e6 small_shapes_time = benchmark_naive(16, 32, 16) large_shapes_time = benchmark_naive(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.01415 ms # (4096, 8192) by (8192, 4096) matmul time: 0.01350 ms このコードを2つの異なる形のセットで実行する場合: そして 実際、私の走りでは、実際には256倍の大きさのマトリックスを倍増するのが速い。 16x32 4096x8192 実際には、GPU matmul が完成するのを待つことはありません。すべての「準備」コードが CPU で実行されているので、 実際には、マットムルのカーネルをGPU列にスケジュールし、私たちの生活に進むのにかかる時間を測定しています。 time.perf_counter_ns() できることは、追加することです。 しかし、それはまだ理想的ではありません、なぜなら我々はCPU上で経過した壁時計時間を測定しているので、これはオーバーヘッドのスケジュールを含む。 torch.cuda.synchronize() CUDAイベントによる測定 CPU 時間ではなく GPU 時間を測定する正しい方法は CUDA イベントです これらは、GPU の実行ストリームに直接挿入できるマーカーです GPU は、各 CUDA イベントに到達するたびにタイムスタンプを記録し、デバイス上の実際の実行時間を提供します。 import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_cuda_events(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_cuda_events(16, 32, 16) large_shapes_time = benchmark_cuda_events(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.02093 ms # (4096, 8192) by (8192, 4096) matmul time: 0.34025 ms イベントは、私たちの matmul カーネルと同じストリームに挿入されます。 タイムスタンプが記録されていることを確認します。 方法は、スタートと終了のタイムスタンプの違いを示します。 enable_timing=True .elapsed_time 今、私たちは、無知なアプローチからリアルな数値を得ています。このベンチマークから、大きい形状のmatmulは少なくとも15倍も時間がかかります。 また、最初の打ち上げには、JITコンパイルなどの1回のコストが含まれることが多いので、最初の打ち上げに加熱イテレーションを含む方が良いということも注目すべきです。 L2 キャッシュ 私たちの現在の測定のもう一つの問題は、L2キャッシュフラッシュの欠如である。同じカーネルを同じデータで繰り返し実行するとき(そしてこれはまさに私たちがしていることである)、そのデータはGPUのL2キャッシュに残ります。 NVIDIA GPU では、メモリの階層が存在します: HBM は最大で最も遅く、次いで徐々に小さく、より速いメモリユニット(L2 キャッシュ → L1 キャッシュまたは共有メモリ →レジストリメモリ)です。 GPU Generation L2 per GPU HBM per GPU V100 Volta 6MB 32GB A100 Ampere 40MB 80GB H100 Hopper 50MB 80GB B200 Blackwell 126MB 192GB V100 再び 6MB 32GB A100 アンペラ 40MB 80GB H100 ホッパー 50MB 80GB B200 ブラックウェル 126MB 192GB このキャッシュのおかげで、私たちの測定結果は生産で見たものよりも良い結果を示す可能性がありますが、データがイーテレーション間で変化します。より現実的な測定を得るためには、キャッシュを洗浄する必要があります。これを行う方法の1つは、大きなバッファを割り当てて、各イーテレーションの開始時に更新することです。これはもちろん、保証されたキャッシュフラッシュではなく、ヘウリスティックです。 import torch def flush_l2_cache(): # On H100, the L2 cache is 50MB, so we allocate something a bit bigger cache_size = 60 * 1024 * 1024 buffer = torch.zeros(cache_size // 4, dtype=torch.float32, device="cuda") buffer += 1 del buffer def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_with_cache_flush(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) for _ in range(10): matmul(a, b) # warmup torch.cuda.synchronize() start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): flush_l2_cache() # flush the L2 between iterations start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_with_cache_flush(16, 32, 16) large_shapes_time = benchmark_with_cache_flush(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.00634 ms # (4096, 8192) by (8192, 4096) matmul time: 0.40575 ms 実際には、この実行と前の実行の間の結果に大きな違いがありましたが、実際には、コードを数回再実行すると結果が少し変化し、私たちの違いは予想される差異の範囲内にありました - したがって、この特定の問題のためにキャッシュは重要な影響を与えませんでした。 内蔵ソリューション 何かをベンチマークするために、これらすべてを覚えなければなりませんか? 答えはノーです。 最初は、Pythonic 方式で GPU カーネルを書くために、OpenAI によって2021年に開発されたもので、現在では、上記のすべてを独自に行うことができる適切なベンチマークユーティリティを持つ組み込みテストモジュールを持っています。 Triton # Source: https://github.com/triton-lang/triton/blob/main/python/triton/testing.py#L127C1-L190C64 def do_bench(fn, warmup=25, rep=100, grad_to_none=None, quantiles=None, return_mode="mean"): """ Benchmark the runtime of the provided function. By default, return the median runtime of :code:`fn` along with the 20-th and 80-th performance percentile. :param fn: Function to benchmark :type fn: Callable :param warmup: Warmup time (in ms) :type warmup: int :param rep: Repetition time (in ms) :type rep: int :param grad_to_none: Reset the gradient of the provided tensor to None :type grad_to_none: torch.tensor, optional :param quantiles: Performance percentile to return in addition to the median. :type quantiles: list[float], optional :param return_mode: The statistical measure to return. Options are "min", "max", "mean", "median", or "all". Default is "mean". :type return_mode: str """ assert return_mode in ["min", "max", "mean", "median", "all"] di = runtime.driver.active.get_device_interface() fn() di.synchronize() cache = runtime.driver.active.get_empty_cache_for_benchmark() # Estimate the runtime of the function start_event = di.Event(enable_timing=True) end_event = di.Event(enable_timing=True) start_event.record() for _ in range(5): runtime.driver.active.clear_cache(cache) fn() end_event.record() di.synchronize() estimate_ms = start_event.elapsed_time(end_event) / 5 # compute number of warmup and repeat n_warmup = max(1, int(warmup / estimate_ms)) n_repeat = max(1, int(rep / estimate_ms)) start_event = [di.Event(enable_timing=True) for i in range(n_repeat)] end_event = [di.Event(enable_timing=True) for i in range(n_repeat)] # Warm-up for _ in range(n_warmup): fn() # Benchmark for i in range(n_repeat): # we don't want `fn` to accumulate gradient values # if it contains a backward pass. So we clear the # provided gradients if grad_to_none is not None: for x in grad_to_none: x.grad = None # we clear the L2 cache before each run runtime.driver.active.clear_cache(cache) # record time of `fn` start_event[i].record() fn() end_event[i].record() # Record clocks di.synchronize() times = [s.elapsed_time(e) for s, e in zip(start_event, end_event)] return _summarize_statistics(times, quantiles, return_mode) 私たちがカバーしたものに加えて、 ベンチマークからいくつかの統計を取得することを可能にし、平均だけでなく、例えば、メディアまたは99番のパーティイルを要求することもできます。 do_bench この機能を使用するには、単に呼び出すことができます: import triton bench_time = triton.testing.do_bench(lambda: matmul(a, b)) CPU-bound execution と CUDA グラフ 非常に短い実行時間を持つカーネルの場合、リリースオーバーヘッドは、カーネル自体の時間を超えるほど大きくなる可能性があります。これは、カーネルリリース間で重要なCPU作業(通常は非最適化)がある場合にも同じことが起こります。 次に、Python で CPU で for-loops が本当に遅いことを知っているので、私たちの matmul カーネルに looong for-loop を追加しましょう。 def cpu_heavy_matmul(a, b): cnt = 0 for _ in range(100_000): cnt += 1 return a @ b (8192、4096) によって matmul の時間は 0.4ms から 2.2ms に増加しました - 5 倍以上! 現在、実際のトレーニングを実行する際には、ベンチマークの結果からCPUオーバーヘッドを削除するか否かを検討する場合があります。 たとえば、トレーニングが既にCPUに結びついている場合、同様の問題が生産実行中にも発生します。 したがって、おそらく、私たちの測定のためのCPU時間を削除することは意味がありません。 しかし、通常のトレーニングを実行する場合、CPUは最適なトレーニングパフォーマンスの道を妨げるべきではありません - 通常はCPUがGPUより先行しています。 その場合、その1つのカーネルが重要なオーバーヘッドを持っている場合でも、それはGPU上のカーネルのオーバーヘッドを削除するつもりはありません。 このようにするには、CUDA グラフを使用できます CUDA グラフでは、一度に一連のカーネルを記録し、最小限の CPU 関与で再生することができます。 これを新しい matmul に使用すると、初期のカーネル時間(CPU overhead が導入される前に)が得られます。 triton.testing.do_bench_cudagraph a, b = get_data(4096, 8192, 4096) triton.testing.do_bench_cudagraph(lambda: cpu_heavy_matmul(a, b)) # back to ~0.4ms! もちろん、CUDAグラフは独自の制約(静的形状、静的制御フロー、および 彼らはe2eトレーニングループよりもGPUパフォーマンスのいくつかの部分を分離するのに最も適しています。 こんな こんな システムだけが失敗するのではなく、データを見る たまに、完璧なベンチマーク方法論さえ、ベンチマークを実行する孤立した環境とは生産条件が異なるため、全体のストーリーを語ることはありません。 たとえば、Expert-Mix レイヤー内のグループ化された gemm カーネルをベンチマークしたいとします。典型的な MoE レイヤーでは、トークンはルーターの確率に基づいて異なる専門家にダイナミックにルーティングされます。結果として、各専門家(およびグループ化された gemm カーネル)によって見られる実際のワークロードは、ルーティングのバランスを取るかに依存します。 無邪気なことに、我々はランダムなルーターの確率を生成し、それらに基づいて人工的にトークンをルーティングします。 それぞれの専門家にリダイレクトされたトークンの基本的な配布が、私たちの測定にどのように影響するかを見てみましょう。 import numpy as np import torch import triton def sample_expert_assignments( seq_len: int, num_experts: int, top_k: int, use_beta: bool = True, alpha: float = 1.0, beta: float = 5.0, ) -> tuple[torch.Tensor, torch.Tensor]: if use_beta: expert_weights = np.random.beta(alpha, beta, num_experts) expert_weights = expert_weights / expert_weights.sum() else: expert_weights = np.ones(num_experts) / num_experts gumbel_noise = -np.log(-np.log(np.random.uniform(0, 1, (seq_len, num_experts)))) log_weights = np.log(expert_weights + 1e-10) scores = log_weights[None, :] + gumbel_noise expert_indices = torch.from_numpy(np.argsort(scores, axis=1)[:, -top_k:]) tokens_per_expert = torch.bincount( expert_indices.flatten(), minlength=num_experts ).to(torch.int32) return expert_indices, tokens_per_expert def get_tokens( seq_len: int, hidden: int, num_experts: int, top_k: int, expert_indices: torch.Tensor, tokens_per_expert: torch.Tensor, device: str = "cuda", ) -> torch.Tensor: x_original = torch.randn((seq_len, hidden), dtype=torch.bfloat16, device=device) token_indices = torch.arange(seq_len, device=device)[:, None].expand(-1, top_k) expert_flat = expert_indices.to(device).flatten() token_flat = token_indices.flatten() token_sorted = token_flat[torch.argsort(expert_flat)] return x_original[token_sorted] def get_tensors( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: expert_indices, tokens_per_expert = sample_expert_assignments( seq_len, num_experts, top_k, use_beta=use_beta, ) x = get_tokens( seq_len, hidden, num_experts, top_k, expert_indices, tokens_per_expert, device="cuda", ) w = torch.randn( (num_experts, hidden, intermediate), dtype=torch.bfloat16, device="cuda" ) offsets = torch.cumsum(tokens_per_expert, dim=0).to( dtype=torch.int32, device="cuda" ) return x, w, offsets def benchmark( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, num_iters: int = 250, ): x, w, offsets = get_tensors( seq_len, hidden, intermediate, num_experts, top_k, use_beta, ) return triton.testing.do_bench_cudagraph( lambda: torch._grouped_mm(x, w, offs=offsets), rep=num_iters ) params = { "seq_len": 4096, "hidden": 4096, "intermediate": 1536, "num_experts": 128, "top_k": 8, } uniform = benchmark(**params, use_beta=False) beta = benchmark(**params, use_beta=True) print(f"Uniform: {uniform:.5f} ms") print(f"Beta: {beta:.5f} ms") # Uniform: 0.96811 ms # Beta: 1.02422 ms ユニークなトークン割り当てにより、GroupedGEMM カーネルは、ベータ配布からサンプルを採取した時よりも 6% 速くなります(バランスの取れない負荷をモデル化するために使用します)! 実際には、これは、実際のトレーニングの実績と、ベンチマークで観察したものとの間の差異を見ることができます。 すべてを組み合わせる まとめると: When writing the benchmarking code by hand, do not forget to: Flush the L2 cache Use CUDA events Use cuda.synchronize to wait for the completion of all GPU work Alternatively, use or for a built-in solution. triton.testing.do_bench triton.testing.do_bench_cudagraph Regardless of approach, do not forget about the underlying data you’re using. 全体として、あなたのベンチマークは、実際の生産実行で私たちが実際に見るものとの関連性と同じくらい役に立ちます。