企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
![](https://cdn.zimug.com/wx-zimug.png) 在之前的文章中已经为大家介绍了java并发编程的工具:BlockingQueue接口、ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue,本文为系列文章第六篇。 本篇文章将为大家介绍并发编程集合类SynchronousQueue,它是BlockingQueue接口的实现类。与所有的BlockingQueue接口实现类不同的是:SynchronousQueue队列的容量永远是0(或者可以理解为容量为1的队列,但是说队列容量为1并不准确),这是因为SynchronousQueue实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间,它只是多个线程之间数据交换的媒介。 * `SynchronousQueue`队列的一个线程的插入动作总是会等待另一个线程的移除操作,反之亦然。 * `put()`方法放入元素对象到 SynchronousQueue不会返回结果(处于阻塞状态),直到另一个线程执行`take()`移除元素对象. * `peek()`方法在BlockingQueue接口实现类中可以获取元素,但不从队列中移除该元素。但在SynchronousQueue队列中该方法不可用。 ## SynchronousQueue 同步队列例子 下面我们写一个例子,来帮助我们理解`SynchronousQueue`的特性及用法。 **SynchronousQueueProducer.java** 启动线程每隔一秒向队列中放入一个Integer对象。 ~~~ public class SynchronousQueueProducer implements Runnable { protected BlockingQueue<Integer> blockingQueue; public SynchronousQueueProducer(BlockingQueue<Integer> queue) { this.blockingQueue = queue; } @Override public void run() { int i = 0; while (true) { System.out.println(Thread.currentThread().getName() + " Put: " + ++i); try { blockingQueue.put(i); Thread.sleep(1000); //每隔一秒生产一次 } catch (InterruptedException e) { e.printStackTrace(); } } } } ~~~ **SynchronousQueueConsumer.java**启动线程每5秒从队列中取出一个元素对象。 ~~~ public class SynchronousQueueConsumer implements Runnable { protected BlockingQueue<Integer> blockingQueue; public SynchronousQueueConsumer(BlockingQueue<Integer> queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { Integer data = blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " take(): " + data); Thread.sleep(5000); //每隔5秒消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } } ~~~ **SynchronousQueueExample.java** 新建一个SynchronousQueue同步队列,启动一个生产者线程插入对象,两个消费者线程移除对象。进行测试,看看效果。 ~~~ public class SynchronousQueueExample { public static void main(String[] args) { final BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(); SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue); new Thread(queueProducer).start(); SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue); new Thread(queueConsumer1).start(); SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue); new Thread(queueConsumer2).start(); } } ~~~ 分析一下实验的结果,Thread-0 是生产者线程,Thread-1 和 Thread-2是消费者线程。从实验结果我们可以看到SynchronousQueue必须是生产一次、消费一次、生产一次、消费一次,不论有多少个消费者和生产者线程都必须遵循这个规则。 ~~~ Thread-0 Put: 1 Thread-1 take(): 1 Thread-0 Put: 2 Thread-2 take(): 2 Thread-0 Put: 3 Thread-1 take(): 3 Thread-0 Put: 4 Thread-2 take(): 4 Thread-0 Put: 5 Thread-1 take(): 5 Thread-0 Put: 6 Thread-2 take(): 6 Thread-0 Put: 7 Thread-1 take(): 7 Thread-0 Put: 8 Thread-2 take(): 8 Thread-0 Put: 9 Thread-1 take(): 9 Thread-0 Put: 10 Thread-2 take(): 10 Thread-0 Put: 11 Thread-1 take(): 11 Thread-0 Put: 12 Thread-2 take(): 12 ~~~ 应用场景:如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是最处理效率最高的办法。Executors.newCachedThreadPool()底层就使用了SynchronousQueue,这个线程池根据需求创建新的线程。