diff --git a/plenum/common/messages/node_messages.py b/plenum/common/messages/node_messages.py index 7a3888f09a..7ab9e3c5e0 100644 --- a/plenum/common/messages/node_messages.py +++ b/plenum/common/messages/node_messages.py @@ -1,8 +1,11 @@ from typing import TypeVar, NamedTuple -from plenum.common.constants import NOMINATE, BATCH, REELECTION, PRIMARY, BLACKLIST, REQACK, REQNACK, REJECT, \ - POOL_LEDGER_TXNS, ORDERED, PROPAGATE, PREPREPARE, PREPARE, COMMIT, CHECKPOINT, THREE_PC_STATE, CHECKPOINT_STATE, \ - REPLY, INSTANCE_CHANGE, LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REQ, CATCHUP_REP, VIEW_CHANGE_DONE, CURRENT_STATE, \ +from plenum.common.constants import NOMINATE, BATCH, REELECTION, PRIMARY, \ + BLACKLIST, REQACK, REQNACK, REJECT, \ + POOL_LEDGER_TXNS, ORDERED, PROPAGATE, PREPREPARE, PREPARE, COMMIT, \ + CHECKPOINT, THREE_PC_STATE, CHECKPOINT_STATE, \ + REPLY, INSTANCE_CHANGE, LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REQ, \ + CATCHUP_REP, VIEW_CHANGE_DONE, CURRENT_STATE, \ MESSAGE_REQUEST, MESSAGE_RESPONSE, OBSERVED_DATA, BATCH_COMMITTED from plenum.common.messages.client_request import ClientMessageValidator from plenum.common.messages.fields import NonNegativeNumberField, IterableField, \ diff --git a/plenum/common/types.py b/plenum/common/types.py index 8f1921b0ca..6534ead044 100644 --- a/plenum/common/types.py +++ b/plenum/common/types.py @@ -45,6 +45,7 @@ class f: # provides a namespace for reusable field constants ELECTION_DATA = Field('electionData', Any) TXN_ID = Field('txnId', str) REASON = Field('reason', Any) + IS_SUCCESS = Field('isSuccess', Any) SENDER_CLIENT = Field('senderClient', str) PP_TIME = Field("ppTime", float) REQ_IDR = Field("reqIdr", List[Tuple[str, int]]) diff --git a/plenum/server/action_req_handler.py b/plenum/server/action_req_handler.py new file mode 100644 index 0000000000..a764c08bf1 --- /dev/null +++ b/plenum/server/action_req_handler.py @@ -0,0 +1,16 @@ +from plenum.common.request import Request +from plenum.server.req_handler import RequestHandler + + +class ActionReqHandler(RequestHandler): + def __init__(self): + super().__init__() + + def doStaticValidation(self, request: Request): + pass + + def validate(self, req: Request): + pass + + def apply(self, req: Request, cons_time: int): + pass diff --git a/plenum/server/client_authn.py b/plenum/server/client_authn.py index 218e650d4c..e0e0b0cb5d 100644 --- a/plenum/server/client_authn.py +++ b/plenum/server/client_authn.py @@ -14,6 +14,7 @@ InsufficientSignatures, InsufficientCorrectSignatures from plenum.common.types import f from plenum.common.verifier import DidVerifier, Verifier +from plenum.server.action_req_handler import ActionReqHandler from plenum.server.domain_req_handler import DomainRequestHandler from plenum.server.pool_req_handler import PoolRequestHandler from stp_core.common.log import getlogger @@ -179,6 +180,7 @@ class CoreAuthMixin: ).union( DomainRequestHandler.query_types ) + action_types = ActionReqHandler.operation_types def is_query(self, typ): return typ in self.query_types @@ -186,6 +188,10 @@ def is_query(self, typ): def is_write(self, typ): return typ in self.write_types + @classmethod + def is_action(cls, typ): + return typ in cls.action_types + @staticmethod def _extract_signature(msg): if f.SIG.nm not in msg: diff --git a/plenum/server/config_req_handler.py b/plenum/server/config_req_handler.py index b9420db516..b64b2ec5cd 100644 --- a/plenum/server/config_req_handler.py +++ b/plenum/server/config_req_handler.py @@ -1,6 +1,19 @@ -from plenum.server.req_handler import RequestHandler +from plenum.common.request import Request +from plenum.server.ledger_req_handler import LedgerRequestHandler -class ConfigReqHandler(RequestHandler): +class ConfigReqHandler(LedgerRequestHandler): def __init__(self, ledger, state): super().__init__(ledger, state) + + def doStaticValidation(self, request: Request): + pass + + def get_query_response(self, request): + pass + + def validate(self, req: Request): + pass + + def apply(self, req: Request, cons_time: int): + pass diff --git a/plenum/server/domain_req_handler.py b/plenum/server/domain_req_handler.py index a67ee18f95..c1a9c733a4 100644 --- a/plenum/server/domain_req_handler.py +++ b/plenum/server/domain_req_handler.py @@ -11,13 +11,14 @@ from plenum.common.txn_util import reqToTxn from plenum.common.types import f from plenum.persistence.util import txnsWithSeqNo +from plenum.server.ledger_req_handler import LedgerRequestHandler from plenum.server.req_handler import RequestHandler from stp_core.common.log import getlogger logger = getlogger() -class DomainRequestHandler(RequestHandler): +class DomainRequestHandler(LedgerRequestHandler): stateSerializer = domain_state_serializer write_types = {NYM, } diff --git a/plenum/server/ledger_req_handler.py b/plenum/server/ledger_req_handler.py new file mode 100644 index 0000000000..ba5c74a093 --- /dev/null +++ b/plenum/server/ledger_req_handler.py @@ -0,0 +1,78 @@ +from abc import ABCMeta, abstractmethod +from typing import List + +import base58 + +from plenum.common.ledger import Ledger +from plenum.common.request import Request +from plenum.persistence.util import txnsWithSeqNo +from plenum.server.req_handler import RequestHandler +from stp_core.common.log import getlogger + +from state.state import State + +logger = getlogger() + + +class LedgerRequestHandler(RequestHandler, metaclass=ABCMeta): + """ + Base class for request handlers + Declares methods for validation, application of requests and + state control + """ + + query_types = set() + write_types = set() + + def __init__(self, ledger: Ledger, state: State): + self.state = state + self.ledger = ledger + + def updateState(self, txns, isCommitted=False): + """ + Updates current state with a number of committed or + not committed transactions + """ + + def commit(self, txnCount, stateRoot, txnRoot, ppTime) -> List: + """ + :param txnCount: The number of requests to commit (The actual requests + are picked up from the uncommitted list from the ledger) + :param stateRoot: The state trie root after the txns are committed + :param txnRoot: The txn merkle root after the txns are committed + + :return: list of committed transactions + """ + + (seqNoStart, seqNoEnd), committedTxns = \ + self.ledger.commitTxns(txnCount) + stateRoot = base58.b58decode(stateRoot.encode()) + # Probably the following assertion fail should trigger catchup + assert self.ledger.root_hash == txnRoot, '{} {}'.format( + self.ledger.root_hash, txnRoot) + self.state.commit(rootHash=stateRoot) + return txnsWithSeqNo(seqNoStart, seqNoEnd, committedTxns) + + def onBatchCreated(self, state_root): + pass + + def onBatchRejected(self): + pass + + @abstractmethod + def doStaticValidation(self, request: Request): + pass + + def is_query(self, txn_type): + return txn_type in self.query_types + + def get_query_response(self, request): + raise NotImplementedError + + @staticmethod + def transform_txn_for_ledger(txn): + return txn + + @property + def operation_types(self) -> set: + return self.write_types.union(self.query_types) diff --git a/plenum/server/node.py b/plenum/server/node.py index 5ae040e5a5..845ae43668 100644 --- a/plenum/server/node.py +++ b/plenum/server/node.py @@ -6,7 +6,7 @@ from contextlib import closing from functools import partial from typing import Dict, Any, Mapping, Iterable, List, Optional, Set, Tuple, Callable - +from plenum.server.ledger_req_handler import LedgerRequestHandler from crypto.bls.bls_key_manager import LoadBLSKeyError from intervaltree import IntervalTree from ledger.compact_merkle_tree import CompactMerkleTree @@ -58,6 +58,7 @@ from plenum.common.verifier import DidVerifier from plenum.persistence.req_id_to_txn import ReqIdrToTxn from plenum.persistence.storage import Storage, initStorage +from plenum.server.action_req_handler import ActionReqHandler from plenum.server.blacklister import Blacklister from plenum.server.blacklister import SimpleBlacklister from plenum.server.client_authn import ClientAuthNr, SimpleAuthNr, CoreAuthNr @@ -182,7 +183,7 @@ def __init__(self, # init before domain req handler! self.bls_bft = self._create_bls_bft() - self.register_req_handler(DOMAIN_LEDGER_ID, self.getDomainReqHandler()) + self.register_req_handler(self.getDomainReqHandler(), DOMAIN_LEDGER_ID) self.register_executer(DOMAIN_LEDGER_ID, self.executeDomainTxns) self.initDomainState() @@ -192,7 +193,7 @@ def __init__(self, self.addGenesisNyms() self.mode = None # type: Optional[Mode] - self.register_req_handler(POOL_LEDGER_ID, self.poolManager.reqHandler) + self.register_req_handler(self.poolManager.reqHandler, POOL_LEDGER_ID) self.nodeReg = self.poolManager.nodeReg @@ -424,6 +425,7 @@ def view_change_in_progress(self): def init_config_state(self): self.register_state(CONFIG_LEDGER_ID, self.loadConfigState()) self.setup_config_req_handler() + self.setup_action_req_handler() self.initConfigState() def _add_config_ledger(self): @@ -436,7 +438,11 @@ def _add_config_ledger(self): def setup_config_req_handler(self): self.configReqHandler = self.getConfigReqHandler() - self.register_req_handler(CONFIG_LEDGER_ID, self.configReqHandler) + self.register_req_handler(self.configReqHandler, CONFIG_LEDGER_ID) + + def setup_action_req_handler(self): + self.actionReqHandler = self.get_action_req_handler() + self.register_req_handler(self.actionReqHandler) def getConfigLedger(self): return Ledger(CompactMerkleTree(hashStore=self.getHashStore('config')), @@ -460,6 +466,9 @@ def getConfigReqHandler(self): return ConfigReqHandler(self.configLedger, self.states[CONFIG_LEDGER_ID]) + def get_action_req_handler(self): + return ActionReqHandler() + def postConfigLedgerCaughtUp(self, **kwargs): pass @@ -728,17 +737,21 @@ def on_new_ledger_added(self, ledger_id): def register_state(self, ledger_id, state): self.states[ledger_id] = state - def register_req_handler(self, ledger_id: int, req_handler: RequestHandler): - self.ledger_to_req_handler[ledger_id] = req_handler - for txn_type in req_handler.valid_txn_types: - self.register_txn_type(txn_type, ledger_id, req_handler) + def register_req_handler(self, req_handler: RequestHandler, + ledger_id: int = None): + if ledger_id is not None: + self.ledger_to_req_handler[ledger_id] = req_handler + for txn_type in req_handler.operation_types: + self.register_txn_type(txn_type, req_handler, ledger_id) - def register_txn_type(self, txn_type, ledger_id: int, req_handler: RequestHandler): + def register_txn_type(self, txn_type, req_handler: RequestHandler, + ledger_id: int = None): if txn_type in self.txn_type_to_req_handler: raise ValueError('{} already registered for {}' .format(txn_type, self.txn_type_to_req_handler[txn_type])) self.txn_type_to_req_handler[txn_type] = req_handler - self.txn_type_to_ledger_id[txn_type] = ledger_id + if ledger_id is not None: + self.txn_type_to_ledger_id[txn_type] = ledger_id def register_executer(self, ledger_id: int, executer: Callable): self.requestExecuter[ledger_id] = executer @@ -2030,7 +2043,12 @@ def processRequest(self, request: Request, frm: str): def is_query(self, txn_type) -> bool: # Does the transaction type correspond to a read? handler = self.get_req_handler(txn_type=txn_type) - return handler and handler.is_query(txn_type) + return handler and isinstance(handler, + LedgerRequestHandler + ) and handler.is_query(txn_type) + + def is_action(self, txn_type) -> bool: + return txn_type in self.actionReqHandler.operation_types def process_query(self, request: Request, frm: str): # Process a read request from client @@ -2043,6 +2061,17 @@ def process_query(self, request: Request, frm: str): result = handler.get_query_response(request) self.transmitToClient(Reply(result), frm) + def process_action(self, request, frm): + # Process an execute action request + self.send_ack_to_client(request.key, frm) + try: + self.actionReqHandler.validate(request) + result = self.actionReqHandler.apply(request) + self.transmitToClient(Reply(result), frm) + except Exception as ex: + self.transmitToClient(Reject(request.identifier, + request.reqId, str(ex)), frm) + # noinspection PyUnusedLocal def processPropagate(self, msg: Propagate, frm): """ diff --git a/plenum/server/pool_req_handler.py b/plenum/server/pool_req_handler.py index be9a5f3080..a5db9b4261 100644 --- a/plenum/server/pool_req_handler.py +++ b/plenum/server/pool_req_handler.py @@ -10,6 +10,7 @@ from plenum.common.types import f from plenum.persistence.util import txnsWithSeqNo from plenum.server.domain_req_handler import DomainRequestHandler +from plenum.server.ledger_req_handler import LedgerRequestHandler from plenum.server.req_handler import RequestHandler from state.state import State from stp_core.common.log import getlogger @@ -17,7 +18,7 @@ logger = getlogger() -class PoolRequestHandler(RequestHandler): +class PoolRequestHandler(LedgerRequestHandler): write_types = {NODE, } def __init__(self, ledger: Ledger, state: State, diff --git a/plenum/server/req_authenticator.py b/plenum/server/req_authenticator.py index 8a73e5b2c9..6aa3e70221 100644 --- a/plenum/server/req_authenticator.py +++ b/plenum/server/req_authenticator.py @@ -32,7 +32,8 @@ def authenticate(self, req_data): for authenticator in self._authenticators: if authenticator.is_query(typ): return set() - if not authenticator.is_write(typ): + if not (authenticator.is_write(typ) or + authenticator.is_action(typ)): continue rv = authenticator.authenticate(req_data) or set() identifiers.update(rv) diff --git a/plenum/server/req_handler.py b/plenum/server/req_handler.py index ccbb74e512..a5584f7d3a 100644 --- a/plenum/server/req_handler.py +++ b/plenum/server/req_handler.py @@ -1,3 +1,4 @@ +from abc import ABCMeta, abstractmethod from typing import List import base58 @@ -12,77 +13,30 @@ logger = getlogger() -class RequestHandler: +class RequestHandler(metaclass=ABCMeta): """ Base class for request handlers Declares methods for validation, application of requests and state control """ - write_types = set() - query_types = set() - - def __init__(self, ledger: Ledger, state: State): - self.ledger = ledger - self.state = state + operation_types = set() + @abstractmethod def doStaticValidation(self, request: Request): """ Does static validation like presence of required fields, properly formed request, etc """ + @abstractmethod def validate(self, req: Request): """ Does dynamic validation (state based validation) on request. Raises exception if request is invalid. """ + @abstractmethod def apply(self, req: Request, cons_time: int): """ Applies request """ - - def updateState(self, txns, isCommitted=False): - """ - Updates current state with a number of committed or - not committed transactions - """ - - def commit(self, txnCount, stateRoot, txnRoot, ppTime) -> List: - """ - :param txnCount: The number of requests to commit (The actual requests - are picked up from the uncommitted list from the ledger) - :param stateRoot: The state trie root after the txns are committed - :param txnRoot: The txn merkle root after the txns are committed - - :return: list of committed transactions - """ - - (seqNoStart, seqNoEnd), committedTxns = \ - self.ledger.commitTxns(txnCount) - stateRoot = base58.b58decode(stateRoot.encode()) - # Probably the following assertion fail should trigger catchup - assert self.ledger.root_hash == txnRoot, '{} {}'.format( - self.ledger.root_hash, txnRoot) - self.state.commit(rootHash=stateRoot) - return txnsWithSeqNo(seqNoStart, seqNoEnd, committedTxns) - - def onBatchCreated(self, state_root): - pass - - def onBatchRejected(self): - pass - - def is_query(self, txn_type): - return txn_type in self.query_types - - def get_query_response(self, request): - raise NotImplementedError - - @staticmethod - def transform_txn_for_ledger(txn): - return txn - - @property - def valid_txn_types(self) -> set: - return self.write_types.union(self.query_types) diff --git a/plenum/test/plugin/demo_plugin/auction_req_handler.py b/plenum/test/plugin/demo_plugin/auction_req_handler.py index 2494f986c5..104357fc77 100644 --- a/plenum/test/plugin/demo_plugin/auction_req_handler.py +++ b/plenum/test/plugin/demo_plugin/auction_req_handler.py @@ -5,12 +5,13 @@ from plenum.common.txn_util import reqToTxn from plenum.common.types import f from plenum.persistence.util import txnsWithSeqNo +from plenum.server.ledger_req_handler import LedgerRequestHandler from plenum.server.req_handler import RequestHandler from plenum.test.plugin.demo_plugin.constants import PLACE_BID, AUCTION_END, \ AUCTION_START, GET_BAL, AMOUNT -class AuctionReqHandler(RequestHandler): +class AuctionReqHandler(LedgerRequestHandler): write_types = {AUCTION_START, AUCTION_END, PLACE_BID} query_types = {GET_BAL, } diff --git a/plenum/test/plugin/demo_plugin/main.py b/plenum/test/plugin/demo_plugin/main.py index 538997a9e6..a3ecc94327 100644 --- a/plenum/test/plugin/demo_plugin/main.py +++ b/plenum/test/plugin/demo_plugin/main.py @@ -26,5 +26,5 @@ def integrate_plugin_in_node(node): auction_authnr = AuctionAuthNr(node.states[DOMAIN_LEDGER_ID]) node.clientAuthNr.register_authenticator(auction_authnr) auction_req_handler = AuctionReqHandler(ledger, state) - node.register_req_handler(AUCTION_LEDGER_ID, auction_req_handler) + node.register_req_handler(auction_req_handler, AUCTION_LEDGER_ID) return node