diff --git a/core/trino-main/src/main/java/io/trino/connector/CatalogName.java b/core/trino-main/src/main/java/io/trino/connector/CatalogName.java index 602a5a453349..62b2e6d489cc 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CatalogName.java +++ b/core/trino-main/src/main/java/io/trino/connector/CatalogName.java @@ -15,14 +15,18 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import org.openjdk.jol.info.ClassLayout; import java.util.Objects; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public final class CatalogName { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(CatalogName.class).instanceSize(); + private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@"; private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@"; @@ -66,6 +70,12 @@ public String toString() return catalogName; } + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(catalogName); + } + public static boolean isInternalSystemConnector(CatalogName catalogName) { return catalogName.getCatalogName().startsWith(SYSTEM_TABLES_CONNECTOR_PREFIX) || diff --git a/core/trino-main/src/main/java/io/trino/connector/informationschema/InformationSchemaSplit.java b/core/trino-main/src/main/java/io/trino/connector/informationschema/InformationSchemaSplit.java index 0aea04eb4a5d..c25471f1349e 100644 --- a/core/trino-main/src/main/java/io/trino/connector/informationschema/InformationSchemaSplit.java +++ b/core/trino-main/src/main/java/io/trino/connector/informationschema/InformationSchemaSplit.java @@ -18,15 +18,19 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import static 
com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class InformationSchemaSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(InformationSchemaSplit.class).instanceSize(); + private final List addresses; @JsonCreator @@ -56,4 +60,11 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/core/trino-main/src/main/java/io/trino/connector/system/SystemColumnHandle.java b/core/trino-main/src/main/java/io/trino/connector/system/SystemColumnHandle.java index 36604968aeb9..4bafb073cee3 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/SystemColumnHandle.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/SystemColumnHandle.java @@ -18,15 +18,19 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorTableMetadata; +import org.openjdk.jol.info.ClassLayout; import java.util.Map; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class SystemColumnHandle implements ColumnHandle { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SystemColumnHandle.class).instanceSize(); + private final String columnName; @JsonCreator @@ -67,6 +71,12 @@ public String toString() return columnName; } + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(columnName); + } + public static Map toSystemColumnHandles(ConnectorTableMetadata tableMetadata) { return tableMetadata.getColumns().stream().collect(toImmutableMap( diff --git a/core/trino-main/src/main/java/io/trino/connector/system/SystemSplit.java 
b/core/trino-main/src/main/java/io/trino/connector/system/SystemSplit.java index 022e66106a7c..9add5db9ef9b 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/SystemSplit.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/SystemSplit.java @@ -20,16 +20,20 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.predicate.TupleDomain; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class SystemSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SystemSplit.class).instanceSize(); + private final List addresses; private final TupleDomain constraint; @@ -74,6 +78,14 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + constraint.getRetainedSizeInBytes(columnHandle -> ((SystemColumnHandle) columnHandle).getRetainedSizeInBytes()); + } + @Override public String toString() { diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskId.java b/core/trino-main/src/main/java/io/trino/execution/TaskId.java index b0041f99953d..c82dbc3bf71f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskId.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskId.java @@ -16,17 +16,21 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; import io.trino.spi.QueryId; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import 
static io.trino.spi.QueryId.parseDottedId; import static java.lang.Integer.parseInt; import static java.util.Objects.requireNonNull; public class TaskId { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TaskId.class).instanceSize(); + @JsonCreator public static TaskId valueOf(String taskId) { @@ -94,4 +98,9 @@ public boolean equals(Object obj) TaskId other = (TaskId) obj; return Objects.equals(this.fullId, other.fullId); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + estimatedSizeOf(fullId); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java index eeada585f39c..ad281bdb00bc 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java @@ -539,7 +539,7 @@ private static Split createExchangeSplit(RemoteTask sourceTask, RemoteTask desti // Fetch the results from the buffer assigned to the task based on id URI exchangeLocation = sourceTask.getTaskStatus().getSelf(); URI splitLocation = uriBuilderFrom(exchangeLocation).appendPath("results").appendPath(String.valueOf(destinationTask.getTaskId().getPartitionId())).build(); - return new Split(REMOTE_CONNECTOR_ID, new RemoteSplit(sourceTask.getTaskId(), splitLocation), Lifespan.taskWide()); + return new Split(REMOTE_CONNECTOR_ID, new RemoteSplit(sourceTask.getTaskId(), splitLocation.toString()), Lifespan.taskWide()); } public enum State diff --git a/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java b/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java index 0893c08d2648..33987fcca719 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ExchangeOperator.java @@ -24,6 +24,7 @@ import 
io.trino.split.RemoteSplit; import io.trino.sql.planner.plan.PlanNodeId; +import java.net.URI; import java.util.Optional; import java.util.function.Supplier; @@ -124,7 +125,7 @@ public Supplier> addSplit(Split split) checkArgument(split.getCatalogName().equals(REMOTE_CONNECTOR_ID), "split is not a remote split"); RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit(); - exchangeClient.addLocation(remoteSplit.getTaskId(), remoteSplit.getLocation()); + exchangeClient.addLocation(remoteSplit.getTaskId(), URI.create(remoteSplit.getLocation())); return Optional::empty; } diff --git a/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java b/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java index 8a32faae1469..46e1b2edbd2e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/MergeOperator.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -160,7 +161,7 @@ public Supplier> addSplit(Split split) TaskContext taskContext = operatorContext.getDriverContext().getPipelineContext().getTaskContext(); ExchangeClient exchangeClient = closer.register(exchangeClientSupplier.get(operatorContext.localSystemMemoryContext(), taskContext::sourceTaskFailed, RetryPolicy.NONE)); RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit(); - exchangeClient.addLocation(remoteSplit.getTaskId(), remoteSplit.getLocation()); + exchangeClient.addLocation(remoteSplit.getTaskId(), URI.create(remoteSplit.getLocation())); exchangeClient.noMoreLocations(); pageProducers.add(exchangeClient.pages() .map(serializedPage -> { diff --git a/core/trino-main/src/main/java/io/trino/operator/index/IndexSplit.java b/core/trino-main/src/main/java/io/trino/operator/index/IndexSplit.java index 06e95b3a4d66..1981a2817a01 100644 --- 
a/core/trino-main/src/main/java/io/trino/operator/index/IndexSplit.java +++ b/core/trino-main/src/main/java/io/trino/operator/index/IndexSplit.java @@ -49,6 +49,13 @@ public Object getInfo() return null; } + @Override + public long getRetainedSizeInBytes() + { + // IndexSplit is expected to be short lived and is not expected to be queried for the memory it retains + throw new UnsupportedOperationException(); + } + public RecordSet getKeyRecordSet() { return keyRecordSet; diff --git a/core/trino-main/src/main/java/io/trino/split/EmptySplit.java b/core/trino-main/src/main/java/io/trino/split/EmptySplit.java index 14cb823e4bd6..a6f571a10afc 100644 --- a/core/trino-main/src/main/java/io/trino/split/EmptySplit.java +++ b/core/trino-main/src/main/java/io/trino/split/EmptySplit.java @@ -19,6 +19,7 @@ import io.trino.connector.CatalogName; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; @@ -27,6 +28,8 @@ public class EmptySplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(EmptySplit.class).instanceSize(); + private final CatalogName catalogName; @JsonCreator @@ -54,6 +57,13 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + catalogName.getRetainedSizeInBytes(); + } + @JsonProperty public CatalogName getCatalogName() { diff --git a/core/trino-main/src/main/java/io/trino/split/RemoteSplit.java b/core/trino-main/src/main/java/io/trino/split/RemoteSplit.java index e043fd1a2b22..ff1f034ddd09 100644 --- a/core/trino-main/src/main/java/io/trino/split/RemoteSplit.java +++ b/core/trino-main/src/main/java/io/trino/split/RemoteSplit.java @@ -19,21 +19,24 @@ import io.trino.execution.TaskId; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; -import java.net.URI; import java.util.List; import static 
com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class RemoteSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(RemoteSplit.class).instanceSize(); + private final TaskId taskId; - private final URI location; + private final String location; @JsonCreator - public RemoteSplit(@JsonProperty("taskId") TaskId taskId, @JsonProperty("location") URI location) + public RemoteSplit(@JsonProperty("taskId") TaskId taskId, @JsonProperty("location") String location) { this.taskId = requireNonNull(taskId, "taskId is null"); this.location = requireNonNull(location, "location is null"); @@ -46,7 +49,7 @@ public TaskId getTaskId() } @JsonProperty - public URI getLocation() + public String getLocation() { return location; } @@ -77,4 +80,12 @@ public String toString() .add("location", location) .toString(); } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + taskId.getRetainedSizeInBytes() + + estimatedSizeOf(location); + } } diff --git a/core/trino-main/src/main/java/io/trino/testing/TestingSplit.java b/core/trino-main/src/main/java/io/trino/testing/TestingSplit.java index fb5d0b414645..05c0bded3b24 100644 --- a/core/trino-main/src/main/java/io/trino/testing/TestingSplit.java +++ b/core/trino-main/src/main/java/io/trino/testing/TestingSplit.java @@ -18,12 +18,17 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; +import static io.airlift.slice.SizeOf.estimatedSizeOf; + public class TestingSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestingSplit.class).instanceSize(); + private static final HostAddress localHost = HostAddress.fromString("127.0.0.1"); private final boolean remotelyAccessible; @@ 
-70,4 +75,11 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index e06692b8eb3b..3643d2f4bdb6 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -727,5 +727,11 @@ public Object getInfo() { return "mock connector split"; } + + @Override + public long getRetainedSizeInBytes() + { + return 0; + } } } diff --git a/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java b/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java index b89252294f3c..928b4e42f346 100644 --- a/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java @@ -53,6 +53,7 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jol.info.ClassLayout; import java.net.URI; import java.util.ArrayList; @@ -68,6 +69,7 @@ import java.util.concurrent.TimeUnit; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.trino.SystemSessionProperties.MAX_UNACKNOWLEDGED_SPLITS_PER_TASK; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; @@ -248,6 +250,8 @@ private static TopologyAwareNodeSelectorConfig getBenchmarkNetworkTopologyConfig private static class TestSplitRemote implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestSplitRemote.class).instanceSize(); + private 
final List hosts; public TestSplitRemote(int dataHost) @@ -272,6 +276,13 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(hosts, HostAddress::getRetainedSizeInBytes); + } } private static HostAddress addressForHost(int host) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java index 0eb58e5dd49d..800cab175dd1 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java @@ -47,6 +47,7 @@ import io.trino.sql.planner.plan.PlanNodeId; import io.trino.testing.TestingSession; import io.trino.util.FinalizerService; +import org.openjdk.jol.info.ClassLayout; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -72,6 +73,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.collect.Iterables.getOnlyElement; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static io.airlift.testing.Assertions.assertLessThanOrEqual; import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy; @@ -869,6 +871,8 @@ private static Session sessionWithMaxUnacknowledgedSplitsPerTask(int maxUnacknow private static class TestSplitLocal implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestSplitLocal.class).instanceSize(); + private final HostAddress address; private final SplitWeight splitWeight; @@ -912,6 +916,14 @@ public SplitWeight getSplitWeight() return splitWeight; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + 
address.getRetainedSizeInBytes() + + splitWeight.getRetainedSizeInBytes(); + } + @Override public String toString() { @@ -941,11 +953,19 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return 0; + } } private static class TestSplitRemote implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestSplitRemote.class).instanceSize(); + private final List hosts; private final SplitWeight splitWeight; @@ -992,6 +1012,14 @@ public SplitWeight getSplitWeight() { return splitWeight; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(hosts, HostAddress::getRetainedSizeInBytes) + + splitWeight.getRetainedSizeInBytes(); + } } private static class TestNetworkTopology diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java index 226b645a07e8..46ab542263ce 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java @@ -60,6 +60,7 @@ import io.trino.spiller.SpillSpaceTracker; import io.trino.sql.planner.LocalExecutionPlanner.LocalExecutionPlan; import io.trino.sql.planner.plan.PlanNodeId; +import org.openjdk.jol.info.ClassLayout; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -1311,6 +1312,8 @@ public ListenableFuture getLookupDoneFuture() public static class TestingSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestingSplit.class).instanceSize(); + private final int begin; private final int end; @@ -1339,6 +1342,12 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } + public int getBegin() { return begin; diff --git 
a/core/trino-main/src/test/java/io/trino/memory/TestSystemMemoryBlocking.java b/core/trino-main/src/test/java/io/trino/memory/TestSystemMemoryBlocking.java index 3f249ff9a7fb..3dd1d183747e 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestSystemMemoryBlocking.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestSystemMemoryBlocking.java @@ -168,5 +168,11 @@ public Object getInfo() { return null; } + + @Override + public long getRetainedSizeInBytes() + { + return 0; + } } } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java index c2549c7f705b..7f4bcecc5438 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java @@ -486,5 +486,11 @@ public Object getInfo() { return null; } + + @Override + public long getRetainedSizeInBytes() + { + return 0; + } } } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java index 19057f243494..86d05453cd9d 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java @@ -38,7 +38,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -149,7 +148,7 @@ public void testSimple() private static Split newRemoteSplit(TaskId taskId) { - return new Split(REMOTE_CONNECTOR_ID, new RemoteSplit(taskId, URI.create("http://localhost/" + taskId)), Lifespan.taskWide()); + return new Split(REMOTE_CONNECTOR_ID, new RemoteSplit(taskId, "http://localhost/" + taskId), Lifespan.taskWide()); } @Test diff --git a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java 
b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java index d80e354aa7bb..03a75722af2b 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java @@ -38,7 +38,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -355,7 +354,7 @@ private MergeOperator createMergeOperator(List sourceTypes, List private static Split createRemoteSplit(TaskId taskId) { - return new Split(ExchangeOperator.REMOTE_CONNECTOR_ID, new RemoteSplit(taskId, URI.create("http://localhost/" + taskId)), Lifespan.taskWide()); + return new Split(ExchangeOperator.REMOTE_CONNECTOR_ID, new RemoteSplit(taskId, "http://localhost/" + taskId), Lifespan.taskWide()); } private static List pullAvailablePages(Operator operator) diff --git a/core/trino-main/src/test/java/io/trino/operator/scalar/FunctionAssertions.java b/core/trino-main/src/test/java/io/trino/operator/scalar/FunctionAssertions.java index 470052ec9a6f..5edd3b0afedb 100644 --- a/core/trino-main/src/test/java/io/trino/operator/scalar/FunctionAssertions.java +++ b/core/trino-main/src/test/java/io/trino/operator/scalar/FunctionAssertions.java @@ -983,6 +983,8 @@ private static Block createTestRowData(RowType rowType) private static class TestSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestSplit.class).instanceSize(); + private final boolean recordSet; private TestSplit(boolean recordSet) @@ -1012,5 +1014,11 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } } } diff --git a/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java b/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java index b1424c612b84..74045db3d736 
100644 --- a/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java +++ b/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java @@ -173,6 +173,12 @@ public Object getInfo() { return "A mock split"; } + + @Override + public long getRetainedSizeInBytes() + { + return 0; + } } public enum Action diff --git a/core/trino-spi/src/main/java/io/trino/spi/HostAddress.java b/core/trino-spi/src/main/java/io/trino/spi/HostAddress.java index 882a80627b19..900a54126203 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/HostAddress.java +++ b/core/trino-spi/src/main/java/io/trino/spi/HostAddress.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import org.openjdk.jol.info.ClassLayout; import java.net.InetAddress; import java.net.URI; @@ -25,6 +26,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Collections.unmodifiableList; import static java.util.Objects.requireNonNull; @@ -61,6 +63,8 @@ */ public class HostAddress { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(HostAddress.class).instanceSize(); + /** * Magic value indicating the absence of a port number. 
*/ @@ -303,4 +307,9 @@ private static boolean isValidPort(int port) { return port >= 0 && port <= 65535; } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + estimatedSizeOf(host); + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java b/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java index 05d3c075bebf..7f23358592e1 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java +++ b/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import org.openjdk.jol.info.ClassLayout; import java.math.BigDecimal; import java.util.Collection; @@ -25,6 +26,8 @@ public final class SplitWeight { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SplitWeight.class).instanceSize(); + private static final long UNIT_VALUE = 100; private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE); @@ -109,4 +112,9 @@ public static long rawValueSum(Collection collection, Function nonNullValues; diff --git a/core/trino-spi/src/main/java/io/trino/spi/predicate/EquatableValueSet.java b/core/trino-spi/src/main/java/io/trino/spi/predicate/EquatableValueSet.java index 4f5413b3d4f8..247949dd074a 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/predicate/EquatableValueSet.java +++ b/core/trino-spi/src/main/java/io/trino/spi/predicate/EquatableValueSet.java @@ -18,6 +18,7 @@ import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.Type; +import org.openjdk.jol.info.ClassLayout; import java.lang.invoke.MethodHandle; import java.util.Collection; @@ -33,6 +34,7 @@ import java.util.stream.Collector; import java.util.stream.Stream; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static 
io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.BLOCK_POSITION; import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL; import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.NULLABLE_RETURN; @@ -54,6 +56,8 @@ public class EquatableValueSet implements ValueSet { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(EquatableValueSet.class).instanceSize(); + private final Type type; private final boolean inclusive; private final Set entries; @@ -338,6 +342,13 @@ public String toString(ConnectorSession session, int limit) .toString(); } + @Override + public long getRetainedSizeInBytes() + { + // type is not accounted for as the instances are cached (by TypeRegistry) and shared + return INSTANCE_SIZE + estimatedSizeOf(entries, ValueEntry::getRetainedSizeInBytes); + } + private String formatValues(ConnectorSession session, int limit) { return Stream.concat( @@ -418,6 +429,8 @@ public boolean equals(Object obj) public static class ValueEntry { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(ValueEntry.class).instanceSize(); + private final Type type; private final Block block; private final MethodHandle equalOperator; @@ -494,6 +507,12 @@ public boolean equals(Object obj) } return Boolean.TRUE.equals(result); } + + public long getRetainedSizeInBytes() + { + // type is not accounted for as the instances are cached (by TypeRegistry) and shared + return INSTANCE_SIZE + block.getRetainedSizeInBytes(); + } } private static Collector> toLinkedSet() diff --git a/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java b/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java index bdffcaa851fa..6993bc2e10e2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java +++ b/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java @@ -21,6 +21,7 @@ import 
io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.Type; +import org.openjdk.jol.info.ClassLayout; import java.lang.invoke.MethodHandle; import java.util.ArrayList; @@ -35,6 +36,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static io.airlift.slice.SizeOf.sizeOf; import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.BLOCK_POSITION; import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL; import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.NULLABLE_RETURN; @@ -62,6 +64,8 @@ public final class SortedRangeSet implements ValueSet { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SortedRangeSet.class).instanceSize(); + private final Type type; private final MethodHandle equalOperator; private final MethodHandle hashCodeOperator; @@ -901,6 +905,14 @@ public String toString(ConnectorSession session, int limit) .toString(); } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(inclusive) + + sortedRanges.getRetainedSizeInBytes(); + } + private String formatRanges(ConnectorSession session, int limit) { if (isNone()) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/predicate/TupleDomain.java b/core/trino-spi/src/main/java/io/trino/spi/predicate/TupleDomain.java index 5f98e4b856d5..c95a826303a2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/predicate/TupleDomain.java +++ b/core/trino-spi/src/main/java/io/trino/spi/predicate/TupleDomain.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.Type; +import org.openjdk.jol.info.ClassLayout; import java.util.ArrayList; import java.util.Arrays; @@ -35,8 +36,11 @@ import java.util.function.BiPredicate; import java.util.function.Function; import 
java.util.function.Predicate; +import java.util.function.ToLongFunction; import java.util.stream.Collector; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.lang.String.format; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableList; @@ -50,6 +54,8 @@ */ public final class TupleDomain { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TupleDomain.class).instanceSize(); + private static final TupleDomain NONE = new TupleDomain<>(Optional.empty()); private static final TupleDomain ALL = new TupleDomain<>(Optional.of(emptyMap())); @@ -620,4 +626,10 @@ public Domain getDomain() (u, v) -> { throw new IllegalStateException(format("Duplicate values for a key: %s and %s", u, v)); }, LinkedHashMap::new); } + + public long getRetainedSizeInBytes(ToLongFunction keySize) + { + return INSTANCE_SIZE + + sizeOf(domains, value -> estimatedSizeOf(value, keySize, Domain::getRetainedSizeInBytes)); + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java b/core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java index cce4eb00260c..7c9f6b686536 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java +++ b/core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java @@ -159,4 +159,6 @@ default boolean contains(ValueSet other) String toString(ConnectorSession session); String toString(ConnectorSession session, int limit); + + long getRetainedSizeInBytes(); } diff --git a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloSplitManager.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloSplitManager.java index 303c4973e9d1..e4326b66b695 100644 --- a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloSplitManager.java +++ b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/AccumuloSplitManager.java @@ -18,8 +18,8 @@ import 
io.trino.plugin.accumulo.model.AccumuloColumnHandle; import io.trino.plugin.accumulo.model.AccumuloSplit; import io.trino.plugin.accumulo.model.AccumuloTableHandle; +import io.trino.plugin.accumulo.model.SerializedRange; import io.trino.plugin.accumulo.model.TabletSplitMetadata; -import io.trino.plugin.accumulo.model.WrappedRange; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; @@ -79,7 +79,7 @@ public ConnectorSplitSource getSplits( ImmutableList.Builder cSplits = ImmutableList.builder(); for (TabletSplitMetadata splitMetadata : tabletSplits) { AccumuloSplit split = new AccumuloSplit( - splitMetadata.getRanges().stream().map(WrappedRange::new).collect(Collectors.toList()), + splitMetadata.getRanges().stream().map(SerializedRange::serialize).collect(Collectors.toList()), splitMetadata.getHostPort()); cSplits.add(split); } diff --git a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/AccumuloSplit.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/AccumuloSplit.java index 1a43238bfba4..0b3669e45088 100644 --- a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/AccumuloSplit.java +++ b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/AccumuloSplit.java @@ -17,27 +17,33 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; import org.apache.accumulo.core.data.Range; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static 
java.util.Objects.requireNonNull; public class AccumuloSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(AccumuloSplit.class).instanceSize(); + private final Optional hostPort; private final List addresses; - private final List ranges; + private final List ranges; @JsonCreator public AccumuloSplit( - @JsonProperty("ranges") List ranges, + @JsonProperty("ranges") List ranges, @JsonProperty("hostPort") Optional hostPort) { this.hostPort = requireNonNull(hostPort, "hostPort is null"); @@ -59,7 +65,7 @@ public Optional getHostPort() } @JsonProperty("ranges") - public List getWrappedRanges() + public List getSerializedRanges() { return ranges; } @@ -67,7 +73,7 @@ public List getWrappedRanges() @JsonIgnore public List getRanges() { - return ranges.stream().map(WrappedRange::getRange).collect(Collectors.toList()); + return ranges.stream().map(SerializedRange::deserialize).collect(Collectors.toList()); } @Override @@ -88,6 +94,15 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(hostPort, SizeOf::estimatedSizeOf) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + estimatedSizeOf(ranges, SerializedRange::getRetainedSizeInBytes); + } + @Override public String toString() { diff --git a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/SerializedRange.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/SerializedRange.java new file mode 100644 index 000000000000..524ea1a59675 --- /dev/null +++ b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/SerializedRange.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.accumulo.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.accumulo.core.data.Range; +import org.openjdk.jol.info.ClassLayout; + +import java.io.DataInput; +import java.io.IOException; +import java.io.UncheckedIOException; + +import static io.airlift.slice.SizeOf.sizeOf; +import static java.util.Objects.requireNonNull; + +public class SerializedRange +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SerializedRange.class).instanceSize(); + + private final byte[] bytes; + + public static SerializedRange serialize(Range range) + { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + range.write(out); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + return new SerializedRange(out.toByteArray()); + } + + @JsonCreator + public SerializedRange(@JsonProperty("data") byte[] bytes) + { + this.bytes = requireNonNull(bytes, "bytes is null"); + } + + @JsonProperty + public byte[] getBytes() + { + return bytes; + } + + public Range deserialize() + { + DataInput in = ByteStreams.newDataInput(bytes); + Range range = new Range(); + try { + range.readFields(in); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + return range; + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + sizeOf(bytes); + } +} diff --git 
a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/WrappedRange.java b/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/WrappedRange.java deleted file mode 100644 index 092f4549f44c..000000000000 --- a/plugin/trino-accumulo/src/main/java/io/trino/plugin/accumulo/model/WrappedRange.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.accumulo.model; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonValue; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import org.apache.accumulo.core.data.Range; - -import java.io.DataInput; -import java.io.IOException; - -public class WrappedRange -{ - private final Range range; - - public WrappedRange(Range range) - { - this.range = range; - } - - public Range getRange() - { - return range; - } - - @JsonValue - public byte[] toBytes() - throws IOException - { - ByteArrayDataOutput out = ByteStreams.newDataOutput(); - range.write(out); - return out.toByteArray(); - } - - @JsonCreator - public static WrappedRange fromBytes(byte[] bytes) - throws IOException - { - DataInput in = ByteStreams.newDataInput(bytes); - Range range = new Range(); - range.readFields(in); - return new WrappedRange(range); - } -} diff --git a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestAccumuloSplit.java 
b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestAccumuloSplit.java index f862778bee30..4867e409edca 100644 --- a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestAccumuloSplit.java +++ b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestAccumuloSplit.java @@ -31,7 +31,7 @@ public class TestAccumuloSplit public void testJsonRoundTrip() { AccumuloSplit expected = new AccumuloSplit( - ImmutableList.of(new Range(), new Range("bar", "foo"), new Range("bar", false, "baz", false)).stream().map(WrappedRange::new).collect(Collectors.toList()), + ImmutableList.of(new Range(), new Range("bar", "foo"), new Range("bar", false, "baz", false)).stream().map(SerializedRange::serialize).collect(Collectors.toList()), Optional.of("localhost:9000")); String json = codec.toJson(expected); diff --git a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestWrappedRange.java b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestSerializedRange.java similarity index 71% rename from plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestWrappedRange.java rename to plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestSerializedRange.java index e5d76859a92b..cbc8029bb79c 100644 --- a/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestWrappedRange.java +++ b/plugin/trino-accumulo/src/test/java/io/trino/plugin/accumulo/model/TestSerializedRange.java @@ -18,18 +18,17 @@ import static org.testng.Assert.assertEquals; -public class TestWrappedRange +public class TestSerializedRange { @Test public void testJsonRoundTrip() - throws Exception { Range exact = new Range("foo"); Range range = new Range("bar", "foo"); Range exclusiveRange = new Range("asiago", false, "bagel", false); - assertEquals(WrappedRange.fromBytes(new WrappedRange(exact).toBytes()).getRange(), exact); - assertEquals(WrappedRange.fromBytes(new 
WrappedRange(range).toBytes()).getRange(), range); - assertEquals(WrappedRange.fromBytes(new WrappedRange(exclusiveRange).toBytes()).getRange(), exclusiveRange); + assertEquals(SerializedRange.serialize(exact).deserialize(), exact); + assertEquals(SerializedRange.serialize(range).deserialize(), range); + assertEquals(SerializedRange.serialize(exclusiveRange).deserialize(), exclusiveRange); } } diff --git a/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplit.java b/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplit.java index f0638ea4c17a..bbaafd5a183c 100644 --- a/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplit.java +++ b/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplit.java @@ -18,30 +18,35 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.time.Instant.ofEpochSecond; import static java.util.Objects.requireNonNull; public class AtopSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(AtopSplit.class).instanceSize(); + private final HostAddress host; - private final ZonedDateTime date; + private final long epochSeconds; + private final String timeZoneId; @JsonCreator public AtopSplit( @JsonProperty("host") HostAddress host, @JsonProperty("epochSeconds") long epochSeconds, - @JsonProperty("timeZone") ZoneId timeZone) + @JsonProperty("timeZoneId") String timeZoneId) { this.host = requireNonNull(host, "host is null"); - requireNonNull(timeZone, "timeZone is null"); - this.date = ZonedDateTime.ofInstant(ofEpochSecond(epochSeconds), timeZone); + this.epochSeconds = epochSeconds; + this.timeZoneId = requireNonNull(timeZoneId, "timeZoneId 
is null"); } @JsonProperty @@ -53,18 +58,18 @@ public HostAddress getHost() @JsonProperty public long getEpochSeconds() { - return date.toEpochSecond(); + return epochSeconds; } @JsonProperty - public ZoneId getTimeZone() + public String getTimeZoneId() { - return date.getZone(); + return timeZoneId; } public ZonedDateTime getDate() { - return date; + return ZonedDateTime.ofInstant(ofEpochSecond(epochSeconds), ZoneId.of(timeZoneId)); } @Override @@ -86,12 +91,21 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + host.getRetainedSizeInBytes() + + estimatedSizeOf(timeZoneId); + } + @Override public String toString() { return toStringHelper(this) .add("host", host) - .add("date", date) + .add("epochSeconds", epochSeconds) + .add("timeZoneId", timeZoneId) .toString(); } } diff --git a/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplitManager.java b/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplitManager.java index 196405361919..181064f09d60 100644 --- a/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplitManager.java +++ b/plugin/trino-atop/src/main/java/io/trino/plugin/atop/AtopSplitManager.java @@ -80,7 +80,7 @@ public ConnectorSplitSource getSplits( true)), false); if (tableHandle.getStartTimeConstraint().overlaps(splitDomain) && tableHandle.getEndTimeConstraint().overlaps(splitDomain)) { - splits.add(new AtopSplit(node.getHostAndPort(), start.toEpochSecond(), start.getZone())); + splits.add(new AtopSplit(node.getHostAndPort(), start.toEpochSecond(), start.getZone().getId())); } start = start.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0); } diff --git a/plugin/trino-atop/src/test/java/io/trino/plugin/atop/TestAtopSplit.java b/plugin/trino-atop/src/test/java/io/trino/plugin/atop/TestAtopSplit.java index 8b7d0891c000..1092d92a33a9 100644 --- a/plugin/trino-atop/src/test/java/io/trino/plugin/atop/TestAtopSplit.java +++ 
b/plugin/trino-atop/src/test/java/io/trino/plugin/atop/TestAtopSplit.java @@ -29,11 +29,11 @@ public void testSerialization() { JsonCodec codec = JsonCodec.jsonCodec(AtopSplit.class); ZonedDateTime now = ZonedDateTime.now(ZoneId.of("+01:23")); - AtopSplit split = new AtopSplit(HostAddress.fromParts("localhost", 123), now.toEpochSecond(), now.getZone()); + AtopSplit split = new AtopSplit(HostAddress.fromParts("localhost", 123), now.toEpochSecond(), now.getZone().getId()); AtopSplit decoded = codec.fromJson(codec.toJson(split)); assertEquals(decoded.getHost(), split.getHost()); assertEquals(decoded.getDate(), split.getDate()); assertEquals(decoded.getEpochSeconds(), split.getEpochSeconds()); - assertEquals(decoded.getTimeZone(), split.getTimeZone()); + assertEquals(decoded.getTimeZoneId(), split.getTimeZoneId()); } } diff --git a/plugin/trino-base-jdbc/pom.xml b/plugin/trino-base-jdbc/pom.xml index 9d78420755e9..5a36e77aea1c 100644 --- a/plugin/trino-base-jdbc/pom.xml +++ b/plugin/trino-base-jdbc/pom.xml @@ -145,6 +145,12 @@ provided + + org.openjdk.jol + jol-core + provided + + io.trino diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java index 9c70ba36982c..ac4ca91f49b6 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcSplit.java @@ -16,17 +16,22 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class JdbcSplit implements ConnectorSplit { + private 
static final int INSTANCE_SIZE = ClassLayout.parseClass(JdbcSplit.class).instanceSize(); + private final Optional additionalPredicate; @JsonCreator @@ -59,4 +64,11 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(additionalPredicate, SizeOf::estimatedSizeOf); + } } diff --git a/plugin/trino-bigquery/pom.xml b/plugin/trino-bigquery/pom.xml index 1d7f01700ba6..9de23cb26e50 100644 --- a/plugin/trino-bigquery/pom.xml +++ b/plugin/trino-bigquery/pom.xml @@ -226,6 +226,12 @@ provided + + org.openjdk.jol + jol-core + provided + + io.trino diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java index 891782200326..189cd60ca206 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryColumnHandle.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Map; @@ -28,12 +29,16 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class BigQueryColumnHandle implements ColumnHandle, BigQueryType.Adaptor { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BigQueryColumnHandle.class).instanceSize(); + private final String name; private final BigQueryType bigQueryType; private final Field.Mode mode; @@ -184,4 +189,14 @@ public String toString() .add("description", description) 
.toString(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(name) + + sizeOf(precision) + + sizeOf(scale) + + estimatedSizeOf(subColumns, BigQueryColumnHandle::getRetainedSizeInBytes) + + estimatedSizeOf(description); + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java index 86ef38409389..79f16a46b4bf 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplit.java @@ -19,16 +19,20 @@ import io.trino.spi.HostAddress; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class BigQuerySplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BigQuerySplit.class).instanceSize(); + private static final int NO_ROWS_TO_GENERATE = -1; private final String streamName; @@ -102,6 +106,15 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(streamName) + + estimatedSizeOf(avroSchema) + + estimatedSizeOf(columns, column -> ((BigQueryColumnHandle) column).getRetainedSizeInBytes()); + } + @Override public boolean equals(Object o) { diff --git a/plugin/trino-blackhole/src/main/java/io/trino/plugin/blackhole/BlackHoleSplit.java b/plugin/trino-blackhole/src/main/java/io/trino/plugin/blackhole/BlackHoleSplit.java index ff796c5a8e32..4b0d8913edda 100644 --- a/plugin/trino-blackhole/src/main/java/io/trino/plugin/blackhole/BlackHoleSplit.java +++ 
b/plugin/trino-blackhole/src/main/java/io/trino/plugin/blackhole/BlackHoleSplit.java @@ -41,4 +41,10 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return 0; + } } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplit.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplit.java index 9f2d76f416ae..3de7ed9dbbf4 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplit.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraSplit.java @@ -19,15 +19,19 @@ import com.google.common.collect.ImmutableMap; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class CassandraSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(CassandraSplit.class).instanceSize(); + private final String partitionId; private final List addresses; private final String splitCondition; @@ -80,6 +84,15 @@ public Object getInfo() .build(); } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(partitionId) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + estimatedSizeOf(splitCondition); + } + @Override public String toString() { diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchSplit.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchSplit.java index db2532303de1..92e9ea356903 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchSplit.java +++ 
b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchSplit.java @@ -16,18 +16,24 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class ElasticsearchSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(ElasticsearchSplit.class).instanceSize(); + private final String index; private final int shard; private final Optional address; @@ -80,6 +86,14 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(index) + + sizeOf(address, SizeOf::estimatedSizeOf); + } + @Override public String toString() { diff --git a/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleRecordSet.java b/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleRecordSet.java index 67baf4608f6e..67b4a9bc44bb 100644 --- a/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleRecordSet.java +++ b/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleRecordSet.java @@ -21,6 +21,7 @@ import io.trino.spi.type.Type; import java.net.MalformedURLException; +import java.net.URI; import java.util.List; import static java.util.Objects.requireNonNull; @@ -44,7 +45,7 @@ public ExampleRecordSet(ExampleSplit split, List columnHand this.columnTypes = types.build(); try { - byteSource = Resources.asByteSource(split.getUri().toURL()); + byteSource = 
Resources.asByteSource(URI.create(split.getUri()).toURL()); } catch (MalformedURLException e) { throw new RuntimeException(e); diff --git a/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplit.java b/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplit.java index 6935daf64994..b7a9d2374458 100644 --- a/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplit.java +++ b/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplit.java @@ -18,31 +18,34 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.net.URI; import java.util.List; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class ExampleSplit implements ConnectorSplit { - private final URI uri; + private static final int INSTANCE_SIZE = ClassLayout.parseClass(ExampleSplit.class).instanceSize(); + + private final String uri; private final boolean remotelyAccessible; private final List addresses; @JsonCreator - public ExampleSplit( - @JsonProperty("uri") URI uri) + public ExampleSplit(@JsonProperty("uri") String uri) { this.uri = requireNonNull(uri, "uri is null"); remotelyAccessible = true; - addresses = ImmutableList.of(HostAddress.fromUri(uri)); + addresses = ImmutableList.of(HostAddress.fromUri(URI.create(uri))); } @JsonProperty - public URI getUri() + public String getUri() { return uri; } @@ -65,4 +68,12 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(uri) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplitManager.java b/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplitManager.java index 
d57eec4dd943..d3057b57f5ff 100644 --- a/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplitManager.java +++ b/plugin/trino-example-http/src/main/java/io/trino/plugin/example/ExampleSplitManager.java @@ -61,7 +61,7 @@ public ConnectorSplitSource getSplits( List splits = new ArrayList<>(); for (URI uri : table.getSources()) { - splits.add(new ExampleSplit(uri)); + splits.add(new ExampleSplit(uri.toString())); } Collections.shuffle(splits); diff --git a/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSet.java b/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSet.java index 314129633cba..fb1998b26814 100644 --- a/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSet.java +++ b/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSet.java @@ -21,7 +21,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.URI; import java.util.LinkedHashMap; import java.util.Map; @@ -33,7 +32,7 @@ public class TestExampleRecordSet { private ExampleHttpServer exampleHttpServer; - private URI dataUri; + private String dataUri; @Test public void testGetColumnTypes() @@ -115,7 +114,7 @@ public void testCursorMixedOrder() public void setUp() { exampleHttpServer = new ExampleHttpServer(); - dataUri = exampleHttpServer.resolve("/example-data/numbers-2.csv"); + dataUri = exampleHttpServer.resolve("/example-data/numbers-2.csv").toString(); } @AfterClass(alwaysRun = true) diff --git a/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSetProvider.java b/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSetProvider.java index d18424b794c7..6cf777dbbe1e 100644 --- a/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSetProvider.java +++ 
b/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleRecordSetProvider.java @@ -22,7 +22,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.URI; import java.util.LinkedHashMap; import java.util.Map; @@ -35,7 +34,7 @@ public class TestExampleRecordSetProvider { private ExampleHttpServer exampleHttpServer; - private URI dataUri; + private String dataUri; @Test public void testGetRecordSet() @@ -69,7 +68,7 @@ public void testGetRecordSet() public void setUp() { exampleHttpServer = new ExampleHttpServer(); - dataUri = exampleHttpServer.resolve("/example-data/numbers-2.csv"); + dataUri = exampleHttpServer.resolve("/example-data/numbers-2.csv").toString(); } @AfterClass(alwaysRun = true) diff --git a/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleSplit.java b/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleSplit.java index 1d9bceba7ca6..afe215ec7b22 100644 --- a/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleSplit.java +++ b/plugin/trino-example-http/src/test/java/io/trino/plugin/example/TestExampleSplit.java @@ -18,35 +18,33 @@ import io.trino.spi.HostAddress; import org.testng.annotations.Test; -import java.net.URI; - import static io.airlift.json.JsonCodec.jsonCodec; import static org.testng.Assert.assertEquals; public class TestExampleSplit { - private final ExampleSplit split = new ExampleSplit(URI.create("http://127.0.0.1/test.file")); + private final ExampleSplit split = new ExampleSplit("http://127.0.0.1/test.file"); @Test public void testAddresses() { // http split with default port - ExampleSplit httpSplit = new ExampleSplit(URI.create("http://example.com/example")); + ExampleSplit httpSplit = new ExampleSplit("http://example.com/example"); assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("example.com"))); assertEquals(httpSplit.isRemotelyAccessible(), true); // http split with 
custom port - httpSplit = new ExampleSplit(URI.create("http://example.com:8080/example")); + httpSplit = new ExampleSplit("http://example.com:8080/example"); assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("example.com", 8080))); assertEquals(httpSplit.isRemotelyAccessible(), true); // http split with default port - ExampleSplit httpsSplit = new ExampleSplit(URI.create("https://example.com/example")); + ExampleSplit httpsSplit = new ExampleSplit("https://example.com/example"); assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("example.com"))); assertEquals(httpsSplit.isRemotelyAccessible(), true); // http split with custom port - httpsSplit = new ExampleSplit(URI.create("https://example.com:8443/example")); + httpsSplit = new ExampleSplit("https://example.com:8443/example"); assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("example.com", 8443))); assertEquals(httpsSplit.isRemotelyAccessible(), true); } diff --git a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java index a8c97ada7e63..9cb796abf014 100644 --- a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java +++ b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java @@ -47,6 +47,7 @@ import static com.google.api.client.googleapis.javanet.GoogleNetHttpTransport.newTrustedTransport; import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.cache.CacheLoader.from; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.google.sheets.SheetsErrorCode.SHEETS_BAD_CREDENTIALS_ERROR; import static io.trino.plugin.google.sheets.SheetsErrorCode.SHEETS_METASTORE_ERROR; import static io.trino.plugin.google.sheets.SheetsErrorCode.SHEETS_TABLE_LOAD_ERROR; 
@@ -111,15 +112,15 @@ public Map> loadAll(Iterable tableLis public Optional getTable(String tableName) { - List> values = readAllValues(tableName); + List> values = convertToStringValues(readAllValues(tableName)); if (values.size() > 0) { ImmutableList.Builder columns = ImmutableList.builder(); Set columnNames = new HashSet<>(); // Assuming 1st line is always header - List header = values.get(0); + List header = values.get(0); int count = 0; - for (Object column : header) { - String columnValue = column.toString().toLowerCase(ENGLISH); + for (String column : header) { + String columnValue = column.toLowerCase(ENGLISH); // when empty or repeated column header, adding a placeholder column name if (columnValue.isEmpty() || columnNames.contains(columnValue)) { columnValue = "column_" + ++count; @@ -127,7 +128,7 @@ public Optional getTable(String tableName) columnNames.add(columnValue); columns.add(new SheetsColumn(columnValue, VarcharType.VARCHAR)); } - List> dataValues = values.subList(1, values.size()); // removing header info + List> dataValues = values.subList(1, values.size()); // removing header info return Optional.of(new SheetsTable(tableName, columns.build(), dataValues)); } return Optional.empty(); @@ -166,6 +167,13 @@ public List> readAllValues(String tableName) } } + public static List> convertToStringValues(List> values) + { + return values.stream() + .map(columns -> columns.stream().map(String::valueOf).collect(toImmutableList())) + .collect(toImmutableList()); + } + private Optional getSheetExpressionForTable(String tableName) { Map> tableSheetMap = getAllTableSheetExpressionMapping(); diff --git a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordCursor.java b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordCursor.java index 4427ba74f3b1..5595ae1735c7 100644 --- a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordCursor.java +++ 
b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordCursor.java @@ -36,12 +36,12 @@ public class SheetsRecordCursor { private final List columnHandles; private final long totalBytes; - private final List> dataValues; + private final List> dataValues; private List fields; private int currentIndex; - public SheetsRecordCursor(List columnHandles, List> dataValues) + public SheetsRecordCursor(List columnHandles, List> dataValues) { requireNonNull(columnHandles, "columnHandles is null"); requireNonNull(dataValues, "dataValues is null"); @@ -49,9 +49,9 @@ public SheetsRecordCursor(List columnHandles, List objList : dataValues) { - for (Object obj : objList) { - inputLength += String.valueOf(obj).length(); + for (List objList : dataValues) { + for (String obj : objList) { + inputLength += obj.length(); } } totalBytes = inputLength; @@ -79,7 +79,7 @@ public Type getType(int field) @Override public boolean advanceNextPosition() { - List currentVals = null; + List currentVals = null; // Skip empty rows from sheet while (currentVals == null || currentVals.size() == 0) { if (currentIndex == dataValues.size()) { @@ -93,7 +93,7 @@ public boolean advanceNextPosition() for (int i = 0; i < allFields.length; i++) { int ordinalPos = columnHandles.get(i).getOrdinalPosition(); if (currentVals.size() > ordinalPos) { - allFields[i] = String.valueOf(currentVals.get(ordinalPos)); + allFields[i] = currentVals.get(ordinalPos); } } fields = Arrays.asList(allFields); diff --git a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordSet.java b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordSet.java index 57864c25410b..314b82f08fc1 100644 --- a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordSet.java +++ b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsRecordSet.java @@ -27,7 +27,7 @@ public class SheetsRecordSet { private final List 
columnHandles; private final List columnTypes; - private final List> values; + private final List> values; public SheetsRecordSet(SheetsSplit split, List columnHandles) { diff --git a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsSplit.java b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsSplit.java index 3def1175ef91..b2f0cc5819fa 100644 --- a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsSplit.java +++ b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsSplit.java @@ -17,26 +17,31 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class SheetsSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SheetsSplit.class).instanceSize(); + private final String schemaName; private final String tableName; - private final List> values; + private final List> values; private final List hostAddresses; @JsonCreator public SheetsSplit( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, - @JsonProperty("values") List> values) + @JsonProperty("values") List> values) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -57,7 +62,7 @@ public String getTableName() } @JsonProperty - public List> getValues() + public List> getValues() { return values; } @@ -83,4 +88,14 @@ public Object getInfo() .put("hostAddresses", hostAddresses) .build(); } + + @Override + public long getRetainedSizeInBytes() + { + return 
INSTANCE_SIZE + + estimatedSizeOf(schemaName) + + estimatedSizeOf(tableName) + + estimatedSizeOf(values, value -> estimatedSizeOf(value, SizeOf::estimatedSizeOf)) + + estimatedSizeOf(hostAddresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsTable.java b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsTable.java index d31f94fad070..156ab453ca55 100644 --- a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsTable.java +++ b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsTable.java @@ -27,13 +27,13 @@ public class SheetsTable { private final List columnsMetadata; - private final List> values; + private final List> values; @JsonCreator public SheetsTable( @JsonProperty("name") String name, @JsonProperty("columns") List columns, - @JsonProperty("values") List> values) + @JsonProperty("values") List> values) { checkArgument(!isNullOrEmpty(name), "name is null or is empty"); requireNonNull(columns, "columns is null"); @@ -47,7 +47,7 @@ public SheetsTable( } @JsonProperty - public List> getValues() + public List> getValues() { return values; } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/AcidInfo.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/AcidInfo.java index d417d9377f78..a143ade6ede6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/AcidInfo.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/AcidInfo.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ListMultimap; import org.apache.hadoop.fs.Path; +import org.openjdk.jol.info.ClassLayout; import java.util.ArrayList; import java.util.List; @@ -28,6 +29,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static 
com.google.common.base.Preconditions.checkState; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; /** @@ -35,6 +37,8 @@ */ public class AcidInfo { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(AcidInfo.class).instanceSize(); + private final String partitionLocation; private final List deleteDeltas; private final List originalFiles; @@ -121,8 +125,18 @@ public String toString() .toString(); } + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(partitionLocation) + + estimatedSizeOf(deleteDeltas, DeleteDeltaInfo::getRetainedSizeInBytes) + + estimatedSizeOf(originalFiles, OriginalFileInfo::getRetainedSizeInBytes); + } + public static class DeleteDeltaInfo { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(DeleteDeltaInfo.class).instanceSize(); + private final String directoryName; @JsonCreator @@ -163,10 +177,17 @@ public String toString() .add("directoryName", directoryName) .toString(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + estimatedSizeOf(directoryName); + } } public static class OriginalFileInfo { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(OriginalFileInfo.class).instanceSize(); + private final String name; private final long fileSize; @@ -219,6 +240,11 @@ public String toString() .add("fileSize", fileSize) .toString(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + estimatedSizeOf(name); + } } public static Builder builder(Path partitionPath) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnHandle.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnHandle.java index b9ca91d2c683..26bffc9b3f44 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnHandle.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnHandle.java @@ -15,10 +15,12 @@ import 
com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.SizeOf; import io.trino.plugin.hive.metastore.Column; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.type.Type; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; @@ -26,6 +28,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED; import static io.trino.plugin.hive.HiveType.HIVE_INT; @@ -48,6 +52,8 @@ public class HiveColumnHandle implements ColumnHandle { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(HiveColumnHandle.class).instanceSize(); + public static final int PATH_COLUMN_INDEX = -11; public static final String PATH_COLUMN_NAME = "$path"; public static final HiveType PATH_HIVE_TYPE = HIVE_STRING; @@ -327,4 +333,15 @@ public static boolean isRowIdColumnHandle(HiveColumnHandle column) { return column.getBaseHiveColumnIndex() == UPDATE_ROW_ID_COLUMN_INDEX; } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(baseColumnName) + + baseHiveType.getRetainedSizeInBytes() + // baseType is not accounted for as the instances are cached (by TypeRegistry) and shared + + sizeOf(comment, SizeOf::estimatedSizeOf) + + sizeOf(hiveColumnProjectionInfo, HiveColumnProjectionInfo::getRetainedSizeInBytes) + + estimatedSizeOf(name); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnProjectionInfo.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnProjectionInfo.java index 005c4d530313..96f247586edd 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnProjectionInfo.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveColumnProjectionInfo.java @@ -15,17 +15,22 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.SizeOf; import io.trino.spi.type.Type; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class HiveColumnProjectionInfo { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(HiveColumnProjectionInfo.class).instanceSize(); + private final List dereferenceIndices; private final List dereferenceNames; private final HiveType hiveType; @@ -108,4 +113,14 @@ public static String generatePartialName(List dereferenceNames) .map(name -> "#" + name) .collect(Collectors.joining()); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(dereferenceIndices, SizeOf::sizeOf) + + estimatedSizeOf(dereferenceNames, SizeOf::estimatedSizeOf) + + hiveType.getRetainedSizeInBytes() + // type is not accounted for as the instances are cached (by TypeRegistry) and shared + + estimatedSizeOf(partialName); + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java index 50661112dfb7..8a25ce3d980a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java @@ -21,6 +21,7 @@ import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; @@ -30,12 
+31,16 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName; import static java.util.Objects.requireNonNull; public class HiveSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(HiveSplit.class).instanceSize(); + private final String path; private final long start; private final long length; @@ -255,6 +260,25 @@ public SplitWeight getSplitWeight() return splitWeight; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(path) + + estimatedSizeOf(schema, key -> estimatedSizeOf((String) key), value -> estimatedSizeOf((String) value)) + + estimatedSizeOf(partitionKeys, HivePartitionKey::getEstimatedSizeInBytes) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + estimatedSizeOf(database) + + estimatedSizeOf(table) + + estimatedSizeOf(partitionName) + + sizeOf(bucketNumber) + + tableToPartitionMapping.getEstimatedSizeInBytes() + + sizeOf(bucketConversion, BucketConversion::getRetainedSizeInBytes) + + sizeOf(bucketValidation, BucketValidation::getRetainedSizeInBytes) + + sizeOf(acidInfo, AcidInfo::getRetainedSizeInBytes) + + splitWeight.getRetainedSizeInBytes(); + } + @Override public Object getInfo() { @@ -287,6 +311,8 @@ public String toString() public static class BucketConversion { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BucketConversion.class).instanceSize(); + private final BucketingVersion bucketingVersion; private final int tableBucketCount; private final int partitionBucketCount; @@ -350,10 +376,18 @@ public int hashCode() { return Objects.hash(tableBucketCount, partitionBucketCount, bucketColumnNames); } + + public long getRetainedSizeInBytes() + { + return 
INSTANCE_SIZE + + estimatedSizeOf(bucketColumnNames, HiveColumnHandle::getRetainedSizeInBytes); + } } public static class BucketValidation { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BucketValidation.class).instanceSize(); + private final BucketingVersion bucketingVersion; private final int bucketCount; private final List bucketColumns; @@ -386,5 +420,11 @@ public List getBucketColumns() { return bucketColumns; } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(bucketColumns, HiveColumnHandle::getRetainedSizeInBytes); + } } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveType.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveType.java index f823ea85c0f9..6b9ef6755439 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveType.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveType.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; @@ -56,6 +57,8 @@ public final class HiveType { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(HiveType.class).instanceSize(); + public static final HiveType HIVE_BOOLEAN = new HiveType(booleanTypeInfo); public static final HiveType HIVE_BYTE = new HiveType(byteTypeInfo); public static final HiveType HIVE_SHORT = new HiveType(shortTypeInfo); @@ -245,4 +248,10 @@ public List getHiveDereferenceNames(List dereferences) return dereferenceNames.build(); } + + public long getRetainedSizeInBytes() + { + // typeInfo is not accounted for as the instances are cached (by TypeInfoFactory) and shared + return INSTANCE_SIZE + hiveTypeName.getEstimatedSizeInBytes(); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index f06e3371a5df..3045bd60b1b2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -17,20 +17,26 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; import org.apache.iceberg.FileFormat; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Map; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class IcebergSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(IcebergSplit.class).instanceSize(); + private final String path; private final long start; private final long length; @@ -117,6 +123,15 @@ public Object getInfo() .build(); } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(path) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + estimatedSizeOf(partitionKeys, SizeOf::sizeOf, valueOptional -> sizeOf(valueOptional, SizeOf::estimatedSizeOf)); + } + @Override public String toString() { diff --git a/plugin/trino-jmx/src/main/java/io/trino/plugin/jmx/JmxSplit.java b/plugin/trino-jmx/src/main/java/io/trino/plugin/jmx/JmxSplit.java index 2e314611cae9..bd04afc72902 100644 --- a/plugin/trino-jmx/src/main/java/io/trino/plugin/jmx/JmxSplit.java +++ b/plugin/trino-jmx/src/main/java/io/trino/plugin/jmx/JmxSplit.java @@ -18,14 +18,18 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import 
io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class JmxSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(JmxSplit.class).instanceSize(); + private final List addresses; @JsonCreator @@ -53,4 +57,11 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java index 727f9a124ea9..011dabd43514 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaSplit.java @@ -16,18 +16,24 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class KafkaSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(KafkaSplit.class).instanceSize(); + private final String topicName; private final String keyDataFormat; private final String messageDataFormat; @@ -124,6 +130,19 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(topicName) + + 
estimatedSizeOf(keyDataFormat) + + estimatedSizeOf(messageDataFormat) + + sizeOf(keyDataSchemaContents, SizeOf::estimatedSizeOf) + + sizeOf(messageDataSchemaContents, SizeOf::estimatedSizeOf) + + messagesRange.getRetainedSizeInBytes() + + leader.getRetainedSizeInBytes(); + } + @Override public String toString() { diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java index d1d34bf3f810..ceff54f14ee2 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import org.openjdk.jol.info.ClassLayout; import java.util.List; @@ -24,6 +25,8 @@ public class Range { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(Range.class).instanceSize(); + private final long begin; // inclusive private final long end; // exclusive @@ -65,4 +68,9 @@ public String toString() .add("end", end) .toString(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } } diff --git a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplit.java b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplit.java index 7db01e08ef81..80a193f80f9e 100644 --- a/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplit.java +++ b/plugin/trino-kinesis/src/main/java/io/trino/plugin/kinesis/KinesisSplit.java @@ -18,10 +18,12 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static 
java.util.Objects.requireNonNull; /** @@ -31,6 +33,8 @@ public class KinesisSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(KinesisSplit.class).instanceSize(); + private final String streamName; private final String messageDataFormat; private final KinesisCompressionCodec compressionCodec; @@ -109,6 +113,17 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(streamName) + + estimatedSizeOf(messageDataFormat) + + estimatedSizeOf(shardId) + + estimatedSizeOf(start) + + estimatedSizeOf(end); + } + @Override public String toString() { diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java index 3c25f72feea4..1b08fc9aaca8 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java @@ -603,7 +603,7 @@ private KuduSplit toKuduSplit(KuduTableHandle tableHandle, KuduScanToken token, { try { byte[] serializedScanToken = token.serialize(); - return new KuduSplit(tableHandle, primaryKeyColumnCount, serializedScanToken, bucketNumber); + return new KuduSplit(tableHandle.getSchemaTableName(), primaryKeyColumnCount, serializedScanToken, bucketNumber); } catch (IOException e) { throw new TrinoException(GENERIC_INTERNAL_ERROR, e); diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java index a2241f96437c..f688cb0c4034 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java @@ -34,6 +34,8 @@ public class KuduRecordSet private final KuduSplit kuduSplit; private final List columns; + private KuduTable kuduTable; + public 
KuduRecordSet(KuduClientSession clientSession, KuduSplit kuduSplit, List columns) { this.clientSession = clientSession; @@ -70,7 +72,10 @@ public RecordCursor cursor() KuduTable getTable() { - return kuduSplit.getTableHandle().getTable(clientSession); + if (kuduTable == null) { + kuduTable = clientSession.openTable(kuduSplit.getSchemaTableName()); + } + return kuduTable; } KuduClientSession getClientSession() diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java index 5e2c73fec2b7..c5a5d9d0b866 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplit.java @@ -18,27 +18,33 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.SchemaTableName; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class KuduSplit implements ConnectorSplit { - private final KuduTableHandle tableHandle; + private static final int INSTANCE_SIZE = ClassLayout.parseClass(KuduSplit.class).instanceSize(); + + private final SchemaTableName schemaTableName; private final int primaryKeyColumnCount; private final byte[] serializedScanToken; private final int bucketNumber; @JsonCreator - public KuduSplit(@JsonProperty("tableHandle") KuduTableHandle tableHandle, + public KuduSplit( + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("primaryKeyColumnCount") int primaryKeyColumnCount, @JsonProperty("serializedScanToken") byte[] serializedScanToken, @JsonProperty("bucketNumber") int bucketNumber) { - this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + this.schemaTableName = 
requireNonNull(schemaTableName, "schemaTableName is null"); this.primaryKeyColumnCount = primaryKeyColumnCount; this.serializedScanToken = requireNonNull(serializedScanToken, "serializedScanToken is null"); checkArgument(bucketNumber >= 0, "bucketNumber is negative"); @@ -46,9 +52,9 @@ public KuduSplit(@JsonProperty("tableHandle") KuduTableHandle tableHandle, } @JsonProperty - public KuduTableHandle getTableHandle() + public SchemaTableName getSchemaTableName() { - return tableHandle; + return schemaTableName; } @JsonProperty @@ -86,4 +92,12 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + schemaTableName.getRetainedSizeInBytes() + + sizeOf(serializedScanToken); + } } diff --git a/plugin/trino-local-file/src/main/java/io/trino/plugin/localfile/LocalFileSplit.java b/plugin/trino-local-file/src/main/java/io/trino/plugin/localfile/LocalFileSplit.java index d78d056339b5..ed1b36bf5b35 100644 --- a/plugin/trino-local-file/src/main/java/io/trino/plugin/localfile/LocalFileSplit.java +++ b/plugin/trino-local-file/src/main/java/io/trino/plugin/localfile/LocalFileSplit.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; @@ -27,6 +28,8 @@ public class LocalFileSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(LocalFileSplit.class).instanceSize(); + private final HostAddress address; @JsonCreator @@ -59,6 +62,13 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + address.getRetainedSizeInBytes(); + } + @Override public String toString() { diff --git a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplit.java b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplit.java index 
2a799235b6d3..249917abd21f 100644 --- a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplit.java +++ b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemorySplit.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; @@ -25,11 +26,14 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class MemorySplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(MemorySplit.class).instanceSize(); + private final long table; private final int totalPartsPerWorker; // how many concurrent reads there will be from one worker private final int partNumber; // part of the pages on one worker that this splits is responsible @@ -82,6 +86,14 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + address.getRetainedSizeInBytes() + + sizeOf(limit); + } + @Override public boolean isRemotelyAccessible() { diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplit.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplit.java index 6d081e4b045a..6fa7301bc051 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplit.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplit.java @@ -18,14 +18,18 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class 
MongoSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(MongoSplit.class).instanceSize(); + private final List addresses; @JsonCreator @@ -52,4 +56,11 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplit.java b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplit.java index 1b86f6b10e0d..edf3a6c167ae 100644 --- a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplit.java +++ b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplit.java @@ -16,29 +16,35 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.SizeOf; import io.trino.plugin.jdbc.JdbcSplit; import io.trino.spi.HostAddress; import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class PhoenixSplit extends JdbcSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(PhoenixSplit.class).instanceSize(); + private final List addresses; - private final WrappedPhoenixInputSplit phoenixInputSplit; + private final SerializedPhoenixInputSplit serializedPhoenixInputSplit; @JsonCreator public PhoenixSplit( @JsonProperty("addresses") List addresses, - @JsonProperty("phoenixInputSplit") WrappedPhoenixInputSplit wrappedPhoenixInputSplit) + @JsonProperty("serializedPhoenixInputSplit") SerializedPhoenixInputSplit serializedPhoenixInputSplit) { super(Optional.empty()); this.addresses = 
requireNonNull(addresses, "addresses is null"); - this.phoenixInputSplit = requireNonNull(wrappedPhoenixInputSplit, "wrappedPhoenixInputSplit is null"); + this.serializedPhoenixInputSplit = requireNonNull(serializedPhoenixInputSplit, "serializedPhoenixInputSplit is null"); } @JsonProperty @@ -48,15 +54,24 @@ public List getAddresses() return addresses; } - @JsonProperty("phoenixInputSplit") - public WrappedPhoenixInputSplit getWrappedPhoenixInputSplit() + @JsonProperty + public SerializedPhoenixInputSplit getSerializedPhoenixInputSplit() { - return phoenixInputSplit; + return serializedPhoenixInputSplit; } @JsonIgnore public PhoenixInputSplit getPhoenixInputSplit() { - return phoenixInputSplit.getPhoenixInputSplit(); + return serializedPhoenixInputSplit.deserialize(); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(getAdditionalPredicate(), SizeOf::estimatedSizeOf) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + serializedPhoenixInputSplit.getRetainedSizeInBytes(); } } diff --git a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplitManager.java b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplitManager.java index 42319934facd..9c0c7a3eeb2e 100644 --- a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplitManager.java +++ b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/PhoenixSplitManager.java @@ -93,7 +93,7 @@ public ConnectorSplitSource getSplits( .map(PhoenixInputSplit.class::cast) .map(split -> new PhoenixSplit( getSplitAddresses(split), - new WrappedPhoenixInputSplit(split))) + SerializedPhoenixInputSplit.serialize(split))) .collect(toImmutableList()); return new FixedSplitSource(splits); } diff --git a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/SerializedPhoenixInputSplit.java b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/SerializedPhoenixInputSplit.java new file mode 100644 index 
000000000000..7e249fc8f154 --- /dev/null +++ b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/SerializedPhoenixInputSplit.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.phoenix; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.io.ByteStreams; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.openjdk.jol.info.ClassLayout; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import static io.airlift.slice.SizeOf.sizeOf; +import static java.util.Objects.requireNonNull; + +public class SerializedPhoenixInputSplit +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SerializedPhoenixInputSplit.class).instanceSize(); + + private final byte[] bytes; + + public static SerializedPhoenixInputSplit serialize(PhoenixInputSplit split) + { + return new SerializedPhoenixInputSplit(WritableUtils.toByteArray(split)); + } + + @JsonCreator + public SerializedPhoenixInputSplit(@JsonProperty("bytes") byte[] bytes) + { + this.bytes = requireNonNull(bytes, "bytes is null"); + } + + @JsonProperty + public byte[] getBytes() + { + return bytes; + } + + public PhoenixInputSplit deserialize() + { + PhoenixInputSplit split = new PhoenixInputSplit(); + try { + split.readFields(ByteStreams.newDataInput(bytes)); + } + catch (IOException e) { + 
throw new UncheckedIOException(e); + } + return split; + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + sizeOf(bytes); + } +} diff --git a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/WrappedPhoenixInputSplit.java b/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/WrappedPhoenixInputSplit.java deleted file mode 100644 index 33eb7fb3b932..000000000000 --- a/plugin/trino-phoenix/src/main/java/io/trino/plugin/phoenix/WrappedPhoenixInputSplit.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.phoenix; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonValue; -import com.google.common.io.ByteStreams; -import org.apache.hadoop.io.WritableUtils; -import org.apache.phoenix.mapreduce.PhoenixInputSplit; - -import java.io.IOException; - -public class WrappedPhoenixInputSplit -{ - private final PhoenixInputSplit phoenixInputSplit; - - public WrappedPhoenixInputSplit(PhoenixInputSplit phoenixInputSplit) - { - this.phoenixInputSplit = phoenixInputSplit; - } - - public PhoenixInputSplit getPhoenixInputSplit() - { - return phoenixInputSplit; - } - - @JsonValue - public byte[] toBytes() - { - return WritableUtils.toByteArray(phoenixInputSplit); - } - - @JsonCreator - public static WrappedPhoenixInputSplit fromBytes(byte[] bytes) - throws IOException - { - PhoenixInputSplit materialized = new PhoenixInputSplit(); - materialized.readFields(ByteStreams.newDataInput(bytes)); - return new WrappedPhoenixInputSplit(materialized); - } - - @Override - public int hashCode() - { - return phoenixInputSplit.hashCode(); - } - - @Override - public boolean equals(Object obj) - { - if (obj instanceof WrappedPhoenixInputSplit) { - PhoenixInputSplit otherSplit = ((WrappedPhoenixInputSplit) obj).getPhoenixInputSplit(); - return phoenixInputSplit.equals(otherSplit); - } - return false; - } -} diff --git a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixSplit.java b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixSplit.java index de4763a791ef..a4a21f64bb95 100644 --- a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixSplit.java +++ b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixSplit.java @@ -40,7 +40,7 @@ public void testPhoenixSplitJsonRoundtrip() PhoenixInputSplit phoenixInputSplit = new PhoenixInputSplit(scans); PhoenixSplit expected = new PhoenixSplit( addresses, - new WrappedPhoenixInputSplit(phoenixInputSplit)); + 
SerializedPhoenixInputSplit.serialize(phoenixInputSplit)); assertTrue(objectMapper.canSerialize(PhoenixSplit.class)); diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplit.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplit.java index e664b203350b..6ce5ea9c0b4a 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplit.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplit.java @@ -16,29 +16,35 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.slice.SizeOf; import io.trino.plugin.jdbc.JdbcSplit; import io.trino.spi.HostAddress; import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class PhoenixSplit extends JdbcSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(PhoenixSplit.class).instanceSize(); + private final List addresses; - private final WrappedPhoenixInputSplit phoenixInputSplit; + private final SerializedPhoenixInputSplit serializedPhoenixInputSplit; @JsonCreator public PhoenixSplit( @JsonProperty("addresses") List addresses, - @JsonProperty("phoenixInputSplit") WrappedPhoenixInputSplit wrappedPhoenixInputSplit) + @JsonProperty("serializedPhoenixInputSplit") SerializedPhoenixInputSplit serializedPhoenixInputSplit) { super(Optional.empty()); this.addresses = requireNonNull(addresses, "addresses is null"); - this.phoenixInputSplit = requireNonNull(wrappedPhoenixInputSplit, "wrappedPhoenixInputSplit is null"); + this.serializedPhoenixInputSplit = requireNonNull(serializedPhoenixInputSplit, "serializedPhoenixInputSplit is null"); } @JsonProperty 
@@ -48,15 +54,24 @@ public List getAddresses() return addresses; } - @JsonProperty("phoenixInputSplit") - public WrappedPhoenixInputSplit getWrappedPhoenixInputSplit() + @JsonProperty + public SerializedPhoenixInputSplit getSerializedPhoenixInputSplit() { - return phoenixInputSplit; + return serializedPhoenixInputSplit; } @JsonIgnore public PhoenixInputSplit getPhoenixInputSplit() { - return phoenixInputSplit.getPhoenixInputSplit(); + return serializedPhoenixInputSplit.deserialize(); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(getAdditionalPredicate(), SizeOf::estimatedSizeOf) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + serializedPhoenixInputSplit.getRetainedSizeInBytes(); } } diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplitManager.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplitManager.java index e156999f6905..751188aeaa8d 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplitManager.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixSplitManager.java @@ -93,7 +93,7 @@ public ConnectorSplitSource getSplits( .map(PhoenixInputSplit.class::cast) .map(split -> new PhoenixSplit( getSplitAddresses(split), - new WrappedPhoenixInputSplit(split))) + SerializedPhoenixInputSplit.serialize(split))) .collect(toImmutableList()); return new FixedSplitSource(splits); } diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/SerializedPhoenixInputSplit.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/SerializedPhoenixInputSplit.java new file mode 100644 index 000000000000..abc3621cd9e6 --- /dev/null +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/SerializedPhoenixInputSplit.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with 
the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.phoenix5; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.io.ByteStreams; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.openjdk.jol.info.ClassLayout; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import static io.airlift.slice.SizeOf.sizeOf; +import static java.util.Objects.requireNonNull; + +public class SerializedPhoenixInputSplit +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SerializedPhoenixInputSplit.class).instanceSize(); + + private final byte[] bytes; + + public static SerializedPhoenixInputSplit serialize(PhoenixInputSplit split) + { + return new SerializedPhoenixInputSplit(WritableUtils.toByteArray(split)); + } + + @JsonCreator + public SerializedPhoenixInputSplit(@JsonProperty("bytes") byte[] bytes) + { + this.bytes = requireNonNull(bytes, "bytes is null"); + } + + @JsonProperty + public byte[] getBytes() + { + return bytes; + } + + public PhoenixInputSplit deserialize() + { + PhoenixInputSplit split = new PhoenixInputSplit(); + try { + split.readFields(ByteStreams.newDataInput(bytes)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + return split; + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + sizeOf(bytes); + } +} diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/WrappedPhoenixInputSplit.java 
b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/WrappedPhoenixInputSplit.java deleted file mode 100644 index fe799c6c4df8..000000000000 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/WrappedPhoenixInputSplit.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.phoenix5; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonValue; -import com.google.common.io.ByteStreams; -import org.apache.hadoop.io.WritableUtils; -import org.apache.phoenix.mapreduce.PhoenixInputSplit; - -import java.io.IOException; - -public class WrappedPhoenixInputSplit -{ - private final PhoenixInputSplit phoenixInputSplit; - - public WrappedPhoenixInputSplit(PhoenixInputSplit phoenixInputSplit) - { - this.phoenixInputSplit = phoenixInputSplit; - } - - public PhoenixInputSplit getPhoenixInputSplit() - { - return phoenixInputSplit; - } - - @JsonValue - public byte[] toBytes() - { - return WritableUtils.toByteArray(phoenixInputSplit); - } - - @JsonCreator - public static WrappedPhoenixInputSplit fromBytes(byte[] bytes) - throws IOException - { - PhoenixInputSplit materialized = new PhoenixInputSplit(); - materialized.readFields(ByteStreams.newDataInput(bytes)); - return new WrappedPhoenixInputSplit(materialized); - } - - @Override - public int hashCode() - { - return phoenixInputSplit.hashCode(); - } - - @Override - public boolean equals(Object 
obj) - { - if (obj instanceof WrappedPhoenixInputSplit) { - PhoenixInputSplit otherSplit = ((WrappedPhoenixInputSplit) obj).getPhoenixInputSplit(); - return phoenixInputSplit.equals(otherSplit); - } - return false; - } -} diff --git a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixSplit.java b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixSplit.java index dbc8879d4d4f..8820ad988d5a 100644 --- a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixSplit.java +++ b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixSplit.java @@ -40,7 +40,7 @@ public void testPhoenixSplitJsonRoundtrip() PhoenixInputSplit phoenixInputSplit = new PhoenixInputSplit(scans); PhoenixSplit expected = new PhoenixSplit( addresses, - new WrappedPhoenixInputSplit(phoenixInputSplit)); + SerializedPhoenixInputSplit.serialize(phoenixInputSplit)); assertTrue(objectMapper.canSerialize(PhoenixSplit.class)); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSplit.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSplit.java index b907fbb3324e..9eb3b5d38a31 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSplit.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSplit.java @@ -16,19 +16,25 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import io.airlift.slice.SizeOf; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public 
class PinotSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(PinotSplit.class).instanceSize(); + private final SplitType splitType; private final Optional suffix; private final List segments; @@ -135,6 +141,16 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(suffix, SizeOf::estimatedSizeOf) + + estimatedSizeOf(segments, SizeOf::estimatedSizeOf) + + sizeOf(segmentHost, SizeOf::estimatedSizeOf) + + sizeOf(timePredicate, SizeOf::estimatedSizeOf); + } + public enum SplitType { SEGMENT, diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java index 128c19eb5dea..39c7c38f1c8f 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java @@ -19,6 +19,7 @@ import io.trino.spi.connector.RecordSet; import io.trino.spi.type.Type; +import java.net.URI; import java.util.List; import static java.util.Objects.requireNonNull; @@ -42,7 +43,7 @@ public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit sp } this.columnTypes = types.build(); - this.byteSource = ByteSource.wrap(prometheusClient.fetchUri(split.getUri())); + this.byteSource = ByteSource.wrap(prometheusClient.fetchUri(URI.create(split.getUri()))); } @Override diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplit.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplit.java index 0a9cf391e727..7ce3722cfd7c 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplit.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplit.java @@ -18,29 +18,32 @@ import 
com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.net.URI; import java.util.List; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class PrometheusSplit implements ConnectorSplit { - private final URI uri; + private static final int INSTANCE_SIZE = ClassLayout.parseClass(PrometheusSplit.class).instanceSize(); + + private final String uri; private final List addresses; @JsonCreator - public PrometheusSplit( - @JsonProperty("uri") URI uri) + public PrometheusSplit(@JsonProperty("uri") String uri) { this.uri = requireNonNull(uri, "uri is null"); - addresses = ImmutableList.of(HostAddress.fromUri(uri)); + addresses = ImmutableList.of(HostAddress.fromUri(URI.create(uri))); } @JsonProperty - public URI getUri() + public String getUri() { return uri; } @@ -62,4 +65,12 @@ public Object getInfo() { return this; } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(uri) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java index b739459e1850..ad77c030d66c 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusSplitManager.java @@ -101,7 +101,7 @@ public ConnectorSplitSource getSplits( prometheusURI, time, table.getName(), - queryChunkSizeDuration)); + queryChunkSizeDuration).toString()); } catch (URISyntaxException e) { throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage()); diff --git 
a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationMetrics.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationMetrics.java index 2e92531e330a..3a95d59aee18 100644 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationMetrics.java +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationMetrics.java @@ -25,8 +25,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.URI; - import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE; import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusClient; import static io.trino.spi.type.BigintType.BIGINT; @@ -123,7 +121,7 @@ public void testDropTableTable() @Test public void testGetColumnTypes() { - URI dataUri = server.getUri(); + String dataUri = server.getUri().toString(); RecordSet recordSet = new PrometheusRecordSet( client, new PrometheusSplit(dataUri), diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSet.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSet.java index bb3064c42b7e..95ea11885ad9 100644 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSet.java +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSet.java @@ -23,7 +23,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.URI; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -42,7 +41,7 @@ public class TestPrometheusRecordSet { private PrometheusHttpServer prometheusHttpServer; - private URI dataUri; + private String dataUri; @Test public void testCursorSimple() @@ -96,7 +95,7 @@ public void testCursorSimple() public void setUp() { prometheusHttpServer = new 
PrometheusHttpServer(); - dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json"); + dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json").toString(); } @AfterClass(alwaysRun = true) diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSetProvider.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSetProvider.java index 3a4fa462d8e0..e0f82b4888c0 100644 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSetProvider.java +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusRecordSetProvider.java @@ -24,7 +24,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.URI; import java.time.Instant; import java.util.LinkedHashMap; import java.util.Map; @@ -42,14 +41,14 @@ public class TestPrometheusRecordSetProvider { private PrometheusHttpServer prometheusHttpServer; - private URI dataUri; + private String dataUri; private PrometheusClient client; @BeforeClass public void setUp() { prometheusHttpServer = new PrometheusHttpServer(); - dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json"); + dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json").toString(); client = new PrometheusClient(new PrometheusConnectorConfig(), METRIC_CODEC, TESTING_TYPE_MANAGER); } diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusSplit.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusSplit.java index 92a88691fe20..9324b908553e 100644 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusSplit.java +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusSplit.java @@ -67,7 +67,7 @@ public class TestPrometheusSplit { private PrometheusHttpServer 
prometheusHttpServer; - private final PrometheusSplit split = new PrometheusSplit(URI.create("http://127.0.0.1/test.file")); + private final PrometheusSplit split = new PrometheusSplit("http://127.0.0.1/test.file"); private static final int NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS = 100; @BeforeClass @@ -80,22 +80,22 @@ public void setUp() public void testAddresses() { // http split with default port - PrometheusSplit httpSplit = new PrometheusSplit(URI.create("http://prometheus.com/prometheus")); + PrometheusSplit httpSplit = new PrometheusSplit("http://prometheus.com/prometheus"); assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("prometheus.com"))); assertTrue(httpSplit.isRemotelyAccessible()); // http split with custom port - httpSplit = new PrometheusSplit(URI.create("http://prometheus.com:8080/prometheus")); + httpSplit = new PrometheusSplit("http://prometheus.com:8080/prometheus"); assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("prometheus.com", 8080))); assertTrue(httpSplit.isRemotelyAccessible()); // http split with default port - PrometheusSplit httpsSplit = new PrometheusSplit(URI.create("https://prometheus.com/prometheus")); + PrometheusSplit httpsSplit = new PrometheusSplit("https://prometheus.com/prometheus"); assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("prometheus.com"))); assertTrue(httpsSplit.isRemotelyAccessible()); // http split with custom port - httpsSplit = new PrometheusSplit(URI.create("https://prometheus.com:8443/prometheus")); + httpsSplit = new PrometheusSplit("https://prometheus.com:8443/prometheus"); assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("prometheus.com", 8443))); assertTrue(httpsSplit.isRemotelyAccessible()); } @@ -128,7 +128,7 @@ public void testQueryWithTableNameNeedingURLEncodeInSplits() null, (DynamicFilter) null); PrometheusSplit split = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 
1).getNow(null).getSplits().get(0); - String queryInSplit = split.getUri().getQuery(); + String queryInSplit = URI.create(split.getUri()).getQuery(); String timeShouldBe = decimalSecondString(now.toEpochMilli() - config.getMaxQueryRangeDuration().toMillis() + config.getQueryChunkSizeDuration().toMillis() - @@ -154,7 +154,7 @@ public void testQueryDividedIntoSplitsFirstSplitHasRightTime() null, (DynamicFilter) null); PrometheusSplit split = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0); - String queryInSplit = split.getUri().getQuery(); + String queryInSplit = URI.create(split.getUri()).getQuery(); String timeShouldBe = decimalSecondString(now.toEpochMilli() - config.getMaxQueryRangeDuration().toMillis() + config.getQueryChunkSizeDuration().toMillis() - @@ -182,7 +182,7 @@ public void testQueryDividedIntoSplitsLastSplitHasRightTime() List splits = splitsMaybe.getNextBatch(NOT_PARTITIONED, NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS).getNow(null).getSplits(); int lastSplitIndex = splits.size() - 1; PrometheusSplit lastSplit = (PrometheusSplit) splits.get(lastSplitIndex); - String queryInSplit = lastSplit.getUri().getQuery(); + String queryInSplit = URI.create(lastSplit.getUri()).getQuery(); String timeShouldBe = decimalSecondString(now.toEpochMilli()); URI uriAsFormed = new URI("http://doesnotmatter:9090/api/v1/query?query=up[" + getQueryChunkSizeDurationAsPrometheusCompatibleDurationString(config) + "]" + @@ -205,9 +205,9 @@ public void testQueryDividedIntoSplitsShouldHaveCorrectSpacingBetweenTimes() null, (DynamicFilter) null); PrometheusSplit split1 = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0); - Map paramsMap1 = parse(split1.getUri(), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, NameValuePair::getValue)); + Map paramsMap1 = parse(URI.create(split1.getUri()), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, 
NameValuePair::getValue)); PrometheusSplit split2 = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0); - Map paramsMap2 = parse(split2.getUri(), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, NameValuePair::getValue)); + Map paramsMap2 = parse(URI.create(split2.getUri()), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, NameValuePair::getValue)); assertEquals(paramsMap1.get("query"), "up[1d]"); assertEquals(paramsMap2.get("query"), "up[1d]"); long diff = Double.valueOf(paramsMap2.get("time")).longValue() - Double.valueOf(paramsMap1.get("time")).longValue(); diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplit.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplit.java index d42982667639..fbe5969c9802 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplit.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/RaptorSplit.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableSet; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.OptionalInt; @@ -27,11 +28,16 @@ import java.util.UUID; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class RaptorSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(RaptorSplit.class).instanceSize(); + private static final int UUID_INSTANCE_SIZE = ClassLayout.parseClass(UUID.class).instanceSize(); + private final Set shardUuids; private final OptionalInt bucketNumber; private final List addresses; @@ -111,6 +117,16 @@ public Object getInfo() return 
this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(shardUuids, value -> UUID_INSTANCE_SIZE) + + sizeOf(bucketNumber) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + + sizeOf(transactionId); + } + @Override public String toString() { diff --git a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisSplit.java b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisSplit.java index a294e60100b6..e79c8bcc4a66 100644 --- a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisSplit.java +++ b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisSplit.java @@ -18,10 +18,12 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; /** @@ -30,6 +32,8 @@ public final class RedisSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(RedisSplit.class).instanceSize(); + private final String schemaName; private final String tableName; private final String keyDataFormat; @@ -143,6 +147,18 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(schemaName) + + estimatedSizeOf(tableName) + + estimatedSizeOf(keyDataFormat) + + estimatedSizeOf(keyName) + + estimatedSizeOf(valueDataFormat) + + estimatedSizeOf(nodes, HostAddress::getRetainedSizeInBytes); + } + private static RedisDataType toRedisDataType(String dataFormat) { switch (dataFormat) { diff --git a/plugin/trino-thrift-api/pom.xml b/plugin/trino-thrift-api/pom.xml index 92762e15922e..078a3a6739ac 100644 --- a/plugin/trino-thrift-api/pom.xml +++ b/plugin/trino-thrift-api/pom.xml @@ -49,6 +49,11 @@ 
<artifactId>guava</artifactId> + <dependency> + <groupId>org.openjdk.jol</groupId> + <artifactId>jol-core</artifactId> + </dependency> + <groupId>io.trino</groupId> diff --git a/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/TrinoThriftId.java b/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/TrinoThriftId.java index 3f203e16a534..38ee6651b623 100644 --- a/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/TrinoThriftId.java +++ b/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/TrinoThriftId.java @@ -20,15 +20,19 @@ import io.airlift.drift.annotations.ThriftConstructor; import io.airlift.drift.annotations.ThriftField; import io.airlift.drift.annotations.ThriftStruct; +import org.openjdk.jol.info.ClassLayout; import java.util.Arrays; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; @ThriftStruct public final class TrinoThriftId { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TrinoThriftId.class).instanceSize(); + private static final int PREFIX_SUFFIX_BYTES = 8; private static final String FILLER = ".."; private static final int MAX_DISPLAY_CHARACTERS = PREFIX_SUFFIX_BYTES * 4 + FILLER.length(); @@ -76,6 +80,11 @@ public String toString() .toString(); } + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + sizeOf(id); + } + @VisibleForTesting static String summarize(byte[] value) { diff --git a/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorSplit.java b/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorSplit.java index 425d759ec5a5..ac4112d7ae09 100644 --- a/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorSplit.java +++ b/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorSplit.java @@ -19,16 +19,20 @@ import io.trino.plugin.thrift.api.TrinoThriftId; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import
java.util.List; import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class ThriftConnectorSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(ThriftConnectorSplit.class).instanceSize(); + private final TrinoThriftId splitId; private final List<HostAddress> addresses; @@ -60,6 +64,14 @@ public Object getInfo() return ""; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + splitId.getRetainedSizeInBytes() + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } + @Override public boolean isRemotelyAccessible() { diff --git a/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsSplit.java b/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsSplit.java index ae8e538803b5..5c4419c563a1 100644 --- a/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsSplit.java +++ b/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsSplit.java @@ -18,17 +18,21 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class TpcdsSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TpcdsSplit.class).instanceSize(); + private final int totalParts; private final int partNumber; private final List<HostAddress> addresses; @@ -70,6 +74,13 @@ public Object getInfo() return this; } + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(addresses, 
HostAddress::getRetainedSizeInBytes); + } + @Override public boolean isRemotelyAccessible() { diff --git a/plugin/trino-tpch/src/main/java/io/trino/plugin/tpch/TpchSplit.java b/plugin/trino-tpch/src/main/java/io/trino/plugin/tpch/TpchSplit.java index fc899fe17c6c..ab59ab0668a4 100644 --- a/plugin/trino-tpch/src/main/java/io/trino/plugin/tpch/TpchSplit.java +++ b/plugin/trino-tpch/src/main/java/io/trino/plugin/tpch/TpchSplit.java @@ -18,17 +18,21 @@ import com.google.common.collect.ImmutableList; import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class TpchSplit implements ConnectorSplit { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TpchSplit.class).instanceSize(); + private final int totalParts; private final int partNumber; private final List<HostAddress> addresses; @@ -107,4 +111,11 @@ public String toString() .add("totalParts", totalParts) .toString(); } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } }