Java 实现限流器处理Rest接口请求详解流程

网友投稿 296 2022-09-18


Java 实现限流器处理Rest接口请求详解流程

Maven依赖

com.google.guava

guava

31.0.1-jre

代码

上代码,不废话。

首先是限流器代码。

package com.huyi.csdn.tools.rate;

import com.google.common.util.concurrent.Monitor;

import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentLinkedQueue;

import java.util.function.Consumer;

/**

* @Program: csdn @ClassName: RequestRateLimiter @Author: huyi @Date: 2021-10-30 22:16 @Description:

* 请求限流器 @Version: V1.0

*/

public class RequestRateLimiter {

// 请求队列

private final ConcurrentLinkedQueue bucket = new ConcurrentLinkedQueue<>();

// 队列上限

private static final int BUCKET_CAPACITY = 100;

// 漏桶下沿水流速度

private final RateLimiter rateLimiter = RateLimiter.create(10.0D);

// 请求监视器

private final Monitor requestMoniter = new Monitor();

// 处理监视器

private final Monitor handleMoniter = new Monitor();

/** 请求实体 */

public static class Request {

private int data;

public Request(int data) {

this.data = data;

}

@Override

public String toString() {

return "Request{" + "data=" + data + '}';

}

}

public void submitRequest(int data) {

this.submitRequest(new Request(data));

}

public void submitRequest(Request request) {

// 请求监视器,创建监视向导,队列数据量小于上限

if (requestMoniter.enterIf(requestMoniter.newGuard(() -> bucket.size() < BUCKET_CAPACITY))) {

try {

boolean result = bucket.offer(request);

if (result) {

System.out.println("成功向队列加入新的请求!" + Thread.currentThread() + " request:" + request);

} else {

System.out.println("加入新请求失败!");

}

} finally {

requestMoniter.leave();

}

} else {

// 队列已满

// System.out.println("请求队列已经上限,请稍后重试!");

}

}

// 处理请求方法

public void handleRequest(Consumer consumer) {

if (handleMoniter.enterIf(handleMoniter.newGuard(() -> !bucket.isEmpty()))) {

try {

// 匀速处理

rateLimiter.acquire();

consumer.accept(bucket.poll());

} finally {

handleMoniter.leave();

}

}

}

}

代码说明

1、有个长度100的任务队列,增加了监视器。

2、添加了限流器,限流为10。

验证代码

package com.huyi.csdn.tools.rate;

import java.time.LocalTime;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.stream.IntStream;

/**

* @Program: csdn @ClassName: TestRateLimiter @Author: huyi @Date: 2021-10-30

* 22:35 @Description: @Version: V1.0

*/

public class TestRateLimiter {

ppphWrPrivate static final AtomicInteger DATA = new AtomicInteger(0);

private static final RequestRateLimiter HANDLE = new RequestRateLimiter();

public static void main(String[] args) {

IntStream.range(0, 10)

.forEach(

(x) ->

new Thread(

() -> {

while (true) {

HANDLE.submitRequest(DATA.getAndIncrement());

try {

TimeUnit.MILLISECONDS.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

})

.start());

IntStream.range(0, 20)

.forEach(

(x) ->

new Thread(

() -> {

while (true) {

HANDLE.handleRequest(

y ->

System.out.println(

LocalTime.now() + ":处理数据 -> " + y.toString()));

}

})

.start());

}

}

验证执行结果

总结

胸中一点浩然气,天地千里快哉风。

如果本文对你有用的,请不要吝啬你的赞,谢谢支持。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:ACL访问控制列表(标准acl、扩展acl)(扩展acl配置命令实例)
下一篇:Cisco使用MSTP+VRRP+静态路由+子网划分+DHCP实验案例
相关文章

 发表评论

暂时没有评论,来抢沙发吧~