Skip to content

Commit

Permalink
BigQuery copy materialization enhancement (#3606)
Browse files Browse the repository at this point in the history
* Change BigQuery copy materialization

Change BigQuery copy materialization macros to copy data from several sources into single target

* Change BigQuery copy materialization

Change BigQuery connections.py to copy data from several sources into single target via copy materialization

* Change BigQuery copy materialization

Test to check default value of `copy_materialization` if it is absent in config

* Change BigQuery copy materialization

Update changelog

* Update changelog

* Var renaming + test addition

* Changelog updated

* Changelog updated

* Fix test for copy table

* Update test_bigquery_adapter.py

* Update test_bigquery_adapter.py

* Update impl.py

* Update connections.py

* Update test_bigquery_adapter.py

* Update test_bigquery_adapter.py

* Update connections.py

* Align calls from mock and from adapter

* Split long code ilnes

* Create additional.sql

* Update copy_as_several_tables.sql

* Update schema.yml

* Update copy.sql

* Update connections.py

* Update test_bigquery_copy_models.py

* Add contributor
  • Loading branch information
xemuliam authored Aug 30, 2021
1 parent 9e79667 commit 052e54d
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 31 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
## dbt 0.21.0 (Release TBD)

### Features

- Enhance BigQuery copy materialization ([#3570](https://github.com/dbt-labs/dbt/issues/3570), [#3606](https://github.com/dbt-labs/dbt/pull/3606)):
- to simplify config (default usage of `copy_materialization='table'` if is is not found in global or local config)
- to let copy several source tables into single target table at a time. ([Google doc reference](https://cloud.google.com/bigquery/docs/managing-tables#copying_multiple_source_tables))

### Under the hood

- Use GitHub Actions for CI ([#3688](https://github.com/dbt-labs/dbt/issues/3688), [#3669](https://github.com/dbt-labs/dbt/pull/3669))

Contributors:

- [@xemuliam](https://github.com/xemuliam) ([#3606](https://github.com/dbt-labs/dbt/pull/3606))

## dbt 0.21.0b2 (August 19, 2021)

### Features
Expand Down
26 changes: 20 additions & 6 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,26 +458,40 @@ def copy_bq_table(self, source, destination, write_disposition):
conn = self.get_thread_connection()
client = conn.handle

source_ref = self.table_ref(
source.database, source.schema, source.table, conn)
# -------------------------------------------------------------------------------
# BigQuery allows to use copy API using two different formats:
# 1. client.copy_table(source_table_id, destination_table_id)
# where source_table_id = "your-project.source_dataset.source_table"
# 2. client.copy_table(source_table_ids, destination_table_id)
# where source_table_ids = ["your-project.your_dataset.your_table_name", ...]
# Let's use uniform function call and always pass list there
# -------------------------------------------------------------------------------
if type(source) is not list:
source = [source]

source_ref_array = [self.table_ref(
src_table.database, src_table.schema, src_table.table, conn)
for src_table in source]
destination_ref = self.table_ref(
destination.database, destination.schema, destination.table, conn)

logger.debug(
'Copying table "{}" to "{}" with disposition: "{}"',
source_ref.path, destination_ref.path, write_disposition)
'Copying table(s) "{}" to "{}" with disposition: "{}"',
', '.join(source_ref.path for source_ref in source_ref_array),
destination_ref.path, write_disposition)

def copy_and_results():
job_config = google.cloud.bigquery.CopyJobConfig(
write_disposition=write_disposition)
copy_job = client.copy_table(
source_ref, destination_ref, job_config=job_config)
source_ref_array, destination_ref, job_config=job_config)
iterator = copy_job.result(timeout=self.get_timeout(conn))
return copy_job, iterator

self._retry_and_handle(
msg='copy table "{}" to "{}"'.format(
source_ref.path, destination_ref.path),
', '.join(source_ref.path for source_ref in source_ref_array),
destination_ref.path),
conn=conn, fn=copy_and_results)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,24 @@
{# Setup #}
{{ run_hooks(pre_hooks) }}

{# there should be exactly one ref or exactly one source #}
{% set destination = this.incorporate(type='table') %}

{% set dependency_type = none %}
{% if (model.refs | length) == 1 and (model.sources | length) == 0 %}
{% set dependency_type = 'ref' %}
{% elif (model.refs | length) == 0 and (model.sources | length) == 1 %}
{% set dependency_type = 'source' %}
{% else %}
{% set msg %}
Expected exactly one ref or exactly one source, instead got {{ model.refs | length }} models and {{ model.sources | length }} sources.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}

{% if dependency_type == 'ref' %}
{% set src = ref(*model.refs[0]) %}
{% else %}
{% set src = source(*model.sources[0]) %}
{% endif %}
{# there can be several ref() or source() according to BQ copy API docs #}
{# cycle over ref() and source() to create source tables array #}
{% set source_array = [] %}
{% for ref_table in model.refs %}
{{ source_array.append(ref(*ref_table)) }}
{% endfor %}

{% for src_table in model.sources %}
{{ source_array.append(source(*src_table)) }}
{% endfor %}

{# Call adapter's copy_table function #}
{%- set result_str = adapter.copy_table(
src,
source_array,
destination,
config.get('copy_materialization', 'table')) -%}
config.get('copy_materialization', default = 'table')) -%}

{{ store_result('main', response=result_str) }}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 2 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from {{ ref('original') }}, {{ source('test_copy_several_tables', 'additional') }}
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
{{ config(copy_materialization='table') }}
{{ ref('original') }}
{{ ref('original') }}
6 changes: 6 additions & 0 deletions test/integration/022_bigquery_test/copy-models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: 2
sources:
- name: test_copy_several_tables
schema: "{{ target.schema }}"
tables:
- name: additional
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ def project_config(self):
test:
original:
materialized: table
additional:
materialized: table
copy_as_table:
materialized: copy
copy_as_several_tables:
materialized: copy
copy_as_incremental:
materialized: copy
'''))

@use_profile('bigquery')
def test__bigquery_copy_table(self):
results = self.run_dbt()
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 5)
4 changes: 2 additions & 2 deletions test/unit/test_bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ def test_copy_bq_table_appends(self):
write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND)
args, kwargs = self.mock_client.copy_table.call_args
self.mock_client.copy_table.assert_called_once_with(
self._table_ref('project', 'dataset', 'table1', None),
[self._table_ref('project', 'dataset', 'table1', None)],
self._table_ref('project', 'dataset', 'table2', None),
job_config=ANY)
args, kwargs = self.mock_client.copy_table.call_args
Expand All @@ -630,7 +630,7 @@ def test_copy_bq_table_truncates(self):
write_disposition=dbt.adapters.bigquery.impl.WRITE_TRUNCATE)
args, kwargs = self.mock_client.copy_table.call_args
self.mock_client.copy_table.assert_called_once_with(
self._table_ref('project', 'dataset', 'table1', None),
[self._table_ref('project', 'dataset', 'table1', None)],
self._table_ref('project', 'dataset', 'table2', None),
job_config=ANY)
args, kwargs = self.mock_client.copy_table.call_args
Expand Down

0 comments on commit 052e54d

Please sign in to comment.