Skip to content

Commit bacde5b

Browse files
Allow using local staging path for sorted writes in iceberg
Added iceberg.sorted-writing.local-staging-path config property Co-Authored-By: Raunaq Morarka <[email protected]>
1 parent ab38a19 commit bacde5b

File tree

6 files changed

+167
-1
lines changed

6 files changed

+167
-1
lines changed

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,15 @@ implementation is used:
217217
- Enable [sorted writing](iceberg-sorted-files) to tables with a specified sort order. Equivalent
218218
session property is `sorted_writing_enabled`.
219219
- `true`
220+
* - `iceberg.sorted-writing.local-staging-path`
221+
- A local directory that Trino can use for staging writes to sorted tables.
222+
The `${USER}` placeholder can be used to specify a different
223+
location for each user. When this property is not configured, the target
224+
storage is used for staging while writing to sorted tables, which can
225+
be inefficient when writing to object stores like S3. When
226+
`fs.hadoop.enabled` is disabled, using this feature requires setup of the
227+
[local file system](/object-storage/file-system-local).
228+
-
220229
* - `iceberg.allowed-extra-properties`
221230
- List of extra properties that are allowed to be set on Iceberg tables.
222231
Use `*` to allow all properties.

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import io.airlift.units.DataSize;
2323
import io.airlift.units.Duration;
2424
import io.airlift.units.ThreadCount;
25+
import io.trino.filesystem.Location;
2526
import io.trino.plugin.hive.HiveCompressionOption;
2627
import jakarta.validation.constraints.AssertFalse;
28+
import jakarta.validation.constraints.AssertTrue;
2729
import jakarta.validation.constraints.DecimalMax;
2830
import jakarta.validation.constraints.DecimalMin;
2931
import jakarta.validation.constraints.Max;
@@ -86,6 +88,7 @@ public class IcebergConfig
8688
private boolean hideMaterializedViewStorageTable = true;
8789
private Optional<String> materializedViewsStorageSchema = Optional.empty();
8890
private boolean sortedWritingEnabled = true;
91+
private Optional<String> sortedWritingLocalStagingPath = Optional.empty();
8992
private boolean queryPartitionFilterRequired;
9093
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
9194
private int splitManagerThreads = Math.min(Runtime.getRuntime().availableProcessors() * 2, 32);
@@ -453,6 +456,32 @@ public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled)
453456
return this;
454457
}
455458

459+
@NotNull
460+
public Optional<String> getSortedWritingLocalStagingPath()
461+
{
462+
return sortedWritingLocalStagingPath;
463+
}
464+
465+
@Config("iceberg.sorted-writing.local-staging-path")
466+
@ConfigDescription("Use provided local directory for staging writes to sorted tables. Use ${USER} placeholder to use different location for each user")
467+
public IcebergConfig setSortedWritingLocalStagingPath(String sortedWritingLocalStagingPath)
468+
{
469+
this.sortedWritingLocalStagingPath = Optional.ofNullable(sortedWritingLocalStagingPath);
470+
return this;
471+
}
472+
473+
@AssertTrue(message = "iceberg.sorted-writing.local-staging-path must not use any prefix other than file:// or local://")
474+
public boolean isSortedWritingLocalStagingPathValid()
475+
{
476+
if (sortedWritingLocalStagingPath.isEmpty()) {
477+
return true;
478+
}
479+
Optional<String> scheme = Location.of(sortedWritingLocalStagingPath.get()).scheme();
480+
return scheme.isEmpty()
481+
|| scheme.equals(Optional.of("file"))
482+
|| scheme.equals(Optional.of("local"));
483+
}
484+
456485
@Config("iceberg.query-partition-filter-required")
457486
@ConfigDescription("Require a filter on at least one partition column")
458487
public IcebergConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ public IcebergPageSink(
149149
List<TrinoSortField> sortOrder,
150150
DataSize sortingFileWriterBufferSize,
151151
int sortingFileWriterMaxOpenFiles,
152+
Optional<String> sortedWritingLocalStagingPath,
152153
TypeManager typeManager,
153154
PageSorter pageSorter)
154155
{
@@ -171,13 +172,17 @@ public IcebergPageSink(
171172
this.sortedWritingEnabled = isSortedWritingEnabled(session);
172173
this.sortingFileWriterBufferSize = requireNonNull(sortingFileWriterBufferSize, "sortingFileWriterBufferSize is null");
173174
this.sortingFileWriterMaxOpenFiles = sortingFileWriterMaxOpenFiles;
174-
this.tempDirectory = Location.of(locationProvider.newDataLocation("trino-tmp-files"));
175175
this.typeManager = requireNonNull(typeManager, "typeManager is null");
176176
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
177177
this.columnTypes = getTopLevelColumns(outputSchema, typeManager).stream()
178178
.map(IcebergColumnHandle::getType)
179179
.collect(toImmutableList());
180180

181+
this.tempDirectory = sortedWritingLocalStagingPath
182+
.map(path -> path.replace("${USER}", session.getIdentity().getUser()))
183+
.map(IcebergPageSink::createLocalSchemeIfAbsent)
184+
.orElseGet(() -> Location.of(locationProvider.newDataLocation("trino-tmp-files")));
185+
181186
if (sortedWritingEnabled) {
182187
ImmutableList.Builder<Integer> sortColumnIndexes = ImmutableList.builder();
183188
ImmutableList.Builder<SortOrder> sortOrders = ImmutableList.builder();
@@ -591,6 +596,15 @@ private static int findFieldPosFromSchema(int fieldId, Types.StructType struct)
591596
throw new IllegalArgumentException("Could not find field " + fieldId + " in schema");
592597
}
593598

599+
private static Location createLocalSchemeIfAbsent(String path)
600+
{
601+
Location location = Location.of(path);
602+
if (location.scheme().isPresent()) {
603+
return location;
604+
}
605+
return Location.of("local:///" + location.path());
606+
}
607+
594608
private static class WriteContext
595609
{
596610
private final IcebergFileWriter writer;

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.iceberg.io.LocationProvider;
4040

4141
import java.util.Map;
42+
import java.util.Optional;
4243

4344
import static com.google.common.collect.Maps.transformValues;
4445
import static io.trino.plugin.iceberg.IcebergSessionProperties.maxPartitionsPerWriter;
@@ -54,6 +55,7 @@ public class IcebergPageSinkProvider
5455
private final PageIndexerFactory pageIndexerFactory;
5556
private final DataSize sortingFileWriterBufferSize;
5657
private final int sortingFileWriterMaxOpenFiles;
58+
private final Optional<String> sortingFileWriterLocalStagingPath;
5759
private final TypeManager typeManager;
5860
private final PageSorter pageSorter;
5961

@@ -64,6 +66,7 @@ public IcebergPageSinkProvider(
6466
IcebergFileWriterFactory fileWriterFactory,
6567
PageIndexerFactory pageIndexerFactory,
6668
SortingFileWriterConfig sortingFileWriterConfig,
69+
IcebergConfig icebergConfig,
6770
TypeManager typeManager,
6871
PageSorter pageSorter)
6972
{
@@ -73,6 +76,7 @@ public IcebergPageSinkProvider(
7376
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
7477
this.sortingFileWriterBufferSize = sortingFileWriterConfig.getWriterSortBufferSize();
7578
this.sortingFileWriterMaxOpenFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
79+
this.sortingFileWriterLocalStagingPath = icebergConfig.getSortedWritingLocalStagingPath();
7680
this.typeManager = requireNonNull(typeManager, "typeManager is null");
7781
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
7882
}
@@ -111,6 +115,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
111115
tableHandle.sortOrder(),
112116
sortingFileWriterBufferSize,
113117
sortingFileWriterMaxOpenFiles,
118+
sortingFileWriterLocalStagingPath,
114119
typeManager,
115120
pageSorter);
116121
}
@@ -142,6 +147,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
142147
optimizeHandle.sortOrder(),
143148
sortingFileWriterBufferSize,
144149
sortingFileWriterMaxOpenFiles,
150+
sortingFileWriterLocalStagingPath,
145151
typeManager,
146152
pageSorter);
147153
case OPTIMIZE_MANIFESTS:

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.airlift.units.Duration;
2121
import io.trino.plugin.hive.HiveCompressionOption;
2222
import jakarta.validation.constraints.AssertFalse;
23+
import jakarta.validation.constraints.AssertTrue;
2324
import org.junit.jupiter.api.Test;
2425

2526
import java.util.Map;
@@ -70,6 +71,7 @@ public void testDefaults()
7071
.setRegisterTableProcedureEnabled(false)
7172
.setAddFilesProcedureEnabled(false)
7273
.setSortedWritingEnabled(true)
74+
.setSortedWritingLocalStagingPath(null)
7375
.setQueryPartitionFilterRequired(false)
7476
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
7577
.setSplitManagerThreads(Integer.toString(Math.min(Runtime.getRuntime().availableProcessors() * 2, 32)))
@@ -114,6 +116,7 @@ public void testExplicitPropertyMappings()
114116
.put("iceberg.register-table-procedure.enabled", "true")
115117
.put("iceberg.add-files-procedure.enabled", "true")
116118
.put("iceberg.sorted-writing-enabled", "false")
119+
.put("iceberg.sorted-writing.local-staging-path", "/tmp/trino")
117120
.put("iceberg.query-partition-filter-required", "true")
118121
.put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
119122
.put("iceberg.split-manager-threads", "42")
@@ -154,6 +157,7 @@ public void testExplicitPropertyMappings()
154157
.setRegisterTableProcedureEnabled(true)
155158
.setAddFilesProcedureEnabled(true)
156159
.setSortedWritingEnabled(false)
160+
.setSortedWritingLocalStagingPath("/tmp/trino")
157161
.setQueryPartitionFilterRequired(true)
158162
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
159163
.setSplitManagerThreads("42")
@@ -181,5 +185,12 @@ public void testValidation()
181185
"storageSchemaSetWhenHidingIsEnabled",
182186
"iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false",
183187
AssertFalse.class);
188+
189+
assertFailsValidation(
190+
new IcebergConfig()
191+
.setSortedWritingLocalStagingPath("s3://bucket/path"),
192+
"sortedWritingLocalStagingPathValid",
193+
"iceberg.sorted-writing.local-staging-path must not use any prefix other than file:// or local://",
194+
AssertTrue.class);
184195
}
185196
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import io.trino.Session;
18+
import io.trino.filesystem.Location;
19+
import io.trino.filesystem.TrinoFileSystem;
20+
import io.trino.testing.AbstractTestQueryFramework;
21+
import io.trino.testing.QueryRunner;
22+
import io.trino.testing.sql.TestTable;
23+
import io.trino.tpch.TpchTable;
24+
import org.apache.iceberg.FileFormat;
25+
import org.junit.jupiter.api.BeforeAll;
26+
import org.junit.jupiter.api.Test;
27+
28+
import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
29+
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
30+
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
31+
import static io.trino.testing.TestingConnectorSession.SESSION;
32+
import static org.apache.iceberg.FileFormat.PARQUET;
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
35+
public class TestIcebergSortedWriting
36+
extends AbstractTestQueryFramework
37+
{
38+
private TrinoFileSystem fileSystem;
39+
40+
@Override
41+
protected QueryRunner createQueryRunner()
42+
throws Exception
43+
{
44+
return IcebergQueryRunner.builder()
45+
.setInitialTables(ImmutableList.of(TpchTable.LINE_ITEM))
46+
.addIcebergProperty("iceberg.sorted-writing-enabled", "true")
47+
// Test staging of sorted writes to local disk
48+
.addIcebergProperty("iceberg.sorted-writing.local-staging-path", "/tmp/trino-${USER}")
49+
// Allows testing the sorting writer flushing to the file system with smaller tables
50+
.addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB")
51+
.build();
52+
}
53+
54+
@BeforeAll
55+
public void initFileSystem()
56+
{
57+
fileSystem = getFileSystemFactory(getDistributedQueryRunner()).create(SESSION);
58+
}
59+
60+
@Test
61+
public void testSortedWritingWithLocalStaging()
62+
{
63+
testSortedWritingWithLocalStaging(FileFormat.ORC);
64+
testSortedWritingWithLocalStaging(FileFormat.PARQUET);
65+
}
66+
67+
private void testSortedWritingWithLocalStaging(FileFormat format)
68+
{
69+
// Using a larger table forces buffered data to be written to disk
70+
Session withSmallRowGroups = Session.builder(getSession())
71+
.setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "200")
72+
.setCatalogSessionProperty("iceberg", "parquet_writer_block_size", "20kB")
73+
.setCatalogSessionProperty("iceberg", "parquet_writer_batch_size", "200")
74+
.build();
75+
try (TestTable table = new TestTable(
76+
getQueryRunner()::execute,
77+
"test_sorted_lineitem_table",
78+
"WITH (sorted_by = ARRAY['comment'], format = '" + format.name() + "') AS TABLE tpch.tiny.lineitem WITH NO DATA")) {
79+
assertUpdate(
80+
withSmallRowGroups,
81+
"INSERT INTO " + table.getName() + " TABLE tpch.tiny.lineitem",
82+
"VALUES 60175");
83+
for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
84+
assertThat(isFileSorted(Location.of((String) filePath), "comment", format)).isTrue();
85+
}
86+
assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM lineitem");
87+
}
88+
}
89+
90+
private boolean isFileSorted(Location path, String sortColumnName, FileFormat format)
91+
{
92+
if (format == PARQUET) {
93+
return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName);
94+
}
95+
return checkOrcFileSorting(fileSystem, path, sortColumnName);
96+
}
97+
}

0 commit comments

Comments
 (0)