0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Join在Spark中是如何组织运行的

人工智能与大数据技术 来源:人工智能与大数据技术 作者:人工智能与大数据 2020-09-25 11:35 次阅读

Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的。

SparkSQL总体流程介绍

在阐述Join实现之前,我们首先简单介绍SparkSQL的总体流程,一般地,我们有两种方式使用SparkSQL,一种是直接写sql语句,这个需要有元数据库支持,例如Hive等,另一种是通过Dataset/DataFrame编写Spark应用程序。如下图所示,sql语句被语法解析(SQL AST)成查询计划,或者我们通过Dataset/DataFrame提供的APIs组织成查询计划,查询计划分为两大类:逻辑计划和物理计划,这个阶段通常叫做逻辑计划,经过语法分析(Analyzer)、一系列查询优化(Optimizer)后得到优化后的逻辑计划,最后被映射成物理计划,转换成RDD执行。

对于语法解析、语法分析以及查询优化,本文不做详细阐述,本文重点介绍Join的物理执行过程。

Join基本要素

如下图所示,Join大致包括三个要素:Join方式、Join条件以及过滤条件。其中过滤条件也可以通过AND语句放在Join条件中。

Spark支持所有类型的Join,包括:

inner join

left outer join

right outer join

full outer join

left semi join

left anti join

下面分别阐述这几种Join的实现。

Join基本实现流程

总体上来说,Join的基本实现流程如下图所示,Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。

在实际计算时,spark会基于streamIter来遍历,每次取出streamIter中的一条记录rowA,根据Join条件计算keyA,然后根据该keyA去buildIter中查找所有满足Join条件(keyB==keyA)的记录rowBs,并将rowBs中每条记录分别与rowAjoin得到join后的记录,最后根据过滤条件得到最终join的记录。

从上述计算过程中不难发现,对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。

sort merge join实现

要让两条记录能join到一起,首先需要将具有相同key的记录在同一个分区,所以通常来说,需要做一次shuffle,map阶段根据join条件确定每条记录的key,基于该key做shuffle write,将可能join到一起的记录分到同一个分区中,这样在shuffle read阶段就可以将两个表中具有相同key的记录拉到同一个分区处理。前面我们也提到,对于buildIter一定要是查找性能较优的数据结构,通常我们能想到hash表,但是对于一张较大的表来说,不可能将所有记录全部放到hash表中,另外也可以对buildIter先排序,查找时按顺序查找,查找代价也是可以接受的,我们知道,spark shuffle阶段天然就支持排序,这个是非常好实现的,下面是sort merge join示意图。

在shuffle read阶段,分别对streamIter和buildIter进行merge sort,在遍历streamIter时,对于每条记录,都采用顺序查找的方式从buildIter查找对应的记录,由于两个表都是排序的,每次处理完streamIter的一条记录后,对于streamIter的下一条记录,只需从buildIter中上一次查找结束的位置开始查找,所以说每次在buildIter中查找不必重头开始,整体上来说,查找性能还是较优的。

broadcast join实现

为了能具有相同key的记录分到同一个分区,我们通常是做shuffle,那么如果buildIter是一个非常小的表,那么其实就没有必要大动干戈做shuffle了,直接将buildIter广播到每个计算节点,然后将buildIter放到hash表中,如下图所示。

从上图可以看到,不用做shuffle,可以直接在一个map中完成,通常这种join也称之为map join。那么问题来了,什么时候会用broadcast join实现呢?这个不用我们担心,spark sql自动帮我们完成,当buildIter的估计大小不超过参数spark.sql.autoBroadcastJoinThreshold设定的值(默认10M),那么就会自动采用broadcast join,否则采用sort merge join。

hash join实现

除了上面两种join实现方式外,spark还提供了hash join实现方式,在shuffle read阶段不对记录排序,反正来自两格表的具有相同key的记录会在同一个分区,只是在分区内不排序,将来自buildIter的记录放到hash表中,以便查找,如下图所示。

不难发现,要将来自buildIter的记录放到hash表中,那么每个分区来自buildIter的记录不能太大,否则就存不下,默认情况下hash join的实现是关闭状态,如果要使用hash join,必须满足以下四个条件:

buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件

开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false

每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中

streamIter的大小是buildIter三倍以上

所以说,使用hash join的条件其实是很苛刻的,在大多数实际场景中,即使能使用hash join,但是使用sort merge join也不会比hash join差很多,所以尽量使用hash

下面我们分别阐述不同Join方式的实现流程。

inner join

inner join是一定要找到左右表中满足join条件的记录,我们在写sql语句或者使用DataFrame时,可以不用关心哪个是左表,哪个是右表,在spark sql查询优化阶段,spark会自动将大表设为左表,即streamIter,将小表设为右表,即buildIter。这样对小表的查找相对更优。其基本实现流程如下图所示,在查找阶段,如果右表不存在满足join条件的记录,则跳过。

left outer join

left outer join是以左表为准,在右表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。我们在写sql语句或者使用DataFrmae时,一般让大表在左边,小表在右边。其基本实现流程如下图所示。

right outer join

right outer join是以右表为准,在左表中查找匹配的记录,如果查找失败,则返回一个所有字段都为null的记录。所以说,右表是streamIter,左表是buildIter,我们在写sql语句或者使用DataFrame时,一般让大表在右边,小表在左边。其基本实现流程如下图所示。

full outer join

full outer join相对来说要复杂一点,总体上来看既要做left outer join,又要做right outer join,但是又不能简单地先left outer join,再right outer join,最后union得到最终结果,因为这样最终结果中就存在两份inner join的结果了。因为既然完成left outer join又要完成right outer join,所以full outer join仅采用sort merge join实现,左边和右表既要作为streamIter,又要作为buildIter,其基本实现流程如下图所示。

由于左表和右表已经排好序,首先分别顺序取出左表和右表中的一条记录,比较key,如果key相等,则joinrowA和rowB,并将rowA和rowB分别更新到左表和右表的下一条记录;如果keyAkeyB,则说明左表中没有与右表rowB对应的记录,那么joinnullRow与rowB,紧接着,rowB更新到右表的下一条记录。如此循环遍历直到左表和右表的记录全部处理完。

left semi join

left semi join是以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回null,其基本实现流程如下图所示。

left anti join

left anti join与left semi join相反,是以左表为准,在右表中查找匹配的记录,如果查找成功,则返回null,否则仅返回左边的记录,其基本实现流程如下图所示。

总结

Join是数据库查询中一个非常重要的语法特性,在数据库领域可以说是“得join者得天下”,SparkSQL作为一种分布式数据仓库系统,给我们提供了全面的join支持,并在内部实现上无声无息地做了很多优化,了解join的实现将有助于我们更深刻的了解我们的应用程序的运行轨迹。

责任编辑:xj

原文标题:面试必知的 Spark SQL 几种 Join 实现

文章出处:【微信公众号:人工智能与大数据技术】欢迎添加关注!文章转载请注明出处。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • SQL
    SQL
    +关注

    关注

    1

    文章

    762

    浏览量

    44117
  • SPARK
    +关注

    关注

    1

    文章

    105

    浏览量

    19893

原文标题:面试必知的 Spark SQL 几种 Join 实现

文章出处:【微信号:TheBigData1024,微信公众号:人工智能与大数据技术】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    spark为什么比mapreduce快?

    spark为什么比mapreduce快? 首先澄清几个误区: 1:两者都是基于内存计算的,任何计算框架都肯定是基于内存的,所以网上说的spark是基于内存计算所以快,显然是错误的 2;DAG计算模型
    的头像 发表于 09-06 09:45 254次阅读

    spark运行的基本流程

    记录和分享下spark运行的基本流程。 一、spark的基础组件及其概念 1. ClusterManager Standalone模式
    的头像 发表于 07-02 10:31 401次阅读
    <b class='flag-5'>spark</b><b class='flag-5'>运行</b>的基本流程

    Spark基于DPU的Native引擎算子卸载方案

     和 R 等多种高级编程语言,这使得Spark可以应对各种复杂的大数据应用场景,例如金融、电商、社交媒体等。 Spark 经过多年发展,作为基础的计算框架,不管是
    的头像 发表于 06-28 17:12 560次阅读
    <b class='flag-5'>Spark</b>基于DPU的Native引擎算子卸载方案

    关于Spark的从0实现30s内实时监控指标计算

    前言 说起Spark,大家就会自然而然地想到Flink,而且会不自觉地将这两种主流的大数据实时处理技术进行比较。然后最终得出结论:Flink实时性大于Spark。 的确,Flink的数据计算
    的头像 发表于 06-14 15:52 439次阅读

    Spark+Hive”DPU环境下的性能测评 | OLAP数据库引擎选型白皮书(24版)DPU部分节选

    奇点云2024年版《OLAP数据库引擎选型白皮书》,中科驭数联合奇点云针对Spark+Hive这类大数据计算场景下的主力引擎,测评DPU环境下对比CPU环境下的性能提升效果。特此节选该章节内容,与大家共享。
    的头像 发表于 05-30 16:09 516次阅读
    “<b class='flag-5'>Spark</b>+Hive”<b class='flag-5'>在</b>DPU环境下的性能测评 | OLAP数据库引擎选型白皮书(24版)DPU部分节选

    移植Nucleo745ziq+cyw43439的wifi_join_wpa3时,得到Wifi_join 33555456错误信息如何解决?

    当我移植我的电路板(Nucleo745ziq+cyw43439)的 wifi_join_wpa3 时 但我得到的错误代码是 33555456 OTL.... 如何解决我的项目?
    发表于 05-24 06:49

    STM8RAM运行遇到的疑问求解

    系统函数,而这个系统函数flash里面。这个时候我把flash 已经关了。程序就执行不动了。 RAM的地址域是from 0x0000 to 0x07FF flash的地址域是from 0x8000 to 0xFFFF RAM
    发表于 05-07 07:32

    浅谈变电所运行平台安全管理的应用

    浅谈变电所运行平台安全管理的应用 张颖姣 安科瑞电气股份有限公司 上海嘉定 201801 摘要:电气安全管理是企业生产管理中比较薄弱的环节,它是一项综合性的工作,有工程技术的一面,也有组织
    的头像 发表于 04-15 16:26 302次阅读
    浅谈变电所<b class='flag-5'>运行</b>平台<b class='flag-5'>在</b>安全管理<b class='flag-5'>中</b>的应用

    stm32的运行程序,初始函数明明没有while函数里面,为什么能反复运行

    stm32的运行程序,好多初始函数明明没有while函数里面,但是,他却能反复的,不断地去运行,这是为什么呢? 就像是这个程序,对于设
    发表于 04-08 08:15

    如何利用DPU加速Spark大数据处理? | 总结篇

    SSD速度通过NVMe接口得到了大幅提升,并且网络传输速率也进入了新的高度,但CPU主频发展并未保持同等步调,3GHz左右的核心频率已成为常态。 在当前背景下Apache Spark等大数据处理工具,尽管存储和网络性能的提升极大地减少了数据读取和传输的时间消耗,但
    的头像 发表于 04-02 13:45 1046次阅读
    如何利用DPU加速<b class='flag-5'>Spark</b>大数据处理? | 总结篇

    STM32HIAR如何实现从FLASH加载到SRAM运行程序?

    如题,STM32H IAR如何实现从FLASH加载到SRAM运行程序 有没有相关的例程可供参考
    发表于 03-28 07:46

    Spark基于DPU Snappy压缩算法的异构加速方案

    Spark 某些工作负载方面表现得更加优越。换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark SQL是
    的头像 发表于 03-26 17:06 789次阅读
    <b class='flag-5'>Spark</b>基于DPU Snappy压缩算法的异构加速方案

    RDMA技术Apache Spark的应用

    、电信、零售、医疗保健还是物联网,Spark的应用几乎遍及所有需要处理海量数据和复杂计算的领域。它的快速、易用和通用性,使得数据科学家和工程师能够轻松实现数据挖掘、数据分析、实时处理等任务。 然而,Spark的灿烂光环背后,一
    的头像 发表于 03-25 18:13 1530次阅读
    RDMA技术<b class='flag-5'>在</b>Apache <b class='flag-5'>Spark</b><b class='flag-5'>中</b>的应用

    基于DPU和HADOS-RACE加速Spark 3.x

    、Python、Java、Scala、R)等特性大数据计算领域被广泛使用。其中,Spark SQL 是 Spark 生态系统的一个重要组件,它允许用户以结构化数据的方式进行数据处理
    的头像 发表于 03-25 18:12 1345次阅读
    基于DPU和HADOS-RACE加速<b class='flag-5'>Spark</b> 3.x

    浅谈变电所运行平台安全管理的应用

    电气安全管理是企业生产管理中比较薄弱的环节,它是一项综合性的工作,有工程技术的一面,也有组织管理的一面。详细阐述了保障变电所安全运行的基本管理制度,着重探讨了实践中广泛遇到的倒闸操作、电气作业安全管理问题。
    的头像 发表于 02-05 15:40 401次阅读
    浅谈变电所<b class='flag-5'>运行</b>平台<b class='flag-5'>在</b>安全管理<b class='flag-5'>中</b>的应用