import hashlib
import random
from collections.abc import Iterable
from collections.abc import Mapping
from collections.abc import Set
from typing import Any
from typing import Optional
from typing import Type
from typing import Union
import numpy
from ewoksutils.import_utils import qualname
from . import missing_data
def classhashdata(cls: Type) -> bytes:
return qualname(cls).encode()
def multitype_sorted(sequence: Iterable, key=None) -> list:
try:
return sorted(sequence, key=key)
except TypeError:
pass
if key is None:
def key(item):
return item
adict = dict()
for item in sequence:
typename = type(key(item)).__name__
adict.setdefault(typename, list()).append(item)
return [
item
for _, items in sorted(adict.items(), key=lambda tpl: tpl[0])
for item in sorted(items, key=key)
]
class UniversalHash:
def __init__(self, hexdigest: Union[str, bytes]):
if isinstance(hexdigest, bytes):
hexdigest = hexdigest.decode()
if not isinstance(hexdigest, str):
raise TypeError(hexdigest, type(hexdigest))
self._hexdigest = hexdigest
def __hash__(self):
# make it python hashable (to use in sets and dict keys)
return hash(self._hexdigest)
def __repr__(self):
return "UniversalHash('{}')".format(self)
def __str__(self):
return self._hexdigest
def __eq__(self, other):
return str(self) == str(other)
def __lt__(self, other):
return str(self) < str(other)
def uhash(value, _hash=None) -> UniversalHash:
"""Universial hash (as opposed to python's `hash`)."""
# Avoid using python's hash!
bdigest = _hash is None
if bdigest:
_hash = hashlib.sha256()
_hash.update(classhashdata(type(value)))
if value is None:
pass
elif isinstance(value, HasUhash):
_hash.update(repr(value.uhash).encode())
elif isinstance(value, UniversalHash):
_hash.update(repr(value).encode())
elif isinstance(value, bytes):
_hash.update(value)
elif isinstance(value, str):
_hash.update(value.encode())
elif isinstance(value, int):
_hash.update(hex(value).encode())
elif isinstance(value, float):
_hash.update(value.hex().encode())
elif isinstance(value, (numpy.ndarray, numpy.number)):
_hash.update(value.tobytes())
elif isinstance(value, Mapping):
lst = multitype_sorted(value.items(), key=lambda item: item[0])
if lst:
keys, values = zip(*lst)
else:
keys = values = list()
uhash(keys, _hash=_hash)
uhash(values, _hash=_hash)
elif isinstance(value, Set):
values = multitype_sorted(value)
uhash(values, _hash=_hash)
elif isinstance(value, Iterable):
# Ordered
for v in value:
uhash(v, _hash=_hash)
else:
# TODO: register custom types
raise TypeError(f"cannot uhash {value} (type: {type(value)})")
if bdigest:
return UniversalHash(_hash.hexdigest())
class HasUhash:
@property
def uhash(self) -> Optional[UniversalHash]:
raise NotImplementedError
def __hash__(self):
# make it python hashable (to use in sets and dict keys)
uhash = self.uhash
if uhash is None:
return hash(id(self))
else:
return hash(uhash)
def __eq__(self, other):
if isinstance(other, HasUhash):
uhash = other.uhash
elif isinstance(other, UniversalHash):
uhash = other
else:
raise TypeError(other, type(other))
return self.uhash == uhash
def _get_repr_data(self) -> dict:
data = dict()
uhash = self.uhash
if uhash is None:
data["uhash"] = None
else:
data["uhash"] = repr(str(uhash))
return data
def __repr__(self):
data = self._get_repr_data()
if data:
sdata = ", ".join([f"{k}={v}" for k, v in data.items()])
return f"{super().__repr__()}({sdata})"
else:
return super().__repr__()
def __str__(self):
data = self._get_repr_data()
if data:
sdata = ", ".join([f"{k}={v}" for k, v in data.items()])
return f"{qualname(type(self))}({sdata})"
else:
return qualname(type(self))
PreUhashTypes = Union[str, bytes, UniversalHash, HasUhash]
class UniversalHashable(HasUhash):
"""The universal hash of an instance of this class is based on:
* pre-uhash
* instance nonce (if any)
The universal hash is equal to the pre-hash when an instance nonce is not provided.
The pre-uhash is either provided or based on:
* data
* class nonce (class qualifier name, class version, superclass nonce)
"""
__CLASS_NONCE = None
__VERSION = None
MISSING_DATA = missing_data.MISSING_DATA
def __init__(
self,
pre_uhash: Optional[PreUhashTypes] = None,
instance_nonce: Optional[Any] = None,
):
self.set_uhash_init(pre_uhash=pre_uhash, instance_nonce=instance_nonce)
def __init_subclass__(subcls, version=None, **kwargs):
super().__init_subclass__(**kwargs)
supercls_data = subcls.class_nonce()
subcls.__VERSION = version
subcls_data = subcls.class_nonce_data()
subcls.__CLASS_NONCE = str(uhash((subcls_data, supercls_data)))
def set_uhash_init(
self,
pre_uhash: Optional[PreUhashTypes] = None,
instance_nonce: Optional[Any] = None,
):
self.__set_pre_uhash(pre_uhash)
self.__original_pre_uhash = self.__pre_uhash
self.__instance_nonce = instance_nonce
self.__original__instance_nonce = instance_nonce
def get_uhash_init(self, serialize=False):
pre_uhash = self.__original_pre_uhash
if serialize:
if isinstance(pre_uhash, HasUhash):
pre_uhash = str(pre_uhash.uhash)
elif isinstance(pre_uhash, UniversalHash):
pre_uhash = str(pre_uhash)
return {
"pre_uhash": pre_uhash,
"instance_nonce": self.__original__instance_nonce,
}
def __set_pre_uhash(self, pre_uhash):
if pre_uhash is None:
self.__pre_uhash = None
elif isinstance(pre_uhash, (str, bytes)):
self.__pre_uhash = UniversalHash(pre_uhash)
elif isinstance(pre_uhash, (UniversalHash, HasUhash)):
self.__pre_uhash = pre_uhash
else:
self.__pre_uhash = uhash(pre_uhash)
@classmethod
def class_nonce(cls):
return cls.__CLASS_NONCE
@classmethod
def class_nonce_data(cls):
return qualname(cls), cls.__VERSION
def instance_nonce(self):
return self.__instance_nonce
def fix_uhash(self):
"""Fix the uhash when it is derived from the uhash data."""
if self.__pre_uhash is not None:
return
keep, self.__instance_nonce = self.__instance_nonce, None
try:
pre_uhash = self.uhash
finally:
self.__instance_nonce = keep
self.__set_pre_uhash(pre_uhash)
def undo_fix_uhash(self):
self.__pre_uhash = self.__original_pre_uhash
def cleanup_references(self):
"""Remove all references to other hashables.
Side effect: fixes the uhash when it depends on another hashable.
"""
if isinstance(self.__pre_uhash, HasUhash):
pre_uhash = self.__pre_uhash.uhash
self.__pre_uhash = pre_uhash
self.__original_pre_uhash = pre_uhash
@property
def uhash(self) -> Optional[UniversalHash]:
_uhash = self.__pre_uhash
if _uhash is None:
data = self._uhash_data()
if missing_data.is_missing_data(data):
return None
cnonce = self.class_nonce()
inonce = self.instance_nonce()
if inonce is None:
return uhash((data, cnonce))
else:
return uhash((data, cnonce, inonce))
else:
if isinstance(_uhash, HasUhash):
_uhash = _uhash.uhash
if _uhash is None:
return None
inonce = self.instance_nonce()
if inonce is None:
return _uhash
else:
return uhash((_uhash, inonce))
def _uhash_data(self):
return self.MISSING_DATA
def uhash_randomize(self):
self.__instance_nonce = random.randint(-1e100, 1e100)
def undo_randomize(self):
self.__instance_nonce = self.__original__instance_nonce