Java Stream 宏观介绍见:深入解析 Java Stream 设计:从四幕剧看流水线设计与执行机制-CSDN博客
PipelineHelper
PipelineHelper
是 Java Stream API 内部一个至关重要的辅助类。正如其名,它是一个“管道助手”。可以把它想象成一个执行上下文对象,当一个流管道需要被执行时(即调用终端操作时),这个对象会被创建并传递,它封装了执行该管道所需的所有信息和核心工具方法。
- 在
PipelineHelper
这个抽象类的定义中,没有声明任何字段。它是一个纯粹的行为和契约定义类,其状态将由它的具体实现类(主要是AbstractPipeline
)来持有。 PipelineHelper
是一个纯抽象类,它只定义了方法签名,没有任何一个方法的具体实现。所有方法的实现都委托给了它的子类。这种设计强制子类必须提供一套完整的管道执行机制。
抽象方法构成了 PipelineHelper
的核心能力,可以分为几类:
信息获取类方法
这类方法用于查询当前流管道的静态属性,是执行优化的关键依据。
abstract StreamShape getSourceShape();
- 语义:获取管道源头的“形状”,即流中元素的类型是引用类型 (
REFERENCE
)、int
、long
还是double
。
- 语义:获取管道源头的“形状”,即流中元素的类型是引用类型 (
abstract int getStreamAndOpFlags();
- 语义:获取整个管道(从源头到当前操作)合并后的特征标记 (
StreamOpFlag
)。这些标记包括SIZED
(大小已知)、ORDERED
(有序)、DISTINCT
(元素唯一)、SHORT_CIRCUIT
(可短路)等。终端操作会根据这些标记来选择最高效的执行策略。
- 语义:获取整个管道(从源头到当前操作)合并后的特征标记 (
abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
- 语义:在已知的情况下,计算处理完给定的
spliterator
后,会产生的确切输出元素数量。如果源是SIZED
的,并且中间没有filter
等改变大小的操作,这个方法就能返回一个精确值。这对于toArray
等操作预先分配内存空间至关重要。
- 语义:在已知的情况下,计算处理完给定的
核心执行类方法
这类方法定义了驱动管道数据流动的核心逻辑。
abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
- 语义:这是最核心的方法之一。它接受一个用于接收最终输出结果的
Sink
(来自终端操作),然后用当前管道中所有中间操作的逻辑,从后往前地对这个Sink
进行层层包装,最终返回一个包装好的、位于管道头部的Sink
。当向这个返回的Sink
推入一个元素时,这个元素会依次流过filter
、map
等所有中间操作,最终(如果没被过滤掉)到达原始的Sink
。
- 语义:这是最核心的方法之一。它接受一个用于接收最终输出结果的
abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
- 语义:将数据从源
spliterator
中不断取出,并喂给(accept
)已经通过wrapSink
包装好的Sink
。它负责驱动数据的流动。
- 语义:将数据从源
abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
- 语义:
copyInto
的一个支持短路(cancellation)的版本。在每次推送元素后,它会检查Sink.cancellationRequested()
,如果为true
(例如findFirst
找到了元素),就立刻停止迭代。
- 语义:
abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
- 语义:一个便捷方法,它内部调用
wrapSink
和copyInto
,将包装和数据拷贝两步操作合并为一步。
- 语义:一个便捷方法,它内部调用
结果聚合类方法
这类方法用于有状态操作(如 sorted
)或部分终端操作(如 toArray
)将结果收集到一个中间容器 Node
中。
abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<P_OUT[]> generator);
- 语义:执行整个管道,并将所有输出结果收集到一个
Node
对象中返回。Node
是一个可以表示单个元素或元素树的内部结构。
- 语义:执行整个管道,并将所有输出结果收集到一个
abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);
- 语义:创建一个
Node.Builder
,这是一个用于构建Node
的辅助工具。
- 语义:创建一个
其他方法
abstract<P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);
- 语义:提供一种不同于
Sink
模型的执行方式。它将管道操作包装到一个新的Spliterator
中,当遍历这个新的Spliterator
时,操作会惰性地应用到原始数据上。
- 语义:提供一种不同于
总结:语义和能力
PipelineHelper
的核心语义是封装一条流管道的完整执行方案。它不是管道本身,而是执行管道时所需的 “蓝图”和“工具箱” 。
它的能力可以总结为:
- 信息中心:提供关于管道的元数据(形状、标志、大小),供终端操作进行查询和优化决策。
- 执行引擎:定义了将数据源 (
Spliterator
) 和数据处理逻辑 (Sink
链) 连接起来并驱动数据流动的核心方法 (wrapSink
,copyInto
)。 - 流程编排器:它将流的“声明式”定义(一系列
map
,filter
调用)转化为一个“命令式”的执行过程。 - 结果聚合器:为需要缓冲所有元素的有状态操作和终端操作提供了收集结果到
Node
的能力。
AbstractPipeline
类概述
AbstractPipeline
类的结构和源码。这个抽象类是所有类型流(如 ReferencePipeline
for Stream<T>
, IntPipeline
for IntStream
等)实现的基础,在 Java Stream API 中扮演着至关重要的角色。
StreamSupport.java
文件中的 stream()
方法,正是通过实例化 ReferencePipeline.Head
(ReferencePipeline
的一个内部类,而 ReferencePipeline
继承自 AbstractPipeline
)来创建流的。
AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
是一个抽象类,它实现了 BaseStream
和 PipelineHelper
接口。
E_IN
: 上游阶段输入元素的类型。E_OUT
: 当前阶段输出元素的类型。S
: 当前流的类型 (例如Stream<E_OUT>
)。
主要结构和成员
AbstractPipeline
的字段主要用于构建和管理流管道这个双向链表,并维护整个管道的状态。
sourceStage
: 指向管道链表的头节点(源阶段)。所有阶段都共享同一个sourceStage
,用于获取全局信息,如数据源和并行标志。previousStage
: 指向链表中的前一个阶段。nextStage
: 指向链表中的后一个阶段。depth
: 当前阶段在管道中的深度(从0开始的索引)。sourceOrOpFlags
: 当前阶段操作的标志(如NOT_SIZED
)。combinedFlags
: 从源阶段到当前阶段累计的所有标志。sourceSpliterator
/sourceSupplier
: 数据源,只在sourceStage
中有效。parallel
: 是否为并行流,只在sourceStage
中有效。linkedOrConsumed
: 一个布尔标志,用于确保流只能被消费一次。一旦一个中间操作被链接到下游,或者一个终端操作开始执行,这个标志就会被设为true
,防止后续操作。sourceCloseAction
: 用于注册流关闭时需要执行的清理逻辑。
核心职责:构建和驱动流水线
AbstractPipeline
是整个 Stream API 实现的骨架和引擎。它的核心职责可以概括为以下几点:
-
流水线构建 (Pipeline Construction):
- 双向链表结构:通过
previousStage
和nextStage
字段,将各个操作阶段(stage)链接成一个双向链表。 - 构造器:提供了三种核心构造器:
AbstractPipeline(Spliterator, ...)
和AbstractPipeline(Supplier, ...)
用于创建流水线的源头 (Head)。AbstractPipeline(previousStage, ...)
用于追加 (Append) 一个新的中间操作阶段。
- 状态管理:通过
linkedOrConsumed
标志,严格保证了“流只能被消费一次”的原则。在构造链条时,它会将被链接的previousStage
标记为已消费,防止分叉。
- 双向链表结构:通过
-
标志位管理 (Flag Management):
sourceOrOpFlags
: 每个阶段都持有自己的操作标志(如SORTED
,DISTINCT
)。combinedFlags
: 在构建流水线时,通过StreamOpFlag.combineOpFlags()
方法,自顶向下地累积和计算从源头到当前阶段的所有标志。这使得任何一个阶段都能快速知道整个上游流水线的综合特性(例如,是否整体有序ORDERED
,是否包含短路操作SHORT_CIRCUIT
)。这是非常高效的设计。
-
终端操作的统一入口和驱动 (Terminal Operation Driver):
evaluate(TerminalOp)
: 这是所有终端操作的总入口。它不关心具体的终端操作是什么,只负责根据isParallel()
状态,调用terminalOp.evaluateParallel(...)
或terminalOp.evaluateSequential(...)
。sourceSpliterator(int)
: 这是evaluate
方法的核心辅助。它负责为终端操作准备好一个“就绪”的Spliterator
,尤其是通过预计算来处理并行流中的有状态操作(屏障),这是整个并行处理机制的精髓所在。
-
并行/顺序切换 (Parallel/Sequential Switching):
- 通过
parallel()
和sequential()
方法,简单地修改源头阶段的parallel
标志位,就能影响整个流水线的执行模式。这种控制集中在源头的设计非常简洁。
- 通过
-
核心
PipelineHelper
接口的实现:- 它实现了
PipelineHelper
接口中的大部分通用逻辑,如wrapAndCopyInto
,copyInto
,copyIntoWithCancel
等,这些方法定义了如何将Spliterator
中的数据“灌入”到Sink
链中,并处理了短路操作的逻辑。
- 它实现了
留给子类实现的职责:与“形状”相关的具体行为
AbstractPipeline
完美地抽象了所有 Stream(无论其元素是对象、int
、long
还是 double
)的通用流水线逻辑。它刻意将所有与**“形状” (Shape)** 相关的具体实现细节留给了子类。这里的“形状”指的是流的类型(REFERENCE
, INT_VALUE
, LONG_VALUE
, DOUBLE_VALUE
)。
子类(如 ReferencePipeline
, IntPipeline
等)必须实现以下抽象方法:
-
getOutputShape()
: 返回当前阶段的输出“形状”。这是类型系统的基础。 -
opWrapSink(int flags, Sink<E_OUT> sink)
: (核心抽象) 这是实现一个中间操作的关键。子类需要提供一个方法,将下游的Sink
“包装”成一个新的Sink
,这个新的Sink
在接收到元素后,会执行当前阶段的操作(如map
的转换逻辑,filter
的判断逻辑),然后将结果传递给原始的下游Sink
。 -
wrap(...)
和lazySpliterator(...)
: 这两个方法负责创建与“形状”匹配的Spliterator
包装器。例如,ReferencePipeline
会创建StreamSpliterators.WrappingSpliterator
,而IntPipeline
会创建StreamSpliterators.IntWrappingSpliterator
。 -
evaluateToNode(...)
,makeNodeBuilder(...)
: 这些方法与将流的结果收集到Node
(一个内部的、用于聚合结果的树状结构)中有关。不同“形状”的流需要不同类型的Node
和Node.Builder
(如Node.OfReference
,Node.OfInt
)。 -
opEvaluateParallel(...)
和opEvaluateParallelLazy(...)
: (核心抽象) 这是实现一个有状态操作的关键。子类必须为有状态操作(如sorted
,distinct
)提供具体的并行求值逻辑。
设计原因分析与启示
这种设计的精妙之处在于 “模板方法模式” (Template Method Pattern) 和 “职责分离原则” (Separation of Concerns) 的完美应用。
-
模板方法模式:
AbstractPipeline
定义了流水线执行的骨架(模板),例如evaluate()
方法规定了“获取Spliterator
-> 调用终端操作求值”这个流程。- 它将流程中可变的部分(如如何包装
Sink
、如何创建Spliterator
)定义为抽象方法(opWrapSink
,wrap
),交由子类去实现。 - 启示: 当我们设计一个具有固定流程但某些步骤细节可变的框架或组件时,模板方法模式是绝佳的选择。它能锁定核心逻辑的稳定性,同时提供高度的扩展性。
-
职责分离:
AbstractPipeline
只关心“如何驱动流水线”,它不关心流水线中流动的数据具体是什么类型,也不关心每个操作的具体业务逻辑。- 具体的操作逻辑被封装在
Sink
的实现中。 - 具体的类型处理被封装在
ReferencePipeline
,IntPipeline
等子类中。 - 启示: 在设计复杂系统时,要清晰地划分不同模块的职责边界。一个类或模块应该只有一个引起它变化的原因。
AbstractPipeline
的变化原因仅仅是“流水线驱动逻辑的变更”,而不是“增加了一种新的中间操作”或“支持一种新的数据类型”。
-
组合优于继承:
- 虽然这里用了继承,但流水线的构建更像是组合。每个
AbstractPipeline
对象都持有一个previousStage
的引用,形成一个操作链。Stream 的行为是由这一系列对象的组合来定义的,而不是通过一个巨大的、多层继承的类来实现。 - 启示: 优先考虑使用对象组合来构建复杂的行为,因为它比继承更加灵活。
- 虽然这里用了继承,但流水线的构建更像是组合。每个
通过分析 AbstractPipeline
,我们可以学到以下几点来提升自己的设计能力:
- 识别变与不变: 在设计之初,就要仔细思考你的系统中哪些是稳定不变的核心流程(不变),哪些是可能变化或需要扩展的细节(变)。将不变的部分固化在抽象基类中,将变化的部分抽象成接口或抽象方法。
- 善用模板方法: 对于有固定步骤的业务流程,使用模板方法模式可以极大地简化子类的实现,并保证核心流程的正确性。
- 明确职责边界: 一个好的抽象类应该有一个清晰、单一的职责。
AbstractPipeline
的职责就是“流水线控制器”,它做得非常出色。避免设计“万能”的基类。 - 面向接口/抽象编程:
AbstractPipeline
内部大量使用了Spliterator
,Sink
,TerminalOp
等接口,而不是具体的实现类。这使得它可以与任何符合接口定义的组件协同工作,大大增强了灵活性。 - 管理好状态:
AbstractPipeline
通过linkedOrConsumed
和parallel
等几个关键的状态字段,清晰地管理了流的生命周期和执行模式,保证了正确性和线程安全。在自己的设计中,也要仔细考虑对象的状态转换和并发访问问题。
为什么叫做pipeline而不是node?
单个 AbstractPipeline
对象很像一个链表中的节点 (Node)。然而,它被命名为 AbstractPipeline
(抽象管道)而不是 AbstractStage
或 AbstractNode
是因为它所代表的概念和职责远超一个简单的节点。
简单来说:一个 AbstractPipeline
实例代表管道中的一个“阶段”,但它本身的设计和功能是为了构建和管理整个“管道”。
让我们从几个方面来深入理解:
1. 结构上:既是节点,也知全局
AbstractPipeline
内部通过 previousStage
和 nextStage
字段,将各个操作(如 map
, filter
)链接起来,形成一个双向链表结构。这确实是节点的特征。
但是,请看这个关键字段:
// ... existing code .../*** Backlink to the head of the pipeline chain (self if this is the source* stage).*/@SuppressWarnings("rawtypes")private final AbstractPipeline sourceStage;
// ... existing code ...
这个 sourceStage
字段让每一个“节点”都能直接访问到整个链表的头部,也就是数据源头。一个普通的链表节点通常只知道它的前驱和后继。而 AbstractPipeline
的每个实例都拥有对整个管道起点的认知。
此外,还有 combinedFlags
字段,它会累积从源头到当前阶段所有操作的特性(比如 SIZED
, DISTINCT
等)。
这种设计意味着,虽然它是一个独立的阶段(节点),但它始终携带着整个管道(到它为止)的上下文信息。它的行为和优化都依赖于对整个管道的理解,而不仅仅是它自己这个环节。
2. 概念上:代表“数据流动的管道”
“Pipeline”(管道)这个词更强调的是过程和流动。数据从 sourceStage
流入,依次穿过每一个由 AbstractPipeline
实例代表的中间操作,最后从末端流出到终端操作。
- Node (节点) 是一个静态的概念,强调“存在于某处”。
- Pipeline (管道) 是一个动态的概念,强调“从一处流向另一处的过程”。
Stream API 的核心就是建立一个数据处理的流水线。AbstractPipeline
正是构建这条流水线的基础组件。所以用“Pipeline”来命名,更能体现其在数据流处理中的角色和作用。
3. 功能上:懒加载的“配方”
整个由 AbstractPipeline
链接起来的结构,在终端操作(如 collect
, forEach
)被调用之前,什么也不做。它只是一个“配方”或者说一个“蓝图”,描述了数据应该如何被处理。
当终端操作开始时,它会从最后一个 AbstractPipeline
阶段开始,反向遍历整个链表(管道),将每个阶段的操作包装(wrapSink
)成一个最终的、融合的 Sink
。然后,一次性地从源头拉取数据,让数据流过这个融合后的 Sink
,完成所有计算。
所以,AbstractPipeline
的链表结构本身就是对整个处理 流程(Pipeline) 的定义。
总结
特性 | 像一个 Node | 为什么叫 Pipeline |
---|---|---|
结构 | 通过 previousStage 和 nextStage 链接,形成链表。 | 每个实例都通过 sourceStage 了解管道源头,并用 combinedFlags 累积全局状态。 |
概念 | 是构成链表的一个基本单元。 | 强调数据从源头流经一系列处理阶段的动态过程。 |
功能 | 封装了单个操作(如 map , filter )。 | 整个链表共同定义了一个完整的、可执行的数据处理流水线(配方)。 |
因此,虽然 AbstractPipeline
的实例在结构上扮演着“节点”的角色,但它的命名着眼于它所构建的、更宏大的“数据处理管道”这一核心概念。这个命名精准地反映了 Stream API 的设计哲学。
启动Stream:evaluate
有两个evaluate,它们的关系是:
-
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp)
(将军/Orchestrator): 这是高层次的、通用的执行入口。它负责启动任何类型的终端操作。它不关心终端操作具体做什么,只负责接收一个代表“任务”的TerminalOp
对象,然后决定是按顺序执行还是并行执行,并把具体的执行工作委托给这个TerminalOp
对象。 -
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, ...)
(工兵/Worker): 这是一个低层次的、专用的执行方法。它只有一个非常具体的目标:将流中的所有元素收集到一个内部数据结构Node
中。它是一个“工具”方法,被那些需要先把所有元素聚合起来才能进行下一步操作的终端操作所使用。
evaluate(TerminalOp<E_OUT, R> terminalOp)
- 通用执行引擎
// ... existing code ...final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}
// ... existing code ...
-
职责:
- 接收任务: 它的唯一参数
TerminalOp<E_OUT, R>
是一个接口,代表了一个完整的终端操作。像count()
,collect()
,reduce()
,forEach()
等操作,在内部都会被封装成一个实现了TerminalOp
接口的对象。这个对象知道如何进行求值、如何合并结果。 - 检查状态: 检查流是否已经被使用过 (
linkedOrConsumed
)。 - 决策与分派: 核心逻辑是
isParallel()
判断。- 如果流是并行的,它就调用
terminalOp.evaluateParallel(...)
。 - 如果流是顺序的,它就调用
terminalOp.evaluateSequential(...)
。
- 如果流是并行的,它就调用
- 返回最终结果: 它返回泛型
<R>
,也就是终端操作的最终用户可见结果。这个R
可以是任何类型,比如Long
(对于count
)、List<T>
(对于collect
)、void
(对于forEach
)。
- 接收任务: 它的唯一参数
-
总结: 这个方法是所有终端操作的总指挥。它连接了用户调用的
Stream.collect()
和实际执行该操作的TerminalOp
对象,并根据并行状态选择正确的执行路径。
evaluate(Spliterator<P_IN> spliterator, ...)
- 节点收集器
// ... existing code ...@Overridefinal <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator) {if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);}else {Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);return wrapAndCopyInto(nb, spliterator).build();}}
// ... existing code ...
-
职责:
- 单一目标: 它的目标非常明确,就是执行流水线,并将所有输出元素收集到一个
Node
对象里。Node
是 Stream API 内部用来暂存元素集合的高效数据结构。 - 需要具体实现: 它需要一个
generator
(如String[]::new
) 来知道如何创建最终存放元素的数组。 - 返回中间结构: 它返回的是
Node<E_OUT>
,这是一个内部数据结构,而不是通常用户直接消费的最终结果。
- 单一目标: 它的目标非常明确,就是执行流水线,并将所有输出元素收集到一个
-
总结: 这个方法是一个专用工具,用于将流“物化”到内存中。它本身不是一个完整的终端操作,而是某些终端操作(比如
toArray()
)实现过程中的一个步骤。
一个调用链的例子
让我们以 stream.toArray(String[]::new)
为例,看看它们是如何协同工作的:
-
用户调用:
stream.toArray(String[]::new)
。 -
创建
TerminalOp
:toArray
方法内部会创建一个TerminalOp
的实例。这个TerminalOp
的逻辑大致是:“请给我一个包含所有流元素的Node
,然后我会用这个Node
和传入的String[]::new
来创建一个String
数组作为最终结果”。 -
调用“将军”:
toArray
方法接着会调用高层evaluate(theToArrayTerminalOp)
。 -
“将军”做决策:
evaluate(TerminalOp)
检查isParallel()
。我们假设是顺序执行。 -
“将军”下令: 它调用
theToArrayTerminalOp.evaluateSequential(this, spliterator)
。 -
TerminalOp
的实现:evaluateSequential
的实现需要一个Node
。于是,它内部就会调用低层的、作为“工兵”的evaluate(spliterator, true, generator)
。 -
“工兵”干活: 这个低层
evaluate
方法启动流水线,将所有元素收集到一个Node
中,并返回这个Node
。 -
完成任务:
TerminalOp
拿到Node
后,从中提取元素,创建用户期望的String[]
数组。 -
返回结果: 最终的数组被层层返回,直到最初的用户调用处。
这两个方法的设计体现了优秀的关注点分离(Separation of Concerns):一个负责顶层流程控制,另一个负责具体的数据聚合任务,使得整个 Stream 执行框架既灵活又清晰。
toArray实现没有使用 teminalOp,而是直接调用evaluateToArrayNode,这个会间接调用
evaluate(Spliterator<P_IN> spliterator, ...)
对于其它op,基本只需要直接调用 wrapAndCopyInto,比如ReduceOp的任务是将流中的所有元素聚合成一个单一的值,它不需要在内存中保留所有元素,可以逐个处理元素,并不断更新一个累加器。
Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, ...)
evaluate
的核心作用是启动流水线并将其所有输出元素收集到一个内部数据结构 Node
中。Node
是 Stream API 用于在内存中高效存储一系列元素的内部表示,它可以是一个简单的数组包装,也可以是一个更复杂的树形结构(在并行计算中使用)。
此方法是许多终端操作(如 toArray()
, reduce
的部分形式)的最终执行逻辑。它的设计思想是根据流的并行/顺序状态,分派到两种截然不同的执行策略:
- 顺序执行:逻辑相对简单,创建一个
Node.Builder
(它本身也是一个Sink
),然后利用我们之前分析过的wrapAndCopyInto
机制,将所有元素推入Builder
中,最后构建出Node
。 - 并行执行:逻辑复杂得多,它将任务委托给一个抽象的
evaluateToNode
方法。该方法内部会利用 Fork/Join 框架,将数据源Spliterator
分割成多个部分,并行处理,然后将各个部分的结果(通常是多个Node
)合并成一个最终的Node
。
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator)
final
: 此方法不可被子类重写,是框架的核心稳定部分。<P_IN>
: 方法级泛型,代表输入Spliterator
的元素类型。Node<E_OUT>
: 返回值。E_OUT
是当前Pipeline
阶段的输出类型。返回一个包含所有流元素的Node
。Spliterator<P_IN> spliterator
: 数据源。boolean flatten
: 一个标志。在并行计算中,结果可能是一个Node
树。如果flatten
为true
,则要求返回的Node
是一个扁平化的结构(即一个包含所有元素的单一数组),而不是树。IntFunction<E_OUT[]> generator
: 一个函数,用于创建指定类型的数组,例如String[]::new
。这是 Java 泛型数组创建的标准模式。
evaluate
的实现是一个清晰的 if-else
分支:
// ... existing code ...@Overridefinal <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator) {if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);}else {Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);return wrapAndCopyInto(nb, spliterator).build();}}
// ... existing code ...
顺序执行路径
这是相对简单的路径,我们先分析它。
Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();
第 1 步: exactOutputSizeIfKnown(spliterator)
- 计算确切大小
此方法尝试在流开始处理前,预先计算出最终会输出多少个元素。这是一个重要的优化,如果大小已知,Node.Builder
就可以一次性分配足够大的内存,避免后续动态扩容。
exactOutputSizeIfKnown
:int flags = getStreamAndOpFlags();
获取当前流水线的所有标志。long size = StreamOpFlag.SIZED.isKnown(flags) ? spliterator.getExactSizeIfKnown() : -1;
- 检查流水线是否具有
SIZED
特性。像从ArrayList
创建的流就有这个特性。 - 如果有,就调用
spliterator.getExactSizeIfKnown()
获取源头大小。 - 如果没有,大小未知,返回
-1
。
- 检查流水线是否具有
if (size != -1 && StreamOpFlag.SIZE_ADJUSTING.isKnown(flags) && !isParallel())
- 这是一个针对顺序流的进一步计算。如果源大小已知,并且流水线中有会调整大小的操作(
SIZE_ADJUSTING
,例如limit()
),则需要逐个阶段计算最终大小。 for (AbstractPipeline<?, ?, ?> stage = sourceStage.nextStage; ...)
: 循环遍历从源之后到当前的所有中间操作。size = stage.exactOutputSize(size);
: 每个阶段都会根据自己的逻辑调整大小。例如,limit(10)
的exactOutputSize
实现就会返回Math.min(previousSize, 10)
。
- 这是一个针对顺序流的进一步计算。如果源大小已知,并且流水线中有会调整大小的操作(
- 最终返回计算出的
size
或-1
。
第 2 步: makeNodeBuilder(...)
- 创建节点构建器
这是一个 abstract
方法,其具体实现由子类(如 ReferencePipeline
, IntPipeline
)提供。
- 递归分析
makeNodeBuilder
:- 它接收上一步计算出的
exactSizeIfKnown
和数组生成器generator
。 - 在
ReferencePipeline
中,它会调用Nodes.builder(exactSizeIfKnown, generator)
。 Nodes.builder
会根据exactSizeIfKnown
的值,决定是创建一个固定大小的Builder
还是一个可动态扩容的Builder
。- 这个
Node.Builder
同时实现了Sink
接口,所以它可以作为数据流的目的地。它的accept
方法就是将接收到的元素添加到内部的存储中。
- 它接收上一步计算出的
第 3 步: wrapAndCopyInto(nb, spliterator).build()
- 封装、执行、构建
这是执行的核心。
-
递归分析
wrapAndCopyInto
:wrapSink(nb)
: 调用wrapSink
,将Node.Builder
(nb
) 从后向前用流水线中的每个中间操作逻辑进行包装。例如,map(f).filter(p)
,会先用filter
的逻辑包装nb
,再用map
的逻辑包装filter
后的Sink
。返回一个包含了所有操作的wrappedSink
。copyInto(wrappedSink, spliterator)
: 调用copyInto
,它会启动数据流,将spliterator
的数据推送到wrappedSink
。- 返回
nb
:wrapAndCopyInto
执行完毕后,返回最初的、未被包装的Node.Builder
实例nb
。此时,nb
内部已经包含了所有处理过的元素。
-
.build()
: 在nb
上调用build()
方法,它会完成构建过程(例如,将内部的动态数组裁剪到合适的大小),并返回一个最终的、不可变的Node
对象。
并行执行路径
if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);
}
当 isParallel()
返回 true
时,执行会进入此分支。
evaluateToNode(this, spliterator, flatten, generator)
- 并行求值
此方法是并行执行的核心,但它本身也是一个 abstract
方法。
- 递归分析
evaluateToNode
:- 为什么是抽象的? 因为针对不同数据类型(引用类型 vs. 原始类型
int
,long
,double
)的并行处理和数据存储方式有很大差异。原始类型流可以利用连续内存的数组进行高效计算,而引用类型流则不能。将此方法设为抽象,可以强制每个子类(ReferencePipeline
,IntPipeline
等)提供最高效的并行实现。 - 内部发生了什么? 以
ReferencePipeline
为例,它的evaluateToNode
实现大致如下:- 调用 Nodes.collect
- 大小已知路径 (Sized Path): 如果流的大小可以精确预知 (
size >= 0
) 并且源Spliterator
支持SUBSIZED
特性,系统会采取最高效的策略。它会预先分配一个最终大小的数组,然后启动一个SizedCollectorTask
(一个 Fork/Join 任务),让所有并行线程直接将结果写入到数组的指定位置。这避免了任何中间数据结构和后续的数据拷贝。 - 大小未知路径 (Unsized Path): 如果大小未知,系统会启动一个
CollectorTask
。这个任务会递归地分解,每个子任务都会生成一个Node
,然后通过conc
方法合并成一个ConcNode
树。最终返回的就是这个树的根节点。flattenTree
参数决定了是否需要将这棵树“压平”后再返回。
- 为什么是抽象的? 因为针对不同数据类型(引用类型 vs. 原始类型
总结
evaluate
方法是 Stream API 内部一个设计极为精巧的执行引擎入口。它通过一个简单的 isParallel()
判断,将执行流导向两个完全不同的世界:
- 顺序世界:清晰、线性、易于理解。通过
Sink
链的包装和forEachRemaining
的驱动,一步步完成数据处理。 - 并行世界:复杂、递归、高性能。通过抽象化和 Fork/Join 框架,将大规模数据处理任务分解、并行执行并合并结果。
它完美地体现了策略模式,根据上下文(并行或顺序)选择合适的算法来完成“将流元素收集到 Node
中”这一任务。对它的分析,可以帮助我们深入理解 Stream API 在不同模式下的核心工作原理。
Stream构建过程 wrapSink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink)
final
: 这个方法是最终的,不允许任何子类(如ReferencePipeline
,IntPipeline
等)重写。这表明它是 Stream 框架的核心、稳定且不可更改的机制。<P_IN>
: 这是一个泛型参数。它代表了经过所有包装后,最终返回的那个Sink
所能接受的元素类型。这个类型实际上是当前流水线阶段的上一个阶段的输出类型。Sink<P_IN>
: 这是返回类型。它返回一个Sink
,这个Sink
已经将当前AbstractPipeline
节点以及它之前的所有节点的操作逻辑都“包裹”起来了。Sink<E_OUT> sink
: 这是输入参数。它接收一个Sink
。E_OUT
是当前AbstractPipeline
节点的输出类型。这个传入的sink
通常是流水线中下一个阶段的Sink
,或者是终端操作(Terminal Operation)提供的最终Sink
。
一句话概括:wrapSink
方法接收一个用于处理本阶段输出的 Sink
,然后返回一个能够处理本阶段输入的、经过层层包装的全新 Sink
。
wrapSink
的核心思想是操作融合(Operation Fusion)和责任链模式的反向构建。
我们知道 Stream 的中间操作是懒加载的。当写下 stream.filter(...).map(...).sorted(...)
时,数据并没有开始流动。只是在构建一个处理步骤的“配方”(AbstractPipeline
链表)。
当一个终端操作(如 forEach
, collect
)被调用时,Stream 需要开始处理数据。但它不是低效地让数据流过 filter
,把结果存起来,再流过 map
... 而是希望一次性完成所有操作。
wrapSink
就是实现这个“一次性完成”的关键。它的工作流程如下:
- 从流水线的末端(最靠近终端操作的那个中间操作)开始。
- 接收终端操作提供的“最终 Sink”(比如,
Collectors.toList()
会提供一个将元素添加到 List 的 Sink)。 - 将这个 Sink 用当前阶段的操作逻辑(比如
map
的转换逻辑)包装起来,生成一个新的 Sink。 - 拿着这个新生成的 Sink,移动到前一个流水线阶段。
- 用前一个阶段的操作逻辑(比如
filter
的过滤逻辑)再次包装。 - ...如此循环,直到回到数据源头。
最终会得到一个“俄罗斯套娃”式的 Sink
,它内部包含了所有中间操作的逻辑。当数据从源头(Spliterator
)取出后,只需调用这个最终 Sink
的 accept
方法,数据就会像穿过一根融合后的管道一样,瞬间完成所有处理。
现在我们来看具体的实现,它非常精炼:
// ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink;}
// ... existing code ...
-
Objects.requireNonNull(sink);
- 标准的非空检查,确保传入的下游
Sink
是有效的。
- 标准的非空检查,确保传入的下游
-
for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage)
- 初始化:
AbstractPipeline p = AbstractPipeline.this
。循环从当前AbstractPipeline
实例开始。在实际调用中,this
通常是流水线的最后一个中间操作。 - 循环条件:
p.depth > 0
。depth
属性记录了当前阶段距离源头的“深度”。源头(Head)的depth
是 0。所以这个循环会一直持续,直到回溯到源头为止。 - 迭代:
p = p.previousStage
。在每次循环后,p
指向链表中的上一个节点,实现了从后往前的遍历。
- 初始化:
-
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
- 这是循环的核心。它调用了当前阶段
p
的opWrapSink
方法。 opWrapSink
是一个抽象方法,必须由具体的中间操作(如map
,filter
的匿名内部类)来实现。- 它将当前的
sink
(也就是下游操作的Sink
)传进去。 opWrapSink
的实现会返回一个新的、包装后的Sink
。- 这个新的
Sink
被重新赋值给sink
变量,用于下一次循环(即交给上一个阶段继续包装)。
- 这是循环的核心。它调用了当前阶段
-
return (Sink<P_IN>) sink;
- 当循环结束时,
sink
变量所持有的已经是被流水线上所有中间操作层层包裹后的最终Sink
。 - 它被强制转换为
Sink<P_IN>
并返回。此时P_IN
对应的是第一个中间操作的输入类型。
- 当循环结束时,
关键交互:opWrapSink
方法
wrapSink
只是一个驱动循环,真正的魔法发生在每个操作对 opWrapSink
的具体实现中。我们来看几个例子(以 LongPipeline
为例,原理相通):
示例 1: peek()
操作 peek
的作用是在元素流过时执行一个动作,但不对元素做任何修改。
// ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {@Overridepublic void accept(long t) {action.accept(t); // 执行 peek 的动作downstream.accept(t); // 将原始元素传递给下游}};}// Sink.javaabstract static class ChainedLong<E_OUT> implements Sink.OfLong {protected final Sink<? super E_OUT> downstream;public ChainedLong(Sink<? super E_OUT> downstream) {this.downstream = Objects.requireNonNull(downstream);}@Overridepublic void begin(long size) {downstream.begin(size);}@Overridepublic void end() {downstream.end();}@Overridepublic boolean cancellationRequested() {return downstream.cancellationRequested();}}
// ... existing code ...
opWrapSink
返回了一个新的 Sink
。当这个新 Sink
的 accept
方法被调用时,它先执行 peek
的 action
,然后原封不动地调用下游 sink
(下游sink通过构造函数 赋值给 downstream
)的 accept
方法。
示例 2: filter()
操作
// ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {
// ... existing code ...@Overridepublic void accept(long t) {if (predicate.test(t)) // 执行过滤downstream.accept(t); // 满足条件才传递给下游}};}
// ... existing code ...
filter
的 opWrapSink
实现中,只有当元素 t
满足 predicate
条件时,才会调用下游 sink
的 accept
方法。
示例 3: map()
操作
// ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {@Overridepublic void accept(long t) {downstream.accept(mapper.applyAsLong(t)); // 将转换后的元素传递给下游}};}
// ... existing code ...
map
的 opWrapSink
实现中,它先用 mapper
对元素 t
进行转换,然后将转换后的结果传递给下游 sink
。
调用时机与完整流程
wrapSink
通常在终端操作的 evaluate
方法中被间接调用,例如通过 wrapAndCopyInto
。
一个完整的流程是这样的:
- 构建:
Stream.of(1,2,3).filter(i -> i > 1).map(i -> i * 2)
。这会创建一个AbstractPipeline
的链表。 - 触发: 调用终端操作
.forEach(System.out::println)
。 - 准备执行:
forEach
操作的evaluate
方法被调用。它会创建一个最终的Sink
,这个Sink
的accept
方法就是System.out::println
。 - 包装 Sink:
evaluate
方法内部会调用wrapSink(finalSink)
。wrapSink
从map
阶段开始。它调用map
的opWrapSink
,传入finalSink
。返回一个mapSink
,其accept
方法会执行i -> finalSink.accept(i * 2)
。wrapSink
移动到filter
阶段。它调用filter
的opWrapSink
,传入上一步得到的mapSink
。返回一个filterSink
,其accept
方法会执行i -> { if (i > 1) mapSink.accept(i); }
。
- 执行: 循环结束,
wrapSink
返回了最终的filterSink
。 - 数据流动: 系统开始从源
Spliterator
中获取数据(1, 2, 3),并逐个调用filterSink.accept()
。filterSink.accept(1)
->1 > 1
为 false,什么也不做。filterSink.accept(2)
->2 > 1
为 true,调用mapSink.accept(2)
-> 调用finalSink.accept(2 * 2)
-> 打印 4。filterSink.accept(3)
->3 > 1
为 true,调用mapSink.accept(3)
-> 调用finalSink.accept(3 * 2)
-> 打印 6。
总结
wrapSink
是 Stream API 实现高性能的核心机制之一。它不是一个简单的工具方法,而是将声明式的操作链表转换为高效的、融合的执行计划的“编译器”。
通过从后向前的遍历和责任链模式的运用,它将多个独立的中间操作逻辑“编织”到一个单一的 Sink
对象中,使得数据可以在一次遍历中完成所有处理,极大地减少了中间状态的存储和方法调用的开销。理解了 wrapSink
,就理解了 Java Stream 运行时的精髓。
copyIntoWithCancel
这个方法是处理 短路操作(Short-Circuiting Operations) 的核心,比如 findFirst
, anyMatch
, limit
等。
在 Stream 流水线中,大部分操作(如 map
, filter
)需要处理完所有元素。但有些操作希望能提前终止,一旦满足某个条件就不再处理后续元素,这就是“短路”。
findFirst()
: 找到第一个元素后就应该立即停止。limit(n)
: 处理完 n 个元素后就应该立即停止。anyMatch(p)
: 找到任何一个匹配的元素后就应该立即停止。
copyIntoWithCancel
的核心作用就是为支持短路操作提供一个高效的数据遍历和推送机制。
它的设计思想是:在数据从源头(Spliterator
)流向 Sink
的过程中,每处理完一个元素,就通过 Sink.cancellationRequested()
方法检查下游是否发出了“取消”信号。如果收到了取消信号,就立即停止遍历,不再从 Spliterator
中拉取更多的数据。
这与非短路操作的 copyInto
方法形成对比,后者通常会使用 spliterator.forEachRemaining(wrappedSink)
一次性将所有元素推送给 Sink
,效率更高,但无法中途停止。
final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator)
final
: 此方法不可被子类重写,是框架的核心稳定部分。<P_IN>
: 泛型参数,代表wrappedSink
能接受的元素类型,也就是数据源Spliterator
提供的元素类型。boolean
: 返回值。返回true
表示遍历是因为收到了取消请求而提前终止的;返回false
表示遍历是正常完成的(所有元素都被处理了)。Sink<P_IN> wrappedSink
: 经过wrapSink
方法包装后的最终Sink
。它内部已经融合了流水线上所有中间操作的逻辑。Spliterator<P_IN> spliterator
: 数据源的Spliterator
。copyIntoWithCancel
将从它这里拉取数据。
代码分析
// ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {@SuppressWarnings("rawtypes")AbstractPipeline p = AbstractPipeline.this;while (p.depth > 0) {p = p.previousStage;}wrappedSink.begin(spliterator.getExactSizeIfKnown());boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);wrappedSink.end();return cancelled;}
// ... existing code ...
-
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) { p = p.previousStage; }
- 这段代码的目的是找到流水线的源头阶段(Source Stage)。
AbstractPipeline
实例构成一个链表,depth
属性表示当前阶段距离源头的距离。源头的depth
为 0。- 通过
p = p.previousStage
不断回溯,循环结束后,变量p
就指向了链表的头部,即代表数据源的那个AbstractPipeline
实例。 - 为什么需要找到源头? 因为遍历的逻辑(特别是针对不同数据类型,如
int
,long
,Object
的遍历)是与源头的StreamShape
相关的。forEachWithCancel
是一个抽象方法,其具体实现位于ReferencePipeline
,IntPipeline
等具体的Pipeline
子类中,而调用哪个实现版本取决于源头的类型。
-
wrappedSink.begin(spliterator.getExactSizeIfKnown());
- 这是
Sink
协议的一部分。在开始向Sink
推送数据之前,必须调用begin()
方法。 - 它会通知
Sink
准备接收数据,并可选地告知预计的元素数量。对于短路操作,这个数量可能不准,但协议要求必须调用。
- 这是
-
boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
- 这是整个方法的核心。
- 它调用了源头阶段
p
的forEachWithCancel
方法。 forEachWithCancel
是一个抽象方法,由具体的Pipeline
实现(如ReferencePipeline
)提供。它的职责是:- 从
spliterator
中逐个拉取元素。 - 将每个元素传递给
wrappedSink.accept()
。 - 在处理完每个元素后,检查
wrappedSink.cancellationRequested()
。 - 如果返回
true
,则立即停止遍历,并向上返回true
。
- 从
cancelled
变量记录了遍历是否被提前取消。
-
wrappedSink.end();
Sink
协议的另一部分。在所有数据推送完毕(无论是正常结束还是被取消)后,必须调用end()
方法。- 这会通知
Sink
数据流结束,可以进行一些最终处理,比如limit
操作可以丢弃多余的元素,findFirst
可以标记已找到结果。
-
return cancelled;
- 将
forEachWithCancel
的结果返回,告知上游调用者(通常是终端操作的evaluate
方法)数据处理是否被短路了。
- 将
forEachWithCancel
方法
copyIntoWithCancel
将核心的遍历逻辑委托给了 forEachWithCancel
。我们来看一下 ReferencePipeline
中 forEachWithCancel
的典型实现:
// In ReferencePipeline.java
@Override
final boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink) {boolean cancelled = false;// tryAdvance returns false when there are no more elements// sink.cancellationRequested() returns true when short-circuiting is requestedwhile (!cancelled && spliterator.tryAdvance(e -> sink.accept(e))) {cancelled = sink.cancellationRequested();}return cancelled;
}
这段代码非常清晰地展示了短路机制:
while
循环的条件是!cancelled
并且spliterator.tryAdvance(...)
。spliterator.tryAdvance(e -> sink.accept(e))
:尝试从Spliterator
获取一个元素,如果成功,就立即通过 lambda 表达式将其传递给sink.accept()
。如果Spliterator
中没有更多元素了,tryAdvance
返回false
,循环正常结束。cancelled = sink.cancellationRequested();
:在accept
调用之后,立刻检查Sink
是否请求取消。如果limit(5)
的Sink
已经收到了 5 个元素,它的cancellationRequested()
就会返回true
。cancelled
变量被设为true
后,下一次while
循环的条件!cancelled
就不满足了,循环终止,即使Spliterator
中还有很多元素。
调用时机与完整流程
copyIntoWithCancel
在 copyInto
方法内部被调用,而 copyInto
又是在终端操作的 evaluate
方法中被触发的。
完整流程示例:Stream.of(1,2,3,4,5).limit(2).forEach(...)
- 构建: 创建
AbstractPipeline
链表。 - 触发: 调用终端操作
forEach
。 - 包装 Sink:
forEach
内部调用wrapSink
,将limit(2)
的逻辑和forEach
的逻辑包装成一个wrappedSink
。limit(2)
的Sink
会有一个计数器。它的cancellationRequested()
在计数器达到 2 之前返回false
,达到 2 之后返回true
。
- 调用 copyInto:
forEach
的evaluate
方法调用wrapAndCopyInto
,后者再调用copyInto
。 - 进入短路分支:
copyInto
内部检查到StreamOpFlag.SHORT_CIRCUIT
标志位(由limit
操作设置),于是调用copyIntoWithCancel(wrappedSink, spliterator)
。 - 执行
copyIntoWithCancel
:- 找到源头阶段。
- 调用
wrappedSink.begin()
。 - 调用源头阶段的
forEachWithCancel
。 forEachWithCancel
开始循环:tryAdvance
获取元素1
,调用wrappedSink.accept(1)
。limit
的Sink
计数器变为 1。cancellationRequested()
返回false
。tryAdvance
获取元素2
,调用wrappedSink.accept(2)
。limit
的Sink
计数器变为 2。cancellationRequested()
现在返回true
。cancelled
变量被设为true
。while
循环终止。
forEachWithCancel
返回true
。- 调用
wrappedSink.end()
。 copyIntoWithCancel
返回true
。
总结
copyIntoWithCancel
是 Stream API 实现**短路(short-circuiting)**操作的关键枢纽。它通过一个清晰的协议——在推送每个元素后检查 Sink
的取消状态——实现了对数据处理流程的提前终止。
它本身并不执行遍历,而是负责:
- 定位到正确的
Pipeline
实现(源头阶段)。 - 遵循
Sink
的begin/end
协议。 - 委托带有取消检查的遍历任务给特定于数据类型的
forEachWithCancel
方法。
这种设计将框架流程控制(copyIntoWithCancel
)与具体遍历逻辑(forEachWithCancel
)解耦,是 Stream 内部优雅设计的一个典范。
为什么通过循环找源头?sourceStage
不是保存了吗?
原因在于代码的演进、分段执行(Pipeline Slicing)的复杂性以及对 depth
和 combinedFlags
字段的重新计算。
让我们来看 sourceSpliterator
这个关键方法,它揭示了更深层次的原因:
// ... existing code ...@SuppressWarnings("unchecked")protected Spliterator<?> sourceSpliterator(int terminalFlags) {// ... (获取初始 spliterator) ...if (isParallel() && hasAnyStateful()) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {depth = 0;// ... (省略一些标志位操作) ...spliterator = p.opEvaluateParallelLazy(u, spliterator);// ... (省略一些标志位操作) ...}p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}// ... (应用 terminalFlags) ...return spliterator;}
// ... existing code ...
这个方法揭示了一个非常重要的机制:在并行流遇到有状态操作(Stateful Operation, 如 sorted
, distinct
)时,流水线会被“切片”执行。
-
流水线切片(Pipeline Slicing): 当一个并行流包含有状态操作时,执行过程会被分为多个段(Segment)。每个有状态操作都会成为一个段的终点。前一个段会先被完整地执行(
opEvaluateParallelLazy
),其结果(一个新的Spliterator
)会成为下一个段的输入。 -
depth
和combinedFlags
的动态修改: 请注意sourceSpliterator
方法中的for
循环。在这个循环里,它会遍历从sourceStage
到当前阶段this
之间的所有 stage。当遇到一个有状态操作时,它会:- 立即执行到这个有状态操作为止的所有上游操作 (
spliterator = p.opEvaluateParallelLazy(u, spliterator);
)。 - 重置
depth
(depth = 0;
)。这意味着这个有状态操作成为了一个新的“源头”。 - 重新计算后续阶段的
depth
和combinedFlags
(p.depth = depth++; p.combinedFlags = ...
)。
- 立即执行到这个有状态操作为止的所有上游操作 (
在调用终端操作时,sourceSpliterator
方法可能会动态地修改流水线中各个阶段的内部状态,特别是 depth
字段。一个原本 depth > 0
的阶段,在经过 sourceSpliterator
的处理后,其 depth
可能变为 0
,因为它成了一个新分段的“源头”。
copyIntoWithCancel
中的那个 循环的真正目的,是为了找到当前执行上下文中的“有效源头”。
- 对于简单的顺序流或无状态并行流,
sourceStage
确实就是最终的源头,它的depth
始终是0
。此时,循环和直接使用sourceStage
效果一致。 - 但对于被切片后的有状态并行流,
this
所属的那个分段的“源头”可能不再是最初的sourceStage
,而是一个中间的有状态操作阶段。sourceSpliterator
方法已经把那个阶段的depth
修改为了0
。
因此,while (p.depth > 0)
这个循环是一个健壮的、不依赖于 sourceStage
字段的、根据当前(可能已被修改过的)depth
状态来查找当前分段源头的机制。它从 this
开始,沿着 previousStage
链回溯,直到找到第一个 depth
为 0
的阶段——这才是它当前需要依赖的那个“源头”,并调用它的 forEachWithCancel
方法。
copyInto
它是 Stream 流水线执行的核心驱动方法之一,负责将数据从源头(Spliterator
)推送至最终的 Sink
。
copyInto
的核心作用是启动并管理整个数据流的处理过程。在 Stream 的懒加载模型中,所有的中间操作都只是构建了一个操作链(AbstractPipeline
),并没有实际处理数据。当终端操作被调用时,就需要一个机制来“启动引擎”,让数据真正地流动起来。copyInto
就是这个引擎的启动器。
它的设计思想是区分对待两种不同类型的流水线:
- 非短路流水线(Non-Short-Circuit): 对于那些需要处理所有元素的操作(如
map
,filter
,collect
),copyInto
会采用最高效的方式,即调用spliterator.forEachRemaining(wrappedSink)
,一次性地将所有数据从Spliterator
推送到Sink
。 - 短路流水线(Short-Circuit): 对于那些可能提前结束的操作(如
findFirst
,anyMatch
,limit
),copyInto
会将任务委托给一个专门的方法copyIntoWithCancel
。这个方法会逐个处理元素,并在每一步检查是否需要提前终止。
通过这种方式,copyInto
充当了一个分发器(Dispatcher),根据流水线的特性(是否包含短路操作)选择最优的执行策略。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator)
final
: 此方法不可被子类重写,是框架的核心稳定部分。<P_IN>
: 泛型参数,代表wrappedSink
能接受的元素类型,也就是数据源Spliterator
提供的元素类型。void
: 这个方法没有返回值。它的职责是执行一个动作(将数据从 spliterator 拷贝到 sink),而不是计算一个结果。最终的结果是由Sink
自身来构建和持有的。Sink<P_IN> wrappedSink
: 经过wrapSink
方法包装后的最终Sink
。它内部已经融合了流水线上所有中间操作的逻辑。Spliterator<P_IN> spliterator
: 数据源的Spliterator
。copyInto
将从它这里拉取数据。
代码解析
// ... existing code ...@Overridefinal <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else {copyIntoWithCancel(wrappedSink, spliterator);}}
// ... existing code ...
-
Objects.requireNonNull(wrappedSink);
- 标准的非空检查,确保下游的
Sink
是有效的。
- 标准的非空检查,确保下游的
-
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags()))
- 这是核心的判断逻辑。它检查整个流水线的
combinedFlags
中是否包含SHORT_CIRCUIT
标志。 getStreamAndOpFlags()
返回当前流水线所有操作标志的组合。StreamOpFlag.SHORT_CIRCUIT
是一个标志位,像limit()
,findFirst()
,anyMatch()
等操作会把它加入到流水线的标志中。- 如果不包含这个标志,说明这是一个需要处理所有元素的普通流水线,进入
if
分支。
- 这是核心的判断逻辑。它检查整个流水线的
-
非短路分支(
if
块)wrappedSink.begin(spliterator.getExactSizeIfKnown());
- 调用
Sink
的begin
方法,通知它数据即将开始流动。如果源Spliterator
的大小已知,就传递给Sink
,这有助于Sink
(比如Collectors.toList()
)预分配内存,提高效率。
- 调用
spliterator.forEachRemaining(wrappedSink);
- 这是最高效的数据推送方式。
forEachRemaining
会遍历Spliterator
中所有剩余的元素,并将每个元素传递给wrappedSink
的accept
方法。这个调用是阻塞的,直到所有元素处理完毕。
- 这是最高效的数据推送方式。
wrappedSink.end();
- 所有元素都处理完后,调用
Sink
的end
方法,通知它数据流结束,可以进行最后的处理(比如Collector
的finisher
操作)。
- 所有元素都处理完后,调用
-
短路分支(
else
块)else { copyIntoWithCancel(wrappedSink, spliterator); }
- 如果流水线中包含
SHORT_CIRCUIT
标志,copyInto
不会自己处理,而是将所有参数(wrappedSink
和spliterator
)直接委托给copyIntoWithCancel
方法。 copyIntoWithCancel
内部实现了逐个元素处理并检查取消信号的逻辑,我们之前已经详细分析过它。
调用时机与完整流程
copyInto
通常由 wrapAndCopyInto
方法调用,而 wrapAndCopyInto
是终端操作(如 forEach
, reduce
, collect
)执行其 evaluateSequential
逻辑时的最终步骤。
完整流程示例:Stream.of(1, 2, 3).map(i -> i * 2).collect(Collectors.toList())
- 构建: 创建
AbstractPipeline
链表 (source
->map
)。 - 触发: 调用终端操作
collect
。 - 执行:
collect
的evaluateSequential
方法被调用。 - 准备 Sink:
collect
方法会创建一个Sink
,这个Sink
的逻辑是把元素添加到一个ArrayList
中。 - 调用
wrapAndCopyInto
:evaluateSequential
内部调用wrapAndCopyInto(listSink, spliterator)
。 - 包装 Sink:
wrapAndCopyInto
首先调用wrapSink(listSink)
。wrapSink
会将listSink
用map
操作的逻辑包装起来,返回一个mapSink
。mapSink.accept(i)
的效果是listSink.accept(i * 2)
。 - 调用
copyInto
:wrapAndCopyInto
接着调用copyInto(mapSink, spliterator)
。 - 进入非短路分支:
copyInto
检查流水线标志,发现没有SHORT_CIRCUIT
。- 调用
mapSink.begin(...)
。 - 调用
spliterator.forEachRemaining(mapSink)
。forEachRemaining
从spliterator
取出1
,调用mapSink.accept(1)
->listSink.accept(2)
。forEachRemaining
从spliterator
取出2
,调用mapSink.accept(2)
->listSink.accept(4)
。forEachRemaining
从spliterator
取出3
,调用mapSink.accept(3)
->listSink.accept(6)
。
- 调用
mapSink.end()
。
- 调用
- 返回结果:
wrapAndCopyInto
返回最初的listSink
,collect
操作从这个Sink
中获取最终的List
结果[2, 4, 6]
。
总结
copyInto
是连接 Stream 声明式构建和命令式执行的关键桥梁。它位于执行路径的核心位置,扮演着一个智能调度员的角色。
- 它通过检查
SHORT_CIRCUIT
标志,为流水线选择了最优的执行路径。 - 对于非短路操作,它使用
forEachRemaining
进行高效的批量处理。 - 对于短路操作,它委托给专门的
copyIntoWithCancel
方法进行精细的、可中断的控制。
理解 copyInto
的分发逻辑,是理解 Stream 如何根据不同操作的特性来优化其执行过程的重要一步。
wrapAndCopyInto
wrapAndCopyInto
的核心作用是封装(Wrap)并执行(CopyInto)。它是许多终端操作(Terminal Operation)在顺序执行(Sequential Execution)时的入口点。
当一个终端操作(如 reduce
, collect
, forEach
)被调用时,它提供了一个最终的“目的地”——Sink
,这个 Sink
知道如何处理最终的元素并产生结果。但是,数据从源头流到这个最终 Sink
之前,需要经过所有中间操作(如 map
, filter
)的处理。
wrapAndCopyInto
的设计思想就是:
- 封装 (Wrap): 接收终端操作提供的原始
Sink
,然后从流水线的末端开始,反向用每一个中间操作的逻辑去“包装”这个Sink
,最终形成一个包含了所有操作逻辑的、完整的wrappedSink
。 - 执行 (CopyInto): 将这个封装好的
wrappedSink
和数据源Spliterator
交给copyInto
方法,由copyInto
启动数据流,将数据从源头一个个推送到wrappedSink
中进行处理。
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)
<P_IN>
: 方法级的泛型参数。代表整个流水线输入端的元素类型,即源头Spliterator
提供的元素类型。<S extends Sink<E_OUT>>
: 方法级的泛型参数。S
代表传入的sink
参数的具体类型。E_OUT
是AbstractPipeline
类的泛型参数,代表当前流水线阶段的输出元素类型。- 这个约束保证了传入的
sink
必须能消费当前阶段的输出。
S sink
: 参数,由终端操作提供,是数据流的最终目的地。例如,对于collect(Collectors.toList())
,它就是一个能将元素添加到列表中的Sink
。Spliterator<P_IN> spliterator
: 参数,数据源的Spliterator
。- 返回
S
: 方法返回传入的那个原始sink
对象。这一点非常重要,因为它允许调用方在wrapAndCopyInto
执行完毕后,能从这个sink
对象中提取最终结果(例如,通过调用sink.build()
或sink.get()
)。
代码解析
// ... existing code ...@Overridefinal <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);return sink;}@Override
// ... existing code ...
这个方法的实现非常简洁,但每一步都至关重要,它将两个核心操作串联了起来:
-
wrapSink(Objects.requireNonNull(sink))
: 这是第一步,即“封装”阶段。Objects.requireNonNull(sink)
:确保传入的终端Sink
不为空。wrapSink(...)
:这是个关键的辅助方法。它会从当前Pipeline
阶段开始,沿着previousStage
链反向回溯,直到源头。在回溯的每一步,它都会调用当前阶段的opWrapSink
方法,将Sink
用当前操作的逻辑包装一层。- 比喻:想象一个俄罗斯套娃。
wrapSink
从最里面的小娃娃(终端Sink
)开始,一层层地往外套上更大的娃娃(中间操作的Sink
),最终返回最外层的那个大娃娃。这个大娃娃就是wrappedSink
。
-
copyInto(wrappedSink, spliterator)
: 这是第二步,即“执行”阶段。- 它接收上一步构建好的、包含了所有流水线逻辑的
wrappedSink
。 - 然后调用
copyInto
方法,将数据源spliterator
和wrappedSink
连接起来,启动数据流。 copyInto
内部会判断流水线是否包含短路操作,并选择合适的遍历策略(forEachRemaining
或copyIntoWithCancel
)。
- 它接收上一步构建好的、包含了所有流水线逻辑的
-
return sink;
:- 在
copyInto
执行完毕后(意味着所有数据都处理完了),方法返回最初传入的、未被包装的那个sink
。 - 调用者(终端操作)持有这个原始
sink
的引用,现在可以安全地从中获取计算结果了。
- 在
调用时机与完整流程
wrapAndCopyInto
主要在顺序执行的终端操作中被调用。一个典型的例子是 collect
操作。
完整流程示例:List.of("a", "b", "c").stream().map(String::toUpperCase).collect(Collectors.toList())
- 触发: 调用终端操作
collect
。 - 执行
evaluate
:collect
内部会调用evaluate(terminalOp)
。 - 选择路径:
evaluate
方法发现是顺序执行 (isParallel()
为 false),于是调用terminalOp.evaluateSequential(this, sourceSpliterator(...))
。 evaluateSequential
:CollectOp
(collect
的实现)的evaluateSequential
方法会创建一个用于收集元素的Sink
(我们称之为listSink
),然后调用helper.wrapAndCopyInto(listSink, spliterator)
。这里的helper
就是map
操作所在的Pipeline
实例。- 进入
wrapAndCopyInto
:wrapSink(listSink)
被调用:wrapSink
发现map
操作是它的上一个阶段。- 它调用
map
操作的opWrapSink
方法,用map
的逻辑(s -> s.toUpperCase()
)包装listSink
,生成一个mapSink
。 wrapSink
返回这个mapSink
。
copyInto(mapSink, spliterator)
被调用:copyInto
启动数据流。- 它从
spliterator
中取出 "a",交给mapSink.accept("a")
。mapSink
内部执行 "A",然后调用listSink.accept("A")
。 - 这个过程对 "b" 和 "c" 重复。
- 返回
listSink
:wrapAndCopyInto
执行完毕,将原始的listSink
返回给CollectOp
。
- 获取结果:
CollectOp
从listSink
中获取最终的列表["A", "B", "C"]
并返回。
总结
wrapAndCopyInto
是 Stream API 内部一个设计精巧的执行协调器。它本身不处理任何业务逻辑,而是通过精确地调用 wrapSink
和 copyInto
,完美地将操作链的构建和数据的实际流动解耦并衔接起来。
- 职责单一: 它的职责就是“封装然后执行”。
- 流程清晰:
wrapSink
负责准备处理器,copyInto
负责输送数据。 - 结果导向: 通过返回原始
Sink
,使得调用方能方便地获取最终结果。
理解 wrapAndCopyInto
的工作方式,是深入理解 Stream 顺序执行模型的关键。
evaluateToArrayNode
evaluateToArrayNode
的核心作用是作为 toArray
操作的专用执行引擎。它封装了将流转换为 Node
的所有逻辑,特别是针对并行流中包含有状态操作(stateful operation)的场景,它实现了一个重要的优化。
其设计思想是:
- 提供统一入口:为
toArray
提供一个比通用evaluate(TerminalOp)
更直接、更专门的入口点。 - 并行优化:识别出一种可以优化的特定并行场景(流水线末端是单个有状态操作),并为其提供一条“快速通道”,避免不必要的数据收集和复制步骤。
- 代码复用:在通用场景下,它会委托给更底层的
evaluate(spliterator, flatten, generator)
方法,复用其核心的“物化”逻辑。
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator)
final
: 此方法不可被子类重写,是框架的核心稳定部分。Node<E_OUT>
: 返回值。E_OUT
是当前流水线阶段(也就是最后一个阶段)的输出元素类型。返回一个包含了所有流元素的Node
对象。IntFunction<E_OUT[]> generator
: 一个函数,用于创建指定类型的数组,例如String[]::new
。这是toArray
操作所必需的,因为它需要知道如何创建最终的数组容器。
代码逻辑深度解析
// ... existing code ...@SuppressWarnings("unchecked")final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;// If the last intermediate operation is stateful then// evaluate directly to avoid an extra collection stepif (isParallel() && previousStage != null && opIsStateful()) {// Set the depth of this, last, pipeline stage to zero to slice the// pipeline such that this operation will not be included in the// upstream slice and upstream operations will not be included// in this slicedepth = 0;return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);}else {return evaluate(sourceSpliterator(0), true, generator);}}
// ... existing code ...
if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
这是所有终端操作的标准起始步骤。它确保一个流实例只能被消费一次,然后将 linkedOrConsumed
标志位置为 true
,防止后续操作。
特殊并行路径 (Optimized Parallel Path - if
块)
这是该方法中最精妙的部分,一个针对特定场景的性能优化。
if (isParallel() && previousStage != null && opIsStateful()) {// ...
}
触发条件:
isParallel()
: 流必须是并行的。previousStage != null
: 当前流水线至少有两个阶段(源+一个操作)。opIsStateful()
: 当前(也就是最后一个)中间操作必须是有状态的。例如sorted()
,distinct()
。
场景示例: stream.filter(...).sorted().toArray()
。当执行 toArray()
时,sorted()
就是那个“最后一个有状态的中间操作”。
为什么需要优化?
常规的并行执行模型遇到有状态操作时,会将其视为一个“屏障”。它会先并行执行屏障之前的所有操作,将结果收集到一个中间 Node
中,然后再对这个中间 Node
执行有状态操作。如果 toArray()
再走一遍这个流程,就会变成:
并行 filter -> 中间 Node1 -> 并行 sorted -> 中间 Node2 -> toArray 从 Node2 创建数组
这中间的 Node2
是多余的。我们完全可以直接让 sorted()
操作的结果直接写入最终的数组。
优化实现:
depth = 0;
- 这是一个非常巧妙的技巧。通过将当前阶段(
toArray
伪阶段)的深度设置为0,它有效地将流水线“切片”。toArray
之前的sorted()
操作现在被视为一个独立的、完整的流水线。
- 这是一个非常巧妙的技巧。通过将当前阶段(
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
opEvaluateParallel
是一个抽象方法,由有状态操作(如SortedOps
)自己实现。- 这个调用相当于对
sorted()
操作说:“请你用并行的方式执行,但不要把结果收集到你自己的中间Node
里,而是直接使用我(toArray
)提供的generator
来创建并填充最终的Node
。” - 这样,执行流就变成了:
并行 filter -> 中间 Node1 -> sorted() 直接并行排序并写入最终 Node -> toArray 从最终 Node 创建数组
- 这就避免了创建
Node2
的开销,减少了一次大规模的数据收集和可能的复制。
通用路径 (General Path - else
块)
如果不满足上述特殊并行场景的条件(例如,是顺序流,或者最后一个操作是无状态的如 map
),则会进入这个更通用的路径。
else {return evaluate(sourceSpliterator(0), true, generator);
}
实现:
sourceSpliterator(0)
: 获取流的源Spliterator
。参数0
表示终端操作本身没有附加的标志。evaluate(..., true, ...)
: 直接调用我们之前分析过的那个底层的“工兵”方法evaluate
。- 第一个参数: 数据源
Spliterator
。 - 第二个参数
flatten
: 设置为true
。这至关重要,它告诉evaluate
方法,即使在并行计算中产生了Node
树,也必须在返回前将其“拍平”为一个包含所有元素的、单一的、由数组支持的Node
。这正是toArray
所需要的结果。 - 第三个参数
generator
: 将toArray
提供的数组生成器传递下去,供evaluate
在创建最终Node
时使用。
- 第一个参数: 数据源
总结
evaluateToArrayNode
是一个精心设计的、专用于 toArray
的执行方法。它不仅仅是一个简单的包装,其核心价值在于:
- 识别并优化了一个关键的并行场景:通过“切片”流水线和调用
opEvaluateParallel
,它避免了在“有状态操作 +toArray
”组合下的重复数据收集,提升了性能。 - 无缝衔接通用逻辑:在不满足优化条件时,它能平滑地回退到通用的
evaluate
方法,复用其健壮的流物化能力,并确保通过flatten=true
参数获得toArray
所需的扁平化Node
结果。
这个方法的设计完美体现了在通用框架中嵌入专用优化的思想,是理解 Stream API 高性能实现的一个绝佳范例。
sourceStageSpliterator
sourceStageSpliterator()
的核心作用是为流管道的源头阶段(Source Stage)提供一个一次性的、安全的获取其 Spliterator
的方式。
其设计思想是:
- 所有权和封装:流的源
Spliterator
是整个管道的生命线。它被封装在源头阶段(sourceStage
)中,不能被管道中的任意阶段随意访问。 - 单一职责:此方法只做一件事——从源头阶段取出
Spliterator
。它不关心并行、有状态操作等复杂的流水线处理逻辑。 - 状态强制转换:调用此方法是一个明确的“消费”动作。一旦成功调用,流就被认为是已消费,其内部的
sourceSpliterator
或sourceSupplier
会被清空,防止重复消费。
final Spliterator<E_OUT> sourceStageSpliterator()
final
: 此方法不可被子类重写,保证其行为在整个框架中的一致性和稳定性。Spliterator<E_OUT>
: 返回值。E_OUT
在这里特指源头阶段的输出类型。它返回流最开始的那个Spliterator
。- 无参数: 该方法不需要任何外部输入,它的所有操作都基于当前
AbstractPipeline
实例的状态。
代码逻辑深度解析
// ... existing code ...final Spliterator<E_OUT> sourceStageSpliterator() {// Ensures that this method is only ever called on the sourceStageif (this != sourceStage)throw new IllegalStateException();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;if (sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSpliterator;sourceSpliterator = null;return s;}else if (sourceSupplier != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSupplier.get();sourceSupplier = null;return s;}else {throw new IllegalStateException(MSG_CONSUMED);}}
// ... existing code ...
调用者身份验证
// Ensures that this method is only ever called on the sourceStage
if (this != sourceStage)throw new IllegalStateException();
这是此方法最关键的检查。sourceStage
字段在流水线构建时,会从第一个阶段一直传递到最后一个阶段,它始终指向流水线的源头。
this
: 代表调用sourceStageSpliterator()
方法的当前AbstractPipeline
对象。sourceStage
: 指向流管道的第一个阶段(源头)。
这个检查强制规定:只有源头阶段对象自己才能调用这个方法来获取 Spliterator
。管道中的任何中间阶段(如 map
, filter
)都无权调用此方法,这保证了数据源的封装性和安全性。
流状态检查
if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
这是标准的流消费检查。
if (linkedOrConsumed)
: 检查流是否已经被操作过或关闭。由于这个方法只能在源头阶段调用,这里的linkedOrConsumed
实际上检查的是源头阶段的状态。linkedOrConsumed = true
: 如果检查通过,立即将流标记为已消费。这是一个原子性的“检查并设置”操作,确保一旦Spliterator
被取出,流就不能再被用于任何其他操作。
Spliterator
提取逻辑
Stream 的源可以由一个 Spliterator
直接提供,也可以由一个 Supplier<Spliterator>
延迟提供。这部分代码处理了这两种情况。
-
Case 1: 直接提供
Spliterator
if (sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSpliterator;sourceSpliterator = null; // 清空引用,防止重复获取return s; }
如果
sourceSpliterator
字段不为空,说明流是由一个现成的Spliterator
创建的。代码会:- 将其强制转换为正确的泛型类型。
- 将
sourceSpliterator
字段置为null
。这是关键的“消费”动作,确保这个Spliterator
实例不会被再次取出。 - 返回取出的
Spliterator
。
-
Case 2: 延迟提供
Spliterator
else if (sourceSupplier != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSupplier.get();sourceSupplier = null; // 清空引用,防止重复获取return s; }
如果
sourceSupplier
字段不为空,说明流是由一个Supplier
创建的。代码会:- 调用
sourceSupplier.get()
来生成一个新的Spliterator
实例。 - 将
sourceSupplier
字段置为null
。同样是关键的“消费”动作,确保不能通过这个Supplier
再次生成Spliterator
。 - 返回新生成的
Spliterator
。
- 调用
-
Case 3: 异常情况
else {throw new IllegalStateException(MSG_CONSUMED); }
如果
sourceSpliterator
和sourceSupplier
都为null
,这意味着流已经被消费过了(它们的引用在上次被获取时已经被清空)。此时抛出异常,明确告知调用者流已被消费。
与 sourceSpliterator(int)
的对比
AbstractPipeline
中还有一个看起来很像的方法 sourceSpliterator(int terminalFlags)
。它们的主要区别在于:
-
sourceStageSpliterator()
(本方法):- 职责: 简单、纯粹地从源头阶段获取
Spliterator
。 - 调用者: 只能是源头阶段自己。
- 复杂度: 非常简单,不处理任何流水线逻辑。
- 用途: 主要被
spliterator()
这个终端操作的实现所使用,当用户想把一个未消费的流转换回Spliterator
时调用。
- 职责: 简单、纯粹地从源头阶段获取
-
sourceSpliterator(int terminalFlags)
:- 职责: 为任意阶段准备用于终端操作求值的
Spliterator
。 - 调用者: 任何一个阶段在准备执行终端操作时都可以调用(通常是最后一个阶段)。
- 复杂度: 非常复杂。它包含了处理并行流中有状态操作的核心逻辑。如果流是并行的且包含有状态操作(如
sorted
),这个方法会先执行到最后一个有状态操作为止的所有计算,然后返回一个代表中间结果的Spliterator
。 - 用途: 被
evaluate()
和evaluateToArrayNode()
等所有执行终端操作的方法调用,是实际执行计算的起点。
- 职责: 为任意阶段准备用于终端操作求值的
简单来说,sourceStageSpliterator()
是一个“档案管理员”,只负责从档案室(源头)取出原始文件(Spliterator
)。而 sourceSpliterator(int)
是一个“项目经理”,它不仅能拿到原始文件,还能在需要时组织团队(并行计算)完成一系列加工(有状态操作),最后交付一份处理好的报告(中间结果的 Spliterator
)。
总结
sourceStageSpliterator()
是一个高度专用的内部方法,其设计目标是正确性和安全性。通过严格的调用者检查和状态转换,它确保了流的源数据只能被源头阶段自己、且仅有一次地取出。它与更复杂的 sourceSpliterator(int)
方法形成了职责分工,共同构成了 Stream API 内部数据流转的基石。
sourceSpliterator
这个方法是 Stream API 内部实现中最核心、最复杂的方法之一。它不仅仅是获取源 Spliterator
,更重要的是,它扮演了启动终端操作前“预处理”流水线的角色,尤其是处理复杂的并行流场景。可以说,理解了这个方法,就理解了 Stream 并行计算的精髓。
sourceSpliterator(int terminalFlags)
的核心作用是:为终端操作准备一个“就绪”的 Spliterator
。
这个“就绪”的 Spliterator
可能是:
- 对于简单流(顺序流或无状态并行流):就是流的原始
Spliterator
。 - 对于复杂流(有状态并行流):是一个代表了部分计算结果的全新
Spliterator
。
其设计思想是:
- 处理屏障(Barrier):有状态操作(如
sorted()
,distinct()
)在并行流中像一个“屏障”,必须等待上游所有数据都到达并处理后,才能继续向下游传递。此方法就是负责执行到最后一个屏障为止的所有计算。 - 流水线切片与重组:它通过一个循环,动态地“切片”流水线,执行一部分(直到一个有状态操作),然后用其结果(一个新的
Spliterator
)作为下一段流水线的输入,并更新后续阶段的元数据(如depth
,combinedFlags
)。 - 延迟求值(Lazy Evaluation):在处理有状态操作时,它调用
opEvaluateParallelLazy
,这个方法返回的Spliterator
封装了并行计算的结果,但数据本身可能是延迟生成的。
protected Spliterator<?> sourceSpliterator(int terminalFlags)
protected
: 只能被java.util.stream
包内或其子类访问,是内部实现细节。Spliterator<?>
: 返回一个Spliterator
。注意这里的通配符?
,因为经过有状态操作后,Spliterator
的类型可能已经改变,但这个方法本身不关心具体类型,只把它当作一个数据源。int terminalFlags
: 来自终端操作的标志位(StreamOpFlag
)。例如,一个短路操作(likefindFirst
)会传入SHORT_CIRCUIT
标志,这个标志会影响后续流水线的行为。
代码逻辑深度解析
// ... existing code ...@SuppressWarnings("unchecked")protected Spliterator<?> sourceSpliterator(int terminalFlags) {// Get the source spliterator of the pipelineSpliterator<?> spliterator = null;if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;}else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;}else {throw new IllegalStateException(MSG_CONSUMED);}if (isParallel() && hasAnyStateful()) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {depth = 0;if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {// ... (omitted for brevity)thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;}spliterator = p.opEvaluateParallelLazy(u, spliterator);// Inject or clear SIZED on the source pipeline stage// based on the stage's spliteratorthisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;}p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}if (terminalFlags != 0) {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);}return spliterator;}
// ... existing code ...
第一步:获取原始 Spliterator
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;
}
else {throw new IllegalStateException(MSG_CONSUMED);
}
这部分逻辑与 sourceStageSpliterator()
中的提取逻辑几乎完全相同。它从源头阶段(sourceStage
)获取最原始的 Spliterator
,并立即将源头的引用(sourceSpliterator
或 sourceSupplier
)清空,标志着流的消费正式开始。
第二步:处理并行流中的有状态操作(核心 if
和 for
循环)
if (isParallel() && hasAnyStateful()) {// ... for loop ...
}
这是整个方法的核心和灵魂。只有当流是并行的,并且流水线中至少包含一个有状态操作时,这个 if
块才会执行。
for
循环详解:
for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage)
- 初始化:
u
(upstream): 上游阶段,初始为sourceStage
。p
(pipeline): 当前处理的阶段,初始为源的下一个阶段。e
(end): 循环的终点,即调用sourceSpliterator
的那个阶段(通常是最后一个阶段)。
- 循环条件:
u != e
,只要上游阶段还没到终点就继续。 - 迭代:
u = p, p = p.nextStage
,两个指针一起向后移动,始终保持u
是p
的前一个阶段。
循环体内部:
-
if (p.opIsStateful()) { ... }
- 循环的主要目的就是找到有状态的操作阶段
p
。
- 循环的主要目的就是找到有状态的操作阶段
-
depth = 0;
- 一旦找到一个有状态操作,就将
depth
计数器重置为0。这是一个信号,表示我们即将创建一个新的“逻辑源头”。
- 一旦找到一个有状态操作,就将
-
spliterator = p.opEvaluateParallelLazy(u, spliterator);
- 这是魔法发生的地方。调用当前有状态操作
p
的opEvaluateParallelLazy
方法。 - 这个方法会把
u
(代表了到目前为止的所有上游操作)和当前的spliterator
(代表了上游的数据源)作为输入。 - 它会立即触发并执行从上一个屏障(或源头)到当前屏障
p
之间的所有并行计算。 - 执行完成后,它返回一个新的
Spliterator
。这个新的Spliterator
封装了p
操作(如sorted
)的计算结果。现在,这个新的spliterator
变量就成了下一段流水线的“源”。
- 这是魔法发生的地方。调用当前有状态操作
-
更新标志和元数据:
thisOpFlags = spliterator.hasCharacteristics(...)
:根据新生成的Spliterator
是否SIZED
,来更新当前操作的标志。例如,distinct()
可能会改变流的大小,所以需要重新评估SIZED
特性。p.depth = depth++;
:重新设置后续阶段的深度。因为p
已经被“执行”并物化为一个新的Spliterator
,所以它的下一个阶段的深度就变成了1。p.combinedFlags = StreamOpFlag.combineOpFlags(...)
:基于更新后的thisOpFlags
,重新计算后续阶段的组合标志。
执行示例: source.parallel().filter(...).sorted().map(...).toArray()
当 toArray
调用 sourceSpliterator
时:
- 循环开始,
p
指向filter
。opIsStateful()
为 false,跳过if
。 - 循环继续,
p
指向sorted
。opIsStateful()
为 true,进入if
。 - 调用
sorted.opEvaluateParallelLazy(filterHelper, sourceSpliterator)
。 - 这会触发
filter
和sorted
的并行计算,其结果被封装成一个新的spliterator_after_sorted
。 spliterator
变量现在被更新为spliterator_after_sorted
。sorted
之后的map
阶段的depth
和combinedFlags
会被更新,就好像它的上游直接就是spliterator_after_sorted
一样。- 循环结束。方法返回
spliterator_after_sorted
。
最终,toArray
的终端操作拿到的 Spliterator
已经是排好序的数据源了。
第三步:合并终端操作的标志
if (terminalFlags != 0) {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
最后,将终端操作自己带来的标志(如 SHORT_CIRCUIT
)合并到最后一个阶段的 combinedFlags
中。这会影响最终的 wrapAndCopyInto
等方法的行为。
与 sourceStageSpliterator()
的对比
sourceStageSpliterator()
: 档案管理员。职责单一,只负责从源头取出原始Spliterator
。简单、安全、不涉及计算。sourceSpliterator(int)
: 项目经理/总工程师。职责复杂,负责在启动最终任务前,解决掉所有中途的“硬骨头”(有状态并行操作),通过预计算和流水线重组,为最终任务提供一个干净、就绪的数据源。
总结
sourceSpliterator(int terminalFlags)
是 Stream API 实现高性能并行计算的引擎室。它通过一个巧妙的循环,将一个复杂的、带有“屏障”的流水线,动态地、分段地执行,并将每一段的计算结果物化为一个新的 Spliterator
,作为下一段的输入。这个“评估-重组”的过程,有效地解决了有状态操作在并行环境下的数据依赖问题,是整个 Stream 框架中最为精妙和关键的设计之一。
wrapSpliterator
这个方法是 PipelineHelper
接口的实现,它的核心作用是:将一个代表上游数据的 Spliterator
,通过当前流水线阶段以及其所有下游阶段的操作进行“包装”,最终返回一个包含了所有后续操作逻辑的新的 Spliterator
。
简单来说,它就是 stream.spliterator()
这个终端操作的幕后功臣之一。当你想把一个构建好的、包含多个中间操作的 Stream 再转换回一个 Spliterator
时,就是这个方法在发挥作用。
wrapSpliterator
的核心作用是将**数据源(sourceSpliterator
)和操作链(从当前阶段到末尾)**结合起来,生成一个全新的、功能完备的 Spliterator
。
其设计思想是:
- 延迟执行 (Laziness):这个方法本身不执行任何计算。它只是创建一个新的
Spliterator
对象,这个对象内部“知道”需要对源数据执行哪些操作。真正的计算和转换只会在你调用这个新Spliterator
的tryAdvance
或forEachRemaining
等方法时才会发生。 - 递归/委托:它不自己实现复杂的包装逻辑,而是委托给一个抽象的
wrap
方法。这个wrap
方法由具体的 Stream 实现(如ReferencePipeline
,IntPipeline
)来提供,因为不同类型的 Stream(对象、int、long、double)其Spliterator
的包装方式也不同。 - 流水线封装:返回的
Spliterator
封装了从当前阶段开始的整个下游流水线。对外部调用者来说,它就像一个黑盒,你只管从里面取数据,它会自动帮你完成map
,filter
等所有操作。
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator)
final
: 此方法不可被子类重写,是框架的核心稳定部分。<P_IN>
: 泛型参数,代表输入Spliterator
的元素类型。P_IN
代表 "Pipeline Input"。Spliterator<E_OUT>
: 返回值。返回一个新的Spliterator
,其元素类型是E_OUT
,即当前流水线阶段的输出类型。Spliterator<P_IN> sourceSpliterator
: 参数,代表上游提供的数据源。
代码逻辑深度解析
// ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {if (depth == 0) {return (Spliterator<E_OUT>) sourceSpliterator;}else {return wrap(this, () -> sourceSpliterator, isParallel());}}
// ... existing code ...
基本情况 (Base Case)
if (depth == 0) {return (Spliterator<E_OUT>) sourceSpliterator;
}
depth
: 这个字段表示当前阶段在流水线中的深度。depth == 0
是一个特殊标记,它意味着当前AbstractPipeline
实例就是源头阶段 (Source Stage)。- 逻辑: 如果当前阶段就是源头,说明它后面没有任何操作了。因此,不需要进行任何包装,直接将输入的
sourceSpliterator
原样返回即可。这里的强制类型转换是安全的,因为在源头阶段,输入类型P_IN
和输出类型E_OUT
必然是相同的。 - 作用: 这是递归包装的终点。
递归包装 (Recursive Wrapping)
else {return wrap(this, () -> sourceSpliterator, isParallel());
}
当 depth > 0
时,意味着当前阶段是一个中间操作阶段(如 map
, filter
等)。这时就需要进行包装。
-
wrap(...)
: 这是一个抽象方法,定义在AbstractPipeline
的更下方。abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,Supplier<Spliterator<P_IN>> supplier,boolean isParallel);
它由具体的子类(
ReferencePipeline
,IntPipeline
等)实现,因为它们才知道如何创建对应类型的包装Spliterator
(例如,StreamSpliterators.WrappingSpliterator
)。 -
wrap
方法的参数:this
: 将当前AbstractPipeline
实例作为PipelineHelper
传递进去。这个helper
对象包含了从当前阶段到最后一个阶段的所有信息(操作链、标志位等)。() -> sourceSpliterator
: 将上游的sourceSpliterator
包装成一个Supplier
。这是为了支持延迟获取。包装后的Spliterator
只有在第一次被使用时,才会通过这个Supplier
真正拿到上游的Spliterator
。isParallel()
: 传递当前流的并行状态,以便wrap
方法创建出正确的(并行或顺序的)Spliterator
包装器。
执行流程示例: 假设有 stream.map(...).filter(...)
,我们想从 map
阶段开始获取 Spliterator
。
- 在
map
阶段调用wrapSpliterator(sourceSpliterator)
。 map
阶段的depth > 0
,进入else
分支。- 调用
wrap(mapPipeline, () -> sourceSpliterator, isParallel)
。 ReferencePipeline
中实现的wrap
方法会创建一个StreamSpliterators.WrappingSpliterator
。- 这个
WrappingSpliterator
内部持有了mapPipeline
这个PipelineHelper
和上游的Spliterator
的Supplier
。 - 当用户调用这个新
Spliterator
的tryAdvance
时,它会:
- a. 从
Supplier
获取上游Spliterator
并调用其tryAdvance
得到一个元素。 - b. 通过
mapPipeline
找到map
操作的Sink
包装逻辑。 - c. 将元素推入
map
的Sink
,再推入filter
的Sink
。 - d. 如果元素通过了所有操作,就返回
true
。
总结
wrapSpliterator
是连接 Stream 流水线和 Spliterator
终端操作的关键桥梁。它通过一个简洁的 if-else
判断,优雅地处理了两种情况:
- 当位于流水线源头时,它作为递归的终点,直接返回源
Spliterator
。 - 当位于中间操作时,它将**“未来的操作”(由
this
PipelineHelper 代表)和“现在的数据源”**(由sourceSpliterator
代表)委托给具体的wrap
方法,创建一个新的、懒加载的、功能完备的Spliterator
。
这个方法的设计充分体现了职责分离和延迟执行的思想,是 Stream API 能够灵活地在不同表现形式(Stream
vs Spliterator
)之间转换的核心机制。
spliterator()
这个方法是 BaseStream
接口中定义的一个终端操作。它的作用非常直观:将一个已经构建好(但未消费)的 Stream 流水线转换回一个 Spliterator
。这提供了一种机制,允许用户在构建了复杂的流操作后,不立即进行求值,而是获取一个代表了整个流水线逻辑的 Spliterator
,以便进行更灵活或自定义的遍历。
spliterator()
的核心作用是将一个完整的、声明式的 Stream 流水线物化为一个可迭代的数据源 Spliterator
。
其设计思想是:
- 延迟执行的桥梁:Stream 的中间操作是延迟执行的,终端操作触发计算。
spliterator()
是一个特殊的终端操作,它本身不“计算”出最终结果(如collect
),而是将计算逻辑封装到一个新的Spliterator
中。真正的计算将在遍历这个返回的Spliterator
时发生。 - 互操作性:提供了从 Stream API 到更底层的
Spliterator
API 的一个“出口”,允许开发者将 Stream 的强大声明性与Spliterator
的精细控制(如自定义分割策略)结合起来。 - 消费流:和所有终端操作一样,调用
spliterator()
会消费掉这个流,使其不能再被其他操作使用。
public Spliterator<E_OUT> spliterator()
public
: 这是一个公开的 API,是BaseStream
接口的一部分,供所有 Stream 用户使用。Spliterator<E_OUT>
: 返回一个Spliterator
。E_OUT
是当前流水线阶段(也就是调用spliterator()
的那个 Stream 对象)的输出元素类型。
代码逻辑深度解析
// ... existing code ...// Primitive specialization use co-variant overrides, hence is not final@Override@SuppressWarnings("unchecked")public Spliterator<E_OUT> spliterator() {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;if (this == sourceStage) {if (sourceStage.sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;return s;}else if (sourceStage.sourceSupplier != null) {@SuppressWarnings("unchecked")Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;sourceStage.sourceSupplier = null;return lazySpliterator(s);}else {throw new IllegalStateException(MSG_CONSUMED);}}else {return wrap(this, () -> sourceSpliterator(0), isParallel());}}
// ... existing code ...
首先是标准的状态检查:
if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
这确保了流只能被消费一次,然后将流标记为已消费。
接下来,代码逻辑分为两大分支:
情况一:当前阶段是源头 (this == sourceStage
)
这个 if
块处理的是最简单的情况:在一个没有任何中间操作的原始 Stream 上调用 spliterator()
。例如 Stream.of(1, 2, 3).spliterator()
。
-
if (sourceStage.sourceSpliterator != null)
: 如果流是直接由一个Spliterator
创建的。- 直接获取这个
Spliterator
。 - 将
sourceStage.sourceSpliterator
置为null
,完成消费。 - 返回该
Spliterator
。
- 直接获取这个
-
else if (sourceStage.sourceSupplier != null)
: 如果流是由Supplier<Spliterator>
创建的。- 获取这个
Supplier
。 - 将
sourceStage.sourceSupplier
置为null
,完成消费。 - 调用
lazySpliterator(s)
。这是一个关键点,它不会立即调用supplier.get()
,而是返回一个特殊的Spliterator
(StreamSpliterators.LazySpliterator
),这个LazySpliterator
只有在第一次被使用(如调用tryAdvance
)时,才会真正从Supplier
中获取底层的Spliterator
。这保持了懒加载的特性。
- 获取这个
-
else
: 如果两者都为null
,说明流已被消费,抛出异常。
情况二:当前阶段是中间操作 (else
块)
这个 else
块处理的是更复杂、更常见的情况:在一个包含了至少一个中间操作的 Stream 上调用 spliterator()
。例如 Stream.of(1, 2, 3).map(i -> i * 2).spliterator()
。
else {return wrap(this, () -> sourceSpliterator(0), isParallel());
}
这行代码是整个方法的核心,它做了三件事:
-
() -> sourceSpliterator(0)
: 创建一个Supplier
。这个Supplier
的任务是调用我们之前深入分析过的sourceSpliterator(0)
方法。回忆一下,sourceSpliterator
会负责处理并行流中的有状态操作(屏障),并返回一个“就绪”的Spliterator
作为后续操作的数据源。对于顺序流,它就简单地返回原始Spliterator
。 -
wrap(this, ..., isParallel())
: 调用wrap
方法。this
: 将当前AbstractPipeline
实例(例如,代表map
操作的那个对象)作为PipelineHelper
传入。这个helper
知道从当前阶段到最后一个阶段的所有操作。() -> sourceSpliterator(0)
: 将准备好的数据源(通过Supplier
懒加载)传入。isParallel()
: 告知wrap
方法要创建并行还是顺序的包装器。
-
返回结果:
wrap
方法会返回一个全新的、经过包装的Spliterator
(例如StreamSpliterators.WrappingSpliterator
)。这个WrappingSpliterator
内部就同时持有了数据源(来自sourceSpliterator
)和操作链(来自this
这个PipelineHelper
)。当遍历这个新的Spliterator
时,它会从数据源取出一个元素,然后依次应用流水线上的所有操作。
与内部 wrapSpliterator
的关系
spliterator()
的 else
块与我们之前分析的 wrapSpliterator
的 else
块非常相似。
spliterator()
:return wrap(this, () -> sourceSpliterator(0), isParallel());
wrapSpliterator(...)
:return wrap(this, () -> sourceSpliterator, isParallel());
它们的主要区别在于 wrap
方法的第二个参数,即数据源 Supplier
:
spliterator()
使用() -> sourceSpliterator(0)
。它调用的是复杂的sourceSpliterator(int)
方法。这是因为它作为终端操作,必须处理整个流水线的复杂性,特别是并行流中的有状态操作。它需要一个能代表“预计算”结果的Spliterator
。wrapSpliterator(...)
使用() -> sourceSpliterator
(这里的sourceSpliterator
是wrapSpliterator
的参数)。它接收一个已经准备好的上游Spliterator
。它的职责相对简单,只是将这个现成的数据源和下游操作包装起来,不负责处理上游的屏障问题。
简单说,spliterator()
是面向用户的、发起整个封装过程的入口;而 wrapSpliterator
是内部递归包装过程中的一个环节。
总结
public spliterator()
方法是 Stream API 提供的一个功能强大的“逃生舱口”。它允许用户在任意阶段将一个声明式的、懒加载的 Stream 流水线,转换成一个具体的、可迭代的 Spliterator
对象。
- 对于源 Stream,它直接或懒加载地返回原始的
Spliterator
。 - 对于带有中间操作的 Stream,它通过调用核心的
wrap
和sourceSpliterator
方法,将数据源的获取逻辑(可能包含预计算)和操作链的执行逻辑完美地封装到一个新的Spliterator
中,实现了从Stream
到Spliterator
的无缝转换,同时保持了懒加载的特性。
close
Stream 需要 close()
和 onClose()
是因为它可能建立在需要显式关闭的底层 I/O 资源之上(例如文件、网络套接字等)。如果不关闭 Stream,这些底层资源可能无法被释放,从而导致资源泄漏。
BaseStream
接口(Stream
, IntStream
等的父接口)继承了 java.lang.AutoCloseable
接口。这意味着所有 Stream 都可以,并且在某些情况下必须在 try-with-resources
语句中使用,以确保资源的正确释放。
想象一个典型的场景:逐行读取文件。
在 Java 8 之前,我们可能会这么写:
BufferedReader reader = null;
try {reader = new BufferedReader(new FileReader("file.txt"));String line;while ((line = reader.readLine()) != null) {System.out.println(line);}
} finally {if (reader != null) {reader.close(); // 必须手动关闭}
}
使用 Stream API,代码可以变得非常简洁:
// 正确的用法
try (Stream<String> lines = Files.lines(Paths.get("file.txt"))) {lines.forEach(System.out::println);
} catch (IOException e) {e.printStackTrace();
}
// 在 try 块结束时,lines.close() 会被自动调用
Files.lines(...)
方法返回一个 Stream<String>
。这个 Stream 的数据源是一个打开的文件句柄。当 Stream 的处理完成时,这个文件句柄必须被关闭。try-with-resources
结构就保证了 stream.close()
方法在最后一定会被调用。
如果忘记使用 try-with-resources
,就可能导致文件句柄一直被占用,直到垃圾回收器介入,这在生产环境中是不可接受的。
现在我们来看onClose(Runnable closeHandler)
方法。
它的作用不是执行关闭操作,而是注册一个关闭处理器(Runnable
)。这个处理器会在 Stream 的 close()
方法被调用时执行。
这是一个非常精巧的回调机制。我们来分析一下它的源码:
// ... existing code ...@Override@SuppressWarnings("unchecked")public S onClose(Runnable closeHandler) {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);Objects.requireNonNull(closeHandler);Runnable existingHandler = sourceStage.sourceCloseAction;sourceStage.sourceCloseAction =(existingHandler == null)? closeHandler: Streams.composeWithExceptions(existingHandler, closeHandler);return (S) this;}
// ... existing code ...
if (linkedOrConsumed)
: 检查流是否已经被操作或消费,确保只能在流水线构建阶段注册关闭处理器。Runnable existingHandler = sourceStage.sourceCloseAction;
: 这是关键。它获取的是源头阶段 (sourceStage
) 的关闭处理器。这意味着无论你在流水线的哪个阶段(map
,filter
等)调用onClose
,这个关闭逻辑最终都会被附加到整个流水线的源头。这是完全合理的,因为需要关闭的资源(如文件句柄)正是在源头被打开的。sourceStage.sourceCloseAction = ...
: 它将新的处理器设置回源头阶段。Streams.composeWithExceptions(existingHandler, closeHandler)
: 这个逻辑允许你注册多个关闭处理器。如果已经存在一个处理器,它会把新旧两个处理器组合起来,确保它们都会被执行。
那么,这个注册的 sourceCloseAction
在哪里被调用呢?答案是在 close()
方法里:
// ... existing code ...@Overridepublic void close() {linkedOrConsumed = true;sourceSupplier = null;sourceSpliterator = null;Runnable closeAction = sourceStage.sourceCloseAction;if (closeAction != null) {sourceStage.sourceCloseAction = null;closeAction.run();}}
// ... existing code ...
可以看到,close()
方法会从 sourceStage
获取之前注册的 closeAction
并执行它(closeAction.run()
)。
设计启示
- 职责分离:
onClose
的设计体现了完美的职责分离。- 资源创建者(例如
Files.lines
方法)负责提供资源,并使用onClose
注册清理资源的逻辑。它知道如何关闭资源。 - 资源使用者(调用 Stream API 的我们)负责在处理完数据后调用
close()
方法(通常通过try-with-resources
),但不需要关心具体的关闭细节。
- 资源创建者(例如
- 封装:
onClose
机制将资源管理的复杂性封装在了 Stream 的实现内部。使用者只需要遵循AutoCloseable
的标准用法即可,这大大降低了API的复杂度。 - 灵活性:通过组合
Runnable
,该机制甚至支持为一个 Stream 注册多个独立的关闭动作,增加了灵活性。
因此,onClose()
是一个供 Stream 提供方使用的内部机制,用于将资源清理逻辑与 Stream 本身绑定,而 close()
则是供 Stream 消费方使用的公开接口,用于触发这些清理逻辑。
核心抽象方法
这些方法必须由具体的管道实现(如 ReferencePipeline
, IntPipeline
)或其内部的操作类(如 StatelessOp
, StatefulOp
)提供:
abstract StreamShape getOutputShape()
: 返回当前管道阶段输出的流类型(REFERENCE
,INT_VALUE
,LONG_VALUE
,DOUBLE_VALUE
)。abstract <P_IN> Sink<P_IN> opWrapSink(int flags, Sink<E_OUT> sink)
:- 这是构建
Sink
链的核心。 - 参数
sink
是下游操作的Sink
。 - 此方法需要返回一个新的
Sink
,该Sink
实现了当前操作的逻辑,并将结果传递给下游的sink
。 - 例如,一个
map
操作会创建一个Sink
,它对接收到的每个元素应用映射函数,然后将结果传递给下游Sink
。
- 这是构建
abstract <P_IN> Spliterator<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<E_OUT[]> generator)
:- (主要由有状态操作实现)定义了有状态操作如何在并行模式下执行,并返回一个新的
Spliterator
包含处理后的元素。
- (主要由有状态操作实现)定义了有状态操作如何在并行模式下执行,并返回一个新的
abstract boolean opIsStateful()
: 返回当前操作是否有状态(例如sorted()
,distinct()
是有状态的,而filter()
,map()
是无状态的)。