1、前言
大家在開發Flink的時候,很多時候會把某些固定的維度資訊存盤在Hbase或者Redis等這些第三方庫里,已方便來跟流發生關聯查詢輸出,本文將從如何本地搭建一個Hbase環境開始講起,到如何構建一個Hbase公共呼叫類,到如何構建一個異步呼叫Hbase的公共代碼,再到最后實際呼叫代碼后的輸出,
2、本地利用Docker搭建HBase環境
本地如何搭建Docker環境,之前一篇博客中已經詳細描述過,大家如果想學習如何搭建,可以去看下,地址如下:Docker入門-Windows 10&Mac系統安裝_一個資料小開發的博客-CSDN博客一、何為DockerDocker 是一個開源的應用容器引擎,基于Go語言并遵從 Apache2.0 協議開源,Docker 可以讓開發者打包他們的應用以及依賴包到一個輕量級、可移植的容器中,然后發布到任何流行的 Linux 機器上,也可以實作虛擬化,容器是完全使用沙箱機制,相互之間不會有任何介面(類似 iPhone 的 app),更重要的是容器性能開銷極低,runc 是一個 Linux 命令列工具,用于根據 OCI容器運行時規范 創建和運行容器,containerd 是一個守護程式https://blog.csdn.net/Aaron_ch/article/details/115559512?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164568960216780271525279%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164568960216780271525279&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-1-115559512.nonecase&utm_term=docker&spm=1018.2226.3001.4450
2.1、下載Hbase鏡像
docker pull harisekhon/hbase
2.2、啟動Hbase鏡像:
docker run -d -h myhbase -p 2181:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16020:16020 -p 16201:16201 -p 16301:16301 --name hbase harisekhon/hbase
Hbase 埠映射圖:
2.3、本地添加host
打開/etc/hosts ,如下圖所示,添加一行
127.0.0.1 myhbase

此時在瀏覽器中輸入hbase訪問的地址,可以看到hbase的頁面
http://localhost:16010/master-status

3、訪問Hbase
3.1、圖形化工具訪問Hbase
如下的圖形化工具,Mac和Windows都可以訪問,

如果需要圖形化工具的,評論區留下郵箱,本人看到了會第一時間發送
3.2、Java工具類訪問
直接上代碼,可以直接使用
package com.horse.utils.hbase;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.CaseFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
/**
* @author :aaronChen
* @date :Created in 2022-02-22 16:51
* @description :
* @modifiedBy :
*/
@Slf4j
public class HBaseUtils {
public static Connection getHBaseConnection() {
try {
// 1.獲取組態檔資訊
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "localhost");
// configuration.set("hbase.zookeeper.property.clientPort", "2181");//可寫可不寫,默認為2181
// 2.創建連接物件
return ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
log.error("獲取Hbase相關基礎資訊失");
e.printStackTrace();
}
return null;
}
// static {
// try {
// // 1.獲取組態檔資訊
// Configuration configuration = HBaseConfiguration.create();
// configuration.set("hbase.zookeeper.quorum", "localhost");
configuration.set("hbase.zookeeper.property.clientPort", "2181");//可寫可不寫,默認為2181
//
// // 2.創建連接物件
// connection = ConnectionFactory.createConnection(configuration);
//
// // 3.創建Admin物件
// admin = connection.getAdmin();
//
// } catch (IOException e) {
// log.error("獲取Hbase相關基礎資訊失");
// e.printStackTrace();
// }
// }
/**
* @param tableName
* @return true or false
* @throws IOException
* @author aaronChen
* @description 判斷表是否存在
*/
public static boolean isTableExist(String tableName, Admin admin) throws IOException {
// 1.判斷表是否存在
boolean exists = admin.tableExists(TableName.valueOf(tableName));
// 2.回傳結果
return exists;
}
/**
* @param tableName 需要創建的hbase表名,格式必須要是 命名空間:tablename
* @param cfs 可以同時傳入多個列簇
* @param createNameSpaceIfExists 如果命名空間不存在,是否創建
* @throws IOException
* @author aaronChen
* @description 創建表
*/
public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs) throws IOException {
//1、判斷傳入的表是否正確
if (tableName.split(":").length != 2) {
log.error("{},傳入的表名有問題,需要傳入 nameSpace:tableName 這種格式", tableName);
return false;
}
String nameSpace = tableName.split(":")[0];
//2、判斷命名空間相關資訊
if (!isNameSpaceExist(admin, nameSpace)) {
if (createNameSpaceIfExists) {
log.info("{},該命名空間不存在,開始創建該命名空間", nameSpace);
createNameSpace(admin, nameSpace);
} else {
log.error("{},該命名空間不存在", nameSpace);
return false;
}
}
// 3、判斷表是否存在
if (isTableExist(tableName, admin)) {
log.error("{},表已存在", tableName);
return false;
}
// 4、判斷是否存在列族資訊
if (cfs.length <= 0) {
log.info("請設定列族資訊!");
return false;
}
// 5、創建表描述器
try {
if (admin.tableExists(TableName.valueOf(tableName))) {
return false;
}
//定義表描述物件
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
//遍歷列族陣列
for (String cf : cfs) {
//定義列族描述物件
ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf);
//給表添加列族資訊
tableDescriptorBuilder.setColumnFamily(columnFamily);
}
//創建表
admin.createTable(tableDescriptorBuilder.build());
} catch (Exception e) {
e.printStackTrace();
return false;
}
log.info("{},表新建成功", tableName);
return true;
}
/**
* @param tableName
* @throws IOException
* @author aaronChen
* @description 洗掉表
*/
public static void dropTable(Admin admin, String tableName) throws IOException {
// 1.判斷表是否存在
if (!isTableExist(tableName, admin)) {
log.error("{},表不存在", tableName);
return;
}
// 2.使表下線
admin.disableTable(TableName.valueOf(tableName));
// 3.洗掉表
admin.deleteTable(TableName.valueOf(tableName));
}
/**
* @param nameSpace
* @return true or false
* @throws IOException
* @author aaronChen
* @description 判斷一個命名空間是否存在
*/
public static boolean isNameSpaceExist(Admin admin, String nameSpace) throws IOException {
NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
if (namespaceDescriptor.getName().equals(nameSpace)) {
return true;
}
}
return false;
}
/**
* @param namespace
* @author aaronChen
* @description 創建命名空間
*/
public static void createNameSpace(Admin admin, String namespace) {
// 1.創建命名空間描述器
NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
// 2.創建命名空間
try {
admin.createNamespace(namespaceDescriptor);
} catch (NamespaceExistException e) {
log.error("{},命名空間已存在", namespace);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @param tableName
* @param rowKey
* @param columnFamily
* @param columnName
* @param value
* @throws IOException
* @author aaronChen
* @description 向表中插入資料
*/
public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
// 1.獲取表物件
Table table = connection.getTable(TableName.valueOf(tableName));
// 2.創建put物件
Put put = new Put(Bytes.toBytes(rowKey));
// 3.給Put物件賦值
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
// 4.插入資料
table.put(put);
// 5.關閉表連接
table.close();
}
/**
* @param tableName
* @param clz
* @param <T>
* @return
* @throws Exception
*/
public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz) throws Exception {
//1、判斷傳入的引數是否為慷訓空字串
if (StringUtils.isNullOrWhitespaceOnly(tableName)) {
log.error("傳入的tableName為空:{}", tableName);
return null;
}
if (StringUtils.isNullOrWhitespaceOnly(rowKey)) {
log.error("傳入的rowKey為空:{}", rowKey);
return null;
}
if (StringUtils.isNullOrWhitespaceOnly(columnFamily)) {
log.error("傳入的columnFamily為空:{}", columnFamily);
return null;
}
Get get = new Get(Bytes.toBytes(rowKey));
if (StringUtils.isNullOrWhitespaceOnly(columnName)) {
get.addFamily(Bytes.toBytes(columnFamily));
// log.info("傳入的columnName為空:{},將根據傳入的其他資訊查詢全量的列資訊", columnName);
} else {
//傳入的不為空的話,就查詢傳入的值
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
// log.info("傳入的columnName為:{},將根據傳入的值查詢資訊", columnName);
}
//4、獲取表物件
Table table = connection.getTable(TableName.valueOf(tableName));
//5、設定獲取資料的版本數
//get.setMaxVersions(5);
//5、獲取資料
Result resultData = table.get(get);
//6、關閉表連接
table.close();
//創建 泛型物件
T t = clz.newInstance();
//給泛型物件賦值
List<Cell> cells = resultData.listCells();
for (Cell cell : cells) {
String resultColumnName = Bytes.toString(CellUtil.cloneQualifier(cell));
String resultValue = Bytes.toString(CellUtil.cloneValue(cell));
//判斷是否需要轉換為駝峰命名
if (underScoreToCamel) {
if (resultColumnName.contains("_")) {
resultColumnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
} else if (resultColumnName.contains("-")) {
resultColumnName = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
} else {
resultColumnName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_CAMEL, resultColumnName);
}
}
BeanUtils.setProperty(t, resultColumnName, resultValue);
}
return t;
}
/**
* @param tableName
* @return 回傳一個 JSONArray
* <p>
* [
* {
* "rowKey":"11111",
* "value":[
* {
* "columnFamily":"first",
* "columnValue":{
* "name":"aaron",
* "age":18
* }
* },
* {
* "columnFamily":"second",
* "columnValue":{
* "name":"aaron",
* "age":18
* }
* }
* ]
* },
* {
* "rowKey":"22222",
* "value":[
* {
* "columnFamily":"first",
* "columnValue":{
* "name":"wwww",
* "age":19
* }
* },
* {
* "columnFamily":"second",
* "columnValue":{
* "name":"cccc",
* "age":18,
* "address","NanJ"
* }
* }
* ]
* }
* ]
* @throws IOException
* @author aaronChen
* @description 掃描全表資料
* @deprecated
*/
private static void scanTable(Connection connection, String tableName, String leftScanRowKey, String rightScanRowKey) throws IOException {
// 1.獲取表物件
Table table = connection.getTable(TableName.valueOf(tableName));
// 2.構建Scan物件 // 左閉右開
Scan scan = new Scan(Bytes.toBytes(leftScanRowKey), Bytes.toBytes(rightScanRowKey));
// 3.掃描表
ResultScanner resultScanner = table.getScanner(scan);
// 4.決議resultScanner
for (Result result : resultScanner) {
// 5.決議result并列印
for (Cell cell : result.rawCells()) {
// 6.列印資料
System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
// 7.關閉表連接
table.close();
}
/**
* @param tableName 要洗掉的表名
* @param rowKey 要洗掉的rowKey
* @param cf 要洗掉的列簇
* @param cn 要洗掉的列名
* @throws IOException
* @description Delete標記: 洗掉特定列列指定的版本
* DeleteFamily標記: 洗掉特定列族所有列
* DeleteColumn標記: 洗掉特定列的所有版本
* 指定rowkey: 使用DeleteFamily標記
* ---->不加時間戳表示洗掉[指定rowkey]的所有資料,加時間戳表示洗掉[指定rowkey]中[時間戳版本小于或等于指定時間戳]的所有資料
* 指定rowkey+columnFamily: 使用DeleteFamily標記
* ---->不加時間戳表示洗掉[指定列族]的所有資料,加了時間戳就表示洗掉[指定列族]下[時間戳版本小于或等于指定時間戳]的所有資料
* 指定rowkey+columnFamily+column(addColumns): 使用DeleteColumn標記
* ---->不加時間戳表示洗掉[指定列]所有版本的資料,加時間戳表示洗掉[指定列]中[時間戳版本小于或等于指定時間戳]的所有資料,
* 指定rowkey+columnFamily+column(addColumn): 使用Delete標記 (只洗掉單個版本資料,生產環境盡量別用)
* ---->不加時間戳表示洗掉[指定列]中[最新版本]的資料,加時間戳表示洗掉[指定列]中[指定時間戳版本]的資料,
* ---->不推薦的原因是:操作不同(如flush前后操作產生的結果會不一樣)結果可能不同
* 如:在flush前如果有多個版本的資料,此時進行addColumn(不加時間戳)操作,會將最新版本的資料洗掉,然后老版本的資料會出現
* 在flush后進行addColumn(不加時間戳)操作,會將最新版本的資料洗掉,而此時flush已將老版本的資料進行了洗掉,所有此時老版本的資料就不會出現了
* <p>
* 洗掉列最好使用addColumns
* addColumns:不加時間戳表示洗掉指定列所有版本的資料(推薦)
* addColumns:加時間戳表示洗掉時間戳小于或等于指定時間戳的指定列的所有版本,
* addColumn:不加時間戳表示洗掉最新版本的資料,操作不同(如flush前后操作產生的結果會不一樣)結果可能不同
* addColumn:加時間戳表示洗掉指定時間戳的指定列版本的資料,
*/
public static void deleteData(Connection connection, String tableName, String rowKey, String cf, String cn) throws IOException {
// 1.獲取表物件
Table table = connection.getTable(TableName.valueOf(tableName));
// 2.創建洗掉物件
Delete delete = new Delete(Bytes.toBytes(rowKey));
//delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
//delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);
//delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
//delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);
// 2.2 洗掉指定的列族
// addFamily:洗掉指定列族的所有列的所有版本資料,
delete.addFamily(Bytes.toBytes(cf));
// 3.執行洗掉操作
table.delete(delete);
// 4.關閉連接
table.close();
}
/**
* @author aaronChen
* @description 關閉資源
*/
public static void close(Admin admin, Connection connection) {
if (admin != null) {
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static JSONObject getJSONObject(String rowKey, List<Cell> cells) {
/**
* @return JSONObject {
* "rowKey":"11111",
* "value":{
* "name":"aaron",
* "age":18
* }
* }
*/
JSONObject jsonObject = new JSONObject();
JSONObject valueJsonObject = new JSONObject();
for (Cell cell : cells) {
valueJsonObject.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
jsonObject.put("rowKey", rowKey);
jsonObject.put("value", valueJsonObject);
return jsonObject;
}
}
比較核心的一個幾個方法做出詳細說明下:
3.2.1、新建表函式
public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs)
會根據傳入的相關資訊進行表的創建,例如:
表名是"aaron1:test1",當"aaron1"命名空間不存在的時候,會根據createNameSpaceIfExists引數來選擇是否在不存在的情況下創建,如果需要創建,就會先創建該命名空間后,再去新建表,
public static void main(String[] args) throws Exception{
createTable(getHBaseConnection().getAdmin(),"aaron1:test1",false,"name","age","id");
}

當傳入的createNameSpaceIfExists是true的時候
public static void main(String[] args) throws Exception{
createTable(getHBaseConnection().getAdmin(),"aaron1:test1",true,"name","age","id");
}
在web頁面可以查看到已經創建成功
3.2.2、插入資料
根據傳入的引數來插入具體的資料,
public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value)
public static void main(String[] args) throws Exception{
putData(getHBaseConnection(),"aaron1:test1","aaaa","name","schoolName","Nanj");
}
通過圖形化工具可以看到資料已經插入成功

3.2.3、查詢資料
本函式可以通過特定的rowKey查詢出結果,并把結果轉成JavaBean輸出
public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz)
public static void main(String[] args) throws Exception {
PassengerInfor passengerInfor = getData(getHBaseConnection(), "aaron:test1", "11111", "first", "", true, PassengerInfor.class);
log.info("查詢出來的結果為:{},", passengerInfor);
}
![]()
4、Flink異步IO訪問Hbase
因為異步IO都是通過函式繼承RichAsyncFunction這個抽象類,所以為了更大化的使用,先撰寫一個工具類,
4.1、異步IO訪問Hbase工具類
/** * @param <IN> 流輸入需要查詢的內容 * @param <HBase> 從HBase查詢回傳的結果集 * @param <OUT> 最終輸入流查詢完HBase后回傳的結果集 * @author :aaronChen * @date :Created in 2022-02-22 16:48 * @description: 該類是用來根據傳入的一個rowKey來查詢HBase某個表里的資料公共類 */
原始碼如下,其中使用的時候,使用到了如上的一些Hbase工具類:
package com.horse.utils.hbase;
import com.horse.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @param <IN> 流輸入需要查詢的內容
* @param <HBase> 從HBase查詢回傳的結果集
* @param <OUT> 最終輸入流查詢完HBase后回傳的結果集
* @author :aaronChen
* @date :Created in 2022-02-22 16:48
* @description: 該類是用來根據傳入的一個rowKey來查詢HBase某個表里的資料公共類
*/
@Slf4j
public abstract class HBaseAsyncFunctionUtil<IN, HBase, OUT> extends RichAsyncFunction<IN, OUT> {
private Connection connection;
private ThreadPoolExecutor threadPoolExecutor;
private Admin hBaseAdmin;
private HBase queryHBaseBean;
private Class<HBase> clz;
/**
* hBase表名
*/
private Table hBaseTable;
/**
* 列簇名
*/
private String columnFamily;
/**
* 列名
*/
private String columnName;
/**
* 命名空間:tableName
*/
private String tableName;
public HBaseAsyncFunctionUtil(String tableName, String columnFamily, String columnName, Class<HBase> clz) {
this(tableName, columnFamily, clz);
this.columnName = columnName;
}
public HBaseAsyncFunctionUtil(String tableName, String columnFamily, Class<HBase> clz) {
this.clz = clz;
this.tableName = tableName;
this.columnFamily = columnFamily;
}
public abstract String setRowKey(IN input);
public abstract OUT getList(String rowKey, IN input, HBase hBaseResult);
private void setQueryHBaseBean() throws Exception {
queryHBaseBean = clz.newInstance();
}
@Override
public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
resultFuture.complete(Collections.singleton(getResult(input)));
}
});
}
@Override
public void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
close();
connc();
log.info("Connection to HBase TimeOut,Now is reConnect!");
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
resultFuture.complete(Collections.singleton(getResult(input)));
}
});
}
private OUT getResult(IN input) {
String rowKey = setRowKey(input);
try {
setQueryHBaseBean();
HBase hBaseResult = (HBase) HBaseUtils.getData(connection, tableName, rowKey, columnFamily, columnName, true, queryHBaseBean.getClass());
return getList(rowKey, input, hBaseResult);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//賦值connection
connc();
}
@Override
public void close() throws Exception {
super.close();
if (hBaseTable != null) {
hBaseTable.close();
}
if (hBaseAdmin != null) {
hBaseAdmin.close();
}
if (connection != null) {
connection.close();
}
}
private void connc() throws IOException {
connection = HBaseUtils.getHBaseConnection();
hBaseAdmin = connection.getAdmin();
if (!hBaseAdmin.tableExists(TableName.valueOf(tableName))) {
throw new IOException("Query Hbase Table is not exists!");
}
hBaseTable = connection.getTable(TableName.valueOf(tableName));
//初始化
threadPoolExecutor = ThreadPoolUtil.getThreadPool();
}
}
4.2、具體使用
同樣還是利用nc模擬輸入流資料
主類AsyncIOQueryHBase代碼如下:
package com.horse.asyncio;
import com.horse.bean.PassengerInfor;
import com.horse.bean.UserLoginLog;
import com.horse.cep.function.MyFlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.horse.utils.hbase.HBaseAsyncFunctionUtil;
import java.util.concurrent.TimeUnit;
/**
* @author :aaronChen
* @date :Created in 2022-02-24 14:08
* @description :
* @modifiedBy :
*/
public class AsyncIOQueryHBase {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction());
SingleOutputStreamOperator<PassengerInfor> outputStreamOperator = AsyncDataStream.unorderedWait(dataStream,
new HBaseAsyncFunctionUtil<UserLoginLog, PassengerInfor, PassengerInfor>("aaron:test1", "first", "", PassengerInfor.class) {
@Override
public String setRowKey(UserLoginLog input) {
return input.getUserName();
}
@Override
public PassengerInfor getList(String rowKey, UserLoginLog input, PassengerInfor hBaseResult) {
return hBaseResult;
}
}, 1000, TimeUnit.MINUTES, 100);
outputStreamOperator.print("異步IO查詢Hbase結果輸出");
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
在nc中輸入如下資料:
{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"11111"}

查看日志:
可以看到如上圖中所示,已經能夠從hbase中查詢出結果,
如果覺得寫的不錯的,可以適當表示下哈~本人表示感謝,如果有寫的不到位的也可以提問,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/432151.html
標籤:其他

