Stream Gatherers: The Missing Piece in Java Stream Processing
Enhancing Java Streams with custom Gatherers: a flexible way to implement intermediate operations in your Java streams

Introduction
Java Streams revolutionized collection processing with their elegant functional approach, but developers have long encountered limitations when custom intermediate operations were needed. Enter Stream Gatherers, delivered in Java 24 as a powerful extension to the Stream API that fills this critical gap in Java’s functional programming toolkit. Gatherers provide a flexible way to implement custom intermediate operations without resorting to complex workarounds or breaking the stream’s fluent interface.
In this post we’ll have a quick refresher on Java Streams, learn why adding our own custom intermediate operations is not always easy, how Gatherers solve this problem, and see a few examples of how to write them.
I’ll also introduce you to a project I’ve been working on - Gatherers4j , which implements dozens of useful Gatherers.
The Anatomy of a Java Stream
To understand why we need Gatherers, it is helpful to understand how streams work at a high level. Streams can be thought of as having three parts:
- The stream source.
- Zero or more intermediate operations.
- A single terminal operation.
Example
Stream.of("1" ,"2", "3", "42") // <--- Source
.filter(it -> it.length() == 1) // <--- Intermediate
.map(it -> it + "!") // <--- Intermediate
.toList(); // <--- Terminal
// [ "1!", "2!", "3!" ]
The stream source is where elements of the stream come from. Common ways to create a Stream
are:
- From a collection:
someList.stream()
orsomeSet.stream()
- Static factory methods:
Stream.of("a", "b", "c")
- From an array:
Arrays.stream(someArray)
- Infinite sequences:
Stream.iterate(1, n -> n + 1)
- Generators:
Stream.generate(() -> "Bueller")
Intermediate operations are methods on Stream
that return another Stream
such as filter()
or map()
. They do some kind of work to each element of the stream (filtering or mapping to another object in this case) and return those elements as another Stream
. Intermediate operations can be stateless (filter()
and map()
) or stateful (sorted()
). Streams can work with any number of intermediate operations, even none of them.
A key characteristic of streams is their lazy evaluation. When we add intermediate operations like filter()
or map()
, the operations aren’t actually performed immediately. Only when a terminal operation is called does the stream begin processing elements through the entire chain of operations. This lazy behavior allows streams to optimize processing (like merging operations or skipping unnecessary work) and enables features like short-circuiting for infinite streams.
The terminal operation has two functions: first, to make the stream trigger the processing of elements through the pipeline. A stream without a terminal operation doesn’t do anything because nothing triggers this action. Second, to collect the elements of the Stream
into a usable object (or void, if the stream only needs to perform side effects).
The Challenge with Intermediate Operations
There is a comprehensive set of ways to create a stream, and through Collectors , we can create our own terminal operations if the JDK doesn’t have what we need.
What’s missing is a similarly elegant solution for intermediate operations. There is no convenient way to add our own, and the JDK doesn’t fully meet our needs (nor can it).
For example, the JDK comes with a distinct()
intermediate operation, but not distinctBy()
, which would take a function and give us a way to compare elements of a stream other than by equals
and hashcode
.
What we want is something like this:
Stream.of("A", "B", "CC", "DD", "EEE")
.distinctBy(it -> it.length()) // <-- Not real!
.toList();
// Gives: ["A", "CC", "EEE"]
With the current JDK, we can’t do that easily. One possible way to do this would be to map()
our String
into a custom record
with different equals
and hashCode
methods, call distinct()
, and then map the remaining elements back to String
.
Such as:
// Borrowed from JEP-485, mostly
record LengthyString(String subject) {
@Override
public boolean equals(final Object other) {
return other instanceof LengthyString(String otherSubject) &&
this.subject.length() == otherSubject.length();
}
@Override
public int hashCode() {
return Integer.hashCode(subject.length());
}
}
// And then...
Stream.of("A", "B", "CC", "DD", "EEE")
.map(LengthyString::new) // <--- Convert
.distinct() // <--- Do the thing
.map(LengthyString::subject) // <--- Unconvert
.toList();
// Gives: ["A", "CC", "EEE"]
As you can see this example is possible, but not ideal. This solution works but it is clunky and verbose - far from the ideal.
Additionally, there are many, many examples of legitimately useful intermediate operations possible. What should the JDK implement? Where do they draw the line between generally useful and too niche? Should the JDK maintainers be burdened with having to solve these problems generally for everybody? Missing intermediate operations is one of the reasons why libraries like Eclipse Collections or StreamEx exist, to fill in the gaps.
The solution to this is Gatherers - a standardized API introduced in Java 24 that allows developers to add their own intermediate operations to Java streams, just as Collectors allowed for custom terminal operations.
Enter Stream Gatherers
Rather than add and maintain many possible intermediate operations on Stream
, the Java Language authors have introduced the concept of a Gatherer and added a single new method to Stream
called gather
, which takes an instance of java.util.stream.Gatherer
.
Gatherers are described in detail in JEP 485 , which states their purpose is to…
Enhance the Stream API to support custom intermediate operations. This will allow stream pipelines to transform data in ways that are not easily achievable with the existing built-in intermediate operations.
– JEP 485
A Gatherer is an object that can maintain state, receives every element from the input stream for evaluation, and can produce as many elements as it needs to. Gatherers can be thought of as one-to-one (one output for each input), one-to-many, many-to-one, or many-to-many.
Because Gatherers can maintain state, they can evaluate each element of the stream fully aware of its context.
Conveniently, Gatherers can also short-circuit and end evaluation early in cases where future work would be futile or undesirable.
A gatherer is made up of four parts, only one of which is mandatory:
-
An
Initializer
(optional) which is used to provide an object that tracks the state of the gatherer. In principle, this is optional but in practice you will write and use gatherers with state far more frequently. The object returned the initializer is accessible for the entire lifespan of the gatherer, so we can store whatever we want in the state object. -
An
Integrator
(mandatory) which receives elements from the stream for processing. The integrator receives an element, the state object created in the initializer, and adownstream
object to possibly emit output elements to. The integrator has a great deal of flexibility. It can:- Inspect or manipulate the state object
- Emit zero or more elements to the downstream
- Short-circuit the stream evaluation by signaling that processing is over and no more elements should be evaluated
-
A
Combiner
(optional) which is used in parallel-capable gatherers to combine two state objects into a single state object. This method is not required for gatherers that are not parallel-capable such as anything that depends on inherently ordered inputs. We will see an example of this below. -
A
Finisher
(optional) which is called at the very end of the gatherer lifecycle, once all of the elements have passed through theintegrator
function and optionally, thecombiner
. The finisher has access to the state and the downstream, so this is a good time to emit any delayed work. For example, if we wrote a gatherer to find the last n elements, we would store those in the state and emit them at the very end, in the finisher.
Gatherer Lifecycle
When a gatherer is created, the following steps are taken:
-
A
downstream
object is created representing the output end of the gatherer. The downstream receives elements from the gatherer and signals if it is accepting more. This is used to bubble-up a short-circuit by some part of the stream farther along the chain. -
The
initializer
is created and executed, which returns a state object. In the case of a stateless gatherer, the default implementation returns a marker object that the JVM understands means “this gatherer does not have state”. -
The
integrator
is created and for each element in the input stream is called with the state, an element in the stream, and the downstream. If the integrator ever returns false, no more elements will be sent from the input stream (this is how we short-circuit evaluation). -
In a parallel implementation, the
combiner
is called for each pair of state objects. -
The
finisher
is called with the final state and the downstream as a last chance for the Gatherer to emit elements to the downstream.
New Gatherers
Rather than add new intermediate operations on Stream
, Java now ships with several new gatherers: mapConcurrent
, fold
, scan
, windowFixed
, and windowSliding
. It is possible that more built-in gatherers could be added in the future.
Building Our Own Gatherers
Let’s look at a few examples of gatherers. We’ll start off with a simple stateless gatherer to illustrate the basic aspects of writing a gatherer. From there we’ll add in state maintenance, and finally build a gatherer that works with both sequential and parallel streams.
One thing to notice below is that we’ll use static factory methods on Gatherer
to create our custom gatherers. While this is indeed very convenient, we could also implement Gatherer
directly if we wanted. I implement Gatherer directly most of the time because I find it more flexible and easier to test, but that’s just me. Do whatever makes you feel comfortable. Certainly for “one-liner” Gatherers, the static factory approach is very attractive.
Example 1: Stateless: toUpperCase()
For our first example, let’s build something that doesn’t require any state to keep complexity to a minimum. Since we don’t have state, we don’t need an initializer
or a finisher
, we can get away with only defining an integrator
. We could make this parallel-capable and define a combiner
, but we won’t for now.
Goal: Create a Gatherer that works on Stream<String>
and converts each of the elements to its uppercase.
Note that we could implement this with a map
quite easily, but we’ll implement this because it is foundational to understanding the more complex gatherers to follow.
We’ll use the Gatherer.of()
function from the JDK to define a Gatherer
that only has an integrator
:
static Gatherer<String, ?, String> toUpperCase() {
return Gatherer.of(
// Integrator
(_, element, downstream) ->
downstream.push(element == null ? null : element.toUpperCase())
);
}
We will implement our integrator
via a lambda that takes three arguments: the current state (which we ignore using _
), an element
from the stream, and the downstream
to push the element to. Our lambda implementation pushes the null element
in the case that it is null, and the toUpperCase
version when element
is non-null.
The lambda implicitly returns the result of our call to downstream::push()
, which returns true
if more elements can be sent to the downstream and false
otherwise. The boolean return is a form of back-pressure, to tell the stream to stop producing elements if the downstream has decided it doesn’t want any more. For example, if we also had a limit(5)
intermediate operation after toUpperCase()
, eventually the downstream would return false
from the call to push
, and we know we don’t need to perform unnecessary work.
One thing that’s worth mentioning is that we can think of the integrator as push-based rather than pull-based. We’re handed in (pushed) an element
for handling rather than having to call something like an Iterator
. This is an important concept to understand when writing a Gatherer.
I’d also like to point out is the type of our Gatherer which is Gatherer<String, ?, String>
. The first String
is the input type (the type of elements in the input stream), the middle type (?
) is the type of the state. Since we don’t have any state in this gatherer, we don’t care about its type. Also, it’s not really any of the caller’s business even if we did have a type here. Finally, the second String
is the output type (the type of elements pushed to the downstream).
Now we can use this on a Stream<String>
:
Stream.of("a", "b", "c")
.gather(toUpperCase())
.toList();
// Gives: ["A", "B", "C"]
This is, admittedly, an example that could have been written with map()
, but I wanted to show you the basics of defining our own Gatherer.
Example 2: Stateful and Sequential - mapIndexed()
Let’s turn our attention to something more complex, a Gatherer with state. Most of the Gatherers you’ll write or find are going to have some kind of state (that’s what makes them more interesting than map()
and filter()
).
Goal: Write a version of map()
called mapIndexed()
that allows us to perform a map
of an element and its index in the stream.
This example will show how state enters the picture, and how much easier it is to implement this functionality with Gatherers than it would have been before. We would have had to write a custom collector or enclosed outside state in combination with map
or something like that. Either way, it would be messy.
Let’s first define the State
that we’ll have our Gatherer
use:
static class State {
long index;
}
This State
only contains the index
, which we define as a long
in case we end up operating on a very long stream. The state in this case is quite minimal - a single long
to keep the example focused. But the state an be as complex as we want if our use-case calls for it.
Next, let’s define our Gatherer
:
public static <INPUT, OUTPUT> Gatherer<INPUT, ?, OUTPUT> mapIndexed(
final BiFunction<Long, INPUT, OUTPUT> mappingFunction
) {
return Gatherer.ofSequential(
// Initiator
State::new,
// Integrator
Integrator.ofGreedy((state, element, downstream) ->
downstream.push(mappingFunction.apply(state.index++, element))
)
);
}
This gatherer has both an initiator
and an integrator
and once again we can use a static factory on Gatherer
to create it. In this case, we will create a sequential (not parallel) gatherer. Since the initiator returns a Suppiler<State>
, we can accomplish that with a method reference (State::new
). And as in our previous example, we can provide a lambda implementing our integrator. Note that this time we’re asking for a greedy integrator (we could have in the previous example as well). This means that this gatherer will never stop asking for more elements. The downstream might, but we won’t. This allows the JVM to potentially optimize things for us under the covers.
Our mappingFunction
has three type references: first a Long
to represent the index, next an INPUT
which is the type of elements in the input stream, and finally OUTPUT
, the type of elements we’re mapping to and emitting to the output stream. The INPUT
and OUTPUT
types can be anything, and they can be the same type.
The implementation of the lambda involves calling the mapping function with our element and the index in the carried state. While we’re here, we’ll use a postfix operator (++
) to increment the index
for the next time around.
This implementation doesn’t need a finisher
since we’re emitting elements as we get them and don’t have any work to do once the stream runs out of elements. We also don’t need a combiner
since it is a sequential-only Gatherer (we couldn’t parallelize this because it is highly dependent on order).
And to use mapIndexed()
, we pass it our mapping function which in this case combines the element
and its index
into a String
, so we can see what’s going on.
Stream.of("A", "B", "C")
.gather(mapIndexed((index, element) -> element + index))
.toList();
// Gives: ["A0", "B1", "C2"]
This example doesn’t have a complex state, but it very well could. Think of how you’d implement a reverse()
Gatherer, or a Gatherer that slides a window over the stream input. The state and the integrator would both get more complex, and that’s fine.
Example 3: Support for Parallel - byFrequency()
Now let’s look at how parallel evaluation is supported in Gatherers.
Goal: Write a Gatherer called byFrequency()
that counts how many times each stream element occurs.
To keep things simple, we will emit a Map.Entry
whose key
is the element and the value
is the number of occurrences of that element, as a long.
Let’s write our Gatherer, byFrequency()
, which has an initializer
, an integrator
, a combiner
, and a finisher
, all four parts of a Gatherer. This Gatherer can operate on any stream element type (as expressed by the generic T
).
Let’s discuss what a combiner is before going further. The combiner only comes into play when we have a parallel stream. If we have a sequential stream, we don’t need to define a combiner. For a parallel stream, the JVM breaks up the stream into chunks and treats each of them separately. Each chunk is fed, in parallel, to the integrator just like in a sequential stream. When all of the elements in a chunk have gone through the integrator, we’re left with a state representing the results of that integration. Next, each of the chunks are paired up and fed to the combiner, which merges them together and returns a single new state (or possibly mutates one of the existing states and returns it). The finisher is called with the single remaining state object after the combiner merges them all, and it does the work of emitting elements to the downstream.
Let’s see how that is implemented:
static <T> Gatherer<T, Map<T, Long>, Map.Entry<T, Long>> byFrequency() {
return Gatherer.of(
// Initializer
HashMap::new,
// Integrator
Integrator.ofGreedy((state, element, downstream) -> {
state.merge(element, 1L, Long::sum);
return !downstream.isRejecting();
}),
// Combiner
(leftState, rightState) -> {
rightState.forEach((key, value) -> leftState.merge(key, value, Long::sum));
return leftState;
},
// Finisher
(state, downstream) -> {
state.entrySet()
.stream()
.sorted(Map.Entry.<T, Long>comparingByValue().reversed())
.forEach(downstream::push);
}
);
}
For the state, we will define it as a Map<T, Long>
and implement it with an HashMap
.
Our Integrator merges each element
into the state
(adding 1) and then returns true or false depending on whether the downstream
is rejecting or not. If it is rejecting we don’t need to process any more elements.
Our Combiner is something we haven’t seen yet. This is the part that combines the chunks of work. It takes two state objects, combines them together, and returns a merged representation of them. In our case here, we’ll merge the rightState
into the leftState
map and return the newly combined map.
Finally, we have our finisher, which sorts each of the Map.Entry
objects and pushes each of them to the downstream in descending order. The state is either the merged state from all of the blocks of work if we’ve got a parallel stream, or the single state from the integrator if we have a sequential stream.
Here’s an example of its use:
Stream.of("a", "b", "c", "a", "b", "a")
.gather(byFrequency())
.parallel() // <---- Turn on parallel evaluation
.toList();
// Gives [a=3, b=2, c=1]
Note that this would still work perfectly fine if we didn’t call parallel()
to enable parallel processing of our stream. Furthermore, if we had not implemented byFrequency()
with a combiner, the overall stream could still take advantage of parallel processing - this part would be sequential but the rest could be done in parallel. Meaning, not every gatherer needs to be parallel-capable to participate in a parallel stream.
Introducing Gatherers4j
When I saw the first JEP for Gatherers, I thought they looked interesting and started experimenting with them. Initially, it was just seeing what was possible but one thing led to another and now I have a full library of Gatherers called Gatherers4j . In addition to the GitHub repo, there is documentation describing each Gatherer and its use.
Gatherers4j depends on Java 24 (as of this writing) and has one single dependency - JSpecify , to help you infer the nullability of all methods and types. I’m not a big fan of libraries with lots of dependencies, but JSpecify is small (only four annotations!) and stable. Gatherers4j has a single entrypoint to make finding the right Gatherer easier.
What kinds of gatherers are in Gatherers4j? I’m glad you asked! As of this moment there are about 60 gatherers broken down into five categories (this is arbitrary for documentation purposes and has no bearing on how they operate).
Sequence Operations - Transform streams with operations like reverse()
, shuffle()
, zipWith()
, zipWithNext()
and crossWith()
to manipulate the order and relationships between elements.
Filtering and Selection - Refine streams using specialized filters like distinctBy()
, takeUntil()
, dropLast(n)
, and dedupeConsecutive()
to get exactly the elements you need.
Grouping and Windowing - Organize stream elements with operations like window()
(which provides more flexibility than the windowing functions in the JDK, groupBy()
, and groupOrdered()
to create useful collections based on your criteria.
Validation and Constraints - Enforce invariants on streams with ensureSize()
and ensureOrdered()
and fail fast when requirements are not met.
Mathematical Operations - Calculate statistics with operations like simpleMovingAverage()
, runningSum()
, and runningStandardDeviation()
for numerical analysis.
Conclusion
Gatherers are the missing piece in Java Stream processing and they hold a lot of promise. Their thoughtful design fits well into existing code and allows developers to have even more expressive functional streams that before.
Give Gatherers4j a try and let me know what you think!