目錄
案例五 FlinkSQL整合Hive
介紹
???????集成Hive的基本方式
???????準備作業
1.添加hadoop_classpath
2.下載jar并上傳至flink/lib目錄
3.修改hive配置
4.啟動hive元資料服務
???????SQL CLI
1.修改flinksql配置
2.啟動flink集群
3.啟動flink-sql客戶端
4.執行sql:
???????代碼演示
案例五 FlinkSQL整合Hive
介紹
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
https://zhuanlan.zhihu.com/p/338506408
使用Hive構建資料倉庫已經成為了比較普遍的一種解決方案,目前,一些比較常見的大資料處理引擎,都無一例外兼容Hive,Flink從1.9開始支持集成Hive,不過1.9版本為beta版,不推薦在生產環境中使用,在Flink1.10版本中,標志著對 Blink的整合宣告完成,對 Hive 的集成也達到了生產級別的要求,值得注意的是,不同版本的Flink對于Hive的集成有所差異,接下來將以最新的Flink1.12版本為例,實作Flink集成Hive
???????集成Hive的基本方式
Flink 與 Hive 的集成主要體現在以下兩個方面:
- 持久化元資料
Flink利用 Hive 的 MetaStore 作為持久化的 Catalog,我們可通過HiveCatalog將不同會話中的 Flink 元資料存盤到 Hive Metastore 中,例如,我們可以使用HiveCatalog將其 Kafka的資料源表存盤在 Hive Metastore 中,這樣該表的元資料資訊會被持久化到Hive的MetaStore對應的元資料庫中,在后續的 SQL 查詢中,我們可以重復使用它們,
- 利用 Flink 來讀寫 Hive 的表
Flink打通了與Hive的集成,如同使用SparkSQL或者Impala操作Hive中的資料一樣,我們可以使用Flink直接讀寫Hive中的表,
HiveCatalog的設計提供了與 Hive 良好的兼容性,用戶可以”開箱即用”的訪問其已有的 Hive表,不需要修改現有的 Hive Metastore,也不需要更改表的資料位置或磁區,
???????準備作業
1.添加hadoop_classpath
vim /etc/profile
增加如下配置
export HADOOP_CLASSPATH=`hadoop classpath`
重繪配置
source /etc/profile
2.下載jar并上傳至flink/lib目錄
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/

3.修改hive配置
vim /export/server/hive/conf/hive-site.xml
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
4.啟動hive元資料服務
nohup /export/server/hive/bin/hive --service metastore &
???????SQL CLI
1.修改flinksql配置
vim /export/server/flink/conf/sql-client-defaults.yaml
增加如下配置
catalogs:
- name: myhive
type: hive
hive-conf-dir: /export/server/hive/conf
default-database: default
2.啟動flink集群
/export/server/flink/bin/start-cluster.sh
3.啟動flink-sql客戶端
/export/server/flink/bin/sql-client.sh embedded
4.執行sql:
show catalogs;
use catalog myhive;
show tables;
select * from person;
???????代碼演示
package cn.itcast.extend;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
* Author itcast
* Desc
*/
public class HiveDemo {
public static void main(String[] args){
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "./conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
//注冊catalog
tableEnv.registerCatalog("myhive", hive);
//使用注冊的catalog
tableEnv.useCatalog("myhive");
//向Hive表中寫入資料
String insertSQL = "insert into person select * from person";
TableResult result = tableEnv.executeSql(insertSQL);
System.out.println(result.getJobClient().get().getJobStatus());
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/282594.html
標籤:其他
