Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/network-yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/tags/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/docker-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10-token-provider/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kafka-0-10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kinesis-asl-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion external/spark-ganglia-lgpl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion graphx/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion hadoop-cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion launcher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion mllib-local/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion mllib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
<url>http://spark.apache.org/</url>
Expand Down
2 changes: 1 addition & 1 deletion repl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/mesos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion resource-managers/yarn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/hive-thriftserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion sql/hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion streaming/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[streaming] class ReceivedBlockTracker(
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
(streamId, getReceivedBlockQueue(streamId).clone())
(streamId, getReceivedBlockQueue(streamId).clone().dequeueAll(x => true))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just basically copies the content right? would it be more robust to just construct a new mutable.Seq from the queue to be sure about what the type is? I know it won't be a Queue here but sounds like the representation does kind of matter.

Was https://issues.apache.org/jira/browse/SPARK-23358 / https://issues.apache.org/jira/browse/SPARK-23391 really the cause or did it mostly uncover this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I could do:

mutable.ArrayBuffer.empty[ReceivedBlockInfo] ++= getReceivedBlockQueue(streamId).clone()

which ensures that we know exactly the sequence type being serialized; would you prefer I did that?

https://issues.apache.org/jira/browse/SPARK-23991 caused it in the sense that it moved from using dequeueAll to just serializing a clone of the queue, and the queue just doesn't serialize correctly after some number of entries (a scala bug, IMO). You could say it "uncovered" a scala bug that was sitting there, but prior to that checking the code wouldn't error with large numbers of entries because dequeueAll constructs an ArrayBuffer and that's what was being serialized.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I was looking at the wrong JIRA, number typo. Yeah I see it. @gaborgsomogyi that makes sense?

How about ArrayBuffer(...:_*)? might be better as it could exactly allocate the size.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not a scala but java limitation. Serialization is vulnerable to stack overflow for certain kind of structures; for example, a long linked list with no special writeObject() methods will be serialized by recursively writing each link. If you've got a 100k links, you're going to try to use 100k stack frames, and quite likely fail with a StackOverflowError. The main thing here is to use something which is not linked.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also choose a solution which allocates the buffer in one step and we could avoid amortized O(1) issue.

Additionally it will be a weird construct which can be tried to simplified by later developers. I would add a comment here which describes the issue not to enforce people to analyze this again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think that's the point here indeed, but a Queue apparently has a linked-list-like representation. ArrayBuffer won't. Are you saying that's the right change or no? my point was we're not actually guaranteed what dequeueAll returns.

A comment would be good, sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think .dequeueAll(x => true) works but not the right change and I would propose similar things what you've mentioned, for example ArrayBuffer.... The main point is to allocate the buffer in one step which is not the case with dequeueAll.

Copy link
Contributor Author

@rlodge rlodge Feb 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can add a comment. Is

ArrayBuffer(queue.clone(): _*)

sufficient, or do we need

val clonedQueue = getReceivedBlockQueue(streamId).clone()
val bufferToWrite = new mutable.ArrayBuffer[ReceivedBlockInfo](clonedQueue.size)
bufferToWrite ++= clonedQueue
(streamId,
   bufferToWrite)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first one looks good to me. It definitely makes an ArrayBuffer! and I can only imagine the ordering that is produced is the order of dequeueing the elements. It was so when I tried it locally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I pushed those changes.

}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}

test("block addition, and block to batch allocation with many blocks") {
val receivedBlockTracker = createTracker(setCheckpointDir = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setCheckpointDir is already true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

val blockInfos = generateBlockInfos(100000)
blockInfos.map(receivedBlockTracker.addBlock)
receivedBlockTracker.allocateBlocksToBatch(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add assertions here as well. If it's not throwing exception doesn't mean the same blocks deserialized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

}

test("recovery with write ahead logs should remove only allocated blocks from received queue") {
val manualClock = new ManualClock
val batchTime = manualClock.getTimeMillis()
Expand Down Expand Up @@ -362,8 +369,8 @@ class ReceivedBlockTrackerSuite
}

/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
def generateBlockInfos(blockCount: Int = 5): Seq[ReceivedBlockInfo] = {
List.fill(blockCount)(ReceivedBlockInfo(streamId, Some(0L), None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}

Expand Down
2 changes: 1 addition & 1 deletion tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>Using</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down