Accumulators
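- 1. Purpose and principle of accumulators
- 1.1 Purpose
- 1.2 Principle
- 2. Walking through the key accumulator source code
- 2.1 Test code
- 2.2 Tracing the source
- 2.2.1 The add call
- 2.2.2 The merge call
- 3. How accumulators behave in actions versus transformations
- 3.1 Test code
- 4. Accumulator scope
- 4.1 Checkpoints and accumulators
- 4.2 Multiple actions and accumulators
- 5. Custom accumulators
- 6. Summary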
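1. Purpose and principle of accumulators
1.1 Purpose
Accumulators implement distributed counting or summing. A named accumulator's value is also shown in the Spark application's web UI, which helps with debugging.
1.2 Principle
The Driver defines an accumulator object and ships it to the Executors together with each Spark task. Each task deserializes its own copy of the accumulator and accumulates into that copy (add). When the tasks finish, the Driver merges the copies they send back (merge).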
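The add-then-merge flow can be modelled in a few lines of plain Scala. This is a simplified sketch and does not use Spark's AccumulatorV2. ToyLongAcc and ToyAccumulatorDemo are hypothetical names, and each inner Seq stands for one partition handled by one task. The copyAndReset name is borrowed from the AccumulatorV2 method that, as far as I can tell, produces the zeroed copy Spark serializes into each task.

```scala
// Toy model of the accumulator life cycle; NOT Spark's AccumulatorV2.
// ToyLongAcc and ToyAccumulatorDemo are hypothetical names for illustration.
final class ToyLongAcc(private var sum: Long = 0L) {
  def add(v: Long): Unit = sum += v                        // runs on a task's copy
  def merge(other: ToyLongAcc): Unit = sum += other.value  // runs on the driver
  def copyAndReset(): ToyLongAcc = new ToyLongAcc(0L)      // zeroed copy sent to a task
  def value: Long = sum
}

object ToyAccumulatorDemo {
  // Each inner Seq plays the role of one partition processed by one task.
  def simulate(partitions: Seq[Seq[Long]]): Long = {
    val driverAcc = new ToyLongAcc()
    // Every "task" accumulates into its own zeroed copy.
    val taskCopies = partitions.map { part =>
      val copy = driverAcc.copyAndReset()
      part.foreach(copy.add)
      copy
    }
    // The "driver" merges the copies returned by the finished tasks.
    taskCopies.foreach(driverAcc.merge)
    driverAcc.value
  }
}
```

For example, ToyAccumulatorDemo.simulate(Seq(Seq(1L, 2L), Seq(3L, 4L, 5L))) merges the partial sums 3 and 12 into 15.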
2. Walking through the key accumulator source code
2.1 Test code
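import org.apache.spark.internal.Logging
import org.apache.spark.{SparkConf, SparkContext}

// The enclosing object name is illustrative; the original post shows only main().
object AccTest extends Logging {
  def main(args: Array[String]): Unit = {
    log.info("-------begin---------")
    val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[3]")
    val sparkContext = new SparkContext(sparkConnf)
    val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
    var sum = 0 // plain driver-side variable, for comparison
    val sumAcc = sparkContext.longAccumulator("sumAcc")
    rdd.foreach(num => {
      sum = sum + num // updates a deserialized copy inside the task
      sumAcc.add(num)
      println("----excutor:----sumACC=" + sumAcc)
    })
    println("---------sum=" + sum)
    println("---------sumAcc=" + sumAcc.value)
    sparkContext.stop()
  }
}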
Key log output:
----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 3)
----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 7)
----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 12)
----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 1)
----excutor:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 3)
---------sum=0
---------sumAcc=15
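The logged values follow from how the five elements are split between the two partitions. The plain-Scala sketch below assumes that parallelize slices by index with start = i*n/k and end = (i+1)*n/k. That is my reading of how ParallelCollectionRDD splits a Seq, and LogReplay is a hypothetical name.

Under that assumption:
- Partition 0 gets [1, 2], and its copy prints 1 and 3.
- Partition 1 gets [3, 4, 5], and its copy prints 3, 7 and 12.
- The driver merges 3 + 12 = 15.

```scala
// Replays the executor-side log values of the test above with plain Scala.
// Assumption: parallelize(Array(1, 2, 3, 4, 5), 2) slices by index as below.
object LogReplay {
  def slices[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = (i * n / numSlices).toInt
      val end = ((i + 1) * n / numSlices).toInt
      data.slice(start, end)
    }
  }

  // Value of one task's accumulator copy after each add (what the println shows).
  def runningValues(part: Seq[Int]): Seq[Int] = part.scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit = {
    val parts = slices(Seq(1, 2, 3, 4, 5), 2)
    parts.foreach(p => println(s"partition $p -> ${runningValues(p)}"))
    println(s"merged on driver: ${parts.map(_.sum).sum}")
  }
}
```

With local[3] the two tasks run concurrently, which is why the two sequences are interleaved in the real log.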
2.2 Tracing the source
2.2.1 The add call
SparkContext.scala
/**
 * Create and register a long accumulator, which starts with 0 and accumulates inputs by `add`.
 */
def longAccumulator(name: String): LongAccumulator = {
  val acc = new LongAccumulator
  register(acc, name)
  acc
}
/**
 * Register the given accumulator with given name.
 *
 * @note Accumulators must be registered before use, or it will throw exception.
 */
def register(acc: AccumulatorV2[_, _], name: String): Unit = {
  acc.register(this, name = Option(name))
}
AccumulatorV2.scala
private[spark] def register(
    sc: SparkContext,
    name: Option[String] = None,
    countFailedValues: Boolean = false): Unit = {
  if (this.metadata != null) {
    throw new IllegalStateException("Cannot register an Accumulator twice.")
  }
  this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)
  AccumulatorContext.register(this)
  sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
}
AccumulatorContext
/**
 * This global map holds the original accumulator objects that are created on the driver.
 * It keeps weak references to these objects so that accumulators can be garbage-collected
 * once the RDDs and user-code that reference them are cleaned up.
 * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051).
 */
private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]]
/**
 * Registers an [[AccumulatorV2]] created on the driver such that it can be used on the executors.
 *
 * All accumulators registered here can later be used as a container for accumulating partial
 * values across multiple tasks. This is what `org.apache.spark.scheduler.DAGScheduler` does.
 * Note: if an accumulator is registered here, it should also be registered with the active
 * context cleaner for cleanup so as to avoid memory leaks.
 *
 * If an [[AccumulatorV2]] with the same ID was already registered, this does nothing instead
 * of overwriting it. We will never register same accumulator twice, this is just a sanity check.
 */
def register(a: AccumulatorV2[_, _]): Unit = {
  originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a))
}
val sumAcc = sparkContext.longAccumulator("sumAcc")
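This line creates an accumulator of type LongAccumulator and runs the registration steps shown above. It assigns the accumulator an id, puts a weak reference to it into AccumulatorContext's global map, and registers it with the context cleaner. Finally it returns the object.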
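The registration path can be mirrored with a small model:
- an id counter,
- a concurrent map of weak references keyed by id,
- a guard against double registration.

ToyAcc and ToyRegistry are hypothetical stand-ins for AccumulatorV2, AccumulatorMetadata and AccumulatorContext. The context-cleaner step is left out. This is a sketch of the idea only, not Spark's code.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for an accumulator plus its metadata (id is set on registration).
final class ToyAcc(val name: Option[String]) {
  @volatile var id: Option[Long] = None
}

// Hypothetical stand-in for AccumulatorContext.
object ToyRegistry {
  private val nextId = new AtomicLong(0L)
  // id -> weak reference to the driver-side object, like AccumulatorContext.originals
  private val originals = new ConcurrentHashMap[Long, WeakReference[ToyAcc]]()

  def register(acc: ToyAcc): Unit = {
    // Mirrors the "Cannot register an Accumulator twice." check
    if (acc.id.isDefined) throw new IllegalStateException("Cannot register an Accumulator twice.")
    val id = nextId.getAndIncrement()
    acc.id = Some(id)
    originals.putIfAbsent(id, new WeakReference(acc))
  }

  // Returns None if the id is unknown or the object was already garbage-collected.
  def lookup(id: Long): Option[ToyAcc] =
    Option(originals.get(id)).flatMap(ref => Option(ref.get))
}
```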
Next, rdd.foreach is an action:
/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
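The second argument passed to runJob here is, roughly:
(iter: Iterator[Int]) => iter.foreach(
  num => {
    sum = sum + num
    sumAcc.add(num)
    println("----excutor:----sumACC=" + sumAcc)
  }
)
Where does this function run? It runs in the task's runTask method. For the final stage of a job, that method is ResultTask.runTask: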
ResultTask.scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  func(context, rdd.iterator(partition, context))
}
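The func deserialized here wraps the user code that the test passed to rdd.foreach:
(iter: Iterator[Int]) => iter.foreach(
  num => {
    sum = sum + num
    sumAcc.add(num)
    println("----excutor:----sumACC=" + sumAcc)
  }
)
The line func(context, rdd.iterator(partition, context)) invokes func. rdd.iterator returns an iterator over all records of the partition this task is responsible for, and that iterator is passed to func. The body of func walks the iterator and applies the user-defined logic to each element in turn.
Conclusion 1: sumAcc.add(num) is therefore called exactly once for each element the action processes.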
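The call path from rdd.foreach(f) down to the user function can be sketched with plain collections. ForeachPathSketch, FakeTaskContext and runJobSketch are hypothetical simplifications:
- foreachAsJobFunc mirrors the per-partition function that RDD.foreach passes to runJob.
- The two-argument func mirrors the (TaskContext, Iterator[T]) => U that ResultTask.runTask invokes.

Counting the calls to the user function illustrates Conclusion 1.

```scala
// Plain-Scala sketch of the foreach -> runJob -> runTask call path; not Spark code.
object ForeachPathSketch {
  // Hypothetical stand-in for TaskContext.
  final case class FakeTaskContext(partitionId: Int)

  // Roughly what RDD.foreach hands to runJob: a function over one partition's iterator.
  def foreachAsJobFunc[T](f: T => Unit): Iterator[T] => Unit = iter => iter.foreach(f)

  // Roughly what each ResultTask does: call func(context, iterator over its partition).
  def runJobSketch[T](partitions: Seq[Seq[T]], perPartition: Iterator[T] => Unit): Unit = {
    val func: (FakeTaskContext, Iterator[T]) => Unit = (_, iter) => perPartition(iter)
    partitions.zipWithIndex.foreach { case (data, pid) =>
      func(FakeTaskContext(pid), data.iterator)
    }
  }

  // Counts how often the user function (where sumAcc.add would live) is called.
  def countUserCalls(partitions: Seq[Seq[Int]]): Int = {
    var calls = 0
    runJobSketch(partitions, foreachAsJobFunc[Int](_ => calls += 1))
    calls
  }
}
```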
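2.2.2 The merge call
Each time a task finishes, the executor reports the task's result, including its accumulator updates, back to the driver. On the driver, the task scheduler hands the result to the DAGScheduler, which posts a CompletionEvent to its event loop: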
DAGSchedulerEventProcessLoop.scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)
  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId, reason) =>
    val workerLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, workerLost)
  case WorkerRemoved(workerId, host, message) =>
    dagScheduler.handleWorkerRemoved(workerId, host, message)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case SpeculativeTaskSubmitted(task) =>
    dagScheduler.handleSpeculativeTaskSubmitted(task)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
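handleTaskCompletion eventually calls the updateAccumulators(event: CompletionEvent): Unit method. That method merges the accumulator updates carried by the event into the original accumulators on the driver.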
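Conceptually, the driver-side merge looks up the original accumulator by id and then merges the task's partial value into it. The sketch below is a simplified model under that assumption and is not DAGScheduler code. TaskUpdate and DriverMergeSketch are hypothetical, and a plain Long stands in for the registered LongAccumulator.

```scala
import scala.collection.mutable

// Hypothetical: one accumulator update carried by a completed task.
final case class TaskUpdate(accId: Long, partialSum: Long)

// Simplified model of the driver-side merge; not DAGScheduler's actual code.
object DriverMergeSketch {
  // Driver-side originals keyed by accumulator id (compare AccumulatorContext); id 0 starts at 0.
  val originals: mutable.Map[Long, Long] = mutable.Map(0L -> 0L)

  // Called once per completed task, like handling one CompletionEvent.
  def onTaskCompletion(updates: Seq[TaskUpdate]): Unit =
    updates.foreach { u =>
      // Unknown ids are ignored in this sketch.
      originals.get(u.accId).foreach(current => originals(u.accId) = current + u.partialSum)
    }
}
```

Feeding in the partial sums from the 2.1 test, 3 and 12, one per completed task, yields the reported 15.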

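In the test code, sum = sum + num runs exactly as often as sumAcc.add(num), and it also runs on the executors. However, each task only updates its own deserialized copy of sum, and nothing sends those copies back to the driver. The driver's sum therefore stays at its initial value of 0.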
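Why sum stays 0 can be reproduced without Spark. A task receives a serialized copy of whatever its closure captured, and updating that copy leaves the driver's object untouched. In this sketch a Java serialization round trip stands in for shipping the closure to an executor, and Counter is a hypothetical stand-in for the captured var sum.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for the driver-side `var sum` captured by the task closure.
final class Counter(var value: Int) extends Serializable

object ClosureCopyDemo {
  // Serialize and deserialize, roughly what shipping a closure to an executor does.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this code's class loader (helps in REPL/script setups).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (driver value, executor copy value) after the "task" adds 1..5 to its copy.
  def run(): (Int, Int) = {
    val driverSum = new Counter(0)
    val executorCopy = roundTrip(driverSum)
    Seq(1, 2, 3, 4, 5).foreach(n => executorCopy.value += n)
    (driverSum.value, executorCopy.value)
  }
}
```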
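3. How accumulators behave in actions versus transformations
In the source trace above, the accumulator was used inside an action. What changes when it is used inside a transformation?
3.1 Test code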
def main(args: Array[String]): Unit = {
  log.info("-------begin---------")
  val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
  val sparkContext = new SparkContext(sparkConnf)
  val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
  val sumAcc = sparkContext.longAccumulator("sumAcc")
  val mapRDD = rdd.map(num => {
    sumAcc.add(num) // add inside a transformation
    println("----transfer:----sumACC=" + sumAcc)
    num
  })
  mapRDD.foreach(num => {
    sumAcc.add(num) // add inside an action
    println("----action:----sumACC=" + sumAcc)
  })
  println("----result:----sumACC=" + sumAcc)
  sparkContext.stop()
}
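An add inside a transformation is called from the RDD's compute method. Because map and foreach are pipelined inside the same task, the transformation's add runs for each element just before the action's add.
Key log output: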
----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 1)
----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 2)
----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 4)
----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 6)
----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 3)
----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 6)
----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 10)
----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 14)
----transfer:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 19)
----action:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 24)
----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 30)
add ran twice for every element, once in map and once in foreach, so the result is 30 rather than 15.
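The interleaved log comes from pipelining. Inside one task, map and foreach are chained lazily, so each element passes through both functions before the next element is read, and both functions update the same task-local copy. The plain-Scala model below assumes the partitions are [1, 2] and [3, 4, 5], which is what the log values imply, and uses PipelineSketch as a hypothetical name. It reproduces the logged values and the total 6 + 24 = 30.

```scala
// Plain-Scala model of one task running map(f) then foreach(g) over its partition.
// Iterator.map is lazy, so each element goes through f and then g before the next one.
object PipelineSketch {
  // Returns this task's log lines and the final value of its accumulator copy.
  def runTask(partition: Seq[Int]): (Seq[String], Long) = {
    var acc = 0L // the task's own accumulator copy, starting at zero
    val log = Seq.newBuilder[String]
    val mapped = partition.iterator.map { num =>
      acc += num // "transformation" add
      log += s"transfer:$acc"
      num
    }
    mapped.foreach { num =>
      acc += num // "action" add
      log += s"action:$acc"
    }
    (log.result(), acc)
  }
}
```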
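4. Accumulator scope
Accumulators are application-scoped: once registered, an accumulator keeps accumulating across every job the application runs. An application may run several actions, or use a checkpoint, which runs as a separate job (see https://cangchen.blog.csdn.net/article/details/122020410). In either case the accumulator's value may differ from what you expect.
4.1 Checkpoints and accumulators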
def main(args: Array[String]): Unit = {
  log.info("-------begin---------")
  val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
  val sparkContext = new SparkContext(sparkConnf)
  sparkContext.setCheckpointDir(".")
  val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
  val sumAcc = sparkContext.longAccumulator("sumAcc")
  val mapRDD = rdd.map(num => {
    sumAcc.add(num)
    num
  })
  mapRDD.checkpoint() // the checkpoint is written by an extra job after the first action
  mapRDD.foreach(num => {
    println(num)
  })
  println("----result:----sumACC=" + sumAcc)
  sparkContext.stop()
}
Result:
----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 30)
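The accumulator's add sits inside the map transformation, and the map was computed twice. The first computation happened in the job submitted by the foreach action, and the second in the job submitted by the checkpoint. To keep the checkpoint from running add again, cache the RDD: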
def main(args: Array[String]): Unit = {
  log.info("-------begin---------")
  val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
  val sparkContext = new SparkContext(sparkConnf)
  sparkContext.setCheckpointDir(".")
  val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
  val sumAcc = sparkContext.longAccumulator("sumAcc")
  val mapRDD = rdd.map(num => {
    sumAcc.add(num)
    num
  })
  mapRDD.checkpoint()
  mapRDD.cache() // the checkpoint job reads cached partitions instead of recomputing map
  mapRDD.foreach(num => {
    println(num)
  })
  println("----result:----sumACC=" + sumAcc)
  sparkContext.stop()
}
Result:
----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 15)
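cache stores the computed partitions, so the checkpoint job reads the cached data instead of calling the RDD's compute function again. As a result, the accumulator is incremented only once per element.
4.2 Multiple actions and accumulators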
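The effect of cache can be modelled with a toy RDD whose side-effecting compute runs on every job unless the partition results are already stored. ToyMappedRdd and CacheSketch are hypothetical, and Spark's real block manager is far more involved. The second runJob() call stands for the checkpoint job here, or for a second action.

```scala
import scala.collection.mutable

// Toy model of lazy recomputation; ToyMappedRdd is hypothetical, not a Spark class.
// Every "job" re-runs compute() unless cache() was called and the partition is stored.
final class ToyMappedRdd(partitions: Seq[Seq[Int]], onCompute: Int => Unit) {
  private var cacheEnabled = false
  private val cached = mutable.Map.empty[Int, Seq[Int]]

  def cache(): this.type = { cacheEnabled = true; this }

  // Like map(num => { sumAcc.add(num); num }): the side effect runs on every compute.
  private def compute(pid: Int): Seq[Int] = partitions(pid).map { n => onCompute(n); n }

  private def iterator(pid: Int): Seq[Int] =
    if (cacheEnabled) cached.getOrElseUpdate(pid, compute(pid)) else compute(pid)

  // One job = one pass over all partitions.
  def runJob(): Seq[Int] = partitions.indices.flatMap(iterator)
}

object CacheSketch {
  def accumulatedAfterTwoJobs(useCache: Boolean): Long = {
    var acc = 0L
    val rdd = new ToyMappedRdd(Seq(Seq(1, 2), Seq(3, 4, 5)), n => acc += n)
    if (useCache) rdd.cache()
    rdd.runJob() // the action, e.g. foreach
    rdd.runJob() // the checkpoint job (or a second action)
    acc
  }
}
```

Without cache the toy accumulator reaches 30; with cache the second job reuses the stored partitions and it stays at 15, matching the two results above.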
def main(args: Array[String]): Unit = {
  log.info("-------begin---------")
  val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
  val sparkContext = new SparkContext(sparkConnf)
  // sparkContext.setCheckpointDir(".")
  val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
  val sumAcc = sparkContext.longAccumulator("sumAcc")
  val mapRDD = rdd.map(num => {
    sumAcc.add(num)
    num
  })
  // mapRDD.checkpoint()
  // mapRDD.cache()
  // First action: job 1 computes map
  mapRDD.foreach(num => {
    println(num)
  })
  // Second action: job 2 computes map again
  mapRDD.foreach(num => {
    println(num)
  })
  println("----result:----sumACC=" + sumAcc)
  sparkContext.stop()
}
Result:
----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 30)
The two actions launch two independent jobs, so add ran twice per element. Here too, cache avoids the problem:
def main(args: Array[String]): Unit = {
  log.info("-------begin---------")
  val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[1]")
  val sparkContext = new SparkContext(sparkConnf)
  // sparkContext.setCheckpointDir(".")
  val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 2)
  val sumAcc = sparkContext.longAccumulator("sumAcc")
  val mapRDD = rdd.map(num => {
    sumAcc.add(num)
    num
  })
  // mapRDD.checkpoint()
  mapRDD.cache() // the second job reads cached partitions, so map (and add) runs once
  mapRDD.foreach(num => {
    println(num)
  })
  mapRDD.foreach(num => {
    println(num)
  })
  println("----result:----sumACC=" + sumAcc)
  sparkContext.stop()
}
----result:----sumACC=LongAccumulator(id: 0, name: Some(sumAcc), value: 15)
5. Custom accumulators
A custom accumulator that implements word count:
package cchen.spark.sparkcore

import org.apache.spark.internal.Logging
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object WordCount extends Logging {
  def main(args: Array[String]): Unit = {
    log.info("-------begin---------")
    val sparkConnf = new SparkConf().setAppName("acc").setMaster("local[3]")
    val sparkContext = new SparkContext(sparkConnf)
    val rdd = sparkContext.parallelize(Array("hello", "thank you", "thank you very much", "are you ok"), 2)
    val flat_rdd = rdd.flatMap(_.split(" ", -1))
    // Cache before the first action so the reduceByKey job reuses the computed words.
    flat_rdd.cache()
    // A custom accumulator must be registered before use.
    val wordCountACC = new WordCountAccumulator
    sparkContext.register(wordCountACC, "wordCountACC")
    // add is called inside an action, so each word is counted once.
    flat_rdd.foreach(f => wordCountACC.add(f))
    println("----reduceByKey result-----")
    flat_rdd.map(f => (f, 1)).reduceByKey(_ + _).collect().foreach(f => println(f._1 + ":" + f._2))
    println("----acc result-----")
    wordCountACC.value.foreach(f => println(f._1 + ":" + f._2))
    sparkContext.stop()
  }
}

// IN = String (one word), OUT = mutable.Map[String, Long] (word -> count)
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  private var map = mutable.Map[String, Long]()

  // Must be true for the zeroed copy that is shipped to each task.
  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newACC = new WordCountAccumulator()
    newACC.map = this.map.clone()
    newACC
  }

  override def reset(): Unit = map.clear()

  // Executor side: count one word in this task's copy.
  override def add(v: String): Unit = {
    map.put(v, map.getOrElse(v, 0L) + 1)
  }

  // Driver side: fold another copy's counts into this one.
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = other match {
    case o: WordCountAccumulator =>
      o.value.foreach { case (word, count) =>
        map.put(word, map.getOrElse(word, 0L) + count)
      }
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: mutable.Map[String, Long] = map
}
Result:
----reduceByKey result-----
are:1
thank:2
hello:1
very:1
ok:1
you:3
much:1
----acc result-----
you:3
ok:1
are:1
very:1
thank:2
much:1
hello:1
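The accumulator approach should be faster because, unlike reduceByKey, it involves no shuffle. The trade-off is that every task sends its whole map back to the driver, where all the maps are merged. This approach therefore only suits a moderate number of distinct keys.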
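The add/merge logic of WordCountAccumulator can be checked without a cluster. In this plain-Scala sketch, WordCountMergeSketch is hypothetical and grouped stands in for Spark's partitioning. Each group of words is counted into its own map, as a task's copy would be, and the maps are then merged into one driver-side map. The totals do not depend on how the words are split.

```scala
import scala.collection.mutable

// Plain-Scala check of the add/merge logic; no Spark involved.
object WordCountMergeSketch {
  // Like add() on one task's copy: count every word of this "partition".
  def countTask(words: Seq[String]): mutable.Map[String, Long] = {
    val m = mutable.Map.empty[String, Long]
    words.foreach(w => m.put(w, m.getOrElse(w, 0L) + 1))
    m
  }

  // Like merge() on the driver: fold one task's counts into the target map.
  def mergeInto(target: mutable.Map[String, Long], other: collection.Map[String, Long]): Unit =
    other.foreach { case (w, c) => target.put(w, target.getOrElse(w, 0L) + c) }

  def run(lines: Seq[String], numTasks: Int): Map[String, Long] = {
    val words = lines.flatMap(_.split(" ", -1))
    val size = math.max(1, math.ceil(words.size.toDouble / numTasks).toInt)
    val driver = mutable.Map.empty[String, Long]
    words.grouped(size).map(countTask).foreach(mergeInto(driver, _))
    driver.toMap
  }
}
```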
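6. Summary
1. Where add actually runs depends on where the user code calls it.
If add is called inside an RDD transformation, it runs inside the RDD's compute function. If it is called inside an RDD action, it runs inside ResultTask.runTask. Either way it runs on the Executors.
2. merge runs on the Driver: each time a task completes successfully, the Driver merges that task's accumulator updates.
3. Accumulators are application-scoped. Watch out for "over-counting" when an application runs several actions, or a single action plus a checkpoint.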
