Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/connector/CatalogName.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.openjdk.jol.info.ClassLayout;

import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static java.util.Objects.requireNonNull;

public final class CatalogName
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(CatalogName.class).instanceSize();

private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";

Expand Down Expand Up @@ -66,6 +70,12 @@ public String toString()
return catalogName;
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(catalogName);
}

public static boolean isInternalSystemConnector(CatalogName catalogName)
{
return catalogName.getCatalogName().startsWith(SYSTEM_TABLES_CONNECTOR_PREFIX) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
import com.google.common.collect.ImmutableList;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static java.util.Objects.requireNonNull;

public class InformationSchemaSplit
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(InformationSchemaSplit.class).instanceSize();

private final List<HostAddress> addresses;

@JsonCreator
Expand Down Expand Up @@ -56,4 +60,11 @@ public Object getInfo()
{
return this;
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorTableMetadata;
import org.openjdk.jol.info.ClassLayout;

import java.util.Map;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static java.util.Objects.requireNonNull;

public class SystemColumnHandle
implements ColumnHandle
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(SystemColumnHandle.class).instanceSize();

private final String columnName;

@JsonCreator
Expand Down Expand Up @@ -67,6 +71,12 @@ public String toString()
return columnName;
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(columnName);
}

public static Map<String, ColumnHandle> toSystemColumnHandles(ConnectorTableMetadata tableMetadata)
{
return tableMetadata.getColumns().stream().collect(toImmutableMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.predicate.TupleDomain;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static java.util.Objects.requireNonNull;

public class SystemSplit
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(SystemSplit.class).instanceSize();

private final List<HostAddress> addresses;
private final TupleDomain<ColumnHandle> constraint;

Expand Down Expand Up @@ -74,6 +78,14 @@ public Object getInfo()
return this;
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes)
+ constraint.getRetainedSizeInBytes(columnHandle -> ((SystemColumnHandle) columnHandle).getRetainedSizeInBytes());
}

@Override
public String toString()
{
Expand Down
9 changes: 9 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/TaskId.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import io.trino.spi.QueryId;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.trino.spi.QueryId.parseDottedId;
import static java.lang.Integer.parseInt;
import static java.util.Objects.requireNonNull;

public class TaskId
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(TaskId.class).instanceSize();

@JsonCreator
public static TaskId valueOf(String taskId)
{
Expand Down Expand Up @@ -94,4 +98,9 @@ public boolean equals(Object obj)
TaskId other = (TaskId) obj;
return Objects.equals(this.fullId, other.fullId);
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE + estimatedSizeOf(fullId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ private static Split createExchangeSplit(RemoteTask sourceTask, RemoteTask desti
// Fetch the results from the buffer assigned to the task based on id
URI exchangeLocation = sourceTask.getTaskStatus().getSelf();
URI splitLocation = uriBuilderFrom(exchangeLocation).appendPath("results").appendPath(String.valueOf(destinationTask.getTaskId().getPartitionId())).build();
return new Split(REMOTE_CONNECTOR_ID, new RemoteSplit(sourceTask.getTaskId(), splitLocation), Lifespan.taskWide());
return new Split(REMOTE_CONNECTOR_ID, new RemoteSplit(sourceTask.getTaskId(), splitLocation.toString()), Lifespan.taskWide());
}

public enum State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.split.RemoteSplit;
import io.trino.sql.planner.plan.PlanNodeId;

import java.net.URI;
import java.util.Optional;
import java.util.function.Supplier;

Expand Down Expand Up @@ -124,7 +125,7 @@ public Supplier<Optional<UpdatablePageSource>> addSplit(Split split)
checkArgument(split.getCatalogName().equals(REMOTE_CONNECTOR_ID), "split is not a remote split");

RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit();
exchangeClient.addLocation(remoteSplit.getTaskId(), remoteSplit.getLocation());
exchangeClient.addLocation(remoteSplit.getTaskId(), URI.create(remoteSplit.getLocation()));

return Optional::empty;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -160,7 +161,7 @@ public Supplier<Optional<UpdatablePageSource>> addSplit(Split split)
TaskContext taskContext = operatorContext.getDriverContext().getPipelineContext().getTaskContext();
ExchangeClient exchangeClient = closer.register(exchangeClientSupplier.get(operatorContext.localSystemMemoryContext(), taskContext::sourceTaskFailed, RetryPolicy.NONE));
RemoteSplit remoteSplit = (RemoteSplit) split.getConnectorSplit();
exchangeClient.addLocation(remoteSplit.getTaskId(), remoteSplit.getLocation());
exchangeClient.addLocation(remoteSplit.getTaskId(), URI.create(remoteSplit.getLocation()));
exchangeClient.noMoreLocations();
pageProducers.add(exchangeClient.pages()
.map(serializedPage -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public Object getInfo()
return null;
}

@Override
public long getRetainedSizeInBytes()
{
// IndexSplit is expected to be short lived and is not expected to be queried for the memory it retains
throw new UnsupportedOperationException();
}

public RecordSet getKeyRecordSet()
{
return keyRecordSet;
Expand Down
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/split/EmptySplit.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.connector.CatalogName;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;

Expand All @@ -27,6 +28,8 @@
public class EmptySplit
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(EmptySplit.class).instanceSize();

private final CatalogName catalogName;

@JsonCreator
Expand Down Expand Up @@ -54,6 +57,13 @@ public Object getInfo()
return this;
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ catalogName.getRetainedSizeInBytes();
}

@JsonProperty
public CatalogName getCatalogName()
{
Expand Down
19 changes: 15 additions & 4 deletions core/trino-main/src/main/java/io/trino/split/RemoteSplit.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@
import io.trino.execution.TaskId;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import org.openjdk.jol.info.ClassLayout;

import java.net.URI;
import java.util.List;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static java.util.Objects.requireNonNull;

public class RemoteSplit
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RemoteSplit.class).instanceSize();

private final TaskId taskId;
private final URI location;
private final String location;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change from URI to String? Similarly for others parts (e.g. change to serialized page), is it for accurate memory tracking? How much of a difference does it make?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to say. As i tried to explain in the commit message it is difficult to predict how much memory does the URI use, as it has many different fields where it caches certain URI parts as well as decoded / encoded representation for some of those.


@JsonCreator
public RemoteSplit(@JsonProperty("taskId") TaskId taskId, @JsonProperty("location") URI location)
public RemoteSplit(@JsonProperty("taskId") TaskId taskId, @JsonProperty("location") String location)
{
this.taskId = requireNonNull(taskId, "taskId is null");
this.location = requireNonNull(location, "location is null");
Expand All @@ -46,7 +49,7 @@ public TaskId getTaskId()
}

@JsonProperty
public URI getLocation()
public String getLocation()
{
return location;
}
Expand Down Expand Up @@ -77,4 +80,12 @@ public String toString()
.add("location", location)
.toString();
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ taskId.getRetainedSizeInBytes()
+ estimatedSizeOf(location);
}
}
12 changes: 12 additions & 0 deletions core/trino-main/src/main/java/io/trino/testing/TestingSplit.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
import com.google.common.collect.ImmutableList;
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;

import static io.airlift.slice.SizeOf.estimatedSizeOf;

public class TestingSplit
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestingSplit.class).instanceSize();

private static final HostAddress localHost = HostAddress.fromString("127.0.0.1");

private final boolean remotelyAccessible;
Expand Down Expand Up @@ -70,4 +75,11 @@ public Object getInfo()
{
return this;
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -727,5 +727,11 @@ public Object getInfo()
{
return "mock connector split";
}

@Override
public long getRetainedSizeInBytes()
{
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jol.info.ClassLayout;

import java.net.URI;
import java.util.ArrayList;
Expand All @@ -68,6 +69,7 @@
import java.util.concurrent.TimeUnit;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.trino.SystemSessionProperties.MAX_UNACKNOWLEDGED_SPLITS_PER_TASK;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
Expand Down Expand Up @@ -248,6 +250,8 @@ private static TopologyAwareNodeSelectorConfig getBenchmarkNetworkTopologyConfig
private static class TestSplitRemote
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(TestSplitRemote.class).instanceSize();

private final List<HostAddress> hosts;

public TestSplitRemote(int dataHost)
Expand All @@ -272,6 +276,13 @@ public Object getInfo()
{
return this;
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(hosts, HostAddress::getRetainedSizeInBytes);
}
}

private static HostAddress addressForHost(int host)
Expand Down
Loading