Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
private def calcChecksum(block: ByteBuffer): Int = {
val adler = new Adler32()
if (block.hasArray) {
adler.update(block.array, block.arrayOffset + block.position, block.limit - block.position)
adler.update(block.array, block.arrayOffset + block.position(), block.limit()
- block.position())
} else {
val bytes = new Array[Byte](block.remaining())
block.duplicate.get(bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ private[spark] class Executor(
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
val resultSize = serializedDirectResult.limit()

// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
if (serializedTask.limit() >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ByteBufferInputStream(private var buffer: ByteBuffer)
override def skip(bytes: Long): Long = {
if (buffer != null) {
val amountToSkip = math.min(bytes, buffer.remaining).toInt
buffer.position(buffer.position + amountToSkip)
buffer.position(buffer.position() + amountToSkip)
if (buffer.remaining() == 0) {
cleanUp()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
for (bytes <- getChunks()) {
while (bytes.remaining() > 0) {
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
bytes.limit(bytes.position + ioSize)
bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
}
}
Expand Down Expand Up @@ -206,7 +206,7 @@ private[spark] class ChunkedByteBufferInputStream(
override def skip(bytes: Long): Long = {
if (currentChunk != null) {
val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
currentChunk.position(currentChunk.position + amountToSkip)
currentChunk.position(currentChunk.position() + amountToSkip)
if (currentChunk.remaining() == 0) {
if (chunks.hasNext) {
currentChunk = chunks.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
assert(ser.serialize(t).limit < 100)
assert(ser.serialize(t).limit() < 100)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class DiskStoreSuite extends SparkFunSuite {
val chunks = chunkedByteBuffer.chunks
assert(chunks.size === 2)
for (chunk <- chunks) {
assert(chunk.limit === 10 * 1024)
assert(chunk.limit() === 10 * 1024)
}

val e = intercept[IllegalArgumentException]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
props.put("offsets.topic.num.partitions", "1")
props.putAll(withBrokerProps.asJava)
// props.putAll(withBrokerProps.asJava)
withBrokerProps.foreach{ case (k, v) => props.put(k, v) }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this related?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala/bug#10418. I see.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the change to the putAll is similar as the solution provided in scala/bug#10418

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: h{ -> h {

props
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
pos = 0

underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
underlyingBuffer.position(underlyingBuffer.position() + 4 + nullCount * 4)
super.initialize()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity
var pos = 0
var seenNulls = 0
var bufferPos = buffer.position
var bufferPos = buffer.position()
while (pos < capacity) {
if (pos != nextNullIndex) {
val len = nextNullIndex - pos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ case class HiveScriptIOSchema (
propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)

val properties = new Properties()
properties.putAll(propsMap.asJava)
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
propsMap.foreach { case (k, v) => properties.put(k, v) }
serde.initialize(null, properties)

serde
Expand All @@ -424,7 +426,9 @@ case class HiveScriptIOSchema (
recordReaderClass.map { klass =>
val instance = Utils.classForName(klass).newInstance().asInstanceOf[RecordReader]
val props = new Properties()
props.putAll(outputSerdeProps.toMap.asJava)
// Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
instance.initialize(inputStream, conf, props)
instance
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)

/** Read a buffer fully from a given Channel */
private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) {
while (dest.position < dest.limit) {
while (dest.position() < dest.limit()) {
if (channel.read(dest) == -1) {
throw new EOFException("End of channel")
}
Expand Down