Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.raptor.metadata.ColumnInfo;
import com.facebook.presto.raptor.metadata.DeltaInfoPair;
import com.facebook.presto.raptor.metadata.Distribution;
import com.facebook.presto.raptor.metadata.MetadataDao;
import com.facebook.presto.raptor.metadata.ShardDeleteDelta;
import com.facebook.presto.raptor.metadata.ShardDelta;
import com.facebook.presto.raptor.metadata.ShardInfo;
import com.facebook.presto.raptor.metadata.ShardManager;
Expand Down Expand Up @@ -134,6 +136,7 @@ public class RaptorMetadata

private static final JsonCodec<ShardInfo> SHARD_INFO_CODEC = jsonCodec(ShardInfo.class);
private static final JsonCodec<ShardDelta> SHARD_DELTA_CODEC = jsonCodec(ShardDelta.class);
private static final JsonCodec<ShardDeleteDelta> SHARD_DELETE_DELTA_CODEC = jsonCodec(ShardDeleteDelta.class);

private final IDBI dbi;
private final MetadataDao dao;
Expand Down Expand Up @@ -821,23 +824,35 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
.map(RaptorColumnHandle.class::cast)
.map(ColumnInfo::fromHandle).collect(toList());

ImmutableSet.Builder<UUID> oldShardUuidsBuilder = ImmutableSet.builder();
ImmutableList.Builder<ShardInfo> newShardsBuilder = ImmutableList.builder();
if (table.isTableSupportsDeltaDelete()) {
ImmutableMap.Builder<UUID, DeltaInfoPair> shardMapBuilder = ImmutableMap.builder();

fragments.stream()
.map(fragment -> SHARD_DELTA_CODEC.fromJson(fragment.getBytes()))
.forEach(delta -> {
oldShardUuidsBuilder.addAll(delta.getOldShardUuids());
newShardsBuilder.addAll(delta.getNewShards());
});
fragments.stream()
.map(fragment -> SHARD_DELETE_DELTA_CODEC.fromJson(fragment.getBytes()))
.forEach(delta -> shardMapBuilder.put(delta.getOldShardUuid(), delta.getDeltaInfoPair()));
OptionalLong updateTime = OptionalLong.of(session.getStartTime());

Set<UUID> oldShardUuids = oldShardUuidsBuilder.build();
List<ShardInfo> newShards = newShardsBuilder.build();
OptionalLong updateTime = OptionalLong.of(session.getStartTime());
log.info("Finishing delete for tableId %s (affected shardUuid: %s)", tableId, shardMapBuilder.build().size());
shardManager.replaceDeltaUuids(transactionId, tableId, columns, shardMapBuilder.build(), updateTime);
}
else {
ImmutableSet.Builder<UUID> oldShardUuidsBuilder = ImmutableSet.builder();
ImmutableList.Builder<ShardInfo> newShardsBuilder = ImmutableList.builder();

log.info("Finishing delete for tableId %s (removed: %s, rewritten: %s)", tableId, oldShardUuids.size() - newShards.size(), newShards.size());
shardManager.replaceShardUuids(transactionId, tableId, columns, oldShardUuids, newShards, updateTime);
fragments.stream()
.map(fragment -> SHARD_DELTA_CODEC.fromJson(fragment.getBytes()))
.forEach(delta -> {
oldShardUuidsBuilder.addAll(delta.getOldShardUuids());
newShardsBuilder.addAll(delta.getNewShards());
});

Set<UUID> oldShardUuids = oldShardUuidsBuilder.build();
List<ShardInfo> newShards = newShardsBuilder.build();
OptionalLong updateTime = OptionalLong.of(session.getStartTime());

log.info("Finishing delete for tableId %s (removed: %s, rewritten: %s)", tableId, oldShardUuids.size() - newShards.size(), newShards.size());
shardManager.replaceShardUuids(transactionId, tableId, columns, oldShardUuids, newShards, updateTime);
}
clearRollback();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,55 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
ReaderAttributes attributes = ReaderAttributes.from(session);
OptionalLong transactionId = raptorSplit.getTransactionId();
Optional<Map<String, Type>> columnTypes = raptorSplit.getColumnTypes();
boolean tableSupportsDeltaDelete = raptorSplit.isTableSupportsDeltaDelete();

FileSystemContext context = new FileSystemContext(session);

Map<UUID, UUID> shardDeltaMap = raptorSplit.getShardDeltaMap();
if (raptorSplit.getShardUuids().size() == 1) {
UUID shardUuid = raptorSplit.getShardUuids().iterator().next();
return createPageSource(context, shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes);
return createPageSource(
context,
shardUuid,
Optional.ofNullable(shardDeltaMap.get(shardUuid)),
tableSupportsDeltaDelete,
bucketNumber,
columns,
predicate,
attributes,
transactionId,
columnTypes);
}

Iterator<ConnectorPageSource> iterator = raptorSplit.getShardUuids().stream()
.map(shardUuid -> createPageSource(context, shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes))
.map(shardUuid -> createPageSource(
context,
shardUuid,
Optional.ofNullable(shardDeltaMap.get(shardUuid)),
tableSupportsDeltaDelete,
bucketNumber,
columns,
predicate,
attributes,
transactionId,
columnTypes))
.iterator();

return new ConcatPageSource(iterator);
}

/**
*
* @param deltaShardUuid delta of one shard
* @param tableSupportsDeltaDelete table property indicating if this table supports delta_delete
* In the future, we could have the concept of delta_delete as session property.
* Having these two parameters at the same time gives us the flexibility and compatibility to future features.
*/
private ConnectorPageSource createPageSource(
FileSystemContext context,
UUID shardUuid,
Optional<UUID> deltaShardUuid,
boolean tableSupportsDeltaDelete,
OptionalInt bucketNumber,
List<ColumnHandle> columns,
TupleDomain<RaptorColumnHandle> predicate,
Expand All @@ -89,6 +120,6 @@ private ConnectorPageSource createPageSource(
List<Long> columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList());
List<Type> columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList());

return storageManager.getPageSource(context, shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes);
return storageManager.getPageSource(context, shardUuid, deltaShardUuid, tableSupportsDeltaDelete, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import java.util.List;
Expand All @@ -38,6 +39,8 @@ public class RaptorSplit
{
private final String connectorId;
private final Set<UUID> shardUuids;
private final Map<UUID, UUID> shardDeltaMap;
private final boolean tableSupportsDeltaDelete;
private final OptionalInt bucketNumber;
private final List<HostAddress> addresses;
private final TupleDomain<RaptorColumnHandle> effectivePredicate;
Expand All @@ -48,40 +51,75 @@ public class RaptorSplit
public RaptorSplit(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("shardUuids") Set<UUID> shardUuids,
@JsonProperty("shardDeltaMap") Map<UUID, UUID> shardDeltaMap,
@JsonProperty("tableSupportsDeltaDelete") boolean tableSupportsDeltaDelete,
@JsonProperty("bucketNumber") OptionalInt bucketNumber,
@JsonProperty("effectivePredicate") TupleDomain<RaptorColumnHandle> effectivePredicate,
@JsonProperty("transactionId") OptionalLong transactionId,
@JsonProperty("columnTypes") Optional<Map<String, Type>> columnTypes)
{
this(connectorId, shardUuids, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId, columnTypes);
this(
connectorId,
shardUuids,
shardDeltaMap,
tableSupportsDeltaDelete,
bucketNumber,
ImmutableList.of(),
effectivePredicate,
transactionId,
columnTypes);
}

public RaptorSplit(
String connectorId,
UUID shardUuid,
Optional<UUID> deltaShardUuid,
boolean tableSupportsDeltaDelete,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId,
Optional<Map<String, Type>> columnTypes)
{
this(connectorId, ImmutableSet.of(shardUuid), OptionalInt.empty(), addresses, effectivePredicate, transactionId, columnTypes);
this(
connectorId,
ImmutableSet.of(shardUuid),
deltaShardUuid.map(deltaUuid -> ImmutableMap.of(shardUuid, deltaUuid)).orElse(ImmutableMap.of()),
tableSupportsDeltaDelete,
OptionalInt.empty(),
addresses,
effectivePredicate,
transactionId,
columnTypes);
}

public RaptorSplit(
String connectorId,
Set<UUID> shardUuids,
Map<UUID, UUID> shardDeltaMap,
boolean tableSupportsDeltaDelete,
int bucketNumber,
HostAddress address,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId,
Optional<Map<String, Type>> columnTypes)
{
this(connectorId, shardUuids, OptionalInt.of(bucketNumber), ImmutableList.of(address), effectivePredicate, transactionId, columnTypes);
this(
connectorId,
shardUuids,
shardDeltaMap,
tableSupportsDeltaDelete,
OptionalInt.of(bucketNumber),
ImmutableList.of(address),
effectivePredicate,
transactionId,
columnTypes);
}

private RaptorSplit(
String connectorId,
Set<UUID> shardUuids,
Map<UUID, UUID> shardDeltaMap,
boolean tableSupportsDeltaDelete,
OptionalInt bucketNumber,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
Expand All @@ -90,6 +128,8 @@ private RaptorSplit(
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuid is null"));
this.shardDeltaMap = requireNonNull(shardDeltaMap, "shardUuid is null");
this.tableSupportsDeltaDelete = tableSupportsDeltaDelete;
this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
Expand Down Expand Up @@ -121,6 +161,18 @@ public Set<UUID> getShardUuids()
return shardUuids;
}

@JsonProperty
public Map<UUID, UUID> getShardDeltaMap()
{
return shardDeltaMap;
}

@JsonProperty
public boolean isTableSupportsDeltaDelete()
{
return tableSupportsDeltaDelete;
}

@JsonProperty
public OptionalInt getBucketNumber()
{
Expand Down Expand Up @@ -156,6 +208,8 @@ public String toString()
{
return toStringHelper(this)
.add("shardUuids", shardUuids)
.add("shardDeltaMap", shardDeltaMap.toString())
.add("tableSupportsDeltaDelete", tableSupportsDeltaDelete)
.add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null)
.add("hosts", addresses)
.omitNullValues()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.skife.jdbi.v2.ResultIterator;

import javax.annotation.PreDestroy;
Expand Down Expand Up @@ -111,7 +112,7 @@ public ConnectorSplitSource getSplits(
OptionalLong transactionId = table.getTransactionId();
Optional<List<String>> bucketToNode = handle.getPartitioning().map(RaptorPartitioningHandle::getBucketToNode);
verify(bucketed == bucketToNode.isPresent(), "mismatched bucketCount and bucketToNode presence");
return new RaptorSplitSource(tableId, merged, effectivePredicate, transactionId, table.getColumnTypes(), bucketToNode);
return new RaptorSplitSource(tableId, merged, effectivePredicate, transactionId, table.getColumnTypes(), bucketToNode, handle.getTable().isTableSupportsDeltaDelete());
}

private static List<HostAddress> getAddressesForNodes(Map<String, Node> nodeMap, Iterable<String> nodeIdentifiers)
Expand Down Expand Up @@ -148,6 +149,7 @@ private class RaptorSplitSource
private final Optional<Map<String, Type>> columnTypes;
private final Optional<List<String>> bucketToNode;
private final ResultIterator<BucketShards> iterator;
private final boolean tableSupportsDeltaDelete;

@GuardedBy("this")
private CompletableFuture<ConnectorSplitBatch> future;
Expand All @@ -158,20 +160,22 @@ public RaptorSplitSource(
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId,
Optional<Map<String, Type>> columnTypes,
Optional<List<String>> bucketToNode)
Optional<List<String>> bucketToNode,
boolean tableSupportsDeltaDelete)
{
this.tableId = tableId;
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.transactionId = requireNonNull(transactionId, "transactionId is null");
this.columnTypes = requireNonNull(columnTypes, "columnTypesis null");
this.bucketToNode = requireNonNull(bucketToNode, "bucketToNode is null");
this.tableSupportsDeltaDelete = tableSupportsDeltaDelete;

ResultIterator<BucketShards> iterator;
if (bucketToNode.isPresent()) {
iterator = shardManager.getShardNodesBucketed(tableId, merged, bucketToNode.get(), effectivePredicate);
iterator = shardManager.getShardNodesBucketed(tableId, merged, bucketToNode.get(), effectivePredicate, tableSupportsDeltaDelete);
}
else {
iterator = shardManager.getShardNodes(tableId, effectivePredicate);
iterator = shardManager.getShardNodes(tableId, effectivePredicate, tableSupportsDeltaDelete);
}
this.iterator = new SynchronizedResultIterator<>(iterator);
}
Expand Down Expand Up @@ -226,13 +230,14 @@ private ConnectorSplit createSplit(BucketShards bucketShards)

verify(bucketShards.getShards().size() == 1, "wrong shard count for non-bucketed table");
ShardNodes shard = getOnlyElement(bucketShards.getShards());
UUID shardId = shard.getShardUuid();
UUID shardUuid = shard.getShardUuid();
Optional<UUID> deltaShardUuid = shard.getDeltaShardUuid();
Set<String> nodeIds = shard.getNodeIdentifiers();

List<HostAddress> addresses = getAddressesForNodes(nodesById, nodeIds);
if (addresses.isEmpty()) {
if (!backupAvailable) {
throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds));
throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardUuid, nodeIds));
}

// Pick a random node and optimistically assign the shard to it.
Expand All @@ -242,11 +247,19 @@ private ConnectorSplit createSplit(BucketShards bucketShards)
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
Node node = selectRandom(availableNodes);
shardManager.replaceShardAssignment(tableId, shardId, node.getNodeIdentifier(), true);
shardManager.replaceShardAssignment(tableId, shardUuid, node.getNodeIdentifier(), true);
addresses = ImmutableList.of(node.getHostAndPort());
}

return new RaptorSplit(connectorId, shardId, addresses, effectivePredicate, transactionId, columnTypes);
return new RaptorSplit(
connectorId,
shardUuid,
deltaShardUuid,
tableSupportsDeltaDelete,
addresses,
effectivePredicate,
transactionId,
columnTypes);
}

private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shards)
Expand All @@ -263,9 +276,25 @@ private ConnectorSplit createBucketSplit(int bucketNumber, Set<ShardNodes> shard
Set<UUID> shardUuids = shards.stream()
.map(ShardNodes::getShardUuid)
.collect(toSet());
ImmutableMap.Builder<UUID, UUID> shardMapBuilder = ImmutableMap.builder();
shards.forEach(
shard -> {
if (shard.getDeltaShardUuid().isPresent()) {
shardMapBuilder.put(shard.getShardUuid(), shard.getDeltaShardUuid().get());
}
});
HostAddress address = node.getHostAndPort();

return new RaptorSplit(connectorId, shardUuids, bucketNumber, address, effectivePredicate, transactionId, columnTypes);
return new RaptorSplit(
connectorId,
shardUuids,
shardMapBuilder.build(),
tableSupportsDeltaDelete,
bucketNumber,
address,
effectivePredicate,
transactionId,
columnTypes);
}
}
}
Loading