Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class FeaturesConfig
private boolean dynamicScheduleForGroupedExecution;
private int concurrentLifespansPerTask;
private boolean redistributeWrites = true;
private boolean scaleWriters;
private boolean scaleWriters = true;
private DataSize writerMinSize = DataSize.of(32, DataSize.Unit.MEGABYTE);
private DataIntegrityVerification exchangeDataIntegrityVerification = DataIntegrityVerification.ABORT;
private boolean exchangeCompressionEnabled;
Expand Down
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,16 @@ default boolean isMaterializedView(Session session, QualifiedObjectName viewName
*/
boolean isValidTableVersion(Session session, QualifiedObjectName tableName, TableVersion version);

/**
* Returns true if the connector reports number of written bytes for an existing table. Otherwise, it returns false.
*/
boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle);

/**
* Returns true if the connector reports number of written bytes for a new table. Otherwise, it returns false.
*/
boolean supportsReportingWrittenBytes(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties);

/**
* Returns a table handle for the specified table name with a specified version
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2403,6 +2403,21 @@ public boolean isValidTableVersion(Session session, QualifiedObjectName tableNam
return metadata.isSupportedVersionType(session.toConnectorSession(), tableName.asSchemaTableName(), version.getPointerType(), version.getObjectType());
}

@Override
public boolean supportsReportingWrittenBytes(Session session, QualifiedObjectName tableName, Map<String, Object> tableProperties)
{
CatalogName catalogName = new CatalogName(tableName.getCatalogName());
ConnectorMetadata metadata = getMetadata(session, catalogName);
return metadata.supportsReportingWrittenBytes(session.toConnectorSession(catalogName), tableName.asSchemaTableName(), tableProperties);
}

@Override
public boolean supportsReportingWrittenBytes(Session session, TableHandle tableHandle)
{
ConnectorMetadata metadata = getMetadata(session, tableHandle.getCatalogName());
return metadata.supportsReportingWrittenBytes(session.toConnectorSession(tableHandle.getCatalogName()), tableHandle.getConnectorHandle());
}

private Optional<ConnectorTableVersion> toConnectorVersion(Optional<TableVersion> version)
{
Optional<ConnectorTableVersion> connectorVersion = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query)
.collect(toImmutableList());

TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, destination.getCatalogName(), tableMetadata);

return createTableWriterPlan(
analysis,
plan.getRoot(),
Expand Down Expand Up @@ -856,7 +855,8 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat
.map(ColumnMetadata::getName)
.collect(toImmutableList());

TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget(executeHandle, Optional.empty(), tableName.asSchemaTableName());
boolean supportsReportingWrittenBytes = metadata.supportsReportingWrittenBytes(session, tableHandle);
TableWriterNode.TableExecuteTarget tableExecuteTarget = new TableWriterNode.TableExecuteTarget(executeHandle, Optional.empty(), tableName.asSchemaTableName(), supportsReportingWrittenBytes);

Optional<TableLayout> layout = metadata.getLayoutForTableExecute(session, executeHandle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,13 +592,13 @@ public PlanWithProperties visitRefreshMaterializedView(RefreshMaterializedViewNo
@Override
public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties preferredProperties)
{
return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties);
return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties, node.getTarget());
}

@Override
public PlanWithProperties visitTableExecute(TableExecuteNode node, PreferredProperties preferredProperties)
{
return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties);
return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), preferredProperties, node.getTarget());
}

@Override
Expand All @@ -611,12 +611,12 @@ public PlanWithProperties visitSimpleTableExecuteNode(SimpleTableExecuteNode nod
.build());
}

private PlanWithProperties visitTableWriter(PlanNode node, Optional<PartitioningScheme> partitioningScheme, PlanNode source, PreferredProperties preferredProperties)
private PlanWithProperties visitTableWriter(PlanNode node, Optional<PartitioningScheme> partitioningScheme, PlanNode source, PreferredProperties preferredProperties, TableWriterNode.WriterTarget writerTarget)
{
PlanWithProperties newSource = source.accept(this, preferredProperties);

if (partitioningScheme.isEmpty()) {
if (scaleWriters) {
if (scaleWriters && writerTarget.supportsReportingWrittenBytes(plannerContext.getMetadata(), session)) {
partitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), newSource.getNode().getOutputSymbols()));
}
else if (redistributeWrites) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public WriterTarget getWriterTarget(PlanNode node)
return new TableExecuteTarget(
target.getExecuteHandle(),
findTableScanHandleForTableExecute(((TableExecuteNode) node).getSource()),
target.getSchemaTableName());
target.getSchemaTableName(),
target.isReportingWrittenBytesSupported());
}
if (node instanceof ExchangeNode || node instanceof UnionNode) {
Set<WriterTarget> writerTargets = node.getSources().stream()
Expand All @@ -254,11 +255,11 @@ private WriterTarget createWriterTarget(WriterTarget target)
// TODO: we shouldn't need to store the schemaTableName in the handles, but there isn't a good way to pass this around with the current architecture
if (target instanceof CreateReference) {
CreateReference create = (CreateReference) target;
return new CreateTarget(metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()), create.getTableMetadata().getTable());
return new CreateTarget(metadata.beginCreateTable(session, create.getCatalog(), create.getTableMetadata(), create.getLayout()), create.getTableMetadata().getTable(), target.supportsReportingWrittenBytes(metadata, session));
}
if (target instanceof InsertReference) {
InsertReference insert = (InsertReference) target;
return new InsertTarget(metadata.beginInsert(session, insert.getHandle(), insert.getColumns()), metadata.getTableMetadata(session, insert.getHandle()).getTable());
return new InsertTarget(metadata.beginInsert(session, insert.getHandle(), insert.getColumns()), metadata.getTableMetadata(session, insert.getHandle()).getTable(), target.supportsReportingWrittenBytes(metadata, session));
}
if (target instanceof DeleteTarget) {
DeleteTarget delete = (DeleteTarget) target;
Expand All @@ -285,8 +286,7 @@ private WriterTarget createWriterTarget(WriterTarget target)
if (target instanceof TableExecuteTarget) {
TableExecuteTarget tableExecute = (TableExecuteTarget) target;
BeginTableExecuteResult<TableExecuteHandle, TableHandle> result = metadata.beginTableExecute(session, tableExecute.getExecuteHandle(), tableExecute.getMandatorySourceHandle());

return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName());
return new TableExecuteTarget(result.getTableExecuteHandle(), Optional.of(result.getSourceHandle()), tableExecute.getSchemaTableName(), tableExecute.isReportingWrittenBytesSupported());
}
throw new IllegalArgumentException("Unhandled target type: " + target.getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.trino.Session;
import io.trino.metadata.InsertTableHandle;
import io.trino.metadata.Metadata;
import io.trino.metadata.OutputTableHandle;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableExecuteHandle;
import io.trino.metadata.TableHandle;
import io.trino.metadata.TableLayout;
Expand Down Expand Up @@ -208,6 +211,8 @@ public abstract static class WriterTarget
{
@Override
public abstract String toString();

public abstract boolean supportsReportingWrittenBytes(Metadata metadata, Session session);
}

// only used during planning -- will not be serialized
Expand All @@ -230,16 +235,26 @@ public String getCatalog()
return catalog;
}

public ConnectorTableMetadata getTableMetadata()
@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
return tableMetadata;
QualifiedObjectName fullTableName = new QualifiedObjectName(
catalog,
tableMetadata.getTableSchema().getTable().getSchemaName(),
tableMetadata.getTableSchema().getTable().getTableName());
return metadata.supportsReportingWrittenBytes(session, fullTableName, tableMetadata.getProperties());
}

public Optional<TableLayout> getLayout()
{
return layout;
}

public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public String toString()
{
Expand All @@ -252,14 +267,17 @@ public static class CreateTarget
{
private final OutputTableHandle handle;
private final SchemaTableName schemaTableName;
private final boolean reportingWrittenBytesSupported;

@JsonCreator
public CreateTarget(
@JsonProperty("handle") OutputTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("reportingWrittenBytesSupported") boolean reportingWrittenBytesSupported)
{
this.handle = requireNonNull(handle, "handle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.reportingWrittenBytesSupported = reportingWrittenBytesSupported;
}

@JsonProperty
Expand All @@ -274,11 +292,23 @@ public SchemaTableName getSchemaTableName()
return schemaTableName;
}

@JsonProperty
public boolean getReportingWrittenBytesSupported()
{
return reportingWrittenBytesSupported;
}

@Override
public String toString()
{
return handle.toString();
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
return reportingWrittenBytesSupported;
}
}

// only used during planning -- will not be serialized
Expand Down Expand Up @@ -309,21 +339,30 @@ public String toString()
{
return handle.toString();
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
return metadata.supportsReportingWrittenBytes(session, handle);
}
}

public static class InsertTarget
extends WriterTarget
{
private final InsertTableHandle handle;
private final SchemaTableName schemaTableName;
private final boolean reportingWrittenBytesSupported;

@JsonCreator
public InsertTarget(
@JsonProperty("handle") InsertTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("reportingWrittenBytesSupported") boolean reportingWrittenBytesSupported)
{
this.handle = requireNonNull(handle, "handle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.reportingWrittenBytesSupported = reportingWrittenBytesSupported;
}

@JsonProperty
Expand All @@ -338,11 +377,23 @@ public SchemaTableName getSchemaTableName()
return schemaTableName;
}

@JsonProperty
public boolean getReportingWrittenBytesSupported()
{
return reportingWrittenBytesSupported;
}

@Override
public String toString()
{
return handle.toString();
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
return reportingWrittenBytesSupported;
}
}

public static class RefreshMaterializedViewReference
Expand Down Expand Up @@ -379,6 +430,12 @@ public String toString()
{
return table.toString();
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
return metadata.supportsReportingWrittenBytes(session, storageTableHandle);
}
}

public static class RefreshMaterializedViewTarget
Expand Down Expand Up @@ -431,6 +488,12 @@ public String toString()
{
return insertHandle.toString();
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
return metadata.supportsReportingWrittenBytes(session, tableHandle);
}
}

public static class DeleteTarget
Expand Down Expand Up @@ -471,6 +534,12 @@ public String toString()
{
return handle.map(Object::toString).orElse("[]");
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
throw new UnsupportedOperationException();
}
}

public static class UpdateTarget
Expand Down Expand Up @@ -530,6 +599,12 @@ public String toString()
{
return handle.map(Object::toString).orElse("[]");
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
throw new UnsupportedOperationException();
}
}

public static class TableExecuteTarget
Expand All @@ -538,16 +613,19 @@ public static class TableExecuteTarget
private final TableExecuteHandle executeHandle;
private final Optional<TableHandle> sourceHandle;
private final SchemaTableName schemaTableName;
private final boolean reportingWrittenBytesSupported;

@JsonCreator
public TableExecuteTarget(
@JsonProperty("executeHandle") TableExecuteHandle executeHandle,
@JsonProperty("sourceHandle") Optional<TableHandle> sourceHandle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("reportingWrittenBytesSupported") boolean reportingWrittenBytesSupported)
{
this.executeHandle = requireNonNull(executeHandle, "handle is null");
this.sourceHandle = requireNonNull(sourceHandle, "sourceHandle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.reportingWrittenBytesSupported = reportingWrittenBytesSupported;
}

@JsonProperty
Expand All @@ -573,10 +651,22 @@ public SchemaTableName getSchemaTableName()
return schemaTableName;
}

@JsonProperty
public boolean isReportingWrittenBytesSupported()
{
return reportingWrittenBytesSupported;
}

@Override
public String toString()
{
return executeHandle.toString();
}

@Override
public boolean supportsReportingWrittenBytes(Metadata metadata, Session session)
{
return sourceHandle.map(tableHandle -> metadata.supportsReportingWrittenBytes(session, tableHandle)).orElse(reportingWrittenBytesSupported);
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public PlanSanityChecker(boolean forceSingleNode)
new VerifyNoFilteredAggregations(),
new VerifyUseConnectorNodePartitioningSet(),
new ValidateAggregationsWithDefaultValues(forceSingleNode),
new ValidateScaledWritersUsage(),
new ValidateStreamingAggregations(),
new ValidateLimitWithPresortedInput(),
new DynamicFiltersChecker(),
Expand Down
Loading