Každý z nás mnohokrát denne rieši problém „ najkratšej cesty “. Samozrejme neúmyselne. Riešime to cestou do práce alebo keď si prezeráme internet alebo keď si ukladáme veci na stôl.
Znie to trochu... priveľa? Poďme to zistiť.
Predstavte si, že ste sa rozhodli stretnúť sa s priateľmi, no... povedzme v kaviarni. Najprv musíte nájsť cestu (alebo cestu) do kaviarne, takže začnete hľadať dostupnú verejnú dopravu (ak idete pešo) alebo trasy a ulice (ak idete autom). Hľadáte trasu z vašej aktuálnej polohy do kaviarne, ale nie „akúkoľvek“ trasu – najkratšiu alebo najrýchlejšiu.
Tu je ešte jeden príklad, ktorý nie je taký zrejmý ako ten predchádzajúci. Počas práce sa rozhodnete ísť „skratkou“ cez bočnú ulicu, pretože dobre... je to „skratka“ a je „rýchlejšie“ ísť touto cestou. Ale ako ste vedeli, že táto „skratka“ je rýchlejšia? Na základe osobných skúseností ste vyriešili problém „najkratšej cesty“ a vybrali ste si trasu, ktorá vedie cez bočnú ulicu.
V oboch príkladoch je „najkratšia“ trasa určená buď vzdialenosťou, alebo časom potrebným na to, aby ste sa dostali z jedného miesta na druhé. Cestovné príklady sú veľmi prirodzenými aplikáciami teórie grafov a zvlášť problému „najkratšej cesty“. Čo však príklad s usporiadaním vecí na stole? V tomto prípade môže „najkratšia“ predstavovať množstvo alebo zložitosť akcií, ktoré musíte vykonať, aby ste získali napríklad hárok papiera: otvoriť stôl, otvoriť priečinok, vziať hárok papiera alebo vziať hárok papiera priamo zo stola. .
Všetky vyššie uvedené príklady predstavujú problém nájdenia najkratšej cesty medzi dvoma vrcholmi v grafe (trasa alebo cesta medzi dvoma miestami, množstvo akcií alebo zložitosť získania listu papiera z jedného alebo druhého miesta). Táto trieda problémov s najkratšou cestou sa nazýva SSSP (Single Source Shortest Path) a základným algoritmom na riešenie týchto problémov je Dijkstrov algoritmus , ktorý má výpočtovú zložitosť O(n^2)
.
Niekedy však potrebujeme nájsť všetky najkratšie cesty medzi všetkými vrcholmi. Zvážte nasledujúci príklad: vytvárate pre vás mapu pravidelných pohybov medzi domovom , prácou a divadlom . V tomto scenári budete mať 6 trás: work ⭢ home
, home ⭢ work
, work ⭢ theatre
, theatre ⭢ work
, home ⭢ theatre
a theatre ⭢ home
(opačné trasy sa môžu líšiť napríklad z dôvodu jednosmerných ciest) .
Pridaním ďalšieho miesta na mapu dôjde k výraznému nárastu kombinácií – podľa permutácií n vzorca kombinatoriky to možno vypočítať ako:
A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2)
Čo nám dáva 12 trás pre 4 miesta a 90 trás pre 10 miest (čo je pôsobivé). Poznámka... toto sa neberie do úvahy medziľahlé body medzi miestami, tj aby ste sa dostali z domu do práce, musíte prejsť 4 ulice, prejsť popri rieke a prejsť cez most. Ak si predstavíme, niektoré cesty môžu mať spoločné medziľahlé body... no... výsledkom bude veľmi veľký graf s množstvom vrcholov, kde každý vrchol bude predstavovať buď miesto, alebo významný medziľahlý bod. Trieda problémov, kde potrebujeme nájsť všetky najkratšie cesty medzi všetkými pármi vrcholov v grafe, sa nazýva APSP (All Pairs Shortest Paths) a základným algoritmom na riešenie týchto problémov je Floyd-Warshallov algoritmus , ktorý má O(n^3)
výpočtová zložitosť.
A toto je algoritmus, ktorý dnes implementujeme 🙂
Floyd-Warshallov algoritmus nájde všetky najkratšie cesty medzi každým párom vrcholov v grafe. Algoritmy publikoval Robert Floyd v [1] (ďalšie podrobnosti nájdete v časti „Odkazy“). V tom istom roku Peter Ingerman v [2] opísal modernú implementáciu algoritmu vo forme troch vnorených slučiek for
:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments
Ak ste nikdy nezmenili prácu s grafom reprezentovaným vo forme matice, môže byť ťažké pochopiť, čo robí vyššie uvedený algoritmus. Aby sme sa uistili, že sme na rovnakej stránke, pozrime sa na to, ako môže byť graf reprezentovaný vo forme matice a prečo je takáto reprezentácia užitočná pri riešení problému s najkratšou cestou.
Obrázok nižšie ilustruje orientovaný, vážený graf 5 vrcholov. Vľavo je graf prezentovaný vo vizuálnej forme, ktorá pozostáva z kruhov a šípok, kde každý kruh predstavuje vrchol a šípka predstavuje hranu so smerom. Číslo vo vnútri kruhu zodpovedá číslu vrcholu a číslo nad okrajom zodpovedá hmotnosti hrany. Na pravej strane je rovnaký graf prezentovaný vo forme matice váh. Váhová matica je forma matice susednosti , kde každá bunka matice obsahuje „váhu“ – vzdialenosť medzi vrcholom i
(riadok) a vrcholom j
(stĺpec). Váhová matica neobsahuje informácie o „ceste“ medzi vrcholmi (zoznam vrcholov, cez ktoré sa dostanete z i
do j
) – iba váhu (vzdialenosť) medzi týmito vrcholmi.
V matici váh môžeme vidieť, že hodnoty buniek sa rovnajú váham medzi susednými vrcholmi. Preto, ak si prezrieme cesty z vrcholu 0
(riadok 0
), uvidíme, že ... existuje len jedna cesta – od 0
do 1
. Ale na vizuálnej reprezentácii môžeme jasne vidieť cesty z vrcholu 0
k vrcholom 2
a 3
(cez vrchol 1
). Dôvod je jednoduchý – v počiatočnom stave obsahuje matica váh iba vzdialenosť medzi susednými vrcholmi. Na nájdenie zvyšku však stačí len táto informácia.
Pozrime sa, ako to funguje. Venujte pozornosť bunke W[0, 1]
. Jeho hodnota udáva, že z vrcholu 0
do vrcholu 1
vedie cesta s váhou rovnajúcou sa 1
(v skratke môžeme zapísať ako: 0 ⭢ 1: 1
). S týmito znalosťami teraz môžeme skenovať všetky bunky riadku 1
(ktorý obsahuje všetky váhy všetkých ciest z vrcholu 1
) a spätne portovať túto informáciu do riadku 0
, čím sa hmotnosť zvýši o 1
(hodnota W[0, 1]
).
Rovnakým postupom môžeme nájsť cesty z vrcholu 0
cez ďalšie vrcholy. Počas vyhľadávania sa môže stať, že k rovnakému vrcholu vedie viac ako jedna cesta a čo je dôležitejšie, váhy týchto ciest môžu byť rôzne. Príklad takejto situácie je znázornený na obrázku nižšie, kde vyhľadávanie od vrcholu 0
po vrchol 2
odhalilo ešte jednu cestu k vrcholu 3
s menšou hmotnosťou.
Máme dve cesty: pôvodnú cestu – 0 ⭢ 3: 4
a novú cestu, ktorú sme práve objavili – 0 ⭢ 2 ⭢ 3: 3
(pamätajte, že matica váh neobsahuje cesty, takže nevieme, ktorú vrcholy sú zahrnuté v pôvodnej ceste). Toto je moment, kedy sa rozhodneme ponechať si najkratšiu a zapíšeme 3
do bunky W[0, 3]
.
Zdá sa, že sme práve našli našu prvú najkratšiu cestu!
Kroky, ktoré sme práve videli, sú podstatou Floyd-Warshallovho algoritmu. Pozrite sa na pseudokód algoritmu ešte raz:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm
Najvzdialenejší cyklus for
on k
iteruje cez všetky vrcholy v grafe a v každej iterácii premenná k
predstavuje vrchol , cez ktorý hľadáme cesty . Vnútorný cyklus for
on i
tiež iteruje cez všetky vrcholy v grafe a pri každej iterácii i
predstavuje vrchol, z ktorého hľadáme cesty . A nakoniec najvnútornejší cyklus for
j
iteruje cez všetky vrcholy v grafe a v každej iterácii j
predstavuje vrchol , ku ktorému hľadáme cestu. V kombinácii nám to dáva nasledovné: v každej iterácii k
hľadáme cesty od všetkých vrcholov i
do všetkých vrcholov j
až po vrchol k
. Vo vnútri cyklu porovnávame cestu i ⭢ j
(predstavenú W[i, j]
) s dráhou i ⭢ k ⭢ j
(predstavenú súčtom W[I, k]
a W[k, j]
) a napíšeme najkratšiu jeden späť do W[i, j]
.
Teraz, keď rozumieme mechanike, je čas implementovať algoritmus.
Zdrojový kód a experimentálne grafy sú dostupné v úložisku na GitHub. Experimentálne grafy možno nájsť v adresári Data/Sparse-Graphs.zip
. Všetky benchmarky v tomto príspevku sú implementované v súbore APSP01.cs .
Predtým, ako sa pustíme do implementácie, musíme si ujasniť niekoľko technických momentov:
NO_EDGE = (int.MaxValue / 2) - 1
.
Teraz poďme zistiť, prečo je to tak.
Čo sa týka #1. Keď hovoríme o matriciach, bunky definujeme ako „riadky“ a „stĺpce“. Z tohto dôvodu sa zdá prirodzené predstaviť si maticu vo forme „štvorca“ alebo „obdĺžnika“ (obrázok 4a).
To však nemusí znamenať, že musíme reprezentovať maticu vo forme poľa polí (obrázok 4b), aby sme sa držali našej predstavivosti. Namiesto toho môžeme maticu reprezentovať vo forme lineárneho poľa (obrázok 4c), kde sa index každej bunky vypočíta podľa nasledujúceho vzorca:
i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row.
Lineárne pole váh matice je predpokladom pre efektívne vykonávanie Floyd-Warshallovho algoritmu. Dôvod nie je jednoduchý a podrobné vysvetlenie si zaslúži samostatný príspevok... alebo niekoľko príspevkov. V súčasnosti je však dôležité spomenúť, že takáto reprezentácia výrazne zlepšuje dátovú lokalitu , čo má v skutočnosti veľký vplyv na výkon algoritmu.
Tu vás žiadam, aby ste mi verili a mali na mysli práve túto informáciu ako podmienku, zároveň vám však odporúčam venovať chvíľu času a preštudovať si otázku a mimochodom – neverte ľuďom na internete .
- Poznámka autora
Čo sa týka #2. Ak sa bližšie pozriete na pseudokód algoritmu, nenájdete žiadne kontroly týkajúce sa existencie cesty medzi dvoma vrcholmi. Namiesto toho pseudokód jednoducho použije funkciu min()
. Dôvod je jednoduchý – pôvodne, ak medzi vrcholmi neexistuje žiadna cesta, je hodnota bunky nastavená na infinity
a vo všetkých jazykoch, okrem JavaScriptu, sú všetky hodnoty menšie ako infinity
. V prípade celých čísel môže byť lákavé použiť int.MaxValue
ako hodnotu „bez cesty“. To však povedie k pretečeniu celého čísla v prípadoch, keď hodnoty oboch ciest i ⭢ k
a k ⭢ j
budú rovné int.MaxValue
. Preto používame hodnotu, ktorá je o jednu menšia ako polovica int.MaxValue
.
Ahoj! Ale prečo nemôžeme pred vykonaním akýchkoľvek výpočtov skontrolovať, či cesta existuje. Napríklad porovnaním ciest oboch s 0 (ak berieme nulu ako hodnotu „bez cesty“).
- Premýšľavý čitateľ
Je to skutočne možné, ale nanešťastie to povedie k výraznému zníženiu výkonu. Stručne povedané, CPU vedie štatistiku výsledkov hodnotenia pobočky napr. keď sa niektoré z výrokov if
vyhodnotia ako true
alebo false
. Túto štatistiku používa na spustenie kódu „štatisticky predpovedanej vetvy“ vopred pred vyhodnotením skutočného príkazu if
(toto sa nazýva špekulatívne vykonávanie ), a preto vykoná kód efektívnejšie. Keď je však predikcia CPU nepresná, spôsobuje to značnú stratu výkonu v porovnaní so správnou predikciou a bezpodmienečným vykonávaním, pretože CPU sa musí zastaviť a vypočítať správnu vetvu.
Pretože pri každej iterácii k
aktualizujeme významnú časť matice váh, štatistika vetvy CPU sa stáva zbytočnou, pretože neexistuje vzor kódu, všetky vetvy sú založené výlučne na údajoch. Takže takáto kontrola bude mať za následok značné množstvo nesprávnych predpovedí pobočiek .
Tu vás tiež žiadam, aby ste mi (zatiaľ) verili a potom venovali nejaký čas štúdiu témy.
Uhh, zdá sa, že sme skončili s teoretickou časťou – poďme implementovať algoritmus (túto implementáciu označujeme ako Baseline
):
public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Vyššie uvedený kód je takmer identickou kópiou vyššie uvedeného pseudokódu s jedinou výnimkou – namiesto Math.Min()
používame if
na aktualizáciu bunky len v prípade potreby.
Ahoj! Moment, nebol si to ty, kto práve napísal veľa slov o tom, prečo tu nie je ak dobré a o pár riadkov neskôr si sami predstavíme ak?
- Premýšľavý čitateľ
Dôvod je jednoduchý. V momente písania JIT vydáva takmer ekvivalentný kód pre implementácie if
aj Math.Min
. Môžete si to skontrolovať podrobne na adrese sharplab.io, ale tu sú úryvky tiel hlavných slučiek:
// // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop
Môžete vidieť, bez ohľadu na to, či používame if
alebo Math.Min
stále existuje podmienená kontrola. Avšak v prípade if
nie je zbytočné písať.
Teraz, keď sme dokončili implementáciu, je čas spustiť kód a zistiť, aké je to rýchle?!
Správnosť kódu si môžete overiť sami spustením testov v úložisku .
Na testovanie kódu používam Benchmark.NET (verzia 0.12.1). Všetky grafy použité v benchmarkoch sú orientované, acyklické grafy s 300, 600, 1200, 2400 a 4800 vrcholmi. Počet hrán v grafoch je okolo 80 % možného maxima, čo pre orientované, acyklické grafy možno vypočítať ako:
var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph.
Poďme rock!
Tu sú výsledky benchmarkov spustených na mojom PC (Windows 10.0.19042, Intel Core i7-7700 CPU 3,60 GHz (Kaby Lake) / 8 logických procesorov / 4 jadrá):
Metóda | Veľkosť | Priemerná | Chyba | StdDev |
---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms |
Z výsledkov, ktoré môžeme vidieť, čas výpočtu dramaticky rastie v porovnaní s veľkosťou grafu – pre graf s 300 vrcholmi to trvalo 27 milisekúnd, pre graf s 2400 vrcholmi – 14,5 sekundy a pre graf 4800 – 119 sekúnd, čo je takmer 2 minúty !
Pri pohľade na kód algoritmu môže byť ťažké si to predstaviť, existuje niečo, čo môžeme urobiť na urýchlenie výpočtov... pretože dobre... sú tam TRI slučky, len TRI slučky.
Ako to však už býva – možnosti sú skryté v detailoch 🙂
Floyd-Warshallov algoritmus je základný algoritmus na riešenie problému najkratšej cesty všetkých párov, najmä pokiaľ ide o husté alebo úplné grafy (pretože algoritmus hľadá cesty medzi všetkými pármi vrcholov).
V našich experimentoch však používame orientované, acyklické grafy, ktoré majú úžasnú vlastnosť – ak existuje cesta z vrcholu 1
do vrcholu 2
, potom z vrcholu 2
do vrcholu 1
žiadna cesta nevedie. Pre nás to znamená, že existuje veľa nesusediacich vrcholov, ktoré môžeme preskočiť, ak neexistuje cesta z i
do k
(túto implementáciu označujeme ako SpartialOptimisation
).
public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Tu sú výsledky predchádzajúcich ( Baseline
) a súčasných ( SpartialOptimisation
) implementácií na rovnakej sade grafov (najrýchlejšie výsledky sú zvýraznené tučným písmom ):
Metóda | Veľkosť | Priemerný | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
Celkom pôsobivé!
Skrátili sme čas výpočtu na polovicu! Samozrejme, čím je graf hustejší, tým menšie bude zrýchlenie. Toto je však jedna z optimalizácií, ktorá sa hodí, ak vopred viete, s akou triedou údajov máte pracovať.
Môžeme urobiť viac? 🙂
Môj počítač je vybavený procesorom Inter Core i7-7700 CPU 3.60GHz (Kaby Lake)
s 8 logickými procesormi ( HW ) alebo 4 jadrami s technológiou Hyper-Threading . Mať viac ako jedno jadro je ako mať viac „náhradných rúk“, ktoré môžeme dať do práce. Pozrime sa teda, ktorú časť práce možno efektívne rozdeliť medzi viacerých pracovníkov a potom ju paralelne uviesť.
Slučky sú vždy najzrejmejším kandidátom na paralelizáciu, pretože vo väčšine prípadov sú iterácie nezávislé a môžu sa vykonávať súčasne. V algoritme máme tri slučky, ktoré by sme mali analyzovať a zistiť, či existujú nejaké závislosti, ktoré nám bránia v ich paralelizácii.
Začnime od for of k
cyklu. V každej iteračnej slučke vypočítava cesty z každého vrcholu do každého vrcholu cez vrchol k
. Nové a aktualizované cesty sú uložené v matici váh. Pri pohľade na iterácie si môžete všimnúť – možno ich vykonať v akomkoľvek poradí: 0, 1, 2, 3 alebo 3, 1, 2, 0 bez ovplyvnenia výsledku. Stále sa však musia vykonávať postupne, inak niektoré z iterácií nebudú môcť používať nové alebo aktualizované cesty, pretože ešte nebudú zapísané do matice váh. Takáto nekonzistentnosť určite zničí výsledky. Musíme teda hľadať ďalej.
Ďalším kandidátom je slučka for of i
. V každej iteračnej slučke načíta cestu z vrcholu i
do vrcholu k
(bunka: W[i, k]
), cestu z vrcholu k
do vrcholu j
(bunka: W[k, j
]) a potom skontroluje, či známa cesta z i
do j
(bunka: W[i, j]
) je kratšia ako cesta i ⭢ k ⭢ j
(súčet: W[i, k]
+ W[k, j]
). Ak sa pozriete bližšie na prístupový vzor, môžete si všimnúť – pri každej iterácii i
slučka číta dáta z k
riadku a aktualizovaného i
riadku, čo v podstate znamená – iterácie sú nezávislé a môžu sa vykonávať nielen v akomkoľvek poradí, ale aj paralelne!
Vyzerá to sľubne, tak to poďme implementovať (túto implementáciu označujeme ako SpartialParallelOptimisations
).
for of j
slučka môže byť tiež paralelná. Paralelizácia najvnútornejšieho cyklu je však v tomto prípade veľmi neefektívna. Môžete si to overiť sami vykonaním niekoľkých jednoduchých zmien v zdrojovom kóde .
- Poznámka autora
public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
Tu sú výsledky implementácií Baseline
, SpartialOptimisation
a SpartialParallelOptimisations
na rovnakej sade grafov (paralelizácia sa vykonáva pomocou triedy Parallel ):
Metóda | Veľkosť | Priemerná | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6,252 ms | 0,0211 ms | 0,0187 ms | 0,23 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35,825 ms | 0,1003 ms | 0,0837 ms | 0,16 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
SpartialParallelOptimizations | 1200 | 248,801 ms | 0,6040 ms | 0,5043 ms | 0,14 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2 076,403 ms | 20,8320 ms | 19,4863 ms | 0,14 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
SpartialParallelOptimizations | 4800 | 15 614,506 ms | 115,6996 ms | 102,5647 ms | 0,13 |
Z výsledkov môžeme vidieť, že paralelizácia cyklu for of i
skrátila čas výpočtu 2-4 krát v porovnaní s predchádzajúcou implementáciou ( SpartialOptimisation
)! Je to veľmi pôsobivé, ale je dôležité si uvedomiť, že paralelizácia čistých výpočtov spotrebúva všetky dostupné výpočtové zdroje, čo môže viesť k nedostatku zdrojov iných aplikácií v systéme.
Ako asi tušíte – toto nie je koniec 🙂
Vektorizácia je transformácia kódu fungujúceho na jednom prvku na kód fungujúci na viacerých prvkoch súčasne.
Môže to znieť ako zložitá záležitosť, tak sa pozrime, ako to funguje na jednoduchom príklade:
var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i];
Priveľmi zjednodušeným spôsobom možno vykonanie iterácie 0
vyššie uvedeného cyklu for
na úrovni CPU znázorniť takto:
Tu je to, čo sa deje. CPU načítava hodnoty polí a
a b
z pamäte do registrov CPU (jeden register môže obsahovať práve jednu hodnotu). Potom CPU vykoná operáciu skalárneho sčítania na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa c
. Tento proces sa opakuje štyrikrát pred koncom slučky.
Vektorizácia znamená použitie špeciálnych CPU registrov – vektorových alebo SIMD (single inštrukcie multiple data) registrov a zodpovedajúcich CPU inštrukcií na vykonávanie operácií s viacerými hodnotami poľa naraz:
Tu je to, čo sa deje. CPU načíta hodnoty polí a
a b
z pamäte do registrov CPU (tentoraz však jeden vektorový register môže obsahovať dve hodnoty poľa). Potom CPU vykoná operáciu sčítania vektorov na týchto registroch a zapíše výsledok späť do hlavnej pamäte – priamo do poľa c
. Pretože pracujeme s dvoma hodnotami naraz, tento proces sa namiesto štyroch opakuje dvakrát.
Na implementáciu vektorizácie v .NET môžeme použiť – typy Vector a Vector<T> (môžeme použiť aj typy z menného priestoru System.Runtime.Intrinsics , sú však trochu pokročilé pre aktuálnu implementáciu, takže ich nebudem používať, ale cítim môžete si ich vyskúšať sami):
public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Vektorizovaný kód môže vyzerať trochu bizarne, tak si ho poďme prejsť krok za krokom.
Vytvoríme vektor cesty i ⭢ k
: var ik_vec = new Vector<int>(matrix[i * sz + k])
. Výsledkom je, že ak vektor môže obsahovať štyri hodnoty typu int
a váha cesty i ⭢ k
je rovná 5, vytvoríme vektor štyroch 5 – [5, 5, 5, 5]. Ďalej pri každej iterácii súčasne vypočítame cesty z vrcholu i
k vrcholom j, j + 1, ..., j + Vector<int>.Count
.
Vlastnosť Vector<int>.Count
vráti počet prvkov typu int
, ktoré sa zmestia do vektorových registrov. Veľkosť vektorových registrov závisí od modelu CPU, takže táto vlastnosť môže vrátiť rôzne hodnoty na rôznych CPU.
- Poznámka autora
Celý proces výpočtu možno rozdeliť do troch krokov:
ij_vec
a ikj_vec
.ij_vec
a ikj_vec
a vyberte najmenšie hodnoty do nového vektora r_vec
.r_vec
späť do matice váh.
Zatiaľ čo #1 a #3 sú celkom jednoduché, #2 vyžaduje vysvetlenie. Ako už bolo spomenuté, pomocou vektorov manipulujeme s viacerými hodnotami súčasne. Preto výsledok niektorých operácií nemôže byť singulárny, tj porovnávacia operácia Vector.LessThan(ij_vec, ikj_vec)
nemôže vrátiť true
alebo false
, pretože porovnáva viacero hodnôt. Takže namiesto toho vráti nový vektor, ktorý obsahuje výsledky porovnania medzi zodpovedajúcimi hodnotami z vektorov ij_vec
a ikj_vec
( -1
, ak je hodnota z ij_vec
menšia ako hodnota z ikj_vec
a 0
, ak je to inak). Vrátený vektor (samotný o sebe) nie je veľmi užitočný, ale môžeme použiť vektorovú operáciu Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec)
na extrahovanie požadovaných hodnôt z vektorov ij_vec
a ikj_vec
do nového vektora – r_vec
. Táto operácia vráti nový vektor, kde hodnoty sú rovné menšie ako dve zodpovedajúce hodnoty vstupných vektorov, tj ak je hodnota vektora lt_vec
rovná -1
, potom operácia vyberie hodnotu z ij_vec
, v opačnom prípade vyberie hodnotu z ikj_vec
.
Okrem #2 je tu ešte jedna časť, ktorá si vyžaduje vysvetlenie – druhá, nevektorizovaná slučka. Vyžaduje sa, keď veľkosť matice váh nie je súčinom hodnoty Vector<int>.Count
. V takom prípade hlavná slučka nemôže spracovať všetky hodnoty (pretože nemôžete načítať časť vektora) a druhá, nevektorovaná, slučka vypočíta zostávajúce iterácie.
Tu sú výsledky tejto a všetkých predchádzajúcich implementácií:
Metóda | sz | Priemerná | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6,252 ms | 0,0211 ms | 0,0187 ms | 0,23 |
SpartialVectorOptimizations | 300 | 3,056 ms | 0,0301 ms | 0,0281 ms | 0,11 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35,825 ms | 0,1003 ms | 0,0837 ms | 0,16 |
SpartialVectorOptimizations | 600 | 24,378 ms | 0,4320 ms | 0,4041 ms | 0,11 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
SpartialParallelOptimizations | 1200 | 248,801 ms | 0,6040 ms | 0,5043 ms | 0,14 |
SpartialVectorOptimizations | 1200 | 185,628 ms | 2,1240 ms | 1,9868 ms | 0,11 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2 076,403 ms | 20,8320 ms | 19,4863 ms | 0,14 |
SpartialVectorOptimizations | 2400 | 2 568,676 ms | 31,7359 ms | 29,6858 ms | 0,18 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
SpartialParallelOptimizations | 4800 | 15 614,506 ms | 115,6996 ms | 102,5647 ms | 0,13 |
SpartialVectorOptimizations | 4800 | 18 257,991 ms | 84,5978 ms | 79,1329 ms | 0,15 |
Z výsledku je zrejmé, že vektorizácia výrazne skrátila čas výpočtu – 3 až 4 krát v porovnaní s neparalelnou verziou ( SpartialOptimisation
). Zaujímavým momentom je, že vektorizovaná verzia tiež prekonáva paralelnú verziu ( SpartialParallelOptimisations
) na menších grafoch (menej ako 2400 vrcholov).
A v neposlednom rade – spojme vektorizáciu a paralelizmus!
Ak vás zaujíma praktická aplikácia vektorizácie, odporúčam vám prečítať si sériu príspevkov od Dana Shechtera , kde vektorizoval Array.Sort
(tieto výsledky sa neskôr ocitli v aktualizácii pre Garbage Collector v .NET 5 ).
Posledná implementácia kombinuje úsilie o paralelizáciu a vektorizáciu a... úprimne povedané, chýba jej individualita 🙂 Pretože... no, práve sme nahradili telo Parallel.For
od SpartialParallelOptimisations
vektorizovanou slučkou od SpartialVectorOptimisations
:
public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
Všetky výsledky tohto príspevku sú uvedené nižšie. Ako sa očakávalo, simultánne použitie paralelizmu a vektorizácie preukázalo najlepšie výsledky, ktoré skrátilo čas výpočtu až 25-krát (pre grafy s 1200 vrcholmi) v porovnaní s počiatočnou implementáciou.
Metóda | sz | Priemerná | Chyba | StdDev | Pomer |
---|---|---|---|---|---|
Základná línia | 300 | 27,525 ms | 0,1937 ms | 0,1617 ms | 1,00 |
Čiastočná optimalizácia | 300 | 12,399 ms | 0,0943 ms | 0,0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6,252 ms | 0,0211 ms | 0,0187 ms | 0,23 |
SpartialVectorOptimizations | 300 | 3,056 ms | 0,0301 ms | 0,0281 ms | 0,11 |
SpartialParallelVectorOptimizations | 300 | 3,008 ms | 0,0075 ms | 0,0066 ms | 0,11 |
Základná línia | 600 | 217,897 ms | 1,6415 ms | 1,5355 ms | 1,00 |
Čiastočná optimalizácia | 600 | 99,122 ms | 0,8230 ms | 0,7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35,825 ms | 0,1003 ms | 0,0837 ms | 0,16 |
SpartialVectorOptimizations | 600 | 24,378 ms | 0,4320 ms | 0,4041 ms | 0,11 |
SpartialParallelVectorOptimizations | 600 | 13,425 ms | 0,0319 ms | 0,0283 ms | 0,06 |
Základná línia | 1200 | 1 763,335 ms | 7,4561 ms | 6,2262 ms | 1,00 |
Čiastočná optimalizácia | 1200 | 766,675 ms | 6,1147 ms | 5,7197 ms | 0,43 |
SpartialParallelOptimizations | 1200 | 248,801 ms | 0,6040 ms | 0,5043 ms | 0,14 |
SpartialVectorOptimizations | 1200 | 185,628 ms | 2,1240 ms | 1,9868 ms | 0,11 |
SpartialParallelVectorOptimizations | 1200 | 76,770 ms | 0,3021 ms | 0,2522 ms | 0,04 |
Základná línia | 2400 | 14 533,335 ms | 63,3518 ms | 52,9016 ms | 1,00 |
Čiastočná optimalizácia | 2400 | 6 507,878 ms | 28,2317 ms | 26,4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2 076,403 ms | 20,8320 ms | 19,4863 ms | 0,14 |
SpartialVectorOptimizations | 2400 | 2 568,676 ms | 31,7359 ms | 29,6858 ms | 0,18 |
SpartialParallelVectorOptimizations | 2400 | 1 281,877 ms | 25,1127 ms | 64,8239 ms | 0,09 |
Základná línia | 4800 | 119 768,219 ms | 181,5514 ms | 160,9406 ms | 1,00 |
Čiastočná optimalizácia | 4800 | 55 590,374 ms | 414,6051 ms | 387,8218 ms | 0,46 |
SpartialParallelOptimizations | 4800 | 15 614,506 ms | 115,6996 ms | 102,5647 ms | 0,13 |
SpartialVectorOptimizations | 4800 | 18 257,991 ms | 84,5978 ms | 79,1329 ms | 0,15 |
SpartialParallelVectorOptimizations | 4800 | 12 785,824 ms | 98,6947 ms | 87,4903 ms | 0,11 |
V tomto príspevku sme sa mierne ponorili do problému najkratšej cesty všetkých párov a implementovali sme Floyd-Warshallov algoritmus v C#, aby sme ho vyriešili. Aktualizovali sme tiež našu implementáciu, aby rešpektovala dáta a využívala nízkoúrovňové funkcie .NET a hardvéru.
V tomto príspevku sme hrali „all in“. V aplikáciách v reálnom živote je však dôležité mať na pamäti – nie sme sami. Hardcore paralelizácia môže výrazne znížiť výkon systému a spôsobiť viac škody ako úžitku. Vektorizácia na druhej strane je o niečo bezpečnejšia, ak sa vykonáva spôsobom nezávislým od platformy. Pamätajte, že niektoré vektorové inštrukcie môžu byť dostupné len na určitých CPU.
Dúfam, že sa vám čítanie páčilo a zabavili ste sa pri „vytláčaní“ niektorých kúskov výkonu z algoritmu len s TROCH cyklami 🙂