PyTorch 모델 코드를 사용하는 모든 사람들은 다음과 같은 질문을 시작합니다. 왜 이렇게 오래 걸리나요?어떻게 훈련을 더 빨리 할 수 있습니까? 왜 이렇게 오래 걸리나요?어떻게 훈련을 더 빨리 할 수 있습니까? ML 엔지니어이든, 연구원이든, 주말에 무작위 ML 저장소로 놀기로 결정하든, 결국 코드를 가속화하는 방법을 이해하려고 노력할 것입니다. 그러나 우리가 그것을 할 수 있기 전에 우리는 성능을 올바르게 측정하는 방법을 배워야합니다.그리고 이러한 측정에서 올바른 결론을 내리십시오.이 기사는 CUDA 또는 PyTorch 코드를 올바르게 벤치마킹하는 것에 관한 것입니다. 실행 흐름 H100에서 매트릭스 번호를 전체 실행 예제로 사용합시다. 그러나, 우리가 측정에 대해 생각하기 전에, 단어를 똑바로 가져 가자. "코넬"은 GPU에서 실행되는 기능 (또는 실제로 모든 작업)을 의미합니다. torch.zeros(16, 16), 장치=”cuda) a + b, a 및 b가 GPU 메모리의 텐서 A @ B 플래시_주의(q, k, v) 그리고 그렇게 GPU에서 실행되는 핵심에도 불구하고, 그것은 우리가 훈련 코드와 함께 파이썬 파일을 상상한다면, 그것은 for-loops 또는 if-statements와 같은 "예비"작업과 실제 커널 런칭 사이를 교환합니다.이 때문에 CPU는 앞으로 실행하고 매번 커널을 때릴 때마다 그 커널을 GPU에서 실행하기 위해 스케줄합니다.다음 커널이 차단에 추가되면 이전 커널이 이미 완료되었다는 보장은 없습니다. 발사 이렇게하면 GPU가 앉아있는 것을 피할 수 있습니다 : 그것은 항상 그 커널 꼬리에 작업을 찾을 수 있습니다 (동기화가 명시적으로 호출되거나 실행이 CPU에 의존 할 때 제외). 순진한 접근법과 그것이 잘못된 이유 우리가 처음으로 Python에서 시간을 측정하는 것에 대해 생각할 때, 자연스러운 본능은 시간 모듈에 도달하고 다음과 같은 것을 실행하는 것입니다. import time import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_naive(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) times = [] for _ in range(n_iters): start = time.perf_counter_ns() matmul(a, b) end = time.perf_counter_ns() times.append(end - start) # perf_counter_ns returns nanoseconds, convert to ms return sum(times) / n_iters / 1e6 small_shapes_time = benchmark_naive(16, 32, 16) large_shapes_time = benchmark_naive(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.01415 ms # (4096, 8192) by (8192, 4096) matmul time: 0.01350 ms 이 코드를 두 개의 다른 모양 세트에서 실행한다면: 그리고 , 우리는 matmul 시간에 근본적으로 차이가 없다는 것을 알게 될 것입니다. 사실, 내 실행에서, 256 배 더 큰 매트릭스를 곱하는 것이 실제로 더 빠릅니다.이 결과는 매우 의심스럽고이 코드가 실제로 무엇을 측정하는지 생각하게해야합니다. 16x32 4096x8192 사실, 우리는 GPU matmul이 끝날 때까지 기다리지 않는다.모든 “예비”코드는 CPU에서 실행되기 때문에, 그것은 실제로 우리가 GPU 차단에 matmul 커널을 예약하고 우리의 삶을 진행하는 데 걸리는 시간을 측정하고 있습니다. time.perf_counter_ns() 우리가 할 수있는 것은 추가하는 것입니다. 그러나 이것은 여전히 이상적이지 않다, 왜냐하면 우리는 CPU에서 과거의 벽 시계 시간을 측정하고 있기 때문에 CPU가 GPU를 기다리도록 강요합니다.We do not isolate the actual GPU execution this way. torch.cuda.synchronize() CUDA 이벤트와의 측정 CPU 시간 대신 GPU 시간을 측정하는 올바른 방법은 CUDA 이벤트입니다.이 마커는 우리가 GPU의 실행 스트림에 직접 삽입할 수 있습니다.GPU는 각 CUDA 이벤트에 도달할 때마다 타임스탬프를 기록하여 장치에서 실제 실행 시간을 제공합니다. import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_cuda_events(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_cuda_events(16, 32, 16) large_shapes_time = benchmark_cuda_events(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.02093 ms # (4096, 8192) by (8192, 4096) matmul time: 0.34025 ms 이벤트는 우리의 matmul 커널과 같은 스트림에 삽입됩니다.Setting 타임스탬프가 기록되어 있는지 확인합니다.The 방법은 우리에게 시작과 종료 타임스탬프 사이의 차이를 제공합니다. enable_timing=True .elapsed_time 이제 우리는 순진한 접근법에서 진짜 숫자를 얻고, 절대적으로 빠른 결과를 얻지 못합니다.이 기준에서, 큰 모양의 matmul은 적어도 15 배 더 오래 걸립니다. 또한 JIT 컴파일과 같은 첫 번째 발사에 종종 일회용 비용이 포함되어 있기 때문에 처음에 가열 반복을 포함하는 것이 좋습니다. L2 캐시 현재의 측정에 대한 또 다른 문제는 L2 캐시 플러쉬의 부재입니다.우리가 동일한 데이터를 사용하여 동일한 커널을 반복적으로 실행할 때 (그리고 이것은 정확히 우리가하는 일입니다), 그 데이터는 GPU의 L2 캐시에 남아 있습니다. NVIDIA GPU에는 메모리 계층이 있습니다 : HBM는 가장 크고 가장 느린 메모리 단위이며 점차적으로 작고 빠른 메모리 단위 (L2 캐시 → L1 캐시 또는 공유 메모리 → 레지스트리 메모리)가 따릅니다. GPU Generation L2 per GPU HBM per GPU V100 Volta 6MB 32GB A100 Ampere 40MB 80GB H100 Hopper 50MB 80GB B200 Blackwell 126MB 192GB V100 다시 6MB 32GB A100 앰프 40MB 80GB H100 호퍼 50MB 80GB B200 블랙웰 126 MB 192GB 이 캐시 때문에, 우리의 측정 결과는 우리가 생산에서 볼 수있는 것보다 더 나은 결과를 보여줄 수 있습니다, 데이터가 반복 사이에 변화합니다. 더 현실적인 측정을 얻으려면 캐시를 씻어야합니다. 그렇게하는 한 가지 방법은 큰 버퍼를 할당하고 각 반복의 시작 부분에 업데이트하는 것입니다.이것은 물론 보장된 캐시 플러시보다 heuristic입니다. import torch def flush_l2_cache(): # On H100, the L2 cache is 50MB, so we allocate something a bit bigger cache_size = 60 * 1024 * 1024 buffer = torch.zeros(cache_size // 4, dtype=torch.float32, device="cuda") buffer += 1 del buffer def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_with_cache_flush(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) for _ in range(10): matmul(a, b) # warmup torch.cuda.synchronize() start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): flush_l2_cache() # flush the L2 between iterations start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_with_cache_flush(16, 32, 16) large_shapes_time = benchmark_with_cache_flush(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.00634 ms # (4096, 8192) by (8192, 4096) matmul time: 0.40575 ms 실제로,이 실행과 이전 실행 사이의 결과에 상당한 차이가 있었지만, 실제로 코드를 여러 번 다시 실행하면 결과가 약간 변경되며, 우리의 차이는 예상 변수 내에 있습니다. 내장된 솔루션 뭔가를 벤치마크하기 위해서 이 모든 것을 기억해야 하는가? 대답은 그렇지 않다. , 원래 OpenAI에 의해 Pythonic 방식으로 GPU 커널을 작성하기 위해 2021 년에 개발되었습니다.오늘날에는 올바른 벤치마킹 유틸리티가있는 내장 테스트 모듈이 있으며, 위의 모든 것을 스스로 할 수 있습니다. 트리톤 # Source: https://github.com/triton-lang/triton/blob/main/python/triton/testing.py#L127C1-L190C64 def do_bench(fn, warmup=25, rep=100, grad_to_none=None, quantiles=None, return_mode="mean"): """ Benchmark the runtime of the provided function. By default, return the median runtime of :code:`fn` along with the 20-th and 80-th performance percentile. :param fn: Function to benchmark :type fn: Callable :param warmup: Warmup time (in ms) :type warmup: int :param rep: Repetition time (in ms) :type rep: int :param grad_to_none: Reset the gradient of the provided tensor to None :type grad_to_none: torch.tensor, optional :param quantiles: Performance percentile to return in addition to the median. :type quantiles: list[float], optional :param return_mode: The statistical measure to return. Options are "min", "max", "mean", "median", or "all". Default is "mean". :type return_mode: str """ assert return_mode in ["min", "max", "mean", "median", "all"] di = runtime.driver.active.get_device_interface() fn() di.synchronize() cache = runtime.driver.active.get_empty_cache_for_benchmark() # Estimate the runtime of the function start_event = di.Event(enable_timing=True) end_event = di.Event(enable_timing=True) start_event.record() for _ in range(5): runtime.driver.active.clear_cache(cache) fn() end_event.record() di.synchronize() estimate_ms = start_event.elapsed_time(end_event) / 5 # compute number of warmup and repeat n_warmup = max(1, int(warmup / estimate_ms)) n_repeat = max(1, int(rep / estimate_ms)) start_event = [di.Event(enable_timing=True) for i in range(n_repeat)] end_event = [di.Event(enable_timing=True) for i in range(n_repeat)] # Warm-up for _ in range(n_warmup): fn() # Benchmark for i in range(n_repeat): # we don't want `fn` to accumulate gradient values # if it contains a backward pass. So we clear the # provided gradients if grad_to_none is not None: for x in grad_to_none: x.grad = None # we clear the L2 cache before each run runtime.driver.active.clear_cache(cache) # record time of `fn` start_event[i].record() fn() end_event[i].record() # Record clocks di.synchronize() times = [s.elapsed_time(e) for s, e in zip(start_event, end_event)] return _summarize_statistics(times, quantiles, return_mode) 우리가 다루고 있는 것 외에도, 예를 들어, 한 사람은 중간 또는 99 퍼센티일을 요청할 수 있습니다. do_bench 이 기능을 사용하려면 간단히 호출 할 수 있습니다 : import triton bench_time = triton.testing.do_bench(lambda: matmul(a, b)) CPU-bound execution 및 CUDA 그래프 매우 짧은 실행 시간을 가진 커널의 경우, 시작 오버 헤드가 커널 시간 자체를 초과할 정도로 커질 수 있습니다.이것은 커널 출시 사이에 상당한 CPU 작업 (일반적으로 최적화되지 않은)이있을 때 일어납니다. 우리는 for-loops가 Python에서 CPU에서 정말 느린다는 것을 알고 있기 때문에 우리의 matmul 커널에 looong for-loop을 추가합시다 : def cpu_heavy_matmul(a, b): cnt = 0 for _ in range(100_000): cnt += 1 return a @ b 우리는 (8192, 4096)에 의해 matmul 시간 (4096, 8192)이 0.4ms에서 2.2ms로 증가했다는 것을 알게 될 것입니다 - 5x 이상! 이제 실제 훈련 실행이 어떻게 보이는지에 따라 벤치마킹 결과에서 CPU-overhead를 제거하거나 제거하고 싶을 수도 있습니다. 예를 들어, 훈련이 이미 CPU-bound인 경우 생산 실행 중에도 동일한 문제가 발생할 것입니다. 그런 다음, 대부분의 경우, 우리의 측정에 대한 CPU 시간을 제거하는 것이 의미가 없습니다. 그러나 정상적인 훈련 실행에서 CPU는 최적의 훈련 성능을 방해해서는 안됩니다 - 일반적으로 CPU는 GPU보다 앞서 실행됩니다. 그 경우, 그 하나의 핵심이 상당한 overhead을 가지고 있더라도 GPU에서 kernel의 runtime를 변경하지 않을 것입니다. 이를 위해 우리는 CUDA 차트를 사용할 수 있습니다. CUDA 차트는 일련의 핵을 한 번 기록하고 최소한의 CPU 참여로 재생할 수 있습니다. 이것을 우리의 새로운 matmul에 사용하여, 우리는 초기 코너 시간 (CPU overhead가 도입되기 전에)을 얻습니다. triton.testing.do_bench_cudagraph a, b = get_data(4096, 8192, 4096) triton.testing.do_bench_cudagraph(lambda: cpu_heavy_matmul(a, b)) # back to ~0.4ms! 물론 CUDA 차트는 자체 제한 사항 (정적 모양, 정적 제어 흐름 및 그들은 e2e 훈련 루프보다 GPU 성능의 일부 조각을 고립하는 데 가장 적합합니다. 이렇게 이렇게 시스템만 실패하지 않는다 - 데이터를보고 때때로 완벽한 벤치마킹 방법론조차도 생산 조건이 우리가 벤치마킹을 실행하는 고립된 환경과 다르기 때문에 전체 이야기를 말하지 못합니다. 예를 들어, 전문가 혼합 계층 내부의 그룹화된 gemm 코널을 벤치마크하려고 한다고 가정 해 봅시다. 전형적인 MoE 계층에서 토큰은 라우터 확률에 따라 다른 전문가에게 역동적으로 라우팅됩니다. 결과적으로 각 전문가 (그리고 그룹화된 gemm 코널)가 본 실제 작업량은 라우팅이 얼마나 균형 잡힌 지에 달려 있습니다. 순진하게, 우리는 무작위 라우터 확률을 생성하고 인공적으로 라우터 토큰을 기반으로합니다.그러나, 훈련 기간 동안, 우리의 라우터는 불균형이 될 수 있으며, 다른 사람들보다 훨씬 더 많은 전문가를 사용합니다. 각 전문가에게 전달되는 토큰의 기본 분배가 우리의 측정에 어떻게 영향을 미치는지 살펴보자. import numpy as np import torch import triton def sample_expert_assignments( seq_len: int, num_experts: int, top_k: int, use_beta: bool = True, alpha: float = 1.0, beta: float = 5.0, ) -> tuple[torch.Tensor, torch.Tensor]: if use_beta: expert_weights = np.random.beta(alpha, beta, num_experts) expert_weights = expert_weights / expert_weights.sum() else: expert_weights = np.ones(num_experts) / num_experts gumbel_noise = -np.log(-np.log(np.random.uniform(0, 1, (seq_len, num_experts)))) log_weights = np.log(expert_weights + 1e-10) scores = log_weights[None, :] + gumbel_noise expert_indices = torch.from_numpy(np.argsort(scores, axis=1)[:, -top_k:]) tokens_per_expert = torch.bincount( expert_indices.flatten(), minlength=num_experts ).to(torch.int32) return expert_indices, tokens_per_expert def get_tokens( seq_len: int, hidden: int, num_experts: int, top_k: int, expert_indices: torch.Tensor, tokens_per_expert: torch.Tensor, device: str = "cuda", ) -> torch.Tensor: x_original = torch.randn((seq_len, hidden), dtype=torch.bfloat16, device=device) token_indices = torch.arange(seq_len, device=device)[:, None].expand(-1, top_k) expert_flat = expert_indices.to(device).flatten() token_flat = token_indices.flatten() token_sorted = token_flat[torch.argsort(expert_flat)] return x_original[token_sorted] def get_tensors( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: expert_indices, tokens_per_expert = sample_expert_assignments( seq_len, num_experts, top_k, use_beta=use_beta, ) x = get_tokens( seq_len, hidden, num_experts, top_k, expert_indices, tokens_per_expert, device="cuda", ) w = torch.randn( (num_experts, hidden, intermediate), dtype=torch.bfloat16, device="cuda" ) offsets = torch.cumsum(tokens_per_expert, dim=0).to( dtype=torch.int32, device="cuda" ) return x, w, offsets def benchmark( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, num_iters: int = 250, ): x, w, offsets = get_tensors( seq_len, hidden, intermediate, num_experts, top_k, use_beta, ) return triton.testing.do_bench_cudagraph( lambda: torch._grouped_mm(x, w, offs=offsets), rep=num_iters ) params = { "seq_len": 4096, "hidden": 4096, "intermediate": 1536, "num_experts": 128, "top_k": 8, } uniform = benchmark(**params, use_beta=False) beta = benchmark(**params, use_beta=True) print(f"Uniform: {uniform:.5f} ms") print(f"Beta: {beta:.5f} ms") # Uniform: 0.96811 ms # Beta: 1.02422 ms 유일한 토큰 할당을 사용하면 GroupedGEMM 커널은 베타 배포에서 샘플을 얻을 때보다 6 % 빠릅니다 (우리는 불균형화 된 부하를 모델링하는 데 사용합니다)! 모든 것을 함께 넣어주는 요약하기 : When writing the benchmarking code by hand, do not forget to: Flush the L2 cache Use CUDA events Use cuda.synchronize to wait for the completion of all GPU work Alternatively, use or for a built-in solution. triton.testing.do_bench triton.testing.do_bench_cudagraph Regardless of approach, do not forget about the underlying data you’re using. 전체적으로, 귀하의 벤치마크는 실제 생산 실행에서 실제로 볼 수있는 것에 대한 관련성만큼 유용합니다.우리가 CPU 오버 헤드에 관심이 있다면, 그것을 제거하지 마십시오.