主頁 >  其他 > Flink查詢關聯Hbase輸出

Flink查詢關聯Hbase輸出

2022-02-25 07:57:40 其他

1、前言

大家在開發Flink的時候,很多時候會把某些固定的維度資訊存盤在Hbase或者Redis等這些第三方庫里,已方便來跟流發生關聯查詢輸出,本文將從如何本地搭建一個Hbase環境開始講起,到如何構建一個Hbase公共呼叫類,到如何構建一個異步呼叫Hbase的公共代碼,再到最后實際呼叫代碼后的輸出,

2、本地利用Docker搭建HBase環境

本地如何搭建Docker環境,之前一篇博客中已經詳細描述過,大家如果想學習如何搭建,可以去看下,地址如下:Docker入門-Windows 10&Mac系統安裝_一個資料小開發的博客-CSDN博客一、何為DockerDocker 是一個開源的應用容器引擎,基于Go語言并遵從 Apache2.0 協議開源,Docker 可以讓開發者打包他們的應用以及依賴包到一個輕量級、可移植的容器中,然后發布到任何流行的 Linux 機器上,也可以實作虛擬化,容器是完全使用沙箱機制,相互之間不會有任何介面(類似 iPhone 的 app),更重要的是容器性能開銷極低,runc 是一個 Linux 命令列工具,用于根據 OCI容器運行時規范 創建和運行容器,containerd 是一個守護程式https://blog.csdn.net/Aaron_ch/article/details/115559512?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164568960216780271525279%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164568960216780271525279&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-1-115559512.nonecase&utm_term=docker&spm=1018.2226.3001.4450

2.1、下載Hbase鏡像

docker pull harisekhon/hbase

2.2、啟動Hbase鏡像:

docker run -d -h myhbase -p 2181:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16020:16020 -p 16201:16201 -p 16301:16301 --name hbase harisekhon/hbase

Hbase 埠映射圖:

2.3、本地添加host

打開/etc/hosts ,如下圖所示,添加一行

127.0.0.1 myhbase

此時在瀏覽器中輸入hbase訪問的地址,可以看到hbase的頁面

http://localhost:16010/master-status

3、訪問Hbase

3.1、圖形化工具訪問Hbase

如下的圖形化工具,Mac和Windows都可以訪問,

如果需要圖形化工具的,評論區留下郵箱,本人看到了會第一時間發送

3.2、Java工具類訪問

直接上代碼,可以直接使用

package com.horse.utils.hbase;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.CaseFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;

/**
 * @author :aaronChen
 * @date :Created in 2022-02-22 16:51
 * @description :
 * @modifiedBy :
 */
@Slf4j
public class HBaseUtils {

    public static Connection getHBaseConnection() {

        try {
            // 1.獲取組態檔資訊
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "localhost");
//            configuration.set("hbase.zookeeper.property.clientPort", "2181");//可寫可不寫,默認為2181

            // 2.創建連接物件
            return ConnectionFactory.createConnection(configuration);

        } catch (IOException e) {
            log.error("獲取Hbase相關基礎資訊失");
            e.printStackTrace();
        }
        return null;
    }

//    static {
//        try {
//            // 1.獲取組態檔資訊
//            Configuration configuration = HBaseConfiguration.create();
//            configuration.set("hbase.zookeeper.quorum", "localhost");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");//可寫可不寫,默認為2181
//
//            // 2.創建連接物件
//            connection = ConnectionFactory.createConnection(configuration);
//
//            // 3.創建Admin物件
//            admin = connection.getAdmin();
//
//        } catch (IOException e) {
//            log.error("獲取Hbase相關基礎資訊失");
//            e.printStackTrace();
//        }
//    }

    /**
     * @param tableName
     * @return true or false
     * @throws IOException
     * @author aaronChen
     * @description 判斷表是否存在
     */
    public static boolean isTableExist(String tableName, Admin admin) throws IOException {
        // 1.判斷表是否存在
        boolean exists = admin.tableExists(TableName.valueOf(tableName));

        // 2.回傳結果
        return exists;
    }


    /**
     * @param tableName               需要創建的hbase表名,格式必須要是 命名空間:tablename
     * @param cfs                     可以同時傳入多個列簇
     * @param createNameSpaceIfExists 如果命名空間不存在,是否創建
     * @throws IOException
     * @author aaronChen
     * @description 創建表
     */
    public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs) throws IOException {

        //1、判斷傳入的表是否正確
        if (tableName.split(":").length != 2) {
            log.error("{},傳入的表名有問題,需要傳入 nameSpace:tableName 這種格式", tableName);
            return false;
        }

        String nameSpace = tableName.split(":")[0];

        //2、判斷命名空間相關資訊
        if (!isNameSpaceExist(admin, nameSpace)) {
            if (createNameSpaceIfExists) {
                log.info("{},該命名空間不存在,開始創建該命名空間", nameSpace);
                createNameSpace(admin, nameSpace);
            } else {
                log.error("{},該命名空間不存在", nameSpace);
                return false;
            }
        }

        // 3、判斷表是否存在
        if (isTableExist(tableName, admin)) {
            log.error("{},表已存在", tableName);
            return false;
        }

        // 4、判斷是否存在列族資訊
        if (cfs.length <= 0) {
            log.info("請設定列族資訊!");
            return false;
        }

        // 5、創建表描述器
        try {
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return false;
            }
            //定義表描述物件
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
            //遍歷列族陣列
            for (String cf : cfs) {
                //定義列族描述物件
                ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf);
                //給表添加列族資訊
                tableDescriptorBuilder.setColumnFamily(columnFamily);
            }
            //創建表
            admin.createTable(tableDescriptorBuilder.build());
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        log.info("{},表新建成功", tableName);

        return true;

    }

    /**
     * @param tableName
     * @throws IOException
     * @author aaronChen
     * @description 洗掉表
     */
    public static void dropTable(Admin admin, String tableName) throws IOException {

        // 1.判斷表是否存在
        if (!isTableExist(tableName, admin)) {
            log.error("{},表不存在", tableName);
            return;
        }

        // 2.使表下線
        admin.disableTable(TableName.valueOf(tableName));

        // 3.洗掉表
        admin.deleteTable(TableName.valueOf(tableName));
    }

    /**
     * @param nameSpace
     * @return true or false
     * @throws IOException
     * @author aaronChen
     * @description 判斷一個命名空間是否存在
     */
    public static boolean isNameSpaceExist(Admin admin, String nameSpace) throws IOException {
        NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
        for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
            if (namespaceDescriptor.getName().equals(nameSpace)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param namespace
     * @author aaronChen
     * @description 創建命名空間
     */
    public static void createNameSpace(Admin admin, String namespace) {

        // 1.創建命名空間描述器
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();

        // 2.創建命名空間
        try {
            admin.createNamespace(namespaceDescriptor);
        } catch (NamespaceExistException e) {
            log.error("{},命名空間已存在", namespace);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param columnName
     * @param value
     * @throws IOException
     * @author aaronChen
     * @description 向表中插入資料
     */
    public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
        // 1.獲取表物件
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2.創建put物件
        Put put = new Put(Bytes.toBytes(rowKey));

        // 3.給Put物件賦值
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));

        // 4.插入資料
        table.put(put);

        // 5.關閉表連接
        table.close();
    }


    /**
     * @param tableName
     * @param clz
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz) throws Exception {

        //1、判斷傳入的引數是否為慷訓空字串
        if (StringUtils.isNullOrWhitespaceOnly(tableName)) {
            log.error("傳入的tableName為空:{}", tableName);
            return null;
        }

        if (StringUtils.isNullOrWhitespaceOnly(rowKey)) {
            log.error("傳入的rowKey為空:{}", rowKey);
            return null;
        }

        if (StringUtils.isNullOrWhitespaceOnly(columnFamily)) {
            log.error("傳入的columnFamily為空:{}", columnFamily);
            return null;
        }

        Get get = new Get(Bytes.toBytes(rowKey));

        if (StringUtils.isNullOrWhitespaceOnly(columnName)) {
            get.addFamily(Bytes.toBytes(columnFamily));
//            log.info("傳入的columnName為空:{},將根據傳入的其他資訊查詢全量的列資訊", columnName);
        } else {
            //傳入的不為空的話,就查詢傳入的值
            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
//            log.info("傳入的columnName為:{},將根據傳入的值查詢資訊", columnName);
        }

        //4、獲取表物件
        Table table = connection.getTable(TableName.valueOf(tableName));

        //5、設定獲取資料的版本數
        //get.setMaxVersions(5);

        //5、獲取資料
        Result resultData = table.get(get);

        //6、關閉表連接
        table.close();

        //創建 泛型物件
        T t = clz.newInstance();

        //給泛型物件賦值

        List<Cell> cells = resultData.listCells();

        for (Cell cell : cells) {

            String resultColumnName = Bytes.toString(CellUtil.cloneQualifier(cell));
            String resultValue = Bytes.toString(CellUtil.cloneValue(cell));

            //判斷是否需要轉換為駝峰命名
            if (underScoreToCamel) {

                if (resultColumnName.contains("_")) {
                    resultColumnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
                } else if (resultColumnName.contains("-")) {
                    resultColumnName = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
                } else {
                    resultColumnName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_CAMEL, resultColumnName);
                }

            }

            BeanUtils.setProperty(t, resultColumnName, resultValue);
        }

        return t;

    }


    /**
     * @param tableName
     * @return 回傳一個 JSONArray
     * <p>
     * [
     * {
     * "rowKey":"11111",
     * "value":[
     * {
     * "columnFamily":"first",
     * "columnValue":{
     * "name":"aaron",
     * "age":18
     * }
     * },
     * {
     * "columnFamily":"second",
     * "columnValue":{
     * "name":"aaron",
     * "age":18
     * }
     * }
     * ]
     * },
     * {
     * "rowKey":"22222",
     * "value":[
     * {
     * "columnFamily":"first",
     * "columnValue":{
     * "name":"wwww",
     * "age":19
     * }
     * },
     * {
     * "columnFamily":"second",
     * "columnValue":{
     * "name":"cccc",
     * "age":18,
     * "address","NanJ"
     * }
     * }
     * ]
     * }
     * ]
     * @throws IOException
     * @author aaronChen
     * @description 掃描全表資料
     * @deprecated
     */
    private static void scanTable(Connection connection, String tableName, String leftScanRowKey, String rightScanRowKey) throws IOException {


        // 1.獲取表物件
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2.構建Scan物件 // 左閉右開
        Scan scan = new Scan(Bytes.toBytes(leftScanRowKey), Bytes.toBytes(rightScanRowKey));

        // 3.掃描表
        ResultScanner resultScanner = table.getScanner(scan);

        // 4.決議resultScanner
        for (Result result : resultScanner) {

            // 5.決議result并列印
            for (Cell cell : result.rawCells()) {

                // 6.列印資料
                System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
                        ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
                        ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                        ",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }

        // 7.關閉表連接
        table.close();
    }


    /**
     * @param tableName 要洗掉的表名
     * @param rowKey    要洗掉的rowKey
     * @param cf        要洗掉的列簇
     * @param cn        要洗掉的列名
     * @throws IOException
     * @description Delete標記: 洗掉特定列列指定的版本
     * DeleteFamily標記: 洗掉特定列族所有列
     * DeleteColumn標記: 洗掉特定列的所有版本
     * 指定rowkey: 使用DeleteFamily標記
     * ---->不加時間戳表示洗掉[指定rowkey]的所有資料,加時間戳表示洗掉[指定rowkey]中[時間戳版本小于或等于指定時間戳]的所有資料
     * 指定rowkey+columnFamily: 使用DeleteFamily標記
     * ---->不加時間戳表示洗掉[指定列族]的所有資料,加了時間戳就表示洗掉[指定列族]下[時間戳版本小于或等于指定時間戳]的所有資料
     * 指定rowkey+columnFamily+column(addColumns): 使用DeleteColumn標記
     * ---->不加時間戳表示洗掉[指定列]所有版本的資料,加時間戳表示洗掉[指定列]中[時間戳版本小于或等于指定時間戳]的所有資料,
     * 指定rowkey+columnFamily+column(addColumn): 使用Delete標記 (只洗掉單個版本資料,生產環境盡量別用)
     * ---->不加時間戳表示洗掉[指定列]中[最新版本]的資料,加時間戳表示洗掉[指定列]中[指定時間戳版本]的資料,
     * ---->不推薦的原因是:操作不同(如flush前后操作產生的結果會不一樣)結果可能不同
     * 如:在flush前如果有多個版本的資料,此時進行addColumn(不加時間戳)操作,會將最新版本的資料洗掉,然后老版本的資料會出現
     * 在flush后進行addColumn(不加時間戳)操作,會將最新版本的資料洗掉,而此時flush已將老版本的資料進行了洗掉,所有此時老版本的資料就不會出現了
     * <p>
     * 洗掉列最好使用addColumns
     * addColumns:不加時間戳表示洗掉指定列所有版本的資料(推薦)
     * addColumns:加時間戳表示洗掉時間戳小于或等于指定時間戳的指定列的所有版本,
     * addColumn:不加時間戳表示洗掉最新版本的資料,操作不同(如flush前后操作產生的結果會不一樣)結果可能不同
     * addColumn:加時間戳表示洗掉指定時間戳的指定列版本的資料,
     */
    public static void deleteData(Connection connection, String tableName, String rowKey, String cf, String cn) throws IOException {

        // 1.獲取表物件
        Table table = connection.getTable(TableName.valueOf(tableName));

        // 2.創建洗掉物件
        Delete delete = new Delete(Bytes.toBytes(rowKey));


        //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);

        //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
        //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);

        // 2.2 洗掉指定的列族
        // addFamily:洗掉指定列族的所有列的所有版本資料,
        delete.addFamily(Bytes.toBytes(cf));

        // 3.執行洗掉操作
        table.delete(delete);

        // 4.關閉連接
        table.close();
    }

    /**
     * @author aaronChen
     * @description 關閉資源
     */
    public static void close(Admin admin, Connection connection) {

        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }



    public static JSONObject getJSONObject(String rowKey, List<Cell> cells) {

        /**
         * @return JSONObject {
         *       "rowKey":"11111",
         *      "value":{
         *      "name":"aaron",
         *      "age":18
         *            }
         *      }
         */
        JSONObject jsonObject = new JSONObject();
        JSONObject valueJsonObject = new JSONObject();

        for (Cell cell : cells) {
            valueJsonObject.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
        }

        jsonObject.put("rowKey", rowKey);
        jsonObject.put("value", valueJsonObject);
        return jsonObject;
    }

}

比較核心的一個幾個方法做出詳細說明下:

3.2.1、新建表函式

public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs)

會根據傳入的相關資訊進行表的創建,例如:

表名是"aaron1:test1",當"aaron1"命名空間不存在的時候,會根據createNameSpaceIfExists引數來選擇是否在不存在的情況下創建,如果需要創建,就會先創建該命名空間后,再去新建表,

public static void main(String[] args) throws Exception{
        createTable(getHBaseConnection().getAdmin(),"aaron1:test1",false,"name","age","id");
    }

當傳入的createNameSpaceIfExists是true的時候

public static void main(String[] args) throws Exception{
        createTable(getHBaseConnection().getAdmin(),"aaron1:test1",true,"name","age","id");
    }

在web頁面可以查看到已經創建成功


3.2.2、插入資料

根據傳入的引數來插入具體的資料,

public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value)
public static void main(String[] args) throws Exception{
        putData(getHBaseConnection(),"aaron1:test1","aaaa","name","schoolName","Nanj");
    }

通過圖形化工具可以看到資料已經插入成功

3.2.3、查詢資料

本函式可以通過特定的rowKey查詢出結果,并把結果轉成JavaBean輸出

public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz)
public static void main(String[] args) throws Exception {
        PassengerInfor passengerInfor = getData(getHBaseConnection(), "aaron:test1", "11111", "first", "", true, PassengerInfor.class);
        log.info("查詢出來的結果為:{},", passengerInfor);
    }

4、Flink異步IO訪問Hbase

因為異步IO都是通過函式繼承RichAsyncFunction這個抽象類,所以為了更大化的使用,先撰寫一個工具類,

4.1、異步IO訪問Hbase工具類

/**
 * @param <IN>    流輸入需要查詢的內容
 * @param <HBase> 從HBase查詢回傳的結果集
 * @param <OUT>   最終輸入流查詢完HBase后回傳的結果集
 * @author :aaronChen
 * @date :Created in 2022-02-22 16:48
 * @description: 該類是用來根據傳入的一個rowKey來查詢HBase某個表里的資料公共類
 */

原始碼如下,其中使用的時候,使用到了如上的一些Hbase工具類:

package com.horse.utils.hbase;

import com.horse.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @param <IN>    流輸入需要查詢的內容
 * @param <HBase> 從HBase查詢回傳的結果集
 * @param <OUT>   最終輸入流查詢完HBase后回傳的結果集
 * @author :aaronChen
 * @date :Created in 2022-02-22 16:48
 * @description: 該類是用來根據傳入的一個rowKey來查詢HBase某個表里的資料公共類
 */
@Slf4j
public abstract class HBaseAsyncFunctionUtil<IN, HBase, OUT> extends RichAsyncFunction<IN, OUT> {

    private Connection connection;
    private ThreadPoolExecutor threadPoolExecutor;

    private Admin hBaseAdmin;

    private HBase queryHBaseBean;

    private Class<HBase> clz;

    /**
     * hBase表名
     */
    private Table hBaseTable;

    /**
     * 列簇名
     */
    private String columnFamily;

    /**
     * 列名
     */
    private String columnName;

    /**
     * 命名空間:tableName
     */
    private String tableName;

    public HBaseAsyncFunctionUtil(String tableName, String columnFamily, String columnName, Class<HBase> clz) {
        this(tableName, columnFamily, clz);
        this.columnName = columnName;
    }

    public HBaseAsyncFunctionUtil(String tableName, String columnFamily, Class<HBase> clz) {

        this.clz = clz;
        this.tableName = tableName;
        this.columnFamily = columnFamily;
    }

    public abstract String setRowKey(IN input);

    public abstract OUT getList(String rowKey, IN input, HBase hBaseResult);

    private void setQueryHBaseBean() throws Exception {
        queryHBaseBean = clz.newInstance();
    }

    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {

        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                resultFuture.complete(Collections.singleton(getResult(input)));
            }
        });
    }


    @Override
    public void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        close();
        connc();
        log.info("Connection to HBase TimeOut,Now is reConnect!");

        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                resultFuture.complete(Collections.singleton(getResult(input)));
            }
        });

    }

    private OUT getResult(IN input) {

        String rowKey = setRowKey(input);
        try {

            setQueryHBaseBean();

            HBase hBaseResult = (HBase) HBaseUtils.getData(connection, tableName, rowKey, columnFamily, columnName, true, queryHBaseBean.getClass());

            return getList(rowKey, input, hBaseResult);

        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //賦值connection
        connc();
    }

    @Override
    public void close() throws Exception {
        super.close();

        if (hBaseTable != null) {
            hBaseTable.close();
        }

        if (hBaseAdmin != null) {
            hBaseAdmin.close();
        }

        if (connection != null) {
            connection.close();
        }
    }

    private void connc() throws IOException {
        connection = HBaseUtils.getHBaseConnection();

        hBaseAdmin = connection.getAdmin();

        if (!hBaseAdmin.tableExists(TableName.valueOf(tableName))) {
            throw new IOException("Query Hbase Table is not exists!");
        }

        hBaseTable = connection.getTable(TableName.valueOf(tableName));

        //初始化
        threadPoolExecutor = ThreadPoolUtil.getThreadPool();
    }
}

4.2、具體使用

同樣還是利用nc模擬輸入流資料

主類AsyncIOQueryHBase代碼如下:

package com.horse.asyncio;

import com.horse.bean.PassengerInfor;
import com.horse.bean.UserLoginLog;
import com.horse.cep.function.MyFlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.horse.utils.hbase.HBaseAsyncFunctionUtil;

import java.util.concurrent.TimeUnit;

/**
 * @author :aaronChen
 * @date :Created in 2022-02-24 14:08
 * @description :
 * @modifiedBy :
 */
public class AsyncIOQueryHBase {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction());

        SingleOutputStreamOperator<PassengerInfor> outputStreamOperator = AsyncDataStream.unorderedWait(dataStream,
                new HBaseAsyncFunctionUtil<UserLoginLog, PassengerInfor, PassengerInfor>("aaron:test1", "first", "", PassengerInfor.class) {

                    @Override
                    public String setRowKey(UserLoginLog input) {
                        return input.getUserName();
                    }

                    @Override
                    public PassengerInfor getList(String rowKey, UserLoginLog input, PassengerInfor hBaseResult) {
                        return hBaseResult;
                    }

                }, 1000, TimeUnit.MINUTES, 100);

        outputStreamOperator.print("異步IO查詢Hbase結果輸出");

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在nc中輸入如下資料:

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"11111"}

查看日志:

可以看到如上圖中所示,已經能夠從hbase中查詢出結果,

如果覺得寫的不錯的,可以適當表示下哈~本人表示感謝,如果有寫的不到位的也可以提問,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/432151.html

標籤:其他

上一篇:使用canal實時同步MySQL資料到es

下一篇:實踐資料湖iceberg 第二十四課 iceberg元資料詳細決議

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more