0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Pandas与PySpark强强联手,功能与速度齐飞

数据分析与开发 来源:数据分析与开发 2023-04-25 10:51 次阅读

使用Python做数据处理的数据科学家或数据从业者,对数据科学包pandas并不陌生,也不乏像主页君一样的pandas重度使用者,项目开始写的第一行代码,大多是 import pandas as pdpandas做数据处理可以说是yyds!而他的缺点也是非常明显,pandas 只能单机处理,它不能随数据量线性伸缩。例如,如果 pandas 试图读取的数据集大于一台机器的可用内存,则会因内存不足而失败。

另外 pandas 在处理大型数据方面非常慢,虽然有像Dask 或 Vaex 等其他库来优化提升数据处理速度,但在大数据处理神之框架Spark面前,也是小菜一碟。

幸运的是,在新的 Spark 3.2 版本中,出现了一个新的Pandas API,将pandas大部分功能都集成到PySpark中,使用pandas的接口,就能使用Spark,因为 Spark 上的 Pandas API 在后台使用 Spark,这样就能达到强强联手的效果,可以说是非常强大,非常方便。

这一切都始于 2019 年 Spark + AI 峰会。Koalas 是一个开源项目,可以在 Spark 之上使用 Pandas。一开始,它只覆盖了 Pandas 的一小部分功能,但后来逐渐壮大起来。现在,在新的 Spark 3.2 版本中,Koalas 已合并到 PySpark。

Spark 现在集成了 Pandas API,因此可以在 Spark 上运行 Pandas。只需要更改一行代码:

importpyspark.pandasasps

由此我们可以获得诸多的优势:

  • 如果我们熟悉使用Python 和 Pandas,但不熟悉 Spark,可以省略了需复杂的学习过程而立即使用PySpark。
  • 可以为所有内容使用一个代码库:无论是小数据和大数据,还是单机和分布式机器。
  • 可以在Spark分布式框架上,更快地运行 Pandas 代码。

最后一点尤其值得注意。

一方面,可以将分布式计算应用于在 Pandas 中的代码。且借助 Spark 引擎,代码即使在单台机器上也会更快!下图展示了在一台机器(具有 96 个 vCPU 和 384 GiBs 内存)上运行 Spark 和单独调用 pandas 分析 130GB 的 CSV 数据集的性能对比。

7bf05686-ddad-11ed-bfe3-dac502259ad0.png

多线程和 Spark SQL Catalyst Optimizer 都有助于优化性能。例如,Join count 操作在整个阶段代码生成时快 4 倍:没有代码生成时为 5.9 秒,代码生成时为 1.6 秒。

Spark 在链式操作(chaining operations)中具有特别显着的优势。Catalyst 查询优化器可以识别过滤器以明智地过滤数据并可以应用基于磁盘的连接(disk-based joins),而 Pandas 倾向于每一步将所有数据加载到内存中。

现在是不是迫不及待的想尝试如何在 Spark 上使用 Pandas API 编写一些代码?我们现在就开始吧!

在 Pandas / Pandas-on-Spark / Spark 之间切换

需要知道的第一件事是我们到底在使用什么。在使用 Pandas 时,使用类pandas.core.frame.DataFrame。在 Spark 中使用 pandas API 时,使用pyspark.pandas.frame.DataFrame。虽然两者相似,但不相同。主要区别在于前者在单机中,而后者是分布式的。

可以使用 Pandas-on-Spark 创建一个 Dataframe 并将其转换为 Pandas,反之亦然:

#importPandas-on-Spark
importpyspark.pandasasps

#使用Pandas-on-Spark创建一个DataFrame
ps_df=ps.DataFrame(range(10))

#将Pandas-on-SparkDataframe转换为PandasDataframe
pd_df=ps_df.to_pandas()

#将PandasDataframe转换为Pandas-on-SparkDataframe
ps_df=ps.from_pandas(pd_df)

注意,如果使用多台机器,则在将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe 时,数据会从多台机器传输到一台机器,反之亦然(可参阅PySpark 指南[1])。

还可以将 Pandas-on-Spark Dataframe 转换为 Spark DataFrame,反之亦然:

#使用Pandas-on-Spark创建一个DataFrame
ps_df=ps.DataFrame(range(10))

#将Pandas-on-SparkDataframe转换为SparkDataframe
spark_df=ps_df.to_spark()

#将SparkDataframe转换为Pandas-on-SparkDataframe
ps_df_new=spark_df.to_pandas_on_spark()

数据类型如何改变?

在使用 Pandas-on-Spark 和 Pandas 时,数据类型基本相同。将 Pandas-on-Spark DataFrame 转换为 Spark DataFrame 时,数据类型会自动转换为适当的类型(请参阅PySpark 指南[2]

下面的示例显示了在转换时是如何将数据类型从 PySpark DataFrame 转换为 pandas-on-Spark DataFrame。

>>>sdf=spark.createDataFrame([
...(1,Decimal(1.0),1.,1.,1,1,1,datetime(2020,10,27),"1",True,datetime(2020,10,27)),
...],'tinyinttinyint,decimaldecimal,floatfloat,doubledouble,integerinteger,longlong,shortshort,timestamptimestamp,stringstring,booleanboolean,datedate')
>>>sdf
DataFrame[tinyint: tinyint, decimal: decimal(10,0),
float: float, double: double, integer: int,
long: bigint, short: smallint, timestamp: timestamp, 
string: string, boolean: boolean, date: date]
psdf=sdf.pandas_api()
psdf.dtypes
tinyint                int8
decimal              object
float               float32
double              float64
integer               int32
long                  int64
short                 int16
timestamp    datetime64[ns]
string               object
boolean                bool
date                 object
dtype: object

Pandas-on-Spark vs Spark 函数

在 Spark 中的 DataFrame 及其在 Pandas-on-Spark 中的最常用函数。注意,Pandas-on-Spark 和 Pandas 在语法上的唯一区别就是 import pyspark.pandas as ps 一行。

当你看完如下内容后,你会发现,即使您不熟悉 Spark,也可以通过 Pandas API 轻松使用。

导入库

#运行Spark
frompyspark.sqlimportSparkSession
spark=SparkSession.builder
.appName("Spark")
.getOrCreate()
#在Spark上运行Pandas
importpyspark.pandasasps

读取数据

以 old dog iris 数据集为例。

#SPARK
sdf=spark.read.options(inferSchema='True',
header='True').csv('iris.csv')
#PANDAS-ON-SPARK
pdf=ps.read_csv('iris.csv')

选择

#SPARK
sdf.select("sepal_length","sepal_width").show()
#PANDAS-ON-SPARK
pdf[["sepal_length","sepal_width"]].head()

删除列

#SPARK
sdf.drop('sepal_length').show()#PANDAS-ON-SPARK
pdf.drop('sepal_length').head()

删除重复项

#SPARK
sdf.dropDuplicates(["sepal_length","sepal_width"]).show()
#PANDAS-ON-SPARK
pdf[["sepal_length","sepal_width"]].drop_duplicates()

筛选

#SPARK
sdf.filter((sdf.flower_type=="Iris-setosa")&(sdf.petal_length>1.5)).show()
#PANDAS-ON-SPARK
pdf.loc[(pdf.flower_type=="Iris-setosa")&(pdf.petal_length>1.5)].head()

计数

#SPARK
sdf.filter(sdf.flower_type=="Iris-virginica").count()
#PANDAS-ON-SPARK
pdf.loc[pdf.flower_type=="Iris-virginica"].count()

唯一值

#SPARK
sdf.select("flower_type").distinct().show()
#PANDAS-ON-SPARK
pdf["flower_type"].unique()

排序

#SPARK
sdf.sort("sepal_length","sepal_width").show()
#PANDAS-ON-SPARK
pdf.sort_values(["sepal_length","sepal_width"]).head()

分组

#SPARK
sdf.groupBy("flower_type").count().show()
#PANDAS-ON-SPARK
pdf.groupby("flower_type").count()

替换

#SPARK
sdf.replace("Iris-setosa","setosa").show()
#PANDAS-ON-SPARK
pdf.replace("Iris-setosa","setosa").head()

连接

#SPARK
sdf.union(sdf)
#PANDAS-ON-SPARK
pdf.append(pdf)

transform 和 apply 函数应用

有许多 API 允许用户针对 pandas-on-Spark DataFrame 应用函数,例如:

DataFrame.transform()
DataFrame.apply()
DataFrame.pandas_on_spark.transform_batch()
DataFrame.pandas_on_spark.apply_batch()
Series.pandas_on_spark.transform_batch()

每个 API 都有不同的用途,并且在内部工作方式不同。

transform 和 apply

DataFrame.transform()DataFrame.apply()之间的主要区别在于,前者需要返回相同长度的输入,而后者不需要。

#transform
psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pser):
returnpser+1#应该总是返回与输入相同的长度。

psdf.transform(pandas_plus)

#apply
psdf=ps.DataFrame({'a':[1,2,3],'b':[5,6,7]})
defpandas_plus(pser):
returnpser[pser%2==1]#允许任意长度

psdf.apply(pandas_plus)

在这种情况下,每个函数采用一个 pandas Series,Spark 上的 pandas API 以分布式方式计算函数,如下所示。

7c02cf82-ddad-11ed-bfe3-dac502259ad0.png

在“列”轴的情况下,该函数将每一行作为一个熊猫系列。

psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pser):
returnsum(pser)#允许任意长度
psdf.apply(pandas_plus,axis='columns')

上面的示例将每一行的总和计算为pands Series

7c31b34c-ddad-11ed-bfe3-dac502259ad0.png

pandas_on_spark.transform_batchpandas_on_spark.apply_batch

batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后以 pandas DataFrame 或 Series 作为输入和输出应用给定函数。请参阅以下示例:

psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pdf):
returnpdf+1#应该总是返回与输入相同的长度。

psdf.pandas_on_spark.transform_batch(pandas_plus)

psdf=ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})
defpandas_plus(pdf):
returnpdf[pdf.a>1]#允许任意长度

psdf.pandas_on_spark.apply_batch(pandas_plus)

两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。Spark 上的 Pandas API 将 pandas 数据帧组合为 pandas-on-Spark 数据帧。

7c6fae22-ddad-11ed-bfe3-dac502259ad0.png在 Spark 上使用 pandas API的注意事项

避免shuffle

某些操作,例如sort_values在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。

避免在单个分区上计算

另一种常见情况是在单个分区上进行计算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器中的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。

不要使用重复的列名

不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了这种行为。例如,见下文:

importpyspark.pandasasps
psdf=ps.DataFrame({'a':[1,2],'b':[3,4]})
psdf.columns=["a","a"]
Reference 'a' is ambiguous, could be: a, a.;

此外,强烈建议不要使用区分大小写的列名。Spark 上的 Pandas API 默认不允许它。

importpyspark.pandasasps
psdf=ps.DataFrame({'a':[1,2],'A':[3,4]})
Reference 'a' is ambiguous, could be: a, a.;

但可以在 Spark 配置spark.sql.caseSensitive中打开以启用它,但需要自己承担风险。

frompyspark.sqlimportSparkSession
builder=SparkSession.builder.appName("pandas-on-spark")
builder=builder.config("spark.sql.caseSensitive","true")
builder.getOrCreate()

importpyspark.pandasasps
psdf=ps.DataFrame({'a':[1,2],'A':[3,4]})
psdf
   a  A
0  1  3
1  2  4

使用默认索引

pandas-on-Spark 用户面临的一个常见问题是默认索引导致性能下降。当索引未知时,Spark 上的 Pandas API 会附加一个默认索引,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame。

如果计划在生产中处理大数据,请通过将默认索引配置为distributeddistributed-sequence来使其确保为分布式。

有关配置默认索引的更多详细信息,请参阅默认索引类型[3]

在 Spark 上使用 pandas API

尽管 Spark 上的 pandas API 具有大部分与 pandas 等效的 API,但仍有一些 API 尚未实现或明确不受支持。因此尽可能直接在 Spark 上使用 pandas API。

例如,Spark 上的 pandas API 没有实现__iter__(),阻止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 min、max、sum 等 Python 的内置函数,都要求给定参数是可迭代的。对于 pandas,它开箱即用,如下所示:

>>>importpandasaspd
>>>max(pd.Series([1,2,3]))
3
>>>min(pd.Series([1,2,3]))
1
>>>sum(pd.Series([1,2,3]))
6

Pandas 数据集存在于单台机器中,自然可以在同一台机器内进行本地迭代。但是,pandas-on-Spark 数据集存在于多台机器上,并且它们是以分布式方式计算的。很难在本地迭代,很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的例子可以转换如下:

>>>importpyspark.pandasasps
>>>ps.Series([1,2,3]).max()
3
>>>ps.Series([1,2,3]).min()
1
>>>ps.Series([1,2,3]).sum()
6

pandas 用户的另一个常见模式可能是依赖列表推导式或生成器表达式。但是,它还假设数据集在引擎盖下是本地可迭代的。因此,它可以在 pandas 中无缝运行,如下所示:

importpandasaspd
data=[]
countries=['London','NewYork','Helsinki']
pser=pd.Series([20.,21.,12.],index=countries)
fortemperatureinpser:
asserttemperature>0
iftemperature>1000:
temperature=None
data.append(temperature**2)

pd.Series(data,index=countries)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64

但是,对于 Spark 上的 pandas API,它的工作原理与上述相同。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:

importpyspark.pandasasps
importnumpyasnp
countries=['London','NewYork','Helsinki']
psser=ps.Series([20.,21.,12.],index=countries)
defsquare(temperature)->np.float64:
asserttemperature>0
iftemperature>1000:
temperature=None
returntemperature**2

psser.apply(square)
London      400.0
New York    441.0
Helsinki    144.0

减少对不同 DataFrame 的操作

Spark 上的 Pandas API 默认不允许对不同 DataFrame(或 Series)进行操作,以防止昂贵的操作。只要有可能,就应该避免这种操作。

写在最后

到目前为止,我们将能够在 Spark 上使用 Pandas。这将会导致Pandas 速度的大大提高,迁移到 Spark 时学习曲线的减少,以及单机计算和分布式计算在同一代码库中的合并。

审核编辑 :李倩


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • API
    API
    +关注

    关注

    2

    文章

    1439

    浏览量

    61239
  • 代码
    +关注

    关注

    30

    文章

    4601

    浏览量

    67342
  • 过滤器
    +关注

    关注

    1

    文章

    417

    浏览量

    19217

原文标题:Pandas 与 PySpark 强强联手,功能与速度齐飞

文章出处:【微信号:DBDevs,微信公众号:数据分析与开发】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    双面布局贴补,FPC焊接很受伤

    高速先生成员--王辉东 FPC上有器件的位置添加补,按理说是合情合理,为什么加了补,就无法焊接。请走进今天的案例,为你揭秘,看看你是否也有相似的经历。 生活就像巧克力,你永远不知道下一
    发表于 03-11 17:57

    宋仕论道”系列讲座的文章暨宋仕先生研究华强北模式和华强北文化的系列文章,再次迎来更新!

    的时间里,宋仕先生的这一重磅力作被翻译成英文,相继被纽约日报、金融日报、华尔街日报、澳大利亚时报、澳洲早报等多家国外知名媒体火速转载,转载力度之大、速度之快、传播之广大有“星星之火,可以燎原”之势! **
    发表于 03-26 10:36

    亚嵌名师? ?联手??打造尖端嵌入式人才

    亚嵌名师   联手  打造尖端嵌入式人才激爽一夏**嵌入式Linux暑期特惠班火爆招生**暑期特惠班
    发表于 06-20 08:45

    亚嵌名师? ?联手??打造尖端嵌入式人才

    亚嵌名师   联手  打造尖端嵌入式人才激爽一夏**嵌入式Linux暑期特惠班火爆招生**暑期特惠班
    发表于 06-20 08:46

    亚嵌名师? ?联手??打造尖端嵌入式人才

    亚嵌名师   联手  打造尖端嵌入式人才激爽一夏**嵌入式Linux暑期特惠班火爆招生**暑期特惠班
    发表于 06-20 08:48

    钕铁硼

    ,北通杭州湾大桥。钕铁硼永磁材料是以金属钕、纯铁和硼铁合金为主要原材料,添加少量其他元素,应用粉末冶金工艺制造的一种铁基永磁材料。专业生产钕铁硼磁http://www.cxrd-magnet.net/news/ndfebnews/rtndfeb,信誉有保证。
    发表于 09-23 17:28

    推挽、开漏、上拉、弱上拉、下拉、弱下拉输出

    本帖最后由 gk320830 于 2015-3-4 23:01 编辑 推挽、开漏、上拉、弱上拉、下拉、弱下拉输出
    发表于 08-20 14:02

    华清远见、ARM、思卡尔全国巡回技术研讨会(西安)

    2015年5月30日华清远见教育集团联手ARM及思卡尔公司,组合,共同推出主题为“智能化电子设备开发热点及基于ARM的嵌入式系统开发核心技术”的全国大型巡回公益技术讲座西安开奖啦
    发表于 05-22 15:56

    签约美国著名电源转换芯片产品供应商Power Integrations

    ,他希望通过此次合作,能够实现联手,达到互利共赢的目的。而这次签约的另一位主角——世,成立23年来,代理了全球三十几家著名元器件和半导体企业的相关产品,服务了数千家中国企业,是中
    发表于 11-10 15:05

    智能插排哪家

    智能插排哪家
    发表于 07-19 14:00

    单丝伸仪简介

    家技术监督局鉴定。2、该机采用单片机控制系统,自动处理数据,可显示并打印输出,采用等速伸长(CRE)检测原理。3、整机接插件少,可靠性,达到准确、稳定、效率高、该机操作简单方便,具有自检及断电保护功能。4、显示
    发表于 08-21 15:20

    pyspark环境配置问题

    pyspark学习笔记 一些环境配置问题
    发表于 03-24 06:38

    使用Pandas的定制功能来帮助我们自定义内容的显示方式

    Pandas是一个在数据科学中常用的功能强大的Python库。它可以从各种来源加载和操作数据集。当使用Pandas时,默认选项就已经适合大多数人了。但是在某些情况下,我们可能希望更改所显示内容的格式
    发表于 12-19 17:03

    FPC软板补设计

    最近在某EDA画了一块FPC,有专门的FPC补工具,输出的GERBER层名也有补信息,在他们平台下单也可以自动识别补信息,而且还可以少50块,不知道华秋DFM是否可以识别,如果可以检查就比较完美了
    发表于 10-08 15:00

    Pandas:Python中强大方便的绘图功能

    并编译到C,并且在C上执行,因此也保证了处理速度。不过我们今天的重点不在于它的处理速度,而是它和matplotlib合作产生的强大且方便的绘图功能。 到底有多强呢?让我们来体会一下。 1.创建数据 使用
    的头像 发表于 11-03 11:04 358次阅读
    <b class='flag-5'>Pandas</b>:Python中强大方便的绘图<b class='flag-5'>功能</b>