filebeat ---> logstash ---> elasticsearch ---> kibana
filebeat ---> kafka ---> logstash ---> elasticsearch ---> kibana
logback ---> logback appender ---> kafka ---> logstash ---> elasticsearch ---> kibana
Let's start with the first approach.
Environment
Four Linux VMs running 32-bit CentOS 6 (the VMs bundled with a tutorial downloaded online), elasticsearch-6.6.0, JDK 1.8, with IPs 192.168.10.132, 192.168.10.133, 192.168.10.134, and 192.168.10.135.
The local machine is a 64-bit Windows host; the virtualization software is VMware Workstation, with kibana-6.6.0-windows-x86_64 and JDK 1.8.
[root@node-134 ~]# cat /proc/version
Linux version 2.6.32-358.el6.i686 (mockbuild@c6b8.bsys.dev.centos.org) (gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC) ) #1 SMP Thu Feb 21 21:50:49 UTC 2013
[root@node-134 ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) Server VM (build 25.181-b13, mixed mode)
Download elasticsearch-6.6.0.tar.gz
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.tar.gz
Create the dev group and the elk user
groupadd dev
useradd elk -g dev -p elk
tar -zxvf elasticsearch-6.6.0.tar.gz
chown -R elk:dev /usr/local/lddxfs/elasticsearch-6.6.0
chmod -R 777 /usr/local/lddxfs/elasticsearch-6.6.0
Modify the configuration as the root user.
Host 192.168.10.132 (hosts 192.168.10.133 and 192.168.10.134 are configured similarly and are not listed).
Complete /usr/local/lddxfs/elasticsearch-6.6.0/config/elasticsearch.yml:
cluster.name: lddxfs-application
node.name: node-132
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
network.host: 192.168.10.132
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.10.132", "192.168.10.133","192.168.10.134","192.168.10.135"]
xpack.ml.enabled: false
Host 192.168.10.135 (with node.master, node.data, and node.ingest all false, this becomes a coordinating-only node, which Kibana will connect to later).
Complete /usr/local/lddxfs/elasticsearch-6.6.0/config/elasticsearch.yml:
cluster.name: lddxfs-application
node.name: node-135
node.master: false
node.data: false
node.ingest: false
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
network.host: 192.168.10.135
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.10.132", "192.168.10.133","192.168.10.134","192.168.10.135"]
xpack.ml.enabled: false
Other configuration on all four Linux machines
Append to /etc/security/limits.conf (the leading * is required):
* soft nofile 65536
* hard nofile 65536
* soft nproc 32000
* hard nproc 32000
* hard memlock unlimited
* soft memlock unlimited
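After logging in again as the elk user, the raised limits can be sanity-checked with:
ulimit -n
ulimit -u
which should print 65536 and 32000.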
Add at the top of /etc/sysctl.conf:
vm.max_map_count = 655360
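The kernel setting does not take effect until it is reloaded; one way to apply and verify it without a reboot:
sysctl -p
sysctl vm.max_map_count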
Append to /etc/profile:
export JAVA_HOME=/usr/java/jdk1.8.0_181-i586
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$CLASSPATH
export JRE_HOME=$JAVA_HOME/jre
Slightly modify /usr/java/jdk1.8.0_181-i586/jre/lib/i386/jvm.cfg by swapping the order of -server and -client (the 32-bit JDK defaults to the client VM, and Elasticsearch requires the server VM). After the change:
-server KNOWN
-client IF_SERVER_CLASS -server
-minimal KNOWN
Set the hostname and configure name resolution (this does not seem to be strictly required; it relates to the firewall). Hosts 192.168.10.133, 192.168.10.134, and 192.168.10.135 are configured similarly and are not listed.
[root@node-132 i386]# cat /etc/hosts
127.0.0.1 localhost node-132 localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.132 node-132
192.168.10.133 node-133
192.168.10.134 node-134
192.168.10.135 node-135
Disable the firewall
chkconfig iptables off
service iptables stop
Change the hostname
vi /etc/sysconfig/network
Reboot after the change
reboot
Start the service (Elasticsearch refuses to run as root, hence switching to the elk user)
[root@node-134 ~]# su elk
[root@node-134 ~]# /usr/local/lddxfs/elasticsearch-6.6.0/bin/elasticsearch -h
starts elasticsearch
Option Description
------ -----------
-E <KeyValuePair> Configure a setting
-V, --version Prints elasticsearch version information and exits
-d, --daemonize Starts Elasticsearch in the background
-h, --help show help
-p, --pidfile <Path> Creates a pid file in the specified path on start
-q, --quiet Turns off standard output/error streams logging in console
-s, --silent show minimal output
-v, --verbose show verbose output
[root@node-134 ~]# /usr/local/lddxfs/elasticsearch-6.6.0/bin/elasticsearch -d
Verifying the Elasticsearch cluster
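A simple way to verify the cluster is to query the REST API from any machine, for example:
curl http://192.168.10.132:9200/_cat/nodes?v
curl http://192.168.10.132:9200/_cluster/health?pretty
All four nodes should be listed, and the cluster status should be green once the nodes have joined.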
Elasticsearch API docs
https://www.elastic.co/guide/en/elasticsearch/reference/master/index.html
https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-delete-index.html
ES reference blog posts
http://blog.51cto.com/zero01/2079879
http://blog.51cto.com/zero01/2082794
Connecting Kibana to ES
https://www.elastic.co/guide/en/kibana/current/production.html#load-balancing
D:\software\kibana-6.6.0-windows-x86_64\config\kibana.yml
server.port: 5601
server.host: "localhost"
elasticsearch.hosts: ["http://192.168.10.135:9200"]
Run Kibana
D:\software\kibana-6.6.0-windows-x86_64\bin\kibana.bat
Kibana is now up and running; the data shown at this point is Kibana's bundled demo data.
http://127.0.0.1:5601/app/kibana#/home?_g=()
Logstash configuration file
Create the Logstash config file /usr/local/lddxfs/logstash-6.5.4/config/logstash-beats.conf
[root@node-135 config]# cat logstash-beats.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.10.132:9200","http://192.168.10.133:9200","http://192.168.10.134:9200"]
    index => "demolog-%{+YYYY.MM.dd}"
  }
}
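Optionally, the pipeline definition can be syntax-checked before starting Logstash:
/usr/local/lddxfs/logstash-6.5.4/bin/logstash -f /usr/local/lddxfs/logstash-6.5.4/config/logstash-beats.conf --config.test_and_exit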
Run Logstash
/usr/local/lddxfs/logstash-6.5.4/bin/logstash -f /usr/local/lddxfs/logstash-6.5.4/config/logstash-beats.conf
Logstash reference documentation
Goal
Modify the Filebeat configuration to ship the /var/log/*.log logs to Logstash.
Steps
Download the filebeat-6.5.4-linux-x86 archive, upload it to 192.168.10.135, and extract it. Then:
cd /usr/local/lddxfs/filebeat-6.5.4-linux-x86
[root@node-135 filebeat-6.5.4-linux-x86]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output.logstash:
  hosts: ["localhost:5044"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
The permissions on the log directory need to be adjusted; that step is omitted here.
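Before the first run, Filebeat's built-in test subcommands can catch config or connectivity mistakes:
./filebeat test config -c filebeat.yml
./filebeat test output -c filebeat.yml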
Run Filebeat
./filebeat -c filebeat.yml -e
Open Kibana in the browser and create an index pattern.
This index naming (i.e. index => "demolog-%{+YYYY.MM.dd}") is not ideal; in practice, indices are named per project.
Next, the Kafka-based pipeline. On 192.168.10.132:
Extract ZooKeeper and make three copies, placed under the zookeeper_2185, zookeeper_2186, and zookeeper_2187 directories, then modify the configs as follows:
cat /usr/local/lddxfs/zookeeper_2185/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/lddxfs/zookeeper_2185/datadir
dataLogDir=/usr/local/lddxfs/zookeeper_2185/logdir
clientPort=2185
autopurge.snapRetainCount=30
autopurge.purgeInterval=48
server.1=192.168.10.132:2885:3555
server.2=192.168.10.132:2886:3556
server.3=192.168.10.132:2887:3557
cat /usr/local/lddxfs/zookeeper_2186/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/lddxfs/zookeeper_2186/datadir
dataLogDir=/usr/local/lddxfs/zookeeper_2186/logdir
clientPort=2186
autopurge.snapRetainCount=30
autopurge.purgeInterval=48
server.1=192.168.10.132:2885:3555
server.2=192.168.10.132:2886:3556
server.3=192.168.10.132:2887:3557
cat /usr/local/lddxfs/zookeeper_2187/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/lddxfs/zookeeper_2187/datadir
dataLogDir=/usr/local/lddxfs/zookeeper_2187/logdir
autopurge.snapRetainCount=30
autopurge.purgeInterval=48
clientPort=2187
server.1=192.168.10.132:2885:3555
server.2=192.168.10.132:2886:3556
server.3=192.168.10.132:2887:3557
Create a file named myid in each instance's dataDir with the following contents:
[root@node-132 datadir]# cat /usr/local/lddxfs/zookeeper_2185/datadir/myid
1
[root@node-132 datadir]# cat /usr/local/lddxfs/zookeeper_2186/datadir/myid
2
[root@node-132 datadir]# cat /usr/local/lddxfs/zookeeper_2187/datadir/myid
3
Start the services
[root@node-132 lddxfs]# sh /usr/local/lddxfs/zookeeper_2185/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/lddxfs/zookeeper_2185/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node-132 lddxfs]# sh /usr/local/lddxfs/zookeeper_2186/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/lddxfs/zookeeper_2186/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node-132 lddxfs]# sh /usr/local/lddxfs/zookeeper_2187/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/lddxfs/zookeeper_2187/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node-132 lddxfs]#
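Each instance's role can be checked with zkServer.sh status; with three healthy instances, one should report Mode: leader and the other two Mode: follower:
sh /usr/local/lddxfs/zookeeper_2185/bin/zkServer.sh status
sh /usr/local/lddxfs/zookeeper_2186/bin/zkServer.sh status
sh /usr/local/lddxfs/zookeeper_2187/bin/zkServer.sh status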
On 192.168.10.133:
cat /usr/local/lddxfs/kafka_2.12-2.1.0/config/server.properties | grep -v '#' | grep -v '^\s*$'
broker.id=2
listeners=PLAINTEXT://192.168.10.133:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.132:2185,192.168.10.132:2186,192.168.10.132:2187
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
Copy to the other machines
scp -r kafka_2.12-2.1.0 root@192.168.10.132:/usr/local/lddxfs
scp -r kafka_2.12-2.1.0 root@192.168.10.134:/usr/local/lddxfs
Modify listeners and broker.id in /usr/local/lddxfs/kafka_2.12-2.1.0/config/server.properties:
192.168.10.132
broker.id=1
listeners=PLAINTEXT://192.168.10.132:9092
# everything else is the same as on 133
192.168.10.134
broker.id=3
listeners=PLAINTEXT://192.168.10.134:9092
# everything else is the same as on 133
Start Kafka on the three machines in turn
./bin/kafka-server-start.sh -daemon ./config/server.properties
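To confirm all three brokers registered with ZooKeeper, their ids can be listed via zkCli.sh (bundled with ZooKeeper); with all brokers up, this should print [1, 2, 3]:
sh /usr/local/lddxfs/zookeeper_2185/bin/zkCli.sh -server 192.168.10.132:2185 ls /brokers/ids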
Create a topic
./bin/kafka-topics.sh --create --zookeeper 192.168.10.132:2185,192.168.10.132:2186,192.168.10.132:2187 --topic test --partitions 1 --replication-factor 1
Created topic "test".
List and describe topics
./bin/kafka-topics.sh --list --zookeeper 192.168.10.132:2185,192.168.10.132:2186,192.168.10.132:2187
./bin/kafka-topics.sh --zookeeper 192.168.10.132:2185,192.168.10.132:2186,192.168.10.132:2187 --describe --topic test
Topic:test   PartitionCount:1   ReplicationFactor:1   Configs:
    Topic: test   Partition: 0   Leader: 1   Replicas: 1   Isr: 1
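A quick end-to-end smoke test is possible with the bundled console clients: type a few lines into the producer, then read them back with the consumer.
./bin/kafka-console-producer.sh --broker-list 192.168.10.132:9092 --topic test
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.132:9092 --topic test --from-beginning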
Kafka UI tool
http://www.kafkatool.com/download.html
In the Java project, add the Maven coordinates
<!-- https://mvnrepository.com/artifact/com.github.danielwegener/logback-kafka-appender -->
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
Go to https://github.com/danielwegener/logback-kafka-appender/tags,
download logback-kafka-appender-0.2.0-RC2.zip, extract it, find logback.xml, and tweak it slightly.
Put the logback.xml configuration file in the resources directory (Spring Boot project):
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%date{ISO8601} %-5level [%thread] %logger{36}:%L - %msg%n</pattern>
        </encoder>
    </appender>
    <!-- This example configuration is probably the most unreliable under
         failure conditions but won't block your application at all -->
    <appender name="very-relaxed-and-fast-kafka-appender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date{ISO8601} %-5level [%thread] %logger{36}:%L - %msg%n</pattern>
        </encoder>
        <topic>lddxfs-consumer-demo</topic>
        <!-- we don't care how the log messages will be partitioned -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
        <!-- use async delivery. the application threads are not blocked by logging -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=192.168.10.132:9092,192.168.10.133:9092,192.168.10.134:9092</producerConfig>
        <!-- don't wait for a broker to ack the reception of a batch. -->
        <producerConfig>acks=0</producerConfig>
        <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
        <producerConfig>linger.ms=1000</producerConfig>
        <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
        <producerConfig>max.block.ms=0</producerConfig>
        <!-- define a client-id that you use to identify yourself against the kafka broker -->
        <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>
        <!-- there is no fallback <appender-ref>. If this appender cannot deliver, it will drop its messages. -->
        <appender-ref ref="STDOUT"/>
    </appender>
    <root level="info">
        <appender-ref ref="very-relaxed-and-fast-kafka-appender" />
    </root>
</configuration>
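Once the application logs something, the events should already be visible in the topic, even before Logstash is wired up; for example, with the console consumer on one of the brokers:
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.132:9092 --topic lddxfs-consumer-demo --from-beginning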
Create the Logstash config file /usr/local/lddxfs/logstash-6.5.4/config/logstash-kafka.conf
[root@node-135 config]# cat logstash-kafka.conf
# Sample Logstash configuration for creating a simple
# Kafka -> Logstash -> Elasticsearch pipeline.
input {
  kafka {
    id => "pc135"
    bootstrap_servers => "192.168.10.132:9092,192.168.10.133:9092,192.168.10.134:9092"
    consumer_threads => 5
    topics_pattern => "lddxfs.*"
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.10.132:9200","http://192.168.10.133:9200","http://192.168.10.134:9200"]
    index => "%{[@metadata][kafka][topic]}"
    #user => "elastic"
    #password => "changeme"
  }
}
The output > elasticsearch > index here is dynamic: each event is written to an index named after the Kafka topic it came from, and the topic is set by the Java project's logback.xml. The decorate_events => true setting above is what exposes the topic under [@metadata][kafka].
Start Logstash
/usr/local/lddxfs/logstash-6.5.4/bin/logstash -f /usr/local/lddxfs/logstash-6.5.4/config/logstash-kafka.conf
After starting the Java project, check the ES indices at http://192.168.10.135:9200/_cat/indices?pretty
After creating the index pattern in Kibana, the message field still needs work.
This can be improved: the message field by itself carries too little information. Another library, logstash-logback-encoder, collects much richer logs; it serializes the whole log event to JSON before sending it to Kafka, rather than just the message text.
Project page:
https://github.com/logstash/logstash-logback-encoder
Add the following Maven dependencies:
<!-- ship logs to Kafka -->
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.2.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/net.logstash.logback/logstash-logback-encoder -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.3</version>
</dependency>
The complete logback.xml after the changes:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%date{ISO8601} - %-5level - [%thread] - %logger{36}:%L - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="fileLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <file>/logs/app/lddxfs/consumer-demo/file.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>/logs/app/lddxfs/consumer-demo/file.log.%d{yyyy-MM-dd}</fileNamePattern>
            <maxHistory>14</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%date{ISO8601} - %-5level - [%thread] - %logger{36}:%L - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="kafkaLog" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
        <topic>lddxfs-consumer-demo</topic>
        <!-- we don't care how the log messages will be partitioned -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
        <!-- use async delivery. the application threads are not blocked by logging -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <producerConfig>bootstrap.servers=192.168.10.132:9092,192.168.10.133:9092,192.168.10.134:9092</producerConfig>
        <!-- don't wait for a broker to ack the reception of a batch. -->
        <producerConfig>acks=0</producerConfig>
        <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
        <producerConfig>linger.ms=1000</producerConfig>
        <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
        <producerConfig>max.block.ms=0</producerConfig>
        <!-- define a client-id that you use to identify yourself against the kafka broker -->
        <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>
    </appender>
    <root level="info">
        <appender-ref ref="fileLog"/>
        <appender-ref ref="stdout"/>
        <appender-ref ref="kafkaLog"/>
    </root>
</configuration>
Create the Logstash config file /usr/local/lddxfs/logstash-6.5.4/config/logstash-kafka-json.conf
[root@node-135 config]# cat logstash-kafka-json.conf
# Sample Logstash configuration for creating a simple
# Kafka -> Logstash -> Elasticsearch pipeline.
input {
  kafka {
    id => "pc135"
    bootstrap_servers => "192.168.10.132:9092,192.168.10.133:9092,192.168.10.134:9092"
    consumer_threads => 5
    topics_pattern => "lddxfs.*"
    decorate_events => true
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.10.132:9200","http://192.168.10.133:9200","http://192.168.10.134:9200"]
    index => "%{[@metadata][kafka][topic]}"
    #user => "elastic"
    #password => "changeme"
  }
}
Start Logstash
/usr/local/lddxfs/logstash-6.5.4/bin/logstash -f /usr/local/lddxfs/logstash-6.5.4/config/logstash-kafka-json.conf
Java code
/**
 * testLog
 */
@GetMapping("testLog")
@ApiOperation(value = "testLog", httpMethod = "GET")
public void testLog(HttpServletRequest request) {
    MDC.clear();
    MDC.put("request.remoteHost", request.getRemoteHost());
    MDC.put("request.cookies", JSON.toJSONString(request.getCookies()));
    logger.error("test log", new RuntimeException("log test - did the exception get logged?"));
}
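Hitting the endpoint a few times produces test events; assuming the Spring Boot app listens on the default port 8080 (adjust host, port, and context path to your setup):
curl http://localhost:8080/testLog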
Final result
The MDC fields show up as their own fields, and no grok patterns are needed in Logstash to parse the logs.
References
https://logback.qos.ch/demo.html
Logstash Kafka input configuration reference
Log format (layout) configuration reference
https://logback.qos.ch/manual/layouts.html
Logstash plugins
https://github.com/logstash-plugins/logstash-filter-grok
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns