summaryrefslogtreecommitdiffstats
path: root/node-admin/scripts/pyroute2/ipdb/transactional.py
diff options
context:
space:
mode:
Diffstat (limited to 'node-admin/scripts/pyroute2/ipdb/transactional.py')
-rw-r--r--node-admin/scripts/pyroute2/ipdb/transactional.py402
1 files changed, 402 insertions, 0 deletions
diff --git a/node-admin/scripts/pyroute2/ipdb/transactional.py b/node-admin/scripts/pyroute2/ipdb/transactional.py
new file mode 100644
index 00000000000..533f3b9fd7f
--- /dev/null
+++ b/node-admin/scripts/pyroute2/ipdb/transactional.py
@@ -0,0 +1,402 @@
+# By Peter V. Saveliev https://pypi.python.org/pypi/pyroute2. Dual licensed under the Apache 2 and GPLv2+ see https://github.com/svinota/pyroute2 for License details.
+'''
+'''
+import uuid
+import threading
+from pyroute2.common import Dotkeys
+from pyroute2.ipdb.common import SYNC_TIMEOUT
+from pyroute2.ipdb.common import CommitException
+from pyroute2.ipdb.common import DeprecationException
+from pyroute2.ipdb.linkedset import LinkedSet
+
+
+class State(object):
+
+ def __init__(self, lock=None):
+ self.lock = lock or threading.Lock()
+ self.flag = 0
+
+ def acquire(self):
+ self.lock.acquire()
+ self.flag += 1
+
+ def release(self):
+ assert self.flag > 0
+ self.flag -= 1
+ self.lock.release()
+
+ def is_set(self):
+ return self.flag
+
+ def __enter__(self):
+ self.acquire()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.release()
+
+
+def update(f):
+ def decorated(self, *argv, **kwarg):
+ # obtain update lock
+ ret = None
+ tid = None
+ direct = True
+ with self._write_lock:
+ dcall = kwarg.pop('direct', False)
+ if dcall:
+ self._direct_state.acquire()
+
+ direct = self._direct_state.is_set()
+ if not direct:
+ # 1. begin transaction for 'direct' type
+ if self._mode == 'direct':
+ tid = self.begin()
+ # 2. begin transaction, if there is none
+ elif self._mode == 'implicit':
+ if not self._tids:
+ self.begin()
+ # 3. require open transaction for 'explicit' type
+ elif self._mode == 'explicit':
+ if not self._tids:
+ raise TypeError('start a transaction first')
+ # 4. transactions can not require transactions :)
+ elif self._mode == 'snapshot':
+ direct = True
+ # do not support other modes
+ else:
+ raise TypeError('transaction mode not supported')
+ # now that the transaction _is_ open
+ ret = f(self, direct, *argv, **kwarg)
+
+ if dcall:
+ self._direct_state.release()
+
+ if tid:
+ # close the transaction for 'direct' type
+ self.commit(tid)
+
+ return ret
+ decorated.__doc__ = f.__doc__
+ return decorated
+
+
+class Transactional(Dotkeys):
+ '''
+ Utility class that implements common transactional logic.
+ '''
+ _fields_cmp = {}
+
+ def __init__(self, ipdb=None, mode=None, parent=None, uid=None):
+ #
+ if ipdb is not None:
+ self.nl = ipdb.nl
+ self.ipdb = ipdb
+ else:
+ self.nl = None
+ self.ipdb = None
+ #
+ self._parent = None
+ if parent is not None:
+ self._mode = mode or parent._mode
+ self._parent = parent
+ elif ipdb is not None:
+ self._mode = mode or ipdb.mode
+ else:
+ self._mode = mode or 'implicit'
+ #
+ self.nlmsg = None
+ self.uid = uid or uuid.uuid4()
+ self.last_error = None
+ self._commit_hooks = []
+ self._fields = []
+ self._sids = []
+ self._ts = threading.local()
+ self._snapshots = {}
+ self._targets = {}
+ self._local_targets = {}
+ self._write_lock = threading.RLock()
+ self._direct_state = State(self._write_lock)
+ self._linked_sets = set()
+
+ @property
+ def _tids(self):
+ if not hasattr(self._ts, 'tids'):
+ self._ts.tids = []
+ return self._ts.tids
+
+ @property
+ def _transactions(self):
+ if not hasattr(self._ts, 'transactions'):
+ self._ts.transactions = {}
+ return self._ts.transactions
+
+ def register_callback(self, callback):
+ raise DeprecationException("deprecated since 0.2.15;"
+ "use `register_commit_hook()`")
+
+ def register_commit_hook(self, hook):
+ # FIXME: write docs
+ self._commit_hooks.append(hook)
+
+ def unregister_callback(self, callback):
+ raise DeprecationException("deprecated since 0.2.15;"
+ "use `unregister_commit_hook()`")
+
+ def unregister_commit_hook(self, hook):
+ # FIXME: write docs
+ with self._write_lock:
+ for cb in tuple(self._commit_hooks):
+ if hook == cb:
+ self._commit_hooks.pop(self._commit_hooks.index(cb))
+
+ def pick(self, detached=True, uid=None, parent=None, forge_tids=False):
+ '''
+ Get a snapshot of the object. Can be of two
+ types:
+ * detached=True -- (default) "true" snapshot
+ * detached=False -- keep ip addr set updated from OS
+
+ Please note, that "updated" doesn't mean "in sync".
+ The reason behind this logic is that snapshots can be
+ used as transactions.
+ '''
+ with self._write_lock:
+ res = self.__class__(ipdb=self.ipdb,
+ mode='snapshot',
+ parent=parent,
+ uid=uid)
+ for (key, value) in self.items():
+ if key in self._fields:
+ if isinstance(value, Transactional):
+ t = value.pick(detached=detached,
+ uid=res.uid,
+ parent=self)
+ if forge_tids:
+ # forge the transaction for nested objects
+ value._transactions[res.uid] = t
+ value._tids.append(res.uid)
+ res[key] = t
+ else:
+ res[key] = self[key]
+ for key in self._linked_sets:
+ res[key] = LinkedSet(self[key])
+ if not detached:
+ self[key].connect(res[key])
+ return res
+
+ def __enter__(self):
+ # FIXME: use a bitmask?
+ if self._mode not in ('implicit', 'explicit'):
+ raise TypeError('context managers require a transactional mode')
+ if not self._tids:
+ self.begin()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ # apply transaction only if there was no error
+ if exc_type is None:
+ try:
+ self.commit()
+ except Exception as e:
+ self.last_error = e
+ raise
+
+ def __repr__(self):
+ res = {}
+ for i in self:
+ if self[i] is not None:
+ res[i] = self[i]
+ return res.__repr__()
+
+ def __sub__(self, vs):
+ res = self.__class__(ipdb=self.ipdb, mode='snapshot')
+ with self._direct_state:
+ # simple keys
+ for key in self:
+ if (key in self._fields) and \
+ ((key not in vs) or (self[key] != vs[key])):
+ res[key] = self[key]
+ for key in self._linked_sets:
+ diff = LinkedSet(self[key] - vs[key])
+ if diff:
+ res[key] = diff
+ return res
+
+ def dump(self, not_none=True):
+ with self._write_lock:
+ res = {}
+ for key in self:
+ if self[key] is not None and key[0] != '_':
+ if isinstance(self[key], Transactional):
+ res[key] = self[key].dump()
+ elif isinstance(self[key], LinkedSet):
+ res[key] = tuple(self[key])
+ else:
+ res[key] = self[key]
+ return res
+
+ def load(self, data):
+ pass
+
+ def commit(self, *args, **kwarg):
+ pass
+
+ def last_snapshot_id(self):
+ return self._sids[-1]
+
+ def revert(self, sid):
+ with self._write_lock:
+ self._transactions[sid] = self._snapshots[sid]
+ self._tids.append(sid)
+ self._sids.remove(sid)
+ del self._snapshots[sid]
+ return self
+
+ def snapshot(self):
+ '''
+ Create new snapshot
+ '''
+ return self._begin(mapping=self._snapshots,
+ ids=self._sids,
+ detached=True)
+
+ def begin(self):
+ '''
+ Start new transaction
+ '''
+ if self._parent is not None:
+ self._parent.begin()
+ else:
+ return self._begin(mapping=self._transactions,
+ ids=self._tids,
+ detached=False)
+
+ def _begin(self, mapping, ids, detached):
+ # keep snapshot's ip addr set updated from the OS
+ # it is required by the commit logic
+ if (self.ipdb is not None) and self.ipdb._stop:
+ raise RuntimeError("Can't start transaction on released IPDB")
+ t = self.pick(detached=detached, forge_tids=True)
+ mapping[t.uid] = t
+ ids.append(t.uid)
+ return t.uid
+
+ def last_snapshot(self):
+ if not self._sids:
+ raise TypeError('create a snapshot first')
+ return self._snapshots[self._sids[-1]]
+
+ def last(self):
+ '''
+ Return last open transaction
+ '''
+ with self._write_lock:
+ if not self._tids:
+ raise TypeError('start a transaction first')
+
+ return self._transactions[self._tids[-1]]
+
+ def review(self):
+ '''
+ Review last open transaction
+ '''
+ if not self._tids:
+ raise TypeError('start a transaction first')
+
+ with self._write_lock:
+ added = self.last() - self
+ removed = self - self.last()
+ for key in self._linked_sets:
+ added['-%s' % (key)] = removed[key]
+ added['+%s' % (key)] = added[key]
+ del added[key]
+ return added
+
+ def drop(self, tid=None):
+ '''
+ Drop a transaction.
+ '''
+ with self._write_lock:
+ if isinstance(tid, Transactional):
+ tid = tid.uid
+ elif tid is None:
+ tid = self._tids[-1]
+ self._tids.remove(tid)
+ del self._transactions[tid]
+ for (key, value) in self.items():
+ if isinstance(value, Transactional):
+ try:
+ value.drop(tid)
+ except KeyError:
+ pass
+
+ @update
+ def __setitem__(self, direct, key, value):
+ with self._write_lock:
+ if not direct:
+ # automatically set target on the last transaction,
+ # which must be started prior to that call
+ transaction = self.last()
+ transaction[key] = value
+ transaction._targets[key] = threading.Event()
+ else:
+ # set the item
+ Dotkeys.__setitem__(self, key, value)
+
+ # update on local targets
+ if key in self._local_targets:
+ func = self._fields_cmp.get(key, lambda x, y: x == y)
+ if func(value, self._local_targets[key].value):
+ self._local_targets[key].set()
+
+ # cascade update on nested targets
+ for tn in tuple(self._transactions.values()):
+ if (key in tn._targets) and (key in tn):
+ if self._fields_cmp.\
+ get(key, lambda x, y: x == y)(value, tn[key]):
+ tn._targets[key].set()
+
+ @update
+ def __delitem__(self, direct, key):
+ with self._write_lock:
+ # firstly set targets
+ self[key] = None
+
+ # then continue with delete
+ if not direct:
+ transaction = self.last()
+ if key in transaction:
+ del transaction[key]
+ else:
+ Dotkeys.__delitem__(self, key)
+
+ def option(self, key, value):
+ self[key] = value
+ return self
+
+ def unset(self, key):
+ del self[key]
+ return self
+
+ def _wait_all_targets(self):
+ for key, target in self._targets.items():
+ if key not in self._virtual_fields:
+ target.wait(SYNC_TIMEOUT)
+ if not target.is_set():
+ raise CommitException('target %s is not set' % key)
+
+ def set_target(self, key, value):
+ self._local_targets[key] = threading.Event()
+ self._local_targets[key].value = value
+
+ def mirror_target(self, key_from, key_to):
+ self._local_targets[key_to] = self._local_targets[key_from]
+
+ def set_item(self, key, value):
+ with self._direct_state:
+ self[key] = value
+
+ def del_item(self, key):
+ with self._direct_state:
+ del self[key]