Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import sun.misc.Cleaner;
import sun.misc.Unsafe;

public final class Platform {
Expand Down Expand Up @@ -67,6 +67,59 @@ public final class Platform {
unaligned = _unaligned;
}

// Access fields and constructors once and store them, for performance:

private static final Constructor<?> DBB_CONSTRUCTOR;
private static final Field DBB_CLEANER_FIELD;
static {
try {
Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
DBB_CONSTRUCTOR = constructor;
DBB_CLEANER_FIELD = cleanerField;
} catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
throw new IllegalStateException(e);
}
}

private static final Method CLEANER_CREATE_METHOD;
static {
// The implementation of Cleaner changed from JDK 8 to 9
Copy link
Contributor

@skonto skonto Nov 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to reference java 9, its dead right and it will have no LTS AFAIK.
Shouldnt we only support LTS?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this comment is just noting that the change happened between Java 8 and 9. We are targeting support for Java 11. However I expect virtually all of the changes that we have to make are due to changes in Java 9. (And I have no reason to believe Java 9-10 wouldn't work; we want them to work too)

int majorVersion = Integer.parseInt(System.getProperty("java.version").split("\\.")[0]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a defined fixed format for this?
we are doing some java version check and found very different format from different JDK sources (Oracle vs OpenJDK vs IBM ...)

Copy link
Member

@kiszk kiszk Nov 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Java 9, here is a new definition. In the following example, I found a few exceptions.

I confirmed it can work for OpenJDK, OpenJ9, and IBM JDK 8 by running the following code

public class Version {
  public static void main(String[] args){
    System.out.println("jave.specification.version=" + System.getProperty("java.specification.version"));
    System.out.println("jave.version=" + System.getProperty("java.version"));
    System.out.println("jave.version.split(\".\")[0]=" + System.getProperty("java.version").split("\\.")[0]);
  }
}

OpenJDK

$ ../OpenJDK-8/java -version
java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)

$ ../OpenJDK-8/java Version
jave.specification.version=1.8
jave.version=1.8.0_162
jave.version.split(".")[0]=1

$ ../OpenJDK-9/java -version
openjdk version "9"
OpenJDK Runtime Environment (build 9+181)
OpenJDK 64-Bit Server VM (build 9+181, mixed mode)

$ ../OpenJDK-9/java Version
jave.specification.version=9
jave.version=9
jave.version.split(".")[0]=9

$ ../OpenJDK-11/java -version
openjdk version "11.0.1" 2018-10-16
OpenJDK Runtime Environment 18.9 (build 11.0.1+13)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.1+13, mixed mode)

$ ../OpenJDK-11/java Version
jave.specification.version=11
jave.version=11.0.1
jave.version.split(".")[0]=11

OpenJ9

$ ../OpenJ9-8/java -version
openjdk version "1.8.0_192"
OpenJDK Runtime Environment (build 1.8.0_192-b12)
Eclipse OpenJ9 VM (build openj9-0.11.0, JRE 1.8.0 Windows 10 amd64-64-Bit Compressed References 20181019_105 (JIT enabled, AOT enabled)
OpenJ9   - 090ff9dc
OMR      - ea548a66
JCL      - 51609250b5 based on jdk8u192-b12)

$ ../OpenJ9-8/java Version
jave.specification.version=1.8
jave.version=1.8.0_192
jave.version.split(".")[0]=1

$ ../OpenJ9-9/java -version
openjdk version "9.0.4-adoptopenjdk"
OpenJDK Runtime Environment (build 9.0.4-adoptopenjdk+12)
Eclipse OpenJ9 VM (build openj9-0.9.0, JRE 9 Windows 8.1 amd64-64-Bit Compressed References 20180814_161 (JIT enabled, AOT enabled)
OpenJ9   - 24e53631
OMR      - fad6bf6e
JCL      - feec4d2ae based on jdk-9.0.4+12)

$ ../OpenJ9-9/java Version
jave.specification.version=9
jave.version=9.0.4-adoptopenjdk
jave.version.split(".")[0]=9


$ ../OpenJ9-11/java -version
openjdk version "11.0.1" 2018-10-16
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.1+13)
Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.11.0, JRE 11 Windows 10 amd64-64-Bit Compressed References 20181020_83 (JIT enabled, AOT enabled)
OpenJ9   - 090ff9dc
OMR      - ea548a66
JCL      - f62696f378 based on jdk-11.0.1+13)

$ ../OpenJ9-11/java Version
jave.specification.version=11
jave.version=11.0.1
jave.version.split(".")[0]=11

IBM JDK

$ ../IBMJDK-8/java -version
java version "1.8.0"
Java(TM) SE Runtime Environment (build pwa6480-20150129_02)
IBM J9 VM (build 2.8, JRE 1.8.0 Windows 8.1 amd64-64 Compressed References 20150116_231420 (JIT enabled, AOT enabled)
J9VM - R28_Java8_GA_20150116_2030_B231420
JIT  - tr.r14.java_20150109_82886.02
GC   - R28_Java8_GA_20150116_2030_B231420_CMPRSS
J9CL - 20150116_231420)
JCL - 20150123_01 based on Oracle jdk8u31-b12

$ ../IBMJDK-8/java Version
jave.specification.version=1.8
jave.version=1.8.0
jave.version.split(".")[0]=1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be safe, but I recall that some early-access Java 9 builds had a version like "9-ea". Maybe I should make the regex grab the leading digits only to be safe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like it could be java.version=1.8.0_192 or java.version=11.0.1

ie. first integer or second integer (1.8 => 8?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, but Java versions <= 8 are always 1.8, 1.7, etc. We're just detecting Java 9+ here, and for Java 8 and earlier, the first digit is 1 and so will compare as less than 9.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using java.specification.version and parse it as double? Then check if greater than 2.

String cleanerClassName;
if (majorVersion < 9) {
cleanerClassName = "sun.misc.Cleaner";
} else {
cleanerClassName = "jdk.internal.ref.Cleaner";
}
try {
Class<?> cleanerClass = Class.forName(cleanerClassName);
Method createMethod = cleanerClass.getMethod("create", Object.class, Runnable.class);
// Accessing jdk.internal.ref.Cleaner should actually fail by default in JDK 9+,
// unfortunately, unless the user has allowed access with something like
// --add-opens java.base/java.lang=ALL-UNNAMED If not, we can't really use the Cleaner
// hack below. It doesn't break, just means the user might run into the default JVM limit
// on off-heap memory and increase it or set the flag above. This tests whether it's
// available:
try {
createMethod.invoke(null, null, null);
} catch (IllegalAccessException e) {
// Don't throw an exception, but can't log here?
createMethod = null;
} catch (InvocationTargetException ite) {
// shouldn't happen; report it
throw new IllegalStateException(ite);
}
CLEANER_CREATE_METHOD = createMethod;
} catch (ClassNotFoundException | NoSuchMethodException e) {
throw new IllegalStateException(e);
}

}

/**
* @return true when running JVM is having sun's Unsafe package available in it and underlying
* system having unaligned-access capability.
Expand Down Expand Up @@ -159,18 +212,18 @@ public static long reallocateMemory(long address, long oldSize, long newSize) {
* MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
* to increase it).
*/
@SuppressWarnings("unchecked")
public static ByteBuffer allocateDirectBuffer(int size) {
try {
Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
if (CLEANER_CREATE_METHOD != null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #23424 ; I now think this was an error.

try {
DBB_CLEANER_FIELD.set(buffer,
CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
return buffer;
} catch (Exception e) {
throwException(e);
Expand Down
38 changes: 31 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.Map
import scala.collection.mutable

import sun.misc.Unsafe
import sun.nio.ch.DirectBuffer

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -193,6 +194,35 @@ private[spark] class StorageStatus(

/** Helper methods for storage-related objects. */
private[spark] object StorageUtils extends Logging {

// In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
// to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
// jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
// reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
// still accessible with reflection.
private val bufferCleaner: DirectBuffer => Unit =
if (System.getProperty("java.version").split("\\.").head.toInt < 9) {
// scalastyle:off classforname
val cleanerMethod = Class.forName("sun.misc.Cleaner").getMethod("clean")
// scalastyle:on classforname
(buffer: DirectBuffer) => {
// Careful to avoid the return type of .cleaner(), which changes with JDK
val cleaner: AnyRef = buffer.cleaner()
if (cleaner != null) {
cleanerMethod.invoke(cleaner)
}
}
} else {
// scalastyle:off classforname
val cleanerMethod =
Class.forName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use Utils forname method here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure whether it was good or not to use the Spark classloader, as that method does. If there's any good reason to, sure. This is kind of a special situation, but don't know if it must use Class directly.

Copy link
Contributor

@skonto skonto Nov 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably it is safer not to mix strategies (https://www.javaworld.com/article/2077344/core-java/find-a-way-out-of-the-classloader-maze.html). Utils forName method first checks the thread context class loader and from what I see this method is called in numerous places. Spark creates custom classloaders AFAIK so the question is from which classloader chain you want this class loading to happen. I think both should work, it is not a custom class or anything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is, we can't change the usage in Platform.java, so we'd be mixing strategies here. There are already usages of Class.forName for this reason. I don't really object to changing one instance here; I have no reason to believe either of them fails. But is that meeting your goal?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, in general I cant think of something failing at the moment, just wanted to provide some feedback that this mixes strategies.

// scalastyle:on classforname
val unsafeField = classOf[Unsafe].getDeclaredField("theUnsafe")
unsafeField.setAccessible(true)
val unsafe = unsafeField.get(null).asInstanceOf[Unsafe]
(buffer: DirectBuffer) => cleanerMethod.invoke(unsafe, buffer)
}

/**
* Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
* API that will cause errors if one attempts to read from the disposed buffer. However, neither
Expand All @@ -204,14 +234,8 @@ private[spark] object StorageUtils extends Logging {
def dispose(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
logTrace(s"Disposing of $buffer")
cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
bufferCleaner(buffer.asInstanceOf[DirectBuffer])
}
}

private def cleanDirectBuffer(buffer: DirectBuffer) = {
val cleaner = buffer.cleaner()
if (cleaner != null) {
cleaner.clean()
}
}
}
4 changes: 2 additions & 2 deletions dev/deps/spark-deps-hadoop-3.1
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ jersey-container-servlet-core-2.22.2.jar
jersey-guava-2.22.2.jar
jersey-media-jaxb-2.22.2.jar
jersey-server-2.22.2.jar
jetty-webapp-9.3.24.v20180605.jar
jetty-xml-9.3.24.v20180605.jar
jetty-webapp-9.4.12.v20180830.jar
jetty-xml-9.4.12.v20180830.jar
jline-2.14.6.jar
joda-time-2.9.3.jar
jodd-core-3.5.2.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ object LogQuery {
| GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
| 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
| 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.350 "-" - "" 265 923 934 ""
| 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.lines.mkString,
| 62.24.11.25 images.com 1358492167 - Whatup""".stripMargin.split('\n').mkString,
"""10.10.10.10 - "FRED" [18/Jan/2013:18:02:37 +1100] "GET http://images.com/2013/Generic.jpg
| HTTP/1.1" 304 306 "http:/referall.com" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1;
| GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR
| 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR
| 3.5.30729; Release=ARP)" "UD-1" - "image/jpeg" "whatever" 0.352 "-" - "" 256 977 988 ""
| 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.lines.mkString
| 0 73.23.2.15 images.com 1358492557 - Whatup""".stripMargin.split('\n').mkString
)

def main(args: Array[String]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,10 +862,10 @@ class MatricesSuite extends SparkMLFunSuite {
mat.toString(0, 0)
mat.toString(Int.MinValue, Int.MinValue)
mat.toString(Int.MaxValue, Int.MaxValue)
var lines = mat.toString(6, 50).lines.toArray
var lines = mat.toString(6, 50).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 50))

lines = mat.toString(5, 100).lines.toArray
lines = mat.toString(5, 100).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 100))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,10 @@ class MatricesSuite extends SparkFunSuite {
mat.toString(0, 0)
mat.toString(Int.MinValue, Int.MinValue)
mat.toString(Int.MaxValue, Int.MaxValue)
var lines = mat.toString(6, 50).lines.toArray
var lines = mat.toString(6, 50).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 50))

lines = mat.toString(5, 100).lines.toArray
lines = mat.toString(5, 100).split('\n')
assert(lines.size == 5 && lines.forall(_.size <= 100))
}

Expand Down
20 changes: 16 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<orc.version>1.5.3</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.3.24.v20180605</jetty.version>
<jetty.version>9.4.12.v20180830</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
<chill.version>0.9.3</chill.version>
<ivy.version>2.4.0</ivy.version>
Expand Down Expand Up @@ -2016,7 +2016,7 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<!-- 3.3.1 won't work with zinc; fails to find javac from java.home -->
<version>3.2.2</version>
<version>3.4.4</version>
<executions>
<execution>
<id>eclipse-add-source</id>
Expand Down Expand Up @@ -2281,7 +2281,19 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>7.0</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-commons</artifactId>
<version>7.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand All @@ -2296,7 +2308,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.0.2</version>
<version>3.1.1</version>
<executions>
<execution>
<id>default-cli</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hive.service.cli.thrift;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void run() {
// Server thread pool
// Start with minWorkerThreads, expand till maxWorkerThreads and reject subsequent requests
String threadPoolName = "HiveServer2-HttpHandler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
ThreadPoolExecutor executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
Expand Down