[TOC]
# 安装
## 下载
安装jdk,并配置好环境变量
下载kafka到/tmp下
~~~
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.11.0.0/kafka_2.12-0.11.0.0.tgz -P /tmp
~~~
你可以登录Apache kafka 官方下载。
http://kafka.apache.org/downloads.html
注意:别下成源文件了!
带src的是源文件,如:
~~~
Source download: kafka-0.10.1.0-src.tgz (asc, md5)
~~~
你应该下的是:
~~~
Scala 2.11 - kafka_2.11-0.10.1.0.tgz (asc, md5)
~~~
推荐下载scala 2.11版本的
## server.properties配置文件
在kafak目录下创建个logs文件夹
解压去config文件夹下,编辑server.properties
~~~
# broker的全局唯一编号,不能重复
broker.id=0
# 用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092
#删除topic功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘IO线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka运行日志存放的路径
log.dirs=/root/tools/kafka/logs
# topic在当前broker上的分片个数
num.partitions=2
# 用来恢复和清理data下数据
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除,和下面参数配合,达到这个时间生成新的
log.retention.hours=168
# 滚动生成新的segment文件的最大时间
log.roll.hours=168
# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824
# 周期性检查文件大小的时间,检查上面一个参数
log.retention.check.interval.ms=300000
# 消息保存的最大值5M
message.max.byte=5242880
#kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
default.replication.factor=2
# 日志清理是否打开
log.cleaner.enable=true
# broker需要使用zookeeper保存meta数据
zookeeper.connect=master:2181,slave1:2181,slave2:2181
# zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000
# partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000
# 消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000
# 删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true
# 此处的host.name为本机ip,如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful错误
host.name=master
advertised.host.name=192.168.33.70
~~~
分发到各个机器上
**`改下broker.id还有host.name还有advertised.host.name还有log的位置`**
配置下kafka的环境变量
## producer.proerties配置文件
~~~
#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=master:9092,slave1:9092,slave2:9092
#指定分区处理类.,默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=kafka.producer.DefaultPartitioner
#是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩.压缩后消息中会有头来指明消息压缩的类型,故在消费者端消息解压是透明的无需指定
compression.codec=none
#指定序列化处理类,数据传输需要序列化
serializer.class=kafka.serializer.DefaultEncoder
#如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩
#compressed.topic=
#设置发送数据是否需要服务端的反馈,有三个值0,-1,1
# 0:producer不会等待broker发送ack
# 1:当leader接收到消息之后发送ack
# -1:当所有的follower都同步消息成功后发送ack
request.required.acks=0
#在向producer发送ack之前,broker允许等待的最大时间,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
#同步还是异步发送消息,默认sync表示同步,async表示异步.
#异步可以提高发送吞吐量,也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync
#在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
#在值和batch.num.message协同工作
queue.buffering.max.ms=5000
#在async模式下,producer端允许buffer的最大消息量
#无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
#此时,如果消息的条数达到阈值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=20000
#如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500
#当消息在producer端沉积的条数达到"queue.buffering.max.messages"后
#阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送任何消息)
#此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
#-1:无阻塞超时限制,消息不会被抛弃
#0:立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1
#当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
#因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
#有可能导致broker接收到重复的消息,默认值为3
message.send.max.retries=3
#producer刷新topic metadata的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
#因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
#(比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=60000
~~~
**`metadata.broker.list`要修改地址**
## consumer.properties配置文件
~~~
#zookeeper连接服务器地址
zookeeper.connect=master:2181,slave1:2181,slave2:2181
#zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=5000
#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=10000
#指定多久消费者更新offset到zookeeper中.注意offset更新时基于time而不是每次获得的消息.一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000
#指定消费
#group.id=master
#当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
#注意offset信息并不是每消费一次消息就向zk提交一次,而先在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true
#自动更新时间.默认60*1000
auto.commit.interval.ms=1000
#当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
consumer.id=xx
#消费者客户端编号,用户区分不同客户端,默认客户端程序自动产生
client.id=xxx
#最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50
#当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新的consumer上.
#如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册"Partition Owner registry"节点信息,
#但是有可能此时旧的consumer尚没有释放此节点,此值用于控制,注册节点的重试次数
rebalance.max.retries=5
#获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk.每次feth将得到多条消息,此值为总大小,提示此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600
#当消息的尺寸不足时,server阻塞的时间.如果超时,消息将立即发送给consumer
#就是consumer拉取消息的时候,消息比如生产了一半,这时候会等待,等待的时间超过这个时候就不管了,发给consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
#如果zookeeper没有offset值或者offset值超出范围.那么就给个初始的offset.
#有smallest,largest,anything可选,分别表示给当前最小的offset,当前最大的offset,抛异常.默认largest
auto.offset.reset=smallest
#指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder
~~~
## 启动
然后各个节点启动kafka
~~~
kafka-server-start.sh config/server.properties
~~~
启动成功,jps查看会出现kafka
如果报这种错误表示jdk版本要java8的
~~~
kafka/Kafka : Unsupported major.minor version 52.0
~~~
# 日志
~~~
server.log #kafka的运行日志
state-change.log #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
~~~
# zk中查看状态
~~~
#查看目录情况 执行“ls /”
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
#显示结果:[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]
'''
上面的显示结果中:只有zookeeper是原生的,其他都是Kafka创建的
'''
#标注一个重要的
[zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1456125963355","endpoints":["PLAINTEXT://192.168.7.100:19092"],"host":"192.168.7.100","version":2,"port":19092}
cZxid = 0x1000001c1
ctime = Mon Feb 22 15:26:03 CST 2016
mZxid = 0x1000001c1
mtime = Mon Feb 22 15:26:03 CST 2016
pZxid = 0x1000001c1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x152e40aead20016
dataLength = 139
numChildren = 0
[zk: 127.0.0.1:12181(CONNECTED) 2]
#还有一个是查看partion
[zk: 127.0.0.1:12181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0
null
cZxid = 0x100000029
ctime = Mon Feb 22 10:05:11 CST 2016
mZxid = 0x100000029
mtime = Mon Feb 22 10:05:11 CST 2016
pZxid = 0x10000002a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: 127.0.0.1:12181(CONNECTED) 8]
~~~
# 常见问题
## 启动advertised.listeners配置异常
~~~
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
at scala.Predef$.require(Predef.scala:277)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
~~~
**解决方法:修改server.properties**
~~~
advertised.listeners=PLAINTEXT://{ip}:9092 # ip可以内网、外网ip、127.0.0.1 或域名
~~~
**解析**
server.properties中有两个listeners。 listeners:启动kafka服务监听的ip和端口,可以监听内网ip和0.0.0.0(不能为外网ip),默认为java.net.InetAddress.getCanonicalHostName()获取的ip。advertised.listeners:生产者和消费者连接的地址,kafka会把该地址注册到zookeeper中,所以只能为除0.0.0.0之外的合法ip或域名 ,默认和listeners的配置一致
## 启动PrintGCDateStamps异常
~~~
[0.004s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/data/service/kafka_2.11-0.11.0.2/bin/../logs/kafkaServer-gc.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
~~~
**解决方法: 更换jdk1.8.x版本或者使用>=kafka1.0.x的版本**
**解析**
只有在jdk1.9并且kafka版本在1.0.x之前的版本才会出现
## 生成者发送message失败或消费者不能消费(kafka1.0.1)
~~~
#(java)org.apache.kafka警告
Connection to node 0 could not be established. Broker may not be available.
# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ TimeoutError: Request timed out after 30000ms
at new TimeoutError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
at Timeout.setTimeout [as _onTimeout] (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:737:14)
at ontimeout (timers.js:466:11)
at tryOnTimeout (timers.js:304:5)
at Timer.listOnTimeout (timers.js:264:5) message: 'Request timed out after 30000ms' }
~~~
**解决方法**: 检查advertised.listeners的配置(如果有多个Broker可根据java版本的对应的node号检查配置),判断当前的网络是否可以连接到地址(telnet等)
## partitions配置的值过小造成错误(kafka1.0.1)
~~~
#(java)org.apache.kafka(执行producer.send)
Exception in thread "main" org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:908)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:778)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
at com.wenshao.dal.TestProducer.main(TestProducer.java:36)
# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ BrokerNotAvailableError: Could not find the leader
at new BrokerNotAvailableError (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\errors\BrokerNotAvailableError.js:11:9)
at refreshMetadata.error (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:831:16)
at D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:514:9
at KafkaClient.wrappedFn (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:379:14)
at KafkaClient.Client.handleReceivedData (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\client.js:770:60)
at Socket.<anonymous> (D:\project\node\kafka-test\src\node_modules\kafka-node\lib\kafkaClient.js:618:10)
at Socket.emit (events.js:159:13)
at addChunk (_stream_readable.js:265:12)
at readableAddChunk (_stream_readable.js:252:11)
at Socket.Readable.push (_stream_readable.js:209:10) message: 'Could not find the leader' }
~~~
**解决方法**: 修改num.partitions的值,partitions在是在创建topic的时候默认创建的partitions节点的个数,只对新创建的topic生效,所有尽量在项目规划时候定一个合理的值。也可以通过命令行动态扩容()
~~~
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic foo
~~~
- 基础
- 编译和安装
- classpath到底是什么?
- 编译运行
- 安装
- sdkman多版本
- jabba多版本
- java字节码查看
- 数据类型
- 简介
- 整形
- char和int
- 变量和常量
- 大数值运算
- 基本类型包装类
- Math类
- 内存划分
- 位运算符
- 方法相关
- 方法重载
- 可变参数
- 方法引用
- 面向对象
- 定义
- 继承和覆盖
- 接口和抽象类
- 接口定义增强
- 内建函数式接口
- 多态
- 泛型
- final和static
- 内部类
- 包
- 修饰符
- 异常
- 枚举类
- 代码块
- 对象克隆
- BeanUtils
- java基础类
- scanner类
- Random类
- System类
- Runtime类
- Comparable接口
- Comparator接口
- MessageFormat类
- NumberFormat
- 数组相关
- 数组
- Arrays
- string相关
- String
- StringBuffer
- StringBuilder
- 正则
- 日期类
- Locale类
- Date
- DateFormat
- SimpleDateFormat
- Calendar
- 新时间日期API
- 简介
- LocalDate,LocalTime,LocalDateTime
- Instant时间点
- 带时区的日期,时间处理
- 时间间隔
- 日期时间校正器
- TimeUnit
- 用yyyy
- 集合
- 集合和迭代器
- ArrayList集合
- List
- Set
- 判断集合唯一
- Map和Entry
- stack类
- Collections集合工具类
- Stream数据流
- foreach不能修改内部元素
- of方法
- IO
- File类
- 字节流stream
- 字符流Reader
- IO流分类
- 转换流
- 缓冲流
- 流的操作规律
- properties
- 序列化流与反序列化流
- 打印流
- System类对IO支持
- commons-IO
- IO流总结
- NIO
- 异步与非阻塞
- IO通信
- Unix的IO模型
- epoll对于文件描述符操作模式
- 用户空间和内核空间
- NIO与普通IO的主要区别
- Paths,Path,Files
- Buffer
- Channel
- Selector
- Pipe
- Charset
- NIO代码
- 多线程
- 创建线程
- 线程常用方法
- 线程池相关
- 线程池概念
- ThreadPoolExecutor
- Runnable和Callable
- 常用的几种线程池
- 线程安全
- 线程同步的几种方法
- synchronized
- 死锁
- lock接口
- ThreadLoad
- ReentrantLock
- 读写锁
- 锁的相关概念
- volatile
- 释放锁和不释放锁的操作
- 等待唤醒机制
- 线程状态
- 守护线程和普通线程
- Lamda表达式
- 反射相关
- 类加载器
- 反射
- 注解
- junit注解
- 动态代理
- 网络编程相关
- 简介
- UDP
- TCP
- 多线程socket上传图片
- NIO
- JDBC相关
- JDBC
- 预处理
- 批处理
- 事务
- properties配置文件
- DBUtils
- DBCP连接池
- C3P0连接池
- 获得MySQL自动生成的主键
- Optional类
- Jigsaw模块化
- 日志相关
- JDK日志
- log4j
- logback
- xml
- tomcat
- maven
- 简介
- 仓库
- 目录结构
- 常用命令
- 生命周期
- idea配置
- jar包冲突
- 依赖范围
- 私服
- 插件
- git-commit-id-plugin
- maven-assembly-plugin
- maven-resources-plugin
- maven-compiler-plugin
- versions-maven-plugin
- maven-source-plugin
- tomcat-maven-plugin
- 多环境
- 自定义插件
- stream
- swing
- json
- jackson
- optional
- junit
- gradle
- servlet
- 配置
- ServletContext
- 生命周期
- HttpServlet
- request
- response
- 乱码
- session和cookie
- cookie
- session
- jsp
- 简介
- 注释
- 方法,成员变量
- 指令
- 动作标签
- 隐式对象
- EL
- JSTL
- javaBean
- listener监听器
- Filter过滤器
- 图片验证码
- HttpUrlConnection
- 国际化
- 文件上传
- 文件下载
- spring
- 简介
- Bean
- 获取和实例化
- 属性注入
- 自动装配
- 继承和依赖
- 作用域
- 使用外部属性文件
- spel
- 前后置处理器
- 生命周期
- 扫描规则
- 整合多个配置文件
- 注解
- 简介
- 注解分层
- 类注入
- 分层和作用域
- 初始化方法和销毁方法
- 属性
- 泛型注入
- Configuration配置文件
- aop
- aop的实现
- 动态代理实现
- cglib代理实现
- aop名词
- 简介
- aop-xml
- aop-注解
- 代理方式选择
- jdbc
- 简介
- JDBCTemplate
- 事务
- 整合
- junit整合
- hibernate
- 简介
- hibernate.properties
- 实体对象三种状态
- 检索方式
- 简介
- 导航对象图检索
- OID检索
- HQL
- Criteria(QBC)
- Query
- 缓存
- 事务管理
- 关系映射
- 注解
- 优化
- MyBatis
- 简介
- 入门程序
- Mapper动态代理开发
- 原始Dao开发
- Mapper接口开发
- SqlMapConfig.xml
- map映射文件
- 输出返回map
- 输入参数
- pojo包装类
- 多个输入参数
- resultMap
- 动态sql
- 关联
- 一对一
- 一对多
- 多对多
- 整合spring
- CURD
- 占位符和sql拼接以及参数处理
- 缓存
- 延迟加载
- 注解开发
- springMVC
- 简介
- RequestMapping
- 参数绑定
- 常用注解
- 响应
- 文件上传
- 异常处理
- 拦截器
- springBoot
- 配置
- 热更新
- java配置
- springboot配置
- yaml语法
- 运行
- Actuator 监控
- 多环境配置切换
- 日志
- 日志简介
- logback和access
- 日志文件配置属性
- 开机自启
- aop
- 整合
- 整合Redis
- 整合Spring Data JPA
- 基本查询
- 复杂查询
- 多数据源的支持
- Repository分析
- JpaSpecificationExecutor
- 整合Junit
- 整合mybatis
- 常用注解
- 基本操作
- 通用mapper
- 动态sql
- 关联映射
- 使用xml
- spring容器
- 整合druid
- 整合邮件
- 整合fastjson
- 整合swagger
- 整合JDBC
- 整合spingboot-cache
- 请求
- restful
- 拦截器
- 常用注解
- 参数校验
- 自定义filter
- websocket
- 响应
- 异常错误处理
- 文件下载
- 常用注解
- 页面
- Thymeleaf组件
- 基本对象
- 内嵌对象
- 上传文件
- 单元测试
- 模拟请求测试
- 集成测试
- 源码解析
- 自动配置原理
- 启动流程分析
- 源码相关链接
- Servlet,Filter,Listener
- springcloud
- 配置
- 父pom
- 创建子工程
- Eureka
- Hystrix
- Ribbon
- Feign
- Zuul
- kotlin
- 基本数据类型
- 函数
- 区间
- 区块链
- 简介
- linux
- ulimit修改
- 防止syn攻击
- centos7部署bbr
- debain9开启bbr
- mysql
- 隔离性
- sql执行加载顺序
- 7种join
- explain
- 索引失效和优化
- 表连接优化
- orderby的filesort问题
- 慢查询
- show profile
- 全局查询日志
- 死锁解决
- sql
- 主从
- IDEA
- mac快捷键
- 美化界面
- 断点调试
- 重构
- springboot-devtools热部署
- IDEA进行JAR打包
- 导入jar包
- ProjectStructure
- toString添加json模板
- 配置maven
- Lombok插件
- rest client
- 文档显示
- sftp文件同步
- 书签
- 代码查看和搜索
- postfix
- live template
- git
- 文件头注释
- JRebel
- 离线模式
- xRebel
- github
- 连接mysql
- 选项没有Java class的解决方法
- 扩展
- 项目配置和web部署
- 前端开发
- json和Inject language
- idea内存和cpu变高
- 相关设置
- 设计模式
- 单例模式
- 简介
- 责任链
- JUC
- 原子类
- 原子类简介
- 基本类型原子类
- 数组类型原子类
- 引用类型原子类
- JVM
- JVM规范内存解析
- 对象的创建和结构
- 垃圾回收
- 内存分配策略
- 备注
- 虚拟机工具
- 内存模型
- 同步八种操作
- 内存区域大小参数设置
- happens-before
- web service
- tomcat
- HTTPS
- nginx
- 变量
- 运算符
- 模块
- Rewrite规则
- Netty
- netty为什么没用AIO
- 基本组件
- 源码解读
- 简单的socket例子
- 准备netty
- netty服务端启动
- 案例一:发送字符串
- 案例二:发送对象
- websocket
- ActiveMQ
- JMS
- 安装
- 生产者-消费者代码
- 整合springboot
- kafka
- 简介
- 安装
- 图形化界面
- 生产过程分析
- 保存消息分析
- 消费过程分析
- 命令行
- 生产者
- 消费者
- 拦截器interceptor
- partition
- kafka为什么快
- kafka streams
- kafka与flume整合
- RabbitMQ
- AMQP
- 整体架构
- RabbitMQ安装
- rpm方式安装
- 命令行和管控页面
- 消息生产与消费
- 整合springboot
- 依赖和配置
- 简单测试
- 多方测试
- 对象支持
- Topic Exchange模式
- Fanout Exchange订阅
- 消息确认
- java client
- RabbitAdmin和RabbitTemplate
- 两者简介
- RabbitmqAdmin
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
- 详解
- Jackson2JsonMessageConverter
- ContentTypeDelegatingMessageConverter
- lucene
- 简介
- 入门程序
- luke查看索引
- 分析器
- 索引库维护
- elasticsearch
- 配置
- 插件
- head插件
- ik分词插件
- 常用术语
- Mapping映射
- 数据类型
- 属性方法
- Dynamic Mapping
- Index Template 索引模板
- 管理映射
- 建立映射
- 索引操作
- 单模式下CURD
- mget多个文档
- 批量操作
- 版本控制
- 基本查询
- Filter过滤
- 组合查询
- 分析器
- redis
- String
- list
- hash
- set
- sortedset
- 发布订阅
- 事务
- 连接池
- 管道
- 分布式可重入锁
- 配置文件翻译
- 持久化
- RDB
- AOF
- 总结
- Lettuce
- zookeeper
- zookeeper简介
- 集群部署
- Observer模式
- 核心工作机制
- zk命令行操作
- zk客户端API
- 感知服务动态上下线
- 分布式共享锁
- 原理
- zab协议
- 两阶段提交协议
- 三阶段提交协议
- Paxos协议
- ZAB协议
- hadoop
- 简介
- hadoop安装
- 集群安装
- 单机安装
- linux编译hadoop
- 添加新节点
- 退役旧节点
- 集群间数据拷贝
- 归档
- 快照管理
- 回收站
- 检查hdfs健康状态
- 安全模式
- hdfs简介
- hdfs命令行操作
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案例-单词统计
- 局部聚合Combiner
- combiner流程
- combiner案例
- 自定义排序
- 自定义Bean对象
- 排序的分类
- 案例-按总量排序需求
- 一次性完成统计和排序
- 分区
- 分区简介
- 案例-结果分区
- 多表合并
- reducer端合并
- map端合并(分布式缓存)
- 分组
- groupingComparator
- 案例-求topN
- 全局计数器
- 合并小文件
- 小文件的弊端
- CombineTextInputFormat机制
- 自定义InputFormat
- 自定义outputFormat
- 多job串联
- 倒排索引
- 共同好友
- 串联
- 数据压缩
- InputFormat接口实现类
- yarn简介
- 推测执行算法
- 本地提交到yarn
- 框架运算全流程
- 数据倾斜问题
- mapreduce的优化方案
- HA机制
- 优化
- Hive
- 安装
- shell参数
- 数据类型
- 集合类型
- 数据库
- DDL操作
- 创建表
- 修改表
- 分区表
- 分桶表
- DML操作
- load
- insert
- select
- export,import
- Truncate
- 注意
- 严格模式
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transfrom实现
- having和where不同
- 压缩
- 存储
- 存储和压缩结合使用
- explain详解
- 调优
- Fetch抓取
- 本地模式
- 表的优化
- GroupBy
- count(Distinct)去重统计
- 行列过滤
- 动态分区调整
- 数据倾斜
- 并行执行
- JVM重用
- 推测执行
- reduce内存和个数
- sql查询结果作为变量(shell)
- youtube
- flume
- 简介
- 安装
- 常用组件
- 拦截器
- 案例
- 监听端口到控制台
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 单flume多channel,sink
- 自定义拦截器
- 高可用配置
- 使用注意
- 监控Ganglia
- sqoop
- 安装
- 常用命令
- 数据导入
- 准备数据
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 打包脚本
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- oozie
- 安装
- hbase
- 简介
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- 安装
- 命令行
- 基本CURD
- java api
- CURD
- CAS
- 过滤器查询
- 建表高级属性
- 与mapreduce结合
- 与sqoop结合
- 协处理器
- 参数配置优化
- 数据备份和恢复
- 节点管理
- 案例-点击流
- 简介
- HUE
- 安装
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 单词统计(接入kafka)
- 并行度和分组
- 启动流程分析
- ACK容错机制
- ACK简介
- BaseRichBolt简单使用
- BaseBasicBolt简单使用
- Ack工作机制
- 本地目录树
- zookeeper目录树
- 通信机制
- 案例
- 日志告警
- 工具
- YAPI
- chrome无法手动拖动安装插件
- 时间和空间复杂度
- jenkins
- 定位cpu 100%
- 常用脚本工具
- OOM问题定位
- scala
- 编译
- 基本语法
- 函数
- 数组常用方法
- 集合
- 并行集合
- 类
- 模式匹配
- 异常
- tuple元祖
- actor并发编程
- 柯里化
- 隐式转换
- 泛型
- 迭代器
- 流stream
- 视图view
- 控制抽象
- 注解
- spark
- 企业架构
- 安装
- api开发
- mycat
- Groovy
- 基础