diff --git a/Makefile b/Makefile index fc0cbfbbb..c34c8d735 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ all: check tests # testing: tests_cli = tests/cli/ +tests_nano = tests/nanocontracts/ tests/tx/test_indexes_nc_history.py tests/resources/nanocontracts/ tests_lib = $(filter-out ${tests_cli} tests/__pycache__/, $(dir $(wildcard tests/*/.))) tests_ci = extras/github/ @@ -24,6 +25,10 @@ pytest_flags = -p no:warnings --cov-report=term --cov-report=html --cov-report=x #--implicit-reexport #--no-implicit-reexport +.PHONY: tests-nano +tests-nano: + pytest --durations=10 --cov-report=html --cov=hathor/nanocontracts/ --cov-config=.coveragerc_full -p no:warnings $(tests_nano) + .PHONY: tests-cli tests-cli: pytest --durations=10 --cov=hathor/cli/ --cov-config=.coveragerc_full --cov-fail-under=27 -p no:warnings $(tests_cli) diff --git a/tests/nanocontracts/test_execution_verification.py b/tests/nanocontracts/test_execution_verification.py new file mode 100644 index 000000000..43ffa8417 --- /dev/null +++ b/tests/nanocontracts/test_execution_verification.py @@ -0,0 +1,94 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from hathor.nanocontracts import Blueprint, Context, public +from hathor.nanocontracts.exception import ( + BlueprintDoesNotExist, + NCFail, + NCMethodNotFound, + NCUninitializedContractError, +) +from hathor.nanocontracts.method import ArgsOnly +from hathor.nanocontracts.runner.types import NCRawArgs +from tests.nanocontracts.blueprints.unittest import BlueprintTestCase + + +class MyBlueprint(Blueprint): + @public + def initialize(self, ctx: Context, a: int) -> None: + pass + + +class TestExecutionVerification(BlueprintTestCase): + def setUp(self) -> None: + super().setUp() + self.blueprint_id = self.gen_random_blueprint_id() + self.contract_id = self.gen_random_contract_id() + self.register_blueprint_class(self.blueprint_id, MyBlueprint) + + def test_blueprint_does_not_exist(self) -> None: + with pytest.raises(BlueprintDoesNotExist): + self.runner.create_contract(self.contract_id, self.gen_random_blueprint_id(), self.create_context(), 123) + + def test_contract_does_not_exist(self) -> None: + with pytest.raises(NCUninitializedContractError): + self.runner.call_public_method(self.gen_random_contract_id(), 'method', self.create_context()) + + def test_method_not_found(self) -> None: + self.runner.create_contract(self.contract_id, self.blueprint_id, self.create_context(), 123) + + with pytest.raises(NCMethodNotFound): + self.runner.call_public_method(self.contract_id, 'not_found', self.create_context()) + + def test_empty_args(self) -> None: + with pytest.raises(NCFail) as e: + self.runner.create_contract(self.contract_id, self.blueprint_id, self.create_context()) + assert isinstance(e.value.__cause__, TypeError) + assert e.value.__cause__.args[0] == "MyBlueprint.initialize() missing 1 required positional argument: 'a'" + + def test_too_many_args(self) -> None: + with pytest.raises(NCFail) as e: + self.runner.create_contract(self.contract_id, self.blueprint_id, self.create_context(), 123, 456) + assert isinstance(e.value.__cause__, TypeError) + 
assert e.value.__cause__.args[0] == "MyBlueprint.initialize() takes 3 positional arguments but 4 were given" + + @pytest.mark.xfail(strict=True, reason='not implemented yet') + def test_wrong_arg_type_parsed(self) -> None: + with pytest.raises(NCFail): + self.runner.create_contract(self.contract_id, self.blueprint_id, self.create_context(), 'abc') + + def test_wrong_arg_type_raw(self) -> None: + args_parser = ArgsOnly.from_arg_types((str,)) + args_bytes = args_parser.serialize_args_bytes(('abc',)) + nc_args = NCRawArgs(args_bytes) + + with pytest.raises(NCFail) as e: + self.runner.create_contract_with_nc_args( + self.contract_id, self.blueprint_id, self.create_context(), nc_args + ) + assert isinstance(e.value.__cause__, ValueError) + assert e.value.__cause__.args[0] == 'trailing data' + + @pytest.mark.xfail(strict=True, reason='not implemented yet') + def test_wrong_arg_type_but_valid_serialization(self) -> None: + args_parser = ArgsOnly.from_arg_types((str,)) + args_bytes = args_parser.serialize_args_bytes(('',)) + nc_args = NCRawArgs(args_bytes) + + with pytest.raises(NCFail): + self.runner.create_contract_with_nc_args( + self.contract_id, self.blueprint_id, self.create_context(), nc_args + ) diff --git a/tests/nanocontracts/test_fallback_method.py b/tests/nanocontracts/test_fallback_method.py new file mode 100644 index 000000000..1159a4f4d --- /dev/null +++ b/tests/nanocontracts/test_fallback_method.py @@ -0,0 +1,211 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, assert_never +from unittest.mock import ANY + +import pytest + +from hathor.conf.settings import HATHOR_TOKEN_UID +from hathor.nanocontracts import Blueprint, Context, NCFail, public +from hathor.nanocontracts.exception import NCError, NCInvalidMethodCall +from hathor.nanocontracts.method import ArgsOnly +from hathor.nanocontracts.nc_exec_logs import NCCallBeginEntry, NCCallEndEntry +from hathor.nanocontracts.runner.types import CallType, NCArgs, NCParsedArgs, NCRawArgs +from hathor.nanocontracts.types import ContractId, NCDepositAction, TokenUid, fallback +from hathor.transaction import Block, Transaction +from tests.dag_builder.builder import TestDAGBuilder +from tests.nanocontracts.blueprints.unittest import BlueprintTestCase +from tests.nanocontracts.utils import assert_nc_failure_reason + +# TODO: Test support for container args/kwargs such as list[int] after Jan's PR + + +class MyBlueprint(Blueprint): + @public(allow_deposit=True) + def initialize(self, ctx: Context) -> None: + pass + + @fallback(allow_deposit=True) + def fallback(self, ctx: Context, method_name: str, nc_args: NCArgs) -> str: + assert method_name == 'unknown' + match nc_args: + case NCRawArgs(): + # XXX: we might need to provide a better way to describe the expected signature to `try_parse_as`, + # because only looking at a tuple of types might not be enough, currently it is implemented + # without the knowledge of default arguments, what this implies is that considering a signature + # with types (str, int), it is possible for an empty tuple () to be a valid call, as long as the + # function has default values for its two arguments, the parser takes the optimistic path and + # accepts parsing an empty tuple, so in this case args_bytes=b'\x00' parses to (), because it is + # possible that that is a valid call + result = nc_args.try_parse_as((str, int)) + if result 
is None: + raise NCFail(f'unsupported args: {nc_args}') + greeting, x = result + return self.greet_double(ctx, greeting, x) + case NCParsedArgs(args, kwargs): + return self.greet_double(ctx, *args, **kwargs) + case _: + assert_never(nc_args) + + def greet_double(self, ctx: Context, greeting: str, x: int) -> str: + return f'{greeting} {x + x}' + + @public(allow_deposit=True) + def call_another_fallback(self, ctx: Context, contract_id: ContractId) -> Any: + return self.syscall.call_public_method(contract_id, 'fallback', []) + + +class TestFallbackMethod(BlueprintTestCase): + def setUp(self) -> None: + super().setUp() + + self.blueprint_id = self.gen_random_blueprint_id() + self.contract_id = self.gen_random_contract_id() + self.register_blueprint_class(self.blueprint_id, MyBlueprint) + + self.ctx = Context( + actions=[NCDepositAction(token_uid=TokenUid(HATHOR_TOKEN_UID), amount=123)], + vertex=self.get_genesis_tx(), + address=self.gen_random_address(), + timestamp=self.now, + ) + self.runner.create_contract(self.contract_id, self.blueprint_id, self.ctx) + + def test_fallback_only_args_success(self) -> None: + result = self.runner.call_public_method(self.contract_id, 'unknown', self.ctx, 'hello', 123) + assert result == 'hello 246' + + last_call_info = self.runner.get_last_call_info() + assert last_call_info.nc_logger.__entries__ == [ + NCCallBeginEntry.construct( + timestamp=ANY, + nc_id=self.contract_id, + call_type=CallType.PUBLIC, + method_name='fallback', + str_args="('unknown', NCParsedArgs(args=('hello', 123), kwargs={}))", + str_kwargs='{}', + actions=[dict(amount=123, token_uid='00', type='deposit')] + ), + NCCallEndEntry.construct(timestamp=ANY), + ] + + def test_fallback_only_kwargs_success(self) -> None: + result = self.runner.call_public_method(self.contract_id, 'unknown', self.ctx, greeting='hello', x=123) + assert result == 'hello 246' + + last_call_info = self.runner.get_last_call_info() + assert last_call_info.nc_logger.__entries__ == [ + 
NCCallBeginEntry.construct( + timestamp=ANY, + nc_id=self.contract_id, + call_type=CallType.PUBLIC, + method_name='fallback', + str_args="('unknown', NCParsedArgs(args=(), kwargs={'greeting': 'hello', 'x': 123}))", + str_kwargs='{}', + actions=[dict(amount=123, token_uid='00', type='deposit')] + ), + NCCallEndEntry.construct(timestamp=ANY), + ] + + def test_fallback_args_kwargs_success(self) -> None: + result = self.runner.call_public_method(self.contract_id, 'unknown', self.ctx, 'hello', x=123) + assert result == 'hello 246' + + last_call_info = self.runner.get_last_call_info() + assert last_call_info.nc_logger.__entries__ == [ + NCCallBeginEntry.construct( + timestamp=ANY, + nc_id=self.contract_id, + call_type=CallType.PUBLIC, + method_name='fallback', + str_args="('unknown', NCParsedArgs(args=('hello',), kwargs={'x': 123}))", + str_kwargs='{}', + actions=[dict(amount=123, token_uid='00', type='deposit')] + ), + NCCallEndEntry.construct(timestamp=ANY), + ] + + def test_cannot_call_fallback_directly(self) -> None: + with pytest.raises(NCError, match='method `fallback` is not a public method'): + self.runner.call_public_method(self.contract_id, 'fallback', self.ctx) + + def test_cannot_call_another_fallback_directly(self) -> None: + contract_id = self.gen_random_contract_id() + self.runner.create_contract(contract_id, self.blueprint_id, self.ctx) + with pytest.raises(NCInvalidMethodCall, match='method `fallback` is not a public method'): + self.runner.call_public_method(self.contract_id, 'call_another_fallback', self.ctx, contract_id) + + def test_fallback_args_bytes_success(self) -> None: + args_parser = ArgsOnly.from_arg_types((str, int)) + args_bytes = args_parser.serialize_args_bytes(('hello', 123)) + nc_args = NCRawArgs(args_bytes) + result = self.runner.call_public_method_with_nc_args(self.contract_id, 'unknown', self.ctx, nc_args) + assert result == 'hello 246' + + last_call_info = self.runner.get_last_call_info() + assert 
last_call_info.nc_logger.__entries__ == [ + NCCallBeginEntry.construct( + timestamp=ANY, + nc_id=self.contract_id, + call_type=CallType.PUBLIC, + method_name='fallback', + str_args=f"('unknown', NCRawArgs('{args_bytes.hex()}'))", + str_kwargs='{}', + actions=[dict(amount=123, token_uid='00', type='deposit')] + ), + NCCallEndEntry.construct(timestamp=ANY), + ] + + def test_dag_fallback(self) -> None: + dag_builder = TestDAGBuilder.from_manager(self.manager) + args_parser = ArgsOnly.from_arg_types((str, int)) + valid_args_bytes = args_parser.serialize_args_bytes(('hello', 123)) + + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..11] + b10 < dummy + + nc1.nc_id = "{self.blueprint_id.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = unknown + nc2.nc_args_bytes = "{valid_args_bytes.hex()}" + + nc3.nc_id = nc1 + nc3.nc_method = unknown + nc3.nc_args_bytes = "00" + + nc1 <-- nc2 <-- nc3 <-- b11 + ''') + + artifacts.propagate_with(self.manager) + b11 = artifacts.get_typed_vertex('b11', Block) + nc1, nc2, nc3 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc3'], Transaction) + + assert b11.get_metadata().voided_by is None + assert nc1.get_metadata().voided_by is None + + # nc2 successfully executes because the nc_args_bytes is correct + assert nc2.get_metadata().voided_by is None + + # nc3 fails because the fallback method is not expecting these args_bytes + assert nc3.get_metadata().voided_by == {nc3.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert_nc_failure_reason( + manager=self.manager, + tx_id=nc3.hash, + block_id=b11.hash, + reason='NCFail: unsupported args: 00', + ) diff --git a/tests/nanocontracts/test_follow_up_call.py b/tests/nanocontracts/test_follow_up_call.py new file mode 100644 index 000000000..99a96320a --- /dev/null +++ b/tests/nanocontracts/test_follow_up_call.py @@ -0,0 +1,115 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this 
file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from hathor.nanocontracts import Blueprint, Context, NCFail, public, view +from hathor.nanocontracts.types import ContractId +from tests.nanocontracts.blueprints.unittest import BlueprintTestCase + + +class MyBlueprint1(Blueprint): + other_id: ContractId + + @public + def initialize(self, ctx: Context, other_id: ContractId) -> None: + self.other_id = other_id + + @public + def public_nop(self, ctx: Context) -> None: + pass + + @view + def view_call_other_view(self, method_name: str) -> None: + self.syscall.call_view_method(self.other_id, method_name) + + @public + def public_call_other_view(self, ctx: Context, method_name: str) -> None: + self.syscall.call_view_method(self.other_id, method_name) + + @public + def public_call_other_public(self, ctx: Context, method_name: str) -> None: + self.syscall.call_public_method(self.other_id, method_name, []) + + +class MyBlueprint2(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + @view + def view_nop(self) -> None: + pass + + @view + def view_fail(self) -> None: + raise NCFail('fail called') + + @public + def public_nop(self, ctx: Context) -> None: + pass + + @public + def public_fail(self, ctx: Context) -> None: + raise NCFail('fail called') + + +class TestFollowUpCall(BlueprintTestCase): + def setUp(self) -> None: + super().setUp() + + self.blueprint_id1 = self.gen_random_blueprint_id() + self.blueprint_id2 = self.gen_random_blueprint_id() + + self.register_blueprint_class(self.blueprint_id1, MyBlueprint1) + 
self.register_blueprint_class(self.blueprint_id2, MyBlueprint2) + + self.contract_id = self.gen_random_contract_id() + self.other_id = self.gen_random_contract_id() + + self.runner.create_contract(self.other_id, self.blueprint_id2, self.create_context()) + self.runner.create_contract(self.contract_id, self.blueprint_id1, self.create_context(), self.other_id) + + def test_view_call_other_view_success(self) -> None: + self.runner.call_view_method(self.contract_id, 'view_call_other_view', 'view_nop') + self.runner.call_public_method(self.contract_id, 'public_nop', self.create_context()) + + def test_public_call_other_view_success(self) -> None: + self.runner.call_public_method(self.contract_id, 'public_call_other_view', self.create_context(), 'view_nop') + self.runner.call_public_method(self.contract_id, 'public_nop', self.create_context()) + + def test_public_call_other_public_success(self) -> None: + self.runner.call_public_method( + self.contract_id, 'public_call_other_public', self.create_context(), 'public_nop' + ) + self.runner.call_public_method(self.contract_id, 'public_nop', self.create_context()) + + def test_view_call_other_view_fail(self) -> None: + with pytest.raises(NCFail, match='fail called'): + self.runner.call_view_method(self.contract_id, 'view_call_other_view', 'view_fail') + self.runner.call_public_method(self.contract_id, 'public_nop', self.create_context()) + + def test_public_call_other_view_fail(self) -> None: + with pytest.raises(NCFail, match='fail called'): + self.runner.call_public_method( + self.contract_id, 'public_call_other_view', self.create_context(), 'view_fail' + ) + self.runner.call_public_method(self.contract_id, 'public_nop', self.create_context()) + + def test_public_call_other_public_fail(self) -> None: + with pytest.raises(NCFail, match='fail called'): + self.runner.call_public_method( + self.contract_id, 'public_call_other_public', self.create_context(), 'public_fail' + ) + self.runner.call_public_method(self.contract_id, 
'public_nop', self.create_context()) diff --git a/tests/nanocontracts/test_indexes.py b/tests/nanocontracts/test_indexes.py new file mode 100644 index 000000000..91cd87818 --- /dev/null +++ b/tests/nanocontracts/test_indexes.py @@ -0,0 +1,229 @@ +from typing import Any, Optional + +from hathor.conf import HathorSettings +from hathor.dag_builder.artifacts import DAGArtifacts +from hathor.manager import HathorManager +from hathor.nanocontracts import Blueprint, Context, NCFail, public +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.method import Method +from hathor.nanocontracts.types import NCActionType +from hathor.nanocontracts.utils import sign_pycoin +from hathor.simulator.trigger import StopAfterMinimumBalance, StopAfterNMinedBlocks +from hathor.transaction import BaseTransaction, Transaction, TxOutput +from hathor.transaction.headers.nano_header import NanoHeaderAction +from hathor.types import AddressB58 +from tests.dag_builder.builder import TestDAGBuilder +from tests.nanocontracts.blueprints.unittest import BlueprintTestCase +from tests.simulation.base import SimulatorTestCase + +settings = HathorSettings() + + +class MyBlueprint(Blueprint): + counter: int + + @public(allow_deposit=True) + def initialize(self, ctx: Context) -> None: + self.counter = 0 + + @public + def nop(self, ctx: Context) -> None: + self.counter += 1 + + @public + def fail(self, ctx: Context) -> None: + raise NCFail('fail') + + +class BaseIndexesTestCase(BlueprintTestCase, SimulatorTestCase): + __test__ = False + + def setUp(self): + super().setUp() + + self.myblueprint_id = b'x' * 32 + self.catalog = NCBlueprintCatalog({ + self.myblueprint_id: MyBlueprint + }) + self.nc_seqnum = 0 + + self.manager.allow_mining_without_peers() + self.manager.tx_storage.nc_catalog = self.catalog + + self.wallet = self.manager.wallet + + self.miner = self.simulator.create_miner(self.manager, hashpower=100e6) + self.miner.start() + + self.token_uid = b'\0' + trigger 
= StopAfterMinimumBalance(self.wallet, self.token_uid, 1) + self.assertTrue(self.simulator.run(7200, trigger=trigger)) + self.assertTrue(self.simulator.run(120)) + + def fill_nc_tx( + self, + nc: Transaction, + nc_id: bytes, + nc_method: str, + nc_args: list[Any], + *, + address: Optional[AddressB58] = None, + nc_actions: list[NanoHeaderAction] | None = None, + ) -> None: + method_parser = Method.from_callable(getattr(MyBlueprint, nc_method)) + nc_args_bytes = method_parser.serialize_args_bytes(nc_args) + + if address is None: + address = self.wallet.get_unused_address() + privkey = self.wallet.get_private_key(address) + + from hathor.transaction.headers import NanoHeader + nano_header = NanoHeader( + tx=nc, + nc_seqnum=self.nc_seqnum, + nc_id=nc_id, + nc_method=nc_method, + nc_args_bytes=nc_args_bytes, + nc_address=b'', + nc_script=b'', + nc_actions=nc_actions or [], + ) + nc.headers.append(nano_header) + self.nc_seqnum += 1 + + sign_pycoin(nano_header, privkey) + + def finish_and_broadcast_tx(self, tx: BaseTransaction, confirmations: int = 1) -> None: + tx.timestamp = int(self.manager.reactor.seconds()) + tx.parents = self.manager.get_new_tx_parents() + tx.weight = self.manager.daa.minimum_tx_weight(tx) + + # broadcast + self.manager.cpu_mining_service.resolve(tx) + self.manager.on_new_tx(tx) + trigger = StopAfterNMinedBlocks(self.miner, quantity=confirmations) + self.assertTrue(self.simulator.run(7200, trigger=trigger)) + + def test_tokens_index(self): + token_info0 = self.manager.tx_storage.indexes.tokens.get_token_info(self.token_uid) + new_blocks = 0 + + # Deposits 1 HTR + _inputs, deposit_amount = self.wallet.get_inputs_from_amount(1, self.manager.tx_storage) + tx = self.wallet.prepare_transaction(Transaction, _inputs, []) + self.fill_nc_tx(tx, self.myblueprint_id, 'initialize', [], nc_actions=[ + NanoHeaderAction( + type=NCActionType.DEPOSIT, + token_index=0, + amount=deposit_amount, + ) + ]) + self.finish_and_broadcast_tx(tx, confirmations=2) + new_blocks 
+= 2 + + self.assertIsNotNone(tx.get_metadata().first_block) + self.assertIsNone(tx.get_metadata().voided_by) + nc_id = tx.hash + + token_info1 = self.manager.tx_storage.indexes.tokens.get_token_info(self._settings.HATHOR_TOKEN_UID) + self.assertEqual(token_info0.get_total() + 64_00 * new_blocks, token_info1.get_total()) + + # Withdrawals 1 HTR + tx2 = Transaction(outputs=[TxOutput(1, b'', 0)]) + self.fill_nc_tx(tx2, nc_id, 'nop', [], nc_actions=[ + NanoHeaderAction( + type=NCActionType.WITHDRAWAL, + token_index=0, + amount=1, + ) + ]) + self.finish_and_broadcast_tx(tx2, confirmations=2) + new_blocks += 2 + + token_info1 = self.manager.tx_storage.indexes.tokens.get_token_info(self._settings.HATHOR_TOKEN_UID) + self.assertEqual(token_info0.get_total() + 64_00 * new_blocks, token_info1.get_total()) + + def test_remove_voided_nano_tx_from_parents_1(self): + vertices = self._run_test_remove_voided_nano_tx_from_parents('tx3 < b35') + v = [node.name for node, _ in vertices.list] + self.assertTrue(v.index('tx3') < v.index('b35')) + + def test_remove_voided_nano_tx_from_parents_2(self): + vertices = self._run_test_remove_voided_nano_tx_from_parents('b35 < tx3') + v = [node.name for node, _ in vertices.list] + self.assertTrue(v.index('b35') < v.index('tx3')) + + def _run_test_remove_voided_nano_tx_from_parents(self, order: str) -> DAGArtifacts: + builder = TestDAGBuilder.from_manager(self.manager) + vertices = builder.build_from_str(f''' + blockchain genesis b[0..40] + b0.weight = 50 + + b30 < dummy + + tx1.nc_id = "{self.myblueprint_id.hex()}" + tx1.nc_method = initialize() + tx1.nc_deposit = 10 HTR + tx1.out[0] <<< tx2 + + tx2.nc_id = tx1 + tx2.nc_method = fail() + tx2.out[0] <<< tx3 + + tx3.nc_id = tx1 + tx3.nc_method = nop() + + tx1 <-- tx2 <-- b35 + + {order} + ''') + + for node, vertex in vertices.list: + print() + print(node.name) + print() + self.manager.on_new_tx(vertex) + + tx1 = vertices.by_name['tx1'].vertex + tx2 = vertices.by_name['tx2'].vertex + tx3 = 
vertices.by_name['tx3'].vertex + b35 = vertices.by_name['b35'].vertex + + meta1 = tx1.get_metadata() + meta2 = tx2.get_metadata() + meta3 = tx3.get_metadata() + + # confirm that b35 belongs to the best blockchain + self.assertIsNone(b35.get_metadata().voided_by) + + # only tx1 and tx2 should be confirmed + self.assertEqual(meta1.first_block, b35.hash) + self.assertEqual(meta2.first_block, b35.hash) + self.assertIsNone(meta3.first_block) + + # tx1 succeeded; tx2 failed so tx3 must be voided + self.assertIsNone(meta1.voided_by) + self.assertEqual(meta2.voided_by, {tx2.hash, self._settings.NC_EXECUTION_FAIL_ID}) + self.assertEqual(meta3.voided_by, {tx2.hash}) + + # check we are not using tx3 as parents for transactions + parent_txs = self.manager.generate_parent_txs(timestamp=None) + self.assertNotIn(tx3.hash, parent_txs.can_include) + self.assertNotIn(tx3.hash, parent_txs.must_include) + + # check we are not using tx3 as parents for blocks + block_templates = self.manager.make_block_templates() + for template in block_templates: + self.assertNotIn(tx3.hash, template.parents) + self.assertNotIn(tx3.hash, template.parents_any) + + return vertices + + +class RocksDBIndexesTestCase(BaseIndexesTestCase): + __test__ = True + + def build_manager(self) -> 'HathorManager': + builder = self.simulator.get_default_builder() + builder.enable_wallet_index() + return self.simulator.create_peer(builder) diff --git a/tests/nanocontracts/test_method_parser.py b/tests/nanocontracts/test_method_parser.py new file mode 100644 index 000000000..ecac69d9a --- /dev/null +++ b/tests/nanocontracts/test_method_parser.py @@ -0,0 +1,318 @@ +import json +from collections.abc import Callable +from typing import Any, Optional, TypeVar + +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.exception import NCSerializationArgTooLong +from hathor.nanocontracts.method import MAX_BYTES_SERIALIZED_ARG, Method +from hathor.nanocontracts.types import SignedData, public +from tests 
import unittest + +T = TypeVar('T') + + +class MyBlueprint: + @public + def initialize(self, ctx: Context, a: str, b: bytes, c: int, d: bool) -> None: + pass + + @public + def method_str(self, ctx: Context, x: str) -> None: + pass + + @public + def method_bytes(self, ctx: Context, x: bytes) -> None: + pass + + @public + def method_int(self, ctx: Context, x: int) -> None: + pass + + @public + def method_bool(self, ctx: Context, x: bool) -> None: + pass + + @public + def method_signed_str(self, ctx: Context, x: SignedData[str]) -> None: + pass + + @public + def method_with_optional(self, ctx: Context, x: Optional[str]) -> None: + pass + + @public + def method_with_tuple(self, ctx: Context, x: tuple[str, int, int]) -> None: + pass + + +class NCBlueprintTestCase(unittest.TestCase): + def _run_test(self, method: Callable[[Any, T], None], data: T) -> None: + parser = Method.from_callable(method) + self._run_test_parser(parser, data) + + def _run_test_parser(self, method_parser: Method, data: T) -> None: + # Then, check serialization and deserialization. 
+ args_in = (data,) + serialized_args_in = method_parser.serialize_args_bytes(args_in) + args_out = method_parser.deserialize_args_bytes(serialized_args_in) + self.assertEqual(args_in, args_out) + + # Also check that types match (they don't necessarily always match) + self.assertEqual(type(args_in), type(args_out)) + + def test_type_str_wrong_type(self) -> None: + with self.assertRaises(TypeError): + self._run_test(MyBlueprint.method_str, b'') + + def test_type_str_empty(self) -> None: + self._run_test(MyBlueprint.method_str, '') + + def test_type_str_small(self) -> None: + self._run_test(MyBlueprint.method_str, 'a') + + def test_type_str_long(self) -> None: + # there are 3 bytes of overhead when serializing + # 1 byte for the number of arguments in method_bytes + # 2 bytes for the length of the byte sequence that follows (because its length exceeds 63 bytes) + # since utf-8 encoding for 'a' doesn't change it, it works as if it was bytes + overhead = 3 + length = MAX_BYTES_SERIALIZED_ARG - overhead + self._run_test(MyBlueprint.method_str, 'a' * length) + + def test_type_str_too_long(self) -> None: + with self.assertRaises(NCSerializationArgTooLong): + length = MAX_BYTES_SERIALIZED_ARG + 1 + self._run_test(MyBlueprint.method_str, 'a' * length) + + def test_type_str_accents(self) -> None: + self._run_test(MyBlueprint.method_str, 'áéíóú') + + def test_type_bytes_empty(self) -> None: + self._run_test(MyBlueprint.method_bytes, b'') + + def test_type_bytes_small(self) -> None: + self._run_test(MyBlueprint.method_bytes, b'a') + + def test_type_bytes_long(self) -> None: + # there are 3 bytes of overhead when serializing + # 1 byte for the number of arguments in method_bytes + # 2 bytes for the length of the byte sequence that follows (because its length exceeds 63 bytes) + overhead = 3 + length = MAX_BYTES_SERIALIZED_ARG - overhead + self._run_test(MyBlueprint.method_bytes, b'a' * length) + + def test_type_bytes_too_long(self) -> None: + with 
self.assertRaises(NCSerializationArgTooLong): + length = MAX_BYTES_SERIALIZED_ARG + 1 + self._run_test(MyBlueprint.method_bytes, b'a' * length) + + def test_type_bytes_even_longer(self) -> None: + class Foo: + def bar(self, data: bytes) -> None: + pass + parser = Method.from_callable(Foo.bar) + parser.args._max_bytes = 2**32 # more than long enough to test a single bytes write + max_write_length = 2**16 - 3 + self._run_test_parser(parser, b'a' * max_write_length) # largest valid write + with self.assertRaises(NCSerializationArgTooLong): + self._run_test_parser(parser, b'a' * (max_write_length + 1)) # smallest invalid write + + def test_type_int_negative(self) -> None: + self._run_test(MyBlueprint.method_int, -100) + + def test_type_int_zero(self) -> None: + self._run_test(MyBlueprint.method_int, 0) + + def test_type_int_positive(self) -> None: + self._run_test(MyBlueprint.method_int, 100) + + def test_type_int_too_big(self) -> None: + with self.assertRaises(ValueError): + self._run_test(MyBlueprint.method_int, 2**223) + + def test_type_int_too_small(self) -> None: + with self.assertRaises(ValueError): + self._run_test(MyBlueprint.method_int, -2**223 - 1) + + def test_type_int_wrong_type(self) -> None: + with self.assertRaises(TypeError): + self._run_test(MyBlueprint.method_int, 1.) 
+ + def test_type_int(self) -> None: + class Foo: + def bar(self, i: int) -> None: + pass + + valid_values = [ + 0, + 1, + -1, + 2**31, + -2**31, + # edge valid values for 32 bytes of signed leb128 with 4 bytes + 2**223 - 1, + -2**223, + ] + for valid_value in valid_values: + self._run_test(Foo.bar, valid_value) + + invalid_values = [ + 2**223, + -2**223 - 1, + 2**223 + 1, + 2**224, + -2**223 - 2, + -2**224, + ] + for invalid_value in invalid_values: + with self.assertRaises(ValueError): + self._run_test(Foo.bar, invalid_value) + + def test_type_bool_false(self) -> None: + self._run_test(MyBlueprint.method_bool, False) + + def test_type_bool_true(self) -> None: + self._run_test(MyBlueprint.method_bool, True) + + def test_type_optional_str_none(self) -> None: + self._run_test(MyBlueprint.method_with_optional, None) + + def test_type_optional_str_empty(self) -> None: + self._run_test(MyBlueprint.method_with_optional, '') + + def test_type_optional_str(self) -> None: + self._run_test(MyBlueprint.method_with_optional, 'hathor') + + def test_type_tuple(self) -> None: + self._run_test(MyBlueprint.method_with_tuple, ('x', 1, 2)) + + def test_type_signed_str(self) -> None: + x: SignedData[str] = SignedData[str]('áéíóú', b'here-goes-the-signature') + self._run_test(MyBlueprint.method_signed_str, x) + + def test_basic_types(self) -> None: + parser = Method.from_callable(MyBlueprint.initialize) + + # Then, check serialization and deserialization. 
+ args_in = ('a', b'b', 1, True) + serialized_args_in = parser.serialize_args_bytes(args_in) + args_out = parser.deserialize_args_bytes(serialized_args_in) + self.assertEqual(args_in, args_out) + + def test_arg_parse_str(self) -> None: + parser = Method.from_callable(MyBlueprint.method_str) + + value = 'test' + args_json = json.loads(f'["{value}"]') + parsed_args = parser.args.json_to_value(args_json) + + # test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + self.assertEqual(parsed_args[0], value) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json((value,)) + self.assertEqual(args_json, args_json2) + + def test_arg_parse_bytes(self) -> None: + parser = Method.from_callable(MyBlueprint.method_bytes) + + value = b'\x01' + args_json = json.loads(f'["{value.hex()}"]') + parsed_args = parser.args.json_to_value(args_json) + + # test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + self.assertEqual(parsed_args[0], value) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json((value,)) + self.assertEqual(args_json, args_json2) + + def test_arg_parse_int(self) -> None: + parser = Method.from_callable(MyBlueprint.method_int) + + value = 1 + args_json = json.loads(f'[{value}]') + parsed_args = parser.args.json_to_value(args_json) + + # test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + self.assertEqual(parsed_args[0], value) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json((value,)) + self.assertEqual(args_json, args_json2) + + def test_arg_parse_bool(self) -> None: + parser = Method.from_callable(MyBlueprint.method_bool) + + args_json = json.loads('[false]') + parsed_args = parser.args.json_to_value(args_json) + + # test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + 
self.assertEqual(parsed_args[0], False) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json((False,)) + self.assertEqual(args_json, args_json2) + + def test_arg_parse_optional_none(self) -> None: + parser = Method.from_callable(MyBlueprint.method_with_optional) + + # If optional is None + args_json = json.loads('[null]') + parsed_args = parser.args.json_to_value(args_json) + + # test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + self.assertEqual(parsed_args[0], None) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json((None,)) + self.assertEqual(args_json, args_json2) + + def test_arg_parse_optional_some(self) -> None: + parser = Method.from_callable(MyBlueprint.method_with_optional) + + # If optional has str value + value = 'test' + args_json = json.loads(f'["{value}"]') + parsed_args = parser.args.json_to_value(args_json) + + # test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + self.assertEqual(parsed_args[0], value) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json(('test',)) + self.assertEqual(args_json, args_json2) + + def test_arg_parse_tuple(self): + parser = Method.from_callable(MyBlueprint.method_with_tuple) + + args_json = json.loads('[["test", 1, 2]]') + parsed_args = parser.args.json_to_value(args_json) + + # test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + self.assertEqual(parsed_args[0], ('test', 1, 2)) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json((('test', 1, 2),)) + self.assertEqual(args_json, args_json2) + + def test_arg_parse_signed_data(self) -> None: + parser = Method.from_callable(MyBlueprint.method_signed_str) + + args_json = json.loads('[["test", "1234"]]') + parsed_args = parser.args.json_to_value(args_json) + + 
# test that it parsed back the original value + self.assertEqual(len(parsed_args), 1) + self.assertEqual(parsed_args[0], SignedData[str]('test', bytes.fromhex('1234'))) + + # also test that it can generate the same JSON representation + args_json2 = parser.args.value_to_json((SignedData[str]('test', bytes.fromhex('1234')),)) + self.assertEqual(args_json, args_json2) diff --git a/tests/nanocontracts/test_nanocontract.py b/tests/nanocontracts/test_nanocontract.py new file mode 100644 index 000000000..b0034d15e --- /dev/null +++ b/tests/nanocontracts/test_nanocontract.py @@ -0,0 +1,482 @@ +from typing import Any + +import pytest +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec + +from hathor.crypto.util import ( + decode_address, + get_address_b58_from_bytes, + get_address_from_public_key_bytes, + get_public_key_bytes_compressed, +) +from hathor.nanocontracts.blueprint import Blueprint +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.exception import NCInvalidSignature +from hathor.nanocontracts.method import Method +from hathor.nanocontracts.nc_types import make_nc_type_for_type +from hathor.nanocontracts.types import ( + NCActionType, + NCDepositAction, + NCWithdrawalAction, + TokenUid, + VertexId, + public, + view, +) +from hathor.nanocontracts.utils import sign_openssl, sign_openssl_multisig +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.exceptions import ( + EqualVerifyFailed, + FinalStackInvalid, + InvalidScriptError, + MissingStackItems, + TooManySigOps, +) +from hathor.transaction.headers import NanoHeader, VertexHeaderId +from hathor.transaction.headers.nano_header import NanoHeaderAction +from hathor.transaction.scripts import P2PKH, HathorScript, Opcode +from hathor.transaction.validation_state import ValidationState +from hathor.verification.nano_header_verifier import 
MAX_NC_SCRIPT_SIGOPS_COUNT, MAX_NC_SCRIPT_SIZE +from hathor.wallet import KeyPair +from tests import unittest + +STR_NC_TYPE = make_nc_type_for_type(str) +INT_NC_TYPE = make_nc_type_for_type(int) + + +class MyBlueprint(Blueprint): + a: str + b: int + + @public + def initialize(self, ctx: Context, a: str, b: int) -> None: + self.a = a + self.b = b + + @public + def inc_b(self, ctx: Context) -> None: + self.b += 1 + + @view + def get_a(self) -> str: + return self.a + + @view + def get_b(self) -> int: + return self.b + + +class NCNanoContractTestCase(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + + self.myblueprint_id = VertexId(b'x' * 32) + self.catalog = NCBlueprintCatalog({ + self.myblueprint_id: MyBlueprint + }) + self.nc_seqnum = 0 + + self.peer = self.create_peer('testnet') + self.peer.tx_storage.nc_catalog = self.catalog + + self.genesis = self.peer.tx_storage.get_all_genesis() + self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] + + def _create_nc( + self, + nc_id: VertexId, + nc_method: str, + nc_args: list[Any], + *, + parents: list[bytes] | None = None, + timestamp: int = 0, + ) -> Transaction: + + if parents is None: + parents = [] + + tx_storage = self.peer.tx_storage + + nc = Transaction(weight=1, inputs=[], outputs=[], parents=parents, storage=tx_storage, timestamp=timestamp) + self._fill_nc(nc, nc_id, nc_method, nc_args) + return nc + + def _fill_nc(self, nc: Transaction, nc_id: VertexId, nc_method: str, nc_args: list[Any]) -> None: + method = getattr(MyBlueprint, nc_method, None) + if method is not None: + method_parser = Method.from_callable(method) + nc_args_bytes = method_parser.serialize_args_bytes(nc_args) + else: + nc_args_bytes = b'' + + key = KeyPair.create(b'123') + privkey = key.get_private_key(b'123') + + nano_header = NanoHeader( + tx=nc, + nc_seqnum=self.nc_seqnum, + nc_id=nc_id, + nc_method=nc_method, + nc_args_bytes=nc_args_bytes, + nc_address=b'', + nc_script=b'', + nc_actions=[], + ) + 
nc.headers.append(nano_header) + self.nc_seqnum += 1 + + sign_openssl(nano_header, privkey) + self.peer.cpu_mining_service.resolve(nc) + + def _get_nc(self, *, parents: list[bytes] | None = None, timestamp: int = 0) -> Transaction: + return self._create_nc(self.myblueprint_id, 'initialize', ['string', 1], parents=parents, timestamp=timestamp) + + def test_serialization(self) -> None: + nc = self._get_nc() + + nc_bytes = bytes(nc) + nc2 = Transaction.create_from_struct(nc_bytes, verbose=print) + self.assertEqual(nc_bytes, bytes(nc2)) + + nc2 = Transaction.create_from_struct(nc_bytes) + self.assertEqual(nc_bytes, bytes(nc2)) + + nc_header = nc.get_nano_header() + nc2_header = nc2.get_nano_header() + + self.assertEqual(nc_header.nc_seqnum, nc2_header.nc_seqnum) + self.assertEqual(nc_header.nc_id, nc2_header.nc_id) + self.assertEqual(nc_header.nc_method, nc2_header.nc_method) + self.assertEqual(nc_header.nc_args_bytes, nc2_header.nc_args_bytes) + self.assertEqual(nc_header.nc_actions, nc2_header.nc_actions) + self.assertEqual(nc_header.nc_address, nc2_header.nc_address) + self.assertEqual(nc_header.nc_script, nc2_header.nc_script) + + def test_serialization_skip_signature(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + sighash_bytes = nano_header.get_sighash_bytes() + deserialized, buf = NanoHeader.deserialize(Transaction(), VertexHeaderId.NANO_HEADER.value + sighash_bytes) + + assert len(buf) == 0 + assert deserialized.nc_seqnum == nano_header.nc_seqnum + assert deserialized.nc_id == nano_header.nc_id + assert deserialized.nc_method == nano_header.nc_method + assert deserialized.nc_args_bytes == nano_header.nc_args_bytes + assert deserialized.nc_actions == nano_header.nc_actions + assert deserialized.nc_address == nano_header.nc_address + assert deserialized.nc_script == b'' + + def test_verify_signature_success(self) -> None: + nc = self._get_nc() + nc.clear_sighash_cache() + 
self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_fails_nc_id(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + nano_header.nc_id = b'a' * 32 + nc.clear_sighash_cache() + with self.assertRaises(NCInvalidSignature): + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_fails_nc_method(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + nano_header.nc_method = 'other_nc_method' + nc.clear_sighash_cache() + with self.assertRaises(NCInvalidSignature): + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_fails_nc_args_bytes(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + nano_header.nc_args_bytes = b'other_nc_args_bytes' + nc.clear_sighash_cache() + with self.assertRaises(NCInvalidSignature): + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_fails_invalid_nc_address(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + nano_header.nc_address = b'invalid-address' + nc.clear_sighash_cache() + with pytest.raises(NCInvalidSignature, match=f'invalid address: {nano_header.nc_address.hex()}'): + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_fails_invalid_nc_script(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + nano_header.nc_script = b'invalid-script' + nc.clear_sighash_cache() + with pytest.raises(InvalidScriptError, match='Invalid Opcode'): + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_fails_wrong_nc_address(self) -> None: + key = KeyPair.create(b'xyz') + privkey = key.get_private_key(b'xyz') + pubkey = privkey.public_key() + pubkey_bytes = get_public_key_bytes_compressed(pubkey) + + nc = self._get_nc() 
+ nano_header = nc.get_nano_header() + nano_header.nc_address = get_address_from_public_key_bytes(pubkey_bytes) + nc.clear_sighash_cache() + with pytest.raises(NCInvalidSignature) as e: + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + assert isinstance(e.value.__cause__, EqualVerifyFailed) + + def test_verify_signature_fails_wrong_pubkey(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + + key = KeyPair.create(b'xyz') + privkey = key.get_private_key(b'xyz') + pubkey = privkey.public_key() + pubkey_bytes = get_public_key_bytes_compressed(pubkey) + nano_header.nc_address = get_address_from_public_key_bytes(pubkey_bytes) + + nc.clear_sighash_cache() + data = nc.get_sighash_all_data() + signature = privkey.sign(data, ec.ECDSA(hashes.SHA256())) + nano_header.nc_script = P2PKH.create_input_data(public_key_bytes=pubkey_bytes, signature=signature) + + # First, it's passing with the key from above + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + # We change the script to use a new pubkey, but with the same signature + key = KeyPair.create(b'wrong') + privkey = key.get_private_key(b'wrong') + pubkey = privkey.public_key() + pubkey_bytes = get_public_key_bytes_compressed(pubkey) + nano_header.nc_script = P2PKH.create_input_data(public_key_bytes=pubkey_bytes, signature=signature) + + with pytest.raises(NCInvalidSignature) as e: + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + assert isinstance(e.value.__cause__, EqualVerifyFailed) + + def test_verify_signature_fails_wrong_signature(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + + key = KeyPair.create(b'xyz') + privkey = key.get_private_key(b'xyz') + pubkey = privkey.public_key() + pubkey_bytes = get_public_key_bytes_compressed(pubkey) + nano_header.nc_address = get_address_from_public_key_bytes(pubkey_bytes) + + nc.clear_sighash_cache() + data = nc.get_sighash_all_data() + 
signature = privkey.sign(data, ec.ECDSA(hashes.SHA256())) + nano_header.nc_script = P2PKH.create_input_data(public_key_bytes=pubkey_bytes, signature=signature) + + # First, it's passing with the key from above + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + # We change the script to use a new signature, but with the same pubkey + key = KeyPair.create(b'wrong') + privkey = key.get_private_key(b'wrong') + signature = privkey.sign(data, ec.ECDSA(hashes.SHA256())) + nano_header.nc_script = P2PKH.create_input_data(public_key_bytes=pubkey_bytes, signature=signature) + + with pytest.raises(NCInvalidSignature) as e: + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + assert isinstance(e.value.__cause__, FinalStackInvalid) + assert 'Stack left with False value' in e.value.__cause__.args[0] + + def test_verify_signature_fails_nc_script_too_large(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + nano_header.nc_script = b'\x00' * (MAX_NC_SCRIPT_SIZE + 1) + + with pytest.raises(NCInvalidSignature, match='nc_script larger than max: 1025 > 1024'): + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_fails_nc_script_too_many_sigops(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + + script = HathorScript() + for _ in range(MAX_NC_SCRIPT_SIGOPS_COUNT + 1): + script.addOpcode(Opcode.OP_CHECKSIG) + + nano_header.nc_script = script.data + + with pytest.raises(TooManySigOps, match='sigops count greater than max: 21 > 20'): + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + def test_verify_signature_multisig(self) -> None: + nc = self._get_nc() + nano_header = nc.get_nano_header() + + keys: list[tuple[ec.EllipticCurvePrivateKey, bytes]] = [] + for i in range(3): + password = i.to_bytes() + key = KeyPair.create(password) + privkey = key.get_private_key(password) + pubkey = 
privkey.public_key() + pubkey_bytes = get_public_key_bytes_compressed(pubkey) + keys.append((privkey, pubkey_bytes)) + + # 3 keys are accepted + redeem_pubkey_bytes = [x[1] for x in keys] + + # Test fails because requires 2 signatures, but only has 1 + nc.clear_sighash_cache() + sign_openssl_multisig( + nano_header, + required_count=2, + redeem_pubkey_bytes=redeem_pubkey_bytes, + sign_privkeys=[keys[0][0]], + ) + with pytest.raises(NCInvalidSignature) as e: + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + assert isinstance(e.value.__cause__, MissingStackItems) + assert e.value.__cause__.args[0] == 'OP_CHECKMULTISIG: not enough signatures on the stack' + + # Test fails because requires 1 signature, but used wrong privkey + nc.clear_sighash_cache() + sign_openssl_multisig( + nano_header, + required_count=1, + redeem_pubkey_bytes=redeem_pubkey_bytes, + sign_privkeys=[KeyPair.create(b'invalid').get_private_key(b'invalid')], + ) + with pytest.raises(NCInvalidSignature) as e: + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + assert isinstance(e.value.__cause__, FinalStackInvalid) + assert 'Stack left with False value' in e.value.__cause__.args[0] + + # Test passes because requires 2 signatures, and signed with 2 correct privkeys + nc.clear_sighash_cache() + sign_openssl_multisig( + nano_header, + required_count=2, + redeem_pubkey_bytes=redeem_pubkey_bytes, + sign_privkeys=[x[0] for x in keys[:2]], + ) + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + + # Test fails because the address was changed + nc.clear_sighash_cache() + nano_header.nc_address = decode_address(self.peer.wallet.get_unused_address()) + with pytest.raises(NCInvalidSignature) as e: + self.peer.verification_service.verifiers.nano_header.verify_nc_signature(nc) + assert isinstance(e.value.__cause__, EqualVerifyFailed) + + def test_get_related_addresses(self) -> None: + nc = self._get_nc() + nano_header = 
nc.get_nano_header() + related_addresses = set(nc.get_related_addresses()) + address = get_address_b58_from_bytes(nano_header.nc_address) + self.assertIn(address, related_addresses) + + def create_nano(self) -> Transaction: + parents = [tx.hash for tx in self.genesis_txs] + timestamp = 1 + max(tx.timestamp for tx in self.genesis) + + nc = self._get_nc(parents=parents, timestamp=timestamp) + self.assertTrue(self.peer.on_new_tx(nc)) + return nc + + def test_dag_call_public_method(self) -> None: + nc = self.create_nano() + + parents = [tx.hash for tx in self.genesis_txs] + timestamp = 1 + max(tx.timestamp for tx in self.genesis) + + nc2 = self._create_nc( + nc_id=VertexId(nc.hash), + nc_method='inc_b', + nc_args=[], + parents=parents, + timestamp=timestamp, + ) + self.assertTrue(self.peer.on_new_tx(nc2)) + + def test_get_context(self) -> None: + tx_storage = self.peer.tx_storage + + # Incomplete transaction. It will be used as input of nc2. + outputs = [ + TxOutput(100, b'', 0), # HTR + TxOutput(200, b'', 1), # TOKEN A + TxOutput(300, b'', 2), # TOKEN B + ] + tokens = [b'token-a', b'token-b'] + tx = Transaction(outputs=outputs, tokens=tokens) + tx.parents = [tx.hash for tx in self.genesis_txs] + tx.get_metadata().validation = ValidationState.FULL + tx.update_hash() + tx.init_static_metadata_from_storage(self._settings, tx_storage) + tx_storage.save_transaction(tx) + + # Incomplete nanocontract transaction. 
+ inputs = [ + TxInput(tx.hash, 0, b''), + TxInput(tx.hash, 1, b''), + TxInput(tx.hash, 2, b''), + ] + outputs = [ + TxOutput(10, b'', 0), # HTR + TxOutput(250, b'', 1), # TOKEN A + TxOutput(300, b'', 2), # TOKEN B + ] + nc2 = Transaction( + weight=1, + inputs=inputs, + outputs=outputs, + tokens=tokens, + storage=tx_storage, + ) + nc2.headers.append(NanoHeader( + tx=nc2, + nc_seqnum=0, + nc_id=b'', + nc_method='', + nc_args_bytes=b'', + nc_address=b'\x00' * 25, + nc_script=b'', + nc_actions=[ + NanoHeaderAction( + type=NCActionType.WITHDRAWAL, + token_index=1, + amount=50, + ), + NanoHeaderAction( + type=NCActionType.DEPOSIT, + token_index=0, + amount=90, + ), + ], + )) + nc2.update_hash() + nc2_nano_header = nc2.get_nano_header() + context = nc2_nano_header.get_context() + self.assertEqual(2, len(context.actions)) + + action1 = context.get_single_action(TokenUid(b'token-a')) + assert isinstance(action1, NCWithdrawalAction) + self.assertEqual(action1.amount, 50) + + action2 = context.get_single_action(TokenUid(b'\0')) + assert isinstance(action2, NCDepositAction) + self.assertEqual(action2.amount, 90) + + def _to_frozenset(x: list[dict]) -> set[frozenset]: + return {frozenset(d.items()) for d in x} + + expected_json_actions = [{ + 'type': 'withdrawal', + 'token_uid': b'token-a'.hex(), + 'amount': 50, + }, { + 'type': 'deposit', + 'token_uid': b'\0'.hex(), + 'amount': 90, + }] + data = context.to_json() + json_actions = data['actions'] + self.assertEqual(_to_frozenset(json_actions), _to_frozenset(expected_json_actions)) diff --git a/tests/nanocontracts/test_nc_exec_logs.py b/tests/nanocontracts/test_nc_exec_logs.py new file mode 100644 index 000000000..d54282fe6 --- /dev/null +++ b/tests/nanocontracts/test_nc_exec_logs.py @@ -0,0 +1,499 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from textwrap import dedent +from unittest.mock import ANY + +from hathor.nanocontracts import Blueprint, Context, NCFail, public +from hathor.nanocontracts.nc_exec_logs import ( + NCCallBeginEntry, + NCCallEndEntry, + NCExecEntry, + NCLogConfig, + NCLogEntry, + NCLogLevel, +) +from hathor.nanocontracts.runner import CallType +from hathor.nanocontracts.types import ContractId, NCAction, NCDepositAction, TokenUid, view +from hathor.transaction import Block, Transaction +from hathor.util import not_none +from tests import unittest +from tests.dag_builder.builder import TestDAGBuilder + +MY_BLUEPRINT1_ID: bytes = b'\x11' * 32 +MY_BLUEPRINT2_ID: bytes = b'\x22' * 32 + + +class MyBlueprint1(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + self.log.info('initialize() called on MyBlueprint1') + + @public + def log_levels(self, ctx: Context) -> None: + msg = 'log_levels() called' + self.log.debug(msg, test1=1) + self.log.info(msg, test2=2) + self.log.warn(msg, test3=3) + self.log.error(msg, test4=4) + + @public + def fail(self, ctx: Context) -> None: + self.log.warn('fail() called') + raise NCFail('some fail') + + @public + def value_error(self, ctx: Context) -> None: + self.log.warn('value_error() called') + raise ValueError('some value error') + + @public(allow_deposit=True) + def call_another_public(self, ctx: Context, contract_id: ContractId) -> None: + self.log.debug('call_another_public() called on MyBlueprint1', contract_id=contract_id) + actions: list[NCAction] = [NCDepositAction(token_uid=TokenUid(b'\x00'), amount=5)] + result1 = 
self.syscall.call_public_method(contract_id, 'sum', actions, 1, 2) + result2 = self.syscall.call_view_method(contract_id, 'hello_world') + self.log.debug('results on MyBlueprint1', result1=result1, result2=result2) + + +class MyBlueprint2(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + self.log.info('initialize() called on MyBlueprint2') + + @public(allow_deposit=True) + def sum(self, ctx: Context, a: int, b: int) -> int: + self.log.debug('sum() called on MyBlueprint2', a=a, b=b) + return a + b + + @view + def hello_world(self) -> str: + self.log.debug('hello_world() called on MyBlueprint2') + return 'hello world' + + +class BaseNCExecLogs(unittest.TestCase): + __test__ = False + + def _get_initialize_entries(self, tx: Transaction) -> list[NCCallBeginEntry | NCLogEntry | NCCallEndEntry]: + assert tx.is_nano_contract() + nano_header = tx.get_nano_header() + assert self.manager.tx_storage.nc_catalog is not None + blueprint_class = self.manager.tx_storage.nc_catalog.blueprints[nano_header.nc_id] + return [ + NCCallBeginEntry.construct( + nc_id=tx.hash, + call_type=CallType.PUBLIC, + method_name='initialize', + timestamp=ANY, + actions=[], + ), + NCLogEntry.construct( + level=NCLogLevel.INFO, + message=f'initialize() called on {blueprint_class.__name__}', + timestamp=ANY, + ), + NCCallEndEntry.construct(timestamp=ANY), + ] + + def _prepare(self, nc_log_config: NCLogConfig = NCLogConfig.ALL) -> None: + settings = self._settings._replace( + REWARD_SPEND_MIN_BLOCKS=1, # to make tests quicker + ) + artifacts = self.get_builder() \ + .set_settings(settings) \ + .set_nc_log_config(nc_log_config) \ + .build() + + self.nc_log_storage = not_none(artifacts.consensus.block_algorithm_factory.nc_log_storage) + self.manager = artifacts.manager + assert self.manager.tx_storage.nc_catalog is not None + self.manager.tx_storage.nc_catalog.blueprints = { + MY_BLUEPRINT1_ID: MyBlueprint1, + MY_BLUEPRINT2_ID: MyBlueprint2, + } + self.dag_builder = 
TestDAGBuilder.from_manager(self.manager) + + +class TestNCExecLogs(BaseNCExecLogs): + __test__ = True + + def test_config_all(self) -> None: + self._prepare(nc_log_config=NCLogConfig.ALL) + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + b1 < dummy + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = fail() + + nc3.nc_id = nc1 + nc3.nc_method = value_error() + + nc1 <-- nc2 <-- nc3 <-- b2 + """) + artifacts.propagate_with(self.manager) + + nc1, nc2, nc3 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc3'], Transaction) + b2 = artifacts.get_typed_vertex('b2', Block) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc3.is_nano_contract() + + assert not_none(self.nc_log_storage.get_logs(nc1.hash)).entries == { + b2.hash: [NCExecEntry( + logs=self._get_initialize_entries(nc1), + )], + } + + assert len(not_none(self.nc_log_storage.get_logs(nc2.hash)).entries[b2.hash]) > 0 + assert len(not_none(self.nc_log_storage.get_logs(nc3.hash)).entries[b2.hash]) > 0 + + def test_config_none(self) -> None: + self._prepare(nc_log_config=NCLogConfig.NONE) + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + b1 < dummy + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = fail() + + nc3.nc_id = nc1 + nc3.nc_method = value_error() + + nc1 <-- nc2 <-- nc3 <-- b2 + """) + artifacts.propagate_with(self.manager) + + nc1, nc2, nc3 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc3'], Transaction) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc3.is_nano_contract() + + assert self.nc_log_storage.get_logs(nc1.hash) is None + assert self.nc_log_storage.get_logs(nc2.hash) is None + assert self.nc_log_storage.get_logs(nc3.hash) is None + + def test_config_failed(self) -> None: + self._prepare(nc_log_config=NCLogConfig.FAILED) + artifacts = 
self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + b1 < dummy + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = fail() + + nc3.nc_id = nc1 + nc3.nc_method = value_error() + + nc1 <-- nc2 <-- nc3 <-- b2 + """) + artifacts.propagate_with(self.manager) + + nc1, nc2, nc3 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc3'], Transaction) + b2 = artifacts.get_typed_vertex('b2', Block) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc3.is_nano_contract() + + assert self.nc_log_storage.get_logs(nc1.hash) is None + assert len(not_none(self.nc_log_storage.get_logs(nc2.hash)).entries[b2.hash]) > 0 + assert len(not_none(self.nc_log_storage.get_logs(nc3.hash)).entries[b2.hash]) > 0 + + def test_config_failed_unhandled(self) -> None: + self._prepare(nc_log_config=NCLogConfig.FAILED_UNHANDLED) + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + b1 < dummy + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = fail() + + nc3.nc_id = nc1 + nc3.nc_method = value_error() + + nc1 <-- nc2 <-- nc3 <-- b2 + """) + artifacts.propagate_with(self.manager) + + nc1, nc2, nc3 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc3'], Transaction) + b2 = artifacts.get_typed_vertex('b2', Block) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc3.is_nano_contract() + + assert self.nc_log_storage.get_logs(nc1.hash) is None + assert self.nc_log_storage.get_logs(nc2.hash) is None + assert len(not_none(self.nc_log_storage.get_logs(nc3.hash)).entries[b2.hash]) > 0 + + def test_log_levels_and_key_values(self) -> None: + self._prepare() + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + b1 < dummy + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = log_levels() + + nc1 <-- nc2 <-- b2 + """) + 
artifacts.propagate_with(self.manager) + + nc1, nc2 = artifacts.get_typed_vertices(['nc1', 'nc2'], Transaction) + b2 = artifacts.get_typed_vertex('b2', Block) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + + assert not_none(self.nc_log_storage.get_logs(nc1.hash)).entries == { + b2.hash: [NCExecEntry( + logs=self._get_initialize_entries(nc1), + )], + } + + assert not_none(self.nc_log_storage.get_logs(nc2.hash)).entries == { + b2.hash: [NCExecEntry( + logs=[ + NCCallBeginEntry.construct( + nc_id=nc1.hash, + call_type=CallType.PUBLIC, + method_name='log_levels', + timestamp=ANY, + actions=[], + ), + NCLogEntry.construct( + level=NCLogLevel.DEBUG, + message='log_levels() called', + key_values=dict(test1='1'), + timestamp=ANY, + ), + NCLogEntry.construct( + level=NCLogLevel.INFO, + message='log_levels() called', + key_values=dict(test2='2'), + timestamp=ANY, + ), + NCLogEntry.construct( + level=NCLogLevel.WARN, + message='log_levels() called', + key_values=dict(test3='3'), + timestamp=ANY, + ), + NCLogEntry.construct( + level=NCLogLevel.ERROR, + message='log_levels() called', + key_values=dict(test4='4'), + timestamp=ANY, + ), + NCCallEndEntry.construct(timestamp=ANY), + ], + )], + } + + # test log level filter + assert not_none(self.nc_log_storage.get_logs(nc2.hash, log_level=NCLogLevel.WARN)).entries == { + b2.hash: [NCExecEntry( + logs=[ + NCLogEntry.construct( + level=NCLogLevel.WARN, + message='log_levels() called', + key_values=dict(test3='3'), + timestamp=ANY, + ), + NCLogEntry.construct( + level=NCLogLevel.ERROR, + message='log_levels() called', + key_values=dict(test4='4'), + timestamp=ANY, + ), + ], + )], + } + + def test_nc_fail(self) -> None: + self._prepare() + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + b1 < dummy + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = fail() + + nc1 <-- nc2 <-- b2 + """) + artifacts.propagate_with(self.manager) + 
+ nc1, nc2 = artifacts.get_typed_vertices(['nc1', 'nc2'], Transaction) + b2 = artifacts.get_typed_vertex('b2', Block) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + + assert not_none(self.nc_log_storage.get_logs(nc1.hash)).entries == { + b2.hash: [NCExecEntry( + logs=self._get_initialize_entries(nc1), + )], + } + + result = not_none(self.nc_log_storage.get_logs(nc2.hash)) + assert result.entries == { + b2.hash: [NCExecEntry.construct( + error_traceback=ANY, + logs=[ + NCCallBeginEntry.construct( + nc_id=nc1.hash, + call_type=CallType.PUBLIC, + method_name='fail', + timestamp=ANY, + actions=[], + ), + NCLogEntry.construct(level=NCLogLevel.WARN, message='fail() called', timestamp=ANY), + ], + )], + } + + error_tb = result.entries[b2.hash][0].error_traceback + assert error_tb is not None + assert error_tb.startswith('Traceback (most recent call last):') + assert error_tb.endswith('hathor.nanocontracts.exception.NCFail: some fail\n') + + def test_value_error(self) -> None: + self._prepare() + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + b1 < dummy + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = value_error() + + nc1 <-- nc2 <-- b2 + """) + artifacts.propagate_with(self.manager) + + nc1, nc2 = artifacts.get_typed_vertices(['nc1', 'nc2'], Transaction) + b2 = artifacts.get_typed_vertex('b2', Block) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + + assert not_none(self.nc_log_storage.get_logs(nc1.hash)).entries == { + b2.hash: [NCExecEntry( + logs=self._get_initialize_entries(nc1), + )], + } + + result = not_none(self.nc_log_storage.get_logs(nc2.hash)) + assert result.entries == { + b2.hash: [NCExecEntry.construct( + error_traceback=ANY, + logs=[ + NCCallBeginEntry.construct( + nc_id=nc1.hash, + call_type=CallType.PUBLIC, + method_name='value_error', + timestamp=ANY, + actions=[], + ), + NCLogEntry.construct(level=NCLogLevel.WARN, 
message='value_error() called', timestamp=ANY), + ], + )], + } + + error_tb = result.entries[b2.hash][0].error_traceback + assert error_tb is not None + assert error_tb.startswith('Traceback (most recent call last):') + assert dedent(""" + ValueError: some value error\n + The above exception was the direct cause of the following exception:\n + Traceback (most recent call last): + """) in error_tb + assert error_tb.endswith('hathor.nanocontracts.exception.NCFail\n') + + def test_reexecution_on_reorgs(self) -> None: + self._prepare() + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..4] + blockchain b1 a[2..3] + b1 < dummy + b2 < a2 < a3 < b3 < b4 + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc1 <-- b2 + nc1 <-- a2 + """) + + nc1 = artifacts.get_typed_vertex('nc1', Transaction) + b2, a2 = artifacts.get_typed_vertices(['b2', 'a2'], Block) + assert nc1.is_nano_contract() + + # 2 reorgs happen, so nc1.initialize() gets executed 3 times, once in block a2 and twice in block b2 + artifacts.propagate_with(self.manager, up_to='b2') + assert nc1.get_metadata().first_block == b2.hash + assert b2.get_metadata().voided_by is None + assert not_none(self.nc_log_storage.get_logs(nc1.hash)).entries == { + b2.hash: [NCExecEntry( + logs=self._get_initialize_entries(nc1), + )], + } + + artifacts.propagate_with(self.manager, up_to='a3') + assert nc1.get_metadata().first_block == a2.hash + assert b2.get_metadata().voided_by == {b2.hash} + assert a2.get_metadata().voided_by is None + assert not_none(self.nc_log_storage.get_logs(nc1.hash)).entries == { + b2.hash: [NCExecEntry( + logs=self._get_initialize_entries(nc1), + )], + a2.hash: [NCExecEntry( + logs=self._get_initialize_entries(nc1), + )], + } + + artifacts.propagate_with(self.manager) + assert nc1.get_metadata().first_block == b2.hash + assert b2.get_metadata().voided_by is None + assert a2.get_metadata().voided_by == {a2.hash} + assert 
not_none(self.nc_log_storage.get_logs(nc1.hash)).entries == {
+            b2.hash: [
+                NCExecEntry(
+                    logs=self._get_initialize_entries(nc1),
+                ),
+                NCExecEntry(
+                    logs=self._get_initialize_entries(nc1),
+                ),
+            ],
+            a2.hash: [NCExecEntry(
+                logs=self._get_initialize_entries(nc1),
+            )],
+        }
diff --git a/tests/nanocontracts/test_patricia_trie.py b/tests/nanocontracts/test_patricia_trie.py
new file mode 100644
index 000000000..a6b10049d
--- /dev/null
+++ b/tests/nanocontracts/test_patricia_trie.py
@@ -0,0 +1,232 @@
+import hashlib
+import tempfile
+from math import log
+from typing import Optional
+
+from hathor.nanocontracts.storage.backends import MemoryNodeTrieStore, RocksDBNodeTrieStore
+from hathor.nanocontracts.storage.patricia_trie import Node, PatriciaTrie
+from hathor.storage.rocksdb_storage import RocksDBStorage
+from tests import unittest
+
+
+def export_trie_outline(trie: PatriciaTrie, *, node: Optional[Node] = None) -> tuple[bytes, Optional[bytes], dict]:
+    """Return the tree outline for testing purposes.
+
+    The returned format is (key, value, children) where children is a dict mapping each decoded edge key to a child in the same format.
+ """ + if node is None: + node = trie.root + + d = {} + for k, child_id in node.children.items(): + child = trie.get_node(child_id) + d[trie._decode_key(k)] = export_trie_outline(trie, node=child) + return (trie._decode_key(node.key), node.content, d) + + +class PatriciaTrieTestCase(unittest.TestCase): + __test__ = False + + def create_trie(self) -> PatriciaTrie: + raise NotImplementedError + + def test_empty_key(self) -> None: + trie = self.create_trie() + with self.assertRaises(KeyError): + trie.get(b'') + + def test_empty_trie(self) -> None: + trie = self.create_trie() + with self.assertRaises(KeyError): + trie.get(b'my-key') + + def test_single_key(self) -> None: + trie = self.create_trie() + key = b'my-key' + + with self.assertRaises(KeyError): + trie.get(key) + + trie.update(key, b'1') + trie.commit() + self.assertEqual(trie.get(key), b'1') + root1_id = trie.root.id + + trie.update(key, b'1') + trie.commit() + self.assertEqual(trie.get(key), b'1') + self.assertEqual(root1_id, trie.root.id) + + trie.update(key, b'2') + trie.commit() + self.assertEqual(trie.get(key), b'2') + + self.assertNotEqual(root1_id, trie.root.id) + self.assertEqual(trie.get(key, root_id=root1_id), b'1') + + n_nodes = len(trie._db) + trie.update(key, b'1') + trie.commit() + self.assertEqual(trie.get(key), b'1') + self.assertEqual(root1_id, trie.root.id) + self.assertEqual(n_nodes, len(trie._db)) + + trie.print_dfs() + + self.assertEqual( + export_trie_outline(trie), + (b'', None, { + key: (key, b'1', {}), + }) + ) + + def test_independent_keys(self) -> None: + trie = self.create_trie() + + key1 = b'\x00abcde' + key2 = b'\x10fghijklmn' + + trie.update(key1, b'1') + trie.commit() + self.assertEqual(trie.get(key1), b'1') + + trie.update(key2, b'2') + trie.commit() + self.assertEqual(trie.get(key2), b'2') + + self.assertEqual(len(trie.root.children), 2) + trie.print_dfs() + + self.assertEqual( + export_trie_outline(trie), + (b'', None, { + key1: (key1, b'1', {}), + key2: (key2, b'2', {}), + 
}) + ) + + def test_simple_chain(self) -> None: + trie = self.create_trie() + + data = { + b'a': b'1', + b'abcd': b'2', + b'ab': b'3', + b'abcdefg': b'4', + b'abcdefh': b'5', + } + for k, v in data.items(): + trie.update(k, v) + # print('!! UPDATE', k) + # print() + # trie.print_dfs() + # print() + # print() + # print() + # print() + + for k, v in data.items(): + self.assertEqual(trie.get(k), v) + trie.commit() + + self.assertEqual( + export_trie_outline(trie), + (b'', None, { + b'a': (b'a', b'1', { + b'b': (b'ab', b'3', { + b'cd': (b'abcd', b'2', { + b'ef`': (b'abcdef`', None, { + b'p': (b'abcdefg', b'4', {}), + b'\x80': (b'abcdefh', b'5', {}), + }), + }), + }), + }), + }) + ) + + def test_random_data(self) -> None: + trie = self.create_trie() + + data = {} + for v_int in range(20_000): + v = str(v_int).encode('ascii') + k = hashlib.sha1(v).digest() + data[k] = v + trie.update(k, v) + + for k, v in data.items(): + self.assertEqual(trie.get(k), v) + trie.commit() + + max_children = max(len(x.children) for x, _, _ in trie.iter_dfs()) + max_height = max(h for _, h, _ in trie.iter_dfs()) + + print('max_children', max_children) + print('max_height', max_height) + print('n_nodes', len(trie._db)) + + self.assertLessEqual(max_children, 16) + self.assertLessEqual(max_height, 2*log(len(data), 16)) + + def test_commit(self) -> None: + trie = self.create_trie() + + data = {} + for v_int in range(20_000): + v = str(v_int).encode('ascii') + k = hashlib.sha1(v).digest() + data[k] = v + trie.update(k, v) + trie.commit() + root1_id = trie.root.id + + key1, value1 = next(iter(data.items())) + trie.update(key1, value1 + b'1') + self.assertTrue(trie.is_dirty()) + trie.commit() + self.assertFalse(trie.is_dirty()) + root2_id = trie.root.id + + self.assertNotEqual(root1_id, root2_id) + self.assertEqual(trie.get(key1, root_id=root1_id), value1) + self.assertEqual(trie.get(key1, root_id=root2_id), value1 + b'1') + + def test_multiple_keys_same_value(self) -> None: + trie = 
self.create_trie() + data = { + b'a': b'1', + b'abcd': b'1', + b'ab': b'1', + b'abcdefg': b'1', + b'abcdefh': b'1', + b'\x01xyz': b'1', + } + for k, v in data.items(): + trie.update(k, v) + trie.commit() + + for k, v in data.items(): + self.assertEqual(trie.get(k), v) + + +class MemoryPatriciaTrieTest(PatriciaTrieTestCase): + __test__ = True + + def create_trie(self) -> PatriciaTrie: + store = MemoryNodeTrieStore() + return PatriciaTrie(store) + + +class RocksDBPatriciaTrieTest(PatriciaTrieTestCase): + __test__ = True + + def setUp(self) -> None: + super().setUp() + directory = tempfile.mkdtemp() + self.tmpdirs.append(directory) + self.rocksdb_storage = RocksDBStorage(path=directory) + + def create_trie(self) -> PatriciaTrie: + store = RocksDBNodeTrieStore(self.rocksdb_storage) + return PatriciaTrie(store) diff --git a/tests/nanocontracts/test_rng.py b/tests/nanocontracts/test_rng.py new file mode 100644 index 000000000..ded6758f8 --- /dev/null +++ b/tests/nanocontracts/test_rng.py @@ -0,0 +1,261 @@ +from math import floor, sqrt + +from hathor.conf import HathorSettings +from hathor.nanocontracts import Blueprint, Context, public +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.exception import NCFail +from hathor.nanocontracts.rng import NanoRNG +from hathor.transaction import Transaction +from tests.dag_builder.builder import TestDAGBuilder +from tests.simulation.base import SimulatorTestCase + +settings = HathorSettings() + + +class MyBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + @public + def nop(self, ctx: Context) -> None: + x = self.syscall.rng.random() + if x < 0.5: + raise NCFail('bad luck') + + +class NCConsensusTestCase(SimulatorTestCase): + __test__ = True + + def setUp(self): + super().setUp() + + self.myblueprint_id = b'x' * 32 + self.catalog = NCBlueprintCatalog({ + self.myblueprint_id: MyBlueprint + }) + + self.manager = self.simulator.create_peer() + 
self.manager.tx_storage.nc_catalog = self.catalog
+
+        self.wallet = self.manager.wallet
+
+    def test_rng_consistency(self) -> None:
+        seed = self.rng.randbytes(32)
+        n = 100_000
+
+        rng1 = NanoRNG(seed=seed)
+        v1 = [rng1.randbits(32) for _ in range(n)]
+        for _ in range(10):
+            rng2 = NanoRNG(seed=seed)
+            v2 = [rng2.randbits(32) for _ in range(n)]
+            assert v1 == v2
+
+    def assertGoodnessOfFitTest(self, observed: list[int], expected: list[int]) -> None:
+        """Pearson chi-square goodness-of-fit test of observed counts against the expected counts."""
+        assert len(observed) == len(expected)
+        size = len(expected)
+        N = sum(expected)
+        assert N == sum(observed)
+
+        # chi2 = sum((observed[k] - expected[k])**2 / expected[k] for k in range(size))
+        # After some algebra, the equation above turns out to be:
+        # chi2 = sum(observed[k]**2 / expected[k] for k in range(size)) - N
+        df = 0
+        chi2 = 0.
+        for k in range(size):
+            if expected[k] == 0:
+                assert observed[k] == 0
+            else:
+                chi2 += observed[k]**2 / expected[k]
+                df += 1
+        chi2 -= N
+        df -= 1
+
+        # assumption so we can approximate the chi2 distribution by a normal distribution
+        # with mean df and variance 2*df.
+        assert df >= 30
+
+        z_score = (chi2 - df) / sqrt(2 * df)
+        L = 3
+
+        # The probability of -L < z_score < L is: phi(L) - phi(-L)
+        # where phi(x) is the cdf of the standard normal distribution
+        # For L = 3, it is 99.73%.
+        # In other words, this assert should pass 99.73% of the runs.
+ assert -L < z_score < L + + def test_rng_randbits(self) -> None: + seed = self.rng.randbytes(32) + rng = NanoRNG(seed=seed) + + size = 4096 # keep it a power of 2 + expected = 100 + frequencies = [0] * size + for _ in range(expected * size): + idx = rng.randbits(32) % size + frequencies[idx] += 1 + + self.assertGoodnessOfFitTest(frequencies, [expected] * size) + + def test_rng_randbelow(self) -> None: + seed = self.rng.randbytes(32) + rng = NanoRNG(seed=seed) + + size = 10_000 + expected = 100 + frequencies = [0] * size + for _ in range(expected * size): + idx = rng.randbelow(size) + frequencies[idx] += 1 + + self.assertGoodnessOfFitTest(frequencies, [expected] * size) + + def test_rng_randint(self) -> None: + seed = self.rng.randbytes(32) + rng = NanoRNG(seed=seed) + + size = 10_000 + expected = 100 + frequencies = [0] * size + + a = 150_000 + b = a + size - 1 + for _ in range(expected * size): + idx = rng.randint(a, b) - a + frequencies[idx] += 1 + + self.assertGoodnessOfFitTest(frequencies, [expected] * size) + + def test_rng_choice(self) -> None: + seed = self.rng.randbytes(32) + rng = NanoRNG(seed=seed) + + size = 10_000 + expected = 100 + frequencies = [0] * size + + v = list(range(size)) + for _ in range(expected * size): + idx = rng.choice(v) + frequencies[idx] += 1 + + self.assertGoodnessOfFitTest(frequencies, [expected] * size) + + def test_rng_randrange_small(self) -> None: + seed = self.rng.randbytes(32) + rng = NanoRNG(seed=seed) + + size = 10_000 + expected_per_bin = 500 + frequencies = [0] * size + + start = 15 + stop = size + step = 7 + + valid = set(range(start, stop, step)) + expected = [expected_per_bin if idx in valid else 0 for idx in range(size)] + + for _ in range(expected_per_bin * len(valid)): + idx = rng.randrange(start, stop, step) + assert idx in valid + frequencies[idx] += 1 + + self.assertGoodnessOfFitTest(frequencies, expected) + + def test_rng_randrange_large(self) -> None: + seed = self.rng.randbytes(32) + rng = NanoRNG(seed=seed) 
+ + size = 1007 + expected = 1000 + frequencies = [0] * size + + start = 15_000_000 + stop = 20_000_000_000 + step = (stop - start + size - 1) // size + + for _ in range(expected * size): + x = rng.randrange(start, stop, step) + assert (x - start) % step == 0 + idx = (x - start) // step + frequencies[idx] += 1 + + self.assertGoodnessOfFitTest(frequencies, [expected] * size) + + def test_rng_random(self) -> None: + seed = self.rng.randbytes(32) + rng = NanoRNG(seed=seed) + + size = 200 + expected = 1000 + frequencies = [0] * size + for _ in range(expected * size): + x = rng.random() + assert 0 <= x < 1 + idx = floor(size * x) + frequencies[idx] += 1 + + self.assertGoodnessOfFitTest(frequencies, [expected] * size) + + def test_simple_rng(self) -> None: + dag_builder = TestDAGBuilder.from_manager(self.manager) + + n = 250 + nc_calls_parts = [] + for i in range(2, n + 2): + nc_calls_parts.append(f''' + nc{i}.nc_id = nc1 + nc{i}.nc_method = nop() + nc{i} --> nc{i-1} + ''') + nc_calls = ''.join(nc_calls_parts) + + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..33] + b30 < dummy + + nc1.nc_id = "{self.myblueprint_id.hex()}" + nc1.nc_method = initialize() + + {nc_calls} + + nc{n+1} <-- b32 + ''') + + for node, vertex in artifacts.list: + assert self.manager.on_new_tx(vertex) + + nc1, = artifacts.get_typed_vertices(['nc1'], Transaction) + assert nc1.is_nano_contract() + assert nc1.get_metadata().voided_by is None + + names = [f'nc{i}' for i in range(2, n + 2)] + vertices = artifacts.get_typed_vertices(names, Transaction) + + success = 0 + fail = 0 + for v in vertices: + assert v.is_nano_contract() + if v.get_metadata().voided_by is None: + success += 1 + else: + fail += 1 + self.assertEqual(n, fail + success) + + p = 0.5 + ratio = success / n + + # success ~ Binomial(n=250, p=0.5) + # For n large, Binomial(n, p) ~ N(n*p, n*p*(1-p)) + # So, ratio ~ N(p, p*(1-p)/n) + + z_score = (ratio - p) / (p * (1 - p) / n)**0.5 + L = 3 + + # The probability of -L < 
z_score < L is: phi(L) - phi(-L) + # where phi(x) is the cdf of the standard normal distribution + # For L = 3, it is 99.73%. + # In other words, this assert should pass 99.73% of the runs. + assert -L < z_score < L diff --git a/tests/nanocontracts/test_seqnum.py b/tests/nanocontracts/test_seqnum.py new file mode 100644 index 000000000..d0d19513c --- /dev/null +++ b/tests/nanocontracts/test_seqnum.py @@ -0,0 +1,287 @@ +from hathor.nanocontracts import Blueprint, Context, public +from hathor.nanocontracts.exception import NCFail +from hathor.transaction import Block, Transaction +from hathor.transaction.nc_execution_state import NCExecutionState +from tests.dag_builder.builder import TestDAGBuilder +from tests.nanocontracts.blueprints.unittest import BlueprintTestCase +from tests.nanocontracts.utils import assert_nc_failure_reason + + +class MyBlueprint1(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + @public + def nop(self, ctx: Context) -> None: + pass + + @public + def fail(self, ctx: Context) -> None: + raise NCFail('oops') + + +class NCBlueprintTestCase(BlueprintTestCase): + def setUp(self): + super().setUp() + self.blueprint1_id = self.gen_random_blueprint_id() + self.register_blueprint_class(self.blueprint1_id, MyBlueprint1) + + def test_seqnum_fail_after_success(self) -> None: + """tx2 will successfully execute, so tx3 will fail because it has the same seqnum.""" + dag_builder = TestDAGBuilder.from_manager(self.manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..34] + b30 < dummy + + nc1.nc_id = "{self.blueprint1_id.hex()}" + nc1.nc_method = initialize() + + tx2.nc_id = nc1 + tx2.nc_method = nop() + tx2.nc_address = wallet1 + tx2.nc_seqnum = 0 + + tx3.nc_id = nc1 + tx3.nc_method = nop() + tx3.nc_address = wallet1 + tx3.nc_seqnum = 0 + tx3 --> tx2 + + nc1 <-- b31 + tx3 <-- b31 + ''') + + artifacts.propagate_with(self.manager) + + nc1, tx2, tx3 = artifacts.get_typed_vertices(['nc1', 'tx2', 'tx3'], 
Transaction) + b31 = artifacts.get_typed_vertex('b31', Block) + + assert nc1.get_metadata().voided_by is None + assert tx2.get_metadata().voided_by is None + assert tx3.get_metadata().voided_by == {tx3.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert nc1.get_metadata().nc_execution is NCExecutionState.SUCCESS + assert tx2.get_metadata().nc_execution is NCExecutionState.SUCCESS + assert tx3.get_metadata().nc_execution is NCExecutionState.FAILURE + + tx2_nano_header = tx2.get_nano_header() + tx3_nano_header = tx3.get_nano_header() + + assert tx2_nano_header.nc_address == tx3_nano_header.nc_address + assert tx2_nano_header.nc_seqnum == tx3_nano_header.nc_seqnum + + assert_nc_failure_reason( + manager=self.manager, + tx_id=tx3.hash, + block_id=b31.hash, + reason='NCFail: invalid seqnum' + ) + + def test_seqnum_fail_after_fail(self) -> None: + """tx2 will fail execution but it should increase the seqnum anyways. + So tx3 will fail because it has the same seqnum.""" + dag_builder = TestDAGBuilder.from_manager(self.manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..34] + b30 < dummy + + nc1.nc_id = "{self.blueprint1_id.hex()}" + nc1.nc_method = initialize() + + tx2.nc_id = nc1 + tx2.nc_method = fail() + tx2.nc_address = wallet1 + tx2.nc_seqnum = 0 + + tx3.nc_id = nc1 + tx3.nc_method = nop() + tx3.nc_address = wallet1 + tx3.nc_seqnum = 0 + tx3 --> tx2 + + nc1 <-- b31 + tx3 <-- b31 + ''') + + artifacts.propagate_with(self.manager) + + nc1, tx2, tx3 = artifacts.get_typed_vertices(['nc1', 'tx2', 'tx3'], Transaction) + b31 = artifacts.get_typed_vertex('b31', Block) + + assert nc1.get_metadata().voided_by is None + assert tx2.get_metadata().voided_by == {tx2.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert tx3.get_metadata().voided_by == {tx3.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert nc1.get_metadata().nc_execution is NCExecutionState.SUCCESS + assert tx2.get_metadata().nc_execution is NCExecutionState.FAILURE + assert 
tx3.get_metadata().nc_execution is NCExecutionState.FAILURE + + assert_nc_failure_reason( + manager=self.manager, + tx_id=tx2.hash, + block_id=b31.hash, + reason='NCFail: oops' + ) + + tx2_nano_header = tx2.get_nano_header() + tx3_nano_header = tx3.get_nano_header() + + assert tx2_nano_header.nc_address == tx3_nano_header.nc_address + assert tx2_nano_header.nc_seqnum == tx3_nano_header.nc_seqnum + + assert_nc_failure_reason( + manager=self.manager, + tx_id=tx3.hash, + block_id=b31.hash, + reason='NCFail: invalid seqnum' + ) + + def test_seqnum_fail_after_skip(self) -> None: + """tx2 will skip execution but it should increase the seqnum anyways. + So tx3 will fail because it has the same seqnum.""" + dag_builder = TestDAGBuilder.from_manager(self.manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..34] + b30 < dummy + + nc1.nc_id = "{self.blueprint1_id.hex()}" + nc1.nc_method = initialize() + + tx1.nc_id = nc1 + tx1.nc_method = fail() + tx1.out[0] <<< tx2 + + tx2.nc_id = nc1 + tx2.nc_method = nop() + tx2.nc_address = wallet1 + tx2.nc_seqnum = 0 + + tx3.nc_id = nc1 + tx3.nc_method = nop() + tx3.nc_address = wallet1 + tx3.nc_seqnum = 0 + tx3 --> tx2 + + nc1 <-- b31 + tx3 <-- b31 + ''') + + artifacts.propagate_with(self.manager) + + nc1, tx1, tx2, tx3 = artifacts.get_typed_vertices(['nc1', 'tx1', 'tx2', 'tx3'], Transaction) + b31 = artifacts.get_typed_vertex('b31', Block) + + assert nc1.get_metadata().voided_by is None + assert tx1.get_metadata().voided_by == {tx1.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert tx2.get_metadata().voided_by == {tx1.hash} + assert tx3.get_metadata().voided_by == {tx3.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert nc1.get_metadata().nc_execution is NCExecutionState.SUCCESS + assert tx1.get_metadata().nc_execution is NCExecutionState.FAILURE + assert tx2.get_metadata().nc_execution is NCExecutionState.SKIPPED + assert tx3.get_metadata().nc_execution is NCExecutionState.FAILURE + + 
assert_nc_failure_reason( + manager=self.manager, + tx_id=tx1.hash, + block_id=b31.hash, + reason='NCFail: oops' + ) + + tx2_nano_header = tx2.get_nano_header() + tx3_nano_header = tx3.get_nano_header() + + assert tx2_nano_header.nc_address == tx3_nano_header.nc_address + assert tx2_nano_header.nc_seqnum == tx3_nano_header.nc_seqnum + + assert_nc_failure_reason( + manager=self.manager, + tx_id=tx3.hash, + block_id=b31.hash, + reason='NCFail: invalid seqnum' + ) + + def test_invalid_block(self) -> None: + dag_builder = TestDAGBuilder.from_manager(self.manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..34] + b30 < dummy + + nc1.nc_id = "{self.blueprint1_id.hex()}" + nc1.nc_method = initialize() + + tx2.nc_id = nc1 + tx2.nc_method = nop() + tx2.nc_address = wallet1 + tx2.nc_seqnum = 0 + + tx3.nc_id = nc1 + tx3.nc_method = nop() + tx3.nc_address = wallet1 + tx3.nc_seqnum = 0 + tx3 --> tx2 + + tx4.nc_id = nc1 + tx4.nc_method = nop() + tx4.nc_address = wallet1 + tx4.nc_seqnum = 1 + tx4 --> tx3 + + tx5.nc_id = nc1 + tx5.nc_method = nop() + tx5.nc_address = wallet1 + tx5.nc_seqnum = 12 + tx5 --> tx4 + + tx6.nc_id = nc1 + tx6.nc_method = nop() + tx6.nc_address = wallet1 + tx6.nc_seqnum = 11 + tx6 --> tx5 + + nc1 <-- b31 + tx6 <-- b32 + ''') + + artifacts.propagate_with(self.manager) + + nc1, tx2, tx3 = artifacts.get_typed_vertices(['nc1', 'tx2', 'tx3'], Transaction) + tx4, tx5, tx6 = artifacts.get_typed_vertices(['tx4', 'tx5', 'tx6'], Transaction) + b32 = artifacts.get_typed_vertex('b32', Block) + + assert nc1.get_metadata().voided_by is None + assert nc1.get_metadata().nc_execution is NCExecutionState.SUCCESS + + assert tx2.get_metadata().voided_by is None + assert tx2.get_metadata().nc_execution is NCExecutionState.SUCCESS + + assert tx3.get_metadata().voided_by == {tx3.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert tx3.get_metadata().nc_execution is NCExecutionState.FAILURE + + assert tx4.get_metadata().voided_by is None + assert 
tx4.get_metadata().nc_execution is NCExecutionState.SUCCESS + + assert tx5.get_metadata().voided_by == {tx5.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert tx5.get_metadata().nc_execution is NCExecutionState.FAILURE + + assert tx6.get_metadata().voided_by is None + assert tx6.get_metadata().nc_execution is NCExecutionState.SUCCESS + + assert b32.get_metadata().voided_by is None + + tx2_nano_header = tx2.get_nano_header() + tx3_nano_header = tx3.get_nano_header() + + assert tx2_nano_header.nc_address == tx3_nano_header.nc_address + assert tx2_nano_header.nc_seqnum == tx3_nano_header.nc_seqnum + + assert_nc_failure_reason( + manager=self.manager, + tx_id=tx3.hash, + block_id=b32.hash, + reason='NCFail: invalid seqnum (diff=0)' + ) + assert_nc_failure_reason( + manager=self.manager, + tx_id=tx5.hash, + block_id=b32.hash, + reason='NCFail: invalid seqnum (diff=11)' + ) diff --git a/tests/nanocontracts/test_serializer.py b/tests/nanocontracts/test_serializer.py new file mode 100644 index 000000000..654c546f4 --- /dev/null +++ b/tests/nanocontracts/test_serializer.py @@ -0,0 +1,118 @@ +from typing import Optional, TypeVar + +from hathor.nanocontracts.nc_types import NCType, make_nc_type_for_type +from hathor.nanocontracts.types import SignedData +from tests import unittest + +T = TypeVar('T') + + +class NCSerializerTestCase(unittest.TestCase): + def _run_test(self, type_: type[T], result: T) -> None: + nc_type = make_nc_type_for_type(type_) + result_bytes = nc_type.to_bytes(result) + result2: T = nc_type.from_bytes(result_bytes) + self.assertEqual(result, result2) + + def _run_test_signed(self, type_: type[T], result: T) -> None: + from hathor.wallet import KeyPair + + nc_type = make_nc_type_for_type(type_) + result_bytes = nc_type.to_bytes(result) + result2: T = nc_type.from_bytes(result_bytes) + self.assertEqual(result, result2) + + # Oracle's private key. 
+ key = KeyPair.create(b'my-key') + script_input = key.p2pkh_create_input_data(b'my-key', result_bytes) + # XXX: ignoring valid-type because type_ can and must be used with SignedData + signed_result: SignedData[T] = SignedData[type_](result, script_input) # type: ignore[valid-type] + signeddata_nc_type = make_nc_type_for_type(SignedData[type_]) # type: ignore[valid-type] + serialized_bytes = signeddata_nc_type.to_bytes(signed_result) + signed_result2: SignedData[T] = signeddata_nc_type.from_bytes(serialized_bytes) + self.assertEqual(signed_result.data, signed_result2.data) + self.assertEqual(signed_result.script_input, signed_result2.script_input) + + def _run_test_nc_type(self, nc_type: NCType[T], result: T) -> None: + result_bytes = nc_type.to_bytes(result) + result2: T = nc_type.from_bytes(result_bytes) + self.assertEqual(result, result2) + + def test_signed_bytes(self): + self._run_test_signed(bytes, b'1x1') + + def test_signed_str(self): + self._run_test_signed(str, '1x1') + + def test_signed_bool(self): + self._run_test_signed(bool, True) + + def test_signed_invalid_type(self): + # XXX: list must be given a type argument, otherwise we cannot choose the inner parser, which is needed + # even if the list is empty, in this test we're checking that it will error + with self.assertRaises(TypeError): + self._run_test_signed(list, []) + + def test_invalid_bool(self): + from hathor.nanocontracts.nc_types import BoolNCType + bool_nc_type = BoolNCType() + with self.assertRaises(ValueError): + bool_nc_type.from_bytes(b'\x02') + + def test_str_empty(self): + self._run_test(str, '') + + def test_str_valid(self): + self._run_test(str, 'hathor') + + def test_str_accents(self): + self._run_test(str, 'áéíóúçãõ') + + def test_bytes_empty(self): + self._run_test(bytes, b'') + + def test_bytes_valid(self): + self._run_test(bytes, b'\x01\x02') + + def test_int_negative(self): + self._run_test(int, -100) + + def test_int_zero(self): + self._run_test(int, 0) + + def 
test_int_positive(self): + self._run_test(int, 100) + + def test_int_too_big(self): + from hathor.nanocontracts.nc_types import Int32NCType + with self.assertRaises(ValueError): + # this fails because Int32NCType's range is [-2**31, 2**31) + self._run_test_nc_type(Int32NCType(), 2**31) + # but this doesn't fail because int maps to VarInt32NCType + self._run_test(int, 2**31) + with self.assertRaises(ValueError): + # which has a larger, but still limited range, so this will fail: + self._run_test(int, 2**223) + + def test_optional_str_none(self): + self._run_test(Optional[str], None) + self._run_test(str | None, None) + + def test_optional_str_empty(self): + self._run_test(Optional[str], '') + self._run_test(str | None, '') + + def test_optional_str(self): + self._run_test(Optional[str], 'hathor') + self._run_test(str | None, 'hathor') + + def test_tuple(self): + self._run_test(tuple[int, str, bytes], (1, 'a', b'b')) + + def test_tuple_optional_str(self): + type_ = tuple[int, Optional[str]] + self._run_test(type_, (1, 'a')) + + def test_tuple_optional_none(self): + type_ = tuple[int, Optional[str]] + self._run_test(type_, (1, None)) diff --git a/tests/nanocontracts/test_sorter.py b/tests/nanocontracts/test_sorter.py new file mode 100644 index 000000000..c8e8d8d87 --- /dev/null +++ b/tests/nanocontracts/test_sorter.py @@ -0,0 +1,211 @@ +from hathor.nanocontracts.sorter.random_sorter import NCBlockSorter +from hathor.transaction import Transaction +from hathor.types import VertexId +from tests import unittest +from tests.dag_builder.builder import TestDAGBuilder + + +class NCBlockSorterTestCase(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + + self.nodes = {} + for i in range(100): + self.nodes[i] = VertexId(f'{i}'.encode('ascii')) + + self.nc_nodes = {} + for i in range(99): + self.nc_nodes[i] = VertexId(f'nc-{i}'.encode('ascii')) + + def test_all_independent(self) -> None: + sorter = NCBlockSorter() + for node in self.nodes.values(): + 
sorter.get_node(node) + + seed = self.rng.randbytes(32) + order = sorter.copy().generate_random_topological_order(seed) + self.assertEqual(len(self.nodes), len(set(order))) + + order2 = sorter.copy().generate_random_topological_order(seed) + self.assertEqual(order, order2) + + # There are n! permutations. + # Therefore, the probability of getting the same order is 1/100!, which is around 1e-158. + for _ in range(100): + seed2 = self.rng.randbytes(32) + order2 = sorter.copy().generate_random_topological_order(seed2) + self.assertNotEqual(order, order2) + + def test_single_one_step_dependencies(self) -> None: + sorter = NCBlockSorter() + + # Generate the following graph: + # 0 -> NC0 -> 1 -> NC1 -> 2 -> NC2 -> 3 -> ... + for i in range(len(self.nodes) - 1): + sorter.add_edge(self.nodes[i], self.nc_nodes[i]) + sorter.add_edge(self.nc_nodes[i], self.nodes[i + 1]) + for _id in self.nodes.values(): + sorter.remove_vertex(_id) + + seed = self.rng.randbytes(32) + order = sorter.copy().generate_random_topological_order(seed) + self.assertEqual(set(self.nc_nodes.values()), set(order)) + + # There's only one valid order. So it must return the same order for any seed. + for _ in range(100): + seed2 = self.rng.randbytes(32) + order2 = sorter.copy().generate_random_topological_order(seed2) + self.assertEqual(order, order2) + + def test_single_long_dependencies(self) -> None: + sorter = NCBlockSorter() + + # Generate the following graph: + # 0 -> NC0 -> 1 -> 2 -> 3 -> 4 -> NC4 -> 5 -> 6 -> 7 -> 8 -> NC8 -> ... 
+ for i in range(len(self.nodes) - 1): + if i % 4 == 0: + sorter.add_edge(self.nodes[i], self.nc_nodes[i]) + sorter.add_edge(self.nc_nodes[i], self.nodes[i + 1]) + else: + sorter.add_edge(self.nodes[i], self.nodes[i + 1]) + for _id in self.nodes.values(): + sorter.remove_vertex(_id) + + seed = self.rng.randbytes(32) + order = sorter.copy().generate_random_topological_order(seed) + self.assertEqual(set(x for i, x in self.nc_nodes.items() if i % 4 == 0), set(order)) + + # There's only one valid order. So it must return the same order for any seed. + for _ in range(100): + seed2 = self.rng.randbytes(32) + order2 = sorter.copy().generate_random_topological_order(seed2) + self.assertEqual(order, order2) + + def test_linear_multiple_dependencies(self) -> None: + sorter = NCBlockSorter() + sorter.add_edge(self.nc_nodes[0], self.nodes[1]) + sorter.add_edge(self.nodes[1], self.nodes[2]) + sorter.add_edge(self.nodes[2], self.nodes[3]) + sorter.add_edge(self.nodes[3], self.nodes[4]) + sorter.add_edge(self.nodes[4], self.nc_nodes[5]) + for _id in self.nodes.values(): + sorter.remove_vertex(_id, discard=True) + + seed = self.rng.randbytes(32) + order = sorter.copy().generate_random_topological_order(seed) + self.assertEqual(order, [ + self.nc_nodes[5], + self.nc_nodes[0], + ]) + + def test_grid_multiple_dependencies(self) -> None: + sorter = NCBlockSorter() + + idx = 0 + n_layers = 10 + n_per_layer = 8 + layers: list[list[VertexId]] = [] + + selected_nc_nodes = {1, 57, 75} + + for _ in range(n_layers): + current = [] + for j in range(n_per_layer): + if idx in selected_nc_nodes: + vertex_id = self.nc_nodes[idx] + else: + vertex_id = self.nodes[idx] + current.append(vertex_id) + idx += 1 + + _ = sorter.get_node(vertex_id) + if layers: + previous = layers[-1] + if j > 0: + sorter.add_edge(previous[j - 1], vertex_id) + sorter.add_edge(previous[j], vertex_id) + layers.append(current) + + for _id in self.nodes.values(): + sorter.remove_vertex(_id, discard=True) + + seed = 
self.rng.randbytes(32) + order = sorter.copy().generate_random_topological_order(seed) + self.assertEqual(order, [ + self.nc_nodes[75], + self.nc_nodes[57], + self.nc_nodes[1], + ]) + + # There's only one valid order. So it must return the same order for any seed. + for _ in range(100): + seed2 = self.rng.randbytes(32) + order2 = sorter.copy().generate_random_topological_order(seed2) + self.assertEqual(order, order2) + + def test_dag_dependencies(self) -> None: + builder = self.get_builder() + builder.enable_nc_anti_mev() + manager = self.create_peer_from_builder(builder) + dag_builder = TestDAGBuilder.from_manager(manager) + + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + artifacts = dag_builder.build_from_str(f""" + blockchain genesis b[1..32] + b30 < dummy + + nc1.nc_id = ocb1 + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = nop() + + nc3.nc_id = nc1 + nc3.nc_method = nop() + + nc4.nc_id = nc1 + nc4.nc_method = nop() + + nc5.nc_id = nc1 + nc5.nc_method = nop() + + nc6.nc_id = nc1 + nc6.nc_method = nop() + + b31 --> ocb1 # OCB must be confirmed before being used to create a contract + b31 < nc1 + nc1 <-- nc2 <-- b32 + + ocb1.ocb_private_key = "{private_key}" + ocb1.ocb_password = "{password}" + ocb1.ocb_code = ``` + from hathor.nanocontracts import Blueprint + from hathor.nanocontracts.context import Context + from hathor.nanocontracts.types import public + class MyBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + @public + def nop(self, ctx: Context) -> None: + pass + __blueprint__ = MyBlueprint + ``` + """) + + artifacts.propagate_with(manager) + + ocb1, nc1 = artifacts.get_typed_vertices(['ocb1', 'nc1'], Transaction) + + nc_others = [] + for i in range(2, 7): + nc_others.append(artifacts.get_typed_vertex(f'nc{i}', Transaction)) + + assert ocb1.get_metadata().voided_by is None + assert nc1.get_metadata().voided_by is None + + for tx in nc_others: + # TODO 
Assert the execution order. + assert tx.get_metadata().voided_by is None diff --git a/tests/nanocontracts/test_storage.py b/tests/nanocontracts/test_storage.py new file mode 100644 index 000000000..9b28793d2 --- /dev/null +++ b/tests/nanocontracts/test_storage.py @@ -0,0 +1,116 @@ +from typing import TypeVar + +from hathor.nanocontracts.nc_types import NCType, NullNCType, make_nc_type_for_type +from hathor.nanocontracts.storage import NCChangesTracker, NCContractStorage +from hathor.nanocontracts.types import ContractId, VertexId +from tests import unittest + +T = TypeVar('T') + +STR_NC_TYPE = make_nc_type_for_type(str) +BYTES_NC_TYPE = make_nc_type_for_type(bytes) +INT_NC_TYPE = make_nc_type_for_type(int) +BOOL_NC_TYPE = make_nc_type_for_type(bool) + + +class BaseNCStorageTestCase(unittest.TestCase): + __test__ = False + + storage: NCContractStorage + + def _run_test(self, data_in: T, value: NCType[T]) -> None: + # XXX: maybe make the key random? + key = b'x' + # make sure the key is unused + self.assertFalse(self.storage.has_obj(key)) + # value goes in + self.storage.put_obj(key, value, data_in) + # the key should be present + self.assertTrue(self.storage.has_obj(key)) + # value comes out + data_out = self.storage.get_obj(key, value) + # should be the same + self.assertEqual(data_in, data_out) + # clean up + self.storage.del_obj(key) + # make sure the storage got rid of it + self.assertFalse(self.storage.has_obj(key)) + + def test_str(self) -> None: + self._run_test('nano', STR_NC_TYPE) + + def test_str_empty(self) -> None: + self._run_test('', STR_NC_TYPE) + + def test_bytes(self) -> None: + self._run_test(b'nano', BYTES_NC_TYPE) + + def test_bytes_empty(self) -> None: + self._run_test(b'', BYTES_NC_TYPE) + + def test_int_positive(self) -> None: + self._run_test(123, INT_NC_TYPE) + + def test_int_zero(self) -> None: + self._run_test(0, INT_NC_TYPE) + + def test_int_negative(self) -> None: + self._run_test(-123, INT_NC_TYPE) + + def test_bigint(self) -> None: + 
    def test_changes_tracker_delete(self) -> None:
        """Deletes staged in an NCChangesTracker must only hit the underlying
        storage when the tracker is committed.
        """
        # Seed the underlying storage directly.
        self.storage.put_obj(b'x', INT_NC_TYPE, 1)
        changes_tracker = NCChangesTracker(ContractId(VertexId(b'')), self.storage)
        # The tracker reads through to the storage before any local change.
        self.assertEqual(1, changes_tracker.get_obj(b'x', INT_NC_TYPE))

        changes_tracker.del_obj(b'x')
        # Confirm the key has been deleted.
        with self.assertRaises(KeyError):
            changes_tracker.get_obj(b'x', INT_NC_TYPE)
        # Check that the key has not been deleted on the storage.
        self.assertEqual(1, self.storage.get_obj(b'x', INT_NC_TYPE))

        # Commit changes and confirm the key was deleted on the storage.
        changes_tracker.commit()
        with self.assertRaises(KeyError):
            self.storage.get_obj(b'x', INT_NC_TYPE)
class OtherBlueprint(Blueprint):
    """Minimal blueprint exposing token syscalls, used to exercise
    mint/melt/revoke authority behavior from the test case below.
    """

    # Accepts deposits and authority grants at creation time.
    @public(allow_deposit=True, allow_grant_authority=True)
    def initialize(self, ctx: Context) -> None:
        pass

    # No-op that still accepts authority grants via its actions.
    @public(allow_grant_authority=True)
    def nop(self, ctx: Context) -> None:
        pass

    # Drops the contract's own mint and/or melt authority over `token_uid`.
    @public
    def revoke(self, ctx: Context, token_uid: TokenUid, revoke_mint: bool, revoke_melt: bool) -> None:
        self.syscall.revoke_authorities(token_uid, revoke_mint=revoke_mint, revoke_melt=revoke_melt)

    # Mints `amount` of `token_uid` into the contract balance (requires mint authority).
    @public
    def mint(self, ctx: Context, token_uid: TokenUid, amount: int) -> None:
        self.syscall.mint_tokens(token_uid, amount)

    # Melts `amount` of `token_uid` from the contract balance (requires melt authority).
    @public
    def melt(self, ctx: Context, token_uid: TokenUid, amount: int) -> None:
        self.syscall.melt_tokens(token_uid, amount)
address=self.gen_random_address(), + timestamp=0, + ) + self.runner.call_public_method(nc_id, 'nop', ctx_grant) + + ctx = Context( + actions=[], + vertex=self.get_genesis_tx(), + address=self.gen_random_address(), + timestamp=0, + ) + + # Starting state + assert storage.get_all_balances() == { + htr_balance_key: Balance(value=1000, can_mint=False, can_melt=False), + tka_balance_key: Balance(value=1000, can_mint=True, can_melt=True), + } + + # After mint + self.runner.call_public_method(nc_id, 'mint', ctx, token_a_uid, 123) + assert storage.get_all_balances() == { + htr_balance_key: Balance(value=998, can_mint=False, can_melt=False), + tka_balance_key: Balance(value=1123, can_mint=True, can_melt=True), + } + + # After melt + self.runner.call_public_method(nc_id, 'melt', ctx, token_a_uid, 456) + assert storage.get_all_balances() == { + htr_balance_key: Balance(value=1002, can_mint=False, can_melt=False), + tka_balance_key: Balance(value=667, can_mint=True, can_melt=True), + } + + # After revoke mint + self.runner.call_public_method(nc_id, 'revoke', ctx, token_a_uid, True, False) + assert storage.get_all_balances() == { + htr_balance_key: Balance(value=1002, can_mint=False, can_melt=False), + tka_balance_key: Balance(value=667, can_mint=False, can_melt=True), + } + + # After revoke melt + self.runner.call_public_method(nc_id, 'revoke', ctx, token_a_uid, False, True) + assert storage.get_all_balances() == { + htr_balance_key: Balance(value=1002, can_mint=False, can_melt=False), + tka_balance_key: Balance(value=667, can_mint=False, can_melt=False), + } + + # Try mint TKA + msg = f'contract {nc_id.hex()} cannot mint {token_a_uid.hex()} tokens' + with pytest.raises(NCInvalidSyscall, match=msg): + self.runner.call_public_method(nc_id, 'mint', ctx, token_a_uid, 123) + + # Try melt TKA + msg = f'contract {nc_id.hex()} cannot melt {token_a_uid.hex()} tokens' + with pytest.raises(NCInvalidSyscall, match=msg): + self.runner.call_public_method(nc_id, 'melt', ctx, token_a_uid, 
456) + + # Try mint HTR + with pytest.raises(NCInvalidSyscall, match=f'contract {nc_id.hex()} cannot mint HTR tokens'): + self.runner.call_public_method(nc_id, 'mint', ctx, HATHOR_TOKEN_UID, 123) + + # Try melt HTR + with pytest.raises(NCInvalidSyscall, match=f'contract {nc_id.hex()} cannot melt HTR tokens'): + self.runner.call_public_method(nc_id, 'melt', ctx, HATHOR_TOKEN_UID, 456) + + # Try revoke HTR authorities + with pytest.raises(NCInvalidSyscall, match=f'contract {nc_id.hex()} cannot revoke authorities from HTR token'): + self.runner.call_public_method(nc_id, 'revoke', ctx, HATHOR_TOKEN_UID, True, False) + + # Final state + assert storage.get_all_balances() == { + htr_balance_key: Balance(value=1002, can_mint=False, can_melt=False), + tka_balance_key: Balance(value=667, can_mint=False, can_melt=False), + } diff --git a/tests/nanocontracts/test_syscalls_in_view.py b/tests/nanocontracts/test_syscalls_in_view.py new file mode 100644 index 000000000..57466ed93 --- /dev/null +++ b/tests/nanocontracts/test_syscalls_in_view.py @@ -0,0 +1,127 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from hathor.nanocontracts import Blueprint, Context, public, view +from hathor.nanocontracts.exception import NCViewMethodError +from hathor.nanocontracts.types import BlueprintId, ContractId, TokenUid, VertexId +from tests.nanocontracts.blueprints.unittest import BlueprintTestCase + + +class MyBlueprint(Blueprint): + other_id: ContractId | None + + @public + def initialize(self, ctx: Context, other_id: ContractId | None) -> None: + self.other_id = other_id + + @view + def nop(self) -> None: + pass + + @view + def test_rng(self) -> None: + self.syscall.rng.randbits(1) + + @view + def get_contract_id(self) -> None: + self.syscall.get_contract_id() + + @view + def get_blueprint_id(self) -> None: + self.syscall.get_blueprint_id() + + @view + def get_balance(self) -> None: + self.syscall.get_balance() + + @view + def get_balance_before_current_call(self) -> None: + self.syscall.get_balance_before_current_call() + + @view + def get_current_balance(self) -> None: + self.syscall.get_current_balance() + + @view + def can_mint(self) -> None: + self.syscall.can_mint(TokenUid(b'')) + + @view + def can_mint_before_current_call(self) -> None: + self.syscall.can_mint_before_current_call(TokenUid(b'')) + + @view + def can_melt(self) -> None: + self.syscall.can_melt(TokenUid(b'')) + + @view + def can_melt_before_current_call(self) -> None: + self.syscall.can_melt_before_current_call(TokenUid(b'')) + + @view + def call_public_method(self) -> None: + self.syscall.call_public_method(ContractId(VertexId(b'')), '', []) + + @view + def call_view_method(self) -> None: + assert self.other_id is not None + self.syscall.call_view_method(self.other_id, 'nop') + + @view + def revoke_authorities(self) -> None: + self.syscall.revoke_authorities(TokenUid(b''), revoke_mint=True, revoke_melt=True) + + @view + def mint_tokens(self) -> None: + self.syscall.mint_tokens(TokenUid(b''), 0) + + @view + def melt_tokens(self) -> None: + self.syscall.melt_tokens(TokenUid(b''), 0) + + 
class TestSyscallsInView(BlueprintTestCase):
    """Checks that state-changing syscalls are rejected inside @view methods."""

    def setUp(self) -> None:
        super().setUp()

        self.blueprint_id = self.gen_random_blueprint_id()
        self.register_blueprint_class(self.blueprint_id, MyBlueprint)

        # A plain, action-less context reused by the tests.
        self.ctx = Context(
            vertex=self.get_genesis_tx(),
            actions=[],
            timestamp=self.now,
            address=self.gen_random_address(),
        )

    def test_rng(self) -> None:
        nc_id = self.gen_random_contract_id()
        self.runner.create_contract(nc_id, self.blueprint_id, self.ctx, None)

        expected = '@view method cannot call `syscall.rng`'
        with pytest.raises(NCViewMethodError, match=expected):
            self.runner.call_view_method(nc_id, 'test_rng')
HathorSettings() + + +class MyBlueprint(Blueprint): + a: str + b: int + + @public(allow_deposit=True) + def initialize(self, ctx: Context) -> None: + pass + + @public(allow_withdrawal=True) + def withdraw(self, ctx: Context) -> None: + pass + + @public(allow_deposit=True) + def create_token( + self, + ctx: Context, + token_name: str, + token_symbol: str, + amount: int, + mint_authority: bool, + melt_authority: bool, + ) -> None: + self.syscall.create_token(token_name, token_symbol, amount, mint_authority, melt_authority) + + +class NCNanoContractTestCase(unittest.TestCase): + def setUp(self): + super().setUp() + + self.myblueprint_id = b'x' * 32 + self.catalog = NCBlueprintCatalog({ + self.myblueprint_id: MyBlueprint + }) + + self.manager = self.create_peer('testnet', nc_log_config=NCLogConfig.FAILED, wallet_index=True) + self.manager.tx_storage.nc_catalog = self.catalog + + def test_token_creation_by_vertex(self) -> None: + dag_builder = TestDAGBuilder.from_manager(self.manager) + vertices = dag_builder.build_from_str(f''' + blockchain genesis b[1..40] + b30 < dummy + + tx1.nc_id = "{self.myblueprint_id.hex()}" + tx1.nc_method = initialize() + tx1.nc_deposit = 1 HTR + + tx2.out[0] = 10 HTR + tx2.out[1] = 100 TKA # call a method of an existing contract + tx2.out[2] = 150 ABC # ABC is a token created w/out using nano headers + tx2.out[3] = 250 DEF # create a new contract, no deposits + tx2.out[4] = 350 GHI # create a new contract, depositing 10 HTR into it + tx2.out[5] = 450 JKL # call a method of an existing contract with partial withdrawal + + tx3.out[1] = 200 TKB + + TKA.nc_id = tx1 + TKA.nc_method = withdraw() + TKA.nc_withdrawal = 1 HTR + + DEF.nc_id = "{self.myblueprint_id.hex()}" + DEF.nc_method = initialize() + + GHI.nc_id = "{self.myblueprint_id.hex()}" + GHI.nc_method = initialize() + GHI.nc_deposit = 10 HTR + + # JKL needs to deposit 5 HTR to create 450 JKL tokens. 
+ # - 3 HTR will be covered by a withdrawal from a contract + # - 2 HTR will be covered by inputs + JKL.nc_id = GHI + JKL.nc_method = withdraw() + JKL.nc_withdrawal = 3 HTR + + TKB.nc_id = tx1 + TKB.nc_method = withdraw() + TKB.nc_withdrawal = 2 HTR + + TKA < TKB + + b31 --> tx1 + b32 --> tx2 + b33 --> tx3 + ''') + + vertices.propagate_with(self.manager, up_to='b31') + tx1, = vertices.get_typed_vertices(['tx1'], Transaction) + + nc_storage = self.manager.get_best_block_nc_storage(tx1.hash) + assert tx1.is_nano_contract() + assert nc_storage.get_balance(settings.HATHOR_TOKEN_UID) == Balance(value=1, can_mint=False, can_melt=False) + + vertices.propagate_with(self.manager, up_to='b32') + TKA, ABC, DEF, GHI, JKL, tx2 = vertices.get_typed_vertices( + ['TKA', 'ABC', 'DEF', 'GHI', 'JKL', 'tx2'], + Transaction + ) + + assert not ABC.is_nano_contract() + assert TKA.get_metadata().voided_by is None + + assert TKA.is_nano_contract() + assert TKA.get_metadata().voided_by is None + + assert DEF.is_nano_contract() + assert DEF.get_metadata().voided_by is None + + assert GHI.is_nano_contract() + assert GHI.get_metadata().voided_by is None + + assert JKL.is_nano_contract() + assert JKL.get_metadata().voided_by is None + + nc_storage = self.manager.get_best_block_nc_storage(tx1.hash) + assert nc_storage.get_balance(settings.HATHOR_TOKEN_UID) == Balance(value=0, can_mint=False, can_melt=False) + + ghi_nc_storage = self.manager.get_best_block_nc_storage(GHI.hash) + assert ghi_nc_storage.get_balance(settings.HATHOR_TOKEN_UID) == ( + Balance(value=7, can_mint=False, can_melt=False) + ) + + jkl_token_info = JKL._get_token_info_from_inputs() + JKL._update_token_info_from_outputs(token_dict=jkl_token_info) + assert jkl_token_info[settings.HATHOR_TOKEN_UID].amount == -2 + + jkl_context = JKL.get_nano_header().get_context() + htr_token_uid = TokenUid(settings.HATHOR_TOKEN_UID) + assert jkl_context.actions[htr_token_uid] == (NCWithdrawalAction(token_uid=htr_token_uid, amount=3),) + + assert 
not tx2.is_nano_contract() + assert tx2.get_metadata().voided_by is None + + vertices.propagate_with(self.manager) + TKB, tx3 = vertices.get_typed_vertices(['TKB', 'tx3'], Transaction) + + nc_storage = self.manager.get_best_block_nc_storage(tx1.hash) + assert nc_storage.get_balance(settings.HATHOR_TOKEN_UID) == Balance(value=0, can_mint=False, can_melt=False) + + assert TKB.is_nano_contract() + assert TKB.get_metadata().voided_by == {TKB.hash, settings.NC_EXECUTION_FAIL_ID} + + assert not tx3.is_nano_contract() + assert tx3.get_metadata().voided_by == {TKB.hash} + + def test_token_creation_by_contract(self) -> None: + token_symbol = 'TKA' + + dag_builder = TestDAGBuilder.from_manager(self.manager) + vertices = dag_builder.build_from_str(f''' + blockchain genesis b[1..40] + b30 < dummy + + tx1.nc_id = "{self.myblueprint_id.hex()}" + tx1.nc_method = initialize() + + tx2.nc_id = tx1 + tx2.nc_method = create_token("MyToken", "{token_symbol}", 100, false, false) + tx2.nc_deposit = 3 HTR + + tx3.nc_id = tx1 + tx3.nc_method = create_token("MyToken (2)", "{token_symbol}", 50, true, false) + tx3.nc_deposit = 1 HTR + + tx2 < tx3 + + b31 --> tx1 + b31 --> tx2 + b32 --> tx3 + ''') + + vertices.propagate_with(self.manager) + + tx1, tx2, tx3 = vertices.get_typed_vertices(['tx1', 'tx2', 'tx3'], Transaction) + b31, b32 = vertices.get_typed_vertices(['b31', 'b32'], Block) + + # Uncomment for debugging: + # from tests.nanocontracts.utils import get_nc_failure_entry + # failure_entry = get_nc_failure_entry(manager=self.manager, tx_id=tx2.hash, block_id=b31.hash) + # print(failure_entry.error_traceback) + + assert tx1.get_metadata().voided_by is None + assert tx1.get_metadata().nc_execution is NCExecutionState.SUCCESS + + assert tx2.get_metadata().voided_by is None + assert tx2.get_metadata().nc_execution is NCExecutionState.SUCCESS + + assert tx3.get_metadata().voided_by == {tx3.hash, self._settings.NC_EXECUTION_FAIL_ID} + assert tx3.get_metadata().nc_execution is 
NCExecutionState.FAILURE + + assert b31.get_metadata().voided_by is None + assert b32.get_metadata().voided_by is None + + assert_nc_failure_reason( + manager=self.manager, + tx_id=tx3.hash, + block_id=b32.hash, + reason='NCTokenAlreadyExists', + ) + + child_token_id = derive_child_token_id(ContractId(VertexId(tx1.hash)), token_symbol) + child_token_balance_key = BalanceKey(nc_id=tx1.hash, token_uid=child_token_id) + htr_balance_key = BalanceKey(nc_id=tx1.hash, token_uid=settings.HATHOR_TOKEN_UID) + + block_storage = self.manager.get_nc_block_storage(b31) + expected_token_info = TokenDescription( + token_id=child_token_id, + token_name='MyToken', + token_symbol=token_symbol, + ) + assert block_storage.get_token_description(child_token_id) == expected_token_info + + nc_storage = block_storage.get_contract_storage(tx1.hash) + assert nc_storage.get_all_balances() == { + child_token_balance_key: Balance(value=100, can_mint=False, can_melt=False), + htr_balance_key: Balance(value=2, can_mint=False, can_melt=False), + } + + tokens_index = self.manager.tx_storage.indexes.tokens + assert tokens_index.get_token_info(settings.HATHOR_TOKEN_UID).get_total() == ( + settings.GENESIS_TOKENS + 40 * settings.INITIAL_TOKENS_PER_BLOCK - 1 + ) + assert tokens_index.get_token_info(child_token_id).get_total() == 100 diff --git a/tests/nanocontracts/test_types.py b/tests/nanocontracts/test_types.py new file mode 100644 index 000000000..3bbea685e --- /dev/null +++ b/tests/nanocontracts/test_types.py @@ -0,0 +1,45 @@ +from hathor.nanocontracts.types import ContractId, SignedData, VertexId +from hathor.transaction.scripts import P2PKH +from tests import unittest + + +class BaseNanoContractTestCase(unittest.TestCase): + def test_signed(self) -> None: + from hathor.wallet import KeyPair + + nc_id = ContractId(VertexId(b'x' * 32)) + + result = b'1x1' + signed_result = SignedData[bytes](result, b'') + result_bytes = signed_result.get_data_bytes(nc_id) + + # Check signature using oracle's 
private key. + key = KeyPair.create(b'123') + assert key.address is not None + script_input = key.p2pkh_create_input_data(b'123', result_bytes) + signed_result = SignedData[bytes](result, script_input) + + p2pkh = P2PKH(key.address) + oracle_script = p2pkh.get_script() + self.assertTrue(signed_result.checksig(nc_id, oracle_script)) + + # Try to tamper with the data. + fake_result = b'2x2' + self.assertNotEqual(result, fake_result) + invalid_signed_result = SignedData[bytes](fake_result, script_input) + self.assertFalse(invalid_signed_result.checksig(nc_id, oracle_script)) + + # Try to use the wrong private key to sign the data. + key2 = KeyPair.create(b'456') + assert key2.address is not None + p2pkh2 = P2PKH(key2.address) + oracle_script2 = p2pkh2.get_script() + self.assertFalse(signed_result.checksig(nc_id, oracle_script2)) + + def test_signed_eq(self): + x = SignedData[str]('data', b'signature') + + self.assertEqual(x, SignedData[str]('data', b'signature')) + self.assertNotEqual(x, SignedData[str]('data', b'another-signature')) + self.assertNotEqual(x, SignedData[str]('another-data', 'signature')) + self.assertNotEqual(x, SignedData[str]('another-data', 'another-signature')) diff --git a/tests/nanocontracts/test_violations.py b/tests/nanocontracts/test_violations.py new file mode 100644 index 000000000..c17cb0cac --- /dev/null +++ b/tests/nanocontracts/test_violations.py @@ -0,0 +1,81 @@ +from hathor.nanocontracts import Blueprint, public +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.exception import NCFail +from hathor.nanocontracts.types import NCDepositAction +from tests.nanocontracts.blueprints.unittest import BlueprintTestCase + + +class MyBlueprint(Blueprint): + total: int + + @public + def initialize(self, ctx: Context) -> None: + self.total = 3 + + @public + def modify_actions(self, ctx: Context) -> None: + ctx.actions[b'\00'] = NCDepositAction(token_uid=b'\00', amount=1_000) # type: ignore + + @public + def 
class ViolationsTestCase(BlueprintTestCase):
    """Checks that contract code cannot violate runner invariants.

    Each violation (mutating the context actions/vertex, assigning an
    undeclared attribute) must surface as an NCFail whose __cause__ is the
    underlying error raised by the sandbox.

    Fixes vs. original: `setUp` gains the `-> None` annotation used by the
    sibling test classes in this suite, and the identical Context construction
    plus NCFail/__cause__ boilerplate is factored into private helpers.
    """

    def setUp(self) -> None:
        super().setUp()

        self.blueprint_id = self.gen_random_blueprint_id()
        self.contract_id = self.gen_random_contract_id()
        self.nc_catalog.blueprints[self.blueprint_id] = MyBlueprint
        self.tx = self.get_genesis_tx()
        self.address = self.gen_random_address()

    def _make_context(self) -> Context:
        """Return a fresh action-less context, as used by every test here."""
        return Context(
            actions=[],
            vertex=self.tx,
            address=self.address,
            timestamp=self.now
        )

    def _assert_call_fails_with(self, method_name: str, context: Context, cause_type: type) -> None:
        """Call `method_name` on the contract and assert it raises NCFail
        caused by `cause_type`."""
        with self.assertRaises(NCFail) as cm:
            self.runner.call_public_method(self.contract_id, method_name, context)
        self.assertIsInstance(cm.exception.__cause__, cause_type)

    def test_modify_actions(self) -> None:
        context = self._make_context()
        self.runner.create_contract(self.contract_id, self.blueprint_id, context)
        # Context actions are exposed read-only; item assignment must fail.
        self._assert_call_fails_with('modify_actions', context, TypeError)

    def test_modify_vertex(self) -> None:
        context = self._make_context()
        self.runner.create_contract(self.contract_id, self.blueprint_id, context)
        # The vertex is frozen; mutating its inputs must fail.
        self._assert_call_fails_with('modify_vertex', context, TypeError)

    def test_assign_non_declared_attribute(self) -> None:
        context = self._make_context()
        self.runner.create_contract(self.contract_id, self.blueprint_id, context)
        # Declared attributes may be assigned...
        self.runner.call_public_method(self.contract_id, 'assign_declared_attribute', context)
        # ...but attributes not declared on the blueprint must be rejected.
        self._assert_call_fails_with('assign_non_declared_attribute', context, AttributeError)
a/tests/resources/nanocontracts/test_nc_exec_logs.py b/tests/resources/nanocontracts/test_nc_exec_logs.py new file mode 100644 index 000000000..2e2e2dfe7 --- /dev/null +++ b/tests/resources/nanocontracts/test_nc_exec_logs.py @@ -0,0 +1,216 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import ANY + +from hathor.nanocontracts.resources.nc_exec_logs import NCExecLogsResource +from hathor.transaction import Block, Transaction +from tests.nanocontracts.test_nc_exec_logs import MY_BLUEPRINT1_ID, BaseNCExecLogs +from tests.resources.base_resource import StubSite + + +class NCExecLogsResourceTest(BaseNCExecLogs): + __test__ = True + + def setUp(self): + super().setUp() + self._prepare() + self.web = StubSite(NCExecLogsResource(self.manager)) + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..2] + blockchain b1 a[2..3] + b1 < dummy + b2 < a2 + + nc1.nc_id = "{MY_BLUEPRINT1_ID.hex()}" + nc1.nc_method = initialize() + + nc1 <-- b2 + nc1 <-- a2 + """) + + for _, vertex in artifacts.list: + assert self.manager.on_new_tx(vertex) + + self.nc1 = artifacts.get_typed_vertex('nc1', Transaction) + assert self.nc1.is_nano_contract() + self.b2, self.a2 = artifacts.get_typed_vertices(['b2', 'a2'], Block) + + async def test_missing_id(self) -> None: + response = await self.web.get('logs') + data = response.json_value() + assert response.responseCode == 400 + assert not data['success'] + + async def 
    async def test_invalid_log_level(self) -> None:
        """An unknown `log_level` query parameter must yield HTTP 400 with an
        explanatory error payload.
        """
        response = await self.web.get('logs', {
            b'id': self.nc1.hash_hex.encode(),
            b'log_level': b'UNKNOWN'
        })
        data = response.json_value()
        assert response.responseCode == 400
        assert data == dict(
            success=False,
            error='Invalid log level: UNKNOWN'
        )
    async def test_filter_log_level(self) -> None:
        """Filtering by `log_level=ERROR` must keep the execution entry for the
        best-chain block but return an empty log list, since the successful
        initialize() run only produced DEBUG/INFO entries.
        """
        response = await self.web.get('logs', {
            b'id': self.nc1.hash_hex.encode(),
            b'log_level': b'ERROR'
        })
        data = response.json_value()
        assert data == dict(
            success=True,
            nc_id=self.nc1.get_nano_header().get_contract_id().hex(),
            nc_execution='success',
            logs={
                self.a2.hash_hex: [
                    dict(
                        error_traceback=None,
                        logs=[],
                    ),
                ],
            },
        )