Kafka集群监控、安全机制与最佳实践

网友投稿 368 2022-11-25

Kafka集群监控、安全机制与最佳实践

Kafka监控安装

Kafka集群监控方案选择:

Kafka只能依靠kafka-run-class.sh等命令进行管理 Kafka Manager(CMAK)是目前比较常用的监控工具,它有如下功能: 管理多个集群 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发) 运行首选副本选举 使用选项生成分区分配以选择要使用的代理 运行分区重新分配(基于生成的分配) 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置) 删除主题(仅支持0.8.2+并记住在代理配​​置中设置delete.topic.enable = true) 主题列表现在指示标记为删除的主题(仅支持0.8.2+) 批量生成多个主题的分区分配,并可选择要使用的代理 批量运行重新分配多个主题的分区 将分区添加到现有主题 更新现有主题的配置 可随意开启对broker级别或者对topic级别的JMX轮询 可方便的过滤出没有id 、所有者、延迟或目录等的消费者

所以本小节先介绍该监控工具的安装及配置,到如下地址可以下载各个版本的Kafka Manager:

~]# cd /usr/local/src [root@localhost /usr/local/src]# wget /usr/local/src]# unzip cmak-3.0.0.5.zip [root@localhost /usr/local/src]# mv cmak-3.0.0.5 /usr/local/kafka/kafka-manager

然后修改一下配置文件,主要是配置Kafka集群中Zookeeper的连接地址,也就是要监控哪个Kafka集群就配置哪个Kafka集群的Zookeeper地址:

[root@localhost ~]# vim /usr/local/kafka/kafka-manager/conf/application.conf # 配置多个地址使用逗号分隔即可 cmak.zkhosts="127.0.0.1:2181"

配置完成后,使用如下命令启动即可:

[root@localhost ~]# nohup /usr/local/kafka/kafka-manager/bin/cmak &

如下,正常监听了9000端口代表已经启动成功:

[root@localhost ~]# netstat -lntp |grep 9000 tcp6 0 0 :::9000 :::* LISTEN 25237/java [root@localhost ~]#

如果你的机器打开了防火墙的话,还需要在防火墙上放开9000端口:

[root@localhost ~]# firewall-cmd --zone=public --add-port=9000/tcp --permanent [root@localhost ~]# firewall-cmd --reload

Kafka监控界面

需要注意的是,如果要开启JMX轮询,则必须事先在Kafka的启动脚本中打开JMX的端口号:

[root@localhost ~]# vim /usr/local/kafka/bin/kafka-server-start.sh # 打开JMX端口 export JMX_PORT=9999

然后重启Kafka:

[root@localhost ~]# kafka-server-stop.sh [root@localhost ~]# nohub kafka-server-start.sh /usr/local/kafka/config/server.properties &

Partitions:Topic的Partition数量 Brokers:Topic的Broker数量 Replicas:Topic中Partition的副本数量 Under Replicated:Topic中的Partition副本处于同步失败或失效状态的比例,该指标过高则代表副本没有复制到足够的Broker上 Producer Message/Sec:生产者每秒投递的消息数量 Summed Recent Offsets:当前总计的消费偏移量 Brokers Spread:看作broker使用率,如kafka集群9个broker,某topic有7个partition,则broker spread: 7 / 9 = 77% Brokers Skew:partition是否存在倾斜,如kafka集群9个broker,某topic有18个partition,正常每个broker应该2个partition。若其中有3个broker上的partition数>2,则broker skew: 3 / 9 = 33% Brokers Leader Skew:leader partition是否存在倾斜,如kafka集群9个broker,某topic14个partition,则正常每个broker有2个leader partition。若其中一个broker有0个leader partition,一个有4个leader partition,则broker leader skew: (4 - 2) / 14 = 14%

Messages in /sec:每秒流入的消息数 Bytes in /sec:每秒流入的字节数 Bytes out /sec:每秒流出的字节数 Bytes rejected /sec:每秒拒绝的字节数 Failed fetch request /sec:每秒失败抓取数据请求数 Failed produce request /sec:每秒失败的生产数据请求数

Tips:这些指标的监控需要打开JMX

Kafka SSL签名库生成

Kafka的安全措施:

Kafka提供了SSL或SASL机制来保障集群安全 Kafka提供了Broker到zk连接的安全机制 Kafka支持Client的读写验证

值得一提的是通常情况下都不会给Kafka加安全措施,类似的其他中间件也是。因为通常我们都会将这些中间件部署在一个可信的网络里,例如与外网隔离的内部网络,并且有防火墙进行保护。

而且给Kafka加上SSL或SASL安全机制也会导致性能有所损耗,通常这个损耗在20~30%左右。但如果你的Kafka是允许在外网进行访问的话,那么就需要考虑增加安全机制了。

在本文中主要介绍一下SSL这种安全机制,SSL如今也不算啥冷门知识了,对HTTPS有所了解的话都应该清楚,这里就不进行赘述了。首先我们知道SSL是需要证书的,所以第一步就是创建证书,但在此之前需要先创建密钥仓库,用于存储证书文件。具体命令如下:

[root@localhost ~]# mkdir ca-store # 创建一个目录 [root@localhost ~]# cd ca-store/ # 进入该目录 [root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -validity 100000 -genkey # 创建密钥仓库 Enter keystore password: # 输入密码 Re-enter new password: # 确认密码 What is your first and last name? # 输入你的姓名 [Unknown]: lingyi What is the name of your organizational unit? # 输入你的组织单位 [Unknown]: zj What is the name of your organization? # 输入你的组织名称 [Unknown]: zj What is the name of your City or Locality? # 输入你的所在城市 [Unknown]: beijing What is the name of your State or Province? # 输入你的所在省份 [Unknown]: beijing What is the two-letter country code for this unit? # 输入两个字母的国家代码 [Unknown]: cn Is CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn correct? [no]: y # 输入y确认以上信息 [root@localhost ~/ca-store]# ls server.keystore.jks # 创建完成后,当前目录下会有这样一个文件 [root@localhost ~/ca-store]#

创建CA证书:

[root@localhost ~]# openssl req -new -x509 -keyout ca-key -out ca-cert -days 100000 Generating a 2048 bit RSA private key ........................+++ ........+++ writing new private key to 'ca-key' Enter PEM pass phrase: # 输入密码 Verifying - Enter PEM pass phrase: # 确认密码 ----- You are about to be asked to enter information that will be incorporated into your certificate request. What you are about to enter is what is called a Distinguished Name or a DN. There are quite a few fields but you can leave some blank For some fields there will be a default value, If you enter '.', the field will be left blank. ----- Country Name (2 letter code) [XX]:cn # 输入两个字母的国家代码 State or Province Name (full name) []:beijing # 输入你的所在省份 Locality Name (eg, city) [Default City]:beijing # 输入你的所在城市 Organization Name (eg, company) [Default Company Ltd]:zj # 输入你的组织单位 Organizational Unit Name (eg, section) []:zj # 输入你的组织名称 Common Name (eg, your name or your server's hostname) []:lingyi # 输入你的姓名或服务器名称 Email Address []:email@example.com # 输入你的邮箱地址 [root@localhost ~/ca-store]# ls # 创建完成后,当前目录下会多出两个文件 ca-cert ca-key server.keystore.jks [root@localhost ~/ca-store]#

将生成的CA添加到客户端信任库:

[root@localhost ~]# keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert Enter keystore password: # 输入密码 Re-enter new password: # 确认密码 Owner: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn Issuer: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn Serial number: e6fa410f7c90ff2a Valid from: Mon Jul 06 22:20:50 CST 2020 until: Sat Apr 21 22:20:50 CST 2294 Certificate fingerprints: SHA1: 1F:F3:8C:F4:37:9C:47:45:42:A4:51:77:7D:DA:05:E5:59:27:0C:9F SHA256: F6:7E:F6:E2:A9:12:8B:C4:04:6E:F0:23:49:6F:0D:3C:94:5F:AD:D6:76:42:42:63:24:69:96:C6:EE:02:70:91 Signature algorithm name: SHA256withRSA Subject Public Key Algorithm: 2048-bit RSA key Version: 3 Extensions: #1: ObjectId: 2.5.29.35 Criticality=false AuthorityKeyIdentifier [ KeyIdentifier [ 0000: 9A 1D 7C 61 ED 94 C0 BC 13 EA 20 3B 59 05 6A F9 ...a...... ;Y.j. 0010: 40 3B E8 4D @;.M ] ] #2: ObjectId: 2.5.29.19 Criticality=false BasicConstraints:[ CA:true PathLen:2147483647 ] #3: ObjectId: 2.5.29.14 Criticality=false SubjectKeyIdentifier [ KeyIdentifier [ 0000: 9A 1D 7C 61 ED 94 C0 BC 13 EA 20 3B 59 05 6A F9 ...a...... ;Y.j. 0010: 40 3B E8 4D @;.M ] ] Trust this certificate? [no]: y # 是否信任此证书 Certificate was added to keystore [root@localhost ~/ca-store]# ls ca-cert ca-key client.truststore.jks server.keystore.jks [root@localhost ~/ca-store]#

为Broker提供信任库以及所有客户端签名了密钥的CA证书:

[root@localhost ~/ca-store]# keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert ... 与之前的内容一致,略 ... [root@localhost ~/ca-store]# ls ca-cert ca-key client.truststore.jks server.keystore.jks server.truststore.jks [root@localhost ~/ca-store]#

完成以上步骤后,就是对证书进行签名,也就是用自己生成的CA来签名前面生成的证书。

1、从密钥仓库导出证书:

[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -certreq -file cert-file Enter keystore password: # 这里输入server.keystore.jks的密码 [root@localhost ~/ca-store]#

2、用CA签名:

[root@localhost ~/ca-store]# openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 100000 -CAcreateserial -passin pass:123456 Signature ok subject=/C=cn/ST=beijing/L=beijing/O=zj/OU=zj/CN=lingyi Getting CA Private Key [root@localhost ~/ca-store]#

3、导入CA的证书和已签名的证书到密钥仓库:

[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert ... 与之前的内容一致,略 ... [root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -import -file cert-signed Enter keystore password: Certificate reply was installed in keystore [root@localhost ~/ca-store]# ls # 完成所有步骤后,当前目录下会有如下文件 ca-cert ca-cert.srl ca-key cert-file cert-signed client.truststore.jks server.keystore.jks server.truststore.jks [root@localhost ~/ca-store]#

Kafka SSL服务端集成

Kafka SSL服务端集成其实也比较简单,只需要修改一下Kafka的server.properties配置文件即可。具体如下所示:

[root@localhost ~]# vim /usr/local/kafka/config/server.properties # 在原本的配置上追加SSL的监听端口及协议配置 listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989 advertised.listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989 # 增加SSL相关配置 ssl.keystore.location=/root/ca-store/server.keystore.jks ssl.keystore.password=123456 ssl.key.password=123456 ssl.truststore.location=/root/ca-store/server.truststore.jks ssl.truststore.password=123456

完成配置的修改后重启Kafka:

[root@localhost ~]# kafka-server-stop.sh [root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &

然后就可以使用openssl测试一下SSL配置是否成功,执行如下命令并输出了类似的内容则代表配置成功,已经能够通过SSL协议连接:

[root@localhost ~]# openssl s_client -debug -connect 192.168.220.128:8989 -tls1 CONNECTED(00000003) write to 0xfcd230 [0xfd6d63] (181 bytes => 181 (0xB5)) 0000 - 16 03 01 00 b0 01 00 00-ac 03 01 0c a9 85 ea 8f ................ 0010 - f2 f3 c1 ac fb 9d f6 78-9c ed f4 60 97 ad 91 33 .......x...`...3 0020 - 32 ab b2 81 9e 81 6b 3f-e0 db da 00 00 64 c0 14 2.....k?.....d.. 0030 - c0 0a 00 39 00 38 00 37-00 36 00 88 00 87 00 86 ...9.8.7.6...... 0040 - 00 85 c0 0f c0 05 00 35-00 84 c0 13 c0 09 00 33 .......5.......3 0050 - 00 32 00 31 00 30 00 9a-00 99 00 98 00 97 00 45 .2.1.0.........E 0060 - 00 44 00 43 00 42 c0 0e-c0 04 00 2f 00 96 00 41 .D.C.B...../...A 0070 - c0 12 c0 08 00 16 00 13-00 10 00 0d c0 0d c0 03 ................ 0080 - 00 0a 00 07 c0 11 c0 07-c0 0c c0 02 00 05 00 04 ................ 0090 - 00 ff 01 00 00 1f 00 0b-00 04 03 00 01 02 00 0a ................ 00a0 - 00 0a 00 08 00 17 00 19-00 18 00 16 00 23 00 00 .............#.. 00b0 - 00 0f 00 01 01 ..... read from 0xfcd230 [0xfd2813] (5 bytes => 5 (0x5)) 0005 - write to 0xfcd230 [0xfdc2b0] (7 bytes => 7 (0x7)) 0000 - 15 03 01 00 02 02 46 ......F 140425165125520:error:1408F10B:SSL routines:SSL3_GET_RECORD:wrong version number:s3_pkt.c:365: --- no peer certificate available --- No client certificate CA names sent --- SSL handshake has read 5 bytes and written 7 bytes --- New, (NONE), Cipher is (NONE) Secure Renegotiation IS NOT supported Compression: NONE Expansion: NONE No ALPN negotiated SSL-Session: Protocol : TLSv1 Cipher : 0000 Session-ID: Session-ID-ctx: Master-Key: Key-Arg : None Krb5 Principal: None PSK identity: None PSK identity hint: None Start Time: 1594126681 Timeout : 7200 (sec) Verify return code: 0 (ok) ---

Kafka SSL客户端集成

然后在创建Producer客户端的时候,增加SSL相关配置项。如下代码示例:

/** * 创建支持SSL的Producer实例 */ public static Producer createProducerWitchSSL() { Properties properties = new Properties(); // 注意,这里的端口一定得是Kafka服务器上配置的SSL协议监听端口 properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.23:8989"); properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1"); properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer"); // 配置事务支持 properties.setProperty(ProducerConfig.RETRIES_CONFIG, "1"); properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "trans-id"); // SSL配置 properties.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); properties.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src\\main\\resources\\client.truststore.jks"); properties.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456"); properties.setProperty("security.protocol", "SSL"); return new KafkaProducer<>(properties); } /** * 发送消息 */ public static void producerAsyncSend() { String topicName = "MyTopic"; String key = "test-key"; String value = "this is test message!"; Producer producer = createProducerWitchSSL(); // 初始化事务 producer.initTransactions(); try { // 开启事务 producer.beginTransaction(); // 构建消息对象 ProducerRecord record = new ProducerRecord<>(topicName, key, value); // 发送一条消息 Future future = producer.send(record); System.out.println(future.get().timestamp()); // 提交事务 producer.commitTransaction(); } catch (Exception e) { e.printStackTrace(); // 发生异常回滚事务 producer.abortTransaction(); } finally { producer.close(); } }

Consumer客户端也是同样的,只需要在创建客户端实例的时候增加相同的SSL配置即可。完整代码如下:

/** * 创建支持SSL的Consumer实例 */ public static Consumer createConsumerWithSSL() { Properties props = new Properties(); // 注意,这里的端口一定得是Kafka服务器上配置的SSL协议监听端口 props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.119.23:8989"); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test"); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // SSL配置 props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "src\\main\\resources\\client.truststore.jks"); props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456"); props.setProperty("security.protocol", "SSL"); return new KafkaConsumer<>(props); } /** * 消费消息 */ public static void autoCommitOffset() { Consumer consumer = createConsumerWithSSL(); List topics = List.of("MyTopic"); // 订阅一个或多个Topic consumer.subscribe(topics); while (true) { // 从Topic中拉取数据,每1000毫秒拉取一次 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); // 每次拉取可能都是一组数据,需要遍历出来 for (ConsumerRecord record : records) { System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }

Kafka最佳实践配置项

服务端必要参数

zookeeper.connect:必配参数,建议在kafka集群的每台实例都配置所有的zk节点

broker.id:必配参数。集群节点的标示符,不得重复,取值范围0~n

log.dirs:不要使用默认的“/tmp/kafka-logs”,因为/tmp目录的性质没法保证数据的持久性

服务端推荐参数

advertised.host.name:注册到zk供用户使用的主机名

advertised.port:注册到zk供用户使用的服务端口

num.partitions:创建topic时的默认partition数量,默认是1

default.replication.factor:自动创建topic的默认副本数量,建议至少修改为2

min.insync.replicasISR:提交生成者请求的最小副本数,建议至少2~3个

unclean.leader.election.enable:是否允许不具备ISR资格的replicas被选举为leader,建议设置为否,除非能够允许数据的丢失

controlled.shutdown.enable:在kafka收到stop命令或者异常终止时,允许自动同步数据,建议开启

可动态调整的参数

unclean.leader.election.enable:不严格的leader选举,有助于集群健壮,但是存在数据丢失风险。

min.insync.replicas:如果同步状态的副本小于该值,服务器将不再接受request.required.acks为-1或all的写入请求。

max.message.bytes:单条消息的最大长度。如果修改了该值,那么replica.fetch.max.bytes和消费者的fetch.message.max.bytes也要跟着修改。

cleanup.policy:生命周期终结数据的处理,默认删除。

flush.messages:强制刷新写入的最大缓存消息数。

flush.ms:强制刷新写入的最大等待时长。

客户端配置:

Producer客户端:ack、压缩、同步生产 vs 异步生产、批处理大小(异步生产)

Consumer客户端方面主要考虑:partition数量及获取消息的大小

Kafka服务器配置最佳实践

JVM参数建议:

1、能分配较大堆的情况下使用JVM的G1垃圾回收器

以下是一段基于24GB内存、四核英特尔至强处理器,8x7200转的SATA硬盘机器的配置参考示例,可以参照该示例代入自己的机器配置进行调整:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

机器硬件建议:

1. 内存:建议使用64G内存的机器2. CPU:尽量选择更多核心,将会获得多核带来的更好的并发处理性能3. 磁盘:RAID是优先推荐的,SSD可以考虑4. 网络:最好是万兆网络,千兆也可5. 文件系统:ext4是最佳选择6. 操作系统:任何Unix系统上运行良好,并且已经在Linux和Solaris上进行了测试

核心参数调整建议:

文件描述符数量调整:(number_of_partitions)*(partition_size / segment_size),通常都在100000左右 视具体情况调整最大套接字缓冲区大小 pagecache:尽量分配与大多数日志的激活日志段大小一致 禁用swap 设计broker的数量:单broker上的分区数小于2000;分区大小则建议不要超过25GB 设计partition的数量: 至少和最大的消费者组中consumer的数量一致 分区不要太大,建议小于25GB

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Cloudera Navigator介绍
下一篇:Kria KV260超长干货之开箱指南
相关文章

 发表评论

暂时没有评论,来抢沙发吧~