1 Java 8 Stream流底层原理-德赢Vwin官网 网
0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Java 8 Stream流底层原理

jf_ro2CN3Fa 来源:CSDN 作者:CSDN 2022-11-18 10:27 次阅读

  • 函数式接口
    • 操作
    • 流程
    • Collection
    • AbstractPipeline
    • ReferencePipeline
    • Head
    • StatelessOp
    • StatefulOp
    • TerminalOp
    • ReduceOp
    • MatchOp
    • FindOp
    • ForEachOp
    • Sink
    • ChainedReference
    • TerminalSink
    • Collector
    • 并行流
    • ForkJoinTask
    • AbstractTask

函数式接口

初识lambda呢,函数式接口肯定是绕不过去的,函数式接口就是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口。函数式接口可以被隐式转换为lambda表达式。

@FunctionalInterface
publicinterfaceCloseable{

voidclose();
}

java.util.function它包含了很多类,用来支持Java的函数式编程,该包中的函数式接口有:

e1af6c02-66e6-11ed-8abf-dac502259ad0.png

操作

e1bb7024-66e6-11ed-8abf-dac502259ad0.png

流程

Stream相关接口继承图:

e1cd4740-66e6-11ed-8abf-dac502259ad0.png

Stream流水线组织结构示意图(图是盗的):

e1dfbe7a-66e6-11ed-8abf-dac502259ad0.png

Collection

类路径java.util.colltction

@Override
defaultSpliteratorspliterator(){
returnSpliterators.spliterator(this,0);
}
//常用Stream流转换
defaultStreamstream(){
returnStreamSupport.stream(spliterator(),false);
}
//并行流
defaultStreamparallelStream(){
returnStreamSupport.stream(spliterator(),true);
}

//java.util.stream.StreamSupport#stream(java.util.Spliterator,boolean)
publicstaticStreamstream(Spliteratorspliterator,booleanparallel){
Objects.requireNonNull(spliterator);
returnnewReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
}

AbstractPipeline

类路径java.util.stream.AbstractPipeline

//反向链接到管道链的头部(如果是源阶段,则为自身)。
privatefinalAbstractPipelinesourceStage;

//“上游”管道,如果这是源阶段,则为null。
privatefinalAbstractPipelinepreviousStage;

//此管道对象表示的中间操作的操作标志。
protectedfinalintsourceOrOpFlags;

//管道中的下一个阶段;如果这是最后一个阶段,则为null。在链接到下一个管道时有效地结束。
privateAbstractPipelinenextStage;

//如果是顺序的,则此管道对象与流源之间的中间操作数;如果是并行的,则为先前有状态的中间操作数。在管道准备进行评估时有效。
privateintdepth;

//源和所有操作的组合源标志和操作标志,直到此流水线对象表示的操作为止(包括该流水线对象所代表的操作)。在管道准备进行评估时有效。
privateintcombinedFlags;

//源拆分器。仅对头管道有效。如果管道使用非null值,那么在使用管道之前,sourceSupplier必须为null。在使用管道之后,如果非null,则将其设置为null。
privateSpliteratorsourceSpliterator;

//来源供应商。仅对头管道有效。如果非null,则在使用管道之前,sourceSpliterator必须为null。在使用管道之后,如果非null,则将其设置为null。
privateSupplier>sourceSupplier;

//如果已链接或使用此管道,则为True
privatebooleanlinkedOrConsumed;

//如果正在执行任何有状态操作,则为true;否则为true。仅对源阶段有效。
privatebooleansourceAnyStateful;

privateRunnablesourceCloseAction;

//如果管道是并行的,则为true;否则,管道为顺序的;否则为true。仅对源阶段有效。
privatebooleanparallel;

ReferencePipeline

类路径:java.util.stream.ReferencePipeline

filter
//java.util.stream.ReferencePipeline#filter
@Override
publicfinalStreamfilter(PredicatesuperP_OUT>predicate){
Objects.requireNonNull(predicate);
//返回一个匿名无状态的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED){
//下游生产线所需要的回调接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
@Override
publicvoidbegin(longsize){
downstream.begin(-1);
}
//真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调
@Override
publicvoidaccept(P_OUTu){
//只有满足条件的元素才能被下游执行
if(predicate.test(u))
downstream.accept(u);
}
};
}
};
}
map
//java.util.stream.ReferencePipeline#map
publicfinalStreammap(FunctionsuperP_OUT,?extendsR>mapper){
Objects.requireNonNull(mapper);
//返回一个匿名无状态的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT){
//下游生产线所需要的回调接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
//真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调
@Override
publicvoidaccept(P_OUTu){
//执行转换后提供给下游执行
downstream.accept(mapper.apply(u));
}
};
}
};
}
flatMap
//java.util.stream.ReferencePipeline#flatMap
@Override
publicfinalStreamflatMap(FunctionsuperP_OUT,?extendsStream>mapper){
Objects.requireNonNull(mapper);
//返回一个匿名无状态的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT|StreamOpFlag.NOT_SIZED){
//下游生产线所需要的回调接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
@Override
publicvoidbegin(longsize){
downstream.begin(-1);
}
//真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调
@Override
publicvoidaccept(P_OUTu){
try(Streamresult=mapper.apply(u)){
//划分为多个流执行下游(分流)
if(result!=null)
result.sequential().forEach(downstream);
}
}
};
}
};
}
peek
//java.util.stream.ReferencePipeline#peek
@Override
publicfinalStreampeek(ConsumersuperP_OUT>action){
Objects.requireNonNull(action);
//返回一个匿名无状态的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,0){
//下游生产线所需要的回调接口
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
//真正执行操作的方法,依靠ChainedReference内置ReferencePipeline引用下游的回调
@Override
publicvoidaccept(P_OUTu){
//先执行自定义方法,在执行下游方法
action.accept(u);
downstream.accept(u);
}
};
}
};
}
sorted
@Override
publicfinalStreamsorted(){
//不提供Comparator,会使用元素自实现Comparator的compareTo方法
returnSortedOps.makeRef(this);
}

@Override
publicfinalStreamsorted(ComparatorsuperP_OUT>comparator){
returnSortedOps.makeRef(this,comparator);
}
//Sorted.makeRef
staticStreammakeRef(AbstractPipelineupstream,
ComparatorsuperT>comparator){
returnnewOfRef<>(upstream,comparator);
}
//ofRef类
privatestaticfinalclassOfRef<T>extendsReferencePipeline.StatefulOp<T,T>{

privatefinalbooleanisNaturalSort;
privatefinalComparatorsuperT>comparator;

@Override
publicSinkopWrapSink(intflags,Sinksink){
Objects.requireNonNull(sink);
//根据不同的flag进行不同排序
if(StreamOpFlag.SORTED.isKnown(flags)&&isNaturalSort)
returnsink;
elseif(StreamOpFlag.SIZED.isKnown(flags))
returnnewSizedRefSortingSink<>(sink,comparator);
else
returnnewRefSortingSink<>(sink,comparator);
}

}
distinct
@Override
publicfinalStreamdistinct(){
returnDistinctOps.makeRef(this);
}
staticReferencePipelinemakeRef(AbstractPipelineupstream){
//返回一个匿名有状态的管道
returnnewReferencePipeline.StatefulOp(upstream,StreamShape.REFERENCE,StreamOpFlag.IS_DISTINCT|StreamOpFlag.NOT_SIZED){

@Override
SinkopWrapSink(intflags,Sinksink){
Objects.requireNonNull(sink);

if(StreamOpFlag.DISTINCT.isKnown(flags)){
//已经是去重过了
returnsink;
}elseif(StreamOpFlag.SORTED.isKnown(flags)){
//有序流
returnnewSink.ChainedReference(sink){
booleanseenNull;
//这个为先执行的前序元素
TlastSeen;

@Override
publicvoidbegin(longsize){
seenNull=false;
lastSeen=null;
downstream.begin(-1);
}

@Override
publicvoidend(){
seenNull=false;
lastSeen=null;
downstream.end();
}
//这里通过有序的特性,前序元素与后序元素比较,如果相等则跳过执行后序的元素
@Override
publicvoidaccept(Tt){
if(t==null){
//这里控制元素为null只有一个
if(!seenNull){
seenNull=true;
downstream.accept(lastSeen=null);
}
}elseif(lastSeen==null||!t.equals(lastSeen)){
//这里将前序元素赋值给lastSeen
downstream.accept(lastSeen=t);
}
}
};
}else{
//底层通过Set进行去重,所以该元素需要重写hashCode和equals方法
returnnewSink.ChainedReference(sink){
Setseen;

@Override
publicvoidbegin(longsize){
seen=newHashSet<>();
downstream.begin(-1);
}

@Override
publicvoidend(){
seen=null;
downstream.end();
}

@Override
publicvoidaccept(Tt){
if(!seen.contains(t)){
seen.add(t);
downstream.accept(t);
}
}
};
}
}
};
}
skip、limit
publicstaticStreammakeRef(AbstractPipelineupstream,
longskip,longlimit){
if(skip< 0)
thrownewIllegalArgumentException("Skipmustbenon-negative:"+skip);
//返回一个匿名有状态的管道
returnnewReferencePipeline.StatefulOp(upstream,StreamShape.REFERENCE,flags(limit)){
SpliteratorunorderedSkipLimitSpliterator(Spliterators,longskip,longlimit,longsizeIfKnown){
if(skip<= sizeIfKnown) {
                    limit = limit >=0?Math.min(limit,sizeIfKnown-skip):sizeIfKnown-skip;
skip=0;
}
returnnewStreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s,skip,limit);
}
//自己实现真正操作的方法
@Override
SinkopWrapSink(intflags,Sinksink){
returnnewSink.ChainedReference(sink){
longn=skip;
longm=limit>=0?limit:Long.MAX_VALUE;

@Override
publicvoidbegin(longsize){
downstream.begin(calcSize(size,skip,m));
}

@Override
publicvoidaccept(Tt){
if(n==0){
//limit
if(m>0){
m--;
downstream.accept(t);
}
}
//skip
else{
n--;
}
}

@Override
publicbooleancancellationRequested(){
returnm==0||downstream.cancellationRequested();
}
};
}
};
}
reduce
//java.util.stream.ReferencePipeline#reduce(P_OUT,java.util.function.BinaryOperator)
@Override
publicfinalP_OUTreduce(finalP_OUTidentity,finalBinaryOperatoraccumulator){
returnevaluate(ReduceOps.makeRef(identity,accumulator,accumulator));
}
//java.util.stream.ReferencePipeline#reduce(java.util.function.BinaryOperator)
@Override
publicfinalOptionalreduce(BinaryOperatoraccumulator){
returnevaluate(ReduceOps.makeRef(accumulator));
}
//java.util.stream.ReferencePipeline#reduce(R,java.util.function.BiFunction,java.util.function.BinaryOperator)
@Override
publicfinalRreduce(Ridentity,BiFunctionsuperP_OUT,R>accumulator,BinaryOperatorcombiner){
returnevaluate(ReduceOps.makeRef(identity,accumulator,combiner));
}

//java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)
finalRevaluate(TerminalOpterminalOp){
assertgetOutputShape()==terminalOp.inputShape();
if(linkedOrConsumed)
thrownewIllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed=true;

returnisParallel()
?terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
collect
//java.util.stream.ReferencePipeline#collect(java.util.stream.Collector)
@Override
@SuppressWarnings("unchecked")
publicfinalRcollect(CollectorsuperP_OUT,A,R>collector){
Acontainer;
if(isParallel()
&&(collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&&(!isOrdered()||collector.characteristics().contains(Collector.Characteristics.UNORDERED))){
container=collector.supplier().get();
BiConsumersuperP_OUT>accumulator=collector.accumulator();
forEach(u->accumulator.accept(container,u));
}
else{
container=evaluate(ReduceOps.makeRef(collector));
}
//具有特定转换的使用finisher处理
returncollector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
?(R)container
:collector.finisher().apply(container);
}
//java.util.stream.ReferencePipeline#collect(java.util.function.Supplier,java.util.function.BiConsumer,java.util.function.BiConsumer)
@Override
publicfinalRcollect(Suppliersupplier,BiConsumersuperP_OUT>accumulator,BiConsumercombiner){
returnevaluate(ReduceOps.makeRef(supplier,accumulator,combiner));
}

//java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)
finalRevaluate(TerminalOpterminalOp){
assertgetOutputShape()==terminalOp.inputShape();
if(linkedOrConsumed)
thrownewIllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed=true;

returnisParallel()
?terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
forEach
//java.util.stream.ReferencePipeline#forEach
@Override
publicvoidforEach(ConsumersuperP_OUT>action){
evaluate(ForEachOps.makeRef(action,false));
}

//java.util.stream.ForEachOps#makeRef
publicstaticTerminalOpmakeRef(ConsumersuperT>action,booleanordered){
Objects.requireNonNull(action);
returnnewForEachOp.OfRef<>(action,ordered);
}

//java.util.stream.ForEachOps.ForEachOp.OfRef
staticfinalclassOfRef<T>extendsForEachOp<T>{
finalConsumersuperT>consumer;

OfRef(ConsumersuperT>consumer,booleanordered){
super(ordered);
this.consumer=consumer;
}

//只是简单的消费
@Override
publicvoidaccept(Tt){
consumer.accept(t);
}
}

Head

流的数据元的头,类路径java.util.stream.ReferencePipeline.Head

//java.util.stream.ReferencePipeline.Head
staticclassHead<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{

Head(Supplier>source,intsourceFlags,booleanparallel){
super(source,sourceFlags,parallel);
}

Head(Spliteratorsource,intsourceFlags,booleanparallel){
super(source,sourceFlags,parallel);
}

@Override
finalbooleanopIsStateful(){
thrownewUnsupportedOperationException();
}

@Override
finalSinkopWrapSink(intflags,Sinksink){
thrownewUnsupportedOperationException();
}
//Optimizedsequentialterminaloperationsfortheheadofthepipeline
@Override
publicvoidforEach(ConsumersuperE_OUT>action){
if(!isParallel()){
sourceStageSpliterator().forEachRemaining(action);
}
else{
super.forEach(action);
}
}

@Override
publicvoidforEachOrdered(ConsumersuperE_OUT>action){
if(!isParallel()){
sourceStageSpliterator().forEachRemaining(action);
}
else{
super.forEachOrdered(action);
}
}
}

StatelessOp

无状态的中间管道,类路径java.util.stream.ReferencePipeline.StatelessOp

//java.util.stream.ReferencePipeline.StatelessOp
abstractstaticclassStatelessOp<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{

StatelessOp(AbstractPipelineupstream,StreamShapeinputShape,intopFlags){
super(upstream,opFlags);
assertupstream.getOutputShape()==inputShape;
}

@Override
finalbooleanopIsStateful(){
returnfalse;
}
}

StatefulOp

有状态的中间管道,类路径java.util.stream.ReferencePipeline.StatefulOp

//java.util.stream.ReferencePipeline.StatefulOp
abstractstaticclassStatefulOp<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{

StatefulOp(AbstractPipelineupstream,StreamShapeinputShape,intopFlags){
super(upstream,opFlags);
assertupstream.getOutputShape()==inputShape;
}

@Override
finalbooleanopIsStateful(){
returntrue;
}

@Override
abstractNodeopEvaluateParallel(PipelineHelperhelper,
Spliteratorspliterator,
IntFunctiongenerator);

TerminalOp

管道流的结束操作,类路径java.util.stream.TerminalOp

interfaceTerminalOp<E_IN,R>{

//获取此操作的输入类型的形状
defaultStreamShapeinputShape(){returnStreamShape.REFERENCE;}

//获取操作的流标志。终端操作可以设置StreamOpFlag定义的流标志的有限子集,并且这些标志与管道的先前组合的流和中间操作标志组合在一起。
defaultintgetOpFlags(){return0;}

//使用指定的PipelineHelper对操作执行并行评估,该操作描述上游中间操作。
defaultRevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
if(Tripwire.ENABLED)
Tripwire.trip(getClass(),"{0}triggeringTerminalOp.evaluateParallelserialdefault");
returnevaluateSequential(helper,spliterator);
}

//使用指定的PipelineHelper对操作执行顺序评估,该操作描述上游中间操作。
RevaluateSequential(PipelineHelperhelper,Spliteratorspliterator);
}

ReduceOp

类路径java.util.stream.ReduceOps.ReduceOp

privatestaticabstractclassReduceOp<T,R,SextendsAccumulatingSink<T,R,S>>implementsTerminalOp<T,R>{
privatefinalStreamShapeinputShape;

ReduceOp(StreamShapeshape){
inputShape=shape;
}

publicabstractSmakeSink();

@Override
publicStreamShapeinputShape(){
returninputShape;
}

//通过匿名子类实现makeSink()获取Sink
@Override
publicRevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(makeSink(),spliterator).get();
}

@Override
publicRevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
returnnewReduceTask<>(this,helper,spliterator).invoke().get();
}
}

MatchOp

类路径java.util.stream.MatchOps.MatchOp

privatestaticfinalclassMatchOp<T>implementsTerminalOp<T,Boolean>{
privatefinalStreamShapeinputShape;
finalMatchKindmatchKind;
finalSupplier>sinkSupplier;

MatchOp(StreamShapeshape,MatchKindmatchKind,Supplier>sinkSupplier){
this.inputShape=shape;
this.matchKind=matchKind;
this.sinkSupplier=sinkSupplier;
}

@Override
publicintgetOpFlags(){
returnStreamOpFlag.IS_SHORT_CIRCUIT|StreamOpFlag.NOT_ORDERED;
}

@Override
publicStreamShapeinputShape(){
returninputShape;
}

//使用内置的sinkSupplier获取Sink
@Override
publicBooleanevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(sinkSupplier.get(),spliterator).getAndClearState();
}

@Override
publicBooleanevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
returnnewMatchTask<>(this,helper,spliterator).invoke();
}
}

FindOp

类路径java.util.stream.FindOps.FindOp

privatestaticfinalclassFindOp<T,O>implementsTerminalOp<T,O>{
privatefinalStreamShapeshape;
finalbooleanmustFindFirst;
finalOemptyValue;
finalPredicatepresentPredicate;
finalSupplier>sinkSupplier;

FindOp(booleanmustFindFirst,
StreamShapeshape,
OemptyValue,
PredicatepresentPredicate,
Supplier>sinkSupplier){
this.mustFindFirst=mustFindFirst;
this.shape=shape;
this.emptyValue=emptyValue;
this.presentPredicate=presentPredicate;
this.sinkSupplier=sinkSupplier;
}

@Override
publicintgetOpFlags(){
returnStreamOpFlag.IS_SHORT_CIRCUIT|(mustFindFirst?0:StreamOpFlag.NOT_ORDERED);
}

@Override
publicStreamShapeinputShape(){
returnshape;
}

//通过内置sinkSupplier获取Sink
@Override
publicOevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
Oresult=helper.wrapAndCopyInto(sinkSupplier.get(),spliterator).get();
returnresult!=null?result:emptyValue;
}

@Override
publicOevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
returnnewFindTask<>(this,helper,spliterator).invoke();
}
}

ForEachOp

类路径java.util.stream.ForEachOps.ForEachOp

staticabstractclassForEachOp<T>implementsTerminalOp<T,Void>,TerminalSink<T,Void>{
privatefinalbooleanordered;

protectedForEachOp(booleanordered){
this.ordered=ordered;
}

@Override
publicintgetOpFlags(){
returnordered?0:StreamOpFlag.NOT_ORDERED;
}

//自己实现了Sink
@Override
publicVoidevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(this,spliterator).get();
}

@Override
publicVoidevaluateParallel(PipelineHelperhelper,Spliteratorspliterator){
if(ordered)
newForEachOrderedTask<>(helper,spliterator,this).invoke();
else
newForEachTask<>(helper,spliterator,helper.wrapSink(this)).invoke();
returnnull;
}

@Override
publicVoidget(){
returnnull;
}

staticfinalclassOfRef<T>extendsForEachOp<T>{
finalConsumersuperT>consumer;

OfRef(ConsumersuperT>consumer,booleanordered){
super(ordered);
this.consumer=consumer;
}

@Override
publicvoidaccept(Tt){
consumer.accept(t);
}
}
...
}

Sink

类路径java.util.stream.Sink

interfaceSink<T>extendsConsumer<T>{
//开始遍历元素之前调用该方法,通知Sink做好准备。
defaultvoidbegin(longsize){}
//所有元素遍历完成之后调用,通知Sink没有更多的元素了。
defaultvoidend(){}
//是否可以结束操作,可以让短路操作尽早结束。
defaultbooleancancellationRequested(){
returnfalse;
}
//遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(Tt)方法就行了。
voidaccept(Tt);
}

这里Sink的子类实现中分为两种:中间操作匿名实现ChainedReferenceTerminalOp子类所提供的Sink。

ChainedReference

类路径java.util.stream.Sink.ChainedReference,这里是中间操作的默认模板父类

staticabstractclassChainedReference<T,E_OUT>implementsSink<T>{
protectedfinalSinksuperE_OUT>downstream;

publicChainedReference(SinksuperE_OUT>downstream){
this.downstream=Objects.requireNonNull(downstream);
}

@Override
publicvoidbegin(longsize){
downstream.begin(size);
}

@Override
publicvoidend(){
downstream.end();
}

@Override
publicbooleancancellationRequested(){
returndownstream.cancellationRequested();
}
}

在上述的中间操作管道流中都是通过匿名类继承ChainedReference实现onWrapSink(int, Sink)返回一个指定操作的Sink。

TerminalSink

这里为什么讲提供呢?这是因为不同的实现TerminalOp的子类中在实现java.util.stream.TerminalOp#evaluateSequential中都是通过helper.wrapAndCopyInto(TerminalOp子类实现提供的Sink, spliterator)中通过参数传递的方式提供的,不同的子类传递的方式不一样所以此处用了一个提供Sink

由ReduceOps中实现TerminalOp所提供的ReducingSink,它是由匿名类实现java.util.stream.ReduceOps.ReduceOp#makeSink来交付给helper.wrapAndCopyInto(makeSink(), spliterator)的。

publicstaticTerminalOpmakeRef(Useed,BiFunctionsuperT,U>reducer,BinaryOperatorcombiner){
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
classReducingSinkextendsBox<U>implementsAccumulatingSink<T,U,ReducingSink>{
@Override
publicvoidbegin(longsize){
state=seed;
}

@Override
publicvoidaccept(Tt){
state=reducer.apply(state,t);
}

@Override
publicvoidcombine(ReducingSinkother){
state=combiner.apply(state,other.state);
}
}
returnnewReduceOp(StreamShape.REFERENCE){
@Override
publicReducingSinkmakeSink(){
returnnewReducingSink();
}
};
}

ForEachOps中实现TerminalOp所提供的是this,它的提供方式就是通过this交付给helper.wrapAndCopyInto(this, spliterator)

//这里ForEachOp自己通过TerminalSink间接的实现了Sink
staticabstractclassForEachOp<T>implementsTerminalOp<T,Void>,TerminalSink<T,Void>{
@Override
publicVoidevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(this,spliterator).get();
}
}

由MatchOps中实现TerminalOp所提供的sinkSupplier通过构造函数由外部赋值,通过Supplier接口的get()来交付给helper.wrapAndCopyInto(sinkSupplier.get(), spliterator)

privatestaticfinalclassMatchOp<T>implementsTerminalOp<T,Boolean>{
finalSupplier>sinkSupplier;

@Override
publicBooleanevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
returnhelper.wrapAndCopyInto(sinkSupplier.get(),spliterator).getAndClearState();
}
}

由FindOps中实现TerminalOp所提供的与上述MatchOps是一致的

privatestaticfinalclassFindOp<T,O>implementsTerminalOp<T,O>{
finalSupplier>sinkSupplier;

@Override
publicOevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){
Oresult=helper.wrapAndCopyInto(sinkSupplier.get(),spliterator).get();
returnresult!=null?result:emptyValue;
}
}

Collector

在Collector中有以下几个实现接口:

  • Supplier:结果类型的提供器。
  • BiConsumer:将元素放入结果的累加器。
  • BinaryOperator:合并部分结果的组合器。
  • Function:对结果类型转换为最终结果类型的转换器
  • Set:保存Collector特征的集合

并行流

前述都是基于串行流的讲解,其实并行流也是基于上述的helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator)这个方法上面做的一层基于ForkJoinTask多线程框架的封装。

ForkJoinTask

ForkJoin框架的思想就是分而治之,它将一个大任务切割为多个小任务这个过程称为fork,将每个任务的执行的结果进行汇总的过程称为join。ForkJoin框架相关的接口关系图如下(图是盗的):

e1fb7a7a-66e6-11ed-8abf-dac502259ad0.png

AbstractTask

类路径java.util.stream.AbstractTask,AbstractTask继承了在JUC中已经封装好的ForkJoinTask抽象子类java.util.concurrent.CountedCompleter

此类基于CountedCompleter ,它是fork-join任务的一种形式,其中每个任务都有未完成子代的信号量计数,并且该任务隐式完成并在其最后一个子代完成时得到通知。 内部节点任务可能会覆盖CountedCompleteronCompletion方法,以将子任务的结果合并到当前任务的结果中。

拆分和设置子任务链接是由内部节点的compute()完成的。 在叶节点的compute()时间,可以确保将为所有子代设置父代的子代相关字段(包括父代子代的同级链接)。

例如,执行减少任务的任务将覆盖doLeaf()以使用Spliterator对该叶节点的块执行减少Spliterator ,并覆盖onCompletion()以合并内部节点的子任务的结果:

@Override
protectedReduceTaskmakeChild(Spliteratorspliterator){
//返回一个ForkJoinTask任务
returnnewReduceTask<>(this,spliterator);
}

@Override
protectedSdoLeaf(){
//其他实现大同小异
returnhelper.wrapAndCopyInto(op.makeSink(),spliterator);
}

@Override
publicvoidonCompletion(CountedCompletercaller){
//非叶子节点进行结果组合
if(!isLeaf()){
SleftResult=leftChild.getLocalResult();
leftResult.combine(rightChild.getLocalResult());
setLocalResult(leftResult);
}
//GCspliterator,leftandrightchild
super.onCompletion(caller);
}

AbstractTask封装了分片任务的算法模板,通过是SpliteratortrySplit()方法来实现分片的细节,详细算法源码如下(类路径:java.util.stream.AbstractTask#compute):

@Override
publicvoidcompute(){
//将当前这个spliterator作为右节点(此时为root节点)
Spliteratorrs=spliterator,ls;
//评估任务的大小
longsizeEstimate=rs.estimateSize();
//获取任务阈值
longsizeThreshold=getTargetSize(sizeEstimate);
booleanforkRight=false;
@SuppressWarnings("unchecked")Ktask=(K)this;
//细节不多赘述,下面我用图来讲解算法
/**
*根节点指定为:右边节点
*root
*split()
*leftright
*left.fork()
*split()
*lr
*rs=ls
*right.fork()
*split()
*lr
*l.fork()
*/
while(sizeEstimate>sizeThreshold&&(ls=rs.trySplit())!=null){
KleftChild,rightChild,taskToFork;
task.leftChild=leftChild=task.makeChild(ls);
task.rightChild=rightChild=task.makeChild(rs);
task.setPendingCount(1);
if(forkRight){
forkRight=false;
//左右节点切换进行fork和split
rs=ls;
task=leftChild;
taskToFork=rightChild;
}
else{
forkRight=true;
task=rightChild;
taskToFork=leftChild;
}
//fork任务加入队列中去
taskToFork.fork();
sizeEstimate=rs.estimateSize();
}
//将执行doLeaf底层就是单个串行流的操作
task.setLocalResult(task.doLeaf());
//将结果组合成一个最终结果
task.tryComplete();
}

AbstractTask执行与分片流程图如下:

e20b4e46-66e6-11ed-8abf-dac502259ad0.png

到这里Stream流的相关知识介绍到这,这里附上一副总体图来加深下印象

e217cc8e-66e6-11ed-8abf-dac502259ad0.png

审核编辑 :李倩


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表德赢Vwin官网 网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接口
    +关注

    关注

    33

    文章

    8575

    浏览量

    151014
  • JAVA
    +关注

    关注

    19

    文章

    2966

    浏览量

    104700
  • 函数
    +关注

    关注

    3

    文章

    4327

    浏览量

    62569

原文标题:还有人不知道 Java 8 Stream流底层原理?

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Java 23功能介绍

    Java 23 包含全新和更新的 Java 语言功能、核心 API 以及 JVM,同时适合新的 Java 开发者和高级开发者。从 IntelliJ IDEA 2024.2 开始已支持 Java
    的头像 发表于 12-04 10:02 192次阅读
    <b class='flag-5'>Java</b> 23功能介绍

    Java集合API的改进介绍

    解答这些问题。 我们将逐步学习 Java 集合类的优化过程,并按版本逐一对比分析。主要讨论的焦点将包括 JDK 1.0、1.2、1.4、1.5、1.6、1.8、9、10、11 和 21 版本的 Java 集合功能 Java 集合
    的头像 发表于 11-22 11:12 171次阅读
    <b class='flag-5'>Java</b>集合API的改进介绍

    Java中时间戳的使用

    Java中时间戳的使用
    的头像 发表于 11-06 16:04 187次阅读
    <b class='flag-5'>Java</b>中时间戳的使用

    java反编译能拿到源码吗

    Java反编译是一种将编译后的Java字节码(.class文件)转换回Java源代码的过程。虽然反编译可以帮助理解代码的逻辑和结构,但它并不总是能完美地还原原始源代码。反编译工具通常会产生与原始代码
    的头像 发表于 09-02 11:03 941次阅读

    如何成功的烘烤微控SU-8光刻胶?

    在微控PDMS芯片加工的过程中,需要使用烘胶台或者烤胶设备对SU-8光刻胶或PDMS聚合物进行烘烤。SU-8光刻胶的烘烤通常需要进行2-3次。本文简要介绍SU-8光刻胶烘烤的注意事项
    的头像 发表于 08-27 15:54 255次阅读

    华纳云:java web和java有什么区别java web和java有什么区别

    Java Web和Java是两个不同的概念,它们在功能、用途和实现方式上存在一些区别,下面将详细介绍它们之间的区别。 1. 功能和用途: – Java是一种编程语言,它提供了一种用于开发各种应用程序
    的头像 发表于 07-16 13:35 783次阅读
    华纳云:<b class='flag-5'>java</b> web和<b class='flag-5'>java</b>有什么区别<b class='flag-5'>java</b> web和<b class='flag-5'>java</b>有什么区别

    ESP-ADF下的i2s_stream是否可以全双工工作?

    请问各位朋友: ESP-ADF下的i2s_stream是否可以全双工工作,我看了其下的所有关于I2S的例子程序和create_i2s_stream函数的源码,都只能单工工作(要么读要么写),我要自行修改与i2s_stream相关
    发表于 06-28 06:59

    如何在STM32F439 DMA中断中区分是哪个通道产生的中断?

    STM32F439有两个DMA控制器DMA1和DMA2,每个控制器有8,每个8个通道,我做的是采用串口1DMA中断接收数据,DMA2的
    发表于 05-17 08:20

    Oracle确认Java/JDK 11官方支持延长至2032年1月 

    此外,Solaris操作系统上的Java SE 8Java SE 11的官方支持也同步延期至2030年12月及2032年1月,进一步延长了该平台上的Java服务周期。
    的头像 发表于 05-16 15:57 1188次阅读

    已经安装了Java,且依然提示安装Java是为什么?

    我已经在机器上安装了最新版的 Java 10,打开 Cube 却得到要求安装 Java 1.7.0_45 的提示。何解?Eclipse CDT 依赖 Java,不可卸载重装。
    发表于 04-26 06:23

    使用STM32F429的DMA多个外设都使用到同样的DMA_STREAM的时候,就会发生冲突怎么解决?

    最近在使用STM32F429的DMA时候,发现一个问题,当多个外设都使用到同样的DMA_STREAM的时候,就会发生冲突(后面配置的DMA可用,前面配置的不能用),我用的USART6_TX用
    发表于 04-24 07:13

    stm32F429串口采用DMA方式发送,数据使能失败的原因?

    DMA1 时钟稳定 DMA_DeInit(DMA2_Stream7);// 复位初始化DMA数据 while (DMA_GetCmdStatus(DMA2_Stream7) != DISABLE
    发表于 04-17 07:05

    java实现多线程的几种方式

    了多种实现多线程的方式,本文将详细介绍以下几种方式: 1.继承Thread类 2.实现Runnable接口 3.Callable和Future 4.线程池 5.Java 8
    的头像 发表于 03-14 16:55 686次阅读

    Oracle 2024年Java发展蓝图分析

    Oracle 的 Java 开发者布道师 Nicolai Parlog 于近日发布一段视频,介绍了 2024 年的 Java 工作规划。
    的头像 发表于 01-26 14:27 1326次阅读

    XMC Pinout Tool是否可以不受限制地使用免费的Java版本运行?

    ,我想问一下 Java 许可证是否适用于例如 Java Update 8 381 包含在 Pinout Tool 中,因为此 Java 版本通常会产生许可费用。
    发表于 01-22 06:18