Tutorial dbt run errors, but it's running #60

Open
haf opened this issue Mar 18, 2023 · 1 comment
Comments

haf commented Mar 18, 2023

I am following this tutorial: https://getindata.com/blog/dbt-run-real-time-analytics-on-apache-flink-announcing-the-dbt-flink-adapter/

After downloading the Kafka Table Connector with curl -LO https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.0/flink-sql-connector-kafka-1.16.0.jar and putting it in the right place, dbt run returns:

17:08:33  Running with dbt=1.3.3
17:08:33  Found 4 models, 0 tests, 0 snapshots, 0 analyses, 297 macros, 1 operation, 0 seed files, 2 sources, 0 exposures, 0 metrics
17:08:33
17:08:33
17:08:33  Running 1 on-run-start hook
17:08:33  Flink adapter: Session created: af8d9593-f26b-4aae-b43f-d41bbff8e65c
17:08:33  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x1064dee60>
17:08:33  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "connection_name": "master"} */







CREATE TABLE IF NOT EXISTS clickstream /** mode('streaming')*/ (
 `event_timestamp` TIMESTAMP(3),
 `user_id` DECIMAL,
 `event` STRING
, WATERMARK FOR event_timestamp AS event_timestamp
)
with (
 'connector' = 'kafka',
 'properties.bootstrap.servers' = 'kafka:29092',
 'scan.startup.mode' = 'earliest-offset',
 'value.format' = 'json',
 'value.json.encode.decimal-as-plain-number' = 'true',
 'value.json.timestamp-format.standard' = 'ISO-8601',
 'properties.group.id' = 'dbt',
 'topic' = 'clickstream'

);

  "
17:08:33  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "4fd944af-96be-4cd2-bcf2-1fab3586c9a3"}
SQL gateway response: {"operationHandle": "4994f297-501c-4ec1-a0c0-b3d4e1a2b30f"}
17:08:34  Flink adapter: Statement executed. Status FINISHED, operation handle: 4994f297-501c-4ec1-a0c0-b3d4e1a2b30f
17:08:34  Flink adapter: SQL rows returned: [{'kind': 'INSERT', 'fields': ['OK']}]
17:08:34  Flink adapter: Buffered: 1 rows
17:08:34  Flink adapter: SQL rows returned: []
17:08:34  Flink adapter: Buffered: 0 rows
17:08:34  Flink adapter: Fetched results from Flink: [('OK',)]
17:08:34  Flink adapter: Returned results from adapter: [('OK',)]
17:08:34  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x1064dee60>
17:08:34  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "connection_name": "master"} */







CREATE TABLE IF NOT EXISTS trx /** mode('streaming')*/ (
 `event_timestamp` TIMESTAMP(3),
 `user_id` DECIMAL,
 `source` STRING,
 `target` STRING,
 `amount` DECIMAL,
 `deposit_balance_after_trx` DECIMAL,
 `credit_balance_after_trx` DECIMAL
, WATERMARK FOR event_timestamp AS event_timestamp
)
with (
 'connector' = 'kafka',
 'properties.bootstrap.servers' = 'kafka:29092',
 'scan.startup.mode' = 'earliest-offset',
 'value.format' = 'json',
 'value.json.encode.decimal-as-plain-number' = 'true',
 'value.json.timestamp-format.standard' = 'ISO-8601',
 'properties.group.id' = 'dbt',
 'topic' = 'trx'

);

  "
17:08:34  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "14f2770a-833e-4b34-b7d8-4d4a25967e0a"}
SQL gateway response: {"operationHandle": "9dee1626-d008-4f86-80f6-3a169e0629eb"}
17:08:34  Flink adapter: Statement executed. Status FINISHED, operation handle: 9dee1626-d008-4f86-80f6-3a169e0629eb
17:08:34  Flink adapter: SQL rows returned: [{'kind': 'INSERT', 'fields': ['OK']}]
17:08:34  Flink adapter: Buffered: 1 rows
17:08:34  Flink adapter: SQL rows returned: []
17:08:34  Flink adapter: Buffered: 0 rows
17:08:34  Flink adapter: Fetched results from Flink: [('OK',)]
17:08:34  Flink adapter: Returned results from adapter: [('OK',)]
17:08:34  1 of 1 START hook: dbt_flink.on-run-start.0 .................................... [RUN]
17:08:34  1 of 1 OK hook: dbt_flink.on-run-start.0 ....................................... [OK in 0.00s]
17:08:34
17:08:34  Concurrency: 1 threads (target='dev')
17:08:34
17:08:34  1 of 4 START sql table model high_loan ......................................... [RUN]
17:08:34  Flink adapter: Restored session from file. Session handle: af8d9593-f26b-4aae-b43f-d41bbff8e65c
17:08:34  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x1065fbe50>
17:08:34  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "node_id": "model.example1.high_loan"} */







  create  table
    high_loan
    /** mode('streaming')*/
  with (
     'connector' = 'kafka',
     'properties.bootstrap.servers' = 'kafka:29092',
     'scan.startup.mode' = 'earliest-offset',
     'value.format' = 'json',
     'value.json.encode.decimal-as-plain-number' = 'true',
     'value.json.timestamp-format.standard' = 'ISO-8601',
     'properties.group.id' = 'dbt',
     'topic' = 'high-loan'

  )
  as (
    select *
from trx
where source = 'credit'
and target = 'deposit'
and amount > 5000
  );
  "
17:08:34  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "c1121476-0dbd-4114-9319-c4e05d7a0491"}
SQL gateway response: {"operationHandle": "5e766b11-a03f-4380-8ec0-dec9b97c8166"}
17:08:36  Flink adapter: Statement executed. Status FINISHED, operation handle: 5e766b11-a03f-4380-8ec0-dec9b97c8166
17:08:36  1 of 4 OK created sql table model high_loan .................................... [FINISHED in 1.74s]
17:08:36  2 of 4 START sql table model joined_data ....................................... [RUN]
17:08:36  Flink adapter: Restored session from file. Session handle: af8d9593-f26b-4aae-b43f-d41bbff8e65c
17:08:36  Flink adapter: Creating new cursor for session <flink.sqlgateway.session.SqlGatewaySession object at 0x10661a260>
17:08:36  Flink adapter: Executing statement "/* {"app": "dbt", "dbt_version": "1.3.3", "profile_name": "example1", "target_name": "dev", "node_id": "model.example1.joined_data"} */







  create  table
    joined_data
    /** mode('streaming')*/
  with (
     'connector' = 'kafka',
     'properties.bootstrap.servers' = 'kafka:29092',
     'scan.startup.mode' = 'earliest-offset',
     'value.format' = 'json',
     'value.json.encode.decimal-as-plain-number' = 'true',
     'value.json.timestamp-format.standard' = 'ISO-8601',
     'properties.group.id' = 'dbt'

  )
  as (
    select cs.event_timestamp, cs.user_id, cs.event, trx.source, trx.target, trx.amount, trx.deposit_balance_after_trx, trx.credit_balance_after_trx
from clickstream as cs
join trx as trx
on cs.user_id = trx.user_id
and cs.event_timestamp = trx.event_timestamp
  );
  "
17:08:36  Flink adapter: Setting 'execution.runtime-mode' to 'streaming'
SQL gateway response: {"operationHandle": "ffbb848f-a82a-46dc-87f3-aecd655afcbb"}
SQL gateway response: {"operationHandle": "b993870a-013f-4720-a1c6-336cd1689a13"}
17:08:36  Flink adapter: Statement executed. Status ERROR, operation handle: b993870a-013f-4720-a1c6-336cd1689a13
17:08:36  Flink adapter: Exception thrown during execution: Statement execution failed
17:08:36  2 of 4 ERROR creating sql table model joined_data .............................. [ERROR in 0.14s]
17:08:36  3 of 4 SKIP relation _default_database.daily_spending .......................... [SKIP]
17:08:36  4 of 4 SKIP relation _default_database.joined_data_output ...................... [SKIP]
17:08:36
17:08:36  Finished running 4 table models, 1 hook in 0 hours 0 minutes and 3.17 seconds (3.17s).
17:08:36
17:08:36  Completed with 1 error and 0 warnings:
17:08:36
17:08:36  Runtime Error in model joined_data (models/joined_data.sql)
17:08:36    Statement execution failed
17:08:36
17:08:36  Done. PASS=1 WARN=0 ERROR=1 SKIP=2 TOTAL=4

But the UI looks good:

[screenshot of the Flink web UI]

gliter (Collaborator) commented Mar 19, 2023

In the UI you should have more than one job.
Could you post the Flink logs as well? If you ran it with docker-compose up, please take a look at the console output. There should be an exception there.
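
Looking at the compiled statements in the log, one difference stands out: the high_loan sink sets 'topic' = 'high-loan', while the WITH clause of the failing joined_data statement has no 'topic' entry at all, and the Kafka connector needs a topic for a table it writes to. That missing option may well be what "Statement execution failed" is about. A minimal sketch of the same statement with a topic added (the topic name 'joined-data' is an assumption, and in the dbt project this would normally come from the model's configuration rather than from editing the generated SQL):

create table joined_data /** mode('streaming')*/
with (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'kafka:29092',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json',
  'value.json.encode.decimal-as-plain-number' = 'true',
  'value.json.timestamp-format.standard' = 'ISO-8601',
  'properties.group.id' = 'dbt',
  'topic' = 'joined-data'  -- assumed topic name; missing in the statement from the log
) as (
  select cs.event_timestamp, cs.user_id, cs.event,
         trx.source, trx.target, trx.amount,
         trx.deposit_balance_after_trx, trx.credit_balance_after_trx
  from clickstream as cs
  join trx as trx
    on cs.user_id = trx.user_id
   and cs.event_timestamp = trx.event_timestamp
);

The Flink console output requested above should confirm whether the exception indeed complains about a missing 'topic' (or 'topic-pattern') option.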
