0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Spark SQL的概念及查询方式

数据分析与开发 来源:大数据技术与架构 作者:大数据技术与架构 2021-09-02 15:44 次阅读

一、Spark SQL的概念理解

Spark SQL是spark套件中一个模板,它将数据的计算任务通过SQL的形式转换成了RDD的计算,类似于Hive通过SQL的形式将数据的计算任务转换成了MapReduce。

Spark SQL的特点:

和Spark Core的无缝集成,可以在写整个RDD应用的时候,配置Spark SQL来完成逻辑实现。

统一的数据访问方式,Spark SQL提供标准化的SQL查询。

Hive的继承,Spark SQL通过内嵌的hive或者连接外部已经部署好的hive案例,实现了对hive语法的继承和操作。

标准化的连接方式,Spark SQL可以通过启动thrift Server来支持JDBC、ODBC的访问,将自己作为一个BI Server使用

Spark SQL数据抽象:

RDD(Spark1.0)-》DataFrame(Spark1.3)-》DataSet(Spark1.6)

Spark SQL提供了DataFrame和DataSet的数据抽象

DataFrame就是RDD+Schema,可以认为是一张二维表格,劣势在于编译器不进行表格中的字段的类型检查,在运行期进行检查

DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型

DataFrame=DataSet[Row]

DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。

Spark SQL客户端查询:

可以通过Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名

可以通过Spark提供的方法读取json文件,将json文件转换成DataFrame

可以通过DataFrame提供的API来操作DataFrame里面的数据。

可以通过将DataFrame注册成为一个临时表的方式,来通过Spark.sql方法运行标准的SQL语句来查询。

二、Spark SQL查询方式

DataFrame查询方式

DataFrame支持两种查询方式:一种是DSL风格,另外一种是SQL风格

(1)、DSL风格:

需要引入import spark.implicit. _ 这个隐式转换,可以将DataFrame隐式转换成RDD

(2)、SQL风格:

a、需要将DataFrame注册成一张表格,如果通过CreateTempView这种方式来创建,那么该表格Session有效,如果通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀global_temp

b、需要通过sparkSession.sql方法来运行你的SQL语句

DataSet查询方式

定义一个DataSet,先定义一个Case类

三、DataFrame、Dataset和RDD互操作

RDD-》DataFrame

普通方式:例如rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF(“name”,“age”)

通过反射来设置schema,例如:

#通过反射设置schema,数据集是spark自带的people.txt,路径在下面的代码中case class Person(name:String,age:Int)

val peopleDF=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”).map(_.split(“,”)).map(para=》Person(para(0).trim,para(1).trim.toInt)).toDF

peopleDF.show

8a20a542-0bb0-11ec-8fb8-12bb97331649.png

#注册成一张临时表

peopleDF.createOrReplaceTempView(“persons”)

val teen=spark.sql(“select name,age from persons where age between 13 and 29”)

teen.show

8a301b1c-0bb0-11ec-8fb8-12bb97331649.png

这时teen是一张表,每一行是一个row对象,如果需要访问Row对象中的每一个元素,可以通过下标 row(0);你也可以通过列名 row.getAs[String](“name”)

8a3be46a-0bb0-11ec-8fb8-12bb97331649.png

也可以使用getAs方法:

8a45a978-0bb0-11ec-8fb8-12bb97331649.png

3、通过编程的方式来设置schema,适用于编译器不能确定列的情况

val peopleRDD=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”)

val schemaString=“name age”

val filed=schemaString.split(“ ”).map(filename=》 org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))

val schema=org.apache.spark.sql.types.StructType(filed)

peopleRDD.map(_.split(“,”)).map(para=》org.apache.spark.sql.Row(para(0).trim,para(1).trim))

val peopleDF=spark.createDataFrame(res6,schema)

peopleDF.show

8a52119a-0bb0-11ec-8fb8-12bb97331649.png

8a5ddf02-0bb0-11ec-8fb8-12bb97331649.png

8a6a9cb0-0bb0-11ec-8fb8-12bb97331649.png

DataFrame-》RDD

dataFrame.rdd

RDD-》DataSet

rdd.map(para=》 Person(para(0).trim(),para(1).trim().toInt)).toDS

DataSet-》DataSet

dataSet.rdd

DataFrame -》 DataSet

dataFrame.to[Person]

DataSet -》 DataFrame

dataSet.toDF

四、用户自定义函数

用户自定义UDF函数

通过spark.udf功能用户可以自定义函数

自定义udf函数:

通过spark.udf.register(name,func)来注册一个UDF函数,name是UDF调用时的标识符,fun是一个函数,用于处理字段。

需要将一个DF或者DS注册为一个临时表

通过spark.sql去运行一个SQL语句,在SQL语句中可以通过name(列名)方式来应用UDF函数

用户自定义聚合函数

1. 弱类型用户自定义聚合函数

新建一个Class 继承UserDefinedAggregateFunction ,然后复写方法:

//聚合函数需要输入参数的数据类型

override def inputSchema: StructType = ???

//可以理解为保存聚合函数业务逻辑数据的一个数据结构

override def bufferSchema: StructType = ???

// 返回值的数据类型

override def dataType: DataType = ???

// 对于相同的输入一直有相同的输出

override def deterministic: Boolean = true

//用于初始化你的数据结构

override def initialize(buffer: MutableAggregationBuffer): Unit = ???

//用于同分区内Row对聚合函数的更新操作

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???

//用于不同分区对聚合结果的聚合。

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???

//计算最终结果

override def evaluate(buffer: Row): Any = ???

你需要通过spark.udf.resigter去注册你的UDAF函数。

需要通过spark.sql去运行你的SQL语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。

2、强类型用户自定义聚合函数

新建一个class,继承Aggregator[Employee, Average, Double],其中Employee是在应用聚合函数的时候传入的对象,Average是聚合函数在运行的时候内部需要的数据结构,Double是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。复写相对应的方法:

//用于定义一个聚合函数内部需要的数据结构

override def zero: Average = ???

//针对每个分区内部每一个输入来更新你的数据结构

override def reduce(b: Average, a: Employee): Average = ???

//用于对于不同分区的结构进行聚合

override def merge(b1: Average, b2: Average): Average = ???

//计算输出

override def finish(reduction: Average): Double = ???

//用于数据结构他的转换

override def bufferEncoder: Encoder[Average] = ???

//用于最终结果的转换

override def outputEncoder: Encoder[Double] = ???

新建一个UDAF实例,通过DF或者DS的DSL风格语法去应用。

五、Spark SQL和Hive的继承

1、内置Hive

Spark内置有Hive,Spark2.1.1 内置的Hive是1.2.1。

需要将core-site.xml和hdfs-site.xml 拷贝到spark的conf目录下。如果Spark路径下发现metastore_db,需要删除【仅第一次启动的时候】。

在你第一次启动创建metastore的时候,你需要指定spark.sql.warehouse.dir这个参数, 比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse

注意,如果你在load数据的时候,需要将数据放到HDFS上。

2、外部Hive(这里主要使用这个方法)

需要将hive-site.xml 拷贝到spark的conf目录下。

如果hive的metestore使用的是mysql数据库,那么需要将mysql的jdbc驱动包放到spark的jars目录下。

可以通过spark-sql或者spark-shell来进行sql的查询。完成和hive的连接。

8a76fc9e-0bb0-11ec-8fb8-12bb97331649.png

这就是hive里面的表

8a89228e-0bb0-11ec-8fb8-12bb97331649.png

六、Spark SQL的数据源

1、输入

对于Spark SQL的输入需要使用sparkSession.read方法

通用模式 sparkSession.read.format(“json”).load(“path”) 支持类型:parquet、json、text、csv、orc、jdbc

专业模式 sparkSession.read.json、 csv 直接指定类型。

2、输出

对于Spark SQL的输出需要使用 sparkSession.write方法

通用模式 dataFrame.write.format(“json”).save(“path”) 支持类型:parquet、json、text、csv、orc

专业模式 dataFrame.write.csv(“path”) 直接指定类型

如果你使用通用模式,spark默认parquet是默认格式、sparkSession.read.load 加载的默认是parquet格式dataFrame.write.save也是默认保存成parquet格式。

如果需要保存成一个text文件,那么需要dataFrame里面只有一列(只需要一列即可)。

七、Spark SQL实战

1、数据说明

这里有三个数据集,合起来大概有几十万条数据,是关于货品交易的数据集。

8aa849d4-0bb0-11ec-8fb8-12bb97331649.png

2、任务

这里有三个需求:

计算所有订单中每年的销售单数、销售总额

计算所有订单每年最大金额订单的销售额

计算所有订单中每年最畅销货品

3、步骤

1. 加载数据

tbStock.txt

#代码case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable

val tbStockRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStock.txt”)

val tbStockDS=tbStockRdd.map(_.split(“,”)).map(attr=》tbStock(attr(0),attr(1),attr(2))).toDS

tbStockDS.show()

8ab2028a-0bb0-11ec-8fb8-12bb97331649.png8ac01d7a-0bb0-11ec-8fb8-12bb97331649.png8ad1cfc0-0bb0-11ec-8fb8-12bb97331649.png

8adee8f4-0bb0-11ec-8fb8-12bb97331649.png

tbStockDetail.txt

case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable

val tbStockDetailRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStockDetail.txt”)

val tbStockDetailDS=tbStockDetailRdd.map(_.split(“,”)).map(attr=》tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS

tbStockDetailDS.show()

8af1d2de-0bb0-11ec-8fb8-12bb97331649.png8b00fd68-0bb0-11ec-8fb8-12bb97331649.png

8b0bcd88-0bb0-11ec-8fb8-12bb97331649.png

8b305a72-0bb0-11ec-8fb8-12bb97331649.png

tbDate.txt

case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable

val tbDateRdd=spark.sparkContext.textFile(“file:///root/dataset/tbDate.txt”)

val tbDateDS=tbDateRdd.map(_.split(“,”)).map(attr=》tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS

tbDateDS.show()

8b45157a-0bb0-11ec-8fb8-12bb97331649.png8b5182f6-0bb0-11ec-8fb8-12bb97331649.png

8b638f14-0bb0-11ec-8fb8-12bb97331649.png

8b7541be-0bb0-11ec-8fb8-12bb97331649.png

2. 注册表

tbStockDS.createOrReplaceTempView(“tbStock”)

tbDateDS.createOrReplaceTempView(“tbDate”)

tbStockDetailDS.createOrReplaceTempView(“tbStockDetail”)

8b829b66-0bb0-11ec-8fb8-12bb97331649.png

3. 解析表

计算所有订单中每年的销售单数、销售总额

#sql语句

select c.theyear,count(distinct a.ordernumber),sum(b.amount)

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear

order by c.theyear

8b8fb152-0bb0-11ec-8fb8-12bb97331649.png

计算所有订单每年最大金额订单的销售额

a、先统计每年每个订单的销售额

select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

group by a.dateid,a.ordernumber

8ba32660-0bb0-11ec-8fb8-12bb97331649.png

b、计算最大金额订单的销售额

select d.theyear,c.SumOfAmount as SumOfAmount

from

(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

group by a.dateid,a.ordernumber) c

join tbDate d on c.dateid=d.dateid

group by d.theyear

order by theyear desc

8bb0abfa-0bb0-11ec-8fb8-12bb97331649.png

计算所有订单中每年最畅销货品

a、求出每年每个货品的销售额

select c.theyear,b.itemid,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid

8bc427c0-0bb0-11ec-8fb8-12bb97331649.png

b、在a的基础上,统计每年单个货品的最大金额

select d.theyear,max(d.SumOfAmount) as MaxOfAmount

from

(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid) d

group by theyear

8bd5482a-0bb0-11ec-8fb8-12bb97331649.png

c、用最大销售额和统计好的每个货品的销售额join,以及用年join,集合得到最畅销货品那一行信息

select distinct e.theyear,e.itemid,f.maxofamount

from

(select c.theyear,b.itemid,sum(b.amount) as sumofamount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid) e

join

(select d.theyear,max(d.sumofamount) as maxofamount

from

(select c.theyear,b.itemid,sum(b.amount) as sumofamount

from tbStock a

join tbStockDetail b on a.ordernumber=b.ordernumber

join tbDate c on a.dateid=c.dateid

group by c.theyear,b.itemid) d

group by d.theyear) f on e.theyear=f.theyear

and e.sumofamount=f.maxofamount order by e.theyear

8be25894-0bb0-11ec-8fb8-12bb97331649.png

编辑:jq

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    7080

    浏览量

    89175
  • SQL
    SQL
    +关注

    关注

    1

    文章

    767

    浏览量

    44174
  • 函数
    +关注

    关注

    3

    文章

    4338

    浏览量

    62739
  • RDD
    RDD
    +关注

    关注

    0

    文章

    7

    浏览量

    7984

原文标题:Spark SQL 重点知识总结

文章出处:【微信号:DBDevs,微信公众号:数据分析与开发】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    浅谈SQL优化小技巧

    作者:京东零售 王军 回顾:MySQL的执行过程回顾 MySQL的执行过程,帮助 介绍 如何进行sql优化。 (1)客户端发送一条查询语句到服务器; (2)服务器先查询缓存,如果命中缓存,则立即返回
    的头像 发表于 12-25 09:59 499次阅读

    SQL与NoSQL的区别

    景。 SQL数据库 SQL数据库,也称为关系型数据库管理系统(RDBMS),是一种基于关系模型的数据库。它使用表格、行和列来组织数据,并通过SQL语言进行数据的查询和管理。 特点 结构
    的头像 发表于 11-19 10:15 189次阅读

    谐波的概念及应用

    本文简单介绍了谐波的概念及应用。
    的头像 发表于 10-18 14:14 530次阅读
    谐波的<b class='flag-5'>概念及</b>应用

    大数据从业者必知必会的Hive SQL调优技巧

    大数据从业者必知必会的Hive SQL调优技巧 摘要 :在大数据领域中,Hive SQL被广泛应用于数据仓库的数据查询和分析。然而,由于数据量庞大和复杂的查询需求,Hive
    的头像 发表于 09-24 13:30 286次阅读

    spark为什么比mapreduce快?

    spark为什么比mapreduce快? 首先澄清几个误区: 1:两者都是基于内存计算的,任何计算框架都肯定是基于内存的,所以网上说的spark是基于内存计算所以快,显然是错误的 2;DAG计算模型
    的头像 发表于 09-06 09:45 287次阅读

    S参数的概念及应用

    电子发烧友网站提供《S参数的概念及应用.pdf》资料免费下载
    发表于 08-12 14:29 0次下载

    IP 地址在 SQL 注入攻击中的作用及防范策略

    SQL 注入是通过将恶意的 SQL 代码插入到输入参数中,欺骗应用程序执行这些恶意代码,从而实现对数据库的非法操作。例如,在一个登录表单中,如果输入的用户名被直接拼接到 SQL 查询
    的头像 发表于 08-05 17:36 330次阅读

    什么是 Flink SQL 解决不了的问题?

    简介 在实时数据开发过程中,大家经常会用 Flink SQL 或者 Flink DataStream API 来做数据加工。通常情况下选用2者都能加工出想要的数据,但是总会有 Flink SQL
    的头像 发表于 07-09 20:50 333次阅读

    spark运行的基本流程

    记录和分享下spark运行的基本流程。 一、spark的基础组件及其概念 1. ClusterManager 在Standalone模式中即为Master,控制整个集群,监控Worker。在YARN
    的头像 发表于 07-02 10:31 422次阅读
    <b class='flag-5'>spark</b>运行的基本流程

    Spark基于DPU的Native引擎算子卸载方案

    1.背景介绍 Apache Spark(以下简称Spark)是一个开源的分布式计算框架,由UC Berkeley AMP Lab开发,可用于批处理、交互式查询Spark
    的头像 发表于 06-28 17:12 709次阅读
    <b class='flag-5'>Spark</b>基于DPU的Native引擎算子卸载方案

    Spark基于DPU Snappy压缩算法的异构加速方案

    Spark 在某些工作负载方面表现得更加优越。换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark
    的头像 发表于 03-26 17:06 834次阅读
    <b class='flag-5'>Spark</b>基于DPU Snappy压缩算法的异构加速方案

    RDMA技术在Apache Spark中的应用

    背景介绍 在当今数据驱动的时代,Apache Spark已经成为了处理大规模数据集的首选框架。作为一个开源的分布式计算系统,Spark因其高效的大数据处理能力而在各行各业中广受欢迎。无论是金融服务
    的头像 发表于 03-25 18:13 1557次阅读
    RDMA技术在Apache <b class='flag-5'>Spark</b>中的应用

    基于DPU和HADOS-RACE加速Spark 3.x

    、Python、Java、Scala、R)等特性在大数据计算领域被广泛使用。其中,Spark SQLSpark 生态系统中的一个重要组件,它允许用户以结构化数据的方式进行数据处理
    的头像 发表于 03-25 18:12 1391次阅读
    基于DPU和HADOS-RACE加速<b class='flag-5'>Spark</b> 3.x

    为什么需要监控SQL服务器?

    服务器是存储、处理和管理数据的关系数据库管理系统 (RDBMS) 工具或软件,例如Microsoft的MSSQL、Oracle DB和PostgreSQL。此外,服务器执行SQL查询和命令来操作关系数据库。实际上,SQL服务器将
    的头像 发表于 02-19 17:19 486次阅读

    查询SQL在mysql内部是如何执行?

    我们知道在mySQL客户端,输入一条查询SQL,然后看到返回查询的结果。这条查询语句在 MySQL 内部到底是如何执行的呢?本文跟大家探讨一下哈,我们先来看下MySQL基本架构~
    的头像 发表于 01-22 14:53 591次阅读
    <b class='flag-5'>查询</b><b class='flag-5'>SQL</b>在mysql内部是如何执行?