import random
import hashlib
from typing import Any, Optional, Type, Union
from collections.abc import Mapping, Iterable, Set
import numpy
from ewoksutils.import_utils import qualname
from . import missing_data
def classhashdata(cls: Type) -> bytes:
return qualname(cls).encode()
def multitype_sorted(sequence: Iterable, key=None) -> list:
try:
return sorted(sequence, key=key)
except TypeError:
pass
if key is None:
def key(item):
return item
adict = dict()
for item in sequence:
typename = type(key(item)).__name__
adict.setdefault(typename, list()).append(item)
return [
item
for _, items in sorted(adict.items(), key=lambda tpl: tpl[0])
for item in sorted(items, key=key)
]
class UniversalHash:
def __init__(self, hexdigest: Union[str, bytes]):
if isinstance(hexdigest, bytes):
hexdigest = hexdigest.decode()
if not isinstance(hexdigest, str):
raise TypeError(hexdigest, type(hexdigest))
self._hexdigest = hexdigest
def __hash__(self):
# make it python hashable (to use in sets and dict keys)
return hash(self._hexdigest)
def __repr__(self):
return "UniversalHash('{}')".format(self)
def __str__(self):
return self._hexdigest
def __eq__(self, other):
return str(self) == str(other)
def __lt__(self, other):
return str(self) < str(other)
def uhash(value, _hash=None) -> UniversalHash:
"""Universial hash (as opposed to python's `hash`)."""
# Avoid using python's hash!
bdigest = _hash is None
if bdigest:
_hash = hashlib.sha256()
_hash.update(classhashdata(type(value)))
if value is None:
pass
elif isinstance(value, HasUhash):
_hash.update(repr(value.uhash).encode())
elif isinstance(value, UniversalHash):
_hash.update(repr(value).encode())
elif isinstance(value, bytes):
_hash.update(value)
elif isinstance(value, str):
_hash.update(value.encode())
elif isinstance(value, int):
_hash.update(hex(value).encode())
elif isinstance(value, float):
_hash.update(value.hex().encode())
elif isinstance(value, (numpy.ndarray, numpy.number)):
_hash.update(value.tobytes())
elif isinstance(value, Mapping):
lst = multitype_sorted(value.items(), key=lambda item: item[0])
if lst:
keys, values = zip(*lst)
else:
keys = values = list()
uhash(keys, _hash=_hash)
uhash(values, _hash=_hash)
elif isinstance(value, Set):
values = multitype_sorted(value)
uhash(values, _hash=_hash)
elif isinstance(value, Iterable):
# Ordered
for v in value:
uhash(v, _hash=_hash)
else:
# TODO: register custom types
raise TypeError(f"cannot uhash {value} (type: {type(value)})")
if bdigest:
return UniversalHash(_hash.hexdigest())
class HasUhash:
@property
def uhash(self) -> Optional[UniversalHash]:
raise NotImplementedError
def __hash__(self):
# make it python hashable (to use in sets and dict keys)
uhash = self.uhash
if uhash is None:
return hash(id(self))
else:
return hash(uhash)
def __eq__(self, other):
if isinstance(other, HasUhash):
uhash = other.uhash
elif isinstance(other, UniversalHash):
uhash = other
else:
raise TypeError(other, type(other))
return self.uhash == uhash
def _get_repr_data(self) -> dict:
data = dict()
uhash = self.uhash
if uhash is None:
data["uhash"] = None
else:
data["uhash"] = repr(str(uhash))
return data
def __repr__(self):
data = self._get_repr_data()
if data:
sdata = ", ".join([f"{k}={v}" for k, v in data.items()])
return f"{super().__repr__()}({sdata})"
else:
return super().__repr__()
def __str__(self):
data = self._get_repr_data()
if data:
sdata = ", ".join([f"{k}={v}" for k, v in data.items()])
return f"{qualname(type(self))}({sdata})"
else:
return qualname(type(self))
PreUhashTypes = Union[str, bytes, UniversalHash, HasUhash]
class UniversalHashable(HasUhash):
"""The universal hash of an instance of this class is based on:
* pre-uhash
* instance nonce (if any)
The universal hash is equal to the pre-hash when an instance nonce is not provided.
The pre-uhash is either provided or based on:
* data
* class nonce (class qualifier name, class version, superclass nonce)
"""
__CLASS_NONCE = None
__VERSION = None
MISSING_DATA = missing_data.MISSING_DATA
def __init__(
self,
pre_uhash: Optional[PreUhashTypes] = None,
instance_nonce: Optional[Any] = None,
):
self.set_uhash_init(pre_uhash=pre_uhash, instance_nonce=instance_nonce)
def __init_subclass__(subcls, version=None, **kwargs):
super().__init_subclass__(**kwargs)
supercls_data = subcls.class_nonce()
subcls.__VERSION = version
subcls_data = subcls.class_nonce_data()
subcls.__CLASS_NONCE = str(uhash((subcls_data, supercls_data)))
def set_uhash_init(
self,
pre_uhash: Optional[PreUhashTypes] = None,
instance_nonce: Optional[Any] = None,
):
self.__set_pre_uhash(pre_uhash)
self.__original_pre_uhash = self.__pre_uhash
self.__instance_nonce = instance_nonce
self.__original__instance_nonce = instance_nonce
def get_uhash_init(self, serialize=False):
pre_uhash = self.__original_pre_uhash
if serialize:
if isinstance(pre_uhash, HasUhash):
pre_uhash = str(pre_uhash.uhash)
elif isinstance(pre_uhash, UniversalHash):
pre_uhash = str(pre_uhash)
return {
"pre_uhash": pre_uhash,
"instance_nonce": self.__original__instance_nonce,
}
def __set_pre_uhash(self, pre_uhash):
if pre_uhash is None:
self.__pre_uhash = None
elif isinstance(pre_uhash, (str, bytes)):
self.__pre_uhash = UniversalHash(pre_uhash)
elif isinstance(pre_uhash, (UniversalHash, HasUhash)):
self.__pre_uhash = pre_uhash
else:
self.__pre_uhash = uhash(pre_uhash)
@classmethod
def class_nonce(cls):
return cls.__CLASS_NONCE
@classmethod
def class_nonce_data(cls):
return qualname(cls), cls.__VERSION
def instance_nonce(self):
return self.__instance_nonce
def fix_uhash(self):
"""Fix the uhash when it is derived from the uhash data."""
if self.__pre_uhash is not None:
return
keep, self.__instance_nonce = self.__instance_nonce, None
try:
pre_uhash = self.uhash
finally:
self.__instance_nonce = keep
self.__set_pre_uhash(pre_uhash)
def undo_fix_uhash(self):
self.__pre_uhash = self.__original_pre_uhash
def cleanup_references(self):
"""Remove all references to other hashables.
Side effect: fixes the uhash when it depends on another hashable.
"""
if isinstance(self.__pre_uhash, HasUhash):
pre_uhash = self.__pre_uhash.uhash
self.__pre_uhash = pre_uhash
self.__original_pre_uhash = pre_uhash
@property
def uhash(self) -> Optional[UniversalHash]:
_uhash = self.__pre_uhash
if _uhash is None:
data = self._uhash_data()
if missing_data.is_missing_data(data):
return None
cnonce = self.class_nonce()
inonce = self.instance_nonce()
if inonce is None:
return uhash((data, cnonce))
else:
return uhash((data, cnonce, inonce))
else:
if isinstance(_uhash, HasUhash):
_uhash = _uhash.uhash
if _uhash is None:
return None
inonce = self.instance_nonce()
if inonce is None:
return _uhash
else:
return uhash((_uhash, inonce))
def _uhash_data(self):
return self.MISSING_DATA
def uhash_randomize(self):
self.__instance_nonce = random.randint(-1e100, 1e100)
def undo_randomize(self):
self.__instance_nonce = self.__original__instance_nonce