c语言sscanf函数的用法是什么
251
2022-12-02
PostgreSQL数据库高可用patroni源码学习——Etcd类
Etcd类继承自AbstractEtcd抽象类,AbstractEtcd抽象类继承自AbstractDCS(AbstractEtcd的构造函数会传入EtcdClient类)。Etcd类的构造函数首先会调用父类的构造函数,然后将__do_not_watch成员设置为False。
class Etcd(AbstractEtcd): def __init__(self, config): super(Etcd, self).__init__(config, EtcdClient, (etcd.EtcdLeaderElectionInProgress, EtcdRaftInternal)) self.__do_not_watch = False
ETCD Module
创建一个client对象用于连接etcd集群,如下代码为etcd模块中的Client对象不同的初始化方式,代表了client使用不同的选项连接etcd集群。默认创建一个连接服务于localhost端口4001的etcd集群的client对象。
import etcdclient = etcd.Client() # this will create a client against etcd server running on localhost on port 4001client = etcd.Client(port=4002)client = etcd.Client(host='127.0.0.1', port=4003)client = etcd.Client(host='127.0.0.1', port=4003, allow_redirect=False) # wont let you run sensitive commands on non-leader machines, default is trueclient = etcd.Client(host='127.0.0.1', port=4003, allow_reconnect=True, protocol='1)# with ttlclient.write('/nodes/n2', 2, ttl=4) # sets the ttl to 4 seconds# create onlyclient.write('/nodes/n3', 'test', prevExist=False)# Compare and swap values atomicallyclient.write('/nodes/n3', 'test2', prevValue='test1') #this fails to writeclient.write('/nodes/n3', 'test2', prevIndex=10) #this fails to write# mkdirclient.write('/nodes/queue', None, dir=True)# Append a value to a queue dirclient.write('/nodes/queue', 'test', append=True) #will write i.e. /nodes/queue/11client.write('/nodes/queue', 'test2', append=True) #will write i.e. /nodes/queue/12
同样你也可以使用如下方式自动更新结果(You can also atomically update a result)。
result = client.read('/foo')print(result.value) # barresult.value += u'bar'updated = client.update(result) # if any other client wrote '/foo' in the meantime this will failprint(updated.value) # barbar
通过如下代码可以使用client从etcd集群中获取键相应的值。
client.read('/nodes/n2').value#recursively read a directory 递归获取目录下的所有键值r = client.read('/nodes', recursive=True, sorted=True)for child in r.children: print("%s: %s" % (child.key,child.value))client.read('/nodes/n2', wait=True) #Waits for a change in value in the key before returning.client.read('/nodes/n2', wait=True, waitIndex=10)# raises etcd.EtcdKeyNotFound when key not foundtry: client.read('/invalid/path')except etcd.EtcdKeyNotFound: # do something print "error"
删除一个键
client.delete('/nodes/n1')client.delete('/nodes', dir=True) #spits an error if dir is not emptyclient.delete('/nodes', recursive=True) #this works recursively
使用etcd模块提供的锁机制(Locking module)
# Initialize the lock object:# NOTE: this does not acquire a lock yetclient = etcd.Client()lock = etcd.Lock(client, 'my_lock_name')# Use the lock object:lock.acquire(blocking=True, # will block until the lock is acquired lock_ttl=None) # lock will live until we release itlock.is_acquired # Truelock.acquire(lock_ttl=60) # renew a locklock.release() # release an existing locklock.is_acquired # False# The lock object may also be used as a context manager:client = etcd.Client()with etcd.Lock(client, 'customer1') as my_lock: do_stuff() my_lock.is_acquired # True my_lock.acquire(lock_ttl=60)my_lock.is_acquired # False
获取etcd集群中的节点和leader(client.machines、client.leader),详细内容参见class="data-table" data-id="t7a7e9d1-7fKIENmd" data-width="" style="outline: none; border-collapse: collapse; width: 100%;">
函数名
用途
代码
touch_member(self, data, permanent=False)
self._client.set(self.member_path, json.dumps(data, separators=(’,’, ‘:’)), None if permanent else self._ttl)
take_leader(self)
self.retry(self._client.write, self.leader_path, self._name, ttl=self._ttl)
set_failover_value(self, value, index=None)
设置failover_path的值
self._client.write(self.failover_path, value, prevIndex=index or 0)
set_config_value(self, value, index=None)
设置config_path的值
self._client.write(self.config_path, value, prevIndex=index or 0)
_write_leader_optime(self, last_lsn)
设置leader_optime_path的lsn值
self._client.set(self.leader_optime_path, last_lsn)
_write_status(self, value)
设置status_path的值
self._client.set(self.status_path, value)
_update_leader(self)
self.retry(self._client.write, self.leader_path, self._name, prevValue=self._name, ttl=self._ttl)
_delete_leader(self)
删除leader_path的值
self._client.delete(self.leader_path, prevValue=self._name)
initialize(self, create_new=True, sysid="")
self.retry(self._client.write, self.initialize_path, sysid, prevExist=(not create_new))
cancel_initialization(self)
self.retry(self._client.delete, self.initialize_path)
delete_cluster(self)
self.retry(self._client.delete, self.client_path(’’), recursive=True)
set_history_value(self, value)
设置history_path的值
self._client.write(self.history_path, value)
set_sync_state_value(self, value, index=None)
self.retry(self._client.write, self.sync_path, value, prevIndex=index or 0)
delete_sync_state(self, index=None)
self.retry(self._client.delete, self.sync_path, prevIndex=index or 0)
attempt_to_acquire_leader函数用于尝试获取leader键,设置为自己。捕获etcd.EtcdAlreadyExist,说明获取leader地位失败。
def attempt_to_acquire_leader(self, permanent=False): try: return bool(self.retry(self._client.write, self.leader_path, self._name, ttl=None if permanent else self._ttl, prevExist=False)) except etcd.EtcdAlreadyExist: logger.info('Could not take out TTL lock') except (RetryFailedError, etcd.EtcdException): pass return False
watch成员函数
watch成员函数前面对__do_not_watch和leader_index的判断不做解释,其主要while循环的作用是利用etcd client去watch领导path,如果没出现异常,返回True,设置_has_failed为False;如果发生etcd.EtcdWatchTimedOut,返回False,设置_has_failed为False;如果发生etcd.EtcdEventIndexCleared或etcd.EtcdWatcherCleared,说明watch失败,返回True,设置_has_failed为False;其他异常调用_handle_exception函数。
def watch(self, leader_index, timeout): if self.__do_not_watch: self.__do_not_watch = False return True if leader_index: end_time = time.time() + timeout while timeout >= 1: # when timeout is too small urllib3 doesn't have enough time to connect try: result = self._client.watch(self.leader_path, index=leader_index, timeout=timeout + 0.5) self._has_failed = False if result.action == 'compareAndSwap': time.sleep(0.01) # Synchronous work of all cluster members with etcd is less expensive # than reestablishing connection every time from every replica. return True except etcd.EtcdWatchTimedOut: self._has_failed = False return False except (etcd.EtcdEventIndexCleared, etcd.EtcdWatcherCleared): # Watch failed self._has_failed = False return True # leave the loop, because watch with the same parameters will fail anyway except etcd.EtcdException as e: self._handle_exception(e, 'watch', True) timeout = end_time - time.time() try: return super(Etcd, self).watch(None, timeout) # AbstractDCS的watch成员函数(self.event.wait(timeout) return self.event.isSet()) finally: self.event.clear()
如果没有设置leader_index,或timeout小于1,则使用python多线程事件的wait进行阻塞。 python多线程之事件(Event):小伙伴a,b,c围着吃火锅,当菜上齐了,请客的主人说:开吃!,于是小伙伴一起动筷子,这种场景如何实现? 事件处理的机制:全局定义了一个内置标志Flag,如果Flag值为 False,那么当程序执行 event.wait方法时就会阻塞,如果Flag值为True,那么event.wait 方法时便不再阻塞。Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。clear(): 将标志设为False。wait(timeout): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。isSet(): 获取内置标志状态,返回True或False。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~