java中的接口是类吗
267
2022-12-05
Java kafka如何实现自定义分区类和拦截器
生产者发送到对应的分区有以下几种方式:
(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)
(2)未指定patition但指定key,通过对key的value进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:
1、实现一个自定义分区类,CustomPartitioner实现Partitioner
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
/**
*
* @param topic 当前的发送的topic
* @param key 当前的key值
* @param keyBytes 当前的key的字节数组
* @param value 当前的value值
* @param valueBytes 当前的value的字节数组
* @param cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//这边根据返回值就是分区号, 这边就是固定发送到三号分区
return 3;
}
@Override
public void close() {
}
@Override
public void configure(Map
}
}
2、producer配置文件指定,具体的分区类
// 具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
技巧:可以使用ProducerConfig中提供的配置ProducerConfig
kafka producer拦截器
拦截器(interceptor)是在Kafka 0.10版本被引入的。
interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
所使用的类为:
org.apache.kafka.clients.producer.ProducerInterceptor
我们可以编码测试下:
1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.UUID;
public class MessageInterceptor implements ProducerInterceptor
@Override
public void configure(Map
System.out.println("这是MessageInterceptor的configure方法");
}
/**
* 这个是消息发送之前进行处理
*
* @param record
* @return
*/
@Override
public ProducerRecord
// 创建一个新的record,把uuid入消息体的最前部
System.out.println("为消息添加uuid");
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
UUID.randomUUID().toString().replace("-", "") + "," + record.value());
}
/**
* 这个是生产者回调函数调用之前处理
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("MessageInterceptor拦截器的onAcknowledgement方法");
}
@Override
public void close() {
System.out.println("MessageInterceptor close 方法");
}
}
2、定义计数拦截器
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map
System.out.println("这是CounterInterceptor的configure方法");
}
@Override
public ProducerRecord
System.out.println("CounterInterceptor计数过滤器不对消息做任何操作");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 统计成功和失败的次数
System.out.println("CounterInterceptor过滤器执行统计失败和成功数量");
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存结果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
3、producer客户端:
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Producer1 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// Kafka服务端的主机名和端口号
props.put("bootstrap.servers", "localhost:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.sizehttp://", 16384);
// 请求延时,可能生产数据太快了
props.put("linger.ms", 1);
// 发送缓存区内存大小,数据是先放到生产者的缓冲区
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
//定义拦截器
List
interceptors.add("kafka.MessageInterceptor");
interceptors.add("kafka.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer
for (int i = 0; i < 1; i++) {
producer.send(new ProducerRecord
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("这是producer回调函数");
}
});
}
/*System.out.println("现在执行关闭producer");
producer.close();*/
producer.close();
}
}
总结,我们可以知道拦截器链各个方法的执行顺序,假如有A、B拦截器,在一个拦截器链中:
(1)执行A的configure方法,执行B的configure方法
(2)执行A的onSend方法,B的onSend方法
(3)生产者发送完毕后,执行A的onAcknowledgement方法,B的onAcknowledgement方法。
(4)执行producer自身的callback回调函数。
(5)执行A的close方法,B的close方法。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~