diff --git a/presto-native-execution/pom.xml b/presto-native-execution/pom.xml index 38211e1442ccf..60fc91e62bfd7 100644 --- a/presto-native-execution/pom.xml +++ b/presto-native-execution/pom.xml @@ -309,6 +309,12 @@ test + + com.facebook.presto.hadoop + hadoop-apache + test + + com.facebook.presto presto-hive diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/IcebergPartitionTestBase.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/IcebergPartitionTestBase.java new file mode 100644 index 0000000000000..5b37388b0bd60 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/IcebergPartitionTestBase.java @@ -0,0 +1,426 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker.iceberg; + +import com.facebook.presto.Session; +import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils; +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.MaterializedRow; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static java.lang.String.format; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class IcebergPartitionTestBase + extends AbstractTestQueryFramework +{ + public static final String TEST_TABLE_PREFIX = "transform_test_"; + public static final String DROP_TABLE_TEMPLATE = "DROP TABLE IF EXISTS %s"; + + public static final String TRANSFORM_IDENTITY = "identity"; + public static final String TRANSFORM_BUCKET = "bucket"; + public static final String TRANSFORM_TRUNCATE = "truncate"; + public static final String TRANSFORM_YEAR = "year"; + public static final String TRANSFORM_MONTH = "month"; + public static final String TRANSFORM_DAY = "day"; + public static final String TRANSFORM_HOUR = "hour"; + + private static final Set TEMPORAL_TYPES = ImmutableSet.of("date", "timestamp"); + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.nativeIcebergQueryRunnerBuilder().build(); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.javaIcebergQueryRunnerBuilder().build(); + } + + public Optional createPartitionedTables(String transform, String dataType, String param) + { + if (!isValidTransformForType(transform, dataType)) { + return Optional.empty(); + } + + String nativeTableName = getTableName(transform, dataType) + "_native"; + String javaTableName = getTableName(transform, dataType) + "_java"; + String columnName = dataType + "_col"; + String partitioningClause = buildPartitioningClause(transform, columnName, param); + + dropTableSafely(nativeTableName); + dropTableOnJavaRunner(javaTableName); + + String createTableSql = getCreateTableSql(nativeTableName, columnName, dataType, partitioningClause); + assertQuerySucceeds(createTableSql); + assertQuerySucceeds(format("INSERT INTO %s SELECT int_col, %s FROM test_data", nativeTableName, columnName)); + + ((QueryRunner) getExpectedQueryRunner()).execute( + getCreateTableSql(javaTableName, columnName, dataType, partitioningClause)); + + Session legacyTimestampDisabled = Session.builder(getSession()) + .setSystemProperty("legacy_timestamp", "false") + .build(); + + ((QueryRunner) getExpectedQueryRunner()).execute(legacyTimestampDisabled, + format("INSERT INTO %s SELECT int_col, %s FROM test_data", javaTableName, columnName)); + + MaterializedResult nativeResult = computeActual(format("SELECT * FROM %s ORDER BY id", nativeTableName)); + MaterializedResult javaResult = computeExpected(format("SELECT * FROM %s ORDER BY id", javaTableName), ImmutableList.of()); + assertEquals(nativeResult, javaResult, "Data should match between native and Java tables"); + + return Optional.of(new TableNamePair(nativeTableName, javaTableName)); + } + + protected void dropTableSafely(String tableName) + { + assertQuerySucceeds(format(DROP_TABLE_TEMPLATE, tableName)); + } + + protected void dropTableOnJavaRunner(String tableName) + { + try { + ((QueryRunner) getExpectedQueryRunner()).execute(format(DROP_TABLE_TEMPLATE, tableName)); + } + catch (Exception e) { + } + } + + public void verifyTransform(String transform, PartitionInfo partitionInfo) + { + assertTrue(partitionInfo.partitionValuesMatch(), + format("Partition values should match between native and Java tables for transform %s", transform)); + assertTrue(partitionInfo.partitionCountsMatch(), + format("Partition file counts should match between native and Java tables for transform %s", transform)); + } + + public void verifyPartitionTransform(String transform, String param, PartitionInfo partitionInfo) + { + switch (transform) { + case TRANSFORM_BUCKET: + verifyBucketTransform(param, partitionInfo); + break; + case "trunc": + case TRANSFORM_TRUNCATE: + case TRANSFORM_YEAR: + case TRANSFORM_MONTH: + case TRANSFORM_DAY: + case TRANSFORM_HOUR: + case TRANSFORM_IDENTITY: + verifyTransform(transform, partitionInfo); + break; + default: + fail("No specific verification for transform: " + transform); + } + } + + public String buildPartitioningClause(String transform, String column, String parameter) + { + switch (transform) { + case TRANSFORM_IDENTITY: + return column; + case TRANSFORM_BUCKET: + if (parameter == null) { + throw new IllegalArgumentException("Bucket transform requires a parameter"); + } + return format("bucket(%s, %s)", column, parameter); + case "trunc": + case TRANSFORM_TRUNCATE: + if (parameter == null) { + throw new IllegalArgumentException("Truncate transform requires a parameter"); + } + return format("truncate(%s, %s)", column, parameter); + case TRANSFORM_YEAR: + case TRANSFORM_MONTH: + case TRANSFORM_DAY: + case TRANSFORM_HOUR: + return format("%s(%s)", transform, column); + default: + throw new IllegalArgumentException("Unknown transform: " + transform); + } + } + + public PartitionInfo collectPartitionInfo(String transform, String nativeTableName, String javaTableName, String columnName) + { + MaterializedResult nativePartitionInfo = computeActual( + format("SELECT file_path FROM \"%s$files\"", nativeTableName)); + + MaterializedResult javaPartitionInfo = computeExpected( + format("SELECT file_path FROM \"%s$files\"", javaTableName), + ImmutableList.of()); + + assertTrue(nativePartitionInfo.getRowCount() > 0, "Native runner should have at least one data file"); + assertEquals(nativePartitionInfo.getRowCount(), javaPartitionInfo.getRowCount(), + "Native and Java tables should have the same number of data files"); + + String partitionColumnName = getPartitionColumnName(columnName, transform); + + Set nativeUniquePartitionValues = new HashSet<>(); + Map nativePartitionValueCounts = new HashMap<>(); + collectPartitionValuesFromFiles(nativePartitionInfo, partitionColumnName, + nativeUniquePartitionValues, nativePartitionValueCounts); + + Set javaUniquePartitionValues = new HashSet<>(); + Map javaPartitionValueCounts = new HashMap<>(); + collectPartitionValuesFromFiles(javaPartitionInfo, partitionColumnName, + javaUniquePartitionValues, javaPartitionValueCounts); + + return new PartitionInfo( + nativePartitionInfo, + javaPartitionInfo, + partitionColumnName, + nativeUniquePartitionValues, + javaUniquePartitionValues, + nativePartitionValueCounts, + javaPartitionValueCounts); + } + + private void collectPartitionValuesFromFiles(MaterializedResult fileInfo, String partitionColumnName, + Set uniqueValues, Map valueCounts) + { + for (int i = 0; i < fileInfo.getRowCount(); i++) { + String filePath = (String) fileInfo.getMaterializedRows().get(i).getField(0); + Map partitionValues = parsePartitionValues(filePath, 1); + + assertTrue(partitionValues.containsKey(partitionColumnName), + format("Partition column %s should exist in path: %s", partitionColumnName, filePath)); + + String partitionValue = partitionValues.get(partitionColumnName); + uniqueValues.add(partitionValue); + valueCounts.merge(partitionValue, 1, Integer::sum); + } + } + + public String getPartitionColumnName(String columnName, String transform) + { + if (TRANSFORM_IDENTITY.equals(transform)) { + return columnName; + } + return columnName + "_" + transform; + } + + public void verifyBucketTransform(String param, PartitionInfo partitionInfo) + { + int bucketCount = Integer.parseInt(param); + assertTrue(partitionInfo.nativeUniquePartitionValues.size() > 0, + "Should have at least one unique bucket value"); + + for (String bucketValue : partitionInfo.nativeUniquePartitionValues) { + if ("null".equals(bucketValue)) { + continue; + } + int bucket = Integer.parseInt(bucketValue); + assertTrue(bucket >= 0 && bucket < bucketCount, + format("Bucket value %d should be in range [0, %d)", bucket, bucketCount)); + } + } + + public void verifyPartitionsMetadata(String nativeTableName, String javaTableName) + { + verifyPartitionsMetadata(nativeTableName, javaTableName, 1); + } + + public void verifyPartitionsMetadata(String nativeTableName, String javaTableName, int partitionColumnCount) + { + StringBuilder orderByClause = new StringBuilder("ORDER BY "); + for (int i = 0; i < partitionColumnCount; i++) { + if (i > 0) { + orderByClause.append(", "); + } + orderByClause.append(i + 1); + } + + MaterializedResult nativePartitions = computeActual( + format("SELECT * FROM \"%s$partitions\" %s", nativeTableName, orderByClause)); + + MaterializedResult javaPartitions = computeExpected( + format("SELECT * FROM \"%s$partitions\" %s", javaTableName, orderByClause), + ImmutableList.of()); + + assertTrue(nativePartitions.getRowCount() > 0, + format("Native table %s should have partition metadata", nativeTableName)); + + assertEquals(nativePartitions.getRowCount(), javaPartitions.getRowCount(), + "Native and Java tables should have the same number of partition rows"); + + for (int i = 0; i < nativePartitions.getRowCount(); i++) { + MaterializedRow nativeRow = nativePartitions.getMaterializedRows().get(i); + MaterializedRow javaRow = javaPartitions.getMaterializedRows().get(i); + + assertEquals(nativeRow.getField(0), javaRow.getField(0), + "Partition value should match between native and Java tables"); + assertEquals(nativeRow.getField(1), javaRow.getField(1), + "Row count should match between native and Java tables"); + assertEquals(nativeRow.getField(2), javaRow.getField(2), + "File count should match between native and Java tables"); + } + } + + public String getTableName(String transform, String dataType) + { + return TEST_TABLE_PREFIX + transform + "_" + dataType; + } + + protected boolean isValidTransformForType(String transform, String dataType) + { + switch (transform) { + case TRANSFORM_YEAR: + case TRANSFORM_MONTH: + case TRANSFORM_DAY: + return TEMPORAL_TYPES.contains(dataType); + case TRANSFORM_HOUR: + return "timestamp".equals(dataType); + default: + return true; + } + } + + private String getDataTypeDefinition(String dataType) + { + switch (dataType) { + case "int": + return "INTEGER"; + case "bigint": + return "BIGINT"; + case "varchar": + return "VARCHAR"; + case "varbinary": + return "VARBINARY"; + case "date": + return "DATE"; + case "decimal": + return "DECIMAL(18, 6)"; + case "timestamp": + return "TIMESTAMP"; + default: + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + } + + private String getCreateTableSql(String tableName, String columnName, String dataType, String partitioningClause) + { + return format("CREATE TABLE %s (" + + " id INTEGER, " + + " %s %s" + + ") WITH (format = 'PARQUET', partitioning = ARRAY['%s'])", + tableName, columnName, getDataTypeDefinition(dataType), partitioningClause); + } + + protected static Map parsePartitionValues(String filePath, int partitionColumnCount) + { + Map partitionValues = new LinkedHashMap<>(); + String[] pathParts = filePath.split("/"); + + int startIndex = pathParts.length - partitionColumnCount - 1; + if (startIndex < 0) { + throw new IllegalArgumentException(format( + "File path does not contain enough parts for %d partition columns: %s", + partitionColumnCount, filePath)); + } + + for (int i = 0; i < partitionColumnCount; i++) { + String partitionPart = pathParts[startIndex + i]; + int equalsPos = partitionPart.indexOf('='); + + if (equalsPos <= 0 || equalsPos == partitionPart.length() - 1) { + throw new IllegalArgumentException("Invalid partition format in path: " + partitionPart); + } + + String key = partitionPart.substring(0, equalsPos); + String value = partitionPart.substring(equalsPos + 1); + partitionValues.put(key, value); + } + + return partitionValues; + } + + public static class PartitionInfo + { + public final MaterializedResult nativePartitionInfo; + public final MaterializedResult javaPartitionInfo; + public final String partitionColumnName; + public final Set nativeUniquePartitionValues; + public final Set javaUniquePartitionValues; + public final Map nativePartitionValueCounts; + public final Map javaPartitionValueCounts; + + public PartitionInfo( + MaterializedResult nativePartitionInfo, + MaterializedResult javaPartitionInfo, + String partitionColumnName, + Set nativeUniquePartitionValues, + Set javaUniquePartitionValues, + Map nativePartitionValueCounts, + Map javaPartitionValueCounts) + { + this.nativePartitionInfo = nativePartitionInfo; + this.javaPartitionInfo = javaPartitionInfo; + this.partitionColumnName = partitionColumnName; + this.nativeUniquePartitionValues = nativeUniquePartitionValues; + this.javaUniquePartitionValues = javaUniquePartitionValues; + this.nativePartitionValueCounts = nativePartitionValueCounts; + this.javaPartitionValueCounts = javaPartitionValueCounts; + } + + public boolean partitionValuesMatch() + { + return nativeUniquePartitionValues.equals(javaUniquePartitionValues); + } + + public boolean partitionCountsMatch() + { + return nativePartitionValueCounts.equals(javaPartitionValueCounts); + } + } + + public static class TableNamePair + { + private final String nativeTableName; + private final String javaTableName; + + public TableNamePair(String nativeTableName, String javaTableName) + { + this.nativeTableName = nativeTableName; + this.javaTableName = javaTableName; + } + + public String getNativeTableName() + { + return nativeTableName; + } + + public String getJavaTableName() + { + return javaTableName; + } + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestPartitionTransforms.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestPartitionTransforms.java new file mode 100644 index 0000000000000..2feeb9bc134dc --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestPartitionTransforms.java @@ -0,0 +1,282 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker.iceberg; + +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableList; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static java.lang.String.format; +import static org.testng.Assert.assertTrue; + +public class TestPartitionTransforms + extends IcebergPartitionTestBase +{ + private static final String CREATE_MULTIPLE_PARTITION_TABLE_TEMPLATE = "CREATE TABLE %s (" + + " int_col INTEGER, " + + " bigint_col BIGINT, " + + " varchar_col VARCHAR, " + + " date_col DATE" + + ") WITH (format = 'PARQUET', partitioning = ARRAY['bucket(date_col, 4)', 'date_col', 'year(date_col)'])"; + + private static final String[] ALL_DATA_TYPES = {"int", "bigint", "varchar", "varbinary", "date", "timestamp", "decimal"}; + private static final String[] TEMPORAL_DATA_TYPES = {"date", "timestamp"}; + + @BeforeClass + public void setUp() + { + assertQuerySucceeds("DROP TABLE IF EXISTS test_data"); + assertQuerySucceeds( + "CREATE TABLE test_data (" + + " int_col INTEGER, " + + " bigint_col BIGINT, " + + " varchar_col VARCHAR, " + + " varbinary_col VARBINARY, " + + " date_col DATE, " + + " timestamp_col TIMESTAMP, " + + " decimal_col DECIMAL(18, 6)" + + ")"); + + assertQuerySucceeds( + "INSERT INTO test_data VALUES " + + " (1, 1000, 'apple', X'01020304', DATE '2023-01-15', TIMESTAMP '2023-01-15 10:30:00', DECIMAL '123.456'), " + + " (2, 2000, 'banana', X'05060708', DATE '2023-02-20', TIMESTAMP '2023-02-20 14:45:30', DECIMAL '234.567'), " + + " (3, 3000, 'cherry', X'090A0B0C', DATE '2023-03-25', TIMESTAMP '2023-03-25 09:15:45', DECIMAL '345.678'), " + + " (4, 4000, 'date', X'0D0E0F10', DATE '2023-04-30', TIMESTAMP '2023-04-30 16:20:10', DECIMAL '456.789'), " + + " (NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + + " (5, 5000, 'elderberry', X'11121314', DATE '2023-05-05', TIMESTAMP '2023-05-05 11:55:25', DECIMAL '567.890'), " + + " (10, 10000, 'fig', X'15161718', DATE '2023-06-10', TIMESTAMP '2023-06-10 13:40:50', DECIMAL '678.901'), " + + " (NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + + " (20, 20000, 'grape', X'191A1B1C', DATE '2023-07-15', TIMESTAMP '2023-07-15 08:25:35', DECIMAL '789.012'), " + + " (30, 30000, 'honeydew', X'1D1E1F20', DATE '2023-08-20', TIMESTAMP '2023-08-20 17:10:15', DECIMAL '0.000001'), " + + " (40, 40000, 'imbe', X'21222324', DATE '2023-09-25', TIMESTAMP '2023-09-25 12:05:40', DECIMAL '1.0'), " + + " (NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + + " (50, 50000, 'jackfruit', X'25262728', DATE '2023-10-30', TIMESTAMP '2023-10-30 15:50:20', DECIMAL '1.00000'), " + + " (NULL, NULL, NULL, NULL, NULL, NULL, NULL), " + + " (NULL, NULL, NULL, NULL, NULL, NULL, NULL)"); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + dropTableSafely("test_data"); + + String[] transforms = {"identity", "bucket", "trunc", "year", "month", "day", "hour"}; + for (String transform : transforms) { + for (String dataType : ALL_DATA_TYPES) { + if (isValidTransformForType(transform, dataType)) { + String nativeTableName = getTableName(transform, dataType) + "_native"; + String javaTableName = getTableName(transform, dataType) + "_java"; + dropTableSafely(nativeTableName); + dropTableOnJavaRunner(javaTableName); + } + } + } + + dropTableSafely(TEST_TABLE_PREFIX + "multiple"); + } + + @DataProvider(name = "identityTransformData") + public Object[][] identityTransformData() + { + List data = new ArrayList<>(); + for (String dataType : ALL_DATA_TYPES) { + data.add(new Object[] {dataType}); + } + return data.toArray(new Object[0][]); + } + + @DataProvider(name = "bucketTransformData") + public Object[][] bucketTransformData() + { + return new Object[][] { + {"int", "4"}, + {"bigint", "4"}, + {"varchar", "4"}, + {"varbinary", "4"}, + {"decimal", "4"}, + {"date", "4"}, + }; + } + + @DataProvider(name = "truncateTransformData") + public Object[][] truncateTransformData() + { + return new Object[][] { + {"int", "10"}, + {"bigint", "1000"}, + {"varchar", "3"}, + {"varbinary", "2"}, + {"decimal", "100"}, + }; + } + + @DataProvider(name = "temporalTransformData") + public Object[][] temporalTransformData() + { + return new Object[][] { + {"year", "date"}, + {"year", "timestamp"}, + {"month", "date"}, + {"month", "timestamp"}, + {"day", "date"}, + {"day", "timestamp"}, + }; + } + + @Test(dataProvider = "identityTransformData") + public void testIdentityTransform(String dataType) + { + testPartitionTransform(TRANSFORM_IDENTITY, dataType, null); + } + + @Test(dataProvider = "bucketTransformData") + public void testBucketTransform(String dataType, String bucketCount) + { + testPartitionTransform(TRANSFORM_BUCKET, dataType, bucketCount); + } + + @Test(enabled = false) + public void testBucketTransformTimestamp() + { + testPartitionTransform(TRANSFORM_BUCKET, "timestamp", "3"); + } + + @Test(dataProvider = "truncateTransformData") + public void testTruncateTransform(String dataType, String truncateWidth) + { + testPartitionTransform("trunc", dataType, truncateWidth); + } + + @Test(dataProvider = "temporalTransformData") + public void testTemporalTransform(String transform, String dataType) + { + testPartitionTransform(transform, dataType, null); + } + + @Test + public void testHourTransformTimestamp() + { + testPartitionTransform(TRANSFORM_HOUR, "timestamp", null); + } + + private void testPartitionTransform(String transform, String dataType, String param) + { + Optional maybeTableNames = createPartitionedTables(transform, dataType, param); + + if (!maybeTableNames.isPresent()) { + return; + } + + TableNamePair tableNames = maybeTableNames.get(); + + String columnName = dataType + "_col"; + + PartitionInfo partitionInfo = collectPartitionInfo(transform, tableNames.getNativeTableName(), tableNames.getJavaTableName(), columnName); + + assertTrue(partitionInfo.partitionValuesMatch(), + format("Native and Java runners should generate the same partition values for %s transform on %s. " + + "Native: %s, Java: %s", + transform, dataType, + partitionInfo.nativeUniquePartitionValues, + partitionInfo.javaUniquePartitionValues)); + + assertQuery(format("SELECT * FROM %s", tableNames.getNativeTableName()), format("SELECT * FROM %s", tableNames.getJavaTableName())); + assertQuery(format("SELECT * FROM %s", tableNames.getJavaTableName()), format("SELECT * FROM %s", tableNames.getNativeTableName())); + + verifyPartitionTransform(transform, param, partitionInfo); + verifyPartitionsMetadata(tableNames.getNativeTableName(), tableNames.getJavaTableName()); + } + + @Test + public void testMultiplePartitionTransforms() + { + String tableName = TEST_TABLE_PREFIX + "multiple"; + + dropTableSafely(tableName); + assertQuerySucceeds(format(CREATE_MULTIPLE_PARTITION_TABLE_TEMPLATE, tableName)); + assertQuerySucceeds("INSERT INTO " + tableName + + " SELECT int_col, bigint_col, varchar_col, date_col FROM test_data"); + + MaterializedResult result = computeActual("SELECT * FROM " + tableName + " ORDER BY int_col"); + MaterializedResult expected = computeActual( + "SELECT int_col, bigint_col, varchar_col, date_col FROM test_data ORDER BY int_col"); + assertEquals(result, expected, "Data should match after insertion"); + + dropTableOnJavaRunner(tableName); + ((QueryRunner) getExpectedQueryRunner()).execute(format(CREATE_MULTIPLE_PARTITION_TABLE_TEMPLATE, tableName)); + ((QueryRunner) getExpectedQueryRunner()).execute("INSERT INTO " + tableName + + " SELECT int_col, bigint_col, varchar_col, date_col FROM test_data"); + + MaterializedResult nativeFileInfo = computeActual("SELECT file_path FROM \"" + tableName + "$files\""); + MaterializedResult javaFileInfo = computeExpected( + "SELECT file_path FROM \"" + tableName + "$files\"", ImmutableList.of()); + assertEquals(nativeFileInfo.getRowCount(), javaFileInfo.getRowCount(), + "Native and Java tables should have the same number of data files"); + + verifyMultiplePartitionStructure(nativeFileInfo, javaFileInfo); + + dropTableSafely(tableName); + dropTableOnJavaRunner(tableName); + } + + private void verifyMultiplePartitionStructure(MaterializedResult nativeFileInfo, MaterializedResult javaFileInfo) + { + Set bucketValues = new HashSet<>(); + Set dateValues = new HashSet<>(); + Set yearValues = new HashSet<>(); + + for (int i = 0; i < nativeFileInfo.getRowCount(); i++) { + String nativeFilePath = (String) nativeFileInfo.getMaterializedRows().get(i).getField(0); + String javaFilePath = (String) javaFileInfo.getMaterializedRows().get(i).getField(0); + + Map nativePartitionValues = parsePartitionValues(nativeFilePath, 3); + Map javaPartitionValues = parsePartitionValues(javaFilePath, 3); + + assertEquals(nativePartitionValues, javaPartitionValues, + "Partition values should match between native and Java tables"); + + assertTrue(nativePartitionValues.containsKey("date_col_bucket"), + "Partition column date_col_bucket should exist in path: " + nativeFilePath); + assertTrue(nativePartitionValues.containsKey("date_col"), + "Partition column date_col should exist in path: " + nativeFilePath); + assertTrue(nativePartitionValues.containsKey("date_col_year"), + "Partition column date_col_year should exist in path: " + nativeFilePath); + + bucketValues.add(nativePartitionValues.get("date_col_bucket")); + dateValues.add(nativePartitionValues.get("date_col")); + yearValues.add(nativePartitionValues.get("date_col_year")); + } + + for (String bucketValue : bucketValues) { + if (!"null".equals(bucketValue)) { + int bucket = Integer.parseInt(bucketValue); + assertTrue(bucket >= 0 && bucket < 4, + format("Bucket value %d should be in range [0, 4)", bucket)); + } + } + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestPartitionedWrite.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestPartitionedWrite.java new file mode 100644 index 0000000000000..2a9c5646134fa --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestPartitionedWrite.java @@ -0,0 +1,271 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker.iceberg; + +import com.facebook.presto.testing.MaterializedResult; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static java.lang.String.format; + +public class TestPartitionedWrite + extends IcebergPartitionTestBase +{ + private static final String SOURCE_TABLE = "iceberg.tpch.source_table"; + private static final String TARGET_TABLE = "iceberg.tpch.target_table"; + private static final int ROWS_PER_INSERT = 5; + + @BeforeClass + public void setUp() + { + dropTableSafely(TARGET_TABLE); + dropTableSafely(SOURCE_TABLE); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET')", SOURCE_TABLE)); + } + + @BeforeMethod + public void cleanUpTables() + { + assertQuerySucceeds("DELETE FROM " + SOURCE_TABLE); + dropTableSafely(TARGET_TABLE); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + dropTableSafely(SOURCE_TABLE); + dropTableSafely(TARGET_TABLE); + } + + @Test + public void testIdentityTransform() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestData(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['event_date', 'category'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s ORDER BY event_date, category", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + @Test + public void testIdentityMultiInsert() + { + int repeats = 2; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestData(repeats); + + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['category'])", TARGET_TABLE)); + + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + assertUpdate(format("INSERT INTO %s SELECT id + 1000, data || '_copy', category, event_date FROM %s WHERE category LIKE 'bgd1%%'", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows * 2); + + MaterializedResult transformedData = computeActual( + "SELECT * FROM " + TARGET_TABLE + " WHERE id > 1000 ORDER BY id LIMIT 5"); + assertEquals(transformedData.getMaterializedRows().get(0).getField(0), 1012L, "id should be original + 1000"); + assertEquals(transformedData.getMaterializedRows().get(0).getField(1), "3_copy", "data should have '_copy' suffix"); + assertEquals(transformedData.getMaterializedRows().get(0).getField(2), "bgd14", "category should be unchanged"); + + long countInBgd16 = (long) computeScalar("SELECT count(*) FROM " + TARGET_TABLE + " WHERE category = 'bgd16'"); + assertEquals(countInBgd16, 8L, "Should have 8 rows in the 'bgd16' partition"); + } + + @Test + public void testBucketTransform() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestData(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['bucket(data, 8)'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + @Test + public void testBucketTransformOnDate() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestDataWithMultipleDates(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['bucket(event_date, 4)'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + @Test + public void testTruncateTransform() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestData(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['truncate(data, 4)', 'truncate(id, 4)'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + @Test + public void testYearTransform() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestDataWithMultipleYears(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['year(event_date)'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + @Test + public void testMonthTransform() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestDataWithMultipleDates(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['month(event_date)'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + @Test + public void testDayTransform() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestDataWithMultipleDates(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['day(event_date)'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + @Test + public void testCombinedTransforms() + { + int repeats = 3; + long totalRows = repeats * ROWS_PER_INSERT; + insertTestDataWithMultipleYears(repeats); + + MaterializedResult expected = computeActual("SELECT * FROM " + SOURCE_TABLE + " ORDER BY id"); + assertUpdate(format("CREATE TABLE %s (id BIGINT, data VARCHAR, category VARCHAR, event_date DATE) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['year(event_date)', 'bucket(category, 4)', 'data'])", TARGET_TABLE)); + assertUpdate(format("INSERT INTO %s SELECT id, data, category, event_date FROM %s", + TARGET_TABLE, SOURCE_TABLE), totalRows); + + verifyRowCount(TARGET_TABLE, totalRows); + MaterializedResult actual = computeActual("SELECT * FROM " + TARGET_TABLE + " ORDER BY id"); + assertEquals(actual, expected, "Row data should match expected"); + } + + private void insertTestData(int repeat) + { + for (int i = 0; i < repeat; i++) { + assertUpdate(format("INSERT INTO %s VALUES " + + "(13, '1', 'bgd16', DATE '2021-11-10'), " + + "(21, '2', 'bgd13', DATE '2021-11-10'), " + + "(12, '3', 'bgd14', DATE '2021-11-10'), " + + "(222, '3', 'bgd15', DATE '2021-11-10'), " + + "(45, '4', 'bgd16', DATE '2021-11-10')", + SOURCE_TABLE), ROWS_PER_INSERT); + } + } + + private void insertTestDataWithMultipleDates(int repeat) + { + for (int i = 0; i < repeat; i++) { + assertUpdate(format("INSERT INTO %s VALUES " + + "(13, '1', 'bgd16', DATE '2021-01-10'), " + + "(21, '2', 'bgd13', DATE '2021-02-15'), " + + "(12, '3', 'bgd14', DATE '2021-03-20'), " + + "(222, '3', 'bgd15', DATE '2021-04-25'), " + + "(45, '4', 'bgd16', DATE '2021-05-30')", + SOURCE_TABLE), ROWS_PER_INSERT); + } + } + + private void insertTestDataWithMultipleYears(int repeat) + { + for (int i = 0; i < repeat; i++) { + assertUpdate(format("INSERT INTO %s VALUES " + + "(13, '1', 'bgd16', DATE '2020-11-10'), " + + "(21, '2', 'bgd13', DATE '2021-11-10'), " + + "(12, '3', 'bgd14', DATE '2022-11-10'), " + + "(222, '3', 'bgd15', DATE '2023-11-10'), " + + "(45, '4', 'bgd16', DATE '2024-11-10')", + SOURCE_TABLE), ROWS_PER_INSERT); + } + } + + private void verifyRowCount(String tableName, long expectedCount) + { + long actualCount = (long) computeScalar("SELECT count(*) FROM " + tableName); + assertEquals(actualCount, expectedCount, format("Table %s should have %d rows", tableName, expectedCount)); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestSchemaEvolution.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestSchemaEvolution.java new file mode 100644 index 0000000000000..2b8eaaa0124b7 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestSchemaEvolution.java @@ -0,0 +1,253 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker.iceberg; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static java.lang.String.format; + +public class TestSchemaEvolution + extends IcebergPartitionTestBase +{ + private static final String BASE_TABLE = "page_views"; + + @BeforeMethod + public void setUp() + { + assertQuerySucceeds(format("DROP TABLE IF EXISTS %s", BASE_TABLE)); + assertQuerySucceeds(format( + "CREATE TABLE %s (" + + " user_id BIGINT, " + + " page_url VARCHAR, " + + " view_time TIMESTAMP, " + + " session_id VARCHAR" + + ") WITH (format = 'PARQUET', partitioning = ARRAY['bucket(user_id, 4)'])", BASE_TABLE)); + + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (1001, '/home', TIMESTAMP '2023-01-15 10:30:00', 'sess_001'), " + + " (1002, '/products', TIMESTAMP '2023-01-15 11:45:30', 'sess_002'), " + + " (1003, '/about', TIMESTAMP '2023-01-16 09:15:45', 'sess_003')", BASE_TABLE)); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() + { + assertQuerySucceeds(format("DROP TABLE IF EXISTS %s", BASE_TABLE)); + } + + @Test + public void testBasicColumnOperations() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN zipcode VARCHAR", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN zipcode TO location", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s DROP COLUMN location", BASE_TABLE)); + assertQuery(format("SELECT COUNT(*) FROM %s", BASE_TABLE), "VALUES (BIGINT '3')"); + } + + @Test + public void testIdentityPartitionColumns() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN zipcode VARCHAR WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN region_id INTEGER WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN country_code BIGINT WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN event_date DATE WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (2001, '/search', TIMESTAMP '2023-02-01 14:20:00', 'sess_004', '12345', 1, 840, DATE '2023-02-01')", BASE_TABLE)); + } + + @Test + public void testTruncatePartitionColumns() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN location VARCHAR WITH (partitioning = 'truncate(2)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN category VARCHAR WITH (partitioning = 'truncate(5)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN score INTEGER WITH (partitioning = 'truncate(10)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN revenue BIGINT WITH (partitioning = 'truncate(1000)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN price DECIMAL(10,2) WITH (partitioning = 'truncate(100)')", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (3001, '/checkout', TIMESTAMP '2023-03-01 16:30:00', 'sess_005', 'US-CA', 'electronics', 85, 15000, DECIMAL '299.99')", BASE_TABLE)); + } + + @Test + public void testBucketPartitionColumns() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN location VARCHAR WITH (partitioning = 'bucket(8)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN user_segment INTEGER WITH (partitioning = 'bucket(4)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN device_id BIGINT WITH (partitioning = 'bucket(16)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN rating DECIMAL(3,1) WITH (partitioning = 'bucket(5)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN hash_key VARBINARY WITH (partitioning = 'bucket(12)')", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (4001, '/profile', TIMESTAMP '2023-04-01 12:15:00', 'sess_006', 'NY-NYC', 3, 987654321, DECIMAL '4.5', X'ABCDEF')", BASE_TABLE)); + } + + @Test + public void testDateTimePartitionTransforms() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN dt DATE WITH (partitioning = 'year')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN ts TIMESTAMP WITH (partitioning = 'month')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN event_dt DATE WITH (partitioning = 'day')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN created_ts TIMESTAMP WITH (partitioning = 'hour')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN report_date DATE WITH (partitioning = 'month')", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (5001, '/dashboard', TIMESTAMP '2023-05-15 08:30:00', 'sess_007', " + + " DATE '2023-05-15', TIMESTAMP '2023-05-15 08:30:00', DATE '2023-05-15', " + + " TIMESTAMP '2023-05-15 08:30:00', DATE '2023-05-01'), " + + " (5002, '/reports', TIMESTAMP '2023-06-20 14:45:30', 'sess_008', " + + " DATE '2023-06-20', TIMESTAMP '2023-06-20 14:45:30', DATE '2023-06-20', " + + " TIMESTAMP '2023-06-20 14:45:30', DATE '2023-06-01')", BASE_TABLE)); + } + + @Test + public void testMixedPartitionTransforms() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN region VARCHAR WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN category VARCHAR WITH (partitioning = 'truncate(3)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN bucket_id INTEGER WITH (partitioning = 'bucket(10)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN event_year DATE WITH (partitioning = 'year')", BASE_TABLE)); + + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (6001, '/analytics', TIMESTAMP '2023-07-10 10:00:00', 'sess_009', " + + " 'west', 'shopping', 42, DATE '2023-07-10'), " + + " (6002, '/metrics', TIMESTAMP '2023-08-15 15:30:00', 'sess_010', " + + " 'east', 'browsing', 73, DATE '2023-08-15')", BASE_TABLE)); + + assertQuery(format("SELECT COUNT(*) FROM %s WHERE region = 'west'", BASE_TABLE), "VALUES (BIGINT '1')"); + assertQuery(format("SELECT COUNT(*) FROM %s WHERE year(event_year) = 2023", BASE_TABLE), "VALUES (BIGINT '2')"); + } + + @Test + public void testSchemaEvolutionWithExistingData() + { + assertQuery(format("SELECT COUNT(*) FROM %s", BASE_TABLE), "VALUES (BIGINT '3')"); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN status VARCHAR WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (7001, '/new-feature', TIMESTAMP '2023-09-01 12:00:00', 'sess_011', 'active')", BASE_TABLE)); + + assertQuery(format("SELECT COUNT(*) FROM %s WHERE status IS NULL", BASE_TABLE), "VALUES (BIGINT '3')"); + assertQuery(format("SELECT COUNT(*) FROM %s WHERE status = 'active'", BASE_TABLE), "VALUES (BIGINT '1')"); + } + + @Test + public void testComplexDataTypes() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN amount DECIMAL(15,2) WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN score DECIMAL(5,2) WITH (partitioning = 'bucket(6)')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN rating DECIMAL(10,3) WITH (partitioning = 'truncate(10)')", BASE_TABLE)); + + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (8001, '/premium', TIMESTAMP '2023-10-01 09:30:00', 'sess_012', " + + " DECIMAL '1234.56', DECIMAL '98.75', DECIMAL '456.789')", BASE_TABLE)); + } + + @Test + public void testColumnRenaming() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN zipcode VARCHAR", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN zipcode TO location", BASE_TABLE)); + assertQueryFails(format("SELECT zipcode FROM %s", BASE_TABLE), ".*Column 'zipcode' cannot be resolved.*"); + assertQuerySucceeds(format("SELECT location FROM %s", BASE_TABLE)); + + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (9001, '/test', TIMESTAMP '2023-11-01 10:00:00', 'sess_test', 'NYC')", BASE_TABLE)); + assertQuery(format("SELECT COUNT(*) FROM %s WHERE location = 'NYC'", BASE_TABLE), "VALUES (BIGINT '1')"); + } + + @Test + public void testRenamePartitionedColumn() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN zipcode VARCHAR WITH (partitioning = 'identity')", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (9002, '/partition-test', TIMESTAMP '2023-11-02 11:00:00', 'sess_part', '10001')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN zipcode TO postal_code", BASE_TABLE)); + + assertQuery(format("SELECT COUNT(*) FROM %s WHERE postal_code = '10001'", BASE_TABLE), "VALUES (BIGINT '1')"); + + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (9003, '/partition-test2', TIMESTAMP '2023-11-03 12:00:00', 'sess_part2', '10002')", BASE_TABLE)); + + assertQuery(format("SELECT COUNT(*) FROM %s WHERE postal_code IN ('10001', '10002')", BASE_TABLE), "VALUES (BIGINT '2')"); + } + + @Test + public void testRenameColumnErrorCases() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN zipcode VARCHAR", BASE_TABLE)); + assertQueryFails( + format("ALTER TABLE %s RENAME COLUMN zipcode TO user_id", BASE_TABLE), + ".*Column 'user_id' already exists.*"); + + assertQueryFails( + format("ALTER TABLE %s RENAME COLUMN nonexistent TO location", BASE_TABLE), + ".*Column 'nonexistent' does not exist.*"); + assertQueryFails( + format("ALTER TABLE %s RENAME COLUMN zipcode TO ZIPCODE", BASE_TABLE), + ".*Column.*already exists.*"); + } + + @Test + public void testMultipleColumnRenames() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN zipcode VARCHAR", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN region_id INTEGER", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN score DECIMAL(5,2)", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN zipcode TO postal_code", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN region_id TO area_code", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN score TO rating", BASE_TABLE)); + + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (9007, '/multi-rename', TIMESTAMP '2023-11-07 16:00:00', 'sess_multi', '12345', 100, DECIMAL '4.5')", BASE_TABLE)); + + assertQuery(format("SELECT COUNT(*) FROM %s WHERE postal_code = '12345'", BASE_TABLE), "VALUES (BIGINT '1')"); + assertQuery(format("SELECT COUNT(*) FROM %s WHERE area_code = 100", BASE_TABLE), "VALUES (BIGINT '1')"); + } + + @Test + public void testRenameWithDataIntegrity() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN zipcode VARCHAR", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (9008, '/integrity-test', TIMESTAMP '2023-11-08 17:00:00', 'sess_integrity', '54321')", BASE_TABLE)); + + assertQuery(format("SELECT COUNT(*) FROM %s WHERE zipcode = '54321'", BASE_TABLE), "VALUES (BIGINT '1')"); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN zipcode TO location_code", BASE_TABLE)); + assertQuery(format("SELECT COUNT(*) FROM %s WHERE location_code = '54321'", BASE_TABLE), "VALUES (BIGINT '1')"); + assertQuery(format("SELECT COUNT(*) FROM %s", BASE_TABLE), "VALUES (BIGINT '4')"); + } + + @Test + public void testRenameTruncatePartitionedColumn() + { + assertQuerySucceeds(format("ALTER TABLE %s ADD COLUMN category VARCHAR WITH (partitioning = 'truncate(3)')", BASE_TABLE)); + assertQuerySucceeds(format( + "INSERT INTO %s VALUES " + + " (9005, '/truncate-test', TIMESTAMP '2023-11-05 14:00:00', 'sess_trunc', 'electronics')", BASE_TABLE)); + assertQuerySucceeds(format("ALTER TABLE %s RENAME COLUMN category TO product_type", BASE_TABLE)); + assertQuerySucceeds(format("select product_type from %s", BASE_TABLE)); + assertQuery(format("SELECT count(*) FROM %s WHERE product_type = 'electronics'", BASE_TABLE), "VALUES (BIGINT '1')"); + } +} diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestTpchPartitionTransforms.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestTpchPartitionTransforms.java new file mode 100644 index 0000000000000..41c75c2c3c07a --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/iceberg/TestTpchPartitionTransforms.java @@ -0,0 +1,221 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker.iceberg; + +import com.facebook.presto.nativeworker.NativeQueryRunnerUtils; +import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils; +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableList; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static java.lang.String.format; +import static org.testng.Assert.assertTrue; + +public class TestTpchPartitionTransforms + extends IcebergPartitionTestBase +{ + private static final String TEST_TABLE_PREFIX = "partition_test_"; + private static final String sourceTableName = "lineitem"; + private static final String sourceTableSchema = "iceberg.tpch"; + private static final String CREATE_LINEITEM_TABLE_TEMPLATE = "CREATE TABLE %s (" + + " orderkey BIGINT, " + + " partkey BIGINT, " + + " suppkey BIGINT, " + + " linenumber INTEGER, " + + " quantity DECIMAL(12,2), " + + " extendedprice DECIMAL(12,2), " + + " discount DECIMAL(12,2), " + + " tax DECIMAL(12,2), " + + " returnflag VARCHAR, " + + " linestatus VARCHAR, " + + " shipdate DATE, " + + " commitdate DATE, " + + " receiptdate DATE, " + + " shipinstruct VARCHAR, " + + " shipmode VARCHAR, " + + " comment VARCHAR" + + ") WITH (format = 'PARQUET', partitioning = ARRAY['%s'])"; + private static final String INSERT_LINEITEM_TEMPLATE = "INSERT INTO %s SELECT orderkey, partkey, suppkey, linenumber," + + "CAST(quantity AS DECIMAL(12,2)), CAST(extendedprice AS DECIMAL(12,2))," + + "CAST(discount AS DECIMAL(12,2)), CAST(tax AS DECIMAL(12,2))," + + "CAST(returnflag AS VARCHAR(1)), CAST(linestatus AS VARCHAR(1))," + + "CAST(shipdate AS DATE), CAST(commitdate AS DATE), CAST(receiptdate as DATE)," + + "CAST(shipinstruct AS VARCHAR(25)), CAST(shipmode AS VARCHAR(10))," + + "CAST(comment AS VARCHAR(44)) FROM %s.%s"; + + private static Object[][] getPartitionTransformTestParameters() + { + return new Object[][] { + {"year", "", "shipdate"}, + {"bucket", "8", "orderkey"}, + {"bucket", "4", "shipdate"}, + {"bucket", "4", "quantity"}, + {"bucket", "10", "tax"}, + {"bucket", "10", "linenumber"} + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + QueryRunner queryRunner = PrestoNativeQueryRunnerUtils.nativeIcebergQueryRunnerBuilder().setUseThrift(true).build(); + return queryRunner; + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + QueryRunner javaQueryRunner = PrestoNativeQueryRunnerUtils.javaIcebergQueryRunnerBuilder().build(); + NativeQueryRunnerUtils.createAllIcebergTables(javaQueryRunner); + return javaQueryRunner; + } + + private String[] createPartitionedLineitemTables(String targetTableName, String partitioningClause) + { + String nativeTableName = targetTableName + "_native"; + String javaTableName = targetTableName + "_java"; + + ((QueryRunner) getExpectedQueryRunner()).execute(format(DROP_TABLE_TEMPLATE, javaTableName)); + ((QueryRunner) getExpectedQueryRunner()).execute(format(CREATE_LINEITEM_TABLE_TEMPLATE, + javaTableName, partitioningClause)); + ((QueryRunner) getExpectedQueryRunner()).execute(format(INSERT_LINEITEM_TEMPLATE, javaTableName, sourceTableSchema, sourceTableName)); + + assertQuerySucceeds(format(DROP_TABLE_TEMPLATE, nativeTableName)); + assertQuerySucceeds(format( + CREATE_LINEITEM_TABLE_TEMPLATE, + nativeTableName, partitioningClause)); + assertQuerySucceeds(format(INSERT_LINEITEM_TEMPLATE, nativeTableName, sourceTableSchema, sourceTableName)); + + MaterializedResult nativeResult = computeActual(format("SELECT count(*) FROM %s", nativeTableName)); + MaterializedResult javaResult = computeExpected(format("SELECT count(*) FROM %s", javaTableName), ImmutableList.of()); + assertEquals(nativeResult, javaResult, "Row count should match between native and Java tables"); + + return new String[] {nativeTableName, javaTableName}; + } + + @Test(dataProvider = "partitionTransformTestParameters") + public void testPartitionTransform(String transform, String parameter, String column) + { + String partitioningClause = buildPartitioningClause(transform, column, parameter); + String[] tableNames = createPartitionedLineitemTables(sourceTableName, partitioningClause); + String nativeTableName = tableNames[0]; + String javaTableName = tableNames[1]; + + try { + String partitionColumnName = getPartitionColumnName(column, transform); + MaterializedResult nativePartitions = computeActual( + format("SELECT DISTINCT %s FROM \"%s$partitions\"", partitionColumnName, nativeTableName)); + MaterializedResult javaPartitions = computeExpected( + format("SELECT DISTINCT %s FROM \"%s$partitions\"", partitionColumnName, javaTableName), + ImmutableList.of()); + + assertTrue(nativePartitions.getRowCount() > 0, "Should have partitions in native table"); + assertEquals(nativePartitions.getRowCount(), javaPartitions.getRowCount(), + "Native and Java tables should have the same number of partitions"); + + assertQuery(format("SELECT * FROM %s ORDER BY orderkey", nativeTableName), format("SELECT * FROM %s ORDER BY orderkey", javaTableName)); + + PartitionInfo partitionInfo = collectPartitionInfo(transform, + nativeTableName, javaTableName, column); + + verifyPartitionTransform(transform, parameter, partitionInfo); + + verifyPartitionsMetadata(nativeTableName, javaTableName); + } + finally { + cleanupTables(nativeTableName, javaTableName); + } + } + + @DataProvider(name = "partitionTransformTestParameters") + public Object[][] partitionTransformTestParameters() + { + return getPartitionTransformTestParameters(); + } + + private void cleanupTables(String nativeTableName, String javaTableName) + { + try { + assertQuerySucceeds(format("DROP TABLE IF EXISTS %s", nativeTableName)); + } + catch (Exception e) { + } + + try { + ((QueryRunner) getExpectedQueryRunner()).execute( + format("DROP TABLE IF EXISTS %s", javaTableName)); + } + catch (Exception e) { + } + } + + @Test(enabled = false) + public void testMultiplePartitionTransforms() + { + String targetTableName = TEST_TABLE_PREFIX + "lineitem_multiple_transforms"; + String nativeTableName = targetTableName + "_native"; + String javaTableName = targetTableName + "_java"; + + assertQuerySucceeds(format(DROP_TABLE_TEMPLATE, nativeTableName)); + assertQuerySucceeds(format( + "CREATE TABLE %s (" + + " orderkey BIGINT, " + + " partkey BIGINT, " + + " suppkey BIGINT, " + + " linenumber INTEGER, " + + " quantity DOUBLE, " + + " extendedprice DOUBLE, " + + " discount DOUBLE, " + + " tax DOUBLE, " + + " returnflag VARCHAR, " + + " linestatus VARCHAR, " + + " shipdate DATE, " + + " commitdate DATE, " + + " receiptdate DATE, " + + " shipinstruct VARCHAR, " + + " shipmode VARCHAR, " + + " comment VARCHAR" + + ") WITH (format = 'PARQUET', partitioning = ARRAY['bucket(orderkey, 4)', 'truncate(shipmode, 1)', 'month(shipdate)'])", + nativeTableName)); + + assertQuerySucceeds(format(INSERT_LINEITEM_TEMPLATE, nativeTableName, sourceTableSchema, sourceTableName)); + + ((QueryRunner) getExpectedQueryRunner()).execute(format( + "CREATE TABLE %s WITH (format = 'PARQUET', partitioning = ARRAY['bucket(orderkey, 4)', 'truncate(shipmode, 1)', 'month(shipdate)']) AS " + + "SELECT * FROM %s", + javaTableName, sourceTableName)); + + MaterializedResult nativeResult = computeActual(format("SELECT count(*) FROM %s", nativeTableName)); + MaterializedResult javaResult = computeExpected(format("SELECT count(*) FROM %s", javaTableName), ImmutableList.of()); + assertEquals(nativeResult, javaResult, "Row count should match between native and Java tables"); + + MaterializedResult nativePartitions = computeActual(format( + "SELECT DISTINCT orderkey_bucket, shipmode_trunc, shipdate_month FROM \"%s$partitions\"", + nativeTableName)); + assertTrue(nativePartitions.getRowCount() > 0, "Should have partitions"); + + String query = format( + "SELECT count(*) FROM %s WHERE orderkey %% 4 = 0 AND shipmode LIKE 'A%%'", + nativeTableName); + computeActual(query); + + verifyPartitionsMetadata(nativeTableName, javaTableName); + } +}