diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java index 7bf660120304..8be9740be490 100644 --- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java +++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java @@ -38,18 +38,17 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hive.iceberg.org.apache.avro.generic.GenericData; import org.apache.hive.iceberg.org.apache.avro.generic.GenericRecordBuilder; -import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Files; -import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableUtil; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.Namespace; @@ -259,7 +258,7 @@ public void testDropTable() throws IOException { .as("Table manifest files should not exist") .doesNotExist(); } - assertThat(new File(((HasTableOperations) table).operations().current().metadataFileLocation() + assertThat(new File(TableUtil.metadataFileLocation(table) .replace("file:", ""))) .as("Table metadata file should not exist") .doesNotExist(); @@ -552,7 +551,7 @@ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TExceptio .hasMessage("Table does not exist: hivedb.table1"); // register the table to hive catalog using the latest metadata file - String latestMetadataFile = ((BaseTable) table).operations().current().metadataFileLocation(); + String latestMetadataFile = TableUtil.metadataFileLocation(table); catalog.registerTable(identifier, "file:" + latestMetadataFile); assertThat(HIVE_METASTORE_EXTENSION.metastoreClient().getTable(DB_NAME, "table1")).isNotNull(); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java deleted file mode 100644 index 20e819dec457..000000000000 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.data; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Locale; -import java.util.UUID; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.HasTableOperations; -import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PartitionStatisticsFile; -import org.apache.iceberg.PartitionStats; -import org.apache.iceberg.PartitionStatsUtil; -import org.apache.iceberg.Partitioning; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.avro.InternalReader; -import org.apache.iceberg.data.parquet.InternalWriter; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.types.Types.IntegerType; -import org.apache.iceberg.types.Types.LongType; -import org.apache.iceberg.types.Types.NestedField; -import org.apache.iceberg.types.Types.StructType; - -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - -/** - * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers - * to support writing and reading of the stats in table default format. - */ -public class PartitionStatsHandler { - - private PartitionStatsHandler() { - } - - public static final int PARTITION_FIELD_ID = 0; - public static final String PARTITION_FIELD_NAME = "partition"; - public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get()); - public static final NestedField DATA_RECORD_COUNT = - NestedField.required(2, "data_record_count", LongType.get()); - public static final NestedField DATA_FILE_COUNT = - NestedField.required(3, "data_file_count", IntegerType.get()); - public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES = - NestedField.required(4, "total_data_file_size_in_bytes", LongType.get()); - public static final NestedField POSITION_DELETE_RECORD_COUNT = - NestedField.optional(5, "position_delete_record_count", LongType.get()); - public static final NestedField POSITION_DELETE_FILE_COUNT = - NestedField.optional(6, "position_delete_file_count", IntegerType.get()); - public static final NestedField EQUALITY_DELETE_RECORD_COUNT = - NestedField.optional(7, "equality_delete_record_count", LongType.get()); - public static final NestedField EQUALITY_DELETE_FILE_COUNT = - NestedField.optional(8, "equality_delete_file_count", IntegerType.get()); - public static final NestedField TOTAL_RECORD_COUNT = - NestedField.optional(9, "total_record_count", LongType.get()); - public static final NestedField LAST_UPDATED_AT = - NestedField.optional(10, "last_updated_at", LongType.get()); - public static final NestedField LAST_UPDATED_SNAPSHOT_ID = - NestedField.optional(11, "last_updated_snapshot_id", LongType.get()); - - /** - * Generates the partition stats file schema based on a combined partition type which considers - * all specs in a table. 
- * - * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link - * Partitioning#partitionType(Table)}. - * @return a schema that corresponds to the provided unified partition type. - */ - public static Schema schema(StructType unifiedPartitionType) { - Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); - return new Schema( - NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), - SPEC_ID, - DATA_RECORD_COUNT, - DATA_FILE_COUNT, - TOTAL_DATA_FILE_SIZE_IN_BYTES, - POSITION_DELETE_RECORD_COUNT, - POSITION_DELETE_FILE_COUNT, - EQUALITY_DELETE_RECORD_COUNT, - EQUALITY_DELETE_FILE_COUNT, - TOTAL_RECORD_COUNT, - LAST_UPDATED_AT, - LAST_UPDATED_SNAPSHOT_ID); - } - - /** - * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot. - * - * @param table The {@link Table} for which the partition statistics is computed. - * @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are - * present. - */ - public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException { - if (table.currentSnapshot() == null) { - return null; - } - - return computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId()); - } - - /** - * Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot. - * - * @param table The {@link Table} for which the partition statistics is computed. - * @param snapshotId snapshot for which partition statistics are computed. - * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are - * present. - */ - public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId) - throws IOException { - Snapshot snapshot = table.snapshot(snapshotId); - Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId); - - Collection stats = PartitionStatsUtil.computeStats(table, snapshot); - if (stats.isEmpty()) { - return null; - } - - StructType partitionType = Partitioning.partitionType(table); - List sortedStats = PartitionStatsUtil.sortStats(stats, partitionType); - return writePartitionStatsFile( - table, snapshot.snapshotId(), schema(partitionType), sortedStats); - } - - @VisibleForTesting - static PartitionStatisticsFile writePartitionStatsFile( - Table table, long snapshotId, Schema dataSchema, Iterable records) - throws IOException { - FileFormat fileFormat = - FileFormat.fromString( - table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)); - - if (fileFormat == FileFormat.ORC) { - // Internal writers are not supported for ORC yet. Temporary we go with AVRO. - fileFormat = FileFormat.AVRO; - } - - OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId); - - try (DataWriter writer = dataWriter(dataSchema, outputFile, fileFormat)) { - records.iterator().forEachRemaining(writer::write); - } - - return ImmutableGenericPartitionStatisticsFile.builder() - .snapshotId(snapshotId) - .path(outputFile.location()) - .fileSizeInBytes(outputFile.toInputFile().getLength()) - .build(); - } - - /** - * Reads partition statistics from the specified {@link InputFile} using given schema. - * - * @param schema The {@link Schema} of the partition statistics file. - * @param inputFile An {@link InputFile} pointing to the partition stats file. 
- */ - public static CloseableIterable readPartitionStatsFile( - Schema schema, InputFile inputFile) { - CloseableIterable records = dataReader(schema, inputFile); - return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); - } - - private static OutputFile newPartitionStatsFile( - Table table, FileFormat fileFormat, long snapshotId) { - Preconditions.checkArgument( - table instanceof HasTableOperations, - "Table must have operations to retrieve metadata location"); - - return table - .io() - .newOutputFile( - ((HasTableOperations) table) - .operations() - .metadataFileLocation( - fileFormat.addExtension( - String.format( - Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID())))); - } - - private static DataWriter dataWriter( - Schema dataSchema, OutputFile outputFile, FileFormat fileFormat) throws IOException { - switch (fileFormat) { - case PARQUET: - return Parquet.writeData(outputFile) - .schema(dataSchema) - .createWriterFunc(InternalWriter::createWriter) - .withSpec(PartitionSpec.unpartitioned()) - .build(); - case ORC: - // Internal writers are not supported for ORC yet. Temporary we go with AVRO. - case AVRO: - return Avro.writeData(outputFile) - .schema(dataSchema) - .createWriterFunc(org.apache.iceberg.avro.InternalWriter::create) - .withSpec(PartitionSpec.unpartitioned()) - .build(); - default: - throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); - } - } - - private static CloseableIterable dataReader(Schema schema, InputFile inputFile) { - FileFormat fileFormat = FileFormat.fromFileName(inputFile.location()); - Preconditions.checkArgument( - fileFormat != null, "Unable to determine format of file: %s", inputFile.location()); - - switch (fileFormat) { - case PARQUET: - return Parquet.read(inputFile) - .project(schema) - .createReaderFunc( - fileSchema -> - org.apache.iceberg.data.parquet.InternalReader.create(schema, fileSchema)) - .build(); - case ORC: - // Internal writers are not supported for ORC yet. Temporary we go with AVRO. 
- case AVRO: - return Avro.read(inputFile) - .project(schema) - .createReaderFunc(fileSchema -> InternalReader.create(schema)) - .build(); - default: - throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name()); - } - } - - private static PartitionStats recordToPartitionStats(StructLike record) { - PartitionStats stats = - new PartitionStats( - record.get(PARTITION_FIELD_ID, StructLike.class), - record.get(SPEC_ID.fieldId(), Integer.class)); - stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class)); - stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class)); - stats.set( - TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), - record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class)); - stats.set( - POSITION_DELETE_RECORD_COUNT.fieldId(), - record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class)); - stats.set( - POSITION_DELETE_FILE_COUNT.fieldId(), - record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class)); - stats.set( - EQUALITY_DELETE_RECORD_COUNT.fieldId(), - record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class)); - stats.set( - EQUALITY_DELETE_FILE_COUNT.fieldId(), - record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class)); - stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class)); - stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class)); - stats.set( - LAST_UPDATED_SNAPSHOT_ID.fieldId(), - record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class)); - return stats; - } -} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java index 58908b006610..3ce6aa484b25 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFields; import org.apache.hadoop.hive.ql.util.NullOrdering; import org.apache.iceberg.BaseMetastoreTableOperations; -import org.apache.iceberg.BaseTable; import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -57,6 +56,7 @@ import org.apache.iceberg.SortOrderParser; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableUtil; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NotFoundException; @@ -440,7 +440,7 @@ static boolean isOrcFileFormat(org.apache.hadoop.hive.metastore.api.Table hmsTab } protected void setWriteModeDefaults(Table icebergTbl, Map newProps, EnvironmentContext context) { - if ((icebergTbl == null || ((BaseTable) icebergTbl).operations().current().formatVersion() == 1) && + if ((icebergTbl == null || TableUtil.formatVersion(icebergTbl) == 1) && IcebergTableUtil.isV2TableOrAbove(newProps)) { List writeModeList = ImmutableList.of( TableProperties.DELETE_MODE, TableProperties.UPDATE_MODE, TableProperties.MERGE_MODE); @@ -472,7 +472,7 @@ public void postGetTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { if (hmsTable != null) { try { Table tbl = IcebergTableUtil.getTable(conf, hmsTable); - String formatVersion = String.valueOf(((BaseTable) 
tbl).operations().current().formatVersion()); + String formatVersion = String.valueOf(TableUtil.formatVersion(tbl)); hmsTable.getParameters().put(TableProperties.FORMAT_VERSION, formatVersion); // Set the serde info hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getName()); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index fee98b185d48..3233adcaaa78 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -96,6 +96,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableUtil; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateProperties; @@ -552,7 +553,7 @@ public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab } // we want to keep the data files but get rid of the metadata directory - String metadataLocation = ((BaseTable) this.icebergTable).operations().current().metadataFileLocation(); + String metadataLocation = TableUtil.metadataFileLocation(this.icebergTable); try { Path path = new Path(metadataLocation).getParent(); FileSystem.get(path.toUri(), conf).delete(path, true); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java index cac9b480232a..1956d8100b6d 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java @@ -72,7 +72,7 @@ private static HiveIcebergWriter writer(JobConf jc) { } private static void setWriterLevelConfiguration(JobConf jc, Table table) { - final String writeFormat = table.properties().get("write.format.default"); + final String writeFormat = table.properties().get(TableProperties.DEFAULT_FILE_FORMAT); if (writeFormat == null || "PARQUET".equalsIgnoreCase(writeFormat)) { if (table.properties().get(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES) == null && jc.get(ParquetOutputFormat.BLOCK_SIZE) != null) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index d5c58e63f6e5..5a9dc06bfac5 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -85,7 +85,6 @@ import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -151,6 +150,7 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.PartitionStatisticsFile; import org.apache.iceberg.PartitionStats; +import 
org.apache.iceberg.PartitionStatsHandler; import org.apache.iceberg.Partitioning; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Schema; @@ -164,8 +164,9 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; +import org.apache.iceberg.TableUtil; +import org.apache.iceberg.Transaction; import org.apache.iceberg.actions.DeleteOrphanFiles; -import org.apache.iceberg.data.PartitionStatsHandler; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Evaluator; @@ -433,8 +434,8 @@ public boolean canProvidePartitionStatistics(org.apache.hadoop.hive.ql.metadata. public StorageFormatDescriptor getStorageFormatDescriptor(org.apache.hadoop.hive.metastore.api.Table table) throws SemanticException { if (table.getParameters() != null) { - String format = table.getParameters().getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, IOConstants.PARQUET); - return StorageFormat.getDescriptor(format, TableProperties.DEFAULT_FILE_FORMAT); + FileFormat format = IcebergTableUtil.defaultFileFormat(table.getParameters()::getOrDefault); + return StorageFormat.getDescriptor(format.name(), TableProperties.DEFAULT_FILE_FORMAT); } return null; } @@ -460,8 +461,8 @@ public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fr Map partitionSpec) throws SemanticException { Table icebergTbl = IcebergTableUtil.getTable(conf, table); - String format = table.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT); - HiveTableUtil.appendFiles(fromURI, format, icebergTbl, isOverwrite, partitionSpec, conf); + FileFormat format = IcebergTableUtil.defaultFileFormat(icebergTbl); + HiveTableUtil.appendFiles(fromURI, format.name(), icebergTbl, isOverwrite, partitionSpec, conf); } @Override @@ -535,7 +536,19 @@ public Map computeBasicStatistics(Partish partish) { PartitionStatisticsFile statsFile = IcebergTableUtil.getPartitionStatsFile(table, snapshot.snapshotId()); if (statsFile == null) { try { - statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table); + Table statsTable = table; + if (FileFormat.ORC == IcebergTableUtil.defaultFileFormat(table)) { + // PartitionStatsHandler uses the table default file format for writing the stats file. + // ORC is not supported by InternalData writers, so we create an uncommitted transaction + // view of the table without DEFAULT_FILE_FORMAT to fall back to DEFAULT_FILE_FORMAT_DEFAULT. + // NOTE: we intentionally do not call commitTransaction(), so this property change is never published. 
+ Transaction tx = table.newTransaction(); + tx.updateProperties() + .remove(TableProperties.DEFAULT_FILE_FORMAT) + .commit(); + statsTable = tx.table(); + } + statsFile = PartitionStatsHandler.computeAndWriteStatsFile(statsTable); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -553,7 +566,7 @@ private static Map getPartishSummary(Partish partish, Table tabl PartitionStatisticsFile statsFile = IcebergTableUtil.getPartitionStatsFile(table, snapshot.snapshotId()); if (statsFile != null) { Types.StructType partitionType = Partitioning.partitionType(table); - Schema recordSchema = PartitionStatsHandler.schema(partitionType); + Schema recordSchema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table)); try (CloseableIterable recordIterator = PartitionStatsHandler.readPartitionStatsFile( recordSchema, table.io().newInputFile(statsFile.path()))) { @@ -1115,7 +1128,7 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable, "try altering the metadata location to the current metadata location by executing the following query:" + "ALTER TABLE {}.{} SET TBLPROPERTIES('metadata_location'='{}'). This operation is supported for Hive " + "Catalog tables.", hmsTable.getDbName(), hmsTable.getTableName(), - ((BaseTable) icebergTable).operations().current().metadataFileLocation()); + TableUtil.metadataFileLocation(icebergTable)); AlterTableExecuteSpec.RollbackSpec rollbackSpec = (AlterTableExecuteSpec.RollbackSpec) executeSpec.getOperationParams(); IcebergTableUtil.rollback(icebergTable, rollbackSpec.getRollbackType(), rollbackSpec.getParam()); @@ -1377,8 +1390,7 @@ public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) th .append(encodeString("/metadata/dummy.metadata.json")); } else { Table table = IcebergTableUtil.getTable(conf, hmsTable); - authURI.append(getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(), - hmsTable.getSd().getLocation())); + authURI.append(getPathForAuth(TableUtil.metadataFileLocation(table), hmsTable.getSd().getLocation())); } } LOG.debug("Iceberg storage handler authorization URI {}", authURI); @@ -1736,7 +1748,7 @@ private String collectColumnAndReplaceDummyValues(ExprNodeDesc node, String foun private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) { Schema tableSchema = SchemaParser.fromJson(tableProps.getProperty(InputFormatConfig.TABLE_SCHEMA)); - if (FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT)) || + if (FileFormat.AVRO == IcebergTableUtil.defaultFileFormat(tableProps::getProperty) || isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) || hasOrcTimeInSchema(tableProps, tableSchema) || !hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) { @@ -1755,10 +1767,11 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) * @return true if having time type column */ private static boolean hasOrcTimeInSchema(Properties tableProps, Schema tableSchema) { - if (!FileFormat.ORC.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))) { + if (FileFormat.ORC != IcebergTableUtil.defaultFileFormat(tableProps::getProperty)) { return false; } - return tableSchema.columns().stream().anyMatch(f -> Types.TimeType.get().typeId() == f.type().typeId()); + return tableSchema.columns().stream() + .anyMatch(f -> Types.TimeType.get().typeId() == f.type().typeId()); } /** @@ -1770,7 +1783,7 @@ private static boolean 
hasOrcTimeInSchema(Properties tableProps, Schema tableSch * @return true if having nested types */ private static boolean hasParquetNestedTypeWithinListOrMap(Properties tableProps, Schema tableSchema) { - if (!FileFormat.PARQUET.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))) { + if (FileFormat.PARQUET != IcebergTableUtil.defaultFileFormat(tableProps::getProperty)) { return true; } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index e0f70b81a531..6e0bb0a84df5 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -67,6 +67,7 @@ import org.apache.hadoop.util.Sets; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManageSnapshots; import org.apache.iceberg.ManifestFile; @@ -440,6 +441,17 @@ private static Integer formatVersion(BinaryOperator props) { } } + public static FileFormat defaultFileFormat(Table table) { + return defaultFileFormat(table.properties()::getOrDefault); + } + + public static FileFormat defaultFileFormat(BinaryOperator props) { + return FileFormat.fromString( + props.apply( + TableProperties.DEFAULT_FILE_FORMAT, + TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)); + } + private static String getWriteModeDefault(BinaryOperator props) { return (isV2TableOrAbove(props) ? MERGE_ON_READ : COPY_ON_WRITE).modeName(); } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java deleted file mode 100644 index 753af7505ecc..000000000000 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java +++ /dev/null @@ -1,605 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.data; - -import java.io.File; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Objects; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Files; -import org.apache.iceberg.PartitionData; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PartitionStatisticsFile; -import org.apache.iceberg.PartitionStats; -import org.apache.iceberg.Partitioning; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.TestHelpers; -import org.apache.iceberg.TestTables; -import org.apache.iceberg.deletes.PositionDelete; -import org.apache.iceberg.expressions.Literal; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Comparators; -import org.apache.iceberg.types.Types; -import org.assertj.core.api.Assumptions; -import org.assertj.core.groups.Tuple; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import static org.apache.iceberg.data.PartitionStatsHandler.DATA_FILE_COUNT; -import static org.apache.iceberg.data.PartitionStatsHandler.DATA_RECORD_COUNT; -import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT; -import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT; -import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_AT; -import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID; -import static org.apache.iceberg.data.PartitionStatsHandler.PARTITION_FIELD_ID; -import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT; -import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT; -import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES; -import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_RECORD_COUNT; -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -public class TestPartitionStatsHandler { - private static final Schema SCHEMA = - new Schema( - optional(1, "c1", Types.IntegerType.get()), - optional(2, "c2", Types.StringType.get()), - optional(3, "c3", Types.StringType.get())); - - protected static final PartitionSpec SPEC = - PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build(); - - @TempDir - public Path temp; - - private static final Random RANDOM = ThreadLocalRandom.current(); - -// @Parameters(name = "fileFormat = {0}") -// public static List parameters() { -// return Arrays.asList(FileFormat.PARQUET, 
FileFormat.ORC, FileFormat.AVRO); -// } - - public FileFormat format = FileFormat.AVRO; - - @Test - public void testPartitionStatsOnEmptyTable() throws Exception { - Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", SCHEMA, SPEC, 2); - assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable)).isNull(); - } - - @Test - public void testPartitionStatsOnEmptyBranch() throws Exception { - Table testTable = TestTables.create(tempDir("empty_branch"), "empty_branch", SCHEMA, SPEC, 2); - testTable.manageSnapshots().createBranch("b1").commit(); - long branchSnapshot = testTable.refs().get("b1").snapshotId(); - assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable, branchSnapshot)).isNull(); - } - - @Test - public void testPartitionStatsOnInvalidSnapshot() throws Exception { - Table testTable = - TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot", SCHEMA, SPEC, 2); - assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable, 42L)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Snapshot not found: 42"); - } - - @Test - public void testPartitionStatsOnUnPartitionedTable() throws Exception { - Table testTable = - TestTables.create( - tempDir("unpartitioned_table"), - "unpartitioned_table", - SCHEMA, - PartitionSpec.unpartitioned(), - 2); - - List records = prepareRecords(testTable.schema()); - DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records); - testTable.newAppend().appendFile(dataFile).commit(); - - assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("table must be partitioned"); - } - - @Test - public void testAllDatatypePartitionWriting() throws Exception { - Schema schema = - new Schema( - required(100, "id", Types.LongType.get()), - optional(101, "data", Types.StringType.get()), - required(102, "b", Types.BooleanType.get()), - optional(103, "i", Types.IntegerType.get()), - required(104, "l", Types.LongType.get()), - optional(105, "f", Types.FloatType.get()), - required(106, "d", Types.DoubleType.get()), - optional(107, "date", Types.DateType.get()), - required(108, "ts", Types.TimestampType.withoutZone()), - required(110, "s", Types.StringType.get()), - required(111, "uuid", Types.UUIDType.get()), - required(112, "fixed", Types.FixedType.ofLength(7)), - optional(113, "bytes", Types.BinaryType.get()), - required(114, "dec_9_0", Types.DecimalType.of(9, 0)), - required(115, "dec_11_2", Types.DecimalType.of(11, 2)), - required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision - required(117, "time", Types.TimeType.get())); - - PartitionSpec spec = - PartitionSpec.builderFor(schema) - .identity("b") - .identity("i") - .identity("l") - .identity("f") - .identity("d") - .identity("date") - .identity("ts") - .identity("s") - .identity("uuid") - .identity("fixed") - .identity("bytes") - .identity("dec_9_0") - .identity("dec_11_2") - .identity("dec_38_10") - .identity("time") - .build(); - - Table testTable = - TestTables.create( - tempDir("test_all_type"), "test_all_type", schema, spec, SortOrder.unsorted(), 2); - - Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schema(partitionSchema); - - PartitionData partitionData = - new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); - partitionData.set(0, true); - partitionData.set(1, 42); - partitionData.set(2, 42L); - 
partitionData.set(3, 3.14f); - partitionData.set(4, 3.141592653589793); - partitionData.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value()); - partitionData.set( - 6, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value()); - partitionData.set(7, "string"); - partitionData.set(8, UUID.randomUUID()); - partitionData.set(9, ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6})); - partitionData.set(10, ByteBuffer.wrap(new byte[] {1, 2, 3})); - partitionData.set(11, new BigDecimal("123456789")); - partitionData.set(12, new BigDecimal("1234567.89")); - partitionData.set(13, new BigDecimal("12345678901234567890.1234567890")); - partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value()); - - PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - partitionStats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong()); - partitionStats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt()); - partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20)); - List expected = Collections.singletonList(partitionStats); - PartitionStatisticsFile statisticsFile = - PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); - - List written; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, Files.localInput(statisticsFile.path()))) { - written = Lists.newArrayList(recordIterator); - } - - assertThat(written).hasSize(expected.size()); - Comparator comparator = Comparators.forType(partitionSchema); - for (int i = 0; i < written.size(); i++) { - assertThat(isEqual(comparator, written.get(i), expected.get(i))).isTrue(); - } - } - - @Test - public void testOptionalFieldsWriting() throws Exception { - PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); - Table testTable = - TestTables.create( - tempDir("test_partition_stats_optional"), - "test_partition_stats_optional", - SCHEMA, - spec, - SortOrder.unsorted(), - 2); - - Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schema(partitionSchema); - - ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); - for (int i = 0; i < 5; i++) { - PartitionData partitionData = - new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); - partitionData.set(0, RANDOM.nextInt()); - - PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - stats.set(PARTITION_FIELD_ID, partitionData); - stats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong()); - stats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt()); - stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20)); - stats.set(POSITION_DELETE_RECORD_COUNT.fieldId(), null); - stats.set(POSITION_DELETE_FILE_COUNT.fieldId(), null); - stats.set(EQUALITY_DELETE_RECORD_COUNT.fieldId(), null); - stats.set(EQUALITY_DELETE_FILE_COUNT.fieldId(), null); - stats.set(TOTAL_RECORD_COUNT.fieldId(), null); - stats.set(LAST_UPDATED_AT.fieldId(), null); - stats.set(LAST_UPDATED_SNAPSHOT_ID.fieldId(), null); - - partitionListBuilder.add(stats); - } - - List expected = partitionListBuilder.build(); - - assertThat(expected.get(0)) - .extracting( - PartitionStats::positionDeleteRecordCount, - PartitionStats::positionDeleteFileCount, - PartitionStats::equalityDeleteRecordCount, - PartitionStats::equalityDeleteFileCount, - PartitionStats::totalRecords, - PartitionStats::lastUpdatedAt, - 
PartitionStats::lastUpdatedSnapshotId) - .isEqualTo( - Arrays.asList( - 0L, 0, 0L, 0, null, null, null)); // null counters must be initialized to zero. - - PartitionStatisticsFile statisticsFile = - PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); - - List written; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, Files.localInput(statisticsFile.path()))) { - written = Lists.newArrayList(recordIterator); - } - - assertThat(written).hasSize(expected.size()); - Comparator comparator = Comparators.forType(partitionSchema); - for (int i = 0; i < written.size(); i++) { - assertThat(isEqual(comparator, written.get(i), expected.get(i))).isTrue(); - } - } - - @SuppressWarnings("checkstyle:MethodLength") - @Test // Tests for all the table formats (PARQUET, ORC, AVRO) - public void testPartitionStats() throws Exception { - Assumptions.assumeThat(format) - .as("ORC internal readers and writers are not supported") - .isNotEqualTo(FileFormat.ORC); - - Table testTable = - TestTables.create( - tempDir("partition_stats_" + format.name()), - "partition_stats_compute_" + format.name(), - SCHEMA, - SPEC, - 2, - ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); - - List records = prepareRecords(testTable.schema()); - DataFile dataFile1 = - FileHelpers.writeDataFile( - testTable, outputFile(), TestHelpers.Row.of("foo", "A"), records.subList(0, 3)); - DataFile dataFile2 = - FileHelpers.writeDataFile( - testTable, outputFile(), TestHelpers.Row.of("foo", "B"), records.subList(3, 4)); - DataFile dataFile3 = - FileHelpers.writeDataFile( - testTable, outputFile(), TestHelpers.Row.of("bar", "A"), records.subList(4, 5)); - DataFile dataFile4 = - FileHelpers.writeDataFile( - testTable, outputFile(), TestHelpers.Row.of("bar", "B"), records.subList(5, 7)); - - for (int i = 0; i < 3; i++) { - // insert same set of seven records thrice to have a new manifest files - testTable - .newAppend() - .appendFile(dataFile1) - .appendFile(dataFile2) - .appendFile(dataFile3) - .appendFile(dataFile4) - .commit(); - } - - Snapshot snapshot1 = testTable.currentSnapshot(); - Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); - Types.StructType partitionType = - recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); - computeAndValidatePartitionStats( - testTable, - recordSchema, - Tuple.tuple( - partitionRecord(partitionType, "foo", "A"), - 0, - 9L, - 3, - 3 * dataFile1.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "foo", "B"), - 0, - 3L, - 3, - 3 * dataFile2.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "A"), - 0, - 3L, - 3, - 3 * dataFile3.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "B"), - 0, - 6L, - 3, - 3 * dataFile4.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId())); - - DeleteFile posDeletes = commitPositionDeletes(testTable, dataFile1); - Snapshot snapshot2 = testTable.currentSnapshot(); - - DeleteFile eqDeletes = commitEqualityDeletes(testTable); - Snapshot snapshot3 = testTable.currentSnapshot(); - - recordSchema = 
PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); - partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); - computeAndValidatePartitionStats( - testTable, - recordSchema, - Tuple.tuple( - partitionRecord(partitionType, "foo", "A"), - 0, - 9L, - 3, - 3 * dataFile1.fileSizeInBytes(), - 0L, - 0, - eqDeletes.recordCount(), - 1, - null, - snapshot3.timestampMillis(), - snapshot3.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "foo", "B"), - 0, - 3L, - 3, - 3 * dataFile2.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "A"), - 0, - 3L, - 3, - 3 * dataFile3.fileSizeInBytes(), - posDeletes.recordCount(), - 1, - 0L, - 0, - null, - snapshot2.timestampMillis(), - snapshot2.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "B"), - 0, - 6L, - 3, - 3 * dataFile4.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId())); - } - - private OutputFile outputFile() throws IOException { - return Files.localOutput(File.createTempFile("data", null, tempDir("stats"))); - } - - private static StructLike partitionRecord( - Types.StructType partitionType, String val1, String val2) { - GenericRecord record = GenericRecord.create(partitionType); - record.set(0, val1); - record.set(1, val2); - return record; - } - - private static List prepareRecords(Schema schema) { - GenericRecord record = GenericRecord.create(schema); - List records = Lists.newArrayList(); - // foo 4 records, bar 3 records - // foo, A -> 3 records - records.add(record.copy("c1", 0, "c2", "foo", "c3", "A")); - records.add(record.copy("c1", 1, "c2", "foo", "c3", "A")); - records.add(record.copy("c1", 2, "c2", "foo", "c3", "A")); - // foo, B -> 1 record - records.add(record.copy("c1", 3, "c2", "foo", "c3", "B")); - // bar, A -> 1 record - records.add(record.copy("c1", 4, "c2", "bar", "c3", "A")); - // bar, B -> 2 records - records.add(record.copy("c1", 5, "c2", "bar", "c3", "B")); - records.add(record.copy("c1", 6, "c2", "bar", "c3", "B")); - return records; - } - - private static void computeAndValidatePartitionStats( - Table testTable, Schema recordSchema, Tuple... 
expectedValues) throws IOException { - // compute and commit partition stats file - Snapshot currentSnapshot = testTable.currentSnapshot(); - PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable); - testTable.updatePartitionStatistics().setPartitionStatistics(result).commit(); - assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); - - // read the partition entries from the stats file - List partitionStats; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - recordSchema, Files.localInput(result.path()))) { - partitionStats = Lists.newArrayList(recordIterator); - } - - assertThat(partitionStats) - .extracting( - PartitionStats::partition, - PartitionStats::specId, - PartitionStats::dataRecordCount, - PartitionStats::dataFileCount, - PartitionStats::totalDataFileSizeInBytes, - PartitionStats::positionDeleteRecordCount, - PartitionStats::positionDeleteFileCount, - PartitionStats::equalityDeleteRecordCount, - PartitionStats::equalityDeleteFileCount, - PartitionStats::totalRecords, - PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId) - .containsExactlyInAnyOrder(expectedValues); - } - - private DeleteFile commitEqualityDeletes(Table testTable) throws IOException { - Schema deleteRowSchema = testTable.schema().select("c1"); - Record dataDelete = GenericRecord.create(deleteRowSchema); - List dataDeletes = - Lists.newArrayList(dataDelete.copy("c1", 1), dataDelete.copy("c1", 2)); - - DeleteFile eqDeletes = - FileHelpers.writeDeleteFile( - testTable, - Files.localOutput(File.createTempFile("junit", null, tempDir("eq_delete"))), - TestHelpers.Row.of("foo", "A"), - dataDeletes, - deleteRowSchema); - testTable.newRowDelta().addDeletes(eqDeletes).commit(); - return eqDeletes; - } - - private DeleteFile commitPositionDeletes(Table testTable, DataFile dataFile1) throws IOException { - List> deletes = Lists.newArrayList(); - for (long i = 0; i < 2; i++) { - deletes.add( - positionDelete(testTable.schema(), dataFile1.location(), i, (int) i, String.valueOf(i))); - } - - DeleteFile posDeletes = - FileHelpers.writePosDeleteFile( - testTable, - Files.localOutput(File.createTempFile("junit", null, tempDir("pos_delete"))), - TestHelpers.Row.of("bar", "A"), - deletes); - testTable.newRowDelta().addDeletes(posDeletes).commit(); - return posDeletes; - } - - private static PositionDelete positionDelete( - Schema tableSchema, CharSequence path, Long position, Object... 
values) { - PositionDelete posDelete = PositionDelete.create(); - GenericRecord nested = GenericRecord.create(tableSchema); - for (int i = 0; i < values.length; i++) { - nested.set(i, values[i]); - } - - posDelete.set(path, position, nested); - return posDelete; - } - - private File tempDir(String folderName) throws IOException { - return java.nio.file.Files.createTempDirectory(temp.toAbsolutePath(), folderName).toFile(); - } - - @SuppressWarnings("checkstyle:CyclomaticComplexity") - private static boolean isEqual( - Comparator partitionComparator, PartitionStats stats1, PartitionStats stats2) { - if (stats1 == stats2) { - return true; - } else if (stats1 == null || stats2 == null) { - return false; - } - - return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0 && - stats1.specId() == stats2.specId() && - stats1.dataRecordCount() == stats2.dataRecordCount() && - stats1.dataFileCount() == stats2.dataFileCount() && - stats1.totalDataFileSizeInBytes() == stats2.totalDataFileSizeInBytes() && - stats1.positionDeleteRecordCount() == stats2.positionDeleteRecordCount() && - stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount() && - stats1.equalityDeleteRecordCount() == stats2.equalityDeleteRecordCount() && - stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount() && - Objects.equals(stats1.totalRecords(), stats2.totalRecords()) && - Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt()) && - Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId()); - } -} diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java index 4f6fb9699f90..6ba9a04d1dd2 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java @@ -22,9 +22,9 @@ import java.io.IOException; import org.apache.iceberg.AssertHelpers; -import org.apache.iceberg.BaseTable; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; +import org.apache.iceberg.TableUtil; import org.apache.iceberg.catalog.TableIdentifier; import org.junit.Assert; import org.junit.Assume; @@ -99,7 +99,7 @@ public void testRevertRollback() throws IOException, InterruptedException { Table table = testTables.createTableWithVersions(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2); - String metadataLocationBeforeRollback = ((BaseTable) table).operations().current().metadataFileLocation(); + String metadataLocationBeforeRollback = TableUtil.metadataFileLocation(table); shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE ROLLBACK(" + table.history().get(0).snapshotId() + ")"); Assert.assertEquals(3, shell.executeStatement("SELECT * FROM " + identifier.name()).size()); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java index 09229d3b91ba..2bcf4bedb0ed 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java @@ -59,6 +59,7 @@ import 
org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableUtil; import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; @@ -1262,7 +1263,7 @@ public void testCreateTableWithFormatV2ThroughTableProperty() { org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier); Assert.assertEquals("should create table using format v2", - 2, ((BaseTable) icebergTable).operations().current().formatVersion()); + 2, TableUtil.formatVersion(icebergTable)); } @Test @@ -1522,7 +1523,7 @@ public void testAuthzURI(boolean masked) throws TException, InterruptedException URI uriForAuth = storageHandler.getURIForAuth(hmsTable); String metadataLocation = - storageHandler.getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(), + storageHandler.getPathForAuth(TableUtil.metadataFileLocation(table), hmsTable.getSd().getLocation()); if (masked) { @@ -1562,7 +1563,7 @@ public void testAuthzURIWithAuthEnabledWithMetadataLocation(boolean masked) thro PartitionSpec.unpartitioned(), FileFormat.PARQUET, ImmutableList.of()); String metadataFileLocation = - URI.create(((BaseTable) sourceTable).operations().current().metadataFileLocation()).getPath(); + URI.create(TableUtil.metadataFileLocation(sourceTable)).getPath(); TableIdentifier target = TableIdentifier.of("default", "target"); Table targetTable = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, @@ -1617,7 +1618,7 @@ public void testAuthzURIWithAuthEnabledAndMockCommandAuthorizer(boolean masked) HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler(); storageHandler.setConf(shell.getHiveConf()); String metadataLocation = HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.decode( - storageHandler.getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(), + storageHandler.getPathForAuth(TableUtil.metadataFileLocation(table), hmsTable.getSd().getLocation())); if (masked) { @@ -1656,7 +1657,7 @@ public void testAuthzURIWithAuthEnabled(boolean masked) throws TException, Inter storageHandler.setConf(shell.getHiveConf()); URI uriForAuth = storageHandler.getURIForAuth(hmsTable); String metadataLocation = - storageHandler.getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(), + storageHandler.getPathForAuth(TableUtil.metadataFileLocation(table), hmsTable.getSd().getLocation()); if (masked) { @@ -1678,7 +1679,7 @@ public void testCreateTableWithMetadataLocation() throws IOException { ImmutableMap.builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build()); testTables.appendIcebergTable(shell.getHiveConf(), sourceTable, FileFormat.PARQUET, null, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); - String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation(); + String metadataLocation = TableUtil.metadataFileLocation(sourceTable); shell.executeStatement("DROP TABLE " + sourceIdentifier.name()); TableIdentifier targetIdentifier = TableIdentifier.of("default", "target"); Table targetTable = @@ -1686,7 +1687,7 @@ public void testCreateTableWithMetadataLocation() throws IOException { PartitionSpec.unpartitioned(), FileFormat.PARQUET, Collections.emptyList(), 1, ImmutableMap.builder().put("metadata_location", metadataLocation).build() ); - Assert.assertEquals(metadataLocation, ((BaseTable) 
targetTable).operations().current().metadataFileLocation()); + Assert.assertEquals(metadataLocation, TableUtil.metadataFileLocation(targetTable)); List rows = shell.executeStatement("SELECT * FROM " + targetIdentifier.name()); List records = HiveIcebergTestUtils.valueForRow(targetTable.schema(), rows); HiveIcebergTestUtils.validateData(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, records, 0); @@ -1711,11 +1712,11 @@ public void testAlterTableWithMetadataLocation() throws IOException { PartitionSpec.unpartitioned(), FileFormat.PARQUET, Collections.emptyList(), 1, Collections.emptyMap()); testTables.appendIcebergTable(shell.getHiveConf(), table, FileFormat.PARQUET, null, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); - String firstMetadataLocation = ((BaseTable) table).operations().current().metadataFileLocation(); + String firstMetadataLocation = TableUtil.metadataFileLocation(table); testTables.appendIcebergTable(shell.getHiveConf(), table, FileFormat.PARQUET, null, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); table.refresh(); - String secondMetadataLocation = ((BaseTable) table).operations().current().metadataFileLocation(); + String secondMetadataLocation = TableUtil.metadataFileLocation(table); Assert.assertNotEquals(firstMetadataLocation, secondMetadataLocation); shell.executeStatement("ALTER TABLE " + tableIdentifier.name() + " SET TBLPROPERTIES('metadata_location'='" + firstMetadataLocation + "')"); @@ -1742,7 +1743,7 @@ public void testAlterTableWithMetadataLocationFromAnotherTable() throws IOExcept ImmutableMap.builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build()); testTables.appendIcebergTable(shell.getHiveConf(), sourceTable, FileFormat.PARQUET, null, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); - String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation(); + String metadataLocation = TableUtil.metadataFileLocation(sourceTable); shell.executeStatement("DROP TABLE " + sourceIdentifier.name()); TableIdentifier targetIdentifier = TableIdentifier.of("default", "target"); testTables.createTable(shell, targetIdentifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, @@ -1911,7 +1912,7 @@ public void testConcurrentIcebergCommitsAndHiveAlterTableCalls() throws Exceptio executorService.awaitTermination(1, TimeUnit.MINUTES); // Verify that the insert was effective - Assert.assertEquals(((BaseTable) testTables.loadTable(identifier)).operations().current().metadataFileLocation(), + Assert.assertEquals(TableUtil.metadataFileLocation(testTables.loadTable(identifier)), (long) HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size(), shell.executeStatement("select count(*) from customers").get(0)[0] ); @@ -1936,7 +1937,7 @@ public void testCreateTableWithMetadataLocationWithoutSchema() throws IOExceptio testTables.createTable(shell, sourceIdentifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, FileFormat.PARQUET, records, 1, ImmutableMap.builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build()); - String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation(); + String metadataLocation = TableUtil.metadataFileLocation(sourceTable); shell.executeStatement("DROP TABLE " + sourceIdentifier.name()); TableIdentifier targetIdentifier = TableIdentifier.of("default", "target"); diff --git a/itests/hive-iceberg/pom.xml b/itests/hive-iceberg/pom.xml index e9812a6b2709..70bdb9bc95a1 100644 --- a/itests/hive-iceberg/pom.xml +++ 
b/itests/hive-iceberg/pom.xml
@@ -27,7 +27,7 @@
     ../..
     UTF-8
     false
-    1.9.1
+    1.10.0
diff --git a/pom.xml b/pom.xml
index 9dee9a882ee9..0d9fb17e16fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,7 +202,7 @@
     3.11.4
     ${env.PROTOC_PATH}
     0.16.1
-    1.2.1
+    1.3.0
     1.0.1
     1.7.30
     4.0.4
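
Reviewer note (not part of the patch): the sketch below illustrates the two patterns this change standardizes on — the public TableUtil accessors from Iceberg 1.10 replacing the ((BaseTable)) / ((HasTableOperations)) casts, and the uncommitted-transaction fallback used in computeBasicStatistics() to work around the missing ORC support in the internal partition-stats writers. The class and method names here are illustrative only; the Iceberg calls are exactly the ones adopted in the hunks above.

// Minimal usage sketch, assuming an already-loaded Iceberg Table handle.
// Class and helper names are hypothetical; the Iceberg APIs are from this patch.
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.Transaction;

public class TableUtilMigrationSketch {

  // Replaces ((BaseTable) table).operations().current().metadataFileLocation()
  static String metadataLocation(Table table) {
    return TableUtil.metadataFileLocation(table);
  }

  // Replaces ((BaseTable) table).operations().current().formatVersion()
  static int formatVersion(Table table) {
    return TableUtil.formatVersion(table);
  }

  // The lookup that IcebergTableUtil.defaultFileFormat(...) wraps: falls back to the
  // Iceberg default (parquet) when write.format.default is unset.
  static FileFormat defaultFileFormat(Table table) {
    return FileFormat.fromString(
        table.properties().getOrDefault(
            TableProperties.DEFAULT_FILE_FORMAT,
            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
  }

  // ORC fallback applied before computing partition stats: the property change lives only
  // in an uncommitted transaction view, so it is never published to the table metadata.
  static Table statsView(Table table) {
    if (FileFormat.ORC != defaultFileFormat(table)) {
      return table;
    }
    Transaction tx = table.newTransaction();
    tx.updateProperties().remove(TableProperties.DEFAULT_FILE_FORMAT).commit();
    return tx.table(); // commitTransaction() is intentionally never called
  }
}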