大数据最佳实践-pyspark
目录概述代码实战All DataFrames above result same.提交参考资料概述这是PySpark DataFrame API的简短介绍和快速入门。PySpark DataFrames延迟评估。它们是在RDD之上实现的。当Spark转换数据时,它不会立即计算转换,而是计划以后如何计算。当 明确调用诸如之类的动作时collect(),计算开始。该笔记本显示了主要面向新用户的Data
概述
这是PySpark DataFrame API的简短介绍和快速入门。PySpark DataFrames延迟评估。它们是在RDD之上实现的。当Spark转换数据时,它不会立即计算转换,而是计划以后如何计算。当 明确调用诸如之类的动作时collect(),计算开始。该笔记本显示了主要面向新用户的DataFrame的基本用法。您可以在此处的实时笔记本上自己运行这些示例的最新版本。
Apache Spark文档站点中还有其他有用的信息,请参见最新版本的Spark SQL和DataFrames,RDD编程指南,结构化流编程指南,Spark流流编程指南和机器学习库(MLlib)指南。
PySpark应用程序从初始化开始,SparkSession这是PySpark的入口,如下所示。如果通过pyspark可执行文件在PySpark Shell中运行它,则该Shell会自动在变量spark中为用户创建会话。
代码实战
DataFrame创建
pyspark.sql.SparkSession.createDataFrame通常可以通过传递列表,元组,字典和pyspark.sql.Rows的列表,pandas DataFrame和由此类列表组成的RDD来创建PySpark DataFrame 。pyspark.sql.SparkSession.createDataFrame使用schema参数指定DataFrame的架构。省略时,PySpark通过从数据中采样来推断相应的模式。
首先,您可以从行列表中创建一个PySpark DataFrame
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2., c=‘string1’, d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c=‘string2’, d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c=‘string3’, d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
DataFrame [a:bigint,b:double,c:string,d:date,e:timestamp]
使用显式架构创建一个PySpark DataFrame。
df = spark.createDataFrame([
(1, 2., ‘string1’, date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., ‘string2’, date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., ‘string3’, date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema=‘a long, b double, c string, d date, e timestamp’)
df
DataFrame [a:bigint,b:double,c:string,d:date,e:timestamp]
从pandas DataFrame创建一个PySpark DataFrame
pandas_df = pd.DataFrame({
‘a’: [1, 2, 3],
‘b’: [2., 3., 4.],
‘c’: [‘string1’, ‘string2’, ‘string3’],
‘d’: [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
‘e’: [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df
DataFrame [a:bigint,b:double,c:string,d:date,e:timestamp]
从由元组列表组成的RDD中创建一个PySpark DataFrame。
rdd = spark.sparkContext.parallelize([
(1, 2., ‘string1’, date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., ‘string2’, date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., ‘string3’, date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=[‘a’, ‘b’, ‘c’, ‘d’, ‘e’])
df
DataFrame [a:bigint,b:double,c:string,d:date,e:timestamp]
首先创建的DataFrame具有相同的结果和架构。
All DataFrames above result same.
df.show()
df.printSchema()
根
|-a:长整数(可为空= true)
|-b:双精度(nullable = true)
|-c:字符串(可为空= true)
|-d:日期(可为空= true)
|-e:时间戳记(nullable = true)
查看数据
可以使用来显示DataFrame的顶行DataFrame.show()。
df.show(1)
仅显示前1行
或者,您可以spark.sql.repl.eagerEval.enabled在Jupyter等笔记本电脑中启用配置,以便对PySpark DataFrame进行急切的评估。可以通过spark.sql.repl.eagerEval.maxNumRows配置控制要显示的行数。
spark.conf.set(‘spark.sql.repl.eagerEval.enabled’, True)
df
一种 b C d Ë
1个 2.0 字符串1 2000-01-01 2000-01-01 12:00:00
2个 3.0 字符串2 2000-02-01 2000-01-02 12:00:00
3 4.0 字符串3 2000-03-01 2000-01-03 12:00:00
这些行也可以垂直显示。当行太长而无法水平显示时,此功能很有用。
df.show(1, vertical=True)
您可以看到DataFrame的架构和列名称,如下所示:
df.columns
[‘a’,‘b’,‘c’,‘d’,‘e’]
df.printSchema()
根
|-a:长整数(可为空= true)
|-b:双精度(nullable = true)
|-c:字符串(可为空= true)
|-d:日期(可为空= true)
|-e:时间戳记(nullable = true)
显示数据框的摘要
df.select(“a”, “b”, “c”).describe().show()
DataFrame.collect()将分发的数据作为Python中的本地数据收集到驱动程序端。请注意,当数据集太大而无法放入驱动程序端时,这可能会引发内存不足错误,因为它会将所有数据从执行程序收集到驱动程序端。
df.collect()
[Row(a = 1,b = 2.0,c =‘string1’,d = datetime.date(2000,1,1),e = datetime.datetime(2000,1,1,12,0)),
Row(a = 2,b = 3.0,c =‘string2’,d = datetime.date(2000,2,1),e = datetime.datetime(2000,1,2,12,0)),
行(a = 3,b = 4.0,c =‘string3’,d = datetime.date(2000,3,1),e = datetime.datetime(2000,1,3,12,0))]]
为了避免引发内存不足的异常,请使用DataFrame.take()或DataFrame.tail()。
df.take(1)
[Row(a = 1,b = 2.0,c =‘string1’,d = datetime.date(2000,1,1),e = datetime.datetime(2000,1,1,12,0))]]
PySpark DataFrame还提供了转换回pandas DataFrame的功能,以利用pandas API。请注意,toPandas还将所有数据收集到驱动程序端,当数据太大而无法放入驱动程序端时,这些数据很容易导致内存不足错误。
df.toPandas()
选择和访问数据
PySpark DataFrame的计算是延迟的,仅选择一个列不会触发计算,但会返回一个Column实例。
df.a
列<b’a’>
实际上,大多数列操作都返回Columns。
from pyspark.sql import Column
from pyspark.sql.functions import upper
type(df.c) == type(upper(df.c)) == type(df.c.isNull())
真的
这些Column可以用于从DataFrame中选择列。例如,DataFrame.select()采用Column返回另一个DataFrame的实例。
df.select(df.c).show()
分配新Column实例。
df.withColumn(‘upper_c’, upper(df.c)).show()
要选择行的子集,请使用DataFrame.filter()。
df.filter(df.a == 1).show()
将函数
PySpark支持各种UDF和API,以允许用户执行Python本机功能。另请参阅最新的Pandas UDF和Pandas Function API。例如,下面的示例允许用户在Python本机函数内直接使用pandas系列中的API 。
import pandas
from pyspark.sql.functions import pandas_udf
@pandas_udf(‘long’)
def pandas_plus_one(series: pd.Series) -> pd.Series:
# Simply plus one by using pandas Series.
return series + 1
df.select(pandas_plus_one(df.a)).show()
另一个示例是DataFrame.mapInPandas允许用户直接在pandas DataFrame中使用API,而没有诸如结果长度之类的任何限制。
def pandas_filter_func(iterator):
for pandas_df in iterator:
yield pandas_df[pandas_df.a == 1]
df.mapInPandas(pandas_filter_func, schema=df.schema).show()
分组数据
PySpark DataFrame还提供了一种通过使用通用方法,拆分应用,合并策略来处理分组数据的方法。它按一定条件对数据进行分组,将一个函数应用于每个组,然后将它们组合回DataFrame。
df = spark.createDataFrame([
[‘red’, ‘banana’, 1, 10], [‘blue’, ‘banana’, 2, 20], [‘red’, ‘carrot’, 3, 30],
[‘blue’, ‘grape’, 4, 40], [‘red’, ‘carrot’, 5, 50], [‘black’, ‘carrot’, 6, 60],
[‘red’, ‘banana’, 7, 70], [‘red’, ‘grape’, 8, 80]], schema=[‘color’, ‘fruit’, ‘v1’, ‘v2’])
df.show()
分组,然后将avg()功能应用于结果组。
df.groupby(‘color’).avg().show()
您还可以使用pandas API对每个组应用Python本机函数。
def plus_mean(pandas_df):
return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.groupby(‘color’).applyInPandas(plus_mean, schema=df.schema).show()
共同分组并应用功能。
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
(‘time’, ‘id’, ‘v1’))
df2 = spark.createDataFrame(
[(20000101, 1, ‘x’), (20000101, 2, ‘y’)],
(‘time’, ‘id’, ‘v2’))
def asof_join(l, r):
return pd.merge_asof(l, r, on=‘time’, by=‘id’)
df1.groupby(‘id’).cogroup(df2.groupby(‘id’)).applyInPandas(
asof_join, schema=‘time int, id int, v1 double, v2 string’).show()
数据输入/输出
CSV简单易用。Parquet和ORC是高效且紧凑的文件格式,可以更快地读取和写入。
PySpark中还有许多其他数据源,例如JDBC,文本,binaryFile,Avro等。另请参阅Apache Spark文档中的最新Spark SQL,DataFrame和Datasets指南。
CSV
df.write.csv(‘foo.csv’, header=True)
spark.read.csv(‘foo.csv’, header=True).show()
实木地板
df.write.parquet(‘bar.parquet’)
spark.read.parquet(‘bar.parquet’).show()
ORC
df.write.orc(‘zoo.orc’)
spark.read.orc(‘zoo.orc’).show()
使用
DataFrame和Spark SQL共享同一个执行引擎,因此可以无缝互换使用。例如,您可以将DataFrame注册为表并轻松运行SQL,如下所示:
df.createOrReplaceTempView(“tableA”)
spark.sql(“SELECT count(*) from tableA”).show()
此外,可以直接在SQL中注册和调用UDF:
@pandas_udf(“integer”)
def add_one(s: pd.Series) -> pd.Series:
return s + 1
spark.udf.register(“add_one”, add_one)
spark.sql(“SELECT add_one(v1) FROM tableA”).show()
这些SQL表达式可以直接混合并用作PySpark列。
from pyspark.sql.functions import expr
df.selectExpr(‘add_one(v1)’).show()
df.select(expr(‘count(*)’) > 0).show()
提交
spark.worker.cleanup.appDataTtl
–packages
–repositories
–py-files选项可以用于分发.egg,.zip并且.py库执行者
参考资料
https://spark.apache.org/docs/latest/api/python/user_guide/index.html
更多推荐
所有评论(0)