Každý z nás mnohokrát denne rieši problém „ “. Samozrejme neúmyselne. Riešime to cestou do práce alebo keď si prezeráme internet alebo keď si ukladáme veci na stôl. najkratšej cesty Znie to trochu... priveľa? Poďme to zistiť. Predstavte si, že ste sa rozhodli stretnúť sa s priateľmi, no... povedzme v kaviarni. Najprv musíte nájsť cestu (alebo cestu) do kaviarne, takže začnete hľadať dostupnú verejnú dopravu (ak idete pešo) alebo trasy a ulice (ak idete autom). Hľadáte trasu z vašej aktuálnej polohy do kaviarne, ale nie „akúkoľvek“ trasu – najkratšiu alebo najrýchlejšiu. Tu je ešte jeden príklad, ktorý nie je taký zrejmý ako ten predchádzajúci. Počas práce sa rozhodnete ísť „skratkou“ cez bočnú ulicu, pretože dobre... je to „skratka“ a je „rýchlejšie“ ísť touto cestou. Ale ako ste vedeli, že táto „skratka“ je rýchlejšia? Na základe osobných skúseností ste vyriešili problém „najkratšej cesty“ a vybrali ste si trasu, ktorá vedie cez bočnú ulicu. V oboch príkladoch je „najkratšia“ trasa určená buď vzdialenosťou, alebo časom potrebným na to, aby ste sa dostali z jedného miesta na druhé. Cestovné príklady sú veľmi prirodzenými aplikáciami a zvlášť problému „najkratšej cesty“. Čo však príklad s usporiadaním vecí na stole? V tomto prípade môže „najkratšia“ predstavovať množstvo alebo zložitosť akcií, ktoré musíte vykonať, aby ste získali napríklad hárok papiera: otvoriť stôl, otvoriť priečinok, vziať hárok papiera alebo vziať hárok papiera priamo zo stola. . teórie grafov Všetky vyššie uvedené príklady predstavujú problém nájdenia najkratšej cesty medzi dvoma vrcholmi v grafe (trasa alebo cesta medzi dvoma miestami, množstvo akcií alebo zložitosť získania listu papiera z jedného alebo druhého miesta). Táto trieda problémov s najkratšou cestou sa nazýva a základným algoritmom na riešenie týchto problémov je , ktorý má výpočtovú zložitosť . SSSP (Single Source Shortest Path) Dijkstrov algoritmus O(n^2) Niekedy však potrebujeme nájsť všetky najkratšie cesty medzi všetkými vrcholmi. Zvážte nasledujúci príklad: vytvárate pre vás mapu pravidelných pohybov medzi , a . V tomto scenári budete mať 6 trás: , , , , a (opačné trasy sa môžu líšiť napríklad z dôvodu jednosmerných ciest) . domovom prácou divadlom work ⭢ home home ⭢ work work ⭢ theatre theatre ⭢ work home ⭢ theatre theatre ⭢ home Pridaním ďalšieho miesta na mapu dôjde k výraznému nárastu kombinácií – podľa vzorca to možno vypočítať ako: permutácií n kombinatoriky A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2) Čo nám dáva 12 trás pre 4 miesta a 90 trás pre 10 miest (čo je pôsobivé). Poznámka... toto sa neberie do úvahy medziľahlé body medzi miestami, tj aby ste sa dostali z domu do práce, musíte prejsť 4 ulice, prejsť popri rieke a prejsť cez most. Ak si predstavíme, niektoré cesty môžu mať spoločné medziľahlé body... no... výsledkom bude veľmi veľký graf s množstvom vrcholov, kde každý vrchol bude predstavovať buď miesto, alebo významný medziľahlý bod. Trieda problémov, kde potrebujeme nájsť všetky najkratšie cesty medzi všetkými pármi vrcholov v grafe, sa nazýva a základným algoritmom na riešenie týchto problémov je , ktorý má výpočtová zložitosť. APSP (All Pairs Shortest Paths) Floyd-Warshallov algoritmus O(n^3) A toto je algoritmus, ktorý dnes implementujeme 🙂 Floyd-Warshallov algoritmus Floyd-Warshallov algoritmus nájde všetky najkratšie cesty medzi každým párom vrcholov v grafe. Algoritmy publikoval Robert Floyd v [1] (ďalšie podrobnosti nájdete v časti „Odkazy“). V tom istom roku Peter Ingerman v [2] opísal modernú implementáciu algoritmu vo forme troch vnorených slučiek : for algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments Ak ste nikdy nezmenili prácu s grafom reprezentovaným vo forme matice, môže byť ťažké pochopiť, čo robí vyššie uvedený algoritmus. Aby sme sa uistili, že sme na rovnakej stránke, pozrime sa na to, ako môže byť graf reprezentovaný vo forme matice a prečo je takáto reprezentácia užitočná pri riešení problému s najkratšou cestou. Obrázok nižšie ilustruje graf 5 vrcholov. Vľavo je graf prezentovaný vo vizuálnej forme, ktorá pozostáva z kruhov a šípok, kde každý kruh predstavuje vrchol a šípka predstavuje hranu so smerom. Číslo vo vnútri kruhu zodpovedá číslu vrcholu a číslo nad okrajom zodpovedá hmotnosti hrany. Na pravej strane je rovnaký graf prezentovaný vo forme matice váh. Váhová matica je forma , kde každá bunka matice obsahuje „váhu“ – vzdialenosť medzi vrcholom (riadok) a vrcholom (stĺpec). Váhová matica neobsahuje informácie o „ceste“ medzi vrcholmi (zoznam vrcholov, cez ktoré sa dostanete z do ) – iba váhu (vzdialenosť) medzi týmito vrcholmi. orientovaný, vážený matice susednosti i j i j V matici váh môžeme vidieť, že hodnoty buniek sa rovnajú váham medzi vrcholmi. Preto, ak si prezrieme cesty z vrcholu (riadok ), uvidíme, že ... existuje len jedna cesta – od do . Ale na vizuálnej reprezentácii môžeme jasne vidieť cesty z vrcholu k vrcholom a (cez vrchol ). Dôvod je jednoduchý – v počiatočnom stave obsahuje matica váh iba vzdialenosť medzi susednými vrcholmi. Na zvyšku však stačí len táto informácia. susednými 0 0 0 1 0 2 3 1 nájdenie Pozrime sa, ako to funguje. Venujte pozornosť bunke . Jeho hodnota udáva, že z vrcholu do vrcholu vedie cesta s váhou rovnajúcou sa (v skratke môžeme zapísať ako: ). S týmito znalosťami teraz môžeme skenovať všetky bunky riadku (ktorý obsahuje všetky váhy všetkých ciest z vrcholu ) a spätne portovať túto informáciu do riadku , čím sa hmotnosť zvýši o (hodnota ). W[0, 1] 0 1 1 0 ⭢ 1: 1 1 1 0 1 W[0, 1] Rovnakým postupom môžeme nájsť cesty z vrcholu cez ďalšie vrcholy. Počas vyhľadávania sa môže stať, že k rovnakému vrcholu vedie viac ako jedna cesta a čo je dôležitejšie, váhy týchto ciest môžu byť rôzne. Príklad takejto situácie je znázornený na obrázku nižšie, kde vyhľadávanie od vrcholu po vrchol odhalilo ešte jednu cestu k vrcholu s menšou hmotnosťou. 0 0 2 3 Máme dve cesty: pôvodnú cestu – a novú cestu, ktorú sme práve objavili – (pamätajte, že matica váh neobsahuje cesty, takže nevieme, ktorú vrcholy sú zahrnuté v pôvodnej ceste). Toto je moment, kedy sa rozhodneme ponechať si najkratšiu a zapíšeme do bunky . 0 ⭢ 3: 4 0 ⭢ 2 ⭢ 3: 3 3 W[0, 3] Zdá sa, že sme práve našli našu prvú najkratšiu cestu! Kroky, ktoré sme práve videli, sú podstatou Floyd-Warshallovho algoritmu. Pozrite sa na pseudokód algoritmu ešte raz: algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm Najvzdialenejší cyklus on iteruje cez všetky vrcholy v grafe a v každej iterácii premenná predstavuje vrchol . Vnútorný cyklus on tiež iteruje cez všetky vrcholy v grafe a pri každej iterácii predstavuje vrchol, . A nakoniec najvnútornejší cyklus iteruje cez všetky vrcholy v grafe a v každej iterácii predstavuje vrchol V kombinácii nám to dáva nasledovné: v každej iterácii hľadáme cesty od všetkých vrcholov do všetkých vrcholov až po vrchol . Vo vnútri cyklu porovnávame cestu (predstavenú ) s dráhou (predstavenú súčtom a ) a napíšeme najkratšiu jeden späť do . for k k , cez ktorý hľadáme cesty for i i z ktorého hľadáme cesty for j j , ku ktorému hľadáme cestu. k i j k i ⭢ j W[i, j] i ⭢ k ⭢ j W[I, k] W[k, j] W[i, j] Teraz, keď rozumieme mechanike, je čas implementovať algoritmus. Základná implementácia Zdrojový kód a experimentálne grafy sú dostupné v na GitHub. Experimentálne grafy možno nájsť v adresári . Všetky benchmarky v tomto príspevku sú implementované v súbore . úložisku Data/Sparse-Graphs.zip APSP01.cs Predtým, ako sa pustíme do implementácie, musíme si ujasniť niekoľko technických momentov: Všetky implementácie pracujú s váhovou maticou reprezentovanou vo forme lineárneho poľa. Všetky implementácie používajú celočíselnú aritmetiku. Absencia cesty medzi vrcholmi je vyjadrená špeciálnou konštantnou váhou: . NO_EDGE = (int.MaxValue / 2) - 1 Teraz poďme zistiť, prečo je to tak. Keď hovoríme o matriciach, bunky definujeme ako „riadky“ a „stĺpce“. Z tohto dôvodu sa zdá prirodzené predstaviť si maticu vo forme „štvorca“ alebo „obdĺžnika“ (obrázok 4a). Čo sa týka #1. To však nemusí znamenať, že musíme reprezentovať maticu vo forme poľa polí (obrázok 4b), aby sme sa držali našej predstavivosti. Namiesto toho môžeme maticu reprezentovať vo forme lineárneho poľa (obrázok 4c), kde sa index každej bunky vypočíta podľa nasledujúceho vzorca: i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row. Lineárne pole váh matice je pre efektívne vykonávanie Floyd-Warshallovho algoritmu. Dôvod nie je jednoduchý a podrobné vysvetlenie si zaslúži samostatný príspevok... alebo niekoľko príspevkov. V súčasnosti je však dôležité spomenúť, že takáto reprezentácia výrazne zlepšuje , čo má v skutočnosti veľký vplyv na výkon algoritmu. predpokladom dátovú lokalitu Tu vás žiadam, aby ste mi verili a mali na mysli práve túto informáciu ako podmienku, zároveň vám však odporúčam venovať chvíľu času a preštudovať si otázku a mimochodom – neverte ľuďom na internete . - Poznámka autora Ak sa bližšie pozriete na pseudokód algoritmu, nenájdete žiadne kontroly týkajúce sa existencie cesty medzi dvoma vrcholmi. Namiesto toho pseudokód jednoducho použije funkciu . Dôvod je jednoduchý – pôvodne, ak medzi vrcholmi neexistuje žiadna cesta, je hodnota bunky nastavená na a vo všetkých jazykoch, okrem JavaScriptu, sú všetky hodnoty menšie ako . V prípade celých čísel môže byť lákavé použiť ako hodnotu „bez cesty“. To však povedie k pretečeniu celého čísla v prípadoch, keď hodnoty oboch ciest a budú rovné . Preto používame hodnotu, ktorá je o jednu menšia ako polovica . Čo sa týka #2. min() infinity infinity int.MaxValue i ⭢ k k ⭢ j int.MaxValue int.MaxValue Ahoj! Ale prečo nemôžeme pred vykonaním akýchkoľvek výpočtov skontrolovať, či cesta existuje. Napríklad porovnaním ciest oboch s 0 (ak berieme nulu ako hodnotu „bez cesty“). - Premýšľavý čitateľ Je to skutočne možné, ale nanešťastie to povedie k výraznému zníženiu výkonu. Stručne povedané, CPU vedie štatistiku výsledkov hodnotenia pobočky napr. keď sa niektoré z výrokov vyhodnotia ako alebo . Túto štatistiku používa na spustenie kódu „štatisticky predpovedanej vetvy“ vopred pred vyhodnotením skutočného príkazu (toto sa nazýva ), a preto vykoná kód efektívnejšie. Keď je však predikcia CPU nepresná, spôsobuje to značnú stratu výkonu v porovnaní so správnou predikciou a bezpodmienečným vykonávaním, pretože CPU sa musí zastaviť a vypočítať správnu vetvu. if true false if špekulatívne vykonávanie Pretože pri každej iterácii aktualizujeme významnú časť matice váh, štatistika vetvy CPU sa stáva zbytočnou, pretože neexistuje vzor kódu, všetky vetvy sú založené výlučne na údajoch. Takže takáto kontrola bude mať za následok značné množstvo . k nesprávnych predpovedí pobočiek Tu vás tiež žiadam, aby ste mi (zatiaľ) verili a potom venovali nejaký čas štúdiu témy. Uhh, zdá sa, že sme skončili s teoretickou časťou – poďme implementovať algoritmus (túto implementáciu označujeme ako ): Baseline public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } } Vyššie uvedený kód je takmer identickou kópiou vyššie uvedeného pseudokódu s jedinou výnimkou – namiesto používame na aktualizáciu bunky len v prípade potreby. Math.Min() if Ahoj! Moment, nebol si to ty, kto práve napísal veľa slov o tom, prečo tu nie je ak dobré a o pár riadkov neskôr si sami predstavíme ak? - Premýšľavý čitateľ Dôvod je jednoduchý. V momente písania JIT vydáva takmer ekvivalentný kód pre implementácie aj . Môžete si to skontrolovať podrobne na adrese ale tu sú úryvky tiel hlavných slučiek: if Math.Min sharplab.io, // // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop Môžete vidieť, bez ohľadu na to, či používame alebo stále existuje podmienená kontrola. Avšak v prípade nie je zbytočné písať. if Math.Min if Teraz, keď sme dokončili implementáciu, je čas spustiť kód a zistiť, aké je to rýchle?! Správnosť kódu si môžete overiť sami spustením testov v . úložisku Na testovanie kódu používam (verzia 0.12.1). Všetky grafy použité v benchmarkoch sú grafy s 300, 600, 1200, 2400 a 4800 vrcholmi. Počet hrán v grafoch je okolo 80 % možného maxima, čo pre orientované, acyklické grafy možno vypočítať ako: Benchmark.NET orientované, acyklické var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph. Poďme rock! Tu sú výsledky benchmarkov spustených na mojom PC (Windows 10.0.19042, Intel Core i7-7700 CPU 3,60 GHz (Kaby Lake) / 8 logických procesorov / 4 jadrá): Metóda Veľkosť Priemerná Chyba StdDev Základná línia 300 27,525 ms 0,1937 ms 0,1617 ms Základná línia 600 217,897 ms 1,6415 ms 1,5355 ms Základná línia 1200 1 763,335 ms 7,4561 ms 6,2262 ms Základná línia 2400 14 533,335 ms 63,3518 ms 52,9016 ms Základná línia 4800 119 768,219 ms 181,5514 ms 160,9406 ms Z výsledkov, ktoré môžeme vidieť, čas výpočtu dramaticky rastie v porovnaní s veľkosťou grafu – pre graf s 300 vrcholmi to trvalo 27 milisekúnd, pre graf s 2400 vrcholmi – 14,5 sekundy a pre graf 4800 – 119 sekúnd, čo je ! takmer 2 minúty Pri pohľade na kód algoritmu môže byť ťažké si to predstaviť, existuje niečo, čo môžeme urobiť na urýchlenie výpočtov... pretože dobre... sú tam TRI slučky, len TRI slučky. Ako to však už býva – možnosti sú skryté v detailoch 🙂 Poznáte údaje – riedke grafy Floyd-Warshallov algoritmus je základný algoritmus na riešenie problému najkratšej cesty všetkých párov, najmä pokiaľ ide o alebo grafy (pretože algoritmus hľadá cesty medzi všetkými pármi vrcholov). husté úplné V našich experimentoch však používame grafy, ktoré majú úžasnú vlastnosť – ak existuje cesta z vrcholu do vrcholu , potom z vrcholu do vrcholu žiadna cesta nevedie. Pre nás to znamená, že existuje veľa nesusediacich vrcholov, ktoré môžeme preskočiť, ak neexistuje cesta z do (túto implementáciu označujeme ako ). orientované, acyklické 1 2 2 1 i k SpartialOptimisation public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } } Tu sú výsledky predchádzajúcich ( ) a súčasných ( ) implementácií na rovnakej sade grafov (najrýchlejšie výsledky sú zvýraznené ): Baseline SpartialOptimisation tučným písmom Metóda Veľkosť Priemerný Chyba StdDev Pomer Základná línia 300 27,525 ms 0,1937 ms 0,1617 ms 1,00 Čiastočná optimalizácia 300 12,399 ms 0,0943 ms 0,0882 ms 0,45 Základná línia 600 217,897 ms 1,6415 ms 1,5355 ms 1,00 Čiastočná optimalizácia 600 99,122 ms 0,8230 ms 0,7698 ms 0,45 Základná línia 1200 1 763,335 ms 7,4561 ms 6,2262 ms 1,00 Čiastočná optimalizácia 1200 766,675 ms 6,1147 ms 5,7197 ms 0,43 Základná línia 2400 14 533,335 ms 63,3518 ms 52,9016 ms 1,00 Čiastočná optimalizácia 2400 6 507,878 ms 28,2317 ms 26,4079 ms 0,45 Základná línia 4800 119 768,219 ms 181,5514 ms 160,9406 ms 1,00 Čiastočná optimalizácia 4800 55 590,374 ms 414,6051 ms 387,8218 ms 0,46 Celkom pôsobivé! Skrátili sme čas výpočtu na polovicu! Samozrejme, čím je graf hustejší, tým menšie bude zrýchlenie. Toto je však jedna z optimalizácií, ktorá sa hodí, ak vopred viete, s akou triedou údajov máte pracovať. Môžeme urobiť viac? 🙂 Poznajte svoj hardvér – paralelizmus Môj počítač je vybavený procesorom s 8 logickými procesormi ( ) alebo 4 jadrami s technológiou . Mať viac ako jedno jadro je ako mať viac „náhradných rúk“, ktoré môžeme dať do práce. Pozrime sa teda, ktorú časť práce možno efektívne rozdeliť medzi viacerých pracovníkov a potom ju paralelne uviesť. Inter Core i7-7700 CPU 3.60GHz (Kaby Lake) HW Hyper-Threading Slučky sú vždy najzrejmejším kandidátom na paralelizáciu, pretože vo väčšine prípadov sú iterácie nezávislé a môžu sa vykonávať súčasne. V algoritme máme tri slučky, ktoré by sme mali analyzovať a zistiť, či existujú nejaké závislosti, ktoré nám bránia v ich paralelizácii. Začnime od cyklu. V každej iteračnej slučke vypočítava cesty z každého vrcholu do každého vrcholu cez vrchol . Nové a aktualizované cesty sú uložené v matici váh. Pri pohľade na iterácie si môžete všimnúť – možno ich vykonať v akomkoľvek poradí: 0, 1, 2, 3 alebo 3, 1, 2, 0 bez ovplyvnenia výsledku. Stále sa však musia vykonávať postupne, inak niektoré z iterácií nebudú môcť používať nové alebo aktualizované cesty, pretože ešte nebudú zapísané do matice váh. Takáto nekonzistentnosť určite zničí výsledky. Musíme teda hľadať ďalej. for of k k Ďalším kandidátom je slučka . V každej iteračnej slučke načíta cestu z vrcholu do vrcholu (bunka: ), cestu z vrcholu do vrcholu (bunka: ]) a potom skontroluje, či známa cesta z do (bunka: ) je kratšia ako cesta (súčet: + ). Ak sa pozriete bližšie na prístupový vzor, môžete si všimnúť – pri každej iterácii slučka číta dáta z riadku a aktualizovaného riadku, čo v podstate znamená – iterácie sú nezávislé a môžu sa vykonávať nielen v akomkoľvek poradí, ale aj paralelne! for of i i k W[i, k] k j W[k, j i j W[i, j] i ⭢ k ⭢ j W[i, k] W[k, j] i k i Vyzerá to sľubne, tak to poďme implementovať (túto implementáciu označujeme ako ). SpartialParallelOptimisations slučka môže byť tiež paralelná. Paralelizácia najvnútornejšieho cyklu je však v tomto prípade veľmi neefektívna. Môžete si to overiť sami vykonaním niekoľkých jednoduchých zmien v . for of j zdrojovom kóde - Poznámka autora public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } } Tu sú výsledky implementácií , a na rovnakej sade grafov (paralelizácia sa vykonáva pomocou triedy ): Baseline SpartialOptimisation SpartialParallelOptimisations Parallel Metóda Veľkosť Priemerná Chyba StdDev Pomer Základná línia 300 27,525 ms 0,1937 ms 0,1617 ms 1,00 Čiastočná optimalizácia 300 12,399 ms 0,0943 ms 0,0882 ms 0,45 SpartialParallelOptimizations 300 6,252 ms 0,0211 ms 0,0187 ms 0,23 Základná línia 600 217,897 ms 1,6415 ms 1,5355 ms 1,00 Čiastočná optimalizácia 600 99,122 ms 0,8230 ms 0,7698 ms 0,45 SpartialParallelOptimizations 600 35,825 ms 0,1003 ms 0,0837 ms 0,16 Základná línia 1200 1 763,335 ms 7,4561 ms 6,2262 ms 1,00 Čiastočná optimalizácia 1200 766,675 ms 6,1147 ms 5,7197 ms 0,43 SpartialParallelOptimizations 1200 248,801 ms 0,6040 ms 0,5043 ms 0,14 Základná línia 2400 14 533,335 ms 63,3518 ms 52,9016 ms 1,00 Čiastočná optimalizácia 2400 6 507,878 ms 28,2317 ms 26,4079 ms 0,45 SpartialParallelOptimizations 2400 2 076,403 ms 20,8320 ms 19,4863 ms 0,14 Základná línia 4800 119 768,219 ms 181,5514 ms 160,9406 ms 1,00 Čiastočná optimalizácia 4800 55 590,374 ms 414,6051 ms 387,8218 ms 0,46 SpartialParallelOptimizations 4800 15 614,506 ms 115,6996 ms 102,5647 ms 0,13 Z výsledkov môžeme vidieť, že paralelizácia cyklu skrátila čas výpočtu 2-4 krát v porovnaní s predchádzajúcou implementáciou ( )! Je to veľmi pôsobivé, ale je dôležité si uvedomiť, že paralelizácia čistých výpočtov spotrebúva všetky dostupné výpočtové zdroje, čo môže viesť k nedostatku zdrojov iných aplikácií v systéme. for of i SpartialOptimisation Paralelizácia by sa mala používať opatrne. Ako asi tušíte – toto nie je koniec 🙂 Poznajte svoju platformu – vektorizácia je transformácia kódu fungujúceho na jednom prvku na kód fungujúci na viacerých prvkoch súčasne. Vektorizácia Môže to znieť ako zložitá záležitosť, tak sa pozrime, ako to funguje na jednoduchom príklade: var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i]; spôsobom možno vykonanie iterácie vyššie uvedeného cyklu na úrovni CPU znázorniť takto: Priveľmi zjednodušeným 0 for Tu je to, čo sa deje. CPU načítava hodnoty polí a z pamäte do registrov CPU (jeden register môže obsahovať hodnotu). Potom CPU vykoná operáciu skalárneho sčítania na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa . Tento proces sa opakuje štyrikrát pred koncom slučky. a b práve jednu c Vektorizácia znamená použitie špeciálnych CPU registrov – vektorových alebo SIMD (single inštrukcie multiple data) registrov a zodpovedajúcich CPU inštrukcií na vykonávanie operácií s viacerými hodnotami poľa naraz: Tu je to, čo sa deje. CPU načíta hodnoty polí a z pamäte do registrov CPU (tentoraz však jeden vektorový register môže obsahovať hodnoty poľa). Potom CPU vykoná operáciu sčítania vektorov na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa . Pretože pracujeme s dvoma hodnotami naraz, tento proces sa namiesto štyroch opakuje dvakrát. a b dve c Na implementáciu vektorizácie v .NET môžeme použiť – typy a (môžeme použiť aj typy z menného priestoru , sú však trochu pokročilé pre aktuálnu implementáciu, takže ich nebudem používať, ale cítim môžete si ich vyskúšať sami): Vector Vector<T> System.Runtime.Intrinsics public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } } Vektorizovaný kód môže vyzerať trochu bizarne, tak si ho poďme prejsť krok za krokom. Vytvoríme vektor cesty : . Výsledkom je, že ak vektor môže obsahovať štyri hodnoty typu a váha cesty je rovná 5, vytvoríme vektor štyroch 5 – [5, 5, 5, 5]. Ďalej pri každej iterácii súčasne vypočítame cesty z vrcholu k vrcholom . i ⭢ k var ik_vec = new Vector<int>(matrix[i * sz + k]) int i ⭢ k i j, j + 1, ..., j + Vector<int>.Count Vlastnosť vráti počet prvkov typu , ktoré sa zmestia do vektorových registrov. Veľkosť vektorových registrov závisí od modelu CPU, takže táto vlastnosť môže vrátiť rôzne hodnoty na rôznych CPU. Vector<int>.Count int - Poznámka autora Celý proces výpočtu možno rozdeliť do troch krokov: Načítajte informácie z matice váh do vektorov: a . ij_vec ikj_vec Porovnajte vektory a a vyberte najmenšie hodnoty do nového vektora . ij_vec ikj_vec r_vec Napíšte späť do matice váh. r_vec Zatiaľ čo a sú celkom jednoduché, vyžaduje vysvetlenie. Ako už bolo spomenuté, pomocou vektorov manipulujeme s viacerými hodnotami súčasne. Preto výsledok niektorých operácií nemôže byť singulárny, tj porovnávacia operácia nemôže vrátiť alebo , pretože porovnáva viacero hodnôt. Takže namiesto toho vráti nový vektor, ktorý obsahuje výsledky porovnania medzi zodpovedajúcimi hodnotami z vektorov a ( , ak je hodnota z menšia ako hodnota z a , ak je to inak). Vrátený vektor (samotný o sebe) nie je veľmi užitočný, ale môžeme použiť vektorovú operáciu na extrahovanie požadovaných hodnôt z vektorov a do nového vektora – . Táto operácia vráti nový vektor, kde hodnoty sú rovné menšie ako dve zodpovedajúce hodnoty vstupných vektorov, tj ak je hodnota vektora rovná , potom operácia vyberie hodnotu z , v opačnom prípade vyberie hodnotu z . #1 #3 #2 Vector.LessThan(ij_vec, ikj_vec) true false ij_vec ikj_vec -1 ij_vec ikj_vec 0 Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec) ij_vec ikj_vec r_vec lt_vec -1 ij_vec ikj_vec Okrem je tu ešte jedna časť, ktorá si vyžaduje vysvetlenie – druhá, nevektorizovaná slučka. Vyžaduje sa, keď veľkosť matice váh nie je súčinom hodnoty . V takom prípade hlavná slučka nemôže spracovať všetky hodnoty (pretože nemôžete načítať časť vektora) a druhá, nevektorovaná, slučka vypočíta zostávajúce iterácie. #2 Vector<int>.Count Tu sú výsledky tejto a všetkých predchádzajúcich implementácií: Metóda sz Priemerná Chyba StdDev Pomer Základná línia 300 27,525 ms 0,1937 ms 0,1617 ms 1,00 Čiastočná optimalizácia 300 12,399 ms 0,0943 ms 0,0882 ms 0,45 SpartialParallelOptimizations 300 6,252 ms 0,0211 ms 0,0187 ms 0,23 SpartialVectorOptimizations 300 3,056 ms 0,0301 ms 0,0281 ms 0,11 Základná línia 600 217,897 ms 1,6415 ms 1,5355 ms 1,00 Čiastočná optimalizácia 600 99,122 ms 0,8230 ms 0,7698 ms 0,45 SpartialParallelOptimizations 600 35,825 ms 0,1003 ms 0,0837 ms 0,16 SpartialVectorOptimizations 600 24,378 ms 0,4320 ms 0,4041 ms 0,11 Základná línia 1200 1 763,335 ms 7,4561 ms 6,2262 ms 1,00 Čiastočná optimalizácia 1200 766,675 ms 6,1147 ms 5,7197 ms 0,43 SpartialParallelOptimizations 1200 248,801 ms 0,6040 ms 0,5043 ms 0,14 SpartialVectorOptimizations 1200 185,628 ms 2,1240 ms 1,9868 ms 0,11 Základná línia 2400 14 533,335 ms 63,3518 ms 52,9016 ms 1,00 Čiastočná optimalizácia 2400 6 507,878 ms 28,2317 ms 26,4079 ms 0,45 SpartialParallelOptimizations 2400 2 076,403 ms 20,8320 ms 19,4863 ms 0,14 SpartialVectorOptimizations 2400 2 568,676 ms 31,7359 ms 29,6858 ms 0,18 Základná línia 4800 119 768,219 ms 181,5514 ms 160,9406 ms 1,00 Čiastočná optimalizácia 4800 55 590,374 ms 414,6051 ms 387,8218 ms 0,46 SpartialParallelOptimizations 4800 15 614,506 ms 115,6996 ms 102,5647 ms 0,13 SpartialVectorOptimizations 4800 18 257,991 ms 84,5978 ms 79,1329 ms 0,15 Z výsledku je zrejmé, že vektorizácia výrazne skrátila čas výpočtu – 3 až 4 krát v porovnaní s neparalelnou verziou ( ). Zaujímavým momentom je, že vektorizovaná verzia tiež prekonáva paralelnú verziu ( ) na menších grafoch (menej ako 2400 vrcholov). SpartialOptimisation SpartialParallelOptimisations A v neposlednom rade – spojme vektorizáciu a paralelizmus! Ak vás zaujíma praktická aplikácia vektorizácie, odporúčam vám prečítať si sériu príspevkov od , kde vektorizoval (tieto výsledky sa neskôr ocitli v aktualizácii pre Garbage Collector v ). Dana Shechtera Array.Sort .NET 5 Poznajte svoju platformu a hardvér – vektorizácia a paralelizmus! Posledná implementácia kombinuje úsilie o paralelizáciu a vektorizáciu a... úprimne povedané, chýba jej individualita 🙂 Pretože... no, práve sme nahradili telo od vektorizovanou slučkou od : Parallel.For SpartialParallelOptimisations SpartialVectorOptimisations public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } } Všetky výsledky tohto príspevku sú uvedené nižšie. Ako sa očakávalo, simultánne použitie paralelizmu a vektorizácie preukázalo najlepšie výsledky, ktoré skrátilo čas výpočtu až 25-krát (pre grafy s 1200 vrcholmi) v porovnaní s počiatočnou implementáciou. Metóda sz Priemerná Chyba StdDev Pomer Základná línia 300 27,525 ms 0,1937 ms 0,1617 ms 1,00 Čiastočná optimalizácia 300 12,399 ms 0,0943 ms 0,0882 ms 0,45 SpartialParallelOptimizations 300 6,252 ms 0,0211 ms 0,0187 ms 0,23 SpartialVectorOptimizations 300 3,056 ms 0,0301 ms 0,0281 ms 0,11 SpartialParallelVectorOptimizations 300 3,008 ms 0,0075 ms 0,0066 ms 0,11 Základná línia 600 217,897 ms 1,6415 ms 1,5355 ms 1,00 Čiastočná optimalizácia 600 99,122 ms 0,8230 ms 0,7698 ms 0,45 SpartialParallelOptimizations 600 35,825 ms 0,1003 ms 0,0837 ms 0,16 SpartialVectorOptimizations 600 24,378 ms 0,4320 ms 0,4041 ms 0,11 SpartialParallelVectorOptimizations 600 13,425 ms 0,0319 ms 0,0283 ms 0,06 Základná línia 1200 1 763,335 ms 7,4561 ms 6,2262 ms 1,00 Čiastočná optimalizácia 1200 766,675 ms 6,1147 ms 5,7197 ms 0,43 SpartialParallelOptimizations 1200 248,801 ms 0,6040 ms 0,5043 ms 0,14 SpartialVectorOptimizations 1200 185,628 ms 2,1240 ms 1,9868 ms 0,11 SpartialParallelVectorOptimizations 1200 76,770 ms 0,3021 ms 0,2522 ms 0,04 Základná línia 2400 14 533,335 ms 63,3518 ms 52,9016 ms 1,00 Čiastočná optimalizácia 2400 6 507,878 ms 28,2317 ms 26,4079 ms 0,45 SpartialParallelOptimizations 2400 2 076,403 ms 20,8320 ms 19,4863 ms 0,14 SpartialVectorOptimizations 2400 2 568,676 ms 31,7359 ms 29,6858 ms 0,18 SpartialParallelVectorOptimizations 2400 1 281,877 ms 25,1127 ms 64,8239 ms 0,09 Základná línia 4800 119 768,219 ms 181,5514 ms 160,9406 ms 1,00 Čiastočná optimalizácia 4800 55 590,374 ms 414,6051 ms 387,8218 ms 0,46 SpartialParallelOptimizations 4800 15 614,506 ms 115,6996 ms 102,5647 ms 0,13 SpartialVectorOptimizations 4800 18 257,991 ms 84,5978 ms 79,1329 ms 0,15 SpartialParallelVectorOptimizations 4800 12 785,824 ms 98,6947 ms 87,4903 ms 0,11 Záver V tomto príspevku sme sa mierne ponorili do problému najkratšej cesty všetkých párov a implementovali sme Floyd-Warshallov algoritmus v C#, aby sme ho vyriešili. Aktualizovali sme tiež našu implementáciu, aby rešpektovala dáta a využívala nízkoúrovňové funkcie .NET a hardvéru. V tomto príspevku sme hrali „all in“. V aplikáciách v reálnom živote je však dôležité mať na pamäti – nie sme sami. Hardcore paralelizácia môže výrazne znížiť výkon systému a spôsobiť viac škody ako úžitku. Vektorizácia na druhej strane je o niečo bezpečnejšia, ak sa vykonáva spôsobom nezávislým od platformy. Pamätajte, že niektoré vektorové inštrukcie môžu byť dostupné len na určitých CPU. Dúfam, že sa vám čítanie páčilo a zabavili ste sa pri „vytláčaní“ niektorých kúskov výkonu z algoritmu len s TROCH cyklami 🙂 Referencie Floyd, RW Algorithm 97: Najkratšia cesta / RW Floyd // Komunikácia ACM. – 1962. – Sv. 5, №. 6. – S. 345-. Ingerman, PZ Algoritmus 141: Matica cesty / PZ Ingerman // Komunikácia ACM. – 1962. – Sv. 5, №. 11. – S. 556.