Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming changes #6

Merged
merged 26 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ venv/
ENV/

.idea/

*.json
*.txt
5 changes: 5 additions & 0 deletions README_CONTAINER_REGISTRY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BITCOINETL_STREAMING_VERSION=1.4-streaming
docker build -t merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming .
docker tag merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION}
docker push us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION}

7 changes: 2 additions & 5 deletions bitcoinetl/cli/export_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,7 @@ def get_partitions(start, end, partition_batch_size, provider_uri):
@click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL),
help='The type of chain.')
@click.option('--enrich', default=False, type=bool, help='Enable filling in transactions inputs fields.')
@click.option('--coin-price-type', default=CoinPriceType.empty, type=int,
help='Enable querying CryptoCompare for coin prices. 0 for no price, 1 for daily price, 2 for hourly price.')
def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain, enrich, coin_price_type):
def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain, enrich):
"""Exports all data for a range of blocks."""
do_export_all(chain, get_partitions(start, end, partition_batch_size, provider_uri),
output_dir, provider_uri, max_workers, export_batch_size, enrich,
coin_price_type)
output_dir, provider_uri, max_workers, export_batch_size, enrich)
6 changes: 1 addition & 5 deletions bitcoinetl/cli/export_blocks_and_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@
'If not provided transactions will not be exported. Use "-" for stdout')
@click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL),
help='The type of chain')
@click.option('--coin-price-type', default=CoinPriceType.empty, type=int,
help='Enable querying CryptoCompare for coin prices. 0 for no price, 1 for daily price, 2 for hourly price.')
def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri,
max_workers, blocks_output, transactions_output, chain,
coin_price_type):
max_workers, blocks_output, transactions_output, chain,):
"""Export blocks and transactions."""
if blocks_output is None and transactions_output is None:
raise ValueError('Either --blocks-output or --transactions-output options must be provided')
Expand All @@ -67,6 +64,5 @@ def export_blocks_and_transactions(start_block, end_block, batch_size, provider_
chain=chain,
export_blocks=blocks_output is not None,
export_transactions=transactions_output is not None,
coin_price_type=coin_price_type
)
job.run()
12 changes: 7 additions & 5 deletions bitcoinetl/cli/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@
'If not specified will print to console.')
@click.option('-s', '--start-block', default=None, type=int, help='Start block.')
@click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain.')
@click.option('--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs.')
@click.option('-b', '--batch-size', default=2, type=int, help='How many blocks to batch in single request.')
@click.option('--period-seconds', default=1, type=int, help='How many seconds to sleep between syncs.')
@click.option('-b', '--batch-size', default=1, type=int, help='How many blocks to batch in single request.')
@click.option('-B', '--block-batch-size', default=10, type=int, help='How many blocks to batch in single sync round.')
@click.option('-w', '--max-workers', default=5, type=int, help='The number of workers.')
@click.option('--log-file', default=None, type=str, help='Log file.')
@click.option('--pid-file', default=None, type=str, help='pid file.')
@click.option('--enrich', default=True, type=bool, help='Enable filling in transactions inputs fields.')
@click.option('--retry_errors', default=True, type=bool, help='Enable Retry on streaming failures')
def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain=Chain.BITCOIN,
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None,
enrich=True):
period_seconds=1, batch_size=1, block_batch_size=10, max_workers=5, log_file=None, pid_file=None,
enrich=True, retry_errors=True):
"""Streams all data types to console or Google Pub/Sub."""
configure_logging(log_file)
configure_signals()
Expand All @@ -67,7 +68,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain
chain=chain,
batch_size=batch_size,
enable_enrich=enrich,
max_workers=max_workers
max_workers=max_workers,
)
streamer = Streamer(
blockchain_streamer_adapter=streamer_adapter,
Expand All @@ -77,5 +78,6 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain
period_seconds=period_seconds,
block_batch_size=block_batch_size,
pid_file=pid_file,
retry_errors=retry_errors
)
streamer.stream()
1 change: 0 additions & 1 deletion bitcoinetl/domain/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def __init__(self):

self.block_reward = None
self.transaction_fees = None
self.coin_price_usd = None
self.coinbase_txid = None
self.coinbase_param_decoded = None

Expand Down
1 change: 0 additions & 1 deletion bitcoinetl/domain/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def __init__(self):
self.input_value = None
self.output_count = None
self.output_value = None
self.coin_price_usd = None

def add_input(self, input):
if len(self.inputs) > 0:
Expand Down
4 changes: 3 additions & 1 deletion bitcoinetl/enumeration/chain.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
class Chain:
BITCOIN = 'bitcoin'
BITCOIN_CASH = 'bitcoin_cash'
BITCOIN_CASH_SV = 'bitcoin_cash_sv'
BITCOIN_GOLD = 'bitcoin_gold'
DOGECOIN = 'dogecoin'
LITECOIN = 'litecoin'
DASH = 'dash'
ZCASH = 'zcash'
MONACOIN = 'monacoin'

ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN]
ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_CASH_SV, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN]
# Old API doesn't support verbosity for getblock which doesn't allow querying all transactions in a block in 1 go.
HAVE_OLD_API = [BITCOIN_CASH, DOGECOIN, DASH, MONACOIN]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check if they support verbosity for these chains now?


Expand All @@ -17,6 +18,7 @@ def ticker_symbol(cls, chain):
symbols = {
'bitcoin': 'BTC',
'bitcoin_cash': 'BCH',
'bitcoin_cash_sv': 'BSV',
'dogecoin': 'DOGE',
'litecoin': 'LTC',
'dash': 'DASH',
Expand Down
22 changes: 11 additions & 11 deletions bitcoinetl/jobs/enrich_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _enrich_transactions(self, transactions):
input_transactions_map = self._get_input_transactions_as_map(transaction_input_batch)
for input in transaction_input_batch:
output = self._get_output_for_input(input, input_transactions_map) \
if input.spent_transaction_hash is not None else None
if input.create_transaction_id is not None else None
if output is not None:
input.required_signatures = output.required_signatures
input.type = output.type
Expand All @@ -74,29 +74,29 @@ def _enrich_transactions(self, transactions):
self.item_exporter.export_item(self.transaction_mapper.transaction_to_dict(transaction))

def _get_input_transactions_as_map(self, transaction_inputs):
transaction_hashes = [input.spent_transaction_hash for input in transaction_inputs
if input.spent_transaction_hash is not None]
transaction_hashes = [input.create_transaction_id for input in transaction_inputs
if input.create_transaction_id is not None]

transaction_hashes = set(transaction_hashes)
if len(transaction_hashes) > 0:
transactions = self.btc_service.get_transactions_by_hashes(transaction_hashes)
return {transaction.hash: transaction for transaction in transactions}
return {transaction.transaction_id: transaction for transaction in transactions}
else:
return {}

def _get_output_for_input(self, transaction_input, input_transactions_map):
spent_transaction_hash = transaction_input.spent_transaction_hash
input_transaction = input_transactions_map.get(spent_transaction_hash)
create_transaction_id = transaction_input.create_transaction_id
input_transaction = input_transactions_map.get(create_transaction_id)
if input_transaction is None:
raise ValueError('Input transaction with hash {} not found'.format(spent_transaction_hash))
raise ValueError('Input transaction with hash {} not found'.format(create_transaction_id))

spent_output_index = transaction_input.spent_output_index
if input_transaction.outputs is None or len(input_transaction.outputs) < (spent_output_index + 1):
create_output_index = transaction_input.create_output_index
if input_transaction.outputs is None or len(input_transaction.outputs) < (create_output_index + 1):
raise ValueError(
'There is no output with index {} in transaction with hash {}'.format(
spent_output_index, spent_transaction_hash))
create_output_index, create_transaction_id))

output = input_transaction.outputs[spent_output_index]
output = input_transaction.outputs[create_output_index]
return output

def _end(self):
Expand Down
2 changes: 0 additions & 2 deletions bitcoinetl/jobs/export_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

def export_all(
chain, partitions, output_dir, provider_uri, max_workers, batch_size, enrich,
coin_price_type
):
for batch_start_block, batch_end_block, partition_dir, *args in partitions:
# # # start # # #
Expand Down Expand Up @@ -105,7 +104,6 @@ def export_all(
item_exporter=blocks_and_transactions_item_exporter(blocks_file, transactions_file),
export_blocks=blocks_file is not None,
export_transactions=transactions_file is not None,
coin_price_type=coin_price_type,
)
job.run()

Expand Down
5 changes: 2 additions & 3 deletions bitcoinetl/jobs/export_blocks_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def __init__(
item_exporter,
chain,
export_blocks=True,
export_transactions=True,
coin_price_type=CoinPriceType.empty):
export_transactions=True):
validate_range(start_block, end_block)

self.start_block = start_block
Expand All @@ -57,7 +56,7 @@ def __init__(
if not self.export_blocks and not self.export_transactions:
raise ValueError('At least one of export_blocks or export_transactions must be True')

self.btc_service = BtcService(bitcoin_rpc, chain, coin_price_type)
self.btc_service = BtcService(bitcoin_rpc, chain)
self.block_mapper = BtcBlockMapper()
self.transaction_mapper = BtcTransactionMapper()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"transaction_count",
"block_reward",
"transaction_ids",
"coin_price_usd",
]


Expand All @@ -69,7 +68,6 @@
'output_value',
'inputs',
'outputs',
'coin_price_usd',
]


Expand Down
2 changes: 0 additions & 2 deletions bitcoinetl/mappers/block_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def json_dict_to_block(self, json_dict):
block.chain_work = json_dict.get("chainwork")
block.coinbase_txid = json_dict.get("coinbase_txid")
block.previous_block_hash = json_dict.get("previousblockhash")
block.coin_price_usd = json_dict.get('coin_price_usd')
return block

def block_to_dict(self, block):
Expand All @@ -94,7 +93,6 @@ def block_to_dict(self, block):
'difficulty': block.difficulty,
'chain_work': block.chain_work,
'previous_block_hash': block.previous_block_hash,
"coin_price_usd": block.coin_price_usd,
"transaction_ids": block.transaction_ids
}

Expand Down
12 changes: 6 additions & 6 deletions bitcoinetl/mappers/transaction_input_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def json_dict_to_input(self, json_dict, spending_transaction_id=None):
input.sequence = json_dict.get('sequence')

if 'scriptSig' in json_dict:
input.script_asm = (json_dict.get('scriptSig')).get('asm')
input.script_hex = (json_dict.get('scriptSig')).get('hex')
input.script_asm = '' #(json_dict.get('scriptSig')).get('asm')
input.script_hex = '' #(json_dict.get('scriptSig')).get('hex')

return input

Expand All @@ -63,8 +63,8 @@ def inputs_to_dicts(self, inputs):
'create_output_index': input.create_output_index,
'sequence': input.sequence,

'script_asm': input.script_asm,
'script_hex': input.script_hex,
'script_asm': '', #input.script_asm
'script_hex': '', #input.script_hex

'required_signatures': input.required_signatures,
'addresses': input.addresses,
Expand All @@ -83,8 +83,8 @@ def dicts_to_inputs(self, dicts):
input.index = dict.get('index')
input.create_transaction_id = dict.get('create_transaction_id')
input.create_output_index = dict.get('create_output_index')
input.script_asm = dict.get('script_asm')
input.script_hex = dict.get('script_hex')
input.script_asm = '' #dict.get('script_asm')
input.script_hex = '' #dict.get('script_hex')
input.sequence = dict.get('sequence')
input.required_signatures = dict.get('required_signatures')
input.type = dict.get('type')
Expand Down
6 changes: 1 addition & 5 deletions bitcoinetl/mappers/transaction_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self):
self.transaction_output_mapper = BtcTransactionOutputMapper()
self.join_split_mapper = BtcJoinSplitMapper()

def json_dict_to_transaction(self, json_dict, block=None, index=None, coin_price_usd=None):
def json_dict_to_transaction(self, json_dict, block=None, index=None):
transaction = BtcTransaction()
transaction.hash = json_dict.get('hash')
transaction.size = json_dict.get('size')
Expand Down Expand Up @@ -72,8 +72,6 @@ def json_dict_to_transaction(self, json_dict, block=None, index=None, coin_price
transaction.join_splits = self.join_split_mapper.vjoinsplit_to_join_splits(json_dict.get('vjoinsplit'))
transaction.value_balance = bitcoin_to_satoshi(json_dict.get('valueBalance'))

# New fields
transaction.coin_price_usd = coin_price_usd
transaction.weight = json_dict.get('weight')
transaction.output_addresses = self.get_output_addresses(transaction)
return transaction
Expand Down Expand Up @@ -104,7 +102,6 @@ def transaction_to_dict(self, transaction):
'input_value': transaction.calculate_input_value(),
'output_value': transaction.calculate_output_value(),
'fee': transaction.calculate_fee(),
'coin_price_usd': transaction.coin_price_usd,
'weight': transaction.weight,
'output_addresses': transaction.output_addresses
}
Expand All @@ -123,7 +120,6 @@ def dict_to_transaction(self, dict):
transaction.block_timestamp = dict.get('block_timestamp')
transaction.is_coinbase = dict.get('is_coinbase')
transaction.index = dict.get('index')
transaction.coin_price_usd = dict.get('coin_price_usd')
transaction.weight = dict.get('weight')
transaction.output_addresses = dict.get('output_addresses')
transaction.input_addresses = dict.get('input_addresses')
Expand Down
26 changes: 16 additions & 10 deletions bitcoinetl/mappers/transaction_output_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,23 @@ def json_dict_to_output(self, json_dict, create_transaction_id=None):

output.index = json_dict.get('n')
output.addresses = json_dict.get('addresses')
output.txinwitness = json_dict.get('txinwitness')
output.witness = json_dict.get('txinwitness')
output.value = bitcoin_to_satoshi(json_dict.get('value'))
output.create_transaction_id = create_transaction_id

if 'scriptPubKey' in json_dict:
script_pub_key = json_dict.get('scriptPubKey')
output.script_asm = script_pub_key.get('asm')
output.script_hex = script_pub_key.get('hex')
output.script_asm = '' #script_pub_key.get('asm')
output.script_hex = '' #script_pub_key.get('hex')
output.required_signatures = script_pub_key.get('reqSigs')
output.type = script_pub_key.get('type')
output.addresses = script_pub_key.get('addresses')
#output.addresses = script_pub_key.get('addresses')
if script_pub_key.get('addresses') is not None and len(script_pub_key.get('addresses')) > 0:
output.addresses = script_pub_key.get('addresses')
elif script_pub_key.get('address') is None:
output.addresses = []
else:
output.addresses = [script_pub_key.get('address')]

return output

Expand All @@ -60,16 +66,16 @@ def outputs_to_dicts(self, outputs):
'create_transaction_id': output.create_transaction_id,
'spending_transaction_id': None,

'script_asm': output.script_asm,
'script_hex': output.script_hex,
'script_asm': '', #output.script_asm
'script_hex': '', #output.script_hex

'type': output.type,
'addresses': output.addresses,
'value': output.value,
'required_signatures': output.required_signatures,
}
if output.txinwitness:
item['witness'] = output.txinwitness
if output.witness:
item['witness'] = output.witness

result.append(item)
return result
Expand All @@ -79,8 +85,8 @@ def dicts_to_outputs(self, dicts):
for dict in dicts:
input = BtcTransactionOutput()
input.index = dict.get('index')
input.script_asm = dict.get('script_asm')
input.script_hex = dict.get('script_hex')
input.script_asm = '' #dict.get('script_asm')
input.script_hex = '' #dict.get('script_hex')
input.required_signatures = dict.get('required_signatures')
input.type = dict.get('type')
input.addresses = dict.get('addresses')
Expand Down
Loading