Netty分布式ByteBuf使用的底层实现方式源码解析

网友投稿 299 2022-08-17


Netty分布式ByteBuf使用的底层实现方式源码解析

目录概述AbstractByteBuf属性和构造方法首先看这个类的属性和构造方法我们看几个最简单的方法我们重点关注第二个校验方法ensureWritable(length)我们跟到扩容的方法里面去最后将写指针后移length个字节

概述

熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到channel中

但是jdk的byteBuffer是使用起来有诸多不便, 比如只有一个标记位置的指针position, 在进行读写操作时要频繁的通过flip()方法进行指针位置的移动, 极易出错, 并且byteBuffer的内存一旦分配则不能改变, 不支持动态扩容, 当读写的内容大于缓冲区内存时, 则会发生索引越界异常

而Netty的ByteBuf对jdk的byteBuffer做了重新的定义, 同样是字节缓冲区用于读取网络io中的数据, 但是使用起来大大简化, 并且支持了自动扩容, 不用担心读写数据大小超过初始分配的大小

byteBuf根据其分类的不同底层实现方式有所不同, 有直接基于jdk底层byteBuffer实现的, 也有基于字节数组的实现的, 对于byteBuf的分类, 在后面的小节将会讲到

byteBuf中维护了两个指针, 一是读指针, 二是写指针, 两个指针相互独立, 在读操作的时候, 只会移动读指针, 通过指针位置记录读取的字节数

同样在写操作时, 也只会移动写指针, 通过写指针的位置记录写的字节数

在每次读写操作的过程中都会对指针的位置进行校验, 读指针的位置不能超过写指针, 否则会抛出异常

同样, 写指针不能超过缓冲区分配的内存, 则将对缓冲区做扩容操作

具体指针操作, 入下图所示:

AbstractByteBuf属性和构造方法

在讲AbstractByteBuf之前, 我们首先先了解一下ByteBuf这个类, 这是所有ByteBuf的最顶层抽象, 里面定义了大量对ByteBuf操作的抽象方法供子类实现

AbstractByteBuf同样也缓冲区的抽象类, 定义了byteBuf的骨架操作, 比如参数校验, 自动扩容, 以及一些读写操作的指针移动, 但具体的实现, 不同的bytebuf实现起来是不同的, 这种情况则交给其子类实现

AbstractByteBuf继承了这个类, 并实现了其大部分的方法

首先看这个类的属性和构造方法

//读指针

int readerIndex;

//写指针

int writerIndex;

//保存读指针

private int markedReaderIndex;

//保存写指针

private int markedWriterIndex;

//最大分配容量

private int maxCapacity;

protected AbstractByteBuf(int maxCapacitqrAXIy) {

if (maxCapacity < 0) {

throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");

}

this.maxCapacity = maxCapacity;

}

我们可以看到在属性中定义了读写指针的成员标量, 和读写指针位置的保存

在构造方法中可以传入可分配的最大内存, 然后赋值到成员变量中

我们看几个最简单的方法

@Override

public int maxCapacity() {

return maxCapacity;

}

@Override

public int readerIndex() {

return readerIndex;

}

@Override

public int writerIndex() {

return writerIndex;

}

获取最大内存, 获取读写指针这些方法, 对所有的bytebuf都是通用的, 所以可以定义在AbstractByteBuf中

我们以一个writeBytes方法为例, 让同学们熟悉AbstractByteBuf中哪些部分自己实现, 哪些部分则交给了子类实现:

@Override

public ByteBuf writeBytes(ByteBuf src) {

writeBytes(src, src.readableBytes());

return this;

}

这个方法是将源的ByteBuf(参数)中的字节写入到自身ByteBuf中

首先这里调用了自身的writeBytes方法, 并传入参数ByteBuf本身, 以及Bytebuf的可读字节数, 我们跟到readbleBytes()方法中, 其实就是调用了自身的方法:

@Override

public int readableBytes() {

return writerIndex - readerIndex;

}

我们看到, 这里可读字节数就是返回了写指针到读指针之间的长度

我们再继续跟到writeBytes(src, src.readableBytes())中:

@Override

public ByteBuf writeBytes(ByteBuf src, int length) {

if (length > src.readableBytes()) {

throw new IndexOutOfBoundsException(String.format(

"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));

}

writeBytes(src, src.readerIndex(), length);

src.readerIndex(src.readerIndex() + length);

return this;

}

这里同样调用了自身的方法首先会对参数进行验证, 就是写入自身的长度不能超过源ByteBuf的可读字节数

这里又调用了一个wirte方法, 参数传入源Bytebuf, 其可读字节数, 写入的长度, 这里写入的长度我们知道就是源ByteBuf的可读字节数

我们再跟到writeBytes(src, src.readerIndex(), length);

public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {

ensureAccessible();

ensureWritable(length);

setBytes(writerIndex, src, srcIndex, length);

writerIndex += length;

return this;

}

我们重点关注第二个校验方法ensureWritable(length)

public ByteBuf ensureWritable(int minWritableBytes) {

if (minWritableBytes < 0) {

throw new IllegalArgumentException(String.format(

"minWritableBytes: %d (expected: >= 0)", minWritableBytes));

}

ensureWritable0(minWritableBytes);

return this;

}

然后我们再跟到ensureWritable0(minWritableBytes)方法中:

private void ensureWritable0(int minWritableBytes) {

if (minWritableBytes <= writableBytes()) {

return;

}

if (minWritableBytes > maxCapacity - writerIndex) {

throw new IndexOutOfBoundsException(String.format(

"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",

writerIndex, minWritableBytes, maxCapacity, this));

}

//自动扩容

int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

capacity(newCapacity);

}

开始做了两个参数校验, 第一个表示当前ByteBuf写入的长度如果要小于可写字节数, 则返回

第二个可以换种方式去看minWritableBytes+ writerIndex> maxCapacity 也就是需要写入的长度+写指针必须要小于最大分配的内存, 否则报错, 注意这里最大分配内存不带表当前内存, 而是byteBuf所能分配的最大内存

如果需要写入的长度超过了可写字节数, 并且需要写入的长度+写指针不超过最大内存, 则就开始了ByteBuf非常经典也非常重要的操作, 也就是自动扩容

int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

其中alloc()返回的是当前bytebuf返回的缓冲区分配器对象, 我们之后的小节会讲到, 这里调用了其calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity)方法为其扩容, 其中传入的参数writerIndex + minWritableBytes代表所需要的容量, maxCapacity为最大容量

我们跟到扩容的方法里面去

public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {

//合法性校验

if (minNewCapacity < 0) {

throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");

}

if (minNewCapacity > maxCapacity) {

throw new IllegalArgumentException(String.format(

"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",

minNewCapacity, maxCapacity));

}

//阈值为4mb

final int threshold = 1048576 * 4;

//最小需要扩容内存(总内存) == 阈值

if (minNewCapacity == threshold) {

//返回阈值

return threshold;

}

//最小扩容内存>阈值

if (minNewCapacity > threshold) {

//newCapacity为需要扩容内存

int newCapacity = minNewCapacity / threshold * threshold;

//目标容量+阈值>最大容量

if (newCapacity > maxCapacity - threshold) {

//将最大容量作为新容量

newCapacity = maxCapacity;

} else {

//否则, 目标容量+阈值

newCapacity += threshold;

}

return newCapacity;

}

//如果小于阈值

int newCapacity = 64;

//目标容量<需要扩容的容量

while (newCapacity < minNewCapacity) {

//倍增

newCapacity <<= 1;

}

//目标容量和最大容量http://返回一个最小的

return Math.min(newCapacity, maxCapacity);

}

扩容相关的逻辑注释也写的非常清楚, 如果小于阈值(4mb), 采用倍增的方式, 如果大于阈值(4mb), 采用平移4mb的方式

我们回到writeBytes(ByteBuf src, int srcIndex, int length):

public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {

ensureAccessible();

ensureWritable(length);

setBytes(writerIndex, src, srcIndex, length);

writerIndex += length;

return this;

}

再往下看setBytes(writerIndex, src, srcIndex, length), 这里的参数的意思是从当前byteBuf的writerIndex节点开始写入, 将源缓冲区src的读指针位置, 写lenght个字节, 这里的方法中AbstractByteBuf类并没有提供实现, 因为不同类型的BtyeBuf实现的方式是不一样的, 所以这里交给了子类去实现

最后将写指针后移length个字节

最后我们回到writeBytes(ByteBuf src, int length)方法中:

public ByteBuf writeBytes(ByteBuf src, int length) {

if (length > src.readableBytes()) {

throw new IndexOutOfBoundsException(String.format(

"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));

}

writeBytes(src, src.readerIndex(), length);

src.readerIndex(src.readerIndex() + length);

return this;

}

当writeBytes(src, src.readerIndex(), length)写完之后, 通过src.readerIndex(src.readerIndex() + length)将源缓冲区的读指针后移lenght个字节

以上对AbstractByteBuf的简单介绍和其中写操作的方法的简单剖析

以上就是Netty分布式ByteBuf使用的底层实现方式源码解析的详细内容,更多关于Netty分布式ByteBuf使用底层实现的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java9新特性中的模块化详解
下一篇:Netty分布式pipeline管道传播事件的逻辑总结分析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~