diff --git a/hathor/transaction/exceptions.py b/hathor/transaction/exceptions.py index 2d1bfbda8..1f9a2c1f2 100644 --- a/hathor/transaction/exceptions.py +++ b/hathor/transaction/exceptions.py @@ -74,10 +74,18 @@ class ConflictingInputs(TxValidationError): """Inputs in the tx are spending the same output""" +class OutputNotSelected(TxValidationError): + """At least one output is not selected for signing by some input.""" + + class TooManyOutputs(TxValidationError): """More than 256 outputs""" +class TooManySighashSubsets(TxValidationError): + """More sighash subsets than the configured maximum.""" + + class InvalidOutputValue(TxValidationError): """Value of output is invalid""" @@ -202,3 +210,25 @@ class VerifyFailed(ScriptError): class TimeLocked(ScriptError): """Transaction is invalid because it is time locked""" + + +class InputNotSelectedError(ScriptError): + """Raised when an input does not select itself for signing in its script.""" + + +class MaxInputsExceededError(ScriptError): + """The transaction has more inputs than the maximum configured in the script.""" + + +class MaxOutputsExceededError(ScriptError): + """The transaction has more outputs than the maximum configured in the script.""" + + +class InputsOutputsLimitModelInvalid(ScriptError): + """ + Raised when the inputs outputs limit model could not be constructed from the arguments provided in the script. + """ + + +class CustomSighashModelInvalid(ScriptError): + """Raised when the sighash model could not be constructed from the arguments provided in the script.""" diff --git a/hathor/transaction/scripts/execute.py b/hathor/transaction/scripts/execute.py index 23109afbc..4a9a743b0 100644 --- a/hathor/transaction/scripts/execute.py +++ b/hathor/transaction/scripts/execute.py @@ -13,17 +13,33 @@ # limitations under the License. 
import struct -from typing import NamedTuple, Optional, Union +from dataclasses import dataclass +from typing import TYPE_CHECKING, NamedTuple, Optional, Union +from hathor.conf.get_settings import get_global_settings from hathor.transaction import BaseTransaction, Transaction, TxInput from hathor.transaction.exceptions import DataIndexError, FinalStackInvalid, InvalidScriptError, OutOfData +if TYPE_CHECKING: + from hathor.transaction.scripts.script_context import ScriptContext + -class ScriptExtras(NamedTuple): +@dataclass(slots=True, frozen=True, kw_only=True) +class ScriptExtras: + """ + A simple container for auxiliary data that may be used during execution of scripts. + """ tx: Transaction - txin: TxInput + input_index: int spent_tx: BaseTransaction + @property + def txin(self) -> TxInput: + return self.tx.inputs[self.input_index] + + def __post_init__(self) -> None: + assert self.txin.tx_id == self.spent_tx.hash + # XXX: Because the Stack is a heterogeneous list of bytes and int, and some OPs only work for when the stack has some # or the other type, there are many places that require an assert to prevent the wrong type from being used, @@ -39,7 +55,7 @@ class OpcodePosition(NamedTuple): position: int -def execute_eval(data: bytes, log: list[str], extras: ScriptExtras) -> None: +def execute_eval(data: bytes, log: list[str], extras: ScriptExtras) -> 'ScriptContext': """ Execute eval from data executing opcode methods :param data: data to be evaluated that contains data and opcodes @@ -56,8 +72,9 @@ def execute_eval(data: bytes, log: list[str], extras: ScriptExtras) -> None: """ from hathor.transaction.scripts.opcode import Opcode, execute_op_code from hathor.transaction.scripts.script_context import ScriptContext + settings = get_global_settings() stack: Stack = [] - context = ScriptContext(stack=stack, logs=log, extras=extras) + context = ScriptContext(settings=settings, stack=stack, logs=log, extras=extras) data_len = len(data) pos = 0 while pos < data_len: 
@@ -70,6 +87,8 @@ def execute_eval(data: bytes, log: list[str], extras: ScriptExtras) -> None: evaluate_final_stack(stack, log) + return context + def evaluate_final_stack(stack: Stack, log: list[str]) -> None: """ Checks the final state of the stack. @@ -88,7 +107,7 @@ def evaluate_final_stack(stack: Stack, log: list[str]) -> None: raise FinalStackInvalid('\n'.join(log)) -def script_eval(tx: Transaction, txin: TxInput, spent_tx: BaseTransaction) -> None: +def script_eval(tx: Transaction, spent_tx: BaseTransaction, *, input_index: int) -> 'ScriptContext': """Evaluates the output script and input data according to a very limited subset of Bitcoin's scripting language. @@ -103,10 +122,10 @@ def script_eval(tx: Transaction, txin: TxInput, spent_tx: BaseTransaction) -> No :raises ScriptError: if script verification fails """ - input_data = txin.data - output_script = spent_tx.outputs[txin.index].script + extras = ScriptExtras(tx=tx, input_index=input_index, spent_tx=spent_tx) + input_data = extras.txin.data + output_script = spent_tx.outputs[extras.txin.index].script log: list[str] = [] - extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) from hathor.transaction.scripts import MultiSig if MultiSig.re_match.search(output_script): @@ -115,17 +134,17 @@ def script_eval(tx: Transaction, txin: TxInput, spent_tx: BaseTransaction) -> No # we can't use input_data + output_script because it will end with an invalid stack # i.e. 
the signatures will still be on the stack after ouput_script is executed redeem_script_pos = MultiSig.get_multisig_redeem_script_pos(input_data) - full_data = txin.data[redeem_script_pos:] + output_script + full_data = extras.txin.data[redeem_script_pos:] + output_script execute_eval(full_data, log, extras) # Second, we need to validate that the signatures on the input_data solves the redeem_script # we pop and append the redeem_script to the input_data and execute it multisig_data = MultiSig.get_multisig_data(extras.txin.data) - execute_eval(multisig_data, log, extras) + return execute_eval(multisig_data, log, extras) else: # merge input_data and output_script full_data = input_data + output_script - execute_eval(full_data, log, extras) + return execute_eval(full_data, log, extras) def decode_opn(opcode: int) -> int: diff --git a/hathor/transaction/scripts/hathor_script.py b/hathor/transaction/scripts/hathor_script.py index 0a1214c1b..d5f311cde 100644 --- a/hathor/transaction/scripts/hathor_script.py +++ b/hathor/transaction/scripts/hathor_script.py @@ -15,7 +15,16 @@ import struct from typing import Union +from typing_extensions import assert_never + from hathor.transaction.scripts.opcode import Opcode +from hathor.transaction.scripts.sighash import ( + InputsOutputsLimit, + SighashAll, + SighashBitmask, + SighashRange, + SighashType, +) class HathorScript: @@ -49,3 +58,38 @@ def pushData(self, data: Union[int, bytes]) -> None: self.data += (bytes([len(data)]) + data) else: self.data += (bytes([Opcode.OP_PUSHDATA1]) + bytes([len(data)]) + data) + + def push_sighash(self, sighash: SighashType) -> None: + """Push a custom sighash to the script.""" + match sighash: + case SighashAll(): + pass + case SighashBitmask(): + self.pushData(sighash.inputs) + self.pushData(sighash.outputs) + self.addOpcode(Opcode.OP_SIGHASH_BITMASK) + case SighashRange(): + self.pushData(sighash.input_start) + self.pushData(sighash.input_end) + self.pushData(sighash.output_start) + 
self.pushData(sighash.output_end) + self.addOpcode(Opcode.OP_SIGHASH_RANGE) + case _: + assert_never(sighash) + + def push_max_sighash_subsets(self, max_subsets: int | None) -> None: + """Push a maximum limit for custom sighash subsets.""" + if max_subsets is None: + return + + self.pushData(max_subsets) + self.addOpcode(Opcode.OP_MAX_SIGHASH_SUBSETS) + + def push_inputs_outputs_limit(self, limit: InputsOutputsLimit | None) -> None: + """Push a custom inputs and outputs limit to the script.""" + if not limit: + return + + self.pushData(limit.max_inputs) + self.pushData(limit.max_outputs) + self.addOpcode(Opcode.OP_MAX_INPUTS_OUTPUTS) diff --git a/hathor/transaction/scripts/multi_sig.py b/hathor/transaction/scripts/multi_sig.py index 7fe4f10ed..66b9b50bf 100644 --- a/hathor/transaction/scripts/multi_sig.py +++ b/hathor/transaction/scripts/multi_sig.py @@ -21,6 +21,7 @@ from hathor.transaction.scripts.execute import Stack, get_script_op from hathor.transaction.scripts.hathor_script import HathorScript from hathor.transaction.scripts.opcode import Opcode, op_pushdata, op_pushdata1 +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashAll, SighashType class MultiSig(BaseScript): @@ -111,7 +112,15 @@ def create_output_script(cls, address: bytes, timelock: Optional[Any] = None) -> return s.data @classmethod - def create_input_data(cls, redeem_script: bytes, signatures: list[bytes]) -> bytes: + def create_input_data( + cls, + redeem_script: bytes, + signatures: list[bytes], + *, + sighash: SighashType = SighashAll(), + max_sighash_subsets: int | None = None, + inputs_outputs_limit: InputsOutputsLimit | None = None + ) -> bytes: """ :param redeem_script: script to redeem the tokens: ... 
:type redeem_script: bytes @@ -122,6 +131,9 @@ def create_input_data(cls, redeem_script: bytes, signatures: list[bytes]) -> byt :rtype: bytes """ s = HathorScript() + s.push_sighash(sighash) + s.push_max_sighash_subsets(max_sighash_subsets) + s.push_inputs_outputs_limit(inputs_outputs_limit) for signature in signatures: s.pushData(signature) s.pushData(redeem_script) diff --git a/hathor/transaction/scripts/opcode.py b/hathor/transaction/scripts/opcode.py index 460c66821..acd51c67a 100644 --- a/hathor/transaction/scripts/opcode.py +++ b/hathor/transaction/scripts/opcode.py @@ -16,6 +16,7 @@ import struct from enum import IntEnum +import pydantic from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec @@ -28,9 +29,14 @@ is_pubkey_compressed, ) from hathor.transaction.exceptions import ( + CustomSighashModelInvalid, EqualVerifyFailed, + InputNotSelectedError, + InputsOutputsLimitModelInvalid, InvalidScriptError, InvalidStackData, + MaxInputsExceededError, + MaxOutputsExceededError, MissingStackItems, OracleChecksigFailed, ScriptError, @@ -39,6 +45,8 @@ ) from hathor.transaction.scripts.execute import Stack, binary_to_int, decode_opn, get_data_value, get_script_op from hathor.transaction.scripts.script_context import ScriptContext +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashBitmask, SighashRange +from hathor.transaction.util import bytes_to_int class Opcode(IntEnum): @@ -72,6 +80,10 @@ class Opcode(IntEnum): OP_DATA_GREATERTHAN = 0xC1 OP_FIND_P2PKH = 0xD0 OP_DATA_MATCH_VALUE = 0xD1 + OP_SIGHASH_BITMASK = 0xE0 + OP_SIGHASH_RANGE = 0xE1 + OP_MAX_SIGHASH_SUBSETS = 0xE2 + OP_MAX_INPUTS_OUTPUTS = 0xE3 @classmethod def is_pushdata(cls, opcode: int) -> bool: @@ -249,7 +261,8 @@ def op_checksig(context: ScriptContext) -> None: # pubkey is not compressed public key raise ScriptError('OP_CHECKSIG: pubkey is not a public key') from e try: - 
public_key.verify(signature, context.extras.tx.get_sighash_all_data(), ec.ECDSA(hashes.SHA256())) + sighash_data = context.get_tx_sighash_data(context.extras.tx) + public_key.verify(signature, sighash_data, ec.ECDSA(hashes.SHA256())) # valid, push true to stack context.stack.append(1) except InvalidSignature: @@ -579,21 +592,25 @@ def op_checkmultisig(context: ScriptContext) -> None: # For each signature we check if it's valid with one of the public keys # Signatures must be in order (same as the public keys in the multi sig wallet) pubkey_index = 0 + old_stack = context.stack for signature in signatures: while pubkey_index < len(pubkeys): pubkey = pubkeys[pubkey_index] new_stack = [signature, pubkey] - op_checksig(ScriptContext(stack=new_stack, logs=context.logs, extras=context.extras)) + context.stack = new_stack + op_checksig(context) result = new_stack.pop() pubkey_index += 1 if result == 1: break else: # finished all pubkeys and did not verify all signatures + context.stack = old_stack context.stack.append(0) return # If all signatures are valids we push 1 + context.stack = old_stack context.stack.append(1) @@ -617,6 +634,103 @@ def op_integer(opcode: int, stack: Stack) -> None: raise ScriptError(e) from e +def op_sighash_bitmask(context: ScriptContext) -> None: + """Pop two items from the stack, constructing a sighash bitmask and setting it in the script context.""" + if len(context.stack) < 2: + raise MissingStackItems(f'OP_SIGHASH_BITMASK: expected 2 elements on stack, has {len(context.stack)}') + + outputs = context.stack.pop() + inputs = context.stack.pop() + assert isinstance(inputs, bytes) + assert isinstance(outputs, bytes) + + try: + sighash = SighashBitmask( + inputs=bytes_to_int(inputs), + outputs=bytes_to_int(outputs) + ) + except pydantic.ValidationError as e: + raise CustomSighashModelInvalid('Could not construct sighash bitmask.') from e + + if context.extras.input_index not in sighash.get_input_indexes(): + raise InputNotSelectedError( + 
f'Input at index {context.extras.input_index} must select itself when using a custom sighash.' + ) + + context.set_sighash(sighash) + + +def op_sighash_range(context: ScriptContext) -> None: + """Pop four items from the stack, constructing a sighash range and setting it in the script context.""" + if len(context.stack) < 4: + raise MissingStackItems(f'OP_SIGHASH_RANGE: expected 4 elements on stack, has {len(context.stack)}') + + output_end = context.stack.pop() + output_start = context.stack.pop() + input_end = context.stack.pop() + input_start = context.stack.pop() + assert isinstance(output_end, bytes) + assert isinstance(output_start, bytes) + assert isinstance(input_end, bytes) + assert isinstance(input_start, bytes) + + try: + sighash = SighashRange( + input_start=bytes_to_int(input_start), + input_end=bytes_to_int(input_end), + output_start=bytes_to_int(output_start), + output_end=bytes_to_int(output_end), + ) + except Exception as e: + raise CustomSighashModelInvalid('Could not construct sighash range.') from e + + if context.extras.input_index not in sighash.get_input_indexes(): + raise InputNotSelectedError( + f'Input at index {context.extras.input_index} must select itself when using a custom sighash.' 
+ ) + + context.set_sighash(sighash) + + +def op_max_sighash_subsets(context: ScriptContext) -> None: + """Pop one item from the stack, setting it in the script context.""" + if len(context.stack) < 1: + raise MissingStackItems(f'OP_MAX_SIGHASH_SUBSETS: expected 1 element on stack, has {len(context.stack)}') + + max_subsets = context.stack.pop() + assert isinstance(max_subsets, bytes) + max_subsets = bytes_to_int(max_subsets) + + context.set_max_sighash_subsets(max_subsets) + + +def op_max_inputs_outputs(context: ScriptContext) -> None: + """Pop two items from the stack, constructing an inputs and outputs limit and setting it in the script context.""" + if len(context.stack) < 2: + raise MissingStackItems(f'OP_MAX_INPUTS_OUTPUTS: expected 2 elements on stack, has {len(context.stack)}') + + max_outputs = context.stack.pop() + max_inputs = context.stack.pop() + assert isinstance(max_inputs, bytes) + assert isinstance(max_outputs, bytes) + + try: + limit = InputsOutputsLimit( + max_inputs=bytes_to_int(max_inputs), + max_outputs=bytes_to_int(max_outputs) + ) + except pydantic.ValidationError as e: + raise InputsOutputsLimitModelInvalid("Could not construct inputs and outputs limits.") from e + + tx_inputs_len = len(context.extras.tx.inputs) + if tx_inputs_len > limit.max_inputs: + raise MaxInputsExceededError(f'Maximum number of inputs exceeded ({tx_inputs_len} > {limit.max_inputs}).') + + tx_outputs_len = len(context.extras.tx.outputs) + if tx_outputs_len > limit.max_outputs: + raise MaxOutputsExceededError(f'Maximum number of outputs exceeded ({tx_outputs_len} > {limit.max_outputs}).') + + def execute_op_code(opcode: Opcode, context: ScriptContext) -> None: """ Execute a function opcode. @@ -625,6 +739,8 @@ def execute_op_code(opcode: Opcode, context: ScriptContext) -> None: opcode: the opcode to be executed. context: the script context to be manipulated. 
""" + if not is_opcode_valid(opcode): + raise ScriptError(f'Opcode "{opcode.name}" is invalid.') context.logs.append(f'Executing function opcode {opcode.name} ({hex(opcode.value)})') match opcode: case Opcode.OP_DUP: op_dup(context) @@ -639,4 +755,28 @@ def execute_op_code(opcode: Opcode, context: ScriptContext) -> None: case Opcode.OP_DATA_MATCH_VALUE: op_data_match_value(context) case Opcode.OP_CHECKDATASIG: op_checkdatasig(context) case Opcode.OP_FIND_P2PKH: op_find_p2pkh(context) + case Opcode.OP_SIGHASH_BITMASK: op_sighash_bitmask(context) + case Opcode.OP_SIGHASH_RANGE: op_sighash_range(context) + case Opcode.OP_MAX_SIGHASH_SUBSETS: op_max_sighash_subsets(context) + case Opcode.OP_MAX_INPUTS_OUTPUTS: op_max_inputs_outputs(context) case _: raise ScriptError(f'unknown opcode: {opcode}') + + +def is_opcode_valid(opcode: Opcode) -> bool: + """Return whether an opcode is valid, that is, it's currently enabled.""" + valid_opcodes = [ + Opcode.OP_DUP, + Opcode.OP_EQUAL, + Opcode.OP_EQUALVERIFY, + Opcode.OP_CHECKSIG, + Opcode.OP_HASH160, + Opcode.OP_GREATERTHAN_TIMESTAMP, + Opcode.OP_CHECKMULTISIG, + Opcode.OP_DATA_STREQUAL, + Opcode.OP_DATA_GREATERTHAN, + Opcode.OP_DATA_MATCH_VALUE, + Opcode.OP_CHECKDATASIG, + Opcode.OP_FIND_P2PKH, + ] + + return opcode in valid_opcodes diff --git a/hathor/transaction/scripts/p2pkh.py b/hathor/transaction/scripts/p2pkh.py index 52812680c..c26c322ed 100644 --- a/hathor/transaction/scripts/p2pkh.py +++ b/hathor/transaction/scripts/p2pkh.py @@ -20,6 +20,7 @@ from hathor.transaction.scripts.construct import get_pushdata, re_compile from hathor.transaction.scripts.hathor_script import HathorScript from hathor.transaction.scripts.opcode import Opcode +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashAll, SighashType class P2PKH(BaseScript): @@ -91,7 +92,15 @@ def create_output_script(cls, address: bytes, timelock: Optional[Any] = None) -> return s.data @classmethod - def create_input_data(cls, public_key_bytes: 
bytes, signature: bytes) -> bytes: + def create_input_data( + cls, + public_key_bytes: bytes, + signature: bytes, + *, + sighash: SighashType = SighashAll(), + max_sighash_subsets: int | None = None, + inputs_outputs_limit: InputsOutputsLimit | None = None + ) -> bytes: """ :param private_key: key corresponding to the address we want to spend tokens from :type private_key: :py:class:`cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey` @@ -99,8 +108,12 @@ def create_input_data(cls, public_key_bytes: bytes, signature: bytes) -> bytes: :rtype: bytes """ s = HathorScript() + s.push_sighash(sighash) + s.push_max_sighash_subsets(max_sighash_subsets) + s.push_inputs_outputs_limit(inputs_outputs_limit) s.pushData(signature) s.pushData(public_key_bytes) + return s.data @classmethod diff --git a/hathor/transaction/scripts/script_context.py b/hathor/transaction/scripts/script_context.py index 925a881f1..2641fb340 100644 --- a/hathor/transaction/scripts/script_context.py +++ b/hathor/transaction/scripts/script_context.py @@ -12,14 +12,76 @@ # See the License for the specific language governing permissions and # limitations under the License. +import hashlib + +from typing_extensions import assert_never + +from hathor.conf.settings import HathorSettings +from hathor.transaction import Transaction +from hathor.transaction.exceptions import ScriptError from hathor.transaction.scripts.execute import ScriptExtras, Stack +from hathor.transaction.scripts.sighash import SighashAll, SighashBitmask, SighashRange, SighashType class ScriptContext: """A context to be manipulated during script execution. 
A separate instance must be used for each script.""" - __slots__ = ('stack', 'logs', 'extras') + __slots__ = ('stack', 'logs', 'extras', '_settings', '_sighash', '_max_sighash_subsets') - def __init__(self, *, stack: Stack, logs: list[str], extras: ScriptExtras) -> None: + def __init__(self, *, stack: Stack, logs: list[str], extras: ScriptExtras, settings: HathorSettings) -> None: self.stack = stack self.logs = logs self.extras = extras + self._settings = settings + self._sighash: SighashType = SighashAll() + self._max_sighash_subsets: int | None = None + + def set_sighash(self, sighash: SighashType) -> None: + """ + Set a Sighash type in this context. + It can only be set once, that is, a script cannot use more than one sighash type. + """ + if type(self._sighash) is not SighashAll: + raise ScriptError('Cannot modify sighash after it is already set.') + + self._sighash = sighash + + def get_sighash(self) -> SighashType: + """Get the configured sighash in this context.""" + return self._sighash + + def set_max_sighash_subsets(self, max_subsets: int) -> None: + """ + Set the maximum number of sighash subsets in this context. + """ + if self._max_sighash_subsets is None: + self._max_sighash_subsets = max_subsets + else: + self._max_sighash_subsets = min(self._max_sighash_subsets, max_subsets) + + def get_max_sighash_subsets(self) -> int | None: + """Get the configured maximum number of sighash subsets.""" + return self._max_sighash_subsets + + def get_tx_sighash_data(self, tx: Transaction) -> bytes: + """ + Return the sighash data for a tx, depending on the sighash type set in this context. + Must be used when verifying signatures during script execution. 
+ """ + match self._sighash: + case SighashAll(): + return tx.get_sighash_all_data() + case SighashBitmask() | SighashRange(): + data = tx.get_custom_sighash_data(self._sighash) + return hashlib.sha256(data).digest() + case _: + assert_never(self._sighash) + + def get_selected_outputs(self) -> set[int]: + """Get a set with all output indexes selected (that is, signed) in this context.""" + match self._sighash: + case SighashAll(): + return set(range(self._settings.MAX_NUM_OUTPUTS)) + case SighashBitmask() | SighashRange(): + return set(self._sighash.get_output_indexes()) + case _: + assert_never(self._sighash) diff --git a/hathor/transaction/scripts/sighash.py b/hathor/transaction/scripts/sighash.py new file mode 100644 index 000000000..7c944a4f8 --- /dev/null +++ b/hathor/transaction/scripts/sighash.py @@ -0,0 +1,111 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, TypeAlias + +from pydantic import Field, validator +from typing_extensions import override + +from hathor.utils.pydantic import BaseModel + + +@dataclass(frozen=True, slots=True) +class SighashAll: + """A model representing the sighash all, which is the default sighash type.""" + pass + + +class CustomSighash(ABC, BaseModel): + """An interface to be implemented by custom sighash models.""" + @abstractmethod + def get_input_indexes(self) -> list[int]: + """Return a list of input indexes selected by this sighash.""" + raise NotImplementedError + + @abstractmethod + def get_output_indexes(self) -> list[int]: + """Return a list of output indexes selected by this sighash.""" + raise NotImplementedError + + +class SighashBitmask(CustomSighash): + """A model representing the sighash bitmask type config.""" + inputs: int = Field(ge=0x01, le=0xFF) + outputs: int = Field(ge=0x00, le=0xFF) + + @override + def get_input_indexes(self) -> list[int]: + return self._get_indexes(self.inputs) + + @override + def get_output_indexes(self) -> list[int]: + return self._get_indexes(self.outputs) + + @staticmethod + def _get_indexes(bitmask: int) -> list[int]: + """Return a list of indexes equivalent to some bitmask.""" + return [index for index in range(8) if (bitmask >> index) & 1] + + +class SighashRange(CustomSighash): + """A model representing the sighash range type config. 
 Range ends are not inclusive.""" + input_start: int = Field(ge=0, le=255) + input_end: int = Field(ge=0, le=255) + output_start: int = Field(ge=0, le=255) + output_end: int = Field(ge=0, le=255) + + @validator('input_end') + def _validate_input_end(cls, input_end: int, values: dict[str, Any]) -> int: + if input_end < values['input_start']: + raise ValueError('input_end must be greater than or equal to input_start.') + + return input_end + + @validator('output_end') + def _validate_output_end(cls, output_end: int, values: dict[str, Any]) -> int: + if output_end < values['output_start']: + raise ValueError('output_end must be greater than or equal to output_start.') + + return output_end + + @override + def get_input_indexes(self) -> list[int]: + return list(range(self.input_start, self.input_end)) + + @override + def get_output_indexes(self) -> list[int]: + return list(range(self.output_start, self.output_end)) + + +SighashType: TypeAlias = SighashAll | SighashBitmask | SighashRange + + +class InputsOutputsLimit(BaseModel): + """A model representing inputs and outputs limits config.""" + max_inputs: int = Field(ge=1) + max_outputs: int = Field(ge=1) + + +def get_unique_sighash_subsets(sighashes: list[SighashType]) -> set[tuple[frozenset[int], frozenset[int]]]: + """ + A sighash subset is equivalent to its selected inputs and outputs. That is, two subsets are equal if they select + the same inputs and outputs. This function takes a list of sighashes and returns a set of their subsets. + SighashAll subsets are excluded. 
+ """ + return set([ + (frozenset(sighash.get_input_indexes()), frozenset(sighash.get_output_indexes())) + for sighash in sighashes if not isinstance(sighash, SighashAll) + ]) diff --git a/hathor/transaction/token_creation_tx.py b/hathor/transaction/token_creation_tx.py index 629050197..fe078d470 100644 --- a/hathor/transaction/token_creation_tx.py +++ b/hathor/transaction/token_creation_tx.py @@ -147,8 +147,8 @@ def get_sighash_all(self) -> bytes: :return: Serialization of the inputs, outputs and tokens :rtype: bytes """ - if self._sighash_cache: - return self._sighash_cache + if self._sighash_all_cache: + return self._sighash_all_cache struct_bytes = pack( _SIGHASH_ALL_FORMAT_STRING, @@ -169,7 +169,7 @@ def get_sighash_all(self) -> bytes: struct_bytes += b''.join(tx_outputs) struct_bytes += self.serialize_token_info() - self._sighash_cache = struct_bytes + self._sighash_all_cache = struct_bytes return struct_bytes diff --git a/hathor/transaction/transaction.py b/hathor/transaction/transaction.py index a51eaeffe..6865226c3 100644 --- a/hathor/transaction/transaction.py +++ b/hathor/transaction/transaction.py @@ -24,13 +24,14 @@ from hathor.exception import InvalidNewTransaction from hathor.transaction import TxInput, TxOutput, TxVersion from hathor.transaction.base_transaction import TX_HASH_SIZE, GenericVertex -from hathor.transaction.exceptions import InvalidToken +from hathor.transaction.exceptions import InvalidScriptError, InvalidToken from hathor.transaction.static_metadata import TransactionStaticMetadata from hathor.transaction.util import VerboseCallback, unpack, unpack_len from hathor.types import TokenUid, VertexId if TYPE_CHECKING: from hathor.conf.settings import HathorSettings + from hathor.transaction.scripts.sighash import CustomSighash from hathor.transaction.storage import TransactionStorage # noqa: F401 # Signal bits (B), version (B), token uids len (B) and inputs len (B), outputs len (B). 
@@ -88,7 +89,7 @@ def __init__( settings=settings ) self.tokens = tokens or [] - self._sighash_cache: Optional[bytes] = None + self._sighash_all_cache: Optional[bytes] = None self._sighash_data_cache: Optional[bytes] = None @property @@ -193,32 +194,37 @@ def get_sighash_all(self) -> bytes: # This method does not depend on the input itself, however we call it for each one to sign it. # For transactions that have many inputs there is a significant decrease on the verify time # when using this cache, so we call this method only once. - if self._sighash_cache: - return self._sighash_cache + if self._sighash_all_cache: + return self._sighash_all_cache + sighash = self._get_sighash(inputs=self.inputs, outputs=self.outputs) + self._sighash_all_cache = sighash + + return sighash + + def _get_sighash(self, *, inputs: list[TxInput], outputs: list[TxOutput]) -> bytes: + """Return the sighash data for this tx using a custom list of inputs and outputs.""" struct_bytes = bytearray( pack( _SIGHASH_ALL_FORMAT_STRING, self.signal_bits, self.version, len(self.tokens), - len(self.inputs), - len(self.outputs) + len(inputs), + len(outputs) ) ) for token_uid in self.tokens: struct_bytes += token_uid - for tx_input in self.inputs: + for tx_input in inputs: struct_bytes += tx_input.get_sighash_bytes() - for tx_output in self.outputs: + for tx_output in outputs: struct_bytes += bytes(tx_output) - ret = bytes(struct_bytes) - self._sighash_cache = ret - return ret + return bytes(struct_bytes) def get_sighash_all_data(self) -> bytes: """Return the sha256 hash of sighash_all""" @@ -227,6 +233,19 @@ def get_sighash_all_data(self) -> bytes: return self._sighash_data_cache + def get_custom_sighash_data(self, sighash: 'CustomSighash') -> bytes: + """ + Return the sighash data for this tx using a custom sighash type. + Inputs and outputs are selected according to indexes selected by the sighash. 
+ """ + try: + inputs = [self.inputs[index] for index in sighash.get_input_indexes()] + outputs = [self.outputs[index] for index in sighash.get_output_indexes()] + except IndexError: + raise InvalidScriptError('Custom sighash selected nonexistent input/output.') + + return self._get_sighash(inputs=inputs, outputs=outputs) + def get_token_uid(self, index: int) -> TokenUid: """Returns the token uid with corresponding index from the tx token uid list. diff --git a/hathor/verification/transaction_verifier.py b/hathor/verification/transaction_verifier.py index 906df38c2..fffb37a30 100644 --- a/hathor/verification/transaction_verifier.py +++ b/hathor/verification/transaction_verifier.py @@ -17,7 +17,7 @@ from hathor.profiler import get_cpu_profiler from hathor.reward_lock import get_spent_reward_locked_info from hathor.reward_lock.reward_lock import get_minimum_best_height -from hathor.transaction import BaseTransaction, Transaction, TxInput +from hathor.transaction import BaseTransaction, Transaction from hathor.transaction.exceptions import ( ConflictingInputs, DuplicatedParents, @@ -28,13 +28,18 @@ InvalidInputDataSize, InvalidToken, NoInputError, + OutputNotSelected, RewardLocked, ScriptError, TimestampError, TooManyInputs, + TooManySighashSubsets, TooManySigOps, WeightError, ) +from hathor.transaction.scripts import script_eval +from hathor.transaction.scripts.script_context import ScriptContext +from hathor.transaction.scripts.sighash import get_unique_sighash_subsets from hathor.transaction.transaction import TokenInfo from hathor.transaction.util import get_deposit_amount, get_withdraw_amount from hathor.types import TokenUid, VertexId @@ -97,8 +102,10 @@ def verify_inputs(self, tx: Transaction, *, skip_script: bool = False) -> None: """Verify inputs signatures and ownership and all inputs actually exist""" from hathor.transaction.storage.exceptions import TransactionDoesNotExist + spent_txs: dict[VertexId, BaseTransaction] = {} spent_outputs: set[tuple[VertexId, 
int]] = set() - for input_tx in tx.inputs: + + for input_index, input_tx in enumerate(tx.inputs): if len(input_tx.data) > self._settings.MAX_INPUT_DATA_SIZE: raise InvalidInputDataSize('size: {} and max-size: {}'.format( len(input_tx.data), self._settings.MAX_INPUT_DATA_SIZE @@ -106,6 +113,7 @@ def verify_inputs(self, tx: Transaction, *, skip_script: bool = False) -> None: try: spent_tx = tx.get_spent_tx(input_tx) + spent_txs[spent_tx.hash] = spent_tx if input_tx.index >= len(spent_tx.outputs): raise InexistentInput('Output spent by this input does not exist: {} index {}'.format( input_tx.tx_id.hex(), input_tx.index)) @@ -120,9 +128,6 @@ def verify_inputs(self, tx: Transaction, *, skip_script: bool = False) -> None: spent_tx.timestamp, )) - if not skip_script: - self.verify_script(tx=tx, input_tx=input_tx, spent_tx=spent_tx) - # check if any other input in this tx is spending the same output key = (input_tx.tx_id, input_tx.index) if key in spent_outputs: @@ -130,17 +135,45 @@ def verify_inputs(self, tx: Transaction, *, skip_script: bool = False) -> None: tx.hash_hex, input_tx.tx_id.hex(), input_tx.index)) spent_outputs.add(key) - def verify_script(self, *, tx: Transaction, input_tx: TxInput, spent_tx: BaseTransaction) -> None: + if not skip_script: + self.verify_scripts(tx, spent_txs=spent_txs) + + def verify_scripts(self, tx: Transaction, *, spent_txs: dict[VertexId, BaseTransaction]) -> None: """ - :type tx: Transaction - :type input_tx: TxInput - :type spent_tx: Transaction + Verify the tx's input scripts by running them, checking their signatures, and evaluating the resulting + script contexts. 
""" - from hathor.transaction.scripts import script_eval - try: - script_eval(tx, input_tx, spent_tx) - except ScriptError as e: - raise InvalidInputData(e) from e + all_contexts: list[ScriptContext] = [] + + for input_index, input_tx in enumerate(tx.inputs): + try: + script_context = script_eval(tx, input_tx, spent_txs[input_tx.tx_id], input_index=input_index) + except ScriptError as e: + raise InvalidInputData(e) from e + + all_contexts.append(script_context) + + all_sighashes = [context.get_sighash() for context in all_contexts] + sighash_subsets = get_unique_sighash_subsets(all_sighashes) + + all_max_sighash_subsets = [context.get_max_sighash_subsets() for context in all_contexts] + valid_max_sighash_subsets: list[int] = [ + max_subsets for max_subsets in all_max_sighash_subsets if max_subsets is not None + ] + + if len(valid_max_sighash_subsets) > 0: + max_sighash_subsets = min(valid_max_sighash_subsets) + if len(sighash_subsets) > max_sighash_subsets: + raise TooManySighashSubsets( + f'There are more custom sighash subsets than the configured maximum ' + f'({len(sighash_subsets)} > {max_sighash_subsets}).' 
+ ) + + all_selected_outputs_subsets = [context.get_selected_outputs() for context in all_contexts] + all_selected_outputs = set().union(*all_selected_outputs_subsets) + for index, _ in enumerate(tx.outputs): + if index not in all_selected_outputs: + raise OutputNotSelected(f'Output at index {index} is not signed by any input.') def verify_reward_locked(self, tx: Transaction) -> None: """Will raise `RewardLocked` if any reward is spent before the best block height is enough, considering both diff --git a/hathor/wallet/util.py b/hathor/wallet/util.py index b8b1aa9b4..f7d11f3a1 100644 --- a/hathor/wallet/util.py +++ b/hathor/wallet/util.py @@ -88,7 +88,7 @@ def generate_multisig_address(redeem_script: bytes, version_byte: Optional[bytes def generate_signature(tx: Transaction, private_key_bytes: bytes, password: Optional[bytes] = None) -> bytes: - """ Create a signature for the tx + """ Create a signature for the tx using the default sighash_all :param tx: transaction with the data to be signed :type tx: :py:class:`hathor.transaction.transaction.Transaction` @@ -102,8 +102,16 @@ def generate_signature(tx: Transaction, private_key_bytes: bytes, password: Opti :return: signature of the tx :rtype: bytes """ + return generate_signature_for_data(tx.get_sighash_all(), private_key_bytes, password) + + +def generate_signature_for_data( + data_to_sign: bytes, + private_key_bytes: bytes, + password: Optional[bytes] = None +) -> bytes: + """Create a signature for some custom data.""" private_key = get_private_key_from_bytes(private_key_bytes, password=password) - data_to_sign = tx.get_sighash_all() hashed_data = hashlib.sha256(data_to_sign).digest() signature = private_key.sign(hashed_data, ec.ECDSA(hashes.SHA256())) return signature diff --git a/tests/tx/scripts/__init__.py b/tests/tx/scripts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/tx/scripts/test_get_unique_sighash_subsets.py b/tests/tx/scripts/test_get_unique_sighash_subsets.py new file 
mode 100644 index 000000000..6f3d6a020 --- /dev/null +++ b/tests/tx/scripts/test_get_unique_sighash_subsets.py @@ -0,0 +1,91 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from hathor.transaction.scripts.sighash import ( + SighashAll, + SighashBitmask, + SighashRange, + SighashType, + get_unique_sighash_subsets, +) + + +@pytest.mark.parametrize( + ['sighashes', 'expected'], + [ + # empty -> 0 subsets + ([], set()), + # SighashAll is removed -> 0 subsets + ([SighashAll(), SighashAll(), SighashAll()], set()), + # SighashBitmask -> 1 subset + ( + [SighashAll(), SighashBitmask(inputs=0b101, outputs=0b010)], + {(frozenset([0, 2]), frozenset([1]))} + ), + # SighashRange -> 1 subset + ( + [SighashAll(), SighashRange(input_start=3, input_end=5, output_start=0, output_end=4)], + {(frozenset(range(3, 5)), frozenset(range(0, 4)))} + ), + # Different Sighash bitmasks -> 2 subsets + ( + [ + SighashBitmask(inputs=0b111, outputs=0b1), + SighashBitmask(inputs=0b1, outputs=0b111), + ], + { + (frozenset([0, 1, 2]), frozenset([0])), + (frozenset([0]), frozenset([0, 1, 2])), + } + ), + # Equal Sighash bitmasks -> 1 subset + ( + [ + SighashBitmask(inputs=0b111, outputs=0b1), + SighashBitmask(inputs=0b111, outputs=0b1), + ], + { + (frozenset([0, 1, 2]), frozenset([0])), + } + ), + # Different Sighash bitmask and range -> 2 subsets + ( + [ + SighashBitmask(inputs=0b111, outputs=0b1), + SighashRange(input_start=3, input_end=5, 
output_start=0, output_end=4) + ], + { + (frozenset([0, 1, 2]), frozenset([0])), + (frozenset(range(3, 5)), frozenset(range(0, 4))), + } + ), + # Equal Sighash bitmask and range -> 1 subset + ( + [ + SighashBitmask(inputs=0b111, outputs=0b1), + SighashRange(input_start=0, input_end=3, output_start=0, output_end=1) + ], + { + (frozenset([0, 1, 2]), frozenset([0])), + } + ), + ] +) +def test_get_unique_sighash_subsets( + sighashes: list[SighashType], + expected: set[tuple[frozenset[int], frozenset[int]]] +) -> None: + assert get_unique_sighash_subsets(sighashes) == expected diff --git a/tests/tx/scripts/test_p2pkh.py b/tests/tx/scripts/test_p2pkh.py new file mode 100644 index 000000000..2102640ab --- /dev/null +++ b/tests/tx/scripts/test_p2pkh.py @@ -0,0 +1,212 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from hathor.transaction.scripts import P2PKH, Opcode +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashAll, SighashBitmask, SighashRange + + +def test_create_input_data_simple() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature) + + assert data == bytes([ + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_sighash_all() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature, sighash=SighashAll()) + + assert data == bytes([ + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_sighash_bitmask() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + inputs_bitmask = 0b111 + outputs_bitmask = 0b101 + sighash = SighashBitmask(inputs=inputs_bitmask, outputs=outputs_bitmask) + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature, sighash=sighash) + + assert data == bytes([ + 1, + inputs_bitmask, + 1, + outputs_bitmask, + Opcode.OP_SIGHASH_BITMASK, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_sighash_range() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + input_start = 123 + input_end = 145 + output_start = 10 + output_end = 20 + sighash = SighashRange( + input_start=input_start, + input_end=input_end, + output_start=output_start, + output_end=output_end, + ) + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature, sighash=sighash) + + assert data == bytes([ + 1, + input_start, + 1, + input_end, + 1, + output_start, + 1, + output_end, + Opcode.OP_SIGHASH_RANGE, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_inputs_outputs_limit() -> None: + pub_key = b'my_pub_key' + signature = 
b'my_signature' + max_inputs = 2 + max_outputs = 3 + limit = InputsOutputsLimit(max_inputs=max_inputs, max_outputs=max_outputs) + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature, inputs_outputs_limit=limit) + + assert data == bytes([ + 1, + max_inputs, + 1, + max_outputs, + Opcode.OP_MAX_INPUTS_OUTPUTS, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_max_sighash_subsets() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + max_subsets = 7 + data = P2PKH.create_input_data(public_key_bytes=pub_key, signature=signature, max_sighash_subsets=max_subsets) + + assert data == bytes([ + 1, + max_subsets, + Opcode.OP_MAX_SIGHASH_SUBSETS, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_sighash_bitmask_and_inputs_outputs_limit() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + inputs_bitmask = 0b111 + outputs_bitmask = 0b101 + max_inputs = 2 + max_outputs = 3 + sighash = SighashBitmask(inputs=inputs_bitmask, outputs=outputs_bitmask) + limit = InputsOutputsLimit(max_inputs=max_inputs, max_outputs=max_outputs) + data = P2PKH.create_input_data( + public_key_bytes=pub_key, + signature=signature, + sighash=sighash, + inputs_outputs_limit=limit + ) + + assert data == bytes([ + 1, + inputs_bitmask, + 1, + outputs_bitmask, + Opcode.OP_SIGHASH_BITMASK, + 1, + max_inputs, + 1, + max_outputs, + Opcode.OP_MAX_INPUTS_OUTPUTS, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) + + +def test_create_input_data_with_sighash_range_and_inputs_outputs_limit() -> None: + pub_key = b'my_pub_key' + signature = b'my_signature' + input_start = 123 + input_end = 145 + output_start = 10 + output_end = 20 + max_inputs = 2 + max_outputs = 3 + sighash = SighashRange( + input_start=input_start, + input_end=input_end, + output_start=output_start, + output_end=output_end, + ) + limit = InputsOutputsLimit(max_inputs=max_inputs, 
max_outputs=max_outputs) + data = P2PKH.create_input_data( + public_key_bytes=pub_key, + signature=signature, + sighash=sighash, + inputs_outputs_limit=limit + ) + + assert data == bytes([ + 1, + input_start, + 1, + input_end, + 1, + output_start, + 1, + output_end, + Opcode.OP_SIGHASH_RANGE, + 1, + max_inputs, + 1, + max_outputs, + Opcode.OP_MAX_INPUTS_OUTPUTS, + len(signature), + *signature, + len(pub_key), + *pub_key + ]) diff --git a/tests/tx/scripts/test_script_context.py b/tests/tx/scripts/test_script_context.py new file mode 100644 index 000000000..e6b0bce33 --- /dev/null +++ b/tests/tx/scripts/test_script_context.py @@ -0,0 +1,151 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import hashlib +from unittest.mock import Mock + +import pytest + +from hathor.conf.settings import HathorSettings +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.exceptions import ScriptError +from hathor.transaction.scripts.script_context import ScriptContext +from hathor.transaction.scripts.sighash import SighashAll, SighashBitmask, SighashRange + + +@pytest.mark.parametrize(['max_num_outputs'], [(99,), (255,)]) +def test_defaults(max_num_outputs: int) -> None: + settings = Mock(spec_set=HathorSettings) + settings.MAX_NUM_OUTPUTS = max_num_outputs + + context = ScriptContext(settings=settings, stack=Mock(), logs=[], extras=Mock()) + + tx = Transaction( + inputs=[ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + ], + outputs=[ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + ] + ) + + assert context.get_sighash() == SighashAll() + assert context.get_tx_sighash_data(tx) == tx.get_sighash_all_data() + assert context.get_selected_outputs() == set(range(max_num_outputs)) + assert context.get_max_sighash_subsets() is None + + +def test_set_sighash() -> None: + context = ScriptContext(settings=Mock(), stack=Mock(), logs=[], extras=Mock()) + + sighash = SighashBitmask(inputs=0b111, outputs=0b101) + context.set_sighash(sighash) + assert context.get_sighash() == sighash + + with pytest.raises(ScriptError): + context.set_sighash(sighash) + + +def test_set_max_sighash_subsets() -> None: + context = ScriptContext(settings=Mock(), stack=Mock(), logs=[], extras=Mock()) + assert context.get_max_sighash_subsets() is None + + context.set_max_sighash_subsets(10) + assert context.get_max_sighash_subsets() == 10 + + context.set_max_sighash_subsets(20) + assert context.get_max_sighash_subsets() == 10 + + context.set_max_sighash_subsets(3) + assert context.get_max_sighash_subsets() == 3 + + +@pytest.mark.parametrize( + ['outputs_bitmask', 'selected_outputs'], + [ + (0b00, set()), + 
(0b01, {0}), + (0b10, {1}), + (0b11, {0, 1}), + ] +) +def test_sighash_bitmask(outputs_bitmask: int, selected_outputs: set[int]) -> None: + settings = Mock() + settings.MAX_NUM_INPUTS = 88 + settings.MAX_NUM_OUTPUTS = 99 + + context = ScriptContext(settings=settings, stack=Mock(), logs=[], extras=Mock()) + tx = Transaction( + inputs=[ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + ], + outputs=[ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + ] + ) + + sighash_bitmask = SighashBitmask(inputs=0b11, outputs=outputs_bitmask) + context.set_sighash(sighash_bitmask) + + data = tx.get_custom_sighash_data(sighash_bitmask) + assert context.get_tx_sighash_data(tx) == hashlib.sha256(data).digest() + + with pytest.raises(ScriptError) as e: + context.set_sighash(Mock()) + + assert str(e.value) == 'Cannot modify sighash after it is already set.' + assert context.get_selected_outputs() == selected_outputs + + +@pytest.mark.parametrize( + ['output_start', 'output_end', 'selected_outputs'], + [ + (100, 100, set()), + (0, 1, {0}), + (1, 2, {1}), + (0, 2, {0, 1}), + ] +) +def test_sighash_range(output_start: int, output_end: int, selected_outputs: set[int]) -> None: + settings = Mock() + settings.MAX_NUM_INPUTS = 88 + settings.MAX_NUM_OUTPUTS = 99 + + context = ScriptContext(settings=settings, stack=Mock(), logs=[], extras=Mock()) + tx = Transaction( + inputs=[ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + ], + outputs=[ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + ] + ) + + sighash_range = SighashRange(input_start=0, input_end=2, output_start=output_start, output_end=output_end) + context.set_sighash(sighash_range) + + data = tx.get_custom_sighash_data(sighash_range) + assert context.get_tx_sighash_data(tx) == hashlib.sha256(data).digest() + + with pytest.raises(ScriptError) as e: + context.set_sighash(Mock()) + + assert str(e.value) == 'Cannot modify 
sighash after it is already set.' + assert context.get_selected_outputs() == selected_outputs diff --git a/tests/tx/scripts/test_sighash_bitmask.py b/tests/tx/scripts/test_sighash_bitmask.py new file mode 100644 index 000000000..24aea0aaa --- /dev/null +++ b/tests/tx/scripts/test_sighash_bitmask.py @@ -0,0 +1,321 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import cast +from unittest.mock import patch + +import pytest + +from hathor.crypto.util import decode_address +from hathor.exception import InvalidNewTransaction +from hathor.manager import HathorManager +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.exceptions import InputOutputMismatch, InvalidInputData, InvalidScriptError +from hathor.transaction.scripts.p2pkh import P2PKH +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashBitmask +from hathor.util import not_none +from tests import unittest +from tests.utils import add_blocks_unlock_reward, create_tokens, get_genesis_key + + +class BaseSighashBitmaskTest(unittest.TestCase): + __test__ = False + + def setUp(self) -> None: + super().setUp() + self.manager1: HathorManager = self.create_peer('testnet', unlock_wallet=True, wallet_index=True) + self.manager2: HathorManager = self.create_peer('testnet', unlock_wallet=True, wallet_index=True) + + # 1 is Alice + assert self.manager1.wallet + self.address1_b58 = self.manager1.wallet.get_unused_address() + 
self.private_key1 = self.manager1.wallet.get_private_key(self.address1_b58) + self.address1 = decode_address(self.address1_b58) + + # 2 is Bob + assert self.manager2.wallet + self.address2_b58 = self.manager2.wallet.get_unused_address() + self.address2 = decode_address(self.address2_b58) + + self.genesis_private_key = get_genesis_key() + self.genesis_block = self.manager1.tx_storage.get_transaction(self._settings.GENESIS_BLOCK_HASH) + + # Add some blocks so we can spend the genesis outputs + add_blocks_unlock_reward(self.manager1) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_bitmask(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash bitmasks, instead of sighash_all. 
+ sighash_bitmask = SighashBitmask(inputs=0b1, outputs=0b1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + ) + + # At this point, the tx is partial. The inputs are valid, but they're mismatched with outputs + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds a change output for his HTR + htr_output = TxOutput(int(genesis_utxo.value / 2), bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign2 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes2, signature2) + + # The atomic swap tx is now completed and valid, and can be propagated + self.manager1.verification_service.verify(atomic_swap_tx_clone) + 
self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_bitmask_with_limit(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash bitmasks, instead of sighash_all. + # She also sets max inputs and max outputs limits, including one output for change. + sighash_bitmask = SighashBitmask(inputs=0b1, outputs=0b1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + inputs_outputs_limit=InputsOutputsLimit(max_inputs=2, max_outputs=3) + ) + + # At this point, the tx is partial. 
The inputs are valid, but they're mismatched with outputs + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds two change outputs for his HTR, which violates the maximum tx outputs set by Alice + htr_output1 = TxOutput(genesis_utxo.value // 4, bob_output_script) + htr_output2 = TxOutput(genesis_utxo.value // 4, bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output1) + atomic_swap_tx_clone.outputs.append(htr_output2) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign2 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes2, signature2) + + # The atomic swap tx is not valid and cannot be propagated + with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verify(atomic_swap_tx_clone) + + self.assertEqual(str(e.value), "Maximum number of outputs exceeded (4 > 3).") + + with pytest.raises(InvalidNewTransaction): + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', 
lambda _: True) + def test_sighash_bitmask_input_not_selected(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an input spending all genesis HTR + genesis_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + + # Alice creates an atomic swap tx + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input, genesis_input], + outputs=[], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her token input using sighash bitmasks, instead of sighash_all. + sighash_bitmask = SighashBitmask(inputs=0b01, outputs=0b00) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + ) + + # Alice signs her genesis input using the same sighash, so the genesis input is not selected in the bitmask. + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.genesis_private_key) + genesis_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_bitmask, + ) + + # The inputs are invalid, since one of them doesn't select itself. 
+ with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + + self.assertEqual(str(e.value), 'Input at index 1 must select itself when using a custom sighash.') + + with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verify(atomic_swap_tx) + + self.assertEqual(str(e.value), 'Input at index 1 must select itself when using a custom sighash.') + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_bitmask_nonexistent_input(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash bitmasks, instead of sighash_all. 
+ sighash_bitmask = SighashBitmask(inputs=0b1, outputs=0b1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_bitmask) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=SighashBitmask(inputs=0b11, outputs=0b1), + ) + + # The input is invalid, since it selects a nonexistent input + with pytest.raises(InvalidScriptError) as e: + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + + assert str(e.value) == 'Custom sighash selected nonexistent input/output.' + + with pytest.raises(InvalidScriptError): + self.manager1.verification_service.verify(atomic_swap_tx) + + +class SyncV1SighashTest(unittest.SyncV1Params, BaseSighashBitmaskTest): + __test__ = True + + +class SyncV2SighashTest(unittest.SyncV2Params, BaseSighashBitmaskTest): + __test__ = True + + +# sync-bridge should behave like sync-v2 +class SyncBridgeSighashTest(unittest.SyncBridgeParams, SyncV2SighashTest): + pass diff --git a/tests/tx/scripts/test_sighash_range.py b/tests/tx/scripts/test_sighash_range.py new file mode 100644 index 000000000..866fd1b0b --- /dev/null +++ b/tests/tx/scripts/test_sighash_range.py @@ -0,0 +1,516 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import cast +from unittest.mock import patch + +import pytest + +from hathor.crypto.util import decode_address +from hathor.exception import InvalidNewTransaction +from hathor.manager import HathorManager +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.exceptions import ( + InputOutputMismatch, + InvalidInputData, + InvalidScriptError, + TooManySighashSubsets, +) +from hathor.transaction.scripts import MultiSig +from hathor.transaction.scripts.p2pkh import P2PKH +from hathor.transaction.scripts.sighash import InputsOutputsLimit, SighashRange +from hathor.util import not_none +from hathor.wallet.util import generate_multisig_address, generate_multisig_redeem_script, generate_signature_for_data +from tests import unittest +from tests.utils import add_blocks_unlock_reward, create_tokens, get_genesis_key + + +class BaseSighashRangeTest(unittest.TestCase): + __test__ = False + + def setUp(self) -> None: + super().setUp() + self.manager1: HathorManager = self.create_peer('testnet', unlock_wallet=True, wallet_index=True) + self.manager2: HathorManager = self.create_peer('testnet', unlock_wallet=True, wallet_index=True) + + # 1 is Alice + assert self.manager1.wallet + self.address1_b58 = self.manager1.wallet.get_unused_address() + self.private_key1 = self.manager1.wallet.get_private_key(self.address1_b58) + self.address1 = decode_address(self.address1_b58) + + # 2 is Bob + assert self.manager2.wallet + self.address2_b58 = self.manager2.wallet.get_unused_address() + self.address2 = decode_address(self.address2_b58) + + self.genesis_private_key = get_genesis_key() + self.genesis_block = self.manager1.tx_storage.get_transaction(self._settings.GENESIS_BLOCK_HASH) + + # Add some blocks so we can spend the genesis outputs + add_blocks_unlock_reward(self.manager1) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_range(self) -> None: + # Create a new test token + 
token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash range, instead of sighash_all. + sighash_range = SighashRange(input_start=0, input_end=1, output_start=0, output_end=1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_range) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_range, + ) + + # At this point, the tx is partial. 
The inputs are valid, but they're mismatched with outputs + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds a change output for his HTR + htr_output = TxOutput(int(genesis_utxo.value / 2), bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign2 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes2, signature2) + + # The atomic swap tx is now completed and valid, and can be propagated + self.manager1.verification_service.verify(atomic_swap_tx_clone) + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_range_with_multisig(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = 
self.manager1.get_new_tx_parents() + + public_keys = [ + bytes.fromhex('0250bf5890c9c6e9b4ab7f70375d31b827d45d0b7b4e3ba1918bcbe71b412c11d7'), + bytes.fromhex('02d83dd1e9e0ac7976704eedab43fe0b79309166a47d70ec3ce8bbb08b8414db46'), + bytes.fromhex('02358c539fa7474bf12f774749d0e1b5a9bc6e50920464818ebdb0043b143ae2ba'), + ] + + private_keys = [ + '3081de304906092a864886f70d01050d303c301b06092a864886f70d01050c300e04089abeae5e8a8f75d302020800301d060960864801650304012a0410abbde27221fd302280c13fca7887c85e048190c41403f39b1e9bbc5b6b7c3be4729c054fae9506dc0f8361adcff0ea393f0bb3ca9f992fc2eea83d532691bc9a570ed7fb9e939e6d1787881af40b19fb467f06595229e29b5a6268d831f0287530c7935d154deac61dd4ced988166f9c98054912935b607e2fb332e11c95b30ea4686eb0bda7dd57ed1eeb25b07cea9669dde5210528a00653159626a5baa61cdee7f4', # noqa: E501 + '3081de304906092a864886f70d01050d303c301b06092a864886f70d01050c300e040817ca6c6c47ade0de02020800301d060960864801650304012a041003746599b1d7dde5b875e4d8e2c4c157048190a25ccabb17e603260f8a1407bdca24904b6ae0aa9ae225d87552e5a9aa62d98b35b2c6c78f33cb051f3a3932387b4cea6f49e94f14ee856d0b630d77c1299ad7207b0be727d338cf92a3fffe232aff59764240aff84e079a5f6fb3355048ac15703290a005a9a033fdcb7fcf582a5ddf6fd7b7c1193bd7912cd275a88a8a6823b6c3ed291b4a3f4724875a3ae058054c', # noqa: E501 + '3081de304906092a864886f70d01050d303c301b06092a864886f70d01050c300e0408089f48fbf59fa92902020800301d060960864801650304012a041072f553e860b77654fd5fb80e5891e7c90481900fde272b88f9a70e7220b2d5adeda1ed29667527caedc2385be7f9e0d63defdde20557e90726e102f879eaf2233cceca8d4af239d5b2a159467255446f001c99b69e570bb176b95248fc21cb752d463b494c2195411639989086336a530d1f4eae91493faf89368f439991baa947ebeca00be7f5099ed69606dc78a4cc384d41542350a9054c5fa1295305dfc37e5989', # noqa: E501 + ] + + # Change the created token utxo to a MultiSig requiring 2 signatures + redeem_script = generate_multisig_redeem_script(signatures_required=2, public_key_bytes=public_keys) + multisig_address_b58 = generate_multisig_address(redeem_script) + 
multisig_address = decode_address(multisig_address_b58) + multisig_script = MultiSig.create_output_script(multisig_address) + + token_creation_utxo.script = multisig_script + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash range, instead of sighash_all. + sighash_range = SighashRange(input_start=0, input_end=1, output_start=0, output_end=1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_range) + + signatures = [] + for private_key_hex in private_keys[:2]: + signature = generate_signature_for_data(data_to_sign1, bytes.fromhex(private_key_hex), password=b'1234') + signatures.append(signature) + + tokens_input.data = MultiSig.create_input_data( + redeem_script=redeem_script, + signatures=signatures, + sighash=sighash_range, + ) + + # At this point, the tx is partial. 
The inputs are valid, but they're mismatched with outputs + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds a change output for his HTR + htr_output = TxOutput(int(genesis_utxo.value / 2), bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign2 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes2, signature2) + + # The atomic swap tx is now completed and valid, and can be propagated + self.manager1.verification_service.verify(atomic_swap_tx_clone) + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_range_with_limit(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = 
self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash range, instead of sighash_all. + # She also sets max inputs and max outputs limits, including one output for change. + sighash_range = SighashRange(input_start=0, input_end=1, output_start=0, output_end=1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_range) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_range, + inputs_outputs_limit=InputsOutputsLimit(max_inputs=2, max_outputs=3) + ) + + # At this point, the tx is partial. 
The inputs are valid, but they're mismatched with outputs + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds two change outputs for his HTR, which violates the maximum tx outputs set by Alice + htr_output1 = TxOutput(int(genesis_utxo.value / 4), bob_output_script) + htr_output2 = TxOutput(int(genesis_utxo.value / 4), bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output1) + atomic_swap_tx_clone.outputs.append(htr_output2) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign2 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes2, signature2) + + # The atomic swap tx is not valid and cannot be propagated + with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verify(atomic_swap_tx_clone) + + self.assertEqual(str(e.value), "Maximum number of outputs exceeded (4 > 3).") + + with pytest.raises(InvalidNewTransaction): + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + 
@patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_range_input_not_selected(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an input spending all genesis HTR + genesis_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + + # Alice creates an atomic swap tx + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input, genesis_input], + outputs=[], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her token input using sighash range, instead of sighash_all. + sighash_range = SighashRange(input_start=0, input_end=1, output_start=0, output_end=0) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_range) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_range, + ) + + # Alice signs her genesis input using the same sighash, so the genesis input is not selected in the range. + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.genesis_private_key) + genesis_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_range, + ) + + # The inputs are invalid, since one of them doesn't select itself. 
+ with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + + self.assertEqual(str(e.value), 'Input at index 1 must select itself when using a custom sighash.') + + with pytest.raises(InvalidInputData) as e: + self.manager1.verification_service.verify(atomic_swap_tx) + + self.assertEqual(str(e.value), 'Input at index 1 must select itself when using a custom sighash.') + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_range_nonexistent_input(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash range, instead of sighash_all. 
+ sighash_range = SighashRange(input_start=0, input_end=1, output_start=0, output_end=1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_range) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=SighashRange(input_start=0, input_end=2, output_start=0, output_end=1), + ) + + # The input is invalid, since it selects a nonexistent input + with pytest.raises(InvalidScriptError) as e: + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + + assert str(e.value) == 'Custom sighash selected nonexistent input/output.' + + with pytest.raises(InvalidScriptError): + self.manager1.verification_service.verify(atomic_swap_tx) + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_sighash_range_with_max_subsets(self) -> None: + # Create a new test token + token_creation_tx = create_tokens(self.manager1, self.address1_b58) + token_uid = token_creation_tx.tokens[0] + token_creation_utxo = token_creation_tx.outputs[0] + genesis_utxo = self.genesis_block.outputs[0] + parents = self.manager1.get_new_tx_parents() + + # Alice creates an input spending all created test tokens + tokens_input = TxInput(not_none(token_creation_tx.hash), 0, b'') + + # Alice creates an output sending half genesis HTR to herself + alice_output_script = P2PKH.create_output_script(self.address1) + htr_output = TxOutput(int(genesis_utxo.value / 2), alice_output_script) + + # Alice creates an atomic swap tx that's missing Bob's input, with half genesis HTR, and his output + atomic_swap_tx = Transaction( + weight=1, + inputs=[tokens_input], + outputs=[htr_output], + parents=parents, + tokens=[token_uid], + storage=self.manager1.tx_storage, + timestamp=token_creation_tx.timestamp + 1 + ) + 
self.manager1.cpu_mining_service.resolve(atomic_swap_tx) + + # Alice signs her input using sighash range, instead of sighash_all. + # She also sets a max sighash subsets limit. + sighash_range = SighashRange(input_start=0, input_end=1, output_start=0, output_end=1) + data_to_sign1 = atomic_swap_tx.get_custom_sighash_data(sighash_range) + assert self.manager1.wallet + public_bytes1, signature1 = self.manager1.wallet.get_input_aux_data(data_to_sign1, self.private_key1) + tokens_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes1, + signature=signature1, + sighash=sighash_range, + max_sighash_subsets=1, + ) + + # At this point, the tx is partial. The inputs are valid, but they're mismatched with outputs + self.manager1.verification_service.verifiers.tx.verify_inputs(atomic_swap_tx) + with pytest.raises(InputOutputMismatch): + self.manager1.verification_service.verify(atomic_swap_tx) + + # Alice sends the tx bytes to Bob, represented here by cloning the tx + atomic_swap_tx_clone = cast(Transaction, atomic_swap_tx.clone()) + self.manager1.cpu_mining_service.resolve(atomic_swap_tx_clone) + + # Bob creates an input spending all genesis HTR and adds it to the atomic swap tx + htr_input = TxInput(not_none(self.genesis_block.hash), 0, b'') + atomic_swap_tx_clone.inputs.append(htr_input) + + # Bob adds an output to receive all test tokens + bob_output_script = P2PKH.create_output_script(self.address2) + tokens_output = TxOutput(token_creation_utxo.value, bob_output_script, 1) + atomic_swap_tx_clone.outputs.append(tokens_output) + + # Bob adds a change output for his HTR + htr_output = TxOutput(int(genesis_utxo.value / 2), bob_output_script) + atomic_swap_tx_clone.outputs.append(htr_output) + + # Bob signs his input with a different sighash subset, which violates the maximum subsets set by Alice + sighash_range2 = SighashRange(input_start=1, input_end=2, output_start=1, output_end=2) + data_to_sign2 = 
atomic_swap_tx_clone.get_custom_sighash_data(sighash_range2) + assert self.manager2.wallet + public_bytes2, signature2 = self.manager2.wallet.get_input_aux_data(data_to_sign2, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data( + public_key_bytes=public_bytes2, + signature=signature2, + sighash=sighash_range2, + ) + + # The atomic swap tx is not valid and cannot be propagated + with pytest.raises(TooManySighashSubsets) as e: + self.manager1.verification_service.verify(atomic_swap_tx_clone) + + self.assertEqual(str(e.value), "There are more custom sighash subsets than the configured maximum (2 > 1).") + + with pytest.raises(InvalidNewTransaction): + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + # Bob signs his input using sighash_all to complete the tx + data_to_sign3 = atomic_swap_tx_clone.get_sighash_all() + assert self.manager2.wallet + public_bytes3, signature3 = self.manager2.wallet.get_input_aux_data(data_to_sign3, self.genesis_private_key) + htr_input.data = P2PKH.create_input_data(public_bytes3, signature3) + + # The atomic swap tx is now completed and valid, and can be propagated + self.manager1.verification_service.verify(atomic_swap_tx_clone) + self.manager1.propagate_tx(atomic_swap_tx_clone, fails_silently=False) + + +class SyncV1SighashTest(unittest.SyncV1Params, BaseSighashRangeTest): + __test__ = True + + +class SyncV2SighashTest(unittest.SyncV2Params, BaseSighashRangeTest): + __test__ = True + + +# sync-bridge should behave like sync-v2 +class SyncBridgeSighashTest(unittest.SyncBridgeParams, SyncV2SighashTest): + pass diff --git a/tests/tx/scripts/test_tx_sighash.py b/tests/tx/scripts/test_tx_sighash.py new file mode 100644 index 000000000..915165450 --- /dev/null +++ b/tests/tx/scripts/test_tx_sighash.py @@ -0,0 +1,111 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import patch + +from hathor.transaction import Transaction, TxInput, TxOutput +from hathor.transaction.scripts.sighash import SighashBitmask, SighashRange + + +def test_get_sighash_bitmask() -> None: + inputs = [ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + TxInput(tx_id=b'tx3', index=1, data=b''), + TxInput(tx_id=b'tx4', index=1, data=b''), + TxInput(tx_id=b'tx5', index=1, data=b''), + TxInput(tx_id=b'tx6', index=1, data=b''), + TxInput(tx_id=b'tx7', index=1, data=b''), + TxInput(tx_id=b'tx8', index=1, data=b''), + ] + outputs = [ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + TxOutput(value=33, script=b''), + TxOutput(value=44, script=b''), + TxOutput(value=55, script=b''), + TxOutput(value=66, script=b''), + TxOutput(value=77, script=b''), + TxOutput(value=88, script=b''), + ] + tx = Transaction(inputs=inputs, outputs=outputs) + + with patch.object(tx, '_get_sighash') as mock: + tx.get_custom_sighash_data(SighashBitmask(inputs=0b0000_0001, outputs=0b0000_0000)) + mock.assert_called_once_with(inputs=inputs[0:1], outputs=[]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b0000_0011, outputs=0b0000_0001)) + mock.assert_called_once_with(inputs=inputs[0:2], outputs=outputs[0:1]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b0000_1111, outputs=0b0000_1111)) + mock.assert_called_once_with(inputs=inputs[0:4], outputs=outputs[0:4]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b1110_0000, 
outputs=0b0111_0000)) + mock.assert_called_once_with(inputs=inputs[5:8], outputs=outputs[4:7]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b1101_1010, outputs=0b1110_0010)) + mock.assert_called_once_with( + inputs=[inputs[1], inputs[3], inputs[4], inputs[6], inputs[7]], + outputs=[outputs[1], outputs[5], outputs[6], outputs[7]] + ) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashBitmask(inputs=0b1111_1111, outputs=0b1111_1111)) + mock.assert_called_once_with(inputs=inputs, outputs=outputs) + mock.reset_mock() + + +def test_get_sighash_range() -> None: + inputs = [ + TxInput(tx_id=b'tx1', index=0, data=b''), + TxInput(tx_id=b'tx2', index=1, data=b''), + TxInput(tx_id=b'tx3', index=1, data=b''), + TxInput(tx_id=b'tx4', index=1, data=b''), + TxInput(tx_id=b'tx5', index=1, data=b''), + TxInput(tx_id=b'tx6', index=1, data=b''), + TxInput(tx_id=b'tx7', index=1, data=b''), + TxInput(tx_id=b'tx8', index=1, data=b''), + ] + outputs = [ + TxOutput(value=11, script=b''), + TxOutput(value=22, script=b''), + TxOutput(value=33, script=b''), + TxOutput(value=44, script=b''), + TxOutput(value=55, script=b''), + TxOutput(value=66, script=b''), + TxOutput(value=77, script=b''), + TxOutput(value=88, script=b''), + ] + tx = Transaction(inputs=inputs, outputs=outputs) + + with patch.object(tx, '_get_sighash') as mock: + tx.get_custom_sighash_data(SighashRange(input_start=34, input_end=34, output_start=123, output_end=123)) + mock.assert_called_once_with(inputs=[], outputs=[]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashRange(input_start=0, input_end=1, output_start=0, output_end=0)) + mock.assert_called_once_with(inputs=inputs[0:1], outputs=[]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashRange(input_start=2, input_end=7, output_start=4, output_end=8)) + mock.assert_called_once_with(inputs=inputs[2:7], outputs=outputs[4:8]) + mock.reset_mock() + + tx.get_custom_sighash_data(SighashRange(input_start=0, input_end=8, 
output_start=0, output_end=8)) + mock.assert_called_once_with(inputs=inputs, outputs=outputs) + mock.reset_mock() diff --git a/tests/tx/test_multisig.py b/tests/tx/test_multisig.py index 22748266e..be1c445cc 100644 --- a/tests/tx/test_multisig.py +++ b/tests/tx/test_multisig.py @@ -134,7 +134,7 @@ def test_spend_multisig(self): expected_dict = {'type': 'MultiSig', 'address': self.multisig_address_b58, 'timelock': None} self.assertEqual(cls_script.to_human_readable(), expected_dict) - script_eval(tx, tx_input, tx1) + script_eval(tx, tx1, input_index=0) # Script error with self.assertRaises(ScriptError): diff --git a/tests/tx/test_nano_contracts.py b/tests/tx/test_nano_contracts.py index b23addf81..bae489ff7 100644 --- a/tests/tx/test_nano_contracts.py +++ b/tests/tx/test_nano_contracts.py @@ -36,6 +36,6 @@ def test_match_values(self): input_data = NanoContractMatchValues.create_input_data( base64.b64decode(oracle_data), base64.b64decode(oracle_signature), base64.b64decode(pubkey)) txin = TxInput(b'aa', 0, input_data) - spent_tx = Transaction(outputs=[TxOutput(20, script)]) - tx = Transaction(outputs=[TxOutput(20, P2PKH.create_output_script(address))]) - script_eval(tx, txin, spent_tx) + spent_tx = Transaction(hash=b'aa', outputs=[TxOutput(20, script)]) + tx = Transaction(inputs=[txin], outputs=[TxOutput(20, P2PKH.create_output_script(address))]) + script_eval(tx, spent_tx, input_index=0) diff --git a/tests/tx/test_scripts.py b/tests/tx/test_scripts.py index 34ce6ac25..f0b27c7a9 100644 --- a/tests/tx/test_scripts.py +++ b/tests/tx/test_scripts.py @@ -1,16 +1,21 @@ import struct -from unittest.mock import Mock +from unittest.mock import Mock, patch from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from hathor.crypto.util import get_address_from_public_key, get_hash160, get_public_key_bytes_compressed from hathor.transaction.exceptions import ( + CustomSighashModelInvalid, DataIndexError, EqualVerifyFailed, 
FinalStackInvalid, + InputNotSelectedError, + InputsOutputsLimitModelInvalid, InvalidScriptError, InvalidStackData, + MaxInputsExceededError, + MaxOutputsExceededError, MissingStackItems, OracleChecksigFailed, OutOfData, @@ -38,6 +43,8 @@ get_script_op, ) from hathor.transaction.scripts.opcode import ( + execute_op_code, + is_opcode_valid, op_checkdatasig, op_checkmultisig, op_checksig, @@ -52,10 +59,15 @@ op_greaterthan_timestamp, op_hash160, op_integer, + op_max_inputs_outputs, + op_max_sighash_subsets, op_pushdata, op_pushdata1, + op_sighash_bitmask, + op_sighash_range, ) from hathor.transaction.scripts.script_context import ScriptContext +from hathor.transaction.scripts.sighash import SighashBitmask, SighashRange from hathor.transaction.storage import TransactionMemoryStorage from hathor.wallet import HDWallet from tests import unittest @@ -176,22 +188,22 @@ def test_pushdata1(self): def test_dup(self): with self.assertRaises(MissingStackItems): - op_dup(ScriptContext(stack=[], logs=[], extras=Mock())) + op_dup(ScriptContext(stack=[], logs=[], extras=Mock(), settings=Mock())) stack = [1] - op_dup(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_dup(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack[-1], stack[-2]) def test_equalverify(self): elem = b'a' with self.assertRaises(MissingStackItems): - op_equalverify(ScriptContext(stack=[elem], logs=[], extras=Mock())) + op_equalverify(ScriptContext(stack=[elem], logs=[], extras=Mock(), settings=Mock())) # no exception should be raised - op_equalverify(ScriptContext(stack=[elem, elem], logs=[], extras=Mock())) + op_equalverify(ScriptContext(stack=[elem, elem], logs=[], extras=Mock(), settings=Mock())) with self.assertRaises(EqualVerifyFailed): - op_equalverify(ScriptContext(stack=[elem, b'aaaa'], logs=[], extras=Mock())) + op_equalverify(ScriptContext(stack=[elem, b'aaaa'], logs=[], extras=Mock(), settings=Mock())) def test_checksig_raise_on_uncompressed_pubkey(self): 
""" Uncompressed pubkeys shoud not be accepted, even if they solve the signature @@ -213,11 +225,11 @@ def test_checksig_raise_on_uncompressed_pubkey(self): # ScriptError if pubkey is not a valid compressed public key # with wrong signature with self.assertRaises(ScriptError): - op_checksig(ScriptContext(stack=[b'123', pubkey_uncompressed], logs=[], extras=Mock())) + op_checksig(ScriptContext(stack=[b'123', pubkey_uncompressed], logs=[], extras=Mock(), settings=Mock())) # or with rigth one # this will make sure the signature is not made when parameters are wrong with self.assertRaises(ScriptError): - op_checksig(ScriptContext(stack=[signature, pubkey_uncompressed], logs=[], extras=Mock())) + op_checksig(ScriptContext(stack=[signature, pubkey_uncompressed], logs=[], extras=Mock(), settings=Mock())) def test_checksig_check_for_compressed_pubkey(self): """ Compressed pubkeys bytes representation always start with a byte 2 or 3 @@ -226,19 +238,19 @@ def test_checksig_check_for_compressed_pubkey(self): """ # ScriptError if pubkey is not a public key but starts with 2 or 3 with self.assertRaises(ScriptError): - op_checksig(ScriptContext(stack=[b'\x0233', b'\x0233'], logs=[], extras=Mock())) + op_checksig(ScriptContext(stack=[b'\x0233', b'\x0233'], logs=[], extras=Mock(), settings=Mock())) with self.assertRaises(ScriptError): - op_checksig(ScriptContext(stack=[b'\x0321', b'\x0321'], logs=[], extras=Mock())) + op_checksig(ScriptContext(stack=[b'\x0321', b'\x0321'], logs=[], extras=Mock(), settings=Mock())) # ScriptError if pubkey does not start with 2 or 3 with self.assertRaises(ScriptError): - op_checksig(ScriptContext(stack=[b'\x0123', b'\x0123'], logs=[], extras=Mock())) + op_checksig(ScriptContext(stack=[b'\x0123', b'\x0123'], logs=[], extras=Mock(), settings=Mock())) with self.assertRaises(ScriptError): - op_checksig(ScriptContext(stack=[b'\x0423', b'\x0423'], logs=[], extras=Mock())) + op_checksig(ScriptContext(stack=[b'\x0423', b'\x0423'], logs=[], extras=Mock(), 
settings=Mock())) def test_checksig(self): with self.assertRaises(MissingStackItems): - op_checksig(ScriptContext(stack=[1], logs=[], extras=Mock())) + op_checksig(ScriptContext(stack=[1], logs=[], extras=Mock(), settings=Mock())) block = self.genesis_blocks[0] @@ -253,15 +265,15 @@ def test_checksig(self): signature = self.genesis_private_key.sign(hashed_data, ec.ECDSA(hashes.SHA256())) pubkey_bytes = get_public_key_bytes_compressed(self.genesis_public_key) - extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) + extras = ScriptExtras(tx=tx, spent_tx=block, input_index=0) # wrong signature puts False (0) on stack stack = [b'aaaaaaaaa', pubkey_bytes] - op_checksig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checksig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(0, stack.pop()) stack = [signature, pubkey_bytes] - op_checksig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checksig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(1, stack.pop()) def test_checksig_cache(self): @@ -278,22 +290,22 @@ def test_checksig_cache(self): signature = self.genesis_private_key.sign(hashed_data, ec.ECDSA(hashes.SHA256())) pubkey_bytes = get_public_key_bytes_compressed(self.genesis_public_key) - extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) + extras = ScriptExtras(tx=tx, spent_tx=block, input_index=0) stack = [signature, pubkey_bytes] self.assertIsNone(tx._sighash_data_cache) - op_checksig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checksig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertIsNotNone(tx._sighash_data_cache) self.assertEqual(1, stack.pop()) def test_hash160(self): with self.assertRaises(MissingStackItems): - op_hash160(ScriptContext(stack=[], logs=[], extras=Mock())) + op_hash160(ScriptContext(stack=[], logs=[], extras=Mock(), settings=Mock())) elem = b'aaaaaaaa' hash160 = get_hash160(elem) stack = [elem] - 
op_hash160(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_hash160(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(hash160, stack.pop()) def test_checkdatasig_raise_on_uncompressed_pubkey(self): @@ -316,27 +328,33 @@ def test_checkdatasig_raise_on_uncompressed_pubkey(self): # with wrong signature stack = [data, b'123', pubkey_uncompressed] with self.assertRaises(ScriptError): - op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) # or with rigth one # this will make sure the signature is not made when parameters are wrong stack = [data, signature, pubkey_uncompressed] with self.assertRaises(ScriptError): - op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) def test_checkdatasig_check_for_compressed_pubkey(self): # ScriptError if pubkey is not a public key but starts with 2 or 3 with self.assertRaises(ScriptError): - op_checkdatasig(ScriptContext(stack=[b'\x0233', b'\x0233', b'\x0233'], logs=[], extras=Mock())) + op_checkdatasig( + ScriptContext(stack=[b'\x0233', b'\x0233', b'\x0233'], logs=[], extras=Mock(), settings=Mock()) + ) with self.assertRaises(ScriptError): - op_checkdatasig(ScriptContext(stack=[b'\x0321', b'\x0321', b'\x0321'], logs=[], extras=Mock())) + op_checkdatasig( + ScriptContext(stack=[b'\x0321', b'\x0321', b'\x0321'], logs=[], extras=Mock(), settings=Mock()) + ) # ScriptError if pubkey is not a public key with self.assertRaises(ScriptError): - op_checkdatasig(ScriptContext(stack=[b'\x0123', b'\x0123', b'\x0123'], logs=[], extras=Mock())) + op_checkdatasig( + ScriptContext(stack=[b'\x0123', b'\x0123', b'\x0123'], logs=[], extras=Mock(), settings=Mock()) + ) def test_checkdatasig(self): with self.assertRaises(MissingStackItems): - op_checkdatasig(ScriptContext(stack=[1, 1], logs=[], extras=Mock())) + 
op_checkdatasig(ScriptContext(stack=[1, 1], logs=[], extras=Mock(), settings=Mock())) data = b'some_random_data' signature = self.genesis_private_key.sign(data, ec.ECDSA(hashes.SHA256())) @@ -344,12 +362,12 @@ def test_checkdatasig(self): stack = [data, signature, pubkey_bytes] # no exception should be raised and data is left on stack - op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(data, stack.pop()) stack = [b'data_not_matching', signature, pubkey_bytes] with self.assertRaises(OracleChecksigFailed): - op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) def test_get_data_value(self): value0 = b'value0' @@ -370,7 +388,7 @@ def test_get_data_value(self): def test_data_strequal(self): with self.assertRaises(MissingStackItems): - op_data_strequal(ScriptContext(stack=[1, 1], logs=[], extras=Mock())) + op_data_strequal(ScriptContext(stack=[1, 1], logs=[], extras=Mock(), settings=Mock())) value0 = b'value0' value1 = b'vvvalue1' @@ -379,20 +397,20 @@ def test_data_strequal(self): data = (bytes([len(value0)]) + value0 + bytes([len(value1)]) + value1 + bytes([len(value2)]) + value2) stack = [data, 0, value0] - op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), data) stack = [data, 1, value0] with self.assertRaises(VerifyFailed): - op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) stack = [data, b'\x00', value0] with self.assertRaises(VerifyFailed): - op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) def 
test_data_greaterthan(self): with self.assertRaises(MissingStackItems): - op_data_greaterthan(ScriptContext(stack=[1, 1], logs=[], extras=Mock())) + op_data_greaterthan(ScriptContext(stack=[1, 1], logs=[], extras=Mock(), settings=Mock())) value0 = struct.pack('!I', 1000) value1 = struct.pack('!I', 1) @@ -400,24 +418,24 @@ def test_data_greaterthan(self): data = (bytes([len(value0)]) + value0 + bytes([len(value1)]) + value1) stack = [data, 0, struct.pack('!I', 999)] - op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), data) stack = [data, 1, struct.pack('!I', 0)] - op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), data) with self.assertRaises(VerifyFailed): stack = [data, 1, struct.pack('!I', 1)] - op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) stack = [data, 1, b'not_an_int'] with self.assertRaises(VerifyFailed): - op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) stack = [data, b'\x00', struct.pack('!I', 0)] with self.assertRaises(VerifyFailed): - op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) def test_data_match_interval(self): with self.assertRaises(MissingStackItems): @@ -453,40 +471,40 @@ def test_data_match_interval(self): def test_data_match_value(self): with self.assertRaises(MissingStackItems): - op_data_match_value(ScriptContext(stack=[1, b'2'], logs=[], extras=Mock())) + op_data_match_value(ScriptContext(stack=[1, b'2'], logs=[], 
extras=Mock(), settings=Mock())) value0 = struct.pack('!I', 1000) data = (bytes([len(value0)]) + value0) stack = [data, 0, 'key1', struct.pack('!I', 1000), 'key2', struct.pack('!I', 1005), 'key3', bytes([2])] - op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), 'key2') self.assertEqual(len(stack), 0) stack = [data, 0, 'key1', struct.pack('!I', 999), 'key2', struct.pack('!I', 1000), 'key3', bytes([2])] - op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), 'key3') self.assertEqual(len(stack), 0) # missing 1 item on stack stack = [data, 0, 'key1', struct.pack('!I', 1000), 'key2', struct.pack('!I', 1000), bytes([2])] with self.assertRaises(MissingStackItems): - op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) # no value matches stack = [data, 0, 'key1', struct.pack('!I', 999), 'key2', struct.pack('!I', 1111), 'key3', bytes([2])] - op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), 'key1') self.assertEqual(len(stack), 0) # value should be an integer stack = [data, 0, 'key1', struct.pack('!I', 100), 'key2', b'not_an_int', 'key3', bytes([2])] with self.assertRaises(VerifyFailed): - op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) def test_find_p2pkh(self): with self.assertRaises(MissingStackItems): - op_find_p2pkh(ScriptContext(stack=[], logs=[], extras=Mock())) + op_find_p2pkh(ScriptContext(stack=[], logs=[], extras=Mock(), 
settings=Mock())) addr1 = '15d14K5jMqsN2uwUEFqiPG5SoD7Vr1BfnH' addr2 = '1K35zJQeYrVzQAW7X3s7vbPKmngj5JXTBc' @@ -502,64 +520,72 @@ def test_find_p2pkh(self): out_genesis = P2PKH.create_output_script(genesis_address) from hathor.transaction import Transaction, TxInput, TxOutput - spent_tx = Transaction(outputs=[TxOutput(1, b'nano_contract_code')]) - txin = TxInput(b'dont_care', 0, b'data') + spent_tx = Transaction(hash=b'some_hash', outputs=[TxOutput(1, b'nano_contract_code')]) + txin = TxInput(b'some_hash', 0, b'data') # try with just 1 output stack = [genesis_address] - tx = Transaction(outputs=[TxOutput(1, out_genesis)]) - extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) - op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) + tx = Transaction(inputs=[txin], outputs=[TxOutput(1, out_genesis)]) + extras = ScriptExtras(tx=tx, spent_tx=spent_tx, input_index=0) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(stack.pop(), 1) # several outputs and correct output among them stack = [genesis_address] - tx = Transaction(outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(1, out_genesis), TxOutput(1, out3)]) - extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) - op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) + tx = Transaction( + inputs=[txin], + outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(1, out_genesis), TxOutput(1, out3)] + ) + extras = ScriptExtras(tx=tx, spent_tx=spent_tx, input_index=0) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(stack.pop(), 1) # several outputs without correct amount output stack = [genesis_address] - tx = Transaction(outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(2, out_genesis), TxOutput(1, out3)]) - extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) + tx = Transaction( + inputs=[txin], + outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(2, 
out_genesis), TxOutput(1, out3)] + ) + extras = ScriptExtras(tx=tx, spent_tx=spent_tx, input_index=0) with self.assertRaises(VerifyFailed): - op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) # several outputs without correct address output stack = [genesis_address] - tx = Transaction(outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(1, out3)]) - extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) + tx = Transaction(inputs=[txin], outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(1, out3)]) + extras = ScriptExtras(tx=tx, spent_tx=spent_tx, input_index=0) with self.assertRaises(VerifyFailed): - op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) def test_greaterthan_timestamp(self): with self.assertRaises(MissingStackItems): - op_greaterthan_timestamp(ScriptContext(stack=[], logs=[], extras=Mock())) + op_greaterthan_timestamp(ScriptContext(stack=[], logs=[], extras=Mock(), settings=Mock())) timestamp = 1234567 - from hathor.transaction import Transaction - tx = Transaction() + from hathor.transaction import Transaction, TxInput + spent_tx = Transaction(hash=b'some_hash') + tx_input = TxInput(tx_id=b'some_hash', index=0, data=b'') + tx = Transaction(inputs=[tx_input]) stack = [struct.pack('!I', timestamp)] - extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) + extras = ScriptExtras(tx=tx, spent_tx=spent_tx, input_index=0) with self.assertRaises(TimeLocked): tx.timestamp = timestamp - 1 - op_greaterthan_timestamp(ScriptContext(stack=list(stack), logs=[], extras=extras)) + op_greaterthan_timestamp(ScriptContext(stack=list(stack), logs=[], extras=extras, settings=Mock())) with self.assertRaises(TimeLocked): tx.timestamp = timestamp - op_greaterthan_timestamp(ScriptContext(stack=list(stack), logs=[], extras=extras)) + 
op_greaterthan_timestamp(ScriptContext(stack=list(stack), logs=[], extras=extras, settings=Mock())) tx.timestamp = timestamp + 1 - op_greaterthan_timestamp(ScriptContext(stack=stack, logs=[], extras=extras)) + op_greaterthan_timestamp(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(len(stack), 0) def test_checkmultisig(self): with self.assertRaises(MissingStackItems): - op_checkmultisig(ScriptContext(stack=[], logs=[], extras=Mock())) + op_checkmultisig(ScriptContext(stack=[], logs=[], extras=Mock(), settings=Mock())) block = self.genesis_blocks[0] @@ -569,7 +595,7 @@ def test_checkmultisig(self): tx = Transaction(inputs=[txin], outputs=[txout]) data_to_sign = tx.get_sighash_all() - extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) + extras = ScriptExtras(tx=tx, spent_tx=block, input_index=0) wallet = HDWallet() wallet._manually_initialize() @@ -598,92 +624,92 @@ def test_checkmultisig(self): stack = [ keys[0]['signature'], keys[2]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(1, stack.pop()) # New set of valid signatures stack = [ keys[0]['signature'], keys[1]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(1, stack.pop()) # Changing the signatures but they match stack = [ keys[1]['signature'], keys[2]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(1, stack.pop()) # Signatures are valid but in wrong order 
stack = [ keys[1]['signature'], keys[0]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(0, stack.pop()) # Adding wrong signature, so we get error stack = [ keys[0]['signature'], wrong_key['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(0, stack.pop()) # Adding same signature twice, so we get error stack = [ keys[0]['signature'], keys[0]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) self.assertEqual(0, stack.pop()) # Adding less signatures than required, so we get error stack = [keys[0]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3] with self.assertRaises(MissingStackItems): - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) # Quantity of signatures is more than it should stack = [ keys[0]['signature'], keys[1]['signature'], 3, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] with self.assertRaises(MissingStackItems): - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) # Quantity of pubkeys is more than it should stack = [ keys[0]['signature'], keys[1]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 4 ] with self.assertRaises(InvalidStackData): - op_checkmultisig(ScriptContext(stack=stack, 
logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) # Exception pubkey_count should be integer stack = [ keys[0]['signature'], keys[1]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], '3' ] with self.assertRaises(InvalidStackData): - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) # Exception not enough pub keys stack = [keys[0]['pubkey'], keys[1]['pubkey'], 3] with self.assertRaises(MissingStackItems): - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) # Exception stack empty after pubkeys stack = [keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3] with self.assertRaises(MissingStackItems): - op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras, settings=Mock())) def test_equal(self): elem = b'a' with self.assertRaises(MissingStackItems): - op_equal(ScriptContext(stack=[elem], logs=[], extras=Mock())) + op_equal(ScriptContext(stack=[elem], logs=[], extras=Mock(), settings=Mock())) # no exception should be raised stack = [elem, elem] - op_equal(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_equal(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), 1) stack = [elem, b'aaaa'] - op_equal(ScriptContext(stack=stack, logs=[], extras=Mock())) + op_equal(ScriptContext(stack=stack, logs=[], extras=Mock(), settings=Mock())) self.assertEqual(stack.pop(), 0) def test_integer_opcode(self): @@ -966,3 +992,166 @@ def test_get_sigops_count(self): self.assertEqual(get_sigops_count(input_script.data, p2pkh_script), 1) # if no output_script, count only input self.assertEqual(get_sigops_count(input_script.data), 1) + + def 
test_op_sighash_bitmask(self) -> None: + with self.assertRaises(MissingStackItems): + op_sighash_bitmask(ScriptContext(stack=[], extras=Mock(), logs=[], settings=Mock())) + + with self.assertRaises(MissingStackItems): + op_sighash_bitmask(ScriptContext(stack=[b''], extras=Mock(), logs=[], settings=Mock())) + + with self.assertRaises(AssertionError): + op_sighash_bitmask(ScriptContext(stack=[0b111, 0b101], extras=Mock(), logs=[], settings=Mock())) + + stack: list[bytes | int | str] = [bytes([0b0]), bytes([0xFF, 0xFF])] + context = Mock(spec_set=ScriptContext) + extras = Mock(spec_set=ScriptExtras) + context.stack = stack + context.extras = extras + + with self.assertRaises(CustomSighashModelInvalid): + op_sighash_bitmask(context) + + context.stack = [bytes([0b111]), bytes([0b101])] + extras.input_index = 3 + + with self.assertRaises(InputNotSelectedError): + op_sighash_bitmask(context) + + context.stack = [bytes([0b111]), bytes([0b101])] + extras.input_index = 2 + op_sighash_bitmask(context) + + self.assertEqual(stack, []) + context.set_sighash.assert_called_once_with( + SighashBitmask( + inputs=0b111, + outputs=0b101 + ) + ) + + def test_op_sighash_range(self) -> None: + context = Mock(spec_set=ScriptContext) + with self.assertRaises(MissingStackItems): + context.stack = [] + op_sighash_range(context) + + with self.assertRaises(MissingStackItems): + context.stack = [b'', b'', b''] + op_sighash_range(context) + + with self.assertRaises(AssertionError): + context.stack = [10, 20, 30, 40] + op_sighash_range(context) + + context.stack = [bytes([1, 2]), bytes([3, 5]), bytes([5, 6]), bytes([7, 8])] + context.extras = Mock(spec_set=ScriptExtras) + + with self.assertRaises(CustomSighashModelInvalid): + op_sighash_range(context) + + context.stack = [bytes([10]), bytes([20]), bytes([30]), bytes([40])] + context.extras.input_index = 3 + + with self.assertRaises(InputNotSelectedError): + op_sighash_range(context) + + context.stack = [bytes([10]), bytes([20]), bytes([30]), 
bytes([40])] + context.extras.input_index = 15 + op_sighash_range(context) + + self.assertEqual(context.stack, []) + context.set_sighash.assert_called_once_with( + SighashRange( + input_start=10, + input_end=20, + output_start=30, + output_end=40, + ) + ) + + def test_op_max_inputs_outputs(self) -> None: + context = Mock(spec_set=ScriptContext) + with self.assertRaises(MissingStackItems): + context.stack = [] + op_max_inputs_outputs(context) + + with self.assertRaises(MissingStackItems): + context.stack = [b''] + op_max_inputs_outputs(context) + + with self.assertRaises(AssertionError): + context.stack = [1, 2] + op_max_inputs_outputs(context) + + context.stack = [bytes([0]), bytes([0])] + context.extras = Mock(spec_set=ScriptExtras) + + with self.assertRaises(InputsOutputsLimitModelInvalid): + op_max_inputs_outputs(context) + + context.stack = [bytes([1]), bytes([2])] + context.extras.tx.inputs = ['a', 'b'] + + with self.assertRaises(MaxInputsExceededError): + op_max_inputs_outputs(context) + + context.stack = [bytes([1]), bytes([2])] + context.extras.tx.inputs = ['a'] + context.extras.tx.outputs = ['a', 'b', 'c'] + + with self.assertRaises(MaxOutputsExceededError): + op_max_inputs_outputs(context) + + context.stack = [bytes([1]), bytes([2])] + context.extras.tx.inputs = ['a'] + context.extras.tx.outputs = ['a', 'b'] + + op_max_inputs_outputs(context) + + self.assertEqual(context.stack, []) + + def test_op_max_sighash_subsets(self) -> None: + with self.assertRaises(MissingStackItems): + op_max_sighash_subsets(ScriptContext(stack=[], extras=Mock(), logs=[], settings=Mock())) + + with self.assertRaises(AssertionError): + op_max_sighash_subsets(ScriptContext(stack=[7], extras=Mock(), logs=[], settings=Mock())) + + stack: list[bytes | int | str] = [bytes([7])] + context = Mock(spec_set=ScriptContext) + extras = Mock(spec_set=ScriptExtras) + context.stack = stack + context.extras = extras + + op_max_sighash_subsets(context) + + self.assertEqual(stack, []) + 
context.set_max_sighash_subsets.assert_called_once_with(7) + + def test_execute_op_code(self) -> None: + # Test that when `is_opcode_valid` returns False, execution must fail, regardless of the opcode. + with ( + patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: False), + self.assertRaises(ScriptError) + ): + execute_op_code(opcode=Mock(), context=Mock()) + + # Test that when `is_opcode_valid` returns True, execution must fail if it's not a "function opcode". + with ( + patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True), + self.assertRaises(ScriptError) + ): + execute_op_code(opcode=Opcode.OP_0, context=Mock()) + + # Test that a valid opcode is correctly executed. + with patch('hathor.transaction.scripts.opcode.op_dup') as op_mock: + execute_op_code(opcode=Opcode.OP_DUP, context=Mock()) + + op_mock.assert_called_once() + + def test_is_opcode_valid(self) -> None: + self.assertTrue(is_opcode_valid(Opcode.OP_DUP)) + self.assertFalse(is_opcode_valid(Opcode.OP_SIGHASH_BITMASK)) + self.assertFalse(is_opcode_valid(Opcode.OP_SIGHASH_RANGE)) + self.assertFalse(is_opcode_valid(Opcode.OP_MAX_INPUTS_OUTPUTS)) diff --git a/tests/tx/test_tx.py b/tests/tx/test_tx.py index 833d158d2..3bc18bbd3 100644 --- a/tests/tx/test_tx.py +++ b/tests/tx/test_tx.py @@ -24,16 +24,19 @@ InvalidOutputScriptSize, InvalidOutputValue, NoInputError, + OutputNotSelected, ParentDoesNotExist, PowError, TimestampError, TooManyInputs, TooManyOutputs, + TooManySighashSubsets, TooManySigOps, TransactionDataError, WeightError, ) from hathor.transaction.scripts import P2PKH, parse_address_script +from hathor.transaction.scripts.sighash import SighashBitmask from hathor.transaction.util import int_to_bytes from hathor.transaction.validation_state import ValidationState from hathor.wallet import Wallet @@ -151,6 +154,68 @@ def test_too_many_outputs(self): with self.assertRaises(TooManyOutputs): self._verifiers.vertex.verify_number_of_outputs(tx) + 
@patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_output_not_selected(self) -> None: + parents = [tx.hash for tx in self.genesis_txs] + genesis_block = self.genesis_blocks[0] + + value = genesis_block.outputs[0].value + address = get_address_from_public_key(self.genesis_public_key) + script = P2PKH.create_output_script(address) + output = TxOutput(value, script) + + tx_input = TxInput(genesis_block.hash, 0, b'') + tx = Transaction( + weight=1, + inputs=[tx_input], + outputs=[output], + parents=parents, + storage=self.tx_storage, + timestamp=self.last_block.timestamp + 1 + ) + + sighash = SighashBitmask(inputs=0b1, outputs=0b0) + data_to_sign = tx.get_custom_sighash_data(sighash) + public_bytes, signature = self.wallet.get_input_aux_data(data_to_sign, self.genesis_private_key) + tx_input.data = P2PKH.create_input_data(public_bytes, signature, sighash=sighash) + + self.manager.cpu_mining_service.resolve(tx) + with pytest.raises(OutputNotSelected) as e: + self.manager.verification_service.verify(tx) + + self.assertEqual(str(e.value), "Output at index 0 is not signed by any input.") + + @patch('hathor.transaction.scripts.opcode.is_opcode_valid', lambda _: True) + def test_too_many_sighash_subsets(self) -> None: + parents = [tx.hash for tx in self.genesis_txs] + genesis_block = self.genesis_blocks[0] + + value = genesis_block.outputs[0].value + address = get_address_from_public_key(self.genesis_public_key) + script = P2PKH.create_output_script(address) + output = TxOutput(value, script) + + tx_input = TxInput(genesis_block.hash, 0, b'') + tx = Transaction( + weight=1, + inputs=[tx_input], + outputs=[output], + parents=parents, + storage=self.tx_storage, + timestamp=self.last_block.timestamp + 1 + ) + + sighash = SighashBitmask(inputs=0b1, outputs=0b1) + data_to_sign = tx.get_custom_sighash_data(sighash) + public_bytes, signature = self.wallet.get_input_aux_data(data_to_sign, self.genesis_private_key) + tx_input.data = 
P2PKH.create_input_data(public_bytes, signature, sighash=sighash, max_sighash_subsets=0) + + self.manager.cpu_mining_service.resolve(tx) + with pytest.raises(TooManySighashSubsets) as e: + self.manager.verification_service.verify(tx) + + self.assertEqual(str(e.value), "There are more custom sighash subsets than the configured maximum (1 > 0).") + def _gen_tx_spending_genesis_block(self): parents = [tx.hash for tx in self.genesis_txs] genesis_block = self.genesis_blocks[0] @@ -1058,7 +1123,7 @@ def test_wallet_index(self): self.assertEqual(len(self.tx_storage.indexes.addresses.get_from_address(output3_address_b58)), 1) self.assertEqual(len(self.tx_storage.indexes.addresses.get_from_address(new_address_b58)), 1) - def test_sighash_cache(self): + def test_sighash_all_cache(self): from unittest import mock address = get_address_from_public_key(self.genesis_public_key) diff --git a/tests/tx/test_verification.py b/tests/tx/test_verification.py index ee92a3f22..3341b24a1 100644 --- a/tests/tx/test_verification.py +++ b/tests/tx/test_verification.py @@ -601,7 +601,7 @@ def test_transaction_verify(self) -> None: verify_sigops_output_wrapped = Mock(wraps=self.verifiers.vertex.verify_sigops_output) verify_sigops_input_wrapped = Mock(wraps=self.verifiers.tx.verify_sigops_input) verify_inputs_wrapped = Mock(wraps=self.verifiers.tx.verify_inputs) - verify_script_wrapped = Mock(wraps=self.verifiers.tx.verify_script) + verify_scripts_wrapped = Mock(wraps=self.verifiers.tx.verify_scripts) verify_parents_wrapped = Mock(wraps=self.verifiers.vertex.verify_parents) verify_sum_wrapped = Mock(wraps=self.verifiers.tx.verify_sum) verify_reward_locked_wrapped = Mock(wraps=self.verifiers.tx.verify_reward_locked) @@ -615,7 +615,7 @@ def test_transaction_verify(self) -> None: patch.object(VertexVerifier, 'verify_sigops_output', verify_sigops_output_wrapped), patch.object(TransactionVerifier, 'verify_sigops_input', verify_sigops_input_wrapped), patch.object(TransactionVerifier, 'verify_inputs', 
verify_inputs_wrapped), - patch.object(TransactionVerifier, 'verify_script', verify_script_wrapped), + patch.object(TransactionVerifier, 'verify_scripts', verify_scripts_wrapped), patch.object(VertexVerifier, 'verify_parents', verify_parents_wrapped), patch.object(TransactionVerifier, 'verify_sum', verify_sum_wrapped), patch.object(TransactionVerifier, 'verify_reward_locked', verify_reward_locked_wrapped), @@ -633,7 +633,7 @@ def test_transaction_verify(self) -> None: verify_sigops_output_wrapped.assert_called_once() verify_sigops_input_wrapped.assert_called_once() verify_inputs_wrapped.assert_called_once() - verify_script_wrapped.assert_called_once() + verify_scripts_wrapped.assert_called_once() verify_parents_wrapped.assert_called_once() verify_sum_wrapped.assert_called_once() verify_reward_locked_wrapped.assert_called_once() @@ -735,7 +735,7 @@ def test_transaction_validate_full(self) -> None: verify_sigops_output_wrapped = Mock(wraps=self.verifiers.vertex.verify_sigops_output) verify_sigops_input_wrapped = Mock(wraps=self.verifiers.tx.verify_sigops_input) verify_inputs_wrapped = Mock(wraps=self.verifiers.tx.verify_inputs) - verify_script_wrapped = Mock(wraps=self.verifiers.tx.verify_script) + verify_scripts_wrapped = Mock(wraps=self.verifiers.tx.verify_scripts) verify_parents_wrapped = Mock(wraps=self.verifiers.vertex.verify_parents) verify_sum_wrapped = Mock(wraps=self.verifiers.tx.verify_sum) verify_reward_locked_wrapped = Mock(wraps=self.verifiers.tx.verify_reward_locked) @@ -752,7 +752,7 @@ def test_transaction_validate_full(self) -> None: patch.object(VertexVerifier, 'verify_sigops_output', verify_sigops_output_wrapped), patch.object(TransactionVerifier, 'verify_sigops_input', verify_sigops_input_wrapped), patch.object(TransactionVerifier, 'verify_inputs', verify_inputs_wrapped), - patch.object(TransactionVerifier, 'verify_script', verify_script_wrapped), + patch.object(TransactionVerifier, 'verify_scripts', verify_scripts_wrapped), 
patch.object(VertexVerifier, 'verify_parents', verify_parents_wrapped), patch.object(TransactionVerifier, 'verify_sum', verify_sum_wrapped), patch.object(TransactionVerifier, 'verify_reward_locked', verify_reward_locked_wrapped), @@ -773,7 +773,7 @@ def test_transaction_validate_full(self) -> None: assert verify_sigops_output_wrapped.call_count == 2 verify_sigops_input_wrapped.assert_called_once() verify_inputs_wrapped.assert_called_once() - verify_script_wrapped.assert_called_once() + verify_scripts_wrapped.assert_called_once() verify_parents_wrapped.assert_called_once() verify_sum_wrapped.assert_called_once() verify_reward_locked_wrapped.assert_called_once() @@ -896,7 +896,7 @@ def test_token_creation_transaction_verify(self) -> None: verify_sigops_output_wrapped = Mock(wraps=self.verifiers.vertex.verify_sigops_output) verify_sigops_input_wrapped = Mock(wraps=self.verifiers.tx.verify_sigops_input) verify_inputs_wrapped = Mock(wraps=self.verifiers.tx.verify_inputs) - verify_script_wrapped = Mock(wraps=self.verifiers.tx.verify_script) + verify_scripts_wrapped = Mock(wraps=self.verifiers.tx.verify_scripts) verify_parents_wrapped = Mock(wraps=self.verifiers.vertex.verify_parents) verify_sum_wrapped = Mock(wraps=self.verifiers.tx.verify_sum) verify_reward_locked_wrapped = Mock(wraps=self.verifiers.tx.verify_reward_locked) @@ -913,7 +913,7 @@ def test_token_creation_transaction_verify(self) -> None: patch.object(VertexVerifier, 'verify_sigops_output', verify_sigops_output_wrapped), patch.object(TransactionVerifier, 'verify_sigops_input', verify_sigops_input_wrapped), patch.object(TransactionVerifier, 'verify_inputs', verify_inputs_wrapped), - patch.object(TransactionVerifier, 'verify_script', verify_script_wrapped), + patch.object(TransactionVerifier, 'verify_scripts', verify_scripts_wrapped), patch.object(VertexVerifier, 'verify_parents', verify_parents_wrapped), patch.object(TransactionVerifier, 'verify_sum', verify_sum_wrapped), patch.object(TransactionVerifier, 
'verify_reward_locked', verify_reward_locked_wrapped), @@ -933,7 +933,7 @@ def test_token_creation_transaction_verify(self) -> None: verify_sigops_output_wrapped.assert_called_once() verify_sigops_input_wrapped.assert_called_once() verify_inputs_wrapped.assert_called_once() - verify_script_wrapped.assert_called_once() + verify_scripts_wrapped.assert_called_once() verify_parents_wrapped.assert_called_once() verify_sum_wrapped.assert_called_once() verify_reward_locked_wrapped.assert_called_once() @@ -1038,7 +1038,7 @@ def test_token_creation_transaction_validate_full(self) -> None: verify_sigops_output_wrapped = Mock(wraps=self.verifiers.vertex.verify_sigops_output) verify_sigops_input_wrapped = Mock(wraps=self.verifiers.tx.verify_sigops_input) verify_inputs_wrapped = Mock(wraps=self.verifiers.tx.verify_inputs) - verify_script_wrapped = Mock(wraps=self.verifiers.tx.verify_script) + verify_scripts_wrapped = Mock(wraps=self.verifiers.tx.verify_scripts) verify_parents_wrapped = Mock(wraps=self.verifiers.vertex.verify_parents) verify_sum_wrapped = Mock(wraps=self.verifiers.tx.verify_sum) verify_reward_locked_wrapped = Mock(wraps=self.verifiers.tx.verify_reward_locked) @@ -1058,7 +1058,7 @@ def test_token_creation_transaction_validate_full(self) -> None: patch.object(VertexVerifier, 'verify_sigops_output', verify_sigops_output_wrapped), patch.object(TransactionVerifier, 'verify_sigops_input', verify_sigops_input_wrapped), patch.object(TransactionVerifier, 'verify_inputs', verify_inputs_wrapped), - patch.object(TransactionVerifier, 'verify_script', verify_script_wrapped), + patch.object(TransactionVerifier, 'verify_scripts', verify_scripts_wrapped), patch.object(VertexVerifier, 'verify_parents', verify_parents_wrapped), patch.object(TransactionVerifier, 'verify_sum', verify_sum_wrapped), patch.object(TransactionVerifier, 'verify_reward_locked', verify_reward_locked_wrapped), @@ -1081,7 +1081,7 @@ def test_token_creation_transaction_validate_full(self) -> None: assert 
verify_sigops_output_wrapped.call_count == 2 verify_sigops_input_wrapped.assert_called_once() verify_inputs_wrapped.assert_called_once() - verify_script_wrapped.assert_called_once() + verify_scripts_wrapped.assert_called_once() verify_parents_wrapped.assert_called_once() verify_sum_wrapped.assert_called_once() verify_reward_locked_wrapped.assert_called_once() diff --git a/tests/wallet/test_wallet_hd.py b/tests/wallet/test_wallet_hd.py index 3366df47c..9de6afbe5 100644 --- a/tests/wallet/test_wallet_hd.py +++ b/tests/wallet/test_wallet_hd.py @@ -41,7 +41,7 @@ def test_transaction_and_balance(self): tx1 = self.wallet.prepare_transaction_compute_inputs(Transaction, [out], self.tx_storage) tx1.update_hash() verifier = self.manager.verification_service.verifiers.tx - verifier.verify_script(tx=tx1, input_tx=tx1.inputs[0], spent_tx=block) + verifier.verify_scripts(tx1, spent_txs={block.hash: block}) tx1.storage = self.tx_storage tx1.get_metadata().validation = ValidationState.FULL self.wallet.on_new_tx(tx1) @@ -62,7 +62,7 @@ def test_transaction_and_balance(self): tx2.storage = self.tx_storage tx2.update_hash() tx2.storage = self.tx_storage - verifier.verify_script(tx=tx2, input_tx=tx2.inputs[0], spent_tx=tx1) + verifier.verify_scripts(tx2, spent_txs={tx1.hash: tx1}) tx2.get_metadata().validation = ValidationState.FULL tx2.init_static_metadata_from_storage(self._settings, self.tx_storage) self.tx_storage.save_transaction(tx2)