Project Link: https://github.com/Joker666/goworkerpool
In modern programming languages concurrency has become an unequivocal need. Almost all programming languages today have some method of concurrency. Some have really powerful constructs, that can offload works to different threads in the OS, e.g., Java, some mimic this behavior in the same thread, e.g., Ruby.
Golang has a very powerful concurrency model called CSP (communicating sequential processes), which breaks a problem into smaller sequential processes and then schedules several instances of these processes called Goroutines. The communication between these processes happens by passing immutable messages via Channels.
Here we will explore, how we can take advantage of concurrency in golang and how we can limit its usage with worker pools. In the second part, we will explore how to build a robust solution for concurrency around this concept.
Let's imagine we have an external API call which takes about 100ms to get done. If we have 1000 of these calls, and we call these synchronously, it would take about 100s to complete
//// model/data.go
package model
type SimpleData struct {
ID int
}
//// basic/basic.go
package basic
import (
"fmt"
"github.com/Joker666/goworkerpool/model"
"time"
)
func Work(allData []model.SimpleData) {
start := time.Now()
for i, _ := range allData {
Process(allData[i])
}
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
func Process(data model.SimpleData) {
fmt.Printf("Start processing %d\n", data.ID)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Finish processing %d\n", data.ID)
}
//// main.go
package main
import (
"fmt"
"github.com/Joker666/goworkerpool/basic"
"github.com/Joker666/goworkerpool/model"
"github.com/Joker666/goworkerpool/worker"
)
func main() {
// Prepare the data
var allData []model.SimpleData
for i := 0; i < 1000; i++ {
data := model.SimpleData{ ID: i }
allData = append(allData, data)
}
fmt.Printf("Start processing all work \n")
// Process
basic.Work(allData)
}
Start processing all work
Took ===============> 1m40.226679665s
Here, we have a simple model that holds a data struct with just an integer value. We process an array of this data synchronously. This is obviously not an optimum solution since these tasks can be processed concurrently. Let's turn this into an asynchronous process with goroutines and channels.
//// worker/notPooled.go
func NotPooledWork(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
dataCh := make(chan model.SimpleData, 100)
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
wg.Add(1)
go func(data model.SimpleData) {
defer wg.Done()
basic.Process(data)
}(data)
}
}()
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process
worker.NotPooledWork(allData)
Start processing all work
Took ===============> 101.191534ms
Here, we create a buffered channel of 100 and add all the data passed to
NoPooledWork
to the channel. Since it is a buffered channel, it would not be able to enter any more data than 100 until data is extracted from it. Which we do it inside a goroutine. We range over the channel, extract data from it, add a goroutine, and process. There's no limit to how many goroutines we are creating here. This can process as many tasks as passed. Theoretically, it is possible to handle as much data as possible given the resources needed. If we run this, we complete 1000 tasks around 100ms. Crazy right. Not quite.Unless we have all the resources on earth, we have limited allocations we can do for a given period of time. The minimal size of a goroutine object is 2K, but it can reach up to 1GB. The above solution runs all the tasks concurrently and let's say for a million of these tasks, it can quickly run out of memory and CPU of the machine. Either we have to upgrade our machine or find a different solution.
Computer scientists have thought about this problem long before and came up with a brilliant solution called Thread Pool or Worker Pool. The idea is to have a limited pool of workers to process the tasks. Once a worker is done processing one task, it goes for handling the next task. So tasks keep waiting to be processed. This reduces the burst on CPU and memory and rather distributes the tasks over time.
Let's fix the earlier problem with worker pool implementation
//// worker/pooled.go
func PooledWork(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
workerPoolSize := 100
dataCh := make(chan model.SimpleData, workerPoolSize)
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
basic.Process(data)
}
}()
}
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
//// main.go
// Process
worker.PooledWork(allData)
Start processing all work
Took ===============> 1.002972449s
Here, we have a limited number of worker size, 100. We spin up exactly 100 goroutines to process the tasks. We can think of channels as queues and each worker goroutine is a consumer. Multiple goroutines can listen to the same channel but each item on a channel would be processed just once.
Channels work like queue in Golang
This is a nice solution, if we run it, we see it takes 1s to finish all the tasks. Not quite 100ms, but we don't need it. We get a much better solution that distributes the load over time.
We are not quite done yet. This looks like a complete solution except it isn't. We are not handling errors here. So let's create a scenario where we have to handle errors and let's see how we can go about it.
//// worker/pooledError.go
func PooledWorkError(allData []model.SimpleData) {
start := time.Now()
var wg sync.WaitGroup
workerPoolSize := 100
dataCh := make(chan model.SimpleData, workerPoolSize)
errors := make(chan error, 1000)
for i := 0; i < workerPoolSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataCh {
process(data, errors)
}
}()
}
for i, _ := range allData {
dataCh <- allData[i]
}
close(dataCh)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case err := <-errors:
fmt.Println("finished with error:", err.Error())
case <-time.After(time.Second * 1):
fmt.Println("Timeout: errors finished")
return
}
}
}()
defer close(errors)
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Took ===============> %s\n", elapsed)
}
func process(data model.SimpleData, errors chan<- error) {
fmt.Printf("Start processing %d\n", data.ID)
time.Sleep(100 * time.Millisecond)
if data.ID % 29 == 0 {
errors <- fmt.Errorf("error on job %v", data.ID)
} else {
fmt.Printf("Finish processing %d\n", data.ID)
}
}
//// main.go
// Process
worker.PooledWorkError(allData)
We have modified our
process
method to handle some randomized errors and add it to the passed errors
channel. So, to handle errors in a concurrent model, we need errors
channel to hold the error data. After all the tasks are done processing, we check the errors
channel for errors. The error object holds ID of the task so that if needed, we can process those again.This is a better solution than the one which was not handling errors at all. But we are not done with it yet, in the second part, we will go over how we can make a dedicated worker pool package that is robust and can handle concurrent tasks with a limiting worker pool number.
We don't always want to keep processing our tasks if multiple tasks keep failing. Indicates that we have a deeper issue to resolve first. For exercise, you can try to improve the last solution to stop processing tasks, when there are more than 2 tasks that have resulted in an error. Send pull request in the original repo when you find a solution and I will send a review for that.
Golang's concurrency model is powerful enough to just build a worker pool solution without much overhead, that's why it's not included in the standard library. But we can spin up our own solution that fits our needs. I will write the next article very soon. Til then, stay tuned.