本期與大家分享的是,小北精心整理的大資料學習筆記,資料采集工具FlinkX 的詳細介紹,希望對大家能有幫助,喜歡就給點鼓勵吧,記得三連哦!歡迎各位大佬評論區指教討論!
💜🧡💛制作不易,各位大佬們給點鼓勵!
🧡💛💚點贊👍、收藏、關注
💛💚💙歡迎各位大佬指教,一鍵三連走起!
一、FlinkX 簡介
FlinkX 是一個基于 Flink 的資料同步工具。FlinkX 可以采集靜態資料,如 MySQL、HDFS 等,也可以采集實時變化的資料,如 MySQL binlog、Kafka 等。同時 FlinkX 也是一個支持原生 FlinkSql 所有語法和特性的計算框架,并提供了大量的案例。FlinkX 目前包括以下功能:
- 大多數插件支持并發讀寫資料,可以大大提高讀寫速度;
- 部分插件支持故障恢復功能,可以從故障位置恢復任務,節省運行時間;
- 關系資料庫的源插件支持間隔輪詢, 可不斷采集變化的資料
- 部分資料庫支持開啟 Kerberos 安全認證;
- 限制源(source)插件的讀取速度,減少對業務資料庫的影響;
- 寫資料時保存臟資料
- 限制臟資料的最大數量
- 多種運行模式:Local、Standalone、Yarn Session、Yarn Per-Job;
- 同步任務支持執行flinksql語法的transformer操作
- sql 任務支持與 flinkSql 自帶的連接器共用。
FlinkX 開源地址: https://github.com/DTStack/flinkx
二、FlinkX 安裝
首先,需要安裝unzip
yum install unzip
1、上傳并解壓
unzip flinkx-1.10.zip -d /usr/local/soft/
2、配置環境變數
vim /etc/profile
export FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$PATH:$FLINKX_HOME/bin
source /etc/profile
3、給bin/flinkx這個檔案加上執行權限
# Use the full path so the command works from any directory
# (FLINKX_HOME is exported in /etc/profile in step 2).
chmod a+x "$FLINKX_HOME/bin/flinkx"
4、修改組態檔,設定運行埠
vim flinkconf/flink-conf.yaml
# web服務埠,不指定的話會隨機生成一個
rest.bind-port: 8888
三、FlinkX的簡單使用
FlinkX的使用參考文件:
https://github.com/DTStack/flinkx#introduction

1、MySQLToHDFS
組態檔
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '理科二班'",
"splitPk": "",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "hdfs://master:9000/data/flinkx/student",
"defaultFS": "hdfs://master:9000",
"column": [
{
"name": "col1",
"index": 0,
"type": "string"
},
{
"name": "col2",
"index": 1,
"type": "string"
},
{
"name": "col3",
"index": 2,
"type": "string"
},
{
"name": "col4",
"index": 3,
"type": "string"
},
{
"name": "col5",
"index": 4,
"type": "string"
},
{
"name": "col6",
"index": 5,
"type": "string"
}
],
"fieldDelimiter": ",",
"fileType": "text",
"writeMode": "overwrite"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
啟動任務
flinkx -mode local -job ./scripts/mysqlToHDFS.json -pluginRoot syncplugins/ -flinkconf flinkconf/

監聽日志
flinkx 任務啟動后,會在執行命令的目錄下生成一個nohup.out檔案
tail -f nohup.out
通過web界面查看任務運行情況
http://master:8888
2、MySQLToHive
組態檔
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"student"
]
}
],
"column": [
"*"
],
"customSql": "",
"where": "clazz = '文科二班'",
"splitPk": "id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hivewriter",
"parameter": {
"jdbcUrl": "jdbc:hive2://master:10000/flinkx_test",
"username": "",
"password": "",
"fileType": "text",
"fieldDelimiter": ",",
"writeMode": "overwrite",
"compress": "",
"charsetName": "UTF-8",
"maxFileSize": 1073741824,
"tablesColumn": "{\"student\":[{\"key\":\"id\",\"type\":\"string\"},{\"key\":\"name\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"string\"}]}",
"defaultFS": "hdfs://master:9000"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
在hive中創建flinkx_test資料庫,并創建student分區表
-- Create the flinkx_test database and the partitioned student table.
-- IF NOT EXISTS keeps the script safe to run more than once.
CREATE DATABASE IF NOT EXISTS flinkx_test;
USE flinkx_test;

-- Text table partitioned by the string column `pt`; the ',' field
-- delimiter matches the "fieldDelimiter" set in the hivewriter config.
CREATE TABLE IF NOT EXISTS `student` (
    `id` STRING,
    `name` STRING,
    `age` STRING
)
PARTITIONED BY (
    `pt` STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
啟動hiveserver2
# 第一種方式:
hiveserver2
# 第二種方式:
hive --service hiveserver2
啟動任務
flinkx -mode local -job ./scripts/mysqlToHive.json -pluginRoot syncplugins/ -flinkconf flinkconf/
查看日志及運行情況同上
3、 MySQLToHBase
組態檔
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?characterEncoding=utf8"
],
"table": [
"score"
]
}
],
"column": [
"*"
],
"customSql": "",
"splitPk": "student_id",
"queryTimeOut": 1000,
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hbasewriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://master:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"table": "testFlinkx",
"rowkeyColumn": "$(cf1:student_id)_$(cf1:course_id)",
"column": [
{
"name": "cf1:student_id",
"type": "string"
},
{
"name": "cf1:course_id",
"type": "string"
},
{
"name": "cf1:score",
"type": "string"
}
]
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 3
}
}
}
}
啟動hbase 并創建testFlinkx表(HBase 表名區分大小寫,需與組態檔中的 "table" 一致)
create 'testFlinkx','cf1'
啟動任務
# -pluginRoot and its value (syncplugins/) must be separate arguments.
flinkx -mode local -job ./scripts/mysqlToHBase.json -pluginRoot syncplugins/ -flinkconf flinkconf/
查看日志及運行情況同上
4、 MySQLToMySQL
組態檔
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?useSSL=false"
],
"table": [
"student"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student?useSSL=false",
"table": [
"student2"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/325518.html
標籤:其他
