diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 5b316c1c2d546..16d126da49ddd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -102,6 +102,7 @@ private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
   protected HttpURLConnection connection;
   private volatile boolean stopped = false;
 
+  private ShuffleHeader.HeaderVersionProtocol headerVersionProtocol;
   // Initiative value is 0, which means it hasn't retried yet.
   private long retryStartTime = 0;
 
@@ -452,10 +453,11 @@ private void verifyConnection(URL url, String msgToEncode, String encHash)
           ": " + connection.getResponseMessage());
     }
     // get the shuffle version
-    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-        connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
-        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-        connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
+    headerVersionProtocol = ShuffleHeader
+        .getHeaderVersionProtocol(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION));
+    boolean isCompatible = headerVersionProtocol
+        .isHeaderCompatible(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME));
+    if (!isCompatible) {
       throw new IOException("Incompatible shuffle response version");
     }
     // get the replyHash which is HMac of the encHash we sent to the server
@@ -480,6 +482,9 @@ private void setupShuffleConnection(String encHash) {
         ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
     connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
         ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    // set target version param to negotiate with shuffle server
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_TARGET_VERSION,
+        ShuffleHeader.getNewestVersion().getVersionStr());
   }
 
   private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
@@ -498,7 +503,7 @@ private TaskAttemptID[] copyMapOutput(MapHost host,
     int forReduce = -1;
     //Read the shuffle header
     try {
-      ShuffleHeader header = new ShuffleHeader();
+      ShuffleHeader header = new ShuffleHeader(headerVersionProtocol.getCompatibleVersion());
       header.readFields(input);
       mapId = TaskAttemptID.forName(header.mapId);
       compressedLength = header.compressedLength;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
index b42c018427dcc..6619e07ee95b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
@@ -20,27 +20,47 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.collect.Lists;
+
 /**
- * Shuffle Header information that is sent by the TaskTracker and 
+ * Shuffle Header information that is sent by the TaskTracker and
  * deciphered by the Fetcher thread of Reduce task
  *
  */
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class ShuffleHeader implements Writable {
+  private static final Log LOG = LogFactory.getLog(ShuffleHeader.class);
 
   /** Header info of the shuffle http request/response */
   public static final String HTTP_HEADER_NAME = "name";
   public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
   public static final String HTTP_HEADER_VERSION = "version";
+  public static final String HTTP_HEADER_TARGET_VERSION = "target_version";
   public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
+  /** Header version instances. */
+  public static final HeaderVersion DEFAULT_HEADER_VERSION_INSTANCE = new HeaderVersion(
+      DEFAULT_HTTP_HEADER_VERSION);
+  public static final HeaderVersion HEADER_VERSION_INSTANCE_V1_1 = new HeaderVersion("1.1.0");
+  // The smaller index is the newer version; if client and server do not match, fall back to the next index.
+  public static final List<HeaderVersion> HEADER_VERSION_LIST = Lists
+      .newArrayList(
+          HEADER_VERSION_INSTANCE_V1_1,
+          DEFAULT_HEADER_VERSION_INSTANCE
+      );
+  public static final HeaderVersionProtocol DEFAULT_VERSION_PROTOCOL = new HeaderVersionProtocol(
+      DEFAULT_HEADER_VERSION_INSTANCE, DEFAULT_HEADER_VERSION_INSTANCE);
 
   /**
    * The longest possible length of task attempt id that we will accept.
@@ -51,22 +71,56 @@ public class ShuffleHeader implements Writable {
   long uncompressedLength;
   long compressedLength;
   int forReduce;
-
-  public ShuffleHeader() { }
-
+  /**
+   * Used to decide which properties to write or read.
+   */
+  HeaderVersion headerVersion;
+
+  /**
+   * Constructor for the shuffle client.
+   */
+  public ShuffleHeader() {
+    this.headerVersion = DEFAULT_HEADER_VERSION_INSTANCE;
+  }
+
+  /**
+   * Constructor for the shuffle client.
+   */
+  public ShuffleHeader(HeaderVersion headerVersion) {
+    this.headerVersion = headerVersion;
+  }
+
   public ShuffleHeader(String mapId, long compressedLength,
-      long uncompressedLength, int forReduce) {
+      long uncompressedLength, int forReduce, HeaderVersion headerVersion) {
     this.mapId = mapId;
     this.compressedLength = compressedLength;
     this.uncompressedLength = uncompressedLength;
     this.forReduce = forReduce;
+    this.headerVersion = headerVersion;
+  }
+
+  public ShuffleHeader(String mapId, long compressedLength,
+      long uncompressedLength, int forReduce) {
+    this(mapId, compressedLength, uncompressedLength, forReduce, DEFAULT_HEADER_VERSION_INSTANCE);
   }
-
+
   public void readFields(DataInput in) throws IOException {
     mapId = WritableUtils.readStringSafely(in, MAX_ID_LENGTH);
     compressedLength = WritableUtils.readVLong(in);
     uncompressedLength = WritableUtils.readVLong(in);
     forReduce = WritableUtils.readVInt(in);
+    readByVersion(in);
+  }
+
+  private void readByVersion(DataInput in) throws IOException {
+    if (headerVersion == null || headerVersion.compareTo(DEFAULT_HEADER_VERSION_INSTANCE) == 0) {
+      return;
+    }
+    // If the current version is newer than or equal to a given version,
+    // read the properties owned by that version in order.
+    if (headerVersion.compareTo(HEADER_VERSION_INSTANCE_V1_1) >= 0) {
+      // todo here read version properties
+    }
   }
 
   public void write(DataOutput out) throws IOException {
@@ -74,5 +128,232 @@ public void write(DataOutput out) throws IOException {
     WritableUtils.writeVLong(out, compressedLength);
     WritableUtils.writeVLong(out, uncompressedLength);
     WritableUtils.writeVInt(out, forReduce);
+    writeByVersion(out);
+  }
+
+  /**
+   * Writes the properties owned by newer header versions; the version checks are hard-coded here.
+   * @throws IOException
+   */
+  private void writeByVersion(DataOutput out) throws IOException {
+    if (headerVersion == null || headerVersion.compareTo(DEFAULT_HEADER_VERSION_INSTANCE) == 0) {
+      return;
+    }
+    // If the current version is newer than or equal to a given version,
+    // serialize the properties owned by that version in order.
+    if (headerVersion.compareTo(HEADER_VERSION_INSTANCE_V1_1) >= 0) {
+      // todo here write version properties
+    }
+  }
+
+  public void setHeaderVersion(
+      String headerVersion) { this.headerVersion = new HeaderVersion(headerVersion); }
+
+  /**
+   * Get the newest header version that this client or server side can support.
+   * @return the newest supported header version
+   */
+  public static HeaderVersion getNewestVersion() {
+    return HEADER_VERSION_LIST.get(0);
+  }
+
+  /**
+   * For the shuffle server side.
+   * @param currentHeaderVersion the version header sent by the client
+   * @param targetHeaderVersion the target_version header sent by the client
+   * @return the negotiated version protocol
+   */
+  public static HeaderVersionProtocol getHeaderVersionProtocol(String currentHeaderVersion,
+      String targetHeaderVersion) {
+    HeaderVersion current = currentHeaderVersion == null ? DEFAULT_HEADER_VERSION_INSTANCE
+        : new HeaderVersion(currentHeaderVersion);
+    // If the client request does not contain target_version, the client is on an older release
+    // (e.g. during a rolling upgrade); use its current header version for compatibility.
+    HeaderVersion target = targetHeaderVersion == null ? current
+        : new HeaderVersion(targetHeaderVersion);
+
+    return new HeaderVersionProtocol(current, target);
+  }
+
+  /**
+   * For the shuffle client side.
+   * @param currentHeaderVersion the version header sent by the server
+   * @return the negotiated version protocol
+   */
+  public static HeaderVersionProtocol getHeaderVersionProtocol(String currentHeaderVersion) {
+    HeaderVersion current = currentHeaderVersion == null ? DEFAULT_HEADER_VERSION_INSTANCE
+        : new HeaderVersion(currentHeaderVersion);
+
+    return new HeaderVersionProtocol(current);
+  }
+
+  public static class HeaderVersionProtocol {
+
+    // kept for compatibility checks
+    private HeaderVersion defaultVersion;
+    private HeaderVersion targetVersion;
+    // the final chosen version
+    private HeaderVersion compatibleVersion;
+
+    /**
+     * For the shuffle client side.
+     * @param defaultVersion
+     */
+    public HeaderVersionProtocol(
+        HeaderVersion defaultVersion) {
+      this.defaultVersion = defaultVersion;
+      this.targetVersion = defaultVersion;
+      this.compatibleVersion = defaultVersion;
+    }
+
+    /**
+     * For the shuffle server side.
+     * @param defaultVersion
+     * @param targetVersion
+     */
+    public HeaderVersionProtocol(
+        HeaderVersion defaultVersion,
+        HeaderVersion targetVersion) {
+      this.defaultVersion = defaultVersion;
+      this.targetVersion = targetVersion;
+      setUpCompatibleVersion();
+    }
+
+    private void setUpCompatibleVersion() {
+      HeaderVersion matchedDefaultVersion = null;
+      for (HeaderVersion version : HEADER_VERSION_LIST) {
+        if (version.compareTo(targetVersion) <= 0) {
+          compatibleVersion = version;
+          // Take the first (newest) version that is less than or equal to the target;
+          // once it is found, stop searching.
+          break;
+        }
+        if (version.compareTo(defaultVersion) == 0) {
+          matchedDefaultVersion = version;
+        }
+      }
+
+      // If no compatible version is found, fall back to the client's default version.
+      if (compatibleVersion == null) {
+        compatibleVersion = matchedDefaultVersion;
+      }
+    }
+
+    public boolean isHeaderCompatible(String headerName) {
+      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(headerName)) {
+        LOG.error(
+            String.format(
+                "Shuffle isHeaderCompatible: false, request header name: %s", headerName));
+        return false;
+      }
+      String versionLogMsg = getVersionMsg();
+      if (compatibleVersion == null || !isMatchedVersion()) {
+        LOG.error(String.format("Shuffle version is not compatible, %s", versionLogMsg));
+        return false;
+      }
+
+      // compare with the newest version
+      if (compatibleVersion.compareTo(getNewestVersion()) != 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Shuffle version should fall back to compatible version: %s, %s",
+              compatibleVersion, versionLogMsg));
+        }
+      }
+      return true;
+    }
+
+    private boolean isMatchedVersion() {
+      for (HeaderVersion version : HEADER_VERSION_LIST) {
+        if (version.compareTo(compatibleVersion) == 0) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private String getVersionMsg() {
+      StringBuilder supportedVersion = new StringBuilder();
+      for (HeaderVersion headerVersion : HEADER_VERSION_LIST) {
+        supportedVersion.append(headerVersion.getVersionStr()).append(",");
+      }
+      return String.format("get protocol: %s, supported versions: %s",
+          this.toString(), supportedVersion.toString());
+    }
+
+    @Override
+    public String toString() {
+      return "HeaderVersionProtocol{" +
+          "defaultVersion=" + defaultVersion +
+          ", targetVersion=" + targetVersion +
+          ", compatibleVersion=" + compatibleVersion +
+          '}';
+    }
+
+    public HeaderVersion getDefaultVersion() { return defaultVersion; }
+
+    public void setDefaultVersion(
+        HeaderVersion defaultVersion) { this.defaultVersion = defaultVersion; }
+
+    public HeaderVersion getTargetVersion() { return targetVersion; }
+
+    public void setTargetVersion(
+        HeaderVersion targetVersion) { this.targetVersion = targetVersion; }
+
+    public HeaderVersion getCompatibleVersion() { return compatibleVersion; }
+
+    public void setCompatibleVersion(
+        HeaderVersion compatibleVersion) { this.compatibleVersion = compatibleVersion; }
+  }
+
+  /**
+   * Parses a header version string (e.g. 1.0.0) and compares versions.
+   */
+  public static class HeaderVersion implements Comparable<HeaderVersion> {
+
+    private final String versionStr;
+    private final int majorVersion;
+    private final int minorVersion;
+    private final int revision;
+
+    public HeaderVersion(String version) {
+      versionStr = version;
+      String[] versions = StringUtils.split(version, ".");
+      majorVersion = Integer.parseInt(versions[0]);
+      minorVersion = Integer.parseInt(versions[1]);
+      revision = Integer.parseInt(versions[2]);
+    }
+
+    public String getVersionStr() { return versionStr; }
+
+    @Override
+    public int compareTo(HeaderVersion o) {
+      if (o.majorVersion > majorVersion) {
+        return -1;
+      } else if (o.majorVersion < majorVersion) {
+        return 1;
+      }
+
+      if (o.minorVersion > minorVersion) {
+        return -1;
+      } else if (o.minorVersion < minorVersion) {
+        return 1;
+      }
+
+      if (o.revision > revision) {
+        return -1;
+      } else if (o.revision < revision) {
+        return 1;
+      }
+
+      return 0;
+    }
+
+    @Override
+    public String toString() {
+      return "HeaderVersion{" +
+          "versionStr='" + versionStr + '\'' +
+          '}';
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index ed35ff6a2bd64..5c5f1a9ecc10d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -716,6 +716,26 @@ public void testCopyFromHostWithRetryUnreserve() throws Exception {
     verify(immo).abort();
   }
 
+  @Test(timeout = 10000)
+  public void testHeaderVersionCompatible() {
+    // client [1.0.0, 1.1.0, 1.1.1], server [1.0.0, 1.1.0]
+    // compatible: 1.1.0
+    ShuffleHeader.HeaderVersionProtocol protocol =
+        ShuffleHeader.getHeaderVersionProtocol("1.0.0", "1.1.1");
+    assertTrue(protocol.isHeaderCompatible(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME));
+    assertEquals(protocol.getCompatibleVersion().getVersionStr(), "1.1.0");
+    // client [1.0.0, 1.1.0], server [1.0.0, 1.1.0]
+    // compatible: 1.1.0
+    protocol = ShuffleHeader.getHeaderVersionProtocol("1.0.0", "1.1.0");
+    assertTrue(protocol.isHeaderCompatible(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME));
+    assertEquals(protocol.getCompatibleVersion().getVersionStr(), "1.1.0");
+    // client [1.0.0, 1.0.0], server [1.0.0, 1.1.0]
+    // compatible: 1.0.0
+    protocol = ShuffleHeader.getHeaderVersionProtocol("1.0.0", "1.0.0");
+    assertTrue(protocol.isHeaderCompatible(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME));
+    assertEquals(protocol.getCompatibleVersion().getVersionStr(), "1.0.0");
+  }
+
   public static class FakeFetcher<K, V> extends Fetcher<K, V> {
 
     // If connection need to be reopen.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 1ffb93300b82d..d9eabb55fa1fd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -933,14 +933,20 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
         sendError(ctx, METHOD_NOT_ALLOWED);
         return;
       }
-      // Check whether the shuffle version is compatible
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.headers() != null ?
-              request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.headers() != null ?
-                  request.headers()
-                      .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
+
+      boolean isCompatible = request.headers() != null;
+      ShuffleHeader.HeaderVersionProtocol headerVersionProtocol = null;
+      if (isCompatible) {
+        headerVersionProtocol =
+            ShuffleHeader.getHeaderVersionProtocol(
+                request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION),
+                request.headers().get(ShuffleHeader.HTTP_HEADER_TARGET_VERSION));
+        // Check whether the shuffle version is compatible
+        isCompatible =
+            headerVersionProtocol.isHeaderCompatible(
+                request.headers().get(ShuffleHeader.HTTP_HEADER_NAME));
+      }
+      if (!isCompatible) {
         sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
       }
       final Map<String, List<String>> q =
@@ -995,7 +1001,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
       try {
         verifyRequest(jobId, ctx, request, response,
-            new URL("http", "", this.port, reqUri));
+            new URL("http", "", this.port, reqUri), headerVersionProtocol);
       } catch (IOException e) {
         LOG.warn("Shuffle failure ", e);
         sendError(ctx, e.getMessage(), UNAUTHORIZED);
@@ -1013,7 +1019,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
       try {
         populateHeaders(mapIds, jobId, user, reduceId, request,
-          response, keepAliveParam, mapOutputInfoMap);
+          response, keepAliveParam, mapOutputInfoMap, headerVersionProtocol);
       } catch(IOException e) {
         ch.write(response);
         LOG.error("Shuffle error in populating headers :", e);
@@ -1106,7 +1112,12 @@ private String getBaseLocation(String jobId, String user) {
   }
 
   protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-      String jobId, String user) throws IOException {
+      String jobId, String user) throws IOException {
+    return getMapOutputInfo(mapId, reduce, jobId, user, ShuffleHeader.DEFAULT_VERSION_PROTOCOL);
+  }
+
+  protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+      String jobId, String user, ShuffleHeader.HeaderVersionProtocol protocol) throws IOException {
     AttemptPathInfo pathInfo;
     try {
       AttemptPathIdentifier identifier = new AttemptPathIdentifier(
@@ -1135,13 +1146,14 @@ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
     }
 
     MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
+    outputInfo.setHeaderVersion(protocol.getCompatibleVersion());
     return outputInfo;
   }
 
   protected void populateHeaders(List<String> mapIds, String jobId,
       String user, int reduce, HttpRequest request, HttpResponse response,
-      boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
-      throws IOException {
+      boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap,
+      ShuffleHeader.HeaderVersionProtocol protocol) throws IOException {
 
     long contentLength = 0;
     for (String mapId : mapIds) {
@@ -1151,8 +1163,12 @@ protected void populateHeaders(List<String> mapIds, String jobId,
       }
 
       ShuffleHeader header =
-          new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
-              outputInfo.indexRecord.rawLength, reduce);
+          new ShuffleHeader(
+              mapId,
+              outputInfo.indexRecord.partLength,
+              outputInfo.indexRecord.rawLength,
+              reduce,
+              protocol.getCompatibleVersion());
 
       DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
@@ -1197,15 +1213,29 @@ protected void setResponseHeaders(HttpResponse response,
   class MapOutputInfo {
     final Path mapOutputFileName;
     final IndexRecord indexRecord;
+    private ShuffleHeader.HeaderVersion headerVersion;
 
     MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
       this.mapOutputFileName = mapOutputFileName;
       this.indexRecord = indexRecord;
     }
+
+    public void setHeaderVersion(ShuffleHeader.HeaderVersion headerVersion) {
+      this.headerVersion = headerVersion;
+    }
+
+    public ShuffleHeader.HeaderVersion getHeaderVersion() {
+      return headerVersion;
+    }
   }
 
-  protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-      HttpRequest request, HttpResponse response, URL requestUri)
+  protected void verifyRequest(
+      String appid,
+      ChannelHandlerContext ctx,
+      HttpRequest request,
+      HttpResponse response,
+      URL requestUri,
+      ShuffleHeader.HeaderVersionProtocol versionProtocol)
       throws IOException {
     SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
     if (null == tokenSecret) {
@@ -1238,7 +1268,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
     response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
         ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
     response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
-        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+        versionProtocol.getCompatibleVersion().getVersionStr());
     if (LOG.isDebugEnabled()) {
       int len = reply.length();
       LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
@@ -1251,7 +1281,8 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
       throws IOException {
     final IndexRecord info = mapOutputInfo.indexRecord;
     final ShuffleHeader header =
-        new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+        new ShuffleHeader(
+            mapId, info.partLength, info.rawLength, reduce, mapOutputInfo.getHeaderVersion());
     final DataOutputBuffer dob = new DataOutputBuffer();
     header.write(dob);
     ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index af3cb87760c62..521f009e06296 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader.HeaderVersionProtocol;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsSystem;
@@ -115,7 +116,7 @@ protected Shuffle getShuffle(final Configuration conf) {
     return new Shuffle(conf) {
       @Override
       protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-          HttpRequest request, HttpResponse response, URL requestUri)
+          HttpRequest request, HttpResponse response, URL requestUri, HeaderVersionProtocol versionProtocol)
           throws IOException {
       }
       @Override
@@ -128,7 +129,7 @@ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
       protected void populateHeaders(List<String> mapIds, String jobId,
           String user, int reduce, HttpRequest request,
           HttpResponse response, boolean keepAliveParam,
-          Map<String, MapOutputInfo> infoMap) throws IOException {
+          Map<String, MapOutputInfo> infoMap, HeaderVersionProtocol versionProtocol) throws IOException {
         // Do nothing.
       }
       @Override
@@ -191,7 +192,7 @@ protected Shuffle getShuffle(final Configuration conf) {
     return new Shuffle(conf) {
       @Override
       protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-          HttpRequest request, HttpResponse response, URL requestUri)
+          HttpRequest request, HttpResponse response, URL requestUri, HeaderVersionProtocol versionProtocol)
          throws IOException {
         SocketChannel channel = (SocketChannel)(ctx.getChannel());
         socketKeepAlive = channel.getConfig().isKeepAlive();
@@ -279,14 +280,14 @@ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
       protected void populateHeaders(List<String> mapIds, String jobId,
           String user, int reduce, HttpRequest request,
           HttpResponse response, boolean keepAliveParam,
-          Map<String, MapOutputInfo> infoMap) throws IOException {
+          Map<String, MapOutputInfo> infoMap, HeaderVersionProtocol versionProtocol) throws IOException {
         // Only set response headers and skip everything else
         // send some dummy value for content-length
         super.setResponseHeaders(response, keepAliveParam, 100);
       }
       @Override
       protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-          HttpRequest request, HttpResponse response, URL requestUri)
+          HttpRequest request, HttpResponse response, URL requestUri, HeaderVersionProtocol versionProtocol)
           throws IOException {
       }
       @Override
@@ -384,7 +385,7 @@ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
       }
       @Override
       protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-          HttpRequest request, HttpResponse response, URL requestUri)
+          HttpRequest request, HttpResponse response, URL requestUri, HeaderVersionProtocol versionProtocol)
           throws IOException {
       }
       @Override
@@ -392,7 +393,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
       protected void populateHeaders(List<String> mapIds, String jobId,
           String user, int reduce, HttpRequest request,
           HttpResponse response, boolean keepAliveParam,
-          Map<String, MapOutputInfo> infoMap) throws IOException {
+          Map<String, MapOutputInfo> infoMap, HeaderVersionProtocol versionProtocol) throws IOException {
         // Send some dummy data (populate content length details)
         ShuffleHeader header =
             new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
@@ -576,8 +577,8 @@ public void testIncompatibleShuffleVersion() throws Exception {
       HttpURLConnection conn = (HttpURLConnection)url.openConnection();
       conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
           i == 0 ? "mapreduce" : "other");
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          i == 1 ? "1.0.0" : "1.0.1");
+      // The server cannot find any supported version at or below 0.9.0.
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, i == 1 ? "1.0.0" : "0.9.0");
       conn.connect();
       Assert.assertEquals(
           HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
@@ -613,12 +614,12 @@ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
       protected void populateHeaders(List<String> mapIds, String jobId,
           String user, int reduce, HttpRequest request,
           HttpResponse response, boolean keepAliveParam,
-          Map<String, MapOutputInfo> infoMap) throws IOException {
+          Map<String, MapOutputInfo> infoMap, HeaderVersionProtocol versionProtocol) throws IOException {
         // Do nothing.
       }
       @Override
       protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-          HttpRequest request, HttpResponse response, URL requestUri)
+          HttpRequest request, HttpResponse response, URL requestUri, HeaderVersionProtocol versionProtocol)
           throws IOException {
         // Do nothing.
       }
@@ -731,7 +732,7 @@ protected Shuffle getShuffle(Configuration conf) {
 
       @Override
       protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-          HttpRequest request, HttpResponse response, URL requestUri)
+          HttpRequest request, HttpResponse response, URL requestUri, HeaderVersionProtocol versionProtocol)
           throws IOException {
         // Do nothing.
       }
@@ -1053,7 +1054,8 @@ protected Shuffle getShuffle(Configuration conf) {
         protected void populateHeaders(List<String> mapIds,
             String outputBaseStr, String user, int reduce,
             HttpRequest request, HttpResponse response,
-            boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
+            boolean keepAliveParam, Map<String, MapOutputInfo> infoMap,
+            HeaderVersionProtocol versionProtocol)
             throws IOException {
           // Only set response headers and skip everything else
           // send some dummy value for content-length
@@ -1062,7 +1064,7 @@ protected void populateHeaders(List<String> mapIds,
         @Override
         protected void verifyRequest(String appid,
             ChannelHandlerContext ctx, HttpRequest request,
-            HttpResponse response, URL requestUri) throws IOException {
+            HttpResponse response, URL requestUri, HeaderVersionProtocol versionProtocol) throws IOException {
          // Do nothing.
         }
         @Override
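
For reference, the negotiation flow introduced by this patch can be exercised directly from application code. The following is a minimal, illustrative sketch only, not part of the patch: it assumes the patched ShuffleHeader from this diff is on the classpath, and the class name ShuffleVersionNegotiationSketch is made up for illustration. The expected outputs mirror the scenarios in testHeaderVersionCompatible above.

import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader.HeaderVersionProtocol;

public class ShuffleVersionNegotiationSketch {
  public static void main(String[] args) {
    // Old client: sends only "version: 1.0.0" and no target_version header.
    // The server treats the current version as the target and stays on 1.0.0.
    HeaderVersionProtocol oldClient =
        ShuffleHeader.getHeaderVersionProtocol("1.0.0", null);
    System.out.println(oldClient.getCompatibleVersion().getVersionStr()); // 1.0.0

    // Upgraded client: default 1.0.0 plus "target_version: 1.1.0".
    // The server picks the newest supported version <= 1.1.0, i.e. 1.1.0.
    HeaderVersionProtocol newClient =
        ShuffleHeader.getHeaderVersionProtocol("1.0.0", "1.1.0");
    System.out.println(newClient.getCompatibleVersion().getVersionStr()); // 1.1.0

    // The header name check still applies before the negotiated version is used.
    System.out.println(
        newClient.isHeaderCompatible(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME)); // true
  }
}

The negotiated version is then echoed back in the version response header by verifyRequest and used when writing each ShuffleHeader, so a reducer and a NodeManager on different releases settle on the newest version both sides understand.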