企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 队列概述 队列,又称为伫列(queue),是先进先出(FIFO, First-In-First-Out)的线性表。在具体应用中通常用链表或者数组来实现。队列只允许在后端(称为*rear*)进行插入操作,在前端(称为*front*)进行删除操作。队列的操作方式和堆栈类似,唯一的区别在于队列只允许新数据在后端进行添加。   在Java中队列又可以分为两个大类,一种是阻塞队列和非阻塞队列。   1、没有实现阻塞接口:   1)实现java.util.Queue的LinkList,   2)实现java.util.AbstractQueue接口内置的不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue   2、实现阻塞接口的   java.util.concurrent 中加入了 BlockingQueue 接口和五个阻塞队列类。它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。   五个队列所提供的各有不同:   \* ArrayBlockingQueue :一个由数组支持的有界队列。   \* LinkedBlockingQueue :一个由链接节点支持的可选有界队列。   \* PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。   \* DelayQueue :一个由优先级堆支持的、基于时间的调度队列。   \* SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。   队列是Java中常用的数据结构,比如在线程池中就是用到了队列,比如消息队列等。   由队列先入先出的特性,我们知道队列数据的存储结构可以两种,一种是基于数组实现的,另一种则是基于单链实现。前者在创建的时候就已经确定了数组的长度,所以队列的长度是固定的,但是可以循环使用数组,所以这种队列也可以称之为循环队列。后者实现的队列内部通过指针指向形成一个队列,这种队列是单向且长度不固定,所以也称之为非循环队列。下面我将使用两种方式分别实现队列。 ## 二、基于数组实现循环队列   由于在往队列中放数据或拉取数据的时候需要移动数组对应的下标,所以需要记录一下队尾和队头的位置。说一下几个核心的属性吧:   1、queue:队列,object类型的数组,用于存储数据,长度固定,当存储的数据数量大于数组当度则抛出异常;   2、head:队头指针,int类型,用于记录队列头部的位置信息。   3、tail:队尾指针,int类型,用于记录队列尾部的位置信息。   4、size:队列长度,队列长度大于等于0或者小于等于数组长度。 ### 非线程安全队列 ```java import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * Created by Mr.zihan on 2021/12/18. * connect to cowboy2014@qq.com * 特点:基于数组实现的非线程安全的队列 */ public class MyQueue { /** * 队列管道,当管道中存放的数据大于队列的长度时将不会再offer数据,直至从队列中poll数据后 */ private Object[] queue; //队列的头部,获取数据时总是从头部获取 private int head; //队列尾部,push数据时总是从尾部添加 private int tail; //队列长度 private int size; //数组中能存放数据的最大容量 private final static int MAX_CAPACITY = 1<<30; //数组长度 private int capacity; //最大下标 private int maxIndex; public MyQueue(int initialCapacity) { if (initialCapacity > MAX_CAPACITY) { throw new OutOfMemoryError("initialCapacity too large"); } if (initialCapacity <= 0) { throw new IndexOutOfBoundsException("initialCapacity must be more than zero"); } this.queue = new Object[initialCapacity]; this.capacity = initialCapacity; this.maxIndex = initialCapacity - 1; this.head = this.tail = -1; this.size = 0; } public MyQueue(){ queue = new Object[16]; capacity = 16; head = tail = -1; size = 0; maxIndex = 15; } /** * 往队列尾部添加数据 * @param object 数据 */ public void offer(Object object){ if (size >= capacity){ System.out.println("queue's size more than or equal to array's capacity"); return; } if (++tail > maxIndex){ //循环队列 tail = 0; } queue[tail] = object; size++; } /** * 从队列头部拉出数据 * @return 返回队列的第一个数据 */ public Object poll(){ if (size <= 0){ System.out.println("the queue is null"); return null; } if (++head > maxIndex){ head = 0; } size--; Object old = queue[head]; queue[head] = null; return old; } ``` 这个队列是非线程安全的。测试思路: 1. 使用一个线程不停发送数据; 2. 另外10个线程用来处理队列中的数据,二者的速率相当,防止过早队列满或队列空。 ``` public static void main(String[] args) { MyQueue myQueue = new MyQueue(); //启动n个线程做真实推送 for (int i = 0; i < 10; i++) { new Thread(() -> { while (true) { try { long starTime = System.currentTimeMillis(); //获取一条推送消息,此方法会进行不进行阻塞 String msg = (String) myQueue.poll(); //模拟推送耗时 TimeUnit.MILLISECONDS.sleep(10); if (null == msg){ continue; } long endTime = System.currentTimeMillis(); System.out.println(String.format("[%s,%s,take耗时:%s],%s,收到消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } // 定义一个线程发送消息 int i = 0; while (true){ myQueue.offer("消息编号" + i++); try { TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` 证据呈上,编号2138的消息被消费了好多遍: ``` [1639817508763,1639817508773,take耗时:10],Thread-5,收到消息:消息编号2138 [1639817508763,1639817508773,take耗时:10],Thread-1,收到消息:消息编号2139 the queue is null [1639817508763,1639817508773,take耗时:10],Thread-4,收到消息:消息编号2140 the queue is null [1639817508763,1639817508773,take耗时:10],Thread-9,收到消息:消息编号2138 the queue is null [1639817508763,1639817508773,take耗时:10],Thread-7,收到消息:消息编号2138 ``` ### 优化为线程安全队列 #### 使用volatile ``` private volatile Object[] queue; //队列的头部,获取数据时总是从头部获取 private volatile int head; //队列尾部,push数据时总是从尾部添加 private volatile int tail; ``` #### 使用synchronized关键字 #### 使用ReentrantLock #### 使用CAS ## 参考资料 1. [教你如何使用Java手写一个基于数组实现的队列](ttps://www.cnblogs.com/rainple/p/9988341.html)