造轮子之MemorySafeLinkedBlockingQueue
造輪子之MemorySafeLinkedBlockingQueue-LinkBlockingQueue改進
LinkBlockingQueue改進
問題背景
https://github.com/apache/dubbo/pull/9722/files
使用線程池的同學對于標題中的隊列想必都有過使用
,但上述隊列使用不當時則會造成程序OOM
,那怎么來控制呢?
使用ArrayBlockingQueue?如何來評估長度?
是否有一個完美的解決方案呢,MemorySafeLinkedBlockingQueue則通過對內存的限制判斷盡面控制隊列的容量,完成解決了可能存在的OOM問題 。
獲取內存大?。ㄗ?:單位大B;支持準實時更新):
Runtime.getRuntime().freeMemory()//JVM中已經申請到的堆內存中還未使用的大小Runtime.getRuntime().maxMemory()// JVM可從操作系統(tǒng)申請到的最大內存值 -XxmRuntime.getRuntime().totalMemory()// JVM已從操作系統(tǒng)申請到的內存大小 —Xxs可設置該值大小-初始堆的大小線程池在excute任務時,放隊列,放不進去,使用新線程運行任務。這個放不進行 ,是使用的offer??非阻塞方法嗎 ?
參考 :https://blog.csdn.net/weixin_43108539/article/details/125190023
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //拿到32位的int int c = ctl.get(); //工作線程數(shù)<核心線程數(shù) if (workerCountOf(c) < corePoolSize) { //進入if,代表可以創(chuàng)建 核心 線程數(shù) if (addWorker(command, true)) return; //如果沒進入if,代表創(chuàng)建核心線程數(shù)失敗,重新獲取 ctl c = ctl.get(); } //判斷線程池為Running狀態(tài)
,將任務添加入阻塞隊列
,使用offer if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次判斷是否為Running狀態(tài)
,若不是Running狀態(tài),remove任務 if (! isRunning(recheck) && remove(command)) reject(command); //如果線程池在Running狀態(tài) ,線程池數(shù)量為0 else if (workerCountOf(recheck) == 0) //阻塞隊列有任務
,但是沒有工作線程
,添加一個任務為空的工作線程處理阻塞隊列中的任務 addWorker(null, false); } //阻塞隊列已滿
,創(chuàng)建非核心線程 ,拒絕策略-addWorker中有判斷核心線程數(shù)是否超過最大線程數(shù) else if (!addWorker(command, false)) reject(command); }空閑內存計算
package com.zte.sdn.oscp.queue;import cn.hutool.core.thread.NamedThreadFactory;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;public class MemoryLimitCalculator { private static volatile long maxAvailable; private static final AtomicBoolean refreshStarted = new AtomicBoolean(false); private static void refresh() { maxAvailable = Runtime.getRuntime().freeMemory(); } private static void checkAndScheduleRefresh() { if (!refreshStarted.get()) { // immediately refresh when first call to prevent maxAvailable from being 0 // to ensure that being refreshed before refreshStarted being set as true // notice: refresh may be called for more than once because there is no lock refresh(); if (refreshStarted.compareAndSet(false, true)) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator")); // check every 50 ms to improve performance scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS); Runtime.getRuntime().addShutdownHook(new Thread(() ->{ refreshStarted.set(false); scheduledExecutorService.shutdown(); })); } } } /** * Get the maximum available memory of the current JVM. * * @return maximum available memory */ public static long maxAvailable() { checkAndScheduleRefresh(); return maxAvailable; } /** * Take the current JVM's maximum available memory * as a percentage of the result as the limit. * * @param percentage percentage * @return available memory */ public static long calculate(final float percentage) { if (percentage <= 0 || percentage >1) { throw new IllegalArgumentException(); } checkAndScheduleRefresh(); return (long) (maxAvailable() * percentage); } /** * By default, it takes 80% of the maximum available memory of the current JVM. * * @return available memory */ public static long defaultLimit() { checkAndScheduleRefresh(); return (long) (maxAvailable() * 0.8); }}內存安全隊列
package com.zte.sdn.oscp.queue;import java.util.Collection;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class MemorySafeLinkedBlockingQueueextends LinkedBlockingQueue{ private static final long serialVersionUID = 8032578371739960142L; public static int THE_256_MB = 256 * 1024 * 1024; private int maxFreeMemory; private Rejectorrejector; public MemorySafeLinkedBlockingQueue() { this(THE_256_MB); } public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) { super(Integer.MAX_VALUE); this.maxFreeMemory = maxFreeMemory; //default as DiscardPolicy to ensure compatibility with the old version this.rejector = new DiscardPolicy<>(); } public MemorySafeLinkedBlockingQueue(final Collection<? extends E>c, final int maxFreeMemory) { super(c); this.maxFreeMemory = maxFreeMemory; //default as DiscardPolicy to ensure compatibility with the old version this.rejector = new DiscardPolicy<>(); } /** * set the max free memory. * * @param maxFreeMemory the max free memory */ public void setMaxFreeMemory(final int maxFreeMemory) { this.maxFreeMemory = maxFreeMemory; } /** * get the max free memory. * * @return the max free memory limit */ public int getMaxFreeMemory() { return maxFreeMemory; } /** * set the rejector. * * @param rejector the rejector */ public void setRejector(final Rejectorrejector) { this.rejector = rejector; } /** * determine if there is any remaining free memory. * * @return true if has free memory */ public boolean hasRemainedMemory() { return MemoryLimitCalculator.maxAvailable() >maxFreeMemory; } @Override public void put(final E e) throws InterruptedException { if (hasRemainedMemory()) { super.put(e); } else { rejector.reject(e, this); } } @Override public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { if (!hasRemainedMemory()) { rejector.reject(e, this); return false; } return super.offer(e, timeout, unit); } @Override public boolean offer(final E e) { if (!hasRemainedMemory()) { rejector.reject(e, this); return false; } return super.offer(e); }} 拒絕策略
注意其中的rejector是拒絕策略 ,默認的DiscardPolicy什么也不處理;
而DiscardOldPolicy的處理邏輯很簡單
public class DiscardOldestPolicyimplements Rejector{ @Override public void reject(final E e, final Queuequeue) { queue.poll(); queue.offer(e); }} AbortPolicy則直接拋出異常
public class AbortPolicyimplements Rejector{ @Override public void reject(final E e, final Queuequeue) { throw new RejectException("no more memory can be used !"); }} 個人建議增加日志打印即可。
展開閱讀全文投稿時間:2022-09-09 最后更新:2022-09-09
.jpg)
標簽:氣流干燥設備
上一篇:世博國際在線游戲(達州)有限公司
下一篇:歐寶綜合(許昌)有限公司