# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import Any

from hathor.nanocontracts.exception import BlueprintSyntaxError
from hathor.nanocontracts.types import NC_FALLBACK_METHOD, NC_INITIALIZE_METHOD, NC_METHOD_TYPE_ATTR, NCMethodType

# Field names that would collide with runtime-provided blueprint attributes.
FORBIDDEN_NAMES = {
    'syscall',
    'log',
}

# Class attribute under which the metaclass stores the mapping of declared field names to types.
NC_FIELDS_ATTR: str = '__fields'


class _BlueprintBase(type):
    """Metaclass for blueprints.

    This metaclass will modify the attributes and set Fields to them according to their types.

    It validates the required `initialize` method and optional `fallback` method, rejects
    forbidden/underscore-prefixed field names, and replaces each annotated attribute with a
    Field descriptor created by `make_field_for_type`.
    """

    def __new__(cls, name, bases, attrs, **kwargs):
        # Imported lazily to avoid a circular import between blueprint and fields modules.
        from hathor.nanocontracts.fields import make_field_for_type

        # Initialize only subclasses of Blueprint; the Blueprint base itself is created as-is.
        parents = [b for b in bases if isinstance(b, _BlueprintBase)]
        if not parents:
            return super().__new__(cls, name, bases, attrs, **kwargs)

        cls._validate_initialize_method(attrs)
        cls._validate_fallback_method(attrs)
        nc_fields = attrs.get('__annotations__', {})

        # Check for forbidden names.
        for field_name in nc_fields:
            if field_name in FORBIDDEN_NAMES:
                raise BlueprintSyntaxError(f'field name is forbidden: `{field_name}`')

            if field_name.startswith('_'):
                raise BlueprintSyntaxError(f'field name cannot start with underscore: `{field_name}`')

        # Create the fields attribute with the type for each field.
        attrs[NC_FIELDS_ATTR] = nc_fields

        # Use an empty __slots__ to prevent storing any attributes directly on instances.
        # The declared attributes are stored as fields on the class, so they still work despite the empty slots.
        attrs['__slots__'] = tuple()

        # Finally, create class!
        new_class = super().__new__(cls, name, bases, attrs, **kwargs)

        # Create the Field instance according to each type.
        for field_name, field_type in attrs[NC_FIELDS_ATTR].items():
            value = getattr(new_class, field_name, None)
            if value is None:
                # This is the case when a type is specified but not a value.
                # Example:
                #     name: str
                #     age: int
                try:
                    field = make_field_for_type(field_name, field_type)
                except TypeError as e:
                    # Chain the original TypeError so the root cause is not lost, and use getattr()
                    # because typing constructs (e.g. Optional[int]) have no `__name__` attribute,
                    # which would otherwise raise AttributeError while formatting this message.
                    type_name = getattr(field_type, '__name__', str(field_type))
                    raise BlueprintSyntaxError(
                        f'unsupported field type `{type_name}` on field `{field_name}`'
                    ) from e
                setattr(new_class, field_name, field)
            else:
                # This is the case when a value is specified.
                # Example:
                #     name: str = StrField()
                #
                # This was not implemented yet and will be extended later.
                raise BlueprintSyntaxError(f'fields with default values are currently not supported: `{field_name}`')

        return new_class

    @staticmethod
    def _validate_initialize_method(attrs: Any) -> None:
        """Raise BlueprintSyntaxError unless `initialize` exists and is marked @public."""
        if NC_INITIALIZE_METHOD not in attrs:
            raise BlueprintSyntaxError(f'blueprints require a method called `{NC_INITIALIZE_METHOD}`')

        method = attrs[NC_INITIALIZE_METHOD]
        method_type = getattr(method, NC_METHOD_TYPE_ATTR, None)

        if method_type is not NCMethodType.PUBLIC:
            raise BlueprintSyntaxError(f'`{NC_INITIALIZE_METHOD}` method must be annotated with @public')

    @staticmethod
    def _validate_fallback_method(attrs: Any) -> None:
        """Raise BlueprintSyntaxError if `fallback` exists but is not marked @fallback."""
        if NC_FALLBACK_METHOD not in attrs:
            return

        method = attrs[NC_FALLBACK_METHOD]
        method_type = getattr(method, NC_METHOD_TYPE_ATTR, None)

        if method_type is not NCMethodType.FALLBACK:
            raise BlueprintSyntaxError(f'`{NC_FALLBACK_METHOD}` method must be annotated with @fallback')


class Blueprint(metaclass=_BlueprintBase):
    """Base class for all blueprints.

    Example:

        class MyBlueprint(Blueprint):
            name: str
            age: int
    """

    __slots__ = ('__env',)


# --- diff: new file hathor/nanocontracts/fields/__init__.py ---
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import deque
from typing import TypeVar

from hathor.nanocontracts.fields.deque_field import DequeField
from hathor.nanocontracts.fields.dict_field import DictField
from hathor.nanocontracts.fields.field import Field
from hathor.nanocontracts.fields.set_field import SetField
from hathor.nanocontracts.fields.utils import TypeToFieldMap
from hathor.nanocontracts.nc_types import DEFAULT_TYPE_ALIAS_MAP, DEFAULT_TYPE_TO_NC_TYPE_MAP
from hathor.nanocontracts.nc_types.utils import TypeAliasMap, TypeToNCTypeMap

__all__ = [
    'DEFAULT_TYPE_TO_FIELD_MAP',
    'DequeField',
    'DictField',
    'Field',
    'SetField',
    'TypeToFieldMap',
    'make_field_for_type',
]

T = TypeVar('T')

# Container origins that need a dedicated Field class; every other type falls
# through to DEFAULT_TYPE_TO_NC_TYPE_MAP.
DEFAULT_TYPE_TO_FIELD_MAP: TypeToFieldMap = {
    dict: DictField,
    list: DequeField,  # XXX: we should really make a ListField, a deque is different from a list
    set: SetField,
    deque: DequeField,
    # XXX: other types fallback to DEFAULT_TYPE_TO_NC_TYPE_MAP
}


def make_field_for_type(
    name: str,
    type_: type[T],
    /,
    *,
    type_field_map: TypeToFieldMap = DEFAULT_TYPE_TO_FIELD_MAP,
    type_nc_type_map: TypeToNCTypeMap = DEFAULT_TYPE_TO_NC_TYPE_MAP,
    type_alias_map: TypeAliasMap = DEFAULT_TYPE_ALIAS_MAP,
) -> Field[T]:
    """ Like Field.from_name_and_type, but with default maps.

    Default arguments can't be easily added to NCType.from_type signature because of recursion.
    """
    combined_map = Field.TypeMap(type_alias_map, type_nc_type_map, type_field_map)
    return Field.from_name_and_type(name, type_, type_map=combined_map)


# --- diff: new file hathor/nanocontracts/fields/container_field.py ---
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Container
from typing import Generic, TypeVar

from typing_extensions import TYPE_CHECKING, Self, override

from hathor.nanocontracts.fields.field import Field
from hathor.nanocontracts.storage import NCContractStorage
from hathor.util import not_none
from hathor.utils.typing import InnerTypeMixin, get_origin

if TYPE_CHECKING:
    from hathor.nanocontracts.blueprint import Blueprint

C = TypeVar('C', bound=Container)

# Separator used to compose storage keys, e.g. b'myfield:__metadata__'.
KEY_SEPARATOR: str = ':'


class StorageContainer(Generic[C], ABC):
    """ Abstraction over the class that will be returned when accessing a container field.

    Every method and property in this class should use either `__dunder` or `__special__` naming pattern, because
    otherwise the property/method would be accessible from an OCB. Even if there would be no harm, this is generally
    avoided.
    """
    __slots__ = ()

    @classmethod
    @abstractmethod
    def __check_name_and_type__(cls, name: str, type_: type[C]) -> None:
        """Should raise a TypeError if the given name or type is incompatible for use with container."""
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def __from_name_and_type__(
        cls,
        storage: NCContractStorage,
        name: str,
        type_: type[C],
        /,
        *,
        type_map: Field.TypeMap,
    ) -> Self:
        """Every StorageContainer should be able to be built with this signature.

        Expect a type that has been previously checked with `cls.__check_name_and_type__`.
        """
        raise NotImplementedError


T = TypeVar('T', bound=StorageContainer)


class ContainerField(InnerTypeMixin[T], Field[T]):
    """ This class models a Field with a StorageContainer, it can't be set, only accessed as a container.

    This is modeled after a Python descriptor, similar to the built in `property`, see:

    - https://docs.python.org/3/reference/datamodel.html#implementing-descriptors

    The observed value behaves like a container, the specific behavior depends on the container type.
    """

    # NOTE: private names in __slots__ are name-mangled just like the attribute
    # accesses below, so instances store e.g. _ContainerField__name.
    __slots__ = ('__name', '__type', '__type_map')
    __name: str
    __type: type[T]
    __type_map: Field.TypeMap

    # XXX: customize InnerTypeMixin behavior so it stores the origin type, since that's what we want
    @classmethod
    def __extract_inner_type__(cls, args: tuple[type, ...], /) -> type[T]:
        inner_type: type[T] = InnerTypeMixin.__extract_inner_type__(args)
        return not_none(get_origin(inner_type))

    @override
    @classmethod
    def _from_name_and_type(cls, name: str, type_: type[T], /, *, type_map: Field.TypeMap) -> Self:
        # The inner type is the concrete StorageContainer subclass this field produces;
        # it validates the declared name/type before the field is created.
        if not issubclass(cls.__inner_type__, StorageContainer):
            raise TypeError(f'{cls.__inner_type__} is not a StorageContainer')
        cls.__inner_type__.__check_name_and_type__(name, type_)
        field = cls()
        field.__name = name
        field.__type = type_
        field.__type_map = type_map
        return field

    @override
    def __set__(self, instance: Blueprint, value: T) -> None:
        # XXX: alternatively this could mimic a `my_container.clear(); my_container.update(value)`
        raise AttributeError('cannot set a container field')

    @override
    def __get__(self, instance: Blueprint, owner: object | None = None) -> T:
        raise NotImplementedError('temporarily removed during nano merge')

    @override
    def __delete__(self, instance: Blueprint) -> None:
        # XXX: alternatively delete the database
        raise AttributeError('cannot delete a container field')


# --- diff: new file hathor/nanocontracts/fields/deque_field.py ---
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
from collections.abc import Iterable, Iterator, Sequence
from dataclasses import dataclass, replace
from typing import ClassVar, SupportsIndex, TypeVar, get_args, get_origin

from typing_extensions import Self, override

from hathor.nanocontracts.fields.container_field import KEY_SEPARATOR, ContainerField, StorageContainer
from hathor.nanocontracts.fields.field import Field
from hathor.nanocontracts.nc_types import NCType, VarInt32NCType
from hathor.nanocontracts.nc_types.dataclass_nc_type import make_dataclass_opt_nc_type
from hathor.nanocontracts.storage import NCContractStorage
from hathor.util import not_none

T = TypeVar('T')
# Suffix of the storage key holding the serialized _DequeMetadata.
_METADATA_KEY: str = '__metadata__'
# Serializer for element indices; signed, since extendleft() produces negative indices.
_INDEX_NC_TYPE = VarInt32NCType()

# TODO: support maxlen (will require support for initialization values)


@dataclass(slots=True, frozen=True, kw_only=True)
class _DequeMetadata:
    """Bookkeeping for a stored deque.

    first_index/length define the contiguous index window in storage;
    `reversed` flags that the deque is viewed back-to-front, so reverse()
    is O(1) and never moves elements.
    """
    first_index: int = 0
    length: int = 0
    reversed: bool = False

    @property
    def last_index(self) -> int:
        # Index of the last stored element; first_index - 1 when empty.
        return self.first_index + self.length - 1


_METADATA_NC_TYPE = make_dataclass_opt_nc_type(_DequeMetadata)


class DequeStorageContainer(StorageContainer[Sequence[T]]):
    """Deque-like StorageContainer backed by NCContractStorage."""
    # from https://github.com/python/typeshed/blob/main/stdlib/collections/__init__.pyi
    __slots__ = ('__storage', '__name', '__value', '__metadata_key')
    __storage: NCContractStorage          # backing key/value storage
    __name: str                           # field name, prefix of every db key
    __value: NCType[T]                    # serializer for the element type
    __metadata_key: bytes                 # db key of the _DequeMetadata record

    def __init__(self, storage: NCContractStorage, name: str, value: NCType[T]) -> None:
        self.__storage = storage
        self.__name = name
        self.__value = value
        self.__metadata_key = f'{name}{KEY_SEPARATOR}{_METADATA_KEY}'.encode()

    # Methods needed by StorageContainer:

    @override
    @classmethod
    def __check_name_and_type__(cls, name: str, type_: type[Sequence[T]]) -> None:
        if not name.isidentifier():
            raise TypeError('field name must be a valid identifier')
        origin_type: type[Sequence[T]] = not_none(get_origin(type_))
        if not issubclass(origin_type, Sequence):
            raise TypeError('expected Sequence type')
        args = get_args(type_)
        # NOTE(review): the message below appears to have lost an `<item_type>` placeholder
        # in transit — confirm against the original source.
        if not args or len(args) != 1:
            raise TypeError(f'expected {type_.__name__}[]')

    @override
    @classmethod
    def __from_name_and_type__(
        cls,
        storage: NCContractStorage,
        name: str,
        type_: type[Sequence[T]],
        /,
        *,
        type_map: Field.TypeMap,
    ) -> 'Self':
        item_type, = get_args(type_)
        item_nc_type = NCType.from_type(item_type, type_map=type_map.to_nc_type_map())
        return cls(storage, name, item_nc_type)

    # INTERNAL METHODS: all of these must be __dunder_methods so they aren't accessible from an OCB

    def __to_db_key(self, index: SupportsIndex) -> bytes:
        # Key layout: b'<name>:<varint-encoded internal index>'.
        return f'{self.__name}{KEY_SEPARATOR}'.encode() + _INDEX_NC_TYPE.to_bytes(index.__index__())

    def __get_metadata(self) -> _DequeMetadata:
        metadata = self.__storage.get_obj(self.__metadata_key, _METADATA_NC_TYPE, default=None)

        if metadata is None:
            # First access: persist a default metadata record.
            # NOTE(review): this makes a pure read issue a write — confirm that is intended.
            metadata = _DequeMetadata()
            self.__storage.put_obj(self.__metadata_key, _METADATA_NC_TYPE, metadata)

        assert isinstance(metadata, _DequeMetadata)
        return metadata

    def __update_metadata(self, new_metadata: _DequeMetadata) -> None:
        assert new_metadata.length >= 0
        if new_metadata.length == 0:
            # An empty deque keeps no metadata record at all.
            return self.__storage.del_obj(self.__metadata_key)
        self.__storage.put_obj(self.__metadata_key, _METADATA_NC_TYPE, new_metadata)

    def __extend(self, *, items: Iterable[T], metadata: _DequeMetadata) -> None:
        # Append items past the current last index, then persist the new length.
        new_last_index = metadata.last_index
        for item in items:
            new_last_index += 1
            key = self.__to_db_key(new_last_index)
            self.__storage.put_obj(key, self.__value, item)
        new_metadata = replace(metadata, length=new_last_index - metadata.first_index + 1)
        self.__update_metadata(new_metadata)

    def __extendleft(self, *, items: Iterable[T], metadata: _DequeMetadata) -> None:
        # Prepend items before the current first index (indices may go negative).
        new_first_index = metadata.first_index
        for item in items:
            new_first_index -= 1
            key = self.__to_db_key(new_first_index)
            self.__storage.put_obj(key, self.__value, item)
        new_metadata = replace(
            metadata,
            first_index=new_first_index,
            length=metadata.last_index - new_first_index + 1,
        )
        self.__update_metadata(new_metadata)

    def __pop(self, *, metadata: _DequeMetadata, left: bool) -> T:
        # Remove and return the element at either physical end of the index window.
        if metadata.length == 0:
            raise IndexError

        index = metadata.first_index if left else metadata.last_index
        key = self.__to_db_key(index)
        item = self.__storage.get_obj(key, self.__value)
        self.__storage.del_obj(key)
        new_metadata = replace(
            metadata,
            first_index=metadata.first_index + 1 if left else metadata.first_index,
            length=metadata.length - 1
        )
        self.__update_metadata(new_metadata)
        return item

    def __to_internal_index(self, *, index: SupportsIndex) -> int:
        # Translate a user-facing (possibly negative) index into the physical
        # storage index, honoring the O(1) `reversed` flag.
        metadata = self.__get_metadata()
        idx = index.__index__()

        if idx < 0:
            idx += metadata.length

        if idx < 0 or idx >= metadata.length:
            raise IndexError

        return metadata.last_index - idx if metadata.reversed else metadata.first_index + idx

    # Methods needed by MutableSequence and Sequence:

    def __getitem__(self, index: SupportsIndex, /) -> T:
        internal_index = self.__to_internal_index(index=index)
        key = self.__to_db_key(internal_index)
        return self.__storage.get_obj(key, self.__value)

    def __len__(self) -> int:
        return self.__get_metadata().length

    def __setitem__(self, index: SupportsIndex, value: T, /) -> None:
        internal_index = self.__to_internal_index(index=index)
        key = self.__to_db_key(internal_index)
        self.__storage.put_obj(key, self.__value, value)

    def __delitem__(self, key: SupportsIndex, /) -> None:
        # Deletion from the middle would require shifting keys; not supported.
        raise NotImplementedError

    def insert(self, i: int, x: T, /) -> None:
        # Insertion in the middle would require shifting keys; not supported.
        raise NotImplementedError

    # Methods provided by Sequence (currently not implemented):

    # def index(self, x: T, start: int = 0, stop: int = ..., /) -> int: ...
    # def count(self, x: T, /) -> int: ...
    # def __contains__(self, key: object, /) -> bool: ...
    # def __iter__(self) -> Iterator[_T_co]: ...
    # def __reversed__(self) -> None:

    # Methods provided by MutableSequence (currently not implemented):

    # def append(self, x: T, /) -> None: ...
    # def clear(self) -> None: ...
    # def extend(self, iterable: Iterable[T], /) -> None: ...
    # def reverse(self) -> None:
    # def pop(self) -> T: ...  # type: ignore[override]
    # def remove(self, value: T, /) -> None: ...
    # def __iadd__(self, value: Iterable[T], /) -> Self: ...

    # out of those, we specialize these:

    def append(self, item: T, /) -> None:
        self.extend((item,))

    def extend(self, items: Iterable[T], /) -> None:
        # A logically-reversed deque grows to the right by writing to the left physically.
        metadata = self.__get_metadata()
        if metadata.reversed:
            return self.__extendleft(items=items, metadata=metadata)
        self.__extend(items=items, metadata=metadata)

    def pop(self) -> T:
        metadata = self.__get_metadata()
        return self.__pop(metadata=metadata, left=metadata.reversed)

    def reverse(self) -> None:
        # O(1): flip the flag instead of moving elements.
        metadata = self.__get_metadata()
        new_metadata = replace(metadata, reversed=not metadata.reversed)
        self.__update_metadata(new_metadata)

    def __iter__(self) -> Iterator[T]:
        metadata = self.__get_metadata()
        indexes = range(metadata.first_index, metadata.last_index + 1)

        if metadata.reversed:
            indexes = range(metadata.last_index, metadata.first_index - 1, -1)

        for i in indexes:
            key = self.__to_db_key(i)
            yield self.__storage.get_obj(key, self.__value)

    # Other deque methods that we implement to look like a deque:

    @property
    def maxlen(self) -> int | None:
        # maxlen is not supported yet, mirroring an unbounded deque.
        return None

    def appendleft(self, item: T, /) -> None:
        self.extendleft((item,))

    def extendleft(self, items: Iterable[T], /) -> None:
        metadata = self.__get_metadata()
        if metadata.reversed:
            return self.__extend(items=items, metadata=metadata)
        self.__extendleft(items=items, metadata=metadata)

    def popleft(self) -> T:
        metadata = self.__get_metadata()
        return self.__pop(metadata=metadata, left=not metadata.reversed)

    def copy(self) -> 'Self':
        raise NotImplementedError

    def rotate(self, n: int = 1, /) -> None:
        raise NotImplementedError

    def __copy__(self) -> 'Self':
        raise NotImplementedError

    # Unhashable, like a real deque.
    __hash__: ClassVar[None]  # type: ignore[assignment]

    def __reduce__(self) -> tuple[type['Self'], tuple[()], None, Iterator[T]]:
        raise NotImplementedError

    def __add__(self, value: 'Self', /) -> 'Self':
        raise NotImplementedError

    def __mul__(self, value: int, /) -> 'Self':
        raise NotImplementedError

    def __imul__(self, value: int, /) -> 'Self':
        raise NotImplementedError

    def __lt__(self, value: deque[T], /) -> bool:
        raise NotImplementedError

    def __le__(self, value: deque[T], /) -> bool:
        raise NotImplementedError

    def __gt__(self, value: deque[T], /) -> bool:
        raise NotImplementedError

    def __ge__(self, value: deque[T], /) -> bool:
        raise NotImplementedError

    def __eq__(self, value: object, /) -> bool:
        raise NotImplementedError


DequeField = ContainerField[DequeStorageContainer[T]]


# --- diff: new file hathor/nanocontracts/fields/dict_field.py ---
# Copyright 2023 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections.abc import Hashable, Iterator, Mapping
from typing import TypeVar, get_args, get_origin, overload

from typing_extensions import Self, override

from hathor.nanocontracts.fields.container_field import KEY_SEPARATOR, ContainerField, StorageContainer
from hathor.nanocontracts.fields.field import Field
from hathor.nanocontracts.nc_types import NCType, VarUint32NCType
from hathor.nanocontracts.nc_types.utils import is_origin_hashable
from hathor.nanocontracts.storage import NCContractStorage
from hathor.util import not_none

K = TypeVar('K', bound=Hashable)
V = TypeVar('V')
_T = TypeVar('_T')
# Suffix of the storage key that tracks the number of entries.
_LENGTH_KEY: str = '__length__'
_LENGTH_NC_TYPE = VarUint32NCType()


class DictStorageContainer(StorageContainer[Mapping[K, V]]):
    """This is a dict-like object.

    Based on the implementation of UserDict, see:
    - https://github.com/python/cpython/blob/main/Lib/collections/__init__.py
    """

    __slots__ = ('__storage', '__name', '__key', '__value', '__length_key')
    __storage: NCContractStorage      # backing key/value storage
    __name: str                       # field name, prefix of every db key
    __key: NCType[K]                  # serializer for keys
    __value: NCType[V]                # serializer for values
    __length_key: bytes               # db key holding the entry count

    def __init__(self, storage: NCContractStorage, name: str, key: NCType[K], value: NCType[V]) -> None:
        self.__storage = storage
        self.__name = name
        self.__key = key
        self.__value = value
        self.__length_key = f'{name}{KEY_SEPARATOR}{_LENGTH_KEY}'.encode()

    # Methods needed by StorageContainer:

    @override
    @classmethod
    def __check_name_and_type__(cls, name: str, type_: type[Mapping[K, V]]) -> None:
        if not name.isidentifier():
            raise TypeError('field name must be a valid identifier')
        origin_type: type[Mapping[K, V]] = not_none(get_origin(type_))
        if not issubclass(origin_type, Mapping):
            raise TypeError('expected Mapping type')
        args = get_args(type_)
        # NOTE(review): the message below appears to have lost `<key_type>, <value_type>`
        # placeholders in transit — confirm against the original source.
        if not args or len(args) != 2:
            raise TypeError(f'expected {type_.__name__}[, ]')
        key_type, value_type = args
        if not is_origin_hashable(key_type):
            raise TypeError(f'{key_type} is not hashable')

    @override
    @classmethod
    def __from_name_and_type__(
        cls,
        storage: NCContractStorage,
        name: str,
        type_: type[Mapping[K, V]],
        /,
        *,
        type_map: Field.TypeMap,
    ) -> Self:
        key_type, value_type = get_args(type_)
        key_nc_type = NCType.from_type(key_type, type_map=type_map.to_nc_type_map())
        assert key_nc_type.is_hashable(), 'hashable "types" must produce hashable "values"'
        value_nc_type = NCType.from_type(value_type, type_map=type_map.to_nc_type_map())
        return cls(storage, name, key_nc_type, value_nc_type)

    # INTERNAL METHODS: all of these must be __dunder_methods so they aren't accessible from an OCB

    def __to_db_key(self, key: K) -> bytes:
        # We don't need to explicitly hash the key here, because the trie already does it internally.
        return f'{self.__name}{KEY_SEPARATOR}'.encode() + self.__key.to_bytes(key)

    def __get_length(self) -> int:
        return self.__storage.get_obj(self.__length_key, _LENGTH_NC_TYPE, default=0)

    def __increase_length(self) -> None:
        self.__storage.put_obj(self.__length_key, _LENGTH_NC_TYPE, self.__get_length() + 1)

    def __decrease_length(self) -> None:
        length = self.__get_length()
        assert length > 0
        self.__storage.put_obj(self.__length_key, _LENGTH_NC_TYPE, length - 1)

    # Methods needed by MutableMapping (and to behave like a dict)

    def __len__(self) -> int:
        return self.__get_length()

    def __getitem__(self, key: K, /) -> V:
        # get the data from the storage
        db_key = self.__to_db_key(key)
        return self.__storage.get_obj(db_key, self.__value)

    def __setitem__(self, key: K, value: V, /) -> None:
        # NOTE: the membership check costs an extra storage read on every assignment.
        if key not in self:
            self.__increase_length()
        # store `value` at `key` in the storage
        self.__storage.put_obj(self.__to_db_key(key), self.__value, value)

    def __delitem__(self, key: K, /) -> None:
        # NOTE(review): silently ignoring a missing key deviates from dict/MutableMapping
        # semantics (which raise KeyError) — confirm this is intentional.
        if key not in self:
            return
        self.__decrease_length()
        # delete the key from the storage
        self.__storage.del_obj(self.__to_db_key(key))

    def __iter__(self) -> Iterator[K]:
        # Iterating keys would require a storage-level scan; not supported yet.
        raise NotImplementedError

    # Methods provided by MutableMapping (currently not implemented):

    # def pop(self, key, default=__marker):
    # def popitem(self):
    # def clear(self):
    # def update(self, other=(), /, **kwds):
    # def setdefault(self, key, default=None):

    # Modify __contains__ and get() to work like dict does when __missing__ is present.

    def __contains__(self, key: K, /) -> bool:
        # return true if the `key` exists in the collection
        try:
            self[key]
        except KeyError:
            return False
        else:
            return True

    @overload
    def get(self, key: K, /) -> V:
        ...

    @overload
    def get(self, key: K, default: V | _T | None, /) -> V | _T | None:
        ...

    # XXX: `misc` is ignored because mypy thinks this function does not accept all arguments of the second get overload
    def get(self, key: K, default: V | _T | None = None, /) -> V | _T | None:  # type: ignore[misc]
        # return the value for key if key is in the storage, else default
        try:
            return self[key]
        except KeyError:
            return default

    # Now, add the methods in dicts but not in MutableMapping

    # def __repr__(self):
    #     raise NotImplementedError

    def __or__(self, value, /):
        raise NotImplementedError

    def __ror__(self, value, /):
        raise NotImplementedError

    def __ior__(self, value, /):
        raise NotImplementedError

    def __copy__(self):
        raise NotImplementedError

    def copy(self):
        raise NotImplementedError

    @classmethod
    def fromkeys(cls, iterable, value=None, /):
        raise NotImplementedError


DictField = ContainerField[DictStorageContainer[K, V]]


# --- diff: new file hathor/nanocontracts/fields/field.py ---
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Generic, NamedTuple, TypeVar, final, get_origin

from typing_extensions import TYPE_CHECKING, Self

from hathor.nanocontracts.fields.utils import TypeToFieldMap
from hathor.nanocontracts.nc_types import NCType
from hathor.nanocontracts.nc_types.utils import TypeAliasMap, TypeToNCTypeMap

if TYPE_CHECKING:
    from hathor.nanocontracts.blueprint import Blueprint

T = TypeVar('T')


class Field(Generic[T], ABC):
    """ This class is used to model the fields of a Blueprint from the signature that defines them.

    Fields are generally free to implement how they behave, but we have 2 types of behavior:

    - `self.foo = 1` will serialize `1` and save to db on a key derived from `'foo'` name
    - `self.foo['bar'] = 'baz'` will serialize and save to db on a key derive from `('foo', 'bar')`

    Usually only one of the two patterns above is supported by a field. The base class itself only defines how to
    construct a Field instance from a name and type signature, which is what the Blueprint metaclass needs.


    OCB safety considerations:

    - A Blueprint must not be able to access a Field instance directly
    """

    class TypeMap(NamedTuple):
        # Bundles the three lookup maps needed to resolve a type signature into a Field.
        alias_map: TypeAliasMap
        nc_types_map: TypeToNCTypeMap
        fields_map: TypeToFieldMap

        def to_nc_type_map(self) -> NCType.TypeMap:
            """Project this map down to the subset NCType.from_type understands."""
            return NCType.TypeMap(self.alias_map, self.nc_types_map)

    # XXX: do we need to define field.__objclass__ for anything?

    @final
    @staticmethod
    def from_name_and_type(name: str, type_: type[T], /, *, type_map: TypeMap) -> Field[T]:
        """Dispatch `type_` to the right Field class: container fields first, NCType-backed fields second."""
        from hathor.nanocontracts.fields.nc_type_field import NCTypeField

        # if we have a `dict[int, int]` we use `get_origin()` to get the `dict` part, since it's a different instance
        origin_type = get_origin(type_) or type_

        field_class = type_map.fields_map.get(origin_type)
        if field_class is not None:
            return field_class._from_name_and_type(name, type_, type_map=type_map)
        if NCType.is_supported(origin_type, type_map=type_map.to_nc_type_map()):
            return NCTypeField._from_name_and_type(name, type_, type_map=type_map)
        raise TypeError(f'type {type_} is not supported by any Field class')

    @classmethod
    @abstractmethod
    def _from_name_and_type(cls, name: str, type_: type[T], /, *, type_map: TypeMap) -> Self:
        """Build a concrete Field for `name`/`type_`; each subclass defines how."""
        raise NotImplementedError

    @abstractmethod
    def __set__(self, instance: Blueprint, value: T) -> None:
        # called when doing `instance.field = value`
        raise NotImplementedError

    @abstractmethod
    def __get__(self, instance: Blueprint, owner: object | None = None) -> T:
        # called when doing `instance.field` as an expression
        raise NotImplementedError

    @abstractmethod
    def __delete__(self, instance: Blueprint) -> None:
        # called when doing `del instance.field`
        raise NotImplementedError


# --- diff: new file hathor/nanocontracts/fields/nc_type_field.py ---
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TypeVar

from typing_extensions import Self

from hathor.nanocontracts.blueprint import Blueprint
from hathor.nanocontracts.fields.field import Field
from hathor.nanocontracts.nc_types import NCType

T = TypeVar('T')


class NCTypeField(Field[T]):
    """ Field backed directly by an NCType: reading the attribute deserializes the value from the db.

    Modeled after a Python descriptor, like the built-in `property`:

    - https://docs.python.org/3/reference/datamodel.html#implementing-descriptors
    """
    __slots__ = ('__name', '__nc_type')

    __name: str              # field name; also the storage key (utf-8 encoded)
    __nc_type: NCType[T]     # serializer for the declared type

    @classmethod
    def _from_name_and_type(cls, name: str, type_: type[T], /, *, type_map: Field.TypeMap) -> Self:
        """Resolve `type_` into an NCType and wrap it in a new field instance."""
        nc_type = NCType.from_type(type_, type_map=type_map.to_nc_type_map())
        field = cls()
        field.__name = name
        field.__nc_type = nc_type
        return field

    def __storage_key(self) -> bytes:
        # The bare field name is the storage key for scalar fields.
        return self.__name.encode('utf-8')

    def __set__(self, instance: Blueprint, obj: T) -> None:
        raise NotImplementedError('temporarily removed during nano merge')

    def __get__(self, instance: Blueprint, owner: object | None = None) -> T:
        raise NotImplementedError('temporarily removed during nano merge')

    def __delete__(self, instance: Blueprint) -> None:
        raise NotImplementedError('temporarily removed during nano merge')


# --- diff: new file hathor/nanocontracts/fields/set_field.py ---
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Iterable, Iterator
from typing import Any, TypeVar, get_args, get_origin

from typing_extensions import Self, override

from hathor.nanocontracts.fields.container_field import KEY_SEPARATOR, ContainerField, StorageContainer
from hathor.nanocontracts.fields.field import Field
from hathor.nanocontracts.nc_types import NCType, VarUint32NCType
from hathor.nanocontracts.nc_types.utils import is_origin_hashable
from hathor.nanocontracts.storage import NCContractStorage
from hathor.util import not_none

T = TypeVar('T')
_S = TypeVar('_S')
# sentinel for "argument not provided" — currently unused, presumably reserved for future methods
_NOT_PROVIDED = object()
# reserved sub-key used to persist the set's cardinality
_LENGTH_KEY: str = '__length__'
_LENGTH_NC_TYPE = VarUint32NCType()


class SetStorageContainer(StorageContainer[set[T]]):
    """Storage-backed container that emulates a builtin `set[T]`.

    Each element is persisted individually in the contract storage, keyed by its
    serialized bytes, and the cardinality is tracked separately under a reserved
    length key (so membership and `len()` never require a full scan).

    Method signatures follow the typeshed stubs:
    - https://github.com/python/typeshed/blob/main/stdlib/collections/__init__.pyi
    - https://github.com/python/typeshed/blob/main/stdlib/typing.pyi
    """

    __slots__ = ('__storage', '__name', '__value', '__length_key')
    __storage: NCContractStorage
    __name: str
    __value: NCType[T]
    __length_key: bytes

    # XXX: what to do with this:
    # __hash__: ClassVar[None]  # type: ignore[assignment]

    def __init__(self, storage: NCContractStorage, name: str, value: NCType[T]) -> None:
        self.__storage = storage
        self.__name = name
        self.__value = value
        # the length lives under a reserved key so it cannot collide with an element key
        self.__length_key = f'{name}{KEY_SEPARATOR}{_LENGTH_KEY}'.encode()

    # Methods needed by StorageContainer:

    @override
    @classmethod
    def __check_name_and_type__(cls, name: str, type_: type[set[T]]) -> None:
        """Validate that `name` is a valid identifier and `type_` is `set[X]` with a hashable X.

        Raises TypeError when any of the constraints is violated.
        """
        if not name.isidentifier():
            raise TypeError('field name must be a valid identifier')
        origin_type: type[set[T]] = not_none(get_origin(type_))
        if not issubclass(origin_type, set):
            raise TypeError('expected set type')
        args = get_args(type_)
        # exactly one type argument is required; `len(args) != 1` also covers the empty case
        if len(args) != 1:
            raise TypeError(f'expected {type_.__name__}[]')
        item_type, = args
        if not is_origin_hashable(item_type):
            raise TypeError(f'{item_type} is not hashable')

    @override
    @classmethod
    def __from_name_and_type__(
        cls,
        storage: NCContractStorage,
        name: str,
        type_: type[set[T]],
        /,
        *,
        type_map: Field.TypeMap,
    ) -> Self:
        """Build a container for field `name` declared as `set[item_type]`."""
        item_type, = get_args(type_)
        item_nc_type = NCType.from_type(item_type, type_map=type_map.to_nc_type_map())
        assert item_nc_type.is_hashable(), 'hashable "types" must produce hashable "values"'
        return cls(storage, name, item_nc_type)

    def __to_db_key(self, elem: T) -> bytes:
        """Derive the storage key for `elem` from the field name and the serialized element."""
        # We don't need to explicitly hash the value here, because the trie already does it internally.
        return f'{self.__name}{KEY_SEPARATOR}'.encode() + self.__value.to_bytes(elem)

    def __get_length(self) -> int:
        """Return the persisted cardinality (0 when never written)."""
        return self.__storage.get_obj(self.__length_key, _LENGTH_NC_TYPE, default=0)

    def __increase_length(self) -> None:
        self.__storage.put_obj(self.__length_key, _LENGTH_NC_TYPE, self.__get_length() + 1)

    def __decrease_length(self) -> None:
        length = self.__get_length()
        assert length > 0
        self.__storage.put_obj(self.__length_key, _LENGTH_NC_TYPE, length - 1)

    # required by Iterable

    def __iter__(self) -> Iterator[T]:
        raise NotImplementedError

    # required by Collection

    def __len__(self) -> int:
        return self.__get_length()

    # required by AbstractSet

    def __contains__(self, elem: T, /) -> bool:
        key = self.__to_db_key(elem)
        return self.__storage.has_obj(key)

    # provided by Set (currently not implemented):
    #
    # def _hash(self) -> int: ...
    # def __le__(self, other: set[Any]) -> bool: ...
    # def __lt__(self, other: set[Any]) -> bool: ...
    # def __gt__(self, other: set[Any]) -> bool: ...
    # def __ge__(self, other: set[Any]) -> bool: ...
    # def __and__(self, other: set[Any]) -> set[T]: ...
    # def __or__(self, other: set[T]) -> set[T]: ...
    # def __sub__(self, other: set[Any]) -> set[T]: ...
    # def __xor__(self, other: set[T]) -> set[T]: ...
    # def __eq__(self, other: object) -> bool: ...
    # def isdisjoint(self, other: Iterable[Any]) -> bool: ...

    def isdisjoint(self, other: Iterable[Any]) -> bool:
        """Return True when no element of `other` is stored in this set."""
        # short-circuit on the first common element instead of materializing the intersection
        return not any(elem in self for elem in other)

    # required by MutableSet

    def add(self, elem: T, /) -> None:
        """Add `elem`; a no-op when it is already present (length stays consistent)."""
        key = self.__to_db_key(elem)
        if self.__storage.has_obj(key):
            return
        self.__storage.put_obj(key, self.__value, elem)
        self.__increase_length()

    def discard(self, elem: T, /) -> None:
        """Remove `elem` if present; silently do nothing otherwise."""
        key = self.__to_db_key(elem)
        if not self.__storage.has_obj(key):
            return
        self.__storage.del_obj(key)
        self.__decrease_length()

    # provided by MutableSet (currently not implemented):
    #
    # def clear(self) -> None: ...
    # def pop(self) -> T: ...
    # def remove(self, value: T) -> None: ...
    # def __ior__(self, it: set[T]) -> Self: ...  # type: ignore[override,misc]
    # def __iand__(self, it: set[Any]) -> Self: ...
    # def __ixor__(self, it: set[T]) -> Self: ...  # type: ignore[override,misc]
    # def __isub__(self, it: set[Any]) -> Self: ...
    #
    # of which we override:

    def remove(self, elem: T, /) -> None:
        """Remove `elem`; raise KeyError (carrying the element, like builtin set.remove) if absent."""
        key = self.__to_db_key(elem)
        if not self.__storage.has_obj(key):
            # builtin set.remove includes the missing element in the exception
            raise KeyError(elem)
        self.__storage.del_obj(key)
        self.__decrease_length()

    # Additional methods to behave like a set
    # see https://github.com/python/typeshed/blob/main/stdlib/builtins.pyi#L1168

    def copy(self) -> set[T]:
        raise NotImplementedError

    def difference(self, *s: Iterable[Any]) -> set[T]:
        raise NotImplementedError

    def difference_update(self, *others: Iterable[Any]) -> None:
        """Remove every element that appears in any of `others`."""
        for other in others:
            for elem in other:
                self.discard(elem)

    # def intersection(self, *s: Iterable[Any]) -> set[T]: ...
    def intersection(self, other: Iterable[Any]) -> set[T]:
        """Return a plain set with the elements of `other` that are also stored here."""
        return set(elem for elem in other if elem in self)

    def intersection_update(self, *s: Iterable[Any]) -> None:
        raise NotImplementedError

    def issubset(self, s: Iterable[Any], /) -> bool:
        raise NotImplementedError

    def issuperset(self, other: Iterable[Any]) -> bool:
        """Return True when every element of `other` is stored in this set."""
        return all(elem in self for elem in other)

    def symmetric_difference(self, s: Iterable[T], /) -> set[T]:
        raise NotImplementedError

    def symmetric_difference_update(self, s: Iterable[T], /) -> None:
        raise NotImplementedError

    def union(self, *s: Iterable[_S]) -> set[T | _S]:
        raise NotImplementedError

    def update(self, *others: Iterable[T]) -> None:
        """Add every element of every iterable in `others`."""
        for other in others:
            for elem in other:
                self.add(elem)


SetField = ContainerField[SetStorageContainer[T]]


# ---------------------------------------------------------------------------
# hathor/nanocontracts/fields/utils.py
# ---------------------------------------------------------------------------
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Mapping
from typing import TYPE_CHECKING, TypeAlias

if TYPE_CHECKING:
    from hathor.nanocontracts.fields import Field


# Maps a Python type to the Field subclass responsible for handling it.
TypeToFieldMap: TypeAlias = Mapping[type, type['Field']]