diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 6fdecebbbb04..43fe71d1d8fb 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -24,7 +24,7 @@ trino-hive - + com.linkedin.calcite calcite-core @@ -176,23 +176,6 @@ - - org.apache.iceberg - iceberg-data - ${dep.iceberg.version} - - - - org.apache.parquet - parquet-avro - - - org.slf4j - slf4j-api - - - - org.apache.iceberg iceberg-hive-metastore @@ -221,6 +204,11 @@ + + org.roaringbitmap + RoaringBitmap + + org.weakref jmxutils diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java index fc277b68b4db..c443fc8328c7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java @@ -18,7 +18,7 @@ import io.airlift.slice.Slice; import io.trino.plugin.hive.ReaderProjectionsAdapter; import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink; -import io.trino.plugin.iceberg.delete.TrinoRow; +import io.trino.plugin.iceberg.delete.RowPredicate; import io.trino.spi.Page; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; @@ -27,10 +27,7 @@ import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.UpdatablePageSource; import io.trino.spi.metrics.Metrics; -import io.trino.spi.type.Type; import org.apache.iceberg.Schema; -import org.apache.iceberg.data.DeleteFilter; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; import javax.annotation.Nullable; @@ -59,11 +56,10 @@ public class IcebergPageSource implements UpdatablePageSource { private final Schema schema; - private final Type[] columnTypes; private final int[] expectedColumnIndexes; private final ConnectorPageSource delegate; private final Optional projectionsAdapter; - private final Optional> 
deleteFilter; + private final Optional deletePredicate; private final Supplier positionDeleteSinkSupplier; private final Supplier updatedRowPageSinkSupplier; // An array with one element per field in the $row_id column. The value in the array points to the @@ -85,10 +81,9 @@ public IcebergPageSource( Schema schema, List expectedColumns, List requiredColumns, - List readColumns, ConnectorPageSource delegate, Optional projectionsAdapter, - Optional> deleteFilter, + Optional deletePredicate, Supplier positionDeleteSinkSupplier, Supplier updatedRowPageSinkSupplier, List updatedColumns) @@ -120,12 +115,9 @@ public IcebergPageSource( } } - this.columnTypes = readColumns.stream() - .map(IcebergColumnHandle::getType) - .toArray(Type[]::new); this.delegate = requireNonNull(delegate, "delegate is null"); this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null"); - this.deleteFilter = requireNonNull(deleteFilter, "deleteFilter is null"); + this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null"); this.positionDeleteSinkSupplier = requireNonNull(positionDeleteSinkSupplier, "positionDeleteSinkSupplier is null"); this.updatedRowPageSinkSupplier = requireNonNull(updatedRowPageSinkSupplier, "updatedRowPageSinkSupplier is null"); requireNonNull(updatedColumns, "updatedColumnFieldIds is null"); @@ -167,20 +159,8 @@ public Page getNextPage() return null; } - if (deleteFilter.isPresent()) { - int positionCount = dataPage.getPositionCount(); - int[] positionsToKeep = new int[positionCount]; - try (CloseableIterable filteredRows = deleteFilter.get().filter(CloseableIterable.withNoopClose(TrinoRow.fromPage(columnTypes, dataPage, positionCount)))) { - int positionsToKeepCount = 0; - for (TrinoRow rowToKeep : filteredRows) { - positionsToKeep[positionsToKeepCount] = rowToKeep.getPosition(); - positionsToKeepCount++; - } - dataPage = dataPage.getPositions(positionsToKeep, 0, positionsToKeepCount); - } - catch (IOException e) { - 
throw new TrinoException(ICEBERG_BAD_DATA, "Failed to filter rows during merge-on-read operation", e); - } + if (deletePredicate.isPresent()) { + dataPage = deletePredicate.get().filterPage(dataPage); } if (projectionsAdapter.isPresent()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index da35e31021fa..dbd47267c7bd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -13,10 +13,12 @@ */ package io.trino.plugin.iceberg; +import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.graph.Traverser; import io.airlift.json.JsonCodec; +import io.airlift.slice.Slice; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.orc.OrcColumn; import io.trino.orc.OrcCorruptionException; @@ -52,10 +54,11 @@ import io.trino.plugin.hive.parquet.ParquetPageSource; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext; -import io.trino.plugin.iceberg.delete.DummyFileScanTask; +import io.trino.plugin.iceberg.delete.DeleteFile; +import io.trino.plugin.iceberg.delete.DeleteFilter; import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink; -import io.trino.plugin.iceberg.delete.TrinoDeleteFilter; -import io.trino.plugin.iceberg.delete.TrinoRow; +import io.trino.plugin.iceberg.delete.PositionDeleteFilter; +import io.trino.plugin.iceberg.delete.RowPredicate; import io.trino.spi.PageIndexerFactory; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; @@ -68,6 +71,7 @@ import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.EmptyPageSource; 
import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.ArrayType; @@ -81,17 +85,14 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockMissingException; -import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.mapping.MappedField; @@ -106,10 +107,13 @@ import org.apache.parquet.io.ColumnIO; import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.schema.MessageType; +import org.roaringbitmap.longlong.LongBitmapDataProvider; +import org.roaringbitmap.longlong.Roaring64Bitmap; import javax.inject.Inject; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -138,7 +142,6 @@ import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR; -import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA; import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH; import static io.trino.plugin.iceberg.IcebergSessionProperties.getOrcLazyReadSmallRanges; @@ -157,21 +160,30 @@ import 
static io.trino.plugin.iceberg.IcebergUtil.getColumns; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; +import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; import static io.trino.plugin.iceberg.TypeConverter.ICEBERG_BINARY_TYPE; import static io.trino.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY; +import static io.trino.plugin.iceberg.delete.EqualityDeleteFilter.readEqualityDeletes; +import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.predicate.Utils.nativeValueToBlock; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.UuidType.UUID; import static io.trino.spi.type.VarbinaryType.VARBINARY; +import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; +import static java.util.function.Predicate.not; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.apache.iceberg.FileContent.EQUALITY_DELETES; +import static org.apache.iceberg.FileContent.POSITION_DELETES; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; import static org.apache.iceberg.MetadataColumns.ROW_POSITION; import static org.joda.time.DateTimeZone.UTC; @@ -233,18 +245,13 @@ public ConnectorPageSource createPageSource( .map(IcebergColumnHandle.class::cast) .collect(toImmutableList()); - HdfsContext hdfsContext = new HdfsContext(session); - FileIO fileIO = fileIoProvider.createFileIo(hdfsContext, 
session.getQueryId()); - FileScanTask dummyFileScanTask = new DummyFileScanTask(split.getPath(), split.getDeletes()); Schema tableSchema = SchemaParser.fromJson(table.getTableSchemaJson()); - // Creating a DeleteFilter with no requestedSchema ensures `deleteFilterRequiredSchema` is only columns needed by the filter. - List deleteFilterRequiredSchema = getColumns(new TrinoDeleteFilter( - dummyFileScanTask, - tableSchema, - ImmutableList.of(), - fileIO) - .requiredSchema(), - typeManager); + + List deleteFilters = readDeletes(session, tableSchema, split.getPath(), split.getDeletes()); + + Set deleteFilterRequiredColumns = deleteFilters.stream() + .flatMap(filter -> getColumns(filter.schema(), typeManager).stream()) + .collect(toImmutableSet()); PartitionSpec partitionSpec = PartitionSpecParser.fromJson(tableSchema, split.getPartitionSpecJson()); org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream() @@ -254,9 +261,11 @@ public ConnectorPageSource createPageSource( Map> partitionKeys = getPartitionKeys(partitionData, partitionSpec); List requiredColumns = new ArrayList<>(icebergColumns); - deleteFilterRequiredSchema.stream() - .filter(column -> !icebergColumns.contains(column)) + + deleteFilterRequiredColumns.stream() + .filter(not(icebergColumns::contains)) .forEach(requiredColumns::add); + icebergColumns.stream() .filter(IcebergColumnHandle::isUpdateRowIdColumn) .findFirst().ifPresent(updateRowIdColumn -> { @@ -282,13 +291,20 @@ public ConnectorPageSource createPageSource( return new EmptyPageSource(); } + HdfsContext hdfsContext = new HdfsContext(session); + long fileSize = split.getFileSize(); + if (!isUseFileSizeFromMetadata(session)) { + fileSize = fileIoProvider.createFileIo(hdfsContext, session.getQueryId()) + .newInputFile(split.getPath()).getLength(); + } + ReaderPageSource dataPageSource = createDataPageSource( session, hdfsContext, new Path(split.getPath()), split.getStart(), split.getLength(), - split.getFileSize(), + 
fileSize, split.getFileFormat(), split.getSchemaAsJson().map(SchemaParser::fromJson), requiredColumns, @@ -306,11 +322,10 @@ public ConnectorPageSource createPageSource( List readColumns = dataPageSource.getReaderColumns() .map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toList())) .orElse(requiredColumns); - DeleteFilter deleteFilter = new TrinoDeleteFilter( - dummyFileScanTask, - tableSchema, - readColumns, - fileIO); + + Optional deletePredicate = deleteFilters.stream() + .map(filter -> filter.createPredicate(readColumns)) + .reduce(RowPredicate::and); Optional partition = partitionSpec.isUnpartitioned() ? Optional.empty() : Optional.of(partitionData); LocationProvider locationProvider = getLocationProvider(table.getSchemaTableName(), table.getTableLocation(), table.getStorageProperties()); @@ -350,16 +365,83 @@ public ConnectorPageSource createPageSource( tableSchema, icebergColumns, requiredColumns, - readColumns, dataPageSource.get(), projectionsAdapter, - Optional.of(deleteFilter).filter(filter -> filter.hasPosDeletes() || filter.hasEqDeletes()), + deletePredicate, positionDeleteSink, updatedRowPageSinkSupplier, table.getUpdatedColumns()), getClass().getClassLoader()); } + private List readDeletes(ConnectorSession session, Schema schema, String dataFilePath, List deleteFiles) + { + Slice targetPath = utf8Slice(dataFilePath); + List filters = new ArrayList<>(); + LongBitmapDataProvider deletedRows = new Roaring64Bitmap(); + + IcebergColumnHandle deleteFilePath = getColumnHandle(DELETE_FILE_PATH, typeManager); + IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager); + List deleteColumns = ImmutableList.of(deleteFilePath, deleteFilePos); + TupleDomain deleteDomain = TupleDomain.fromFixedValues(ImmutableMap.of(deleteFilePath, NullableValue.of(VARCHAR, targetPath))); + + for (DeleteFile delete : deleteFiles) { + if (delete.content() == POSITION_DELETES) { + try (ConnectorPageSource 
pageSource = openDeletes(session, delete, deleteColumns, deleteDomain)) { + readPositionDeletes(pageSource, targetPath, deletedRows); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + else if (delete.content() == EQUALITY_DELETES) { + List fieldIds = delete.equalityFieldIds(); + verify(!fieldIds.isEmpty(), "equality field IDs are missing"); + List columns = fieldIds.stream() + .map(id -> getColumnHandle(schema.findField(id), typeManager)) + .collect(toImmutableList()); + + try (ConnectorPageSource pageSource = openDeletes(session, delete, columns, TupleDomain.all())) { + filters.add(readEqualityDeletes(pageSource, columns, schema)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + else { + throw new VerifyException("Unknown delete content: " + delete.content()); + } + } + + if (!deletedRows.isEmpty()) { + filters.add(new PositionDeleteFilter(deletedRows)); + } + + return filters; + } + + private ConnectorPageSource openDeletes( + ConnectorSession session, + DeleteFile delete, + List columns, + TupleDomain tupleDomain) + { + return createDataPageSource( + session, + new HdfsContext(session), + new Path(delete.path().toString()), + 0, + delete.fileSizeInBytes(), + delete.fileSizeInBytes(), + IcebergFileFormat.fromIceberg(delete.format()), + Optional.of(schemaFromHandles(columns)), + columns, + tupleDomain, + Optional.empty(), + ImmutableMap.of()) + .get(); + } + private ReaderPageSource createDataPageSource( ConnectorSession session, HdfsContext hdfsContext, @@ -374,17 +456,6 @@ private ReaderPageSource createDataPageSource( Optional nameMapping, Map> partitionKeys) { - if (!isUseFileSizeFromMetadata(session)) { - try { - FileStatus fileStatus = hdfsEnvironment.doAs(session.getIdentity(), - () -> hdfsEnvironment.getFileSystem(hdfsContext, path).getFileStatus(path)); - fileSize = fileStatus.getLen(); - } - catch (IOException e) { - throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, e); - } - } - switch 
(fileFormat) { case ORC: return createOrcPageSource( diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index 7112a89d1474..dfcf79f25642 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -15,9 +15,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import io.trino.plugin.iceberg.delete.TrinoDeleteFile; +import io.trino.plugin.iceberg.delete.DeleteFile; import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; @@ -45,7 +46,7 @@ public class IcebergSplit private final String partitionSpecJson; private final String partitionDataJson; private final Optional schemaAsJson; - private final List deletes; + private final List deletes; private final SplitWeight splitWeight; @JsonCreator @@ -60,7 +61,7 @@ public IcebergSplit( @JsonProperty("partitionSpecJson") String partitionSpecJson, @JsonProperty("partitionDataJson") String partitionDataJson, @JsonProperty("schemaAsJson") Optional schemaAsJson, - @JsonProperty("deletes") List deletes, + @JsonProperty("deletes") List deletes, @JsonProperty("splitWeight") SplitWeight splitWeight) { this.path = requireNonNull(path, "path is null"); @@ -145,7 +146,7 @@ public String getPartitionDataJson() } @JsonProperty - public List getDeletes() + public List getDeletes() { return deletes; } @@ -175,17 +176,23 @@ public long getRetainedSizeInBytes() + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + estimatedSizeOf(partitionSpecJson) + estimatedSizeOf(partitionDataJson) - + estimatedSizeOf(deletes, 
TrinoDeleteFile::getRetainedSizeInBytes) + + estimatedSizeOf(deletes, DeleteFile::getRetainedSizeInBytes) + splitWeight.getRetainedSizeInBytes(); } @Override public String toString() { - return toStringHelper(this) + ToStringHelper helper = toStringHelper(this) .addValue(path) - .addValue(start) - .addValue(length) - .toString(); + .add("start", start) + .add("length", length) + .add("records", fileRecordCount); + if (!deletes.isEmpty()) { + helper.add("deleteFiles", deletes.size()); + helper.add("deleteRecords", deletes.stream() + .mapToLong(DeleteFile::recordCount).sum()); + } + return helper.toString(); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 621267ea91aa..5b7834d5e1cc 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -21,7 +21,7 @@ import com.google.common.io.Closer; import io.airlift.units.DataSize; import io.airlift.units.Duration; -import io.trino.plugin.iceberg.delete.TrinoDeleteFile; +import io.trino.plugin.iceberg.delete.DeleteFile; import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles; import io.trino.spi.SplitWeight; import io.trino.spi.TrinoException; @@ -396,7 +396,7 @@ private IcebergSplit toIcebergSplit(FileScanTask task) PartitionData.toJson(task.file().partition()), fileFormat != AVRO ? 
Optional.empty() : Optional.of(SchemaParser.toJson(task.spec().schema())), task.deletes().stream() - .map(TrinoDeleteFile::copyOf) + .map(DeleteFile::fromIceberg) .collect(toImmutableList()), SplitWeight.fromProportion(Math.min(Math.max((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight), 1.0))); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java new file mode 100644 index 000000000000..eea89a9df9a0 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java @@ -0,0 +1,121 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.delete; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.openjdk.jol.info.ClassLayout; + +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.SIZE_OF_INT; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static java.util.Objects.requireNonNull; + +public final class DeleteFile +{ + private static final long INSTANCE_SIZE = ClassLayout.parseClass(DeleteFile.class).instanceSize(); + + private final FileContent content; + private final String path; + private final FileFormat format; + private final long recordCount; + private final long fileSizeInBytes; + private final List equalityFieldIds; + + public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) + { + return new DeleteFile( + deleteFile.content(), + deleteFile.path().toString(), + deleteFile.format(), + deleteFile.recordCount(), + deleteFile.fileSizeInBytes(), + Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of)); + } + + @JsonCreator + public DeleteFile( + FileContent content, + String path, + FileFormat format, + long recordCount, + long fileSizeInBytes, + List equalityFieldIds) + { + this.content = requireNonNull(content, "content is null"); + this.path = requireNonNull(path, "path is null"); + this.format = requireNonNull(format, "format is null"); + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null")); + } + + @JsonProperty + public FileContent content() + { + return content; + } + + @JsonProperty + public CharSequence path() + { + return path; + } + + @JsonProperty + public 
FileFormat format() + { + return format; + } + + @JsonProperty + public long recordCount() + { + return recordCount; + } + + @JsonProperty + public long fileSizeInBytes() + { + return fileSizeInBytes; + } + + @JsonProperty + public List equalityFieldIds() + { + return equalityFieldIds; + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(path) + + estimatedSizeOf(equalityFieldIds, ignored -> SIZE_OF_INT); + } + + @Override + public String toString() + { + return toStringHelper(this) + .addValue(path) + .add("records", recordCount) + .toString(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java new file mode 100644 index 000000000000..a911c948e62c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.delete; + +import io.trino.plugin.iceberg.IcebergColumnHandle; +import org.apache.iceberg.Schema; + +import java.util.List; + +public interface DeleteFilter +{ + Schema schema(); + + RowPredicate createPredicate(List columns); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DummyFileScanTask.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DummyFileScanTask.java deleted file mode 100644 index 2ffb2a45b647..000000000000 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DummyFileScanTask.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.iceberg.delete; - -import com.google.common.collect.ImmutableList; -import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataTask; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.expressions.Expression; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import static java.util.Objects.requireNonNull; - -// TODO: This wrapper is necessary until the constructors of the Iceberg DeleteFilter are made more specific -// Remove after upgrading to Iceberg with https://github.com/apache/iceberg/pull/4381 -public class DummyFileScanTask - implements FileScanTask -{ - private final DataFile file; - private final List deletes; - - public DummyFileScanTask(String path, List deletes) - { - requireNonNull(path, "path is null"); - this.file = new DummyDataFile(path); - this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null")); - } - - @Override - public DataFile file() - { - return file; - } - - @Override - public List deletes() - { - return deletes; - } - - @Override - public PartitionSpec spec() - { - throw new UnsupportedOperationException(); - } - - @Override - public long start() - { - throw new UnsupportedOperationException(); - } - - @Override - public long length() - { - throw new UnsupportedOperationException(); - } - - @Override - public Expression residual() - { - throw new UnsupportedOperationException(); - } - - @Override - public Iterable split(long l) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isFileScanTask() - { - throw new UnsupportedOperationException(); - } - - @Override - public FileScanTask asFileScanTask() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isDataTask() - { - 
throw new UnsupportedOperationException(); - } - - @Override - public DataTask asDataTask() - { - throw new UnsupportedOperationException(); - } - - @Override - public CombinedScanTask asCombinedScanTask() - { - throw new UnsupportedOperationException(); - } - - private static class DummyDataFile - implements DataFile - { - private final String path; - - private DummyDataFile(String path) - { - this.path = requireNonNull(path, "path is null"); - } - - @Override - public Long pos() - { - throw new UnsupportedOperationException(); - } - - @Override - public int specId() - { - throw new UnsupportedOperationException(); - } - - @Override - public CharSequence path() - { - return path; - } - - @Override - public FileFormat format() - { - throw new UnsupportedOperationException(); - } - - @Override - public StructLike partition() - { - throw new UnsupportedOperationException(); - } - - @Override - public long recordCount() - { - throw new UnsupportedOperationException(); - } - - @Override - public long fileSizeInBytes() - { - throw new UnsupportedOperationException(); - } - - @Override - public Map columnSizes() - { - throw new UnsupportedOperationException(); - } - - @Override - public Map valueCounts() - { - throw new UnsupportedOperationException(); - } - - @Override - public Map nullValueCounts() - { - throw new UnsupportedOperationException(); - } - - @Override - public Map nanValueCounts() - { - throw new UnsupportedOperationException(); - } - - @Override - public Map lowerBounds() - { - throw new UnsupportedOperationException(); - } - - @Override - public Map upperBounds() - { - throw new UnsupportedOperationException(); - } - - @Override - public ByteBuffer keyMetadata() - { - throw new UnsupportedOperationException(); - } - - @Override - public List splitOffsets() - { - throw new UnsupportedOperationException(); - } - - @Override - public DataFile copy() - { - throw new UnsupportedOperationException(); - } - - @Override - public DataFile copyWithoutStats() - { - 
throw new UnsupportedOperationException(); - } - } -} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java new file mode 100644 index 000000000000..87579c5ef43f --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.delete; + +import io.trino.plugin.iceberg.IcebergColumnHandle; +import io.trino.spi.Page; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.type.Type; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.StructProjection; + +import java.util.List; +import java.util.Set; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; +import static java.util.Objects.requireNonNull; + +public final class EqualityDeleteFilter + implements DeleteFilter +{ + private final Schema schema; + private final StructLikeSet deleteSet; + + private EqualityDeleteFilter(Schema schema, StructLikeSet deleteSet) + { + this.schema = requireNonNull(schema, "schema is null"); + this.deleteSet = requireNonNull(deleteSet, "deleteSet is null"); + } + + @Override + public Schema schema() + { + return schema; + } + + @Override + public RowPredicate createPredicate(List columns) + { + Type[] types = columns.stream() + .map(IcebergColumnHandle::getType) + .toArray(Type[]::new); + + Schema fileSchema = schemaFromHandles(columns); + StructProjection projection = StructProjection.create(fileSchema, schema); + + return (page, position) -> { + StructLike row = new LazyTrinoRow(types, page, position); + return !deleteSet.contains(projection.wrap(row)); + }; + } + + public static DeleteFilter readEqualityDeletes(ConnectorPageSource pageSource, List columns, Schema tableSchema) + { + Set ids = columns.stream() + .map(IcebergColumnHandle::getId) + .collect(toImmutableSet()); + + Type[] types = columns.stream() + .map(IcebergColumnHandle::getType) + .toArray(Type[]::new); + + Schema deleteSchema = TypeUtil.select(tableSchema, ids); + StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct()); + + while 
(!pageSource.isFinished()) { + Page page = pageSource.getNextPage(); + if (page == null) { + continue; + } + + for (int position = 0; position < page.getPositionCount(); position++) { + deleteSet.add(new TrinoRow(types, page, position)); + } + } + + return new EqualityDeleteFilter(deleteSchema, deleteSet); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java new file mode 100644 index 000000000000..32bcbbbf595a --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.delete; + +import io.trino.spi.Page; +import io.trino.spi.type.Type; +import org.apache.iceberg.StructLike; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkElementIndex; +import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue; +import static java.util.Objects.requireNonNull; + +/** + * Lazy version of {@link TrinoRow}. 
+ */ +final class LazyTrinoRow + implements StructLike +{ + private final Type[] types; + private final Page page; + private final int position; + private final Object[] values; + + public LazyTrinoRow(Type[] types, Page page, int position) + { + checkArgument(types.length == page.getChannelCount(), "mismatched types for page"); + this.types = requireNonNull(types, "types is null"); + this.page = requireNonNull(page, "page is null"); + checkElementIndex(position, page.getPositionCount(), "page position"); + this.position = position; + this.values = new Object[types.length]; + } + + @Override + public int size() + { + return page.getChannelCount(); + } + + @Override + public T get(int i, Class clazz) + { + return clazz.cast(get(i)); + } + + @Override + public void set(int i, T t) + { + throw new UnsupportedOperationException(); + } + + private Object get(int i) + { + Object value = values[i]; + if (value != null) { + return value; + } + + value = getIcebergValue(page.getBlock(i), position, types[i]); + values[i] = value; + return value; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java new file mode 100644 index 000000000000..303c85875f18 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.delete; + +import io.airlift.slice.Slice; +import io.trino.plugin.iceberg.IcebergColumnHandle; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.connector.ConnectorPageSource; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider; +import org.roaringbitmap.longlong.LongBitmapDataProvider; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public final class PositionDeleteFilter + implements DeleteFilter +{ + private final ImmutableLongBitmapDataProvider deletedRows; + + public PositionDeleteFilter(ImmutableLongBitmapDataProvider deletedRows) + { + this.deletedRows = requireNonNull(deletedRows, "deletedRows is null"); + } + + @Override + public Schema schema() + { + return new Schema(MetadataColumns.ROW_POSITION); + } + + @Override + public RowPredicate createPredicate(List columns) + { + int filePosChannel = rowPositionChannel(columns); + return (page, position) -> { + long filePos = BIGINT.getLong(page.getBlock(filePosChannel), position); + return !deletedRows.contains(filePos); + }; + } + + private static int rowPositionChannel(List columns) + { + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).isRowPositionColumn()) { + return i; + } + } + throw new IllegalArgumentException("No row position column"); + } + + public static void readPositionDeletes(ConnectorPageSource pageSource, Slice targetPath, LongBitmapDataProvider deletedRows) + { + CachingVarcharComparator comparator = new CachingVarcharComparator(targetPath); + + // Use a linear search since we expect most deletion files to only 
contain + // entries for a single path. The comparison cost is minimal if the + // path values are dictionary encoded, since we only do the comparison once. + while (!pageSource.isFinished()) { + Page page = pageSource.getNextPage(); + if (page == null) { + continue; + } + + Block pathBlock = page.getBlock(0); + Block posBlock = page.getBlock(1); + + for (int position = 0; position < page.getPositionCount(); position++) { + int result = comparator.compare(pathBlock, position); + if (result > 0) { + // deletion files are sorted by path, so we're done + return; + } + if (result == 0) { + deletedRows.addLong(BIGINT.getLong(posBlock, position)); + } + } + } + } + + private static final class CachingVarcharComparator + { + private final Slice reference; + private int result; + private Slice value; + + public CachingVarcharComparator(Slice reference) + { + this.reference = requireNonNull(reference, "reference is null"); + } + + @SuppressWarnings({"ObjectEquality", "ReferenceEquality"}) + public int compare(Block block, int position) + { + checkArgument(!block.isNull(position), "position is null"); + Slice next = VARCHAR.getSlice(block, position); + // The expected case is a dictionary block with many entries for the + // same path. Only perform a comparison if the object has changed. + if (value != next) { + value = next; + result = value.compareTo(reference); + } + return result; + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java new file mode 100644 index 000000000000..e02b4834bd4e --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.delete; + +import io.trino.spi.Page; + +import static java.util.Objects.requireNonNull; + +public interface RowPredicate +{ + boolean test(Page page, int position); + + default RowPredicate and(RowPredicate other) + { + requireNonNull(other, "other is null"); + return (page, position) -> test(page, position) && other.test(page, position); + } + + default Page filterPage(Page page) + { + int positionCount = page.getPositionCount(); + int[] retained = new int[positionCount]; + int retainedCount = 0; + for (int position = 0; position < positionCount; position++) { + if (test(page, position)) { + retained[retainedCount] = position; + retainedCount++; + } + } + if (retainedCount == positionCount) { + return page; + } + return page.getPositions(retained, 0, retainedCount); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFile.java deleted file mode 100644 index 59b72bdfd39e..000000000000 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFile.java +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.iceberg.delete; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.trino.plugin.iceberg.IcebergSplit; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.FileContent; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.StructLike; -import org.openjdk.jol.info.ClassLayout; - -import javax.annotation.Nullable; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.function.ToLongFunction; - -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static io.airlift.slice.SizeOf.SIZE_OF_INT; -import static io.airlift.slice.SizeOf.SIZE_OF_LONG; -import static io.airlift.slice.SizeOf.estimatedSizeOf; -import static java.util.Objects.requireNonNull; - -/** - * A Jackson-serializable implementation of the Iceberg {@link DeleteFile}. Allows for delete files to be included in {@link IcebergSplit}. 
- */ -public class TrinoDeleteFile - implements DeleteFile -{ - private static final long INSTANCE_SIZE = ClassLayout.parseClass(TrinoDeleteFile.class).instanceSize(); - - @Nullable private final Long pos; - private final int specId; - private final FileContent fileContent; - private final String path; - private final FileFormat format; - private final long recordCount; - private final long fileSizeInBytes; - @Nullable private final Map columnSizes; - @Nullable private final Map valueCounts; - @Nullable private final Map nullValueCounts; - @Nullable private final Map nanValueCounts; - @Nullable private final Map lowerBounds; - @Nullable private final Map upperBounds; - @Nullable private final byte[] keyMetadata; - @Nullable private final List equalityFieldIds; - @Nullable private final Integer sortOrderId; - @Nullable private final List splitOffsets; - - public static TrinoDeleteFile copyOf(DeleteFile deleteFile) - { - return new TrinoDeleteFile( - deleteFile.pos(), - deleteFile.specId(), - deleteFile.content(), - deleteFile.path().toString(), - deleteFile.format(), - deleteFile.recordCount(), - deleteFile.fileSizeInBytes(), - deleteFile.columnSizes(), - deleteFile.valueCounts(), - deleteFile.nullValueCounts(), - deleteFile.nanValueCounts(), - deleteFile.lowerBounds() == null ? null : deleteFile.lowerBounds().entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array())), - deleteFile.upperBounds() == null ? null : deleteFile.upperBounds().entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array())), - deleteFile.keyMetadata() == null ? 
null : deleteFile.keyMetadata().array(), - deleteFile.equalityFieldIds(), - deleteFile.sortOrderId(), - deleteFile.splitOffsets()); - } - - @JsonCreator - public TrinoDeleteFile( - @JsonProperty("pos") @Nullable Long pos, - @JsonProperty("specId") int specId, - @JsonProperty("fileContent") FileContent fileContent, - @JsonProperty("path") String path, - @JsonProperty("format") FileFormat format, - @JsonProperty("recordCount") long recordCount, - @JsonProperty("fileSizeInBytes") long fileSizeInBytes, - @JsonProperty("columnSizes") @Nullable Map columnSizes, - @JsonProperty("valueCounts") @Nullable Map valueCounts, - @JsonProperty("nullValueCounts") @Nullable Map nullValueCounts, - @JsonProperty("nanValueCounts") @Nullable Map nanValueCounts, - @JsonProperty("lowerBounds") @Nullable Map lowerBounds, - @JsonProperty("upperBounds") @Nullable Map upperBounds, - @JsonProperty("keyMetadata") @Nullable byte[] keyMetadata, - @JsonProperty("equalityFieldIds") @Nullable List equalityFieldIds, - @JsonProperty("sortOrderId") @Nullable Integer sortOrderId, - @JsonProperty("splitOffsets") @Nullable List splitOffsets) - { - this.pos = pos; - this.specId = specId; - this.fileContent = requireNonNull(fileContent, "fileContent is null"); - this.path = requireNonNull(path, "path is null"); - this.format = requireNonNull(format, "format is null"); - this.recordCount = recordCount; - this.fileSizeInBytes = fileSizeInBytes; - this.columnSizes = columnSizes == null ? null : ImmutableMap.copyOf(columnSizes); - this.valueCounts = valueCounts == null ? null : ImmutableMap.copyOf(valueCounts); - this.nullValueCounts = nullValueCounts == null ? null : ImmutableMap.copyOf(nullValueCounts); - this.nanValueCounts = nanValueCounts == null ? null : ImmutableMap.copyOf(nanValueCounts); - this.lowerBounds = lowerBounds == null ? null : ImmutableMap.copyOf(lowerBounds); - this.upperBounds = upperBounds == null ? null : ImmutableMap.copyOf(upperBounds); - this.keyMetadata = keyMetadata == null ? 
null : keyMetadata.clone(); - this.equalityFieldIds = equalityFieldIds == null ? null : ImmutableList.copyOf(equalityFieldIds); - this.sortOrderId = sortOrderId; - this.splitOffsets = splitOffsets == null ? null : ImmutableList.copyOf(splitOffsets); - } - - @Override - @JsonProperty("pos") - @Nullable - public Long pos() - { - return pos; - } - - @Override - @JsonProperty("specId") - public int specId() - { - return specId; - } - - @Override - @JsonProperty("fileContent") - public FileContent content() - { - return fileContent; - } - - @Override - @JsonProperty("path") - public CharSequence path() - { - return path; - } - - @Override - @JsonProperty("format") - public FileFormat format() - { - return format; - } - - // TODO: Probably need to figure out how to serialize this - @Override - @JsonIgnore - public StructLike partition() - { - throw new UnsupportedOperationException(); - } - - @Override - @JsonProperty("recordCount") - public long recordCount() - { - return recordCount; - } - - @Override - @JsonProperty("fileSizeInBytes") - public long fileSizeInBytes() - { - return fileSizeInBytes; - } - - @Override - @JsonProperty("columnSizes") - @Nullable - public Map columnSizes() - { - return columnSizes; - } - - @Override - @JsonProperty("valueCounts") - @Nullable - public Map valueCounts() - { - return valueCounts; - } - - @Override - @JsonProperty("nullValueCounts") - @Nullable - public Map nullValueCounts() - { - return nullValueCounts; - } - - @Override - @JsonProperty("nanValueCounts") - @Nullable - public Map nanValueCounts() - { - return nanValueCounts; - } - - @JsonProperty("lowerBounds") - @Nullable - public Map lowerBoundsAsByteArray() - { - return lowerBounds; - } - - @Override - @Nullable - public Map lowerBounds() - { - if (lowerBounds == null) { - return null; - } - return lowerBounds.entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> ByteBuffer.wrap(entry.getValue()))); - } - - @JsonProperty("upperBounds") - @Nullable - public 
Map upperBoundsAsByteArray() - { - return upperBounds; - } - - @Override - @Nullable - public Map upperBounds() - { - if (upperBounds == null) { - return null; - } - return upperBounds.entrySet().stream() - .collect(toImmutableMap(Map.Entry::getKey, entry -> ByteBuffer.wrap(entry.getValue()))); - } - - @JsonProperty("keyMetadata") - @Nullable - public byte[] keyMetadataAsByteArray() - { - return keyMetadata; - } - - @Override - @Nullable - public ByteBuffer keyMetadata() - { - if (keyMetadata == null) { - return null; - } - return ByteBuffer.wrap(keyMetadata); - } - - @Override - @JsonProperty("equalityFieldIds") - @Nullable - public List equalityFieldIds() - { - return equalityFieldIds; - } - - @Override - @JsonProperty("sortOrderId") - @Nullable - public Integer sortOrderId() - { - return sortOrderId; - } - - @Override - @JsonProperty("splitOffsets") - @Nullable - public List splitOffsets() - { - return splitOffsets; - } - - @Override - public DeleteFile copy() - { - return this; - } - - @Override - public DeleteFile copyWithoutStats() - { - return new TrinoDeleteFile( - pos, - specId, - fileContent, - path, - format, - recordCount, - fileSizeInBytes, - null, - null, - null, - null, - null, - null, - keyMetadata, - equalityFieldIds, - sortOrderId, - splitOffsets); - } - - public long getRetainedSizeInBytes() - { - ToLongFunction intSizeOf = ignored -> SIZE_OF_INT; - ToLongFunction longSizeOf = ignored -> SIZE_OF_LONG; - return INSTANCE_SIZE - + estimatedSizeOf(path) - + estimatedSizeOf(columnSizes, intSizeOf, longSizeOf) - + estimatedSizeOf(nullValueCounts, intSizeOf, longSizeOf) - + estimatedSizeOf(nanValueCounts, intSizeOf, longSizeOf) - + estimatedSizeOf(lowerBounds, intSizeOf, value -> value.length) - + estimatedSizeOf(upperBounds, intSizeOf, value -> value.length) - + (keyMetadata == null ? 
0 : keyMetadata.length) - + estimatedSizeOf(equalityFieldIds, intSizeOf) - + estimatedSizeOf(splitOffsets, longSizeOf); - } -} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFilter.java deleted file mode 100644 index b1a12a4a71b1..000000000000 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoDeleteFilter.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.iceberg.delete; - -import io.trino.plugin.iceberg.IcebergColumnHandle; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.data.DeleteFilter; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.types.Types; - -import java.util.List; - -import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_ID; -import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_NAME; -import static java.util.Objects.requireNonNull; -import static org.apache.iceberg.MetadataColumns.FILE_PATH; -import static org.apache.iceberg.MetadataColumns.IS_DELETED; -import static org.apache.iceberg.MetadataColumns.ROW_POSITION; - -public class TrinoDeleteFilter - extends DeleteFilter -{ - private final FileIO fileIO; - - public TrinoDeleteFilter(FileScanTask task, Schema tableSchema, List requestedColumns, FileIO fileIO) - { - super(task.file().path().toString(), task.deletes(), tableSchema, toSchema(tableSchema, requestedColumns)); - this.fileIO = requireNonNull(fileIO, "fileIO is null"); - } - - @Override - protected StructLike asStructLike(TrinoRow row) - { - return row; - } - - @Override - protected InputFile getInputFile(String s) - { - return fileIO.newInputFile(s); - } - - private static Schema toSchema(Schema tableSchema, List requestedColumns) - { - return new Schema(requestedColumns.stream().map(column -> toNestedField(tableSchema, column)).collect(toImmutableList())); - } - - private static Types.NestedField toNestedField(Schema tableSchema, IcebergColumnHandle columnHandle) - { - if (columnHandle.isRowPositionColumn()) { - return ROW_POSITION; - } - if (columnHandle.isIsDeletedColumn()) { - return IS_DELETED; - } - if (columnHandle.isPathColumn()) { - return FILE_PATH; - } - if 
(columnHandle.isUpdateRowIdColumn()) { - return Types.NestedField.of(TRINO_UPDATE_ROW_ID_COLUMN_ID, false, TRINO_UPDATE_ROW_ID_COLUMN_NAME, Types.StructType.of()); - } - - return tableSchema.findField(columnHandle.getId()); - } -} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java index 3a597b7a552e..1852b8b5e98d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java @@ -13,50 +13,39 @@ */ package io.trino.plugin.iceberg.delete; -import com.google.common.collect.AbstractIterator; import io.trino.spi.Page; import io.trino.spi.type.Type; import org.apache.iceberg.StructLike; -import javax.annotation.Nullable; +import java.util.Arrays; import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue; -import static java.util.Objects.requireNonNull; -public class TrinoRow +final class TrinoRow implements StructLike { - private final Type[] types; - private final Page page; - private final int position; + private final Object[] values; - private TrinoRow(Type[] types, Page page, int position) + public TrinoRow(Type[] types, Page page, int position) { - this.types = requireNonNull(types, "types list is null"); - this.page = requireNonNull(page, "page is null"); - checkArgument(position >= 0, "page position must be non-negative: %s", position); - this.position = position; - } - - /** - * Gets the position in the Block this row was originally created from. 
- */ - public int getPosition() - { - return position; + checkArgument(types.length == page.getChannelCount(), "mismatched types for page"); + values = new Object[types.length]; + for (int i = 0; i < values.length; i++) { + values[i] = getIcebergValue(page.getBlock(i), position, types[i]); + } } @Override public int size() { - return page.getChannelCount(); + return values.length; } @Override - public T get(int i, Class aClass) + public T get(int i, Class clazz) { - return aClass.cast(getIcebergValue(page.getBlock(i), position, types[i])); + return clazz.cast(values[i]); } @Override @@ -65,22 +54,9 @@ public void set(int i, T t) throw new UnsupportedOperationException(); } - public static Iterable fromPage(Type[] types, Page page, int positionCount) + @Override + public String toString() { - return () -> new AbstractIterator<>() { - private int nextPosition; - - @Nullable - @Override - protected TrinoRow computeNext() - { - if (nextPosition == positionCount) { - return endOfData(); - } - int position = nextPosition; - nextPosition++; - return new TrinoRow(types, page, position); - } - }; + return "TrinoRow" + Arrays.toString(values); } } diff --git a/pom.xml b/pom.xml index c22b98ce1bbe..25131cff88cf 100644 --- a/pom.xml +++ b/pom.xml @@ -1661,7 +1661,7 @@ org.roaringbitmap RoaringBitmap - 0.9.25 + 0.9.30