[TOC]
# 简介
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器,比如:TimestampInterceptor(时间戳)、HostInterceptor(主机)、RegexExtractorInterceptor(正则)等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的Flume处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理
# 自定义拦截器
根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销
# 需求
~~~
13601249301 100 200 300 400 500 600 700
13601249302 100 200 300 400 500 600 700
13601249303 100 200 300 400 500 600 700
13601249304 100 200 300 400 500 600 700
13601249305 100 200 300 400 500 600 700
13601249306 100 200 300 400 500 600 700
13601249307 100 200 300 400 500 600 700
13601249308 100 200 300 400 500 600 700
13601249309 100 200 300 400 500 600 700
13601249310 100 200 300 400 500 600 700
13601249311 100 200 300 400 500 600 700
13601249312 100 200 300 400 500 600 700
~~~
把这个变成这个样子
第一行加密,中间有几行舍弃
![](https://box.kancloud.cn/9a73270b2419e629800c1ed8bf4dae99_1436x190.png)
# 实现
二部分
## 编写java代码,自定义拦截器;
内容包括:
1. 定义一个类CustomParameterInterceptor实现Interceptor接口。
2. 在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)、多个下标使用的分隔符(indexs_separator)。
3. 添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串。
4. 写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理。
5. 接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。
6. 定义一个静态类,类中封装MD5加密方法
7. 通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib中
![](https://box.kancloud.cn/4f2e68edbe218f81cdd341ee91d57df7_522x604.png)
## 修改Flume的配置信息
新增配置文件spool-interceptor-hdfs.conf,内容为:
~~~
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
# 监控/root/data/下的文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
# 拦截器
a1.sources.r1.interceptors =i1 i2
# 自己定义的java类, $表示内部类
a1.sources.r1.interceptors.i1.type =com.hive.CustomParameterInterceptor$Builder
# 下面的定义属性会传递到自定义的类中
# 自定义拦截器的属性,这个是代表分隔符,unicode编码的空格
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
# 当前列我需要取哪些列的下标
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
# 当前索引用的什么分隔符,用的是逗号,写unicode编码
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
# 具体加密字段的参数,第一列加密
a1.sources.r1.interceptors.i1.encrypted_field_index =0
# 这个拦截器类型是时间戳
a1.sources.r1.interceptors.i2.type = timestamp
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://master:9000/flume/%Y%m%d
# 当前类hdfs里面的属性值
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60
~~~
# 代码
先看内部Builder类,里面有configure方法
字段的默认值有CustomParameterInterceptor.Constants这个内部类提供
Builder类里面的builder是构造拦截器,用里面的类来构建
先调用这个类的构造函数
然后`List<Event> intercept`会调用当个intercept
~~~
package com.hive;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.hive.CustomParameterInterceptor.Constants.*;
public class CustomParameterInterceptor implements Interceptor {
//指明每一行字段的分隔符
private final String fields_separator;
//通过分割符分割后,指明需要那列的字段,下标
private final String indexs;
//多个下标的分割符
private final String indexs_separator;
//需要加密的字段下标
private final String encrypted_field_index;
public CustomParameterInterceptor(String fields_separator, String indexs, String indexs_separator, String encrypted_field_index) {
//每一行字段的分隔符
String f = fields_separator.trim();
//多个下标的分割符
String i = indexs_separator.trim();
//通过分割符分割后,指明需要那列的字段,下标
this.indexs = indexs;
//需要加密的字段下标
this.encrypted_field_index = encrypted_field_index.trim();
if (!f.equals("")) {
f = UnicodeToString(f);
}
//指明每一行字段的分隔符
this.fields_separator = f;
if (!i.equals("")) {
i = UnicodeToString(i);
}
//多个下标的分割符
this.indexs_separator = i;
}
/**
* unicode转换为string
* \t 制表符 ('\u0009') \n 新行(换行)符 (' ') \r 回车符 (' ') \f 换页符 ('\u000C') \a 报警
* (bell) 符 ('\u0007') \e 转义符 ('\u001B') \cx 空格(\u0020)对应于 x 的控制符
*/
private String UnicodeToString(String str) {
Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
Matcher matcher = pattern.matcher(str);
char ch;
while (matcher.find()) {
ch = (char) Integer.parseInt(matcher.group(2), 16);
str = str.replace(matcher.group(1), ch + "");
}
return str;
}
//初始化方法
@Override
public void initialize() {}
//处理单条事件
@Override
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
//getBody是具体存放数据的内容,获取每一行数据
String line = new String(event.getBody(), Charsets.UTF_8);
//分隔符切分
String[] fields_spilts = line.split(fields_separator);
//对应所需要的列的下标
String[] indexs_split = indexs.split(indexs_separator);
String newLine = "";
//循环下标数组
for (int i = 0; i < indexs_split.length; i++) {
int parseInt = Integer.parseInt(indexs_split[i]);
//对字段进行加密
if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) {
//将数据最终加密为md5
newLine += StringUtils.GetMD5Code(fields_spilts[parseInt]);
} else {
newLine += fields_spilts[parseInt];
}
//拼接字段分割符
if (i != indexs_split.length - 1) {
newLine += fields_separator;
}
}
//把这个新的一行数据设置进去
event.setBody(newLine.getBytes());
return event;
} catch (Exception e) {
return event;
}
}
//处理很多事件
@Override
public List<Event> intercept(List<Event> events) {
ArrayList<Event> out = new ArrayList<Event>();
//循环这个时间列表
for (Event event : events) {
//调用单个事件
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
@Override
public void close() {
}
//这个Interceptor.Builder可以读取flume的配置文件,因为Builder中继承的Configurable可以读取配置文件
public static class Builder implements Interceptor.Builder {
/**
* The fields_separator.指明每一行字段的分隔符
*/
private String fields_separator;
/**
* The indexs.通过分隔符分割后,指明需要那列的字段 下标
*/
private String indexs;
/**
* The indexs_separator. 多个下标的分隔符
*/
private String indexs_separator;
/**
* The encrypted_field. 需要加密的字段下标
*/
private String encrypted_field_index;
//构建对应的拦截器
@Override
public Interceptor build() {
//用上面一个类来构建,返回外面的那个类,把值传递给他的构造器
return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator, encrypted_field_index);
}
//能够帮我们获取配置文件定义的参数
@Override
public void configure(Context context) {
//后面的值是默认值,context可以获取到配置文件的值,在这里面用大写常量代替,具体的值可参考下面的类Constants
fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
indexs = context.getString(INDEXS, DEFAULT_INDEXS);
indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
encrypted_field_index = context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);
}
}
public static class Constants {
/**
* The Constant FIELD_SEPARATOR.
*/
public static final String FIELD_SEPARATOR = "fields_separator";
/**
* The Constant DEFAULT_FIELD_SEPARATOR.
*/
public static final String DEFAULT_FIELD_SEPARATOR = " ";
/**
* The Constant INDEXS.
*/
public static final String INDEXS = "indexs";
/**
* The Constant DEFAULT_INDEXS.
*/
public static final String DEFAULT_INDEXS = "0";
/**
* The Constant INDEXS_SEPARATOR.
*/
public static final String INDEXS_SEPARATOR = "indexs_separator";
/**
* The Constant DEFAULT_INDEXS_SEPARATOR.
*/
public static final String DEFAULT_INDEXS_SEPARATOR = ",";
/**
* The Constant ENCRYPTED_FIELD_INDEX.
*/
public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";
/**
* The Constant DEFAUL_TENCRYPTED_FIELD_INDEX.
*/
public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";
/**
* The Constant PROCESSTIME.
*/
public static final String PROCESSTIME = "processTime";
/**
* The Constant PROCESSTIME.
*/
public static final String DEFAULT_PROCESSTIME = "a";
}
/**
* 字符串md5加密
*/
public static class StringUtils {
// 全局数组
private final static String[] strDigits = {"0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "a", "b", "c", "d", "e", "f"};
// 返回形式为数字跟字符串
private static String byteToArrayString(byte bByte) {
int iRet = bByte;
// System.out.println("iRet="+iRet);
if (iRet < 0) {
iRet += 256;
}
int iD1 = iRet / 16;
int iD2 = iRet % 16;
return strDigits[iD1] + strDigits[iD2];
}
// 返回形式只为数字
private static String byteToNum(byte bByte) {
int iRet = bByte;
System.out.println("iRet1=" + iRet);
if (iRet < 0) {
iRet += 256;
}
return String.valueOf(iRet);
}
// 转换字节数组为16进制字串
private static String byteToString(byte[] bByte) {
StringBuffer sBuffer = new StringBuffer();
for (int i = 0; i < bByte.length; i++) {
sBuffer.append(byteToArrayString(bByte[i]));
}
return sBuffer.toString();
}
public static String GetMD5Code(String strObj) {
String resultString = null;
try {
resultString = new String(strObj);
MessageDigest md = MessageDigest.getInstance("MD5");
// md.digest() 该函数返回值为存放哈希值结果的byte数组
resultString = byteToString(md.digest(strObj.getBytes()));
} catch (NoSuchAlgorithmException ex) {
ex.printStackTrace();
}
return resultString;
}
}
}
~~~
然后把这个jar包上传到hive的lib目录
启动:
~~~
flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
~~~
- 基础
- 编译和安装
- classpath到底是什么?
- 编译运行
- 安装
- sdkman多版本
- jabba多版本
- java字节码查看
- 数据类型
- 简介
- 整形
- char和int
- 变量和常量
- 大数值运算
- 基本类型包装类
- Math类
- 内存划分
- 位运算符
- 方法相关
- 方法重载
- 可变参数
- 方法引用
- 面向对象
- 定义
- 继承和覆盖
- 接口和抽象类
- 接口定义增强
- 内建函数式接口
- 多态
- 泛型
- final和static
- 内部类
- 包
- 修饰符
- 异常
- 枚举类
- 代码块
- 对象克隆
- BeanUtils
- java基础类
- scanner类
- Random类
- System类
- Runtime类
- Comparable接口
- Comparator接口
- MessageFormat类
- NumberFormat
- 数组相关
- 数组
- Arrays
- string相关
- String
- StringBuffer
- StringBuilder
- 正则
- 日期类
- Locale类
- Date
- DateFormat
- SimpleDateFormat
- Calendar
- 新时间日期API
- 简介
- LocalDate,LocalTime,LocalDateTime
- Instant时间点
- 带时区的日期,时间处理
- 时间间隔
- 日期时间校正器
- TimeUnit
- 用yyyy
- 集合
- 集合和迭代器
- ArrayList集合
- List
- Set
- 判断集合唯一
- Map和Entry
- stack类
- Collections集合工具类
- Stream数据流
- foreach不能修改内部元素
- of方法
- IO
- File类
- 字节流stream
- 字符流Reader
- IO流分类
- 转换流
- 缓冲流
- 流的操作规律
- properties
- 序列化流与反序列化流
- 打印流
- System类对IO支持
- commons-IO
- IO流总结
- NIO
- 异步与非阻塞
- IO通信
- Unix的IO模型
- epoll对于文件描述符操作模式
- 用户空间和内核空间
- NIO与普通IO的主要区别
- Paths,Path,Files
- Buffer
- Channel
- Selector
- Pipe
- Charset
- NIO代码
- 多线程
- 创建线程
- 线程常用方法
- 线程池相关
- 线程池概念
- ThreadPoolExecutor
- Runnable和Callable
- 常用的几种线程池
- 线程安全
- 线程同步的几种方法
- synchronized
- 死锁
- lock接口
- ThreadLoad
- ReentrantLock
- 读写锁
- 锁的相关概念
- volatile
- 释放锁和不释放锁的操作
- 等待唤醒机制
- 线程状态
- 守护线程和普通线程
- Lamda表达式
- 反射相关
- 类加载器
- 反射
- 注解
- junit注解
- 动态代理
- 网络编程相关
- 简介
- UDP
- TCP
- 多线程socket上传图片
- NIO
- JDBC相关
- JDBC
- 预处理
- 批处理
- 事务
- properties配置文件
- DBUtils
- DBCP连接池
- C3P0连接池
- 获得MySQL自动生成的主键
- Optional类
- Jigsaw模块化
- 日志相关
- JDK日志
- log4j
- logback
- xml
- tomcat
- maven
- 简介
- 仓库
- 目录结构
- 常用命令
- 生命周期
- idea配置
- jar包冲突
- 依赖范围
- 私服
- 插件
- git-commit-id-plugin
- maven-assembly-plugin
- maven-resources-plugin
- maven-compiler-plugin
- versions-maven-plugin
- maven-source-plugin
- tomcat-maven-plugin
- 多环境
- 自定义插件
- stream
- swing
- json
- jackson
- optional
- junit
- gradle
- servlet
- 配置
- ServletContext
- 生命周期
- HttpServlet
- request
- response
- 乱码
- session和cookie
- cookie
- session
- jsp
- 简介
- 注释
- 方法,成员变量
- 指令
- 动作标签
- 隐式对象
- EL
- JSTL
- javaBean
- listener监听器
- Filter过滤器
- 图片验证码
- HttpUrlConnection
- 国际化
- 文件上传
- 文件下载
- spring
- 简介
- Bean
- 获取和实例化
- 属性注入
- 自动装配
- 继承和依赖
- 作用域
- 使用外部属性文件
- spel
- 前后置处理器
- 生命周期
- 扫描规则
- 整合多个配置文件
- 注解
- 简介
- 注解分层
- 类注入
- 分层和作用域
- 初始化方法和销毁方法
- 属性
- 泛型注入
- Configuration配置文件
- aop
- aop的实现
- 动态代理实现
- cglib代理实现
- aop名词
- 简介
- aop-xml
- aop-注解
- 代理方式选择
- jdbc
- 简介
- JDBCTemplate
- 事务
- 整合
- junit整合
- hibernate
- 简介
- hibernate.properties
- 实体对象三种状态
- 检索方式
- 简介
- 导航对象图检索
- OID检索
- HQL
- Criteria(QBC)
- Query
- 缓存
- 事务管理
- 关系映射
- 注解
- 优化
- MyBatis
- 简介
- 入门程序
- Mapper动态代理开发
- 原始Dao开发
- Mapper接口开发
- SqlMapConfig.xml
- map映射文件
- 输出返回map
- 输入参数
- pojo包装类
- 多个输入参数
- resultMap
- 动态sql
- 关联
- 一对一
- 一对多
- 多对多
- 整合spring
- CURD
- 占位符和sql拼接以及参数处理
- 缓存
- 延迟加载
- 注解开发
- springMVC
- 简介
- RequestMapping
- 参数绑定
- 常用注解
- 响应
- 文件上传
- 异常处理
- 拦截器
- springBoot
- 配置
- 热更新
- java配置
- springboot配置
- yaml语法
- 运行
- Actuator 监控
- 多环境配置切换
- 日志
- 日志简介
- logback和access
- 日志文件配置属性
- 开机自启
- aop
- 整合
- 整合Redis
- 整合Spring Data JPA
- 基本查询
- 复杂查询
- 多数据源的支持
- Repository分析
- JpaSpecificationExecutor
- 整合Junit
- 整合mybatis
- 常用注解
- 基本操作
- 通用mapper
- 动态sql
- 关联映射
- 使用xml
- spring容器
- 整合druid
- 整合邮件
- 整合fastjson
- 整合swagger
- 整合JDBC
- 整合spingboot-cache
- 请求
- restful
- 拦截器
- 常用注解
- 参数校验
- 自定义filter
- websocket
- 响应
- 异常错误处理
- 文件下载
- 常用注解
- 页面
- Thymeleaf组件
- 基本对象
- 内嵌对象
- 上传文件
- 单元测试
- 模拟请求测试
- 集成测试
- 源码解析
- 自动配置原理
- 启动流程分析
- 源码相关链接
- Servlet,Filter,Listener
- springcloud
- 配置
- 父pom
- 创建子工程
- Eureka
- Hystrix
- Ribbon
- Feign
- Zuul
- kotlin
- 基本数据类型
- 函数
- 区间
- 区块链
- 简介
- linux
- ulimit修改
- 防止syn攻击
- centos7部署bbr
- debain9开启bbr
- mysql
- 隔离性
- sql执行加载顺序
- 7种join
- explain
- 索引失效和优化
- 表连接优化
- orderby的filesort问题
- 慢查询
- show profile
- 全局查询日志
- 死锁解决
- sql
- 主从
- IDEA
- mac快捷键
- 美化界面
- 断点调试
- 重构
- springboot-devtools热部署
- IDEA进行JAR打包
- 导入jar包
- ProjectStructure
- toString添加json模板
- 配置maven
- Lombok插件
- rest client
- 文档显示
- sftp文件同步
- 书签
- 代码查看和搜索
- postfix
- live template
- git
- 文件头注释
- JRebel
- 离线模式
- xRebel
- github
- 连接mysql
- 选项没有Java class的解决方法
- 扩展
- 项目配置和web部署
- 前端开发
- json和Inject language
- idea内存和cpu变高
- 相关设置
- 设计模式
- 单例模式
- 简介
- 责任链
- JUC
- 原子类
- 原子类简介
- 基本类型原子类
- 数组类型原子类
- 引用类型原子类
- JVM
- JVM规范内存解析
- 对象的创建和结构
- 垃圾回收
- 内存分配策略
- 备注
- 虚拟机工具
- 内存模型
- 同步八种操作
- 内存区域大小参数设置
- happens-before
- web service
- tomcat
- HTTPS
- nginx
- 变量
- 运算符
- 模块
- Rewrite规则
- Netty
- netty为什么没用AIO
- 基本组件
- 源码解读
- 简单的socket例子
- 准备netty
- netty服务端启动
- 案例一:发送字符串
- 案例二:发送对象
- websocket
- ActiveMQ
- JMS
- 安装
- 生产者-消费者代码
- 整合springboot
- kafka
- 简介
- 安装
- 图形化界面
- 生产过程分析
- 保存消息分析
- 消费过程分析
- 命令行
- 生产者
- 消费者
- 拦截器interceptor
- partition
- kafka为什么快
- kafka streams
- kafka与flume整合
- RabbitMQ
- AMQP
- 整体架构
- RabbitMQ安装
- rpm方式安装
- 命令行和管控页面
- 消息生产与消费
- 整合springboot
- 依赖和配置
- 简单测试
- 多方测试
- 对象支持
- Topic Exchange模式
- Fanout Exchange订阅
- 消息确认
- java client
- RabbitAdmin和RabbitTemplate
- 两者简介
- RabbitmqAdmin
- RabbitTemplate
- SimpleMessageListenerContainer
- MessageListenerAdapter
- MessageConverter
- 详解
- Jackson2JsonMessageConverter
- ContentTypeDelegatingMessageConverter
- lucene
- 简介
- 入门程序
- luke查看索引
- 分析器
- 索引库维护
- elasticsearch
- 配置
- 插件
- head插件
- ik分词插件
- 常用术语
- Mapping映射
- 数据类型
- 属性方法
- Dynamic Mapping
- Index Template 索引模板
- 管理映射
- 建立映射
- 索引操作
- 单模式下CURD
- mget多个文档
- 批量操作
- 版本控制
- 基本查询
- Filter过滤
- 组合查询
- 分析器
- redis
- String
- list
- hash
- set
- sortedset
- 发布订阅
- 事务
- 连接池
- 管道
- 分布式可重入锁
- 配置文件翻译
- 持久化
- RDB
- AOF
- 总结
- Lettuce
- zookeeper
- zookeeper简介
- 集群部署
- Observer模式
- 核心工作机制
- zk命令行操作
- zk客户端API
- 感知服务动态上下线
- 分布式共享锁
- 原理
- zab协议
- 两阶段提交协议
- 三阶段提交协议
- Paxos协议
- ZAB协议
- hadoop
- 简介
- hadoop安装
- 集群安装
- 单机安装
- linux编译hadoop
- 添加新节点
- 退役旧节点
- 集群间数据拷贝
- 归档
- 快照管理
- 回收站
- 检查hdfs健康状态
- 安全模式
- hdfs简介
- hdfs命令行操作
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案例-单词统计
- 局部聚合Combiner
- combiner流程
- combiner案例
- 自定义排序
- 自定义Bean对象
- 排序的分类
- 案例-按总量排序需求
- 一次性完成统计和排序
- 分区
- 分区简介
- 案例-结果分区
- 多表合并
- reducer端合并
- map端合并(分布式缓存)
- 分组
- groupingComparator
- 案例-求topN
- 全局计数器
- 合并小文件
- 小文件的弊端
- CombineTextInputFormat机制
- 自定义InputFormat
- 自定义outputFormat
- 多job串联
- 倒排索引
- 共同好友
- 串联
- 数据压缩
- InputFormat接口实现类
- yarn简介
- 推测执行算法
- 本地提交到yarn
- 框架运算全流程
- 数据倾斜问题
- mapreduce的优化方案
- HA机制
- 优化
- Hive
- 安装
- shell参数
- 数据类型
- 集合类型
- 数据库
- DDL操作
- 创建表
- 修改表
- 分区表
- 分桶表
- DML操作
- load
- insert
- select
- export,import
- Truncate
- 注意
- 严格模式
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transfrom实现
- having和where不同
- 压缩
- 存储
- 存储和压缩结合使用
- explain详解
- 调优
- Fetch抓取
- 本地模式
- 表的优化
- GroupBy
- count(Distinct)去重统计
- 行列过滤
- 动态分区调整
- 数据倾斜
- 并行执行
- JVM重用
- 推测执行
- reduce内存和个数
- sql查询结果作为变量(shell)
- youtube
- flume
- 简介
- 安装
- 常用组件
- 拦截器
- 案例
- 监听端口到控制台
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 单flume多channel,sink
- 自定义拦截器
- 高可用配置
- 使用注意
- 监控Ganglia
- sqoop
- 安装
- 常用命令
- 数据导入
- 准备数据
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 打包脚本
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- oozie
- 安装
- hbase
- 简介
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- 安装
- 命令行
- 基本CURD
- java api
- CURD
- CAS
- 过滤器查询
- 建表高级属性
- 与mapreduce结合
- 与sqoop结合
- 协处理器
- 参数配置优化
- 数据备份和恢复
- 节点管理
- 案例-点击流
- 简介
- HUE
- 安装
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 单词统计(接入kafka)
- 并行度和分组
- 启动流程分析
- ACK容错机制
- ACK简介
- BaseRichBolt简单使用
- BaseBasicBolt简单使用
- Ack工作机制
- 本地目录树
- zookeeper目录树
- 通信机制
- 案例
- 日志告警
- 工具
- YAPI
- chrome无法手动拖动安装插件
- 时间和空间复杂度
- jenkins
- 定位cpu 100%
- 常用脚本工具
- OOM问题定位
- scala
- 编译
- 基本语法
- 函数
- 数组常用方法
- 集合
- 并行集合
- 类
- 模式匹配
- 异常
- tuple元祖
- actor并发编程
- 柯里化
- 隐式转换
- 泛型
- 迭代器
- 流stream
- 视图view
- 控制抽象
- 注解
- spark
- 企业架构
- 安装
- api开发
- mycat
- Groovy
- 基础