主頁 >  其他 > 為什么一個星期作業量的作業,我做了一個多月,還沒結束

為什么一個星期作業量的作業,我做了一個多月,還沒結束

2021-04-01 18:40:37 其他

    為什么一個星期作業量的作業,我做了一個多月,還沒結束

    為什么一個簡單的小任務,我遇到這么多難題

    這是一個HIK平臺WIFI資料接入的作業,先看下我的代碼提交記錄:

     首先有這兩方面的原因:1、初學Java時間不長,不夠熟練,這個原因浪費的時間并不多,2、與資料提供方溝通浪費的時間,因資料有問題,即DeviceId和資料庫中的DeviceId對不上,導致程式篩選不到資料,需要對方修改相關配置,由于我感覺事情并不急,所以是隔幾天催一下,浪費了不少天的時間,最后DeviceId依然對不上,我是怎么解決的呢?由于基本上是一個設備布在一個地點,所以我是根據資料中的經緯度來篩選資料的,這種辦法其實還是有一點點問題的,

    再說寫代碼花費的時間,服務部署到現網后,就開始發現BUG了,程式處理資料的速度不夠,導致FTP上的ZIP包處理不完大量積壓,因為我用的是單執行緒處理資料,遂改成多執行緒處理資料,

    經過優化和測驗,發現資料處理速度還是不夠,FTP上資料產生的速度大約是380條每秒,可能會更多,而單機每秒最多只能處理200多條資料,繼續嘗試優化,但還是不行,所以我面臨第一個重要問題:把程式部署到多臺機器,提高資料處理速度,程式該怎么寫?

    對于這個問題,經過探索,我最終的解決辦法是:把程式部署在5臺機器上,程式跑起來后,能拿到機器名稱,FTP上的ZIP檔案名帶有時間戳,根據機器名和時間戳的最后一位,把ZIP檔案分配到5臺機器處理,這樣就解決了資料處理速度不夠的問題,

    我想解決的第二個重要問題是:由于我目前做的是大資料維護,學了點Spark和Flink知識,所以我想用Spark改寫,已經寫好的程式是SpringBoot的,所以我要做的是SpringBoot整合Spark,周末我在家搭了一個Hadoop+Spark的分布式集群環境,寫了個純Spark的Demo跑,正常,然后寫了個SpringBoot整合Spark的Demo,本地模式跑,即.setMaster("local[3]"),正常,但是提交到集群跑,web api和swagger在線檔案正常,任務跑起來后在Spark Web頁面可以看到,但日志報錯,我試了3種方式,一種是打war包部署在tomcat里,一種是用命令java -jar運行jar包,一種是用spark-submit命令運行jar包,任務跑起來后都報錯了,三種方式錯誤也不相同,搞到凌晨4點多,沒有解決,放棄,不過,就算我成功了,接下來的問題,可能依然無法解決,就是資料處理完后,要推送到Kafka,網上的教學都是教Spark怎么處理Kafka流資料,Kafka資料是作為資料源的,不是作為目的地的,所以我的想法可能本身就是個問題,

    解釋一下為什么要SpringBoot整合Spark,只用Spark不用SpringBoot不行嗎?實際上之前同事就是這么干的,要么用SpringBoot,要么用Spark或Flink,沒有把SpringBoot和二者在一起用過,為什么我要這么做?因為我要讀mysql資料庫,我要用JDBC或者Mybatis等,如果不用SpringBoot,有些東西可能要自己搞,不太方便,

    既然SpringBoot整合Spark沒有成功,那資料分配不均勻的問題怎么解決(FTP上ZIP檔案5分種產生5個檔案,2個資料量大,3個不是我需要的資料可以直接刪了,但不管是按時間戳這個特征分配,還是按序號這個特征分配,都無法均勻分配,可能會導致一個節點資料積壓,另外3個節點沒有要處理的資料,即一個節點有難,3個節點圍觀,雖然時間尺度拉大后,資料分配是均勻的,但是資料處理延遲大了,5分鐘產生一個ZIP檔案,意味著,有的資料已經延遲處理了5分鐘,而我這邊還要延遲幾分鐘到幾十分鐘不等)?這是我準備解決的第三個重要問題,

    我用Socket解決了這個問題,我把程式部署在5個節點上,一個作為master節點,4個作為worker節點,程式啟動后根據機器名判斷,確定master節點,master節點從FTP上下載ZIP檔案,FTP上ZIP檔案可能很多,先只下載一個檔案進行處理,一個檔案中可能有多達10萬條資料,也可能就幾萬條資料,還可能不是我需要的資料,直接刪掉該ZIP檔案即可,然后決議資料,再然后把資料集合平均分成4份,通過Socket發送給4個worker節點,worker節點收到資料集合,進行篩選和處理,然后推送到Kafka,資料處理完后,給master節點發送一條訊息,可以重復發送幾次,再加上是局域網,以確保master節點能收到訊息(這個很重要,但這里也有問題),master節點收到4個節點資料處理完成的訊息后,洗掉FTP上的該ZIP檔案,然后進行下一次處理,直到FTP上的ZIP檔案全部處理完成并洗掉,我今天寫完后,把寫ES日志和發Kafka以及洗掉ZIP檔案的代碼注釋掉,耗時的地方用Thread.sleep代替,部署到真實環境,看能不能穩定跑上一天,我感覺用Socket實作分布式資料處理,雖然能解決資料分配不均勻的問題,但是程式穩定性變差了,如果master節點收不到worker節點發來的資料處理完成的訊息怎么辦?假設其中一個worker節點的程式掛掉了呢?

    雖然只是一個簡單的小任務,我真的是非常努力,如果我最初的設想成功,以后的類似服務都可以這么寫,意義重大,可惜沒搞成功,退而求其次,用Socket寫分布式處理程式,我好像迷迷糊糊明白為什么Spark要依賴Hadoop了,我自己用Socket寫漏洞很多啊,沒有把資料持久化,萬一漰了,資料就丟了,

    有沒有大佬給指點一下,我努力的思路是不是有問題,有沒有代碼又容易寫,又不容易寫錯,程式又穩定可靠的方案?

 

====================== 分隔線 ==============================================================================================

上面的使用Socket的方案存在的問題:1.master節點把大量資料發送到worker節點處理,占用大量帶寬,并且耗時,2.master節點需要等待所有worker節點全部處理完成后,才能進行下一次處理,因網路傳輸耗時和各機器性能差異,導致某些worker節點空閑時間長,浪費大量時間

下面是新的使用Socket的方案,有下列優點:1.master節點和worker節點通信,只傳輸命令,不傳輸資料內容,節省帶寬,省時可靠,2.worker節點處理完資料,立即請求下一次處理,每一個worker節點得到充分利用

 

最終實作方案:

通過分布式鎖,把FTP上的zip檔案,均勻的分配到7個worker主機節點進行處理,master節點不處理資料,只負責處理7個woker節點的鎖請求與資料處理請求,7個worker節點只要有3個正常作業,就能跟上FTP上資料產生的速度,不會造成zip檔案積壓,該方式資料處理延遲很小,能夠及時處理FTP上產生的資料


大致流程:

1.任務啟動,判斷當前worker節點在FTP上是否存在私有檔案,如果存在,則進入直接處理流程
2.如果私有檔案不存在,則worker節點向master節點請求鎖
3.master節點收到請求,發放鎖
4.worker節點收到鎖,重命名FTP上的一個檔案為私有檔案名,該檔案名對其它節點不可見(各worker節點通過正則運算式判斷處理),然后釋放鎖
5.master節點收到釋放鎖的請求,釋放信號量,使其能夠繼續處理下一個節點的鎖請求
6.worker節點釋放鎖后,下載檔案并處理資料,資料處理完,洗掉私有檔案,并再次請求鎖,以進行下一個檔案的處理

主要實作代碼:

1.ReadFtpFileService類:任務啟動入口,判斷FTP上私有檔案是否存在,向master節點請求鎖或者請求直接處理

package com.suncreate.wifi.service;

import com.suncreate.wifi.hikmodel.SocketData;
import com.suncreate.wifi.tool.FtpUtils;
import com.suncreate.wifi.tool.SocketUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class ReadFtpFileService {
    private static final Logger log = LoggerFactory.getLogger(ReadFtpFileService.class);

    @Value("${ftp.host}")
    private String ftpHost;

    @Value("${ftp.port}")
    private int ftpPort;

    @Value("${ftp.username}")
    private String ftpUserName;

    @Value("${ftp.password}")
    private String ftpPassword;

    @Value("${node.name}")
    private String nodeName;

    @Value("${master.ip}")
    private String masterIp;

    @Value("${master.name}")
    private String masterName;

    /**
     * 處理FTP上的zip資料
     */
    public void ProcessZip() {
        try {
            if (nodeName.equals(masterName)) return; //master節點不處理資料,跳過

            log.info("ProcessZip() 開始");

            List<String> ftpFileList = getFtpFileList();
            if (ftpFileList != null && ftpFileList.size() > 0) {
                boolean bl = false;
                String fileName = null;
                for (String ftpFileName : ftpFileList) {
                    if (ftpFileName.endsWith(nodeName + ".zip")) {
                        bl = true;
                        fileName = ftpFileName;
                    }
                }

                if (bl) {
                    SocketUtil.Send(masterIp, new SocketData(nodeName, 3, fileName)); //3:直接處理請求
                    ZipProcessTime.updateTime();
                } else {
                    SocketUtil.Send(masterIp, new SocketData(nodeName, 0)); //0:請求鎖
                    ZipProcessTime.updateTime();
                }
            } else {
                ZipProcessTime.updateTime();

                RunProcessZipThread thread = new RunProcessZipThread(this);
                thread.start();
            }

            log.info("ProcessZip() 結束");
        } catch (Exception e) {
            log.error("ProcessZip 出錯", e);
        }
    }

    private List<String> getFtpFileList() {
        List<String> result = new ArrayList<>();
        FtpUtils ftpUtils = new FtpUtils();

        try {
            ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword);
            ftpUtils.listFileName("/");
            result = ftpUtils.getFileNameList();
        } catch (Exception e) {
            log.error("getFtpFileList() failed", e);
        } finally {
            ftpUtils.disConnection();
        }

        return result;
    }

}
View Code

2.SocketServer類:用于啟動Socket服務端執行緒

package com.suncreate.wifi.service;

import com.suncreate.wifi.hikmodel.SocketData;
import com.suncreate.wifi.model.HotspotInfoCollected;
import com.suncreate.wifi.model.TermInfoCharacteristics;
import com.suncreate.wifi.task.CheckScheduleConfig;
import com.suncreate.wifi.tool.FtpUtils;
import com.suncreate.wifi.tool.SerializeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

@Service
public class SocketServer {
    private static final Logger log = LoggerFactory.getLogger(SocketServer.class);

    @Autowired
    private KafkaSendService kafkaSendService;

    @Autowired
    private ReadFtpFileService readFtpFileService;

    @Value("${ftp.host}")
    private String ftpHost;

    @Value("${ftp.port}")
    private int ftpPort;

    @Value("${ftp.username}")
    private String ftpUserName;

    @Value("${ftp.password}")
    private String ftpPassword;

    @Value("${master.ip}")
    private String masterIp;

    @Value("${node.name}")
    private String nodeName;

    @Value("${master.name}")
    private String masterName;

    public void start() throws IOException {
        ServerSocket serverSocket = new ServerSocket(18060);
        log.info("當前節點主機名稱:" + nodeName);

        if (nodeName.equals(masterName)) { //master節點多起幾個執行緒與worker節點通信
            for (int i = 0; i < 7; i++) {
                SocketThread socketThread = new SocketThread(serverSocket, kafkaSendService, readFtpFileService, ftpHost, ftpPort, ftpUserName, ftpPassword, masterIp, nodeName);
                socketThread.start();
            }
        } else { //worker節點起一個執行緒與master通信即可
            SocketThread socketThread = new SocketThread(serverSocket, kafkaSendService, readFtpFileService, ftpHost, ftpPort, ftpUserName, ftpPassword, masterIp, nodeName);
            socketThread.start();
        }
    }

}
View Code

3.SocketThread類:Socket服務端接收命令,處理命令

package com.suncreate.wifi.service;

import com.suncreate.wifi.hikmodel.SocketData;
import com.suncreate.wifi.hikmodel.ZipData;
import com.suncreate.wifi.model.HotspotInfoCollected;
import com.suncreate.wifi.model.TermInfoCharacteristics;
import com.suncreate.wifi.tool.FtpUtils;
import com.suncreate.wifi.tool.SerializeUtil;
import com.suncreate.wifi.tool.SocketUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.regex.Pattern;

public class SocketThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(SocketThread.class);

    private static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(256);

    private static Semaphore semaphore = new Semaphore(1);

    private ServerSocket serverSocket;

    private KafkaSendService kafkaSendService;

    private ReadFtpFileService readFtpFileService;

    private String ftpHost;

    private int ftpPort;

    private String ftpUserName;

    private String ftpPassword;

    private String masterIp; //master節點IP

    private String nodeName;

    public SocketThread(ServerSocket serverSocket, KafkaSendService kafkaSendService, ReadFtpFileService readFtpFileService, String ftpHost, int ftpPort, String ftpUserName, String ftpPassword, String masterIp, String nodeName) {
        this.serverSocket = serverSocket;
        this.kafkaSendService = kafkaSendService;
        this.readFtpFileService = readFtpFileService;
        this.ftpHost = ftpHost;
        this.ftpPort = ftpPort;
        this.ftpUserName = ftpUserName;
        this.ftpPassword = ftpPassword;
        this.masterIp = masterIp;
        this.nodeName = nodeName;
    }

    public void run() {
        try {
            while (true) {
                try {
                    Socket socket = serverSocket.accept();
                    InputStream inputStream = socket.getInputStream();
                    DataInputStream dataInputStream = new DataInputStream(inputStream);
                    BufferedInputStream bufferedInputStream = new BufferedInputStream(dataInputStream);
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

                    byte[] bytes = new byte[10240];
                    int len;
                    while ((len = bufferedInputStream.read(bytes)) > 0) {
                        byteArrayOutputStream.write(bytes, 0, len);
                    }

                    SocketData socketData = (SocketData) SerializeUtil.Deserialize(byteArrayOutputStream.toByteArray());
                    socket.close();

                    //0:請求鎖
                    if (socketData.getCommand().equals(0)) {
                        log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 0:請求鎖");
                        semaphore.tryAcquire(30, TimeUnit.SECONDS);

                        SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 1)); //1:發放鎖
                        log.info("向節點 " + socket.getInetAddress().getHostAddress() + " 發放鎖");
                    }

                    //1:發放鎖
                    if (socketData.getCommand().equals(1)) {
                        log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 1:發放鎖");
                        List<String> ftpFileList = getFtpFileList();
                        //篩選
                        String pattern = "^[\\S\\s]*-[0-9]*-[0-9]*-[0-9]*.zip$";
                        for (int i = ftpFileList.size() - 1; i >= 0; i--) {
                            String ftpFileName = ftpFileList.get(i);
                            boolean bl = Pattern.matches(pattern, ftpFileName);
                            if (!bl) {
                                ftpFileList.remove(i);
                            }
                        }
                        if (ftpFileList != null && ftpFileList.size() > 0) {
                            String newFtpFileName = ftpFileList.get(0).replace(".zip", "") + "-" + nodeName + ".zip";
                            renameFtpFile(ftpFileList.get(0), newFtpFileName);
                            SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 2)); //2:釋放鎖

                            byte[] file = downloadFtpFile(newFtpFileName);
                            if (file != null) {
                                ZipData zipData = new ZipData(file);

                                if (zipData.getHotsoptList().size() > 0 || zipData.getTerminfoList().size() > 0) {
                                    processZipData(zipData);

                                    removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, newFtpFileName); //洗掉FTP上的zip檔案

                                    SocketUtil.Send(masterIp, new SocketData(nodeName, 0)); //0:再次請求鎖
                                    ZipProcessTime.updateTime();
                                } else {
                                    removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, newFtpFileName); //洗掉FTP上的zip檔案

                                    RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService);
                                    thread.start();
                                }
                            } else {
                                RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService);
                                thread.start();
                            }
                        } else {
                            SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 2)); //2:釋放鎖

                            RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService);
                            thread.start();
                        }
                    }

                    //2:釋放鎖
                    if (socketData.getCommand().equals(2)) {
                        log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 2:釋放鎖");
                        semaphore.release();
                    }

                    //3:直接處理請求
                    if (socketData.getCommand().equals(3)) {
                        log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 3:直接處理請求");
                        SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 4, socketData.getFtpFileName())); //4:直接處理命令
                    }

                    //4:直接處理命令
                    if (socketData.getCommand().equals(4)) {
                        log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 4:直接處理命令");
                        String ftpFileName = socketData.getFtpFileName();
                        byte[] file = downloadFtpFile(ftpFileName);
                        if (file != null) {
                            ZipData zipData = new ZipData(file);

                            if (zipData.getHotsoptList().size() > 0 || zipData.getTerminfoList().size() > 0) {
                                processZipData(zipData);

                                removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, ftpFileName); //洗掉FTP上的zip檔案

                                SocketUtil.Send(masterIp, new SocketData(nodeName, 0)); //0:再次請求鎖
                                ZipProcessTime.updateTime();
                            } else {
                                removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, ftpFileName); //洗掉FTP上的zip檔案

                                RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService);
                                thread.start();
                            }
                        } else {
                            RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService);
                            thread.start();
                        }
                    }
                } catch (Exception e) {
                    log.error("SocketServer while 出錯", e);
                }
            }
        } catch (Exception e) {
            log.error("SocketServer run 出錯", e);
        }
    }

    private void processZipData(ZipData zipData) {
        CountDownLatch countDownLatch = new CountDownLatch(zipData.getHotsoptList().size() + zipData.getTerminfoList().size());

        for (HotspotInfoCollected hotspot : zipData.getHotsoptList()) {
            RunnableSendHotspot runnableSendHotspot = new RunnableSendHotspot(kafkaSendService, hotspot, countDownLatch);
            threadPool.submit(runnableSendHotspot);
        }

        for (TermInfoCharacteristics terminfo : zipData.getTerminfoList()) {
            RunnableSendTerm runnableSendTerm = new RunnableSendTerm(kafkaSendService, terminfo, countDownLatch);
            threadPool.submit(runnableSendTerm);
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            log.error("countDownLatch.await() failed", e);
        }
    }

    private void renameFtpFile(String oldName, String newName) {
        FtpUtils ftpUtils = new FtpUtils();

        try {
            ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword);
            ftpUtils.renameFtpFile(oldName, newName);
        } catch (Exception e) {
            log.error("RenameFtpFile() failed", e);
        } finally {
            ftpUtils.disConnection();
        }
    }

    private List<String> getFtpFileList() {
        List<String> result = new ArrayList<>();
        FtpUtils ftpUtils = new FtpUtils();

        try {
            ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword);
            ftpUtils.listFileName("/");
            result = ftpUtils.getFileNameList();
        } catch (Exception e) {
            log.error("getFtpFileList() failed", e);
        } finally {
            ftpUtils.disConnection();
        }

        return result;
    }

    private byte[] downloadFtpFile(String filePath) {
        byte[] result = null;
        FtpUtils ftpUtils = new FtpUtils();

        try {
            ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword);
            result = ftpUtils.downloadFtpFile(filePath);
            int tryCount = 1;
            while (result == null && tryCount < 5) {
                log.info("下載FTP檔案 " + filePath + " 失敗,嘗試再次下載" + tryCount);
                result = ftpUtils.downloadFtpFile(filePath);
                Thread.sleep(3000);
                tryCount++;
            }
        } catch (Exception e) {
            log.error("downloadFtpFile(filePath) failed", e);
        } finally {
            ftpUtils.disConnection();
        }

        return result;
    }

    private boolean removeFtpFile(String ftpHost, int ftpPort, String ftpUserName, String ftpPassword, String filePath) {
        boolean result = false;
        FtpUtils ftpUtils = new FtpUtils();

        try {
            ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword);
            ftpUtils.listFileName("/");
            result = ftpUtils.removeFtpFile(filePath);
        } catch (Exception e) {
            log.error("removeFtpFile() failed", e);
        } finally {
            ftpUtils.disConnection();
        }

        return result;
    }

}
View Code

4.RunProcessZipThread類:負責任務的重啟

package com.suncreate.wifi.service;

/**
 * 用于重啟資料處理任務
 */
public class RunProcessZipThread extends Thread {

    private ReadFtpFileService readFtpFileService;

    public RunProcessZipThread(ReadFtpFileService readFtpFileService) {
        this.readFtpFileService = readFtpFileService;
    }

    public void run() {
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        readFtpFileService.ProcessZip();
    }
}
View Code

實際處理資料的代碼:

1.RunnableSendHotspot類:

package com.suncreate.wifi.service;

import com.google.gson.Gson;
import com.suncreate.logback.elasticsearch.metric.DataType;
import com.suncreate.logback.elasticsearch.metric.ProcPhase;
import com.suncreate.logback.elasticsearch.metric.ProcStatus;
import com.suncreate.logback.elasticsearch.metric.SinkType;
import com.suncreate.logback.elasticsearch.util.MetricUtil;
import com.suncreate.wifi.model.HotspotInfoCollected;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class RunnableSendHotspot implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(RunnableSendHotspot.class);

    private KafkaSendService kafkaSendService;

    private HotspotInfoCollected hotspot;

    private CountDownLatch countDownLatch;

    public RunnableSendHotspot(KafkaSendService kafkaSendService, HotspotInfoCollected hotspot, CountDownLatch countDownLatch) {
        this.kafkaSendService = kafkaSendService;
        this.hotspot = hotspot;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        log2ES(ProcPhase.collect.toString(), ProcStatus.suc.toString(), 1);
        kafkaSendService.sendHotspotInfoCollected(hotspot);
        countDownLatch.countDown();
    }

    private void log2ES(String procPhase, String procStatus, Integer count) {
        HashMap<String, Object> logMap;
        logMap = (HashMap<String, Object>) MetricUtil.getMap("wifi_probe", "hik", DataType.struct_data.toString(),
                procPhase, procStatus, "ftp", SinkType.kafka.toString(), count);
        log.info(new Gson().toJson(logMap) + "    Count:" + countDownLatch.getCount());
    }
}
View Code

2.RunnableSendTerm類:

package com.suncreate.wifi.service;

import com.google.gson.Gson;
import com.suncreate.logback.elasticsearch.metric.DataType;
import com.suncreate.logback.elasticsearch.metric.ProcPhase;
import com.suncreate.logback.elasticsearch.metric.ProcStatus;
import com.suncreate.logback.elasticsearch.metric.SinkType;
import com.suncreate.logback.elasticsearch.util.MetricUtil;
import com.suncreate.wifi.model.TermInfoCharacteristics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class RunnableSendTerm implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(RunnableSendTerm.class);

    private KafkaSendService kafkaSendService;

    private TermInfoCharacteristics termInfo;

    private CountDownLatch countDownLatch;

    public RunnableSendTerm(KafkaSendService kafkaSendService, TermInfoCharacteristics termInfo, CountDownLatch countDownLatch) {
        this.kafkaSendService = kafkaSendService;
        this.termInfo = termInfo;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        log2ES(ProcPhase.collect.toString(), ProcStatus.suc.toString(), 1);
        kafkaSendService.sendTermInfoCharacteristics(termInfo);
        countDownLatch.countDown();
    }

    private void log2ES(String procPhase, String procStatus, Integer count) {
        HashMap<String, Object> logMap;
        logMap = (HashMap<String, Object>) MetricUtil.getMap("wifi_probe", "hik", DataType.struct_data.toString(),
                procPhase, procStatus, "ftp", SinkType.kafka.toString(), count);
        log.info(new Gson().toJson(logMap) + "    Count:" + countDownLatch.getCount());
    }
}
View Code

輔助代碼:

1.ZipProcessTime類:記錄資料處理任務最后活動的時間

package com.suncreate.wifi.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 * 記錄資料處理任務最后活動的時間
 */
public class ZipProcessTime {
    private static final Logger log = LoggerFactory.getLogger(ZipProcessTime.class);

    private static long time = System.currentTimeMillis();

    public static void updateTime() {
        time = System.currentTimeMillis();
        log.info("ZipProcessTime 已更新,時間戳=" + time);
    }

    public static long getTime() {
        return time;
    }

}
View Code

2.CheckScheduleConfig類:配置一個定時任務用于監控資料處理任務是否存活,如果長時間不存活,則重啟任務(實際運行程序中,這種情況未出現過)

package com.suncreate.wifi.task;

import com.suncreate.wifi.service.ZipProcessTime;
import com.suncreate.wifi.service.ReadFtpFileService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;

/**
 * master節點與worker節點之間的socket通信有可能失敗,所以需要一個監控,以重啟資料處理任務
 */
@Configuration
@EnableScheduling
public class CheckScheduleConfig implements SchedulingConfigurer {
    private static final Logger log = LoggerFactory.getLogger(CheckScheduleConfig.class);

    @Value("${checkQuarter}")
    private String checkQuarter;

    @Autowired
    private ReadFtpFileService readFtpFileService;

    @Value("${node.name}")
    private String nodeName;

    @Value("${master.name}")
    private String masterName;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(() -> {
            if (!nodeName.equals(masterName)) { //master節點不處理資料,跳過
                double sec = (System.currentTimeMillis() - ZipProcessTime.getTime()) / 1000.0;

                log.info("ZipProcessTime 已經 " + sec + " 秒沒有更新");
                if (sec > 1800) {
                    log.info("ZipProcessTime 已經長時間沒有更新,重啟 ProcessZip");
                    readFtpFileService.ProcessZip();
                }
            }

        }, triggerContext -> new CronTrigger(checkQuarter).nextExecutionTime(triggerContext));
    }
}
View Code

 

====================== 分隔線 ==============================================================================================

目前程式已在現網連續穩定運行一周

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/270703.html

標籤:其他

上一篇:c++學習的一些忠告(轉載)

下一篇:Unity 基于 WebRTC 的云功能

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more