diff --git a/.gitignore b/.gitignore index ed2a59a..2956e43 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,6 @@ venv/ ENV/ .idea/ + +*.json +*.txt \ No newline at end of file diff --git a/README_CONTAINER_REGISTRY.md b/README_CONTAINER_REGISTRY.md new file mode 100644 index 0000000..da49514 --- /dev/null +++ b/README_CONTAINER_REGISTRY.md @@ -0,0 +1,5 @@ +BITCOINETL_STREAMING_VERSION=1.4-streaming + docker build -t merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . + docker tag merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} + docker push us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} + diff --git a/bitcoinetl/cli/export_all.py b/bitcoinetl/cli/export_all.py index 2628e8a..9ef1a7b 100644 --- a/bitcoinetl/cli/export_all.py +++ b/bitcoinetl/cli/export_all.py @@ -96,10 +96,7 @@ def get_partitions(start, end, partition_batch_size, provider_uri): @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain.') @click.option('--enrich', default=False, type=bool, help='Enable filling in transactions inputs fields.') -@click.option('--coin-price-type', default=CoinPriceType.empty, type=int, - help='Enable querying CryptoCompare for coin prices. 0 for no price, 1 for daily price, 2 for hourly price.') -def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain, enrich, coin_price_type): +def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain, enrich): """Exports all data for a range of blocks.""" do_export_all(chain, get_partitions(start, end, partition_batch_size, provider_uri), - output_dir, provider_uri, max_workers, export_batch_size, enrich, - coin_price_type) + output_dir, provider_uri, max_workers, export_batch_size, enrich) diff --git a/bitcoinetl/cli/export_blocks_and_transactions.py b/bitcoinetl/cli/export_blocks_and_transactions.py index 9d75e08..6c918dc 100644 --- a/bitcoinetl/cli/export_blocks_and_transactions.py +++ b/bitcoinetl/cli/export_blocks_and_transactions.py @@ -48,11 +48,8 @@ 'If not provided transactions will not be exported. Use "-" for stdout') @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain') -@click.option('--coin-price-type', default=CoinPriceType.empty, type=int, - help='Enable querying CryptoCompare for coin prices. 0 for no price, 1 for daily price, 2 for hourly price.') def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri, - max_workers, blocks_output, transactions_output, chain, - coin_price_type): + max_workers, blocks_output, transactions_output, chain,): """Export blocks and transactions.""" if blocks_output is None and transactions_output is None: raise ValueError('Either --blocks-output or --transactions-output options must be provided') @@ -67,6 +64,5 @@ def export_blocks_and_transactions(start_block, end_block, batch_size, provider_ chain=chain, export_blocks=blocks_output is not None, export_transactions=transactions_output is not None, - coin_price_type=coin_price_type ) job.run() diff --git a/bitcoinetl/cli/stream.py b/bitcoinetl/cli/stream.py index 46170df..b5ec94f 100644 --- a/bitcoinetl/cli/stream.py +++ b/bitcoinetl/cli/stream.py @@ -43,16 +43,17 @@ 'If not specified will print to console.') @click.option('-s', '--start-block', default=None, type=int, help='Start block.') @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain.') -@click.option('--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs.') -@click.option('-b', '--batch-size', default=2, type=int, help='How many blocks to batch in single request.') +@click.option('--period-seconds', default=1, type=int, help='How many seconds to sleep between syncs.') +@click.option('-b', '--batch-size', default=1, type=int, help='How many blocks to batch in single request.') @click.option('-B', '--block-batch-size', default=10, type=int, help='How many blocks to batch in single sync round.') @click.option('-w', '--max-workers', default=5, type=int, help='The number of workers.') @click.option('--log-file', default=None, type=str, help='Log file.') @click.option('--pid-file', default=None, type=str, help='pid file.') @click.option('--enrich', default=True, type=bool, help='Enable filling in transactions inputs fields.') +@click.option('--retry_errors', default=True, type=bool, help='Enable Retry on streaming failures') def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain=Chain.BITCOIN, - period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None, - enrich=True): + period_seconds=1, batch_size=1, block_batch_size=10, max_workers=5, log_file=None, pid_file=None, + enrich=True, retry_errors=True): """Streams all data types to console or Google Pub/Sub.""" configure_logging(log_file) configure_signals() @@ -67,7 +68,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain chain=chain, batch_size=batch_size, enable_enrich=enrich, - max_workers=max_workers + max_workers=max_workers, ) streamer = Streamer( blockchain_streamer_adapter=streamer_adapter, @@ -77,5 +78,6 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain period_seconds=period_seconds, block_batch_size=block_batch_size, pid_file=pid_file, + retry_errors=retry_errors ) streamer.stream() diff --git a/bitcoinetl/domain/block.py b/bitcoinetl/domain/block.py index 5e5ee7d..dab89f2 100644 --- a/bitcoinetl/domain/block.py +++ b/bitcoinetl/domain/block.py @@ -54,7 +54,6 @@ def __init__(self): self.block_reward = None self.transaction_fees = None - self.coin_price_usd = None self.coinbase_txid = None self.coinbase_param_decoded = None diff --git a/bitcoinetl/domain/transaction.py b/bitcoinetl/domain/transaction.py index de66cad..6ae4b75 100644 --- a/bitcoinetl/domain/transaction.py +++ b/bitcoinetl/domain/transaction.py @@ -50,7 +50,6 @@ def __init__(self): self.input_value = None self.output_count = None self.output_value = None - self.coin_price_usd = None def add_input(self, input): if len(self.inputs) > 0: diff --git a/bitcoinetl/enumeration/chain.py b/bitcoinetl/enumeration/chain.py index ff7e656..e59e266 100644 --- a/bitcoinetl/enumeration/chain.py +++ b/bitcoinetl/enumeration/chain.py @@ -1,6 +1,7 @@ class Chain: BITCOIN = 'bitcoin' BITCOIN_CASH = 'bitcoin_cash' + BITCOIN_CASH_SV = 'bitcoin_cash_sv' BITCOIN_GOLD = 'bitcoin_gold' DOGECOIN = 'dogecoin' LITECOIN = 'litecoin' @@ -8,7 +9,7 @@ class Chain: ZCASH = 'zcash' MONACOIN = 'monacoin' - ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN] + ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_CASH_SV, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN] # Old API doesn't support verbosity for getblock which doesn't allow querying all transactions in a block in 1 go. HAVE_OLD_API = [BITCOIN_CASH, DOGECOIN, DASH, MONACOIN] @@ -17,6 +18,7 @@ def ticker_symbol(cls, chain): symbols = { 'bitcoin': 'BTC', 'bitcoin_cash': 'BCH', + 'bitcoin_cash_sv': 'BSV', 'dogecoin': 'DOGE', 'litecoin': 'LTC', 'dash': 'DASH', diff --git a/bitcoinetl/jobs/enrich_transactions.py b/bitcoinetl/jobs/enrich_transactions.py index a100f84..dec7dd7 100644 --- a/bitcoinetl/jobs/enrich_transactions.py +++ b/bitcoinetl/jobs/enrich_transactions.py @@ -63,7 +63,7 @@ def _enrich_transactions(self, transactions): input_transactions_map = self._get_input_transactions_as_map(transaction_input_batch) for input in transaction_input_batch: output = self._get_output_for_input(input, input_transactions_map) \ - if input.spent_transaction_hash is not None else None + if input.create_transaction_id is not None else None if output is not None: input.required_signatures = output.required_signatures input.type = output.type @@ -74,29 +74,29 @@ def _enrich_transactions(self, transactions): self.item_exporter.export_item(self.transaction_mapper.transaction_to_dict(transaction)) def _get_input_transactions_as_map(self, transaction_inputs): - transaction_hashes = [input.spent_transaction_hash for input in transaction_inputs - if input.spent_transaction_hash is not None] + transaction_hashes = [input.create_transaction_id for input in transaction_inputs + if input.create_transaction_id is not None] transaction_hashes = set(transaction_hashes) if len(transaction_hashes) > 0: transactions = self.btc_service.get_transactions_by_hashes(transaction_hashes) - return {transaction.hash: transaction for transaction in transactions} + return {transaction.transaction_id: transaction for transaction in transactions} else: return {} def _get_output_for_input(self, transaction_input, input_transactions_map): - spent_transaction_hash = transaction_input.spent_transaction_hash - input_transaction = input_transactions_map.get(spent_transaction_hash) + create_transaction_id = transaction_input.create_transaction_id + input_transaction = input_transactions_map.get(create_transaction_id) if input_transaction is None: - raise ValueError('Input transaction with hash {} not found'.format(spent_transaction_hash)) + raise ValueError('Input transaction with hash {} not found'.format(create_transaction_id)) - spent_output_index = transaction_input.spent_output_index - if input_transaction.outputs is None or len(input_transaction.outputs) < (spent_output_index + 1): + create_output_index = transaction_input.create_output_index + if input_transaction.outputs is None or len(input_transaction.outputs) < (create_output_index + 1): raise ValueError( 'There is no output with index {} in transaction with hash {}'.format( - spent_output_index, spent_transaction_hash)) + create_output_index, create_transaction_id)) - output = input_transaction.outputs[spent_output_index] + output = input_transaction.outputs[create_output_index] return output def _end(self): diff --git a/bitcoinetl/jobs/export_all.py b/bitcoinetl/jobs/export_all.py index e3a75d2..c86e94d 100644 --- a/bitcoinetl/jobs/export_all.py +++ b/bitcoinetl/jobs/export_all.py @@ -42,7 +42,6 @@ def export_all( chain, partitions, output_dir, provider_uri, max_workers, batch_size, enrich, - coin_price_type ): for batch_start_block, batch_end_block, partition_dir, *args in partitions: # # # start # # # @@ -105,7 +104,6 @@ def export_all( item_exporter=blocks_and_transactions_item_exporter(blocks_file, transactions_file), export_blocks=blocks_file is not None, export_transactions=transactions_file is not None, - coin_price_type=coin_price_type, ) job.run() diff --git a/bitcoinetl/jobs/export_blocks_job.py b/bitcoinetl/jobs/export_blocks_job.py index e19b2a1..bbb7d98 100644 --- a/bitcoinetl/jobs/export_blocks_job.py +++ b/bitcoinetl/jobs/export_blocks_job.py @@ -42,8 +42,7 @@ def __init__( item_exporter, chain, export_blocks=True, - export_transactions=True, - coin_price_type=CoinPriceType.empty): + export_transactions=True): validate_range(start_block, end_block) self.start_block = start_block @@ -57,7 +56,7 @@ def __init__( if not self.export_blocks and not self.export_transactions: raise ValueError('At least one of export_blocks or export_transactions must be True') - self.btc_service = BtcService(bitcoin_rpc, chain, coin_price_type) + self.btc_service = BtcService(bitcoin_rpc, chain) self.block_mapper = BtcBlockMapper() self.transaction_mapper = BtcTransactionMapper() diff --git a/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py b/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py index 243d20b..d8e1773 100644 --- a/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py +++ b/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py @@ -46,7 +46,6 @@ "transaction_count", "block_reward", "transaction_ids", - "coin_price_usd", ] @@ -69,7 +68,6 @@ 'output_value', 'inputs', 'outputs', - 'coin_price_usd', ] diff --git a/bitcoinetl/mappers/block_mapper.py b/bitcoinetl/mappers/block_mapper.py index d72e7f5..59bcabf 100644 --- a/bitcoinetl/mappers/block_mapper.py +++ b/bitcoinetl/mappers/block_mapper.py @@ -68,7 +68,6 @@ def json_dict_to_block(self, json_dict): block.chain_work = json_dict.get("chainwork") block.coinbase_txid = json_dict.get("coinbase_txid") block.previous_block_hash = json_dict.get("previousblockhash") - block.coin_price_usd = json_dict.get('coin_price_usd') return block def block_to_dict(self, block): @@ -94,7 +93,6 @@ def block_to_dict(self, block): 'difficulty': block.difficulty, 'chain_work': block.chain_work, 'previous_block_hash': block.previous_block_hash, - "coin_price_usd": block.coin_price_usd, "transaction_ids": block.transaction_ids } diff --git a/bitcoinetl/mappers/transaction_input_mapper.py b/bitcoinetl/mappers/transaction_input_mapper.py index 468ed04..69fe0f6 100644 --- a/bitcoinetl/mappers/transaction_input_mapper.py +++ b/bitcoinetl/mappers/transaction_input_mapper.py @@ -48,8 +48,8 @@ def json_dict_to_input(self, json_dict, spending_transaction_id=None): input.sequence = json_dict.get('sequence') if 'scriptSig' in json_dict: - input.script_asm = (json_dict.get('scriptSig')).get('asm') - input.script_hex = (json_dict.get('scriptSig')).get('hex') + input.script_asm = '' #(json_dict.get('scriptSig')).get('asm') + input.script_hex = '' #(json_dict.get('scriptSig')).get('hex') return input @@ -63,8 +63,8 @@ def inputs_to_dicts(self, inputs): 'create_output_index': input.create_output_index, 'sequence': input.sequence, - 'script_asm': input.script_asm, - 'script_hex': input.script_hex, + 'script_asm': '', #input.script_asm + 'script_hex': '', #input.script_hex 'required_signatures': input.required_signatures, 'addresses': input.addresses, @@ -83,8 +83,8 @@ def dicts_to_inputs(self, dicts): input.index = dict.get('index') input.create_transaction_id = dict.get('create_transaction_id') input.create_output_index = dict.get('create_output_index') - input.script_asm = dict.get('script_asm') - input.script_hex = dict.get('script_hex') + input.script_asm = '' #dict.get('script_asm') + input.script_hex = '' #dict.get('script_hex') input.sequence = dict.get('sequence') input.required_signatures = dict.get('required_signatures') input.type = dict.get('type') diff --git a/bitcoinetl/mappers/transaction_mapper.py b/bitcoinetl/mappers/transaction_mapper.py index c1094b6..14899ba 100644 --- a/bitcoinetl/mappers/transaction_mapper.py +++ b/bitcoinetl/mappers/transaction_mapper.py @@ -36,7 +36,7 @@ def __init__(self): self.transaction_output_mapper = BtcTransactionOutputMapper() self.join_split_mapper = BtcJoinSplitMapper() - def json_dict_to_transaction(self, json_dict, block=None, index=None, coin_price_usd=None): + def json_dict_to_transaction(self, json_dict, block=None, index=None): transaction = BtcTransaction() transaction.hash = json_dict.get('hash') transaction.size = json_dict.get('size') @@ -72,8 +72,6 @@ def json_dict_to_transaction(self, json_dict, block=None, index=None, coin_price transaction.join_splits = self.join_split_mapper.vjoinsplit_to_join_splits(json_dict.get('vjoinsplit')) transaction.value_balance = bitcoin_to_satoshi(json_dict.get('valueBalance')) - # New fields - transaction.coin_price_usd = coin_price_usd transaction.weight = json_dict.get('weight') transaction.output_addresses = self.get_output_addresses(transaction) return transaction @@ -104,7 +102,6 @@ def transaction_to_dict(self, transaction): 'input_value': transaction.calculate_input_value(), 'output_value': transaction.calculate_output_value(), 'fee': transaction.calculate_fee(), - 'coin_price_usd': transaction.coin_price_usd, 'weight': transaction.weight, 'output_addresses': transaction.output_addresses } @@ -123,7 +120,6 @@ def dict_to_transaction(self, dict): transaction.block_timestamp = dict.get('block_timestamp') transaction.is_coinbase = dict.get('is_coinbase') transaction.index = dict.get('index') - transaction.coin_price_usd = dict.get('coin_price_usd') transaction.weight = dict.get('weight') transaction.output_addresses = dict.get('output_addresses') transaction.input_addresses = dict.get('input_addresses') diff --git a/bitcoinetl/mappers/transaction_output_mapper.py b/bitcoinetl/mappers/transaction_output_mapper.py index ffc8e87..edd0361 100644 --- a/bitcoinetl/mappers/transaction_output_mapper.py +++ b/bitcoinetl/mappers/transaction_output_mapper.py @@ -38,16 +38,17 @@ def json_dict_to_output(self, json_dict, create_transaction_id=None): output.index = json_dict.get('n') output.addresses = json_dict.get('addresses') - output.txinwitness = json_dict.get('txinwitness') + output.witness = json_dict.get('txinwitness') output.value = bitcoin_to_satoshi(json_dict.get('value')) output.create_transaction_id = create_transaction_id if 'scriptPubKey' in json_dict: script_pub_key = json_dict.get('scriptPubKey') - output.script_asm = script_pub_key.get('asm') - output.script_hex = script_pub_key.get('hex') + output.script_asm = '' #script_pub_key.get('asm') + output.script_hex = '' #script_pub_key.get('hex') output.required_signatures = script_pub_key.get('reqSigs') output.type = script_pub_key.get('type') + #output.addresses = script_pub_key.get('addresses') if script_pub_key.get('addresses') is not None and len(script_pub_key.get('addresses')) > 0: output.addresses = script_pub_key.get('addresses') elif script_pub_key.get('address') is None: @@ -65,16 +66,16 @@ def outputs_to_dicts(self, outputs): 'create_transaction_id': output.create_transaction_id, 'spending_transaction_id': None, - 'script_asm': output.script_asm, - 'script_hex': output.script_hex, + 'script_asm': '', #output.script_asm + 'script_hex': '', #output.script_hex 'type': output.type, 'addresses': output.addresses, 'value': output.value, 'required_signatures': output.required_signatures, } - if output.txinwitness: - item['witness'] = output.txinwitness + if output.witness: + item['witness'] = output.witness result.append(item) return result @@ -84,8 +85,8 @@ def dicts_to_outputs(self, dicts): for dict in dicts: input = BtcTransactionOutput() input.index = dict.get('index') - input.script_asm = dict.get('script_asm') - input.script_hex = dict.get('script_hex') + input.script_asm = '' #dict.get('script_asm') + input.script_hex = '' #dict.get('script_hex') input.required_signatures = dict.get('required_signatures') input.type = dict.get('type') input.addresses = dict.get('addresses') diff --git a/bitcoinetl/service/btc_service.py b/bitcoinetl/service/btc_service.py index c2bcc44..e59b1dd 100644 --- a/bitcoinetl/service/btc_service.py +++ b/bitcoinetl/service/btc_service.py @@ -30,22 +30,14 @@ from bitcoinetl.service.btc_script_service import script_hex_to_non_standard_address from bitcoinetl.service.genesis_transactions import GENESIS_TRANSACTIONS from blockchainetl.utils import rpc_response_batch_to_results, dynamic_batch_iterator -from blockchainetl.cryptocompare import ( - get_coin_price, - get_hour_id_from_ts, - get_day_id_from_ts, - get_ts_from_hour_id, - get_ts_from_day_id -) class BtcService(object): - def __init__(self, bitcoin_rpc, chain=Chain.BITCOIN, coin_price_type=CoinPriceType.empty): + def __init__(self, bitcoin_rpc, chain=Chain.BITCOIN): self.bitcoin_rpc = bitcoin_rpc self.block_mapper = BtcBlockMapper() self.transaction_mapper = BtcTransactionMapper() self.chain = chain - self.coin_price_type = coin_price_type self.cached_prices = {} def get_block(self, block_number, with_transactions=False): @@ -82,14 +74,11 @@ def get_blocks_by_hashes(self, block_hash_batch, with_transactions=True): if self.chain in Chain.HAVE_OLD_API and with_transactions: self._fetch_transactions(blocks) - self._add_coin_price_to_blocks(blocks, self.coin_price_type) - for block in blocks: self._remove_coinbase_input(block) if block.has_full_transactions(): for transaction in block.transactions: - self._add_coin_price_to_transaction(transaction, block.coin_price_usd) self._add_non_standard_addresses(transaction) if self.chain == Chain.ZCASH: self._add_shielded_inputs_and_outputs(transaction) @@ -178,6 +167,8 @@ def _remove_coinbase_input(self, block): def _add_non_standard_addresses(self, transaction): for output in transaction.outputs: if output.addresses is None or len(output.addresses) == 0: + # output.type = 'nonstandard' + # if output.type != 'multisig': output.type = 'nonstandard' output.addresses = [script_hex_to_non_standard_address(output.script_hex)] @@ -211,42 +202,4 @@ def _add_shielded_inputs_and_outputs(self, transaction): def get_block_reward(self, block): return block.coinbase_tx.calculate_output_value() - def _add_coin_price_to_blocks(self, blocks, coin_price_type): - from_currency_code = Chain.ticker_symbol(self.chain) - - if not from_currency_code or coin_price_type == CoinPriceType.empty: - return - - elif coin_price_type == CoinPriceType.hourly: - block_hour_ids = list(set([get_hour_id_from_ts(block.timestamp) for block in blocks])) - block_hours_ts = {hour_id: get_ts_from_hour_id(hour_id) for hour_id in block_hour_ids} - - for hour_id, hour_ts in block_hours_ts.items(): - if hour_id in self.cached_prices: - continue - - self.cached_prices[hour_id] = get_coin_price(from_currency_code=from_currency_code, timestamp=hour_ts, resource="histohour") - - for block in blocks: - block_hour_id = get_hour_id_from_ts(block.timestamp) - block.coin_price_usd = self.cached_prices[block_hour_id] - - elif coin_price_type == CoinPriceType.daily: - block_day_ids = list(set([get_day_id_from_ts(block.timestamp) for block in blocks])) - block_days_ts = {day_id: get_ts_from_day_id(day_id) for day_id in block_day_ids} - - for day_id, day_ts in block_days_ts.items(): - if day_id in self.cached_prices: - continue - - self.cached_prices[day_id] = get_coin_price(from_currency_code=from_currency_code, timestamp=day_ts, resource="histoday") - - for block in blocks: - block_day_id = get_day_id_from_ts(block.timestamp) - block.coin_price_usd = self.cached_prices[block_day_id] - - def _add_coin_price_to_transaction(self, transaction, coin_price_usd): - transaction.coin_price_usd = coin_price_usd - - ADDRESS_TYPE_SHIELDED = 'shielded' diff --git a/bitcoinetl/streaming/btc_streamer_adapter.py b/bitcoinetl/streaming/btc_streamer_adapter.py index 600d65b..81c9e8d 100644 --- a/bitcoinetl/streaming/btc_streamer_adapter.py +++ b/bitcoinetl/streaming/btc_streamer_adapter.py @@ -95,6 +95,8 @@ def export_all(self, start_block, end_block): transactions = enriched_transactions logging.info('Exporting with ' + type(self.item_exporter).__name__) + logging.info('Block number ' + str(len(blocks))) + logging.info('Transaction length ' + str(len(transactions))) all_items = blocks + transactions diff --git a/blockchainetl/cryptocompare.py b/blockchainetl/cryptocompare.py deleted file mode 100644 index ffd835d..0000000 --- a/blockchainetl/cryptocompare.py +++ /dev/null @@ -1,125 +0,0 @@ -# MIT License -# -# Copyright (c) 2019 Nirmal AK, nirmal@merklescience.com -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -import os -import requests -from time import time -from math import floor -from datetime import datetime, timedelta - - -CRYPTOCOMPARE_API_KEY = os.getenv("CRYPTOCOMPARE_API_KEY", "") - - -class CryptoCompareRequestException(Exception): - pass - - -def get_hour_id_from_ts(timestamp: int) -> int: - """ - returns the number of hours elapsed since 1st Jan 2000 - """ - base_ts = datetime(2000, 1, 1).timestamp() - seconds_to_hour = 60 * 60 - return floor((int(timestamp) - base_ts) / seconds_to_hour) - - -def get_day_id_from_ts(timestamp: int) -> int: - """ - returns the number of days elapsed since 1st Jan 2000 - """ - base_ts = datetime(2000, 1, 1).timestamp() - seconds_to_day = 60 * 60 * 24 - return floor((int(timestamp) - base_ts) / seconds_to_day) - - -def get_ts_from_hour_id(hour_id: int) -> int: - base_date = datetime(2000, 1, 1) - reference_date = base_date + timedelta(hours=hour_id) - return floor(reference_date.timestamp()) - - -def get_ts_from_day_id(day_id: int) -> int: - base_date = datetime(2000, 1, 1) - reference_date = base_date + timedelta(days=day_id) - return floor(reference_date.timestamp()) - - -def _make_request( - resource: str, - from_currency_code: str, - to_currency_code: str, - timestamp: int, - access_token: str, - exchange_code: str, - num_records: int, - api_version: str - ) -> requests.Response: - """ - API documentation for cryptocompare can be found at https://min-api.cryptocompare.com/documentation - """ - base_url = f"https://min-api.cryptocompare.com/data/{api_version}/{resource}" - params = { - "fsym": from_currency_code, - "tsym": to_currency_code, - "e": exchange_code, - "limit": num_records, - "toTs": timestamp, - "api_key": access_token - } - return requests.get(base_url, params=params) - - -def get_coin_price( - from_currency_code: str, - timestamp: int, - resource="histohour", - to_currency_code: str="USD", - exchange_code: str="CCCAGG", - num_records: int=1, - api_version: str ="v2", - access_token: str=CRYPTOCOMPARE_API_KEY, - ): - """ - Prices are retrieved from hourly price resource as prices - are available for historical data from when available - """ - response = _make_request( - resource=resource, - from_currency_code=from_currency_code, - to_currency_code=to_currency_code, - timestamp=int(timestamp), - access_token=access_token, - exchange_code=exchange_code, - num_records=num_records, - api_version=api_version, - ) - if not response.status_code == 200: - raise CryptoCompareRequestException - - payload = response.json() - if payload["Type"] != 100: - raise CryptoCompareRequestException(payload.get("Message", "")) - - data = payload["Data"]["Data"] - avg_price = sum(item["open"] for item in data) / len(data) - return round(avg_price, 2) diff --git a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py index f8caad2..4ff13bd 100644 --- a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py +++ b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py @@ -30,7 +30,7 @@ class GooglePubSubItemExporter: def __init__(self, item_type_to_topic_mapping, message_attributes=(), - batch_max_bytes=1024 * 5, batch_max_latency=1, batch_max_messages=1000): + batch_max_bytes=1024 * 5, batch_max_latency=0.01, batch_max_messages=1000): self.item_type_to_topic_mapping = item_type_to_topic_mapping self.batch_max_bytes = batch_max_bytes @@ -45,16 +45,21 @@ def open(self): pass def export_items(self, items): - try: - self._export_items_with_timeout(items) - except timeout_decorator.TimeoutError as e: - # A bug in PubSub publisher that makes it stalled after running for some time. - # Exception in thread Thread-CommitBatchPublisher: - # details = "channel is in state TRANSIENT_FAILURE" - # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606 - logging.info('Recreating Pub/Sub publisher.') - self.publisher = self.create_publisher() - raise e + tot_steps = (len(items) // 1000) + 1 + logging.info('Total publish loop steps'+str(tot_steps)) + for i in range(0, len(items), 1000): + mini_batch = items[i:i + 1000] + logging.info('Current Loop Iteration' + str(i + 1)+ 'out of'+str(tot_steps)) + try: + self._export_items_with_timeout(mini_batch) + except timeout_decorator.TimeoutError as e: + # A bug in PubSub publisher that makes it stalled after running for some time. + # Exception in thread Thread-CommitBatchPublisher: + # details = "channel is in state TRANSIENT_FAILURE" + # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606 + logging.info('Recreating Pub/Sub publisher.') + self.publisher = self.create_publisher() + raise e @timeout_decorator.timeout(300) def _export_items_with_timeout(self, items): diff --git a/blockchainetl/streaming/streamer.py b/blockchainetl/streaming/streamer.py index aa8f818..1d42308 100644 --- a/blockchainetl/streaming/streamer.py +++ b/blockchainetl/streaming/streamer.py @@ -24,6 +24,7 @@ import logging import os import time +from google.api_core.exceptions import InvalidArgument from blockchainetl.streaming.streamer_adapter_stub import StreamerAdapterStub from blockchainetl.file_utils import smart_open @@ -95,7 +96,12 @@ def _sync_cycle(self): current_block, target_block, self.last_synced_block, blocks_to_sync)) if blocks_to_sync != 0: - self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block) + + try: + self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block) + except InvalidArgument as e: + logging.exception(f"An exception occurred while syncing block data - InvalidArgument, ERROR = {e.message}") + logging.info('Writing last synced block {}'.format(target_block)) write_last_synced_block(self.last_synced_block_file, target_block) self.last_synced_block = target_block diff --git a/dockerhub.md b/dockerhub.md index 762cd29..cbb4180 100644 --- a/dockerhub.md +++ b/dockerhub.md @@ -1,10 +1,10 @@ # Uploading to Docker Hub ```bash -> BITCOINETL_STREAMING_VERSION=1.5.2-streaming -> docker build --platform linux/x86_64 -t bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . -> docker tag bitcoin-etl:${BITCOINETL_STREAMING_VERSION} blockchainetl/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -> docker push blockchainetl/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} +> BITCOINETL_STREAMING_VERSION=1.5.0-streaming +> docker build -t bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . +> docker tag bitcoin-etl:${BITCOINETL_STREAMING_VERSION} merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} +> docker push merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} > docker tag bitcoin-etl:${BITCOINETL_STREAMING_VERSION} blockchainetl/bitcoin-etl:latest-streaming > docker push blockchainetl/bitcoin-etl:latest-streaming