Spring Boot Integration with spring-kafka




Preface

Our new project is built on Spring Boot, and data created in the new project has to be synchronized into the old system. Part of the sync code already existed and used Kafka. For plain data synchronization, I didn't think Kafka was the right MQ choice: the data volume is small, yet running Kafka also means running a cluster plus ZooKeeper, which is overkill for a few simple sync jobs.

But I'm just the hired help; if the boss says use Kafka, we use Kafka. At first I found spring-integration-kafka, whose description says it is a rewrite on top of spring-kafka, but the official documentation left me thoroughly confused and I never got the feature working. The docs aren't well written either, perhaps because the project is still young and has plenty of rough edges. So I gave up on it and went with spring-kafka directly.

Implementation

The pom.xml file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.linuxsogood.sync</groupId>
    <artifactId>linuxsogood-sync</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <mybatis.version>3.3.1</mybatis.version>
        <mybatis.spring.version>1.2.4</mybatis.spring.version>
        <mapper.version>3.3.6</mapper.version>
        <pagehelper.version>4.1.1</pagehelper.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.2.3.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.11</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>${mybatis.spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-core</artifactId>
            <version>1.3.2</version>
            <scope>compile</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>${pagehelper.version}</version>
        </dependency>
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>repo.spring.io.milestone</id>
            <name>Spring Framework Maven Milestone Repository</name>
            <url>https://repo.spring.io/libs-milestone</url>
        </repository>
    </repositories>

    <profiles>
        <profile>
            <id>mybatis_generator</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.mybatis.generator</groupId>
                        <artifactId>mybatis-generator-maven-plugin</artifactId>
                        <version>1.3.2</version>
                        <configuration>
                            <verbose>true</verbose>
                            <overwrite>true</overwrite>
                        </configuration>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>org.linuxsogood.sync.Starter</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


The ORM layer uses MyBatis, together with the common Mapper (tk.mybatis) and the PageHelper pagination plugin.
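To make those two plugins concrete, here is a minimal, illustrative sketch (the StoreCrudMapper interface and StoreQueryService class are assumptions for this example; the project's real StoreMapper is generated by the MyBatis Generator): a DAO only needs to extend the common Mapper to inherit generic CRUD methods, and PageHelper.startPage paginates the next MyBatis query on the current thread.

import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.linuxsogood.sync.model.Store;
import tk.mybatis.mapper.common.Mapper;
import java.util.List;

// Extending the common Mapper supplies generic CRUD methods (insert, selectAll, ...)
// for the Store entity without writing an XML statement for each one.
public interface StoreCrudMapper extends Mapper<Store> {
}

class StoreQueryService {

    private StoreCrudMapper storeCrudMapper;

    List<Store> firstPage() {
        PageHelper.startPage(1, 20);                       // paginate the next query: page 1, 20 rows
        List<Store> stores = storeCrudMapper.selectAll();  // this query is intercepted and paginated
        return new PageInfo<>(stores).getList();           // PageInfo also exposes total count, pages, ...
    }
}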

Kafka consumer configuration:

import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);                          // three consumer threads in the container
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
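Both the consumer configuration above and the producer configuration below read the broker list from the kafka.broker.address property, so it has to be defined in application.properties. A one-line sketch (the host and port are placeholders, not values from the original article):

# Kafka broker list used by both KafkaConsumerConfig and KafkaProducerConfig
kafka.broker.address=192.168.1.100:9092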

Producer configuration:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
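With the kafkaTemplate bean in place, the producing side is a single call. A minimal sketch of a sender (the StoreSyncPublisher class is illustrative, not from the original project; the topic name matches the listener below):

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class StoreSyncPublisher {            // illustrative sender, not part of the original code

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Serialize the message wrapper to JSON and publish it to the topic the listener subscribes to.
    public void publish(Object messageWrapper) {
        kafkaTemplate.send("linuxsogood-topic", JSON.toJSONString(messageWrapper));
    }
}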

The listener is where the business logic lives: it defines what to do with the data once it arrives from Kafka. To get Kafka's broadcast behavior, multiple listeners must listen under different groups, i.e. the group in the @KafkaListener annotation must differ between them. If several listeners declare the same group, only one of them will process any given message and the others will never see it; that is Kafka's distributed (load-balanced) consumption mode.

Listeners in the same group share the work of processing the received messages, with Kafka assigning them according to its partition-assignment algorithm. Listeners in different groups that subscribe to the same topic each receive every message, which gives the broadcast mode, as sketched below.
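A minimal illustration of the two modes just described (the class, method, and topic names are made up for the example, not taken from the project):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

public class GroupDemoListeners {

    // Different groups, same topic: listenA and listenB each receive every message (broadcast mode).
    @KafkaListener(topics = "demo-topic", group = "group-a")
    public void listenA(ConsumerRecord<?, ?> record) { /* handle message */ }

    @KafkaListener(topics = "demo-topic", group = "group-b")
    public void listenB(ConsumerRecord<?, ?> record) { /* handle message */ }

    // Same group as listenB: each message is processed by only one of listenB/listenC
    // (load-balanced mode).
    @KafkaListener(topics = "demo-topic", group = "group-b")
    public void listenC(ConsumerRecord<?, ?> record) { /* handle message */ }
}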

import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private StoreMapper storeMapper;

    /**
     * Listen for Kafka messages; when one arrives, consume it and sync the data into the new Fenghuo database.
     * @param record the message record
     */
    @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            try {
                MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
                CupMessageType type = messageWrapper.getType();
                // Dispatch on the message's data type; different types go into different tables
                if (CupMessageType.STORE == type) {
                    proceedStore(messageWrapper);
                }
            } catch (Exception e) {
                LOGGER.error("Failed to save the received message to the database, message: {}", message.toString(), e);
            }
        }
    }

    /**
     * The message is of store type; process the store message and persist it.
     * @param messageWrapper the message obtained from Kafka
     */
    private void proceedStore(MessageWrapper messageWrapper) {
        Object data = messageWrapper.getData();
        Store cupStore = JSON.parseObject(data.toString(), Store.class);
        StoreExample storeExample = new StoreExample();
        String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
        storeExample.createCriteria().andStoreNameEqualTo(storeName);
        List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
        org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
        org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
        // Insert if no matching record exists, otherwise update the existing one
        if (stores.size() == 0) {
            storeMapper.insert(store);
        } else {
            store.setStoreId(stores.get(0).getStoreId());
            storeMapper.updateByPrimaryKey(store);
        }
    }
}

Summary

That is all for this article. I hope it is of some help in your study or work. If you have questions, feel free to leave a comment. Thanks for your support.

