Solving an Out-of-Memory Error in Spring Cloud Gateway

Reader submission, 2022-10-11



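Notes on tracking down an out-of-memory error in Spring Cloud Gateway.

Environment:

org.springframework.boot: 2.1.4.RELEASE

org.springframework.cloud: Greenwich.SR1

Incident log:

The gateway was losing the RequestBody in some cases, so I adopted the workaround commonly recommended online: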

@Bean
public RouteLocator tpauditRoutes(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("gateway-post", r -> r.order(1)
            .method(HttpMethod.POST)
            .and()
            // The key part: readBody reads and caches the request body
            .readBody(String.class, requestBody -> true)
            .and()
            .path("/gateway/**")
            .filters(f -> f.stripPrefix(1))
            .uri("lb://APP-API"))
        .build();
}

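With the gateway features implemented, I started load testing Spring Cloud Gateway in the test environment.

I used the usual stepped load test, with peak concurrency set to 400 users. Two rounds of roughly 10 minutes each completed without any errors.

Over the lunch break, I set up a one-hour run.

When I came back, the system had reported the following exception: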

2019-08-12 15:06:07,296 1092208 [reactor-http-server-epoll-12] WARN io.netty.channel.AbstractChannelHandlerContext.warn:146 - An exception '{}' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:

io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 503316487, max: 504889344)

at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)

at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)

at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)

at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)

at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)

at io.netty.buffer.PoolArena.allocate(PoolArena.java:214)

at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)

at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)

at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)

at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)

at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)

at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)

at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:72)

at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:793)

at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:382)

at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)

at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)

at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:315)

at io.

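I was baffled at first. I immediately started monitoring the JVM heap; then, to make the problem reproduce sooner, I reduced the JVM's memory, raised the concurrency, restarted the service, and ran the load test again.

The original startup command was:

java -jar -Xmx1024M /opt/deploy/gateway-appapi/cloud-employ-gateway-0.0.5-SNAPSHOT.jar

which I changed to:

java -jar -Xmx512M /opt/deploy/gateway-appapi/cloud-employ-gateway-0.0.5-SNAPSHOT.jar

With half the heap, I waited for the problem to recur. It did so after about 3 minutes, and at the same moment the JVM performed a Full GC.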

EC        EU        OC        OU       MC       MU       CCSC    CCSU    YGC  YGCT    FGC  FGCT
275456.0  100103.0  484864.0  50280.2  67672.0  64001.3  9088.0  8463.2  501  11.945  3    0.262
275968.0  25072.3   484864.0  47329.3  67672.0  63959.4  9088.0  8448.8  502  11.970  4    0.429

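Sure enough, a Full GC ran when the problem occurred (FGC went from 3 to 4), but OU, the old-generation usage, was nowhere near a level that would trigger one.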
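The jstat samples can be checked with a little arithmetic. The sketch below divides OU (old-generation used, in KB) by OC (old-generation capacity, in KB) for both samples; the class and method names are illustrative and not part of any tool.

```java
import java.util.Locale;

final class JstatOldGenCheck {
    /** Old-generation occupancy in percent, from jstat's OU (used KB) and OC (capacity KB). */
    static double oldGenPercent(double ouKb, double ocKb) {
        return 100.0 * ouKb / ocKb;
    }

    public static void main(String[] args) {
        // OU and OC values copied from the two jstat samples in this article.
        double before = oldGenPercent(50280.2, 484864.0); // sample with FGC = 3
        double after = oldGenPercent(47329.3, 484864.0);  // sample with FGC = 4
        System.out.printf(Locale.ROOT, "old gen before: %.1f%%, after: %.1f%%%n", before, after);
    }
}
```

Both samples come out at roughly a tenth of the old-generation capacity, which is why heap pressure does not explain the Full GC.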

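The words "direct memory" in the log pointed to the JVM's off-heap (direct) memory.

-XX:MaxDirectMemorySize sets the limit on the JVM's direct memory. When direct ByteBuffer allocations reach that limit, the JVM triggers a Full GC to try to reclaim unreferenced buffers.

If the option is not set, HotSpot on Java 8 does not use a fixed 64 MB; the limit defaults to roughly the maximum heap size (the value of Runtime.maxMemory()), and the effective value can be read from sun.misc.VM.maxDirectMemory().

Taken together, the evidence indicated that direct memory was leaking.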
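To see that direct memory is separate from the heap columns reported by jstat, here is a minimal sketch using only the JDK. It reads the "direct" buffer pool through BufferPoolMXBean before and after a ByteBuffer.allocateDirect call. The class name DirectMemoryProbe is made up for illustration. Note also that the stack trace above shows Netty keeping its own counter (PlatformDependent.incrementMemoryCounter), so this MXBean may not reflect Netty's pooled allocations.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

final class DirectMemoryProbe {
    /** Bytes currently held by NIO direct buffers, or -1 if no "direct" pool is reported. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directBytesUsed();
        // Direct buffers are allocated outside the Java heap.
        ByteBuffer buffer = ByteBuffer.allocateDirect(8 * 1024 * 1024);
        long after = directBytesUsed();
        System.out.println("direct bytes before: " + before);
        System.out.println("direct bytes after:  " + after);
        System.out.println("buffer capacity:     " + buffer.capacity());
        System.out.println("max heap (bytes):    " + Runtime.getRuntime().maxMemory());
    }
}
```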

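The error was raised inside Netty, so I added the following options to enable Netty's leak-detection logging:

-Dio.netty.leakDetection.targetRecords=40 # upper limit on recorded access records

-Dio.netty.leakDetection.level=advanced # leak detection level

The service started without any problems, but once the load test began it reported the following error: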

2019-08-13 14:59:01,656 18047 [reactor-http-nio-7] ERROR io.netty.util.ResourceLeakDetector.reportTracedLeak:317 - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.

Recent access records:

#1:

org.springframework.core.io.buffer.NettyDataBuffer.release(NettyDataBuffer.java:301)

org.springframework.core.io.buffer.DataBufferUtils.release(DataBufferUtils.java:420)

org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:208)

org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:59)

org.springframework.core.codec.AbstractDataBufferDecoder.lambda$decodeToMono$1(AbstractDataBufferDecoder.java:68)

reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)

reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)

reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)

reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)

reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)

reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)

reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:101)

reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onSubscribe(MonoCollectList.java:90)

reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70)

reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)

reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)

reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:44)

reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:56)

reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47)

reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)

reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)

reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)

reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)

reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)

reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)

reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)

reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)

reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)

reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)

reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)

reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)

reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)

reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)

reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)

reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)

reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)

reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)

reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453)

reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:191)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)

io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)

io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)

io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)

io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)

io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)

io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)

io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)

io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)

io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)

java.lang.Thread.run(Unknown Source)

#2:

io.netty.buffer.AdvancedLeakAwareByteBuf.nioBuffer(AdvancedLeakAwareByteBuf.java:712)

org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:266)

org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:207)

org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:59)

org.springframework.core.codec.AbstractDataBufferDecoder.lambda$decodeToMono$1(AbstractDataBufferDecoder.java:68)

reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)

reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)

reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)

reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)

reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)

reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)

reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:101)

reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onSubscribe(MonoCollectList.java:90)

reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70)

reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)

reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)

reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:44)

reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:56)

reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47)

reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)

reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)

reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)

reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)

reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)

reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)

reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)

reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)

reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)

reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)

reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)

reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)

reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)

reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)

reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)

reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)

reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)

reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)

reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453)

reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:191)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)

io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)

io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)

io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)

io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)

io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)

io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)

io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)

io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)

io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)

java.lang.Thread.run(Unknown Source)

#3:

io.netty.buffer.AdvancedLeakAwareByteBuf.slice(AdvancedLeakAwareByteBuf.java:82)

org.springframework.core.io.buffer.NettyDataBuffer.slice(NettyDataBuffer.java:260)

org.springframework.core.io.buffer.NettyDataBuffer.slice(NettyDataBuffer.java:42)

org.springframework.cloud.gateway.handler.predicate.ReadBodyPredicateFactory.lambda$null$0(ReadBodyPredicateFactory.java:102)

reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46)

reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)

reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:44)

reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:56)

reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47)

reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)

reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)

reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)

reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)

reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)

reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)

reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)

reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)

reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)

reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)

reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)

reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)

reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)

reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)

reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)

reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)

reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)

reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)

reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453)

reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:191)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)

io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)

io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)

io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)

io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)

io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)

io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)

io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)

io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)

io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)

io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)

io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)

java.lang.Thread.run(Unknown Source)

In record #3 I spotted a familiar class, ReadBodyPredicateFactory.java. Remember the readBody configuration from the beginning?

This is the class that writes cachedRequestBodyObject.

Let's trace the readBody source:

/**
 * This predicate is BETA and may be subject to change in a future release. A
 * predicate that checks the contents of the request body
 * @param inClass the class to parse the body to
 * @param predicate a predicate to check the contents of the body
 * @param <T> the type the body is parsed to
 * @return a {@link BooleanSpec} to be used to add logical operators
 */
public <T> BooleanSpec readBody(Class<T> inClass, Predicate<T> predicate) {
    return asyncPredicate(getBean(ReadBodyPredicateFactory.class)
            .applyAsync(c -> c.setPredicate(inClass, predicate)));
}

The asynchronously invoked ReadBodyPredicateFactory.applyAsync() is the same method that this frame in the error log points to:

org.springframework.cloud.gateway.handler.predicate.ReadBodyPredicateFactory.lambda$null$0(ReadBodyPredicateFactory.java:102)

Line 102 of the source reads:

Flux<DataBuffer> cachedFlux = Flux.defer(() ->
        Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount()))
);

Here Spring Cloud Gateway uses dataBuffer.slice to create a new DataBuffer view, but according to Netty's leak detector, the underlying dataBuffer is never released.
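Why an unreleased buffer leaks can be shown with a toy model of reference counting. The sketch below is not Netty's or Spring's implementation. CountedBuffer and simulate are hypothetical names, and the model assumes that a slice shares its parent's count and that only one reader (the decoder seen in record #1) ever calls release(). The method names refCnt(), retain() and release() mirror Netty's ReferenceCounted interface.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a reference-counted buffer (hypothetical, for illustration only). */
final class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // a new buffer starts at 1

    int refCnt() {
        return refCnt.get();
    }

    CountedBuffer retain() {
        refCnt.incrementAndGet();
        return this;
    }

    /** Returns true when the count reaches zero, i.e. the memory could be freed. */
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released too many times");
        }
        return remaining == 0;
    }

    /** Simulates one request; 'balanced' adds a second release() to match the retain(). */
    static int simulate(boolean balanced) {
        CountedBuffer body = new CountedBuffer(); // joined request body, count = 1
        body.retain();                            // kept for a second read, count = 2
        body.release();                           // the decoder releases its slice, count = 1
        if (balanced) {
            body.release();                       // matching release, count = 0
        }
        return body.refCnt();
    }

    public static void main(String[] args) {
        System.out.println("unbalanced: " + simulate(false)); // 1, never reaches zero
        System.out.println("balanced:   " + simulate(true));  // 0, memory can be freed
    }
}
```

In the unbalanced case the count stays above zero on every request, so under sustained load the unreleased direct memory accumulates until allocation fails.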

The error is shown below; with so much log output, it is easy to overlook.

ERROR io.netty.util.ResourceLeakDetector.reportTracedLeak:317 - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
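Because the LEAK line and the one useful frame are buried in long traces, a small filter can help when scanning a log. This is only a sketch: LeakReportFilter is a made-up helper, and the package prefix is whatever part of your own stack you want to surface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LeakReportFilter {
    /** Keeps LEAK header lines and frames whose class name starts with the given package prefix. */
    static List<String> interestingLines(List<String> logLines, String packagePrefix) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            String trimmed = line.trim();
            if (trimmed.contains("LEAK:") || trimmed.startsWith(packagePrefix)) {
                hits.add(trimmed);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // A few lines taken from the leak report in this article.
        List<String> sample = Arrays.asList(
            "2019-08-13 14:59:01,656 18047 [reactor-http-nio-7] ERROR io.netty.util.ResourceLeakDetector.reportTracedLeak:317 - LEAK: ByteBuf.release() was not called before it's garbage-collected.",
            "io.netty.buffer.AdvancedLeakAwareByteBuf.slice(AdvancedLeakAwareByteBuf.java:82)",
            "org.springframework.cloud.gateway.handler.predicate.ReadBodyPredicateFactory.lambda$null$0(ReadBodyPredicateFactory.java:102)",
            "reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46)");
        for (String hit : interestingLines(sample, "org.springframework.cloud.gateway")) {
            System.out.println(hit);
        }
    }
}
```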

Having found the problem, the next step was to fix it, so I tried patching the source:

@Override
@SuppressWarnings("unchecked")
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
    return exchange -> {
        Class inClass = config.getInClass();
        Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
        Mono<?> modifiedBody;
        // We can only read the body from the request once, once that happens if we
        // try to read the body again an exception will be thrown. The below if/else
        // caches the body object as a request attribute in the ServerWebExchange
        // so if this filter is run more than once (due to more than one route
        // using it) we do not try to read the request body multiple times
        if (cachedBody != null) {
            try {
                boolean test = config.predicate.test(cachedBody);
                exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                return Mono.just(test);
            } catch (ClassCastException e) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Predicate test failed because class in predicate "
                            + "does not match the cached body object", e);
                }
            }
            return Mono.just(false);
        } else {
            // Join all the DataBuffers so we have a single DataBuffer for the body
            return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
                // Update the retain counts so we can read the body twice,
                // once to parse into an object that we can test the predicate
                // against and a second time when the HTTP client sends the
                // request downstream
                // Note: if we end up reading the body twice we will run into
                // a problem, but as of right now there is no good use case
                // for doing this
                DataBufferUtils.retain(dataBuffer);
                // Make a slice for each read so each read has its own
                // read/write indexes
                Flux<DataBuffer> cachedFlux = Flux
                        .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
                // Added code: release the dataBuffer
                DataBufferUtils.release(dataBuffer);
                return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                        .bodyToMono(inClass).doOnNext(objectValue -> {
                            exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                            exchange.getAttributes().put(CACHED_REQUEST_BODY_KEY, cachedFlux);
                        }).map(objectValue -> config.predicate.test(objectValue));
            });
        }
    };
}

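In this setup the Spring Cloud Gateway version is 2.1.1. After the change above, I restarted the service and repeated the test; the problem did not recur and the gateway ran normally.

Alternatively, you can upgrade Spring Cloud Gateway: in the official 2.1.2 release this code was refactored, and testing after the upgrade was also completely normal.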

