Flink - StreamJob
Published: 2019-06-26


 

Let's start with the simplest example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> stream = env.addSource(...);

stream
    .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {...})
    .addSink(new SinkFunction<Tuple2<String, Integer>>() {...});

env.execute();

 

DataStream

env.addSource

The first step is to create the source:

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

    if (typeInfo == null) { // no typeInfo given: infer the type
        if (function instanceof ResultTypeQueryable) {
            typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        } else {
            try {
                typeInfo = TypeExtractor.createTypeInfo(
                        SourceFunction.class,
                        function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
            }
        }
    }

    boolean isParallel = function instanceof ParallelSourceFunction;

    clean(function);
    StreamSource<OUT, ?> sourceOperator;
    if (function instanceof StoppableFunction) {
        sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
    } else {
        sourceOperator = new StreamSource<>(function); // wrap the SourceFunction in a StreamSource
    }

    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); // wrap the StreamSource in a DataStreamSource
}

 

StreamSource is a kind of StreamOperator; its core logic lives in run():

public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    private transient SourceFunction.SourceContext<OUT> ctx; // used to collect output
    private transient volatile boolean canceledOrStopped = false;

    public StreamSource(SRC sourceFunction) {
        super(sourceFunction);
        this.chainingStrategy = ChainingStrategy.HEAD; // a source can only be the head of a chain
    }

    public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        LatencyMarksEmitter latencyEmitter = null; // latency-marker handling
        if (getExecutionConfig().isLatencyTrackingEnabled()) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                getExecutionConfig().getLatencyTrackingInterval(),
                getOperatorConfig().getVertexID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);

        try {
            // run the user's SourceFunction; an unbounded source keeps emitting and never returns
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK); // emit the maximum watermark
            }
        } finally {
            // cleanup code omitted in this excerpt
        }
    }
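The end-of-input rule in run() can be illustrated without Flink. The following is a minimal sketch under stated assumptions: RecordingContext and SimpleSource are hypothetical stand-ins for SourceContext and SourceFunction, and Long.MAX_VALUE plays the role of Watermark.MAX_WATERMARK (whose timestamp is Long.MAX_VALUE). It reproduces only the control flow: run the user function to completion, then emit the final watermark unless the source was cancelled or stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of StreamSource.run(); these are not Flink's classes.
class FiniteSourceDemo {

    // Stand-in for SourceFunction.SourceContext: records emitted records and watermarks as strings.
    static final class RecordingContext {
        final List<String> emitted = new ArrayList<>();

        void collect(String record) { emitted.add("record:" + record); }

        void emitWatermark(long timestamp) { emitted.add("watermark:" + timestamp); }
    }

    // Stand-in for SourceFunction: run() returns only when the input is exhausted or cancelled.
    interface SimpleSource {
        void run(RecordingContext ctx);
    }

    // Mirrors StreamSource.run(): run the user function, then emit the maximum
    // watermark unless the source was cancelled or stopped.
    static List<String> runSource(SimpleSource source, boolean canceledOrStopped) {
        RecordingContext ctx = new RecordingContext();
        source.run(ctx);
        if (!canceledOrStopped) {
            ctx.emitWatermark(Long.MAX_VALUE); // plays the role of Watermark.MAX_WATERMARK
        }
        return ctx.emitted;
    }

    public static void main(String[] args) {
        SimpleSource finite = ctx -> {
            for (int i = 0; i < 3; i++) {
                ctx.collect("e" + i);
            }
        };
        System.out.println(runSource(finite, false));
    }
}
```

For the finite source in main this prints the three records followed by a watermark of 9223372036854775807; with canceledOrStopped set to true the final watermark is skipped.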

 

But addSource has to return a DataStream, so the StreamSource is wrapped in a DataStreamSource:

public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    boolean isParallel;

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }

SourceTransformation can be thought of as a wrapper around the StreamOperator.

public class SingleOutputStreamOperator<T> extends DataStream<T> {

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }

A DataStream, in turn, is a wrapper around a StreamTransformation.

SingleOutputStreamOperator is a baffling name: the class inherits from DataStream, yet its name says Operator.

 

 

The map operation

In DataStream:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);

    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

 

Here, StreamMap is the StreamOperator:

public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS; // a map can always be chained
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // map simply runs the user's MapFunction and replaces the value of the original record
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

 

map then calls transform:

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

Two layers of wrapping happen here: operator -> transformation -> DataStream.

Finally it calls getExecutionEnvironment().addOperator(resultTransform):

protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

public void addOperator(StreamTransformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}

This registers the StreamTransformation in the transformations list, which is used later.
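To make the layering concrete, here is a hypothetical miniature (none of these classes exist in Flink) of the operator -> transformation -> DataStream wrapping and of the registration in transformations. It also shows a point that comes up later: the source transformation is never registered itself, yet it remains reachable from the registered ones through their input references.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the operator -> transformation -> DataStream layering.
// None of these classes are Flink's; they only mirror the relationships described above.
class WrappingDemo {

    static final class Transformation {
        final String name;
        final Transformation input; // null for a source

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    static final class Env {
        final List<Transformation> transformations = new ArrayList<>();

        void addOperator(Transformation t) {
            transformations.add(t);
        }

        // Like addSource: the source transformation is wrapped but NOT registered.
        Stream addSource(String name) {
            return new Stream(this, new Transformation(name, null));
        }
    }

    static final class Stream {
        final Env env;
        final Transformation transformation; // a stream is just a wrapper around a transformation

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        // Like transform(): wrap in a transformation, register it, wrap that in a new stream.
        Stream map(String name) {
            Transformation t = new Transformation(name, transformation);
            env.addOperator(t);
            return new Stream(env, t);
        }

        // Like addSink(): registered, but no further stream is returned.
        void addSink(String name) {
            env.addOperator(new Transformation(name, transformation));
        }
    }

    static List<String> registeredNames(Env env) {
        List<String> names = new ArrayList<>();
        for (Transformation t : env.transformations) {
            names.add(t.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Env env = new Env();
        env.addSource("Source").map("Map").addSink("Sink");
        System.out.println(registeredNames(env));
        // The source is still reachable from the sink by following input references.
        Transformation sink = env.transformations.get(1);
        System.out.println(sink.input.input.name);
    }
}
```

Running main prints [Map, Sink] and then Source.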

 

sink

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    // configure the type if needed
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}

 

StreamSink is the operator:

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {

    public StreamSink(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS; // a sink can always be chained as well
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        userFunction.invoke(element.getValue());
    }

    @Override
    protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
        // all operators are tracking latencies
        this.latencyGauge.reportLatency(maker, true);

        // sinks don't forward latency markers
    }
}

 

DataStreamSink, however, is not a DataStream. It is a peer class of DataStream, because its job is likewise to wrap a transformation, in this case a SinkTransformation:

public class DataStreamSink<T> {

    SinkTransformation<T> transformation;

    @SuppressWarnings("unchecked")
    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
    }

Like the map transformation, it is ultimately registered with the execution environment:

getExecutionEnvironment().addOperator(sink.getTransformation());

 

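In the end, the DataStream calls build up a tree of StreamTransformations, in which each transformation references its input transformation(s).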

 

StreamGraph

Now execution begins:

env.execute

public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    transformations.clear();
    return executeRemotely(streamGraph);
}

getStreamGraph() in turn calls StreamGraphGenerator.generate, passing in the transformations list from earlier, in which all operators and sinks were registered:

public StreamGraph getStreamGraph() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return StreamGraphGenerator.generate(this, transformations);
}

 

StreamGraphGenerator

public class StreamGraphGenerator {

    // The StreamGraph that is being built, this is initialized at the beginning.
    private StreamGraph streamGraph;

    private final StreamExecutionEnvironment env;

    // Keep track of which Transforms we have already transformed, this is necessary because
    // we have loops, i.e. feedback edges.
    // (transformed nodes are remembered so that cycles are not processed twice)
    private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;

    /**
     * Private constructor. The generator should only be invoked using {@link #generate}.
     */
    private StreamGraphGenerator(StreamExecutionEnvironment env) {
        this.streamGraph = new StreamGraph(env);
        this.streamGraph.setChaining(env.isChainingEnabled());
        this.streamGraph.setStateBackend(env.getStateBackend());
        this.env = env;
        this.alreadyTransformed = new HashMap<>();
    }

    /**
     * Generates a {@code StreamGraph} by traversing the graph of {@code StreamTransformations}
     * starting from the given transformations.
     *
     * @param env The {@code StreamExecutionEnvironment} that is used to set some parameters of the
     *            job
     * @param transformations The transformations starting from which to transform the graph
     *
     * @return The generated {@code StreamGraph}
     */
    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }

    /**
     * This starts the actual transformation, beginning from the sinks.
     */
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }

transform is called on each StreamTransformation:

private Collection<Integer> transform(StreamTransformation<?> transform) {

    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform); // already transformed: return the cached ids
    }

    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }

    // remember the result so the same transformation is not processed twice
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}

Our example uses OneInputTransformation, SourceTransformation and SinkTransformation.

transformOnInputTransform

/**
 * Transforms a {@code OneInputTransformation}.
 *
 * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
 * wires the inputs to this new node.
 */
private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {

    // Transform the input recursively. This is why the source never had to be added to
    // transformations: it is reached through this recursion.
    Collection<Integer> inputIds = transform(transform.getInput());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform); // already transformed: return directly
    }

    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds); // determine the slot sharing group

    streamGraph.addOperator(transform.getId(), // add the operator (a StreamNode)
            slotSharingGroup,
            transform.getOperator(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    streamGraph.setParallelism(transform.getId(), transform.getParallelism());
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0); // add an edge from each input
    }

    return Collections.singleton(transform.getId());
}

What does the transformation id represent?

public abstract class StreamTransformation<T> {

    // This is used to assign a unique ID to every StreamTransformation
    protected static Integer idCounter = 0;

    public static int getNewNodeId() {
        idCounter++;
        return idCounter;
    }

    protected final int id;

    public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
        this.id = getNewNodeId();

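The id is an auto-incrementing value whose counter starts at 0. The counter is incremented before it is returned, so the first transformation gets id 1.

Because the counter is a static class field, the id depends on the order in which StreamTransformation objects are created.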
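The effect of a static, pre-incremented counter can be seen in a few lines. This is a hypothetical sketch mirroring the shape of StreamTransformation.getNewNodeId(); the class and field names are illustrative, not Flink's.

```java
// Hypothetical sketch of a static, increment-then-return id counter like
// StreamTransformation.getNewNodeId(); names are illustrative only.
class IdCounterDemo {

    static final class Transformation {
        private static int idCounter = 0; // class-level state shared by every instance

        static int getNewNodeId() {
            idCounter++;      // increment first...
            return idCounter; // ...then return, so the first id handed out is 1
        }

        final int id;
        final String name;

        Transformation(String name) {
            this.name = name;
            this.id = getNewNodeId(); // reflects creation order, not position in the graph
        }
    }

    public static void main(String[] args) {
        Transformation first = new Transformation("source");
        Transformation second = new Transformation("map");
        System.out.println(first.name + "=" + first.id + ", " + second.name + "=" + second.id);
    }
}
```

Run in a fresh JVM, main prints source=1, map=2. Because the counter lives on the class, any other object created earlier in the same JVM would shift these numbers.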

 

slotSharingGroup: at this stage it is only a name, hence a String:

public abstract class StreamTransformation<T> {

    private String slotSharingGroup;

    public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
        this.slotSharingGroup = null;

By default slotSharingGroup is null, i.e. unset.

 

It can be set on both DataStreamSink and SingleOutputStreamOperator, for example in DataStreamSink:

/**
 * Sets the slot sharing group of this operation. Parallel instances of
 * operations that are in the same slot sharing group will be co-located in the same
 * TaskManager slot, if possible.
 *
 * <p>Operations inherit the slot sharing group of input operations if all input operations
 * are in the same slot sharing group and no slot sharing group was explicitly specified.
 *
 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
 * the default group explicitly by setting the slot sharing group to {@code "default"}.
 *
 * @param slotSharingGroup The slot sharing group name.
 */
@PublicEvolving
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
    transformation.setSlotSharingGroup(slotSharingGroup);
    return this;
}

Users can set it directly through the API:

someStream.filter(...).slotSharingGroup("group1")

 

determineSlotSharingGroup

/**
 * Determines the slot sharing group for an operation based on the slot sharing group set by
 * the user and the slot sharing groups of the inputs.
 *
 * <p>If the user specifies a group name, this is taken as is. If nothing is specified and
 * the input operations all have the same group name then this name is taken. Otherwise the
 * default group is chosen.
 *
 * @param specifiedGroup The group specified by the user.
 * @param inputIds The IDs of the input operations.
 */
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
    if (specifiedGroup != null) { // a user-specified group always wins
        return specifiedGroup;
    } else {
        String inputGroup = null;
        for (int id: inputIds) { // infer from the inputs' slot sharing groups
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
            if (inputGroup == null) {
                inputGroup = inputGroupCandidate; // the first input initializes the candidate
            } else if (!inputGroup.equals(inputGroupCandidate)) {
                // if all inputs share the same group, use it; otherwise fall back to "default"
                return "default";
            }
        }
        return inputGroup == null ? "default" : inputGroup; // no inputs: use "default"
    }
}

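If the user does not specify a group, every operator ends up in the "default" slot sharing group: a source has no inputs and therefore gets "default", and every downstream operator inherits it from its inputs.

If the user does specify one, the user's choice takes precedence.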
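The rule can be exercised in isolation. This is a standalone restatement of the logic above, assuming a plain Map from node id to group name in place of streamGraph.getSlotSharingGroup(id); the ids and group names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Standalone restatement of determineSlotSharingGroup. The Map (node id -> group name)
// is a hypothetical stand-in for StreamGraph.getSlotSharingGroup(id).
class SlotSharingDemo {

    static String determineSlotSharingGroup(String specifiedGroup,
                                            Collection<Integer> inputIds,
                                            Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup; // an explicit user setting always wins
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return "default"; // the inputs disagree
            }
        }
        return inputGroup == null ? "default" : inputGroup; // no inputs, e.g. a source
    }

    public static void main(String[] args) {
        Map<Integer, String> groups = new HashMap<>();
        groups.put(1, "default");
        groups.put(2, "group1");
        groups.put(3, "group1");

        System.out.println(determineSlotSharingGroup(null, Collections.<Integer>emptyList(), groups)); // source
        System.out.println(determineSlotSharingGroup(null, Arrays.asList(2, 3), groups));              // inherited
        System.out.println(determineSlotSharingGroup(null, Arrays.asList(1, 2), groups));              // mixed inputs
        System.out.println(determineSlotSharingGroup("g2", Arrays.asList(1), groups));                 // explicit
    }
}
```

The four calls print default, group1, default and g2: a node with no inputs falls back to "default", matching inputs are inherited, mixed inputs fall back, and an explicit name overrides everything.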

 

streamGraph.addOperator

public <IN, OUT> void addOperator(
        Integer vertexID,
        String slotSharingGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    if (operatorObject instanceof StoppableStreamSource) {
        addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
    } else if (operatorObject instanceof StreamSource) {
        addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
    } else {
        addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
    }

Note that the vertexID parameter is simply transform.getId().

protected StreamNode addNode(Integer vertexID,
        String slotSharingGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperator<?> operatorObject,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) { // this vertexID already exists
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    StreamNode vertex = new StreamNode(environment,
            vertexID,
            slotSharingGroup,
            operatorObject,
            operatorName,
            new ArrayList<OutputSelector<?>>(),
            vertexClass);

    streamNodes.put(vertexID, vertex);

    return vertex;
}

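A StreamNode is essentially a wrapper around a transformation.

The difference is that not every transformation produces a StreamNode; virtual ones such as partition and select do not.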

 

streamGraph.addEdge

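Among transformations, the relationships are expressed by each transformation recording its input transformation, which is then followed recursively.

The StreamGraph adds an explicit edge abstraction instead: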

streamGraph.addEdge(inputId, transform.getId(), 0);

public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(upStreamVertexID,
            downStreamVertexID,
            typeNumber,
            null,
            new ArrayList<String>());
}

 

private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames) {

    if (virtualSelectNodes.containsKey(upStreamVertexID)) { // upstream is a virtual select node
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0; // not a real node, so use its parent as the upstream node
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1; // turn the virtual select node into outputNames
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); // recurse
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { // upstream is a virtual partition node
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1; // turn the virtual partition node into a partitioner
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); // recurse
    } else {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // (key logic: this decides the default partitioner)
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>(); // same parallelism: forward
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>(); // different parallelism: rebalance
        }

        if (partitioner instanceof ForwardPartitioner) { // forward was requested but the parallelism differs: throw
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner); // create the StreamEdge

        // connect the upstream and downstream StreamNodes through the StreamEdge
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
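The default-partitioner decision at the end of addEdgeInternal boils down to a small function. In this hypothetical sketch a Partitioner enum stands in for Flink's ForwardPartitioner, RebalancePartitioner and the other partitioner classes, and the error message is shortened; only the decision logic mirrors the code above.

```java
// Hypothetical distillation of the partitioner choice in addEdgeInternal:
// no partitioner + equal parallelism -> FORWARD; no partitioner + different parallelism -> REBALANCE;
// an explicit FORWARD across a parallelism change is rejected.
class DefaultPartitionerDemo {

    enum Partitioner { FORWARD, REBALANCE, HASH, BROADCAST }

    static Partitioner choosePartitioner(Partitioner requested, int upstreamParallelism, int downstreamParallelism) {
        Partitioner partitioner = requested;
        if (partitioner == null && upstreamParallelism == downstreamParallelism) {
            partitioner = Partitioner.FORWARD;
        } else if (partitioner == null) {
            partitioner = Partitioner.REBALANCE;
        }
        if (partitioner == Partitioner.FORWARD && upstreamParallelism != downstreamParallelism) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism: "
                            + upstreamParallelism + " -> " + downstreamParallelism);
        }
        return partitioner;
    }

    public static void main(String[] args) {
        System.out.println(choosePartitioner(null, 4, 4));             // FORWARD
        System.out.println(choosePartitioner(null, 1, 4));             // REBALANCE
        System.out.println(choosePartitioner(Partitioner.HASH, 1, 4)); // explicit choice is kept
        try {
            choosePartitioner(Partitioner.FORWARD, 1, 4);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The asymmetry is deliberate: an explicit partitioner other than forward is always accepted, whereas forward is only legal when both sides have the same number of subtasks.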

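So virtual nodes such as select and partition are folded into the StreamEdge (as outputNames and a partitioner) and never become real StreamNodes.

The following diagram illustrates this: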

/**
 * The following graph of {@code StreamTransformations}:
 *
 * <pre>{@code
 *   Source              Source
 *     +                   +
 *     |                   |
 *     v                   v
 * Rebalance          HashPartition
 *     +                   +
 *     |                   |
 *     |                   |
 *     +------>Union<------+
 *               +
 *               |
 *               v
 *             Split
 *               +
 *               |
 *               v
 *             Select
 *               +
 *               v
 *              Map
 *               +
 *               |
 *               v
 *             Sink
 * }</pre>
 *
 * <p>Would result in this graph of operations at runtime:
 *
 * <pre>{@code
 *   Source              Source
 *     +                   +
 *     |                   |
 *     |                   |
 *     +------->Map<-------+
 *               +
 *               |
 *               v
 *              Sink
 * }</pre>
 */

 

The handling of SourceTransformation and SinkTransformation is largely analogous, so I won't go into detail.

Let's look at how virtual nodes are handled.

transformPartition

private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    StreamTransformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    Collection<Integer> transformedIds = transform(input); // recursively transform the input and collect its ids
    for (Integer transformedId: transformedIds) {
        int virtualId = StreamTransformation.getNewNodeId(); // allocate this node's own id
        // only registered as a virtual partition node; no real StreamNode is created
        streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
        resultIds.add(virtualId);
    }

    return resultIds;
}

 

transformUnion

private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
    List<StreamTransformation<T>> inputs = union.getInputs();
    List<Integer> resultIds = new ArrayList<>();

    for (StreamTransformation<T> input: inputs) {
        resultIds.addAll(transform(input)); // recurse into each input
    }

    return resultIds;
}

It simply merges the ids returned by its inputs, so a union never becomes a StreamNode either.
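Putting the two together, here is a hypothetical model (not Flink code) of how partitions and unions disappear from the graph: a partition only registers virtualId -> (real upstream id, partitioner), a union only concatenates id lists, and addEdge follows virtual ids back to a real node before recording the edge. Select nodes are left out, and "default" marks an edge whose partitioner would still be chosen by the forward/rebalance rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of virtual partition nodes and unions; only the partition branch of
// addEdgeInternal is mirrored.
class VirtualNodeDemo {

    static final class VirtualPartition {
        final int upstreamId;
        final String partitioner;

        VirtualPartition(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, VirtualPartition> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>(); // "from->to [partitioner]"
    private int idCounter = 0;

    int newId() {
        return ++idCounter;
    }

    // Like transformPartition: one virtual id per input id, no real node is created.
    List<Integer> partition(List<Integer> inputIds, String partitioner) {
        List<Integer> result = new ArrayList<>();
        for (int inputId : inputIds) {
            int virtualId = newId();
            virtualPartitionNodes.put(virtualId, new VirtualPartition(inputId, partitioner));
            result.add(virtualId);
        }
        return result;
    }

    // Like transformUnion: just merge the inputs' ids.
    static List<Integer> union(List<Integer> a, List<Integer> b) {
        List<Integer> result = new ArrayList<>(a);
        result.addAll(b);
        return result;
    }

    // Follow virtual ids back to a real node, picking up the partitioner on the way.
    void addEdge(int upstreamId, int downstreamId, String partitioner) {
        VirtualPartition v = virtualPartitionNodes.get(upstreamId);
        if (v != null) {
            addEdge(v.upstreamId, downstreamId, partitioner == null ? v.partitioner : partitioner);
        } else {
            edges.add(upstreamId + "->" + downstreamId + " [" + (partitioner == null ? "default" : partitioner) + "]");
        }
    }

    public static void main(String[] args) {
        VirtualNodeDemo g = new VirtualNodeDemo();
        int source1 = g.newId();
        int source2 = g.newId();
        List<Integer> left = g.partition(Arrays.asList(source1), "rebalance");
        List<Integer> right = g.partition(Arrays.asList(source2), "hash");
        int map = g.newId();
        for (int input : union(left, right)) {
            g.addEdge(input, map, null);
        }
        System.out.println(g.edges);
    }
}
```

As in the diagram, the two sources connect straight to the Map: main prints [1->5 [rebalance], 2->5 [hash]], even though ids 3 and 4 were consumed by the virtual partition nodes.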

 

JobGraph

 

env.execute

From there, execution continues in executeRemotely:

executeRemotely

protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
    ClusterClient client;
    // ... (client creation omitted in this excerpt)
    try {
        return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
    }
    // ... (catch/finally blocks omitted in this excerpt)
}

 

ClusterClient.run

public JobSubmissionResult run(FlinkPlan compiledPlan,
        List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
        throws ProgramInvocationException {
    JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
    return submitJob(job, classLoader);
}

 

private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
    JobGraph job;
    if (optPlan instanceof StreamingPlan) { // streaming job plan
        job = ((StreamingPlan) optPlan).getJobGraph();
        job.setSavepointRestoreSettings(savepointSettings);
    } else { // batch job plan
        JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
        job = gen.compileJobGraph((OptimizedPlan) optPlan);
    }

    for (URL jar : jarFiles) {
        try {
            job.addJar(new Path(jar.toURI())); // add the user jar
        } catch (URISyntaxException e) {
            throw new RuntimeException("URL is invalid. This should not happen.", e);
        }
    }

    job.setClasspaths(classpaths); // add the classpaths

    return job;
}

 

In the streaming case, this calls:

((StreamingPlan) optPlan).getJobGraph();

 

StreamGraph.getJobGraph

public JobGraph getJobGraph() {
    StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
    return jobgraphGenerator.createJobGraph();
}

 

StreamingJobGraphGenerator.createJobGraph

 

public JobGraph createJobGraph() {

    jobGraph = new JobGraph(streamGraph.getJobName()); // create the JobGraph

    // make sure that all vertices start immediately
    // (a streaming job needs every vertex deployed up front; the alternative mode,
    // LAZY_FROM_SOURCES, only deploys a task once its input is ready)
    jobGraph.setScheduleMode(ScheduleMode.EAGER);

    init(); // simply instantiates the internal bookkeeping structures

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // (every node gets a unique hash so it can be matched across submissions;
    // the result maps node id -> hash)
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // ... (computation of legacyHashes omitted in this excerpt)

    setChaining(hashes, legacyHashes); // core logic: create the JobVertices and JobEdges

    setPhysicalEdges(); // only writes each vertex's physical in-edges into that vertex's StreamConfig

    setSlotSharing();

    configureCheckpointing();

    // set the ExecutionConfig last when it has been finalized
    jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());

    return jobGraph;
}

 

setChaining

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0);
    }
}

createChain is called once for each source:

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex) {

    if (!builtVertices.contains(startNodeId)) {

        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>(); // StreamEdges that will eventually become JobEdges

        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) { // visit every out-edge of the current node
            if (isChainable(outEdge, streamGraph)) { // can this edge be chained? (core logic)
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) { // chainable edges: keep recursing within the same chain
            // currentNodeId becomes the target node's id, and chainIndex increases by 1
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) { // non-chainable edges
            // not chained, so this edge must become a real JobEdge: record it in transitiveOutEdges
            transitiveOutEdges.add(nonChainable);
            // keep going, but with startNodeId and currentNodeId both set to the target id,
            // because the edge is not chained and the next node starts a new chain
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0);
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); // build a name for the chain

        // only the start node of a chain gets a JobVertex; the other nodes just get an empty StreamConfig
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes)
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); // copy the StreamNode's settings into the StreamConfig

        if (currentNodeId.equals(startNodeId)) { // the start node of the chain

            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge); // only the start node connects edges
            }

            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {

            Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

            if (chainedConfs == null) {
                chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
            }
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

 

isChainable

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = edge.getSourceVertex(); // start of the StreamEdge
    StreamNode downStreamVertex = edge.getTargetVertex(); // end of the StreamEdge

    StreamOperator<?> headOperator = upStreamVertex.getOperator();
    StreamOperator<?> outOperator = downStreamVertex.getOperator();

    // downstream must have exactly one in-edge: with several inputs it has to wait for the others and cannot run chained
    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // same slot sharing group
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // downstream strategy is ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) // upstream strategy is HEAD or ALWAYS
            && (edge.getPartitioner() instanceof ForwardPartitioner) // the edge uses a ForwardPartitioner
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // same parallelism on both ends
            && streamGraph.isChainingEnabled(); // chaining is enabled
}

 

createJobVertex

private StreamConfig createJobVertex(
        Integer streamNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes) {

    JobVertex jobVertex;
    StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);

    byte[] hash = hashes.get(streamNodeId); // the unique hash of this StreamNode

    JobVertexID jobVertexId = new JobVertexID(hash); // derive the JobVertexID from it

    // ... (collection of legacyJobVertexIds omitted in this excerpt)

    if (streamNode.getInputFormat() != null) {
        jobVertex = new InputFormatVertex(
                chainedNames.get(streamNodeId),
                jobVertexId,
                legacyJobVertexIds);
        TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
        taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
    } else {
        jobVertex = new JobVertex(
                chainedNames.get(streamNodeId),
                jobVertexId,
                legacyJobVertexIds);
    }

    jobVertex.setInvokableClass(streamNode.getJobVertexClass());

    int parallelism = streamNode.getParallelism();

    if (parallelism > 0) {
        jobVertex.setParallelism(parallelism); // set the parallelism
    } else {
        parallelism = jobVertex.getParallelism();
    }

    jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

    jobVertices.put(streamNodeId, jobVertex); // register the JobVertex in the bookkeeping structures
    builtVertices.add(streamNodeId);
    jobGraph.addVertex(jobVertex);

    return new StreamConfig(jobVertex.getConfiguration());
}

 

connect(startNodeId, edge)

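Only the transitiveOutEdges need to be connected.

Why "transitive"? For a group of chained nodes, only the head node gets a JobVertex, and when the vertices are wired together, JobEdges are created only for the non-chainable edges.

As seen above, the recursive calls to createChain return all transitiveOutEdges. Because the later nodes in a chain have no JobVertex of their own, the non-chainable edges leaving them have to be attached to the head node instead; this hand-off up the chain is what "transitive" refers to.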
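The transitive hand-off described above can be reproduced with a small recursion. In this hypothetical sketch nodes are ints and every edge carries a precomputed chainable flag in place of isChainable(); instead of JobVertices and StreamConfigs it records which nodes join each chain and which physical edges get created, always attached to the chain head. Recording chain members in visiting order is a simplification of Flink's chainIndex bookkeeping.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the createChain recursion; not Flink code.
class ChainDemo {

    static final class Edge {
        final int source;
        final int target;
        final boolean chainable;

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>(); // chain head -> members, in visiting order
    final List<String> jobEdges = new ArrayList<>();                   // "head->target" physical edges
    private final Set<Integer> builtVertices = new HashSet<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(source, target, chainable));
    }

    List<Edge> createChain(int startNodeId, int currentNodeId) {
        if (builtVertices.contains(startNodeId)) {
            return new ArrayList<>(); // this chain was already built via another path
        }
        chains.computeIfAbsent(startNodeId, k -> new ArrayList<>()).add(currentNodeId);

        List<Edge> transitiveOutEdges = new ArrayList<>();
        List<Edge> edges = outEdges.getOrDefault(currentNodeId, Collections.<Edge>emptyList());
        for (Edge edge : edges) {
            if (edge.chainable) {
                // same chain: recurse with the same head; physical edges bubble up to it
                transitiveOutEdges.addAll(createChain(startNodeId, edge.target));
            }
        }
        for (Edge edge : edges) {
            if (!edge.chainable) {
                transitiveOutEdges.add(edge);          // becomes a physical edge
                createChain(edge.target, edge.target); // the target starts a new chain
            }
        }

        if (currentNodeId == startNodeId) {
            builtVertices.add(startNodeId); // like createJobVertex(): only heads become vertices
            for (Edge edge : transitiveOutEdges) {
                jobEdges.add(startNodeId + "->" + edge.target); // like connect(head, edge)
            }
        }
        return transitiveOutEdges;
    }

    public static void main(String[] args) {
        ChainDemo graph = new ChainDemo();
        graph.addEdge(1, 2, true);  // source -> map: chainable
        graph.addEdge(2, 3, false); // map -> window: e.g. keyBy, not chainable
        graph.addEdge(3, 4, true);  // window -> sink: chainable
        graph.addEdge(2, 5, false); // map -> another operator, not chainable
        graph.createChain(1, 1);
        System.out.println(graph.chains);
        System.out.println(graph.jobEdges);
    }
}
```

main prints {1=[1, 2], 3=[3, 4], 5=[5]} and [1->3, 1->5]: the edges 2->3 and 2->5 leave node 2, but since node 2 has no vertex of its own they are connected from the chain head 1.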

private void connect(Integer headOfChain, StreamEdge edge) {

    physicalEdgesInOrder.add(edge); // every connected edge is physical, i.e. it produces a JobEdge

    Integer downStreamvertexID = edge.getTargetId();

    JobVertex headVertex = jobVertices.get(headOfChain);
    JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

    StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

    downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1); // one more in-edge: inputs + 1

    StreamPartitioner<?> partitioner = edge.getPartitioner();
    JobEdge jobEdge = null;
    if (partitioner instanceof ForwardPartitioner) {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(
            headVertex,
            DistributionPattern.POINTWISE,
            ResultPartitionType.PIPELINED); // streaming is always pipelined: the consumer pulls results as soon as they are produced
    } else if (partitioner instanceof RescalePartitioner) {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(
            headVertex,
            DistributionPattern.POINTWISE, // each producer subtask is connected to one or more consumer subtasks (or vice versa), not to all of them
            ResultPartitionType.PIPELINED);
    } else {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.ALL_TO_ALL, // every producer subtask is connected to every consumer subtask
                ResultPartitionType.PIPELINED);
    }
    // set strategy name so that web interface can show it.
    jobEdge.setShipStrategyName(partitioner.toString());
}
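The difference between the two DistributionPatterns is easiest to see as subtask wiring. This hypothetical illustration lists, for each consumer subtask, the producer subtasks it reads from. ALL_TO_ALL connects everything to everything; for POINTWISE the even-slicing arithmetic below is a simplified assumption, not a copy of Flink's scheduler code. Forward edges always have equal parallelism, so for them POINTWISE degenerates to one-to-one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of ALL_TO_ALL vs. POINTWISE wiring between subtasks.
class DistributionDemo {

    // Every consumer subtask reads from every producer subtask.
    static List<List<Integer>> allToAll(int producers, int consumers) {
        List<List<Integer>> wiring = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            List<Integer> inputs = new ArrayList<>();
            for (int p = 0; p < producers; p++) {
                inputs.add(p);
            }
            wiring.add(inputs);
        }
        return wiring;
    }

    // Each consumer reads a contiguous slice of producers (or shares a single producer).
    static List<List<Integer>> pointwise(int producers, int consumers) {
        List<List<Integer>> wiring = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            List<Integer> inputs = new ArrayList<>();
            if (producers >= consumers) {
                for (int p = c * producers / consumers; p < (c + 1) * producers / consumers; p++) {
                    inputs.add(p);
                }
            } else {
                inputs.add(c * producers / consumers); // several consumers share one producer
            }
            wiring.add(inputs);
        }
        return wiring;
    }

    public static void main(String[] args) {
        System.out.println("ALL_TO_ALL 2->3: " + allToAll(2, 3));
        System.out.println("POINTWISE  4->2: " + pointwise(4, 2));
        System.out.println("POINTWISE  2->4: " + pointwise(2, 4));
        System.out.println("POINTWISE  3->3: " + pointwise(3, 3));
    }
}
```

The printed lists are [[0, 1], [0, 1], [0, 1]], [[0, 1], [2, 3]], [[0], [0], [1], [1]] and [[0], [1], [2]]. The number of channels grows with producers times consumers for ALL_TO_ALL, but only linearly for POINTWISE, which is why forward and rescale are cheaper than rebalance.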

 

downStreamVertex.connectNewDataSetAsInput

JobVertex.connectNewDataSetAsInput

public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType) {

    IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); // create an IntermediateDataSet and register it with the input vertex

    JobEdge edge = new JobEdge(dataSet, this, distPattern); // create the JobEdge
    this.inputs.add(edge); // the edge becomes an input of this vertex
    dataSet.addConsumer(edge); // the edge consumes data from the IntermediateDataSet
    return edge;
}

setSlotSharing

private void setSlotSharing() {

    Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();

    for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { // visit every JobVertex

        String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();

        SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
        if (group == null) {
            group = new SlotSharingGroup(); // create the SlotSharingGroup for this name
            slotSharingGroups.put(slotSharingGroup, group);
        }
        entry.getValue().setSlotSharingGroup(group); // add the vertex to the SlotSharingGroup
    }

    for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) { // iterations need a CoLocationGroup

        CoLocationGroup ccg = new CoLocationGroup();

        JobVertex source = jobVertices.get(pair.f0.getId());
        JobVertex sink = jobVertices.get(pair.f1.getId());

        ccg.addVertex(source);
        ccg.addVertex(sink);
        source.updateCoLocationGroup(ccg);
        sink.updateCoLocationGroup(ccg);
    }
}
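The first loop of setSlotSharing turns group names into group objects. This hypothetical sketch shows that step in isolation: SharedGroup is a stand-in for Flink's SlotSharingGroup, and the vertex names are made up. What matters is that all vertices with the same name end up holding the very same object.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of setSlotSharing's name -> group-object step; not Flink code.
class SlotSharingAssignDemo {

    // Stand-in for SlotSharingGroup: object identity is what matters here.
    static final class SharedGroup {
        final String name;

        SharedGroup(String name) {
            this.name = name;
        }
    }

    // Maps each vertex to a group object, creating exactly one object per distinct group name.
    static Map<String, SharedGroup> assign(Map<String, String> groupNameOfVertex) {
        Map<String, SharedGroup> groupsByName = new HashMap<>();
        Map<String, SharedGroup> groupOfVertex = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : groupNameOfVertex.entrySet()) {
            SharedGroup group = groupsByName.get(entry.getValue());
            if (group == null) {
                group = new SharedGroup(entry.getValue()); // first vertex seen with this name
                groupsByName.put(entry.getValue(), group);
            }
            groupOfVertex.put(entry.getKey(), group);
        }
        return groupOfVertex;
    }

    public static void main(String[] args) {
        Map<String, String> names = new LinkedHashMap<>();
        names.put("Source -> Map", "default");
        names.put("Window", "default");
        names.put("Sink", "group1");
        Map<String, SharedGroup> groups = assign(names);
        System.out.println(groups.get("Source -> Map") == groups.get("Window")); // same object
        System.out.println(groups.get("Window") == groups.get("Sink"));          // different objects
    }
}
```

main prints true and then false: the two "default" vertices share one group object, while the "group1" vertex gets its own.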

configureCheckpointing

private void configureCheckpointing() {
    CheckpointConfig cfg = streamGraph.getCheckpointConfig();

    long interval = cfg.getCheckpointInterval();
    if (interval > 0) { // once a checkpoint interval is set, fixedDelayRestart becomes the default restart strategy
        // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
        if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
            // if the user enabled checkpointing, the default number of exec retries is infinite.
            streamGraph.getExecutionConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
        }
    } else {
        // interval of max value means disable periodic checkpoint
        interval = Long.MAX_VALUE;
    }

    // collect the vertices that receive "trigger checkpoint" messages.
    // currently, these are all the sources
    List<JobVertexID> triggerVertices = new ArrayList<>();

    // collect the vertices that need to acknowledge the checkpoint
    // currently, these are all vertices
    List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size()); // every JobVertex has to ack

    // collect the vertices that receive "commit checkpoint" messages
    // currently, these are all vertices
    List<JobVertexID> commitVertices = new ArrayList<>();

    for (JobVertex vertex : jobVertices.values()) {
        if (vertex.isInputVertex()) { // a vertex without inputs, i.e. a source
            triggerVertices.add(vertex.getID()); // it receives the trigger message
        }
        commitVertices.add(vertex.getID());
        ackVertices.add(vertex.getID());
    }

    CheckpointingMode mode = cfg.getCheckpointingMode();

    boolean isExactlyOnce;
    if (mode == CheckpointingMode.EXACTLY_ONCE) { // checkpointing mode
        isExactlyOnce = true;
    } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
        isExactlyOnce = false;
    } else {
        throw new IllegalStateException("Unexpected checkpointing mode. " +
            "Did not expect there to be another checkpointing mode besides " +
            "exactly-once or at-least-once.");
    }

    // ... (setup of externalizedCheckpointSettings omitted in this excerpt)

    JobSnapshottingSettings settings = new JobSnapshottingSettings(
            triggerVertices, ackVertices, commitVertices, interval,
            cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
            cfg.getMaxConcurrentCheckpoints(),
            externalizedCheckpointSettings,
            isExactlyOnce);

    jobGraph.setSnapshotSettings(settings);
}
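The role assignment in configureCheckpointing reduces to one pass over the vertices. In this hypothetical sketch each vertex is just a name mapped to its number of inputs (a stand-in for JobVertex.isInputVertex(), which is true when a vertex has no inputs); the names and intervals are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the vertex roles computed in configureCheckpointing; not Flink code.
class CheckpointRolesDemo {

    static final class Roles {
        final List<String> trigger = new ArrayList<>();
        final List<String> ack = new ArrayList<>();
        final List<String> commit = new ArrayList<>();
        long interval;
    }

    // vertexInputs maps each vertex name to its number of inputs
    static Roles computeRoles(Map<String, Integer> vertexInputs, long configuredInterval) {
        Roles roles = new Roles();
        // a non-positive interval means periodic checkpointing is disabled
        roles.interval = configuredInterval > 0 ? configuredInterval : Long.MAX_VALUE;
        for (Map.Entry<String, Integer> vertex : vertexInputs.entrySet()) {
            if (vertex.getValue() == 0) { // no inputs: a source, so it receives the trigger
                roles.trigger.add(vertex.getKey());
            }
            roles.ack.add(vertex.getKey());    // every vertex acknowledges
            roles.commit.add(vertex.getKey()); // every vertex is notified on commit
        }
        return roles;
    }

    public static void main(String[] args) {
        Map<String, Integer> vertices = new LinkedHashMap<>();
        vertices.put("Source -> Map", 0);
        vertices.put("Window -> Sink", 1);
        Roles roles = computeRoles(vertices, 5000);
        System.out.println("trigger=" + roles.trigger + " ack=" + roles.ack + " interval=" + roles.interval);
        System.out.println("disabled interval=" + computeRoles(vertices, -1).interval);
    }
}
```

Only the chained source vertex is triggered; the downstream vertex learns about a checkpoint from the barriers that flow through the data stream, but it still has to acknowledge and receives the commit notification.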

 

At this point the JobGraph is complete.

Finally, the JobGraph is submitted to the JobManager.

 


Reposted from: http://hqfyl.baihongyu.com/
