我開發了一個代碼,我JSON用預定義的格式決議傳入的資料。因此,它按預期作業。現在我的目標是將資料發送到相應的方法Right,Left并由Process我正在DB呼叫的另一個函式使用。
package KafkaAsSource
import KafkaAsSource.JSONParsingExample.{sampleJsonString, schemaJsonString}
import com.fasterxml.jackson.databind.{DeserializationFeature, MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
case class Premium(id: String, premium: Long, eventTime: String)
class Splitter extends ProcessFunction[String,Premium] {
val outputTag = new OutputTag[String]("failed")
def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
Try {
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
validationMessages.foreach(msg => println(msg.getMessage))
} match {
case Success(x) => {
println(" Good: " x)
Right(x)
}
case Failure(err) => {
println("Bad: " json)
Left(json)
}
}
}
override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
fromJson(i) match {
case Right(data) => {
collector.collect(data)
println("Good Records: " data)
}
case Left(json) => {
context.output(outputTag, json)
println("Bad Records: " json)
}
}
}
}
這給了我這樣的錯誤:
type mismatch;
found : x.type (with underlying type Unit)
required: T
Right(x)
在這種情況下,無論資料是否正確,Success總是呼叫該部分。我哪里錯了?
uj5u.com熱心網友回復:
Try只有在拋出例外時才會將其視為失敗。如果傳遞給的塊Try沒有拋出,則根據定義它是 a Success。
因此,假設您schema.validate(parsedJson)不拋出例外而是回傳一組驗證失敗,則有幾種方法:
Try如果出現故障,您可以留在里面并扔掉。實作此目的的一種簡單方法是使用require: 雖然它通常用于表達先決條件,但它是一種快速且易讀的拋出方法
Try {
// as before
val validationMessages = schema.validate(parsedJson).asScala
require(validationMessages.isEmpty)
parsedJson
} match {
// as before
}
- 您可以省去
Try(特別是如果您確定該塊中的任何其他代碼都不會引發例外)并直接轉到Either,例如:
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
if (validationMessages.isEmpty) Right(parsedJson)
else Left(json)
編輯:請注意,在您編輯的代碼中,您已添加
validationMessages.foreach(msg => println(msg.getMessage))
替換parsedJson你之前的那個,把你的Try從 aTry[T]變成 a Try[Unit]which 變成Either[String, Unit]了match.
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/405130.html
標籤:
