多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
## 一、下载软件

[https://www.elastic.co/cn/downloads/logstash](https://www.elastic.co/cn/downloads/logstash)

## 二、安装部署

### 2.1. 准备patterns文件

到logstash的目录下,创建`patterns`目录

~~~
mkdir patterns
~~~

在`patterns`目录里新建一个`java`的patterns文件

![](https://img.kancloud.cn/b0/8f/b08fe45b9c545567b2a60d6be73fd1a0_723x108.png)

内容如下:

~~~
# user-center
MYAPPNAME [0-9a-zA-Z_-]*
# RMI TCP Connection(2)-127.0.0.1
MYTHREADNAME ([0-9a-zA-Z._-]|\(|\)|\s)*
~~~

> 就是一个名字叫做**java**的文件,不需要文件后缀

### 2.2. 创建配置文件

到logstash的config目录下创建`logstash.conf`文件

> 需要修改以下地方
>
> 1. filter 块中的`patterns_dir`路径
> 2. output 块中的密码

**vim logstash.conf**

~~~
# Pipeline: receive events from Beats on port 5044, parse them according to
# [fields][docType], and write each document type to its own daily index.
input {
  beats {
    port => 5044
  }
}

filter {
  # Classic syslog lines (events whose [type] is "syslog").
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }

  # Application logs:
  # [app:ip:port] time level pid [traceId,spanId,parentId] [thread] class message
  if [fields][docType] == "sys-log" {
    grok {
      # Directory holding the custom MYAPPNAME / MYTHREADNAME patterns.
      patterns_dir => ["/opt/logstash-7.6.1/patterns"]
      match => { "message" => "\[%{NOTSPACE:appName}:%{IP:serverIp}:%{NOTSPACE:serverPort}\] %{TIMESTAMP_ISO8601:logTime} %{LOGLEVEL:logLevel} %{WORD:pid} \[%{MYAPPNAME:traceId},%{MYAPPNAME:spanId},%{MYAPPNAME:parentId}\] \[%{MYTHREADNAME:threadName}\] %{NOTSPACE:classname} %{GREEDYDATA:message}" }
      # Replace the raw line with the parsed message text.
      overwrite => ["message"]
    }
    # A former extra date filter using "yyyy-MM-dd HH:mm:ss.SSS Z" was removed:
    # TIMESTAMP_ISO8601 never captures a space before the zone offset, so that
    # format could not match and only tagged every event with
    # _dateparsefailure. @timestamp is left untouched, exactly as before.
    date {
      match => ["logTime","yyyy-MM-dd HH:mm:ss.SSS"]
      target => "timestamp"
      locale => "en"
      timezone => "+08:00"
    }
    mutate {
      remove_field => "logTime"
      remove_field => "@version"
      remove_field => "host"
      remove_field => "offset"
    }
  }

  # Tracking-point logs: time|appName|resouceid|type|k1=v1&k2=v2...
  if [fields][docType] == "point-log" {
    grok {
      patterns_dir => ["/opt/logstash-7.6.1/patterns"]
      match => {
        "message" => "%{TIMESTAMP_ISO8601:logTime}\|%{MYAPPNAME:appName}\|%{WORD:resouceid}\|%{MYAPPNAME:type}\|%{GREEDYDATA:object}"
      }
    }
    # Expand the trailing "k=v&k=v" payload into individual fields.
    kv {
      source => "object"
      field_split => "&"
      value_split => "="
    }
    # As in the sys-log branch, the never-matching "... SSS Z" date filter was
    # removed; only the "timestamp" field is derived from logTime.
    date {
      match => ["logTime","yyyy-MM-dd HH:mm:ss.SSS"]
      target => "timestamp"
      locale => "en"
      timezone => "+08:00"
    }
    mutate {
      remove_field => "message"
      remove_field => "logTime"
      remove_field => "@version"
      remove_field => "host"
      remove_field => "offset"
    }
  }

  # MySQL slow query log. Four variants: with/without "Id:" on the User@Host
  # line, each with/without a leading "use <db>;" statement.
  if [fields][docType] == "mysqlslowlogs" {
    grok {
      match => [
        "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S]*)",
        "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S]*)",
        "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S]*)",
        "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query_str>[\s\S]*)"
      ]
    }
    # First date filter writes the default target (@timestamp) ...
    date {
      match => ["timestamp_mysql","yyyy-MM-dd HH:mm:ss.SSS","UNIX"]
    }
    # ... second one stores the same instant in the "timestamp" field.
    date {
      match => ["timestamp_mysql","yyyy-MM-dd HH:mm:ss.SSS","UNIX"]
      target => "timestamp"
    }
    mutate {
      convert => ["query_time", "float"]
      convert => ["lock_time", "float"]
      convert => ["rows_sent", "integer"]
      convert => ["rows_examined", "integer"]
      remove_field => "message"
      remove_field => "timestamp_mysql"
      remove_field => "@version"
    }
  }
}

# NOTE: the password below is a sample value and must be replaced with your
# own. To keep it out of this file, store it in the Logstash keystore and
# reference it as "${KEY_NAME}".
output {
  if [fields][docType] == "sys-log" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      user => "elastic"
      password => "qEnNfKNujqNrOPD9q5kb"
      index => "sys-log-%{+YYYY.MM.dd}"
    }
  }

  if [fields][docType] == "point-log" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      user => "elastic"
      password => "qEnNfKNujqNrOPD9q5kb"
      index => "point-log-%{+YYYY.MM.dd}"
      routing => "%{type}"
    }
  }

  if [fields][docType] == "mysqlslowlogs" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      user => "elastic"
      password => "qEnNfKNujqNrOPD9q5kb"
      index => "mysql-slowlog-%{+YYYY.MM.dd}"
    }
  }
}
~~~

### 2.3. 修改logstash配置

修改文件`config/logstash.yml`把`http.host`改为以下内容

~~~
http.host: 0.0.0.0
~~~

### 2.4. 后台运行

~~~
nohup bin/logstash -f config/logstash.conf &
~~~