Springboot 2.x集成kafka 2.2.0的示例代码(springboot常用注解)

网友投稿 309 2022-08-01


目录引言基本环境代码编写1、基本引用pom2、基本配置3、实体类4、生产者端5、消费者6、测试效果展示遇到的问题

引言

kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。

版本对应地址:https://spring.io/projects/spring-kafka

基本环境

springboot版本2.1.4

kafka版本2.2.0

jdk 1.8

代码编写

1、基本引用pom

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.1.4.RELEASE

com.example

demo

0.0.1-SNAPSHOT

kafkademo

Demo project for Spring Boot

1.8

org.springframework.boot

spring-boot-starter-web

mysql

mysql-connector-java

runtime

org.springframework.boot

spring-boot-starter-test

test

org.springframework.kafka

spring-kafka

2.2.0.RELEASE

com.google.code.gson

gson

2.7

org.springframework.boot

spring-EzpCHVtvwEboot-maven-plugin

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.1.4.RELEASE

com.example

demo

0.0.1-SNAPSHOT

kafkademo

Demo project for Spring Boot

1.8

org.springframework.boot

spring-boot-starter-web

mysql

mysql-connector-java

runtime

org.springframework.boot

spring-boot-starter-test

test

org.springframework.kafka

spring-kafka

2.2.0.RELEASE

com.google.code.gson

gson

2.7

org.springframework.boot

spring-EzpCHVtvwEboot-maven-plugin

2、基本配置

spring.kafka.bootstrap-servers=2.1.1.1:9092

spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#logging.level.root=debug

3、实体类

package com.example.demo.model;

import java.util.Date;

public class Messages {

private Long id;

private String msg;

private Date sendTime;

public Long getId() {

return id;

}

public void setId(Long id) {

this.id = id;

}

public String getMsg() {

return msg;

}

public void setMsg(String msg) {

this.msg = msg;

}

public Date getSendTime() {

return sendTime;

}

public void setSendTime(Date sendTime) {

this.sendTime = sendTime;

}

}

4、生产者端

package com.example.demo.service;

import com.example.demo.model.Messages;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Service;

import org.springframework.util.concurrent.ListenableFuture;

import java.util.Date;

import java.util.UUID;

@Service

public class KafkaSender {

@Autowired

private KafkEzpCHVtvwEaTemplate kafkaTemplate;

private Gson gson = new GsonBuilder().create();

public void send() {

Messages message = new Messages();

message.setId(System.currentTimeMillis());

message.setMsg("123");

message.setSendTime(new Date());

ListenableFuture> test0 = kafkaTemplate.send("newtopic", gson.tojson(message));

}

}

5、消费者

package com.example.demo.service;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Service;

import java.util.Optional;

@Service

public class KafkaReceiver {

@KafkaListener(topics = {"newtopic"})

public void listen(ConsumerRecord, ?> record) {

Optional> kafkaMessage = Optional.ofNullable(record.value());

if (kafkaMessage.isPresent()) {

Object message = kafkaMessage.get();

System.out.println("record =" + record);

System.out.println("message =" + message);

}

}

}

6、测试

在启动方法中模拟消息生产者,向kafka中发送消息

package com.example.demo;

import com.example.demo.service.KafkaSender;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication

public class KafkademoApplication {

public static void main(String[] args) {

ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);

KafkaSender sender = context.getBean(KafkaSender.class);

for (int i = 0; i <1000; i++) {

sender.send();

try {

Thread.sleep(300);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

效果展示

命令行直接消费消息

遇到的问题

生产端连接kafka超时

at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)

解决方案:

修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于kafka实现Spring Cloud Bus消息总线(Kafka api)
下一篇:Java 导出Excel增加下拉框选项
相关文章

 发表评论

暂时没有评论,来抢沙发吧~