From e3560ed72c71a91142d5c44b7ac7ec547b7bbe51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 9 Nov 2021 14:02:04 +0100 Subject: [PATCH] Account memory for OrcDeletedRows --- .../trino/plugin/hive/orc/OrcDeletedRows.java | 20 ++++++++++++++++++- .../plugin/hive/orc/OrcPageSourceFactory.java | 3 ++- .../plugin/hive/orc/TestOrcDeletedRows.java | 4 +++- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java index e14ab74be859..7c83a1db4580 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java @@ -14,6 +14,8 @@ package io.trino.plugin.hive.orc; import com.google.common.collect.ImmutableSet; +import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.memory.context.LocalMemoryContext; import io.trino.orc.OrcCorruptionException; import io.trino.plugin.hive.AcidInfo; import io.trino.plugin.hive.HdfsEnvironment; @@ -30,6 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.openjdk.jol.info.ClassLayout; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -43,6 +46,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Verify.verify; +import static io.airlift.slice.SizeOf.sizeOfObjectArray; import static io.trino.plugin.hive.BackgroundHiveSplitLoader.hasAttemptId; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; @@ -65,6 +69,7 @@ public class OrcDeletedRows private final HdfsEnvironment hdfsEnvironment; private final AcidInfo acidInfo; private final OptionalInt bucketNumber; + private final LocalMemoryContext systemMemoryUsage; @Nullable private Set deletedRows; @@ -76,7 +81,8 @@ public OrcDeletedRows( Configuration configuration, HdfsEnvironment hdfsEnvironment, AcidInfo acidInfo, - OptionalInt bucketNumber) + OptionalInt bucketNumber, + AggregatedMemoryContext systemMemoryContext) { this.sourceFileName = requireNonNull(sourceFileName, "sourceFileName is null"); this.pageSourceFactory = requireNonNull(pageSourceFactory, "pageSourceFactory is null"); @@ -85,6 +91,7 @@ public OrcDeletedRows( this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.acidInfo = requireNonNull(acidInfo, "acidInfo is null"); this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null"); + this.systemMemoryUsage = requireNonNull(systemMemoryContext, "systemMemoryContext is null").newLocalMemoryContext(OrcDeletedRows.class.getSimpleName()); } public MaskDeletedRowsFunction getMaskDeletedRowsFunction(Page sourcePage, OptionalLong startRowId) @@ -262,10 +269,19 @@ private Set getDeletedRows() throw new TrinoException(HIVE_CURSOR_ERROR, "Failed to read ORC delete delta file: " + path, e); } } + deletedRows = deletedRowsBuilder.build(); + // Not updating memory usage in the loop, when deletedRows are built, as recorded information is propagated + // to operator memory context via OrcPageSource only at the end of processing of page. + systemMemoryUsage.setBytes(memorySizeOfRowIdsArray(deletedRows.size())); return deletedRows; } + private long memorySizeOfRowIdsArray(int rowCount) + { + return sizeOfObjectArray(rowCount) + (long) rowCount * RowId.INSTANCE_SIZE; + } + private static Path createPath(AcidInfo acidInfo, AcidInfo.DeleteDeltaInfo deleteDeltaInfo, String fileName) { Path directory = new Path(acidInfo.getPartitionLocation(), deleteDeltaInfo.getDirectoryName()); @@ -285,6 +301,8 @@ private static Path createPath(AcidInfo acidInfo, AcidInfo.DeleteDeltaInfo delet private static class RowId { + public static final int INSTANCE_SIZE = ClassLayout.parseClass(RowId.class).instanceSize(); + private final long originalTransaction; private final int bucket; private final int statementId; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java index 208ee67158d2..c9ef49bc5e78 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java @@ -390,7 +390,8 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) { configuration, hdfsEnvironment, info, - bucketNumber)); + bucketNumber, + systemMemoryUsage)); Optional originalFileRowId = acidInfo .filter(OrcPageSourceFactory::hasOriginalFilesAndDeleteDeltas) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java index 7204e2085698..3731929a7d90 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java @@ -33,6 +33,7 @@ import java.util.OptionalLong; import java.util.Set; +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.spi.type.BigintType.BIGINT; @@ -167,7 +168,8 @@ private static OrcDeletedRows createOrcDeletedRows(AcidInfo acidInfo, String sou configuration, HDFS_ENVIRONMENT, acidInfo, - OptionalInt.of(0)); + OptionalInt.of(0), + newSimpleAggregatedMemoryContext()); } private Page createTestPage(int originalTransactionStart, int originalTransactionEnd)