Stream Gatherers: Custom Intermediate Operations for Java Streams

Java Streams give you a fixed menu of intermediate operations: map, filter, flatMap, limit, and a handful of others. That covers a lot, but the moment you need something stateful — batching elements into groups, computing a running total, deduplicating by a derived key — you're stuck either collecting early and re-streaming, or writing an awkward flatMap with a mutable field hanging off it.
Stream Gatherers, finalized in Java 24 (JEP 485), close that gap. Stream.gather(Gatherer) lets you write your own intermediate operation and drop it into a pipeline like any built-in.
What Are Stream Gatherers?
A Gatherer is to intermediate operations what a Collector is to terminal ones: a structured way to control how elements flow through a stage of the pipeline. Gatherers support:
- Stateful transformations
- Flexible input/output cardinality (1:1, 1:many, many:1, many:many)
- Short-circuiting
- Parallel execution when the gatherer provides a combiner
A gatherer is applied with Stream.gather and composes with the stages around it:
    stream.gather(myCustomGatherer)
        .filter(...)
        .map(...)
        .collect(...);
The Gatherer Interface
Gatherer<T, A, R> takes input elements of type T, keeps private state of type A, and emits elements of type R. It consists of four components:
- Initializer – creates the state object
- Integrator – processes each element and pushes results downstream
- Combiner – merges state across threads when parallel execution occurs
- Finisher – runs once after all elements are processed
Most components are optional; the integrator is the only required one.
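To make the four components concrete, here is a minimal sketch that supplies all of them through Gatherer.of. The class name FourPartGatherer, the Total state holder, and the sumAll method are names invented for this illustration; the gatherer simply sums its input and emits one total once the input is exhausted.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class FourPartGatherer {
    // Hypothetical mutable state for this sketch: a running total.
    static final class Total {
        long sum;
    }

    // Sums all integers and emits a single Long at the end of the input.
    static Gatherer<Integer, Total, Long> sumAll() {
        return Gatherer.of(
            // Initializer: creates a fresh state object.
            Total::new,
            // Integrator: folds each element into the state and emits nothing yet.
            Gatherer.Integrator.ofGreedy((total, element, downstream) -> {
                total.sum += element;
                return true;
            }),
            // Combiner: merges two partial states produced by a parallel run.
            (left, right) -> {
                left.sum += right.sum;
                return left;
            },
            // Finisher: pushes the final result downstream.
            (total, downstream) -> downstream.push(total.sum));
    }

    public static void main(String[] args) {
        List<Long> sequential = Stream.of(1, 2, 3, 4).gather(sumAll()).toList();
        List<Long> parallel = Stream.of(1, 2, 3, 4).parallel().gather(sumAll()).toList();
        System.out.println(sequential); // [10]
        System.out.println(parallel);   // [10]
    }
}
```

Because a combiner is supplied, the same gatherer gives the same answer whether the stream is sequential or parallel.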
Built-in Gatherers (java.util.stream.Gatherers)
The JDK ships several ready-to-use gatherers:
- windowFixed(int) – non-overlapping batches
- windowSliding(int) – overlapping sliding windows
- scan(Supplier, BiFunction) – running aggregates / stateful maps
- fold(Supplier, BiFunction) – produces exactly one output element at the end
- mapConcurrent(int, Function) – bounded concurrent mapping
fold is a many-to-one gatherer: it emits a single result downstream, unlike scan, which emits a running result for each element.
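The contrast is easiest to see when both gatherers run over the same input. In this small sketch the class and method names (ScanVersusFold, runningTotals, finalTotal) are made up for illustration; only Gatherers.scan and Gatherers.fold come from the JDK.

```java
import java.util.List;
import java.util.stream.Gatherers;

class ScanVersusFold {
    // scan: one output per input element, each carrying the total so far.
    static List<Integer> runningTotals(List<Integer> numbers) {
        return numbers.stream()
            .gather(Gatherers.scan(() -> 0, Integer::sum))
            .toList();
    }

    // fold: a single output carrying the final total.
    static List<Integer> finalTotal(List<Integer> numbers) {
        return numbers.stream()
            .gather(Gatherers.fold(() -> 0, Integer::sum))
            .toList();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4);
        System.out.println(runningTotals(numbers)); // [1, 3, 6, 10]
        System.out.println(finalTotal(numbers));    // [10]
    }
}
```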
Practical Examples
1. Fixed window batching
    List<List<Integer>> batches = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
        .gather(Gatherers.windowFixed(3))
        .toList();
    // [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
2. Sliding window (consecutive pairs)
    Stream.of(1, 2, 3, 4, 5)
        .gather(Gatherers.windowSliding(2))
        .forEach(System.out::println);
    // prints [1, 2], [2, 3], [3, 4], [4, 5], one window per line
3. Running sum with scan
    List<Integer> running = Stream.of(1, 2, 3, 4)
        .gather(Gatherers.scan(() -> 0, Integer::sum))
        .toList();
    // [1, 3, 6, 10]
scan takes a Supplier for the initial state and a BiFunction that receives the current state and element, then returns the next state (which gets pushed downstream). Integer::sum works here because it matches that shape.
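The state type does not have to match the element type. As a small illustration (the class and method names are hypothetical), this sketch keeps an Integer state while consuming String elements, producing the cumulative character count of a stream of words. The initial value 0 seeds the state but is not itself emitted.

```java
import java.util.List;
import java.util.stream.Gatherers;

class CumulativeLength {
    // State is an Integer (characters so far); elements are Strings.
    static List<Integer> cumulativeLengths(List<String> words) {
        return words.stream()
            .gather(Gatherers.scan(() -> 0, (total, word) -> total + word.length()))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(cumulativeLengths(List.of("a", "bb", "ccc"))); // [1, 3, 6]
    }
}
```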
4. Custom distinct-by-property
This gatherer deduplicates strings by length, keeping the first string seen for each length. Because it is inherently sequential and order-dependent, it uses ofSequential:
    Gatherer<String, Set<Integer>, String> distinctByLength =
        Gatherer.ofSequential(
            HashSet::new, // state: the lengths seen so far
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (state.add(element.length())) {
                    // First string of this length: emit it and relay downstream's answer.
                    return downstream.push(element);
                }
                return true; // length already seen: skip the element, keep consuming
            }));
The integrator receives the mutable state, the current element, and a Downstream handle, and pushes zero or more results. Its boolean return value signals whether more input should be consumed. downstream.push(...) returns false once the downstream wants no more elements, and the integrator should pass that answer along rather than discard it. A greedy integrator (built with ofGreedy) promises never to initiate short-circuiting itself: it returns false only to relay a false from downstream. A non-greedy integrator is the one that can decide on its own to short-circuit by returning false.
ofSequential creates a gatherer whose stage always runs sequentially, even in a parallel stream; the stages around it can still be evaluated in parallel. It does not accept a combiner, because there are never partial states to merge.
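For contrast with the greedy integrator, here is a minimal sketch of a non-greedy one that ends the stream on its own. The takeUntilInclusive name and the ShortCircuitingGatherer class are assumptions for this example, not JDK API; the gatherer is stateless, so it uses the single-argument ofSequential overload.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class ShortCircuitingGatherer {
    // Emits elements up to and including the first one that matches `stop`.
    static <T> Gatherer<T, ?, T> takeUntilInclusive(Predicate<? super T> stop) {
        return Gatherer.<T, T>ofSequential(
            (unused, element, downstream) ->
                // Continue only if downstream accepted the element
                // and the element was not the stop element.
                downstream.push(element) && !stop.test(element));
    }

    public static void main(String[] args) {
        // The source is infinite; returning false is what ends the pipeline.
        List<Integer> firstFour = Stream.iterate(1, n -> n + 1)
            .gather(takeUntilInclusive(n -> n >= 4))
            .toList();
        System.out.println(firstFour); // [1, 2, 3, 4]
    }
}
```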
When Should You Use Them?
Reach for a gatherer when the built-in operations don't fit and your alternative is a collect-and-restart or a contorted flatMap. A custom gatherer keeps the logic inside the pipeline, is reusable once written, and can run in parallel if:
- the gatherer provides a combiner, and
- the stream implementation chooses to parallelize that stage
A combiner enables parallelization but does not guarantee it. For simple transformations that map or filter already express cleanly, stick with the basics.
Some scenarios where gatherers genuinely beat the built-ins:
Chunking calls — batch IDs into groups for a REST call or SQL IN clause with windowFixed(n).
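As a rough sketch of the chunking case, the following turns a list of IDs into one IN clause per batch. The column name id, the class name, and the method name are placeholders for illustration, and real code would normally use bind parameters instead of concatenating values into SQL.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;

class BatchedInClauses {
    // Builds one "id IN (...)" fragment per batch of at most batchSize IDs.
    static List<String> inClauses(List<Integer> ids, int batchSize) {
        return ids.stream()
            .gather(Gatherers.windowFixed(batchSize))
            .map(batch -> batch.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(", ", "id IN (", ")")))
            .toList();
    }

    public static void main(String[] args) {
        // The last window is smaller when the input does not divide evenly.
        System.out.println(inClauses(List.of(101, 102, 103, 104, 105), 2));
        // [id IN (101, 102), id IN (103, 104), id IN (105)]
    }
}
```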
Stateful event processing — track the previous value, detect threshold crossings, or emit on rising/falling edges without an external AtomicReference. Here, emit each element only when it differs from the one before it:
    Gatherer<Integer, ?, Integer> dropConsecutiveDuplicates =
        Gatherer.ofSequential(
            () -> new Integer[1], // prev[0] stays null until the first element arrives
            Gatherer.Integrator.ofGreedy((prev, element, downstream) -> {
                if (!element.equals(prev[0])) {
                    prev[0] = element;
                    return downstream.push(element); // relay downstream's answer
                }
                return true; // same as the previous element: skip it
            }));
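The same one-slot state pattern covers edge detection. The sketch below is one possible reading of "emit on rising edges": it emits a reading only when it is above a threshold and the previous reading was not. The risingEdges name and the choice to treat a first reading above the threshold as an edge are assumptions made for this example.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class RisingEdges {
    // Emits a reading only when the series crosses from "not above" to "above" the threshold.
    static Gatherer<Double, ?, Double> risingEdges(double threshold) {
        return Gatherer.<Double, boolean[], Double>ofSequential(
            () -> new boolean[1], // [0]: was the previous reading above the threshold?
            Gatherer.Integrator.ofGreedy((wasAbove, reading, downstream) -> {
                boolean isAbove = reading > threshold;
                boolean crossedUp = isAbove && !wasAbove[0];
                wasAbove[0] = isAbove;
                // Push only on a crossing, and relay downstream's answer when we do.
                return !crossedUp || downstream.push(reading);
            }));
    }

    public static void main(String[] args) {
        List<Double> edges = Stream.of(1.0, 5.0, 6.0, 2.0, 7.0)
            .gather(risingEdges(4.0))
            .toList();
        System.out.println(edges); // [5.0, 7.0]
    }
}
```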
Deduplication by derived key — keep the first user per email domain or first product per category, using a sequential gatherer like the distinct-by-length example above.
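A generic version of that idea might look like the sketch below. The distinctBy helper, the User record, and its domain() accessor are all hypothetical names introduced for illustration; the structure mirrors the distinct-by-length gatherer, with the key function passed in.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;

class DistinctByKey {
    // Hypothetical domain type for this example.
    record User(String name, String email) {
        String domain() {
            return email.substring(email.indexOf('@') + 1);
        }
    }

    // Keeps the first element seen for each key; later elements with the same key are dropped.
    static <T, K> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends K> keyOf) {
        return Gatherer.<T, Set<K>, T>ofSequential(
            HashSet::new, // state: keys seen so far
            Gatherer.Integrator.ofGreedy((seen, element, downstream) -> {
                if (seen.add(keyOf.apply(element))) {
                    return downstream.push(element);
                }
                return true;
            }));
    }

    static List<User> firstPerDomain(List<User> users) {
        return users.stream().gather(distinctBy(User::domain)).toList();
    }

    public static void main(String[] args) {
        List<User> users = List.of(
            new User("alice", "alice@example.com"),
            new User("bob", "bob@example.org"),
            new User("carol", "carol@example.com"));
        // Keeps alice and bob; carol shares alice's domain and is dropped.
        System.out.println(firstPerDomain(users));
    }
}
```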
Sliding analytics — moving averages, rolling sums, and time-series windows via windowSliding(n), without manual buffering. Combine it with map to compute over each window:
    Stream.of(2, 4, 6, 8)
        .gather(Gatherers.windowSliding(2))
        .map(w -> w.stream().mapToInt(Integer::intValue).average().orElse(0))
        .toList();
    // [3.0, 5.0, 7.0]
Bounded concurrent mapping — controlled parallelism without reaching for parallelStream() or hand-rolled CompletableFuture fan-out:
    List<Response> responses = ids.stream()
        .gather(Gatherers.mapConcurrent(8, this::fetch)) // at most 8 in flight
        .toList();
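The snippet above depends on a surrounding class, so here is a self-contained sketch of the same idea. The fetch method is a stand-in that sleeps instead of calling a real service, and all names in it are assumptions for the example. Lower IDs are deliberately made slower to show that mapConcurrent still delivers results in encounter order.

```java
import java.util.List;
import java.util.stream.Gatherers;

class ConcurrentFetch {
    // Stand-in for a slow remote call; lower IDs deliberately take longer.
    static String fetch(int id) {
        try {
            Thread.sleep(Math.max(0L, 40L * (4 - id)));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while fetching " + id, e);
        }
        return "response-" + id;
    }

    // Maps every ID through fetch with at most maxInFlight calls running at once.
    static List<String> fetchAll(List<Integer> ids, int maxInFlight) {
        return ids.stream()
            .gather(Gatherers.mapConcurrent(maxInFlight, ConcurrentFetch::fetch))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of(1, 2, 3), 2));
        // [response-1, response-2, response-3]
    }
}
```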
Many-to-one summaries — fold collapses a stream into a single aggregate, metric, or report:
    String joined = Stream.of("a", "b", "c")
        .gather(Gatherers.fold(() -> "", (acc, s) -> acc + s))
        .findFirst()
        .orElse("");
    // "abc"