logstash
安装logstash请参考我之前的elk环境搭建
https://www.coonote.com/note/elk-log-systme-deploy.html
logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个mvc模型,logstash是controller层,elasticsearch是一个model层,kibana是view层。
首先将数据传给logstash,它将数据进行过滤和格式化(转成json格式),然后传给elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜索和图表可视化,它是调用elasticsearch的接口返回的数据进行可视化。logstash和elasticsearch是用java写的,kibana使用node.js框架。
它组要组成部分是数据输入,数据源过滤,数据输出三部分。
数据源input使用详解
input 及输入是指日志数据传输到logstash中。其中常见的配置如下:
- file:从文件系统中读取一个文件,很像unix命令 “tail -0a”
- syslog:监听514端口,按照rfc3164标准解析日志数据
- redis:从redis服务器读取数据,支持channel(发布订阅)和list模式。redis一般在logstash消费集群中作为"broker"角色,保存events队列共logstash消费。
- lumberjack:使用lumberjack协议来接收数据,目前已经改为 logstash-forwarder。
stdin { } # 从控制台中输入来源
file { # 从文件中来
path => "e:/software/logstash-1.5.4/logstash-1.5.4/data/*" #单一文件
#监听文件的多个路径
path => ["e:/software/logstash-1.5.4/logstash-1.5.4/data/*.log","f:/*.log"]
#排除不想监听的文件
exclude => "1.log"
#添加自定义的字段
add_field => {"test"=>"test"}
#增加标签
tags => "tag1"
#设置新事件的标志
delimiter => "\n"
#设置多长时间扫描目录,发现新文件
discover_interval => 15
#设置多长时间检测文件是否修改
stat_interval => 1
#监听文件的起始位置,默认是end
start_position => beginning
#监听文件读取信息记录的位置
sincedb_path => "e:/software/logstash-1.5.4/logstash-1.5.4/test.txt"
#设置多长时间会写入读取的位置信息
sincedb_write_interval => 15
}
syslog { # 系统日志方式
type => "system-syslog" # 定义类型
port => 10514 # 定义监听端口
}
beats { # filebeats方式
port => 5044
}
以上文件来源file,syslog,beats 只能选择其中一种
注意:
- 文件的路径名需要时绝对路径
- 支持globs写法
- 如果想要监听多个目标文件可以改成数组
1、path
path是file中唯一必需的参数。其他都是可选参数
2、exclude
是不想监听的文件,logstash会自动忽略该文件的监听。配置的规则与path类似,支持字符串或者数组,但是要求必须是绝对路径。
3、start_position
是监听的位置,默认是end,即一个文件如果没有记录它的读取信息,则从文件的末尾开始读取,也就是说,仅仅读取新添加的内容。对于一些更新的日志类型的监听,通常直接使用end就可以了;相反,beginning就会从一个文件的头开始读取。但是如果记录过文件的读取信息,这个配置也就失去作用了。
4、sincedb_path
这个选项配置了默认的读取文件信息记录在哪个文件中,默认是按照文件的inode等信息自动生成。其中记录了inode、主设备号、次设备号以及读取的位置。因此,如果一个文件仅仅是重命名,那么它的inode以及其他信息就不会改变,因此也不会重新读取文件的任何信息。类似的,如果复制了一个文件,就相当于创建了一个新的inode,如果监听的是一个目录,就会读取该文件的所有信息。
5、关于扫描和检测的时间
按照默认的来就好了,如果频繁创建新的文件,想要快速监听,那么可以考虑缩短检测的时间。
6、add_field
就是增加一个字段,例如:
7、tags
用于增加一些标签,这个标签可能在后续的处理中起到标志的作用
ogstash官方文档:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html#plugins-inputs-file-sincedb_path
数据过滤filter
fillters 在logstash处理链中担任中间处理组件。他们经常被组合起来实现一些特定的行为来,处理匹配特定规则的事件流。常见的filters如下:
- grok:解析无规则的文字并转化为有结构的格式。grok 是目前最好的方式来将无结构的数据转换为有结构可查询的数据。有120多种匹配规则,会有一种满足你的需要。
- mutate:mutate filter 允许改变输入的文档,你可以从命名,删除,移动或者修改字段在处理事件的过程中。
- drop:丢弃一部分events不进行处理,例如:debug events。
- clone:拷贝 event,这个过程中也可以添加或移除字段。
- geoip:添加地理信息(为前台kibana图形化展示使用)
它的主要作用就是把数据解析成规则的json键值对格式便于输出到其他组件中使用。
logstash自带的文件正则支持grok,date,geoip
filter {
#定义数据的格式
grok {
match => { "message" => "%{data:timestamp}\|%{ip:serverip}\|%{ip:clientip}\|%{data:logsource}\|%{data:userid}\|%{data:requrl}\|%{data:requri}\|%{data:refer}\|%{data:device}\|%{data:textduring}\|%{data:duringtime:int}\|\|"}
}
#定义时间戳的格式
date {
match => [ "timestamp", "yyyy-mm-dd-hh:mm:ss" ]
locale => "cn"
}
#定义客户端的ip是哪个字段(上面定义的数据格式)
geoip {
source => "clientip"
}
}
#需要进行转换的字段,这里是将访问的时间转成int,再传给elasticsearch
mutate {
convert => ["duringtime", "integer"]
}
输出配置output
outputs是logstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。一些常用的outputs包括:
- elasticsearch:如果你计划将高效的保存数据,并且能够方便和简单的进行查询.
-
file:将event数据保存到文件中。
-
graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。http://graphite.wikidot.com/。
-
statsd:statsd是一个统计服务,比如技术和时间统计,通过udp通讯,聚合一个或者多个后台服务,如果你已经开始使用statsd,该选项对你应该很有用。
默认情况下将过滤扣的数据输出到elasticsearch,当我们不需要输出到es时需要特别声明输出的方式是哪一种,同时支持配置多个输出源
output {
#将输出保存到elasticsearch,如果没有匹配到时间就不保存,因为日志里的网址参数有些带有换行
if [timestamp] =~ /^\d{4}-\d{2}-\d{2}/ {
elasticsearch { host => localhost }
}
#输出到stdout
# stdout { codec => rubydebug }
#定义访问数据的用户名和密码
# user => webservice
# password => 1q2w3e4r
}
codecs
codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置。codecs可以帮助你轻松的分割发送过来已经被序列化的数据。流行的codecs包括 json,msgpack,plain(text)。
- json:使用json格式对数据进行编码/解码
- multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息
获取完整的配置信息,请参考 logstash文档中 "plugin configuration"部分。
调试模式
日志解析时需要不停的对数据进行格式调试,因此这里直接配置成调试模式下使用,其实就是控制台输入模式。
logstash-7.3.0\config\logstash.conf
input {
stdin { }
# file { #文件配置方式
# path => "/opt/logstash/1.log"
# start_position => "beginning"
# }
}
filter { # 配置过滤器
grok {
patterns_dir => "d:\mnt\work_space\elk\logstash-7.3.0\config\patterns"
match => {
"message" =>"%{iporhost:remote_addr} - - \[%{httpdate:log_timestamp}\] %{hostname:http_host} %{word:request_method} \"%{uripath1:uri}\" \"%{uriparm1:param}\" %{base10num:http_status} %{base10num:body_bytes_sent} \"(?:%{uri1:http_referrer}|-)\" (%{base10num:upstream_status}|-) (?:%{hostport:upstream_addr}|-) (%{base16float:upstream_response_time}|-) (%{base16float:request_time}|-) (?:%{quotedstring:user_agent}|-) \"(%{word:x_forword_for}|-)\""
}
}
date {
match => [ "log_timestamp", "yyyy-mm-dd-hh:mm:ss" ]
locale => "cn"
}
}
output {
stdout {
codec => rubydebug # 将日志输出到当前的终端上显示
}
}
message是根据下面的日志数据格式进行的解析,它会自动解析成k-v键值对形式在控制台打印出来,这也是后面数据导入到es或者其他持久化文件中的格式,便于其他地方直接使用。
启动logstash
d:\mnt\work_space\elk\logstash-7.3.0\bin
# .\logstash -f d:\mnt\work_space\elk\logstash-7.3.0\config\logstash.conf --config.reload.automatic
配置文件更新时,logstash会自动重启模式
nginx日志数据格式实操
需要解析的日志格式ngxin的access.log日志格式配置如下
log_format main '$remote_addr - $remote_user [$time_local] $http_host $request_method "$uri" "$query_string" '
'$status $body_bytes_sent "$http_referer" $upstream_status $upstream_addr $request_time $upstream_response_time '
'"$http_user_agent" "$http_x_forwarded_for"' ;
access_log logs/access.log main;
192.168.0.200 - - [12/aug/2019:01:11:51 0800] 192.168.0.200 get "/index.html" "name=bamboo" 200 612 "-" - - 0.000 - "mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/76.0.3809.100 safari/537.36" "-"
为了方便,我们这里直接把上面这条数据作为测数据,在控制台贴入回车,即可所见所得。
测试,启动后把上面的测试数据贴入测试环境中,可以看到打印出如下数据,
{
"host" => "desktop-vlo4b8k",
"param" => "name=bamboo",
"message" => "192.168.0.200 - - [12/aug/2019:01:11:51 0800] 192.168.0.200 get \"/index.html\" \"name=bamboo\" 200 612 \"-\" - - 0.000 - \"mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/76.0.3809.100 safari/537.36\" \"-\"\r",
"request_method" => "get",
"uri" => "/index.html",
"http_status" => "200",
"body_bytes_sent" => "612",
"tags" => [
[0] "_dateparsefailure"
],
"log_timestamp" => "12/aug/2019:01:11:51 0800",
"remote_addr" => "192.168.0.200",
"@version" => "1",
"http_host" => "192.168.0.200",
"@timestamp" => 2019-08-11t17:12:25.607z,
"upstream_response_time" => "0.000",
"user_agent" => "\"mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36 (khtml, like gecko) chrome/76.0.3809.100 safari/537.36\""
}
@version,@timestamp,host,message是logstash本身就需要的基本数据,其他数据则是我们在message解析后得到的键值对数据。
字符串正则和自定义grok正则表达式
logstash的配置文件
创建自定义正则目录
# mkdir -p /usr/local/logstash/patterns
# vi /usr/local/logstash/patterns/nginx
然后写入上面自定义的正则
uriparm1 [a-za-z0-9$. !*'|(){},~@#%&/=:;_?\-\[\]]*
uripath1 (?:/[a-za-z0-9$. !*'(){},~:;=@#%&_\- ]*)
uri1 (%{uriproto}://)?(?:%{user}(?::[^@]*)?@)?(?:%{urihost})?(?:%{uripathparam})?
nginxaccess %{iporhost:remote_addr} - (%{username:user}|-) \[%{httpdate:log_timestamp}\] %{hostname:http_host} %{word:request_method} \"%{uripath1:uri}\" \"%{uriparm1:param}\" %{base10num:http_status} (?:%{base10num:body_bytes_sent}|-) \"(?:%{uri1:http_referrer}|-)\" (%{base10num:upstream_status}|-) (?:%{hostport:upstream_addr}|-) (%{base16float:upstream_response_time}|-) (%{base16float:request_time}|-) (?:%{quotedstring:user_agent}|-) \"(%{ipv4:client_ip}|-)\" \"(%{word:x_forword_for}|-)\"
使用方式
grok {
patterns_dir => "/usr/local/logstash/patterns" //设置自定义正则路径
match => {
"message" => "%{nginxaccess}"
}
}
其他格式日志解析样例
常见的日志样例,便于参考解析
内容: - 2015-04-29 13:04:23,733 [main] info (api.batch.threadpoolworker) command-line options for this run:
正则:- %{timestamp_iso8601:time} \[%{word:main}\] %{loglevel:loglevel} \(%{javaclass:class}\) %{greedydata:mydata}
结果:
{
"time": [
"2015-04-29 13:04:23,733"
],
"main": [
"main"
],
"loglevel": [
"info"
],
"class": [
"api.batch.threadpoolworker"
],
"mydata": [
"command-line options for this run:"
]
}
内容:/wls/applogs/rtlog/icore-pamsdrserver1351/icore-pamsdrserver1351.out
正则:/wls/applogs/rtlog/(?(?[a-za-z-] )([0-9]*(?:sf)|(?:wii)|(?:dmz)|(?:dr))([0-9a-za-z] ))%{unixpath:filepath}
结果:
{
"host": [
"icore-pamsdrserver1351"
],
"appname": [
"icore-pams"
],
"filepath": [
"/icore-pamsdrserver1351.out"
]
}