Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.NewTableLayout;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.analyzer.AccessControlInfo;
import com.facebook.presto.spi.analyzer.AccessControlInfoForTable;
Expand All @@ -32,12 +33,14 @@
import com.facebook.presto.spi.function.FunctionKind;
import com.facebook.presto.spi.function.table.Argument;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.spi.security.AccessControlContext;
import com.facebook.presto.spi.security.AllowAllAccessControl;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.sql.tree.ExistsPredicate;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.FieldReference;
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.sql.tree.GroupingOperation;
import com.facebook.presto.sql.tree.Identifier;
Expand Down Expand Up @@ -187,6 +190,7 @@ public class Analysis
private Optional<TableHandle> analyzeTarget = Optional.empty();

private Optional<List<ColumnMetadata>> updatedColumns = Optional.empty();
private Optional<MergeAnalysis> mergeAnalysis = Optional.empty();

// for describe input and describe output
private final boolean isDescribe;
Expand Down Expand Up @@ -221,6 +225,9 @@ public class Analysis
private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>();
private final Set<NodeRef<TableFunctionInvocation>> polymorphicTableFunctions = new LinkedHashSet<>();

// Row id field used for MERGE INTO command.
private final Map<NodeRef<Table>, FieldReference> rowIdField = new LinkedHashMap<>();

public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, boolean isDescribe)
{
this.root = root;
Expand Down Expand Up @@ -433,6 +440,16 @@ public Expression getJoinCriteria(Join join)
return joins.get(NodeRef.of(join));
}

/**
 * Records the synthetic row id field reference used to locate target rows
 * of {@code table} for a MERGE INTO command.
 */
public void setRowIdField(Table table, FieldReference field)
{
    rowIdField.put(NodeRef.of(table), field);
}

/**
 * Returns the row id field reference previously registered for {@code table}
 * via {@link #setRowIdField}, or {@code null} if none was recorded.
 */
public FieldReference getRowIdField(Table table)
{
    return rowIdField.get(NodeRef.of(table));
}

public void recordSubqueries(Node node, ExpressionAnalysis expressionAnalysis)
{
NodeRef<Node> key = NodeRef.of(node);
Expand Down Expand Up @@ -726,6 +743,16 @@ public Optional<List<ColumnMetadata>> getUpdatedColumns()
return updatedColumns;
}

/**
 * Returns the analysis of the MERGE statement, or {@code Optional.empty()}
 * when the analyzed statement is not a MERGE.
 */
public Optional<MergeAnalysis> getMergeAnalysis()
{
    return mergeAnalysis;
}

/**
 * Records the analysis of a MERGE statement.
 *
 * @throws NullPointerException if {@code mergeAnalysis} is null
 */
public void setMergeAnalysis(MergeAnalysis mergeAnalysis)
{
    // requireNonNull gives a descriptive message, matching the convention used
    // elsewhere in this class; Optional.of alone would throw a bare NPE
    this.mergeAnalysis = Optional.of(requireNonNull(mergeAnalysis, "mergeAnalysis is null"));
}

public void setRefreshMaterializedViewAnalysis(RefreshMaterializedViewAnalysis refreshMaterializedViewAnalysis)
{
this.refreshMaterializedViewAnalysis = Optional.of(refreshMaterializedViewAnalysis);
Expand Down Expand Up @@ -1694,4 +1721,108 @@ public ConnectorTransactionHandle getTransactionHandle()
return transactionHandle;
}
}

/**
 * Captures the results of analyzing a MERGE statement: the target table with
 * its column metadata and handles, the per-merge-case column handles, layout
 * information for the insert and update paths, and the scopes used to resolve
 * references against the target table and the source join.
 */
public static class MergeAnalysis
{
    private final Table targetTable;
    private final List<ColumnMetadata> targetColumnsMetadata;
    private final List<ColumnHandle> targetColumnHandles;
    private final List<ColumnHandle> targetRedistributionColumnHandles;
    private final List<List<ColumnHandle>> mergeCaseColumnHandles;
    private final Set<ColumnHandle> nonNullableColumnHandles;
    private final Map<ColumnHandle, Integer> columnHandleFieldNumbers;
    private final List<Integer> insertPartitioningArgumentIndexes;
    private final Optional<NewTableLayout> insertLayout;
    private final Optional<PartitioningHandle> updateLayout;
    private final Scope targetTableScope;
    private final Scope joinScope;

    public MergeAnalysis(
            Table targetTable,
            List<ColumnMetadata> targetColumnsMetadata,
            List<ColumnHandle> targetColumnHandles,
            List<ColumnHandle> targetRedistributionColumnHandles,
            List<List<ColumnHandle>> mergeCaseColumnHandles,
            Set<ColumnHandle> nonNullableTargetColumnHandles,
            Map<ColumnHandle, Integer> targetColumnHandleFieldNumbers,
            List<Integer> insertPartitioningArgumentIndexes,
            Optional<NewTableLayout> insertLayout,
            Optional<PartitioningHandle> updateLayout,
            Scope targetTableScope,
            Scope joinScope)
    {
        // Assignments follow field declaration order for readability
        this.targetTable = requireNonNull(targetTable, "targetTable is null");
        this.targetColumnsMetadata = requireNonNull(targetColumnsMetadata, "targetColumnsMetadata is null");
        this.targetColumnHandles = requireNonNull(targetColumnHandles, "targetColumnHandles is null");
        this.targetRedistributionColumnHandles = requireNonNull(targetRedistributionColumnHandles, "targetRedistributionColumnHandles is null");
        this.mergeCaseColumnHandles = requireNonNull(mergeCaseColumnHandles, "mergeCaseColumnHandles is null");
        this.nonNullableColumnHandles = requireNonNull(nonNullableTargetColumnHandles, "nonNullableTargetColumnHandles is null");
        this.columnHandleFieldNumbers = requireNonNull(targetColumnHandleFieldNumbers, "targetColumnHandleFieldNumbers is null");
        this.insertPartitioningArgumentIndexes = requireNonNull(insertPartitioningArgumentIndexes, "insertPartitioningArgumentIndexes is null");
        this.insertLayout = requireNonNull(insertLayout, "insertLayout is null");
        this.updateLayout = requireNonNull(updateLayout, "updateLayout is null");
        this.targetTableScope = requireNonNull(targetTableScope, "targetTableScope is null");
        this.joinScope = requireNonNull(joinScope, "joinScope is null");
    }

    public Table getTargetTable()
    {
        return targetTable;
    }

    public List<ColumnMetadata> getTargetColumnsMetadata()
    {
        return targetColumnsMetadata;
    }

    public List<ColumnHandle> getTargetColumnHandles()
    {
        return targetColumnHandles;
    }

    public List<ColumnHandle> getTargetRedistributionColumnHandles()
    {
        return targetRedistributionColumnHandles;
    }

    /**
     * Column handles set by each merge case (one list per WHEN clause).
     */
    public List<List<ColumnHandle>> getMergeCaseColumnHandles()
    {
        return mergeCaseColumnHandles;
    }

    public Set<ColumnHandle> getNonNullableColumnHandles()
    {
        return nonNullableColumnHandles;
    }

    /**
     * Maps each target column handle to its field number in the target table scope.
     */
    public Map<ColumnHandle, Integer> getColumnHandleFieldNumbers()
    {
        return columnHandleFieldNumbers;
    }

    public List<Integer> getInsertPartitioningArgumentIndexes()
    {
        return insertPartitioningArgumentIndexes;
    }

    public Optional<NewTableLayout> getInsertLayout()
    {
        return insertLayout;
    }

    public Optional<PartitioningHandle> getUpdateLayout()
    {
        return updateLayout;
    }

    public Scope getJoinScope()
    {
        return joinScope;
    }

    public Scope getTargetTableScope()
    {
        return targetTableScope;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;

import java.util.List;
import java.util.Optional;
import java.util.function.ToIntFunction;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
Expand All @@ -43,10 +44,10 @@ public BlackHoleNodePartitioningProvider(NodeManager nodeManager)
}

@Override
public ConnectorBucketNodeMap getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Node> sortedNodes)
public Optional<ConnectorBucketNodeMap> getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Node> sortedNodes)
{
// create one bucket per node
return createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size());
return Optional.of(createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,17 @@ public Page getLoadedPage(int... channels)
return wrapBlocksWithoutCopy(positionCount, blocks);
}

/**
 * Returns a new page containing only the requested columns of this page, in
 * the order given. The underlying blocks are shared with this page, not copied.
 *
 * @param columns indexes of the columns to select
 */
public Page getColumns(int... columns)
{
    requireNonNull(columns, "columns is null");

    Block[] selected = new Block[columns.length];
    for (int i = 0; i < selected.length; i++) {
        selected[i] = this.blocks[columns[i]];
    }
    return wrapBlocksWithoutCopy(positionCount, selected);
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,4 +400,20 @@ default long toLong(int position)
{
throw new UnsupportedOperationException(getClass().getName());
}

/**
 * Returns the innermost value block that physically stores this block's data.
 * A plain value block returns itself; wrapper blocks override this to unwrap
 * to the wrapped storage block.
 */
default Block getUnderlyingValueBlock()
{
    return this;
}

/**
 * Translates a position in this block to the corresponding position in the
 * block returned by {@link #getUnderlyingValueBlock()}. For a plain value
 * block this is the identity mapping.
 */
default int getUnderlyingValuePosition(int position)
{
    return position;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,27 @@ static Block[] ensureBlocksAreLoaded(Block[] blocks)
// No newly loaded blocks
return blocks;
}

/**
 * Copies an is-null flag array and extends it by one trailing {@code true}
 * entry, for appending a null position to a block.
 *
 * @param isNull existing flags, or {@code null} if the source block tracked no nulls
 * @param offsetBase offset of the block's first position within {@code isNull}
 * @param positionCount number of positions in the source block
 * @return a new array of length {@code offsetBase + positionCount + 1} whose
 *         last entry is {@code true}
 */
static boolean[] copyIsNullAndAppendNull(@Nullable boolean[] isNull, int offsetBase, int positionCount)
{
    int newLength = offsetBase + positionCount + 1;
    boolean[] result = new boolean[newLength];
    if (isNull != null) {
        checkArrayRange(isNull, offsetBase, positionCount);
        // copy everything up to (but not including) the new trailing slot
        System.arraycopy(isNull, 0, result, 0, newLength - 1);
    }
    // the appended position is null
    result[newLength - 1] = true;
    return result;
}

/**
 * Copies an offsets array and appends one entry for a null element. The
 * appended offset repeats the previous last offset, since a null element
 * occupies no space.
 *
 * @param offsets existing offsets ({@code positionCount + 1} valid entries from {@code offsetBase})
 * @param offsetBase offset of the block's first position within {@code offsets}
 * @param positionCount number of positions in the source block
 */
static int[] copyOffsetsAndAppendNull(int[] offsets, int offsetBase, int positionCount)
{
    checkArrayRange(offsets, offsetBase, positionCount + 1);
    int newLength = offsetBase + positionCount + 2;
    int[] result = Arrays.copyOf(offsets, newLength);
    // Null element does not move the offset forward
    result[newLength - 1] = result[newLength - 2];
    return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,45 @@ public Block getLoadedBlock()
return new DictionaryBlock(idsOffset, getPositionCount(), loadedDictionary, ids, false, randomDictionaryId());
}

@Override
public Block getUnderlyingValueBlock()
{
    // delegate so nested dictionaries unwrap to the innermost storage block
    return dictionary.getUnderlyingValueBlock();
}

@Override
public int getUnderlyingValuePosition(int position)
{
    // map through this dictionary's ids first, then through any nested wrappers
    return dictionary.getUnderlyingValuePosition(getId(position));
}

/**
 * Creates a block that applies this dictionary block's id mapping on top of
 * {@code newDictionary}, which must have the same position count as the current
 * dictionary (i.e. it is a column-wise projection of the current dictionary).
 */
public Block createProjection(Block newDictionary)
{
    if (newDictionary.getPositionCount() != dictionary.getPositionCount()) {
        throw new IllegalArgumentException("newDictionary must have the same position count");
    }

    // if the new dictionary is lazy be careful to not materialize it:
    // defer the projection until the returned lazy block is loaded
    if (newDictionary instanceof LazyBlock) {
        return new LazyBlock(positionCount, (block) -> {
            Block newDictionaryBlock = newDictionary.getBlock(0);
            Block newBlock = createProjection(newDictionaryBlock);
            block.setBlock(newBlock);
        });
    }
    // an RLE dictionary means every id maps to the same value, so the result
    // is simply that value repeated for this block's position count
    if (newDictionary instanceof RunLengthEncodedBlock) {
        RunLengthEncodedBlock rle = (RunLengthEncodedBlock) newDictionary;
        return new RunLengthEncodedBlock(rle.getValue(), positionCount);
    }

    // unwrap dictionary in dictionary: remap this block's ids through the new
    // dictionary's own id translation so the result references value storage directly
    int[] newIds = new int[positionCount];
    for (int position = 0; position < positionCount; position++) {
        newIds[position] = newDictionary.getUnderlyingValuePosition(getIdUnchecked(position));
    }
    return new DictionaryBlock(0, positionCount, newDictionary.getUnderlyingValueBlock(), newIds, false, randomDictionaryId());
}

public Block getDictionary()
{
return dictionary;
Expand All @@ -533,6 +572,11 @@ public int getId(int position)
return ids[position + idsOffset];
}

// Internal unchecked accessor for the dictionary id at a position.
// NOTE(review): assumes the caller has already validated {@code position};
// confirm that the public getId path performs bounds checking.
private int getIdUnchecked(int position)
{
    return ids[position + idsOffset];
}

public DictionaryId getDictionarySourceId()
{
return dictionarySourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,18 @@ public boolean isNullUnchecked(int internalPosition)
return block.isNull(internalPosition);
}

@Override
public Block getUnderlyingValueBlock()
{
    // forward to the wrapped block so unwrapping skips this wrapper entirely
    return block.getUnderlyingValueBlock();
}

@Override
public int getUnderlyingValuePosition(int position)
{
    // this wrapper does not remap positions, so delegate directly
    return block.getUnderlyingValuePosition(position);
}

@Override
public Block appendNull()
{
Expand Down
Loading
Loading