接上一篇繼續
6. Sink 資料終端
api
6.1 基于控制臺和檔案的Sink
直接參考批處理的API即可,主要在學習和測試時使用;實際開發中,更常見的做法是在資料實時處理、統計分析完之後,
將結果存入MySQL/Kafka/Redis/HBase等外部系統。

案例演示:將詞頻統計結果資料存盤至文本檔案中,代碼如下所示:
package xx.xxxxxx.flink.sink;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Streaming word count: reads text lines from a socket, keeps a running count per word and
 * writes the results to a text file.
 */
public class StreamSinkFileDemo {
    public static void main(String[] args) throws Exception {
        // 1. Execution environment (streaming)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source: receive lines from a socket
        DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);

        // 3. Transformation: DataStream operators that turn lines into running word counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
            // a. Drop null and blank lines
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String line) throws Exception {
                    return null != line && line.trim().length() > 0;
                }
            })
            // b. Split each line into words
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    String[] words = line.trim().split("\\W+");
                    for (String word : words) {
                        // split() yields a leading "" when the line starts with a non-word
                        // character (e.g. "!hello"); do not count that as a word.
                        if (!word.isEmpty()) {
                            out.collect(word);
                        }
                    }
                }
            })
            // c. Map each word to (word, 1)
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return Tuple2.of(word, 1);
                }
            })
            // d. Group by the word (tuple field 0) and sum the counts (tuple field 1)
            .keyBy(0).sum(1);

        // 4. Sink: write to a text file. The parallelism is set on the sink itself (the object
        //    returned by writeAsText), not on the preceding aggregation operator.
        resultDataStream
            .writeAsText("datas/stream-output.txt", FileSystem.WriteMode.OVERWRITE)
            .setParallelism(1);

        // 5. Trigger execution
        env.execute(StreamSinkFileDemo.class.getSimpleName());
    }
}
6.2 自定義Sink:MySQL
需求:
將Flink集合中的資料通過自定義Sink保存到MySQL
代碼實作:
package xx.xxxxx.flink.sink.mysql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * Demo: a custom sink, built by extending {@code RichSinkFunction}, that stores stream
 * elements in a MySQL table.
 */
public class StreamSinkMySQLDemo {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    /**
     * Custom sink that inserts {@code Student} elements into {@code db_flink.t_student},
     * buffering rows with JDBC batching and flushing every {@link #BATCH_SIZE} rows and on close.
     */
    private static class MySQLSink extends RichSinkFunction<Student> {
        /** Number of buffered rows that triggers a batch insert. */
        private static final int BATCH_SIZE = 10;

        // JDBC handles are created in open(); they are never part of the serialized function.
        private transient Connection conn;
        private transient PreparedStatement pstmt;
        // Rows added to the current batch and not yet executed
        private int counter = 0;

        /**
         * Loads the JDBC driver, opens the connection and prepares the insert statement.
         *
         * @throws Exception if the driver cannot be loaded or the connection/statement fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // 1. Load the driver
            Class.forName("com.mysql.jdbc.Driver");
            // 2. Open the connection
            conn = DriverManager.getConnection(
                "jdbc:mysql://node1.itcast.cn:3306/?useUnicode=true&characterEncoding=utf-8&useSSL=false",
                "root", "123456"
            );
            // 3. Prepare the insert statement
            pstmt = conn.prepareStatement("insert into db_flink.t_student (id, name, age) values (?, ?, ?)");
        }

        /**
         * Adds one student to the current batch and executes the batch once it is full.
         *
         * @throws Exception on any JDBC failure; errors are propagated rather than printed and
         *     ignored, so a failed write is not silently dropped
         */
        @Override
        public void invoke(Student student, Context context) throws Exception {
            // Bind the statement parameters
            pstmt.setInt(1, student.id);
            pstmt.setString(2, student.name);
            pstmt.setInt(3, student.age);
            // Add to the batch
            pstmt.addBatch();
            counter++;
            if (counter >= BATCH_SIZE) {
                flush();
            }
        }

        /**
         * Flushes any rows still buffered, then releases the statement and the connection.
         * Both resources are released even if the flush or the statement close fails.
         *
         * @throws Exception if the final flush or closing a resource fails
         */
        @Override
        public void close() throws Exception {
            try {
                // pstmt is null when open() failed before preparing the statement
                if (null != pstmt && counter > 0) {
                    flush();
                }
            } finally {
                try {
                    if (null != pstmt) {
                        pstmt.close();
                    }
                } finally {
                    if (null != conn) {
                        conn.close();
                    }
                }
            }
        }

        /** Executes the pending batch insert and resets the buffered-row counter. */
        private void flush() throws Exception {
            pstmt.executeBatch();
            counter = 0;
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Source: a fixed set of elements
        DataStreamSource<Student> inputDataStream = env.fromElements(
            new Student(13, "wangwu", 20),
            new Student(14, "zhaoliu", 19),
            new Student(15, "wangwu", 20),
            new Student(16, "zhaoliu", 19)
        );
        // 3. Sink: the custom MySQL sink
        inputDataStream.addSink(new MySQLSink());
        // 4. Trigger execution
        env.execute(StreamSinkMySQLDemo.class.getSimpleName());
    }
}
此外,從Flink 1.11開始,官方提供了JDBC Connector,可以更方便地將資料保存至RDBMS表中。下面演示將
資料保存至MySQL資料庫表中,代碼如下所示:
package xx.xxxxx.flink.sink.mysql;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Flink streaming demo that uses the JDBC connector ({@code JdbcSink}) to write elements into
 * a relational database table, here a MySQL table.
 */
public class StreamSinkJdbcDemo {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    /** Binds the fields of one {@code Student} to the three placeholders of the insert statement. */
    private static class StudentStatementBuilder implements JdbcStatementBuilder<Student> {
        @Override
        public void accept(PreparedStatement statement, Student row) throws SQLException {
            statement.setInt(1, row.id);
            statement.setString(2, row.name);
            statement.setInt(3, row.age);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source: a fixed set of elements
        DataStreamSource<Student> studentDataStream = env.fromElements(
            new Student(21, "wangwu3", 20),
            new Student(22, "zhaoliu4", 19),
            new Student(23, "wangwu5", 20),
            new Student(24, "zhaoliu6", 19)
        );

        // 3. Sink: connection settings first, then the JDBC sink itself
        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("com.mysql.jdbc.Driver")
            .withUrl("jdbc:mysql://node1.itcast.cn:3306/")
            .withUsername("root")
            .withPassword("123456")
            .build();
        studentDataStream.addSink(
            JdbcSink.sink(
                "insert into db_flink.t_student (id, name, age) values (?, ?, ?)",
                new StudentStatementBuilder(),
                connectionOptions
            )
        );

        // 4. Trigger execution
        env.execute(StreamSinkJdbcDemo.class.getSimpleName());
    }
}
6.3 Kafka Sink
添加鏈接描述
需求:
將從MySQL讀取的資料轉換為JSON字串後,通過官方提供的Kafka Connector(FlinkKafkaProducer)保存到Kafka
代碼實作
package xx.xxxxx.flink.sink.kafka;
import cn.itcast.flink.source.mysql.StreamSourceMySQLDemo;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
 * Demo: takes the {@code Student} elements emitted by {@code StreamSourceMySQLDemo.MySQLSource},
 * converts each one to a JSON string and sends it to a Kafka topic through
 * {@code FlinkKafkaProducer}.
 *
 * <p>Console consumer command for inspecting the topic:
 * /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic flink-topic
 */
public class StreamSinkKafkaDemo {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    /**
     * {@code KafkaSerializationSchema} implementation that wraps each string, encoded as UTF-8
     * bytes, in a record for the configured topic. The record is created without a key.
     */
    private static class KafkaSchema implements KafkaSerializationSchema<String> {
        private String topic;

        public KafkaSchema(String topicName) {
            this.topic = topicName;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            byte[] payload = element.getBytes(StandardCharsets.UTF_8);
            return new ProducerRecord<>(topic, payload);
        }
    }

    /** Builds the Kafka producer sink for the given topic. */
    private static FlinkKafkaProducer<String> buildProducer(String topicName) {
        // a. Kafka producer properties
        Properties producerProps = new Properties();
        producerProps.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092");
        // b. Serialization schema
        KafkaSerializationSchema<String> schema = new KafkaSchema(topicName);
        // c. The producer itself
        // NOTE(review): this job never enables checkpointing; confirm that EXACTLY_ONCE
        // actually takes effect as intended without it.
        return new FlinkKafkaProducer<String>(
            topicName,
            schema,
            producerProps,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source
        DataStreamSource<StreamSourceMySQLDemo.Student> studentDataStream =
            env.addSource(new StreamSourceMySQLDemo.MySQLSource());

        // 3. Transformation: Student -> JSON string
        MapFunction<StreamSourceMySQLDemo.Student, String> toJson =
            new MapFunction<StreamSourceMySQLDemo.Student, String>() {
                @Override
                public String map(StreamSourceMySQLDemo.Student student) throws Exception {
                    return JSON.toJSONString(student);
                }
            };
        SingleOutputStreamOperator<String> jsonDataStream = studentDataStream.map(toJson);

        // 4. Sink: Kafka topic
        jsonDataStream.addSink(buildProducer("flink-topic"));

        // 5. Trigger execution
        env.execute(StreamSinkKafkaDemo.class.getSimpleName());
    }
}
6.4 Redis Sink
API
通過Flink 操作Redis 時,可以使用傳統的Jedis 連接池JedisPool 進行Redis 的相關操作;不過
Flink 也有專門操作Redis 的RedisSink,使用起來更方便,連接的建立與管理也由它負責。接下來
將主要介紹RedisSink 如何使用。
redis
RedisSink 的核心是RedisMapper 介面,使用時我們要撰寫自己的Redis 操作類來實作
這個介面中的三個方法,如下所示:
1. RedisCommandDescription getCommandDescription():
設定使用的Redis 資料結構型別(通過RedisCommand 指定),以及附加key 的名稱(例如Hash 的名稱)
2. String getKeyFromData(T data):
從資料中取出要寫入Redis 的key(對Hash 而言是field)
3. String getValueFromData(T data):
從資料中取出要寫入Redis 的value
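使用RedisCommand設定資料結構型別時,和Redis結構的對應關係如下:
HASH -> HSET
LIST -> RPUSH、LPUSH
SET -> SADD
PUBSUB -> PUBLISH
STRING -> SET
HYPER_LOG_LOG -> PFADD
SORTED_SET -> ZADD、ZREM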

可以連接到不同Redis環境(單機Redis服務、集群Redis服務及Sentinel Redis服務),分別使用對應的
Config:FlinkJedisPoolConfig(單機)、FlinkJedisClusterConfig(集群)、FlinkJedisSentinelConfig(Sentinel)
需求
將詞頻統計結果通過RedisSink(搭配自定義的RedisMapper)保存到Redis
代碼實作
package xx.xxxxxx.flink.sink.redis;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
/**
 * Demo: streaming word count whose running totals are stored in Redis through the
 * {@code RedisSink} connector.
 * https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
 */
public class StreamSinkRedisDemo {
    /**
     * Custom {@code RedisMapper} supplying the three things the sink needs: the Redis command,
     * the key and the value for each (word, count) tuple.
     */
    private static class StreamRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
        /** Uses HSET with "wordcount" as the additional key. */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "wordcount");
        }

        /** The word (tuple field 0) is used as the key. */
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        /** The count (tuple field 1), rendered as a decimal string, is used as the value. */
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return Integer.toString(data.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source: receive lines from a socket
        DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);

        // 3. Transformation: DataStream operators that turn lines into running word counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
            // a. Drop null and blank lines
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String line) throws Exception {
                    return null != line && line.trim().length() > 0;
                }
            })
            // b. Split each line into words
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    String[] words = line.trim().split("\\W+");
                    for (String word : words) {
                        // split() yields a leading "" when the line starts with a non-word
                        // character (e.g. "!hello"); skip it so no empty field reaches Redis.
                        if (!word.isEmpty()) {
                            out.collect(word);
                        }
                    }
                }
            })
            // c. Map each word to (word, 1)
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return Tuple2.of(word, 1);
                }
            })
            // d. Group by the word (tuple field 0) and sum the counts (tuple field 1)
            .keyBy(0).sum(1);

        // 4. Sink
        // a. Redis connection settings
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("node1.itcast.cn")
            .setPort(6379)
            .setDatabase(0)
            .setMinIdle(1)
            .setMaxIdle(8)
            .setMaxTotal(8)
            .build();
        // b. Create the RedisSink
        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(
            config, new StreamRedisMapper()
        );
        // c. Attach the sink
        resultDataStream.addSink(redisSink);

        // 5. Trigger execution
        env.execute(StreamSinkRedisDemo.class.getSimpleName());
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/287121.html
標籤:其他
上一篇:大資料-Hadoop2.7實作PageRank演算法-MapReduce&HDFS
下一篇:Vim入門教程
