Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,12 @@ private ConnectorSplit createSplit(BucketShards bucketShards)
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
Node node = selectRandom(availableNodes);
shardManager.replaceShardAssignment(tableId, shardUuid, node.getNodeIdentifier(), true);
shardManager.replaceShardAssignment(
tableId,
shardUuid,
deltaShardUuid,
node.getNodeIdentifier(),
true);
addresses = ImmutableList.of(node.getHostAndPort());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,12 @@ public ShardMetadata getShard(UUID shardUuid)
return dao.getShard(shardUuid);
}

@Override
public Set<ShardMetadata> getNodeShardsAndDeltas(String nodeIdentifier)
{
    // A null table ID asks the DAO for shards and delta shards across every
    // table assigned to the given node, not just one table.
    Set<ShardMetadata> shardsAndDeltas = dao.getNodeShardsAndDeltas(nodeIdentifier, null);
    return shardsAndDeltas;
}

@Override
public Set<ShardMetadata> getNodeShards(String nodeIdentifier)
{
Expand All @@ -982,7 +988,7 @@ public ResultIterator<BucketShards> getShardNodesBucketed(long tableId, boolean
}

@Override
public void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod)
public void replaceShardAssignment(long tableId, UUID shardUuid, Optional<UUID> deltaUuid, String nodeIdentifier, boolean gracePeriod)
{
if (gracePeriod && (nanosSince(startTime).compareTo(startupGracePeriod) < 0)) {
throw new PrestoException(SERVER_STARTING_UP, "Cannot reassign shards while server is starting");
Expand All @@ -994,10 +1000,18 @@ public void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIden
ShardDao dao = shardDaoSupplier.attach(handle);

Set<Integer> oldAssignments = new HashSet<>(fetchLockedNodeIds(handle, tableId, shardUuid));

// 1. Update index table
updateNodeIds(handle, tableId, shardUuid, ImmutableSet.of(nodeId));

// 2. Update shards table
dao.deleteShardNodes(shardUuid, oldAssignments);
dao.insertShardNode(shardUuid, nodeId);

if (deltaUuid.isPresent()) {
dao.deleteShardNodes(deltaUuid.get(), oldAssignments);
dao.insertShardNode(deltaUuid.get(), nodeId);
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ synchronized void cleanLocalShardsImmediately(Set<UUID> local)
throws IOException
{
// get shards assigned to the local node
Set<UUID> assigned = dao.getNodeShards(currentNode, null).stream()
Set<UUID> assigned = dao.getNodeShardsAndDeltas(currentNode, null).stream()
.map(ShardMetadata::getShardUuid)
.collect(toSet());

Expand All @@ -402,7 +402,7 @@ synchronized void cleanLocalShards()
Set<UUID> local = getLocalShards();

// get shards assigned to the local node
Set<UUID> assigned = dao.getNodeShards(currentNode, null).stream()
Set<UUID> assigned = dao.getNodeShardsAndDeltas(currentNode, null).stream()
.map(ShardMetadata::getShardUuid)
.collect(toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,32 @@ public interface ShardDao
" AND (s.table_id = :tableId OR :tableId IS NULL)\n" +
") x")
@Mapper(ShardMetadata.Mapper.class)
Set<ShardMetadata> getNodeShardsAndDeltas(@Bind("nodeIdentifier") String nodeIdentifier, @Bind("tableId") Long tableId);

@SqlQuery("SELECT " + SHARD_METADATA_COLUMNS + "\n" +
"FROM (\n" +
" SELECT s.*\n" +
" FROM shards s\n" +
" JOIN shard_nodes sn ON (s.shard_id = sn.shard_id)\n" +
" JOIN nodes n ON (sn.node_id = n.node_id)\n" +
" WHERE n.node_identifier = :nodeIdentifier\n" +
" AND s.bucket_number IS NULL\n" +
" AND s.is_delta = false\n" +
" AND (s.table_id = :tableId OR :tableId IS NULL)\n" +
" UNION ALL\n" +
" SELECT s.*\n" +
" FROM shards s\n" +
" JOIN tables t ON (s.table_id = t.table_id)\n" +
" JOIN distributions d ON (t.distribution_id = d.distribution_id)\n" +
" JOIN buckets b ON (\n" +
" d.distribution_id = b.distribution_id AND\n" +
" s.bucket_number = b.bucket_number)\n" +
" JOIN nodes n ON (b.node_id = n.node_id)\n" +
" WHERE n.node_identifier = :nodeIdentifier\n" +
" AND s.is_delta = false\n" +
" AND (s.table_id = :tableId OR :tableId IS NULL)\n" +
") x")
@Mapper(ShardMetadata.Mapper.class)
Set<ShardMetadata> getNodeShards(@Bind("nodeIdentifier") String nodeIdentifier, @Bind("tableId") Long tableId);

@SqlQuery("SELECT n.node_identifier, x.bytes\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,19 @@ public interface ShardManager
ShardMetadata getShard(UUID shardUuid);

/**
* Get shard metadata for shards on a given node.
* Get shard and delta metadata for shards on a given node.
*/
Set<ShardMetadata> getNodeShardsAndDeltas(String nodeIdentifier);

/**
* Get only shard metadata for shards on a given node.
* Note: shard metadata will contain its delta
*/
Set<ShardMetadata> getNodeShards(String nodeIdentifier);

/**
* Get shard metadata for shards on a given node.
* Get only shard metadata for shards on a given node.
* Note: shard metadata will contain its delta
*/
Set<ShardMetadata> getNodeShards(String nodeIdentifier, long tableId);

Expand All @@ -95,7 +102,7 @@ public interface ShardManager
/**
* Remove all old shard assignments and assign a shard to a node
*/
void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod);
void replaceShardAssignment(long tableId, UUID shardUuid, Optional<UUID> deltaUuid, String nodeIdentifier, boolean gracePeriod);

/**
* Get the number of bytes used by assigned shards per node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ void process()
// only include nodes that are below threshold
nodes = new HashMap<>(filterValues(nodes, size -> size <= averageSize));

// get non-bucketed node shards by size, largest to smallest
// get only the non-bucketed node shards, ordered by size, largest to smallest
List<ShardMetadata> shards = shardManager.getNodeShards(currentNode).stream()
.filter(shard -> !shard.getBucketNumber().isPresent())
.sorted(comparingLong(ShardMetadata::getCompressedSize).reversed())
Expand All @@ -227,6 +227,7 @@ void process()
ShardMetadata shard = queue.remove();
long shardSize = shard.getCompressedSize();
UUID shardUuid = shard.getShardUuid();
Optional<UUID> deltaUuid = shard.getDeltaUuid();

// verify backup exists
if (!backupStore.get().shardExists(shardUuid)) {
Expand All @@ -250,7 +251,7 @@ void process()
nodeSize -= shardSize;

// move assignment
shardManager.replaceShardAssignment(shard.getTableId(), shardUuid, target, false);
shardManager.replaceShardAssignment(shard.getTableId(), shardUuid, deltaUuid, target, false);

// delete local file
Path file = storageService.getStorageFile(shardUuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private synchronized void enqueueMissingShards()

private Set<ShardMetadata> getMissingShards()
{
return shardManager.getNodeShards(nodeIdentifier).stream()
return shardManager.getNodeShardsAndDeltas(nodeIdentifier).stream()
.filter(shard -> shardNeedsRecovery(shard.getShardUuid(), shard.getCompressedSize()))
.collect(toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,41 @@ private Set<OrganizationSet> buildCompactionSets(Table tableInfo, Set<ShardIndex
ImmutableSet.Builder<ShardIndexInfo> builder = ImmutableSet.builder();
ImmutableSet.Builder<OrganizationSet> compactionSets = ImmutableSet.builder();

int priority = 0;
for (ShardIndexInfo shard : shards) {
if (((consumedBytes + shard.getUncompressedSize()) > maxShardSize.toBytes()) ||
(consumedRows + shard.getRowCount() > maxShardRows)) {
// Finalize this compaction set, and start a new one for the rest of the shards
Set<ShardIndexInfo> shardsToCompact = builder.build();
addToCompactionSets(compactionSets, shardsToCompact, tableId, tableInfo, priority);

if (shardsToCompact.size() > 1) {
compactionSets.add(createOrganizationSet(tableId, shardsToCompact));
}

priority = 0;
builder = ImmutableSet.builder();
consumedBytes = 0;
consumedRows = 0;
}
if (shard.getDeltaUuid().isPresent()) {
priority += 1;
}
builder.add(shard);
consumedBytes += shard.getUncompressedSize();
consumedRows += shard.getRowCount();
}

// create compaction set for the remaining shards of this day
Set<ShardIndexInfo> shardsToCompact = builder.build();
if (shardsToCompact.size() > 1) {
compactionSets.add(createOrganizationSet(tableId, shardsToCompact));
}
addToCompactionSets(compactionSets, shardsToCompact, tableId, tableInfo, priority);
return compactionSets.build();
}

private void addToCompactionSets(ImmutableSet.Builder<OrganizationSet> compactionSets, Set<ShardIndexInfo> shardsToCompact, long tableId, Table tableInfo, int priority)
{
    // A singleton set is normally not worth compacting — unless its lone shard
    // carries a delta, in which case merging the delta in is still useful
    // (e.g. a shard too big to pair with others but with pending deletes).
    boolean anyShardHasDelta = shardsToCompact.stream()
            .anyMatch(shard -> shard.getDeltaUuid().isPresent());
    if (shardsToCompact.size() <= 1 && !anyShardHasDelta) {
        return;
    }
    compactionSets.add(createOrganizationSet(tableId, tableInfo.isTableSupportsDeltaDelete(), shardsToCompact, priority));
}

private static Comparator<ShardIndexInfo> getShardIndexInfoComparator(Table tableInfo)
{
if (!tableInfo.getTemporalColumnId().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.OptionalLong;
import java.util.UUID;

import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_FIRST;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
Expand All @@ -55,35 +58,42 @@ public OrganizationJob(OrganizationSet organizationSet, MetadataDao metadataDao,
public void run()
{
try {
runJob(organizationSet.getTableId(), organizationSet.getBucketNumber(), organizationSet.getShards());
runJob(organizationSet.getTableId(), organizationSet.isTableSupportsDeltaDelete(), organizationSet.getBucketNumber(), organizationSet.getShardsMap());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void runJob(long tableId, OptionalInt bucketNumber, Set<UUID> shardUuids)
private void runJob(long tableId, boolean tableSupportsDeltaDelete, OptionalInt bucketNumber, Map<UUID, Optional<UUID>> shardUuidsMap)
throws IOException
{
long transactionId = shardManager.beginTransaction();
try {
// todo add logic in organization for delta or it may corrupt data
return;
//runJob(transactionId, bucketNumber, tableId, shardUuids);
runJob(transactionId, tableSupportsDeltaDelete, bucketNumber, tableId, shardUuidsMap);
}
catch (Throwable e) {
shardManager.rollbackTransaction(transactionId);
throw e;
}
}

private void runJob(long transactionId, OptionalInt bucketNumber, long tableId, Set<UUID> shardUuids)
private void runJob(long transactionId, boolean tableSupportsDeltaDelete, OptionalInt bucketNumber, long tableId, Map<UUID, Optional<UUID>> shardUuidsMap)
throws IOException
{
// todo add logic in organization for delta or it may corrupt data
TableMetadata metadata = getTableMetadata(tableId);
List<ShardInfo> newShards = performCompaction(transactionId, bucketNumber, shardUuids, metadata);
log.info("Compacted shards %s into %s", shardUuids, newShards.stream().map(ShardInfo::getShardUuid).collect(toList()));
List<ShardInfo> newShards = performCompaction(transactionId, tableSupportsDeltaDelete, bucketNumber, shardUuidsMap, metadata);
log.info("Compacted shards %s into %s for table: %s",
shardUuidsMap,
newShards.stream().map(ShardInfo::getShardUuid).collect(toList()),
tableId);
// TODO: Will merge these new function with old function once new feature is stable
if (tableSupportsDeltaDelete) {
shardManager.replaceShardUuids(transactionId, tableId, metadata.getColumns(), shardUuidsMap, newShards, OptionalLong.empty(), true);
}
else {
shardManager.replaceShardUuids(transactionId, tableId, metadata.getColumns(), shardUuidsMap.keySet(), newShards, OptionalLong.empty());
}
}

private TableMetadata getTableMetadata(long tableId)
Expand All @@ -100,18 +110,36 @@ private TableMetadata getTableMetadata(long tableId)
return new TableMetadata(tableId, columns, sortColumnIds);
}

private List<ShardInfo> performCompaction(long transactionId, OptionalInt bucketNumber, Set<UUID> shardUuids, TableMetadata tableMetadata)
private List<ShardInfo> performCompaction(long transactionId, boolean tableSupportsDeltaDelete, OptionalInt bucketNumber, Map<UUID, Optional<UUID>> shardUuidsMap, TableMetadata tableMetadata)
throws IOException
{
if (tableMetadata.getSortColumnIds().isEmpty()) {
return compactor.compact(transactionId, bucketNumber, shardUuids, tableMetadata.getColumns());
return compactor.compact(transactionId, tableSupportsDeltaDelete, bucketNumber, shardUuidsMap, tableMetadata.getColumns());
}
return compactor.compactSorted(
transactionId,
tableSupportsDeltaDelete,
bucketNumber,
shardUuids,
shardUuidsMap,
tableMetadata.getColumns(),
tableMetadata.getSortColumnIds(),
nCopies(tableMetadata.getSortColumnIds().size(), ASC_NULLS_FIRST));
}

public int getPriority()
{
    // The job's scheduling priority is owned by the organization set it wraps.
    int priority = organizationSet.getPriority();
    return priority;
}

@Override
public String toString()
{
    // Human-readable dump of the job's collaborators, used in logs/debugging.
    // Note: "priority" is derived from organizationSet rather than stored here.
    return toStringHelper(this)
    .add("metadataDao", metadataDao)
    .add("shardManager", shardManager)
    .add("compactor", compactor)
    .add("organizationSet", organizationSet)
    .add("priority", organizationSet.getPriority())
    .toString();
}
}
Loading