Kafka Java Producer代码实例详解

网友投稿 365 2022-12-05


Kafka Java Producer代码实例详解

根据业务需要可以使用Kafka提供的java Producer API进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer

Kafka的Producer API主要提供下列三个方法:

public void send(KeyedMessage message) 发送单条数据到Kafka集群

  public void send(List> messages) 发送多条数据(数据集)到Kafka集群

  public void close() 关闭Kafka连接资源

一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到Topic的那个分区中,返回分区id,范围:[0,分区数量); 这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:

import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;

/**

* Created by gerry on 12/21.

*/

public class JavaKafkaProducerPartitioner implements Partitioner {

/**

* 无参构造函数

*/

public JavaKafkaProducerPartitioner() {

this(new VerifiableProperties());

}

/**

* 构造函数,必须给定

*

* @param properties 上下文

*/

public JavaKafkaProducerPartitioner(VerifiableProperties properties) {

// nothings

}

@Override

public int partition(Object key, int numPartitions) {

int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());

return num % numPartitions;

}

}

二、 JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicBoolean;

import java.util.concurrent.ThreadLocalRandom;

/**

* Created by gerry on 12/21.

*/

public class JavaKafkaProducer {

private Logger logger = Logger.getLogger(JavaKafkaProducer.class);

public static final String TOPIC_NAME = "test";

public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();

public static final int chartsLength = charts.length;

public static void main(String[] args) {

String brokerList = "192.168.187.149:9092";

http:// brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";

brokerList = "192.168.187.146:9092";

Properties props = new Properties();

props.put("metadata.broker.list", brokerList);

/**

* 0表示不等待结果返回

* 1表示等待至少有一个服务器返回数据接收标识

* -1表示必须接收到所有的服务器返回标识,及同步写入

* */

props.put("request.required.acks", "0");

/**

* 内部发送数据是异步还是同步

* sync:同步, 默认

* async:异步

*/

props.put("producer.type", "async");

/**

* 设置序列化的类

* 可选:kafka.serializer.StringEncoder

* 默认:kafka.serializer.DefaultEncoder

*/

props.put("serializer.class", "kafka.serializer.StringEncoder");

/**

* 设置分区类

* 根据key进行数据分区

* 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区

* 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区

*/

props.put("partitioner.class", "JavaKafkaProducerPartitioner");

// 重试次数

props.put("message.send.max.retries", "3");

// 异步提交的时候(async),并发提交的记录数

props.put("batch.num.messages", "200");

// 设置缓冲区大小,默认10KB

props.put("send.buffer.bytes", "102400");

// 2. 构建Kafka Producer Configuration上下文

ProducerConfig config = new ProducerConfig(props);

// 3. 构建Producer对象

final Producer producer = new Producer(config);

// 4. 发送数据到服务器,并发线程发送

final AtomicBoolean flag = new AtomicBoolean(true);

int numThreads = 50;

ExecutorService pool = Executors.newFixedThreadPool(numThreads);

for (int i = 0; i < 5; i++) {

pool.submit(new Thread(new Runnable() {

@Override

public void run() {

while (flag.get()) {

// 发送数据

KeyedMessage message = generateKeyedMessage();

producer.send(message);

System.out.println("发送数据:" + message);

// 休眠一下

try {

int least = 10;

int bound = 100;

Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound));

} catch (InterruptedException e) {

e.printStackTrace();

}

}

System.out.println(Thread.currentThread().getName() + " shutdown....");

}

}, "Thread-" + i));

}

// 5. 等待执行完成

long sleepMillis = 600000;

try {

Thread.sleep(sleepMillis);

} catch (InterruptedException e) {

e.printStackTrace();

}

flag.set(false);

// 6. 关闭资源

pool.shutdown();

try {

pool.awaitTermination(6, TimeUnit.SECONDS);

} catch (InterruptedException e) {

} finally {

producer.close(); // 最后之后调用

}

}

/**

* 产生一个消息

*

* @return

*/

private static KeyedMessage generateKeyedMessage() {

String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);

StringBuilder sb = new StringBuilder();

int num = ThreadLocalRandom.current().nextInt(1, 5);

for (int i = 0; i < num; i++) {

sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" ");

}

String message = sb.toString().trim();

return new KeyedMessage(TOPIC_NAME, key, message);

}

/**

* 产生一个给定长度的字符串

*

* @param numItems

* @return

*/

private static String generateStringMessage(int numItems) {

StringBuilder sb = new StringBuilder();

for (int i = 0; i < numItems; i++) {

sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]);

}

return sb.toString();

}

}

三、Pom.xml依赖配置如下

0.8.2.1

org.apache.kafka

kafka_2.10

${kafka.version}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Shiro + JWT + SpringBoot应用示例代码详解
下一篇:springboot+jwt实现token登陆权限认证的实现
相关文章

 发表评论

暂时没有评论,来抢沙发吧~