[TOC]
# BlockingQueue简介
消息队列常用于有生产者和消费者两类角色的多线程同步场景
BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。
主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。
**插入:**
1. add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常
2. offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
3. put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续.
**读取:**
4. poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
5. take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
**其他**
`int remainingCapacity();`返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数据可能不准
`boolean remove(Object o); `从队列移除元素,如果存在,即移除一个或者更多,队列改变了返回true
`public boolean contains(Object o);` 查看队列是否存在这个元素,存在返回true
`int drainTo(Collection<? super E> c); `传入的集合中的元素,如果在队列中存在,那么将队列中的元素移动到集合中
`int drainTo(Collection<? super E> c, int maxElements);` 和上面方法的区别在于,制定了移动的数量
## 实现类
BlockingQueue有四个具体的实现类,常用的两种实现类为:
1. ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。
2. LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,
默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,
put方法在队列满的时候会阻塞直到有队列成员被消费,
take方法在队列空的时候会阻塞,直到有队列成员被放进来。
LinkedBlockingQueue和ArrayBlockingQueue区别:
LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.
## 代码
**队列生产者**
~~~
package testThread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TestBlockingQueueProducer implements Runnable{
private final BlockingQueue<String> queue;
Random random = new Random();
//生产者
public TestBlockingQueueProducer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(random.nextInt(10));
String task = Thread.currentThread().getName() + " made a product " + i;
System.out.println(task);
//阻塞方法
queue.put(task);
}catch (Exception e) {
e.printStackTrace();
}
}
}
}
~~~
**队列消费者**
~~~
package testThread;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class TestBlockingQueueConsumer implements Runnable {
private final BlockingQueue<String> queue;
Random random = new Random();
//消费者
public TestBlockingQueueConsumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(10));
System.out.println(Thread.currentThread().getName() + " trying...");
//如果队列为空会阻塞当前线程
String temp = queue.take();
//这个是不准的,又有人放又有人取
int remainingCapacity = queue.remainingCapacity();
System.out.println(Thread.currentThread().getName() + " get a job " + temp);
// System.out.println("队列中的元素个数: "+ remainingCapacity);
} catch (Exception e) {
e.printStackTrace();
}
}
}
~~~
**测试**
~~~
package testThread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestBlockingQueue {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
// BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
// 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
// BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);
TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);
for (int i = 0; i < 3; i++) {
new Thread(producer, "Producer" + (i + 1)).start();
}
for (int i = 0; i < 5; i++) {
new Thread(consumer, "Consumer" + (i + 1)).start();
}
// new Thread(producer, "Producer" + (5)).start();
}
}
~~~
## 成员介绍
### ArrayBlockingQueue
线程安全
一个由数组结构组成的有界阻塞队列
基于数组实现的有界阻塞队列,查找快,增删慢
生产者和消费者用的是同一把锁
消费的方式:FIFO
需求:想按照队列顺序去执行任务,还不想出现频繁的GC现象
### LinkedBlockingQueue
线程安全
一个由链表结构组成的有界阻塞队列
基于链表实现的阻塞队列,链表是增删快,定位慢
### DelayQueue
DelayQueue中的元素,只有指定的延迟时间到了,才能够从队列中获取到该元素。
DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞
应用场景:
1. 客户端长时间占用连接的问题,超过这个空闲时间了,可以移除的
2. 处理长时间不用的缓存;如果队列里面的对象长时间不用,超过了空闲时间,就移除
3. 任务超时处理
### PriorityBlockingQueue
线程安全
一个支持优先级排序的无界阻塞队列
PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者
不阻塞生产者
阻塞消费者
### SynchronousQueue
一种无缓冲的等待队列,来一个任务就执行这个任务,这期间不能太添加任何的任务。也就是不用阻塞了,其实对于少量任务而言,这种做法更高效
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理
### concurrentLinkedQueue
peek
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介