為什么一個星期作業量的作業,我做了一個多月,還沒結束
為什么一個簡單的小任務,我遇到這么多難題
這是一個HIK平臺WIFI資料接入的作業,先看下我的代碼提交記錄:


首先有這兩方面的原因:1、初學Java時間不長,不夠熟練,這個原因浪費的時間并不多,2、與資料提供方溝通浪費的時間,因資料有問題,即DeviceId和資料庫中的DeviceId對不上,導致程式篩選不到資料,需要對方修改相關配置,由于我感覺事情并不急,所以是隔幾天催一下,浪費了不少天的時間,最后DeviceId依然對不上,我是怎么解決的呢?由于基本上是一個設備布在一個地點,所以我是根據資料中的經緯度來篩選資料的,這種辦法其實還是有一點點問題的,
再說寫代碼花費的時間,服務部署到現網后,就開始發現BUG了,程式處理資料的速度不夠,導致FTP上的ZIP包處理不完大量積壓,因為我用的是單執行緒處理資料,遂改成多執行緒處理資料,
經過優化和測驗,發現資料處理速度還是不夠,FTP上資料產生的速度大約是380條每秒,可能會更多,而單機每秒最多只能處理200多條資料,繼續嘗試優化,但還是不行,所以我面臨第一個重要問題:把程式部署到多臺機器,提高資料處理速度,程式該怎么寫?
對于這個問題,經過探索,我最終的解決辦法是:把程式部署在5臺機器上,程式跑起來后,能拿到機器名稱,FTP上的ZIP檔案名帶有時間戳,根據機器名和時間戳的最后一位,把ZIP檔案分配到5臺機器處理,這樣就解決了資料處理速度不夠的問題,
我想解決的第二個重要問題是:由于我目前做的是大資料維護,學了點Spark和Flink知識,所以我想用Spark改寫,已經寫好的程式是SpringBoot的,所以我要做的是SpringBoot整合Spark,周末我在家搭了一個Hadoop+Spark的分布式集群環境,寫了個純Spark的Demo跑,正常,然后寫了個SpringBoot整合Spark的Demo,本地模式跑,即.setMaster("local[3]"),正常,但是提交到集群跑,web api和swagger在線檔案正常,任務跑起來后在Spark Web頁面可以看到,但日志報錯,我試了3種方式,一種是打war包部署在tomcat里,一種是用命令java -jar運行jar包,一種是用spark-submit命令運行jar包,任務跑起來后都報錯了,三種方式錯誤也不相同,搞到凌晨4點多,沒有解決,放棄,不過,就算我成功了,接下來的問題,可能依然無法解決,就是資料處理完后,要推送到Kafka,網上的教學都是教Spark怎么處理Kafka流資料,Kafka資料是作為資料源的,不是作為目的地的,所以我的想法可能本身就是個問題,
解釋一下為什么要SpringBoot整合Spark,只用Spark不用SpringBoot不行嗎?實際上之前同事就是這么干的,要么用SpringBoot,要么用Spark或Flink,沒有把SpringBoot和二者在一起用過,為什么我要這么做?因為我要讀mysql資料庫,我要用JDBC或者Mybatis等,如果不用SpringBoot,有些東西可能要自己搞,不太方便,
既然SpringBoot整合Spark沒有成功,那資料分配不均勻的問題怎么解決(FTP上ZIP檔案5分種產生5個檔案,2個資料量大,3個不是我需要的資料可以直接刪了,但不管是按時間戳這個特征分配,還是按序號這個特征分配,都無法均勻分配,可能會導致一個節點資料積壓,另外3個節點沒有要處理的資料,即一個節點有難,3個節點圍觀,雖然時間尺度拉大后,資料分配是均勻的,但是資料處理延遲大了,5分鐘產生一個ZIP檔案,意味著,有的資料已經延遲處理了5分鐘,而我這邊還要延遲幾分鐘到幾十分鐘不等)?這是我準備解決的第三個重要問題,
我用Socket解決了這個問題,我把程式部署在5個節點上,一個作為master節點,4個作為worker節點,程式啟動后根據機器名判斷,確定master節點,master節點從FTP上下載ZIP檔案,FTP上ZIP檔案可能很多,先只下載一個檔案進行處理,一個檔案中可能有多達10萬條資料,也可能就幾萬條資料,還可能不是我需要的資料,直接刪掉該ZIP檔案即可,然后決議資料,再然后把資料集合平均分成4份,通過Socket發送給4個worker節點,worker節點收到資料集合,進行篩選和處理,然后推送到Kafka,資料處理完后,給master節點發送一條訊息,可以重復發送幾次,再加上是局域網,以確保master節點能收到訊息(這個很重要,但這里也有問題),master節點收到4個節點資料處理完成的訊息后,洗掉FTP上的該ZIP檔案,然后進行下一次處理,直到FTP上的ZIP檔案全部處理完成并洗掉,我今天寫完后,把寫ES日志和發Kafka以及洗掉ZIP檔案的代碼注釋掉,耗時的地方用Thread.sleep代替,部署到真實環境,看能不能穩定跑上一天,我感覺用Socket實作分布式資料處理,雖然能解決資料分配不均勻的問題,但是程式穩定性變差了,如果master節點收不到worker節點發來的資料處理完成的訊息怎么辦?假設其中一個worker節點的程式掛掉了呢?
雖然只是一個簡單的小任務,我真的是非常努力,如果我最初的設想成功,以后的類似服務都可以這么寫,意義重大,可惜沒搞成功,退而求其次,用Socket寫分布式處理程式,我好像迷迷糊糊明白為什么Spark要依賴Hadoop了,我自己用Socket寫漏洞很多啊,沒有把資料持久化,萬一漰了,資料就丟了,
有沒有大佬給指點一下,我努力的思路是不是有問題,有沒有代碼又容易寫,又不容易寫錯,程式又穩定可靠的方案?
====================== 分隔線 ==============================================================================================
上面的使用Socket的方案存在的問題:1.master節點把大量資料發送到worker節點處理,占用大量帶寬,并且耗時,2.master節點需要等待所有worker節點全部處理完成后,才能進行下一次處理,因網路傳輸耗時和各機器性能差異,導致某些worker節點空閑時間長,浪費大量時間
下面是新的使用Socket的方案,有下列優點:1.master節點和worker節點通信,只傳輸命令,不傳輸資料內容,節省帶寬,省時可靠,2.worker節點處理完資料,立即請求下一次處理,每一個worker節點得到充分利用
最終實作方案:
通過分布式鎖,把FTP上的zip檔案,均勻的分配到7個worker主機節點進行處理,master節點不處理資料,只負責處理7個woker節點的鎖請求與資料處理請求,7個worker節點只要有3個正常作業,就能跟上FTP上資料產生的速度,不會造成zip檔案積壓,該方式資料處理延遲很小,能夠及時處理FTP上產生的資料
大致流程:
1.任務啟動,判斷當前worker節點在FTP上是否存在私有檔案,如果存在,則進入直接處理流程
2.如果私有檔案不存在,則worker節點向master節點請求鎖
3.master節點收到請求,發放鎖
4.worker節點收到鎖,重命名FTP上的一個檔案為私有檔案名,該檔案名對其它節點不可見(各worker節點通過正則運算式判斷處理),然后釋放鎖
5.master節點收到釋放鎖的請求,釋放信號量,使其能夠繼續處理下一個節點的鎖請求
6.worker節點釋放鎖后,下載檔案并處理資料,資料處理完,洗掉私有檔案,并再次請求鎖,以進行下一個檔案的處理
主要實作代碼:
1.ReadFtpFileService類:任務啟動入口,判斷FTP上私有檔案是否存在,向master節點請求鎖或者請求直接處理
package com.suncreate.wifi.service; import com.suncreate.wifi.hikmodel.SocketData; import com.suncreate.wifi.tool.FtpUtils; import com.suncreate.wifi.tool.SocketUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; @Service public class ReadFtpFileService { private static final Logger log = LoggerFactory.getLogger(ReadFtpFileService.class); @Value("${ftp.host}") private String ftpHost; @Value("${ftp.port}") private int ftpPort; @Value("${ftp.username}") private String ftpUserName; @Value("${ftp.password}") private String ftpPassword; @Value("${node.name}") private String nodeName; @Value("${master.ip}") private String masterIp; @Value("${master.name}") private String masterName; /** * 處理FTP上的zip資料 */ public void ProcessZip() { try { if (nodeName.equals(masterName)) return; //master節點不處理資料,跳過 log.info("ProcessZip() 開始"); List<String> ftpFileList = getFtpFileList(); if (ftpFileList != null && ftpFileList.size() > 0) { boolean bl = false; String fileName = null; for (String ftpFileName : ftpFileList) { if (ftpFileName.endsWith(nodeName + ".zip")) { bl = true; fileName = ftpFileName; } } if (bl) { SocketUtil.Send(masterIp, new SocketData(nodeName, 3, fileName)); //3:直接處理請求 ZipProcessTime.updateTime(); } else { SocketUtil.Send(masterIp, new SocketData(nodeName, 0)); //0:請求鎖 ZipProcessTime.updateTime(); } } else { ZipProcessTime.updateTime(); RunProcessZipThread thread = new RunProcessZipThread(this); thread.start(); } log.info("ProcessZip() 結束"); } catch (Exception e) { log.error("ProcessZip 出錯", e); } } private List<String> getFtpFileList() { List<String> result = new ArrayList<>(); FtpUtils ftpUtils = new FtpUtils(); try { ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword); ftpUtils.listFileName("/"); result = ftpUtils.getFileNameList(); } catch (Exception e) { log.error("getFtpFileList() failed", e); } finally { ftpUtils.disConnection(); } return result; } }View Code
2.SocketServer類:用于啟動Socket服務端執行緒
package com.suncreate.wifi.service; import com.suncreate.wifi.hikmodel.SocketData; import com.suncreate.wifi.model.HotspotInfoCollected; import com.suncreate.wifi.model.TermInfoCharacteristics; import com.suncreate.wifi.task.CheckScheduleConfig; import com.suncreate.wifi.tool.FtpUtils; import com.suncreate.wifi.tool.SerializeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @Service public class SocketServer { private static final Logger log = LoggerFactory.getLogger(SocketServer.class); @Autowired private KafkaSendService kafkaSendService; @Autowired private ReadFtpFileService readFtpFileService; @Value("${ftp.host}") private String ftpHost; @Value("${ftp.port}") private int ftpPort; @Value("${ftp.username}") private String ftpUserName; @Value("${ftp.password}") private String ftpPassword; @Value("${master.ip}") private String masterIp; @Value("${node.name}") private String nodeName; @Value("${master.name}") private String masterName; public void start() throws IOException { ServerSocket serverSocket = new ServerSocket(18060); log.info("當前節點主機名稱:" + nodeName); if (nodeName.equals(masterName)) { //master節點多起幾個執行緒與worker節點通信 for (int i = 0; i < 7; i++) { SocketThread socketThread = new SocketThread(serverSocket, kafkaSendService, readFtpFileService, ftpHost, ftpPort, ftpUserName, ftpPassword, masterIp, nodeName); socketThread.start(); } } else { //worker節點起一個執行緒與master通信即可 SocketThread socketThread = new SocketThread(serverSocket, kafkaSendService, readFtpFileService, ftpHost, ftpPort, ftpUserName, ftpPassword, masterIp, nodeName); socketThread.start(); } } }View Code
3.SocketThread類:Socket服務端接收命令,處理命令
package com.suncreate.wifi.service; import com.suncreate.wifi.hikmodel.SocketData; import com.suncreate.wifi.hikmodel.ZipData; import com.suncreate.wifi.model.HotspotInfoCollected; import com.suncreate.wifi.model.TermInfoCharacteristics; import com.suncreate.wifi.tool.FtpUtils; import com.suncreate.wifi.tool.SerializeUtil; import com.suncreate.wifi.tool.SocketUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.regex.Pattern; public class SocketThread extends Thread { private static final Logger log = LoggerFactory.getLogger(SocketThread.class); private static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(256); private static Semaphore semaphore = new Semaphore(1); private ServerSocket serverSocket; private KafkaSendService kafkaSendService; private ReadFtpFileService readFtpFileService; private String ftpHost; private int ftpPort; private String ftpUserName; private String ftpPassword; private String masterIp; //master節點IP private String nodeName; public SocketThread(ServerSocket serverSocket, KafkaSendService kafkaSendService, ReadFtpFileService readFtpFileService, String ftpHost, int ftpPort, String ftpUserName, String ftpPassword, String masterIp, String nodeName) { this.serverSocket = serverSocket; this.kafkaSendService = kafkaSendService; this.readFtpFileService = readFtpFileService; this.ftpHost = ftpHost; this.ftpPort = ftpPort; this.ftpUserName = ftpUserName; this.ftpPassword = ftpPassword; this.masterIp = masterIp; this.nodeName = nodeName; } public void run() { try { while (true) { try { Socket socket = serverSocket.accept(); InputStream inputStream = socket.getInputStream(); DataInputStream dataInputStream = new DataInputStream(inputStream); BufferedInputStream bufferedInputStream = new BufferedInputStream(dataInputStream); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); byte[] bytes = new byte[10240]; int len; while ((len = bufferedInputStream.read(bytes)) > 0) { byteArrayOutputStream.write(bytes, 0, len); } SocketData socketData = (SocketData) SerializeUtil.Deserialize(byteArrayOutputStream.toByteArray()); socket.close(); //0:請求鎖 if (socketData.getCommand().equals(0)) { log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 0:請求鎖"); semaphore.tryAcquire(30, TimeUnit.SECONDS); SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 1)); //1:發放鎖 log.info("向節點 " + socket.getInetAddress().getHostAddress() + " 發放鎖"); } //1:發放鎖 if (socketData.getCommand().equals(1)) { log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 1:發放鎖"); List<String> ftpFileList = getFtpFileList(); //篩選 String pattern = "^[\\S\\s]*-[0-9]*-[0-9]*-[0-9]*.zip$"; for (int i = ftpFileList.size() - 1; i >= 0; i--) { String ftpFileName = ftpFileList.get(i); boolean bl = Pattern.matches(pattern, ftpFileName); if (!bl) { ftpFileList.remove(i); } } if (ftpFileList != null && ftpFileList.size() > 0) { String newFtpFileName = ftpFileList.get(0).replace(".zip", "") + "-" + nodeName + ".zip"; renameFtpFile(ftpFileList.get(0), newFtpFileName); SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 2)); //2:釋放鎖 byte[] file = downloadFtpFile(newFtpFileName); if (file != null) { ZipData zipData = new ZipData(file); if (zipData.getHotsoptList().size() > 0 || zipData.getTerminfoList().size() > 0) { processZipData(zipData); removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, newFtpFileName); //洗掉FTP上的zip檔案 SocketUtil.Send(masterIp, new SocketData(nodeName, 0)); //0:再次請求鎖 ZipProcessTime.updateTime(); } else { removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, newFtpFileName); //洗掉FTP上的zip檔案 RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService); thread.start(); } } else { RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService); thread.start(); } } else { SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 2)); //2:釋放鎖 RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService); thread.start(); } } //2:釋放鎖 if (socketData.getCommand().equals(2)) { log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 2:釋放鎖"); semaphore.release(); } //3:直接處理請求 if (socketData.getCommand().equals(3)) { log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 3:直接處理請求"); SocketUtil.Send(socket.getInetAddress().getHostAddress(), new SocketData(nodeName, 4, socketData.getFtpFileName())); //4:直接處理命令 } //4:直接處理命令 if (socketData.getCommand().equals(4)) { log.info("收到節點 " + socketData.getNodeName() + " " + socket.getInetAddress().getHostAddress() + " 的命令 4:直接處理命令"); String ftpFileName = socketData.getFtpFileName(); byte[] file = downloadFtpFile(ftpFileName); if (file != null) { ZipData zipData = new ZipData(file); if (zipData.getHotsoptList().size() > 0 || zipData.getTerminfoList().size() > 0) { processZipData(zipData); removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, ftpFileName); //洗掉FTP上的zip檔案 SocketUtil.Send(masterIp, new SocketData(nodeName, 0)); //0:再次請求鎖 ZipProcessTime.updateTime(); } else { removeFtpFile(ftpHost, ftpPort, ftpUserName, ftpPassword, ftpFileName); //洗掉FTP上的zip檔案 RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService); thread.start(); } } else { RunProcessZipThread thread = new RunProcessZipThread(readFtpFileService); thread.start(); } } } catch (Exception e) { log.error("SocketServer while 出錯", e); } } } catch (Exception e) { log.error("SocketServer run 出錯", e); } } private void processZipData(ZipData zipData) { CountDownLatch countDownLatch = new CountDownLatch(zipData.getHotsoptList().size() + zipData.getTerminfoList().size()); for (HotspotInfoCollected hotspot : zipData.getHotsoptList()) { RunnableSendHotspot runnableSendHotspot = new RunnableSendHotspot(kafkaSendService, hotspot, countDownLatch); threadPool.submit(runnableSendHotspot); } for (TermInfoCharacteristics terminfo : zipData.getTerminfoList()) { RunnableSendTerm runnableSendTerm = new RunnableSendTerm(kafkaSendService, terminfo, countDownLatch); threadPool.submit(runnableSendTerm); } try { countDownLatch.await(); } catch (InterruptedException e) { log.error("countDownLatch.await() failed", e); } } private void renameFtpFile(String oldName, String newName) { FtpUtils ftpUtils = new FtpUtils(); try { ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword); ftpUtils.renameFtpFile(oldName, newName); } catch (Exception e) { log.error("RenameFtpFile() failed", e); } finally { ftpUtils.disConnection(); } } private List<String> getFtpFileList() { List<String> result = new ArrayList<>(); FtpUtils ftpUtils = new FtpUtils(); try { ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword); ftpUtils.listFileName("/"); result = ftpUtils.getFileNameList(); } catch (Exception e) { log.error("getFtpFileList() failed", e); } finally { ftpUtils.disConnection(); } return result; } private byte[] downloadFtpFile(String filePath) { byte[] result = null; FtpUtils ftpUtils = new FtpUtils(); try { ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword); result = ftpUtils.downloadFtpFile(filePath); int tryCount = 1; while (result == null && tryCount < 5) { log.info("下載FTP檔案 " + filePath + " 失敗,嘗試再次下載" + tryCount); result = ftpUtils.downloadFtpFile(filePath); Thread.sleep(3000); tryCount++; } } catch (Exception e) { log.error("downloadFtpFile(filePath) failed", e); } finally { ftpUtils.disConnection(); } return result; } private boolean removeFtpFile(String ftpHost, int ftpPort, String ftpUserName, String ftpPassword, String filePath) { boolean result = false; FtpUtils ftpUtils = new FtpUtils(); try { ftpUtils.login(ftpHost, ftpPort, ftpUserName, ftpPassword); ftpUtils.listFileName("/"); result = ftpUtils.removeFtpFile(filePath); } catch (Exception e) { log.error("removeFtpFile() failed", e); } finally { ftpUtils.disConnection(); } return result; } }View Code
4.RunProcessZipThread類:負責任務的重啟
package com.suncreate.wifi.service; /** * 用于重啟資料處理任務 */ public class RunProcessZipThread extends Thread { private ReadFtpFileService readFtpFileService; public RunProcessZipThread(ReadFtpFileService readFtpFileService) { this.readFtpFileService = readFtpFileService; } public void run() { try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } readFtpFileService.ProcessZip(); } }View Code
實際處理資料的代碼:
1.RunnableSendHotspot類:
package com.suncreate.wifi.service; import com.google.gson.Gson; import com.suncreate.logback.elasticsearch.metric.DataType; import com.suncreate.logback.elasticsearch.metric.ProcPhase; import com.suncreate.logback.elasticsearch.metric.ProcStatus; import com.suncreate.logback.elasticsearch.metric.SinkType; import com.suncreate.logback.elasticsearch.util.MetricUtil; import com.suncreate.wifi.model.HotspotInfoCollected; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.concurrent.CountDownLatch; public class RunnableSendHotspot implements Runnable { private static final Logger log = LoggerFactory.getLogger(RunnableSendHotspot.class); private KafkaSendService kafkaSendService; private HotspotInfoCollected hotspot; private CountDownLatch countDownLatch; public RunnableSendHotspot(KafkaSendService kafkaSendService, HotspotInfoCollected hotspot, CountDownLatch countDownLatch) { this.kafkaSendService = kafkaSendService; this.hotspot = hotspot; this.countDownLatch = countDownLatch; } @Override public void run() { log2ES(ProcPhase.collect.toString(), ProcStatus.suc.toString(), 1); kafkaSendService.sendHotspotInfoCollected(hotspot); countDownLatch.countDown(); } private void log2ES(String procPhase, String procStatus, Integer count) { HashMap<String, Object> logMap; logMap = (HashMap<String, Object>) MetricUtil.getMap("wifi_probe", "hik", DataType.struct_data.toString(), procPhase, procStatus, "ftp", SinkType.kafka.toString(), count); log.info(new Gson().toJson(logMap) + " Count:" + countDownLatch.getCount()); } }View Code
2.RunnableSendTerm類:
package com.suncreate.wifi.service; import com.google.gson.Gson; import com.suncreate.logback.elasticsearch.metric.DataType; import com.suncreate.logback.elasticsearch.metric.ProcPhase; import com.suncreate.logback.elasticsearch.metric.ProcStatus; import com.suncreate.logback.elasticsearch.metric.SinkType; import com.suncreate.logback.elasticsearch.util.MetricUtil; import com.suncreate.wifi.model.TermInfoCharacteristics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.concurrent.CountDownLatch; public class RunnableSendTerm implements Runnable { private static final Logger log = LoggerFactory.getLogger(RunnableSendTerm.class); private KafkaSendService kafkaSendService; private TermInfoCharacteristics termInfo; private CountDownLatch countDownLatch; public RunnableSendTerm(KafkaSendService kafkaSendService, TermInfoCharacteristics termInfo, CountDownLatch countDownLatch) { this.kafkaSendService = kafkaSendService; this.termInfo = termInfo; this.countDownLatch = countDownLatch; } @Override public void run() { log2ES(ProcPhase.collect.toString(), ProcStatus.suc.toString(), 1); kafkaSendService.sendTermInfoCharacteristics(termInfo); countDownLatch.countDown(); } private void log2ES(String procPhase, String procStatus, Integer count) { HashMap<String, Object> logMap; logMap = (HashMap<String, Object>) MetricUtil.getMap("wifi_probe", "hik", DataType.struct_data.toString(), procPhase, procStatus, "ftp", SinkType.kafka.toString(), count); log.info(new Gson().toJson(logMap) + " Count:" + countDownLatch.getCount()); } }View Code
輔助代碼:
1.ZipProcessTime類:記錄資料處理任務最后活動的時間
package com.suncreate.wifi.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; /** * 記錄資料處理任務最后活動的時間 */ public class ZipProcessTime { private static final Logger log = LoggerFactory.getLogger(ZipProcessTime.class); private static long time = System.currentTimeMillis(); public static void updateTime() { time = System.currentTimeMillis(); log.info("ZipProcessTime 已更新,時間戳=" + time); } public static long getTime() { return time; } }View Code
2.CheckScheduleConfig類:配置一個定時任務用于監控資料處理任務是否存活,如果長時間不存活,則重啟任務(實際運行程序中,這種情況未出現過)
package com.suncreate.wifi.task; import com.suncreate.wifi.service.ZipProcessTime; import com.suncreate.wifi.service.ReadFtpFileService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; /** * master節點與worker節點之間的socket通信有可能失敗,所以需要一個監控,以重啟資料處理任務 */ @Configuration @EnableScheduling public class CheckScheduleConfig implements SchedulingConfigurer { private static final Logger log = LoggerFactory.getLogger(CheckScheduleConfig.class); @Value("${checkQuarter}") private String checkQuarter; @Autowired private ReadFtpFileService readFtpFileService; @Value("${node.name}") private String nodeName; @Value("${master.name}") private String masterName; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addTriggerTask(() -> { if (!nodeName.equals(masterName)) { //master節點不處理資料,跳過 double sec = (System.currentTimeMillis() - ZipProcessTime.getTime()) / 1000.0; log.info("ZipProcessTime 已經 " + sec + " 秒沒有更新"); if (sec > 1800) { log.info("ZipProcessTime 已經長時間沒有更新,重啟 ProcessZip"); readFtpFileService.ProcessZip(); } } }, triggerContext -> new CronTrigger(checkQuarter).nextExecutionTime(triggerContext)); } }View Code
====================== 分隔線 ==============================================================================================
目前程式已在現網連續穩定運行一周
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/270703.html
標籤:其他
上一篇:c++學習的一些忠告(轉載)
