主頁 > 後端開發 > Flink從入門到入土(詳細教程)

Flink從入門到入土(詳細教程)

2020-09-13 12:39:47 後端開發

和其他所有的計算框架一樣,flink也有一些基礎的開發步驟以及基礎,核心的API,從開發步驟的角度來講,主要分為四大部分

Flink從入門到入土

1.Environment

Flink從入門到入土

Flink Job在提交執行計算時,需要首先建立和Flink框架之間的聯系,也就指的是當前的flink運行環境,只有獲取了環境資訊,才能將task調度到不同的taskManager執行,而這個環境物件的獲取方式相對比較簡單

// 批處理環境
val env = ExecutionEnvironment.getExecutionEnvironment
// 流式資料處理環境
val env = StreamExecutionEnvironment.getExecutionEnvironment

 

2.Source

Flink從入門到入土

Flink框架可以從不同的來源獲取資料,將資料提交給框架進行處理, 我們將獲取資料的來源稱之為資料源.

2.1.從集合讀取資料

一般情況下,可以將資料臨時存盤到記憶體中,形成特殊的資料結構后,作為資料源使用,這里的資料結構采用集合型別是比較普遍的

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從集合讀取資料
 */
object SourceList {

  def main(args: Array[String]): Unit = {
      //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.從集合中讀取資料
    val sensorDS: DataStream[WaterSensor] = env.fromCollection(
      // List(1,2,3,4,5)
      List(
        WaterSensor("ws_001", 1577844001, 45.0),
        WaterSensor("ws_002", 1577844015, 43.0),
        WaterSensor("ws_003", 1577844020, 42.0)
      )
    )
    //3.列印
    sensorDS.print()
    //4.執行
    env.execute("sensor")

  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

2.2從檔案中讀取資料

通常情況下,我們會從存盤介質中獲取資料,比較常見的就是將日志檔案作為資料源

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從檔案讀取資料
 */
object SourceFile {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取資料
    val fileDS: DataStream[String] = env.readTextFile("input/data.log")

    //3.列印
    fileDS.print()

    //4.執行
    env.execute("sensor")

  }
}
/**
 * 在讀取檔案時,檔案路徑可以是目錄也可以是單一檔案,如果采用相對檔案路徑,會從當前系統引數user.dir中獲取路徑
 * System.getProperty("user.dir")
 */
/**
 * 如果在IDEA中執行代碼,那么系統引數user.dir自動指向專案根目錄,
 * 如果是standalone集群環境, 默認為集群節點根目錄,當然除了相對路徑以外,
 * 也可以將路徑設定為分布式檔案系統路徑,如HDFS
 val fileDS: DataStream[String] =
 env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")
 */

 

Flink從入門到入土

如果是standalone集群環境, 默認為集群節點根目錄,當然除了相對路徑以外,也可以將路徑設定為分布式檔案系統路徑,如HDFS

val fileDS: DataStream[String] =
env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")

 

默認讀取時,flink的依賴關系中是不包含Hadoop依賴關系的,所以執行上面代碼時,會出現錯誤,

Flink從入門到入土

解決方法就是增加相關依賴jar包就可以了

Flink從入門到入土

2.3 kafka讀取資料

Kafka作為訊息傳輸佇列,是一個分布式的,高吞吐量,易于擴展地基于主題發布/訂閱的訊息系統,在現今企業級開發中,Kafka 和 Flink成為構建一個實時的資料處理系統的首選

2.3.1 引入kafka連接器的依賴

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

 

2.3.2 代碼實作參考

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從kafka讀取資料
 */
object SourceKafka {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop02:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val kafkaDS: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](
        "sensor",
        new SimpleStringSchema(),
        properties)
    )
    kafkaDS.print()
    env.execute("sensor")
  }
}

 

2.4 自定義資料源

大多數情況下,前面的資料源已經能夠滿足需要,但是難免會存在特殊情況的場合,所以flink也提供了能自定義資料源的方式

2.4.1  創建自定義資料源

import com.atyang.day01.Source.SourceList.WaterSensor
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

/**
 * description: ss 
 * date: 2020/8/28 20:36 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:自定義資料源
 */
class MySensorSource extends SourceFunction[WaterSensor] {
  var flg = true
  override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
    while ( flg ) {
      // 采集資料
      ctx.collect(
        WaterSensor(
          "sensor_" +new Random().nextInt(3),
          1577844001,
          new Random().nextInt(5)+40
        )
      )
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = {
    flg = false;
  }
}

 

Flink從入門到入土

3.Transform

Flink從入門到入土

在Spark中,算子分為轉換算子和行動算子,轉換算子的作用可以通過算子方法的呼叫將一個RDD轉換另外一個RDD,Flink中也存在同樣的操作,可以將一個資料流轉換為其他的資料流,

轉換程序中,資料流的型別也會發生變化,那么到底Flink支持什么樣的資料型別呢,其實我們常用的資料型別,Flink都是支持的,比如:Long, String, Integer, Int, 元組,樣例類,List, Map等,

3.1 map

  • 映射:將資料流中的資料進行轉換, 形成新的資料流,消費一個元素并產出一個元素
  • 引數:Scala匿名函式或MapFunction
  • 回傳:DataStream
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從集合讀取資料
 */
object Transfrom_map {

  def main(args: Array[String]): Unit = {
      //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //2.從集合中讀取資料
    val sensorDS: DataStream[WaterSensor] = env.fromCollection(
      // List(1,2,3,4,5)
      List(
        WaterSensor("ws_001", 1577844001, 45.0),
        WaterSensor("ws_002", 1577844015, 43.0),
        WaterSensor("ws_003", 1577844020, 42.0)
      )
    )

    val sensorDSMap = sensorDS.map(x => (x.id+"_1",x.ts+"_1",x.vc + 1))

    //3.列印
    sensorDSMap.print()
    //4.執行
    env.execute("sensor")

  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)


}

 

Flink從入門到入土

3.1.1 MapFunction

Flink為每一個算子的引數都至少提供了Scala匿名函式和函式類兩種的方式,其中如果使用函式類作為引數的話,需要讓自定義函式繼承指定的父類或實作特定的介面,例如:MapFunction

sensor-data.log 檔案資料

sensor_1,1549044122,10
sensor_1,1549044123,20
sensor_1,1549044124,30
sensor_2,1549044125,40
sensor_1,1549044126,50
sensor_2,1549044127,60
sensor_1,1549044128,70
sensor_3,1549044129,80
sensor_3,1549044130,90
sensor_3,1549044130,100
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從檔案讀取資料
 */
object SourceFileMap {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取資料
    val fileDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    val MapDS = fileDS.map(
      lines => {
        //更加逗號切割 獲取每個元素
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )

    //3.列印
    MapDS.print()

    //4.執行
    env.execute("map")

  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)

}

 

Flink從入門到入土

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從檔案讀取資料
 */
object Transform_MapFunction {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

     sensorDS.map()

    //3.列印
  //  MapDS.print()

    //4.執行
    env.execute("map")

  }

  /**
   * 自定義繼承 MapFunction
   * MapFunction[T,O]
   * 自定義輸入和輸出
   *
   */
  class MyMapFunction extends MapFunction[String,WaterSensor]{
    override def map(t: String): WaterSensor = {

      val datas: Array[String] = t.split(",")

      WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)
    }
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)

}

 

Flink從入門到入土

3.1.2 RichMapFunction

所有Flink函式類都有其Rich版本,它與常規函式的不同在于,可以獲取運行環境的背景關系,并擁有一些生命周期方法,所以可以實作更復雜的功能,也有意味著提供了更多的,更豐富的功能,例如:RichMapFunction

sensor-data.log 檔案資料 同上一致

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:從檔案讀取資料
 */
object Transform_RichMapFunction {

  def main(args: Array[String]): Unit = {
    //1.創建執行的環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.從指定路徑獲取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    val myMapDS: DataStream[WaterSensor] = sensorDS.map(new MyRichMapFunction)

    //3.列印
    myMapDS.print()

    //4.執行
    env.execute("map")

  }

  /**
   * 自定義繼承 MapFunction
   * MapFunction[T,O]
   * 自定義輸入和輸出
   *
   */
  class MyRichMapFunction extends RichMapFunction[String,WaterSensor]{

    override def map(value: String): WaterSensor = {
      val datas: Array[String] = value.split(",")
      //      WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      WaterSensor(getRuntimeContext.getTaskName, datas(1).toLong, datas(2).toInt)
    }

    // 富函式提供了生命周期方法
    override def open(parameters: Configuration): Unit = {}

    override def close(): Unit = {}


  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)

}

 

Rich Function有一個生命周期的概念,典型的生命周期方法有:

  • open()方法是rich function的初始化方法,當一個算子例如map或者filter被調 用之前open()會被呼叫
  • close()方法是生命周期中的最后一個呼叫的方法,做一些清理作業
  • getRuntimeContext()方法提供了函式的RuntimeContext的一些資訊,例如函式執行         的并行度,任務的名字,以及state狀態

3.1.3 flatMap

  • 扁平映射:將資料流中的整體拆分成一個一個的個體使用,消費一個元素并產生零到多個元素
  • 引數:Scala匿名函式或FlatMapFunction
  • 回傳:DataStream

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_FlatMap {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val listDS: DataStream[List[Int]] = env.fromCollection(
      List(
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1)
      )
    )

    val resultDS: DataStream[Int] = listDS.flatMap(list => list)

    resultDS.print()


    // 4. 執行
    env.execute()
  }


}

 

Flink從入門到入土

3.2. filter

  • 過濾:根據指定的規則將滿足條件(true)的資料保留,不滿足條件(false)的資料丟棄
  • 引數:Scala匿名函式或FilterFunction
  • 回傳:DataStream

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:Filter
 */
object Transform_Filter {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val listDS: DataStream[List[Int]] = env.fromCollection(
      List(
        List(1, 2, 3, 4,1, 2, 3, 4),
        List(5, 6, 7,1,1,1,1, 2, 3, 4,1, 2, 3, 4),
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1),
        List(1, 2, 3, 4),
        List(5, 6, 7,1,1,1)
      )
    )
    // true就留下,false就拋棄
    listDS.filter(num => {
      num.size>5
      })
      .print("filter")
    // 4. 執行
    env.execute()
  }
}

 

Flink從入門到入土

3.3 keyBy

在Spark中有一個GroupBy的算子,用于根據指定的規則將資料進行分組,在flink中也有類似的功能,那就是keyBy,根據指定的key對資料進行分流

  • 分流:根據指定的Key將元素發送到不同的磁區,相同的Key會被分到一個磁區(這里磁區指的就是下游算子多個并行節點的其中一個),keyBy()是通過哈希來磁區的

Flink從入門到入土

 

  • 引數:Scala匿名函式或POJO屬性或元組索引,不能使用陣列

  • 回傳:KeyedStream

Flink從入門到入土

 

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_KeyBy {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    //3.轉換為樣例類
    val mapDS = sensorDS.map(
      lines => {
        val datas = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )

    // 4. 使用keyby進行分組
    // TODO 關于回傳的key的型別:
    // 1. 如果是位置索引 或 欄位名稱 ,程式無法推斷出key的型別,所以給一個java的Tuple型別
    // 2. 如果是匿名函式 或 函式類 的方式,可以推斷出key的型別,比較推薦使用
    // *** 分組的概念:分組只是邏輯上進行分組,打上了記號(標簽),跟并行度沒有絕對的關系
    //      同一個分組的資料在一起(不離不棄)
    //      同一個磁區里可以有多個不同的組

    //        val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy(0)
    //    val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy("id")
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
    //    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(
    //      new KeySelector[WaterSensor, String] {
    //        override def getKey(value: WaterSensor): String = {
    //          value.id
    //        }
    //      }
    //    )

    sensorKS.print().setParallelism(5)

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.4 shuffle

  • 打亂重組(洗牌):將資料按照均勻分布打散到下游
  • 引數:無
  • 回傳:DataStream

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Shuffle {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    val shuffleDS = sensorDS.shuffle

    sensorDS.print("data")

    shuffleDS.print("shuffle")
    // 4. 執行
    env.execute()
  }
}

 

Flink從入門到入土

3.5. split

在某些情況下,我們需要將資料流根據某些特征拆分成兩個或者多個資料流,給不同資料流增加標記以便于從流中取出,

Flink從入門到入土

需求:將水位傳感器資料按照空高高低(以40cm,30cm為界),拆分成三個流

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Split {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val splitSS: SplitStream[WaterSensor] = mapDS.split(
      sensor => {
        if (sensor.vc < 40) {
          Seq("normal")
        } else if (sensor.vc < 80) {
          Seq("Warn")
        } else {
          Seq("alarm")
        }
      }
    )

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

3.6 select

將資料流進行切分后,如何從流中將不同的標記取出呢,這時就需要使用select算子了,

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Split {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val splitDS: SplitStream[WaterSensor] = mapDS.split(
      sensor => {
        if (sensor.vc < 40) {
          Seq("info")
        } else if (sensor.vc < 80) {
          Seq("warn")
        } else {
          Seq("error")
        }
      }
    )
    val errorDS: DataStream[WaterSensor] = splitDS.select("error")
    val warnDS: DataStream[WaterSensor] = splitDS.select("warn")
    val infoDS: DataStream[WaterSensor] = splitDS.select("info")

    infoDS.print("info")
    warnDS.print("warn")
    errorDS.print("error")

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.7 connect

在某些情況下,我們需要將兩個不同來源的資料流進行連接,實作資料匹配,比如訂單支付和第三方交易資訊,這兩個資訊的資料就來自于不同資料源,連接后,將訂單支付和第三方交易資訊進行對賬,此時,才能算真正的支付完成,

Flink中的connect算子可以連接兩個保持他們型別的資料流,兩個資料流被Connect之后,只是被放在了一個同一個流中,內部依然保持各自的資料和形式不發生任何變化,兩個流相互獨立,

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Connect {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )

    // 4. 從集合中再讀取一條流
    val numDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6))

    val resultCS: ConnectedStreams[WaterSensor, Int] = mapDS.connect(numDS)

    // coMap表示連接流呼叫的map,各自都需要一個 function
    resultCS.map(
      sensor=>sensor.id,
      num=>num+1
    ).print()

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.8 union

對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream

Flink從入門到入土

connect與 union 區別:

  1. union之前兩個流的型別必須是一樣,connect可以不一樣
  2. connect只能操作兩個流,union可以操作多個,

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:FlatMap
 */
object Transform_Union {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2. 從集合中讀取流
    val num1DS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4))
    val num2DS: DataStream[Int] = env.fromCollection(List(7, 8, 9, 10))
    val num3DS: DataStream[Int] = env.fromCollection(List(17, 18, 19, 110))

    // TODO union 真正將多條流合并成一條流
    // 合并的流,型別必須一致
    // 可以合并多條流,只要型別一致
    num1DS.union(num2DS).union(num3DS)
      .print()
    

    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.9 Operator

Flink作為計算框架,主要應用于資料計算處理上, 所以在keyBy對資料進行分流后,可以對資料進行相應的統計分析

3.9.1 滾動聚合算子(Rolling Aggregation)

這些算子可以針對KeyedStream的每一個支流做聚合,執行完成后,會將聚合的結果合成一個流回傳,所以結果都是DataStream

sum()

Flink從入門到入土

min()

Flink從入門到入土

max()

Flink從入門到入土

3.9.2 reduce

一個分組資料流的聚合操作,合并當前的元素和上次聚合的結果,產生一個新的值,回傳的流中包含每一次聚合的結果,而不是只回傳最后一次聚合的最終結果,

Flink從入門到入土

 

import org.apache.flink.streaming.api.scala._

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:Reduce
 */
object Transform_Reduce {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
    // 輸入的型別一樣,輸出型別和輸出型別也要一樣
    // 組內的第一條資料,不進入reduce計算
    val reduceDS: DataStream[WaterSensor] = sensorKS.reduce(
      (ws1, ws2) => {
        println(ws1 + "<===>" + ws2)
        WaterSensor(ws1.id, System.currentTimeMillis(), ws1.vc + ws2.vc)
      }
    )
    reduceDS.print("reduce")
    // 4. 執行
    env.execute()
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

Flink從入門到入土

3.9.3process

Flink在資料流通過keyBy進行分流處理后,如果想要處理程序中獲取環境相關資訊,可以采用process算子自定義實作 1)繼承KeyedProcessFunction抽象類,并定義泛型:[KEY, IN, OUT]

class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String]{}
重寫方法
// 自定義KeyedProcessFunction,是一個特殊的富函式
  // 1.實作KeyedProcessFunction,指定泛型:K - key的型別, I - 上游資料的型別, O - 輸出的資料型別
  // 2.重寫 processElement方法,定義 每條資料來的時候 的 處理邏輯

/**
      * 處理邏輯:來一條處理一條
      *
      * @param value 一條資料
      * @param ctx   背景關系物件
      * @param out   采集器:收集資料,并輸出
      */
    override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
      out.collect("我來到process啦,分組的key是="+ctx.getCurrentKey+",資料=" + value)
      // 如果key是tuple,即keyby的時候,使用的是 位置索引 或 欄位名稱,那么key獲取到是一個tuple
//      ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手動引入Java的Tuple
    }

 

完整代碼:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 陽斌
 *         郵箱:[email protected]
 *         類的說明:Reduce
 */
object Transform_Process {

  def main(args: Array[String]): Unit = {

    // 1.創建執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 2.讀取資料
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")

    // 3.轉換成樣例類
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    //按照ID  進行分組
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)

    sensorKS.process(new MyKeyedProcessFunction)

    // 4. 執行
    env.execute()
  }

  // 自定義KeyedProcessFunction,是一個特殊的富函式
  // 1.實作KeyedProcessFunction,指定泛型:K - key的型別, I - 上游資料的型別, O - 輸出的資料型別
  // 2.重寫 processElement方法,定義 每條資料來的時候 的 處理邏輯
  class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String] {
    /**
     * 處理邏輯:來一條處理一條
     *
     * @param value 一條資料
     * @param ctx   背景關系物件
     * @param out   采集器:收集資料,并輸出
     */
    override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
      out.collect("我來到process啦,分組的key是="+ctx.getCurrentKey+",資料=" + value)
      // 如果key是tuple,即keyby的時候,使用的是 位置索引 或 欄位名稱,那么key獲取到是一個tuple
      //      ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手動引入Java的Tuple
    }
  }

  /**
   * 定義樣例類:水位傳感器:用于接收空高資料
   *
   * @param id 傳感器編號
   * @param ts 時間戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}

 

4.Sink

Flink從入門到入土

Sink有下沉的意思,在Flink中所謂的Sink其實可以表示為將資料存盤起來的意思,也可以將范圍擴大,表示將處理完的資料發送到指定的存盤系統的輸出操作

之前我們一直在使用的print方法其實就是一種Sink,

  @PublicEvolving
    public DataStreamSink<T> print(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false);
        return this.addSink(printFunction).name("Print to Std. Out");
    }

 

官方提供了一部分的框架的sink,除此以外,需要用戶自定義實作sink

Flink從入門到入土

Flink從入門到入土

本文作者:Java知音@陽斌

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/24435.html

標籤:Java

上一篇:資料結構—B樹、B+樹、B*樹

下一篇:為什么使用了索引查實還是慢?

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more