我正在嘗試使用 Apache Flink 從 Kafka 讀取和列印 Protobuf 訊息。
我按照官方檔案沒有成功:https : //nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/
Flink 消費者代碼為:
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])
val source = KafkaSource.builder[User]
.setBootstrapServers(brokers)
.setTopics(topic)
.setGroupId(consumerGroupId)
.setValueOnlyDeserializer(new ProtoDeserializer())
.setStartingOffsets(OffsetsInitializer.earliest)
.build
val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
stream.print()
env.execute()
}
解串器代碼是:
class ProtoDeserializer extends DeserializationSchema[User] {
override def getProducedType: TypeInformation[User] = null
override def deserialize(message: Array[Byte]): User = User.parseFrom(message)
override def isEndOfStream(nextElement: User): Boolean = false
}
執行流光時出現以下錯誤:
Protocol message contained an invalid tag (zero).
值得一提的是,我設法使用融合的 protobuf 消費者成功讀取和反序列化訊息,因此訊息似乎沒有損壞。
uj5u.com熱心網友回復:
融合的 protobuf 序列化器不會產生可以被其他反序列化器直接反序列化的內容。confluent 的檔案中描述了該格式:它以一個魔術位元組(始終為零)開頭,后跟一個四位元組的模式 ID。隨后是 protobuf 負載,從位元組 5 開始。
在這種情況下,該getProducedType方法應回傳適當的值。如果沒有這個,您可能會在運行時遇到問題。TypeInformationTypeInformation.of(User.class)
使用 with 的反序列化器KafkaSource不需要實作isEndOfStream,但它不會傷害任何東西。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/403042.html
標籤:
