主頁 >  其他 > 【硬剛大資料之面試篇】2021年從零到大資料專家面試篇之SparkSQL篇

【硬剛大資料之面試篇】2021年從零到大資料專家面試篇之SparkSQL篇

2021-07-30 07:03:12 其他

📢歡迎關注博客主頁:https://blog.csdn.net/u013411339
📢歡迎點贊 👍 收藏 ?留言 📝 ,歡迎留言交流!
📢本文由【王知無】原創,首發于 CSDN博客!
📢本文首發CSDN論壇,未經過官方和本人允許,嚴禁轉載!

本文是對《【硬剛大資料之學習路線篇】2021年從零到大資料專家的學習指南(全面升級版)》的面試部分補充,

硬剛大資料系列文章鏈接:

  • 2021年從零到大資料專家的學習指南(全面升級版)

  • 2021年從零到大資料專家面試篇之Hadoop/HDFS/Yarn篇

1.談談你對Spark SQL的理解

Spark SQL是一個用來處理結構化資料的Spark組件,前身是shark,但是shark過多的依賴于hive如采用hive的語法決議器、查詢優化器等,制約了Spark各個組件之間的相互集成,因此Spark SQL應運而生,

Spark SQL在汲取了shark諸多優勢如記憶體列存盤、兼容hive等基礎上,做了重新的構造,因此也擺脫了對hive的依賴,但同時兼容hive,除了采取記憶體列存盤優化性能,還引入了位元組碼生成技術、CBO和RBO對查詢等進行動態評估獲取最優邏輯計劃、物理計劃執行等,基于這些優化,使得Spark SQL相對于原有的SQL on Hadoop技術在性能方面得到有效提升,

同時,Spark SQL支持多種資料源,如JDBC、HDFS、HBase,它的內部組件,如SQL的語法決議器、分析器等支持重定義進行擴展,能更好的滿足不同的業務場景,與Spark Core無縫集成,提供了DataSet/DataFrame的可編程抽象資料模型,并且可被視為一個分布式的SQL查詢引擎,

在這里插入圖片描述

2.談談你對DataSet/DataFrame的理解

DataSet/DataFrame都是Spark SQL提供的分布式資料集,相對于RDD而言,除了記錄資料以外,還記錄表的schema資訊,

DataSet是自Spark1.6開始提供的一個分布式資料集,具有RDD的特性比如強型別、可以使用強大的lambda運算式,并且使用Spark SQL的優化執行引擎,DataSet API支持Scala和Java語言,不支持Python,但是鑒于Python的動態特性,它仍然能夠受益于DataSet API(如,你可以通過一個列名從Row里獲取這個欄位 row.columnName),類似的還有R語言,

DataFrame是DataSet以命名列方式組織的分布式資料集,類似于RDBMS中的表,或者R和Python中的 data frame,DataFrame API支持Scala、Java、Python、R,在Scala API中,DataFrame變成型別為Row的Dataset:
type DataFrame = Dataset[Row],

DataFrame在編譯期不進行資料中欄位的型別檢查,在運行期進行檢查,但DataSet則與之相反,因為它是強型別的,此外,二者都是使用catalyst進行sql的決議和優化,為了方便,以下統一使用DataSet統稱,

DataSet創建

DataSet通常通過加載外部資料或通過RDD轉化創建,

  • 1.加載外部資料
    以加載json和mysql為例:
val ds = sparkSession.read.json("/路徑/people.json")

val ds = sparkSession.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://ip:port/db",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "tableName", "user" -> "root", "root" -> "123")).load()
  • 2.RDD轉換為DataSet
    通過RDD轉化創建DataSet,關鍵在于為RDD指定schema,通常有兩種方式(偽代碼):
1.定義一個case class,利用反射機制來推斷

1) 從HDFS中加載檔案為普通RDD
val lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))

2) 定義case class(相當于表的schema)
case class Person(id:Int, name:String, age:Int)

3) 將RDD和case class關聯
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

4) 將RDD轉換成DataFrame
val ds= personRDD.toDF

2.手動定義一個schema StructType,直接指定在RDD上

val schemaString ="name age"

val schema =  StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))

val ds = sparkSession.createDataFrame(rowRdd,schema)

操作DataSet的兩種風格語法

DSL語法

1.查詢DataSet部分列中的內容

personDS.select(col("name"))
personDS.select(col("name"), col("age"))

2.查詢所有的name和age和salary,并將salary加1000

personDS.select(col("name"), col("age"), col("salary") + 1000)
personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)

3.過濾age大于18的

personDS.filter(col("age") > 18)

4.按年齡進行分組并統計相同年齡的人數

personDS.groupBy("age").count()

注意:直接使用col方法需要import org.apache.spark.sql.functions._

SQL語法

如果想使用SQL風格的語法,需要將DataSet注冊成表

personDS.registerTempTable("person")
//查詢年齡最大的前兩名
val result = sparkSession.sql("select * from person order by age desc limit 2")
//保存結果為json檔案,注意:如果不指定存盤格式,則默認存盤為parquet
result.write.format("json").save("hdfs://ip:port/res2") 

3.說說Spark SQL的幾種使用方式

  • 1.sparksql-shell互動式查詢

就是利用Spark提供的shell命令列執行SQL

  • 2.編程

首先要獲取Spark SQL編程"入口":SparkSession(當然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive則為HiveContext),這里以讀取parquet為例:

val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();
val df = sparkSession.read.format("parquet").load("/路徑/parquet檔案")

然后就可以針對df進行業務處理了,

  • 3.Thriftserver

beeline客戶端連接操作
啟動spark-sql的thrift服務,sbin/start-thriftserver.sh,啟動腳本中配置好Spark集群服務資源、地址等資訊,然后通過beeline連接thrift服務進行資料處理,
hive-jdbc驅動包來訪問spark-sql的thrift服務
在專案pom檔案中引入相關驅動包,跟訪問mysql等jdbc資料源類似,示例:

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123");
try {
  val stat = conn.createStatement()
  val res = stat.executeQuery("select * from people limit 1")
  while (res.next()) {
    println(res.getString("name"))
  }
} catch {
  case e: Exception => e.printStackTrace()
} finally{
  if(conn!=null) conn.close()
}

4.說說Spark SQL 獲取Hive資料的方式

Spark SQL讀取hive資料的關鍵在于將hive的元資料作為服務暴露給Spark,除了通過上面thriftserver jdbc連接hive的方式,也可以通過下面這種方式:

首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下內容:

<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:port</value>
</property>

然后,啟動hive metastore

最后,將hive-site.xml復制或者軟鏈到 S P A R K H O M E / c o n f / , 如 果 h i v e 的 元 數 據 存 儲 在 m y s q l 中 , 那 么 需 要 將 m y s q l 的 連 接 驅 動 j a r 包 如 m y s q l ? c o n n e c t o r ? j a v a ? 5.1.12. j a r 放 到 SPARK_HOME/conf/,如果hive的元資料存盤在mysql中,那么需要將mysql的連接驅動jar包如mysql-connector-java-5.1.12.jar放到 SPARKH?OME/conf/hivemysqlmysqljarmysql?connector?java?5.1.12.jarSPARK_HOME/lib/下,啟動spark-sql即可操作hive中的庫和表,而此時使用hive元資料獲取SparkSession的方式為:

val spark = SparkSession.builder()
.config(sparkConf).enableHiveSupport().getOrCreate()

5.分別說明UDF、UDAF、Aggregator

  • UDF
    UDF是最基礎的用戶自定義函式,以自定義一個求字串長度的udf為例:
val udf_str_length = udf{(str:String) => str.length}
spark.udf.register("str_length",udf_str_length)
val ds =sparkSession.read.json("路徑/people.json")
ds.createOrReplaceTempView("people")
sparkSession.sql("select str_length(address) from people")
  • UDAF
    定義UDAF,需要繼承抽象類UserDefinedAggregateFunction,它是弱型別的,下面的aggregator是強型別的,以求平均數為例:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
  • Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

6.對比一下Spark SQL與HiveSQL

在這里插入圖片描述

7.說說Spark SQL決議查詢parquet格式Hive表如何獲取磁區欄位和查詢條件

問題現象

sparksql加載指定Hive磁區表路徑,生成的DataSet沒有磁區欄位,
如,sparkSession.read.format("parquet").load(s"${hive_path}"),hive_path為Hive磁區表在HDFS上的存盤路徑,

hive_path的幾種指定方式會導致這種情況的發生(test_partition是一個Hive外部磁區表,dt是它的磁區欄位,磁區資料有dt為20200101和20200102):

1.hive_path為"/spark/dw/test.db/test_partition/dt=20200101"
2.hive_path為"/spark/dw/test.db/test_partition/*"

因為牽涉到的原始碼比較多,這里僅以示例的程式中涉及到的原始碼中的class、object和方法,繪制成xmind圖如下,想細心研究的可以參考該圖到spark原始碼中進行分析,

在這里插入圖片描述

問題分析

這里主要給出幾個原始碼段,結合上述xmind圖理解:
在這里插入圖片描述
在這里插入圖片描述

在沒有指定引數basePath的情況下:

1.hive_path為/spark/dw/test.db/test_partition/dt=20200101

sparksql底層處理后得到的basePaths: Set(new Path(“/spark/dw/test.db/test_partition/dt=20200101”))【偽代碼】

leafDirs: Seq(new Path(“/spark/dw/test.db/test_partition/dt=20200101”))【偽代碼】

2.hive_path為/spark/dw/test.db/test_partition/*

sparksql底層處理后得到的basePaths: Set(new Path(“/spark/dw/test.db/test_partition/dt=20200101”),new Path(“/spark/dw/test.db/test_partition/dt=20200102”))【偽代碼】

leafDirs: Seq(new Path(“/spark/dw/test.db/test_partition/dt=20200101”),new Path(“/spark/dw/test.db/test_partition/dt=20200102”))【偽代碼】

這兩種情況導致原始碼if(basePaths.contains(currentPath))為true,還沒有決議磁區就重置變數finished為true跳出回圈,因此最終生成的結果也就沒有磁區欄位:
在這里插入圖片描述

解決方案

  • 1.在Spark SQL加載Hive表資料路徑時,指定引數basePath,如
    sparkSession.read.option(“basePath”,"/spark/dw/test.db/test_partition")

  • 2.主要重寫basePaths方法和parsePartition方法中的處理邏輯,同時需要修改其他涉及的代碼,由于涉及需要改寫的代碼比較多,可以封裝成工具

8.說說你對Spark SQL 小檔案問題處理的理解

在生產中,無論是通過SQL陳述句或者Scala/Java等代碼的方式使用Spark SQL處理資料,在Spark SQL寫資料時,往往會遇到生成的小檔案過多的問題,而管理這些大量的小檔案,是一件非常頭疼的事情,

大量的小檔案會影響Hadoop集群管理或者Spark在處理資料時的穩定性:

1.Spark SQL寫Hive或者直接寫入HDFS,過多的小檔案會對NameNode記憶體管理等產生巨大的壓力,會影響整個集群的穩定運行

2.容易導致task數過多,如果超過引數spark.driver.maxResultSize的配置(默認1g),會拋出類似如下的例外,影響任務的處理

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

當然可以通過調大spark.driver.maxResultSize的默認配置來解決問題,但如果不能從源頭上解決小檔案問題,以后還可能遇到類似的問題,
此外,Spark在處理任務時,一個磁區分配一個task進行處理,多個磁區并行處理,雖然并行處理能夠提高處理效率,但不是意味著task數越多越好,如果資料量不大,過多的task運行反而會影響效率,
最后,Spark中一個task處理一個磁區從而也會影響最終生成的檔案數,

在數倉建設中,產生小檔案過多的原因有很多種,比如:

1.流式處理中,每個批次的處理執行保存操作也會產生很多小檔案
2.為了解決資料更新問題,同一份資料保存了不同的幾個狀態,也容易導致檔案數過多

那么如何解決這種小檔案的問題呢?

  • 1.通過repartition或coalesce算子控制最后的DataSet的磁區數
    注意repartition和coalesce的區別

  • 2.將Hive風格的Coalesce and Repartition Hint 應用到Spark SQL
    需要注意這種方式對Spark的版本有要求,建議在Spark2.4.X及以上版本使用,示例:

INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
  • 3.小檔案定期合并可以定時通過異步的方式針對Hive磁區表的每一個磁區中的小檔案進行合并操作

上述只是給出3種常見的解決辦法,并且要結合實際用到的技術和場景去具體處理,比如對于HDFS小檔案過多,也可以通過生成HAR 檔案或者Sequence File來解決,

9.SparkSQL讀寫Hive metastore Parquet遇到過什么問題嗎?

Spark SQL為了更好的性能,在讀寫Hive metastore parquet格式的表時,會默認使用自己的Parquet SerDe,而不是采用Hive的SerDe進行序列化和反序列化,該行為可以通過配置引數spark.sql.hive.convertMetastoreParquet進行控制,默認true,

這里從表schema的處理角度而言,就必須注意Hive和Parquet兼容性,主要有兩個區別:
1.Hive是大小寫敏感的,但Parquet相反
2.Hive會將所有列視為nullable,但是nullability在parquet里有獨特的意義

由于上面的原因,在將Hive metastore parquet轉化為Spark SQL parquet時,需要兼容處理一下Hive和Parquet的schema,即需要對二者的結構進行一致化,主要處理規則是:

1.有相同名字的欄位必須要有相同的資料型別,忽略nullability,兼容處理的欄位應該保持Parquet側的資料型別,這樣就可以處理到nullability型別了(空值問題)
2.兼容處理的schema應只包含在Hive元資料里的schema資訊,主要體現在以下兩個方面:
(1)只出現在Parquet schema的欄位會被忽略
(2)只出現在Hive元資料里的欄位將會被視為nullable,并處理到兼容后的schema中

關于schema(或者說元資料metastore),Spark SQL在處理Parquet表時,同樣為了更好的性能,會快取Parquet的元資料資訊,此時,如果直接通過Hive或者其他工具對該Parquet表進行修改導致了元資料的變化,那么Spark SQL快取的元資料并不能同步更新,此時需要手動重繪Spark SQL快取的元資料,來確保元資料的一致性,方式如下:

// 第一種方式應用的比較多
1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")
2. sparkSession.catalog.refreshByPath(s"${path}")

10.說說Spark SQL如何選擇join策略

在了解join策略選擇之前,首先看幾個先決條件:

  • 1. build table的選擇

Hash Join的第一步就是根據兩表之中較小的那一個構建哈希表,這個小表就叫做build table,大表則稱為probe table,因為需要拿小表形成的哈希表來"探測"它,原始碼如下:

/* 左表作為build table的條件,join型別需滿足:
   1. InnerLike:實作目前包括inner join和cross join
   2. RightOuter:right outer join
*/      
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}

/* 右表作為build table的條件,join型別需滿足(第1種是在業務開發中寫的SQL主要適配的):
   1. InnerLike、LeftOuter(left outer join)、LeftSemi(left semi join)、LeftAnti(left anti join)
   2. ExistenceJoin:only used in the end of optimizer and physical plans, we will not generate SQL for this join type
*/
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false
}
  • 2. 滿足什么條件的表才能被廣播

如果一個表的大小小于或等于引數spark.sql.autoBroadcastJoinThreshold(默認10M)配置的值,那么就可以廣播該表,原始碼如下:

private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
  : Boolean = {
  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
  val buildRight = canBuildRight(joinType) && canBroadcast(right)
  buildLeft || buildRight
}

private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
  : BuildSide = {
  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
  val buildRight = canBuildRight(joinType) && canBroadcast(right)
  
  // 最侄訓呼叫broadcastSide
  broadcastSide(buildLeft, buildRight, left, right)
}

除了通過上述表的大小滿足一定條件之外,我們也可以通過直接在Spark SQL中顯示使用hint方式(/*+ BROADCAST(small_table) */),直接指定要廣播的表,原始碼如下:

private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
  : Boolean = {
  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
  buildLeft || buildRight
}

private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
  : BuildSide = {
  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
  
  // 最侄訓呼叫broadcastSide
  broadcastSide(buildLeft, buildRight, left, right)
}

無論是通過表大小進行廣播還是根據是否指定hint進行表廣播,最終都會呼叫broadcastSide,來決定應該廣播哪個表:

private def broadcastSide(
     canBuildLeft: Boolean,
     canBuildRight: Boolean,
     left: LogicalPlan,
     right: LogicalPlan): BuildSide = {

   def smallerSide =
     if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft

  if (canBuildRight && canBuildLeft) {
    // 如果左表和右表都能作為build table,則將根據表的統計資訊,確定physical size較小的表作為build table(即使兩個表都被指定了hint)
    smallerSide
  } else if (canBuildRight) {
     // 上述條件不滿足,優先判斷右表是否滿足build條件,滿足則廣播右表,否則,接著判斷左表是否滿足build條件
    BuildRight
  } else if (canBuildLeft) {
    BuildLeft
  } else {
    // 如果左表和右表都不能作為build table,則將根據表的統計資訊,確定physical size較小的表作為build table,目前主要用于broadcast nested loop join
    smallerSide
  }
}

從上述原始碼可知,即使用戶指定了廣播hint,實際執行時,不一定按照hint的表進行廣播,

  • 3. 是否可構造本地HashMap

應用于Shuffle Hash Join中,原始碼如下:

// 邏輯計劃的單個磁區足夠小到構建一個hash表
// 注意:要求磁區數是固定的,如果磁區數是動態的,還需滿足其他條件
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  // 邏輯計劃的physical size小于spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默認200)時,即可構造本地HashMap
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

SparkSQL目前主要實作了3種join:Broadcast Hash Join、ShuffledHashJoin、Sort Merge Join,那么Catalyst在處理SQL陳述句時,是依據什么規則進行join策略選擇的呢?

1. Broadcast Hash Join

主要根據hint和size進行判斷是否滿足條件,

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
   if canBroadcastByHints(joinType, left, right) =>
   val buildSide = broadcastSideByHints(joinType, left, right)
   Seq(joins.BroadcastHashJoinExec(
     leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
   if canBroadcastBySizes(joinType, left, right) =>
   val buildSide = broadcastSideBySizes(joinType, left, right)
   Seq(joins.BroadcastHashJoinExec(
     leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

2. Shuffle Hash Join

選擇Shuffle Hash Join需要同時滿足以下條件:

  1. spark.sql.join.preferSortMergeJoin為false,即Shuffle Hash Join優先于Sort Merge Join
  2. 右表或左表是否能夠作為build table
  3. 是否能構建本地HashMap
  4. 以右表為例,它的邏輯計劃大小要遠小于左表大小(默認3倍)

上述條件優先檢查右表,

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
      && muchSmaller(right, left) ||
      !RowOrdering.isOrderable(leftKeys) =>
   Seq(joins.ShuffledHashJoinExec(
     leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
     if !conf.preferSortMergeJoin && canBuildLeft(joinType) && uildLocalHashMap(left)
       && muchSmaller(left, right) ||
       !RowOrdering.isOrderable(leftKeys) =>
    Seq(joins.ShuffledHashJoinExec(
      leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
      
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}

如果不滿足上述條件,但是如果參與join的表的key無法被排序,即無法使用Sort Merge Join,最終也會選擇Shuffle Hash Join,

!RowOrdering.isOrderable(leftKeys)

def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))

3. Sort Merge Join

如果上面兩種join策略(Broadcast Hash Join和Shuffle Hash Join)都不符合條件,并且參與join的key是可排序的,就會選擇Sort Merge Join,

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
   if RowOrdering.isOrderable(leftKeys) =>
   joins.SortMergeJoinExec(
     leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

4. Without joining keys

Broadcast Hash Join、Shuffle Hash Join和Sort Merge Join都屬于經典的ExtractEquiJoinKeys(等值連接條件),

對于非ExtractEquiJoinKeys,則會優先檢查表是否可以被廣播(hint或者size),如果可以,則會使用BroadcastNestedLoopJoin(簡稱BNLJ),熟悉Nested Loop Join則不難理解BNLJ,主要卻別在于BNLJ加上了廣播表,

原始碼如下:

// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastBySizes(joinType, left, right) =>
  val buildSide = broadcastSideBySizes(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

如果表不能被廣播,又細分為兩種情況:

  1. 若join型別InnerLike(關于InnerLike上面已有介紹)對量表直接進行笛卡爾積處理若
  2. 上述情況都不滿足,最終方案是選擇兩個表中physical size較小的表進行廣播,join策略仍為BNLJ

原始碼如下:

// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
  joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

case logical.Join(left, right, joinType, condition) =>
  val buildSide = broadcastSide(
    left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
  // This join could be very slow or OOM
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

11.講講Spark SQL中Not in Subquery為何低效以及如何規避

首先看個Not in Subquery的SQL:

// test_partition1 和 test_partition2為Hive外部磁區表
select * from test_partition1 t1 where t1.id not in (select id from test_partition2);

對應的完整的邏輯計劃和物理計劃為:

== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT 't1.id IN (list#3 [])
   :  +- 'Project ['id]
   :     +- 'UnresolvedRelation `test_partition2`
   +- 'SubqueryAlias `t1`
      +- 'UnresolvedRelation `test_partition1`

== Analyzed Logical Plan ==
id: string, name: string, dt: string
Project [id#4, name#5, dt#6]
+- Filter NOT id#4 IN (list#3 [])
   :  +- Project [id#7]
   :     +- SubqueryAlias `default`.`test_partition2`
   :        +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
   +- SubqueryAlias `t1`
      +- SubqueryAlias `default`.`test_partition1`
         +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]

== Optimized Logical Plan ==
Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- Project [id#7]
   +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- BroadcastExchange IdentityBroadcastMode
   +- Scan hive default.test_partition2 [id#7], HiveTableRelation `default

通過上述邏輯計劃和物理計劃可以看出,Spark SQL在對not in subquery處理,從邏輯計劃轉換為物理計劃時,會最終選擇BroadcastNestedLoopJoin(對應到Spark原始碼中BroadcastNestedLoopJoinExec.scala)策略,

提起BroadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到應用,比如mysql,它的作業方式是回圈從一張表(outer table)中讀取資料,然后訪問另一張表(inner table,通常有索引),將outer表中的每一條資料與inner表中的資料進行join,類似一個嵌套的回圈并且在回圈的程序中進行資料的比對校驗是否滿足一定條件,

對于被連接的資料集較小的情況下,Nested Loop Join是個較好的選擇,但是當資料集非常大時,從它的執行原理可知,效率會很低甚至可能影響整個服務的穩定性,

而Spark SQL中的BroadcastNestedLoopJoin就類似于Nested Loop Join,只不過加上了廣播表(build table)而已,

BroadcastNestedLoopJoin是一個低效的物理執行計劃,內部實作將子查詢(select id from test_partition2)進行廣播,然后test_partition1每一條記錄通過loop遍歷廣播的資料去匹配是否滿足一定條件,

private def leftExistenceJoin(
   // 廣播的資料
    relation: Broadcast[Array[InternalRow]],
    exists: Boolean): RDD[InternalRow] = {
  assert(buildSide == BuildRight)
  
/* streamed對應物理計劃中:
  Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6] 
 */
  streamed.execute().mapPartitionsInternal { streamedIter =>
    val buildRows = relation.value
    val joinedRow = new JoinedRow

   // 條件是否定義,此處為Some(((id#4 = id#7) || isnull((id#4 = id#7))))
    if (condition.isDefined) {
      streamedIter.filter(l => 
        // exists主要是為了根據joinType來進一步條件判斷資料的回傳與否,此處joinType為LeftAnti
        buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
      )
      
      // else
    } else if (buildRows.nonEmpty == exists) {
      streamedIter
    } else {
      Iterator.empty
    }
  }
}

由于BroadcastNestedLoopJoin的低效率執行,可能導致長時間占用executor資源,影響集群性能,同時,因為子查詢的結果集要進行廣播,如果資料量特別大,對driver端也是一個嚴峻的考驗,極有可能帶來OOM的風險,因此,在實際生產中,要盡可能利用其他效率相對高的SQL來避免使用Not in Subquery,

雖然通過改寫Not in Subquery的SQL,進行低效率的SQL到高效率的SQL過渡,能夠避免上面所說的問題,但是這往往建立在我們發現任務執行慢甚至失敗,然后排查任務中的SQL,發現"問題"SQL的前提下,那么如何在任務執行前,就"檢查"出這樣的SQL,從而進行提前預警呢?

這里給出一個思路,就是決議Spark SQL計劃,根據Spark SQL的join策略匹配條件等,來判斷任務中是否使用了低效的Not in Subquery進行預警,然后通知業務方進行修改,同時,我們在實際完成資料的ETL處理等分析時,也要事前避免類似的低性能SQL,

12.說說SparkSQL中產生笛卡爾積的幾種典型場景以及處理策略

Spark SQL幾種產生笛卡爾積的典型場景

首先來看一下在Spark SQL中產生笛卡爾積的幾種典型SQL:

  1. join陳述句中不指定on條件
select * from test_partition1 join test_partition2;
  1. join陳述句中指定不等值連接
select * from test_partition1 t1 inner join test_partition2 t2 on t1.name <> t2.name;
  1. join陳述句on中用or指定連接條件
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id or t1.name = t2.name;
  1. join陳述句on中用||指定連接條件
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id || t1.name = t2.name;

除了上述舉的幾個典型例子,實際業務開發中產生笛卡爾積的原因多種多樣,

同時需要注意,在一些SQL中即使滿足了上述4種規則之一也不一定產生笛卡爾積,比如,對于join陳述句中指定不等值連接條件的下述SQL不會產生笛卡爾積:

--在Spark SQL內部優化程序中針對join策略的選擇,最侄訓通過SortMergeJoin進行處理,
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.i

此外,對于直接在SQL中使用cross join的方式,也不一定產生笛卡爾積,比如下述SQL:

-- Spark SQL內部優化程序中選擇了SortMergeJoin方式進行處理
select * from test_partition1 t1 cross  join test_partition2 t2 on t1.id = t2.id;

但是如果cross join沒有指定on條件同樣會產生笛卡爾積,
那么如何判斷一個SQL是否產生了笛卡爾積呢?

Spark SQL是否產生了笛卡爾積

以join陳述句不指定on條件產生笛卡爾積的SQL為例:

-- test_partition1和test_partition2是Hive磁區表
select * from test_partition1 join test_partition2;

通過Spark UI上SQL一欄查看上述SQL執行圖,如下:

可以看出,因為該join陳述句中沒有指定on連接查詢條件,導致了CartesianProduct即笛卡爾積,

再來看一下該join陳述句的邏輯計劃和物理計劃:

在這里插入圖片描述
可以看出,因為該join陳述句中沒有指定on連接查詢條件,導致了CartesianProduct即笛卡爾積,

再來看一下該join陳述句的邏輯計劃和物理計劃:

== Parsed Logical Plan ==
'GlobalLimit 1000
+- 'LocalLimit 1000
   +- 'Project [*]
      +- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
id: string, name: string, dt: string, id: string, name: string, dt: string
GlobalLimit 1000
+- LocalLimit 1000
   +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
      +- SubqueryAlias `t`
         +- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
            +- Join Inner
               :- SubqueryAlias `default`.`test_partition1`
               :  +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
               +- SubqueryAlias `default`.`test_partition2`
                  +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]

== Optimized Logical Plan ==
GlobalLimit 1000
+- LocalLimit 1000
   +- Join Inner
      :- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
      +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]

== Physical Plan ==
CollectLimit 1000
+- CartesianProduct
   :- Scan hive default.test_partition1 [id#84, name#85, dt#86], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
   +- Scan hive default.test_partition2 [id#87, name#88, dt#89], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]

通過邏輯計劃到物理計劃,以及最終的物理計劃選擇CartesianProduct,可以分析得出該SQL最終確實產生了笛卡爾積,

Spark SQL中產生笛卡爾積的處理策略

Spark SQL中主要有ExtractEquiJoinKeys(Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join,這3種是我們比較熟知的Spark SQL join)和Without joining keys(CartesianProduct、BroadcastNestedLoopJoin)join策略,

那么,如何判斷SQL是否產生了笛卡爾積就迎刃而解,

  1. 在利用Spark SQL執行SQL任務時,通過查看SQL的執行圖來分析是否產生了笛卡爾積,如果產生笛卡爾積,則將任務殺死,進行任務優化避免笛卡爾積,【不推薦,用戶需要到Spark UI上查看執行圖,并且需要對Spark UI界面功能等要了解,需要一定的專業性,(注意:這里之所以這樣說,是因為Spark SQL是計算引擎,面向的用戶角色不同,用戶不一定對Spark本身了解透徹,但熟悉SQL,對于做平臺的小伙伴兒,想必深有感觸)】
  2. 分析Spark SQL的邏輯計劃和物理計劃,通程序式決議計劃推斷SQL最終是否選擇了笛卡爾積執行策略,如果是,及時提示風險,具體可以參考Spark SQL join策略選擇的原始碼:
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// --- BroadcastHashJoin --------------------------------------------------------------------
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// --- ShuffledHashJoin ---------------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
           && muchSmaller(left, right) ||
           !RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
// --- SortMergeJoin ------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
// --- Without joining keys ------------------------------------------------------------
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
        val buildSide = broadcastSideByHints(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
        joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
        val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
case _ => Nil
    }

13.具體講講Spark SQL/Hive中的一些實用函式

字串函式

1. concat
對字串進行拼接:concat(str1, str2, …, strN) ,引數:str1、str2…是要進行拼接的字串,

-- return the concatenation of str1、str2、..., strN
-- SparkSQL
select concat('Spark', 'SQL');

2. concat_ws
在拼接的字串中間添加某種分隔符:concat_ws(sep, [str | array(str)]+),
引數1:分隔符,如 - ;引數2:要拼接的字串(可多個)

-- return the concatenation of the strings separated by sep
-- Spark-SQL
select concat_ws("-", "Spark", "SQL");

3. encode
設定編碼格式:encode(str, charset),
引數1:要進行編碼的字串 ;引數2:使用的編碼格式,如UTF-8

-- encode the first argument using the second argument character set
select encode("HIVE", "UTF-8");

4. decode
轉碼:decode(bin, charset),
引數1:進行轉碼的binary ;引數2:使用的轉碼格式,如UTF-8

-- decode the first argument using the second argument character set
select decode(encode("HIVE", "UTF-8"), "UTF-8");

5. format_string / printf
格式化字串:format_string(strfmt, obj, …)

-- returns a formatted string from printf-style format strings
select format_string("Spark SQL %d %s", 100, "days");

6. initcap / lower / upper
initcap:將每個單詞的首字母轉為大寫,其他字母小寫,單詞之間以空白分隔,
upper:全部轉為大寫,
lower:全部轉為小寫,

-- Spark Sql
select initcap("spaRk sql");

-- SPARK SQL
select upper("sPark sql");

-- spark sql
select lower("Spark Sql");

7. length
回傳字串的長度,

-- 回傳4
select length("Hive");

8. lpad / rpad
回傳固定長度的字串,如果長度不夠,用某種字符進行補全,
lpad(str, len, pad):左補全
rpad(str, len, pad):右補全
注意:如果引數str的長度大于引數len,則回傳的結果長度會被截取為長度為len的字串

-- vehi
select lpad("hi", 4, "ve");

-- hive
select rpad("hi", 4, "ve");

-- spar
select lpad("spark", 4, "ve");

9. trim / ltrim / rtrim
去除空格或者某種字符,
trim(str) / trim(trimStr, str):首尾去除,
ltrim(str) / ltrim(trimStr, str):左去除,
rtrim(str) / rtrim(trimStr, str):右去除,

-- hive
select trim(" hive ");

-- arkSQLS
SELECT ltrim("Sp", "SSparkSQLS") as tmp;

10. regexp_extract
正則提取某些字串

-- 2000
select regexp_extract("1000-2000", "(\\d+)-(\\d+)", 2);

11. regexp_replace
正則替換

-- r-r
select regexp_replace("100-200", "(\\d+)", "r");

12. repeat
repeat(str, n):復制給定的字串n次

-- aa
select repeat("a", 2);

13. instr / locate
回傳截取字串的位置,如果匹配的字串不存在,則回傳0

-- returns the (1-based) index of the first occurrence of substr in str.

-- 6
select instr("SparkSQL", "SQL");

-- 0
select locate("A", "fruit");

14. space
在字串前面加n個空格

select concat(space(2), "A");

15. split
split(str, regex):以某字符拆分字串 split(str, regex)

-- ["one","two"]
select split("one two", " ");

16. substr / substring_index

-- k SQL
select substr("Spark SQL", 5);

-- 從后面開始截取,回傳SQL
select substr("Spark SQL", -3);

-- k
select substr("Spark SQL", 5, 1);

-- org.apache,注意:如果引數3為負值,則從右邊取值
select substring_index("org.apache.spark", ".", 2);

17. translate
替換某些字符為指定字符

-- The translate will happen when any character in the string matches the character in the `matchingString`
-- A1B2C3
select translate("AaBbCc", "abc", "123");

JSON函式

  1. get_json_object
-- v2
select get_json_object('{"k1": "v1", "k2": "v2"}', '$.k2');
  1. from_json
select tmp.k from  (
select from_json('{"k": "fruit", "v": "apple"}','k STRING, v STRING', map("","")) as tmp
);
  1. to_json
-- 可以把所有欄位轉化為json字串,然后表示成value欄位
select to_json(struct(*)) AS value;

時間函式

  1. current_date / current_timestamp
    獲取當前時間
select current_date;

select current_timestamp;
  1. 從日期時間中提取欄位/格式化時間
    1)year、month、day、dayofmonth、hour、minute、second
-- 20
select day("2020-12-20");

2)dayofweek(1 = Sunday, 2 = Monday, …, 7 = Saturday)、dayofye

-- 7
select dayofweek("2020-12-12");

3)weekofyear(date)

/**
   * Extracts the week number as an integer from a given date/timestamp/string.
   *
   * A week is considered to start on a Monday and week 1 is the first week with more than 3 days,
   * as defined by ISO 8601
   *
   * @return An integer, or null if the input was a string that could not be cast to a date
   * @group datetime_funcs
   * @since 1.5.0
   */
  def weekofyear(e: Column): Column = withExpr { WeekOfYear(e.expr) }

-- 50
select weekofyear("2020-12-12");

4)trunc
截取某部分的日期,其他部分默認為01,第二個引數: YEAR、YYYY、YY、MON、MONTH、MM

-- 2020-01-01
select trunc("2020-12-12", "YEAR");

-- 2020-12-01
select trunc("2020-12-12", "MM");

5)date_trunc
引數:YEAR、YYYY、YY、MON、MONTH、MM、DAY、DD、HOUR、MINUTE、SECOND、WEEK、QUARTER

-- 2012-12-12 09:00:00
select date_trunc("HOUR" ,"2012-12-12T09:32:05.359");

6)date_format
按照某種格式格式化時間

-- 2020-12-12
select date_format("2020-12-12 12:12:12", "yyyy-MM-dd");

3. 日期時間轉換

1)unix_timestamp
回傳當前時間的unix時間戳,

select unix_timestamp();

-- 1609257600
select unix_timestamp("2020-12-30", "yyyy-MM-dd");

2)from_unixtime
將unix epoch(1970-01-01 00:00:00 UTC)中的秒數轉換為以給定格式表示當前系統時區中該時刻的時間戳的字串,

select from_unixtime(1609257600, "yyyy-MM-dd HH:mm:ss");

3)to_unix_timestamp
將時間轉化為時間戳,

-- 1609257600
select to_unix_timestamp("2020-12-30", "yyyy-MM-dd");

4)to_date / date
將時間字串轉化為date,

-- 2020-12-30
select to_date("2020-12-30 12:30:00");
select date("2020-12-30");

5)to_timestamp
將時間字串轉化為timestamp,

select to_timestamp("2020-12-30 12:30:00");

6)quarter
從給定的日期/時間戳/字串中提取季度,

-- 4
select quarter("2020-12-30");

4. 日期、時間計算

1)months_between(end, start)
回傳兩個日期之間的月數,引數1為截止時間,引數2為開始時間

-- 3.94959677
select months_between("1997-02-28 10:30:00", "1996-10-30");

2)add_months
回傳某日期后n個月后的日期,

-- 2020-12-28
select add_months("2020-11-28", 1);

3)last_day(date)
回傳某個時間的當月最后一天

-- 2020-12-31
select last_day("2020-12-01");

4)next_day(start_date, day_of_week)
回傳某時間后the first date基于specified day of the week,
引數1:開始時間,
引數2:Mon、Tue、Wed、Thu、Fri、Sat、Sun,

-- 2020-12-07
select next_day("2020-12-01", "Mon");

5)date_add(start_date, num_days)
回傳指定時間增加num_days天后的時間

-- 2020-12-02
select date_add("2020-12-01", 1);

6)datediff(endDate, startDate)
兩個日期相差的天數

-- 3
select datediff("2020-12-01", "2020-11-28");

7)關于UTC時間

-- to_utc_timestamp(timestamp, timezone) - Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1' would yield '2017-07-14 01:40:00.0'.

select to_utc_timestamp("2020-12-01", "Asia/Seoul") ;

-- from_utc_timestamp(timestamp, timezone) - Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' would yield '2017-07-14 03:40:00.0'.

select from_utc_timestamp("2020-12-01", "Asia/Seoul");

常用的開窗函式

開窗函式格式通常滿足:

function_name([argument_list])
OVER (
[PARTITION BY partition_expression,…]
[ORDER BY sort_expression, … [ASC|DESC]])

function_name: 函式名稱,比如SUM()、AVG()

partition_expression:磁區列

sort_expression:排序列

注意:以下舉例涉及的表employee中欄位含義:name(員工姓名)、dept_no(部門編號)、salary(工資)

  1. cume_dist

如果按升序排列,則統計:小于等于當前值的行數/總行數(number of rows ≤ current row)/(total number of rows),如果是降序排列,則統計:大于等于當前值的行數/總行數,用于累計統計,

  1. lead(value_expr[,offset[,default]])

用于統計視窗內往下第n行值,第一個引數為列名,第二個引數為往下第n行(可選,默認為1),第三個引數為默認值(當往下第n行為NULL時候,取默認值,如不指定,則為NULL),

  1. lag(value_expr[,offset[,default]])

與lead相反,用于統計視窗內往上第n行值,第一個引數為列名,第二個引數為往上第n行(可選,默認為1),第三個引數為默認值(當往上第n行為NULL時候,取默認值,如不指定,則為NULL),

  1. first_value

取分組內排序后,截止到當前行,第一個值,

  1. last_value

取分組內排序后,截止到當前行,最后一個值,

  1. rank

對組中的資料進行排名,如果名次相同,則排名也相同,但是下一個名次的排名序號會出現不連續,比如查找具體條件的topN行,RANK() 排序為 (1,2,2,4),

  1. dense_rank

dense_rank函式的功能與rank函式類似,dense_rank函式在生成序號時是連續的,而rank函式生成的序號有可能不連續,當出現名次相同時,則排名序號也相同,而下一個排名的序號與上一個排名序號是連續的,
DENSE_RANK() 排序為 (1,2,2,3),

  1. SUM/AVG/MIN/MAX

資料:

id        time                  pv
1         2015-04-10      1
1         2015-04-11      3
1         2015-04-12      6
1         2015-04-13      3
1         2015-04-14      2
2         2015-05-15      8
2         2015-05-16      6

結果:

SELECT id,
       time,
       pv,
       SUM(pv) OVER(PARTITION BY id ORDER BY time) AS pv1, -- 默認為從起點到當前行
       SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS pv2, --從起點到當前行,結果同pv1
       SUM(pv) OVER(PARTITION BY id) AS pv3,                                --分組內所有行
       SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS pv4,   --當前行+往前3行
       SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) AS pv5,    --當前行+往前3行+往后1行
       SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS pv6   ---當前行+往后所有行  
FROM data;
  1. NTILE

NTILE(n),用于將分組資料按照順序切分成n片,回傳當前切片值,

NTILE不支持ROWS BETWEEN,比如 NTILE(2) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW),

如果切片不均勻,默認增加第一個切片的分布,

  1. ROW_NUMBER

從1開始,按照順序,生成分組內記錄的序列,

比如,按照pv降序排列,生成分組內每天的pv名次
ROW_NUMBER() 的應用場景非常多,比如獲取分組內排序第一的記錄,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/290814.html

標籤:其他

上一篇:利用python爬取汽車之家,需要買車的程式員可以學

下一篇:Hadoop學習筆記(四)HDFS單機版Windows安裝

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more