[TOC]
# filter
## 1. filter作用
> Logstash过滤器根据规则提取数据
## 2. 过滤器种类
* [ ] date过滤器:获得时间格式的数据
* [ ] extractnumbers过滤器:从字符串中提取数字
* [ ] grok过滤器:使用grok格式提取数据中的指定内容,比如从一长串的日志中,提取出ip地址
### 2.1 date过滤器
> 用于解析事件中的时间数据,日期过滤器用于解析字段中的日期,然后使用该日期或时间戳作为事件的logstash时间戳。
> 默认情况下如果不加date插件的话,在kibana上创建索引模式的时候,选择的时间戳字段是logstash处理日志的时间,如果日志输出不是实时的,显示日志时间与对应的事件是不符合的(即es存储日志的时间字段是当前时间,不是log实际发生的时间)
~~~dart
match => ["timestamp", "ISO8601", "dd-MMM-yyyy HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss"]
默认target指向的就是@timestamp,意思就是如果timestamp匹配上后面自定义的日志格式,就会替换对应的@times
~~~
可配置的参数:
| 参数 | 作用 | 参数类型 |
| --- | ---| --- |
| locale | 指定用于日期解析的区域设置 |string |
| match| 如何匹配时间格式 |array |
| tag_on_failure | 匹配失败后追加的内容 | array |
| target | 匹配成功后的内容需要设置的目标字段 | string |
| timezone | 指定用于日期解析的时区规范ID |string |
配置
```
input {
redis {
key => "logstash-date"
host => "localhost"
password => "dailearn"
port => 6379
db => "0"
data_type => "list"
type => "date"
}
}
filter {
date {
match => [ "message", "yyyy-MM-dd HH:mm:ss" ]
locale => "Asia/Shanghai"
timezone => "Europe/Paris"
target => "messageDate"
}
}
output {
stdout { codec => rubydebug }
}
```
测试数据
2020-05-07 23:59:59
控制台输出
```
{
"@timestamp" => 2020-05-12T13:47:26.094Z,
"type" => "date",
"tags" => [
[0] "_jsonparsefailure"
],
"@version" => "1",
"message" => "2020-05-07 23:59:59",
"messageDate" => 2020-05-07T21:59:59.000Z
}
```
需要注意我是在2020-05-12 21:47:26发送的消息,而服务器使用的时区为UTC/GMT 0。所以这个时候我插入了2020-05-07 23:59:59的时间,被指定为Europe/Paris时区,所以最后被转为2020-05-07T21:59:59.000Z
### 2.2 extractnumbers过滤器
注意默认情况下次过滤器为捆绑需要执行bin/logstash-plugin install logstash-filter-extractnumbers操作安装插件
此过滤器自动提取字符串中找到的所有数字
可配置的参数
![](https://img.kancloud.cn/af/72/af728770dafcb72b8e461e04898733e3_964x82.png)
配置
```
input {
redis {
key => "logstash-extractnumbers"
host => "localhost"
password => "dailearn"
port => 6379
db => "0"
data_type => "list"
type => "extractnumbers"
}
}
filter {
extractnumbers {
source => "message"
target => "message2"
}
}
```
```
output {
stdout { codec => rubydebug }
}
```
测试数据
zhangSan5,age16 + 0.5 456 789
控制台输出
{
"float1" => 0.5,
"int2" => 789,
"int1" => 456,
"@timestamp" => 2020-05-17T03:51:23.695Z,
"@version" => "1",
"type" => "extractnumbers",
"message" => "zhangSan5,age16 + 0.5 456 789",
"tags" => [
[0] "_jsonparsefailure"
]
}
虽然文档中介绍,此过滤器会尝试提取出字符串中所有的字段,但是实际中部分和字母结合的字符串结构并没有被提取出来,而那些被字母和其他符号被包裹起来的数字没有被完整的提取出来,比如之前测试的时候使用的zhangSan5,age16 + 0.5,456[789],888这样的数据就没有任何内容被提取出来。
### 2.3 grok过滤器
> Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。
下面针对Apache日志来分割处理
![](https://img.kancloud.cn/59/b1/59b1b84967304c71fffbf14cd8ca475e_712x268.png)
下面是apache日志
![](https://img.kancloud.cn/0e/3a/0e3a07461bfed61b77d240fcc812bf71_710x64.png)
日志中每个字段之间空格隔开,分别对应message中的字段。
如:%{IPORHOST:addre} --> 192.168.10.197
但问题是IPORHOST又不是正则表达式,怎么能匹配IP地址呢?
因为IPPRHOST是grok表达式,它代表的正则表达式如下:
![](https://img.kancloud.cn/8d/d2/8dd2e54a5ac6864329a0805cf1516358_725x382.png)
IPORHOST代表的是ipv4或者ipv6或者HOSTNAME所匹配的grok表达式。
上面的IPORHOST有点复杂,我们来看看简单点的,如USER
```
USERNAME \[a-zA-Z0-9.\_-\]+
```
#USERNAME是匹配由字母,数字,“.”, "\_", "-"组成的任意字符
USER %{USERNAME}
#USER代表USERNAME的正则表达式
第一行,用普通的正则表达式来定义一个 grok 表达式;
第二行,通过打印赋值格式,用前面定义好的 grok 表达式来定义另一个 grok 表达式。
**grok的语法:**
%{syntax:semantic}
syntax代表的是正则表达式替代字段,semantic是代表这个表达式对应的字段名,你可以自由命名。这个命名尽量能简单易懂的表达出这个字段代表的意思。
logstash安装时就带有已经写好的正则表达式。路径如下:
/usr/local/logstash-2.3.4/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns
或者直接访问https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
上面IPORHOST,USER等都是在里面已经定义好的!当然还有其他的,基本能满足我们的需求
**日志匹配**
当我们拿到一段日志,按照上面的grok表达式一个个去匹配时,我们如何确定我们匹配的是否正确呢?
http://grokdebug.herokuapp.com/ 这个地址可以满足我们的测试需求。就拿上面apache的日志测试。
![](https://img.kancloud.cn/8c/fc/8cfc03fd858727ace5bdf38bb4669ede_942x290.png)
点击后就出现如下数据,你写的每个grok表达式都获取到值了。为了测试准确,可以多测试几条日志。
~~~
{
"addre": [
[
"192.168.10.97"
]
],
"HOSTNAME": [
[
"192.168.10.97",
"192.168.10.175"
]
...........中间省略多行...........
"http_referer": [
[
"http://192.168.10.175/"
]
],
"URIPROTO": [
[
"http"
]
],
"URIHOST": [
[
"192.168.10.175"
]
],
"IPORHOST": [
[
"192.168.10.175"
]
],
"User_Agent": [
[
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36"
]
]
}
~~~
每条日志总有些字段是没有数据显示,然后以“-”代替的。所有我们在匹配日志的时候也要判断。
如:(?:%{NUMBER:bytes}|-)
但是有些字符串是在太长,如:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36
我们可以使用%{GREEDYDATA browser}.
对应的grok表达式: GREEDYDATA .\*
#GREEDYDATA表达式的意思能匹配任意字符串
**自定义grok表达式**
如果你感觉logstash自带的grok表达式不能满足需要,你也可以自己定义
如:
![](https://img.kancloud.cn/49/df/49dfd520d0fa2f833bbde64d5725ee5c_705x261.png)
patterns\_dir为自定义的grok表达式的路径。
自定义的patterns中按照logstash自带的格式书写。
APACHE\_LOG %{IPORHOST:addre} %{USER:ident} %{USER:auth} \\\[%{HTTPDATE:timestamp}\\\] \\"%{WORD:http\_method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\\" %{NUMBER:status} (?:%{NUMBER:bytes}|-) \\"(?:%{URI:http\_referer}|-)\\" \\"%{GREEDYDATA:User\_Agent}\\"
我只是把apache日志匹配的grok表达式写入自定义文件中,简化conf文件。单个字段的正则表达式匹配你可以自己书写测试。
### 2.3 提取Java日志字段
一般来说,我们从filebeat或者其他地方拿到下面的日志:
~~~text
2018-04-13 16:03:49.822 INFO o.n.p.j.c.XXXXX - Star Calculator
~~~
然后想把它其中的一些信息提取出来再扔到es中存储,我们就需要grok match把日志信息切分成[索引数据](https://www.zhihu.com/search?q=%E7%B4%A2%E5%BC%95%E6%95%B0%E6%8D%AE&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D)(match本质是一个正则匹配)
[grok match](https://www.zhihu.com/search?q=grok+match&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D):
~~~json
match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{GREEDYDATA:log_content}"}
~~~
切出来的数据
~~~json
{
"log_date": [
[
"2018-04-13"
]
],
"log_localtime": [
[
"16:03:49.822"
]
],
"HOUR": [
[
"16"
]
],
"MINUTE": [
[
"03"
]
],
"SECOND": [
[
"49.822"
]
],
"log_type": [
[
"INFO"
]
],
"log_file": [
[
"o.n.p.j.c.XXXX"
]
],
"log_content": [
[
"Star Calculator"
]
]
}
~~~
上面所有切出来的field都是es中[mapping index](https://www.zhihu.com/search?q=mapping+index&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D),都可以在用来做条件查询.
[grokdebug.herokuapp.com](https://link.zhihu.com/?target=http%3A//grokdebug.herokuapp.com/)里面可以做测试.
[grokdebug.herokuapp.com/patterns](https://link.zhihu.com/?target=http%3A//grokdebug.herokuapp.com/patterns)所有可用的patterns都可以在这里查到.
现在我们在用的配置见/logstash/logstash-k8s.conf
Q: 需要指定mapping index的数据类型怎么办?
A: grok match本质是一个正则匹配,默认出来的数据都是String.有些时候我们知道某个值其实是个数据类型,这时候可以直接指定数据类型. 不过match中仅支持直接转换成int ,float,语法是 %{NUMBER:response\_time:int}
完整配置:
~~~json
match => {
"message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"}
~~~
Q:[索引文件](https://www.zhihu.com/search?q=%E7%B4%A2%E5%BC%95%E6%96%87%E4%BB%B6&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D)想需要按日期分别存放,怎么办?
A: out中指定index格式,如 index=> "k8s-%{+YYYY.MM.dd}"
完整out如下:
~~~json
output {
elasticsearch {
hosts => "${ES_URL}"
manage_template => false
index => "k8s-%{+YYYY.MM.dd}"
}
}
~~~
完整[logstash.conf](https://www.zhihu.com/search?q=logstash.conf&search_source=Entity&hybrid_search_source=Entity&hybrid_search_extra=%7B%22sourceType%22%3A%22article%22%2C%22sourceId%22%3A37128731%7D)
~~~text
input {
beats {
host => "0.0.0.0"
port => 5043
}
}
filter {
if [type] == "kube-logs" {
mutate {
rename => ["log", "message"]
}
date {
match => ["time", "ISO8601"]
remove_field => ["time"]
}
grok {
match => {
"source" => "/var/log/containers/%{DATA:pod_name}_%{DATA:namespace}_%{GREEDYDATA:container_name}-%{DATA:container_id}.log"}
match => {
"message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type} %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"}
remove_field => ["source"]
break_on_match => false
}
}
}
output {
elasticsearch {
hosts => "${ES_URL}"
manage_template => false
index => "k8s-%{+YYYY.MM.dd}"
}
}
~~~
- springcloud
- springcloud的作用
- springboot服务提供者和消费者
- Eureka
- ribbon
- Feign
- feign在微服务中的使用
- feign充当http请求工具
- Hystrix 熔断器
- Zuul 路由网关
- Spring Cloud Config 分布式配置中心
- config介绍与配置
- Spring Cloud Config 配置实战
- Spring Cloud Bus
- gateway
- 概念讲解
- 实例
- GateWay
- 统一日志追踪
- 分布式锁
- 1.redis
- springcloud Alibaba
- 1. Nacos
- 1.1 安装
- 1.2 特性
- 1.3 实例
- 1. 整合nacos服务发现
- 2. 整合nacos配置功能
- 1.4 生产部署方案
- 环境隔离
- 原理讲解
- 1. 服务发现
- 2. sentinel
- 3. Seata事务
- CAP理论
- 3.1 安装
- 分布式协议
- 4.熔断和降级
- springcloud与alibba
- oauth
- 1. abstract
- 2. oauth2 in micro-service
- 微服务框架付费
- SkyWalking
- 介绍与相关资料
- APM系统简单对比(zipkin,pinpoint和skywalking)
- server安装部署
- agent安装
- 日志清理
- 统一日志中心
- docker安装部署
- 安装部署
- elasticsearch 7.x
- logstash 7.x
- kibana 7.x
- ES索引管理
- 定时清理数据
- index Lifecycle Management
- 没数据排查思路
- ELK自身组件监控
- 多租户方案
- 慢查询sql
- 日志审计
- 开发
- 登录认证
- 链路追踪
- elk
- Filebeat
- Filebeat基础
- Filebeat安装部署
- 多行消息Multiline
- how Filebeat works
- Logstash
- 安装
- rpm安装
- docker安装Logstash
- grok调试
- Grok语法调试
- Grok常用表达式
- 配置中常见判断
- filter提取器
- elasticsearch
- 安装
- rpm安装
- docker安装es
- 使用
- 概念
- 基础
- 中文分词
- 统计
- 排序
- 倒排与正排索引
- 自定义dynamic
- 练习
- nested object
- 父子关系模型
- 高亮
- 搜索提示
- kibana
- 安装
- docker安装
- rpm安装
- 整合
- 收集日志
- 慢sql
- 日志审计s
- 云
- 分布式架构
- 分布式锁
- Redis实现
- redisson
- 熔断和降级