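My HDFS layout has a folder named "mapped_files" that contains several CSV files: "mapped_file_1.csv", "mapped_file_2.csv", ... How can I merge all of these files? The files may not have exactly the same columns. For example, when I read "mapped_file_1.csv" and "mapped_file_2.csv" with PySpark, they look like this: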
###mapped_file_1.csv
---------- --------- --------- -------- ----- ----- ----------- ---------- ------------------ --------------------- ------------------- --------------- --------------------
|chromosome| start| end|assembly| ref| alt|risk_allele| genes| phenotype|clinical_significance|polyphen_prediction|sift_prediction| hgvs|
---------- --------- --------- -------- ----- ----- ----------- ---------- ------------------ --------------------- ------------------- --------------- --------------------
| 9| 96369762| 96369762| null| C/T| C/T| T|intergenic|Migraine with aura| null| null| null| rs59270819|
| 10| 29075768| 29075768| null|G/A/C|G/A/C| A|intergenic|Migraine with aura| null| null| null| rs59495588|
---------- --------- --------- -------- ----- ----- ----------- ---------- ------------------ --------------------- ------------------- --------------- --------------------
###mapped_file_2.csv
------------ ---------- ---------- ---------- -------------------- ----------- ------------------- --------------- -------------------- -------- ------------- -------- -------- --- --- ----------- -------------------- ---------------------- -------------- --------------------- ---------------- -------------------- -------------------- ---------- -------------------- -------- ----- ---- ------- ---- ------- ------- -------------------- ------- -------------------- -----------------
|variant_name|variant_id|chromosome| genes| variant_type|description|polyphen_prediction|sift_prediction| hgvs|assembly|assembly.date| start| end|ref|alt|risk_allele| phenotype|clinical_actionability|classification|clinical_significance| method| assertion_criteria| level_certainty| date| author| origin|title|year|authors|pmid|is_gwas| name| url|version|databanks.variant_id|clinvar_accession|
------------ ---------- ---------- ---------- -------------------- ----------- ------------------- --------------- -------------------- -------- ------------- -------- -------- --- --- ----------- -------------------- ---------------------- -------------- --------------------- ---------------- -------------------- -------------------- ---------- -------------------- -------- ----- ---- ------- ---- ------- ------- -------------------- ------- -------------------- -----------------
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874|11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874|11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15| Invitae|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874|11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
------------ ---------- ---------- ---------- -------------------- ----------- ------------------- --------------- -------------------- -------- ------------- -------- -------- --- --- ----------- -------------------- ---------------------- -------------- --------------------- ---------------- -------------------- -------------------- ---------- -------------------- -------- ----- ---- ------- ---- ------- ------- -------------------- ------- -------------------- -----------------
From the dataframes/files above, we can see that some columns do not exist in both dataframes/files. This is what I tried:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from functools import reduce
import pyspark.sql.functions as F
warehouse_location = 'hdfs://hdfs-nn:9000'
spark = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("csv") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .getOrCreate()
all_data = spark.read.options(header='True', delimiter=';').csv("hdfs://hdfs-nn:9000/mapped_files/*")
all_data.show()
------------ ---------- ---------- ---------- -------------------- ----------- ------------------- --------------- -------------------- -------- ------------- -------- ---------- ---- ---- ----------- -------------------- ---------------------- -------------- --------------------- ---------------- -------------------- -------------------- ---------- -------------------- -------- ----- ---- ------- ---- ------- ------- -------------------- ------- -------------------- -----------------
|variant_name|variant_id|chromosome| genes| variant_type|description|polyphen_prediction|sift_prediction| hgvs|assembly|assembly.date| start| end| ref| alt|risk_allele| phenotype|clinical_actionability|classification|clinical_significance| method| assertion_criteria| level_certainty| date| author| origin|title|year|authors|pmid|is_gwas| name| url|version|databanks.variant_id|clinvar_accession|
------------ ---------- ---------- ---------- -------------------- ----------- ------------------- --------------- -------------------- -------- ------------- -------- ---------- ---- ---- ----------- -------------------- ---------------------- -------------- --------------------- ---------------- -------------------- -------------------- ---------- -------------------- -------- ----- ---- ------- ---- ------- ------- -------------------- ------- -------------------- -----------------
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874| 11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874| 11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...|clinical testing|Invitae Variant C...|criteria provided...|2019-08-15| Invitae|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| null| null| 1|['TARDBP']|single nucleotide...| null| null| null|['Q13148:p.Ala90V...| GRCh38| null|11016874| 11016874| C| T| T|Amyotrophic later...| null| null| Uncertain signifi...| literature only| null|no assertion crit...|2019-07-02| GeneReviews|germline| null|null| null|null| null|ClinVar|https://www.ncbi....| null| null| VCV000021481|
| 9| 96369762| 96369762| null| C/T| C/T| T| intergenic| Migraine with aura| null| null| null|rs59270819|null|null| null| null| null| null| null| null| null| null| null| null| null| null|null| null|null| null| null| null| null| null| null|
| 10| 29075768| 29075768| null| G/A/C| G/A/C| A| intergenic| Migraine with aura| null| null| null|rs59495588|null|null| null| null| null| null| null| null| null| null| null| null| null| null|null| null|null| null| null| null| null| null| null|
------------ ---------- ---------- ---------- -------------------- ----------- ------------------- --------------- -------------------- -------- ------------- -------- ---------- ---- ---- ----------- -------------------- ---------------------- -------------- --------------------- ---------------- -------------------- -------------------- ---------- -------------------- -------- ----- ---- ------- ---- ------- ------- -------------------- ------- -------------------- -----------------
only showing top 20 rows
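When I use the code above, the values of the shared columns do not end up in the right place (in the last two rows, the values are not under the correct columns).
So my question is: how can I read multiple CSV files with PySpark and merge them together when they may not have the same columns?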
Answer:
The simple way is to add the missing columns to both dataframes and use a union function. I prefer unionByName, so I will use it in my example:
df1 = spark.read.options(header='True', delimiter=';').csv("mapped_file_1.csv")
df2 = spark.read.options(header='True', delimiter=';').csv("mapped_file_2.csv")
united_df = df1.unionByName(df2, allowMissingColumns=True)
allowMissingColumns (available since Spark 3.1) fills the columns that are missing from a dataframe with NULL.
If you have more than 2 files, you can define a function and use reduce to merge all the dataframes:
def unite_dfs(df1, df2):
    return df1.unionByName(df2, allowMissingColumns=True)
# One dataframe per CSV file, each read on its own as in the example above
list_of_dfs = [df1, df2, df3, df4, df5, df6]
united_df = reduce(unite_dfs, list_of_dfs)
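Let me know if this is clear. I did not include the imports because I only used libraries that are already in your code snippet. I can edit the answer if anything is unclear.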
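To avoid building the list of dataframes by hand, the per-file reads can be folded into the same reduce. This is a minimal sketch rather than part of the original answer: the function name `read_and_union_csvs` and its `paths` argument are illustrative, and it assumes the file paths are already known and that Spark 3.1 or later is in use.

```python
from functools import reduce


def read_and_union_csvs(spark, paths, **csv_options):
    """Read each CSV with its own header, then union all of them by column name."""
    if not paths:
        raise ValueError("paths must contain at least one file")
    # Reading file by file lets every file keep its own header.
    dfs = [spark.read.options(**csv_options).csv(path) for path in paths]
    # Columns missing from either side are filled with NULL (needs Spark 3.1+).
    return reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dfs,
    )
```

With the session and options from the question, a call could look like `read_and_union_csvs(spark, ["hdfs://hdfs-nn:9000/mapped_files/mapped_file_1.csv", "hdfs://hdfs-nn:9000/mapped_files/mapped_file_2.csv"], header='True', delimiter=';')`.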
