netty服务端辅助类ServerBootstrap创建逻辑分析

网友投稿 460 2022-08-17


netty服务端辅助类ServerBootstrap创建逻辑分析

目录ServerBootstrap创建核心参数初始化流程首先执行绑定注册自身到 EventLoop绑定端口逻辑

ServerBootstrap创建

ServerBootstrap 为 netty 建立服务端的辅助类, 以 NIO为例,创建代码如下:

public static void main(String[] args) throws Exception {

ServerBootstrap bs = new ServerBootstrap();

bs.group(new NioEventLoopGroup(1), new NioEventLoopGroup())

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(Channel ch) throws Exception {

ch.pipeline()

.addLast(new HttpServerCodec())

.addLast(new HttpObjectAggregator(65535))

.addLast(new Controller());

}

}).bind(8080).sync().channel().closeFuture().sync();

}

核心参数

//配置属性,如 SO_KEEPALIVE 等private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

//acceot 的 子channel所绑定的 事件循环组"

private volatile EventLoopGroup childGroup;

private volatile ChannelHandler childHandler;

初始化流程

主要为 绑定本地端口 -> 注册自身到 EventLoop , 并注册 accept 和 read 事件 -> EventLoop的主循环中会不断的select注册的channel的事件,并处理。

首先执行绑定

核心逻辑位于

io.netty.bootstrap.AbstractBootstrap.doBind(SocketAddress) 和  io.netty.bootstrap.AbstractBootstrap.initAndRegister()中

private ChannelFuture doBind(final SocketAddress localAddress) {

final ChannelFuture regFuture = initAndRegister();

..........if (regFuture.isDone()) {

// At this point we know that the registration was complete and successful.

ChannelPromise promise = channel.newPromise();

//绑定逻辑

doBind0(regFuture, channel, localAddress, promise);

return promise;

} else {

// Registration future is almost always fulfilled already, but just in case it's not.

final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);

regFuture.addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

Throwable cause = future.cause();

if (cause != null) {

// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an

// IllegalStateException once we try to access the EventLoop of the Channel.

promise.setFailure(cause);

} else {

// Registration was successful, so set the correct executor to use.

// See https://github.com/netty/netty/issues/2586

promise.registered();

doBind0(regFuture, channel, localAddress, promise);

}

}

});

return promise;

}

}

注册自身到 EventLoop

先来看 initAndRegister , 核心逻辑就是利用channelFactory初始化一个NioServerSocketChannel实例,并为其设置上config中的参数,然后将其注册到EventLoop中,实际上是委托的channel的Unsafe来实现注册的,核心逻辑位于 AbstractUnsafe.register0 中 完成注册

final ChannelFuture initAndRegister() {

Channel channel = null;

try {

//本例子中实际调用的是 NioServerSocketChannel的构造参数, 并为其设置感兴趣的事件类型为 OP_ACCEPT

channel = channelFactory.newChannel();

init(channel);

} catch (Throwable t) {

if (channel != null) {

// channel can be null if newChannel crashed (eg SocketException("too many open files"))

channel.unsafe().closeForcibly();

}

// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor

return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);

}

ChannelFuture regFuture = config().group().register(channel);

if (regFuture.cause() != null) {

if (channel.isRegistered()) {

channel.close();

} else {

channel.unsafe().closeForcibly();

}

}

return regFuture;

}

void init(Channel channel) throws Exception {

//设置属性

..........

p.addLast(new ChannelInitializer() {

@Override

public void initChannel(final Channel ch) throws Exception {

final ChannelPipeline pipeline = ch.pipeline();

ChannelHandler handler = config.handler();

if (handler != null) {

pipeline.addLast(handler);

}

ch.eventLoop().execute(new Runnable() {

@Override

public void run() {

//为NioServerSocketChannel 设置一个 默认的 channelhandler : ServerBootstrapAcceptor , 当发生 accept事件时,将 accept的channel注册到 childEventLoop中

pipeline.addLast(new ServerBootstrapAcceptor(

ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

}

});

}

});

}

private void register0(ChannelPromise promise) {

try {

// check if the channel is still open as it could be closed in the mean time when the register

// call was outside of the eventLoop

if (!promise.setUncancellable() || !ensureOpen(promise)) {

return;

}

boolean firstRegistration = neverRegistered;

//执行channel到 eventloop的 selector

doRegister();

neverRegistered = false;

registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the

// user may already fire events through the pipeline in the ChannelFutureListener.

pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);

//触发 InboundChannelHnader.channelRegistered 事件

pipeline.fireChannelRegistered();

// Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) {

//触发channelActive事件,并会为 channel 绑定上 read 事件

pipeline.fireChannelActive();

} else if (config().isAutoRead()) {

// This channel was registered before and autoRead() is set. This means we need to begin read

// again so that we process inbound data.

//

// See https://github.com/netty/netty/issues/4805

beginRead();

}

}

} catch (Throwable t) {

// Close the channel directly to avoid FD leak.

closeForcibly();

closeFuture.setClosed();

http:// safeSetFailure(promise, t);

}

}

绑定端口逻辑

initAndRegister注册成功后,开始执行真正的绑定端口逻辑,核心逻辑位于 NioSocketChannel.doBind0(SocketAddress) 中

private void doBind0(SocketAddress localAddress) throws Exception {

if (PlatformDependent.javaVersion() >= 7) {

SocketUtils.bind(javaChannel(), localAddress);

} else {

SocketUtils.bind(javaChannel().socket(), localAddress);

}

}

至此 绑定个成功, 当触发 ACCEPT 事件时, 会触发  NioServerSocketChannel.doReadMessages -> ServerBootstrapAcceptor.channelRead , 并将 子channel 注册到 childEventLoop中

public void channelRead(ChannelHandlerContext ctx, Object msg) {

final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);

for (Entry, Object> e: childAttrs) {

child.attr((AttributeKey) e.getKey()).set(e.getValue());

}

try {

//注册channel

childGroup.register(child).addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

if (!future.isSuccess()) {

forceClose(child, future.cause());

}

}

});

} catch (Throwable t) {

forceClose(child, t);

}

}

以上就是netty服务端辅助类ServerBootstrap创建逻辑分析的详细内容,更多关于netty辅助类ServerBootstrap创建逻辑的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于SpringBoot实现自动装配返回属性的设计思路
下一篇:Netty事件循环主逻辑NioEventLoop的run方法分析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~