Java Kafka 消費積壓監控
后端代碼:
Monitor.java代碼:
package com.suncreate.kafkaConsumerMonitor.service; import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; /** * kafka消費監控 * * @author suxiang */ public class Monitor { private static final Logger log = LoggerFactory.getLogger(Monitor.class); private String servers; private String topic; private String groupId; private long lastTime; private long lastTotalLag = 0L; private long lastLogSize = 0L; private long lastOffset = 0L; private double lastRatio = 0; private long speedLogSize = 0L; private long speedOffset = 0L; private String time; private List<ConsumerInfo> list; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public String getTime() { return time; } public void setTime(String time) { this.time = time; } public long getLastTotalLag() { return lastTotalLag; } public double getLastRatio() { return lastRatio; } public String getTopic() { return topic; } public String getGroupId() { return groupId; } public long getSpeedLogSize() { return speedLogSize; } public long getSpeedOffset() { return speedOffset; } public List<ConsumerInfo> getList() { return list; } public void setList(List<ConsumerInfo> list) { this.list = list; } private KafkaConsumer<String, String> consumer; private List<TopicPartition> topicPartitionList; private final DecimalFormat decimalFormat = new DecimalFormat("0.00"); public Monitor(String servers, ConsumerGroupsService consumerGroupsService, String topic, String groupId) { this.servers = servers; this.topic = topic; this.groupId = consumerGroupsService.getGroupId(topic, groupId); this.list = new ArrayList<>(); //消費者 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers); properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumer = new KafkaConsumer<String, String>(properties); //查詢 topic partitions topicPartitionList = new ArrayList<>(); List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionInfoList) { TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); topicPartitionList.add(topicPartition); } } public void monitor(boolean addToList) { try { long startTime = System.currentTimeMillis(); //查詢 log size Map<Integer, Long> endOffsetMap = new HashMap<>(); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionList); for (TopicPartition partitionInfo : endOffsets.keySet()) { endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo)); } //查詢消費 offset Map<Integer, Long> commitOffsetMap = new HashMap<>(); for (TopicPartition topicAndPartition : topicPartitionList) { OffsetAndMetadata committed = consumer.committed(topicAndPartition); commitOffsetMap.put(topicAndPartition.partition(), committed.offset()); } long endTime = System.currentTimeMillis(); log.info("查詢logSize和offset耗時:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒"); startTime = System.currentTimeMillis(); //累加lag long totalLag = 0L; long logSize = 0L; long offset = 0L; if (endOffsetMap.size() == commitOffsetMap.size()) { for (Integer partition : endOffsetMap.keySet()) { long endOffset = endOffsetMap.get(partition); long commitOffset = commitOffsetMap.get(partition); long diffOffset = endOffset - commitOffset; totalLag += diffOffset; logSize += endOffset; offset += commitOffset; } } else { log.error("Topic:" + topic + " consumer:" + consumer + " topic partitions lost"); } log.info("Topic:" + topic + " logSize:" + logSize + " offset:" + offset + " totalLag:" + totalLag); if (lastTime > 0) { if (System.currentTimeMillis() - lastTime > 0) { speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0)); speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0)); } if (speedLogSize > 0) { String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0)); lastRatio = Double.parseDouble(strRatio); log.info("Topic:" + topic + " speedLogSize:" + speedLogSize + " speedOffset:" + speedOffset + " 百分比:" + strRatio + "%"); } } lastTime = System.currentTimeMillis(); lastTotalLag = totalLag; lastLogSize = logSize; lastOffset = offset; endTime = System.currentTimeMillis(); log.info("計算耗時:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒"); if (addToList) { this.setTime(simpleDateFormat.format(new Date())); this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime())); if (this.list.size() > 500) { this.list.remove(0); } } } catch (Exception e) { log.error("Monitor error", e); } } }View Code
MonitorService.java代碼:
package com.suncreate.kafkaConsumerMonitor.service; import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.*; @Service public class MonitorService { private static final Logger log = LoggerFactory.getLogger(MonitorService.class); @Value("${kafka.consumer.servers}") private String servers; @Autowired private ConsumerGroupsService consumerGroupsService; private List<Monitor> monitorList; @PostConstruct private void Init() { monitorList = new ArrayList<>(); monitorList.add(new Monitor(servers, consumerGroupsService, "wifiData", "wifi-kafka-hbase")); monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC", "EXTRACT-SAMPLE")); monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC", "dblrecog-upload2vcn")); monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC_FILTER", "yisa")); monitorList.add(new Monitor(servers, consumerGroupsService, "KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check")); monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "unifiedstorage-downloader")); monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "full-vehicle-data-storage-kafka2ch")); monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vehicle_store")); monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vcn-sk-upload-luyang")); monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vcn-sk-upload-yaohai")); monitorList.add(new Monitor(servers, consumerGroupsService, "motorVehicle", "vcn-sk-upload-baohe")); monitorList.add(new Monitor(servers, consumerGroupsService, "peopleFace", "kafka-filter-check")); } public void monitorOnce(boolean addToList) { for (Monitor monitor : monitorList) { monitor.monitor(addToList); } } public List<ConsumerInfo> getConsumerList() { List<ConsumerInfo> list = new ArrayList<>(); for (Monitor monitor : monitorList) { list.add(new ConsumerInfo(monitor.getTopic(), monitor.getGroupId(), monitor.getLastTotalLag(), monitor.getLastRatio(), monitor.getSpeedLogSize(), monitor.getSpeedOffset(), monitor.getTime())); } return list; } public List<ConsumerInfo> getDetails(String topic, String groupId) { for (Monitor monitor : monitorList) { if (monitor.getTopic().equals(topic) && monitor.getGroupId().equals(groupId)) { return monitor.getList(); } } return new ArrayList<>(); } }View Code
ConsumerGroupsService.java代碼:
用于獲取kafka的topic下的所有消費者組,new Monitor傳的groupId引數可能不是消費者組的全稱,所以需要從topic的所有消費者組中匹配到全稱,
由于對接的是華為FusionInsight平臺的Kafka,所以需要使用帶身份認證的埠連接,才能使用AdminClient類獲取到所有消費者組,代碼里把不帶安全認證的埠21005換成帶安全認證的埠21007,
package com.suncreate.kafkaConsumerMonitor.service; import kafka.admin.AdminClient; import kafka.coordinator.group.GroupOverview; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import scala.collection.JavaConversions; import javax.annotation.PostConstruct; import java.util.*; @Service public class ConsumerGroupsService { private static final Logger log = LoggerFactory.getLogger(ConsumerGroupsService.class); @Value("${kafka.consumer.servers}") private String servers; private List<GroupOverview> groupListAll; @PostConstruct private void Init() { try { //身份認證 System.setProperty("java.security.auth.login.config", "/home/server/import/conf/jaas.conf"); System.setProperty("java.security.krb5.conf", "/home/server/import/conf/krb5.conf"); //System.setProperty("java.security.auth.login.config", "D:/Project/shiny/kafka-consumer-monitor/conf/jaas.conf"); //System.setProperty("java.security.krb5.conf", "D:/Project/shiny/kafka-consumer-monitor/conf/krb5.conf"); Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers.replace("21005", "21007")); properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); AdminClient client = AdminClient.create(properties); try { groupListAll = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq()); if (groupListAll != null) { log.info("ConsumerGroupsService Init 獲取所有消費者組 成功 groupListAll size=" + groupListAll.size()); } else { log.error("ConsumerGroupsService Init 獲取所有消費者組 失敗 groupListAll=null"); } } catch (Exception e) { log.error("ConsumerGroupsService Init 獲取所有消費者組 失敗", e); } finally { client.close(); } } catch (Exception e) { log.error("ConsumerGroupsService Init 失敗", e); } } public String getGroupId(String topic, String groupId) { java.util.Set<String> groups = getConsumerGroups(topic); for (String item : groups) { if (item.indexOf(groupId) >= 0) { return item; } } return groupId; } private java.util.Set<String> getConsumerGroups(String topic) { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers.replace("21005", "21007")); properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); AdminClient client = AdminClient.create(properties); java.util.Set<String> groups = new HashSet<String>(); try { if (groupListAll != null) { for (GroupOverview overview : groupListAll) { String groupID = overview.groupId(); Map<TopicPartition, Object> offsets = JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupID)); for (TopicPartition partition : offsets.keySet()) { if (partition.topic().equals(topic)) { groups.add(groupID); } } } } log.info("Topic:" + topic + " 消費者組集合:" + groups); } catch (Exception e) { log.error("getConsumerGroups error", e); } finally { client.close(); } return groups; } }View Code
MonitorConfig.java代碼:
package com.suncreate.kafkaConsumerMonitor.task; import com.suncreate.kafkaConsumerMonitor.service.MonitorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; import java.text.SimpleDateFormat; @Configuration @EnableScheduling public class MonitorConfig implements SchedulingConfigurer { private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class); private String cronExpression = "0 */3 * * * ?"; //private String cronExpression = "*/20 * * * * ?"; private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Autowired private MonitorService monitorService; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addTriggerTask(() -> { monitorService.monitorOnce(true); }, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext)); } }View Code
MonitorController.java代碼:
package com.suncreate.kafkaConsumerMonitor.controller; import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo; import com.suncreate.kafkaConsumerMonitor.model.LayuiData; import com.suncreate.kafkaConsumerMonitor.service.MonitorService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; @RestController @RequestMapping("/monitor") public class MonitorController { @Autowired private MonitorService monitorService; @GetMapping("/getConsumers") public LayuiData getConsumers() { List<ConsumerInfo> list = monitorService.getConsumerList(); LayuiData data = new LayuiData(list); return data; } @GetMapping("/monitorOnce") public void monitorOnce() { monitorService.monitorOnce(false); } @GetMapping("/getDetails") public LayuiData getDetails(String topic, String groupId) { List<ConsumerInfo> list = monitorService.getDetails(topic, groupId); LayuiData data = new LayuiData(list); return data; } }View Code
pom.xml檔案(有些東西沒用到或者備用,沒有刪):
pom檔案中參考的jar包,跟開源的jar包版本完全一致,但jar包中的內容大不相同,所以必須參考華為平臺給的jar包才行,需要注意jar包依賴的jar包也不能使用開源jar包,一定要參考到華為平臺給的jar包,
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.suncreate</groupId> <artifactId>kafka-consumer-monitor</artifactId> <version>1.0</version> <name>kafka-consumer-monitor</name> <description>Kafka消費積壓監控預警</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.54</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.0</version> </dependency> <!-- postgresql --> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <scope>runtime</scope> </dependency> <!-- elasticsearch --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.1.4</version> </dependency> <!-- oracle --> <dependency> <groupId>com.oracle</groupId> <artifactId>ojdbc6</artifactId> <version>11.1.0.7.0</version> </dependency> <!-- kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.11.0.1</version> <classifier>huawei</classifier> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> <exclusion> <groupId>net.sf.jopt-simple</groupId> <artifactId>jopt-simple</artifactId> </exclusion> <exclusion> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> </exclusion> <exclusion> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </exclusion> <exclusion> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> </exclusion> <exclusion> <groupId>org.scala-lang.modules</groupId> <artifactId>scala-parser-combinators_2.11</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> <classifier>huawei</classifier> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> </exclusion> <exclusion> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> </exclusion> </exclusions> </dependency> <!-- kafka_2.11 依賴的jar包 --> <dependency> <groupId>net.sf.jopt-simple</groupId> <artifactId>jopt-simple</artifactId> <version>5.0.3</version> <classifier>huawei</classifier> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> <version>2.2.0</version> <classifier>huawei</classifier> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.11</version> <classifier>huawei</classifier> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> <classifier>huawei</classifier> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.scala-lang.modules</groupId> <artifactId>scala-parser-combinators_2.11</artifactId> <version>1.0.4</version> <classifier>huawei</classifier> <exclusions> <exclusion> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> <classifier>huawei</classifier> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <classifier>huawei</classifier> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.1</version> <classifier>huawei</classifier> </dependency> <!-- kafka-clients 依賴的jar包 --> <dependency> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> <version>1.3.0</version> <classifier>huawei</classifier> </dependency> <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> <version>1.1.2.6</version> <classifier>huawei</classifier> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>View Code
前端使用了 Layui 和 ECharts 展示表格和圖表
index.css代碼:
.div-title { font-size: 18px; margin-top: 10px; margin-left: 10px; } .div-right { text-align: right; } .span-red { color: #ff0000; }View Code
index.html代碼(展示topic、消費者組Consumer GroupId、Total Lag、Kafka資料生產速度、Kafka資料消費速度等):
<!DOCTYPE html> <html lang="zh"> <head> <meta charset="UTF-8"> <title>Title</title> <link rel="stylesheet" href="css/index.css"> <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"> <script type="text/javascript" src="js/jquery-1.7.1.js"></script> <script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script> </head> <body> <div class="div-title">Kafka 監控 <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">重繪</button> </div> <table class="layui-hide" id="myTable"></table> <script type="text/javascript"> var myTable; layui.use('table', function () { var table = layui.table; myTable = table.render({ elem: '#myTable', url: '/home/monitor/getConsumers', cellMinWidth: 80, //全域定義常規單元格的最小寬度 cols: [[ {field: 'topic', width: 300, title: 'topic', sort: true}, {field: 'groupId', width: 300, title: 'groupId'}, { field: 'totalLag', width: 150, title: 'Total Lag', sort: true, templet: function (d) { if (d.delayDay * 24 > 2) { return '<div ><span >' + formatLongNum(d.totalLag) + '</span></div>' } else { return '<div ><span>' + formatLongNum(d.totalLag) + '</span></div>' } } }, { field: 'speedLogSize', width: 150, title: '生產速度(條/秒)', templet: function (d) { return '<div >' + d.speedLogSize + '</div>' } }, { field: 'speedOffset', width: 150, title: '消費速度(條/秒)', templet: function (d) { return '<div >' + d.speedOffset + '</div>' } }, { field: 'ratio', width: 100, title: '消費/生產', templet: function (d) { if (d.ratio < 90) { return '<div ><span >' + d.ratio + '%</span></div>' } else { return '<div ><span>' + d.ratio + '%</span></div>' } } }, { field: 'delayDay', width: 150, title: '積壓(天)', sort: true, templet: function (d) { if (d.delayDay * 24 > 2) { return '<div ><span >' + d.delayDay + '</span></div>' } else { return '<div ><span>' + d.delayDay + '</span></div>' } } }, { field: 'ope', width: 100, title: '操作', templet: function (d) { return '<a href="https://www.cnblogs.com/home/detail.html?topic=' + d.topic + '&groupId=' + d.groupId + '" target="_blank" >詳細</a>'; } } ]] }); }); function refreshTable() { if (myTable) { myTable.reload(); } } setInterval(function () { refreshTable(); }, 30000); function formatLongNum(num) { return (num + '').replace(/(\d{1,4})(?=(\d{4})+(?:$|\.))/g, '$1 ') } // setInterval(function () { // $.get("/home/monitor/monitorOnce"); // }, 30000); </script> </body> </html>View Code
detail.html代碼(展示單個消費者組的Total Lag、生產速度、消費速度以及Total Lag趨勢圖):
<!DOCTYPE html> <html lang="zh"> <head> <meta charset="UTF-8"> <title>Title</title> <link rel="stylesheet" href="css/index.css"> <link rel="stylesheet" href="js/layui-v2.6.8/css/layui.css" media="all"> <script type="text/javascript" src="js/jquery-1.7.1.js"></script> <script type="text/javascript" src="js/layui-v2.6.8/layui.js" charset="utf-8"></script> <script type="text/javascript" src="js/echarts-v4.7.0/echarts.min.js"></script> </head> <body> <div class="div-title"><span id="detailTitle"></span> 明細 <button type="button" class="layui-btn layui-btn-sm" onclick="refreshTable()">重繪</button> </div> <div id="main" style="height:400px;"></div> <table class="layui-hide" id="test"></table> <script type="text/javascript"> var myTable; var topic = getQueryVariable("topic"); var groupId = getQueryVariable("groupId"); $("#detailTitle").html(topic + " " + groupId); layui.use('table', function () { var table = layui.table; myTable = table.render({ elem: '#test', url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId, cellMinWidth: 80, //全域定義常規單元格的最小寬度 initSort: { field: 'time', //排序欄位,對應 cols 設定的各欄位名 type: 'desc' //排序方式 asc: 升序、desc: 降序、null: 默認排序 }, cols: [[ {field: 'topic', width: 300, title: 'topic'}, {field: 'groupId', width: 300, title: 'groupId'}, {field: 'time', width: 180, title: '時間', sort: true}, { field: 'totalLag', width: 150, title: 'Total Lag', templet: function (d) { if (d.delayDay * 24 > 2) { return '<div ><span >' + formatLongNum(d.totalLag) + '</span></div>' } else { return '<div ><span>' + formatLongNum(d.totalLag) + '</span></div>' } } }, { field: 'speedLogSize', width: 150, title: '生產速度(條/秒)', templet: function (d) { return '<div >' + d.speedLogSize + '</div>' } }, { field: 'speedOffset', width: 150, title: '消費速度(條/秒)', templet: function (d) { return '<div >' + d.speedOffset + '</div>' } }, { field: 'ratio', width: 100, title: '消費/生產', templet: function (d) { if (d.ratio < 90) { return '<div ><span >' + d.ratio + '%</span></div>' } else { return '<div ><span>' + d.ratio + '%</span></div>' } } }, { field: 'delayDay', width: 150, title: '積壓(天)', templet: function (d) { if (d.delayDay * 24 > 2) { return '<div ><span >' + d.delayDay + '</span></div>' } else { return '<div ><span>' + d.delayDay + '</span></div>' } } } ]] }); }); function refreshTable() { if (myTable) { myTable.reload(); } showChart(); } setInterval(function () { refreshTable(); }, 30000); function formatLongNum(num) { return (num + '').replace(/(\d{1,4})(?=(\d{4})+(?:$|\.))/g, '$1 ') } function getQueryVariable(variable) { var query = window.location.search.substring(1); var vars = query.split("&"); for (var i = 0; i < vars.length; i++) { var pair = vars[i].split("="); if (pair[0] == variable) { return pair[1]; } } return (false); } function showChart() { $.ajax({ type: "GET", url: '/home/monitor/getDetails?topic=' + topic + '&groupId=' + groupId, success: function (data) { if (data && data.data && data.data.length > 1) { debugger; var chartDom = document.getElementById('main'); var myChart = echarts.init(chartDom); var option; var xAxis = []; var serseis = []; for (var i = 0; i < data.data.length; i++) { xAxis.push(data.data[i].time); serseis.push(data.data[i].totalLag); } option = { title: { show: true, text: "Total Lag 趨勢圖", x: 'center' }, xAxis: { type: 'category', data: xAxis }, yAxis: { type: 'value' }, series: [{ data: serseis, type: 'line' }] }; myChart.setOption(option); } } }); } showChart(); </script> </body> </html>View Code
原始碼:
https://files-cdn.cnblogs.com/files/s0611163/kafka-consumer-monitor.zip
效果圖:
消費者組串列:

消費者組明細:

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/288736.html
標籤:Java
上一篇:演算法-堆疊佇列堆
