Wszyscy rozwiązujemy problem „ ” wiele razy dziennie. Oczywiście nieumyślnie. Rozwiązujemy go w drodze do pracy, przeglądając Internet lub porządkując rzeczy na biurku. najkrótszej drogi Brzmi trochę… za dużo? Przekonajmy się. Wyobraź sobie, że postanowiłeś spotkać się ze znajomymi, no cóż... powiedzmy w kawiarni. Przede wszystkim musisz znaleźć trasę (lub ścieżkę) do kawiarni, więc zaczynasz szukać dostępnego transportu publicznego (jeśli idziesz pieszo) lub tras i ulic (jeśli jedziesz samochodem). Szukasz trasy z obecnej lokalizacji do kawiarni, ale nie „żadnej” trasy – najkrótszej lub najszybszej. Oto jeszcze jeden przykład, który nie jest tak oczywisty jak poprzedni. Podczas pracy nad drogą decydujesz się na „skrót” przez boczną ulicę, ponieważ cóż… to jest „skrót” i „szybciej” jest iść tą drogą. Ale skąd wiesz, że ten „skrót” jest szybszy? Na podstawie osobistych doświadczeń rozwiązałeś problem „najkrótszej ścieżki” i wybrałeś trasę, która prowadzi przez boczną ulicę. W obu przykładach „najkrótsza” trasa jest określana albo odległością, albo czasem potrzebnym na dotarcie z jednego miejsca do drugiego. Przykłady podróży są bardzo naturalnymi zastosowaniami i w szczególności problemu „najkrótszej ścieżki”. Co jednak z przykładem układania rzeczy na biurku? W tym przypadku „najkrótsza” może reprezentować liczbę lub złożoność działań, które musisz wykonać, aby na przykład uzyskać kartkę papieru: otwórz biurko, otwórz folder, weź kartkę papieru vs. weź kartkę papieru prosto z biurka. teorii grafów Wszystkie powyższe przykłady przedstawiają problem znalezienia najkrótszej ścieżki między dwoma wierzchołkami w grafie (trasa lub ścieżka między dwoma miejscami, liczba działań lub złożoność pobrania kartki papieru z jednego miejsca lub drugiego). Ta klasa problemów najkrótszej ścieżki nazywana jest , a podstawowym algorytmem rozwiązywania tych problemów jest , który ma złożoność obliczeniową . SSSP (Single Source Shortest Path) algorytm Dijkstry O(n^2) Ale czasami musimy znaleźć wszystkie najkrótsze ścieżki pomiędzy wszystkimi wierzchołkami. Rozważmy następujący przykład: tworzysz mapę dla swoich regularnych ruchów pomiędzy , i . W tym scenariuszu otrzymasz 6 tras: , , , , i (trasy powrotne mogą być różne ze względu na drogi jednokierunkowe na przykład). domem pracą teatrem work ⭢ home home ⭢ work work ⭢ theatre theatre ⭢ work home ⭢ theatre theatre ⭢ home Dodanie większej ilości miejsc na mapie spowoduje znaczący wzrost liczby kombinacji – zgodnie z można to obliczyć następująco: permutacjami wzoru n kombinatoryki A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2) Co daje nam 12 tras dla 4 miejsc i 90 tras dla 10 miejsc (co jest imponujące). Uwaga… to nie uwzględnia punktów pośrednich między miejscami, np. aby dostać się z domu do pracy musisz przejść przez 4 ulice, iść wzdłuż rzeki i przejść przez most. Jeśli sobie wyobrazimy, niektóre trasy mogą mieć wspólne punkty pośrednie… cóż… w rezultacie będziemy mieli bardzo duży graf z wieloma wierzchołkami, gdzie każdy wierzchołek będzie reprezentował albo miejsce, albo znaczący punkt pośredni. Klasa problemów, w których musimy znaleźć wszystkie najkrótsze ścieżki między wszystkimi parami wierzchołków w grafie, nazywa się a podstawowym algorytmem rozwiązywania tych problemów jest , który ma złożoność obliczeniową . APSP (All Pairs Shortest Paths), algorytm Floyda-Warshalla O(n^3) I to jest algorytm, który dziś wdrożymy 🙂 Algorytm Floyda-Warshalla Algorytm Floyda-Warshalla znajduje wszystkie najkrótsze ścieżki między każdą parą wierzchołków w grafie. Algorytm został opublikowany przez Roberta Floyda w [1] (więcej szczegółów można znaleźć w sekcji „Odniesienia”). W tym samym roku Peter Ingerman w [2] opisał nowoczesną implementację algorytmu w formie trzech zagnieżdżonych pętli : for algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments Jeśli nigdy nie miałeś okazji pracować z grafem przedstawionym w formie macierzy, może być trudno zrozumieć, co robi powyższy algorytm. Więc, aby upewnić się, że jesteśmy na tej samej stronie, przyjrzyjmy się, jak graf może być przedstawiony w formie macierzy i dlaczego taka reprezentacja jest korzystna dla rozwiązania problemu najkrótszej ścieżki. Poniższy rysunek ilustruje graf 5 wierzchołków. Po lewej stronie graf jest przedstawiony w formie wizualnej, która składa się z okręgów i strzałek, gdzie każdy okrąg reprezentuje wierzchołek, a strzałka reprezentuje krawędź z kierunkiem. Liczba wewnątrz okręgu odpowiada numerowi wierzchołka, a liczba nad krawędzią odpowiada wadze krawędzi. Po prawej stronie ten sam graf jest przedstawiony w formie macierzy wag. Macierz wag jest formą , w której każda komórka macierzy zawiera „wagę” – odległość między wierzchołkiem (wiersz) a wierzchołkiem (kolumna). Macierz wag nie zawiera informacji o „ścieżce” między wierzchołkami (lista wierzchołków, przez którą przechodzisz z do ) – tylko wagę (odległość) między tymi wierzchołkami. skierowany, ważony macierzy sąsiedztwa i j i j W macierzy wag możemy zobaczyć, że wartości komórek są równe wagom między wierzchołkami. Dlatego też, jeśli zbadamy ścieżki od wierzchołka (wiersz ), zobaczymy, że… istnieje tylko jedna ścieżka – od do Jednak na reprezentacji wizualnej możemy wyraźnie zobaczyć ścieżki od wierzchołka do wierzchołków i (przez wierzchołek ). Powód tego jest prosty – w stanie początkowym macierz wag zawiera odległość tylko między sąsiadującymi wierzchołkami. Jednak sama ta informacja wystarcza, aby resztę. sąsiadującymi 0 0 0 1 0 2 3 1 znaleźć Zobaczmy jak to działa. Zwróć uwagę na komórkę . Jej wartość wskazuje, że istnieje ścieżka od wierzchołka do wierzchołka o wadze równej (w skrócie możemy zapisać to jako: ). Mając tę wiedzę, możemy teraz przeskanować wszystkie komórki wiersza (który zawiera wszystkie wagi wszystkich ścieżek z wierzchołka ) i przenieść te informacje z powrotem do wiersza , zwiększając wagę o (wartość ). W[0, 1] 0 1 1 0 ⭢ 1: 1 1 1 0 1 W[0, 1] Używając tych samych kroków możemy znaleźć ścieżki od wierzchołka przez inne wierzchołki. Podczas wyszukiwania może się zdarzyć, że istnieje więcej niż jedna ścieżka prowadząca do tego samego wierzchołka i co ważniejsze wagi tych ścieżek mogą być różne. Przykład takiej sytuacji jest zilustrowany na poniższym rysunku, gdzie wyszukiwanie od wierzchołka przez wierzchołek ujawniło jeszcze jedną ścieżkę do wierzchołka o mniejszej wadze. 0 0 2 3 Mamy dwie ścieżki: oryginalną ścieżkę – i nową ścieżkę, którą właśnie odkryliśmy – (pamiętaj, macierz wag nie zawiera ścieżek, więc nie wiemy, które wierzchołki są zawarte w oryginalnej ścieżce). To moment, w którym podejmujemy decyzję o zachowaniu najkrótszej ścieżki i zapisaniu w komórce . 0 ⭢ 3: 4 0 ⭢ 2 ⭢ 3: 3 3 W[0, 3] Wygląda na to, że właśnie znaleźliśmy naszą pierwszą najkrótszą ścieżkę! Kroki, które właśnie widzieliśmy, są istotą algorytmu Floyda-Warshalla. Spójrzmy jeszcze raz na pseudokod algorytmu: algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm Najbardziej zewnętrzny cykl on iteruje po wszystkich wierzchołkach w grafie i w każdej iteracji zmienna reprezentuje wierzchołek . Wewnętrzny cykl on również iteruje po wszystkich wierzchołkach w grafie i w każdej iteracji reprezentuje wierzchołek . I na koniec najbardziej wewnętrzny cykl on iteruje po wszystkich wierzchołkach w grafie i w każdej iteracji reprezentuje wierzchołek W połączeniu daje nam to następujące: w każdej iteracji przeszukujemy ścieżki ze wszystkich wierzchołków do wszystkich wierzchołków przez wierzchołek . Wewnątrz cyklu porównujemy ścieżkę (reprezentowaną przez ) ze ścieżką (reprezentowaną przez sumę i ) i zapisujemy najkrótszą z powrotem do . for k k , przez który przeszukujemy ścieżki for i i , przez który przeszukujemy ścieżki for j j , przez który przeszukujemy ścieżki. k i j k i ⭢ j W[i, j] i ⭢ k ⭢ j W[I, k] W[k, j] W[i, j] Teraz, gdy rozumiemy już mechanikę, czas na wdrożenie algorytmu. Podstawowa implementacja Kod źródłowy i wykresy eksperymentalne są dostępne w na GitHub. Wykresy eksperymentalne można znaleźć w katalogu . Wszystkie testy porównawcze w tym poście są zaimplementowane w pliku . repozytorium Data/Sparse-Graphs.zip APSP01.cs Zanim przejdziemy do realizacji, musimy wyjaśnić kilka kwestii technicznych: Wszystkie implementacje działają z macierzą wag reprezentowaną w formie tablicy liniowej. Wszystkie implementacje używają arytmetyki liczb całkowitych. Brak ścieżki między wierzchołkami jest reprezentowany przez specjalną stałą wagę: . NO_EDGE = (int.MaxValue / 2) - 1 Teraz zastanówmy się, dlaczego tak jest. Kiedy mówimy o macierzach, definiujemy komórki w kategoriach „wierszy” i „kolumn”. Z tego powodu wydaje się naturalne wyobrażanie sobie macierzy w formie „kwadratu” lub „prostokąta” (Rysunek 4a). Odnośnie #1. Jednakże nie oznacza to koniecznie, że musimy przedstawić macierz w formie tablicy tablic (Rysunek 4b), aby trzymać się naszej wyobraźni. Zamiast tego możemy przedstawić macierz w formie tablicy liniowej (Rysunek 4c), gdzie indeks każdej komórki jest obliczany przy użyciu następującego wzoru: i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row. Liniowa tablica macierzy wag jest koniecznym skutecznego wykonania algorytmu Floyda-Warshalla. Powód tego nie jest prosty i szczegółowe wyjaśnienie zasługuje na osobny post… lub kilka postów. Jednak obecnie ważne jest, aby wspomnieć, że taka reprezentacja znacząco poprawia , co w rzeczywistości ma duży wpływ na wydajność algorytmu. warunkiem lokalność danych Tutaj proszę, abyście mi uwierzyli i wzięli sobie na początek tylko tę informację pod uwagę, ale jednocześnie radzę Wam, abyście poświęcili trochę czasu na przemyślenie zagadnienia, a przy okazji – nie wierzcie ludziom w Internecie. - Uwaga autora Jeśli przyjrzysz się bliżej pseudokodowi algorytmu, nie znajdziesz żadnych sprawdzeń związanych z istnieniem ścieżki między dwoma wierzchołkami. Zamiast tego pseudokod po prostu używa funkcji . Powód jest prosty – pierwotnie, jeśli nie ma ścieżki między dwoma wierzchołkami, wartość komórki jest ustawiana na i we wszystkich językach, z wyjątkiem JavaScript, wszystkie wartości są mniejsze od . W przypadku liczb całkowitych może być kuszące użycie jako wartości „brak ścieżki”. Jednak doprowadzi to do przepełnienia całkowitego w przypadkach, gdy wartości obu ścieżek i będą równe . Dlatego używamy wartości, która jest o jeden mniejsza niż połowa . Odnośnie #2. min() infinity infinity int.MaxValue i ⭢ k k ⭢ j int.MaxValue int.MaxValue Hej! Ale dlaczego nie możemy po prostu sprawdzić, czy ścieżka istnieje, zanim wykonamy jakiekolwiek obliczenia. Na przykład porównując obie ścieżki do 0 (jeśli przyjmujemy zero jako wartość „brak ścieżki”). - Czytelnik myślący Jest to rzeczywiście możliwe, ale niestety doprowadzi to do znacznego spadku wydajności. Krótko mówiąc, CPU przechowuje statystyki wyników oceny rozgałęzień, np. gdy niektóre z instrukcji są oceniane jako lub . Wykorzystuje te statystyki do wykonania kodu „statystycznie przewidzianej rozgałęzienia” z góry, zanim zostanie oceniona rzeczywista instrukcja (nazywa się to ), a zatem wykonuje kod wydajniej. Jednak gdy przewidywanie CPU jest niedokładne, powoduje to znaczną utratę wydajności w porównaniu z prawidłowym przewidywaniem i wykonaniem bezwarunkowym, ponieważ CPU musi zatrzymać się i obliczyć prawidłową rozgałęzienie. if true false if wykonaniem spekulatywnym Ponieważ w każdej iteracji aktualizujemy znaczną część macierzy wag, statystyki rozgałęzień procesora stają się bezużyteczne, ponieważ nie ma wzorca kodu, wszystkie rozgałęzienia są oparte wyłącznie na danych. Tak więc takie sprawdzenie spowoduje znaczną liczbę błędnych . k przewidywań rozgałęzień Tutaj również proszę Was, abyście mi uwierzyli (na razie), a potem poświęcili trochę czasu na zgłębienie tematu. Uhh, wygląda na to, że skończyliśmy część teoretyczną – zaimplementujmy algorytm (oznaczmy tę implementację jako ): Baseline public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } } Powyższy kod jest niemal identyczną kopią wcześniej wspomnianego pseudokodu, z jednym wyjątkiem – zamiast używamy aby zaktualizować komórkę tylko wtedy, gdy jest to konieczne. Math.Min() if Hej! Poczekaj chwilę, czy to nie ty napisałeś dużo słów o tym, dlaczego if nie są tutaj dobre, a kilka linijek dalej sami wprowadzamy if? - Czytelnik myślący Powód jest prosty. W chwili pisania tego tekstu JIT emituje prawie równoważny kod dla obu implementacji i . Możesz to sprawdzić szczegółowo na , ale oto fragmenty ciał pętli głównej: if Math.Min sharplab.io // // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop Możesz zobaczyć, że niezależnie od tego, czy używamy czy nadal istnieje warunkowe sprawdzenie. Jednak w przypadku nie ma zbędnego zapisu. if Math.Min if Teraz, gdy zakończyliśmy implementację, czas uruchomić kod i zobaczyć, jak szybki jest?! Poprawność kodu możesz sprawdzić samodzielnie, uruchamiając testy w . repozytorium Używam (wersja 0.12.1) do testowania kodu. Wszystkie grafy używane w testach porównawczych są grafami o 300, 600, 1200, 2400 i 4800 wierzchołkach. Liczba krawędzi w grafach wynosi około 80% możliwego maksimum, co dla skierowanych, acyklicznych grafów można obliczyć jako: Benchmark.NET skierowanymi, acyklicznymi var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph. Dajmy czadu! Oto wyniki testów porównawczych przeprowadzonych na moim komputerze (Windows 10.0.19042, procesor Intel Core i7-7700 3,60 GHz (Kaby Lake) / 8 procesorów logicznych / 4 rdzenie): Metoda Rozmiar Mieć na myśli Błąd Odchylenie standardowe Linia bazowa 300 27,525 milisekundy 0,1937 milisekundy 0,1617 milisekundy Linia bazowa 600 217,897 milisekund 1,6415 milisekundy 1,5355 milisekundy Linia bazowa 1200 1,763.335 milisekund 7,4561 milisekundy 6,2262 milisekundy Linia bazowa 2400 14 533,335 milisekund 63,3518 milisekund 52,9016 milisekund Linia bazowa 4800 119 768,219 milisekund 181,5514 milisekundy 160,9406 milisekund Z wyników możemy wywnioskować, że czas obliczeń rośnie drastycznie w porównaniu do rozmiaru grafu – dla grafu o 300 wierzchołkach zajęło to 27 milisekund, dla grafu o 2400 wierzchołkach – 14,5 sekundy, a dla grafu o 4800 wierzchołkach – 119 sekund, czyli ! prawie 2 minuty Patrząc na kod algorytmu, trudno sobie wyobrazić, że jest coś, co możemy zrobić, aby przyspieszyć obliczenia… ponieważ… są TRZY pętle, tylko TRZY pętle. Jednak, jak to zwykle bywa – możliwości kryją się w szczegółach 🙂 Poznaj swoje dane – rzadkie wykresy Algorytm Floyda-Warshalla jest podstawowym algorytmem rozwiązywania problemu najkrótszej ścieżki dla wszystkich par, zwłaszcza w przypadku grafów i (ponieważ algorytm przeszukuje ścieżki pomiędzy wszystkimi parami wierzchołków). gęstych zupełnych Jednak w naszych eksperymentach używamy grafów, które mają wspaniałą właściwość – jeśli istnieje ścieżka od wierzchołka do wierzchołka , to nie ma ścieżki od wierzchołka do wierzchołka Dla nas oznacza to, że istnieje wiele nieprzylegających wierzchołków, które możemy pominąć, jeśli nie ma ścieżki od do (oznaczamy tę implementację jako ). skierowanych, acyklicznych 1 2 2 1 i k SpartialOptimisation public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } } Poniżej przedstawiono wyniki poprzednich ( ) i obecnych ( ) implementacji na tym samym zestawie wykresów (najszybsze wyniki wyróżniono ): Baseline SpartialOptimisation pogrubioną czcionką Metoda Rozmiar Mieć na myśli Błąd Odchylenie standardowe Stosunek Linia bazowa 300 27,525 milisekundy 0,1937 milisekundy 0,1617 milisekundy 1,00 Częściowa optymalizacja 300 12,399 milisekund 0,0943 milisekundy 0,0882 milisekundy 0,45 Linia bazowa 600 217,897 milisekund 1,6415 milisekundy 1,5355 milisekundy 1,00 Częściowa optymalizacja 600 99,122 milisekundy 0,8230 milisekundy 0,7698 milisekundy 0,45 Linia bazowa 1200 1,763.335 milisekund 7,4561 milisekundy 6,2262 milisekundy 1,00 Częściowa optymalizacja 1200 766,675 milisekundy 6,1147 milisekund 5,7197 milisekundy 0,43 Linia bazowa 2400 14 533,335 milisekund 63,3518 milisekund 52,9016 milisekund 1,00 Częściowa optymalizacja 2400 6,507.878 milisekund 28,2317 milisekund 26,4079 milisekundy 0,45 Linia bazowa 4800 119 768,219 milisekund 181,5514 milisekundy 160,9406 milisekund 1,00 Częściowa optymalizacja 4800 55 590,374 milisekundy 414,6051 milisekundy 387,8218 milisekund 0,46 Naprawdę imponujące! Skróciliśmy czas obliczeń o połowę! Oczywiście, im gęstszy wykres, tym mniejsze będzie przyspieszenie. Jednak jest to jedna z optymalizacji, która przydaje się, jeśli wiesz z góry, z jaką klasą danych zamierzasz pracować. Czy możemy zrobić więcej? 🙂 Poznaj swój sprzęt – paralelizm Mój komputer jest wyposażony w procesor z 8 logicznymi procesorami ( ) lub 4 rdzeniami z technologią . Posiadanie więcej niż jednego rdzenia jest jak posiadanie większej ilości „zapasowych rąk”, które możemy wykorzystać. Zobaczmy więc, która część pracy może być wydajnie podzielona między wielu pracowników, a następnie sparalelizowana. Inter Core i7-7700 CPU 3.60GHz (Kaby Lake) HW Hyper-Threading Pętle są zawsze najbardziej oczywistym kandydatem do paralelizacji, ponieważ w większości przypadków iteracje są niezależne i mogą być wykonywane jednocześnie. W algorytmie mamy trzy pętle, które powinniśmy przeanalizować i sprawdzić, czy istnieją jakieś zależności, które uniemożliwiają nam ich paralelizację. Zacznijmy od pętli . W każdej iteracji pętla oblicza ścieżki z każdego wierzchołka do każdego wierzchołka przez wierzchołek . Nowe i zaktualizowane ścieżki są przechowywane w macierzy wag. Patrząc na iteracje możesz zauważyć – mogą być wykonywane w dowolnej kolejności: 0, 1, 2, 3 lub 3, 1, 2, 0 bez wpływu na wynik. Jednak nadal muszą być wykonywane po kolei, w przeciwnym razie niektóre iteracje nie będą mogły używać nowych lub zaktualizowanych ścieżek, ponieważ nie zostaną jeszcze zapisane w macierzy wag. Taka niespójność z pewnością zmiażdży wyniki. Więc musimy dalej szukać. for of k k Następnym kandydatem jest . W każdej iteracji pętla odczytuje ścieżkę z wierzchołka do wierzchołka (komórka: ), ścieżkę z wierzchołka do wierzchołka (komórka: ]), a następnie sprawdza, czy znana ścieżka z do (komórka: ) jest krótsza niż ścieżka (suma: + ). Jeśli przyjrzysz się bliżej wzorcowi dostępu, możesz zauważyć – w każdej iteracji pętla odczytuje dane z wiersza i aktualizowanego wiersza, co zasadniczo oznacza – iteracje są niezależne i mogą być wykonywane nie tylko w dowolnej kolejności, ale także równolegle! for of i i k W[i, k] k j W[k, j i j W[i, j] i ⭢ k ⭢ j W[i, k] W[k, j] i k i Wygląda to obiecująco, więc zaimplementujmy to (oznaczymy tę implementację jako ). SpartialParallelOptimisations Pętla również może być równoległa. Jednakże, równoległość najbardziej wewnętrznego cyklu w tym przypadku jest bardzo nieefektywna. Możesz to sprawdzić samodzielnie, wprowadzając kilka prostych zmian w . for of j kodzie źródłowym - Uwaga autora public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } } Oto wyniki implementacji , i na tym samym zestawie wykresów (paralelizacja jest realizowana przy użyciu klasy ): Baseline SpartialOptimisation SpartialParallelOptimisations Parallel Metoda Rozmiar Mieć na myśli Błąd Odchylenie standardowe Stosunek Linia bazowa 300 27,525 milisekundy 0,1937 milisekundy 0,1617 milisekundy 1,00 Częściowa optymalizacja 300 12,399 milisekundy 0,0943 milisekundy 0,0882 milisekundy 0,45 Częściowe Równoległe Optymalizacje 300 6,252 milisekundy 0,0211 milisekundy 0,0187 milisekundy 0,23 Linia bazowa 600 217,897 milisekund 1,6415 milisekundy 1,5355 milisekundy 1,00 Częściowa optymalizacja 600 99,122 milisekundy 0,8230 milisekundy 0,7698 milisekundy 0,45 Częściowe Równoległe Optymalizacje 600 35,825 milisekundy 0,1003 milisekundy 0,0837 milisekundy 0,16 Linia bazowa 1200 1,763.335 milisekund 7,4561 milisekundy 6,2262 milisekundy 1,00 Częściowa optymalizacja 1200 766,675 milisekundy 6,1147 milisekund 5,7197 milisekundy 0,43 Częściowe Równoległe Optymalizacje 1200 248,801 milisekundy 0,6040 milisekundy 0,5043 milisekundy 0,14 Linia bazowa 2400 14 533,335 milisekund 63,3518 milisekund 52,9016 milisekund 1,00 Częściowa optymalizacja 2400 6,507.878 milisekund 28,2317 milisekund 26,4079 milisekundy 0,45 Częściowe Równoległe Optymalizacje 2400 2,076.403 milisekundy 20,8320 milisekund 19,4863 milisekundy 0,14 Linia bazowa 4800 119 768,219 milisekund 181,5514 milisekundy 160,9406 milisekund 1,00 Częściowa optymalizacja 4800 55 590,374 milisekundy 414,6051 milisekundy 387,8218 milisekund 0,46 Częściowe Równoległe Optymalizacje 4800 15 614,506 milisekund 115,6996 milisekund 102,5647 milisekundy 0,13 Z wyników możemy wywnioskować, że paralelizacja pętli skróciła czas obliczeń o 2-4 razy w porównaniu do poprzedniej implementacji ( )! To bardzo imponujące, jednak należy pamiętać, że paralelizacja czystych obliczeń zużywa wszystkie dostępne zasoby obliczeniowe, co może prowadzić do niedoboru zasobów innych aplikacji w systemie. for of i SpartialOptimisation Paralelizację należy stosować ostrożnie. Jak pewnie zgadliście – to nie koniec 🙂 Poznaj swoją platformę – wektoryzacja to przekształcenie kodu operującego na pojedynczym elemencie w kod operujący na wielu elementach jednocześnie. Wektoryzacja Może się to wydawać skomplikowane, ale zobaczmy, jak to działa na prostym przykładzie: var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i]; W wykonanie iteracji powyższej pętli na poziomie procesora można zilustrować następująco: uproszczeniu 0 for Oto, co się dzieje. CPU ładuje wartości tablic i z pamięci do rejestrów CPU (jeden rejestr może przechowywać wartość). Następnie CPU wykonuje operację dodawania skalarnego na tych rejestrach i zapisuje wynik z powrotem do pamięci głównej – bezpośrednio do tablicy . Ten proces jest powtarzany cztery razy, zanim pętla się zakończy. a b dokładnie jedną c Wektoryzacja oznacza wykorzystanie specjalnych rejestrów procesora – rejestrów wektorowych lub SIMD (single instruction multiple data) i odpowiadających im instrukcji procesora w celu wykonywania operacji na wielu wartościach tablicy jednocześnie: Oto, co się dzieje. CPU ładuje wartości tablic i z pamięci do rejestrów CPU (jednak tym razem jeden rejestr wektorowy może przechowywać wartości tablicowe). Następnie CPU wykonuje operację dodawania wektorów na tych rejestrach i zapisuje wynik z powrotem do pamięci głównej – bezpośrednio do tablicy . Ponieważ operujemy na dwóch wartościach jednocześnie, proces ten powtarza się dwa razy zamiast czterech. a b dwie c Aby zaimplementować wektoryzację w .NET możemy użyć typów i (możemy również użyć typów z przestrzeni nazw , jednak są one nieco zaawansowane dla obecnej implementacji, więc nie będę ich używał, ale możesz spróbować samodzielnie): Vector Vector<T> System.Runtime.Intrinsics public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } } Wektoryzowany kod może wyglądać nieco dziwnie, więc przeanalizujmy go krok po kroku. Tworzymy wektor ścieżki : . W rezultacie, jeśli wektor może pomieścić cztery wartości typu , a waga ścieżki jest równa 5, utworzymy wektor czterech 5 – [5, 5, 5, 5]. Następnie, w każdej iteracji, jednocześnie obliczamy ścieżki od wierzchołka do wierzchołków . i ⭢ k var ik_vec = new Vector<int>(matrix[i * sz + k]) int i ⭢ k i j, j + 1, ..., j + Vector<int>.Count Właściwość zwraca liczbę elementów typu , które mieszczą się w rejestrach wektorowych. Rozmiar rejestrów wektorowych zależy od modelu CPU, więc ta właściwość może zwracać różne wartości na różnych CPU. Vector<int>.Count int - Uwaga autora Cały proces obliczeniowy można podzielić na trzy etapy: Załaduj informacje z macierzy wag do wektorów: i . ij_vec ikj_vec Porównaj wektory i , a następnie wybierz najmniejsze wartości do nowego wektora . ij_vec ikj_vec r_vec Zapisz z powrotem do macierzy wag. r_vec Podczas gdy i są dość proste, wymaga wyjaśnienia. Jak wspomniano wcześniej, w przypadku wektorów manipulujemy wieloma wartościami jednocześnie. Dlatego wynik niektórych operacji nie może być osobliwy, np. operacja porównania nie może zwrócić lub , ponieważ porównuje wiele wartości. Zamiast tego zwraca nowy wektor, który zawiera wyniki porównania między odpowiadającymi sobie wartościami z wektorów i ( , jeśli wartość z jest mniejsza niż wartość z i jeśli jest inaczej). Zwrócony wektor (sam w sobie) nie jest zbyt przydatny, jednak możemy użyć operacji wektorowej , aby wyodrębnić wymagane wartości z wektorów i do nowego wektora – . Operacja ta zwraca nowy wektor, w którym wartości są równe mniejszej z dwóch odpowiadających sobie wartości wektorów wejściowych, tj. jeśli wartość wektora jest równa , to operacja wybiera wartość z , w przeciwnym wypadku wybiera wartość z . #1 #3 #2 Vector.LessThan(ij_vec, ikj_vec) true false ij_vec ikj_vec -1 ij_vec ikj_vec 0 Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec) ij_vec ikj_vec r_vec lt_vec -1 ij_vec ikj_vec Oprócz , jest jeszcze jedna część, która wymaga wyjaśnienia – druga, niewektoryzowana pętla. Jest ona wymagana, gdy rozmiar macierzy wag nie jest iloczynem wartości . W takim przypadku główna pętla nie może przetworzyć wszystkich wartości (ponieważ nie można załadować części wektora), a druga, niewektoryzowana pętla obliczy pozostałe iteracje. #2 Vector<int>.Count Oto rezultaty tej i wszystkich poprzednich implementacji: Metoda sz Mieć na myśli Błąd Odchylenie standardowe Stosunek Linia bazowa 300 27,525 milisekundy 0,1937 milisekundy 0,1617 milisekundy 1,00 Częściowa optymalizacja 300 12,399 milisekund 0,0943 milisekundy 0,0882 milisekundy 0,45 Częściowe Równoległe Optymalizacje 300 6,252 milisekundy 0,0211 milisekundy 0,0187 milisekundy 0,23 SpartialVectorOptimisations 300 3,056 milisekundy 0,0301 milisekundy 0,0281 milisekundy 0,11 Linia bazowa 600 217,897 milisekund 1,6415 milisekundy 1,5355 milisekundy 1,00 Częściowa optymalizacja 600 99,122 milisekundy 0,8230 milisekundy 0,7698 milisekundy 0,45 Częściowe Równoległe Optymalizacje 600 35,825 milisekundy 0,1003 milisekundy 0,0837 milisekundy 0,16 SpartialVectorOptimisations 600 24,378 milisekundy 0,4320 milisekundy 0,4041 milisekundy 0,11 Linia bazowa 1200 1,763.335 milisekund 7,4561 milisekundy 6,2262 milisekundy 1,00 Częściowa optymalizacja 1200 766,675 milisekundy 6,1147 milisekund 5,7197 milisekundy 0,43 Częściowe Równoległe Optymalizacje 1200 248,801 milisekundy 0,6040 milisekundy 0,5043 milisekundy 0,14 SpartialVectorOptimisations 1200 185,628 milisekund 2,1240 milisekundy 1,9868 milisekundy 0,11 Linia bazowa 2400 14 533,335 milisekund 63,3518 milisekund 52,9016 milisekund 1,00 Częściowa optymalizacja 2400 6,507.878 milisekund 28,2317 milisekund 26,4079 milisekundy 0,45 Częściowe Równoległe Optymalizacje 2400 2,076.403 milisekundy 20,8320 milisekund 19,4863 milisekundy 0,14 SpartialVectorOptimisations 2400 2,568,676 milisekund 31,7359 milisekund 29,6858 milisekund 0,18 Linia bazowa 4800 119 768,219 milisekund 181,5514 milisekund 160,9406 milisekund 1,00 Częściowa optymalizacja 4800 55 590,374 milisekundy 414,6051 milisekundy 387,8218 milisekund 0,46 Częściowe Równoległe Optymalizacje 4800 15 614,506 milisekund 115,6996 milisekund 102,5647 milisekundy 0,13 SpartialVectorOptimisations 4800 18 257,991 milisekund 84,5978 milisekund 79,1329 milisekundy 0,15 Z wyniku wynika, że wektoryzacja znacznie skróciła czas obliczeń – od 3 do 4 razy w porównaniu do wersji nieparalelizowanej ( ). Interesującym momentem jest to, że wersja zwektoryzowana przewyższa wersję równoległą ( ) również na mniejszych grafach (mniej niż 2400 wierzchołków). SpartialOptimisation SpartialParallelOptimisations I na koniec – połączmy wektoryzację i paralelizm! Jeśli interesują Cię praktyczne zastosowania wektoryzacji, polecam przeczytać serię postów , w których zwektoryzował (wyniki te później znalazły się w aktualizacji Garbage Collector w ). Dana Shechtera Array.Sort .NET 5 Poznaj swoją platformę i sprzęt – wektoryzację i paralelizm! Ostatnia implementacja łączy w sobie wysiłki paralelizacji i wektoryzacji i… szczerze mówiąc, brakuje jej indywidualności 🙂 Ponieważ… cóż, właśnie zastąpiliśmy ciało ze zwektoryzowaną pętlą ze : Parallel.For SpartialParallelOptimisations SpartialVectorOptimisations public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } } Wszystkie wyniki tego posta są przedstawione poniżej. Zgodnie z oczekiwaniami, jednoczesne użycie paralelizmu i wektoryzacji wykazało najlepsze wyniki, skracając czas obliczeń nawet 25 razy (dla grafów 1200 wierzchołków) w porównaniu z początkową implementacją. Metoda sz Mieć na myśli Błąd Odchylenie standardowe Stosunek Linia bazowa 300 27,525 milisekundy 0,1937 milisekundy 0,1617 milisekundy 1,00 Częściowa optymalizacja 300 12,399 milisekundy 0,0943 milisekundy 0,0882 milisekundy 0,45 Częściowe Równoległe Optymalizacje 300 6,252 milisekundy 0,0211 milisekundy 0,0187 milisekundy 0,23 SpartialVectorOptimisations 300 3,056 milisekundy 0,0301 milisekundy 0,0281 milisekundy 0,11 Częściowe Optymalizacje Wektorów Równoległych 300 3,008 milisekundy 0,0075 milisekundy 0,0066 milisekundy 0,11 Linia bazowa 600 217,897 milisekund 1,6415 milisekundy 1,5355 milisekundy 1,00 Częściowa optymalizacja 600 99,122 milisekundy 0,8230 milisekundy 0,7698 milisekundy 0,45 Częściowe Równoległe Optymalizacje 600 35,825 milisekundy 0,1003 milisekundy 0,0837 milisekundy 0,16 SpartialVectorOptimisations 600 24,378 milisekundy 0,4320 milisekundy 0,4041 milisekundy 0,11 Częściowe Optymalizacje Wektorów Równoległych 600 13,425 milisekundy 0,0319 milisekundy 0,0283 milisekundy 0,06 Linia bazowa 1200 1,763.335 milisekund 7,4561 milisekundy 6,2262 milisekundy 1,00 Częściowa optymalizacja 1200 766,675 milisekundy 6,1147 milisekund 5,7197 milisekundy 0,43 Częściowe Równoległe Optymalizacje 1200 248,801 milisekundy 0,6040 milisekundy 0,5043 milisekundy 0,14 SpartialVectorOptimisations 1200 185,628 milisekund 2,1240 milisekundy 1,9868 milisekundy 0,11 Częściowe Optymalizacje Wektorów Równoległych 1200 76,770 milisekund 0,3021 milisekundy 0,2522 milisekundy 0,04 Linia bazowa 2400 14 533,335 milisekund 63,3518 milisekund 52,9016 milisekund 1,00 Częściowa optymalizacja 2400 6,507.878 milisekund 28,2317 milisekund 26,4079 milisekundy 0,45 Częściowe Równoległe Optymalizacje 2400 2,076.403 milisekundy 20,8320 milisekund 19,4863 milisekundy 0,14 SpartialVectorOptimisations 2400 2,568,676 milisekund 31,7359 milisekund 29,6858 milisekund 0,18 Częściowe Optymalizacje Wektorów Równoległych 2400 1,281.877 milisekund 25,1127 milisekundy 64,8239 milisekund 0,09 Linia bazowa 4800 119 768,219 milisekund 181,5514 milisekundy 160,9406 milisekund 1,00 Częściowa optymalizacja 4800 55 590,374 milisekundy 414,6051 milisekundy 387,8218 milisekund 0,46 Częściowe Równoległe Optymalizacje 4800 15 614,506 milisekund 115,6996 milisekund 102,5647 milisekundy 0,13 SpartialVectorOptimisations 4800 18 257,991 milisekund 84,5978 milisekund 79,1329 milisekundy 0,15 Częściowe Optymalizacje Wektorów Równoległych 4800 12 785,824 milisekund 98,6947 milisekund 87,4903 milisekundy 0,11 Wniosek W tym poście nieco zagłębiliśmy się w problem najkrótszej ścieżki dla wszystkich par i zaimplementowaliśmy algorytm Floyda-Warshalla w C#, aby go rozwiązać. Zaktualizowaliśmy również naszą implementację, aby honorować dane i wykorzystywać funkcje niskiego poziomu .NET i sprzętu. W tym poście zagraliśmy „all in”. Jednak w rzeczywistych zastosowaniach ważne jest, aby pamiętać – nie jesteśmy sami. Hardkorowa paralelizacja może znacznie obniżyć wydajność systemu i spowodować więcej szkody niż pożytku. Z drugiej strony wektoryzacja jest nieco bezpieczniejsza, jeśli jest wykonywana w sposób niezależny od platformy. Pamiętaj, że niektóre instrukcje wektorowe mogą być dostępne tylko na niektórych procesorach. Mam nadzieję, że lektura przypadła Ci do gustu i że dobrze się bawiłeś, „wyciskając” trochę wydajności z algorytmu zaledwie w TRZECH cyklach 🙂 Odniesienia Floyd, RW Algorytm 97: Najkrótsza ścieżka / RW Floyd // Komunikacja ACM. – 1962. – Tom 5, nr 6. – S. 345-. Ingerman, PZ Algorytm 141: Macierz ścieżek / PZ Ingerman // Komunikacja ACM. – 1962. – Tom 5, nr 11. – S. 556.