ELK+K8S

2021年08月13日 阅读数:86
这篇文章主要向大家介绍ELK+K8S,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

 

 

ELK+K8Scss

 

 

做者html

刘畅java

时间node

2021-07-26linux

 

 

环境: CentOS7.5nginx

主机名称git

IPgithub

软件web

controlnode正则表达式

172.16.1.120

[48G]

kafka环境[zookeeper-3.7.0kafka_2.13-2.8.0jdk1.8.0_45]

ELK环境[nginxtomcatES-7.13.4ElasticHDlogstash-7.13.4filebeat-7.13.4kibana-7.13.4]

slavenode1

172.16.1.121

[24G]

kafka环境[zookeeper-3.7.0kafka_2.13-2.8.0jdk1.8.0_45]

ELK环境[ES-7.13.4logstash-7.13.4]

slavenode2

172.16.1.122

[24G]

kafka环境[zookeeper-3.7.0kafka_2.13-2.8.0jdk1.8.0_45]

ELK环境[ES-7.13.4]

k8s-admin

172.16.1.70

[24G]

K8S控制端

k8s-node1

172.16.1.71

[48G]

k8s slave01

k8s-node2

172.16.1.72

[48G]

k8s slave02

(1) : 本文档不记录zookeeper+kafkak8s的搭建过程。

(2) ELK官方文档: https://www.elastic.co/guide/index.html

(3) ELK下载地址: https://www.elastic.co/downloads/

(4) elasticsearchlogstash自带jdk环境。

(5) elk属于比较消耗资源的服务,不建议采用docker方式部署,docker主要是简化部署,有助于后面的升级。该文档主要采用二进制方式部署,也能够采用rpm包的方式部署。

(6) 测试文件建立

# touch /tmp/test.log && chmod 777 /tmp/test.log

# touch /tmp/prod.log && chmod 777 /tmp/prod.log

# touch /tmp/unknown.log && chmod 777 /tmp/unknown.log

# touch /tmp/out_result.txt && chmod 777 /tmp/out_result.txt

# touch /tmp/filebeat_out.txt && chmod 777 /tmp/filebeat_out.txt

 

 

 

 

目录

1 ELK介绍 1

1.1 需求背景 1

1.2 ELK介绍 1

1.3 ELK架构 1

2 Elasticsearch集群部署 2

2.1 Elasticsearch介绍 2

2.2 安装 2

2.3 Elasticsearch服务加入systemd 3

2.4 图形页面管理ES 5

3 Logstash部署 8

3.1 Logstash介绍 8

3.2 安装 9

3.3 Logstash服务加入systemd 10

3.4 Logstash标准输入输出 11

3.5 输入插件input(输入阶段,从哪里获取日志) 12

3.5.1 经常使用插件 12

3.5.2 通用配置字段 12

3.5.3 输入插件file 13

3.5.4 输入插件Beats 15

3.6 过滤插件filter(过滤阶段,将日志格式化处理) 15

3.6.1 经常使用插件 15

3.6.2 通用配置字段 16

3.6.3 过滤插件json 16

3.6.4 过滤插件kv 17

3.6.5 过滤插件grok 18

3.6.6 过滤插件geoip 22

3.7 输出插件output(将处理完成的日志推送到远程数据库存储) 24

3.7.1 经常使用插件 24

3.7.2 输出插件elasticsearch 24

3.8 条件判断 26

3.8.1 操做符 26

3.8.2 示例 27

4 Filebeat部署 29

4.1 Filebeat介绍 29

4.2 安装 30

4.3 filebeat服务加入systemd 30

4.4 推送日志到Logstash 31

4.5 推送日志到ES 35

5 收集k8s日志 35

5.1 应用程序日志记录方式 35

5.2 logstash配置文件 36

5.3 标准输出方式采集日志 38

5.4 日志文件方式采集日志 40

6 Kibana部署 41

6.1 Kibana介绍 42

6.2 安装 42

6.3 Kibana服务加入systemd 43

6.4 Kibana基本使用 43

7 架构优化 50

7.1 增长数据缓冲队列 50

7.2 将来架构扩展思路 52

7.3 其它优化点 52

8 Filebeat->Logstash->Kafka->Logstash->ES->Kibana 53

8.1 架构图 53

8.2 配置Filebeat->logstash 53

8.3 配置logstash->kafka 57

8.4 配置kafka->es 58

8.5 kibana可视化展现日志 60

9 知识拾遗 62

9.1 elasticsearch单实例部署 62

9.2 elasticsearch集群设置证书验证 64

9.3 kibanalogstash带密码方式访问elasticsearch 68

9.4 filebeat优化 71

9.5 logstash优化 72

9.6 kibana优化 72

9.7 kibana监控的使用 73

9.8 elasticsearch索引分片及副本的设置 81

 


1 ELK介绍

1.1 需求背景

1 业务发展愈来愈庞大,服务器愈来愈多。

2 各类访问日志、应用日志、错误日志量愈来愈多。

3 开发人员排查问题,须要到服务器上查日志,效率低、权限很差控制。

4 运维需实时关注业务访问状况。

1.2 ELK介绍

1 ELK是三个开源软件的缩写,提供一套完整的企业级日志平台解决方案。

 

2 ELK分别是

(1) Elasticsearch # 搜索、分析和存储数据。

(2) Logstash # 采集日志、格式化、过滤,最后将数据推送到Elasticsearch存储。

(3) Kibana # 数据可视化。

 

3 Beats

# 集合了多种单一用途数据采集器,用于实现从边缘机器向LogstashElasticsearch发送数据。是应用最多的,是一个轻量级日志采集器。

1.3 ELK架构

wps1 

2 Elasticsearch集群部署

172.16.1.120121122节点上操做

2.1 Elasticsearch介绍

1 Elasticsearch(简称ES)是一个分布式、RESTful风格的搜索和数据分析引擎,用于集中存储日志数据。

 

2 Elasticsearch术语

(1) Index # 索引是多个文档的集合。

(2) Document  # Index里每条记录称为Document,若干文档构建一个Index

(3) Type # 一个Index能够定义一种或多种类型,将Document逻辑分组。

(4) Field # ES存储的最小单元。

 

3 ES与关系型数据库术语对比

Elasticsearch

关系型数据库

Index

Database

Type

Table

Document

Row

Filed

Colume

2.2 安装

1 下载二进制包

https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.4-linux-x86_64.tar.gz

 

2 修改配置文件

# tar -xzf elasticsearch-7.13.4-linux-x86_64.tar.gz

# mv elasticsearch-7.13.4/ /usr/local/elasticsearch/

# mkdir -p /usr/local/elasticsearch/logs

# mkdir -p /usr/local/elasticsearch/data

# useradd -M -s /sbin/nologin elasticsearch

# id elasticsearch

uid=1002(elasticsearch) gid=1002(elasticsearch) groups=1002(elasticsearch)

# chown -R elasticsearch.elasticsearch /usr/local/elasticsearch/

 

# grep "^[a-Z]" /usr/local/elasticsearch/config/elasticsearch.yml

cluster.name: elk-cluster

# 集群名称

node.name: es01

# 集群节点名称,172.16.1.120节点为es01172.16.1.121节点为es02172.16.1.122节点为es03

path.data: /usr/local/elasticsearch/data

# 数据目录

path.logs: /usr/local/elasticsearch/logs

# 日志目录

bootstrap.memory_lock: true

# 锁定物理内存地址,防止es内存和swap进行交换

network.host: 172.16.1.120

# 监听地址,三个节点分别为172.16.1.120172.16.1.121172.16.1.122

http.port: 9200

# 监听地址端口

transport.tcp.port: 9300

# 集群内部节点之间通讯端口

discovery.seed_hosts: ["172.16.1.120", "172.16.1.121", "172.16.1.122"]

# 集群节点列表

cluster.initial_master_nodes: ["172.16.1.120", "172.16.1.121", "172.16.1.122"]

# 首次启动指定的Master节点,这里指定全部node节点,会自动选举

http.cors.enabled: true

http.cors.allow-origin: "*"

# 开启elasticsearch跨域访问功能

2.3 Elasticsearch服务加入systemd

1 调整进程最大打开文件数数量

# ulimit -SHn 65535

# cat >> /etc/security/limits.conf << EOF

    * soft nofile 65535

    * hard nofile 65535

EOF

 

2 调整进程最大虚拟内存区域数量

# echo "vm.max_map_count=262144" >> /etc/sysctl.conf

# sysctl -p

 

3 jvm堆大小设置为大约可用内存的一半

# vim /usr/local/elasticsearch/config/jvm.options

-Xms512m

-Xmx512m

 

4 elasticsearch.service文件

# vim /usr/lib/systemd/system/elasticsearch.service

[Unit]

Description=elasticsearch server

Requires=network.target

After=network.target

[Service]

# 不限制内存大小

LimitMEMLOCK=infinity

# 指定此进程能够打开的最大文件描述符

LimitNOFILE=65535

# 指定最大进程数

LimitNPROC=4096

# 不限制虚拟内存大小

LimitAS=infinity

# 不限制最大文件大小

LimitFSIZE=infinity

# JVM接收到SIGTERM信号时,它将退出,并返回代码143

SuccessExitStatus=143

Type=simple

Environment=JAVA_HOME=/usr/local/elasticsearch/jdk

ExecStart=/usr/local/elasticsearch/bin/elasticsearch

ExecReload=/bin/kill -HUP $MAINPID

KillMode=process

Restart=on-failure

User=elasticsearch

Group=elasticsearch

[Install]

WantedBy=multi-user.target

 

5 启动服务

# systemctl daemon-reload

# systemctl start elasticsearch.service

# systemctl enable elasticsearch.service

 

6 查看集群信息

(1) 查看端口信息

# netstat -tunlp

tcp6       0      0 172.16.1.120:9200       :::*                    LISTEN      3177/java

tcp6       0      0 172.16.1.120:9300       :::*                    LISTEN      3177/java

 

(2) 查看集群状态

1) 查看集群节点信息

[root@controlnode tools]# curl -XGET 'http://172.16.1.120:9200/_cat/nodes?pretty'

172.16.1.120 54 35 2 0.02 0.25 0.24 cdfhilmrstw * es01

172.16.1.121 17 96 5 0.05 0.59 0.51 cdfhilmrstw - es02

172.16.1.122 24 97 5 0.07 0.56 0.49 cdfhilmrstw - es03

 

2) 查询集群健康状态

[root@controlnode tools]# curl -i -XGET http://172.16.1.120:9200/_cluster/health?pretty

wps2 

 

3) 查看指定es节点基本信息

[root@controlnode ~]# curl -XGET '172.16.1.120:9200/?pretty'

wps3 

2.4 图形页面管理ES

172.16.1.120节点上操做

1 下载

推荐插件: ElasticHDcerebroelasticsearch-head,这里使用ElasticHD

https://github.com/360EntSecGroup-Skylar/ElasticHD/releases/download/1.4/elasticHD_linux_amd64.zip

 

2 启动

# unzip -q elasticHD_linux_amd64.zip

# mv ElasticHD /usr/bin/

# nohup ElasticHD -p 172.16.1.120:9800 >/dev/null 2>&1 &

# chmod +x /etc/rc.d/rc.local

# cat >>/etc/rc.local<< EOF

source /etc/profile

nohup ElasticHD -p 172.16.1.120:9800 >/dev/null 2>&1 &

EOF

 

3 访问

URL: http://172.16.1.120:9800/

wps4 

 

4 补充: cerebro的使用

(1) 下载

https://github.com/lmenezes/cerebro/releases/download/v0.9.4/cerebro-0.9.4.zip

 

(2) 安装

# unzip -q cerebro-0.9.4.zip

# mv cerebro-0.9.4/ /usr/local/cerebro/

 

(3) 启动

# nohup /usr/local/cerebro/bin/cerebro -Dhttp.port=9000 >/dev/null 2>&1 &

# chmod +x /etc/rc.d/rc.local

# cat >>/etc/rc.local<< EOF

source /etc/profile

nohup /usr/local/cerebro/bin/cerebro -Dhttp.port=9000 >/dev/null 2>&1 &

EOF

 

# netstat -tunlp | grep 9000

tcp6       0      0 :::9000                 :::*                    LISTEN      2553/java 

 

(4) 访问

URI: http://172.16.1.120:9000

wps5 

概要信息:

wps6 

3 Logstash部署

172.16.1.120节点上操做

3.1 Logstash介绍

Logstash可以将采集日志、格式化、过滤,最后将数据推送到Elasticsearch存储

Logstash不仅是一个input|filter|output的数据流,而是一个input|decode|filter|encode|output的数据流。codec就是用来decodeencode 事件的。因此codec经常使用在inputoutput中,经常使用的codec插件有plainjsonmultiline等。

服务名

logstash input默认解码方式

logstash output默认编码方式

redis

json

json

kafka

plain

plain

file

plain

json_lines

beats

plain

不支持

tcp

line

json

elasticsearch

json

-

rabbitmq

json

json

stdin

line

不支持

stdout

不支持

rubydebug

(1) json

1) 此编解码器可用于解码(经过输入)和编码(经过输出)完整的JSON消息。若是发送的数据是其根处的JSON 数组,则将建立多个事件(每一个元素一个)。若是您正在流式传输由"\n"分隔的JSON消息,请参阅"json_lines"编解码器。

2) 编码将产生一个紧凑的JSON表示(没有行终止符或缩进),若是此编解码器从无效JSON的输入中收到有效负载,则它将回退到纯文本并添加标签"_jsonparsefailure"JSON失败时,有效负载将存储在该message字段中。

3) "charset"默认值为"UTF-8"

4) 解码和编码都是jsonjson数据不添加任何内容。

(2) plain

1) "plain"编解码器用于纯文本,事件之间没有分隔。这主要用于在其传输协议中已经定义了框架的输入和输出(例如zeromqrabbitmqredis等)

2) "charset"默认值为"UTF-8"

(3) json_lines

1) 此编解码器将解码以换行符分隔的流式JSON。编码将发出一个以"@delimiter"注释结尾的JSON字符串若是您的源输入是面向行的JSON,例如redis或文件输入,请不要使用此编解码器。而是使用json编解码器。

2) 此编解码器指望接收换行符终止行的流(字符串)。文件输入将产生一个没有换行符的行串。所以,此编解码器不能与面向行的输入一块儿使用。

3) "charset"默认值为"UTF-8"

4) "delimiter"默认值为 "\n"

(4) line

1) 面向行的文本数据。

2) 解码行为:只会发出整行事件。

3) 编码行为:每一个事件都将与尾随换行符一块儿发出。

4) "charset"默认值为"UTF-8"

4) "delimiter"默认值为 "\n"

(5) rubydebug

rubydebug编解码器将使用 Ruby Amazing Print 库输出您的 Logstash 事件数据。

 

wps7 

(1) Input 输入,输入数据能够是StdinFileTCPRedisSyslog等。

(2) Filter 过滤,将日志格式化。有丰富的过滤插件:Grok正则捕获、Date时间处理、Json编解码

Mutate数据修改等。

(3) Output:输出,输出目标能够是StdoutFileTCPRedisES等。

3.2 安装

1 下载二进制软件包

https://artifacts.elastic.co/downloads/logstash/logstash-7.13.4-linux-x86_64.tar.gz

 

2 修改配置文件

# tar -xzf logstash-7.13.4-linux-x86_64.tar.gz

# mv logstash-7.13.4/ /usr/local/logstash/

# useradd -M -s /sbin/nologin logstash

# id logstash

uid=1003(logstash) gid=1003(logstash) groups=1003(logstash)

# mkdir -p /usr/local/logstash/conf.d/

# mkdir -p /usr/local/logstash/logs/

# chown -R logstash.logstash /usr/local/logstash/

 

# egrep -v "^$|^#" /usr/local/logstash/config/logstash.yml

pipeline:

  batch:

    size: 125

delay: 5

# 管道配置

pipeline.ordered: auto

path.config: /usr/local/logstash/conf.d

# 自定义inputoutput流处理文件路径

config.reload.automatic: false

# 关闭按期检查配置是否修改,并从新加载管道(咱们使用SIGHUP信号手动触发)

config.reload.interval: 3s

# 检查配置是否修改的时间间隔。

http.enabled: true

# 开启http API

http.host: 172.16.1.120

# 监听的IP

http.port: 9600-9700

# 监听的端口

log.level: info

# 日志级别

path.logs: /usr/local/logstash/logs

# 日志路径

 

补充:

logstash默认数据存储目录为: "path.data: LOGSTASH_HOME/data"

3.3 Logstash服务加入systemd

1 logstash.service配置文件

# vim /usr/lib/systemd/system/logstash.service

[Unit]

Description=logstash server

After=network.target

[Service]

SuccessExitStatus=143

Type=simple

Environment=JAVA_HOME=/usr/local/logstash/jdk

ExecStart=/usr/local/logstash/bin/logstash

ExecReload=/bin/kill -HUP $MAINPID

KillMode=process

Restart=on-failure

User=logstash

Group=logstash

[Install]

WantedBy=multi-user.target

 

2 启动服务

# systemctl daemon-reload

# systemctl start logstash.service

# systemctl enable logstash.service

 

注意:

上面启动了logstash服务,但因为logstash服务进程没有可处理的数据,过一会logstash服务会自动中止运行,因此看不到logstash9600端口号。

3.4 Logstash标准输入输出

1 编写配置文件

# cat /usr/local/logstash/conf.d/input_stdin-output_stdout.conf

input{

  stdin{}

}

 

output{

  stdout{

    codec=>rubydebug

  }

}

 

2 验证配置文件是否有语法错误

# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/input_stdin-output_stdout.conf -t

wps8 

 

3 从配置文件启动logstash

# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/input_stdin-output_stdout.conf

wps9 

说明:

message # 输入的信息

默认给日志加了以下三个字段:

(1) "@timestamp" # 标记事件发生的时间点,写入es时索引用到的" %{+YYYY.MM.dd} "变量就来自这里。

(2) "host" # 标记事件发生的主机

(3) "@version"

 

4 查看logstash节点基本信息

# curl -XGET '172.16.1.120:9600/?pretty'

wps10 

 

5 logstash命令行参数说明

(1) -f

# 指定配置文件在shell终端启动logstash

logstash -f <file_name>.conf

(2) -t

# 验证配置文件是否有语法错误

logstash -f <file_name>.conf -t

(3) -e

# shell终端启动配置的logstash

logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

注意:

使用该方式启动logstash,须要先注释掉"/usr/local/logstash/config/logstash.yml"配置文件中的

"path.config:"参数,不然会报以下错误。

ERROR: Settings 'path.config' (-f) and 'config.string' (-e) can't be used simultaneously.

3.5 输入插件input(输入阶段,从哪里获取日志)

3.5.1 经常使用插件

1 stdin # 通常用于调试

2 File

3 Redis

4 beats # 列如filebeat

3.5.2 通用配置字段

1 add_field # 添加一个字段到一个事件,放到事件顶部,通常标记日志来源。例如属于哪一个项目,哪一个应用。值类型是hash

add_field => {

  "project" => "microservice"

  "app" => "product"

}

2 tags # 添加任意数量的标签,用于标记日志的其它属性。例如表名是访问日志仍是错误日志。

值类型是arraytags => ["web","nginx"]

3 type # 为全部输入添加一个字段,例如代表日志的类型。

值类型是stringtype => "access"

3.5.3 输入插件file

file插件用于读取指定日志的文件。

 

1 经常使用字段

(1) path # 日志文件路径,可使用通配符。

值类型是arraypath => ["/tmp/test.log","/tmp/test1.log"]

(2) exclude # 排除采集的日志文件。

(3) discover_interval # 多久检查被监听的目录下是否有新文件,默认15s

(4) sincedb_write_interval # 多久写一次sincedb文件,默认15s

(5) stat_interval # 多久检查被监听文件的状态默认1s

(6) start_position # 指定日志文件从什么位置开始读,默认从结尾开始,通常仅在初次读取文件时起做用,若是文件已被记录在sincedb中,则根据pos

 

注意:

(1) logstash默认从日志文件的末尾开始读取,指定beginning表示从头开始读日志文件,但这也仅限于第一次读取文件时有效,读取完成后会记录日志的位置(.sincedb_xxxx),下次从记录的位置开始读。若是日志被清理致使实际位置点比记录的位置点小,那么logstash会从头开始读取日志。若是想要全部收集的日志文件都从开头读取,先stop logstash,而后删除logstash数据目录下的全部数据,再启动logstash便可(rm -rf /usr/local/logstash/data/*)

(2) logstash收集日志配置文件中设置以下内容,能够实现读取的目标文件未经修改,而仅修改了conf文件,实现每次从新运行logstash都从文件头开始收集日志。若是日志收集完成,重启logstash不想再从头收集日志,把添加的配置去掉,实现重启logstash后从日志末尾开始收集日志。

input {

  file {

    path => ["/tmp/test.log"]

    start_position => "beginning"

    sincedb_path => "/dev/null"

  }

}

 

2 示例: 读取日志文件并输出到文件

(1) 配置文件

# cat /usr/local/logstash/conf.d/input_file-output_file.conf

input {

  file {

    path => "/tmp/test.log"

    exclude => "error.log"

    start_position => "beginning"

    tags => "web"

    tags => "nginx"

    type => "access"

    add_field => {

      "project" => "microservice"

      "app" => "product"

    }

  }

}

 

output {

  file {

    path => "/tmp/out_result.txt"

  }

}

 

(2) 检查配置文件

/usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/input_file-out_file.conf -t

……

Configuration OK

……

 

(3) 重启logstash

# systemctl status logstash.service

wps11 

# kill -HUP 3386

查看logstash日志,肯定logstash已经从新启动

# tailf /usr/local/logstash/logs/logstash-plain.log

 

(4) 测试

# echo "input_file-test" >>/tmp/test.log

经过/tmp/out_result.txt文件找到输出日志,经过在线json效验网站将输出内容进行格式化。

https://www.bejson.com/

wps12 

 

(5) 说明

下面的相关测试按照此方式进行,但会省略一些重复的步骤,只贴出配置文件和输出结果。

3.5.4 输入插件Beats

Beats插件接收来自Beats数据采集器发来的数据,例如Filebeat

经常使用字段: host—监听地址,port—监听端口。

 

用法:

# vim input_beats-output_file.conf

input {

  beats {

    host => "0.0.0.0"

    port => 5044

  }

}

 

filter {

}

 

output {

  file {

    path => "/tmp/out_result.txt"

  }

}

3.6 过滤插件filter(过滤阶段,将日志格式化处理)

3.6.1 经常使用插件

jsonkvgrokgeoipdate

3.6.2 通用配置字段

1 add_field # 若是过滤成功,添加一个字段到这个事件。

2 add_tags # 若是过滤成功,添加任意数量的标签到这个事件。

3 remove_field # 若是过滤成功,从这个事件移除任意字段。

4 remove_tag # 若是过滤成功,从这个事件移除任意标签。

3.6.3 过滤插件json

1 说明

JSON插件接收一个json数据,将其展开为Logstash事件中的数据结构,放到事件的顶层。

 

2 经常使用字段

(1) source # 值类型为string,指定要解析的字段,通常是原始消息message字段。

(2) targe t # 值类型为string,将解析的结果放到指定字段,若是不指定,默认在事件的顶层。若是该target字段已经存在,它将被覆盖。

(3) remove_field  # 值类型为array,若是此过滤器成功,则今后事件中删除任意字段。

 

3 示例: 解析HHTP请求日志

(1) 配置文件

# cat /usr/local/logstash/conf.d/filter_json-output_file.conf

input {

  file {

    path => "/tmp/test.log"

    start_position => "beginning"

  }

}

 

filter {

  json {

    source => "message"

  }

}

 

output {

  file {

    path => "/tmp/out_result.txt"

  }

}

 

(2) 收集的日志

echo '{"remote_addr":"223.5.5.5","url":"/index.html","status":"200"}' >>/tmp/test.log

 

wps13 

3.6.4 过滤插件kv

1 说明

kv插件接收一个键值数据,按照指定分割符解析为logstash事件中的数据结构,放到事件顶层。

 

2 经常使用字段

field_split # 指定键值分隔符,默认是空

 

3 示例: 解析URL中的参数

(1) 配置文件

# cat /usr/local/logstash/conf.d/filter_kv-output_file.conf

input {

  file {

    path => "/tmp/test.log"

    start_position => "beginning"

  }

}

 

filter {

  kv {

    field_split => "?&"

  }

}

 

output {

  file {

    path => "/tmp/out_result.txt"

  }

}

 

(2) 收集日志

echo 'www.baidu.com?id=19&name=cctv1&content=show' >>/tmp/test.log

wps14 

3.6.5 过滤插件grok

1 说明

(1) 若是采集的日志格式是非结构化的,能够写正则表达式提取,grok是正则表达式支持的实现。

 

(2) Logstash内置的正则匹配模式,在安装目录下能够看到,路径以下:

vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.1/patterns/legacy/grok-patterns

 

(3) 正则匹配模式语法格式

%{SYNTAX:SEMANTIC}

1) SYNTAX # 模式名称,模式文件中的第一列

2) SEMANTIC # 匹配文件的字段名

例如: %{IP:client}

 

(4) 正则符号说明

wps15 

 

2 经常使用字段

1) match # 正则匹配模式

2) patterns_dir # 自定义正则模式文件

 

3 示例

(1) 使用内置正则匹配http请求日志

1) 模拟数据

223.5.5.5 GET /index.html 55632 0.013

2) 正则

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

3) kibana上验证

wps16 

 

(2) 自定义正则匹配http请求日志

1) 自定义匹配规则

CID [0-9]{5,6}

TAG \w+

2) 自定义匹配日志示例1

223.5.5.5 GET /index.html 55632 0.013 123456

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} %{CID:cid}

3) 自定义匹配日志示例2

223.5.5.5 GET /index.html 55632 0.013 123456 abcdef

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} %{CID:cid} %{TAG:tag}

 

(3) logstash中匹配多格式日志

若是一个日志文件下有多个日志格式怎么办,例如项目新版本添加一个日志字段,须要兼容旧日志匹配使用多模式匹配,写多个正则表达式,只要知足其中一条就能匹配成功。 

注意:

logstash多模式匹配时,正则匹配规则遵循从前日后进行匹配,若是输入的日志字段比较长,而短日志字段匹配正则排在前面,那么会丢失日志字段,因此在排列正则时匹配字段较长的正则排在匹配字段较小的正则以前。

 

1) 配置文件

# mkdir -p /usr/local/logstash/patterns/

# cat /usr/local/logstash/patterns/patterns.conf

CID [0-9]{5,6}

TAG \w+

 

# cat /usr/local/logstash/conf.d/filter_grok-output_file.conf

input {

  file {

    path => "/tmp/test.log"

    start_position => "beginning"

  }

}

 

filter {

  grok {

    patterns_dir => "/usr/local/logstash/patterns"

    # 单模式匹配

    #match => {

    #  "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} %{CID:cid}"

    #}

    # 多模式匹配

    match => [

      "message","%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} %{CID:cid} %{TAG:tag}",

      "message","%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} %{CID:cid}"

    ]

  }

}

 

output {

  file {

    path => "/tmp/out_result.txt"

  }

}

 

2) 查看输出

echo '223.5.5.5 GET /index.html 55632 0.013 123456' >>/tmp/test.log

wps17 

 

echo '223.5.5.5 GET /index.html 55632 0.013 123456 abcdef' >>/tmp/test.log

wps18 

3.6.6 过滤插件geoip

1 说明

(1) GeoIP插件根据Maxmind GeoLite2数据库中的数据添加有关IP地址位置信息。使用多模式匹配,写多个正则表达式,只要知足其中一条就能匹配成功。

(2) 下载地址[须要注册登陆才能下载]

https://www.maxmind.com/en/accounts/current/geoip/downloads

wps19 

 

2 经常使用字段

(1) source # 指定要解析的IP字段,结果默认保存到geoip字段

(2) database GeoLite2 # 数据库文件的路径

(3) fields # 保留解析的指定字段

 

3 示例

(1) 配置文件

# tar -xzf GeoLite2-City_20210727.tar.gz

# mv GeoLite2-City_20210727/ /usr/local/logstash/

# cat /usr/local/logstash/conf.d/filter_geoip-output_file.conf

input {

  file {

    path => "/tmp/test.log"

    start_position => "beginning"

  }

}

 

filter {

  grok {

    patterns_dir => "/usr/local/logstash/patterns"

    # 单模式匹配

    match => {

      "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

    }

  }

  geoip {

    source => "client"

    database => "/usr/local/logstash/GeoLite2-City_20210727/GeoLite2-City.mmdb"

    # 指定保留geoip解析的字段,国家编码、国家名、省会名、城市名

    fields => ["country_code2", "country_name", "region_name", "city_name"]

    # 保存到geoip字段(默认)

    target => "geoip"

  }

}

 

output {

  file {

    path => "/tmp/out_result.txt"

  }

}

 

(2) 输出结果

echo '223.5.5.5 GET /index.html 55632 0.013' >>/tmp/test.log

 

wps20 

 

# 未指定保留geoip字段内特定内容的geoip字段输出结果。

wps21 

3.7 输出插件output(将处理完成的日志推送到远程数据库存储)

3.7.1 经常使用插件

fileElasticsearch

3.7.2 输出插件elasticsearch

1 说明

Elasticsearch插件将数据推送到ES存储。

 

2 经常使用字段

(1) hosts # 指定ES主机地址。

值类型是arrayhosts => ["172.16.1.120","172.16.1.121","172.16.1.122"]

设置远程实例的主机。若是给定一个数组,它将在hosts参数中指定的主机之间对请求进行负载平衡。

 

(2) index # 指定写入的ES索引名称,通常按日期ec划分。

 

3 示例

(1) 配置文件

# cat /usr/local/logstash/conf.d/input_file-output_es.conf

input {

  file {

    path => "/tmp/test.log"

    start_position => "beginning"

    tags => "web"

    tags => "nginx"

    type => "access"

    add_field => {

      "project" => "microservice"

      "app" => "product"

    }

  }

}

 

output {

  elasticsearch {

    hosts => ["172.16.1.120","172.16.1.121","172.16.1.122"]

    index => "microservice-product-%{+YYYY.MM.dd}"

  }

}

 

(2) 查看输出

# echo "hello elasticsearch" >>/tmp/test.log

wps22 

3.8 条件判断

注意:

(1) /usr/local/logstash/conf.d/目录下自定义的全部日志收集配置文件,实际上全部的配置都在一个文件内,若是logstash在收集日志时不根据日志类型作条件判断,会致使一个input输入同时写入多个output中的状况(若是有的output作了判断,有的output没有作判断,那么没有作判断的output是全部日志的总和)logstash能够同时有多个input和多个output

(2) 如何在一台服务器上起多台logstash实例

1) 方法一

另安装一个logstash实例,修改配置文件,启动logstash便可。多个logstash实例的端口号依次排序为96009601……9700

2) 方法二

使用"/usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/logstash1.conf --path.data=/tmp/data1"方式指定配置文件启动logstash

实例," --path.data=/tmp/data1 "表示实例的数据目录(该目录须要本身建立,注意权限),每一个实例的数据目录都不相同,若是不指

定该参数启动logstash,会报' Logstash could not be started because there is already another instance using the configured data directory. If

you wish to run multiple instances, you must change the "path.data" setting. '的错误。

-f  # 覆盖logstash.yml文件中的" path.config: /usr/local/logstash/conf.d "参数。

--path.data  # 覆盖logstash.yml文件中的" path.data: LOGSTASH_HOME/data "参数。

3.8.1 操做符

1 比较操做符

==!=<><=>=

if [type] == " access" {}

# []表明引用的字段值,该值多是单值或是一个列表。

# 单值字段 # type字段、经过field添加的字段,或者其它字段。

# 列表字段 # 好比tags字段。

 

2 正则匹配操做符

=~(匹配正则)!~(不匹配正则)

 

3 成员操做符

in(包含)not in(不包含)

if "nginx " in [tags]{}

 

4 逻辑操做符

and()or()nand(非与)nor(非或)

 

5 一元运算符

!(取反)()(复合表达式)!()(对复合表达式结果取反)

3.8.2 示例

1 典型应用场景

根据日志来源字段写入不一样索引名称。

日志来源字段能够为项目名称、应用名或测试环境、生产环境等。

 

2 配置文件

# cat /usr/local/logstash/conf.d/input_mutate-output_es.conf

input {

  file {

    path => "/tmp/test.log"

    add_field => {

      "log_type" => "test"

    }

    start_position => "beginning"

  }

  file {

    path => "/tmp/prod.log"

    add_field => {

      "log_type" => "prod"

    }

    start_position => "beginning"

  }

  file {

    path => "/tmp/unknown.log"

    add_field => {

      "log_type" => "unknown"

    }

    start_position => "beginning"

  }

}

 

filter {

  if [log_type] in ["test","dev"] {

    mutate {

      add_field => {

        "[@metadata][target_index]" => "test-%{+YYYY.MM}"

      }

    }

  } else if [log_type] == "prod" {

    mutate {

      add_field => {

        "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}"

      }

    }

  } else {

    mutate {

      add_field => {

        "[@metadata][target_index]" => "unknown-%{+YYYY}"

      }

    }

  }

}

 

output {

  elasticsearch {

    hosts => ["172.16.1.120","172.16.1.121","172.16.1.122"]

    index => "%{[@metadata][target_index]}"

  }

}

 

说明:

Logstash 1.5开始,咱们能够在logstash配置中使用metadatametadata不会在output中被序列化输出,这样咱们即可以在metadata中添加一些临时的中间数据,而不须要去删除它。

 

3 查看输出

# echo "test" >>/tmp/test.log

# echo "prod" >>/tmp/prod.log

# echo "unknown" >>/tmp/unknown.log

wps23 

4 Filebeat部署

172.16.1.120节点上操做

4.1 Filebeat介绍

Filebeat是一个轻量级的日志采集器,将采集的数据推送到LogstashES存储。

wps24 

服务名

filebeat output默认编码方式

redis

json

kafka

json

file

json

elasticsearch

-

logstash

-

4.2 安装

1 安装包下载

(1) 二进制包

https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.4-linux-x86_64.tar.gz

(2) RPM(生产建议使用)

https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.13.4-x86_64.rpm

 

2 解压安装包

# tar -xzf filebeat-7.13.4-linux-x86_64.tar.gz

# mv filebeat-7.13.4-linux-x86_64/ /usr/local/filebeat/

# mkdir -p /usr/local/filebeat/data/

# mkdir -p /usr/local/filebeat/logs/

# useradd -M -s /sbin/nologin filebeat

# id filebeat

uid=1005(filebeat) gid=1005(filebeat) groups=1005(filebeat)

# chown -R filebeat.filebeat /usr/local/filebeat/

4.3 filebeat服务加入systemd

1 filebeat.service配置文件

# cat filebeat.service

[Unit]

Description=filebeat server

After=network.target

[Service]

Type=simple

ExecStart=/usr/local/filebeat/filebeat --path.config /usr/local/filebeat -c filebeat.yml --path.data /usr/local/filebeat/data --path.home /usr/local/filebeat --path.logs /usr/local/filebeat/logs

ExecReload=/bin/kill -HUP $MAINPID

KillMode=control-group

Restart=on-failure

User=filebeat

Group=filebeat

[Install]

WantedBy=multi-user.target

 

2 启动服务

# systemctl daemon-reload

# systemctl start filebeat.service

# systemctl enable filebeat.service

4.4 推送日志到Logstash

注意: 

(1) filebeat日志输入类型能够有多个,输出只能有一个,不然会报以下错误。

Exiting: error unpacking config data: more than one namespace configured accessing 'output' (source:'/usr/local/filebeat/filebeat.yml')

(2) filebeat默认第一次读取文件时从头开始读取,并记录日志的位置,下次从记录的位置开始读,若是日志被清理致使实际位置点比记录的位置点小,那么filebeat会从头开始读取日志。若是想要全部收集的日志文件都从开头读取,先stop filebeat,而后删除

filebeat数据目录下的全部数据,再启动filebeat便可。rm -rf /usr/local/filebeat/data/*

 

1 filebeat配置文件

# cat /usr/local/filebeat/filebeat.yml

filebeat.inputs:

# 配置不一样的输入

- type: log

  # 是否启用该输入配置

  enabled: true

  # 采集的日志文件路径,能够通配

  paths:

    - /tmp/test.log

  # 正则匹配要排除的行,这里以DBG开头的行都过滤掉

  exclude_lines: ['^DBG']

  # 正则匹配要采集的行,这里以ERR/WARN开头的行都采集

  #include_lines: ['^ERR', '^WARN']

  # 排除的文件,默认采集全部

  exclude_files: ['.gz$']

  # 添加标签

  tags: ["web","nginx"]

  # 添加类型,改参数已经在filebeat6.0时被取消

  # document_type: "access"

  # 下面fields添加的字段默认是在fields.xxx,能够设置在顶级对象下

  fields_under_root: true

  # 自定义添加的字段,通常用于标记日志来源

  fields:

    project: microservice

    app: product

 

# 输出到文件

output.file:

  path: "/tmp"

  enabled: false

  filename: "filebeat_out.txt"

 

# 输出到logstash

output.logstash:

  hosts: ["172.16.1.120:5044"]

  # logstash服务器地址,能够有多个

  enabled: true

  # 是否开启输出至logstash,默认即为true

  worker: 1

  # 工做线程数

  compression_level: 3

  # 压缩级别

  # loadbalance: true

  # 多个输出的时候开启负载

 

# 输出到es

setup.ilm.enabled: false

setup.template.name: "filebeat-es"

setup.template.pattern: "filebeat-es-*"

output.elasticsearch:

  hosts: ["172.16.1.120:9200","172.16.1.121:9200","172.16.1.122:9200"]

  index: "filebeat-es-%{+yyyy.MM.dd}"

  enabled: false

 

# 添加模块

filebeat.config.modules:

  path: ${path.config}/modules.d/*.yml

  reload.enabled: false

  reload.period: 10s

 

# 添加主机信息

processors:

  - add_host_metadata:

      when.not.contains.tags: forwarded

  - add_cloud_metadata: ~

  - add_docker_metadata: ~

  - add_kubernetes_metadata: ~