一、zookeeper--部署和使用

网友投稿 293 2022-11-26

一、zookeeper--部署和使用

一、部署zookeeper

1、资源规划

服务器 bigdata121/192.168.50.121,bigdata122/192.168.50.122,bigdata123/192.168.50.123
zookeeper版本 3.4.10
系统版本 centos7.2

2、集群部署

(1)安装zk

[root@bigdata121 modules]# cd /opt/modules/zookeeper-3.4.10 [root@bigdata121 zookeeper-3.4.10]# mkdir zkData [root@bigdata121 zookeeper-3.4.10]# mv conf/zoo_sample.cfg conf/zoo.cfg

(2)修改zoo.cfg配置

# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. #dataDir=/tmp/zookeeper # 指定zk存储数据的目录 dataDir=/opt/modules/zookeeper-3.4.10/zkData # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 # 这里是重点配置 #############cluster############################# server.1=bigdata121:2888:3888 server.2=bigdata122:2888:3888 server.3=bigdata123:2888:3888

cluster配置参数解读:Server.A=B:C:D。A是一个数字,表示这个是第几号服务器,也就是sid;B是这个服务器的ip地址;C是这个服务器与集群中的Leader服务器交换信息的端口;不是对外的服务端口(对外的服务端口默认是2181)D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

将配置好的整个程序目录拷贝到其他机器上,使用scp或者rsync都可以,自己看着办

(3)指定服务器id在前面配置的 dataDir 指定的目录下,创建一个“myid”文件,里面的内容就写入当前server的id,这个id就是在zk集群中的唯一标识。并且这个id需要和前面配置文件中的cluster中指定的一样,否则会报错。

(4)配置环境变量

vim /etc/profile.d/zookeeper.sh #!/bin/bash export ZOOKEEPER_HOME=/opt/modules/zookeeper-3.4.10 export PATH=${ZOOKEEPER_HOME}/bin:$PATH 然后 source /etc/profile.d/zookeeper.sh

(5)启动在三台机器上执行

启动:zkServer.sh start 查看当前主机上zk的状态:zkServer.sh status [root@bigdata121 conf]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/modules/zookeeper-3.4.10/bin/../conf/zoo.cfg Mode: follower

二、常用命令

使用 zkCli.sh 进入本机的zk服务。可以使用如下命令:

命令 功能
help 显示所有命令帮助
ls path [watch] 使用 ls 命令来查看当前znode中所包含的内容,后面的watch表示监听该节点下子节点的改变。注意,监听触发一次之后就会失效,如果需要持续监听,需要每次触发之后重新进行监听
ls2 path [watch] 查看当前节点数据并能看到更新次数等数据,类似于Linux中的 ls -l
Create 普通创建(永久节点) -s 含有序列,会在节点名后面加一串序列号,常用于节点名称冲突的情况 -e 创建临时节点
get path [watch] 获得节点的值。后面的watch表示监听该节点的value的改变。
Set path value 设置节点的具体值
Stat 查看节点状态
rmr path 递归删除节点

三、zk api使用(java)

1、maven依赖

org.apache.zookeeper zookeeper 3.4.10

2、创建zk客户端

import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; public class ZkTest { public static String connectString = "bigdata121:2181,bigdata122:2181,bigdata123:2181"; public static int sessionTimeout = 2000; public ZooKeeper zkClient = null; @Before public void init() throws IOException { //创建zk客户端 zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { //返回监听事件时的处理函数,监听事件是一次性的 public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getState() + "," + watchedEvent.getType() + "," + watchedEvent.getPath()); try { zkClient.getChildren("/", true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }

3、创建节点

public void create() { //创建节点,参数为:节点名 节点值 权限 节点类型 //即 /wangjin tao 开放权限 持久化节点 try { String s = zkClient.create("/wangjin", "tao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException e) { System.out.println("node exists!!!"); } catch (InterruptedException e) { e.printStackTrace(); } }

4、获取子节点

zkclient.getChildren(路径,是否监听) 返回的是子节点的列表 例子: public void getChildNode() { try { List children = zkClient.getChildren("/", false); for (String node : children) { System.out.println(node); } } catch (KeeperException e) { System.out.println("node not exists!!!"); } catch (InterruptedException e) { e.printStackTrace(); } }

5、判断节点是否存在

zkclient.exists(path, 是否监听) 返回的是节点的状态信息,如果为null,表示节点不存在 例子: public void nodeExist() { //返回的是节点的状态信息,如果为null,表示节点不存在 try { Stat stat = zkClient.exists("/king", false); System.out.println(stat == null ? "没有" : "有"); } catch (KeeperException e) { System.out.println("node not exists"); } catch (InterruptedException e) { e.printStackTrace(); } }

四、使用zk做分布式锁实例

1、maven依赖

org.apache.curator curator-framework 4.0.0 org.apache.curator curator-recipes 4.0.0 org.apache.curator curator-client 4.0.0 com.google.guava guava 16.0.1

2、需求模拟抢购秒杀场景,需要给商品数量加锁。

3、代码

import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; public class TestDistributedLock { //定义共享资源 private static int count = 10; //用于减除商品 private static void printCountNumber() { System.out.println("***********" + Thread.currentThread().getName() + "**********"); System.out.println("当前值:" + count); count--; //睡2秒 try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("***********" + Thread.currentThread().getName() + "**********"); } public static void main(String[] args) { //定义客户端重试的策略 RetryPolicy policy = new ExponentialBackoffRetry(1000, //每次等待的时间 10); //最大重试的次数 //定义ZK的一个客户端 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("bigdata121:2181") .retryPolicy(policy) .build(); //客户端对象连接zk client.start(); //创建互斥锁,其实就是在zk上创建个节点 final InterProcessMutex lock = new InterProcessMutex(client, "/mylock"); // 启动10个线程去访问共享资源 for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { try { //请求得到锁 lock.acquire(); //访问共享资源 printCountNumber(); } catch (Exception ex) { ex.printStackTrace(); } finally { //释放锁 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } } }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java 多线程与并发之volatile详解分析
下一篇:基于DSP芯片实现异步串行通信系统的软硬件设计
相关文章

 发表评论

暂时没有评论,来抢沙发吧~