أي شخص يعمل مع رمز نموذج PyTorch يبدأ في سؤال نفس الأسئلة: لماذا يستغرق هذا وقتا طويلا؟ كيف أكون أسرع في التدريب؟ لماذا يستغرق هذا وقتا طويلا؟ كيف أكون أسرع في التدريب؟ سواء كنت مهندسًا ML أو الباحث أو قررت فقط اللعب مع مخزون ML العشوائي خلال نهاية الأسبوع ، فستحاول في النهاية فهم كيفية تسريع رمزك. ومع ذلك ، قبل أن نتمكن من القيام بذلك ، علينا أن نتعلم كيفية قياس الأداء بشكل صحيح ، ثم إلقاء النتائج الصحيحة من هذه التقييمات. التنفيذ flow دعونا نستخدم مزيج التعديل على H100 كمثال يعمل في جميع أنحاء. ومع ذلك ، قبل أن نفكر في قياسات ، دعونا نضع معالجة مباشرة.ب "المعدة" نحن نفهم وظيفة (أو بالفعل أي عملية) التي تعمل على GPU. قد يكون لدينا العديد من هذه الكود ، من عشرات إلى آلاف العمليات خلال كل إعادة التدوير من حلقات التدريب ، على سبيل المثال: torch.zeros ((16، 16)، device=”cuda) a + b ، حيث a و b هي التضخمات في ذاكرة GPU A @ B flash_attention ( q ، k ، v ) وهكذا في على الرغم من أن الكورنيش يتم تنفيذها على GPU ، فإن يتم التحكم بها من قبل CPU. إذا تصورنا ملف Python مع رمز التدريب، فإنه يتبدل بين العملية "مبادلة" مثل "بروتوكولات" أو "أو" ، والانتقال إلى الكورنيش الفعلي. بسبب ذلك، يتم تشغيل الكورنيش إلى الأمام، وعندما يصل الكورنيش إلى الكورنيش، فإنه يخطط لتشغيل الكورنيش على GPU من خلال إضافة الكورنيش إلى صفوف. إطلاق هذا يتيح GPU تجنب الجلوس: فإنه يمكن دائمًا العثور على العمل في هذا الجانب من الكمبيوتر (إلا عندما يتم إدخال تلقائيًا أو عندما يتم إجراء عملية التحكم في CPU). السلوك البشري ولماذا هو خطأ عندما نتفكّر أولاً في قياس الوقت في Python ، فإنّ الحركة الطبيعية هي الوصول إلى الوحدات الزمنية وتشغيل شيء ما مثل هذا: import time import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_naive(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) times = [] for _ in range(n_iters): start = time.perf_counter_ns() matmul(a, b) end = time.perf_counter_ns() times.append(end - start) # perf_counter_ns returns nanoseconds, convert to ms return sum(times) / n_iters / 1e6 small_shapes_time = benchmark_naive(16, 32, 16) large_shapes_time = benchmark_naive(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.01415 ms # (4096, 8192) by (8192, 4096) matmul time: 0.01350 ms إذا قمت بتشغيل هذا الكود على مجموعة متنوعة من أشكال: و في الواقع، في رحلتي، فإنه في الواقع أسرع من تكرار الأقمار الصناعية التي هي 256 مرات أكبر. 16x32 4096x8192 في الواقع ، لا ننتظر حتى ينتهي GPU matmul. Since all the “preparatory” code runs on the CPU, هو في الواقع قياس الوقت الذي يستغرقنا لتخطيط الكورنيش MATMUL إلى صفوف GPU وتواصل مع حياتنا. time.perf_counter_ns() ما يمكننا فعله هو إضافة لكن هذا لا يزال مثالي ، لأننا نقوم بتقييم الوقت المتبقي على الشاشة على CPU ، والذي يشمل التخطيط على الرأس. torch.cuda.synchronize() تقييم الأحداث مع CUDA الطريقة الصحيحة لتقييم الوقت GPU بدلاً من الوقت CPU هي مع الأحداث CUDA. هذه هي العلامات التي يمكن إدخالها مباشرة في تدفق التنفيذ GPU. import torch def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_cuda_events(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_cuda_events(16, 32, 16) large_shapes_time = benchmark_cuda_events(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.02093 ms # (4096, 8192) by (8192, 4096) matmul time: 0.34025 ms يتم إدخال الأحداث في نفس الإرسال مثل القناع Matmul. يتيح للمستخدمين إدخال الملفات في الوقت المناسب. • المنهج يوفر لنا الفرق بين وقت البدء والنتائج. enable_timing=True .elapsed_time الآن نستطيع الحصول على الأرقام الحقيقية، وليس نتائج سريعة جدًا من النهج البسيط. من هذا النمط، matmul مع أشكال كبيرة يستغرق على الأقل 15 مرة أطول. من المهم أيضا أن نلاحظ أن من الأفضل تضمين تكرار التدفئة في البداية ، لأنه غالبا ما يكون هناك تكاليف مرة واحدة تشمل في البدء الأول ، مثل جمعية JIT. كاميرا L2 مشكلة أخرى مع قياساتنا الحالية هي عدم وجود حفرة L2 Cache. عندما نقوم بتشغيل نفس الكورنيش مرتين مع نفس البيانات (وذلك هو ما نفعله بالضبط)، فإن البيانات تظل في حفرة L2 في GPU. في GPU NVIDIA ، هناك تقسيم الذاكرة: HBM هو أكبر وأكثر صعوبة ، وتتبع وحدات الذاكرة أصغر وأسرع تدريجيا (L2 Cache → L1 Cache أو L1 Memory → Registry Memory). GPU Generation L2 per GPU HBM per GPU V100 Volta 6MB 32GB A100 Ampere 40MB 80GB H100 Hopper 50MB 80GB B200 Blackwell 126MB 192GB V100 العودة 6 MB 32GB A100 Ampere 40 MB 80GB H100 هوبر 50 MB 80GB B200 Blackwell 126 MB 192GB بسبب هذه المفاتيح، قد تظهر التقييمات لدينا نتائج أفضل مما نرى في الإنتاج، حيث تتغير البيانات بين التكرارات.لكن للحصول على التقييمات أكثر واقعية، علينا أن نغسل المفاتيح. import torch def flush_l2_cache(): # On H100, the L2 cache is 50MB, so we allocate something a bit bigger cache_size = 60 * 1024 * 1024 buffer = torch.zeros(cache_size // 4, dtype=torch.float32, device="cuda") buffer += 1 del buffer def matmul(a, b): return a @ b def get_data(m, n, k): a = torch.randn(m, n, device="cuda", dtype=torch.bfloat16) b = torch.randn(n, k, device="cuda", dtype=torch.bfloat16) return a, b def benchmark_with_cache_flush(m, n, k, n_iters: int = 100): a, b = get_data(m, n, k) for _ in range(10): matmul(a, b) # warmup torch.cuda.synchronize() start_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] end_events = [torch.cuda.Event(enable_timing=True) for _ in range(n_iters)] for it in range(n_iters): flush_l2_cache() # flush the L2 between iterations start_events[it].record() matmul(a, b) end_events[it].record() torch.cuda.synchronize() times = [start.elapsed_time(end) for start, end in zip(start_events, end_events)] return sum(times) / n_iters small_shapes_time = benchmark_with_cache_flush(16, 32, 16) large_shapes_time = benchmark_with_cache_flush(4096, 8192, 4096) print(f"(16, 32) by (32, 16) matmul time: {small_shapes_time:.5f} ms") print(f"(4096, 8192) by (8192, 4096) matmul time: {large_shapes_time:.5f} ms") # (16, 32) by (32, 16) matmul time: 0.00634 ms # (4096, 8192) by (8192, 4096) matmul time: 0.40575 ms في الواقع، كان هناك فرق كبير في النتيجة بين هذا الإجراء والخطوة السابقة، ولكن في الواقع إعادة إجراء الكود عدة مرات يغير النتائج قليلاً، والانخفاض لدينا داخل التوازن المتوقع - لذلك لم يكن له تأثير كبير على هذه المشكلة ذات الصلة. الحلول المدمجة هل يجب أن نتذكر كل هذا فقط لتقييم شيء ما؟ الإجابة ليست. ، والتي تم تطويرها في الأصل من قبل OpenAI في عام 2021 للكتابة الكوريوس GPU بطريقة Pythonic. اليوم لديها ماكينة اختبار متكاملة مع أدوات التقييم المناسبة، والتي قادرة على فعل كل ما هو أعلى بنفسها. تريتون # Source: https://github.com/triton-lang/triton/blob/main/python/triton/testing.py#L127C1-L190C64 def do_bench(fn, warmup=25, rep=100, grad_to_none=None, quantiles=None, return_mode="mean"): """ Benchmark the runtime of the provided function. By default, return the median runtime of :code:`fn` along with the 20-th and 80-th performance percentile. :param fn: Function to benchmark :type fn: Callable :param warmup: Warmup time (in ms) :type warmup: int :param rep: Repetition time (in ms) :type rep: int :param grad_to_none: Reset the gradient of the provided tensor to None :type grad_to_none: torch.tensor, optional :param quantiles: Performance percentile to return in addition to the median. :type quantiles: list[float], optional :param return_mode: The statistical measure to return. Options are "min", "max", "mean", "median", or "all". Default is "mean". :type return_mode: str """ assert return_mode in ["min", "max", "mean", "median", "all"] di = runtime.driver.active.get_device_interface() fn() di.synchronize() cache = runtime.driver.active.get_empty_cache_for_benchmark() # Estimate the runtime of the function start_event = di.Event(enable_timing=True) end_event = di.Event(enable_timing=True) start_event.record() for _ in range(5): runtime.driver.active.clear_cache(cache) fn() end_event.record() di.synchronize() estimate_ms = start_event.elapsed_time(end_event) / 5 # compute number of warmup and repeat n_warmup = max(1, int(warmup / estimate_ms)) n_repeat = max(1, int(rep / estimate_ms)) start_event = [di.Event(enable_timing=True) for i in range(n_repeat)] end_event = [di.Event(enable_timing=True) for i in range(n_repeat)] # Warm-up for _ in range(n_warmup): fn() # Benchmark for i in range(n_repeat): # we don't want `fn` to accumulate gradient values # if it contains a backward pass. So we clear the # provided gradients if grad_to_none is not None: for x in grad_to_none: x.grad = None # we clear the L2 cache before each run runtime.driver.active.clear_cache(cache) # record time of `fn` start_event[i].record() fn() end_event[i].record() # Record clocks di.synchronize() times = [s.elapsed_time(e) for s, e in zip(start_event, end_event)] return _summarize_statistics(times, quantiles, return_mode) وبالإضافة إلى ما تم نشره، يتيح لك الحصول على عدد من الإحصائيات من المؤشر، وليس فقط المتوسط. على سبيل المثال، قد يطلب منك المتوسط أو النقطة 99. do_bench من أجل استخدام هذه الوظيفة ، يمكننا بسهولة الدعاء: import triton bench_time = triton.testing.do_bench(lambda: matmul(a, b)) CPU-bound execution و CUDA Graphs بالنسبة للذوبان التي لديها وقت إجراء قصير جداً، قد يصبح حجم إطلاقه أكبر بكثير مما يزيد عن الوقت الذوبان نفسه. دعونا نضيف looong for-loop إلى كورنيش matmul لدينا، لأننا نعلم أن for-loops هي بطيئة حقا على CPU في Python: def cpu_heavy_matmul(a, b): cnt = 0 for _ in range(100_000): cnt += 1 return a @ b وسوف نرى أن (4096, 8192) من (8192, 4096) وقت matmul زاد من 0.4ms إلى 2.2ms - أكثر من 5x! الآن، اعتمادًا على ما يبدو عملية التدريب الفعلية الخاصة بك، قد ترغب في إزالة CPU-overhead من النتائج المقارنة أو لا. على سبيل المثال، إذا كان التدريب الخاص بك هو بالفعل CPU-bound، ثم نفس المشكلات سوف تظهر خلال عملية الإنتاج أيضا. ثم، على الأرجح، لن يكون من الأهمية إزالة وقت CPU لقياساتنا. ومع ذلك، في عمليات التدريب العادية، لا ينبغي أن يكون CPU في طريق الأداء التدريبي الأفضل - عادة ما يتم تشغيل CPU قبل GPU. في هذه الحالة، حتى لو كان هذا الكورنيش لديه overhead كبير، فإنه لن يغير الوقت التنفيذي للكيان الصناعي على GPU. ثم نريد إزالة overhead من قياساتنا. للقيام بذلك ، يمكننا استخدام شاشات CUDA. CUDA Graphs تسمح لنا بإدخال سلسلة من الكورول مرة واحدة وتعبيرها مع ممارسة CPU الحد الأدنى. في Triton ، هناك باستخدام هذا لدينا matmul الجديد، ونحن نكتشف وقت القناع الأولي ( قبل إدخال CPU overhead). triton.testing.do_bench_cudagraph a, b = get_data(4096, 8192, 4096) triton.testing.do_bench_cudagraph(lambda: cpu_heavy_matmul(a, b)) # back to ~0.4ms! وبطبيعة الحال ، تأتي ورقات CUDA مع التحديات الخاصة بها (الشكلات المستقرة ، والطاقة التحكمية المستقرة و إنها مناسبة بشكل أفضل لإزالة بعض أجزاء من أداء GPU بدلاً من حلقات التدريب e2e. هذا هو هذا هو ليس فقط الأنظمة تنتهك - نرى البيانات في بعض الأحيان ، لا يعرف الطريقة المثلى للترجمة قصة كلها ، لأن ظروف الإنتاج مختلفة من البيئة المزدحمة التي نقوم بها للترجمة. على سبيل المثال، دعونا نقول إننا نريد أن نقيس الكورنيش المختلطة داخل طبقة Mixture-of-Experts.في طبقة MoE التقليدية، يتم تسريع تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا تكنولوجيا. وبشكل غامض ، سنقوم بتنمية احتمالات المرور العشوائية وتسريع الطرق الصناعية على أساسها ، ومع ذلك ، خلال التدريب ، قد يكون المرور لدينا غير متوازن ، وذلك باستخدام العديد من الخبراء أكثر من الآخرين. دعونا نلقي نظرة على كيفية تأثير توزيع التوترات الأساسية التي يتم توجيهها إلى كل خبير على قياساتنا. import numpy as np import torch import triton def sample_expert_assignments( seq_len: int, num_experts: int, top_k: int, use_beta: bool = True, alpha: float = 1.0, beta: float = 5.0, ) -> tuple[torch.Tensor, torch.Tensor]: if use_beta: expert_weights = np.random.beta(alpha, beta, num_experts) expert_weights = expert_weights / expert_weights.sum() else: expert_weights = np.ones(num_experts) / num_experts gumbel_noise = -np.log(-np.log(np.random.uniform(0, 1, (seq_len, num_experts)))) log_weights = np.log(expert_weights + 1e-10) scores = log_weights[None, :] + gumbel_noise expert_indices = torch.from_numpy(np.argsort(scores, axis=1)[:, -top_k:]) tokens_per_expert = torch.bincount( expert_indices.flatten(), minlength=num_experts ).to(torch.int32) return expert_indices, tokens_per_expert def get_tokens( seq_len: int, hidden: int, num_experts: int, top_k: int, expert_indices: torch.Tensor, tokens_per_expert: torch.Tensor, device: str = "cuda", ) -> torch.Tensor: x_original = torch.randn((seq_len, hidden), dtype=torch.bfloat16, device=device) token_indices = torch.arange(seq_len, device=device)[:, None].expand(-1, top_k) expert_flat = expert_indices.to(device).flatten() token_flat = token_indices.flatten() token_sorted = token_flat[torch.argsort(expert_flat)] return x_original[token_sorted] def get_tensors( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: expert_indices, tokens_per_expert = sample_expert_assignments( seq_len, num_experts, top_k, use_beta=use_beta, ) x = get_tokens( seq_len, hidden, num_experts, top_k, expert_indices, tokens_per_expert, device="cuda", ) w = torch.randn( (num_experts, hidden, intermediate), dtype=torch.bfloat16, device="cuda" ) offsets = torch.cumsum(tokens_per_expert, dim=0).to( dtype=torch.int32, device="cuda" ) return x, w, offsets def benchmark( seq_len: int = 1024, hidden: int = 4096, intermediate: int = 1536, num_experts: int = 128, top_k: int = 8, use_beta: bool = True, num_iters: int = 250, ): x, w, offsets = get_tensors( seq_len, hidden, intermediate, num_experts, top_k, use_beta, ) return triton.testing.do_bench_cudagraph( lambda: torch._grouped_mm(x, w, offs=offsets), rep=num_iters ) params = { "seq_len": 4096, "hidden": 4096, "intermediate": 1536, "num_experts": 128, "top_k": 8, } uniform = benchmark(**params, use_beta=False) beta = benchmark(**params, use_beta=True) print(f"Uniform: {uniform:.5f} ms") print(f"Beta: {beta:.5f} ms") # Uniform: 0.96811 ms # Beta: 1.02422 ms مع التخصيص الوظيفي للطائرات ، فإن الكورنيش GroupedGEMM أسرع بنسبة 6٪ مقارنة مع نموذجًا من توزيع البيتا (الذي نستخدمه لتصميم ضغط غير متوازن)!في الممارسة العملية ، قد يؤدي ذلك إلى رؤية الاختلافات بين الأداء خلال عملية التدريب الحقيقية مقارنة مع ما نلاحظ أثناء التقييم. وضع كل شيء معا للتوضيح : When writing the benchmarking code by hand, do not forget to: Flush the L2 cache Use CUDA events Use cuda.synchronize to wait for the completion of all GPU work Alternatively, use or for a built-in solution. triton.testing.do_bench triton.testing.do_bench_cudagraph Regardless of approach, do not forget about the underlying data you’re using. على سبيل المثال ، فإن معيار النموذج الخاص بك مفيد فقط كما هو الحال مع ما سنرى في العملية الإنتاجية الحقيقية.إذا نحن نأخذ في الاعتبار حجم CPU ، لا تجنب ذلك.إذا كنت تتوقع توزيع بيانات محددة أثناء التدريب ، حاول اختبارها أثناء النموذج.