From 60366fd22c84db65c6a5734deffb8049ac397d66 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Mon, 23 Jun 2025 20:29:56 -0300 Subject: [PATCH] tests(nano): add basic structures and resources tests --- hathor/conf/nano_testnet.py | 3 - hathor/conf/nano_testnet.yml | 2 - hathor/nanocontracts/blueprints/__init__.py | 3 +- hathor/nanocontracts/storage/block_storage.py | 8 +- pyproject.toml | 1 + tests/dag_builder/builder.py | 2 + tests/dag_builder/test_dag_builder.py | 201 ++++++ tests/nanocontracts/__init__.py | 0 tests/nanocontracts/blueprints/__init__.py | 0 tests/nanocontracts/blueprints/unittest.py | 103 ++++ tests/nanocontracts/fields/__init__.py | 0 tests/nanocontracts/fields/utils.py | 51 ++ .../on_chain_blueprints/__init__.py | 0 .../on_chain_blueprints/utils.py | 10 + .../nanocontracts/test_blueprints/__init__.py | 0 tests/nanocontracts/test_blueprints/bet.py | 221 +++++++ .../test_blueprints/test_blueprint1.py | 27 + tests/nanocontracts/utils.py | 56 ++ .../resources/nanocontracts/base_resource.py | 40 ++ .../nanocontracts/dummy_blueprint.py | 18 + tests/resources/nanocontracts/my_blueprint.py | 58 ++ .../resources/nanocontracts/test_blueprint.py | 156 +++++ .../test_blueprint_source_code.py | 121 ++++ tests/resources/nanocontracts/test_builtin.py | 277 +++++++++ tests/resources/nanocontracts/test_history.py | 252 ++++++++ .../nanocontracts/test_nc_creation.py | 570 ++++++++++++++++++ .../resources/nanocontracts/test_on_chain.py | 427 +++++++++++++ tests/resources/nanocontracts/test_state.py | 512 ++++++++++++++++ tests/tx/test_headers.py | 232 +++++++ tests/tx/test_indexes_nc_history.py | 219 +++++++ 30 files changed, 3561 insertions(+), 9 deletions(-) create mode 100644 tests/nanocontracts/__init__.py create mode 100644 tests/nanocontracts/blueprints/__init__.py create mode 100644 tests/nanocontracts/blueprints/unittest.py create mode 100644 tests/nanocontracts/fields/__init__.py create mode 100644 tests/nanocontracts/fields/utils.py create mode 100644 tests/nanocontracts/on_chain_blueprints/__init__.py create mode 100644 tests/nanocontracts/on_chain_blueprints/utils.py create mode 100644 tests/nanocontracts/test_blueprints/__init__.py create mode 100644 tests/nanocontracts/test_blueprints/bet.py create mode 100644 tests/nanocontracts/test_blueprints/test_blueprint1.py create mode 100644 tests/nanocontracts/utils.py create mode 100644 tests/resources/nanocontracts/base_resource.py create mode 100644 tests/resources/nanocontracts/dummy_blueprint.py create mode 100644 tests/resources/nanocontracts/my_blueprint.py create mode 100644 tests/resources/nanocontracts/test_blueprint.py create mode 100644 tests/resources/nanocontracts/test_blueprint_source_code.py create mode 100644 tests/resources/nanocontracts/test_builtin.py create mode 100644 tests/resources/nanocontracts/test_history.py create mode 100644 tests/resources/nanocontracts/test_nc_creation.py create mode 100644 tests/resources/nanocontracts/test_on_chain.py create mode 100644 tests/resources/nanocontracts/test_state.py create mode 100644 tests/tx/test_headers.py create mode 100644 tests/tx/test_indexes_nc_history.py diff --git a/hathor/conf/nano_testnet.py b/hathor/conf/nano_testnet.py index d615947a8..4a7bc84ee 100644 --- a/hathor/conf/nano_testnet.py +++ b/hathor/conf/nano_testnet.py @@ -38,9 +38,6 @@ NC_ON_CHAIN_BLUEPRINT_ALLOWED_ADDRESSES=[ 'WWFiNeWAFSmgtjm4ht2MydwS5GY3kMJsEK', ], - BLUEPRINTS={ - bytes.fromhex('3cb032600bdf7db784800e4ea911b10676fa2f67591f82bb62628c234e771595'): 'Bet', - }, SOFT_VOIDED_TX_IDS=list(map(bytes.fromhex, [ '0000003dd5802b05f430a1f54304879173550c0944b49d74321bb9125ee727cb', ])), diff --git a/hathor/conf/nano_testnet.yml b/hathor/conf/nano_testnet.yml index cb022710a..2ddcaae21 100644 --- a/hathor/conf/nano_testnet.yml +++ b/hathor/conf/nano_testnet.yml @@ -22,8 +22,6 @@ ENABLE_NANO_CONTRACTS: true ENABLE_ON_CHAIN_BLUEPRINTS: true NC_ON_CHAIN_BLUEPRINT_ALLOWED_ADDRESSES: - WWFiNeWAFSmgtjm4ht2MydwS5GY3kMJsEK -BLUEPRINTS: - 3cb032600bdf7db784800e4ea911b10676fa2f67591f82bb62628c234e771595: Bet SOFT_VOIDED_TX_IDS: - 0000003dd5802b05f430a1f54304879173550c0944b49d74321bb9125ee727cb diff --git a/hathor/nanocontracts/blueprints/__init__.py b/hathor/nanocontracts/blueprints/__init__.py index b4004cc5d..bf68061ff 100644 --- a/hathor/nanocontracts/blueprints/__init__.py +++ b/hathor/nanocontracts/blueprints/__init__.py @@ -18,5 +18,4 @@ from hathor.nanocontracts.blueprint import Blueprint -_blueprints_mapper: dict[str, Type['Blueprint']] = { -} +_blueprints_mapper: dict[str, Type['Blueprint']] = {} diff --git a/hathor/nanocontracts/storage/block_storage.py b/hathor/nanocontracts/storage/block_storage.py index b084be687..ee870b878 100644 --- a/hathor/nanocontracts/storage/block_storage.py +++ b/hathor/nanocontracts/storage/block_storage.py @@ -17,6 +17,7 @@ from enum import Enum from typing import NamedTuple, Optional +from hathor.nanocontracts.exception import NanoContractDoesNotExist from hathor.nanocontracts.nc_types.dataclass_nc_type import make_dataclass_nc_type from hathor.nanocontracts.storage.contract_storage import NCContractStorage from hathor.nanocontracts.storage.patricia_trie import NodeId, PatriciaTrie @@ -102,8 +103,11 @@ def _get_trie(self, root_id: Optional[bytes]) -> 'PatriciaTrie': return trie def get_contract_storage(self, contract_id: ContractId) -> NCContractStorage: - nc_root_id = self.get_contract_root_id(contract_id) - trie = self._get_trie(nc_root_id) + try: + nc_root_id = self.get_contract_root_id(contract_id) + trie = self._get_trie(nc_root_id) + except KeyError: + raise NanoContractDoesNotExist(contract_id.hex()) token_proxy = TokenProxy(self) return NCContractStorage(trie=trie, nc_id=contract_id, token_proxy=token_proxy) diff --git a/pyproject.toml b/pyproject.toml index 9a3bddd5b..00af077d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -175,6 +175,7 @@ addopts = "-n auto" markers = [ "slow", ] +norecursedirs = ["tests/nanocontracts/test_blueprints"] [build-system] requires = ["poetry-core >= 1.3.2", "cython < 0.30"] diff --git a/tests/dag_builder/builder.py b/tests/dag_builder/builder.py index 114776322..a787fe83e 100644 --- a/tests/dag_builder/builder.py +++ b/tests/dag_builder/builder.py @@ -21,6 +21,7 @@ from hathor.manager import HathorManager from hathor.util import Random from hathor.wallet import HDWallet +from tests.nanocontracts import test_blueprints from tests.utils import GENESIS_SEED @@ -45,4 +46,5 @@ def from_manager( manager=manager, genesis_words=genesis_words or GENESIS_SEED, wallet_factory=wallet_factory or (lambda: TestDAGBuilder.create_random_hd_wallet(manager.rng)), + blueprints_module=blueprints_module or test_blueprints, ) diff --git a/tests/dag_builder/test_dag_builder.py b/tests/dag_builder/test_dag_builder.py index b67afa894..aa60804fc 100644 --- a/tests/dag_builder/test_dag_builder.py +++ b/tests/dag_builder/test_dag_builder.py @@ -1,7 +1,31 @@ +import pytest + +from hathor.nanocontracts import Blueprint, Context, OnChainBlueprint, public +from hathor.nanocontracts.types import NCDepositAction, NCWithdrawalAction, TokenUid +from hathor.nanocontracts.utils import load_builtin_blueprint_for_ocb from hathor.transaction import Block, Transaction from hathor.transaction.token_creation_tx import TokenCreationTransaction from tests import unittest from tests.dag_builder.builder import TestDAGBuilder +from tests.nanocontracts import test_blueprints + + +class MyBlueprint(Blueprint): + counter: int + + @public + def initialize(self, ctx: Context, initial: int) -> None: + self.counter = initial + + @public + def add(self, ctx: Context, value: int) -> int: + self.counter += value + return self.counter + + @public + def sub(self, ctx: Context, value: int) -> int: + self.counter -= value + return self.counter class DAGBuilderTestCase(unittest.TestCase): @@ -217,3 +241,180 @@ def test_propagate_with(self) -> None: artifacts.propagate_with(self.manager) assert len(list(tx_storage.get_all_transactions())) == 16 # 3 genesis + 10 blocks + dummy + tx1 + tx2 + + def test_nc_transactions(self) -> None: + blueprint_id = b'x' * 32 + self.nc_catalog.blueprints[blueprint_id] = MyBlueprint + + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis a[0..40] + a30 < dummy + + tx1.nc_id = "{blueprint_id.hex()}" + tx1.nc_method = initialize(0) + + tx2.nc_id = tx1 + tx2.nc_method = add(5) + tx2.nc_deposit = 10 HTR + tx2.nc_deposit = 5 TKA + + tx3.nc_id = tx1 + tx3.nc_method = sub(3) + tx3.nc_deposit = 3 HTR + tx3.nc_withdrawal = 2 TKA + + a31 --> tx1 + a32 --> tx2 + a33 --> tx3 + """) + + artifacts.propagate_with(self.manager) + + tx1 = artifacts.by_name['tx1'].vertex + self.assertIsInstance(tx1, Transaction) + self.assertTrue(tx1.is_nano_contract()) + + htr_id = TokenUid(b'\0') + tka_id = TokenUid(artifacts.by_name['TKA'].vertex.hash) + + tx2 = artifacts.by_name['tx2'].vertex + tx3 = artifacts.by_name['tx3'].vertex + + ctx2 = tx2.get_nano_header().get_context() + self.assertEqual(dict(ctx2.actions), { + tka_id: (NCDepositAction(token_uid=tka_id, amount=5),), + htr_id: (NCDepositAction(token_uid=htr_id, amount=10),), + }) + + ctx3 = tx3.get_nano_header().get_context() + self.assertEqual(dict(ctx3.actions), { + htr_id: (NCDepositAction(token_uid=htr_id, amount=3),), + tka_id: (NCWithdrawalAction(token_uid=tka_id, amount=2),), + }) + + def test_multiline_literals(self) -> None: + artifacts = self.dag_builder.build_from_str(""" + tx.attr1 = ``` + test + ``` + tx.attr2 = ``` + if foo: + bar + ``` + """) + node = artifacts.by_name['tx'].node + + # asserting with raw shifted strings to make sure we get the expected output. + assert node.get_required_literal('attr1') == """\ +test""" + assert node.get_required_literal('attr2') == """\ +if foo: + bar""" + + invalid_start_texts = [ + """ + tx.attr1 = a``` + ``` + """, + """ + tx.attr1 = ```a + ``` + """, + """ + tx.attr1 = ```a``` + """, + ] + + for text in invalid_start_texts: + with pytest.raises(SyntaxError) as e: + self.dag_builder.build_from_str(text) + assert str(e.value) == 'invalid multiline string start' + + invalid_end_texts = [ + """ + tx.attr1 = ``` + a``` + """, + """ + tx.attr1 = ``` + ```a + """, + ] + + for text in invalid_end_texts: + with pytest.raises(SyntaxError) as e: + self.dag_builder.build_from_str(text) + assert str(e.value) == 'invalid multiline string end' + + with pytest.raises(SyntaxError) as e: + self.dag_builder.build_from_str(""" + tx.attr1 = ``` + test + """) + assert str(e.value) == 'unclosed multiline string' + + def test_on_chain_blueprints(self) -> None: + bet_code = load_builtin_blueprint_for_ocb('bet.py', 'Bet', test_blueprints) + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..11] + b10 < dummy + + ocb1.ocb_private_key = "{private_key}" + ocb1.ocb_password = "{password}" + + ocb2.ocb_private_key = "{private_key}" + ocb2.ocb_password = "{password}" + + ocb3.ocb_private_key = "{private_key}" + ocb3.ocb_password = "{password}" + + nc1.nc_id = ocb1 + nc1.nc_method = initialize("00", "00", 0) + + nc2.nc_id = ocb2 + nc2.nc_method = initialize(0) + + nc3.nc_id = ocb3 + nc3.nc_method = initialize() + + ocb1 <-- ocb2 <-- ocb3 <-- b11 + b11 < nc1 < nc2 < nc3 + + ocb1.ocb_code = "{bet_code.encode().hex()}" + ocb2.ocb_code = test_blueprint1.py, TestBlueprint1 + ocb3.ocb_code = ``` + from hathor.nanocontracts import Blueprint + from hathor.nanocontracts.context import Context + from hathor.nanocontracts.types import public + class MyBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + __blueprint__ = MyBlueprint + ``` + """) + + artifacts.propagate_with(self.manager) + ocb1, ocb2, ocb3 = artifacts.get_typed_vertices(['ocb1', 'ocb2', 'ocb3'], OnChainBlueprint) + nc1, nc2, nc3 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc3'], Transaction) + + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc3.is_nano_contract() + + assert ocb1.get_blueprint_class().__name__ == 'Bet' + assert nc1.get_nano_header().nc_id == ocb1.hash + blueprint_class = self.manager.tx_storage.get_blueprint_class(ocb1.hash) + assert blueprint_class.__name__ == 'Bet' + + assert ocb2.get_blueprint_class().__name__ == 'TestBlueprint1' + assert nc2.get_nano_header().nc_id == ocb2.hash + blueprint_class = self.manager.tx_storage.get_blueprint_class(ocb2.hash) + assert blueprint_class.__name__ == 'TestBlueprint1' + + assert ocb3.get_blueprint_class().__name__ == 'MyBlueprint' + assert nc3.get_nano_header().nc_id == ocb3.hash + blueprint_class = self.manager.tx_storage.get_blueprint_class(ocb3.hash) + assert blueprint_class.__name__ == 'MyBlueprint' diff --git a/tests/nanocontracts/__init__.py b/tests/nanocontracts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/nanocontracts/blueprints/__init__.py b/tests/nanocontracts/blueprints/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/nanocontracts/blueprints/unittest.py b/tests/nanocontracts/blueprints/unittest.py new file mode 100644 index 000000000..6ad3fdbe3 --- /dev/null +++ b/tests/nanocontracts/blueprints/unittest.py @@ -0,0 +1,103 @@ +from hathor.conf import HathorSettings +from hathor.crypto.util import decode_address +from hathor.manager import HathorManager +from hathor.nanocontracts import Context +from hathor.nanocontracts.blueprint import Blueprint +from hathor.nanocontracts.nc_exec_logs import NCLogConfig +from hathor.nanocontracts.storage import NCBlockStorage, NCMemoryStorageFactory +from hathor.nanocontracts.storage.backends import MemoryNodeTrieStore +from hathor.nanocontracts.storage.patricia_trie import PatriciaTrie +from hathor.nanocontracts.types import Address, BlueprintId, ContractId, NCAction, TokenUid, VertexId +from hathor.nanocontracts.vertex_data import VertexData +from hathor.transaction import BaseTransaction, Transaction +from hathor.util import not_none +from hathor.wallet import KeyPair +from tests import unittest +from tests.nanocontracts.utils import TestRunner + +settings = HathorSettings() + + +class BlueprintTestCase(unittest.TestCase): + use_memory_storage = True + + def setUp(self): + super().setUp() + self.manager = self.build_manager() + self.rng = self.manager.rng + self.wallet = self.manager.wallet + self.reactor = self.manager.reactor + self.nc_catalog = self.manager.tx_storage.nc_catalog + + self.htr_token_uid = settings.HATHOR_TOKEN_UID + self.runner = self.build_runner() + self.now = int(self.reactor.seconds()) + + self._token_index = 1 + + def build_manager(self) -> HathorManager: + """Create a HathorManager instance.""" + return self.create_peer('testnet', nc_indices=True, nc_log_config=NCLogConfig.FAILED, wallet_index=True) + + def register_blueprint_class(self, blueprint_id: BlueprintId, blueprint_class: type[Blueprint]) -> None: + """Register a blueprint class with a given id, allowing contracts to be created from it.""" + assert blueprint_id not in self.nc_catalog.blueprints + self.nc_catalog.blueprints[blueprint_id] = blueprint_class + + def build_runner(self) -> TestRunner: + """Create a Runner instance.""" + nc_storage_factory = NCMemoryStorageFactory() + store = MemoryNodeTrieStore() + block_trie = PatriciaTrie(store) + block_storage = NCBlockStorage(block_trie) + return TestRunner( + self.manager.tx_storage, nc_storage_factory, block_storage, settings=self._settings, reactor=self.reactor + ) + + def gen_random_token_uid(self) -> TokenUid: + """Generate a random token UID (32 bytes).""" + token = self._token_index.to_bytes(32, byteorder='big', signed=False) + self._token_index += 1 + return TokenUid(token) + + def gen_random_address(self) -> Address: + """Generate a random wallet address.""" + address, _ = self.gen_random_address_with_key() + return address + + def gen_random_address_with_key(self) -> tuple[Address, KeyPair]: + """Generate a random wallet address with its key.""" + password = self.rng.randbytes(12) + key = KeyPair.create(password) + address_b58 = key.address + address_bytes = decode_address(not_none(address_b58)) + return Address(address_bytes), key + + def gen_random_contract_id(self) -> ContractId: + """Generate a random contract id.""" + return ContractId(VertexId(self.rng.randbytes(32))) + + def gen_random_blueprint_id(self) -> BlueprintId: + """Generate a random contract id.""" + return BlueprintId(self.rng.randbytes(32)) + + def get_genesis_tx(self) -> Transaction: + """Return a genesis transaction.""" + genesis = self.manager.tx_storage.get_all_genesis() + tx = list(tx for tx in genesis if isinstance(tx, Transaction))[0] + return tx + + def create_context( + self, + actions: list[NCAction] | None = None, + vertex: BaseTransaction | VertexData | None = None, + address: Address | None = None, + timestamp: int | None = None, + ) -> Context: + """Create a Context instance with optional values or defaults.""" + return Context( + actions=actions if actions is not None else [], + vertex=vertex or self.get_genesis_tx(), + address=address or self.gen_random_address(), + timestamp=timestamp or self.now, + ) diff --git a/tests/nanocontracts/fields/__init__.py b/tests/nanocontracts/fields/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/nanocontracts/fields/utils.py b/tests/nanocontracts/fields/utils.py new file mode 100644 index 000000000..9f900a66c --- /dev/null +++ b/tests/nanocontracts/fields/utils.py @@ -0,0 +1,51 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, TypeVar + +from typing_extensions import override + +from hathor.nanocontracts.nc_types import NCType +from hathor.nanocontracts.storage import NCContractStorage +from hathor.nanocontracts.storage.types import _NOT_PROVIDED + +T = TypeVar('T') +D = TypeVar('D') + + +class MockNCStorage(NCContractStorage): + __slots__ = ('store',) + + def __init__(self) -> None: + self.store: dict[bytes, Any] = {} + + @override + def get_obj(self, key: bytes, value: NCType[T], *, default: D = _NOT_PROVIDED) -> T | D: + if item := self.store.get(key, default): + return item + if default is _NOT_PROVIDED: + raise KeyError + return default + + @override + def put_obj(self, key: bytes, value: NCType[T], data: T) -> None: + self.store[key] = data + + @override + def del_obj(self, key: bytes) -> None: + del self.store[key] + + @override + def has_obj(self, key: bytes) -> bool: + return key in self.store diff --git a/tests/nanocontracts/on_chain_blueprints/__init__.py b/tests/nanocontracts/on_chain_blueprints/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/nanocontracts/on_chain_blueprints/utils.py b/tests/nanocontracts/on_chain_blueprints/utils.py new file mode 100644 index 000000000..38072582b --- /dev/null +++ b/tests/nanocontracts/on_chain_blueprints/utils.py @@ -0,0 +1,10 @@ +from cryptography.hazmat.primitives.asymmetric import ec + +from hathor.wallet import KeyPair +from tests import unittest + + +def get_ocb_private_key() -> ec.EllipticCurvePrivateKey: + """Return the private key used to sign on-chain blueprints on tests.""" + key = KeyPair(unittest.OCB_TEST_PRIVKEY) + return key.get_private_key(unittest.OCB_TEST_PASSWORD) diff --git a/tests/nanocontracts/test_blueprints/__init__.py b/tests/nanocontracts/test_blueprints/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/nanocontracts/test_blueprints/bet.py b/tests/nanocontracts/test_blueprints/bet.py new file mode 100644 index 000000000..ee4e05a13 --- /dev/null +++ b/tests/nanocontracts/test_blueprints/bet.py @@ -0,0 +1,221 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from math import floor +from typing import Optional, TypeAlias + +from hathor.nanocontracts.blueprint import Blueprint +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.exception import NCFail +from hathor.nanocontracts.types import ( + Address, + NCAction, + NCDepositAction, + NCWithdrawalAction, + SignedData, + Timestamp, + TokenUid, + TxOutputScript, + public, + view, +) + +Result: TypeAlias = str +Amount: TypeAlias = int + + +class InvalidToken(NCFail): + pass + + +class ResultAlreadySet(NCFail): + pass + + +class ResultNotAvailable(NCFail): + pass + + +class TooManyActions(NCFail): + pass + + +class TooLate(NCFail): + pass + + +class InsufficientBalance(NCFail): + pass + + +class InvalidOracleSignature(NCFail): + pass + + +class Bet(Blueprint): + """Bet blueprint with final result provided by an oracle. + + The life cycle of contracts using this blueprint is the following: + + 1. [Owner ] Create a contract. + 2. [User 1] `bet(...)` on result A. + 3. [User 2] `bet(...)` on result A. + 4. [User 3] `bet(...)` on result B. + 5. [Oracle] `set_result(...)` as result A. + 6. [User 1] `withdraw(...)` + 7. [User 2] `withdraw(...)` + + Notice that, in the example above, users 1 and 2 won. + """ + + # Total bets per result. + bets_total: dict[Result, Amount] + + # Total bets per (result, address). + bets_address: dict[tuple[Result, Address], Amount] + + # Bets grouped by address. + address_details: dict[Address, dict[Result, Amount]] + + # Amount that has already been withdrawn per address. + withdrawals: dict[Address, Amount] + + # Total bets. + total: Amount + + # Final result. + final_result: Optional[Result] + + # Oracle script to set the final result. + oracle_script: TxOutputScript + + # Maximum timestamp to make a bet. + date_last_bet: Timestamp + + # Token for this bet. + token_uid: TokenUid + + @public + def initialize(self, ctx: Context, oracle_script: TxOutputScript, token_uid: TokenUid, + date_last_bet: Timestamp) -> None: + if len(ctx.actions) != 0: + raise NCFail('must be a single call') + self.oracle_script = oracle_script + self.token_uid = token_uid + self.date_last_bet = date_last_bet + self.final_result = None + self.total = Amount(0) + + @view + def has_result(self) -> bool: + """Return True if the final result has already been set.""" + return bool(self.final_result is not None) + + def fail_if_result_is_available(self) -> None: + """Fail the execution if the final result has already been set.""" + if self.has_result(): + raise ResultAlreadySet + + def fail_if_result_is_not_available(self) -> None: + """Fail the execution if the final result is not available yet.""" + if not self.has_result(): + raise ResultNotAvailable + + def fail_if_invalid_token(self, action: NCAction) -> None: + """Fail the execution if the token is invalid.""" + if action.token_uid != self.token_uid: + token1 = self.token_uid.hex() if self.token_uid else None + token2 = action.token_uid.hex() if action.token_uid else None + raise InvalidToken(f'invalid token ({token1} != {token2})') + + def _get_action(self, ctx: Context) -> NCAction: + """Return the only action available; fails otherwise.""" + if len(ctx.actions) != 1: + raise TooManyActions('only one token supported') + if self.token_uid not in ctx.actions: + raise InvalidToken(f'token different from {self.token_uid.hex()}') + return ctx.get_single_action(self.token_uid) + + @public(allow_deposit=True) + def bet(self, ctx: Context, address: Address, score: str) -> None: + """Make a bet.""" + action = self._get_action(ctx) + assert isinstance(action, NCDepositAction) + self.fail_if_result_is_available() + self.fail_if_invalid_token(action) + if ctx.timestamp > self.date_last_bet: + raise TooLate(f'cannot place bets after {self.date_last_bet}') + amount = Amount(action.amount) + self.total = Amount(self.total + amount) + if score not in self.bets_total: + self.bets_total[score] = amount + else: + self.bets_total[score] += amount + key = (score, address) + if key not in self.bets_address: + self.bets_address[key] = amount + else: + self.bets_address[key] += amount + + # Update dict indexed by address + partial = self.address_details.get(address, {}) + partial.update({ + score: self.bets_address[key] + }) + + self.address_details[address] = partial + + @public + def set_result(self, ctx: Context, result: SignedData[Result]) -> None: + """Set final result. This method is called by the oracle.""" + self.fail_if_result_is_available() + if not result.checksig(self.syscall.get_contract_id(), self.oracle_script): + raise InvalidOracleSignature + self.final_result = result.data + + @public(allow_withdrawal=True) + def withdraw(self, ctx: Context) -> None: + """Withdraw tokens after the final result is set.""" + action = self._get_action(ctx) + assert isinstance(action, NCWithdrawalAction) + self.fail_if_result_is_not_available() + self.fail_if_invalid_token(action) + address = Address(ctx.address) + allowed = self.get_max_withdrawal(address) + if action.amount > allowed: + raise InsufficientBalance(f'withdrawal amount is greater than available (max: {allowed})') + if address not in self.withdrawals: + self.withdrawals[address] = action.amount + else: + self.withdrawals[address] += action.amount + + @view + def get_max_withdrawal(self, address: Address) -> Amount: + """Return the maximum amount available for withdrawal.""" + total = self.get_winner_amount(address) + withdrawals = self.withdrawals.get(address, Amount(0)) + return total - withdrawals + + @view + def get_winner_amount(self, address: Address) -> Amount: + """Return how much an address has won.""" + self.fail_if_result_is_not_available() + if self.final_result not in self.bets_total: + return Amount(0) + result_total = self.bets_total[self.final_result] + if result_total == 0: + return Amount(0) + address_total = self.bets_address.get((self.final_result, address), 0) + percentage = address_total / result_total + return Amount(floor(percentage * self.total)) diff --git a/tests/nanocontracts/test_blueprints/test_blueprint1.py b/tests/nanocontracts/test_blueprints/test_blueprint1.py new file mode 100644 index 000000000..14e00d3a8 --- /dev/null +++ b/tests/nanocontracts/test_blueprints/test_blueprint1.py @@ -0,0 +1,27 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.nanocontracts import Blueprint +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.types import public + + +class TestBlueprint1(Blueprint): + @public + def initialize(self, ctx: Context, a: int) -> None: + pass + + @public + def nop(self, ctx: Context) -> None: + pass diff --git a/tests/nanocontracts/utils.py b/tests/nanocontracts/utils.py new file mode 100644 index 000000000..56249ea83 --- /dev/null +++ b/tests/nanocontracts/utils.py @@ -0,0 +1,56 @@ +from hathor.conf.settings import HathorSettings +from hathor.manager import HathorManager +from hathor.nanocontracts.nc_exec_logs import NCExecEntry, NCLogConfig +from hathor.nanocontracts.runner import Runner +from hathor.nanocontracts.storage import NCBlockStorage, NCStorageFactory +from hathor.reactor import ReactorProtocol +from hathor.transaction.storage import TransactionStorage +from hathor.types import VertexId +from hathor.util import not_none + + +class TestRunner(Runner): + __test__ = False + + def __init__( + self, + tx_storage: TransactionStorage, + storage_factory: NCStorageFactory, + block_storage: NCBlockStorage, + *, + settings: HathorSettings, + reactor: ReactorProtocol, + seed: bytes | None = None, + ) -> None: + if seed is None: + seed = b'x' * 32 + super().__init__( + tx_storage=tx_storage, + storage_factory=storage_factory, + block_storage=block_storage, + settings=settings, + reactor=reactor, + seed=seed, + ) + + +def get_nc_failure_entry(*, manager: HathorManager, tx_id: VertexId, block_id: VertexId) -> NCExecEntry: + """Return the failure entry for a nano execution.""" + nc_log_storage = manager.consensus_algorithm.block_algorithm_factory.nc_log_storage + assert nc_log_storage._config in {NCLogConfig.ALL, NCLogConfig.FAILED}, ( + 'to get NCFail reason, NC logs must be enabled' + ) + logs = not_none(nc_log_storage.get_logs(tx_id, block_id=block_id)) + return logs.entries[block_id][-1] + + +def assert_nc_failure_reason(*, manager: HathorManager, tx_id: VertexId, block_id: VertexId, reason: str) -> None: + """A function to assert NCFail reason in tests by inspecting NC logs.""" + failure_entry = get_nc_failure_entry(manager=manager, tx_id=tx_id, block_id=block_id) + assert failure_entry.error_traceback is not None, 'no error found' + assert reason in failure_entry.error_traceback, ( + f'reason not found in nano error traceback\n\n' + f'expected: "{reason}"\n' + f'found:\n\n' + f'{failure_entry.error_traceback}' + ) diff --git a/tests/resources/nanocontracts/base_resource.py b/tests/resources/nanocontracts/base_resource.py new file mode 100644 index 000000000..8921c4a73 --- /dev/null +++ b/tests/resources/nanocontracts/base_resource.py @@ -0,0 +1,40 @@ +from hathor.manager import HathorManager +from hathor.nanocontracts import Blueprint, OnChainBlueprint +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.types import BlueprintId +from tests.resources.base_resource import _BaseResourceTest + + +class GenericNanoResourceTest(_BaseResourceTest._ResourceTest): + __test__ = False + + def create_builtin_blueprint( + self, + manager: HathorManager, + blueprint_id: BlueprintId, + blueprint_class: type[Blueprint], + ) -> None: + manager.tx_storage.nc_catalog = NCBlueprintCatalog({ + blueprint_id: blueprint_class, + }) + + def create_on_chain_blueprint(self, manager: HathorManager, nc_code: str) -> OnChainBlueprint: + from hathor.nanocontracts.on_chain_blueprint import Code + from tests.nanocontracts.on_chain_blueprints.utils import get_ocb_private_key + code = Code.from_python_code(nc_code, self._settings) + timestamp = manager.tx_storage.latest_timestamp + 1 + parents = manager.get_new_tx_parents(timestamp) + blueprint = OnChainBlueprint( + weight=1, + inputs=[], + outputs=[], + parents=parents, + storage=manager.tx_storage, + timestamp=timestamp, + code=code, + ) + blueprint.weight = manager.daa.minimum_tx_weight(blueprint) + blueprint.sign(get_ocb_private_key()) + manager.cpu_mining_service.resolve(blueprint) + manager.reactor.advance(2) # type: ignore + return blueprint diff --git a/tests/resources/nanocontracts/dummy_blueprint.py b/tests/resources/nanocontracts/dummy_blueprint.py new file mode 100644 index 000000000..911673714 --- /dev/null +++ b/tests/resources/nanocontracts/dummy_blueprint.py @@ -0,0 +1,18 @@ +from hathor.nanocontracts import Blueprint +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.types import public + + +class TestBlueprint(Blueprint): + """ This class is used by the test for the blueprint source code resource + It must be in a separate file for the assert in the test + """ + int_attribute: int + + @public + def initialize(self, ctx: Context) -> None: + self.int_attribute = 0 + + @public + def sum(self, ctx: Context, arg1: int) -> None: + self.int_attribute += arg1 diff --git a/tests/resources/nanocontracts/my_blueprint.py b/tests/resources/nanocontracts/my_blueprint.py new file mode 100644 index 000000000..59efb7578 --- /dev/null +++ b/tests/resources/nanocontracts/my_blueprint.py @@ -0,0 +1,58 @@ +from typing import Optional + +from hathor.nanocontracts import Blueprint +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.types import Address, Amount, SignedData, Timestamp, TokenUid, TxOutputScript, public, view + + +class MyBlueprint(Blueprint): + a_int: int + a_str: str + a_bool: bool + a_address: Address + a_amount: Amount + a_timestamp: Timestamp + a_token_uid: TokenUid + a_script: TxOutputScript + a_signed_data: SignedData[str] + a_dict: dict[str, int] + a_tuple: tuple[str, int, bool] + a_dict_dict_tuple: dict[str, tuple[str, int]] + a_optional_int: Optional[int] + + @public + def initialize(self, ctx: Context) -> None: + pass + + @public + def nop(self, ctx: Context, arg1: int, arg2: SignedData[str]) -> None: + """No operation.""" + self.a = arg1 + + @view + def my_private_method_nop(self, arg1: int) -> int: + return 1 + + @view + def my_private_method_2(self) -> dict[dict[str, int], tuple[bool, str, int, int]]: + return {} + + @view + def my_private_method_3(self) -> list[str]: + return [] + + @view + def my_private_method_4(self) -> set[int]: + return set() + + @view + def my_private_method_5(self) -> str | None: + return None + + @view + def my_private_method_6(self) -> None | str: + return None + + @view + def my_private_method_7(self) -> str | int | bool | None: + return 0 diff --git a/tests/resources/nanocontracts/test_blueprint.py b/tests/resources/nanocontracts/test_blueprint.py new file mode 100644 index 000000000..5772718a6 --- /dev/null +++ b/tests/resources/nanocontracts/test_blueprint.py @@ -0,0 +1,156 @@ +from collections.abc import Generator +from typing import Any + +from twisted.internet.defer import Deferred, inlineCallbacks + +from hathor.nanocontracts.resources.blueprint import BlueprintInfoResource +from hathor.nanocontracts.types import BlueprintId, VertexId +from hathor.nanocontracts.utils import load_builtin_blueprint_for_ocb +from hathor.simulator.utils import add_new_blocks +from tests.resources.base_resource import StubSite +from tests.resources.nanocontracts.base_resource import GenericNanoResourceTest + + +class BaseBlueprintInfoTest(GenericNanoResourceTest): + # this is what subclasses have to define + blueprint_id: BlueprintId + + def setUp(self): + super().setUp() + self.manager = self.create_peer('testnet') + self.web = StubSite(BlueprintInfoResource(self.manager)) + + @inlineCallbacks + def test_fail_missing_id(self) -> Generator[Deferred[Any], Any, None]: + response1 = yield self.web.get('blueprint') + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_invalid_id(self) -> Generator[Deferred[Any], Any, None]: + response1 = yield self.web.get( + 'blueprint', + { + b'blueprint_id': b'xxx', + } + ) + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_unknown_id(self) -> Generator[Deferred[Any], Any, None]: + response1 = yield self.web.get( + 'blueprint', + { + b'blueprint_id': b'0' * 32, + } + ) + self.assertEqual(404, response1.responseCode) + + @inlineCallbacks + def test_success(self) -> Generator[Deferred[Any], Any, None]: + response1 = yield self.web.get( + 'blueprint', + { + b'blueprint_id': bytes(self.blueprint_id.hex(), 'utf-8'), + } + ) + data = response1.json_value() + + self.assertEqual(data['id'], self.blueprint_id.hex()) + self.assertEqual(data['name'], 'MyBlueprint') + self.assertEqual(data['attributes'], { + 'a_int': 'int', + 'a_str': 'str', + 'a_bool': 'bool', + 'a_address': 'Address', + 'a_amount': 'Amount', + 'a_timestamp': 'Timestamp', + 'a_token_uid': 'TokenUid', + 'a_script': 'TxOutputScript', + 'a_signed_data': 'SignedData[str]', + 'a_dict': 'dict[str, int]', + 'a_tuple': 'tuple[str, int, bool]', + 'a_dict_dict_tuple': 'dict[str, tuple[str, int]]', + 'a_optional_int': 'int?', + }) + self.assertEqual(data['public_methods'], { + 'initialize': { + 'args': [], + 'return_type': 'null', + 'docstring': None, + }, + 'nop': { + 'args': [{ + 'name': 'arg1', + 'type': 'int' + }, { + 'name': 'arg2', + 'type': 'SignedData[str]', + }], + 'return_type': 'null', + 'docstring': 'No operation.', + }, + }) + expected_data = { + 'my_private_method_nop': { + 'args': [{ + 'name': 'arg1', + 'type': 'int', + }], + 'return_type': 'int', + 'docstring': None, + }, + 'my_private_method_2': { + 'args': [], + 'return_type': 'dict[dict[str, int], tuple[bool, str, int, int]]', + 'docstring': None, + }, + 'my_private_method_3': { + 'args': [], + 'return_type': 'list[str]', + 'docstring': None, + }, + 'my_private_method_4': { + 'args': [], + 'return_type': 'set[int]', + 'docstring': None, + }, + 'my_private_method_5': { + 'args': [], + 'return_type': 'str?', + 'docstring': None, + }, + 'my_private_method_6': { + 'args': [], + 'return_type': 'str?', + 'docstring': None, + }, + 'my_private_method_7': { + 'args': [], + 'return_type': 'union[str, int, bool, null]', + 'docstring': None, + }, + } + self.assertEqual(data['private_methods'], expected_data) + + +class BuiltinBlueprintInfoTest(BaseBlueprintInfoTest): + __test__ = True + + def setUp(self): + super().setUp() + from tests.resources.nanocontracts import my_blueprint + self.blueprint_id = BlueprintId(VertexId(b'3cb032600bdf7db784800e4ea911b10676fa2f67591f82bb62628c234e771595')) + self.create_builtin_blueprint(self.manager, self.blueprint_id, my_blueprint.MyBlueprint) + + +class OCBBlueprintInfoTest(BaseBlueprintInfoTest): + __test__ = True + + def setUp(self): + super().setUp() + from tests.resources import nanocontracts + nc_code = load_builtin_blueprint_for_ocb('my_blueprint.py', 'MyBlueprint', nanocontracts) + blueprint = self.create_on_chain_blueprint(self.manager, nc_code) + self.manager.vertex_handler.on_new_relayed_vertex(blueprint) + add_new_blocks(self.manager, 1, advance_clock=30) # confirm the on-chain blueprint vertex + self.blueprint_id = BlueprintId(VertexId(blueprint.hash)) diff --git a/tests/resources/nanocontracts/test_blueprint_source_code.py b/tests/resources/nanocontracts/test_blueprint_source_code.py new file mode 100644 index 000000000..4523e6ee8 --- /dev/null +++ b/tests/resources/nanocontracts/test_blueprint_source_code.py @@ -0,0 +1,121 @@ +from twisted.internet.defer import inlineCallbacks + +from hathor.nanocontracts.resources import BlueprintSourceCodeResource +from hathor.nanocontracts.types import BlueprintId +from hathor.nanocontracts.utils import load_builtin_blueprint_for_ocb +from hathor.simulator.utils import add_new_blocks +from tests.resources.base_resource import StubSite +from tests.resources.nanocontracts.base_resource import GenericNanoResourceTest + + +class BaseBlueprintSourceCodeTest(GenericNanoResourceTest): + __test__ = False + + # this is what subclasses have to define + blueprint_id: BlueprintId + blueprint_source: str + + def setUp(self): + super().setUp() + self.manager = self.create_peer('testnet') + self.web = StubSite(BlueprintSourceCodeResource(self.manager)) + + @inlineCallbacks + def test_fail_missing_id(self): + response1 = yield self.web.get('blueprint/source') + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_invalid_id(self): + response1 = yield self.web.get( + 'blueprint/source', + { + b'blueprint_id': b'xxx', + } + ) + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_unknown_id(self): + response1 = yield self.web.get( + 'blueprint/source', + { + b'blueprint_id': b'0' * 32, + } + ) + self.assertEqual(404, response1.responseCode) + + @inlineCallbacks + def test_success(self): + response1 = yield self.web.get( + 'blueprint/source', + { + b'blueprint_id': bytes(self.blueprint_id.hex(), 'utf-8'), + } + ) + data = response1.json_value() + self.assertEqual(self.blueprint_source, data['source_code']) + + +class BuiltinBlueprintSourceCodeTest(BaseBlueprintSourceCodeTest): + __test__ = True + + blueprint_source = r'''from hathor.nanocontracts import Blueprint +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.types import public + + +class TestBlueprint(Blueprint): + """ This class is used by the test for the blueprint source code resource + It must be in a separate file for the assert in the test + """ + int_attribute: int + + @public + def initialize(self, ctx: Context) -> None: + self.int_attribute = 0 + + @public + def sum(self, ctx: Context, arg1: int) -> None: + self.int_attribute += arg1 +''' + + def setUp(self): + super().setUp() + from tests.resources.nanocontracts import dummy_blueprint + self.blueprint_id = BlueprintId(b'3cb032600bdf7db784800e4ea911b10676fa2f67591f82bb62628c234e771595') + self.create_builtin_blueprint(self.manager, self.blueprint_id, dummy_blueprint.TestBlueprint) + + +class OCBBlueprintSourceCodeTest(BaseBlueprintSourceCodeTest): + __test__ = True + + blueprint_source = r'''from hathor.nanocontracts import Blueprint +from hathor.nanocontracts.context import Context +from hathor.nanocontracts.types import public + + +class TestBlueprint(Blueprint): + """ This class is used by the test for the blueprint source code resource + It must be in a separate file for the assert in the test + """ + int_attribute: int + + @public + def initialize(self, ctx: Context) -> None: + self.int_attribute = 0 + + @public + def sum(self, ctx: Context, arg1: int) -> None: + self.int_attribute += arg1 +__blueprint__ = TestBlueprint +''' + + def setUp(self): + super().setUp() + from tests.resources import nanocontracts + nc_code = load_builtin_blueprint_for_ocb('dummy_blueprint.py', 'TestBlueprint', nanocontracts) + blueprint = self.create_on_chain_blueprint(self.manager, nc_code) + self.manager.vertex_handler.on_new_relayed_vertex(blueprint) + add_new_blocks(self.manager, 1, advance_clock=30) # confirm the on-chain blueprint vertex + self.blueprint_id = BlueprintId(blueprint.hash) diff --git a/tests/resources/nanocontracts/test_builtin.py b/tests/resources/nanocontracts/test_builtin.py new file mode 100644 index 000000000..6dfed0898 --- /dev/null +++ b/tests/resources/nanocontracts/test_builtin.py @@ -0,0 +1,277 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.nanocontracts import Blueprint, Context, public +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.resources.builtin import BlueprintBuiltinResource +from tests.resources.base_resource import StubSite, _BaseResourceTest + + +class MyBlueprint1(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + +class MyBlueprint2(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + +class BlueprintBuiltinResourceTest(_BaseResourceTest._ResourceTest): + + def setUp(self): + super().setUp() + self.manager = self.create_peer( + 'testnet', + nc_indices=True, + ) + self.web = StubSite(BlueprintBuiltinResource(self.manager)) + + self.manager.tx_storage.nc_catalog = NCBlueprintCatalog({ + (b'\x11' * 32): MyBlueprint1, + (b'\x22' * 32): MyBlueprint2, + (b'\x33' * 32): MyBlueprint2, + (b'\x44' * 32): MyBlueprint2, + (b'\x55' * 32): MyBlueprint2, + }) + + async def test_success(self) -> None: + response = await self.web.get('builtin') + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[ + dict(id='11' * 32, name='MyBlueprint1'), + dict(id='22' * 32, name='MyBlueprint2'), + dict(id='33' * 32, name='MyBlueprint2'), + dict(id='44' * 32, name='MyBlueprint2'), + dict(id='55' * 32, name='MyBlueprint2'), + ], + ) + + async def test_pagination(self) -> None: + response = await self.web.get('builtin', { + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=2, + has_more=True, + blueprints=[ + dict(id='11' * 32, name='MyBlueprint1'), + dict(id='22' * 32, name='MyBlueprint2'), + ], + ) + + after = '22' * 32 + response = await self.web.get('builtin', { + b'after': after.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=True, + blueprints=[ + dict(id='33' * 32, name='MyBlueprint2'), + dict(id='44' * 32, name='MyBlueprint2'), + ], + ) + + after = '44' * 32 + response = await self.web.get('builtin', { + b'after': after.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=False, + blueprints=[ + dict(id='55' * 32, name='MyBlueprint2'), + ], + ) + + after = '55' * 32 + response = await self.web.get('builtin', { + b'after': after.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=False, + blueprints=[], + ) + + before = '55' * 32 + response = await self.web.get('builtin', { + b'before': before.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=True, + blueprints=[ + dict(id='44' * 32, name='MyBlueprint2'), + dict(id='33' * 32, name='MyBlueprint2'), + ], + ) + + before = '33' * 32 + response = await self.web.get('builtin', { + b'before': before.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=False, + blueprints=[ + dict(id='22' * 32, name='MyBlueprint2'), + dict(id='11' * 32, name='MyBlueprint1'), + ], + ) + + before = '11' * 32 + response = await self.web.get('builtin', { + b'before': before.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=False, + blueprints=[], + ) + + async def test_search_by_id(self) -> None: + bp_id = '33' * 32 + response = await self.web.get('builtin', { + b'search': bp_id.encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[ + dict(id=bp_id, name='MyBlueprint2'), + ], + ) + + # tx exists but is not a blueprint + bp_id = self._settings.GENESIS_TX1_HASH.hex() + response = await self.web.get('builtin', { + b'search': bp_id.encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[], + ) + + response = await self.web.get('builtin', { + b'search': b'ff' * 32, + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[], + ) + + async def test_search_by_name(self) -> None: + response = await self.web.get('builtin', { + b'search': b'myblueprint1', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[ + dict(id='11' * 32, name='MyBlueprint1'), + ], + ) + + response = await self.web.get('builtin', { + b'search': b'MyBlueprint2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[ + dict(id='22' * 32, name='MyBlueprint2'), + dict(id='33' * 32, name='MyBlueprint2'), + dict(id='44' * 32, name='MyBlueprint2'), + dict(id='55' * 32, name='MyBlueprint2'), + ], + ) + + response = await self.web.get('builtin', { + b'search': b'Unknown', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[], + ) diff --git a/tests/resources/nanocontracts/test_history.py b/tests/resources/nanocontracts/test_history.py new file mode 100644 index 000000000..4d398ccf8 --- /dev/null +++ b/tests/resources/nanocontracts/test_history.py @@ -0,0 +1,252 @@ +from typing import Any + +from cryptography.hazmat.primitives.asymmetric import ec +from twisted.internet.defer import inlineCallbacks + +from hathor.conf import HathorSettings +from hathor.nanocontracts import Blueprint, Context, public +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.method import Method +from hathor.nanocontracts.resources import NanoContractHistoryResource +from hathor.nanocontracts.utils import sign_openssl +from hathor.simulator.utils import add_new_block +from hathor.transaction import Transaction +from hathor.transaction.headers import NanoHeader +from tests.resources.base_resource import StubSite, _BaseResourceTest +from tests.utils import add_blocks_unlock_reward, get_genesis_key + +settings = HathorSettings() + + +class MyBlueprint(Blueprint): + a: int + + @public + def initialize(self, ctx: Context, a: int) -> None: + self.a = a + + @public + def set_a(self, ctx: Context, a: int) -> None: + self.a = a + + +class NanoContractHistoryTest(_BaseResourceTest._ResourceTest): + def setUp(self): + super().setUp() + + self.manager = self.create_peer( + 'testnet', + unlock_wallet=True, + wallet_index=True, + nc_indices=True, + ) + self.tx_storage = self.manager.tx_storage + + self.genesis = self.tx_storage.get_all_genesis() + self.genesis_blocks = [tx for tx in self.genesis if tx.is_block] + self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] + + # read genesis keys + self.genesis_private_key = get_genesis_key() + self.genesis_public_key = self.genesis_private_key.public_key() + + add_blocks_unlock_reward(self.manager) + + self.web = StubSite(NanoContractHistoryResource(self.manager)) + + self.blueprint_id = b'1' * 32 + self.catalog = NCBlueprintCatalog({ + self.blueprint_id: MyBlueprint + }) + self.tx_storage.nc_catalog = self.catalog + self.nc_seqnum = 0 + + @inlineCallbacks + def test_fail_missing_id(self): + response1 = yield self.web.get('history') + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_invalid_id(self): + response1 = yield self.web.get( + 'history', + { + b'id': b'xxx', + } + ) + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_unknown_id(self): + response1 = yield self.web.get( + 'history', + { + b'id': b'0' * 32, + } + ) + self.assertEqual(404, response1.responseCode) + + @inlineCallbacks + def test_fail_not_contract_id(self): + response1 = yield self.web.get( + 'history', + { + b'id': self.genesis_txs[0].hash.hex().encode('ascii'), + } + ) + self.assertEqual(404, response1.responseCode) + + def _fill_nc(self, + nc: Transaction, + nc_id: bytes, + nc_method: str, + nc_args: list[Any], + private_key: ec.EllipticCurvePrivateKeyWithSerialization) -> None: + + method = getattr(MyBlueprint, nc_method) + method_parser = Method.from_callable(method) + nc_args_bytes = method_parser.serialize_args_bytes(nc_args) + + nano_header = NanoHeader( + tx=nc, + nc_seqnum=self.nc_seqnum, + nc_id=nc_id, + nc_method=nc_method, + nc_args_bytes=nc_args_bytes, + nc_address=b'', + nc_script=b'', + nc_actions=[], + ) + nc.headers.append(nano_header) + self.nc_seqnum += 1 + + sign_openssl(nano_header, private_key) + self.manager.cpu_mining_service.resolve(nc) + + def _create_contract(self, parents: list[bytes], timestamp: int) -> Transaction: + nc = Transaction( + weight=1, + inputs=[], + outputs=[], + parents=parents, + storage=self.tx_storage, + timestamp=timestamp + ) + self._fill_nc(nc, self.blueprint_id, 'initialize', [0], self.genesis_private_key) + self.assertTrue(self.manager.on_new_tx(nc)) + add_new_block(self.manager) + return nc + + @inlineCallbacks + def test_success(self): + parents = [tx.hash for tx in self.genesis_txs] + timestamp = 1 + max(tx.timestamp for tx in self.genesis) + nc1 = self._create_contract(parents, timestamp) + + timestamp += 1 + nc2 = self._create_contract(parents, timestamp) + self.assertNotEqual(nc1.hash, nc2.hash) + + response1 = yield self.web.get( + 'history', + { + b'id': bytes(nc1.hash.hex(), 'utf-8'), + } + ) + data1 = response1.json_value() + self.assertEqual(len(data1['history']), 1) + self.assertEqual(data1['has_more'], False) + self.assertEqual(data1['history'][0]['hash'], nc1.hash.hex()) + self.assertEqual(data1['history'][0]['nc_method'], 'initialize') + + # Now we create a transaction + tx1 = Transaction( + weight=1, + inputs=[], + outputs=[], + parents=parents, + storage=self.tx_storage, + timestamp=timestamp + ) + self._fill_nc(tx1, nc1.hash, 'set_a', [1], self.genesis_private_key) + self.assertTrue(self.manager.on_new_tx(tx1)) + add_new_block(self.manager) + + # Check both transactions belongs to nc1 history. + response2 = yield self.web.get( + 'history', + { + b'id': nc1.hash.hex().encode('ascii'), + } + ) + data2 = response2.json_value() + self.assertEqual(data2['has_more'], False) + self.assertEqual(len(data2['history']), 2) + ids = [tx['hash'] for tx in data2['history']] + self.assertEqual(ids, [tx1.hash.hex(), nc1.hash.hex()]) + + # Check paging works minimally with after + response2a = yield self.web.get( + 'history', + { + b'id': nc1.hash.hex().encode('ascii'), + b'count': b'1', + b'after': ids[0].encode('ascii'), + } + ) + data2a = response2a.json_value() + self.assertEqual(len(data2a['history']), 1) + self.assertEqual(data2a['has_more'], False) + self.assertEqual(data2a['count'], 1) + self.assertEqual(data2a['after'], ids[0]) + self.assertEqual(data2a['before'], None) + paginated_ids = [tx['hash'] for tx in data2a['history']] + self.assertEqual(paginated_ids, [ids[1]]) + + # Check paging works minimally with before + response2b = yield self.web.get( + 'history', + { + b'id': nc1.hash.hex().encode('ascii'), + b'count': b'1', + b'before': ids[1].encode('ascii'), + } + ) + data2b = response2b.json_value() + self.assertEqual(len(data2b['history']), 1) + self.assertEqual(data2b['has_more'], False) + self.assertEqual(data2b['count'], 1) + self.assertEqual(data2b['after'], None) + self.assertEqual(data2b['before'], ids[1]) + paginated_ids = [tx['hash'] for tx in data2b['history']] + self.assertEqual(paginated_ids, [ids[0]]) + + # Getting the first page only + response2c = yield self.web.get( + 'history', + { + b'id': nc1.hash.hex().encode('ascii'), + b'count': b'1', + } + ) + data2c = response2c.json_value() + self.assertEqual(len(data2c['history']), 1) + self.assertEqual(data2c['has_more'], True) + self.assertEqual(data2c['count'], 1) + self.assertEqual(data2c['after'], None) + self.assertEqual(data2c['before'], None) + paginated_ids = [tx['hash'] for tx in data2c['history']] + self.assertEqual(paginated_ids, [ids[0]]) + + # Make sure nc2 index still has only one tx. + response3 = yield self.web.get( + 'history', + { + b'id': nc2.hash.hex().encode('ascii'), + } + ) + data3 = response3.json_value() + self.assertEqual(data3['has_more'], False) + self.assertEqual(len(data3['history']), 1) + ids = set(tx['hash'] for tx in data3['history']) + self.assertEqual(ids, {nc2.hash.hex()}) diff --git a/tests/resources/nanocontracts/test_nc_creation.py b/tests/resources/nanocontracts/test_nc_creation.py new file mode 100644 index 000000000..4a4118a89 --- /dev/null +++ b/tests/resources/nanocontracts/test_nc_creation.py @@ -0,0 +1,570 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any + +from hathor.nanocontracts.resources.nc_creation import NCCreationResource +from hathor.nanocontracts.types import BlueprintId, VertexId +from hathor.nanocontracts.utils import load_builtin_blueprint_for_ocb +from hathor.transaction import Transaction +from tests import unittest +from tests.dag_builder.builder import TestDAGBuilder +from tests.nanocontracts import test_blueprints +from tests.nanocontracts.test_blueprints.bet import Bet +from tests.resources.base_resource import StubSite, _BaseResourceTest +from tests.utils import get_genesis_key + + +class NCCreationResourceTest(_BaseResourceTest._ResourceTest): + + def setUp(self): + super().setUp() + self.manager = self.create_peer( + 'testnet', + nc_indices=True, + ) + self.web = StubSite(NCCreationResource(self.manager)) + self.genesis_private_key = get_genesis_key() + self.builtin_bet_blueprint_id = BlueprintId(self.manager.rng.randbytes(32)) + self.manager.tx_storage.nc_catalog.blueprints[self.builtin_bet_blueprint_id] = Bet + + def prepare_ncs(self) -> tuple[Transaction, Transaction, Transaction, Transaction, Transaction]: + bet_code = load_builtin_blueprint_for_ocb('bet.py', 'Bet', test_blueprints) + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + dag_builder = TestDAGBuilder.from_manager(self.manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..11] + b10 < dummy + + ocb1.ocb_private_key = "{private_key}" + ocb1.ocb_password = "{password}" + + ocb2.ocb_private_key = "{private_key}" + ocb2.ocb_password = "{password}" + + nc1.nc_id = ocb2 + nc1.nc_method = initialize() + + nc2.nc_id = "{self.builtin_bet_blueprint_id.hex()}" + nc2.nc_method = initialize("00", "00", 0) + + nc3.nc_id = ocb2 + nc3.nc_method = initialize() + + nc4.nc_id = ocb1 + nc4.nc_method = initialize("00", "00", 0) + + nc5.nc_id = "{self.builtin_bet_blueprint_id.hex()}" + nc5.nc_method = initialize("00", "00", 0) + + ocb1 <-- ocb2 <-- b11 + b11 < nc1 < nc2 < nc3 < nc4 < nc5 + + ocb1.ocb_code = "{bet_code.encode().hex()}" + ocb2.ocb_code = ``` + from hathor.nanocontracts import Blueprint + from hathor.nanocontracts.context import Context + from hathor.nanocontracts.types import public + class MyBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + __blueprint__ = MyBlueprint + ``` + ''') + + artifacts.propagate_with(self.manager) + nc1, nc2, nc3, nc4, nc5 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc3', 'nc4', 'nc5'], Transaction) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc3.is_nano_contract() + assert nc4.is_nano_contract() + assert nc5.is_nano_contract() + return nc1, nc2, nc3, nc4, nc5 + + def nc_to_response_item(self, nc: Transaction) -> dict[str, Any]: + assert nc.storage is not None + assert nc.is_nano_contract() + nano_header = nc.get_nano_header() + blueprint_id = BlueprintId(VertexId(nano_header.nc_id)) + blueprint_class = nc.storage.get_blueprint_class(blueprint_id) + return dict( + nano_contract_id=nc.hash_hex, + blueprint_id=blueprint_id.hex(), + blueprint_name=blueprint_class.__name__, + last_tx_timestamp=nc.timestamp, + total_txs=1, + created_at=nc.timestamp, + ) + + async def test_success(self) -> None: + nc1, nc2, nc3, nc4, nc5 = self.prepare_ncs() + response = await self.web.get('creation') + data = response.json_value() + + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc5), + self.nc_to_response_item(nc4), + self.nc_to_response_item(nc3), + self.nc_to_response_item(nc2), + self.nc_to_response_item(nc1), + ], + ) + + async def test_tx_aggregation(self) -> None: + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + dag_builder = TestDAGBuilder.from_manager(self.manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..12] + b10 < dummy + + ocb1.ocb_private_key = "{private_key}" + ocb1.ocb_password = "{password}" + ocb1.ocb_code = test_blueprint1.py, TestBlueprint1 + + ocb2.ocb_private_key = "{private_key}" + ocb2.ocb_password = "{password}" + ocb2.ocb_code = test_blueprint1.py, TestBlueprint1 + + nc1.nc_id = ocb1 + nc1.nc_method = initialize(0) + + nc2.nc_id = ocb2 + nc2.nc_method = initialize(0) + + nc3.nc_id = nc2 + nc3.nc_method = nop() + + nc4.nc_id = nc1 + nc4.nc_method = nop() + + nc5.nc_id = nc2 + nc5.nc_method = nop() + + nc6.nc_id = nc2 + nc6.nc_method = nop() + + nc7.nc_id = nc1 + nc7.nc_method = nop() + + ocb1 <-- ocb2 <-- b11 + b11 < nc1 < nc2 < nc3 < nc4 < nc5 < nc6 < nc7 + + nc1 <-- nc2 <-- nc3 <-- b12 + ''') + + artifacts.propagate_with(self.manager) + nc1, nc2, nc6, nc7 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc6', 'nc7'], Transaction) + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc6.is_nano_contract() + assert nc7.is_nano_contract() + response = await self.web.get('creation') + data = response.json_value() + + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[ + dict( + nano_contract_id=nc2.hash_hex, + blueprint_id=nc2.get_nano_header().nc_id.hex(), + blueprint_name='TestBlueprint1', + last_tx_timestamp=nc6.timestamp, + total_txs=4, + created_at=nc2.timestamp, + ), + dict( + nano_contract_id=nc1.hash_hex, + blueprint_id=nc1.get_nano_header().nc_id.hex(), + blueprint_name='TestBlueprint1', + last_tx_timestamp=nc7.timestamp, + total_txs=3, + created_at=nc1.timestamp, + ) + ], + ) + + async def test_pagination(self) -> None: + nc1, nc2, nc3, nc4, nc5 = self.prepare_ncs() + response = await self.web.get('creation', { + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=2, + has_more=True, + nc_creation_txs=[ + self.nc_to_response_item(nc5), + self.nc_to_response_item(nc4), + ], + ) + + response = await self.web.get('creation', { + b'after': nc4.hash_hex.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=nc4.hash_hex, + count=2, + has_more=True, + nc_creation_txs=[ + self.nc_to_response_item(nc3), + self.nc_to_response_item(nc2), + ], + ) + + response = await self.web.get('creation', { + b'after': nc2.hash_hex.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=nc2.hash_hex, + count=2, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc1), + ], + ) + + response = await self.web.get('creation', { + b'after': nc1.hash_hex.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=nc1.hash_hex, + count=2, + has_more=False, + nc_creation_txs=[], + ) + + response = await self.web.get('creation', { + b'before': nc1.hash_hex.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=nc1.hash_hex, + after=None, + count=2, + has_more=True, + nc_creation_txs=[ + self.nc_to_response_item(nc2), + self.nc_to_response_item(nc3), + ], + ) + + response = await self.web.get('creation', { + b'before': nc3.hash_hex.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=nc3.hash_hex, + after=None, + count=2, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc4), + self.nc_to_response_item(nc5), + ], + ) + + async def test_pagination_asc(self) -> None: + nc1, nc2, nc3, nc4, nc5 = self.prepare_ncs() + response = await self.web.get('creation', { + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=2, + has_more=True, + nc_creation_txs=[ + self.nc_to_response_item(nc1), + self.nc_to_response_item(nc2), + ], + ) + + response = await self.web.get('creation', { + b'after': nc2.hash_hex.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=nc2.hash_hex, + count=2, + has_more=True, + nc_creation_txs=[ + self.nc_to_response_item(nc3), + self.nc_to_response_item(nc4), + ], + ) + + response = await self.web.get('creation', { + b'after': nc4.hash_hex.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=nc4.hash_hex, + count=2, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc5), + ], + ) + + response = await self.web.get('creation', { + b'after': nc5.hash_hex.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=nc5.hash_hex, + count=2, + has_more=False, + nc_creation_txs=[], + ) + + response = await self.web.get('creation', { + b'before': nc5.hash_hex.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=nc5.hash_hex, + after=None, + count=2, + has_more=True, + nc_creation_txs=[ + self.nc_to_response_item(nc4), + self.nc_to_response_item(nc3), + ], + ) + + response = await self.web.get('creation', { + b'before': nc3.hash_hex.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=nc3.hash_hex, + after=None, + count=2, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc2), + self.nc_to_response_item(nc1), + ], + ) + + async def test_search_by_nc_id(self) -> None: + nc1, nc2, nc3, nc4, nc5 = self.prepare_ncs() + response = await self.web.get('on_chain', { + b'search': nc3.hash_hex.encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc3), + ], + ) + + async def test_search_by_blueprint_id(self) -> None: + nc1, nc2, nc3, nc4, nc5 = self.prepare_ncs() + response = await self.web.get('on_chain', { + b'search': nc1.get_nano_header().nc_id.hex().encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc3), + self.nc_to_response_item(nc1), + ], + ) + + response = await self.web.get('on_chain', { + b'search': nc2.get_nano_header().nc_id.hex().encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc5), + self.nc_to_response_item(nc2), + ], + ) + + response = await self.web.get('on_chain', { + b'search': nc4.get_nano_header().nc_id.hex().encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc4), + ], + ) + + async def test_search_by_blueprint_id_with_pagination(self) -> None: + nc1, nc2, nc3, nc4, nc5 = self.prepare_ncs() + nc1_nano_header = nc1.get_nano_header() + response = await self.web.get('on_chain', { + b'search': nc1_nano_header.nc_id.hex().encode(), + b'count': b'1', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=1, + has_more=True, + nc_creation_txs=[ + self.nc_to_response_item(nc3), + ], + ) + + response = await self.web.get('on_chain', { + b'search': nc1_nano_header.nc_id.hex().encode(), + b'count': b'1', + b'after': nc3.hash_hex.encode() + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=nc3.hash_hex, + count=1, + has_more=False, + nc_creation_txs=[ + self.nc_to_response_item(nc1), + ], + ) + + async def test_search_non_existent(self) -> None: + self.prepare_ncs() + response = await self.web.get('on_chain', { + b'search': self._settings.GENESIS_BLOCK_HASH.hex().encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[], + ) + + response = await self.web.get('on_chain', { + b'search': b'fe' * 32, + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + nc_creation_txs=[], + ) + + async def test_search_non_hex(self) -> None: + self.prepare_ncs() + response = await self.web.get('builtin', { + b'search': b'abc', + }) + data = response.json_value() + assert data == dict( + success=True, + count=10, + before=None, + after=None, + has_more=False, + nc_creation_txs=[], + ) + + async def test_non_hex_pagination(self) -> None: + self.prepare_ncs() + response = await self.web.get('creation', { + b'after': b'abc', + b'count': b'2', + }) + data = response.json_value() + assert response.responseCode == 400 + assert data == dict( + success=False, + error='Invalid "before" or "after": abc' + ) diff --git a/tests/resources/nanocontracts/test_on_chain.py b/tests/resources/nanocontracts/test_on_chain.py new file mode 100644 index 000000000..318475cf5 --- /dev/null +++ b/tests/resources/nanocontracts/test_on_chain.py @@ -0,0 +1,427 @@ +# Copyright 2025 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any + +from hathor.nanocontracts import OnChainBlueprint +from hathor.nanocontracts.resources.on_chain import BlueprintOnChainResource +from hathor.nanocontracts.utils import load_builtin_blueprint_for_ocb +from tests import unittest +from tests.dag_builder.builder import TestDAGBuilder +from tests.nanocontracts import test_blueprints +from tests.resources.base_resource import StubSite, _BaseResourceTest + + +class BlueprintOnChainResourceTest(_BaseResourceTest._ResourceTest): + + def setUp(self): + super().setUp() + self.manager = self.create_peer( + 'testnet', + nc_indices=True, + ) + self.web = StubSite(BlueprintOnChainResource(self.manager)) + self.dag_builder = TestDAGBuilder.from_manager(self.manager) + + def propagate_ocbs(self) -> list[OnChainBlueprint]: + bet_code = load_builtin_blueprint_for_ocb('bet.py', 'Bet', test_blueprints) + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..11] + b10 < dummy + + ocb1.ocb_private_key = "{private_key}" + ocb2.ocb_private_key = "{private_key}" + ocb3.ocb_private_key = "{private_key}" + ocb4.ocb_private_key = "{private_key}" + ocb5.ocb_private_key = "{private_key}" + + ocb1.ocb_password = "{password}" + ocb2.ocb_password = "{password}" + ocb3.ocb_password = "{password}" + ocb4.ocb_password = "{password}" + ocb5.ocb_password = "{password}" + + ocb1.ocb_code = "{bet_code.encode().hex()}" + ocb2.ocb_code = "{bet_code.encode().hex()}" + ocb3.ocb_code = "{bet_code.encode().hex()}" + ocb4.ocb_code = "{bet_code.encode().hex()}" + ocb5.ocb_code = "{bet_code.encode().hex()}" + + ocb1 <-- ocb2 <-- ocb3 <-- ocb4 <-- ocb5 <-- b11 + """) + + artifacts.propagate_with(self.manager) + return artifacts.get_typed_vertices(['ocb1', 'ocb2', 'ocb3', 'ocb4', 'ocb5'], OnChainBlueprint) + + def blueprint_tx_to_response(self, bp_tx: OnChainBlueprint, *, name: str = 'Bet') -> dict[str, Any]: + return dict( + id=bp_tx.blueprint_id().hex(), + name=name, + created_at=bp_tx.timestamp + ) + + async def test_success(self) -> None: + # test when there are no OCBs + response = await self.web.get('on_chain') + data = response.json_value() + + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[], + ) + + ocbs = self.propagate_ocbs() + response = await self.web.get('on_chain') + data = response.json_value() + expected_bps = [self.blueprint_tx_to_response(ocb)for ocb in reversed(ocbs)] + + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=expected_bps, + ) + + async def test_ocb_not_confirmed(self) -> None: + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + artifacts = self.dag_builder.build_from_str(f""" + blockchain genesis b[1..11] + b10 < dummy + + ocb1.ocb_private_key = "{private_key}" + ocb1.ocb_password = "{password}" + ocb1.ocb_code = test_blueprint1.py, TestBlueprint1 + + ocb2.ocb_private_key = "{private_key}" + ocb2.ocb_password = "{password}" + ocb2.ocb_code = test_blueprint1.py, TestBlueprint1 + + ocb1 <-- b11 + """) + + artifacts.propagate_with(self.manager) + ocb1 = artifacts.get_typed_vertex('ocb1', OnChainBlueprint) + + response = await self.web.get('on_chain') + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[ + self.blueprint_tx_to_response(ocb1, name='TestBlueprint1') + ], + ) + + async def test_pagination(self) -> None: + ocbs = self.propagate_ocbs() + response = await self.web.get('on_chain', { + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=2, + has_more=True, + blueprints=[ + self.blueprint_tx_to_response(ocbs[4]), + self.blueprint_tx_to_response(ocbs[3]), + ], + ) + + after = ocbs[3].hash_hex + response = await self.web.get('on_chain', { + b'after': after.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=True, + blueprints=[ + self.blueprint_tx_to_response(ocbs[2]), + self.blueprint_tx_to_response(ocbs[1]), + ], + ) + + after = ocbs[1].hash_hex + response = await self.web.get('on_chain', { + b'after': after.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=False, + blueprints=[ + self.blueprint_tx_to_response(ocbs[0]), + ], + ) + + after = ocbs[0].hash_hex + response = await self.web.get('on_chain', { + b'after': after.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=False, + blueprints=[], + ) + + before = ocbs[0].hash_hex + response = await self.web.get('on_chain', { + b'before': before.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=True, + blueprints=[ + self.blueprint_tx_to_response(ocbs[1]), + self.blueprint_tx_to_response(ocbs[2]), + ], + ) + + before = ocbs[2].hash_hex + response = await self.web.get('on_chain', { + b'before': before.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=False, + blueprints=[ + self.blueprint_tx_to_response(ocbs[3]), + self.blueprint_tx_to_response(ocbs[4]), + ], + ) + + before = ocbs[4].hash_hex + response = await self.web.get('on_chain', { + b'before': before.encode(), + b'count': b'2', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=False, + blueprints=[], + ) + + async def test_pagination_asc(self) -> None: + ocbs = self.propagate_ocbs() + response = await self.web.get('on_chain', { + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=2, + has_more=True, + blueprints=[ + self.blueprint_tx_to_response(ocbs[0]), + self.blueprint_tx_to_response(ocbs[1]), + ], + ) + + after = ocbs[1].hash_hex + response = await self.web.get('on_chain', { + b'after': after.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=True, + blueprints=[ + self.blueprint_tx_to_response(ocbs[2]), + self.blueprint_tx_to_response(ocbs[3]), + ], + ) + + after = ocbs[3].hash_hex + response = await self.web.get('on_chain', { + b'after': after.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=False, + blueprints=[ + self.blueprint_tx_to_response(ocbs[4]), + ], + ) + + after = ocbs[4].hash_hex + response = await self.web.get('on_chain', { + b'after': after.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=after, + count=2, + has_more=False, + blueprints=[], + ) + + before = ocbs[4].hash_hex + response = await self.web.get('on_chain', { + b'before': before.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=True, + blueprints=[ + self.blueprint_tx_to_response(ocbs[3]), + self.blueprint_tx_to_response(ocbs[2]), + ], + ) + + before = ocbs[2].hash_hex + response = await self.web.get('on_chain', { + b'before': before.encode(), + b'count': b'2', + b'order': b'asc', + }) + data = response.json_value() + assert data == dict( + success=True, + before=before, + after=None, + count=2, + has_more=False, + blueprints=[ + self.blueprint_tx_to_response(ocbs[1]), + self.blueprint_tx_to_response(ocbs[0]), + ], + ) + + async def test_search_by_bp_id(self) -> None: + ocbs = self.propagate_ocbs() + some_bp_tx = ocbs[2] + response = await self.web.get('on_chain', { + b'search': some_bp_tx.hash_hex.encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[ + self.blueprint_tx_to_response(some_bp_tx), + ], + ) + + # tx exists but is not a blueprint + bp_id = self._settings.GENESIS_TX1_HASH.hex() + response = await self.web.get('builtin', { + b'search': bp_id.encode(), + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[], + ) + + response = await self.web.get('on_chain', { + b'search': b'ff' * 32, + }) + data = response.json_value() + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[], + ) + + async def test_search_by_name(self) -> None: + response = await self.web.get('builtin', { + b'search': b'Bet', + }) + data = response.json_value() + # it's not implemented so it returns empty + assert data == dict( + success=True, + before=None, + after=None, + count=10, + has_more=False, + blueprints=[], + ) diff --git a/tests/resources/nanocontracts/test_state.py b/tests/resources/nanocontracts/test_state.py new file mode 100644 index 000000000..824d0c92b --- /dev/null +++ b/tests/resources/nanocontracts/test_state.py @@ -0,0 +1,512 @@ +import hashlib +import math +from typing import Any, NamedTuple, Optional, TypeAlias + +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from twisted.internet.defer import inlineCallbacks + +from hathor.conf import HathorSettings +from hathor.crypto.util import decode_address, get_address_b58_from_bytes, get_public_key_bytes_compressed +from hathor.nanocontracts import Blueprint, Context, public, view +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.method import Method +from hathor.nanocontracts.resources import NanoContractStateResource +from hathor.nanocontracts.types import Address, NCActionType, NCDepositAction, Timestamp, TokenUid +from hathor.nanocontracts.utils import sign_openssl +from hathor.simulator.utils import add_new_block +from hathor.transaction import Transaction, TxInput +from hathor.transaction.headers import NanoHeader +from hathor.transaction.headers.nano_header import NanoHeaderAction +from hathor.transaction.scripts import P2PKH +from tests.resources.base_resource import StubSite, _BaseResourceTest +from tests.utils import add_blocks_unlock_reward, get_genesis_key + +settings = HathorSettings() + +Amount: TypeAlias = int + + +class MyNamedTuple(NamedTuple): + amount1: int + amount2: int + address: Optional[Address] + + +class MyBlueprint(Blueprint): + token_uid: TokenUid + total: Amount + date_last_bet: Timestamp + address_details: dict[Address, dict[str, Amount]] + bytes_field: bytes + dict_with_bytes: dict[bytes, str] + + @public + def initialize(self, ctx: Context, token_uid: TokenUid, date_last_bet: Timestamp) -> None: + self.token_uid = token_uid + self.date_last_bet = date_last_bet + self.total = 0 + + @public(allow_deposit=True) + def bet(self, ctx: Context, address: Address, score: str) -> None: + action = ctx.get_single_action(self.token_uid) + assert isinstance(action, NCDepositAction) + self.total += action.amount + partial = self.address_details.get(address, {}) + if score not in partial: + partial[score] = action.amount + else: + partial[score] += action.amount + self.address_details[address] = partial + + encoded_score = score.encode() + self.bytes_field = encoded_score + self.dict_with_bytes[encoded_score] = score + + @view + def has_result(self) -> bool: + return False + + @view + def add(self, a: int, b: int) -> int: + return a + b + + @view + def conditional_add(self, test_tuple: MyNamedTuple) -> Optional[int]: + """A method only for testing that sums amount1 + amount2, in case + the address is equal to WewDeXWyvHP7jJTs7tjLoQfoB72LLxJQqN + """ + conditional_address = 'WewDeXWyvHP7jJTs7tjLoQfoB72LLxJQqN' + if test_tuple.address and get_address_b58_from_bytes(test_tuple.address) == conditional_address: + return test_tuple.amount1 + test_tuple.amount2 + + return None + + @view + def multiply(self, elements: list[int]) -> int: + return math.prod(elements) + + @view + def conditional_multiply_bytes(self, t: tuple[int, Optional[bytes]]) -> Optional[bytes]: + multiplier = t[0] + data = t[1] + if not data: + return None + + return multiplier * data + + +class BaseNanoContractStateTest(_BaseResourceTest._ResourceTest): + def setUp(self): + super().setUp() + + self.manager = self.create_peer('testnet', unlock_wallet=True, wallet_index=True) + self.tx_storage = self.manager.tx_storage + + self.genesis = self.tx_storage.get_all_genesis() + self.genesis_blocks = [tx for tx in self.genesis if tx.is_block] + self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] + + # read genesis keys + self.genesis_private_key = get_genesis_key() + self.genesis_public_key = self.genesis_private_key.public_key() + + *_, self.last_block = add_blocks_unlock_reward(self.manager) + + self.web = StubSite(NanoContractStateResource(self.manager)) + + self.bet_id = bytes.fromhex('3cb032600bdf7db784800e4ea911b10676fa2f67591f82bb62628c234e771595') + self.catalog = NCBlueprintCatalog({ + self.bet_id: MyBlueprint + }) + + self.tx_storage.nc_catalog = self.catalog + self.nc_seqnum = 0 + + @inlineCallbacks + def test_fail_missing_id(self): + response1 = yield self.web.get('state') + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_invalid_id(self): + response1 = yield self.web.get('state', { + b'id': b'xxx', + }) + self.assertEqual(400, response1.responseCode) + + @inlineCallbacks + def test_fail_unknown_id(self): + response1 = yield self.web.get('history', { + b'id': b'0' * 32, + }) + self.assertEqual(404, response1.responseCode) + + @inlineCallbacks + def test_fail_not_contract_id(self): + response1 = yield self.web.get('history', { + b'id': self.genesis_txs[0].hash.hex().encode('ascii'), + }) + self.assertEqual(404, response1.responseCode) + + def _fill_nc( + self, + nc: Transaction, + nc_id: bytes, + nc_method: str, + nc_args: list[Any], + private_key: ec.EllipticCurvePrivateKeyWithSerialization, + *, + nc_actions: list[NanoHeaderAction] | None = None + ) -> None: + + method_parser = Method.from_callable(getattr(MyBlueprint, nc_method)) + nc_args_bytes = method_parser.serialize_args_bytes(nc_args) + + nano_header = NanoHeader( + tx=nc, + nc_seqnum=self.nc_seqnum, + nc_id=nc_id, + nc_method=nc_method, + nc_args_bytes=nc_args_bytes, + nc_address=b'', + nc_script=b'', + nc_actions=nc_actions or [], + ) + nc.headers.append(nano_header) + self.nc_seqnum += 1 + + sign_openssl(nano_header, private_key) + self.manager.cpu_mining_service.resolve(nc) + + @inlineCallbacks + def test_success(self): + parents = [tx.hash for tx in self.genesis_txs] + timestamp = 1 + max(tx.timestamp for tx in self.genesis) + + date_last_bet = 1699579721 + # Create bet nano contract + nc = Transaction( + weight=1, + inputs=[], + outputs=[], + parents=parents, + storage=self.tx_storage, + timestamp=timestamp + ) + self._fill_nc( + nc, + self.bet_id, + 'initialize', + [settings.HATHOR_TOKEN_UID, date_last_bet], + self.genesis_private_key, + ) + self.assertTrue(self.manager.on_new_tx(nc)) + + # Before the execution we can't get the state + response0 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + ] + ) + self.assertEqual(404, response0.responseCode) + # Execute the nano contract + block1 = add_new_block(self.manager) + + response1 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'fields[]', b'total'), + (b'fields[]', b'date_last_bet'), + (b'balances[]', settings.HATHOR_TOKEN_UID.hex().encode('ascii')), + (b'calls[]', b'has_result()'), + (b'calls[]', b'unknown_method()'), + (b'calls[]', b'add(5, 12)'), + (b'calls[]', b'conditional_add([2, 4, null])'), + (b'calls[]', b'conditional_add([2, 4, "WewDeXWyvHP7jJTs7tjLoQfoB72LLxJQqN"])'), + (b'calls[]', b'multiply([2, 5, 8, 10])'), + (b'calls[]', b'conditional_multiply_bytes([5, "01"])'), + (b'calls[]', b'conditional_multiply_bytes([3, null])'), + ] + ) + data1 = response1.json_value() + fields1 = data1['fields'] + self.assertEqual(data1['blueprint_id'], self.bet_id.hex()) + self.assertEqual(data1['blueprint_name'], 'MyBlueprint') + self.assertEqual(fields1['token_uid'], {'value': settings.HATHOR_TOKEN_UID.hex()}) + self.assertEqual(fields1['total'], {'value': 0}) + self.assertEqual(fields1['date_last_bet'], {'value': date_last_bet}) + balances1 = data1['balances'] + self.assertEqual( + balances1, + {settings.HATHOR_TOKEN_UID.hex(): {'value': '0', 'can_mint': False, 'can_melt': False}} + ) + calls1 = data1['calls'] + self.assertEqual(calls1, { + 'has_result()': {'value': False}, + 'unknown_method()': {'errmsg': "NCMethodNotFound('MyBlueprint.unknown_method')"}, + 'add(5, 12)': {'value': 17}, + 'conditional_add([2, 4, null])': {'value': None}, + 'conditional_add([2, 4, "WewDeXWyvHP7jJTs7tjLoQfoB72LLxJQqN"])': {'value': 6}, + 'multiply([2, 5, 8, 10])': {'value': 800}, + 'conditional_multiply_bytes([5, "01"])': {'value': '0101010101'}, + 'conditional_multiply_bytes([3, null])': {'value': None} + }) + + # Now we create a deposit in the nano contract with the genesis output + inputs = [TxInput(self.genesis_blocks[0].hash, 0, b'')] + address_b58 = self.genesis_blocks[0].outputs[0].to_human_readable()['address'] + nc_bet = Transaction( + weight=1, + inputs=inputs, + outputs=[], + parents=parents, + storage=self.tx_storage, + timestamp=timestamp + ) + bet_result = '1x0' + self._fill_nc( + nc_bet, + nc.hash, + 'bet', + [decode_address(address_b58), bet_result], + self.genesis_private_key, + nc_actions=[ + NanoHeaderAction( + type=NCActionType.DEPOSIT, + token_index=0, + amount=self.genesis_blocks[0].outputs[0].value, + ) + ] + ) + + data_to_sign = nc_bet.get_sighash_all() + public_key_bytes = get_public_key_bytes_compressed(self.genesis_public_key) + hashed_data = hashlib.sha256(data_to_sign).digest() + signature = self.genesis_private_key.sign(hashed_data, ec.ECDSA(hashes.SHA256())) + nc_bet.inputs[0].data = P2PKH.create_input_data(public_key_bytes, signature) + + self.manager.cpu_mining_service.resolve(nc_bet) + # Add to DAG. + self.assertTrue(self.manager.on_new_tx(nc_bet)) + # Execute the deposit + block2 = add_new_block(self.manager) + + address_param = "address_details.a'{}'".format(address_b58) + dict_with_bytes_param = "dict_with_bytes.b'{}'".format(bet_result.encode().hex()) + response2 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'fields[]', b'total'), + (b'fields[]', b'date_last_bet'), + (b'fields[]', address_param.encode()), + (b'fields[]', b'bytes_field'), + (b'fields[]', dict_with_bytes_param.encode()), + (b'balances[]', settings.HATHOR_TOKEN_UID.hex().encode('ascii')), + ] + ) + data2 = response2.json_value() + fields2 = data2['fields'] + self.assertEqual(data2['blueprint_id'], self.bet_id.hex()) + self.assertEqual(data2['blueprint_name'], 'MyBlueprint') + self.assertEqual(fields2['token_uid'], {'value': settings.HATHOR_TOKEN_UID.hex()}) + self.assertEqual(fields2['total'], {'value': 10**11}) + self.assertEqual(fields2['date_last_bet'], {'value': date_last_bet}) + self.assertEqual(len(fields2[address_param]), 1) + # TODO: RE-IMPLEMENT SUPPORT FOR THIS + # FIXME + self.assertEqual(fields2[address_param], {'errmsg': 'not a blueprint field'}) + # self.assertEqual(fields2[address_param], {'value': {'1x0': 10**11}}) + self.assertEqual(fields2['bytes_field'], {'value': bet_result.encode().hex()}) + # FIXME + self.assertEqual(fields2[dict_with_bytes_param], {'errmsg': 'not a blueprint field'}) + # self.assertEqual(fields2[dict_with_bytes_param], {'value': '1x0'}) + balances2 = data2['balances'] + self.assertEqual( + balances2, + {settings.HATHOR_TOKEN_UID.hex(): {'value': '100000000000', 'can_mint': False, 'can_melt': False}} + ) + + # Test __all__ balance + response3 = yield self.web.get( + 'state', + { + b'id': nc.hash.hex().encode('ascii'), + b'balances[]': '__all__'.encode('ascii'), + } + ) + data3 = response3.json_value() + self.assertEqual(data3['blueprint_id'], self.bet_id.hex()) + self.assertEqual(data3['blueprint_name'], 'MyBlueprint') + balances3 = data3['balances'] + self.assertEqual( + balances3, + {settings.HATHOR_TOKEN_UID.hex(): {'value': '100000000000', 'can_mint': False, 'can_melt': False}} + ) + + # Test getting the state in a previous block + # With block hash + response4 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'fields[]', b'total'), + (b'fields[]', b'date_last_bet'), + (b'fields[]', address_param.encode()), + (b'balances[]', settings.HATHOR_TOKEN_UID.hex().encode('ascii')), + (b'block_hash', block1.hash.hex().encode('ascii')), + ] + ) + data4 = response4.json_value() + fields4 = data4['fields'] + self.assertEqual(data4['blueprint_id'], self.bet_id.hex()) + self.assertEqual(data4['blueprint_name'], 'MyBlueprint') + self.assertEqual(fields4['token_uid'], {'value': settings.HATHOR_TOKEN_UID.hex()}) + self.assertEqual(fields4['total'], {'value': 0}) + self.assertEqual(fields4['date_last_bet'], {'value': date_last_bet}) + self.assertEqual(fields4[address_param].get('value'), None) + balances4 = data4['balances'] + self.assertEqual( + balances4, + {settings.HATHOR_TOKEN_UID.hex(): {'value': '0', 'can_mint': False, 'can_melt': False}} + ) + + # With block height + response5 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'fields[]', b'total'), + (b'fields[]', b'date_last_bet'), + (b'fields[]', address_param.encode()), + (b'balances[]', settings.HATHOR_TOKEN_UID.hex().encode('ascii')), + (b'block_height', str(block1.static_metadata.height).encode('ascii')), + ] + ) + data5 = response5.json_value() + fields5 = data5['fields'] + self.assertEqual(data5['blueprint_id'], self.bet_id.hex()) + self.assertEqual(data5['blueprint_name'], 'MyBlueprint') + self.assertEqual(fields5['token_uid'], {'value': settings.HATHOR_TOKEN_UID.hex()}) + self.assertEqual(fields5['total'], {'value': 0}) + self.assertEqual(fields5['date_last_bet'], {'value': date_last_bet}) + self.assertEqual(fields5[address_param].get('value'), None) + balances5 = data5['balances'] + self.assertEqual( + balances5, + {settings.HATHOR_TOKEN_UID.hex(): {'value': '0', 'can_mint': False, 'can_melt': False}} + ) + + # With block2.timestamp, should get block2 state + response6 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'fields[]', b'total'), + (b'fields[]', b'date_last_bet'), + (b'fields[]', address_param.encode()), + (b'balances[]', settings.HATHOR_TOKEN_UID.hex().encode('ascii')), + (b'timestamp', str(block2.timestamp).encode('ascii')), + ] + ) + data6 = response6.json_value() + fields6 = data6['fields'] + self.assertEqual(data6['blueprint_id'], self.bet_id.hex()) + self.assertEqual(data6['blueprint_name'], 'MyBlueprint') + self.assertEqual(fields6['token_uid'], {'value': settings.HATHOR_TOKEN_UID.hex()}) + self.assertEqual(fields6['total'], {'value': 10**11}) + self.assertEqual(fields6['date_last_bet'], {'value': date_last_bet}) + self.assertEqual(fields6[address_param].get('value'), None) + balances6 = data6['balances'] + self.assertEqual( + balances6, + {settings.HATHOR_TOKEN_UID.hex(): {'value': '100000000000', 'can_mint': False, 'can_melt': False}} + ) + + # With block2.timestamp - 1, should get block1 state + response7 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'fields[]', b'total'), + (b'fields[]', b'date_last_bet'), + (b'fields[]', address_param.encode()), + (b'balances[]', settings.HATHOR_TOKEN_UID.hex().encode('ascii')), + (b'timestamp', str(block2.timestamp - 1).encode('ascii')), + ] + ) + data7 = response7.json_value() + fields7 = data7['fields'] + self.assertEqual(data7['blueprint_id'], self.bet_id.hex()) + self.assertEqual(data7['blueprint_name'], 'MyBlueprint') + self.assertEqual(fields7['token_uid'], {'value': settings.HATHOR_TOKEN_UID.hex()}) + self.assertEqual(fields7['total'], {'value': 0}) + self.assertEqual(fields7['date_last_bet'], {'value': date_last_bet}) + self.assertEqual(fields7[address_param].get('value'), None) + balances7 = data7['balances'] + self.assertEqual( + balances7, + {settings.HATHOR_TOKEN_UID.hex(): {'value': '0', 'can_mint': False, 'can_melt': False}} + ) + + # With block1.timestamp - 1, the contract doesn't exist + response7 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'fields[]', b'total'), + (b'fields[]', b'date_last_bet'), + (b'fields[]', address_param.encode()), + (b'balances[]', settings.HATHOR_TOKEN_UID.hex().encode('ascii')), + (b'timestamp', str(block1.timestamp - 1).encode('ascii')), + ] + ) + self.assertEqual(response7.responseCode, 404) + data7 = response7.json_value() + self.assertEqual(data7['error'], f'Nano contract does not exist at block {self.last_block.hash_hex}.') + + # Validate errors using block_hash / block_height + + # Both parameters can't be used together + response8 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'block_height', str(block1.static_metadata.height).encode('ascii')), + (b'block_hash', block1.hash.hex().encode('ascii')), + ] + ) + self.assertEqual(400, response8.responseCode) + + # block_height does not exist + response9 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'block_height', str(block1.static_metadata.height + 5).encode('ascii')), + ] + ) + self.assertEqual(400, response9.responseCode) + + # invalid block_hash does not exist + response10 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'block_hash', '123'.encode('ascii')), + ] + ) + self.assertEqual(400, response10.responseCode) + + # block_hash is a tx + response11 = yield self.web.get( + 'state', [ + (b'id', nc.hash.hex().encode('ascii')), + (b'fields[]', b'token_uid'), + (b'block_hash', nc_bet.hash.hex().encode('ascii')), + ] + ) + self.assertEqual(400, response11.responseCode) diff --git a/tests/tx/test_headers.py b/tests/tx/test_headers.py new file mode 100644 index 000000000..a4340418d --- /dev/null +++ b/tests/tx/test_headers.py @@ -0,0 +1,232 @@ +import pytest + +from hathor.exception import InvalidNewTransaction +from hathor.nanocontracts import Blueprint, Context, OnChainBlueprint, public +from hathor.nanocontracts.types import NCActionType +from hathor.transaction import BaseTransaction, Block, Transaction +from hathor.transaction.exceptions import HeaderNotSupported +from hathor.transaction.headers import NanoHeader, VertexBaseHeader +from hathor.transaction.headers.nano_header import ADDRESS_LEN_BYTES, NanoHeaderAction +from hathor.transaction.token_creation_tx import TokenCreationTransaction +from hathor.transaction.util import VerboseCallback +from tests import unittest +from tests.dag_builder.builder import TestDAGBuilder + + +class MyTestBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + @public + def nop(self, ctx: Context) -> None: + pass + + +class FakeHeader(VertexBaseHeader): + @classmethod + def deserialize( + cls, + tx: BaseTransaction, + buf: bytes, + *, + verbose: VerboseCallback = None, + ) -> tuple[VertexBaseHeader, bytes]: + raise NotImplementedError + + def serialize(self) -> bytes: + return b'fake header' + + def get_sighash_bytes(self) -> bytes: + return b'fake sighash' + + +class VertexHeadersTest(unittest.TestCase): + def has_nano_header(self, vertex: BaseTransaction) -> bool: + for header in vertex.headers: + if isinstance(header, NanoHeader): + return True + return False + + def setUp(self) -> None: + super().setUp() + self.blueprint_id = b'x' * 32 + self.manager = self.create_peer('testnet') + self.manager.tx_storage.nc_catalog.blueprints[self.blueprint_id] = MyTestBlueprint + self.dag_builder = TestDAGBuilder.from_manager(self.manager) + + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + + self.artifacts = self.dag_builder.build_from_str(f''' + blockchain genesis b[1..12] + b10 < dummy + + nc1.nc_id = "{self.blueprint_id.hex()}" + nc1.nc_method = initialize() + + tx1.out[0] = 5 TKA + tx2.out[0] = 3 TKB + + b12.nc_id = nc1 + b12.nc_method = nop() + + tx2.nc_id = nc1 + tx2.nc_method = nop() + + TKB.nc_id = nc1 + TKB.nc_method = nop() + + ocb1.ocb_private_key = "{private_key}" + ocb1.ocb_password = "{password}" + ocb1.ocb_code = test_blueprint1.py, TestBlueprint1 + + dummy < b11 < nc1 < TKA < tx1 < b12 < TKB < tx2 < ocb1 + ''') + self.artifacts.propagate_with(self.manager, up_to='dummy') + + self.valid_vertices: list[tuple[str, type[BaseTransaction], bool]] = [ + ('b11', Block, False), + ('nc1', Transaction, True), + ('TKA', TokenCreationTransaction, False), + ('TKB', TokenCreationTransaction, True), + ('tx1', Transaction, False), + ('ocb1', OnChainBlueprint, False), + # TODO: We should also test MergeMinedBlock, but the DAGBuilder doesn't support it yet + ] + + def test_headers_affect_hash(self) -> None: + for name, type_, is_nano in self.valid_vertices: + vertex: BaseTransaction = self.artifacts.get_typed_vertex(name, type_) + assert self.has_nano_header(vertex) == is_nano + + # Test adding a new header. + msg = f'changing headers should change the hash on "{name}"' + clone = vertex.clone(include_storage=False, include_metadata=False) + assert clone.hash == clone.calculate_hash() + clone.headers.append(FakeHeader()) + assert clone.hash != clone.calculate_hash(), msg + + # Now we'll test nano header attributes, so we can skip non-nano txs + if not is_nano: + continue + + assert isinstance(vertex, Transaction) + attributes_and_new_values = [ + ('nc_id', b'123'), + ('nc_seqnum', vertex.get_nano_header().nc_seqnum + 1), + ('nc_method', 'new_method'), + ('nc_args_bytes', b'new args'), + ('nc_actions', [NanoHeaderAction(type=NCActionType.DEPOSIT, token_index=0, amount=123)]), + ('nc_address', b'\x01' * ADDRESS_LEN_BYTES), + ('nc_script', b'new script'), + ] + + # Test editing existing nano header. + for attribute, new_value in attributes_and_new_values: + clone = vertex.clone(include_storage=False, include_metadata=False) + assert clone.hash == vertex.hash + assert clone.hash == clone.calculate_hash() + setattr(clone.get_nano_header(), attribute, new_value) + assert clone.hash != clone.calculate_hash(), msg + + def test_headers_affect_sighash_all(self) -> None: + for name, type_, is_nano in self.valid_vertices: + vertex: BaseTransaction = self.artifacts.get_typed_vertex(name, type_) + assert self.has_nano_header(vertex) == is_nano + + if not isinstance(vertex, Transaction): + # only transactions have sighash + continue + + # Test adding a new header. + msg = f'changing headers should change the sighash on "{name}"' + clone = vertex.clone(include_storage=False, include_metadata=False) + sighash_before = clone.get_sighash_all(skip_cache=True) + assert sighash_before == vertex.get_sighash_all(skip_cache=True) + clone.headers.append(FakeHeader()) + sighash_after = clone.get_sighash_all(skip_cache=True) + assert sighash_before != sighash_after, msg + + # Now we'll test nano header attributes, so we can skip non-nano txs + if not is_nano: + continue + + assert isinstance(vertex, Transaction) + attributes_and_new_values = [ + ('nc_id', b'123'), + ('nc_seqnum', vertex.get_nano_header().nc_seqnum + 1), + ('nc_method', 'new_method'), + ('nc_args_bytes', b'new args'), + ('nc_actions', [NanoHeaderAction(type=NCActionType.DEPOSIT, token_index=0, amount=123)]), + ('nc_address', b'\x01' * ADDRESS_LEN_BYTES), + ] + + # Test editing existing nano header. + for attribute, new_value in attributes_and_new_values: + clone = vertex.clone(include_storage=False, include_metadata=False) + sighash_before = clone.get_sighash_all(skip_cache=True) + assert sighash_before == vertex.get_sighash_all(skip_cache=True) + setattr(clone.get_nano_header(), attribute, new_value) + sighash_after = clone.get_sighash_all(skip_cache=True) + assert sighash_before != sighash_after, msg + + # Changing the nc_script does not affect sighash all. + clone = vertex.clone(include_storage=False, include_metadata=False) + sighash_before = clone.get_sighash_all(skip_cache=True) + assert sighash_before == vertex.get_sighash_all(skip_cache=True) + clone.get_nano_header().nc_script = b'new script' + sighash_after = clone.get_sighash_all(skip_cache=True) + assert sighash_before == sighash_after, msg + + def test_nano_header_allowed_vertices(self) -> None: + for name, _type, should_have_nano_header in self.valid_vertices: + vertex: BaseTransaction = self.artifacts.get_typed_vertex(name, _type) + assert self.has_nano_header(vertex) == should_have_nano_header + vertex.storage = self.manager.tx_storage + clone = vertex.clone(include_metadata=False, include_storage=True) + assert bytes(clone) == bytes(vertex) + assert self.manager.on_new_tx(vertex) + + expected_to_fail: list[tuple[str, type[BaseTransaction], bool]] = [ + ('b12', Block, True), + ] + + for name, _type, should_have_nano_header in expected_to_fail: + vertex = self.artifacts.get_typed_vertex(name, _type) + assert self.has_nano_header(vertex) == should_have_nano_header + with pytest.raises(InvalidNewTransaction) as e: + self.manager.on_new_tx(vertex) + assert isinstance(e.value.__cause__, HeaderNotSupported) + + def test_nano_header_round_trip(self) -> None: + tx = Transaction() + header1 = NanoHeader( + tx=tx, + nc_id=b'1' * 32, + nc_seqnum=0, + nc_method='some_method', + nc_args_bytes=b'some args', + nc_actions=[ + NanoHeaderAction( + type=NCActionType.DEPOSIT, + token_index=0, + amount=123, + ), + ], + nc_address=b'\x01' * ADDRESS_LEN_BYTES, + nc_script=b'some script', + ) + + header1_bytes = header1.serialize() + header2, buf = NanoHeader.deserialize(tx, header1_bytes) + + assert len(buf) == 0 + assert header1_bytes == header2.serialize() + assert header1.tx is header2.tx + assert header1.nc_id == header2.nc_id + assert header1.nc_method == header2.nc_method + assert header1.nc_args_bytes == header2.nc_args_bytes + assert header1.nc_actions == header2.nc_actions + assert header1.nc_address == header2.nc_address + assert header1.nc_script == header2.nc_script diff --git a/tests/tx/test_indexes_nc_history.py b/tests/tx/test_indexes_nc_history.py new file mode 100644 index 000000000..b214f5218 --- /dev/null +++ b/tests/tx/test_indexes_nc_history.py @@ -0,0 +1,219 @@ +from hathor.conf import HathorSettings +from hathor.crypto.util import get_address_b58_from_bytes +from hathor.nanocontracts import Blueprint, Context, public +from hathor.nanocontracts.catalog import NCBlueprintCatalog +from hathor.nanocontracts.utils import sign_openssl +from hathor.storage.rocksdb_storage import RocksDBStorage +from hathor.transaction import Transaction +from hathor.transaction.headers import NanoHeader +from hathor.transaction.storage import TransactionRocksDBStorage +from hathor.util import not_none +from hathor.wallet import KeyPair, Wallet +from tests import unittest +from tests.dag_builder.builder import TestDAGBuilder +from tests.utils import add_blocks_unlock_reward, get_genesis_key + +settings = HathorSettings() + + +class MyTestBlueprint(Blueprint): + @public + def initialize(self, ctx: Context) -> None: + pass + + @public + def nop(self, ctx: Context) -> None: + pass + + +class NCHistoryIndexesTest(unittest.TestCase): + __test__ = False + + def test_basic(self): + blueprint_id = b'x' * 32 + self.catalog = NCBlueprintCatalog({ + blueprint_id: MyTestBlueprint + }) + self.manager.tx_storage.nc_catalog = self.catalog + + parents = self.manager.get_new_tx_parents() + nc = Transaction(weight=1, inputs=[], outputs=[], parents=parents, storage=self.tx_storage) + + nc_id = blueprint_id + nc_method = 'initialize' + nc_args_bytes = b'\00' + + key = KeyPair.create(b'my-pass') + privkey = key.get_private_key(b'my-pass') + + nano_header = NanoHeader( + tx=nc, + nc_seqnum=0, + nc_id=nc_id, + nc_method=nc_method, + nc_args_bytes=nc_args_bytes, + nc_address=b'', + nc_script=b'', + nc_actions=[], + ) + nc.headers.append(nano_header) + + sign_openssl(nano_header, privkey) + self.manager.cpu_mining_service.resolve(nc) + + self.assertTrue(self.manager.on_new_tx(nc)) + + contract_id = nc.hash + nc_history_index = self.manager.tx_storage.indexes.nc_history + self.assertEqual( + [nc.hash], + list(nc_history_index.get_sorted_from_contract_id(contract_id)) + ) + + addresses_index = self.manager.tx_storage.indexes.addresses + address = get_address_b58_from_bytes(nano_header.nc_address) + self.assertEqual( + [nc.hash], + list(addresses_index.get_sorted_from_address(address)) + ) + + def test_latest_tx_timestamp(self) -> None: + blueprint_id = b'x' * 32 + catalog = NCBlueprintCatalog({ + blueprint_id: MyTestBlueprint + }) + manager = self.create_peer('testnet', nc_indices=True) + nc_history_index = manager.tx_storage.indexes.nc_history + manager.tx_storage.nc_catalog = catalog + dag_builder = TestDAGBuilder.from_manager(manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..11] + b10 < dummy + + nc1.nc_id = "{blueprint_id.hex()}" + nc1.nc_method = initialize() + + nc2.nc_id = nc1 + nc2.nc_method = nop() + + nc1 <-- nc2 <-- b11 + ''') + artifacts.propagate_with(manager) + + nc1, nc2 = artifacts.get_typed_vertices(['nc1', 'nc2'], Transaction) + + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + + assert nc_history_index.get_latest_tx_timestamp(nc1.hash) == nc2.timestamp + assert nc_history_index.get_latest_tx_timestamp(nc2.hash) is None + + def test_transaction_count(self) -> None: + builder = self.get_builder().enable_nc_indices() + manager = self.create_peer_from_builder(builder) + assert isinstance(manager.tx_storage, TransactionRocksDBStorage) + path = manager.tx_storage._rocksdb_storage.path + indexes_manager = not_none(manager.tx_storage.indexes) + nc_history_index = not_none(indexes_manager.nc_history) + private_key = unittest.OCB_TEST_PRIVKEY.hex() + password = unittest.OCB_TEST_PASSWORD.hex() + dag_builder = TestDAGBuilder.from_manager(manager) + artifacts = dag_builder.build_from_str(f''' + blockchain genesis b[1..11] + b10 < dummy + + ocb1.ocb_private_key = "{private_key}" + ocb1.ocb_password = "{password}" + ocb1.ocb_code = test_blueprint1.py, TestBlueprint1 + + ocb2.ocb_private_key = "{private_key}" + ocb2.ocb_password = "{password}" + ocb2.ocb_code = test_blueprint1.py, TestBlueprint1 + + nc1.nc_id = ocb1 + nc1.nc_method = initialize(0) + + nc2.nc_id = ocb2 + nc2.nc_method = initialize(0) + + nc3.nc_id = nc2 + nc3.nc_method = nop() + + nc4.nc_id = nc1 + nc4.nc_method = nop() + + nc5.nc_id = nc2 + nc5.nc_method = nop() + + nc6.nc_id = nc2 + nc6.nc_method = nop() + + nc7.nc_id = nc1 + nc7.nc_method = nop() + + ocb1 <-- ocb2 <-- b11 + b11 < nc1 < nc2 < nc3 < nc4 < nc5 < nc6 < nc7 + ''') + + artifacts.propagate_with(manager) + nc1, nc2, nc6, nc7 = artifacts.get_typed_vertices(['nc1', 'nc2', 'nc6', 'nc7'], Transaction) + + assert nc1.is_nano_contract() + assert nc2.is_nano_contract() + assert nc6.is_nano_contract() + assert nc7.is_nano_contract() + + assert nc_history_index.get_transaction_count(nc1.hash) == 3 + assert nc_history_index.get_transaction_count(nc2.hash) == 4 + + assert isinstance(manager.tx_storage, TransactionRocksDBStorage) + manager.stop() + manager.tx_storage._rocksdb_storage.close() + + # Test loading counts from existing db + builder2 = self.get_builder().set_rocksdb_path(path).enable_nc_indices() + manager2 = self.create_peer_from_builder(builder2) + indexes_manager2 = not_none(manager2.tx_storage.indexes) + nc_history_index = not_none(indexes_manager2.nc_history) + + assert nc_history_index.get_transaction_count(nc1.hash) == 3 + assert nc_history_index.get_transaction_count(nc2.hash) == 4 + + +class RocksDBNCHistoryIndexesTest(NCHistoryIndexesTest): + __test__ = True + + def setUp(self): + import tempfile + + from hathor.nanocontracts.storage import NCRocksDBStorageFactory + from hathor.transaction.storage import TransactionRocksDBStorage + from hathor.transaction.vertex_parser import VertexParser + + super().setUp() + self.wallet = Wallet() + directory = tempfile.mkdtemp() + self.tmpdirs.append(directory) + rocksdb_storage = RocksDBStorage(path=directory) + vertex_parser = VertexParser(settings=self._settings) + nc_storage_factory = NCRocksDBStorageFactory(rocksdb_storage) + self.tx_storage = TransactionRocksDBStorage(rocksdb_storage, + settings=self._settings, + vertex_parser=vertex_parser, + nc_storage_factory=nc_storage_factory) + self.genesis = self.tx_storage.get_all_genesis() + self.genesis_blocks = [tx for tx in self.genesis if tx.is_block] + self.genesis_txs = [tx for tx in self.genesis if not tx.is_block] + + # read genesis keys + self.genesis_private_key = get_genesis_key() + self.genesis_public_key = self.genesis_private_key.public_key() + + # this makes sure we can spend the genesis outputs + self.manager = self.create_peer('testnet', tx_storage=self.tx_storage, unlock_wallet=True, wallet_index=True, + utxo_index=True, nc_indices=True) + self.blocks = add_blocks_unlock_reward(self.manager) + self.last_block = self.blocks[-1] + + from hathor.graphviz import GraphvizVisualizer + self.graphviz = GraphvizVisualizer(self.tx_storage, include_verifications=True, include_funds=True)