Stream Gatherers: Custom Intermediate Operations for Java Streams

Java Streams give you a fixed menu of intermediate operations: map, filter, flatMap, limit, and a handful of others. That covers a lot, but the moment you need something stateful — batching elements into groups, computing a running total, deduplicating by a derived key — you're stuck either collecting early and re-streaming, or writing an awkward flatMap with a mutable field hanging off it.

Stream Gatherers, finalized in Java 24 (JEP 485), close that gap. Stream.gather(Gatherer) lets you write your own intermediate operation and drop it into a pipeline like any built-in.

What Are Stream Gatherers?

A Gatherer is to intermediate operations what a Collector is to terminal ones: a structured way to control how elements flow through a stage of the pipeline. Gatherers support:

  • Stateful transformations
  • Flexible input/output cardinality (1:1, 1:many, many:1, many:many)
  • Short-circuiting
  • Parallel execution when the gatherer provides a combiner

A gatherer slots into a pipeline like any other stage:

stream.gather(myCustomGatherer)
      .filter(...)
      .map(...)
      .collect(...);

The Gatherer Interface

Gatherer<T, A, R> consumes elements of type T, keeps private state of type A, and emits results of type R. It consists of four components:

  1. Initializer – creates the state object
  2. Integrator – processes each element and pushes results downstream
  3. Combiner – merges state across threads when parallel execution occurs
  4. Finisher – runs once after all elements are processed

Most components are optional; the integrator is the only required one.
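To make the four components concrete, here is a minimal sketch of a many-to-one gatherer built with Gatherer.of that supplies all of them. The MaxState class and the max() helper are hypothetical names invented for this illustration, not part of the JDK.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class MaxGathererSketch {

    // Hypothetical mutable state: the largest value seen so far, or null if none.
    static final class MaxState {
        Integer max;
    }

    // A many-to-one gatherer that emits the largest element once, at the end.
    static Gatherer<Integer, MaxState, Integer> max() {
        return Gatherer.of(
            // 1. Initializer: creates a fresh state object.
            MaxState::new,
            // 2. Integrator: folds each element into the state; emits nothing yet.
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (state.max == null || element > state.max) {
                    state.max = element;
                }
                return true;
            }),
            // 3. Combiner: merges two partial states from a parallel run.
            (left, right) -> {
                if (left.max == null) {
                    return right;
                }
                if (right.max != null && right.max > left.max) {
                    left.max = right.max;
                }
                return left;
            },
            // 4. Finisher: pushes the single result downstream, if there is one.
            (state, downstream) -> {
                if (state.max != null) {
                    downstream.push(state.max);
                }
            });
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(3, 9, 4).gather(max()).toList());            // [9]
        System.out.println(Stream.of(3, 9, 4).parallel().gather(max()).toList()); // [9]
    }
}
```

Because a combiner is supplied, the same gatherer can be evaluated on a parallel stream; on an empty stream the finisher pushes nothing, so the result is an empty list.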

Built-in Gatherers (java.util.stream.Gatherers)

The JDK ships several ready-to-use gatherers:

  • windowFixed(int) – non-overlapping batches
  • windowSliding(int) – overlapping sliding windows
  • scan(Supplier, BiFunction) – running aggregates / stateful maps
  • fold(Supplier, BiFunction) – produces exactly one output element at the end
  • mapConcurrent(int, Function) – bounded concurrent mapping

fold is a many-to-one gatherer: it emits a single result downstream, unlike scan, which emits a running result for each element.

Practical Examples

1. Fixed window batching

List<List<Integer>> batches = Stream.of(1,2,3,4,5,6,7,8,9)
    .gather(Gatherers.windowFixed(3))
    .toList();
// [[1,2,3], [4,5,6], [7,8,9]]

2. Sliding window (consecutive pairs)

Stream.of(1,2,3,4,5)
    .gather(Gatherers.windowSliding(2))
    .forEach(System.out::println);
// [1,2], [2,3], [3,4], [4,5]

3. Running sum with scan

List<Integer> running = Stream.of(1, 2, 3, 4)
    .gather(Gatherers.scan(() -> 0, Integer::sum))
    .toList();
// [1, 3, 6, 10]

scan takes a Supplier for the initial state and a BiFunction that receives the current state and element, then returns the next state (which gets pushed downstream). Integer::sum works here because it matches that shape.
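The state type does not have to match the element type. As a small sketch of that, the hypothetical runningLengths helper below scans over strings while keeping an Integer total:

```java
import java.util.List;
import java.util.stream.Gatherers;

class ScanSketch {

    // Running total of characters seen so far: elements are String, state is Integer.
    static List<Integer> runningLengths(List<String> words) {
        return words.stream()
            .gather(Gatherers.scan(() -> 0, (total, word) -> total + word.length()))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(runningLengths(List.of("a", "bb", "ccc"))); // [1, 3, 6]
    }
}
```

Note that the initial value 0 is never emitted on its own; only the state after each element reaches the downstream.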

4. Custom distinct-by-property

This gatherer deduplicates strings by length, keeping the first string seen for each length. Because it is inherently sequential and order-dependent, it uses ofSequential:

Gatherer<String, Set<Integer>, String> distinctByLength =
    Gatherer.ofSequential(
        HashSet::new, // state: the lengths already seen
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
            if (state.add(element.length())) {
                // First string of this length: emit it and relay the downstream's answer.
                return downstream.push(element);
            }
            return true; // duplicate length: skip it and keep consuming
        }));

The integrator receives the mutable state, the current element, and a Downstream handle, and pushes zero or more results. Its boolean return value signals whether more input should be consumed. downstream.push(...) returns false once the downstream wants no more elements, so the idiomatic move is to return its result rather than a hard-coded true. A greedy integrator (built with ofGreedy) promises never to stop on its own initiative; the only false it returns is one relayed from the downstream. A non-greedy integrator can additionally decide to short-circuit by returning false itself.

ofSequential creates a gatherer whose stage is always evaluated sequentially, even when the stream itself is parallel. It does not accept a combiner, because there are never partial states to merge.
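For contrast with the greedy integrator above, here is a sketch of a non-greedy one that short-circuits on its own. The takeWhileSumAtMost helper and its budget parameter are hypothetical, chosen only to illustrate returning false from the integrator; integer overflow is ignored for brevity.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class ShortCircuitSketch {

    // Hypothetical helper: pass elements through until the running sum would exceed budget.
    static Gatherer<Integer, int[], Integer> takeWhileSumAtMost(int budget) {
        return Gatherer.ofSequential(
            () -> new int[1],                     // state[0] holds the running sum
            (state, element, downstream) -> {    // plain lambda: a non-greedy integrator
                if (state[0] + element > budget) {
                    return false;                 // short-circuit: request no more input
                }
                state[0] += element;
                return downstream.push(element);  // also stop if the downstream is done
            });
    }

    public static void main(String[] args) {
        System.out.println(
            Stream.of(5, 3, 4, 1).gather(takeWhileSumAtMost(10)).toList()); // [5, 3]
        // Short-circuiting lets the gatherer terminate an infinite stream.
        System.out.println(
            Stream.iterate(1, i -> i + 1).gather(takeWhileSumAtMost(10)).toList()); // [1, 2, 3, 4]
    }
}
```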

When Should You Use Them?

Reach for a gatherer when the built-in operations don't fit and your alternative is a collect-and-restart or a contorted flatMap. A custom gatherer keeps the logic inside the pipeline, is reusable once written, and can run in parallel if:

  • the gatherer provides a combiner, and
  • the stream implementation chooses to parallelize that stage

A combiner enables parallelization but does not guarantee it. For simple transformations that map or filter already express cleanly, stick with the basics.

Some scenarios where gatherers genuinely beat the built-ins:

Chunking calls — batch IDs into groups for a REST call or SQL IN clause with windowFixed(n).
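A rough sketch of that chunking pattern follows. The inClauses helper, the users table, and the id column are made up for illustration; real code would more likely bind each batch as query parameters than build SQL text.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;

class ChunkingSketch {

    // Hypothetical helper: one query string per batch of at most batchSize IDs.
    static List<String> inClauses(List<Integer> ids, int batchSize) {
        return ids.stream()
            .gather(Gatherers.windowFixed(batchSize)) // Stream<List<Integer>>
            .map(batch -> batch.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(
                    ", ", "SELECT * FROM users WHERE id IN (", ")")))
            .toList();
    }

    public static void main(String[] args) {
        // The last batch is smaller when the input does not divide evenly.
        inClauses(List.of(1, 2, 3, 4, 5), 2).forEach(System.out::println);
        // SELECT * FROM users WHERE id IN (1, 2)
        // SELECT * FROM users WHERE id IN (3, 4)
        // SELECT * FROM users WHERE id IN (5)
    }
}
```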

Stateful event processing — track the previous value, detect threshold crossings, or emit on rising/falling edges without an external AtomicReference. Here, emit each element only when it differs from the one before it:

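Gatherer<Integer, ?, Integer> dropConsecutiveDuplicates =
    Gatherer.ofSequential(
        // prev[0] stays null until the first element arrives, so no sentinel
        // value (such as Integer.MIN_VALUE) can be mistaken for real input.
        () -> new Integer[1],
        Gatherer.Integrator.ofGreedy((prev, element, downstream) -> {
            if (element.equals(prev[0])) {
                return true; // same as the previous element: skip it
            }
            prev[0] = element;
            return downstream.push(element); // emit and relay the downstream's answer
        }));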

Deduplication by derived key — keep the first user per email domain or first product per category, using a sequential gatherer like the distinct-by-length example above.
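Generalizing the distinct-by-length idea to an arbitrary key might look like the sketch below. Both distinctBy and domainOf are hypothetical helpers written for this example, and the email addresses are invented sample data.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class DistinctBySketch {

    // Hypothetical helper: keep the first element seen for each derived key.
    static <T, K> Gatherer<T, Set<K>, T> distinctBy(Function<? super T, ? extends K> keyOf) {
        return Gatherer.ofSequential(
            HashSet::new, // state: the keys already seen
            Gatherer.Integrator.ofGreedy((seen, element, downstream) -> {
                if (seen.add(keyOf.apply(element))) {
                    return downstream.push(element); // new key: emit the element
                }
                return true; // key already seen: skip and keep consuming
            }));
    }

    // Hypothetical key extractor: the part of an address after '@'.
    static String domainOf(String email) {
        return email.substring(email.indexOf('@') + 1);
    }

    public static void main(String[] args) {
        List<String> firstPerDomain = Stream.of("ann@a.com", "bob@b.org", "cy@a.com")
            .gather(distinctBy(DistinctBySketch::domainOf))
            .toList();
        System.out.println(firstPerDomain); // [ann@a.com, bob@b.org]
    }
}
```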

Sliding analytics — moving averages, rolling sums, and time-series windows via windowSliding(n), without manual buffering. Combine it with map to compute over each window:

Stream.of(2, 4, 6, 8)
    .gather(Gatherers.windowSliding(2))
    .map(w -> w.stream().mapToInt(Integer::intValue).average().orElse(0))
    .toList();
// [3.0, 5.0, 7.0]

Bounded concurrent mapping — controlled parallelism without reaching for parallelStream() or hand-rolled CompletableFuture fan-out:

List<Response> responses = ids.stream()
    .gather(Gatherers.mapConcurrent(8, this::fetch)) // at most 8 in flight
    .toList();

Many-to-one summaries: fold collapses a stream into a single aggregate, metric, or report. For example:

String joined = Stream.of("a", "b", "c")
    .gather(Gatherers.fold(() -> "", (acc, s) -> acc + s))
    .findFirst()
    .orElse("");
// "abc"