Limiting Concurrency in Go

Writing multithreaded, concurrent code in go is incredibly easy due to the go keyword and how lightweight goroutines are. Goroutines, channels and the sync and sync/atomic packages provide a lot of the common functionality for writing multithreaded programs and synchronizing between threads. One common issue that eventually comes up is limiting concurrency.

A common use of multithreading is to concurrently make multiple requests over the network and without some limit on the number of requests you’ll quickly run out of resources like available local TCP ports which will result in EADDRNOTAVAIL or a too many open files error from trying to open a socket.

package main

import (
   "net/http"
   "sync"
)

func main() {
   var wg sync.WaitGroup
   for i := 0; i < 10000; i++ {
   	wg.Add(1)
   	go func() {
   		defer wg.Done()
   		_, err := http.Get("http://www.google.com")
   		if err != nil {
   			panic(err)
   		}
   	}()
   }
   wg.Wait()
}

The typical solution to this problem is to use a semaphore to limit access to a resource, but the go standard library doesn’t have a semaphore implementation. One way to implement a semaphore is with jmoiron’s semaphore pattern. Which would look like

func main() {
	var empty struct{}
	concurrency := 5
	semaphore := make(chan struct{}, concurrency)
	for i := 0; i < 10000; i++ {
		semaphore <- empty
		go func() {
			defer func() {
				<-semaphore
			}()
			_, err := http.Get("http://www.google.com")
			if err != nil {
				panic(err)
			}
		}()
	}

	for i := 0; i < cap(semaphore); i++ {
		semaphore <- empty
	}
}

That pattern could be wrapped up into a type with an interface similar to sync.WaitGroup’s Add(int) and Done(). The golang/x/sync package also provides a slightly different implementation with a similar API Acquire(context.Context, int64) error, Release(int64).

Another approach to limiting concurrency is the worker pool pattern. That pattern can be altered slightly to return errors as well. The following code shows what that looks like. Also note that this could be generalized further as the task url field could be a generic func() or func() error to be able to use this worker pool for different tasks.

type task struct {
	url string
	err chan error
}

type pool struct {
	tasks chan task
}

func (p pool) worker() {
	for task := range p.tasks {
		_, err := http.Get(task.url)
		if err != nil {
			task.err <- err
		}
		close(task.err)
	}
}

func (p pool) get(url string) error {
	task := task{url: url, err: make(chan error)}
	p.tasks <- task
	return <-task.err
}

func main() {
	concurrency := 5
	p := pool{tasks: make(chan task, concurrency)}

	for i := 0; i < concurrency; i++ {
		go p.worker()
	}

	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := p.get("http://wwww.google.com"); err != nil {
				panic(err)
			}
		}()
	}
        wg.Wait()
        close(p.tasks)
}