"""Definition of an OrderedDict-subclass that maintains the order key values
rather than the insertion order. It can be specialized to use a specific
comparison function for ordering keys.
"""
import collections
from typing import Callable
# Imports needed for replicating OrderedDict behaviour
import sys as _sys
from operator import eq as _eq
from collections import _proxy, _Link, _OrderedDictKeysView
from collections import _OrderedDictValuesView, _OrderedDictItemsView
from collections.abc import MutableMapping
from reprlib import recursive_repr as _recursive_repr
# -----------------------------------------------------------------------------
class KeyOrderedDict(dict):
    """A dict that keeps its keys ordered by their value, not by insertion.

    It is a re-implementation of collections.OrderedDict, because subclassing
    that class is really difficult.

    Ordering is maintained by adjusting those methods of OrderedDict that
    take care of building and maintaining the doubly-linked list that provides
    the ordering. See OrderedDict for details, e.g. the weak-referencing.
    In effect, this relates only to ``__setitem__``; all other methods rely on
    it to add elements to the mapping.

    For comparison, the ``key`` callable given at initialisation is applied
    to each key and the results are compared with ``<``. If no ``key`` is
    given, the ``DEFAULT_KEY_COMPARATOR`` class variable is used; note that
    this needs to be a binary function, where the first argument is equivalent
    to ``self`` and the second is the actual key to perform the unary
    operation on.
    """

    # The default key comparator. It is looked up via ``self`` and therefore
    # bound like a method, which is why it takes an (unused) first argument.
    DEFAULT_KEY_COMPARATOR = lambda _, k: k

    def __init__(self, *args, key: Callable = None, **kwds):
        """Initialize a KeyOrderedDict, which maintains key ordering.

        If no custom ordering function is given, orders by simple "smaller
        than" comparison. Apart from that, the interface is the same as for
        regular dictionaries.

        Args:
            *args: At most one positional argument, passed on to ``update``
                (e.g. a sequence of (key, value) pairs or a mapping)
            key (Callable, optional): A unary callable that is applied to
                each dict key before comparison; the results are compared
                with ``<``. If None, ``DEFAULT_KEY_COMPARATOR`` is used.
            **kwds: Passed on to ``update`` method

        Raises:
            TypeError: on len(args) > 1
        """
        if len(args) > 1:
            raise TypeError("expected at most 1 arguments, got {}"
                            "".format(len(args)))

        # Set the key comparison function, passing through if nothing is set
        self._key = key if key is not None else self.DEFAULT_KEY_COMPARATOR

        # Set up internally used attributes, but only once: calling __init__
        # again on an existing instance must not discard the linked list.
        try:
            self.__root
        except AttributeError:
            self.__hardroot = _Link()
            self.__root = root = _proxy(self.__hardroot)
            root.prev = root.next = root
            self.__map = {}

        # Populate the dict
        self.__update(*args, **kwds)

    # .........................................................................
    # Custom implementations that ascertain key ordering

    def _key_comp_lt(self, k1, k2) -> bool:
        """The key comparator. Returns true for k1 < k2.

        Before comparison, the unary ``self._key`` callable is invoked on
        both keys.

        Args:
            k1: lhs of comparison
            k2: rhs of comparison

        Returns:
            bool: k1 < k2 (after key transformation)

        Raises:
            ValueError: If the key transformation or the comparison fails;
                the original exception is chained as the cause.
        """
        # Create the actual keys to compare
        try:
            _k1, _k2 = self._key(k1), self._key(k2)
        except Exception as exc:
            raise ValueError("Could not apply key transformation method on "
                             "one or both of the keys '{}' and '{}'!"
                             "".format(k1, k2)
                             ) from exc

        # Compare
        try:
            return _k1 < _k2
        except Exception as exc:
            # Not every callable has a __name__ (e.g. functools.partial or
            # callable instances); fall back to its repr so that building the
            # error message cannot itself fail.
            key_name = getattr(self._key, "__name__", repr(self._key))
            raise ValueError("Failed comparing '{}' (type {}, from key '{}') "
                             "to '{}' (type {}, from key '{}')! Keys of {} "
                             "need to be comparable after the key "
                             "transformation function {} was applied to them."
                             "".format(_k1, type(_k1), k1,
                                       _k2, type(_k2), k2,
                                       self.__class__.__name__,
                                       key_name)
                             ) from exc

    def __setitem__(self, key, value, *,
                    dict_setitem=dict.__setitem__, proxy=_proxy, Link=_Link):
        """Set the item with the provided key to the given value, maintaining
        the ordering defined by the key comparator (``_key_comp_lt``).

        If the key is not available yet, this takes care of finding the right
        place to insert the new link in the doubly-linked list. The search
        walks the list from the front, i.e. it is linear in the number of
        items. A new key is placed directly before the first existing key it
        compares strictly smaller than, or at the end if there is none.

        Raises:
            ValueError: If the new key cannot be compared to existing keys.
                In that case, the mapping is left unchanged.
        """
        if key not in self:
            root = self.__root

            # Find the link after which the new one is to be inserted: the
            # one whose successor is the first to compare strictly greater
            # than the new key. Starting at the sentinel covers the empty map
            # (loop body never runs) and insertion at the front.
            # NOTE root.key is not available, hence always look at ``.next``.
            element = root
            while element.next is not root:
                if self._key_comp_lt(key, element.next.key):
                    break
                element = element.next

            # Only now, after all comparisons succeeded, register the link;
            # a failed comparison thus leaves no stale entry behind.
            self.__map[key] = link = Link()

            # ``prev`` links are weak to avoid reference cycles; the root is
            # a proxy already, real links need to be wrapped.
            link.prev = element if element is root else proxy(element)
            link.next = element.next
            link.key = key

            # Update neighbouring links such that the new link is in between
            element.next = link
            link.next.prev = proxy(link)

        # Set the value of the item using the method implemented by dict
        dict_setitem(self, key, value)

    def insert(self, key, value, *, hint_after=None, hint_before=None):
        """Inserts a (key, value) pair using hint information to speed up
        key search.

        Raises:
            NotImplementedError: Always; this method is not implemented.
        """
        raise NotImplementedError

    # .........................................................................
    # Remaining dict interface, oriented at collections.OrderedDict

    def __delitem__(self, key, dict_delitem=dict.__delitem__):
        """kod.__delitem__(y) <==> del kod[y]

        Deleting an existing item uses self.__map to find the link which gets
        removed by updating the links in the predecessor and successor nodes.
        """
        # Raises KeyError for missing keys before any link is touched
        dict_delitem(self, key)
        link = self.__map.pop(key)
        link_prev = link.prev
        link_next = link.next
        link_prev.next = link_next
        link_next.prev = link_prev
        link.prev = None
        link.next = None

    def __iter__(self):
        """kod.__iter__() <==> iter(kod)

        Traverse the linked list in order.
        """
        root = self.__root
        curr = root.next
        while curr is not root:
            yield curr.key
            curr = curr.next

    def __reversed__(self):
        """kod.__reversed__() <==> reversed(kod)

        Traverse the linked list in reverse order.
        """
        root = self.__root
        curr = root.prev
        while curr is not root:
            yield curr.key
            curr = curr.prev

    def clear(self):
        """Remove all items"""
        root = self.__root
        root.prev = root.next = root
        self.__map.clear()
        dict.clear(self)

    def __sizeof__(self):
        """Get the size of this object"""
        sizeof = _sys.getsizeof
        # number of links including root
        n = len(self) + 1
        # instance dictionary
        size = sizeof(self.__dict__)
        # internal dict and inherited dict
        size += sizeof(self.__map) * 2
        # link and proxy objects
        size += sizeof(self.__hardroot) * n
        size += sizeof(self.__root) * n
        # key comparator
        size += sizeof(self._key)
        return size

    # ``__update`` is a private alias so that __init__ keeps working even if
    # a subclass overrides ``update``.
    update = __update = MutableMapping.update

    def keys(self):
        """D.keys() -> a set-like object providing a view on D's keys"""
        return _OrderedDictKeysView(self)

    def items(self):
        """D.items() -> a set-like object providing a view on D's items"""
        return _OrderedDictItemsView(self)

    def values(self):
        """D.values() -> an object providing a view on D's values"""
        return _OrderedDictValuesView(self)

    __ne__ = MutableMapping.__ne__

    # Sentinel to tell "no default given" apart from ``default=None``
    __marker = object()

    def pop(self, key, default=__marker):
        """kod.pop(k[,d]) -> v, remove specified key and return the
        corresponding value. If key is not found, default is returned if
        given, otherwise KeyError is raised.
        """
        if key in self:
            result = self[key]
            del self[key]
            return result
        if default is self.__marker:
            raise KeyError(key)
        return default

    def setdefault(self, key, default=None):
        """kod.setdefault(k[,d]) -> kod.get(k,d), also set kod[k]=d if k not
        in kod
        """
        if key in self:
            return self[key]
        self[key] = default
        return default

    @_recursive_repr()
    def __repr__(self):
        """kod.__repr__() <==> repr(kod)"""
        if not self:
            return '%s()' % (self.__class__.__name__,)
        return '%s(%r)' % (self.__class__.__name__, list(self.items()))

    def __reduce__(self):
        """Return state information for pickling"""
        inst_dict = vars(self).copy()
        # Strip everything a freshly initialised instance has anyway.
        # NOTE(review): this includes ``_key``, so a custom ``key`` callable
        #               passed at initialisation is not part of the state;
        #               the object is rebuilt via ``self.__class__()``.
        for k in vars(KeyOrderedDict()):
            inst_dict.pop(k, None)
        return self.__class__, (), inst_dict or None, None, iter(self.items())

    def copy(self):
        """kod.copy() -> a shallow copy of kod, maintaining the key comparator"""
        return self.__class__(self, key=self._key)

    @classmethod
    def fromkeys(cls, iterable, value=None, *, key: Callable = None):
        """KOD.fromkeys(S[, v]) -> New key-ordered dictionary with keys from S.

        If not specified, the value defaults to None. The ``key`` argument is
        passed on to the constructor as the key comparator.
        """
        self = cls(key=key)
        # Loop variable deliberately not named ``key``, to not shadow the
        # comparator argument.
        for k in iterable:
            self[k] = value
        return self

    def __eq__(self, other):
        """kod.__eq__(y) <==> kod==y.

        Comparison to another KeyOrderedDict or OrderedDict is order-sensitive
        while comparison to a regular mapping is order-insensitive.
        """
        if isinstance(other, (KeyOrderedDict, collections.OrderedDict)):
            return dict.__eq__(self, other) and all(map(_eq, self, other))
        return dict.__eq__(self, other)
# -----------------------------------------------------------------------------
# Specializations
class IntOrderedDict(KeyOrderedDict):
    """A :py:class:`~dantro.utils.ordereddict.KeyOrderedDict` whose keys are
    ordered by their integer value.

    Keys are passed through ``int`` before being compared, so they need to
    be castable to integer.
    """

    # Bound like a method on attribute lookup: the first argument is the
    # instance (unused), the second one the key to transform.
    DEFAULT_KEY_COMPARATOR = lambda _self, key: int(key)