Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions docker/demo/config/test-suite/deltastreamer-hive-sync-presto.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

dag_name: unit-test-cow-dag
dag_rounds: 1
dag_intermittent_delay_mins: 10
dag_content:
first_insert:
config:
record_size: 70000
num_partitions_insert: 1
repeat_count: 2
num_records_insert: 100
type: InsertNode
deps: none
second_insert:
config:
record_size: 70000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 100
type: InsertNode
deps: first_insert
third_insert:
config:
record_size: 70000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 100
type: InsertNode
deps: second_insert
first_upsert:
config:
record_size: 70000
num_partitions_upsert: 1
repeat_count: 1
num_records_upsert: 100
type: UpsertNode
deps: third_insert
first_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: first_upsert
# Queries the table through Presto; depends on first_hive_sync.
first_presto_query:
config:
presto_props:
prop1: "SET SESSION hive.parquet_use_column_names = true"
presto_queries:
# Each queryN is paired with the resultN it is expected to produce.
# NOTE(review): 400 presumably comes from the insert nodes above (100 records each,
# with first_insert using repeat_count 2) - confirm against the data generator.
query1: "select count(*) from testdb.table1"
result1: 400
# Groups by _row_key and keeps only groups with more than one row; expected result is 0.
query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
result2: 0
type: PrestoQueryNode
deps: first_hive_sync
# first_trino_query:
# config:
# trino_queries:
# query1: "select count(*) from testdb1.table1"
# result1: 300
# query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
# result2: 0
# type: TrinoQueryNode
# deps: first_presto_query
10 changes: 10 additions & 0 deletions hudi-integ-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-jdbc</artifactId>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;

/**
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer}, this class does not extend it since we do not want to create a dependency
Expand Down Expand Up @@ -317,5 +318,27 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {

@Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.")
public Boolean testContinousMode = false;

// Presto JDBC connection options. The URL defaults to the empty string.
@Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL in the format jdbc:presto://<host>:<port>/<catalog>/<schema> "
+ "e.g. URL to connect to Presto running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
+ "jdbc:presto://localhost:8080/hive/sales")
public String prestoJdbcUrl = EMPTY_STRING;

// Defaults to "test" when the option is not supplied.
@Parameter(names = {"--presto-jdbc-username"}, description = "Username to use for authentication")
public String prestoUsername = "test";

// No initializer: the field stays null unless a value is assigned.
@Parameter(names = {"--presto-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
public String prestoPassword;

// Trino JDBC connection options, mirroring the Presto ones above. The URL defaults to the empty string.
@Parameter(names = {"--trino-jdbc-url"}, description = "Trino JDBC URL in the format jdbc:trino://<host>:<port>/<catalog>/<schema> "
+ "e.g. URL to connect to Trino running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
+ "jdbc:trino://localhost:8080/hive/sales")
public String trinoJdbcUrl = EMPTY_STRING;

// Defaults to "test" when the option is not supplied.
@Parameter(names = {"--trino-jdbc-username"}, description = "Username to use for authentication")
public String trinoUsername = "test";

// No initializer: the field stays null unless a value is assigned.
@Parameter(names = {"--trino-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
public String trinoPassword;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class DeltaConfig implements Serializable {
private final SerializableConfiguration configuration;

public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
SerializableConfiguration configuration) {
SerializableConfiguration configuration) {
this.deltaOutputMode = deltaOutputMode;
this.deltaInputType = deltaInputType;
this.configuration = configuration;
Expand Down Expand Up @@ -74,6 +74,10 @@ public static class Config {
public static final String CHILDREN = "children";
public static final String HIVE_QUERIES = "hive_queries";
public static final String HIVE_PROPERTIES = "hive_props";
public static final String PRESTO_QUERIES = "presto_queries";
public static final String PRESTO_PROPERTIES = "presto_props";
public static final String TRINO_QUERIES = "trino_queries";
public static final String TRINO_PROPERTIES = "trino_props";
private static String NUM_RECORDS_INSERT = "num_records_insert";
private static String NUM_RECORDS_UPSERT = "num_records_upsert";
private static String NUM_RECORDS_DELETE = "num_records_delete";
Expand Down Expand Up @@ -283,7 +287,7 @@ public Option<String> getPreCombineField() {

public Option<String> getPartitionField() {
return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty()
: Option.of(configsMap.get(PARTITION_FIELD).toString());
: Option.of(configsMap.get(PARTITION_FIELD).toString());
}

public String getMergeCondition() {
Expand Down Expand Up @@ -319,7 +323,7 @@ public Map<String, Object> getOtherConfigs() {

public List<Pair<String, Integer>> getHiveQueries() {
try {
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault("hive_queries", new ArrayList<>());
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(HIVE_QUERIES, new ArrayList<>());
} catch (Exception e) {
throw new RuntimeException("unable to get hive queries from configs");
}
Expand All @@ -333,6 +337,30 @@ public List<String> getHiveProperties() {
return (List<String>) this.configsMap.getOrDefault(HIVE_PROPERTIES, new ArrayList<>());
}

/**
 * Looks up the Presto properties held in this config.
 *
 * @return the value stored under {@code PRESTO_PROPERTIES}, or a new empty list when the key is absent
 */
public List<String> getPrestoProperties() {
  final Object prestoProps = this.configsMap.getOrDefault(PRESTO_PROPERTIES, new ArrayList<>());
  return (List<String>) prestoProps;
}

/**
 * Looks up the Trino properties held in this config.
 *
 * @return the value stored under {@code TRINO_PROPERTIES}, or a new empty list when the key is absent
 */
public List<String> getTrinoProperties() {
  final Object trinoProps = this.configsMap.getOrDefault(TRINO_PROPERTIES, new ArrayList<>());
  return (List<String>) trinoProps;
}

/**
 * Returns the Presto queries held in this config.
 *
 * @return the value stored under {@code PRESTO_QUERIES}, or a new empty list when the key is absent
 * @throws RuntimeException if the lookup or the cast to a list fails; the original exception is
 *         attached as the cause
 */
public List<Pair<String, Integer>> getPrestoQueries() {
  try {
    return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(PRESTO_QUERIES, new ArrayList<>());
  } catch (Exception e) {
    // Chain the caught exception so the underlying failure (e.g. a ClassCastException) is not lost.
    throw new RuntimeException("unable to get presto queries from configs", e);
  }
}

/**
 * Returns the Trino queries held in this config.
 *
 * @return the value stored under {@code TRINO_QUERIES}, or a new empty list when the key is absent
 * @throws RuntimeException if the lookup or the cast to a list fails; the original exception is
 *         attached as the cause
 */
public List<Pair<String, Integer>> getTrinoQueries() {
  try {
    return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(TRINO_QUERIES, new ArrayList<>());
  } catch (Exception e) {
    // Chain the caught exception so the underlying failure (e.g. a ClassCastException) is not lost.
    throw new RuntimeException("unable to get trino queries from configs", e);
  }
}

@Override
public String toString() {
try {
Expand Down Expand Up @@ -449,6 +477,26 @@ public Builder withHiveProperties(List<String> hiveProperties) {
return this;
}

/**
 * Stores the given Presto properties in the builder's config map under {@code PRESTO_PROPERTIES}.
 *
 * @param prestoProperties the properties to store
 * @return this builder
 */
public Builder withPrestoProperties(List<String> prestoProperties) {
  configsMap.put(PRESTO_PROPERTIES, prestoProperties);
  return this;
}

/**
 * Stores the given Trino properties in the builder's config map under {@code TRINO_PROPERTIES}.
 *
 * @param trinoProperties the properties to store
 * @return this builder
 */
public Builder withTrinoProperties(List<String> trinoProperties) {
  configsMap.put(TRINO_PROPERTIES, trinoProperties);
  return this;
}

/**
 * Stores the given Presto query/result pairs in the builder's config map under {@code PRESTO_QUERIES}.
 *
 * @param prestoQueries pairs of a query string and an integer value
 * @return this builder
 */
public Builder withPrestoQueryAndResults(List<Pair<String, Integer>> prestoQueries) {
  configsMap.put(PRESTO_QUERIES, prestoQueries);
  return this;
}

/**
 * Stores the given Trino query/result pairs in the builder's config map under {@code TRINO_QUERIES}.
 *
 * @param trinoQueries pairs of a query string and an integer value
 * @return this builder
 */
public Builder withTrinoQueryAndResults(List<Pair<String, Integer>> trinoQueries) {
  configsMap.put(TRINO_QUERIES, trinoQueries);
  return this;
}

public Builder withConfigsMap(Map<String, Object> configsMap) {
this.configsMap = configsMap;
return this;
Expand Down
Loading