Kafka简单客户端编程实例

网友投稿 265 2023-03-19


Kafka简单客户端编程实例

今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息。下面是具体的实例代码。

一、创建配置类Config

这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS

package com.lya.kafka;

/**

* 配置项

* @author liuyazhuang

*

*/

public class Config {

/**

http://* 话题

*/

public static final String TOPIC = "wordcount";

/**

* 线程数

*/

public static final Integer THREADS = 1;

}

二、编程生产者类ProducerDemo

这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。

package com.lya.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

/**

* 生产者实例

* @author liuyazhuang

*

*/

public class ProducerDemo {

http:// public static void main(String[] args) throws Exception {

Properties props = new Properties();

props.put("zk.connect", "192.168.209.121:2181");

props.put("metadata.broker.list","192.168.209.121:9092");

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("zk.connectiontimeout.ms", "15000");

ProducerConfig config = new ProducerConfig(props);

Producer producer = new Producer(config);

// 发送业务消息

// 读取文件 读取内存数据库 读socket端口

for (int i = 1; i <= 100; i++) {

Thread.sleep(500);

producer.send(new KeyedMessage(Config.TOPIC,

"this number ===>>> " + i));

}

}

}

三、编写消息者类ConsumerDemo

这个类的主要作用就是消费Kafka中wordcount话题的消息。

package com.lya.kafka;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

/**

* 消费者实例

* @author liuyazhuang

*

*/

public class ConsumerDemo {

public static void main(String[] args) {

Properties props = new Properties();

props.put("zookeeper.connect", "192.168.209.121:2181");

props.put("group.id", "1111");

props.put("auto.offset.reset", "smallest");

props.put("zk.connectiontimeout.ms", "15000");

ConsumerConfig config = new ConsumerConfig(props);

ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);

Map topicCountMap = new HashMap();

topicCountMap.put(Config.TOPIC, Config.THREADS);

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

List> streams = consumerMap.get(Config.TOPIC);

for(final KafkaStream kafkaStream : streams){

new Thread(new Runnable() {

@Override

public void run() {

for(MessageAndMetadata mm : kafkaStream){

String msg = new String(mm.message());

System.out.println(msg);

}

}

}).start();

}

}

}

四、运行实例

首先,运行消费者类ConsumerDemo

运行结果如下:

没有打印任何信息。

此时,我们运行生产者类ProducerDemo

我们再次打开消费者的控制台查看如下:

打印出了生产者生产的消息。

至此,VpRMyKafka简单客户端编程实例结束。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java制作简单验证码功能
下一篇:api接口文档的作用(api接口文档的作用有哪些)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~