概述
安装 ES/Logstash/Kibana
下载 ES,注意版本,测试期间最好将 ES 和 Kibana 和 Logstash 保持一致的版本。然后启动 ES。
1
2
3
4
5
6
7
8
9
10
11
12
|
# 下载脚本
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.3.0.zip
sha1sum elasticsearch-5.3.0.zip
unzip elasticsearch-5.3.0.zip
cd elasticsearch-5.3.0/
# 运行脚本,默认配置
bin/elasticsearch
# 启动成功可以查看相关参数
http://localhost:9200/_nodes
http://localhost:9200/_cat/indices?v&pretty
|
下载 Logstash,启动 Logstash。
1
2
|
# 默认配置启动
logstash -e 'input { stdin { } } output { stdout {} }'
|
下载 Kibana,启动 Kibana,注意不能在 tmux 下进行 nohup。
1
2
|
# 默认配置启动
nohup ./kibana >> ../logs/kibana.log &
|
通过 Logstash 收集消息导入 Kafka
演示脚本表示,通过 console 中输入消息,Logstash 把输入处理并写入 Kafka。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# 创建 Kafka 主题
kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-elk-log --partition 1 --replication-factor 1
# Logstash 配置文件,保存至 etc/ 目录下
input {
stdin { }
}
output {
kafka {
topic_id => "kafka-elk-log"
bootstrap_servers => "localhost:9092"
batch_size => 5
codec => "plain"
}
stdout {
codec => rubydebug
}
}
# 创建配置文件,运行 Logstash
logstash -f ./etc/logstash_input_kafka
# 查看是否写入 Kafka,因为只有一个分区,所以容易查看数据
kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/kafka-elk-log-0/00000000000000000000.log --print-data-log
|
Logstash从Kafka消费日志
启动 ES,可以将 Kafka 数据通过 Logstash 消费打入 ES,并用 Kibana 作为展示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# Logstash 配置文件
input {
kafka {
codec => "plain"
group_id => "kafka_elk_group"
client_id => logstash
topics => "kafka-elk-log"
bootstrap_servers => "localhost:9092"
auto_offset_reset => "earliest"
consumer_threads => 5
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
codec => "plain"
index => "kafka_elk_log-%{+YYYY.MM.DD}"
}
}
# 启动 Logs
logstash -f ./etc/logstash_input_kafka
|
最终效果如下图:
日志采集
1
|
kafka-topics.sh --zookeeper localhost:2181 --create --topic acount-access-log --partitions 3 --replication-factor 1
|
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。