使用logstash收集java、nginx、系统等常见日志

目录

1、使用codec的multiline插件收集java日志... 1

2、收集nginx日志... 2

3、收集系统syslog日志... 3

4、使用fliter的grok模块收集mysql日志... 4

1、使用codec的multiline插件收集java日志

对于采用ELK作为应用日志来说,多行消息的友好展示是必不可少的,否则ELK的价值就大大打折了。要正确的处理多行消息,需使用multiline插件

比如,对于java日志而言,可以使用:

multiline.pattern: '^\['

multiline.negate: true

multiline.match: after

这样,下面的日志就算一个事件了。

input {

file {

path => "/var/log/elasticsearch/chuck-clueser.log"

type => "es-error"

start_position => "beginning"

codec => multiline {

pattern => "^\[" #使用正则表式, 以中括号开头的就是一行日志

negate => true

what => "previous"

}

}

}

output {

if [type] == "es-error" {

elasticsearch {

hosts => ["192.168.100.163:9200"]

index => "es-error-%{+YYYY.MM.dd}"

}

}

}

2、收集nginx日志

使用codec的json插件将日志的域进行分段,使用key-value的方式,使日志格式更清晰,易于搜索,还可以降低cpu的负载

2.1 更改nginx的配置文件的日志格式,使用json

[root@linux-node1 ~]# vim /etc/nginx/nginx.conf #添加日志格式,把自带的格式注释掉

17 http {

18 #log_format main '$remote_addr - $remote_user [$time_local] "$request" '

19 # '$status $body_bytes_sent "$http_referer" '

20 # '"$http_user_agent" "$http_x_forwarded_for"';

21 #access_log /var/log/nginx/access.log main;

22 log_format json '{ "@timestamp": "$time_local", '

23 '"@fields": { '

24 '"remote_addr": "$remote_addr", '

25 '"remote_user": "$remote_user", '

26 '"body_bytes_sent": "$body_bytes_sent", '

27 '"request_time": "$request_time", '

28 '"status": "$status", '

29 '"request": "$request", '

30 '"request_method": "$request_method", '

31 '"http_referrer": "$http_referer", '

32 '"body_bytes_sent":"$body_bytes_sent", '

33 '"http_x_forwarded_for": "$http_x_forwarded_for", '

34 '"http_user_agent": "$http_user_agent" } }';

35 access_log /var/log/nginx/access_json.log json;

[root@linux-node1 ~]# nginx -t #检查配置文件

[root@linux-node1 ~]# systemctl start nginx

日志格式如下

2.2使用logstash将nginx访问日志收集起来

[root@linux-node1 ~]# cat log_nginx.conf 4、

input {

file {

path => "/var/log/nginx/access_json.log"

codec => "json"

start_position => "beginning"

type => "nginx-log"

}

}

output {

elasticsearch {

hosts => ["http://192.168.100.163:9200"]

index => "nginx-%{+YYY.MM.dd}"

}

}

[root@linux-node1 ~]# /usr/local/logstash/bin/logstash -f log_nginx.conf

3、收集系统syslog日志

[root@linux-node1 ~]# vim syslog.conf

input {

syslog {

type => "system-syslog"

#绑定个ip,监听个514端口,启动后,别的机器可以通过网络把日志发过来

host => "192.168.100.161"

port => "514"

}

}

output {

elasticsearch {

hosts => ["192.168.100.161:9200"]

index => "system-syslog-%{+YYYY.MM.dd}"

}

}

[root@linux-node1 ~]# /usr/local/logstash/bin/logstash -f syslog.conf

修改服务器的syslog配置文件,把日志信息发送到514端口上

[root@linux-node2 ~]# vim /etc/rsyslog.conf

90 *.* @@192.168.100.161:514

[root@linux-node2 ~]# systemctl restart rsyslog

4、使用fliter的grok模块收集mysql日志

filter插件有很多,在这里就学习grok插件,使用正则匹配日志里的域来拆分。在实际生产中,apache日志不支持jason,就只能使用grok插件匹配;mysql慢查询日志也是无法拆分,只能使用grok正则表达式匹配拆分。

在如下链接,github上有很多写好的grok模板,可以直接引用

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

在装好的logstash中也会有grok匹配规则,直接可以引用,路径如下

[root@linux-node1 patterns]# pwd

/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.0/patterns

4.1日志文件

[root@linux-node1 ~]# cat slow.log

# Time: 160108 15:46:14

# User@Host: dev_select_user[dev_select_user] @ [192.168.97.86] Id: 714519

# Query_time: 1.638396 Lock_time: 0.000163 Rows_sent: 40 Rows_examined: 939155

SET timestamp=1452239174;

SELECT DATE(create_time) as day,HOUR(create_time) as h,round(avg(low_price),2) as low_price

FROM t_actual_ad_num_log WHERE create_time>='2016-01-07' and ad_num<=10

GROUP BY DATE(create_time),HOUR(create_time);

4.2编写slow.conf

[root@linux-node1 ~]# cat mysql-slow.conf

input{

file {

path => "/root/slow.log"

type => "mysql-slow-log"

start_position => "beginning"

codec => multiline {

pattern => "^# User@Host:"

negate => true

what => "previous"

}

}

}

filter {

# drop sleep events

grok {

match => { "message" =>"SELECT SLEEP" }

add_tag => [ "sleep_drop" ]

tag_on_failure => [] # prevent default _grokparsefailure tag on real records

}

if "sleep_drop" in [tags] {

drop {}

}

grok {

match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]

}

date {

match => [ "timestamp", "UNIX" ]

remove_field => [ "timestamp" ]

}

}

output {

stdout{

codec => "rubydebug"

}

}

执行该配置文件,查看grok正则匹配结果