系列文章目錄
Flink使用指南:Checkpoint機制,完全搞懂了,你就是大佬!
Flink使用指南: 面試必問記憶體管理模型,進大廠一定要知道!
Flink使用指南: Kafka流表關聯HBase維度表
Flink使用指南: Watermark新版本使用
Flink使用指南: Flink SQL自定義函式
1. 原理思想
Flink自帶的TwoPhaseCommitSinkFunction可以實作2pc提交方式保證資料一致性,我們先來看下實作這個類的方法:

2pc提交主要實作beginTransaction(開啟事務準備作業)、preCommit(準備提交)、commit(正式提交)、abort(丟棄)四個方法
舉個例子解釋下實作原理:
比如checkpoint每10s進行一次,此時用FlinkKafkaConsumer011實時消費kafka中的訊息,消費并處理完訊息后,進行一次預提交資料庫的操作,如果預提交沒有問題,10s后進行真正的插入資料庫操作,如果插入成功,進行一次checkpoint,flink會自動記錄消費的offset,可以將checkpoint保存的資料放到hdfs中,如果預提交出錯,比如在5s的時候出錯了,此時Flink程式就會進入不斷的重啟中,重啟的策略可以在配置中設定,當然下一次的checkpoint也不會做了,checkpoint記錄的還是上一次成功消費的offset,本次消費的資料因為在checkpoint期間,消費成功,但是預提交程序中失敗了,注意此時資料并沒有真正的執行插入操作,因為預提交(preCommit)失敗,提交(commit)程序也不會發生了,等你將例外資料處理完成之后,再重新啟動這個Flink程式,它會自動從上一次成功的checkpoint中繼續消費資料,以此來達到Kafka到Mysql的Exactly-Once,
2. 代碼實作
MySqlTwoPhaseCommitSink.java
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {
public MySqlTwoPhaseCommitSink() {
super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
}
/**
* 執行資料入庫操作
* @param connection
* @param objectNode
* @param context
* @throws Exception
*/
@Override
protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
System.err.println("start invoke.......");
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.err.println("===>date:" + date + " " + objectNode);
String value = objectNode.get("value").toString();
String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";
PreparedStatement ps = connection.prepareStatement(sql);
ps.setString(1, value);
ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
//執行insert陳述句
ps.execute();
//手動制造例外
if(Integer.parseInt(value) == 15) System.out.println(1/0);
}
/**
* 獲取連接,開啟手動提交事物(getConnection方法中)
* @return
* @throws Exception
*/
@Override
protected Connection beginTransaction() throws Exception {
String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
System.err.println("start beginTransaction......."+connection);
return connection;
}
/**
* 預提交,這里預提交的邏輯在invoke方法中
* @param connection
* @throws Exception
*/
@Override
protected void preCommit(Connection connection) throws Exception {
System.err.println("start preCommit......."+connection);
}
/**
* 如果invoke執行正常則提交事物
* @param connection
*/
@Override
protected void commit(Connection connection) {
System.err.println("start commit......."+connection);
DBConnectUtil.commit(connection);
}
@Override
protected void recoverAndCommit(Connection connection) {
System.err.println("start recoverAndCommit......."+connection);
}
@Override
protected void recoverAndAbort(Connection connection) {
System.err.println("start abort recoverAndAbort......."+connection);
}
/**
* 如果invoke執行例外則回滾事物,下一次的checkpoint操作也不會執行
* @param connection
*/
@Override
protected void abort(Connection connection) {
System.err.println("start abort rollback......."+connection);
DBConnectUtil.rollback(connection);
}
}
DBConnectUtil.java
public class DBConnectUtil {
/**
* 獲取連接
*
* @param url
* @param user
* @param password
* @return
* @throws SQLException
*/
public static Connection getConnection(String url, String user, String password) throws SQLException {
Connection conn = null;
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
conn = DriverManager.getConnection(url, user, password);
//設定手動提交
conn.setAutoCommit(false);
return conn;
}
/**
* 提交事物
*/
public static void commit(Connection conn) {
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
close(conn);
}
}
}
/**
* 事物回滾
*
* @param conn
*/
public static void rollback(Connection conn) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException e) {
e.printStackTrace();
} finally {
close(conn);
}
}
}
/**
* 關閉連接
*
* @param conn
*/
public static void close(Connection conn) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295120.html
標籤:其他
