主頁 >  其他 > PySpark與GraphFrames的安裝與使用

PySpark與GraphFrames的安裝與使用

2022-02-19 08:16:29 其他

PySpark環境搭建

配置hadoop

spark訪問本地檔案并執行運算時,可能會遇到權限問題或是dll錯誤,這是因為spark需要使用到Hadoop的winutils和hadoop.dll,首先我們必須配置好Hadoop相關的環境,可以到github下載:https://github.com/4ttty/winutils

gitcode提供了鏡像加速:https://gitcode.net/mirrors/4ttty/winutils

我選擇了使用這個倉庫提供的最高的Hadoop版本3.0.0將其解壓到D:\deploy\hadoop-3.0.0目錄下,然后配置環境變數:

image-20220217132732434

我們還需要將對應的hadoop.dll復制到系統中,用命令表達就是:

copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32

不過這步需要擁有管理員權限才可以操作,

為了能夠在任何地方使用winutils命令工具,將%HADOOP_HOME%\bin目錄加入環境變數中:

image-20220217133520475

安裝pyspark與Java

首先,我們安裝spark當前(2022-2-17)的最新版本:

pip install pyspark==3.2.1

需要注意pyspark的版本決定了jdk的最高版本,例如假如安裝2.4.5版本的pyspark就只能安裝1.8版本的jdk,否則會報出java.lang.IllegalArgumentException: Unsupported class file major version 55的錯誤,

這是因為pyspark內置了Scala,而Scala是基于jvm的編程語言,Scala與jdk的版本存在兼容性問題,JDK與scala的版本兼容性表:

JDK versionMinimum Scala versionsRecommended Scala versions
172.13.6, 2.12.15 (forthcoming)2.13.6, 2.12.15 (forthcoming)
162.13.5, 2.12.142.13.6, 2.12.14
13, 14, 152.13.2, 2.12.112.13.6, 2.12.14
122.13.1, 2.12.92.13.6, 2.12.14
112.13.0, 2.12.4, 2.11.122.13.6, 2.12.14, 2.11.12
82.13.0, 2.12.0, 2.11.0, 2.10.22.13.6, 2.12.14, 2.11.12, 2.10.7
6, 72.11.0, 2.10.02.11.12, 2.10.7

當前3.2.1版本的pyspark內置的Scala版本為2.12.15,意味著jdk17與其以下的所有版本都支持,

這里我依然選擇安裝jdk8的版本:

image-20220217143447453

測驗一下:

>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

jdk11的詳細安裝教程(jdk1.8在官網只有安裝包,無zip綠化壓縮包):

綠化版Java11的環境配置與Python呼叫Java
https://xxmdmst.blog.csdn.net/article/details/118366166

graphframes安裝

pip安裝當前最新的graphframes:

pip install graphframes==0.6

然后在官網下載graphframes的jar包,

下載地址:https://spark-packages.org/package/graphframes/graphframes

由于安裝的pyspark版本是3.2,所以這里我選擇了這個jar包:

image-20220217144829403

然后將該jar包放入pyspark安裝目錄的jars目錄下:

image-20220217145105414

pyspark安裝位置可以通過pip查看:

C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:

使用方法

學習pyspark的最佳路徑是官網:https://spark.apache.org/docs/latest/quick-start.html

在下面的網頁,官方提供了在線jupyter:

https://spark.apache.org/docs/latest/api/python/getting_started/index.html

image-20220218174237283

啟動spark并讀取資料

本地模式啟動spark:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
spark

image-20220217153008268

SparkSession輸出的內容中包含了spark的web頁面,新標簽頁打開頁面后大致效果如上,

點擊Environment選項卡可以查看當前環境中的變數:

image-20220217153531616

啟動hive支持

找到pyspark的安裝位置,例如我的電腦在D:\Miniconda3\Lib\site-packages\pyspark

手動創建conf目錄并將hive-site.xml組態檔復制到其中,如果hive使用了MySQL作為原資料庫,則還需要將MySQL對應的驅動jar包放入spark的jars目錄下,

創建spark會話物件時可通過enableHiveSupport()開啟hive支持:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
spark

spark訪問hive自己創建的表有可能會出現如下的權限報錯:

Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------

是因為當前用戶不具備對\tmp\hive的操作權限:

>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

把\tmp\hive目錄的權限改為777即可順利訪問:

>winutils chmod 777 \tmp\hive

>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

Spark的DataFrame與RDD

從spark2.x開始將RDD和DataFrame的API統一抽象成dataset,DataFrame就是Dataset[Row],RDD則是Dataset.rdd,可以將DataFrame理解為包含結構化資訊的RDD,

將含row的RDD轉換為DataFrame只需要呼叫toDF方法或SparkSession的createDataFrame方法即可,也可以傳入schema覆寫型別或名稱設定,

DataFrame的基礎api

DataFrame默認支持DSL風格語法,例如:

//查看DataFrame中的內容
df.show()

//查看DataFrame部分列中的內容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()

//列印DataFrame的Schema資訊
df.printSchema()

//過濾age大于等于 21 的
df.filter(df['age'] > 21).show()

//按年齡進行分組并統計相同年齡的人數
personDF.groupBy("age").count().show()

將DataFrame注冊成表或視圖之后即可進行純SQL操作:

df.createOrReplaceTempView("people")
//df.createTempView("t_person")

//查詢年齡最大的前兩名
spark.sql("select * from t_person order by age desc limit 2").show()
//顯示表的Schema資訊
spark.sql("desc t_person").show()

Pyspark可以直接很方便的注冊udf并直接使用:

strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())

執行結果:

[Row(length='4')]
[Row(length='3')]

RDD的簡介

DataFrame的本質是對RDD的包裝,可以理解為DataFrame=RDD[Row]+schema,

RDD(A Resilient Distributed Dataset)叫做彈性可伸縮分布式資料集,是Spark中最基本的資料抽象,它代表一個不可變、自動容錯、可伸縮性、可磁區、里面的元素可并行計算的集合,

在每一個RDD內部具有五大屬性:

  1. 具有一系列的磁區
  2. 一個計算函式操作于每一個切片
  3. 具有一個對其他RDD的依賴串列
  4. 對于 key-value RDDs具有一個Partitioner磁區器
  5. 存盤每一個切片最佳計算位置

一組分片(Partition),即資料集的基本組成單位,對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度,用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值,默認值就是程式所分配到的CPU Core的數目,

**一個計算每個磁區的函式,**Spark中RDD的計算是以分片為單位的,每個RDD都會實作compute函式以達到這個目的,compute函式會對迭代器進行復合,不需要保存每次計算的結果,

**RDD之間的依賴關系,**RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系,在部分磁區資料丟失時,Spark可以通過這個依賴關系重新計算丟失的磁區資料,而不是對RDD的所有磁區進行重新計算,

**一個Partitioner,即RDD的分片函式,**當前Spark中實作了兩種型別的分片函式,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner,只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None,Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量,

**一個串列,存盤存取每個Partition的優先位置(preferred location),**對于一個HDFS檔案來說,這個串列保存的就是每個Partition所在的塊的位置,按照“移動資料不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理資料塊的存盤位置,

RDD的API概覽

RDD包含Transformation APIAction API,Transformation API都是延遲加載的只是記住這些應用到基礎資料集上的轉換動作,只有當執行Action API時這些轉換才會真正運行,

Transformation API產生的兩類RDD最重要,分別是MapPartitionsRDDShuffledRDD

產生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange ,其中用的最多的是mapflatMap,但任何產生MapPartitionsRDD的算子都可以直接使用mapPartitionsmapPartitionsWithIndex實作,

產生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions,

combineByKey到groupByKey 底層均是呼叫combineByKeyWithClassTag方法:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
       ,defaultPartitioner(self))
}

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

三個重要引數的含義:

  • createCombiner:根據每個磁區的第一個元素操作產生一個初始值
  • mergeValue:對每個磁區內部的元素進行迭代合并
  • mergeCombiners:對所有磁區的合并結果進行合并

groupByKey的partitioner未指定時會傳入默認的defaultPartitioner,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect

res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))

aggregateByKey:每個磁區使用zeroValue作為初始值,迭代每一個元素用seqOp進行合并,對所有磁區的結果用combOp進行合并,例如:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

reduceByKey :每個磁區迭代每一個元素用func進行合并,對所有磁區的結果用func再進行合并,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect

Action API有:

動作含義
reduce(func)通過func函式聚集RDD中的所有元素,這個功能必須是課交換且可并聯的
collect()在驅動程式中,以陣列的形式回傳資料集的所有元素
count()回傳RDD的元素個數
first()回傳RDD的第一個元素(類似于take(1))
take(n)回傳一個由資料集的前n個元素組成的陣列
takeSample(withReplacement*,*num, [seed])回傳一個陣列,該陣列由從資料集中隨機采樣的num個元素組成,可以選擇是否用亂數替換不足的部分,seed用于指定亂數生成器種子
takeOrdered(n, [ordering])排序并取前N個元素
saveAsTextFile(path)將資料集的元素以textfile的形式保存到HDFS檔案系統或者其他支持的檔案系統,對于每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文本
saveAsSequenceFile(path)將資料集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的檔案系統,
saveAsObjectFile(path)將RDD中的元素用NullWritable作為key,實際元素作為value保存為sequencefile格式
countByKey()針對(K,V)型別的RDD,回傳一個(K,Int)的map,表示每一個key對應的元素個數,
foreach(func)在資料集的每一個元素上,運行函式func進行更新,

spark模擬實作mapreduce版wordcount:

object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")
    
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer
    
    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }
    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>{
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
  }
}

各類RDD

image-20220218181635289

  • ShuffledRDD :表示需要走Shuffle程序的網路傳輸

  • CoalescedRDD :用于將一臺機器的多個磁區合并成一個磁區

  • CartesianRDD :對兩個RDD的所有元素產生笛卡爾積

  • MapPartitionsRDD :用于對每個磁區的資料進行特定的處理

  • CoGroupedRDD :用于將2~4個rdd,按照key進行連接聚合

  • SubtractedRDD :用于對2個RDD求差集

  • UnionRDDPartitionerAwareUnionRDD :用于對2個RDD求并集

  • ZippedPartitionsRDD2:zip拉鏈操作產生的RDD

  • ZippedWithIndexRDD:給每一個元素標記一個自增編號

  • PartitionwiseSampledRDD:用于對rdd的元素按照指定的百分比進行隨機采樣

當我們需要給Datafream添加自增列時,可以使用zipWithUniqueId方法:

from pyspark.sql.types import StructType, LongType

schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()

API用法詳情可參考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

cache&checkpoint

RDD通過persist方法或cache方法可以將前面的計算結果快取,但是并不是這兩個方法被呼叫時立即快取,而是觸發后面的action時,該RDD將會被快取在計算節點的記憶體中,并供后面重用,

rdd.persist()

checkpoint的原始碼注釋可以看到:

  • 標記該RDD作為檢查點,
  • 它將被保存在通過SparkContext#setCheckpointDir方法設定的檢查點目錄中
  • 它所參考的所有父RDD參考將全部被移除
  • 這個方法在這個RDD上必須在所有job執行前運行,
  • 強烈建議將這個RDD快取在記憶體中,否則這個保存檔案的計算任務將重新計算,

從中我們得知,在執行checkpoint方法時,最好同時,將該RDD快取起來,否則,checkpoint也會產生一個計算任務,

sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()

graphframes 的用法

GraphFrame是將Spark中的Graph演算法統一到DataFrame介面的Graph操作介面,為Scala、Java和Python提供了統一的圖處理API,

Graphframes是開源專案,原始碼工程如下:https://github.com/graphframes/graphframes

可以參考:

  • 官網:https://graphframes.github.io/graphframes/docs/_site/index.html
  • GraphFrames用戶指南-Python — Databricks檔案:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html

在GraphFrames中圖的頂點(Vertex)和邊(edge)都是以DataFrame形式存盤的:

  • 頂點DataFrame:必須包含列名為“id”的列,用于作為頂點的唯一標識
  • 邊DataFrame:必須包含列名為“src”和“dst”的列,根據唯一標識id標識關系

創建圖的示例:

from graphframes import GraphFrame

vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])

edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])

# 生成圖
g = GraphFrame(vertices, edges)

GraphFrame提供三種視圖:

print("頂點表視圖:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("邊表視圖:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元組視圖:")
graph.triplets.show()

獲取頂點的度、入度和出度:

# 頂點的度
graph.degrees.show()
# 頂點的入度
graph.inDegrees.show()
# 頂點的出度
graph.outDegrees.show()

Motif finding (模式發現)

示例:

# 多個路徑條件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜索的結果上進行過濾
motif.filter("b.age > 30")
# 不需要回傳路徑中的元素時,可以使用匿名頂點和邊
motif = graph.find("(start)-[]->()")
# 設定路徑不存在的條件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")

假設我們要想給用戶推薦關注的人,可以找出這樣的關系:A關注B,B關注C,但是A未關注C,找出這樣的關系就可以把C推薦給A:

# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 選擇需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()

結果:

+---+---+
|  A|  C|
+---+---+
|  e|  c|
|  e|  a|
|  d|  b|
|  a|  d|
|  f|  b|
|  d|  e|
|  a|  f|
|  a|  c|
+---+---+

Motif在查找路徑程序的程序中,還可以沿著路徑攜帶狀態,例如我們想要找出關系鏈有4個頂點,而且其中3條邊全部都是"friend"關系:

from pyspark.sql.functions import col, lit, when
from functools import reduce

chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")


def sumFriends(cnt, relationship):
    "定義下一個頂點更新狀態的條件:如果關系為friend則cnt+1"
    return when(relationship == "friend", cnt+1).otherwise(cnt)


# 將更新方法應用到整個鏈的,鏈上每有一個關系是 friend 就加一,鏈上共三個關系,
condition = reduce(lambda cnt, e: sumFriends(
    cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))

chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()

結果:

+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
|              a|            ab|              b|            bc|              c|            cd|              d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+

Subgraphs 子圖

可以直接過濾其頂點或邊,dropIsolatedVertices()方法用于洗掉孤立沒有連接的點:

graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

還可以基于模式發現獲取到的邊創建Subgraphs :

paths = graph.find("(a)-[e]->(b)")\
    .filter("e.relationship = 'follow'")\
    .filter("a.age < b.age")
# 抽取邊資訊e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 創建Subgraphs
g2 = GraphFrame(graph.vertices, e2)

GraphFrames支持的GraphX演算法

  • PageRank:查找圖中的重要頂點,

  • 廣度優先搜索(BFS):查找從一組頂點到另一組頂點的最短路徑

  • 連通組件(ConnectedComponents):為具備連接關系的頂點分配相同的組件ID

  • 強連通組件(StronglyConnectedConponents):根據每個頂點的強連通分量分配SCC,

  • 最短路徑(Shortest paths):查找從每個頂點到目標頂點集的最短路徑,

  • 三角形計數(TriangleCount):計算每個頂點所屬的三角形的數量,經常用于確定組的穩定性(相互連接的數量代表了穩定性)或作為其他網路度量(如聚類系數)的一部分,在社交網路分析中用來檢測社區,

  • 標簽傳播演算法(LPA):檢測圖中的社區,

pageRank演算法:

results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()

結果:

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  b|    Bob| 36| 2.7025217677349773|
|  c|Charlie| 30| 2.6667877057849627|
|  a|  Alice| 34| 0.4485115093698443|
|  e| Esther| 32| 0.3613490987992571|
|  f|  Fanny| 36|0.32504910549694244|
|  d|  David| 29|0.32504910549694244|
|  g|  Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+

可以設定起始頂點:

graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)

廣度優先搜索BFS:

搜索從姓名叫Esther到年齡小于32的最小路徑:

paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()
+--------------+--------------+---------------+
|          from|            e0|             to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+

可以指定只能在指定的邊搜索:

graph.bfs("name = 'Esther'",
          "age < 32",
          edgeFilter="relationship != 'friend'",
          maxPathLength=4
          ).show()
+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+

Connected components 連通組件:

必須先設定檢查點:

sc.setCheckpointDir("checkpoint")

graph.connectedComponents().show()

結果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
|  g|  Gabby| 60|146028888064|
+---+-------+---+------------+

可以看到僅g點在一個連通區域內,可以呼叫dropIsolatedVertices()方法,洗掉這種孤立的沒有連接的點:

graph.dropIsolatedVertices().connectedComponents().show()

結果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
+---+-------+---+------------+

Strongly connected components 強連通組件:

graph.stronglyConnectedComponents(maxIter=10).show()

Shortest paths 最短路徑:

每個頂點到a或d的最短路徑:

graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+

Triangle count 三角形計數:

graph.triangleCount().show()
+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
+-----+---+-------+---+

說明頂點a/e/d構成三角形,

標簽傳播演算法(LPA):

graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+
| id|   name|age|        label|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34|1382979469312|
|  c|Charlie| 30|1382979469312|
|  e| Esther| 32|1460288880640|
|  d|  David| 29|1460288880640|
+---+-------+---+-------------+

PySpark3.X與pandas融合

Pyspark從3.0版本開始出現了pandas_udf裝飾器、applyInPandas和mapInPandas,基于這些方法,我們就可以使用熟悉的pandas的語法處理spark物件的資料,

首先創建幾條測驗資料,并啟動 Apache Arrow

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()

自定義UDF和UDAF

pyspark暫不支持自定義UDTF,

使用pandas_udf裝飾器我們可以創建出基于pandas的udf自定義函式,在DSL的語法中可以被直接使用:

from pyspark.sql.functions import pandas_udf
import pandas as pd


@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b


df.select(multiply_func("id", "v").alias("product")).show()

注冊函式和視圖后,可以直接在SQL中使用:

df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()

結果均為:

+-------+
|product|
+-------+
|    1.0|
|    2.0|
|    6.0|
|   10.0|
|   20.0|
+-------+

還支持聚合函式和視窗函式:

from pyspark.sql import Window


@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()


# 對欄位'v'進行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id’分組,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id’分組,求'v'的均值,并賦值給新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()

注冊到udf之后同樣可以直接使用SQL實作:

spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()

結果均為:

+--------+
| mean_v |
+--------+
|     4.2|
+--------+

+---+--------+
| id| mean_v |
+---+--------+
|  1|     1.5|
|  2|     6.0|
+---+--------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+

分組聚合與JOIN

applyInPandas需要在datafream呼叫groupby之后才能使用:

def subtract_mean(pdf):
    v = pdf.v
    pdf['v1'] = v - v.mean()
    pdf['v2'] = v + v.mean()
    return pdf


t = df.groupby("id")
t.applyInPandas(
    subtract_mean, schema="id long, v double, v1 double, v2 double").show()

結果:

+---+----+----+----+
| id|   v|  v1|  v2|
+---+----+----+----+
|  1| 1.0|-0.5| 2.5|
|  1| 2.0| 0.5| 3.5|
|  2| 3.0|-3.0| 9.0|
|  2| 5.0|-1.0|11.0|
|  2|10.0| 4.0|16.0|
+---+----+----+----+

subtract_mean函式接收的是對應id的dataframe資料,schema指定了回傳值的名稱和型別串列,

通過以下代碼我們可以知道,applyInPandas可以借助cogroup進行表連接:

val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))
scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))

示例:

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
  	# l、r is a pandas.DataFrame
  	# 這里是按照id分組
  	# 那么,l和r分別是對應id的df1和df2資料
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

map迭代

執行以下代碼:

def filter_func(iterator):
    for i, pdf in enumerate(iterator):
        print(i, pdf.values.tolist())
        yield pdf


df.mapInPandas(filter_func, schema=df.schema).show()

后臺看到執行結果為:

0 [[2.0, 5.0]]                                                     
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]

前臺結果幾乎保持原樣,可以知道iterator是一個磁區迭代器,迭代出當前磁區的每一行資料都被封裝成一個pandas物件,

Pyspark與Pandas的互動

將spark的Datafream物件轉換為原生的pandas物件只需呼叫toPandas()方法即可:

sdf.toPandas()

將原生的pandas物件轉換為spark物件可以使用spark的頂級方法:

spark.createDataFrame(pdf)

習慣使用pandas的童鞋,還可以直接使用pandas-on-Spark,在spark3.2.0版本時已經匹配到pandas 1.3版本的API,通過pandas-on-Spark,我們可以完全用pandas的api操作資料,而底層執行卻是spark的并行化,

使用pandas-on-Spark最好設定一下環境變數:

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

將spark物件轉換為pandas-on-Spark物件:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf

image-20220218090829123

pandas-on-Spark物件也可以還原成spark物件:

pdf.to_spark()

另外spark提供直接將檔案讀取成pandas-on-Spark物件的api,例如:

import pyspark.pandas as ps

pdf = ps.read_csv("example_csv.csv")

ps物件與原生pandas物件的API幾乎完全一致,

ps物件相對于原生pandas物件的API幾乎一致,同時還支持一些強悍的功能,例如直接以SQL形式訪問:

ps.sql("SELECT count(*) as num FROM {pdf}")

{pdf}訪問了變數名為pdf的pandas-on-Spark物件,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/427507.html

標籤:其他

上一篇:Dubbo學習記錄(三)----JDK之SPI機制

下一篇:2022年美賽D題思路翻譯資料參考文獻

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more