[dvc][server] verifying discovered host connectivity before initiating blob transfer request #1534

Merged · 9 commits · Feb 19, 2025
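
At a high level, the PR inserts a connectivity-verification step between peer discovery and the blob fetch loop, and adds a per-request timeout in the transfer client. A condensed sketch of the resulting flow in NettyP2PBlobTransferManager.get (names follow the diff below; error handling is elided):

    // Sketch only; mirrors the flow after this change rather than a drop-in snippet.
    List<String> discoverPeers = response.getDiscoveryResult();
    // New step: probe each unique discovered host and keep only the reachable ones.
    Set<String> connectablePeers = getConnectableHosts(discoverPeers, storeName, version, partition);
    // Existing step, now fed the pre-filtered set: try peers one at a time.
    processPeersSequentially(connectablePeers, storeName, version, partition, tableFormat, resultFuture);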
@@ -320,7 +320,8 @@ public DaVinciBackend(
             aggVersionedBlobTransferStats,
             backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()
                 ? BlobTransferTableFormat.PLAIN_TABLE
-                : BlobTransferTableFormat.BLOCK_BASED_TABLE);
+                : BlobTransferTableFormat.BLOCK_BASED_TABLE,
+            backendConfig.getBlobTransferPeersConnectivityFreshnessInSeconds());
       } else {
         aggVersionedBlobTransferStats = null;
         blobTransferManager = null;
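
The freshness value passed above comes from a backend config getter. A hypothetical sketch of that accessor; neither the property key nor the default value appears in this diff:

    // Assumption: the getter simply exposes a value parsed from server properties at startup.
    public int getBlobTransferPeersConnectivityFreshnessInSeconds() {
      return blobTransferPeersConnectivityFreshnessInSeconds;
    }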
@@ -351,6 +351,16 @@ public BlobTransferPartitionMetadata getTransferredSnapshotMetadata(String topic
    * @return the metadata for the blob transfer request
    */
   public BlobTransferPartitionMetadata prepareMetadata(BlobTransferPayload blobTransferRequest) {
+    if (storageMetadataService == null || storeVersionStateSerializer == null) {
+      throw new VeniceException("StorageMetadataService or storeVersionStateSerializer is not initialized");
+    }
+
+    if (storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName()) == null
+        || storageMetadataService
+            .getLastOffset(blobTransferRequest.getTopicName(), blobTransferRequest.getPartition()) == null) {
+      throw new VeniceException("Cannot get store version state or offset record from storage metadata service.");
+    }
+
     // prepare metadata
     StoreVersionState storeVersionState =
         storageMetadataService.getStoreVersionState(blobTransferRequest.getTopicName());
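
The new guards make prepareMetadata fail fast with a VeniceException instead of letting a NullPointerException surface later during serialization. A TestNG/Mockito-style sketch of how the guard could be exercised (the test wiring is assumed, not taken from this PR):

    // Illustrative only: 'manager' stands in for an instance of the enclosing class
    // constructed with the mocked StorageMetadataService.
    @Test(expectedExceptions = VeniceException.class)
    public void testPrepareMetadataFailsWhenVersionStateMissing() {
      StorageMetadataService metadataService = mock(StorageMetadataService.class);
      when(metadataService.getStoreVersionState("testStore_v1")).thenReturn(null);
      BlobTransferPayload payload = mock(BlobTransferPayload.class);
      when(payload.getTopicName()).thenReturn("testStore_v1");
      manager.prepareMetadata(payload); // expected to throw VeniceException
    }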
@@ -40,7 +40,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
       int snapshotRetentionTimeInMin,
       int blobTransferMaxTimeoutInMin,
       AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
-      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
+      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat,
+      int peersConnectivityFreshnessInSeconds) {
     try {
       BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
           readOnlyStoreRepository,
@@ -51,7 +52,11 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
           transferSnapshotTableFormat);
       BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
           new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
-          new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
+          new NettyFileTransferClient(
+              p2pTransferClientPort,
+              baseDir,
+              storageMetadataService,
+              peersConnectivityFreshnessInSeconds),
           new DaVinciBlobFinder(clientConfig),
           baseDir,
           aggVersionedBlobTransferStats);
@@ -84,7 +89,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
       int snapshotRetentionTimeInMin,
       int blobTransferMaxTimeoutInMin,
       AggVersionedBlobTransferStats aggVersionedBlobTransferStats,
-      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat) {
+      BlobTransferUtils.BlobTransferTableFormat transferSnapshotTableFormat,
+      int peersConnectivityFreshnessInSeconds) {
     try {
       BlobSnapshotManager blobSnapshotManager = new BlobSnapshotManager(
           readOnlyStoreRepository,
@@ -95,7 +101,11 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
           transferSnapshotTableFormat);
       BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
           new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobTransferMaxTimeoutInMin, blobSnapshotManager),
-          new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
+          new NettyFileTransferClient(
+              p2pTransferClientPort,
+              baseDir,
+              storageMetadataService,
+              peersConnectivityFreshnessInSeconds),
           new ServerBlobFinder(customizedViewFuture),
           baseDir,
           aggVersionedBlobTransferStats);
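
Both factory methods now thread the freshness setting through to the transfer client; in isolation, the client construction shown above is simply:

    // The fourth constructor argument is the new piece in this PR.
    NettyFileTransferClient client = new NettyFileTransferClient(
        p2pTransferClientPort,
        baseDir,
        storageMetadataService,
        peersConnectivityFreshnessInSeconds);

With a freshness of, say, 30 seconds, a host that failed a probe is skipped for up to 30 seconds before it becomes eligible for probing again.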
@@ -17,9 +17,12 @@
 import java.io.InputStream;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -92,11 +95,10 @@ public CompletionStage<InputStream> get(
     }

     List<String> discoverPeers = response.getDiscoveryResult();
-    LOGGER
-        .info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);
+    Set<String> connectablePeers = getConnectableHosts(discoverPeers, storeName, version, partition);

     // 2: Process peers sequentially to fetch the blob
-    processPeersSequentially(discoverPeers, storeName, version, partition, tableFormat, resultFuture);
+    processPeersSequentially(connectablePeers, storeName, version, partition, tableFormat, resultFuture);

     return resultFuture;
   }
@@ -121,15 +123,15 @@ public CompletionStage<InputStream> get(
    * - Success case:
    *   1. If the blob is successfully fetched from a peer, an InputStream of the blob is returned.
    *
-   * @param peers the list of peers to process
+   * @param uniqueConnectablePeers the set of peers to process
    * @param storeName the name of the store
    * @param version the version of the store
    * @param partition the partition of the store
    * @param tableFormat the needed table format
    * @param resultFuture the future to complete with the InputStream of the blob
    */
   private void processPeersSequentially(
-      List<String> peers,
+      Set<String> uniqueConnectablePeers,
       String storeName,
       int version,
       int partition,
@@ -142,19 +144,23 @@ private void processPeersSequentially(
     CompletableFuture<Void> chainOfPeersFuture = CompletableFuture.completedFuture(null);

     // Iterate through each peer and chain the futures
-    for (int currentPeerIndex = 0; currentPeerIndex < peers.size(); currentPeerIndex++) {
-      final int peerIndex = currentPeerIndex;
+    for (String chosenHost: uniqueConnectablePeers) {
       // Chain the next operation to the previous future
       chainOfPeersFuture = chainOfPeersFuture.thenCompose(v -> {
-        String chosenHost = peers.get(peerIndex).split("_")[0];

         if (resultFuture.isDone()) {
           // If the result future is already completed, skip the current peer
           return CompletableFuture.completedFuture(null);
         }

         // Attempt to fetch the blob from the current peer asynchronously
-        LOGGER.info("Attempting to connect to host: {}", chosenHost);
+        LOGGER.info(
+            "Attempting to connect to host: {} for store {} version {} partition {} table format {}",
+            chosenHost,
+            storeName,
+            version,
+            partition,
+            tableFormat);

         return nettyClient.get(chosenHost, storeName, version, partition, tableFormat)
             .toCompletableFuture()
@@ -244,4 +250,38 @@ private void updateBlobTransferFileReceiveStats(double transferTime, String stor
           e);
     }
   }
+
+  /**
+   * Get the connectable hosts for the given storeName, version, and partition
+   * @param discoverPeers the list of discovered peers
+   * @param storeName the name of the store
+   * @param version the version of the store
+   * @param partition the partition of the store
+   * @return the set of unique connectable hosts
+   */
+  private Set<String> getConnectableHosts(List<String> discoverPeers, String storeName, int version, int partition) {
+    // Extract unique hosts from the discovered peers
+    Set<String> uniquePeers = discoverPeers.stream().map(peer -> peer.split("_")[0]).collect(Collectors.toSet());
+
+    LOGGER.info(
+        "Discovered {} unique peers for store {} version {} partition {}, peers are {}",
+        uniquePeers.size(),
+        storeName,
+        version,
+        partition,
+        uniquePeers);
+
+    // Get the connectable hosts for this store, version, and partition
+    Set<String> connectablePeers =
+        nettyClient.getConnectableHosts(new HashSet<>(uniquePeers), storeName, version, partition);
+
+    LOGGER.info(
+        "Total {} unique connectable peers for store {} version {} partition {}, peers are {}",
+        connectablePeers.size(),
+        storeName,
+        version,
+        partition,
+        connectablePeers);
+    return connectablePeers;
+  }
 }
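
The loop above uses a standard trick: folding the hosts into a chain of thenCompose stages so attempts run strictly one after another and stop as soon as the shared result future completes. A self-contained illustration of the same pattern, independent of the Venice classes (host names and the failure injection are made up):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public final class SequentialChainDemo {
      // Try hosts one at a time; complete 'result' with the first success.
      static CompletableFuture<String> firstSuccessful(List<String> hosts) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String host: hosts) {
          chain = chain.thenCompose(v -> {
            if (result.isDone()) {
              return CompletableFuture.completedFuture(null); // already satisfied, skip the rest
            }
            // On failure, swallow the error so the chain moves on to the next host.
            return attempt(host).thenAccept(result::complete).exceptionally(e -> null);
          });
        }
        // If the whole chain drains without a success, fail the result.
        chain.thenRun(() -> result.completeExceptionally(new IllegalStateException("no host succeeded")));
        return result;
      }

      private static CompletableFuture<String> attempt(String host) {
        // Stand-in for the real blob fetch; succeeds only for "host2".
        return "host2".equals(host)
            ? CompletableFuture.completedFuture("blob-from-" + host)
            : CompletableFuture.supplyAsync(() -> { throw new RuntimeException(host + " down"); });
      }

      public static void main(String[] args) {
        System.out.println(firstSuccessful(List.of("host1", "host2", "host3")).join()); // blob-from-host2
      }
    }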
@@ -1,8 +1,11 @@
 package com.linkedin.davinci.blobtransfer.client;

+import com.linkedin.alpini.base.concurrency.Executors;
 import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
 import com.linkedin.davinci.storage.StorageMetadataService;
 import com.linkedin.venice.exceptions.VenicePeersConnectionException;
+import com.linkedin.venice.utils.DaemonThreadFactory;
+import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -18,27 +21,49 @@
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.timeout.IdleStateHandler;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;


 public class NettyFileTransferClient {
   private static final Logger LOGGER = LogManager.getLogger(NettyFileTransferClient.class);
   private static final int MAX_METADATA_CONTENT_LENGTH = 1024 * 1024 * 100;
+  private static final int TIMEOUT_IN_MINUTES = 5;
   EventLoopGroup workerGroup;
   Bootstrap clientBootstrap;
   private final String baseDir;
   private final int serverPort;
+  private final int peersConnectivityFreshnessInSeconds;
   private StorageMetadataService storageMetadataService;
+  private final ExecutorService hostConnectExecutorService;
+  private final ScheduledExecutorService connectTimeoutScheduler;
+
+  // Maps tracking the connectable and unconnectable hosts, to save the effort of reconnecting
+  // format: host -> timestamp of the last connection attempt
+  private VeniceConcurrentHashMap<String, Long> unconnectableHostsToTimestamp = new VeniceConcurrentHashMap<>();
+  private VeniceConcurrentHashMap<String, Long> connectedHostsToTimestamp = new VeniceConcurrentHashMap<>();

   // TODO 1: move tunable configs to a config class
   // TODO 2: consider either increasing worker threads or have a dedicated thread pool to handle requests.
-  public NettyFileTransferClient(int serverPort, String baseDir, StorageMetadataService storageMetadataService) {
+  public NettyFileTransferClient(
+      int serverPort,
+      String baseDir,
+      StorageMetadataService storageMetadataService,
+      int peersConnectivityFreshnessInSeconds) {
     this.baseDir = baseDir;
     this.serverPort = serverPort;
     this.storageMetadataService = storageMetadataService;
+    this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds;

     clientBootstrap = new Bootstrap();
     workerGroup = new NioEventLoopGroup();
@@ -51,6 +76,110 @@ public void initChannel(SocketChannel ch) {
         ch.pipeline().addLast(new HttpClientCodec());
       }
     });
+    this.hostConnectExecutorService =
+        Executors.newCachedThreadPool(new DaemonThreadFactory("Venice-BlobTransfer-Host-Connect-Executor-Service"));
+    this.connectTimeoutScheduler = Executors
+        .newSingleThreadScheduledExecutor(new DaemonThreadFactory("Venice-BlobTransfer-Client-Timeout-Checker"));
   }

+  /**
+   * A method to get the connectable hosts for the given store, version, and partition.
+   * This method is only used for checking connectivity to the hosts; each channel is closed after checking.
+   * @param discoveredHosts the set of discovered hosts for the store, version, and partition, but not necessarily connectable
+   * @param storeName the store name
+   * @param version the version
+   * @param partition the partition
+   * @return the set of connectable hosts
+   */
+  public Set<String> getConnectableHosts(
+      HashSet<String> discoveredHosts,
+      String storeName,
+      int version,
+      int partition) {
+    List<CompletableFuture<String>> futures = new ArrayList<>();
+
+    // Drop connectivity records that are older than the freshness window
+    purgeStaleConnectivityRecords();
+
+    // Skip hosts that recently failed a connectivity check
+    discoveredHosts.removeAll(unconnectableHostsToTimestamp.keySet());
+    for (String host: discoveredHosts) {
+      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
+        try {
+          if (connectedHostsToTimestamp.containsKey(host)) {
+            return host; // already verified via a previous connection
+          }
+          // Check if the host is connectable
+          Channel channel = connectToHost(host, storeName, version, partition);
+          if (channel != null && channel.isActive()) {
+            // Mark the host as connected
+            connectedHostsToTimestamp.put(host, System.currentTimeMillis());
+            channel.close(); // this connection is only for checking connectivity; no need to keep it open
+            return host;
+          } else {
+            unconnectableHostsToTimestamp.put(host, System.currentTimeMillis());
+            return null;
+          }
+        } catch (Exception e) {
+          unconnectableHostsToTimestamp.put(host, System.currentTimeMillis());
+          return null;
+        }
+      }, hostConnectExecutorService);
+
+      futures.add(future);
+    }
+
+    // Wait for all futures to complete
+    CompletableFuture<Void> allConnections = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+
+    // Complete any outstanding checks after the timeout, so a hung host cannot block the join below
+    connectTimeoutScheduler.schedule(() -> {
+      if (!allConnections.isDone()) {
+        for (CompletableFuture<String> future: futures) {
+          if (!future.isDone()) {
+            future.complete(null);
+          }
+        }
+        allConnections.complete(null);
+      }
+    }, TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
+
+    allConnections.join();
+
+    // Collect only the successfully connected hosts
+    Set<String> connectableHostsResult = new HashSet<>();
+    for (CompletableFuture<String> future: futures) {
+      try {
+        String host = future.get();
+        if (host != null) {
+          connectableHostsResult.add(host);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error getting result from future", e);
+      }
+    }
+
+    return connectableHostsResult;
+  }
+
+  /**
+   * Check the freshness of the connectivity records and purge the stale ones
+   */
+  private void purgeStaleConnectivityRecords() {
+    // Purge the unconnectable hosts
+    for (String host: unconnectableHostsToTimestamp.keySet()) {
+      Long lastAttempt = unconnectableHostsToTimestamp.get(host);
+      if (lastAttempt == null || System.currentTimeMillis() - lastAttempt > TimeUnit.SECONDS
+          .toMillis(peersConnectivityFreshnessInSeconds)) {
+        unconnectableHostsToTimestamp.remove(host);
+      }
+    }
+
+    // Purge the connected hosts
+    for (String host: connectedHostsToTimestamp.keySet()) {
+      Long lastConnected = connectedHostsToTimestamp.get(host);
+      if (lastConnected == null || System.currentTimeMillis() - lastConnected > TimeUnit.SECONDS
+          .toMillis(peersConnectivityFreshnessInSeconds)) {
+        connectedHostsToTimestamp.remove(host);
+      }
+    }
+  }
+
   public CompletionStage<InputStream> get(
@@ -62,6 +191,8 @@ public CompletionStage<InputStream> get(
     CompletionStage<InputStream> inputStream = new CompletableFuture<>();
     try {
       // Connects to the remote host
+      // Must open a new connection for each request (per store, per version, per partition),
+      // otherwise responses will get mixed up
       Channel ch = connectToHost(host, storeName, version, partition);

       // Request to get the blob file and metadata
@@ -88,6 +219,16 @@ public CompletionStage<InputStream> get(
               requestedTableFormat));
       // Send a GET request
       ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));
+      // Set a timeout; otherwise, if the host is not responding, the future will never complete
+      connectTimeoutScheduler.schedule(() -> {
+        if (!inputStream.toCompletableFuture().isDone()) {
+          inputStream.toCompletableFuture()
+              .completeExceptionally(
+                  new TimeoutException(
+                      "Request timed out for store " + storeName + " version " + version + " partition " + partition
+                          + " table format " + requestedTableFormat + " from host " + host));
+        }
+      }, TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
     } catch (Exception e) {
       if (!inputStream.toCompletableFuture().isCompletedExceptionally()) {
         inputStream.toCompletableFuture().completeExceptionally(e);
@@ -98,6 +239,10 @@ public CompletionStage<InputStream> get(

   public void close() {
     workerGroup.shutdownGracefully();
+    hostConnectExecutorService.shutdown();
+    connectTimeoutScheduler.shutdown();
+    unconnectableHostsToTimestamp.clear();
+    connectedHostsToTimestamp.clear();
   }

   private FullHttpRequest prepareRequest(
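
The connectivity cache is what keeps repeated probing cheap: each probe outcome is remembered with a timestamp, trusted only while it is younger than the configured freshness window, and purged afterwards so hosts get re-checked. A self-contained sketch of the same bookkeeping, with a plain ConcurrentHashMap standing in for VeniceConcurrentHashMap:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    public final class ConnectivityCacheDemo {
      private final ConcurrentHashMap<String, Long> unconnectable = new ConcurrentHashMap<>();
      private final ConcurrentHashMap<String, Long> connected = new ConcurrentHashMap<>();
      private final long freshnessMillis;

      ConnectivityCacheDemo(int freshnessSeconds) {
        this.freshnessMillis = TimeUnit.SECONDS.toMillis(freshnessSeconds);
      }

      // Record the outcome of a probe, as getConnectableHosts() does.
      void recordProbe(String host, boolean reachable) {
        (reachable ? connected : unconnectable).put(host, System.currentTimeMillis());
      }

      // A host is skipped only while a failed probe is still fresh.
      boolean shouldSkip(String host) {
        return unconnectable.containsKey(host);
      }

      // Equivalent of purgeStaleConnectivityRecords(); entrySet().removeIf is a
      // compact alternative to iterating keySet() and removing entries one by one.
      void purgeStale() {
        long now = System.currentTimeMillis();
        unconnectable.entrySet().removeIf(e -> now - e.getValue() > freshnessMillis);
        connected.entrySet().removeIf(e -> now - e.getValue() > freshnessMillis);
      }
    }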