diff --git a/hathor/nanocontracts/allowed_imports.py b/hathor/nanocontracts/allowed_imports.py new file mode 100644 index 000000000..7d82d49bc --- /dev/null +++ b/hathor/nanocontracts/allowed_imports.py @@ -0,0 +1,63 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import math +import typing + +import hathor.nanocontracts as nc + +# this is what's allowed to be imported in blueprints, to be checked in the AST and in runtime +ALLOWED_IMPORTS: dict[str, dict[str, object]] = { + # globals + 'math': dict( + ceil=math.ceil, + floor=math.floor, + ), + 'typing': dict( + Optional=typing.Optional, + NamedTuple=typing.NamedTuple, + TypeAlias=typing.TypeAlias, + Union=typing.Union, + ), + 'collections': dict(OrderedDict=collections.OrderedDict), + # hathor + 'hathor.nanocontracts': dict(Blueprint=nc.Blueprint), + 'hathor.nanocontracts.blueprint': dict(Blueprint=nc.Blueprint), + 'hathor.nanocontracts.context': dict(Context=nc.Context), + 'hathor.nanocontracts.exception': dict(NCFail=nc.NCFail), + 'hathor.nanocontracts.types': dict( + NCAction=nc.types.NCAction, + NCActionType=nc.types.NCActionType, + SignedData=nc.types.SignedData, + public=nc.public, + view=nc.view, + fallback=nc.fallback, + Address=nc.types.Address, + Amount=nc.types.Amount, + Timestamp=nc.types.Timestamp, + TokenUid=nc.types.TokenUid, + TxOutputScript=nc.types.TxOutputScript, + BlueprintId=nc.types.BlueprintId, + ContractId=nc.types.ContractId, + VertexId=nc.types.VertexId, + NCDepositAction=nc.types.NCDepositAction, + NCWithdrawalAction=nc.types.NCWithdrawalAction, + NCGrantAuthorityAction=nc.types.NCGrantAuthorityAction, + NCAcquireAuthorityAction=nc.types.NCAcquireAuthorityAction, + NCArgs=nc.types.NCArgs, + NCRawArgs=nc.types.NCRawArgs, + NCParsedArgs=nc.types.NCParsedArgs, + ), +} diff --git a/hathor/nanocontracts/custom_builtins.py b/hathor/nanocontracts/custom_builtins.py index 37b50aba1..817ae17cb 100644 --- a/hathor/nanocontracts/custom_builtins.py +++ b/hathor/nanocontracts/custom_builtins.py @@ -27,13 +27,15 @@ Sequence, SupportsIndex, TypeVar, + cast, final, ) from typing_extensions import Self, TypeVarTuple +from hathor.nanocontracts.allowed_imports import ALLOWED_IMPORTS from hathor.nanocontracts.exception import NCDisabledBuiltinError -from hathor.nanocontracts.on_chain_blueprint import ALLOWED_IMPORTS, BLUEPRINT_CLASS_NAME +from hathor.nanocontracts.on_chain_blueprint import BLUEPRINT_CLASS_NAME T = TypeVar('T') Ts = TypeVarTuple('Ts') @@ -218,7 +220,7 @@ def __call__( ... -def _generate_restricted_import_function(allowed_imports: dict[str, set[str]]) -> ImportFunction: +def _generate_restricted_import_function(allowed_imports: dict[str, dict[str, object]]) -> ImportFunction: """Returns a function equivalent to builtins.__import__ but that will only import `allowed_imports`""" @_wraps(builtins.__import__) def __import__( @@ -235,11 +237,23 @@ def __import__( raise ImportError('Only `from ... import ...` imports are allowed') if name not in allowed_imports: raise ImportError(f'Import from "{name}" is not allowed.') + + # Create a fake module class that will only be returned by this import call + class FakeModule: + __slots__ = tuple(fromlist) + + fake_module = FakeModule() allowed_fromlist = allowed_imports[name] + for import_what in fromlist: if import_what not in allowed_fromlist: raise ImportError(f'Import from "{name}.{import_what}" is not allowed.') - return builtins.__import__(name=name, globals=globals, fromlist=fromlist, level=0) + + setattr(fake_module, import_what, allowed_fromlist[import_what]) + + # This cast is safe because the only requirement is that the object contains the imported attributes. + return cast(types.ModuleType, fake_module) + return __import__ diff --git a/hathor/nanocontracts/metered_exec.py b/hathor/nanocontracts/metered_exec.py index 0097c30dd..e2e82c19d 100644 --- a/hathor/nanocontracts/metered_exec.py +++ b/hathor/nanocontracts/metered_exec.py @@ -18,7 +18,6 @@ from structlog import get_logger -from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS from hathor.nanocontracts.on_chain_blueprint import PYTHON_CODE_COMPAT_VERSION logger = get_logger() @@ -59,6 +58,7 @@ def get_memory_limit(self) -> int: def exec(self, source: str, /) -> dict[str, Any]: """ This is equivalent to `exec(source)` but with execution metering and memory limiting. """ + from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS env: dict[str, object] = { '__builtins__': EXEC_BUILTINS, } @@ -80,6 +80,7 @@ def exec(self, source: str, /) -> dict[str, Any]: def call(self, func: Callable[_P, _T], /, *, args: _P.args) -> _T: """ This is equivalent to `func(*args, **kwargs)` but with execution metering and memory limiting. """ + from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS env: dict[str, object] = { '__builtins__': EXEC_BUILTINS, '__func__': func, diff --git a/hathor/nanocontracts/on_chain_blueprint.py b/hathor/nanocontracts/on_chain_blueprint.py index 0eef02261..3b7c88a24 100644 --- a/hathor/nanocontracts/on_chain_blueprint.py +++ b/hathor/nanocontracts/on_chain_blueprint.py @@ -53,42 +53,6 @@ # max compression level, used as default MAX_COMPRESSION_LEVEL = 9 -# this is what's allowed to be imported, to be checked in the AST and in runtime -ALLOWED_IMPORTS: dict[str, set[str]] = { - # globals - 'math': {'ceil', 'floor'}, - 'typing': {'Optional', 'NamedTuple', 'TypeAlias', 'Union'}, - 'collections': {'OrderedDict'}, - # hathor - 'hathor.nanocontracts': {'Blueprint'}, - 'hathor.nanocontracts.blueprint': {'Blueprint'}, - 'hathor.nanocontracts.context': {'Context'}, - 'hathor.nanocontracts.exception': {'NCFail'}, - 'hathor.nanocontracts.types': { - 'NCAction', - 'NCActionType', - 'SignedData', - 'public', - 'view', - 'fallback', - 'Address', - 'Amount', - 'Timestamp', - 'TokenUid', - 'TxOutputScript', - 'BlueprintId', - 'ContractId', - 'VertexId', - 'NCDepositAction', - 'NCWithdrawalAction', - 'NCGrantAuthorityAction', - 'NCAcquireAuthorityAction', - 'NCArgs', - 'NCRawArgs', - 'NCParsedArgs', - }, -} - # these names aren't allowed in the code, to be checked in the AST only AST_NAME_BLACKLIST: set[str] = { '__builtins__', diff --git a/hathor/verification/on_chain_blueprint_verifier.py b/hathor/verification/on_chain_blueprint_verifier.py index 68d52ff1a..7ef689b01 100644 --- a/hathor/verification/on_chain_blueprint_verifier.py +++ b/hathor/verification/on_chain_blueprint_verifier.py @@ -22,9 +22,9 @@ from hathor.conf.settings import HathorSettings from hathor.crypto.util import get_address_b58_from_public_key_bytes, get_public_key_from_bytes_compressed from hathor.nanocontracts import OnChainBlueprint +from hathor.nanocontracts.allowed_imports import ALLOWED_IMPORTS from hathor.nanocontracts.exception import NCInvalidPubKey, NCInvalidSignature, OCBInvalidScript, OCBPubKeyNotAllowed from hathor.nanocontracts.on_chain_blueprint import ( - ALLOWED_IMPORTS, AST_NAME_BLACKLIST, BLUEPRINT_CLASS_NAME, PYTHON_CODE_COMPAT_VERSION, diff --git a/tests/nanocontracts/blueprints/unittest.py b/tests/nanocontracts/blueprints/unittest.py index c913f9a92..972b03276 100644 --- a/tests/nanocontracts/blueprints/unittest.py +++ b/tests/nanocontracts/blueprints/unittest.py @@ -89,20 +89,40 @@ def _register_blueprint_class( def register_blueprint_file(self, path: PathLike[str], blueprint_id: BlueprintId | None = None) -> BlueprintId: """Register a blueprint file with an optional id, allowing contracts to be created from it.""" with open(path, 'r') as f: - return self.register_blueprint_contents(f, blueprint_id) + return self._register_blueprint_contents(f, blueprint_id) - def register_blueprint_contents( + def _register_blueprint_contents( self, contents: TextIOWrapper, blueprint_id: BlueprintId | None = None, + *, + skip_verification: bool = False, + inject_in_class: dict[str, object] | None = None, ) -> BlueprintId: - """Register blueprint contents with an optional id, allowing contracts to be created from it.""" + """ + Register blueprint contents with an optional id, allowing contracts to be created from it. + + Args: + contents: the blueprint source code, usually a file or StringIO + blueprint_id: optional ID for the blueprint + skip_verification: skip verifying the blueprint with restrictions such as AST verification + inject_in_class: objects to inject in the blueprint class, accessible in contract runtime + + Returns: the blueprint_id + """ code = Code.from_python_code(contents.read(), self._settings) - verifier = OnChainBlueprintVerifier(settings=self._settings) ocb = OnChainBlueprint(hash=b'', code=code) - verifier.verify_code(ocb) - return self._register_blueprint_class(ocb.get_blueprint_class(), blueprint_id) + if not skip_verification: + verifier = OnChainBlueprintVerifier(settings=self._settings) + verifier.verify_code(ocb) + + blueprint_class = ocb.get_blueprint_class() + if inject_in_class is not None: + for key, value in inject_in_class.items(): + setattr(blueprint_class, key, value) + + return self._register_blueprint_class(blueprint_class, blueprint_id) def build_runner(self) -> TestRunner: """Create a Runner instance.""" diff --git a/tests/nanocontracts/test_custom_import.py b/tests/nanocontracts/test_custom_import.py index 6197dbf36..876c15f33 100644 --- a/tests/nanocontracts/test_custom_import.py +++ b/tests/nanocontracts/test_custom_import.py @@ -12,32 +12,33 @@ # See the License for the specific language governing permissions and # limitations under the License. +import builtins from io import StringIO from textwrap import dedent -from unittest.mock import ANY, Mock, call +from unittest.mock import ANY, Mock, call, patch from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS from tests.nanocontracts.blueprints.unittest import BlueprintTestCase class TestCustomImport(BlueprintTestCase): - def test_custom_import(self) -> None: + def test_custom_import_is_used(self) -> None: """Guarantee our custom import function is being called, instead of the builtin one.""" contract_id = self.gen_random_contract_id() blueprint = ''' - from hathor.nanocontracts import Blueprint - from hathor.nanocontracts.context import Context - from hathor.nanocontracts.types import public - - class MyBlueprint(Blueprint): - @public - def initialize(self, ctx: Context) -> None: - from math import ceil, floor - from collections import OrderedDict - from hathor.nanocontracts.exception import NCFail - from hathor.nanocontracts.types import NCAction, NCActionType - - __blueprint__ = MyBlueprint + from hathor.nanocontracts import Blueprint + from hathor.nanocontracts.context import Context + from hathor.nanocontracts.types import public + + class MyBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + from math import ceil, floor + from collections import OrderedDict + from hathor.nanocontracts.exception import NCFail + from hathor.nanocontracts.types import NCAction, NCActionType + + __blueprint__ = MyBlueprint ''' # Wrap our custom builtin so we can spy its calls @@ -49,7 +50,7 @@ def initialize(self, ctx: Context) -> None: # During blueprint registration, the function is called for each import at the module level. # This happens twice, once during verification and once during the actual registration. - blueprint_id = self.register_blueprint_contents(StringIO(dedent(blueprint))) + blueprint_id = self._register_blueprint_contents(StringIO(dedent(blueprint))) module_level_calls = [ call('hathor.nanocontracts', ANY, ANY, ('Blueprint',), 0), call('hathor.nanocontracts.context', ANY, ANY, ('Context',), 0), @@ -69,3 +70,45 @@ def initialize(self, ctx: Context) -> None: ] assert wrapped_import_function.call_count == len(method_level_imports) wrapped_import_function.assert_has_calls(method_level_imports) + + def test_builtin_import_is_not_used(self) -> None: + """ + Guarantee the builtin import function is never called in the contract runtime. + + To implement this test we need to use source code instead of a class directly, otherwise + the imports wouldn't run during nano runtime, but before. Because of that, we also need to + use `inject_in_class` to provide the blueprint with objects it cannot normally import. + """ + contract_id = self.gen_random_contract_id() + blueprint = ''' + from hathor.nanocontracts import Blueprint + from hathor.nanocontracts.context import Context + from hathor.nanocontracts.types import public + + class MyBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + wrapped_builtin_import = self.Mock(wraps=self.builtins.__import__) + wrapped_builtin_import.assert_not_called() + + with self.patch.object(self.builtins, '__import__', wrapped_builtin_import): + from math import ceil, floor + from collections import OrderedDict + from hathor.nanocontracts.exception import NCFail + from hathor.nanocontracts.types import NCAction, NCActionType + + wrapped_builtin_import.assert_not_called() + + __blueprint__ = MyBlueprint + ''' + + blueprint_id = self._register_blueprint_contents( + contents=StringIO(dedent(blueprint)), + skip_verification=True, + inject_in_class=dict( + builtins=builtins, + Mock=Mock, + patch=patch, + ) + ) + self.runner.create_contract(contract_id, blueprint_id, self.create_context()) diff --git a/tests/nanocontracts/test_exposed_properties.py b/tests/nanocontracts/test_exposed_properties.py index 2240b0ea8..2c6718f65 100644 --- a/tests/nanocontracts/test_exposed_properties.py +++ b/tests/nanocontracts/test_exposed_properties.py @@ -4,8 +4,8 @@ from typing import Any from hathor.nanocontracts import Blueprint, Context, public +from hathor.nanocontracts.allowed_imports import ALLOWED_IMPORTS from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS -from hathor.nanocontracts.on_chain_blueprint import ALLOWED_IMPORTS from tests.nanocontracts.blueprints.unittest import BlueprintTestCase MAX_DEPTH = 20 @@ -195,6 +195,13 @@ 'super.some_new_attribute', 'type.mro', 'type.some_new_attribute', + 'typing.NamedTuple.some_new_attribute', + 'typing.Optional._getitem', + 'typing.Optional._name', + 'typing.TypeAlias._getitem', + 'typing.TypeAlias._name', + 'typing.Union._getitem', + 'typing.Union._name', 'vars.some_new_attribute', ] @@ -315,9 +322,6 @@ def check(self, ctx: Context) -> list[str]: mutable_props.extend(search_writeable_properties(ctx, 'ctx')) custom_import = EXEC_BUILTINS['__import__'] for module_name, import_names in ALLOWED_IMPORTS.items(): - if module_name == 'typing': - # FIXME: typing module causes problems for some reason - continue module = custom_import(module_name, fromlist=list(import_names)) for import_name in import_names: obj = getattr(module, import_name)