42 changes: 42 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
@@ -403,6 +403,7 @@ TABLE </sql/create-table>` syntax. Optionally specify the
WITH (
format = 'PARQUET',
partitioning = ARRAY['c1', 'c2'],
sorted_by = ARRAY['c3'],
location = 's3://my-bucket/a/path/'
);

@@ -571,6 +572,7 @@ The following table properties can be updated after a table is created:
* ``format``
* ``format_version``
* ``partitioning``
* ``sorted_by``

For example, to update a table from v1 of the Iceberg specification to v2:

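A minimal sketch of such a statement, assuming a table named
``example_table``::

    ALTER TABLE example_table SET PROPERTIES format_version = 2;

Since ``sorted_by`` is part of this list, an existing table's sort order can
be changed the same way, for example with ``SET PROPERTIES sorted_by = ARRAY['c3']``.
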
@@ -793,6 +795,46 @@ In this example, the table is partitioned by the month of ``order_date``, a hash
country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country'])

Sorted tables
-------------

The connector supports sorted files as a performance improvement. Data is
sorted during writes within each file based on the specified array of one
or more columns.

Sorting is particularly beneficial when the sorted columns have
high cardinality and are used as a filter for selective reads.

The sort order is configured with the ``sorted_by`` table property.
Specify an array of one or more columns to use for sorting
when creating the table. The following example configures sorting on the
``order_date`` column of the ``orders`` table in the ``customers``
schema in the ``example`` catalog::

CREATE TABLE example.customers.orders (
order_id BIGINT,
order_date DATE,
account_number BIGINT,
customer VARCHAR,
country VARCHAR)
WITH (sorted_by = ARRAY['order_date'])

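With this layout, a highly selective query on ``order_date`` can skip entire
files whose minimum and maximum values do not match the predicate. A
hypothetical example::

    SELECT *
    FROM example.customers.orders
    WHERE order_date = DATE '2022-06-01';
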
Sorting can be combined with partitioning on the same column. For example::

CREATE TABLE example.customers.orders (
order_id BIGINT,
order_date DATE,
account_number BIGINT,
customer VARCHAR,
country VARCHAR)
WITH (
partitioning = ARRAY['month(order_date)'],
sorted_by = ARRAY['order_date']
)

You can disable sorted writing by setting the session property
``sorted_writing_enabled`` to ``false``.
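
For example, assuming the ``orders`` table above lives in the ``example``
catalog, sorted writing can be turned off for the current session with::

    SET SESSION example.sorted_writing_enabled = false;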

Rolling back to a previous snapshot
-----------------------------------

@@ -31,7 +31,6 @@
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@@ -82,7 +81,6 @@ public class HiveConfig
private Integer maxSplitsPerSecond;
private DataSize maxInitialSplitSize;
private int domainCompactionThreshold = 100;
private DataSize writerSortBufferSize = DataSize.of(64, MEGABYTE);
private boolean forceLocalScheduling;
private boolean recursiveDirWalkerEnabled;
private boolean ignoreAbsentPartitions;
@@ -105,7 +103,6 @@ public class HiveConfig
// to avoid deleting those files if Trino is unable to check.
private boolean deleteSchemaLocationsFallback;
private int maxPartitionsPerWriter = 100;
private int maxOpenSortFiles = 50;
private int writeValidationThreads = 16;
private boolean validateBucketing = true;
private boolean parallelPartitionedBucketedWrites = true;
@@ -274,20 +271,6 @@ public HiveConfig setTargetMaxFileSize(DataSize targetMaxFileSize)
return this;
}

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
{
return writerSortBufferSize;
}

@Config("hive.writer-sort-buffer-size")
public HiveConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
{
this.writerSortBufferSize = writerSortBufferSize;
return this;
}

public boolean isForceLocalScheduling()
{
return forceLocalScheduling;
@@ -609,21 +592,6 @@ public HiveConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("hive.max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public HiveConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}

public int getWriteValidationThreads()
{
return writeValidationThreads;
@@ -76,6 +76,7 @@ public void configure(Binder binder)
{
configBinder(binder).bindConfig(HiveConfig.class);
configBinder(binder).bindConfig(HiveMetastoreConfig.class);
configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");

binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
@@ -89,6 +89,7 @@ public HivePageSinkProvider(
PageIndexerFactory pageIndexerFactory,
TypeManager typeManager,
HiveConfig config,
SortingFileWriterConfig sortingFileWriterConfig,
LocationService locationService,
JsonCodec<PartitionUpdate> partitionUpdateCodec,
NodeManager nodeManager,
@@ -104,8 +105,8 @@ public HivePageSinkProvider(
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.maxOpenPartitions = config.getMaxPartitionsPerWriter();
this.maxOpenSortFiles = config.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(config.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.maxOpenSortFiles = sortingFileWriterConfig.getMaxOpenSortFiles();
this.writerSortBufferSize = requireNonNull(sortingFileWriterConfig.getWriterSortBufferSize(), "writerSortBufferSize is null");
this.locationService = requireNonNull(locationService, "locationService is null");
this.writeVerificationExecutor = listeningDecorator(newFixedThreadPool(config.getWriteValidationThreads(), daemonThreadsNamed("hive-write-validation-%s")));
this.partitionUpdateCodec = requireNonNull(partitionUpdateCodec, "partitionUpdateCodec is null");
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.MinDataSize;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class SortingFileWriterConfig
{
private DataSize writerSortBufferSize = DataSize.of(64, MEGABYTE);
private int maxOpenSortFiles = 50;

@MinDataSize("1MB")
@MaxDataSize("1GB")
public DataSize getWriterSortBufferSize()
{
return writerSortBufferSize;
}

@Config("writer-sort-buffer-size")
public SortingFileWriterConfig setWriterSortBufferSize(DataSize writerSortBufferSize)
{
this.writerSortBufferSize = writerSortBufferSize;
return this;
}

@Min(2)
@Max(1000)
public int getMaxOpenSortFiles()
{
return maxOpenSortFiles;
}

@Config("max-open-sort-files")
@ConfigDescription("Maximum number of writer temporary files to read in one pass")
public SortingFileWriterConfig setMaxOpenSortFiles(int maxOpenSortFiles)
{
this.maxOpenSortFiles = maxOpenSortFiles;
return this;
}
}
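
A sketch of how this class is consumed, mirroring the binding added above in
the Hive module: binding it with the "hive" prefix keeps the externally
visible property names identical to the ones HiveConfig exposed before the
extraction (ExampleModule is hypothetical):

import com.google.inject.Binder;
import com.google.inject.Module;

import io.trino.plugin.hive.SortingFileWriterConfig;

import static io.airlift.configuration.ConfigBinder.configBinder;

public class ExampleModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // With the "hive" prefix, @Config("writer-sort-buffer-size") is read from
        // "hive.writer-sort-buffer-size" and @Config("max-open-sort-files") from
        // "hive.max-open-sort-files", so existing catalog properties keep working.
        configBinder(binder).bindConfig(SortingFileWriterConfig.class, "hive");
    }
}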
@@ -910,6 +910,7 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Connect
new GroupByHashPageIndexerFactory(JOIN_COMPILER, BLOCK_TYPE_OPERATORS),
TESTING_TYPE_MANAGER,
getHiveConfig(),
getSortingFileWriterConfig(),
locationService,
partitionUpdateCodec,
new TestingNodeManager("fake-environment"),
@@ -934,8 +935,13 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Connect
protected HiveConfig getHiveConfig()
{
return new HiveConfig()
.setTemporaryStagingDirectoryPath(temporaryStagingDirectory.toAbsolutePath().toString());
}

protected SortingFileWriterConfig getSortingFileWriterConfig()
{
return new SortingFileWriterConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(DataSize.of(100, KILOBYTE));
}

@@ -2764,7 +2770,7 @@ private void doTestBucketSortedTables(SchemaTableName table)

assertThat(listAllDataFiles(context, stagingPathRoot))
.filteredOn(file -> file.contains(".tmp-sort."))
.size().isGreaterThan(bucketCount * getHiveConfig().getMaxOpenSortFiles() * 2);
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);

// finish the write
Collection<Slice> fragments = getFutureValue(sink.finish());
@@ -249,6 +249,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec
new GroupByHashPageIndexerFactory(new JoinCompiler(typeOperators), blockTypeOperators),
TESTING_TYPE_MANAGER,
config,
new SortingFileWriterConfig(),
locationService,
partitionUpdateCodec,
new TestingNodeManager("fake-environment"),
@@ -53,7 +53,6 @@ public void testDefaults()
.setMaxSplitsPerSecond(null)
.setDomainCompactionThreshold(100)
.setTargetMaxFileSize(DataSize.of(1, Unit.GIGABYTE))
.setWriterSortBufferSize(DataSize.of(64, Unit.MEGABYTE))
.setForceLocalScheduling(false)
.setMaxConcurrentFileSystemOperations(20)
.setMaxConcurrentMetastoreDrops(20)
@@ -71,7 +70,6 @@ public void testDefaults()
.setSortedWritingEnabled(true)
.setPropagateTableScanSortingProperties(false)
.setMaxPartitionsPerWriter(100)
.setMaxOpenSortFiles(50)
.setWriteValidationThreads(16)
.setValidateBucketing(true)
.setParallelPartitionedBucketedWrites(true)
@@ -141,7 +139,6 @@ public void testExplicitPropertyMappings()
.put("hive.max-splits-per-second", "1")
.put("hive.domain-compaction-threshold", "42")
.put("hive.target-max-file-size", "72MB")
.put("hive.writer-sort-buffer-size", "13MB")
.put("hive.recursive-directories", "true")
.put("hive.ignore-absent-partitions", "true")
.put("hive.storage-format", "SEQUENCEFILE")
@@ -152,7 +149,6 @@ public void testExplicitPropertyMappings()
.put("hive.create-empty-bucket-files", "true")
.put("hive.delete-schema-locations-fallback", "true")
.put("hive.max-partitions-per-writers", "222")
.put("hive.max-open-sort-files", "333")
.put("hive.write-validation-threads", "11")
.put("hive.validate-bucketing", "false")
.put("hive.parallel-partitioned-bucketed-writes", "false")
@@ -226,7 +222,6 @@ public void testExplicitPropertyMappings()
.setMaxSplitsPerSecond(1)
.setDomainCompactionThreshold(42)
.setTargetMaxFileSize(DataSize.of(72, Unit.MEGABYTE))
.setWriterSortBufferSize(DataSize.of(13, Unit.MEGABYTE))
.setForceLocalScheduling(true)
.setMaxConcurrentFileSystemOperations(100)
.setMaxConcurrentMetastoreDrops(100)
@@ -242,7 +237,6 @@ public void testExplicitPropertyMappings()
.setCreateEmptyBucketFiles(true)
.setDeleteSchemaLocationsFallback(true)
.setMaxPartitionsPerWriter(222)
.setMaxOpenSortFiles(333)
.setWriteValidationThreads(11)
.setValidateBucketing(false)
.setParallelPartitionedBucketedWrites(false)
@@ -108,6 +108,7 @@ public void testAllFormats()
throws Exception
{
HiveConfig config = new HiveConfig();
SortingFileWriterConfig sortingFileWriterConfig = new SortingFileWriterConfig();
File tempDir = Files.createTempDirectory(null).toFile();
try {
HiveMetastore metastore = createTestingFileHiveMetastore(new File(tempDir, "metastore"));
@@ -122,7 +123,7 @@
}
config.setHiveStorageFormat(format);
config.setHiveCompressionCodec(NONE);
long uncompressedLength = writeTestFile(config, metastore, makeFileName(tempDir, config));
long uncompressedLength = writeTestFile(config, sortingFileWriterConfig, metastore, makeFileName(tempDir, config));
assertGreaterThan(uncompressedLength, 0L);

for (HiveCompressionOption codec : HiveCompressionOption.values()) {
@@ -132,12 +133,12 @@
config.setHiveCompressionCodec(codec);

if (!isSupportedCodec(format, codec)) {
assertThatThrownBy(() -> writeTestFile(config, metastore, makeFileName(tempDir, config)))
assertThatThrownBy(() -> writeTestFile(config, sortingFileWriterConfig, metastore, makeFileName(tempDir, config)))
.hasMessage("Compression codec " + codec + " not supported for " + format);
continue;
}

long length = writeTestFile(config, metastore, makeFileName(tempDir, config));
long length = writeTestFile(config, sortingFileWriterConfig, metastore, makeFileName(tempDir, config));
assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength));
}
}
@@ -160,11 +161,11 @@ private static String makeFileName(File tempDir, HiveConfig config)
return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getHiveCompressionCodec().name();
}

private static long writeTestFile(HiveConfig config, HiveMetastore metastore, String outputPath)
private static long writeTestFile(HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, String outputPath)
{
HiveTransactionHandle transaction = new HiveTransactionHandle(false);
HiveWriterStats stats = new HiveWriterStats();
ConnectorPageSink pageSink = createPageSink(transaction, config, metastore, new Path("file:///" + outputPath), stats);
ConnectorPageSink pageSink = createPageSink(transaction, config, sortingFileWriterConfig, metastore, new Path("file:///" + outputPath), stats);
List<LineItemColumn> columns = getTestColumns();
List<Type> columnTypes = columns.stream()
.map(LineItemColumn::getType)
@@ -280,7 +281,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa
return provider.createPageSource(transaction, getHiveSession(config), split, table, ImmutableList.copyOf(getColumnHandles()), DynamicFilter.EMPTY);
}

private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveConfig config, HiveMetastore metastore, Path outputPath, HiveWriterStats stats)
private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, Path outputPath, HiveWriterStats stats)
{
LocationHandle locationHandle = new LocationHandle(outputPath, outputPath, DIRECT_TO_TARGET_NEW_DIRECTORY);
HiveOutputTableHandle handle = new HiveOutputTableHandle(
@@ -310,6 +311,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio
new GroupByHashPageIndexerFactory(new JoinCompiler(typeOperators), blockTypeOperators),
TESTING_TYPE_MANAGER,
config,
sortingFileWriterConfig,
new HiveLocationService(HDFS_ENVIRONMENT),
partitionUpdateCodec,
new TestingNodeManager("fake-environment"),