Limiting Concurrency in Go
Writing multithreaded, concurrent code in Go is incredibly easy thanks to the `go` keyword and how lightweight goroutines are. Goroutines, channels, and the `sync` and `sync/atomic` packages provide much of the common functionality for writing concurrent programs and synchronizing between goroutines. One issue that eventually comes up, though, is limiting concurrency.
A common use of concurrency is making many requests over the network at once. Without some limit on the number of requests in flight, you'll quickly run out of resources such as available local TCP ports or file descriptors, which results in an `EADDRNOTAVAIL` or `too many open files` error when trying to open a socket. The following program, for example, starts 10,000 requests at once:
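```go
package main

import (
	"net/http"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	// Launch 10,000 requests with nothing limiting how many are in flight.
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			resp, err := http.Get("http://www.google.com")
			if err != nil {
				panic(err) // e.g. EADDRNOTAVAIL or "too many open files"
			}
			// Close the body so the connection can be reused or released.
			resp.Body.Close()
		}()
	}
	wg.Wait()
}
```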
The typical solution to this problem is to use a semaphore to limit access to a resource, but the Go standard library doesn't include a semaphore implementation. One way to build one is jmoiron's semaphore pattern, which uses a buffered channel as the semaphore. It looks like this:
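```go
package main

import "net/http"

func main() {
	var empty struct{}
	concurrency := 5
	// A buffered channel acts as a counting semaphore with `concurrency` slots.
	semaphore := make(chan struct{}, concurrency)
	for i := 0; i < 10000; i++ {
		// Acquire a slot; this blocks while `concurrency` requests are in flight.
		semaphore <- empty
		go func() {
			// Release the slot when the request finishes.
			defer func() {
				<-semaphore
			}()
			resp, err := http.Get("http://www.google.com")
			if err != nil {
				panic(err)
			}
			resp.Body.Close()
		}()
	}
	// Acquiring every slot only succeeds once all goroutines have released
	// theirs, so this loop waits for the remaining requests to finish.
	for i := 0; i < cap(semaphore); i++ {
		semaphore <- empty
	}
}
```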
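To check that the semaphore really caps concurrency without hammering a real server, the sketch below swaps `http.Get` for a one-millisecond sleep and records the highest number of goroutines ever inside the guarded section at once. The `runLimited` helper and the sleep are illustrative stand-ins, not part of the original pattern.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runLimited starts n tasks but lets at most limit run at once, using the
// buffered-channel semaphore pattern. It returns the highest number of
// tasks observed running simultaneously.
func runLimited(n, limit int) int64 {
	var inFlight, peak int64
	semaphore := make(chan struct{}, limit)
	for i := 0; i < n; i++ {
		semaphore <- struct{}{} // blocks once limit goroutines hold a slot
		go func() {
			defer func() { <-semaphore }() // free the slot when done
			cur := atomic.AddInt64(&inFlight, 1)
			// Record a new peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // hypothetical stand-in for http.Get
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	// Filling the channel to capacity waits for every outstanding goroutine.
	for i := 0; i < cap(semaphore); i++ {
		semaphore <- struct{}{}
	}
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(100, 5))
}
```

The peak can never exceed the channel's capacity: a goroutine only does its work after putting a value into the channel, and it only takes one back out after the work is finished.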
That pattern could be wrapped up in a type with an interface similar to `sync.WaitGroup`'s `Add(int)` and `Done()`. The `golang.org/x/sync/semaphore` package also provides a slightly different implementation, `semaphore.Weighted`, with a similar API: `Acquire(context.Context, int64) error` and `Release(int64)`.
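As a sketch of that wrapping idea, here is a hypothetical `Limiter` type built on the same buffered channel. The name and its `NewLimiter`, `Add`, `Done`, and `Wait` methods are illustrative, not taken from any library, and the tasks it runs are placeholders for `http.Get`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Limiter is a hypothetical wrapper around the buffered-channel semaphore
// with a sync.WaitGroup-like API.
type Limiter struct {
	slots chan struct{}
}

// NewLimiter returns a Limiter that allows at most concurrency holders at once.
func NewLimiter(concurrency int) *Limiter {
	return &Limiter{slots: make(chan struct{}, concurrency)}
}

// Add acquires n slots one at a time, blocking while the Limiter is full.
func (l *Limiter) Add(n int) {
	for i := 0; i < n; i++ {
		l.slots <- struct{}{}
	}
}

// Done releases one slot.
func (l *Limiter) Done() {
	<-l.slots
}

// Wait blocks until every slot is free by acquiring all of them, then
// releases them again so the Limiter can be reused.
func (l *Limiter) Wait() {
	l.Add(cap(l.slots))
	for i := 0; i < cap(l.slots); i++ {
		l.Done()
	}
}

// runTasks runs n placeholder tasks through a Limiter and returns how many
// completed.
func runTasks(n, concurrency int) int64 {
	var completed int64
	lim := NewLimiter(concurrency)
	for i := 0; i < n; i++ {
		lim.Add(1)
		go func() {
			defer lim.Done()
			atomic.AddInt64(&completed, 1) // stand-in for http.Get
		}()
	}
	lim.Wait()
	return atomic.LoadInt64(&completed)
}

func main() {
	fmt.Println("completed:", runTasks(1000, 5)) // prints "completed: 1000"
}
```

Because `Add(n)` takes slots one at a time, two goroutines calling it with `n > 1` could each grab part of the capacity and deadlock. `semaphore.Weighted` avoids that by waiting until all `n` units are available before taking any.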
Another approach to limiting concurrency is the worker pool pattern, which can be altered slightly to return errors as well. The following code shows what that looks like. It could also be generalized further: the task's `url` field could be replaced with a `func()` or `func() error`, so the same worker pool could run different kinds of tasks.
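```go
package main

import (
	"net/http"
	"sync"
)

// task is a single request plus a channel for reporting its result.
type task struct {
	url string
	err chan error
}

type pool struct {
	tasks chan task
}

// worker processes tasks until p.tasks is closed.
func (p pool) worker() {
	for t := range p.tasks {
		resp, err := http.Get(t.url)
		if err != nil {
			t.err <- err
		} else {
			resp.Body.Close()
		}
		// Closing the channel makes get receive nil when there was no error.
		close(t.err)
	}
}

// get submits a request to the pool and blocks until a worker has handled it.
func (p pool) get(url string) error {
	t := task{url: url, err: make(chan error)}
	p.tasks <- t
	return <-t.err
}

func main() {
	concurrency := 5
	p := pool{tasks: make(chan task, concurrency)}
	// Only `concurrency` workers exist, so at most that many requests run at once.
	for i := 0; i < concurrency; i++ {
		go p.worker()
	}
	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := p.get("http://www.google.com"); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	close(p.tasks)
}
```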
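The generalization mentioned above, where a task carries a `func() error` instead of a URL, might look like the following sketch. The names `job`, `workerPool`, `newWorkerPool`, `run`, `stop`, and `countFailures` are hypothetical, and the jobs are placeholders that fail on every third index rather than real HTTP requests.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical generalization of task: an arbitrary func() error
// plus a channel for sending its result back to the caller.
type job struct {
	fn  func() error
	err chan error
}

type workerPool struct {
	jobs chan job
}

// newWorkerPool starts workers goroutines; at most that many jobs run at once.
func newWorkerPool(workers int) *workerPool {
	p := &workerPool{jobs: make(chan job)}
	for i := 0; i < workers; i++ {
		go func() {
			for j := range p.jobs {
				// err is buffered, so the worker never waits on the caller.
				j.err <- j.fn()
			}
		}()
	}
	return p
}

// run submits fn and blocks until a worker has executed it, returning its error.
func (p *workerPool) run(fn func() error) error {
	j := job{fn: fn, err: make(chan error, 1)}
	p.jobs <- j
	return <-j.err
}

// stop shuts the workers down; call it only after all run calls have returned.
func (p *workerPool) stop() {
	close(p.jobs)
}

// countFailures submits n jobs, where every third one fails, and counts the errors.
func countFailures(n, workers int) int {
	p := newWorkerPool(workers)
	defer p.stop()

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		failures int
	)
	for i := 0; i < n; i++ {
		i := i // capture the loop variable (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := p.run(func() error {
				if i%3 == 0 {
					return fmt.Errorf("job %d failed", i)
				}
				return nil
			})
			if err != nil {
				mu.Lock()
				failures++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return failures
}

func main() {
	fmt.Println("failures:", countFailures(10, 3)) // prints "failures: 4"
}
```

Giving each job's `err` channel a buffer of one lets a worker hand back the result and move on immediately instead of waiting for the caller to receive it; the close-based signaling in the pool above works too, it just keeps the worker blocked until `get` reads the error.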