
A producer-consumer pipeline in Java

One producer feeds tasks through a bounded queue to four worker threads, using a poison pill to signal a clean shutdown.

Explained by highlit
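import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Task is defined elsewhere; this class relies only on its constructor and id() accessor.
public class WorkPipeline {
    // One poison pill is enqueued per worker, so both loops must use the same count.
    private static final int WORKERS = 4;
    // Sentinel compared by reference (==); it is never passed to process().
    private static final Task POISON = new Task(-1, null);

    // Bounded queue: put() blocks while 1000 tasks are already waiting.
    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>(1000);
    private final ExecutorService consumers = Executors.newFixedThreadPool(WORKERS);

    public void start(List<Task> source) {
        for (int i = 0; i < WORKERS; i++) {
            consumers.submit(this::consume);
        }
        // Accept no further work so the pool threads exit once every consumer returns;
        // without this, the idle non-daemon workers keep the JVM alive.
        consumers.shutdown();
        Thread producer = new Thread(() -> produce(source), "producer");
        producer.start();
    }

    private void produce(List<Task> source) {
        try {
            for (Task task : source) {
                queue.put(task); // blocks while the queue is full
            }
            // One pill per worker: each consumer stops after taking exactly one.
            for (int i = 0; i < WORKERS; i++) {
                queue.put(POISON);
            }
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    private void consume() {
        try {
            while (true) {
                Task task = queue.take(); // blocks while the queue is empty
                if (task == POISON) {
                    break;
                }
                process(task);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void process(Task task) {
        System.out.printf("[%s] processing %d%n", Thread.currentThread().getName(), task.id());
    }
}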
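The listing does not show the Task type or how a caller knows the workers have finished. The sketch below fills both gaps under stated assumptions: the `Task` record (with a `String` payload) is a guess inferred from `new Task(-1, null)` and `task.id()`, and `PipelineDemo`, `run`, and the 10-second timeout are hypothetical names and values chosen for illustration. It counts processed tasks instead of printing them so the result is easy to check. Records need Java 16 or later.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PipelineDemo {
    // Assumed shape of Task; the original listing does not define it.
    record Task(int id, String payload) {}

    // Sentinel compared by reference, as in the listing.
    static final Task POISON = new Task(-1, null);

    // Feeds `source` to `workers` consumers and returns how many tasks were processed.
    static int run(List<Task> source, int workers) {
        BlockingQueue<Task> queue = new LinkedBlockingQueue<>(1000);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger processed = new AtomicInteger();

        Runnable consumer = () -> {
            try {
                while (true) {
                    Task task = queue.take();
                    if (task == POISON) {
                        break;
                    }
                    processed.incrementAndGet(); // stand-in for real work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < workers; i++) {
            pool.execute(consumer);
        }
        pool.shutdown(); // no new work; threads exit when the consumers return

        try {
            for (Task task : source) {
                queue.put(task);
            }
            for (int i = 0; i < workers; i++) {
                queue.put(POISON); // one pill per worker
            }
            // Wait for every consumer to swallow its pill before reading the count.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                throw new IllegalStateException("workers did not stop in time");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the pipeline", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(new Task(1, "a"), new Task(2, "b"), new Task(3, "c"));
        System.out.println(run(tasks, 4)); // prints 3
    }
}
```

Here the calling thread plays the producer, which keeps the sketch short; the listing's separate producer thread would work the same way as long as something eventually waits on the pool.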
Three takeaways
  1. A bounded BlockingQueue gives you backpressure for free: put() blocks the producer while the queue is full, so it cannot outrun the consumers.
  2. A sentinel value (poison pill) is a simple way to tell each worker to stop without sharing extra flags. Enqueue one pill per worker, because each pill is taken by exactly one thread.
  3. Restoring the interrupt flag after catching InterruptedException keeps cancellation working correctly up the stack.
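The first and third takeaways can be observed in a single thread. This is a minimal sketch, not part of the original pipeline: `BackpressureDemo` and its two methods are hypothetical names, and the capacity of 2 and the 50 ms timeout are arbitrary. It uses the timed `offer` in place of `put` so a full queue reports failure instead of blocking forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BackpressureDemo {
    // Fills a capacity-2 queue, then reports whether a third element fits in time.
    static boolean thirdOfferAccepted(long timeoutMillis) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        try {
            queue.put(1);
            queue.put(2);
            // Nobody is consuming, so this waits for the timeout and then gives up.
            return queue.offer(3, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Shows that re-interrupting in the catch block makes the request visible again.
    static boolean flagSurvivesCatch() {
        Thread.currentThread().interrupt(); // simulate a cancellation request
        BlockingQueue<Integer> empty = new LinkedBlockingQueue<>(1);
        try {
            empty.take(); // throws at once because the thread is already interrupted
        } catch (InterruptedException e) {
            // Throwing InterruptedException cleared the flag; set it again.
            Thread.currentThread().interrupt();
        }
        return Thread.interrupted(); // reads the flag and clears it for later code
    }

    public static void main(String[] args) {
        System.out.println(thirdOfferAccepted(50)); // prints false
        System.out.println(flagSurvivesCatch());    // prints true
    }
}
```

If the catch block in `flagSurvivesCatch` were left empty, the method would return false: the interrupt would be swallowed and callers could no longer tell that cancellation was requested.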
