批計算和流計算
在軟體系統中,尤其是企業級軟體,基本離不開資料統計和分析等資料計算,最初,多數常見的統計分析都是基于資料庫的資料進行處理,例如某一段時間的活躍用戶數統計,這種計算方式稱作離線計算,也稱作批量計算(個人理解),
而現實世界中的資料產生方式有很多都是持續不斷的,也就是說實際很多場景的資料是就是資料流,這些資料隨著時間的流逝,價值會不斷的降低,因此就需要盡可能實時的進行處理,
而批計算是一批資料一起處理,尤其是最初資料先入資料庫,再拿出來處理,這種方式在資料量日漸爆發的場景下,對于實時分析的業務就會有很多瓶頸,于是漸漸的出現了流計算,
相對于傳統的批計算而言,流計算更加的實時,基本是在資料產生并接收到的同時就進行處理,更加符合當前很多要求實時計算的場景,
flink發展
流計算,或者說實時計算,最初代表性的技術是storm,除此之外還有spark,目前對這兩個技識訓沒有深入了解,據說storm是真正的實時計算,而spark所謂的實時計算,實際只是縮小了批計算的范圍,嚴格來說依然還是批計算,
因此實際上spark的實時計算沒有storm快,但是spark支持實時計算的同時還支持批計算以及機器學習,并且也有它豐富的生態圈,因此spark應用場景也很廣,
spark是2013年貢獻給apache基金會,在2014年左右正式流行起來,而flink實際上也是差不多的時間貢獻給apache基金會,但是當時卻沒有spark流行,由于乍一聽起來和spark功能很重合,都是同時支持批計算、流計算和機器學習,所以之前甚至有人說它生不逢時,
直到2015年之后,阿里巴巴開始注意到這個框架并大量使用和改進,在經過了若干次雙11的洗禮之后,這個框架的能力越來越被大眾接收,使用的公司也越來越多,于是flink似乎翻身了,
上邊說spark的實時計算實際上只是縮小了批計算的范圍,而flink的實時計算則是真正的實時計算,所以flink實時計算的性能也要強于spark,
在flink的思想中,資料處理都是基于資料流,實時計算的資料流稱作無界流,批計算的資料流稱作有界流,
無界流
所謂的無界流,就是一段資料有開始時間,沒有結束時間,其實就是資料持續在產生,需要持續的分析處理,
有界流
同理,有界流就是一段資料有開始時間,也有結束時間,所以其實也很容易發現有界流其實就是無界流的一個特例,在無界流中定義一個結束時間的話,這一段資料就是有界流,
flink的概念還有很多,例如jobManager、taskManager、solt等,對于flink集群來說,還有master和worker,這些概念均關聯很多其他技術點,后續再進一步深入,
抱著先知其然再知其所以然的心態,這里先搭建一個簡單的flink集群用起來,
flink簡單集群搭建(centos)
flink集群搭建相對簡單,首先需要下載安裝包,我下載的目前官網最新版1.11.2,可在官網https://flink.apache.org/downloads.html處查看最新版本并下載,下載方式多種,我這里使用wget直接下載到虛擬機:
wget https://downloads.apache.org/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz
下載好了之后是解壓:
tar -zxf flink-1.11.2-bin-scala_2.11.tgz
然后是簡單的配置,分為兩步,一個是配置jobManager,一個是配置taskManager,其他配置暫時默認,
jobManager只需要修改解壓后目錄的conf目錄的flink-conf.yaml檔案,找到jobmanager.rpc.address這一行,把后邊的localhost改為實際的jobManager節點的主機名,我這里就是node001,因此配置修改如下:
jobmanager.rpc.address: node001
然后同樣是conf目錄下,修改workers檔案,早期版本的可能不叫workers,而是slaves,這個和hbase新舊版本中檔案命名有點像,在workers檔案中加入規劃的taskManager節點主機名,例如我修改后如下:
node002
node003
node004
上述配置需要各個節點保持一致,所以需要把修改好的檔案包括整個flink分發到其他機器上,也就是把解壓后的這個flink的目錄傳到另外幾個節點上,例如我是在node001上操作的,然后就可以使用類似如下的命令分發到其他機器:
scp -r flink-1.11.2 node002:`pwd`/
分發完成之后,各個機器需要配置一下環境變數,修改/etc/profile檔案,加入flink內容,然后就可以在規劃的jobManager節點執行啟動命令啟動flink集群:
start-cluster.sh
上述命令實際是flink解壓后目錄下的bin目錄下的腳本,執行上述腳本后會看到日志依次列印出各個節點的啟動情況:
Starting cluster.
Starting standalonesession daemon on host node001.
Starting taskexecutor daemon on host node002.
Starting taskexecutor daemon on host node003.
Starting taskexecutor daemon on host node004.
啟動完成就要可以進行驗證,最直接的驗證就是訪問自帶的web界面,默認就是開啟的,使用8081埠,例如我這里就可以使用http://node001:8081進行訪問,
除此之外,為了更進一步的驗證,參考官網示例,可以寫一個簡單的java代碼驗證,
java程式驗證flink
撰寫一個簡單的flink程式,需要引入flink相應的依賴包,如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.2</version>
</dependency>
然后根據官網https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html的示例撰寫如下代碼:
package com.tzx.study.demo.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class FlinkTest {
public static void main(String [] args)throws Exception{
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 8888)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
上述代碼的意思是,創建一個執行環境,如果是idea等開發工具運行,就創建本地運行環境,如果是把程式生成可執行jar放到flink集群運行,就是集群環境,
然后建立一個本地的埠是8888的socket檔案資料流連接,讀到每行資料以空格分隔,然后計算數量,
上述代碼在main方法中,因此是可以直接運行的,需要注意的是,運行之前需要先開啟8888的socket埠監聽,否則會啟動失敗,如果是本地idea測驗,需要windows上啟動這個埠,我是直接執行nc -lp 8888命令,后來把這個程式生成jar放到集群環境中運行,就需要在運行的linux節點中監聽這個埠,例如nc -lk 8888,windows和linux中稍有區別,
當然了,如果要更方便的驗證,也完全可以直接把localhost換成實際的主機名,這樣就不需要分別在不同環境啟動這個埠,
運行之后,在nc監聽的界面輸入相應資訊,便可以看到實時輸出的統計資料,代表簡單的flink集群和程式驗證成功,也標志著第一步成功邁出,接下來就是基于此的進一步應用和理解,
CSDN認證博客專家
web安全
系統安全
安全架構
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/171977.html
標籤:其他
上一篇:qt呼叫簡單udp程式出錯!
