
Commit 56e2b83: merge master and resolve conflicts

2 parents: 8adad7c + 97a1aa2

28 files changed: +2437, -168 lines

bin/pyspark

Lines changed: 1 addition & 1 deletion

@@ -68,7 +68,7 @@ if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
   export PYTHONHASHSEED=0
-  exec "$PYSPARK_DRIVER_PYTHON" -m "$1"
+  exec "$PYSPARK_DRIVER_PYTHON" -m "$@"
   exit
 fi
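The "$1" → "$@" change matters because test invocations can pass extra arguments after the module name; "$1" forwarded only the first word and silently dropped the rest. A hypothetical Java sketch (not part of this commit; the class and names are illustrative) of the same forwarding difference:

import java.util.Arrays;
import java.util.List;

public class ForwardArgs {
  // "$1"-style: only the first argument survives.
  static List<String> forwardFirst(String[] args) {
    return Arrays.asList(args[0]);
  }

  // "$@"-style: every argument is forwarded intact.
  static List<String> forwardAll(String[] args) {
    return Arrays.asList(args);
  }

  public static void main(String[] args) {
    String[] argv = {"pyspark.tests", "--verbose"};
    System.out.println(forwardFirst(argv)); // [pyspark.tests]
    System.out.println(forwardAll(argv));   // [pyspark.tests, --verbose]
  }
}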

common/network-common/pom.xml

Lines changed: 2 additions & 1 deletion

@@ -90,7 +90,8 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
+      <scope>test</scope>
+    </dependency>
 
     <!--
     This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 10 additions & 10 deletions

@@ -24,10 +24,10 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import scala.Tuple2;
-
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.Channel;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -58,7 +58,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
 
   private final Map<Long, RpcResponseCallback> outstandingRpcs;
 
-  private final Queue<Tuple2<String, StreamCallback>> streamCallbacks;
+  private final Queue<Pair<String, StreamCallback>> streamCallbacks;
   private volatile boolean streamActive;
 
   /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */

@@ -92,7 +92,7 @@ public void removeRpcRequest(long requestId) {
 
   public void addStreamCallback(String streamId, StreamCallback callback) {
     timeOfLastRequestNs.set(System.nanoTime());
-    streamCallbacks.offer(new Tuple2<>(streamId, callback));
+    streamCallbacks.offer(ImmutablePair.of(streamId, callback));
   }
 
   @VisibleForTesting

@@ -119,9 +119,9 @@ private void failOutstandingRequests(Throwable cause) {
         logger.warn("RpcResponseCallback.onFailure throws exception", e);
       }
     }
-    for (Tuple2<String, StreamCallback> entry : streamCallbacks) {
+    for (Pair<String, StreamCallback> entry : streamCallbacks) {
       try {
-        entry._2().onFailure(entry._1(), cause);
+        entry.getValue().onFailure(entry.getKey(), cause);
       } catch (Exception e) {
         logger.warn("StreamCallback.onFailure throws exception", e);
       }

@@ -208,9 +208,9 @@ public void handle(ResponseMessage message) throws Exception {
       }
     } else if (message instanceof StreamResponse) {
       StreamResponse resp = (StreamResponse) message;
-      Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
+      Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry._2();
+        StreamCallback callback = entry.getValue();
         if (resp.byteCount > 0) {
           StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
             callback);

@@ -235,9 +235,9 @@ public void handle(ResponseMessage message) throws Exception {
       }
     } else if (message instanceof StreamFailure) {
       StreamFailure resp = (StreamFailure) message;
-      Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
+      Pair<String, StreamCallback> entry = streamCallbacks.poll();
       if (entry != null) {
-        StreamCallback callback = entry._2();
+        StreamCallback callback = entry.getValue();
         try {
           callback.onFailure(resp.streamId, new RuntimeException(resp.error));
         } catch (IOException ioe) {
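The pattern throughout this file swaps scala.Tuple2 for the commons-lang3 pair types, so network-common no longer needs the Scala library at compile time. A minimal standalone sketch of the replacement idiom (not code from this commit; the Queue<Pair<String, Runnable>> stand-in is illustrative):

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class PairIdiom {
  public static void main(String[] args) {
    // Before: Queue<scala.Tuple2<String, Runnable>>; after: the Java-only Pair.
    Queue<Pair<String, Runnable>> callbacks = new ArrayDeque<>();
    callbacks.offer(ImmutablePair.of("stream-1", () -> System.out.println("done")));

    Pair<String, Runnable> entry = callbacks.poll();
    // Tuple2._1()/_2() become Pair.getKey()/getValue().
    System.out.println(entry.getKey());
    entry.getValue().run();
  }
}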

common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java

Lines changed: 5 additions & 12 deletions

@@ -23,8 +23,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import scala.Tuple2;
-
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;

@@ -98,21 +96,16 @@ public ManagedBuffer getChunk(long streamId, int chunkIndex) {
 
   @Override
   public ManagedBuffer openStream(String streamChunkId) {
-    Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId);
-    return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2);
-  }
-
-  public static String genStreamChunkId(long streamId, int chunkId) {
-    return String.format("%d_%d", streamId, chunkId);
-  }
-
-  public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) {
     String[] array = streamChunkId.split("_");
     assert array.length == 2:
       "Stream id and chunk index should be specified when open stream for fetching block.";
     long streamId = Long.valueOf(array[0]);
     int chunkIndex = Integer.valueOf(array[1]);
-    return new Tuple2<>(streamId, chunkIndex);
+    return getChunk(streamId, chunkIndex);
+  }
+
+  public static String genStreamChunkId(long streamId, int chunkId) {
+    return String.format("%d_%d", streamId, chunkId);
   }
 
   @Override
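parseStreamChunkId returned a Tuple2, so its body is inlined into openStream and only the Scala-free genStreamChunkId survives as a public helper. The encode/parse round-trip it implements is simple; here is a standalone sketch of the same "<streamId>_<chunkIndex>" format (the demo class is hypothetical):

public class StreamChunkIdDemo {
  // Same format as genStreamChunkId in the diff above.
  static String genStreamChunkId(long streamId, int chunkId) {
    return String.format("%d_%d", streamId, chunkId);
  }

  public static void main(String[] args) {
    String id = genStreamChunkId(42L, 7); // "42_7"

    // The parsing now done inline in openStream.
    String[] parts = id.split("_");
    long streamId = Long.valueOf(parts[0]);
    int chunkIndex = Integer.valueOf(parts[1]);

    System.out.println(streamId + " / " + chunkIndex); // 42 / 7
  }
}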

common/network-shuffle/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -69,6 +69,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <scope>test</scope>
     </dependency>
 
     <!--

common/network-yarn/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -48,6 +48,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
+      <scope>test</scope>
     </dependency>
 
     <!--

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 30 additions & 5 deletions

@@ -589,29 +589,54 @@ public long getKeyPrefix() {
   }
 
   /**
-   * Returns a iterator, which will return the rows in the order as inserted.
+   * Returns an iterator starts from startIndex, which will return the rows in the order as
+   * inserted.
    *
    * It is the caller's responsibility to call `cleanupResources()`
    * after consuming this iterator.
    *
    * TODO: support forced spilling
    */
-  public UnsafeSorterIterator getIterator() throws IOException {
+  public UnsafeSorterIterator getIterator(int startIndex) throws IOException {
     if (spillWriters.isEmpty()) {
       assert(inMemSorter != null);
-      return inMemSorter.getSortedIterator();
+      UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+      moveOver(iter, startIndex);
+      return iter;
     } else {
       LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+      int i = 0;
       for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
-        queue.add(spillWriter.getReader(serializerManager));
+        if (i + spillWriter.recordsSpilled() > startIndex) {
+          UnsafeSorterIterator iter = spillWriter.getReader(serializerManager);
+          moveOver(iter, startIndex - i);
+          queue.add(iter);
+        }
+        i += spillWriter.recordsSpilled();
       }
       if (inMemSorter != null) {
-        queue.add(inMemSorter.getSortedIterator());
+        UnsafeSorterIterator iter = inMemSorter.getSortedIterator();
+        moveOver(iter, startIndex - i);
+        queue.add(iter);
       }
       return new ChainedIterator(queue);
     }
   }
 
+  private void moveOver(UnsafeSorterIterator iter, int steps)
+      throws IOException {
+    if (steps > 0) {
+      for (int i = 0; i < steps; i++) {
+        if (iter.hasNext()) {
+          iter.loadNext();
+        } else {
+          throw new ArrayIndexOutOfBoundsException("Failed to move the iterator " + steps +
+            " steps forward");
+        }
+      }
+    }
+  }
+
   /**
    * Chain multiple UnsafeSorterIterator together as single one.
   */
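The new startIndex parameter lets a caller resume iteration at an absolute record offset: spill files whose records all precede startIndex are skipped entirely, the first relevant segment is advanced by the remainder via moveOver, and later segments (including the in-memory one) are taken whole. The per-segment record counts come from the recordsSpilled() accessor added to UnsafeSorterSpillWriter below. The offset arithmetic in isolation (a simplified sketch, not commit code; plain lists stand in for spill readers):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SegmentSkipDemo {
  // Mirrors getIterator(startIndex): chain segments, skipping records before startIndex.
  static <T> List<Iterator<T>> chainFrom(List<List<T>> segments, int startIndex) {
    List<Iterator<T>> queue = new ArrayList<>();
    int i = 0; // records contained in earlier segments
    for (List<T> segment : segments) {
      if (i + segment.size() > startIndex) {
        Iterator<T> iter = segment.iterator();
        // Advance past the records of this segment that precede startIndex;
        // for segments entirely after startIndex, skip is <= 0 and nothing happens.
        for (int skip = startIndex - i; skip > 0; skip--) {
          iter.next();
        }
        queue.add(iter);
      }
      i += segment.size();
    }
    return queue;
  }

  public static void main(String[] args) {
    List<List<Integer>> segments =
      List.of(List.of(0, 1, 2), List.of(3, 4), List.of(5, 6, 7));
    for (Iterator<Integer> it : chainFrom(segments, 4)) {
      while (it.hasNext()) System.out.print(it.next() + " "); // 4 5 6 7
    }
  }
}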

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java

Lines changed: 4 additions & 0 deletions

@@ -155,4 +155,8 @@ public File getFile() {
   public UnsafeSorterSpillReader getReader(SerializerManager serializerManager) throws IOException {
     return new UnsafeSorterSpillReader(serializerManager, file, blockId);
   }
+
+  public int recordsSpilled() {
+    return numRecordsSpilled;
+  }
 }

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java

Lines changed: 33 additions & 1 deletion

@@ -395,7 +395,7 @@ public void forcedSpillingWithoutComparator() throws Exception {
         sorter.spill();
       }
     }
-    UnsafeSorterIterator iter = sorter.getIterator();
+    UnsafeSorterIterator iter = sorter.getIterator(0);
     for (int i = 0; i < n; i++) {
       iter.hasNext();
       iter.loadNext();

@@ -479,5 +479,37 @@ public void testPeakMemoryUsed() throws Exception {
     }
   }
 
+  @Test
+  public void testGetIterator() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; i < 100; i++) {
+      insertNumber(sorter, i);
+    }
+    verifyIntIterator(sorter.getIterator(0), 0, 100);
+    verifyIntIterator(sorter.getIterator(79), 79, 100);
+
+    sorter.spill();
+    for (int i = 100; i < 200; i++) {
+      insertNumber(sorter, i);
+    }
+    sorter.spill();
+    verifyIntIterator(sorter.getIterator(79), 79, 200);
+
+    for (int i = 200; i < 300; i++) {
+      insertNumber(sorter, i);
+    }
+    verifyIntIterator(sorter.getIterator(79), 79, 300);
+    verifyIntIterator(sorter.getIterator(139), 139, 300);
+    verifyIntIterator(sorter.getIterator(279), 279, 300);
+  }
+
+  private void verifyIntIterator(UnsafeSorterIterator iter, int start, int end)
+      throws IOException {
+    for (int i = start; i < end; i++) {
+      assert (iter.hasNext());
+      iter.loadNext();
+      assert (Platform.getInt(iter.getBaseObject(), iter.getBaseOffset()) == i);
+    }
+  }
 }

dev/deps/spark-deps-hadoop-2.6

Lines changed: 5 additions & 0 deletions

@@ -13,6 +13,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
 api-asn1-api-1.0.0-M20.jar
 api-util-1.0.0-M20.jar
 arpack_combined_all-0.1.jar
+arrow-format-0.4.0.jar
+arrow-memory-0.4.0.jar
+arrow-vector-0.4.0.jar
 avro-1.7.7.jar
 avro-ipc-1.7.7.jar
 avro-mapred-1.7.7-hadoop2.jar

@@ -55,6 +58,7 @@ datanucleus-core-3.2.10.jar
 datanucleus-rdbms-3.2.9.jar
 derby-10.12.1.1.jar
 eigenbase-properties-1.1.5.jar
+flatbuffers-1.2.0-3f79e055.jar
 gson-2.2.4.jar
 guava-14.0.1.jar
 guice-3.0.jar

@@ -77,6 +81,7 @@ hadoop-yarn-server-web-proxy-2.6.5.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar
+hppc-0.7.1.jar
 htrace-core-3.0.4.jar
 httpclient-4.5.2.jar
 httpcore-4.4.4.jar
