Skip to content

Commit

Permalink
[INDY-1173] add POOL_RESTART handling (#582)
Browse files Browse the repository at this point in the history
* INDY-1173: added ActionResult

Signed-off-by: toktar <[email protected]>

* INDY-1173: Refactoring POOL_RESTART

Changes:
-Add NodeController as a superclass for Restarter and Upgrader
-Add action handler for new type of transaction that should not be saved at ledger
-Change handlers logical

Signed-off-by: toktar <[email protected]>

* INDY-1173: Bagfix POOL_RESTART

Changes:
-fixed problem with auth
-fixed problem with schedule validation

Signed-off-by: toktar <[email protected]>

* INDY-1173: refactoring

Changes:
- made LedgerRequestHandler abstract

Signed-off-by: toktar <[email protected]>

* INDY-1173: refactoring

Changes:
- change documentation
- change handler logic of store operation types

Signed-off-by: toktar <[email protected]>

* INDY-1173: refactoring

Changes:
- remove OPERATION field
- revert excess different

Signed-off-by: toktar <[email protected]>

* INDY-1173: refactoring

Changes:
- implementing abstract methods for action_req_handler.py
- change sending reply logic

Signed-off-by: toktar <[email protected]>

* INDY-1173: Refactoring code style

Signed-off-by: toktar <[email protected]>

* INDY-1173: fixed code style in the node

Signed-off-by: toktar <[email protected]>

* INDY-1173: Added implementation of abstract methods

changes:
-Added implementation of abstract methods LedgerRequestHandler to ConfigReqHandler

Signed-off-by: toktar <[email protected]>

* INDY-1173: fixed code style

Signed-off-by: toktar <[email protected]>

* INDY-1173: fixed problems after merge

Signed-off-by: toktar <[email protected]>

* INDY-1173: refactoring node

changes:
- Removing an unnecessary handler call

Signed-off-by: toktar <[email protected]>

* INDY-1173:refactoring

changes:
-change operation_types in ledger_req_handler.py
-change process_action logic

Signed-off-by: toktar <[email protected]>

* INDY-1173:refactoring

changes:
-rollback changes in operation_types

Signed-off-by: toktar <[email protected]>

* INDY-1173: refactoring

changes:
- Add ack

Signed-off-by: toktar <[email protected]>

* INDY-1184: bugfix

Changes:
- change reject format

Signed-off-by: toktar <[email protected]>
  • Loading branch information
Toktar authored and andkononykhin committed Apr 17, 2018
1 parent 0a996ba commit 851ad6f
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 73 deletions.
9 changes: 6 additions & 3 deletions plenum/common/messages/node_messages.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from typing import TypeVar, NamedTuple

from plenum.common.constants import NOMINATE, BATCH, REELECTION, PRIMARY, BLACKLIST, REQACK, REQNACK, REJECT, \
POOL_LEDGER_TXNS, ORDERED, PROPAGATE, PREPREPARE, PREPARE, COMMIT, CHECKPOINT, THREE_PC_STATE, CHECKPOINT_STATE, \
REPLY, INSTANCE_CHANGE, LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REQ, CATCHUP_REP, VIEW_CHANGE_DONE, CURRENT_STATE, \
from plenum.common.constants import NOMINATE, BATCH, REELECTION, PRIMARY, \
BLACKLIST, REQACK, REQNACK, REJECT, \
POOL_LEDGER_TXNS, ORDERED, PROPAGATE, PREPREPARE, PREPARE, COMMIT, \
CHECKPOINT, THREE_PC_STATE, CHECKPOINT_STATE, \
REPLY, INSTANCE_CHANGE, LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REQ, \
CATCHUP_REP, VIEW_CHANGE_DONE, CURRENT_STATE, \
MESSAGE_REQUEST, MESSAGE_RESPONSE, OBSERVED_DATA, BATCH_COMMITTED
from plenum.common.messages.client_request import ClientMessageValidator
from plenum.common.messages.fields import NonNegativeNumberField, IterableField, \
Expand Down
1 change: 1 addition & 0 deletions plenum/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class f: # provides a namespace for reusable field constants
ELECTION_DATA = Field('electionData', Any)
TXN_ID = Field('txnId', str)
REASON = Field('reason', Any)
IS_SUCCESS = Field('isSuccess', Any)
SENDER_CLIENT = Field('senderClient', str)
PP_TIME = Field("ppTime", float)
REQ_IDR = Field("reqIdr", List[Tuple[str, int]])
Expand Down
16 changes: 16 additions & 0 deletions plenum/server/action_req_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from plenum.common.request import Request
from plenum.server.req_handler import RequestHandler


class ActionReqHandler(RequestHandler):
def __init__(self):
super().__init__()

def doStaticValidation(self, request: Request):
pass

def validate(self, req: Request):
pass

def apply(self, req: Request, cons_time: int):
pass
6 changes: 6 additions & 0 deletions plenum/server/client_authn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
InsufficientSignatures, InsufficientCorrectSignatures
from plenum.common.types import f
from plenum.common.verifier import DidVerifier, Verifier
from plenum.server.action_req_handler import ActionReqHandler
from plenum.server.domain_req_handler import DomainRequestHandler
from plenum.server.pool_req_handler import PoolRequestHandler
from stp_core.common.log import getlogger
Expand Down Expand Up @@ -179,13 +180,18 @@ class CoreAuthMixin:
).union(
DomainRequestHandler.query_types
)
action_types = ActionReqHandler.operation_types

def is_query(self, typ):
return typ in self.query_types

def is_write(self, typ):
return typ in self.write_types

@classmethod
def is_action(cls, typ):
return typ in cls.action_types

@staticmethod
def _extract_signature(msg):
if f.SIG.nm not in msg:
Expand Down
17 changes: 15 additions & 2 deletions plenum/server/config_req_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
from plenum.server.req_handler import RequestHandler
from plenum.common.request import Request
from plenum.server.ledger_req_handler import LedgerRequestHandler


class ConfigReqHandler(RequestHandler):
class ConfigReqHandler(LedgerRequestHandler):
def __init__(self, ledger, state):
super().__init__(ledger, state)

def doStaticValidation(self, request: Request):
pass

def get_query_response(self, request):
pass

def validate(self, req: Request):
pass

def apply(self, req: Request, cons_time: int):
pass
3 changes: 2 additions & 1 deletion plenum/server/domain_req_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
from plenum.common.txn_util import reqToTxn
from plenum.common.types import f
from plenum.persistence.util import txnsWithSeqNo
from plenum.server.ledger_req_handler import LedgerRequestHandler
from plenum.server.req_handler import RequestHandler
from stp_core.common.log import getlogger

logger = getlogger()


class DomainRequestHandler(RequestHandler):
class DomainRequestHandler(LedgerRequestHandler):
stateSerializer = domain_state_serializer
write_types = {NYM, }

Expand Down
78 changes: 78 additions & 0 deletions plenum/server/ledger_req_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from abc import ABCMeta, abstractmethod
from typing import List

import base58

from plenum.common.ledger import Ledger
from plenum.common.request import Request
from plenum.persistence.util import txnsWithSeqNo
from plenum.server.req_handler import RequestHandler
from stp_core.common.log import getlogger

from state.state import State

logger = getlogger()


class LedgerRequestHandler(RequestHandler, metaclass=ABCMeta):
"""
Base class for request handlers
Declares methods for validation, application of requests and
state control
"""

query_types = set()
write_types = set()

def __init__(self, ledger: Ledger, state: State):
self.state = state
self.ledger = ledger

def updateState(self, txns, isCommitted=False):
"""
Updates current state with a number of committed or
not committed transactions
"""

def commit(self, txnCount, stateRoot, txnRoot, ppTime) -> List:
"""
:param txnCount: The number of requests to commit (The actual requests
are picked up from the uncommitted list from the ledger)
:param stateRoot: The state trie root after the txns are committed
:param txnRoot: The txn merkle root after the txns are committed
:return: list of committed transactions
"""

(seqNoStart, seqNoEnd), committedTxns = \
self.ledger.commitTxns(txnCount)
stateRoot = base58.b58decode(stateRoot.encode())
# Probably the following assertion fail should trigger catchup
assert self.ledger.root_hash == txnRoot, '{} {}'.format(
self.ledger.root_hash, txnRoot)
self.state.commit(rootHash=stateRoot)
return txnsWithSeqNo(seqNoStart, seqNoEnd, committedTxns)

def onBatchCreated(self, state_root):
pass

def onBatchRejected(self):
pass

@abstractmethod
def doStaticValidation(self, request: Request):
pass

def is_query(self, txn_type):
return txn_type in self.query_types

def get_query_response(self, request):
raise NotImplementedError

@staticmethod
def transform_txn_for_ledger(txn):
return txn

@property
def operation_types(self) -> set:
return self.write_types.union(self.query_types)
51 changes: 40 additions & 11 deletions plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import closing
from functools import partial
from typing import Dict, Any, Mapping, Iterable, List, Optional, Set, Tuple, Callable

from plenum.server.ledger_req_handler import LedgerRequestHandler
from crypto.bls.bls_key_manager import LoadBLSKeyError
from intervaltree import IntervalTree
from ledger.compact_merkle_tree import CompactMerkleTree
Expand Down Expand Up @@ -58,6 +58,7 @@
from plenum.common.verifier import DidVerifier
from plenum.persistence.req_id_to_txn import ReqIdrToTxn
from plenum.persistence.storage import Storage, initStorage
from plenum.server.action_req_handler import ActionReqHandler
from plenum.server.blacklister import Blacklister
from plenum.server.blacklister import SimpleBlacklister
from plenum.server.client_authn import ClientAuthNr, SimpleAuthNr, CoreAuthNr
Expand Down Expand Up @@ -182,7 +183,7 @@ def __init__(self,
# init before domain req handler!
self.bls_bft = self._create_bls_bft()

self.register_req_handler(DOMAIN_LEDGER_ID, self.getDomainReqHandler())
self.register_req_handler(self.getDomainReqHandler(), DOMAIN_LEDGER_ID)
self.register_executer(DOMAIN_LEDGER_ID, self.executeDomainTxns)

self.initDomainState()
Expand All @@ -192,7 +193,7 @@ def __init__(self,
self.addGenesisNyms()

self.mode = None # type: Optional[Mode]
self.register_req_handler(POOL_LEDGER_ID, self.poolManager.reqHandler)
self.register_req_handler(self.poolManager.reqHandler, POOL_LEDGER_ID)

self.nodeReg = self.poolManager.nodeReg

Expand Down Expand Up @@ -424,6 +425,7 @@ def view_change_in_progress(self):
def init_config_state(self):
self.register_state(CONFIG_LEDGER_ID, self.loadConfigState())
self.setup_config_req_handler()
self.setup_action_req_handler()
self.initConfigState()

def _add_config_ledger(self):
Expand All @@ -436,7 +438,11 @@ def _add_config_ledger(self):

def setup_config_req_handler(self):
self.configReqHandler = self.getConfigReqHandler()
self.register_req_handler(CONFIG_LEDGER_ID, self.configReqHandler)
self.register_req_handler(self.configReqHandler, CONFIG_LEDGER_ID)

def setup_action_req_handler(self):
self.actionReqHandler = self.get_action_req_handler()
self.register_req_handler(self.actionReqHandler)

def getConfigLedger(self):
return Ledger(CompactMerkleTree(hashStore=self.getHashStore('config')),
Expand All @@ -460,6 +466,9 @@ def getConfigReqHandler(self):
return ConfigReqHandler(self.configLedger,
self.states[CONFIG_LEDGER_ID])

def get_action_req_handler(self):
return ActionReqHandler()

def postConfigLedgerCaughtUp(self, **kwargs):
pass

Expand Down Expand Up @@ -728,17 +737,21 @@ def on_new_ledger_added(self, ledger_id):
def register_state(self, ledger_id, state):
self.states[ledger_id] = state

def register_req_handler(self, ledger_id: int, req_handler: RequestHandler):
self.ledger_to_req_handler[ledger_id] = req_handler
for txn_type in req_handler.valid_txn_types:
self.register_txn_type(txn_type, ledger_id, req_handler)
def register_req_handler(self, req_handler: RequestHandler,
ledger_id: int = None):
if ledger_id is not None:
self.ledger_to_req_handler[ledger_id] = req_handler
for txn_type in req_handler.operation_types:
self.register_txn_type(txn_type, req_handler, ledger_id)

def register_txn_type(self, txn_type, ledger_id: int, req_handler: RequestHandler):
def register_txn_type(self, txn_type, req_handler: RequestHandler,
ledger_id: int = None):
if txn_type in self.txn_type_to_req_handler:
raise ValueError('{} already registered for {}'
.format(txn_type, self.txn_type_to_req_handler[txn_type]))
self.txn_type_to_req_handler[txn_type] = req_handler
self.txn_type_to_ledger_id[txn_type] = ledger_id
if ledger_id is not None:
self.txn_type_to_ledger_id[txn_type] = ledger_id

def register_executer(self, ledger_id: int, executer: Callable):
self.requestExecuter[ledger_id] = executer
Expand Down Expand Up @@ -2030,7 +2043,12 @@ def processRequest(self, request: Request, frm: str):
def is_query(self, txn_type) -> bool:
# Does the transaction type correspond to a read?
handler = self.get_req_handler(txn_type=txn_type)
return handler and handler.is_query(txn_type)
return handler and isinstance(handler,
LedgerRequestHandler
) and handler.is_query(txn_type)

def is_action(self, txn_type) -> bool:
return txn_type in self.actionReqHandler.operation_types

def process_query(self, request: Request, frm: str):
# Process a read request from client
Expand All @@ -2043,6 +2061,17 @@ def process_query(self, request: Request, frm: str):
result = handler.get_query_response(request)
self.transmitToClient(Reply(result), frm)

def process_action(self, request, frm):
# Process an execute action request
self.send_ack_to_client(request.key, frm)
try:
self.actionReqHandler.validate(request)
result = self.actionReqHandler.apply(request)
self.transmitToClient(Reply(result), frm)
except Exception as ex:
self.transmitToClient(Reject(request.identifier,
request.reqId, str(ex)), frm)

# noinspection PyUnusedLocal
def processPropagate(self, msg: Propagate, frm):
"""
Expand Down
3 changes: 2 additions & 1 deletion plenum/server/pool_req_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
from plenum.common.types import f
from plenum.persistence.util import txnsWithSeqNo
from plenum.server.domain_req_handler import DomainRequestHandler
from plenum.server.ledger_req_handler import LedgerRequestHandler
from plenum.server.req_handler import RequestHandler
from state.state import State
from stp_core.common.log import getlogger

logger = getlogger()


class PoolRequestHandler(RequestHandler):
class PoolRequestHandler(LedgerRequestHandler):
write_types = {NODE, }

def __init__(self, ledger: Ledger, state: State,
Expand Down
3 changes: 2 additions & 1 deletion plenum/server/req_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def authenticate(self, req_data):
for authenticator in self._authenticators:
if authenticator.is_query(typ):
return set()
if not authenticator.is_write(typ):
if not (authenticator.is_write(typ) or
authenticator.is_action(typ)):
continue
rv = authenticator.authenticate(req_data) or set()
identifiers.update(rv)
Expand Down
Loading

0 comments on commit 851ad6f

Please sign in to comment.