我正在創建一個簡單的 Apache Flink 作業(使用 Scala),它只嘗試列印代表 RabbitMQ 佇列(RMQSource)接收到的事件的案例類
我已經創建了自己的反序列化模式(使用 Jackson),并且當使用的訊息實際上是表示案例類的 JSON 時,它可以正常作業。但是如果佇列接收到格式錯誤的事件(我們可以稱之為“中毒訊息”,我猜),作業會失敗并繼續重新啟動。我必須清除佇列,然后作業狀態更改為“正在運行”。
題:
收到中毒訊息時,如何防止作業失敗?我可以在消費之前驗證訊息嗎?如果我可以在 Rabbit 中設定死信交換,我應該在哪里(如果可能)代表 Apache Flink 作為消費者進行否定確認?有沒有更好的方法來處理這個問題并保持作業運行以消耗下一個格式良好的訊息?
我的自定義 DeserializationSchema 提供給 RMQSource[Test]:
class eventSerializationSchema extends DeserializationSchema[Test] {
@throws(classOf[IOException])
def deserialize(message: Array[Byte]): Test = objectMapper.readValue(message, classOf[Test])
def isEndOfStream(nextElement: Test): Boolean = false
def getProducedType: TypeInformation[Test] = createTypeInformation[Test]
}
object eventSerializationSchema{
val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
}
中毒訊息到達消耗佇列時獲得的錯誤:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'a': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"a"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:17)
at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:14)
at org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchemaWrapper.deserialize(RMQDeserializationSchemaWrapper.java:47)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.processMessage(RMQSource.java:319)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.run(RMQSource.java:331)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
uj5u.com熱心網友回復:
從我的角度來看,你有幾個選擇:
捕獲在反序列化方法中拋出的例外,并洗掉有害記錄。
捕獲反序列化方法中拋出的例外,并以某種方式將您想了解的有關這些有害記錄的內容編碼到正在生成的物件中。然后在下游處理函式中,過濾掉這些有害記錄并將它們發送到側輸出。
不要在反序列化器中應用 ObjectMapper,而是在鏈式處理函式中進行真正的反序列化,該函式可以直接將有害記錄發送到側輸出。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/358553.html
標籤:斯卡拉 兔米克 apache-flink
上一篇:Play框架編譯時依賴注入和單例
