
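The previous articles covered Flink batch processing. Today's article covers DataSource and DataSink for stream processing. (Flink series)
1. Introductory example
Compute a word count with Flink stream processing.
Steps:
- Obtain the Flink stream execution environment
- Build a socket source
- Count the words with Flink operators
- Print the result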
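Note: if nc is not installed on your Linux machine, install it with yum:
yum install -y nc
Then start a listener on the port the job reads from, before launching the job:
nc -lk 9999
Reference code: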
import org.apache.flink.streaming.api.scala._
/**
 * @author stream-processing wordCount
 * @date 2020/8/26 22:03
 * @version 1.0
 */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Receive data from a socket
    val socketData: DataStream[String] = env.socketTextStream("node01", 9999)
    // 3. Split each line into words, pair each word with 1, group by word with keyBy, and add up the counts with sum
    val result = socketData.flatMap(_.split(" ")).map((_, 1)).keyBy(_._1).sum(1)
    // 4. Print the result and start the job
    result.print("stream wordCount")
    env.execute("stream wordCount")
  }
}
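To get a feel for what the flatMap / map / keyBy / sum chain prints, here is a small plain-Scala simulation that needs no Flink at all. It is only a sketch: the object name RunningWordCount and the helper runningCounts are made up for illustration, and a real job may interleave keys differently when it runs with a parallelism above 1. The point it shows is that a keyed sum emits an updated total for every incoming word instead of one final total per word.

```scala
// Plain-Scala simulation of flatMap -> map -> keyBy -> sum; no Flink classes are used.
// RunningWordCount and runningCounts are hypothetical names chosen for this sketch.
object RunningWordCount {
  // Returns one (word, runningCount) record per incoming word, in arrival order.
  def runningCounts(lines: Seq[String]): Seq[(String, Int)] = {
    // Per-key state, playing the role of the state that keyBy + sum maintains
    val totals = scala.collection.mutable.Map.empty[String, Int]
    lines
      .flatMap(_.split(" ").toList) // split every line into words
      .map { word =>
        val updated = totals.getOrElse(word, 0) + 1
        totals(word) = updated      // remember the new total for this key
        (word, updated)             // emit the updated total immediately
      }
  }

  def main(args: Array[String]): Unit =
    runningCounts(Seq("hello flink", "hello world")).foreach(println)
}
```

For the two input lines above, the simulation prints (hello,1), (flink,1), (hello,2), (world,1): the second "hello" produces a new record with the count 2 and the earlier record is not retracted.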
2. Common Flink sources for stream processing
Note: the sources Flink offers for stream processing are largely the same as the ones it offers for batch processing.
2.1 Source based on a local collection
I won't go through every variant here, since this is only an introduction. To learn more about local data sources, see the DataSource article.
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/20 18:57
 * @version 1.0
 */
object StreamDataSource {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set with fromCollection
    val data = env.fromCollection(List("張三", "李四", "王五"))
    // 3. Print
    data.print()
    env.execute("StreamDataSource")
  }
}
2.2 Source based on a local file
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/20 19:05
 * @version 1.0
 */
object StreamFileSource {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set from a file
    val dataSource = env.readTextFile("./data/wordcount.txt")
    // 3. Print
    dataSource.print()
    env.execute("StreamFileSource")
  }
}
2.3 Custom sources
Besides the predefined sources, we can define our own by implementing SourceFunction and registering it with StreamExecutionEnvironment.addSource(sourceFunction). A source that reads from Kafka, for example, is added with addSource(new FlinkKafkaConsumer08<>). We can implement any of the following three interfaces to define a custom source:
2.3.1 SourceFunction: creating a non-parallel source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/20 19:22
 * @version 1.0
 */
object StreamCustomerNoParallelSource {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Use the custom source
    val dataSource = env.addSource(new MyNoParallel()).setParallelism(1)
    // 3. Print
    dataSource.print()
    // 4. Run the program
    env.execute("StreamCustomerNoParallelSource")
  }
  class MyNoParallel() extends SourceFunction[Long] {
    // The next number to emit
    var number: Long = 1L
    // Loop flag; volatile because cancel() can be called from another thread
    @volatile var isRunning: Boolean = true
    override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
      while (isRunning) {
        sourceContext.collect(number)
        number += 1
        Thread.sleep(1000)
        // Stop once 1 to 9 have been emitted
        if (number == 10) {
          cancel()
        }
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
}
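The run/cancel pair above is a cooperative stop: run() keeps looping while a flag is true, and cancel() only flips that flag. The sketch below imitates this contract in plain Scala, with no Flink classes, to show why the flag should be visible across threads. CountingSource and CancelDemo are hypothetical names invented for this illustration, and the 1 ms sleep is only there to keep the worker slow enough to be cancelled early.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the run/cancel contract; this is not a Flink class.
class CountingSource(limit: Long) {
  // cancel() is called from a different thread than run(), so the flag is volatile
  @volatile private var isRunning = true

  // Emits 1, 2, 3, ... until cancelled or until `limit` values have been emitted
  def run(collect: Long => Unit): Unit = {
    var number = 1L
    while (isRunning && number <= limit) {
      collect(number)
      number += 1
    }
  }

  // Only flips the flag; the loop in run() notices it on its next iteration
  def cancel(): Unit = isRunning = false
}

object CancelDemo {
  // Runs the source on a worker thread, cancels it from the calling thread
  // after at least three elements, and returns how many elements were emitted.
  def emittedBeforeCancel(): Long = {
    val emitted = new AtomicLong(0L)
    val source = new CountingSource(limit = 1000000L)
    val worker = new Thread(() => source.run { _ =>
      emitted.incrementAndGet()
      Thread.sleep(1) // slow the source down a little
    })
    worker.start()
    while (emitted.get() < 3) Thread.sleep(1) // wait for a few elements
    source.cancel()
    worker.join() // returns because run() saw the flag and left its loop
    emitted.get()
  }

  def main(args: Array[String]): Unit =
    println(s"emitted before cancel: ${emittedBeforeCancel()}")
}
```

The worker stops long before reaching the limit, because the cancelling thread's write to the flag is seen by the loop in run().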
2.3.2 ParallelSourceFunction: creating a parallel source
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/20 20:40
 * @version 1.0
 */
object StreamCustomerParallelSource {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Create the data from the custom ParallelSource (raise the parallelism above 1 to run several copies of the source)
    val source = env.addSource(new MyParallelSource()).setParallelism(1)
    // 3. Print the output
    source.print()
    // 4. Run the job
    env.execute("StreamCustomerParallelSource")
  }
  class MyParallelSource() extends ParallelSourceFunction[Long] {
    // 1. The next number to emit
    var number: Long = 1L
    // 2. Loop flag; volatile because cancel() can be called from another thread
    @volatile var isRunning: Boolean = true
    override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
      while (isRunning) {
        sourceContext.collect(number)
        number += 1
        // Stop once 1 to 20 have been emitted
        if (number > 20) {
          cancel()
        }
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
}
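With a parallel source, every parallel subtask runs its own copy of run(), so with a parallelism of n each number is emitted n times. The plain-Scala sketch below only simulates that effect and uses no Flink classes. ParallelSourceSimulation and emitAll are hypothetical names for this illustration, and the subtask number stands in for the subtask index Flink would assign.

```scala
// Plain-Scala simulation of a source whose run() loop is executed once per subtask.
// ParallelSourceSimulation and emitAll are hypothetical names; no Flink classes are used.
object ParallelSourceSimulation {
  // Every subtask emits the full sequence 1..upTo, tagged with its own index.
  def emitAll(parallelism: Int, upTo: Long): Seq[(Int, Long)] =
    for {
      subtask <- 0 until parallelism // one independent copy of the source per subtask
      n <- 1L to upTo                // the same loop body as in run()
    } yield (subtask, n)

  def main(args: Array[String]): Unit =
    emitAll(parallelism = 2, upTo = 3L).foreach(println)
}
```

With parallelism 2 and upTo 3 this yields six records, each of 1, 2 and 3 appearing once per subtask. In a real job the records of different subtasks arrive interleaved rather than grouped by subtask.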
2.3.3 RichParallelSourceFunction: creating a parallel source
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/20 20:48
 * @version 1.0
 */
object StreamCustomerRichParallelSource {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set from the RichParallelSource, running two parallel copies
    val dataSource = env.addSource(new RichParallelSource()).setParallelism(2)
    // 3. Print every record as it arrives
    dataSource.map(line => {
      println("Received: " + line)
      line
    })
    env.execute("StreamCustomerRichParallelSource")
  }
  class RichParallelSource() extends RichParallelSourceFunction[Long] {
    // 1. The next number to emit
    var number: Long = 1L
    // 2. Loop flag; volatile because cancel() can be called from another thread
    @volatile var isRunning: Boolean = true
    override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
      while (isRunning) {
        sourceContext.collect(number)
        number += 1
        Thread.sleep(1000)
        // Stop once 1 to 20 have been emitted
        if (number > 20) {
          cancel()
        }
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
  }
}
2.3.4 Source based on Kafka
I won't go over the common Kafka commands here. If you want to learn them, see the article on common Kafka operations.
Code example:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
/**
 * @author consume data from Kafka
 * @date 2020/9/21 22:53
 * @version 1.0
 */
object StreamKafkaSource {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Specify the topic to consume
    val topic = "FlinkAsKafka"
    // 2.1 Set the consumer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "test01")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // 3. Build the Flink Kafka consumer
    val kafka = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), props)
    // 4. Start consuming from the latest offset
    kafka.setStartFromLatest()
    // 5. Build the source from the Kafka consumer
    val data = env.addSource(kafka)
    // 6. Print the output
    data.print()
    env.execute("StreamKafkaSource")
  }
}
2.3.5 Source based on MySQL
The Kafka source above ships with Flink. Next, let's follow the same pattern and write a source that reads data from MySQL.
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
/**
 * @author source based on MySQL
 * @date 2020/9/21 23:17
 * @version 1.0
 */
object StreamFromMysqlSource {
  case class User(id: String, user_id: String, user_name: String, phone: String, lan_id: String, region_id: String)
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Add the custom MySQL source
    val source = env.addSource(new MySqlSource())
    // 3. Print the output
    source.print()
    // 4. Run the job
    env.execute("StreamFromMysqlSource")
  }
  class MySqlSource() extends RichSourceFunction[User] {
    // 1. Declare the Connection object
    var connection: Connection = null
    // 2. Declare the PreparedStatement object
    var ps: PreparedStatement = null
    // 3. Loop flag, set to false by cancel() so that run() stops reading rows
    @volatile var isRunning: Boolean = true
    // Called once before run(): open the connection and prepare the query
    override def open(parameters: Configuration): Unit = {
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://node01:3306/datax_web"
      val username = "root"
      val password = "123456"
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      val sql =
        """
          |SELECT id,user_id,user_name,phone,lan_id,region_id
          |FROM user
          |""".stripMargin
      ps = connection.prepareStatement(sql)
    }
    // Run the query once and emit one User per row
    override def run(sourceContext: SourceFunction.SourceContext[User]): Unit = {
      val queryResultSet = ps.executeQuery()
      while (isRunning && queryResultSet.next()) {
        val id = queryResultSet.getString("id")
        val user_id = queryResultSet.getString("user_id")
        val user_name = queryResultSet.getString("user_name")
        val phone = queryResultSet.getString("phone")
        val lan_id = queryResultSet.getString("lan_id")
        val region_id = queryResultSet.getString("region_id")
        sourceContext.collect(User(id, user_id, user_name, phone, lan_id, region_id))
      }
    }
    override def cancel(): Unit = {
      isRunning = false
    }
    // Release the JDBC resources on shutdown: the statement first, then the connection
    override def close(): Unit = {
      if (ps != null) {
        ps.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }
}
3. Common Flink DataSinks
3.1 Sinking data to a local file
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/22 22:41
 * @version 1.0
 */
object StreamFileSourceSinkFile {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set from a local file
    val data = env.readTextFile("./data/wordcount.txt")
    // 3. Write to a file (parallelism 1 produces a single output file)
    data.writeAsText("./data/wordcountSink.txt").setParallelism(1)
    // 4. Start the job
    env.execute("StreamFileSourceSinkFile")
  }
}
3.2 Sinking a local collection to a file
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/22 22:50
 * @version 1.0
 */
object StreamFromCollectionSourceFile {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set with fromCollection
    val data = env.fromCollection(List((1, "張三"), (2, "李四"), (1, "趙劉")))
    // 3. Write the data to a file
    data.writeAsText("./data/fromCollection.txt").setParallelism(1)
    // 4. Run the job
    env.execute("StreamFromCollectionSourceFile")
  }
}
3.3 Sinking data to HDFS
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/22 22:41
 * @version 1.0
 */
object StreamFileSourceSinkFileHDFS {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set from a local file
    val data = env.readTextFile("./data/wordcount.txt")
    // 3. Write to a file on HDFS
    data.writeAsText("hdfs://node01:8020/data/wordcountSink.txt").setParallelism(1)
    // 4. Start the job
    env.execute("StreamFileSourceSinkFileHDFS")
  }
}
3.4 Sinking data to Kafka
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
/**
 * @author
 * @date 2020/9/22 23:17
 * @version 1.0
 */
object StreamKafkaSink {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Build the data set
    val dataSource: DataStream[String] = env.fromElements("1,小麗,北京,女")
    // 3. Build the producer configuration
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    // 4. Create the Kafka producer for the target topic
    val producer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String]("FlinkAsKafka", new SimpleStringSchema(), prop)
    // 5. Send the data to Kafka
    dataSource.addSink(producer)
    // 6. Run the job
    env.execute("StreamKafkaSink")
  }
}
3.5 Sinking data to MySQL
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
 * @author
 * @date 2020/9/22 23:35
 * @version 1.0
 */
object StreamMysqlSink {
  case class Student(id: Int, name: String, addr: String, sex: String)
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Prepare the data
    val dataSource: DataStream[Student] = env.fromElements(
      Student(1, "張三", "上海", "男"),
      Student(2, "李四", "北京", "女"),
      Student(3, "王五", "上海", "男"),
      Student(4, "趙劉", "廣東", "男")
    )
    // 3. Add the custom MySQL sink and run the job
    dataSource.addSink(new StudentSinkToMysql)
    env.execute("StreamMysqlSink")
  }
  class StudentSinkToMysql extends RichSinkFunction[Student] {
    var connection: Connection = null
    var ps: PreparedStatement = null
    // Called once before any record arrives: open the connection and prepare the insert
    override def open(parameters: Configuration): Unit = {
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://node01:3306/text?characterEncoding=utf-8&useSSL=false"
      val username = "root"
      val password = "123456"
      // Load the driver
      Class.forName(driver)
      // Create the connection
      connection = DriverManager.getConnection(url, username, password)
      ps = connection.prepareStatement("insert into student(id,name,addr,sex) values (?,?,?,?);")
    }
    // Release the JDBC resources on shutdown: the statement first, then the connection
    override def close(): Unit = {
      if (ps != null) {
        ps.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
    // Called once per record: bind the fields and run the insert
    override def invoke(value: Student): Unit = {
      ps.setInt(1, value.id)
      ps.setString(2, value.name)
      ps.setString(3, value.addr)
      ps.setString(4, value.sex)
      ps.executeUpdate()
    }
  }
}
