Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.raptor.metadata.ColumnInfo;
import com.facebook.presto.raptor.metadata.Distribution;
import com.facebook.presto.raptor.metadata.MetadataDao;
import com.facebook.presto.raptor.metadata.ShardDeleteDelta;
import com.facebook.presto.raptor.metadata.ShardDelta;
import com.facebook.presto.raptor.metadata.ShardInfo;
import com.facebook.presto.raptor.metadata.ShardManager;
Expand Down Expand Up @@ -60,6 +61,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import io.airlift.slice.Slice;
import javafx.util.Pair;
import org.skife.jdbi.v2.IDBI;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -96,13 +98,15 @@
import static com.facebook.presto.raptor.RaptorTableProperties.DISTRIBUTION_NAME_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.ORDERING_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.ORGANIZED_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.TABLE_SUPPORTS_DELTA_DELETE;
import static com.facebook.presto.raptor.RaptorTableProperties.TEMPORAL_COLUMN_PROPERTY;
import static com.facebook.presto.raptor.RaptorTableProperties.getBucketColumns;
import static com.facebook.presto.raptor.RaptorTableProperties.getBucketCount;
import static com.facebook.presto.raptor.RaptorTableProperties.getDistributionName;
import static com.facebook.presto.raptor.RaptorTableProperties.getSortColumns;
import static com.facebook.presto.raptor.RaptorTableProperties.getTemporalColumn;
import static com.facebook.presto.raptor.RaptorTableProperties.isOrganized;
import static com.facebook.presto.raptor.RaptorTableProperties.isTableSupportsDeltaDelete;
import static com.facebook.presto.raptor.systemtables.ColumnRangesSystemTable.getSourceTable;
import static com.facebook.presto.raptor.util.DatabaseUtil.daoTransaction;
import static com.facebook.presto.raptor.util.DatabaseUtil.onDemandDao;
Expand All @@ -118,10 +122,12 @@
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;

Expand All @@ -132,6 +138,7 @@ public class RaptorMetadata

private static final JsonCodec<ShardInfo> SHARD_INFO_CODEC = jsonCodec(ShardInfo.class);
private static final JsonCodec<ShardDelta> SHARD_DELTA_CODEC = jsonCodec(ShardDelta.class);
private static final JsonCodec<ShardDeleteDelta> SHARD_DELTA_DELETE_CODEC = jsonCodec(ShardDeleteDelta.class);

private final IDBI dbi;
private final MetadataDao dao;
Expand Down Expand Up @@ -195,7 +202,8 @@ private RaptorTableHandle getTableHandle(SchemaTableName tableName)
table.isOrganized(),
OptionalLong.empty(),
Optional.empty(),
false);
false,
table.isTableSupportsDeltaDelete());
}

@Override
Expand Down Expand Up @@ -237,6 +245,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect

handle.getBucketCount().ifPresent(bucketCount -> properties.put(BUCKET_COUNT_PROPERTY, bucketCount));
handle.getDistributionName().ifPresent(distributionName -> properties.put(DISTRIBUTION_NAME_PROPERTY, distributionName));
properties.put(TABLE_SUPPORTS_DELTA_DELETE, handle.isTableSupportsDeltaDelete());
// Only display organization property if set
if (handle.isOrganized()) {
properties.put(ORGANIZED_PROPERTY, true);
Expand Down Expand Up @@ -568,6 +577,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

boolean organized = isOrganized(tableMetadata.getProperties());
boolean tableSupportsDeltaDelete = isTableSupportsDeltaDelete(tableMetadata.getProperties());
if (organized) {
if (temporalColumnHandle.isPresent()) {
throw new PrestoException(NOT_SUPPORTED, "Table with temporal columns cannot be organized");
Expand Down Expand Up @@ -597,6 +607,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
distribution.map(info -> OptionalLong.of(info.getDistributionId())).orElse(OptionalLong.empty()),
distribution.map(info -> OptionalInt.of(info.getBucketCount())).orElse(OptionalInt.empty()),
organized,
tableSupportsDeltaDelete,
distribution.map(DistributionInfo::getBucketColumns).orElse(ImmutableList.of()));
}

Expand Down Expand Up @@ -659,7 +670,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess

Long distributionId = table.getDistributionId().isPresent() ? table.getDistributionId().getAsLong() : null;
// TODO: update default value of organization_enabled to true
long tableId = dao.insertTable(table.getSchemaName(), table.getTableName(), true, table.isOrganized(), distributionId, updateTime);
long tableId = dao.insertTable(table.getSchemaName(), table.getTableName(), true, table.isOrganized(), distributionId, updateTime, table.isTableSupportsDeltaDelete());

List<RaptorColumnHandle> sortColumnHandles = table.getSortColumnHandles();
List<RaptorColumnHandle> bucketColumnHandles = table.getBucketColumnHandles();
Expand Down Expand Up @@ -689,7 +700,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
.orElse(OptionalLong.empty());

// TODO: refactor this to avoid creating an empty table on failure
shardManager.createTable(newTableId, columns, table.getBucketCount().isPresent(), temporalColumnId);
shardManager.createTable(newTableId, table.isTableSupportsDeltaDelete(), columns, table.getBucketCount().isPresent(), temporalColumnId);
shardManager.commitShards(transactionId, newTableId, columns, parseFragments(fragments), Optional.empty(), updateTime);

clearRollback();
Expand Down Expand Up @@ -799,7 +810,8 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
handle.isOrganized(),
OptionalLong.of(transactionId),
Optional.of(columnTypes),
true);
true,
handle.isTableSupportsDeltaDelete());
}

@Override
Expand All @@ -813,22 +825,36 @@ public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHan
.map(RaptorColumnHandle.class::cast)
.map(ColumnInfo::fromHandle).collect(toList());

ImmutableSet.Builder<UUID> oldShardUuidsBuilder = ImmutableSet.builder();
ImmutableList.Builder<ShardInfo> newShardsBuilder = ImmutableList.builder();
if (table.isTableSupportsDeltaDelete()) {
ImmutableMap.Builder<UUID, Pair<Optional<UUID>, Optional<ShardInfo>>> shardMapBuilder = new ImmutableMap.Builder<>();

fragments.stream()
.map(fragment -> SHARD_DELTA_CODEC.fromJson(fragment.getBytes()))
.forEach(delta -> {
oldShardUuidsBuilder.addAll(delta.getOldShardUuids());
newShardsBuilder.addAll(delta.getNewShards());
});
fragments.stream()
.map(fragment -> SHARD_DELTA_DELETE_CODEC.fromJson(fragment.getBytes()))
.forEach(delta -> shardMapBuilder.put(delta.getOldShardUuid(), new Pair<Optional<UUID>, Optional<ShardInfo>>(delta.getOldDeltaDeleteShard(), delta.getNewDeltaDeleteShard())));
OptionalLong updateTime = OptionalLong.of(session.getStartTime());

Set<UUID> oldShardUuids = oldShardUuidsBuilder.build();
List<ShardInfo> newShards = newShardsBuilder.build();
OptionalLong updateTime = OptionalLong.of(session.getStartTime());
log.info("Finishing delete for tableId %s (affected shardUuid: %s)", tableId, shardMapBuilder.build().size());
shardManager.replaceDeltaUuids(transactionId, tableId, columns, shardMapBuilder.build(), updateTime);
}
else {
ImmutableSet.Builder<UUID> oldShardUuidsBuilder = ImmutableSet.builder();
ImmutableList.Builder<ShardInfo> newShardsBuilder = ImmutableList.builder();

fragments.stream()
.map(fragment -> SHARD_DELTA_CODEC.fromJson(fragment.getBytes()))
.forEach(delta -> {
oldShardUuidsBuilder.addAll(delta.getOldShardUuids());
newShardsBuilder.addAll(delta.getNewShards());
});

log.info("Finishing delete for tableId %s (removed: %s, rewritten: %s)", tableId, oldShardUuids.size() - newShards.size(), newShards.size());
shardManager.replaceShardUuids(transactionId, tableId, columns, oldShardUuids, newShards, updateTime);
Set<UUID> oldShardUuids = oldShardUuidsBuilder.build();
List<ShardInfo> newShards = newShardsBuilder.build();
OptionalLong updateTime = OptionalLong.of(session.getStartTime());

log.info("Finishing delete for tableId %s (removed: %s, rewritten: %s)", tableId, oldShardUuids.size() - newShards.size(), newShards.size());
Map<UUID, Optional<UUID>> oldShardUuidsMap = oldShardUuids.stream().collect(toImmutableMap(identity(), uuid -> Optional.empty()));
shardManager.replaceShardUuids(transactionId, false, tableId, columns, oldShardUuidsMap, newShards, updateTime);
}

clearRollback();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.raptor.metadata.Distribution;
import com.facebook.presto.raptor.metadata.ForMetadata;
import com.facebook.presto.raptor.metadata.TableColumn;
import com.facebook.presto.raptor.storage.StorageManagerConfig;
import com.facebook.presto.raptor.systemtables.ShardMetadataSystemTable;
import com.facebook.presto.raptor.systemtables.TableMetadataSystemTable;
import com.facebook.presto.raptor.systemtables.TableStatsSystemTable;
Expand All @@ -33,6 +34,7 @@

import javax.inject.Singleton;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.presto.raptor.metadata.SchemaDaoUtil.createTablesWithRetry;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static java.util.Objects.requireNonNull;
Expand All @@ -50,6 +52,8 @@ public RaptorModule(String connectorId)
@Override
public void configure(Binder binder)
{
configBinder(binder).bindConfig(StorageManagerConfig.class);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This binding is not necessary; please remove it.


binder.bind(RaptorConnectorId.class).toInstance(new RaptorConnectorId(connectorId));
binder.bind(RaptorConnector.class).in(Scopes.SINGLETON);
binder.bind(RaptorMetadataFactory.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class RaptorOutputTableHandle
private final OptionalInt bucketCount;
private final List<RaptorColumnHandle> bucketColumnHandles;
private final boolean organized;
private final boolean tableSupportsDeltaDelete;

@JsonCreator
public RaptorOutputTableHandle(
Expand All @@ -60,6 +61,7 @@ public RaptorOutputTableHandle(
@JsonProperty("distributionId") OptionalLong distributionId,
@JsonProperty("bucketCount") OptionalInt bucketCount,
@JsonProperty("organized") boolean organized,
@JsonProperty("tableSupportsDeltaDelete") boolean tableSupportsDeltaDelete,
@JsonProperty("bucketColumnHandles") List<RaptorColumnHandle> bucketColumnHandles)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
Expand All @@ -75,6 +77,7 @@ public RaptorOutputTableHandle(
this.bucketCount = requireNonNull(bucketCount, "bucketCount is null");
this.bucketColumnHandles = ImmutableList.copyOf(requireNonNull(bucketColumnHandles, "bucketColumnHandles is null"));
this.organized = organized;
this.tableSupportsDeltaDelete = tableSupportsDeltaDelete;
}

@JsonProperty
Expand Down Expand Up @@ -155,6 +158,12 @@ public boolean isOrganized()
return organized;
}

/**
 * Returns the {@code tableSupportsDeltaDelete} flag that was supplied to the constructor.
 * The accessor carries {@code @JsonProperty}, like the other accessors of this handle.
 *
 * @return the stored delta-delete flag
 */
@JsonProperty
public boolean isTableSupportsDeltaDelete()
{
return this.tableSupportsDeltaDelete;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,22 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
ReaderAttributes attributes = ReaderAttributes.from(session);
OptionalLong transactionId = raptorSplit.getTransactionId();
Optional<Map<String, Type>> columnTypes = raptorSplit.getColumnTypes();
boolean tableSupportsDeltaDelete = raptorSplit.isTableSupportsDeltaDelete();

FileSystemContext context = new FileSystemContext(session);

Map<UUID, UUID> shardDeltaMap = raptorSplit.getShardDeltaMap();
if (raptorSplit.getShardUuids().size() == 1) {
UUID shardUuid = raptorSplit.getShardUuids().iterator().next();
return createPageSource(context, shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes);
return createPageSource(context, shardUuid,
Optional.ofNullable(shardDeltaMap.get(shardUuid)),
tableSupportsDeltaDelete, bucketNumber, columns, predicate, attributes, transactionId, columnTypes);
}

Iterator<ConnectorPageSource> iterator = raptorSplit.getShardUuids().stream()
.map(shardUuid -> createPageSource(context, shardUuid, bucketNumber, columns, predicate, attributes, transactionId, columnTypes))
.map(shardUuid -> createPageSource(context, shardUuid,
Optional.ofNullable(shardDeltaMap.get(shardUuid)),
tableSupportsDeltaDelete, bucketNumber, columns, predicate, attributes, transactionId, columnTypes))
.iterator();

return new ConcatPageSource(iterator);
Expand All @@ -78,6 +84,8 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
private ConnectorPageSource createPageSource(
FileSystemContext context,
UUID shardUuid,
Optional<UUID> deltaShardUuid,
boolean tableSupportsDeltaDelete,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this flag if we can use Optional.empty for deltaShardUuid

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this flag if we can use Optional.empty for deltaShardUuid

According to the discussion with Jiexi, the design is that we have a table property indicating whether this table supports delta_delete. In the future, however, we could also expose delta_delete as a session property.

Thus, having these two parameters gives us flexibility and keeps us compatible with future features.

That's why I didn't change it after the last comment; sorry that I didn't explain this last time.

OptionalInt bucketNumber,
List<ColumnHandle> columns,
TupleDomain<RaptorColumnHandle> predicate,
Expand All @@ -89,6 +97,6 @@ private ConnectorPageSource createPageSource(
List<Long> columnIds = columnHandles.stream().map(RaptorColumnHandle::getColumnId).collect(toList());
List<Type> columnTypes = columnHandles.stream().map(RaptorColumnHandle::getColumnType).collect(toList());

return storageManager.getPageSource(context, shardUuid, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes);
return storageManager.getPageSource(context, shardUuid, deltaShardUuid, tableSupportsDeltaDelete, bucketNumber, columnIds, columnTypes, predicate, attributes, transactionId, allColumnTypes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class RaptorSplit
{
private final String connectorId;
private final Set<UUID> shardUuids;
private final Map<UUID, UUID> shardDeltaMap;
private final boolean tableSupportsDeltaDelete;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

private final OptionalInt bucketNumber;
private final List<HostAddress> addresses;
private final TupleDomain<RaptorColumnHandle> effectivePredicate;
Expand All @@ -48,40 +50,48 @@ public class RaptorSplit
public RaptorSplit(
@JsonProperty("connectorId") String connectorId,
@JsonProperty("shardUuids") Set<UUID> shardUuids,
@JsonProperty("shardDeltaMap") Map<UUID, UUID> shardDeltaMap,
@JsonProperty("tableSupportsDeltaDelete") boolean tableSupportsDeltaDelete,
@JsonProperty("bucketNumber") OptionalInt bucketNumber,
@JsonProperty("effectivePredicate") TupleDomain<RaptorColumnHandle> effectivePredicate,
@JsonProperty("transactionId") OptionalLong transactionId,
@JsonProperty("columnTypes") Optional<Map<String, Type>> columnTypes)
{
this(connectorId, shardUuids, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId, columnTypes);
this(connectorId, shardUuids, shardDeltaMap, tableSupportsDeltaDelete, bucketNumber, ImmutableList.of(), effectivePredicate, transactionId, columnTypes);
}

public RaptorSplit(
String connectorId,
UUID shardUuid,
Map<UUID, UUID> shardDeltaMap,
boolean tableSupportsDeltaDelete,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId,
Optional<Map<String, Type>> columnTypes)
{
this(connectorId, ImmutableSet.of(shardUuid), OptionalInt.empty(), addresses, effectivePredicate, transactionId, columnTypes);
this(connectorId, ImmutableSet.of(shardUuid), shardDeltaMap, tableSupportsDeltaDelete, OptionalInt.empty(), addresses, effectivePredicate, transactionId, columnTypes);
}

public RaptorSplit(
String connectorId,
Set<UUID> shardUuids,
Map<UUID, UUID> shardDeltaMap,
boolean tableSupportsDeltaDelete,
int bucketNumber,
HostAddress address,
TupleDomain<RaptorColumnHandle> effectivePredicate,
OptionalLong transactionId,
Optional<Map<String, Type>> columnTypes)
{
this(connectorId, shardUuids, OptionalInt.of(bucketNumber), ImmutableList.of(address), effectivePredicate, transactionId, columnTypes);
this(connectorId, shardUuids, shardDeltaMap, tableSupportsDeltaDelete, OptionalInt.of(bucketNumber), ImmutableList.of(address), effectivePredicate, transactionId, columnTypes);
}

private RaptorSplit(
String connectorId,
Set<UUID> shardUuids,
Map<UUID, UUID> shardDeltaMap,
boolean tableSupportsDeltaDelete,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

OptionalInt bucketNumber,
List<HostAddress> addresses,
TupleDomain<RaptorColumnHandle> effectivePredicate,
Expand All @@ -90,6 +100,8 @@ private RaptorSplit(
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.shardUuids = ImmutableSet.copyOf(requireNonNull(shardUuids, "shardUuid is null"));
this.shardDeltaMap = requireNonNull(shardDeltaMap, "shardUuid is null");
this.tableSupportsDeltaDelete = requireNonNull(tableSupportsDeltaDelete, "tableSupportsDeltaDelete is null");
this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
Expand Down Expand Up @@ -121,6 +133,12 @@ public Set<UUID> getShardUuids()
return shardUuids;
}

/**
 * Returns the shard delta map held by this split. The page source provider looks up
 * each shard UUID in this map and passes the result on as that shard's delta shard UUID;
 * a shard with no entry is treated as having no delta shard.
 *
 * @return the map exactly as it was handed to the constructor (it is stored without copying)
 */
@JsonProperty
public Map<UUID, UUID> getShardDeltaMap()
{
return this.shardDeltaMap;
}

@JsonProperty
public OptionalInt getBucketNumber()
{
Expand All @@ -145,6 +163,12 @@ public Optional<Map<String, Type>> getColumnTypes()
return columnTypes;
}

/**
 * Returns the {@code tableSupportsDeltaDelete} flag this split was constructed with.
 * The page source provider reads it and forwards it when creating page sources.
 *
 * @return the stored delta-delete flag
 */
@JsonProperty
public boolean isTableSupportsDeltaDelete()
{
return this.tableSupportsDeltaDelete;
}

@Override
public Object getInfo()
{
Expand All @@ -156,6 +180,8 @@ public String toString()
{
return toStringHelper(this)
.add("shardUuids", shardUuids)
.add("shardDeltaMap", shardDeltaMap.toString())
.add("tableSupportsDeltaDelete", tableSupportsDeltaDelete)
.add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null)
.add("hosts", addresses)
.omitNullValues()
Expand Down
Loading