Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1065,35 +1065,35 @@ public void testTrinoSparkConcurrentInsert()
QueryExecutor onTrino = onTrino();
QueryExecutor onSpark = onSpark();
List<Row> allInserted = executor.invokeAll(
Stream.of(Engine.TRINO, Engine.SPARK)
.map(engine -> (Callable<List<Row>>) () -> {
List<Row> inserted = new ArrayList<>();
for (int i = 0; i < insertsPerEngine; i++) {
barrier.await(20, SECONDS);
String engineName = engine.name().toLowerCase(ENGLISH);
long value = i;
switch (engine) {
case TRINO:
try {
onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value));
}
catch (QueryExecutionException queryExecutionException) {
// failed to insert
continue; // next loop iteration
}
break;
case SPARK:
onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value));
break;
default:
throw new UnsupportedOperationException("Unexpected engine: " + engine);
Stream.of(Engine.TRINO, Engine.SPARK)
.map(engine -> (Callable<List<Row>>) () -> {
List<Row> inserted = new ArrayList<>();
for (int i = 0; i < insertsPerEngine; i++) {
barrier.await(20, SECONDS);
String engineName = engine.name().toLowerCase(ENGLISH);
long value = i;
switch (engine) {
case TRINO:
try {
onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value));
}

inserted.add(row(engineName, value));
}
return inserted;
})
.collect(toImmutableList())).stream()
catch (QueryExecutionException queryExecutionException) {
// failed to insert
continue; // next loop iteration
}
break;
case SPARK:
onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value));
break;
default:
throw new UnsupportedOperationException("Unexpected engine: " + engine);
}

inserted.add(row(engineName, value));
}
return inserted;
})
.collect(toImmutableList())).stream()
.map(MoreFutures::getDone)
.flatMap(List::stream)
.collect(toImmutableList());
Expand Down Expand Up @@ -1568,4 +1568,206 @@ public enum CreateMode
CREATE_TABLE_AS_SELECT,
CREATE_TABLE_WITH_NO_DATA_AND_INSERT,
}

/**
 * Checks that Trino and Spark return the same aggregate for a partitioned table
 * that was written by Trino, had one row deleted by Spark, and was then written
 * by Trino again.
 *
 * @param storageFormat file format used when creating the table
 */
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testSparkDoesntReadTrinoTableWithDeletes(StorageFormat storageFormat)
{
    // The table name must not be shared with testSparkReadsTrinoTableWithDeletes,
    // otherwise both tests would create, mutate and drop the very same table.
    String baseTableName = "test_spark_reads_trino_table_with_spark_delete_between_trino_inserts" + storageFormat;
    String trinoTableName = trinoTableName(baseTableName);
    String sparkTableName = sparkTableName(baseTableName);
    onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);

    onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat));
    // separate inserts give us snapshot per insert
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName));
    // the delete is issued through Spark, in between the Trino inserts
    onSpark().executeQuery(format("DELETE FROM %s WHERE _bigint = 1002", sparkTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1004)", trinoTableName));

    // 1001 + 1003 + 1004; the row with 1002 was deleted
    Row row = row(3008);
    String selectByString = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
    assertThat(onTrino().executeQuery(format(selectByString, trinoTableName)))
            .containsOnly(row);
    assertThat(onSpark().executeQuery(format(selectByString, sparkTableName)))
            .containsOnly(row);

    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

/**
 * Checks that Trino and Spark return the same aggregate for a partitioned table
 * that was created and initially written by Trino, and then had a row deleted
 * and further rows inserted by Spark.
 *
 * @param storageFormat file format used when creating the table
 */
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testSparkReadsTrinoTableWithDeletes(StorageFormat storageFormat)
{
    // The table name must not be shared with testSparkDoesntReadTrinoTableWithDeletes,
    // otherwise both tests would create, mutate and drop the very same table.
    String baseTableName = "test_spark_reads_trino_table_with_spark_delete_and_spark_inserts" + storageFormat;
    String trinoTableName = trinoTableName(baseTableName);
    String sparkTableName = sparkTableName(baseTableName);
    onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);

    onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat));
    // separate inserts give us snapshot per insert
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName));
    // from here on, all modifications go through Spark
    onSpark().executeQuery(format("DELETE FROM %s WHERE _bigint = 1002", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1004)", sparkTableName));

    // 1001 + 1003 + 1004; the row with 1002 was deleted
    Row row = row(3008);
    String selectByString = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
    assertThat(onTrino().executeQuery(format(selectByString, trinoTableName)))
            .containsOnly(row);
    assertThat(onSpark().executeQuery(format(selectByString, sparkTableName)))
            .containsOnly(row);

    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

/**
 * Checks that Trino and Spark return the same per-partition aggregates for a
 * partitioned table that was created and populated exclusively through Trino.
 *
 * @param storageFormat file format used when creating the table
 */
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testSparkReadsTrinoPartitionedTableWithOnlyTrinoInserts(StorageFormat storageFormat)
{
    String baseTableName = "test_spark_trino_partitioned_tables_with_trino_only_inserts" + storageFormat;
    String trinoTableName = trinoTableName(baseTableName);
    String sparkTableName = sparkTableName(baseTableName);
    onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);

    onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat));
    // separate inserts give us snapshot per insert
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2001)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2002)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2003)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3001)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3002)", trinoTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3003)", trinoTableName));

    // partition 'a': 1001 + 1002 + 1003
    Row row1 = row(3006);
    String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
    assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName)))
            .containsOnly(row1);
    assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName)))
            .containsOnly(row1);

    // partition 'b': 2001 + 2002 + 2003
    Row row2 = row(6006);
    String selectByString2 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'b'";
    assertThat(onTrino().executeQuery(format(selectByString2, trinoTableName)))
            .containsOnly(row2);
    assertThat(onSpark().executeQuery(format(selectByString2, sparkTableName)))
            .containsOnly(row2);

    // partition 'c': 3001 + 3002 + 3003
    Row row3 = row(9006);
    String selectByString3 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'c'";
    assertThat(onTrino().executeQuery(format(selectByString3, trinoTableName)))
            .containsOnly(row3);
    assertThat(onSpark().executeQuery(format(selectByString3, sparkTableName)))
            .containsOnly(row3);

    // clean up, matching the other tests in this group that drop their table when done
    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

/**
 * Checks that Trino and Spark return the same per-partition aggregates for a
 * partitioned table that was created through Trino and populated exclusively
 * through Spark.
 *
 * @param storageFormat file format used when creating the table
 */
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testSparkReadsTrinoPartitionedTableWithOnlySparkInserts(StorageFormat storageFormat)
{
    String baseTableName = "test_spark_trino_partitioned_tables_with_spark_only_inserts" + storageFormat;
    String trinoTableName = trinoTableName(baseTableName);
    String sparkTableName = sparkTableName(baseTableName);
    onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);

    onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat));
    // separate inserts give us snapshot per insert
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2001)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2002)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2003)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3001)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3002)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3003)", sparkTableName));

    // partition 'a': 1001 + 1002 + 1003
    Row row1 = row(3006);
    String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
    assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName)))
            .containsOnly(row1);
    assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName)))
            .containsOnly(row1);

    // partition 'b': 2001 + 2002 + 2003
    Row row2 = row(6006);
    String selectByString2 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'b'";
    assertThat(onTrino().executeQuery(format(selectByString2, trinoTableName)))
            .containsOnly(row2);
    assertThat(onSpark().executeQuery(format(selectByString2, sparkTableName)))
            .containsOnly(row2);

    // partition 'c': 3001 + 3002 + 3003
    Row row3 = row(9006);
    String selectByString3 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'c'";
    assertThat(onTrino().executeQuery(format(selectByString3, trinoTableName)))
            .containsOnly(row3);
    assertThat(onSpark().executeQuery(format(selectByString3, sparkTableName)))
            .containsOnly(row3);

    // clean up, matching the other tests in this group that drop their table when done
    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

/**
 * Checks that Trino and Spark return the same aggregates for an unpartitioned
 * table that was created through Trino and populated by a mix of Spark and
 * Trino inserts.
 *
 * @param storageFormat file format used when creating the table
 */
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testTrinoReadsSparkUnpartitioned(StorageFormat storageFormat)
{
    // Own table name: this must not reuse the name of the partitioned
    // "spark only inserts" test, or the two tests would operate on the same table.
    String baseTableName = "test_spark_trino_unpartitioned_table_with_mixed_inserts" + storageFormat;
    String trinoTableName = trinoTableName(baseTableName);
    String sparkTableName = sparkTableName(baseTableName);
    onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);

    // no partitioning property: the table is unpartitioned
    onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (format = '%s')", trinoTableName, storageFormat));
    // separate inserts give us snapshot per insert

    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2001)", sparkTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2002)", trinoTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2003)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3001)", sparkTableName));
    onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3002)", sparkTableName));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3003)", trinoTableName));

    // rows with 'a': 1001 + 1002 + 1003
    Row row1 = row(3006);
    String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
    assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName)))
            .containsOnly(row1);
    assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName)))
            .containsOnly(row1);

    // rows with 'b': 2001 + 2002 + 2003
    Row row2 = row(6006);
    String selectByString2 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'b'";
    assertThat(onTrino().executeQuery(format(selectByString2, trinoTableName)))
            .containsOnly(row2);
    assertThat(onSpark().executeQuery(format(selectByString2, sparkTableName)))
            .containsOnly(row2);

    // rows with 'c': 3001 + 3002 + 3003
    Row row3 = row(9006);
    String selectByString3 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'c'";
    assertThat(onTrino().executeQuery(format(selectByString3, trinoTableName)))
            .containsOnly(row3);
    assertThat(onSpark().executeQuery(format(selectByString3, sparkTableName)))
            .containsOnly(row3);

    // clean up, matching the other tests in this group that drop their table when done
    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

/**
 * Checks that Spark sees the current table contents both before and after an
 * additional insert made through Trino: Spark reads the table once after the
 * first Trino insert, and both engines are queried again after the second one.
 *
 * @param storageFormat file format used when creating the table
 */
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testSparkSelectsBeforeAndAfterInsertFromTrino(StorageFormat storageFormat)
{
    String baseTableName = "test_spark_selects_befpre_and_after_insert_from_trino" + storageFormat;
    String trinoTableName = trinoTableName(baseTableName);
    String sparkTableName = sparkTableName(baseTableName);
    onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName);

    onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (format = '%s')", trinoTableName, storageFormat));
    // separate inserts give us snapshot per insert

    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName));
    // Spark reads in between the two Trino inserts and must see only the first row
    assertThat(onSpark().executeQuery(format("SELECT _bigint FROM %s WHERE _string = 'a'", sparkTableName))).containsOnly(row(1001));
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName));

    // 1001 + 1002
    Row row1 = row(2003);
    String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'";
    assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName)))
            .containsOnly(row1);
    assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName)))
            .containsOnly(row1);

    // clean up, matching the other tests in this group that drop their table when done
    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}
}