@@ -29,7 +29,7 @@ import org.apache.spark.sql.streaming.Trigger
 class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest {
   import testImplicits._
 
-  override val streamingTimeout = 60.seconds
+  override val streamingTimeout = 90.seconds
 
   test("read Kafka transactional messages: read_committed") {
     val table = "kafka_continuous_source_test"
@@ -2270,12 +2270,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
   // 1 executor with 4 GPUS
   Seq(true, false).foreach { barrierMode =>
     val barrier = if (barrierMode) "barrier" else ""
-    scala.util.Random.shuffle((1 to 20).toList).take(5).foreach { taskNum =>
+    scala.util.Random.shuffle((1 to 12).toList).take(3).foreach { taskNum =>
       val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
       test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " +
         s"restrict $taskNum $barrier tasks run in the same executor") {
         val taskCpus = 1
-        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+        val executorCpus = 50 // cpu will not limit the concurrent tasks number
         val executorGpus = 1
 
         val taskScheduler = setupScheduler(numCores = executorCpus,
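For context on these generated tests: gpuTaskAmount is one whole GPU divided by taskNum, so an executor advertising a single GPU can admit exactly taskNum such tasks at a time, and the reduced executorCpus = 50 still leaves CPU non-binding. Below is a minimal standalone sketch of that arithmetic; ONE_WHOLE and the helper names are illustrative assumptions, not Spark's actual ResourceAmountUtils API.

object FractionalGpuSketch {
  // Assumption: one whole GPU is tracked as a large integer so fractional amounts stay exact.
  val ONE_WHOLE: Long = 10000000000000000L  // 10^16, illustrative stand-in

  // Fraction of a GPU each task requests when taskNum tasks should share one GPU.
  def gpuAmountPerTask(taskNum: Int): Double =
    (ONE_WHOLE / taskNum).toDouble / ONE_WHOLE

  // How many such tasks fit on an executor advertising executorGpus whole GPUs.
  def maxConcurrentTasks(executorGpus: Int, taskNum: Int): Long =
    executorGpus.toLong * ONE_WHOLE / (ONE_WHOLE / taskNum)

  def main(args: Array[String]): Unit = {
    for (taskNum <- 1 to 12) {
      val amount = gpuAmountPerTask(taskNum)
      val limit = maxConcurrentTasks(executorGpus = 1, taskNum = taskNum)
      // e.g. taskNum = 3 gives amount of roughly 0.333333 and limit = 3, far below 50 cores
      println(f"taskNum=$taskNum%2d gpu.amount=$amount%.6f concurrent-limit=$limit")
    }
  }
}

Under that reading, shrinking the sampling from take(5) over 1 to 20 down to take(3) over 1 to 12 only reduces how many taskNum values are generated per run, not what each individual test asserts.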
@@ -2317,12 +2317,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
   // 4 executors, each of which has 1 GPU
   Seq(true, false).foreach { barrierMode =>
     val barrier = if (barrierMode) "barrier" else ""
-    scala.util.Random.shuffle((1 to 20).toList).take(5).foreach { taskNum =>
+    scala.util.Random.shuffle((1 to 12).toList).take(3).foreach { taskNum =>
       val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
       test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " +
         s"restrict $taskNum $barrier tasks run on the different executor") {
         val taskCpus = 1
-        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+        val executorCpus = 50 // cpu will not limit the concurrent tasks number
         val executorGpus = 1
 
         val taskScheduler = setupScheduler(numCores = executorCpus,
@@ -2374,11 +2374,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
   // 1 executor with 4 GPUS
   Seq(true, false).foreach { barrierMode =>
     val barrier = if (barrierMode) "barrier" else ""
-    scala.util.Random.shuffle((1 to 20).toList).take(5).foreach { taskNum =>
+    scala.util.Random.shuffle((1 to 12).toList).take(3).foreach { taskNum =>
      val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
       test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " +
         s"restrict $taskNum $barrier tasks run in the same executor") {
-        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+        val executorCpus = 50 // cpu will not limit the concurrent tasks number
 
         val taskScheduler = setupScheduler(numCores = executorCpus,
           config.CPUS_PER_TASK.key -> "1",
@@ -2423,11 +2423,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
   // 4 executors, each of which has 1 GPU
   Seq(true, false).foreach { barrierMode =>
     val barrier = if (barrierMode) "barrier" else ""
-    scala.util.Random.shuffle((1 to 20).toList).take(5).foreach { taskNum =>
+    scala.util.Random.shuffle((1 to 12).toList).take(3).foreach { taskNum =>
       val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
       test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " +
         s"restrict $taskNum $barrier tasks run on the different executor") {
-        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+        val executorCpus = 50 // cpu will not limit the concurrent tasks number
 
         val taskScheduler = setupScheduler(numCores = executorCpus,
           config.CPUS_PER_TASK.key -> "1",
@@ -2489,7 +2489,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskCpus = 1
     val taskGpus = 0.3
     val executorGpus = 4
-    val executorCpus = 100
+    val executorCpus = 50
 
     // each tasks require 0.3 gpu
     val taskScheduler = setupScheduler(numCores = executorCpus,
@@ -2545,7 +2545,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     val taskCpus = 1
     val taskGpus = 0.3
     val executorGpus = 4
-    val executorCpus = 1000
+    val executorCpus = 50
 
     // each tasks require 0.3 gpu
     val taskScheduler = setupScheduler(numCores = executorCpus,
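The two hunks above only lower executorCpus (100 and 1000 down to 50); the 0.3-GPU request per task is what actually bounds concurrency. As a rough, hedged estimate, assuming a fractional request must be satisfied from a single physical GPU, each 1.0 GPU holds at most floor(1.0 / 0.3) = 3 such tasks, so 4 GPUs top out around 12 concurrent tasks, comfortably below 50 cores at 1 CPU per task. A small standalone sketch of that estimate follows (names are illustrative, not the suite's helpers).

object GpuConcurrencyEstimate {
  // Assumption: a 0.3-GPU request is packed onto one physical GPU, never split across two.
  def tasksPerGpu(taskGpus: Double): Int = math.floor(1.0 / taskGpus).toInt

  def gpuBoundedSlots(numGpus: Int, taskGpus: Double): Int =
    numGpus * tasksPerGpu(taskGpus)

  def main(args: Array[String]): Unit = {
    // 4 GPUs * 3 tasks per GPU = 12 concurrent tasks, so executorCpus = 50
    // (with 1 CPU per task) never becomes the limiting factor, as the inline comments claim.
    println(s"GPU-bounded slots: ${gpuBoundedSlots(numGpus = 4, taskGpus = 0.3)}")
  }
}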
@@ -2554,27 +2554,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
       EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
       config.EXECUTOR_CORES.key -> executorCpus.toString
     )
-    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+    val lowerTaskSet = FakeTask.createTaskSet(30, 1, 0, 1,
       ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     // each task require 0.7 gpu
     val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
     val rp = new TaskResourceProfile(treqs.requests)
     taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
 
-    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0,
+    val higherRpTaskSet = FakeTask.createTaskSet(50, stageId = 2, stageAttemptId = 0,
       priority = 0, rpId = rp.id)
 
     val workerOffers =
       IndexedSeq(
         // cpu won't be a problem
-        WorkerOffer("executor0", "host0", 1000, None, new ExecutorResourcesAmounts(
+        WorkerOffer("executor0", "host0", 50, None, new ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("0" -> 1.0))))),
-        WorkerOffer("executor1", "host1", 1000, None, new ExecutorResourcesAmounts(
+        WorkerOffer("executor1", "host1", 50, None, new ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("1" -> 1.0))))),
-        WorkerOffer("executor2", "host2", 1000, None, new ExecutorResourcesAmounts(
+        WorkerOffer("executor2", "host2", 50, None, new ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("2" -> 1.0))))),
-        WorkerOffer("executor3", "host3", 1000, None, new ExecutorResourcesAmounts(
+        WorkerOffer("executor3", "host3", 50, None, new ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("3" -> 1.0)))))
       )
 
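In this last test the two task sets compete for four single-GPU worker offers: the default-profile tasks presumably pick up the 0.3-GPU conf set above (part of the setup is collapsed in this view), while the TaskResourceProfile tasks request 0.7 GPU each, so a 0.7 task and a 0.3 task can just co-exist on one GPU (0.7 + 0.3 = 1.0) but a second 0.7 task cannot. Cutting the task counts (100 and 1000 down to 30 and 50) and the per-offer cores (1000 down to 50) preserves that property while shrinking the test. Below is a hedged sketch of the packing arithmetic; the class and method names are illustrative, not the suite's helpers.

object MixedProfilePacking {
  // Assumption: requests are packed per GPU and each GPU's capacity is exactly 1.0.
  final class Gpu {
    var free: Double = 1.0
    def tryAssign(amount: Double): Boolean =
      if (free + 1e-9 >= amount) { free -= amount; true } else false
  }

  def main(args: Array[String]): Unit = {
    val gpus = Seq.fill(4)(new Gpu)  // 4 executors x 1 GPU, mirroring the worker offers above

    // Greedily admit the 0.7-GPU (TaskResourceProfile) tasks first, then the 0.3-GPU ones;
    // this ordering is part of the sketch, not a claim about the scheduler's actual order.
    val admittedHigh = (1 to 50).count(_ => gpus.exists(_.tryAssign(0.7)))
    val admittedLow = (1 to 30).count(_ => gpus.exists(_.tryAssign(0.3)))

    // Every GPU ends up holding one 0.7 task and one 0.3 task in the first scheduling wave.
    println(s"first wave: high=$admittedHigh low=$admittedLow")  // prints high=4 low=4
  }
}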