diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java index da6056deb1c2c..3c65e33bc2141 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java @@ -247,7 +247,12 @@ private ConnectorSplit createSplit(BucketShards bucketShards) throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query"); } Node node = selectRandom(availableNodes); - shardManager.replaceShardAssignment(tableId, shardUuid, node.getNodeIdentifier(), true); + shardManager.replaceShardAssignment( + tableId, + shardUuid, + deltaShardUuid, + node.getNodeIdentifier(), + true); addresses = ImmutableList.of(node.getHostAndPort()); } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java index 12fd6147b99f7..5d41a5cf1cdf6 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/DatabaseShardManager.java @@ -957,6 +957,12 @@ public ShardMetadata getShard(UUID shardUuid) return dao.getShard(shardUuid); } + @Override + public Set getNodeShardsAndDeltas(String nodeIdentifier) + { + return dao.getNodeShardsAndDeltas(nodeIdentifier, null); + } + @Override public Set getNodeShards(String nodeIdentifier) { @@ -982,7 +988,7 @@ public ResultIterator getShardNodesBucketed(long tableId, boolean } @Override - public void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod) + public void replaceShardAssignment(long tableId, UUID shardUuid, Optional deltaUuid, String nodeIdentifier, boolean gracePeriod) { if (gracePeriod && (nanosSince(startTime).compareTo(startupGracePeriod) < 0)) { throw new PrestoException(SERVER_STARTING_UP, "Cannot reassign shards while server is starting"); @@ -994,10 +1000,18 @@ public void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIden ShardDao dao = shardDaoSupplier.attach(handle); Set oldAssignments = new HashSet<>(fetchLockedNodeIds(handle, tableId, shardUuid)); + + // 1. Update index table updateNodeIds(handle, tableId, shardUuid, ImmutableSet.of(nodeId)); + // 2. Update shards table dao.deleteShardNodes(shardUuid, oldAssignments); dao.insertShardNode(shardUuid, nodeId); + + if (deltaUuid.isPresent()) { + dao.deleteShardNodes(deltaUuid.get(), oldAssignments); + dao.insertShardNode(deltaUuid.get(), nodeId); + } return null; }); } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java index 5faa855898de9..3dfc542bf6cc6 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardCleaner.java @@ -379,7 +379,7 @@ synchronized void cleanLocalShardsImmediately(Set local) throws IOException { // get shards assigned to the local node - Set assigned = dao.getNodeShards(currentNode, null).stream() + Set assigned = dao.getNodeShardsAndDeltas(currentNode, null).stream() .map(ShardMetadata::getShardUuid) .collect(toSet()); @@ -402,7 +402,7 @@ synchronized void cleanLocalShards() Set local = getLocalShards(); // get shards assigned to the local node - Set assigned = dao.getNodeShards(currentNode, null).stream() + Set assigned = dao.getNodeShardsAndDeltas(currentNode, null).stream() .map(ShardMetadata::getShardUuid) .collect(toSet()); diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardDao.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardDao.java index d51ece99a2d60..82140d862db9b 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardDao.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardDao.java @@ -87,6 +87,32 @@ public interface ShardDao " AND (s.table_id = :tableId OR :tableId IS NULL)\n" + ") x") @Mapper(ShardMetadata.Mapper.class) + Set getNodeShardsAndDeltas(@Bind("nodeIdentifier") String nodeIdentifier, @Bind("tableId") Long tableId); + + @SqlQuery("SELECT " + SHARD_METADATA_COLUMNS + "\n" + + "FROM (\n" + + " SELECT s.*\n" + + " FROM shards s\n" + + " JOIN shard_nodes sn ON (s.shard_id = sn.shard_id)\n" + + " JOIN nodes n ON (sn.node_id = n.node_id)\n" + + " WHERE n.node_identifier = :nodeIdentifier\n" + + " AND s.bucket_number IS NULL\n" + + " AND s.is_delta = false\n" + + " AND (s.table_id = :tableId OR :tableId IS NULL)\n" + + " UNION ALL\n" + + " SELECT s.*\n" + + " FROM shards s\n" + + " JOIN tables t ON (s.table_id = t.table_id)\n" + + " JOIN distributions d ON (t.distribution_id = d.distribution_id)\n" + + " JOIN buckets b ON (\n" + + " d.distribution_id = b.distribution_id AND\n" + + " s.bucket_number = b.bucket_number)\n" + + " JOIN nodes n ON (b.node_id = n.node_id)\n" + + " WHERE n.node_identifier = :nodeIdentifier\n" + + " AND s.is_delta = false\n" + + " AND (s.table_id = :tableId OR :tableId IS NULL)\n" + + ") x") + @Mapper(ShardMetadata.Mapper.class) Set getNodeShards(@Bind("nodeIdentifier") String nodeIdentifier, @Bind("tableId") Long tableId); @SqlQuery("SELECT n.node_identifier, x.bytes\n" + diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java index 7377611853e52..9443aaefbac44 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/metadata/ShardManager.java @@ -73,12 +73,19 @@ public interface ShardManager ShardMetadata getShard(UUID shardUuid); /** - * Get shard metadata for shards on a given node. + * Get shard and delta metadata for shards on a given node. + */ + Set getNodeShardsAndDeltas(String nodeIdentifier); + + /** + * Get only shard metadata for shards on a given node. + * Note: shard metadata will contain its delta */ Set getNodeShards(String nodeIdentifier); /** - * Get shard metadata for shards on a given node. + * Get only shard metadata for shards on a given node. + * Note: shard metadata will contain its delta */ Set getNodeShards(String nodeIdentifier, long tableId); @@ -95,7 +102,7 @@ public interface ShardManager /** * Remove all old shard assignments and assign a shard to a node */ - void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod); + void replaceShardAssignment(long tableId, UUID shardUuid, Optional deltaUuid, String nodeIdentifier, boolean gracePeriod); /** * Get the number of bytes used by assigned shards per node. diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java index 3b2ec0c97d219..6ee632b365320 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardEjector.java @@ -215,7 +215,7 @@ void process() // only include nodes that are below threshold nodes = new HashMap<>(filterValues(nodes, size -> size <= averageSize)); - // get non-bucketed node shards by size, largest to smallest + // get non-bucketed node shards(only) by size, largest to smallest List shards = shardManager.getNodeShards(currentNode).stream() .filter(shard -> !shard.getBucketNumber().isPresent()) .sorted(comparingLong(ShardMetadata::getCompressedSize).reversed()) @@ -227,6 +227,7 @@ void process() ShardMetadata shard = queue.remove(); long shardSize = shard.getCompressedSize(); UUID shardUuid = shard.getShardUuid(); + Optional deltaUuid = shard.getDeltaUuid(); // verify backup exists if (!backupStore.get().shardExists(shardUuid)) { @@ -250,7 +251,7 @@ void process() nodeSize -= shardSize; // move assignment - shardManager.replaceShardAssignment(shard.getTableId(), shardUuid, target, false); + shardManager.replaceShardAssignment(shard.getTableId(), shardUuid, deltaUuid, target, false); // delete local file Path file = storageService.getStorageFile(shardUuid); diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java index 7775d73603bba..142fbe51b0ad1 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/ShardRecoveryManager.java @@ -188,7 +188,7 @@ private synchronized void enqueueMissingShards() private Set getMissingShards() { - return shardManager.getNodeShards(nodeIdentifier).stream() + return shardManager.getNodeShardsAndDeltas(nodeIdentifier).stream() .filter(shard -> shardNeedsRecovery(shard.getShardUuid(), shard.getCompressedSize())) .collect(toSet()); } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/CompactionSetCreator.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/CompactionSetCreator.java index b0014114dff7e..b5769b5dfbe05 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/CompactionSetCreator.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/CompactionSetCreator.java @@ -70,20 +70,22 @@ private Set buildCompactionSets(Table tableInfo, Set builder = ImmutableSet.builder(); ImmutableSet.Builder compactionSets = ImmutableSet.builder(); + int priority = 0; for (ShardIndexInfo shard : shards) { if (((consumedBytes + shard.getUncompressedSize()) > maxShardSize.toBytes()) || (consumedRows + shard.getRowCount() > maxShardRows)) { // Finalize this compaction set, and start a new one for the rest of the shards Set shardsToCompact = builder.build(); + addToCompactionSets(compactionSets, shardsToCompact, tableId, tableInfo, priority); - if (shardsToCompact.size() > 1) { - compactionSets.add(createOrganizationSet(tableId, shardsToCompact)); - } - + priority = 0; builder = ImmutableSet.builder(); consumedBytes = 0; consumedRows = 0; } + if (shard.getDeltaUuid().isPresent()) { + priority += 1; + } builder.add(shard); consumedBytes += shard.getUncompressedSize(); consumedRows += shard.getRowCount(); @@ -91,12 +93,18 @@ private Set buildCompactionSets(Table tableInfo, Set shardsToCompact = builder.build(); - if (shardsToCompact.size() > 1) { - compactionSets.add(createOrganizationSet(tableId, shardsToCompact)); - } + addToCompactionSets(compactionSets, shardsToCompact, tableId, tableInfo, priority); return compactionSets.build(); } + private void addToCompactionSets(ImmutableSet.Builder compactionSets, Set shardsToCompact, long tableId, Table tableInfo, int priority) + { + // Add special rule for shard which is too big to compact with other shards but have delta to compact + if (shardsToCompact.size() > 1 || shardsToCompact.stream().anyMatch(shard -> shard.getDeltaUuid().isPresent())) { + compactionSets.add(createOrganizationSet(tableId, tableInfo.isTableSupportsDeltaDelete(), shardsToCompact, priority)); + } + } + private static Comparator getShardIndexInfoComparator(Table tableInfo) { if (!tableInfo.getTemporalColumnId().isPresent()) { diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationJob.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationJob.java index c54916cfc205b..2470dd7153d3f 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationJob.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationJob.java @@ -24,11 +24,14 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; -import java.util.Set; +import java.util.OptionalLong; import java.util.UUID; import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_FIRST; +import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -55,21 +58,19 @@ public OrganizationJob(OrganizationSet organizationSet, MetadataDao metadataDao, public void run() { try { - runJob(organizationSet.getTableId(), organizationSet.getBucketNumber(), organizationSet.getShards()); + runJob(organizationSet.getTableId(), organizationSet.isTableSupportsDeltaDelete(), organizationSet.getBucketNumber(), organizationSet.getShardsMap()); } catch (IOException e) { throw new UncheckedIOException(e); } } - private void runJob(long tableId, OptionalInt bucketNumber, Set shardUuids) + private void runJob(long tableId, boolean tableSupportsDeltaDelete, OptionalInt bucketNumber, Map> shardUuidsMap) throws IOException { long transactionId = shardManager.beginTransaction(); try { - // todo add logic in organization for delta or it may corrupt data - return; - //runJob(transactionId, bucketNumber, tableId, shardUuids); + runJob(transactionId, tableSupportsDeltaDelete, bucketNumber, tableId, shardUuidsMap); } catch (Throwable e) { shardManager.rollbackTransaction(transactionId); @@ -77,13 +78,22 @@ private void runJob(long tableId, OptionalInt bucketNumber, Set shardUuids } } - private void runJob(long transactionId, OptionalInt bucketNumber, long tableId, Set shardUuids) + private void runJob(long transactionId, boolean tableSupportsDeltaDelete, OptionalInt bucketNumber, long tableId, Map> shardUuidsMap) throws IOException { - // todo add logic in organization for delta or it may corrupt data TableMetadata metadata = getTableMetadata(tableId); - List newShards = performCompaction(transactionId, bucketNumber, shardUuids, metadata); - log.info("Compacted shards %s into %s", shardUuids, newShards.stream().map(ShardInfo::getShardUuid).collect(toList())); + List newShards = performCompaction(transactionId, tableSupportsDeltaDelete, bucketNumber, shardUuidsMap, metadata); + log.info("Compacted shards %s into %s for table: %s", + shardUuidsMap, + newShards.stream().map(ShardInfo::getShardUuid).collect(toList()), + tableId); + // TODO: Will merge these new function with old function once new feature is stable + if (tableSupportsDeltaDelete) { + shardManager.replaceShardUuids(transactionId, tableId, metadata.getColumns(), shardUuidsMap, newShards, OptionalLong.empty(), true); + } + else { + shardManager.replaceShardUuids(transactionId, tableId, metadata.getColumns(), shardUuidsMap.keySet(), newShards, OptionalLong.empty()); + } } private TableMetadata getTableMetadata(long tableId) @@ -100,18 +110,36 @@ private TableMetadata getTableMetadata(long tableId) return new TableMetadata(tableId, columns, sortColumnIds); } - private List performCompaction(long transactionId, OptionalInt bucketNumber, Set shardUuids, TableMetadata tableMetadata) + private List performCompaction(long transactionId, boolean tableSupportsDeltaDelete, OptionalInt bucketNumber, Map> shardUuidsMap, TableMetadata tableMetadata) throws IOException { if (tableMetadata.getSortColumnIds().isEmpty()) { - return compactor.compact(transactionId, bucketNumber, shardUuids, tableMetadata.getColumns()); + return compactor.compact(transactionId, tableSupportsDeltaDelete, bucketNumber, shardUuidsMap, tableMetadata.getColumns()); } return compactor.compactSorted( transactionId, + tableSupportsDeltaDelete, bucketNumber, - shardUuids, + shardUuidsMap, tableMetadata.getColumns(), tableMetadata.getSortColumnIds(), nCopies(tableMetadata.getSortColumnIds().size(), ASC_NULLS_FIRST)); } + + public int getPriority() + { + return organizationSet.getPriority(); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("metadataDao", metadataDao) + .add("shardManager", shardManager) + .add("compactor", compactor) + .add("organizationSet", organizationSet) + .add("priority", organizationSet.getPriority()) + .toString(); + } } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationSet.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationSet.java index c85f2ba0a5de8..2f7ffdfb4e943 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationSet.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/OrganizationSet.java @@ -13,25 +13,32 @@ */ package com.facebook.presto.raptor.storage.organization; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.OptionalInt; import java.util.Set; import java.util.UUID; import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toSet; public class OrganizationSet { private final long tableId; - private final Set shards; + private final boolean tableSupportsDeltaDelete; + private final Map> shardsMap; private final OptionalInt bucketNumber; + private final int priority; - public OrganizationSet(long tableId, Set shards, OptionalInt bucketNumber) + public OrganizationSet(long tableId, boolean tableSupportsDeltaDelete, Map> shardsMap, OptionalInt bucketNumber, int priority) { this.tableId = tableId; - this.shards = requireNonNull(shards, "shards is null"); + this.tableSupportsDeltaDelete = tableSupportsDeltaDelete; + this.shardsMap = requireNonNull(shardsMap, "shards is null"); this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null"); + this.priority = priority; } public long getTableId() @@ -39,9 +46,19 @@ public long getTableId() return tableId; } + public boolean isTableSupportsDeltaDelete() + { + return tableSupportsDeltaDelete; + } + + public Map> getShardsMap() + { + return shardsMap; + } + public Set getShards() { - return shards; + return shardsMap.keySet(); } public OptionalInt getBucketNumber() @@ -49,6 +66,11 @@ public OptionalInt getBucketNumber() return bucketNumber; } + public int getPriority() + { + return priority; + } + @Override public boolean equals(Object o) { @@ -60,14 +82,15 @@ public boolean equals(Object o) } OrganizationSet that = (OrganizationSet) o; return tableId == that.tableId && - Objects.equals(shards, that.shards) && + tableSupportsDeltaDelete == that.tableSupportsDeltaDelete && + Objects.equals(shardsMap, that.shardsMap) && Objects.equals(bucketNumber, that.bucketNumber); } @Override public int hashCode() { - return Objects.hash(tableId, shards, bucketNumber); + return Objects.hash(tableId, tableSupportsDeltaDelete, shardsMap, bucketNumber); } @Override @@ -75,7 +98,10 @@ public String toString() { return toStringHelper(this) .add("tableId", tableId) - .add("shards", shards) + .add("tableSupportsDeltaDelete", tableSupportsDeltaDelete) + .add("shardSize", shardsMap.size()) + .add("deltaSize", shardsMap.values().stream().filter(Optional::isPresent).collect(toSet()).size()) + .add("priority", priority) .add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null) .omitNullValues() .toString(); diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactionManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactionManager.java index 651801c571a2f..cafcdc2285d7f 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactionManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactionManager.java @@ -179,7 +179,7 @@ private void discoverShards() } List shards = entry.getValue(); Collection organizationSets = filterAndCreateCompactionSets(tableId, shards); - log.info("Created %s organization set(s) for table ID %s", organizationSets.size(), tableId); + log.info("Created %s compaction set(s) from %s shards for table ID %s", organizationSets.size(), shards.size(), tableId); for (OrganizationSet set : organizationSets) { organizer.enqueue(set); @@ -232,6 +232,10 @@ private boolean needsCompaction(ShardMetadata shard) if (shard.getRowCount() < (FILL_FACTOR * maxShardRows)) { return true; } + + if (shard.getDeltaUuid().isPresent()) { + return true; + } return false; } } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java index 38b5414f436a8..d2bc5d2b1fda9 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardCompactor.java @@ -37,12 +37,12 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.OptionalInt; import java.util.PriorityQueue; import java.util.Queue; -import java.util.Set; import java.util.UUID; import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; @@ -50,14 +50,17 @@ import static io.airlift.units.Duration.nanosSince; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; public final class ShardCompactor { private final StorageManager storageManager; private final CounterStat inputShards = new CounterStat(); + private final CounterStat inputDeltaShards = new CounterStat(); private final CounterStat outputShards = new CounterStat(); private final DistributionStat inputShardsPerCompaction = new DistributionStat(); + private final DistributionStat inputDeltaShardsPerCompaction = new DistributionStat(); private final DistributionStat outputShardsPerCompaction = new DistributionStat(); private final DistributionStat compactionLatencyMillis = new DistributionStat(); private final DistributionStat sortedCompactionLatencyMillis = new DistributionStat(); @@ -70,7 +73,7 @@ public ShardCompactor(StorageManager storageManager, ReaderAttributes readerAttr this.readerAttributes = requireNonNull(readerAttributes, "readerAttributes is null"); } - public List compact(long transactionId, OptionalInt bucketNumber, Set uuids, List columns) + public List compact(long transactionId, boolean tableSupportsDeltaDelete, OptionalInt bucketNumber, Map> uuidsMap, List columns) throws IOException { long start = System.nanoTime(); @@ -81,23 +84,41 @@ public List compact(long transactionId, OptionalInt bucketNumber, Set List shardInfos; try { - shardInfos = compact(storagePageSink, bucketNumber, uuids, columnIds, columnTypes); + shardInfos = compact(storagePageSink, tableSupportsDeltaDelete, bucketNumber, uuidsMap, columnIds, columnTypes); } catch (IOException | RuntimeException e) { storagePageSink.rollback(); throw e; } - updateStats(uuids.size(), shardInfos.size(), nanosSince(start).toMillis()); + int deltaCount = uuidsMap.values().stream().filter(Optional::isPresent).collect(toSet()).size(); + updateStats(uuidsMap.size(), deltaCount, shardInfos.size(), nanosSince(start).toMillis()); + return shardInfos; } - private List compact(StoragePageSink storagePageSink, OptionalInt bucketNumber, Set uuids, List columnIds, List columnTypes) + private List compact( + StoragePageSink storagePageSink, + boolean tableSupportsDeltaDelete, + OptionalInt bucketNumber, + Map> uuidsMap, + List columnIds, + List columnTypes) throws IOException { - for (UUID uuid : uuids) { - // todo add logic in organization for delta or it may corrupt data - try (ConnectorPageSource pageSource = storageManager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, Optional.empty(), false, bucketNumber, columnIds, columnTypes, TupleDomain.all(), readerAttributes)) { + for (Map.Entry> entry : uuidsMap.entrySet()) { + UUID uuid = entry.getKey(); + Optional deltaUuid = entry.getValue(); + try (ConnectorPageSource pageSource = storageManager.getPageSource( + FileSystemContext.DEFAULT_RAPTOR_CONTEXT, + uuid, + deltaUuid, + tableSupportsDeltaDelete, + bucketNumber, + columnIds, + columnTypes, + TupleDomain.all(), + readerAttributes)) { while (!pageSource.isFinished()) { Page page = pageSource.getNextPage(); if (isNullOrEmptyPage(page)) { @@ -113,7 +134,14 @@ private List compact(StoragePageSink storagePageSink, OptionalInt buc return getFutureValue(storagePageSink.commit()); } - public List compactSorted(long transactionId, OptionalInt bucketNumber, Set uuids, List columns, List sortColumnIds, List sortOrders) + public List compactSorted( + long transactionId, + boolean tableSupportsDeltaDelete, + OptionalInt bucketNumber, + Map> uuidsMap, + List columns, + List sortColumnIds, + List sortOrders) throws IOException { checkArgument(sortColumnIds.size() == sortOrders.size(), "sortColumnIds and sortOrders must be of the same size"); @@ -132,12 +160,21 @@ public List compactSorted(long transactionId, OptionalInt bucketNumbe Queue rowSources = new PriorityQueue<>(); StoragePageSink outputPageSink = storageManager.createStoragePageSink(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, transactionId, bucketNumber, columnIds, columnTypes, false); try { - // todo add logic in organization for delta or it may corrupt data - for (UUID uuid : uuids) { - ConnectorPageSource pageSource = storageManager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, Optional.empty(), false, bucketNumber, columnIds, columnTypes, TupleDomain.all(), readerAttributes); + uuidsMap.forEach((uuid, deltaUuid) -> { + ConnectorPageSource pageSource = storageManager.getPageSource( + FileSystemContext.DEFAULT_RAPTOR_CONTEXT, + uuid, + deltaUuid, + tableSupportsDeltaDelete, + bucketNumber, + columnIds, + columnTypes, + TupleDomain.all(), + readerAttributes); SortedPageSource rowSource = new SortedPageSource(pageSource, columnTypes, sortIndexes, sortOrders); rowSources.add(rowSource); - } + }); + while (!rowSources.isEmpty()) { SortedPageSource rowSource = rowSources.poll(); if (!rowSource.hasNext()) { @@ -157,7 +194,8 @@ public List compactSorted(long transactionId, OptionalInt bucketNumbe outputPageSink.flush(); List shardInfos = getFutureValue(outputPageSink.commit()); - updateStats(uuids.size(), shardInfos.size(), nanosSince(start).toMillis()); + int deltaCount = uuidsMap.values().stream().filter(Optional::isPresent).collect(toSet()).size(); + updateStats(uuidsMap.size(), deltaCount, shardInfos.size(), nanosSince(start).toMillis()); return shardInfos; } @@ -289,12 +327,14 @@ private static boolean isNullOrEmptyPage(Page nextPage) return nextPage == null || nextPage.getPositionCount() == 0; } - private void updateStats(int inputShardsCount, int outputShardsCount, long latency) + private void updateStats(int inputShardsCount, int inputDeltaShardsCount, int outputShardsCount, long latency) { inputShards.update(inputShardsCount); + inputDeltaShards.update(inputDeltaShardsCount); outputShards.update(outputShardsCount); inputShardsPerCompaction.add(inputShardsCount); + inputDeltaShardsPerCompaction.add(inputDeltaShardsCount); outputShardsPerCompaction.add(outputShardsCount); compactionLatencyMillis.add(latency); @@ -307,6 +347,13 @@ public CounterStat getInputShards() return inputShards; } + @Managed + @Nested + public CounterStat getInputDeltaShards() + { + return inputDeltaShards; + } + @Managed @Nested public CounterStat getOutputShards() @@ -321,6 +368,13 @@ public DistributionStat getInputShardsPerCompaction() return inputShardsPerCompaction; } + @Managed + @Nested + public DistributionStat getInputDeltaShardsPerCompaction() + { + return inputDeltaShardsPerCompaction; + } + @Managed @Nested public DistributionStat getOutputShardsPerCompaction() diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardIndexInfo.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardIndexInfo.java index a875f3009da61..d938b99bbfa14 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardIndexInfo.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardIndexInfo.java @@ -26,6 +26,8 @@ public class ShardIndexInfo private final long tableId; private final OptionalInt bucketNumber; private final UUID shardUuid; + private final boolean isDelta; + private final Optional deltaUuid; private final long rowCount; private final long uncompressedSize; private final Optional sortRange; @@ -35,6 +37,8 @@ public ShardIndexInfo( long tableId, OptionalInt bucketNumber, UUID shardUuid, + boolean isDelta, + Optional deltaUuid, long rowCount, long uncompressedSize, Optional sortRange, @@ -43,6 +47,8 @@ public ShardIndexInfo( this.tableId = tableId; this.bucketNumber = requireNonNull(bucketNumber, "bucketNumber is null"); this.shardUuid = requireNonNull(shardUuid, "shardUuid is null"); + this.isDelta = isDelta; + this.deltaUuid = requireNonNull(deltaUuid, "Optional is null"); this.rowCount = rowCount; this.uncompressedSize = uncompressedSize; this.sortRange = requireNonNull(sortRange, "sortRange is null"); @@ -64,6 +70,16 @@ public UUID getShardUuid() return shardUuid; } + public boolean isDelta() + { + return isDelta; + } + + public Optional getDeltaUuid() + { + return deltaUuid; + } + public long getRowCount() { return rowCount; @@ -97,6 +113,8 @@ public boolean equals(Object o) return tableId == that.tableId && rowCount == that.rowCount && uncompressedSize == that.uncompressedSize && + isDelta == that.isDelta && + Objects.equals(deltaUuid, that.deltaUuid) && Objects.equals(bucketNumber, that.bucketNumber) && Objects.equals(shardUuid, that.shardUuid) && Objects.equals(sortRange, that.sortRange) && @@ -106,7 +124,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(tableId, bucketNumber, shardUuid, rowCount, uncompressedSize, sortRange, temporalRange); + return Objects.hash(tableId, bucketNumber, shardUuid, rowCount, isDelta, deltaUuid, uncompressedSize, sortRange, temporalRange); } @Override @@ -116,6 +134,8 @@ public String toString() .add("tableId", tableId) .add("bucketNumber", bucketNumber.isPresent() ? bucketNumber.getAsInt() : null) .add("shardUuid", shardUuid) + .add("isDelta", isDelta) + .add("deltaUuid", deltaUuid.orElse(null)) .add("rowCount", rowCount) .add("uncompressedSize", uncompressedSize) .add("sortRange", sortRange.orElse(null)) diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizationManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizationManager.java index 025e5f3865c17..a67b2caede55f 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizationManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizationManager.java @@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.airlift.units.Duration; import org.skife.jdbi.v2.IDBI; @@ -38,13 +40,11 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.facebook.airlift.concurrent.MoreFutures.allAsList; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.presto.raptor.storage.organization.ShardOrganizerUtil.createOrganizationSet; import static com.facebook.presto.raptor.storage.organization.ShardOrganizerUtil.getOrganizationEligibleShards; @@ -208,15 +208,16 @@ private void runOrganization(long tableId) long lastStartTime = System.currentTimeMillis(); tablesInProgress.add(tableId); - ImmutableList.Builder> futures = ImmutableList.builder(); + ImmutableList.Builder> futures = ImmutableList.builder(); for (OrganizationSet organizationSet : organizationSets) { futures.add(organizer.enqueue(organizationSet)); } - allAsList(futures.build()) - .whenComplete((value, throwable) -> { + futures.build().forEach(listenableFuture -> listenableFuture.addListener( + () -> { tablesInProgress.remove(tableId); organizerDao.updateLastStartTime(currentNodeIdentifier, tableId, lastStartTime); - }); + }, + MoreExecutors.directExecutor())); } private boolean shouldRunOrganization(TableOrganizationInfo info) @@ -279,7 +280,7 @@ private static Set getOverlappingOrganizationSets(Table tableIn else { Set indexInfos = builder.build(); if (indexInfos.size() > 1) { - organizationSets.add(createOrganizationSet(tableInfo.getTableId(), indexInfos)); + organizationSets.add(createOrganizationSet(tableInfo.getTableId(), tableInfo.isTableSupportsDeltaDelete(), indexInfos, 0)); } builder = ImmutableSet.builder(); previousRange = nextRange; @@ -290,7 +291,7 @@ private static Set getOverlappingOrganizationSets(Table tableIn Set indexInfos = builder.build(); if (indexInfos.size() > 1) { - organizationSets.add(createOrganizationSet(tableInfo.getTableId(), indexInfos)); + organizationSets.add(createOrganizationSet(tableInfo.getTableId(), tableInfo.isTableSupportsDeltaDelete(), indexInfos, 0)); } return organizationSets; } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizer.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizer.java index e85fed5722119..756eb6f9c0679 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizer.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizer.java @@ -17,41 +17,51 @@ import com.facebook.airlift.log.Logger; import com.facebook.airlift.stats.CounterStat; import com.facebook.presto.raptor.storage.StorageManagerConfig; +import com.facebook.presto.raptor.util.PrioritizedFifoExecutor; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; import javax.annotation.PreDestroy; import javax.inject.Inject; -import java.util.Set; +import java.util.Comparator; +import java.util.Map; +import java.util.Optional; import java.util.UUID; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.Sets.newConcurrentHashSet; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.CompletableFuture.runAsync; -import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.stream.Collectors.toSet; public class ShardOrganizer { private static final Logger log = Logger.get(ShardOrganizer.class); private final ExecutorService executorService; + private final PrioritizedFifoExecutor prioritizedFifoExecutor; private final ThreadPoolExecutorMBean executorMBean; private final AtomicBoolean shutdown = new AtomicBoolean(); // Tracks shards that are scheduled for compaction so that we do not schedule them more than once - private final Set shardsInProgress = newConcurrentHashSet(); + private final Map> shardsInProgress = new ConcurrentHashMap<>(); private final JobFactory jobFactory; private final CounterStat successCount = new CounterStat(); private final CounterStat failureCount = new CounterStat(); + private int deltaCountInProgress; + @Inject public ShardOrganizer(JobFactory jobFactory, StorageManagerConfig config) { @@ -62,7 +72,8 @@ public ShardOrganizer(JobFactory jobFactory, int threads) { checkArgument(threads > 0, "threads must be > 0"); this.jobFactory = requireNonNull(jobFactory, "jobFactory is null"); - this.executorService = newFixedThreadPool(threads, daemonThreadsNamed("shard-organizer-%s")); + this.executorService = newCachedThreadPool(daemonThreadsNamed("shard-organizer-%s")); + this.prioritizedFifoExecutor = new PrioritizedFifoExecutor(executorService, threads, new OrganizationJobComparator()); this.executorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) executorService); } @@ -74,25 +85,43 @@ public void shutdown() } } - public CompletableFuture enqueue(OrganizationSet organizationSet) + public ListenableFuture enqueue(OrganizationSet organizationSet) { - shardsInProgress.addAll(organizationSet.getShards()); - return runAsync(jobFactory.create(organizationSet), executorService) - .whenComplete((none, throwable) -> { - shardsInProgress.removeAll(organizationSet.getShards()); - if (throwable == null) { + log.info("enqueue organizationSet: %s", organizationSet); + shardsInProgress.putAll(organizationSet.getShardsMap()); + deltaCountInProgress = shardsInProgress.values().stream().filter(Optional::isPresent).collect(toSet()).size(); + ListenableFuture listenableFuture = prioritizedFifoExecutor.submit(jobFactory.create(organizationSet)); + listenableFuture.addListener( + () -> { + for (UUID uuid : organizationSet.getShardsMap().keySet()) { + shardsInProgress.remove(uuid); + deltaCountInProgress = shardsInProgress.values().stream().filter(Optional::isPresent).collect(toSet()).size(); + } + }, + MoreExecutors.directExecutor()); + Futures.addCallback( + listenableFuture, + new FutureCallback() { + @Override + public void onSuccess(Object o) + { successCount.update(1); } - else { + + @Override + public void onFailure(Throwable throwable) + { log.warn(throwable, "Error running organization job"); failureCount.update(1); } - }); + }, + MoreExecutors.directExecutor()); + return listenableFuture; } public boolean inProgress(UUID shardUuid) { - return shardsInProgress.contains(shardUuid); + return shardsInProgress.containsKey(shardUuid); } @Managed @@ -108,6 +137,12 @@ public int getShardsInProgress() return shardsInProgress.size(); } + @Managed + public int getDeltaCountInProgress() + { + return deltaCountInProgress; + } + @Managed @Nested public CounterStat getSuccessCount() @@ -121,4 +156,15 @@ public CounterStat getFailureCount() { return failureCount; } + + @VisibleForTesting + static class OrganizationJobComparator + implements Comparator + { + @Override + public int compare(OrganizationJob left, OrganizationJob right) + { + return right.getPriority() - left.getPriority(); + } + } } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizerUtil.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizerUtil.java index 8826fee7d2eee..70985b4986336 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizerUtil.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/organization/ShardOrganizerUtil.java @@ -42,6 +42,7 @@ import static com.facebook.presto.raptor.metadata.DatabaseShardManager.shardIndexTable; import static com.facebook.presto.raptor.storage.ColumnIndexStatsUtils.jdbcType; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Iterables.partition; import static com.google.common.collect.Maps.uniqueIndex; @@ -137,6 +138,8 @@ private static ShardIndexInfo toShardIndexInfo(ShardMetadata shardMetadata, Opti shardMetadata.getTableId(), shardMetadata.getBucketNumber(), shardMetadata.getShardUuid(), + shardMetadata.isDelta(), + shardMetadata.getDeltaUuid(), shardMetadata.getRowCount(), shardMetadata.getUncompressedSize(), sortRange, @@ -238,17 +241,16 @@ private static Object getValue(ResultSet resultSet, Type type, String columnName throw new IllegalArgumentException("Unhandled type: " + type); } - static OrganizationSet createOrganizationSet(long tableId, Set shardsToCompact) + static OrganizationSet createOrganizationSet(long tableId, boolean tableSupportsDeltaDelete, Set shardsToCompact, int priority) { - Set uuids = shardsToCompact.stream() - .map(ShardIndexInfo::getShardUuid) - .collect(toSet()); + Map> uuidsMap = shardsToCompact.stream() + .collect(toImmutableMap(ShardIndexInfo::getShardUuid, ShardIndexInfo::getDeltaUuid)); Set bucketNumber = shardsToCompact.stream() .map(ShardIndexInfo::getBucketNumber) .collect(toSet()); checkArgument(bucketNumber.size() == 1); - return new OrganizationSet(tableId, uuids, getOnlyElement(bucketNumber)); + return new OrganizationSet(tableId, tableSupportsDeltaDelete, uuidsMap, getOnlyElement(bucketNumber), priority); } } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java index 6ad466efa0e09..beb12e3bce725 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestDatabaseShardManager.java @@ -222,7 +222,7 @@ public void testAssignShard() assertEquals(actual, new ShardNodes(shard, Optional.empty(), ImmutableSet.of("node1"))); try { - shardManager.replaceShardAssignment(tableId, shard, "node2", true); + shardManager.replaceShardAssignment(tableId, shard, Optional.empty(), "node2", true); fail("expected exception"); } catch (PrestoException e) { @@ -230,13 +230,13 @@ public void testAssignShard() } // replace shard assignment to another node - shardManager.replaceShardAssignment(tableId, shard, "node2", false); + shardManager.replaceShardAssignment(tableId, shard, Optional.empty(), "node2", false); actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all())); assertEquals(actual, new ShardNodes(shard, Optional.empty(), ImmutableSet.of("node2"))); // replacing shard assignment should be idempotent - shardManager.replaceShardAssignment(tableId, shard, "node2", false); + shardManager.replaceShardAssignment(tableId, shard, Optional.empty(), "node2", false); actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all())); assertEquals(actual, new ShardNodes(shard, Optional.empty(), ImmutableSet.of("node2"))); @@ -266,7 +266,7 @@ public void testGetNodeBytes() assertEquals(shardManager.getNodeBytes(), ImmutableMap.of("node1", 88L)); - shardManager.replaceShardAssignment(tableId, shard1, "node2", false); + shardManager.replaceShardAssignment(tableId, shard1, Optional.empty(), "node2", false); assertEquals(getShardNodes(tableId, TupleDomain.all()), ImmutableSet.of( new ShardNodes(shard1, Optional.empty(), ImmutableSet.of("node2")), @@ -296,7 +296,7 @@ public void testGetNodeTableShards() shardManager.commitShards(transactionId, tableId, columns, inputShards.build(), Optional.empty(), 0); for (String node : nodes) { - Set shardMetadata = shardManager.getNodeShards(node); + Set shardMetadata = shardManager.getNodeShardsAndDeltas(node); Set expectedUuids = ImmutableSet.copyOf(nodeShardMap.get(node)); Set actualUuids = shardMetadata.stream().map(ShardMetadata::getShardUuid).collect(toSet()); assertEquals(actualUuids, expectedUuids); @@ -352,7 +352,7 @@ public void testReplaceShardUuidsFunction() ShardInfo newShardInfo4 = new ShardInfo(newUuid5, OptionalInt.empty(), ImmutableSet.of("node1"), ImmutableList.of(), 5, 5, 5, 5); // toReplace - Set shardMetadata = shardManager.getNodeShards("node1"); + Set shardMetadata = shardManager.getNodeShardsAndDeltas("node1"); Set replacedUuids = shardMetadata.stream().map(ShardMetadata::getShardUuid).collect(toSet()); Map> replaceUuidMap = replacedUuids.stream().collect(Collectors.toMap(uuid -> uuid, uuid -> Optional.empty())); @@ -360,7 +360,7 @@ public void testReplaceShardUuidsFunction() shardManager.replaceShardUuids(transactionId, tableId, columns, replaceUuidMap, ImmutableList.of(newShardInfo4), OptionalLong.of(0), true); // check shards on this node1 are correct - shardMetadata = shardManager.getNodeShards("node1"); + shardMetadata = shardManager.getNodeShardsAndDeltas("node1"); assertEquals(shardMetadata.size(), 1); for (ShardMetadata actual : shardMetadata) { assertEquals(actual.getShardUuid(), newUuid5); @@ -422,7 +422,7 @@ public void testReplaceShardUuids() .build(); // toReplace - Set shardMetadata = shardManager.getNodeShards(nodes.get(0)); + Set shardMetadata = shardManager.getNodeShardsAndDeltas(nodes.get(0)); Set replacedUuids = shardMetadata.stream().map(ShardMetadata::getShardUuid).collect(toSet()); Map> replaceUuidMap = replacedUuids.stream().collect(Collectors.toMap(uuid -> uuid, uuid -> Optional.empty())); @@ -430,7 +430,7 @@ public void testReplaceShardUuids() shardManager.replaceShardUuids(transactionId, tableId, columns, replaceUuidMap, newShards, OptionalLong.of(0), true); // check that shards are replaced in shards table for node1 - shardMetadata = shardManager.getNodeShards(nodes.get(0)); + shardMetadata = shardManager.getNodeShardsAndDeltas(nodes.get(0)); Set actualUuids = shardMetadata.stream().map(ShardMetadata::getShardUuid).collect(toSet()); assertEquals(actualUuids, ImmutableSet.copyOf(expectedUuids)); @@ -518,7 +518,7 @@ public void testReplaceDeltaUuidsFunction() shardManager.replaceDeltaUuids(transactionId, tableId, columns, shardMap, OptionalLong.of(0)); // check shards on this node1 are correct - Set shardMetadata = shardManager.getNodeShards("node1"); + Set shardMetadata = shardManager.getNodeShardsAndDeltas("node1"); assertEquals(shardMetadata.size(), 3); // check index table as well @@ -576,13 +576,13 @@ public void testReplaceDeltaUuids() shardManager.replaceDeltaUuids(transactionId, tableId, columns, shardMap, OptionalLong.of(0)); // check that delta shard are added in shards table for node1 - Set shardMetadata = shardManager.getNodeShards(nodes.get(0)); + Set shardMetadata = shardManager.getNodeShardsAndDeltas(nodes.get(0)); Map> actualUuidsMap = shardMetadata.stream().collect(toImmutableMap(ShardMetadata::getShardUuid, ShardMetadata::getDeltaUuid)); Map> expectedUuidsMap = ImmutableMap.of(originalUuids.get(0), Optional.of(newDeltaUuid1), newDeltaUuid1, Optional.empty()); assertEquals(actualUuidsMap, expectedUuidsMap); // check that shard are deleted in shards table for node2 - shardMetadata = shardManager.getNodeShards(nodes.get(1)); + shardMetadata = shardManager.getNodeShardsAndDeltas(nodes.get(1)); actualUuidsMap = shardMetadata.stream().collect(toImmutableMap(ShardMetadata::getShardUuid, ShardMetadata::getDeltaUuid)); expectedUuidsMap = ImmutableMap.of(); assertEquals(actualUuidsMap, expectedUuidsMap); @@ -628,7 +628,7 @@ public void testReplaceDeltaUuids() shardManager.replaceDeltaUuids(transactionId, tableId, columns, shardMap, OptionalLong.of(0)); // check that delta shard are added in shards table for node1 - shardMetadata = shardManager.getNodeShards(nodes.get(0)); + shardMetadata = shardManager.getNodeShardsAndDeltas(nodes.get(0)); actualUuidsMap = shardMetadata.stream().collect(toImmutableMap(ShardMetadata::getShardUuid, ShardMetadata::getDeltaUuid)); expectedUuidsMap = ImmutableMap.of(originalUuids.get(0), Optional.of(anotherNewDeltaUuid1), anotherNewDeltaUuid1, Optional.empty()); assertEquals(actualUuidsMap, expectedUuidsMap); @@ -648,7 +648,7 @@ public void testReplaceDeltaUuids() shardManager.replaceShardUuids(transactionId, tableId, columns, replaceUuidMap, ImmutableSet.of(shardInfo(uuid4, nodes.get(0))), OptionalLong.of(0), true); // check that new shard are added, old shard and delta are deleted in shards table for node1 - shardMetadata = shardManager.getNodeShards(nodes.get(0)); + shardMetadata = shardManager.getNodeShardsAndDeltas(nodes.get(0)); actualUuidsMap = shardMetadata.stream().collect(toImmutableMap(ShardMetadata::getShardUuid, ShardMetadata::getDeltaUuid)); expectedUuidsMap = ImmutableMap.of(uuid4, Optional.empty()); assertEquals(actualUuidsMap, expectedUuidsMap); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardDao.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardDao.java index e7cc247d95ef3..ac162e032e522 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardDao.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestShardDao.java @@ -188,8 +188,8 @@ public void testNodeShards() assertEquals(dao.getShards(plainTableId), ImmutableSet.of(shardUuid1, shardUuid2)); assertEquals(dao.getShards(bucketedTableId), ImmutableSet.of(shardUuid3, shardUuid4, shardUuid5)); - assertEquals(dao.getNodeShards(nodeName1, null), ImmutableSet.of(shard3)); - assertEquals(dao.getNodeShards(nodeName2, null), ImmutableSet.of(shard4, shard5)); + assertEquals(dao.getNodeShardsAndDeltas(nodeName1, null), ImmutableSet.of(shard3)); + assertEquals(dao.getNodeShardsAndDeltas(nodeName2, null), ImmutableSet.of(shard4, shard5)); assertEquals(dao.getNodeSizes(), ImmutableSet.of( new NodeSize(nodeName1, 33), new NodeSize(nodeName2, 44 + 55))); @@ -198,16 +198,16 @@ public void testNodeShards() dao.insertShardNode(shardId2, nodeId1); dao.insertShardNode(shardId1, nodeId2); - assertEquals(dao.getNodeShards(nodeName1, null), ImmutableSet.of(shard1, shard2, shard3)); - assertEquals(dao.getNodeShards(nodeName2, null), ImmutableSet.of(shard1, shard4, shard5)); + assertEquals(dao.getNodeShardsAndDeltas(nodeName1, null), ImmutableSet.of(shard1, shard2, shard3)); + assertEquals(dao.getNodeShardsAndDeltas(nodeName2, null), ImmutableSet.of(shard1, shard4, shard5)); assertEquals(dao.getNodeSizes(), ImmutableSet.of( new NodeSize(nodeName1, 11 + 22 + 33), new NodeSize(nodeName2, 11 + 44 + 55))); dao.dropShardNodes(plainTableId); - assertEquals(dao.getNodeShards(nodeName1, null), ImmutableSet.of(shard3)); - assertEquals(dao.getNodeShards(nodeName2, null), ImmutableSet.of(shard4, shard5)); + assertEquals(dao.getNodeShardsAndDeltas(nodeName1, null), ImmutableSet.of(shard3)); + assertEquals(dao.getNodeShardsAndDeltas(nodeName2, null), ImmutableSet.of(shard4, shard5)); assertEquals(dao.getNodeSizes(), ImmutableSet.of( new NodeSize(nodeName1, 33), new NodeSize(nodeName2, 44 + 55))); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java index 1a8f724284428..899469ef3d4a9 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestShardEjector.java @@ -152,7 +152,7 @@ public void testEjector() .map(ShardInfo::getShardUuid) .collect(toSet()); - Set remaining = uuids(shardManager.getNodeShards("node1")); + Set remaining = uuids(shardManager.getNodeShardsAndDeltas("node1")); for (UUID uuid : ejectedShards) { assertFalse(remaining.contains(uuid)); @@ -165,10 +165,10 @@ public void testEjector() } Set others = ImmutableSet.builder() - .addAll(uuids(shardManager.getNodeShards("node2"))) - .addAll(uuids(shardManager.getNodeShards("node3"))) - .addAll(uuids(shardManager.getNodeShards("node4"))) - .addAll(uuids(shardManager.getNodeShards("node5"))) + .addAll(uuids(shardManager.getNodeShardsAndDeltas("node2"))) + .addAll(uuids(shardManager.getNodeShardsAndDeltas("node3"))) + .addAll(uuids(shardManager.getNodeShardsAndDeltas("node4"))) + .addAll(uuids(shardManager.getNodeShardsAndDeltas("node5"))) .build(); assertTrue(others.containsAll(ejectedShards)); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestCompactionSetCreator.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestCompactionSetCreator.java index dd7de5dbaecb3..66a6312f19b4c 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestCompactionSetCreator.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestCompactionSetCreator.java @@ -16,13 +16,15 @@ import com.facebook.presto.raptor.metadata.Table; import com.facebook.presto.spi.type.Type; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; import org.testng.annotations.Test; import java.time.Duration; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -57,7 +59,20 @@ public void testNonTemporalOrganizationSetSimple() Set compactionSets = compactionSetCreator.createCompactionSets(tableInfo, inputShards); assertEquals(compactionSets.size(), 1); - assertEquals(getOnlyElement(compactionSets).getShards(), extractIndexes(inputShards, 0, 1, 2)); + assertEquals(getOnlyElement(compactionSets).getShardsMap(), extractIndexes(inputShards, 0, 1, 2)); + } + + @Test + public void testNonTemporalOrganizationSetDelta() + { + List inputShards = ImmutableList.of( + sharAndDeltaWithSize(10, 10), + sharAndDeltaWithSize(10, 10), + sharAndDeltaWithSize(10, 10)); + + Set compactionSets = compactionSetCreator.createCompactionSets(tableInfo, inputShards); + assertEquals(compactionSets.size(), 1); + assertEquals(getOnlyElement(compactionSets).getShardsMap(), extractIndexes(inputShards, 0, 1, 2)); } @Test @@ -71,11 +86,11 @@ public void testNonTemporalSizeBasedOrganizationSet() Set compactionSets = compactionSetCreator.createCompactionSets(tableInfo, inputShards); - Set actual = new HashSet<>(); + Map> actual = new HashMap<>(); for (OrganizationSet set : compactionSets) { - actual.addAll(set.getShards()); + actual.putAll(set.getShardsMap()); } - assertTrue(extractIndexes(inputShards, 0, 1, 2).containsAll(actual)); + assertTrue(extractIndexes(inputShards, 0, 1, 2).keySet().containsAll(actual.keySet())); } @Test @@ -89,12 +104,12 @@ public void testNonTemporalRowCountBasedOrganizationSet() Set compactionSets = compactionSetCreator.createCompactionSets(tableInfo, inputShards); - Set actual = new HashSet<>(); + Map> actual = new HashMap<>(); for (OrganizationSet set : compactionSets) { - actual.addAll(set.getShards()); + actual.putAll(set.getShardsMap()); } - assertTrue(extractIndexes(inputShards, 0, 2, 3).containsAll(actual)); + assertTrue(extractIndexes(inputShards, 0, 2, 3).keySet().containsAll(actual.keySet())); } @Test @@ -115,8 +130,8 @@ public void testTemporalCompactionNoCompactionAcrossDays() assertEquals(actual.size(), 2); Set expected = ImmutableSet.of( - new OrganizationSet(temporalTableInfo.getTableId(), extractIndexes(inputShards, 0, 3), OptionalInt.empty()), - new OrganizationSet(temporalTableInfo.getTableId(), extractIndexes(inputShards, 1, 2), OptionalInt.empty())); + new OrganizationSet(temporalTableInfo.getTableId(), false, extractIndexes(inputShards, 0, 3), OptionalInt.empty(), 0), + new OrganizationSet(temporalTableInfo.getTableId(), false, extractIndexes(inputShards, 1, 2), OptionalInt.empty(), 0)); assertEquals(actual, expected); } @@ -143,8 +158,8 @@ public void testTemporalCompactionSpanningDays() assertEquals(compactionSets.size(), 2); Set expected = ImmutableSet.of( - new OrganizationSet(tableId, extractIndexes(inputShards, 0, 1, 5, 6), OptionalInt.empty()), - new OrganizationSet(tableId, extractIndexes(inputShards, 2, 3, 4), OptionalInt.empty())); + new OrganizationSet(tableId, false, extractIndexes(inputShards, 0, 1, 5, 6), OptionalInt.empty(), 0), + new OrganizationSet(tableId, false, extractIndexes(inputShards, 2, 3, 4), OptionalInt.empty(), 0)); assertEquals(compactionSets, expected); } @@ -169,8 +184,8 @@ public void testTemporalCompactionDate() assertEquals(actual.size(), 2); Set expected = ImmutableSet.of( - new OrganizationSet(tableId, extractIndexes(inputShards, 0, 3, 5), OptionalInt.empty()), - new OrganizationSet(tableId, extractIndexes(inputShards, 1, 4), OptionalInt.empty())); + new OrganizationSet(tableId, false, extractIndexes(inputShards, 0, 3, 5), OptionalInt.empty(), 0), + new OrganizationSet(tableId, false, extractIndexes(inputShards, 1, 4), OptionalInt.empty(), 0)); assertEquals(actual, expected); } @@ -191,16 +206,16 @@ public void testBucketedTableCompaction() assertEquals(actual.size(), 2); Set expected = ImmutableSet.of( - new OrganizationSet(tableId, extractIndexes(inputShards, 0, 3, 5), OptionalInt.of(1)), - new OrganizationSet(tableId, extractIndexes(inputShards, 1, 2, 4), OptionalInt.of(2))); + new OrganizationSet(tableId, false, extractIndexes(inputShards, 0, 3, 5), OptionalInt.of(1), 0), + new OrganizationSet(tableId, false, extractIndexes(inputShards, 1, 2, 4), OptionalInt.of(2), 0)); assertEquals(actual, expected); } - static Set extractIndexes(List inputShards, int... indexes) + static Map> extractIndexes(List inputShards, int... indexes) { - ImmutableSet.Builder builder = ImmutableSet.builder(); + ImmutableMap.Builder> builder = ImmutableMap.builder(); for (int index : indexes) { - builder.add(inputShards.get(index).getShardUuid()); + builder.put(inputShards.get(index).getShardUuid(), inputShards.get(index).getDeltaUuid()); } return builder.build(); } @@ -227,8 +242,8 @@ public void testBucketedTemporalTableCompaction() assertEquals(actual.size(), 2); Set expected = ImmutableSet.of( - new OrganizationSet(tableId, extractIndexes(inputShards, 0, 2), OptionalInt.of(1)), - new OrganizationSet(tableId, extractIndexes(inputShards, 1, 3), OptionalInt.of(2))); + new OrganizationSet(tableId, false, extractIndexes(inputShards, 0, 2), OptionalInt.of(1), 0), + new OrganizationSet(tableId, false, extractIndexes(inputShards, 1, 3), OptionalInt.of(2), 0)); assertEquals(actual, expected); } @@ -238,6 +253,22 @@ private static ShardIndexInfo shardWithSize(long rows, long size) 1, OptionalInt.empty(), UUID.randomUUID(), + false, + Optional.empty(), + rows, + size, + Optional.empty(), + Optional.empty()); + } + + private static ShardIndexInfo sharAndDeltaWithSize(long rows, long size) + { + return new ShardIndexInfo( + 1, + OptionalInt.empty(), + UUID.randomUUID(), + false, + Optional.of(UUID.randomUUID()), rows, size, Optional.empty(), @@ -255,6 +286,8 @@ private static ShardIndexInfo shardWithBucket(int bucketNumber) 1, OptionalInt.of(bucketNumber), UUID.randomUUID(), + false, + Optional.empty(), 1, 1, Optional.empty(), @@ -268,6 +301,8 @@ private static ShardIndexInfo shardWithTemporalBucket(OptionalInt bucketNumber, 1, bucketNumber, UUID.randomUUID(), + false, + Optional.empty(), 1, 1, Optional.empty(), @@ -277,6 +312,8 @@ private static ShardIndexInfo shardWithTemporalBucket(OptionalInt bucketNumber, 1, bucketNumber, UUID.randomUUID(), + false, + Optional.empty(), 1, 1, Optional.empty(), diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java index 4f2c2a745ca84..2ffbdc1b49006 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardCompactor.java @@ -32,6 +32,7 @@ import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.MaterializedRow; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; @@ -43,13 +44,16 @@ import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; import java.util.UUID; import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; +import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder; import static com.facebook.presto.raptor.storage.TestOrcStorageManager.createOrcStorageManager; import static com.facebook.presto.spi.block.SortOrder.ASC_NULLS_FIRST; import static com.facebook.presto.spi.type.BigintType.BIGINT; @@ -120,15 +124,58 @@ public void testShardCompactor(boolean useOptimizedOrcWriter) .sum(); long expectedOutputShards = computeExpectedOutputShards(totalRows); - Set inputUuids = inputShards.stream().map(ShardInfo::getShardUuid).collect(toSet()); + Map> inputUuids = new HashMap<>(); + for (ShardInfo shardInfo : inputShards) { + inputUuids.put(shardInfo.getShardUuid(), Optional.empty()); + } long transactionId = 1; ShardCompactor compactor = new ShardCompactor(storageManager, READER_ATTRIBUTES); - List outputShards = compactor.compact(transactionId, OptionalInt.empty(), inputUuids, getColumnInfo(columnIds, columnTypes)); + List outputShards = compactor.compact(transactionId, false, OptionalInt.empty(), inputUuids, getColumnInfo(columnIds, columnTypes)); assertEquals(outputShards.size(), expectedOutputShards); Set outputUuids = outputShards.stream().map(ShardInfo::getShardUuid).collect(toSet()); - assertShardEqualsIgnoreOrder(storageManager, inputUuids, outputUuids, columnIds, columnTypes); + assertShardEqualsIgnoreOrder(storageManager, inputUuids.keySet(), outputUuids, columnIds, columnTypes); + } + + @Test + public void testShardCompactorWithDelta() + throws Exception + { + StorageManager storageManager = createOrcStorageManager(dbi, temporary, MAX_SHARD_ROWS); + List columnIds = ImmutableList.of(3L, 7L, 2L, 1L, 5L); + List columnTypes = ImmutableList.of(BIGINT, createVarcharType(20), DOUBLE, DATE, TIMESTAMP); + + List inputShards = createShards(storageManager, columnIds, columnTypes, 3); + assertEquals(inputShards.size(), 3); + + List deltaColumnIds = ImmutableList.of(1L); + List deltaColumnTypes = ImmutableList.of(BIGINT); + StoragePageSink deltaSink = createStoragePageSink(storageManager, deltaColumnIds, deltaColumnTypes); + List deltaPages = rowPagesBuilder(deltaColumnTypes) + .row(1L) + .row(2L) + .build(); + deltaSink.appendPages(deltaPages); + List deltaShards = getFutureValue(deltaSink.commit()); + + long totalRows = inputShards.stream() + .mapToLong(ShardInfo::getRowCount) + .sum(); + long expectedOutputShardsCount = computeExpectedOutputShards(totalRows - 2); + + Map> inputUuidsMap = new HashMap<>(); + inputUuidsMap.put(inputShards.get(0).getShardUuid(), Optional.of(deltaShards.get(0).getShardUuid())); + inputUuidsMap.put(inputShards.get(1).getShardUuid(), Optional.empty()); + inputUuidsMap.put(inputShards.get(2).getShardUuid(), Optional.empty()); + + long transactionId = 1; + ShardCompactor compactor = new ShardCompactor(storageManager, READER_ATTRIBUTES); + List outputShards = compactor.compact(transactionId, true, OptionalInt.empty(), inputUuidsMap, getColumnInfo(columnIds, columnTypes)); + assertEquals(outputShards.size(), expectedOutputShardsCount); + + Set outputUuids = outputShards.stream().map(ShardInfo::getShardUuid).collect(toSet()); + assertShardEqualsIgnoreOrder(storageManager, inputUuidsMap, outputUuids, columnIds, columnTypes); } @Test(dataProvider = "useOptimizedOrcWriter") @@ -150,17 +197,20 @@ public void testShardCompactorSorted(boolean useOptimizedOrcWriter) long totalRows = inputShards.stream().mapToLong(ShardInfo::getRowCount).sum(); long expectedOutputShards = computeExpectedOutputShards(totalRows); - Set inputUuids = inputShards.stream().map(ShardInfo::getShardUuid).collect(toSet()); + Map> inputUuids = new HashMap<>(); + for (ShardInfo shardInfo : inputShards) { + inputUuids.put(shardInfo.getShardUuid(), Optional.empty()); + } long transactionId = 1; ShardCompactor compactor = new ShardCompactor(storageManager, READER_ATTRIBUTES); - List outputShards = compactor.compactSorted(transactionId, OptionalInt.empty(), inputUuids, getColumnInfo(columnIds, columnTypes), sortColumnIds, sortOrders); + List outputShards = compactor.compactSorted(transactionId, false, OptionalInt.empty(), inputUuids, getColumnInfo(columnIds, columnTypes), sortColumnIds, sortOrders); List outputUuids = outputShards.stream() .map(ShardInfo::getShardUuid) .collect(toList()); assertEquals(outputShards.size(), expectedOutputShards); - assertShardEqualsSorted(storageManager, inputUuids, outputUuids, columnIds, columnTypes, sortIndexes, sortOrders); + assertShardEqualsSorted(storageManager, inputUuids.keySet(), outputUuids, columnIds, columnTypes, sortIndexes, sortOrders); } private static long computeExpectedOutputShards(long totalRows) @@ -177,6 +227,15 @@ private void assertShardEqualsIgnoreOrder(StorageManager storageManager, Set> inputUuidsMap, Set outputUuids, List columnIds, List columnTypes) + throws IOException + { + MaterializedResult inputRows = getMaterializedRows(storageManager, ImmutableMap.copyOf(inputUuidsMap), columnIds, columnTypes); + MaterializedResult outputRows = getMaterializedRows(storageManager, ImmutableList.copyOf(outputUuids), columnIds, columnTypes); + + assertEqualsIgnoreOrder(outputRows, inputRows); + } + private void assertShardEqualsSorted(StorageManager storageManager, Set inputUuids, List outputUuids, List columnIds, List columnTypes, List sortIndexes, List sortOrders) throws IOException { @@ -237,7 +296,7 @@ private List getPages(StorageManager storageManager, Set uuids, List { ImmutableList.Builder pages = ImmutableList.builder(); for (UUID uuid : uuids) { - try (ConnectorPageSource pageSource = getPageSource(storageManager, columnIds, columnTypes, uuid)) { + try (ConnectorPageSource pageSource = getPageSource(storageManager, columnIds, columnTypes, uuid, Optional.empty(), false)) { while (!pageSource.isFinished()) { Page outputPage = pageSource.getNextPage(); if (outputPage == null) { @@ -255,7 +314,22 @@ private MaterializedResult getMaterializedRows(StorageManager storageManager, Li { MaterializedResult.Builder rows = MaterializedResult.resultBuilder(SESSION, columnTypes); for (UUID uuid : uuids) { - try (ConnectorPageSource pageSource = getPageSource(storageManager, columnIds, columnTypes, uuid)) { + try (ConnectorPageSource pageSource = getPageSource(storageManager, columnIds, columnTypes, uuid, Optional.empty(), false)) { + MaterializedResult result = materializeSourceDataStream(SESSION, pageSource, columnTypes); + rows.rows(result.getMaterializedRows()); + } + } + return rows.build(); + } + + private MaterializedResult getMaterializedRows(StorageManager storageManager, Map> uuidsMap, List columnIds, List columnTypes) + throws IOException + { + MaterializedResult.Builder rows = MaterializedResult.resultBuilder(SESSION, columnTypes); + for (Map.Entry> entry : uuidsMap.entrySet()) { + UUID uuid = entry.getKey(); + Optional deltaUuid = entry.getValue(); + try (ConnectorPageSource pageSource = getPageSource(storageManager, columnIds, columnTypes, uuid, deltaUuid, true)) { MaterializedResult result = materializeSourceDataStream(SESSION, pageSource, columnTypes); rows.rows(result.getMaterializedRows()); } @@ -263,9 +337,9 @@ private MaterializedResult getMaterializedRows(StorageManager storageManager, Li return rows.build(); } - private ConnectorPageSource getPageSource(StorageManager storageManager, List columnIds, List columnTypes, UUID uuid) + private ConnectorPageSource getPageSource(StorageManager storageManager, List columnIds, List columnTypes, UUID uuid, Optional deltaShardUuid, boolean tableSupportsDeltaDelete) { - return storageManager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, Optional.empty(), false, OptionalInt.empty(), columnIds, columnTypes, TupleDomain.all(), READER_ATTRIBUTES); + return storageManager.getPageSource(FileSystemContext.DEFAULT_RAPTOR_CONTEXT, uuid, deltaShardUuid, tableSupportsDeltaDelete, OptionalInt.empty(), columnIds, columnTypes, TupleDomain.all(), READER_ATTRIBUTES); } private static List createSortedShards(StorageManager storageManager, List columnIds, List columnTypes, List sortChannels, List sortOrders, int shardCount) diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizationManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizationManager.java index 9f746196be6c7..202c7eb341593 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizationManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizationManager.java @@ -140,7 +140,7 @@ public void testSimple() assertEquals(actual.size(), 1); // Shards 0, 1 and 2 are overlapping, so we should get an organization set with these shards - assertEquals(getOnlyElement(actual).getShards(), extractIndexes(shards, 0, 1, 2)); + assertEquals(getOnlyElement(actual).getShardsMap(), extractIndexes(shards, 0, 1, 2)); } @Test @@ -167,7 +167,7 @@ public void testSimpleTemporal() // expect 2 organization sets, of overlapping shards (0, 2) and (1, 3) assertEquals(organizationSets.size(), 2); - assertEquals(actual, ImmutableSet.of(extractIndexes(shards, 0, 2), extractIndexes(shards, 1, 3))); + assertEquals(actual, ImmutableSet.of(extractIndexes(shards, 0, 2).keySet(), extractIndexes(shards, 1, 3).keySet())); } private static ShardIndexInfo shardWithSortRange(int bucketNumber, ShardRange sortRange) @@ -176,6 +176,8 @@ private static ShardIndexInfo shardWithSortRange(int bucketNumber, ShardRange so 1, OptionalInt.of(bucketNumber), UUID.randomUUID(), + false, + Optional.empty(), 1, 1, Optional.of(sortRange), @@ -188,6 +190,8 @@ private static ShardIndexInfo shardWithTemporalRange(int bucketNumber, ShardRang 1, OptionalInt.of(bucketNumber), UUID.randomUUID(), + false, + Optional.empty(), 1, 1, Optional.of(sortRange), diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizer.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizer.java index 43126d6f3ee4f..bae7bc4b3e01c 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizer.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizer.java @@ -13,11 +13,12 @@ */ package com.facebook.presto.raptor.storage.organization; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; import org.testng.annotations.Test; +import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; -import java.util.Set; import java.util.UUID; import static com.google.common.collect.Iterables.getOnlyElement; @@ -35,18 +36,18 @@ public void testShardOrganizerInProgress() { ShardOrganizer organizer = createShardOrganizer(); - Set shards = ImmutableSet.of(UUID.randomUUID()); - OrganizationSet organizationSet = new OrganizationSet(1L, shards, OptionalInt.empty()); + Map> shards = ImmutableMap.of(UUID.randomUUID(), Optional.empty()); + OrganizationSet organizationSet = new OrganizationSet(1L, false, shards, OptionalInt.empty(), 0); organizer.enqueue(organizationSet); - assertTrue(organizer.inProgress(getOnlyElement(shards))); + assertTrue(organizer.inProgress(getOnlyElement(shards.keySet()))); assertEquals(organizer.getShardsInProgress(), 1); - while (organizer.inProgress(getOnlyElement(shards))) { + while (organizer.inProgress(getOnlyElement(shards.keySet()))) { MILLISECONDS.sleep(10); } - assertFalse(organizer.inProgress(getOnlyElement(shards))); + assertFalse(organizer.inProgress(getOnlyElement(shards.keySet()))); assertEquals(organizer.getShardsInProgress(), 0); organizer.shutdown(); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizerUtil.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizerUtil.java index 5f291c0e375fd..daa46852cd3ee 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizerUtil.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/organization/TestShardOrganizerUtil.java @@ -161,7 +161,7 @@ public void testGetOrganizationEligibleShards() long transactionId = shardManager.beginTransaction(); shardManager.commitShards(transactionId, tableInfo.getTableId(), COLUMNS, shards, Optional.empty(), 0); - Set shardMetadatas = shardManager.getNodeShards("node1"); + Set shardMetadatas = shardManager.getNodeShardsAndDeltas("node1"); Long temporalColumnId = metadataDao.getTemporalColumnId(tableInfo.getTableId()); TableColumn temporalColumn = metadataDao.getTableColumn(tableInfo.getTableId(), temporalColumnId); @@ -227,6 +227,8 @@ private static List getShardIndexInfo(Table tableInfo, List