linux怎么查看本机内存大小
418
2022-11-25
Kafka集群监控、安全机制与最佳实践
Kafka监控安装
Kafka集群监控方案选择:
Kafka只能依靠kafka-run-class.sh等命令进行管理 Kafka Manager(CMAK)是目前比较常用的监控工具,它有如下功能: 管理多个集群 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发) 运行首选副本选举 使用选项生成分区分配以选择要使用的代理 运行分区重新分配(基于生成的分配) 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置) 删除主题(仅支持0.8.2+并记住在代理配置中设置delete.topic.enable = true) 主题列表现在指示标记为删除的主题(仅支持0.8.2+) 批量生成多个主题的分区分配,并可选择要使用的代理 批量运行重新分配多个主题的分区 将分区添加到现有主题 更新现有主题的配置 可随意开启对broker级别或者对topic级别的JMX轮询 可方便的过滤出没有id 、所有者、延迟或目录等的消费者
所以本小节先介绍该监控工具的安装及配置,到如下地址可以下载各个版本的Kafka Manager:
~]# cd /usr/local/src [root@localhost /usr/local/src]# wget /usr/local/src]# unzip cmak-3.0.0.5.zip [root@localhost /usr/local/src]# mv cmak-3.0.0.5 /usr/local/kafka/kafka-manager
然后修改一下配置文件,主要是配置Kafka集群中Zookeeper的连接地址,也就是要监控哪个Kafka集群就配置哪个Kafka集群的Zookeeper地址:
[root@localhost ~]# vim /usr/local/kafka/kafka-manager/conf/application.conf # 配置多个地址使用逗号分隔即可 cmak.zkhosts="127.0.0.1:2181"
配置完成后,使用如下命令启动即可:
[root@localhost ~]# nohup /usr/local/kafka/kafka-manager/bin/cmak &
如下,正常监听了9000端口代表已经启动成功:
[root@localhost ~]# netstat -lntp |grep 9000 tcp6 0 0 :::9000 :::* LISTEN 25237/java [root@localhost ~]#
如果你的机器打开了防火墙的话,还需要在防火墙上放开9000端口:
[root@localhost ~]# firewall-cmd --zone=public --add-port=9000/tcp --permanent [root@localhost ~]# firewall-cmd --reload
Kafka监控界面
需要注意的是,如果要开启JMX轮询,则必须事先在Kafka的启动脚本中打开JMX的端口号:
[root@localhost ~]# vim /usr/local/kafka/bin/kafka-server-start.sh # 打开JMX端口 export JMX_PORT=9999
然后重启Kafka:
[root@localhost ~]# kafka-server-stop.sh [root@localhost ~]# nohub kafka-server-start.sh /usr/local/kafka/config/server.properties &
Partitions:Topic的Partition数量 Brokers:Topic的Broker数量 Replicas:Topic中Partition的副本数量 Under Replicated:Topic中的Partition副本处于同步失败或失效状态的比例,该指标过高则代表副本没有复制到足够的Broker上 Producer Message/Sec:生产者每秒投递的消息数量 Summed Recent Offsets:当前总计的消费偏移量 Brokers Spread:看作broker使用率,如kafka集群9个broker,某topic有7个partition,则broker spread: 7 / 9 = 77% Brokers Skew:partition是否存在倾斜,如kafka集群9个broker,某topic有18个partition,正常每个broker应该2个partition。若其中有3个broker上的partition数>2,则broker skew: 3 / 9 = 33% Brokers Leader Skew:leader partition是否存在倾斜,如kafka集群9个broker,某topic14个partition,则正常每个broker有2个leader partition。若其中一个broker有0个leader partition,一个有4个leader partition,则broker leader skew: (4 - 2) / 14 = 14%
Messages in /sec:每秒流入的消息数 Bytes in /sec:每秒流入的字节数 Bytes out /sec:每秒流出的字节数 Bytes rejected /sec:每秒拒绝的字节数 Failed fetch request /sec:每秒失败抓取数据请求数 Failed produce request /sec:每秒失败的生产数据请求数
Tips:这些指标的监控需要打开JMX
Kafka SSL签名库生成
Kafka的安全措施:
Kafka提供了SSL或SASL机制来保障集群安全 Kafka提供了Broker到zk连接的安全机制 Kafka支持Client的读写验证
值得一提的是通常情况下都不会给Kafka加安全措施,类似的其他中间件也是。因为通常我们都会将这些中间件部署在一个可信的网络里,例如与外网隔离的内部网络,并且有防火墙进行保护。
而且给Kafka加上SSL或SASL安全机制也会导致性能有所损耗,通常这个损耗在20~30%左右。但如果你的Kafka是允许在外网进行访问的话,那么就需要考虑增加安全机制了。
在本文中主要介绍一下SSL这种安全机制,SSL如今也不算啥冷门知识了,对HTTPS有所了解的话都应该清楚,这里就不进行赘述了。首先我们知道SSL是需要证书的,所以第一步就是创建证书,但在此之前需要先创建密钥仓库,用于存储证书文件。具体命令如下:
[root@localhost ~]# mkdir ca-store # 创建一个目录 [root@localhost ~]# cd ca-store/ # 进入该目录 [root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -validity 100000 -genkey # 创建密钥仓库 Enter keystore password: # 输入密码 Re-enter new password: # 确认密码 What is your first and last name? # 输入你的姓名 [Unknown]: lingyi What is the name of your organizational unit? # 输入你的组织单位 [Unknown]: zj What is the name of your organization? # 输入你的组织名称 [Unknown]: zj What is the name of your City or Locality? # 输入你的所在城市 [Unknown]: beijing What is the name of your State or Province? # 输入你的所在省份 [Unknown]: beijing What is the two-letter country code for this unit? # 输入两个字母的国家代码 [Unknown]: cn Is CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn correct? [no]: y # 输入y确认以上信息 [root@localhost ~/ca-store]# ls server.keystore.jks # 创建完成后,当前目录下会有这样一个文件 [root@localhost ~/ca-store]#
创建CA证书:
[root@localhost ~]# openssl req -new -x509 -keyout ca-key -out ca-cert -days 100000 Generating a 2048 bit RSA private key ........................+++ ........+++ writing new private key to 'ca-key' Enter PEM pass phrase: # 输入密码 Verifying - Enter PEM pass phrase: # 确认密码 ----- You are about to be asked to enter information that will be incorporated into your certificate request. What you are about to enter is what is called a Distinguished Name or a DN. There are quite a few fields but you can leave some blank For some fields there will be a default value, If you enter '.', the field will be left blank. ----- Country Name (2 letter code) [XX]:cn # 输入两个字母的国家代码 State or Province Name (full name) []:beijing # 输入你的所在省份 Locality Name (eg, city) [Default City]:beijing # 输入你的所在城市 Organization Name (eg, company) [Default Company Ltd]:zj # 输入你的组织单位 Organizational Unit Name (eg, section) []:zj # 输入你的组织名称 Common Name (eg, your name or your server's hostname) []:lingyi # 输入你的姓名或服务器名称 Email Address []:email@example.com # 输入你的邮箱地址 [root@localhost ~/ca-store]# ls # 创建完成后,当前目录下会多出两个文件 ca-cert ca-key server.keystore.jks [root@localhost ~/ca-store]#
将生成的CA添加到客户端信任库:
[root@localhost ~]# keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert Enter keystore password: # 输入密码 Re-enter new password: # 确认密码 Owner: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn Issuer: EMAILADDRESS=binary0_1@163.com, CN=lingyi, OU=zj, O=zj, L=beijing, ST=beijing, C=cn Serial number: e6fa410f7c90ff2a Valid from: Mon Jul 06 22:20:50 CST 2020 until: Sat Apr 21 22:20:50 CST 2294 Certificate fingerprints: SHA1: 1F:F3:8C:F4:37:9C:47:45:42:A4:51:77:7D:DA:05:E5:59:27:0C:9F SHA256: F6:7E:F6:E2:A9:12:8B:C4:04:6E:F0:23:49:6F:0D:3C:94:5F:AD:D6:76:42:42:63:24:69:96:C6:EE:02:70:91 Signature algorithm name: SHA256withRSA Subject Public Key Algorithm: 2048-bit RSA key Version: 3 Extensions: #1: ObjectId: 2.5.29.35 Criticality=false AuthorityKeyIdentifier [ KeyIdentifier [ 0000: 9A 1D 7C 61 ED 94 C0 BC 13 EA 20 3B 59 05 6A F9 ...a...... ;Y.j. 0010: 40 3B E8 4D @;.M ] ] #2: ObjectId: 2.5.29.19 Criticality=false BasicConstraints:[ CA:true PathLen:2147483647 ] #3: ObjectId: 2.5.29.14 Criticality=false SubjectKeyIdentifier [ KeyIdentifier [ 0000: 9A 1D 7C 61 ED 94 C0 BC 13 EA 20 3B 59 05 6A F9 ...a...... ;Y.j. 0010: 40 3B E8 4D @;.M ] ] Trust this certificate? [no]: y # 是否信任此证书 Certificate was added to keystore [root@localhost ~/ca-store]# ls ca-cert ca-key client.truststore.jks server.keystore.jks [root@localhost ~/ca-store]#
为Broker提供信任库以及所有客户端签名了密钥的CA证书:
[root@localhost ~/ca-store]# keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert ... 与之前的内容一致,略 ... [root@localhost ~/ca-store]# ls ca-cert ca-key client.truststore.jks server.keystore.jks server.truststore.jks [root@localhost ~/ca-store]#
完成以上步骤后,就是对证书进行签名,也就是用自己生成的CA来签名前面生成的证书。
1、从密钥仓库导出证书:
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -certreq -file cert-file Enter keystore password: # 这里输入server.keystore.jks的密码 [root@localhost ~/ca-store]#
2、用CA签名:
[root@localhost ~/ca-store]# openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 100000 -CAcreateserial -passin pass:123456 Signature ok subject=/C=cn/ST=beijing/L=beijing/O=zj/OU=zj/CN=lingyi Getting CA Private Key [root@localhost ~/ca-store]#
3、导入CA的证书和已签名的证书到密钥仓库:
[root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert ... 与之前的内容一致,略 ... [root@localhost ~/ca-store]# keytool -keystore server.keystore.jks -alias mykafka -import -file cert-signed Enter keystore password: Certificate reply was installed in keystore [root@localhost ~/ca-store]# ls # 完成所有步骤后,当前目录下会有如下文件 ca-cert ca-cert.srl ca-key cert-file cert-signed client.truststore.jks server.keystore.jks server.truststore.jks [root@localhost ~/ca-store]#
Kafka SSL服务端集成
Kafka SSL服务端集成其实也比较简单,只需要修改一下Kafka的server.properties配置文件即可。具体如下所示:
[root@localhost ~]# vim /usr/local/kafka/config/server.properties # 在原本的配置上追加SSL的监听端口及协议配置 listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989 advertised.listeners=PLAINTEXT://192.168.220.128:9092,SSL://192.168.220.128:8989 # 增加SSL相关配置 ssl.keystore.location=/root/ca-store/server.keystore.jks ssl.keystore.password=123456 ssl.key.password=123456 ssl.truststore.location=/root/ca-store/server.truststore.jks ssl.truststore.password=123456
完成配置的修改后重启Kafka:
[root@localhost ~]# kafka-server-stop.sh [root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
然后就可以使用openssl测试一下SSL配置是否成功,执行如下命令并输出了类似的内容则代表配置成功,已经能够通过SSL协议连接:
[root@localhost ~]# openssl s_client -debug -connect 192.168.220.128:8989 -tls1
CONNECTED(00000003)
write to 0xfcd230 [0xfd6d63] (181 bytes => 181 (0xB5))
0000 - 16 03 01 00 b0 01 00 00-ac 03 01 0c a9 85 ea 8f ................
0010 - f2 f3 c1 ac fb 9d f6 78-9c ed f4 60 97 ad 91 33 .......x...`...3
0020 - 32 ab b2 81 9e 81 6b 3f-e0 db da 00 00 64 c0 14 2.....k?.....d..
0030 - c0 0a 00 39 00 38 00 37-00 36 00 88 00 87 00 86 ...9.8.7.6......
0040 - 00 85 c0 0f c0 05 00 35-00 84 c0 13 c0 09 00 33 .......5.......3
0050 - 00 32 00 31 00 30 00 9a-00 99 00 98 00 97 00 45 .2.1.0.........E
0060 - 00 44 00 43 00 42 c0 0e-c0 04 00 2f 00 96 00 41 .D.C.B...../...A
0070 - c0 12 c0 08 00 16 00 13-00 10 00 0d c0 0d c0 03 ................
0080 - 00 0a 00 07 c0 11 c0 07-c0 0c c0 02 00 05 00 04 ................
0090 - 00 ff 01 00 00 1f 00 0b-00 04 03 00 01 02 00 0a ................
00a0 - 00 0a 00 08 00 17 00 19-00 18 00 16 00 23 00 00 .............#..
00b0 - 00 0f 00 01 01 .....
read from 0xfcd230 [0xfd2813] (5 bytes => 5 (0x5))
0005 -
Kafka SSL客户端集成
然后在创建Producer客户端的时候,增加SSL相关配置项。如下代码示例:
/**
* 创建支持SSL的Producer实例
*/
public static Producer
Consumer客户端也是同样的,只需要在创建客户端实例的时候增加相同的SSL配置即可。完整代码如下:
/**
* 创建支持SSL的Consumer实例
*/
public static Consumer
Kafka最佳实践配置项
服务端必要参数
zookeeper.connect:必配参数,建议在kafka集群的每台实例都配置所有的zk节点
broker.id:必配参数。集群节点的标示符,不得重复,取值范围0~n
log.dirs:不要使用默认的“/tmp/kafka-logs”,因为/tmp目录的性质没法保证数据的持久性
服务端推荐参数
advertised.host.name:注册到zk供用户使用的主机名
advertised.port:注册到zk供用户使用的服务端口
num.partitions:创建topic时的默认partition数量,默认是1
default.replication.factor:自动创建topic的默认副本数量,建议至少修改为2
min.insync.replicasISR:提交生成者请求的最小副本数,建议至少2~3个
unclean.leader.election.enable:是否允许不具备ISR资格的replicas被选举为leader,建议设置为否,除非能够允许数据的丢失
controlled.shutdown.enable:在kafka收到stop命令或者异常终止时,允许自动同步数据,建议开启
可动态调整的参数
unclean.leader.election.enable:不严格的leader选举,有助于集群健壮,但是存在数据丢失风险。
min.insync.replicas:如果同步状态的副本小于该值,服务器将不再接受request.required.acks为-1或all的写入请求。
max.message.bytes:单条消息的最大长度。如果修改了该值,那么replica.fetch.max.bytes和消费者的fetch.message.max.bytes也要跟着修改。
cleanup.policy:生命周期终结数据的处理,默认删除。
flush.messages:强制刷新写入的最大缓存消息数。
flush.ms:强制刷新写入的最大等待时长。
客户端配置:
Producer客户端:ack、压缩、同步生产 vs 异步生产、批处理大小(异步生产)
Consumer客户端方面主要考虑:partition数量及获取消息的大小
Kafka服务器配置最佳实践
JVM参数建议:
1、能分配较大堆的情况下使用JVM的G1垃圾回收器
以下是一段基于24GB内存、四核英特尔至强处理器,8x7200转的SATA硬盘机器的配置参考示例,可以参照该示例代入自己的机器配置进行调整:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
机器硬件建议:
1. 内存:建议使用64G内存的机器2. CPU:尽量选择更多核心,将会获得多核带来的更好的并发处理性能3. 磁盘:RAID是优先推荐的,SSD可以考虑4. 网络:最好是万兆网络,千兆也可5. 文件系统:ext4是最佳选择6. 操作系统:任何Unix系统上运行良好,并且已经在Linux和Solaris上进行了测试
核心参数调整建议:
文件描述符数量调整:(number_of_partitions)*(partition_size / segment_size),通常都在100000左右 视具体情况调整最大套接字缓冲区大小 pagecache:尽量分配与大多数日志的激活日志段大小一致 禁用swap 设计broker的数量:单broker上的分区数小于2000;分区大小则建议不要超过25GB 设计partition的数量: 至少和最大的消费者组中consumer的数量一致 分区不要太大,建议小于25GB
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~