Golang's Goroutines make it easy to run code concurrently. We can simply add the keyword “go” in front of a function call to make it run in a separate routine, or asynchronously.
However, goroutines on their own don't offer much benefit unless we use primitives that help us either share data between routines or make routines coordinate with each other. In this blog post, we will look at a few handy synchronization primitives Go provides.
Sometimes two routines might attempt to access the same data simultaneously. The classic example is two routines manipulating a variable holding your bank balance. Each routine performs the following operations:
1. Read the current balance.
2. Add the deposit amount to it.
3. Write the new balance back.
Now when two routines interleave these steps, with one routine's write landing after the other's read, the result is wrong: one of the deposits is lost. A simple way to prevent this issue is using a Mutex. A Mutex, once it is locked, ensures that no other routine can lock the same Mutex until it is unlocked. The Mutex is one of the most basic primitives and is used in the implementation of most of the other synchronization primitives listed below.
Note: in Golang it is common to use the defer keyword with mutex.Unlock(). You will see the following code at the beginning of a function:
mu.Lock()
defer mu.Unlock()
Adding this at the top of a function ensures that only one routine can access the function at a time. Calling unlock with defer is also convenient since it ensures that the mutex is unlocked in the event of an error or unexpected return from the function.
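As a rough sketch of the fix (the balance variable, deposit function, and amounts here are hypothetical, not part of the original example), the read-modify-write can be protected like this:

package main

import (
    "fmt"
    "sync"
    "time"
)

var (
    mu      sync.Mutex
    balance int
)

// deposit performs the read-modify-write while holding the lock, so two
// routines can no longer interleave between the read and the write.
func deposit(amount int) {
    mu.Lock()
    defer mu.Unlock()
    balance = balance + amount
}

func main() {
    go deposit(50)
    go deposit(50)
    // Give both routines time to finish (a WaitGroup, covered later, would be cleaner).
    time.Sleep(time.Second)
    fmt.Println("balance:", balance) // always 100 with the Mutex in place
}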
Channels are by far the most practical synchronization primitive in Go. Unlike the others covered in this post, they are built into the language itself rather than provided by the sync package. We can use channels to handle most of the synchronizing we need. If you encounter a synchronization problem that is difficult to solve, it can likely be restructured to make use of channels. So let's first look at how a channel works and the different ways it can be used.
Mental Model
Channels allow two goroutines to communicate with each other. One way to think about a channel is as a queue data structure that supports only the enqueue and dequeue methods. If we were to implement such a queue ourselves, we would have to handle the following cases:
- dequeue is called on an empty queue;
- enqueue is called on a full queue.
In a typical queue data structure, the empty-dequeue case can be handled by returning nil or an error, while the full-enqueue case can be handled by returning an error or, more elaborately, by resizing the queue.
Channels handle these scenarios by blocking the routine until the condition no longer holds.
Specifically:
- a write (enqueue) to a full channel blocks until another routine reads from the channel;
- a read (dequeue) from an empty channel blocks until another routine writes to the channel.
So, enqueuing on a full channel and dequeuing on an empty channel are the conditions that determine when a routine will block (and when a context switch will occur). This makes the second parameter to the make function, the channel's size, quite important.
make(chan int, 10) creates a channel of size 10
make(chan int) creates an unbuffered channel, i.e. a channel of size zero
For a channel of size zero, both of these conditions hold for every enqueue and dequeue operation, i.e. the queue is simultaneously full and empty. In other words, any attempt to read from or write to a channel of size 0 will block the routine immediately, unless a reader or writer is already blocked awaiting the matching operation on the other side of the channel. This can therefore be used as a signalling mechanism.
Channels are a reference type and can therefore be passed between functions without the need for a pointer.
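As a small sketch of that signalling behaviour (the done channel and the anonymous worker here are made up for illustration), an unbuffered channel lets one routine wait until another has finished:

package main

import (
    "fmt"
    "time"
)

func main() {
    // An unbuffered (size 0) channel: a write blocks until a reader is waiting.
    done := make(chan struct{})

    go func() {
        fmt.Println("worker: doing some work")
        time.Sleep(500 * time.Millisecond)
        // Signal the main routine; this send completes only once main is reading.
        done <- struct{}{}
    }()

    // Main blocks here until the worker sends the signal.
    <-done
    fmt.Println("main: worker has finished")
}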
Non-Blocking Writes
In Go, we can use the select statement to perform non-blocking operations on channels. This can be useful when we don't want a routine to get blocked while attempting to read from or write to a channel.
To do a non-blocking write, we can use a select statement with a default case. This way if the channel is full the routine will not get blocked.
// define a channel of size 2, which can buffer a maximum of two numbers
numbers := make(chan int, 2)
data := []int{1, 2, 3, 4}
for _, x := range data {
    select {
    case numbers <- x:
        fmt.Println("wrote value", x)
    default:
        fmt.Println("didn't write since channel is full")
    }
}
/*
Output:
wrote value 1
wrote value 2
didn't write since channel is full
didn't write since channel is full
*/
Non-Blocking Reads
Sometimes it's important for our routine to attempt to read a value if one exists, or otherwise proceed to do something else. That is, a non-blocking dequeue. This can also be achieved via the select statement:
channel := make(chan int)
// Attempt a non-blocking read from our channel
select {
case val := <-channel:
    fmt.Println("value in chan is ", val)
default:
    fmt.Println("No value in channel")
}
// Output:
// No value in channel
As you can see, the calling routine does not block and falls through to the default case of the select statement. A common use for this is spawning N workers and periodically checking whether any of them has passed back an error, which may or may not exist.
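For instance (the errCh channel and the worker bodies below are hypothetical), the parent routine can poll an error channel between other work without ever blocking:

package main

import (
    "errors"
    "fmt"
    "time"
)

func main() {
    // Buffered so a failing worker never blocks while reporting its error.
    errCh := make(chan error, 5)

    for i := 0; i < 5; i++ {
        go func(id int) {
            time.Sleep(100 * time.Millisecond) // stand-in for real work
            if id == 3 {
                errCh <- errors.New("worker 3 failed")
            }
        }(i)
    }

    // Periodically check for an error without blocking the main routine.
    for i := 0; i < 10; i++ {
        select {
        case err := <-errCh:
            fmt.Println("got error:", err)
            return
        default:
            fmt.Println("no error yet, doing other work")
            time.Sleep(50 * time.Millisecond)
        }
    }
}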
Reading from Multiple Channels
The select statement can also be used to read from multiple channels. We can wrap a select statement around two channels to block until either one produces a value. We can also add a default case to the code below to make the select non-blocking: if a value is ready on channel A or channel B it is read (with the case chosen at random if both are ready), and otherwise the default case is hit.
channelA := make(chan int)
channelB := make(chan int)
// Block until a value arrives on either channel
go func() {
    select {
    case valA := <-channelA:
        fmt.Println("value from ChannelA", valA)
    case valB := <-channelB:
        fmt.Println("value from ChannelB", valB)
    }
}()
channelA <- 1
time.Sleep(time.Second) // give the routine time to print; a bare 1 would sleep one nanosecond
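Here is a sketch of the non-blocking variant described above, with a default case added (the channels and values are illustrative):

package main

import "fmt"

func main() {
    channelA := make(chan int, 1)
    channelB := make(chan int, 1)
    channelB <- 2 // only channelB has a value ready

    select {
    case valA := <-channelA:
        fmt.Println("value from ChannelA", valA)
    case valB := <-channelB:
        fmt.Println("value from ChannelB", valB)
    default:
        fmt.Println("no value ready on either channel")
    }
    // Output: value from ChannelB 2
}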
Channel Closing
Closing a channel is a great way to indicate to the workers listening on that channel that we are done sending data, so they can stop listening. Below we will look at an example of reading from a channel both manually and by using a range loop.
func manualConsumer(channel chan int) {
    for {
        val, ok := <-channel
        if !ok {
            break
        }
        fmt.Println("1: read value from channel val", val)
    }
    fmt.Println("1: channel is closed")
}

func forRangeConsumer(channel chan int) {
    for val := range channel {
        fmt.Println("2: read value from channel val", val)
    }
    fmt.Println("2: channel is closed")
}

func main() {
    numbers := make(chan int)
    // Spawn two consumers
    go manualConsumer(numbers)
    go forRangeConsumer(numbers)
    // write all data into the channel
    data := []int{1, 2, 3, 4, 5}
    for _, x := range data {
        numbers <- x
    }
    // close the channel using Go's built-in 'close' function
    // to indicate the end of data to the consumers.
    close(numbers)
    // Give the consumer routines some time to exit.
    // We could do this more cleanly with a WaitGroup,
    // but for simplicity's sake use sleep.
    time.Sleep(time.Second)
}
A few points to note here:
- Once the channel is closed and drained, the second return value ok becomes false; this is how manualConsumer detects the close and breaks out of its loop.
- The for range loop in forRangeConsumer exits automatically when the channel is closed, with no explicit check needed.
- close does not wait for the consumers to finish reading, which is why main sleeps (a WaitGroup would be cleaner) before exiting.
Wait Groups are a very simple synchronization primitive. Their primary use case is to wait for all routines to complete before proceeding to the next step in the program.
Mental Model
A wait group is essentially a struct wrapping an integer counter. It provides three functions: Add, Done, and Wait, which manipulate the counter internally in a thread-safe manner (you can think of it as an atomic integer or an integer guarded by a lock).
Let me expand a bit on the three main functions:
- Add(delta) increases the counter by delta, typically once for every routine we are about to spawn;
- Done decreases the counter by one and is called by a routine when it finishes its work;
- Wait blocks the calling routine until the counter reaches zero.
Done cannot be called more times than Add has accounted for. In other words, the internal counter is not allowed to go below zero (doing so causes a panic).
When passing a WaitGroup to a function, pass it by reference to avoid making a copy of the WaitGroup (i.e. WaitGroup is not a reference type). A WaitGroup is commonly used in situations where the main routine should wait for workers to complete:
func worker(wg *sync.WaitGroup) {
    fmt.Println("working ...")
    wg.Done()
}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    fmt.Println("I am doing some work")
    wg.Wait()
}
Condition variables allow multiple routines to coordinate with each other. A condition variable has the following functions:
- NewCond(mu Locker): a helper function, or constructor, used to create a new condition variable;
- Wait(): blocks a routine until a different routine calls Broadcast or Signal. This function also ensures that the Locker is released upon blocking and reacquired before returning;
- Broadcast(): notifies all blocked routines to start;
- Signal(): notifies a single blocked routine to start;
- the Locker that is passed when constructing the object is public and accessible via the L field.
Note that we can mimic the Signal function above with the use of channels, i.e. by executing a non-blocking write to a channel with a buffer size of 0. If a routine is blocked reading from the channel, it will get unblocked.
However, it's difficult to implement the broadcast operation with channels. You might feel like it's possible to implement a broadcast by calling close on a channel, which would indeed unblock all routines blocked on reading from it. This is true, but the channel cannot then be reused for a second broadcast, which a condition variable can do.
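A rough sketch of that one-shot broadcast (the start channel and worker loop are illustrative, not part of the example that follows):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    start := make(chan struct{})
    var wg sync.WaitGroup

    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            <-start // every worker blocks here until the channel is closed
            fmt.Printf("worker %d released\n", id)
        }(i)
    }

    time.Sleep(100 * time.Millisecond)
    // Closing the channel unblocks every blocked reader at once: a one-shot broadcast.
    // The channel cannot be reused for another broadcast, which is why sync.Cond
    // is the better fit when the broadcast has to happen repeatedly.
    close(start)
    wg.Wait()
}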
To better understand the usefulness of condition variables, let's look at an example where a group of workers are working on two tasks, A and B. Task B is dependent on Task A, meaning all workers must finish working on Task A before they can start working on Task B.
This is essentially the implementation of a barrier. By using a condition variable, we can easily implement this synchronization logic and ensure that all workers complete Task A before moving on to Task B.
func doTask(workerID int, taskName string) {
    fmt.Printf("worker %d is doing task %s\n", workerID, taskName)
    taskDuration := time.Duration(rand.Intn(5)) * time.Second
    time.Sleep(taskDuration) // represents worker workerID doing taskName
}

func notifyOrWaitForOthers(workerID int, sharedCounter *int, cond *sync.Cond) {
    cond.L.Lock()
    defer cond.L.Unlock()
    // The worker has finished its job, so decrement the shared counter.
    *sharedCounter--
    // You are the last to complete the task, so notify the rest of the workers.
    if *sharedCounter == 0 {
        fmt.Printf("worker %d is the last to complete work\n", workerID)
        fmt.Printf("last worker signals all workers that they can now proceed with next task\n")
        cond.Broadcast()
        return
    }
    // You finished early, so wait till the rest are done.
    /*
        Wait is a special function. Three things happen upon calling it:
        1. cond.L.Unlock is called
        2. the scheduler switches to a different routine and runs it
        3. before the Wait call returns, cond.L.Lock is called.
    */
    cond.Wait()
}
func worker(workerID int, sharedCounter *int, cond *sync.Cond, wg *sync.WaitGroup) {
    // All workers start on task A.
    doTask(workerID, "A")
    // Once task A is done, you either wait for the others or, if you're the last
    // to complete the task, notify the rest that they can start working on task B.
    notifyOrWaitForOthers(workerID, sharedCounter, cond)
    // All workers start on task B.
    doTask(workerID, "B")
    wg.Done()
}
func main() {
    var (
        mu            = &sync.Mutex{}
        cond          = sync.NewCond(mu)
        wg            = &sync.WaitGroup{}
        numWorkers    = 3
        sharedCounter = &numWorkers
    )
    wg.Add(numWorkers)
    for i := 0; i < numWorkers; i++ {
        go worker(i, sharedCounter, cond, wg)
    }
    wg.Wait()
}
The example above demonstrates how condition variables can be used to coordinate multiple routines working on two tasks, A and B. The same idea extends to more complex scenarios where the result of the previous task is needed to complete the task that follows.
The featured image was generated with Kandinsky 2
Prompt: Illustrate a screen displaying lines of code.