From 23ea4f61bc26475ffa94d128ca1b6e38e1395c0d Mon Sep 17 00:00:00 2001 From: Akshay Gupta Date: Thu, 26 Aug 2021 17:33:42 +0530 Subject: [PATCH 1/5] chunking publish to pubsub --- .../exporters/google_pubsub_item_exporter.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py index a0676d0..00eedd9 100644 --- a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py +++ b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py @@ -45,16 +45,20 @@ def open(self): pass def export_items(self, items): - try: - self._export_items_with_timeout(items) - except timeout_decorator.TimeoutError as e: - # A bug in PubSub publisher that makes it stalled after running for some time. - # Exception in thread Thread-CommitBatchPublisher: - # details = "channel is in state TRANSIENT_FAILURE" - # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606 - logging.info('Recreating Pub/Sub publisher.') - self.publisher = self.create_publisher() - raise e + tot_steps = (len(items) // 1000) + 1 + logging.info('Total publish loop steps', tot_steps) + for i in range(0, len(items), 1000): + logging.info('Current Loop Iteration', i + 1, 'out of', tot_steps) + try: + self._export_items_with_timeout(items) + except timeout_decorator.TimeoutError as e: + # A bug in PubSub publisher that makes it stalled after running for some time. + # Exception in thread Thread-CommitBatchPublisher: + # details = "channel is in state TRANSIENT_FAILURE" + # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606 + logging.info('Recreating Pub/Sub publisher.') + self.publisher = self.create_publisher() + raise e @timeout_decorator.timeout(300) def _export_items_with_timeout(self, items): From 033b3d821ceaf0549d7c51dc4d5a1236711a41db Mon Sep 17 00:00:00 2001 From: Akshay Gupta Date: Thu, 26 Aug 2021 17:39:38 +0530 Subject: [PATCH 2/5] acutal batch --- blockchainetl/jobs/exporters/google_pubsub_item_exporter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py index 00eedd9..55728ec 100644 --- a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py +++ b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py @@ -48,9 +48,10 @@ def export_items(self, items): tot_steps = (len(items) // 1000) + 1 logging.info('Total publish loop steps', tot_steps) for i in range(0, len(items), 1000): + mini_batch = items[i:i + 1000] logging.info('Current Loop Iteration', i + 1, 'out of', tot_steps) try: - self._export_items_with_timeout(items) + self._export_items_with_timeout(mini_batch) except timeout_decorator.TimeoutError as e: # A bug in PubSub publisher that makes it stalled after running for some time. # Exception in thread Thread-CommitBatchPublisher: From dfa770649d90b8080b4145deab01fee71ebe0e5c Mon Sep 17 00:00:00 2001 From: saurabhdaga-merkle Date: Thu, 26 Aug 2021 17:54:09 +0530 Subject: [PATCH 3/5] updated README --- README_CONTAINER_REGISTRY.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_CONTAINER_REGISTRY.md b/README_CONTAINER_REGISTRY.md index da49514..1509f5e 100644 --- a/README_CONTAINER_REGISTRY.md +++ b/README_CONTAINER_REGISTRY.md @@ -1,4 +1,4 @@ -BITCOINETL_STREAMING_VERSION=1.4-streaming +BITCOINETL_STREAMING_VERSION=1.4-chunk-streaming docker build -t merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . docker tag merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} docker push us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} From fc03e8106c732d4506dcf60a28708c5d8ed1203d Mon Sep 17 00:00:00 2001 From: Akshay Gupta Date: Thu, 26 Aug 2021 18:14:12 +0530 Subject: [PATCH 4/5] fix logging --- blockchainetl/jobs/exporters/google_pubsub_item_exporter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py index 55728ec..4ff13bd 100644 --- a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py +++ b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py @@ -46,10 +46,10 @@ def open(self): def export_items(self, items): tot_steps = (len(items) // 1000) + 1 - logging.info('Total publish loop steps', tot_steps) + logging.info('Total publish loop steps'+str(tot_steps)) for i in range(0, len(items), 1000): mini_batch = items[i:i + 1000] - logging.info('Current Loop Iteration', i + 1, 'out of', tot_steps) + logging.info('Current Loop Iteration' + str(i + 1)+ 'out of'+str(tot_steps)) try: self._export_items_with_timeout(mini_batch) except timeout_decorator.TimeoutError as e: From 557e58fb91f58071bab6ee72ebdb14c88c0c6f16 Mon Sep 17 00:00:00 2001 From: saurabhdaga-merkle <55530487+saurabhdaga-merkle@users.noreply.github.com> Date: Thu, 16 Dec 2021 17:42:17 +0530 Subject: [PATCH 5/5] Update README_CONTAINER_REGISTRY.md --- README_CONTAINER_REGISTRY.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_CONTAINER_REGISTRY.md b/README_CONTAINER_REGISTRY.md index 1509f5e..da49514 100644 --- a/README_CONTAINER_REGISTRY.md +++ b/README_CONTAINER_REGISTRY.md @@ -1,4 +1,4 @@ -BITCOINETL_STREAMING_VERSION=1.4-chunk-streaming +BITCOINETL_STREAMING_VERSION=1.4-streaming docker build -t merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . docker tag merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} docker push us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION}