Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
private def calcChecksum(block: ByteBuffer): Int = {
val adler = new Adler32()
if (block.hasArray) {
adler.update(block.array, block.arrayOffset + block.position, block.limit - block.position)
adler.update(block.array, block.arrayOffset + block.position(), block.limit()
- block.position())
} else {
val bytes = new Array[Byte](block.remaining())
block.duplicate.get(bytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ private[spark] class Executor(
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
val resultSize = serializedDirectResult.limit()

// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
if (serializedTask.limit() >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ByteBufferInputStream(private var buffer: ByteBuffer)
override def skip(bytes: Long): Long = {
if (buffer != null) {
val amountToSkip = math.min(bytes, buffer.remaining).toInt
buffer.position(buffer.position + amountToSkip)
buffer.position(buffer.position() + amountToSkip)
if (buffer.remaining() == 0) {
cleanUp()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
for (bytes <- getChunks()) {
while (bytes.remaining() > 0) {
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
bytes.limit(bytes.position + ioSize)
bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
}
}
Expand Down Expand Up @@ -206,7 +206,7 @@ private[spark] class ChunkedByteBufferInputStream(
override def skip(bytes: Long): Long = {
if (currentChunk != null) {
val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
currentChunk.position(currentChunk.position + amountToSkip)
currentChunk.position(currentChunk.position() + amountToSkip)
if (currentChunk.remaining() == 0) {
if (chunks.hasNext) {
currentChunk = chunks.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
assert(ser.serialize(t).limit < 100)
assert(ser.serialize(t).limit() < 100)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class DiskStoreSuite extends SparkFunSuite {
val chunks = chunkedByteBuffer.chunks
assert(chunks.size === 2)
for (chunk <- chunks) {
assert(chunk.limit === 10 * 1024)
assert(chunk.limit() === 10 * 1024)
}

val e = intercept[IllegalArgumentException]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
props.put("offsets.topic.num.partitions", "1")
props.putAll(withBrokerProps.asJava)
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
withBrokerProps.foreach { case (k, v) => props.put(k, v) }
props
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[columnar] trait NullableColumnAccessor extends ColumnAccessor {
nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1
pos = 0

underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
underlyingBuffer.position(underlyingBuffer.position() + 4 + nullCount * 4)
super.initialize()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
var nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else capacity
var pos = 0
var seenNulls = 0
var bufferPos = buffer.position
var bufferPos = buffer.position()
while (pos < capacity) {
if (pos != nextNullIndex) {
val len = nextNullIndex - pos
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ case class HiveScriptIOSchema (
propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)

val properties = new Properties()
properties.putAll(propsMap.asJava)
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
propsMap.foreach { case (k, v) => properties.put(k, v) }
serde.initialize(null, properties)

serde
Expand All @@ -424,7 +426,9 @@ case class HiveScriptIOSchema (
recordReaderClass.map { klass =>
val instance = Utils.classForName(klass).newInstance().asInstanceOf[RecordReader]
val props = new Properties()
props.putAll(outputSerdeProps.toMap.asJava)
// Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
instance.initialize(inputStream, conf, props)
instance
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)

/** Read a buffer fully from a given Channel */
private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) {
while (dest.position < dest.limit) {
while (dest.position() < dest.limit()) {
if (channel.read(dest) == -1) {
throw new EOFException("End of channel")
}
Expand Down