Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.MaterializedViewDefinition;
import com.facebook.presto.spi.NewTableLayout;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.analyzer.AccessControlInfo;
import com.facebook.presto.spi.analyzer.AccessControlInfoForTable;
Expand All @@ -34,13 +35,15 @@
import com.facebook.presto.spi.function.FunctionKind;
import com.facebook.presto.spi.function.table.Argument;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.spi.security.AccessControlContext;
import com.facebook.presto.spi.security.AllowAllAccessControl;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.sql.tree.ExistsPredicate;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.FieldReference;
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.sql.tree.GroupingOperation;
import com.facebook.presto.sql.tree.Identifier;
Expand Down Expand Up @@ -197,6 +200,7 @@ public class Analysis
private Optional<TableHandle> analyzeTarget = Optional.empty();

private Optional<List<ColumnMetadata>> updatedColumns = Optional.empty();
private Optional<MergeAnalysis> mergeAnalysis = Optional.empty();

// for describe input and describe output
private final boolean isDescribe;
Expand Down Expand Up @@ -233,6 +237,9 @@ public class Analysis
private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>();
private final Set<NodeRef<TableFunctionInvocation>> polymorphicTableFunctions = new LinkedHashSet<>();

// Row id field used for MERGE INTO command.
private final Map<NodeRef<Table>, FieldReference> rowIdField = new LinkedHashMap<>();

public Analysis(@Nullable Statement root, Map<NodeRef<Parameter>, Expression> parameters, boolean isDescribe)
{
this.root = root;
Expand Down Expand Up @@ -445,6 +452,16 @@ public Expression getJoinCriteria(Join join)
return joins.get(NodeRef.of(join));
}

public void setRowIdField(Table table, FieldReference field)
{
rowIdField.put(NodeRef.of(table), field);
}

public FieldReference getRowIdField(Table table)
{
return rowIdField.get(NodeRef.of(table));
}

public void recordSubqueries(Node node, ExpressionAnalysis expressionAnalysis)
{
NodeRef<Node> key = NodeRef.of(node);
Expand Down Expand Up @@ -778,6 +795,16 @@ public Optional<List<ColumnMetadata>> getUpdatedColumns()
return updatedColumns;
}

public Optional<MergeAnalysis> getMergeAnalysis()
{
return mergeAnalysis;
}

public void setMergeAnalysis(MergeAnalysis mergeAnalysis)
{
this.mergeAnalysis = Optional.of(mergeAnalysis);
}

public void setRefreshMaterializedViewAnalysis(RefreshMaterializedViewAnalysis refreshMaterializedViewAnalysis)
{
this.refreshMaterializedViewAnalysis = Optional.of(refreshMaterializedViewAnalysis);
Expand Down Expand Up @@ -1817,4 +1844,108 @@ public ConnectorTransactionHandle getTransactionHandle()
return transactionHandle;
}
}

public static class MergeAnalysis
{
private final Table targetTable;
private final List<ColumnMetadata> targetColumnsMetadata;
private final List<ColumnHandle> targetColumnHandles;
private final List<ColumnHandle> targetRedistributionColumnHandles;
private final List<List<ColumnHandle>> mergeCaseColumnHandles;
private final Set<ColumnHandle> nonNullableColumnHandles;
private final Map<ColumnHandle, Integer> columnHandleFieldNumbers;
private final List<Integer> insertPartitioningArgumentIndexes;
private final Optional<NewTableLayout> insertLayout;
private final Optional<PartitioningHandle> updateLayout;
private final Scope targetTableScope;
private final Scope joinScope;

public MergeAnalysis(
Table targetTable,
List<ColumnMetadata> targetColumnsMetadata,
List<ColumnHandle> targetColumnHandles,
List<ColumnHandle> targetRedistributionColumnHandles,
List<List<ColumnHandle>> mergeCaseColumnHandles,
Set<ColumnHandle> nonNullableTargetColumnHandles,
Map<ColumnHandle, Integer> targetColumnHandleFieldNumbers,
List<Integer> insertPartitioningArgumentIndexes,
Optional<NewTableLayout> insertLayout,
Optional<PartitioningHandle> updateLayout,
Scope targetTableScope,
Scope joinScope)
{
this.targetTable = requireNonNull(targetTable, "targetTable is null");
this.targetColumnsMetadata = requireNonNull(targetColumnsMetadata, "targetColumnsMetadata is null");
this.targetColumnHandles = requireNonNull(targetColumnHandles, "targetColumnHandles is null");
this.targetRedistributionColumnHandles = requireNonNull(targetRedistributionColumnHandles, "targetRedistributionColumnHandles is null");
this.mergeCaseColumnHandles = requireNonNull(mergeCaseColumnHandles, "mergeCaseColumnHandles is null");
this.nonNullableColumnHandles = requireNonNull(nonNullableTargetColumnHandles, "nonNullableTargetColumnHandles is null");
this.columnHandleFieldNumbers = requireNonNull(targetColumnHandleFieldNumbers, "targetColumnHandleFieldNumbers is null");
this.insertLayout = requireNonNull(insertLayout, "insertLayout is null");
this.updateLayout = requireNonNull(updateLayout, "updateLayout is null");
this.insertPartitioningArgumentIndexes = (requireNonNull(insertPartitioningArgumentIndexes, "insertPartitioningArgumentIndexes is null"));
this.targetTableScope = requireNonNull(targetTableScope, "targetTableScope is null");
this.joinScope = requireNonNull(joinScope, "joinScope is null");
}

public Table getTargetTable()
{
return targetTable;
}

public List<ColumnMetadata> getTargetColumnsMetadata()
{
return targetColumnsMetadata;
}

public List<ColumnHandle> getTargetColumnHandles()
{
return targetColumnHandles;
}

public List<ColumnHandle> getTargetRedistributionColumnHandles()
{
return targetRedistributionColumnHandles;
}

public List<List<ColumnHandle>> getMergeCaseColumnHandles()
{
return mergeCaseColumnHandles;
}

public Set<ColumnHandle> getNonNullableColumnHandles()
{
return nonNullableColumnHandles;
}

public Map<ColumnHandle, Integer> getColumnHandleFieldNumbers()
{
return columnHandleFieldNumbers;
}

public List<Integer> getInsertPartitioningArgumentIndexes()
{
return insertPartitioningArgumentIndexes;
}

public Optional<NewTableLayout> getInsertLayout()
{
return insertLayout;
}

public Optional<PartitioningHandle> getUpdateLayout()
{
return updateLayout;
}

public Scope getJoinScope()
{
return joinScope;
}

public Scope getTargetTableScope()
{
return targetTableScope;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;

import java.util.List;
import java.util.Optional;
import java.util.function.ToIntFunction;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
Expand All @@ -43,10 +44,10 @@ public BlackHoleNodePartitioningProvider(NodeManager nodeManager)
}

@Override
public ConnectorBucketNodeMap getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Node> sortedNodes)
public Optional<ConnectorBucketNodeMap> getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle, List<Node> sortedNodes)
{
// create one bucket per node
return createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size());
return Optional.of(createBucketNodeMap(nodeManager.getRequiredWorkerNodes().size()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,27 @@ static Block[] ensureBlocksAreLoaded(Block[] blocks)
// No newly loaded blocks
return blocks;
}

static boolean[] copyIsNullAndAppendNull(@Nullable boolean[] isNull, int offsetBase, int positionCount)
{
int desiredLength = offsetBase + positionCount + 1;
boolean[] newIsNull = new boolean[desiredLength];
if (isNull != null) {
checkArrayRange(isNull, offsetBase, positionCount);
System.arraycopy(isNull, 0, newIsNull, 0, desiredLength - 1);
}
// mark the last element to append null
newIsNull[desiredLength - 1] = true;
return newIsNull;
}

static int[] copyOffsetsAndAppendNull(int[] offsets, int offsetBase, int positionCount)
{
int desiredLength = offsetBase + positionCount + 2;
checkArrayRange(offsets, offsetBase, positionCount + 1);
int[] newOffsets = Arrays.copyOf(offsets, desiredLength);
// Null element does not move the offset forward
newOffsets[desiredLength - 1] = newOffsets[desiredLength - 2];
return newOffsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,33 @@ public Block getLoadedBlock()
return new DictionaryBlock(idsOffset, getPositionCount(), loadedDictionary, ids, false, randomDictionaryId());
}

public Block createProjection(Block newDictionary)
{
if (newDictionary.getPositionCount() != dictionary.getPositionCount()) {
throw new IllegalArgumentException("newDictionary must have the same position count");
}

// if the new dictionary is lazy be careful to not materialize it
if (newDictionary instanceof LazyBlock) {
return new LazyBlock(positionCount, (block) -> {
Block newDictionaryBlock = newDictionary.getBlock(0);
Block newBlock = createProjection(newDictionaryBlock);
block.setBlock(newBlock);
});
}
if (newDictionary instanceof RunLengthEncodedBlock) {
RunLengthEncodedBlock rle = (RunLengthEncodedBlock) newDictionary;
return new RunLengthEncodedBlock(rle.getValue(), positionCount);
}

// unwrap dictionary in dictionary
int[] newIds = new int[positionCount];
for (int position = 0; position < positionCount; position++) {
newIds[position] = getIdUnchecked(position);
}
return new DictionaryBlock(0, positionCount, newDictionary, newIds, false, randomDictionaryId());
}

public Block getDictionary()
{
return dictionary;
Expand All @@ -533,6 +560,11 @@ public int getId(int position)
return ids[position + idsOffset];
}

private int getIdUnchecked(int position)
{
return ids[position + idsOffset];
}

public DictionaryId getDictionarySourceId()
{
return dictionarySourceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.ObjLongConsumer;
import java.util.stream.Collectors;

import static com.facebook.presto.common.block.BlockUtil.ensureBlocksAreLoaded;
import static io.airlift.slice.SizeOf.sizeOf;
Expand Down Expand Up @@ -248,6 +250,39 @@ public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
consumer.accept(this, INSTANCE_SIZE);
}

/**
* Returns the row fields from the specified block. The block maybe a LazyBlock, RunLengthEncodedBlock, or
* DictionaryBlock, but the underlying block must be a RowBlock. The returned field blocks will be the same
* length as the specified block, which means they are not null suppressed.
*/
public static List<Block> getRowFieldsFromBlock(Block block)
{
// if the block is lazy, be careful to not materialize the nested blocks
if (block instanceof LazyBlock) {
LazyBlock lazyBlock = (LazyBlock) block;
block = lazyBlock.getBlock(0);
}

if (block instanceof RunLengthEncodedBlock) {
RunLengthEncodedBlock runLengthEncodedBlock = (RunLengthEncodedBlock) block;
RowBlock rowBlock = (RowBlock) runLengthEncodedBlock.getValue();
return Arrays.stream(rowBlock.fieldBlocks)
.map(fieldBlock -> new RunLengthEncodedBlock(fieldBlock, runLengthEncodedBlock.getPositionCount()))
.collect(Collectors.toList());
}
if (block instanceof DictionaryBlock) {
DictionaryBlock dictionaryBlock = (DictionaryBlock) block;
RowBlock rowBlock = (RowBlock) dictionaryBlock.getDictionary();
return Arrays.stream(rowBlock.fieldBlocks)
.map(dictionaryBlock::createProjection)
.collect(Collectors.toList());
}
if (block instanceof RowBlock) {
return Arrays.asList(((RowBlock) block).fieldBlocks);
}
throw new IllegalArgumentException("Unexpected block type: " + block.getClass().getSimpleName());
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public void writeBoolean(BlockBuilder blockBuilder, boolean value)
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public byte getByte(Block block, int position)
{
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public long getLong(Block block, int position)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public void writeBoolean(BlockBuilder blockBuilder, boolean value)
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public byte getByte(Block block, int position)
{
throw new UnsupportedOperationException(getClass().getName());
}

@Override
public long getLong(Block block, int position)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public long getLong(Block block, int position)
return (long) block.getByte(position);
}

@Override
public byte getByte(Block block, int position)
{
return block.getByte(position);
}

@Override
public long getLongUnchecked(UncheckedBlock block, int internalPosition)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ default boolean equalValuesAreIdentical()
*/
boolean getBooleanUnchecked(UncheckedBlock block, int internalPosition);

/**
* Gets the value at the {@code block} {@code position} as a byte.
*/
byte getByte(Block block, int position);

/**
* Gets the value at the {@code block} {@code position} as a long.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public boolean getBooleanUnchecked(UncheckedBlock block, int internalPosition)
return type.getBooleanUnchecked(block, internalPosition);
}

@Override
public byte getByte(Block block, int position)
{
return type.getByte(block, position);
}

@Override
public long getLong(Block block, int position)
{
Expand Down
Loading
Loading