IndexShuffleBlockResolver.scala
@@ -141,19 +141,6 @@ private[spark] class IndexShuffleBlockResolver(
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}

val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
@@ -166,10 +153,22 @@ private[spark] class IndexShuffleBlockResolver(
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
indexTmp.delete()
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
} {
out.close()
}

if (indexFile.exists()) {
indexFile.delete()
}
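For context, here is a minimal standalone sketch of the index-file layout that the loop above produces (the object and method names are illustrative, not part of the patch): n per-partition block lengths become n + 1 cumulative offsets written as longs, which is why the suite below checks an index length of (lengths.length + 1) * 8 bytes.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object IndexLayoutSketch {
  // n per-partition block lengths become n + 1 cumulative offsets.
  def offsetsFor(lengths: Array[Long]): Array[Long] =
    lengths.scanLeft(0L)(_ + _)

  def main(args: Array[String]): Unit = {
    val lengths = Array[Long](10, 0, 20)
    val offsets = offsetsFor(lengths)            // Array(0, 10, 10, 30)

    // The index file is just these offsets written as big-endian longs, so its
    // size is (lengths.length + 1) * 8 bytes (here: 32).
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    offsets.foreach(out.writeLong(_))
    out.close()
    println(s"offsets = ${offsets.mkString(", ")}; index size = ${bytes.size()} bytes")

    // A reader of partition i seeks to offsets(i) in the data file and reads
    // offsets(i + 1) - offsets(i) bytes.
  }
}
```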
IndexShuffleBlockResolverSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.shuffle.sort

import java.io.{File, FileInputStream, FileOutputStream}
import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}

import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
@@ -64,6 +64,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach
}

test("commit shuffle files multiple times") {
val shuffleId = 1
val mapId = 2
val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
val lengths = Array[Long](10, 0, 20)
val dataTmp = File.createTempFile("shuffle", null, tempDir)
@@ -73,9 +76,13 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach
} {
out.close()
}
resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp)

val dataFile = resolver.getDataFile(1, 2)
val indexFile = new File(tempDir.getAbsolutePath, idxName)
val dataFile = resolver.getDataFile(shuffleId, mapId)

assert(indexFile.exists())
assert(indexFile.length() === (lengths.length + 1) * 8)
assert(dataFile.exists())
assert(dataFile.length() === 30)
assert(!dataTmp.exists())
@@ -89,26 +96,37 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach
} {
out2.close()
}
resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)

assert(indexFile.length() === (lengths.length + 1) * 8)
assert(lengths2.toSeq === lengths.toSeq)
assert(dataFile.exists())
assert(dataFile.length() === 30)
assert(!dataTmp2.exists())

// The dataFile should be the previous one
val firstByte = new Array[Byte](1)
val in = new FileInputStream(dataFile)
val dataIn = new FileInputStream(dataFile)
Utils.tryWithSafeFinally {
in.read(firstByte)
dataIn.read(firstByte)
} {
in.close()
dataIn.close()
}
assert(firstByte(0) === 0)

// The index file should not change
val indexIn = new DataInputStream(new FileInputStream(indexFile))
Utils.tryWithSafeFinally {
indexIn.readLong() // the first offset is always 0
assert(indexIn.readLong() === 10, "The index file should not change")
} {
indexIn.close()
}

// remove data file
dataFile.delete()

val lengths3 = Array[Long](10, 10, 15)
val lengths3 = Array[Long](7, 10, 15, 3)
val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
val out3 = new FileOutputStream(dataTmp3)
Utils.tryWithSafeFinally {
@@ -117,20 +135,29 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach
} {
out3.close()
}
resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths3, dataTmp3)
assert(indexFile.length() === (lengths3.length + 1) * 8)
assert(lengths3.toSeq != lengths.toSeq)
assert(dataFile.exists())
assert(dataFile.length() === 35)
assert(!dataTmp2.exists())
assert(!dataTmp3.exists())

// The dataFile should be the previous one
val firstByte2 = new Array[Byte](1)
val in2 = new FileInputStream(dataFile)
// The dataFile should be the new one, since we deleted the dataFile from the first attempt
val dataIn2 = new FileInputStream(dataFile)
Utils.tryWithSafeFinally {
dataIn2.read(firstByte)
} {
dataIn2.close()
}
assert(firstByte(0) === 2)

// The index file should be updated, since we deleted the dataFile from the first attempt
val indexIn2 = new DataInputStream(new FileInputStream(indexFile))
Utils.tryWithSafeFinally {
in2.read(firstByte2)
indexIn2.readLong() // the first offset is always 0
assert(indexIn2.readLong() === 7, "The index file should be updated")
} {
in2.close()
indexIn2.close()
}
assert(firstByte2(0) === 2)
}
}
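To make the asserted numbers above easier to follow, a small sketch of the arithmetic behind the three commits (the object name is made up; it only restates values visible in the test):

```scala
object CommitArithmeticSketch extends App {
  // First commit: lengths (10, 0, 20) -> offsets 0, 10, 10, 30;
  //   data file = 30 bytes, index = (3 + 1) * 8 = 32 bytes, second offset = 10.
  println(Array(10L, 0L, 20L).scanLeft(0L)(_ + _).mkString(", "))

  // Second commit is rejected because a committed output already exists, so
  //   lengths2 is overwritten with (10, 0, 20) and the index file is untouched.

  // Third commit, after the data file was deleted: lengths (7, 10, 15, 3) ->
  //   offsets 0, 7, 17, 32, 35; data file = 35 bytes, index = (4 + 1) * 8 = 40 bytes,
  //   and the second offset read back is 7.
  println(Array(7L, 10L, 15L, 3L).scanLeft(0L)(_ + _).mkString(", "))
}
```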