-
Notifications
You must be signed in to change notification settings - Fork 45
feat(sync-v2): Add both BlockchainStreamingClient and TransactionStreamingClient to manage streamings from the client side #848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,141 @@ | ||
| # Copyright 2023 Hathor Labs | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from typing import TYPE_CHECKING, Optional | ||
|
|
||
| from structlog import get_logger | ||
| from twisted.internet.defer import Deferred | ||
|
|
||
| from hathor.p2p.sync_v2.exception import ( | ||
| BlockNotConnectedToPreviousBlock, | ||
| InvalidVertexError, | ||
| StreamingError, | ||
| TooManyRepeatedVerticesError, | ||
| TooManyVerticesReceivedError, | ||
| ) | ||
| from hathor.p2p.sync_v2.streamers import StreamEnd | ||
| from hathor.transaction import Block | ||
| from hathor.transaction.exceptions import HathorError | ||
| from hathor.types import VertexId | ||
|
|
||
| if TYPE_CHECKING: | ||
| from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo | ||
|
|
||
| logger = get_logger() | ||
|
|
||
|
|
||
| class BlockchainStreamingClient: | ||
| def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None: | ||
| self.sync_agent = sync_agent | ||
| self.protocol = self.sync_agent.protocol | ||
| self.tx_storage = self.sync_agent.tx_storage | ||
| self.manager = self.sync_agent.manager | ||
|
|
||
| self.log = logger.new(peer=self.protocol.get_short_peer_id()) | ||
|
|
||
| self.start_block = start_block | ||
| self.end_block = end_block | ||
|
|
||
| # When syncing blocks we start streaming with all peers | ||
| # so the moment I get some repeated blocks, I stop the download | ||
| # because it's probably a streaming that I've already received | ||
| self.max_repeated_blocks = 10 | ||
|
|
||
| self._deferred: Deferred[StreamEnd] = Deferred() | ||
|
|
||
| self._blk_received: int = 0 | ||
| self._blk_repeated: int = 0 | ||
|
|
||
| self._blk_max_quantity = self.end_block.height - self.start_block.height + 1 | ||
| self._reverse: bool = False | ||
| if self._blk_max_quantity < 0: | ||
| self._blk_max_quantity = -self._blk_max_quantity | ||
| self._reverse = True | ||
|
|
||
| self._last_received_block: Optional[Block] = None | ||
|
|
||
| self._partial_blocks: list[Block] = [] | ||
|
|
||
| def wait(self) -> Deferred[StreamEnd]: | ||
| """Return the deferred.""" | ||
| return self._deferred | ||
|
|
||
| def fails(self, reason: 'StreamingError') -> None: | ||
| """Fail the execution by resolving the deferred with an error.""" | ||
| self._deferred.errback(reason) | ||
|
|
||
| def partial_vertex_exists(self, vertex_id: VertexId) -> bool: | ||
| """Return true if the vertex exists no matter its validation state.""" | ||
| with self.tx_storage.allow_partially_validated_context(): | ||
| return self.tx_storage.transaction_exists(vertex_id) | ||
|
|
||
| def handle_blocks(self, blk: Block) -> None: | ||
| """This method is called by the sync agent when a BLOCKS message is received.""" | ||
| if self._deferred.called: | ||
| return | ||
|
|
||
| self._blk_received += 1 | ||
| if self._blk_received > self._blk_max_quantity: | ||
| self.log.warn('too many blocks received', | ||
| blk_received=self._blk_received, | ||
| blk_max_quantity=self._blk_max_quantity) | ||
| self.fails(TooManyVerticesReceivedError()) | ||
| return | ||
|
|
||
| assert blk.hash is not None | ||
| is_duplicated = False | ||
| if self.partial_vertex_exists(blk.hash): | ||
| # We reached a block we already have. Skip it. | ||
| self._blk_repeated += 1 | ||
| is_duplicated = True | ||
| if self._blk_repeated > self.max_repeated_blocks: | ||
| self.log.debug('too many repeated block received', total_repeated=self._blk_repeated) | ||
| self.fails(TooManyRepeatedVerticesError()) | ||
jansegre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # basic linearity validation, crucial for correctly predicting the next block's height | ||
| if self._reverse: | ||
| if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash(): | ||
| self.fails(BlockNotConnectedToPreviousBlock()) | ||
| return | ||
| else: | ||
| if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash: | ||
| self.fails(BlockNotConnectedToPreviousBlock()) | ||
| return | ||
|
|
||
| try: | ||
| # this methods takes care of checking if the block already exists, | ||
| # it will take care of doing at least a basic validation | ||
| if is_duplicated: | ||
| self.log.debug('block early terminate?', blk_id=blk.hash.hex()) | ||
| else: | ||
| self.log.debug('block received', blk_id=blk.hash.hex()) | ||
| self.sync_agent.on_new_tx(blk, propagate_to_peers=False, quiet=True) | ||
| except HathorError: | ||
| self.fails(InvalidVertexError()) | ||
| return | ||
| else: | ||
| self._last_received_block = blk | ||
| self._blk_repeated = 0 | ||
| # XXX: debugging log, maybe add timing info | ||
| if self._blk_received % 500 == 0: | ||
| self.log.debug('block streaming in progress', blocks_received=self._blk_received) | ||
|
|
||
| if not blk.can_validate_full(): | ||
| self._partial_blocks.append(blk) | ||
|
|
||
| def handle_blocks_end(self, response_code: StreamEnd) -> None: | ||
| """This method is called by the sync agent when a BLOCKS-END message is received.""" | ||
| if self._deferred.called: | ||
| return | ||
| self._deferred.callback(response_code) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| # Copyright 2023 Hathor Labs | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| class StreamingError(Exception): | ||
| """Base error for sync-v2 streaming.""" | ||
| pass | ||
|
|
||
|
|
||
| class TooManyVerticesReceivedError(StreamingError): | ||
| """Raised when the other peer sent too many vertices.""" | ||
| pass | ||
|
|
||
|
|
||
| class TooManyRepeatedVerticesError(StreamingError): | ||
| """Raised when the other peer sent too many repeated vertices.""" | ||
| pass | ||
|
|
||
|
|
||
| class BlockNotConnectedToPreviousBlock(StreamingError): | ||
| """Raised when the received block is not connected to the previous one.""" | ||
| pass | ||
|
|
||
|
|
||
| class InvalidVertexError(StreamingError): | ||
| """Raised when the received vertex fails validation.""" | ||
| pass |
glevco marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| # Copyright 2023 Hathor Labs | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from pydantic import validator | ||
|
|
||
| from hathor.types import VertexId | ||
| from hathor.utils.pydantic import BaseModel | ||
|
|
||
|
|
||
| class PayloadBaseModel(BaseModel): | ||
|
|
||
| @classmethod | ||
| def convert_hex_to_bytes(cls, value: str | VertexId) -> VertexId: | ||
| """Convert a string in hex format to bytes. If bytes are given, it does nothing.""" | ||
| if isinstance(value, str): | ||
| return bytes.fromhex(value) | ||
| elif isinstance(value, VertexId): | ||
| return value | ||
jansegre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| raise ValueError('invalid type') | ||
|
|
||
| class Config: | ||
| json_encoders = { | ||
| VertexId: lambda x: x.hex() | ||
| } | ||
|
|
||
|
|
||
| class GetNextBlocksPayload(PayloadBaseModel): | ||
| """GET-NEXT-BLOCKS message is used to request a stream of blocks in the best blockchain.""" | ||
|
|
||
| start_hash: VertexId | ||
| end_hash: VertexId | ||
| quantity: int | ||
|
|
||
| @validator('start_hash', 'end_hash', pre=True) | ||
| def validate_bytes_fields(cls, value: str | bytes) -> VertexId: | ||
| return cls.convert_hex_to_bytes(value) | ||
|
|
||
|
|
||
| class BestBlockPayload(PayloadBaseModel): | ||
| """BEST-BLOCK message is used to send information about the current best block.""" | ||
|
|
||
| block: VertexId | ||
| height: int | ||
|
|
||
| @validator('block', pre=True) | ||
| def validate_bytes_fields(cls, value: str | VertexId) -> VertexId: | ||
| return cls.convert_hex_to_bytes(value) | ||
|
|
||
|
|
||
| class GetTransactionsBFSPayload(PayloadBaseModel): | ||
| """GET-TRANSACTIONS-BFS message is used to request a stream of transactions confirmed by blocks.""" | ||
| start_from: list[VertexId] | ||
| first_block_hash: VertexId | ||
| last_block_hash: VertexId | ||
|
|
||
| @validator('first_block_hash', 'last_block_hash', pre=True) | ||
| def validate_bytes_fields(cls, value: str | VertexId) -> VertexId: | ||
| return cls.convert_hex_to_bytes(value) | ||
|
|
||
| @validator('start_from', pre=True, each_item=True) | ||
| def validate_start_from(cls, value: str | VertexId) -> VertexId: | ||
| return cls.convert_hex_to_bytes(value) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.