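In the previous chapter, "[3] Flink, the Sharp Sword of Big Data Stream Processing: Flink's Deployment Architecture", I described Flink's basic deployment architecture. In this chapter I'll walk you through installing a cluster with the latest Flink release. As the saying goes, supplies move before the troops, so the first step is to download the Flink package from the official Flink website. The latest Flink version at the time of writing is 1.14.3.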

Version 1.14.3 comes in two builds, one for Scala 2.11 and one for Scala 2.12; I'll install the Scala 2.12 build. Download URL: https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz. On a Linux CentOS machine, I ran the following commands:
wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz --no-check-certificate
tar -vxf flink-1.14.3-bin-scala_2.12.tgz
ls
cd flink-1.14.3
Running the tree command shows the following directory and file structure:
.
├── bin
│   ├── bash-java-utils.jar
│   ├── config.sh
│   ├── find-flink-home.sh
│   ├── flink
│   ├── flink-console.sh
│   ├── flink-daemon.sh
│   ├── historyserver.sh
│   ├── jobmanager.sh
│   ├── kubernetes-jobmanager.sh
│   ├── kubernetes-session.sh
│   ├── kubernetes-taskmanager.sh
│   ├── pyflink-shell.sh
│   ├── sql-client.sh
│   ├── standalone-job.sh
│   ├── start-cluster.sh
│   ├── start-zookeeper-quorum.sh
│   ├── stop-cluster.sh
│   ├── stop-zookeeper-quorum.sh
│   ├── taskmanager.sh
│   ├── yarn-session.sh
│   └── zookeeper.sh
├── conf
│   ├── flink-conf.yaml
│   ├── log4j-cli.properties
│   ├── log4j-console.properties
│   ├── log4j.properties
│   ├── log4j-session.properties
│   ├── logback-console.xml
│   ├── logback-session.xml
│   ├── logback.xml
│   ├── masters
│   ├── workers
│   └── zoo.cfg
├── examples
│   ├── batch
│   │   ├── ConnectedComponents.jar
│   │   ├── DistCp.jar
│   │   ├── EnumTriangles.jar
│   │   ├── KMeans.jar
│   │   ├── PageRank.jar
│   │   ├── TransitiveClosure.jar
│   │   ├── WebLogAnalysis.jar
│   │   └── WordCount.jar
│   ├── gelly
│   │   └── flink-gelly-examples_2.12-1.14.3.jar
│   ├── python
│   │   ├── datastream
│   │   │   ├── event_time_timer.py
│   │   │   ├── __init__.py
│   │   │   ├── process_json_data.py
│   │   │   ├── state_access.py
│   │   │   └── word_count.py
│   │   └── table
│   │       ├── __init__.py
│   │       ├── mixing_use_of_datastream_and_table.py
│   │       ├── multi_sink.py
│   │       ├── pandas
│   │       │   ├── conversion_from_dataframe.py
│   │       │   ├── __init__.py
│   │       │   └── pandas_udaf.py
│   │       ├── process_json_data.py
│   │       ├── process_json_data_with_udf.py
│   │       ├── windowing
│   │       │   ├── __init__.py
│   │       │   ├── over_window.py
│   │       │   ├── session_window.py
│   │       │   ├── sliding_window.py
│   │       │   └── tumble_window.py
│   │       └── word_count.py
│   ├── streaming
│   │   ├── Iteration.jar
│   │   ├── SessionWindowing.jar
│   │   ├── SocketWindowWordCount.jar
│   │   ├── StateMachineExample.jar
│   │   ├── TopSpeedWindowing.jar
│   │   ├── Twitter.jar
│   │   ├── WindowJoin.jar
│   │   └── WordCount.jar
│   └── table
│       ├── AdvancedFunctionsExample.jar
│       ├── ChangelogSocketExample.jar
│       ├── GettingStartedExample.jar
│       ├── StreamSQLExample.jar
│       ├── StreamWindowSQLExample.jar
│       ├── UpdatingTopCityExample.jar
│       └── WordCountSQLExample.jar
├── lib
│   ├── flink-csv-1.14.3.jar
│   ├── flink-dist_2.12-1.14.3.jar
│   ├── flink-json-1.14.3.jar
│   ├── flink-shaded-zookeeper-3.4.14.jar
│   ├── flink-table_2.12-1.14.3.jar
│   ├── log4j-1.2-api-2.17.1.jar
│   ├── log4j-api-2.17.1.jar
│   ├── log4j-core-2.17.1.jar
│   └── log4j-slf4j-impl-2.17.1.jar
├── LICENSE
├── licenses
│   ├── LICENSE.antlr-java-grammar-files
│   ├── LICENSE.antlr-runtime
│   ├── LICENSE-aopalliance
│   ├── LICENSE.asm
│   ├── LICENSE.automaton
│   ├── LICENSE.base64
│   ├── LICENSE.bouncycastle
│   ├── LICENSE.google-auth-library-credentials
│   ├── LICENSE.grizzled-slf4j
│   ├── LICENSE-hdrhistogram
│   ├── LICENSE.icu4j
│   ├── LICENSE.influx
│   ├── LICENSE.janino
│   ├── LICENSE.javax.activation
│   ├── LICENSE.jaxb
│   ├── LICENSE.jdom
│   ├── LICENSE.jline
│   ├── LICENSE.jsr166y
│   ├── LICENSE.jzlib
│   ├── LICENSE.kryo
│   ├── LICENSE.minlog
│   ├── LICENSE.protobuf
│   ├── LICENSE.py4j
│   ├── LICENSE.pyrolite
│   ├── LICENSE-re2j
│   ├── LICENSE.reflections
│   ├── LICENSE.scala
│   ├── LICENSE.scopt
│   ├── LICENSE.slf4j-api
│   ├── LICENSE-stax2api
│   └── LICENSE.webbit
├── log
├── NOTICE
├── opt
│   ├── flink-azure-fs-hadoop-1.14.3.jar
│   ├── flink-cep_2.12-1.14.3.jar
│   ├── flink-cep-scala_2.12-1.14.3.jar
│   ├── flink-gelly_2.12-1.14.3.jar
│   ├── flink-gelly-scala_2.12-1.14.3.jar
│   ├── flink-oss-fs-hadoop-1.14.3.jar
│   ├── flink-python_2.12-1.14.3.jar
│   ├── flink-queryable-state-runtime-1.14.3.jar
│   ├── flink-s3-fs-hadoop-1.14.3.jar
│   ├── flink-s3-fs-presto-1.14.3.jar
│   ├── flink-shaded-netty-tcnative-dynamic-2.0.39.Final-14.0.jar
│   ├── flink-shaded-zookeeper-3.5.9.jar
│   ├── flink-sql-client_2.12-1.14.3.jar
│   ├── flink-state-processor-api_2.12-1.14.3.jar
│   └── python
│       ├── cloudpickle-1.2.2-src.zip
│       ├── py4j-0.10.8.1-src.zip
│       └── pyflink.zip
├── plugins
│   ├── external-resource-gpu
│   │   ├── flink-external-resource-gpu-1.14.3.jar
│   │   ├── gpu-discovery-common.sh
│   │   └── nvidia-gpu-discovery.sh
│   ├── metrics-datadog
│   │   └── flink-metrics-datadog-1.14.3.jar
│   ├── metrics-graphite
│   │   └── flink-metrics-graphite-1.14.3.jar
│   ├── metrics-influx
│   │   └── flink-metrics-influxdb-1.14.3.jar
│   ├── metrics-jmx
│   │   └── flink-metrics-jmx-1.14.3.jar
│   ├── metrics-prometheus
│   │   └── flink-metrics-prometheus-1.14.3.jar
│   ├── metrics-slf4j
│   │   └── flink-metrics-slf4j-1.14.3.jar
│   ├── metrics-statsd
│   │   └── flink-metrics-statsd-1.14.3.jar
│   └── README.txt
└── README.txt
26 directories, 146 files
Listing only the directories (tree -d) gives:
[root@localhost flink-1.14.3]# tree . -d
.
├── bin
├── conf
├── examples
│   ├── batch
│   ├── gelly
│   ├── python
│   │   ├── datastream
│   │   └── table
│   │       ├── pandas
│   │       └── windowing
│   ├── streaming
│   └── table
├── lib
├── licenses
├── log
├── opt
│   └── python
└── plugins
    ├── external-resource-gpu
    ├── metrics-datadog
    ├── metrics-graphite
    ├── metrics-influx
    ├── metrics-jmx
    ├── metrics-prometheus
    ├── metrics-slf4j
    └── metrics-statsd
26 directories
Then run the command that starts the cluster:
[flink@localhost bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host localhost.localdomain.
Starting taskexecutor daemon on host localhost.localdomain.
Starting the cluster on a single machine
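This starts a Job Manager and Task Manager(s) on the local machine. To verify, open http://127.0.0.1:8081/#/overview in a browser (if Flink is not installed on your local machine, replace 127.0.0.1 with that machine's IP address). In the author's setup, the overview page showed one Job Manager and three Task Managers. Note that with the stock conf/workers file, which lists localhost only once, start-cluster.sh starts a single Task Manager (matching the single "Starting taskexecutor daemon" line above); each additional localhost line in conf/workers starts one more Task Manager on the same machine.
From the overview page we can see:
#1) There is only 1 Job Manager (IP address 127.0.0.1)

#2) There are 3 Task Managers (on the same machine, using different ports)

So all 3 Task Manager nodes and the 1 Job Manager node run on the same server (127.0.0.1, the local machine).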
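Besides the browser, the same counts can be read from the JobManager's REST API, which is served on the same port as the Web UI. The sketch below assumes Java 11+ and a reachable REST endpoint; it requests /overview and reads the taskmanagers, slots-total and slots-available fields. The class name FlinkOverviewCheck and the regex-based field extraction are illustrative choices, not part of Flink.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: queries the JobManager REST endpoint /overview and prints
// cluster counts. Assumes the REST API listens on the Web UI address (port 8081).
final class FlinkOverviewCheck {

    // Pulls an integer field such as "taskmanagers" out of a flat JSON object.
    // A regex keeps the sketch dependency-free; real tooling should use a JSON library.
    static int extractInt(String json, String field) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + field);
        }
        return Integer.parseInt(m.group(1));
    }

    public static void main(String[] args) throws Exception {
        // Base URL of the master, e.g. http://192.168.1.100:8081; defaults to the local machine
        String base = args.length > 0 ? args[0] : "http://127.0.0.1:8081";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(base + "/overview")).GET().build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("TaskManagers:    " + extractInt(body, "taskmanagers"));
        System.out.println("Total slots:     " + extractInt(body, "slots-total"));
        System.out.println("Available slots: " + extractInt(body, "slots-available"));
    }
}
```

Passing http://192.168.1.100:8081 as the argument checks the multi-machine cluster set up below in the same way.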
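What if we don't want to run all of these Flink services on one machine?
Starting the cluster on multiple machines
Suppose we have three virtual machines with the following IP addresses:
- 192.168.1.100
- 192.168.1.101
- 192.168.1.102
We use 192.168.1.100 as the Job Manager (master) node, and 192.168.1.101 and 192.168.1.102 as the Task Manager (slave) nodes. The resulting architecture is one Job Manager on 192.168.1.100 and two Task Managers on 192.168.1.101 and 192.168.1.102.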

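The configuration steps are as follows:
Step 1: Edit <flink_home>/conf/flink-conf.yaml
Open flink-conf.yaml and set jobmanager.rpc.address to 192.168.1.100, which makes 192.168.1.100 the master node:
jobmanager.rpc.address: 192.168.1.100

Step 2: Edit <flink_home>/conf/masters and set its content to 192.168.1.100:8081 (the master's host and Web UI port)

Step 3: Edit <flink_home>/conf/workers (older Flink releases named this file slaves) and add the IP addresses of the two Task Manager (slave) nodes, one per line:
192.168.1.101
192.168.1.102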
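To make the role of conf/workers concrete, here is a simplified model, written as a hypothetical Java class WorkersFile, of how the standalone start scripts interpret that file: every non-empty line, after stripping # comments and surrounding whitespace, names one host, and one Task Manager is started per line. This is an assumption-level sketch of the shell scripts' behavior, not Flink code; it also explains why listing localhost three times yields three Task Managers on one machine.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of how conf/workers is read:
// one host per non-empty line, '#' starts a comment, one TaskManager per entry.
final class WorkersFile {

    static List<String> parse(String content) {
        List<String> hosts = new ArrayList<>();
        for (String line : content.split("\\R")) {
            int hash = line.indexOf('#');
            // Drop the comment part, then surrounding whitespace
            String host = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!host.isEmpty()) {
                hosts.add(host); // duplicates are kept: each line means one more TaskManager
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        String workers = "192.168.1.101\n192.168.1.102\n";
        for (String host : parse(workers)) {
            System.out.println("TaskManager will be started on " + host);
        }
    }
}
```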

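Step 4: Use scp to copy Flink to 192.168.1.101 and 192.168.1.102. Copy the extracted, already configured directory rather than the original .tgz archive, so that the edited conf files reach the worker nodes as well. The target path (/opt/flink/flink-1.14.3 here) should match the path used on the master, because the start script launches the remote Task Managers through that same path:
scp -r flink-1.14.3 flink@192.168.1.101:/opt/flink
scp -r flink-1.14.3 flink@192.168.1.102:/opt/flink
Step 5: On the master node (192.168.1.100), run start-cluster.sh from the bin directory:
./start-cluster.sh
The script starts the Job Manager locally and connects over SSH to every host listed in conf/workers to start a Task Manager there, which is why it prompts for the other nodes' passwords while it runs (set up passwordless SSH to avoid this).
Step 6: Open the master node's Web UI (http://192.168.1.100:8081); if both Task Managers are listed, the cluster has been installed successfully.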

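How to make the Job Manager highly available
With the configuration above, losing a single Task Manager (slave) node does not take down the cluster, but there is only one Job Manager (master) node: if it fails, the whole cluster goes down. So how can the Job Manager (master) node be made highly available? Flink offers two approaches:
- ZooKeeper-based HA: the Job Manager (master) node does not need to be deployed on K8S
- Kubernetes-based HA, which relies on K8S features: the Job Manager (master) node must be deployed on K8S

For details, refer to the Flink HA documentation.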
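Running an example on the cluster from the command line
With the Flink cluster in place, let's run an example. The examples folder of the Flink distribution contains many examples; we'll try SocketWindowWordCount.jar.

The main class of SocketWindowWordCount.jar is as follows: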
package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // Host and port of the text server to read from
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --hostname <hostname> --port <port>', where hostname (localhost by default) and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text into the command line");
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // Split lines into words, key by word, and sum counts per 5-second tumbling processing-time window
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy(value -> value.word)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // Print the results with a single thread rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Simple POJO holding a word and its count
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
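- Step 1
As the code shows, the job reads the data stream to process from a socket on a given port (we assume 9999).
We use the nc command to simulate a socket data stream:
yum install nc
nc -lk 9999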
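- Step 2:
On 192.168.1.100 (the master node), go to the bin directory and run the flink client with the following command, where --hostname is the IP address of the machine running nc (192.100.30.131 in the author's environment):
./flink run -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount ../examples/streaming/SocketWindowWordCount.jar --hostname 192.100.30.131 --port 9999
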
- Step 3: Log in to the Web UI console
After logging in to the Web UI console, we can see the deployed job.

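- Step 4: Type some strings into the nc console.
Note that words are separated by whitespace, since the job splits each line with the regular expression \s.
- Step 5: Go back to the Flink UI console and check the result
In the Flink UI console, click Task Managers, find the node whose Free Slots value is 0 (the one running the job), and open its Stdout tab to see the output: one "word : count" line per word for each 5-second window.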
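To see where those counts come from, the plain-Java sketch below (no Flink involved; the class TumblingWordCountSim is hypothetical) models the job's logic: it splits each line with the same \s regex, assigns each word to a 5-second tumbling window by its arrival time, and sums the counts per word inside each window. Unlike the real job, it computes all windows in one batch instead of emitting each window when its 5 seconds elapse, and the arrival timestamps are made-up inputs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of SocketWindowWordCount's windowed counting.
final class TumblingWordCountSim {

    static final long WINDOW_MS = 5_000L; // same size as Time.seconds(5)

    // Start of the tumbling window containing the timestamp (offset 0),
    // e.g. 7300 ms falls into the window [5000, 10000).
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // Input: (arrival time in ms, line of text). Output: windowStart -> (word -> count).
    static Map<Long, Map<String, Long>> count(List<Map.Entry<Long, String>> lines) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Map.Entry<Long, String> line : lines) {
            Map<String, Long> window = result.computeIfAbsent(windowStart(line.getKey()), k -> new TreeMap<>());
            // Same tokenization as the job: value.split("\\s")
            for (String word : line.getValue().split("\\s")) {
                window.merge(word, 1L, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = List.of(
                Map.entry(1_000L, "hello flink"),
                Map.entry(2_500L, "hello world"),
                Map.entry(6_000L, "flink"));
        // Print in the job's "word : count" format, one block per window
        count(input).forEach((start, counts) -> {
            System.out.println("window [" + start + ", " + (start + WINDOW_MS) + ")");
            counts.forEach((word, n) -> System.out.println(word + " : " + n));
        });
    }
}
```

Because the windows are tumbling, each word is counted in exactly one window; "flink" therefore appears once in the first window and once in the second rather than as a running total.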
- Step 6: Cancel the job
Once the test is done, we can cancel the current job by clicking Running Jobs --> Cancel Job.
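The same cancellation can also be requested programmatically through the REST API endpoint PATCH /jobs/:jobid?mode=cancel. The sketch below assumes Java 11+, a reachable JobManager REST address, and a job ID copied from the Web UI; the class name CancelJob is hypothetical. Alternatively, the flink client offers a cancel command that takes the job ID.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: cancels a running job via the JobManager REST API,
// the programmatic counterpart of Running Jobs --> Cancel Job in the Web UI.
final class CancelJob {

    // Builds the cancel URI from the REST base URL and the job ID shown in the Web UI.
    static URI cancelUri(String restBase, String jobId) {
        String base = restBase.endsWith("/") ? restBase.substring(0, restBase.length() - 1) : restBase;
        return URI.create(base + "/jobs/" + jobId + "?mode=cancel");
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: CancelJob <rest-base-url> <job-id>");
            return;
        }
        HttpRequest request = HttpRequest.newBuilder(cancelUri(args[0], args[1]))
                .method("PATCH", HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // A 202 Accepted status is expected when the cancellation request is accepted
        System.out.println("HTTP " + response.statusCode());
    }
}
```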

The job then appears in the Completed Jobs list with the status CANCELED.

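Running an example on the cluster through the Web UI
Click "Submit New Job" on the left to get a button for uploading a Flink application JAR directly; fill in the corresponding parameters and click Submit. The effect is exactly the same as submitting from the command line, so I won't go into it further.

It is worth noting that there is a Parallelism parameter. If you enter 2, each of the following steps runs as two parallel instances (the print sink stays at parallelism 1, because the code sets it explicitly):
- the FlatMap processing
- the Window processing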
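With two Window instances, keyBy decides which instance counts which word. The hypothetical KeyDistributionSketch below is a simplified model of that routing: each key is mapped to one of 128 key groups (Flink's default maximum parallelism for small jobs), and contiguous ranges of key groups are assigned to each parallel instance. Real Flink additionally scrambles hashCode() with a murmur hash, which this sketch omits, so its concrete instance numbers will differ from Flink's; what carries over is that the same word always reaches the same instance, so its per-window count stays complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of keyBy routing across parallel window instances.
final class KeyDistributionSketch {

    static final int MAX_PARALLELISM = 128; // Flink's default max parallelism for small jobs

    // Key -> key group (Flink also applies a murmur hash here; omitted in this sketch)
    static int keyGroup(String key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Key group -> parallel instance index: contiguous key-group ranges per instance
    static int subtaskFor(String key, int parallelism) {
        return keyGroup(key) * parallelism / MAX_PARALLELISM;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String word : List.of("hello", "flink", "world", "hello")) {
            bySubtask.computeIfAbsent(subtaskFor(word, parallelism), k -> new ArrayList<>()).add(word);
        }
        bySubtask.forEach((subtask, words) -> System.out.println("window instance " + subtask + " <- " + words));
    }
}
```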

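Summary
Above, I covered how to install a cluster on a single machine and across multiple machines, and mentioned that ZooKeeper can make the Flink cluster's master node highly available as well. Finally, I demonstrated how to deploy the SocketWindowWordCount.jar example and observe the job's execution and output in the Flink Web UI. In the next chapter, I'll take you into the concrete usage of Flink.
Please credit the source when reposting. Article link: https://www.uj5u.com/qita/423270.html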
