Skip to content

Commit 480a74a

Browse files
committed
Initial import of code from Databricks unsafe utils repo.
1 parent 3a3f710 commit 480a74a

24 files changed

+1986
-2
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
<module>sql/catalyst</module>
9898
<module>sql/core</module>
9999
<module>sql/hive</module>
100+
<module>unsafe</module>
100101
<module>assembly</module>
101102
<module>external/twitter</module>
102103
<module>external/flume</module>

project/SparkBuild.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ object BuildCommons {
3434

3535
val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
3636
sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
37-
streamingMqtt, streamingTwitter, streamingZeromq, launcher) =
37+
streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe) =
3838
Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
3939
"sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
4040
"streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
41-
"streaming-zeromq", "launcher").map(ProjectRef(buildLocation, _))
41+
"streaming-zeromq", "launcher", "unsafe").map(ProjectRef(buildLocation, _))
4242

4343
val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
4444
sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe;
19+
20+
import java.lang.reflect.Field;
21+
22+
import sun.misc.Unsafe;
23+
24+
public final class PlatformDependent {
25+
26+
public static final Unsafe UNSAFE;
27+
28+
public static final int BYTE_ARRAY_OFFSET;
29+
30+
public static final int INT_ARRAY_OFFSET;
31+
32+
public static final int LONG_ARRAY_OFFSET;
33+
34+
public static final int DOUBLE_ARRAY_OFFSET;
35+
36+
/**
37+
* Limits the number of bytes to copy per {@link Unsafe#copyMemory(long, long, long)} to
38+
* allow safepoint polling during a large copy.
39+
*/
40+
private static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L;
41+
42+
static {
43+
sun.misc.Unsafe unsafe;
44+
try {
45+
Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
46+
unsafeField.setAccessible(true);
47+
unsafe = (sun.misc.Unsafe) unsafeField.get(null);
48+
} catch (Throwable cause) {
49+
unsafe = null;
50+
}
51+
UNSAFE = unsafe;
52+
53+
if (UNSAFE != null) {
54+
BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
55+
INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
56+
LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
57+
DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);
58+
} else {
59+
BYTE_ARRAY_OFFSET = 0;
60+
INT_ARRAY_OFFSET = 0;
61+
LONG_ARRAY_OFFSET = 0;
62+
DOUBLE_ARRAY_OFFSET = 0;
63+
}
64+
}
65+
66+
static public void copyMemory(
67+
Object src,
68+
long srcOffset,
69+
Object dst,
70+
long dstOffset,
71+
long length) {
72+
while (length > 0) {
73+
long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
74+
UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
75+
length -= size;
76+
srcOffset += size;
77+
dstOffset += size;
78+
}
79+
}
80+
81+
/**
82+
* Raises an exception bypassing compiler checks for checked exceptions.
83+
*/
84+
public static void throwException(Throwable t) {
85+
UNSAFE.throwException(t);
86+
}
87+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.array;
19+
20+
import org.apache.spark.unsafe.PlatformDependent;
21+
22+
import java.lang.Object;
23+
24+
public class ByteArrayMethods {
25+
26+
private ByteArrayMethods() {
27+
// Private constructor, since this class only contains static methods.
28+
}
29+
30+
/**
31+
* Optimized byte array equality check for 8-byte-word-aligned byte arrays.
32+
* @return true if the arrays are equal, false otherwise
33+
*/
34+
public static boolean wordAlignedArrayEquals(
35+
Object leftBaseObject,
36+
long leftBaseOffset,
37+
Object rightBaseObject,
38+
long rightBaseOffset,
39+
long arrayLengthInBytes) {
40+
for (int i = 0; i < arrayLengthInBytes; i += 8) {
41+
final long left =
42+
PlatformDependent.UNSAFE.getLong(leftBaseObject, leftBaseOffset + i);
43+
final long right =
44+
PlatformDependent.UNSAFE.getLong(rightBaseObject, rightBaseOffset + i);
45+
if (left != right) return false;
46+
}
47+
return true;
48+
}
49+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.array;
19+
20+
import org.apache.spark.unsafe.PlatformDependent;
21+
import org.apache.spark.unsafe.memory.MemoryBlock;
22+
23+
/**
24+
* An array of long values. Compared with native JVM arrays, this:
25+
* <ul>
26+
* <li>supports using both in-heap and off-heap memory</li>
27+
* <li>supports 64-bit addressing, i.e. array length greater than {@code Integer.MAX_VALUE}</li>
28+
* <li>has no bound checking, and thus can crash the JVM process when assert is turned off</li>
29+
* </ul>
30+
*/
31+
public final class LongArray {
32+
33+
private static final int WIDTH = 8;
34+
private static final long ARRAY_OFFSET = PlatformDependent.LONG_ARRAY_OFFSET;
35+
36+
private final MemoryBlock memory;
37+
private final Object baseObj;
38+
private final long baseOffset;
39+
40+
private final long length;
41+
42+
public LongArray(MemoryBlock memory) {
43+
assert memory.size() % WIDTH == 0 : "Memory not aligned (" + memory.size() + ")";
44+
this.memory = memory;
45+
this.baseObj = memory.getBaseObject();
46+
this.baseOffset = memory.getBaseOffset();
47+
this.length = memory.size() / WIDTH;
48+
}
49+
50+
public MemoryBlock memoryBlock() {
51+
return memory;
52+
}
53+
54+
/**
55+
* Returns the number of elements this array can hold.
56+
*/
57+
public long size() {
58+
return length;
59+
}
60+
61+
/**
62+
* Sets the value at position {@code index}.
63+
*/
64+
public void set(long index, long value) {
65+
assert index >= 0 : "index (" + index + ") should >= 0";
66+
assert index < length : "index (" + index + ") should < length (" + length + ")";
67+
PlatformDependent.UNSAFE.putLong(baseObj, baseOffset + index * WIDTH, value);
68+
}
69+
70+
/**
71+
* Returns the value at position {@code index}.
72+
*/
73+
public long get(long index) {
74+
assert index >= 0 : "index (" + index + ") should >= 0";
75+
assert index < length : "index (" + index + ") should < length (" + length + ")";
76+
return PlatformDependent.UNSAFE.getLong(baseObj, baseOffset + index * WIDTH);
77+
}
78+
79+
/**
80+
* Returns a copy of the array as a JVM native array. The caller should make sure this array's
81+
* length is less than {@code Integer.MAX_VALUE}.
82+
*/
83+
public long[] toJvmArray() throws IndexOutOfBoundsException {
84+
if (length > Integer.MAX_VALUE) {
85+
throw new IndexOutOfBoundsException(
86+
"array size (" + length + ") too large and cannot be converted into JVM array");
87+
}
88+
89+
final long[] arr = new long[(int) length];
90+
PlatformDependent.UNSAFE.copyMemory(
91+
baseObj,
92+
baseOffset,
93+
arr,
94+
ARRAY_OFFSET,
95+
length * WIDTH);
96+
return arr;
97+
}
98+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.bitset;
19+
20+
import org.apache.spark.unsafe.array.LongArray;
21+
import org.apache.spark.unsafe.memory.MemoryBlock;
22+
23+
/**
24+
* A fixed size uncompressed bit set backed by a {@link LongArray}.
25+
*
26+
* Each bit occupies exactly one bit of storage.
27+
*/
28+
public final class BitSet {
29+
30+
/** A long array for the bits. */
31+
private final LongArray words;
32+
33+
/** Length of the long array. */
34+
private final long numWords;
35+
36+
/**
37+
* Creates a new {@link BitSet} using the specified memory block. Size of the memory block must be
38+
* multiple of 8 bytes (i.e. 64 bits).
39+
*/
40+
public BitSet(MemoryBlock memory) {
41+
words = new LongArray(memory);
42+
numWords = words.size();
43+
}
44+
45+
public MemoryBlock memoryBlock() {
46+
return words.memoryBlock();
47+
}
48+
49+
/**
50+
* Returns the number of bits in this {@code BitSet}.
51+
*/
52+
public long capacity() {
53+
return numWords * 64;
54+
}
55+
56+
/**
57+
* Sets the bit at the specified index to {@code true}.
58+
*/
59+
public void set(long index) {
60+
assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")";
61+
BitSetMethods.set(
62+
words.memoryBlock().getBaseObject(), words.memoryBlock().getBaseOffset(), index);
63+
}
64+
65+
/**
66+
* Sets the bit at the specified index to {@code false}.
67+
*/
68+
public void unset(long index) {
69+
assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")";
70+
BitSetMethods.unset(
71+
words.memoryBlock().getBaseObject(), words.memoryBlock().getBaseOffset(), index);
72+
}
73+
74+
/**
75+
* Returns {@code true} if the bit is set at the specified index.
76+
*/
77+
public boolean isSet(long index) {
78+
assert index < numWords * 64 : "index (" + index + ") should < length (" + numWords * 64 + ")";
79+
return BitSetMethods.isSet(
80+
words.memoryBlock().getBaseObject(), words.memoryBlock().getBaseOffset(), index);
81+
}
82+
83+
/**
84+
* Returns the number of bits set to {@code true} in this {@link BitSet}.
85+
*/
86+
public long cardinality() {
87+
long sum = 0L;
88+
for (long i = 0; i < numWords; i++) {
89+
sum += java.lang.Long.bitCount(words.get(i));
90+
}
91+
return sum;
92+
}
93+
94+
/**
95+
* Returns the index of the first bit that is set to true that occurs on or after the
96+
* specified starting index. If no such bit exists then {@code -1} is returned.
97+
* <p>
98+
* To iterate over the true bits in a BitSet, use the following loop:
99+
* <pre>
100+
* <code>
101+
* for (long i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i + 1)) {
102+
* // operate on index i here
103+
* }
104+
* </code>
105+
* </pre>
106+
*
107+
* @param fromIndex the index to start checking from (inclusive)
108+
* @return the index of the next set bit, or -1 if there is no such bit
109+
*/
110+
public long nextSetBit(long fromIndex) {
111+
return BitSetMethods.nextSetBit(
112+
words.memoryBlock().getBaseObject(),
113+
words.memoryBlock().getBaseOffset(),
114+
fromIndex,
115+
numWords);
116+
}
117+
}

0 commit comments

Comments
 (0)