LogStash管道通常有一个或多个输入,过滤,输出插件。本节中的场景通过Logstash配置文件来指定这些插件,并讨论了每个插件在做什么。
Logstash配置文件定义了Logstash 管道,当你使用-f <path/to/file>启动一个Logstash实例,其实使用了一个配置文件定义了管道实例。
一个Logstash管道有两个必备元素,输入和输出,一个可选元素,过滤器。input插件从一个源摄取数据,filter插件按照你指定的方式修改数据,output插件写出数据到一个目标数据库。
![](https://box.kancloud.cn/2016-05-26_57465ebc058e6.png)
下面是一个管道配置的模板:
```
# #是注释,使用注释描述配置
input {
}
# 该部分被注释,表示filter是可选的
# filter {
#
# }
output {
}
```
这个模板是不具备功能的,因为输入和输出部分没有定义的任何有效的选项。
将模板粘贴到你的Logstash根目录,命名为first-pipeline.conf。
## 解析Apache日志输出到Elasticsearch
这个例子创建一个Logstash管道,使用Apache web logs 作为输入,解析这些日志到特定的命名字段,并且输出这些解析数据到ElashticSearch集群
你可以下载样例数据,[这里](https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz)
### 配置Logstash用于文件输入
启动Logstash管道,使用file input插件配置Logstash实例。
修改first-pipeline.conf 添加如下内容:
```
input {
file {
path => "/path/to/logstash-tutorial.log"
start_position => beginning
ignore_older => 0
}
}
```
* file input 插件的默认行为是监视文件中的末尾新内容,和Unix中的tail -f 命令异曲同工。改变这种默认行为的方式是指定Logstash从文件的什么位置开始处理文件。
* 文件输入插件的默认行为是忽略最后时间距今超过86400秒的文件。要更改这种默认行为,并处理tutorial file(日期距今大于1天的文件),我们需要指定不能忽视旧文件。
用实际的logstash-tutorial.log日志文件目录替换 /path/to/。
### 用Grok 过滤器插件解析web logs
gork 过滤器插件是Logstash的几个默认可用插件之一。关于如何管理Logstash 插件的更多细节请参考:[reference documentation](https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html)
grok 过滤器插件在输入日志中查找指定的模式,配置文件中需要指定哪些模式的识别是你的案例所感兴趣的。一个代表性的web server 日志文件行信息,如下:
```
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
```
行首的IP地址信息很容易被识别,括号中的时间戳也是如此。在此教程中,使用```%{COMBINEDAPACHELOG} grok``` 模式,它的行信息结构有如下的一些字段。
|Information|Field Name|
|-|-|
|IP Address|clientip|
|User ID|ident|
|User Authentication|auth|
|timestamp|timestamp|
|HTTP Verb|verb|
|Request body|request|
|HTTP Version|httpversion|
|HTTP Status Code|response|
|Bytes served|bytes|
|Referrer URL|referrer|
|User agent|agent|
编写 first-pipeline.conf添加如下的内容:
```
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}
```
过滤器处理之后,一个行信息是如下的JSON的结构:
```
{
"clientip" : "83.149.9.216",
"ident" : ,
"auth" : ,
"timestamp" : "04/Jan/2015:05:13:42 +0000",
"verb" : "GET",
"request" : "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"httpversion" : "HTTP/1.1",
"response" : "200",
"bytes" : "203023",
"referrer" : "http://semicomplete.com/presentations/logstash-monitorama-2013/",
"agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
}
```
### 索引数据到ElasticSearch集群
Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file to add the following text after the input section:
现在,网络日志被分解成具体的字段, 使用Logstash管道可以将数据索引到Elasticsearch集群。编辑first-pipeline.conf 文件,在input 段之后添加以下文字:
```
output {
elasticsearch {
}
}
```
用这样的配置, Logstash使用http协议连接到Elasticsearch 。上面的例子假设Logstash和Elasticsearch运行在同一实例中。您可以使用主机配置像```hosts => "es-machine:9092"```指定远程Elasticsearch实例。
### 使用geoip插件丰富你的数据内容
除了解析日志数据,filter插件可以从现有数据基础上补充信息。例如: geoip插件查找IP地址,从而导出地址地理位置信息,并将该位置信息发送给日志。
使用geoip filter插件配置您的Logstash,如:通过添加以下行到first-pipeline.conf文件的filter部分:
```
geoip {
source => "clientip"
}
```
插件geoip的配置需要的数据已经被定义为独立的字段中的数据。确保在配置文件中,geoip 配置段在grok配置段之后。
需要指定包含IP地址的字段的名称。在本例中,字段名称是clientip(grok插件字段) 。
### 测试一下
此时此刻,你的first-pipeline.conf文件有input,filter和output段,正确的配置看起来是这样的:
```
input {
file {
path => "/Users/palecur/logstash-1.5.2/logstash-tutorial-dataset"
start_position => beginning
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {}
stdout {}
}
```
测试配置语法的正确性,使用下面的命令:
```
bin/logstash -f first-pipeline.conf --configtest
```
使用--configtest 选项解析你的配置文件并报告错误.当配置文件通过检查之后,用如下命令启动Logstash:
```
bin/logstash -f first-pipeline.conf
```
基于gork filter插件的字段索引,在Elasticsearch运行一个测试查询,
```
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=response=200'
```
使用当前日期替换$DATE(YYYY.MM.DD格式).因为我们的例子只有一条200 HTTP response,所以查询命中一条结果:
```
{"took":2,
"timed_out":false,
"_shards":{"total":5,
"successful":5,
"failed":0},
"hits":{"total":1,
"max_score":1.5351382,
"hits":[{"_index":"logstash-2015.07.30",
"_type":"logs",
"_id":"AU7gqOky1um3U6ZomFaF",
"_score":1.5351382,
"_source":{"message":"83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] \"GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1\" 200 52878 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
"@version":"1",
"@timestamp":"2015-07-30T20:30:41.265Z",
"host":"localhost",
"path":"/path/to/logstash-tutorial-dataset",
"clientip":"83.149.9.216",
"ident":"-",
"auth":"-",
"timestamp":"04/Jan/2015:05:13:45 +0000",
"verb":"GET",
"request":"/presentations/logstash-monitorama-2013/images/frontend-response-codes.png",
"httpversion":"1.1",
"response":"200",
"bytes":"52878",
"referrer":"\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
"agent":"\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
}
}]
}
}
```
尝试一个基于ip地址的地理信息查询:
```
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=geoip.city_name=Buffalo'
```
使用当前日期替换$DATE(YYYY.MM.DD格式).
只有一条log记录是来自于城市Buffalo,所以查询的结果是:
```
{"took":3,
"timed_out":false,
"_shards":{
"total":5,
"successful":5,
"failed":0},
"hits":{"total":1,
"max_score":1.03399,
"hits":[{"_index":"logstash-2015.07.31",
"_type":"logs",
"_id":"AU7mK3CVSiMeBsJ0b_EP",
"_score":1.03399,
"_source":{
"message":"108.174.55.234 - - [04/Jan/2015:05:27:45 +0000] \"GET /?flav=rss20 HTTP/1.1\" 200 29941 \"-\" \"-\"",
"@version":"1",
"@timestamp":"2015-07-31T22:11:22.347Z",
"host":"localhost",
"path":"/path/to/logstash-tutorial-dataset",
"clientip":"108.174.55.234",
"ident":"-",
"auth":"-",
"timestamp":"04/Jan/2015:05:27:45 +0000",
"verb":"GET",
"request":"/?flav=rss20",
"httpversion":"1.1",
"response":"200",
"bytes":"29941",
"referrer":"\"-\"",
"agent":"\"-\"",
"geoip":{
"ip":"108.174.55.234",
"country_code2":"US",
"country_code3":"USA",
"country_name":"United States",
"continent_code":"NA",
"region_name":"NY",
"city_name":"Buffalo",
"postal_code":"14221",
"latitude":42.9864,
"longitude":-78.7279,
"dma_code":514,
"area_code":716,
"timezone":"America/New_York",
"real_region_name":"New York",
"location":[-78.7279,42.9864]
}
}
}]
}
}
```
## 使用多个输入和输出插件
你需要管理的信息通常来自于不同的源,你的案例也可能将数据送到多个目标。Logstash 管道需要使用多个输出或者输出插件来满足这些需求。
这个例子创建一个Logstash 管道来从Twitter feed 和Filebeat client获取输入信息,然后将这些信息送到Elasticsearch集群,同时写入一个文件.
### 从Twitter feed读数据
要添加一个Twitter feed,你需要一系列的条件:
* 一个consumer key :能唯一标识你的Twitter app(在这个例子中是Logstash)
* 一个consumer secret, 作为你的Twitter app的密码
* 一个或多个 keywords用于查询 feed.
* 一个oauth token,标志使用这个app的Twitter账号
* 一个 oauth token secret, 作为 Twitter 账号密码.
访问 https://dev.twitter.com/apps 注册一个Twitter 账号 并且生成你的 consumer key 和 secret, 还有你的 OAuth token 和 secret.
把如下的内容追加到first-pipeline.conf文件的input段:
```
twitter {
consumer_key =>
consumer_secret =>
keywords =>
oauth_token =>
oauth_token_secret =>
}
```
### Filebeat Client
filebeat 客户端是一个轻量级的资源友好的文件服务端日志收集工具,并且把这些日志送到Logstash实例来处理。filebeat 客户端使用安全Beats协议与Logstash实例通信。lumberjack协议被设计成可靠的低延迟的.Filebeat 使用托管源数据的计算机的计算资源,并且Beats input插件减少了 Logstash实例的资源需求.
>注意
一个典型的用法是,Filebeat运行在,与Logstash实例分开的一个单独的计算机上.为了教学方便,我们将Logstash 和 Filebeat运行在同一个计算机上.
默认的Logstash配置包含Beats input插件,它被设计成资源友好的.安装Filebeat在数据源所在的机器,从[Filebeat产品页](https://www.elastic.co/downloads/beats/filebeat)下载合适的包.
创建一个和这个例子类似的配置文件:
```
filebeat:
prospectors:
-
paths:
- "/path/to/sample-log"
fields:
type: syslog
output:
logstash:
hosts: ["localhost:5043"]
tls:
certificate: /path/to/ssl-certificate.crt
certificate_key: /path/to/ssl-certificate.key
certificate_authorities: /path/to/ssl-certificate.crt
timeout: 15
```
* Filebeat 处理的一个或多个文件的路径
* Logstash实例的SSL证书路径
以filebeat.yml为名,保存配置文件
配置你的Logstash实例,加入Filebeat input插件.将如下内容添加到first-pipeline.conf文件的input段中.
```
beats {
port => "5043"
ssl => true
ssl_certificate => "/path/to/ssl-cert"
ssl_key => "/path/to/ssl-key"
}
```
* Logstash实例验证自己到Filebeat的SSL证书的路径。
* SSL证书的key的保存路径。
### 将Logstash数据写入一个文件
可以通过 file output插件配置让Logstash管道将数据直接写入一个文件.
将如下内容添加到first-pipeline.conf文件的output段,完成Logstash实例配置:
```
file {
path => /path/to/target/file
}
```
### 输出数据到多个 Elasticsearch节点
输出数据到多个 Elasticsearch节点,可以减轻某个给定节点的资源需求.当某一个节点挂掉的时候,提供了数据记录输入Elasticsearch的更多选择.
修改你的first-pipeline.conf文件的output段,这样就可以让你的Logstash实例将数据写入到多个Elasticsearch节点.
```
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
}
```
host参数要使用三个非master的Elasticsearch集群节点ip.当host参数列出多个ip地址时,Logstash负载均衡请求到列表中的地址.另外Elasticsearch的默认端口是9200,在上面的配置中可以省略.
### 测试管道
此时,你的 first-pipeline.conf 文件应该是下面的这样:
```
input {
twitter {
consumer_key =>
consumer_secret =>
keywords =>
oauth_token =>
oauth_token_secret =>
}
beats {
port => "5043"
ssl => true
ssl_certificate => "/path/to/ssl-cert"
ssl_key => "/path/to/ssl-key"
}
}
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
file {
path => /path/to/target/file
}
}
```
Logstash从你配置的Twitter feed消费数据, 从 Filebeat获取数据, 并且将这些数据索引到Elasticsearch的三个节点,并且同时将数据写入一个文件.
在数据源节点上,使用如下命令启动Filebeat:
```
sudo ./filebeat -e -c filebeat.yml -d "publish"
```
Filebeat尝试连接5403端口,知道你的Logstash的Beats 插件生效,这个端口都没有响应,所以现在可以将任何的连接异常看作是正常的.
校验你的配置,使用如下命令:
```
bin/logstash -f first-pipeline.conf --configtest
```
使用--configtest 选项解析你的配置文件并报告错误.当配置文件通过检查之后,用如下命令启动Logstash:
```
bin/logstash -f first-pipeline.conf
```
使用grep命令在你的目标文件中查找,验证Mozilla信息的存在.
```
grep Mozilla /path/to/target/file
```
在你的Elasticsearch集群中运行Elasticsearch查询,验证同样的信息也存在!
```
curl -XGET 'localhost:9200/logstash-2015.07.30/_search?q=agent=Mozilla'
```