From adf186df4aed9261f948fc58751147d57f77959b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 6 Nov 2014 13:55:37 -0800 Subject: [PATCH 1/5] Move byte buffer String conversion logic to JavaUtils --- .../apache/spark/network/util/JavaUtils.java | 20 ++++++++++++++++ .../network/sasl/ShuffleSecretManager.java | 24 ++----------------- .../spark/deploy/yarn/ExecutorRunnable.scala | 6 +++-- .../spark/deploy/yarn/ExecutorRunnable.scala | 6 +++-- 4 files changed, 30 insertions(+), 26 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java index 40b71b0c87a4..40fc29b564a9 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -17,6 +17,9 @@ package org.apache.spark.network.util; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Closeable; @@ -30,6 +33,7 @@ public class JavaUtils { private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class); + private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); /** Closes the given object, ignoring IOExceptions. */ public static void closeQuietly(Closeable closeable) { @@ -73,4 +77,20 @@ public static int nonNegativeHash(Object obj) { int hash = obj.hashCode(); return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0; } + + /** + * Convert the given string to a byte buffer. The resulting buffer can be + * converted back to the same string through {@link #bytesToString(ByteBuffer)}. + */ + public static ByteBuffer stringToBytes(String s) { + return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET)); + } + + /** + * Convert the given byte buffer to a string. The resulting string can be + * converted back to the same byte buffer through {@link #stringToBytes(String)}. + */ + public static String bytesToString(ByteBuffer b) { + return new String(b.array(), UTF8_CHARSET); + } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java index e66c4af0f1eb..351c7930a900 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java @@ -19,13 +19,13 @@ import java.lang.Override; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.sasl.SecretKeyHolder; +import org.apache.spark.network.util.JavaUtils; /** * A class that manages shuffle secret used by the external shuffle service. @@ -34,30 +34,10 @@ public class ShuffleSecretManager implements SecretKeyHolder { private final Logger logger = LoggerFactory.getLogger(ShuffleSecretManager.class); private final ConcurrentHashMap shuffleSecretMap; - private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); - // Spark user used for authenticating SASL connections // Note that this must match the value in org.apache.spark.SecurityManager private static final String SPARK_SASL_USER = "sparkSaslUser"; - /** - * Convert the given string to a byte buffer. The resulting buffer can be converted back to - * the same string through {@link #bytesToString(ByteBuffer)}. This is used if the external - * shuffle service represents shuffle secrets as bytes buffers instead of strings. - */ - public static ByteBuffer stringToBytes(String s) { - return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET)); - } - - /** - * Convert the given byte buffer to a string. The resulting string can be converted back to - * the same byte buffer through {@link #stringToBytes(String)}. This is used if the external - * shuffle service represents shuffle secrets as bytes buffers instead of strings. - */ - public static String bytesToString(ByteBuffer b) { - return new String(b.array(), UTF8_CHARSET); - } - public ShuffleSecretManager() { shuffleSecretMap = new ConcurrentHashMap(); } @@ -80,7 +60,7 @@ public void registerApp(String appId, String shuffleSecret) { * Register an application with its secret specified as a byte buffer. */ public void registerApp(String appId, ByteBuffer shuffleSecret) { - registerApp(appId, bytesToString(shuffleSecret)); + registerApp(appId, JavaUtils.bytesToString(shuffleSecret)); } /** diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 5f47c79cabae..7a13c7d0003c 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} import org.apache.spark.{SecurityManager, SparkConf, Logging} -import org.apache.spark.network.sasl.ShuffleSecretManager +import org.apache.spark.network.util.JavaUtils @deprecated("use yarn/stable", "1.2.0") class ExecutorRunnable( @@ -98,7 +98,9 @@ class ExecutorRunnable( val secretString = securityMgr.getSecretKey() val secretBytes = if (secretString != null) { - ShuffleSecretManager.stringToBytes(secretString) + // This uses a JavaUtils method because the reverse conversion takes + // place in the Yarn shuffle service, which is implemented in Java + JavaUtils.stringToBytes(secretString) } else { // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 18f48b4b6caf..b3eca725d02d 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} import org.apache.spark.{SecurityManager, SparkConf, Logging} -import org.apache.spark.network.sasl.ShuffleSecretManager +import org.apache.spark.network.util.JavaUtils class ExecutorRunnable( @@ -97,7 +97,9 @@ class ExecutorRunnable( val secretString = securityMgr.getSecretKey() val secretBytes = if (secretString != null) { - ShuffleSecretManager.stringToBytes(secretString) + // This uses a JavaUtils method because the reverse conversion takes + // place in the Yarn shuffle service, which is implemented in Java + JavaUtils.stringToBytes(secretString) } else { // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) From 057135bd31aa68ed1e5d1028cc482d2e09911447 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 6 Nov 2014 14:59:08 -0800 Subject: [PATCH 2/5] Reword comment --- .../scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 3 +-- .../scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7a13c7d0003c..7023a1170654 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -98,8 +98,7 @@ class ExecutorRunnable( val secretString = securityMgr.getSecretKey() val secretBytes = if (secretString != null) { - // This uses a JavaUtils method because the reverse conversion takes - // place in the Yarn shuffle service, which is implemented in Java + // This conversion must match how the YarnShuffleService decodes our secret JavaUtils.stringToBytes(secretString) } else { // Authentication is not enabled, so just provide dummy metadata diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index b3eca725d02d..fdd3c2300fa7 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -97,8 +97,7 @@ class ExecutorRunnable( val secretString = securityMgr.getSecretKey() val secretBytes = if (secretString != null) { - // This uses a JavaUtils method because the reverse conversion takes - // place in the Yarn shuffle service, which is implemented in Java + // This conversion must match how the YarnShuffleService decodes our secret JavaUtils.stringToBytes(secretString) } else { // Authentication is not enabled, so just provide dummy metadata From 85202a530b51fcae39ec21699054bed85b719d49 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 6 Nov 2014 15:03:07 -0800 Subject: [PATCH 3/5] Use guava Charsets --- .../main/java/org/apache/spark/network/util/JavaUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java index 40fc29b564a9..7db9fd19424e 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -28,12 +28,12 @@ import java.io.ObjectOutputStream; import com.google.common.io.Closeables; +import com.google.common.base.Charsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class JavaUtils { private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class); - private static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); /** Closes the given object, ignoring IOExceptions. */ public static void closeQuietly(Closeable closeable) { @@ -83,7 +83,7 @@ public static int nonNegativeHash(Object obj) { * converted back to the same string through {@link #bytesToString(ByteBuffer)}. */ public static ByteBuffer stringToBytes(String s) { - return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET)); + return ByteBuffer.wrap(s.getBytes(Charsets.UTF_8)); } /** @@ -91,6 +91,6 @@ public static ByteBuffer stringToBytes(String s) { * converted back to the same byte buffer through {@link #stringToBytes(String)}. */ public static String bytesToString(ByteBuffer b) { - return new String(b.array(), UTF8_CHARSET); + return new String(b.array(), Charsets.UTF_8); } } From 94e205c92f131b1a6adf6bd276f36b9c4f030b80 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 6 Nov 2014 15:06:47 -0800 Subject: [PATCH 4/5] Use netty Unpooled --- .../main/java/org/apache/spark/network/util/JavaUtils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java index 7db9fd19424e..c03fb5621823 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -29,6 +29,7 @@ import com.google.common.io.Closeables; import com.google.common.base.Charsets; +import io.netty.buffer.Unpooled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,7 +84,7 @@ public static int nonNegativeHash(Object obj) { * converted back to the same string through {@link #bytesToString(ByteBuffer)}. */ public static ByteBuffer stringToBytes(String s) { - return ByteBuffer.wrap(s.getBytes(Charsets.UTF_8)); + return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer(); } /** @@ -91,6 +92,6 @@ public static ByteBuffer stringToBytes(String s) { * converted back to the same byte buffer through {@link #stringToBytes(String)}. */ public static String bytesToString(ByteBuffer b) { - return new String(b.array(), Charsets.UTF_8); + return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8); } } From b6c08bf91657333b0c02e2931925f1a80ee22a66 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 6 Nov 2014 16:02:33 -0800 Subject: [PATCH 5/5] Remove unused import --- .../src/main/java/org/apache/spark/network/util/JavaUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java index c03fb5621823..2856d1c8c933 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -18,7 +18,6 @@ package org.apache.spark.network.util; import java.nio.ByteBuffer; -import java.nio.charset.Charset; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream;