paint-brush
Rješavanje problema najkraćih puteva svih parova pomoću Floyd-Warshall algoritma u C# by@olegkarasik
607 čitanja
607 čitanja

Rješavanje problema najkraćih puteva svih parova pomoću Floyd-Warshall algoritma u C#

by Oleg Karasik28m2024/09/26
Read on Terminal Reader

Predugo; Citati

U ovom postu demonstriram kako možete implementirati Floyd-Warshall algoritam u C# da biste riješili problem najkraće staze svih parova. Osim implementacije, ovaj post uključuje razne optimizacije algoritama uključujući vektorizaciju i paralelizam.
featured image - Rješavanje problema najkraćih puteva svih parova pomoću Floyd-Warshall algoritma u C#
Oleg Karasik HackerNoon profile picture

Svi mi rješavamo problem “ najkraćeg puta ” mnogo puta dnevno. Naravno, nenamjerno. Rješavamo to na putu do posla ili kada pretražujemo internet ili kada posložimo stvari na stolu.


Zvuči malo… previše? Hajde da saznamo.


Zamislite, odlučili ste da se nađete sa prijateljima, pa… recimo u kafiću. Prije svega, morate pronaći rutu (ili stazu) do kafića, pa počnite tražiti dostupni javni prijevoz (ako ste pješice) ili rute i ulice (ako se vozite). Tražite rutu od vaše trenutne lokacije do kafića, ali ne „bilo koju“ rutu – najkraću ili najbržu.


Evo još jednog primjera, koji nije toliko očigledan kao prethodni. U toku posla odlučite da krenete „prečicom“ kroz sporednu ulicu jer pa… to je „prečica“ i „brže“ je ići ovim putem. Ali kako ste znali da je ova „prečica“ brža? Na osnovu ličnog iskustva riješili ste problem „najkraći put“ i odabrali rutu koja ide sporednom ulicom.


U oba primjera, “najkraći” put je određen ili u udaljenosti ili vremenu potrebnom da se stigne od jednog mjesta do drugog. Putujući primjeri su vrlo prirodne primjene teorije grafova i posebno problema „najkraćeg puta“. Međutim, što je s primjerom s poslaganjem stvari na stolu? U ovom slučaju "najkraće" može predstavljati broj ili složenost radnji koje morate izvršiti da biste dobili, na primjer, list papira: otvorite sto, otvorite fasciklu, uzmite list papira nasuprot uzmite list papira direktno sa stola .


Svi gore navedeni primjeri predstavljaju problem pronalaženja najkraćeg puta između dva vrha u grafu (ruta ili putanja između dva mjesta, broj radnji ili složenost dobivanja lista papira sa jednog ili drugog mjesta). Ova klasa problema najkraćeg puta se zove SSSP (Single Source Shortest Path) i osnovni algoritam za rješavanje ovih problema je Dijkstraov algoritam , koji ima O(n^2) računsku složenost.


Ali, ponekad moramo pronaći sve najkraće staze između svih vrhova. Razmislite o sljedećem primjeru: kreirate kartu za svoje redovno kretanje između kuće , posla i pozorišta . U ovom scenariju ćete završiti sa 6 ruta: work ⭢ home , home ⭢ work , work ⭢ theatre , theatre ⭢ work , home ⭢ theatre i theatre ⭢ home (obrnuti putevi mogu biti različiti zbog jednosmjernih puteva na primjer) .


Dodavanje više mjesta na kartu rezultirat će značajnim rastom kombinacija – prema permutacijama n formule kombinatorike može se izračunati kao:

 A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2)

Što nam daje 12 ruta za 4 mjesta i 90 ruta za 10 mjesta (što je impresivno). Napomena… ovo je bez uzimanja u obzir međutačaka između mjesta, tj. da biste od kuće do posla morali preći 4 ulice, prošetati uz rijeku i prijeći most. Ako zamislimo, neke rute mogu imati zajedničke međutačke… pa… kao rezultat imaćemo veoma veliki graf, sa puno vrhova, gde će svaki vrh predstavljati ili mesto ili značajnu međutačku. Klasa problema, u kojoj treba da pronađemo sve najkraće putanje između svih parova vrhova u grafu, naziva se APSP (All Pairs Shortest Paths), a osnovni algoritam za rešavanje ovih problema je Floyd-Warshall algoritam , koji ima O(n^3) složenost računanja.


A ovo je algoritam koji ćemo implementirati danas 🙂

Floyd-Warshall algoritam

Floyd-Warshall algoritam pronalazi sve najkraće staze između svakog para vrhova u grafu. Algoritme je objavio Robert Floyd u [1] (pogledajte odjeljak “Reference” za više detalja). Iste godine Peter Ingerman je u [2] opisao modernu implementaciju algoritma u obliku tri ugniježđene for petlje:

 algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments

Ako nikada niste imali promjenu u radu sa grafom predstavljenim u obliku matrice onda bi moglo biti teško razumjeti šta radi gornji algoritam. Dakle, da bismo bili sigurni da smo na istoj stranici, pogledajmo kako se graf može predstaviti u obliku matrice i zašto je takav prikaz koristan za rješavanje problema najkraće putanje.


Slika ispod ilustruje usmjereni, ponderisani graf od 5 vrhova. Na lijevoj strani, graf je predstavljen u vizualnom obliku, koji je sačinjen od krugova i strelica, gdje svaki krug predstavlja vrh, a strelica predstavlja rub sa smjerom. Broj unutar kruga odgovara broju vrha, a broj iznad ivice odgovara težini ivice. Desno je isti graf predstavljen u obliku matrice težine. Matrica težine je oblik matrice susjedstva gdje svaka ćelija matrice sadrži “težinu” – udaljenost između vrha i (red) i temena j (kolona). Matrica težina ne uključuje informacije o „putanju“ između vrhova (lista vrhova kroz koje dolazite od i do j ) – samo težinu (udaljenost) između ovih vrhova.


Slika 1. Prikaz usmjerenog, ponderiranog grafa od 5 vrhova u vizualnom obliku (lijevo) i ponderiranom matričnom obliku (desno).


U matrici težine možemo vidjeti da su vrijednosti ćelija jednake težinama između susjednih vrhova. Zato, ako pregledamo putanje od vrha 0 (red 0 ), vidjet ćemo da ... postoji samo jedan put – od 0 do 1 . Ali, na vizuelnom prikazu možemo jasno vidjeti putanje od vrha 0 do vrhova 2 i 3 (kroz vrh 1 ). Razlog za to je jednostavan – u početnom stanju matrica težine sadrži samo udaljenost između susjednih vrhova. Međutim, sama ova informacija dovoljna je da se pronađe ostatak.


Hajde da vidimo kako to radi. Obratite pažnju na ćeliju W[0, 1] . Njegova vrijednost pokazuje da postoji put od vrha 0 do vrha 1 sa težinom jednakom 1 (ukratko možemo zapisati kao: 0 ⭢ 1: 1 ). Sa ovim znanjem, sada možemo skenirati sve ćelije reda 1 (koji sadrži sve težine svih putanja od vrha 1 ) i vratiti ovu informaciju u red 0 , povećavajući težinu za 1 (vrijednost W[0, 1] ).


Slika 2. Ilustracija pronalaženja svih putanja od vrha 0 do vrhova koji su susedni vrhu 1.


Koristeći iste korake možemo pronaći putanje od vrha 0 kroz druge vrhove. Tokom pretrage može se desiti da postoji više putanja koja vodi do istog vrha i što je još važnije težine ovih putanja mogu biti različite. Primjer takve situacije je ilustrovan na donjoj slici, gdje je pretraživanjem od vrha 0 do temena 2 otkriven još jedan put do vrha 3 manje težine.


Slika 3. Ilustracija situacije, gdje je pretraga od vrha 0 do temena 2 otkrila dodatni, kraći put do vrha 3.


Imamo dva puta: originalni put – 0 ⭢ 3: 4 i novi put koji smo upravo otkrili – 0 ⭢ 2 ⭢ 3: 3 (imajte na umu, matrica težine ne sadrži putanje, tako da ne znamo koji vrhovi su uključeni u originalnu putanju). Ovo je trenutak kada donosimo odluku da zadržimo najkraću i upišemo 3 u ćeliju W[0, 3] .


Izgleda da smo upravo pronašli naš prvi najkraći put!


Koraci koje smo upravo vidjeli su suština Floyd-Warshall algoritma. Pogledajte pseudokod algoritma još jednom:

 algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm

Najudaljeniji ciklus for on k iterira preko svih vrhova u grafu i na svakoj iteraciji varijabla k predstavlja vrh kroz koji pretražujemo putanje . Unutrašnji ciklus for on i iterira preko svih vrhova u grafu i na svakoj iteraciji, i predstavlja vrh od kojeg tražimo staze . I na kraju, najdublji ciklus for on j iterira preko svih vrhova u grafu i na svakoj iteraciji, j predstavlja vrh do kojeg tražimo putanju. U kombinaciji nam daje sljedeće: na svakoj iteraciji k tražimo staze od svih vrhova i do svih vrhova j kroz vrh k . Unutar ciklusa poredimo putanju i ⭢ j (predstavljenu W[i, j] ) sa stazom i ⭢ k ⭢ j (predstavljenu zbirom W[I, k] i W[k, j] ) i zapisujemo najkraći jedan nazad u W[i, j] .


Sada, kada razumijemo mehaniku, vrijeme je da implementiramo algoritam.

Osnovna implementacija

Izvorni kod i eksperimentalni grafovi dostupni su u spremištu na GitHubu. Eksperimentalni grafikoni se mogu naći u direktoriju Data/Sparse-Graphs.zip . Sva mjerila u ovom postu implementirana su u APSP01.cs fajl.

Prije nego što pređemo na implementaciju, moramo razjasniti nekoliko tehničkih momenata:

  1. Sve implementacije rade sa matricom težine predstavljenom u obliku linearnog niza.
  2. Sve implementacije koriste cjelobrojnu aritmetiku. Odsustvo puta između vrhova je predstavljeno posebnom konstantnom težinom: NO_EDGE = (int.MaxValue / 2) - 1 .


Sada, hajde da shvatimo zašto je to tako.


Što se tiče #1. Kada govorimo o matricama, ćelije definiramo u terminima “redova” i “kolona”. Zbog toga se čini prirodnim zamisliti matricu u obliku “kvadrata” ili “pravokutnika” (Slika 4a).


Slika 4. Višestruki prikazi matrice. a) imaginarna "kvadratna" reprezentacija; b) niz reprezentacije niza; c) predstavljanje linearnog niza.


Međutim, to ne znači da moramo predstaviti matricu u obliku niza nizova (Slika 4b) da bismo se pridržavali svoje mašte. Umjesto toga, možemo predstaviti matricu u obliku linearnog niza (slika 4c) gdje se indeks svake ćelije izračunava pomoću sljedeće formule:

 i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row.

Linealni niz matrice težine je preduslov za efikasno izvršavanje Floyd-Warshall algoritma. Razlog tome nije jednostavan i detaljno objašnjenje zaslužuje poseban post... ili nekoliko postova. Međutim, trenutno je važno napomenuti da ovakva reprezentacija značajno poboljšava lokalizaciju podataka , što u stvari ima veliki uticaj na performanse algoritma.

Evo, molim vas da mi vjerujete i samo ovu informaciju imate na umu kao preduvjet, međutim, ujedno vam preporučujem da odvojite malo vremena i proučite pitanje, i usput – ne vjerujte ljudima na internetu .


- Napomena autora

Što se tiče #2. Ako bolje pogledate pseudokod algoritma, nećete naći nikakve provjere vezane za postojanje puta između dva vrha. Umjesto toga, pseudokod jednostavno koristi funkciju min() . Razlog je jednostavan – prvobitno, ako ne postoji put između vrhova, vrijednost ćelije je postavljena na infinity i u svim jezicima, osim što može biti JavaScript, sve vrijednosti su manje od infinity . U slučaju cijelih brojeva, moglo bi biti primamljivo koristiti int.MaxValue kao vrijednost “bez puta”. Međutim, ovo će dovesti do prekoračenja cijelog broja u slučajevima kada će vrijednosti oba puta i ⭢ k i k ⭢ j biti jednake int.MaxValue . Zato koristimo vrijednost koja je jedna manja od polovine int.MaxValue .

Hej! Ali zašto ne možemo jednostavno provjeriti postoji li putanja prije nego što izvršimo bilo kakve kalkulacije. Na primjer, usporedbom putanja oba sa 0 (ako uzmemo nulu kao vrijednost "bez puta").


- Promišljen čitalac

To je zaista moguće, ali nažalost to će dovesti do značajne kazne za učinak. Ukratko, CPU vodi statistiku rezultata evaluacije grane npr. kada neki od if iskaza procijene na true ili false . Koristi ovu statistiku za izvršavanje koda “statistički predviđene grane” unaprijed prije nego što se procijeni stvarna if izjava (ovo se zove spekulativno izvršenje ) i stoga efikasnije izvršava kod. Međutim, kada je predviđanje CPU-a netačno, to uzrokuje značajan gubitak performansi u poređenju sa ispravnim predviđanjem i bezuslovnim izvršenjem jer CPU mora da se zaustavi i izračuna ispravnu granu.


Budući da na svakoj iteraciji k ažuriramo značajan dio matrice težine, statistika grananja CPU-a postaje beskorisna jer ne postoji obrazac koda, sve grane se zasnivaju isključivo na podacima. Dakle, takva provjera će rezultirati značajnom količinom pogrešnih predviđanja grana .

Ovdje vas također molim da mi vjerujete (za sada) i onda odvojite malo vremena za proučavanje teme.


Uhh, izgleda da smo gotovi s teorijskim dijelom – implementirajmo algoritam (ovu implementaciju označavamo kao Baseline ):

 public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }

Gornji kod je gotovo identična kopija prethodno spomenutog pseudokoda sa jednim izuzetkom – umjesto Math.Min() koristimo if da ažuriramo ćeliju samo kada je to potrebno.

Hej! Čekaj malo, nisi li ti upravo napisao puno riječi o tome zašto ako ovdje nije dobro, a nekoliko redova kasnije i sami uvodimo ako?


- Promišljen čitalac

Razlog je jednostavan. U trenutku pisanja JIT emituje skoro ekvivalentan kod za if i Math.Min implementacije. Možete ga detaljno provjeriti na Sharplab.io, ali evo isječaka tijela glavne petlje:

 // // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop

Možda ćete vidjeti, bez obzira da li koristimo if ili Math.Min još uvijek postoji uvjetna provjera. Međutim, u slučaju if nema nepotrebnog pisanja.


Sada kada smo završili sa implementacijom, vrijeme je da pokrenemo kod i vidimo koliko je brz?!

Možete sami provjeriti ispravnost koda pokretanjem testova u spremištu .

Koristim Benchmark.NET (verzija 0.12.1) za benchmark kod. Svi grafovi koji se koriste u benčmarkovima su usmjereni, aciklični grafovi od 300, 600, 1200, 2400 i 4800 vrhova. Broj ivica u grafovima je oko 80% mogućeg maksimuma koji se za usmjerene, aciklične grafove može izračunati na sljedeći način:

 var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph.

Let's rock!


Evo rezultata benchmarka koji su pokrenuti na mom računaru (Windows 10.0.19042, Intel Core i7-7700 CPU 3,60 Ghz (Kaby Lake) / 8 logičkih procesora / 4 jezgra):


Metoda

Veličina

Zlo

Greška

StdDev

Polazna linija

300

27.525 ms

0.1937 ms

0.1617 ms

Polazna linija

600

217.897 ms

1.6415 ms

1.5355 ms

Polazna linija

1200

1,763,335 ms

7.4561 ms

6.2262 ms

Polazna linija

2400

14,533,335 ms

63.3518 ms

52.9016 ms

Polazna linija

4800

119,768,219 ms

181.5514 ms

160.9406 ms


Iz rezultata koje možemo vidjeti, vrijeme izračuna drastično raste u odnosu na veličinu grafa – za graf od 300 vrhova bilo je potrebno 27 milisekundi, za graf od 2400 vrhova – 14,5 sekundi, a za graf od 4800 – 119 sekundi što je skoro 2 minute !


Gledajući kod algoritma to bi moglo biti teško zamisliti, postoji nešto što možemo učiniti da ubrzamo proračune... jer pa... postoje TRI petlje, samo TRI petlje.


Međutim, kako to obično biva – mogućnosti se kriju u detaljima 🙂

Podaci o poznavanju – rijetki grafikoni

Floyd-Warshall algoritam je osnovni algoritam za rješavanje problema najkraće putanje svih parova, posebno kada su u pitanju gusti ili potpuni grafovi (jer algoritam pretražuje putanje između svih parova vrhova).


Međutim, u našim eksperimentima koristimo usmjerene, aciklične grafove, koji imaju divno svojstvo – ako postoji put od vrha 1 do vrha 2 , onda nema puta od vrha 2 do vrha 1 . Za nas, to znači da postoji mnogo nesusednih vrhova koje možemo preskočiti ako ne postoji put od i do k (ovu implementaciju označavamo kao SpartialOptimisation ).

 public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }

Evo rezultata prethodnih ( Baseline ) i trenutnih ( SpartialOptimisation ) implementacija na istom skupu grafikona (najbrži rezultati su istaknuti podebljanim slovima ):


Metoda

Veličina

Zlo

Greška

StdDev

Ratio

Polazna linija

300

27.525 ms

0.1937 ms

0.1617 ms

1.00

SpartialOptimization

300

12.399 ms

0.0943 ms

0.0882 ms

0,45

Polazna linija

600

217.897 ms

1.6415 ms

1.5355 ms

1.00

SpartialOptimization

600

99.122 ms

0.8230 ms

0.7698 ms

0,45

Polazna linija

1200

1,763,335 ms

7.4561 ms

6.2262 ms

1.00

SpartialOptimization

1200

766.675 ms

6.1147 ms

5.7197 ms

0.43

Polazna linija

2400

14,533,335 ms

63.3518 ms

52.9016 ms

1.00

SpartialOptimization

2400

6,507.878 ms

28.2317 ms

26.4079 ms

0,45

Polazna linija

4800

119,768,219 ms

181.5514 ms

160.9406 ms

1.00

SpartialOptimization

4800

55,590,374 ms

414.6051 ms

387.8218 ms

0.46


Sasvim impresivno!


Smanjili smo vrijeme obračuna za pola! Naravno, što je graf gušći, to će biti manja brzina. Međutim, ovo je jedna od optimizacija koja je zgodna ako unaprijed znate s kojom klasom podataka namjeravate raditi.


Možemo li više? 🙂

Upoznajte svoj hardver – paralelizam


Moj računar je opremljen Inter Core i7-7700 CPU 3.60GHz (Kaby Lake) procesorom sa 8 logičkih procesora ( HW ) ili 4 jezgra sa Hyper-Threading tehnologijom. Imati više od jednog jezgra je kao imati više “rezervnih ruku” koje možemo staviti na posao. Dakle, da vidimo koji dio posla se može efikasno podijeliti između više radnika i onda ga paralelizirati.


Petlje su uvijek najočitiji kandidat za paralelizaciju jer su u većini slučajeva iteracije nezavisne i mogu se izvršavati istovremeno. U algoritmu imamo tri petlje koje treba analizirati i vidjeti postoje li ovisnosti koje nas sprječavaju da ih paraleliziramo.


Počnimo od for of k petlje. Na svakoj iteraciji petlja izračunava putanje od svakog vrha do svakog vrha kroz vrh k . Nove i ažurirane staze se pohranjuju u matricu težine. Gledajući iteracije možete primijetiti – mogu se izvršiti bilo kojim redoslijedom: 0, 1, 2, 3 ili 3, 1, 2, 0 bez utjecaja na rezultat. Međutim, i dalje se moraju izvršavati u nizu, inače neke od iteracija neće moći koristiti nove ili ažurirane putanje jer još neće biti upisane u matricu težine. Takva nedosljednost će definitivno uništiti rezultate. Zato moramo nastaviti da tražimo.


Sljedeći kandidat je for of i petlju. Na svakoj iteraciji petlja čita putanju od vrha i do temena k (ćelija: W[i, k] ), putanju od vrha k do temena j (ćelija: W[k, j ]) i zatim provjerava da li je poznata putanja od i do j (ćelija: W[i, j] ) je kraća od i ⭢ k ⭢ j putanje (zbir: W[i, k] + W[k, j] ). Ako bolje pogledate obrazac pristupa, mogli biste primijetiti – na svakoj iteraciji i petlja čita podatke iz k reda i ažuriranog i reda što u osnovi znači – iteracije su nezavisne i mogu se izvršavati ne samo bilo kojim redoslijedom već i paralelno!


Ovo izgleda obećavajuće, pa hajde da ga implementiramo (ovu implementaciju označavamo kao SpartialParallelOptimisations ).

for of j petlja također može biti paralelna. Međutim, paralelizacija najdubljeg ciklusa u ovom slučaju je vrlo neefikasna. To možete sami provjeriti tako što ćete napraviti nekoliko jednostavnih promjena u izvornom kodu .


- Napomena autora

 public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }

Evo rezultata implementacije Baseline , SpartialOptimisation i SpartialParallelOptimisations na istom skupu grafova (paralelizacija se radi pomoću klase Parallel ):


Metoda

Veličina

Zlo

Greška

StdDev

Ratio

Polazna linija

300

27.525 ms

0.1937 ms

0.1617 ms

1.00

SpartialOptimization

300

12.399 ms

0.0943 ms

0.0882 ms

0,45

SpartialParallelOptimizations

300

6.252 ms

0.0211 ms

0.0187 ms

0.23

Polazna linija

600

217.897 ms

1.6415 ms

1.5355 ms

1.00

SpartialOptimization

600

99.122 ms

0.8230 ms

0.7698 ms

0,45

SpartialParallelOptimizations

600

35.825 ms

0.1003 ms

0.0837 ms

0.16

Polazna linija

1200

1,763,335 ms

7.4561 ms

6.2262 ms

1.00

SpartialOptimization

1200

766.675 ms

6.1147 ms

5.7197 ms

0.43

SpartialParallelOptimizations

1200

248.801 ms

0.6040 ms

0.5043 ms

0.14

Polazna linija

2400

14,533,335 ms

63.3518 ms

52.9016 ms

1.00

SpartialOptimization

2400

6,507.878 ms

28.2317 ms

26.4079 ms

0,45

SpartialParallelOptimizations

2400

2,076.403 ms

20.8320 ms

19.4863 ms

0.14

Polazna linija

4800

119,768,219 ms

181.5514 ms

160.9406 ms

1.00

SpartialOptimization

4800

55,590,374 ms

414.6051 ms

387.8218 ms

0.46

SpartialParallelOptimizations

4800

15,614,506 ms

115.6996 ms

102.5647 ms

0.13


Iz rezultata možemo vidjeti da je paralelizacija for of i petlje smanjila vrijeme izračunavanja za 2-4 puta u odnosu na prethodnu implementaciju ( SpartialOptimisation )! Ovo je vrlo impresivno, međutim, važno je zapamtiti, paralelizacija čistih računanja troši sve dostupne računarske resurse što može dovesti do iscrpljivanja resursa drugih aplikacija u sistemu. Paralelizaciju treba pažljivo koristiti.


Kao što možete pretpostaviti – ovo nije kraj 🙂

Upoznajte svoju platformu – vektorizacija

Vektorizacija je transformacija koda koji radi na jednom elementu u kod koji djeluje na više elemenata istovremeno.

Ovo bi moglo zvučati kao složena stvar, pa da vidimo kako to funkcionira na jednostavnom primjeru:

 var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i];

Na previše pojednostavljen način, izvršenje iteracije 0 gornje for petlje na nivou CPU-a može se ilustrovati na sljedeći način:


Slika 5. Previše pojednostavljena ilustracija skalara za izvršavanje iteracije petlje na nivou CPU-a.


Evo šta se dešava. CPU učitava vrijednosti nizova a i b iz memorije u CPU registre (jedan registar može sadržavati točno jednu vrijednost). Zatim CPU izvršava skalarnu operaciju sabiranja na ovim registrima i upisuje rezultat nazad u glavnu memoriju – pravo u niz c . Ovaj proces se ponavlja četiri puta prije nego što se petlja završi.


Vektorizacija znači korištenje posebnih CPU registara – vektorskih ili SIMD (single istruction multiple data) registara, i odgovarajućih CPU instrukcija za izvršavanje operacija na više vrijednosti niza odjednom:


Slika 6. Previše pojednostavljena ilustracija vektorskog za izvođenje iteracije petlje na nivou CPU-a.


Evo šta se dešava. CPU učitava vrijednosti nizova a i b iz memorije u CPU registre (međutim, ovaj put, jedan vektorski registar može držati dvije vrijednosti niza). Zatim CPU izvršava operaciju vektorskog sabiranja na ovim registrima i upisuje rezultat nazad u glavnu memoriju – pravo u niz c . Budući da radimo na dvije vrijednosti odjednom, ovaj proces se ponavlja dva puta umjesto četiri.


Za implementaciju vektorizacije u .NET možemo koristiti – Vector i Vector<T> tipove (također možemo koristiti tipove iz imenskog prostora System.Runtime.Intrinsics , međutim oni su malo napredniji za trenutnu implementaciju, tako da ih neću koristiti, ali osjećam slobodno ih isprobajte sami):

 public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }

Vektorizovani kod može izgledati pomalo bizarno, pa idemo kroz njega korak po korak.


Kreiramo vektor putanje i ⭢ k : var ik_vec = new Vector<int>(matrix[i * sz + k]) . Kao rezultat, ako vektor može sadržavati četiri vrijednosti tipa int i težina i ⭢ k putanja je jednaka 5, kreiraćemo vektor od četiri petice – [5, 5, 5, 5]. Zatim, na svakoj iteraciji, istovremeno izračunavamo putanje od vrha i do vrhova j, j + 1, ..., j + Vector<int>.Count .

Svojstvo Vector<int>.Count vraća broj elemenata tipa int koji se uklapa u vektorske registre. Veličina vektorskih registara ovisi o modelu CPU-a, tako da ovo svojstvo može vratiti različite vrijednosti na različitim CPU-ima.


- Napomena autora

Cijeli proces izračunavanja može se podijeliti u tri koraka:

  1. Učitavanje informacija iz matrice težine u vektore: ij_vec i ikj_vec .
  2. Usporedite vektore ij_vec i ikj_vec i odaberite najmanje vrijednosti u novi vektor r_vec .
  3. Napišite r_vec natrag u matricu težine.


Dok su #1 i #3 prilično jednostavni, #2 zahtijeva objašnjenje. Kao što je već pomenuto, vektorima manipulišemo sa više vrednosti u isto vreme. Stoga, rezultat nekih operacija ne može biti singularan, tj. operacija poređenja Vector.LessThan(ij_vec, ikj_vec) ne može vratiti true ili false jer uspoređuje više vrijednosti. Dakle, umjesto toga vraća novi vektor koji sadrži rezultate poređenja između odgovarajućih vrijednosti iz vektora ij_vec i ikj_vec ( -1 , ako je vrijednost iz ij_vec manja od vrijednosti iz ikj_vec i 0 ako je drugačije). Vraćeni vektor (sam po sebi) nije baš koristan, međutim, možemo koristiti vektorsku operaciju Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec) da izvučemo tražene vrijednosti iz vektora ij_vec i ikj_vec u novi vektor – r_vec . Ova operacija vraća novi vektor gdje su vrijednosti jednake manjim od dvije odgovarajuće vrijednosti ulaznih vektora, tj. ako je vrijednost vektora lt_vec jednaka -1 , tada operacija bira vrijednost iz ij_vec , u suprotnom odabire vrijednost iz ikj_vec .


Osim #2 , postoji još jedan dio koji zahtijeva objašnjenje – druga, nevektorizirana petlja. Potreban je kada veličina matrice težine nije proizvod Vector<int>.Count vrijednosti. U tom slučaju glavna petlja ne može obraditi sve vrijednosti (jer ne možete učitati dio vektora), a druga, nevektorska, petlja će izračunati preostale iteracije.


Evo rezultata ove i svih prethodnih implementacija:


Metoda

sz

Zlo

Greška

StdDev

Ratio

Polazna linija

300

27.525 ms

0.1937 ms

0.1617 ms

1.00

SpartialOptimization

300

12.399 ms

0.0943 ms

0.0882 ms

0,45

SpartialParallelOptimizations

300

6.252 ms

0.0211 ms

0.0187 ms

0.23

SpartialVectorOptimizations

300

3.056 ms

0.0301 ms

0.0281 ms

0.11

Polazna linija

600

217.897 ms

1.6415 ms

1.5355 ms

1.00

SpartialOptimization

600

99.122 ms

0.8230 ms

0.7698 ms

0,45

SpartialParallelOptimizations

600

35.825 ms

0.1003 ms

0.0837 ms

0.16

SpartialVectorOptimizations

600

24.378 ms

0.4320 ms

0.4041 ms

0.11

Polazna linija

1200

1,763,335 ms

7.4561 ms

6.2262 ms

1.00

SpartialOptimization

1200

766.675 ms

6.1147 ms

5.7197 ms

0.43

SpartialParallelOptimizations

1200

248.801 ms

0.6040 ms

0.5043 ms

0.14

SpartialVectorOptimizations

1200

185.628 ms

2.1240 ms

1.9868 ms

0.11

Polazna linija

2400

14,533,335 ms

63.3518 ms

52.9016 ms

1.00

SpartialOptimization

2400

6,507.878 ms

28.2317 ms

26.4079 ms

0,45

SpartialParallelOptimizations

2400

2,076.403 ms

20.8320 ms

19.4863 ms

0.14

SpartialVectorOptimizations

2400

2,568.676 ms

31.7359 ms

29.6858 ms

0.18

Polazna linija

4800

119,768,219 ms

181.5514 ms

160.9406 ms

1.00

SpartialOptimization

4800

55,590,374 ms

414.6051 ms

387.8218 ms

0.46

SpartialParallelOptimizations

4800

15,614,506 ms

115.6996 ms

102.5647 ms

0.13

SpartialVectorOptimizations

4800

18,257,991 ms

84.5978 ms

79.1329 ms

0.15


Iz rezultata je očigledno da je vektorizacija značajno smanjila vreme izračunavanja – od 3 do 4 puta u poređenju sa neparalelizovanom verzijom ( SpartialOptimisation ). Interesantan momenat je da vektorizovana verzija takođe nadmašuje paralelnu verziju ( SpartialParallelOptimisations ) na manjim grafovima (manje od 2400 vrhova).


I na kraju, ali ne i najmanje važno – spojimo vektorizaciju i paralelizam!

Ako vas zanima praktična primjena vektorizacije, preporučujem vam da pročitate seriju postova Dana Shechtera gdje je vektorizirao Array.Sort (ovi rezultati su se kasnije našli u ažuriranju za Garbage Collector u .NET 5 ).

Upoznajte svoju platformu i hardver – vektorizacija i paralelizam!

Poslednja implementacija kombinuje napore paralelizacije i vektorizacije i… da budem iskrena, nedostaje joj individualnost 🙂 Jer… pa, upravo smo zamenili telo Parallel.For iz SpartialParallelOptimisations vektorizovanom petljom iz SpartialVectorOptimisations :

 public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }

Svi rezultati ovog posta su predstavljeni u nastavku. Kao što se i očekivalo, simultana upotreba paralelizma i vektorizacije pokazala je najbolje rezultate, smanjujući vrijeme proračuna do 25 puta (za grafove od 1200 vrhova) u odnosu na početnu implementaciju.


Metoda

sz

Zlo

Greška

StdDev

Ratio

Polazna linija

300

27.525 ms

0.1937 ms

0.1617 ms

1.00

SpartialOptimization

300

12.399 ms

0.0943 ms

0.0882 ms

0,45

SpartialParallelOptimizations

300

6.252 ms

0.0211 ms

0.0187 ms

0.23

SpartialVectorOptimizations

300

3.056 ms

0.0301 ms

0.0281 ms

0.11

SpartialParallelVectorOptimizations

300

3.008 ms

0.0075 ms

0.0066 ms

0.11

Polazna linija

600

217.897 ms

1.6415 ms

1.5355 ms

1.00

SpartialOptimization

600

99.122 ms

0.8230 ms

0.7698 ms

0,45

SpartialParallelOptimizations

600

35.825 ms

0.1003 ms

0.0837 ms

0.16

SpartialVectorOptimizations

600

24.378 ms

0.4320 ms

0.4041 ms

0.11

SpartialParallelVectorOptimizations

600

13.425 ms

0.0319 ms

0.0283 ms

0.06

Polazna linija

1200

1,763,335 ms

7.4561 ms

6.2262 ms

1.00

SpartialOptimization

1200

766.675 ms

6.1147 ms

5.7197 ms

0.43

SpartialParallelOptimizations

1200

248.801 ms

0.6040 ms

0.5043 ms

0.14

SpartialVectorOptimizations

1200

185.628 ms

2.1240 ms

1.9868 ms

0.11

SpartialParallelVectorOptimizations

1200

76.770 ms

0.3021 ms

0.2522 ms

0.04

Polazna linija

2400

14,533,335 ms

63.3518 ms

52.9016 ms

1.00

SpartialOptimization

2400

6,507.878 ms

28.2317 ms

26.4079 ms

0,45

SpartialParallelOptimizations

2400

2,076.403 ms

20.8320 ms

19.4863 ms

0.14

SpartialVectorOptimizations

2400

2,568.676 ms

31.7359 ms

29.6858 ms

0.18

SpartialParallelVectorOptimizations

2400

1,281.877 ms

25.1127 ms

64.8239 ms

0.09

Polazna linija

4800

119,768,219 ms

181.5514 ms

160.9406 ms

1.00

SpartialOptimization

4800

55,590,374 ms

414.6051 ms

387.8218 ms

0.46

SpartialParallelOptimizations

4800

15,614,506 ms

115.6996 ms

102.5647 ms

0.13

SpartialVectorOptimizations

4800

18,257,991 ms

84.5978 ms

79.1329 ms

0.15

SpartialParallelVectorOptimizations

4800

12,785,824 ms

98.6947 ms

87.4903 ms

0.11

Zaključak

U ovom postu smo malo zaronili u problem najkraće putanje svih parova i implementirali Floyd-Warshall algoritam u C# kako bismo ga riješili. Također smo ažurirali našu implementaciju da poštujemo podatke i koristimo niskorazine mogućnosti .NET-a i hardvera.


U ovom postu smo igrali “all in”. Međutim, u stvarnim aplikacijama važno je zapamtiti – nismo sami. Hardcore paralelizacija može značajno smanjiti performanse sistema i uzrokovati više štete nego koristi. S druge strane, vektorizacija je malo sigurnija ako se radi na platformski nezavisan način. Zapamtite da bi neke vektorske instrukcije mogle biti dostupne samo na određenim CPU-ima.


Nadam se da ste uživali u čitanju i da ste se zabavili u "cijeđenju" nekih delova performansi iz algoritma sa samo TRI ciklusa 🙂

Reference


  1. Floyd, RW Algoritam 97: Najkraći put / RW Floyd // Komunikacije ACM-a. – 1962. – Vol. 5, №. 6. – P. 345-.
  2. Ingerman, PZ Algoritam 141: Matrica puta / PZ Ingerman // Komunikacije ACM-a. – 1962. – Vol. 5, №. 11. – Str. 556.