[TOC]
# 简介
首先加入RabbitMQ java client依赖:
~~~xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
</dependencies>
~~~
RabbitMQ的java client使用`com.rabbitmq.client`作为其顶级包。关键的类和接口是:
~~~
com.rabbitmq.client.Channel
com.rabbitmq.client.Connection
com.rabbitmq.client.ConnectionFactory
com.rabbitmq.client.Consumer
~~~
通过Channel可以进行一系列的api操作。 Connection(连接)用于打开通道,注册连接生命周期事件处理程序,并关闭不再需要的连接。
Connection(连接)通过ConnectionFactory实例化,ConnectionFactory可以设置一些Collection(连接)的一些配置,比如说vhost或者说username等等。
## Connections(连接)和Channels(管道)
核心的类是Connections(连接)和Channels(管道),分别代表着AMQP 0-9-1协议中的Connections(连接)和Channels(管道),一般被导入
~~~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
~~~
## 连接服务器
下面的代码时使用给定的参数(host name,端口等等)连接AMQP的服务器。
~~~
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
~~~
所有的这些参数RabbitMQ服务器都设置了默认值,可以在ConnectionFactory类中查看这些默认值。
另外,URI可以以下面的方法进行连接都有默认值。
~~~
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
~~~
Connection(连接)接口可以被用作创建一个channel(管道):
~~~
Channel channel = conn.createChannel();
~~~
可以使用channel(管道)发送和接收消息,下面会有讲到。
关闭连接,只需要关闭channel(管道)和connection(连接):
~~~
channel.close();
conn.close();
~~~
注意,关闭管道是被认为是最佳实践,但是却不是严格意义的必要的。当底层的连接关闭时候,channel(管道)也就自动的被关闭了。
## 使用Exchanges和Queues
客户端应用必须应用在exchanges和queues,这些都是AMQP协议定义的。使用这些(exchanges和queues)首先必须“声明”它(就是创建的意思)。
下面的代码就是怎样去"声明"一个exchange和队列,并且将它们绑定在一起。
~~~
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
~~~
可以通过参数去设置exchange和queue的一些属性,使用这些方法的一些重载方法进行相关设置。
~~~
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
~~~
## 发送消息(Publishing messages)
使用Channel.basicPublish方法将消息发送给一个exchange:
~~~
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
~~~
为了更好的控制,你可以使用重载的参数来设置消息的一些属性(比如说mandatory标志,关于mandatory标志,下面会讲到),或者在发送消息前设定一些消息属性。
~~~
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
~~~
可以自己构建BasicProperties的对象,如下面的代码:
~~~
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
~~~
发送消息指定头信息:
~~~
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
~~~
发送一个有过期时间的消息,下面的博客也会讲到:
~~~
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
~~~
## 订阅消息("Push API")
~~~
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
~~~
最有效的接收消息的方法是使用Consumer接口去订阅。当消息到达消费端的时候会自动的传递消费(delivered),而不需要去请求。
当我们调用Consumers(消费者)有关的api的时候,会生成一个消费者标识符(consumer tag)。
不同的Consumer实例必须有不同的消费者标签。 强烈建议不要在连接上重复使用消费者标签,不然在监视消费者时可能导致自动连接恢复和混淆监控数据的问题。
实现Consumer的最简单的方法是将便利(convenience)类DefaultConsumer子类化。 该子类的对象可以在basicConsume方法调用中传递以设置订阅:
~~~java
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
~~~
在这里,因为我们设置了自动确认(`autoAck`)的值为false,所以有必要在传递给消费者的方法中进行自动确认(`handleDelivery`方法中)。
更复杂的消费者将会重写更多的方法。事实上,`handleShutdownSignal`方法被调用当Channel(通道)和连接关闭的时候。并且在调用该消费者的任何回调方法之前将`consumer tag`传递给`handleConsumeOk`(com.rabbitmq.client.Consumer接口中定义的方法)方法
消费者还可以分别实现`handleCancelOk`(com.rabbitmq.client.Consumer接口中定义的方法)和`handleCancel`(com.rabbitmq.client.Consumer接口中定义的方法)方法来通知显式和隐式取消。
你也可以使用Channel.basicCancel方法明确的取消一个特定的消费,传递consumer tag,
~~~
channel.basicCancel(consumerTag);
~~~
和生产者一样,对于消费者来说并发处理消息也要慎重考虑。
回调给消费者是在与实例化其`Channel`(管道)的线程分开的线程池中调度的。 这意味着消费者可以安全地在`Connection`或`Channel`上调用阻塞方法,例如`Channel#queueDeclare`或`Channel#basicCancel`。
每一个`Channel`(管道)都有自己的调度线程。对于最常用的使用方式就是一个消费者一个`Channel`(管道),意味着一个消费者不会阻塞其他的消费。如果是一个`Channel`(管道)多消费者必须明白一个长时间的消费调用可能会阻塞其他消费者的回调调度。
# 通道和并发注意事项(线程安全)
根据经验,**在线程间共享Channel(通道)是要避免的**。应用应该优先使用每个线程自己的Channel(通道)实例,而不是多个线程共享这个Channel(通道)实例。
虽然有些在Channel(通道)上的操作是可以并发安全的调用,但是一些操作不行会导致一些边界交错,双重确认等等。
在共享(多线程)Channel(通道)上进行并发发布会导致一些边界交错,触发连接协议异常和连接关闭。因此需要严格在应用中同步调用(Channel#basicPublish必须在正确关键的地方调用)。线程之间的共享也会干扰生产者的消息确认。我们强烈的推荐不应该在通道上进行并发的发布消息。
在共享的Channel(通道)上一个线程生产(publish)消息,一个线程消费(consume)消息是线程安全的。
服务器推送可以同时发送,保证每通道的订阅被保留。 调度机制使用`java.util.concurrent.ExecutorService`。 可以使用单列的`ConnectionFactory`调用`ConnectionFactory#setSharedExecutor`去设置所有连接共用的`executor`。
当我们手动确认[manual acknowledgements](https://link.jianshu.com?t=http://www.rabbitmq.com/confirms.html) 的时候,很重要的是考虑什么线程去做这个ack确认。如果接收传递的线程(例如,Consumer#handleDelivery委托给不同线程的传递处理)不同于手动确认的线程,则将多个线程参数设置为true是线程不安全的并导致双重确认,因此导致通道协议异常导致Channel关闭。一次确认一条消息可以确保安全的。
# 简单例子
通过ConnectionFactory获得Connection,Connection得到Channel
**Exchange**
~~~
package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class ExchangeTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//创建exchange,类型是direct类型
channel.exchangeDeclare("zhihao.miao", "direct");
//创建exchange, 类型是direct类型
channel.exchangeDeclare("zhihao.miao.info", BuiltinExchangeType.DIRECT);
//第三个参数表示是否持久化,同步操作,有返回值
AMQP.Exchange.DeclareOk ok = channel.exchangeDeclare("zhihao.miao.debug", BuiltinExchangeType.DIRECT, true);
System.out.println("---ok---" + ok);
//设置属性
Map<String, Object> argument = new HashMap<>();
argument.put("alternate-exchange", "log");
//第三个是持久化,第四个是是否自动删除
channel.exchangeDeclare("zhihao.miao.warn", BuiltinExchangeType.TOPIC, true, false, argument);
//异步创建exchange,没有返回值,
channel.exchangeDeclareNoWait("zhihao.miao.log", BuiltinExchangeType.TOPIC, true, false, false, argument);
//判断是否存在, 不存在就报错
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive("zhihao.miao.info");
System.out.println("---declareOk---" + declareOk);
//判断是否存在, 不存在就报错
declareOk = channel.exchangeDeclarePassive("zhihao.miao.debug");
System.out.println("---declareOk2---" + declareOk);
//删除exchange(可重复执行), 删除一个不存在的也不会报错
channel.exchangeDelete("zhihao.miao");
channel.exchangeDelete("zhihao.miao.debug");
channel.exchangeDelete("zhihao.miao.info");
channel.exchangeDelete("zhihao.miao.warn");
channel.exchangeDelete("zhihao.miao.log");
//删除exchange
channel.exchangeDelete("zhihao.miao.info");
channel.close();
connection.close();
}
}
~~~
队列的api操作
**queues**
~~~
public class QueueTest {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.131");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zhihao.miao");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二个参数表示是否持久化,第三个参数是判断这个队列是否在连接是否生效,为true表示连接关闭队列删除。
AMQP.Queue.DeclareOk ok = channel.queueDeclare("zhihao.info",true,false,false,null);
System.out.println(ok);
//异步没有返回值的方法api
channel.queueDeclareNoWait("zhihao.info.miao",true,false,false,null);
//判断queue是否存在,不存在会抛出异常
//channel.exchangeDeclarePassive("zhihao.info");
//抛出错误
//channel.exchangeDeclarePassive("zhihao.info.miao2");
//exchange和queue进行绑定(可重复执行,不会重复创建)
channel.queueBind("zhihao.info","zhihao.miao.order","info");
//异步进行绑定
channel.queueBindNoWait("zhihao.info.miao","zhihao.miao.pay","info",null);
//exchange与exchange进行绑定(可重复执行,不会重复创建)
channel.exchangeBind("zhihao.miao.email","zhihao.miao.weixin","debug");
//exchange和queue进行解绑(可重复执行)
channel.queueUnbind("zhihao.info","zhihao.miao.order","info");
//exchange和exchange进行解绑(可重复执行)
channel.exchangeUnbind("zhihao.info.miao","zhihao.miao.pay","debug");
//删除队列
channel.queueDelete("zhihao.info");
channel.close();
connection.close();
}
}
~~~
**消息的发送**
~~~java
public class Sender {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
contentEncoding("UTF-8").build();
//第一个参数是exchange参数,如果是为空字符串,那么就会发送到(AMQP default)默认的exchange,而且routingKey
//便是所要发送到的队列名
channel.basicPublish("","zhihao.info.miao",properties,"忘记密码,验证码是1234".getBytes());
channel.basicPublish("","zhihao.miao",properties,"忘记密码,六位验证密码是343sdf".getBytes());
//direct类型的exchange类型的exchange,zhihao.miao.order绑定zhihao.info.miao队列,route key是order
channel.basicPublish("zhihao.miao.order","order",properties,"爱奇艺会员到期了".getBytes());
//zhihao.miao.pay绑定zhihao.info.miao队列,route key是order
channel.basicPublish("zhihao.miao.pay","pay",properties,"优酷会员到期了".getBytes());
//topic类型的exchange
channel.basicPublish("log","user.log",properties,"你的外卖已经送达".getBytes());
channel.basicPublish("log","user.log.info",properties,"你的外卖正在配送中".getBytes());
channel.basicPublish("log","user",properties,"你的投诉已经采纳".getBytes());
channel.close();
connection.close();
}
}
~~~
**消息消费**
~~~
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.131");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zhihao.miao");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/");
//客户端的消费消息
Map<String,Object> clientProperties = new HashMap<>();
clientProperties.put("desc","支付系统2.0");
clientProperties.put("author","zhihao.miao");
clientProperties.put("user","zhihao.miao@xxx.com");
connectionFactory.setClientProperties(clientProperties);
//给客户端的connetction命名
Connection connection = connectionFactory.newConnection("log队列的消费者");
//给channel起个编号
Channel channel = connection.createChannel(10);
//返回consumerTag,也可以通过重载方法进行设置consumerTag
String consumerTag = channel.basicConsume("user_log_queue",true,new SimpleConsumer(channel));
System.out.println(consumerTag);
TimeUnit.SECONDS.sleep(30);
channel.close();
connection.close();
}
}
~~~
具体的消息逻辑,继承DefaultConsumer类重写handleDelivery方法,如果是手工确认消息,会在handleDelivery方法中进行相关的确认(调用相关api)
~~~java
public class SimpleConsumer extends DefaultConsumer{
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println("-----收到消息了---------------");
System.out.println("消息属性为:"+properties);
System.out.println("消息内容为:"+new String(body));
}
}
~~~
- 基础
- 编译和安装
- classpath到底是什么?
- 编译运行
- 安装
- sdkman多版本
- jabba多版本
- java字节码查看
- 数据类型
- 简介
- 整形
- char和int
- 变量和常量
- 大数值运算
- 基本类型包装类
- Math类
- 内存划分
- 位运算符
- 方法相关
- 方法重载
- 可变参数
- 方法引用
- 面向对象
- 定义
- 继承和覆盖
- 接口和抽象类
- 接口定义增强
- 内建函数式接口
- 多态
- 泛型
- final和static
- 内部类
- 包
- 修饰符
- 异常
- 枚举类
- 代码块
- 对象克隆
- BeanUtils
- java基础类
- scanner类
- Random类
- System类
- Runtime类
- Comparable接口
- Comparator接口
- MessageFormat类
- NumberFormat
- 数组相关
- 数组
- Arrays
- string相关
- String
- StringBuffer
- StringBuilder
- 正则
- 日期类
- Locale类
- Date
- DateFormat
- SimpleDateFormat
- Calendar
- 新时间日期API
- 简介
- LocalDate,LocalTime,LocalDateTime
- Instant时间点
- 带时区的日期,时间处理
- 时间间隔
- 日期时间校正器
- TimeUnit
- 用yyyy
- 集合
- 集合和迭代器
- ArrayList集合
- List
- Set
- 判断集合唯一
- Map和Entry
- stack类
- Collections集合工具类
- Stream数据流
- foreach不能修改内部元素
- of方法
- IO
- File类
- 字节流stream
- 字符流Reader
- IO流分类
- 转换流
- 缓冲流
- 流的操作规律
- properties
- 序列化流与反序列化流
- 打印流
- System类对IO支持
- commons-IO
- IO流总结
- NIO
- 异步与非阻塞
- IO通信
- Unix的IO模型
- epoll对于文件描述符操作模式
- 用户空间和内核空间
- NIO与普通IO的主要区别
- Paths,Path,Files
- Buffer
- Channel
- Selector
- Pipe
- Charset
- NIO代码
- 多线程
- 创建线程
- 线程常用方法
- 线程池相关
- 线程池概念
- ThreadPoolExecutor
- Runnable和Callable
- 常用的几种线程池
- 线程安全
- 线程同步的几种方法
- synchronized
- 死锁
- lock接口
- ThreadLoad
- ReentrantLock
- 读写锁
- 锁的相关概念
- volatile
- 释放锁和不释放锁的操作
- 等待唤醒机制
- 线程状态
- 守护线程和普通线程
- Lamda表达式
- 反射相关
- 类加载器
- 反射
- 注解
- junit注解
- 动态代理
- 网络编程相关
- 简介
- UDP
- TCP
- 多线程socket上传图片
- NIO
- JDBC相关
- JDBC
- 预处理
- 批处理
- 事务
- properties配置文件
- DBUtils
- DBCP连接池
- C3P0连接池
- 获得MySQL自动生成的主键
- Optional类
- Jigsaw模块化
- 日志相关
- JDK日志
- log4j
- logback
- xml
- tomcat
- maven
- 简介
- 仓库
- 目录结构
- 常用命令
- 生命周期
- idea配置
- jar包冲突
- 依赖范围
- 私服
- 插件
- git-commit-id-plugin
- maven-assembly-plugin
- maven-resources-plugin
- maven-compiler-plugin
- versions-maven-plugin
- maven-source-plugin
- tomcat-maven-plugin
- 多环境
- 自定义插件
- stream
- swing
- json
- jackson
- optional
- junit
- gradle
- servlet
- 配置
- ServletContext
- 生命周期
- HttpServlet
- request
- response
- 乱码
- session和cookie
- cookie
- session
- jsp
- 简介
- 注释
- 方法,成员变量
- 指令
- 动作标签
- 隐式对象
- EL
- JSTL
- javaBean
- listener监听器
- Filter过滤器
- 图片验证码
- HttpUrlConnection
- 国际化
- 文件上传
- 文件下载
- spring
- 简介
- Bean
- 获取和实例化
- 属性注入
- 自动装配
- 继承和依赖
- 作用域
- 使用外部属性文件
- spel
- 前后置处理器
- 生命周期
- 扫描规则
- 整合多个配置文件
- 注解
- 简介
- 注解分层
- 类注入
- 分层和作用域
- 初始化方法和销毁方法
- 属性
- 泛型注入
- Configuration配置文件
- aop
- aop的实现
- 动态代理实现
- cglib代理实现
- aop名词
- 简介
- aop-xml
- aop-注解
- 代理方式选择
- jdbc
- 简介
- JDBCTemplate
- 事务
- 整合
- junit整合
- hibernate
- 简介
- hibernate.properties
- 实体对象三种状态
- 检索方式
- 简介
- 导航对象图检索
- OID检索
- HQL
- Criteria(QBC)
- Query
- 缓存
- 事务管理
- 关系映射
- 注解
- 优化
- MyBatis
- 简介
- 入门程序
- Mapper动态代理开发
- 原始Dao开发
- Mapper接口开发
- SqlMapConfig.xml
- map映射文件
- 输出返回map
- 输入参数
- pojo包装类
- 多个输入参数
- resultMap
- 动态sql
- 关联
- 一对一
- 一对多
- 多对多
- 整合spring
- CURD
- 占位符和sql拼接以及参数处理
- 缓存
- 延迟加载
- 注解开发
- springMVC
- 简介
- RequestMapping
- 参数绑定
- 常用注解
- 响应
- 文件上传
- 异常处理
- 拦截器
- springBoot
- 配置
- 热更新
- java配置
- springboot配置
- yaml语法
- 运行
- Actuator 监控
- 多环境配置切换
- 日志
- 日志简介
- logback和access
- 日志文件配置属性
- 开机自启
- aop
- 整合
- 整合Redis
- 整合Spring Data JPA
- 基本查询
- 复杂查询
- 多数据源的支持
- Repository分析
- JpaSpecificationExecutor
- 整合Junit
- 整合mybatis
- 常用注解
- 基本操作
- 通用mapper
- 动态sql
- 关联映射
- 使用xml
- spring容器
- 整合druid
- 整合邮件
- 整合fastjson
- 整合swagger
- 整合JDBC
- 整合spingboot-cache
- 请求
- restful
- 拦截器
- 常用注解
- 参数校验
- 自定义filter
- websocket
- 响应
- 异常错误处理
- 文件下载
- 常用注解
- 页面
- Thymeleaf组件
- 基本对象
- 内嵌对象
- 上传文件
- 单元测试
- 模拟请求测试
- 集成测试
- 源码解析
- 自动配置原理
- 启动流程分析
- 源码相关链接
- Servlet,Filter,Listener
- springcloud
- 配置
- 父pom
- 创建子工程
- Eureka
- Hystrix
- Ribbon
- Feign
- Zuul
- kotlin
- 基本数据类型
- 函数
- 区间
- 区块链
- 简介
- linux
- ulimit修改
- 防止syn攻击
- centos7部署bbr
- debain9开启bbr
- mysql
- 隔离性
- sql执行加载顺序
- 7种join
- explain
- 索引失效和优化
- 表连接优化
- orderby的filesort问题
- 慢查询
- show profile
- 全局查询日志
- 死锁解决
- sql
- 主从
- IDEA
- mac快捷键
- 美化界面
- 断点调试
- 重构
- springboot-devtools热部署
- IDEA进行JAR打包
- 导入jar包
- ProjectStructure
- toString添加json模板
- 配置maven
- Lombok插件
- rest client
- 文档显示
- sftp文件同步
- 书签
- 代码查看和搜索
- postfix
- live template
- git
- 文件头注释
- JRebel
- 离线模式
- xRebel
- github
- 连接mysql
- 选项没有Java class的解决方法
- 扩展
- 项目配置和web部署
- 前端开发
- json和Inject language
- idea内存和cpu变高
- 相关设置
- 设计模式
- 单例模式
- 简介
- 责任链
- JUC
- 原子类
- 原子类简介
- 基本类型原子类
- 数组类型原子类
- 引用类型原子类
- JVM
- JVM规范内存解析
- 对象的创建和结构
- 垃圾回收
- 内存分配策略
- 备注
- 虚拟机工具
- 内存模型
- 同步八种操作
- 内存区域大小参数设置
- happens-before
- web service
- tomcat
- HTTPS
- nginx
- 变量
- 运算符
- 模块
- Rewrite规则
- Netty
- netty为什么没用AIO
- 基本组件
- 源码解读
- 简单的socket例子
- 准备netty
- netty服务端启动
- 案例一:发送字符串
- 案例二:发送对象
- websocket
- ActiveMQ
- JMS
- 安装
- 生产者-消费者代码
- 整合springboot
- kafka
- 简介
- 安装
- 图形化界面
- 生产过程分析
- 保存消息分析
- 消费过程分析
- 命令行
- 生产者
- 消费者
- 拦截器interceptor
- partition
- kafka为什么快
- kafka streams
- kafka与flume整合
- RabbitMQ
- AMQP
- 整体架构
- RabbitMQ安装
- rpm方式安装
- 命令行和管控页面
- 消息生产与消费
- 整合springboot
- 依赖和配置
- 简单测试
- 多方测试
- 对象支持
- Topic Exchange模式
- Fanout Exchange订阅
- 消息确认
- java client
- RabbitAdmin和RabbitTemplate
- 两者简介
- RabbitmqAdmin
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
- 详解
- Jackson2JsonMessageConverter
- ContentTypeDelegatingMessageConverter
- lucene
- 简介
- 入门程序
- luke查看索引
- 分析器
- 索引库维护
- elasticsearch
- 配置
- 插件
- head插件
- ik分词插件
- 常用术语
- Mapping映射
- 数据类型
- 属性方法
- Dynamic Mapping
- Index Template 索引模板
- 管理映射
- 建立映射
- 索引操作
- 单模式下CURD
- mget多个文档
- 批量操作
- 版本控制
- 基本查询
- Filter过滤
- 组合查询
- 分析器
- redis
- String
- list
- hash
- set
- sortedset
- 发布订阅
- 事务
- 连接池
- 管道
- 分布式可重入锁
- 配置文件翻译
- 持久化
- RDB
- AOF
- 总结
- Lettuce
- zookeeper
- zookeeper简介
- 集群部署
- Observer模式
- 核心工作机制
- zk命令行操作
- zk客户端API
- 感知服务动态上下线
- 分布式共享锁
- 原理
- zab协议
- 两阶段提交协议
- 三阶段提交协议
- Paxos协议
- ZAB协议
- hadoop
- 简介
- hadoop安装
- 集群安装
- 单机安装
- linux编译hadoop
- 添加新节点
- 退役旧节点
- 集群间数据拷贝
- 归档
- 快照管理
- 回收站
- 检查hdfs健康状态
- 安全模式
- hdfs简介
- hdfs命令行操作
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案例-单词统计
- 局部聚合Combiner
- combiner流程
- combiner案例
- 自定义排序
- 自定义Bean对象
- 排序的分类
- 案例-按总量排序需求
- 一次性完成统计和排序
- 分区
- 分区简介
- 案例-结果分区
- 多表合并
- reducer端合并
- map端合并(分布式缓存)
- 分组
- groupingComparator
- 案例-求topN
- 全局计数器
- 合并小文件
- 小文件的弊端
- CombineTextInputFormat机制
- 自定义InputFormat
- 自定义outputFormat
- 多job串联
- 倒排索引
- 共同好友
- 串联
- 数据压缩
- InputFormat接口实现类
- yarn简介
- 推测执行算法
- 本地提交到yarn
- 框架运算全流程
- 数据倾斜问题
- mapreduce的优化方案
- HA机制
- 优化
- Hive
- 安装
- shell参数
- 数据类型
- 集合类型
- 数据库
- DDL操作
- 创建表
- 修改表
- 分区表
- 分桶表
- DML操作
- load
- insert
- select
- export,import
- Truncate
- 注意
- 严格模式
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transfrom实现
- having和where不同
- 压缩
- 存储
- 存储和压缩结合使用
- explain详解
- 调优
- Fetch抓取
- 本地模式
- 表的优化
- GroupBy
- count(Distinct)去重统计
- 行列过滤
- 动态分区调整
- 数据倾斜
- 并行执行
- JVM重用
- 推测执行
- reduce内存和个数
- sql查询结果作为变量(shell)
- youtube
- flume
- 简介
- 安装
- 常用组件
- 拦截器
- 案例
- 监听端口到控制台
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 单flume多channel,sink
- 自定义拦截器
- 高可用配置
- 使用注意
- 监控Ganglia
- sqoop
- 安装
- 常用命令
- 数据导入
- 准备数据
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 打包脚本
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- oozie
- 安装
- hbase
- 简介
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- 安装
- 命令行
- 基本CURD
- java api
- CURD
- CAS
- 过滤器查询
- 建表高级属性
- 与mapreduce结合
- 与sqoop结合
- 协处理器
- 参数配置优化
- 数据备份和恢复
- 节点管理
- 案例-点击流
- 简介
- HUE
- 安装
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 单词统计(接入kafka)
- 并行度和分组
- 启动流程分析
- ACK容错机制
- ACK简介
- BaseRichBolt简单使用
- BaseBasicBolt简单使用
- Ack工作机制
- 本地目录树
- zookeeper目录树
- 通信机制
- 案例
- 日志告警
- 工具
- YAPI
- chrome无法手动拖动安装插件
- 时间和空间复杂度
- jenkins
- 定位cpu 100%
- 常用脚本工具
- OOM问题定位
- scala
- 编译
- 基本语法
- 函数
- 数组常用方法
- 集合
- 并行集合
- 类
- 模式匹配
- 异常
- tuple元祖
- actor并发编程
- 柯里化
- 隐式转换
- 泛型
- 迭代器
- 流stream
- 视图view
- 控制抽象
- 注解
- spark
- 企业架构
- 安装
- api开发
- mycat
- Groovy
- 基础