BlockingQueue队列处理高并发下的日志

网友投稿 340 2022-08-22


BlockingQueue队列处理高并发下的日志

目录前言what阻塞队列?1.声明存储固定消息的队列2.消息入队3.消息出队被消费

前言

当系统流量负载比较高时,业务日志的写入操作也要纳入系统性能考量之内,如若处理不当,将影响系统的正常业务操作,之前写过一篇《spring boot通过MQ消费log4j2的日志》的博文,采用了RabbitMQ消息中间件来存储抗高并发下的日志,因为引入了中间件,操作使用起来可能没那么简便,今天分享使用多线程消费阻塞队列的方式来处理我们的海量日志

what阻塞队列?

阻塞队列(BlockingQueue)是区别于普通队列多了两个附加操作的线程安全的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

1.声明存储固定消息的队列

/**

* Created by kl on 2017/3/20.

* Content :销售操作日志队列

*/

public class SalesLogQueue{

//队列大小

public static final int QUEUE_MAX_SIZE = 1000;

private static SalesLogQueue alarmMessageQueue = new SalesLogQueue();

//阻塞队列

mwfKiNCprivate BlockingQueueblockingQueue = new LinkedBlockingQueue(QUEUE_MAX_SIZE);

private SalesLogQueue(){}

public static SalesLogQueue getInstance() {

return alarmMessageQueue;

}

/**

* 消息入队

* @param salesLog

* @return

*/

public boolean push(SalesLog salesLog) {

return this.blockingQueue.add(salesLog);//队列满了就抛出异常,不阻塞

}

/**

* 消息出队

* @return

*/

public SalesLog poll() {

SalesLog result = null;

try {

result = this.blockingQueue.take();

} catch (InterruptedException e) {

e.printStackTrace();

}

return result;

}

/**

* 获取队列大小

* @return

*/

public int size() {

return this.blockingQueue.size();

}

}

ps:因为业务原因,采用add的方式入队,队列满了就抛异常,不阻塞

2.消息入队

消息入队可以在任何需要保存日志的地方操作,如aop统一拦截日志处理,filter过滤请求日志处理,或者耦合的业务日志,记住,不阻塞入队操作,不然将影响正常的业务操作,如下为filter统一处理请求日志:

/**

* Created by kl on 2017/3/20.

* Content :访问请求拦截,保存操作日志

*/

public class SalesLogFilter implements Filter {

private RoleResourceService resourceService;

@Override

public void init(FilterConfig filterConfig) throws ServletException {

ServletContext context = filterConfig.getServletContext();

ApplicationContext ctx = WebApplicationContextUtils.getWebApplicationContext(context);

resourceService = ctx.getBean(RoleResourceService.class);

}

@Override

public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {

try {

HttpServletRequest request = (HttpServletRequest) servletRequest;

String requestUrl = request.getRequestURI();

String requestType=request.getMethod();

String ipAddress = HttpClientUtil.getIpAddr(request);

Map resource=resourceService.getResource();

String context=resource.get(requestUrl);

//动态url正则匹配

if(StringUtil.isNull(context)){

for(Map.Entry entry:resource.entrySet()){

String resourceUrl= entry.getKey();

if(requestUrl.matches(resourceUrl)){

context=entry.getValue();

break;

}

}

}

SamwfKiNClesLog log=new SalesLog();

log.setCreateDate(new Timestamp(System.currentTimeMillis()));

log.setContext(context);

log.setOperateUser(UserTokenUtil.currentUser.get().get("realname"));

log.setRequestIp(ipAddress);

log.setRequestUrl(requestUrl);

log.setRequestType(requestType);

SalesLogQueue.getInstance().push(log);

}catch (Exception e){

e.printStackTrace();

}

filterChain.doFilter(servletRequest, servletResponse);

}

@Override

public void destroy() {

}

}

3.消息出队被消费

BlockingQueue是线程安全的,所以可以放心的在多个线程中去处理队列中的消息,如下代码声明了一个两个大小的固定线程池,并添加了两个线程去处理队列中的消息

/**

* Created by kl on 2017/3/20.

* Content :启动消费操作日志队列的线程

*/

@Component

public class ConsumeSalesLogQueue {

@Autowired

SalesLogService salesLogService;

@PostConstruct

public void startrtThread() {

ExecutorService e = Executors.newFixedThreadPool(2);//两个大小的固定线程池

e.submit(new PollSalesLog(salesLogService));

e.submit(new PollSalesLog(salesLogService));

}

class PollSalesLog implements Runnable {

SalesLogService salesLogService;

public PollSalesLog(SalesLogService salesLogService) {

this.salesLogService = salesLogService;

}

@Override

public void run() {

while (true) {

try {

SalesLog salesLog = SalesLogQueue.getInstance().poll();

if(salesLog!=null){

salesLogService.saveSalesLog(salesLog);

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

}

参考博文如下,对BlockingQueue队列更多了解,可读一读如下的博文:

详细分析java并发集合ArrayBlockingQueue的用法

详解Java阻塞队列(BlockingQueue)的实现原理

Java并发之BlockingQueue的使用

以上就是BlockingQueue队列处理高并发下的日志的详细内容,更多关于BlockingQueue队列处理高并发日志的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python中的循环结构(python里的循环结构)
下一篇:PySide6开发环境配置(使用PyCharm)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~