diff --git a/tests/tx/scripts/__init__.py b/tests/tx/scripts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/tx/scripts/test_p2pkh.py b/tests/tx/scripts/test_p2pkh.py new file mode 100644 index 000000000..9c11b4feb --- /dev/null +++ b/tests/tx/scripts/test_p2pkh.py @@ -0,0 +1,105 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.transaction.scripts import P2PKH, Opcode +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashBitmask + + +def test_create_input_data_simple() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature) + + assert data == bytes([ + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_sighash_bitmask() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + inputs_bitmask = 0b111 + outputs_bitmask = 0b101 + sighash = SighashBitmask(inputs=inputs_bitmask, outputs=outputs_bitmask) + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature, sighash=sighash) + + assert data == bytes([ + 1, + inputs_bitmask, + 1, + outputs_bitmask, + Opcode.OP_SIGHASH_BITMASK, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_inputs_outputs_limit() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + max_inputs = 2 + max_outputs = 3 + limit = InputsOutputsLimit(max_inputs=max_inputs, max_outputs=max_outputs) + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature, inputs_outputs_limit=limit) + + assert data == bytes([ + 1, + max_inputs, + 1, + max_outputs, + Opcode.OP_MAX_INPUTS_OUTPUTS, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_sighash_bitmask_and_inputs_outputs_limit() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + inputs_bitmask = 0b111 + outputs_bitmask = 0b101 + max_inputs = 2 + max_outputs = 3 + sighash = SighashBitmask(inputs=inputs_bitmask, outputs=outputs_bitmask) + limit = InputsOutputsLimit(max_inputs=max_inputs, max_outputs=max_outputs) + data = P2PKH.create_input_data( + public_key_bytes=pub_key, + signature=signature, + sighash=sighash, + inputs_outputs_limit=limit + ) + + assert data == bytes([ + 1, + inputs_bitmask, + 1, + outputs_bitmask, + Opcode.OP_SIGHASH_BITMASK, + 1, + max_inputs, + 1, + max_outputs, + Opcode.OP_MAX_INPUTS_OUTPUTS, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) diff --git a/tests/tx/scripts/test_script_context.py b/tests/tx/scripts/test_script_context.py new file mode 100644 index 000000000..bba18ae0f --- /dev/null +++ b/tests/tx/scripts/test_script_context.py @@ -0,0 +1,97 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +from unittest.mock import Mock + +import pytest + +from hathor.conf.settings import HathorSettings +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.exceptions import ScriptError +from hathor.transaction.scripts.script_context import ScriptContext +from hathor.transaction.scripts.sighash import SighashAll, SighashBitmask + + +@pytest.mark.parametrize(['max_num_outputs'], [(99,), (255,)]) +def test_defaults(max_num_outputs: int) -> None: + settings = Mock(spec_set=HathorSettings) + settings.MAX_NUM_OUTPUTS = max_num_outputs + + context = ScriptContext(settings=settings, stack=Mock(), logs=[], extras=Mock()) + + tx = Transaction( + inputs=[ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + ], + outputs=[ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + ] + ) + + assert context._sighash == SighashAll() + assert context.get_tx_sighash_data(tx) == tx.get_sighash_all_data() + assert context.get_selected_outputs() == set(range(max_num_outputs)) + + +def test_set_sighash() -> None: + context = ScriptContext(settings=Mock(), stack=Mock(), logs=[], extras=Mock()) + + sighash = SighashBitmask(inputs=0b111, outputs=0b101) + context.set_sighash(sighash) + assert context._sighash == sighash + + with pytest.raises(ScriptError): + context.set_sighash(sighash) + + +@pytest.mark.parametrize( + ['outputs_bitmask', 'selected_outputs'], + [ + (0b00, set()), + (0b01, {0}), + (0b10, {1}), + (0b11, {0, 1}), + ] +) +def test_sighash_bitmask(outputs_bitmask: int, selected_outputs: set[int]) -> None: + settings = Mock() + settings.MAX_NUM_INPUTS = 88 + settings.MAX_NUM_OUTPUTS = 99 + + context = ScriptContext(settings=settings, stack=Mock(), logs=[], extras=Mock()) + tx = Transaction( + inputs=[ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + ], + outputs=[ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + ] + ) + + sighash_bitmask = SighashBitmask(inputs=0b11, outputs=outputs_bitmask) + context.set_sighash(sighash_bitmask) + + data = tx.get_custom_sighash_data(sighash_bitmask) + assert context.get_tx_sighash_data(tx) == hashlib.sha256(data).digest() + + with pytest.raises(ScriptError) as e: + context.set_sighash(Mock()) + + assert str(e.value) == 'Cannot modify sighash after it is already set.' + assert context.get_selected_outputs() == selected_outputs diff --git a/tests/tx/scripts/test_sighash.py b/tests/tx/scripts/test_sighash.py new file mode 100644 index 000000000..d78af971f --- /dev/null +++ b/tests/tx/scripts/test_sighash.py @@ -0,0 +1,312 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import cast +from unittest.mock import patch + +import pytest + +from hathor.crypto.util import decode_address +from hathor.exception import InvalidNewTransaction +from hathor.manager import HathorManager +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.exceptions import InputOutputMismatch, InvalidInputData, InvalidScriptError +from hathor.transaction.scripts.p2pkh import P2PKH +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashBitmask +from hathor.transaction.static_metadata import TransactionStaticMetadata +from hathor.util import not_none +from tests import unittest +from tests.utils import add_blocks_unlock_reward, create_tokens, get_genesis_key + + +class SighashTest(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + self.manager1: HathorManager = self.create_peer('testnet', unlock_wallet=True, wallet_index=True) + self.manager2: HathorManager = self.create_peer('testnet', unlock_wallet=True, wallet_index=True) + + # 1 is Alice + assert self.manager1.wallet + self.address1_b58 = self.manager1.wallet.get_unused_address() + self.private_key1 = self.manager1.wallet.get_private_key(self.address1_b58) + self.address1 = decode_address(self.address1_b58) + + # 2 is Bob + assert self.manager2.wallet + self.address2_b58 = self.manager2.wallet.get_unused_address() + self.address2 = decode_address(self.address2_b58) + + self.genesis_private_key = get_genesis_key() + self.genesis_block = self.manager1.tx_storage.get_transaction(self._settings.GENESIS_BLOCK_HASH) + + # Add some blocks so we can spend the genesis outputs + add_blocks_unlock_reward(self.manager1) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_bitmask(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing only Bob's input, with half genesis HTR + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + + # Alice signs her input using sighash bitmasks, instead of sighash_all. + sighash_bitmask = SighashBitmask(inputs=0b1, outputs=0b1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + ) + + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + static_metadata = TransactionStaticMetadata.create_from_storage( + atomic_swap_tx, self._settings, self.manager1.tx_storage + ) + atomic_swap_tx.set_static_metadata(static_metadata) + + # At this point, the tx is partial. The inputs are valid, but they're mismatched with outputs + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds a change output for his HTR + htr_output = TxOutput(int(genesis_utxo.value / 2), bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign2 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes2, signature2) + + # The atomic swap tx is now completed and valid, and can be propagated + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + self.manager1.verification_service.verify(atomic_swap_tx_clone) + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_bitmask_with_limit(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing only Bob's input, with half genesis HTR + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + + # Alice signs her input using sighash bitmasks, instead of sighash_all. + # She also sets max inputs and max outputs limits, including one output for change. + sighash_bitmask = SighashBitmask(inputs=0b1, outputs=0b1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + inputs_outputs_limit=InputsOutputsLimit(max_inputs=2, max_outputs=3) + ) + + # At this point, the tx is partial. The inputs are valid, but they're mismatched with outputs + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds two change outputs for his HTR, which violates the maximum tx outputs set by Alice + htr_output1 = TxOutput(genesis_utxo.value // 4, bob_output_script) + htr_output2 = TxOutput(genesis_utxo.value // 4, bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output1) + atomic_swap_tx_clone.outputs.append(htr_output2) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign2 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes2, signature2) + + # The atomic swap tx is not valid and cannot be propagated + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verify(atomic_swap_tx_clone) + + self.assertEqual(str(e.value), "Maximum number of outputs exceeded (4 > 3).") + + with pytest.raises(InvalidNewTransaction): + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_bitmask_input_not_selected(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an input spending all genesis HTR + genesis_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + + # Alice creates an atomic swap tx + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input, genesis_input], + outputs=[], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her token input using sighash bitmasks, instead of sighash_all. + sighash_bitmask = SighashBitmask(inputs=0b01, outputs=0b00) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + ) + + # Alice signs her genesis input using the same sighash, so the genesis input is not selected in the bitmask. + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.genesis_private_key) + genesis_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + ) + + # The inputs are invalid, since one of them doesn't select itself. + with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + + self.assertEqual(str(e.value), 'Input at index 1 must select itself when using a custom sighash.') + + with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verify(atomic_swap_tx) + + self.assertEqual(str(e.value), 'Input at index 1 must select itself when using a custom sighash.') + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_bitmask_nonexistent_input(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing only Bob's input, with half genesis HTR + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash bitmasks, instead of sighash_all. + sighash_bitmask = SighashBitmask(inputs=0b1, outputs=0b1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=SighashBitmask(inputs=0b11, outputs=0b1), + ) + + # The input is invalid, since it selects a nonexistent input + with pytest.raises(InvalidScriptError) as e: + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + + assert str(e.value) == 'Custom sighash selected nonexistent input/output.' + + with pytest.raises(InvalidScriptError): + self.manager1.verification_service.verify(atomic_swap_tx) diff --git a/tests/tx/scripts/test_tx_sighash.py b/tests/tx/scripts/test_tx_sighash.py new file mode 100644 index 000000000..516038b9d --- /dev/null +++ b/tests/tx/scripts/test_tx_sighash.py @@ -0,0 +1,77 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import patch + +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.scripts.sighash import SighashBitmask + + +def test_get_sighash_bitmask() -> None: + inputs = [ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + TxInput(tx_id=b'tx3', index=1, data=b''), + TxInput(tx_id=b'tx4', index=1, data=b''), + TxInput(tx_id=b'tx5', index=1, data=b''), + TxInput(tx_id=b'tx6', index=1, data=b''), + TxInput(tx_id=b'tx7', index=1, data=b''), + TxInput(tx_id=b'tx8', index=1, data=b''), + ] + outputs = [ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + TxOutput(value=33, script=b''), + TxOutput(value=44, script=b''), + TxOutput(value=55, script=b''), + TxOutput(value=66, script=b''), + TxOutput(value=77, script=b''), + TxOutput(value=88, script=b''), + ] + tx = Transaction(inputs=inputs, outputs=outputs) + + with patch.object(tx, '_get_sighash') as mock: + tx.get_custom_sighash_data(SighashBitmask(inputs=0b0000_0001, outputs=0b0000_0000)) + mock.assert_called_once_with(inputs=inputs[0:1], outputs=[]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b0000_0011, outputs=0b0000_0001)) + mock.assert_called_once_with(inputs=inputs[0:2], outputs=outputs[0:1]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b0000_1111, outputs=0b0000_1111)) + mock.assert_called_once_with(inputs=inputs[0:4], outputs=outputs[0:4]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b1110_0000, outputs=0b0111_0000)) + mock.assert_called_once_with(inputs=inputs[5:8], outputs=outputs[4:7]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b1101_1010, outputs=0b1110_0010)) + mock.assert_called_once_with( + inputs=[ + inputs[1], + inputs[3], + inputs[4], + inputs[6], + inputs[7], + ], + outputs=[ + outputs[1], + outputs[5], + outputs[6], + outputs[7], + ] + ) + mock.reset_mock() diff --git a/tests/tx/test_scripts.py b/tests/tx/test_scripts.py index a3fd98a92..76b96feda 100644 --- a/tests/tx/test_scripts.py +++ b/tests/tx/test_scripts.py @@ -1,16 +1,22 @@ import struct -from unittest.mock import Mock +from unittest.mock import Mock, patch +import pytest from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from hathor.crypto.util import get_address_from_public_key, get_hash160, get_public_key_bytes_compressed from hathor.transaction.exceptions import ( + CustomSighashModelInvalid, DataIndexError, EqualVerifyFailed, FinalStackInvalid, + InputNotSelectedError, + InputsOutputsLimitModelInvalid, InvalidScriptError, InvalidStackData, + MaxInputsExceededError, + MaxOutputsExceededError, MissingStackItems, OracleChecksigFailed, OutOfData, @@ -38,6 +44,8 @@ get_script_op, ) from hathor.transaction.scripts.opcode import ( + execute_op_code, + is_opcode_valid, op_checkdatasig, op_checkmultisig, op_checksig, @@ -52,10 +60,13 @@ op_greaterthan_timestamp, op_hash160, op_integer, + op_max_inputs_outputs, op_pushdata, op_pushdata1, + op_sighash_bitmask, ) from hathor.transaction.scripts.script_context import ScriptContext +from hathor.transaction.scripts.sighash import SighashBitmask from hathor.transaction.storage import TransactionMemoryStorage from hathor.wallet import HDWallet from tests import unittest @@ -980,3 +991,112 @@ def test_get_sigops_count(self): self.assertEqual(get_sigops_count(input_script.data, p2pkh_script), 1) # if no output_script, count only input self.assertEqual(get_sigops_count(input_script.data), 1) + + def test_op_sighash_bitmask(self) -> None: + with self.assertRaises(MissingStackItems): + op_sighash_bitmask(ScriptContext(stack=[], extras=Mock(), logs=[], settings=Mock())) + + with self.assertRaises(MissingStackItems): + op_sighash_bitmask(ScriptContext(stack=[b''], extras=Mock(), logs=[], settings=Mock())) + + with self.assertRaises(AssertionError): + op_sighash_bitmask(ScriptContext(stack=[0b110, 0b101], extras=Mock(), logs=[], settings=Mock())) + + stack: list[bytes | int | str] = [bytes([0b0]), bytes([0xFF, 0xFF])] + context = Mock(spec_set=ScriptContext) + extras = Mock(spec_set=ScriptExtras) + context.stack = stack + context.extras = extras + + with self.assertRaises(CustomSighashModelInvalid): + op_sighash_bitmask(context) + + context.stack = [bytes([0b110]), bytes([0b101])] + extras.input_index = 3 + + with self.assertRaises(InputNotSelectedError): + op_sighash_bitmask(context) + + context.stack = [bytes([0b110]), bytes([0b101])] + extras.input_index = 2 + op_sighash_bitmask(context) + + self.assertEqual(stack, []) + context.set_sighash.assert_called_once_with( + SighashBitmask( + inputs=0b110, + outputs=0b101 + ) + ) + + def test_op_max_inputs_outputs(self) -> None: + context = Mock(spec_set=ScriptContext) + with self.assertRaises(MissingStackItems): + context.stack = [] + op_max_inputs_outputs(context) + + with self.assertRaises(MissingStackItems): + context.stack = [b''] + op_max_inputs_outputs(context) + + with self.assertRaises(AssertionError): + context.stack = [1, 2] + op_max_inputs_outputs(context) + + context.stack = [bytes([0]), bytes([0])] + context.extras = Mock(spec_set=ScriptExtras) + + with self.assertRaises(InputsOutputsLimitModelInvalid): + op_max_inputs_outputs(context) + + context.stack = [bytes([1]), bytes([2])] + context.extras.tx.inputs = ['a', 'b'] + + with self.assertRaises(MaxInputsExceededError): + op_max_inputs_outputs(context) + + context.stack = [bytes([1]), bytes([2])] + context.extras.tx.inputs = ['a'] + context.extras.tx.outputs = ['a', 'b', 'c'] + + with self.assertRaises(MaxOutputsExceededError): + op_max_inputs_outputs(context) + + context.stack = [bytes([1]), bytes([2])] + context.extras.tx.inputs = ['a'] + context.extras.tx.outputs = ['a', 'b'] + + op_max_inputs_outputs(context) + + self.assertEqual(context.stack, []) + + def test_execute_op_code(self) -> None: + # Test that when `is_opcode_valid` returns False, execution must fail, regardless of the opcode. + with ( + patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: False), + pytest.raises(ScriptError) as e + ): + opcode = Mock() + opcode.name = 'MyOp' + execute_op_code(opcode=opcode, context=Mock()) + assert str(e.value) == 'Opcode "MyOp" is invalid.' + + # Test that when `is_opcode_valid` returns True, execution must fail if it's not a "function opcode". + with ( + patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True), + pytest.raises(ScriptError) as e + ): + execute_op_code(opcode=Opcode.OP_0, context=Mock()) + assert str(e.value) == f'unknown opcode: {Opcode.OP_0}' + + # Test that a valid opcode is correctly executed. + with patch('hathor.transaction.scripts.opcode.op_dup') as op_mock: + execute_op_code(opcode=Opcode.OP_DUP, context=Mock()) + + op_mock.assert_called_once() + + def test_is_opcode_valid(self) -> None: + self.assertTrue(is_opcode_valid(Opcode.OP_DUP)) + self.assertFalse(is_opcode_valid(Opcode.OP_SIGHASH_BITMASK)) + self.assertFalse(is_opcode_valid(Opcode.OP_SIGHASH_RANGE)) + self.assertFalse(is_opcode_valid(Opcode.OP_MAX_INPUTS_OUTPUTS)) diff --git a/tests/tx/test_tx.py b/tests/tx/test_tx.py index 747ffa1bc..832267f58 100644 --- a/tests/tx/test_tx.py +++ b/tests/tx/test_tx.py @@ -24,6 +24,7 @@ InvalidOutputScriptSize, InvalidOutputValue, NoInputError, + OutputNotSelected, ParentDoesNotExist, PowError, TimestampError, @@ -34,6 +35,7 @@ WeightError, ) from hathor.transaction.scripts import P2PKH, parse_address_script +from hathor.transaction.scripts.sighash import SighashBitmask from hathor.transaction.util import int_to_bytes from hathor.transaction.validation_state import ValidationState from hathor.wallet import Wallet @@ -149,6 +151,37 @@ def test_too_many_outputs(self): with self.assertRaises(TooManyOutputs): self._verifiers.vertex.verify_number_of_outputs(tx) + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_output_not_selected(self) -> None: + parents = [tx.hash for tx in self.genesis_txs] + genesis_block = self.genesis_blocks[0] + + value = genesis_block.outputs[0].value + address = get_address_from_public_key(self.genesis_public_key) + script = P2PKH.create_output_script(address) + output = TxOutput(value, script) + + tx_input = TxInput(genesis_block.hash, 0, b'') + tx = Transaction( + weight=1, + inputs=[tx_input], + outputs=[output], + parents=parents, + storage=self.tx_storage, + timestamp=self.last_block.timestamp + 1 + ) + + sighash = SighashBitmask(inputs=0b1, outputs=0b0) + data_to_sign = tx.get_custom_sighash_data(sighash) + public_bytes, signature = self.wallet.get_input_aux_data(data_to_sign, self.genesis_private_key) + tx_input.data = P2PKH.create_input_data(public_bytes, signature, sighash=sighash) + + self.manager.cpu_mining_service.resolve(tx) + with pytest.raises(OutputNotSelected) as e: + self.manager.verification_service.verify(tx) + + self.assertEqual(str(e.value), "Output at index 0 is not signed by any input.") + def _gen_tx_spending_genesis_block(self): parents = [tx.hash for tx in self.genesis_txs] genesis_block = self.genesis_blocks[0]