Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INDY-1173] add POOL_RESTART handling #582

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bf29651
INDY-1173: added ActionResult
Toktar Mar 22, 2018
fc53001
Merge branch 'master' of https://github.com/hyperledger/indy-plenum i…
Toktar Mar 23, 2018
682f54c
INDY-1173: Refactoring POOL_RESTART
Toktar Mar 27, 2018
0fdf438
INDY-1173: Bagfix POOL_RESTART
Toktar Mar 28, 2018
0ae333a
INDY-1173: refactoring
Toktar Mar 30, 2018
a3a7058
INDY-1173: refactoring
Toktar Apr 2, 2018
c432a8e
INDY-1173: refactoring
Toktar Apr 3, 2018
f8cfb54
INDY-1173: refactoring
Toktar Apr 3, 2018
fb9d395
Merge branch 'master' of https://github.com/hyperledger/indy-plenum i…
Toktar Apr 5, 2018
cf14932
INDY-1173: Refactoring code style
Toktar Apr 5, 2018
de736d3
INDY-1173: fixing problem after merge
Toktar Apr 6, 2018
d1b5502
INDY-1173: fixed bug in a register_req_handler
Toktar Apr 6, 2018
3cc6b12
Merge branch 'story-1173-add-command-to-restart-pool' of https://gith…
Toktar Apr 6, 2018
1503fff
INDY-1173: fixed code style in the node
Toktar Apr 6, 2018
a754c11
Merge branch 'master' of https://github.com/hyperledger/indy-plenum i…
Toktar Apr 10, 2018
25787cd
INDY-1173: Added implementation of abstract methods
Toktar Apr 10, 2018
5f07ea2
INDY-1173: fixed code style
Toktar Apr 10, 2018
c41c470
INDY-1173: fixed problems after merge
Toktar Apr 10, 2018
0a0cc96
INDY-1173: refactoring node
Toktar Apr 11, 2018
b89b699
INDY-1173:refactoring
Toktar Apr 12, 2018
8e501a2
INDY-1173:refactoring
Toktar Apr 12, 2018
007cc51
INDY-1173: refactoring
Toktar Apr 12, 2018
aa08fd5
Merge branch 'master' of https://github.com/hyperledger/indy-plenum i…
Toktar Apr 13, 2018
5605a5d
INDY-1184: bugfix
Toktar Apr 17, 2018
427e1f5
Merge branch 'master' of https://github.com/hyperledger/indy-plenum i…
Toktar Apr 17, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions plenum/common/messages/node_messages.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from typing import TypeVar, NamedTuple

from plenum.common.constants import NOMINATE, BATCH, REELECTION, PRIMARY, BLACKLIST, REQACK, REQNACK, REJECT, \
POOL_LEDGER_TXNS, ORDERED, PROPAGATE, PREPREPARE, PREPARE, COMMIT, CHECKPOINT, THREE_PC_STATE, CHECKPOINT_STATE, \
REPLY, INSTANCE_CHANGE, LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REQ, CATCHUP_REP, VIEW_CHANGE_DONE, CURRENT_STATE, \
from plenum.common.constants import NOMINATE, BATCH, REELECTION, PRIMARY, \
BLACKLIST, REQACK, REQNACK, REJECT, \
POOL_LEDGER_TXNS, ORDERED, PROPAGATE, PREPREPARE, PREPARE, COMMIT, \
CHECKPOINT, THREE_PC_STATE, CHECKPOINT_STATE, \
REPLY, INSTANCE_CHANGE, LEDGER_STATUS, CONSISTENCY_PROOF, CATCHUP_REQ, \
CATCHUP_REP, VIEW_CHANGE_DONE, CURRENT_STATE, \
MESSAGE_REQUEST, MESSAGE_RESPONSE, OBSERVED_DATA, BATCH_COMMITTED
from plenum.common.messages.client_request import ClientMessageValidator
from plenum.common.messages.fields import NonNegativeNumberField, IterableField, \
Expand Down
1 change: 1 addition & 0 deletions plenum/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class f: # provides a namespace for reusable field constants
ELECTION_DATA = Field('electionData', Any)
TXN_ID = Field('txnId', str)
REASON = Field('reason', Any)
IS_SUCCESS = Field('isSuccess', Any)
SENDER_CLIENT = Field('senderClient', str)
PP_TIME = Field("ppTime", float)
REQ_IDR = Field("reqIdr", List[Tuple[str, int]])
Expand Down
16 changes: 16 additions & 0 deletions plenum/server/action_req_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from plenum.common.request import Request
from plenum.server.req_handler import RequestHandler


class ActionReqHandler(RequestHandler):
def __init__(self):
super().__init__()

def doStaticValidation(self, request: Request):
pass

def validate(self, req: Request):
pass

def apply(self, req: Request, cons_time: int):
pass
6 changes: 6 additions & 0 deletions plenum/server/client_authn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
InsufficientSignatures, InsufficientCorrectSignatures
from plenum.common.types import f
from plenum.common.verifier import DidVerifier, Verifier
from plenum.server.action_req_handler import ActionReqHandler
from plenum.server.domain_req_handler import DomainRequestHandler
from plenum.server.pool_req_handler import PoolRequestHandler
from stp_core.common.log import getlogger
Expand Down Expand Up @@ -179,13 +180,18 @@ class CoreAuthMixin:
).union(
DomainRequestHandler.query_types
)
action_types = ActionReqHandler.operation_types

def is_query(self, typ):
return typ in self.query_types

def is_write(self, typ):
return typ in self.write_types

@classmethod
def is_action(cls, typ):
return typ in cls.action_types

@staticmethod
def _extract_signature(msg):
if f.SIG.nm not in msg:
Expand Down
17 changes: 15 additions & 2 deletions plenum/server/config_req_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
from plenum.server.req_handler import RequestHandler
from plenum.common.request import Request
from plenum.server.ledger_req_handler import LedgerRequestHandler


class ConfigReqHandler(RequestHandler):
class ConfigReqHandler(LedgerRequestHandler):
def __init__(self, ledger, state):
super().__init__(ledger, state)

def doStaticValidation(self, request: Request):
pass

def get_query_response(self, request):
pass

def validate(self, req: Request):
pass

def apply(self, req: Request, cons_time: int):
pass
3 changes: 2 additions & 1 deletion plenum/server/domain_req_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
from plenum.common.txn_util import reqToTxn
from plenum.common.types import f
from plenum.persistence.util import txnsWithSeqNo
from plenum.server.ledger_req_handler import LedgerRequestHandler
from plenum.server.req_handler import RequestHandler
from stp_core.common.log import getlogger

logger = getlogger()


class DomainRequestHandler(RequestHandler):
class DomainRequestHandler(LedgerRequestHandler):
stateSerializer = domain_state_serializer
write_types = {NYM, }

Expand Down
78 changes: 78 additions & 0 deletions plenum/server/ledger_req_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from abc import ABCMeta, abstractmethod
from typing import List

import base58

from plenum.common.ledger import Ledger
from plenum.common.request import Request
from plenum.persistence.util import txnsWithSeqNo
from plenum.server.req_handler import RequestHandler
from stp_core.common.log import getlogger

from state.state import State

logger = getlogger()


class LedgerRequestHandler(RequestHandler, metaclass=ABCMeta):
"""
Base class for request handlers
Declares methods for validation, application of requests and
state control
"""

query_types = set()
write_types = set()

def __init__(self, ledger: Ledger, state: State):
self.state = state
self.ledger = ledger

def updateState(self, txns, isCommitted=False):
"""
Updates current state with a number of committed or
not committed transactions
"""

def commit(self, txnCount, stateRoot, txnRoot, ppTime) -> List:
"""
:param txnCount: The number of requests to commit (The actual requests
are picked up from the uncommitted list from the ledger)
:param stateRoot: The state trie root after the txns are committed
:param txnRoot: The txn merkle root after the txns are committed

:return: list of committed transactions
"""

(seqNoStart, seqNoEnd), committedTxns = \
self.ledger.commitTxns(txnCount)
stateRoot = base58.b58decode(stateRoot.encode())
# Probably the following assertion fail should trigger catchup
assert self.ledger.root_hash == txnRoot, '{} {}'.format(
self.ledger.root_hash, txnRoot)
self.state.commit(rootHash=stateRoot)
return txnsWithSeqNo(seqNoStart, seqNoEnd, committedTxns)

def onBatchCreated(self, state_root):
pass

def onBatchRejected(self):
pass

@abstractmethod
def doStaticValidation(self, request: Request):
pass

def is_query(self, txn_type):
return txn_type in self.query_types

def get_query_response(self, request):
raise NotImplementedError

@staticmethod
def transform_txn_for_ledger(txn):
return txn

@property
def operation_types(self) -> set:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property must be a class property. So it must take the argument cls, not self.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't work because "./plenum/server/ledger_req_handler.py:77:9: N805 first argument of a method should be named 'self'"
https://ci.evernym.com/job/Indy-Plenum/job/indy-plenum-verify-x86_64/941/console

return self.write_types.union(self.query_types)
51 changes: 40 additions & 11 deletions plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import closing
from functools import partial
from typing import Dict, Any, Mapping, Iterable, List, Optional, Set, Tuple, Callable

from plenum.server.ledger_req_handler import LedgerRequestHandler
from crypto.bls.bls_key_manager import LoadBLSKeyError
from intervaltree import IntervalTree
from ledger.compact_merkle_tree import CompactMerkleTree
Expand Down Expand Up @@ -58,6 +58,7 @@
from plenum.common.verifier import DidVerifier
from plenum.persistence.req_id_to_txn import ReqIdrToTxn
from plenum.persistence.storage import Storage, initStorage
from plenum.server.action_req_handler import ActionReqHandler
from plenum.server.blacklister import Blacklister
from plenum.server.blacklister import SimpleBlacklister
from plenum.server.client_authn import ClientAuthNr, SimpleAuthNr, CoreAuthNr
Expand Down Expand Up @@ -182,7 +183,7 @@ def __init__(self,
# init before domain req handler!
self.bls_bft = self._create_bls_bft()

self.register_req_handler(DOMAIN_LEDGER_ID, self.getDomainReqHandler())
self.register_req_handler(self.getDomainReqHandler(), DOMAIN_LEDGER_ID)
self.register_executer(DOMAIN_LEDGER_ID, self.executeDomainTxns)

self.initDomainState()
Expand All @@ -192,7 +193,7 @@ def __init__(self,
self.addGenesisNyms()

self.mode = None # type: Optional[Mode]
self.register_req_handler(POOL_LEDGER_ID, self.poolManager.reqHandler)
self.register_req_handler(self.poolManager.reqHandler, POOL_LEDGER_ID)

self.nodeReg = self.poolManager.nodeReg

Expand Down Expand Up @@ -424,6 +425,7 @@ def view_change_in_progress(self):
def init_config_state(self):
self.register_state(CONFIG_LEDGER_ID, self.loadConfigState())
self.setup_config_req_handler()
self.setup_action_req_handler()
self.initConfigState()

def _add_config_ledger(self):
Expand All @@ -436,7 +438,11 @@ def _add_config_ledger(self):

def setup_config_req_handler(self):
self.configReqHandler = self.getConfigReqHandler()
self.register_req_handler(CONFIG_LEDGER_ID, self.configReqHandler)
self.register_req_handler(self.configReqHandler, CONFIG_LEDGER_ID)

def setup_action_req_handler(self):
self.actionReqHandler = self.get_action_req_handler()
self.register_req_handler(self.actionReqHandler)

def getConfigLedger(self):
return Ledger(CompactMerkleTree(hashStore=self.getHashStore('config')),
Expand All @@ -460,6 +466,9 @@ def getConfigReqHandler(self):
return ConfigReqHandler(self.configLedger,
self.states[CONFIG_LEDGER_ID])

def get_action_req_handler(self):
return ActionReqHandler()

def postConfigLedgerCaughtUp(self, **kwargs):
pass

Expand Down Expand Up @@ -728,17 +737,21 @@ def on_new_ledger_added(self, ledger_id):
def register_state(self, ledger_id, state):
self.states[ledger_id] = state

def register_req_handler(self, ledger_id: int, req_handler: RequestHandler):
self.ledger_to_req_handler[ledger_id] = req_handler
for txn_type in req_handler.valid_txn_types:
self.register_txn_type(txn_type, ledger_id, req_handler)
def register_req_handler(self, req_handler: RequestHandler,
ledger_id: int = None):
if ledger_id is not None:
self.ledger_to_req_handler[ledger_id] = req_handler
for txn_type in req_handler.operation_types:
self.register_txn_type(txn_type, req_handler, ledger_id)

def register_txn_type(self, txn_type, ledger_id: int, req_handler: RequestHandler):
def register_txn_type(self, txn_type, req_handler: RequestHandler,
ledger_id: int = None):
if txn_type in self.txn_type_to_req_handler:
raise ValueError('{} already registered for {}'
.format(txn_type, self.txn_type_to_req_handler[txn_type]))
self.txn_type_to_req_handler[txn_type] = req_handler
self.txn_type_to_ledger_id[txn_type] = ledger_id
if ledger_id is not None:
self.txn_type_to_ledger_id[txn_type] = ledger_id

def register_executer(self, ledger_id: int, executer: Callable):
self.requestExecuter[ledger_id] = executer
Expand Down Expand Up @@ -2030,7 +2043,12 @@ def processRequest(self, request: Request, frm: str):
def is_query(self, txn_type) -> bool:
# Does the transaction type correspond to a read?
handler = self.get_req_handler(txn_type=txn_type)
return handler and handler.is_query(txn_type)
return handler and isinstance(handler,
LedgerRequestHandler
) and handler.is_query(txn_type)

def is_action(self, txn_type) -> bool:
return txn_type in self.actionReqHandler.operation_types

def process_query(self, request: Request, frm: str):
# Process a read request from client
Expand All @@ -2043,6 +2061,17 @@ def process_query(self, request: Request, frm: str):
result = handler.get_query_response(request)
self.transmitToClient(Reply(result), frm)

def process_action(self, request, frm):
# Process an execute action request
self.send_ack_to_client(request.key, frm)
try:
self.actionReqHandler.validate(request)
result = self.actionReqHandler.apply(request)
self.transmitToClient(Reply(result), frm)
except Exception as ex:
self.transmitToClient(Reject(request.identifier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reject must be sent to the sender only in case dynamic validation (validate) has failed.

request.reqId, str(ex)), frm)

# noinspection PyUnusedLocal
def processPropagate(self, msg: Propagate, frm):
"""
Expand Down
3 changes: 2 additions & 1 deletion plenum/server/pool_req_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
from plenum.common.types import f
from plenum.persistence.util import txnsWithSeqNo
from plenum.server.domain_req_handler import DomainRequestHandler
from plenum.server.ledger_req_handler import LedgerRequestHandler
from plenum.server.req_handler import RequestHandler
from state.state import State
from stp_core.common.log import getlogger

logger = getlogger()


class PoolRequestHandler(RequestHandler):
class PoolRequestHandler(LedgerRequestHandler):
write_types = {NODE, }

def __init__(self, ledger: Ledger, state: State,
Expand Down
3 changes: 2 additions & 1 deletion plenum/server/req_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def authenticate(self, req_data):
for authenticator in self._authenticators:
if authenticator.is_query(typ):
return set()
if not authenticator.is_write(typ):
if not (authenticator.is_write(typ) or
authenticator.is_action(typ)):
continue
rv = authenticator.authenticate(req_data) or set()
identifiers.update(rv)
Expand Down
Loading