浅谈使用java实现阿里云消息队列简单封装

网友投稿 437 2023-02-14


浅谈使用java实现阿里云消息队列简单封装

一、前言

最近公司有使用阿里云消息队列的需求,为了更加方便使用,本人用了几天时间将消息队列封装成api调用方式以方便内部系统的调用,现在已经完成,特此记录其中过程和使用到的相关技术,与君共勉。

现在阿里云提供了两种消息服务:mns服务和ons服务,其中我认为mns是简化版的ons,而且mns的消息消费需要自定义轮询策略的,相比之下,ons的发布与订阅模式功能更加强大(比如相对于mns,ons提供了消息追踪、日志、监控等功能),其api使用起来更加方便,而且听闻阿里内部以后不再对mns进行新的开发,只做维护,ons服务则会逐步替代mns服务成为阿里消息服务的主打产品,所以,如果有使用消息队列的需求,建议不要再使用mns,使用ons是最好的选择。

涉及到的技术:Spring,反射、动态代理、Jackson序列化和反序列化

在看下面的文章之前,需要先看上面的文档以了解相关概念(Topic、Consumer、Producer、Tag等)以及文档中提供的简单的发送和接收代码实现。

该博文只针对有消息队列知识基础的朋友看,能帮上大家的忙我自然很高兴,看不懂的也不要骂,说明你路子不对。

二、设计方案

1.消息发送

在一个简单的cs架构中,假设server会监听一个Topic的Producer发送的消息,那么它首先应该提供client一个api,client只需要简单的调用该api,就可以通过producer来生产消息

2.消息接收

由于api是server制定的,所以server当然也知道如何消费这些消息

在这个过程中,server实际充当着消费者的角色,client实际充当着生产者的角色,但是生产者生产消息的规则则由消费者制定以满足消费者消费需求。

3.最终目标

我们要创建一个单独的jar包,起名为queue-core为生产者和消费者提供依赖和发布订阅的具体实现。

三、消息发送

1.消费者提供接口

@Topic(name="kdyzm",producerId="kdyzm_producer")

public interface UserQueueResource {

@Tag("test1")

public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user);

@Tag("test2")

public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user);

}

由于Topic和producer之间是N:1的关系,所以这里直接将producerId作为Topic的一个属性;Tag是一个很关键的过滤条件,消费者通过它进行消息的分类做不同的业务处理,所以,这里使用Tag作为路由条件。

2.生产者使用消费者提供的api发送消息

由于消费者只提供了接口给生产者使用,接口是没有办法直接使用的,因为没有办法实例化,这里使用动态代理生成对象,在消费者提供的api中,添加如下config,以方便生产者直接导入config即可使用,这里使用了基于java的spring config,请知悉。

@Configuration

public class QueueConfig {

@Autowired

@Bean

public UserQueueResource userQueueResource() {

return QueueResourceFachttp://tory.createProxyQueueResource(UserQueueResource.class);

}

}

3.queue-core对生产者发送消息的封装

以上1中所有的注解(Topic、Tag、Body 、Key)以及2中使用到的QueueResourceFactory类都要在queue-core中定义,其中注解的定义只是定义了规则,真正的实现实际上是在QueueResourceFactory中

import java.lang.reflect.InvocationHandler;http://

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.aliyun.openservices.ons.api.Message;

import com.aliyun.openservices.ons.api.Producer;

import com.aliyun.openservices.ons.api.SendResult;

import com.wy.queue.core.api.MQConnection;

import com.wy.queue.core.utils.JacksonSerializer;

import com.wy.queue.core.utils.MQUtils;

import com.wy.queue.core.utils.QueueCoreSpringUtils;

public class QueueResourceFactory implements InvocationHandler {

private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class);

private String topicName;

private String producerId;

private JacksonSerializer serializer=new JacksonSerializer();

private static final String PREFIX="PID_";

public QueueResourceFactory(String topicName,String producerId) {

this.topicName = topicName;

this.producerId=producerId;

}

public static T createProxyQueueResource(Class clazz) {

String topicName = MQUtils.getTopicName(clazz);

String producerId = MQUtils.getProducerId(clazz);

T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(),

new Class>[] { clazz }, new QueueResourceFactory(topicName,producerId));

return target;

}

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

if(args.length == 0 || args.length>1){

throw new RuntimeException("only accept one param at queueResource interface.");

}

String tagName=MQUtils.getTagName(method);

ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class);

MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);

Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId);

//发送消息

Message msg = new Message( //

// 在控制台创建的 Topic,即该消息所属的 Topic 名称

connectionInfo.getPrefix()+"_"+topicName,

// Message Tag,

// 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤

tagName,

// Message Body

// 任何二进制形式的数据, MQ 不做任何干预,

// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式

serializer.serialize(args[0]).getBytes());

SendResult sendResult = producer.send(msg);

logger.info("Send Message success. Message ID is: " + sendResult.getMessageId());

return null;

}

}

这里特意将自定义包和第三方使用的包名都贴过来了,以便于区分。

这里到底做了哪些事情呢?

发送消息的过程就是动态代理创建一个代理对象,该对象调用方法的时候会被拦截,首先解析所有的注解,比如topicName、producerId、tag等关键信息从注解中取出来,然后调用阿里sdk发送消息,过程很简单,但是注意,这里发送消息的时候是分环境的,一般来讲现在企业中会区分QA、staging、product三种环境,其中QA和staging是测试环境,对于消息队列来讲,也是会有三种环境的,但是QA和staging环境往往为了降低成本使用同一个阿里账号,所以创建的topic和productId会放到同一个区域下,这样同名的TopicName是不允许存在的,所以加上了环境前缀加以区分,比如QA_TopicName,PID_Sthttp://aging_ProducerId等等;另外,queue-core提供了MQConnection接口,以获取配置信息,生产者服务只需要实现该接口即可。

4.生产者发送消息

@Autowired

private UserQueueResource userQueueResource;

@Override

public void sendMessage() {

UserModel userModel=new UserModel();

userModel.setName("kdyzm");

userModel.setAge(25);

userQueueResource.handleUserInfo(userModel);

}

只需要数行代码即可将消息发送到指定的Topic,相对于原生的发送代码,精简了太多。

四、消息消费

相对于消息发送,消息的消费要复杂一些。

1.消息消费设计

由于Topic和Consumer之间是N:N的关系,所以将ConsumerId放到消费者具体实现的方法上

@Controller

@QueueResource

public class UserQueueResourceImpl implements UserQueueResource {

private Logger logger = LoggerFactory.getLogger(this.getClass());

@Override

@ConsumerAnnotation("kdyzm_consumer")

public void handleUserInfo(UserModel user) {

logger.info("收到消息1:{}", new Gson().tojson(user));

}

@Override

@ConsumerAnnotation("kdyzm_consumer1")

public void handleUserInfo1(UserModel user) {

logger.info("收到消息2:{}", new Gson().toJson(user));

}

}

这里又有两个新的注解@QueueResource和@ConsumerAnnotation,这两个注解后续会讨论如何使用。有人会问我为什么要使用ConsumerAnnotation这个名字而不使用Consumer这个名字,因为Consumer这个名字和aliyun提供的sdk中的名字冲突了。。。。

在这里, 消费者提供api 接口给生产者以方便生产者发送消息,消费者则实现该接口以消费生产者发送的消息,如何实现api接口就实现了监听,这点是比较关键的逻辑。

2.queue-core实现消息队列监听核心逻辑

第一步:使用sping 容器的监听方法获取所有加上QueueResource注解的Bean

第二步:分发处理Bean

如何处理这些Bean呢,每个Bean实际上都是一个对象,有了对象,比如上面例子中的UserQueueResourceImpl 对象,我们可以拿到该对象实现的接口字节码对象,进而可以拿到该接口UserQueueRerousce上的注解以及方法上和方法中的注解,当然UserQueueResourceImpl实现方法上的注解也能拿得到,这里我将获取到的信息以consumerId为key,其余相关信息封装为Value缓存到了一个Map对象中,核心代码如下:

Class> clazz = resourceImpl.getClass();

Class> clazzIf = clazz.getInterfaces()[0];

Method[] methods = clazz.getMethods();

String topicName = MQUtils.getTopicName(clazzIf);

for (Method m : methods) {

ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class);

if (null == consumerAnno) {

// logger.error("method={} need Consumer annotation.", m.getName());

continue;

}

String consuerId = consumerAnno.value();

if (StringUtils.isEmpty(consuerId)) {

logger.error("method={} ConsumerId can't be null", m.getName());

continue;

}

Class>[] parameterTypes = m.getParameterTypes();

Method resourceIfMethod = null;

try {

resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes);

} catch (NoSuchMethodException | SecurityException e) {

logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(),

e);

continue;

}

String tagName = MQUtils.getTagName(resourceIfMethod);

consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m));

}

第三步:通过反射实现消费的动作

首先,先确定好反射动作执行的时机,那就是监听到了新的消息

其次,如何执行反射动作?不赘述,有反射相关基础的童鞋都知道怎么做,核心代码如下所示:

MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class);

String topicPrefix=connectionInfo.getPrefix()+"_";

String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_";

for(String consumerId:consumersMap.keySet()){

MethodInfo methodInfo=consumersMap.get(consumerId);

Properties connectionProperties=convertToProperties(connectionInfo);

// 您在控制台创建的 Consumer ID

connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId);

Consumer consumer = ONSFactory.createConsumer(connectionProperties);

consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //订阅多个Tag

public Action consume(Message message, ConsumeContext context) {

try {

String messageBody=new String(message.getBody(),"UTF-8");

logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(),methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody);

Method method=methodInfo.getMethod();

Class> parameType = method.getParameterTypes()[0];

Object arg = jacksonSerializer.deserialize(messageBody, parameType);

Object[] args={arg};

method.invoke(resourceImpl, args);

} catch (Exception e) {

logger.error("",e);

}

return Action.CommitMessage;

}

});

consumer.start();

logger.info("consumer={} has started.",consumerIdPrefix+consumerId);

}

五、完整代码见下面的git链接

https://github.com/kdyzm/queue-core.git


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:api接口注册管理目的(api接口注册管理目的是什么)
下一篇:接口性能测试指标(接口测试的指标)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~