IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Flink批处理生成JobGraph流程分析

    Yanjun发表于 2020-03-16 10:22:37
    love 0

    我们先通过一个简单的批量处理WordCount的程序,来了解一下,在编程API层面的基本使用方法,以及Flink是如何基于Pipeline风格的API实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount程序代码,示例如下所示:

            final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
    
            // get input data
            DataSet<String> text = env.readTextFile(input);
            DataSet<Tuple2<String, Integer>> counts =
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
    

    上述WordCount示例程序,它在用户编程API使用的层面来看是呈线性的,对应一个计算DAG,我们以它为例因为线性比较简单,更容易理解和说明。通过查看Flink源码可以看到,上面程序通过调用ExecutionEnvironment的readTextFile()、flatMap()、groupBy()、sum()、writeAsCsv()方法,生成了一个计算Job DAG。这个Job可以通过下图来展示Flink在High-Level API层面是如何映射到我们所说的Operator组件的,如图所示:
    Flink-Batch-WordCount
    上图中,第一层蓝色方框表示通过用户编程API调用方法后,转换成的各个DataSet对象,它们表示每经过一个转换操作,都由一个DataSet生成另一个DataSet。除了DataSink和UnsortedGrouping以外,这些DataSet都是位于org.apache.flink.api.java.operators包下面的实现类。第二层的黑色方框表示,为了能够进一步校验用户编写的Batch Job程序的规范性,以及获取更多有关Dataflow的信息,又抽象出了一层Operator设计,这一层Operator位于org.apache.flink.api.common.operators包下面。
    从左右向右箭头表示,用户程序对输入数据集进行转换操作的顺序;从右向左箭头表示,每经过一次转换操作生成一个新的DataSet,它内部都会维护一个指针用来指向上一个DataSet,这方便了对各个转换操作新生成生成DataSet的顺序进行跟踪。其实,Flink在构建JobGraph的过程中,就是从后向前进行遍历,不断对Job的DAG图中各个Operator进行增强和优化。下面我们会对这两层进行详细解释说明。
    有关DataSet类的设计,如下图所示:
    CD.DataSet
    图中各个Operator的具体实现类,是构建DAG图最原始的表示形式,他们内部直接封装了通过编程API传入的函数,或者Flink内部为完成DAG而实现的内置函数。我们以flatMap为例,调用flatMap()会创建名为FlatMapOperator的DataSet,具体实现代码如下所示:

        public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
            if (flatMapper == null) {
                throw new NullPointerException("FlatMap function must not be null.");
            }
    
            String callLocation = Utils.getCallLocationName();
            TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
            return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
        }
    

    代码中的FlatMapFunction就是用户编写Flink程序时,传入给flatMap()方法的函数实现,这个函数会被设置为FlatMapOperator对象的一个function属性的值。
    当我们编写的Flink批量处理程序中,最后会调用ExecutionEnvironment.execute(),在该方法中会对最原始的由Operator构建好的DAG图进行处理,处理的基本流程简要描述如下所示:

    1. 创建一个Plan,它是Pipeline接口的具体实现类,表示了从DataSink到DataSource实现连通的数据流DAG图。
    2. 根据输入配置,获取到对应的PipelineExecutor,用来将实现的Job提交到执行引擎进行计算处理。
    3. 将Plan进一步进行优化,编译生成OptimizedPlan,它表示了对应的执行计划,通过抽象的数据结构PlanNode和Channel来更加精确描述程序如何被执行。
    4. 最后,根据OptimizedPlan生成最终的JobGraph,从而提交到集群JobManager进行真正执行。

    这里,我们不会非常深入地说明各个步骤涉及到的细节,但是会从更高Level来理解生成JobGraph的过程中,都做了哪些处理。

    生成Plan

    在生成Plan的过程中,就涉及到上面WordCount数据流DAG图中展示的第二层构成DAG的基本元素Operator,它们是位于org.apache.flink.api.common.operators包下面实现类,我们先看看这个Operator类体系的继承关系,如下图所示:
    CD.api.common.Operator
    上面类图中,大多数Operator实现类名称都带有一个Base的后缀,大部分都与org.apache.flink.api.java包下面的Operator实现类有一个对应关系,比如,FlatMapOperator对应FlatMapOperatorBase,DataSource对应DataSourceBase、AggregateOperator对应GroupReduceOperatorBase等等,不再一一列举。
    在DataSet的具体实现类中,每个类都有一个translateToDataFlow()方法,能够将该Operator的输入DataSet转换成一个org.apache.flink.api.common.operators.Operator,所以在从DataSink到DataSource递归遍历过程中,都会进行该转换操作。另外,还会根据用户编写程序中设置的各种参数,包括Flink默认的一些参数值等等,都配置到基于org.apache.flink.api.common.operators.Operator的具体对象上。下面是核心的处理代码:

        public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
            List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
    
            for (DataSink<?> sink : sinks) {
                planSinks.add(translate(sink));
            }
    
            Plan p = new Plan(planSinks);
            p.setJobName(jobName);
            return p;
        }
    

    其中,参数sinks是在ExecutionEnvironment中管理的最原始Flink程序DAG,对于上面的WordCount例子,实际只有一个DataSink。上面的translate(sink)方法调用,对最初构建好的整个DAG进行翻译处理,它是一个递归处理过程,代码如下所示:

        private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {
            // translate the input recursively
            Operator<T> input = translate(sink.getDataSet());
            // translate the sink itself and connect it to the input
            GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
            translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources());
            return translatedSink;
        }
    

    上面translate(sink.getDataSet())方法调用中,对各种不同类型的Operator进行翻译处理,因为每个Operator的特点以及行为本身就不同,所以translate()方法中分别进行了不同的处理,实现如下代码所示:

        private <T> Operator<T> translate(DataSet<T> dataSet) {
            while (dataSet instanceof NoOpOperator) {
                dataSet = ((NoOpOperator<T>) dataSet).getInput();
            }
    
            // check if we have already translated that data set (operation or source)
            Operator<?> previous = this.translated.get(dataSet);
            if (previous != null) {
                // Union operators may only have a single output.
                // We ensure this by not reusing previously created union operators.
                // The optimizer will merge subsequent binary unions into one n-ary union.
                if (!(dataSet instanceof UnionOperator)) {
                    // all other operators are reused.
                    @SuppressWarnings("unchecked")
                    Operator<T> typedPrevious = (Operator<T>) previous;
                    return typedPrevious;
                }
            }
    
            Operator<T> dataFlowOp;
            if (dataSet instanceof DataSource) {
                DataSource<T> dataSource = (DataSource<T>) dataSet;
                dataFlowOp = dataSource.translateToDataFlow();
                dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources());
            }
            else if (dataSet instanceof SingleInputOperator) {
                SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
                dataFlowOp = translateSingleInputOperator(singleInputOperator);
                dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
            }
            else if (dataSet instanceof TwoInputOperator) {
                TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
                dataFlowOp = translateTwoInputOperator(twoInputOperator);
                dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
            }
            else if (dataSet instanceof BulkIterationResultSet) {
                BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
                dataFlowOp = translateBulkIteration(bulkIterationResultSet);
                dataFlowOp.setResources(bulkIterationResultSet.getIterationHead().getMinResources(),
                        bulkIterationResultSet.getIterationHead().getPreferredResources());
            }
            else if (dataSet instanceof DeltaIterationResultSet) {
                DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
                dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
                dataFlowOp.setResources(deltaIterationResultSet.getIterationHead().getMinResources(),
                        deltaIterationResultSet.getIterationHead().getPreferredResources());
            }
            else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
                throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."
                    + " Did you forget to close the iteration?");
            }
            else {
                throw new RuntimeException("Error while creating the data flow plan for the program: Unknown operator or data set type: " + dataSet);
            }
    
            this.translated.put(dataSet, dataFlowOp);
            // take care of broadcast variables
            translateBcVariables(dataSet, dataFlowOp);
            return dataFlowOp;
        }
    

    上面translateSingleInputOperator()、translateTwoInputOperator()等方法中,会递归调用该translate()方法来对某个Operator进行翻译处理。我们以前面给出的WordCount程序为例,就会对DataSource→FlatMapOperator→UnsortedGrouping→AggregateOperator→DataSink进行从后至前的遍历,最终翻译成GenericDataSourceBase→FlatMapOperatorBase→GroupReduceOperatorBase→GenericDataSinkBase。这样,我们便得到了最基本的Plan。

    生成OptimizedPlan

    通过上一步生成的Plan,我们还无法精确地知道如何真正运行某个Operator。所以接下来,生成的OptimizedPlan能够做到如下事情:

    • 基于用户的程序生成Plan,更进一步生成更精确描述用户程序执行的OptimizedPlan。
    • OptimizedPlan能够精确地描述程序如何物理地执行。
    • Operator使用何种策略执行,比如,是Hash Join还是Sort Merge Join。
    • Operator之间进行数据交换的策略,是Local Pipe Forward、Shuffle,还是Broadcast。
    • Operator之间使用的数据交换模式,是Pipelined还是Batch。
    • 在什么地方缓存中间结果。

    这里,我们不再更详细地去分析,后面会对单独写文章进行深入分析。现在可以查看对应代码来了解,核心代码如下所示:

            GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
            program.accept(graphCreator);
    
            // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
            // each until we have only a single root node. This allows to transparently deal with the nodes with
            // multiple outputs
            OptimizerNode rootNode;
            if (graphCreator.getSinks().size() == 1) {
                rootNode = graphCreator.getSinks().get(0);
            }
            else if (graphCreator.getSinks().size() > 1) {
                Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator();
                rootNode = iter.next();
                while (iter.hasNext()) {
                    rootNode = new SinkJoiner(rootNode, iter.next());
                }
            }
            else {
                throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
            }
    
            // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
            // guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks
            rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
    
            // We need to enforce that union nodes always forward their output to their successor.
            // Any partitioning must be either pushed before or done after the union, but not on the union's output.
            UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer();
            rootNode.accept(unionEnforcer);
    
            // We are dealing with operator DAGs, rather than operator trees.
            // That requires us to deviate at some points from the classical DB optimizer algorithms.
            // This step builds auxiliary structures to help track branches and joins in the DAG
            BranchesVisitor branchingVisitor = new BranchesVisitor();
            rootNode.accept(branchingVisitor);
    
            // Propagate the interesting properties top-down through the graph
            InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
            rootNode.accept(propsVisitor);
            
            // perform a sanity check: the root may not have any unclosed branches
            if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
                throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
                        "track the re-joining of branches correctly.");
            }
    
            // the final step is now to generate the actual plan alternatives
            List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
            if (bestPlan.size() != 1) {
                throw new CompilerException("Error in compiler: more than one best plan was created!");
            }
    
            // check if the best plan's root is a data sink (single sink plan)
            // if so, directly take it. if it is a sink joiner node, get its contained sinks
            PlanNode bestPlanRoot = bestPlan.get(0);
            List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
    
            if (bestPlanRoot instanceof SinkPlanNode) {
                bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
            } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
                ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
            }
    
            // finalize the plan
            OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
            plan.accept(new BinaryUnionReplacer());
            plan.accept(new RangePartitionRewriter(plan));
    
            // post pass the plan. this is the phase where the serialization and comparator code is set
            postPasser.postPass(plan);
    

    这里需要说的是,经过上面代码的处理,首先会将Plan生成一个中间结构的Plan,主要是通过GraphCreatingVisitor来进行处理的,DAG图以OptimizerNode为基本节点进行抽象构建,对应的类及其继承关系如下图所示:
    CD.OptimizerNode
    优化器(Optimizer)会使用每个OptimizerNode提供的信息来构建DAG,称为Optimizer DAG,从而完成如下处理工作:

    • 评估每个Operator处理的数据量的大小,以及Key/Value数据的数量。
    • 确定在DAG的什么位置进行Split(分支)或者Join(合并)处理操作。
    • 使用DagConnection数据接口来描述Operator之间如何进行连接,包括Broadcast连接、Incoming连接和Outgoing连接。
    • 选择合适的属性,从而不断迭代优化候选计划(Candidate Plan)。

    然后,根据上面基于OptimizerNode的DAG,又进行优化处理,最终得到了以PlanNode为基本节点抽象的DAG图,PlanNode类及其继承关系如下图所示:
    CD.PlanNode
    每个PlanNode并不是和之前翻译过程中生成的一些数据结构没关系,它们内部都有一个OptimizerNode的引用,从而能访问到OptimizerNode内部的信息。PlanNode主要抽象了DAG图中各个Operator之间如何进行数据交换,以及数据交换策略,最终得到一个唯一的最佳Plan,对应代码片段如下所示:

            List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
            if (bestPlan.size() != 1) {
                throw new CompilerException("Error in compiler: more than one best plan was created!");
            }
    
            // check if the best plan's root is a data sink (single sink plan)
            // if so, directly take it. if it is a sink joiner node, get its contained sinks
            PlanNode bestPlanRoot = bestPlan.get(0);
            List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
            if (bestPlanRoot instanceof SinkPlanNode) {
                bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
            } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
                ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
            }
    
            // finalize the plan
            OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
            plan.accept(new BinaryUnionReplacer());
    
            plan.accept(new RangePartitionRewriter(plan));
    
            // post pass the plan. this is the phase where the serialization and comparator code is set
            postPasser.postPass(plan);
    

    到此为止,已经生成了一个OptimizedPlan,通过OptimizedPlan就可以生成我们需要的JobGraph了。

    生成JobGraph

    JobGraph是JobManager能够接收的Job的数据结构,是一种Low-Level的数据流DAG表现形式,它定义了Job范围内(Job-wide)的一些设置。生成JobGraph的主流程,代码如下所示:

        private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
            Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
            OptimizedPlan optimizedPlan = optimizer.compile(plan);
            JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
            return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
        }
    

    组成JobGraph计算图拓扑中,最核心的两个数据结构为JobVertex和IntermediateDataSet。其中,JobVertex是对计算逻辑单元的抽象,它通过设置invokableClassName属性来表示,它必须是AbstractInvokable类的具体实现类,能够直接在TaskManager上通过反射生成AbstractInvokable对象,然后调用其invoke()方法执行该Task对应的处理逻辑,比如,批处理Job使用的是具体实现BatchTask。IntermediateDataSet是一个Operator执行后产生的中间结果集的抽象,也包括一个DataSource对应的数据集,它能够被其他Operator读取、物化(Materialized),甚至丢弃。



沪ICP备19023445号-2号
友情链接