Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
43977c1
Add SPI for table procedures
losipiuk Oct 5, 2021
1d19c7f
fixup! Add SPI for table procedures
losipiuk Oct 5, 2021
2a6619c
fixup! Add SPI for table procedures
losipiuk Oct 5, 2021
6985dbd
fixup! Add SPI for table procedures
losipiuk Oct 5, 2021
30f646b
Add TableProceduresRegistry to Metadata
losipiuk Sep 14, 2021
1db5b7d
fixup! Add TableProceduresRegistry to Metadata
losipiuk Oct 5, 2021
f153fbf
fixup! Add TableProceduresRegistry to Metadata
losipiuk Oct 5, 2021
9a5b851
Generalize AbstractPropertyManager
losipiuk Oct 5, 2021
f2bd0a1
Add TableProceduresPropertyManager
losipiuk Sep 21, 2021
bf7790f
Add support for table procedures SPI calls to Metadata
losipiuk Sep 14, 2021
14ed0ab
fixup! Add support for table procedures SPI calls to Metadata
losipiuk Oct 5, 2021
fd37a73
Add parser/analyzer support for ALTER TABLE ... EXECUTE
losipiuk Sep 14, 2021
76ab884
fixup! Add parser/analyzer support for ALTER TABLE ... EXECUTE
losipiuk Oct 5, 2021
d08e204
fixup! Add parser/analyzer support for ALTER TABLE ... EXECUTE
losipiuk Oct 5, 2021
b83447f
fixup! Add parser/analyzer support for ALTER TABLE ... EXECUTE
losipiuk Oct 5, 2021
db68344
fixup! Add parser/analyzer support for ALTER TABLE ... EXECUTE
losipiuk Oct 7, 2021
485f24d
Add access control for table procedures
losipiuk Oct 4, 2021
f407be1
fixup! Add access control for table procedures
losipiuk Oct 5, 2021
9597bb2
Define QueryExecution for TableExecute statement
losipiuk Sep 6, 2021
1074941
Add planner logic for TableExecute statement
losipiuk Sep 14, 2021
1a09c45
fixup! Add planner logic for TableExecute statement
losipiuk Oct 5, 2021
63b98da
Do not output rows count from ALTER TABLE EXECUTE
losipiuk Oct 11, 2021
3d63c92
Pass splits info to TableFinish operator for ALTER TABLE EXECUTE
losipiuk Sep 22, 2021
11d30ca
Prefer toImmutableList in HiveMetadata
losipiuk Oct 11, 2021
898fa8c
Add Hive OPTIMIZE table procedure
losipiuk Sep 24, 2021
f56191c
fixup! Add TableProceduresRegistry to Metadata
losipiuk Oct 8, 2021
77e91a3
fixup! Add SPI for table procedures
losipiuk Oct 8, 2021
5138fff
fixup! Add parser/analyzer support for ALTER TABLE ... EXECUTE
losipiuk Oct 8, 2021
3f1586a
fixup! Add planner logic for TableExecute statement
losipiuk Oct 11, 2021
e77c2bc
fixup! Add TableProceduresPropertyManager
losipiuk Oct 11, 2021
62aa891
fixup! Add planner logic for TableExecute statement
losipiuk Oct 11, 2021
04ad808
fixup! Pass splits info to TableFinish operator for ALTER TABLE EXECUTE
losipiuk Oct 11, 2021
d14457c
fixup! Pass splits info to TableFinish operator for ALTER TABLE EXECUTE
losipiuk Oct 11, 2021
b7c9b9b
fixup! Pass splits info to TableFinish operator for ALTER TABLE EXECUTE
losipiuk Oct 11, 2021
2724556
fixup! Pass splits info to TableFinish operator for ALTER TABLE EXECUTE
losipiuk Oct 11, 2021
21bc91c
fixup! Add Hive OPTIMIZE table procedure
losipiuk Oct 11, 2021
012bb1a
fixup! Add SPI for table procedures
losipiuk Oct 12, 2021
a047359
Add tableExecuteSplitsInfo to FixedSplitSource
losipiuk Sep 23, 2021
210b395
Allow filtering on partitions for Hive OPTIMIZE table procedure
losipiuk Oct 12, 2021
5e43e3e
fixup! Add parser/analyzer support for ALTER TABLE ... EXECUTE
losipiuk Oct 12, 2021
96d211f
Allow dropping declared write intentions in SemiTransactionalHiveMeta…
losipiuk Oct 12, 2021
262ebf3
Add logic to limit chance for data loss in Hive OPTIMIZE table procedure
losipiuk Oct 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableProcedureMetadata;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.session.PropertyMetadata;
Expand Down Expand Up @@ -299,6 +300,8 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
.ifPresent(partitioningProvider -> nodePartitioningManager.addPartitioningProvider(catalogName, partitioningProvider));

metadataManager.getProcedureRegistry().addProcedures(catalogName, connector.getProcedures());
Set<TableProcedureMetadata> tableProcedures = connector.getTableProcedures();
metadataManager.getTableProcedureRegistry().addTableProcedures(catalogName, tableProcedures);

connector.getAccessControl()
.ifPresent(accessControl -> accessControlManager.addCatalogAccessControl(catalogName, accessControl));
Expand All @@ -308,6 +311,9 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
metadataManager.getColumnPropertyManager().addProperties(catalogName, connector.getColumnProperties());
metadataManager.getSchemaPropertyManager().addProperties(catalogName, connector.getSchemaProperties());
metadataManager.getAnalyzePropertyManager().addProperties(catalogName, connector.getAnalyzeProperties());
for (TableProcedureMetadata tableProcedure : tableProcedures) {
metadataManager.getTableProceduresPropertyManager().addProperties(catalogName, tableProcedure.getName(), tableProcedure.getProperties());
}
metadataManager.getSessionPropertyManager().addConnectorSessionProperties(catalogName, connector.getSessionProperties());
}

Expand All @@ -332,12 +338,14 @@ private synchronized void removeConnectorInternal(CatalogName catalogName)
indexManager.removeIndexProvider(catalogName);
nodePartitioningManager.removePartitioningProvider(catalogName);
metadataManager.getProcedureRegistry().removeProcedures(catalogName);
metadataManager.getTableProcedureRegistry().removeProcedures(catalogName);
accessControlManager.removeCatalogAccessControl(catalogName);
metadataManager.getTablePropertyManager().removeProperties(catalogName);
metadataManager.getMaterializedViewPropertyManager().removeProperties(catalogName);
metadataManager.getColumnPropertyManager().removeProperties(catalogName);
metadataManager.getSchemaPropertyManager().removeProperties(catalogName);
metadataManager.getAnalyzePropertyManager().removeProperties(catalogName);
metadataManager.getTableProceduresPropertyManager().removeProperties(catalogName);
metadataManager.getSessionPropertyManager().removeConnectorSessionProperties(catalogName);

MaterializedConnector materializedConnector = connectors.remove(catalogName);
Expand Down Expand Up @@ -401,6 +409,7 @@ private static class MaterializedConnector
private final Connector connector;
private final Set<SystemTable> systemTables;
private final Set<Procedure> procedures;
private final Set<TableProcedureMetadata> tableProcedures;
private final Optional<ConnectorSplitManager> splitManager;
private final Optional<ConnectorPageSourceProvider> pageSourceProvider;
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
Expand Down Expand Up @@ -428,6 +437,10 @@ public MaterializedConnector(CatalogName catalogName, Connector connector)
requireNonNull(procedures, format("Connector '%s' returned a null procedures set", catalogName));
this.procedures = ImmutableSet.copyOf(procedures);

Set<TableProcedureMetadata> tableProcedures = connector.getTableProcedures();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a decision to be made whether table procedures should be statically or dynamically resolved.

for example, what should be the behavior when a procedure is applicable to certain relation types (eg only tables, or only certain type of tables)?
exposing it as if it existed might feel better and then throw at execute time "this is not applicable", but one could say the same about $bucket hidden column, or between-connectors user expectations (users may not appreciate the fact that hive supports different set of procedure names than iceberg connector).

Also, it might be that we want to overload the name -- expose same procedure name, but with different properties or execution mode, or internal implementation. For example "compact table" is logically applicable to transactional and non-transactional tables, but is different operation internally.

Thus, i would suggest we don't register them here, and move the procedure name resolution from Connector into ConnectorMetadata

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had an offline conversation about this one. Let's leave it static for now.
tl;dr is:

  • it may be beneficial to be able to resolve a single procedure name to different implementations depending on context (e.g. compact for a Hive transactional table and for a non-transactional table can do very different things). Yet we can achieve the same (similar) thing by doing conditional logic within the procedure implementation itself. Actually handling different table types within the procedure can be better, as we may construct nicer (non-generic) exception messages if the procedure is misused. Hence have better UX.
  • as for supporting different relation types (tables vs views vs materialized views) dynamic resolution is just part of the story. The more tricky part is how the rest of SPI should look to support not just tables. It is not straightforward and we do not want to block this PR on resolving that. It should be a follow-up.

requireNonNull(procedures, format("Connector '%s' returned a null table procedures set", catalogName));
this.tableProcedures = ImmutableSet.copyOf(tableProcedures);

ConnectorSplitManager splitManager = null;
try {
splitManager = connector.getSplitManager();
Expand Down Expand Up @@ -538,6 +551,11 @@ public Set<Procedure> getProcedures()
return procedures;
}

/**
 * Returns the table procedures this connector exposed at creation time.
 * The set is an immutable snapshot taken in the constructor, never null.
 */
public Set<TableProcedureMetadata> getTableProcedures()
{
return tableProcedures;
}

public Optional<ConnectorSplitManager> getSplitManager()
{
return splitManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public class SqlQueryExecution
private final StatsCalculator statsCalculator;
private final CostCalculator costCalculator;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;

private SqlQueryExecution(
PreparedQuery preparedQuery,
Expand Down Expand Up @@ -159,7 +160,8 @@ private SqlQueryExecution(
StatsCalculator statsCalculator,
CostCalculator costCalculator,
DynamicFilterService dynamicFilterService,
WarningCollector warningCollector)
WarningCollector warningCollector,
TableExecuteContextManager tableExecuteContextManager)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
this.slug = requireNonNull(slug, "slug is null");
Expand All @@ -180,6 +182,7 @@ private SqlQueryExecution(
this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");

checkArgument(scheduleSplitBatchSize > 0, "scheduleSplitBatchSize must be greater than 0");
this.scheduleSplitBatchSize = scheduleSplitBatchSize;
Expand All @@ -195,6 +198,8 @@ private SqlQueryExecution(
}
unregisterDynamicFilteringQuery(
dynamicFilterService.getDynamicFilteringStats(stateMachine.getQueryId(), stateMachine.getSession()));

tableExecuteContextManager.unregisterTableExecuteContextForQuery(stateMachine.getQueryId());
});

// when the query finishes cache the final query info, and clear the reference to the output stage
Expand Down Expand Up @@ -423,6 +428,8 @@ public void start()
}
}

tableExecuteContextManager.registerTableExecuteContextForQuery(getQueryId());

if (!stateMachine.transitionToStarting()) {
// query already started or finished
return;
Expand Down Expand Up @@ -544,7 +551,8 @@ private void planDistribution(PlanRoot plan)
nodeTaskMap,
executionPolicy,
schedulerStats,
dynamicFilterService);
dynamicFilterService,
tableExecuteContextManager);

queryScheduler.set(scheduler);

Expand Down Expand Up @@ -741,6 +749,7 @@ public static class SqlQueryExecutionFactory
private final StatsCalculator statsCalculator;
private final CostCalculator costCalculator;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;

@Inject
SqlQueryExecutionFactory(
Expand All @@ -765,7 +774,8 @@ public static class SqlQueryExecutionFactory
SplitSchedulerStats schedulerStats,
StatsCalculator statsCalculator,
CostCalculator costCalculator,
DynamicFilterService dynamicFilterService)
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager)
{
requireNonNull(config, "config is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
Expand All @@ -790,6 +800,7 @@ public static class SqlQueryExecutionFactory
this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
}

@Override
Expand Down Expand Up @@ -829,7 +840,8 @@ public QueryExecution createQueryExecution(
statsCalculator,
costCalculator,
dynamicFilterService,
warningCollector);
warningCollector,
tableExecuteContextManager);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import com.google.common.collect.ImmutableList;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
 * Per-query holder for the splits info produced while scheduling an
 * {@code ALTER TABLE ... EXECUTE} query. The info is written at most once
 * (by the split scheduler) and read later when the table procedure finishes.
 *
 * <p>Thread-safe: all access to {@code splitsInfo} is synchronized on this instance.
 */
public class TableExecuteContext
{
    // guarded by "this"; null until setSplitsInfo has been called exactly once
    private List<Object> splitsInfo;

    /**
     * Records the splits info for this query.
     *
     * @throws NullPointerException if {@code splitsInfo} is null
     * @throws IllegalStateException if the splits info was already set
     */
    public synchronized void setSplitsInfo(List<Object> splitsInfo)
    {
        requireNonNull(splitsInfo, "splitsInfo is null");
        if (this.splitsInfo != null) {
            throw new IllegalStateException("splitsInfo already set to " + this.splitsInfo);
        }
        // List.copyOf yields an unmodifiable, null-hostile snapshot — the JDK
        // equivalent of Guava's ImmutableList.copyOf, without the extra dependency
        this.splitsInfo = List.copyOf(splitsInfo);
    }

    /**
     * Returns the previously recorded splits info.
     *
     * @throws IllegalStateException if the splits info has not been set yet
     */
    public synchronized List<Object> getSplitsInfo()
    {
        if (splitsInfo == null) {
            throw new IllegalStateException("splitsInfo not set yet");
        }
        return splitsInfo;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import io.trino.spi.QueryId;

import javax.annotation.concurrent.ThreadSafe;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Tracks one {@link TableExecuteContext} per query for {@code ALTER TABLE ... EXECUTE}
 * processing. Contexts are registered when query execution starts and removed when it
 * finishes; all state lives in a concurrent map, so the class is safe for use from
 * multiple threads.
 */
@ThreadSafe
public class TableExecuteContextManager
{
    private final ConcurrentMap<QueryId, TableExecuteContext> contexts = new ConcurrentHashMap<>();

    /**
     * Creates and registers a fresh context for the given query.
     *
     * @throws IllegalStateException if a context is already registered for the query
     */
    public void registerTableExecuteContextForQuery(QueryId queryId)
    {
        TableExecuteContext previous = contexts.putIfAbsent(queryId, new TableExecuteContext());
        if (previous != null) {
            throw new IllegalStateException("TableExecuteContext already registered for query " + queryId);
        }
    }

    /**
     * Removes the context for the given query; no-op if none is registered.
     */
    public void unregisterTableExecuteContextForQuery(QueryId queryId)
    {
        contexts.remove(queryId);
    }

    /**
     * Returns the context registered for the given query.
     *
     * @throws IllegalStateException if no context is registered for the query
     */
    public TableExecuteContext getTableExecuteContextForQuery(QueryId queryId)
    {
        TableExecuteContext registered = contexts.get(queryId);
        if (registered == null) {
            throw new IllegalStateException("TableExecuteContext not registered for query " + queryId);
        }
        return registered;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.execution.Lifespan;
import io.trino.execution.RemoteTask;
import io.trino.execution.SqlStageExecution;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.ScheduleResult.BlockedReason;
import io.trino.execution.scheduler.group.DynamicLifespanScheduler;
import io.trino.execution.scheduler.group.FixedLifespanScheduler;
Expand Down Expand Up @@ -75,13 +76,15 @@ public FixedSourcePartitionedScheduler(
OptionalInt concurrentLifespansPerTask,
NodeSelector nodeSelector,
List<ConnectorPartitionHandle> partitionHandles,
DynamicFilterService dynamicFilterService)
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager)
{
requireNonNull(stage, "stage is null");
requireNonNull(splitSources, "splitSources is null");
requireNonNull(bucketNodeMap, "bucketNodeMap is null");
checkArgument(!requireNonNull(nodes, "nodes is null").isEmpty(), "nodes is empty");
requireNonNull(partitionHandles, "partitionHandles is null");
requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");

this.stage = stage;
this.nodes = ImmutableList.copyOf(nodes);
Expand Down Expand Up @@ -119,6 +122,7 @@ public FixedSourcePartitionedScheduler(
Math.max(splitBatchSize / concurrentLifespans, 1),
groupedExecutionForScanNode,
dynamicFilterService,
tableExecuteContextManager,
() -> true);

if (stageExecutionDescriptor.isStageGroupedExecution() && !groupedExecutionForScanNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.trino.execution.Lifespan;
import io.trino.execution.RemoteTask;
import io.trino.execution.SqlStageExecution;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
import io.trino.execution.scheduler.FixedSourcePartitionedScheduler.BucketedSplitPlacementPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
Expand All @@ -40,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -96,6 +99,7 @@ private enum State
private final PlanNodeId partitionedNode;
private final boolean groupedExecution;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;
private final BooleanSupplier anySourceTaskBlocked;

private final Map<Lifespan, ScheduleGroup> scheduleGroups = new HashMap<>();
Expand All @@ -112,13 +116,15 @@ private SourcePartitionedScheduler(
int splitBatchSize,
boolean groupedExecution,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
BooleanSupplier anySourceTaskBlocked)
{
this.stage = requireNonNull(stage, "stage is null");
this.partitionedNode = requireNonNull(partitionedNode, "partitionedNode is null");
this.splitSource = requireNonNull(splitSource, "splitSource is null");
this.splitPlacementPolicy = requireNonNull(splitPlacementPolicy, "splitPlacementPolicy is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
this.anySourceTaskBlocked = requireNonNull(anySourceTaskBlocked, "anySourceTaskBlocked is null");

checkArgument(splitBatchSize > 0, "splitBatchSize must be at least one");
Expand Down Expand Up @@ -146,6 +152,7 @@ public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
SplitPlacementPolicy splitPlacementPolicy,
int splitBatchSize,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
BooleanSupplier anySourceTaskBlocked)
{
SourcePartitionedScheduler sourcePartitionedScheduler = new SourcePartitionedScheduler(
Expand All @@ -156,6 +163,7 @@ public static StageScheduler newSourcePartitionedSchedulerAsStageScheduler(
splitBatchSize,
false,
dynamicFilterService,
tableExecuteContextManager,
anySourceTaskBlocked);
sourcePartitionedScheduler.startLifespan(Lifespan.taskWide(), NOT_PARTITIONED);
sourcePartitionedScheduler.noMoreLifespans();
Expand Down Expand Up @@ -197,6 +205,7 @@ public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(
int splitBatchSize,
boolean groupedExecution,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager,
BooleanSupplier anySourceTaskBlocked)
{
return new SourcePartitionedScheduler(
Expand All @@ -207,6 +216,7 @@ public static SourceScheduler newSourcePartitionedSchedulerAsSourceScheduler(
splitBatchSize,
groupedExecution,
dynamicFilterService,
tableExecuteContextManager,
anySourceTaskBlocked);
}

Expand Down Expand Up @@ -357,6 +367,16 @@ else if (pendingSplits.isEmpty()) {
throw new IllegalStateException("At least 1 split should have been scheduled for this plan node");
case SPLITS_ADDED:
state = State.NO_MORE_SPLITS;

Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
// Here we assume that we can get non-empty tableExecuteSplitsInfo only for queries which facilitate single split-source.
// If we had more than one split-source and got multiple tableExecuteSplitsInfos the call to tableExecuteContext.setSplitsInfo will
// throw an error.
tableExecuteSplitsInfo.ifPresent(info -> {
TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stage.getStageId().getQueryId());
tableExecuteContext.setSplitsInfo(info);
});

splitSource.close();
// fall through
case NO_MORE_SPLITS:
Expand Down
Loading