Writing concurrent programs in any language requires a different thought-process than the one used for writing sequential programs. Getting into this new mindset takes practice; this can be aided with some simple examples of what these programs look like in Go.
Fortunately, Go treats concurrency as a first-class citizen. Included in the language specification are native concurrency primitives that are meant to make the transition into writing concurrent programs as smooth as possible.
Goroutines allow splitting programs into multiple process, channels enable communication from one goroutine to another, WaitGroups allow the ability to keep track of the exact number of active goroutines at once, and the select statement enables multiplexing of channel communications to handle the appropriate response for the communication case in question.
Let's observe what these mechanisms look like in practice...
Goroutines
The first and foremost mechanism to employ in your concurrency tool belt is the goroutine. A simple comparison that can be drawn is a goroutine's similarity with a traditional thread (although these two are by no means synonymous).
With Go's notion of a goroutine, programs can be split into separate processes to make a program concurrent.
A goroutine can executed by prefixing a function definition with the keyword go:
go func(){
// do something
}()
// OR
go doSomething()
This manner of calling a function is similar to a regular function call, except that it is non-blocking. The goroutine executes immediately and the line after the goroutine is evaluated.
Let's visualize this in a sample program which prints out letters & numbers:
package main
import (
"fmt"
"time"
)
func main() {
go outputLetters()
for i := 0; i < 11; i++ {
fmt.Print(i)
}
time.Sleep(100 * time.Microsecond)
}
func outputLetters() {
for i := 0; i < 26; i++ {
asciiChar := i + 97
fmt.Printf("%c", asciiChar)
}
}
The output shows that the numbers are being printed before the letters:
012345678910abcdefghijklmnopqrstuvwxyz
The ordering of the output demonstrates how goroutines do not follow a sequential order of operations. They run in the background and are scheduled by the built-in Go scheduler.
Notice the call to time.Sleep()
underneath the loop that prints numbers. Try removing this line and executing the program to see what happens...
The resulting output shows the numbers, but not the letters. This demonstrates how the main()
function is finishing before the scheduled goroutine has time to finish.
In practice, a hardcoded sleep timer on a process won't be needed. It is used here to demonstrate how coordination is needed to ensure all jobs finish
When starting to use goroutines, it may seem obvious that a device is needed to establish some way to wait for queued up work to finish.
Go offers ways of coordinating or synchronizing these jobs. The most important of which is the channel.
Channels
Simply put, channels are a way for goroutines to communicate and synchronize with each other.
They can be used to signal a process that has completed or to pass values from one goroutine to another.
There are two key operations when using channels: send and receive. Commonly, you'll have one goroutine executing a send on a channel, with another goroutine executing a receive on that same channel.
Channels can be defined to transmit any types necessary. This is known as the channel's element type.
Channels are created like so:
ch1 := make(chan int) // unbuffered channel with element type int
ch2 := make(chan string) // unbuffered channel with element type string
ch3 := make(chan float64, 4) // buffered channel with element type float64 & capacity of 4
The operations below show how to send or receive values on a channel:
ch1 <- 1 // send the value 1 on the channel (ch1)
val := <- ch1 // receive value from channel (ch1) & store in val
ch2 <- "foo" // send the string "foo" on the channel (ch2)
<- ch2 // receive value from the channel (ch2) & discard the result
Channels can also be closed to signify that there will be no more values sent on it. The operation close()
is used to close a channel and set a flag that reflects this:
close(ch1) // close ch1
Attempts to send more values on this channel will result in a panic. Receiving values from this channel works until the channel has been drained of the values sent to it, then further receive operations return the zero value of the channel's element type (int value 0 in the case of ch1).
As shown above in the section showing how to instantiate channels, there are two types of these to unpack: unbuffered channels & buffered channels.
Unbuffered Channels
Unbuffered channels (sometimes known as synchronous channels) are created with make()
without passing an argument for the channel's capacity.
make(chan int)
This type of channel provides strict guarantees on synchronization; a goroutine that sends a message on this channel type will only resume its execution once there has been a receipt of a separate goroutine executing a receive on this same channel.
This functionality allows developers to have different goroutines work together without having one outpace the other in its execution speed.
To amend the program from the previous section, we can remove the time.Sleep()
and use an unbuffered channel to wait on the outputLetters
goroutine to finish.
func main() {
ch := make(chan struct{})
go outputLetters(ch)
for i := 0; i < 11; i++ {
fmt.Print(i)
}
<-ch
}
func outputLetters(ch chan struct{}) {
for i := 0; i < 26; i++ {
asciiChar := i + 97
fmt.Printf("%c", asciiChar)
}
ch <- struct{}{}
}
In this example we create a channel with the element type of struct{}
. This is a common practice to signify the importance of the arrival of an item on the channel instead of the content of that value.
We only care that an item has been received from the channel since this means that the
outputLetters()
goroutine has completed.
The sequence of events is as follows:
- kick off goroutine
outputLetters()
in the background - the main goroutine (
main()
func) proceeds to print all numbers from 0 to 10 - the main goroutine blocks on
<-ch
and waits for an sent item on the channel - the
outputLetters()
goroutine proceeds to print all lowercase ASCII letters then sends an empty struct value on the channel once it has completed - the main goroutine receives and discards this value finishing execution
What about cases where we care about a channel's values?
There are many cases where we want to feed values from one goroutine into another via channels. This is known as a pipeline.
Pipelines are useful in many contexts, especially the ones where there are a sequence of operations that work on batches of data and it is desired to optimize this work.
Imagine we have 3 functions: one to generate numbers, one to generate the fibonacci number based on the generated number, and one to print the results.
func main() {
numberOfIterations := make(chan int)
fibNumbers := make(chan int)
go generateIterations(numberOfIterations)
go slowFib(fibNumbers, numberOfIterations)
for {
result, ok := <-fibNumbers
if !ok {
break
}
fmt.Printf("Fibonacci result: %v\n", result)
}
}
func generateIterations(numberOfIterations chan int) {
for i := 0; i < 45; i++ {
numberOfIterations <- i
}
close(numberOfIterations)
}
func slowFib(fibNumbers, numberOfIterations chan int) {
for {
num, ok := <-numberOfIterations
if !ok {
break // channel closed
}
currentFib := recursiveFib(num)
fibNumbers <- currentFib
}
close(fibNumbers)
}
func recursiveFib(n int) int {
if n == 0 {
return 0
}
if n == 1 {
return 1
}
return recursiveFib(n-1) + recursiveFib(n-2)
}
The 3 operations above work together to create an output stream.
The generateIterations()
goroutine is a simple number generator that passes each index in its loop into the numberOfIterations
channel. When this job has finished it closes the channel to notify the goroutines downstream that there will be no more values sent on this channel.
The slowFib()
goroutine waits on a value to be received from the numberOfIterations
channel, passes that to a suboptimal fibonacci number generator function, then sends this result on the fibNumbers
channel. The fibNumbers
channel is closed on completion of this goroutine.
The final loop in the main()
function will loop, printing all results from the fibNumbers
channel and will break as soon as it detects that fibNumbers
has been closed.
The form
x, ok := <-ch
can be used to determine if the channel being recieved from (ch) has been closed.ok
receives a returned boolean indicating if the channel is open or closed.
There are a few modifications we can make to the program above to make them a bit more idiomatically Go.
Go provides the notion of a unidirectional channel that allows us to signify a function's intent with the channels it receives as arguments.
Unidirectional Channels
In goroutines, it is common that the channels passed in as arguments will be used for a singular purpose: for sending, or for receiving.
Go provides the syntax to define channels as receive-only or send-only.
chan<- string // send-only channel
<-chan string // receive-only channel
These can be used to convert a bidirectional channel (unbuffered channel) into a unidirectional channel when they're passed into specific goroutines. This is idiomatic to express intent of the channels being passed to the goroutine.
Be careful with these conversions as there is no way to return a unidirectional channel to a bidirectional channel
Let's modify the 3-step pipeline program from the previous section and use unidirectional channels.
func main() {
numberOfIterations := make(chan int)
fibNumbers := make(chan int)
go generateIterations(numberOfIterations)
go slowFib(fibNumbers, numberOfIterations)
for {
result, ok := <-fibNumbers
if !ok {
break
}
fmt.Printf("Fibonacci result: %v\n", result)
}
}
func generateIterations(out chan<- int) {
for i := 0; i < 45; i++ {
out <- i
}
close(out)
}
func slowFib(out chan<- int, in <-chan int) {
for {
num, ok := <-in
if !ok {
break // channel closed
}
currentFib := recursiveFib(num)
out <- currentFib
}
close(out)
}
func recursiveFib(n int) int {
if n == 0 {
return 0
}
if n == 1 {
return 1
}
return recursiveFib(n-1) + recursiveFib(n-2)
}
The parameters defined in generateIterations()
and slowFib()
now have channel types that indicate their purpose in the function.
It is a compile-time error to attempt to close a receive-only channel (only send-only channels can use the
close()
function
Unbuffered channels work wonders for spinning up goroutines to accomplish a task. However, there are many cases where we want to bound how many goroutines are created in parallel.
Buffered channels provide a way of accomplishing this.
Buffered Channels
Buffered channels are similar to unbuffered channels except they have a "queue" of values that can be sent on them before another channel has to receive them.
ch := make(chan string, 5) // buffered channel with buffer of size 5
With a buffered channel of size 5, 5 values can be sent to the channel before the sending goroutine blocks:
ch <- "an"
ch <- "example"
ch <- "for"
ch <- "clarifying"
ch <- "buffered channels"
ch <- "extra send" // this line blocks if no goroutine has received anything from the channel
Executing receives on the channel are non-blocking until the buffer has been emptied:
<-ch // an
<-ch // example
<-ch // for
<-ch // clarifying
<-ch // buffered channels
<-ch // extra send
<-ch // BLOCKS until more values are sent on the channel
Let's create a program to send web requests that limits the number of active requests being executed to 3:
package main
import (
"fmt"
"net/http"
)
func main() {
urls := []string{
"http://www.google.com",
"http://www.facebook.com",
"http://www.reddit.com",
"http://www.coinbase.com",
"http://www.instagram.com",
"http://www.twitter.com",
"http://www.amazon.com",
"http://www.github.com",
"http://www.youtube.com",
"http://www.developerdao.com",
"http://www.wikipedia.org",
"http://www.espn.com",
}
activeRequests := make(chan string, 3)
done := make(chan struct{}) // signal when done
go sendGet(done, activeRequests)
for _, val := range urls {
activeRequests <- val
}
close(activeRequests) // close channel so sendGet goroutine knows to terminate range loop
<-done // finished execution
}
func sendGet(out chan<- struct{}, in <-chan string) {
for url := range in {
resp, err := http.Get(url)
if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
statusCode := resp.StatusCode
fmt.Printf("Connection status response from %s: %v\n", url, statusCode)
}
out <- struct{}{}
}
Observe that we have a sendGet()
goroutine that is only being run with up to 3 concurrent copies of itself at once. This keeps a hard limit on the maximum number of network connections we have open at one time.
The form
for url := range in
in thesendGet()
goroutine is a range loop over a channel that allows values to be received as they become available in the channel being looped over.
While the above program terminates as soon as there are no more urls, we are aware of the number of the number of goroutines that will be run (the size of the urls
slice).
There are many occurrences when we cannot prepare for how many goroutines will be executed as the input in a more realistic program could be streamed into our program.
An easy and safe way to count the number of executing goroutines and shut the program down when there are no more running is made available with the sync.WaitGroup.
WaitGroups
WaitGroups are available in the sync package along with other types for safe shared-state concurrency.
The WaitGroup type provides 3 methods:
- Add
- Done
- Wait
Add(delta int)
accepts an integer that can be used to keep track of the number of running goroutines
Done()
is used to signal that the goroutine has finished its work. It is equivalent to Add(-1)
.
Wait()
is used to block until the counter created with calls to Add()
has returned to 0. This ensures that we wait on all active jobs to finish before continuing.
This program takes a variable amount of input from the user and uses a WaitGroup to wait until all active goroutines finish work:
package main
import (
"crypto/sha1"
"encoding/base64"
"fmt"
"os"
"sync"
)
func main() {
input := make(chan []string)
var wg sync.WaitGroup
go func() {
input <- os.Args[1:]
close(input)
}()
for list := range input {
for _, val := range list {
wg.Add(1)
go generateHash(&wg, val)
}
}
wg.Wait()
}
func generateHash(wg *sync.WaitGroup, str string) {
defer wg.Done()
bytes := []byte(str)
hasher := sha1.New()
hasher.Write(bytes)
result := base64.URLEncoding.EncodeToString(hasher.Sum(nil))
fmt.Printf("Result: %v\n", result)
}
What's happening here:
For each value we receive from standard input, we call
wg.Add(1)
to increment the total number of goroutines that are active and then start the goroutine.Inside the goroutine function
generateHash(wg *sync.WaitGroup, str string)
we calldefer wg.Done()
to decrement the count in the WaitGroup to signify that this particular job has finished.At the end of
main()
we callwg.Wait()
which blocks until its counter has reached zero.
Notice how at the top of
main()
we load the input inside its own goroutine.If this wasn't nested in a goroutine, the program would enter into a deadlock. The
main()
function would block on the call as it's waiting for another goroutine to receive the value.
While parallelizing calls to a function as trivial as generateHash()
isn't realistic, the program demonstrates how to wait on a number of heavier workloads to complete concurrently before finishing program execution.
In practice there are many occurrences where a program has to handle more than a single channel's worth of events which can lead to added complexity.
Fortunately Go provides a select statement that can be used for such a purpose.
Select Statement
The select statement is a block that allows for multiplexing channel communications.
What this means is a select statement waits for any number of defined channel communication cases to be satisfied before executing. These cases can be used for triggering responses when working with multiple channels in a program.
The select statement is similar to a switch statement, except that the case statements include channel events (i.e. a value is sent on a channel or a value is ready to be received from a channel).
It will choose a case to execute depending on the channel event that has occurred. If there is more than 1 channel event that has been emitted at the same time, then a case is chosen at random to be executed.
The syntax is as follows:
select {
case ch1<-1:
// do something when a value is sent on ch1
case x := <-ch2:
// do something when a value is ready to be received from ch2
// and capture this value in x
default:
// do something if none of the other cases happened
}
Similar to the switch statement in Go, only one of these statements will be executed. There is no fall-through.
To demonstrate what can be done with this in another example case, imagine there are two goroutines: one that generates numbers and one that generates unix timestamps. To add another requirement for the exercise, the program should shut down after 10 seconds.
The select statement can monitor the events from 3 channels used to communicate data from these jobs:
package main
import (
"fmt"
"time"
)
func main() {
numbers := make(chan int)
timestamps := make(chan int64)
ticker := time.NewTicker(5 * time.Second)
go generateNumbers(numbers)
go generateTimestamps(timestamps)
loop:
for {
select {
case num := <-numbers:
fmt.Printf("number: %v\n", num)
case timestamp := <-timestamps:
fmt.Printf("timestamp: %v\n", timestamp)
case <-ticker.C:
break loop // time has elapsed
}
}
}
func generateNumbers(out chan<- int) {
i := 0
for {
out <- i
i++
time.Sleep(500 * time.Millisecond)
}
}
func generateTimestamps(out chan<- int64) {
timestamp := time.Now().Unix()
for {
out <- timestamp
timestamp = time.Now().Unix()
time.Sleep(500 * time.Millisecond)
}
}
This program runs two goroutines and uses 3 separate channels for communication.
The two goroutines will run in a continuous loop producing output (these are slowed down by the calls to time.Sleep()
) until the defined ticker
times out.
time.NewTicker(d Duration)
returns an object that sends the current time on its encapsulated channel after the defined time duration. In this case, the ticker will emit an event every 10 seconds.
Program execution will run until the first event is received from the ticker's channel ticker.C
.
A labeled break is used here to not just break out of the select statement, but also the surrounding for loop.
End
This post broke down Go's support for concurrency through its primitive types.
Getting into the concurrent programming mindset is easier to transition into after creating and tweaking simple programs like the ones in this post.
The concepts covered here will help transform sequential programs into concurrent ones with optimized performance, unlocked by the tools that are native to Go and waiting to be unlocked.
The structure and concepts described in this blog post were inspired by learnings from Alan Donovan and Brian Kernighan's excellent book The Go Programming Language