一、说明
1.Filebeat是一个日志文件托运工具,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读)
2.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据
3.Logstash是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供里很多功能强大的滤网以满足你的各种应用场景
4.ElasticSearch它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口
5.Kibana是ElasticSearch的用户界面
6.在实际应用场景下,为了满足大数据实时检索的场景,利用Filebeat去监控日志文件,将Kafka作为Filebeat的输出端,Kafka实时接收到Filebeat后以Logstash作为输出端输出,到Logstash的数据也许还不是我们想要的格式化或者特定业务的数据,这时可以通过Logstash的一些过了插件对数据进行过滤最后达到想要的数据格式以ElasticSearch作为输出端输出,数据到ElasticSearch就可以进行丰富的分布式检索了
二、系统环境
软件版本:filebeat6.5 logstash6.5 Centos7 yjava-1.8.0-openjdk
数据流向:filebeat --->> kafka--->> logstash
三、安装
1、Filebeat安装
[root@test ~]# yum install https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.5.4-x86_64.rpm
2.Logstash的安装
[root@test ~]# yum install https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.rpm
3.kafka的安装
[root@localhost ~]# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.0/kafka_2.12-2.1.0.tgz
[root@localhost ~]# wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
[root@localhost ~]# tar xf zookeeper-3.4.10.tar.gz -C /usr/local/
[root@localhost ~]# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/
[root@localhost ~]# mv /usr/local/zookeeper-3.4.10/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.10/conf/zoo.cfg
[root@localhost ~]# /usr/local/zookeeper-3.4.10/bin/zkServer.sh start
[root@localhost ~]# vim /usr/local/kafka_2.12-2.1.0/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.30.160:9092 # 默认监听所有地址,这个需要指定一个IP,否则在filebeat那往kafka中发送数据会失败。
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.cleaner.enable=true
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.30.160:2181
zookeeper.connection.timeout.ms=600
group.initial.rebalance.delay.ms=0
[root@localhost ~]# /usr/local/kafka_2.12-2.1.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.1.0/config/server.properties
[root@localhost ~]# ss -tunl|grep 9092
tcp LISTEN 0 50 ::ffff:192.168.30.160:9092 :::*
备注:一定得注意filebeat版本所兼容的kafka版本,详细查看:https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html#_configuration_options_9
四、filebeat配置
[root@test ~]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/httpd/access_log
fields:
log_topics: httpd
tags: ["httpd"]
- type: log
enabled: true
paths:
- /var/log/messages
fields:
log_topics: messages
tags: ["messages"]
include_lines: ['^Dec'] # 匹配需要行
tail_files: true # 从文件尾开始监控文件新增内容
processors:
- drop_fields:
fields: ["beat","input","source","offset"] # 过滤不需要的字段
name api-server # 添加字段,标记主机的日志来源
output.kafka:
enabled: true
codec.format: # 默认json格式
string: '%{[message]}' # 只输原始日志格式
hosts: ["192.168.30.160:9092"]
topic: '%{[fields][log_topics]}'
partition.round_robin: # 集群是否开启kafka的partition分区
reachable_only: false
worker: 2
required_acks: 1
compression: gzip # 压缩格式
max_message_bytes: 10000000 # 压缩格式字节大小
配置文件测试
filebeat test config -c filebeat.yml
[root@test ~]# systemctl start filebeat && systemctl enable filebeat
五、logstash配置文件
[root@test ~]# cat /etc/logstash/conf.d/logstash.conf
input {
kafka {
bootstrap_servers => "192.168.30.160:9092"
topics => ["httpd","messages"]
codec => "json"
consumer_threads => 4 # 消费进程数
enable_auto_commit => true # 自动提交offset
auto_commit_interval_ms => "1000" # consumer 隔多久提交
# offsets --消费指针
auto_offset_reset => "latest" # earliest
# 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest
# 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none
# topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
}
}
output {
if [tags] == "httpd" {
elasticsearch {
hosts => ["192.168.30.160:9200"]
index => "logstash-httpd-%{+YYYY.MM.dd}"
manage_template => true
}
}
if [tags] == "messages" {
elasticsearch {
hosts => ["192.168.30.160:9200"]
index => "logstash-messages-%{+YYYY.MM.dd}"
manage_template => true
}
}
}
#stdout { codec => rubydebug }
#path => "/tmp/logstash.log"
}
配置文件测试
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf -t
[root@test ~]# systemctl start logstash && systemctl enable logstash
六、kafka常用命令
创建kafka topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
删除kafka-topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
查看所有topic列表
bin/kafka-topics.sh --zookeeper localhost:2181 --list
查看指定topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
控制台向topic生产数据
bin/kafka-console-producer.sh --broker-list 192.168.30.160:9092 --topic test
控制台消费topic的数据
bin/kafka-console-consumer.sh --bootstrap-server 192.168.30.160:9092 --topic test --from-beginning