DistinctOps
DistinctOps
是一个专门用于实现 Stream.distinct()
操作的工厂类。正如它的名字所示,它的核心职责就是创建能够去除流中重复元素的操作。distinct()
是一个有状态的中间操作 (stateful intermediate operation),这意味着它通常需要看到所有元素才能决定哪些元素可以进入下一阶段,这使得它的实现比无状态操作(如 filter
, map
)要复杂得多。
ReferencePipeline(这个本身就是顶层的stream)中调用
@Overridepublic final Stream<P_OUT> distinct() {return DistinctOps.makeRef(this);}
下面我们分几个部分来详细解析 DistinctOps
类。
类的定位和结构
final class DistinctOps
: 这是一个final
类,并且构造函数是private
的,表明它是一个纯粹的工具类(Utility Class),不能被继承或实例化。所有的功能都通过静态方法makeRef
提供。makeRef(AbstractPipeline<?, T, ?> upstream)
: 这是该类的唯一入口。当调用stream.distinct()
时,底层实际上就是调用了这个DistinctOps.makeRef()
方法。它接收上游的Pipeline
作为输入,然后返回一个新的Pipeline
阶段,这个新阶段就包含了去重逻辑。// ... existing code ... static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {// ... 具体的实现 ...}; } // ... existing code ...
这里它创建了一个
ReferencePipeline.StatefulOp
的匿名子类实例。这立即告诉我们几点重要信息:distinct()
是一个有状态操作 (StatefulOp
)。- 它向上游声明了
IS_DISTINCT
标志,表示从这个阶段输出的流是已经去重的。 - 它还声明了
NOT_SIZED
标志,因为去重后元素的数量是未知的,除非处理完所有元素。
核心实现:串行 vs 并行,有序 vs 无序
distinct()
的复杂性在于它必须根据流的 并行性 (parallel) 和 有序性 (ordered) 采取截然不同的策略。DistinctOps
内部通过重写 StatefulOp
的几个关键方法来处理这些不同情况。
串行执行 (opWrapSink
)
这是最简单的情况,对应的是流的串行 (sequential) 执行。逻辑在 opWrapSink
方法中实现。
// ... existing code ...@OverrideSink<T> opWrapSink(int flags, Sink<T> sink) {Objects.requireNonNull(sink);if (StreamOpFlag.DISTINCT.isKnown(flags)) {// 如果上游已经去重,什么都不做,直接返回下游的 Sinkreturn sink;} else if (StreamOpFlag.SORTED.isKnown(flags)) {// 优化:如果流已排序,只需和前一个元素比较return new Sink.ChainedReference<>(sink) {boolean seenNull;T lastSeen;// ...@Overridepublic void accept(T t) {if (t == null) {if (!seenNull) { /* ... */ }} else if (lastSeen == null || !t.equals(lastSeen)) {downstream.accept(lastSeen = t);}}};} else {// 通用情况:使用 Set 来记录见过的元素return new Sink.ChainedReference<>(sink) {Set<T> seen;@Overridepublic void begin(long size) {seen = new HashSet<>();downstream.begin(-1);}// ...@Overridepublic void accept(T t) {if (seen.add(t)) { // Set.add() 返回 true 表示添加成功(即之前没有)downstream.accept(t);}}};}}
// ... existing code ...
逻辑分析:
- 已去重 (
DISTINCT.isKnown
): 如果流已经被标记为去重的,distinct()
就成了一个空操作(no-op),直接把元素传给下游。 - 已排序 (
SORTED.isKnown
): 这是一个非常重要的优化。如果流是排序的,那么重复的元素必然是相邻的。因此,我们不需要一个Set
来存储所有见过的元素,只需要记住上一个元素 (lastSeen
) 即可。这极大地降低了空间复杂度。 - 通用情况: 对于无序的流,唯一的办法就是用一个
HashSet
来存储所有已经见过的元素。每次来一个新元素,尝试添加到Set
中。如果add
方法返回true
,说明这是个新元素,就把它传递给下游。
并行执行 (opEvaluateParallel
)
这是最复杂的情况,对应流的并行 (parallel) 执行。并行执行需要一个屏障 (barrier) 操作,即必须处理完所有分片(Spliterator)的数据,合并结果后才能继续。
// ... existing code ...@Override<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator,IntFunction<T[]> generator) {if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {// 已去重,空操作return helper.evaluate(spliterator, false, generator);}else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {// 有序并行:退化为串行处理模式来保证顺序return reduce(helper, spliterator);}else {// 无序并行:最高效的并行模式AtomicBoolean seenNull = new AtomicBoolean(false);ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();// ... 并行地将所有元素放入 ConcurrentHashMap 来去重forEachOp.evaluateParallel(helper, spliterator);// ...return Nodes.node(keys);}}
// ... existing code ...
逻辑分析:
- 有序并行 (
ORDERED.isKnown
): 为了保证元素的顺序,并行distinct
无法做到真正的“懒加载”。它必须收集所有元素,在一个地方完成去重,然后再把结果交给下游。这里的reduce
方法内部使用了LinkedHashSet
来保证顺序,这实际上是一个代价高昂的屏障操作。 - 无序并行: 这是并行
distinct
性能最好的场景。因为它不关心顺序,所以可以充分利用并行性。- 它使用
ConcurrentHashMap
作为共享的Set
(因为ConcurrentHashMap
是线程安全的,而HashSet
不是)。 - 每个线程都把自己分片中的元素尝试放入这个共享的
map
中。putIfAbsent
是一个原子操作,可以保证只有一个线程能成功放入一个特定的元素。 - 所有线程完成后,
map
的keySet
就是最终去重后的结果集。
- 它使用
- ConcurrentHashMap 有一个限制:它的 key 和 value 都不能为 null。
为了处理流中可能存在的 null 元素,代码使用了一个额外的 AtomicBoolean seenNull 标志位。如果流中出现了 null,就将这个标志设为 true。
并行懒加载 (opEvaluateParallelLazy
)
这个方法用于支持可以“懒加载”的并行操作。
// ... existing code ...@Override<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {// ...}else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {// 有序流不支持懒加载,必须退化为屏障操作return reduce(helper, spliterator).spliterator();}else {// 无序流可以懒加载return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));}}
// ... existing code ...
逻辑分析:
- 对于有序流,
distinct
无法实现懒加载,因为它需要看到所有元素才能确定最终顺序。所以它退化成了opEvaluateParallel
中的reduce
逻辑,收集所有元素再返回一个新的Spliterator
。 - 对于无序流,可以实现懒加载(逐个处理)。它返回一个特制的
DistinctSpliterator
,这个Spliterator
在内部包装了原始的Spliterator
,并在tryAdvance
时进行去重判断。
MatchOps
MatchOps
是 Stream API 内部一个至关重要的工厂类,它专门负责创建和管理 短路(short-circuiting) 的终端操作,也就是我们常用的 anyMatch()
, allMatch()
, noneMatch()
。首先看类的定义:
final class MatchOps {private MatchOps() { }
//...
和 SortedOps
、ReduceOps
类似,它是一个包可见的、拥有私有构造函数的 final
类。这表明它是一个内部使用的工具工厂,专门用于创建 TerminalOp
(终端操作)的实例。
它的核心职责是:根据用户调用的匹配方法(anyMatch
等)和传入的谓词(Predicate
),构建一个能够高效执行匹配逻辑的终端操作。
短路(Short-circuiting)是理解 MatchOps
的关键。anyMatch
, allMatch
, noneMatch
都是短路操作。这意味着它们不一定需要处理流中的所有元素就能得出最终结果。
anyMatch(p)
: 只要找到一个满足谓词p
的元素,结果就确定为true
,无需再检查后续元素。allMatch(p)
: 只要找到一个不满足谓词p
的元素,结果就确定为false
,无需再检查后续元素。noneMatch(p)
: 只要找到一个满足谓词p
的元素,结果就确定为false
,无需再检查后续元素。
这种“提前退出”的能力就是短路,它能极大地提升在某些数据场景下的执行效率。
MatchKind
枚举:统一匹配逻辑
为了用一套统一的逻辑来处理这三种不同的匹配规则,MatchOps
设计了一个非常精巧的枚举 MatchKind
。
// ... existing code ...enum MatchKind {/** Do any elements match the predicate? */ANY(true, true),/** Do all elements match the predicate? */ALL(false, false),/** Do no elements match the predicate? */NONE(true, false);private final boolean stopOnPredicateMatches;private final boolean shortCircuitResult;private MatchKind(boolean stopOnPredicateMatches,boolean shortCircuitResult) {this.stopOnPredicateMatches = stopOnPredicateMatches;this.shortCircuitResult = shortCircuitResult;}}
// ... existing code ...
这个枚举通过两个布尔值,巧妙地描述了三种匹配行为的共性与差异:
stopOnPredicateMatches
: 当谓词(predicate
)的计算结果为true
时,是否应该停止处理?ANY
: 是。找到一个匹配的就停。ALL
: 否。找到一个匹配的还不够,必须继续找,直到找到不匹配的或者遍历完。NONE
: 是。找到一个匹配的就停(因为结果已经确定是false
)。
shortCircuitResult
: 如果发生了短路(提前停止),那么最终的结果应该是什么?ANY
:true
。ALL
:false
。NONE
:false
。
通过这个枚举,后续的实现代码(如下面的 MatchSink
)就可以写出通用的逻辑,而无需为 any
, all
, none
分别写 if-else
分支。
工厂方法与 MatchSink
MatchOps
提供了一系列 make...
工厂方法,用于为不同类型的流(Stream
, IntStream
等)创建匹配操作。我们以 makeRef
为例:
// ... existing code ...public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,MatchKind matchKind) {Objects.requireNonNull(predicate);Objects.requireNonNull(matchKind);class MatchSink extends BooleanTerminalSink<T> {MatchSink() {super(matchKind);}@Overridepublic void accept(T t) {if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {stop = true;value = matchKind.shortCircuitResult;}}}return new MatchOp<>(StreamShape.REFERENCE, matchKind, MatchSink::new);}
// ... existing code ...
- 它接收一个
Predicate
和一个MatchKind
。 - 内部定义了一个局部类
MatchSink
,它继承自BooleanTerminalSink
。Sink
是流中处理元素的末端。 MatchSink
的accept(T t)
方法是核心逻辑所在:!stop
: 检查是否已经有其他元素触发了短路。如果已经stop
,则直接忽略当前元素。predicate.test(t) == matchKind.stopOnPredicateMatches
: 这是一个非常优雅的判断。它将当前元素的匹配结果 (true
或false
) 与MatchKind
中定义的“停止条件”进行比较。- 对于
ANY
:stopOnPredicateMatches
是true
。当predicate.test(t)
为true
时,条件成立。 - 对于
ALL
:stopOnPredicateMatches
是false
。当predicate.test(t)
为false
时,条件成立。 - 对于
NONE
:stopOnPredicateMatches
是true
。当predicate.test(t)
为true
时,条件成立。
- 对于
- 如果条件成立,就设置
stop = true
来通知上游停止发送数据,并设置value = matchKind.shortCircuitResult
来记录短路时的结果。
- 最后,它创建一个
MatchOp
实例并返回。MatchOp
是TerminalOp
的一个具体实现,它封装了MatchKind
和用于创建MatchSink
的Supplier
。
并行执行:MatchTask
对于并行流,MatchOp
的 evaluateParallel
方法会创建一个 MatchTask
。
// ... existing code ...@SuppressWarnings("serial")private static final class MatchTask<P_IN, P_OUT>extends AbstractShortCircuitTask<P_IN, P_OUT, Boolean, MatchTask<P_IN, P_OUT>> {
// ... (constructors) ...@Overrideprotected Boolean doLeaf() {boolean b = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).getAndClearState();if (b == op.matchKind.shortCircuitResult)shortCircuit(b);return null;}@Overrideprotected Boolean getEmptyResult() {return !op.matchKind.shortCircuitResult;}}
// ... existing code ...
MatchTask
继承自 AbstractShortCircuitTask
,这是一个专为并行短路操作设计的 Fork/Join 任务。
doLeaf()
: 这是叶子任务执行的逻辑。它会对分配给自己的那一小部分数据执行匹配操作。helper.wrapAndCopyInto(...)
: 执行匹配,得到这部分数据的结果b
。if (b == op.matchKind.shortCircuitResult)
: 判断这个局部结果b
是否就是最终的短路结果。例如,对于anyMatch
,如果这个叶子任务发现了一个匹配项,它的结果b
就会是true
,这恰好等于ANY
的shortCircuitResult
。shortCircuit(b)
: 如果发现了可以导致整个流短路的结果,它会调用shortCircuit
。这个方法会通知 Fork/Join 框架,一个最终结果已经找到了,所有其他正在运行的、或者尚未开始的MatchTask
都可以被取消了。这实现了并行的短路。
getEmptyResult()
: 如果所有任务都正常执行完毕,没有发生短路,那么这个方法返回最终的结果。例如,对于allMatch
,如果所有元素都匹配,没有发生短路,最终结果就是true
,即!op.matchKind.shortCircuitResult
(!false
)。
总结
MatchOps
是一个设计精良的内部工厂,它通过以下方式优雅地实现了 anyMatch
, allMatch
, noneMatch
:
MatchKind
枚举: 用两个布尔标志统一了三种匹配模式的逻辑,避免了重复代码。MatchSink
: 作为串行执行的核心,它利用MatchKind
的配置来实现通用的、可短路的元素处理逻辑。MatchTask
: 作为并行执行的核心,它利用AbstractShortCircuitTask
的能力,在 Fork/Join 框架下实现了高效的并行短路,一旦任何一个子任务找到了决定性的结果,就能迅速终止整个计算。
FindOps
它与我们之前讨论的 MatchOps
非常相似,但专注于实现 findFirst()
和 findAny()
这两个终端操作。FindOps
是一个内部使用的工厂类,其核心职责是创建用于“查找”操作的终端操作(TerminalOp
)实例。
final class FindOps {private FindOps() { }
//...
和 MatchOps
一样,它也是一个 final
的、拥有私有构造函数的工具类,专门用于 Stream API 内部。它处理的 findFirst()
和 findAny()
都是短路操作,一旦找到符合条件的元素,就可以立即终止流的处理。
findFirst()
vs findAny()
这是理解 FindOps
的关键区别点:
findFirst()
: 必须返回流中遭遇顺序(encounter order)的第一个元素。这是一个有序操作。在并行流中,即使其他线程更快地找到了一个元素,也必须等待遭遇顺序更靠前的任务完成,以确保返回的是“最左边”的那个结果。findAny()
: 可以返回流中的任意一个元素。这是一个无序操作。在并行流中,它允许任何一个线程只要找到了一个元素,就可以立即短路整个计算,而无需关心这个元素在原始流中的位置。这使得findAny()
在并行流中的性能通常优于findFirst()
。
FindOps
内部通过一个布尔标志 mustFindFirst
来区分这两种行为。
FindOps
提供了一系列 make...
方法,用于为不同类型的流(对象、int、long、double)创建查找操作。
// ... existing code ...@SuppressWarnings("unchecked")public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {return (TerminalOp<T, Optional<T>>)(mustFindFirst ? FindSink.OfRef.OP_FIND_FIRST : FindSink.OfRef.OP_FIND_ANY);}
// ... existing code ...
这些方法非常简洁。它们根据 mustFindFirst
参数,直接返回一个预先创建好的、静态的 TerminalOp
实例。这些实例(如 OP_FIND_FIRST
)被定义在内部类 FindSink
中。这种方式避免了重复创建对象,提高了效率。
FindSink
- 串行执行的核心
FindSink
是实现查找逻辑的 TerminalSink
。
// ... existing code ...private abstract static class FindSink<T, O> implements TerminalSink<T, O> {boolean hasValue;T value;// ...@Overridepublic void accept(T value) {if (!hasValue) {hasValue = true;this.value = value;}}@Overridepublic boolean cancellationRequested() {return hasValue;}
// ... existing code ...
它的逻辑非常直接:
accept(T value)
: 当接收到第一个元素时,将其保存在this.value
中,并将hasValue
标志设为true
。由于if (!hasValue)
的判断,后续所有到达的元素都会被忽略。cancellationRequested()
: 一旦hasValue
变为true
,此方法就返回true
。这会通知上游的流管道:“我已经找到结果了,请不要再发送更多元素了”,从而实现串行流的短路。
FindSink
有多个静态内部子类(OfRef
, OfInt
等),用于处理不同数据类型,并预先创建了 OP_FIND_FIRST
和 OP_FIND_ANY
这两个静态常量。
FindOp
- 终端操作的封装
FindOp
是 TerminalOp
接口的具体实现,它封装了查找操作的所有元信息。
// ... existing code ...private static final class FindOp<T, O> implements TerminalOp<T, O> {private final StreamShape shape;final int opFlags;final O emptyValue;// ...final Supplier<TerminalSink<T, O>> sinkSupplier;FindOp(boolean mustFindFirst,StreamShape shape,O emptyValue,Predicate<O> presentPredicate,Supplier<TerminalSink<T, O>> sinkSupplier) {this.opFlags = StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);// ...}
// ... existing code ...
opFlags
: 操作标志。所有查找操作都是IS_SHORT_CIRCUIT
。对于findAny
(mustFindFirst
为false
),还会额外加上NOT_ORDERED
标志,告知流管道可以进行无序优化。evaluateSequential(...)
: 定义了串行执行的逻辑,即创建一个FindSink
并让流把元素送入其中。evaluateParallel(...)
: 定义了并行执行的逻辑,它会创建一个FindTask
并启动它。
FindTask
- 并行执行的核心
FindTask
是 AbstractShortCircuitTask
的子类,负责并行查找。
// ... existing code ...private static final class FindTask<P_IN, P_OUT, O>extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {private final FindOp<P_OUT, O> op;private final boolean mustFindFirst;
// ...@Overrideprotected O doLeaf() {O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();if (!mustFindFirst) { // findAny 逻辑if (result != null)shortCircuit(result); // 找到任何一个,立即短路return null;}else { // findFirst 逻辑if (result != null) {foundResult(result); // 找到一个,需要通过 cancelLaterNodes 确保有序性return result;}elsereturn null;}}
// ...}
// ... existing code ...
doLeaf()
方法清晰地展示了 findFirst
和 findAny
在并行时的区别:
findAny
(!mustFindFirst
): 叶子任务只要处理自己的数据分片并找到了一个元素 (result != null
),就立刻调用shortCircuit(result)
。这会尝试将结果写入共享的AtomicReference
,并触发全局的短路,所有其他任务都会尽快停止。findFirst
(mustFindFirst
): 叶子任务找到一个元素后,不能直接宣布胜利。它需要调用foundResult(result)
,这个方法内部会调用cancelLaterNodes()
。这会取消掉所有处理“更右边”(遭遇顺序更靠后)数据的任务,同时允许“更左边”的任务继续执行。最终,只有最左边的那个找到了结果的任务,其结果才会被采纳为最终结果。这个过程通过onCompletion
方法中的逻辑来保证。
private void foundResult(O answer) {if (isLeftmostNode())shortCircuit(answer);elsecancelLaterNodes();}AbstractTask::protected boolean isLeftmostNode() {@SuppressWarnings("unchecked")K node = (K) this;while (node != null) {K parent = node.getParent();if (parent != null && parent.leftChild != node)return false;node = parent;}return true;}
总结
FindOps
通过一系列精心设计的内部类,为 findFirst
和 findAny
提供了统一且高效的实现:
- 工厂方法 (
make...
): 提供简洁的入口,并复用预创建的TerminalOp
实例。 FindSink
: 实现了串行查找的短路逻辑。FindOp
: 封装了操作的元数据,并区分了findFirst
和findAny
的标志。FindTask
: 作为AbstractShortCircuitTask
的子类,它为并行查找提供了核心实现,并巧妙地利用父类的短路和取消机制,分别实现了findAny
的“快速响应”和findFirst
的“有序保证”。