PostgreSQL High Availability: Studying the Patroni Source Code (the Etcd Class)

User submission, 2022-12-02

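The Etcd class inherits from the abstract class AbstractEtcd, which in turn inherits from AbstractDCS (the AbstractEtcd constructor is passed the EtcdClient class). The Etcd constructor first calls the parent constructor and then sets the __do_not_watch member to False.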

    class Etcd(AbstractEtcd):

        def __init__(self, config):
            super(Etcd, self).__init__(config, EtcdClient, (etcd.EtcdLeaderElectionInProgress, EtcdRaftInternal))
            self.__do_not_watch = False
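The constructor chain above can be sketched with stand-in classes. This is only an illustration: FakeClient and the simplified bodies of AbstractDCS and AbstractEtcd are assumptions, not Patroni's real code. It shows the client class being handed to the parent constructor, and how Python name-mangles the double-underscore attribute so that it is private to Etcd.

```python
class AbstractDCS:
    """Stand-in for Patroni's AbstractDCS (simplified, hypothetical body)."""

    def __init__(self, config):
        self._name = config["name"]


class AbstractEtcd(AbstractDCS):
    """Stand-in: receives the client class and instantiates it."""

    def __init__(self, config, client_cls, retry_errors_cls):
        super().__init__(config)
        self._retry_errors = retry_errors_cls
        self._client = client_cls(config)


class FakeClient:
    """Hypothetical placeholder for EtcdClient; it only records the config."""

    def __init__(self, config):
        self.config = config


class Etcd(AbstractEtcd):
    def __init__(self, config):
        super().__init__(config, FakeClient, (RuntimeError,))
        self.__do_not_watch = False  # stored on the instance as _Etcd__do_not_watch


dcs = Etcd({"name": "node1"})
print(type(dcs._client).__name__)  # FakeClient
print(dcs._Etcd__do_not_watch)     # False
```

Because of the mangling, a subclass or the parent class that wrote `self.__do_not_watch` would be touching a different attribute, which keeps this flag local to Etcd.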

ETCD Module

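Create a client object to connect to an etcd cluster. The code below shows several ways of initializing the Client object from the etcd module; each one connects to the cluster with different options. By default, the client connects to an etcd server running on localhost, port 4001.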

    import etcd

    client = etcd.Client()  # this will create a client against etcd server running on localhost on port 4001
    client = etcd.Client(port=4002)
    client = etcd.Client(host='127.0.0.1', port=4003)
    client = etcd.Client(host='127.0.0.1', port=4003, allow_redirect=False)  # won't let you run sensitive commands on non-leader machines, default is true
    # client = etcd.Client(host='127.0.0.1', port=4003, allow_reconnect=True, protocol='1)  # protocol value is truncated in the source

    # with ttl
    client.write('/nodes/n2', 2, ttl=4)  # sets the ttl to 4 seconds

    # create only
    client.write('/nodes/n3', 'test', prevExist=False)

    # Compare and swap values atomically
    client.write('/nodes/n3', 'test2', prevValue='test1')  # this fails to write
    client.write('/nodes/n3', 'test2', prevIndex=10)  # this fails to write

    # mkdir
    client.write('/nodes/queue', None, dir=True)

    # Append a value to a queue dir
    client.write('/nodes/queue', 'test', append=True)  # will write i.e. /nodes/queue/11
    client.write('/nodes/queue', 'test2', append=True)  # will write i.e. /nodes/queue/12
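To make the ttl option more concrete, here is a minimal in-memory sketch of a key that expires. TTLStore and its injectable clock are hypothetical helpers written for this illustration; they are not part of the etcd module, and a real etcd server handles expiry on its own.

```python
class TTLStore:
    """Hypothetical in-memory store whose keys can expire after ttl seconds."""

    def __init__(self, clock):
        self._clock = clock  # callable returning the current time in seconds
        self._data = {}      # key -> (value, expires_at or None)

    def write(self, key, value, ttl=None):
        expires_at = None if ttl is None else self._clock() + ttl
        self._data[key] = (value, expires_at)

    def read(self, key):
        value, expires_at = self._data[key]
        if expires_at is not None and self._clock() >= expires_at:
            del self._data[key]  # the key has outlived its ttl
            raise KeyError(key)
        return value


now = [0.0]  # fake clock, so the example does not need to sleep
ttl_store = TTLStore(clock=lambda: now[0])
ttl_store.write("/nodes/n2", 2, ttl=4)
print(ttl_store.read("/nodes/n2"))  # 2

now[0] = 4.0  # four seconds later
try:
    ttl_store.read("/nodes/n2")
except KeyError:
    print("expired")
```

This is the behaviour Patroni relies on for the leader key: if the holder stops refreshing it, the key disappears when the ttl runs out.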

You can also atomically update a result, as follows.

    result = client.read('/foo')
    print(result.value)  # bar
    result.value += u'bar'
    updated = client.update(result)  # if any other client wrote '/foo' in the meantime this will fail
    print(updated.value)  # barbar
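The read, modify, update sequence above is an optimistic-concurrency pattern. The sketch below imitates it with an in-memory store; VersionedStore, StaleWrite and append_atomically are hypothetical names for this illustration and say nothing about how the etcd module implements update internally.

```python
class StaleWrite(Exception):
    """Raised when the key changed after it was read (hypothetical name)."""


class VersionedStore:
    """In-memory stand-in that keeps a modification index per key."""

    def __init__(self):
        self._data = {}  # key -> (value, index)
        self._index = 0

    def write(self, key, value):
        self._index += 1
        self._data[key] = (value, self._index)

    def read(self, key):
        return self._data[key]  # (value, index)

    def update(self, key, value, prev_index):
        # Succeed only if nobody wrote the key since prev_index was read.
        if self._data[key][1] != prev_index:
            raise StaleWrite(key)
        self.write(key, value)


def append_atomically(store, key, suffix, attempts=5):
    """Read-modify-write loop that retries when a concurrent write is detected."""
    for _ in range(attempts):
        value, index = store.read(key)
        try:
            store.update(key, value + suffix, prev_index=index)
            return value + suffix
        except StaleWrite:
            continue
    raise RuntimeError("too many conflicting writes")


store = VersionedStore()
store.write("/foo", "bar")
print(append_atomically(store, "/foo", "bar"))  # barbar
```

A failed update is not an error in itself; it only means the caller has to re-read the key and decide again.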

The following code uses the client to read the value of a key from the etcd cluster.

    client.read('/nodes/n2').value

    # recursively read a directory
    r = client.read('/nodes', recursive=True, sorted=True)
    for child in r.children:
        print("%s: %s" % (child.key, child.value))

    client.read('/nodes/n2', wait=True)  # Waits for a change in value in the key before returning.
    client.read('/nodes/n2', wait=True, waitIndex=10)

    # raises etcd.EtcdKeyNotFound when key not found
    try:
        client.read('/invalid/path')
    except etcd.EtcdKeyNotFound:
        # do something
        print("error")

Delete a key

    client.delete('/nodes/n1')
    client.delete('/nodes', dir=True)  # spits an error if dir is not empty
    client.delete('/nodes', recursive=True)  # this works recursively

Using the lock mechanism provided by the etcd module (Locking module)

    # Initialize the lock object:
    # NOTE: this does not acquire a lock yet
    client = etcd.Client()
    lock = etcd.Lock(client, 'my_lock_name')

    # Use the lock object:
    lock.acquire(blocking=True,  # will block until the lock is acquired
                 lock_ttl=None)  # lock will live until we release it
    lock.is_acquired  # True
    lock.acquire(lock_ttl=60)  # renew a lock
    lock.release()  # release an existing lock
    lock.is_acquired  # False

    # The lock object may also be used as a context manager:
    client = etcd.Client()
    with etcd.Lock(client, 'customer1') as my_lock:
        do_stuff()
        my_lock.is_acquired  # True
        my_lock.acquire(lock_ttl=60)
    my_lock.is_acquired  # False

To get the machines and the leader of the etcd cluster, use client.machines and client.leader.

| Function | Purpose | Code |
| --- | --- | --- |
| `touch_member(self, data, permanent=False)` | | `self._client.set(self.member_path, json.dumps(data, separators=(',', ':')), None if permanent else self._ttl)` |
| `take_leader(self)` | | `self.retry(self._client.write, self.leader_path, self._name, ttl=self._ttl)` |
| `set_failover_value(self, value, index=None)` | Sets the value of failover_path | `self._client.write(self.failover_path, value, prevIndex=index or 0)` |
| `set_config_value(self, value, index=None)` | Sets the value of config_path | `self._client.write(self.config_path, value, prevIndex=index or 0)` |
| `_write_leader_optime(self, last_lsn)` | Sets the LSN value of leader_optime_path | `self._client.set(self.leader_optime_path, last_lsn)` |
| `_write_status(self, value)` | Sets the value of status_path | `self._client.set(self.status_path, value)` |
| `_update_leader(self)` | | `self.retry(self._client.write, self.leader_path, self._name, prevValue=self._name, ttl=self._ttl)` |
| `_delete_leader(self)` | Deletes the value of leader_path | `self._client.delete(self.leader_path, prevValue=self._name)` |
| `initialize(self, create_new=True, sysid="")` | | `self.retry(self._client.write, self.initialize_path, sysid, prevExist=(not create_new))` |
| `cancel_initialization(self)` | | `self.retry(self._client.delete, self.initialize_path)` |
| `delete_cluster(self)` | | `self.retry(self._client.delete, self.client_path(''), recursive=True)` |
| `set_history_value(self, value)` | Sets the value of history_path | `self._client.write(self.history_path, value)` |
| `set_sync_state_value(self, value, index=None)` | | `self.retry(self._client.write, self.sync_path, value, prevIndex=index or 0)` |
| `delete_sync_state(self, index=None)` | | `self.retry(self._client.delete, self.sync_path, prevIndex=index or 0)` |
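Two small Python idioms recur in the calls above: `json.dumps(..., separators=(',', ':'))` in touch_member and `index or 0` in the prevIndex arguments. The sketch below shows what each one evaluates to. The member dictionary and the helper effective_prev_index are made up for illustration, and the sketch makes no claim about how etcd interprets a prevIndex of 0.

```python
import json

# Hypothetical member data; the real payload is built elsewhere in Patroni.
member = {"conn_url": "postgres://10.0.0.1:5432/postgres", "state": "running"}

default = json.dumps(member)                         # separators default to (', ', ': ')
compact = json.dumps(member, separators=(",", ":"))  # no spaces after ',' or ':'

print(default)
print(compact)
print(len(default) - len(compact))  # 3: one ', ' and two ': ' each lose a space


def effective_prev_index(index=None):
    """Mirror of the `index or 0` expression: None (or 0) becomes 0."""
    return index or 0


print(effective_prev_index())    # 0
print(effective_prev_index(42))  # 42
```

The compact separators only shrink the stored string; `json.loads` returns the same dictionary for both forms.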

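The attempt_to_acquire_leader function tries to create the leader key with this node's own name as its value. The write uses prevExist=False, so it only succeeds if the key does not exist yet. If etcd.EtcdAlreadyExist is caught, another node already holds the key and the attempt to become leader has failed.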

    def attempt_to_acquire_leader(self, permanent=False):
        try:
            return bool(self.retry(self._client.write, self.leader_path, self._name,
                                   ttl=None if permanent else self._ttl, prevExist=False))
        except etcd.EtcdAlreadyExist:
            logger.info('Could not take out TTL lock')
        except (RetryFailedError, etcd.EtcdException):
            pass
        return False
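The create-if-absent race behind attempt_to_acquire_leader can be sketched without an etcd server. FakeStore and AlreadyExists below are hypothetical stand-ins for the etcd client and etcd.EtcdAlreadyExist, the key path is invented, and the ttl and retry handling of the real method are left out.

```python
class AlreadyExists(Exception):
    """Stand-in for etcd.EtcdAlreadyExist (hypothetical local class)."""


class FakeStore:
    """In-memory stand-in for the etcd client; only create-if-absent is modelled."""

    def __init__(self):
        self._data = {}

    def write(self, key, value, prevExist=None):
        if prevExist is False and key in self._data:
            raise AlreadyExists(key)
        self._data[key] = value
        return True

    def read(self, key):
        return self._data[key]


def attempt_to_acquire_leader(store, leader_path, name):
    """Return True only for the node that manages to create the leader key."""
    try:
        return bool(store.write(leader_path, name, prevExist=False))
    except AlreadyExists:
        return False


leader_store = FakeStore()
leader_path = "/service/demo/leader"  # hypothetical key path
results = {node: attempt_to_acquire_leader(leader_store, leader_path, node)
           for node in ("node1", "node2", "node3")}
print(results)                         # {'node1': True, 'node2': False, 'node3': False}
print(leader_store.read(leader_path))  # node1
```

Only the first writer wins; every later attempt sees the existing key and reports failure, which is what makes the leader key usable as a lock.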

The watch member function

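The checks on __do_not_watch and leader_index at the top of watch are not covered here. The main while loop uses the etcd client to watch the leader path. If no exception is raised, it sets _has_failed to False and returns True. On etcd.EtcdWatchTimedOut it sets _has_failed to False and returns False. On etcd.EtcdEventIndexCleared or etcd.EtcdWatcherCleared the watch itself has failed, so it sets _has_failed to False and returns True, because a watch with the same parameters would fail again. Any other etcd.EtcdException is passed to _handle_exception, after which the remaining timeout is recomputed and the loop tries again.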

    def watch(self, leader_index, timeout):
        if self.__do_not_watch:
            self.__do_not_watch = False
            return True

        if leader_index:
            end_time = time.time() + timeout

            while timeout >= 1:  # when timeout is too small urllib3 doesn't have enough time to connect
                try:
                    result = self._client.watch(self.leader_path, index=leader_index, timeout=timeout + 0.5)
                    self._has_failed = False
                    if result.action == 'compareAndSwap':
                        time.sleep(0.01)
                    # Synchronous work of all cluster members with etcd is less expensive
                    # than reestablishing connection every time from every replica.
                    return True
                except etcd.EtcdWatchTimedOut:
                    self._has_failed = False
                    return False
                except (etcd.EtcdEventIndexCleared, etcd.EtcdWatcherCleared):  # Watch failed
                    self._has_failed = False
                    return True  # leave the loop, because watch with the same parameters will fail anyway
                except etcd.EtcdException as e:
                    self._handle_exception(e, 'watch', True)

                timeout = end_time - time.time()

        try:
            # AbstractDCS.watch: self.event.wait(timeout); return self.event.isSet()
            return super(Etcd, self).watch(None, timeout)
        finally:
            self.event.clear()
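The loop in watch follows a common pattern: fix a deadline once, then retry with whatever time is left until the remainder is too small to be useful. Below is a minimal sketch of that pattern on its own. watch_with_budget, TransientError and FakeClock are hypothetical names, and the fake clock is there only so the example runs instantly.

```python
import time


class TransientError(Exception):
    """Stand-in for a retryable etcd.EtcdException (hypothetical)."""


def watch_with_budget(watch_once, timeout, min_timeout=1.0, clock=time.time):
    """Call watch_once(remaining) until it returns or the budget drops below min_timeout.

    Returns (result, remaining); result is None if the budget ran out.
    """
    end_time = clock() + timeout
    while timeout >= min_timeout:
        try:
            return watch_once(timeout), timeout
        except TransientError:
            pass  # the real method hands the exception to _handle_exception here
        timeout = end_time - clock()  # shrink the budget by the time already spent
    return None, timeout


class FakeClock:
    """Manually advanced clock, used instead of sleeping."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


fake_clock = FakeClock()
calls = []


def flaky_watch(remaining):
    calls.append(remaining)
    fake_clock.now += 4.0  # every attempt "takes" four seconds
    if len(calls) < 3:
        raise TransientError()
    return True


result = watch_with_budget(flaky_watch, 10.0, clock=fake_clock)
print(calls)   # [10.0, 6.0, 2.0]
print(result)  # (True, 2.0)
```

Computing the deadline once keeps the total wait bounded by the original timeout, however many attempts fail along the way.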

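If leader_index is not set, or the timeout is less than 1, watch blocks by calling wait on a Python threading Event.

Python threading Event: friends a, b and c are sitting around a hot pot. Once all the dishes are on the table, the host says "dig in!" and everyone picks up their chopsticks at the same moment. How can this be implemented? An Event holds an internal flag. While the flag is False, a call to event.wait blocks; once the flag is True, event.wait no longer blocks. An Event is essentially a simplified Condition. It exposes no lock of its own (there is no acquire or release), so it can only be used for signalling between threads, not for mutual exclusion.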

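set(): sets the flag to True and wakes up all threads that are blocked waiting on the event.
clear(): sets the flag to False.
wait(timeout): returns immediately if the flag is True; otherwise blocks the calling thread until another thread calls set() or the timeout expires.
isSet(): returns the state of the internal flag, True or False (an alias of is_set()).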
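The hot pot scenario can be written down directly with threading.Event. This is a small sketch using only the standard library; the names dishes_served, guest and started are chosen for the illustration.

```python
import threading

dishes_served = threading.Event()
started = []
started_lock = threading.Lock()


def guest(name):
    dishes_served.wait(timeout=5)  # block until the host calls set()
    with started_lock:
        started.append(name)


threads = [threading.Thread(target=guest, args=(name,)) for name in ("a", "b", "c")]
for t in threads:
    t.start()

print(dishes_served.is_set())  # False: set() has not been called yet
dishes_served.set()            # the host says "dig in!"
for t in threads:
    t.join()

print(sorted(started))         # ['a', 'b', 'c']
dishes_served.clear()          # reset the flag for the next round
print(dishes_served.is_set())  # False
```

The clear() call at the end plays the same role as the finally block in watch: without it, the next wait would return immediately.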
