Incorrect KestraConstraintViolationException when importing a local flow YAML file into the Kestra Docker container #6908

Closed
goosethedev opened this issue Jan 23, 2025 · 3 comments · Fixed by #7034
Labels
area/backend (Needs backend code changes), bug (Something isn't working)

goosethedev commented Jan 23, 2025

Describe the issue

Steps to reproduce:

In an empty directory, create a docker-compose.yaml with local flow sync enabled (micronaut.io.watch) and the local ./kestra_flows directory mounted into the container at /local_flows:

volumes:
  postgres-temp:
  kestra-temp:

services:
  postgres:
    image: postgres:17.2-bookworm
    volumes:
      - postgres-temp:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: kestra
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: k3str4
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"]
      interval: 30s
      timeout: 10s
      retries: 10
    ports:
      - 5432:5432

  kestra:
    image: kestra/kestra:latest
    pull_policy: always
    user: "root"
    command: server standalone
    volumes:
      - kestra-temp:/app/storage
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/kestra-wd:/tmp/kestra-wd
      - ./kestra_flows:/local_flows
    environment:
      KESTRA_CONFIGURATION: |
        datasources:
          postgres:
            url: jdbc:postgresql://postgres:5432/kestra
            driverClassName: org.postgresql.Driver
            username: kestra
            password: k3str4
        kestra:
          server:
            basicAuth:
              enabled: false
              username: "[email protected]" # it must be a valid email address
              password: kestra
          repository:
            type: postgres
          storage:
            type: local
            local:
              basePath: "/app/storage"
          queue:
            type: postgres
          tasks:
            tmpDir:
              path: /tmp/kestra-wd/tmp
          url: http://localhost:8080/
        micronaut:
          io:
            watch:
              enabled: true
              paths:
                - /local_flows
    ports:
      - "8080:8080"
      - "8081:8081"
    depends_on:
      postgres:
        condition: service_started

Next, create ./kestra_flows/example.example_flow.yaml with the following contents, based on an example flow from the DataTalksClub Data Engineering Zoomcamp:

id: example_flow
namespace: example
description: |
  Best to add a label `backfill:true` from the UI to track executions created via a backfill.
  CSV data used here comes from: https://github.com/DataTalksClub/nyc-tlc-data/releases

concurrency:
  limit: 1

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: [yellow, green]
    defaults: yellow

variables:
  file: "{{inputs.taxi}}_tripdata_{{trigger.date | date('yyyy-MM')}}.csv"
  staging_table: "public.{{inputs.taxi}}_tripdata_staging"
  table: "public.{{inputs.taxi}}_tripdata"
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ (trigger.date | date('yyyy-MM')) ~ '.csv']}}"

tasks:
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{render(vars.file)}}"
      taxi: "{{inputs.taxi}}"

  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

  - id: if_yellow_taxi
    type: io.kestra.plugin.core.flow.If
    condition: "{{inputs.taxi == 'yellow'}}"
    then:
      - id: yellow_create_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              tpep_pickup_datetime   timestamp,
              tpep_dropoff_datetime  timestamp,
              passenger_count        integer,
              trip_distance          double precision,
              RatecodeID             text,
              store_and_fwd_flag     text,
              PULocationID           text,
              DOLocationID           text,
              payment_type           integer,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              congestion_surcharge   double precision
          );

      - id: yellow_create_staging_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              tpep_pickup_datetime   timestamp,
              tpep_dropoff_datetime  timestamp,
              passenger_count        integer,
              trip_distance          double precision,
              RatecodeID             text,
              store_and_fwd_flag     text,
              PULocationID           text,
              DOLocationID           text,
              payment_type           integer,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              congestion_surcharge   double precision
          );

      - id: yellow_truncate_staging_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          TRUNCATE TABLE {{render(vars.staging_table)}};

      - id: yellow_copy_in_to_staging_table
        type: io.kestra.plugin.jdbc.postgresql.CopyIn
        format: CSV
        from: "{{render(vars.data)}}"
        table: "{{render(vars.staging_table)}}"
        header: true
        columns:
          [
            VendorID,
            tpep_pickup_datetime,
            tpep_dropoff_datetime,
            passenger_count,
            trip_distance,
            RatecodeID,
            store_and_fwd_flag,
            PULocationID,
            DOLocationID,
            payment_type,
            fare_amount,
            extra,
            mta_tax,
            tip_amount,
            tolls_amount,
            improvement_surcharge,
            total_amount,
            congestion_surcharge,
          ]

      - id: yellow_add_unique_id_and_filename
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          UPDATE {{render(vars.staging_table)}}
          SET 
            unique_row_id = md5(
              COALESCE(CAST(VendorID AS text), '') ||
              COALESCE(CAST(tpep_pickup_datetime AS text), '') || 
              COALESCE(CAST(tpep_dropoff_datetime AS text), '') || 
              COALESCE(PULocationID, '') || 
              COALESCE(DOLocationID, '') || 
              COALESCE(CAST(fare_amount AS text), '') || 
              COALESCE(CAST(trip_distance AS text), '')      
            ),
            filename = '{{render(vars.file)}}';

      - id: yellow_merge_data
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          MERGE INTO {{render(vars.table)}} AS T
          USING {{render(vars.staging_table)}} AS S
          ON T.unique_row_id = S.unique_row_id
          WHEN NOT MATCHED THEN
            INSERT (
              unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
              passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
              DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
              improvement_surcharge, total_amount, congestion_surcharge
            )
            VALUES (
              S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
              S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
              S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
              S.improvement_surcharge, S.total_amount, S.congestion_surcharge
            );

  - id: if_green_taxi
    type: io.kestra.plugin.core.flow.If
    condition: "{{inputs.taxi == 'green'}}"
    then:
      - id: green_create_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              lpep_pickup_datetime   timestamp,
              lpep_dropoff_datetime  timestamp,
              store_and_fwd_flag     text,
              RatecodeID             text,
              PULocationID           text,
              DOLocationID           text,
              passenger_count        integer,
              trip_distance          double precision,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              ehail_fee              double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              payment_type           integer,
              trip_type              integer,
              congestion_surcharge   double precision
          );

      - id: green_create_staging_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          CREATE TABLE IF NOT EXISTS {{render(vars.staging_table)}} (
              unique_row_id          text,
              filename               text,
              VendorID               text,
              lpep_pickup_datetime   timestamp,
              lpep_dropoff_datetime  timestamp,
              store_and_fwd_flag     text,
              RatecodeID             text,
              PULocationID           text,
              DOLocationID           text,
              passenger_count        integer,
              trip_distance          double precision,
              fare_amount            double precision,
              extra                  double precision,
              mta_tax                double precision,
              tip_amount             double precision,
              tolls_amount           double precision,
              ehail_fee              double precision,
              improvement_surcharge  double precision,
              total_amount           double precision,
              payment_type           integer,
              trip_type              integer,
              congestion_surcharge   double precision
          );

      - id: green_truncate_staging_table
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          TRUNCATE TABLE {{render(vars.staging_table)}};

      - id: green_copy_in_to_staging_table
        type: io.kestra.plugin.jdbc.postgresql.CopyIn
        format: CSV
        from: "{{render(vars.data)}}"
        table: "{{render(vars.staging_table)}}"
        header: true
        columns:
          [
            VendorID,
            lpep_pickup_datetime,
            lpep_dropoff_datetime,
            store_and_fwd_flag,
            RatecodeID,
            PULocationID,
            DOLocationID,
            passenger_count,
            trip_distance,
            fare_amount,
            extra,
            mta_tax,
            tip_amount,
            tolls_amount,
            ehail_fee,
            improvement_surcharge,
            total_amount,
            payment_type,
            trip_type,
            congestion_surcharge,
          ]

      - id: green_add_unique_id_and_filename
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          UPDATE {{render(vars.staging_table)}}
          SET 
            unique_row_id = md5(
              COALESCE(CAST(VendorID AS text), '') ||
              COALESCE(CAST(lpep_pickup_datetime AS text), '') || 
              COALESCE(CAST(lpep_dropoff_datetime AS text), '') || 
              COALESCE(PULocationID, '') || 
              COALESCE(DOLocationID, '') || 
              COALESCE(CAST(fare_amount AS text), '') || 
              COALESCE(CAST(trip_distance AS text), '')      
            ),
            filename = '{{render(vars.file)}}';

      - id: green_merge_data
        type: io.kestra.plugin.jdbc.postgresql.Queries
        sql: |
          MERGE INTO {{render(vars.table)}} AS T
          USING {{render(vars.staging_table)}} AS S
          ON T.unique_row_id = S.unique_row_id
          WHEN NOT MATCHED THEN
            INSERT (
              unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
              store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
              trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
              improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
            )
            VALUES (
              S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
              S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
              S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
              S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
            );

  - id: purge_files
    type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles
    description: To avoid cluttering your storage, we will remove the downloaded files

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://postgres:5432/postgres-zoomcamp
      username: kestra
      password: k3str4

triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"
    inputs:
      taxi: green

  - id: yellow_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 1 * *"
    inputs:
      taxi: yellow

Start the containers with docker compose up; the following error appears in the Kestra logs:

kestra-1    | 2025-01-23 15:56:51,614 WARN  standalone   i.k.c.s.FileChangedEventListener Error while parsing flow: /local_flows/example_flow.yaml
kestra-1    | io.kestra.core.models.validations.KestraConstraintViolationException: yellow_copy_in_to_staging_table.then[3].url: must not be null
kestra-1    | green_add_unique_id_and_filename.then[4].url: must not be null
kestra-1    | yellow_truncate_staging_table.then[2].url: must not be null
kestra-1    | green_merge_data.then[5].url: must not be null
kestra-1    | green_truncate_staging_table.then[2].url: must not be null
kestra-1    | green_copy_in_to_staging_table.then[3].url: must not be null
kestra-1    | yellow_create_table.then[0].url: must not be null
kestra-1    | yellow_merge_data.then[5].url: must not be null
kestra-1    | yellow_create_staging_table.then[1].url: must not be null
kestra-1    | green_create_table.then[0].url: must not be null
kestra-1    | yellow_add_unique_id_and_filename.then[4].url: must not be null
kestra-1    | green_create_staging_table.then[1].url: must not be null
kestra-1    |
kestra-1    | 	at io.kestra.core.models.validations.ModelValidator.isValid(ModelValidator.java:28)
kestra-1    | 	at io.kestra.core.models.validations.ModelValidator.validate(ModelValidator.java:18)
kestra-1    | 	at io.kestra.cli.services.FileChangedEventListener.parseFlow(FileChangedEventListener.java:233)
kestra-1    | 	at io.kestra.cli.services.FileChangedEventListener$1.visitFile(FileChangedEventListener.java:203)
kestra-1    | 	at io.kestra.cli.services.FileChangedEventListener$1.visitFile(FileChangedEventListener.java:189)
kestra-1    | 	at java.base/java.nio.file.Files.walkFileTree(Unknown Source)
kestra-1    | 	at java.base/java.nio.file.Files.walkFileTree(Unknown Source)
kestra-1    | 	at io.kestra.cli.services.FileChangedEventListener.loadFlowsFromFolder(FileChangedEventListener.java:189)
kestra-1    | 	at io.kestra.cli.services.FileChangedEventListener.setup(FileChangedEventListener.java:183)
kestra-1    | 	at io.kestra.cli.services.FileChangedEventListener.startListeningFromConfig(FileChangedEventListener.java:69)
kestra-1    | 	at io.kestra.cli.commands.servers.StandAloneCommand.call(StandAloneCommand.java:124)
kestra-1    | 	at io.kestra.cli.commands.servers.StandAloneCommand.call(StandAloneCommand.java:24)
kestra-1    | 	at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
kestra-1    | 	at picocli.CommandLine.access$1500(CommandLine.java:148)
kestra-1    | 	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
kestra-1    | 	at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
kestra-1    | 	at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
kestra-1    | 	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
kestra-1    | 	at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
kestra-1    | 	at picocli.CommandLine.execute(CommandLine.java:2174)
kestra-1    | 	at io.kestra.cli.App.execute(App.java:67)
kestra-1    | 	at io.kestra.cli.App.main(App.java:50)

EDIT: When the same flow is created through the Kestra UI, the editor shows a Missing property: "url" warning on every task that uses the PostgreSQL plugin, yet the flow executes without problems. Only the local flow sync (FileChangedEventListener in the stack trace above) treats this warning as a fatal error and rejects the flow.

Environment

docker image inspect kestra/kestra:latest

[
    {
        "Id": "sha256:f27c4233c8b74249d60be8ae2f6ea0b3029c070e5cf00346b529b1b3faae9c7d",
        "RepoTags": [
            "kestra/kestra:latest"
        ],
        "RepoDigests": [
            "kestra/kestra@sha256:6220d672f6b1ce4613b03e54a35ee7fca90ffb8fd90d534a1380f6be6862f61e"
        ],
        "Parent": "",
        "Comment": "buildkit.dockerfile.v0",
        "Created": "2025-01-23T07:18:27.783368443Z",
        "DockerVersion": "",
        "Author": "",
        "Config": {
            "Hostname": "",
            "Domainname": "",
            "User": "kestra",
            "AttachStdin": false,
            "AttachStdout": false,
            "AttachStderr": false,
            "Tty": false,
            "OpenStdin": false,
            "StdinOnce": false,
            "Env": [
                "PATH=/opt/java/openjdk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
                "JAVA_HOME=/opt/java/openjdk",
                "LANG=en_US.UTF-8",
                "LANGUAGE=en_US:en",
                "LC_ALL=en_US.UTF-8",
                "JAVA_VERSION=jdk-21.0.5+11"
            ],
            "Cmd": [
                "--help"
            ],
            "ArgsEscaped": true,
            "Image": "",
            "Volumes": null,
            "WorkingDir": "/app",
            "Entrypoint": [
                "docker-entrypoint.sh"
            ],
            "OnBuild": null,
            "Labels": {
                "org.opencontainers.image.ref.name": "ubuntu",
                "org.opencontainers.image.version": "22.04"
            }
        },
        "Architecture": "amd64",
        "Os": "linux",
        "Size": 2220616771,
        "GraphDriver": {
            "Data": {
                "LowerDir": "/var/lib/docker/overlay2/b59daf1f5d0e4c9bce414afa294bc6a8eedc57209507c4496fc53ff0f5a7bd8d/diff:/var/lib/docker/overlay2/8896c5d5454f3e59cd12f43dd0eb924ab0da8ba67e28b0535819da75ebecee26/diff:/var/lib/docker/overlay2/8b0a244e9b253c763dc07c278344d7534763871c7e3120ac64d01ddcf23935a0/diff:/var/lib/docker/overlay2/a88cc1d4d4e53920bc4d1c05f5169323c0bc4c942a371789ff070623dbeec50b/diff:/var/lib/docker/overlay2/14a09f71dbb5d0f452e3ca6d7fa2485bd3f15a66ec7099ecd4f8ad038228b79f/diff:/var/lib/docker/overlay2/c36e23c045899df9374baded558f9ca691761ce2871e29f47e7873791a9586bb/diff:/var/lib/docker/overlay2/deb7a7d663efe04e7c66d407a8a3496842515e18facf1396b1b332738d7e994c/diff:/var/lib/docker/overlay2/187bc967052f62d84ed4da8d6205acd626fdb509c775f0a4ccbe1522f6b2c71f/diff",
                "MergedDir": "/var/lib/docker/overlay2/3150e27751d43d3e856d4a167efbc2037b4ace8df8ed31498e62e262b9dc759e/merged",
                "UpperDir": "/var/lib/docker/overlay2/3150e27751d43d3e856d4a167efbc2037b4ace8df8ed31498e62e262b9dc759e/diff",
                "WorkDir": "/var/lib/docker/overlay2/3150e27751d43d3e856d4a167efbc2037b4ace8df8ed31498e62e262b9dc759e/work"
            },
            "Name": "overlay2"
        },
        "RootFS": {
            "Type": "layers",
            "Layers": [
                "sha256:2573e0d8158209ed54ab25c87bcdcb00bd3d2539246960a3d592a1c599d70465",
                "sha256:665d6c5a9cb106dced5aa691f22557e06410278a2fea6f0ddb2f3db389020779",
                "sha256:267035a6b66bbb177122921583f1e86fcfc35d8dc77718fa40a2601072508938",
                "sha256:a81735be3a6dbc17074a03acc3842720d7266a2617b73e428bdcf6675bcb395d",
                "sha256:d8633eeeec0d89ee0db9d6d4551551cae3320899aceb8d21d87f93da41c6623a",
                "sha256:a431b171089f678570e14eeb601922ab1a2df94ddc092de757aeed1f6565610c",
                "sha256:72ed76099989e06325061408951400e815a0697423d00b3d94e7a7639ddd8ba5",
                "sha256:c4aac3152e676c0841ab3116c575ad33c9c4e0131f7f96b056b351cd5c142d24",
                "sha256:16683559f36c37d04d31065d06f88e6a641fd2defbfbad46257b46493f6d2ebe"
            ]
        },
        "Metadata": {
            "LastTagTime": "0001-01-01T00:00:00Z"
        }
    }
]
goosethedev added the area/backend (Needs backend code changes), area/frontend (Needs frontend code changes), and bug (Something isn't working) labels Jan 23, 2025
MilosPaunovic removed the area/frontend (Needs frontend code changes) label Jan 23, 2025
loicmathieu (Member) commented:

url is a mandatory property of the Queries task, so it is correct to refuse to create a flow without it: the task cannot run without a URL pointing to the right database.

@github-project-automation github-project-automation bot moved this from Backlog to Done in Issues Jan 28, 2025
goosethedev (Author) commented:

As the example flow above shows, url is set in pluginDefaults for io.kestra.plugin.jdbc.postgresql, and with that the flow runs successfully. Both the UI and the import job should take plugin defaults into account before reporting a missing url field.
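To make the expected behavior concrete, here is a minimal Python sketch of checking required properties only after plugin defaults are resolved. It is not Kestra's implementation (Kestra is written in Java, and the real fix is in #7034); the helpers defaults_for, apply_defaults and missing_required, the REQUIRED table, and the representation of tasks as plain dicts are all hypothetical. It assumes that a pluginDefaults entry applies to every task whose type starts with the entry's type (which is how io.kestra.plugin.jdbc.postgresql is meant to cover the Queries and CopyIn tasks above), that values set directly on a task take precedence, and that defaults must also reach tasks nested in an If task's then branch.

```python
# Hypothetical sketch: resolve pluginDefaults before checking required properties.
# Tasks are plain dicts as parsed from the flow YAML; this is NOT Kestra's Java code.
from copy import deepcopy

# Assumed required properties per task type (only "url" is modelled here).
REQUIRED = {
    "io.kestra.plugin.jdbc.postgresql.Queries": ["url"],
    "io.kestra.plugin.jdbc.postgresql.CopyIn": ["url"],
}


def defaults_for(task_type, plugin_defaults):
    """Merge the values of every pluginDefaults entry whose type is a prefix of task_type."""
    merged = {}
    for entry in plugin_defaults:
        if task_type.startswith(entry["type"]):
            merged.update(entry.get("values", {}))
    return merged


def apply_defaults(tasks, plugin_defaults):
    """Return copies of tasks with defaults filled in; values set on the task win.

    Recurses into the "then" branch of If tasks, where every PostgreSQL task
    of the example flow lives. The input tasks are not mutated.
    """
    resolved = []
    for task in tasks:
        merged = {**defaults_for(task["type"], plugin_defaults), **deepcopy(task)}
        if "then" in merged:
            merged["then"] = apply_defaults(merged["then"], plugin_defaults)
        resolved.append(merged)
    return resolved


def missing_required(tasks):
    """Yield "<task id>.<property>" for each required property that is absent."""
    for task in tasks:
        for prop in REQUIRED.get(task["type"], []):
            if task.get(prop) is None:
                yield f"{task['id']}.{prop}"
        yield from missing_required(task.get("then", []))


# Usage: a cut-down version of the if_yellow_taxi branch from the flow above.
flow_tasks = [{
    "id": "if_yellow_taxi",
    "type": "io.kestra.plugin.core.flow.If",
    "then": [{"id": "yellow_create_table",
              "type": "io.kestra.plugin.jdbc.postgresql.Queries",
              "sql": "SELECT 1"}],
}]
plugin_defaults = [{"type": "io.kestra.plugin.jdbc.postgresql",
                    "values": {"url": "jdbc:postgresql://postgres:5432/postgres-zoomcamp"}}]

print(list(missing_required(flow_tasks)))  # ['yellow_create_table.url']
print(list(missing_required(apply_defaults(flow_tasks, plugin_defaults))))  # []
```

Validating the raw tasks reports the same kind of missing url the import job complains about, while validating the resolved tasks reports nothing, which matches the flow running fine once the defaults are applied.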

loicmathieu reopened this Jan 29, 2025
loicmathieu (Member) commented:

Sorry, I didn't notice there were plugin defaults in place; they should be honored.
