springboot 1.5.2 集成kafka的简单例子

网友投稿 459 2023-03-21


springboot 1.5.2 集成kafka的简单例子

本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下:

随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便。

添加依赖

compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE")

添加application.properties

#kafka

# 指定kafka 代理地址,可以多个

spring.kafka.bootstrap-servers=192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092

# 指定默认消费者group id

spring.kafka.consumer.group-id=myGroup

# 指定默认topic id

spring.kafka.template.default-topic= my-replicated-topic

# 指定listener 容器中的线程数,用于提高并发量

spring.kafka.listener.concurrency= 3

# 每次批量发送消息的数量

spring.kafka.producer.batch-size= 1000

configuraVBlPUiztion 启用kafka

package cn.xiaojf.today.data.kafka.configuration;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

/**

* kafka 配置

* @author xiaojf 2017/3/24 14:09

*/

@Configuration

@EnableKafka

public class KafkaConfiguration {

}

消息生产者

package cn.xiaojf.today.data.kafka.producer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaOperations;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.ProducerListener;

import org.springframework.stereotype.Component;

/**

* 消息生产者

* @author xiaojf 2017/3/24 14:36

*/

@Component

public class MsgProducer {

@Authttp://owired

private KafkaTemplate kafkaTemplate;

public void send() {

kafkaTemplate.send("my-replicated-topic","xiaojf");

kafkaTemplate.send("my-replicated-topic","xiaojf");

kafkaTemplate.metrics();

kafkaTemplate.execute(new KafkaOperations.ProducerCallback() {

@Override

public Object doInKafka(Producer producer) {

//这里可以编写kafka原生的api操作

return null;

}

});

//消息发送的监听器,用于回调返回信息

kafkaTemplate.setProducerListener(new ProducerListener() {

@Override

public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {

}

@Override

public void onError(String topic, Integer partition, String key, String value, Exception exception) {

}

@Override

public boolean isInterestedInSuccess() {

return false;

}

});

}

}

消息消费者

package cn.xiaojf.today.data.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

/**

* 消息消费者

* @author xiaojf 2017/3/24 14:36

*/

@Component

public class MsgConsumer {

@KafkaListener(topics = {"my-replicated-topic","my-replicated-topic2"})

public void processMessage(String content) {

System.out.println(content);

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java中Switch用法代码示例
下一篇:查看路由器管理员密码(怎么查看路由器管理员密码)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~