ZooKeeper客户端源码(一)——向服务端建立连接+会话建立+心跳保持长连接

网友投稿 294 2022-11-03

ZooKeeper客户端源码(一)——向服务端建立连接+会话建立+心跳保持长连接

从ZooKeeper实例初始化开始

ZooKeeper 提供了原生的客户端库,虽然不好用,但是能够更好理解客户端与服务端建立连接和通信的过程。比较流行的Apache Curator也是对原生库的再封装。

向服务端建立连接,只需要实例化一个ZooKeeper对象,将服务器地址列表传进去即可。因为发起连接请求是一个异步的过程,所以实例化ZooKeeper时可以传一个Watcher,会话建立成功之后,客户端会生成一个 “已经建立连接(SyncConnected)” 的事件,进行回调通知。只有会话建立成功之后,才能与服务端进行通信。

String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; ZooKeeper zooKeeper = new ZooKeeper(connectString, 20000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { System.out.println("会话建立成功"); } } });

ZooKeeper实例初始化流程如下:

创建HostProvider

HostProvider顾名思义就是“服务地址提供器”,默认实现类StaticHostProvider。StaticHostProvider核心思想是,将服务地址列表打乱,然后构成一个虚拟环,轮询向外提供服务地址。

如果StaticHostProvider不满足需求,可以自定义实现HostProvider。

打乱服务地址列表

打乱地址就很简单了,用了Java官方提供的工具java.util.Collections#shuffle。

// org.apache.zookeeper.client.StaticHostProvider#shuffle private List shuffle(Collection serverAddresses) { List tmpList = new ArrayList<>(serverAddresses.size()); tmpList.addAll(serverAddresses); Collections.shuffle(tmpList, sourceOfRandomness); return tmpList; }

随机源生成:

Random sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode())

随机源的种子是当前时间毫秒值掺杂(^)当前StaticHostProvider实例的hashCode,使得每次生成的随机源更公平。

构建虚拟环轮询负载

如何将一个服务地址列表构成一个环轮询负载的呢?

有两个游标,currentIndex 和 lastIndex是实现“虚拟环轮询负载”的关键:

currentIndex,指向当前选择的位置。每选择一次就加一,如果等于服务地址列表长度,就重置为0,这样就形成了一个环。 lastIndex,上次选择的位置。会话建立成功之后会将currentIndex赋值给lastIndex。

public InetSocketAddress next(long spinDelay) { boolean needToSleep = false; InetSocketAddress addr; synchronized (this) { // 省略部分无关代码 // currentIndex自增,如果等于服务地址列表长度,就重置为0 ++currentIndex; if (currentIndex == serverAddresses.size()) { currentIndex = 0; } addr = serverAddresses.get(currentIndex); // 两个游标currentIndex、lastIndex // currentIndex 当前选择的位置,lastIndex上次选择的位置 // lastIndex 什么时候设置呢?会话建立成功之后调用 onConnected,将currentIndex赋值给lastIndex needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0); if (lastIndex == -1) { lastIndex = 0; } } // 如果 currentIndex和lastIndex且spinDelay>0,就需要休眠spinDelay时间, if (needToSleep) { try { Thread.sleep(spinDelay); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // 解析InetSocketAddress, // 如果一个主机映射了多个ip地址(InetAddress) // 就打乱选择其中一个地址返回 return resolve(addr); }

当currentIndex和lastIndex相等时,且spinDelay>0,就会休眠spinDelay毫秒,然后再将选择的服务地址返回,为什么会有这个逻辑呢?

这是因为,如果轮询了一圈服务地址都没有成功建立连接,与其一味不停地重试,还不如休眠一段时间再试,可能成功的概率更高一些。

创建ConnectStringParser

ConnectStringParser 就是将 connectString 按一定格式解析成 InetSocketAddress 列表。connectString格式如下:

127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/test

如果在服务地址后面指定了路径,后续的操作都是在该路径下进行。

public ConnectStringParser(String connectString) { // parse out chroot, if any // 解析chroot // connectString 可以指定某个路径, // 如127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/test int off = connectString.indexOf('/'); if (off >= 0) { String chrootPath = connectString.substring(off); // ignore "/" chroot spec, same as null if (chrootPath.length() == 1) { this.chrootPath = null; } else { PathUtils.validatePath(chrootPath); this.chrootPath = chrootPath; } connectString = connectString.substring(0, off); } else { this.chrootPath = null; } // 按 , 分割 List hostsList = split(connectString, ","); for (String host : hostsList) { int port = DEFAULT_PORT; String[] hostAndPort = NetUtils.getIPV6HostAndPort(host); if (hostAndPort.length != 0) { host = hostAndPort[0]; if (hostAndPort.length == 2) { port = Integer.parseInt(hostAndPort[1]); } } else { int pidx = host.lastIndexOf(':'); if (pidx >= 0) { // otherwise : is at the end of the string, ignore if (pidx < host.length() - 1) { port = Integer.parseInt(host.substring(pidx + 1)); } host = host.substring(0, pidx); } } // 封装未解析的InetSocketAddress serverAddresses.add(InetSocketAddress.createUnresolved(host, port)); } }

创建并启动ClientCnxn

ClientCnxn是对客户端连接的抽象和封装,负责网络连接管理和watcher管理。

还记得从ZooKeeper构造器传入的Watcher吗?它会作为默认Watcher传给ZKWatchManager,后续其他请求注册watcher时,可以不用再定义,直接使用默认的Watcher。

初始化SendThread时会传入一个ClientCnxnSocket,ClientCnxnSocket是对网络底层的封装,默认实现类为ClientCnxnSocketNIO。

ClientCnxn创建好后会进行启动操作,就是启动SendThread和EventThread两个线程。后续的网络连接建立和通信都是由SendThread线程负责。

向服务端建立连接

SendThread启动后,会进入一个循环状态,首先判断是否已经建立连接,如果没有就通过hostProvider选择一个服务地址发起连接。

网络底层处理类ClientCnxnSocket,以ClientCnxnSocketNIO实现为准,如下图是建立连接的过程:

ClientCnxnSocketNIO底层是创建了非阻塞的SocketChannel,然后注册OP_CONNECT事件,并发起连接,如果此时能立刻连上,就继续进行会话建立流程。

如下模拟服务器一直连不上,多次通过hostProvider轮询选择服务器重连的效果:

服务器地址选择了一圈以后,会休眠spinDelay毫秒,也符合源码逻辑。

会话建立请求

连接建立之后,需要继续进行会话建立请求,因为每条连接都是有状态的,临时节点的归属就是以会话为依托,连接断开,会话失效,临时节点也就跟随着清除。

每条连接跟会话绑定,如果连接因为网络等问题断开,在会话有效期内重连上,就可以恢复之前的工作场景,而如果在会话失效之后重连上,就是非法连接,需要重新进行会话建立。

发起会话建立请求

发起会话建立,首先构建一个ConnectRequest请求体:

ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);

ConnectRequest需要传入客户端最近一次的事务zxid,会话超时时间,会话id,会话密码。首次建立会话,sessionId为0,sessionPasswd为空。

其次,将ConnectRequest请求体包装进Packet对象,Packet是ZooKeeper通信的最小单元,所有请求体都会包装进一个Packet对象再序列化发送给服务端。

Packet(RequestHeader requestHeader,ReplyHeader replyHeader,Record request, Record response, WatchRegistration watchRegistration, boolean readOnly)

对于ConnectRequest包装的Packet,请求头RequestHeader为null,并且会被放在请求队列outgoingQueue的首位。

outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));

接着注册网络IO读写事件,后续请求包就会被发送给服务端了,发送完成后再将会话建立的Packet从outgoingQueue中移除。

注:发送到服务端的会话创建流程,暂时不需要了解,后面讲解服务端源码时会详细讲述。

会话建立响应

会话建立响应,需要和普通请求响应分开处理,如果接收到响应信息,判断客户端还未初始化完成,就认为这个响应一定是会话建立响应。

首先从底层网络缓冲区读取数据反序列化构建ConnectResponse响应体,解析出经过服务器协商好的sessionTimeout以及sessionId 和 sessionPasswd。

根据协商的sessionTimeout重新设置readTimeout和connectTimeout,readTimeout是negotiatedSessionTimeout的2/3,connectTimeout是协商negotiatedSessionTimeout与服务地址列表个数平均。

最后生成一个SyncConnected的watcher事件,交由EventThread线程进行回调,这是唯一一个不需要向服务端注册的watcher事件,完全由客户端自己生成和触发。

心跳保持长连接

网络建立连接,会话建立都完成后,就可以与服务端通信了,为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求。而一段时间是多久?如下是计算下一次发送心跳请求时间的算法:

// clientCnxnSocket.getIdleSend为距离上次发送的时长 // readTimeout = sessionTimeout * 2 / 3 int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);

根据计算,每隔sessionTimeout/3如果没有发送任何请求,就发送一次心跳。多减1秒是为了防止因为竞态情况而丢失心跳请求。

发送的心跳请求数据很简单,没有请求体,只有请求头:

如下是在源码中加了日志后的心跳过程:

协商sessionTimeout为9999,readTimeout计算得6666,timeToNextPing为3333,每次会多减1秒,大概每隔3秒发一次心跳。

总结

客户端与服务端建立的是长连接,如果连接失败,服务地址列表会轮询重试,直到连接成功,官方提供了默认的服务地址负载算法 StaticHostProvider,也可以自己实现。 每条连接是有状态的,只有建立了会话,才能真正开始与服务端通信。会话建立成功之后,会生成一个SyncConnected事件进行回调通知。 会话是临时节点的基础,在会话有效期内断开重连,可以恢复上一次工作场景。 为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求,心跳间隔时间为sessionTimeout/3。

如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:32段LCD驱动器AY0438及其与单片机的接口设计
下一篇:Linux 系统基础命令
相关文章

 发表评论

暂时没有评论,来抢沙发吧~