Skip to content

Commit 9fd0da7

Browse files
SPARK-1729. Use foreach instead of map for all Options.
1 parent 8136aa6 commit 9fd0da7

File tree

4 files changed

+17
-13
lines changed

4 files changed

+17
-13
lines changed

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,16 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
5959
val sequenceNumber = seqBase + seqCounter.incrementAndGet()
6060
val processor = new TransactionProcessor(channel, sequenceNumber,
6161
n, transactionTimeout, backOffInterval, this)
62-
transactionExecutorOpt.map(executor => {
62+
transactionExecutorOpt.foreach(executor => {
6363
executor.submit(processor)
6464
})
65-
processorMap.put(sequenceNumber, processor)
66-
// Wait until a batch is available - will be an error if
67-
processor.getEventBatch
65+
// Wait until a batch is available - will be an error if error message is non-empty
66+
val batch = processor.getEventBatch
67+
if (batch.getErrorMsg != null && !batch.getErrorMsg.equals("")) {
68+
processorMap.put(sequenceNumber, processor)
69+
}
70+
71+
batch
6872
}
6973

7074
/**
@@ -93,7 +97,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
9397
* @param success Whether the batch was successful or not.
9498
*/
9599
private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
96-
Option(removeAndGetProcessor(sequenceNumber)).map(processor => {
100+
Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
97101
processor.batchProcessed(success)
98102
})
99103
}
@@ -112,7 +116,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
112116
* Shuts down the executor used to process transactions.
113117
*/
114118
def shutdown() {
115-
transactionExecutorOpt.map(executor => {
119+
transactionExecutorOpt.foreach(executor => {
116120
executor.shutdownNow()
117121
})
118122
}

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class SparkSink extends AbstractSink with Configurable {
8484
// dependencies which are being excluded in the build. In practice,
8585
// Netty dependencies are already available on the JVM as Flume would have pulled them in.
8686
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
87-
serverOpt.map(server => {
87+
serverOpt.foreach(server => {
8888
LOG.info("Starting Avro server for sink: " + getName)
8989
server.start()
9090
})
@@ -93,10 +93,10 @@ class SparkSink extends AbstractSink with Configurable {
9393

9494
override def stop() {
9595
LOG.info("Stopping Spark Sink: " + getName)
96-
handler.map(callbackHandler => {
96+
handler.foreach(callbackHandler => {
9797
callbackHandler.shutdown()
9898
})
99-
serverOpt.map(server => {
99+
serverOpt.foreach(server => {
100100
LOG.info("Stopping Avro Server for sink: " + getName)
101101
server.close()
102102
server.join()

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
106106
eventBatch.setErrorMsg("Something went wrong. Channel was " +
107107
"unable to create a transaction!")
108108
}
109-
txOpt.map(tx => {
109+
txOpt.foreach(tx => {
110110
tx.begin()
111111
val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
112112
val loop = new Breaks
@@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
145145
LOG.error("Error while processing transaction.", e)
146146
eventBatch.setErrorMsg(e.getMessage)
147147
try {
148-
txOpt.map(tx => {
148+
txOpt.foreach(tx => {
149149
rollbackAndClose(tx, close = true)
150150
})
151151
} finally {
@@ -163,7 +163,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
163163
*/
164164
private def processAckOrNack() {
165165
batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
166-
txOpt.map(tx => {
166+
txOpt.foreach(tx => {
167167
if (batchSuccess) {
168168
try {
169169
tx.commit()

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private[streaming] class FlumePollingReceiver(
151151
override def onStop(): Unit = {
152152
logInfo("Shutting down Flume Polling Receiver")
153153
receiverExecutor.shutdownNow()
154-
connections.map(connection => {
154+
connections.foreach(connection => {
155155
connection.transceiver.close()
156156
})
157157
channelFactory.releaseExternalResources()

0 commit comments

Comments
 (0)