我有一個日志檔案,我想報告IP啟動多于一種(至少兩種)型別protocol連接的地址,同時顯示這些協議。我試圖通過同時使用 DataFrames API 和 SparkSQL來獲得這些結果。
這是我的資料示例:
---------------- -------- -------- --------------- -------------- --------- ------------- ------ -----
| Timestamp|Duration|Protocol|BytesOriginator|ResponderBytes|LocalHost| RemoteHost| State|Flags|
---------------- -------- -------- --------------- -------------- --------- ------------- ------ -----
|748162802.427995| 1.24383| smtp| ?| ?| 1| 128.97.154.3| REJ| L|
|748162802.803033| 3.96513| smtp| 1173| 328| 3| 128.8.142.5| SF| null|
|748162804.817224| 1.02839| nntp| 58| 129| 2| 140.98.2.1| SF| L|
|748162812.254572| 138.168| nntp| 363238| 1200| 4| 128.49.4.103| SF| L|
|748162817.478016| 10.0858| nntp| 230| 100| 4| 128.32.133.1| SF| N|
|748162833.453963| 2.16477| smtp| 2524| 306| 5|192.48.232.17| SF| null|
|748162836.735788| 13.1779| smtp| 16479| 174| 16| 128.233.1.12|RSTRS3| L|
|748162839.930331| 6.69767| smtp| 3104| 371| 8| 139.91.1.1| SF| L|
|748162841.854151| 2.07407| smtp| 1172| 380| 6| 128.8.142.5| SF| null|
|748162854.814153| 131.659| nntp| 319292| 1220| 4| 128.110.4.25| SF| L|
|748162866.207165| 51.8406| nntp| 135714| 280| 4| 128.110.4.25| SF| null|
|748162866.600750|0.402045| smtp| ?| ?| 1| 128.97.154.3| REJ| L|
|748162869.790751| 172.363| smtp| 0| 0| 16|132.230.6.100| SF| L|
|748162873.491682| 102.88| nntp| 346| 180| 4| 128.32.136.1| SF| LN|
|748162875.237378| 5.32943| nntp| 90| 85| 4| 128.32.133.1| SF| N|
---------------- -------- -------- --------------- -------------- --------- ------------- ------ -----
我試圖過濾我的資料框,但我不斷收到錯誤,我不知道我是否應該使用 Window 函式。通過使用 SparkSQL,到目前為止,我得到了IPs但沒有protocols.
這是我所做的:
custom_schema = StructType([
StructField('Timestamp', StringType(), True),
StructField('Duration', FloatType(), True),
StructField('Protocol', StringType(), True),
StructField('BytesOriginator', StringType(), True),
StructField('ResponderBytes', StringType(), True),
StructField('LocalHost', StringType(), True),
StructField('RemoteHost', StringType(), True),
StructField('State', StringType(), True),
StructField('Flags', StringType(), True)
])
logs = spark.read.csv('lbl-conn-7.csv', header=False, sep=' ', schema=custom_schema)
# I get an error
logs.select('RemoteHost', 'Protocol').distinct().filter(F.countDistinct('Protocol') > 1).show()
logs.createOrReplaceTempView("mytable")
sqlContext = SQLContext(sc)
df = sqlContext.sql("select remotehost, protocol FROM mytable GROUP BY HAVING COUNT(distinct protocol) > 1")
# It doesn't show the protocols
df.show()
uj5u.com熱心網友回復:
您可以分組RemoteHost并收集不同Protocol使用的串列。然后,使用協議陣列的大小過濾結果資料幀:
import pyspark.sql.functions as F
logs.groupBy("RemoteHost").agg(
F.collect_set("Protocol").alias("Protocols")
).filter(
F.size("Protocols") >= 2
).show()
Spark SQL 等效查詢:
SELECT RemoteHost,
collect_set(Protocol) AS Protocols
FROM mytable
GROUP BY RemoteHost
HAVING size(Protocols) >= 2 -- or count(distinct Protocol) >= 2
如果要保留所有列,請使用 Windowcollect_set功能:
logs.withColumn(
"Protocols",
F.collect_set("Protocol").over((Window.partitionBy("RemoteHost")))
).filter(
F.size("Protocols") >= 2
).drop("Protocols").show()
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/405146.html
標籤:
