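Flink ships with two sink implementations that provide exactly-once semantics: one for Kafka and the other StreamingFileSink.
This post follows the logic of those implementations to build a custom sink with end-to-end exactly-once semantics.
Analysis: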
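Flink's checkpoint mechanism (based on the Chandy-Lamport algorithm):
The CheckpointCoordinator in the JobManager has barriers injected into the stream at the sources. A checkpoint counts as complete only after every operator has snapshotted its state for the data ahead of the barrier and acknowledged this to the CheckpointCoordinator.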
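As a rough illustration of the acknowledgement step, the following plain-Java sketch tracks which operators have reported their snapshot for one checkpoint. It is not Flink's CheckpointCoordinator: the class PendingCheckpointSketch, its methods, and the operator names are all hypothetical and exist only for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the bookkeeping behind one in-flight checkpoint.
// This is not Flink code; every name here is an assumption for illustration.
class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcked;

    PendingCheckpointSketch(long checkpointId, Set<String> operators) {
        this.checkpointId = checkpointId;
        this.notYetAcked = new HashSet<>(operators);
    }

    // Called once an operator has received the barrier and finished its snapshot.
    // Returns true when this acknowledgement was the last one outstanding.
    boolean acknowledge(String operator) {
        boolean removed = notYetAcked.remove(operator);
        return removed && notYetAcked.isEmpty();
    }

    // The checkpoint is complete only when no operator is still pending.
    boolean isComplete() {
        return notYetAcked.isEmpty();
    }

    long getCheckpointId() {
        return checkpointId;
    }

    public static void main(String[] args) {
        Set<String> operators = new HashSet<>(Arrays.asList("source", "flatMap", "sink"));
        PendingCheckpointSketch cp = new PendingCheckpointSketch(1L, operators);
        cp.acknowledge("source");
        cp.acknowledge("flatMap");
        System.out.println("complete after two acks: " + cp.isComplete());   // false
        cp.acknowledge("sink");
        System.out.println("complete after three acks: " + cp.isComplete()); // true
    }
}
```

The point of the sketch is only that a single missing acknowledgement keeps the whole checkpoint pending.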
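The checkpoint mechanism guarantees that no data is lost, because recovery always resumes from the last successful checkpoint. Some records may therefore be processed a second time. Flink's own state stays consistent, but a sink that has already written those records to an external system will write them again, so on its own this gives at-least-once delivery end to end, not exactly-once.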
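To make the duplication concrete, here is a minimal sketch in plain Java, with no Flink involved. A list stands in for the external system, and the offsets play the role of the checkpointed source position. The class ReplaySketch and its methods are hypothetical names for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of checkpoint-based recovery with a non-transactional sink.
class ReplaySketch {
    // Writes records[from, to) straight to the external sink, with no transaction.
    static void process(List<String> records, int from, int to, List<String> sink) {
        for (int i = from; i < to; i++) {
            sink.add(records.get(i));
        }
    }

    // checkpointOffset: source position stored in the last successful checkpoint.
    // failOffset: how far the job had written when it crashed.
    static List<String> runWithFailure(List<String> records, int checkpointOffset, int failOffset) {
        List<String> sink = new ArrayList<>();
        // First attempt: the job crashes after writing records up to failOffset.
        process(records, 0, failOffset, sink);
        // Recovery: restart from the last successful checkpoint and replay.
        process(records, checkpointOffset, records.size(), sink);
        return sink;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f");
        // Checkpoint covered offsets 0..2, crash happened after writing offset 4.
        System.out.println(runWithFailure(records, 3, 5)); // [a, b, c, d, e, d, e, f]
    }
}
```

Nothing is lost, but "d" and "e" reach the sink twice, which is exactly the at-least-once behaviour described above.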
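Flink provides a two-phase commit mechanism through TwoPhaseCommitSinkFunction, with a pre-commit phase and a commit phase. A pre-commit is not a real commit and can still be rolled back. If an operator fails between two checkpoints, the writes the sink has made but not yet committed are rolled back, and processing resumes from the last successful checkpoint. This is what gives exactly-once semantics.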
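The idea can be sketched in plain Java with lists standing in for the external store. This is a simplified model, not Flink's implementation: the class TwoPhaseSketch and its methods are hypothetical, and the sketch ignores the case where the job fails between pre-commit and commit (Flink handles that by committing the pre-committed transaction again on recovery).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a two-phase-commit sink; lists stand in for the external store.
class TwoPhaseSketch {
    private final List<String> committed = new ArrayList<>(); // visible to readers
    private List<String> open = new ArrayList<>();            // current transaction
    private List<String> preCommitted = new ArrayList<>();    // flushed, not yet visible

    void write(String record) {
        open.add(record);
    }

    // Phase 1, at the checkpoint barrier: freeze the open transaction, start a new one.
    void preCommit() {
        preCommitted = open;
        open = new ArrayList<>();
    }

    // Phase 2, once the checkpoint is complete: make the frozen writes visible.
    void commit() {
        committed.addAll(preCommitted);
        preCommitted = new ArrayList<>();
    }

    // On failure: discard everything that was never committed.
    void abort() {
        open = new ArrayList<>();
        preCommitted = new ArrayList<>();
    }

    static List<String> runWithFailure(List<String> records, int checkpointOffset, int failOffset) {
        TwoPhaseSketch sink = new TwoPhaseSketch();
        for (int i = 0; i < checkpointOffset; i++) sink.write(records.get(i));
        sink.preCommit();
        sink.commit();                 // the checkpoint at checkpointOffset succeeds
        for (int i = checkpointOffset; i < failOffset; i++) sink.write(records.get(i));
        sink.abort();                  // crash before the next checkpoint: roll back
        for (int i = checkpointOffset; i < records.size(); i++) sink.write(records.get(i));
        sink.preCommit();
        sink.commit();                 // the next checkpoint succeeds
        return sink.committed;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e", "f");
        System.out.println(runWithFailure(records, 3, 5)); // [a, b, c, d, e, f]
    }
}
```

The records written after the last checkpoint ("d" and "e") are replayed after the crash, but because their first, uncommitted write was rolled back, each record becomes visible exactly once.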
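Implementation:
The implementation extends TwoPhaseCommitSinkFunction and overrides its methods: MySQL auto-commit is turned off, the real commit happens in commit(), and abort() performs the rollback.
The main code is as follows: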
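import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// WC is a class with the fields `word` and `num` used below; its definition is not included in this post.
public class TwoPhaseCommitMysqlConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint settings
        env.setStateBackend(new FsStateBackend("hdfs://zcx1:9000/flink/ck"));
        env.enableCheckpointing(10000); // trigger a checkpoint every 10 s
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        // Keep the externalized checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);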
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zcx4:9092,zcx5:9092,zcx6:9092");
        // Only read records from committed Kafka transactions
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>("zcx2222", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafka);
        // Word count: split each line on spaces, then keep a running sum per word
        SingleOutputStreamOperator<WC> sum = dataStreamSource.flatMap(new FlatMapFunction<String, WC>() {
            @Override
            public void flatMap(String s, Collector<WC> collector) throws Exception {
                String[] split = s.split(" ");
                for (String ss : split) {
                    collector.collect(new WC(ss, 1));
                }
            }
        }).keyBy("word").sum("num");
        sum.print();
        TwoPhaseCommitSinkFunction<WC, MyConnection, Void> twoPhaseCommitSinkFunction = new TwoPhaseCommitSinkFunction<WC, MyConnection, Void>(
                new KryoSerializer<MyConnection>(MyConnection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {
            @Override
            protected void invoke(MyConnection myconnection, WC wc, Context context) throws Exception {
                // Upsert the latest count for the word inside the open transaction
                try (PreparedStatement preparedStatement = myconnection.connection.prepareStatement(
                        "insert into wc values(?,?) on duplicate key update num=?")) {
                    preparedStatement.setString(1, wc.word);
                    preparedStatement.setInt(2, wc.num);
                    preparedStatement.setInt(3, wc.num);
                    preparedStatement.executeUpdate();
                }
            }

            @Override
            protected MyConnection beginTransaction() throws Exception {
                Class.forName("com.mysql.jdbc.Driver");
                Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
                // Turn off auto-commit so nothing becomes visible before commit()
                connection.setAutoCommit(false);
                return new MyConnection(connection);
            }

            @Override
            protected void preCommit(MyConnection myconnection) throws Exception {
                // Nothing to do: the writes were already executed in invoke()
            }

            @Override
            protected void commit(MyConnection myconnection) {
                try {
                    myconnection.connection.commit();
                    myconnection.connection.close();
                } catch (SQLException e) {
                    // Rethrow so that a failed commit is not silently swallowed
                    throw new RuntimeException("MySQL commit failed", e);
                }
            }

            @Override
            protected void abort(MyConnection myconnection) {
                try {
                    myconnection.connection.rollback();
                    myconnection.connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        };
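        sum.addSink(twoPhaseCommitSinkFunction);
        env.execute();
    }
}

// Transaction handle that wraps the JDBC connection.
// Note: the field is transient, so the connection is not written into the checkpoint.
// A handle restored after a failure has connection == null and cannot finish the old transaction.
class MyConnection {
    transient Connection connection;

    public MyConnection(Connection connection) {
        this.connection = connection;
    }
}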
