diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index aca6fca00c48..076b693f81c8 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -19,10 +19,10 @@
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 
-import sun.misc.Cleaner;
 import sun.misc.Unsafe;
 
 public final class Platform {
@@ -67,6 +67,60 @@ public final class Platform {
     unaligned = _unaligned;
   }
 
+  // Access fields and constructors once and store them, for performance:
+
+  private static final Constructor<?> DBB_CONSTRUCTOR;
+  private static final Field DBB_CLEANER_FIELD;
+  static {
+    try {
+      Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
+      Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+      constructor.setAccessible(true);
+      Field cleanerField = cls.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+      DBB_CONSTRUCTOR = constructor;
+      DBB_CLEANER_FIELD = cleanerField;
+    } catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private static final Method CLEANER_CREATE_METHOD;
+  static {
+    // The implementation of Cleaner changed from JDK 8 to 9.
+    // Split java.version on non-digit chars (JDK 8 and earlier yield 1 here, still < 9):
+    int majorVersion = Integer.parseInt(System.getProperty("java.version").split("\\D+")[0]);
+    String cleanerClassName;
+    if (majorVersion < 9) {
+      cleanerClassName = "sun.misc.Cleaner";
+    } else {
+      cleanerClassName = "jdk.internal.ref.Cleaner";
+    }
+    try {
+      Class<?> cleanerClass = Class.forName(cleanerClassName);
+      Method createMethod = cleanerClass.getMethod("create", Object.class, Runnable.class);
+      // Accessing jdk.internal.ref.Cleaner fails by default in JDK 9+ unless the user has
+      // opened the package with something like
+      // --add-opens java.base/jdk.internal.ref=ALL-UNNAMED. If not, we can't use the Cleaner
+      // hack below. Nothing breaks; the user may just hit the default JVM limit on off-heap
+      // memory and need to raise it or set the flag above. The dummy invocation below tests
+      // whether the method is accessible:
+      try {
+        createMethod.invoke(null, null, null);
+      } catch (IllegalAccessException e) {
+        // Leave the method null; we can't log from this static initializer.
+        createMethod = null;
+      } catch (InvocationTargetException ite) {
+        // Shouldn't happen; report it.
+        throw new IllegalStateException(ite);
+      }
+      CLEANER_CREATE_METHOD = createMethod;
+    } catch (ClassNotFoundException | NoSuchMethodException e) {
+      throw new IllegalStateException(e);
+    }
+
+  }
+
   /**
    * @return true when running JVM is having sun's Unsafe package available in it and underlying
    * system having unaligned-access capability.
@@ -159,18 +213,18 @@ public static long reallocateMemory(long address, long oldSize, long newSize) {
    * MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
    * to increase it).
    */
-  @SuppressWarnings("unchecked")
   public static ByteBuffer allocateDirectBuffer(int size) {
     try {
-      Class cls = Class.forName("java.nio.DirectByteBuffer");
-      Constructor constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
-      constructor.setAccessible(true);
-      Field cleanerField = cls.getDeclaredField("cleaner");
-      cleanerField.setAccessible(true);
       long memory = allocateMemory(size);
-      ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
-      Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
-      cleanerField.set(buffer, cleaner);
+      ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
+      if (CLEANER_CREATE_METHOD != null) {
+        try {
+          DBB_CLEANER_FIELD.set(buffer,
+            CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new IllegalStateException(e);
+        }
+      }
       return buffer;
     } catch (Exception e) {
       throwException(e);
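
The version check above relies entirely on the shape of the java.version property. A quick standalone demo of how that parsing behaves across JDKs (class name is hypothetical, not part of the patch): JDK 8 and earlier report versions like "1.8.0_181", so the leading digits parse to 1, which still falls under the `< 9` branch.

    public class JavaVersionParsing {
      public static void main(String[] args) {
        // Same parsing as the static initializer above: take the leading run of digits.
        for (String version : new String[] {"1.8.0_181", "9.0.4", "11.0.1"}) {
          int major = Integer.parseInt(version.split("\\D+")[0]);
          System.out.println(version + " -> major " + major + " -> "
              + (major < 9 ? "sun.misc.Cleaner" : "jdk.internal.ref.Cleaner"));
        }
      }
    }
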
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index adc406bb1c44..1c9ea1dba97d 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -22,9 +22,12 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.Map
 import scala.collection.mutable
 
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+import sun.misc.Unsafe
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
 
 /**
  * Storage information for each BlockManager.
@@ -193,6 +196,31 @@ private[spark] class StorageStatus(
 
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+
+  // In Java 8, DirectBuffer.cleaner() returned a sun.misc.Cleaner, whose clean() method
+  // could be invoked reflectively. The return type changed to jdk.internal.ref.Cleaner in
+  // later JDKs, and its clean() method is not accessible even with reflection. However,
+  // sun.misc.Unsafe gained an invokeCleaner() method in JDK 9+, and that method is still
+  // accessible via reflection.
+  private val bufferCleaner: DirectBuffer => Unit =
+    if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+      val cleanerMethod =
+        Utils.classForName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
+      val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe")
+      unsafeField.setAccessible(true)
+      val unsafe = unsafeField.get(null).asInstanceOf[Unsafe]
+      buffer: DirectBuffer => cleanerMethod.invoke(unsafe, buffer)
+    } else {
+      val cleanerMethod = Utils.classForName("sun.misc.Cleaner").getMethod("clean")
+      buffer: DirectBuffer => {
+        // Do not reference the return type of .cleaner() directly; it changes across JDKs.
+        val cleaner: AnyRef = buffer.cleaner()
+        if (cleaner != null) {
+          cleanerMethod.invoke(cleaner)
+        }
+      }
+    }
+
   /**
    * Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
    * API that will cause errors if one attempts to read from the disposed buffer. However, neither
@@ -204,14 +232,8 @@ private[spark] object StorageUtils extends Logging {
   def dispose(buffer: ByteBuffer): Unit = {
     if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
       logTrace(s"Disposing of $buffer")
-      cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
+      bufferCleaner(buffer.asInstanceOf[DirectBuffer])
     }
   }
-
-  private def cleanDirectBuffer(buffer: DirectBuffer) = {
-    val cleaner = buffer.cleaner()
-    if (cleaner != null) {
-      cleaner.clean()
-    }
-  }
 }
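
For reference, the JDK 9+ branch above boils down to the following standalone sketch (class name is hypothetical; it requires a JDK 9+ runtime, since Unsafe.invokeCleaner(ByteBuffer) does not exist on JDK 8):

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.nio.ByteBuffer;

    public class InvokeCleanerSketch {
      public static void main(String[] args) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        // Grab the Unsafe singleton from its private static field.
        Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
        Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Object unsafe = theUnsafe.get(null);
        // JDK 9+ only: frees the buffer's off-heap memory immediately instead of
        // waiting for GC to process the buffer's Cleaner.
        Method invokeCleaner = unsafeClass.getMethod("invokeCleaner", ByteBuffer.class);
        invokeCleaner.invoke(unsafe, buffer);
        // Any read from 'buffer' after this point is undefined behavior.
      }
    }
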
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 39f4fba78583..5293645cab05 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -33,7 +33,7 @@ import scala.util.Random
 
 import com.google.common.io.Files
 import org.apache.commons.io.IOUtils
-import org.apache.commons.lang3.SystemUtils
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.commons.math3.stat.inference.ChiSquareTest
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -932,10 +932,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       signal(pid, "SIGKILL")
     }
 
-    val versionParts = System.getProperty("java.version").split("[+.\\-]+", 3)
-    var majorVersion = versionParts(0).toInt
-    if (majorVersion == 1) majorVersion = versionParts(1).toInt
-    if (majorVersion >= 8) {
+    if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_1_8)) {
       // We'll make sure that forcibly terminating a process works by
       // creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On
       // older versions of java, this will *not* terminate.
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 15a570908cc9..8c03ab617fe4 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -43,7 +43,7 @@ commons-digester-1.8.jar
 commons-httpclient-3.1.jar
 commons-io-2.4.jar
 commons-lang-2.6.jar
-commons-lang3-3.5.jar
+commons-lang3-3.8.1.jar
 commons-logging-1.1.3.jar
 commons-math3-3.4.1.jar
 commons-net-3.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 6d9191a4abb4..6582057478e7 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -40,7 +40,7 @@ commons-dbcp-1.4.jar
 commons-httpclient-3.1.jar
 commons-io-2.4.jar
 commons-lang-2.6.jar
-commons-lang3-3.5.jar
+commons-lang3-3.8.1.jar
 commons-logging-1.1.3.jar
 commons-math3-3.4.1.jar
 commons-net-3.1.jar
@@ -116,8 +116,8 @@ jersey-container-servlet-core-2.22.2.jar
 jersey-guava-2.22.2.jar
 jersey-media-jaxb-2.22.2.jar
 jersey-server-2.22.2.jar
-jetty-webapp-9.3.24.v20180605.jar
-jetty-xml-9.3.24.v20180605.jar
+jetty-webapp-9.4.12.v20180830.jar
+jetty-xml-9.4.12.v20180830.jar
 jline-2.14.6.jar
 joda-time-2.9.3.jar
 jodd-core-3.5.2.jar
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index c55b68e03396..03187aee044e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -32,13 +32,13 @@ object LogQuery {
       | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
       | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
       | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 "" -
-      | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.lines.mkString,
+      | 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.split('\n').mkString,
     """10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
       | HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
       | GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
       | 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
       | 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 "" -
-      | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.lines.mkString
+      | 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.split('\n').mkString
   )
 
   def main(args: Array[String]) {
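
The .lines-to-split('\n') changes here and in the MatricesSuite files below share one motivation: JDK 11 adds a lines() method to java.lang.String, which in Scala sources takes precedence over the implicit StringLike.lines and returns a java.util.stream.Stream of lines instead of an Iterator[String]. split('\n') behaves identically on every JDK. A small demo of the new method (class name is hypothetical; requires JDK 11+ to compile):

    import java.util.stream.Collectors;

    public class StringLinesDemo {
      public static void main(String[] args) {
        String text = "a\nb\nc";
        // JDK 11+: String.lines() returns a Stream<String>, not an Iterator.
        String viaLines = text.lines().collect(Collectors.joining(","));
        // split works the same way on all JDK versions.
        String viaSplit = String.join(",", text.split("\n"));
        System.out.println(viaLines + " == " + viaSplit); // a,b,c == a,b,c
      }
    }
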
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala
index ace44165b106..332734bd2834 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala
@@ -862,10 +862,10 @@ class MatricesSuite extends SparkMLFunSuite {
     mat.toString(0, 0)
     mat.toString(Int.MinValue, Int.MinValue)
     mat.toString(Int.MaxValue, Int.MaxValue)
-    var lines = mat.toString(6, 50).lines.toArray
+    var lines = mat.toString(6, 50).split('\n')
     assert(lines.size == 5 && lines.forall(_.size <= 50))
 
-    lines = mat.toString(5, 100).lines.toArray
+    lines = mat.toString(5, 100).split('\n')
     assert(lines.size == 5 && lines.forall(_.size <= 100))
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index d76edb940b2b..2c3f84617cfa 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -511,10 +511,10 @@ class MatricesSuite extends SparkFunSuite {
     mat.toString(0, 0)
     mat.toString(Int.MinValue, Int.MinValue)
     mat.toString(Int.MaxValue, Int.MaxValue)
-    var lines = mat.toString(6, 50).lines.toArray
+    var lines = mat.toString(6, 50).split('\n')
     assert(lines.size == 5 && lines.forall(_.size <= 50))
 
-    lines = mat.toString(5, 100).lines.toArray
+    lines = mat.toString(5, 100).split('\n')
     assert(lines.size == 5 && lines.forall(_.size <= 100))
   }
 
diff --git a/pom.xml b/pom.xml
index a08b7fda3338..177427dc61d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
     <orc.version>1.5.3</orc.version>
     <orc.classifier>nohive</orc.classifier>
     <hive.parquet.version>1.6.0</hive.parquet.version>
-    <jetty.version>9.3.24.v20180605</jetty.version>
+    <jetty.version>9.4.12.v20180830</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>
     <chill.version>0.9.3</chill.version>
     <ivy.version>2.4.0</ivy.version>
@@ -166,7 +166,7 @@
     <!-- org.apache.commons/commons-lang/ -->
     <commons-lang2.version>2.6</commons-lang2.version>
     <!-- org.apache.commons/commons-lang3/ -->
-    <commons-lang3.version>3.5</commons-lang3.version>
+    <commons-lang3.version>3.8.1</commons-lang3.version>
    <datanucleus-core.version>3.2.10</datanucleus-core.version>
    <janino.version>3.0.10</janino.version>
    <jersey.version>2.22.2</jersey.version>
@@ -2016,7 +2016,7 @@
         <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
-          <version>3.2.2</version>
+          <version>3.4.4</version>
           <executions>
             <execution>
               <id>eclipse-add-source</id>
@@ -2281,7 +2281,19 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-shade-plugin</artifactId>
-          <version>3.1.0</version>
+          <version>3.2.0</version>
+          <dependencies>
+            <dependency>
+              <groupId>org.ow2.asm</groupId>
+              <artifactId>asm</artifactId>
+              <version>7.0</version>
+            </dependency>
+            <dependency>
+              <groupId>org.ow2.asm</groupId>
+              <artifactId>asm-commons</artifactId>
+              <version>7.0</version>
+            </dependency>
+          </dependencies>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
@@ -2296,7 +2308,7 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-dependency-plugin</artifactId>
-          <version>3.0.2</version>
+          <version>3.1.1</version>
           <executions>
             <execution>
               <id>default-cli</id>
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
index 341a7fdbb59b..a10245b372d7 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -19,7 +19,6 @@
 package org.apache.hive.service.cli.thrift;
 
 import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -65,7 +64,7 @@ public void run() {
     // Server thread pool
     // Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
     String threadPoolName = "HiveServer2-HttpHandler-Pool";
-    ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+    ThreadPoolExecutor executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
         workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(),
         new ThreadFactoryWithGarbageCleanup(threadPoolName));
     ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
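
The declared-type change above is forced by the Jetty 9.3-to-9.4 upgrade elsewhere in this patch: Jetty 9.4's ExecutorThreadPool constructor takes a ThreadPoolExecutor rather than an arbitrary ExecutorService. A minimal sketch of the same wiring (class name and pool sizes are hypothetical; assumes jetty-util 9.4.x on the classpath):

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.eclipse.jetty.util.thread.ExecutorThreadPool;

    public class JettyPoolWiring {
      public static void main(String[] args) {
        // Grows from 5 to 100 threads and rejects work once all 100 are busy,
        // mirroring the HiveServer2 HTTP handler pool above.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        ExecutorThreadPool threadPool = new ExecutorThreadPool(executor);
        System.out.println("Wrapped pool, max " + executor.getMaximumPoolSize() + " threads");
      }
    }
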