I create my SparkSession and register the Kryo classes like this:
val sparkConf = new SparkConf()
  .setAppName("bd-dq-spark")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.driver.host", "127.0.0.1")
  .registerKryoClasses(Array(
    classOf[HeatSensorEvent],
    Class.forName("scala.Enumeration$Val"),
    Class.forName("cs.spark_implicits.Model$EventType$")))

val spark: SparkSession =
  SparkSession.builder()
    .master("local[*]")
    .config(sparkConf)
    .getOrCreate()
I define my case class like this:
object Model {
  type Timestamp = Long

  case class HeatSensorEvent(
    eventId: String,
    sensorId: String,
    deviceId: String,
    eventType: EventType,
    timestamp: Timestamp,
    temperature: Double
  )

  object EventType extends Enumeration {
    final type EventType = Value
    val TEMPERATURE_CHANGE: EventType.Value = Value
  }
}
I prepare my mock data like this:
val heatSensorEventData = Seq(
  HeatSensorEvent("123", "s1", "d1", TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
  HeatSensorEvent("234", "s1", "d1", TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
  HeatSensorEvent("345", "s1", "d1", TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
  HeatSensorEvent("567", "s1", "d1", TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
)
My main method is:
def main(args: Array[String]): Unit = {
  implicit val heatSensorEventEncoder: Encoder[HeatSensorEvent] = org.apache.spark.sql.Encoders.kryo[HeatSensorEvent]
  implicit val eventTypeEncoder: Encoder[EventType] = org.apache.spark.sql.Encoders.kryo[EventType.EventType]
  val heatSensorEventDs: Dataset[HeatSensorEvent] = spark
    .createDataset(heatSensorEventData).as[HeatSensorEvent]
  heatSensorEventDs.show
  heatSensorEventDs.printSchema()
}
But all I get is this:
+--------------------+
|               value|
+--------------------+
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
+--------------------+
root
 |-- value: binary (nullable = true)
My question: why did I lose the entire schema, and why can't the data be displayed normally? How can I fix this?
Answer:
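When you use a Kryo encoder (Encoders.kryo), Spark serializes each object as a whole into a single binary column named value. The individual fields are no longer columns, so dataset.show() can only display raw bytes and printSchema() reports value: binary.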
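To see the difference directly, here is a minimal sketch comparing the schema produced by a Kryo encoder with the one produced by Spark's product (case class) encoder. It assumes spark-sql is on the classpath; Reading and EncoderSchemas are hypothetical names used only for illustration. It only builds encoders and does not start a SparkSession.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical case class, used only to compare the two encoders.
case class Reading(sensorId: String, temperature: Double)

object EncoderSchemas {
  // Kryo encoder: the whole object is serialized into one byte array column.
  val kryoEncoder: Encoder[Reading] = Encoders.kryo[Reading]

  // Product encoder: one column per case class field.
  val productEncoder: Encoder[Reading] = Encoders.product[Reading]

  def main(args: Array[String]): Unit = {
    // Should print a single binary field, like the printSchema() output in the question.
    println(kryoEncoder.schema.treeString)
    // Should print sensorId and temperature as separate fields.
    println(productEncoder.schema.treeString)
  }
}
```

The product encoder is what spark.implicits._ supplies for case classes, so as long as every field type is one Spark knows how to encode, no Kryo encoder is needed.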
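Here is a way to work around it, adapted from this post (unfortunately an http link). The idea is to replace the Scala Enumeration with a sealed case class, which Spark's built-in encoders can map to regular columns.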
Define your classes:
// A top-level type alias works in a notebook/REPL (or Scala 3); in a compiled
// Scala 2 project, put it inside an object or package object.
type Timestamp = Long

object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
  val values: Array[EventType] = Array(TEMPERATURE_CHANGE)
}

case class HeatSensorEvent(
  eventId: String,
  sensorId: String,
  deviceId: String,
  eventType: Events.EventType,
  timestamp: Timestamp,
  temperature: Double
)
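One detail of this design worth knowing: when Spark reads the eventType struct back, it most likely rebuilds it by constructing a new EventType instead of returning the TEMPERATURE_CHANGE singleton. Case class equality means == and pattern matching on Events.TEMPERATURE_CHANGE still work, but reference comparison with eq would not. The sketch below repeats the answer's Events object and adds a hypothetical fromString helper that maps a raw string back to the canonical instance; it uses only the Scala standard library.

```scala
object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
  val values: Array[EventType] = Array(TEMPERATURE_CHANGE)

  // Hypothetical helper: resolve a raw string to one of the known singletons,
  // or None if the name is not a known event type.
  def fromString(s: String): Option[EventType] = values.find(_.value == s)
}

object EventsDemo {
  def main(args: Array[String]): Unit = {
    // A freshly constructed instance, similar to what a deserializer would produce.
    val rebuilt = Events.EventType("TEMPERATURE_CHANGE")

    // Case class equality compares the value field, so this is true.
    println(rebuilt == Events.TEMPERATURE_CHANGE)

    // Stable-identifier patterns compare with ==, so this branch matches.
    rebuilt match {
      case Events.TEMPERATURE_CHANGE => println("temperature change")
      case other                     => println(s"other event: ${other.value}")
    }

    // Normalize back to the singleton when reference identity matters.
    println(Events.fromString(rebuilt.value).exists(_ eq Events.TEMPERATURE_CHANGE))
  }
}
```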
Create your data:
val heatSensorEventData = Seq(
  HeatSensorEvent("123", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555389L, 85.41),
  HeatSensorEvent("234", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555419L, 60.41),
  HeatSensorEvent("345", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619556389L, 60.41),
  HeatSensorEvent("567", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619557389L, 50.41)
)
Now you can display your Dataset:
import spark.implicits._ // provides toDS() and the case class encoder for HeatSensorEvent

val ds = heatSensorEventData.toDS()
ds.show()
Output:
+-------+--------+--------+--------------------+----------+-----------+
|eventId|sensorId|deviceId|           eventType| timestamp|temperature|
+-------+--------+--------+--------------------+----------+-----------+
|    123|      s1|      d1|[TEMPERATURE_CHANGE]|1619555389|      85.41|
|    234|      s1|      d1|[TEMPERATURE_CHANGE]|1619555419|      60.41|
|    345|      s1|      d1|[TEMPERATURE_CHANGE]|1619556389|      60.41|
|    567|      s1|      d1|[TEMPERATURE_CHANGE]|1619557389|      50.41|
+-------+--------+--------+--------------------+----------+-----------+
ds: org.apache.spark.sql.Dataset[HeatSensorEvent] = [eventId: string, sensorId: string ... 4 more fields]
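Support for Scala enumerations in Spark has been requested, but the request was closed without a fix. The advantage of this approach is that you don't need a custom (Kryo) encoder: the case class encoder handles HeatSensorEvent, and eventType is stored as a struct with a single value field, which is why show() prints it as [TEMPERATURE_CHANGE].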
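If you would rather keep scala.Enumeration in your domain model, a different technique from the one above is to store only the enum's name in the case class that goes into the Dataset and convert at the boundary. A minimal sketch using only the Scala standard library; EnumAsString, HeatSensorEventRow, encodeType and decodeType are hypothetical names.

```scala
object EnumAsString {
  object EventType extends Enumeration {
    type EventType = Value
    val TEMPERATURE_CHANGE: EventType = Value
  }

  // Hypothetical Spark-facing row type: only String and Double fields,
  // so a case class encoder can handle it without Kryo.
  case class HeatSensorEventRow(eventId: String, eventType: String, temperature: Double)

  // Enumeration value -> its name (Value.toString returns the val's name).
  def encodeType(t: EventType.EventType): String = t.toString

  // Name -> Enumeration value; withName throws NoSuchElementException for unknown names.
  def decodeType(s: String): EventType.EventType = EventType.withName(s)

  def main(args: Array[String]): Unit = {
    val row = HeatSensorEventRow("123", encodeType(EventType.TEMPERATURE_CHANGE), 85.41)
    println(row)
    println(decodeType(row.eventType))
  }
}
```

The trade-off is that the conversion lives in your code rather than in an encoder, and an unexpected string only fails when decodeType is called.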
