1. Introduction
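This post tests event time in Flink. Each record carries its own timestamp, and the watermarks are generated from those timestamps. The setup is a simulation of two situations:
- With industrial sensors, readings from several factories may arrive out of order. Flink should process them by the time they were produced at the factory, not by their arrival time.
- Some records may arrive late (because of the network or other causes), and those late records still have to be handled.
The post covers:
- collecting the late records and displaying them;
- processing the data in event-time windows.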
2. Basic demo
For the pom dependencies and the ComputerTemperature entity class, see the previous post.
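The entity class is not repeated in this post. For readers who do not have the previous post, here is a hypothetical reconstruction. It is inferred only from how the code below uses the class: a (String, Double, long) constructor plus getName(), getTemperature() and getTimestamp(). The field names, the setters and the toString() format are assumptions. Flink can treat the class as a POJO when three conditions hold: it is public, it has a public no-argument constructor, and every field is reachable through a getter and a setter.

```java
// Hypothetical reconstruction of ComputerTemperature: only the three-argument
// constructor and the three getters are known from the post; the rest is assumed.
public class ComputerTemperature {
    private String name;          // computer name, used as the key
    private Double temperature;   // temperature reading
    private long timestamp;       // event time in epoch milliseconds

    // Flink's POJO serializer needs a public no-argument constructor.
    public ComputerTemperature() {
    }

    public ComputerTemperature(String name, Double temperature, long timestamp) {
        this.name = name;
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    public String getName() { return name; }

    public void setName(String name) { this.name = name; }

    public Double getTemperature() { return temperature; }

    public void setTemperature(Double temperature) { this.temperature = temperature; }

    public long getTimestamp() { return timestamp; }

    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "ComputerTemperature [name=" + name + ", temperature=" + temperature
                + ", timestamp=" + timestamp + "]";
    }
}
```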
1. Create a source whose timestamps vary (just a simple variation)
package com.hy.flink.test.source;

import java.io.Serializable;
import java.util.Date;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;

import com.hy.flink.test.pojo.ComputerTemperature;

/**
 *
 * @author hy
 * @createTime 2021-05-16 09:52:36
 * @description Source that emits random temperature readings whose timestamps are deliberately out of order
 *
 */
public class RandomComputerTemperatureLaterSource extends FromIteratorFunction<ComputerTemperature> {
    private static final long serialVersionUID = 1L;

    /**
     * @param sleepTime      pause between two records, in milliseconds
     * @param randomNumCount number of records to emit
     */
    public RandomComputerTemperatureLaterSource(long sleepTime, Integer randomNumCount) {
        super(new RandomComputerTemperatureIterator(sleepTime, randomNumCount));
    }

    private static class RandomComputerTemperatureIterator implements Iterator<ComputerTemperature>, Serializable {
        private static final long serialVersionUID = 1L;
        private final long sleepTime;
        private final String[] computerNames = { "computer-1", "computer-2", "computer-3" };
        private int randomNumCount;

        private RandomComputerTemperatureIterator(long sleepTime, Integer randomNumCount) {
            this.sleepTime = sleepTime;
            this.randomNumCount = randomNumCount;
        }

        @Override
        public boolean hasNext() {
            // No side effects here: the counter is decremented in next()
            return randomNumCount > 0;
        }

        @Override
        public ComputerTemperature next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            randomNumCount--;
            // Pause for sleepTime milliseconds between two records
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            Double temperature = Math.random() * 100 + 1;
            int index = (int) (Math.random() * computerNames.length);
            long timestamp = new Date().getTime();
            // Shuffle event time: records with an even timestamp are moved back by
            // temperature.intValue() * 3 s (between 3 s and 300 s), so they arrive out of order
            if (timestamp % 2 == 0) {
                timestamp -= temperature.intValue() * 3000;
            }
            ComputerTemperature computerTemperature = new ComputerTemperature(computerNames[index], temperature,
                    timestamp);
            System.out.println("Remaining records: " + randomNumCount);
            System.out.println("Generated record ====> " + computerTemperature);
            return computerTemperature;
        }
    }
}
Every generated record is printed so that it can be compared with the window results.
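Because the source above depends on Math.random() and the wall clock, each run produces a different set of late records, and that makes runs hard to compare. A deterministic variant can help. The sketch below uses a hypothetical helper, SkewedTimestampGenerator, which is not part of Flink or the original post. It applies the same skew rule as the source: an even timestamp moves back by intValue(temperature) * 3000 ms. Instead of random values and the real clock, it draws from a seeded java.util.Random and advances a simulated clock rather than sleeping. The 0 to 9 ms jitter added to each step is an assumption meant to mimic Thread.sleep overshoot. Without it, an even start time and an even step would make every timestamp even, so every record would be skewed.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

// Hypothetical helper: reproduces the skew rule of the post's source with a seeded
// Random and a simulated clock, so that two runs with the same seed are identical.
public class SkewedTimestampGenerator implements Iterator<SkewedTimestampGenerator.Reading> {

    public static final class Reading {
        public final String name;
        public final double temperature;
        public final long timestamp;

        Reading(String name, double temperature, long timestamp) {
            this.name = name;
            this.temperature = temperature;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return name + " " + temperature + " @ " + timestamp;
        }
    }

    private static final String[] NAMES = { "computer-1", "computer-2", "computer-3" };

    private final Random random;
    private final long stepMillis;   // plays the role of sleepTime
    private long clock;              // simulated wall clock, epoch milliseconds
    private int remaining;

    public SkewedTimestampGenerator(long seed, long startMillis, long stepMillis, int count) {
        this.random = new Random(seed);
        this.clock = startMillis;
        this.stepMillis = stepMillis;
        this.remaining = count;
    }

    /** Skew rule from the post: even timestamps move back by intValue(temperature) * 3 s. */
    public static long skew(long timestamp, double temperature) {
        return timestamp % 2 == 0 ? timestamp - (int) temperature * 3000L : timestamp;
    }

    @Override
    public boolean hasNext() {
        return remaining > 0;
    }

    @Override
    public Reading next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        remaining--;
        // Advance the simulated clock instead of sleeping; the jitter varies the parity.
        clock += stepMillis + random.nextInt(10);
        double temperature = random.nextDouble() * 100 + 1;
        String name = NAMES[random.nextInt(NAMES.length)];
        return new Reading(name, temperature, skew(clock, temperature));
    }

    public static void main(String[] args) {
        SkewedTimestampGenerator gen = new SkewedTimestampGenerator(42L, 1_622_944_000_000L, 560L, 5);
        while (gen.hasNext()) {
            System.out.println(gen.next());
        }
    }
}
```

The same seed always yields the same sequence. This makes it possible to check by hand which records should end up late, without rerunning the Flink job several times.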
2. Create a Sink for ComputerTemperature
package com.hy.flink.test.pojo;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author hy
 * @createTime 2021-05-16 10:27:25
 * @description Logs the late ComputerTemperature records
 *
 */
public class ComputerTemperatureSink implements SinkFunction<ComputerTemperature> {
    private static final long serialVersionUID = 1L;
    // Log under this class, not under the walkthrough's AlertSink
    private static final Logger LOG = LoggerFactory.getLogger(ComputerTemperatureSink.class);

    @Override
    public void invoke(ComputerTemperature value, Context context) {
        LOG.info("LATE===>{}", value);
    }
}
This sink simply logs each record, and it can be replaced by any other sink.
3. Write the main job class
package com.hy.flink.test.window;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.hy.flink.test.pojo.ComputerTemperature;
import com.hy.flink.test.pojo.ComputerTemperatureSink;
import com.hy.flink.test.source.RandomComputerTemperatureLaterSource;

/**
 *
 * @author hy
 * @createTime 2021-06-06 09:37:41
 * @description Tests late events with event time: 1. generate late events (timestamps are shuffled
 *              randomly instead of ascending); 2. handling them needs a watermark strategy built on
 *              the records' own timestamps
 */
public class LaterWindowEventTest {
    // Side-output tag for late records; the trailing {} creates an anonymous subclass so that
    // Flink can capture the generic type ComputerTemperature
    static final OutputTag<ComputerTemperature> lateTag = new OutputTag<ComputerTemperature>("late") {
    };

    public static void main(String[] args) {
        // Run in a local environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Use event time (deprecated since Flink 1.12, where event time is already the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Random source: one record every 560 ms, 30 records in total
        DataStream<ComputerTemperature> dataStream = env.addSource(new RandomComputerTemperatureLaterSource(560, 30));
        // Take the event time from the record and tolerate up to 5 s of disorder
        WatermarkStrategy<ComputerTemperature> strategy = WatermarkStrategy
                .<ComputerTemperature>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
        dataStream = dataStream.assignTimestampsAndWatermarks(strategy);
        // Key by computer name and compute the maximum temperature of every 5 s window with
        // MyMaxTemperatureHandler. Deprecated alternatives tried earlier: TimeLagWatermarkGenerator
        // (below), BoundedOutOfOrdernessTimestampExtractor and AscendingTimestampExtractor.
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> process = dataStream
                .keyBy(x -> x.getName())
                // With the EventTime characteristic, timeWindow() resolves to TumblingEventTimeWindows.
                // The earlier attempt with .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // used processing time, which never classifies records as late, so no late data showed up.
                .timeWindow(Time.milliseconds(5000))
                // Records that arrive after the allowed lateness go to the lateTag side output
                .sideOutputLateData(lateTag)
                // Keep each window for another 10 s after the watermark passes its end; records that
                // arrive in that period fire the window again with an updated result
                .allowedLateness(Time.seconds(10))
                .process(new MyMaxTemperatureHandler());
        // Print the results through a single sink instance rather than in parallel
        process.addSink(new SinkFunction<Tuple3<String, Long, Double>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Tuple3<String, Long, Double> value, Context context) throws Exception {
                System.out.println("Window result ===> " + value);
            }
        }).setParallelism(1);
        // Handle the late data. Question 1: why did nothing show up here at first? Was nothing
        // dropped? Inspection showed that some records were in fact dropped.
        DataStream<ComputerTemperature> lateStream = process.getSideOutput(lateTag);
        // Earlier attempt: lateStream.process(new MyLateTemperatureHandler()).print().setParallelism(1);
        // printing showed no dropped records, so a sink is used instead
        lateStream.addSink(new ComputerTemperatureSink());
        try {
            env.execute("Max temperature per computer per 5-second window");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Deprecated (AssignerWithPeriodicWatermarks) and unused: it derives the watermark from the
    // wall clock instead of from the records' timestamps
    private static class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<ComputerTemperature> {
        private static final long serialVersionUID = 1L;
        private final long maxTimeLag = 5000; // 5 seconds

        @Override
        public long extractTimestamp(ComputerTemperature element, long previousElementTimestamp) {
            return element.getTimestamp();
        }

        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current time minus the maximum time lag
            return new Watermark(System.currentTimeMillis() - maxTimeLag);
        }
    }

    // Handles late-arriving records (currently it only prints them)
    static class MyLateTemperatureHandler extends ProcessFunction<ComputerTemperature, ComputerTemperature> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(ComputerTemperature bean,
                ProcessFunction<ComputerTemperature, ComputerTemperature>.Context context,
                Collector<ComputerTemperature> out) throws Exception {
            System.out.println("=======================>");
            System.out.println(bean);
            System.out.println("<========================");
            // Late records are not forwarded downstream:
            // out.collect(bean); or context.output(lateTag, bean);
        }
    }

    // Emits (computer name, timestamp of the hottest reading, maximum temperature) for each window
    static class MyMaxTemperatureHandler
            extends ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public void process(String key,
                ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow>.Context context,
                Iterable<ComputerTemperature> events, Collector<Tuple3<String, Long, Double>> out) throws Exception {
            double max = Double.NEGATIVE_INFINITY;
            long time = 0L;
            System.out.println("Processing window " + context.window() + " for " + key + " .........");
            events.forEach(x -> System.out.println(x));
            for (ComputerTemperature event : events) {
                double temperature = event.getTemperature();
                if (temperature > max) {
                    max = temperature;
                    time = event.getTimestamp();
                }
            }
            // Emit the maximum; its timestamp can be formatted with
            // new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(time))
            out.collect(Tuple3.of(key, time, max));
        }
    }
}
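Here an OutputTag collects the late records automatically through sideOutputLateData(), and allowedLateness() sets how long a window keeps accepting late records. The OutputTag is created as an anonymous subclass (note the trailing {}) so that Flink can capture its generic type.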
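To see which records end up in the side output, it helps to replay the bookkeeping by hand. The sketch below is a plain-Java model, not Flink code. It encodes the rules as I understand them for this job:
- forBoundedOutOfOrderness(5 s) emits a watermark of (max timestamp seen - 5000 - 1).
- A tumbling window [start, start + 5000) ends at maxTimestamp start + 4999.
- A record is routed to the lateTag side output once window maxTimestamp + 10 s allowed lateness <= watermark.
- A record whose window has already fired but is still within the lateness period fires that window again.

LatenessSimulation and Fate are hypothetical names. Real Flink emits these watermarks periodically (every 200 ms by default), while the model recomputes the watermark after every record, which is a simplification.

```java
// Plain-Java model (not Flink code) of the job's event-time bookkeeping:
// 5 s tumbling windows, 5 s bounded out-of-orderness, 10 s allowed lateness.
public class LatenessSimulation {

    /** What happens to a record when it reaches the window operator. */
    public enum Fate { ON_TIME, LATE_BUT_ALLOWED, SIDE_OUTPUT }

    private final long windowSize;
    private final long outOfOrderness;
    private final long allowedLateness;
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    public LatenessSimulation(long windowSize, long outOfOrderness, long allowedLateness) {
        this.windowSize = windowSize;
        this.outOfOrderness = outOfOrderness;
        this.allowedLateness = allowedLateness;
    }

    /** Start of the tumbling window [start, start + windowSize) that contains ts. */
    public long windowStart(long ts) {
        return ts - Math.floorMod(ts, windowSize);
    }

    public long currentWatermark() {
        return watermark;
    }

    /** Classifies a record against the current watermark, then advances the watermark. */
    public Fate process(long ts) {
        long windowMaxTs = windowStart(ts) + windowSize - 1;   // last millisecond of the window
        Fate fate;
        if (windowMaxTs + allowedLateness <= watermark) {
            fate = Fate.SIDE_OUTPUT;          // window state already cleaned up
        } else if (windowMaxTs <= watermark) {
            fate = Fate.LATE_BUT_ALLOWED;     // window already fired; it fires again
        } else {
            fate = Fate.ON_TIME;              // window has not fired yet
        }
        // Bounded out-of-orderness: watermark = max timestamp seen - bound - 1.
        // Simplification: recomputed after every record instead of periodically.
        if (ts > maxTimestamp) {
            maxTimestamp = ts;
            watermark = maxTimestamp - outOfOrderness - 1;
        }
        return fate;
    }

    public static void main(String[] args) {
        LatenessSimulation sim = new LatenessSimulation(5_000, 5_000, 10_000);
        // Arrival order; event times in milliseconds relative to an arbitrary origin
        long[] arrivals = { 1_000, 12_000, 3_000, 25_000, 4_000 };
        for (long ts : arrivals) {
            Fate fate = sim.process(ts);
            System.out.println("ts=" + ts + " -> " + fate + " (watermark now " + sim.currentWatermark() + ")");
        }
    }
}
```

Running main classifies the five records as ON_TIME, ON_TIME, LATE_BUT_ALLOWED, ON_TIME and SIDE_OUTPUT. The record at 3000 still reaches the already-fired window [0, 5000) because 4999 + 10000 is greater than the watermark 6999. The record at 4000 arrives after the watermark has reached 19999, which is past 14999, so it goes to the side output. The post's source shifts records back by 3 s to 300 s, while the job tolerates only 5 s of disorder plus 10 s of lateness. This model therefore predicts that most shifted records end up in the side output and that only the smallest shifts are absorbed.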
3. Test
The raw console output is long and interleaved, so it was sorted before being checked.
The test succeeded!
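4. Pitfalls
1. This one caught me: while the job ran, no late records were ever printed, yet records were clearly missing from the results. It seemed that timeWindow(Time.milliseconds(5000)) had to be used instead of window(...). The actual difference is the window type. The earlier attempt used window(TumblingProcessingTimeWindows.of(Time.seconds(5))), and processing-time windows never treat a record as late, so nothing reaches the side output. With the EventTime characteristic, timeWindow() creates a TumblingEventTimeWindows assigner, so window(TumblingEventTimeWindows.of(Time.seconds(5))) should behave the same.
2. Since there is no official end-to-end example, I was not sure whether env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) or something else made it work. In Flink versions before 1.12 this call matters. The default characteristic was processing time, so without the call timeWindow() creates processing-time windows, and the automatic watermark interval stays 0, which means no periodic watermarks are emitted. Since Flink 1.12, event time is the default and the call is deprecated.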
When reposting, please credit the source. Original link: https://www.uj5u.com/qita/286209.html
