?? 作者:韓信子@ShowMeAI
?? 大資料技術?技能提升系列:https://www.showmeai.tech/tutorials/84
?? 資料分析實戰系列:https://www.showmeai.tech/tutorials/40
?? 本文地址:https://www.showmeai.tech/article-detail/338
?? 宣告:著作權所有,轉載請聯系平臺與作者并注明出處
?? 收藏ShowMeAI查看更多精彩內容
Pandas 是每位資料科學家和 Python 資料分析師都熟悉的工具庫,它靈活且強大具備豐富的功能,但在處理大型資料集時,它是非常受限的,
這種情況下,我們會過渡到 PySpark,結合 Spark 生態強大的大資料處理能力,充分利用多機器并行的計算能力,可以加速計算,不過 PySpark 的語法和 Pandas 差異也比較大,很多開發人員會感覺這很讓人頭大,
在本篇內容中, ShowMeAI 將對最核心的資料處理和分析功能,梳理 PySpark 和 Pandas 相對應的代碼片段,以便大家可以無痛地完成 Pandas 到大資料 PySpark 的轉換??
大資料處理分析及機器學習建模相關知識,ShowMeAI制作了詳細的教程與工具速查手冊,大家可以通過如下內容展開學習或者回顧相關知識,
??圖解資料分析:從入門到精通系列教程
??圖解大資料技術:從入門到精通系列教程
??圖解機器學習演算法:從入門到精通系列教程
??資料科學工具庫速查表 | Spark RDD 速查表
??資料科學工具庫速查表 | Spark SQL 速查表
?? 匯入工具庫
在使用具體功能之前,我們需要先匯入所需的庫:
# pandas vs pyspark,工具庫匯入
import pandas as pd
import pyspark.sql.functions as F
PySpark 所有功能的入口點是 SparkSession 類,通過 SparkSession 實體,您可以創建spark dataframe、應用各種轉換、讀取和寫入檔案等,下面是定義 SparkSession的代碼模板:
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('SparkByExamples.com')\
.getOrCreate()
?? 創建 dataframe
在 Pandas 和 PySpark 中,我們最方便的資料承載資料結構都是 dataframe,它們的定義有一些不同,我們來對比一下看看:
?? Pandas
columns = ["employee","department","state","salary","age"]
data = https://www.cnblogs.com/showmeai/p/[("Alain","Sales","Paris",60000,34),
("Ahmed","Sales","Lyon",80000,45),
("Ines","Sales","Nice",55000,30),
("Fatima","Finance","Paris",90000,28),
("Marie","Finance","Nantes",100000,40)]
創建DataFrame的 Pandas 語法如下:
df = pd.DataFrame(data=https://www.cnblogs.com/showmeai/p/data, columns=columns)
# 查看頭2行
df.head(2)
?? PySpark
創建DataFrame的 PySpark 語法如下:
df = spark.createDataFrame(data).toDF(*columns)
# 查看頭2行
df.limit(2).show()
?? 指定列型別
?? Pandas
Pandas 指定欄位資料型別的方法如下:
types_dict = {
"employee": pd.Series([r[0] for r in data], dtype='str'),
"department": pd.Series([r[1] for r in data], dtype='str'),
"state": pd.Series([r[2] for r in data], dtype='str'),
"salary": pd.Series([r[3] for r in data], dtype='int'),
"age": pd.Series([r[4] for r in data], dtype='int')
}
df = pd.DataFrame(types_dict)
Pandas 可以通過如下代碼來檢查資料型別:
df.dtypes
?? PySpark
PySpark 指定欄位資料型別的方法如下:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType([ \
StructField("employee",StringType(),True), \
StructField("department",StringType(),True), \
StructField("state",StringType(),True), \
StructField("salary", IntegerType(), True), \
StructField("age", IntegerType(), True) \
])
df = spark.createDataFrame(data=https://www.cnblogs.com/showmeai/p/data,schema=schema)
PySpark 可以通過如下代碼來檢查資料型別:
df.dtypes
# 查看資料型別
df.printSchema()
?? 讀寫檔案
Pandas 和 PySpark 中的讀寫檔案方式非常相似, 具體語法對比如下:
?? Pandas
df = pd.read_csv(path, sep=';', header=True)
df.to_csv(path, ';', index=False)
?? PySpark
df = spark.read.csv(path, sep=';')
df.coalesce(n).write.mode('overwrite').csv(path, sep=';')
注意 ①
PySpark 中可以指定要磁區的列:
df.partitionBy("department","state").write.mode('overwrite').csv(path, sep=';')
注意 ②
可以通過上面所有代碼行中的 parquet 更改 CSV 來讀取和寫入不同的格式,例如 parquet 格式
?? 資料選擇 - 列
?? Pandas
在 Pandas 中選擇某些列是這樣完成的:
columns_subset = ['employee', 'salary']
df[columns_subset].head()
df.loc[:, columns_subset].head()
?? PySpark
在 PySpark 中,我們需要使用帶有列名串列的 select 方法來進行欄位選擇:
columns_subset = ['employee', 'salary']
df.select(columns_subset).show(5)
?? 資料選擇 - 行
?? Pandas
Pandas可以使用 iloc對行進行篩選:
# 頭2行
df.iloc[:2].head()
?? PySpark
在 Spark 中,可以像這樣選擇前 n 行:
df.take(2).head()
# 或者
df.limit(2).head()
注意:使用 spark 時,資料可能分布在不同的計算節點上,因此“第一行”可能會隨著運行而變化,
?? 條件選擇
?? Pandas
Pandas 中根據特定條件過濾資料/選擇資料的語法如下:
# First method
flt = (df['salary'] >= 90_000) & (df['state'] == 'Paris')
filtered_df = df[flt]
# Second Method: Using query which is generally faster
filtered_df = df.query('(salary >= 90_000) and (state == "Paris")')
# Or
target_state = "Paris"
filtered_df = df.query('(salary >= 90_000) and (state == @target_state)')
?? PySpark
在 Spark 中,使用 filter方法或執行 SQL 進行資料選擇, 語法如下:
# 方法1:基于filter進行資料選擇
filtered_df = df.filter((F.col('salary') >= 90_000) & (F.col('state') == 'Paris'))
# 或者
filtered_df = df.filter(F.expr('(salary >= 90000) and (state == "Paris")'))
# 方法2:基于SQL進行資料選擇
df.createOrReplaceTempView("people")
filtered_df = spark.sql("""
SELECT * FROM people
WHERE (salary >= 90000) and (state == "Paris")
""")
?? 添加欄位
?? Pandas
在 Pandas 中,有幾種添加列的方法:
seniority = [3, 5, 2, 4, 10]
# 方法1
df['seniority'] = seniority
# 方法2
df.insert(2, "seniority", seniority, True)
?? PySpark
在 PySpark 中有一個特定的方法withColumn可用于添加列:
seniority = [3, 5, 2, 4, 10]
df = df.withColumn('seniority', seniority)
?? dataframe拼接
?? 2個dataframe - pandas
# pandas拼接2個dataframe
df_to_add = pd.DataFrame(data=https://www.cnblogs.com/showmeai/p/[("Robert","Advertisement","Paris",55000,27)], columns=columns)
df = pd.concat([df, df_to_add], ignore_index = True)
?? 2個dataframe - PySpark
# PySpark拼接2個dataframe
df_to_add = spark.createDataFrame([("Robert","Advertisement","Paris",55000,27)]).toDF(*columns)
df = df.union(df_to_add)
?? 多個dataframe - pandas
# pandas拼接多個dataframe
dfs = [df, df1, df2,...,dfn]
df = pd.concat(dfs, ignore_index = True)
?? 多個dataframe - PySpark
PySpark 中 unionAll 方法只能用來連接兩個 dataframe,我們使用 reduce 方法配合unionAll來完成多個 dataframe 拼接:
# pyspark拼接多個dataframe
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
return reduce(DataFrame.unionAll, dfs)
dfs = [df, df1, df2,...,dfn]
df = unionAll(*dfs)
?? 簡單統計
Pandas 和 PySpark 都提供了為 dataframe 中的每一列進行統計計算的方法,可以輕松對下列統計值進行統計計算:
- 列元素的計數
- 列元素的平均值
- 最大值
- 最小值
- 標準差
- 三個分位數:25%、50% 和 75%
Pandas 和 PySpark 計算這些統計值的方法很類似,如下:
?? Pandas & PySpark
df.summary()
#或者
df.describe()
?? 資料分組聚合統計
Pandas 和 PySpark 分組聚合的操作也是非常類似的:
?? Pandas
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})
?? PySpark
df.groupBy('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})
但是,最終顯示的結果需要一些調整才能一致,
在 Pandas 中,要分組的列會自動成為索引,如下所示:
要將其作為列恢復,我們需要應用 reset_index方法:
df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'}).reset_index()
在 PySpark 中,列名會在結果dataframe中被重命名,如下所示:
要恢復列名,可以像下面這樣使用別名方法:
df.groupBy('department').agg(F.count('employee').alias('employee'), F.max('salary').alias('salary'), F.mean('age').alias('age'))
?? 資料轉換
在資料處理中,我們經常要進行資料變換,最常見的是要對「欄位/列」應用特定轉換,在Pandas中我們可以輕松基于apply函式完成,但在PySpark 中我們可以使用udf(用戶定義的函式)封裝我們需要完成的變換的Python函式,
例如,我們對salary欄位進行處理,如果工資低于 60000,我們需要增加工資 15%,如果超過 60000,我們需要增加 5%,
?? Pandas
Pandas 中的語法如下:
df['new_salary'] = df['salary'].apply(lambda x: x*1.15 if x<= 60000 else x*1.05)
?? Pyspark
PySpark 中的等價操作下:
from pyspark.sql.types import FloatType
df.withColumn('new_salary', F.udf(lambda x: x*1.15 if x<= 60000 else x*1.05, FloatType())('salary'))
?? 請注意, udf方法需要明確指定資料型別(在我們的例子中為 FloatType)
?? 總結
本篇內容中, ShowMeAI 給大家總結了Pandas和PySpark對應的功能操作細節,我們可以看到Pandas和PySpark的語法有很多相似之處,但是要注意一些細節差異,
另外,大家還是要基于場景進行合適的工具選擇:
- 在處理大型資料集時,使用 PySpark 可以為您提供很大的優勢,因為它允許并行計算,
- 如果您正在使用的資料集很小,那么使用Pandas會很快和靈活,
參考資料
- ?? 圖解資料分析:從入門到精通系列教程:https://www.showmeai.tech/tutorials/33
- ?? 圖解大資料技術:從入門到精通系列教程:https://www.showmeai.tech/tutorials/84
- ?? 圖解機器學習演算法:從入門到精通系列教程:https://www.showmeai.tech/tutorials/34
- ?? 資料科學工具庫速查表 | Spark RDD 速查表:https://www.showmeai.tech/article-detail/106
- ?? 資料科學工具庫速查表 | Spark SQL 速查表:https://www.showmeai.tech/article-detail/107
推薦閱讀
- ?? 資料分析實戰系列 :https://www.showmeai.tech/tutorials/40
- ?? 機器學習資料分析實戰系列:https://www.showmeai.tech/tutorials/41
- ?? 深度學習資料分析實戰系列:https://www.showmeai.tech/tutorials/42
- ?? TensorFlow資料分析實戰系列:https://www.showmeai.tech/tutorials/43
- ?? PyTorch資料分析實戰系列:https://www.showmeai.tech/tutorials/44
- ?? NLP實戰資料分析實戰系列:https://www.showmeai.tech/tutorials/45
- ?? CV實戰資料分析實戰系列:https://www.showmeai.tech/tutorials/46
- ?? AI 面試題庫系列:https://www.showmeai.tech/tutorials/48
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/538256.html
標籤:大數據
上一篇:大資料需求分析
