diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst
index 87c9b1fb7d0bf..dd570295342af 100644
--- a/presto-docs/src/main/sphinx/connector/iceberg.rst
+++ b/presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -256,15 +256,52 @@ Property Name                                           Description
 ======================================================= ============================================================= ============
 ``iceberg.catalog.warehouse``                           The catalog warehouse root path for Iceberg tables.
+                                                        The Hadoop catalog requires a file system that supports an
+                                                        atomic rename operation, such as HDFS, to maintain its
+                                                        metadata files and so provide atomic transaction commits.
+
+                                                        Example: ``hdfs://nn:8020/warehouse/path``
+
+                                                        Do not set ``iceberg.catalog.warehouse`` to a path on an
+                                                        object store or local file system in production
+                                                        environments.
+
                                                         This property is required if the ``iceberg.catalog.type`` is
                                                         ``hadoop``. Otherwise, it will be ignored.
+
+``iceberg.catalog.hadoop.warehouse.datadir``            The catalog warehouse root data path for Iceberg tables.
+                                                        It is only supported by the Hadoop catalog.
+
+                                                        Example: ``s3://iceberg_bucket/warehouse``
+
+                                                        This optional property can be set to a path on an object
+                                                        store or in HDFS.
+                                                        If set, all tables in this Hadoop catalog default to saving
+                                                        their data and delete files under the specified root
+                                                        data directory.
+
 ``iceberg.catalog.cached-catalog-num``                  The number of Iceberg catalogs to cache. This property is    ``10``
                                                         required if the ``iceberg.catalog.type`` is ``hadoop``.
                                                         Otherwise, it will be ignored.
 ======================================================= ============================================================= ============
 
+Configure the Amazon S3 properties of the Hive connector to use an S3 location as the warehouse
+data directory for the Hadoop catalog. This way, the data and delete files of Iceberg tables are
+stored in S3. An example configuration:
+
+.. code-block:: none
+
+    connector.name=iceberg
+    iceberg.catalog.type=hadoop
+    iceberg.catalog.warehouse=hdfs://nn:8020/warehouse/path
+    iceberg.catalog.hadoop.warehouse.datadir=s3://iceberg_bucket/warehouse
+
+    hive.s3.use-instance-credentials=false
+    hive.s3.aws-access-key=accesskey
+    hive.s3.aws-secret-key=secretkey
+    hive.s3.endpoint=http://192.168.0.103:9878
+    hive.s3.path-style-access=true
+
 Configuration Properties
 ------------------------
 
@@ -370,6 +407,12 @@ Property Name                           Description
 ``location``                            Optionally specifies the file system location URI for
                                         the table.
 
+``write.data.path``                     Optionally specifies the file system location URI for
+                                        storing the data and delete files of the table. This only
+                                        applies to files written after the property is set; files
+                                        written earlier are not relocated to reflect it.
+
 ``format_version``                      Optionally specifies the format version of the Iceberg       ``2``
                                         specification to use for new tables, either ``1`` or ``2``.
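Editor's note: in addition to the catalog-level default, ``write.data.path`` can be set per table and takes precedence over ``iceberg.catalog.hadoop.warehouse.datadir``. A minimal example, modeled on the tests added in this PR (table and bucket names are illustrative):

.. code-block:: sql

    CREATE TABLE iceberg.tpch.test_table (a integer, b varchar)
    WITH ("write.data.path" = 's3://iceberg_bucket/warehouse/test_table');

``SHOW CREATE TABLE`` then reports the effective ``write.data.path`` alongside ``location``, and any data and delete files written afterwards are placed under that path.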
diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index bbf7da4e68cf5..deaf21702d31c 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -263,6 +263,16 @@ + + com.amazonaws + aws-java-sdk-core + + + + com.amazonaws + aws-java-sdk-s3 + + org.apache.iceberg iceberg-core @@ -598,6 +608,18 @@ test + + org.testcontainers + testcontainers + test + + + org.slf4j + slf4j-api + + + + org.apache.iceberg iceberg-core diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java index d46b44d9a271d..9198272f5775e 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java @@ -123,6 +123,7 @@ import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates; import static com.facebook.presto.hive.MetadataUtils.getPredicate; import static com.facebook.presto.hive.MetadataUtils.getSubfieldPredicate; +import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT; import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression; import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE; import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA; @@ -146,6 +147,9 @@ import static com.facebook.presto.iceberg.IcebergTableProperties.METRICS_MAX_INFERRED_COLUMN; import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static com.facebook.presto.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY; +import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries; +import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion; +import static com.facebook.presto.iceberg.IcebergTableProperties.getWriteDataLocation; import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; import static com.facebook.presto.iceberg.IcebergTableType.DATA; import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES; @@ -153,14 +157,20 @@ import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode; import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat; +import static com.facebook.presto.iceberg.IcebergUtil.getMetadataPreviousVersionsMax; +import static com.facebook.presto.iceberg.IcebergUtil.getMetricsMaxInferredColumn; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionFields; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData; import static com.facebook.presto.iceberg.IcebergUtil.getPartitions; import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator; import static com.facebook.presto.iceberg.IcebergUtil.getSortFields; +import static com.facebook.presto.iceberg.IcebergUtil.getSplitSize; import static com.facebook.presto.iceberg.IcebergUtil.getTableComment; +import static com.facebook.presto.iceberg.IcebergUtil.getUpdateMode; import static com.facebook.presto.iceberg.IcebergUtil.getViewComment; +import static com.facebook.presto.iceberg.IcebergUtil.isMetadataDeleteAfterCommit; +import static com.facebook.presto.iceberg.IcebergUtil.parseFormatVersion; import static 
com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName; import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns; import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation; @@ -188,6 +198,7 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardWarningCode.SORT_COLUMN_TRANSFORM_NOT_SUPPORTED_WARNING; import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT; +import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -200,10 +211,17 @@ import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP; import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT; +import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED; +import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS; +import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.SPLIT_SIZE; import static org.apache.iceberg.TableProperties.UPDATE_MODE; +import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; public abstract class IcebergAbstractMetadata implements ConnectorMetadata @@ -716,12 +734,17 @@ protected ImmutableMap createMetadataProperties(Table icebergTab properties.put(LOCATION_PROPERTY, icebergTable.location()); } - properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable)); - properties.put(UPDATE_MODE, IcebergUtil.getUpdateMode(icebergTable)); - properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable)); - properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable)); - properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable)); - properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable)); + String writeDataLocation = icebergTable.properties().get(WRITE_DATA_LOCATION); + if (!isNullOrEmpty(writeDataLocation)) { + properties.put(WRITE_DATA_LOCATION, writeDataLocation); + } + + properties.put(DELETE_MODE, getDeleteMode(icebergTable)); + properties.put(UPDATE_MODE, getUpdateMode(icebergTable)); + properties.put(METADATA_PREVIOUS_VERSIONS_MAX, getMetadataPreviousVersionsMax(icebergTable)); + properties.put(METADATA_DELETE_AFTER_COMMIT, isMetadataDeleteAfterCommit(icebergTable)); + properties.put(METRICS_MAX_INFERRED_COLUMN, getMetricsMaxInferredColumn(icebergTable)); + properties.put(SPLIT_SIZE, getSplitSize(icebergTable)); SortOrder sortOrder = icebergTable.sortOrder(); // TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250) @@ -1140,6 +1163,62 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta transaction.commitTransaction(); } + protected Map populateTableProperties(ConnectorTableMetadata tableMetadata, 
+            com.facebook.presto.iceberg.FileFormat fileFormat, ConnectorSession session)
+    {
+        ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(16);
+
+        String writeDataLocation = getWriteDataLocation(tableMetadata.getProperties());
+        if (!isNullOrEmpty(writeDataLocation)) {
+            propertiesBuilder.put(WRITE_DATA_LOCATION, writeDataLocation);
+        }
+        else {
+            Optional<String> dataLocation = getDataLocationBasedOnWarehouseDataDir(tableMetadata.getTable());
+            dataLocation.ifPresent(location -> propertiesBuilder.put(WRITE_DATA_LOCATION, location));
+        }
+
+        Integer commitRetries = getCommitRetries(tableMetadata.getProperties());
+        propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
+        propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
+        switch (fileFormat) {
+            case PARQUET:
+                propertiesBuilder.put(PARQUET_COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().get().toString());
+                break;
+            case ORC:
+                propertiesBuilder.put(ORC_COMPRESSION, getCompressionCodec(session).getOrcCompressionKind().name());
+                break;
+        }
+        if (tableMetadata.getComment().isPresent()) {
+            propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
+        }
+
+        String formatVersion = getFormatVersion(tableMetadata.getProperties());
+        verify(formatVersion != null, "Format version cannot be null");
+        propertiesBuilder.put(TableProperties.FORMAT_VERSION, formatVersion);
+
+        if (parseFormatVersion(formatVersion) < MIN_FORMAT_VERSION_FOR_DELETE) {
+            propertiesBuilder.put(TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
+            propertiesBuilder.put(TableProperties.UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
+        }
+        else {
+            RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(tableMetadata.getProperties());
+            propertiesBuilder.put(TableProperties.DELETE_MODE, deleteMode.modeName());
+            RowLevelOperationMode updateMode = IcebergTableProperties.getUpdateMode(tableMetadata.getProperties());
+            propertiesBuilder.put(TableProperties.UPDATE_MODE, updateMode.modeName());
+        }
+
+        Integer metadataPreviousVersionsMax = IcebergTableProperties.getMetadataPreviousVersionsMax(tableMetadata.getProperties());
+        propertiesBuilder.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(metadataPreviousVersionsMax));
+
+        Boolean metadataDeleteAfterCommit = IcebergTableProperties.isMetadataDeleteAfterCommit(tableMetadata.getProperties());
+        propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(metadataDeleteAfterCommit));
+
+        Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
+        propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));
+
+        propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties())));
+        return propertiesBuilder.build();
+    }
+
     /**
      * Deletes all the files for a specific predicate
      *
@@ -1265,4 +1344,9 @@ public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHan
                 handle.getSortOrder());
         finishWrite(session, outputTableHandle, fragments, UPDATE_AFTER);
     }
+
+    protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
+    {
+        return Optional.empty();
+    }
 }
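Editor's note: the new ``populateTableProperties`` resolves the write data location with a simple precedence — an explicit ``write.data.path`` table property wins, otherwise the catalog-level default from ``getDataLocationBasedOnWarehouseDataDir`` (stubbed out as ``Optional.empty()`` here and overridden by the Hadoop-backed native metadata below) is used, if present. A standalone sketch of that resolution; the class and method names are hypothetical, not part of this patch:

.. code-block:: java

    import java.util.Map;
    import java.util.Optional;

    final class WriteDataPathResolution
    {
        private WriteDataPathResolution() {}

        // Table-level "write.data.path" first; otherwise the catalog-level
        // Hadoop warehouse data directory default; otherwise empty, in which
        // case Iceberg falls back to "<table location>/data".
        static Optional<String> resolve(Map<String, Object> tableProperties, Optional<String> catalogDefault)
        {
            String explicit = (String) tableProperties.get("write.data.path");
            if (explicit != null && !explicit.isEmpty()) {
                return Optional.of(explicit);
            }
            return catalogDefault;
        }
    }

The catalog default can only be non-empty under the Hadoop catalog: ``IcebergCommonModule`` (next hunk) rejects ``iceberg.catalog.hadoop.warehouse.datadir`` at startup for every other catalog type.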
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
index 6ac0fc4622b24..9f65aebba0e3a 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
@@ -99,6 +99,8 @@
 import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
 import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
 import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
+import static com.facebook.presto.common.Utils.checkArgument;
+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
 import static com.facebook.presto.orc.StripeMetadataSource.CacheableRowGroupIndices;
 import static com.facebook.presto.orc.StripeMetadataSource.CacheableSlice;
 import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
@@ -142,6 +144,9 @@ protected void setup(Binder binder)
         configBinder(binder).bindConfig(IcebergConfig.class);
 
+        IcebergConfig icebergConfig = buildConfigObject(IcebergConfig.class);
+        checkArgument(icebergConfig.getCatalogType().equals(HADOOP) || icebergConfig.getCatalogWarehouseDataDir() == null, "'iceberg.catalog.hadoop.warehouse.datadir' can only be specified in Hadoop catalog");
+
         binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON);
         newOptionalBinder(binder, IcebergNessieConfig.class); // bind optional Nessie config to IcebergSessionProperties
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
index 42328ddb4e7f4..d1bc37ff0e0be 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
@@ -50,6 +50,7 @@ public class IcebergConfig
     private HiveCompressionCodec compressionCodec = GZIP;
     private CatalogType catalogType = HIVE;
     private String catalogWarehouse;
+    private String catalogWarehouseDataDir;
     private int catalogCacheSize = 10;
     private int maxPartitionsPerWriter = 100;
     private List<String> hadoopConfigResources = ImmutableList.of();
@@ -127,6 +128,19 @@ public IcebergConfig setCatalogWarehouse(String catalogWarehouse)
         return this;
     }
 
+    public String getCatalogWarehouseDataDir()
+    {
+        return catalogWarehouseDataDir;
+    }
+
+    @Config("iceberg.catalog.hadoop.warehouse.datadir")
+    @ConfigDescription("Iceberg catalog default root data writing directory.
This is only supported with Hadoop catalog.") + public IcebergConfig setCatalogWarehouseDataDir(String catalogWarehouseDataDir) + { + this.catalogWarehouseDataDir = catalogWarehouseDataDir; + return this; + } + @Min(1) public int getCatalogCacheSize() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 5db5a45c24e85..e7b9f3858fc3f 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -123,7 +123,6 @@ import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable; import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable; -import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties; import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns; import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties; import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields; diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java index 4d23a97a3ff71..5a24e94f1d8ea 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java @@ -52,6 +52,7 @@ public class IcebergNativeCatalogFactory private final String catalogName; protected final CatalogType catalogType; private final String catalogWarehouse; + private final String catalogWarehouseDataDir; protected final IcebergConfig icebergConfig; private final List hadoopConfigResources; @@ -69,6 +70,7 @@ public IcebergNativeCatalogFactory( this.icebergConfig = requireNonNull(config, "config is null"); this.catalogType = config.getCatalogType(); this.catalogWarehouse = config.getCatalogWarehouse(); + this.catalogWarehouseDataDir = config.getCatalogWarehouseDataDir(); this.hadoopConfigResources = icebergConfig.getHadoopConfigResources(); this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null"); this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null"); @@ -90,6 +92,11 @@ public Catalog getCatalog(ConnectorSession session) } } + public String getCatalogWarehouseDataDir() + { + return this.catalogWarehouseDataDir; + } + public SupportsNamespaces getNamespaces(ConnectorSession session) { Catalog catalog = getCatalog(session); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index dd37bed769683..b79c1d29e622e 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -58,6 +58,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; +import static com.facebook.presto.iceberg.CatalogType.HADOOP; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static 
com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
@@ -70,7 +71,6 @@
 import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
 import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
 import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
-import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
 import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
 import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
 import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
@@ -95,6 +95,7 @@ public class IcebergNativeMetadata
 {
     private static final String VIEW_DIALECT = "presto";
 
+    private final Optional<String> warehouseDataDir;
     private final IcebergNativeCatalogFactory catalogFactory;
     private final CatalogType catalogType;
     private final ConcurrentMap<SchemaTableName, ConnectorViewDefinition> icebergViews = new ConcurrentHashMap<>();
@@ -113,6 +114,7 @@ public IcebergNativeMetadata(
         super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
         this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
         this.catalogType = requireNonNull(catalogType, "catalogType is null");
+        this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir());
     }
 
     @Override
@@ -320,20 +322,21 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
         try {
             TableIdentifier tableIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled());
             String targetPath = getTableLocation(tableMetadata.getProperties());
+            Map<String, String> tableProperties = populateTableProperties(tableMetadata, fileFormat, session);
             if (!isNullOrEmpty(targetPath)) {
                 transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
                         tableIdentifier,
                         schema,
                         partitionSpec,
                         targetPath,
-                        populateTableProperties(tableMetadata, fileFormat, session));
+                        tableProperties);
             }
             else {
                 transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
                         tableIdentifier,
                         schema,
                         partitionSpec,
-                        populateTableProperties(tableMetadata, fileFormat, session));
+                        tableProperties);
             }
         }
         catch (AlreadyExistsException e) {
@@ -403,4 +406,12 @@ public void unregisterTable(ConnectorSession clientSession, SchemaTableName sche
     {
         catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), false);
     }
+
+    protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
+    {
+        if (!catalogType.equals(HADOOP)) {
+            return Optional.empty();
+        }
+        return warehouseDataDir.map(base -> base + schemaTableName.getSchemaName() + "/" + schemaTableName.getTableName());
+    }
 }
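Editor's note: the Hadoop-catalog override above concatenates the configured base directly with the schema name — no separator is inserted between them — so ``iceberg.catalog.hadoop.warehouse.datadir`` appears to need a trailing slash to produce sensible paths (the S3 test later in this PR uses ``"warehouse_data/"``). A small illustration of the resulting layout, with made-up bucket, schema, and table names:

.. code-block:: java

    import java.util.Optional;

    final class WarehouseDataDirLayout
    {
        public static void main(String[] args)
        {
            // iceberg.catalog.hadoop.warehouse.datadir, trailing slash included
            Optional<String> warehouseDataDir = Optional.of("s3://iceberg_bucket/warehouse_data/");

            // Same composition as getDataLocationBasedOnWarehouseDataDir above
            String dataPath = warehouseDataDir
                    .map(base -> base + "tpch" + "/" + "orders")
                    .orElseThrow(IllegalStateException::new);

            // Prints: s3://iceberg_bucket/warehouse_data/tpch/orders
            System.out.println(dataPath);
        }
    }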
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java
index 1bb8976176e86..945fd525d3d29 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java
@@ -34,6 +34,7 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.util.Locale.ENGLISH;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE;
+import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
 
 public class IcebergTableProperties
 {
@@ -92,6 +93,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
                         false,
                         value -> (List) value,
                         value -> value))
+                .add(stringProperty(
+                        WRITE_DATA_LOCATION,
+                        "File system location URI for the table's data and delete files",
+                        null,
+                        false))
                 .add(stringProperty(
                         FORMAT_VERSION,
                         "Format version for the table",
@@ -182,6 +188,11 @@ public static String getTableLocation(Map<String, Object> tableProperties)
         return (String) tableProperties.get(LOCATION_PROPERTY);
     }
 
+    public static String getWriteDataLocation(Map<String, Object> tableProperties)
+    {
+        return (String) tableProperties.get(WRITE_DATA_LOCATION);
+    }
+
     public static String getFormatVersion(Map<String, Object> tableProperties)
     {
         return (String) tableProperties.get(FORMAT_VERSION);
diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
index 229b77d61f8f4..622aa3ff6a3a4 100644
--- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
+++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java
@@ -38,7 +38,6 @@
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.ConnectorTableMetadata;
 import com.facebook.presto.spi.Constraint;
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.SchemaTableName;
@@ -69,7 +68,6 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.ViewCatalog;
@@ -140,10 +138,7 @@
 import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP;
 import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId;
 import static com.facebook.presto.iceberg.IcebergPartitionType.IDENTITY;
-import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
-import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries;
-import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
 import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
 import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
 import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
@@ -183,12 +178,10 @@
 import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES;
 import static org.apache.iceberg.LocationProviders.locationsFor;
 import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
-import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 import static org.apache.iceberg.TableProperties.MERGE_MODE;
 import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
 import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT;
@@ -196,13 +189,15 @@
 import static
org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT; import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS; import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS_DEFAULT; -import static org.apache.iceberg.TableProperties.ORC_COMPRESSION; -import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; +import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; import static org.apache.iceberg.TableProperties.SPLIT_SIZE; import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; import static org.apache.iceberg.TableProperties.UPDATE_MODE; import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT; +import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; +import static org.apache.iceberg.TableProperties.WRITE_FOLDER_STORAGE_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; +import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; import static org.apache.iceberg.types.Type.TypeID.BINARY; import static org.apache.iceberg.types.Type.TypeID.FIXED; @@ -1140,53 +1135,6 @@ public void close() } } - public static Map populateTableProperties(ConnectorTableMetadata tableMetadata, FileFormat fileFormat, ConnectorSession session) - { - ImmutableMap.Builder propertiesBuilder = ImmutableMap.builderWithExpectedSize(5); - Integer commitRetries = getCommitRetries(tableMetadata.getProperties()); - propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString()); - propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries)); - switch (fileFormat) { - case PARQUET: - propertiesBuilder.put(PARQUET_COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().get().toString()); - break; - case ORC: - propertiesBuilder.put(ORC_COMPRESSION, getCompressionCodec(session).getOrcCompressionKind().name()); - break; - } - if (tableMetadata.getComment().isPresent()) { - propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); - } - - String formatVersion = getFormatVersion(tableMetadata.getProperties()); - verify(formatVersion != null, "Format version cannot be null"); - propertiesBuilder.put(FORMAT_VERSION, formatVersion); - - if (parseFormatVersion(formatVersion) < MIN_FORMAT_VERSION_FOR_DELETE) { - propertiesBuilder.put(DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName()); - propertiesBuilder.put(UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName()); - } - else { - RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(tableMetadata.getProperties()); - propertiesBuilder.put(DELETE_MODE, deleteMode.modeName()); - RowLevelOperationMode updateMode = IcebergTableProperties.getUpdateMode(tableMetadata.getProperties()); - propertiesBuilder.put(UPDATE_MODE, updateMode.modeName()); - } - - Integer metadataPreviousVersionsMax = IcebergTableProperties.getMetadataPreviousVersionsMax(tableMetadata.getProperties()); - propertiesBuilder.put(METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(metadataPreviousVersionsMax)); - - Boolean metadataDeleteAfterCommit = IcebergTableProperties.isMetadataDeleteAfterCommit(tableMetadata.getProperties()); - propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(metadataDeleteAfterCommit)); - - Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties()); - propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn)); - - 
propertiesBuilder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(tableMetadata.getProperties()))); - - return propertiesBuilder.build(); - } - public static int parseFormatVersion(String formatVersion) { try { @@ -1265,7 +1213,7 @@ public static Optional partitionDataFromStructLike(PartitionSpec */ public static String metadataLocation(Table icebergTable) { - String metadataLocation = icebergTable.properties().get(TableProperties.WRITE_METADATA_LOCATION); + String metadataLocation = icebergTable.properties().get(WRITE_METADATA_LOCATION); if (metadataLocation != null) { return String.format("%s", LocationUtil.stripTrailingSlash(metadataLocation)); @@ -1282,11 +1230,11 @@ public static String metadataLocation(Table icebergTable) public static String dataLocation(Table icebergTable) { Map properties = icebergTable.properties(); - String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); + String dataLocation = properties.get(WRITE_DATA_LOCATION); if (dataLocation == null) { - dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH); + dataLocation = properties.get(OBJECT_STORE_PATH); if (dataLocation == null) { - dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + dataLocation = properties.get(WRITE_FOLDER_STORAGE_LOCATION); if (dataLocation == null) { dataLocation = String.format("%s/data", icebergTable.location()); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index e950702b8fe64..9f872845c9514 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -23,13 +23,14 @@ import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.testing.assertions.Assert; import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.Table; import org.apache.iceberg.UpdateProperties; import org.intellij.lang.annotations.Language; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.nio.file.Path; +import java.io.IOException; import java.util.function.BiConsumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -47,6 +48,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.String.format; +import static java.nio.file.Files.createTempDirectory; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; @@ -152,6 +154,91 @@ public void testShowCreateTable() ")", schemaName, getLocation(schemaName, "orders"))); } + @Test + public void testTableWithSpecifiedWriteDataLocation() + throws IOException + { + String tableName = "test_table_with_specified_write_data_location"; + String dataWriteLocation = createTempDirectory(tableName).toAbsolutePath().toString(); + try { + assertUpdate(format("create table %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation)); + assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')"); + assertUpdate(format("delete from %s 
where a > 2", tableName), 1); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')"); + } + finally { + try { + getQueryRunner().execute("drop table if exists " + tableName); + } + catch (Exception e) { + // ignored for hive catalog compatibility + } + } + } + + @Test + public void testPartitionedTableWithSpecifiedWriteDataLocation() + throws IOException + { + String tableName = "test_partitioned_table_with_specified_write_data_location"; + String dataWriteLocation = createTempDirectory(tableName).toAbsolutePath().toString(); + try { + assertUpdate(format("create table %s(a int, b varchar) with (partitioning = ARRAY['a'], \"write.data.path\" = '%s')", tableName, dataWriteLocation)); + assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')"); + assertUpdate(format("delete from %s where a > 2", tableName), 1); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')"); + } + finally { + try { + getQueryRunner().execute("drop table if exists " + tableName); + } + catch (Exception e) { + // ignored for hive catalog compatibility + } + } + } + + @Test + public void testShowCreateTableWithSpecifiedWriteDataLocation() + throws IOException + { + String tableName = "test_show_table_with_specified_write_data_location"; + String dataWriteLocation = createTempDirectory("test1").toAbsolutePath().toString(); + try { + assertUpdate(format("CREATE TABLE %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation)); + String schemaName = getSession().getSchema().get(); + String location = getLocation(schemaName, tableName); + String createTableSql = "CREATE TABLE iceberg.%s.%s (\n" + + " \"a\" integer,\n" + + " \"b\" varchar\n" + + ")\n" + + "WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = 'PARQUET',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " \"read.split.target-size\" = 134217728,\n" + + " \"write.data.path\" = '%s',\n" + + " \"write.update.mode\" = 'merge-on-read'\n" + + ")"; + assertThat(computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()) + .isEqualTo(format(createTableSql, schemaName, tableName, location, dataWriteLocation)); + } + finally { + try { + getQueryRunner().execute("DROP TABLE IF EXISTS " + tableName); + } + catch (Exception e) { + // ignored for hive catalog compatibility + } + } + } + @Test public void testDecimal() { @@ -714,7 +801,7 @@ private void testSchemaEvolution(Session session, FileFormat fileFormat) } @Test - private void testCreateTableLike() + protected void testCreateTableLike() { Session session = getSession(); String schemaName = session.getSchema().get(); @@ -883,7 +970,7 @@ private void testWithAllFormatVersions(BiConsumer test) test.accept("2", "merge-on-read"); } - private String getTablePropertiesString(String tableName) + protected String getTablePropertiesString(String tableName) { MaterializedResult showCreateTable = computeActual("SHOW CREATE TABLE " + tableName); String createTable = (String) getOnlyElement(showCreateTable.getOnlyColumnAsSet()); @@ -1216,8 +1303,8 @@ protected String getLocation(String schema, String table) protected Path getCatalogDirectory() { - Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); - return 
getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false); + java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI()); } protected Table getIcebergTable(ConnectorSession session, String namespace, String tableName) @@ -1538,7 +1625,7 @@ public void testRegisterTableWithInvalidLocation() assertUpdate("CREATE TABLE " + tableName + " (id integer, value integer)"); assertUpdate("INSERT INTO " + tableName + " VALUES(1, 1)", 1); - String metadataLocation = getLocation(schemaName, tableName).replace("//", "/") + "_invalid"; + String metadataLocation = getLocation(schemaName, tableName).replace("//", "") + "_invalid"; @Language("RegExp") String errorMessage = format("Unable to find metadata at location %s/%s", metadataLocation, METADATA_FOLDER_NAME); assertQueryFails("CALL system.register_table ('" + schemaName + "', '" + tableName + "', '" + metadataLocation + "')", errorMessage); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 89a4d678808d7..9de94a10f6889 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -32,6 +32,9 @@ import com.facebook.presto.hive.HiveHdfsConfiguration; import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.authentication.NoHdfsAuthentication; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater; +import com.facebook.presto.hive.s3.S3ConfigurationUpdater; import com.facebook.presto.iceberg.delete.DeleteFile; import com.facebook.presto.metadata.CatalogMetadata; import com.facebook.presto.metadata.Metadata; @@ -63,6 +66,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.FileScanTask; @@ -102,7 +106,6 @@ import java.lang.reflect.Field; import java.net.URI; import java.nio.ByteBuffer; -import java.nio.file.Path; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; @@ -156,8 +159,8 @@ import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; import static com.facebook.presto.type.DecimalParametricType.DECIMAL; import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static com.google.common.io.Files.createTempDir; import static java.lang.String.format; +import static java.nio.file.Files.createTempDirectory; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static java.util.function.Function.identity; @@ -613,9 +616,10 @@ private void testPartitionedByTimestampTypeForFormat(Session session, FileFormat @Test public void testCreateTableWithCustomLocation() + throws IOException { String tableName = "test_table_with_custom_location"; - URI tableTargetURI = createTempDir().toURI(); + URI tableTargetURI = createTempDirectory(tableName).toUri(); try { assertQuerySucceeds(format("create table %s (a int, b varchar)" + " with 
(location = '%s')", tableName, tableTargetURI.toString())); @@ -1495,12 +1499,13 @@ public void testWithoutSortOrder() public boolean isFileSorted(String path, String sortColumnName, String sortOrder) throws IOException { - Configuration configuration = new Configuration(); - try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new org.apache.hadoop.fs.Path(path)) + Path filePath = new Path(path); + Configuration configuration = getHdfsEnvironment().getConfiguration(new HdfsContext(SESSION), filePath); + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(path)) .withConf(configuration) .build()) { Group record; - ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(path)); + ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, filePath); MessageType schema = readFooter.getFileMetaData().getSchema(); Double previousValue = null; while ((record = reader.read()) != null) { @@ -1758,14 +1763,14 @@ public void testMetadataVersionsMaintainingProperties() // Table `test_table_with_default_setting_properties`'s current metadata record all 5 previous metadata files assertEquals(defaultTableMetadata.previousFiles().size(), 5); - FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new org.apache.hadoop.fs.Path(settingTable.location())); + FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new Path(settingTable.location())); // Table `test_table_with_setting_properties`'s all existing metadata files count is 2 - FileStatus[] settingTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); + FileStatus[] settingTableFiles = fileSystem.listStatus(new Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); assertEquals(settingTableFiles.length, 2); // Table `test_table_with_default_setting_properties`'s all existing metadata files count is 6 - FileStatus[] defaultTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); + FileStatus[] defaultTableFiles = fileSystem.listStatus(new Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); assertEquals(defaultTableFiles.length, 6); } finally { @@ -2431,12 +2436,12 @@ private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs)) .createWriterFunc(GenericParquetWriter::buildWriter) .forTable(icebergTable) @@ -2463,13 +2468,13 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map overwriteValues, Map partitionValues) throws Exception { - Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile(); - org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI()); + Path metadataDir = new Path(metastoreDir.toURI()); String deleteFileName = "delete_file_" + randomUUID(); FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir); Schema deleteRowSchema = 
icebergTable.schema().select(overwriteValues.keySet()); - Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new org.apache.hadoop.fs.Path(metadataDir, deleteFileName), fs)) + Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs)) .forTable(icebergTable) .rowSchema(deleteRowSchema) .createWriterFunc(GenericParquetWriter::buildWriter) @@ -2490,13 +2495,19 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map {}), + ImmutableSet.of(), hiveClientConfig); return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication()); } @@ -2518,18 +2529,18 @@ protected Table loadTable(String tableName) protected Map getProperties() { - File metastoreDir = getCatalogDirectory(); + Path metastoreDir = getCatalogDirectory(); return ImmutableMap.of("warehouse", metastoreDir.toString()); } - protected File getCatalogDirectory() + protected Path getCatalogDirectory() { - Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); switch (catalogType) { case HIVE: case HADOOP: case NESSIE: - return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile(); + return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI()); } throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java index 6d71f2dcc8d07..217522ea721a6 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java @@ -232,15 +232,14 @@ public IcebergQueryRunner build() Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType.name(), format, addStorageFormatToPath); - Map icebergProperties = ImmutableMap.builder() - .put("iceberg.file-format", format.name()) - .put("iceberg.catalog.type", catalogType.name()) - .putAll(getConnectorProperties(catalogType, icebergDataDirectory)) - .putAll(extraConnectorProperties) - .build(); - - queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); - icebergCatalogs.put(ICEBERG_CATALOG, icebergProperties); + Map icebergProperties = new HashMap<>(); + icebergProperties.put("iceberg.file-format", format.name()); + icebergProperties.put("iceberg.catalog.type", catalogType.name()); + icebergProperties.putAll(getConnectorProperties(catalogType, icebergDataDirectory)); + icebergProperties.putAll(extraConnectorProperties); + + queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.copyOf(icebergProperties)); + icebergCatalogs.put(ICEBERG_CATALOG, ImmutableMap.copyOf(icebergProperties)); if (addJmxPlugin) { queryRunner.createCatalog("jmx", "jmx"); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index 588b7273d44c5..bc66bae273792 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ 
b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -49,6 +49,7 @@ public void testDefaults() .setCompressionCodec(GZIP) .setCatalogType(HIVE) .setCatalogWarehouse(null) + .setCatalogWarehouseDataDir(null) .setCatalogCacheSize(10) .setHadoopConfigResources(null) .setHiveStatisticsMergeFlags("") @@ -81,6 +82,7 @@ public void testExplicitPropertyMappings() .put("iceberg.compression-codec", "NONE") .put("iceberg.catalog.type", "HADOOP") .put("iceberg.catalog.warehouse", "path") + .put("iceberg.catalog.hadoop.warehouse.datadir", "path_data_dir") .put("iceberg.catalog.cached-catalog-num", "6") .put("iceberg.hadoop.config.resources", "/etc/hadoop/conf/core-site.xml") .put("iceberg.max-partitions-per-writer", "222") @@ -110,6 +112,7 @@ public void testExplicitPropertyMappings() .setCompressionCodec(NONE) .setCatalogType(HADOOP) .setCatalogWarehouse("path") + .setCatalogWarehouseDataDir("path_data_dir") .setCatalogCacheSize(6) .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml") .setMaxPartitionsPerWriter(222) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java index c9b84e4ee2904..014cb8a20b2c6 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java @@ -25,10 +25,13 @@ import com.facebook.presto.hive.FileFormatDataSourceStats; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.HiveCompressionCodec; import com.facebook.presto.hive.HiveDwrfEncryptionProvider; +import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.hive.OrcFileWriterConfig; +import com.facebook.presto.hive.s3.HiveS3Config; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.parquet.FileParquetDataSource; import com.facebook.presto.parquet.cache.MetadataReader; @@ -61,6 +64,7 @@ import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema; +import static com.facebook.presto.iceberg.IcebergDistributedTestBase.getHdfsEnvironment; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static com.facebook.presto.iceberg.IcebergSessionProperties.dataSizeSessionProperty; import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager; @@ -114,7 +118,7 @@ public void setup() this.connectorSession = session.toConnectorSession(connectorId); TypeManager typeManager = new TestingTypeManager(); this.hdfsContext = new HdfsContext(connectorSession); - HdfsEnvironment hdfsEnvironment = IcebergDistributedTestBase.getHdfsEnvironment(); + HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config()); this.icebergFileWriterFactory = new IcebergFileWriterFactory(hdfsEnvironment, typeManager, new FileFormatDataSourceStats(), new NodeVersion("test"), new OrcFileWriterConfig(), HiveDwrfEncryptionProvider.NO_ENCRYPTION); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java 
b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java index 92016c4e263f9..56195902ca8bf 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java @@ -255,15 +255,25 @@ public void testSessionPropertiesInManuallyStartedTransaction() } } + protected void checkTableProperties(String schemaName, String tableName, String deleteMode, String dataWriteLocation) + { + checkTableProperties(schemaName, tableName, deleteMode, 10, ImmutableMap.of("write.data.path", dataWriteLocation)); + } + protected void checkTableProperties(String tableName, String deleteMode) { - assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName), + checkTableProperties("test_schema", tableName, deleteMode, 9, ImmutableMap.of()); + } + + protected void checkTableProperties(String schemaName, String tableName, String deleteMode, int propertiesCount, Map additionalValidateProperties) + { + assertQuery(String.format("SHOW COLUMNS FROM %s.\"%s$properties\"", schemaName, tableName), "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 9"); + assertQuery(String.format("SELECT COUNT(*) FROM %s.\"%s$properties\"", schemaName, tableName), "VALUES " + propertiesCount); List materializedRows = computeActual(getSession(), - String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows(); + String.format("SELECT * FROM %s.\"%s$properties\"", schemaName, tableName)).getMaterializedRows(); - assertThat(materializedRows).hasSize(9); + assertThat(materializedRows).hasSize(propertiesCount); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode))) @@ -283,6 +293,11 @@ protected void checkTableProperties(String tableName, String deleteMode) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100"))) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, IcebergTableProperties.TARGET_SPLIT_SIZE, Long.toString(DataSize.valueOf("128MB").toBytes())))); + + additionalValidateProperties.entrySet().stream() + .forEach(entry -> assertThat(materializedRows) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, entry.getKey(), entry.getValue())))); } protected void checkORCFormatTableProperties(String tableName, String deleteMode) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTablesHadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTablesHadoop.java new file mode 100644 index 0000000000000..3e252994399cc --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTablesHadoop.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg; + +import com.facebook.presto.testing.QueryRunner; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.file.Files; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; + +public class TestIcebergSystemTablesHadoop + extends TestIcebergSystemTables +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .setCatalogType(HADOOP) + .build().getQueryRunner(); + } + + @Test + public void testPropertiesTableWithSpecifiedDataWriteLocation() + throws IOException + { + String dataLocation = Files.createTempDirectory("test_table_with_write_data_location").toAbsolutePath().toString(); + assertUpdate("CREATE SCHEMA test_schema_temp"); + try { + assertUpdate(String.format("CREATE TABLE test_schema_temp.test_table_with_write_data_location (_bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'], \"write.data.path\" = '%s')", dataLocation)); + checkTableProperties("test_schema_temp", "test_table_with_write_data_location", "merge-on-read", dataLocation); + } + finally { + assertUpdate("DROP TABLE test_schema_temp.test_table_with_write_data_location"); + assertUpdate("DROP SCHEMA test_schema_temp"); + } + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java new file mode 100644 index 0000000000000..ff626fb4e1ea9 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java
new file mode 100644
index 0000000000000..ff626fb4e1ea9
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.container;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.facebook.presto.testing.containers.MinIOContainer;
+import com.facebook.presto.util.AutoCloseableCloser;
+import com.google.common.collect.ImmutableMap;
+import org.testcontainers.containers.Network;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Objects.requireNonNull;
+import static org.testcontainers.containers.Network.newNetwork;
+
+public class IcebergMinIODataLake
+        implements Closeable
+{
+    public static final String ACCESS_KEY = "minioadmin";
+    public static final String SECRET_KEY = "minioadmin";
+
+    private final String bucketName;
+    private final String warehouseDir;
+    private final MinIOContainer minIOContainer;
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    private final AutoCloseableCloser closer = AutoCloseableCloser.create();
+
+    public IcebergMinIODataLake(String bucketName, String warehouseDir)
+    {
+        this.bucketName = requireNonNull(bucketName, "bucketName is null");
+        this.warehouseDir = requireNonNull(warehouseDir, "warehouseDir is null");
+        Network network = closer.register(newNetwork());
+        this.minIOContainer = closer.register(
+                MinIOContainer.builder()
+                        .withNetwork(network)
+                        .withEnvVars(ImmutableMap.<String, String>builder()
+                                .put("MINIO_ACCESS_KEY", ACCESS_KEY)
+                                .put("MINIO_SECRET_KEY", SECRET_KEY)
+                                .build())
+                        .build());
+    }
+
+    public void start()
+    {
+        if (isStarted()) {
+            return;
+        }
+        try {
+            this.minIOContainer.start();
+            AmazonS3 s3Client = AmazonS3ClientBuilder
+                    .standard()
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                            "http://localhost:" + minIOContainer.getMinioApiEndpoint().getPort(),
+                            "us-east-1"))
+                    .withPathStyleAccessEnabled(true)
+                    .withCredentials(new AWSStaticCredentialsProvider(
+                            new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY)))
+                    .build();
+            s3Client.createBucket(this.bucketName);
+            s3Client.putObject(this.bucketName, this.warehouseDir, "");
+        }
+        finally {
+            isStarted.set(true);
+        }
+    }
+
+    public boolean isStarted()
+    {
+        return isStarted.get();
+    }
+
+    public void stop()
+    {
+        if (!isStarted()) {
+            return;
+        }
+        try {
+            closer.close();
+            isStarted.set(false);
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to stop IcebergMinIODataLake", e);
+        }
+    }
+
+    public MinIOContainer getMinio()
+    {
+        return minIOContainer;
+    }
+
+    @Override
+    public void close()
+            throws IOException
+    {
+        stop();
+    }
+}
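IcebergMinIODataLake wraps a MinIO testcontainer together with an AWS S3 client that pre-creates the bucket and warehouse prefix. A minimal lifecycle sketch, assuming it is driven outside of a TestNG fixture (bucket and prefix names are hypothetical):

    // Illustrative usage only; the test classes below wire this into @BeforeClass/@AfterClass
    IcebergMinIODataLake dataLake = new IcebergMinIODataLake("my-bucket", "warehouse_data/");
    try {
        dataLake.start();
        // Derive the hive.s3.endpoint value handed to the query runner
        HostAndPort endpoint = dataLake.getMinio().getMinioApiEndpoint();
        String s3Endpoint = format("http://%s:%s", endpoint.getHost(), endpoint.getPort());
    }
    finally {
        dataLake.stop();
    }

Because the class implements Closeable, a try-with-resources block works as well.
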
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedHadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedHadoop.java
index c08a2799514de..a5efac992adae 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedHadoop.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedHadoop.java
@@ -16,11 +16,12 @@ import com.facebook.presto.iceberg.IcebergDistributedTestBase;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.net.URI;
 
 import static com.facebook.presto.iceberg.CatalogType.HADOOP;
-import static com.google.common.io.Files.createTempDir;
 import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
 
 @Test
 public class TestIcebergDistributedHadoop
@@ -33,9 +34,10 @@ public TestIcebergDistributedHadoop()
 
     @Override
     public void testCreateTableWithCustomLocation()
+            throws IOException
     {
         String tableName = "test_hadoop_table_with_custom_location";
-        URI tableTargetURI = createTempDir().toURI();
+        URI tableTargetURI = createTempDirectory(tableName).toUri();
         assertQueryFails(format("create table %s (a int, b varchar)" + " with (location = '%s')", tableName, tableTargetURI.toString()),
                 "Cannot set a custom location for a path-based table.*");
     }
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java
new file mode 100644
index 0000000000000..5e1eda7e5afd3
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.hadoop;
+
+import com.facebook.presto.hive.HdfsContext;
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.MetastoreClientConfig;
+import com.facebook.presto.hive.s3.HiveS3Config;
+import com.facebook.presto.iceberg.IcebergDistributedTestBase;
+import com.facebook.presto.iceberg.IcebergQueryRunner;
+import com.facebook.presto.iceberg.container.IcebergMinIODataLake;
+import com.facebook.presto.testing.QueryRunner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
+import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY;
+import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY;
+import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
+import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
+import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
+
+public class TestIcebergDistributedOnS3Hadoop
+        extends IcebergDistributedTestBase
+{
+    static final String WAREHOUSE_DATA_DIR = "warehouse_data/";
+    final String bucketName;
+    final String catalogWarehouseDir;
+    private IcebergMinIODataLake dockerizedS3DataLake;
+    HostAndPort hostAndPort;
+
+    public TestIcebergDistributedOnS3Hadoop()
+            throws IOException
+    {
+        super(HADOOP);
+        bucketName = "forhadoop-" + randomTableSuffix();
+        catalogWarehouseDir = createTempDirectory(bucketName).toUri().toString();
+    }
+
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return IcebergQueryRunner.builder()
+                .setCatalogType(HADOOP)
+                .setExtraConnectorProperties(ImmutableMap.of(
+                        "iceberg.catalog.warehouse", catalogWarehouseDir,
+                        "iceberg.catalog.hadoop.warehouse.datadir", getCatalogDataDirectory().toString(),
+                        "hive.s3.aws-access-key", ACCESS_KEY,
+                        "hive.s3.aws-secret-key", SECRET_KEY,
+                        "hive.s3.endpoint", format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()),
+                        "hive.s3.path-style-access", "true"))
+                .build().getQueryRunner();
+    }
+
+    @BeforeClass
+    @Override
+    public void init()
+            throws Exception
+    {
+        this.dockerizedS3DataLake = new IcebergMinIODataLake(bucketName, WAREHOUSE_DATA_DIR);
+        this.dockerizedS3DataLake.start();
+        hostAndPort = this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint();
+        super.init();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown()
+    {
+        if (dockerizedS3DataLake != null) {
+            dockerizedS3DataLake.stop();
+        }
+    }
+
+    @Override
+    public void testCreateTableWithCustomLocation()
+            throws IOException
+    {
+        String tableName = "test_hadoop_table_with_custom_location";
+        URI tableTargetURI = createTempDirectory(tableName).toUri();
+        assertQueryFails(format("create table %s (a int, b varchar)" + " with (location = '%s')", tableName, tableTargetURI.toString()),
+                "Cannot set a custom location for a path-based table.*");
+    }
+
+    protected Path getCatalogDataDirectory()
+    {
+        return new Path(URI.create(format("s3://%s/%s", bucketName, WAREHOUSE_DATA_DIR)));
+    }
+
+    protected Path getCatalogDirectory()
+    {
+        return new Path(catalogWarehouseDir);
+    }
+
+    protected HdfsEnvironment getHdfsEnvironment()
+    {
+        HiveClientConfig hiveClientConfig = new HiveClientConfig();
+        MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
+        HiveS3Config hiveS3Config = new HiveS3Config()
+                .setS3AwsAccessKey(ACCESS_KEY)
+                .setS3AwsSecretKey(SECRET_KEY)
+                .setS3PathStyleAccess(true)
+                .setS3Endpoint(format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()));
+        return getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config);
+    }
+
+    protected Table loadTable(String tableName)
+    {
+        Configuration configuration = getHdfsEnvironment().getConfiguration(new HdfsContext(SESSION), getCatalogDataDirectory());
+        Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), "test-hive", getProperties(), configuration);
+        return catalog.loadTable(TableIdentifier.of("tpch", tableName));
+    }
+}
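loadTable above deliberately bypasses Presto and opens the table through the native Iceberg API, so tests can assert on Iceberg metadata directly. A condensed sketch of that path; the "warehouse" property key is an assumption about what getProperties() supplies for a Hadoop catalog:

    // Sketch only: the property map contents are assumed, not taken from getProperties()
    Map<String, String> properties = ImmutableMap.of("warehouse", catalogWarehouseDir);
    Configuration configuration = getHdfsEnvironment()
            .getConfiguration(new HdfsContext(SESSION), getCatalogDataDirectory());
    Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), "test-hive", properties, configuration);
    Table table = catalog.loadTable(TableIdentifier.of("tpch", "orders"));
    String dataPath = table.properties().get("write.data.path");
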
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergHadoopCatalogOnS3DistributedQueries.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergHadoopCatalogOnS3DistributedQueries.java
new file mode 100644
index 0000000000000..17f4d9529ad46
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergHadoopCatalogOnS3DistributedQueries.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.hadoop;
+
+import com.facebook.presto.iceberg.IcebergQueryRunner;
+import com.facebook.presto.iceberg.TestIcebergDistributedQueries;
+import com.facebook.presto.iceberg.container.IcebergMinIODataLake;
+import com.facebook.presto.testing.QueryRunner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import java.io.IOException;
+
+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
+import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY;
+import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY;
+import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
+import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
+
+public class TestIcebergHadoopCatalogOnS3DistributedQueries
+        extends TestIcebergDistributedQueries
+{
+    static final String WAREHOUSE_DATA_DIR = "warehouse_data/";
+    final String bucketName;
+    final String catalogWarehouseDir;
+    private IcebergMinIODataLake dockerizedS3DataLake;
+    HostAndPort hostAndPort;
+
+    public TestIcebergHadoopCatalogOnS3DistributedQueries()
+            throws IOException
+    {
+        super(HADOOP);
+        bucketName = "forhadoop-" + randomTableSuffix();
+        catalogWarehouseDir = createTempDirectory(bucketName).toUri().toString();
+    }
+
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return IcebergQueryRunner.builder()
+                .setCatalogType(HADOOP)
+                .setExtraConnectorProperties(ImmutableMap.of(
+                        "iceberg.catalog.warehouse", catalogWarehouseDir,
+                        "iceberg.catalog.hadoop.warehouse.datadir", format("s3://%s/%s", bucketName, WAREHOUSE_DATA_DIR),
+                        "hive.s3.aws-access-key", ACCESS_KEY,
+                        "hive.s3.aws-secret-key", SECRET_KEY,
+                        "hive.s3.endpoint", format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()),
+                        "hive.s3.path-style-access", "true"))
+                .build().getQueryRunner();
+    }
+
+    @BeforeClass
+    @Override
+    public void init()
+            throws Exception
+    {
+        this.dockerizedS3DataLake = new IcebergMinIODataLake(bucketName, WAREHOUSE_DATA_DIR);
+        this.dockerizedS3DataLake.start();
+        hostAndPort = this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint();
+        super.init();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown()
+    {
+        if (dockerizedS3DataLake != null) {
+            dockerizedS3DataLake.stop();
+        }
+    }
+
+    protected boolean supportsViews()
+    {
+        return false;
+    }
+
+    @Override
+    public void testRenameTable()
+    {
+        // Renaming tables is not supported by the Hadoop catalog
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java
index 630d78efc3a50..e7429bb8adce5 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java
@@ -24,12 +24,10 @@ import com.facebook.presto.iceberg.IcebergUtil;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.SchemaTableName;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Table;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.nio.file.Path;
-
 import static com.facebook.presto.iceberg.CatalogType.HADOOP;
 import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
 import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
@@ -47,15 +45,15 @@ public TestIcebergSmokeHadoop()
 
     @Override
     protected String getLocation(String schema, String table)
     {
-        File tempLocation = getCatalogDirectory().toFile();
-        return format("%s%s/%s", tempLocation.toURI(), schema, table);
+        Path tempLocation = getCatalogDirectory();
+        return format("%s%s/%s", tempLocation.toUri(), schema, table);
     }
 
     @Override
     protected Path getCatalogDirectory()
     {
-        Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
-        Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false);
+        java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
+        Path catalogDirectory = new Path(getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI());
         return catalogDirectory;
     }
 
@@ -64,7 +62,7 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String
     {
         IcebergConfig icebergConfig = new IcebergConfig();
         icebergConfig.setCatalogType(HADOOP);
-        icebergConfig.setCatalogWarehouse(getCatalogDirectory().toFile().getPath());
+        icebergConfig.setCatalogWarehouse(getCatalogDirectory().toString());
 
         IcebergNativeCatalogFactory catalogFactory = new IcebergNativeCatalogFactory(icebergConfig,
                 new IcebergCatalogName(ICEBERG_CATALOG),
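The smoke test now standardizes on org.apache.hadoop.fs.Path so local and S3 warehouse locations flow through one type. A small round trip showing the conversion used in getCatalogDirectory above (the directory name is hypothetical):

    // file-scheme URI round trip; the path is illustrative
    java.nio.file.Path dataDirectory = java.nio.file.Paths.get("/tmp/iceberg_data/catalog");
    org.apache.hadoop.fs.Path catalogDirectory = new org.apache.hadoop.fs.Path(dataDirectory.toFile().toURI());
    // catalogDirectory.toString() -> "file:/tmp/iceberg_data/catalog"
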
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeOnS3Hadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeOnS3Hadoop.java
new file mode 100644
index 0000000000000..fc85c0af1187c
--- /dev/null
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeOnS3Hadoop.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.hadoop;
+
+import com.facebook.presto.Session;
+import com.facebook.presto.hive.gcs.HiveGcsConfig;
+import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
+import com.facebook.presto.hive.s3.HiveS3Config;
+import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
+import com.facebook.presto.iceberg.FileFormat;
+import com.facebook.presto.iceberg.IcebergCatalogName;
+import com.facebook.presto.iceberg.IcebergConfig;
+import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase;
+import com.facebook.presto.iceberg.IcebergNativeCatalogFactory;
+import com.facebook.presto.iceberg.IcebergQueryRunner;
+import com.facebook.presto.iceberg.container.IcebergMinIODataLake;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.testing.MaterializedResult;
+import com.facebook.presto.testing.QueryRunner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Table;
+import org.intellij.lang.annotations.Language;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static com.facebook.presto.iceberg.CatalogType.HADOOP;
+import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
+import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
+import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY;
+import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY;
+import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
+import static java.util.Locale.ENGLISH;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+
+public class TestIcebergSmokeOnS3Hadoop
+        extends IcebergDistributedSmokeTestBase
+{
+    static final String WAREHOUSE_DATA_DIR = "warehouse_data/";
+    final String bucketName;
+    final String catalogWarehouseDir;
+
+    private IcebergMinIODataLake dockerizedS3DataLake;
+    HostAndPort hostAndPort;
+
+    public TestIcebergSmokeOnS3Hadoop()
+            throws IOException
+    {
+        super(HADOOP);
+        bucketName = "forhadoop-" + randomTableSuffix();
+        catalogWarehouseDir = createTempDirectory(bucketName).toUri().toString();
+    }
+
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return IcebergQueryRunner.builder()
+                .setCatalogType(HADOOP)
+                .setExtraConnectorProperties(ImmutableMap.of(
+                        "iceberg.catalog.warehouse", catalogWarehouseDir,
+                        "iceberg.catalog.hadoop.warehouse.datadir", getCatalogDataDirectory().toString(),
+                        "hive.s3.aws-access-key", ACCESS_KEY,
+                        "hive.s3.aws-secret-key", SECRET_KEY,
+                        "hive.s3.endpoint", format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()),
+                        "hive.s3.path-style-access", "true"))
+                .build().getQueryRunner();
+    }
+
+    @BeforeClass
+    @Override
+    public void init()
+            throws Exception
+    {
+        this.dockerizedS3DataLake = new IcebergMinIODataLake(bucketName, WAREHOUSE_DATA_DIR);
+        this.dockerizedS3DataLake.start();
+        hostAndPort = this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint();
+        super.init();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDown()
+    {
+        if (dockerizedS3DataLake != null) {
+            dockerizedS3DataLake.stop();
+        }
+    }
+
+    @Test
+    public void testShowCreateTableWithSpecifiedWriteDataLocation()
+    {
+        String tableName = "test_table_with_specified_write_data_location";
+        String dataWriteLocation = getPathBasedOnDataDirectory("test-" + randomTableSuffix());
+        try {
+            assertUpdate(format("CREATE TABLE %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
+            String schemaName = getSession().getSchema().get();
+            String location = getLocation(schemaName, tableName);
+            String createTableSql = "CREATE TABLE iceberg.%s.%s (\n" +
+                    "   \"a\" integer,\n" +
+                    "   \"b\" varchar\n" +
+                    ")\n" +
+                    "WITH (\n" +
+                    "   delete_mode = 'merge-on-read',\n" +
+                    "   format = 'PARQUET',\n" +
+                    "   format_version = '2',\n" +
+                    "   location = '%s',\n" +
+                    "   metadata_delete_after_commit = false,\n" +
+                    "   metadata_previous_versions_max = 100,\n" +
+                    "   metrics_max_inferred_column = 100,\n" +
+                    "   \"read.split.target-size\" = 134217728,\n" +
+                    "   \"write.data.path\" = '%s',\n" +
+                    "   \"write.update.mode\" = 'merge-on-read'\n" +
+                    ")";
+            assertThat(computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue())
+                    .isEqualTo(format(createTableSql, schemaName, tableName, location, dataWriteLocation));
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
+    @Test
+    public void testTableWithSpecifiedWriteDataLocation()
+    {
+        String tableName = "test_table_with_specified_write_data_location2";
+        String dataWriteLocation = getPathBasedOnDataDirectory("test-" + randomTableSuffix());
+        try {
+            assertUpdate(format("create table %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
+            assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3);
+            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')");
+            assertUpdate(format("delete from %s where a > 2", tableName), 1);
+            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')");
+        }
+        finally {
+            assertUpdate("drop table if exists " + tableName);
+        }
+    }
+
+    @Test
+    public void testPartitionedTableWithSpecifiedWriteDataLocation()
+    {
+        String tableName = "test_table_with_specified_write_data_location3";
+        String dataWriteLocation = getPathBasedOnDataDirectory("test-" + randomTableSuffix());
+        try {
+            assertUpdate(format("create table %s(a int, b varchar) with (partitioning = ARRAY['a'], \"write.data.path\" = '%s')", tableName, dataWriteLocation));
+            assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3);
+            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')");
+            assertUpdate(format("delete from %s where a > 2", tableName), 1);
+            assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')");
+        }
+        finally {
+            assertUpdate("drop table if exists " + tableName);
+        }
+    }
+
+    @Override
+    protected void testCreatePartitionedTableAs(Session session, FileFormat fileFormat)
+    {
+        String tableName = "test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH);
+        @Language("SQL") String createTable = "" +
+                "CREATE TABLE " + tableName + " " +
+                "WITH (" +
+                "format = '" + fileFormat + "', " +
+                "partitioning = ARRAY['ORDER_STATUS', 'Ship_Priority', 'Bucket(order_key,9)']" +
+                ") " +
+                "AS " +
+                "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " +
+                "FROM tpch.tiny.orders";
+
+        assertUpdate(session, createTable, "SELECT count(*) from orders");
+
+        String createTableSql = "" +
+                "CREATE TABLE %s.%s.%s (\n" +
+                "   \"order_key\" bigint,\n" +
+                "   \"ship_priority\" integer,\n" +
+                "   \"order_status\" varchar\n" +
+                ")\n" +
+                "WITH (\n" +
+                "   delete_mode = 'merge-on-read',\n" +
+                "   format = '" + fileFormat + "',\n" +
+                "   format_version = '2',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)'],\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = 'merge-on-read'\n" +
+                ")";
+
+        MaterializedResult actualResult = computeActual("SHOW CREATE TABLE " + tableName);
+        assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()),
+                format(createTableSql,
+                        getSession().getCatalog().get(),
+                        getSession().getSchema().get(),
+                        tableName,
+                        getLocation(getSession().getSchema().get(), tableName),
+                        getPathBasedOnDataDirectory(getSession().getSchema().get() + "/" + tableName)));
+
+        assertQuery(session, "SELECT * from " + tableName, "SELECT orderkey, shippriority, orderstatus FROM orders");
+
+        dropTable(session, tableName);
+    }
+
+    @Override
+    protected void testCreateTableLike()
+    {
+        Session session = getSession();
+        String schemaName = session.getSchema().get();
+
+        String tablePropertiesString = "WITH (\n" +
+                "   delete_mode = 'merge-on-read',\n" +
+                "   format = 'PARQUET',\n" +
+                "   format_version = '2',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   partitioning = ARRAY['adate'],\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = 'merge-on-read'\n" +
+                ")";
+        assertUpdate(session, "CREATE TABLE test_create_table_like_original (col1 INTEGER, aDate DATE) WITH(format = 'PARQUET', partitioning = ARRAY['aDate'])");
+        assertEquals(getTablePropertiesString("test_create_table_like_original"),
+                format(tablePropertiesString,
+                        getLocation(schemaName, "test_create_table_like_original"),
+                        getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_original")));
+
+        assertUpdate(session, "CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)");
+        assertUpdate(session, "INSERT INTO test_create_table_like_copy0 (col1, aDate, col2) VALUES (1, CAST('1950-06-28' AS DATE), 3)", 1);
+        assertQuery(session, "SELECT * from test_create_table_like_copy0", "VALUES(1, CAST('1950-06-28' AS DATE), 3)");
+        dropTable(session, "test_create_table_like_copy0");
+
+        assertUpdate(session, "CREATE TABLE test_create_table_like_copy1 (LIKE test_create_table_like_original)");
+        tablePropertiesString = "WITH (\n" +
+                "   delete_mode = 'merge-on-read',\n" +
+                "   format = 'PARQUET',\n" +
+                "   format_version = '2',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = 'merge-on-read'\n" +
+                ")";
+        assertEquals(getTablePropertiesString("test_create_table_like_copy1"),
+                format(tablePropertiesString,
+                        getLocation(schemaName, "test_create_table_like_copy1"),
+                        getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_copy1")));
+        dropTable(session, "test_create_table_like_copy1");
+
+        assertUpdate(session, "CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)");
+        tablePropertiesString = "WITH (\n" +
+                "   delete_mode = 'merge-on-read',\n" +
+                "   format = 'PARQUET',\n" +
+                "   format_version = '2',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = 'merge-on-read'\n" +
+                ")";
+        assertEquals(getTablePropertiesString("test_create_table_like_copy2"),
+                format(tablePropertiesString,
+                        getLocation(schemaName, "test_create_table_like_copy2"),
+                        getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_copy2")));
+        dropTable(session, "test_create_table_like_copy2");
+
+        assertUpdate(session, "CREATE TABLE test_create_table_like_copy5 (LIKE test_create_table_like_original INCLUDING PROPERTIES)" +
+                " WITH (location = '', \"write.data.path\" = '', format = 'ORC')");
+        tablePropertiesString = "WITH (\n" +
+                "   delete_mode = 'merge-on-read',\n" +
+                "   format = 'ORC',\n" +
+                "   format_version = '2',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   partitioning = ARRAY['adate'],\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = 'merge-on-read'\n" +
+                ")";
+        assertEquals(getTablePropertiesString("test_create_table_like_copy5"),
+                format(tablePropertiesString,
+                        getLocation(schemaName, "test_create_table_like_copy5"),
+                        getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_copy5")));
+        dropTable(session, "test_create_table_like_copy5");
+
+        assertQueryFails(session, "CREATE TABLE test_create_table_like_copy6 (LIKE test_create_table_like_original INCLUDING PROPERTIES)",
+                "Cannot set a custom location for a path-based table.*");
+
+        dropTable(session, "test_create_table_like_original");
+    }
+
+    @Override
+    protected void testCreateTableWithFormatVersion(String formatVersion, String defaultDeleteMode)
+    {
+        String tableName = "test_create_table_with_format_version_" + formatVersion;
+        @Language("SQL") String createTable = "" +
+                "CREATE TABLE " + tableName + " " +
+                "WITH (" +
+                "format = 'PARQUET', " +
+                "format_version = '" + formatVersion + "'" +
+                ") " +
+                "AS " +
+                "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " +
+                "FROM tpch.tiny.orders";
+
+        Session session = getSession();
+
+        assertUpdate(session, createTable, "SELECT count(*) from orders");
+
+        String createTableSql = "" +
+                "CREATE TABLE %s.%s.%s (\n" +
+                "   \"order_key\" bigint,\n" +
+                "   \"ship_priority\" integer,\n" +
+                "   \"order_status\" varchar\n" +
+                ")\n" +
+                "WITH (\n" +
+                "   delete_mode = '%s',\n" +
+                "   format = 'PARQUET',\n" +
+                "   format_version = '%s',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = '%s'\n" +
+                ")";
+
+        MaterializedResult actualResult = computeActual("SHOW CREATE TABLE " + tableName);
+        assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()),
+                format(createTableSql,
+                        getSession().getCatalog().get(),
+                        getSession().getSchema().get(),
+                        tableName,
+                        defaultDeleteMode,
+                        formatVersion,
+                        getLocation(getSession().getSchema().get(), tableName),
+                        getPathBasedOnDataDirectory(getSession().getSchema().get() + "/" + tableName),
+                        defaultDeleteMode));
+
+        dropTable(session, tableName);
+    }
+
+    @Override
+    public void testShowCreateTable()
+    {
+        String schemaName = getSession().getSchema().get();
+        String createTableSql = "CREATE TABLE iceberg.%s.orders (\n" +
+                "   \"orderkey\" bigint,\n" +
+                "   \"custkey\" bigint,\n" +
+                "   \"orderstatus\" varchar,\n" +
+                "   \"totalprice\" double,\n" +
+                "   \"orderdate\" date,\n" +
+                "   \"orderpriority\" varchar,\n" +
+                "   \"clerk\" varchar,\n" +
+                "   \"shippriority\" integer,\n" +
+                "   \"comment\" varchar\n" +
+                ")\n" +
+                "WITH (\n" +
+                "   delete_mode = 'merge-on-read',\n" +
+                "   format = 'PARQUET',\n" +
+                "   format_version = '2',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = 'merge-on-read'\n" +
+                ")";
+        assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue())
+                .isEqualTo(format(createTableSql,
+                        schemaName,
+                        getLocation(schemaName, "orders"),
+                        getPathBasedOnDataDirectory(schemaName + "/orders")));
+    }
+
+    @Test
+    public void testTableComments()
+    {
+        Session session = getSession();
+        String schemaName = session.getSchema().get();
+
+        @Language("SQL") String createTable = "" +
+                "CREATE TABLE iceberg.%s.test_table_comments (\n" +
+                "   \"_x\" bigint\n" +
+                ")\n" +
+                "COMMENT '%s'\n" +
+                "WITH (\n" +
+                "   format = 'ORC',\n" +
+                "   format_version = '2'\n" +
+                ")";
+
+        assertUpdate(format(createTable, schemaName, "test table comment"));
+
+        String createTableSql = "" +
+                "CREATE TABLE iceberg.%s.test_table_comments (\n" +
+                "   \"_x\" bigint\n" +
+                ")\n" +
+                "COMMENT '%s'\n" +
+                "WITH (\n" +
+                "   delete_mode = 'merge-on-read',\n" +
+                "   format = 'ORC',\n" +
+                "   format_version = '2',\n" +
+                "   location = '%s',\n" +
+                "   metadata_delete_after_commit = false,\n" +
+                "   metadata_previous_versions_max = 100,\n" +
+                "   metrics_max_inferred_column = 100,\n" +
+                "   \"read.split.target-size\" = 134217728,\n" +
+                "   \"write.data.path\" = '%s',\n" +
+                "   \"write.update.mode\" = 'merge-on-read'\n" +
+                ")";
+
+        MaterializedResult resultOfCreate = computeActual("SHOW CREATE TABLE test_table_comments");
+        assertEquals(getOnlyElement(resultOfCreate.getOnlyColumnAsSet()),
+                format(createTableSql, schemaName, "test table comment",
+                        getLocation(schemaName, "test_table_comments"),
+                        getPathBasedOnDataDirectory(schemaName + "/test_table_comments")));
+
+        dropTable(session, "test_table_comments");
+    }
+
+    @Override
+    protected String getLocation(String schema, String table)
+    {
+        Path tempLocation = getCatalogDirectory();
+        return format("%s/%s/%s", tempLocation.toUri(), schema, table);
+    }
+
+    @Override
+    protected Table getIcebergTable(ConnectorSession session, String schema, String tableName)
+    {
+        IcebergConfig icebergConfig = new IcebergConfig();
+        icebergConfig.setCatalogType(HADOOP);
+        icebergConfig.setCatalogWarehouse(getCatalogDirectory().toString());
+
+        HiveS3Config hiveS3Config = new HiveS3Config()
+                .setS3AwsAccessKey(ACCESS_KEY)
+                .setS3AwsSecretKey(SECRET_KEY)
+                .setS3PathStyleAccess(true)
+                .setS3Endpoint(format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()));
+
+        IcebergNativeCatalogFactory catalogFactory = new IcebergNativeCatalogFactory(icebergConfig,
+                new IcebergCatalogName(ICEBERG_CATALOG),
+                new PrestoS3ConfigurationUpdater(hiveS3Config),
+                new HiveGcsConfigurationInitializer(new HiveGcsConfig()));
+
+        return getNativeIcebergTable(catalogFactory,
+                session,
+                SchemaTableName.valueOf(schema + "." + tableName));
+    }
+
+    protected Path getCatalogDirectory()
+    {
+        return new Path(catalogWarehouseDir);
+    }
+
+    private Path getCatalogDataDirectory()
+    {
+        return new Path(URI.create(format("s3://%s/%s", bucketName, WAREHOUSE_DATA_DIR)));
+    }
+
+    private String getPathBasedOnDataDirectory(String name)
+    {
+        return new Path(getCatalogDataDirectory(), name).toString();
+    }
+}
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
index f0f8627813bf7..68fba51a4526a 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java
@@ -82,7 +82,7 @@ protected Table loadTable(String tableName)
     protected ExtendedHiveMetastore getFileHiveMetastore()
     {
         IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
-                getCatalogDirectory().getPath(),
+                getCatalogDirectory().toString(),
                 "test");
         return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
     }
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java
index 406d522ecfc20..e1334bcaad855 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java
@@ -68,7 +68,7 @@ protected static HdfsEnvironment getHdfsEnvironment()
     protected ExtendedHiveMetastore getFileHiveMetastore()
    {
         IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
-                getCatalogDirectory().toFile().getPath(),
+                getCatalogDirectory().toString(),
                 "test");
         return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
     }
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java
index 3aacc290d2055..095e25e74a0d9 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java
@@ -18,11 +18,11 @@ import com.facebook.presto.testing.QueryRunner;
 import com.facebook.presto.testing.containers.NessieContainer;
 import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.Path;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.io.File;
 import java.util.Map;
 
 import static com.facebook.presto.iceberg.CatalogType.NESSIE;
@@ -43,7 +43,7 @@ protected TestIcebergDistributedNessie()
     @Override
     protected Map<String, String> getProperties()
     {
-        File metastoreDir = getCatalogDirectory();
+        Path metastoreDir = getCatalogDirectory();
         return ImmutableMap.of("warehouse", metastoreDir.toString(), "uri", nessieContainer.getRestApiUri());
     }
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java
index 1e9e882eb0a36..064679730c9e2 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java
@@ -105,7 +105,7 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String
     {
         IcebergConfig icebergConfig = new IcebergConfig();
         icebergConfig.setCatalogType(NESSIE);
-        icebergConfig.setCatalogWarehouse(getCatalogDirectory().toFile().getPath());
+        icebergConfig.setCatalogWarehouse(getCatalogDirectory().toString());
 
         IcebergNessieConfig nessieConfig = new IcebergNessieConfig().setServerUri(nessieContainer.getRestApiUri());
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java
index 1a6f71068f0ba..a874a42168c2b 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java
@@ -20,6 +20,9 @@ import com.facebook.airlift.node.NodeInfo;
 import com.facebook.presto.hive.HdfsContext;
 import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.MetastoreClientConfig;
+import com.facebook.presto.hive.s3.HiveS3Config;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.testing.TestingConnectorSession;
 import com.google.common.collect.ImmutableList;
@@ -61,7 +64,7 @@ public static Map<String, String> restConnectorProperties(String serverUri)
     public static TestingHttpServer getRestServer(String location)
     {
         JdbcCatalog backingCatalog = new JdbcCatalog();
-        HdfsEnvironment hdfsEnvironment = getHdfsEnvironment();
+        HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config());
         backingCatalog.setConf(hdfsEnvironment.getConfiguration(new HdfsContext(SESSION), new Path(location)));
 
         Map<String, String> properties = ImmutableMap.<String, String>builder()
diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java
index 113ee320fdd27..62627eb352d04 100644
--- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java
+++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/TestIcebergSmokeRestNestedNamespace.java
@@ -38,6 +38,7 @@ import org.testng.annotations.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
 
@@ -50,6 +51,7 @@ import static com.google.common.io.MoreFiles.deleteRecursively;
 import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
 import static java.lang.String.format;
+import static java.nio.file.Files.createTempDirectory;
 import static java.util.Locale.ENGLISH;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -182,6 +184,40 @@ public void testShowCreateTable()
                 ")", schemaName, getLocation(schemaName, "orders")));
     }
 
+    @Test
+    public void testShowCreateTableWithSpecifiedWriteDataLocation()
+            throws IOException
+    {
+        String tableName = "test_table_with_specified_write_data_location";
+        String dataWriteLocation = createTempDirectory("test1").toAbsolutePath().toString();
+        try {
+            assertUpdate(format("CREATE TABLE %s(a int, b varchar) with (\"write.data.path\" = '%s')", tableName, dataWriteLocation));
+            String schemaName = getSession().getSchema().get();
+            String location = getLocation(schemaName, tableName);
+            String createTableSql = "CREATE TABLE iceberg.\"%s\".%s (\n" +
+                    "   \"a\" integer,\n" +
+                    "   \"b\" varchar\n" +
+                    ")\n" +
+                    "WITH (\n" +
+                    "   delete_mode = 'merge-on-read',\n" +
+                    "   format = 'PARQUET',\n" +
+                    "   format_version = '2',\n" +
+                    "   location = '%s',\n" +
+                    "   metadata_delete_after_commit = false,\n" +
+                    "   metadata_previous_versions_max = 100,\n" +
+                    "   metrics_max_inferred_column = 100,\n" +
+                    "   \"read.split.target-size\" = 134217728,\n" +
+                    "   \"write.data.path\" = '%s',\n" +
+                    "   \"write.update.mode\" = 'merge-on-read'\n" +
+                    ")";
+            assertThat(computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue())
+                    .isEqualTo(format(createTableSql, schemaName, tableName, location, dataWriteLocation));
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+        }
+    }
+
     @Test
     @Override // override due to double quotes around nested namespace
     public void testTableComments()