Svi mi rješavamo problem “ najkraćeg puta ” mnogo puta dnevno. Naravno, nenamjerno. Rješavamo to na putu do posla ili kada pretražujemo internet ili kada posložimo stvari na stolu.
Zvuči malo… previše? Hajde da saznamo.
Zamislite, odlučili ste da se nađete sa prijateljima, pa… recimo u kafiću. Prije svega, morate pronaći rutu (ili stazu) do kafića, pa počnite tražiti dostupni javni prijevoz (ako ste pješice) ili rute i ulice (ako se vozite). Tražite rutu od vaše trenutne lokacije do kafića, ali ne „bilo koju“ rutu – najkraću ili najbržu.
Evo još jednog primjera, koji nije toliko očigledan kao prethodni. U toku posla odlučite da krenete „prečicom“ kroz sporednu ulicu jer pa… to je „prečica“ i „brže“ je ići ovim putem. Ali kako ste znali da je ova „prečica“ brža? Na osnovu ličnog iskustva riješili ste problem „najkraći put“ i odabrali rutu koja ide sporednom ulicom.
U oba primjera, “najkraći” put je određen ili u udaljenosti ili vremenu potrebnom da se stigne od jednog mjesta do drugog. Putujući primjeri su vrlo prirodne primjene teorije grafova i posebno problema „najkraćeg puta“. Međutim, što je s primjerom s poslaganjem stvari na stolu? U ovom slučaju "najkraće" može predstavljati broj ili složenost radnji koje morate izvršiti da biste dobili, na primjer, list papira: otvorite sto, otvorite fasciklu, uzmite list papira nasuprot uzmite list papira direktno sa stola .
Svi gore navedeni primjeri predstavljaju problem pronalaženja najkraćeg puta između dva vrha u grafu (ruta ili putanja između dva mjesta, broj radnji ili složenost dobivanja lista papira sa jednog ili drugog mjesta). Ova klasa problema najkraćeg puta se zove SSSP (Single Source Shortest Path) i osnovni algoritam za rješavanje ovih problema je Dijkstraov algoritam , koji ima O(n^2)
računsku složenost.
Ali, ponekad moramo pronaći sve najkraće staze između svih vrhova. Razmislite o sljedećem primjeru: kreirate kartu za svoje redovno kretanje između kuće , posla i pozorišta . U ovom scenariju ćete završiti sa 6 ruta: work ⭢ home
, home ⭢ work
, work ⭢ theatre
, theatre ⭢ work
, home ⭢ theatre
i theatre ⭢ home
(obrnuti putevi mogu biti različiti zbog jednosmjernih puteva na primjer) .
Dodavanje više mjesta na kartu rezultirat će značajnim rastom kombinacija – prema permutacijama n formule kombinatorike može se izračunati kao:
A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2)
Što nam daje 12 ruta za 4 mjesta i 90 ruta za 10 mjesta (što je impresivno). Napomena… ovo je bez uzimanja u obzir međutačaka između mjesta, tj. da biste od kuće do posla morali preći 4 ulice, prošetati uz rijeku i prijeći most. Ako zamislimo, neke rute mogu imati zajedničke međutačke… pa… kao rezultat imaćemo veoma veliki graf, sa puno vrhova, gde će svaki vrh predstavljati ili mesto ili značajnu međutačku. Klasa problema, u kojoj treba da pronađemo sve najkraće putanje između svih parova vrhova u grafu, naziva se APSP (All Pairs Shortest Paths), a osnovni algoritam za rešavanje ovih problema je Floyd-Warshall algoritam , koji ima O(n^3)
složenost računanja.
A ovo je algoritam koji ćemo implementirati danas 🙂
Floyd-Warshall algoritam pronalazi sve najkraće staze između svakog para vrhova u grafu. Algoritme je objavio Robert Floyd u [1] (pogledajte odjeljak “Reference” za više detalja). Iste godine Peter Ingerman je u [2] opisao modernu implementaciju algoritma u obliku tri ugniježđene for
petlje:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments
Ako nikada niste imali promjenu u radu sa grafom predstavljenim u obliku matrice onda bi moglo biti teško razumjeti šta radi gornji algoritam. Dakle, da bismo bili sigurni da smo na istoj stranici, pogledajmo kako se graf može predstaviti u obliku matrice i zašto je takav prikaz koristan za rješavanje problema najkraće putanje.
Slika ispod ilustruje usmjereni, ponderisani graf od 5 vrhova. Na lijevoj strani, graf je predstavljen u vizualnom obliku, koji je sačinjen od krugova i strelica, gdje svaki krug predstavlja vrh, a strelica predstavlja rub sa smjerom. Broj unutar kruga odgovara broju vrha, a broj iznad ivice odgovara težini ivice. Desno je isti graf predstavljen u obliku matrice težine. Matrica težine je oblik matrice susjedstva gdje svaka ćelija matrice sadrži “težinu” – udaljenost između vrha i
(red) i temena j
(kolona). Matrica težina ne uključuje informacije o „putanju“ između vrhova (lista vrhova kroz koje dolazite od i
do j
) – samo težinu (udaljenost) između ovih vrhova.
U matrici težine možemo vidjeti da su vrijednosti ćelija jednake težinama između susjednih vrhova. Zato, ako pregledamo putanje od vrha 0
(red 0
), vidjet ćemo da ... postoji samo jedan put – od 0
do 1
. Ali, na vizuelnom prikazu možemo jasno vidjeti putanje od vrha 0
do vrhova 2
i 3
(kroz vrh 1
). Razlog za to je jednostavan – u početnom stanju matrica težine sadrži samo udaljenost između susjednih vrhova. Međutim, sama ova informacija dovoljna je da se pronađe ostatak.
Hajde da vidimo kako to radi. Obratite pažnju na ćeliju W[0, 1]
. Njegova vrijednost pokazuje da postoji put od vrha 0
do vrha 1
sa težinom jednakom 1
(ukratko možemo zapisati kao: 0 ⭢ 1: 1
). Sa ovim znanjem, sada možemo skenirati sve ćelije reda 1
(koji sadrži sve težine svih putanja od vrha 1
) i vratiti ovu informaciju u red 0
, povećavajući težinu za 1
(vrijednost W[0, 1]
).
Koristeći iste korake možemo pronaći putanje od vrha 0
kroz druge vrhove. Tokom pretrage može se desiti da postoji više putanja koja vodi do istog vrha i što je još važnije težine ovih putanja mogu biti različite. Primjer takve situacije je ilustrovan na donjoj slici, gdje je pretraživanjem od vrha 0
do temena 2
otkriven još jedan put do vrha 3
manje težine.
Imamo dva puta: originalni put – 0 ⭢ 3: 4
i novi put koji smo upravo otkrili – 0 ⭢ 2 ⭢ 3: 3
(imajte na umu, matrica težine ne sadrži putanje, tako da ne znamo koji vrhovi su uključeni u originalnu putanju). Ovo je trenutak kada donosimo odluku da zadržimo najkraću i upišemo 3
u ćeliju W[0, 3]
.
Izgleda da smo upravo pronašli naš prvi najkraći put!
Koraci koje smo upravo vidjeli su suština Floyd-Warshall algoritma. Pogledajte pseudokod algoritma još jednom:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm
Najudaljeniji ciklus for
on k
iterira preko svih vrhova u grafu i na svakoj iteraciji varijabla k
predstavlja vrh kroz koji pretražujemo putanje . Unutrašnji ciklus for
on i
iterira preko svih vrhova u grafu i na svakoj iteraciji, i
predstavlja vrh od kojeg tražimo staze . I na kraju, najdublji ciklus for
on j
iterira preko svih vrhova u grafu i na svakoj iteraciji, j
predstavlja vrh do kojeg tražimo putanju. U kombinaciji nam daje sljedeće: na svakoj iteraciji k
tražimo staze od svih vrhova i
do svih vrhova j
kroz vrh k
. Unutar ciklusa poredimo putanju i ⭢ j
(predstavljenu W[i, j]
) sa stazom i ⭢ k ⭢ j
(predstavljenu zbirom W[I, k]
i W[k, j]
) i zapisujemo najkraći jedan nazad u W[i, j]
.
Sada, kada razumijemo mehaniku, vrijeme je da implementiramo algoritam.
Izvorni kod i eksperimentalni grafovi dostupni su u spremištu na GitHubu. Eksperimentalni grafikoni se mogu naći u direktoriju Data/Sparse-Graphs.zip
. Sva mjerila u ovom postu implementirana su u APSP01.cs fajl.
Prije nego što pređemo na implementaciju, moramo razjasniti nekoliko tehničkih momenata:
NO_EDGE = (int.MaxValue / 2) - 1
.
Sada, hajde da shvatimo zašto je to tako.
Što se tiče #1. Kada govorimo o matricama, ćelije definiramo u terminima “redova” i “kolona”. Zbog toga se čini prirodnim zamisliti matricu u obliku “kvadrata” ili “pravokutnika” (Slika 4a).
Međutim, to ne znači da moramo predstaviti matricu u obliku niza nizova (Slika 4b) da bismo se pridržavali svoje mašte. Umjesto toga, možemo predstaviti matricu u obliku linearnog niza (slika 4c) gdje se indeks svake ćelije izračunava pomoću sljedeće formule:
i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row.
Linealni niz matrice težine je preduslov za efikasno izvršavanje Floyd-Warshall algoritma. Razlog tome nije jednostavan i detaljno objašnjenje zaslužuje poseban post... ili nekoliko postova. Međutim, trenutno je važno napomenuti da ovakva reprezentacija značajno poboljšava lokalizaciju podataka , što u stvari ima veliki uticaj na performanse algoritma.
Evo, molim vas da mi vjerujete i samo ovu informaciju imate na umu kao preduvjet, međutim, ujedno vam preporučujem da odvojite malo vremena i proučite pitanje, i usput – ne vjerujte ljudima na internetu .
- Napomena autora
Što se tiče #2. Ako bolje pogledate pseudokod algoritma, nećete naći nikakve provjere vezane za postojanje puta između dva vrha. Umjesto toga, pseudokod jednostavno koristi funkciju min()
. Razlog je jednostavan – prvobitno, ako ne postoji put između vrhova, vrijednost ćelije je postavljena na infinity
i u svim jezicima, osim što može biti JavaScript, sve vrijednosti su manje od infinity
. U slučaju cijelih brojeva, moglo bi biti primamljivo koristiti int.MaxValue
kao vrijednost “bez puta”. Međutim, ovo će dovesti do prekoračenja cijelog broja u slučajevima kada će vrijednosti oba puta i ⭢ k
i k ⭢ j
biti jednake int.MaxValue
. Zato koristimo vrijednost koja je jedna manja od polovine int.MaxValue
.
Hej! Ali zašto ne možemo jednostavno provjeriti postoji li putanja prije nego što izvršimo bilo kakve kalkulacije. Na primjer, usporedbom putanja oba sa 0 (ako uzmemo nulu kao vrijednost "bez puta").
- Promišljen čitalac
To je zaista moguće, ali nažalost to će dovesti do značajne kazne za učinak. Ukratko, CPU vodi statistiku rezultata evaluacije grane npr. kada neki od if
iskaza procijene na true
ili false
. Koristi ovu statistiku za izvršavanje koda “statistički predviđene grane” unaprijed prije nego što se procijeni stvarna if
izjava (ovo se zove spekulativno izvršenje ) i stoga efikasnije izvršava kod. Međutim, kada je predviđanje CPU-a netačno, to uzrokuje značajan gubitak performansi u poređenju sa ispravnim predviđanjem i bezuslovnim izvršenjem jer CPU mora da se zaustavi i izračuna ispravnu granu.
Budući da na svakoj iteraciji k
ažuriramo značajan dio matrice težine, statistika grananja CPU-a postaje beskorisna jer ne postoji obrazac koda, sve grane se zasnivaju isključivo na podacima. Dakle, takva provjera će rezultirati značajnom količinom pogrešnih predviđanja grana .
Ovdje vas također molim da mi vjerujete (za sada) i onda odvojite malo vremena za proučavanje teme.
Uhh, izgleda da smo gotovi s teorijskim dijelom – implementirajmo algoritam (ovu implementaciju označavamo kao Baseline
):
public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Gornji kod je gotovo identična kopija prethodno spomenutog pseudokoda sa jednim izuzetkom – umjesto Math.Min()
koristimo if
da ažuriramo ćeliju samo kada je to potrebno.
Hej! Čekaj malo, nisi li ti upravo napisao puno riječi o tome zašto ako ovdje nije dobro, a nekoliko redova kasnije i sami uvodimo ako?
- Promišljen čitalac
Razlog je jednostavan. U trenutku pisanja JIT emituje skoro ekvivalentan kod za if
i Math.Min
implementacije. Možete ga detaljno provjeriti na Sharplab.io, ali evo isječaka tijela glavne petlje:
// // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop
Možda ćete vidjeti, bez obzira da li koristimo if
ili Math.Min
još uvijek postoji uvjetna provjera. Međutim, u slučaju if
nema nepotrebnog pisanja.
Sada kada smo završili sa implementacijom, vrijeme je da pokrenemo kod i vidimo koliko je brz?!
Možete sami provjeriti ispravnost koda pokretanjem testova u spremištu .
Koristim Benchmark.NET (verzija 0.12.1) za benchmark kod. Svi grafovi koji se koriste u benčmarkovima su usmjereni, aciklični grafovi od 300, 600, 1200, 2400 i 4800 vrhova. Broj ivica u grafovima je oko 80% mogućeg maksimuma koji se za usmjerene, aciklične grafove može izračunati na sljedeći način:
var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph.
Let's rock!
Evo rezultata benchmarka koji su pokrenuti na mom računaru (Windows 10.0.19042, Intel Core i7-7700 CPU 3,60 Ghz (Kaby Lake) / 8 logičkih procesora / 4 jezgra):
Metoda | Veličina | Zlo | Greška | StdDev |
---|---|---|---|---|
Polazna linija | 300 | 27.525 ms | 0.1937 ms | 0.1617 ms |
Polazna linija | 600 | 217.897 ms | 1.6415 ms | 1.5355 ms |
Polazna linija | 1200 | 1,763,335 ms | 7.4561 ms | 6.2262 ms |
Polazna linija | 2400 | 14,533,335 ms | 63.3518 ms | 52.9016 ms |
Polazna linija | 4800 | 119,768,219 ms | 181.5514 ms | 160.9406 ms |
Iz rezultata koje možemo vidjeti, vrijeme izračuna drastično raste u odnosu na veličinu grafa – za graf od 300 vrhova bilo je potrebno 27 milisekundi, za graf od 2400 vrhova – 14,5 sekundi, a za graf od 4800 – 119 sekundi što je skoro 2 minute !
Gledajući kod algoritma to bi moglo biti teško zamisliti, postoji nešto što možemo učiniti da ubrzamo proračune... jer pa... postoje TRI petlje, samo TRI petlje.
Međutim, kako to obično biva – mogućnosti se kriju u detaljima 🙂
Floyd-Warshall algoritam je osnovni algoritam za rješavanje problema najkraće putanje svih parova, posebno kada su u pitanju gusti ili potpuni grafovi (jer algoritam pretražuje putanje između svih parova vrhova).
Međutim, u našim eksperimentima koristimo usmjerene, aciklične grafove, koji imaju divno svojstvo – ako postoji put od vrha 1
do vrha 2
, onda nema puta od vrha 2
do vrha 1
. Za nas, to znači da postoji mnogo nesusednih vrhova koje možemo preskočiti ako ne postoji put od i
do k
(ovu implementaciju označavamo kao SpartialOptimisation
).
public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Evo rezultata prethodnih ( Baseline
) i trenutnih ( SpartialOptimisation
) implementacija na istom skupu grafikona (najbrži rezultati su istaknuti podebljanim slovima ):
Metoda | Veličina | Zlo | Greška | StdDev | Ratio |
---|---|---|---|---|---|
Polazna linija | 300 | 27.525 ms | 0.1937 ms | 0.1617 ms | 1.00 |
SpartialOptimization | 300 | 12.399 ms | 0.0943 ms | 0.0882 ms | 0,45 |
Polazna linija | 600 | 217.897 ms | 1.6415 ms | 1.5355 ms | 1.00 |
SpartialOptimization | 600 | 99.122 ms | 0.8230 ms | 0.7698 ms | 0,45 |
Polazna linija | 1200 | 1,763,335 ms | 7.4561 ms | 6.2262 ms | 1.00 |
SpartialOptimization | 1200 | 766.675 ms | 6.1147 ms | 5.7197 ms | 0.43 |
Polazna linija | 2400 | 14,533,335 ms | 63.3518 ms | 52.9016 ms | 1.00 |
SpartialOptimization | 2400 | 6,507.878 ms | 28.2317 ms | 26.4079 ms | 0,45 |
Polazna linija | 4800 | 119,768,219 ms | 181.5514 ms | 160.9406 ms | 1.00 |
SpartialOptimization | 4800 | 55,590,374 ms | 414.6051 ms | 387.8218 ms | 0.46 |
Sasvim impresivno!
Smanjili smo vrijeme obračuna za pola! Naravno, što je graf gušći, to će biti manja brzina. Međutim, ovo je jedna od optimizacija koja je zgodna ako unaprijed znate s kojom klasom podataka namjeravate raditi.
Možemo li više? 🙂
Moj računar je opremljen Inter Core i7-7700 CPU 3.60GHz (Kaby Lake)
procesorom sa 8 logičkih procesora ( HW ) ili 4 jezgra sa Hyper-Threading tehnologijom. Imati više od jednog jezgra je kao imati više “rezervnih ruku” koje možemo staviti na posao. Dakle, da vidimo koji dio posla se može efikasno podijeliti između više radnika i onda ga paralelizirati.
Petlje su uvijek najočitiji kandidat za paralelizaciju jer su u većini slučajeva iteracije nezavisne i mogu se izvršavati istovremeno. U algoritmu imamo tri petlje koje treba analizirati i vidjeti postoje li ovisnosti koje nas sprječavaju da ih paraleliziramo.
Počnimo od for of k
petlje. Na svakoj iteraciji petlja izračunava putanje od svakog vrha do svakog vrha kroz vrh k
. Nove i ažurirane staze se pohranjuju u matricu težine. Gledajući iteracije možete primijetiti – mogu se izvršiti bilo kojim redoslijedom: 0, 1, 2, 3 ili 3, 1, 2, 0 bez utjecaja na rezultat. Međutim, i dalje se moraju izvršavati u nizu, inače neke od iteracija neće moći koristiti nove ili ažurirane putanje jer još neće biti upisane u matricu težine. Takva nedosljednost će definitivno uništiti rezultate. Zato moramo nastaviti da tražimo.
Sljedeći kandidat je for of i
petlju. Na svakoj iteraciji petlja čita putanju od vrha i
do temena k
(ćelija: W[i, k]
), putanju od vrha k
do temena j
(ćelija: W[k, j
]) i zatim provjerava da li je poznata putanja od i
do j
(ćelija: W[i, j]
) je kraća od i ⭢ k ⭢ j
putanje (zbir: W[i, k]
+ W[k, j]
). Ako bolje pogledate obrazac pristupa, mogli biste primijetiti – na svakoj iteraciji i
petlja čita podatke iz k
reda i ažuriranog i
reda što u osnovi znači – iteracije su nezavisne i mogu se izvršavati ne samo bilo kojim redoslijedom već i paralelno!
Ovo izgleda obećavajuće, pa hajde da ga implementiramo (ovu implementaciju označavamo kao SpartialParallelOptimisations
).
for of j
petlja također može biti paralelna. Međutim, paralelizacija najdubljeg ciklusa u ovom slučaju je vrlo neefikasna. To možete sami provjeriti tako što ćete napraviti nekoliko jednostavnih promjena u izvornom kodu .
- Napomena autora
public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
Evo rezultata implementacije Baseline
, SpartialOptimisation
i SpartialParallelOptimisations
na istom skupu grafova (paralelizacija se radi pomoću klase Parallel ):
Metoda | Veličina | Zlo | Greška | StdDev | Ratio |
---|---|---|---|---|---|
Polazna linija | 300 | 27.525 ms | 0.1937 ms | 0.1617 ms | 1.00 |
SpartialOptimization | 300 | 12.399 ms | 0.0943 ms | 0.0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6.252 ms | 0.0211 ms | 0.0187 ms | 0.23 |
Polazna linija | 600 | 217.897 ms | 1.6415 ms | 1.5355 ms | 1.00 |
SpartialOptimization | 600 | 99.122 ms | 0.8230 ms | 0.7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35.825 ms | 0.1003 ms | 0.0837 ms | 0.16 |
Polazna linija | 1200 | 1,763,335 ms | 7.4561 ms | 6.2262 ms | 1.00 |
SpartialOptimization | 1200 | 766.675 ms | 6.1147 ms | 5.7197 ms | 0.43 |
SpartialParallelOptimizations | 1200 | 248.801 ms | 0.6040 ms | 0.5043 ms | 0.14 |
Polazna linija | 2400 | 14,533,335 ms | 63.3518 ms | 52.9016 ms | 1.00 |
SpartialOptimization | 2400 | 6,507.878 ms | 28.2317 ms | 26.4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2,076.403 ms | 20.8320 ms | 19.4863 ms | 0.14 |
Polazna linija | 4800 | 119,768,219 ms | 181.5514 ms | 160.9406 ms | 1.00 |
SpartialOptimization | 4800 | 55,590,374 ms | 414.6051 ms | 387.8218 ms | 0.46 |
SpartialParallelOptimizations | 4800 | 15,614,506 ms | 115.6996 ms | 102.5647 ms | 0.13 |
Iz rezultata možemo vidjeti da je paralelizacija for of i
petlje smanjila vrijeme izračunavanja za 2-4 puta u odnosu na prethodnu implementaciju ( SpartialOptimisation
)! Ovo je vrlo impresivno, međutim, važno je zapamtiti, paralelizacija čistih računanja troši sve dostupne računarske resurse što može dovesti do iscrpljivanja resursa drugih aplikacija u sistemu.
Kao što možete pretpostaviti – ovo nije kraj 🙂
Vektorizacija je transformacija koda koji radi na jednom elementu u kod koji djeluje na više elemenata istovremeno.
Ovo bi moglo zvučati kao složena stvar, pa da vidimo kako to funkcionira na jednostavnom primjeru:
var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i];
Na previše pojednostavljen način, izvršenje iteracije 0
gornje for
petlje na nivou CPU-a može se ilustrovati na sljedeći način:
Evo šta se dešava. CPU učitava vrijednosti nizova a
i b
iz memorije u CPU registre (jedan registar može sadržavati točno jednu vrijednost). Zatim CPU izvršava skalarnu operaciju sabiranja na ovim registrima i upisuje rezultat nazad u glavnu memoriju – pravo u niz c
. Ovaj proces se ponavlja četiri puta prije nego što se petlja završi.
Vektorizacija znači korištenje posebnih CPU registara – vektorskih ili SIMD (single istruction multiple data) registara, i odgovarajućih CPU instrukcija za izvršavanje operacija na više vrijednosti niza odjednom:
Evo šta se dešava. CPU učitava vrijednosti nizova a
i b
iz memorije u CPU registre (međutim, ovaj put, jedan vektorski registar može držati dvije vrijednosti niza). Zatim CPU izvršava operaciju vektorskog sabiranja na ovim registrima i upisuje rezultat nazad u glavnu memoriju – pravo u niz c
. Budući da radimo na dvije vrijednosti odjednom, ovaj proces se ponavlja dva puta umjesto četiri.
Za implementaciju vektorizacije u .NET možemo koristiti – Vector i Vector<T> tipove (također možemo koristiti tipove iz imenskog prostora System.Runtime.Intrinsics , međutim oni su malo napredniji za trenutnu implementaciju, tako da ih neću koristiti, ali osjećam slobodno ih isprobajte sami):
public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
Vektorizovani kod može izgledati pomalo bizarno, pa idemo kroz njega korak po korak.
Kreiramo vektor putanje i ⭢ k
: var ik_vec = new Vector<int>(matrix[i * sz + k])
. Kao rezultat, ako vektor može sadržavati četiri vrijednosti tipa int
i težina i ⭢ k
putanja je jednaka 5, kreiraćemo vektor od četiri petice – [5, 5, 5, 5]. Zatim, na svakoj iteraciji, istovremeno izračunavamo putanje od vrha i
do vrhova j, j + 1, ..., j + Vector<int>.Count
.
Svojstvo Vector<int>.Count
vraća broj elemenata tipa int
koji se uklapa u vektorske registre. Veličina vektorskih registara ovisi o modelu CPU-a, tako da ovo svojstvo može vratiti različite vrijednosti na različitim CPU-ima.
- Napomena autora
Cijeli proces izračunavanja može se podijeliti u tri koraka:
ij_vec
i ikj_vec
.ij_vec
i ikj_vec
i odaberite najmanje vrijednosti u novi vektor r_vec
.r_vec
natrag u matricu težine.
Dok su #1 i #3 prilično jednostavni, #2 zahtijeva objašnjenje. Kao što je već pomenuto, vektorima manipulišemo sa više vrednosti u isto vreme. Stoga, rezultat nekih operacija ne može biti singularan, tj. operacija poređenja Vector.LessThan(ij_vec, ikj_vec)
ne može vratiti true
ili false
jer uspoređuje više vrijednosti. Dakle, umjesto toga vraća novi vektor koji sadrži rezultate poređenja između odgovarajućih vrijednosti iz vektora ij_vec
i ikj_vec
( -1
, ako je vrijednost iz ij_vec
manja od vrijednosti iz ikj_vec
i 0
ako je drugačije). Vraćeni vektor (sam po sebi) nije baš koristan, međutim, možemo koristiti vektorsku operaciju Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec)
da izvučemo tražene vrijednosti iz vektora ij_vec
i ikj_vec
u novi vektor – r_vec
. Ova operacija vraća novi vektor gdje su vrijednosti jednake manjim od dvije odgovarajuće vrijednosti ulaznih vektora, tj. ako je vrijednost vektora lt_vec
jednaka -1
, tada operacija bira vrijednost iz ij_vec
, u suprotnom odabire vrijednost iz ikj_vec
.
Osim #2 , postoji još jedan dio koji zahtijeva objašnjenje – druga, nevektorizirana petlja. Potreban je kada veličina matrice težine nije proizvod Vector<int>.Count
vrijednosti. U tom slučaju glavna petlja ne može obraditi sve vrijednosti (jer ne možete učitati dio vektora), a druga, nevektorska, petlja će izračunati preostale iteracije.
Evo rezultata ove i svih prethodnih implementacija:
Metoda | sz | Zlo | Greška | StdDev | Ratio |
---|---|---|---|---|---|
Polazna linija | 300 | 27.525 ms | 0.1937 ms | 0.1617 ms | 1.00 |
SpartialOptimization | 300 | 12.399 ms | 0.0943 ms | 0.0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6.252 ms | 0.0211 ms | 0.0187 ms | 0.23 |
SpartialVectorOptimizations | 300 | 3.056 ms | 0.0301 ms | 0.0281 ms | 0.11 |
Polazna linija | 600 | 217.897 ms | 1.6415 ms | 1.5355 ms | 1.00 |
SpartialOptimization | 600 | 99.122 ms | 0.8230 ms | 0.7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35.825 ms | 0.1003 ms | 0.0837 ms | 0.16 |
SpartialVectorOptimizations | 600 | 24.378 ms | 0.4320 ms | 0.4041 ms | 0.11 |
Polazna linija | 1200 | 1,763,335 ms | 7.4561 ms | 6.2262 ms | 1.00 |
SpartialOptimization | 1200 | 766.675 ms | 6.1147 ms | 5.7197 ms | 0.43 |
SpartialParallelOptimizations | 1200 | 248.801 ms | 0.6040 ms | 0.5043 ms | 0.14 |
SpartialVectorOptimizations | 1200 | 185.628 ms | 2.1240 ms | 1.9868 ms | 0.11 |
Polazna linija | 2400 | 14,533,335 ms | 63.3518 ms | 52.9016 ms | 1.00 |
SpartialOptimization | 2400 | 6,507.878 ms | 28.2317 ms | 26.4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2,076.403 ms | 20.8320 ms | 19.4863 ms | 0.14 |
SpartialVectorOptimizations | 2400 | 2,568.676 ms | 31.7359 ms | 29.6858 ms | 0.18 |
Polazna linija | 4800 | 119,768,219 ms | 181.5514 ms | 160.9406 ms | 1.00 |
SpartialOptimization | 4800 | 55,590,374 ms | 414.6051 ms | 387.8218 ms | 0.46 |
SpartialParallelOptimizations | 4800 | 15,614,506 ms | 115.6996 ms | 102.5647 ms | 0.13 |
SpartialVectorOptimizations | 4800 | 18,257,991 ms | 84.5978 ms | 79.1329 ms | 0.15 |
Iz rezultata je očigledno da je vektorizacija značajno smanjila vreme izračunavanja – od 3 do 4 puta u poređenju sa neparalelizovanom verzijom ( SpartialOptimisation
). Interesantan momenat je da vektorizovana verzija takođe nadmašuje paralelnu verziju ( SpartialParallelOptimisations
) na manjim grafovima (manje od 2400 vrhova).
I na kraju, ali ne i najmanje važno – spojimo vektorizaciju i paralelizam!
Ako vas zanima praktična primjena vektorizacije, preporučujem vam da pročitate seriju postova Dana Shechtera gdje je vektorizirao Array.Sort
(ovi rezultati su se kasnije našli u ažuriranju za Garbage Collector u .NET 5 ).
Poslednja implementacija kombinuje napore paralelizacije i vektorizacije i… da budem iskrena, nedostaje joj individualnost 🙂 Jer… pa, upravo smo zamenili telo Parallel.For
iz SpartialParallelOptimisations
vektorizovanom petljom iz SpartialVectorOptimisations
:
public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
Svi rezultati ovog posta su predstavljeni u nastavku. Kao što se i očekivalo, simultana upotreba paralelizma i vektorizacije pokazala je najbolje rezultate, smanjujući vrijeme proračuna do 25 puta (za grafove od 1200 vrhova) u odnosu na početnu implementaciju.
Metoda | sz | Zlo | Greška | StdDev | Ratio |
---|---|---|---|---|---|
Polazna linija | 300 | 27.525 ms | 0.1937 ms | 0.1617 ms | 1.00 |
SpartialOptimization | 300 | 12.399 ms | 0.0943 ms | 0.0882 ms | 0,45 |
SpartialParallelOptimizations | 300 | 6.252 ms | 0.0211 ms | 0.0187 ms | 0.23 |
SpartialVectorOptimizations | 300 | 3.056 ms | 0.0301 ms | 0.0281 ms | 0.11 |
SpartialParallelVectorOptimizations | 300 | 3.008 ms | 0.0075 ms | 0.0066 ms | 0.11 |
Polazna linija | 600 | 217.897 ms | 1.6415 ms | 1.5355 ms | 1.00 |
SpartialOptimization | 600 | 99.122 ms | 0.8230 ms | 0.7698 ms | 0,45 |
SpartialParallelOptimizations | 600 | 35.825 ms | 0.1003 ms | 0.0837 ms | 0.16 |
SpartialVectorOptimizations | 600 | 24.378 ms | 0.4320 ms | 0.4041 ms | 0.11 |
SpartialParallelVectorOptimizations | 600 | 13.425 ms | 0.0319 ms | 0.0283 ms | 0.06 |
Polazna linija | 1200 | 1,763,335 ms | 7.4561 ms | 6.2262 ms | 1.00 |
SpartialOptimization | 1200 | 766.675 ms | 6.1147 ms | 5.7197 ms | 0.43 |
SpartialParallelOptimizations | 1200 | 248.801 ms | 0.6040 ms | 0.5043 ms | 0.14 |
SpartialVectorOptimizations | 1200 | 185.628 ms | 2.1240 ms | 1.9868 ms | 0.11 |
SpartialParallelVectorOptimizations | 1200 | 76.770 ms | 0.3021 ms | 0.2522 ms | 0.04 |
Polazna linija | 2400 | 14,533,335 ms | 63.3518 ms | 52.9016 ms | 1.00 |
SpartialOptimization | 2400 | 6,507.878 ms | 28.2317 ms | 26.4079 ms | 0,45 |
SpartialParallelOptimizations | 2400 | 2,076.403 ms | 20.8320 ms | 19.4863 ms | 0.14 |
SpartialVectorOptimizations | 2400 | 2,568.676 ms | 31.7359 ms | 29.6858 ms | 0.18 |
SpartialParallelVectorOptimizations | 2400 | 1,281.877 ms | 25.1127 ms | 64.8239 ms | 0.09 |
Polazna linija | 4800 | 119,768,219 ms | 181.5514 ms | 160.9406 ms | 1.00 |
SpartialOptimization | 4800 | 55,590,374 ms | 414.6051 ms | 387.8218 ms | 0.46 |
SpartialParallelOptimizations | 4800 | 15,614,506 ms | 115.6996 ms | 102.5647 ms | 0.13 |
SpartialVectorOptimizations | 4800 | 18,257,991 ms | 84.5978 ms | 79.1329 ms | 0.15 |
SpartialParallelVectorOptimizations | 4800 | 12,785,824 ms | 98.6947 ms | 87.4903 ms | 0.11 |
U ovom postu smo malo zaronili u problem najkraće putanje svih parova i implementirali Floyd-Warshall algoritam u C# kako bismo ga riješili. Također smo ažurirali našu implementaciju da poštujemo podatke i koristimo niskorazine mogućnosti .NET-a i hardvera.
U ovom postu smo igrali “all in”. Međutim, u stvarnim aplikacijama važno je zapamtiti – nismo sami. Hardcore paralelizacija može značajno smanjiti performanse sistema i uzrokovati više štete nego koristi. S druge strane, vektorizacija je malo sigurnija ako se radi na platformski nezavisan način. Zapamtite da bi neke vektorske instrukcije mogle biti dostupne samo na određenim CPU-ima.
Nadam se da ste uživali u čitanju i da ste se zabavili u "cijeđenju" nekih delova performansi iz algoritma sa samo TRI ciklusa 🙂