
A cancelable worker pool in Go

How a fan-out/fan-in pipeline crawls URLs concurrently while respecting context cancellation.

Explained by highlit
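```go
package pipeline

import (
	"context"
	"sync"
)

// Result is the outcome of fetching a single URL.
type Result struct {
	URL  string
	Size int
	Err  error
}

// Crawl fetches urls with a fixed pool of workers and streams the outcomes on
// the returned channel, which is closed once every worker has exited.
// Canceling ctx stops the producer and the workers promptly.
func Crawl(ctx context.Context, urls []string, workers int, fetch func(context.Context, string) (int, error)) <-chan Result {
	if workers < 1 {
		// Guard: with zero workers nobody reads jobs, so the producer
		// would block until ctx is canceled (or forever).
		workers = 1
	}
	jobs := make(chan string)
	results := make(chan Result)

	// Producer: feed URLs to the workers, stopping early on cancellation.
	go func() {
		defer close(jobs)
		for _, u := range urls {
			select {
			case jobs <- u:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Fan-out: a fixed number of workers share the single jobs channel.
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for url := range jobs {
				size, err := fetch(ctx, url)
				// Fan-in: deliver the result, or give up if the caller canceled.
				select {
				case results <- Result{URL: url, Size: size, Err: err}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// Closer: once all workers are done, close results so callers can range over it.
	go func() {
		wg.Wait()
		close(results)
	}()

	return results
}
```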
Three takeaways
  1. Fanning out to a fixed pool of workers that read from one shared jobs channel bounds concurrency to `workers` goroutines without spawning a goroutine per URL.
  2. Selecting on ctx.Done() alongside every channel send makes the whole pipeline promptly cancelable instead of blocking forever, provided fetch itself returns promptly once ctx is canceled.
  3. A separate goroutine that waits on the WaitGroup and then closes the results channel lets callers range over the output until it ends naturally.
