Java Kafka 消费积压监控的示例代码

网友投稿 429 2022-10-14


Java Kafka 消费积压监控的示例代码

后端代码:

Monitor.java代码:

package com.suncreate.kafkaConsumerMonitor.service;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;

import java.text.SimpleDateFormat;

import java.util.*;

/**

* kafka消费监控

*

* @author suxiang

*/

public class Monitor {

private static final Logger log = LoggerFactory.getLogger(Monitor.class);

private String servers;

private String topic;

private String groupId;

private long lastTime;

private long lastTotalLag = 0L;

private long lastLogSize = 0L;

private long lastOffset = 0L;

private double lastRatio = 0;

private long speedLogSize = 0L;

private long speedOffset = 0L;

private String time;

private List list;

private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public String getTime() {

return time;

}

public void setTime(String time) {

this.time = time;

}

public long getLastTotalLag() {

return lastTotalLag;

}

public double getLastRatio() {

return lastRatio;

}

public String getTopic() {

return topic;

}

public String getGroupId() {

return groupId;

}

public long getSpeedLogSize() {

return speedLogSize;

}

public long getSpeedOffset() {

return speedOffset;

}

public List getList() {

return list;

}

public void setList(List list) {

this.list = list;

}

private KafkaConsumer consumer;

private List topicPartitionList;

private final DecimalFormat decimalFormat = new DecimalFormat("0.00");

public Monitor(String servers, String topic, String groupId) {

this.servers = servers;

this.topic = topic;

this.groupId = groupId;

this.list = new ArrayList<>();

//消费者

Properties properties = new Properties();

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.servers);

properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

consumer = new KafkaConsumer(properties);

//查询 topic partitions

topicPartitionList = new ArrayList<>();

List partitionInfoList = consumer.partitionsFor(topic);

for (PartitionInfo partitionInfo : partitionInfoList) {

TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());

topicPartitionList.add(topicPartition);

}

}

public void monitor(boolean addToList) {

try {

long startTime = System.currentTimeMillis();

//查询 log size

Map endOffsetMap = new HashMap<>();

Map endOffsets = consumer.endOffsets(topicPartitionList);

for (TopicPartition partitionInfo : endOffsets.keySet()) {

endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo));

}

//查询消费 offset

Map commitOffsetMap = new HashMap<>();

for (TopicPartition topicAndPartition : topicPartitionList) {

OffsetAndMetadata committed = consumer.committed(topicAndPartition);

commitOffsetMap.put(topicAndPartition.partition(), committed.offset());

}

long endTime = System.currentTimeMillis();

log.info("查询logSize和offset耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");

startTime = System.currentTimeMillis();

//累加lag

long totalLag = 0L;

long logSize = 0L;

long offset = 0L;

if (endOffsetMap.size() == commitOffsetMap.size()) {

for (Integer partition : endOffsetMap.keySet()) {

long endOffset = endOffsetMap.get(partition);

long commitOffset = commitOffsetMap.get(partition);

long diffOfhttp://fset = endOffset - commitOffset;

totalLag += diffOffset;

logSize += endOffset;

offset += commitOffset;

}

} else {

log.error("Topic:" + topic + " consumer:" + consumer + " topic partitions lost");

}

log.info("Topic:" + topic + " logSize:" + logSize + " offset:" + offset + " totalLag:" + totalLag);

if (lastTime > 0) {

if (System.currentTimeMillis() - lastTime > 0) {

speedLogSize = (long) ((logSize - lastLogSize) / ((System.currentTimeMillis() - lastTime) / 1000.0));

speedOffset = (long) ((offset - lastOffset) / ((System.currentTimeMillis() - lastTime) / 1000.0));

}

if (speedLogSize > 0) {

String strRatio = decimalFormat.format(speedOffset * 100 / (speedLogSize * 1.0));

lastRatio = Double.parseDouble(strRatio);

log.info("Topic:" + topic + " speedLogSize:" + speedLogSize + " speedOffset:" + speedOffset + " 百分比:" + strRatio + "%");

}

}

lastTime = System.currentTimeMillis();

lastTotalLag = totalLag;

lastLogSize = logSize;

lastOffset = offset;

endTime = System.currentTimeMillis();

log.info("计算耗时:" + (new DecimalFormat("0.000")).format((endTime - startTime) / 1000.0) + " 秒");

if (addToList) {

this.setTime(simpleDateFormat.format(new Date()));

this.list.add(new ConsumerInfo(this.getTopic(), this.getGroupId(), this.getLastTotalLag(), this.getLastRatio(), this.getSpeedLogSize(), this.getSpeedOffset(), this.getTime()));

if (this.list.size() > 500) {

this.list.remove(0);

}

}

} catch (Exception e) {

log.error("Monitor error", e);

}

}

}

MonitorService.java代码:

package com.suncreate.kafkaConsumerMonitor.service;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

import java.util.*;

@Service

public class MonitorService {

private static final Logger log = LoggerFactory.getLogger(MonitorService.class);

@Value("${kafka.consumer.servers}")

private String servers;

private Monitor monitor;

private List monitorList;

@PostConstruct

private void Init() {

monitorList = new ArrayList<>();

monitorList.add(new Monitor(servers, "wifiData", "wifi-kafka-hbase"));

monitorList.add(new Monitor(servers, "KK_PASS_INFhttp://O_TYCC", "EXTRACT-SAMPLE"));

monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC", "dblrecog-upload2vcn"));

monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "yisa20210521000001"));

monitorList.add(new Monitor(servers, "KK_PASS_INFO_TYCC_FILTER", "kafka-filter-check-19"));

monitorList.add(new Monitor(servers, "motorVehicle", "unifiedstorage-downloader"));

monitorList.add(new Monitor(servers, "motorVehicle", "full-vehicle-data-storage-kafka2ch"));

monitorList.add(new Monitor(servers, "motorVehicle", "vehicle_store"));

monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-luyang"));

monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-yaohai"));

monitorList.add(new Monitor(servers, "motorVehicle", "vcn-sk-upload-baohe"));

monitorList.add(new Monitor(servers, "peopleFace", "kafka-filter-check-19"));

}

public void monitorOnce(boolean addToList) {

for (Monitor monitor : monitorList) {

monitor.monitor(addToList);

}

}

public List getConsumerList() {

List list = new ArrayList<>();

for (Monitor monitor : monitorList) {

list.add(new ConsumerInfo(monitor.getTopic(), monitor.getGroupId(), monitor.getLastTotalLag(), monitor.getLastRatio(), monitor.getSpeedLogSize(), monitor.getSpeedOffset(), monitor.getTime()));

}

return list;

}

public List getDetails(String topic, String groupId) {

for (Monitor monitor : monitorList) {

if (monitor.getTopic().equals(topic) && monitor.getGroupId().equals(groupId)) {

return monitor.getList();

}

}

return new ArrayList<>();

}

}

MonitorConfig.java代码:

package com.suncreate.kafkaConsumerMonitor.task;

import com.suncreate.kafkaConsumerMonitor.service.MonitorService;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.scheduling.annotation.SchedulingConfigurer;

import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import org.springframework.scheduling.support.CronTrigger;

import java.text.SimpleDateFormat;

@Configuration

@EnableScheduling

public class MonitorConfig implements SchedulingConfigurer {

private static final Logger logger = LoggerFactory.getLogger(MonitorConfig.class);

private String cronExpression = "0 */3 * * * ?";

//private String cronExpression = "*/20 * * * * ?";

private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

@Autowired

private MonitorService monitorService;

@Override

public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {

taskRegistrar.addTriggerTask(() -> {

monitorService.monitorOnce(true);

}, triggerContext -> new CronTrigger(cronExpression).nextExecutionTime(triggerContext));

}

}

MonitorController.java代码:

package com.suncreate.kafkaConsumerMonitor.controller;

import com.suncreate.kafkaConsumerMonitor.model.ConsumerInfo;

import com.suncreate.kafkaConsumerMonitor.model.LayuiData;

import com.suncreate.kafkaConsumerMonitor.service.MonitorService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController

@RequestMapping("/monitor")

public class MonitorController {

@Autowired

private MonitorService monitorService;

@GetMapping("/getConsumers")

public LayuiData getConsumers() {

List list = monitorService.getConsumerList();

LayuiData data = new LayuiData(list);

return data;

}

@GetMapping("/monitorOnce")

public void monitorOnce() {

monitorService.monitorOnce(false);

}

@GetMapping("/getDetails")

public LayuiData getDetails(String topic, String groupId) {

List list = monitorService.getDetails(topic, groupId);

LayuiData data = new LayuiData(list);

return data;

}

}

pom.xml文件(有些东西没用到或者备用,没有删):

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.1.6.RELEASE

com.suncreate

kafka-consumer-monitor

1.0

kafka-consumer-monitor

Kafka消费积压监控预警

1.8

6.1.4

org.projectlombok

lombok

1.18.12

com.alibaba

fastjson

1.2.54

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

test

org.springframework.kafka

spring-kafka-test

test

com.google.code.gson

gson

2.8.0

org.postgresql

postgresql

runtime

org.springframework.boot

spring-boot-starter-jdbc

org.elasticsearch.client

elasticsearch-rest-high-level-client

6.1.4

com.oracle

ojdbc6

11.1.0.7.0

org.apache.kafka

kafka_2.11

0.11.0.1

org.apache.kafka

kafka-clients

0.11.0.1

org.springframework.boot

spring-boot-maven-plugin

org.apache.maven.plugins

maven-compiler-plugin

8

8

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.1.6.RELEASE

com.suncreate

kafka-consumer-monitor

1.0

kafka-consumer-monitor

Kafka消费积压监控预警

1.8

6.1.4

org.projectlombok

lombok

1.18.12

com.alibaba

fastjson

1.2.54

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

test

org.springframework.kafka

spring-kafka-test

test

com.google.code.gson

gson

2.8.0

org.postgresql

postgresql

runtime

org.springframework.boot

spring-boot-starter-jdbc

org.elasticsearch.client

elasticsearch-rest-high-level-client

6.1.4

com.oracle

ojdbc6

11.1.0.7.0

org.apache.kafka

kafka_2.11

0.11.0.1

org.apache.kafka

kafka-clients

0.11.0.1

org.springframework.boot

spring-boot-maven-plugin

org.apache.maven.plugins

maven-compiler-plugin

8

8

前端使用了 Layui 和 ECharts 展示表格和图表

index.css代码:

.div-title {

font-size: 18px;

margin-top: 10px;

margin-left: 10px;

}

.div-right {

text-align: right;

}

.span-red {

color: #ff0000;

}

index.html代码(展示topic、消费者组Consumer GroupId、Total Lag、Kafka数据生产速度、Kafka数据消费速度等):

detail.html代码(展示单个消费者组的Total Lag、生产速度、消费速度以及Total Lag趋势图):

效果图:

消费者组列表:

消费者组明细:


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:防水多功能网关在智慧灯杆的应用优势
下一篇:TCP往返传输时间(RTT)的估计
相关文章

 发表评论

暂时没有评论,来抢沙发吧~