RocketMQ获取指定消息的实现方法(源码)

网友投稿 410 2022-11-27


RocketMQ获取指定消息的实现方法(源码)

概要

消息查询是什么?

消息查询就是根据用户提供的msgId从MQ中取出该消息

RocketMQ如果有多个节点如何查询?

问题:RocketMQ分布式结构中,数据分散在各个节点,即便是同一Topic的数据,也未必都在一个broker上。客户端怎么知道数据该去哪个节点上查?

猜想1:逐个访问broker节点查询数据

猜想2:有某种数据中心存在,该中心知道所有消息存储的位置,只要向该中心查询即可得到消息具体位置,进而取得消息内容

实际:

1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及该消息在CommitLog中的偏移量。

2.客户端实现会从msgId字符串中解析出broker地址,向指定broker节查询消息。

问题:CommitLog文件有多个,只有偏移量估计不能确定在哪个文件吧?

实际:单个Broker节点内offset是全局唯一的,不是每个CommitLog文件的偏移量都是从0开始的。单个节点内所有CommitLog文件共用一套偏移量,每个文件的文件名为其第一个消息的偏移量。所以可以根据偏移量和文件名确定CommitLog文件。

源码阅读

0.使用方式

MessageExt  msg = consumer.viewMessage(msgId);

1.消息ID解析

这个了解下就可以了

public class MessageId {

private SocketAddress address;

private long offset;

public MessageId(SocketAddress address, long offset) {

this.address = address;

this.offset = offset;

}

//get-set

}

//from MQAdminImpl.java

public MessageExt viewMessage(

String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

MessageId messageId = null;

try {

//从msgId字符串中解析出address和offset

//address = ip:port

//offset为消息在CommitLog文件中的偏移量

messageId = MessageDecoder.decodeMessageId(msgId);

} catch (Exception e) {

throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");

}

return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),

messageId.getOffset(), timeoutMillis);

}

//from MessageDecoder.java

public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {

SocketAddress address;

long offset;

//ipv4和ipv6的区别

//如果msgId总长度超过32字符,则为ipv6

int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));

byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));

ByteBuffer bb = ByteBuffer.wrap(port);

int portInt = bb.getInt(0);

address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

// offset

byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));

bb = ByteBuffer.wrap(data);

offset = bb.getLong(0);

return new MessageId(address, offset);

}

2.长连接客户端RPC实现

要发请求首先得先建立连接,这里方法可以看到创建连接相关的操作。值得注意的是,第一次访问的时候可能连接还没建立,建立连接需要消耗一段时间。代码中对这个时间也做了判断,如果连接建立完成后,发现已经超时,则不再发出请求。目的应该是尽可能减少请求线程的阻塞时间。

//from NettyRemotingClient.java

@Override

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)

throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {

long beginStartTime = System.currentTimeMillis();

//这里会先检查有无该地址的通道,有则返回,无则创建

final Channel channel = this.getAndCreateChannel(addr);

if (channel != null && channel.isActive()) {

try {

//前置钩子

doBeforeRpcHooks(addr, request);

//判断通道建立完成时是否已到达超时时间,如果超时直接抛出异常。不发请求

long costTime = System.currentTimeMillis() - beginStartTime;

if (timeoutMillis < costTime) {

throw new RemotingTimeoutException("invokeSync call timeout");

}

//同步调用

RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);

//后置钩子

doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, respohttp://nse); //后置钩子

return response;

} catch (RemotingSendRequestException e) {

log.warn("invokeSync: send request exception, so close the channel[{}]", addr);

this.closeChannel(addr, channel);

throw e;

} catch (RemotingTimeoutException e) {

if (nettyClientConfig.isClientCloseSocketIfTimeout()) {

this.closeChannel(addr, channel);

log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);

}

log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);

throw e;

}

} else {

this.closeChannel(addr, channel);

throw new RemotingConnectException(addr);

}

}

下一步看看它的同步调用做了什么处理。注意到它会构建一个Future对象加入待响应池,发出请求报文后就挂起线程,然后等待唤醒(waitResponse内部使用CountDownLatch等待)。

//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,

final long timeoutMillis)

throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {

//请求id

final int opaque = request.getOpaque();

try {

//请求存根

final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);

//加入待响应的请求池

this.responseTable.put(opaque, responseFuture);

final SocketAddress addr = channel.remoteAddress();

//将请求发出,成功发出时更新状态

channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture f) throws Exception {

if (f.isSuccess()) { //若成功发出,更新请求状态为“已发出”

responseFuture.setSendRequestOK(true);

return;

} else {

responseFuture.setSendRequestOK(false);

}

//若发出失败,则从池中移除(没用了,释放资源)

responseTable.remove(opaque);

responseFuture.setCause(f.cause());

//putResponse的时候会唤醒等待的线程

responseFuture.putResponse(null);

log.warn("send a request command to channel <" + addr + "> failed.");

}

});

//只等待一段时间,不会一直等下去

//若正常响应,则收到响应后,此线程会被唤醒,继续执行下去

//若超时,则到达该时间后线程苏醒,继续执行

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

if (null == responseCommand) {

if (responseFuture.isSendRequestOK()) {

throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,

responseFuture.getCause());

} else {

throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());

}

}

return responseCommand;

} finally {

//正常响应完成时,将future释放(正常逻辑)

//超时时,将future释放。这个请求已经作废了,后面如果再收到响应,就可以直接丢弃了(由于找不到相关的响应钩子,就不处理了)

this.responseTable.remove(opaque);

}

}

好,我们再来看看收到报文的时候是怎么处理的。我们都了解JDK中的Future的原理,大概就是将这个任务提交给其他线程处理,该线程处理完毕后会将结果写入到Future对象中,写入时如果有线程在等待该结果,则唤醒这些线程。这里也差不多,只不过执行线程在服务端,服务执行完毕后会将结果通过长连接发送给客户端,客户端收到后根据报文中的ID信息从待响应池中找到Future对象,然后就是类似的处理了。

class NettyClientHandler extends SimpleChannelInboundHandler {

//底层解码完毕得到RemotingCommand的报文

@Override

protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

processMessageReceived(ctx, msg);

}

}

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

final RemotingCommand cmd = msg;

if (cmd != null) {

//判断类型

switch (cmd.getType()) {

case REQUEST_COMMAND:

processRequestCommand(ctx, cmd);

break;

case RESPONSE_COMMAND:

processResponseCommand(ctx, cmd);

break;

default:

break;

}

}

}

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {

//取得消息id

final int opaque = cmd.getOpaque();

//从待响应池中取得对应请求

final ResponseFuture responseFuture = responseTable.get(opaque);

if (responseFuture != null) {

//将响应值注入到ResponseFuture对象中,等待线程可从这个对象获取结果

responseFuture.setResponseCommand(cmd);

//请求已处理完毕,释放该请求

responseTable.remove(opaque);

//如果有回调函数的话则回调(由当前线程处理)

if (responseFuture.getInvokeCallback() != null) {

executeInvokeCallback(responseFuture);

} else {

//没有的话,则唤醒等待线程(由等待线程做处理)

responseFuture.putResponse(cmd);

responseFuture.release();

}

} else {

log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

log.warn(cmd.toString());

}

}

总结一下,客户端的处理时序大概是这样的:

结构大概是这样的:

3.服务端的处理

//todo 服务端待补充CommitLog文件映射相关内容

class NettyServerHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

processMessageReceived(ctx, msg);

}

}

//from NettyRemotingAbscract.java

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

final RemotingCommand cmd = msg;

if (cmd != null) {

switch (cmd.getType()) {

case REQUEST_COMMAND: //服务端走这里

processRequestCommand(ctx, cmd);

break;

case RESPONSE_COMMAND:

processResponseCommand(ctx, cmd);

break;

default:

break;

}

}

}

//from NettyRemotingAbscract.java

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {

//查看有无该请求code相关的处理器

final Pair matched = this.processorTable.get(cmd.getCode());

//如果没有,则使用默认处理器(可能没有默认处理器)

final Pair pair = null == matched ? this.defaultRequestProcessor : matched;

final int opaque = cmd.getOpaque();

if (pair != null) {

Runnable run = new Runnable() {

@Override

public void run() {

try {

doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);

final RemotingResponseCallback callback = new RemotingResponseCallback() {

@Override

public void callback(RemotingCommand response) {

doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

if (!cmd.isOnewayRPC()) {

if (response != null) { //不为null,则由本类将响应值写会给请求方

response.setOpaque(opaque);

response.markResponseType();

try {

ctx.writeAndFlush(response);

} catch (Throwable e) {

log.error("process request over, but response failed", e);

log.error(cmd.toString());

log.error(response.toString());

}

} else { //为null,意味着processor内部已经将响应处理了,这里无需再处理。

}

}

}

};

if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {//QueryMessageProcessor为异步处理器

AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();

processor.asyncProcessRequest(ctx, cmd, callback);

} else {

NettyRequestProcessor processor = pair.getObject1();

RemotingCommand response = processor.processRequest(ctx, cmd);

doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

callback.callback(response);

}

} catch (Throwable e) {

log.error("process request exception", e);

log.error(cmd.toString());

if (!cmd.isOnewayRPC()) {

final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,

RemotingHelper.exceptionSimpleDesc(e));

response.setOpaque(opaque);

ctx.writeAndFlush(response);

}

}

}

};

if (pair.getObject1().rejectRequest()) {

final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

"[REJECTREQUEST]system busy, start flow control for a while");

response.setOpaque(opaque);

ctx.writeAndFlush(response);

return;

}

try {

final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);

pair.getObject2().submit(requestTask);

} catch (RejectedExecutionException e) {

if ((System.currentTimeMillis() % 10000) == 0) {

log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())

+ ", too many requests and system thread pool busy, RejectedExecutionException "

+ pair.getObject2().toString()

+ " request code: " + cmd.getCode());

}

if (!cmd.isOnewayRPC()) {

final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

"[OVERLOAD]system busy, start flow control for a while");

response.setOpaque(opaque);

ctx.writeAndFlush(response);

}

}

} else {

String error = " request type " + cmd.getCode() + " not supported";

final RemotingCommand response =

RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);

response.setOpaque(opaque);

ctx.writeAndFlush(response);

log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);

}

}

//from QueryMessageProcesor.java

@Override

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)

throws RemotingCommandException {

switch (request.getCode()) {

case RequestCode.QUERY_MESSAGE:

return this.queryMessage(ctx, request);

case RequestCode.VIEW_MESSAGE_BY_ID: //通过msgId查询消息

return this.viewMessageById(ctx, request);

default:

break;

}

return null;

}

public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)

throws RemotingCommandException {

final RemotingCommand response = RemotingCommand.createResponseCommand(null);

final ViewMessageRequestHeader requestHeader =

(ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

response.setOpaque(request.getOpaque());

//getMessagetStore得到当前映射到内存中的CommitLog文件,然后根据偏移量取得数据

final SelectMappedBufferResult selectMappedBufferResult =

this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());

if (selectMappedBufferResult != null) {

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null);

//将响应通过socket写回给客户端

try {

//response对象的数据作为header

//消息内容作为body

FileRegion fileRegion =

new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),

selectMappedBufferResult);

ctx.channel().writeAndFlush(fileRegion).addListener(new Channelhttp://FutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

selectMappedBufferResult.release();

if (!future.isSuccess()) {

log.error("Transfer one message from page cache failed, ", future.cause());

}

}

});

} catch (Throwable e) {

log.error("", e);

selectMappedBufferResult.release();

}

return null; //如果有值,则直接写回给请求方。这里返回null是不需要由外层处理响应。

} else {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("can not find message by the offset, " + requestHeader.getOffset());

}

return response;

}

总结


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java8新特性之接口默认方法示例详解
下一篇:Java后台通过Collections获取list集合中最大数,最小数代码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~