[TOC]
# failover故障转移
在完成单点的Flume NG搭建后,下面我们搭建一个高可用的Flume NG集群,架构图如下所示:
![](https://box.kancloud.cn/e6600e5a18d2821c4f2083c93eef0192_1001x748.png)
## 节点分配
Flume的Agent和Collector分布如下表所示
| 名称 | ip地址 | Host | 角色 |
| --- | --- | --- | --- |
| Agent1 | 1992.168.200.101 | it1 | webserver |
| collector1 | 192.168.200.102 | it2 | AgentMstr1 |
| collector2 | 192.168.200.103 | it3 | AgentMstr2 |
Agent1数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。下面我们开发配置Flume NG集群
## 配置
在下面单点Flume中,基本配置都完成了,我们只需要新添加两个配置文件,它们是flume-client.conf和flume-server.conf,其配置内容如下所示:
这边it1,it2,it3是不同机器的hostname
1. it1上的flume-client.conf配置
~~~
#agent1 name
agent1.channels = c1
agent1.sources = r1
# sinks可以配置多个
agent1.sinks = k1 k2
#设置组别为g1
agent1.sinkgroups = g1
#把g1和k1,k2这2个组装起来
agent1.sinkgroups.g1.sinks = k1 k2
#设置channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/log/test.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = it2
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = it3
agent1.sinks.k2.port = 52020
#设置failover故障转移
agent1.sinkgroups.g1.processor.type = failover
# 配置优先级,优先传输到k1
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 5
# 将故障的sink剔除的时间,出了故障不会立刻剔除
agent1.sinkgroups.g1.processor.maxpenalty = 10000
#这里首先要申明一个sinkgroups,然后再设置2个sink ,k1与k2,其中2个优先级是10和5,
#而processor的maxpenalty被设置为10秒,默认是30秒
~~~
启动命令:
~~~
flume-ng agent -n agent1 -c conf -f /root/flume/conf/flume-client.conf -Dflume.root.logger=DEBUG,console
~~~
2. it2和it3上的flume-server.conf配置
~~~
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node,nna to nns
a1.sources.r1.type = avro
# 任意一台服务器都可以连接
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 52020
a1.sources.r1.channels = c1
# 配置拦截器
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader=hostname
#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/data/flume/logs/%{hostname}
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
~~~
启动命令:
~~~
flume-ng agent -n a1 -c conf -f /root/flume/conf/flume-server.conf -Dflume.root.logger=DEBUG,console
~~~
## 测试failover
1. 先在it2和it3上启动脚本
~~~
flume-ng agent -n a1 -c conf -f conf/flume-server.conf
-Dflume.root.logger=DEBUG,console
~~~
2. 然后启动it1上的脚本
~~~
flume-ng agent -n agent1 -c conf -f conf/flume-client.conf
-Dflume.root.logger=DEBUG,console
~~~
3. Shell脚本生成数据
~~~
while true;do date >> test.log; sleep 1s ;done
~~~
# load balance负载均衡
1. 节点分配
如failover故障转移的节点分配
2. 配置
在failover故障转移的配置上稍作修改
it1上的flume-client-loadbalance.conf配置
~~~
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/log/test.log
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = it2
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = it3
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#设置负载均衡
agent1.sinkgroups.g1.processor.type = load_balance
# 默认是round_robin轮询,还可以选择random随机
agent1.sinkgroups.g1.processor.selector = round_robin
#如果backoff被开启,则 sink processor会屏蔽故障的sink
# 开启,表示如果有某个slink发生故障,他就不会再继续发送,会把他剔除
agent1.sinkgroups.g1.processor.backoff = true
~~~
it2和it3上的flume-server-loadbalance.conf配置
~~~
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 52020
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader=hostname
a1.sources.r1.interceptors.i2.useIP=false
#set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/data/flume/loadbalance/%{hostname}
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
~~~
## 测试load balance
1. 先在it2和it3上启动脚本
~~~
bin/flume-ng agent -n a1 -c conf -f conf/flume-server-loadbalance.conf
-Dflume.root.logger=DEBUG,console
~~~
2. 然后启动itcast01上的脚本
~~~
bin/flume-ng agent -n agent1 -c conf -f conf/flume-client-loadbalance.conf
-Dflume.root.logger=DEBUG,console
~~~
3. Shell脚本生成数据
~~~
while true;do date >> test.log; sleep 1s ;done
~~~
4. 观察HDFS上生成的数据目录,由于轮训机制都会收集到数据
5. it2上的agent被干掉之后,it2上不在产生数据
6. it2上的agent重新启动后,两者都可以接受到数据
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介