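Deploying a Kafka Cluster on K8S - Deployment Notes
I. Background
Kafka and ZooKeeper are two typical stateful clustered services. First, both need storage volumes to persist their state. Second, every Kafka or ZooKeeper instance needs its own instance ID (broker.id for Kafka, myid for ZooKeeper) that identifies it as a cluster member; the nodes use these IDs when they communicate with each other inside the cluster.
Deploying this kind of service means solving two main problems: persisting state, and managing the cluster (multiple service instances). The StatefulSet provided by Kubernetes makes it easier to deploy and manage stateful clustered services on K8S. In general, a stateful clustered service is deployed with the following three mechanisms:
Init Containers perform the cluster initialization work.
A Headless Service keeps the relationships between cluster members stable.
Persistent Volumes and Persistent Volume Claims provide network storage to persist data.
Therefore, stateful services such as Kafka and ZooKeeper should be deployed in a K8S cluster with a StatefulSet rather than a Deployment. Put simply, stateful means that data has to be persisted, for example logs, database data, or service state.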
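StatefulSet use cases:
Stable persistent storage: after a Pod is rescheduled it still accesses the same persistent data. Implemented with PVCs.
Stable network identity: after a Pod is rescheduled its PodName and HostName stay the same. Implemented with a Headless Service (a Service without a Cluster IP).
Ordered deployment and ordered scale-out: Pods are ordered, and deployment or scale-out proceeds one Pod at a time in the defined order (from 0 to N-1; all preceding Pods must be Running and Ready before the next Pod starts). This ordering is enforced by the StatefulSet controller itself (its default OrderedReady pod management policy).
Ordered scale-in and ordered deletion (from N-1 to 0).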
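A StatefulSet setup consists of:
A Headless Service that defines the network identity (DNS domain)
volumeClaimTemplates that create the PersistentVolumes
The StatefulSet that defines the application itself
Each Pod in a StatefulSet has a DNS name of the form:
statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local, where:
statefulSetName is the name of the StatefulSet
0..N-1 is the Pod's ordinal, from 0 to N-1
serviceName is the name of the Headless Service
namespace is the namespace the service runs in; the Headless Service and the StatefulSet must be in the same namespace
svc.cluster.local is the service suffix under the K8S cluster root domain (cluster.local by default)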
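As a rough illustration of the naming scheme above, the following shell sketch prints the DNS name of every Pod in a StatefulSet. The function name statefulset_pod_dns is made up for this example, and the cluster domain is assumed to be the default cluster.local unless another one is passed as the fifth argument.

```shell
# Print the stable DNS name of every Pod in a StatefulSet.
# Arguments: StatefulSet name, headless Service name, namespace, replica count,
# and optionally the cluster domain (assumed to be cluster.local if omitted).
statefulset_pod_dns() {
  sts=$1; svc=$2; ns=$3; replicas=$4; domain=${5:-cluster.local}
  i=0
  while [ "$i" -lt "$replicas" ]; do
    echo "${sts}-${i}.${svc}.${ns}.svc.${domain}"
    i=$((i + 1))
  done
}

# Example with the names used for the ZooKeeper StatefulSet in these notes.
statefulset_pod_dns zok zk-hs wiseco 3
```

Called with zok, zk-hs, wiseco and 3, it lists zok-0.zk-hs.wiseco.svc.cluster.local through zok-2.zk-hs.wiseco.svc.cluster.local, one name per line.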
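II. Deployment Record (NAS Storage)
This section builds a three-node Kafka container cluster on K8S. Because the Kafka cluster needs storage, persistent volumes (Persistent Volume, PV for short) have to be prepared first.
1. Configure dynamic persistent storage for the StatefulSets
1) Use Alibaba Cloud NAS storage
NAS address created on the Alibaba Cloud platform: 1*********-beijing.nas.aliyuncs.com
First create the zk and kafka subdirectories in the NAS file system.
A NAS subdirectory must actually exist before it can be mounted; otherwise the mount fails with an error saying the subdirectory cannot be accessed.
To create them: mount the NAS on any node, create the subdirectories, then unmount it.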
[root@k8s-node04 ~]# ls /mnt/
[root@k8s-node04 ~]# mkdir /mnt/test
[root@k8s-node04 ~]# mount -t nfs -o vers=3,nolock,proto=tcp,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport 1*********-beijing.nas.aliyuncs.com:/ /mnt/test/
[root@k8s-node04 ~]# cd /mnt/test/
[root@k8s-node04 test]# mkdir kafka
[root@k8s-node04 test]# mkdir zk
[root@k8s-node04 test]# ll
total 2
drwxr-xr-x 2 root root 4096 Jan 24 22:53 kafka
drwxr-xr-x 2 root root 4096 Jan 24 22:53 zk
[root@k8s-node04 ~]# df -h|grep mnt
1*********-beijing.nas.aliyuncs.com:/ 10P 0 10P 0% /mnt/test
[root@k8s-node04 ~]# umount /mnt/test
[root@k8s-node04 ~]# rm -rf /mnt/test
[root@k8s-node04 ~]# ls /mnt/
2) Create the RBAC resources for the NFS provisioner
[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka_zk
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka_zk
[root@k8s-master01 kafka_zk]# vim nfs-rbac.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-provisioner
  namespace: wiseco
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-provisioner-runner
  namespace: wiseco
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["watch", "create", "update", "patch"]
  - apiGroups: [""]
    resources: ["services", "endpoints"]
    verbs: ["get", "create", "list", "watch", "update"]
  - apiGroups: ["extensions"]
    resources: ["podsecuritypolicies"]
    resourceNames: ["nfs-provisioner"]
    verbs: ["use"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-provisioner
    namespace: wiseco
roleRef:
  kind: ClusterRole
  name: nfs-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
Create and check:
[root@k8s-master01 kafka_zk]# kubectl apply -f nfs-rbac.yaml
serviceaccount/nfs-provisioner created
clusterrole.rbac.authorization.k8s.io/nfs-provisioner-runner created
clusterrolebinding.rbac.authorization.k8s.io/run-nfs-provisioner created
[root@k8s-master01 kafka_zk]# kubectl get sa -n wiseco|grep nfs
nfs-provisioner 1 14s
[root@k8s-master01 kafka_zk]# kubectl get clusterrole -n wiseco|grep nfs
nfs-provisioner-runner 2021-01-04T13:03:55Z
[root@k8s-master01 kafka_zk]# kubectl get clusterrolebinding -n wiseco|grep nfs
run-nfs-provisioner ClusterRole/nfs-provisioner-runner 36s
3) Create the StorageClass for the zk cluster
[root@k8s-master01 kafka_zk]# mkdir zk
[root@k8s-master01 kafka_zk]# cd zk
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# cat zk-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: zk-nfs-storage
  namespace: wiseco
mountOptions:
  - vers=4.0
  - nolock,tcp,noresvport
provisioner: zk/nfs
reclaimPolicy: Retain
Create and check:
[root@k8s-master01 zk]# kubectl apply -f zk-nfs-class.yaml
storageclass.storage.k8s.io/zk-nfs-storage created
[root@k8s-master01 zk]# kubectl get sc -n wiseco
NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE
zk-nfs-storage zk/nfs Retain Immediate false 28s
4) Create the StorageClass for the Kafka cluster
[root@k8s-master01 zk]# mkdir ../kafka
[root@k8s-master01 zk]# cd ../kafka
[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s_project/kafka_zk/kafka
[root@k8s-master01 kafka]# cat kafka-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: kafka-nfs-storage
  namespace: wiseco
mountOptions:
  - vers=4.0
  - nolock,tcp,noresvport
provisioner: kafka/nfs
reclaimPolicy: Retain
Create and check:
[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs-class.yaml
storageclass.storage.k8s.io/kafka-nfs-storage created
[root@k8s-master01 kafka]# kubectl get sc -n wiseco
NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE
kafka-nfs-storage kafka/nfs Retain Immediate false 4s
zk-nfs-storage zk/nfs Retain Immediate false 12m
5) Create the nfs-client-provisioner for the zk cluster
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# cat zk-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zk-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zk-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: zk-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
        - name: zk-nfs-client-provisioner
          image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: zk/nfs
            - name: NFS_SERVER
              value: 1*********-beijing.nas.aliyuncs.com
            - name: NFS_PATH
              value: /zk
      volumes:
        - name: nfs-client-root
          nfs:
            server: 1*********-beijing.nas.aliyuncs.com
            path: /zk
Create and check:
[root@k8s-master01 zk]# kubectl apply -f zk-nfs.yml
deployment.apps/zk-nfs-client-provisioner created
[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep nfs
zk-nfs-client-provisioner-bd8d65798-qrz87 1/1 Running 0 39s
6) Create the nfs-client-provisioner for the Kafka cluster
[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s_project/kafka_zk/kafka
[root@k8s-master01 kafka]# cat kafka-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: kafka-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
        - name: kafka-nfs-client-provisioner
          image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
          imagePullPolicy: IfNotPresent
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME
              value: kafka/nfs
            - name: NFS_SERVER
              value: 1*********-beijing.nas.aliyuncs.com
            - name: NFS_PATH
              value: /kafka
      volumes:
        - name: nfs-client-root
          nfs:
            server: 1*********-beijing.nas.aliyuncs.com
            path: /kafka
Create and check:
[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs.yml
deployment.apps/kafka-nfs-client-provisioner created
[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep nfs
kafka-nfs-client-provisioner-6747dc587c-8qjn7 1/1 Running 0 16s
zk-nfs-client-provisioner-bd8d65798-cdf6h 1/1 Running 0 47s
2. Create the ZK cluster
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# ls
zk-nfs-class.yaml zk-nfs.yml
[root@k8s-master01 zk]# vim zk.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
    - port: 2888
      name: server
    - port: 3888
      name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-cs
  labels:
    app: zk
spec:
  type: NodePort
  ports:
    - port: 2181
      targetPort: 2181
      name: client
      nodePort: 32181
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  namespace: wiseco
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: zok
spec:
  serviceName: zk-hs
  replicas: 3
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                      - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
        - name: kubernetes-zookeeper
          imagePullPolicy: Always
          image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
          resources:
            requests:
              memory: "1024Mi"
              cpu: "500m"
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=3 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=512M \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "zk-nfs-storage"
      spec:
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
Create and check:
[root@k8s-master01 zk]# kubectl apply -f zk.yaml
service/zk-hs created
service/zk-cs created
poddisruptionbudget.policy/zk-pdb created
statefulset.apps/zok created
View the details of the zk cluster that was just created
View the zk cluster Pods
[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep zok
zok-0 1/1 Running 0 2m1s
zok-1 1/1 Running 0 88s
zok-2 1/1 Running 0 51s
View the zk cluster Services
[root@k8s-master01 zk]# kubectl get svc -n wiseco|grep zk
zk-cs NodePort 10.254.223.248 <none> 2181:32181/TCP 2m24s
zk-hs ClusterIP None <none> 2888/TCP,3888/TCP 2m25s
View the zk cluster PVs and PVCs
Note:
PVs are cluster-scoped, so the query does not need "-n namespace"
PVCs are namespace-scoped, so the query needs "-n namespace"
[root@k8s-master01 zk]# kubectl get pv -n wiseco|grep zok
pvc-2bace45f-4567-4751-a7ee-233c7034bb09 10Gi RWX Delete Bound wiseco/datadir-zok-0 zk-nfs-storage 2m59s
pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e 10Gi RWX Delete Bound wiseco/datadir-zok-1 zk-nfs-storage 2m24s
pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219 10Gi RWX Delete Bound wiseco/datadir-zok-2 zk-nfs-storage 107s
[root@k8s-master01 zk]# kubectl get pvc -n wiseco|grep zok
datadir-zok-0 Bound pvc-2bace45f-4567-4751-a7ee-233c7034bb09 10Gi RWX zk-nfs-storage 3m2s
datadir-zok-1 Bound pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e 10Gi RWX zk-nfs-storage 2m29s
datadir-zok-2 Bound pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219 10Gi RWX zk-nfs-storage 112s
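On the NFS server, look at the persistent shared directories of the zk cluster
The persistent directory names on NFS follow the pattern: namespace name-PVC name-PV name
As long as the PVC and the PV are not deleted, the directory name does not change.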
[root@k8s-harbor01 ~]# cd /data/storage/k8s/zk/
[root@k8s-harbor01 zk]# ll
total 0
drwxrwxrwx 3 root root 18 Jan 4 22:22 wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09
drwxrwxrwx 3 root root 18 Jan 4 22:23 wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e
drwxrwxrwx 3 root root 18 Jan 4 22:23 wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219
[root@k8s-harbor01 zk]# ls *
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09:
data
wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e:
data
wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219:
data
[root@k8s-harbor01 zk]# ls */*
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data:
log myid version-2
wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data:
log myid version-2
wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data:
log myid version-2
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data/myid
1
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data/myid
2
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data/myid
3
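The directory naming pattern and the myid values in the listing above can be reproduced with a small shell sketch. Both function names are hypothetical, and the rule myid = Pod ordinal + 1 is only inferred from the three myid files shown above, not taken from the image's startup script.

```shell
# Directory created on the NFS share for one PVC:
# <namespace>-<PVC name>-<PV name>
nfs_dir_name() {
  echo "$1-$2-$3"
}

# myid of a ZooKeeper Pod, derived from the ordinal at the end of the Pod name.
# The +1 offset is an assumption inferred from the listing (zok-0 has myid 1).
zk_myid_from_pod() {
  ordinal=${1##*-}
  echo $((ordinal + 1))
}

# Example with the first PVC and the last Pod of the zk cluster.
nfs_dir_name wiseco datadir-zok-0 pvc-2bace45f-4567-4751-a7ee-233c7034bb09
zk_myid_from_pod zok-2
```

The first call prints the directory name seen for zok-0 on the NFS server, and the second prints the myid stored for zok-2.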
Check the leader/follower roles in the zk cluster
[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep zok
zok-0 1/1 Running 0 17h
zok-1 1/1 Running 0 17h
zok-2 1/1 Running 0 17h
[root@k8s-master01 kafka]# kubectl exec -ti zok-0 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
[root@k8s-master01 kafka]# kubectl exec -ti zok-1 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: leader
[root@k8s-master01 kafka]# kubectl exec -ti zok-2 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
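A healthy quorum should report exactly one leader. A minimal sketch of that check follows, assuming the status text contains a "Mode: ..." line as in the output above; zk_mode and has_single_leader are helper names invented for this example.

```shell
# Extract the role (leader / follower) from zkServer.sh status output on stdin.
zk_mode() {
  sed -n 's/^Mode: *//p'
}

# Succeed only when the modes passed as arguments contain exactly one leader.
has_single_leader() {
  leaders=0
  for mode in "$@"; do
    if [ "$mode" = "leader" ]; then
      leaders=$((leaders + 1))
    fi
  done
  [ "$leaders" -eq 1 ]
}

# Example using the status text of zok-1 and the three modes shown above.
sample='ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: leader'
echo "$sample" | zk_mode
if has_single_leader follower leader follower; then
  echo "exactly one leader"
fi
```

In a live cluster the input could come from kubectl exec zok-0 -n wiseco -- zkServer.sh status piped into zk_mode, run once per Pod.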
Verify connectivity to the zk cluster
[root@k8s-master01 kafka]# kubectl get svc -n wiseco|grep zk
zk-cs NodePort 10.254.223.248 <none> 2181:32181/TCP 17h
zk-hs ClusterIP None <none> 2888/TCP,3888/TCP 17h
[root@k8s-master01 ~]# kubectl exec -ti zok-1 -n wiseco -- zkCli.sh -server zk-cs:2181
Connecting to zk-cs:2181
2021-01-05 08:16:10,169 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2021-01-05 08:16:10,173 [myid:] - INFO [main:Environment@100] - Client environment:host.name=zok-1.zk-hs.wiseco.svc.cluster.local
2021-01-05 08:16:10,173 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_131
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/bin/../build/classes:/usr/bin/../build/lib/*.jar:/usr/bin/../share/zookeeper/zookeeper-3.4.10.jar:/usr/bin/../share/zookeeper/slf4j-log4j12-1.6.1.jar:/usr/bin/../share/zookeeper/slf4j-api-1.6.1.jar:/usr/bin/../share/zookeeper/netty-3.10.5.Final.jar:/usr/bin/../share/zookeeper/log4j-1.2.16.jar:/usr/bin/../share/zookeeper/jline-0.9.94.jar:/usr/bin/../src/java/lib/*.jar:/usr/bin/../etc/zookeeper:
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
2021-01-05 08:16:10,175 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:os.version=4.4.243-1.el7.elrepo.x86_64
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root
2021-01-05 08:16:10,176 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/
2021-01-05 08:16:10,177 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=zk-cs:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@22d8cfe0
Welcome to ZooKeeper!
JLine support is enabled
2021-01-05 08:16:10,201 [myid:] - INFO [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-05 08:16:10,261 [myid:] - INFO [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@876] - Socket connection established to zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, initiating session
[zk: zk-cs:2181(CONNECTING) 0] 2021-01-05 08:16:10,316 [myid:] - INFO [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, sessionid = 0x376cdc83254000a, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null # press Enter here
[zk: zk-cs:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: zk-cs:2181(CONNECTED) 1]
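3. Create the Kafka cluster
The Kafka cluster must be reachable from outside K8S. That requires either three external addresses, or one external address plus three ports, each proxying to one of the three backend Kafka Pod instances. External clients connect to Kafka through the listener configured in advertised.listeners;
advertised.listeners is the address and port exposed to clients, while listeners is the address and port the Kafka broker itself binds to and listens on;
the advertised.listeners value is registered in ZooKeeper;
When a client requests a connection to "<public ip>:port", the Kafka broker looks up the listeners registered in ZooKeeper, finds the public listener defined there (the default listener name is PLAINTEXT; custom names are also possible), and then maps it by name to the corresponding address and port in listeners;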
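To make the "one external address plus three ports" option concrete, here is a hedged sketch that derives the advertised.listeners value for each broker. The helper name advertised_listener is hypothetical. The external IP 156.56.46.37 and the NodePorts 32092 to 32094 are the values used by the manifests in these notes, and the rule port = base port + broker.id is an assumption that happens to match them.

```shell
# Build the advertised.listeners value for one broker.
# Arguments: broker.id, shared external IP, NodePort assigned to broker 0.
# Assumes consecutive NodePorts, one per broker, and a PLAINTEXT listener.
advertised_listener() {
  broker_id=$1; external_ip=$2; base_port=$3
  echo "PLAINTEXT://${external_ip}:$((base_port + broker_id))"
}

# Example: the three brokers of this deployment.
for id in 0 1 2; do
  advertised_listener "$id" 156.56.46.37 32092
done
```

Each broker then advertises its own port on the shared address, so a client that receives the metadata can reach every broker individually.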
1) Build the Kafka image (version 2.12-2.5.0, that is Kafka 2.5.0 built for Scala 2.12) from a Dockerfile
FROM 192.168.10.10/wiseco/jdk1.8.0_192
RUN rm -f /etc/localtime \
&& ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& echo "Asia/Shanghai" > /etc/timezone
ENV LANG en_US.UTF-8
ENV KAFKA_DATA_DIR /var/lib/kafka/data
ENV JAVA_HOME /usr/java/jdk1.8.0_192
ENV KAFKA_HOME /opt/kafka
ENV PATH /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/kafka/bin
WORKDIR /opt
RUN yum install -y wget \
&& wget \
&& tar -zvxf kafka_2.12-2.5.0.tgz \
&& ln -s /opt/kafka_2.12-2.5.0 /opt/kafka \
&& rm -rf kafka_2.12-2.5.0.tgz \
&& mkdir -p /var/lib/kafka/data
CMD ["/bin/bash"]
Build the image and push it to Harbor, for reference:
# docker build -t 192.168.10.10/wiseco/kafka:v2.12-2.5.0 .
# docker push 192.168.10.10/wiseco/kafka:v2.12-2.5.0
2) Create the Kafka cluster
[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka/
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka/
Contents of kafka1.yaml:
[root@k8s-master01 kafka]# vim kafka1.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs1
  labels:
    app: kafka1
spec:
  ports:
    - port: 1099
      name: jmx1
  clusterIP: None
  selector:
    app: kafka1
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs1
  labels:
    app: kafka1
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
      name: client1
      nodePort: 32092
  selector:
    app: kafka1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka1
spec:
  serviceName: kafka-hs1
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
        - name: k8skafka
          imagePullPolicy: Always
          image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
          ports:
            - containerPort: 9092
              name: client1
            - containerPort: 1099
              name: jmx1
          command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=0 \
              --override advertised.listeners=PLAINTEXT://156.56.46.37:32092 \
              --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 \
              --override log.dirs=/var/lib/kafka \
              --override auto.create.topics.enable=true \
              --override auto.leader.rebalance.enable=true \
              --override background.threads=10 \
              --override compression.type=producer \
              --override delete.topic.enable=true \
              --override leader.imbalance.check.interval.seconds=300 \
              --override leader.imbalance.per.broker.percentage=10 \
              --override log.flush.interval.messages=9223372036854775807 \
              --override log.flush.offset.checkpoint.interval.ms=60000 \
              --override log.flush.scheduler.interval.ms=9223372036854775807 \
              --override log.retention.bytes=-1 \
              --override log.retention.hours=168 \
              --override log.roll.hours=168 \
              --override log.roll.jitter.hours=0 \
              --override log.segment.bytes=1073741824 \
              --override log.segment.delete.delay.ms=60000 \
              --override message.max.bytes=1000012 \
              --override min.insync.replicas=1 \
              --override num.io.threads=8 \
              --override num.network.threads=3 \
              --override num.recovery.threads.per.data.dir=1 \
              --override num.replica.fetchers=1 \
              --override offset.metadata.max.bytes=4096 \
              --override offsets.commit.required.acks=-1 \
              --override offsets.commit.timeout.ms=5000 \
              --override offsets.load.buffer.size=5242880 \
              --override offsets.retention.check.interval.ms=600000 \
              --override offsets.retention.minutes=1440 \
              --override offsets.topic.compression.codec=0 \
              --override offsets.topic.num.partitions=50 \
              --override offsets.topic.replication.factor=3 \
              --override offsets.topic.segment.bytes=104857600 \
              --override queued.max.requests=500 \
              --override quota.consumer.default=9223372036854775807 \
              --override quota.producer.default=9223372036854775807 \
              --override replica.fetch.min.bytes=1 \
              --override replica.fetch.wait.max.ms=500 \
              --override replica.high.watermark.checkpoint.interval.ms=5000 \
              --override replica.lag.time.max.ms=10000 \
              --override replica.socket.receive.buffer.bytes=65536 \
              --override replica.socket.timeout.ms=30000 \
              --override request.timeout.ms=30000 \
              --override socket.receive.buffer.bytes=102400 \
              --override socket.request.max.bytes=104857600 \
              --override socket.send.buffer.bytes=102400 \
              --override unclean.leader.election.enable=true \
              --override zookeeper.session.timeout.ms=6000 \
              --override zookeeper.set.acl=false \
              --override broker.id.generation.enable=true \
              --override connections.max.idle.ms=600000 \
              --override controlled.shutdown.enable=true \
              --override controlled.shutdown.max.retries=3 \
              --override controlled.shutdown.retry.backoff.ms=5000 \
              --override controller.socket.timeout.ms=30000 \
              --override default.replication.factor=1 \
              --override fetch.purgatory.purge.interval.requests=1000 \
              --override group.max.session.timeout.ms=300000 \
              --override group.min.session.timeout.ms=6000 \
              --override log.cleaner.backoff.ms=15000 \
              --override log.cleaner.dedupe.buffer.size=134217728 \
              --override log.cleaner.delete.retention.ms=86400000 \
              --override log.cleaner.enable=true \
              --override log.cleaner.io.buffer.load.factor=0.9 \
              --override log.cleaner.io.buffer.size=524288 \
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
              --override log.cleaner.min.cleanable.ratio=0.5 \
              --override log.cleaner.min.compaction.lag.ms=0 \
              --override log.cleaner.threads=1 \
              --override log.cleanup.policy=delete \
              --override log.index.interval.bytes=4096 \
              --override log.index.size.max.bytes=10485760 \
              --override log.message.timestamp.difference.max.ms=9223372036854775807 \
              --override log.message.timestamp.type=CreateTime \
              --override log.preallocate=false \
              --override log.retention.check.interval.ms=300000 \
              --override max.connections.per.ip=2147483647 \
              --override num.partitions=1 \
              --override producer.purgatory.purge.interval.requests=1000 \
              --override replica.fetch.backoff.ms=1000 \
              --override replica.fetch.max.bytes=1048576 \
              --override replica.fetch.response.max.bytes=10485760 \
              --override reserved.broker.max.id=1000 "
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
          volumeMounts:
            - name: kafkadatadir
              mountPath: /var/lib/kafka
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","touch /tmp/health"]
          livenessProbe:
            exec:
              command: ["test","-e","/tmp/health"]
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: client1
            initialDelaySeconds: 15
            timeoutSeconds: 5
            periodSeconds: 20
  volumeClaimTemplates:
    - metadata:
        name: kafkadatadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
      spec:
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
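The zookeeper.connect value in the manifest above lists the stable DNS name of each ZooKeeper Pod. As a sketch, it could be generated rather than typed by hand. zk_connect is a hypothetical helper, the cluster domain is assumed to be cluster.local, and the client port defaults to 2181 as in these notes.

```shell
# Build a zookeeper.connect string from the StatefulSet's stable Pod DNS names.
# Arguments: StatefulSet name, headless Service name, namespace, replica count,
# and optionally the client port (2181 if omitted).
zk_connect() {
  sts=$1; svc=$2; ns=$3; replicas=$4; port=${5:-2181}
  list=""
  i=0
  while [ "$i" -lt "$replicas" ]; do
    host="${sts}-${i}.${svc}.${ns}.svc.cluster.local:${port}"
    # Append with a comma separator, except before the first entry.
    list="${list:+${list},}${host}"
    i=$((i + 1))
  done
  echo "$list"
}

# Example: the three-node zk cluster created earlier.
zk_connect zok zk-hs wiseco 3
```

Pointing Kafka at the per-Pod names of the headless Service, rather than at a single ClusterIP, lets each broker keep a full list of ZooKeeper servers to fail over between.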
Contents of kafka2.yaml:
[root@k8s-master01 kafka]# vim kafka2.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs2
  labels:
    app: kafka2
spec:
  ports:
    - port: 1099
      name: jmx2
  clusterIP: None
  selector:
    app: kafka2
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs2
  labels:
    app: kafka2
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
      name: client2
      nodePort: 32093
  selector:
    app: kafka2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka2
spec:
  serviceName: kafka-hs2
  replicas: 1
  selector:
    matchLabels:
      app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
        - name: k8skafka
          imagePullPolicy: Always
          image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
          ports:
            - containerPort: 9092
              name: client2
            - containerPort: 1099
              name: jmx2
          command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=1 \
              --override advertised.listeners=PLAINTEXT://156.56.46.37:32093 \
              --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 \
              --override log.dirs=/var/lib/kafka \
              --override auto.create.topics.enable=true \
              --override auto.leader.rebalance.enable=true \
              --override background.threads=10 \
              --override compression.type=producer \
              --override delete.topic.enable=true \
              --override leader.imbalance.check.interval.seconds=300 \
              --override leader.imbalance.per.broker.percentage=10 \
              --override log.flush.interval.messages=9223372036854775807 \
              --override log.flush.offset.checkpoint.interval.ms=60000 \
              --override log.flush.scheduler.interval.ms=9223372036854775807 \
              --override log.retention.bytes=-1 \
              --override log.retention.hours=168 \
              --override log.roll.hours=168 \
              --override log.roll.jitter.hours=0 \
              --override log.segment.bytes=1073741824 \
              --override log.segment.delete.delay.ms=60000 \
              --override message.max.bytes=1000012 \
              --override min.insync.replicas=1 \
              --override num.io.threads=8 \
              --override num.network.threads=3 \
              --override num.recovery.threads.per.data.dir=1 \
              --override num.replica.fetchers=1 \
              --override offset.metadata.max.bytes=4096 \
              --override offsets.commit.required.acks=-1 \
              --override offsets.commit.timeout.ms=5000 \
              --override offsets.load.buffer.size=5242880 \
              --override offsets.retention.check.interval.ms=600000 \
              --override offsets.retention.minutes=1440 \
              --override offsets.topic.compression.codec=0 \
              --override offsets.topic.num.partitions=50 \
              --override offsets.topic.replication.factor=3 \
              --override offsets.topic.segment.bytes=104857600 \
              --override queued.max.requests=500 \
              --override quota.consumer.default=9223372036854775807 \
              --override quota.producer.default=9223372036854775807 \
              --override replica.fetch.min.bytes=1 \
              --override replica.fetch.wait.max.ms=500 \
              --override replica.high.watermark.checkpoint.interval.ms=5000 \
              --override replica.lag.time.max.ms=10000 \
              --override replica.socket.receive.buffer.bytes=65536 \
              --override replica.socket.timeout.ms=30000 \
              --override request.timeout.ms=30000 \
              --override socket.receive.buffer.bytes=102400 \
              --override socket.request.max.bytes=104857600 \
              --override socket.send.buffer.bytes=102400 \
              --override unclean.leader.election.enable=true \
              --override zookeeper.session.timeout.ms=6000 \
              --override zookeeper.set.acl=false \
              --override broker.id.generation.enable=true \
              --override connections.max.idle.ms=600000 \
              --override controlled.shutdown.enable=true \
              --override controlled.shutdown.max.retries=3 \
              --override controlled.shutdown.retry.backoff.ms=5000 \
              --override controller.socket.timeout.ms=30000 \
              --override default.replication.factor=1 \
              --override fetch.purgatory.purge.interval.requests=1000 \
              --override group.max.session.timeout.ms=300000 \
              --override group.min.session.timeout.ms=6000 \
              --override log.cleaner.backoff.ms=15000 \
              --override log.cleaner.dedupe.buffer.size=134217728 \
              --override log.cleaner.delete.retention.ms=86400000 \
              --override log.cleaner.enable=true \
              --override log.cleaner.io.buffer.load.factor=0.9 \
              --override log.cleaner.io.buffer.size=524288 \
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
              --override log.cleaner.min.cleanable.ratio=0.5 \
              --override log.cleaner.min.compaction.lag.ms=0 \
              --override log.cleaner.threads=1 \
              --override log.cleanup.policy=delete \
              --override log.index.interval.bytes=4096 \
              --override log.index.size.max.bytes=10485760 \
              --override log.message.timestamp.difference.max.ms=9223372036854775807 \
              --override log.message.timestamp.type=CreateTime \
              --override log.preallocate=false \
              --override log.retention.check.interval.ms=300000 \
              --override max.connections.per.ip=2147483647 \
              --override num.partitions=1 \
              --override producer.purgatory.purge.interval.requests=1000 \
              --override replica.fetch.backoff.ms=1000 \
              --override replica.fetch.max.bytes=1048576 \
              --override replica.fetch.response.max.bytes=10485760 \
              --override reserved.broker.max.id=1000 "
          env:
            - name: KAFKA_HEAP_OPTS
              value: "-Xmx512M -Xms512M"
            - name: KAFKA_OPTS
              value: "-Dlogging.level=INFO"
          volumeMounts:
            - name: kafkadatadir
              mountPath: /var/lib/kafka
          lifecycle:
            postStart:
              exec:
                command: ["/bin/sh","-c","touch /tmp/health"]
          livenessProbe:
            exec:
              command: ["test","-e","/tmp/health"]
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: client2
            initialDelaySeconds: 15
            timeoutSeconds: 5
            periodSeconds: 20
  volumeClaimTemplates:
    - metadata:
        name: kafkadatadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
      spec:
        accessModes:
          - ReadWriteMany
        resources:
          requests:
            storage: 10Gi
Contents of kafka3.yaml:
[root@k8s-master01 kafka]# vim kafka3.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs3
  labels:
    app: kafka3
spec:
  ports:
    - port: 1099
      name: jmx3
  clusterIP: None
  selector:
    app: kafka3
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs3
  labels:
    app: kafka3
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
      name: client3
      nodePort: 32094
  selector:
    app: kafka3
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka3
spec:
  serviceName: kafka-hs3
  replicas: 1
  selector:
    matchLabels:
      app: kafka3
  template:
    metadata:
      labels:
        app: kafka3
    spec:
      containers:
        - name: k8skafka
          imagePullPolicy: Always
          image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
          ports:
            - containerPort: 9092
              name: client3
            - containerPort: 1099
              name: jmx3
          command:
            - sh
            - -c
            - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=2 \
              --override advertised.listeners=PLAINTEXT://156.56.46.37:32094 \
              --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 \
              --override log.dirs=/var/lib/kafka \
              --override auto.create.topics.enable=true \
              --override auto.leader.rebalance.enable=true \
              --override background.threads=10 \
              --override compression.type=producer \
              --override delete.topic.enable=true \
              --override leader.imbalance.check.interval.seconds=300 \
              --override leader.imbalance.per.broker.percentage=10 \
              --override log.flush.interval.messages=9223372036854775807 \
              --override log.flush.offset.checkpoint.interval.ms=60000 \
              --override log.flush.scheduler.interval.ms=9223372036854775807 \
              --override log.retention.bytes=-1 \
              --override log.retention.hours=168 \
              --override log.roll.hours=168 \
              --override log.roll.jitter.hours=0 \
              --override log.segment.bytes=1073741824 \
              --override log.segment.delete.delay.ms=60000 \
              --override message.max.bytes=1000012 \
              --override min.insync.replicas=1 \
              --override num.io.threads=8 \
              --override num.network.threads=3 \
              --override num.recovery.threads.per.data.dir=1 \
              --override num.replica.fetchers=1 \
              --override offset.metadata.max.bytes=4096 \
              --override offsets.commit.required.acks=-1 \
              --override offsets.commit.timeout.ms=5000 \
              --override offsets.load.buffer.size=5242880 \
              --override offsets.retention.check.interval.ms=600000 \
              --override offsets.retention.minutes=1440 \
              --override offsets.topic.compression.codec=0 \
              --override offsets.topic.num.partitions=50 \
              --override offsets.topic.replication.factor=3 \
              --override offsets.topic.segment.bytes=104857600 \
              --override queued.max.requests=500 \
              --override quota.consumer.default=9223372036854775807 \
              --override quota.producer.default=9223372036854775807 \
              --override replica.fetch.min.bytes=1 \
              --override replica.fetch.wait.max.ms=500 \
              --override replica.high.watermark.checkpoint.interval.ms=5000 \
              --override replica.lag.time.max.ms=10000 \
              --override replica.socket.receive.buffer.bytes=65536 \
              --override replica.socket.timeout.ms=30000 \
              --override request.timeout.ms=30000 \
              --override socket.receive.buffer.bytes=102400 \
              --override socket.request.max.bytes=104857600 \
              --override socket.send.buffer.bytes=102400 \
              --override unclean.leader.election.enable=true \
              --override zookeeper.session.timeout.ms=6000 \
              --override zookeeper.set.acl=false \
              --override broker.id.generation.enable=true \
              --override connections.max.idle.ms=600000 \
              --override controlled.shutdown.enable=true \
              --override controlled.shutdown.max.retries=3 \
              --override controlled.shutdown.retry.backoff.ms=5000 \
              --override controller.socket.timeout.ms=30000 \
              --override default.replication.factor=1 \
              --override fetch.purgatory.purge.interval.requests=1000 \
              --override group.max.session.timeout.ms=300000 \
              --override group.min.session.timeout.ms=6000 \
              --override log.cleaner.backoff.ms=15000 \
              --override log.cleaner.dedupe.buffer.size=134217728 \
              --override log.cleaner.delete.retention.ms=86400000 \
              --override log.cleaner.enable=true \
              --override log.cleaner.io.buffer.load.factor=0.9 \
              --override log.cleaner.io.buffer.size=524288 \
              --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
              --override log.cleaner.min.cleanable.ratio=0.5 \
              --override log.cleaner.min.compaction.lag.ms=0 \
              --override log.cleaner.threads=1 \
              --override log.cleanup.policy=delete \
              --override log.index.interval.bytes=4096 \
              --override log.index.size.max.bytes=10485760 \
              --override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=1 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 "
env:
- name: KAFKA_HEAP_OPTS
value : "-Xmx512M -Xms512M"
- name: KAFKA_OPTS
value: "-Dlogging.level=INFO"
volumeMounts:
- name: kafkadatadir
mountPath: /var/lib/kafka
lifecycle:
postStart:
exec:
command: ["/bin/sh","-c","touch /tmp/health"]
livenessProbe:
exec:
command: ["test","-e","/tmp/health"]
initialDelaySeconds: 5
timeoutSeconds: 5
periodSeconds: 10
readinessProbe:
tcpSocket:
port: client3
initialDelaySeconds: 15
timeoutSeconds: 5
periodSeconds: 20
volumeClaimTemplates:
- metadata:
name: kafkadatadir
annotations:
volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 10Gi
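The three manifests (kafka1.yaml, kafka2.yaml, kafka3.yaml) appear to differ mainly in a few per-instance values. Judging from the meta.properties files and the NodePort listing shown later in this post, instance N uses broker.id N-1 and NodePort 32091+N. The sketch below derives those two overrides from the instance number. The function name kafka_overrides and the NODE_IP variable are illustrative assumptions, not part of the original manifests.

```shell
# Hypothetical helper: derive the per-instance Kafka overrides from the
# instance number (1..3). Assumes the mapping observed in this post:
# broker.id = N-1 and NodePort = 32091+N.
NODE_IP="${NODE_IP:-156.56.46.37}"   # external node address used in this post

kafka_overrides() {
  n="$1"
  case "$n" in
    1|2|3) ;;
    *) echo "instance must be 1, 2 or 3" >&2; return 1 ;;
  esac
  broker_id=$((n - 1))
  node_port=$((32091 + n))
  echo "--override broker.id=${broker_id} --override advertised.listeners=PLAINTEXT://${NODE_IP}:${node_port}"
}
```

For example, kafka_overrides 3 prints the broker.id=2 and port 32094 pair used in the manifest above.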
Create the resources and check them
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka1.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka2.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka3.yaml
[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0 1/1 Running 0 5d21h
kafoka2-0 1/1 Running 0 5d21h
kafoka3-0 1/1 Running 0 5d21h
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1 NodePort 10.254.66.1 <none> 9092:32092/TCP 5d21h
kafka-cs2 NodePort 10.254.177.227 <none> 9092:32093/TCP 5d21h
kafka-cs3 NodePort 10.254.136.242 <none> 9092:32094/TCP 5d21h
kafka-hs1 ClusterIP None <none> 1099/TCP 5d21h
kafka-hs2 ClusterIP None <none> 1099/TCP 5d21h
kafka-hs3 ClusterIP None <none> 1099/TCP 5d21h
[root@k8s-vm01 new_kafka]# kubectl get statefulset -n wiseco|grep kafoka
kafoka1 1/1 5d21h
kafoka2 1/1 5d21h
kafoka3 1/1 5d21h
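Instead of polling kubectl get by hand, the rollout of each StatefulSet can be awaited in a loop. This is a minimal sketch assuming the three StatefulSet names and the wiseco namespace from this post. The wait_for_kafka function and the KUBECTL override variable are illustrative, and the 120s timeout is an arbitrary choice.

```shell
# Sketch: wait for each single-replica StatefulSet to finish rolling out.
# KUBECTL can be overridden (for example KUBECTL=echo) to preview the commands.
wait_for_kafka() {
  ns="${1:-wiseco}"
  for i in 1 2 3; do
    "${KUBECTL:-kubectl}" rollout status "statefulset/kafoka${i}" -n "$ns" --timeout=120s || return 1
  done
}
```

Calling wait_for_kafka wiseco returns non-zero as soon as one of the rollouts fails or times out.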
Check the PVs and PVCs of the Kafka cluster
[root@k8s-vm01 new_kafka]# kubectl get pv -n wiseco|grep kafoka
pvc-17aeea13-4f2a-4f10-8f63-67888c468831 10Gi RWX Delete Bound wiseco/kafkadatadir-kafoka1-0 kafka-nfs-storage 6d15h
pvc-447d344d-2eaa-4db5-baf8-473ce9811378 10Gi RWX Delete Bound wiseco/kafkadatadir-kafoka2-0 kafka-nfs-storage 6d15h
pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b 10Gi RWX Delete Bound wiseco/kafkadatadir-kafoka3-0 kafka-nfs-storage 6d15h
[root@k8s-vm01 new_kafka]# kubectl get pvc -n wiseco|grep kafoka
kafkadatadir-kafoka1-0 Bound pvc-17aeea13-4f2a-4f10-8f63-67888c468831 10Gi RWX kafka-nfs-storage 6d15h
kafkadatadir-kafoka2-0 Bound pvc-447d344d-2eaa-4db5-baf8-473ce9811378 10Gi RWX kafka-nfs-storage 6d15h
kafkadatadir-kafoka3-0 Bound pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b 10Gi RWX kafka-nfs-storage 6d15h
On the NFS server, check the persistent storage of the Kafka cluster
[root@k8s_storage ~]# ll /data/storage/kafka/
total 12
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378
drwxrwxrwx 52 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831/meta.properties
#
#Tue Jan 12 17:16:03 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=0
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378/meta.properties
#
#Tue Jan 12 17:17:11 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=1
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b/meta.properties
#
#Tue Jan 12 17:17:29 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=2
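The three meta.properties files above should share one cluster.id and carry distinct broker.id values. The following sketch automates that comparison. The function names meta_get and same_cluster are illustrative, and the file layout is assumed to be the plain key=value format shown above.

```shell
# Sketch: read one key from a Kafka meta.properties file.
# $1 = path to meta.properties, $2 = key (for example broker.id)
meta_get() {
  awk -v k="$2" 'index($0, k "=") == 1 { print substr($0, length(k) + 2) }' "$1"
}

# Succeeds only if every given meta.properties file has the same, non-empty cluster.id.
same_cluster() {
  first=""
  for f in "$@"; do
    id="$(meta_get "$f" cluster.id)"
    [ -n "$id" ] || return 1
    if [ -z "$first" ]; then
      first="$id"
    elif [ "$id" != "$first" ]; then
      return 1
    fi
  done
  [ -n "$first" ]
}
```

On the NFS server this could be called as same_cluster /data/storage/kafka/*/meta.properties, assuming the directory only holds the three broker volumes.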
3) Verify producing and consuming data on the Kafka cluster
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1 NodePort 10.254.66.1 <none> 9092:32092/TCP 5d21h
kafka-cs2 NodePort 10.254.177.227 <none> 9092:32093/TCP 5d21h
kafka-cs3 NodePort 10.254.136.242 <none> 9092:32094/TCP 5d21h
kafka-hs1 ClusterIP None <none> 1099/TCP 5d21h
kafka-hs2 ClusterIP None <none> 1099/TCP 5d21h
kafka-hs3 ClusterIP None <none> 1099/TCP 5d21h
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep zk
zk-cs NodePort 10.254.209.162 <none> 2181:32181/TCP 6d16h
zk-hs ClusterIP None <none> 2888/TCP,3888/TCP 6d16h
Log in to any one of the three Kafka pod instances (the kafka2 node here) and produce data:
[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0 1/1 Running 0 5d21h
kafoka2-0 1/1 Running 0 5d21h
kafoka3-0 1/1 Running 0 5d21h
[root@k8s-vm01 new_kafka]# kubectl exec -ti kafoka2-0 -n wiseco -- /bin/bash
[root@kafoka2-0 opt]#
List the topics
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181
Create a topic, for example one named wisecotest
[root@kafoka2-0 opt]# kafka-topics.sh --create --topic wisecotest --zookeeper zk-cs:2181 --partitions 1 --replication-factor 1
Created topic wisecotest.
List the topics again
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181
wisecotest
Produce data to the wisecotest topic created above
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>
Then log in to another Kafka pod instance, for example kafka3, to verify consumption.
The data produced above can already be consumed:
[root@k8s-vm01 ~]# kubectl exec -ti kafoka3-0 -n wiseco -- /bin/bash
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
Produce more data in the kafka2 container
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>
Then check consumption again in the kafka3 container
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
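The manual produce/consume round trip above can also be run non-interactively. This is a minimal sketch, assuming it runs inside one of the Kafka pods where the Kafka scripts are on the PATH. The function names and the BROKERS variable are illustrative, and the 10-second consumer timeout is an arbitrary choice.

```shell
# Sketch: non-interactive produce/consume helpers for the checks above.
# BROKERS defaults to the external NodePort addresses used in this post.
BROKERS="${BROKERS:-156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094}"

# $1 = topic; messages are read from stdin, one per line
produce_lines() {
  kafka-console-producer.sh --broker-list "$BROKERS" --topic "$1"
}

# $1 = topic, $2 = number of messages to read before exiting
consume_n() {
  kafka-console-consumer.sh --bootstrap-server "$BROKERS" --topic "$1" \
    --from-beginning --max-messages "$2" --timeout-ms 10000
}
```

A possible usage: printf 'test1\ntest2\ntest3\n' | produce_lines wisecotest, followed by consume_n wisecotest 3. The consumer then exits on its own instead of waiting for Ctrl+C.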
Consuming through a single external address also works
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
Using the internal Kafka Service addresses, data can also be consumed normally from inside the K8S cluster
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs2:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
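The warning below appears when the command is run inside the kafoka3 pod: the kafka-cs3 bootstrap address could not be reached, although consumption still succeeds through the other two brokers. Reportedly there is no such warning when the addresses are used from a pod that is not one of the Kafka instances.
Two things are worth checking here. First, the command passes kafka-cs3:9093, while the kafka-cs3 Service exposes port 9092, so the port alone may explain the failed connection. Second, a Pod connecting to itself through its own Service address is not forbidden by K8S as a rule, but it does fail on clusters whose network setup does not enable hairpin mode.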
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9093 --topic wisecotest --from-beginning
[2021-01-18 14:47:33,820] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Connection to node -3 (kafka-cs3/10.254.136.242:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-18 14:47:33,821] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Bootstrap broker kafka-cs3:9093 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
test1
test2
test3
hahahaha
heiheihei
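The bootstrap list in the command above gives kafka-cs3 port 9093, whereas the Service listing earlier shows all three kafka-cs Services on 9092. A quick check such as the following sketch can catch that kind of mismatch before suspecting the network. The function name unexpected_ports is illustrative.

```shell
# Sketch: print the bootstrap entries whose port differs from the expected one.
# $1 = comma-separated host:port list, $2 = expected port
unexpected_ports() {
  echo "$1" | tr ',' '\n' | while IFS= read -r entry; do
    [ "${entry##*:}" = "$2" ] || echo "$entry"
  done
}
```

Running unexpected_ports "kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9093" 9092 prints kafka-cs3:9093, and an empty result means every entry uses the expected port.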
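Now verify connecting to Kafka and consuming data from outside the K8S cluster, through the external address.
Log in to an external server that has the Kafka client installed: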
[root@dev-env ~]# cd /usr/kafka/kafka_2.11-2.1.0/bin/
[root@dev-env bin]#
As shown below, data produced inside the cluster can be consumed from outside the K8S cluster through the external address
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
Produce more data from inside the Kafka pod
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>123456789
>abcdefghijk
>
Check from the external server: the new data is consumed normally
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
123456789
abcdefghijk
Connection address of the Kafka cluster from inside the K8S cluster:
kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9092
Connection address of the Kafka cluster from outside the K8S cluster:
156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094
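A client script can pick between these two address lists depending on where it runs. The sketch below relies on the KUBERNETES_SERVICE_HOST environment variable, which Kubernetes sets inside pods. The function name kafka_bootstrap is illustrative, and the short Service names assume the client runs in the wiseco namespace.

```shell
# Sketch: choose the bootstrap list based on where the client runs.
# KUBERNETES_SERVICE_HOST is set by Kubernetes inside pods; outside the
# cluster it is normally unset.
kafka_bootstrap() {
  if [ -n "${KUBERNETES_SERVICE_HOST:-}" ]; then
    echo "kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9092"
  else
    echo "156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094"
  fi
}
```

One caveat: since each broker advertises its NodePort address through advertised.listeners, a client that bootstraps through the internal names is still directed to 156.56.46.37:3209x for the actual traffic, so that address needs to be reachable from inside the cluster as well.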