主頁 >  其他 > Elasticsearch:使用 Apache Flink、Elasticsearch 打造實時事件處理及搜索

Elasticsearch:使用 Apache Flink、Elasticsearch 打造實時事件處理及搜索

2021-10-14 07:41:47 其他

從實時持續生成的資料中獲取可操作的見解是當今許多企業的共同要求, 實時資料處理的一個廣泛用例是儀表板, 支持此類用例的典型架構基于資料流處理器、具有低延遲讀/寫訪問的資料存盤和可視化框架,

在這篇博文中,我們演示了如何使用 Apache Flink 和 Elasticsearch 為流資料分析構建實時事件處理及搜索, 下圖描述了我們的系統架構,在實際的應用中,我們可以使用 Kibana 共同打造實時儀表板解決方案,

Real-time-dashboard-for-stream-data analytics.png

在我們的架構中,Apache Flink 執行流分析作業,這些作業攝取資料流,應用轉換來分析、轉換和建模動態資料,并將其結果寫入 Elasticsearch 索引, Kibana 連接到索引并查詢它以獲取要可視化的資料, 我們架構的所有組件都是 Apache License 2.0 下的開源系統, 在今天的展示中,我將重點講述如何從資料的攝入到 Flink 并做相應的一些處理,并最終寫入到 Elasticsearch 中去,

為什么要使用 Apache Flink 進行流處理?

在深入探討實作演示應用程式的細節之前,我們先討論一些使 Apache Flink 成為出色流處理器的特性,Apache Flink 帶有一組具有競爭力的流處理功能,其中一些在開源領域是獨一無二的,最重要的是:

  • 支持事件時間和亂序流:實際上,事件流很少按照它們產生的順序到達,尤其是來自分布式系統和設備的流,直到現在,由應用程式員來糾正這種“時間漂移”,或者干脆忽略它并接受不準確的結果,因為流系統(至少在開源世界中)不支持事件時間(即處理事件當它們發生在現實世界時), Flink 是第一個支持亂序流并且能夠根據時間戳一致處理事件的開源引擎,
  • Scala 和 Java 中富有表現力且易于使用的 API:Flink 的 DataStream API 將許多在批處理 API 中眾所周知的運算子(例如 map、reduce 和 join)移植到流媒體世界,此外,它還提供特定于流的操作,例如視窗(window)、拆分(split)和連接(connect),對用戶定義函式的一流支持簡化了自定義應用程式行為的實作, DataStream API 在 Scala 和 Java 中可用,
  • 支持會話和未對齊的視窗:大多數流媒體系統都有一些視窗的概念,即基于某些時間函式的一組事件,不幸的是,在許多系統中,這些視窗是硬編碼的,并與系統的內部檢查點機制相關聯, Flink 是第一個將視窗與容錯完全解耦的開源流引擎,允許更豐富的視窗形式,例如會話,
  • 一致性、容錯性和高可用性:Flink 保證在出現故障時狀態更新的一致性(通常稱為“exactly-once processing”),以及選定源和接收器之間的一致資料移動(例如,Kafka 和 HDFS 之間的一致資料移動),Flink 還支持 worker 和 master 故障轉移,消除任何單點故障,
  • 低延遲和高吞吐量:我們已經將 Flink 的時鐘頻率設定為每核心每秒 150 萬個事件,并且還觀察到包括網路資料改組在內的作業的延遲在 25 毫秒范圍內,使用調整旋鈕,Flink 用戶可以導航延遲-吞吐量權衡,使系統既適合高吞吐量資料攝取和轉換,也適合超低延遲(毫秒范圍)應用程式,
  • 連接器和集成點:Flink 與各種開源系統集成,用于資料輸入和輸出(例如 HDFS、Kafka、Elasticsearch、HBase 等)、部署(例如 YARN)以及充當執行引擎對于其他框架(例如,Cascading、Google Cloud Dataflow), Flink 專案本身捆綁了一個 Hadoop MapReduce 兼容層、一個 Storm 兼容層,以及用于機器學習和圖形處理的庫,
  • 開發人員生產力和操作簡單性:Flink 可在各種環境中運行, IDE 中的本地執行顯著簡化了 Flink 應用程式的開發和除錯,在分布式設定中,Flink 以大規模橫向擴展運行, YARN 模式允許用戶在幾秒鐘內啟動 Flink 集群, Flink 通過定義良好的 REST 介面來監控作業和整個系統的指標,內置的 Web 儀表板顯示這些指標,并使 Flink 的監控非常方便,

這些特性的結合使 Apache Flink 成為許多流處理應用程式的獨特選擇,

Flink stream processing API

在接下來的步驟中,我們將按照上面的順序來完成對事件的處理,

安裝

對于沒有接觸 Flink 及 Elastic Stack 的開發者來說,你需要安裝如下的部分:

Elasticsearch

你可以參考我之前的文章 “如何在 Linux,MacOS 及 Windows 上進行安裝 Elasticsearch” 在你自己喜歡的系統上安裝 Elasticsearch,

Kibana

你可以參考我之前的文章 “ Kibana:如何在 Linux,MacOS 及 Windows上安裝 Elastic 堆疊中的 Kibana” 在自己喜歡的系統上安裝 Kibana,

Flink

對于這個部分的按照,你可以參考如下的鏈接:

  • macOS: How to Install Apache Flink On Mac OS
  • Windows: How to Install Apache Flink On Local Windows
  • Ubuntu: How to Install Apache Flink On Ubuntu

在這些系統上的安裝是非常直接的,針對我的安裝,我選擇 macOS,我使用如下的方式來運行 Flink:

$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host liuxg.
Starting taskexecutor daemon on host liuxg.

如上所示,它顯示我們的 Flink 已經成功運行起來了,在啟動后,我們甚至可以在瀏覽器中打開地址 http://localhost:8081來查看 Flink 的運行狀態,我們甚至在這里可以提交我們的任務,

如果你能看到上面的畫面,說明我們的 Flink 的安裝是成功的,

創建演示例子

接下來,我們將使用 Java 來構建一個展示的例子,它使用 API 來訪問 Flink,如上所示,我們將使用 Flink 的 enviornment,source,transform 及 sink APIs 來構建我們的應用,為了方便大家學習,我已經把我的專案上傳到 github 了,你需要使用如下的命令來進行下載:

git clone https://github.com/liu-xiao-guo/ElasticsearchFlink

你可以使用你自己喜歡的 IDE 來創建一個新的專案來開始,

source

在我們的練習中,我們將使用 nc 這個工具來發送資料,你需要在自己的平臺上安裝 nc,我們使用如下命令來啟動 nc:

nc -l 8888

如上所示,它打開埠 8888,并偵聽(-l)向這個埠發送的連接,我們可以在一個 terminal 中運行上面的命令,在下面的實驗中,我們可以在這個 terminal 中打入字串,并回車,這樣它就可以把資料發送到一個已經建立的連接中,

ElasticsearchFlink.java

這是整個代碼的最重要的部分,其實也是蠻簡單的,我把代碼貼下來:

ElasticsearchFlink.java

import com.liuxg.User;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ElasticsearchFlink {
    public static void main(String[] args) {
        // Create Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define a source
        try {
            DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

            DataStream<String> filterSource = source.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String s) throws Exception {
                    return !s.contains("hello");
                }
            });

            DataStream<User> transSource = filterSource.map(value -> {
                String[] fields = value.split(",");
                return new User(fields[ 0 ], fields[ 1 ]);
            });

            // Use ESBuilder  to construct an output
            List<HttpHost> hosts = new ArrayList<>();
            hosts.add(new HttpHost("localhost", 9200, "http"));
            ElasticsearchSink.Builder<User> builder = new ElasticsearchSink.Builder<User>(hosts,
                    new ElasticsearchSinkFunction<User>() {
                        @Override
                        public void process(User u, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                            Map<String, String> jsonMap = new HashMap<>();
                            jsonMap.put("id", u.id);
                            jsonMap.put("name", u.name);
                            IndexRequest indexRequest = Requests.indexRequest();
                            indexRequest.index("flink-test");
                            // indexRequest.id("1000");
                            indexRequest.source(jsonMap);
                            requestIndexer.add(indexRequest);
                        }
                    });
            
            // Define a sink
            builder.setBulkFlushMaxActions(1);
            transSource.addSink(builder.build());

            // Execute the transform
            env.execute("flink-es");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

如上所示,我們在開始的部分得到 enviroment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

我們接下來,使用如下的方法來建立一個和 localhost:8888 埠的連接:

   DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

如果我們的 nc 已經成功運行,那么上面的句子將正常回傳,

接下來,我們使用一個 Flink 的 filter 功能,它對資料做一個簡單的 transform,如果字串中含有 "hello",這個資料將被忽略,最終它不會被寫入到 Elasticsearch 中:

            DataStream<String> filterSource = source.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String s) throws Exception {
                    return !s.contains("hello");
                }
            });

再接下來,我們使用一個 Map 的 transform 功能,比如,當我們的輸入資料為 1,liuxg 時,我們希望提前到的資料是 id:1, 及 name:liuxg,

            DataStream<User> transSource = filterSource.map(value -> {
                String[] fields = value.split(",");
                return new User(fields[ 0 ], fields[ 1 ]);
            });

這個 transfrom 也非常簡單,

在 Flink API 的最后部分是 sink,我們可以通過如下的方式來寫入資料到 Elasticsearch 中:

            // Use ESBuilder  to construct an output
            List<HttpHost> hosts = new ArrayList<>();
            hosts.add(new HttpHost("localhost", 9200, "http"));
            ElasticsearchSink.Builder<User> builder = new ElasticsearchSink.Builder<User>(hosts,
                    new ElasticsearchSinkFunction<User>() {
                        @Override
                        public void process(User u, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                            Map<String, String> jsonMap = new HashMap<>();
                            jsonMap.put("id", u.id);
                            jsonMap.put("name", u.name);
                            IndexRequest indexRequest = Requests.indexRequest();
                            indexRequest.index("flink-test");
                            // indexRequest.id("1000");
                            indexRequest.source(jsonMap);
                            requestIndexer.add(indexRequest);
                        }
                    });
            
            // Define a sink
            builder.setBulkFlushMaxActions(1);
            transSource.addSink(builder.build());

            // Execute the transform
            env.execute("flink-es");

在這里需要注意的是我們在 hosts 的構建中:

   hosts.add(new HttpHost("localhost", 9200, "http"));

我們需要根據自己的 Elasticsearch 地址及埠號做相應的修改,在上面特別需要指出的是如下的這句:

builder.setBulkFlushMaxActions(1);

因為 Flink 有批處理及實時處理,在上面我們設定這個引數值為1,表明每當收到任何的資訊,就會立即進行處理,而不需要等到收集到一定的事件后再做處理,

我們接下運行應用,在運行之前我們確保 nc 已經成功運行,否則應用將會退出,我們接下在 nc 運行所在的界面中打入如下的一行字并回車:

1,liuxg

我們在 Kibana 中進行查看:

GET _cat/indices/flink-test

它將顯示有一個叫做 flink-test 的索引已經被成功地創建了:

我們再接著使用如下的命令來進行搜索:

GET flink-test/_search

我們看到有一個檔案已經被創建了,

我們再接下來打入如下的一行字:

2,hello

顯然在這個輸入中,它含有 hello 字串,在我們的設計中,如果含有 hello,那么在 filter 的設計中將回傳 false,也就是說這個資料將不被寫入到 Elasticsearch 中,我們可以在 Kibana 中使用上面的同樣的命令來進行查看,

結論

在這篇博文中,我們演示了如何使用 Apache Flink 和 Elasticsearch 構建實時事件處理及搜索的應用程式, 通過支持事件時間處理,Apache Flink 能夠產生有意義且一致的結果,即使對于歷史資料或在事件無序到達的環境中也是如此, 與其他開源流處理解決方案相比,具有靈活視窗語意的富有表現力的 DataStream API 可顯著減少自定義應用程式邏輯, 在本次的展示中,我們使用了 Flink 的極少一部分對資料 transform 的功能,Flink 具有許多的資料分析功能,通過 Flink 和 Elastic Stack 的結合,它比將產生許多豐富的應用場景,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/312074.html

標籤:其他

上一篇:springboot操作資料庫表的blob欄位

下一篇:做JAVA后端的千萬別裸辭,我已經遭到了社會的毒打...

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more