Mohammed Gadiwala

Backend engineer | Golang

Leveraging Multithreading To Read Large Files Faster In Go

The other day I was interviewing at a company and was asked the following question: how can you count the occurrences of a word in a 50GB file with 4GB of RAM? The trick is to not load the whole file into memory, and instead to process each word as the file pointer moves forward. This way, the whole file can be processed with a minimal amount of memory.
The follow-up question was: how can we speed this up using multithreading? The solution is to keep multiple pointers at different parts of the file, with each thread reading its own chunk of the file concurrently.
Finally, the results from all threads are combined.
The whole file is divided into equal chunks, and each chunk gets its own file pointer. Let's say the file is 1GB. Each of 5 threads will process 200MB. Each pointer after the first starts reading at the byte right after the last byte read by the previous pointer.
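The arithmetic behind this split can be sketched as follows. The helper chunkStarts is hypothetical, and the example assumes decimal megabytes so that the 1GB file divides into exactly 5 chunks of 200MB.

```go
package main

import "fmt"

// chunkStarts (a hypothetical helper) returns the starting byte offset of
// each of n chunks for a file of the given size. The chunk length is size/n
// rounded up, so the n chunks together always cover the whole file.
func chunkStarts(size int64, n int) []int64 {
	chunk := (size + int64(n) - 1) / int64(n) // ceiling division
	starts := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		starts = append(starts, int64(i)*chunk)
	}
	return starts
}

func main() {
	// Assumption: decimal units, so 1GB = 1000MB.
	const mb = 1000 * 1000
	for i, s := range chunkStarts(1000*mb, 5) {
		fmt.Printf("chunk %d starts at byte %d\n", i, s)
	}
}
```

Chunk i covers the bytes from its start up to the byte just before the start of chunk i+1, so each pointer begins right after the last byte of the previous chunk.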
Implementation
When it comes to multithreading in Go, the easiest option that comes to mind is goroutines. I will walk you through a program that reads a large text file and creates a dictionary of words.
This program demonstrates reading a 1GB file using 5 goroutines, with each goroutine reading 200MB.
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
)

const mb = 1024 * 1024
const gb = 1024 * mb

func main() {
	// A waitgroup to wait for all go-routines to finish.
	wg := sync.WaitGroup{}

	// Every goroutine sends the words it reads into this channel.
	channel := make(chan string)

	// A dictionary which stores the count of unique words.
	dict := make(map[string]int64)

	// Done is a channel to signal the main thread that all the words have been
	// entered in the dictionary.
	done := make(chan bool, 1)

	// Read all incoming words from the channel and add them to the dictionary.
	go func() {
		for s := range channel {
			dict[s]++
		}

		// Signal the main thread that all the words have entered the dictionary.
		done <- true
	}()

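	// Current is the byte offset at which the next chunk starts.
	var current int64

	// Limit is the chunk size processed by each goroutine. The five chunks
	// together cover roughly the first 1000 MB of the file.
	var limit int64 = 200 * mb

	for i := 0; i < 5; i++ {
		wg.Add(1)

		// Pass i and current as arguments so that each goroutine works on its
		// own copies. The loop keeps changing both variables while the
		// goroutines are running.
		go func(id int, offset int64) {
			defer wg.Done()
			read(offset, limit, "gameofthrones.txt", channel)
			fmt.Printf("goroutine %d has completed\n", id)
		}(i, current)

		// Start the next chunk one byte past the nominal end of this one.
		current += limit + 1
	}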

	// Wait for all go routines to complete.
	wg.Wait()
	close(channel)

	// Wait for dictionary to process all the words.
	<-done
	close(done)
}

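func read(offset int64, limit int64, fileName string, channel chan string) {
	file, err := os.Open(fileName)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Number of bytes of this chunk consumed so far, counted from offset.
	var cumulativeSize int64

	// Every chunk except the first starts one byte early, so that a word
	// beginning exactly at offset (right after a space) is not skipped.
	start := offset
	if offset != 0 {
		start = offset - 1
	}

	// Move the pointer of the file to the start of the designated chunk.
	if _, err = file.Seek(start, io.SeekStart); err != nil {
		panic(err)
	}
	reader := bufio.NewReader(file)

	// This block ensures that the chunk starts at a new word. If the offset
	// falls in the middle of a word, it skips to the end of that word; the
	// previous chunk reads that word in full.
	if offset != 0 {
		skipped, err := reader.ReadBytes(' ')
		if err == io.EOF {
			fmt.Println("EOF")
			return
		}

		if err != nil {
			panic(err)
		}

		// Skipped bytes count towards the chunk size, minus the extra byte
		// read before offset. Otherwise this chunk would overrun into the next.
		cumulativeSize = int64(len(skipped)) - 1
	}

	// Stop once the read size has exceeded the chunk size.
	for cumulativeSize <= limit {
		b, err := reader.ReadBytes(' ')
		if err != nil && err != io.EOF {
			panic(err)
		}

		cumulativeSize += int64(len(b))

		// A token may span several lines, so split it on any whitespace and
		// send every word to the channel to be entered into the dictionary.
		for _, s := range strings.Fields(string(b)) {
			channel <- s
		}

		// At end of file, b still holds the last word, so break only after
		// it has been sent.
		if err == io.EOF {
			break
		}
	}
}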
Here we also have to handle an edge case: what if the start of a chunk is not the start of a new word? Similarly, what if the end of a chunk is not the end of a word?
We handle this by extending the end of the chunk to the end of the word, and by moving the start of the next chunk to the start of the next word.
We use a channel to unify all the words read by the various goroutines into a single dictionary. A sync.WaitGroup synchronizes the goroutines and ensures that all of them have finished reading the file.
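A different way to combine the results, not the one used in the program above, is to let every goroutine fill a private map and merge the maps once all goroutines are done. The sketch below is only an illustration under simplified assumptions: the function countWords is hypothetical, and in-memory strings stand in for the file chunks.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// countWords (a hypothetical helper) counts the words of every part in its
// own goroutine, each with a private map, and merges the maps at the end.
func countWords(parts []string) map[string]int64 {
	results := make([]map[string]int64, len(parts))

	var wg sync.WaitGroup
	for i, part := range parts {
		wg.Add(1)
		go func(i int, part string) {
			defer wg.Done()
			local := make(map[string]int64)
			for _, w := range strings.Fields(part) {
				local[w]++
			}
			// Each goroutine writes only to its own slot, so no lock is needed.
			results[i] = local
		}(i, part)
	}
	wg.Wait()

	// Merge the per-goroutine maps into a single dictionary.
	total := make(map[string]int64)
	for _, local := range results {
		for w, c := range local {
			total[w] += c
		}
	}
	return total
}

func main() {
	// Assumption: these strings stand in for chunks already split at word
	// boundaries.
	parts := []string{"the north remembers", "the night is dark", "the north"}
	total := countWords(parts)
	fmt.Println(total["the"], total["north"]) // prints 3 2
}
```

The trade-off is one map per goroutine in memory, in exchange for not sending every single word through a channel.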
Results
Processing the 1GB file concurrently took about half the time of processing it serially, so performance roughly doubled.
The reason we did not get a 5x speedup, i.e. one proportional to the number of threads, is that although goroutines are lightweight threads, reading and processing a file keeps one whole OS-level CPU core busy, with no idle time in which another goroutine could run. Hence, on a dual-core system, it effectively only doubles the performance of file processing.
