diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HdfsEnvironment.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HdfsEnvironment.java
index 2099a062adfc..9ec45c285b64 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HdfsEnvironment.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HdfsEnvironment.java
@@ -27,13 +27,17 @@
import javax.inject.Inject;
+import java.io.Externalizable;
import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
public class HdfsEnvironment
+ implements Externalizable
{
static {
HadoopNative.requireHadoopNative();
@@ -102,7 +106,29 @@ public void doAs(ConnectorIdentity identity, Runnable action)
hdfsAuthentication.doAs(identity, action);
}
+ public HdfsEnvironment()
+ {
+ this.hdfsConfiguration = null;
+ this.hdfsAuthentication = null;
+ this.newDirectoryPermissions = null;
+ this.newFileInheritOwnership = false;
+ this.verifyChecksum = false;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out)
+ throws IOException
+ {
+ }
+
+ @Override
+ public void readExternal(ObjectInput in)
+ throws IOException, ClassNotFoundException
+ {
+ }
+
public static class HdfsContext
+ implements Externalizable
{
private final ConnectorIdentity identity;
@@ -130,5 +156,22 @@ public String toString()
.add("user", identity)
.toString();
}
+
+ public HdfsContext()
+ {
+ identity = null;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out)
+ throws IOException
+ {
+ }
+
+ @Override
+ public void readExternal(ObjectInput in)
+ throws IOException, ClassNotFoundException
+ {
+ }
}
}
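
Note: the empty Externalizable implementations above make HdfsEnvironment and HdfsContext legal passengers of Java serialization without shipping any state; deserialized copies start out empty and are expected to be rebuilt on the receiving side. A minimal sketch of the pattern, with illustrative names (not part of this change):

    import java.io.Externalizable;
    import java.io.IOException;
    import java.io.ObjectInput;
    import java.io.ObjectOutput;

    public class StatelessHolder
            implements Externalizable
    {
        private final Object runtimeOnlyState;

        // Externalizable requires a public no-arg constructor
        public StatelessHolder()
        {
            this.runtimeOnlyState = null;
        }

        @Override
        public void writeExternal(ObjectOutput out)
                throws IOException
        {
            // intentionally empty: no state crosses the wire
        }

        @Override
        public void readExternal(ObjectInput in)
                throws IOException, ClassNotFoundException
        {
            // intentionally empty: the deserialized instance is a stateless shell
        }
    }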
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index d74ee2d3a992..bc862be5ebdb 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -144,6 +144,12 @@
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${dep.iceberg.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
index f8352e2c9bf3..8444e3556b1b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
@@ -15,6 +15,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.iceberg.FileContent;
import java.util.Optional;
@@ -27,18 +28,21 @@ public class CommitTaskData
private final long fileSizeInBytes;
private final MetricsWrapper metrics;
private final Optional<String> partitionDataJson;
+ private final FileContent content;
@JsonCreator
public CommitTaskData(
@JsonProperty("path") String path,
@JsonProperty("fileSizeInBytes") long fileSizeInBytes,
@JsonProperty("metrics") MetricsWrapper metrics,
- @JsonProperty("partitionDataJson") Optional partitionDataJson)
+ @JsonProperty("partitionDataJson") Optional partitionDataJson,
+ @JsonProperty("content") FileContent content)
{
this.path = requireNonNull(path, "path is null");
this.fileSizeInBytes = fileSizeInBytes;
this.metrics = requireNonNull(metrics, "metrics is null");
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
+ this.content = requireNonNull(content, "content is null");
checkArgument(fileSizeInBytes >= 0, "fileSizeInBytes is negative");
}
@@ -65,4 +69,10 @@ public Optional<String> getPartitionDataJson()
{
return partitionDataJson;
}
+
+ @JsonProperty
+ public FileContent getContent()
+ {
+ return content;
+ }
}
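
Note: tagging each commit task with its FileContent lets finishUpdate route fragments to either addRows or addDeletes. A hedged round-trip sketch (the path is hypothetical, and metrics stands in for a MetricsWrapper built from the writer's Iceberg Metrics):

    JsonCodec<CommitTaskData> codec = JsonCodec.jsonCodec(CommitTaskData.class);
    CommitTaskData task = new CommitTaskData(
            "hdfs://ns/warehouse/t/data/00000-0-delete.parquet", // hypothetical path
            1024,
            metrics, // placeholder MetricsWrapper
            Optional.empty(),
            FileContent.POSITION_DELETES);
    assert codec.fromJson(codec.toJsonBytes(task)).getContent() == FileContent.POSITION_DELETES;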
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
index d9d68ee247fd..c671aca2ded2 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
@@ -25,11 +25,26 @@
import java.util.Objects;
import java.util.Optional;
+import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
+import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.MetadataColumns.IS_DELETED;
+import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
public class IcebergColumnHandle
implements ColumnHandle
{
+ public static final IcebergColumnHandle ROW_POSITION_HANDLE = new IcebergColumnHandle(
+ primitiveColumnIdentity(ROW_POSITION.fieldId(), ROW_POSITION.name()),
+ BIGINT,
+ ImmutableList.of(),
+ BIGINT,
+ Optional.empty());
+
+ // use Integer.MIN_VALUE as $row_id field ID, which is currently not reserved by Iceberg
+ public static final int TRINO_ROW_ID_COLUMN_ID = Integer.MIN_VALUE;
+ public static final String TRINO_ROW_ID_COLUMN_NAME = "$row_id";
+
private final ColumnIdentity baseColumnIdentity;
private final Type baseType;
// The list of field ids to indicate the projected part of the top-level column represented by baseColumnIdentity
@@ -138,6 +153,21 @@ public boolean isBaseColumn()
return path.isEmpty();
}
+ public boolean isIcebergRowPositionMetadataColumn()
+ {
+ return id == ROW_POSITION.fieldId();
+ }
+
+ public boolean isIcebergIsDeletedMetadataColumn()
+ {
+ return id == IS_DELETED.fieldId();
+ }
+
+ public boolean isTrinoRowIdColumn()
+ {
+ return id == TRINO_ROW_ID_COLUMN_ID;
+ }
+
@Override
public int hashCode()
{
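
Note: these predicates dispatch on the field ID alone. Iceberg reserves IDs just below Integer.MAX_VALUE for its metadata columns (such as _pos and _deleted), so Integer.MIN_VALUE cannot collide with them or with user columns. For example:

    IcebergColumnHandle handle = IcebergColumnHandle.ROW_POSITION_HANDLE;
    assert handle.isIcebergRowPositionMetadataColumn(); // ROW_POSITION.fieldId() is near Integer.MAX_VALUE
    assert !handle.isTrinoRowIdColumn();                // TRINO_ROW_ID_COLUMN_ID is Integer.MIN_VALUE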
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index cbe71ee156d8..7580fb6f1378 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -19,6 +19,7 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import org.apache.iceberg.FileFormat;
+import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
@@ -29,6 +30,9 @@
public class IcebergConfig
{
+ public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
+ public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
+
private IcebergFileFormat fileFormat = ORC;
private HiveCompressionCodec compressionCodec = GZIP;
private boolean useFileSizeFromMetadata = true;
@@ -38,6 +42,7 @@ public class IcebergConfig
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
private boolean tableStatisticsEnabled = true;
private boolean projectionPushdownEnabled = true;
+ private int formatVersion = FORMAT_VERSION_SUPPORT_MIN;
public CatalogType getCatalogType()
{
@@ -167,4 +172,19 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}
+
+ @Min(FORMAT_VERSION_SUPPORT_MIN)
+ @Max(FORMAT_VERSION_SUPPORT_MAX)
+ public int getFormatVersion()
+ {
+ return formatVersion;
+ }
+
+ @Config("iceberg.format-version")
+ @ConfigDescription("Iceberg table format version to use when creating a table")
+ public IcebergConfig setFormatVersion(int formatVersion)
+ {
+ this.formatVersion = formatVersion;
+ return this;
+ }
}
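
Note: a quick usage sketch of the new property; values outside [FORMAT_VERSION_SUPPORT_MIN, FORMAT_VERSION_SUPPORT_MAX] are rejected by the @Min/@Max constraints when the configuration is bound:

    // equivalent to setting iceberg.format-version=2 in the catalog properties file
    IcebergConfig config = new IcebergConfig().setFormatVersion(2);
    assert config.getFormatVersion() == 2; // format v2 is what enables row-level deletes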
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index 435cdf7586fc..3ba08c94906f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
@@ -110,11 +111,12 @@ public IcebergFileWriter createFileWriter(
JobConf jobConf,
ConnectorSession session,
HdfsContext hdfsContext,
- FileFormat fileFormat)
+ FileFormat fileFormat,
+ FileContent fileContent)
{
switch (fileFormat) {
case PARQUET:
- return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
+ return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext, fileContent);
case ORC:
return createOrcWriter(outputPath, icebergSchema, jobConf, session);
default:
@@ -127,7 +129,8 @@ private IcebergFileWriter createParquetWriter(
Schema icebergSchema,
JobConf jobConf,
ConnectorSession session,
- HdfsContext hdfsContext)
+ HdfsContext hdfsContext,
+ FileContent fileContent)
{
List<String> fileColumnNames = icebergSchema.columns().stream()
.map(Types.NestedField::name)
@@ -162,7 +165,8 @@ private IcebergFileWriter createParquetWriter(
nodeVersion.toString(),
outputPath,
hdfsEnvironment,
- hdfsContext);
+ hdfsContext,
+ fileContent);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
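
Note: callers now declare what kind of file they are writing. A hypothetical call for a position-delete file (all arguments assumed to be in scope):

    IcebergFileWriter deleteWriter = fileWriterFactory.createFileWriter(
            outputPath,            // target path for the delete file
            positionDeleteSchema,  // schema containing the file_path and pos columns
            jobConf,
            session,
            hdfsContext,
            FileFormat.PARQUET,
            FileContent.POSITION_DELETES); // selects full metrics in the Parquet writer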
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 86ba0a78d266..3c0ef43b1e3e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -26,6 +26,7 @@
import io.trino.plugin.hive.HiveApplyProjectionUtil;
import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
import io.trino.plugin.hive.HiveWrittenPartitions;
+import io.trino.plugin.iceberg.serdes.IcebergTableWrapper;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.CatalogSchemaName;
@@ -61,13 +62,17 @@
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;
+import io.trino.spi.type.RowType;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
@@ -76,9 +81,11 @@
import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -91,6 +98,7 @@
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -99,8 +107,10 @@
import static io.trino.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.hive.util.HiveUtil.isStructuralType;
-import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
+import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_ROW_ID_COLUMN_ID;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_ROW_ID_COLUMN_NAME;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
@@ -110,7 +120,7 @@
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
-import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
+import static io.trino.plugin.iceberg.IcebergUtil.getSerializedPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.newCreateTableTransaction;
import static io.trino.plugin.iceberg.IcebergUtil.toIcebergSchema;
@@ -120,12 +130,16 @@
import static io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
public class IcebergMetadata
implements ConnectorMetadata
@@ -185,10 +199,12 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
tableName.getSchemaName(),
name.getTableName(),
name.getTableType(),
+ IcebergTableWrapper.wrap(table),
snapshotId,
TupleDomain.all(),
TupleDomain.all(),
- ImmutableSet.of());
+ ImmutableSet.of(),
+ ImmutableList.of());
}
@Override
@@ -282,7 +298,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
Iterable<TupleDomain<ColumnHandle>> discreteTupleDomain = Iterables.transform(files, fileScan -> {
// Extract partition values in the data file
- Map<Integer, Optional<String>> partitionColumnValueStrings = getPartitionKeys(fileScan);
+ Map<Integer, Optional<String>> partitionColumnValueStrings = getSerializedPartitionKeys(fileScan);
Map<ColumnHandle, NullableValue> partitionValues = partitionSourceIds.stream()
.filter(partitionColumnValueStrings::containsKey)
.collect(toImmutableMap(
@@ -536,7 +552,114 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
- return new IcebergColumnHandle(primitiveColumnIdentity(0, "$row_id"), BIGINT, ImmutableList.of(), BIGINT, Optional.empty());
+ return getUpdateRowIdColumnHandle(session, tableHandle, ImmutableList.of());
+ }
+
+ @Override
+ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
+ {
+ // prepare Iceberg metadata fields
+ List<Types.NestedField> icebergFields = new ArrayList<>();
+ List<RowType.Field> trinoFields = new ArrayList<>();
+ List<Integer> path = new ArrayList<>();
+ icebergFields.add(DELETE_FILE_PATH);
+ trinoFields.add(RowType.field(DELETE_FILE_PATH.name(), VARCHAR));
+ path.add(DELETE_FILE_PATH.fieldId());
+ icebergFields.add(DELETE_FILE_POS);
+ trinoFields.add(RowType.field(DELETE_FILE_POS.name(), BIGINT));
+ path.add(DELETE_FILE_POS.fieldId());
+
+ // for update, include all the non-update columns
+ if (!updatedColumns.isEmpty()) {
+ IcebergTableHandle table = (IcebergTableHandle) tableHandle;
+ Set<Integer> updatedFields = updatedColumns.stream()
+ .map(IcebergColumnHandle.class::cast)
+ .map(IcebergColumnHandle::getId)
+ .collect(Collectors.toSet());
+ for (Types.NestedField column : table.getTable().schema().columns()) {
+ if (!updatedFields.contains(column.fieldId())) {
+ icebergFields.add(column);
+ trinoFields.add(RowType.field(column.name(), toTrinoType(column.type(), typeManager)));
+ path.add(column.fieldId());
+ }
+ }
+ }
+
+ Types.NestedField icebergRowIdField = Types.NestedField.required(TRINO_ROW_ID_COLUMN_ID, TRINO_ROW_ID_COLUMN_NAME, Types.StructType.of(icebergFields));
+ io.trino.spi.type.Type trinoRowIdType = RowType.from(trinoFields);
+ return new IcebergColumnHandle(createColumnIdentity(icebergRowIdField), trinoRowIdType, ImmutableList.of(), trinoRowIdType, Optional.empty());
+ }
+
+ @Override
+ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
+ {
+ IcebergTableHandle table = (IcebergTableHandle) tableHandle;
+ transaction = table.getTable().newTransaction();
+ return table.withUpdateColumns(updatedColumns.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()));
+ }
+
+ @Override
+ public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
+ {
+ IcebergTableHandle table = (IcebergTableHandle) tableHandle;
+ Table icebergTable = transaction.table();
+ Optional<Snapshot> snapshot = table.getSnapshotId().map(icebergTable::snapshot);
+ FileFormat format = getFileFormat(icebergTable);
+
+ List<CommitTaskData> commitTasks = fragments.stream()
+ .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
+ .collect(toImmutableList());
+
+ Type[] partitionColumnTypes = icebergTable.spec().fields().stream()
+ .map(field -> field.transform().getResultType(
+ icebergTable.schema().findType(field.sourceId())))
+ .toArray(Type[]::new);
+
+ RowDelta rowDelta = transaction.newRowDelta();
+ snapshot.ifPresent(s -> rowDelta.validateFromSnapshot(s.snapshotId()));
+ List<String> referencedDataFiles = new ArrayList<>();
+ for (CommitTaskData task : commitTasks) {
+ switch (task.getContent()) {
+ case DATA:
+ DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
+ .withPath(task.getPath())
+ .withFormat(format)
+ .withFileSizeInBytes(task.getFileSizeInBytes())
+ .withMetrics(task.getMetrics().metrics());
+
+ if (!icebergTable.spec().fields().isEmpty()) {
+ String partitionDataJson = task.getPartitionDataJson()
+ .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+ builder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
+ }
+
+ rowDelta.addRows(builder.build());
+ continue;
+ case POSITION_DELETES:
+ FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(icebergTable.spec())
+ .withPath(task.getPath())
+ .withFormat(format)
+ .ofPositionDeletes()
+ .withFileSizeInBytes(task.getFileSizeInBytes())
+ .withMetrics(task.getMetrics().metrics());
+
+ if (!icebergTable.spec().fields().isEmpty()) {
+ String partitionDataJson = task.getPartitionDataJson()
+ .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+ deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
+ }
+
+ rowDelta.addDeletes(deleteBuilder.build());
+ referencedDataFiles.add(task.getPath());
+ continue;
+ case EQUALITY_DELETES:
+ default:
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, "Iceberg file content type " + task.getContent() + " is not supported");
+ }
+ }
+ rowDelta.validateDataFilesExist(referencedDataFiles);
+ rowDelta.commit();
+ transaction.commitTransaction();
}
@Override
@@ -622,7 +745,13 @@ public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, Conn
@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
- throw new TrinoException(NOT_SUPPORTED, "This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
+ return beginUpdate(session, tableHandle, ImmutableList.of());
+ }
+
+ @Override
+ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
+ {
+ finishUpdate(session, tableHandle, fragments);
}
@Override
@@ -723,13 +852,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
}
return Optional.of(new ConstraintApplicationResult<>(
- new IcebergTableHandle(table.getSchemaName(),
- table.getTableName(),
- table.getTableType(),
- table.getSnapshotId(),
- newUnenforcedConstraint,
- newEnforcedConstraint,
- table.getProjectedColumns()),
+ table.withPredicates(newUnenforcedConstraint, newEnforcedConstraint),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
false));
}
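
Note: finishUpdate above boils down to the standard Iceberg row-delta commit sequence. A condensed sketch (table, dataFile, deleteFile and referencedPath are assumed inputs, not values from this PR):

    Transaction transaction = table.newTransaction();
    RowDelta rowDelta = transaction.newRowDelta();
    rowDelta.addRows(dataFile);        // rewritten rows (FileContent.DATA)
    rowDelta.addDeletes(deleteFile);   // position deletes (FileContent.POSITION_DELETES)
    rowDelta.validateDataFilesExist(ImmutableList.of(referencedPath)); // fail if a referenced file was rewritten concurrently
    rowDelta.commit();                 // stage the row delta in the transaction
    transaction.commitTransaction();   // produce the new snapshot atomically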
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 6d6cd4cc903a..9b8fe93ef2e8 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -41,6 +41,7 @@
import io.trino.spi.type.VarcharType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -170,7 +171,8 @@ public CompletableFuture<Collection<Slice>> finish()
context.getPath().toString(),
context.getWriter().getWrittenBytes(),
new MetricsWrapper(context.getWriter().getMetrics()),
- context.getPartitionData().map(PartitionData::toJson));
+ context.getPartitionData().map(PartitionData::toJson),
+ FileContent.DATA);
commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
}
@@ -312,7 +314,8 @@ private WriteContext createWriter(Optional<PartitionData> partitionData)
jobConf,
session,
hdfsContext,
- fileFormat);
+ fileFormat,
+ FileContent.DATA);
return new WriteContext(writer, outputPath, partitionData);
}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
index d2915188412c..d570152f091b 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java
@@ -13,65 +13,101 @@
*/
package io.trino.plugin.iceberg;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
import io.trino.plugin.hive.ReaderProjectionsAdapter;
+import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink;
+import io.trino.plugin.iceberg.delete.TrinoDeleteFilter;
+import io.trino.plugin.iceberg.delete.TrinoRow;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
+import io.trino.spi.block.DictionaryBlock;
+import io.trino.spi.block.RowBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSource;
-import io.trino.spi.predicate.Utils;
+import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.type.Type;
+import org.apache.iceberg.io.CloseableIterable;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.StreamSupport;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
-import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.trino.spi.block.RowBlock.fromFieldBlocks;
+import static io.trino.spi.predicate.Utils.nativeValueToBlock;
+import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
public class IcebergPageSource
- implements ConnectorPageSource
+ implements UpdatablePageSource
{
- private final Block[] prefilledBlocks;
- private final int[] delegateIndexes;
+ private final Block filePathBlock;
+ private final TrinoDeleteFilter deleteFilter;
private final ConnectorPageSource delegate;
+ private final Block[] queriedColumnPrefillValues;
+ private final int[] queriedColumnFileReadChannels;
+ private final Block[] deleteColumnPrefillValues;
+ private final Type[] deleteColumnTypes;
+ private final int[] deleteColumnFileReadChannels;
+ private final Block[] nonUpdateColumnPrefillValues;
+ private final int[] nonUpdateColumnFileReadChannels;
+ private final int rowPositionChannel;
+ private final int[] allTableColumnChannels;
+ private final IcebergPositionDeletePageSink posDeleteSink;
+ private final IcebergPageSink updateRowSink;
private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
public IcebergPageSource(
- List<IcebergColumnHandle> columns,
- Map<Integer, Optional<String>> partitionKeys,
+ String filePath,
+ TrinoDeleteFilter deleteFilter,
ConnectorPageSource delegate,
- Optional<ReaderProjectionsAdapter> projectionsAdapter)
+ Block[] queriedColumnPrefillValues,
+ int[] queriedColumnFileReadChannels,
+ Block[] deleteColumnPrefillValues,
+ int[] deleteColumnFileReadChannels,
+ Type[] deleteColumnTypes,
+ Block[] nonUpdateColumnPrefillValues,
+ int[] nonUpdateColumnFileReadChannels,
+ int rowPositionChannel,
+ int[] allTableColumnChannels,
+ IcebergPositionDeletePageSink posDeleteSink,
+ IcebergPageSink updateRowSink,
+ Optional<ReaderProjectionsAdapter> projectionsAdapter,
+ boolean isDeleteOrUpdateQuery,
+ boolean isUpdateQuery)
{
- int size = requireNonNull(columns, "columns is null").size();
- requireNonNull(partitionKeys, "partitionKeys is null");
+ this.filePathBlock = nativeValueToBlock(VARCHAR, Slices.utf8Slice(requireNonNull(filePath, "filePath is null")));
+ this.deleteFilter = requireNonNull(deleteFilter, "deleteFilter is null");
+ this.deleteColumnTypes = requireNonNull(deleteColumnTypes, "deleteColumnTypes is null");
this.delegate = requireNonNull(delegate, "delegate is null");
-
- this.prefilledBlocks = new Block[size];
- this.delegateIndexes = new int[size];
- this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null");
-
- int outputIndex = 0;
- int delegateIndex = 0;
- for (IcebergColumnHandle column : columns) {
- if (partitionKeys.containsKey(column.getId())) {
- String partitionValue = partitionKeys.get(column.getId()).orElse(null);
- Type type = column.getType();
- Object prefilledValue = deserializePartitionValue(type, partitionValue, column.getName());
- prefilledBlocks[outputIndex] = Utils.nativeValueToBlock(type, prefilledValue);
- delegateIndexes[outputIndex] = -1;
- }
- else {
- delegateIndexes[outputIndex] = delegateIndex;
- delegateIndex++;
- }
- outputIndex++;
+ this.queriedColumnPrefillValues = requireNonNull(queriedColumnPrefillValues, "queriedColumnPrefillValues is null");
+ this.queriedColumnFileReadChannels = requireNonNull(queriedColumnFileReadChannels, "queriedColumnFileReadChannels is null");
+ this.deleteColumnPrefillValues = requireNonNull(deleteColumnPrefillValues, "deleteColumnPrefillValues is null");
+ this.deleteColumnFileReadChannels = requireNonNull(deleteColumnFileReadChannels, "deleteColumnFileReadChannels is null");
+ this.nonUpdateColumnPrefillValues = requireNonNull(nonUpdateColumnPrefillValues, "nonUpdateColumnPrefillValues is null");
+ this.nonUpdateColumnFileReadChannels = requireNonNull(nonUpdateColumnFileReadChannels, "nonUpdateColumnFileReadChannels is null");
+ this.allTableColumnChannels = requireNonNull(allTableColumnChannels, "allTableColumnChannels is null");
+ this.rowPositionChannel = rowPositionChannel;
+ if (isDeleteOrUpdateQuery) {
+ requireNonNull(posDeleteSink, "posDeleteSink is null");
}
+ this.posDeleteSink = posDeleteSink;
+ if (isUpdateQuery) {
+ requireNonNull(updateRowSink, "updateRowSink is null");
+ }
+ this.updateRowSink = updateRowSink;
+ this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null");
}
@Override
@@ -98,6 +134,38 @@ public boolean isFinished()
return delegate.isFinished();
}
+ private Block[] buildBlocksFromPage(Page page, Block[] prefillValues, int[] fileReadChannels)
+ {
+ int batchSize = page.getPositionCount();
+ Block[] blocks = new Block[prefillValues.length];
+ for (int i = 0; i < prefillValues.length; i++) {
+ int fileReadChannel = fileReadChannels[i];
+ if (fileReadChannel == -1) {
+ // -1: not read from the file; repeat the prefilled partition/constant value
+ blocks[i] = new RunLengthEncodedBlock(prefillValues[i], batchSize);
+ }
+ else if (fileReadChannel == -2) {
+ // -2: synthesize the $row_id block from file path, row position and non-update columns
+ blocks[i] = buildRowIdBlock(page);
+ }
+ else {
+ blocks[i] = page.getBlock(fileReadChannel);
+ }
+ }
+ return blocks;
+ }
+
+ private Block buildRowIdBlock(Page page)
+ {
+ int batchSize = page.getPositionCount();
+ Block[] blocks = new Block[nonUpdateColumnPrefillValues.length + 2];
+ blocks[0] = new RunLengthEncodedBlock(filePathBlock, batchSize);
+ blocks[1] = page.getBlock(rowPositionChannel);
+ Block[] nonUpdateColumnBlocks = buildBlocksFromPage(page, nonUpdateColumnPrefillValues, nonUpdateColumnFileReadChannels);
+ System.arraycopy(nonUpdateColumnBlocks, 0, blocks, 2, nonUpdateColumnBlocks.length);
+ return fromFieldBlocks(batchSize, Optional.empty(), blocks);
+ }
+
@Override
public Page getNextPage()
{
@@ -109,17 +177,21 @@ public Page getNextPage()
if (dataPage == null) {
return null;
}
+
int batchSize = dataPage.getPositionCount();
- Block[] blocks = new Block[prefilledBlocks.length];
- for (int i = 0; i < prefilledBlocks.length; i++) {
- if (prefilledBlocks[i] != null) {
- blocks[i] = new RunLengthEncodedBlock(prefilledBlocks[i], batchSize);
- }
- else {
- blocks[i] = dataPage.getBlock(delegateIndexes[i]);
- }
+ Block[] outputBlocks = buildBlocksFromPage(dataPage, queriedColumnPrefillValues, queriedColumnFileReadChannels);
+ Page outputPage = new Page(batchSize, outputBlocks);
+
+ // short-circuit if no deletes
+ if (deleteColumnFileReadChannels.length == 0) {
+ return outputPage;
}
- return new Page(batchSize, blocks);
+
+ Block[] deleteBlocks = buildBlocksFromPage(dataPage, deleteColumnPrefillValues, deleteColumnFileReadChannels);
+ CloseableIterable<TrinoRow> filteringRows = CloseableIterable.withNoopClose(TrinoRow.fromBlocks(deleteColumnTypes, deleteBlocks, batchSize));
+ CloseableIterable<TrinoRow> filteredRows = deleteFilter.filter(filteringRows);
+ int[] positionsToKeep = StreamSupport.stream(filteredRows.spliterator(), false).mapToInt(TrinoRow::getPosition).toArray();
+ return outputPage.getPositions(positionsToKeep, 0, positionsToKeep.length);
}
catch (RuntimeException e) {
closeWithSuppression(e);
@@ -128,6 +200,87 @@ public Page getNextPage()
}
}
+ @Override
+ public void deleteRows(Block rowIds)
+ {
+ RowBlock rows = resolveRowIdBlock(rowIds);
+ posDeleteSink.appendPage(new Page(rows.getPositionCount(), rows.getChildren().toArray(new Block[0])));
+ }
+
+ @Override
+ public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
+ {
+ int batchSize = page.getPositionCount();
+ Block rowIdRawBlock = page.getBlock(columnValueAndRowIdChannels.get(columnValueAndRowIdChannels.size() - 1));
+ RowBlock rowIdBlock = resolveRowIdBlock(rowIdRawBlock);
+ Block[] rowIdFieldBlocks = rowIdBlock.getChildren().toArray(new Block[0]);
+ Block[] posDeleteBlocks = new Block[2];
+ posDeleteBlocks[0] = rowIdFieldBlocks[0];
+ posDeleteBlocks[1] = rowIdFieldBlocks[1];
+ posDeleteSink.appendPage(new Page(batchSize, posDeleteBlocks));
+
+ Block[] allTableColumnBlocks = new Block[allTableColumnChannels.length];
+ for (int i = 0; i < allTableColumnChannels.length; i++) {
+ int allTableColumnChannel = allTableColumnChannels[i];
+ if (allTableColumnChannel > -1) {
+ allTableColumnBlocks[i] = rowIdFieldBlocks[allTableColumnChannel + 2];
+ }
+ else {
+ int updateValueChannelIndex = (-1) - allTableColumnChannel;
+ allTableColumnBlocks[i] = page.getBlock(columnValueAndRowIdChannels.get(updateValueChannelIndex));
+ }
+ }
+
+ updateRowSink.appendPage(new Page(batchSize, allTableColumnBlocks));
+ }
+
+ private RowBlock resolveRowIdBlock(Block rowIds)
+ {
+ RowBlock rows;
+ if (rowIds instanceof RowBlock) {
+ rows = (RowBlock) rowIds;
+ }
+ else if (rowIds instanceof DictionaryBlock) {
+ Block dictionary = ((DictionaryBlock) rowIds).getDictionary();
+ if (dictionary instanceof RowBlock) {
+ rows = (RowBlock) dictionary;
+ }
+ else {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unexpected rowId dictionary block type " + dictionary);
+ }
+ }
+ else {
+ throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unexpected rowId block type " + rowIds);
+ }
+ return rows;
+ }
+
+ @Override
+ public CompletableFuture<Collection<Slice>> finish()
+ {
+ try {
+ Collection<Slice> slices = new ArrayList<>();
+ if (posDeleteSink != null) {
+ slices.addAll(posDeleteSink.finish().get());
+ }
+ if (updateRowSink != null) {
+ slices.addAll(updateRowSink.finish().get());
+ }
+ return CompletableFuture.completedFuture(slices);
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void abort()
+ {
+ posDeleteSink.abort();
+ updateRowSink.abort();
+ close();
+ }
+
@Override
public void close()
{
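
Note: the page source now speaks a small channel-code protocol: -1 means a prefilled constant, -2 means the synthesized $row_id row block ([file_path, pos, non-update columns...]), and non-negative values index the file reader's output. A decoding sketch with illustrative names (not part of this change):

    static Block resolveChannel(int code, Block prefill, Page filePage, int batchSize, Block rowIdBlock)
    {
        if (code == -1) {
            return new RunLengthEncodedBlock(prefill, batchSize); // constant for the whole split
        }
        if (code == -2) {
            return rowIdBlock; // row block of [file_path, pos, non-update columns...]
        }
        return filePage.getBlock(code); // read straight from the file reader
    }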
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index 0f70beea554e..130efc05d20e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.graph.Traverser;
+import io.airlift.json.JsonCodec;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.orc.NameBasedFieldMapper;
import io.trino.orc.OrcColumn;
@@ -52,7 +53,11 @@
import io.trino.plugin.hive.parquet.ParquetPageSource;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext;
+import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink;
+import io.trino.plugin.iceberg.delete.TrinoDeleteFilter;
+import io.trino.spi.PageIndexerFactory;
import io.trino.spi.TrinoException;
+import io.trino.spi.block.Block;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
@@ -66,6 +71,7 @@
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
+import io.trino.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -73,6 +79,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructProjection;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -107,6 +121,7 @@
import static io.trino.parquet.ParquetTypeUtils.getParquetTypeByName;
import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
+import static io.trino.plugin.iceberg.IcebergColumnHandle.ROW_POSITION_HANDLE;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
@@ -123,9 +138,15 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
+import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
+import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
+import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
import static io.trino.plugin.iceberg.TypeConverter.ICEBERG_BINARY_TYPE;
import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY;
+import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.predicate.Utils.nativeValueToBlock;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.UuidType.UUID;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static java.lang.String.format;
@@ -135,6 +156,8 @@
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.iceberg.FileFormat.ORC;
+import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
import static org.joda.time.DateTimeZone.UTC;
public class IcebergPageSourceProvider
@@ -144,18 +167,37 @@ public class IcebergPageSourceProvider
private final FileFormatDataSourceStats fileFormatDataSourceStats;
private final OrcReaderOptions orcReaderOptions;
private final ParquetReaderOptions parquetReaderOptions;
+ private final TypeManager typeManager;
+ private final JsonCodec<CommitTaskData> jsonCodec;
+ private final IcebergFileWriterFactory fileWriterFactory;
+ private final PageIndexerFactory pageIndexerFactory;
+ private final FileIoProvider fileIoProvider;
+ private final int maxOpenPartitions;
@Inject
public IcebergPageSourceProvider(
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats fileFormatDataSourceStats,
OrcReaderConfig orcReaderConfig,
- ParquetReaderConfig parquetReaderConfig)
+ ParquetReaderConfig parquetReaderConfig,
+ TypeManager typeManager,
+ JsonCodec<CommitTaskData> jsonCodec,
+ IcebergFileWriterFactory fileWriterFactory,
+ PageIndexerFactory pageIndexerFactory,
+ FileIoProvider fileIoProvider,
+ IcebergConfig config)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
this.orcReaderOptions = requireNonNull(orcReaderConfig, "orcReaderConfig is null").toOrcReaderOptions();
this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
+ this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
+ this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
+ this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
+ requireNonNull(config, "config is null");
+ this.maxOpenPartitions = config.getMaxPartitionsPerWriter();
}
@Override
@@ -169,22 +211,117 @@ public ConnectorPageSource createPageSource(
{
IcebergSplit split = (IcebergSplit) connectorSplit;
IcebergTableHandle table = (IcebergTableHandle) connectorTable;
-
- List<IcebergColumnHandle> icebergColumns = columns.stream()
+ Table icebergTable = table.getTable();
+ FileScanTask task = split.getTask();
+ List<IcebergColumnHandle> updateColumns = table.getUpdateColumns();
+ List<IcebergColumnHandle> queriedColumns = columns.stream()
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList());
Map<Integer, Optional<String>> partitionKeys = split.getPartitionKeys();
+ Optional<StructLike> partition = task.spec().isUnpartitioned() ? Optional.empty() : Optional.of(task.file().partition());
+ Optional<PartitionData> partitionDataForWrite = coercePartitionData(icebergTable.spec(), task.spec(), partition);
- List<IcebergColumnHandle> regularColumns = columns.stream()
- .map(IcebergColumnHandle.class::cast)
- .filter(column -> !partitionKeys.containsKey(column.getId()))
- .collect(toImmutableList());
TupleDomain<IcebergColumnHandle> effectivePredicate = table.getUnenforcedPredicate()
.intersect(dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast))
.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);
+ // construct the columns that needs to be read by the file reader
+ List<Integer> fileReadColumnIds = new ArrayList<>();
+ List<IcebergColumnHandle> fileReadColumns = new ArrayList<>();
+ List<Boolean> isRowPositionChannel = new ArrayList<>();
+
+ // 1. non-partition queried columns
+ Block[] queriedColumnPrefillValues = new Block[queriedColumns.size()];
+ int[] queriedColumnFileReadChannels = new int[queriedColumns.size()];
+ boolean isDeleteOrUpdateQuery = false;
+ for (int idx = 0; idx < queriedColumns.size(); idx++) {
+ IcebergColumnHandle column = queriedColumns.get(idx);
+ if (column.isTrinoRowIdColumn()) {
+ // TODO: it's a bit late to fail here, but failing earlier would cause metadata delete to also fail
+ if (ORC == getFileFormat(table.getTable())) {
+ throw new TrinoException(GENERIC_USER_ERROR, "Row-level delete and update are not supported for ORC tables");
+ }
+ isDeleteOrUpdateQuery = true;
+ queriedColumnPrefillValues[idx] = null;
+ queriedColumnFileReadChannels[idx] = -2; // use -2 to indicate the $row_id column
+ }
+ else {
+ prefillPartitionValuesAndCompleteFileReadChannels(idx, column, fileReadColumnIds, fileReadColumns,
+ queriedColumnPrefillValues, queriedColumnFileReadChannels, isRowPositionChannel, partitionKeys, false);
+ }
+ }
+
+ // 2. non-partition equality delete columns
+ // keep the delete columns in the same order as the schema required by the delete filter
HdfsContext hdfsContext = new HdfsContext(session);
+ FileIO fileIO = fileIoProvider.createFileIo(hdfsContext, session.getQueryId());
+ TrinoDeleteFilter deleteFilter = new TrinoDeleteFilter(fileIO, task, icebergTable.schema());
+ List<IcebergColumnHandle> deleteColumns = getColumns(deleteFilter.requiredSchema(), typeManager);
+ Block[] deleteColumnPrefillValues = new Block[deleteColumns.size()];
+ int[] deleteColumnFileReadChannels = new int[deleteColumns.size()];
+ Type[] deleteColumnTypes = new Type[deleteColumns.size()];
+ int rowPositionChannel = -1;
+ for (int idx = 0; idx < deleteColumns.size(); idx++) {
+ IcebergColumnHandle column = deleteColumns.get(idx);
+ deleteColumnTypes[idx] = column.getType();
+ if (column.isIcebergIsDeletedMetadataColumn()) {
+ deleteColumnPrefillValues[idx] = nativeValueToBlock(BOOLEAN, false);
+ deleteColumnFileReadChannels[idx] = -1;
+ }
+ else {
+ prefillPartitionValuesAndCompleteFileReadChannels(idx, column, fileReadColumnIds, fileReadColumns,
+ deleteColumnPrefillValues, deleteColumnFileReadChannels, isRowPositionChannel, partitionKeys,
+ column.isIcebergRowPositionMetadataColumn());
+ if (column.isIcebergRowPositionMetadataColumn()) {
+ rowPositionChannel = deleteColumnFileReadChannels[idx];
+ }
+ }
+ }
+
+ // for a delete or update query, if the delete filter did not already require the row position column, add it separately
+ if (isDeleteOrUpdateQuery && rowPositionChannel == -1) {
+ rowPositionChannel = fileReadColumns.size();
+ fileReadColumns.add(ROW_POSITION_HANDLE);
+ fileReadColumnIds.add(ROW_POSITION.fieldId());
+ isRowPositionChannel.add(true);
+ }
+
+ // for update, all table columns are needed, in table-schema order
+ // the channels for update pages are [update-col1, update-col2, ..., rowId]
+ // the rowId channels are [file_path, pos, non-update-col1, non-update-col2, ...]
+ // encode an update channel index as (-1 - idx) and a non-update index as a plain idx, avoiding two separate arrays
+ List<Integer> updateColumnIds = updateColumns.stream().map(IcebergColumnHandle::getId).collect(Collectors.toList());
+ List<IcebergColumnHandle> allTableColumns = getColumns(icebergTable.schema(), typeManager);
+ int[] allTableColumnChannels = new int[allTableColumns.size()];
+ List<IcebergColumnHandle> nonUpdateColumns = new ArrayList<>();
+ boolean isUpdateQuery = !updateColumnIds.isEmpty();
+ if (isUpdateQuery) {
+ for (int idx = 0; idx < allTableColumns.size(); idx++) {
+ IcebergColumnHandle column = allTableColumns.get(idx);
+ int updateChannel = updateColumnIds.indexOf(column.getId());
+ if (updateChannel > -1) {
+ allTableColumnChannels[idx] = (-1) - updateChannel;
+ }
+ else {
+ nonUpdateColumns.add(column);
+ allTableColumnChannels[idx] = nonUpdateColumns.size() - 1;
+ }
+ }
+ }
+
+ // 3. non-partition non-update columns
+ // if there are no update columns, the non-update column list is also empty and this is a no-op;
+ // otherwise read all the non-update column values so they can be passed into the rowId block
+ Block[] nonUpdateColumnPrefillValues = new Block[nonUpdateColumns.size()];
+ int[] nonUpdateColumnFileReadChannels = new int[nonUpdateColumns.size()];
+ for (int idx = 0; idx < nonUpdateColumns.size(); idx++) {
+ prefillPartitionValuesAndCompleteFileReadChannels(idx, nonUpdateColumns.get(idx), fileReadColumnIds, fileReadColumns,
+ nonUpdateColumnPrefillValues, nonUpdateColumnFileReadChannels, isRowPositionChannel, partitionKeys, false);
+ }
+
+ // create file page source
+ String filePath = task.file().path().toString();
ReaderPageSource dataPageSource = createDataPageSource(
session,
hdfsContext,
@@ -193,16 +330,82 @@ public ConnectorPageSource createPageSource(
split.getLength(),
split.getFileSize(),
split.getFileFormat(),
- regularColumns,
- effectivePredicate);
+ fileReadColumns,
+ effectivePredicate,
+ isRowPositionChannel);
+
+ IcebergPositionDeletePageSink posDeleteSink = isDeleteOrUpdateQuery ? new IcebergPositionDeletePageSink(
+ icebergTable.spec(),
+ partitionDataForWrite,
+ icebergTable.locationProvider(),
+ fileWriterFactory,
+ hdfsEnvironment,
+ hdfsContext,
+ jsonCodec,
+ session,
+ split.getFileFormat()) : null;
+
+ IcebergPageSink updateRowSink = isUpdateQuery ? new IcebergPageSink(
+ icebergTable.schema(),
+ icebergTable.spec(),
+ icebergTable.locationProvider(),
+ fileWriterFactory,
+ pageIndexerFactory,
+ hdfsEnvironment,
+ hdfsContext,
+ allTableColumns,
+ jsonCodec,
+ session,
+ task.file().format(),
+ maxOpenPartitions) : null;
Optional<ReaderProjectionsAdapter> projectionsAdapter = dataPageSource.getReaderColumns().map(readerColumns ->
new ReaderProjectionsAdapter(
- regularColumns,
+ fileReadColumns,
readerColumns,
column -> ((IcebergColumnHandle) column).getType(),
IcebergPageSourceProvider::applyProjection));
- return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource.get(), projectionsAdapter);
+
+ return new IcebergPageSource(
+ filePath, deleteFilter, dataPageSource.get(),
+ queriedColumnPrefillValues, queriedColumnFileReadChannels,
+ deleteColumnPrefillValues, deleteColumnFileReadChannels, deleteColumnTypes,
+ nonUpdateColumnPrefillValues, nonUpdateColumnFileReadChannels, rowPositionChannel,
+ allTableColumnChannels, posDeleteSink, updateRowSink, projectionsAdapter,
+ isDeleteOrUpdateQuery, isUpdateQuery);
+ }
+
+ private void prefillPartitionValuesAndCompleteFileReadChannels(
+ int idx,
+ IcebergColumnHandle column,
+ List<Integer> fileReadColumnIds,
+ List<IcebergColumnHandle> fileReadColumns,
+ Object[] prefillValues,
+ int[] fileReadChannels,
+ List<Boolean> isRowPositionChannel,
+ Map<Integer, Optional<String>> partitionKeys,
+ boolean isRowPosition)
+ {
+ if (partitionKeys.containsKey(column.getId())) {
+ String partitionValue = partitionKeys.get(column.getId()).orElse(null);
+ Type type = column.getType();
+ Object nativeValue = deserializePartitionValue(type, partitionValue, column.getName());
+ prefillValues[idx] = nativeValueToBlock(type, nativeValue);
+ fileReadChannels[idx] = -1;
+ }
+ else {
+ prefillValues[idx] = null;
+ int fileReadChannel = fileReadColumnIds.indexOf(column.getId());
+ if (fileReadChannel > -1) {
+ fileReadChannels[idx] = fileReadChannel;
+ }
+ else {
+ fileReadChannels[idx] = fileReadColumns.size();
+ fileReadColumnIds.add(column.getId());
+ fileReadColumns.add(column);
+ isRowPositionChannel.add(isRowPosition);
+ }
+ }
}
private ReaderPageSource createDataPageSource(
@@ -214,7 +417,8 @@ private ReaderPageSource createDataPageSource(
long fileSize,
FileFormat fileFormat,
List<IcebergColumnHandle> dataColumns,
- TupleDomain<IcebergColumnHandle> predicate)
+ TupleDomain<IcebergColumnHandle> predicate,
+ List<Boolean> isRowPositionChannel)
{
if (!isUseFileSizeFromMetadata(session)) {
try {
@@ -262,12 +466,29 @@ private ReaderPageSource createDataPageSource(
parquetReaderOptions
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
predicate,
- fileFormatDataSourceStats);
+ fileFormatDataSourceStats,
+ isRowPositionChannel);
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
}
+ public static Optional<PartitionData> coercePartitionData(PartitionSpec newSpec, PartitionSpec spec, Optional<StructLike> partition)
+ {
+ // TODO: requires 0.13 StructProjection.createAllowMissing for the correct behavior
+ StructProjection projection = StructProjection.create(new Schema(spec.partitionType().fields()), new Schema(newSpec.partitionType().fields()));
+ projection.wrap(partition.orElse(null));
+ PartitionData projectedPartition = null;
+ if (!newSpec.isUnpartitioned()) {
+ Object[] partitionValues = new Object[projection.size()];
+ for (int i = 0; i < projection.size(); i++) {
+ partitionValues[i] = projection.get(i, Object.class);
+ }
+ projectedPartition = new PartitionData(partitionValues);
+ }
+ return Optional.ofNullable(projectedPartition);
+ }
+
private static ReaderPageSource createOrcPageSource(
HdfsEnvironment hdfsEnvironment,
ConnectorIdentity identity,
@@ -545,7 +766,8 @@ private static ReaderPageSource createParquetPageSource(
List<IcebergColumnHandle> regularColumns,
ParquetReaderOptions options,
TupleDomain<IcebergColumnHandle> effectivePredicate,
- FileFormatDataSourceStats fileFormatDataSourceStats)
+ FileFormatDataSourceStats fileFormatDataSourceStats,
+ List<Boolean> isRowPositionChannel)
{
AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();
@@ -567,7 +789,9 @@ private static ReaderPageSource createParquetPageSource(
// Map by name for a migrated table
boolean mapByName = parquetIdToField.isEmpty();
- Optional<ReaderColumns> columnProjections = projectColumns(regularColumns);
+ Pair<Optional<ReaderColumns>, List<Boolean>> projectResult = projectColumnsWithRowPositionChannel(regularColumns, isRowPositionChannel);
+ Optional<ReaderColumns> columnProjections = projectResult.first();
+ List<Boolean> projectedIsRowPositionChannel = projectResult.second();
List<IcebergColumnHandle> readColumns = columnProjections
.map(readerColumns -> (List<IcebergColumnHandle>) readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()))
.orElse(regularColumns);
@@ -586,13 +810,17 @@ private static ReaderPageSource createParquetPageSource(
TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);
+ long nextStart = 0;
List<BlockMetaData> blocks = new ArrayList<>();
+ ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
if (start <= firstDataPage && firstDataPage < start + length &&
predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain)) {
blocks.add(block);
+ blockStarts.add(nextStart);
}
+ nextStart += block.getRowCount();
}
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
@@ -600,7 +828,7 @@ private static ReaderPageSource createParquetPageSource(
Optional.ofNullable(fileMetaData.getCreatedBy()),
messageColumnIO,
blocks,
- Optional.empty(),
+ Optional.of(blockStarts.build()),
dataSource,
UTC,
systemMemoryContext,
@@ -628,7 +856,7 @@ private static ReaderPageSource createParquetPageSource(
}
}
- return new ReaderPageSource(new ParquetPageSource(parquetReader, trinoTypes.build(), internalFields.build()), columnProjections);
+ return new ReaderPageSource(new ParquetPageSource(parquetReader, trinoTypes.build(), projectedIsRowPositionChannel, internalFields.build()), columnProjections);
}
catch (IOException | RuntimeException e) {
try {
@@ -729,6 +957,47 @@ public static Optional<ReaderColumns> projectColumns(List<IcebergColumnHandle> c
return Optional.of(new ReaderColumns(projectedColumns.build(), outputColumnMapping.build()));
}
+ /**
+ * Creates a mapping between the input {@code columns} and base columns if required.
+ */
+ public static Pair<Optional<ReaderColumns>, List<Boolean>> projectColumnsWithRowPositionChannel(
+ List<IcebergColumnHandle> columns,
+ List<Boolean> isRowPositionChannel)
+ {
+ requireNonNull(columns, "columns is null");
+ requireNonNull(isRowPositionChannel, "isRowPositionChannel is null");
+
+ // No projection is required if all columns are base columns
+ if (columns.stream().allMatch(IcebergColumnHandle::isBaseColumn)) {
+ return Pair.of(Optional.empty(), isRowPositionChannel);
+ }
+
+ ImmutableList.Builder<IcebergColumnHandle> projectedColumns = ImmutableList.builder();
+ ImmutableList.Builder<Integer> outputColumnMapping = ImmutableList.builder();
+ ImmutableList.Builder<Boolean> projectedIsRowPositionChannel = ImmutableList.builder();
+ Map<Integer, Integer> mappedFieldIds = new HashMap<>();
+ int projectedColumnCount = 0;
+
+ for (IcebergColumnHandle column : columns) {
+ int baseColumnId = column.getBaseColumnIdentity().getId();
+ Integer mapped = mappedFieldIds.get(baseColumnId);
+
+ if (mapped == null) {
+ projectedColumns.add(column.getBaseColumn());
+ mappedFieldIds.put(baseColumnId, projectedColumnCount);
+ outputColumnMapping.add(projectedColumnCount);
+ projectedIsRowPositionChannel.add(column.isIcebergRowPositionMetadataColumn());
+ projectedColumnCount++;
+ }
+ else {
+ outputColumnMapping.add(mapped);
+ }
+ }
+
+ Optional<ReaderColumns> readerColumns = Optional.of(new ReaderColumns(projectedColumns.build(), outputColumnMapping.build()));
+ return Pair.of(readerColumns, projectedIsRowPositionChannel.build());
+ }
+
private static TupleDomain<ColumnDescriptor> getParquetTupleDomain(Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<IcebergColumnHandle> effectivePredicate)
{
if (effectivePredicate.isNone()) {
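
Note: allTableColumnChannels reuses the sign bit to encode two address spaces, as the comments above describe. A sketch of the inverse mapping consumed by updateRows (assumed convention, illustrative names):

    static boolean isUpdatedColumn(int encoded)
    {
        return encoded < 0; // negative values address the update page
    }

    static int updatePageChannel(int encoded)
    {
        return (-1) - encoded; // inverse of encoded = (-1) - updateChannel
    }

    static int rowIdFieldIndex(int encoded)
    {
        return encoded + 2; // skip the file_path and pos fields of $row_id
    }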
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index b4204c928601..6f63d3708144 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -13,12 +13,14 @@
*/
package io.trino.plugin.iceberg;
+import com.google.common.collect.ImmutableMap;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
import io.trino.spi.type.Type;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.parquet.ParquetUtil;
@@ -31,14 +33,18 @@
import java.util.concurrent.Callable;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
public class IcebergParquetFileWriter
extends ParquetFileWriter
implements IcebergFileWriter
{
+ private static final MetricsConfig FULL_METRICS_CONFIG = MetricsConfig.fromProperties(ImmutableMap.of(DEFAULT_WRITE_METRICS_MODE, "full"));
+
private final Path outputPath;
private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;
+ private final MetricsConfig metricsConfig;
public IcebergParquetFileWriter(
OutputStream outputStream,
@@ -52,7 +58,8 @@ public IcebergParquetFileWriter(
String trinoVersion,
Path outputPath,
HdfsEnvironment hdfsEnvironment,
- HdfsContext hdfsContext)
+ HdfsContext hdfsContext,
+ FileContent fileContent)
{
super(outputStream,
rollbackAction,
@@ -66,11 +73,14 @@ public IcebergParquetFileWriter(
this.outputPath = requireNonNull(outputPath, "outputPath is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
+ requireNonNull(fileContent, "fileContent is null");
+ // TODO: initialize metrics config from Iceberg table properties
+ this.metricsConfig = fileContent == FileContent.POSITION_DELETES ? FULL_METRICS_CONFIG : MetricsConfig.getDefault();
}
@Override
public Metrics getMetrics()
{
- return hdfsEnvironment.doAs(hdfsContext.getIdentity(), () -> ParquetUtil.fileMetrics(new HdfsInputFile(outputPath, hdfsEnvironment, hdfsContext), MetricsConfig.getDefault()));
+ return hdfsEnvironment.doAs(hdfsContext.getIdentity(), () -> ParquetUtil.fileMetrics(new HdfsInputFile(outputPath, hdfsEnvironment, hdfsContext), metricsConfig));
}
}
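
Note: position-delete files are written with full column metrics so that lower/upper bounds on file_path and pos survive (the default metrics mode, "truncate(16)", may truncate them), which lets readers skip delete files that cannot apply to a given data file. A sketch of the selection logic above:

    MetricsConfig metricsConfig = (fileContent == FileContent.POSITION_DELETES)
            ? MetricsConfig.fromProperties(ImmutableMap.of(TableProperties.DEFAULT_WRITE_METRICS_MODE, "full"))
            : MetricsConfig.getDefault(); // data files keep the table default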
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
index f06e3371a5df..2a4773ef6408 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
@@ -14,48 +14,54 @@
package io.trino.plugin.iceberg;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import io.trino.plugin.iceberg.serdes.IcebergFileScanTaskWrapper;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
+import static io.trino.plugin.iceberg.IcebergUtil.getSerializedPartitionKeys;
import static java.util.Objects.requireNonNull;
public class IcebergSplit
implements ConnectorSplit
{
+ private final IcebergFileScanTaskWrapper taskWrapper;
+ private final List<HostAddress> addresses;
+
+ // cached fields
+ private final FileScanTask task;
private final String path;
private final long start;
private final long length;
private final long fileSize;
private final FileFormat fileFormat;
- private final List<HostAddress> addresses;
private final Map<Integer, Optional<String>> partitionKeys;
@JsonCreator
public IcebergSplit(
- @JsonProperty("path") String path,
- @JsonProperty("start") long start,
- @JsonProperty("length") long length,
- @JsonProperty("fileSize") long fileSize,
- @JsonProperty("fileFormat") FileFormat fileFormat,
- @JsonProperty("addresses") List addresses,
- @JsonProperty("partitionKeys") Map> partitionKeys)
+ @JsonProperty("taskWrapper") IcebergFileScanTaskWrapper taskWrapper,
+ @JsonProperty("addresses") List addresses)
{
- this.path = requireNonNull(path, "path is null");
- this.start = start;
- this.length = length;
- this.fileSize = fileSize;
- this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+ this.taskWrapper = requireNonNull(taskWrapper, "taskWrapper is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
- this.partitionKeys = ImmutableMap.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
+
+ this.task = taskWrapper.getTask();
+ this.path = task.file().path().toString();
+ this.start = task.start();
+ this.length = task.length();
+ this.fileSize = task.file().fileSizeInBytes();
+ this.fileFormat = task.file().format();
+ this.partitionKeys = getSerializedPartitionKeys(task);
}
@Override
@@ -72,36 +78,48 @@ public List<HostAddress> getAddresses()
}
@JsonProperty
+ public IcebergFileScanTaskWrapper getTaskWrapper()
+ {
+ return taskWrapper;
+ }
+
+ @JsonIgnore
+ public FileScanTask getTask()
+ {
+ return task;
+ }
+
+ @JsonIgnore
public String getPath()
{
return path;
}
- @JsonProperty
+ @JsonIgnore
public long getStart()
{
return start;
}
- @JsonProperty
+ @JsonIgnore
public long getLength()
{
return length;
}
- @JsonProperty
+ @JsonIgnore
public long getFileSize()
{
return fileSize;
}
- @JsonProperty
+ @JsonIgnore
public FileFormat getFileFormat()
{
return fileFormat;
}
- @JsonProperty
+ @JsonIgnore
public Map<Integer, Optional<String>> getPartitionKeys()
{
return partitionKeys;
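
Note: after this change only taskWrapper and addresses are serialized; every @JsonIgnore getter is a view rederived from the wrapped FileScanTask in the constructor. A hedged round-trip sketch (split is assumed to be in scope):

    JsonCodec<IcebergSplit> codec = JsonCodec.jsonCodec(IcebergSplit.class);
    IcebergSplit copy = codec.fromJson(codec.toJson(split));
    assert copy.getPath().equals(split.getPath()); // rederived from the task, not serialized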
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
index 4fdd9c6b0c74..9ed062f9a972 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
@@ -44,13 +44,11 @@ public class IcebergSplitManager
{
public static final int ICEBERG_DOMAIN_COMPACTION_THRESHOLD = 1000;
- private final IcebergTransactionManager transactionManager;
private final TypeManager typeManager;
@Inject
- public IcebergSplitManager(IcebergTransactionManager transactionManager, TypeManager typeManager)
+ public IcebergSplitManager(TypeManager typeManager)
{
- this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}
@@ -69,7 +67,7 @@ public ConnectorSplitSource getSplits(
return new FixedSplitSource(ImmutableList.of());
}
- Table icebergTable = transactionManager.get(transaction).getIcebergTable(session, table.getSchemaTableName());
+ Table icebergTable = table.getTable();
Duration dynamicFilteringWaitTimeout = getDynamicFilteringWaitTimeout(session);
Set<Integer> identityPartitionFieldIds = getIdentityPartitions(icebergTable.spec()).keySet().stream()
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index a9e8b31af5f7..b70c89227519 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -19,7 +19,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Streams;
import io.airlift.units.Duration;
-import io.trino.spi.TrinoException;
+import io.trino.plugin.iceberg.serdes.IcebergFileScanTaskWrapper;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPartitionHandle;
import io.trino.spi.connector.ConnectorSplit;
@@ -59,10 +59,8 @@
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
-import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
-import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -157,10 +155,6 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
while (fileScanTasks.hasNext()) {
FileScanTask scanTask = fileScanTasks.next();
- if (!scanTask.deletes().isEmpty()) {
- throw new TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported: " + tableHandle.getSchemaTableName());
- }
-
IcebergSplit icebergSplit = toIcebergSplit(scanTask);
Supplier