Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveCompressionCodec;

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -58,6 +60,7 @@ public class IcebergConfig
// putting schemas in locations with extraneous files), so default to false
// to avoid deleting those files if Trino is unable to check.
private boolean deleteSchemaLocationsFallback;
private double minimumAssignedSplitWeight = 0.05;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capture in code comment why 0.05.
Add similar in Hive.


public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -271,4 +274,19 @@ public IcebergConfig setDeleteSchemaLocationsFallback(boolean deleteSchemaLocati
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
return this;
}

// Fluent config setter bound to the "iceberg.minimum-assigned-split-weight" property;
// the value is validated (range (0, 1]) by the annotations on the corresponding getter.
@Config("iceberg.minimum-assigned-split-weight")
@ConfigDescription("Minimum weight that a split can be assigned")
public IcebergConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
{
    this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
    return this;
}

// Valid range is (0, 1]: the lower bound is exclusive because a weight of 0 would let
// an unbounded number of tiny splits be scheduled onto a node, and weights above a
// full "standard" split (1.0) are not meaningful for a minimum.
@DecimalMax("1")
@DecimalMin(value = "0", inclusive = false)
public double getMinimumAssignedSplitWeight()
{
    return minimumAssignedSplitWeight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public final class IcebergSessionProperties
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String HIVE_CATALOG_NAME = "hive_catalog_name";
private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
public static final String EXPIRE_SNAPSHOTS_MIN_RETENTION = "expire_snapshots_min_retention";
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention";

Expand Down Expand Up @@ -229,6 +230,11 @@ public IcebergSessionProperties(
// Session-level redirections configuration does not work well with views, as view body is analyzed in context
// of a session with properties stripped off. Thus, this property is more of a test-only, or at most POC usefulness.
true))
.add(doubleProperty(
MINIMUM_ASSIGNED_SPLIT_WEIGHT,
"Minimum assigned split weight",
icebergConfig.getMinimumAssignedSplitWeight(),
false))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should have a session property for this. Per my understanding, this is a safety toggle.
In particular, a user may set value to 0.0 (or close to 0.0), destabilizing the cluster.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(btw if you convince me this stays, it needs validator as in

)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should remove this.
This PR #12656 removes it from Hive.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexjo2144 we can go ahead with this PR, and I can update my PR to remove it from both.

.add(durationProperty(
EXPIRE_SNAPSHOTS_MIN_RETENTION,
"Minimal retention period for expire_snapshot procedure",
Expand Down Expand Up @@ -394,4 +400,9 @@ public static Duration getRemoveOrphanFilesMinRetention(ConnectorSession session
{
return session.getProperty(REMOVE_ORPHAN_FILES_MIN_RETENTION, Duration.class);
}

/**
 * Returns the minimum weight that may be assigned to a split for this session.
 * Defaults to the connector configuration value when not overridden per session.
 */
public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
    Double minimumWeight = session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
    return minimumWeight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.iceberg.delete.TrinoDeleteFile;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
Comment thread
alexjo2144 marked this conversation as resolved.
import io.trino.spi.connector.ConnectorSplit;
import org.openjdk.jol.info.ClassLayout;

Expand All @@ -43,6 +44,7 @@ public class IcebergSplit
private final String partitionSpecJson;
private final String partitionDataJson;
private final List<TrinoDeleteFile> deletes;
private final SplitWeight splitWeight;

@JsonCreator
public IcebergSplit(
Expand All @@ -55,7 +57,8 @@ public IcebergSplit(
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("partitionSpecJson") String partitionSpecJson,
@JsonProperty("partitionDataJson") String partitionDataJson,
@JsonProperty("deletes") List<TrinoDeleteFile> deletes)
@JsonProperty("deletes") List<TrinoDeleteFile> deletes,
@JsonProperty("splitWeight") SplitWeight splitWeight)
{
this.path = requireNonNull(path, "path is null");
this.start = start;
Expand All @@ -67,6 +70,7 @@ public IcebergSplit(
this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null");
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null"));
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
}

@Override
Expand Down Expand Up @@ -136,6 +140,13 @@ public List<TrinoDeleteFile> getDeletes()
return deletes;
}

@JsonProperty
@Override
public SplitWeight getSplitWeight()
{
return splitWeight;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SplitWeight.fromProportion(length)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

besides being simpler, it's better to avoid a new field, so that it's clear what the weight is, and that it's reasonable & consistent with other split's state

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could try SplitWeight.fromRawValue(length) but that comes with the implication that 100 is the value of a "standard size split" which isn't right for length. I think if we do that the weights will imply that regular size splits are very large and may be scheduled slower. I haven't tested it though.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding from skimming the code is that if all splits return fromRawValue(100), or fromProportion(1.0) splits will be scheduled exactly how they are now, which is what we want for full size splits.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. The internal representation is an integer value, normalized so that a single standard split weight is 100. This is done to avoid floating-point error accumulation. It is considered an implementation detail, and based on the initial PR feedback we chose to expose a way for connectors to express split weights relative to 1.0, in case someone felt the need to adjust the expression of a "standard" weight for higher granularity, e.g., normalizing the "standard" weight to 1,000.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To address the initial feedback point, you need to know the split's length in bytes relative to the configured target split size, which means that you would need to compute the value and store it as a field unless the target split size was also hard-coded.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now understand this isn't a good idea.

To address the initial feedback point, you need to know the split's length in bytes relative to the configured target split size

that's weird concept from SPI perspective, since "configured target split size" is not a SPI concept itself

@pettyjamesm if a connector uses fromProportion(d), is the relation between d and 1.0 really important?
How so?

(and where is it documented? :) )

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A split with a weight of 1.0 is "standard". If all splits are standard weight, then the scheduler will place exactly as many splits per node per task as the scheduler config specifies. If all splits have a weight of 0.5, then the scheduler is allowed to place 2x as many splits per node and task; with all splits weighing 2.0, half as many.

It's not explicitly documented, although the PrestoDB PR which merged after the Trino PR did include references to this behavior in the NodeScheduler Properties Reference. We should probably port those documentation changes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably port those documentation changes.

thanks, please do

}

@Override
public Object getInfo()
{
Expand All @@ -154,7 +165,8 @@ public long getRetainedSizeInBytes()
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes)
+ estimatedSizeOf(partitionSpecJson)
+ estimatedSizeOf(partitionDataJson)
+ estimatedSizeOf(deletes, TrinoDeleteFile::getRetainedSizeInBytes);
+ estimatedSizeOf(deletes, TrinoDeleteFile::getRetainedSizeInBytes)
+ splitWeight.getRetainedSizeInBytes();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.inject.Inject;

import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
Expand Down Expand Up @@ -79,7 +80,8 @@ public ConnectorSplitSource getSplits(
dynamicFilteringWaitTimeout,
constraint,
typeManager,
table.isRecordScannedFiles());
table.isRecordScannedFiles(),
getMinimumAssignedSplitWeight(session));

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Streams;
import com.google.common.io.Closer;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.plugin.iceberg.delete.TrinoDeleteFile;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPartitionHandle;
Expand All @@ -36,15 +37,16 @@
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.TypeManager;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.TableScanUtil;

import javax.annotation.Nullable;

Expand All @@ -53,8 +55,6 @@
import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -100,9 +100,11 @@ public class IcebergSplitSource
private final Stopwatch dynamicFilterWaitStopwatch;
private final Constraint constraint;
private final TypeManager typeManager;
private final Closer closer = Closer.create();
private final double minimumAssignedSplitWeight;

private CloseableIterable<CombinedScanTask> combinedScanIterable;
private Iterator<FileScanTask> fileScanIterator;
private CloseableIterable<FileScanTask> fileScanTaskIterable;
private CloseableIterator<FileScanTask> fileScanTaskIterator;
private TupleDomain<IcebergColumnHandle> pushedDownDynamicFilterPredicate;

private final boolean recordScannedFiles;
Expand All @@ -116,7 +118,8 @@ public IcebergSplitSource(
Duration dynamicFilteringWaitTimeout,
Constraint constraint,
TypeManager typeManager,
boolean recordScannedFiles)
boolean recordScannedFiles,
double minimumAssignedSplitWeight)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.tableScan = requireNonNull(tableScan, "tableScan is null");
Expand All @@ -128,6 +131,7 @@ public IcebergSplitSource(
this.constraint = requireNonNull(constraint, "constraint is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.recordScannedFiles = recordScannedFiles;
this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
}

@Override
Expand All @@ -140,7 +144,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
.completeOnTimeout(EMPTY_BATCH, timeLeft, MILLISECONDS);
}

if (combinedScanIterable == null) {
if (fileScanTaskIterable == null) {
// Used to avoid duplicating work if the Dynamic Filter was already pushed down to the Iceberg API
this.pushedDownDynamicFilterPredicate = dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast);
TupleDomain<IcebergColumnHandle> fullPredicate = tableHandle.getUnenforcedPredicate()
Expand All @@ -161,14 +165,14 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
}

Expression filterExpression = toIcebergExpression(effectivePredicate);
this.combinedScanIterable = tableScan
.filter(filterExpression)
.includeColumnStats()
.planTasks();
this.fileScanIterator = Streams.stream(combinedScanIterable)
.map(CombinedScanTask::files)
.flatMap(Collection::stream)
.iterator();
this.fileScanTaskIterable = TableScanUtil.splitFiles(
tableScan.filter(filterExpression)
.includeColumnStats()
.planFiles(),
tableScan.targetSplitSize());
closer.register(fileScanTaskIterable);
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
closer.register(fileScanTaskIterator);
}

TupleDomain<IcebergColumnHandle> dynamicFilterPredicate = dynamicFilter.getCurrentPredicate()
Expand All @@ -178,7 +182,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
return completedFuture(NO_MORE_SPLITS_BATCH);
}

Iterator<FileScanTask> fileScanTasks = Iterators.limit(fileScanIterator, maxSize);
Iterator<FileScanTask> fileScanTasks = Iterators.limit(fileScanTaskIterator, maxSize);
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
while (fileScanTasks.hasNext()) {
FileScanTask scanTask = fileScanTasks.next();
Expand Down Expand Up @@ -238,14 +242,14 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
// Marks the source as exhausted: releases underlying resources and swaps in empty
// iterables so isFinished() reports true and later getNextBatch() calls are no-ops.
private void finish()
{
    close();
    // NOTE(review): the rendered diff also retained assignments to the removed fields
    // combinedScanIterable/fileScanIterator; only the renamed fields exist now.
    this.fileScanTaskIterable = CloseableIterable.empty();
    this.fileScanTaskIterator = CloseableIterator.empty();
}

@Override
public boolean isFinished()
{
    // fileScanTaskIterator is initialized lazily in getNextBatch(), so a null iterator
    // means planning has not started yet and the source is not finished.
    // (The duplicate return referencing the removed fileScanIterator field was a
    // leftover pre-rename diff line and is dropped — it made the block uncompilable.)
    return fileScanTaskIterator != null && !fileScanTaskIterator.hasNext();
}

@Override
Expand All @@ -261,13 +265,11 @@ public Optional<List<Object>> getTableExecuteSplitsInfo()
@Override
public void close()
{
if (combinedScanIterable != null) {
try {
combinedScanIterable.close();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
try {
closer.close();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

Expand Down Expand Up @@ -376,7 +378,7 @@ static boolean partitionMatchesPredicate(
return true;
}

private static IcebergSplit toIcebergSplit(FileScanTask task)
private IcebergSplit toIcebergSplit(FileScanTask task)
{
return new IcebergSplit(
hadoopPath(task.file().path().toString()),
Expand All @@ -390,7 +392,8 @@ private static IcebergSplit toIcebergSplit(FileScanTask task)
PartitionData.toJson(task.file().partition()),
task.deletes().stream()
.map(TrinoDeleteFile::copyOf)
.collect(toImmutableList()));
.collect(toImmutableList()),
SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0)));
}

private static String hadoopPath(String path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void testDefaults()
.setExpireSnapshotsMinRetention(new Duration(7, DAYS))
.setRemoveOrphanFilesMinRetention(new Duration(7, DAYS))
.setDeleteSchemaLocationsFallback(false)
.setTargetMaxFileSize(DataSize.of(1, GIGABYTE)));
.setTargetMaxFileSize(DataSize.of(1, GIGABYTE))
.setMinimumAssignedSplitWeight(0.05));
}

@Test
Expand All @@ -77,6 +78,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.remove_orphan_files.min-retention", "14h")
.put("iceberg.delete-schema-locations-fallback", "true")
.put("iceberg.target-max-file-size", "1MB")
.put("iceberg.minimum-assigned-split-weight", "0.01")
.buildOrThrow();

IcebergConfig expected = new IcebergConfig()
Expand All @@ -94,7 +96,8 @@ public void testExplicitPropertyMappings()
.setExpireSnapshotsMinRetention(new Duration(13, HOURS))
.setRemoveOrphanFilesMinRetention(new Duration(14, HOURS))
.setDeleteSchemaLocationsFallback(true)
.setTargetMaxFileSize(DataSize.of(1, MEGABYTE));
.setTargetMaxFileSize(DataSize.of(1, MEGABYTE))
.setMinimumAssignedSplitWeight(0.01);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.ParquetWriterConfig;
import io.trino.spi.Page;
import io.trino.spi.SplitWeight;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
Expand Down Expand Up @@ -161,7 +162,8 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle
ImmutableList.of(),
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()),
PartitionData.toJson(new PartitionData(new Object[] {})),
ImmutableList.of());
ImmutableList.of(),
SplitWeight.standard());

TableHandle tableHandle = new TableHandle(
new CatalogName(ICEBERG_CATALOG_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public TupleDomain<ColumnHandle> getCurrentPredicate()
new Duration(2, SECONDS),
alwaysTrue(),
new TestingTypeManager(),
false);
false,
new IcebergConfig().getMinimumAssignedSplitWeight());

ImmutableList.Builder<IcebergSplit> splits = ImmutableList.builder();
while (!splitSource.isFinished()) {
Expand Down