java
44 lines · 8 steps
A producer-consumer pipeline in Java
One producer feeds tasks through a bounded queue to four worker threads, using a poison pill to signal a clean shutdown.
Explained by
highlit
1public class WorkPipeline {
2 private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>(1000);
3 private static final Task POISON = new Task(-1, null);
4 private final ExecutorService consumers = Executors.newFixedThreadPool(4);
5
6 public void start(List<Task> source) {
7 for (int i = 0; i < 4; i++) {
8 consumers.submit(this::consume);
9 }
10 Thread producer = new Thread(() -> produce(source), "producer");
11 producer.start();
12 }
13
14 private void produce(List<Task> source) {
15 try {
16 for (Task task : source) {
17 queue.put(task);
18 }
19 for (int i = 0; i < 4; i++) {
20 queue.put(POISON);
21 }
22 } catch (InterruptedException e) {
23 Thread.currentThread().interrupt();
24 }
25 }
26
27 private void consume() {
28 try {
29 while (true) {
30 Task task = queue.take();
31 if (task == POISON) {
32 break;
33 }
34 process(task);
35 }
36 } catch (InterruptedException e) {
37 Thread.currentThread().interrupt();
38 }
39 }
40
41 private void process(Task task) {
42 System.out.printf("[%s] processing %d%n", Thread.currentThread().getName(), task.id());
43 }
44}
01 / 01
STEP 01
‹ swipe to step through ›
Walkthrough
Space play
←→ step
click any line
Three takeaways
- 1A bounded BlockingQueue gives you backpressure for free — producers block when consumers fall behind.
- 2A sentinel value (poison pill) is a simple way to tell each worker to stop without sharing extra flags.
- 3Restoring the interrupt flag after catching InterruptedException keeps cancellation working correctly up the stack.
Related explainers
rust
use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::thread;
Aggregating metrics across threads in Rust
concurrency
shared-state
mutex
Intermediate
7 steps
java
public class ThumbnailProcessor { private static final int MAX_CONCURRENCY = 4;
Bounded parallel thumbnail rendering in Java
concurrency
thread-pool
futures
Intermediate
7 steps
rust
use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration;
Building a thread pool in Rust
concurrency
channels
thread-pool
Advanced
9 steps
java
public class SortedListMerger { public static int[] merge(int[] a, int[] b) { int[] result = new int[a.length + b.length];
Merging two sorted arrays in Java
two-pointers
merging
arrays
Beginner
6 steps
go
package cache import ( "container/list"
Building a generic LRU cache in Go
lru-cache
generics
linked-list
Intermediate
8 steps
java
import java.util.ArrayDeque; import java.util.Deque; public final class RollingAverage {
A rolling average over a sliding window
sliding-window
running-sum
deque
Intermediate
7 steps
Share this explainer
Here's the card — post it anywhere.
Made with highlit — turn any snippet into a walkthrough like this in about a minute.
Explain your code
Embed this explainer
Drop the interactive walkthrough into a blog or docs. Views never cost a credit.
<iframe src="https://highlit.co/explainers/a-producer-consumer-pipeline-in-java-explained-java-4d17/embed?autoplay=1" width="100%" height="520" loading="lazy" style="border:0"></iframe>
Autoplay is on by default — add ?autoplay=0 to start paused.