ELK Learning Notes: Building ELK on Kafka (Confluent)
0x00 Overview
This is a test build of an ELK environment that uses Kafka as the message queue. The data collection and transformation pipeline is structured as follows:
F5 HSL --> logstash (stream processing) --> kafka --> elasticsearch
The ELK version used in the test is 6.3; the Confluent version is 4.1.1.
The desired behavior: logs sent over HSL pass through logstash stream processing and come out as JSON; that JSON content is stored in Kafka exactly as-is, with Kafka doing no further format processing of any kind.
0x01 Testing
192.168.214.138: logstash and the Confluent environment
192.168.214.137: the ELK suite (logstash disabled; only ES and Kibana running)
Notes from installing and debugging Confluent:
As with setting up the ELK environment, install a Java environment first. Then, leaving Kafka out of the picture, get F5 HSL -> Logstash -> ES running normally and verify a simple Kibana display. Later, when switching to Kafka, it is enough to change the output section to the kafka plugin configuration. The relevant logstash configuration at this point:
input {
  udp {
    port => 8514
    type => 'f5-dns'
  }
}
filter {
  if [type] == 'f5-dns' {
    grok {
      match => { "message" => "%{HOSTNAME:F5hostname} %{IP:clientip} %{POSINT:clientport} %{IP:svrip} %{NUMBER:qid} %{HOSTNAME:qname} %{GREEDYDATA:qtype} %{GREEDYDATA:status} %{GREEDYDATA:origin}" }
    }
    geoip {
      source => "clientip"
      target => "geoip"
    }
  }
}
output {
  #stdout{ codec => rubydebug }
  #elasticsearch {
  #  hosts => ["192.168.214.137:9200"]
  #  index => "f5-dns-%{+YYYY.MM.dd}"
  #  template_name => "f5-dns"
  #}
  kafka {
    codec => json
    bootstrap_servers => "localhost:9092"
    topic_id => "f5-dns-kafka"
  }
}
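To generate test traffic without a real F5 device, a log line in the expected format can be pushed at the UDP listener. A minimal sketch, assuming nc is installed and logstash is listening on 192.168.214.138:8514 (the sample line matches the grok pattern above):

echo 'localhost.lan 202.202.102.100 53777 172.16.199.136 42487 test.com A NOERROR GTM_REWRITE' | nc -u -w1 192.168.214.138 8514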
Send some test traffic and confirm that ES receives the data normally; check the status shown in cerebro. (The screenshot was taken after debugging was finished.)
# cd /usr/share/cerebro/cerebro-0.8.1/
# /bin/cerebro -D-D
[root@kafka-logstash bin]# ./confluent start
Using CONFLUENT_CURRENT: /tmp/confluent.dA0KYIWj
Starting zookeeper
zookeeper is [UP]
Starting kafka
Kafka failed to start
kafka is [DOWN]
Cannot start Schema Registry, Kafka Server is not running. Check your deployment
Investigation showed the VM had been given too little memory, so Java could not allocate enough memory for Kafka:
[root@kafka-logstash bin]# ./kafka-server-start ../etc/kafka/server.properties
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
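Before resizing, a quick free -m on the host shows how much memory is actually available:

[root@kafka-logstash ~]# free -m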
Enlarge the VM's memory, and shrink the heap configured in logstash's JVM settings.
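logstash reads its heap settings from config/jvm.options; a minimal sketch of the change, with illustrative values rather than the ones used in the original test:

# config/jvm.options in the logstash install -- lower the default 1g heap
-Xms256m
-Xmx256m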
Kafka server configuration file:
[root@kafka-logstash kafka]# pwd
/root/confluent-4.1.1/etc/kafka
[root@kafka-logstash kafka]# egrep -v "^#|^$" server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous
group.initial.rebalance.delay.ms=0
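Once the broker starts cleanly, a quick sanity check that it is up and the topic exists (Confluent ships the Kafka CLI tools without the .sh suffix; this era of the tooling still addresses zookeeper directly):

[root@kafka-logstash confluent-4.1.1]# ./bin/kafka-topics --list --zookeeper localhost:2181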
Connect configuration file. In this configuration the original Avro converters have been replaced with JSON ones, and schema recognition for keys and values is disabled. Our input is raw JSON content with no associated schema; the goal is simply for Kafka to pass logstash's JSON output through to ES unchanged.
[root@kafka-logstash kafka]# pwd
/root/confluent-4.1.1/etc/kafka
[root@kafka-logstash kafka]# egrep -v "^#|^$" connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=share/java
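For reference, what the confluent CLI wrapper launches here amounts to a standalone Connect worker fed with this worker file plus the ES sink file shown later; run directly, it would presumably look like this (paths assumed relative to the Confluent root):

[root@kafka-logstash confluent-4.1.1]# ./bin/connect-standalone \
    etc/kafka/connect-standalone.properties \
    etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties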
Without the modifications above, Connect keeps failing to deserialize when sinking logs to ES, reporting "magic byte" errors and the like. Checking with the confluent status command shows connect flipping from up to down:
[root@kafka-logstash confluent-4.1.1]# ./bin/confluent status
ksql-server is [DOWN]
connect is [DOWN]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
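When chasing serialization mismatches like this, it helps to confirm what actually sits in the topic; the plain console consumer should print logstash's raw JSON events:

[root@kafka-logstash confluent-4.1.1]# ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic f5-dns-kafka --from-beginning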
schema-registry related configuration:
[root@kafka-logstash schema-registry]# pwd
/root/confluent-4.1.1/etc/schema-registry
[root@kafka-logstash schema-registry]# ls
connect-avro-distributed.properties  connect-avro-standalone.properties  log4j.properties  schema-registry.properties
[root@kafka-logstash schema-registry]# egrep -v "^#|^$" connect-avro-standalone.properties
bootstrap.servers=localhost:9092
key.converter.schema.registry.url=
[root@kafka-logstash schema-registry]# egrep -v "^#|^$" schema-registry.properties
listeners=
Elasticsearch sink connector configuration:
[root@kafka-logstash kafka-connect-elasticsearch]# pwd
/root/confluent-4.1.1/etc/kafka-connect-elasticsearch
[root@kafka-logstash kafka-connect-elasticsearch]# egrep -v "^#|^$" quickstart-elasticsearch.properties
name=f5-dns
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=f5-dns-kafka
key.ignore=true
value.ignore=true
schema.ignore=true
connection.url=
A TimestampRouter is used to map the topic dynamically to a per-day index in ES, so that ES produces one index per day. Note that schema.ignore=true must be set; otherwise Connect cannot send the received data to ES, and its connect.stdout log shows:
[root@kafka-logstash connect]# pwd
/tmp/confluent.dA0KYIWj/connect
Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
    at io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:221)
    at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
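The per-day index mapping mentioned above is done with Kafka Connect's TimestampRouter single message transform; matching the MyRouter settings visible in the REST output further down, its properties-file form would be:

transforms=MyRouter
transforms.MyRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.MyRouter.topic.format=${topic}-${timestamp}
transforms.MyRouter.timestamp.format=yyyyMMdd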
With the configuration fixed, data sent to logstash now reaches ES normally, and the document format is identical to the setup without Kafka.
Without Kafka:
{ "_index": "f5-dns-2018.06.26", "_type": "doc", "_id": "KrddO2QBXB-i0ay0g5G9", "_version": 1, "_score": 1, "_source": { "message": "localhost.lan 202.202.102.100 53777 172.16.199.136 42487 test.com A NOERROR GTM_REWRITE ", "F5hostname": "localhost.lan", "qid": "42487", "clientip": "202.202.102.100", "geoip": { "region_name": "Chongqing", "location": { "lon": 106.5528, "lat": 29.5628 }, "country_code2": "CN", "timezone": "Asia/Shanghai", "country_name": "China", "region_code": "50", "continent_code": "AS", "city_name": "Chongqing", "country_code3": "CN", "ip": "202.202.102.100", "latitude": 29.5628, "longitude": 106.5528 }, "status": "NOERROR", "qname": "test.com", "clientport": "53777", "@version": "1", "@timestamp": "2018-06-26T09:12:21.585Z", "host": "192.168.214.1", "type": "f5-dns", "qtype": "A", "origin": "GTM_REWRITE ", "svrip": "172.16.199.136" }}
With Kafka (note that the index name now carries the TimestampRouter's yyyyMMdd suffix, and that the document _id is topic+partition+offset because key.ignore=true):
{ "_index": "f5-dns-kafka-20180628", "_type": "doc", "_id": "f5-dns-kafka-20180628+0+23", "_version": 1, "_score": 1, "_source": { "F5hostname": "localhost.lan", "geoip": { "city_name": "Chongqing", "timezone": "Asia/Shanghai", "ip": "202.202.100.100", "latitude": 29.5628, "country_name": "China", "country_code2": "CN", "continent_code": "AS", "country_code3": "CN", "region_name": "Chongqing", "location": { "lon": 106.5528, "lat": 29.5628 }, "region_code": "50", "longitude": 106.5528 }, "qtype": "A", "origin": "DNSX ", "type": "f5-dns", "message": "localhost.lan 202.202.100.100 53777 172.16.199.136 42487 myf5.net A NOERROR DNSX ", "qid": "42487", "clientport": "53777", "@timestamp": "2018-06-28T09:05:20.594Z", "clientip": "202.202.100.100", "qname": "myf5.net", "host": "192.168.214.1", "@version": "1", "svrip": "172.16.199.136", "status": "NOERROR" }}
Relevant REST API output:
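The outputs below appear to come from the Kafka Connect REST API (listening on 8083, as the worker_id values show) and the Confluent REST Proxy (default port 8082); assuming those defaults, the corresponding calls would be:

curl http://localhost:8083/connectors/elasticsearch-sink/tasks    # tasks and their config
curl http://localhost:8083/connectors/elasticsearch-sink          # connector definition
curl http://localhost:8083/connectors/elasticsearch-sink/status   # connector and task state
curl http://localhost:8082/brokers                                # broker ids
curl http://localhost:8082/topics                                 # topic list
curl http://localhost:8082/topics/f5-dns-kafka                    # per-topic config and partitions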
{ "id": { "connector": "elasticsearch-sink", "task": 0 }, "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type.name": "doc", "value.ignore": "true", "tasks.max": "1", "topics": "f5-dns-kafka", "transforms.MyRouter.topic.format": "${topic}-${timestamp}", "transforms": "MyRouter", "key.ignore": "true", "schema.ignore": "true", "transforms.MyRouter.timestamp.format": "yyyyMMdd", "task.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkTask", "name": "elasticsearch-sink", "connection.url": " "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter" } }] "name": "elasticsearch-sink", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type.name": "doc", "value.ignore": "true", "tasks.max": "1", "topics": "f5-dns-kafka", "transforms.MyRouter.topic.format": "${topic}-${timestamp}", "transforms": "MyRouter", "key.ignore": "true", "schema.ignore": "true", "transforms.MyRouter.timestamp.format": "yyyyMMdd", "name": "elasticsearch-sink", "connection.url": " "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter" }, "tasks": [ { "connector": "elasticsearch-sink", "task": 0 } ], "type": "sink"} "name": "elasticsearch-sink", "connector": { "state": "RUNNING", "worker_id": "172.16.150.179:8083" }, "tasks": [ { "state": "RUNNING", "id": 0, "worker_id": "172.16.150.179:8083" } ], "type": "sink"}
"brokers": [ 0 ]} "__confluent.support.metrics", "_confluent-ksql-default__command_topic", "_schemas", "connect-configs", "connect-offsets", "connect-statuses", "f5-dns-2018.06", "f5-dns-2018.06.27", "f5-dns-kafka", "test-elasticsearch-sink"] "name": "f5-dns-kafka", "configs": { "file.delete.delay.ms": "60000", "segment.ms": "604800000", "min.compaction.lag.ms": "0", "retention.bytes": "-1", "segment.index.bytes": "10485760", "cleanup.policy": "delete", "follower.replication.throttled.replicas": "", "message.timestamp.difference.max.ms": "9223372036854775807", "segment.jitter.ms": "0", "preallocate": "false", "segment.bytes": "1073741824", "message.timestamp.type": "CreateTime", "message.format.version": "1.1-IV0", "max.message.bytes": "1000012", "unclean.leader.election.enable": "false", "retention.ms": "604800000", "flush.ms": "9223372036854775807", "delete.retention.ms": "86400000", "leader.replication.throttled.replicas": "", "min.insync.replicas": "1", "flush.messages": "9223372036854775807", "compression.type": "producer", "min.cleanable.dirty.ratio": "0.5", "index.interval.bytes": "4096" }, "partitions": [ { "partition": 0, "leader": 0, "replicas": [ { "broker": 0, "leader": true, "in_sync": true } ] } ]}
In this test Kafka was left essentially at its default configuration; no memory tuning was done, and no thought was given to sizing Kafka's disk usage, etc.
Test references:
CLI:
confluent: A command line interface to manage Confluent services
Usage: confluent
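The usage text above is truncated; from the 4.x-era open-source CLI, the subcommands exercised in this walkthrough (plus the log helper) are:

confluent start          # start zookeeper, kafka, schema-registry, connect, ... in dependency order
confluent status         # print each service as [UP] or [DOWN]
confluent log connect    # show the Connect worker log under $CONFLUENT_CURRENT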
Confluent Platform service port table
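For reference, the commonly documented default ports for this generation of Confluent Platform are:

Service            Default port
zookeeper          2181
kafka (broker)     9092
schema-registry    8081
kafka-rest         8082
kafka-connect      8083
ksql-server        8088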