
Commit 92d5a06

Address a number of minor code review comments.

1 parent 1f4b716

8 files changed: +79 -64 lines


sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java

Lines changed: 4 additions & 4 deletions

@@ -154,7 +154,7 @@ public UnsafeRow getAggregationBuffer(Row groupingKey) {
     if (!loc.isDefined()) {
       // This is the first time that we've seen this grouping key, so we'll insert a copy of the
       // empty aggregation buffer into the map:
-      loc.storeKeyAndValue(
+      loc.putNewKey(
         groupingKeyConversionScratchSpace,
         PlatformDependent.LONG_ARRAY_OFFSET,
         groupingKeySize,

@@ -166,7 +166,7 @@ public UnsafeRow getAggregationBuffer(Row groupingKey) {

     // Reset the pointer to point to the value that we just stored or looked up:
     final MemoryLocation address = loc.getValueAddress();
-    currentAggregationBuffer.set(
+    currentAggregationBuffer.pointTo(
       address.getBaseObject(),
       address.getBaseOffset(),
       aggregationBufferSchema.length(),

@@ -201,13 +201,13 @@ public MapEntry next() {
       final BytesToBytesMap.Location loc = mapLocationIterator.next();
       final MemoryLocation keyAddress = loc.getKeyAddress();
       final MemoryLocation valueAddress = loc.getValueAddress();
-      entry.key.set(
+      entry.key.pointTo(
         keyAddress.getBaseObject(),
         keyAddress.getBaseOffset(),
         groupingKeySchema.length(),
         groupingKeySchema
       );
-      entry.value.set(
+      entry.value.pointTo(
         valueAddress.getBaseObject(),
         valueAddress.getBaseOffset(),
         aggregationBufferSchema.length(),
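
A note on the hunk above: the rename from `set` to `pointTo` underlines that the map's iterator reuses a single pair of UnsafeRow objects as moving pointers rather than allocating a fresh pair per entry. A minimal consumption sketch under that reading (the populated `map` variable, the `iterator()` accessor, and the single-long key/value layout are illustrative assumptions, not part of this commit):

// Each call to next() repoints entry.key / entry.value at new backing memory,
// so any data needed across iterations must be copied out first.
Iterator<UnsafeFixedWidthAggregationMap.MapEntry> iter = map.iterator();  // assumed accessor
while (iter.hasNext()) {
  UnsafeFixedWidthAggregationMap.MapEntry entry = iter.next();
  long groupKey = entry.key.getLong(0);    // valid only until the next call to next()
  long aggValue = entry.value.getLong(0);  // likewise: these rows are reused pointers
}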

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 36 additions & 13 deletions

@@ -17,17 +17,6 @@

 package org.apache.spark.sql.catalyst.expressions;

-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.DataType;
-import static org.apache.spark.sql.types.DataTypes.*;
-
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.UTF8String;
-import org.apache.spark.unsafe.PlatformDependent;
-import org.apache.spark.unsafe.bitset.BitSetMethods;
-import org.apache.spark.unsafe.string.UTF8StringMethods;
 import scala.collection.Map;
 import scala.collection.Seq;
 import scala.collection.mutable.ArraySeq;

@@ -40,12 +29,20 @@
 import java.util.List;
 import java.util.Set;

+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataType;
+import static org.apache.spark.sql.types.DataTypes.*;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.UTF8String;
+import org.apache.spark.unsafe.PlatformDependent;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
+import org.apache.spark.unsafe.string.UTF8StringMethods;

 // TODO: pick a better name for this class, since this is potentially confusing.
 // Maybe call it UnsafeMutableRow?

 /**
- * An Unsafe implementation of Row which is backed by raw memory instead of Java objets.
+ * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
  *
  * Each tuple has three parts: [null bit set] [values] [variable length portion]
  *

@@ -56,6 +53,9 @@
  * primitive types, such as long, double, or int, we store the value directly in the word. For
  * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
  * base address of the row) that points to the beginning of the variable-length field.
+ *
+ * Instances of `UnsafeRow` act as pointers to row data stored in this format, similar to how
+ * `Writable` objects work in Hadoop.
  */
 public final class UnsafeRow implements MutableRow {

@@ -64,6 +64,11 @@ public final class UnsafeRow implements MutableRow {
   private int numFields;
   /** The width of the null tracking bit set, in bytes */
   private int bitSetWidthInBytes;
+  /**
+   * This optional schema is required if you want to call generic get() and set() methods on
+   * this UnsafeRow, but is optional if callers will only use type-specific getTYPE() and setTYPE()
+   * methods.
+   */
   @Nullable
   private StructType schema;

@@ -103,9 +108,27 @@ public static int calculateBitSetWidthInBytes(int numFields) {
     readableFieldTypes.addAll(settableFieldTypes);
   }

+  /**
+   * Construct a new UnsafeRow. The resulting row won't be usable until `pointTo()` has been called,
+   * since the value returned by this constructor is equivalent to a null pointer.
+   */
   public UnsafeRow() { }

-  public void set(Object baseObject, long baseOffset, int numFields, StructType schema) {
+  /**
+   * Update this UnsafeRow to point to different backing data.
+   *
+   * @param baseObject the base object
+   * @param baseOffset the offset within the base object
+   * @param numFields the number of fields in this row
+   * @param schema an optional schema; this is necessary if you want to call generic get() or set()
+   *               methods on this row, but is optional if the caller will only use type-specific
+   *               getTYPE() and setTYPE() methods.
+   */
+  public void pointTo(
+      Object baseObject,
+      long baseOffset,
+      int numFields,
+      @Nullable StructType schema) {
     assert numFields >= 0 : "numFields should >= 0";
     assert schema == null || schema.fields().length == numFields;
     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
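
The new name makes UnsafeRow's two-phase lifecycle explicit: construct an empty row, then aim it at backing memory with `pointTo()`, repointing as often as needed. A minimal sketch, assuming `buffer` already holds row bytes written by an UnsafeRowConverter (the offsets and field count here are illustrative):

// An UnsafeRow is a reusable pointer, not a container.
final long[] buffer = new long[64];     // assumed backing storage with pre-written row data
final UnsafeRow row = new UnsafeRow();  // equivalent to a null pointer until pointTo() is called
row.pointTo(buffer, PlatformDependent.LONG_ARRAY_OFFSET, 2, null);  // null schema: type-specific accessors only
final long first = row.getLong(0);
// Repointing the same instance at different backing data costs no allocation:
row.pointTo(buffer, PlatformDependent.LONG_ARRAY_OFFSET + 128, 2, null);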

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala

Lines changed: 1 addition & 1 deletion

@@ -180,7 +180,7 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
   }

   def writeRow(row: Row, baseObject: Object, baseOffset: Long): Long = {
-    unsafeRow.set(baseObject, baseOffset, writers.length, null)
+    unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
     var fieldNumber = 0
     var appendCursor: Int = fixedLengthSize
     while (fieldNumber < writers.length) {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -38,7 +38,7 @@ class UnsafeRowConverterSuite extends FunSuite with Matchers {
     val numBytesWritten = converter.writeRow(row, buffer, PlatformDependent.LONG_ARRAY_OFFSET)
     numBytesWritten should be (sizeRequired)
     val unsafeRow = new UnsafeRow()
-    unsafeRow.set(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
+    unsafeRow.pointTo(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
     unsafeRow.getLong(0) should be (0)
     unsafeRow.getLong(1) should be (1)
     unsafeRow.getInt(2) should be (2)

@@ -59,7 +59,7 @@ class UnsafeRowConverterSuite extends FunSuite with Matchers {
     val numBytesWritten = converter.writeRow(row, buffer, PlatformDependent.LONG_ARRAY_OFFSET)
     numBytesWritten should be (sizeRequired)
     val unsafeRow = new UnsafeRow()
-    unsafeRow.set(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
+    unsafeRow.pointTo(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
     unsafeRow.getLong(0) should be (0)
     unsafeRow.getString(1) should be ("Hello")
     unsafeRow.getString(2) should be ("World")

unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 24 additions & 26 deletions

@@ -17,14 +17,6 @@

 package org.apache.spark.unsafe.map;

-import org.apache.spark.unsafe.*;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
-import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.bitset.BitSet;
-import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.*;
-
-import java.lang.IllegalStateException;
 import java.lang.Long;
 import java.lang.Object;
 import java.lang.Override;

@@ -33,8 +25,17 @@
 import java.util.LinkedList;
 import java.util.List;

+import org.apache.spark.unsafe.*;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.array.LongArray;
+import org.apache.spark.unsafe.bitset.BitSet;
+import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.*;
+
 /**
- * A bytes to bytes hash map where keys and values are contiguous regions of bytes.
+ * An append-only hash map where keys and values are contiguous regions of bytes.
+ *
+ * This class is not thread-safe.
  *
  * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
  * which is guaranteed to exhaust the space.

@@ -350,36 +351,34 @@ public long getValueLength() {
     }

     /**
-     * Sets the value defined at this position. This method may only be called once for a given
-     * key; if you want to update the value associated with a key, then you can directly manipulate
-     * the bytes stored at the value address.
+     * Store a new key and value. This method may only be called once for a given key; if you want
+     * to update the value associated with a key, then you can directly manipulate the bytes stored
+     * at the value address.
      *
-     * It is only valid to call this method after having first called `lookup()` using the same key.
+     * It is only valid to call this method immediately after calling `lookup()` using the same key.
      *
      * After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
-     * will return information on the data stored by this `storeKeyAndValue` call.
+     * will return information on the data stored by this `putNewKey` call.
      *
      * As an example usage, here's the proper way to store a new key:
      *
      * <code>
      *   Location loc = map.lookup(keyBaseOffset, keyBaseObject, keyLengthInBytes);
      *   if (!loc.isDefined()) {
-     *     loc.storeKeyAndValue(keyBaseOffset, keyBaseObject, keyLengthInBytes, ...)
+     *     loc.putNewKey(keyBaseOffset, keyBaseObject, keyLengthInBytes, ...)
      *   }
      * </code>
      *
      * Unspecified behavior if the key is not defined.
      */
-    public void storeKeyAndValue(
-        Object keyBaseObject,
-        long keyBaseOffset,
-        int keyLengthBytes, // TODO(josh): words? bytes? eventually, we'll want to be more consistent about this
-        Object valueBaseObject,
-        long valueBaseOffset,
-        long valueLengthBytes) {
-      if (isDefined) {
-        throw new IllegalStateException("Can only set value once for a key");
-      }
+    public void putNewKey(
+        Object keyBaseObject,
+        long keyBaseOffset,
+        int keyLengthBytes, // TODO(josh): words? bytes? eventually, we'll want to be more consistent about this
+        Object valueBaseObject,
+        long valueBaseOffset,
+        long valueLengthBytes) {
+      assert (!isDefined) : "Can only set value once for a key";
       isDefined = true;
       assert (keyLengthBytes % 8 == 0);
       assert (valueLengthBytes % 8 == 0);

@@ -388,7 +387,6 @@ public void storeKeyAndValue(
       // must be stored in the same memory page.
       final long requiredSize = 8 + 8 + keyLengthBytes + valueLengthBytes;
       assert(requiredSize <= PAGE_SIZE_BYTES);
-      // Bookeeping
       size++;
       bitset.set(pos);

unsafe/src/main/java/org/apache/spark/unsafe/map/HashMapGrowthStrategy.java

Lines changed: 1 addition & 1 deletion (whitespace)

@@ -27,7 +27,7 @@ public interface HashMapGrowthStrategy {

   /**
    * Double the size of the hash map every time.
    */
-  HashMapGrowthStrategy DOUBLING = new Doubling();
+  HashMapGrowthStrategy DOUBLING = new Doubling();

   class Doubling implements HashMapGrowthStrategy {
     @Override

unsafe/src/main/java/org/apache/spark/unsafe/string/UTF8StringMethods.java

Lines changed: 6 additions & 12 deletions

@@ -17,11 +17,13 @@

 package org.apache.spark.unsafe.string;

+import java.io.UnsupportedEncodingException;
+import java.lang.Object;
+import java.lang.String;
+
 import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.array.ByteArrayMethods;

-import java.io.UnsupportedEncodingException;import java.lang.Object;import java.lang.String;
-
 /**
  * A String encoded in UTF-8 as long representing the string's length, followed by a
  * contiguous region of bytes; see http://en.wikipedia.org/wiki/UTF-8 for details.

@@ -33,14 +35,6 @@ private UTF8StringMethods() {
     // See UTF8StringPointer for a more object-oriented interface to UTF8String data.
   }

-  /**
-   * Return the length of the string, in bytes (NOT characters), not including
-   * the space to store the length itself.
-   */
-  static long getLengthInBytes(Object baseObject, long baseOffset) {
-    return PlatformDependent.UNSAFE.getLong(baseObject, baseOffset);
-  }
-
   public static int compare(
       Object leftBaseObject,
       long leftBaseOffset,

@@ -68,7 +62,7 @@ public static boolean startsWith(
       int prefixLengthInBytes) {
     if (prefixLengthInBytes > strLengthInBytes) {
       return false;
-    } {
+    } else {
       return ByteArrayMethods.arrayEquals(
         strBaseObject,
         strBaseOffset,

@@ -87,7 +81,7 @@ public static boolean endsWith(
       int suffixLengthInBytes) {
     if (suffixLengthInBytes > strLengthInBytes) {
       return false;
-    } {
+    } else {
       return ByteArrayMethods.arrayEquals(
         strBaseObject,
         strBaseOffset + strLengthInBytes - suffixLengthInBytes,
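
Worth noting: the `} {` these two fixes replace was a bare block, not a syntax error, and since the branch above it returns, the old code behaved identically; the `else` is purely a readability repair. A self-contained sketch of the pitfall (hypothetical method, not from this commit):

// In Java, "} {" starts a bare block, so this compiles and (because the first
// branch returns) behaves exactly like if/else — it just reads like a typo.
static boolean demo(int prefixLen, int strLen) {
  if (prefixLen > strLen) {
    return false;
  } {                        // bare block: legal, but easily mistaken for a lost "else"
    return prefixLen >= 0;   // always reached when the condition above is false
  }
}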

unsafe/src/test/java/org/apache/spark/unsafe/map/AbstractTestBytesToBytesMap.java

Lines changed: 5 additions & 5 deletions

@@ -96,7 +96,7 @@ public void setAndRetrieveAKey() {
     final BytesToBytesMap.Location loc =
       map.lookup(keyData, BYTE_ARRAY_OFFSET, recordLengthBytes);
     Assert.assertFalse(loc.isDefined());
-    loc.storeKeyAndValue(
+    loc.putNewKey(
       keyData,
       BYTE_ARRAY_OFFSET,
       recordLengthBytes,

@@ -119,7 +119,7 @@ public void setAndRetrieveAKey() {
     Assert.assertArrayEquals(valueData, getByteArray(loc.getValueAddress(), recordLengthBytes));

     try {
-      loc.storeKeyAndValue(
+      loc.putNewKey(
         keyData,
         BYTE_ARRAY_OFFSET,
         recordLengthBytes,

@@ -146,7 +146,7 @@ public void iteratorTest() throws Exception {
     final BytesToBytesMap.Location loc =
       map.lookup(value, PlatformDependent.LONG_ARRAY_OFFSET, 8);
     Assert.assertFalse(loc.isDefined());
-    loc.storeKeyAndValue(
+    loc.putNewKey(
       value,
       PlatformDependent.LONG_ARRAY_OFFSET,
       8,

@@ -196,15 +196,15 @@ public void randomizedStressTest() {
         key.length
       );
       Assert.assertFalse(loc.isDefined());
-      loc.storeKeyAndValue(
+      loc.putNewKey(
         key,
         BYTE_ARRAY_OFFSET,
         key.length,
         value,
         BYTE_ARRAY_OFFSET,
         value.length
       );
-      // After calling storeKeyAndValue, the following should be true, even before calling
+      // After calling putNewKey, the following should be true, even before calling
       // lookup():
       Assert.assertTrue(loc.isDefined());
       Assert.assertEquals(key.length, loc.getKeyLength());
