Skip to content
This repository has been archived by the owner on Dec 15, 2021. It is now read-only.

Commit

Permalink
[LC-272] fixed to add limited tx_items to queue
Browse files Browse the repository at this point in the history
 - tx_items has maximum items that not over tx_max_size or tx_max_count
  • Loading branch information
yakkle committed Jan 15, 2020
1 parent 3e8b98c commit 9cde387
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 102 deletions.
64 changes: 16 additions & 48 deletions loopchain/baseservice/broadcast_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,19 @@
import threading
import time
from concurrent import futures
from enum import Enum
from functools import partial

import grpc
from grpc._channel import _Rendezvous

from loopchain import configure as conf, utils as util
from loopchain.baseservice import StubManager, ObjectManager, CommonThread, BroadcastCommand, \
TimerService, Timer
from loopchain.baseservice import (StubManager, ObjectManager, CommonThread, BroadcastCommand,
TimerService, Timer)
from loopchain.baseservice.module_process import ModuleProcess, ModuleProcessProperties
from loopchain.baseservice.tx_item_helper import TxItem
from loopchain.baseservice.tx_message import TxItem, TxMessagesQueue
from loopchain.protos import loopchain_pb2_grpc, loopchain_pb2


class PeerThreadStatus(Enum):
normal = 0
leader_complained = 1


class _Broadcaster:
"""broadcast class for each channel"""
THREAD_VARIABLE_PEER_STATUS = "peer_status"
Expand All @@ -50,8 +44,6 @@ def __init__(self, channel: str, self_target: str=None):
self.__self_target = self_target

self.__audience = {} # self.__audience[peer_target] = stub_manager
self.__thread_variables = dict()
self.__thread_variables[self.THREAD_VARIABLE_PEER_STATUS] = PeerThreadStatus.normal

if conf.IS_BROADCAST_ASYNC:
self.__broadcast_run = self.__broadcast_run_async
Expand All @@ -71,7 +63,7 @@ def __init__(self, channel: str, self_target: str=None):
"BroadcastVote"
}

self.stored_tx = queue.Queue()
self.tx_messages_queue: TxMessagesQueue = TxMessagesQueue()

self.__timer_service = TimerService()

Expand Down Expand Up @@ -230,48 +222,26 @@ def __handler_broadcast(self, broadcast_param):
# util.logger.debug("BroadcastThread method param: " + str(broadcast_method_param))
self.__broadcast_run(broadcast_method_name, broadcast_method_param, **broadcast_method_kwparam)

def __make_tx_list_message(self):
tx_list = []
tx_list_size = 0
tx_list_count = 0
remains = False
while not self.stored_tx.empty():
stored_tx_item = self.stored_tx.get()
tx_list_size += len(stored_tx_item)
tx_list_count += 1
if tx_list_size >= conf.MAX_TX_SIZE_IN_BLOCK or tx_list_count >= conf.MAX_TX_COUNT_IN_ADDTX_LIST:
self.stored_tx.put(stored_tx_item)
remains = True
break
tx_list.append(stored_tx_item.get_tx_message())
def __send_tx_by_timer(self, **kwargs):
# Send multiple tx
tx_messages = self.tx_messages_queue.get()
if tx_messages is None:
util.logger.info(f"__send_tx_by_timer() : no more tx_messages")
return

message = loopchain_pb2.TxSendList(
channel=self.__channel,
tx_list=tx_list
tx_list=tx_messages.get_messages()
)

return remains, message

def __send_tx_by_timer(self, **kwargs):
# util.logger.spam(f"broadcast_scheduler:__send_tx_by_timer")
if self.__thread_variables[self.THREAD_VARIABLE_PEER_STATUS] == PeerThreadStatus.leader_complained:
logging.warning("Leader is complained your tx just stored in queue by temporally: "
+ str(self.stored_tx.qsize()))
else:
# Send single tx for test
# stored_tx_item = self.stored_tx.get()
# self.__broadcast_run("AddTx", stored_tx_item.get_tx_message())

# Send multiple tx
remains, message = self.__make_tx_list_message()
self.__broadcast_run("AddTxList", message)
if remains:
self.__send_tx_in_timer()
self.__broadcast_run("AddTxList", message)
if self.tx_messages_queue.remain():
self.__send_tx_in_timer()

def __send_tx_in_timer(self, tx_item=None):
# util.logger.spam(f"broadcast_scheduler:__send_tx_in_timer")
duration = 0
if tx_item:
self.stored_tx.put(tx_item)
self.tx_messages_queue.append(tx_item)
duration = conf.SEND_TX_LIST_DURATION

if TimerService.TIMER_KEY_ADD_TX not in self.__timer_service.timer_list:
Expand All @@ -284,8 +254,6 @@ def __send_tx_in_timer(self, tx_item=None):
callback_kwargs={}
)
)
else:
pass

def __handler_create_tx(self, create_tx_param):
# logging.debug(f"Broadcast create_tx....")
Expand Down
54 changes: 0 additions & 54 deletions loopchain/baseservice/tx_item_helper.py

This file was deleted.

106 changes: 106 additions & 0 deletions loopchain/baseservice/tx_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""helper class for TxMessages"""

import json
import queue
import sys
from typing import List, Optional

from loopchain import configure
from loopchain.blockchain.transactions import Transaction, TransactionVersioner, TransactionSerializer
from loopchain.protos import loopchain_pb2


class TxItem:
tx_serializers = {}

def __init__(self, tx_json: str, channel: str):
self.channel = channel
self.__tx_json = tx_json
self.__len = sys.getsizeof(tx_json) + sys.getsizeof(channel)

def __len__(self):
return self.__len

def get_tx_message(self):
message = loopchain_pb2.TxSend(
tx_json=self.__tx_json,
channel=self.channel)
return message

@classmethod
def create_tx_item(cls, tx_param: tuple, channel: str):
tx, tx_versioner = tx_param
tx_serializer = cls.get_serializer(tx, tx_versioner)
tx_item = TxItem(
json.dumps(tx_serializer.to_raw_data(tx)),
channel
)
return tx_item

@classmethod
def get_serializer(cls, tx: Transaction, tx_versioner: TransactionVersioner):
if tx.version not in cls.tx_serializers:
cls.tx_serializers[tx.version] = TransactionSerializer.new(tx.version, tx.type(), tx_versioner)
return cls.tx_serializers[tx.version]


class TxMessages:
def __init__(self):
self.total_size = 0
self._transactions: List[TxItem] = []

def __len__(self):
return len(self._transactions)

def append(self, tx_item):
self._transactions.append(tx_item)
self.total_size += len(tx_item)

def get_messages(self) -> List:
tx_messages = [tx.get_tx_message() for tx in self._transactions]

return tx_messages

def reset(self):
self.total_size = 0
self._transactions.clear()


class TxMessagesQueue:
def __init__(self, max_tx_size=None, max_tx_count=None):
self.max_tx_size = max_tx_size or configure.MAX_TX_SIZE_IN_BLOCK
self.max_tx_count = max_tx_count or configure.MAX_TX_COUNT_IN_ADDTX_LIST

self._queue = queue.Queue()
self._tx_messages = TxMessages()

def get(self) -> Optional[TxMessages]:
if self._queue.empty():
tx_messages = self._tx_messages
if len(tx_messages) == 0:
return None
self._tx_messages = TxMessages()
else:
tx_messages = self._queue.get()

return tx_messages

def append(self, tx_item: TxItem):
tx_total_size = self._tx_messages.total_size + len(tx_item)
tx_total_count = len(self._tx_messages) + 1

if tx_total_size >= self.max_tx_size or tx_total_count >= self.max_tx_count:
self._queue.put(self._tx_messages)
self._tx_messages = TxMessages()

self._tx_messages.append(tx_item)

def remain(self) -> bool:
return not self._queue.empty() or len(self._tx_messages) > 0

def size(self) -> int:
return self._queue.qsize()

def __str__(self):
return (f"{self.__class__.__name__}(queue={self._queue.qsize()}, "
f"x_total_count = {len(self._tx_messages)})")
Loading

0 comments on commit 9cde387

Please sign in to comment.