[dvc][server] verifying discovered host connectivity before initiating blob transfer request #1534

Merged · 9 commits · Feb 19, 2025
@@ -351,6 +351,16 @@ public BlobTransferPartitionMetadata getTransferredSnapshotMetadata(String topic
* @return the metadata for the blob transfer request
*/
public BlobTransferPartitionMetadata prepareMetadata(BlobTransferPayload blobTransferRequest) {
if (storageMetadataService == null || storeVersionStateSerializer == null) {
throw new VeniceException("StorageMetadataService or storeVersionStateSerializer is not initialized");
}

if (storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName()) == null
|| storageMetadataService
.getLastOffset(blobTransferRequest.getTopicName(), blobTransferRequest.getPartition()) == null) {
throw new VeniceException("Cannot get store version state or offset record from storage metadata service.");
}

// prepare metadata
StoreVersionState storeVersionState =
storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName());
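With these guards, prepareMetadata fails fast instead of returning partially populated metadata. A hedged test sketch of the new behavior (assuming JUnit 5 and Mockito; `blobSnapshotManager` and `blobTransferRequest` are hypothetical stand-ins for the class under test and its payload):

```java
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

// Given a metadata service that has no store version state for the topic...
StorageMetadataService metadataService = mock(StorageMetadataService.class);
when(metadataService.getStoreVersionState("testStore_v1")).thenReturn(null);

// ...prepareMetadata is expected to throw rather than return incomplete metadata.
assertThrows(VeniceException.class, () -> blobSnapshotManager.prepareMetadata(blobTransferRequest));
```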
@@ -17,9 +17,12 @@
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -92,11 +95,10 @@ public CompletionStage<InputStream> get(
}

List<String> discoverPeers = response.getDiscoveryResult();
LOGGER
.info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);
Set<String> uniqueConnectablePeers = getUniqueConnectableHosts(discoverPeers, storeName, version, partition);

// 2: Process peers sequentially to fetch the blob
processPeersSequentially(discoverPeers, storeName, version, partition, tableFormat, resultFuture);
processPeersSequentially(uniqueConnectablePeers, storeName, version, partition, tableFormat, resultFuture);

return resultFuture;
}
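For context, a hedged usage sketch of this entry point (the manager variable and the table format constant are assumptions, not spelled out in this diff): discovery results are deduplicated and connectivity-checked first, and only reachable peers enter the sequential fetch chain.

```java
// Fetch partition 0 of version 1 of "testStore" from the first reachable peer.
CompletionStage<InputStream> blob =
    blobTransferManager.get("testStore", 1, 0, BlobTransferTableFormat.PLAIN_TABLE);
blob.whenComplete((stream, throwable) -> {
  if (throwable != null) {
    // Either no discovered peer was connectable, or every connectable peer failed.
  }
});
```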
@@ -121,15 +123,15 @@ public CompletionStage<InputStream> get(
* - Success case:
* 1. If the blob is successfully fetched from a peer, an InputStream of the blob is returned.
*
* @param peers the list of peers to process
* @param uniqueConnectablePeers the set of peers to process
* @param storeName the name of the store
* @param version the version of the store
* @param partition the partition of the store
* @param tableFormat the needed table format
* @param resultFuture the future to complete with the InputStream of the blob
*/
private void processPeersSequentially(
List<String> peers,
Set<String> uniqueConnectablePeers,
String storeName,
int version,
int partition,
@@ -142,19 +144,23 @@ private void processPeersSequentially(
CompletableFuture<Void> chainOfPeersFuture = CompletableFuture.completedFuture(null);

// Iterate through each peer and chain the futures
for (int currentPeerIndex = 0; currentPeerIndex < peers.size(); currentPeerIndex++) {
final int peerIndex = currentPeerIndex;
for (String chosenHost: uniqueConnectablePeers) {
// Chain the next operation to the previous future
chainOfPeersFuture = chainOfPeersFuture.thenCompose(v -> {
String chosenHost = peers.get(peerIndex).split("_")[0];

if (resultFuture.isDone()) {
// If the result future is already completed, skip the current peer
return CompletableFuture.completedFuture(null);
}

// Attempt to fetch the blob from the current peer asynchronously
LOGGER.info("Attempting to connect to host: {}", chosenHost);
LOGGER.info(
"Attempting to connect to host: {} for store {} version {} partition {} table format {}",
chosenHost,
storeName,
version,
partition,
tableFormat);

return nettyClient.get(chosenHost, storeName, version, partition, tableFormat)
.toCompletableFuture()
@@ -244,4 +250,46 @@ private void updateBlobTransferFileReceiveStats(double transferTime, String stor
e);
}
}

/**
* Get the unique connectable hosts for the given storeName, version, and partition
* @param discoverPeers the list of discovered peers
* @param storeName the name of the store
* @param version the version of the store
* @param partition the partition of the store
* @return the set of unique connectable hosts
*/
private Set<String> getUniqueConnectableHosts(
List<String> discoverPeers,
String storeName,
int version,
int partition) {
// Extract unique hosts from the discovered peers
Set<String> uniquePeers = discoverPeers.stream().map(peer -> peer.split("_")[0]).collect(Collectors.toSet());

LOGGER.info(
"Discovered {} unique peers store {} version {} partition {}, peers are {}",
uniquePeers.size(),
storeName,
version,
partition,
uniquePeers);

// Get the connectable hosts for this store, version, and partition
Set<String> connectablePeers =
nettyClient.getConnectableHosts(new HashSet<>(uniquePeers), storeName, version, partition);

// Exclude the current host
String currentHost = Utils.getHostName();
connectablePeers.remove(currentHost);

LOGGER.info(
"Total {} unique connectable peers for store {} version {} partition {}, peers are {}",
connectablePeers.size(),
storeName,
version,
partition,
connectablePeers);
return connectablePeers;
}
}
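To make the control flow in processPeersSequentially easier to follow, here is a minimal, self-contained sketch (not Venice code) of the same thenCompose chaining idiom: peers are attempted strictly one at a time, the chain short-circuits once the result future completes, and a failure simply advances to the next peer.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public final class SequentialChainDemo {
  public static void main(String[] args) {
    Set<String> peers = Set.of("peer1", "peer2", "peer3");
    CompletableFuture<String> resultFuture = new CompletableFuture<>();

    // Start with an already-completed future and append one stage per peer.
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
    for (String peer: peers) {
      chain = chain.thenCompose(v -> {
        if (resultFuture.isDone()) {
          // An earlier peer already delivered the blob; skip the rest of the chain.
          return CompletableFuture.completedFuture(null);
        }
        return attemptFetch(peer).handle((blob, error) -> {
          if (error == null) {
            resultFuture.complete(blob); // first successful peer wins
          }
          // On error, return normally so the chain advances to the next peer.
          return null;
        });
      });
    }

    // If the chain drains without any success, fail the overall result.
    chain.thenRun(() -> {
      if (!resultFuture.isDone()) {
        resultFuture.completeExceptionally(new RuntimeException("all peers failed"));
      }
    });
    System.out.println(resultFuture.join());
  }

  // Stand-in for nettyClient.get(...): only "peer2" succeeds in this demo.
  private static CompletableFuture<String> attemptFetch(String peer) {
    return "peer2".equals(peer)
        ? CompletableFuture.completedFuture("blob-from-" + peer)
        : CompletableFuture.failedFuture(new RuntimeException(peer + " unreachable"));
  }
}
```

Running it prints blob-from-peer2 regardless of iteration order: earlier peers fail and fall through, and any peers after the success are skipped by the isDone check.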
@@ -3,6 +3,7 @@
import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.exceptions.VenicePeersConnectionException;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -18,8 +19,17 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -32,6 +42,10 @@ public class NettyFileTransferClient {
private final String baseDir;
private final int serverPort;
private StorageMetadataService storageMetadataService;
private final ExecutorService executorService;
// Sets to track connectable and unconnectable hosts, to avoid wasted reconnection attempts to known-bad hosts
private Set<String> unconnectableHosts = VeniceConcurrentHashMap.newKeySet();
private Set<String> connectedHosts = VeniceConcurrentHashMap.newKeySet();

// TODO 1: move tunable configs to a config class
// TODO 2: consider either increasing worker threads or have a dedicated thread pool to handle requests.
@@ -51,6 +65,69 @@ public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpClientCodec());
}
});
this.executorService = Executors.newCachedThreadPool();
}

/**
* A method to get the connectable hosts for the given store, version, and partition
* This method is only used to check connectivity to the hosts; each probe channel is closed after the check.
* @param discoveredHosts the set of discovered hosts for the store, version, and partition, not all of which are necessarily connectable
* @param storeName the store name
* @param version the version
* @param partition the partition
* @return the set of connectable hosts
*/
public Set<String> getConnectableHosts(
HashSet<String> discoveredHosts,
String storeName,
int version,
int partition) {
List<CompletableFuture<String>> futures = new ArrayList<>();
Set<String> connectableHosts = new HashSet<>();

discoveredHosts.removeAll(unconnectableHosts);
for (String host: discoveredHosts) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// Check if the host is connectable. connectableHosts is a plain HashSet, so it is
// populated only on the calling thread below, never from these pool threads.
Channel channel = connectToHost(host, storeName, version, partition);
if (channel != null && channel.isActive()) {
channel.close(); // the channel is opened only to verify connectivity; no need to keep it open
return host;
} else {
unconnectableHosts.add(host);
return null;
}
} catch (Exception e) {
unconnectableHosts.add(host);
return null;
}
}, executorService);

futures.add(future);
}

// Wait for all futures to complete
CompletableFuture<Void> allConnections = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allConnections.join();

// Collect only the successfully connected hosts
for (CompletableFuture<String> future: futures) {
try {
String host = future.get();
if (host != null) {
connectableHosts.add(host);
}
} catch (Exception e) {
LOGGER.error("Error getting result from future", e);
}
}

return connectableHosts;
}
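A hedged usage sketch of the probe (`client` and the host names are hypothetical): reachability checks for all discovered peers run in parallel on the cached thread pool, probe channels are closed immediately, and hosts that fail are cached in unconnectableHosts so later discovery rounds skip them up front.

```java
// Probe the discovered peers before choosing one for the transfer.
HashSet<String> discovered = new HashSet<>(Arrays.asList("host-a.example", "host-b.example"));
Set<String> reachable = client.getConnectableHosts(discovered, "testStore", 1, 0);
// Only hosts in `reachable` are handed to processPeersSequentially.
```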

public CompletionStage<InputStream> get(
Expand All @@ -60,8 +137,11 @@ public CompletionStage<InputStream> get(
int partition,
BlobTransferTableFormat requestedTableFormat) {
CompletionStage<InputStream> inputStream = new CompletableFuture<>();
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
try {
// Connects to the remote host
// Must open a new connection for each request (per store, version, and partition);
// otherwise responses from different requests would be mixed up
Channel ch = connectToHost(host, storeName, version, partition);

// Request to get the blob file and metadata
Expand All @@ -88,6 +168,16 @@ public CompletionStage<InputStream> get(
requestedTableFormat));
// Send a GET request
ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));
// Set a timeout; otherwise, if the host never responds, the future will never complete
scheduledExecutorService.schedule(() -> {
if (!inputStream.toCompletableFuture().isDone()) {
inputStream.toCompletableFuture()
.completeExceptionally(
new TimeoutException(
"Request timed out for store " + storeName + " version " + version + " partition " + partition
+ " table format " + requestedTableFormat + " from host " + host));
}
}, 5, TimeUnit.MINUTES);
} catch (Exception e) {
if (!inputStream.toCompletableFuture().isCompletedExceptionally()) {
inputStream.toCompletableFuture().completeExceptionally(e);
@@ -98,6 +188,9 @@

public void close() {
workerGroup.shutdownGracefully();
executorService.shutdown();
unconnectableHosts.clear();
connectedHosts.clear();
}

private FullHttpRequest prepareRequest(
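One design note on the timeout in get: a fresh single-thread scheduler is created per call and is never shut down, which leaves a lingering thread per request. On Java 9+, a hedged alternative (not what this PR does) is CompletableFuture.orTimeout, which rides on a shared internal scheduler:

```java
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

CompletableFuture<InputStream> response = new CompletableFuture<>();
// orTimeout completes the future with a TimeoutException if nothing in the
// Netty pipeline completes it within 5 minutes, mirroring the scheduled task.
response.orTimeout(5, TimeUnit.MINUTES);
```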