fix(scheduler): add fair scheduler and a new parallel mode to metorikku
lyogev committed May 4, 2020
1 parent 83faf0d commit 259c941
Showing 9 changed files with 52 additions and 11 deletions.
5 changes: 4 additions & 1 deletion build.sbt
@@ -28,6 +28,9 @@ lazy val excludeAvro = ExclusionRule(organization = "org.apache.avro", name = "a
 lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark")
 lazy val excludeFasterXML = ExclusionRule(organization = "com.fasterxml.jackson.module", name= "jackson-module-scala_2.12")
 lazy val excludeMetricsCore = ExclusionRule(organization = "io.dropwizard.metrics", name= "metrics-core")
+lazy val excludeLog4j = ExclusionRule(organization = "org.apache.logging.log4j")
+lazy val excludeParquet = ExclusionRule(organization = "org.apache.parquet")
+
 
 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
@@ -65,7 +68,7 @@ libraryDependencies ++= Seq(
   "org.apache.hudi" %% "hudi-spark-bundle" % "0.5.2-incubating" % "provided" excludeAll excludeFasterXML,
   "org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
   "org.apache.avro" % "avro" % "1.8.2" % "provided",
-  "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeNetty, excludeNettyAll)
+  "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll(excludeLog4j, excludeParquet, excludeNetty, excludeNettyAll)
 )
 
 // Temporary fix for https://github.com/databricks/spark-redshift/issues/315#issuecomment-285294306
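The two new rules keep hive-jdbc from pulling its own Log4j 2 and Parquet artifacts onto the classpath, presumably because they clash with the versions Spark already provides. A hedged sbt sketch of the pattern (the rule name here is illustrative, not from this commit):

// An ExclusionRule strips a transitive subtree from a single artifact,
// unlike sbt's excludeDependencies key, which applies project-wide.
lazy val excludeExampleLog4j = ExclusionRule(organization = "org.apache.logging.log4j")

libraryDependencies += "org.apache.hive" % "hive-jdbc" % "2.3.3" % "provided" excludeAll excludeExampleLog4j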
1 change: 1 addition & 0 deletions docker/spark/Dockerfile
@@ -61,6 +61,7 @@ RUN rm -f /spark/jars/guava-*.jar
 ADD log4j.json.properties /spark/conf/
 ADD spark-defaults.conf /spark/conf/
 ADD spark-env.sh /spark/conf/
+ADD fairscheduler.xml /spark/conf/
 
 ENV PYTHONHASHSEED 1
 ENV SPARK_HOME /spark
7 changes: 7 additions & 0 deletions docker/spark/fairscheduler.xml
@@ -0,0 +1,7 @@
+<allocations>
+  <pool name="default">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>1</weight>
+    <minShare>2</minShare>
+  </pool>
+</allocations>
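Spark reads an allocation file only under FAIR scheduling; it looks for fairscheduler.xml on the classpath (which is why the Dockerfile above ships it into /spark/conf, a directory on Spark's classpath) or at the path given by spark.scheduler.allocation.file. Within a pool, weight sets the relative share of the cluster and minShare a floor of CPU cores granted before the remainder is balanced. A minimal Scala sketch of steering jobs into the pool; the app name is illustrative, and jobs land in the "default" pool even without the explicit assignment:

import org.apache.spark.sql.SparkSession

object FairPoolSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fair-pool-sketch")
      .config("spark.scheduler.mode", "FAIR")
      .config("spark.scheduler.allocation.file", "/spark/conf/fairscheduler.xml")
      .getOrCreate()

    // Pool assignment is per-thread: jobs submitted from this thread
    // are scheduled under the "default" pool declared above.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "default")
    spark.range(1000000L).count()
    spark.stop()
  }
}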
6 changes: 4 additions & 2 deletions docker/spark/scripts/entrypoint-submit.sh
@@ -7,6 +7,7 @@ SPARK_MASTER_HOST=${SPARK_MASTER_HOST:=spark-master}
 MAX_RETRIES=${MAX_RETRIES:=300}
 MIN_WORKERS=${MIN_WORKERS:=1}
 SPARK_UI_PORT=${SPARK_UI_PORT:=4040}
+SHUFFLE_SERVICE_PORT=${SHUFFLE_SERVICE_PORT:=7337}
 POST_SCRIPT=${POST_SCRIPT:=/scripts/finish-submit.sh}
 
 # Atlas
@@ -36,8 +37,9 @@ fi
 # Run command
 SPARK_MASTER="spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}"
 echo -e "
-spark.master $SPARK_MASTER
-spark.ui.port $SPARK_UI_PORT
+spark.master=$SPARK_MASTER
+spark.ui.port=$SPARK_UI_PORT
+spark.shuffle.service.port=$SHUFFLE_SERVICE_PORT
 " >> /spark/conf/spark-defaults.conf
 
 if [[ ! -z ${HIVE_METASTORE_URI} ]]; then
9 changes: 7 additions & 2 deletions docker/spark/scripts/entrypoint-worker.sh
@@ -5,16 +5,21 @@ SPARK_MASTER_PORT=${SPARK_MASTER_PORT:=7077}
 SPARK_WORKER_WEBUI_PORT=${SPARK_WORKER_WEBUI_PORT:=8081}
 SPARK_MASTER_HOST=${SPARK_MASTER_HOST:=spark-master}
 SPARK_WORKER_PORT=${SPARK_WORKER_PORT:=7078}
-SPARK_WORKER_OPTS=${SPARK_WORKER_OPTS:="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=1800"}
+SHUFFLE_SERVICE_PORT=${SHUFFLE_SERVICE_PORT:=7337}
+SPARK_WORKER_OPTS=${SPARK_WORKER_OPTS:="-Dspark.shuffle.service.enabled=true -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=1800"}
 
 # Logs
 /scripts/init-logs-metrics.sh
 
 # Monitor Logs
 /scripts/monitor-executor-logs.sh &
 
+echo -e "
+spark.shuffle.service.port=$SHUFFLE_SERVICE_PORT
+" >> /spark/conf/spark-defaults.conf
+
 . "/spark/sbin/spark-config.sh"
 . "/spark/bin/load-spark-env.sh"
 SPARK_MASTER="spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
 /spark/sbin/../bin/spark-class org.apache.spark.deploy.worker.Worker \
-  --webui-port ${SPARK_WORKER_WEBUI_PORT} --port ${SPARK_WORKER_PORT} ${SPARK_MASTER}
+  --webui-port ${SPARK_WORKER_WEBUI_PORT} --port ${SPARK_WORKER_PORT} --properties-file /spark/conf/spark-defaults.conf ${SPARK_MASTER}
5 changes: 4 additions & 1 deletion docker/spark/spark-defaults.conf
@@ -20,4 +20,7 @@ spark.serializer=org.apache.spark.serializer.KryoSerializer
 spark.sql.hive.convertMetastoreParquet=false
 spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true
 spark.driver.extraClassPath=/spark/pre-jars/*
-spark.executor.extraClassPath=/spark/pre-jars/*
+spark.executor.extraClassPath=/spark/pre-jars/*
+spark.shuffle.service.enabled=true
+spark.dynamicAllocation.enabled=true
+spark.scheduler.mode=FAIR
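These three settings act as a unit: dynamic allocation grows and shrinks an application's executor count with load, but on a standalone cluster it requires the external shuffle service (started by the worker entrypoint above, listening on SHUFFLE_SERVICE_PORT) so shuffle data outlives removed executors, and FAIR mode keeps one long job from starving the rest. A hedged sketch of bounding the allocation per application; the numbers are illustrative, not defaults from this repo:

import org.apache.spark.sql.SparkSession

// Illustrative overrides only; metorikku itself inherits the
// cluster-wide defaults written above.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-sketch")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")               // prerequisite on standalone
  .config("spark.dynamicAllocation.minExecutors", "1")           // never scale below one executor
  .config("spark.dynamicAllocation.maxExecutors", "10")          // cap the scale-out
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")  // release idle executors
  .getOrCreate()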
27 changes: 23 additions & 4 deletions src/main/scala/com/yotpo/metorikku/Metorikku.scala
@@ -30,12 +30,31 @@ object Metorikku extends App {
     ex.scheduleAtFixedRate(task, initialDelay, periodic.getTriggerDurationInSeconds(), TimeUnit.SECONDS)
   }
 
-  def runMetrics(job: Job): Unit = {
-    job.config.metrics match {
-      case Some(metrics) => metrics.foreach(metricSetPath => {
+  def runMetricsInParallel(job: Job, metrics: Seq[String]): Unit = {
+    val threads = metrics.map(metricSetPath => new Thread(new Runnable {
+      def run() {
         val metricSet = new MetricSet(metricSetPath)
         metricSet.run(job)
-      })
+      }
+    })).toList
+
+    threads.foreach(t => t.start())
+    threads.foreach(t => t.join())
+  }
+
+  def runMetrics(job: Job): Unit = {
+    job.config.metrics match {
+      case Some(metrics) => {
+        session.config.parallel match {
+          case Some(true) => runMetricsInParallel(job, metrics)
+          case _ => {
+            metrics.foreach(metricSetPath => {
+              val metricSet = new MetricSet(metricSetPath)
+              metricSet.run(job)
+            })
+          }
+        }
+      }
       case None => log.warn("No metrics were defined, exiting")
     }
   }
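The new mode spawns one JVM thread per metric set, starts them all, and joins them all; together with spark.scheduler.mode=FAIR, the resulting Spark jobs share executors instead of queuing FIFO behind one another. It is switched on by the new parallel flag in the job configuration (see the Configuration change below). A standalone sketch of the same pattern, with illustrative object and job names:

import org.apache.spark.sql.SparkSession

object ParallelJobsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parallel-jobs-sketch")
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()

    // One thread per unit of work, all sharing a single SparkSession,
    // which is thread-safe for concurrent job submission.
    val threads = Seq("job-a", "job-b").map { name =>
      new Thread(new Runnable {
        def run(): Unit = {
          spark.sparkContext.setJobGroup(name, s"work for $name")
          spark.range(10000000L).selectExpr("sum(id)").collect()
        }
      })
    }

    threads.foreach(_.start())
    threads.foreach(_.join())  // start-all-then-join-all, as in runMetricsInParallel
    spark.stop()
  }
}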
1 change: 1 addition & 0 deletions src/main/scala/com/yotpo/metorikku/configuration/job/Configuration.scala
@@ -13,6 +13,7 @@ case class Configuration(metrics: Option[Seq[String]],
                          showQuery: Option[Boolean],
                          streaming: Option[Streaming],
                          periodic: Option[Periodic],
+                         parallel: Option[Boolean],
                          var logLevel: Option[String],
                          var showPreviewLines: Option[Int],
                          var explain: Option[Boolean],
2 changes: 1 addition & 1 deletion src/main/scala/com/yotpo/metorikku/test/Tester.scala
@@ -46,7 +46,7 @@ case class Tester(config: TesterConfig) {
     val variables = params.variables
     val inputs = getMockFilesFromDir(config.test.mocks, config.basePath)
     Configuration(Option(metrics), inputs, variables, None, None, None, None, None,
-      Option(config.preview > 0), None, None, None, Option(config.preview), None, None, None, None)
+      Option(config.preview > 0), None, None, None, None, Option(config.preview), None, None, None, None)
   }
 
   private def getMockFilesFromDir(mocks: Option[List[Mock]], testDir: File): Option[Map[String, Input]] = {
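Tester builds Configuration positionally, so every field added to the case class forces another None into this call. A hypothetical illustration (not metorikku's actual class) of how named arguments with defaults avoid that churn:

// Hypothetical case class for illustration: with defaults, callers name
// only the fields they care about, and adding a field does not shift
// every existing call site.
case class JobConf(
  metrics: Option[Seq[String]] = None,
  parallel: Option[Boolean] = None,
  showPreviewLines: Option[Int] = None
)

val conf = JobConf(metrics = Some(Seq("metric.yaml")), showPreviewLines = Some(5))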
