Project Link: https://github.com/Joker666/goworkerpool

Concurrency in Golang and WorkerPool: [Part 2]

Goroutines and channels are powerful language constructs that make Golang a powerful concurrent language. In the first part of the article, we explored how we can build a worker pool to optimize the performance of the concurrency constructs of Golang, namely by limiting resource utilization. But that was a simple example to demonstrate how we can go about it. Here, we will build a robust solution based on the learning from the first part, so that we can use it in any application. There are some solutions on the internet with complex architectures using dispatchers and so on. In reality, we do not need them; we can do everything using one shared channel. Let's see how we can build that here.

Architecture

Here we make a generic workerpool package that can handle tasks with workers based on the desired concurrency. Let's see the directory structure.

```
workerpool
├── pool.go
├── task.go
└── worker.go
```

The workerpool directory is in the root folder of the project. Let's go over what each file is. Task is a single unit of work that needs to be processed. Worker is a simple worker function that handles running the task. And Pool actually handles the creation and management of the workers.

Implementation

Let's code out Task first.

```go
// workerpool/task.go
package workerpool

import "fmt"

// Task encapsulates a single unit of work.
type Task struct {
	Err  error
	Data interface{}
	f    func(interface{}) error
}

// NewTask initializes a new task with the given work function and data.
func NewTask(f func(interface{}) error, data interface{}) *Task {
	return &Task{f: f, Data: data}
}

// process runs the task's function and stores any returned error.
func process(workerID int, task *Task) {
	fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
	task.Err = task.f(task.Data)
}
```

Task is a simple struct that holds everything needed to process a task. We pass it the Data and the function f that is supposed to be executed. The process function executes the task. The function takes Data as a parameter for processing, and we also store the error that is returned.
Let's see how Worker processes these tasks.

```go
// workerpool/worker.go
package workerpool

import (
	"fmt"
	"sync"
)

// Worker handles all the work
type Worker struct {
	ID       int
	taskChan chan *Task
}

// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
	return &Worker{
		ID:       ID,
		taskChan: channel,
	}
}

// Start starts the worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
	fmt.Printf("Starting worker %d\n", wr.ID)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for task := range wr.taskChan {
			process(wr.ID, task)
		}
	}()
}
```

We have a nice little Worker struct here. It takes a worker ID and a channel from which tasks are read. In the Start method, we range over the taskChan for incoming tasks and process them inside a goroutine. We can imagine multiple of these workers running concurrently and handling tasks.

Worker Pool

We have implemented Task and Worker to handle tasks, but there's a missing piece here: who's gonna spawn these workers and send them tasks? The answer: the worker Pool.

```go
// workerpool/pool.go
package workerpool

import "sync"

// Pool is the worker pool
type Pool struct {
	Tasks []*Task

	concurrency int
	collector   chan *Task
	wg          sync.WaitGroup
}

// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
	return &Pool{
		Tasks:       tasks,
		concurrency: concurrency,
		collector:   make(chan *Task, 1000),
	}
}

// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
	for i := 1; i <= p.concurrency; i++ {
		worker := NewWorker(p.collector, i)
		worker.Start(&p.wg)
	}

	for i := range p.Tasks {
		p.collector <- p.Tasks[i]
	}
	close(p.collector)

	p.wg.Wait()
}
```

The worker pool holds all the tasks that it needs to process and takes a concurrency number as input, spawning that many goroutines to complete the tasks concurrently.
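The core mechanic Pool.Run relies on is that `close`-ing a channel ends every worker's `range` loop once the buffered tasks are drained. Here is a compact, runnable sketch of that fan-out pattern; the `fanOut` function name is made up for illustration and is not part of the package:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fanOut mirrors what Pool.Run does: N workers range over one shared
// channel; closing the channel terminates every worker's loop.
func fanOut(tasks []int, concurrency int) int64 {
	var total int64
	ch := make(chan int, len(tasks))
	var wg sync.WaitGroup

	for w := 1; w <= concurrency; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range ch { // exits when ch is closed and drained
				atomic.AddInt64(&total, int64(t))
			}
		}()
	}

	for _, t := range tasks {
		ch <- t
	}
	close(ch) // signal workers there is no more work
	wg.Wait() // block until every worker's loop has returned
	return total
}

func main() {
	fmt.Println("sum:", fanOut([]int{1, 2, 3, 4, 5}, 3)) // prints "sum: 15"
}
```

Note the `atomic.AddInt64` here: several workers write to one counter, so the shared state needs synchronization. In the article's design this issue does not arise because each worker only writes to its own Task's Err field.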
It has a buffered channel, collector, which is shared among all the workers. So, when we run this worker pool, we spawn the desired number of workers, each holding the shared collector channel. Next, we range over the tasks and write them to the collector channel. We synchronize everything with a WaitGroup. Now that we have a nice solution, let's test it out.

```go
// main.go
package main

import (
	"fmt"
	"time"

	"github.com/Joker666/goworkerpool/workerpool"
)

func main() {
	var allTask []*workerpool.Task
	for i := 1; i <= 100; i++ {
		task := workerpool.NewTask(func(data interface{}) error {
			taskID := data.(int)
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("Task %d processed\n", taskID)
			return nil
		}, i)
		allTask = append(allTask, task)
	}

	pool := workerpool.NewPool(allTask, 5)
	pool.Run()
}
```

Here, we create 100 tasks and use a concurrency of 5 to process them. And see the output:

```
Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s
```

It takes us two seconds to process 100 tasks. If we increase the concurrency to 10, it would take just about one second to process all the tasks.

We have built a robust worker pool solution that can handle concurrency, store errors on tasks, and send them data to process. This package is generic and not coupled to a specific implementation. We can use this to tackle large problems as well.

Extending Further: Handling Tasks In Background

We can actually extend our solution further so that the workers keep waiting for new tasks in the background and we can keep sending them new tasks to process. For that, Task stays as is, but we need to modify Worker a bit. Let's see the changes.

```go
// workerpool/worker.go

// Worker handles all the work
type Worker struct {
	ID       int
	taskChan chan *Task
	quit     chan bool
}

// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
	return &Worker{
		ID:       ID,
		taskChan: channel,
		quit:     make(chan bool),
	}
}

....
```
```go
// workerpool/worker.go

// StartBackground starts the worker in background waiting
func (wr *Worker) StartBackground() {
	fmt.Printf("Starting worker %d\n", wr.ID)

	for {
		select {
		case task := <-wr.taskChan:
			process(wr.ID, task)
		case <-wr.quit:
			return
		}
	}
}

// Stop quits the worker
func (wr *Worker) Stop() {
	fmt.Printf("Closing worker %d\n", wr.ID)
	go func() {
		wr.quit <- true
	}()
}
```

We add a quit channel to the Worker struct and two new methods. StartBackground starts an infinite for loop with select, reading from taskChan and processing the tasks. If it reads from the quit channel, it returns from the function. The Stop method writes to the quit channel.

Armed with these two new methods, let's add some new things to Pool.

```go
// workerpool/pool.go

// Pool is the worker pool
type Pool struct {
	Tasks   []*Task
	Workers []*Worker

	concurrency   int
	collector     chan *Task
	runBackground chan bool
	wg            sync.WaitGroup
}

// AddTask adds a task to the pool
func (p *Pool) AddTask(task *Task) {
	p.collector <- task
}

// RunBackground runs the pool in background
func (p *Pool) RunBackground() {
	go func() {
		for {
			fmt.Print("⌛ Waiting for tasks to come in ...\n")
			time.Sleep(10 * time.Second)
		}
	}()

	for i := 1; i <= p.concurrency; i++ {
		worker := NewWorker(p.collector, i)
		p.Workers = append(p.Workers, worker)
		go worker.StartBackground()
	}

	for i := range p.Tasks {
		p.collector <- p.Tasks[i]
	}

	p.runBackground = make(chan bool)
	<-p.runBackground
}

// Stop stops background workers
func (p *Pool) Stop() {
	for i := range p.Workers {
		p.Workers[i].Stop()
	}
	p.runBackground <- true
}
```

The Pool struct now holds the workers and has a runBackground channel that helps it stay alive. We have three new methods. AddTask can add a task to the collector channel at any time. The RunBackground method spawns a goroutine that runs forever to keep the pool alive, and then blocks by reading from the empty runBackground channel.
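The quit-channel shape of StartBackground and Stop can be exercised in isolation. This is a small, self-contained sketch of the same pattern; the `runUntilQuit` helper is hypothetical, invented here just to make the behavior testable:

```go
package main

import (
	"fmt"
	"sync"
)

// runUntilQuit feeds vals to a quit-style worker goroutine, then stops
// it the way Worker.Stop does, and returns the sum it accumulated.
func runUntilQuit(vals []int) int {
	tasks := make(chan int)
	quit := make(chan bool)
	var wg sync.WaitGroup
	sum := 0

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select { // same loop shape as StartBackground
			case t := <-tasks:
				sum += t
			case <-quit:
				return
			}
		}
	}()

	for _, v := range vals {
		tasks <- v // unbuffered: each send completes only when received
	}
	quit <- true // like Worker.Stop: ask the loop to return
	wg.Wait()    // guarantees the worker is done before we read sum
	return sum
}

func main() {
	fmt.Println("sum:", runUntilQuit([]int{1, 2, 3, 4})) // prints "sum: 10"
}
```

Because the tasks channel here is unbuffered, every send has been received before the quit signal is sent, so no work is lost. With the article's buffered collector, a Stop can leave queued tasks unprocessed, which is a trade-off to be aware of.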
Reading from an empty channel like this is a technique to block execution forever. We spin up the workers in goroutines with StartBackground. The Stop method stops the workers and then writes to runBackground to finish the RunBackground method. Let's see how it works now.

In a real-world scenario, this would be running alongside an HTTP server and consuming tasks. Here, we replicate similar behavior with an infinite loop that stops when a certain condition is matched.

```go
// main.go
...
pool := workerpool.NewPool(allTask, 5)

go func() {
	for {
		taskID := rand.Intn(100) + 20

		if taskID%7 == 0 {
			pool.Stop()
		}

		time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
		task := workerpool.NewTask(func(data interface{}) error {
			taskID := data.(int)
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("Task %d processed\n", taskID)
			return nil
		}, taskID)
		pool.AddTask(task)
	}
}()
pool.RunBackground()
```

When we run this, we see random tasks getting inserted while the workers are running in the background, and one of the workers picking each task up. It eventually stops when the stop condition is matched.

Conclusion

We explored how we can build a robust worker pool solution from the naive one in the first part. We also extended it further so that the worker pool runs in the background and handles further incoming tasks.