diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 88a76824d8ca..7842de2ac75d 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -1065,35 +1065,35 @@ public void testTrinoSparkConcurrentInsert() QueryExecutor onTrino = onTrino(); QueryExecutor onSpark = onSpark(); List allInserted = executor.invokeAll( - Stream.of(Engine.TRINO, Engine.SPARK) - .map(engine -> (Callable>) () -> { - List inserted = new ArrayList<>(); - for (int i = 0; i < insertsPerEngine; i++) { - barrier.await(20, SECONDS); - String engineName = engine.name().toLowerCase(ENGLISH); - long value = i; - switch (engine) { - case TRINO: - try { - onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value)); - } - catch (QueryExecutionException queryExecutionException) { - // failed to insert - continue; // next loop iteration - } - break; - case SPARK: - onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value)); - break; - default: - throw new UnsupportedOperationException("Unexpected engine: " + engine); + Stream.of(Engine.TRINO, Engine.SPARK) + .map(engine -> (Callable>) () -> { + List inserted = new ArrayList<>(); + for (int i = 0; i < insertsPerEngine; i++) { + barrier.await(20, SECONDS); + String engineName = engine.name().toLowerCase(ENGLISH); + long value = i; + switch (engine) { + case TRINO: + try { + onTrino.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", trinoTableName, engineName, value)); } - - inserted.add(row(engineName, value)); - } - return inserted; - }) - .collect(toImmutableList())).stream() + catch (QueryExecutionException queryExecutionException) { + // failed to insert + continue; // next loop iteration + } + break; + case SPARK: + onSpark.executeQuery(format("INSERT INTO %s VALUES ('%s', %d)", sparkTableName, engineName, value)); + break; + default: + throw new UnsupportedOperationException("Unexpected engine: " + engine); + } + + inserted.add(row(engineName, value)); + } + return inserted; + }) + .collect(toImmutableList())).stream() .map(MoreFutures::getDone) .flatMap(List::stream) .collect(toImmutableList()); @@ -1568,4 +1568,206 @@ public enum CreateMode CREATE_TABLE_AS_SELECT, CREATE_TABLE_WITH_NO_DATA_AND_INSERT, } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testSparkDoesntReadTrinoTableWithDeletes(StorageFormat storageFormat) + { + String baseTableName = "test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat; + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat)); + // separate inserts give us snapshot per insert + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName)); + onSpark().executeQuery(format("DELETE FROM %s WHERE _bigint = 1002", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1004)", trinoTableName)); + + Row row = row(3008); + String selectByString = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'"; + assertThat(onTrino().executeQuery(format(selectByString, trinoTableName))) + .containsOnly(row); + assertThat(onSpark().executeQuery(format(selectByString, sparkTableName))) + .containsOnly(row); + + onTrino().executeQuery("DROP TABLE " + trinoTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testSparkReadsTrinoTableWithDeletes(StorageFormat storageFormat) + { + String baseTableName = "test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat; + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat)); + // separate inserts give us snapshot per insert + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName)); + onSpark().executeQuery(format("DELETE FROM %s WHERE _bigint = 1002", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1004)", sparkTableName)); + + Row row = row(3008); + String selectByString = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'"; + assertThat(onTrino().executeQuery(format(selectByString, trinoTableName))) + .containsOnly(row); + assertThat(onSpark().executeQuery(format(selectByString, sparkTableName))) + .containsOnly(row); + + onTrino().executeQuery("DROP TABLE " + trinoTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testSparkReadsTrinoPartitionedTableWithOnlyTrinoInserts(StorageFormat storageFormat) + { + String baseTableName = "test_spark_trino_partitioned_tables_with_trino_only_inserts" + storageFormat; + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat)); + // separate inserts give us snapshot per insert + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2001)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2002)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2003)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3001)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3002)", trinoTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3003)", trinoTableName)); + + Row row1 = row(3006); + String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'"; + assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName))) + .containsOnly(row1); + assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName))) + .containsOnly(row1); + + Row row2 = row(6006); + String selectByString2 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'b'"; + assertThat(onTrino().executeQuery(format(selectByString2, trinoTableName))) + .containsOnly(row2); + assertThat(onSpark().executeQuery(format(selectByString2, sparkTableName))) + .containsOnly(row2); + + Row row3 = row(9006); + String selectByString3 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'c'"; + assertThat(onTrino().executeQuery(format(selectByString3, trinoTableName))) + .containsOnly(row3); + assertThat(onSpark().executeQuery(format(selectByString3, sparkTableName))) + .containsOnly(row3); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testSparkReadsTrinoPartitionedTableWithOnlySparkInserts(StorageFormat storageFormat) + { + String baseTableName = "test_spark_trino_partitioned_tables_with_spark_only_inserts" + storageFormat; + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (partitioning = ARRAY['_string'], format = '%s')", trinoTableName, storageFormat)); + // separate inserts give us snapshot per insert + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2001)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2002)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2003)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3001)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3002)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3003)", sparkTableName)); + + Row row1 = row(3006); + String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'"; + assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName))) + .containsOnly(row1); + assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName))) + .containsOnly(row1); + + Row row2 = row(6006); + String selectByString2 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'b'"; + assertThat(onTrino().executeQuery(format(selectByString2, trinoTableName))) + .containsOnly(row2); + assertThat(onSpark().executeQuery(format(selectByString2, sparkTableName))) + .containsOnly(row2); + + Row row3 = row(9006); + String selectByString3 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'c'"; + assertThat(onTrino().executeQuery(format(selectByString3, trinoTableName))) + .containsOnly(row3); + assertThat(onSpark().executeQuery(format(selectByString3, sparkTableName))) + .containsOnly(row3); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testTrinoReadsSparkUnpartitioned(StorageFormat storageFormat) + { + String baseTableName = "test_spark_trino_partitioned_tables_with_spark_only_inserts" + storageFormat; + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (format = '%s')", trinoTableName, storageFormat)); + // separate inserts give us snapshot per insert + + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('a', 1003)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2001)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('b', 2002)", trinoTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('b', 2003)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3001)", sparkTableName)); + onSpark().executeQuery(format("INSERT INTO %s VALUES ('c', 3002)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('c', 3003)", trinoTableName)); + + Row row1 = row(3006); + String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'"; + assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName))) + .containsOnly(row1); + assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName))) + .containsOnly(row1); + + Row row2 = row(6006); + String selectByString2 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'b'"; + assertThat(onTrino().executeQuery(format(selectByString2, trinoTableName))) + .containsOnly(row2); + assertThat(onSpark().executeQuery(format(selectByString2, sparkTableName))) + .containsOnly(row2); + + Row row3 = row(9006); + String selectByString3 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'c'"; + assertThat(onTrino().executeQuery(format(selectByString3, trinoTableName))) + .containsOnly(row3); + assertThat(onSpark().executeQuery(format(selectByString3, sparkTableName))) + .containsOnly(row3); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testSparkSelectsBeforeAndAfterInsertFromTrino(StorageFormat storageFormat) + { + String baseTableName = "test_spark_selects_befpre_and_after_insert_from_trino" + storageFormat; + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (_string VARCHAR, _bigint BIGINT) WITH (format = '%s')", trinoTableName, storageFormat)); + // separate inserts give us snapshot per insert + + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1001)", trinoTableName)); + assertThat(onSpark().executeQuery(format("SELECT _bigint FROM %s WHERE _string = 'a'", sparkTableName))).containsOnly(row(1001)); + onTrino().executeQuery(format("INSERT INTO %s VALUES ('a', 1002)", trinoTableName)); + + Row row1 = row(2003); + String selectByString1 = "SELECT SUM(_bigint) FROM %s WHERE _string = 'a'"; + assertThat(onTrino().executeQuery(format(selectByString1, trinoTableName))) + .containsOnly(row1); + assertThat(onSpark().executeQuery(format(selectByString1, sparkTableName))) + .containsOnly(row1); + } }