diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java
index c1e5ee8967d5..255a7442e1e8 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/HiveTableOperations.java
@@ -28,7 +28,6 @@
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.iceberg.LocationProviders;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableOperations;
@@ -54,6 +53,7 @@
 import static io.trino.plugin.hive.HiveType.toHiveType;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
+import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
 import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable;
 import static java.lang.Integer.parseInt;
 import static java.lang.String.format;
@@ -255,7 +255,7 @@ public String metadataFileLocation(String filename)
     public LocationProvider locationProvider()
     {
         TableMetadata metadata = current();
-        return LocationProviders.locationsFor(metadata.location(), metadata.properties());
+        return getLocationProvider(getSchemaTableName(), metadata.location(), metadata.properties());
     }
 
     private Table getTable()
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 847057920d79..be76996d12e4 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -145,7 +145,6 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
 import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
-import static io.trino.plugin.iceberg.IcebergUtil.getDataPath;
 import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
@@ -174,6 +173,9 @@
 import static org.apache.iceberg.TableMetadata.newTableMetadata;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
+import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
+import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION;
 import static org.apache.iceberg.Transactions.createTableTransaction;
 
 public class IcebergMetadata
@@ -607,7 +609,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
             propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
         }
 
-        TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, propertiesBuilder.build());
+        Map<String, String> properties = propertiesBuilder.build();
+        TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, properties);
 
         transaction = createTableTransaction(tableName, operations, metadata);
 
@@ -618,7 +621,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
                 PartitionSpecParser.toJson(metadata.spec()),
                 getColumns(metadata.schema(), typeManager),
                 targetPath,
-                fileFormat);
+                fileFormat,
+                properties);
     }
 
     @Override
@@ -641,8 +645,9 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
                 SchemaParser.toJson(icebergTable.schema()),
                 PartitionSpecParser.toJson(icebergTable.spec()),
                 getColumns(icebergTable.schema(), typeManager),
-                getDataPath(icebergTable.location()),
-                getFileFormat(icebergTable));
+                icebergTable.location(),
+                getFileFormat(icebergTable),
+                icebergTable.properties());
     }
 
     @Override
@@ -702,6 +707,13 @@ public Optional<Object> getInfo(ConnectorTableHandle tableHandle)
     public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
+        // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
+        org.apache.iceberg.Table table = getIcebergTable(session, handle.getSchemaTableName());
+        if (table.properties().containsKey(OBJECT_STORE_PATH) ||
+                table.properties().containsKey(WRITE_NEW_DATA_LOCATION) ||
+                table.properties().containsKey(WRITE_METADATA_LOCATION)) {
+            throw new TrinoException(NOT_SUPPORTED, "Table " + handle.getSchemaTableName() + " contains Iceberg path override properties and cannot be dropped from Trino");
+        }
         metastore.dropTable(new HiveIdentity(session), handle.getSchemaName(), handle.getTableName(), true);
     }
 
@@ -1147,8 +1159,9 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession
                 SchemaParser.toJson(icebergTable.schema()),
                 PartitionSpecParser.toJson(icebergTable.spec()),
                 getColumns(icebergTable.schema(), typeManager),
-                getDataPath(icebergTable.location()),
-                getFileFormat(icebergTable));
+                icebergTable.location(),
+                getFileFormat(icebergTable),
+                icebergTable.properties());
     }
 
     @Override
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 31dc1ae12430..7db34efbc673 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -44,6 +44,7 @@
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.transforms.Transform;
 
 import java.util.ArrayList;
@@ -84,7 +85,7 @@ public class IcebergPageSink
     private final int maxOpenWriters;
     private final Schema outputSchema;
     private final PartitionSpec partitionSpec;
-    private final String outputPath;
+    private final LocationProvider locationProvider;
     private final IcebergFileWriterFactory fileWriterFactory;
     private final HdfsEnvironment hdfsEnvironment;
     private final HdfsContext hdfsContext;
@@ -103,7 +104,7 @@ public class IcebergPageSink
     public IcebergPageSink(
             Schema outputSchema,
             PartitionSpec partitionSpec,
-            String outputPath,
+            LocationProvider locationProvider,
             IcebergFileWriterFactory fileWriterFactory,
             PageIndexerFactory pageIndexerFactory,
             HdfsEnvironment hdfsEnvironment,
@@ -117,11 +118,11 @@ public IcebergPageSink(
         requireNonNull(inputColumns, "inputColumns is null");
         this.outputSchema = requireNonNull(outputSchema, "outputSchema is null");
         this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
-        this.outputPath = requireNonNull(outputPath, "outputPath is null");
+        this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
         this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
-        this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(outputPath)));
+        this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(locationProvider.newDataLocation("data-file"))));
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.session = requireNonNull(session, "session is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
@@ -287,9 +288,7 @@ private int[] getWriterIndexes(Page page)
             }
 
             Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
-            Optional<String> partitionPath = partitionData.map(partitionSpec::partitionToPath);
-
-            WriteContext writer = createWriter(partitionPath, partitionData);
+            WriteContext writer = createWriter(partitionData);
             writers.set(writerIndex, writer);
         }
 
@@ -299,14 +298,11 @@ private int[] getWriterIndexes(Page page)
         return writerIndexes;
     }
 
-    private WriteContext createWriter(Optional<String> partitionPath, Optional<PartitionData> partitionData)
+    private WriteContext createWriter(Optional<PartitionData> partitionData)
     {
-        Path outputPath = new Path(this.outputPath);
-        if (partitionPath.isPresent()) {
-            outputPath = new Path(outputPath, partitionPath.get());
-        }
-        outputPath = new Path(outputPath, randomUUID().toString());
-        outputPath = new Path(fileFormat.addExtension(outputPath.toString()));
+        String fileName = fileFormat.addExtension(randomUUID().toString());
+        Path outputPath = partitionData.map(partition -> new Path(locationProvider.newDataLocation(partitionSpec, partition, fileName)))
+                .orElse(new Path(locationProvider.newDataLocation(fileName)));
 
         IcebergFileWriter writer = fileWriterFactory.createFileWriter(
                 outputPath,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
index 94051d1f2083..21ea0017f2cf 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
@@ -23,13 +23,16 @@
 import io.trino.spi.connector.ConnectorPageSinkProvider;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SchemaTableName;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.io.LocationProvider;
 
 import javax.inject.Inject;
 
+import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
 import static java.util.Objects.requireNonNull;
 
 public class IcebergPageSinkProvider
@@ -74,10 +77,12 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
         HdfsContext hdfsContext = new HdfsContext(session);
         Schema schema = SchemaParser.fromJson(tableHandle.getSchemaAsJson());
         PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, tableHandle.getPartitionSpecAsJson());
+        LocationProvider locationProvider = getLocationProvider(new SchemaTableName(tableHandle.getSchemaName(), tableHandle.getTableName()),
+                tableHandle.getOutputPath(), tableHandle.getStorageProperties());
         return new IcebergPageSink(
                 schema,
                 partitionSpec,
-                tableHandle.getOutputPath(),
+                locationProvider,
                 fileWriterFactory,
                 pageIndexerFactory,
                 hdfsEnvironment,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
index 5cf6341fbf92..51be25c5be4c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.types.Type.PrimitiveType;
 import org.apache.iceberg.types.Types;
 
@@ -64,6 +65,7 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
 import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros;
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;
 import static io.trino.spi.type.DateType.DATE;
@@ -84,8 +86,10 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
 import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static org.apache.iceberg.LocationProviders.locationsFor;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
 import static org.apache.iceberg.types.Type.TypeID.BINARY;
 import static org.apache.iceberg.types.Type.TypeID.FIXED;
 
@@ -190,14 +194,6 @@ private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(Types.N
         throw new IllegalStateException("Unsupported field type: " + nestedField);
     }
 
-    public static String getDataPath(String location)
-    {
-        if (!location.endsWith("/")) {
-            location += "/";
-        }
-        return location + "data";
-    }
-
     public static FileFormat getFileFormat(Table table)
     {
         return FileFormat.valueOf(table.properties()
@@ -327,4 +323,13 @@ public static Map<Integer, String> getPartitionKeys(FileScanTask scanTask)
         return Collections.unmodifiableMap(partitionKeys);
     }
+
+    public static LocationProvider getLocationProvider(SchemaTableName schemaTableName, String tableLocation, Map<String, String> storageProperties)
+    {
+        if (storageProperties.containsKey(WRITE_LOCATION_PROVIDER_IMPL)) {
+            throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " specifies " + storageProperties.get(WRITE_LOCATION_PROVIDER_IMPL) +
+                    " as a location provider. Writing to Iceberg tables with custom location provider is not supported.");
+        }
+        return locationsFor(tableLocation, storageProperties);
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
index 267ccfb72da4..b95f295884aa 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
@@ -21,6 +21,7 @@
 import org.apache.iceberg.FileFormat;
 
 import java.util.List;
+import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
@@ -34,6 +35,7 @@ public class IcebergWritableTableHandle
     private final List<IcebergColumnHandle> inputColumns;
     private final String outputPath;
    private final FileFormat fileFormat;
+    private final Map<String, String> storageProperties;
 
     @JsonCreator
     public IcebergWritableTableHandle(
@@ -43,7 +45,8 @@ public IcebergWritableTableHandle(
             @JsonProperty("partitionSpecAsJson") String partitionSpecAsJson,
             @JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
             @JsonProperty("outputPath") String outputPath,
-            @JsonProperty("fileFormat") FileFormat fileFormat)
+            @JsonProperty("fileFormat") FileFormat fileFormat,
+            @JsonProperty("properties") Map<String, String> storageProperties)
     {
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
@@ -52,6 +55,7 @@ public IcebergWritableTableHandle(
         this.inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
         this.outputPath = requireNonNull(outputPath, "outputPath is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+        this.storageProperties = requireNonNull(storageProperties, "storageProperties is null");
     }
 
     @JsonProperty
@@ -96,6 +100,12 @@ public FileFormat getFileFormat()
         return fileFormat;
     }
 
+    @JsonProperty
+    public Map<String, String> getStorageProperties()
+    {
+        return storageProperties;
+    }
+
     @Override
     public String toString()
     {
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index d876e525446b..9cbf76dda9db 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -22,6 +22,7 @@
 
 import static io.trino.tempto.assertions.QueryAssert.Row;
 import static io.trino.tempto.assertions.QueryAssert.Row.row;
+import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
 import static io.trino.tempto.assertions.QueryAssert.assertThat;
 import static io.trino.tests.product.TestGroups.ICEBERG;
 import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
@@ -29,6 +30,7 @@
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
 import static java.lang.String.format;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 public class TestIcebergSparkCompatibility
         extends ProductTest
@@ -423,6 +425,34 @@ public void testTrinoShowingSparkCreatedTables()
         onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable));
     }
 
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
+    public void testTrinoWritingDataWithObjectStorageLocationProvider()
+    {
+        String baseTableName = "test_object_storage_location_provider";
+        String sparkTableName = sparkTableName(baseTableName);
+        String trinoTableName = trinoTableName(baseTableName);
+        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_object_storage_location_provider/obj-data";
+
+        onSpark().executeQuery(format("CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES (" +
+                        "'write.object-storage.enabled'=true," +
+                        "'write.object-storage.path'='%s')",
+                sparkTableName, dataPath));
+        onTrino().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName));
+
+        Row result = row("a_string", 1000000000000000L);
+        assertThat(onSpark().executeQuery(format("SELECT _string, _bigint FROM %s", sparkTableName))).containsOnly(result);
+        assertThat(onTrino().executeQuery(format("SELECT _string, _bigint FROM %s", trinoTableName))).containsOnly(result);
+
+        QueryResult queryResult = onTrino().executeQuery(format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\"")));
+        assertThat(queryResult).hasRowsCount(1).hasColumnsCount(1);
+        assertTrue(((String) queryResult.row(0).get(0)).contains(dataPath));
+
+        // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
+        assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + trinoTableName))
+                .hasMessageContaining("contains Iceberg path override properties and cannot be dropped from Trino");
+        onSpark().executeQuery("DROP TABLE " + sparkTableName);
+    }
+
     private static String sparkTableName(String tableName)
     {
         return format("%s.%s.%s", SPARK_CATALOG, TEST_SCHEMA_NAME, tableName);