java rocketmq

网友投稿 876 2023-01-03


java rocketmq

前言

与消息发送紧密相关的几行代码:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那这几行代码执行时,背后都做了什么?

一. 首先是DefaultMQProducer.start

@Override

public void start() throws MQClientException {

this.defaultMQProducerImpl.start();

}

调用了默认生成消息的实现类 -- DefaultMQProducerImpl

调用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()会初始化得到MQClientInstance实例对象,MQClientInstance实例对象调用它自己的start方法会 ,启动一些服务,如拉去消息服务PullMessageService.Start()、启动负载平衡服务RebalanceService.Start(),比如网络通信服务MQClientAPIImpl.Start()

另外,还会执行与生产消息相关的信息,如注册produceGroup、new一个TopicPublishInfo对象并以默认TopicKey为键值,构成键值对存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,获取的MQClientInstance实例对象会调用sendHeartbeatToAllBroker()方法,不断向broker发送心跳包,yin'b可以使用下面一幅图大致描述DefaultMQProducerImpl.start()过程:

上图中的三个部分中涉及的内容:

1.1 初始化MQClientInstance

一个客户端只能产生一个MQClientInstance实例对象,产生方式使用了工厂模式与单例模式。MQClientInstance.start()方法启动一些服务,源码如下:

public void start() throws MQClientException {

synchronized (this) {

switch (this.serviceState) {

case CREATE_JUST:

this.serviceState = ServiceState.START_FAILED;

// If not specified,looking address from name server

if (null == this.clientConfig.getNamesrvAddr()) {

this.mQClientAPIImpl.fetchNameServerAddr();

}

// Start request-response channel

this.mQClientAPIImpl.start();

// Start various schedule tasks

this.startScheduledTask();

// Start pull service

this.pullMessageService.start();

// Start rebalance service

this.rebalanceService.start();

// Start push service

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

log.info("the client factory [{}] start OK", this.clientId);

this.serviceState = ServiceState.RUNNING;

break;

case RUNNING:

break;

case SHUTDOWN_ALREADY:

break;

case START_FAILED:

throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);

default:

break;

}

}

}

1.2 注册producer

该过程会将这个当前producer对象注册到MQClientInstance实例对象的的producerTable中。一个jvm(一个客户端)中一个producerGroup只能有一个实例,MQClientInstance操作producerTable大概有如下几个方法:

-- selectProducer

-- updateTopicRouteInfoFromNameServer

-- prepareHeartbeatData

-- isNeedUpdateTopicRouteInfo

-- shutdown

注:

根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定义:

public class DefaultMQProducerImpl implements MQProducerInner {

private final Logger log = ClientLogger.getLog();

private final Random random = new Random();

private final DefaultMQProducer defaultMQProducer;

private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap();

它是一个以topic为key的Map型数据结构,DefaultMQProducerImpl.start()时会默认创建一个key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 发送心跳包

MQClientInstance向broker发送心跳包时,调用sendHeartbeatToAllBroker( ),以及从MQClientInstance实例对象的brokerAddrTable中拿到所有broker地址,向这些broker发送心跳包。

sendHeartbeatToAllBroker会涉及到prepareHeartbeatData()方法,该方法会生成heartbeatData数据,发送心跳包时,heartbeatData作为心跳包的body。与producer相关的部分代码如下:

// Producer

for (Map.Entry entry : this.producerTable.entrySet()) {

MQProducerInner impl = entry.getValue();

if (impl != null) {

ProducerData producerData = new ProducerData();

producerData.setGroupName(entry.getKey());

heartbeatData.getProducerDataSet().add(producerData);

}

二、. SendResult sendResult = producer.send(msg)

首先会调用DefaultMQProducer.send(msg) ,继而调用sendDefaultImpl:

public SendResult send(Message msg,

long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);

}

sendDefaultImpl做了啥?

2.1. 获取topicPublishInfo

根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo,如果没有则更新路由信息,从nameserver端拉取最新路由信息。从nameserver端拉取最新路由信息大致为:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 选择消息发送的队列

普通消息:默认方式下,selectOneMessageQueue从topicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息,默认采用长轮询的方式选择队列 。

它的机制如下:正常情况下,顺序选择queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的broker。不同的队列选择策略形成了生产消息的几种模式,如顺序消息,事务消息。

顺序消息:将一组需要有序消费的消息发往同一个broker的同一个队列上即可实现顺序消息,假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据参数arg字段来选择queue。

privateMKMxzK SendResult sendSelectImpl(

Message msg,

MessageQueueSelector selector,

Object arg,

final CommunicationMode communicationMode,

final SendCallback sendCallback, final long timeout

) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事务消息:只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交,消息发送失败,直接发送回滚消息,进行回滚,具体如何实现后面会单独成文分析。

2.3 封装消息体通信包,发送数据包

首先,根据获取的MessageQueue中的getBrokerName,调用findBrokerAddressInPublish得到该消息存放对应的broker地址,如果没有找到则跟新路由信息,重新获取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知获取的broker均为master(id=0)

然后, 将与该消息相关信息打包成RemotingCommand数据包,其RequestCode.SEND_MESSAGE

根据获取的broke地址,将数据包到对应的broker,默认是发送超时时间为3s。

封装消息请求包的包头:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

requestHeader.setTopic(msg.getTopic());

requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());

requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());

requestHeader.setQueueId(mq.getQueueId());

requestHeader.setSysFlag(sysFlag);

requestHeader.setBornTimestamp(System.currentTimeMillis());

requestHeader.setFlag(msg.getFlag());

requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

requestHeader.setReconsumeTimes(0);

requestHeader.setUnitMode(this.isUnitMode());

requestHeader.setBatch(msg instanceof MessageBatch);

发送消息包(普通消息默认为同步方式):

SendResult sendResult = null;

switch (communicationMode) {

   case SYNC:

  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(

  brokerAddr,

  mq.getBrokerName(),

   msg,

  requestHeader,

   timeout,

  communicationMode,

  context,

  this);

break;

处理来自broker端的响应数据包:

private SendResult sendMessageSync(

final String addr,

final String brokerName,

final Message msg,

final long timeoutMillis,

final RemotingCommand request

) throws RemotingException, MQBrokerException, InterruptedException {

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);

assert response != null;

return this.processSendResponse(brokerName, msg, response);

}

broker端处理request数据包后会将消息存储到commitLog,具体过程后续分析。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:光机电一体化系统接口设计(光机电一体化系统接口设计方案)
下一篇:关于本地post接口测试工具的信息
相关文章

 发表评论

暂时没有评论,来抢沙发吧~