Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2611,9 +2611,9 @@ public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, Connecto
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
public Optional<ColumnHandle> getDeleteRowIdColumn(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return updateRowIdHandle();
return Optional.of(updateRowIdHandle());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,9 @@ private void handleFinishData(CommitTaskData task, Table icebergTable, Partition
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
public Optional<ColumnHandle> getDeleteRowIdColumn(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return IcebergColumnHandle.create(ROW_POSITION, typeManager, REGULAR);
return Optional.of(IcebergColumnHandle.create(ROW_POSITION, typeManager, REGULAR));
}

@Override
Expand Down Expand Up @@ -1271,7 +1271,7 @@ else if (tableVersion.getVersionExpressionType() instanceof VarcharType) {
* @return A column handle for the Row ID update column.
*/
@Override
public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
public Optional<ColumnHandle> getUpdateRowIdColumn(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
List<NestedField> unmodifiedColumns = new ArrayList<>();
unmodifiedColumns.add(ROW_POSITION);
Expand All @@ -1287,7 +1287,7 @@ public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, Connect
}
}
NestedField field = NestedField.required(UPDATE_ROW_DATA.getId(), UPDATE_ROW_DATA.getColumnName(), Types.StructType.of(unmodifiedColumns));
return IcebergColumnHandle.create(field, typeManager, SYNTHESIZED);
return Optional.of(IcebergColumnHandle.create(field, typeManager, SYNTHESIZED));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,9 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
public Optional<ColumnHandle> getDeleteRowIdColumn(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return KuduColumnHandle.ROW_ID_HANDLE;
return Optional.of(KuduColumnHandle.ROW_ID_HANDLE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,15 +361,15 @@ public Optional<ConnectorOutputMetadata> finishInsert(
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(Session session, TableHandle tableHandle)
public Optional<ColumnHandle> getDeleteRowIdColumn(Session session, TableHandle tableHandle)
{
return delegate.getDeleteRowIdColumnHandle(session, tableHandle);
return delegate.getDeleteRowIdColumn(session, tableHandle);
}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
public Optional<ColumnHandle> getUpdateRowIdColumn(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
return delegate.getUpdateRowIdColumnHandle(session, tableHandle, updatedColumns);
return delegate.getUpdateRowIdColumn(session, tableHandle, updatedColumns);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,12 @@ public interface Metadata
/**
* Get the row ID column handle used with UpdatablePageSource#deleteRows.
*/
ColumnHandle getDeleteRowIdColumnHandle(Session session, TableHandle tableHandle);
Optional<ColumnHandle> getDeleteRowIdColumn(Session session, TableHandle tableHandle);

/**
* Get the row ID column handle used with UpdatablePageSource.
*/
ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns);
Optional<ColumnHandle> getUpdateRowIdColumn(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns);

/**
* @return whether delete without table scan is supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,19 +886,19 @@ public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTab
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(Session session, TableHandle tableHandle)
public Optional<ColumnHandle> getDeleteRowIdColumn(Session session, TableHandle tableHandle)
{
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);
return metadata.getDeleteRowIdColumnHandle(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle());
return metadata.getDeleteRowIdColumn(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle());
}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
public Optional<ColumnHandle> getUpdateRowIdColumn(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
ConnectorId connectorId = tableHandle.getConnectorId();
ConnectorMetadata metadata = getMetadata(session, connectorId);
return metadata.getUpdateRowIdColumnHandle(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), updatedColumns);
return metadata.getUpdateRowIdColumn(session.toConnectorSession(connectorId), tableHandle.getConnectorHandle(), updatedColumns);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2929,8 +2929,10 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl
public PhysicalOperation visitDelete(DeleteNode node, LocalExecutionPlanContext context)
{
PhysicalOperation source = node.getSource().accept(this, context);

OperatorFactory operatorFactory = new DeleteOperatorFactory(context.getNextOperatorId(), node.getId(), source.getLayout().get(node.getRowId()), tableCommitContextCodec);
if (!node.getRowId().isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "DELETE is not supported by this connector");
}
OperatorFactory operatorFactory = new DeleteOperatorFactory(context.getNextOperatorId(), node.getId(), source.getLayout().get(node.getRowId().get()), tableCommitContextCodec);

Map<VariableReferenceExpression, Integer> layout = ImmutableMap.<VariableReferenceExpression, Integer>builder()
.put(node.getOutputVariables().get(0), 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ public DeleteNode plan(Delete node)
{
RelationType descriptor = analysis.getOutputDescriptor(node.getTable());
TableHandle handle = analysis.getTableHandle(node.getTable());
ColumnHandle rowIdHandle = metadata.getDeleteRowIdColumnHandle(session, handle);
Type rowIdType = metadata.getColumnMetadata(session, handle, rowIdHandle).getType();

// add table columns
ImmutableList.Builder<VariableReferenceExpression> outputVariablesBuilder = ImmutableList.builder();
Expand All @@ -268,11 +266,16 @@ public DeleteNode plan(Delete node)
}

// add rowId column
Field rowIdField = Field.newUnqualified(node.getLocation(), Optional.empty(), rowIdType);
VariableReferenceExpression rowIdVariable = variableAllocator.newVariable(getSourceLocation(node), "$rowId", rowIdField.getType());
outputVariablesBuilder.add(rowIdVariable);
columns.put(rowIdVariable, rowIdHandle);
fields.add(rowIdField);
Optional<ColumnHandle> rowIdHandle = metadata.getDeleteRowIdColumn(session, handle);
Optional<Field> rowIdField = Optional.empty();
if (rowIdHandle.isPresent()) {
Type rowIdType = metadata.getColumnMetadata(session, handle, rowIdHandle.get()).getType();
rowIdField = Optional.of(Field.newUnqualified(node.getLocation(), Optional.empty(), rowIdType));
VariableReferenceExpression rowIdVariable = variableAllocator.newVariable(getSourceLocation(node), "$rowId", rowIdType);
outputVariablesBuilder.add(rowIdVariable);
columns.put(rowIdVariable, rowIdHandle.get());
fields.add(rowIdField.get());
}

// create table scan
List<VariableReferenceExpression> outputVariables = outputVariablesBuilder.build();
Expand All @@ -290,12 +293,14 @@ public DeleteNode plan(Delete node)
}

// create delete node
VariableReferenceExpression rowId = new VariableReferenceExpression(Optional.empty(), builder.translate(new FieldReference(relationPlan.getDescriptor().indexOf(rowIdField))).getName(), rowIdField.getType());
PlanBuilder finalBuilder = builder;
Optional<VariableReferenceExpression> rowId = rowIdField.map(f ->
new VariableReferenceExpression(Optional.empty(), finalBuilder.translate(new FieldReference(relationPlan.getDescriptor().indexOf(f))).getName(), f.getType()));
List<VariableReferenceExpression> deleteNodeOutputVariables = ImmutableList.of(
variableAllocator.newVariable("partialrows", BIGINT),
variableAllocator.newVariable("fragment", VARBINARY));

return new DeleteNode(getSourceLocation(node), idAllocator.getNextId(), builder.getRoot(), rowId, deleteNodeOutputVariables, Optional.empty());
return new DeleteNode(getSourceLocation(node), idAllocator.getNextId(), finalBuilder.getRoot(), rowId, deleteNodeOutputVariables, Optional.empty());
}

public UpdateNode plan(Update node)
Expand All @@ -312,8 +317,6 @@ public UpdateNode plan(Update node)
.map(Map.Entry::getValue)
.collect(toImmutableList());
handle = metadata.beginUpdate(session, handle, updatedColumns);
ColumnHandle rowIdHandle = metadata.getUpdateRowIdColumnHandle(session, handle, updatedColumns);
Type rowIdType = metadata.getColumnMetadata(session, handle, rowIdHandle).getType();

List<String> targetColumnNames = node.getAssignments().stream()
.map(assignment -> assignment.getName().getValue())
Expand All @@ -338,11 +341,16 @@ public UpdateNode plan(Update node)
List<Expression> orderedColumnValues = orderedColumnValuesBuilder.build();

// add rowId column
Field rowIdField = Field.newUnqualified(node.getLocation(), Optional.empty(), rowIdType);
VariableReferenceExpression rowIdVariable = variableAllocator.newVariable(getSourceLocation(node), "$rowId", rowIdField.getType());
outputVariablesBuilder.add(rowIdVariable);
columns.put(rowIdVariable, rowIdHandle);
fields.add(rowIdField);
Optional<ColumnHandle> rowIdHandle = metadata.getUpdateRowIdColumn(session, handle, updatedColumns);
Optional<Field> rowIdField = Optional.empty();
if (rowIdHandle.isPresent()) {
Type rowIdType = metadata.getColumnMetadata(session, handle, rowIdHandle.get()).getType();
rowIdField = Optional.of(Field.newUnqualified(node.getLocation(), Optional.empty(), rowIdType));
VariableReferenceExpression rowIdVariable = variableAllocator.newVariable(getSourceLocation(node), "$rowId", rowIdType);
outputVariablesBuilder.add(rowIdVariable);
columns.put(rowIdVariable, rowIdHandle.get());
fields.add(rowIdField.get());
}

// create table scan
List<VariableReferenceExpression> outputVariables = outputVariablesBuilder.build();
Expand All @@ -365,8 +373,10 @@ public UpdateNode plan(Update node)

ImmutableList.Builder<VariableReferenceExpression> updatedColumnValuesBuilder = ImmutableList.builder();
orderedColumnValues.forEach(columnValue -> updatedColumnValuesBuilder.add(planAndMappings.get(columnValue)));
VariableReferenceExpression rowId = new VariableReferenceExpression(Optional.empty(), builder.translate(new FieldReference(relationPlan.getDescriptor().indexOf(rowIdField))).getName(), rowIdField.getType());
updatedColumnValuesBuilder.add(rowId);
PlanBuilder finalBuilder = builder;
Optional<VariableReferenceExpression> rowId = rowIdField.map(f ->
new VariableReferenceExpression(Optional.empty(), finalBuilder.translate(new FieldReference(relationPlan.getDescriptor().indexOf(f))).getName(), f.getType()));
rowId.ifPresent(r -> updatedColumnValuesBuilder.add(r));

List<VariableReferenceExpression> outputs = ImmutableList.of(
variableAllocator.newVariable("partialrows", BIGINT),
Expand All @@ -379,7 +389,7 @@ public UpdateNode plan(Update node)
return new UpdateNode(
getSourceLocation(node),
idAllocator.getNextId(),
builder.getRoot(),
finalBuilder.getRoot(),
rowId,
updatedColumnValuesBuilder.build(),
outputs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<Set<Variab
public PlanNode visitDelete(DeleteNode node, RewriteContext<Set<VariableReferenceExpression>> context)
{
ImmutableSet.Builder<VariableReferenceExpression> builder = ImmutableSet.builder();
builder.add(node.getRowId());
node.getRowId().ifPresent(r -> builder.add(r));
if (node.getInputDistribution().isPresent()) {
builder.addAll(node.getInputDistribution().get().getInputVariables());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public PlanNode visitDelete(DeleteNode node, RewriteContext<Context> context)
if (node.getInputDistribution().isPresent()) {
context.get().variables.addAll(node.getInputDistribution().get().getInputVariables());
}
context.get().variables.add(node.getRowId());
node.getRowId().ifPresent(r -> context.get().variables.add(r));
return context.defaultRewrite(node, context.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class UpdateNode
extends InternalPlanNode
{
private final PlanNode source;
private final VariableReferenceExpression rowId;
private final Optional<VariableReferenceExpression> rowId;
private final List<VariableReferenceExpression> columnValueAndRowIdSymbols;
private final List<VariableReferenceExpression> outputVariables;

Expand All @@ -43,7 +43,7 @@ public UpdateNode(
Optional<SourceLocation> sourceLocation,
@JsonProperty("id") PlanNodeId id,
@JsonProperty("source") PlanNode source,
@JsonProperty("rowId") VariableReferenceExpression rowId,
@JsonProperty("rowId") Optional<VariableReferenceExpression> rowId,
@JsonProperty("columnValueAndRowIdSymbols") List<VariableReferenceExpression> columnValueAndRowIdSymbols,
@JsonProperty("outputVariables") List<VariableReferenceExpression> outputVariables)
{
Expand All @@ -55,7 +55,7 @@ public UpdateNode(
PlanNodeId id,
Optional<PlanNode> statsEquivalentPlanNode,
PlanNode source,
VariableReferenceExpression rowId,
Optional<VariableReferenceExpression> rowId,
List<VariableReferenceExpression> columnValueAndRowIdSymbols,
List<VariableReferenceExpression> outputVariables)
{
Expand All @@ -74,7 +74,7 @@ public PlanNode getSource()
}

@JsonProperty
public VariableReferenceExpression getRowId()
public Optional<VariableReferenceExpression> getRowId()
{
return rowId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,9 @@ public Void visitDelete(DeleteNode node, Set<VariableReferenceExpression> boundV
PlanNode source = node.getSource();
source.accept(this, boundVariables); // visit child

checkArgument(source.getOutputVariables().contains(node.getRowId()), "Invalid node. Row ID symbol (%s) is not in source plan output (%s)", node.getRowId(), node.getSource().getOutputVariables());
node.getRowId().ifPresent(rowid ->
checkArgument(source.getOutputVariables().contains(rowid),
"Invalid node. Row ID symbol (%s) is not in source plan output (%s)", rowid, node.getSource().getOutputVariables()));

return null;
}
Expand All @@ -624,7 +626,8 @@ public Void visitUpdate(UpdateNode node, Set<VariableReferenceExpression> boundV
{
PlanNode source = node.getSource();
source.accept(this, boundVariables); // visit child
checkArgument(source.getOutputVariables().contains(node.getRowId()), "Invalid node. Row ID symbol (%s) is not in source plan output (%s)", node.getRowId(), node.getSource().getOutputVariables());
node.getRowId().ifPresent(r ->
checkArgument(source.getOutputVariables().contains(r), "Invalid node. Row ID symbol (%s) is not in source plan output (%s)", node.getRowId(), node.getSource().getOutputVariables()));
checkArgument(source.getOutputVariables().containsAll(node.getColumnValueAndRowIdSymbols()), "Invalid node. Some UPDATE SET expression symbols (%s) are not contained in the outputSymbols (%s)", node.getColumnValueAndRowIdSymbols(), source.getOutputVariables());

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,13 @@ public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTab
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(Session session, TableHandle tableHandle)
public Optional<ColumnHandle> getDeleteRowIdColumn(Session session, TableHandle tableHandle)
{
throw new UnsupportedOperationException();
}

@Override
public ColumnHandle getUpdateRowIdColumnHandle(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
public Optional<ColumnHandle> getUpdateRowIdColumn(Session session, TableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ public TableFinishNode tableDelete(SchemaTableName schemaTableName, PlanNode del
deleteSource.getSourceLocation(),
idAllocator.getNextId(),
deleteSource,
deleteRowId,
Optional.of(deleteRowId),
ImmutableList.of(deleteRowId),
Optional.empty()))
.addInputsSet(deleteRowId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ void from_json(const json& j, DeleteHandle& p);
namespace facebook::presto::protocol {
struct DeleteNode : public PlanNode {
std::shared_ptr<PlanNode> source = {};
VariableReferenceExpression rowId = {};
std::shared_ptr<VariableReferenceExpression> rowId = {};
List<VariableReferenceExpression> outputVariables = {};
std::shared_ptr<InputDistribution> inputDistribution = {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ TEST_F(DeleteTest, jsonRoundtrip) {
ASSERT_EQ(d.outputVariables[0].name, "$row_group_id");
ASSERT_EQ(d.outputVariables[1].name, "$row_number");

ASSERT_EQ(d.rowId.name, "$rowid");
ASSERT_NE(d.rowId, nullptr);
ASSERT_EQ(d.rowId->name, "$rowid");

testJsonRoundtrip(j, d);
}
Loading
Loading