Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.client;

/**
 * Callback pair consisting of a no-argument success notification and a failure
 * notification that carries the cause.
 *
 * NOTE(review): judging by the name, this reports whether a prepare request was
 * received by the remote side -- confirm against the code that invokes it.
 */
public interface PrepareRequestReceivedCallBack {
/** Invoked when the operation completed successfully. */
void onSuccess();

/**
 * Invoked when the operation failed.
 *
 * @param e the cause of the failure
 */
void onFailure(Throwable e);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

/**
 * Listener notified about the outcome of a block-prepare request.
 *
 * Neither callback carries a block id, so a notification refers to the request
 * as a whole rather than to an individual block.
 */
public interface BlockPreparingListener {
/** Invoked when the prepare request succeeded. */
void onBlockPrepareSuccess();
/**
 * Invoked when the prepare request failed.
 *
 * @param exception the cause of the failure
 */
void onBlockPrepareFailure(Throwable exception);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.lang.Override;
import java.lang.String;
import java.lang.Throwable;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.client.PrepareRequestReceivedCallBack;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.shuffle.protocol.PrepareBlocks;


/**
 * Sends a single {@link PrepareBlocks} message to a remote node through a
 * {@link TransportClient}.
 *
 * The message is built once, in the constructor, from the application id, the
 * executor id, the ids of the blocks to prepare and the ids of the blocks to
 * release. {@link #start()} hands it to {@link TransportClient#sendRpc}.
 *
 * NOTE(review): the RPC response callback in {@link #start()} only logs; neither
 * {@code listener} nor {@code requestReceivedCallBack} is invoked from it, so the
 * caller is not told about the outcome of the RPC by this class. Wiring the
 * callback up has to be coordinated with the callers that currently report
 * success themselves.
 */
public class BlockToPrepareInfoSender {
  private static final Logger logger = LoggerFactory.getLogger(BlockToPrepareInfoSender.class);

  private final TransportClient client;
  private final PrepareBlocks prepareMessage;
  private final String[] blockIds;
  private final String[] blocksToRelease;
  private final BlockPreparingListener listener;
  private final PrepareRequestReceivedCallBack requestReceivedCallBack;

  /**
   * @param client transport client used to send the RPC
   * @param appId application id placed in the message
   * @param execId executor id placed in the message
   * @param blockIds ids of the blocks to prepare
   * @param blocksToRelease ids of the blocks to release
   * @param listener listener that {@link PrepareCallBack} forwards to
   */
  public BlockToPrepareInfoSender(
      TransportClient client,
      String appId,
      String execId,
      String[] blockIds,
      String[] blocksToRelease,
      BlockPreparingListener listener) {
    this.client = client;
    this.prepareMessage = new PrepareBlocks(appId, execId, blockIds, blocksToRelease);
    this.blockIds = blockIds;
    this.blocksToRelease = blocksToRelease;
    this.listener = listener;
    this.requestReceivedCallBack = new PrepareCallBack();
  }

  /** Adapts {@link PrepareRequestReceivedCallBack} to the {@link BlockPreparingListener}. */
  private class PrepareCallBack implements PrepareRequestReceivedCallBack {
    @Override
    public void onSuccess() {
      listener.onBlockPrepareSuccess();
    }

    @Override
    public void onFailure(Throwable e) {
      listener.onBlockPrepareFailure(e);
    }
  }

  /**
   * Sends the prepare message. An empty {@code blockIds} array is only warned
   * about; the message is still sent because it may carry blocks to release.
   */
  public void start() {
    if (blockIds.length == 0) {
      logger.warn("Zero-size blockIds array");
    }

    client.sendRpc(prepareMessage.toByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        logger.debug("Successfully send prepare block's info, ready for the next step");
      }

      @Override
      public void onFailure(Throwable e) {
        // Pass the throwable so the cause and stack trace are not lost.
        logger.error("Failed while send the prepare message", e);
      }
    });
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,38 @@ public void registerWithShuffleServer(
public void close() {
clientFactory.close();
}

/**
 * Sends a prepare request for the given blocks to the node at {@code host:port}.
 *
 * When {@code conf.maxIORetries()} is positive the request goes through a
 * {@link RetryingBlockPreparer}; otherwise it is sent once, directly. Any
 * exception thrown while setting this up is logged and reported through
 * {@code listener.onBlockPrepareFailure}.
 *
 * @param host host of the remote node
 * @param port port of the remote node
 * @param execId executor id placed in the request
 * @param prepareBlockIds ids of the blocks to prepare
 * @param releaseBlockIds ids of the blocks to release
 * @param listener receives the outcome of the request
 */
@Override
public void prepareBlocks(
    final String host,
    final int port,
    final String execId,
    String[] prepareBlockIds,
    final String[] releaseBlockIds,
    BlockPreparingListener listener) {
  logger.debug("Send prepare block info to {}:{} (executor id {})", host, port, execId);

  try {
    RetryingBlockPreparer.PreparerStarter blockPrepareStarter =
        new RetryingBlockPreparer.PreparerStarter() {
          @Override
          public void createAndStart(
              String[] blocksToPrepare,
              String[] blocksToRelease,
              BlockPreparingListener prepareListener) throws IOException {
            // Use the arguments handed to this attempt rather than the captured
            // outer arrays, so a retry sends exactly what the retrier asked for.
            TransportClient client = clientFactory.createClient(host, port);
            new BlockToPrepareInfoSender(client, appId, execId, blocksToPrepare,
                blocksToRelease, prepareListener).start();
          }
        };

    int maxRetries = conf.maxIORetries();
    if (maxRetries > 0) {
      new RetryingBlockPreparer(conf, blockPrepareStarter, prepareBlockIds,
          releaseBlockIds, listener).start();
    } else {
      blockPrepareStarter.createAndStart(prepareBlockIds, releaseBlockIds, listener);
    }

  } catch (Exception e) {
    logger.error("Exception while sending the block list", e);
    listener.onBlockPrepareFailure(e);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Wraps a {@link PreparerStarter} and re-sends the prepare/release block lists
 * when an attempt fails with an {@link IOException} (or an exception whose direct
 * cause is one), up to {@code conf.maxIORetries()} times and waiting
 * {@code conf.ioRetryWaitTimeMs()} milliseconds before each retry.
 *
 * Each attempt is given its own {@link RetryingBlockPreparerListener}; only the
 * listener of the most recent attempt is allowed to forward notifications or
 * trigger another retry.
 */
public class RetryingBlockPreparer {

  /** Creates whatever is needed to send one prepare request and starts it. */
  public interface PreparerStarter {
    /**
     * @param prepareBlockIds ids of the blocks to prepare
     * @param releaseBlocks ids of the blocks to release
     * @param listener listener to hand to the attempt
     * @throws IOException if the attempt could not be started
     */
    void createAndStart(
        String[] prepareBlockIds,
        String[] releaseBlocks,
        BlockPreparingListener listener) throws IOException;
  }

  /** Shared pool on which the wait-then-retry tasks run. */
  private static final ExecutorService executorService = Executors.newCachedThreadPool(
      NettyUtils.createThreadFactory("Prepare Info Send Retry")
  );

  private static final Logger logger = LoggerFactory.getLogger(RetryingBlockPreparer.class);

  private final PreparerStarter preparerStarter;

  /** The caller's listener; receives the final outcome. */
  private final BlockPreparingListener listener;

  private final int maxRetries;

  /** Delay before each retry, in milliseconds. */
  private final int retryWaitTime;

  /** Number of retries initiated so far; guarded by {@code this}. */
  private int retryCount = 0;

  private final LinkedHashSet<String> outstandingBlockInfosForPrepare;

  private final LinkedHashSet<String> outStandingBlockInfosForRelease;

  /** Listener of the latest attempt; replaced on every retry; guarded by {@code this}. */
  private RetryingBlockPreparerListener currentListener;

  /**
   * @param conf supplies the maximum retry count and the retry wait time
   * @param prepareStarter starts a single attempt
   * @param prepareBlockIds ids of the blocks to prepare
   * @param releaseBlockIds ids of the blocks to release
   * @param listener receives the final outcome
   */
  public RetryingBlockPreparer(
      TransportConf conf,
      PreparerStarter prepareStarter,
      String[] prepareBlockIds,
      String[] releaseBlockIds,
      BlockPreparingListener listener) {
    this.preparerStarter = prepareStarter;
    this.listener = listener;
    this.maxRetries = conf.maxIORetries();
    this.retryWaitTime = conf.ioRetryWaitTimeMs();
    this.outstandingBlockInfosForPrepare = Sets.newLinkedHashSet();
    this.outStandingBlockInfosForRelease = Sets.newLinkedHashSet();
    Collections.addAll(outstandingBlockInfosForPrepare, prepareBlockIds);
    Collections.addAll(outStandingBlockInfosForRelease, releaseBlockIds);
    this.currentListener = new RetryingBlockPreparerListener();
  }

  /** Starts the first attempt on the calling thread. */
  public void start() {
    sendAllOutstanding();
  }

  /**
   * Runs one attempt with a snapshot of the outstanding block ids. If starting
   * the attempt throws, either schedules a retry or reports the failure to the
   * caller's listener exactly once.
   */
  private void sendAllOutstanding() {
    String[] blockIdsToSendForPrepare;
    String[] blockIdsToSendForRelease;
    int numRetries;
    RetryingBlockPreparerListener myListener;
    synchronized (this) {
      blockIdsToSendForPrepare = outstandingBlockInfosForPrepare.toArray(
          new String[outstandingBlockInfosForPrepare.size()]);
      blockIdsToSendForRelease = outStandingBlockInfosForRelease.toArray(
          new String[outStandingBlockInfosForRelease.size()]);
      numRetries = retryCount;
      myListener = currentListener;
    }

    try {
      preparerStarter.createAndStart(blockIdsToSendForPrepare, blockIdsToSendForRelease, myListener);
      // NOTE(review): success is reported as soon as the attempt was started
      // without throwing, not when the remote side has answered.
      listener.onBlockPrepareSuccess();
    } catch (Exception e) {
      String retrySuffix = numRetries > 0 ? "(after " + numRetries + " retries)" : "";
      logger.error(String.format("Exception while begin send %s outstanding block info %s",
          blockIdsToSendForPrepare.length, retrySuffix), e);
      if (shouldRetry(e)) {
        initiateRetry();
      } else {
        // The listener has no per-block callback, so report the failure once,
        // also when only release ids were outstanding.
        listener.onBlockPrepareFailure(e);
      }
    }
  }

  /**
   * Bumps the retry counter, installs a fresh attempt listener (which makes all
   * earlier ones stale) and schedules the next attempt after the wait time.
   */
  private synchronized void initiateRetry() {
    retryCount += 1;
    currentListener = new RetryingBlockPreparerListener();
    logger.info("Retrying send ({}/{}) for {} outstading_prepare and release blocks after {} ms",
        retryCount, maxRetries,
        outstandingBlockInfosForPrepare.size() + outStandingBlockInfosForRelease.size(),
        retryWaitTime);

    executorService.submit(new Runnable() {
      @Override
      public void run() {
        Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
        sendAllOutstanding();
      }
    });
  }

  /**
   * @return true when {@code e} is, or is directly caused by, an
   *         {@link IOException} and the retry budget is not exhausted
   */
  private synchronized boolean shouldRetry(Throwable e) {
    boolean isIOException = e instanceof IOException
        || (e.getCause() != null
        && e.getCause() instanceof IOException);
    boolean hasRemainRetries = retryCount < maxRetries;
    return isIOException && hasRemainRetries;
  }

  /**
   * Listener handed to a single attempt. Notifications from an attempt that has
   * been superseded by a retry are ignored.
   */
  private class RetryingBlockPreparerListener implements BlockPreparingListener {
    @Override
    public void onBlockPrepareSuccess() {
      boolean shouldForwardSuccess;
      synchronized (RetryingBlockPreparer.this) {
        shouldForwardSuccess = this == currentListener;
      }

      // Call the caller's listener outside the lock.
      if (shouldForwardSuccess) {
        listener.onBlockPrepareSuccess();
      }
    }

    @Override
    public void onBlockPrepareFailure(Throwable exception) {
      boolean shouldForwardFailure = false;
      synchronized (RetryingBlockPreparer.this) {
        if (this == currentListener) {
          if (shouldRetry(exception)) {
            initiateRetry();
          } else {
            logger.error(String.format("PrepareBlock failed to send blocks' info, " +
                "and will not retry (%s retries)", retryCount), exception);
            shouldForwardFailure = true;
          }
        }
      }

      // Call the caller's listener outside the lock.
      if (shouldForwardFailure) {
        listener.onBlockPrepareFailure(exception);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,15 @@ public abstract void fetchBlocks(
String execId,
String[] blockIds,
BlockFetchingListener listener);

/**
 * Asynchronously asks a remote node to prepare a sequence of blocks.
 *
 * @param host host of the remote node
 * @param port port of the remote node
 * @param execId executor id sent with the request
 * @param prepareBlockIds ids of the blocks to prepare
 * @param releaseBlocks ids of blocks to release -- NOTE(review): presumably
 *        blocks prepared earlier that are no longer needed; confirm
 * @param listener notified when the request succeeds or fails
 */
public abstract void prepareBlocks(
String host,
int port,
String execId,
String[] prepareBlockIds,
String[] releaseBlocks,
BlockPreparingListener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class BlockTransferMessage implements Encodable {
/** Preceding every serialized message is its type, which allows us to deserialize it. */
public enum Type {
OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
HEARTBEAT(5);
HEARTBEAT(5), PREPARE_BLOCKS(6);

private final byte id;

Expand All @@ -67,6 +67,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
case 3: return StreamHandle.decode(buf);
case 4: return RegisterDriver.decode(buf);
case 5: return ShuffleServiceHeartbeat.decode(buf);
case 6: return PrepareBlocks.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
Expand Down
Loading