主頁 > 資料庫 > 基于FLink實作的實時安全檢測(一段時間內連續登錄失敗20次后,下一次登錄成功場景)

基于FLink實作的實時安全檢測(一段時間內連續登錄失敗20次后,下一次登錄成功場景)

2023-02-24 07:52:18 資料庫

研發背景

    公司安全部目前針對內部系統的網路訪問日志的安全審計,大部分都是T+1時效,每日當天,啟動Python撰寫的定時任務,完成昨日的日志審計和檢測,定時任務運行完成后,統一進行企業微信告警推送,這種方案在目前的網路環境和人員規模下,呈現兩個痛點,一是面對日益頻繁的網路攻擊、釣魚鏈接,T+1的定時任務,難以及時進行告警,因此也難以有效避免如關鍵資訊泄露等問題,二是目前以Python為主的單機定時任務,針對不同場景的處理時效,從一小時到十幾小時不等,效率低下,為解決以上問題,本人協助公司安全部同時對告警采集平臺進行改造,由之前的python單機任務處理,切換到基于Flink集群的并行處理,且告警推送時效,由之前的T+1天,提升到秒級實時告警,本次改造涉及網路日志審計的多個常見場景,如埠掃描、黑名單統計、例外流量、連續惡意登錄等,本次以一段時間內連續登錄失敗20次后,下一次登錄成功場景來進行介紹,

場景描述

    針對一個內部系統,如郵件系統,公司員工的訪問行為日志,存放于kafka,我們希望對于一個用戶賬號在同一個IP下,任意的3分鐘時間內,連續登錄郵件系統20次失敗,下一次登錄成功,這種場景能夠及時獲取并推送到企業微信某個指定的安全介面人,kafka中的資料,能夠通過某個關鍵字,區分當前網路訪問是否一次登錄事件,且有訪問時間(也就是事件時間),在決議到符合需求的用戶賬號之后,第一時間進行企業微信告警推送,并將其這段時間內的訪問行為,寫入下游ElasticSearch,

組件版本

  • Flink-1.14.4
  • Java8
  • ElasticSearch-7.3.2
  • Kafka-2.12_2.8.1

日志結構

IP和賬號皆為測驗使用,

{
   "user": "wangxm",
   "client_ip": "110.68.6.182",
   "source": "login",
   "loginname": "[email protected]",
   "IP": "110.8.148.58",
   "timestamp": "17:58:12",
   "@timestamp": "2022-04-20T09:58:13.647Z",
   "ip": "110.7.231.25",
   "clienttype": "POP3",
   "result": "success",
   "@version": "1"
 }

 

技術方案

    上述場景,可考慮使用FlinkCEP及Flink的滑動視窗進行實作,由于本人在采用FlinkCEP的方案進行代碼撰寫除錯后,發現并不能滿足,因此改用滑動視窗進行實作,

 

關鍵代碼

主入口類

    主入口類,創建了flink環境、設定了基礎引數,創建了kafkaSource,接入訊息后,進行了映射、過濾,并設定了水位線,進行了分組,之后設定了滑動視窗,在視窗內進行了事件統計,將復合條件的事件收集回傳并寫入ElasticSearch,

    針對map、filter、keyBy、window等算子,都單獨進行了撰寫,后面會一一列出來,

 

package com.data.dev.flink.mailTopic.main;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;


/**
 * Flink處理在3分鐘內連續登錄失敗20次后登錄成功的場景
 * 采用滑動視窗來實作
 * @author wangxiaomin 2022-06-01
 */

@Slf4j
public class MailMsg extends BaseBean {

    /**
     * Flink作業名稱
     */
    public static final  String JobName = "告警采集平臺——連續登錄失敗后登錄成功告警";
    /**
     * Kafka訊息名
     */
    public static final  String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic";

    public MailMsg(){
        log.info("初始化滑動視窗場景告警程式");
    }

    /**
     * 執行邏輯統計場景,實作告警推送
     */
    public static void execute(){


        //① 創建Flink執行環境并設定checkpoint等必要的引數
        StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
        KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ;
        DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName);


        //② 篩選登錄訊息,創建初始登錄事件流
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map算子加工");
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter算子加工");

        //③ 設定水位線
        WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("增加水位線");

        //④ 設定主鍵
        KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector());

        //⑥ 轉化為滑動視窗
        WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L)));

        //⑦ 在視窗內進行邏輯統計
        SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs  = loginWindowDs.process(new WindowProcessFuncImpl()).name("視窗處理邏輯");

        //⑧ 將結果轉化為通用DataStream<String>格式
        SingleOutputStreamOperator<String> resultDs  = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("視窗結果轉化為標準格式");

        //⑨ 將最終結果寫入ES
        resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());

        //⑩ 提交Flink集群進行執行
        FlinkEnv.envExec(env,JobName);

    }
}

 

mapper算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  邏輯統計場景告警推送ES訊息體
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

filter算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;


/**
 * ② 消費mail主題的訊息,過濾其中login的事件
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> {
    @Override
    public boolean filter(MailMsg mailMsg) {
        if("login".equals(mailMsg.getSource())) {
            log.info("篩選原始的login事件:【" + mailMsg + "】");
        }
        return "login".equals(mailMsg.getSource());
    }
}

keyBy算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * CEP 編程,需要進行key選取
 */
@Slf4j
public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> {
    @Override
    public String getKey(MailMsg mailMsg) {
        return mailMsg.getUser() + "@" + mailMsg.getClient_ip();
    }
}

視窗函式(核心代碼)

    這里我們主要考慮使用一個事件串列,用來存盤每一個視窗期內得到的連續登錄,當檢測到登陸失敗的事件,即存入事件串列中,之后判斷下一次登錄失敗事件,如果檢測到登錄成功事件,但此時登錄失敗的次數不足20次,則清空loginEventList,等待下一次檢測,一旦符合視窗內連續登錄失敗超過20次且下一次登錄成功這個事件,則清空此時的loginEventList并將當前登錄成功的事件進行告警推送,

 

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.utils.HttpUtils;
import com.data.dev.utils.IPUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 *  滑動視窗內復雜事件決議邏輯實作
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class WindowProcessFuncImpl extends  ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {
    @Override
    public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) {

        List<MailMsg> loginEventList = new ArrayList<>();
        MailMsgAlarm mailMsgAlarm;
        for (MailMsg mailMsg : iterable) {
            log.info("收集到的登錄事件【" + mailMsg + "】");
if (mailMsg.getResult().equals("fail")) { //開始檢測當前視窗內的事件,并將失敗的事件收集到loginEventList log.info("開始檢測當前視窗內的事件,并將失敗的事件收集到loginEventList"); loginEventList.add(mailMsg); } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {//如果檢測到登錄成功事件,但此時登錄失敗的次數不足20次,則清空loginEventList,等待下一次檢測 log.info("檢測到登錄成功事件,但此時登錄失敗的次數為【" + loginEventList.size() + "】不足20次,清空loginEventList,等待下一次檢測"); loginEventList.clear(); } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) { mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg); log.info("檢測到登錄成功的事件,此時視窗內連續登錄失敗的次數為【" + mailMsgAlarm.getFailTimes() + "】"); //一旦符合視窗內連續登錄失敗超過20次且下一次登錄成功這個事件,則清空此時的loginEventList并將當前登錄成功的事件進行告警推送; loginEventList.clear(); doAlarmPush(mailMsgAlarm); collector.collect(mailMsgAlarm);//將當前登錄成功的事件進行收集上報 } else { log.info(mailMsg.getUser() + "當前已連續:【" + loginEventList.size() + "】 次登錄失敗"); } } } /** * 2022年6月17日15:03:06 * @param eventList:當前視窗內的事件串列 * @param eventCurrent:當前登錄成功的事件 * @return mailMsgAlarm:告警訊息體 */ public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){ String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip(); String loginFailStartTime = eventList.get(0).getTimestamp_datetime(); String loginSuccessTime = eventCurrent.getTimestamp_datetime(); int loginFailTimes = eventList.size(); MailMsgAlarm mailMsgAlarm = new MailMsgAlarm(); mailMsgAlarm.setMailMsg(eventCurrent); mailMsgAlarm.setAlarmKey(alarmKey); mailMsgAlarm.setStartTime(loginFailStartTime); mailMsgAlarm.setEndTime(loginSuccessTime); mailMsgAlarm.setFailTimes(loginFailTimes); return mailMsgAlarm; } /** * 2022年6月17日14:47:53 * @param mailMsgAlarm :當前構建的需要告警的事件 */ public void doAlarmPush(MailMsgAlarm mailMsgAlarm){ String userKey = mailMsgAlarm.getAlarmKey(); String clientIp = mailMsgAlarm.mailMsg.getClient_ip(); boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp); if(isWhiteListIp){//如果是白名單IP,不告警 log.info("當前登錄用戶【" + userKey + "】屬于白名單IP"); }else { //IP歸屬查詢結果、企業微信推送告警 String user = HttpUtils.getUserByClientIp(clientIp); HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString()); } } }

最后一次map算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  邏輯統計場景告警推送ES訊息體
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

ElasticSearch工具類

package com.data.dev.elasticsearch;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.ElasticSearchKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 2022年6月17日15:15:06
 * @author wangxiaoming-ghq
 * Flink流計算結果寫入ES公共方法
 */
@Slf4j
public class SinkToEs extends BaseBean {
    public static final long serialVersionUID = 2L;
    private static final HashMap<String,String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
    private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
    private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
    private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
    private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);

    /**
     * 2022年6月17日15:17:55
     * 獲取ES連接資訊
     * @return esInfoMap:ES連接資訊持久化
     */
    public static HashMap<String,String > getElasticSearchInfo(){
        log.info("獲取ES連接資訊:【 " + "HOST="+HOST + "PORT="+PORT+"USERNAME="+USERNAME+"PASSWORD=********" + " 】");
        HashMap<String,String> esInfoMap = new HashMap<>();
        esInfoMap.put(ElasticSearchKey.HOST,HOST);
        esInfoMap.put(ElasticSearchKey.PASSWORD,PASSWORD);
        esInfoMap.put(ElasticSearchKey.USERNAME,USERNAME);
        esInfoMap.put(ElasticSearchKey.PORT,PORT);

        return esInfoMap;
    }

    /**
     * @param esIndexName:寫入索引名稱
     * @param esType:寫入索引型別
     * @return ElasticsearchSink.Builder<String>:構建器
     */
    public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName,String esType){
        HashMap<String, String> esInfoMap = getElasticSearchInfo();
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(String.valueOf(esInfoMap.get(ElasticSearchKey.HOST)), Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)), "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {

                    public IndexRequest createIndexRequest() {
                        Map<String, String> json = new HashMap<>();
                        //log.info("寫入ES的data:【"+json+"】");
                        IndexRequest index  = Requests.indexRequest()
                                .index(esIndexName)
                                .type(esType)
                                .source(json);
                        return index;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest());
                    }
                }
        );


        //定義es的連接配置  帶用戶名密碼
        RestClientFactory restClientFactory = restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            String.valueOf(esInfoMap.get(ElasticSearchKey.USERNAME)),
                            String.valueOf(esInfoMap.get(ElasticSearchKey.PASSWORD))
                    )
            );
            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.disableAuthCaching();
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        };

        esSinkBuilder.setRestClientFactory(restClientFactory);
        return esSinkBuilder;
    }

}

事件物體類

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;


/**
 * @author wangxiaoming-ghq 2022-05-15
 * 邏輯統計場景告警事件
 */
@Data
public class MailMsgAlarm extends BaseBean {


    /**
     * 當前登錄成功的事件
     */
   public  MailMsg mailMsg;

    /**
     * 當前捕獲的告警主鍵:username@client_ip
     */
   public  String alarmKey;

    /**
     * 第一次登錄失敗的事件時間
     */
   public  String startTime;

    /**
     * 連續登錄失敗后下一次登錄成功的事件時間
     */
   public  String endTime;

    /**
     * 連續登錄失敗的次數
     */
   public  int failTimes;

    @Override
    public String toString() {
        return "{" +
                "  'mailMsg_login_success':'" + mailMsg + "'" +
                ", 'alarmKey':'" + alarmKey + "'" +
                ", 'start_login_time_in3min':'"  +startTime + "'" +
                ", 'end_login_time_in3min':'"  +endTime + "'" +
                ", 'login_fail_times':'"  +failTimes +  "'" +
                "}";
    }

    public MailMsgAlarm() {
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsgAlarm)) return false;
        MailMsgAlarm that = (MailMsgAlarm) o;
        return getFailTimes() == that.getFailTimes() && getMailMsg().equals(that.getMailMsg()) && getAlarmKey().equals(that.getAlarmKey()) && getStartTime().equals(that.getStartTime()) && getEndTime().equals(that.getEndTime());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getMailMsg(), getAlarmKey(), getStartTime(), getEndTime(), getFailTimes());
    }
}

訊息物體類

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;

/**
 * {
 *   "user": "wangxm",
 *   "client_ip": "110.68.6.182",
 *   "source": "login",
 *   "loginname": "[email protected]",
 *   "IP": "110.8.148.58",
 *   "timestamp": "17:58:12",
 *   "@timestamp": "2022-04-20T09:58:13.647Z",
 *   "ip": "110.7.231.25",
 *   "clienttype": "POP3",
 *   "result": "success",
 *   "@version": "1"
 * }
 *
 * user登錄用戶
 * client_ip 來源ip
 * source 型別
 * loginname 登錄用戶郵箱地址
 * ip 目標前端ip
 * timestamp 發送時間
 * &#064;timestamp  發送日期時間
 * IP 郵件日志發送來源IP
 * clienttype 客戶端登錄型別
 * result 登錄狀態
 */

@Data
public class MailMsg extends BaseBean {
    public String user;
    public String client_ip;
    public String source;
    public String loginName;
    public String mailSenderSourceIp;
    public String timestamp_time;
    public String timestamp_datetime;
    public String ip;
    public String clientType;
    public String result;
    public String version;

    public MailMsg() {
    }

    public MailMsg(String user, String client_ip, String source, String loginName, String mailSenderSourceIp, String timestamp_time, String timestamp_datetime, String ip, String clientType, String result, String version) {
        this.user = user;
        this.client_ip = client_ip;
        this.source = source;
        this.loginName = loginName;
        this.mailSenderSourceIp = mailSenderSourceIp;
        this.timestamp_time = timestamp_time;
        this.timestamp_datetime = timestamp_datetime;
        this.ip = ip;
        this.clientType = clientType;
        this.result = result;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsg)) return false;
        MailMsg mailMsg = (MailMsg) o;
        return getUser().equals(mailMsg.getUser()) && getClient_ip().equals(mailMsg.getClient_ip()) && getSource().equals(mailMsg.getSource()) && getLoginName().equals(mailMsg.getLoginName()) && getMailSenderSourceIp().equals(mailMsg.getMailSenderSourceIp()) && getTimestamp_time().equals(mailMsg.getTimestamp_time()) && getTimestamp_datetime().equals(mailMsg.getTimestamp_datetime()) && getIp().equals(mailMsg.getIp()) && getClientType().equals(mailMsg.getClientType()) && getResult().equals(mailMsg.getResult()) && getVersion().equals(mailMsg.getVersion());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getUser(), getClient_ip(), getSource(), getLoginName(), getMailSenderSourceIp(), getTimestamp_time(), getTimestamp_datetime(), getIp(), getClientType(), getResult(), getVersion());
    }

    @Override
    public String toString() {
        return "{" +
                "  'user':'" + user + "'" +
                ", 'client_ip':'" + client_ip  + "'" +
                ", 'source':'" + source  + "'" +
                ", 'loginName':'" + loginName  + "'" +
                ", 'IP':'" + mailSenderSourceIp + "'" +
                ", 'timestamp':'" + timestamp_time + "'" +
                ", '@timestamp':'" + timestamp_datetime + "'" +
                ", 'ip':'"  + "'" +
                ", 'clientType':'" + clientType  + "'" +
                ", 'result':'" + result  + "'" +
                ", 'version':'" + version + "'" +
                "}";
    }

}

 源代碼已去掉敏感資訊,地址:https://gitee.com/wangxm-2270/alarmCollectByFlink.git

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/544905.html

標籤:其他

上一篇:基于SpringBoot實作操作GaussDB(DWS)的專案實戰

下一篇:redis(1)NoSQL資料庫簡介

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more