## **并发队列**
* 非阻塞式队列:ConcurrentLinkedQueue
* 阻塞式队列:BlockingQueue
## **非阻塞式队列**
ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,它是一个基于链接节点的**无界**线程安全队列。该队列的元素遵循**先进先出**的原则,该队列不允许null元素
```
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();
queue.offer("A");
queue.offer("B");
queue.offer("C");
//从头获取元素,删除该元素
System.out.println(queue.poll());
//从头获取元素,不刪除该元素
System.out.println(queue.peek());
//获取总长度
System.out.println(queue.size());
```
## **阻塞式队列**
BlockingQueue:队列容器满时生产者线程阻塞等待、队列容器空时消费者线程阻塞等待
常用的实现类
* ArrayBlockingQueue:有边界的阻塞队列,需初始化容量大小,内部实现是一个数组
```
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
```
* LinkedBlockingQueue:初始化容量则为有边界阻塞队列、否则就是无边界队列(默认Integer.MAX\_VALUE)的容量,内部实现是一个链表
```
LinkedBlockingQueue queue = new LinkedBlockingQueue(3);
```
* SynchronousQueue:队列内部仅允许容纳一个元素
## **BlockingQueue模拟生产者与消费者**
1. 生产者
```
class Producer implements Runnable {
private BlockingQueue<String> blockingQueue;
private AtomicInteger count = new AtomicInteger();
private volatile boolean flag = true;
public Producer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (flag) {
String data = count.incrementAndGet() + "";
try {
boolean success = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (success) {
System.out.println("生产成功 data:" + data);
} else {
System.out.println("生产失败 data:" + data);
}
Thread.sleep(1000);
} catch (Exception e) {
}
}
}
public void stop() {
this.flag = false;
}
}
```
2.消费者
```
class Consumer implements Runnable {
private volatile boolean flag = true;
private BlockingQueue<String> blockingQueue;
public Consumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (flag) {
try {
String data = blockingQueue.poll(2, TimeUnit.SECONDS);
if (data == null) {
flag= false;
System.out.println("消费者超过2秒时间未获取到消息.");
return;
}
System.out.println("消费成功 data:" + data);
} catch (Exception e) {
}
}
}
}
```
3.测试
```
class test {
public static void main(String[] args) {
LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
Thread p = new Thread(producer);
Thread c = new Thread(consumer);
p.start();
c.start();
try {
Thread.sleep(1000 * 5);
producer.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
运行结果
```
生产成功 data:1
消费成功 data:1
生产成功 data:2
消费成功 data:2
生产成功 data:3
消费成功 data:3
生产成功 data:4
消费成功 data:4
生产成功 data:5
消费成功 data:5
消费者超过2秒时间未获取到消息.
```