go
51 lines · 8 steps
A cancelable worker pool in Go
How a fan-out/fan-in pipeline crawls URLs concurrently while respecting context cancellation.
Explained by
highlit
1package pipeline
2
3import (
4 "context"
5 "sync"
6)
7
8type Result struct {
9 URL string
10 Size int
11 Err error
12}
13
14func Crawl(ctx context.Context, urls []string, workers int, fetch func(context.Context, string) (int, error)) <-chan Result {
15 jobs := make(chan string)
16 results := make(chan Result)
17
18 go func() {
19 defer close(jobs)
20 for _, u := range urls {
21 select {
22 case jobs <- u:
23 case <-ctx.Done():
24 return
25 }
26 }
27 }()
28
29 var wg sync.WaitGroup
30 wg.Add(workers)
31 for i := 0; i < workers; i++ {
32 go func() {
33 defer wg.Done()
34 for url := range jobs {
35 size, err := fetch(ctx, url)
36 select {
37 case results <- Result{URL: url, Size: size, Err: err}:
38 case <-ctx.Done():
39 return
40 }
41 }
42 }()
43 }
44
45 go func() {
46 wg.Wait()
47 close(results)
48 }()
49
50 return results
51}
01 / 01
STEP 01
‹ swipe to step through ›
Walkthrough
Space play
←→ step
click any line
Three takeaways
- 1Fan-out to a fixed pool of workers reading one shared jobs channel bounds concurrency without spawning a goroutine per task.
- 2Selecting on ctx.Done() alongside every channel send makes the whole pipeline promptly cancelable instead of blocking forever.
- 3A separate goroutine that waits on the WaitGroup and closes the results channel lets callers range over output until it's naturally done.
Related explainers
java
@Service public class ReportGenerationService { private final ReportRepository reportRepository;
Async report generation with Spring @Async
async
dependency-injection
completablefuture
Intermediate
7 steps
go
package auth import ( "net/http"
Building a JWT auth middleware in Gin
middleware
jwt
authentication
Intermediate
7 steps
go
func RecoveryHandler(logger *zap.Logger) gin.HandlerFunc { return gin.CustomRecovery(func(c *gin.Context, recovered any) { var brokenPipe bool if ne, ok := recovered.(*net.OpError); ok {
A panic-recovery middleware in Gin
middleware
panic-recovery
structured-logging
Intermediate
6 steps
go
package server import ( "context"
Composing HTTP middleware in Go
middleware
http-handlers
closures
Intermediate
8 steps
javascript
async function mapWithConcurrency(items, limit, worker) { const results = new Array(items.length); let nextIndex = 0;
Bounded-concurrency async map in JavaScript
concurrency
async-await
promises
Intermediate
7 steps
go
package config import ( "encoding/json"
Parsing untyped JSON config in Go
json parsing
type assertions
error handling
Intermediate
7 steps
Share this explainer
Here's the card — post it anywhere.
Made with highlit — turn any snippet into a walkthrough like this in about a minute.
Explain your code
Embed this explainer
Drop the interactive walkthrough into a blog or docs. Views never cost a credit.
<iframe src="https://highlit.co/explainers/a-cancelable-worker-pool-in-go-explained-go-7357/embed?autoplay=1" width="100%" height="520" loading="lazy" style="border:0"></iframe>
Autoplay is on by default — add ?autoplay=0 to start paused.