spring 整合kafka监听消费的配置过程

网友投稿 333 2022-11-01


spring 整合kafka监听消费的配置过程

前言

最近项目里有个需求,要消费kafka里的数据。之前也手动写过代码去消费kafka数据。但是转念一想。既然spring提供了消费kafka的方法。就没必要再去重复造轮子。于是尝试使用spring的API。

项目技术背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。

整合过程

1. 引入spring-kafka的依赖包

http://

org.springframework.kafka

spring-kafka

2.2.0.RELEASE

2. 在spring的xml文件里增加配置项,也可以单独创建一个spring-context-XX.xml文件。

<map>

value="org.apache.kafka.common.serialization.StringDeserializer" />

value="org.apache.kafka.common.serialization.StringDeserializer" />

class="com.chao.service.consumer.PayPalConsumer" />

init-method="doStart">

2. 自定义消费者类,消费者类依然可以使用注解。

/**

* get msg from kafka

*/

@Component

public class PayPalConsumer implements MessageListener {

private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class);

@Autowired

private XXService XXService;

@Override

public void onMessage(ConsumerRecord authorizeRecord) {

String value = authorizeRecord.value();

if (StringUtils.isEmpty(value)){

logger.warn("receive message from kafka is null");

return;

}

logger.info("receive message from kafka is {}",value);

}

}

使用这个步骤配置,一次性过。非常顺利。

value="org.apache.kafka.common.serialization.StringDeserializer" />

value="org.apache.kafka.common.serialization.StringDeserializer" />

class="com.chao.service.consumer.PayPalConsumer" />

init-method="doStart">

2. 自定义消费者类,消费者类依然可以使用注解。

/**

* get msg from kafka

*/

@Component

public class PayPalConsumer implements MessageListener {

private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class);

@Autowired

private XXService XXService;

@Override

public void onMessage(ConsumerRecord authorizeRecord) {

String value = authorizeRecord.value();

if (StringUtils.isEmpty(value)){

logger.warn("receive message from kafka is null");

return;

}

logger.info("receive message from kafka is {}",value);

}

}

使用这个步骤配置,一次性过。非常顺利。

value="org.apache.kafka.common.serialization.StringDeserializer" />

class="com.chao.service.consumer.PayPalConsumer" />

class="com.chao.service.consumer.PayPalConsumer" />

init-method="doStart">

init-method="doStart">

2. 自定义消费者类,消费者类依然可以使用注解。

/**

* get msg from kafka

*/

@Component

public class PayPalConsumer implements MessageListener {

private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class);

@Autowired

private XXService XXService;

@Override

public void onMessage(ConsumerRecord authorizeRecord) {

String value = authorizeRecord.value();

if (StringUtils.isEmpty(value)){

logger.warn("receive message from kafka is null");

return;

}

logger.info("receive message from kafka is {}",value);

}

}

使用这个步骤配置,一次性过。非常顺利。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:并查集实践
下一篇:TopK问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~