The Java Stream API, available since Java 8, lets you create pipelines of chained methods to produce a result. Concretely, it is a way to express long processing chains on a collection of elements in a more readable way, for example using lambda expressions.
This article is part of a series on advanced Java concepts, which also correspond to the topics to study when upgrading from the level I certification to the level II certification (OCP level). The new Oracle Java certification framework (as of Java 11) only offers one certification covering both levels, so you’ll want to go through the level I (“OCA”) topics as well.
A few things to know about Stream:
- A stream is an immutable flow of elements.
- By default it’s sequential, but it can be set to run in parallel.
- Operations on the source (which is immutable) each return a stream, so that operations can be chained.
- Stream processing stops as soon as there is a result, which means some operations could be ignored.
- Operations are not applied one after another to the whole collection; each element flows through the entire pipeline.
- Stream operators use functional interfaces => they can be implemented with lambda expressions.
- They can represent finite (ex: takeWhile() => finishes once the condition is no longer met) or infinite (ex: generate()) flows of elements.
- They can be obtained from any collection or array, or with static factory methods (Stream.of()); see the sketch after this list.
- The structure of the API can be described like so:
  - BaseStream (interface) => core behaviours (ex: sequential/parallel modes)
    - Stream (interface) => uses generics
    - DoubleStream (interface) => typed, to avoid excessive (un)boxing; provides numeric processing operations (same for the following)
    - IntStream (interface)
    - LongStream (interface)
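To make these entry points concrete, here is a minimal sketch (the variable names are arbitrary) of the usual ways to obtain a stream from a collection, an array, a static factory method and a primitive specialisation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamCreation {
    public static void main(String[] args) {
        List<String> names = List.of("Ada", "Linus", "Grace");

        Stream<String> fromCollection = names.stream();                    // from any collection
        Stream<String> fromFactory = Stream.of("a", "b", "c");             // static factory method
        Stream<String> fromArray = Arrays.stream(new String[] {"x", "y"}); // from an array
        IntStream fromRange = IntStream.rangeClosed(1, 5);                 // primitive variant, no boxing

        System.out.println(fromCollection.count()); // 3
        System.out.println(fromFactory.count());    // 3
        System.out.println(fromArray.count());      // 2
        System.out.println(fromRange.sum());        // 15
    }
}
```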
Stream pipeline
A really important concept in streams is operation types: operations can be intermediate, terminal or short-circuit:
- Intermediate: performs an action and produces a new stream
- Terminal: traverses* the stream pipeline and ends the stream processing
- Short-circuit: produces a finite result even when presented with infinite input
* Streams are not processed until a final (terminal) operation is found! They are lazy.
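A small sketch of this laziness (the sample values are arbitrary): the intermediate peek() and filter() calls do nothing until the terminal forEach() runs, and each element then flows through the whole pipeline.

```java
import java.util.stream.Stream;

public class LazinessDemo {
    public static void main(String[] args) {
        Stream<String> pipeline = Stream.of("alpha", "beta", "gamma")
                .peek(s -> System.out.println("peek: " + s)) // intermediate: nothing printed yet
                .filter(s -> s.length() > 4);                // intermediate: still nothing

        System.out.println("No element has been processed so far");

        // The terminal operation triggers the whole pipeline, element by element
        pipeline.forEach(s -> System.out.println("forEach: " + s));
    }
}
```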
| Intermediate operators | Terminal operators |
|---|---|
| filter | forEach (+ forEachOrdered) |
| map (+ mapToInt/Long/Double) | count |
| flatMap (+ flatMapToInt/Long/Double) | min (returns an Optional, uses a Comparator; OptionalInt/Long/Double on the primitive variants) |
| peek (similar to forEach but an intermediate operator) | max (same as above) |
| distinct (removes duplicates) | sum (only available on the primitive variants) |
| sorted | average (same as above, returns an OptionalDouble) |
| dropWhile (drops elements until an element doesn’t match) | collect |
| skip (skips the N first elements) | reduce |
| limit (returns a stream limited to a certain length) <= short-circuit | allMatch (takes a predicate, returns a boolean) <= short-circuit |
| takeWhile (takes elements until an element doesn’t match) <= short-circuit | anyMatch (same as above) <= short-circuit |
| | noneMatch (same as above) <= short-circuit |
| | findAny (returns a matching element wrapped in an Optional, takes no parameter => non-deterministic result) <= short-circuit |
| | findFirst (returns the first matching element wrapped in an Optional) <= short-circuit |
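As an illustration, here is a hedged sketch combining intermediate, terminal and short-circuit operators on arbitrary sample data:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineDemo {
    public static void main(String[] args) {
        List<String> shortWords = Stream.of("stream", "api", "java", "lambda", "pipeline")
                .filter(w -> w.length() <= 6)   // intermediate
                .map(String::toUpperCase)       // intermediate
                .sorted()                       // intermediate
                .limit(3)                       // intermediate + short-circuit
                .collect(Collectors.toList());  // terminal
        System.out.println(shortWords);         // [API, JAVA, LAMBDA]

        // A short-circuit terminal operation makes an infinite stream finite
        Optional<Integer> firstAbove100 = Stream.iterate(1, n -> n * 2)
                .filter(n -> n > 100)
                .findFirst();                    // stops as soon as a match is found
        System.out.println(firstAbove100.get()); // 128
    }
}
```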
Functional interfaces
A functional interface is an interface that declares exactly one abstract (public, non-static) method; it may also contain default and static methods. A lambda expression can be used to implement it because there is only one method to implement, so the lambda unambiguously targets it.
Streams use lambda expressions, which reduces the amount of code written in most cases. Lambda expressions are implementations of functional interfaces, therefore it’s important to know what a functional interface is, which types exist and which one is used by each operator. Of course, custom functional interfaces can also be created.
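For instance, here is a hypothetical custom functional interface (the TemperatureConverter name is invented for illustration) implemented with a lambda:

```java
// A custom functional interface: exactly one abstract method
@FunctionalInterface
interface TemperatureConverter {
    double convert(double celsius);
}

public class CustomFunctionalInterfaceDemo {
    public static void main(String[] args) {
        // The lambda provides the implementation of the single abstract method
        TemperatureConverter toFahrenheit = c -> c * 9 / 5 + 32;
        System.out.println(toFahrenheit.convert(20)); // 68.0
    }
}
```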
- Supplier => produces elements
Supplier<T>
method: T get()
ex: generate(Supplier s)
Stream.generate(new Random()::nextInt)
- Predicate => performs tests (returns a boolean)
Predicate<T>
method: boolean test(T t)
ex: filter(Predicate p)
myItems.stream().filter(x -> x.getAge() > 18)
- Consumer => processes elements
Consumer<T>
method: void accept(T t)
ex: peek(Consumer c)
myItems.stream().peek(x -> System.out.println(x))
- Function => converts types
Function<T,R>
method: R apply(T t)
ex: map(Function f)
myItems.stream().map(x -> x.length())
- UnaryOperator => converts values (same input and output type)
UnaryOperator<T>
method: T apply(T t)
ex: map(UnaryOperator u)
myItems.stream().map(String::toUpperCase)
There exist many primitive variants of these functional interfaces that allow better performance thanks to fewer (un)boxing operations. Some examples are Int/Long/DoublePredicate (primitive input), ToInt/Long/DoubleFunction (primitive output), Int/Long/DoubleFunction (primitive input), and so on.
There are also variants with two arguments, like BiPredicate<T,U>, BiFunction<T,U,R> (convert 2 types into one), and more, and their primitive variants.
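A short sketch of a primitive variant and a two-argument variant in use (the lambdas are arbitrary examples):

```java
import java.util.function.BiFunction;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

public class PrimitiveVariantsDemo {
    public static void main(String[] args) {
        // IntPredicate works on int directly, no Integer boxing involved
        IntPredicate isEven = n -> n % 2 == 0;
        int sumOfEvens = IntStream.rangeClosed(1, 10)
                .filter(isEven)                 // IntStream.filter takes an IntPredicate
                .sum();
        System.out.println(sumOfEvens);         // 30

        // A two-argument variant: BiFunction combines two inputs into one result
        BiFunction<String, Integer, String> repeat = (s, times) -> s.repeat(times);
        System.out.println(repeat.apply("ab", 3)); // ababab
    }
}
```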
A bit more about the collect operator
- It is a mutable reduction.
- It accepts Collector interface implementations.
- It produces new result containers using a Supplier, accumulates data elements into these containers using a BiConsumer, combines result containers using a BinaryOperator and optionally performs a final transformation of the result using a Function.
- It can be invoked in several forms:
stream.collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner)
stream.collect(collector)
stream.collect(Collectors.collectingAndThen(collector, finisher))
- collect and reduce perform the same kind of reduction, but collect accumulates intermediate results within mutable containers, which may improve performance.
- The Collectors class has a few ready-to-use methods to define a collection strategy, like:
Collectors.partitioningBy(p -> p instanceof ..) => returns Map<Boolean, List<T>>
Collectors.groupingBy(p -> p.getCategory()) => Map<K, List<T>>
…
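Here is a hedged sketch of both the low-level three-argument form of collect and a couple of the ready-to-use collectors, on an arbitrary list of words:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectDemo {
    public static void main(String[] args) {
        // Explicit mutable reduction: supplier, accumulator, combiner
        List<String> upper = Stream.of("one", "two", "three")
                .collect(ArrayList::new,                               // Supplier: creates the container
                         (list, word) -> list.add(word.toUpperCase()), // BiConsumer: accumulates elements
                         (left, right) -> left.addAll(right));         // BiConsumer: combines partial containers
        System.out.println(upper);              // [ONE, TWO, THREE]

        // Ready-to-use collectors
        Map<Boolean, List<String>> partitioned = Stream.of("one", "two", "three")
                .collect(Collectors.partitioningBy(w -> w.length() > 3));
        System.out.println(partitioned);        // {false=[one, two], true=[three]}

        Map<Integer, List<String>> groupedByLength = Stream.of("one", "two", "three")
                .collect(Collectors.groupingBy(String::length));
        System.out.println(groupedByLength);    // {3=[one, two], 5=[three]}
    }
}
```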
Parallelism
Elements in a stream are subdivided into subsets, which are processed in parallel (which may give a false impression of sequentiality when the set is not big). Subsets may themselves be subdivided into further subsets. The processing order of the subsets is indeterminate. The subsets’ results are then combined.
It’s possible to force parallelism on a stream by calling .parallel() (or sequential processing by calling .sequential(), but that is already the default behaviour). The entire stream processing will run sequentially or in parallel based on whichever was invoked last (if there are multiple calls in the pipeline).
Suitable cases for parallelism are:
- stateless: state of an element should not affect others
- non-interfering: data source must not be affected
- associative: result must not be affected by the order of operands (elements in the collection)
The appropriate Collectors should be used when parallelism is on (toMap => toConcurrentMap).
It’s useful if:
- There is a large number of elements.
- Multiple CPU cores are available to physically parallelize computations.
- Processing of stream elements requires significant CPU resources.
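A hedged sketch of a parallel pipeline matching these criteria, using a concurrent collector as suggested above:

```java
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelDemo {
    public static void main(String[] args) {
        // Large number of elements + associative, stateless work: a reasonable parallel candidate
        long sumOfSquares = IntStream.rangeClosed(1, 1_000_000)
                .parallel()                      // switch the whole pipeline to parallel mode
                .mapToLong(n -> (long) n * n)
                .sum();
        System.out.println(sumOfSquares);

        // When collecting to a map in parallel, prefer the concurrent variant
        ConcurrentMap<Integer, Long> countByLastDigit = IntStream.rangeClosed(1, 100)
                .parallel()
                .boxed()
                .collect(Collectors.groupingByConcurrent(n -> n % 10, Collectors.counting()));
        System.out.println(countByLastDigit);    // each last digit appears 10 times
    }
}
```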