diff --git a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java index b8bdf0aabd96..66514000ecda 100644 --- a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java +++ b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java @@ -72,7 +72,7 @@ public class FeaturesConfig private boolean dynamicScheduleForGroupedExecution; private int concurrentLifespansPerTask; private boolean redistributeWrites = true; - private boolean scaleWriters; + private boolean scaleWriters = true; private DataSize writerMinSize = DataSize.of(32, DataSize.Unit.MEGABYTE); private DataIntegrityVerification exchangeDataIntegrityVerification = DataIntegrityVerification.ABORT; private boolean exchangeCompressionEnabled; diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index 0d30e870bd52..f15fa85961b9 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -701,6 +701,16 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName */ boolean isValidTableVersion(Session session, QualifiedObjectName tableName, TableVersion version); + /** + * Returns true if the connector reports number of written bytes for an existing table. Otherwise, it returns false. + */ + boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle); + + /** + * Returns true if the connector reports number of written bytes for a new table. Otherwise, it returns false. 
+ */ + boolean supportsReportingWrittenBytes(Session session, QualifiedObjectName tableName, Map tableProperties); + /** * Returns a table handle for the specified table name with a specified version */ diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 64a8c05b9f1d..096f36b39003 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -2403,6 +2403,21 @@ public boolean isValidTableVersion(Session session, QualifiedObjectName tableNam return metadata.isSupportedVersionType(session.toConnectorSession(), tableName.asSchemaTableName(), version.getPointerType(), version.getObjectType()); } + @Override + public boolean supportsReportingWrittenBytes(Session session, QualifiedObjectName tableName, Map tableProperties) + { + CatalogName catalogName = new CatalogName(tableName.getCatalogName()); + ConnectorMetadata metadata = getMetadata(session, catalogName); + return metadata.supportsReportingWrittenBytes(session.toConnectorSession(catalogName), tableName.asSchemaTableName(), tableProperties); + } + + @Override + public boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle) + { + ConnectorMetadata metadata = getMetadata(session, tableHandle.getCatalogName()); + return metadata.supportsReportingWrittenBytes(session.toConnectorSession(tableHandle.getCatalogName()), tableHandle.getConnectorHandle()); + } + private Optional toConnectorVersion(Optional version) { Optional connectorVersion = Optional.empty(); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index e59ea95741ad..ef11d6fcf67a 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ 
b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -399,7 +399,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query) .collect(toImmutableList()); TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, destination.getCatalogName(), tableMetadata); - return createTableWriterPlan( analysis, plan.getRoot(), @@ -856,7 +855,8 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat .map(ColumnMetadata::getName) .collect(toImmutableList()); - TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget(executeHandle, Optional.empty(), tableName.asSchemaTableName()); + boolean supportsReportingWrittenBytes = metadata.supportsReportingWrittenBytes(session, tableHandle); + TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget(executeHandle, Optional.empty(), tableName.asSchemaTableName(), supportsReportingWrittenBytes); Optional layout = metadata.getLayoutForTableExecute(session, executeHandle); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java index 9a6f3f454019..8ac2c80f2593 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java @@ -592,13 +592,13 @@ public PlanWithProperties visitRefreshMaterializedView(RefreshMaterializedViewNo @Override public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties preferredProperties) { - return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties); + return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties, node.getTarget()); } @Override public PlanWithProperties 
visitTableExecute(TableExecuteNode node, PreferredProperties preferredProperties) { - return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties); + return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties, node.getTarget()); } @Override @@ -611,12 +611,12 @@ public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode nod .build()); } - private PlanWithProperties visitTableWriter(PlanNode node, Optional partitioningScheme, PlanNode source, PreferredProperties preferredProperties) + private PlanWithProperties visitTableWriter(PlanNode node, Optional partitioningScheme, PlanNode source, PreferredProperties preferredProperties, TableWriterNode.WriterTarget writerTarget) { PlanWithProperties newSource = source.accept(this, preferredProperties); if (partitioningScheme.isEmpty()) { - if (scaleWriters) { + if (scaleWriters && writerTarget.supportsReportingWrittenBytes(plannerContext.getMetadata(), session)) { partitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), newSource.getNode().getOutputSymbols())); } else if (redistributeWrites) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java index 82325676a935..ec9623510dc6 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java @@ -237,7 +237,8 @@ public WriterTarget getWriterTarget(PlanNode node) return new TableExecuteTarget( target.getExecuteHandle(), findTableScanHandleForTableExecute(((TableExecuteNode) node).getSource()), - target.getSchemaTableName()); + target.getSchemaTableName(), + target.isReportingWrittenBytesSupported()); } if (node instanceof ExchangeNode || node instanceof 
UnionNode) { Set writerTargets = node.getSources().stream() @@ -254,11 +255,11 @@ private WriterTarget createWriterTarget(WriterTarget target) // TODO: we shouldn't need to store the schemaTableName in the handles, but there isn't a good way to pass this around with the current architecture if (target instanceof CreateReference) { CreateReference create = (CreateReference) target; - return new CreateTarget(metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()), create.getTableMetadata().getTable()); + return new CreateTarget(metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()), create.getTableMetadata().getTable(), target.supportsReportingWrittenBytes(metadata, session)); } if (target instanceof InsertReference) { InsertReference insert = (InsertReference) target; - return new InsertTarget(metadata.beginInsert(session, insert.getHandle(), insert.getColumns()), metadata.getTableMetadata(session, insert.getHandle()).getTable()); + return new InsertTarget(metadata.beginInsert(session, insert.getHandle(), insert.getColumns()), metadata.getTableMetadata(session, insert.getHandle()).getTable(), target.supportsReportingWrittenBytes(metadata, session)); } if (target instanceof DeleteTarget) { DeleteTarget delete = (DeleteTarget) target; @@ -285,8 +286,7 @@ private WriterTarget createWriterTarget(WriterTarget target) if (target instanceof TableExecuteTarget) { TableExecuteTarget tableExecute = (TableExecuteTarget) target; BeginTableExecuteResult result = metadata.beginTableExecute(session, tableExecute.getExecuteHandle(), tableExecute.getMandatorySourceHandle()); - - return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName()); + return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName(), 
tableExecute.isReportingWrittenBytesSupported()); } throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName()); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java index 980d5d53adcf..f2464741a7c5 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java @@ -21,8 +21,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import io.trino.Session; import io.trino.metadata.InsertTableHandle; +import io.trino.metadata.Metadata; import io.trino.metadata.OutputTableHandle; +import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.TableExecuteHandle; import io.trino.metadata.TableHandle; import io.trino.metadata.TableLayout; @@ -208,6 +211,8 @@ public abstract static class WriterTarget { @Override public abstract String toString(); + + public abstract boolean supportsReportingWrittenBytes(Metadata metadata, Session session); } // only used during planning -- will not be serialized @@ -230,9 +235,14 @@ public String getCatalog() return catalog; } - public ConnectorTableMetadata getTableMetadata() + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) { - return tableMetadata; + QualifiedObjectName fullTableName = new QualifiedObjectName( + catalog, + tableMetadata.getTableSchema().getTable().getSchemaName(), + tableMetadata.getTableSchema().getTable().getTableName()); + return metadata.supportsReportingWrittenBytes(session, fullTableName, tableMetadata.getProperties()); } public Optional getLayout() @@ -240,6 +250,11 @@ public Optional getLayout() return layout; } + public ConnectorTableMetadata getTableMetadata() + { + return tableMetadata; + } + @Override 
public String toString() { @@ -252,14 +267,17 @@ public static class CreateTarget { private final OutputTableHandle handle; private final SchemaTableName schemaTableName; + private final boolean reportingWrittenBytesSupported; @JsonCreator public CreateTarget( @JsonProperty("handle") OutputTableHandle handle, - @JsonProperty("schemaTableName") SchemaTableName schemaTableName) + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("reportingWrittenBytesSupported") boolean reportingWrittenBytesSupported) { this.handle = requireNonNull(handle, "handle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.reportingWrittenBytesSupported = reportingWrittenBytesSupported; } @JsonProperty @@ -274,11 +292,23 @@ public SchemaTableName getSchemaTableName() return schemaTableName; } + @JsonProperty + public boolean getReportingWrittenBytesSupported() + { + return reportingWrittenBytesSupported; + } + @Override public String toString() { return handle.toString(); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + return reportingWrittenBytesSupported; + } } // only used during planning -- will not be serialized @@ -309,6 +339,12 @@ public String toString() { return handle.toString(); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + return metadata.supportsReportingWrittenBytes(session, handle); + } } public static class InsertTarget @@ -316,14 +352,17 @@ public static class InsertTarget { private final InsertTableHandle handle; private final SchemaTableName schemaTableName; + private final boolean reportingWrittenBytesSupported; @JsonCreator public InsertTarget( @JsonProperty("handle") InsertTableHandle handle, - @JsonProperty("schemaTableName") SchemaTableName schemaTableName) + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("reportingWrittenBytesSupported") 
boolean reportingWrittenBytesSupported) { this.handle = requireNonNull(handle, "handle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.reportingWrittenBytesSupported = reportingWrittenBytesSupported; } @JsonProperty @@ -338,11 +377,23 @@ public SchemaTableName getSchemaTableName() return schemaTableName; } + @JsonProperty + public boolean getReportingWrittenBytesSupported() + { + return reportingWrittenBytesSupported; + } + @Override public String toString() { return handle.toString(); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + return reportingWrittenBytesSupported; + } } public static class RefreshMaterializedViewReference @@ -379,6 +430,12 @@ public String toString() { return table.toString(); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + return metadata.supportsReportingWrittenBytes(session, storageTableHandle); + } } public static class RefreshMaterializedViewTarget @@ -431,6 +488,12 @@ public String toString() { return insertHandle.toString(); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + return metadata.supportsReportingWrittenBytes(session, tableHandle); + } } public static class DeleteTarget @@ -471,6 +534,12 @@ public String toString() { return handle.map(Object::toString).orElse("[]"); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + throw new UnsupportedOperationException(); + } } public static class UpdateTarget @@ -530,6 +599,12 @@ public String toString() { return handle.map(Object::toString).orElse("[]"); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + throw new UnsupportedOperationException(); + } } public static class TableExecuteTarget @@ -538,16 +613,19 @@ public static class TableExecuteTarget private 
final TableExecuteHandle executeHandle; private final Optional sourceHandle; private final SchemaTableName schemaTableName; + private final boolean reportingWrittenBytesSupported; @JsonCreator public TableExecuteTarget( @JsonProperty("executeHandle") TableExecuteHandle executeHandle, @JsonProperty("sourceHandle") Optional sourceHandle, - @JsonProperty("schemaTableName") SchemaTableName schemaTableName) + @JsonProperty("schemaTableName") SchemaTableName schemaTableName, + @JsonProperty("reportingWrittenBytesSupported") boolean reportingWrittenBytesSupported) { this.executeHandle = requireNonNull(executeHandle, "handle is null"); this.sourceHandle = requireNonNull(sourceHandle, "sourceHandle is null"); this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); + this.reportingWrittenBytesSupported = reportingWrittenBytesSupported; } @JsonProperty @@ -573,10 +651,22 @@ public SchemaTableName getSchemaTableName() return schemaTableName; } + @JsonProperty + public boolean isReportingWrittenBytesSupported() + { + return reportingWrittenBytesSupported; + } + @Override public String toString() { return executeHandle.toString(); } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + return sourceHandle.map(tableHandle -> metadata.supportsReportingWrittenBytes(session, tableHandle)).orElse(reportingWrittenBytesSupported); + } } } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/PlanSanityChecker.java b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/PlanSanityChecker.java index 8d2a08fe8567..f07236617a4f 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/PlanSanityChecker.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/PlanSanityChecker.java @@ -56,6 +56,7 @@ public PlanSanityChecker(boolean forceSingleNode) new VerifyNoFilteredAggregations(), new VerifyUseConnectorNodePartitioningSet(), new 
ValidateAggregationsWithDefaultValues(forceSingleNode), + new ValidateScaledWritersUsage(), new ValidateStreamingAggregations(), new ValidateLimitWithPresortedInput(), new DynamicFiltersChecker(), diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java new file mode 100644 index 000000000000..55387e4cf381 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.sql.planner.sanity; + +import com.google.common.collect.ImmutableList; +import io.trino.Session; +import io.trino.execution.warnings.WarningCollector; +import io.trino.sql.PlannerContext; +import io.trino.sql.planner.PartitioningHandle; +import io.trino.sql.planner.TypeAnalyzer; +import io.trino.sql.planner.TypeProvider; +import io.trino.sql.planner.plan.ExchangeNode; +import io.trino.sql.planner.plan.PlanNode; +import io.trino.sql.planner.plan.PlanVisitor; +import io.trino.sql.planner.plan.TableWriterNode; +import io.trino.sql.planner.sanity.PlanSanityChecker.Checker; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkState; +import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; +import static java.util.Objects.requireNonNull; + +/** + * When a SCALED_WRITER_DISTRIBUTION is chosen as the partitioning method, the target writer must support it. + * This validator ensures that. 
+ */ +public class ValidateScaledWritersUsage + implements Checker +{ + @Override + public void validate( + PlanNode planNode, + Session session, + PlannerContext plannerContext, + TypeAnalyzer typeAnalyzer, + TypeProvider types, + WarningCollector warningCollector) + { + planNode.accept(new Visitor(session, plannerContext), null); + } + + private static class Visitor + extends PlanVisitor, Void> + { + private final Session session; + private final PlannerContext plannerContext; + + private Visitor(Session session, PlannerContext plannerContext) + { + this.session = requireNonNull(session, "session is null"); + this.plannerContext = requireNonNull(plannerContext, "plannerContext is null"); + } + + @Override + protected List visitPlan(PlanNode node, Void context) + { + return collectPartitioningHandles(node.getSources()); + } + + @Override + public List visitTableWriter(TableWriterNode node, Void context) + { + List children = collectPartitioningHandles(node.getSources()); + boolean anyScaledWriterDistribution = children.stream().anyMatch(partitioningHandle -> partitioningHandle == SCALED_WRITER_DISTRIBUTION); + TableWriterNode.WriterTarget target = node.getTarget(); + checkState(!anyScaledWriterDistribution || target.supportsReportingWrittenBytes(plannerContext.getMetadata(), session), + "The partitioning scheme is set to SCALED_WRITER_DISTRIBUTION but writer target %s does not support it", target); + return children; + } + + @Override + public List visitExchange(ExchangeNode node, Void context) + { + return ImmutableList.builder() + .add(node.getPartitioningScheme().getPartitioning().getHandle()) + .addAll(collectPartitioningHandles(node.getSources())) + .build(); + } + + private List collectPartitioningHandles(List nodes) + { + return nodes.stream() + .map(node -> node.accept(this, null)) + .flatMap(List::stream) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + } +} diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java 
b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java index 453bce63a9f5..6794e961c6a8 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnector.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnector.java @@ -134,6 +134,7 @@ public class MockConnector private final Optional accessControl; private final Function>> data; private final Set procedures; + private final boolean supportsReportingWrittenBytes; private final boolean allowMissingColumnsOnInsert; private final Supplier>> schemaProperties; private final Supplier>> tableProperties; @@ -169,7 +170,8 @@ public class MockConnector Set procedures, boolean allowMissingColumnsOnInsert, Supplier>> schemaProperties, - Supplier>> tableProperties) + Supplier>> tableProperties, + boolean supportsReportingWrittenBytes) { this.sessionProperties = ImmutableList.copyOf(requireNonNull(sessionProperties, "sessionProperties is null")); this.listSchemaNames = requireNonNull(listSchemaNames, "listSchemaNames is null"); @@ -198,6 +200,7 @@ public class MockConnector this.accessControl = requireNonNull(accessControl, "accessControl is null"); this.data = requireNonNull(data, "data is null"); this.procedures = requireNonNull(procedures, "procedures is null"); + this.supportsReportingWrittenBytes = supportsReportingWrittenBytes; this.allowMissingColumnsOnInsert = allowMissingColumnsOnInsert; this.schemaProperties = requireNonNull(schemaProperties, "schemaProperties is null"); this.tableProperties = requireNonNull(tableProperties, "tableProperties is null"); @@ -684,6 +687,18 @@ public void revokeTablePrivileges(ConnectorSession session, SchemaTableName tabl getMockAccessControl().revokeTablePrivileges(tableName, privileges, revokee, grantOption); } + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName schemaTableName, Map tableProperties) + { + return supportsReportingWrittenBytes; + } + + @Override + public boolean 
supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return supportsReportingWrittenBytes; + } + private MockConnectorAccessControl getMockAccessControl() { return (MockConnectorAccessControl) getAccessControl(); diff --git a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java index 757b2c6b8259..cd8257789419 100644 --- a/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java +++ b/core/trino-main/src/test/java/io/trino/connector/MockConnectorFactory.java @@ -105,6 +105,7 @@ public class MockConnectorFactory // access control private final ListRoleGrants roleGrants; private final Optional accessControl; + private final boolean supportsReportingWrittenBytes; private MockConnectorFactory( String name, @@ -136,6 +137,7 @@ private MockConnectorFactory( Supplier>> tableProperties, Optional partitioningProvider, ListRoleGrants roleGrants, + boolean supportsReportingWrittenBytes, Optional accessControl, boolean allowMissingColumnsOnInsert) { @@ -170,6 +172,7 @@ private MockConnectorFactory( this.data = requireNonNull(data, "data is null"); this.procedures = requireNonNull(procedures, "procedures is null"); this.allowMissingColumnsOnInsert = allowMissingColumnsOnInsert; + this.supportsReportingWrittenBytes = supportsReportingWrittenBytes; } @Override @@ -211,7 +214,8 @@ public Connector create(String catalogName, Map config, Connecto procedures, allowMissingColumnsOnInsert, schemaProperties, - tableProperties); + tableProperties, + supportsReportingWrittenBytes); } public static MockConnectorFactory create() @@ -330,6 +334,7 @@ public static final class Builder private Grants tableGrants = new AllowAllGrants<>(); private Function rowFilter = tableName -> null; private BiFunction columnMask = (tableName, columnName) -> null; + private boolean supportsReportingWrittenBytes; private boolean 
allowMissingColumnsOnInsert; private Builder() {} @@ -555,6 +560,12 @@ public Builder withColumnMask(BiFunction tableProperties) + { + throw new UnsupportedOperationException(); + } + @Override public Optional getTableHandle(Session session, QualifiedObjectName table, Optional startVersion, Optional endVersion) { diff --git a/core/trino-main/src/test/java/io/trino/metadata/CountingAccessMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/CountingAccessMetadata.java index 4a295450cc8f..5dec1b241319 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/CountingAccessMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/CountingAccessMetadata.java @@ -847,6 +847,18 @@ public boolean isValidTableVersion(Session session, QualifiedObjectName tableNam return delegate.isValidTableVersion(session, tableName, version); } + @Override + public boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle) + { + return false; + } + + @Override + public boolean supportsReportingWrittenBytes(Session session, QualifiedObjectName tableName, Map tableProperties) + { + return false; + } + @Override public Optional getTableHandle(Session session, QualifiedObjectName tableName, Optional startVersion, Optional endVersion) { diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java index 8db7a99eb719..aa2099060a6e 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTableWriterOperator.java @@ -289,6 +289,7 @@ private Operator createTableWriterOperator( { List notNullColumnNames = new ArrayList<>(1); notNullColumnNames.add(null); + SchemaTableName schemaTableName = new SchemaTableName("testSchema", "testTable"); TableWriterOperatorFactory factory = new TableWriterOperatorFactory( 0, new PlanNodeId("test"), @@ -297,7 +298,8 @@ private 
Operator createTableWriterOperator( CONNECTOR_ID, new ConnectorTransactionHandle() {}, new ConnectorOutputTableHandle() {}), - new SchemaTableName("testSchema", "testTable")), + schemaTableName, + false), ImmutableList.of(0), notNullColumnNames, session, diff --git a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java index eb2b3a75e9e4..66aec81395fe 100644 --- a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java +++ b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java @@ -40,7 +40,7 @@ public void testDefaults() .setDynamicScheduleForGroupedExecutionEnabled(false) .setConcurrentLifespansPerTask(0) .setRedistributeWrites(true) - .setScaleWriters(false) + .setScaleWriters(true) .setWriterMinSize(DataSize.of(32, MEGABYTE)) .setRegexLibrary(JONI) .setRe2JDfaStatesLimit(Integer.MAX_VALUE) @@ -77,7 +77,7 @@ public void testExplicitPropertyMappings() .put("dynamic-schedule-for-grouped-execution", "true") .put("concurrent-lifespans-per-task", "1") .put("redistribute-writes", "false") - .put("scale-writers", "true") + .put("scale-writers", "false") .put("writer-min-size", "42GB") .put("regex-library", "RE2J") .put("re2j.dfa-states-limit", "42") @@ -111,7 +111,7 @@ public void testExplicitPropertyMappings() .setDynamicScheduleForGroupedExecutionEnabled(true) .setConcurrentLifespansPerTask(1) .setRedistributeWrites(false) - .setScaleWriters(true) + .setScaleWriters(false) .setWriterMinSize(DataSize.of(42, GIGABYTE)) .setRegexLibrary(RE2J) .setRe2JDfaStatesLimit(42) diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java index c31c71e1754b..ec55ce67ddab 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java +++ 
b/core/trino-main/src/test/java/io/trino/sql/planner/TestingWriterTarget.java @@ -14,6 +14,8 @@ package io.trino.sql.planner; +import io.trino.Session; +import io.trino.metadata.Metadata; import io.trino.sql.planner.plan.TableWriterNode; public class TestingWriterTarget @@ -24,4 +26,10 @@ public String toString() { return "testing handle"; } + + @Override + public boolean supportsReportingWrittenBytes(Metadata metadata, Session session) + { + return false; + } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index 33905baad1a6..4923e65c2dec 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -24,6 +24,7 @@ import io.trino.cost.PlanNodeStatsEstimate; import io.trino.metadata.IndexHandle; import io.trino.metadata.Metadata; +import io.trino.metadata.OutputTableHandle; import io.trino.metadata.ResolvedFunction; import io.trino.metadata.TableExecuteHandle; import io.trino.metadata.TableHandle; @@ -84,8 +85,10 @@ import io.trino.sql.planner.plan.TableFinishNode; import io.trino.sql.planner.plan.TableScanNode; import io.trino.sql.planner.plan.TableWriterNode; +import io.trino.sql.planner.plan.TableWriterNode.CreateTarget; import io.trino.sql.planner.plan.TableWriterNode.DeleteTarget; import io.trino.sql.planner.plan.TableWriterNode.UpdateTarget; +import io.trino.sql.planner.plan.TableWriterNode.WriterTarget; import io.trino.sql.planner.plan.TopNNode; import io.trino.sql.planner.plan.TopNRankingNode; import io.trino.sql.planner.plan.TopNRankingNode.RankingType; @@ -99,6 +102,7 @@ import io.trino.sql.tree.FunctionCall; import io.trino.sql.tree.NullLiteral; import io.trino.sql.tree.Row; +import io.trino.testing.TestingHandle; import io.trino.testing.TestingMetadata.TestingColumnHandle; 
import io.trino.testing.TestingMetadata.TestingTableHandle; import io.trino.testing.TestingTableExecuteHandle; @@ -687,6 +691,27 @@ public TableFinishNode tableWithExchangeDelete(SchemaTableName schemaTableName, Optional.empty()); } + public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode source, Symbol rowCountSymbol, PartitioningScheme partitioningScheme) + { + return new TableFinishNode( + idAllocator.getNextId(), + exchange(e -> e + .addSource(tableWriter( + ImmutableList.of(rowCountSymbol), + ImmutableList.of("column_a"), + Optional.empty(), + Optional.empty(), + target, + source, + rowCountSymbol)) + .addInputsSet(rowCountSymbol) + .partitioningScheme(partitioningScheme)), + target, + rowCountSymbol, + Optional.empty(), + Optional.empty()); + } + public TableFinishNode tableDelete(SchemaTableName schemaTableName, PlanNode deleteSource, Symbol deleteRowId) { DeleteTarget deleteTarget = deleteTarget(schemaTableName); @@ -724,6 +749,18 @@ private DeleteTarget deleteTarget(SchemaTableName schemaTableName) schemaTableName); } + public CreateTarget createTarget(CatalogName catalog, SchemaTableName schemaTableName, boolean reportingWrittenBytesSupported) + { + OutputTableHandle tableHandle = new OutputTableHandle( + catalog, + TestingConnectorTransactionHandle.INSTANCE, + TestingHandle.INSTANCE); + return new CreateTarget( + tableHandle, + schemaTableName, + reportingWrittenBytesSupported); + } + public TableFinishNode tableUpdate(SchemaTableName schemaTableName, PlanNode updateSource, Symbol updateRowId, List columnsToBeUpdated) { UpdateTarget updateTarget = updateTarget( @@ -754,11 +791,12 @@ public TableFinishNode tableUpdate(SchemaTableName schemaTableName, PlanNode upd private UpdateTarget updateTarget(SchemaTableName schemaTableName, List columnsToBeUpdated) { + TableHandle tableHandle = new TableHandle( + new CatalogName("testConnector"), + new TestingTableHandle(), + TestingTransactionHandle.create()); return new UpdateTarget( - 
Optional.of(new TableHandle( - new CatalogName("testConnector"), - new TestingTableHandle(), - TestingTransactionHandle.create())), + Optional.of(tableHandle), schemaTableName, columnsToBeUpdated, columnsToBeUpdated.stream() @@ -1128,6 +1166,30 @@ public TableWriterNode tableWriter(List columns, List columnName return tableWriter(columns, columnNames, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), source); } + public TableWriterNode tableWriter( + List columns, + List columnNames, + Optional partitioningScheme, + Optional preferredPartitioningScheme, + TableWriterNode.WriterTarget target, + PlanNode source, + Symbol rowCountSymbol) + { + return new TableWriterNode( + idAllocator.getNextId(), + source, + target, + rowCountSymbol, + rowCountSymbol, + columns, + columnNames, + ImmutableSet.of(), + partitioningScheme, + preferredPartitioningScheme, + Optional.empty(), + Optional.empty()); + } + public TableWriterNode tableWriter( List columns, List columnNames, @@ -1164,16 +1226,18 @@ public TableExecuteNode tableExecute( Optional preferredPartitioningScheme, PlanNode source) { + CatalogName catalogName = new CatalogName("testConnector"); return new TableExecuteNode( idAllocator.getNextId(), source, new TableWriterNode.TableExecuteTarget( new TableExecuteHandle( - new CatalogName("testConnector"), + catalogName, TestingTransactionHandle.create(), new TestingTableExecuteHandle()), Optional.empty(), - SchemaTableName.schemaTableName("testschema", "testtable")), + new SchemaTableName(catalogName.getCatalogName(), "tableName"), + false), symbol("partialrows", BIGINT), symbol("fragment", VARBINARY), columns, diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java new file mode 100644 index 000000000000..ba9fa4415571 --- /dev/null +++ 
b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesScaledWriters.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.sql.planner.optimizations; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.connector.MockConnectorFactory; +import io.trino.plugin.tpch.TpchConnectorFactory; +import io.trino.sql.planner.LogicalPlanner; +import io.trino.sql.planner.SubPlan; +import io.trino.sql.planner.assertions.BasePlanTest; +import io.trino.testing.LocalQueryRunner; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestAddExchangesScaledWriters + extends BasePlanTest +{ + @Override + protected LocalQueryRunner createLocalQueryRunner() + { + Session session = testSessionBuilder() + .setCatalog("tpch") + .setSchema("tiny") + .build(); + LocalQueryRunner queryRunner = LocalQueryRunner.create(session); + queryRunner.createCatalog("tpch", new TpchConnectorFactory(1), ImmutableMap.of()); + queryRunner.createCatalog("mock_dont_report_written_bytes", createConnectorFactorySupportingReportingBytesWritten(false, "mock_dont_report_written_bytes"), 
ImmutableMap.of()); + queryRunner.createCatalog("mock_report_written_bytes", createConnectorFactorySupportingReportingBytesWritten(true, "mock_report_written_bytes"), ImmutableMap.of()); + return queryRunner; + } + + private MockConnectorFactory createConnectorFactorySupportingReportingBytesWritten(boolean supportsWrittenBytes, String name) + { + MockConnectorFactory connectorFactory = MockConnectorFactory.builder() + .withSupportsReportingWrittenBytes(supportsWrittenBytes) + .withGetTableHandle(((session, schemaTableName) -> null)) + .withName(name) + .build(); + return connectorFactory; + } + + @DataProvider(name = "scale_writers") + public Object[][] prepareScaledWritersOption() + { + return new Object[][] {{true}, {false}}; + } + + @Test(dataProvider = "scale_writers") + public void testScaledWritersEnabled(boolean isScaleWritersEnabled) + { + Session session = testSessionBuilder() + .setSystemProperty("scale_writers", Boolean.toString(isScaleWritersEnabled)) + .build(); + + @Language("SQL") + String query = "CREATE TABLE mock_report_written_bytes.mock.test AS SELECT * FROM tpch.tiny.nation"; + SubPlan subPlan = subplan(query, LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED, false, session); + if (isScaleWritersEnabled) { + assertThat(subPlan.getAllFragments().get(1).getPartitioning().getConnectorHandle()).isEqualTo(SCALED_WRITER_DISTRIBUTION.getConnectorHandle()); + } + else { + subPlan.getAllFragments().forEach( + fragment -> assertThat(fragment.getPartitioning().getConnectorHandle()).isNotEqualTo(SCALED_WRITER_DISTRIBUTION.getConnectorHandle())); + } + } + + @Test(dataProvider = "scale_writers") + public void testScaledWritersDisabled(boolean isScaleWritersEnabled) + { + Session session = testSessionBuilder() + .setSystemProperty("scale_writers", Boolean.toString(isScaleWritersEnabled)) + .build(); + + @Language("SQL") + String query = "CREATE TABLE mock_dont_report_written_bytes.mock.test AS SELECT * FROM tpch.tiny.nation"; + SubPlan subPlan = subplan(query, 
LogicalPlanner.Stage.OPTIMIZED_AND_VALIDATED, false, session); + subPlan.getAllFragments().forEach( + fragment -> assertThat(fragment.getPartitioning().getConnectorHandle()).isNotEqualTo(SCALED_WRITER_DISTRIBUTION.getConnectorHandle())); + } +} diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestBeginTableWrite.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestBeginTableWrite.java index 904a9285c267..da6a06f8ce36 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestBeginTableWrite.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestBeginTableWrite.java @@ -19,7 +19,9 @@ import io.trino.metadata.AbstractMockMetadata; import io.trino.metadata.Metadata; import io.trino.metadata.TableHandle; +import io.trino.metadata.TableMetadata; import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.type.BigintType; import io.trino.sql.planner.PlanNodeIdAllocator; @@ -156,5 +158,13 @@ public TableHandle beginUpdate(Session session, TableHandle tableHandle, List null)) + .withName(name) + .build(); + } + + @Test + public void testScaledWritersUsedAndTargetSupportsIt() + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true), + 
tableWriterSource, + symbol, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + validatePlan(root); + } + + @Test + public void testScaledWritersUsedAndTargetDoesNotSupportIt() + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false), + tableWriterSource, + symbol, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + assertThatThrownBy(() -> validatePlan(root)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The partitioning scheme is set to SCALED_WRITER_DISTRIBUTION but writer target no_bytes_written_reported:INSTANCE does support for it"); + } + + @Test + public void testScaledWritersUsedAndTargetDoesNotSupportItMultipleSourceExchanges() + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol, symbol))) + .addInputsSet(symbol, symbol) + .addInputsSet(symbol, symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode))) + 
.addSource(planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false), + tableWriterSource, + symbol, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + assertThatThrownBy(() -> validatePlan(root)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The partitioning scheme is set to SCALED_WRITER_DISTRIBUTION but writer target no_bytes_written_reported:INSTANCE does support for it"); + } + + @Test + public void testScaledWritersUsedAndTargetSupportsItMultipleSourceExchanges() + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol, symbol))) + .addInputsSet(symbol, symbol) + .addInputsSet(symbol, symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode))) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true), + tableWriterSource, + symbol, + new 
PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + validatePlan(root); + } + + @Test + public void testScaledWritersUsedAboveTableWriterInThePlanTree() + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false), + tableWriterSource, + symbol, + new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + validatePlan(root); + } + + @Test + public void testScaledWritersTwoTableWritersNodes() + { + PlanNode tableWriterSource = planBuilder.exchange(ex -> + ex + .partitioningScheme(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(planBuilder.tableWriter( + ImmutableList.of(symbol), + ImmutableList.of("column_a"), + Optional.empty(), + Optional.empty(), + planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true), + planBuilder.exchange(innerExchange -> + innerExchange + .partitioningScheme(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))) + .addInputsSet(symbol) + .addSource(tableScanNode)), + symbol))); + PlanNode root = planBuilder.output( + outputBuilder -> outputBuilder + .source(planBuilder.tableWithExchangeCreate( + 
planBuilder.createTarget(catalogNotSupportingScaledWriters, schemaTableName, false), + tableWriterSource, + symbol, + new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol))))); + assertThatThrownBy(() -> validatePlan(root)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The partitioning scheme is set to SCALED_WRITER_DISTRIBUTION but writer target no_bytes_written_reported:INSTANCE does support for it"); + } + + private void validatePlan(PlanNode root) + { + queryRunner.inTransaction(session -> { + // metadata.getCatalogHandle() registers the catalog for the transaction + plannerContext.getMetadata().getCatalogHandle(session, catalogSupportingScaledWriters.getCatalogName()); + plannerContext.getMetadata().getCatalogHandle(session, catalogNotSupportingScaledWriters.getCatalogName()); + new ValidateScaledWritersUsage().validate( + root, + session, + plannerContext, + createTestingTypeAnalyzer(plannerContext), + TypeProvider.empty(), + WarningCollector.NOOP); + return null; + }); + } +} diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 5fadbe9ff5a7..29e453ae5638 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -1370,4 +1370,14 @@ default boolean isSupportedVersionType(ConnectorSession session, SchemaTableName { throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables"); } + + default boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName schemaTableName, Map tableProperties) + { + return false; + } + + default boolean supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle connectorTableHandle) + { + return false; + } } diff --git 
a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index 3607c9e62a9b..85393f7ba9fa 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -1062,4 +1062,20 @@ public boolean isSupportedVersionType(ConnectorSession session, SchemaTableName return delegate.isSupportedVersionType(session, tableName, pointerType, versioning); } } + + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName schemaTableName, Map tableProperties) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.supportsReportingWrittenBytes(session, schemaTableName, tableProperties); + } + } + + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle connectorTableHandle) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.supportsReportingWrittenBytes(session, connectorTableHandle); + } + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 2c6fc91d1ae9..6f28c46ef5fd 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -1925,6 +1925,18 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH statisticsAccess.updateExtendedStatistics(session, location, mergedExtendedStatistics); } + @Override + public boolean 
supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle connectorTableHandle) + { + return true; + } + + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName fullTableName, Map tableProperties) + { + return true; + } + private void cleanExtraOutputFiles(ConnectorSession session, String baseLocation, List validDataFiles) { Set writtenFilePaths = validDataFiles.stream() diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AbstractTestDeltaLakeCreateTableStatistics.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AbstractTestDeltaLakeCreateTableStatistics.java index 2ff6e2a1a952..7d68e6b48cb3 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AbstractTestDeltaLakeCreateTableStatistics.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AbstractTestDeltaLakeCreateTableStatistics.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.trino.Session; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager; @@ -62,6 +63,7 @@ import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.testing.TestingSession.testSessionBuilder; import static io.trino.testing.assertions.Assert.assertEventually; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.Double.NEGATIVE_INFINITY; @@ -442,10 +444,17 @@ public void testMultiFileTable() throws Exception { DeltaLakeColumnHandle columnHandle = new DeltaLakeColumnHandle("name", 
createUnboundedVarcharType(), REGULAR); + Session session = testSessionBuilder() + .setCatalog(DELTA_CATALOG) + .setSystemProperty("scale_writers", "false") + .setSchema(SCHEMA) + .build(); try (TestTable table = new TestTable( "test_partitioned_table_", ImmutableList.of(), - "SELECT name FROM tpch.tiny.nation UNION select name from tpch.tiny.customer")) { + ImmutableList.of(), + "SELECT name FROM tpch.tiny.nation UNION select name from tpch.tiny.customer", + session)) { List addFileEntries = getAddFileEntries(table.getName()); assertThat(addFileEntries.size()).isGreaterThan(1); @@ -468,10 +477,17 @@ public void testMultiFileTableWithNaNValue() assertEventually(() -> { String columnName = "orderkey"; DeltaLakeColumnHandle columnHandle = new DeltaLakeColumnHandle(columnName, DoubleType.DOUBLE, REGULAR); + Session session = testSessionBuilder() + .setCatalog(DELTA_CATALOG) + .setSchema(SCHEMA) + .setSystemProperty("scale_writers", "false") + .build(); try (TestTable table = new TestTable( "test_partitioned_table_", ImmutableList.of(columnName), - "SELECT IF(orderkey = 50597, nan(), CAST(orderkey AS double)) FROM tpch.tiny.orders")) { + ImmutableList.of(), + "SELECT IF(orderkey = 50597, nan(), CAST(orderkey AS double)) FROM tpch.tiny.orders", + session)) { List addFileEntries = getAddFileEntries(table.getName()); assertThat(addFileEntries.size()).isGreaterThan(1); @@ -495,17 +511,22 @@ public TestTable(String name, List columnNames, String values) this(name, columnNames, ImmutableList.of(), values); } - public TestTable(String name, List columnNames, List partitionNames, String values) + public TestTable(String name, List columnNames, List partitionNames, String values, Session session) { this.name = name + randomTableSuffix(); String columns = columnNames.isEmpty() ? "" : "(" + String.join(",", columnNames) + ")"; String partitionedBy = partitionNames.isEmpty() ? 
"" : format(", partitioned_by = ARRAY[%s]", partitionNames.stream().map(partitionName -> "'" + partitionName + "'").collect(Collectors.joining(","))); - computeActual(format("CREATE TABLE %s %s WITH (location = 's3://%s/%1$s' %s) AS %s", + computeActual(session, format("CREATE TABLE %s %s WITH (location = 's3://%s/%1$s' %s) AS %s", this.name, columns, bucketName, partitionedBy, values)); } + public TestTable(String name, List columnNames, List partitionNames, String values) + { + this(name, columnNames, partitionNames, values, getSession()); + } + public String getName() { return name; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 537c13d76b52..ec0d2aa2052e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -3592,4 +3592,16 @@ private static boolean isQueryPartitionFilterRequiredForTable(ConnectorSession s return isQueryPartitionFilterRequired(session) && requiredSchemas.isEmpty() || requiredSchemas.contains(schemaTableName.getSchemaName()); } + + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle connectorTableHandle) + { + return true; + } + + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName schemaTableName, Map tableProperties) + { + return true; + } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index a9cbcb5072ac..5c72f82aa21a 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -1796,6 +1796,7 @@ protected void testTargetMaxFileSize(int expectedTableWriters) // verify the 
default behavior is one file per node Session session = Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") + .setSystemProperty("scale_writers", "false") .build(); assertUpdate(session, createTableSql, 1000000); assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(expectedTableWriters); @@ -1838,6 +1839,7 @@ protected void testTargetMaxFileSizePartitioned(int expectedTableWriters) // verify the default behavior is one file per node per partition Session session = Session.builder(getSession()) .setSystemProperty("task_writer_count", "1") + .setSystemProperty("scale_writers", "false") .build(); assertUpdate(session, createTableSql, 1000000); assertThat(computeActual(selectFileInfo).getRowCount()).isEqualTo(expectedTableWriters * 3); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 3be20b50f75c..c3b07bddf2a4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -1762,6 +1762,18 @@ else if (strings.size() != 2) { return new MaterializedViewFreshness(true); } + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle connectorTableHandle) + { + return true; + } + + @Override + public boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTableName fullTableName, Map tableProperties) + { + return true; + } + @Override public void setColumnComment(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column, Optional comment) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 2621040007ce..6d25ea0d9b9e 100644 --- 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -80,6 +80,8 @@ import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; +import static io.trino.SystemSessionProperties.SCALE_WRITERS; +import static io.trino.SystemSessionProperties.TASK_WRITER_COUNT; import static io.trino.plugin.hive.HdfsEnvironment.HdfsContext; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; @@ -2542,26 +2544,33 @@ public void testAllAvailableTypes() @Test public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin() { - long fullTableScan = (Long) computeActual("SELECT count(*) FROM lineitem").getOnlyValue(); + // We need to prepare tables for this test. The test is required to use tables that are backed by at least two files + Session session = Session.builder(getSession()) + .setSystemProperty(TASK_WRITER_COUNT, "2") + .build(); + getQueryRunner().execute(session, format("CREATE TABLE IF NOT EXISTS %s AS SELECT * FROM %s", "linetime_multiple_file_backed", "tpch.tiny.lineitem")).getMaterializedRows(); + getQueryRunner().execute(session, format("CREATE TABLE IF NOT EXISTS %s AS SELECT * FROM %s", "orders_multiple_file_backed", "tpch.tiny.orders")).getMaterializedRows(); + + long fullTableScan = (Long) computeActual("SELECT count(*) FROM linetime_multiple_file_backed").getOnlyValue(); // Pick a value for totalprice where file level stats will not be able to filter out any data // This assumes the totalprice ranges in every file have some overlap, otherwise this test will fail.
- MaterializedRow range = getOnlyElement(computeActual("SELECT max(lower_bounds[4]), min(upper_bounds[4]) FROM \"orders$files\"").getMaterializedRows()); + MaterializedRow range = getOnlyElement(computeActual("SELECT max(lower_bounds[4]), min(upper_bounds[4]) FROM \"orders_multiple_file_backed$files\"").getMaterializedRows()); double totalPrice = (Double) computeActual(format( - "SELECT totalprice FROM orders WHERE totalprice > %s AND totalprice < %s LIMIT 1", + "SELECT totalprice FROM orders_multiple_file_backed WHERE totalprice > %s AND totalprice < %s LIMIT 1", range.getField(0), range.getField(1))) .getOnlyValue(); - Session session = Session.builder(getSession()) + session = Session.builder(getSession()) .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) .build(); ResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId( session, - "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = " + totalPrice); + "SELECT * FROM linetime_multiple_file_backed JOIN orders_multiple_file_backed ON linetime_multiple_file_backed.orderkey = orders_multiple_file_backed.orderkey AND orders_multiple_file_backed.totalprice = " + totalPrice); OperatorStats probeStats = searchScanFilterAndProjectOperatorStats( result.getQueryId(), - new QualifiedObjectName(ICEBERG_CATALOG, "tpch", "lineitem")); + new QualifiedObjectName(ICEBERG_CATALOG, "tpch", "linetime_multiple_file_backed")); // Assert some lineitem rows were filtered out on file level assertThat(probeStats.getInputPositions()).isLessThan(fullTableScan); @@ -2623,6 +2632,7 @@ private void testStatsBasedRepartitionData(boolean ctas) .build(); Session sessionRepartitionMany = Session.builder(getSession()) .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "5") + .setSystemProperty(SCALE_WRITERS, "false") .build(); // Use DISTINCT to add data redistribution between source table and the writer. 
This makes it more likely that all writers get some data. String sourceRelation = "(SELECT DISTINCT orderkey, custkey, orderstatus FROM tpch.tiny.orders)"; diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 2d7fa696e657..ec18a1fbe108 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -15,10 +15,12 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.UncheckedTimeoutException; import io.airlift.units.Duration; import io.trino.Session; +import io.trino.connector.CatalogName; import io.trino.cost.StatsAndCosts; import io.trino.dispatcher.DispatchManager; import io.trino.execution.QueryInfo; @@ -101,6 +103,7 @@ import static io.trino.testing.assertions.Assert.assertEquals; import static io.trino.testing.assertions.Assert.assertEventually; import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static io.trino.transaction.TransactionBuilder.transaction; import static java.lang.String.format; import static java.lang.String.join; import static java.lang.Thread.currentThread; @@ -2401,6 +2404,34 @@ protected String errorMessageForInsertNegativeDate(String date) throw new UnsupportedOperationException("This method should be overridden"); } + protected boolean isReportingWrittenBytesSupported(Session session) + { + CatalogName catalogName = session.getCatalog() + .map(CatalogName::new) + .orElseThrow(); + Metadata metadata = getQueryRunner().getMetadata(); + metadata.getCatalogHandle(session, catalogName.getCatalogName()); + QualifiedObjectName fullTableName = new QualifiedObjectName(catalogName.getCatalogName(), "any", "any"); + return 
getQueryRunner().getMetadata().supportsReportingWrittenBytes(session, fullTableName, ImmutableMap.of()); + } + + @Test + public void isReportingWrittenBytesSupported() + { + transaction(getQueryRunner().getTransactionManager(), getQueryRunner().getAccessControl()) + .singleStatement() + .execute(getSession(), (Consumer) session -> skipTestUnless(isReportingWrittenBytesSupported(session))); + + @Language("SQL") + String query = "CREATE TABLE temp AS SELECT * FROM tpch.tiny.nation"; + + assertQueryStats( + getSession(), + query, + queryStats -> assertThat(queryStats.getPhysicalWrittenDataSize().toBytes()).isGreaterThan(0L), + results -> {}); + } + @Test public void testInsertIntoNotNullColumn() {