目录

ELK系列(3) - Logstash问题汇总

启动参数

启动Logstash时可以指定一些参数:

1
2
3
4
5
-w # 指定线程,默认是cpu核数 
-f # 指定配置文件
-r # 启用热加载,可以在运行期间修改配置文件并生效
-t # 测试配置文件是否正常
-b # 执行filter模块之前最大能积累的日志,数值越大性能越好,同时越占内存

关闭Logstash

如果将Logstash作为服务启动的,通过以下方式之一关闭:

1
2
3
4
5
6
7
8
// systemd
systemctl stop logstash

// upstart
initctl stop logstash

// sysv
/etc/init.d/logstash stop

如果是在POSIX系统的控制台中直接运行Logstash则这样关闭:

1
kill -TERM {logstash_pid}

或者在控制台中输入Ctrl-C

启动、关闭脚本

下面是一个Logstash启动、关闭的ansible脚本,里面有几个环境变量(logstash_home,logstash_bin_folder,logstash_config)需要替换成对应的值才能作为一个完整的shell文件执行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/bin/sh

#LOGSTASH_USAGE is the message if this script is called without any options
LOGSTASH_USAGE="Usage: $0 {start|stop|status|restart}"

#SHUTDOWN_WAIT is wait time in seconds for java proccess to stop
SHUTDOWN_WAIT=20

#------ private functions
logstash_pid() {
  echo `ps -fe | grep {{ logstash_home }} | grep -v grep | grep -v $0 | tr -s " "|cut -d" " -f2`
}

start() {
  pid=$(logstash_pid)
  if [ -n "$pid" ]
  then
    echo -e "Logstash is already running (pid: $pid)"
  else
    # Start Logstash
    echo -e "Starting Logstash"
    nohup sh {{ logstash_bin_folder }}/logstash -f {{ logstash_config }} &
  fi
  return 0
}

status(){
  pid=$(logstash_pid)
  if [ -n "$pid" ]; then echo -e "Logstash is running with pid: $pid"
  else echo -e "Logstash is not running"
  fi
}

stop() {
  pid=$(logstash_pid)
  if [ -n "$pid" ]
  then
    echo -e "Stoping Logstash [$pid]"
    kill -TERM $pid

    let kwait=$SHUTDOWN_WAIT
    count=0;
    until [ `ps -p $pid | grep -c $pid` = '0' ] || [ $count -gt $kwait ]
    do
      echo -n -e "\nwaiting for processes to exit";
      sleep 1
      let count=$count+1;
    done

    if [ $count -gt $kwait ]; then
      echo -n -e "\nkilling processes which didn't stop after $SHUTDOWN_WAIT seconds"
      kill -9 $pid
    fi
  else
    echo -e "Logstash is not running"
  fi

  return 0
}

#----- main program
case $1 in
  start)
    start
  ;;

  stop)
    stop
  ;;

  restart)
    stop
    start
  ;;

  status)
    status
  ;;

  *)
    echo -e $LOGSTASH_USAGE
  ;;
esac

exit 0

分割字符串并添加新的字段到Elasticsearch

有时需要对收集到的日志等信息进行分割,并且将分割后的字符作为新的字段index到Elasticsearch里。假定需求如下:

Logstash收集到的日志字段message的值是由多个字段拼接而成的,分隔符是;,;,如下:

1
2
3
{
    "message": "key_1=value_1;,;key_2=value_2"
}

现在想要将message的值拆分成2个新的字段:key_1、key_2,并且将它们index到ES里,可以借助Logstash的filter的插件来完成;这里提供两种解决方案。

方案一:使用mutate插件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
filter {
    mutate {
        split => ["message",";,;"]
    }

    if [message][0] {
        mutate {                
            add_field =>   {
                "temp1" => "%{[message][0]}"
            }
        }
    }
    
    if [message][1] {
        mutate {                
            add_field =>   {
                "temp2" => "%{[message][1]}"
            }
        }
    }   

    if [temp1][1] {
        mutate {
            split => ["temp1","="]
            add_field =>   {
                "%{[temp1][0]}" => "%{[temp1][1]}"
            }
        }
    }
    
    if [temp2][1] {
        mutate {
            split => ["temp2","="]
            add_field =>   {
                "%{[temp2][0]}" => "%{[temp2][1]}"
            }
            remove_field => [ "temp1", "temp2", "message" ]
        }
    }
}

在filter里可以通过if [varA]来判断一个变量varA是否为空或null,如果需要取反则是if ![varA]

从上面代码可以看出,这种做法很麻烦,也不利于日后的维护。每当message里被拼接的字段的数量增加时,就必须同步改动这里的filter逻辑,而且添加的代码量也是呈线性递增的。

此外,这里使用的诸如temp1等临时变量,可以用[@metadata][temp1]的写法来作为临时变量,这样就不需要去手动remove掉了。

方案二:使用ruby插件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
filter {
	ruby {
		code => "
			array1 = event.get('message').split(';,;')
			array1.each do |temp1|
				if temp1.nil? then
					next
				end
				array2 = temp1.split('=')
				key = array2[0]
				value = array2[1]
				if key.nil? then
					next
				end
				event.set(key, value)
			end
		"
		remove_field => [ "message" ]
	}
}

ruby插件可以允许你使用ruby的语法来完成各种复杂的逻辑,使用这种方案可以完美解决方案一中的不足之处,便于日后的维护。

block in multi_receive_encoded

测试坏境中Logstash总是会down掉,查看了下日志文件,发现报错如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
[2019-06-28T07:56:13,148][FATAL][logstash.runner          ] An unexpected error occurred!
{
	: error=>#<Errno: : EPIPE: Brokenpipe-<STDOUT>>,
	: backtrace=>["org/jruby/RubyIO.java:1457:in `write'",
	"org/jruby/RubyIO.java:1428:in `write'",
	"/home/cbx6/software/logstash-6.6.1/vendor/bundle/jruby/2.3.0/gems/logstash-output-stdout-3.1.4/lib/logstash/outputs/stdout.rb:43:in `block in multi_receive_encoded'",
	"org/jruby/RubyArray.java:1734:in `each'",
	"/home/cbx6/software/logstash-6.6.1/vendor/bundle/jruby/2.3.0/gems/logstash-output-stdout-3.1.4/lib/logstash/outputs/stdout.rb:42:in `multi_receive_encoded'",
	"/home/cbx6/software/logstash-6.6.1/logstash-core/lib/logstash/outputs/base.rb:87:in `multi_receive'",
	"org/logstash/config/ir/compiler/OutputStrategyExt.java:114:in `multi_receive'",
	"org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java:97:in `multi_receive'",
	"/home/cbx6/software/logstash-6.6.1/logstash-core/lib/logstash/pipeline.rb:390:in `block in output_batch'",
	"org/jruby/RubyHash.java:1343:in `each'",
	"/home/cbx6/software/logstash-6.6.1/logstash-core/lib/logstash/pipeline.rb:389:in `output_batch'",
	"/home/cbx6/software/logstash-6.6.1/logstash-core/lib/logstash/pipeline.rb:341:in `worker_loop'",
	"/home/cbx6/software/logstash-6.6.1/logstash-core/lib/logstash/pipeline.rb:304:in `block in start_workers'"]
}

从堆栈信息里可以看到关键字眼:block in multi_receive_encodedblock in output_batch;另外,还可以发现这些错误信息都是由logstash-output-stdout-3.1.4这个插件引发的。

简单分析来看,应该是由于测试环境下同一时间内太多message要经由logstash-output-stdout输出到控制台造成的某种未知的并发问题。下面是对应的Logstash的output的配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["{{ cbx_logstash_es_server }}"]
        index => "%{indexName}"
        action => "index"
    }
}

根据配置,并结合堆栈信息来分析,可以认为是Logstash的stdout插件在高并发状态下使用rubydebug进行编解码时抛出了异常。

其实这里的stdout插件是不必要的,之前只是在本地测试使用到的。而在测试环境下,并发量远非本地测试能比,此外将大量的message输出到console上也会对性能产生影响。可以说,这种配置等同于在Java代码中频繁使用System.out.print()语句来输出信息,应该去除掉,最终output的配置如下:

1
2
3
4
5
6
7
output {
    elasticsearch {
        hosts => ["{{ cbx_logstash_es_server }}"]
        index => "%{indexName}"
        action => "index"
    }
}

将stdout插件的配置去除后,在之后的一段时间里,测试环境的Logstash不再发生异常退出,证实该issue确实是由stdoutcodec所引发的。注意,不要在正式环境使用该插件来输出信息到控制台,有可能会引发类似的并发异常问题或者性能问题。

参考链接

警告
本文最后更新于 March 19, 2022,文中内容可能已过时,请谨慎使用。