0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

深度剖析SQL中的Grouping Sets语句2

jf_78858299 来源:元闰子的邀请 作者:元闰子 2023-05-10 17:44 次阅读

Expand 算子的实现

Expand 算子在 Spark SQL 源码中的实现为 ExpandExec 类(Spark SQL 中的算子实现类的命名都是 XxxExec 的格式,其中 Xxx 为具体的算子名,比如 Project 算子的实现类为 ProjectExec),核心代码如下:

/**
 * Apply all of the GroupExpressions to every input row, hence we will get
 * multiple output rows for an input row.
 * @param projections The group of expressions, all of the group expressions should
 *                    output the same schema specified bye the parameter `output`
 * @param output      The output Schema
 * @param child       Child operator
 */
case class ExpandExec(
    projections: Seq[Seq[Expression]],
    output: Seq[Attribute],
    child: SparkPlan)
  extends UnaryExecNode with CodegenSupport {

  ...
  // 关键点1,将child.output,也即上游算子输出数据的schema,
  // 绑定到表达式数组exprs,以此来计算输出数据
  private[this] val projection =
    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)

  // doExecute()方法为Expand算子执行逻辑所在
  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")

    // 处理上游算子的输出数据,Expand算子的输入数据就从iter迭代器获取
    child.execute().mapPartitions { iter =>
      // 关键点2,projections对应了Grouping Sets里面每个grouping set的表达式,
      // 表达式输出数据的schema为this.output, 比如 (quantity, city, car_model, spark_grouping_id)
      // 这里的逻辑是为它们各自生成一个UnsafeProjection对象,通过该对象的apply方法就能得出Expand算子的输出数据
      val groups = projections.map(projection).toArray
      new Iterator[InternalRow] {
        private[this] var result: InternalRow = _
        private[this] var idx = -1  // -1 means the initial state
        private[this] var input: InternalRow = _

        override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext

        override final def next(): InternalRow = {
          // 关键点3,对于输入数据的每一条记录,都重复使用N次,其中N的大小对应了projections数组的大小,
          // 也即Grouping Sets里指定的grouping set的数量
          if (idx <= 0) {
            // in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple
            input = iter.next()
            idx = 0
          }
          // 关键点4,对输入数据的每一条记录,通过UnsafeProjection计算得出输出数据,
          // 每个grouping set对应的UnsafeProjection都会对同一个input计算一遍
          result = groups(idx)(input)
          idx += 1

          if (idx == groups.length && iter.hasNext) {
            idx = 0
          }

          numOutputRows += 1
          result
        }
      }
    }
  }
  ...
}

ExpandExec 的实现并不复杂,想要理解它的运作原理,关键是看懂上述源码中提到的 4 个关键点。

关键点 1关键点 2 是基础,关键点 2 中的 groups 是一个 UnsafeProjection[N] 数组类型,其中每个 UnsafeProjection 代表了 Grouping Sets 语句里指定的 grouping set,它的定义是这样的:

// A projection that returns UnsafeRow.
abstract class UnsafeProjection extends Projection {
  override def apply(row: InternalRow): UnsafeRow
}

// The factory object for `UnsafeProjection`.
object UnsafeProjection
    extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {
  // Returns an UnsafeProjection for given sequence of Expressions, which will be bound to
  // `inputSchema`.
  def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): UnsafeProjection = {
    create(bindReferences(exprs, inputSchema))
  }
  ...
}

UnsafeProjection 起来了类似列投影的作用,其中, apply 方法根据创建时的传参 exprsinputSchema,对输入记录进行列投影,得出输出记录。

比如,前面的 GROUPING SETS ((city, car_model), (city), (car_model), ())例子,它对应的 groups 是这样的:

图片

其中,AttributeReference 类型的表达式,在计算时,会直接引用输入数据对应列的值;Iteral 类型的表达式,在计算时,值是固定的。

关键点 3关键点 4 是 Expand 算子的精华所在,ExpandExec 通过这两段逻辑,将每一个输入记录, 扩展(Expand) 成 N 条输出记录。

关键点 4groups(idx)(input) 等同于 groups(idx).apply(input)

还是以前面 GROUPING SETS ((city, car_model), (city), (car_model), ()) 为例子,效果是这样的:

图片

到这里,我们已经弄清楚 Expand 算子的工作原理,再回头看前面提到的 3 个问题,也不难回答了:

  1. Expand 的实现逻辑是怎样的,为什么能达到 Union All 的效果?
    如果说 Union All 是先聚合再联合,那么 Expand 就是先联合再聚合。Expand 利用 groups 里的 N 个表达式对每条输入记录进行计算,扩展成 N 条输出记录。后面再聚合时,就能达到与 Union All 一样的效果了。
  2. Expand 节点的输出数据是怎样的

在 schema 上,Expand 输出数据会比输入数据多出 spark_grouping_id 列;在记录数上,是输入数据记录数的 N 倍。

  1. spark_grouping_id 列的作用是什么

spark_grouping_id 给每个 grouping set 进行编号,这样,即使在 Expand 阶段把数据先联合起来,在 Aggregate 阶段(把 spark_grouping_id 加入到分组规则)也能保证数据能够按照每个 grouping set 分别聚合,确保了结果的正确性。

查询性能对比

从前文可知,Grouping Sets 和 Union All 两个版本的 SQL 语句有着一样的效果,但是它们的执行计划却有着巨大的差别。下面,我们将比对两个版本之间的执行性能差异。

spark-sql 执行完 SQL 语句之后会打印耗时信息,我们对两个版本的 SQL 分别执行 10 次,得到如下信息:

// Grouping Sets 版本执行10次的耗时信息
// SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY GROUPING SETS ((city, car_model), (city), (car_model), ()) ORDER BY city, car_model;
Time taken: 0.289 seconds, Fetched 15 row(s)
Time taken: 0.251 seconds, Fetched 15 row(s)
Time taken: 0.259 seconds, Fetched 15 row(s)
Time taken: 0.258 seconds, Fetched 15 row(s)
Time taken: 0.296 seconds, Fetched 15 row(s)
Time taken: 0.247 seconds, Fetched 15 row(s)
Time taken: 0.298 seconds, Fetched 15 row(s)
Time taken: 0.286 seconds, Fetched 15 row(s)
Time taken: 0.292 seconds, Fetched 15 row(s)
Time taken: 0.282 seconds, Fetched 15 row(s)

// Union All 版本执行10次的耗时信息
// (SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY city, car_model) UNION ALL (SELECT city, NULL as car_model, sum(quantity) AS sum FROM dealer GROUP BY city) UNION ALL (SELECT NULL as city, car_model, sum(quantity) AS sum FROM dealer GROUP BY car_model) UNION ALL (SELECT NULL as city, NULL as car_model, sum(quantity) AS sum FROM dealer) ORDER BY city, car_model;
Time taken: 0.628 seconds, Fetched 15 row(s)
Time taken: 0.594 seconds, Fetched 15 row(s)
Time taken: 0.591 seconds, Fetched 15 row(s)
Time taken: 0.607 seconds, Fetched 15 row(s)
Time taken: 0.616 seconds, Fetched 15 row(s)
Time taken: 0.64 seconds, Fetched 15 row(s)
Time taken: 0.623 seconds, Fetched 15 row(s)
Time taken: 0.625 seconds, Fetched 15 row(s)
Time taken: 0.62 seconds, Fetched 15 row(s)
Time taken: 0.62 seconds, Fetched 15 row(s)

可以算出,Grouping Sets 版本的 SQL 平均耗时为 0.276s ;Union All 版本的 SQL 平均耗时为 0.616s ,是前者的 2.2 倍

所以, Grouping Sets 版本的 SQL 不仅在表达上更加简洁,在性能上也更加高效

RollUp 和 Cube

Group By 的高级用法中,还有 RollUpCube 两个比较常用。

首先,我们看下 RollUp 语句

Spark SQL 官方文档中 SQL Syntax 一节对 RollUp 语句的描述如下:

Specifies multiple levels of aggregations in a single statement. This clause is used to compute aggregations based on multiple grouping sets. ROLLUP is a shorthand for GROUPING SETS. (... 一些例子)

官方文档中,把 RollUp 描述为 Grouping Sets 的简写,等价规则为:RollUp(A, B, C) == Grouping Sets((A, B, C), (A, B), (A), ())

比如,Group By RollUp(city, car_model) 就等同于 Group By Grouping Sets((city, car_model), (city), ())

下面,我们通过 expand extended 看下 RollUp 版本 SQL 的 Optimized Logical Plan:

spark-sql> explain extended SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY ROLLUP(city, car_model) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#2164 ASC NULLS FIRST, car_model#2165 ASC NULLS FIRST], true
+- Aggregate [city#2164, car_model#2165, spark_grouping_id#2163L], [city#2164, car_model#2165, sum(quantity#2159) AS sum#2150L]
   +- Expand [[quantity#2159, city#2157, car_model#2158, 0], [quantity#2159, city#2157, null, 1], [quantity#2159, null, null, 3]], [quantity#2159, city#2164, car_model#2165, spark_grouping_id#2163L]
      +- Project [quantity#2159, city#2157, car_model#2158]
         +- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#2156, city#2157, car_model#2158, quantity#2159], Partition Cols: []]
== Physical Plan ==
...

从上述 Plan 可以看出,RollUp 底层实现用的也是 Expand 算子,说明 RollUp 确实是基于 Grouping Sets 实现的。 而且 Expand [[quantity#2159, city#2157, car_model#2158, 0], [quantity#2159, city#2157, null, 1], [quantity#2159, null, null, 3]] 也表明 RollUp 符合等价规则。

下面,我们按照同样的思路,看下 Cube 语句

Spark SQL 官方文档中 SQL Syntax 一节对 Cube 语句的描述如下:

CUBE clause is used to perform aggregations based on combination of grouping columns specified in the GROUP BYclause. CUBE is a shorthand for GROUPING SETS. (... 一些例子)

同样,官方文档把 Cube 描述为 Grouping Sets 的简写,等价规则为:Cube(A, B, C) == Grouping Sets((A, B, C), (A, B), (A, C), (B, C), (A), (B), (C), ())

比如,Group By Cube(city, car_model) 就等同于 Group By Grouping Sets((city, car_model), (city), (car_model), ())

下面,我们通过 expand extended 看下 Cube 版本 SQL 的 Optimized Logical Plan:

spark-sql> explain extended SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY CUBE(city, car_model) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#2202 ASC NULLS FIRST, car_model#2203 ASC NULLS FIRST], true
+- Aggregate [city#2202, car_model#2203, spark_grouping_id#2201L], [city#2202, car_model#2203, sum(quantity#2197) AS sum#2188L]
   +- Expand [[quantity#2197, city#2195, car_model#2196, 0], [quantity#2197, city#2195, null, 1], [quantity#2197, null, car_model#2196, 2], [quantity#2197, null, null, 3]], [quantity#2197, city#2202, car_model#2203, spark_grouping_id#2201L]
      +- Project [quantity#2197, city#2195, car_model#2196]
         +- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#2194, city#2195, car_model#2196, quantity#2197], Partition Cols: []]
== Physical Plan ==
...

从上述 Plan 可以看出,Cube 底层用的也是 Expand 算子,说明 Cube 确实基于 Grouping Sets 实现,而且也符合等价规则。

所以,RollUpCube 可以看成是 Grouping Sets 的语法糖,在底层实现和性能上是一样的。

最后

本文重点讨论了 Group By 高级用法 Groupings Sets 语句的功能和底层实现。

虽然 Groupings Sets 的功能,通过 Union All 也能实现,但前者并非后者的语法糖,它们的底层实现完全不一样。Grouping Sets 采用的是先联合再聚合的思路,通过 spark_grouping_id 列来保证数据的正确性;Union All 则采用先聚合再联合的思路。Grouping Sets 在 SQL 语句表达和性能上都有更大的优势

Group By 的另外两个高级用法 RollUpCube 则可以看成是 Grouping Sets 的语法糖,它们的底层都是基于 Expand 算子实现, 在性能上与直接使用 Grouping Sets 是一样的,但在 SQL 表达上更加简洁

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    6593

    浏览量

    87959
  • SQL
    SQL
    +关注

    关注

    1

    文章

    742

    浏览量

    43645
  • 函数
    +关注

    关注

    3

    文章

    4147

    浏览量

    61558
收藏 人收藏

    评论

    相关推荐

    在Delphi动态地使用SQL查询语句

    在Delphi动态地使用SQL查询语句在一般的数据库管理系统,通常都需要应用SQL查询语句
    发表于 05-10 11:10

    SQL语句生成器

    SQL语句生成器SQL数据库语句生成及分析器(支持表结构、索引、所有记录到SQL脚本)可用于数据数的备份和恢复!功能不用多说,试试就知道了
    发表于 06-12 16:15

    关于labviewSQL语句写法

    我的问题是:比如说要查询数据库的时间在20120806-20130105之间的数据 ,用vi程序查询,那么SQL query语句该怎么写? 求大神帮忙???谢谢
    发表于 01-05 22:09

    C语言深度剖析

    C语言深度剖析[完整版].pdfC语言深度剖析[完整版].pdf (919.58 KB )
    发表于 03-19 05:11

    SQL语句的两种嵌套方式

    一般情况下,SQL语句是嵌套在宿主语言(如C语言)的。有两种嵌套方式:1.调用层接口(CLI):提供一些库,库的函数和方法实现SQL的调
    发表于 05-23 08:51

    区分SQL语句与主语言语句

    为了区分SQL语句与主语言语句,所有SQL 语句必须加前缀EXEC SQL处理过程:含嵌入式
    发表于 10-28 08:44

    为什么要动态sql语句

    为什么要动态sql语句?因为动态sql语句能够提供一些比较友好的机制1、可以使得一些在编译过程无法获得完整的
    发表于 12-20 06:00

    数据库SQL语句电子教程

    电子发烧友为您提供了数据库SQL语句电子教程,帮助您了解数据库 SQL语句 ,学习读懂数据库SQL语句
    发表于 07-14 17:09 0次下载

    如何使用navicat或PHPMySQLAdmin导入SQL语句

    很多朋友问我们怎么导入SQL语句,这是新人最需要知道的东西,现制作图文教程,希望对新手有所帮助,顺便文末附SQL语句导入导出大全,高手可以提供更加详细的教程。
    发表于 04-10 15:06 2次下载

    如何使用SQL修复语句程序说明

    本文档的主要内容详细介绍的是如何使用SQL修复语句程序说明。
    发表于 10-31 15:09 5次下载

    嵌入式SQL语句

    为了区分SQL语句与主语言语句,所有SQL 语句必须加前缀EXEC SQL处理过程:含嵌入式
    发表于 10-21 11:51 4次下载
    嵌入式<b class='flag-5'>SQL</b><b class='flag-5'>语句</b>

    SQL语句利用日志写shell及相关绕过

    在能够写SQL语句的地方,outfile、dumpfile、drop database等都被禁止,一般进行SQL注入来getshell或删库的方式行不通了。
    的头像 发表于 02-03 17:32 1569次阅读

    深度剖析SQL中的Grouping Sets语句1

    SQL 中 `Group By` 语句大家都很熟悉, **根据指定的规则对数据进行分组** ,常常和**聚合函数**一起使用。
    的头像 发表于 05-10 17:44 557次阅读
    <b class='flag-5'>深度</b><b class='flag-5'>剖析</b><b class='flag-5'>SQL</b>中的<b class='flag-5'>Grouping</b> <b class='flag-5'>Sets</b><b class='flag-5'>语句</b>1

    sql查询语句大全及实例

    SQL(Structured Query Language)是一种专门用于数据库管理系统的标准交互式数据库查询语言。它被广泛应用于数据库管理和数据操作领域。在本文中,我们将为您详细介绍SQL查询语句
    的头像 发表于 11-17 15:06 887次阅读

    oracle执行sql查询语句的步骤是什么

    Oracle数据库是一种常用的关系型数据库管理系统,具有强大的SQL查询功能。Oracle执行SQL查询语句的步骤包括编写SQL语句、解析
    的头像 发表于 12-06 10:49 547次阅读