diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md
index 1d415e83021e..c8dd790c5abd 100644
--- a/docs/src/main/sphinx/connector/iceberg.md
+++ b/docs/src/main/sphinx/connector/iceberg.md
@@ -217,6 +217,15 @@ implementation is used:
   - Enable [sorted writing](iceberg-sorted-files) to tables with a specified
     sort order. Equivalent session property is `sorted_writing_enabled`.
   - `true`
+* - `iceberg.sorted-writing.local-staging-path`
+  - A local directory that Trino can use for staging writes to sorted tables.
+    The `${USER}` placeholder can be used to specify a different location for
+    each user. When this property is not configured, the target storage is
+    used for staging while writing to sorted tables, which can be inefficient
+    when writing to object stores like S3. When `fs.hadoop.enabled` is not
+    enabled, using this feature requires setting up the
+    [local file system](/object-storage/file-system-local).
+  -
 * - `iceberg.allowed-extra-properties`
   - List of extra properties that are allowed to be set on Iceberg tables.
     Use `*` to allow all properties.
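For context, a catalog configuration using the documented property might look like the following sketch; the staging directory value and the `${USER}` layout are hypothetical examples, not taken from this change:

```text
# etc/catalog/iceberg.properties (illustrative)
connector.name=iceberg
iceberg.sorted-writing-enabled=true
# Stage sorted writes on local disk instead of the target storage;
# ${USER} is replaced with the session user at write time
iceberg.sorted-writing.local-staging-path=/tmp/sorted-staging/${USER}
```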
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index 926059a7c6c5..ba95f5f56255 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -22,8 +22,10 @@
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
 import io.airlift.units.ThreadCount;
+import io.trino.filesystem.Location;
 import io.trino.plugin.hive.HiveCompressionOption;
 import jakarta.validation.constraints.AssertFalse;
+import jakarta.validation.constraints.AssertTrue;
 import jakarta.validation.constraints.DecimalMax;
 import jakarta.validation.constraints.DecimalMin;
 import jakarta.validation.constraints.Max;
@@ -86,6 +88,7 @@ public class IcebergConfig
     private boolean hideMaterializedViewStorageTable = true;
     private Optional<String> materializedViewsStorageSchema = Optional.empty();
     private boolean sortedWritingEnabled = true;
+    private Optional<String> sortedWritingLocalStagingPath = Optional.empty();
     private boolean queryPartitionFilterRequired;
     private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
     private int splitManagerThreads = Math.min(Runtime.getRuntime().availableProcessors() * 2, 32);
@@ -453,6 +456,32 @@ public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled)
         return this;
     }

+    @NotNull
+    public Optional<String> getSortedWritingLocalStagingPath()
+    {
+        return sortedWritingLocalStagingPath;
+    }
+
+    @Config("iceberg.sorted-writing.local-staging-path")
+    @ConfigDescription("Use provided local directory for staging writes to sorted tables. Use ${USER} placeholder to use different location for each user")
+    public IcebergConfig setSortedWritingLocalStagingPath(String sortedWritingLocalStagingPath)
+    {
+        this.sortedWritingLocalStagingPath = Optional.ofNullable(sortedWritingLocalStagingPath);
+        return this;
+    }
+
+    @AssertTrue(message = "iceberg.sorted-writing.local-staging-path must not use any prefix other than file:// or local://")
+    public boolean isSortedWritingLocalStagingPathValid()
+    {
+        if (sortedWritingLocalStagingPath.isEmpty()) {
+            return true;
+        }
+        Optional<String> scheme = Location.of(sortedWritingLocalStagingPath.get()).scheme();
+        return scheme.isEmpty()
+                || scheme.equals(Optional.of("file"))
+                || scheme.equals(Optional.of("local"));
+    }
+
     @Config("iceberg.query-partition-filter-required")
     @ConfigDescription("Require a filter on at least one partition column")
     public IcebergConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 3432159270f5..872f12ca9501 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -149,6 +149,7 @@ public IcebergPageSink(
             List<TrinoSortField> sortOrder,
             DataSize sortingFileWriterBufferSize,
             int sortingFileWriterMaxOpenFiles,
+            Optional<String> sortedWritingLocalStagingPath,
             TypeManager typeManager,
             PageSorter pageSorter)
     {
@@ -171,13 +172,17 @@ public IcebergPageSink(
         this.sortedWritingEnabled = isSortedWritingEnabled(session);
         this.sortingFileWriterBufferSize = requireNonNull(sortingFileWriterBufferSize, "sortingFileWriterBufferSize is null");
         this.sortingFileWriterMaxOpenFiles = sortingFileWriterMaxOpenFiles;
-        this.tempDirectory = Location.of(locationProvider.newDataLocation("trino-tmp-files"));
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
         this.columnTypes = getTopLevelColumns(outputSchema, typeManager).stream()
                 .map(IcebergColumnHandle::getType)
                 .collect(toImmutableList());

+        this.tempDirectory = sortedWritingLocalStagingPath
+                .map(path -> path.replace("${USER}", session.getIdentity().getUser()))
+                .map(IcebergPageSink::createLocalSchemeIfAbsent)
+                .orElseGet(() -> Location.of(locationProvider.newDataLocation("trino-tmp-files")));
+
         if (sortedWritingEnabled) {
             ImmutableList.Builder<Integer> sortColumnIndexes = ImmutableList.builder();
             ImmutableList.Builder<SortOrder> sortOrders = ImmutableList.builder();
@@ -591,6 +596,15 @@ private static int findFieldPosFromSchema(int fieldId, Types.StructType struct)
         throw new IllegalArgumentException("Could not find field " + fieldId + " in schema");
     }

+    private static Location createLocalSchemeIfAbsent(String path)
+    {
+        Location location = Location.of(path);
+        if (location.scheme().isPresent()) {
+            return location;
+        }
+        return Location.of("local:///" + location.path());
+    }
+
     private static class WriteContext
     {
         private final IcebergFileWriter writer;
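A minimal sketch of how the staging location added to `IcebergPageSink` above is expected to resolve, assuming the `${USER}` substitution and the `local://` fallback shown in the diff; the class name, configured path, and user are hypothetical, and the exact rendered form depends on how `Location` normalizes scheme-less paths:

```java
import io.trino.filesystem.Location;

public final class SortedStagingPathSketch
{
    private SortedStagingPathSketch() {}

    // Mirrors the page sink logic: expand ${USER}, then fall back to the
    // local:// scheme when the configured path does not carry a scheme.
    static Location resolveStagingDirectory(String configuredPath, String user)
    {
        Location location = Location.of(configuredPath.replace("${USER}", user));
        if (location.scheme().isPresent()) {
            return location;
        }
        return Location.of("local:///" + location.path());
    }

    public static void main(String[] args)
    {
        // Hypothetical configured value and session user
        Location staging = resolveStagingDirectory("/tmp/sorted-staging/${USER}", "alice");
        // Expected to carry the local scheme, for example local:///tmp/sorted-staging/alice
        System.out.println(staging);
    }
}
```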
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
index be6b8cb9dfa7..e94d1b5ba19e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.io.LocationProvider;

 import java.util.Map;
+import java.util.Optional;

 import static com.google.common.collect.Maps.transformValues;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.maxPartitionsPerWriter;
@@ -54,6 +55,7 @@ public class IcebergPageSinkProvider
     private final PageIndexerFactory pageIndexerFactory;
     private final DataSize sortingFileWriterBufferSize;
     private final int sortingFileWriterMaxOpenFiles;
+    private final Optional<String> sortingFileWriterLocalStagingPath;
     private final TypeManager typeManager;
     private final PageSorter pageSorter;
@@ -64,6 +66,7 @@ public IcebergPageSinkProvider(
             IcebergFileWriterFactory fileWriterFactory,
             PageIndexerFactory pageIndexerFactory,
             SortingFileWriterConfig sortingFileWriterConfig,
+            IcebergConfig icebergConfig,
             TypeManager typeManager,
             PageSorter pageSorter)
     {
@@ -73,6 +76,7 @@ public IcebergPageSinkProvider(
         this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
         this.sortingFileWriterBufferSize = sortingFileWriterConfig.getWriterSortBufferSize();
         this.sortingFileWriterMaxOpenFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
+        this.sortingFileWriterLocalStagingPath = icebergConfig.getSortedWritingLocalStagingPath();
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
     }
@@ -111,6 +115,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
                 tableHandle.sortOrder(),
                 sortingFileWriterBufferSize,
                 sortingFileWriterMaxOpenFiles,
+                sortingFileWriterLocalStagingPath,
                 typeManager,
                 pageSorter);
     }
@@ -142,6 +147,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
                 optimizeHandle.sortOrder(),
                 sortingFileWriterBufferSize,
                 sortingFileWriterMaxOpenFiles,
+                sortingFileWriterLocalStagingPath,
                 typeManager,
                 pageSorter);
             case OPTIMIZE_MANIFESTS:
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
index d7410a93f069..f7d9d02e94b6 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
@@ -20,6 +20,7 @@
 import io.airlift.units.Duration;
 import io.trino.plugin.hive.HiveCompressionOption;
 import jakarta.validation.constraints.AssertFalse;
+import jakarta.validation.constraints.AssertTrue;
 import org.junit.jupiter.api.Test;

 import java.util.Map;
@@ -70,6 +71,7 @@ public void testDefaults()
                 .setRegisterTableProcedureEnabled(false)
                 .setAddFilesProcedureEnabled(false)
                 .setSortedWritingEnabled(true)
+                .setSortedWritingLocalStagingPath(null)
                 .setQueryPartitionFilterRequired(false)
                 .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
                 .setSplitManagerThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors() * 2, 32)))
@@ -114,6 +116,7 @@ public void testExplicitPropertyMappings()
                 .put("iceberg.register-table-procedure.enabled", "true")
                 .put("iceberg.add-files-procedure.enabled", "true")
                 .put("iceberg.sorted-writing-enabled", "false")
+                .put("iceberg.sorted-writing.local-staging-path", "/tmp/trino")
                 .put("iceberg.query-partition-filter-required", "true")
                 .put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
                 .put("iceberg.split-manager-threads", "42")
@@ -154,6 +157,7 @@ public void testExplicitPropertyMappings()
                 .setRegisterTableProcedureEnabled(true)
                 .setAddFilesProcedureEnabled(true)
                 .setSortedWritingEnabled(false)
+                .setSortedWritingLocalStagingPath("/tmp/trino")
                 .setQueryPartitionFilterRequired(true)
                 .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
                 .setSplitManagerThreads("42")
@@ -181,5 +185,12 @@ public void testValidation()
                 "storageSchemaSetWhenHidingIsEnabled",
                 "iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false",
                 AssertFalse.class);
+
+        assertFailsValidation(
+                new IcebergConfig()
+                        .setSortedWritingLocalStagingPath("s3://bucket/path"),
+                "sortedWritingLocalStagingPathValid",
+                "iceberg.sorted-writing.local-staging-path must not use any prefix other than file:// or local://",
+                AssertTrue.class);
     }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSortedWriting.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSortedWriting.java
new file mode 100644
index 000000000000..7df259125e7c
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSortedWriting.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.Session;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.sql.TestTable;
+import io.trino.tpch.TpchTable;
+import org.apache.iceberg.FileFormat;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
+import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
+import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestIcebergSortedWriting
+        extends AbstractTestQueryFramework
+{
+    private TrinoFileSystem fileSystem;
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return IcebergQueryRunner.builder()
+                .setInitialTables(ImmutableList.of(TpchTable.LINE_ITEM))
+                .addIcebergProperty("iceberg.sorted-writing-enabled", "true")
+                // Test staging of sorted writes to local disk
+                .addIcebergProperty("iceberg.sorted-writing.local-staging-path", "/tmp/trino-${USER}")
+                // Allows testing the sorting writer flushing to the file system with smaller tables
+                .addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB")
+                .build();
+    }
+
+    @BeforeAll
+    public void initFileSystem()
+    {
+        fileSystem = getFileSystemFactory(getDistributedQueryRunner()).create(SESSION);
+    }
+
+    @Test
+    public void testSortedWritingWithLocalStaging()
+    {
+        testSortedWritingWithLocalStaging(FileFormat.ORC);
+        testSortedWritingWithLocalStaging(FileFormat.PARQUET);
+    }
+
+    private void testSortedWritingWithLocalStaging(FileFormat format)
+    {
+        // Using a larger table forces buffered data to be written to disk
+        Session withSmallRowGroups = Session.builder(getSession())
+                .setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "200")
+                .setCatalogSessionProperty("iceberg", "parquet_writer_block_size", "20kB")
+                .setCatalogSessionProperty("iceberg", "parquet_writer_batch_size", "200")
+                .build();
+        try (TestTable table = new TestTable(
+                getQueryRunner()::execute,
+                "test_sorted_lineitem_table",
+                "WITH (sorted_by = ARRAY['comment'], format = '" + format.name() + "') AS TABLE tpch.tiny.lineitem WITH NO DATA")) {
+            assertUpdate(
+                    withSmallRowGroups,
+                    "INSERT INTO " + table.getName() + " TABLE tpch.tiny.lineitem",
+                    "VALUES 60175");
+            for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
+                assertThat(isFileSorted(Location.of((String) filePath), "comment", format)).isTrue();
+            }
+            assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM lineitem");
+        }
+    }
+
+    private boolean isFileSorted(Location path, String sortColumnName, FileFormat format)
+    {
+        if (format == PARQUET) {
+            return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName);
+        }
+        return checkOrcFileSorting(fileSystem, path, sortColumnName);
+    }
+}