|
Java example source code file (ReferencePipeline.java)
This example Java source code file (ReferencePipeline.java) is included in the alvinalexander.com
"Java Source Code Warehouse" project. The intent of this project is to help you
"Learn Java by Example"™.
Learn more about this Java project at its project page.
The ReferencePipeline.java Java example source code
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
/**
* Abstract base class for an intermediate pipeline stage or pipeline source
* stage implementing whose elements are of type {@code U}.
*
* @param <P_IN> type of elements in the upstream source
* @param <P_OUT> type of elements in produced by this stage
*
* @since 1.8
*/
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream
implements Stream<P_OUT> {
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Supplier<Spliterator>} describing the stream source
* @param sourceFlags the source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
ReferencePipeline(Supplier<? extends Spliterator source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags The source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
ReferencePipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
/**
* Constructor for appending an intermediate operation onto an existing
* pipeline.
*
* @param upstream the upstream element source.
*/
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
// Shape-specific methods
@Override
final StreamShape getOutputShape() {
return StreamShape.REFERENCE;
}
@Override
final <P_IN> Node evaluateToNode(PipelineHelper helper,
Spliterator<P_IN> spliterator,
boolean flattenTree,
IntFunction<P_OUT[]> generator) {
return Nodes.collect(helper, spliterator, flattenTree, generator);
}
@Override
final <P_IN> Spliterator wrap(PipelineHelper ph,
Supplier<Spliterator supplier,
boolean isParallel) {
return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
}
@Override
final Spliterator<P_OUT> lazySpliterator(Supplier> supplier) {
return new StreamSpliterators.DelegatingSpliterator<>(supplier);
}
@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
@Override
final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction generator) {
return Nodes.builder(exactSizeIfKnown, generator);
}
// BaseStream
@Override
public final Iterator<P_OUT> iterator() {
return Spliterators.iterator(spliterator());
}
// Stream
// Stateless intermediate operations from Stream
@Override
public Stream<P_OUT> unordered() {
if (!isOrdered())
return this;
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return sink;
}
};
}
@Override
public final Stream<P_OUT> filter(Predicate predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
@SuppressWarnings("unchecked")
public final <R> Stream map(Function mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
@Override
public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
Objects.requireNonNull(mapper);
return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, Integer>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.applyAsInt(u));
}
};
}
};
}
@Override
public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
Objects.requireNonNull(mapper);
return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, Long>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.applyAsLong(u));
}
};
}
};
}
@Override
public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
Objects.requireNonNull(mapper);
return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, Double>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.applyAsDouble(u));
}
};
}
};
}
@Override
public final <R> Stream flatMap(Function> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
}
}
};
}
};
}
@Override
public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, Integer>(sink) {
IntConsumer downstreamAsInt = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
try (IntStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsInt);
}
}
};
}
};
}
@Override
public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, Double>(sink) {
DoubleConsumer downstreamAsDouble = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
try (DoubleStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsDouble);
}
}
};
}
};
}
@Override
public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, Long>(sink) {
LongConsumer downstreamAsLong = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
try (LongStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsLong);
}
}
};
}
};
}
@Override
public final Stream<P_OUT> peek(Consumer action) {
Objects.requireNonNull(action);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
0) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void accept(P_OUT u) {
action.accept(u);
downstream.accept(u);
}
};
}
};
}
// Stateful intermediate operations from Stream
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
@Override
public final Stream<P_OUT> sorted() {
return SortedOps.makeRef(this);
}
@Override
public final Stream<P_OUT> sorted(Comparator comparator) {
return SortedOps.makeRef(this, comparator);
}
@Override
public final Stream<P_OUT> limit(long maxSize) {
if (maxSize < 0)
throw new IllegalArgumentException(Long.toString(maxSize));
return SliceOps.makeRef(this, 0, maxSize);
}
@Override
public final Stream<P_OUT> skip(long n) {
if (n < 0)
throw new IllegalArgumentException(Long.toString(n));
if (n == 0)
return this;
else
return SliceOps.makeRef(this, n, -1);
}
// Terminal operations from Stream
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
@Override
public void forEachOrdered(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, true));
}
@Override
@SuppressWarnings("unchecked")
public final <A> A[] toArray(IntFunction generator) {
// Since A has no relation to U (not possible to declare that A is an upper bound of U)
// there will be no static type checking.
// Therefore use a raw type and assume A == U rather than propagating the separation of A and U
// throughout the code-base.
// The runtime type of U is never checked for equality with the component type of the runtime type of A[].
// Runtime checking will be performed when an element is stored in A[], thus if A is not a
// super type of U an ArrayStoreException will be thrown.
@SuppressWarnings("rawtypes")
IntFunction rawGenerator = (IntFunction) generator;
return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
.asArray(rawGenerator);
}
@Override
public final Object[] toArray() {
return toArray(Object[]::new);
}
@Override
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}
@Override
public final boolean allMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
}
@Override
public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
}
@Override
public final Optional<P_OUT> findFirst() {
return evaluate(FindOps.makeRef(true));
}
@Override
public final Optional<P_OUT> findAny() {
return evaluate(FindOps.makeRef(false));
}
@Override
public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
}
@Override
public final Optional<P_OUT> reduce(BinaryOperator accumulator) {
return evaluate(ReduceOps.makeRef(accumulator));
}
@Override
public final <R> R reduce(R identity, BiFunction accumulator, BinaryOperator combiner) {
return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
@Override
public final <R> R collect(Supplier supplier,
BiConsumer<R, ? super P_OUT> accumulator,
BiConsumer<R, R> combiner) {
return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
}
@Override
public final Optional<P_OUT> max(Comparator comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
@Override
public final Optional<P_OUT> min(Comparator comparator) {
return reduce(BinaryOperator.minBy(comparator));
}
@Override
public final long count() {
return mapToLong(e -> 1L).sum();
}
//
/**
* Source stage of a ReferencePipeline.
*
* @param <E_IN> type of elements in the upstream source
* @param <E_OUT> type of elements in produced by this stage
* @since 1.8
*/
static class Head<E_IN, E_OUT> extends ReferencePipeline {
/**
* Constructor for the source stage of a Stream.
*
* @param source {@code Supplier<Spliterator>} describing the stream
* source
* @param sourceFlags the source flags for the stream source, described
* in {@link StreamOpFlag}
*/
Head(Supplier<? extends Spliterator source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
/**
* Constructor for the source stage of a Stream.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags the source flags for the stream source, described
* in {@link StreamOpFlag}
*/
Head(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
@Override
final boolean opIsStateful() {
throw new UnsupportedOperationException();
}
@Override
final Sink<E_IN> opWrapSink(int flags, Sink sink) {
throw new UnsupportedOperationException();
}
// Optimized sequential terminal operations for the head of the pipeline
@Override
public void forEach(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
}
@Override
public void forEachOrdered(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEachOrdered(action);
}
}
}
/**
* Base class for a stateless intermediate stage of a Stream.
*
* @param <E_IN> type of elements in the upstream source
* @param <E_OUT> type of elements in produced by this stage
* @since 1.8
*/
abstract static class StatelessOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
/**
* Construct a new Stream by appending a stateless intermediate
* operation to an existing stream.
*
* @param upstream The upstream pipeline stage
* @param inputShape The stream shape for the upstream pipeline stage
* @param opFlags Operation flags for the new stage
*/
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return false;
}
}
/**
* Base class for a stateful intermediate stage of a Stream.
*
* @param <E_IN> type of elements in the upstream source
* @param <E_OUT> type of elements in produced by this stage
* @since 1.8
*/
abstract static class StatefulOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
/**
* Construct a new Stream by appending a stateful intermediate operation
* to an existing stream.
* @param upstream The upstream pipeline stage
* @param inputShape The stream shape for the upstream pipeline stage
* @param opFlags Operation flags for the new stage
*/
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return true;
}
@Override
abstract <P_IN> Node opEvaluateParallel(PipelineHelper helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
}
}
Other Java examples (source code examples)
Here is a short list of links related to this Java ReferencePipeline.java source code file:
|