多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# 使用`ThreadPoolExecutor`和`Semaphore`限制任务提交率 > 原文: [https://howtodoinjava.com/java/multi-threading/throttling-task-submission-rate-using-threadpoolexecutor-and-semaphore/](https://howtodoinjava.com/java/multi-threading/throttling-task-submission-rate-using-threadpoolexecutor-and-semaphore/) 如果您知道在 Web 服务器中,则可以配置到服务器的最大并发连接数。 如果有更多连接超出此限制,则它们必须等待直到释放或关闭某些其他连接。 此限制可以视为节流。 节流是为输出速率比输入速率慢的系统调节输入速率的能力。 必须停止系统崩溃或资源耗尽。 在与`BlockingQueue`和`ThreadPoolExecutor`相关的上一篇文章中,我们了解了如何创建具有以下能力的`CustomThreadPoolExecutor`: 1)提交到阻塞队列 的任务,2)一个执行器,从队列中拾取任务并执行它们,3)已在`ExecuteGate`之后覆盖了`Execute()`方法以执行一些必要的额外活动,4)附加了一个`RejectedExecutionHandler`,用于处理由于队列已满而被拒绝的任务 我们的方法已经足够好,并且能够处理大多数实际情况。 现在,我们再添加一个概念,在某些情况下可能会证明是有益的。 这个概念是围绕队列中任务提交的限制。 在此示例中,节流将有助于使队列中的任务数保持在限制范围内,从而使任何任务都不会被拒绝。 它从本质上也消除了`RejectedExecutionHandler`的必要性。 ## 使用`CustomThreadPoolExecutor`和`RejectedExecutionHandler`的先前解决方案 在此解决方案中,我们有以下类: **`DemoTask.java`** ```java public class DemoTask implements Runnable { private String name = null; public DemoTask(String name) { this.name = name; } public String getName() { return this.name; } @Override public void run(){ try { Thread.sleep(1000); } catch (InterruptedException e){ e.printStackTrace(); } System.out.println("Executing : " + name); } } ``` **`CustomThreadPoolExecutor.java`** ```java import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CustomThreadPoolExecutor extends ThreadPoolExecutor { public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { t.printStackTrace(); } } } ``` **`DemoExecutor.java`** ```java import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DemoExecutor { public static void main(String[] args) { Integer threadCounter = 0; BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50); CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue); executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Lets add another time : " + ((DemoTask) r).getName()); executor.execute(r); } }); // Let start all core threads initially executor.prestartAllCoreThreads(); while (true) { threadCounter++; // Adding threads one by one //System.out.println("Adding DemoTask : " + threadCounter); executor.execute(new DemoTask(threadCounter.toString())); if (threadCounter == 1000) break; } } } ``` 如果运行上述程序,则将获得**输出**,如下所示: ```java DemoTask Rejected : 71 Executing : 3 Executing : 5 ... ... ``` 将出现多次“`DemoTask Rejected`”。 在下一个解决方案中,我们将使用节流技术,以使任何任务都不会被拒绝。 ## 使用`ThreadPoolExecutor`和`Semaphore`限制任务的提交率 在此解决方案中,我们将创建一个[`Semaphore`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html "Semaphore"),其编号必须等于在任何给定时间点阻塞队列中的最大任务数。 因此该方法如下所示: 1)在执行任务之前,要求锁定信号量 2)如果获取了锁定,则执行正常。 否则,将重试直到获得锁 3)任务完成后; 锁被释放到信号量 我们启用节流的新`BlockingThreadPoolExecutor`如下所示: ```java package threadpoolDemo; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final Semaphore semaphore; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); semaphore = new Semaphore(corePoolSize + 50); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); } @Override public void execute(final Runnable task) { boolean acquired = false; do { try { semaphore.acquire(); acquired = true; } catch (final InterruptedException e) { //LOGGER.warn("InterruptedException whilst aquiring semaphore", e); } } while (!acquired); try { super.execute(task); } catch (final RejectedExecutionException e) { System.out.println("Task Rejected"); semaphore.release(); throw e; } } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t != null) { t.printStackTrace(); } semaphore.release(); } } ``` 现在,如下测试代码。 ```java package threadpoolDemo; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DemoExecutor { public static void main(String[] args) { Integer threadCounter = 0; BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50); BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue); executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Lets add another time : " + ((DemoTask) r).getName()); executor.execute(r); } }); // Let start all core threads initially executor.prestartAllCoreThreads(); while (true) { threadCounter++; // Adding threads one by one System.out.println("Adding DemoTask : " + threadCounter); executor.execute(new DemoTask(threadCounter.toString())); if (threadCounter == 1000) break; } } } ``` 当使用`BlockingThreadPoolExecutor`代替`CustomThreadPoolExecutor`运行`DemoExecutor`程序时,您不会看到任何任务被拒绝,并且所有任务都将成功执行。 您可以控制在任何时候通过`Semaphore`构造函数传递参数的任务数量。 这就是这篇文章的全部内容。 您应该阅读有关[**并发**](//howtodoinjava.com/category/java/multi-threading/ "multi-threading")的更多信息,以提高信心。 学习愉快!