Contents
- Preface
- Implementation options
- Full and incremental sync
- Integrating Elasticsearch with Spring Boot
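Preface
In real projects you will often need Elasticsearch for search. This article shows how to sync data from MySQL to Elasticsearch in near real time with Logstash. For setting up Elasticsearch + Logstash + Kibana itself, see the separate ELK setup guide.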
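Implementation options
I have used two approaches to sync MySQL to Elasticsearch:
- Use Logstash, provided by Elastic, for both the full and the incremental MySQL sync. The incremental part is based on a timestamp or an auto-increment id.
- Use Logstash for the full sync only. Subsequent inserts, updates and deletes (the incremental sync) are handled by canal, Alibaba's open-source framework. canal poses as a MySQL replica and reads the binlog, and a Java program listens for the changes and writes them to Elasticsearch.
This article covers the first approach: full and incremental MySQL sync with Logstash.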
Full and incremental sync
First, the MySQL tables and how they relate.
The parent table is news, which stores news articles. Its contents:

The child table is custom_information, which stores custom tags. It is one-to-many with news: one article has many custom entries. Its contents:

Note: custom_information.item_id references news.id.
Expressed as JSON, the one-to-many relationship looks like this:
{
"id":"15c7ee7a5dc411ea9bc2fa163e0c8256",
"title":"“宅經濟”進入數字化時代",
"source":"人民日報",
"customList":[
{
"secondLevel":"32552",
"isRelEnterprise":"0",
"secondLevelName":"濟南",
"moduleType":"1",
"customName":"地區1",
"firstLevel":"37200",
"firstLevelName":"山東",
"customId":"1",
"detId":"1"
},
{
"secondLevel":"222",
"isRelEnterprise":"0",
"secondLevelName":"林業1",
"moduleType":"1",
"customName":"行業1",
"firstLevel":"11",
"firstLevelName":"林業",
"customId":"2",
"detId":"3"
}
]
}
Elasticsearch has to store the same one-to-many structure. Here it is modeled with Elasticsearch's nested field type.
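The reason for using nested can be shown with a small stdlib-only Java model. The class and method names are illustrative and are not part of any Elasticsearch API.

With the default object mapping, Elasticsearch flattens an array of objects into one multi-valued field per key. A query for customId 1 AND secondLevel 222 would then match the sample article above, even though no single customList entry has both values. A nested mapping keeps each entry separate.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * In-memory illustration (not Elasticsearch code) of object vs nested matching.
 */
class NestedVsObjectDemo {

    /** Object mapping: every value of every key, pooled across all array elements. */
    static Map<String, Set<String>> flatten(List<Map<String, String>> customList) {
        Map<String, Set<String>> flat = new HashMap<>();
        for (Map<String, String> entry : customList) {
            for (Map.Entry<String, String> e : entry.entrySet()) {
                flat.computeIfAbsent(e.getKey(), k -> new HashSet<>()).add(e.getValue());
            }
        }
        return flat;
    }

    /** Object semantics: each condition may be satisfied by a different element. */
    static boolean matchesFlattened(List<Map<String, String>> customList, Map<String, String> conditions) {
        Map<String, Set<String>> flat = flatten(customList);
        for (Map.Entry<String, String> c : conditions.entrySet()) {
            if (!flat.getOrDefault(c.getKey(), Set.of()).contains(c.getValue())) {
                return false;
            }
        }
        return true;
    }

    /** Nested semantics: all conditions must hold within one and the same element. */
    static boolean matchesNested(List<Map<String, String>> customList, Map<String, String> conditions) {
        for (Map<String, String> entry : customList) {
            boolean all = true;
            for (Map.Entry<String, String> c : conditions.entrySet()) {
                if (!c.getValue().equals(entry.get(c.getKey()))) {
                    all = false;
                    break;
                }
            }
            if (all) {
                return true;
            }
        }
        return false;
    }
}
```

With the two entries from the sample JSON, the condition pair customId=1 / secondLevel=222 behaves differently in each model. matchesFlattened returns true, a false hit. matchesNested returns false.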

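Create the index with an explicit (static) mapping. The ik_max_word and ik_smart analyzers it references come from the IK analysis plugin, which must be installed on every node: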
PUT app-article-link
{
"mappings" : {
"properties" : {
"address" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customList" : {
"type" : "nested",
"properties" : {
"customId" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"detId" : {
"type" : "keyword"
},
"firstLevel" : {
"type" : "keyword"
},
"firstLevelName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"isRelEnterprise" : {
"type" : "keyword"
},
"moduleType" : {
"type" : "keyword"
},
"secondLevel" : {
"type" : "keyword"
},
"secondLevelName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"custom_list" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"detail" : {
"type" : "text",
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"endTime" : {
"type" : "keyword"
},
"id" : {
"type" : "keyword"
},
"industryName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"isDelete" : {
"type" : "keyword"
},
"price" : {
"type" : "keyword"
},
"publishDate" : {
"type" : "keyword"
},
"relevanceType" : {
"type" : "keyword"
},
"savePath" : {
"type" : "keyword"
},
"source" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
},
"suggest" : {
"type" : "completion",
"analyzer" : "simple",
"preserve_separators" : true,
"preserve_position_increments" : true,
"max_input_length" : 50
}
},
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"startTime" : {
"type" : "keyword"
},
"summary" : {
"type" : "text",
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"techFieldName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"title" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
},
"suggest" : {
"type" : "completion",
"analyzer" : "simple",
"preserve_separators" : true,
"preserve_position_increments" : true,
"max_input_length" : 50
}
},
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart"
},
"update_time" : {
"type" : "keyword"
},
"videoStatus" : {
"type" : "keyword"
}
}
}
}
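The Logstash configuration follows.
Because the tables are one-to-many, I first create a view. The sync uses the most recent modification time as its checkpoint, so the key point is to find the latest update time across both the parent and the child rows. The view (named news_view, which the sync SQL below queries) is created as follows:
CREATE VIEW news_view AS
SELECT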
t.id,
t.title,
t.source,
'8' AS relevanceType ,
date_format( greatest( `t`.`update_time`, ifnull( `i`.`update_time`, '1970-01-01 00:00:00' )), '%Y-%m-%d %H:%i:%s' ) AS `update_time`
FROM
`news` t
LEFT JOIN custom_information i
ON t.id=i.item_id
AND i.is_delete='0'
AND i.module_type='8'
WHERE
t.state = '0'
AND t.publish_status='3'
AND t.relevance_type='2'
The resulting update_time is the later of the two tables' update times.
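If you need the same value in application code, the view's update_time expression can be modeled in Java as below. This is only an illustration and the class name is hypothetical. The 1970 fallback mirrors the ifnull(...) default, so an article without custom rows simply keeps its own update_time.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Mirrors greatest(news.update_time, ifnull(custom.update_time, 1970)) for one joined row. */
class EffectiveUpdateTime {
    // Stand-in for the ifnull default: older than any real update
    static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0, 0);
    // Same output shape as date_format(..., '%Y-%m-%d %H:%i:%s')
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * @param newsUpdated   news.update_time (never null)
     * @param customUpdated custom_information.update_time, null when the LEFT JOIN found no row
     */
    static String of(LocalDateTime newsUpdated, LocalDateTime customUpdated) {
        LocalDateTime child = customUpdated != null ? customUpdated : EPOCH;
        LocalDateTime latest = newsUpdated.isAfter(child) ? newsUpdated : child;
        return latest.format(FORMAT);
    }
}
```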
Create news.conf in the Logstash config directory with the following content:
input {
jdbc {
jdbc_driver_library => "/opt/apps/logstash/lib/mysql-connector-java-8.0.13.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.0.178:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
jdbc_user => "root"
jdbc_password => "123456"
connection_retry_attempts => "3"
jdbc_validation_timeout => "3600"
jdbc_paging_enabled => "true"
jdbc_page_size => "500"
statement_filepath => "/opt/apps/logstash/sql/news.sql"
use_column_value => true
lowercase_column_names => false
tracking_column => "update_time"
tracking_column_type => "timestamp"
record_last_run => true
last_run_metadata_path => "/opt/apps/logstash/station/news.txt"
clean_run => false
schedule => "*/5 * * * * *"
type => "news"
}
}
filter {
aggregate {
task_id => "%{id}"
code => "
map['id'] = event.get('id')
map['title'] = event.get('title')
map['source'] = event.get('source')
map['custom_list'] ||=[]
map['customList'] ||=[]
if (event.get('detId') != nil)
if !(map['custom_list'].include? event.get('detId'))
map['custom_list'] << event.get('detId')
map['customList'] << {
'detId' => event.get('detId'),
'moduleType' => event.get('moduleType'),
'customId' => event.get('customId'),
'customName' => event.get('customName'),
'firstLevel' => event.get('firstLevel'),
'firstLevelName' => event.get('firstLevelName'),
'secondLevel' => event.get('secondLevel'),
'secondLevelName' => event.get('secondLevelName'),
'isRelEnterprise' => event.get('isRelEnterprise')
}
end
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 5
}
mutate {
remove_field => ["@timestamp","@version"]
}
}
output {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "app-article-link"
hosts => ["http://192.168.0.178:9200"]
}
stdout{
codec => rubydebug
}
}
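The aggregate filter's behavior can be approximated in plain Java. The sketch below is a hypothetical analogue, not Logstash's implementation. It walks the flat LEFT JOIN rows in order and starts a new document whenever the id changes, which is the push_previous_map_as_event behavior. It also collects each distinct detId into custom_list and customList.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Rough Java analogue of the aggregate filter with push_previous_map_as_event => true. */
class AggregateSketch {
    static final List<String> CUSTOM_KEYS = List.of("detId", "moduleType", "customId", "customName",
            "firstLevel", "firstLevelName", "secondLevel", "secondLevelName", "isRelEnterprise");

    /** rows: flat join output, one map per article/custom pair, keyed by column name. */
    static List<Map<String, Object>> aggregate(List<Map<String, String>> rows) {
        List<Map<String, Object>> docs = new ArrayList<>();
        Map<String, Object> current = null;
        List<String> detIds = null;                  // plays the role of map['custom_list']
        List<Map<String, String>> customList = null; // plays the role of map['customList']
        for (Map<String, String> row : rows) {
            String id = row.get("id");
            if (current == null || !Objects.equals(current.get("id"), id)) {
                if (current != null) {
                    docs.add(current); // id changed: push the previous map as an event
                }
                current = new LinkedHashMap<>();
                detIds = new ArrayList<>();
                customList = new ArrayList<>();
                current.put("id", id);
                current.put("title", row.get("title"));
                current.put("source", row.get("source"));
                current.put("custom_list", detIds);
                current.put("customList", customList);
            }
            String detId = row.get("detId");
            // LEFT JOIN rows without a child have no detId; duplicates are skipped too
            if (detId != null && !detIds.contains(detId)) {
                detIds.add(detId);
                Map<String, String> custom = new LinkedHashMap<>();
                for (String key : CUSTOM_KEYS) {
                    custom.put(key, row.get(key));
                }
                customList.add(custom);
            }
        }
        if (current != null) {
            docs.add(current); // the real filter pushes the last map when its timeout expires
        }
        return docs;
    }
}
```

The sketch also shows why row order matters. push_previous_map_as_event assumes that all rows of one article arrive consecutively. If they are interleaved, the same id produces two events. Because the output uses document_id => "%{id}", the second event overwrites the first with an incomplete customList. For the same reason, the aggregate filter documentation asks for a single filter worker.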
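Settings in input{}:
statement_filepath: path of the SQL file to execute.
last_run_metadata_path: file where the last tracked value (the newest update_time seen) is saved. The next run continues from that point through :sql_last_value.
tracking_column: the column whose value is tracked, here the update-time column.
schedule: a cron-like expression. With six fields the first one is seconds, so "*/5 * * * * *" runs every five seconds.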
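Together, the tracking settings above form a watermark loop. Below is a simplified in-memory model in Java. The class and its methods are hypothetical. The 1970 starting value is an assumption about what the jdbc input uses for a timestamp tracking column before its first run, which is what makes the first run a full sync.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative model of update_time tracking between scheduled runs (not Logstash code). */
class WatermarkSync {
    static class Row {
        final String id;
        final LocalDateTime updateTime;
        Row(String id, LocalDateTime updateTime) { this.id = id; this.updateTime = updateTime; }
    }

    // Stands in for the value persisted in last_run_metadata_path (assumed epoch start)
    private LocalDateTime lastRun = LocalDateTime.of(1970, 1, 1, 0, 0);

    /** One scheduled run: select rows newer than the watermark, then advance it. */
    List<Row> poll(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        for (Row r : table) {
            if (r.updateTime.isAfter(lastRun)) {          // WHERE update_time > :sql_last_value
                changed.add(r);
            }
        }
        changed.sort(Comparator.comparing((Row r) -> r.updateTime)); // ORDER BY update_time
        if (!changed.isEmpty()) {
            lastRun = changed.get(changed.size() - 1).updateTime;    // saved for the next run
        }
        return changed;
    }

    LocalDateTime lastRun() { return lastRun; }
}
```

Because the comparison is a strict >, a row committed later with exactly the saved timestamp would be skipped. Using >= instead re-sends the boundary rows. That is harmless here, because document_id => "%{id}" makes the Elasticsearch writes idempotent.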
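The SQL that is executed (news.sql). The WHERE clause makes each run fetch only the rows changed since the previous run. update_time must be selected because it is the tracking column:
SELECT
n.id,
n.title,
n.source,
n.update_time
FROM
news_view n
WHERE
n.update_time > :sql_last_value
ORDER BY n.update_time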
Edit config/pipelines.yml:
[root@localhost config]# vi pipelines.yml
# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#
# - pipeline.id: test
# pipeline.workers: 1
# pipeline.batch.size: 1
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
# queue.type: persisted
# path.config: "/tmp/logstash/*.config"
#
#- pipeline.id: news_table
# path.config: /opt/apps/logstash/config/addmysql.conf
#- pipeline.id: news_table3
# path.config: /opt/apps/logstash/config/addmysql3.conf
- pipeline.id: news
  path.config: /opt/apps/logstash/config/news.conf
  # the aggregate filter only works correctly with a single worker
  pipeline.workers: 1
Run ./bin/logstash:
[root@localhost logstash]# ./bin/logstash
Common Kibana queries
Exact match (by id)
GET /app-article-link/_search
{
"_source": ["id","title","source","customList","update_time","savePath","isDelete"],
"query": {
"bool": {
"must": [
{ "match": { "id": "15c7ee7a5dc411ea9bc2fa163e0c8256" }}
]
}}}
Nested query (the field must be mapped as type nested)
GET app-article-link/_search
{
"query": {
"bool": {
"must": [
{
"nested": {
"path": "customList",
"query": {
"bool": {
"must": [
{ "match": { "customList.customId": "1" }},
{ "match": { "customList.secondLevel": "5552" }}
]
}}}}
]
}}}
Autocomplete query (the field must be of type completion)
GET app-article-link/_search
{
"_source": ["source","title","detail"],
"suggest": {
"title_suggest": {
"prefix": "國家知識產",
"completion": {
"field": "title.suggest",
"size": 10,
"skip_duplicates": true
}
}
}
}
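As a rough mental model of the completion suggester, here is a toy prefix lookup in Java. It is not how Elasticsearch works internally; Elasticsearch builds an FST and ranks suggestions by weight. The hypothetical PrefixSuggest class does four things:

- lowercases inputs, as the simple analyzer does;
- returns the entries that start with the prefix, in sorted order;
- collapses duplicates (compare skip_duplicates);
- stops at size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TreeMap;

/** Toy analogue of a completion suggester: prefix lookup over a sorted map. */
class PrefixSuggest {
    // key: lowercased input, value: original text; equal keys collapse into one entry
    private final TreeMap<String, String> index = new TreeMap<>();

    void add(String title) {
        index.put(title.toLowerCase(Locale.ROOT), title);
    }

    List<String> suggest(String prefix, int size) {
        String p = prefix.toLowerCase(Locale.ROOT);
        List<String> out = new ArrayList<>();
        // tailMap starts at the first key >= prefix; stop when keys no longer share it
        for (String key : index.tailMap(p, true).keySet()) {
            if (!key.startsWith(p) || out.size() >= size) {
                break;
            }
            out.add(index.get(key));
        }
        return out;
    }
}
```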
Highlight query
GET app-article-link/_search
{
"query": {
"multi_match": {
"query": "安徽",
"fields": ["title"]
}
},
"highlight": {
"pre_tags": "<span class='highLight'>",
"post_tags": "</span>",
"fields": {
"title": {}
}
}
}
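The effect of pre_tags and post_tags can be illustrated with a naive Java function that wraps every literal occurrence of the query text. This is an approximation for illustration only, and the class name is hypothetical. Elasticsearch highlights analyzed terms (produced by the IK analyzer here), so its fragments can differ from plain substring matches.

```java
/** Naive substring highlighter (illustration, not Elasticsearch's highlighter). */
class NaiveHighlighter {
    static String highlight(String text, String query, String preTag, String postTag) {
        if (query.isEmpty()) {
            return text; // nothing to mark
        }
        // String.replace substitutes every literal (non-regex) occurrence
        return text.replace(query, preTag + query + postTag);
    }
}
```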
The document format as finally imported by Logstash:

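Integrating Elasticsearch with Spring Boot
The Elasticsearch cluster here is version 7.8.1, and the client dependencies use the same version.
Add the dependencies:
<!-- Elasticsearch search -->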
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.8.1</version>
</dependency>
Create the configuration class:
import java.util.ArrayList;
import java.util.List;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ESConfig {
private static final String hosts = "192.168.0.178"; // cluster hosts, comma-separated if more than one
private static final int port = 9200; // HTTP port
private static final String schema = "http"; // protocol
private static final int connectTimeOut = 1000; // connect timeout (ms)
private static final int socketTimeOut = 30000; // socket (read) timeout (ms)
private static final int connectionRequestTimeOut = 500; // timeout for obtaining a connection from the pool (ms)
private static final int maxConnectNum = 100; // maximum total connections
private static final int maxConnectPerRoute = 100; // maximum connections per route (host)
private static final List<HttpHost> hostList = new ArrayList<>();
static {
for (String host : hosts.split(",")) {
hostList.add(new HttpHost(host.trim(), port, schema));
}
}
@Bean
public RestHighLevelClient restHighLevelClient() {
RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
// timeouts of the underlying async HTTP client
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(connectTimeOut)
.setSocketTimeout(socketTimeOut)
.setConnectionRequestTimeout(connectionRequestTimeOut));
// connection-pool limits of the async HTTP client
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setMaxConnTotal(maxConnectNum)
.setMaxConnPerRoute(maxConnectPerRoute));
// Spring calls close() on this bean at shutdown (inferred destroy method)
return new RestHighLevelClient(builder);
}
}
Using the client
Inject the client:
@Autowired
private RestHighLevelClient restHighLevelClient ;
Highlighted search:
public ResultBody highlighted(@RequestParam(value = "key") String key,
@RequestParam(value = "pageSize",defaultValue = "10") Integer pageSize,
@RequestParam(value = "from",defaultValue = "1") Integer from) throws IOException {
// offset: "from" is a 1-based page number
int offset = (from -1) * pageSize ;
// target index
SearchRequest searchRequest = new SearchRequest("app-article-link");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
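// Build the query. When a bool query also contains must clauses, its should clauses become optional
// (they only affect scoring unless minimum_should_match is set). To require at least one of them,
// collect the should clauses in their own bool query and add that query as a single must clause.
// With no should clauses at all, this inner bool query matches every document.
BoolQueryBuilder shouldQuery = QueryBuilders.boolQuery();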
// industry filter (disabled; industryList is not defined in this snippet)
/**if(industryList.size()>0) {
for (Map<String,String> itemMap : industryList) {
String customId = itemMap.get("customId");
String firstLevel = itemMap.get("firstLevel");
String secondLevel = itemMap.get("secondLevel");
NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("customList",
QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("customList.customId.keyword", customId))
.must(QueryBuilders.matchQuery("customList.firstLevel",firstLevel))
.must(QueryBuilders.matchQuery("customList.secondLevel",secondLevel)),
ScoreMode.None);
shouldQuery.should(nestedQueryBuilder);
}
}**/
// region filter (disabled; areaCode is not defined in this snippet)
/**if(StringUtils.isNotBlank(areaCode)) {
// nested-object query
NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("customList",
QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("customList.customId.keyword", "1"))
.must(QueryBuilders.matchQuery("customList.firstLevel",areaCode)),
ScoreMode.None);
shouldQuery.should(nestedQueryBuilder);
}**/
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
.must(shouldQuery)
.must(QueryBuilders.matchQuery("isDelete","0"));
//boolQueryBuilder.mustNot(QueryBuilders.termsQuery("id",articleItemId));
//List<String> customIdList = new ArrayList();
//if(customIdList!=null && customIdList.size()>0) {
// boolQueryBuilder.mustNot(QueryBuilders.termsQuery("id",customIdList));
//}
// if a keyword was supplied, add a full-text clause
if(StringUtils.isNotBlank(key)) {
boolQueryBuilder.must(QueryBuilders.multiMatchQuery(key,"title","summary","detail" ));
}
// highlight configuration
HighlightBuilder highlightBuilder = new HighlightBuilder();
// field to highlight
highlightBuilder.field("title")
// opening and closing tags
.preTags("<font color='#ee1a1a'>")
.postTags("</font>");
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.highlighter(highlightBuilder);
// pagination
searchSourceBuilder.from(offset);
searchSourceBuilder.size(pageSize);
// sort by publish date, newest first
searchSourceBuilder.sort("publishDate", SortOrder.DESC);
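// count every matching hit; by default Elasticsearch 7.x stops counting at 10,000
searchSourceBuilder.trackTotalHits(true);
searchRequest.source(searchSourceBuilder);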
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
long total = searchResponse.getHits().getTotalHits().value;
List<Map<String, Object>> list = Lists.newArrayList();
// walk the hits and apply the highlight fragments
for (SearchHit hit : searchResponse.getHits().getHits()) {
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField nameHighlight = highlightFields.get("title");
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
// join the highlight fragments and overwrite the original title
if (nameHighlight != null) {
StringBuilder title = new StringBuilder();
for (Text text : nameHighlight.getFragments()) {
title.append(text.string());
}
sourceAsMap.put("title", title.toString());
}
// default value of the read flag
sourceAsMap.put("isRead","0");
list.add(sourceAsMap);
}
// build the response payload
Map<String,Object> retMap = new HashMap<>();
retMap.put("total",total);
retMap.put("dataList",list);
return ResultBody.ok().data(retMap);
}
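highlighted() turns the 1-based page number from into an offset with (from - 1) * pageSize. The hypothetical helper below does the same arithmetic and adds two guards that the original method lacks:

- It rejects page numbers below 1, which would otherwise yield a negative offset.
- It rejects requests past index.max_result_window. The default is 10,000, and Elasticsearch rejects from + size beyond it.

```java
/** Page-number to offset conversion with guards (hypothetical helper, not a library API). */
class PageWindow {
    // Elasticsearch's default index.max_result_window
    static final int MAX_RESULT_WINDOW = 10_000;

    /** @param page 1-based page number ("from" in the controller); returns the ES "from" offset */
    static int offset(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        long offset = (long) (page - 1) * pageSize; // long avoids int overflow for huge pages
        if (offset + pageSize > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                    "from + size exceeds index.max_result_window (" + MAX_RESULT_WINDOW + ")");
        }
        return (int) offset;
    }
}
```

For deeper paging than this window allows, Elasticsearch offers search_after, which this article does not cover.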
Autocomplete:
public ResultBody getSearchSuggest(@RequestParam(value = "key") String key) throws IOException {
if(StringUtils.isBlank(key)) {
return ResultBody.ok();
}
CompletionSuggestionBuilder suggestion = SuggestBuilders
.completionSuggestion("title.suggest").prefix(key).size(20).skipDuplicates(true);
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest", suggestion);
// source builder
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.suggest(suggestBuilder);
SearchRequest searchRequest = new SearchRequest("app-article-link"); //索引
searchRequest.source(sourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
Suggest suggest = response.getSuggest();
// LinkedHashSet removes duplicates while keeping the suggester's ranking order;
// an empty set (rather than null) is returned when there are no suggestions
Set<String> keywords = new LinkedHashSet<>();
if (suggest != null) {
List<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> entries = suggest.getSuggestion("suggest").getEntries();
collect:
for (Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option> entry: entries) {
for (Suggest.Suggestion.Entry.Option option: entry.getOptions()) {
// return at most 10 suggestions, each at most 50 characters long
String keyword = option.getText().string();
if (StringUtils.isEmpty(keyword) || keyword.length() > 50) {
continue;
}
// skip the text the user typed
if (keyword.equals(key)) {
continue;
}
keywords.add(keyword);
if (keywords.size() >= 10) {
break collect; // leave both loops once the limit is reached
}
}
}
}
return ResultBody.ok().data(keywords);
}
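The filtering rules in getSearchSuggest() can be pulled into a plain method so they can be tested without a cluster. SuggestFilter is a hypothetical name. The input list stands for the option.getText().string() values, in the order Elasticsearch returned them.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Post-processing of suggestion texts (hypothetical helper mirroring getSearchSuggest). */
class SuggestFilter {
    static Set<String> filter(List<String> options, String key, int maxCount, int maxLength) {
        Set<String> keywords = new LinkedHashSet<>(); // dedupe, keep ranking order
        for (String keyword : options) {
            if (keywords.size() >= maxCount) {
                break; // limit reached
            }
            if (keyword == null || keyword.isEmpty() || keyword.length() > maxLength) {
                continue; // drop empty or overly long suggestions
            }
            if (keyword.equals(key)) {
                continue; // don't suggest what the user already typed
            }
            keywords.add(keyword);
        }
        return keywords;
    }
}
```

In the controller, the equivalent call would be SuggestFilter.filter(texts, key, 10, 50).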
Questions and discussion are welcome!
Please credit the source when reposting. Original link: https://www.uj5u.com/qita/293902.html
