*A good system must collect and filter its logs;
here we use a distributed log collection stack:*
**Kafka + ZooKeeper + Logstash + Elasticsearch + Kibana**
Kafka + ZooKeeper cluster installation
Install the following on all three nodes
~~~
add-apt-repository ppa:webupd8team/java
apt-get update
apt-get install oracle-java8-installer
curl -L http://www-us.apache.org/dist/zookeeper/stable/zookeeper-3.4.12.tar.gz -o /tmp/zookeeper-3.4.12.tar.gz
tar xf /tmp/zookeeper-3.4.12.tar.gz -C /usr/local/
ln -s /usr/local/zookeeper-3.4.12 /usr/local/zookeeper
cat /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/log
clientPort=2181
server.1=192.168.11.223:2888:3888
server.2=192.168.11.224:2888:3888
server.3=192.168.11.225:2888:3888
mkdir -p /data/zookeeper/{data,log}
~~~
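The same zoo.cfg must be present on all three nodes. One possible way to push it out, assuming passwordless root SSH between the nodes (an illustration, not part of the original steps):
~~~
# copy the identical zoo.cfg to the other two nodes
for h in 192.168.11.224 192.168.11.225; do
  scp /usr/local/zookeeper/conf/zoo.cfg root@$h:/usr/local/zookeeper/conf/
done
~~~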
Then set the myid on each of the three nodes and start ZooKeeper
~~~
echo "1" > /data/zookeeper/data/myid   # on 192.168.11.223
echo "2" > /data/zookeeper/data/myid   # on 192.168.11.224
echo "3" > /data/zookeeper/data/myid   # on 192.168.11.225
/usr/local/zookeeper/bin/zkServer.sh start
~~~
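Once all three nodes are up, you can confirm the ensemble elected a leader; a quick check, assuming the same install path on every node:
~~~
/usr/local/zookeeper/bin/zkServer.sh status
# expected: "Mode: leader" on one node and "Mode: follower" on the other two
~~~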
* * * * *
Configure Kafka
~~~
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar xf kafka_2.11-1.1.0.tgz -C /usr/local/
ln -s /usr/local/kafka_2.11-1.1.0 /usr/local/kafka
# Edit /usr/local/kafka/config/server.properties on each of the three brokers:
# broker 1 (192.168.11.223)
broker.id=1
listeners=PLAINTEXT://192.168.11.223:9092
zookeeper.connect=192.168.11.223:2181,192.168.11.224:2181,192.168.11.225:2181

# broker 2 (192.168.11.224)
broker.id=2
listeners=PLAINTEXT://192.168.11.224:9092
zookeeper.connect=192.168.11.223:2181,192.168.11.224:2181,192.168.11.225:2181

# broker 3 (192.168.11.225)
broker.id=3
listeners=PLAINTEXT://192.168.11.225:9092
zookeeper.connect=192.168.11.223:2181,192.168.11.224:2181,192.168.11.225:2181
cd /usr/local/kafka && nohup ./bin/kafka-server-start.sh config/server.properties &
~~~
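The Logstash input below consumes a topic named nginxacc, so it helps to create it up front and verify the brokers with the bundled console tools. A sketch, assuming Kafka 1.1.0 and the addresses above:
~~~
cd /usr/local/kafka
# create the topic the pipeline will use, replicated across all three brokers
./bin/kafka-topics.sh --create --zookeeper 192.168.11.223:2181 \
  --replication-factor 3 --partitions 3 --topic nginxacc
# smoke test: in one terminal, type a few lines into the producer...
./bin/kafka-console-producer.sh --broker-list 192.168.11.223:9092 --topic nginxacc
# ...and in another terminal they should appear in the consumer
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.224:9092 --topic nginxacc --from-beginning
~~~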
* * * * *
Install Logstash, the Elasticsearch cluster, and Kibana
~~~
rpm -ivh https://artifacts.elastic.co/downloads/logstash/logstash-6.3.0.rpm
mv /var/lib/logstash /data/    # relocate the Logstash data directory to /data
cat /etc/logstash/logstash.yml
path.data: /data/logstash
path.config: /etc/logstash/conf.d/*.conf
path.logs: /var/log/logstash
~~~
Define the log processing pipeline
~~~
vi /etc/logstash/conf.d/nginxlog.conf
input {
  kafka {
    bootstrap_servers => "192.168.11.223:9092,192.168.11.224:9092,192.168.11.225:9092"
    topics => "nginxacc"
    consumer_threads => 5
    codec => "json"
  }
}

filter {
  # neutralize raw \x escape sequences that nginx writes for non-printable bytes,
  # so the json filter below does not fail on invalid JSON
  ruby {
    code => "
      if event.get('message')
        event.set('message', event.get('message').gsub('\x','Xx'))
        event.set('message', event.get('message').gsub('\\x','XXx'))
      end
    "
  }
  json {
    source => "message"
    remove_field => "message"
  }
  mutate {
    # keep only the first address if client carries an X-Forwarded-For list
    gsub => ["client", ",.*", ""]
    convert => { "size" => "integer" }
    convert => { "requesttime" => "float" }
  }
  geoip {
    source => "client"
    target => "geoip"
    remove_field => "client"
  }
  useragent {
    source => "agent"
    target => "user_agent"
    remove_field => "agent"
  }
}

output {
  elasticsearch {
    hosts => ["192.168.11.231:9200","192.168.11.232:9200","192.168.11.233:9200"]
    index => "logstash-nginxacc-%{+YYYY.MM.dd}"
  }
}
systemctl start logstash
~~~
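The filter assumes each Kafka message is a JSON-encoded nginx access-log line carrying fields such as client, size, requesttime and agent (these names come from the filter itself; the producer side is not shown here). Before restarting the service after every edit, the pipeline syntax can be checked directly; a quick sanity check, assuming the default RPM install path:
~~~
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginxlog.conf --config.test_and_exit
~~~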
Install Elasticsearch
~~~
rpm -ivh https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.rpm
mv /var/lib/elasticsearch /data/    # relocate the Elasticsearch data directory to /data
cat /etc/elasticsearch/elasticsearch.yml
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 192.168.11.231    # use 192.168.11.232 / 192.168.11.233 on the other nodes
discovery.zen.ping.unicast.hosts: ["192.168.11.231:9300","192.168.11.232:9300","192.168.11.233:9300"]
discovery.zen.minimum_master_nodes: 2    # quorum of 3 master-eligible nodes
http.cors.enabled: true
http.cors.allow-origin: "*"
systemctl start elasticsearch
~~~
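After starting Elasticsearch on all three nodes, the cluster state can be verified with a plain HTTP call:
~~~
curl 'http://192.168.11.231:9200/_cluster/health?pretty'
# "number_of_nodes" should be 3 and "status" green (or yellow until replicas are allocated)
~~~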
Install Kibana
~~~
rpm -ivh https://artifacts.elastic.co/downloads/kibana/kibana-6.3.0-x86_64.rpm
cat /etc/kibana/kibana.yml
server.host: "192.168.11.231"
elasticsearch.url: http://192.168.11.231:9200
systemctl start kibana
~~~
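To see data flow end to end, you can push a sample line into the nginxacc topic and check that a daily index appears. The field values below are made up for illustration and carry the fields the filter expects directly; a real producer would normally wrap the nginx log line in a message field, which the json filter above unwraps:
~~~
echo '{"client":"8.8.8.8","size":1234,"requesttime":0.02,"agent":"curl/7.29.0","request":"GET / HTTP/1.1","status":"200"}' | \
  /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.11.223:9092 --topic nginxacc
curl 'http://192.168.11.231:9200/_cat/indices/logstash-nginxacc-*?v'
~~~
Then open Kibana at http://192.168.11.231:5601 and create an index pattern for logstash-nginxacc-* to browse the documents.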