10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -278,9 +278,9 @@ private object ContextCleaner {
* Listener class used for testing when any item has been cleaned by the Cleaner class.
*/
private[spark] trait CleanerListener {
-def rddCleaned(rddId: Int)
-def shuffleCleaned(shuffleId: Int)
-def broadcastCleaned(broadcastId: Long)
-def accumCleaned(accId: Long)
-def checkpointCleaned(rddId: Long)
+def rddCleaned(rddId: Int): Unit
+def shuffleCleaned(shuffleId: Int): Unit
+def broadcastCleaned(broadcastId: Long): Unit
+def accumCleaned(accId: Long): Unit
+def checkpointCleaned(rddId: Long): Unit
}
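
Every hunk in this patch makes the same change: an abstract method declared in Scala's procedure style (no result type, no `=`) gains an explicit `: Unit`. A minimal sketch of why the explicit form is preferred -- hypothetical names, not code from the patch:

    trait Sink {
      def flush()        // procedure-style declaration: result type is Unit, but only implicitly
      def close(): Unit  // explicit form: the contract is visible at a glance
    }

    class BufferedSink extends Sink {
      private var pending = 0

      override def flush(): Unit = { pending = 0 }
      override def close(): Unit = { flush() }

      // The hazard is sharpest on concrete methods: with `=` and no declared
      // type, the result type is inferred from the body, so here it silently
      // becomes Int rather than Unit.
      def drain() = { val n = pending; pending = 0; n }
    }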
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -41,7 +41,7 @@ trait FutureAction[T] extends Future[T] {
/**
* Cancels the execution of this action.
*/
-def cancel()
+def cancel(): Unit

/**
* Blocks until this action completes.
@@ -65,7 +65,7 @@ trait FutureAction[T] extends Future[T] {
* When this action is completed, either through an exception or a value, applies the provided
* function.
*/
-def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
+def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit

/**
* Returns whether the action has already been completed with a value or an exception.
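
For context on the two methods above: a `FutureAction` is the asynchronous handle returned by RDD actions such as `countAsync()`. A small usage sketch, assuming an active `SparkContext` named `sc`:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    val action = sc.parallelize(1 to 100).countAsync() // FutureAction[Long]

    action.onComplete {
      case Success(n) => println(s"counted $n elements")
      case Failure(e) => println(s"job failed: ${e.getMessage}")
    }

    // cancel() aborts the underlying Spark job if it is still running:
    // action.cancel()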
@@ -33,7 +33,8 @@ private[spark] trait AppClientListener {
/** An application death is an unrecoverable failure condition. */
def dead(reason: String): Unit

-def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
+def executorAdded(
+    fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
@@ -32,8 +32,8 @@ trait LeaderElectionAgent {

@DeveloperApi
trait LeaderElectable {
-def electedLeader()
-def revokedLeadership()
+def electedLeader(): Unit
+def revokedLeadership(): Unit
}

/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
@@ -40,12 +40,12 @@ abstract class PersistenceEngine {
* Defines how the object is serialized and persisted. Implementation will
* depend on the store used.
*/
-def persist(name: String, obj: Object)
+def persist(name: String, obj: Object): Unit

/**
* Defines how the object referred to by its name is removed from the store.
*/
-def unpersist(name: String)
+def unpersist(name: String): Unit

/**
* Gives all objects matching a prefix. This defines how objects are
@@ -218,7 +218,7 @@ private[deploy] class DriverRunner(
}

private[deploy] trait Sleeper {
-def sleep(seconds: Int)
+def sleep(seconds: Int): Unit
}

// Needed because ProcessBuilder is a final class and cannot be mocked
@@ -25,6 +25,6 @@ import org.apache.spark.TaskState.TaskState
* A pluggable interface used by the Executor to send updates to the cluster scheduler.
*/
private[spark] trait ExecutorBackend {
-def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
}

@@ -36,7 +36,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
* local blocks or put local blocks.
*/
-def init(blockDataManager: BlockDataManager)
+def init(blockDataManager: BlockDataManager): Unit

/**
* Tear down the transfer service.
@@ -23,6 +23,6 @@ package org.apache.spark.scheduler
* job fails (and no further taskSucceeded events will happen).
*/
private[spark] trait JobListener {
-def taskSucceeded(index: Int, result: Any)
-def jobFailed(exception: Exception)
+def taskSucceeded(index: Int, result: Any): Unit
+def jobFailed(exception: Exception): Unit
}
@@ -34,9 +34,9 @@ import org.apache.spark.util.Utils
private[spark] trait SchedulableBuilder {
def rootPool: Pool

-def buildPools()
+def buildPools(): Unit

-def addTaskSetManager(manager: Schedulable, properties: Properties)
+def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
@@ -51,7 +51,7 @@ private[spark] trait TaskScheduler {
def submitTasks(taskSet: TaskSet): Unit

// Cancel a stage.
-def cancelTasks(stageId: Int, interruptThread: Boolean)
+def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
@@ -357,7 +357,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
* serialization.
*/
trait KryoRegistrator {
-def registerClasses(kryo: Kryo)
+def registerClasses(kryo: Kryo): Unit
}

private[serializer] object KryoSerializer {
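
`KryoRegistrator` is the public hook for registering application classes with Kryo. A typical implementation and the configuration that wires it in; `Point` and `PointRegistrator` are hypothetical names:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    case class Point(x: Double, y: Double)

    class PointRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[Point])
      }
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[PointRegistrator].getName)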
@@ -35,7 +35,7 @@ private[spark] trait ShuffleWriterGroup {
val writers: Array[DiskBlockObjectWriter]

/** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
-def releaseWriters(success: Boolean)
+def releaseWriters(success: Boolean): Unit
}

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -129,7 +129,7 @@ private[spark] abstract class WebUI(
}

/** Initialize all components of the server. */
-def initialize()
+def initialize(): Unit

/** Bind to the HTTP server behind this web interface. */
def bind() {
@@ -32,10 +32,10 @@ private[spark] trait RollingPolicy {
def shouldRollover(bytesToBeWritten: Long): Boolean

/** Notify that rollover has occurred */
-def rolledOver()
+def rolledOver(): Unit

/** Notify that bytes have been written */
-def bytesWritten(bytes: Long)
+def bytesWritten(bytes: Long): Unit

/** Get the desired name of the rollover file */
def generateRolledOverFileSuffix(): String
@@ -26,5 +26,5 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
trait Pseudorandom {
/** Set random seed. */
-def setSeed(seed: Long)
+def setSeed(seed: Long): Unit
}
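
`Pseudorandom` is a single-method `@DeveloperApi` trait, so any component with resettable randomness can implement it. A trivial sketch with a hypothetical class:

    import java.util.Random
    import org.apache.spark.util.random.Pseudorandom

    class CoinFlipper extends Pseudorandom {
      private val rng = new Random()
      override def setSeed(seed: Long): Unit = rng.setSeed(seed)
      def flip(): Boolean = rng.nextBoolean()
    }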
@@ -62,7 +62,7 @@ import org.apache.spark.sql.types._
abstract class MutableValue extends Serializable {
var isNull: Boolean = true
def boxed: Any
-def update(v: Any)
+def update(v: Any): Unit
def copy(): MutableValue
}

@@ -164,7 +164,7 @@ trait BaseGenericInternalRow extends InternalRow {
abstract class MutableRow extends InternalRow {
def setNullAt(i: Int): Unit

-def update(i: Int, value: Any)
+def update(i: Int, value: Any): Unit

// default implementation (slow)
def setBoolean(i: Int, value: Boolean): Unit = { update(i, value) }
@@ -38,7 +38,7 @@ private[columnar] trait ColumnAccessor {

def hasNext: Boolean

-def extractTo(row: MutableRow, ordinal: Int)
+def extractTo(row: MutableRow, ordinal: Int): Unit

protected def underlyingBuffer: ByteBuffer
}
@@ -28,12 +28,12 @@ private[columnar] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
-def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
+def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false): Unit

/**
* Appends `row(ordinal)` to the column builder.
*/
-def appendFrom(row: InternalRow, ordinal: Int)
+def appendFrom(row: InternalRow, ordinal: Int): Unit

/**
* Column statistics information
@@ -50,7 +50,7 @@ trait StateStore {
def get(key: UnsafeRow): Option[UnsafeRow]

/** Put a new value for a key. */
-def put(key: UnsafeRow, value: UnsafeRow)
+def put(key: UnsafeRow, value: UnsafeRow): Unit

/**
* Remove keys that match the following condition.
@@ -37,7 +37,7 @@ abstract class ContinuousQueryListener {
* `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
don't block in this method, as doing so will block your query.
*/
-def onQueryStarted(queryStarted: QueryStarted)
+def onQueryStarted(queryStarted: QueryStarted): Unit

/**
* Called when there is some status update (ingestion rate updated, etc.)
@@ -47,10 +47,10 @@ abstract class ContinuousQueryListener {
* may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]]
* is terminated when you are processing [[QueryProgress]].
*/
-def onQueryProgress(queryProgress: QueryProgress)
+def onQueryProgress(queryProgress: QueryProgress): Unit

/** Called when a query is stopped, with or without error */
-def onQueryTerminated(queryTerminated: QueryTerminated)
+def onQueryTerminated(queryTerminated: QueryTerminated): Unit
}


@@ -107,8 +107,8 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
}

/** Method called to start receiving data. Subclasses must implement this method. */
-def start()
+def start(): Unit

/** Method called to stop receiving data. Subclasses must implement this method. */
-def stop()
+def stop(): Unit
}
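
A concrete input stream implements `start()` and `stop()` alongside `compute`. A minimal sketch that emits the same values every batch; there is no external connection, so both lifecycle methods are no-ops (class and parameter names are hypothetical):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream

    class StaticInputDStream(ssc: StreamingContext, values: Seq[Int])
      extends InputDStream[Int](ssc) {

      override def start(): Unit = {} // nothing to set up
      override def stop(): Unit = {}  // nothing to tear down

      // Called once per batch interval; every batch sees the same small RDD.
      override def compute(validTime: Time): Option[RDD[Int]] =
        Some(ssc.sparkContext.parallelize(values))
    }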
@@ -37,7 +37,7 @@ private[streaming] trait BlockGeneratorListener {
* that will be useful when a block is generated. Any long blocking operation in this callback
* will hurt the throughput.
*/
-def onAddData(data: Any, metadata: Any)
+def onAddData(data: Any, metadata: Any): Unit

/**
* Called when a new block of data is generated by the block generator. The block generation
@@ -47,21 +47,21 @@ private[streaming] trait BlockGeneratorListener {
* be useful when the block has been successfully stored. Any long blocking operation in this
* callback will hurt the throughput.
*/
-def onGenerateBlock(blockId: StreamBlockId)
+def onGenerateBlock(blockId: StreamBlockId): Unit

/**
* Called when a new block is ready to be pushed. Callers are supposed to store the block into
* Spark in this method. Internally this is called from a single
* thread that is not synchronized with any other callbacks. Hence it is okay to do a long
* blocking operation in this callback.
*/
-def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit

/**
* Called when an error has occurred in the BlockGenerator. Can be called from many places,
* so it is better not to do any long blocking operation in this callback.
*/
-def onError(message: String, throwable: Throwable)
+def onError(message: String, throwable: Throwable): Unit
}

@@ -48,7 +48,7 @@ private[streaming] trait ReceivedBlockHandler {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult

/** Clean up blocks older than the given threshold time */
-def cleanupOldBlocks(threshTime: Long)
+def cleanupOldBlocks(threshTime: Long): Unit
}


@@ -99,13 +99,13 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
* (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
* immediately, and then `onStart()` after a delay.
*/
-def onStart()
+def onStart(): Unit

/**
* This method is called by the system when the receiver is stopped. All resources
* (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
*/
-def onStop()
+def onStop(): Unit

/** Override this to specify a preferred location (hostname). */
def preferredLocation: Option[String] = None
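
The `onStart()`/`onStop()` contract documented above is the heart of the custom receiver API: `onStart()` must return immediately, so the receiving loop runs on a thread the receiver owns and exits once `isStopped()` turns true. A minimal sketch with a hypothetical receiver that stores a constant record:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class ConstantReceiver(record: String)
      extends Receiver[String](StorageLevel.MEMORY_ONLY) {

      override def onStart(): Unit = {
        // Must not block: start a worker thread and return.
        new Thread("constant-receiver") {
          override def run(): Unit = {
            while (!isStopped()) {
              store(record)     // hand one record to Spark
              Thread.sleep(1000)
            }
          }
        }.start()
      }

      override def onStop(): Unit = {
        // The loop above observes isStopped() and exits; nothing else to clean up.
      }
    }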
@@ -70,28 +70,28 @@ private[streaming] abstract class ReceiverSupervisor(
@volatile private[streaming] var receiverState = Initialized

/** Push a single data item to backend data store. */
-def pushSingle(data: Any)
+def pushSingle(data: Any): Unit

/** Store the bytes of received data as a data block into Spark's memory. */
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
-)
+): Unit

/** Store an iterator of received data as a data block into Spark's memory. */
def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
-)
+): Unit

/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
-)
+): Unit

/**
* Create a custom [[BlockGenerator]] that the receiver implementation can directly control
@@ -103,7 +103,7 @@ private[streaming] abstract class ReceiverSupervisor(
def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator

/** Report errors. */
-def reportError(message: String, throwable: Throwable)
+def reportError(message: String, throwable: Throwable): Unit

/**
* Called when supervisor is started.