【排查问题/面试必备】Doris SQL执行流程全解析

科技   2024-11-04 11:36   湖南  

本文从一个SQL查询语句出发,从使用层面以及源码层面解析SQL的执行过程。

Doris把查询规划所有的部分,都放到了一个FE里面,都会由FE来完成。FE来根据用户的查询生成一个完整的逻辑规划,然后这个逻辑规划最后生成一个分布式的逻辑规划。当FE生成好查询计划树后,BE对应的各种Plan Node(Scan, Join, Union, Aggregation, Sort等)执行自己负责的操作即可。

整个查询过程可以分为3个模块:

  1. 1. 查询优化器:将 SQL 文本转换成一个 “最佳的” 分布式物理执行计划

  2. 2. 查询调度器:将执行计划调度到计算节点

  3. 3. 执行执行器:计算节点执行具体的物理执行计划

本文将依次介绍介绍这三个模块的执行流程及原理。

一、查询优化器

一条SQL进入Doris后,有多种可以执行的方式。查询优化器的目的是为了生成一个BE能够识别的 “最佳的” 分布式物理执行计划,主要分为两个步骤:

  1. 1. 第一步:生成单节点查询计划,即SQL-->PlanNodeTree,其中Plan Node是具体的算子。该步骤包括Plan Tree的生成,谓词下推, Table Partitions pruning,Column projections,Cost-based优化等。

  2. 2. 第二步:将单节点的查询计划分布式化,即PlanNodeTree-->PlanFragmentTree,其中FragmentTree是BE执行查询的最小单位,至少包含一个算子。分布式化的目标是最小化数据移动和最大化本地Scan,分布式化的方法是增加ExchangeNode以及DataSink,执行计划树会以ExchangeNode为边界拆分为PlanFragment。

具体效果如下图所示:

查询优化器在运行过程中,又可以细分成很多个执行阶段:

  1. 1. 词法语法解析

  2. 2. 语义解析

  3. 3. query改写

  4. 4. 单机执行计划

  5. 5. 分布式执行计划

后文将依次进行介绍。

1.1、词法语法解析

  1. 1. 首先会进入词法解析,会将一个SQL中的关键词解析出来,如:select、from、join、group by等(如下图的蓝色部分)。

  2. 2. 然后会进入语法解析,该阶段会判断语法是否正确

最终一条select语句被拆分为多个部分转成抽象语法树(AST Tree),如:

  • • selectList:select中查询的内容(列名的集合)

  • • tableRefs:查询的表

  • • WhereClause:子查询(子查询相当于又是一个新的SelectStmt,具体结构看SQL类型)或者过滤条件GroupByClause、HavingClause和OrderByClause一一填充即可

1.2、语义解析

然后进行语义解析,语义解析的目的是为了保证查询的列名和表名是正确的,包括如下部分:

  • • 元数据的识别和解析(Binder):检查表权限,列是否存在类型是否支持等,然后解析(Binder)出来

  • • SQL合法性检查

  • • SQL重写:如select * 重写为 select col1,col2...

  • • 函数检查

  • • 别名处理

具体步骤如下:

1.3、query改写

语义解析结束以后就进入了query改写阶段。该阶段会按照指定的规则给之前生成的抽象语法树(AST Tree)做一些变化。query改写又分为表达式改写子查询解嵌套

1.3.1、表达式改写

例如说SQL中where后面的条件是 c1=c2 and True,很显然是可以改成c1=c2的,那么Doris内部是怎么识别并改写的呢?


FE中有一个接口 ExprRewriteRule,该接口中只有一个方法 apply

  1. 1. 该方法按照内核代码(开发者)定义的规则,将用户的表达式进行改写

  2. 2. 该方法的输入是原来是表达式(Expr)

  3. 3. 该方法输出是上文中提到的简化后的表达式(NewExpr)

public interface ExprRewriteRule {
    /**
     * Applies this equal rule to the given analyzed Expr. Returns the transformed and
     * analyzed Expr or the original unmodified Expr if no changes were made. If any
     * changes were made, the transformed Expr is guaranteed to be a different Expr object,
     * so callers can rely on object reference comparison for change detection.
     */

    Expr apply(Expr expr, Analyzer analyzer, ExprRewriter.ClauseType clauseType) throws AnalysisException;
}

对于上文提到的例子,实际上是用到了 CompoundPredicateWriteRule 这个rule,在apply该rule的时候,只需要根据规则进行变换即可。伪代码如下:

public class CompoundPredicateWriteRule implements ExprRewriteRule {

    public static ExprRewriteRule INSTANCE = new CompoundPredicateWriteRule();

    @Override
    public Expr apply(Expr expr, Analyzer analyzer, ExprRewriter.ClauseType clauseType) throws AnalysisException {
        //1. 首先会检查是不是一个CompoundPredicate
        if (!(expr instanceof CompoundPredicate)) {
            return expr;
        }
        //2.然后会child做一些变换
        CompoundPredicate cp = (CompoundPredicate) expr;

        List<Expr> children = cp.getChildren();

        Expr leftChild = cp.getChild(0);
        Expr rightChild = cp.getChild(1);

        boolean and = (cp.getOp() == CompoundPredicate.Operator.AND);
        boolean or = (cp.getOp() == CompoundPredicate.Operator.OR);

        boolean leftChildTrue = (leftChild instanceof BoolLiteral) && (((BoolLiteral) leftChild).getValue());
        boolean leftChildFalse = (leftChild instanceof BoolLiteral) && (!((BoolLiteral) leftChild).getValue());

        boolean rightChildTrue = (rightChild instanceof BoolLiteral) && (((BoolLiteral) rightChild).getValue());
        boolean rightChildFalse = (rightChild instanceof BoolLiteral) && (!((BoolLiteral) rightChild).getValue());

        // case true and expr ==> expr
        if (leftChildTrue && and) {
            return rightChild;
        }
        
        ......

        // other case ,return origin expr
        return expr;
    }
}

那么这个规则是在哪添加的,以及如何生效的呢?

在FE中只需要执行以下两步:

  1. 1. 注册表达式改写Rule :如果开发者想新增新的规则,需要在这里进行注册,才可以被apply

  1. 1. 框架apply rule:当完成注册以后,需要利用FE中的SQL改写框架 ExprRewriter。该框架会对用户输入的SQL,对每个Rule都进行apply

这里将最核心的代码部分贴上,其中中文部分是笔者的注释:

public class ExprRewriter {
    private boolean useUpBottom = false;
    private int numChanges = 0;
    //rules是一个链表,包含了所有注册的规则
    private final List<ExprRewriteRule> rules;
    // Once-only Rules
    private List<ExprRewriteRule> onceRules = Lists.newArrayList();
     ......
 }
public Expr rewrite(Expr expr, Analyzer analyzer, ClauseType clauseType) throws AnalysisException {
    // Keep applying the rule list until no rule has made any changes.
    int oldNumChanges;
    Expr rewrittenExpr = expr;
    do {
        oldNumChanges = numChanges;
        //遍历所有的Rule,并执行apply方法applyRuleRepeatedly
        for (ExprRewriteRule rule : rules) {
            // when foldConstantByBe is on, fold all constant expr by BE instead of applying
            // FoldConstantsRule in FE
            if (rule instanceof FoldConstantsRule && analyzer.safeIsEnableFoldConstantByBe()) {
                continue;
            }
            rewrittenExpr = applyRuleRepeatedly(rewrittenExpr, rule, analyzer, clauseType);
        }
    } while (oldNumChanges != numChanges);

    return rewrittenExpr;
}

1.3.2、子查询

还有一种改写会相对复杂些,属于子查询相关的改写,即子查询解嵌套。

下图就是一个子查询解嵌套的例子:

  • • 为什么需要改写:如果不进行改写的话,外层的select每扫一次表都要去子查询进行对比,这样的话执行效率是比较低的。

  • • 改写后的收益:改写后,只需要执行一次join即可

解嵌套的主要逻辑在 rewriteSelectStatement,下文附上核心的实现逻辑,其中中文部分是笔者的注释。

private static SelectStmt rewriteSelectStatement(SelectStmt stmt, Analyzer analyzer)
        throws AnalysisException {
    SelectStmt result = stmt;

    //1. 首先会监测where子句里面有没有子查询
    // Rewrite all the subqueries in the WHERE clause.
    if (result.hasWhereClause()) {
        // Push negation to leaf operands.
        result.whereClause = Expr.pushNegationToOperands(result.whereClause);
        if (ConnectContext.get() == null) {
            // Check if we can equal the subqueries in the WHERE clause. OR predicates with
            // subqueries are not supported.
            if (hasSubqueryInDisjunction(result.whereClause)) {
                throw new AnalysisException("Subqueries in OR predicates are not supported: "
                        + result.whereClause.toSql());
            }
        }
        //1.1 如果有,就进行一次解嵌套
        rewriteWhereClauseSubqueries(result, analyzer);
    }
    //2. 检查having里面有没有子查询
    // Rewrite all subquery in the having clause
    if (result.getHavingClauseAfterAnalyzed() != null
            && result.getHavingClauseAfterAnalyzed().getSubquery() != null) {
        //2.1 如果有,就进行一次解嵌套
        result = rewriteHavingClauseSubqueries(result, analyzer);
    }

    return result;
}

1.4、单机执行计划

经过了前3步的处理,Doris对于一些基于规则的转换已经做完了,这时候SQL在内核中还是一颗抽象语法树(AST Tree)。此时Doris需要将该树转换成一个单机的执行规划。

例如说会将下图中的SQL转成一个逻辑计划树PlanNodeTree,在这个转换期间会做一些优化如join order以及谓词下推

1.4.1、Join Reorder

上文提到的SQL如果不做优化,会按照默认的顺序进行join。其中t1跟t2进行关联,会产生一个笛卡尔积,这很有可能导致OOM。为此,Doris会对这个join重新排序,这样join的结果集会非常小,刚刚提到的笛卡尔积也就消失了。

那么join reorder是怎么实现的呢?

Doris使用贪婪算法进行reorder,每次都是找中间结果最小的表进行合并。

如下图所示:

  • • 1,2,3,4 这四个点表示4个表

  • • 表之间的线表示表之间是有关联条件的,线上数字表示的是两个表关联以后的结果集,数字越小表示结果集越小

  • • 下图中的第一步:选择了1和3之间的关联,因为结果集最小;后续依次选择2和4

  • • 最后生成的执行计划树就是1-->3-->2-->4这种的执行顺序

有了上文的例子进行理解,下面讲解下实现代码中最核心的部分。该算法的核心实现在createJoinPlan中,大概的步骤如下:

  1. 1. 首先会挑一张表出来,即leftmostRef

  2. 2. 然后会遍历所有与这个表有关系的表

  3. 3. 寻找candidate:找到一个候选的表,生成TableRef

  4. 4. 跟候选TableRef,生成一个候选的join节点

  5. 5. 代价比较:对比是这次候选的join节点最优还是上一次for循环的最优

  • • 如果这次更优,就进行以下赋值替换操作

  • • 否则放弃

精简过的代码如下

private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
        throws UserException {
    
    // Refs that have been joined. The union of joinedRefs and the refs in remainingRefs
    // are the set of all table refs.
    //1. 首先会挑一张表出来,即leftmostRef
    Set<TableRef> joinedRefs = Sets.newHashSet(leftmostRef);

    while (!remainingRefs.isEmpty()) {
        // We minimize the resulting cardinality at each step in the join chain,
        // which minimizes the total number of hash table lookups.
        PlanNode newRoot = null;
        Pair<TableRef, PlanNode> minEntry = null;
        long newRootRightChildCardinality = 0;
        //2. 然后会遍历所有与这个表有关系的表
        for (Pair<TableRef, PlanNode> tblRefToPlanNodeOfCandidate : remainingRefs) {
            //3. 找到候选的表
            TableRef tblRefOfCandidate = tblRefToPlanNodeOfCandidate.first;
            long cardinalityOfCandidate = tblRefToPlanNodeOfCandidate.second.getCardinality();
            PlanNode rootPlanNodeOfCandidate = tblRefToPlanNodeOfCandidate.second;
            JoinOperator joinOp = tblRefOfCandidate.getJoinOp();
            
            analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
            //4. 跟候选TableRef,生成一个候选的join节点
            PlanNode candidate = createJoinNode(analyzer, root, rootPlanNodeOfCandidate, tblRefOfCandidate);
            // it may not return null, but protect.


            //5. 对比是这次候选的join节点最优还是上一次for循环的最优
            if (newRoot == null
                    || ((candidate.getClass().equals(newRoot.getClass())
                    && candidateCardinalityIsSmaller(
                            candidate, tblRefToPlanNodeOfCandidate.second.getCardinality(),
                            newRoot, newRootRightChildCardinality)))
                    || (candidate instanceof HashJoinNode && newRoot instanceof NestedLoopJoinNode)) {
                //5.1 如果这次更优,就进行以下赋值替换操作
                newRoot = candidate;
                minEntry = tblRefToPlanNodeOfCandidate;
                newRootRightChildCardinality = cardinalityOfCandidate;
            }
        }
    }

    return root;
}

1.4.2、谓词下推

单机执行规划还有谓词下推的优化,谓词下推在Doris中也叫做谓词分配。

为了便于理解,这里举两个个例子来说明下谓词分配的原则。

案例一

如下图所示:

  1. 1. 假如用户的SQL有3个谓词,并且涉及到了不同的表。

  2. 2. 其中黄色的谓词分配给了逻辑计划树底层的HashJoinNode,因为其已经包含了表a和表b的信息。但是这个HashJoinNode还没有跟表c进行join,所以SQL中上面两个蓝色的谓词只能分配给上层的HashJoinNode

案例二

当用户执行SQL:

select * from a join b on a.k1=1;

其中 a.k1=1 只涉及到表a,实际上可以进行下推,那Doris是怎么做到的呢?

  1. 1. 将 a.k1=1 放在谓词的链表Conjuncts里面

  2. 2. 当生成 Scan a 的时候会去list里面拿相关的谓词

  3. 3. 然后再生成 scan b 和 JoinNode

即生成了如下的执行计划树:

1.5、分布式执行计划

接着需要给单机执行计划生成分布式执行计划。

下面以join算子为例,来介绍分布式执行计划是怎么生成的。

1.5.1、join

Broadcast Join

SELECT * FROM A,B WHERE A.column=B.column

上面的SQL需要对A和B执行join操作,由于数据都是分散在整个MPP架构里面,所以执行join必须得涉及网络的传输。Broadcast Join的处理方式就是将B表的数据全部发送给所有A的节点。

优点:网络开销小(只用分发表B)

缺点:内存占用多(每个BE都需要把表B放在内存里面)

由于内存占用较多,如果表B的数据量够大,那就做不了Broadcast Join了,所以Doris引入了Shuffle Join

Shuffle Join

SELECT * FROM A,B WHERE A.column=B.column

Shuffle Join是根据表A和表B执行join的那一列进行hash,相同hash值的数据放在同一个节点上。

优点:使用与大数据量的场景

缺点:网络开销多(需要将表A和B进行网络传输)

Bucket shuffle join

基于上面网络开销的痛点,Doris引入了更好的操作:Bucket shuffle join

Doris 的表数据本身是通过 Hash 计算分桶的,所以就可以利用表本身的分桶列的性质来进行 Join 数据的 Shuffle。假如两张表需要做 Join,并且 Join 列是左表的分桶列,那么左表的数据其实可以不用去移动右表通过左表的数据分桶发送数据就可以完成 Join 的计算。

SELECT * FROM A,B WHERE A.distributekey=B.anycolumn

优点:性能好

缺点:场景有限,需要使用 A.distributekey

既然表A能够做到不传输数据,那么表B是不是也能够这样?所以Doris又引入了Colocate join

Colocate join

它与 Bucket Shuffle Join 相似,相当于在数据导入的时候,根据预设的 Join 列的场景已经做好了数据的 Shuffle。那么实际查询的时候就可以直接进行 Join 计算而不需要考虑数据的 Shuffle 问题了。

SELECT* FROM A,B WHERE A.colocatecolumn=B.collocatecolumn

优点:性能最好

缺点:场景有限:要求表A和表B的分布是在一个group的,且join的条件跟group是match的。这样的话表A和表B都不需要shuffle了,所以性能也是最好的。

四种 Shuffle 方式对比

假设关系S 和 R 进行Join,N 表示参与 Join 计算的节点的数量;T 则表示关系的 Tuple 数目

Shuffle方式网络开销物理算子适用场景
BroadCastN * T(R)Hash Join / Nest Loop Join通用
ShuffleT(S) + T(R)Hash Join通用
Bucket ShuffleT(R)Hash JoinJoin条件中存在左表的分布式列,且左表执行时为单分区
Colocate0Hash JoinJoin条件中存在左表的分布式列,且左右表同属于一个Colocate Group

N :参与 Join 计算的 Instance 个数

T(关系) : 关系的 Tuple 数目

上面这 4 种方式灵活度是从高到低的,它对这个数据分布的要求是越来越严格,但 Join 计算的性能也是越来越好的。

源码分析

接下来从代码层面分析,Doris是选择哪种join方式的。

大概步骤如下

  1. 1. 首先判断leftChildFragment跟rightChildFragment能否做Colocate

  • • 如果可以,则直接采用Colocate join

  • • 如果不可以,则继续

  • 2. 判断能否做BucketShuffle

    • • 如果可以,则直接采用BucketShuffleJoin

    • • 如果不可以,则继续

  • 3. 引入joinCostEvaluation,用来对比Broadcast和Shuffle的执行代价,并选择代价小的来执行。

  • 精简的代码如下,其中中文部分是笔者的注释:

    private PlanFragment createHashJoinFragment(
            HashJoinNode node, PlanFragment rightChildFragment,
            PlanFragment leftChildFragment, ArrayList<PlanFragment> fragments)
            throws UserException {
        List<String> reason = Lists.newArrayList();
        //1. 首先判断leftChildFragment跟rightChildFragment能否做Colocate
        if (canColocateJoin(node, leftChildFragment, rightChildFragment, reason)) {
            ......
            //1.1 如果可以,则直接采用Colocate join
            return leftChildFragment;
        }

        //2. 如果不能,则判断能否做BucketShuffle
        List<Expr> rhsPartitionExprs = Lists.newArrayList();
        if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs)) {
            ......
            //2.1 如果可以,则直接采用BucketShuffleJoin
            return leftChildFragment;
        }

        //3. 引入joinCostEvaluation,用来对比Broadcast和Shuffle的执行代价
        JoinCostEvaluation joinCostEvaluation = new JoinCostEvaluation(node, rightChildFragment, leftChildFragment);
        boolean doBroadcast;
        //4. 根据以下策略来进行选择其中一个来执行
        if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
            doBroadcast = true;
        } else if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
                && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
            if (node.getInnerRef().isBroadcastJoin()) {
                // respect user join hint
                doBroadcast = true;
            } else if (!node.getInnerRef().isPartitionJoin() && joinCostEvaluation.isBroadcastCostSmaller()
                    && joinCostEvaluation.constructHashTableSpace()
                    <= ctx.getRootAnalyzer().getAutoBroadcastJoinThreshold()) {
                doBroadcast = true;
            } else {
                doBroadcast = false;
            }
        } else {
            doBroadcast = false;
        }
        //5. 如果Broadcast代价小,则选择Broadcast
        if (doBroadcast) {
            node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
            ........
            ((ExchangeNode) node.getChild(1)).setRightChildOfBroadcastHashJoin(true);
            return leftChildFragment;
        } else {
            //6. 否则选择Shuffle
            ........
            return joinFragment;
        }
    }

    二、查询调度器

    2.1、流程分析

    在生成查询的分布式 Plan 之后,FE 调度模块会负责 PlanFragment 的执行实例生成、PlanFragment 的调度、每个 BE 执行状态的管理、查询结果的接收。

    有了分布式执行计划之后,Doris 需要解决下面的问题:

    1. 1. 哪个 BE 执行哪个 PlanFragment

    2. 2. 每个 Tablet 选择哪个副本去查询

    3. 3. 多个 PlanFragment 如何调度

    Doris 会首先确认 Scan Operator 所在的 Fragment 在哪些 BE 节点执行,每个 Scan Operator 有需要访问的 Tablet 列表。然后对于每个 Tablet,Doris 会先选择版本匹配的、健康的、所在的 BE 状态正常的副本进行查询。在最终决定每个 Tablet 选择哪个副本查询时,采用的是随机方式,不过 Doris 会尽可能保证每个 BE 的请求均衡。假如有 10 个 BE、10 个 Tablet,最终调度的结果理论上就是每个 BE 负责 1 个 Tablet 的 Scan。

    当确定包含 Scan 的 PlanFragment 由哪些 BE 节点执行后,其他的 PlanFragment 实例也会在 Scan 的 BE 节点上执行 (也可以通过参数选择其他 BE 节点 ),不过具体选择哪个 BE 是随机选取的。当 FE 确定每个 PlanFragment 由哪个 BE 执行,每个 Tablet 查询哪个副本后,FE 就会将 PlanFragment 执行相关的参数通过 Thrift 的方式发送给 BE。

    如下图是一个简单示例,图中的PlanFrament包含了一个ScanNode,ScanNode扫描3个tablet,每个tablet有2副本,集群假设有2台host。

    computeScanRangeAssignment阶段确定了需要扫描replica 1,3,5,8,10,12,其中replica 1,3,5位于host1上,replica 8,10,12位于host2上。

    • • 如果全局并发度设置为1时,则创建2个实例FInstanceExecParam,下发到host1和host2上去执行

    • • 如果全局并发度设置为3,这个host1上创建3个实例FInstanceExecParam,host2上创建3个实例FInstanceExecParam,每个实例扫描一个replica,相当于发起6个rpc请求。

    2.2、源码分析

    有了大概的了解以后,接下来从源码层面再进行分析。上述提到的PlanFragment 的分配和分发的逻辑是在FE的Coordinator中实现的。

    伪代码如下:

    public class Coordinator {
        //最重要的数据结构,该链表保存了前面执行计划里面包含的fragment
        private final List<PlanFragment> fragments;
        
        public void exec() throws Exception {
            // 1. prepare information:做一些初始化的工作
            prepare();
            
            // 2. scheduler:分配的过程,给fragmen分配BE
            //将scan node分配到真正存放数据的BE上
            computeScanRangeAssignment();
            //将上层的fragmen分配给BE
            computeFragmentExecParams();
            
            // 3. send:将fragmen分发到BE
            sendFragment();
        }
    }

    private void sendFragment() throws TException, RpcException, UserException {
         for (PlanFragment fragment : fragments) {
           ..........
         }
         BackendServiceProxy proxy = BackendServiceProxy.getInstance();
        futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy)));
    }

    执行流程图如下:

    下面来依次介绍各个流程:

    1. 1. prepare阶段:给每个PlanFragment创建一个FragmentExecParams结构,用来表示PlanFragment执行时所需的所有参数;如果一个PlanFragment包含有DataSinkNode,则找到数据发送的目的PlanFragment,然后指定目的PlanFragment的FragmentExecParams的输入为该PlanFragment的FragmentExecParams。

      private void prepare() {
          //给每个PlanFragment创建一个FragmentExecParams结构,用来表示PlanFragment执行时所需的所有参数;
          for (PlanFragment fragment : fragments) {
              fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment));
          }

          //如果一个PlanFragment包含有DataSinkNode,则找到数据发送的目的PlanFragment
          //然后指定目的PlanFragment的FragmentExecParams的输入为该PlanFragment的FragmentExecParams。
          for (PlanFragment fragment : fragments) {
              if (!(fragment.getSink() instanceof DataStreamSink)) {
                  continue;
              }
              FragmentExecParams params = fragmentExecParamsMap.get(fragment.getDestFragment().getFragmentId());
              params.inputFragments.add(fragment.getFragmentId());
          }
      }
    2. 2. scheduler阶段

    • • computeScanRangeAssignmentByColocate:针对colocate join进行处理,由于join的两个表桶中的数据分布都是一样的,他们是基于桶的join操作,所以在这里是确定每个桶选择哪个host。在给host分配桶时,尽量保证每个host分配到的桶基本平均。

    • • computeScanRangeAssignmentByBucket:针对bucket shuffle join进行处理,也只是基于桶的操作,所以在这里是确定每个桶选择哪个host。在给host分配桶时,同样需要尽量保证每个host分配到的桶基本平均。

    • • computeScanRangeAssignmentByScheduler:针对其他类型的join进行处理。确定每个scanNode读取tablet哪个副本。一个scanNode会读取多个tablet,每个tablet有多个副本。为了使scan操作尽可能分散到多台机器上执行,提高并发性能,减少IO压力,Doris采用了Round-Robin算法,使tablet的扫描尽可能地分散到多台机器上去。例如100个tablet需要扫描,每个tablet 3个副本,一共10台机器,在分配时,保障每台机器扫描10个tablet。

    • • a. computeScanRangeAssignment阶段:针对不同类型的join进行不同的处理,主要逻辑是对fragment合理分配,尽可能保证每个BE节点的请求都是平均。

      private void computeScanRangeAssignment() throws Exception {
          //针对colocate join进行处理
         if (fragmentContainsColocateJoin) {
             computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost);
         }
         //针对colocate join进行处理
         if (fragmentContainsBucketShuffleJoin) {
             bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
                     idToBackend, addressToBackendID, replicaNumPerHost);
         }
         //针对其他类型的join进行处理
         if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
             computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
                     replicaNumPerHost);
         }
      }

      private void computeScanRangeAssignmentByScheduler() throws Exception {
          for (TScanRangeLocations scanRangeLocations : locations) {
              Reference<Long> backendIdRef = new Reference<Long>();
              //Doris采用了Round-Robin算法,使tablet的扫描尽可能地分散到多台机器上去
              TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,
                      assignedBytesPerHost, replicaNumPerHost, backendIdRef);
              updateScanRangeNumByScanRange(scanRangeParams);
          }
      }
    • • b. computeFragmentExecParams:这个阶段解决PlanFragment下发到哪个BE上执行,以及如何处理实例并发问题。确定了每个tablet的扫描地址之后,就可以以地址为维度,将FragmentExecParams生成多个实例,也就是FragmentExecParams中包含的地址有多个,就生成多个实例FInstanceExecParam。如果设置了并发度,那么一个地址的执行实例再进一步的拆成多个FInstanceExecParam。针对bucket shuffle join和colocate join会有一些特殊处理,但是基本思想一样。FInstanceExecParam创建完成后,会分配一个唯一的ID,方便追踪信息。如果FragmentExecParams中包含有ExchangeNode,需要计算有多少senders,以便知道需要接受多少个发送方的数据。最后FragmentExecParams确定destinations,并把目的地址填充上去。

  • 3. send阶段:将fragmen分发到BE。该阶段会调用BE的Proxy,这是一个异步的行为,coordinator只负责发,但是不负责等待执行结果

    private void sendFragment() throws TException, RpcException, UserException {
         for (PlanFragment fragment : fragments) {
           ..........
         }
         BackendServiceProxy proxy = BackendServiceProxy.getInstance();
        futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy)));
    }

    public class BackendServiceProxy {
         public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
              TExecPlanFragmentParamsList paramsList, boolean twoPhaseExecution) throws TException, RpcException {
                final BackendServiceClient client = getProxy(address);
                return client.execPlanFragmentAsync(pRequest);
        }
    }
  • 三、查询执行器

    为了简单起见,本文介绍的是非pipeline执行模式

    BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。

    PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。

    整个执行的数据流是由底层的fragment叶子节点先去读磁盘数据,然后一步一步的返还给最top层的fragment。然后最top层的数据被FE拉取算好的数据,再返还给客户端。

    从以上逻辑可以看到几个关键的节点:

    1. 1. 单个 Fragment 执行流程

    2. 2. Fragment 之间的数据交互

    3. 3. FE 和 Top Fragment

    4. 4. FE 将数据返回给前端展示

    后文将依次进行介绍。

    3.1、单个 Fragment 执行流程


    通过前文的介绍,可以知道,单个Fragment 是由一个DataSink+一个子树构成的。be里面负责执行单个 Fragment 执行流程的类:PlanFragmentExecutor

    class PlanFragmentExecutor {
        public:
           //主要对来自FE的TExecPlanFragmentParams进行解析以及做一些准备工作
           Status prepare(const TExecPlanFragmentParams& request);
           Status open();
           Status execute();
        private:
           //对应上图plan子树的root节点
           ExecNode* _plan;    // lives in _runtime_state->obj_pool()
           //对应上图DataSink
           std::unique_ptr<DataSink> _sink;
           std::unique_ptr<RuntimeState> _runtime_state;
    }

    其中最主要的逻辑是:

    1. 1. 调用root节点(_plan)的get_next

    2. 2. 然后不断调用子节点的get_next

    3. 3. 然后把get_next返回的数据通过 _sink->send发送给上层的节点

    伪代码如下:

    1. PlanFragmentExecutor::prepare
    2. PlanFragmentExecutor::open
       1. open plan node tree
       2. open sink
       3. while(true) {
          get next batch
          send
          }
    3. close

    假如某fragment的root node为Hash Join Node,具体如下图:

    这里仔细看一下 _plan->get_next()即Hash Join Node的get_next()做了什么?

    以Hash Join Node为例:

    • • root节点的get_next

      • • 输入:下层计算的结果;

      • • 输出:这层算好的值

    • • Hash Join Node首先在open阶段会

      • • 执行右孩子child[1]->get_next()构建hash表,即获取右孩子的数据并构建成hash表

      • • 当所有右孩子的数据都获取完成之后,即完成open阶段

    • • Hash Join Node然后执行child[0]->get_next会自顶向下一层一层的调用下层算子的get_next。在这个阶段会:

      • • 读取左孩子(child[0])的一个batch

      • • 根据hash table找到match的行

      • • 对左右孩子的行进行拼接并放在output_block

    • • 直到最下层的算子执行完了,再自底向上的返回结果

    3.2、Fragment 之间的数据交互

    说完单个Fragment执行流程,就涉及到Fragment 之间的数据交互。

    Fragment 之间的数据交互是存在不同的分发策略的(实现VDataStreamSender::send())

    1. 1. HASH_PARTITIONED:有些场景,需要对下层的数据进行hash,然后发送给不同的上层节点,如下图所示

    2. 2. UNPARTITIONED:有些场景,需要将数据全部发送给同一个上层节点

    3.2.1、关键结构体

    • • VExchangeNode:负责不同fragment的数据接收的node

      • • 该类中有个receiver节点,负责接收不同channel发来的数据。

      • • VDataStreamRecvr:在VExchangeNode中真正的负责数据的接收。主要逻辑就是不断接受数据返回给上层,具体逻辑见:VExchangeNode::get_next-->VExchangeNode::get_next

    • • DataSink:负责不同fragment的数据发送的node(这是个基类)。

      • • 该类中有一个channel list,其中channel 就是负责对数据进行发送的。一般来说channel数跟上层的Exchange Node数量保持一致,每个channel负责给对应的be发数据,具体逻辑见:VDataStreamSender::send()

      • • VDataStreamSender:在DataSink中真正的负责数据的发送,如果支持向量化则会new VDataStreamSender,否则是new DataStreamSender。

    3.2.2、sink如何发送数据

    sink如何发送数据给exchange node?

    1. 1. 发送给local 的 exchange node 这里就直接调用recvr的add_block方法,直接将数据放入VDataStreamRecvr::_sender_queues::_block_queue中,

    2. 2. 通过rpc发送给 remote的exchange node 最终是将数据通过rpc,同local一样放入了_block_queue中,后续上层节点调用get_next方法时会调用_sender_queues的get batch方法从_block_queue中获取数据,调用链:

    VDataStreamSender::Channel::send_block -> PInternalServiceImpl::transmit_block -> VDataStreamMgr::transmit_block -> VDataStreamRecvr::add_block -> VDataStreamRecvr::SenderQueue::add_block

    3.2.3、exchange node如何接收并合并

    exchange node如何接收、存储数据?exchange node在真实运行时,上层算子会调用exchange node的get next方法,传入一个空block,通过exchange node的get next方法将数据通过block返回给上层算子。

    1. 1. 单个sender发来的数据 调用_sender_queues的get_batch方法将数据从_block_queue中取出,最终返回给get_next方法的block。

    2. 2. 多个sender发来的数据 每个remote sender都会将数据通过rpc发送给exchange node,首先会到达exchange node所在be的PInternalServiceImpl::transmit_block方法,进而调用VDataStreamMgr::transmit_block方法中,该方法中会根据node_id找到对应需要接收数据的exchange node中的recvr,然后recvr负责接收数据,并根据sender_id, 将数据通过add_block方法存入指定的_sender_queues的_block_queue中。

    简而言之,sink向exchange node发送数据共分为2种情况:两个节点在同一个BE,local send ,这样sender直接调用recvr的add block,将数据直接放入recvr中的指定位置。两个节点不在同一个BE,则通过rpc send,在exchange node所在be节点上找到exchange node的recvr,调用add block将数据放入指定位置。

    3.3、FE 和 Top Fragment数据交互

    说完了Fragment之间的交互呢,数据会一层一层的传输到top Fragment。这就涉及到了FE 和 Top Fragment数据交互,该步骤的目的就是让FE获取top Fragment计算之后的结果。这个流程是FE主动向 Top Fragment去拉取的,具体结构如下:

    3.3.1、FE

    FE的Coordinator中有一个重要结构体:ResultReceiver,FE会通过该结构体不断的向 Top Fragment主动拉取BE结果的的。

    伪代码如下:

    1. 1. 不断通过receiver.getNext获取 Top Fragment数据

    2. 2. 然后将数据放在FE自己的channel中

    3.3.2、BE

    BE的 Top Fragment有一个重要结构体:VMysqlResultWriter,其会把结果不断放在Row Buffer中,等待FE去拉取。

    伪代码如下:

    1. 1. 把数据放在row_buffer中缓存

    2. 2. 然后拷贝到Thrift

    Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block){
            for (size_t row_idx = 0; row_idx != num_rows; ++row_idx) {
                 for (int i = 0; i < _output_vexpr_ctxs.size(); ++i) {
                    RETURN_IF_ERROR(arguments[i].serde->write_column_to_mysql(
                            *(arguments[i].column), row_buffer, row_idx, arguments[i].is_const));
                }
                // copy MysqlRowBuffer to Thrift
                result->result_batch.rows[row_idx].append(row_buffer.buf(), row_buffer.length());
            }
    }

    3.4、FE 将数据返回给前端展示

    上文提到FE拉取 Top Fragment数据以后,会将数据放在自己的channel中,即MysqlChannel类。这个MysqlChannel即是负责FE的数据返回给前端展示的。

    1. 1. 该类中的sendOnePacket()这个方法并不是说coordinator调用一次,就send一次。

    2. 2. 而是先写一个Buffer,等buffer写满以后,才会调用一次发送逻辑:realNetSend

    数据返回给用户以后,整个查询流程就结束了。

    3.5、源码链路

    这里分析一下笔者调试SQL执行的代码链路:

    SQL
    SELECT l_shipdate,sum(l_suppkey) FROM lineitem JOIN orders ON l_orderkey=o_orderkey GROUP BY l_shipdate ORDER BY sum(l_suppkey);

    # 测试环境
    1 FE + 1 BE

    # 数据量
    tpch 3G

    3.5.1、执行计划

    通执行 explain SELECT l_shipdate,sum(l_suppkey) FROM lineitem JOIN orders ON l_orderkey=o_orderkey GROUP BY l_shipdate ORDER BY sum(l_suppkey);

    1. 1. 可以看到BE中有3个FRAGMENT

    2. 2. FRAGMENT直接通过EXCHANGE跟SINK进行连接的

    3.5.2、源码执行流程

    prepare

    1. 1. 由于有3个fragment,所以会执行3次 PlanFragmentExecutor::prepare

    2. 2. _plan指的是子树的root节点,,其在执行prepare过程中会将子节点也调用执行。例如说 fragment 0,_plan就是exchange;

    3. 3. prepare过程中会按需初始化exchange node。例如说 fragment 1就需要初始化,因为要接受fragment 2的数据;fragment 2不需要,应该所有数据的传输的以及计算在local即可完成

    FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params)
    |--> FragmentMgr::exec_plan_fragment(params, empty_function)
    |  |--> fragment_executor->prepare(params)
    |  |  |--> PlanFragmentExecutor::prepare
    |  |  |  |--> //会根据 plan fragment 创建当前fragment的 ExecNode 子树
    |  |  |  |--> ExecNode::create_tree
    |  |  |  |--> //准备exchange node
    |  |  |  |--> _plan->collect_nodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
    |  |  |  |--> _plan->prepare
    |  |  |  |--> //准备scan node
    |  |  |  |--> _plan->collect_scan_nodes
    |  |  |  |--> //准备sink node
    |  |  |  |--> DataSink::create_data_sink
    |  |  |  |--> _sink->prepare

    execute

    1. 1. BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理

    2. 2. _plan->open会依次open子树的所有节点

    3. 3. HashJoinNode在open阶段会执行右孩子get_next,构建hash表。然后在HashJoinNode::get_next阶段根据左孩子的数据进行hash判断。

    4. 4. 最终通过_sink->send来将数据传输上层fragment

    //BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理
    //这里是PLAN FRAGMENT 2的堆栈
    FragmentMgr::_exec_actual
    |--> PlanFragmentExecutor::execute
    |  |--> PlanFragmentExecutor::open
    |  |  |--> PlanFragmentExecutor::open_vectorized_internal
    |  |  |  |--> //open plan node tree
    |  |  |  |--> _plan->open
    |  |  |  |  |--> AggregationNode::open
    |  |  |  |  |  |--> ExecNode::open
    |  |  |  |  |  |--> _children[0]->open
    |  |  |  |  |  |  |--> //open:准备工作
    |  |  |  |  |  |  |--> HashJoinNode::open
    |  |  |  |  |  |  |  |--> VJoinNodeBase::open
    |  |  |  |  |  |  |  |  |--> //它使用C++11中的promise和future机制启动一个异步线程
    |  |  |  |  |  |  |  |  |--> //,来并行执行_probe_side_open_thread方法
    |  |  |  |  |  |  |  |  |--> std::promise<Status> thread_status;
    |  |  |  |  |  |  |  |  |--> //它通过调用submit_func方法将_probe_side_open_thread方法提交到线程池中。
    |  |  |  |  |  |  |  |  |--> //这里的state->exec_env()->join_node_thread_pool()获取了Doris引擎的线程池对象
    |  |  |  |  |  |  |  |  |--> //,该线程池对象用于执行与join相关的操作,包括probe线程的打开和异步的join操作。
    |  |  |  |  |  |  |  |  |--> state->exec_env()->join_node_thread_pool()->submit_func(
                    [this, state, thread_status_p = &thread_status] {
                        this->_probe_side_open_thread(state, thread_status_p);
                    });
    |  |  |  |  |  |  |  |  |--> _materialize_build_side(state)
    |  |  |  |  |  |  |  |  |  |--> HashJoinNode::_materialize_build_side
    |  |  |  |  |  |  |  |  |  |  |--> //右孩子get_next,构建hash表
    |  |  |  |  |  |  |  |  |  |  |--> child(1)->get_next_after_projects
    |  |  |  |  |  |  |  |  |  |--> HashJoinNode::sink
    |  |  |  |  |  |  |  |  |  |  |--> _process_build_block 
    |  |  |  |  |  |  |  |  |  |  |  |--> _extract_join_column
    |  |  |  |  |  |  |  |  |  |  |  |--> hash_table_build_process
    |  |  |  |  |  |  |  |  |--> //使用get_future().get()方法阻塞等待异步线程的完成,
    |  |  |  |  |  |  |  |  |--> //并返回线程的状态。如果线程执行成功,
    |  |  |  |  |  |  |  |  |--> //则调用VExpr::open方法打开join的输出表达式,否则返回错误状态。
    |  |  |  |  |  |  |  |  |--> RETURN_IF_ERROR(thread_status.get_future().get())
    |  |  |  |  |  |  |  |  |--> RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state))
    |  |  |  |--> //open sink
    |  |  |  |--> _sink->open
    |  |  |  |--> /*while(true)*/
    |  |  |  |--> PlanFragmentExecutor::get_vectorized_internal
    |  |  |  |  |--> ExecNode::get_next_after_projects
    |  |  |  |  |  |--> //一些回调
    |  |  |  |  |  |--> vectorized::AggregationNode::get_next
    |  |  |  |  |  |  |--> ExecNode::get_next_after_projects
    |  |  |  |  |  |  |--> //一些回调
    |  |  |  |  |  |  |  |--> vectorized::HashJoinNode::get_next
    |  |  |  |  |  |  |  |  |--> /*loop*/
    |  |  |  |  |  |  |  |  |--> HashJoinNode::prepare_for_next
    |  |  |  |  |  |  |  |  |  |--> HashJoinNode::_prepare_probe_block 
    |  |  |  |  |  |  |  |  |--> //1. 读取左孩子的一个batch 
    |  |  |  |  |  |  |  |  |--> child(0)->get_next_after_projects()
    |  |  |  |  |  |  |  |  |--> vectorized::AggregationNode::push
    |  |  |  |  |  |  |  |  |--> /*end loop*/
    |  |  |  |  |  |  |  |-->//根据hashtable找到match的行,放入 output_block
    |  |  |  |  |  |  |  |--> vectorized::AggregationNode::pull
    |  |  |  |  |  |  |  |  |--> _build_output_block(&temp_block, output_block, false)
    |  |  |  |  |--> _sink->send
    |  |  |  |  |  |--> VDataStreamSender::send
    |  |  |  |  |  |  |--> //如果两个需要传递数据的Plan Fragment在同一个BE节点上,
    |  |  |  |  |  |  |--> //那么上层的sender会直接通过RuntimeState对象进而找到recver,
    |  |  |  |  |  |  |--> //直接在sink节点调用exchange node 中的
    |  |  |  |  |  |  |--> //recver对象的add_block将数据直接放入exchange node 中
    |  |  |  |  |  |  |--> /**loop for channels*/
    |  |  |  |  |  |  |--> channel->send_local_block
    |  |  |  |  |  |  |  |--> Channel<Parent>::send_local_block
    |  |  |  |  |  |  |  |  |--> _local_recvr->add_block
    |  |  |  |  |  |  |  |  |  |--> VDataStreamRecvr::add_block
    |  |  |  |  |  |  |--> /**end loop for channels*/
    |  |  |  |  |  |  |--> //通过rpc发送给 remote的exchange node
    |  |  |  |  |  |  |--> /**loop for channels*/
    |  |  |  |  |  |  |--> channel->send_block
    |  |  |  |  |  |  |  |--> 
    |  |  |  |  |  |  |--> /**end loop for channels*/
    |  |  |  |--> /*end while(true)*/

    下面给出笔者调试代码过程中一些堆栈记录:

    send调用堆栈

    HashJoinNode::get_next堆栈

    四、总结

    一条SQL的执行主要包含以下3个阶段:

    1. 1. FE生成查询计划

    • • 逻辑查询计划 PlanNodeTree,每个 PlanNode 代表一种运算。

    • • 分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由 PlanNodeTree 的子树 和 Sink 节点组成的。

  • 2. FE进行分配分发

  • 3. BE执行查询

    • • Plan Fragment Tree 一层层处理数据,FE 获取后,最终返回给用户

    • • 单个 Fragment 执行,递归调用 get_next 计算结果

    • • Fragment 和 Fragment 之间, sink 通过 channel 分发数据给上层Exchange Node

    • • FE coordinator 不断获取 Top Fragment 的 row buffer 中的数据

    • • 通过 Mysql Channel 将数据返回给 Client

    五、参考文档

    • • https://blog.csdn.net/qq_43377188/article/details/125292678

    • • https://www.slidestalk.com/doris.apache/SQL82045

    • • https://github.com/apache/doris/pull/15491

    • • https://www.infoq.cn/article/qtfkmuxpuq2zk2ekjnxd

    • • https://cloud.tencent.com/developer/article/2042956

    • • https://www.modb.pro/db/44960

    300万字!全网最全大数据学习面试社区等你来!


    如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

    全网首发|大数据专家级技能模型与学习指南(胜天半子篇)
    互联网最坏的时代可能真的来了
    我在B站读大学,大数据专业
    我们在学习Flink的时候,到底在学习什么?
    193篇文章暴揍Flink,这个合集你需要关注一下
    Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS
    Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
    我们在学习Spark的时候,到底在学习什么?
    在所有Spark模块中,我愿称SparkSQL为最强!
    硬刚Hive | 4万字基础调优面试小总结
    数据治理方法论和实践小百科全书
    标签体系下的用户画像建设小指南
    4万字长文 | ClickHouse基础&实践&调优全视角解析
    【面试&个人成长】社招和校招的经验之谈
    大数据方向另一个十年开启 |《硬刚系列》第一版完结
    我写过的关于成长/面试/职场进阶的文章
    当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

    大数据技术与架构
    王知无,大数据卷王,专注大数据技术分享。
     最新文章