ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## **并发队列** * 非阻塞式队列:ConcurrentLinkedQueue * 阻塞式队列:BlockingQueue ## **非阻塞式队列** ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,它是一个基于链接节点的**无界**线程安全队列。该队列的元素遵循**先进先出**的原则,该队列不允许null元素 ``` ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); queue.offer("A"); queue.offer("B"); queue.offer("C"); //从头获取元素,删除该元素 System.out.println(queue.poll()); //从头获取元素,不刪除该元素 System.out.println(queue.peek()); //获取总长度 System.out.println(queue.size()); ``` ## **阻塞式队列** BlockingQueue:队列容器满时生产者线程阻塞等待、队列容器空时消费者线程阻塞等待 常用的实现类 * ArrayBlockingQueue:有边界的阻塞队列,需初始化容量大小,内部实现是一个数组 ``` ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3); ``` * LinkedBlockingQueue:初始化容量则为有边界阻塞队列、否则就是无边界队列(默认Integer.MAX\_VALUE)的容量,内部实现是一个链表 ``` LinkedBlockingQueue queue = new LinkedBlockingQueue(3); ``` * SynchronousQueue:队列内部仅允许容纳一个元素 ## **BlockingQueue模拟生产者与消费者** 1. 生产者 ``` class Producer implements Runnable { private BlockingQueue<String> blockingQueue; private AtomicInteger count = new AtomicInteger(); private volatile boolean flag = true; public Producer(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (flag) { String data = count.incrementAndGet() + ""; try { boolean success = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (success) { System.out.println("生产成功 data:" + data); } else { System.out.println("生产失败 data:" + data); } Thread.sleep(1000); } catch (Exception e) { } } } public void stop() { this.flag = false; } } ``` 2.消费者 ``` class Consumer implements Runnable { private volatile boolean flag = true; private BlockingQueue<String> blockingQueue; public Consumer(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (flag) { try { String data = blockingQueue.poll(2, TimeUnit.SECONDS); if (data == null) { flag= false; System.out.println("消费者超过2秒时间未获取到消息."); return; } System.out.println("消费成功 data:" + data); } catch (Exception e) { } } } } ``` 3.测试 ``` class test { public static void main(String[] args) { LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(3); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); Thread p = new Thread(producer); Thread c = new Thread(consumer); p.start(); c.start(); try { Thread.sleep(1000 * 5); producer.stop(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` 运行结果 ``` 生产成功 data:1 消费成功 data:1 生产成功 data:2 消费成功 data:2 生产成功 data:3 消费成功 data:3 生产成功 data:4 消费成功 data:4 生产成功 data:5 消费成功 data:5 消费者超过2秒时间未获取到消息. ```