Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions hathor/nanocontracts/allowed_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2025 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import math
import typing

import hathor.nanocontracts as nc

# this is what's allowed to be imported in blueprints, to be checked in the AST and in runtime
ALLOWED_IMPORTS: dict[str, dict[str, object]] = {
# globals
'math': dict(
ceil=math.ceil,
floor=math.floor,
),
'typing': dict(
Optional=typing.Optional,
NamedTuple=typing.NamedTuple,
TypeAlias=typing.TypeAlias,
Union=typing.Union,
),
'collections': dict(OrderedDict=collections.OrderedDict),
# hathor
'hathor.nanocontracts': dict(Blueprint=nc.Blueprint),
'hathor.nanocontracts.blueprint': dict(Blueprint=nc.Blueprint),
'hathor.nanocontracts.context': dict(Context=nc.Context),
'hathor.nanocontracts.exception': dict(NCFail=nc.NCFail),
'hathor.nanocontracts.types': dict(
NCAction=nc.types.NCAction,
NCActionType=nc.types.NCActionType,
SignedData=nc.types.SignedData,
public=nc.public,
view=nc.view,
fallback=nc.fallback,
Address=nc.types.Address,
Amount=nc.types.Amount,
Timestamp=nc.types.Timestamp,
TokenUid=nc.types.TokenUid,
TxOutputScript=nc.types.TxOutputScript,
BlueprintId=nc.types.BlueprintId,
ContractId=nc.types.ContractId,
VertexId=nc.types.VertexId,
NCDepositAction=nc.types.NCDepositAction,
NCWithdrawalAction=nc.types.NCWithdrawalAction,
NCGrantAuthorityAction=nc.types.NCGrantAuthorityAction,
NCAcquireAuthorityAction=nc.types.NCAcquireAuthorityAction,
NCArgs=nc.types.NCArgs,
NCRawArgs=nc.types.NCRawArgs,
NCParsedArgs=nc.types.NCParsedArgs,
),
}
20 changes: 17 additions & 3 deletions hathor/nanocontracts/custom_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
Sequence,
SupportsIndex,
TypeVar,
cast,
final,
)

from typing_extensions import Self, TypeVarTuple

from hathor.nanocontracts.allowed_imports import ALLOWED_IMPORTS
from hathor.nanocontracts.exception import NCDisabledBuiltinError
from hathor.nanocontracts.on_chain_blueprint import ALLOWED_IMPORTS, BLUEPRINT_CLASS_NAME
from hathor.nanocontracts.on_chain_blueprint import BLUEPRINT_CLASS_NAME

T = TypeVar('T')
Ts = TypeVarTuple('Ts')
Expand Down Expand Up @@ -218,7 +220,7 @@ def __call__(
...


def _generate_restricted_import_function(allowed_imports: dict[str, set[str]]) -> ImportFunction:
def _generate_restricted_import_function(allowed_imports: dict[str, dict[str, object]]) -> ImportFunction:
"""Returns a function equivalent to builtins.__import__ but that will only import `allowed_imports`"""
@_wraps(builtins.__import__)
def __import__(
Expand All @@ -235,11 +237,23 @@ def __import__(
raise ImportError('Only `from ... import ...` imports are allowed')
if name not in allowed_imports:
raise ImportError(f'Import from "{name}" is not allowed.')

# Create a fake module class that will only be returned by this import call
class FakeModule:
__slots__ = tuple(fromlist)

fake_module = FakeModule()
allowed_fromlist = allowed_imports[name]

for import_what in fromlist:
if import_what not in allowed_fromlist:
raise ImportError(f'Import from "{name}.{import_what}" is not allowed.')
return builtins.__import__(name=name, globals=globals, fromlist=fromlist, level=0)

setattr(fake_module, import_what, allowed_fromlist[import_what])

# This cast is safe because the only requirement is that the object contains the imported attributes.
return cast(types.ModuleType, fake_module)

return __import__


Expand Down
3 changes: 2 additions & 1 deletion hathor/nanocontracts/metered_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from structlog import get_logger

from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS
from hathor.nanocontracts.on_chain_blueprint import PYTHON_CODE_COMPAT_VERSION

logger = get_logger()
Expand Down Expand Up @@ -59,6 +58,7 @@ def get_memory_limit(self) -> int:
def exec(self, source: str, /) -> dict[str, Any]:
""" This is equivalent to `exec(source)` but with execution metering and memory limiting.
"""
from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS
env: dict[str, object] = {
'__builtins__': EXEC_BUILTINS,
}
Expand All @@ -80,6 +80,7 @@ def exec(self, source: str, /) -> dict[str, Any]:
def call(self, func: Callable[_P, _T], /, *, args: _P.args) -> _T:
""" This is equivalent to `func(*args, **kwargs)` but with execution metering and memory limiting.
"""
from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS
env: dict[str, object] = {
'__builtins__': EXEC_BUILTINS,
'__func__': func,
Expand Down
36 changes: 0 additions & 36 deletions hathor/nanocontracts/on_chain_blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,42 +53,6 @@
# max compression level, used as default
MAX_COMPRESSION_LEVEL = 9

# this is what's allowed to be imported, to be checked in the AST and in runtime
ALLOWED_IMPORTS: dict[str, set[str]] = {
# globals
'math': {'ceil', 'floor'},
'typing': {'Optional', 'NamedTuple', 'TypeAlias', 'Union'},
'collections': {'OrderedDict'},
# hathor
'hathor.nanocontracts': {'Blueprint'},
'hathor.nanocontracts.blueprint': {'Blueprint'},
'hathor.nanocontracts.context': {'Context'},
'hathor.nanocontracts.exception': {'NCFail'},
'hathor.nanocontracts.types': {
'NCAction',
'NCActionType',
'SignedData',
'public',
'view',
'fallback',
'Address',
'Amount',
'Timestamp',
'TokenUid',
'TxOutputScript',
'BlueprintId',
'ContractId',
'VertexId',
'NCDepositAction',
'NCWithdrawalAction',
'NCGrantAuthorityAction',
'NCAcquireAuthorityAction',
'NCArgs',
'NCRawArgs',
'NCParsedArgs',
},
}

# these names aren't allowed in the code, to be checked in the AST only
AST_NAME_BLACKLIST: set[str] = {
'__builtins__',
Expand Down
2 changes: 1 addition & 1 deletion hathor/verification/on_chain_blueprint_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from hathor.conf.settings import HathorSettings
from hathor.crypto.util import get_address_b58_from_public_key_bytes, get_public_key_from_bytes_compressed
from hathor.nanocontracts import OnChainBlueprint
from hathor.nanocontracts.allowed_imports import ALLOWED_IMPORTS
from hathor.nanocontracts.exception import NCInvalidPubKey, NCInvalidSignature, OCBInvalidScript, OCBPubKeyNotAllowed
from hathor.nanocontracts.on_chain_blueprint import (
ALLOWED_IMPORTS,
AST_NAME_BLACKLIST,
BLUEPRINT_CLASS_NAME,
PYTHON_CODE_COMPAT_VERSION,
Expand Down
32 changes: 26 additions & 6 deletions tests/nanocontracts/blueprints/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,40 @@ def _register_blueprint_class(
def register_blueprint_file(self, path: PathLike[str], blueprint_id: BlueprintId | None = None) -> BlueprintId:
"""Register a blueprint file with an optional id, allowing contracts to be created from it."""
with open(path, 'r') as f:
return self.register_blueprint_contents(f, blueprint_id)
return self._register_blueprint_contents(f, blueprint_id)

def register_blueprint_contents(
def _register_blueprint_contents(
self,
contents: TextIOWrapper,
blueprint_id: BlueprintId | None = None,
*,
skip_verification: bool = False,
inject_in_class: dict[str, object] | None = None,
) -> BlueprintId:
"""Register blueprint contents with an optional id, allowing contracts to be created from it."""
"""
Register blueprint contents with an optional id, allowing contracts to be created from it.

Args:
contents: the blueprint source code, usually a file or StringIO
blueprint_id: optional ID for the blueprint
skip_verification: skip verifying the blueprint with restrictions such as AST verification
inject_in_class: objects to inject in the blueprint class, accessible in contract runtime

Returns: the blueprint_id
"""
code = Code.from_python_code(contents.read(), self._settings)
verifier = OnChainBlueprintVerifier(settings=self._settings)
ocb = OnChainBlueprint(hash=b'', code=code)
verifier.verify_code(ocb)

return self._register_blueprint_class(ocb.get_blueprint_class(), blueprint_id)
if not skip_verification:
verifier = OnChainBlueprintVerifier(settings=self._settings)
verifier.verify_code(ocb)

blueprint_class = ocb.get_blueprint_class()
if inject_in_class is not None:
for key, value in inject_in_class.items():
setattr(blueprint_class, key, value)

return self._register_blueprint_class(blueprint_class, blueprint_id)

def build_runner(self) -> TestRunner:
"""Create a Runner instance."""
Expand Down
75 changes: 59 additions & 16 deletions tests/nanocontracts/test_custom_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import builtins
from io import StringIO
from textwrap import dedent
from unittest.mock import ANY, Mock, call
from unittest.mock import ANY, Mock, call, patch

from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS
from tests.nanocontracts.blueprints.unittest import BlueprintTestCase


class TestCustomImport(BlueprintTestCase):
def test_custom_import(self) -> None:
def test_custom_import_is_used(self) -> None:
"""Guarantee our custom import function is being called, instead of the builtin one."""
contract_id = self.gen_random_contract_id()
blueprint = '''
from hathor.nanocontracts import Blueprint
from hathor.nanocontracts.context import Context
from hathor.nanocontracts.types import public

class MyBlueprint(Blueprint):
@public
def initialize(self, ctx: Context) -> None:
from math import ceil, floor
from collections import OrderedDict
from hathor.nanocontracts.exception import NCFail
from hathor.nanocontracts.types import NCAction, NCActionType

__blueprint__ = MyBlueprint
from hathor.nanocontracts import Blueprint
from hathor.nanocontracts.context import Context
from hathor.nanocontracts.types import public

class MyBlueprint(Blueprint):
@public
def initialize(self, ctx: Context) -> None:
from math import ceil, floor
from collections import OrderedDict
from hathor.nanocontracts.exception import NCFail
from hathor.nanocontracts.types import NCAction, NCActionType

__blueprint__ = MyBlueprint
'''

# Wrap our custom builtin so we can spy its calls
Expand All @@ -49,7 +50,7 @@ def initialize(self, ctx: Context) -> None:

# During blueprint registration, the function is called for each import at the module level.
# This happens twice, once during verification and once during the actual registration.
blueprint_id = self.register_blueprint_contents(StringIO(dedent(blueprint)))
blueprint_id = self._register_blueprint_contents(StringIO(dedent(blueprint)))
module_level_calls = [
call('hathor.nanocontracts', ANY, ANY, ('Blueprint',), 0),
call('hathor.nanocontracts.context', ANY, ANY, ('Context',), 0),
Expand All @@ -69,3 +70,45 @@ def initialize(self, ctx: Context) -> None:
]
assert wrapped_import_function.call_count == len(method_level_imports)
wrapped_import_function.assert_has_calls(method_level_imports)

def test_builtin_import_is_not_used(self) -> None:
"""
Guarantee the builtin import function is never called in the contract runtime.

To implement this test we need to use source code instead of a class directly, otherwise
the imports wouldn't run during nano runtime, but before. Because of that, we also need to
use `inject_in_class` to provide the blueprint with objects it cannot normally import.
"""
contract_id = self.gen_random_contract_id()
blueprint = '''
from hathor.nanocontracts import Blueprint
from hathor.nanocontracts.context import Context
from hathor.nanocontracts.types import public

class MyBlueprint(Blueprint):
@public
def initialize(self, ctx: Context) -> None:
wrapped_builtin_import = self.Mock(wraps=self.builtins.__import__)
wrapped_builtin_import.assert_not_called()

with self.patch.object(self.builtins, '__import__', wrapped_builtin_import):
from math import ceil, floor
from collections import OrderedDict
from hathor.nanocontracts.exception import NCFail
from hathor.nanocontracts.types import NCAction, NCActionType

wrapped_builtin_import.assert_not_called()

__blueprint__ = MyBlueprint
'''

blueprint_id = self._register_blueprint_contents(
contents=StringIO(dedent(blueprint)),
skip_verification=True,
inject_in_class=dict(
builtins=builtins,
Mock=Mock,
patch=patch,
)
)
self.runner.create_contract(contract_id, blueprint_id, self.create_context())
12 changes: 8 additions & 4 deletions tests/nanocontracts/test_exposed_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from typing import Any

from hathor.nanocontracts import Blueprint, Context, public
from hathor.nanocontracts.allowed_imports import ALLOWED_IMPORTS
from hathor.nanocontracts.custom_builtins import EXEC_BUILTINS
from hathor.nanocontracts.on_chain_blueprint import ALLOWED_IMPORTS
from tests.nanocontracts.blueprints.unittest import BlueprintTestCase

MAX_DEPTH = 20
Expand Down Expand Up @@ -195,6 +195,13 @@
'super.some_new_attribute',
'type.mro',
'type.some_new_attribute',
'typing.NamedTuple.some_new_attribute',
'typing.Optional._getitem',
'typing.Optional._name',
'typing.TypeAlias._getitem',
'typing.TypeAlias._name',
'typing.Union._getitem',
'typing.Union._name',
'vars.some_new_attribute',
]

Expand Down Expand Up @@ -315,9 +322,6 @@ def check(self, ctx: Context) -> list[str]:
mutable_props.extend(search_writeable_properties(ctx, 'ctx'))
custom_import = EXEC_BUILTINS['__import__']
for module_name, import_names in ALLOWED_IMPORTS.items():
if module_name == 'typing':
# FIXME: typing module causes problems for some reason
continue
module = custom_import(module_name, fromlist=list(import_names))
for import_name in import_names:
obj = getattr(module, import_name)
Expand Down
Loading