ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
LogStash管道通常有一个或多个输入,过滤,输出插件。本节中的场景通过Logstash配置文件来指定这些插件,并讨论了每个插件在做什么。 Logstash配置文件定义了Logstash 管道,当你使用-f <path/to/file>启动一个Logstash实例,其实使用了一个配置文件定义了管道实例。 一个Logstash管道有两个必备元素,输入和输出,一个可选元素,过滤器。input插件从一个源摄取数据,filter插件按照你指定的方式修改数据,output插件写出数据到一个目标数据库。 ![](https://box.kancloud.cn/2016-05-26_57465ebc058e6.png) 下面是一个管道配置的模板: ``` # #是注释,使用注释描述配置 input { } # 该部分被注释,表示filter是可选的 # filter { # # } output { } ``` 这个模板是不具备功能的,因为输入和输出部分没有定义的任何有效的选项。 将模板粘贴到你的Logstash根目录,命名为first-pipeline.conf。 ## 解析Apache日志输出到Elasticsearch 这个例子创建一个Logstash管道,使用Apache web logs 作为输入,解析这些日志到特定的命名字段,并且输出这些解析数据到ElashticSearch集群 你可以下载样例数据,[这里](https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz) ### 配置Logstash用于文件输入 启动Logstash管道,使用file input插件配置Logstash实例。 修改first-pipeline.conf 添加如下内容: ``` input { file { path => "/path/to/logstash-tutorial.log" start_position => beginning ignore_older => 0 } } ``` * file input 插件的默认行为是监视文件中的末尾新内容,和Unix中的tail -f 命令异曲同工。改变这种默认行为的方式是指定Logstash从文件的什么位置开始处理文件。 * 文件输入插件的默认行为是忽略最后时间距今超过86400秒的文件。要更改这种默认行为,并处理tutorial file(日期距今大于1天的文件),我们需要指定不能忽视旧文件。 用实际的logstash-tutorial.log日志文件目录替换 /path/to/。 ### 用Grok 过滤器插件解析web logs gork 过滤器插件是Logstash的几个默认可用插件之一。关于如何管理Logstash 插件的更多细节请参考:[reference documentation](https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html) grok 过滤器插件在输入日志中查找指定的模式,配置文件中需要指定哪些模式的识别是你的案例所感兴趣的。一个代表性的web server 日志文件行信息,如下: ``` 83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36" ``` 行首的IP地址信息很容易被识别,括号中的时间戳也是如此。在此教程中,使用```%{COMBINEDAPACHELOG} grok``` 模式,它的行信息结构有如下的一些字段。 |Information|Field Name| |-|-| |IP Address|clientip| |User ID|ident| |User Authentication|auth| |timestamp|timestamp| |HTTP Verb|verb| |Request body|request| |HTTP Version|httpversion| |HTTP Status Code|response| |Bytes served|bytes| |Referrer URL|referrer| |User agent|agent| 编写 first-pipeline.conf添加如下的内容: ``` filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } } ``` 过滤器处理之后,一个行信息是如下的JSON的结构: ``` { "clientip" : "83.149.9.216", "ident" : , "auth" : , "timestamp" : "04/Jan/2015:05:13:42 +0000", "verb" : "GET", "request" : "/presentations/logstash-monitorama-2013/images/kibana-search.png", "httpversion" : "HTTP/1.1", "response" : "200", "bytes" : "203023", "referrer" : "http://semicomplete.com/presentations/logstash-monitorama-2013/", "agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36" } ``` ### 索引数据到ElasticSearch集群 Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file to add the following text after the input section: 现在,网络日志被分解成具体的字段, 使用Logstash管道可以将数据索引到Elasticsearch集群。编辑first-pipeline.conf 文件,在input 段之后添加以下文字: ``` output { elasticsearch { } } ``` 用这样的配置, Logstash使用http协议连接到Elasticsearch 。上面的例子假设Logstash和Elasticsearch运行在同一实例中。您可以使用主机配置像```hosts => "es-machine:9092"```指定远程Elasticsearch实例。 ### 使用geoip插件丰富你的数据内容 除了解析日志数据,filter插件可以从现有数据基础上补充信息。例如: geoip插件查找IP地址,从而导出地址地理位置信息,并将该位置信息发送给日志。 使用geoip filter插件配置您的Logstash,如:通过添加以下行到first-pipeline.conf文件的filter部分: ``` geoip { source => "clientip" } ``` 插件geoip的配置需要的数据已经被定义为独立的字段中的数据。确保在配置文件中,geoip 配置段在grok配置段之后。 需要指定包含IP地址的字段的名称。在本例中,字段名称是clientip(grok插件字段) 。 ### 测试一下 此时此刻,你的first-pipeline.conf文件有input,filter和output段,正确的配置看起来是这样的: ``` input { file { path => "/Users/palecur/logstash-1.5.2/logstash-tutorial-dataset" start_position => beginning } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } geoip { source => "clientip" } } output { elasticsearch {} stdout {} } ``` 测试配置语法的正确性,使用下面的命令: ``` bin/logstash -f first-pipeline.conf --configtest ``` 使用--configtest 选项解析你的配置文件并报告错误.当配置文件通过检查之后,用如下命令启动Logstash: ``` bin/logstash -f first-pipeline.conf ``` 基于gork filter插件的字段索引,在Elasticsearch运行一个测试查询, ``` curl -XGET 'localhost:9200/logstash-$DATE/_search?q=response=200' ``` 使用当前日期替换$DATE(YYYY.MM.DD格式).因为我们的例子只有一条200 HTTP response,所以查询命中一条结果: ``` {"took":2, "timed_out":false, "_shards":{"total":5, "successful":5, "failed":0}, "hits":{"total":1, "max_score":1.5351382, "hits":[{"_index":"logstash-2015.07.30", "_type":"logs", "_id":"AU7gqOky1um3U6ZomFaF", "_score":1.5351382, "_source":{"message":"83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] \"GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1\" 200 52878 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"", "@version":"1", "@timestamp":"2015-07-30T20:30:41.265Z", "host":"localhost", "path":"/path/to/logstash-tutorial-dataset", "clientip":"83.149.9.216", "ident":"-", "auth":"-", "timestamp":"04/Jan/2015:05:13:45 +0000", "verb":"GET", "request":"/presentations/logstash-monitorama-2013/images/frontend-response-codes.png", "httpversion":"1.1", "response":"200", "bytes":"52878", "referrer":"\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"", "agent":"\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"" } }] } } ``` 尝试一个基于ip地址的地理信息查询: ``` curl -XGET 'localhost:9200/logstash-$DATE/_search?q=geoip.city_name=Buffalo' ``` 使用当前日期替换$DATE(YYYY.MM.DD格式). 只有一条log记录是来自于城市Buffalo,所以查询的结果是: ``` {"took":3, "timed_out":false, "_shards":{ "total":5, "successful":5, "failed":0}, "hits":{"total":1, "max_score":1.03399, "hits":[{"_index":"logstash-2015.07.31", "_type":"logs", "_id":"AU7mK3CVSiMeBsJ0b_EP", "_score":1.03399, "_source":{ "message":"108.174.55.234 - - [04/Jan/2015:05:27:45 +0000] \"GET /?flav=rss20 HTTP/1.1\" 200 29941 \"-\" \"-\"", "@version":"1", "@timestamp":"2015-07-31T22:11:22.347Z", "host":"localhost", "path":"/path/to/logstash-tutorial-dataset", "clientip":"108.174.55.234", "ident":"-", "auth":"-", "timestamp":"04/Jan/2015:05:27:45 +0000", "verb":"GET", "request":"/?flav=rss20", "httpversion":"1.1", "response":"200", "bytes":"29941", "referrer":"\"-\"", "agent":"\"-\"", "geoip":{ "ip":"108.174.55.234", "country_code2":"US", "country_code3":"USA", "country_name":"United States", "continent_code":"NA", "region_name":"NY", "city_name":"Buffalo", "postal_code":"14221", "latitude":42.9864, "longitude":-78.7279, "dma_code":514, "area_code":716, "timezone":"America/New_York", "real_region_name":"New York", "location":[-78.7279,42.9864] } } }] } } ``` ## 使用多个输入和输出插件 你需要管理的信息通常来自于不同的源,你的案例也可能将数据送到多个目标。Logstash 管道需要使用多个输出或者输出插件来满足这些需求。 这个例子创建一个Logstash 管道来从Twitter feed 和Filebeat client获取输入信息,然后将这些信息送到Elasticsearch集群,同时写入一个文件. ### 从Twitter feed读数据 要添加一个Twitter feed,你需要一系列的条件: * 一个consumer key :能唯一标识你的Twitter app(在这个例子中是Logstash) * 一个consumer secret, 作为你的Twitter app的密码 * 一个或多个 keywords用于查询 feed. * 一个oauth token,标志使用这个app的Twitter账号 * 一个 oauth token secret, 作为 Twitter 账号密码. 访问 https://dev.twitter.com/apps 注册一个Twitter 账号 并且生成你的 consumer key 和 secret, 还有你的 OAuth token 和 secret. 把如下的内容追加到first-pipeline.conf文件的input段: ``` twitter { consumer_key => consumer_secret => keywords => oauth_token => oauth_token_secret => } ``` ### Filebeat Client filebeat 客户端是一个轻量级的资源友好的文件服务端日志收集工具,并且把这些日志送到Logstash实例来处理。filebeat 客户端使用安全Beats协议与Logstash实例通信。lumberjack协议被设计成可靠的低延迟的.Filebeat 使用托管源数据的计算机的计算资源,并且Beats input插件减少了 Logstash实例的资源需求. >注意 一个典型的用法是,Filebeat运行在,与Logstash实例分开的一个单独的计算机上.为了教学方便,我们将Logstash 和 Filebeat运行在同一个计算机上. 默认的Logstash配置包含Beats input插件,它被设计成资源友好的.安装Filebeat在数据源所在的机器,从[Filebeat产品页](https://www.elastic.co/downloads/beats/filebeat)下载合适的包. 创建一个和这个例子类似的配置文件: ``` filebeat: prospectors: - paths: - "/path/to/sample-log" fields: type: syslog output: logstash: hosts: ["localhost:5043"] tls: certificate: /path/to/ssl-certificate.crt certificate_key: /path/to/ssl-certificate.key certificate_authorities: /path/to/ssl-certificate.crt timeout: 15 ``` * Filebeat 处理的一个或多个文件的路径 * Logstash实例的SSL证书路径 以filebeat.yml为名,保存配置文件 配置你的Logstash实例,加入Filebeat input插件.将如下内容添加到first-pipeline.conf文件的input段中. ``` beats { port => "5043" ssl => true ssl_certificate => "/path/to/ssl-cert" ssl_key => "/path/to/ssl-key" } ``` * Logstash实例验证自己到Filebeat的SSL证书的路径。 * SSL证书的key的保存路径。 ### 将Logstash数据写入一个文件 可以通过 file output插件配置让Logstash管道将数据直接写入一个文件. 将如下内容添加到first-pipeline.conf文件的output段,完成Logstash实例配置: ``` file { path => /path/to/target/file } ``` ### 输出数据到多个 Elasticsearch节点 输出数据到多个 Elasticsearch节点,可以减轻某个给定节点的资源需求.当某一个节点挂掉的时候,提供了数据记录输入Elasticsearch的更多选择. 修改你的first-pipeline.conf文件的output段,这样就可以让你的Logstash实例将数据写入到多个Elasticsearch节点. ``` output { elasticsearch { hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] } } ``` host参数要使用三个非master的Elasticsearch集群节点ip.当host参数列出多个ip地址时,Logstash负载均衡请求到列表中的地址.另外Elasticsearch的默认端口是9200,在上面的配置中可以省略. ### 测试管道 此时,你的 first-pipeline.conf 文件应该是下面的这样: ``` input { twitter { consumer_key => consumer_secret => keywords => oauth_token => oauth_token_secret => } beats { port => "5043" ssl => true ssl_certificate => "/path/to/ssl-cert" ssl_key => "/path/to/ssl-key" } } output { elasticsearch { hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] } file { path => /path/to/target/file } } ``` Logstash从你配置的Twitter feed消费数据, 从 Filebeat获取数据, 并且将这些数据索引到Elasticsearch的三个节点,并且同时将数据写入一个文件. 在数据源节点上,使用如下命令启动Filebeat: ``` sudo ./filebeat -e -c filebeat.yml -d "publish" ``` Filebeat尝试连接5403端口,知道你的Logstash的Beats 插件生效,这个端口都没有响应,所以现在可以将任何的连接异常看作是正常的. 校验你的配置,使用如下命令: ``` bin/logstash -f first-pipeline.conf --configtest ``` 使用--configtest 选项解析你的配置文件并报告错误.当配置文件通过检查之后,用如下命令启动Logstash: ``` bin/logstash -f first-pipeline.conf ``` 使用grep命令在你的目标文件中查找,验证Mozilla信息的存在. ``` grep Mozilla /path/to/target/file ``` 在你的Elasticsearch集群中运行Elasticsearch查询,验证同样的信息也存在! ``` curl -XGET 'localhost:9200/logstash-2015.07.30/_search?q=agent=Mozilla' ```