Spring Boot集成Kafka的示例代码

网友投稿 491 2023-02-09


Spring Boot集成Kafka的示例代码

本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记

系统环境

使用远程服务器上搭建的kafka服务

Ubuntu 16.04 LTS

kafka_2.12-0.11.0.0.tgz

zookeeper-3.5.2-alpha.tar.gz

集成过程

1.创建spring boot工程,添加相关依赖:

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.laravelshao.springboot

spring-boot-integration-kafka

0.0.1-SNAPSHOT

jar

spring-boot-integration-kafka

Demo project for Spring Boothttp://

org.springframework.boot

spring-boot-starter-parent

2.0.0.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter

org.springframework.kafka

spring-kafka

org.springframework.boot

spring-boot-starter-json

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.laravelshao.springboot

spring-boot-integration-kafka

0.0.1-SNAPSHOT

jar

spring-boot-integration-kafka

Demo project for Spring Boothttp://

org.springframework.boot

spring-boot-starter-parent

2.0.0.RELEASE

UTF-8

UTF-8

1.8

org.springframework.boot

spring-boot-starter

org.springframework.kafka

spring-kafka

org.springframework.boot

spring-boot-starter-json

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

2.添加配置信息,这里使用yml文件

spring:

kafka:

bootstrap-servers:X.X.X.X:9092

producer:

value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

consumer:

group-id: test

auto-offset-reset: earliest

value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

properties:

spring:

json:

trusted:

packages: com.laravelshao.springboot.kafka

3.创建消息对象

public class Message {

private Integer id;

private String msg;

public Message() {

}

public Message(Integer id, String msg) {

this.id = id;

this.msg = msg;

}

public Integer getId() {

return id;

}

public void setId(Integer id) {

this.id = id;

}

public String getMsg() {

return msg;

}

public void setMsg(String msg) {

this.msg = msg;

}

@Override

public String toString() {

return "Message{" +

"id=" + id +

", msg='" + msg + '\'' +

'}';

}

}

4.创建生产者

package com.laravelshao.springboot.kafka;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Component;

/**

* Created by shaoqinghua on 2018/3/23.

*/

@Component

public class Producer {

private static Logger log = LoggerFactory.getLogger(Producer.class);

@Autowired

private KafkaTemplate kafkaTemplate;

public void send(String topic, Message message) {

kafkaTemplate.send(topic, message);

log.info("Producer->topic:{}, message:{}", topic, message);

}

}

5.创建消费者,使用@ KafkaListener注解监听主题

package com.laravelshao.springboot.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

/**

* Created by shaoqinghua on 2018/3/23.

*/

@Component

public class Consumer {

private static Logger log = LoggerFactory.getLogger(Consumer.class);

@KafkaListener(topics = "test_topic")

public void receive(ConsumerRecord consumerRecord) {

log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());

}

}

6.发送消费测试

package com.laravelshao.springboot;

import com.laravelshao.springboot.kafka.Message;

import com.laravelshao.springboot.kafka.Producer;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.context.ApplicationContext;

@SpringBootApplication

public class IntegrationKafkaApplication {

public static void main(String[] args) throws InterruptedException {

ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);

Producer producer = context.getBean(Producer.class);

for (int i = 1; i < 10; i++) {

producer.send("test_topic", new Message(i, "test topic message " + i));

Thread.sleep(2000);

}

}

}

可以依次看到发送消息,消费消息

异常问题

反序列化异常(自定义的消息对象不在kafka信任的包路径下)?

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.

Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)

 at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)

 at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)

 at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)

 at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)

 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)

 at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)

 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)

 at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)

 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)

 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)

 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)

 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

 at java.util.concurrent.FutureTask.run(FutureTask.java:266)

 at java.lang.Thread.run(Thread.java:745)

解决方法:将当前包添加到kafka信任的包路径下

spring:

kafka:

consumer:

properties:

spring:

json:

trusted:

packages: com.laravelshao.springboot.kafka


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:vue组件详解之使用slot分发内容
下一篇:angular2模块和共享模块详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~