企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 介绍 拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器 # flume内置的拦截器 ## 时间戳拦截器 flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中。如果不使用任何拦截器,flume接受到的只有message。时间戳拦截器的配置: ![](https://box.kancloud.cn/464464c9432ad35799f6d69cb8061d9c_704x328.png) source连接到时间戳拦截器的配置: ~~~ a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false ~~~ ## 主机拦截器 主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。事件报头中的key使用hostHeader配置,默认是host。主机拦截器的配置: ![](https://box.kancloud.cn/4127ba92df55a4766518629cb8bb0b15_701x406.png) source连接到主机拦截器的配置: ~~~ a1.sources.r1.interceptors=i2 a1.sources.r1.interceptors.i2.type=host a1.sources.r1.interceptors.i2.useIP=false a1.sources.r1.interceptors.i2.preserveExisting=false ~~~ ## 静态拦截器 静态拦截器的作用是将k/v插入到事件的报头中。配置如下 ![](https://box.kancloud.cn/92fe18b32ef5d3f46330bc70eab86de6_694x409.png) source连接到静态拦截器的配置: ~~~ a1.sources.r1.interceptors= i3 a1.sources.r1.interceptors.static.type=static # 自己指定k/v a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false ~~~ ## 正则过滤拦截器 在日志采集的时候,可能有一些数据是我们不需要的,这样添加过滤拦截器,可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。配置如下 ![](https://box.kancloud.cn/4e7443fb526ccfd257480f33f3c8c17e_683x403.png) source连接到正则过滤拦截器的配置: ~~~ a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER # 保留内容中出现rm或者kill的字符串的记录 a1.sources.r1.interceptors.i4.regex=(rm)|(kill) a1.sources.r1.interceptors.i4.excludeEvents=false ~~~ 这样配置的拦截器就只会接收日志消息中带有rm 或者kill的日志。 测试案例: test_regex.conf ~~~ # 定义这个agent中各组件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source组件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = itcast01 a1.sources.r1.port = 44444 a1.sources.r1. a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER #保留内容中出现hadoop或者是spark的字符串的记录 a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark) a1.sources.r1.interceptors.i4.excludeEvents=false # 描述和配置sink组件:k1 a1.sinks.k1.type = logger # 描述和配置channel组件,此处使用是内存缓存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 描述和配置source channel sink之间的连接关系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ~~~