我在 DEV 環境中將 apache 光束管道部署到 GCP 資料流,一切運行良好。然后我將它部署到歐洲環境中的生產環境(具體來說 - 作業區域:europe-west1,工人位置:)europe-west1-d,在那里我們獲得了高資料速度并且事情開始變得復雜。
我正在使用會話視窗將事件分組到會話中。會話密鑰是tenantId/visitorId,其間隔是 30 分鐘。我還使用觸發器每 30 秒發出一次事件,以便在會話結束之前釋放事件(將它們寫入 BigQuery)。
問題似乎發生在EventToSession/GroupPairsByKey. droppedDueToLateness在這一步中,柜臺下有數千個事件,并且dataFreshness不斷增加(自從我部署它以來一直在增加)。這一步之前的所有步驟都運行良好,之后的所有步驟都受到它的影響,但似乎沒有任何其他問題。
我查看了一些指標,發現該EventToSession/GroupPairsByKey步驟每秒處理 100K 到 200K 鍵(取決于一天中的時間),這對我來說似乎很多。cpu 利用率沒有超過 70%,我正在使用流引擎。大多數情況下,worker 的數量是 2。最大 worker 記憶體容量為 32GB,而最大 worker 記憶體使用量目前為 23GB。我正在使用e2-standard-8機器型別。
我沒有任何熱鍵,因為每個會話最多包含幾十個事件。
我最大的懷疑是該步驟中正在處理的大量密鑰EventToSession/GroupPairsByKey。但另一方面,會話通常與單個客戶相關,因此谷歌應該期望每秒處理這么多的密鑰,不是嗎?
想獲得如何解決dataFreshness和事件droppedDueToLateness問題的建議。
添加生成會話的代碼:
input = input.apply("SetEventTimestamp", WithTimestamps.of(event -> Instant.parse(getEventTimestamp(event))
.withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
.apply("SetKeyForRow", WithKeys.of(event -> getSessionKey(event))).setCoder(KvCoder.of(StringUtf8Coder.of(), input.getCoder()))
.apply("CreatingWindow", Window.<KV<String, TableRow>>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(30)))
.apply("GroupPairsByKey", GroupByKey.create())
.apply("CreateCollectionOfValuesOnly", Values.create())
.apply("FlattenTheValues", Flatten.iterables());
uj5u.com熱心網友回復:
在做了一些研究后,我發現了以下內容:
- 關于不斷增加的資料新鮮度:只要允許遲到的資料到達會話視窗,該特定視窗就會保留在記憶體中。這意味著允許延遲 30 天的資料將使每個會話在記憶體中至少保留 30 天,這顯然會使系統過載。此外,我發現我們有一些持久的會話,機器人訪問我們正在監控的網站并采取行動。這些機器人可以永遠保持會話,這也會使系統過載。解決方案是將允許的延遲時間減少到 2 天并使用有界會話(查找“有界會話”)。
- 關于由于遲到而丟棄的事件:這些事件在到達時屬于過期視窗,水印已經結束的視窗(請參閱
droppedDueToLateness此處的檔案)。這些事件在會話視窗函式之后首先被丟棄,GroupByKey以后無法處理。我們不想丟棄任何遲到的資料,因此解決方案是在每個事件進入會話部分之前檢查每個事件的時間戳,并僅將不會被丟棄的事件流到會話部分 - 滿足此條件的事件:event_timestamp >= event_arrival_time - (gap_duration allowed_lateness). 其余部分將在沒有會話資料的情況下寫入 BigQuery(如果事件的時間戳在之前,event_arrival_time - (gap_duration allowed_lateness)即使該事件屬于實時會話,Apache Beam 也會丟棄一個事件......)
ps - 在有界會話部分,他演示了如何實作有時間限制的會話,我相信他有一個錯誤,允許會話增長超過提供的最大大小。一旦會話超過最大大小,可以發送與該會話相交并且在會話之前的延遲資料,以使會話的開始時間更早,從而擴展會話。此外,一旦會話超過最大大小,就不能添加屬于它的事件,但不能擴展它。
為了解決這個問題,我切換了current視窗跨度和 if 陳述句的順序,并在視窗跨越部分的函式中編輯了 if 陳述句(檢查會話最大大小的陳述句)mergeWindows,因此會話無法通過最大值大小并且只能添加不超過最大大小的資料。這是我的實作:
public void mergeWindows(MergeContext c) throws Exception {
List<IntervalWindow> sortedWindows = new ArrayList<>();
for (IntervalWindow window : c.windows()) {
sortedWindows.add(window);
}
Collections.sort(sortedWindows);
List<MergeCandidate> merges = new ArrayList<>();
MergeCandidate current = new MergeCandidate();
for (IntervalWindow window : sortedWindows) {
MergeCandidate next = new MergeCandidate(window);
if (current.intersects(window)) {
if ((current.union == null || new Duration(current.union.start(), window.end()).getMillis() <= maxSize.plus(gapDuration).getMillis())) {
current.add(window);
continue;
}
}
merges.add(current);
current = next;
}
merges.add(current);
for (MergeCandidate merge : merges) {
merge.apply(c);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/484759.html
上一篇:SpringBoot會話管理——PostgreSQL Redis組合解決方案
下一篇:跨2個域共享用戶資料
