Skip to content

Commit

Permalink
use bt_decode in runtime call
Browse files Browse the repository at this point in the history
  • Loading branch information
zyzniewski-reef committed Dec 17, 2024
1 parent 075f52a commit ce86b2e
Show file tree
Hide file tree
Showing 18 changed files with 859 additions and 1,833 deletions.
128 changes: 51 additions & 77 deletions bittensor/core/async_subtensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@
from bittensor_wallet.utils import SS58_FORMAT
from numpy.typing import NDArray
from scalecodec import GenericCall
from scalecodec.base import RuntimeConfiguration
from scalecodec.type_registry import load_type_registry_preset
from substrateinterface.exceptions import SubstrateRequestException

from bittensor.core.chain_data import (
DelegateInfo,
custom_rpc_type_registry,
StakeInfo,
NeuronInfoLite,
NeuronInfo,
Expand Down Expand Up @@ -50,7 +47,6 @@
from bittensor.core.settings import version_as_int
from bittensor.utils import (
torch,
ss58_to_vec_u8,
format_error_message,
decode_hex_identity_dict,
validate_chain_endpoint,
Expand Down Expand Up @@ -498,17 +494,15 @@ async def get_delegates(
List of DelegateInfo objects, or an empty list if there are no delegates.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="DelegateInfoRuntimeApi",
method="get_delegates",
params=[],
block_hash=block_hash,
reuse_block=reuse_block,
)
if hex_bytes_result is not None:
return DelegateInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
else:
return []

return DelegateInfo.list_from_any(result) if result is not None else []

async def get_stake_info_for_coldkey(
self,
Expand All @@ -534,20 +528,19 @@ async def get_stake_info_for_coldkey(
Stake information is vital for account holders to assess their investment and participation in the network's
delegation and consensus processes.
"""
encoded_coldkey = ss58_to_vec_u8(coldkey_ss58)
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="StakeInfoRuntimeApi",
method="get_stake_info_for_coldkey",
params=[encoded_coldkey],
params=[coldkey_ss58],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return StakeInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return StakeInfo.list_from_any(result)

async def get_stake_for_coldkey_and_hotkey(
self,
Expand Down Expand Up @@ -585,11 +578,11 @@ async def query_runtime_api(
self,
runtime_api: str,
method: str,
params: Optional[Union[list[list[int]], dict[str, int], list[int]]],
params: Optional[Union[list[Any], dict[str, Any]]],
block: Optional[int] = None,
block_hash: Optional[str] = None,
reuse_block: bool = False,
) -> Optional[str]:
) -> Optional[Any]:
"""
Queries the runtime API of the Bittensor blockchain, providing a way to interact with the underlying runtime and
retrieve data encoded in Scale Bytes format. This function is essential for advanced users who need to
Expand All @@ -605,46 +598,17 @@ async def query_runtime_api(
reuse_block: Whether to reuse the last-used block hash. Do not set if using block_hash or block
Returns:
The Scale Bytes encoded result from the runtime API call, or `None` if the call fails.
The decoded result from the runtime API call, or `None` if the call fails.
This function enables access to the deeper layers of the Bittensor blockchain, allowing for detailed and
specific interactions with the network's runtime environment.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)

call_definition = TYPE_REGISTRY["runtime_api"][runtime_api]["methods"][method]

data = (
"0x"
if params is None
else await self.encode_params(
call_definition=call_definition, params=params
)
)
api_method = f"{runtime_api}_{method}"

json_result = await self.substrate.rpc_request(
method="state_call",
params=[api_method, data, block_hash] if block_hash else [api_method, data],
reuse_block_hash=reuse_block,
result = await self.substrate.runtime_call(
runtime_api, method, params, block_hash
)

if json_result is None:
return None

return_type = call_definition["type"]

as_scale_bytes = scalecodec.ScaleBytes(json_result["result"]) # type: ignore

rpc_runtime_config = RuntimeConfiguration()
rpc_runtime_config.update_type_registry(load_type_registry_preset("legacy"))
rpc_runtime_config.update_type_registry(custom_rpc_type_registry)

obj = rpc_runtime_config.create_scale_object(return_type, as_scale_bytes)
if obj.data.to_hex() == "0x0400": # RPC returned None result
return None

return obj.decode()
return result

async def get_balance(
self,
Expand Down Expand Up @@ -1002,18 +966,18 @@ async def neurons(
decentralized structure and the dynamics of its consensus and governance processes.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neurons",
params=[netuid],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return NeuronInfo.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return NeuronInfo.list_from_any(result)

async def neurons_lite(
self,
Expand Down Expand Up @@ -1041,18 +1005,18 @@ async def neurons_lite(
of the network's decentralized structure and neuron dynamics.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neurons_lite",
params=[netuid],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return NeuronInfoLite.list_from_vec_u8(hex_to_bytes(hex_bytes_result))
return NeuronInfoLite.list_from_any(result)

async def get_neuron_for_pubkey_and_subnet(
self,
Expand Down Expand Up @@ -1092,16 +1056,20 @@ async def get_neuron_for_pubkey_and_subnet(
if uid is None:
return NeuronInfo.get_null_neuron()

params = [netuid, uid]
json_body = await self.substrate.rpc_request(
method="neuronInfo_getNeuron",
params=params,
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neuron",
params=[
netuid,
uid,
], # TODO check to see if this can accept more than one at a time
block_hash=block_hash,
)

if not (result := json_body.get("result", None)):
if not result:
return NeuronInfo.get_null_neuron()

return NeuronInfo.from_vec_u8(bytes(result))
return NeuronInfo.from_any(result)

async def neuron_for_uid(
self,
Expand Down Expand Up @@ -1137,16 +1105,20 @@ async def neuron_for_uid(
if reuse_block:
block_hash = self.substrate.last_block_hash

params = [netuid, uid, block_hash] if block_hash else [netuid, uid]
json_body = await self.substrate.rpc_request(
method="neuronInfo_getNeuron",
params=params, # custom rpc method
result = await self.query_runtime_api(
runtime_api="NeuronInfoRuntimeApi",
method="get_neuron",
params=[
netuid,
uid,
],
block_hash=block_hash,
)
if not (result := json_body.get("result", None)):

if not result:
return NeuronInfo.get_null_neuron()

bytes_result = bytes(result)
return NeuronInfo.from_vec_u8(bytes_result)
return NeuronInfo.from_any(result)

async def get_delegated(
self,
Expand Down Expand Up @@ -1177,16 +1149,18 @@ async def get_delegated(
if (bh := await self._determine_block_hash(block, block_hash, reuse_block))
else (self.substrate.last_block_hash if reuse_block else None)
)
encoded_coldkey = ss58_to_vec_u8(coldkey_ss58)
json_body = await self.substrate.rpc_request(
method="delegateInfo_getDelegated",
params=([block_hash, encoded_coldkey] if block_hash else [encoded_coldkey]),

result = await self.query_runtime_api(
runtime_api="DelegateInfoRuntimeApi",
method="get_delegated",
params=[coldkey_ss58],
block_hash=block_hash,
)

if not (result := json_body.get("result")):
if not result:
return []

return DelegateInfo.delegated_list_from_vec_u8(bytes(result))
return DelegateInfo.delegated_list_from_any(result)

async def query_identity(
self,
Expand Down Expand Up @@ -1496,18 +1470,18 @@ async def get_subnet_hyperparameters(
they interact with the network's consensus and incentive mechanisms.
"""
block_hash = await self._determine_block_hash(block, block_hash, reuse_block)
hex_bytes_result = await self.query_runtime_api(
result = await self.query_runtime_api(
runtime_api="SubnetInfoRuntimeApi",
method="get_subnet_hyperparams",
params=[netuid],
block_hash=block_hash,
reuse_block=reuse_block,
)

if hex_bytes_result is None:
if result is None:
return []

return SubnetHyperparameters.from_vec_u8(hex_to_bytes(hex_bytes_result))
return SubnetHyperparameters.from_any(result)

async def get_vote_data(
self,
Expand Down
2 changes: 1 addition & 1 deletion bittensor/core/chain_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
from .stake_info import StakeInfo
from .subnet_hyperparameters import SubnetHyperparameters
from .subnet_info import SubnetInfo
from .utils import custom_rpc_type_registry, decode_account_id, process_stake_data
from .utils import decode_account_id, process_stake_data

ProposalCallData = GenericCall
57 changes: 25 additions & 32 deletions bittensor/core/chain_data/delegate_info.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import bt_decode

from dataclasses import dataclass
from typing import Optional
from typing import Any, Optional

import bt_decode
import munch

from bittensor.core.chain_data.info_base import InfoBase
from bittensor.core.chain_data.utils import decode_account_id
from bittensor.utils import u16_normalized_float
from bittensor.utils.balance import Balance


@dataclass
class DelegateInfo:
class DelegateInfo(InfoBase):
"""
Dataclass for delegate information. For a lighter version of this class, see ``DelegateInfoLite``.
Expand Down Expand Up @@ -40,8 +42,7 @@ class DelegateInfo:
total_daily_return: Balance # Total daily return of the delegate

@classmethod
def from_vec_u8(cls, vec_u8: bytes) -> Optional["DelegateInfo"]:
decoded = bt_decode.DelegateInfo.decode(vec_u8)
def _fix_decoded(cls, decoded: "DelegateInfo") -> Optional["DelegateInfo"]:
hotkey = decode_account_id(decoded.delegate_ss58)
owner = decode_account_id(decoded.owner_ss58)
nominators = [
Expand All @@ -63,36 +64,14 @@ def from_vec_u8(cls, vec_u8: bytes) -> Optional["DelegateInfo"]:
@classmethod
def list_from_vec_u8(cls, vec_u8: bytes) -> list["DelegateInfo"]:
decoded = bt_decode.DelegateInfo.decode_vec(vec_u8)
results = []
for d in decoded:
hotkey = decode_account_id(d.delegate_ss58)
owner = decode_account_id(d.owner_ss58)
nominators = [
(decode_account_id(x), Balance.from_rao(y)) for x, y in d.nominators
]
total_stake = sum((x[1] for x in nominators)) if nominators else Balance(0)
results.append(
DelegateInfo(
hotkey_ss58=hotkey,
total_stake=total_stake,
nominators=nominators,
owner_ss58=owner,
take=u16_normalized_float(d.take),
validator_permits=d.validator_permits,
registrations=d.registrations,
return_per_1000=Balance.from_rao(d.return_per_1000),
total_daily_return=Balance.from_rao(d.total_daily_return),
)
)
return results
return [cls._fix_decoded(d) for d in decoded]

@classmethod
def delegated_list_from_vec_u8(
cls, vec_u8: bytes
def fix_delegated_list(
cls, delegated_list: list[tuple["DelegateInfo", Balance]]
) -> list[tuple["DelegateInfo", Balance]]:
decoded = bt_decode.DelegateInfo.decode_delegated(vec_u8)
results = []
for d, b in decoded:
for d, b in delegated_list:
nominators = [
(decode_account_id(x), Balance.from_rao(y)) for x, y in d.nominators
]
Expand All @@ -110,3 +89,17 @@ def delegated_list_from_vec_u8(
)
results.append((delegate, Balance.from_rao(b)))
return results

@classmethod
def delegated_list_from_vec_u8(
cls, vec_u8: bytes
) -> list[tuple["DelegateInfo", Balance]]:
decoded = bt_decode.DelegateInfo.decode_delegated(vec_u8)
return cls.fix_delegated_list(decoded)

@classmethod
def delegated_list_from_any(
cls, any_list: list[Any]
) -> list[tuple["DelegateInfo", Balance]]:
any_list = [munch.munchify(any_) for any_ in any_list]
return cls.fix_delegated_list(any_list)
Loading

0 comments on commit ce86b2e

Please sign in to comment.