Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
package io.trino.connector;

import io.trino.memory.LocalMemoryManager;
import io.trino.memory.MemoryPool;
import io.trino.metadata.Catalog;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.connector.CatalogHandle;
Expand All @@ -24,9 +22,6 @@
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.ExceededMemoryLimitException.exceededLocalUserMemoryLimit;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class CatalogConnector
Expand All @@ -38,33 +33,21 @@ public class CatalogConnector
private final ConnectorServices systemConnector;
private final Optional<CatalogProperties> catalogProperties;
private final Catalog catalog;
private final LocalMemoryManager localMemoryManager;
private final long connectorMemory;

public CatalogConnector(
CatalogHandle catalogHandle,
ConnectorName connectorName,
ConnectorServices catalogConnector,
ConnectorServices informationSchemaConnector,
ConnectorServices systemConnector,
LocalMemoryManager localMemoryManager,
Optional<CatalogProperties> catalogProperties)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.connectorName = requireNonNull(connectorName, "connectorName is null");
this.catalogConnector = requireNonNull(catalogConnector, "catalogConnector is null");
this.informationSchemaConnector = requireNonNull(informationSchemaConnector, "informationSchemaConnector is null");
this.systemConnector = requireNonNull(systemConnector, "systemConnector is null");
this.localMemoryManager = requireNonNull(localMemoryManager, "localMemoryManager is null");
this.catalogProperties = requireNonNull(catalogProperties, "catalogProperties is null");
this.connectorMemory = catalogConnector.getConnector().getInitialMemoryRequirement();

MemoryPool memoryPool = localMemoryManager.getMemoryPool();
boolean success = memoryPool.tryReserveConnectorMemory(connectorMemory);
if (!success) {
String info = format("tried to reserve %s for connector %s", succinctBytes(connectorMemory), connectorName);
throw exceededLocalUserMemoryLimit(succinctBytes(memoryPool.getMaxBytes()), info);
}

this.catalog = new Catalog(
catalogHandle.getCatalogName(),
Expand Down Expand Up @@ -106,8 +89,6 @@ public ConnectorServices getMaterializedConnector(CatalogHandleType type)

public void shutdown()
{
localMemoryManager.getMemoryPool().freeConnectorMemory(connectorMemory);

catalogConnector.shutdown();
informationSchemaConnector.shutdown();
systemConnector.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.trino.connector.system.SystemConnector;
import io.trino.connector.system.SystemTablesProvider;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.memory.LocalMemoryManager;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Metadata;
import io.trino.security.AccessControl;
Expand Down Expand Up @@ -73,7 +72,6 @@ public class DefaultCatalogFactory
private final int maxPrefetchedInformationSchemaPrefixes;

private final ConcurrentMap<ConnectorName, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
private final LocalMemoryManager localMemoryManager;
private final SecretsResolver secretsResolver;

@Inject
Expand All @@ -90,7 +88,6 @@ public DefaultCatalogFactory(
TypeManager typeManager,
NodeSchedulerConfig nodeSchedulerConfig,
OptimizerConfig optimizerConfig,
LocalMemoryManager localMemoryManager,
SecretsResolver secretsResolver)
{
this.metadata = requireNonNull(metadata, "metadata is null");
Expand All @@ -105,7 +102,6 @@ public DefaultCatalogFactory(
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.schedulerIncludeCoordinator = nodeSchedulerConfig.isIncludeCoordinator();
this.maxPrefetchedInformationSchemaPrefixes = optimizerConfig.getMaxPrefetchedInformationSchemaPrefixes();
this.localMemoryManager = requireNonNull(localMemoryManager, "localMemoryManager is null");
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
}

Expand Down Expand Up @@ -188,7 +184,6 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
catalogConnector,
informationSchemaConnector,
systemConnector,
localMemoryManager,
catalogProperties);
}

Expand Down
41 changes: 0 additions & 41 deletions core/trino-main/src/main/java/io/trino/memory/MemoryPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ public class MemoryPool
@GuardedBy("this")
private final Map<TaskId, Long> taskRevocableMemoryReservations = new HashMap<>();

@GuardedBy("this")
private long connectorsReservedBytes;

private final List<MemoryPoolListener> listeners = new CopyOnWriteArrayList<>();

public MemoryPool(DataSize size)
Expand Down Expand Up @@ -210,20 +207,6 @@ public boolean tryReserve(TaskId taskId, String allocationTag, long bytes)
return true;
}

public boolean tryReserveConnectorMemory(long bytes)
{
checkArgument(bytes >= 0, "'%s' is negative", bytes);
synchronized (this) {
if (getFreeBytes() - bytes < 0) {
return false;
}
connectorsReservedBytes += bytes;
reservedBytes += bytes;
}
onMemoryReserved();
return true;
}

public boolean tryReserveRevocable(long bytes)
{
checkArgument(bytes >= 0, "'%s' is negative", bytes);
Expand Down Expand Up @@ -338,24 +321,6 @@ public synchronized void freeRevocable(long bytes)
}
}

public synchronized void freeConnectorMemory(long bytes)
{
checkArgument(bytes >= 0, "'%s' is negative", bytes);
checkArgument(reservedBytes >= bytes, "tried to free more memory than is reserved");
checkArgument(connectorsReservedBytes >= bytes, "tried to free more memory for the connectors than is reserved");
if (bytes == 0) {
// Freeing zero bytes is a no-op
return;
}

connectorsReservedBytes -= bytes;
reservedBytes -= bytes;
if (getFreeBytes() > 0 && future != null) {
future.set(null);
future = null;
}
}

/**
* Returns the number of free bytes. This value may be negative, which indicates that the pool is over-committed.
*/
Expand Down Expand Up @@ -383,12 +348,6 @@ public synchronized long getReservedRevocableBytes()
return reservedRevocableBytes;
}

@Managed
public synchronized long getConnectorsReservedBytes()
{
return connectorsReservedBytes;
}

long getQueryMemoryReservation(QueryId queryId)
{
return queryMemoryReservations.getOrDefault(queryId, 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.execution.scheduler.UniformNodeSelectorFactory;
import io.trino.execution.warnings.WarningCollector;
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.MemoryManagerConfig;
import io.trino.memory.NodeMemoryConfig;
import io.trino.metadata.AnalyzePropertyManager;
Expand Down Expand Up @@ -396,7 +395,6 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
typeManager,
nodeSchedulerConfig,
optimizerConfig,
new LocalMemoryManager(new NodeMemoryConfig()),
secretsResolver));
this.splitManager = new SplitManager(createSplitManagerProvider(catalogManager), tracer, new QueryManagerConfig());
this.pageSourceManager = new PageSourceManager(createPageSourceProviderFactory(catalogManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public CatalogConnector createCatalog(CatalogProperties catalogProperties)
noOpConnectorService,
noOpConnectorService,
noOpConnectorService,
new LocalMemoryManager(new NodeMemoryConfig()),
Optional.of(catalogProperties));
}

Expand Down
19 changes: 0 additions & 19 deletions core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,25 +303,6 @@ void testPerTaskAllocations()
assertThat(testPool.getTaskMemoryReservation(q2task1)).isEqualTo(9L);
}

@Test
void testGlobalAllocations()
{
MemoryPool testPool = new MemoryPool(DataSize.ofBytes(1000));

assertThat(testPool.tryReserveConnectorMemory(999)).isTrue();
assertThat(testPool.tryReserveConnectorMemory(2)).isFalse();
assertThat(testPool.getReservedBytes()).isEqualTo(999);
assertThat(testPool.getConnectorsReservedBytes()).isEqualTo(999);
assertThat(testPool.getReservedRevocableBytes()).isEqualTo(0);
assertThat(testPool.getTaskMemoryReservations()).isEmpty();
assertThat(testPool.getQueryMemoryReservations()).isEmpty();
assertThat(testPool.getTaggedMemoryAllocations()).isEmpty();

testPool.freeConnectorMemory(999);
assertThat(testPool.getReservedBytes()).isEqualTo(0);
assertThat(testPool.getConnectorsReservedBytes()).isEqualTo(0);
}

@Test
void testGlobalRevocableAllocations()
{
Expand Down
6 changes: 6 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@
<old>method java.lang.String io.trino.spi.block.VariableWidthBlock::getEncodingName()</old>
<justification>Removal of bidrectional coupling of Blocks and BlockEncodings</justification>
</item>
<item>
<ignore>true</ignore>
<code>java.method.removed</code>
<old>method long io.trino.spi.connector.Connector::getInitialMemoryRequirement()</old>
<justification>Remove unused connector method</justification>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
12 changes: 0 additions & 12 deletions core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,6 @@ default Set<TableProcedureMetadata> getTableProcedures()
return emptySet();
}

/**
* Retrieves the initial memory requirement for the connector.
* <p>
* The memory allocation is per catalog and is freed when the catalog is shut down.
*
* @return the initial memory requirement in bytes.
*/
default long getInitialMemoryRequirement()
{
return 0;
}

/**
* @return the set of table functions provided by this connector
*/
Expand Down