IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Flink批处理生成执行计划

    Yanjun发表于 2020-03-17 03:26:32
    love 0

    我们这里所说的优化执行计划,是指从生成Plan之后,一直到生成JobGraph之前这一期间对Plan进行各种转换、增强、优化处理。从用户编程的Flink批处理程序开始,一直到生成JobGraph的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。
    我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示:
    Flink-Generating-Execution-Plan
    根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。

    • 构建用户编程Operator DAG

    把用户程序中设置的各种DataSource、Transformation、DataSink及其配置信息进行收集,以最原始的形态保存在ExecutionEnvironment上线文环境对象中,当然也跟踪了从DataSource到DataSink之间的操作顺序,最后通过ExecutionEnvironment对象,基于DataSink就可以从后向前遍历,直接链接到DataSource。
    在上面的处理过程中,涉及到3个比较重要的内容:一是用户根据Flink定义的各种Transformation接口实现并传入的处理函数,也包括Flink为了完成处理内部直接创建的函数对象;二是创建位于org.apache.flink.api.java.operators包下面的Operator具体实现类对象;三是根据用户编写的程序配置,比如并行度、Broadcast变量等等,收集这些配置并设置到对应的Operator对象上。

    • 生成通用Operator DAG

    根据ExecutionEnvironment中收集的信息,生成程序Plan。Flink批处理在生成程序Plan的过程中,比较容易理解,就是把原生的Operator(与org.apache.flink.api.java.operators包下面的具体实现相对应),转换成语义更加丰富的Operator(与org.apache.flink.api.common.operators包下面的具体实现相对应),也就是通过调用org.apache.flink.api.java.operators.translateToDataflow(),生成org.apache.flink.api.common.operators.Operator。
    这一阶段,生成的Operator DAG中的节点,基本上与上一阶段的Operator DAG中的节点一一对应,不过不同的是会使用一些中间结构来辅助表示一些复杂的操作,像groupBy、join、coGroup等操作,最后还是会转换成一个对应的Operator(也就是DataSet)。
    举个例子,下面代码片段是一个join操作,需要指定关联条件及关联后取哪些字段作为结果集:

            DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
                    filterDocs.join(filterRanks)
                                .where(0).equalTo(1)
                                .projectSecond(0, 1, 2);
    

    首先,join()返回一个JoinOperatorSets,它并不是一个Operator,而是一个中间结构。然后,where()和equalTo()确定了进行关联的2个数据集对应的关联字段,其中where()返回JoinOperatorSetsPredicate也是一个中间结构,equalTo()返回的是一个Operator。最后,projectSecond()返回一个Operator。
    另外,这一阶段还会根据需要对Operator内部的Function进行处理,可能会生成新的Function,用来在最终物理执行的时候进行调用,处理数据,比如sum()操作,在translateToDataflow()中会创建一个内部的AggregatingUdf函数,实现求和的操作。

    • 优化器级别DAG表示

    得到Plan以后,使用OptimizerNode来对Plan进一步编译。这一阶段,主要通过GraphCreatingVisitor构建基本OptimizerNode DAG,在创建对应的OptimizerNode的同时,还对各个OptimizerNode之间的连接关系通过抽象的DagConnection来表示。另外,这些创建的DagConnection是什么类型的,也会确定下来,主要包括InComing连接、OutGoing连接。
    而对于Broadcast DataSet,同样也会建立与Operator之间的对应连接关系,也通过DagConnection来类似去表示。与其他Operator之间连接不同的是,这种连接会设置OptimizerNode对应Operator的输出数据的传输策略,值为ShipStrategyType.BROADCAST类型,而其他连接类型包括ShipStrategyType.FORWARD、ShipStrategyType.PARTITION_HASH、ShipStrategyType.PARTITION_RANDOM等等。
    构建好OptimizerNode DAG以后,再经过IdAndEstimatesVisitor、UnionParallelismAndForwardEnforcer、BranchesVisitor、InterestingPropertyVisitor来进行增强,具体每一步都增强了哪些内容,可以从各个Visitor实现类的名称大概有所了解。

    • 生成最佳执行计划

    经过上一步做的比较重要之一的处理,是对每个Operator的数据输出数量(记录数、数据大小)通过Optimizer进行了估算。这一阶段会根据这些信息来生成一个最佳执行计划,这个过程是通过递归方式来实现的。在生成最佳执行计划过程中,还会对生成的DAG执行剪枝优化操作,以使最终生成PlanNode DAG执行的代价最小。
    生成最佳执行计划以后,这时就可以创建生成OptimizedPlan了。最后,OptimizedPlan经过BinaryUnionReplacer、RangePartitionRewriter、JavaApiPostPass这几步增强处理后,就可以通过调用JobGraphGenerator.compileJobGraph()来生成JobGraph。

    下面,我们通过一个简单的WordCount批处理程序,来展示在生成和优化执行计划过程中,各个阶段对应的DAG图的节点类型。示例代码如下所示:

            final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
    
            // get input data
            DataSet<String> text = env.readTextFile(input);
            DataSet<Tuple2<String, Integer>> counts =
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
    

    这个简单的Flink批处理程序,经过上面的翻译优化过程,分别通过下面的线性连接关系来表达,抽象出各层中DAG关系图,如下所示:

    readTextFile() → flatMap() → groupBy() → sum() → writeAsCsv()
    DataSource → FlatMapOperator → UnsortedGrouping → AggregateOperator→DataSink
    GenericDataSourceBase → FlatMapOperatorBase → GroupReduceOperatorBase → GenericDataSinkBase
    DataSourceNode → FlatMapNode → GroupReduceNode → DataSinkNode
    SourcePlanNode → SingleInputPlanNode → SingleInputPlanNode → SinkPlanNode
    

    上面5个简单的线性DAG,其中,第一行表示用户编程API中调用各个算子所得到的操作DAG,第二行表示通过org.apache.flink.api.java.operators包下面的Operator结构来构建最原始的Operator DAG,第三行表示通过org.apache.flink.api.common.operators包下面的Operator结构来构建Operator DAG,第四行表示通过Optimizer生成的OptimizerNode DAG,第五行表示通过翻译优化得到的最优执行计划PlanNode DAG。



沪ICP备19023445号-2号
友情链接