前幾篇博客給大家講解了Flink的運行時架構與環境搭建等??Flink專欄今天正式給大家講解下代碼入門的DataSource,
一、Flink入門 WordCount
記得剛學Hadoop中的MapReduce的時候第一個撰寫的代碼就是WordCount,但是使用MapReduce撰寫wordCount的時候代碼大約有三四十行代碼吧,后來又用Spark實作WordCount代碼只有三四行代碼,那為什么還要學習Flink那???Flink專欄
1.1 準備作業
匯入maven包
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.2</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<hadoop.version>2.6.0</hadoop.version>
<flink.version>1.7.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<iheart.version>1.4.3</iheart.version>
<fastjson.version>1.2.7</fastjson.version>
</properties>
<dependencies>
<!-- 匯入scala的依賴 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 匯入flink streaming和scala的依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 匯入flink和scala的依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 指定flink-client API的版本 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 匯入flink-table的依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 指定hadoop-client API的版本 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 指定mysql-connector的依賴 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- 指定fastjson的依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.3.0</version>
</dependency>
<!-- 指定flink-connector-kafka的依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 指定 json/xml 轉物件的依賴包 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.9.9</version>
</dependency>
<!-- 指定 redis的依賴包 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.1</version><!--版本號可根據實際情況填寫-->
</dependency>
<!--匯入redis依賴的包-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 該插件用于將 Scala 代碼編譯成 class 檔案 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- 宣告系結到 maven 的 compile 階段 -->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
準備資料
hello word hello word
hello flink hello spark
hello kafka hello java
1.2 代碼
import org.apache.flink.api.scala._
/**
* @author 批處理wordCount
* @date 2020/8/23 23:03
* @version 1.0
*/
object WordCount {
def main(args: Array[String]): Unit = {
//1.構建運行環境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//2.從檔案中讀取資料
//2.1 設定讀取檔案的路勁
val inputPath: String ="./data/wordcount.txt"
val data: DataSet[String] = env.readTextFile(inputPath)
//3.根據單詞進行分組,然后進行sum求和
//3.1使用FlatMap進行單詞分詞
val dataFlatMap = data.flatMap(_.split(" "))
//3.2將切分出來的單詞進行添加1
val dataMap = dataFlatMap.map((_, 1))
//3.3 按照單詞進行分組
val dataGrepBy = dataMap.groupBy(0)
//3.4 分組后求和
val dataSum = dataGrepBy.sum(1)
//4.列印輸出
dataSum.print()
}
}
二、輸入資料集 Data Sources
Data Sources 是什么呢?就字面意思其實就可以知道:資料來源,
Flink 做為一款流式計算框架,它可用來做批處理,即處理靜態的資料集、歷史的資料 集;也可以用來做流處理,即實時的處理些實時資料流,實時的產生資料流結果,只要資料 源源不斷 的過來,Flink 就能夠一直計算下去,這個 Data Sources 就是資料的來源地, flink 在批處理中常見的 source 主要有兩大類,
2.1 基于本地集合的 source(Collection-based-source)
1、使用 env.fromElements(),這種方式也支持 Tuple,自定義物件等復合形式,
import org.apache.flink.api.scala._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
* @author 使用FromElements來讀取集合中的資料
* @date 2020/8/26 23:00
* @version 1.0
*/
object BachFromElements {
def main(args: Array[String]): Unit = {
//1.構建運行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用FromElements構建資料流字串
val strData: DataSet[String] = env.fromElements("1", "2", "3")
strData.print()
//3.使用FromElements構建資料流 tuple
val tupleData: DataSet[(String, Int)] = env.fromElements(("張三", 1), ("李四", 2))
tupleData.print()
//4.使用FromElements構建資料流 Array
val arrayData: DataSet[Array[String]] = env.fromElements(Array("張三", "李四", "王五", "趙劉"))
arrayData.print()
//5.使用FromElements構建資料流ArrayBuffer
val arrayBufferData = env.fromElements(ArrayBuffer("張三", "李四", "王五", "趙劉"))
arrayBufferData.print()
//6.使用FromElements構建資料流List
val listData = env.fromElements(List("張三", "李四", "王五", "趙劉"))
listData.print()
//7.使用FromElements構建資料流ListBuffer
val listBufferData = env.fromElements(ListBuffer("張三", "李四", "王五", "趙劉"))
listBufferData.print()
//8.使用FromElements構建資料流Vector
val vectorData = env.fromElements(Vector("張三", "李四", "王五", "趙劉"))
vectorData.print()
//9.使用FromElements構建資料流Queue
val queueData = env.fromElements(mutable.Queue("張三", "李四", "王五", "趙劉"))
queueData.print()
//10.使用FromElements構建資料流Stack
val stackData = env.fromElements(mutable.Stack("張三", "李四", "王五", "趙劉"))
stackData.print()
//11.使用FromElements構建資料流Stream
val StreamData = env.fromElements(Stream("張三", "李四", "王五", "趙劉"))
StreamData.print()
//12.使用FromElements構建資料流Seq
val seqData = env.fromElements(Seq("張三", "李四", "王五", "趙劉"))
seqData.print()
//13.使用FromElements構建資料流Set
val SetData = env.fromElements(Set("張三", "李四", "王五", "趙劉"))
SetData.print()
//14.使用FromElements構建資料流Iterable
val iterableData = env.fromElements(Iterable("張三", "李四", "王五", "趙劉"))
iterableData.print()
//15.使用FromElements構建資料流Iterable
val arraySeqData = env.fromElements(mutable.ArraySeq("張三", "李四", "王五", "趙劉"))
arraySeqData.print()
//16.使用FromElements構建資料流ArrayStack
val arrayStackData = env.fromElements(mutable.ArrayStack("張三", "李四", "王五", "趙劉"))
arrayStackData.print()
//17.使用FromElements構建資料流Map
val mapData = env.fromElements(Map("張三" -> 1, "李四" -> 2))
mapData.print()
//18.使用FromElements構建資料流range
val rangeData = env.fromElements(Range(1, 10))
rangeData.print()
}
}
2、使用 env.fromCollection(),這種方式支持多種 Collection 的具體型別
3、使用 env.generateSequence()方法創建基于 Sequence 的 DataSet
import org.apache.flink.api.scala._
/**
* @author 使用FromCollection與generateSequence 構建資料集
* @date 2020/8/27 22:24
* @version 1.0
*/
object BachFromCollectionASGenerateSequence {
def main(args: Array[String]): Unit = {
//1.構建運行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用FromCollection構建Array資料集 設定并行度為1
val arrayData = env.fromCollection(Array("張三", 1, "李四", 2)).setParallelism(1)
arrayData.print()
//3.使用generateSequence構建資料集 類似于range(1,4)
val data = env.generateSequence(1, 4).setParallelism(1)
data.print()
}
}
2.2 基于檔案的 source(File-based-source)
2.2.1讀取本地檔案(txt)
import org.apache.flink.api.scala._
/**
* @author 讀取本地檔案
* @date 2020/8/27 22:31
* @version 1.0
*/
object BachSourceFromFileAsLocalText {
def main(args: Array[String]): Unit = {
//1.構建運行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.讀取本地檔案
val data = env.readTextFile("./data/wordcount.txt")
//3.接結果輸出
data.print()
}
}
2.2.2讀取 HDFS 資料(txt)
import org.apache.flink.api.scala._
/**
* @author 讀取HDFS 資料
* @date 2020/8/27 22:31
* @version 1.0
*/
object BachSourceFromFileAsLocalText {
def main(args: Array[String]): Unit = {
//1.構建運行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.讀取本地檔案
val data = env.readTextFile("hdfs://node01:8020/wordCount.txt")
data.print()
}
}
2.2.3讀取 CSV 資料
我們對csv檔案操作的時候往往有時候需要表頭的但是有時候也不需要表頭的,所以我在這里給大家將2中方式第一種是帶表頭的,第二種是不帶表頭的,
根據原始碼顯示:ignoreFirstLine 為true的時候是不帶表頭的,(默認是帶表頭的)所以我們可以根據實際情況來判斷 ignoreFirstLine 為true還是為false
import org.apache.flink.api.scala._
/**
* @author 讀取本地檔案csv 第一個需要表頭 第二種不需要表頭
* @date 2020/8/27 22:33
* @version 1.0
*/
object BachSourceFileAsLocalCsv {
case class user(name:String,age:String)
def main(args: Array[String]): Unit = {
//1.構建運行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.讀取資料本地csv檔案帶表頭
// 注意:讀取csv檔案是必須定義一個物件來接收負責報錯
val headData = env.readCsvFile[user]("./data/user.csv",fieldDelimiter = ",")
//3.讀取資料本地csv檔案不帶表頭 默認是帶頭部的
val notHeadData = env.readCsvFile[user]("./data/user.csv",fieldDelimiter = ",",ignoreFirstLine = true)
//4.分別輸出
headData.print()
notHeadData.print()
}
}
2.2.4讀取壓縮檔案
flink 支持對一個檔案目錄內的所有檔案,包括所有子目錄中的所有檔案的遍歷訪問方 式,對于從檔案中讀取資料,當讀取的數個檔案夾的時候,嵌套的檔案默認是不會被讀取的, 只會讀取第一個檔案,其他的都會被忽略,所以我們需要使用 recursive.file.enumeration 進 行遞回讀取,
注意:上述講解的都使用迭代的方式進行獲取資料,
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
/**
* @author 讀取本地的zip檔案
* @date 2020/8/27 22:57
* @version 1.0
*/
object BachSourceFromAsLocalZip {
def main(args: Array[String]): Unit = {
//1.構建運行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.開啟遞回
val parameters = new Configuration()
parameters.setBoolean("recursive.file.enumeration",true)
//2.讀取zip檔案
val zipData = env.readTextFile("./data/wordcount.txt.gz").withParameters(parameters)
//3.結果輸出
zipData.print()
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/4350.html
標籤:其他
