Netty之非阻塞处理(netty 异步非阻塞)

网友投稿 591 2022-10-12


Netty之非阻塞处理(netty 异步非阻塞)

Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

一、异步模型

同步I/O : 需要进程去真正的去操作I/O;

异步I/O:内核在I/O操作完成后再通知应用进程操作结果。

怎么去理解同步和异步?

同步:比如服务端发送数据给客户端,客户端中的处理器(继承一个入站处理器即可),可以去重写 channelRead0 方法,那么该方法触发的时候,其实必须得服务器有消息发过来,客户端才能去读写,两者必须是有先后顺序,这就是所谓的同步。 异步:客户端在服务端发送数据来之前就已经返回数据给了用户,但客户端已经告诉服务端数据到了要通过订阅的方式(大名鼎鼎的观察者模式),文章最后已经附上传送门,理解设计模式

比如上一篇关于Netty的AttributeKey和AttributeMap的原理和使用,这里不妨讲讲它的缺点

二、异步模型存在的问题

使用流程

Step1 使用 AttributeKey 设置 key 值和 k-v 对,为 channel 获取 值做准备

创建一个处理器 NettyClientHandler 继承 SimpleChannelInboundHandler<RpcResponse>,它已经实现了 入站处理器相关的功能,只要重写它的 channelRead0 方法即可

public class NettyClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception { try { AttributeKey key = AttributeKey.valueOf(msg.getRequestId()); ctx.channel().attr(key).set(msg); ctx.channel().close(); } finally { ReferenceCountUtil.release(msg); } } }

记得将该 处理器 加入到 客户端 bootStrap 的 handler()方法中,需要 通过默认的 初始化器new ChannelInitializer<SocketChannel>()(也是一个处理器)去初始化处理器链,我是通过匿名内部类去重写 initChannel 方法的,最后addLast() 刚刚自己写的处理器即可。

创建服务器和客户端,这里不再赘述,这篇文章对刚入门的帮助不大,可到文章最后取经拿服务端和客户端。

Step2 使用 channel 的 attr 方法,获取 k-v 值 客户端这里NettyClient 通过用户调用 sendRequest() 方法,去向服务端发送信息,返回值是服务端发回的消息,我们都知道,信息都是在处理器获取的,也就是在channelRead0方法中,所以我们要在sendRequest()方法中,获取服务端传来的值,通过下面代码获取 @Override public Object sendRequest(RpcRequest rpcRequest) throws RpcException { // 通过 host 和 port 获取 channel //省略 // 写入 channel 让 服务端 去 读 request channel.writeAndFlush(rpcRequest); // 获取 k-v 对 RpcResponse rpcResponse = channel.attr(key).get(); } 相信你们当中有一部分发觉了异样,sendRequest()方法和channelRead0()不会同步,就是说你发送数据后,会立马执行到 获取 k-v 的代码,不能阻塞住等待 channelRead0()方法把 k-v 值 set 进去

最后测试到,客户端拿不到值,总是为null

那怎么保持使用异步操作,并且可以顺利拿到值呢?

那么就得通过future来实现,就是先返回值,但值还是没有的,后面让用户自己用future的方法get阻塞拿值,说白了,还是要去同步,只是同步由CPU转到了用户自己手中,慢慢品

三、使用CompletableFuture 解决异步问题

CompletableFuture 使用方法

CompletableFuture resultFuture = new CompletableFuture<>(); /**complete 执行结束后,状态发生改变,则 说明 值已经传到了,complete 是 (被观察者) 通知类的通知方法,通知 观察者 ,get 方法将 不再阻塞,可以获取到值 */ resultFuture .complete(msg); /**获取 正确结果,get 是阻塞操作,所以 先把 resultFuture 作为 返回值 返回,再 get 获取值 */ RpcResponse rpcResponse = resultFuture.get(); // 获取 错误结果, 抛 异常 处理 resultFuture.completeExceptionally(future.cause());

所以我们要做的就是在channelRead0()中 做 complete(),最后 用户直接 get得到数据即可,只要把sendRequest()方法的返回类型改为CompletableFuture 就可以了。

简单来说就是通过使用这个CompletableFuture,让 response不至于返回后是null,因为我们自己new了一个CompletableFuture类,这个类会被通知,并把结果告知给它

需要注意的是,在 客户端的sendRequest()方法拿到的 CompletableFuture<RpcResponse> 和在channelRead0()拿到的必须为同一个,可以设计成单例模式,这里是很泛化的单例,通用

public class SingleFactory { private static Map objectMap = new HashMap<>(); private SingleFactory() {} /** * 使用 双重 校验锁 实现 单例模式 * @param clazz * @param * @return */ public static T getInstance(Class clazz) { Object instance = objectMap.get(clazz); if (instance == null) { synchronized (clazz) { if (instance == null) { try { instance = clazz.newInstance(); } catch (InstantiationException | IllegalAccessException e) { throw new RuntimeException(e.getMessage(), e); } } } } return clazz.cast(instance); } }

下面这样实现是因为涉及到多个客户端并发访问同一个服务器,设计的原因如下:

如果是同一个客户端要采用发起多个线程去请求服务端,设计时如果多个线程的rpcRequest请求id一样,那么要考虑线程安全 如果是不同客户端发起请求服务端,又要保证线程之间对CompleteFuture是线程安全的,确保性能,不能用让所有线程共享同一个 CompleteFuture,这样通知会变为不定向,不可用,因此考虑使用map暂时缓存所有CompleteFuture,更加高效

public class UnprocessedRequests { /** * k - request id * v - 可将来获取 的 response */ private static ConcurrentMap> unprocessedResponseFutures = new ConcurrentHashMap<>(); /** * @param requestId 请求体的 requestId 字段 * @param future 经过 CompletableFuture 包装过的 响应体 */ public void put(String requestId, CompletableFuture future) { System.out.println("put" + future); unprocessedResponseFutures.put(requestId, future); } /** * 移除 CompletableFuture * @param requestId 请求体的 requestId 字段 */ public void remove(String requestId) { unprocessedResponseFutures.remove(requestId); } public void complete(RpcResponse rpcResponse) { CompletableFuture completableFuture = unprocessedResponseFutures.remove(rpcResponse.getRequestId()); completableFuture.complete(rpcResponse); System.out.println("remove" + completableFuture); } }

传送门:

设计模式:https://gitee.com/fyphome/git-res/tree/master/design-patterns或者:https://github.com/Fyupeng/java/tree/main/design_patterns服务端和客户端的实现:https://github.com/Fyupeng/java/tree/main/NettyPro/src/main/java/com/fyp/netty/groupchat​

四、结束语

评论区可留言,可私信,可互相交流学习,共同进步,欢迎各位给出意见或评价,本人致力于做到优质文章,希望能有幸拜读各位的建议!个人博客园:https://cnblogs.com/fyphome/

专注品质,热爱生活。交流技术,寻求同志。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java 如何调用long的最大值和最小值
下一篇:​RENIX非对称时延测试——网络测试仪实操
相关文章

 发表评论

暂时没有评论,来抢沙发吧~