Skip to content

Commit 57f1ec0

Browse files
committed
WIP towards packed record pointers for use in optimized shuffle sort.
1 parent 69232fd commit 57f1ec0

File tree

2 files changed

+131
-0
lines changed

2 files changed

+131
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.unsafe;
19+
20+
/**
21+
* Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
22+
*/
23+
final class PackedRecordPointer {
24+
25+
/** Bit mask for the lower 40 bits of a long. */
26+
private static final long MASK_LONG_LOWER_40_BITS = 0xFFFFFFFFFFL;
27+
28+
/** Bit mask for the upper 24 bits of a long */
29+
private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
30+
31+
/** Bit mask for the lower 27 bits of a long. */
32+
private static final long MASK_LONG_LOWER_27_BITS = 0x7FFFFFFL;
33+
34+
/** Bit mask for the lower 51 bits of a long. */
35+
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
36+
37+
/** Bit mask for the upper 13 bits of a long */
38+
private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
39+
40+
// TODO: this shifting is probably extremely inefficient; this is just for prototyping
41+
42+
/**
43+
* Pack a record address and partition id into a single word.
44+
*
45+
* @param recordPointer a record pointer encoded by TaskMemoryManager.
46+
* @param partitionId a shuffle partition id (maximum value of 2^24).
47+
* @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
48+
*/
49+
public static long packPointer(long recordPointer, int partitionId) {
50+
// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
51+
// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
52+
final int pageNumber = (int) ((recordPointer & MASK_LONG_UPPER_13_BITS) >>> 51);
53+
final long compressedAddress =
54+
(((long) pageNumber) << 27) | (recordPointer & MASK_LONG_LOWER_27_BITS);
55+
return (((long) partitionId) << 40) | compressedAddress;
56+
}
57+
58+
public long packedRecordPointer;
59+
60+
public int getPartitionId() {
61+
return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
62+
}
63+
64+
public long getRecordPointer() {
65+
final long compressedAddress = packedRecordPointer & MASK_LONG_LOWER_40_BITS;
66+
final long pageNumber = (compressedAddress << 24) & MASK_LONG_UPPER_13_BITS;
67+
final long offsetInPage = compressedAddress & MASK_LONG_LOWER_27_BITS;
68+
return pageNumber | offsetInPage;
69+
}
70+
71+
public int getRecordLength() {
72+
return -1; // TODO
73+
}
74+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.unsafe;
19+
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
24+
import org.apache.spark.unsafe.memory.MemoryAllocator;
25+
import org.apache.spark.unsafe.memory.MemoryBlock;
26+
import org.apache.spark.unsafe.memory.TaskMemoryManager;
27+
28+
public class PackedRecordPointerSuite {
29+
30+
@Test
31+
public void heap() {
32+
final TaskMemoryManager memoryManager =
33+
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
34+
final MemoryBlock page0 = memoryManager.allocatePage(100);
35+
final MemoryBlock page1 = memoryManager.allocatePage(100);
36+
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, 42);
37+
PackedRecordPointer packedPointerWrapper = new PackedRecordPointer();
38+
packedPointerWrapper.packedRecordPointer = PackedRecordPointer.packPointer(addressInPage1, 360);
39+
Assert.assertEquals(360, packedPointerWrapper.getPartitionId());
40+
Assert.assertEquals(addressInPage1, packedPointerWrapper.getRecordPointer());
41+
memoryManager.cleanUpAllAllocatedMemory();
42+
}
43+
44+
@Test
45+
public void offHeap() {
46+
final TaskMemoryManager memoryManager =
47+
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE));
48+
final MemoryBlock page0 = memoryManager.allocatePage(100);
49+
final MemoryBlock page1 = memoryManager.allocatePage(100);
50+
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, 42);
51+
PackedRecordPointer packedPointerWrapper = new PackedRecordPointer();
52+
packedPointerWrapper.packedRecordPointer = PackedRecordPointer.packPointer(addressInPage1, 360);
53+
Assert.assertEquals(360, packedPointerWrapper.getPartitionId());
54+
Assert.assertEquals(addressInPage1, packedPointerWrapper.getRecordPointer());
55+
memoryManager.cleanUpAllAllocatedMemory();
56+
}
57+
}

0 commit comments

Comments
 (0)