Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions tests/nanocontracts/blueprints/test_bet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import os
import re
from typing import NamedTuple, Optional

from hathor.conf import HathorSettings
from hathor.crypto.util import decode_address
from hathor.nanocontracts.context import Context
from hathor.nanocontracts.nc_types import NCType, make_nc_type_for_type
from hathor.nanocontracts.types import (
Address,
Amount,
ContractId,
NCDepositAction,
NCWithdrawalAction,
SignedData,
Timestamp,
TokenUid,
TxOutputScript,
VertexId,
)
from hathor.transaction import BaseTransaction
from hathor.transaction.scripts import P2PKH
from hathor.util import not_none
from hathor.wallet import KeyPair
from tests.nanocontracts.blueprints.unittest import BlueprintTestCase
from tests.nanocontracts.test_blueprints.bet import Bet

settings = HathorSettings()

TX_OUTPUT_SCRIPT_NC_TYPE = make_nc_type_for_type(TxOutputScript)
RESULT_NC_TYPE: NCType[str | None] = make_nc_type_for_type(str | None) # type: ignore[arg-type]
TIMESTAMP_NC_TYPE = make_nc_type_for_type(Timestamp)
TOKEN_UID_NC_TYPE = make_nc_type_for_type(TokenUid)


class BetInfo(NamedTuple):
key: KeyPair
address: Address
amount: Amount
score: str


class NCBetBlueprintTestCase(BlueprintTestCase):
def setUp(self):
super().setUp()
self.blueprint_id = self.gen_random_blueprint_id()
self.register_blueprint_class(self.blueprint_id, Bet)
self.token_uid = TokenUid(settings.HATHOR_TOKEN_UID)
self.nc_id = ContractId(VertexId(b'1' * 32))
self.initialize_contract()
self.nc_storage = self.runner.get_storage(self.nc_id)

def _get_any_tx(self) -> BaseTransaction:
genesis = self.manager.tx_storage.get_all_genesis()
tx = [t for t in genesis if t.is_transaction][0]
return tx

def _get_any_address(self) -> tuple[Address, KeyPair]:
password = os.urandom(12)
key = KeyPair.create(password)
address_b58 = key.address
address_bytes = Address(decode_address(not_none(address_b58)))
return address_bytes, key

def get_current_timestamp(self) -> int:
return int(self.clock.seconds())

def _make_a_bet(self, amount: int, score: str, *, timestamp: Optional[int] = None) -> BetInfo:
(address_bytes, key) = self._get_any_address()
tx = self._get_any_tx()
action = NCDepositAction(token_uid=self.token_uid, amount=amount)
if timestamp is None:
timestamp = self.get_current_timestamp()
context = Context([action], tx, address_bytes, timestamp=timestamp)
self.runner.call_public_method(self.nc_id, 'bet', context, address_bytes, score)
return BetInfo(key=key, address=Address(address_bytes), amount=Amount(amount), score=score)

def _set_result(self, result: str, oracle_key: Optional[KeyPair] = None) -> None:
signed_result = SignedData[str](result, b'')

if oracle_key is None:
oracle_key = self.oracle_key

result_bytes = signed_result.get_data_bytes(self.nc_id)
signed_result.script_input = oracle_key.p2pkh_create_input_data(b'123', result_bytes)

tx = self._get_any_tx()
context = Context([], tx, Address(b''), timestamp=self.get_current_timestamp())
self.runner.call_public_method(self.nc_id, 'set_result', context, signed_result)
final_result = self.nc_storage.get_obj(b'final_result', RESULT_NC_TYPE)
self.assertEqual(final_result, '2x2')

def _withdraw(self, address: Address, amount: int) -> None:
tx = self._get_any_tx()
action = NCWithdrawalAction(token_uid=self.token_uid, amount=amount)
context = Context([action], tx, address, timestamp=self.get_current_timestamp())
self.runner.call_public_method(self.nc_id, 'withdraw', context)

def initialize_contract(self) -> None:
self.oracle_key = KeyPair.create(b'123')
assert self.oracle_key.address is not None
self.oracle_script = P2PKH(self.oracle_key.address).get_script()
self.date_last_bet = self.get_current_timestamp() + 3600 * 24
self.runner.create_contract(
self.nc_id,
self.blueprint_id,
Context([], self._get_any_tx(), Address(b''), timestamp=self.get_current_timestamp()),
self.oracle_script,
self.token_uid,
self.date_last_bet,
)

def test_blueprint_initialization(self) -> None:
# if initialization was correct we should be able to observe these in the nc_storage:
self.assertEqual(self.nc_storage.get_obj(b'oracle_script', TX_OUTPUT_SCRIPT_NC_TYPE), self.oracle_script)
self.assertEqual(self.nc_storage.get_obj(b'token_uid', TOKEN_UID_NC_TYPE), self.token_uid)
self.assertEqual(self.nc_storage.get_obj(b'date_last_bet', TIMESTAMP_NC_TYPE), self.date_last_bet)

def test_basic_flow(self) -> None:
runner = self.runner

tx = self._get_any_tx()

###
# Make some bets.
###
self._make_a_bet(100, '1x1')
self._make_a_bet(200, '1x1')
self._make_a_bet(300, '1x1')
bet1 = self._make_a_bet(500, '2x2')

###
# Set the final result.
###
self._set_result('2x2')

###
# Single winner withdraws all funds.
###
self.assertEqual(1100, runner.call_view_method(self.nc_id, 'get_max_withdrawal', bet1.address))

self._withdraw(bet1.address, 100)
self.assertEqual(1000, runner.call_view_method(self.nc_id, 'get_max_withdrawal', bet1.address))

self._withdraw(bet1.address, 1000)
self.assertEqual(0, runner.call_view_method(self.nc_id, 'get_max_withdrawal', bet1.address))

# Out of funds! Any withdrawal must fail from now on...
amount = 1
action = NCWithdrawalAction(token_uid=self.token_uid, amount=amount)
context = Context([action], tx, bet1.address, timestamp=self.get_current_timestamp())
with self.assertNCFail('InsufficientBalance', 'withdrawal amount is greater than available (max: 0)'):
runner.call_public_method(self.nc_id, 'withdraw', context)

def test_make_a_bet_with_withdrawal(self) -> None:
self._make_a_bet(100, '1x1')

(address_bytes, _) = self._get_any_address()
tx = self._get_any_tx()
action = NCWithdrawalAction(token_uid=self.token_uid, amount=1)
context = Context([action], tx, address_bytes, timestamp=self.get_current_timestamp())
score = '1x1'
with self.assertNCFail('NCForbiddenAction', 'action WITHDRAWAL is forbidden on method `bet`'):
self.runner.call_public_method(self.nc_id, 'bet', context, address_bytes, score)

def test_make_a_bet_after_result(self) -> None:
self._make_a_bet(100, '1x1')
self._set_result('2x2')
with self.assertNCFail('ResultAlreadySet', ''):
self._make_a_bet(100, '1x1')

def test_make_a_bet_after_date_last_bet(self) -> None:
with self.assertNCFail('TooLate', re.compile(r'cannot place bets after \d+')):
self._make_a_bet(100, '1x1', timestamp=self.date_last_bet + 1)

def test_set_results_two_times(self) -> None:
self._set_result('2x2')
with self.assertNCFail('ResultAlreadySet', ''):
self._set_result('5x1')

def test_set_results_wrong_signature(self) -> None:
wrong_oracle_key = KeyPair.create(b'123')
with self.assertNCFail('InvalidOracleSignature', ''):
self._set_result('3x2', oracle_key=wrong_oracle_key)

def test_withdraw_before_result(self) -> None:
bet1 = self._make_a_bet(100, '1x1')
with self.assertNCFail('ResultNotAvailable', ''):
self._withdraw(bet1.address, 100)

def test_withdraw_with_deposits(self) -> None:
(address_bytes, _) = self._get_any_address()
tx = self._get_any_tx()
action = NCDepositAction(token_uid=self.token_uid, amount=1)
context = Context([action], tx, address_bytes, timestamp=self.get_current_timestamp())
with self.assertNCFail('NCForbiddenAction', 'action DEPOSIT is forbidden on method `withdraw`'):
self.runner.call_public_method(self.nc_id, 'withdraw', context)

def test_make_a_bet_wrong_token(self) -> None:

(address_bytes, _) = self._get_any_address()
tx = self._get_any_tx()
token_uid = TokenUid(b'xxx')
self.assertNotEqual(token_uid, self.token_uid)
action = NCDepositAction(token_uid=token_uid, amount=1)
context = Context([action], tx, address_bytes, timestamp=self.get_current_timestamp())
score = '1x1'
with self.assertNCFail('InvalidToken', 'token different from 00'):
self.runner.call_public_method(self.nc_id, 'bet', context, address_bytes, score)

def test_withdraw_wrong_token(self) -> None:
bet1 = self._make_a_bet(100, '1x1')

tx = self._get_any_tx()
token_uid = TokenUid(b'xxx')
self.assertNotEqual(token_uid, self.token_uid)
action = NCWithdrawalAction(token_uid=token_uid, amount=1)
context = Context([action], tx, bet1.address, timestamp=self.get_current_timestamp())
with self.assertNCFail('InvalidToken', 'token different from 00'):
self.runner.call_public_method(self.nc_id, 'withdraw', context)
116 changes: 116 additions & 0 deletions tests/nanocontracts/blueprints/test_swap_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from hathor.nanocontracts.context import Context
from hathor.nanocontracts.nc_types import make_nc_type_for_type
from hathor.nanocontracts.storage.contract_storage import Balance
from hathor.nanocontracts.types import NCDepositAction, NCWithdrawalAction, TokenUid
from tests.nanocontracts.blueprints.unittest import BlueprintTestCase
from tests.nanocontracts.test_blueprints.swap_demo import InvalidActions, InvalidRatio, InvalidTokens, SwapDemo

SWAP_NC_TYPE = make_nc_type_for_type(int)


class SwapDemoTestCase(BlueprintTestCase):
def setUp(self):
super().setUp()

self.blueprint_id = self.gen_random_blueprint_id()
self.contract_id = self.gen_random_contract_id()

self.nc_catalog.blueprints[self.blueprint_id] = SwapDemo

# Test doubles:
self.token_a = self.gen_random_token_uid()
self.token_b = self.gen_random_token_uid()
self.token_c = self.gen_random_token_uid()
self.address = self.gen_random_address()
self.tx = self.get_genesis_tx()

def _initialize(
self,
init_token_a: tuple[TokenUid, int, int],
init_token_b: tuple[TokenUid, int, int]
) -> None:
# Arrange:
token_a, multiplier_a, amount_a = init_token_a
token_b, multiplier_b, amount_b = init_token_b
deposit_a = NCDepositAction(token_uid=token_a, amount=amount_a)
deposit_b = NCDepositAction(token_uid=token_b, amount=amount_b)
context = Context(
actions=[deposit_a, deposit_b],
vertex=self.tx,
address=self.address,
timestamp=self.now
)

# Act:
self.runner.create_contract(
self.contract_id,
self.blueprint_id,
context,
token_a,
token_b,
multiplier_a,
multiplier_b,
)
self.nc_storage = self.runner.get_storage(self.contract_id)

def _swap(
self,
amount_a: tuple[int, TokenUid],
amount_b: tuple[int, TokenUid]
) -> None:
# Arrange:
value_a, token_a = amount_a
value_b, token_b = amount_b
action_a_type = self.get_action_type(value_a)
action_b_type = self.get_action_type(value_b)
swap_a = action_a_type(token_uid=token_a, amount=abs(value_a))
swap_b = action_b_type(token_uid=token_b, amount=abs(value_b))
context = Context(
actions=[swap_a, swap_b],
vertex=self.tx,
address=self.address,
timestamp=self.now
)

# Act:
self.runner.call_public_method(self.contract_id, 'swap', context)

def test_lifecycle(self) -> None:
# Create a contract.
# Arrange and act within:
self._initialize((self.token_a, 1, 100_00), (self.token_b, 1, 100_00))

# Assert:
self.assertEqual(
Balance(value=100_00, can_mint=False, can_melt=False), self.nc_storage.get_balance(self.token_a)
)
self.assertEqual(
Balance(value=100_00, can_mint=False, can_melt=False), self.nc_storage.get_balance(self.token_b)
)
self.assertEqual(0, self.nc_storage.get_obj(b'swaps_counter', SWAP_NC_TYPE))

# Make a valid swap.
# Arrange and act within:
self._swap((20_00, self.token_a), (-20_00, self.token_b))
# Assert:
self.assertEqual(
Balance(value=120_00, can_mint=False, can_melt=False), self.nc_storage.get_balance(self.token_a)
)
self.assertEqual(
Balance(value=80_00, can_mint=False, can_melt=False), self.nc_storage.get_balance(self.token_b)
)
self.assertEqual(1, self.nc_storage.get_obj(b'swaps_counter', SWAP_NC_TYPE))

# Make multiple invalid swaps raising all possible exceptions.
with self.assertRaises(InvalidTokens):
self._swap((-20_00, self.token_a), (20_00, self.token_c))
with self.assertRaises(InvalidActions):
self._swap((20_00, self.token_a), (40_00, self.token_b))
with self.assertRaises(InvalidRatio):
self._swap((20_00, self.token_a), (-40_00, self.token_b))

def get_action_type(self, amount: int) -> type[NCDepositAction] | type[NCWithdrawalAction]:
if amount >= 0:
return NCDepositAction
else:
return NCWithdrawalAction
Loading