- 我有一個RichParallelSourceFunction(parallelism=1),它每5秒鐘查詢一次MySQL,并發出一個時間戳串列,表明何時開始/停止向水槽寫入資料。
- 我有一個RichParallelSourceFunction(parallelism=1),它每5秒鐘查詢一次MySQL,發出一個時間戳串列,指示何時開始/停止向水槽寫入。
- 這個時間戳被廣播到原始流(parallelism=10)。
- 我已經將RichParallelSourceFunction的并行性設定為1,以減少對MySQL服務器的同時請求的數量。
- 我很困惑,在這種情況下是否需要廣播狀態。為什么不直接將廣播的資料存盤在操作員本地資料結構中呢?
- .broadcast(stateDesc)與.broadcast()之間有什么區別?
class MyBroadcastProcessFunction(name: String) extends BroadcastProcessFunction[Log, TimestampList, Log] with CheckpointedFunction{
private var sortedTimestamps: IndexedSeq[Long] = _
@transient var buffer: ListState[(Log, Long) ] = _
def processElement(value: Log, timestamp: Long, out: Collector[Log])。) Unit = {
if (timestamp > sortedTimestamps(0)
out.collect(value)
else[/span
buffer.put(value, 0L)
}
override def processBroadcastElement(timestamps: TimestampList。
ctx: BroadcastProcessFunction[Log, TimestampList, Log] #Context,
出。Collector[Log])。) Unit = {
//將時間戳添加到SortedTimestamps。
//我需要在這里使用BroadcastState mapstate嗎?或者只使用操作者本地的資料結構(例如sortedTimestamps)? }
}
override def snapshotState(context: FunctionSnapshotContext) 。Unit = {}。
override def initializeState(text: FunctionInitializationContext)。Unit = {
val stateDesc = new ListStateDescriptor[(Log, Long)]("logBuffer",
classOf[(Log, Long)])
buffer = context.getOperatorStateStore.getListState(stateDesc)
}
}
uj5u.com熱心網友回復:
轉換.broadcast()將向所有的下游操作者發出所有的事件,而不考慮流的并行性。Doc說:
設定DataStream的磁區,以便將輸出元素廣播給下一個操作的每個并行實體。 回傳:
.broadcast(stateDesc)是為了定義一個模式狀態,你可以在一個事件流的基礎上找到一個通常非常小的模式。這也是一個很好的參考。
您創建BroadcastProcessFunction的方式是錯誤的,因為您只處理了一個流。正確的方法是在processBroadcastElement()方法中處理廣播狀態,在你的案例中是來自MySql的時間戳。在這個方法中,你必須更新全域/廣播的狀態。
然后在另一個方法processElement()中,你會收到一個有規律的或快速的流,你可以根據你在第一個方法processBroadcastElement()中更新的狀態找到一個模式。
這里更多的是你應該如何實作。有一些注意事項,比如你將不能更新一個ListState。最好是使用MapState,因為它在鏈接上有描述。
def processBroadcastElement(timestamps。TimestampList。
ctx: BroadcastProcessFunction[Log, TimestampList, Log]#Context, out: Collector[Log])。) Unit = {
//更新緩沖區狀態。
// I don't think you can use .put() to update the ListState.
//實際上我認為不可能更新ListState,你必須使用MapState.。
context.getOperatorStateStore.getListState(stateDesc)
.put(value.name, value)。
}
def processElement(value: Log, timestamp: Long, out: Collector[Log])。) Unit = {
buffer = context.getOperatorStateStore.getListState(stateDesc)
if (value: Log match in buffer ?)
out.collect(value) //MATCH
}
uj5u.com熱心網友回復:
.broadcast(stateDesc)需要一個狀態描述符,這樣它就知道如何序列化被廣播的資料。無論你是否想在你的KeyedBroadcastProcessFunction中的MapState中存盤被廣播的資料,你都可以使用這個。
如果你不使用MapState來存盤這些資料,那么如果作業失敗并重新啟動,這些資料就會丟失。但在你的情況下,這也許并不重要,因為你可以在作業重新啟動時從MySQL中獲得最新的資料。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/309129.html
標籤:
