From a676c4e538d1e20c7d92d6989fcfbace531ad6ac Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 12 Sep 2024 16:18:49 -0400 Subject: [PATCH] dbt-postgres 'microbatch' strategy (#146) --- .changes/unreleased/Features-20240911-141416.yaml | 6 ++++++ dbt/adapters/postgres/impl.py | 2 +- .../materializations/incremental_strategies.sql | 11 +++++++++++ .../functional/adapter/test_incremental_microbatch.py | 7 +++++++ 4 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 .changes/unreleased/Features-20240911-141416.yaml create mode 100644 tests/functional/adapter/test_incremental_microbatch.py diff --git a/.changes/unreleased/Features-20240911-141416.yaml b/.changes/unreleased/Features-20240911-141416.yaml new file mode 100644 index 00000000..990a09d9 --- /dev/null +++ b/.changes/unreleased/Features-20240911-141416.yaml @@ -0,0 +1,6 @@ +kind: Features +body: 'Microbatch incremental strategy implementation: merge' +time: 2024-09-11T14:14:16.538536-04:00 +custom: + Author: michelleark + Issue: "149" diff --git a/dbt/adapters/postgres/impl.py b/dbt/adapters/postgres/impl.py index d49d334b..b8d4f43d 100644 --- a/dbt/adapters/postgres/impl.py +++ b/dbt/adapters/postgres/impl.py @@ -151,7 +151,7 @@ def valid_incremental_strategies(self): """The set of standard builtin strategies which this adapter supports out-of-the-box. Not used to validate custom strategies defined by end users. """ - return ["append", "delete+insert", "merge"] + return ["append", "delete+insert", "merge", "microbatch"] def debug_query(self): self.execute("select 1 as id") diff --git a/dbt/include/postgres/macros/materializations/incremental_strategies.sql b/dbt/include/postgres/macros/materializations/incremental_strategies.sql index f2fbf41e..1d37366f 100644 --- a/dbt/include/postgres/macros/materializations/incremental_strategies.sql +++ b/dbt/include/postgres/macros/materializations/incremental_strategies.sql @@ -7,3 +7,14 @@ {% endif %} {% endmacro %} + + +{% macro postgres__get_incremental_microbatch_sql(arg_dict) %} + + {% if arg_dict["unique_key"] %} + {% do return(adapter.dispatch('get_incremental_merge_sql', 'dbt')(arg_dict)) %} + {% else %} + {{ exceptions.raise_compiler_error("dbt-postgres 'microbatch' requires a `unique_key` config") }} + {% endif %} + +{% endmacro %} diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py new file mode 100644 index 00000000..ce5855b6 --- /dev/null +++ b/tests/functional/adapter/test_incremental_microbatch.py @@ -0,0 +1,7 @@ +from dbt.tests.adapter.incremental.test_incremental_microbatch import ( + BaseMicrobatch, +) + + +class TestPostgresMicrobatch(BaseMicrobatch): + pass