Springboot集成Kafka进行批量消费及踩坑点

网友投稿 1090 2022-09-09


Springboot集成Kafka进行批量消费及踩坑点

目录引入依赖创建配置类Kafka 消费者

引入依赖

org.springframework.kafka

spring-kafka

1.3.11.RELEASE

因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。

注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:

java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor

创建配置类

/**

* kafka 配置类

*/

@Configuration

@EnableKafka

public class KafkaConsumerConfig {

private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class);

@Value("${kafka.bootstrap.servers}")

private String kafkaBootstrapServers;

@Value("${kafka.group.id}")

private String kafkaGroupId;

@Value("${kafka.topic}")

private String kafkaTopic;

public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf";

public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks";

@Bean

public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

// 设置并发量,小于或者等于 Topic 的分区数

factory.setConcurrency(5);

// 设置为批量监听

factory.setBatchListener(Boolean.TRUE);

factory.getContainerProperties().setPollTimeout(30000);

return factory;

}

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

public Map consumerConfigs() {

Map props = new HashMap<>();

//设置接入点,请通过控制台获取对应Topic的接入点。

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);

//设置SSL根证书的路径,请记得将XXX修改为自己的路径。

//与SASL路径类似,该文件也不能被打包到jar中。

System.setProperty("java.security.auth.login.config", CONFIG_PATH);

props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH);

//根证书存储的密码,保持不变。

props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");

//接入协议,目前支持使用SASL_SSL协议接入。

props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");

//SASL鉴权方式,保持不变。

props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

// 自动提交

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE);

//两次Poll之间的最大允许间隔。

//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);

//设置单次拉取的量,走公网访问时,该参数会有较大影响。

props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);

props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);

//每次Poll的最大数量。

//注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

//消息的反序列化方式。

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

//当前消费实例所属的消费组,请在控制台申请之后填写。

//属于同一个组的消费实例,会负载消费消息。

props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);

//Hostname校验改成空。

props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

return props;

}

}

注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。

Kafka 消费者

/**

* kafka 消息消费类

*/

@Component

public class KafkaMessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

@KafkaListener(topics = {"${kafka.topic}"})

public void listen(List> recordList) {

for (ConsumerRecord record : recordList) {

// 打印消息的分区以及偏移量

LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset());

String value = record.value();

System.out.println("value = " + value);

// 处理业务逻辑 ...

}

}

}

因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List>。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:网络知识丨公网NAT网关(公网nat类型)
下一篇:企业网络项目-1000人网络冗余拓扑(大型企业网络拓扑)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~