Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ implementation is used:
- Enable [sorted writing](iceberg-sorted-files) to tables with a specified sort order. Equivalent
session property is `sorted_writing_enabled`.
- `true`
* - `iceberg.sorted-writing.local-staging-path`
- A local directory that Trino can use for staging writes to sorted tables.
The `${USER}` placeholder can be used to use a different
location for each user. When this property is not configured, the target
storage will be used for staging while writing to sorted tables which can
be inefficient when writing to object stores like S3. When
`fs.hadoop.enabled` is not enabled, using this feature requires setup of
[local file system](/object-storage/file-system-local)
-
* - `iceberg.allowed-extra-properties`
- List of extra properties that are allowed to be set on Iceberg tables.
Use `*` to allow all properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.ThreadCount;
import io.trino.filesystem.Location;
import io.trino.plugin.hive.HiveCompressionOption;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.Max;
Expand Down Expand Up @@ -86,6 +88,7 @@ public class IcebergConfig
private boolean hideMaterializedViewStorageTable = true;
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private Optional<String> sortedWritingLocalStagingPath = Optional.empty();
private boolean queryPartitionFilterRequired;
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
private int splitManagerThreads = Math.min(Runtime.getRuntime().availableProcessors() * 2, 32);
Expand Down Expand Up @@ -453,6 +456,32 @@ public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled)
return this;
}

@NotNull
public Optional<String> getSortedWritingLocalStagingPath()
{
return sortedWritingLocalStagingPath;
}

@Config("iceberg.sorted-writing.local-staging-path")
@ConfigDescription("Use provided local directory for staging writes to sorted tables. Use ${USER} placeholder to use different location for each user")
public IcebergConfig setSortedWritingLocalStagingPath(String sortedWritingLocalStagingPath)
{
this.sortedWritingLocalStagingPath = Optional.ofNullable(sortedWritingLocalStagingPath);
return this;
}

@AssertTrue(message = "iceberg.sorted-writing.local-staging-path must not use any prefix other than file:// or local://")
public boolean isSortedWritingLocalStagingPathValid()
{
if (sortedWritingLocalStagingPath.isEmpty()) {
return true;
}
Optional<String> scheme = Location.of(sortedWritingLocalStagingPath.get()).scheme();
return scheme.isEmpty()
|| scheme.equals(Optional.of("file"))
|| scheme.equals(Optional.of("local"));
}

@Config("iceberg.query-partition-filter-required")
@ConfigDescription("Require a filter on at least one partition column")
public IcebergConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public IcebergPageSink(
List<TrinoSortField> sortOrder,
DataSize sortingFileWriterBufferSize,
int sortingFileWriterMaxOpenFiles,
Optional<String> sortedWritingLocalStagingPath,
TypeManager typeManager,
PageSorter pageSorter)
{
Expand All @@ -171,13 +172,17 @@ public IcebergPageSink(
this.sortedWritingEnabled = isSortedWritingEnabled(session);
this.sortingFileWriterBufferSize = requireNonNull(sortingFileWriterBufferSize, "sortingFileWriterBufferSize is null");
this.sortingFileWriterMaxOpenFiles = sortingFileWriterMaxOpenFiles;
this.tempDirectory = Location.of(locationProvider.newDataLocation("trino-tmp-files"));
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.columnTypes = getTopLevelColumns(outputSchema, typeManager).stream()
.map(IcebergColumnHandle::getType)
.collect(toImmutableList());

this.tempDirectory = sortedWritingLocalStagingPath
.map(path -> path.replace("${USER}", session.getIdentity().getUser()))
.map(IcebergPageSink::createLocalSchemeIfAbsent)
.orElseGet(() -> Location.of(locationProvider.newDataLocation("trino-tmp-files")));

if (sortedWritingEnabled) {
ImmutableList.Builder<Integer> sortColumnIndexes = ImmutableList.builder();
ImmutableList.Builder<SortOrder> sortOrders = ImmutableList.builder();
Expand Down Expand Up @@ -591,6 +596,15 @@ private static int findFieldPosFromSchema(int fieldId, Types.StructType struct)
throw new IllegalArgumentException("Could not find field " + fieldId + " in schema");
}

private static Location createLocalSchemeIfAbsent(String path)
{
Location location = Location.of(path);
if (location.scheme().isPresent()) {
return location;
}
return Location.of("local:///" + location.path());
}

private static class WriteContext
{
private final IcebergFileWriter writer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iceberg.io.LocationProvider;

import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.Maps.transformValues;
import static io.trino.plugin.iceberg.IcebergSessionProperties.maxPartitionsPerWriter;
Expand All @@ -54,6 +55,7 @@ public class IcebergPageSinkProvider
private final PageIndexerFactory pageIndexerFactory;
private final DataSize sortingFileWriterBufferSize;
private final int sortingFileWriterMaxOpenFiles;
private final Optional<String> sortingFileWriterLocalStagingPath;
private final TypeManager typeManager;
private final PageSorter pageSorter;

Expand All @@ -64,6 +66,7 @@ public IcebergPageSinkProvider(
IcebergFileWriterFactory fileWriterFactory,
PageIndexerFactory pageIndexerFactory,
SortingFileWriterConfig sortingFileWriterConfig,
IcebergConfig icebergConfig,
TypeManager typeManager,
PageSorter pageSorter)
{
Expand All @@ -73,6 +76,7 @@ public IcebergPageSinkProvider(
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.sortingFileWriterBufferSize = sortingFileWriterConfig.getWriterSortBufferSize();
this.sortingFileWriterMaxOpenFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
this.sortingFileWriterLocalStagingPath = icebergConfig.getSortedWritingLocalStagingPath();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
}
Expand Down Expand Up @@ -111,6 +115,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
tableHandle.sortOrder(),
sortingFileWriterBufferSize,
sortingFileWriterMaxOpenFiles,
sortingFileWriterLocalStagingPath,
typeManager,
pageSorter);
}
Expand Down Expand Up @@ -142,6 +147,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
optimizeHandle.sortOrder(),
sortingFileWriterBufferSize,
sortingFileWriterMaxOpenFiles,
sortingFileWriterLocalStagingPath,
typeManager,
pageSorter);
case OPTIMIZE_MANIFESTS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveCompressionOption;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.AssertTrue;
import org.junit.jupiter.api.Test;

import java.util.Map;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void testDefaults()
.setRegisterTableProcedureEnabled(false)
.setAddFilesProcedureEnabled(false)
.setSortedWritingEnabled(true)
.setSortedWritingLocalStagingPath(null)
.setQueryPartitionFilterRequired(false)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
.setSplitManagerThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors() * 2, 32)))
Expand Down Expand Up @@ -114,6 +116,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.add-files-procedure.enabled", "true")
.put("iceberg.sorted-writing-enabled", "false")
.put("iceberg.sorted-writing.local-staging-path", "/tmp/trino")
.put("iceberg.query-partition-filter-required", "true")
.put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
.put("iceberg.split-manager-threads", "42")
Expand Down Expand Up @@ -154,6 +157,7 @@ public void testExplicitPropertyMappings()
.setRegisterTableProcedureEnabled(true)
.setAddFilesProcedureEnabled(true)
.setSortedWritingEnabled(false)
.setSortedWritingLocalStagingPath("/tmp/trino")
.setQueryPartitionFilterRequired(true)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
.setSplitManagerThreads("42")
Expand Down Expand Up @@ -181,5 +185,12 @@ public void testValidation()
"storageSchemaSetWhenHidingIsEnabled",
"iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false",
AssertFalse.class);

assertFailsValidation(
new IcebergConfig()
.setSortedWritingLocalStagingPath("s3://bucket/path"),
"sortedWritingLocalStagingPathValid",
"iceberg.sorted-writing.local-staging-path must not use any prefix other than file:// or local://",
AssertTrue.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.Session;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import io.trino.tpch.TpchTable;
import org.apache.iceberg.FileFormat;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static org.apache.iceberg.FileFormat.PARQUET;
import static org.assertj.core.api.Assertions.assertThat;

public class TestIcebergSortedWriting
extends AbstractTestQueryFramework
{
private TrinoFileSystem fileSystem;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return IcebergQueryRunner.builder()
.setInitialTables(ImmutableList.of(TpchTable.LINE_ITEM))
.addIcebergProperty("iceberg.sorted-writing-enabled", "true")
// Test staging of sorted writes to local disk
.addIcebergProperty("iceberg.sorted-writing.local-staging-path", "/tmp/trino-${USER}")
// Allows testing the sorting writer flushing to the file system with smaller tables
.addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB")
.build();
}

@BeforeAll
public void initFileSystem()
{
fileSystem = getFileSystemFactory(getDistributedQueryRunner()).create(SESSION);
}

@Test
public void testSortedWritingWithLocalStaging()
{
testSortedWritingWithLocalStaging(FileFormat.ORC);
testSortedWritingWithLocalStaging(FileFormat.PARQUET);
}

private void testSortedWritingWithLocalStaging(FileFormat format)
{
// Using a larger table forces buffered data to be written to disk
Session withSmallRowGroups = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "200")
.setCatalogSessionProperty("iceberg", "parquet_writer_block_size", "20kB")
.setCatalogSessionProperty("iceberg", "parquet_writer_batch_size", "200")
.build();
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_sorted_lineitem_table",
"WITH (sorted_by = ARRAY['comment'], format = '" + format.name() + "') AS TABLE tpch.tiny.lineitem WITH NO DATA")) {
assertUpdate(
withSmallRowGroups,
"INSERT INTO " + table.getName() + " TABLE tpch.tiny.lineitem",
"VALUES 60175");
for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
assertThat(isFileSorted(Location.of((String) filePath), "comment", format)).isTrue();
}
assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM lineitem");
}
}

private boolean isFileSorted(Location path, String sortColumnName, FileFormat format)
{
if (format == PARQUET) {
return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName);
}
return checkOrcFileSorting(fileSystem, path, sortColumnName);
}
}
Loading