diff --git a/hudi-examples/hudi-examples-dbt/.gitignore b/hudi-examples/hudi-examples-dbt/.gitignore index 0eb3fd035dbc7..5f54ec9bc4158 100644 --- a/hudi-examples/hudi-examples-dbt/.gitignore +++ b/hudi-examples/hudi-examples-dbt/.gitignore @@ -6,3 +6,4 @@ logs/ .DS_Store .vscode *.log +dbt-env/ diff --git a/hudi-examples/hudi-examples-dbt/README.md b/hudi-examples/hudi-examples-dbt/README.md index 22f745911261e..a96e3182b9bbe 100644 --- a/hudi-examples/hudi-examples-dbt/README.md +++ b/hudi-examples/hudi-examples-dbt/README.md @@ -14,13 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. --> -## Testing dbt project: `hudi_examples_dbt` +# Testing dbt project: `hudi_examples_dbt` This dbt project transforms demonstrates hudi integration with dbt, it has a few models to demonstrate the different ways in which you can create hudi datasets using dbt. This directory serves as a self-contained playground dbt project, useful for testing out scripts, and communicating some of the core dbt concepts. -### Setup +## Setup Switch working directory and have `python3` installed. @@ -28,7 +28,7 @@ Switch working directory and have `python3` installed. cd hudi-examples/hudi-examples-dbt ``` -### Install dbt +## Install dbt Create python virtual environment ([Reference](https://docs.getdbt.com/docs/installation)). @@ -54,7 +54,7 @@ spark: dev: type: spark method: thrift - schema: my_schema + schema: hudi_examples_dbt host: localhost port: 10000 server_side_parameters: @@ -63,7 +63,7 @@ spark: _If you have access to a data warehouse, you can use those credentials – we recommend setting your [target schema](https://docs.getdbt.com/docs/configure-your-profile#section-populating-your-profile) to be a new schema (dbt will create the schema for you, as long as you have the right privileges). 
If you don't have access to an existing data warehouse, you can also setup a local postgres database and connect to it in your profile._ -### Start Spark Thrift server +## Start Spark Thrift server > **NOTE** Using these versions > - Spark 3.2.3 (with Derby 10.14.2.0) @@ -104,7 +104,7 @@ $SPARK_HOME/sbin/start-thriftserver.sh \ --hiveconf 'javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true' ``` -### Verify dbt setup +## Verify dbt setup ```shell dbt debug @@ -116,13 +116,16 @@ Output of the above command should show this text at the end of the output: All checks passed! ``` -### Run the models +## Run the models + +### Run `example` ```shell -dbt run +dbt run -m example ``` -Output should look like this +
+Output should look like this ``` 05:47:28 Running with dbt=1.0.0 @@ -145,14 +148,16 @@ Output should look like this 05:47:42 05:47:42 Completed successfully ``` +
-### Test the output of the models +### Test `example` ```shell -dbt test +dbt test -m example ``` -Output should look like this +
+Output should look like this ``` 05:48:17 Running with dbt=1.0.0 @@ -187,6 +192,84 @@ Output should look like this 05:48:26 05:48:26 Done. PASS=10 WARN=0 ERROR=0 SKIP=0 TOTAL=10 ``` +
+ +### Run `example_cdc` + +Bootstrap the raw tables `raw_updates` and `profiles`. + +```shell +dbt run -m example_cdc.raw_updates -m example_cdc.profiles +``` + +Launch a `spark-sql` shell to interact with the tables created by `example_cdc`. + +```shell +spark-sql \ +--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.0 \ +--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ +--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \ +--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \ +--conf spark.sql.warehouse.dir=/tmp/hudi/hive/warehouse \ +--conf spark.hadoop.hive.metastore.warehouse.dir=/tmp/hudi/hive/warehouse \ +--conf spark.hadoop.hive.metastore.schema.verification=false \ +--conf spark.hadoop.datanucleus.schema.autoCreateAll=true \ +--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.apache.derby.jdbc.ClientDriver \ +--conf 'spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true' \ +--conf 'spark.hadoop.hive.cli.print.header=true' +``` + +Insert sample records. + +```sql +use hudi_examples_dbt; +insert into raw_updates values ('101', 'D', UNIX_TIMESTAMP()); +insert into raw_updates values ('102', 'E', UNIX_TIMESTAMP()); +insert into raw_updates values ('103', 'F', UNIX_TIMESTAMP()); +``` + +Process the updates and write new data to `profiles`. + +```shell +dbt run -m example_cdc.profiles +``` + +
+Check `profiles` records. + +```shell +spark-sql> refresh table profiles; +spark-sql> select _hoodie_commit_time, user_id, city, updated_at from profiles order by updated_at; +_hoodie_commit_time user_id city updated_at +20231128013722030 101 D 1701157027 +20231128013722030 102 E 1701157031 +20231128013722030 103 F 1701157035 +Time taken: 0.219 seconds, Fetched 3 row(s) +``` +
+ +Extract changed data from `profiles` to `profile_changes`. + +```shell +dbt run -m example_cdc.profile_changes +``` + +
+Check `profile_changes` records. + +```shell +spark-sql> refresh table profile_changes; +spark-sql> select user_id, old_city, new_city from profile_changes order by process_ts; +user_id old_city new_city +101 Nil A +102 Nil B +103 Nil C +101 A D +102 B E +103 C F +Time taken: 0.129 seconds, Fetched 6 row(s) +``` +
### Generate documentation diff --git a/hudi-examples/hudi-examples-dbt/dbt_project.yml b/hudi-examples/hudi-examples-dbt/dbt_project.yml index dc5f5593d64d3..6c330e1d995ec 100644 --- a/hudi-examples/hudi-examples-dbt/dbt_project.yml +++ b/hudi-examples/hudi-examples-dbt/dbt_project.yml @@ -28,12 +28,12 @@ profile: 'spark' # These configurations specify where dbt should look for different types of files. # The `source-paths` config, for example, states that models in this project can be # found in the "models/" directory. You probably won't need to change these! -model-paths: ["models"] +model-paths: [ "models" ] target-path: "target" # directory which will store compiled SQL files -clean-targets: # directories to be removed by `dbt clean` - - "target" - - "dbt_modules" +clean-targets: # directories to be removed by `dbt clean` + - "target" + - "dbt_modules" # Configuring models # Full documentation: https://docs.getdbt.com/docs/configuring-models @@ -44,6 +44,9 @@ clean-targets: # directories to be removed by `dbt clean` models: +file_format: hudi hudi_examples_dbt: + example: # Applies to all files under models/example/ - example: - materialized: table + materialized: table + example_cdc: + # Applies to all files under models/example_cdc/ + materialized: table diff --git a/hudi-examples/hudi-examples-dbt/models/example_cdc/profile_changes.sql b/hudi-examples/hudi-examples-dbt/models/example_cdc/profile_changes.sql new file mode 100644 index 0000000000000..bef04190d407f --- /dev/null +++ b/hudi-examples/hudi-examples-dbt/models/example_cdc/profile_changes.sql @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +{{ + config( + materialized='incremental', + file_format='hudi' + ) +}} + +with new_changes as ( + select + GET_JSON_OBJECT(after, '$.user_id') AS user_id, + COALESCE(GET_JSON_OBJECT(before, '$.city'), 'Nil') AS old_city, + GET_JSON_OBJECT(after, '$.city') AS new_city, + ts_ms as process_ts + + from hudi_table_changes('hudi_examples_dbt.profiles', 'cdc', + from_unixtime(unix_timestamp() - 3600 * 24, 'yyyyMMddHHmmss')) + + {% if is_incremental() %} + where ts_ms > (select max(process_ts) from {{ this }}) + {% endif %} +) +select user_id, old_city, new_city, process_ts +from new_changes diff --git a/hudi-examples/hudi-examples-dbt/models/example_cdc/profiles.sql b/hudi-examples/hudi-examples-dbt/models/example_cdc/profiles.sql new file mode 100644 index 0000000000000..ede8c51de1e74 --- /dev/null +++ b/hudi-examples/hudi-examples-dbt/models/example_cdc/profiles.sql @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +{{ + config( + materialized='incremental', + incremental_strategy='merge', + merge_update_columns = ['city', 'updated_at'], + unique_key='user_id', + file_format='hudi', + options={ + 'type': 'cow', + 'primaryKey': 'user_id', + 'preCombineField': 'updated_at', + 'hoodie.table.cdc.enabled': 'true', + 'hoodie.table.cdc.supplemental.logging.mode': 'DATA_BEFORE_AFTER' + } + ) +}} + +with new_updates as ( + select user_id, city, updated_at from {{ ref('raw_updates') }} + + {% if is_incremental() %} + where updated_at > (select max(updated_at) from {{ this }}) + {% endif %} +) + +select + user_id, city, updated_at +from new_updates + diff --git a/hudi-examples/hudi-examples-dbt/models/example_cdc/raw_updates.sql b/hudi-examples/hudi-examples-dbt/models/example_cdc/raw_updates.sql new file mode 100644 index 0000000000000..bc670d5e97995 --- /dev/null +++ b/hudi-examples/hudi-examples-dbt/models/example_cdc/raw_updates.sql @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +{{ + config( + materialized='incremental', + file_format='hudi', + incremental_strategy='insert_overwrite' + ) +}} + +with source_data as ( + + select '101' as user_id, 'A' as city, unix_timestamp() as updated_at + union all + select '102' as user_id, 'B' as city, unix_timestamp() as updated_at + union all + select '103' as user_id, 'C' as city, unix_timestamp() as updated_at + +) + +select * +from source_data diff --git a/hudi-examples/hudi-examples-dbt/models/example_cdc/schema.yml b/hudi-examples/hudi-examples-dbt/models/example_cdc/schema.yml new file mode 100644 index 0000000000000..c7cde1bd2e7f9 --- /dev/null +++ b/hudi-examples/hudi-examples-dbt/models/example_cdc/schema.yml @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +version: 2 + +models: + - name: raw_updates + description: "user updates on profiles" + columns: + - name: user_id + description: "" + tests: + - not_null + - name: city + description: "" + tests: + - not_null + - name: updated_at + description: "" + tests: + - not_null + - name: profiles + description: "user profiles" + columns: + - name: user_id + description: "" + tests: + - unique + - not_null + - name: city + description: "" + tests: + - not_null + - name: updated_at + description: "" + tests: + - not_null + - name: profile_changes + description: "changed data" + columns: + - name: user_id + description: "" + tests: + - not_null + - name: old_city + description: "" + tests: + - not_null + - name: new_city + description: "" + tests: + - not_null + - name: process_ts + description: "" + tests: + - not_null