Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package io.trino.plugin.hive.orc;

import com.google.common.collect.ImmutableSet;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.orc.OrcCorruptionException;
import io.trino.plugin.hive.AcidInfo;
import io.trino.plugin.hive.HdfsEnvironment;
Expand All @@ -30,6 +32,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -43,6 +46,7 @@

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SizeOf.sizeOfObjectArray;
import static io.trino.plugin.hive.BackgroundHiveSplitLoader.hasAttemptId;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
Expand All @@ -65,6 +69,7 @@ public class OrcDeletedRows
private final HdfsEnvironment hdfsEnvironment;
private final AcidInfo acidInfo;
private final OptionalInt bucketNumber;
private final LocalMemoryContext systemMemoryUsage;

@Nullable
private Set<RowId> deletedRows;
Expand All @@ -76,7 +81,8 @@ public OrcDeletedRows(
Configuration configuration,
HdfsEnvironment hdfsEnvironment,
AcidInfo acidInfo,
OptionalInt bucketNumber)
OptionalInt bucketNumber,
AggregatedMemoryContext systemMemoryContext)
{
this.sourceFileName = requireNonNull(sourceFileName, "sourceFileName is null");
this.pageSourceFactory = requireNonNull(pageSourceFactory, "pageSourceFactory is null");
Expand All @@ -85,6 +91,7 @@ public OrcDeletedRows(
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.acidInfo = requireNonNull(acidInfo, "acidInfo is null");
this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
this.systemMemoryUsage = requireNonNull(systemMemoryContext, "systemMemoryContext is null").newLocalMemoryContext(OrcDeletedRows.class.getSimpleName());
}

public MaskDeletedRowsFunction getMaskDeletedRowsFunction(Page sourcePage, OptionalLong startRowId)
Expand Down Expand Up @@ -262,10 +269,19 @@ private Set<RowId> getDeletedRows()
throw new TrinoException(HIVE_CURSOR_ERROR, "Failed to read ORC delete delta file: " + path, e);
}
}

deletedRows = deletedRowsBuilder.build();
// Memory usage is deliberately not updated inside the loop while deletedRows is being built, because the recorded
// value is propagated to the operator memory context via OrcPageSource only at the end of processing a page.
systemMemoryUsage.setBytes(memorySizeOfRowIdsArray(deletedRows.size()));
return deletedRows;
}

/**
 * Computes the number of bytes attributed to {@code rowCount} row ids: the size of an
 * object array with {@code rowCount} slots plus {@code RowId.INSTANCE_SIZE} for each entry.
 *
 * @param rowCount number of row ids being accounted for
 * @return the combined size in bytes, computed in {@code long} arithmetic
 */
private long memorySizeOfRowIdsArray(int rowCount)
{
    // Size of the array holding the references
    long referenceArrayBytes = sizeOfObjectArray(rowCount);
    // Widen before multiplying so that a large row count cannot overflow int arithmetic
    long rowIdInstancesBytes = (long) rowCount * RowId.INSTANCE_SIZE;
    return referenceArrayBytes + rowIdInstancesBytes;
}

private static Path createPath(AcidInfo acidInfo, AcidInfo.DeleteDeltaInfo deleteDeltaInfo, String fileName)
{
Path directory = new Path(acidInfo.getPartitionLocation(), deleteDeltaInfo.getDirectoryName());
Expand All @@ -285,6 +301,8 @@ private static Path createPath(AcidInfo acidInfo, AcidInfo.DeleteDeltaInfo delet

private static class RowId
{
public static final int INSTANCE_SIZE = ClassLayout.parseClass(RowId.class).instanceSize();

private final long originalTransaction;
private final int bucket;
private final int statementId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) {
configuration,
hdfsEnvironment,
info,
bucketNumber));
bucketNumber,
systemMemoryUsage));

Optional<Long> originalFileRowId = acidInfo
.filter(OrcPageSourceFactory::hasOriginalFilesAndDeleteDeltas)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.OptionalLong;
import java.util.Set;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveTestUtils.SESSION;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -167,7 +168,8 @@ private static OrcDeletedRows createOrcDeletedRows(AcidInfo acidInfo, String sou
configuration,
HDFS_ENVIRONMENT,
acidInfo,
OptionalInt.of(0));
OptionalInt.of(0),
newSimpleAggregatedMemoryContext());
}

private Page createTestPage(int originalTransactionStart, int originalTransactionEnd)
Expand Down