paint-brush
Riešenie problému najkratších ciest všetkých párov pomocou Floyd-Warshallovho algoritmu v C# podľa@olegkarasik
678 čítania
678 čítania

Riešenie problému najkratších ciest všetkých párov pomocou Floyd-Warshallovho algoritmu v C#

podľa Oleg Karasik28m2024/09/26
Read on Terminal Reader

Príliš dlho; Čítať

V tomto príspevku demonštrujem, ako môžete implementovať Floyd-Warshallov algoritmus v C # na vyriešenie problému najkratšej cesty všetkých párov. Okrem implementácie tento príspevok obsahuje rôzne optimalizácie algoritmov vrátane vektorizácie a paralelizmu.
featured image - Riešenie problému najkratších ciest všetkých párov pomocou Floyd-Warshallovho algoritmu v C#
Oleg Karasik HackerNoon profile picture

Každý z nás mnohokrát denne rieši problém „ najkratšej cesty “. Samozrejme neúmyselne. Riešime to cestou do práce alebo keď si prezeráme internet alebo keď si ukladáme veci na stôl.


Znie to trochu... priveľa? Poďme to zistiť.


Predstavte si, že ste sa rozhodli stretnúť sa s priateľmi, no... povedzme v kaviarni. Najprv musíte nájsť cestu (alebo cestu) do kaviarne, takže začnete hľadať dostupnú verejnú dopravu (ak idete pešo) alebo trasy a ulice (ak idete autom). Hľadáte trasu z vašej aktuálnej polohy do kaviarne, ale nie „akúkoľvek“ trasu – najkratšiu alebo najrýchlejšiu.


Tu je ešte jeden príklad, ktorý nie je taký zrejmý ako ten predchádzajúci. Počas práce sa rozhodnete ísť „skratkou“ cez bočnú ulicu, pretože dobre... je to „skratka“ a je „rýchlejšie“ ísť touto cestou. Ale ako ste vedeli, že táto „skratka“ je rýchlejšia? Na základe osobných skúseností ste vyriešili problém „najkratšej cesty“ a vybrali ste si trasu, ktorá vedie cez bočnú ulicu.


V oboch príkladoch je „najkratšia“ trasa určená buď vzdialenosťou, alebo časom potrebným na to, aby ste sa dostali z jedného miesta na druhé. Cestovné príklady sú veľmi prirodzenými aplikáciami teórie grafov a zvlášť problému „najkratšej cesty“. Čo však príklad s usporiadaním vecí na stole? V tomto prípade môže „najkratšia“ predstavovať množstvo alebo zložitosť akcií, ktoré musíte vykonať, aby ste získali napríklad hárok papiera: otvoriť stôl, otvoriť priečinok, vziať hárok papiera alebo vziať hárok papiera priamo zo stola. .


Všetky vyššie uvedené príklady predstavujú problém nájdenia najkratšej cesty medzi dvoma vrcholmi v grafe (trasa alebo cesta medzi dvoma miestami, množstvo akcií alebo zložitosť získania listu papiera z jedného alebo druhého miesta). Táto trieda problémov s najkratšou cestou sa nazýva SSSP (Single Source Shortest Path) a základným algoritmom na riešenie týchto problémov je Dijkstrov algoritmus , ktorý má výpočtovú zložitosť O(n^2) .


Niekedy však potrebujeme nájsť všetky najkratšie cesty medzi všetkými vrcholmi. Zvážte nasledujúci príklad: vytvárate pre vás mapu pravidelných pohybov medzi domovom , prácou a divadlom . V tomto scenári budete mať 6 trás: work ⭢ home , home ⭢ work , work ⭢ theatre , theatre ⭢ work , home ⭢ theatre a theatre ⭢ home (opačné trasy sa môžu líšiť napríklad z dôvodu jednosmerných ciest) .


Pridaním ďalšieho miesta na mapu dôjde k výraznému nárastu kombinácií – podľa permutácií n vzorca kombinatoriky to možno vypočítať ako:

 A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2)

Čo nám dáva 12 trás pre 4 miesta a 90 trás pre 10 miest (čo je pôsobivé). Poznámka... toto sa neberie do úvahy medziľahlé body medzi miestami, tj aby ste sa dostali z domu do práce, musíte prejsť 4 ulice, prejsť popri rieke a prejsť cez most. Ak si predstavíme, niektoré cesty môžu mať spoločné medziľahlé body... no... výsledkom bude veľmi veľký graf s množstvom vrcholov, kde každý vrchol bude predstavovať buď miesto, alebo významný medziľahlý bod. Trieda problémov, kde potrebujeme nájsť všetky najkratšie cesty medzi všetkými pármi vrcholov v grafe, sa nazýva APSP (All Pairs Shortest Paths) a základným algoritmom na riešenie týchto problémov je Floyd-Warshallov algoritmus , ktorý má O(n^3) výpočtová zložitosť.


A toto je algoritmus, ktorý dnes implementujeme 🙂

Floyd-Warshallov algoritmus

Floyd-Warshallov algoritmus nájde všetky najkratšie cesty medzi každým párom vrcholov v grafe. Algoritmy publikoval Robert Floyd v [1] (ďalšie podrobnosti nájdete v časti „Odkazy“). V tom istom roku Peter Ingerman v [2] opísal modernú implementáciu algoritmu vo forme troch vnorených slučiek for :

 algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments

Ak ste nikdy nezmenili prácu s grafom reprezentovaným vo forme matice, môže byť ťažké pochopiť, čo robí vyššie uvedený algoritmus. Aby sme sa uistili, že sme na rovnakej stránke, pozrime sa na to, ako môže byť graf reprezentovaný vo forme matice a prečo je takáto reprezentácia užitočná pri riešení problému s najkratšou cestou.


Obrázok nižšie ilustruje orientovaný, vážený graf 5 vrcholov. Vľavo je graf prezentovaný vo vizuálnej forme, ktorá pozostáva z kruhov a šípok, kde každý kruh predstavuje vrchol a šípka predstavuje hranu so smerom. Číslo vo vnútri kruhu zodpovedá číslu vrcholu a číslo nad okrajom zodpovedá hmotnosti hrany. Na pravej strane je rovnaký graf prezentovaný vo forme matice váh. Váhová matica je forma matice susednosti , kde každá bunka matice obsahuje „váhu“ – vzdialenosť medzi vrcholom i (riadok) a vrcholom j (stĺpec). Váhová matica neobsahuje informácie o „ceste“ medzi vrcholmi (zoznam vrcholov, cez ktoré sa dostanete z i do j ) – iba váhu (vzdialenosť) medzi týmito vrcholmi.


Obrázok 1. Znázornenie orientovaného, váženého grafu 5 vrcholov vo vizuálnej forme (vľavo) a váženej maticovej forme (vpravo).


V matici váh môžeme vidieť, že hodnoty buniek sa rovnajú váham medzi susednými vrcholmi. Preto, ak si prezrieme cesty z vrcholu 0 (riadok 0 ), uvidíme, že ... existuje len jedna cesta – od 0 do 1 . Ale na vizuálnej reprezentácii môžeme jasne vidieť cesty z vrcholu 0 k vrcholom 2 a 3 (cez vrchol 1 ). Dôvod je jednoduchý – v počiatočnom stave obsahuje matica váh iba vzdialenosť medzi susednými vrcholmi. Na nájdenie zvyšku však stačí len táto informácia.


Pozrime sa, ako to funguje. Venujte pozornosť bunke W[0, 1] . Jeho hodnota udáva, že z vrcholu 0 do vrcholu 1 vedie cesta s váhou rovnajúcou sa 1 (v skratke môžeme zapísať ako: 0 ⭢ 1: 1 ). S týmito znalosťami teraz môžeme skenovať všetky bunky riadku 1 (ktorý obsahuje všetky váhy všetkých ciest z vrcholu 1 ) a spätne portovať túto informáciu do riadku 0 , čím sa hmotnosť zvýši o 1 (hodnota W[0, 1] ).


Obrázok 2. Ilustrácia nájdenia všetkých ciest od vrcholu 0 k vrcholom susediacim s vrcholom 1.


Rovnakým postupom môžeme nájsť cesty z vrcholu 0 cez ďalšie vrcholy. Počas vyhľadávania sa môže stať, že k rovnakému vrcholu vedie viac ako jedna cesta a čo je dôležitejšie, váhy týchto ciest môžu byť rôzne. Príklad takejto situácie je znázornený na obrázku nižšie, kde vyhľadávanie od vrcholu 0 po vrchol 2 odhalilo ešte jednu cestu k vrcholu 3 s menšou hmotnosťou.


Obrázok 3. Ilustrácia situácie, keď vyhľadávanie od vrcholu 0 po vrchol 2 odhalilo dodatočnú kratšiu cestu k vrcholu 3.


Máme dve cesty: pôvodnú cestu – 0 ⭢ 3: 4 a novú cestu, ktorú sme práve objavili – 0 ⭢ 2 ⭢ 3: 3 (pamätajte, že matica váh neobsahuje cesty, takže nevieme, ktorú vrcholy sú zahrnuté v pôvodnej ceste). Toto je moment, kedy sa rozhodneme ponechať si najkratšiu a zapíšeme 3 do bunky W[0, 3] .


Zdá sa, že sme práve našli našu prvú najkratšiu cestu!


Kroky, ktoré sme práve videli, sú podstatou Floyd-Warshallovho algoritmu. Pozrite sa na pseudokód algoritmu ešte raz:

 algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm

Najvzdialenejší cyklus for on k iteruje cez všetky vrcholy v grafe a v každej iterácii premenná k predstavuje vrchol , cez ktorý hľadáme cesty . Vnútorný cyklus for on i tiež iteruje cez všetky vrcholy v grafe a pri každej iterácii i predstavuje vrchol, z ktorého hľadáme cesty . A nakoniec najvnútornejší cyklus for j iteruje cez všetky vrcholy v grafe a v každej iterácii j predstavuje vrchol , ku ktorému hľadáme cestu. V kombinácii nám to dáva nasledovné: v každej iterácii k hľadáme cesty od všetkých vrcholov i do všetkých vrcholov j až po vrchol k . Vo vnútri cyklu porovnávame cestu i ⭢ j (predstavenú W[i, j] ) s dráhou i ⭢ k ⭢ j (predstavenú súčtom W[I, k] a W[k, j] ) a napíšeme najkratšiu jeden späť do W[i, j] .


Teraz, keď rozumieme mechanike, je čas implementovať algoritmus.

Základná implementácia

Zdrojový kód a experimentálne grafy sú dostupné v úložisku na GitHub. Experimentálne grafy možno nájsť v adresári Data/Sparse-Graphs.zip . Všetky benchmarky v tomto príspevku sú implementované v súbore APSP01.cs .

Predtým, ako sa pustíme do implementácie, musíme si ujasniť niekoľko technických momentov:

  1. Všetky implementácie pracujú s váhovou maticou reprezentovanou vo forme lineárneho poľa.
  2. Všetky implementácie používajú celočíselnú aritmetiku. Absencia cesty medzi vrcholmi je vyjadrená špeciálnou konštantnou váhou: NO_EDGE = (int.MaxValue / 2) - 1 .


Teraz poďme zistiť, prečo je to tak.


Čo sa týka #1. Keď hovoríme o matriciach, bunky definujeme ako „riadky“ a „stĺpce“. Z tohto dôvodu sa zdá prirodzené predstaviť si maticu vo forme „štvorca“ alebo „obdĺžnika“ (obrázok 4a).


Obrázok 4. Viacnásobné znázornenie matice. a) imaginárne „štvorcové“ zobrazenie; b) pole reprezentácie poľa; c) lineárna reprezentácia poľa.


To však nemusí znamenať, že musíme reprezentovať maticu vo forme poľa polí (obrázok 4b), aby sme sa držali našej predstavivosti. Namiesto toho môžeme maticu reprezentovať vo forme lineárneho poľa (obrázok 4c), kde sa index každej bunky vypočíta podľa nasledujúceho vzorca:

 i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row.

Lineárne pole váh matice je predpokladom pre efektívne vykonávanie Floyd-Warshallovho algoritmu. Dôvod nie je jednoduchý a podrobné vysvetlenie si zaslúži samostatný príspevok... alebo niekoľko príspevkov. V súčasnosti je však dôležité spomenúť, že takáto reprezentácia výrazne zlepšuje dátovú lokalitu , čo má v skutočnosti veľký vplyv na výkon algoritmu.

Tu vás žiadam, aby ste mi verili a mali na mysli práve túto informáciu ako podmienku, zároveň vám však odporúčam venovať chvíľu času a preštudovať si otázku a mimochodom – neverte ľuďom na internete .


- Poznámka autora

Čo sa týka #2. Ak sa bližšie pozriete na pseudokód algoritmu, nenájdete žiadne kontroly týkajúce sa existencie cesty medzi dvoma vrcholmi. Namiesto toho pseudokód jednoducho použije funkciu min() . Dôvod je jednoduchý – pôvodne, ak medzi vrcholmi neexistuje žiadna cesta, je hodnota bunky nastavená na infinity a vo všetkých jazykoch, okrem JavaScriptu, sú všetky hodnoty menšie ako infinity . V prípade celých čísel môže byť lákavé použiť int.MaxValue ako hodnotu „bez cesty“. To však povedie k pretečeniu celého čísla v prípadoch, keď hodnoty oboch ciest i ⭢ k a k ⭢ j budú rovné int.MaxValue . Preto používame hodnotu, ktorá je o jednu menšia ako polovica int.MaxValue .

Ahoj! Ale prečo nemôžeme pred vykonaním akýchkoľvek výpočtov skontrolovať, či cesta existuje. Napríklad porovnaním ciest oboch s 0 (ak berieme nulu ako hodnotu „bez cesty“).


- Premýšľavý čitateľ

Je to skutočne možné, ale nanešťastie to povedie k výraznému zníženiu výkonu. Stručne povedané, CPU vedie štatistiku výsledkov hodnotenia pobočky napr. keď sa niektoré z výrokov if vyhodnotia ako true alebo false . Túto štatistiku používa na spustenie kódu „štatisticky predpovedanej vetvy“ vopred pred vyhodnotením skutočného príkazu if (toto sa nazýva špekulatívne vykonávanie ), a preto vykoná kód efektívnejšie. Keď je však predikcia CPU nepresná, spôsobuje to značnú stratu výkonu v porovnaní so správnou predikciou a bezpodmienečným vykonávaním, pretože CPU sa musí zastaviť a vypočítať správnu vetvu.


Pretože pri každej iterácii k aktualizujeme významnú časť matice váh, štatistika vetvy CPU sa stáva zbytočnou, pretože neexistuje vzor kódu, všetky vetvy sú založené výlučne na údajoch. Takže takáto kontrola bude mať za následok značné množstvo nesprávnych predpovedí pobočiek .

Tu vás tiež žiadam, aby ste mi (zatiaľ) verili a potom venovali nejaký čas štúdiu témy.


Uhh, zdá sa, že sme skončili s teoretickou časťou – poďme implementovať algoritmus (túto implementáciu označujeme ako Baseline ):

 public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }

Vyššie uvedený kód je takmer identickou kópiou vyššie uvedeného pseudokódu s jedinou výnimkou – namiesto Math.Min() používame if na aktualizáciu bunky len v prípade potreby.

Ahoj! Moment, nebol si to ty, kto práve napísal veľa slov o tom, prečo tu nie je ak dobré a o pár riadkov neskôr si sami predstavíme ak?


- Premýšľavý čitateľ

Dôvod je jednoduchý. V momente písania JIT vydáva takmer ekvivalentný kód pre implementácie if aj Math.Min . Môžete si to skontrolovať podrobne na adrese sharplab.io, ale tu sú úryvky tiel hlavných slučiek:

 // // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop

Môžete vidieť, bez ohľadu na to, či používame if alebo Math.Min stále existuje podmienená kontrola. Avšak v prípade if nie je zbytočné písať.


Teraz, keď sme dokončili implementáciu, je čas spustiť kód a zistiť, aké je to rýchle?!

Správnosť kódu si môžete overiť sami spustením testov v úložisku .

Na testovanie kódu používam Benchmark.NET (verzia 0.12.1). Všetky grafy použité v benchmarkoch sú orientované, acyklické grafy s 300, 600, 1200, 2400 a 4800 vrcholmi. Počet hrán v grafoch je okolo 80 % možného maxima, čo pre orientované, acyklické grafy možno vypočítať ako:

 var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph.

Poďme rock!


Tu sú výsledky benchmarkov spustených na mojom PC (Windows 10.0.19042, Intel Core i7-7700 CPU 3,60 GHz (Kaby Lake) / 8 logických procesorov / 4 jadrá):


Metóda

Veľkosť

Priemerná

Chyba

StdDev

Základná línia

300

27,525 ms

0,1937 ms

0,1617 ms

Základná línia

600

217,897 ms

1,6415 ms

1,5355 ms

Základná línia

1200

1 763,335 ms

7,4561 ms

6,2262 ms

Základná línia

2400

14 533,335 ms

63,3518 ms

52,9016 ms

Základná línia

4800

119 768,219 ms

181,5514 ms

160,9406 ms


Z výsledkov, ktoré môžeme vidieť, čas výpočtu dramaticky rastie v porovnaní s veľkosťou grafu – pre graf s 300 vrcholmi to trvalo 27 milisekúnd, pre graf s 2400 vrcholmi – 14,5 sekundy a pre graf 4800 – 119 sekúnd, čo je takmer 2 minúty !


Pri pohľade na kód algoritmu môže byť ťažké si to predstaviť, existuje niečo, čo môžeme urobiť na urýchlenie výpočtov... pretože dobre... sú tam TRI slučky, len TRI slučky.


Ako to však už býva – možnosti sú skryté v detailoch 🙂

Poznáte údaje – riedke grafy

Floyd-Warshallov algoritmus je základný algoritmus na riešenie problému najkratšej cesty všetkých párov, najmä pokiaľ ide o husté alebo úplné grafy (pretože algoritmus hľadá cesty medzi všetkými pármi vrcholov).


V našich experimentoch však používame orientované, acyklické grafy, ktoré majú úžasnú vlastnosť – ak existuje cesta z vrcholu 1 do vrcholu 2 , potom z vrcholu 2 do vrcholu 1 žiadna cesta nevedie. Pre nás to znamená, že existuje veľa nesusediacich vrcholov, ktoré môžeme preskočiť, ak neexistuje cesta z i do k (túto implementáciu označujeme ako SpartialOptimisation ).

 public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }

Tu sú výsledky predchádzajúcich ( Baseline ) a súčasných ( SpartialOptimisation ) implementácií na rovnakej sade grafov (najrýchlejšie výsledky sú zvýraznené tučným písmom ):


Metóda

Veľkosť

Priemerný

Chyba

StdDev

Pomer

Základná línia

300

27,525 ms

0,1937 ms

0,1617 ms

1,00

Čiastočná optimalizácia

300

12,399 ms

0,0943 ms

0,0882 ms

0,45

Základná línia

600

217,897 ms

1,6415 ms

1,5355 ms

1,00

Čiastočná optimalizácia

600

99,122 ms

0,8230 ms

0,7698 ms

0,45

Základná línia

1200

1 763,335 ms

7,4561 ms

6,2262 ms

1,00

Čiastočná optimalizácia

1200

766,675 ms

6,1147 ms

5,7197 ms

0,43

Základná línia

2400

14 533,335 ms

63,3518 ms

52,9016 ms

1,00

Čiastočná optimalizácia

2400

6 507,878 ms

28,2317 ms

26,4079 ms

0,45

Základná línia

4800

119 768,219 ms

181,5514 ms

160,9406 ms

1,00

Čiastočná optimalizácia

4800

55 590,374 ms

414,6051 ms

387,8218 ms

0,46


Celkom pôsobivé!


Skrátili sme čas výpočtu na polovicu! Samozrejme, čím je graf hustejší, tým menšie bude zrýchlenie. Toto je však jedna z optimalizácií, ktorá sa hodí, ak vopred viete, s akou triedou údajov máte pracovať.


Môžeme urobiť viac? 🙂

Poznajte svoj hardvér – paralelizmus


Môj počítač je vybavený procesorom Inter Core i7-7700 CPU 3.60GHz (Kaby Lake) s 8 logickými procesormi ( HW ) alebo 4 jadrami s technológiou Hyper-Threading . Mať viac ako jedno jadro je ako mať viac „náhradných rúk“, ktoré môžeme dať do práce. Pozrime sa teda, ktorú časť práce možno efektívne rozdeliť medzi viacerých pracovníkov a potom ju paralelne uviesť.


Slučky sú vždy najzrejmejším kandidátom na paralelizáciu, pretože vo väčšine prípadov sú iterácie nezávislé a môžu sa vykonávať súčasne. V algoritme máme tri slučky, ktoré by sme mali analyzovať a zistiť, či existujú nejaké závislosti, ktoré nám bránia v ich paralelizácii.


Začnime od for of k cyklu. V každej iteračnej slučke vypočítava cesty z každého vrcholu do každého vrcholu cez vrchol k . Nové a aktualizované cesty sú uložené v matici váh. Pri pohľade na iterácie si môžete všimnúť – možno ich vykonať v akomkoľvek poradí: 0, 1, 2, 3 alebo 3, 1, 2, 0 bez ovplyvnenia výsledku. Stále sa však musia vykonávať postupne, inak niektoré z iterácií nebudú môcť používať nové alebo aktualizované cesty, pretože ešte nebudú zapísané do matice váh. Takáto nekonzistentnosť určite zničí výsledky. Musíme teda hľadať ďalej.


Ďalším kandidátom je slučka for of i . V každej iteračnej slučke načíta cestu z vrcholu i do vrcholu k (bunka: W[i, k] ), cestu z vrcholu k do vrcholu j (bunka: W[k, j ]) a potom skontroluje, či známa cesta z i do j (bunka: W[i, j] ) je kratšia ako cesta i ⭢ k ⭢ j (súčet: W[i, k] + W[k, j] ). Ak sa pozriete bližšie na prístupový vzor, môžete si všimnúť – pri každej iterácii i slučka číta dáta z k riadku a aktualizovaného i riadku, čo v podstate znamená – iterácie sú nezávislé a môžu sa vykonávať nielen v akomkoľvek poradí, ale aj paralelne!


Vyzerá to sľubne, tak to poďme implementovať (túto implementáciu označujeme ako SpartialParallelOptimisations ).

for of j slučka môže byť tiež paralelná. Paralelizácia najvnútornejšieho cyklu je však v tomto prípade veľmi neefektívna. Môžete si to overiť sami vykonaním niekoľkých jednoduchých zmien v zdrojovom kóde .


- Poznámka autora

 public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }

Tu sú výsledky implementácií Baseline , SpartialOptimisation a SpartialParallelOptimisations na rovnakej sade grafov (paralelizácia sa vykonáva pomocou triedy Parallel ):


Metóda

Veľkosť

Priemerná

Chyba

StdDev

Pomer

Základná línia

300

27,525 ms

0,1937 ms

0,1617 ms

1,00

Čiastočná optimalizácia

300

12,399 ms

0,0943 ms

0,0882 ms

0,45

SpartialParallelOptimizations

300

6,252 ms

0,0211 ms

0,0187 ms

0,23

Základná línia

600

217,897 ms

1,6415 ms

1,5355 ms

1,00

Čiastočná optimalizácia

600

99,122 ms

0,8230 ms

0,7698 ms

0,45

SpartialParallelOptimizations

600

35,825 ms

0,1003 ms

0,0837 ms

0,16

Základná línia

1200

1 763,335 ms

7,4561 ms

6,2262 ms

1,00

Čiastočná optimalizácia

1200

766,675 ms

6,1147 ms

5,7197 ms

0,43

SpartialParallelOptimizations

1200

248,801 ms

0,6040 ms

0,5043 ms

0,14

Základná línia

2400

14 533,335 ms

63,3518 ms

52,9016 ms

1,00

Čiastočná optimalizácia

2400

6 507,878 ms

28,2317 ms

26,4079 ms

0,45

SpartialParallelOptimizations

2400

2 076,403 ms

20,8320 ms

19,4863 ms

0,14

Základná línia

4800

119 768,219 ms

181,5514 ms

160,9406 ms

1,00

Čiastočná optimalizácia

4800

55 590,374 ms

414,6051 ms

387,8218 ms

0,46

SpartialParallelOptimizations

4800

15 614,506 ms

115,6996 ms

102,5647 ms

0,13


Z výsledkov môžeme vidieť, že paralelizácia cyklu for of i skrátila čas výpočtu 2-4 krát v porovnaní s predchádzajúcou implementáciou ( SpartialOptimisation )! Je to veľmi pôsobivé, ale je dôležité si uvedomiť, že paralelizácia čistých výpočtov spotrebúva všetky dostupné výpočtové zdroje, čo môže viesť k nedostatku zdrojov iných aplikácií v systéme. Paralelizácia by sa mala používať opatrne.


Ako asi tušíte – toto nie je koniec 🙂

Poznajte svoju platformu – vektorizácia

Vektorizácia je transformácia kódu fungujúceho na jednom prvku na kód fungujúci na viacerých prvkoch súčasne.

Môže to znieť ako zložitá záležitosť, tak sa pozrime, ako to funguje na jednoduchom príklade:

 var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i];

Priveľmi zjednodušeným spôsobom možno vykonanie iterácie 0 vyššie uvedeného cyklu for na úrovni CPU znázorniť takto:


Obrázok 5. Zjednodušená ilustrácia skalárneho vykonávania opakovania slučky na úrovni CPU.


Tu je to, čo sa deje. CPU načítava hodnoty polí a a b z pamäte do registrov CPU (jeden register môže obsahovať práve jednu hodnotu). Potom CPU vykoná operáciu skalárneho sčítania na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa c . Tento proces sa opakuje štyrikrát pred koncom slučky.


Vektorizácia znamená použitie špeciálnych CPU registrov – vektorových alebo SIMD (single inštrukcie multiple data) registrov a zodpovedajúcich CPU inštrukcií na vykonávanie operácií s viacerými hodnotami poľa naraz:


Obrázok 6. Zjednodušená ilustrácia vykonávania vektorovej iterácie slučky na úrovni CPU.


Tu je to, čo sa deje. CPU načíta hodnoty polí a a b z pamäte do registrov CPU (tentoraz však jeden vektorový register môže obsahovať dve hodnoty poľa). Potom CPU vykoná operáciu sčítania vektorov na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa c . Pretože pracujeme s dvoma hodnotami naraz, tento proces sa namiesto štyroch opakuje dvakrát.


Na implementáciu vektorizácie v .NET môžeme použiť – typy Vector a Vector<T> (môžeme použiť aj typy z menného priestoru System.Runtime.Intrinsics , sú však trochu pokročilé pre aktuálnu implementáciu, takže ich nebudem používať, ale cítim môžete si ich vyskúšať sami):

 public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }

Vektorizovaný kód môže vyzerať trochu bizarne, tak si ho poďme prejsť krok za krokom.


Vytvoríme vektor cesty i ⭢ k : var ik_vec = new Vector<int>(matrix[i * sz + k]) . Výsledkom je, že ak vektor môže obsahovať štyri hodnoty typu int a váha cesty i ⭢ k je rovná 5, vytvoríme vektor štyroch 5 – [5, 5, 5, 5]. Ďalej pri každej iterácii súčasne vypočítame cesty z vrcholu i k vrcholom j, j + 1, ..., j + Vector<int>.Count .

Vlastnosť Vector<int>.Count vráti počet prvkov typu int , ktoré sa zmestia do vektorových registrov. Veľkosť vektorových registrov závisí od modelu CPU, takže táto vlastnosť môže vrátiť rôzne hodnoty na rôznych CPU.


- Poznámka autora

Celý proces výpočtu možno rozdeliť do troch krokov:

  1. Načítajte informácie z matice váh do vektorov: ij_vec a ikj_vec .
  2. Porovnajte vektory ij_vec a ikj_vec a vyberte najmenšie hodnoty do nového vektora r_vec .
  3. Napíšte r_vec späť do matice váh.


Zatiaľ čo #1 a #3 sú celkom jednoduché, #2 vyžaduje vysvetlenie. Ako už bolo spomenuté, pomocou vektorov manipulujeme s viacerými hodnotami súčasne. Preto výsledok niektorých operácií nemôže byť singulárny, tj porovnávacia operácia Vector.LessThan(ij_vec, ikj_vec) nemôže vrátiť true alebo false , pretože porovnáva viacero hodnôt. Takže namiesto toho vráti nový vektor, ktorý obsahuje výsledky porovnania medzi zodpovedajúcimi hodnotami z vektorov ij_vec a ikj_vec ( -1 , ak je hodnota z ij_vec menšia ako hodnota z ikj_vec a 0 , ak je to inak). Vrátený vektor (samotný o sebe) nie je veľmi užitočný, ale môžeme použiť vektorovú operáciu Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec) na extrahovanie požadovaných hodnôt z vektorov ij_vec a ikj_vec do nového vektora – r_vec . Táto operácia vráti nový vektor, kde hodnoty sú rovné menšie ako dve zodpovedajúce hodnoty vstupných vektorov, tj ak je hodnota vektora lt_vec rovná -1 , potom operácia vyberie hodnotu z ij_vec , v opačnom prípade vyberie hodnotu z ikj_vec .


Okrem #2 je tu ešte jedna časť, ktorá si vyžaduje vysvetlenie – druhá, nevektorizovaná slučka. Vyžaduje sa, keď veľkosť matice váh nie je súčinom hodnoty Vector<int>.Count . V takom prípade hlavná slučka nemôže spracovať všetky hodnoty (pretože nemôžete načítať časť vektora) a druhá, nevektorovaná, slučka vypočíta zostávajúce iterácie.


Tu sú výsledky tejto a všetkých predchádzajúcich implementácií:


Metóda

sz

Priemerná

Chyba

StdDev

Pomer

Základná línia

300

27,525 ms

0,1937 ms

0,1617 ms

1,00

Čiastočná optimalizácia

300

12,399 ms

0,0943 ms

0,0882 ms

0,45

SpartialParallelOptimizations

300

6,252 ms

0,0211 ms

0,0187 ms

0,23

SpartialVectorOptimizations

300

3,056 ms

0,0301 ms

0,0281 ms

0,11

Základná línia

600

217,897 ms

1,6415 ms

1,5355 ms

1,00

Čiastočná optimalizácia

600

99,122 ms

0,8230 ms

0,7698 ms

0,45

SpartialParallelOptimizations

600

35,825 ms

0,1003 ms

0,0837 ms

0,16

SpartialVectorOptimizations

600

24,378 ms

0,4320 ms

0,4041 ms

0,11

Základná línia

1200

1 763,335 ms

7,4561 ms

6,2262 ms

1,00

Čiastočná optimalizácia

1200

766,675 ms

6,1147 ms

5,7197 ms

0,43

SpartialParallelOptimizations

1200

248,801 ms

0,6040 ms

0,5043 ms

0,14

SpartialVectorOptimizations

1200

185,628 ms

2,1240 ms

1,9868 ms

0,11

Základná línia

2400

14 533,335 ms

63,3518 ms

52,9016 ms

1,00

Čiastočná optimalizácia

2400

6 507,878 ms

28,2317 ms

26,4079 ms

0,45

SpartialParallelOptimizations

2400

2 076,403 ms

20,8320 ms

19,4863 ms

0,14

SpartialVectorOptimizations

2400

2 568,676 ms

31,7359 ms

29,6858 ms

0,18

Základná línia

4800

119 768,219 ms

181,5514 ms

160,9406 ms

1,00

Čiastočná optimalizácia

4800

55 590,374 ms

414,6051 ms

387,8218 ms

0,46

SpartialParallelOptimizations

4800

15 614,506 ms

115,6996 ms

102,5647 ms

0,13

SpartialVectorOptimizations

4800

18 257,991 ms

84,5978 ms

79,1329 ms

0,15


Z výsledku je zrejmé, že vektorizácia výrazne skrátila čas výpočtu – 3 až 4 krát v porovnaní s neparalelnou verziou ( SpartialOptimisation ). Zaujímavým momentom je, že vektorizovaná verzia tiež prekonáva paralelnú verziu ( SpartialParallelOptimisations ) na menších grafoch (menej ako 2400 vrcholov).


A v neposlednom rade – spojme vektorizáciu a paralelizmus!

Ak vás zaujíma praktická aplikácia vektorizácie, odporúčam vám prečítať si sériu príspevkov od Dana Shechtera , kde vektorizoval Array.Sort (tieto výsledky sa neskôr ocitli v aktualizácii pre Garbage Collector v .NET 5 ).

Poznajte svoju platformu a hardvér – vektorizácia a paralelizmus!

Posledná implementácia kombinuje úsilie o paralelizáciu a vektorizáciu a... úprimne povedané, chýba jej individualita 🙂 Pretože... no, práve sme nahradili telo Parallel.For od SpartialParallelOptimisations vektorizovanou slučkou od SpartialVectorOptimisations :

 public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }

Všetky výsledky tohto príspevku sú uvedené nižšie. Ako sa očakávalo, simultánne použitie paralelizmu a vektorizácie preukázalo najlepšie výsledky, ktoré skrátilo čas výpočtu až 25-krát (pre grafy s 1200 vrcholmi) v porovnaní s počiatočnou implementáciou.


Metóda

sz

Priemerná

Chyba

StdDev

Pomer

Základná línia

300

27,525 ms

0,1937 ms

0,1617 ms

1,00

Čiastočná optimalizácia

300

12,399 ms

0,0943 ms

0,0882 ms

0,45

SpartialParallelOptimizations

300

6,252 ms

0,0211 ms

0,0187 ms

0,23

SpartialVectorOptimizations

300

3,056 ms

0,0301 ms

0,0281 ms

0,11

SpartialParallelVectorOptimizations

300

3,008 ms

0,0075 ms

0,0066 ms

0,11

Základná línia

600

217,897 ms

1,6415 ms

1,5355 ms

1,00

Čiastočná optimalizácia

600

99,122 ms

0,8230 ms

0,7698 ms

0,45

SpartialParallelOptimizations

600

35,825 ms

0,1003 ms

0,0837 ms

0,16

SpartialVectorOptimizations

600

24,378 ms

0,4320 ms

0,4041 ms

0,11

SpartialParallelVectorOptimizations

600

13,425 ms

0,0319 ms

0,0283 ms

0,06

Základná línia

1200

1 763,335 ms

7,4561 ms

6,2262 ms

1,00

Čiastočná optimalizácia

1200

766,675 ms

6,1147 ms

5,7197 ms

0,43

SpartialParallelOptimizations

1200

248,801 ms

0,6040 ms

0,5043 ms

0,14

SpartialVectorOptimizations

1200

185,628 ms

2,1240 ms

1,9868 ms

0,11

SpartialParallelVectorOptimizations

1200

76,770 ms

0,3021 ms

0,2522 ms

0,04

Základná línia

2400

14 533,335 ms

63,3518 ms

52,9016 ms

1,00

Čiastočná optimalizácia

2400

6 507,878 ms

28,2317 ms

26,4079 ms

0,45

SpartialParallelOptimizations

2400

2 076,403 ms

20,8320 ms

19,4863 ms

0,14

SpartialVectorOptimizations

2400

2 568,676 ms

31,7359 ms

29,6858 ms

0,18

SpartialParallelVectorOptimizations

2400

1 281,877 ms

25,1127 ms

64,8239 ms

0,09

Základná línia

4800

119 768,219 ms

181,5514 ms

160,9406 ms

1,00

Čiastočná optimalizácia

4800

55 590,374 ms

414,6051 ms

387,8218 ms

0,46

SpartialParallelOptimizations

4800

15 614,506 ms

115,6996 ms

102,5647 ms

0,13

SpartialVectorOptimizations

4800

18 257,991 ms

84,5978 ms

79,1329 ms

0,15

SpartialParallelVectorOptimizations

4800

12 785,824 ms

98,6947 ms

87,4903 ms

0,11

Záver

V tomto príspevku sme sa mierne ponorili do problému najkratšej cesty všetkých párov a implementovali sme Floyd-Warshallov algoritmus v C#, aby sme ho vyriešili. Aktualizovali sme tiež našu implementáciu, aby rešpektovala dáta a využívala nízkoúrovňové funkcie .NET a hardvéru.


V tomto príspevku sme hrali „all in“. V aplikáciách v reálnom živote je však dôležité mať na pamäti – nie sme sami. Hardcore paralelizácia môže výrazne znížiť výkon systému a spôsobiť viac škody ako úžitku. Vektorizácia na druhej strane je o niečo bezpečnejšia, ak sa vykonáva spôsobom nezávislým od platformy. Pamätajte, že niektoré vektorové inštrukcie môžu byť dostupné len na určitých CPU.


Dúfam, že sa vám čítanie páčilo a zabavili ste sa pri „vytláčaní“ niektorých kúskov výkonu z algoritmu len s TROCH cyklami 🙂

Referencie


  1. Floyd, RW Algorithm 97: Najkratšia cesta / RW Floyd // Komunikácia ACM. – 1962. – Sv. 5, №. 6. – S. 345-.
  2. Ingerman, PZ Algoritmus 141: Matica cesty / PZ Ingerman // Komunikácia ACM. – 1962. – Sv. 5, №. 11. – S. 556.