Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
[LC-272] fixed to add limited tx_items to queue
Browse files Browse the repository at this point in the history
 - tx_items has maximum items that not over tx_max_size or tx_max_count
 - monkeypatch configure value in test_custom_tx_value
 - fixed blocking issue when set maxsize in TxMessagesQueue
  • Loading branch information
yakkle committed Feb 11, 2020
1 parent 493ee8d commit 2074b56
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 102 deletions.
61 changes: 13 additions & 48 deletions loopchain/baseservice/broadcast_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,20 @@
import threading
import time
from concurrent import futures
from enum import Enum
from functools import partial

import grpc
from grpc._channel import _Rendezvous

from loopchain import configure as conf, utils as util
from loopchain.baseservice import StubManager, ObjectManager, CommonThread, BroadcastCommand, \
TimerService, Timer
from loopchain.baseservice import (StubManager, ObjectManager, CommonThread, BroadcastCommand,
TimerService, Timer)
from loopchain.baseservice.module_process import ModuleProcess, ModuleProcessProperties
from loopchain.baseservice.tx_item_helper import TxItem
from loopchain.baseservice.tx_message import TxItem, TxMessagesQueue
from loopchain.channel.channel_property import ChannelProperty
from loopchain.protos import loopchain_pb2_grpc, loopchain_pb2


class PeerThreadStatus(Enum):
normal = 0
leader_complained = 1


class _Broadcaster:
"""broadcast class for each channel"""
THREAD_VARIABLE_PEER_STATUS = "peer_status"
Expand All @@ -38,8 +32,6 @@ def __init__(self, channel: str, self_target: str=None):
self.__self_target = self_target

self.__audience = {} # self.__audience[peer_target] = stub_manager
self.__thread_variables = dict()
self.__thread_variables[self.THREAD_VARIABLE_PEER_STATUS] = PeerThreadStatus.normal

if conf.IS_BROADCAST_ASYNC:
self.__broadcast_run = self.__broadcast_run_async
Expand All @@ -59,7 +51,7 @@ def __init__(self, channel: str, self_target: str=None):
"BroadcastVote"
}

self.stored_tx = queue.Queue()
self.tx_messages_queue: TxMessagesQueue = TxMessagesQueue()

self.__timer_service = TimerService()

Expand Down Expand Up @@ -218,48 +210,23 @@ def __handler_broadcast(self, broadcast_param):
# util.logger.debug("BroadcastThread method param: " + str(broadcast_method_param))
self.__broadcast_run(broadcast_method_name, broadcast_method_param, **broadcast_method_kwparam)

def __make_tx_list_message(self):
tx_list = []
tx_list_size = 0
tx_list_count = 0
remains = False
while not self.stored_tx.empty():
stored_tx_item = self.stored_tx.get()
tx_list_size += len(stored_tx_item)
tx_list_count += 1
if tx_list_size >= conf.MAX_TX_SIZE_IN_BLOCK or tx_list_count >= conf.MAX_TX_COUNT_IN_ADDTX_LIST:
self.stored_tx.put(stored_tx_item)
remains = True
break
tx_list.append(stored_tx_item.get_tx_message())
def __send_tx_by_timer(self, **kwargs):
# Send multiple tx
tx_messages = self.tx_messages_queue.pop()

message = loopchain_pb2.TxSendList(
channel=self.__channel,
tx_list=tx_list
tx_list=tx_messages.get_messages()
)

return remains, message

def __send_tx_by_timer(self, **kwargs):
# util.logger.spam(f"broadcast_scheduler:__send_tx_by_timer")
if self.__thread_variables[self.THREAD_VARIABLE_PEER_STATUS] == PeerThreadStatus.leader_complained:
logging.warning("Leader is complained your tx just stored in queue by temporally: "
+ str(self.stored_tx.qsize()))
else:
# Send single tx for test
# stored_tx_item = self.stored_tx.get()
# self.__broadcast_run("AddTx", stored_tx_item.get_tx_message())

# Send multiple tx
remains, message = self.__make_tx_list_message()
self.__broadcast_run("AddTxList", message)
if remains:
self.__send_tx_in_timer()
self.__broadcast_run("AddTxList", message)
if not self.tx_messages_queue.empty():
self.__send_tx_in_timer()

def __send_tx_in_timer(self, tx_item=None):
# util.logger.spam(f"broadcast_scheduler:__send_tx_in_timer")
duration = 0
if tx_item:
self.stored_tx.put(tx_item)
self.tx_messages_queue.append(tx_item)
duration = conf.SEND_TX_LIST_DURATION

if TimerService.TIMER_KEY_ADD_TX not in self.__timer_service.timer_list:
Expand All @@ -272,8 +239,6 @@ def __send_tx_in_timer(self, tx_item=None):
callback_kwargs={}
)
)
else:
pass

def __handler_create_tx(self, create_tx_param):
# logging.debug(f"Broadcast create_tx....")
Expand Down
54 changes: 0 additions & 54 deletions loopchain/baseservice/tx_item_helper.py

This file was deleted.

142 changes: 142 additions & 0 deletions loopchain/baseservice/tx_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""helper class for TxMessages"""

import json
from queue import Queue
from typing import List

import sys

from loopchain import configure
from loopchain.blockchain.transactions import Transaction, TransactionVersioner, TransactionSerializer
from loopchain.protos import loopchain_pb2


class TxItem:
tx_serializers = {}

def __init__(self, tx_json: str, channel: str):
self.channel = channel
self.__tx_json = tx_json
self.__len = sys.getsizeof(tx_json) + sys.getsizeof(channel)

def __len__(self):
return self.__len

def get_tx_message(self):
message = loopchain_pb2.TxSend(
tx_json=self.__tx_json,
channel=self.channel)
return message

@classmethod
def create_tx_item(cls, tx_param: tuple, channel: str):
tx, tx_versioner = tx_param
tx_serializer = cls.get_serializer(tx, tx_versioner)
tx_item = TxItem(
json.dumps(tx_serializer.to_raw_data(tx)),
channel
)
return tx_item

@classmethod
def get_serializer(cls, tx: Transaction, tx_versioner: TransactionVersioner):
if tx.version not in cls.tx_serializers:
cls.tx_serializers[tx.version] = TransactionSerializer.new(tx.version, tx.type(), tx_versioner)
return cls.tx_serializers[tx.version]


class TxMessages:
def __init__(self):
self._total_size: int = 0
self._messages: List['TxSend'] = []

def __len__(self):
return len(self._messages)

def append(self, tx_item):
self._messages.append(tx_item.get_tx_message())
self._total_size += len(tx_item)

def get_messages(self) -> List['TxSend']:
return self._messages

def size(self) -> int:
return self._total_size


class TxMessagesQueue(Queue):
"""TXMessagesQueue is for blocking queue
enqueue item to queue 'append()'
dequeue item from queue 'pop()'
"""

def __init__(self, maxsize=0, max_tx_size=None, max_tx_count=None):
super().__init__(maxsize=maxsize)
self.max_tx_size = max_tx_size or configure.MAX_TX_SIZE_IN_BLOCK
self.max_tx_count = max_tx_count or configure.MAX_TX_COUNT_IN_ADDTX_LIST

self._tx_messages = TxMessages()

def __str__(self):
return (f"{self.__class__.__name__}(queue={self.qsize()}, "
f"tx_count={len(self._tx_messages)})")

def append(self, tx_item: TxItem):
with self.not_full:
tx_total_size = self._tx_messages.size() + len(tx_item)
tx_total_count = len(self._tx_messages) + 1

if tx_total_size >= self.max_tx_size or tx_total_count >= self.max_tx_count:
if self.maxsize > 0:
while self._qsize() >= self.maxsize:
self.not_full.wait()

self._put(self._tx_messages)
self._tx_messages = TxMessages()
self.unfinished_tasks += 1

self._tx_messages.append(tx_item)
self.not_empty.notify()

def pop(self) -> TxMessages:
with self.not_empty:
while not (self._qsize() or len(self._tx_messages)):
self.not_empty.wait()

if not self._qsize():
tx_messages = self._tx_messages
self._tx_messages = TxMessages()
else:
tx_messages = self._get()
self.not_full.notify()

return tx_messages

def empty(self) -> bool:
with self.mutex:
return not self._qsize() and len(self._tx_messages) <= 0

def put(self, item, block=True, timeout=False) -> None:
"""Put is not supported to prevent block and timeout parameter
use append() method instead of put()
"""
raise NotImplementedError

def get(self, block=True, timeout=None) -> None:
"""Get is not supported to prevent block and timeout parameter
use pop() method instead of get()
"""
raise NotImplementedError

def put_nowait(self, item) -> None:
"""Put without blocking is not supported
"""
raise NotImplementedError

def get_nowait(self) -> None:
"""Get without blocking is not supported
"""
raise NotImplementedError
Loading

0 comments on commit 2074b56

Please sign in to comment.