
Commit a4a5212

[streaming] SPARK-10955. Disable dynamic allocation for Streaming applications.
Dynamic allocation can be painful for streaming apps and can lose data. The one drawback is that apps which run streaming and non-streaming jobs from the same context will also end up unable to use dynamic allocation.
1 parent e978360 commit a4a5212

File tree

1 file changed: 10 additions, 1 deletion


streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 10 additions & 1 deletion

@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}
+import org.apache.spark.util.{Utils, CallSite, ShutdownHookManager, ThreadUtils}

 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -564,6 +564,15 @@ class StreamingContext private[streaming] (
       )
     }
   }
+
+  if (Utils.isDynamicAllocationEnabled(sc.conf)) {
+    val maxExecutors = sc.conf.getInt("spark.dynamicAllocation.maxExecutors", 2)
+    sc.conf.set("spark.dynamicAllocation.enabled", false.toString)
+    sc.conf.set("spark.executor.instances", maxExecutors.toString)
+    logWarning("Dynamic allocation is not supported with Spark Streaming currently, since it " +
+      s"could lead to data loss in some cases. " +
+      s"The number of executors is being set to $maxExecutors")
+  }
 }

 /**
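The guard above rewrites the SparkConf before the streaming job runs: if dynamic allocation is on, it pins the executor count to `spark.dynamicAllocation.maxExecutors` and then disables dynamic allocation. A minimal, Spark-free sketch of the same logic follows; it models SparkConf as a mutable map so it runs without a Spark dependency, and `Guard`/`disableDynamicAllocation` are illustrative names, not Spark API:

```scala
import scala.collection.mutable

// Sketch of the guard this commit adds to StreamingContext: when
// dynamic allocation is enabled, pin the executor count to
// maxExecutors (defaulting to 2, as in the diff) and turn dynamic
// allocation off. SparkConf is modelled as a mutable Map[String, String].
object Guard {
  def disableDynamicAllocation(conf: mutable.Map[String, String]): Unit = {
    if (conf.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean) {
      val maxExecutors =
        conf.getOrElse("spark.dynamicAllocation.maxExecutors", "2").toInt
      conf("spark.dynamicAllocation.enabled") = "false"
      conf("spark.executor.instances") = maxExecutors.toString
      println("Dynamic allocation is not supported with Spark Streaming; " +
        s"setting the number of executors to $maxExecutors")
    }
  }
}

object GuardDemo {
  def main(args: Array[String]): Unit = {
    val conf = mutable.Map(
      "spark.dynamicAllocation.enabled" -> "true",
      "spark.dynamicAllocation.maxExecutors" -> "8")
    Guard.disableDynamicAllocation(conf)
    println(conf("spark.executor.instances")) // prints 8
  }
}
```

Note that a fixed `spark.executor.instances` is what makes the behavior safe: receivers are long-lived tasks, so removing an executor that hosts one can drop buffered, unreplicated blocks.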
