Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,7 @@ RPCHOST = 'jsonrpc'
RPCPORT = '4000'
# (the port debugpy is listening on)
RPCDEBUGPORT = '4678'

# GenVM server details
GENVMPROTOCOL = 'http'
GENVMHOST = 'genvm'
GENVMPORT = '6000'
# Location of file excuted inside the GenVM
GENVMCONLOC = '/tmp'
# TODO: Will be removed with the new logging
GENVMDEBUG = 1
# (the port debugpy is listening on)
GENVMDEBUGPORT = '6678'
GENVM_BIN = "/genvm/bin"

# (enables debuggin in VScode)
VSCODEDEBUG = "false" # "true" or "false"
Expand All @@ -44,6 +34,7 @@ OLAMAPORT = '11434'
WEBREQUESTPROTOCOL = 'http'
WEBREQUESTHOST = 'webrequest'
WEBREQUESTPORT = '5000'
WEBREQUESTSELENIUMPORT = '5001'

# If you want to use OpenAI add your key here
OPENAIKEY = '<add_your_openai_api_key_here>'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: 3.11
python-version: 3.12.4
cache: pip
- run: pip install -r backend/protocol_rpc/requirements.txt
- run: pip install pytest-cov
Expand Down
86 changes: 55 additions & 31 deletions backend/consensus/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
Validator,
)
from backend.node.base import Node
from backend.node.genvm.types import ExecutionMode, Receipt, Vote
from backend.node.types import ExecutionMode, Receipt, Vote, ExecutionResultStatus
from backend.protocol_rpc.message_handler.base import MessageHandler
from backend.protocol_rpc.message_handler.types import (
LogEvent,
Expand Down Expand Up @@ -110,18 +110,34 @@ async def _run_consensus(self):
for queue in [q for q in self.queues.values() if not q.empty()]:
# sessions cannot be shared between coroutines, we need to create a new session for each coroutine
# https://docs.sqlalchemy.org/en/20/orm/session_basics.html#is-the-session-thread-safe-is-asyncsession-safe-to-share-in-concurrent-tasks
transaction = await queue.get()
transaction: Transaction = await queue.get()
with self.get_session() as session:

def contract_snapshot_factory(
contract_address,
session=session,
transaction=transaction,
):
if (
transaction.type == TransactionType.DEPLOY_CONTRACT
and contract_address == transaction.to_address
):
ret = ContractSnapshot(None, session)
ret.contract_address = transaction.to_address
ret.contract_code = transaction.data[
"contract_code"
]
ret.encoded_state = {}
return ret
return ContractSnapshot(contract_address, session)

async def exec_transaction_with_session_handling():
await self.exec_transaction(
transaction,
TransactionsProcessor(session),
ChainSnapshot(session),
AccountsManager(session),
lambda contract_address, session=session: ContractSnapshot(
contract_address, session
),
contract_snapshot_factory,
)
session.commit()

Expand Down Expand Up @@ -202,20 +218,25 @@ async def exec_transaction(

num_validators = len(remaining_validators) + 1

contract_snapshot = contract_snapshot_factory(transaction.to_address)
contract_snapshot_supplier = lambda: contract_snapshot_factory(
transaction.to_address
)
Comment thread
AgustinRamiroDiaz marked this conversation as resolved.

leaders_contract_snapshot = contract_snapshot_supplier()

# Create Leader
leader_node = node_factory(
leader,
ExecutionMode.LEADER,
contract_snapshot,
leaders_contract_snapshot,
None,
msg_handler,
contract_snapshot_factory,
)

# Leader executes transaction
leader_receipt = await leader_node.exec_transaction(transaction)

votes = {leader["address"]: leader_receipt.vote.value}
# Update transaction status
ConsensusAlgorithm.dispatch_transaction_status_update(
Expand All @@ -230,7 +251,7 @@ async def exec_transaction(
node_factory(
validator,
ExecutionMode.VALIDATOR,
contract_snapshot,
contract_snapshot_supplier(),
leader_receipt,
msg_handler,
contract_snapshot_factory,
Expand Down Expand Up @@ -258,7 +279,7 @@ async def exec_transaction(

if (
len([vote for vote in votes.values() if vote == Vote.AGREE.value])
>= num_validators // 2
>= (num_validators + 1) // 2
Comment thread
AgustinRamiroDiaz marked this conversation as resolved.
):
break # Consensus reached

Expand Down Expand Up @@ -309,30 +330,33 @@ async def exec_transaction(
)
)

# Register contract if it is a new contract
if transaction.type == TransactionType.DEPLOY_CONTRACT:
new_contract = {
"id": transaction.data["contract_address"],
"data": {
"state": leader_receipt.contract_state,
"code": transaction.data["contract_code"],
},
}
contract_snapshot.register_contract(new_contract)

msg_handler.send_message(
LogEvent(
"deployed_contract",
EventType.SUCCESS,
EventScope.GENVM,
"Contract deployed",
new_contract,
if leader_receipt.execution_result == ExecutionResultStatus.SUCCESS:
# Register contract if it is a new contract
if transaction.type == TransactionType.DEPLOY_CONTRACT:
new_contract = {
"id": transaction.data["contract_address"],
"data": {
"state": leader_receipt.contract_state,
"code": transaction.data["contract_code"],
},
}
leaders_contract_snapshot.register_contract(new_contract)

msg_handler.send_message(
LogEvent(
"deployed_contract",
EventType.SUCCESS,
EventScope.GENVM,
"Contract deployed",
new_contract,
)
)
)

# Update contract state if it is an existing contract
else:
contract_snapshot.update_contract_state(leader_receipt.contract_state)
# Update contract state if it is an existing contract
else:
leaders_contract_snapshot.update_contract_state(
leader_receipt.contract_state
)

ConsensusAlgorithm.dispatch_transaction_status_update(
transactions_processor,
Expand Down
14 changes: 9 additions & 5 deletions backend/database_handler/contract_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ class ContractSnapshot:
"""
Warning: if you initialize this class with a contract_address:
- The contract_address must exist in the database.
- `self.contract_data`, `self.contract_code` and `self.cencoded_state` will be loaded from the database **only once** at initialization.
- `self.contract_data`, `self.contract_code` and `self.encoded_state` will be loaded from the database **only once** at initialization.
"""

def __init__(self, contract_address: str, session: Session):
contract_address: str
contract_code: str
encoded_state: dict[str, dict[str, str]]

def __init__(self, contract_address: str | None, session: Session):
self.session = session

if contract_address is not None:
Expand Down Expand Up @@ -46,15 +50,15 @@ def register_contract(self, contract: dict):
current_contract.data = contract["data"]
self.session.commit()

def update_contract_state(self, new_state: str):
def update_contract_state(self, new_state: dict[str, str]):
"""Update the state of the contract in the database."""
new_contract_nada = {
new_contract_data = {
"code": self.contract_data["code"],
"state": new_state,
}

contract = (
self.session.query(CurrentState).filter_by(id=self.contract_address).one()
)
contract.data = new_contract_nada
contract.data = new_contract_data
self.session.commit()
10 changes: 5 additions & 5 deletions backend/database_handler/llm_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from backend.node.create_nodes.providers import get_default_providers
from .models import LLMProviderDBModel
from sqlalchemy.orm import Session
from backend.node.genvm.llms import get_llm_plugin
from backend.llms import get_llm_plugin
import pprint


Expand All @@ -26,20 +26,20 @@ def get_all(self) -> list[LLMProvider]:
for provider in self.session.query(LLMProviderDBModel).all()
]

def get_all_dict(self) -> list[dict]:
async def get_all_dict(self) -> list[dict]:
providers = self.session.query(LLMProviderDBModel).all()
result = []

for provider in providers:
domain_provider = _to_domain(provider)
provider_dict = domain_provider.__dict__

plugin = get_llm_plugin(
plugin = await get_llm_plugin(
domain_provider.plugin, domain_provider.plugin_config
)

provider_dict["is_available"] = plugin.is_available()
provider_dict["is_model_available"] = plugin.is_model_available(
provider_dict["is_available"] = await plugin.is_available()
provider_dict["is_model_available"] = await plugin.is_model_available(
domain_provider.model
)

Expand Down
2 changes: 1 addition & 1 deletion backend/database_handler/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from backend.node.genvm.types import Receipt
from backend.node.types import Receipt


@dataclass
Expand Down
8 changes: 4 additions & 4 deletions backend/domain/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from dataclasses import dataclass
import decimal
from enum import Enum
from enum import Enum, IntEnum

from backend.database_handler.models import TransactionStatus

Expand Down Expand Up @@ -54,7 +54,7 @@ def to_dict(self):
return result


class TransactionType(Enum):
class TransactionType(IntEnum):
SEND = 0
DEPLOY_CONTRACT = 1
RUN_CONTRACT = 2
Expand All @@ -65,8 +65,8 @@ class Transaction:
hash: str
status: TransactionStatus
type: TransactionType
from_address: str | None = None
to_address: str | None = None
from_address: str | None
to_address: str | None
input_data: dict | None = None
data: dict | None = None
consensus_data: dict | None = None
Expand Down
70 changes: 70 additions & 0 deletions backend/llms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import aiohttp
from typing import Protocol, Any
import os
import json

_webrequest_url: str = (
os.environ["WEBREQUESTPROTOCOL"]
+ "://"
+ os.environ["WEBREQUESTHOST"]
+ ":"
+ os.environ["WEBREQUESTPORT"]
)


class Plugin(Protocol):
def __init__(self, plugin_config: dict): ...

async def call(
self,
node_config: dict,
prompt: str,
regex: str | None,
) -> str: ...

async def is_available(self) -> bool: ...

async def is_model_available(self, model: str) -> bool: ...


async def _call_jsonrpc(function_name: str, *args) -> Any:
payload = {
"jsonrpc": "2.0",
"method": function_name,
"params": [*args],
"id": 1,
}
async with aiohttp.ClientSession() as session:
async with session.post(
_webrequest_url + "/api",
data=json.dumps(payload),
headers={"Content-Type": "application/json"},
) as response:
res = json.loads(await response.text())
return res["result"]["response"]


class _RemotePlugin(Plugin):
def __init__(self, id: str):
self._id = id

async def call(
self,
node_config: dict,
prompt: str,
regex: str | None,
) -> str:
return await _call_jsonrpc(
"llm_plugin_call", self._id, node_config, prompt, regex
)

async def is_available(self) -> bool:
return await _call_jsonrpc("llm_plugin_is_available", self._id)

async def is_model_available(self, model: str) -> bool:
return await _call_jsonrpc("llm_plugin_is_model_available", self._id, model)


async def get_llm_plugin(plugin: str, plugin_config: dict) -> Plugin:
return _RemotePlugin(await _call_jsonrpc("llm_plugin_get", plugin, plugin_config))
Loading