Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dvc][server] verifying discovered host connectivity before initiating blob transfer request #1534

Merged
merged 9 commits into from
Feb 19, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ public DaVinciBackend(
aggVersionedBlobTransferStats,
backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()
? BlobTransferTableFormat.PLAIN_TABLE
: BlobTransferTableFormat.BLOCK_BASED_TABLE);
: BlobTransferTableFormat.BLOCK_BASED_TABLE,
backendConfig.getBlobTransferPeersConnectivityFreshnessInSeconds());
} else {
aggVersionedBlobTransferStats = null;
blobTransferManager = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,16 @@ public BlobTransferPartitionMetadata getTransferredSnapshotMetadata(String topic
* @return the metadata for the blob transfer request
*/
public BlobTransferPartitionMetadata prepareMetadata(BlobTransferPayload blobTransferRequest) {
if (storageMetadataService == null || storeVersionStateSerializer == null) {
throw new VeniceException("StorageMetadataService or storeVersionStateSerializer is not initialized");
}

if (storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName()) == null
|| storageMetadataService
.getLastOffset(blobTransferRequest.getTopicName(), blobTransferRequest.getPartition()) == null) {
throw new VeniceException("Cannot get store version state or offset record from storage metadata service.");
}

// prepare metadata
StoreVersionState storeVersionState =
storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat,
int peersConnectivityFreshnessInSeconds) {
try {
BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
readOnlyStoreRepository,
Expand All @@ -51,7 +52,11 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
transferSnapshotTableFormat);
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new NettyFileTransferClient(
p2pTransferClientPort,
baseDir,
storageMetadataService,
peersConnectivityFreshnessInSeconds),
new DaVinciBlobFinder(clientConfig),
baseDir,
aggVersionedBlobTransferStats);
Expand Down Expand Up @@ -84,7 +89,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
int snapshotRetentionTimeInMin,
int blobTransferMaxTimeoutInMin,
AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat,
int peersConnectivityFreshnessInSeconds) {
try {
BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
readOnlyStoreRepository,
Expand All @@ -95,7 +101,11 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
transferSnapshotTableFormat);
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new NettyFileTransferClient(
p2pTransferClientPort,
baseDir,
storageMetadataService,
peersConnectivityFreshnessInSeconds),
new ServerBlobFinder(customizedViewFuture),
baseDir,
aggVersionedBlobTransferStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -92,11 +95,10 @@ public CompletionStage<InputStream> get(
}

List<String> discoverPeers = response.getDiscoveryResult();
LOGGER
.info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);
Set<String> connectablePeers = getConnectableHosts(discoverPeers, storeName, version, partition);

// 2: Process peers sequentially to fetch the blob
processPeersSequentially(discoverPeers, storeName, version, partition, tableFormat, resultFuture);
processPeersSequentially(connectablePeers, storeName, version, partition, tableFormat, resultFuture);

return resultFuture;
}
Expand All @@ -121,15 +123,15 @@ public CompletionStage<InputStream> get(
* - Success case:
* 1. If the blob is successfully fetched from a peer, an InputStream of the blob is returned.
*
* @param peers the list of peers to process
* @param uniqueConnectablePeers the set of peers to process
* @param storeName the name of the store
* @param version the version of the store
* @param partition the partition of the store
* @param tableFormat the needed table format
* @param resultFuture the future to complete with the InputStream of the blob
*/
private void processPeersSequentially(
List<String> peers,
Set<String> uniqueConnectablePeers,
String storeName,
int version,
int partition,
Expand All @@ -142,19 +144,23 @@ private void processPeersSequentially(
CompletableFuture<Void> chainOfPeersFuture = CompletableFuture.completedFuture(null);

// Iterate through each peer and chain the futures
for (int currentPeerIndex = 0; currentPeerIndex < peers.size(); currentPeerIndex++) {
final int peerIndex = currentPeerIndex;
for (String chosenHost: uniqueConnectablePeers) {
// Chain the next operation to the previous future
chainOfPeersFuture = chainOfPeersFuture.thenCompose(v -> {
String chosenHost = peers.get(peerIndex).split("_")[0];

if (resultFuture.isDone()) {
// If the result future is already completed, skip the current peer
return CompletableFuture.completedFuture(null);
}

// Attempt to fetch the blob from the current peer asynchronously
LOGGER.info("Attempting to connect to host: {}", chosenHost);
LOGGER.info(
"Attempting to connect to host: {} for store {} version {} partition {} table format {}",
chosenHost,
storeName,
version,
partition,
tableFormat);

return nettyClient.get(chosenHost, storeName, version, partition, tableFormat)
.toCompletableFuture()
Expand Down Expand Up @@ -244,4 +250,38 @@ private void updateBlobTransferFileReceiveStats(double transferTime, String stor
e);
}
}

/**
* Get the connectable hosts for the given storeName, version, and partition
* @param discoverPeers the list of discovered peers
* @param storeName the name of the store
* @param version the version of the store
* @param partition the partition of the store
* @return the set of unique connectable hosts
*/
private Set<String> getConnectableHosts(List<String> discoverPeers, String storeName, int version, int partition) {
// Extract unique hosts from the discovered peers
Set<String> uniquePeers = discoverPeers.stream().map(peer -> peer.split("_")[0]).collect(Collectors.toSet());

LOGGER.info(
"Discovered {} unique peers store {} version {} partition {}, peers are {}",
uniquePeers.size(),
storeName,
version,
partition,
uniquePeers);

// Get the connectable hosts for this store, version, and partition
Set<String> connectablePeers =
nettyClient.getConnectableHosts((HashSet<String>) uniquePeers, storeName, version, partition);

LOGGER.info(
"Total {} unique connectable peers for store {} version {} partition {}, peers are {}",
connectablePeers.size(),
storeName,
version,
partition,
connectablePeers);
return connectablePeers;
}
}
Loading
Loading