Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 6 additions & 18 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<artifactId>trino-hive</artifactId>
<exclusions>
<exclusion>
<!-- calcite-core and iceberg-data have conflicting versions of org.apache.commons:commons-lang3 -->
<!-- calcite-core and Iceberg have conflicting versions of org.apache.commons:commons-lang3 -->
<groupId>com.linkedin.calcite</groupId>
<artifactId>calcite-core</artifactId>
</exclusion>
Expand Down Expand Up @@ -176,23 +176,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${dep.iceberg.version}</version>
<exclusions>
<!-- The parquet-avro dependency uses a different version of org.apache.thrift:libthrift -->
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive-metastore</artifactId>
Expand Down Expand Up @@ -221,6 +204,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.airlift.slice.Slice;
import io.trino.plugin.hive.ReaderProjectionsAdapter;
import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink;
import io.trino.plugin.iceberg.delete.TrinoRow;
import io.trino.plugin.iceberg.delete.RowPredicate;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
Expand All @@ -27,10 +27,7 @@
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.UpdatablePageSource;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.Type;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -59,11 +56,10 @@ public class IcebergPageSource
implements UpdatablePageSource
{
private final Schema schema;
private final Type[] columnTypes;
private final int[] expectedColumnIndexes;
private final ConnectorPageSource delegate;
private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
private final Optional<DeleteFilter<TrinoRow>> deleteFilter;
private final Optional<RowPredicate> deletePredicate;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to track memory contained in this field to prevent worker node crashes.
See #9914
cc @losipiuk

Copy link
Copy Markdown
Member Author

@electrum electrum Jul 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, though we aren't doing that today, so this isn't a regression. We can do that as a follow-up. I didn't see an easy way to get memory usage from RoaringBitmap. We could serialize and use that as an estimate, or do a simple estimate based on the cardinality (multiply by some constant factor).

private final Supplier<IcebergPositionDeletePageSink> positionDeleteSinkSupplier;
private final Supplier<IcebergPageSink> updatedRowPageSinkSupplier;
// An array with one element per field in the $row_id column. The value in the array points to the
Expand All @@ -85,10 +81,9 @@ public IcebergPageSource(
Schema schema,
List<IcebergColumnHandle> expectedColumns,
List<IcebergColumnHandle> requiredColumns,
List<IcebergColumnHandle> readColumns,
ConnectorPageSource delegate,
Optional<ReaderProjectionsAdapter> projectionsAdapter,
Optional<DeleteFilter<TrinoRow>> deleteFilter,
Optional<RowPredicate> deletePredicate,
Supplier<IcebergPositionDeletePageSink> positionDeleteSinkSupplier,
Supplier<IcebergPageSink> updatedRowPageSinkSupplier,
List<IcebergColumnHandle> updatedColumns)
Expand Down Expand Up @@ -120,12 +115,9 @@ public IcebergPageSource(
}
}

this.columnTypes = readColumns.stream()
.map(IcebergColumnHandle::getType)
.toArray(Type[]::new);
this.delegate = requireNonNull(delegate, "delegate is null");
this.projectionsAdapter = requireNonNull(projectionsAdapter, "projectionsAdapter is null");
this.deleteFilter = requireNonNull(deleteFilter, "deleteFilter is null");
this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null");
this.positionDeleteSinkSupplier = requireNonNull(positionDeleteSinkSupplier, "positionDeleteSinkSupplier is null");
this.updatedRowPageSinkSupplier = requireNonNull(updatedRowPageSinkSupplier, "updatedRowPageSinkSupplier is null");
requireNonNull(updatedColumns, "updatedColumnFieldIds is null");
Expand Down Expand Up @@ -167,20 +159,8 @@ public Page getNextPage()
return null;
}

if (deleteFilter.isPresent()) {
int positionCount = dataPage.getPositionCount();
int[] positionsToKeep = new int[positionCount];
try (CloseableIterable<TrinoRow> filteredRows = deleteFilter.get().filter(CloseableIterable.withNoopClose(TrinoRow.fromPage(columnTypes, dataPage, positionCount)))) {
int positionsToKeepCount = 0;
for (TrinoRow rowToKeep : filteredRows) {
positionsToKeep[positionsToKeepCount] = rowToKeep.getPosition();
positionsToKeepCount++;
}
dataPage = dataPage.getPositions(positionsToKeep, 0, positionsToKeepCount);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_BAD_DATA, "Failed to filter rows during merge-on-read operation", e);
}
if (deletePredicate.isPresent()) {
dataPage = deletePredicate.get().filterPage(dataPage);
}

if (projectionsAdapter.isPresent()) {
Expand Down
Loading