Spark配置
Spark提供三個位置配置系統:
- Spark屬性控制大多數應用程式引數,可以通過使用SparkConf物件或通過Java系統屬性進行設定,
- 可以使用環境變數在每個節點上通過conf/spark-env.sh腳本設定每臺機器的設定,例如IP地址,
- 可以通過log4jb .properties配置日志記錄,
Spark屬性控制大多數應用程式設定,并為每個應用程式單獨配置,這些屬性可以直接設定SparkConf傳遞給你的SparkContext,SparkConf允許您配置一些常用屬性(例如主URL和應用程式名稱),以及通過set()方法配置任意鍵值對,例如,我們可以用如下兩個執行緒初始化一個應用程式:
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
注意,我們使用本地[2]運行,這意味著兩個執行緒,表示“最小”并行性,
單位格式:
時間:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
位元組大小:
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
動態加載Spark屬性
在某些情況下,您可能希望避免在SparkConf中硬編碼某些配置,例如,如果您想運行相同的應用程式與不同的主控或不同的記憶體數量,Spark允許你簡單地創建一個空的conf:
val sc = new SparkContext(new SparkConf())
然后,你可以在運行時提供配置值:
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Spark shell和Spark-submit工具支持兩種動態加載配置的方式,第一個是命令列選項,如——master,如上所示,Spark-submit可以接受使用——conf/-c標志的任何Spark屬性,但對在啟動Spark應用程式時起作用的屬性使用特殊標志,運行/bin/spark-submit ——help將顯示這些選項的完整串列,
.bin/spark-submit還將從conf/spark-defaults.conf中讀取配置選項,其中每一行由一個鍵和一個由空格分隔的值組成,例如:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
任何指定為標志或屬性檔案中的值都將傳遞給應用程式,并與通過SparkConf指定的值合并,直接在SparkConf上設定的屬性優先級最高,然后是傳遞給spark-submit或spark-shell的標志,然后是spark-defaults.conf檔案中的選項,一些配置鍵從早期版本的Spark開始重新命名;在這種情況下,舊的鍵名仍然被接受,但其優先級低于新鍵的任何實體,即:
(程式中配置)>(啟動時配置)>(組態檔配置)(新>舊)
Spark屬性主要可以分為兩種:一種是與部署相關的,比如“Spark.driver.memory”、“spark.executor.instances”,這類屬性在運行時通過SparkConf編程設定時可能不受影響,或者行為取決于您選擇的集群管理器和部署模式,因此建議通過組態檔或spark-submit命令列選項設定;另一個主要與Spark運行時控制相關,如“spark.task.maxFailures”,這類屬性可以用兩種方式設定,
詳細配置情況,可查詢官網檔案:https://spark.apache.org/docs/latest/configuration.html
可用配置
Spark SQL
運行時
運行時SQL配置是每會話的、可變的Spark SQL配置,它們可以通過組態檔和帶有——conf/-c前綴的命令列選項設定初始值,或者通過設定用于創建SparkSession的SparkConf,此外,它們可以通過set命令設定和查詢,并通過RESET命令或運行時SparkSession.conf的setter和getter方法保持初始值,
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("sparkSQL")
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
靜態
靜態SQL配置是跨會話的、不可變的Spark SQL配置,它們可以通過組態檔和帶有——conf/-c前綴的命令列選項設定最終值,或者通過設定用于創建SparkSession的SparkConf,外部用戶可以通過SparkSession.conf或set命令查詢靜態sql配置值,例如set spark.sql.extensions;,但不能設定/取消設定它們,
Spark Streaming
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
// 創建時需要傳遞兩個引數
// 1.環境配置
// 2.批處理周期(采集周期)
val ssc = new StreamingContext(sparkConf, Seconds(3))
集群管理
Spark中的每個集群管理器都有額外的配置選項,配置可以在每個模式的頁面上找到:
YARN
Mesos
Kubernetes
Standalone Mode
環境變數
Spark的某些設定可以通過環境變數進行配置,環境變數從Spark安裝目錄下的conf/Spark-env.sh腳本(或conf/Spark-env.sh)中讀取,cmd視窗),在單機模式和Mesos模式下,該檔案可以提供特定于計算機的資訊,如主機名,當運行本地Spark應用程式或提交腳本時,它也是來源,
注意:安裝Spark時,conf/Spark-env.sh默認不存在,但可以拷貝conf/spark-env.sh.template來創建它,確保你的拷貝是可執行的,
注意:在集群模式下在YARN上運行Spark時,需要使用Spark.YARN.appMasterEnv設定環境變數,在你的conf/spark-defaults.conf檔案中的[EnvironmentVariableName]屬性,在spark-env.sh中設定的環境變數在集群模式下不會反映在YARN Application Master行程中,更多資訊請參見與yarn相關的Spark屬性,
配置日志
Spark使用log4j進行日志記錄,您可以通過添加log4j2來配置它,conf目錄下的屬性檔案,一種開始的方法是復制現有的log4j2.properties.template,
重寫配置目錄
如果需要指定默認的“SPARK_HOME/conf”目錄以外的其他配置目錄,可以通過設定“SPARK_CONF_DIR”,Spark將使用組態檔(spark-defaults.conf, spark-env.sh, log4j2.properties等),
繼承Hadoop集群配置
如果你計劃使用Spark從HDFS進行讀寫操作,有兩個Hadoop組態檔應該包含在Spark的類路徑中:
hdfs-site.xml,
core-site.xml,
這些組態檔的位置在不同的Hadoop版本中是不同的,但是一個常見的位置是在/etc/Hadoop/conf中,有些工具動態地創建配置,但提供了下載配置副本的機制,
要使這些檔案對Spark可見,請將$SPARK_HOME/conf/Spark-env.sh中的HADOOP_CONF_DIR設定到包含組態檔的位置,
hadoop/hive配置
如果您的Spark應用程式正在與Hadoop、Hive或兩者互動,那么在Spark的類路徑中可能存在Hadoop/Hive組態檔,
多個運行的應用程式可能需要不同的Hadoop/Hive客戶端配置,您可以在Spark的類路徑中為每個應用程式復制和修改hdfs-site.xml, core-site.xml, yarn-site.xml, hive-site.xml,在運行在YARN上的Spark集群中,這些組態檔是在集群范圍內設定的,應用程式無法安全地更改它們,
更好的選擇是以spark.hadoop.的形式使用spark hadoop屬性,并以spark.hive.形式的spark hive屬性,例如,增加配置“spark.hadoop.abc.def=xyz”表示增加hadoop屬性“abc.def=xyz”,增加配置“spark.hive.abc=xyz表示添加hive屬性“hive.abc =xyz”,它們可以被認為與普通的spark屬性相同,可以在$SPARK_HOME/conf/spark-defaults.conf中設定
在某些情況下,您可能希望避免在SparkConf中硬編碼某些配置,例如,Spark允許您簡單地創建一個空的conf并設定Spark/Spark hadoop/ Spark hive屬性,
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)
此外,您可以在運行時修改或添加配置:
./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
myApp.jar
自定義資源調度與配置概述
gpu和其他加速器已廣泛用于加速特殊作業負載,如深度學習和信號處理,Spark現在支持請求和調度通用資源,如gpu,但有一些注意事項,當前的實作要求資源具有可由調度器分配的地址,它需要您的集群管理器支持并正確配置資源,
有一些配置可以為驅動程式請求資源:spark.driver.resource.{resourceName},spark.executor.resource.{resourceName}為執行器請求資源,spark.task.resource.{resourceName}.amount為每個任務指定需求,spark.driver.resource.{resourceName}在YARN、Kubernetes和Spark Standalone上的客戶端驅動程式上需要discoveryScript配置,spark.executor.resource,{resourceName},discoveryScript配置對于YARN和Kubernetes是必需的,Kubernetes還需要spark.driver.resource.{resourceName}.vendor/spark.executor.resource,{resourceName}.vendor,請參閱上面的配置描述以獲得關于每種配置的更多資訊,
Spark將使用指定的配置來首先從集群管理器請求具有相應資源的容器,一旦它獲得了容器,Spark在該容器中啟動一個Executor,它將發現容器中有什么資源以及與每個資源關聯的地址,執行程式將向驅動程式注冊并向該執行程式報告可用的資源,然后,Spark調度器可以將任務調度到每個Executor,并根據用戶指定的資源需求分配特定的資源地址,用戶可以使用TaskContext.get().resources api查看分配給任務的資源,在驅動程式上,用戶可以看到SparkContext資源呼叫所分配的資源,然后由用戶決定使用分配的地址來執行他們想要的處理,或者將這些地址傳遞到他們正在使用的ML/AI框架中,
查看您的集群管理器特定頁面了解每種模式的要求和詳細資訊——YARN、Kubernetes和獨立模式,它目前不能用于Mesos或本地模式,還請注意,不支持具有多個作業者的本地集群模式(請參閱獨立檔案),
stage級別調度概述
階段級調度特性允許用戶在階段級指定任務和執行程式資源需求,這允許使用具有不同資源的執行程式運行不同的階段,一個主要的例子是ETL階段使用只有cpu的執行程式運行,下一個階段是需要gpu的ML階段,階段級調度允許用戶在ML階段運行時請求不同的具有gpu的執行程式,而不必在應用程式開始時獲取具有gpu的執行程式,并且這些執行程式在ETL階段運行時處于空閑狀態,這只適用于Scala、Java和Python中的RDD API,當啟用動態分配時,它在YARN和Kubernetes上可用,有關更多實作細節,請參見YARN頁面或Kubernetes頁面,
看RDD,withResources和ResourceProfileBuilder API用于使用該特性,當前的實作為每個創建的ResourceProfile獲取新的執行程式,目前必須是精確匹配的,Spark不會嘗試將需要不同于執行器創建時所使用的ResourceProfile的任務放入執行器中,未被使用的執行程式將使用動態分配邏輯閑置超時,該特性的默認配置是每個階段只允許一個ResourceProfile,如果用戶把RDD關聯的ResourceProfile超過1個,Spark默認會拋出例外,參見config spark.scheduler.resource.profilemergeneconflicts來控制該行為,Spark在Spark.scheduler.resource. profilemergeneconflicts啟用時實作的當前合并策略是沖突的ResourceProfiles中每個資源的簡單最大值,Spark將使用每個資源的最大值創建一個新的ResourceProfile,
基于push的shuffle概述
基于push的shuffle有助于提高spark shuffle的可靠性和性能,它將map任務生成的shuffle塊推到遠程外部shuffle服務,以便每個shuffle磁區合并,Reduce任務獲取合并的shuffle磁區和原始shuffle塊的組合作為它們的輸入資料,從而將外部shuffle服務的小的隨機讀取磁盤轉換為大的順序讀取,為reduce任務提供更好的資料位置的可能性還有助于最小化網路IO,在某些情況下,基于push的shuffle優先于批處理獲取,比如合并輸出可用時的磁區合并,
基于push的shuffle提高了長時間運行的作業/查詢的性能,這些作業/查詢在shuffle期間涉及大量磁盤I/O,目前,它不太適合快速運行的作業/查詢,處理少量的shuffle資料,這將在未來的版本中得到進一步的改進,
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/511103.html
標籤:其他
上一篇:CentOS 7.9 安裝 rabbitmq-3.10.2
下一篇:MySQL 視窗函式
