我的 RDD(來自 ElasticSearch)看起來像這樣。
[
('rty456ui', {'@timestamp': '2022-10-10T24:56:10.000259 0000', 'host': {'id': 'test-host-id-1'}, 'watchlists': {'ioc': {'summary': '127.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '127.0.0.1'}, 'event': {'created': '2022-10-10T13:56:10 00:00', 'id': 'rty456ui'}, 'tags': ('Mon',)}),
('cxs980qw', {'@timestamp': '2022-10-10T13:56:10.000259 0000', 'host': {'id': 'test-host-id-2'}, 'watchlists': {'ioc': {'summary': '0.0.0.1', 'tags': ('Dummy Tag',)}}, 'source': {'ip': '0.0.0.1'}, 'event': {'created': '2022-10-10T24:56:10 00:00', 'id': 'cxs980qw'}, 'tags': ('Mon', 'Tue')})
]
(我覺得有趣的是 ES 中的串列被轉換為 RDD 中的元組)
我正在嘗試將其轉換為類似的東西。
--------------- ----------- ----------- --------------------------- ----------------------- ----------------------- ---------------
|host.id |event.id |source.ip |event.created |watchlists.ioc.summary |watchlists.ioc.tags |tags |
--------------- ----------- ----------- --------------------------- ----------------------- ----------------------- ---------------
|test-host-id-1 |rty456ui |127.0.0.1 |2022-10-10T13:56:10 00:00 |127.0.0.1 |[Dummy Tag] |[Mon] |
|test-host-id-2 |cxs980qw |0.0.0.1 |2022-10-10T24:56:10 00:00 |127.0.0.1 |[Dummy Tag] |[Mon, Tue] |
--------------- ----------- ----------- --------------------------- ----------------------- ----------------------- ---------------
然而,得到這個。
------- -------- --------- ------------- ---------------------- ------------------- -------------------------------
|host.id|event.id|source.ip|event.created|watchlists.ioc.summary|watchlists.ioc.tags|tags |
------- -------- --------- ------------- ---------------------- ------------------- -------------------------------
|null |null |null |null |null |null |[Ljava.lang.Object;@6c704e6e |
|null |null |null |null |null |null |[Ljava.lang.Object;@701ea4c8 |
------- -------- --------- ------------- ---------------------- ------------------- -------------------------------
代碼
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
StructField("host.id",StringType(), True),
StructField("event.id",StringType(), True),
StructField("source.ip",StringType(), True),
StructField("event.created", StringType(), True),
StructField("watchlists.ioc.summary", StringType(), True),
StructField("watchlists.ioc.tags", StringType(), True),
StructField("tags", StringType(), True)
])
df = spark.createDataFrame(es_rdd.map(lambda x: x[1]),schema)
df.show(truncate=False)
我正在嘗試將 rdd 轉換為 Dataframe。此外,我想為它定義架構。但是,pyspark.createDataFrame(rdd, schema)即使 rdd 有資料,也只回傳空值。此外,我[Ljava.lang.Object;@701ea4c8也進入了輸出。那么我在這里錯過了什么?
uj5u.com熱心網友回復:
您的帖子涵蓋了 2 個問題:
為什么當我將 RDD 轉換為資料框時,即使我宣告架構,所有列也會為空:在您的
schemaStructTypeColumn.StructFiedColumn (例如host.id)中獲取 RDD 中的值。但是,這種型別的選擇陳述句只有在使用 Spark SQL 選擇陳述句時才能作業,我認為這里沒有這樣的決議。為了實作您的目標,您可能必須在函式內部更新您的 lambda 函式map以提取確切的元素,例如rdd_trans = rdd.map(lambda x: (x[1]['host']['id'], x[1]['event']['id'], ))為什么
tag列的輸出沒有按預期顯示:這是因為當您宣告tag列時,您將其宣告為字串列,您應該改用 ArrayType。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/519589.html
標籤:阿帕奇火花pysparkapache-spark-sqlrdd
