feat(scheduler): add fair scheduler and a new parallel mode #310

Open · wants to merge 3 commits into base: master
2 changes: 1 addition & 1 deletion build.sbt
@@ -63,7 +63,7 @@ libraryDependencies ++= Seq(
"org.apache.commons" % "commons-text" % "1.6",
"org.influxdb" % "influxdb-java" % "2.14",
"org.apache.kafka" %% "kafka" % "2.2.0" % "provided",
"za.co.absa" % "abris_2.11" % "3.1.1" % "provided" excludeAll(excludeAvro, excludeSpark),
"za.co.absa" % "abris_2.11" % "3.2.0" % "provided" excludeAll(excludeAvro, excludeSpark),
"org.apache.hudi" %% "hudi-spark-bundle" % "0.5.2-incubating" % "provided" excludeAll excludeFasterXML,
"org.apache.parquet" % "parquet-avro" % "1.10.1" % "provided",
"org.apache.avro" % "avro" % "1.8.2" % "provided",
1 change: 1 addition & 0 deletions docker/spark/Dockerfile
@@ -57,6 +57,7 @@ RUN rm -f /spark/jars/httpclient-*.jar && wget -q https://repo1.maven.org/maven2
ADD log4j.json.properties /spark/conf/
ADD spark-defaults.conf /spark/conf/
ADD spark-env.sh /spark/conf/
ADD fairscheduler.xml /spark/conf/

ENV PYTHONHASHSEED 1
ENV SPARK_HOME /spark
7 changes: 7 additions & 0 deletions docker/spark/fairscheduler.xml
@@ -0,0 +1,7 @@
<allocations>
<pool name="default">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>0</minShare>
</pool>
</allocations>
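
This pool follows Spark's fair scheduler allocation-file format and pairs with the spark.scheduler.mode=FAIR setting added to spark-defaults.conf below. As a minimal sketch (not part of this PR; it assumes a session already running with FAIR scheduling and this allocation file visible to the driver), a thread can route the stages it submits to the pool like so:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: assumes spark.scheduler.mode=FAIR and that Spark picks up
// the fairscheduler.xml above (e.g. via spark.scheduler.allocation.file).
val spark = SparkSession.builder().getOrCreate()

// Stages submitted from this thread are scheduled in the "default" pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "default")
```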
2 changes: 2 additions & 0 deletions docker/spark/scripts/entrypoint-submit.sh
@@ -7,6 +7,7 @@ SPARK_MASTER_HOST=${SPARK_MASTER_HOST:=spark-master}
MAX_RETRIES=${MAX_RETRIES:=300}
MIN_WORKERS=${MIN_WORKERS:=1}
SPARK_UI_PORT=${SPARK_UI_PORT:=4040}
SHUFFLE_SERVICE_PORT=${SHUFFLE_SERVICE_PORT:=7337}
POST_SCRIPT=${POST_SCRIPT:=/scripts/finish-submit.sh}
USE_BUILTIN_HIVE_METASTORE=${USE_BUILTIN_HIVE_METASTORE:=false}

@@ -39,6 +40,7 @@ SPARK_MASTER="spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}"
echo -e "
spark.master=$SPARK_MASTER
spark.ui.port=$SPARK_UI_PORT
spark.shuffle.service.port=$SHUFFLE_SERVICE_PORT
" >> /spark/conf/spark-defaults.conf

if [[ ! -z ${HIVE_METASTORE_URI} ]]; then
9 changes: 7 additions & 2 deletions docker/spark/scripts/entrypoint-worker.sh
@@ -5,16 +5,21 @@ SPARK_MASTER_PORT=${SPARK_MASTER_PORT:=7077}
SPARK_WORKER_WEBUI_PORT=${SPARK_WORKER_WEBUI_PORT:=8081}
SPARK_MASTER_HOST=${SPARK_MASTER_HOST:=spark-master}
SPARK_WORKER_PORT=${SPARK_WORKER_PORT:=7078}
SPARK_WORKER_OPTS=${SPARK_WORKER_OPTS:="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=1800"}
SHUFFLE_SERVICE_PORT=${SHUFFLE_SERVICE_PORT:=7337}
SPARK_WORKER_OPTS=${SPARK_WORKER_OPTS:="-Dspark.shuffle.service.enabled=true -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=1800"}

# Logs
/scripts/init-logs-metrics.sh

# Monitor Logs
/scripts/monitor-executor-logs.sh &

echo -e "
spark.shuffle.service.port=$SHUFFLE_SERVICE_PORT
" >> /spark/conf/spark-defaults.conf

. "/spark/sbin/spark-config.sh"
. "/spark/bin/load-spark-env.sh"
SPARK_MASTER="spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
/spark/sbin/../bin/spark-class org.apache.spark.deploy.worker.Worker \
--webui-port ${SPARK_WORKER_WEBUI_PORT} --port ${SPARK_WORKER_PORT} ${SPARK_MASTER}
--webui-port ${SPARK_WORKER_WEBUI_PORT} --port ${SPARK_WORKER_PORT} --properties-file /spark/conf/spark-defaults.conf ${SPARK_MASTER}
5 changes: 4 additions & 1 deletion docker/spark/spark-defaults.conf
@@ -14,4 +14,7 @@ spark.port.maxRetries=0
spark.rdd.compress=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.hive.convertMetastoreParquet=false
spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true
spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled=true
spark.scheduler.mode=FAIR
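
spark.dynamicAllocation.enabled depends on the external shuffle service enabled on the line above it, so executors can be removed without losing shuffle files, while spark.scheduler.mode=FAIR lets concurrently submitted jobs share the cluster instead of queueing FIFO. A sketch of the equivalent programmatic configuration (an illustration, not part of the PR):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the same three settings applied at session build time
// instead of through spark-defaults.conf.
val spark = SparkSession.builder()
  .config("spark.shuffle.service.enabled", "true")   // external shuffle service, required by dynamic allocation on standalone
  .config("spark.dynamicAllocation.enabled", "true") // scale executor count with the workload
  .config("spark.scheduler.mode", "FAIR")            // fair scheduling across concurrent jobs
  .getOrCreate()
```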
2 changes: 1 addition & 1 deletion src/main/scala/com/yotpo/metorikku/Job.scala
@@ -19,7 +19,7 @@ case class Job(val config: Configuration) {
// Set up instrumentation
val instrumentationFactory = InstrumentationProvider.getInstrumentationFactory(
config.appName, config.instrumentation)
UserMetricsSystem.initialize(sparkContext, "Metorikku")
// UserMetricsSystem.initialize(sparkContext, "Metorikku")

val instrumentationClient = instrumentationFactory.create()
sparkContext.addSparkListener(new SparkListener() {
38 changes: 30 additions & 8 deletions src/main/scala/com/yotpo/metorikku/Metorikku.scala
@@ -5,24 +5,46 @@ import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import com.yotpo.metorikku.configuration.job.{ConfigurationParser, Periodic}
import com.yotpo.metorikku.metric.MetricSet
import org.apache.log4j.LogManager
import org.apache.spark.groupon.metrics.UserMetricsSystem
import org.apache.spark.sql.SparkSession

object Metorikku extends App {
val log = LogManager.getLogger(this.getClass)
log.info("Starting Metorikku - Parsing configuration")
val session = Job(ConfigurationParser.parse(args))

session.config.periodic match {
case Some(periodic) => {
executePeriodicTask(periodic)
val configurations = ConfigurationParser.parse(args)

UserMetricsSystem.initialize(SparkSession.builder().getOrCreate().sparkContext, "Metorikku")

val jobs = configurations.map(config =>
new Runnable {

Review comment (Member): Can you try using parallel collections instead of Runnables? It's more Scala-like.
https://docs.scala-lang.org/overviews/parallel-collections/overview.html

Reply (Contributor, Author): Not sure how these are related...

def run(): Unit = {
val job = Job(config)

job.config.periodic match {
case Some(periodic) => {
executePeriodicTask(job, periodic)
}
case _ => runMetrics(job)
}
}
}
)

jobs match {
case s if s.length > 1 => {
val threads = jobs.map(r => new Thread(r))
threads.foreach(t => t.start())
threads.foreach(t => t.join())
}
case _ => runMetrics(session)
case _ => jobs.foreach(r => r.run())
}

private def executePeriodicTask(periodic: Periodic) = {
private def executePeriodicTask(job: Job, periodic: Periodic) = {
val task = new Runnable {
def run() = {
session.sparkSession.catalog.clearCache()
runMetrics(session)
job.sparkSession.catalog.clearCache()
runMetrics(job)
}
}
val ex = new ScheduledThreadPoolExecutor(1)
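
For comparison with the review suggestion above, a parallel-collections version of the dispatch loop might look roughly like the following sketch (not part of this PR; it reuses the configurations, Job, executePeriodicTask and runMetrics definitions shown in this diff):

```scala
// Hypothetical alternative raised in review: Scala parallel collections.
// `.par` evaluates the closure on a fork-join pool, one task per configuration.
configurations.par.foreach { config =>
  val job = Job(config)
  job.config.periodic match {
    case Some(periodic) => executePeriodicTask(job, periodic)
    case _              => runMetrics(job)
  }
}
```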
Configuration.scala
@@ -13,6 +13,7 @@ case class Configuration(metrics: Option[Seq[String]],
showQuery: Option[Boolean],
streaming: Option[Streaming],
periodic: Option[Periodic],
parallel: Option[Boolean],
var logLevel: Option[String],
var showPreviewLines: Option[Int],
var explain: Option[Boolean],
ConfigurationParser.scala
@@ -10,28 +10,32 @@ import scopt.OptionParser
object ConfigurationParser {
val log: Logger = LogManager.getLogger(this.getClass)

case class ConfigFileName(job: Option[String] = None, filename: Option[String] = None)
case class ConfigFileName(job: Option[String] = None, filename: Option[Seq[String]] = None)

val CLIparser: OptionParser[ConfigFileName] = new scopt.OptionParser[ConfigFileName]("Metorikku") {
head("Metorikku", "1.0")
opt[String]('j', "job")
.action((x, c) => c.copy(job = Option(x)))
.text("Job configuration JSON")
opt[String]('c', "config")
.text("Path to the job config file (YAML/JSON)")
opt[Seq[String]]('c', "config")
.text("Path to the job config file (YAML/JSON), you can add multiple files by concatenating the file names with ,")
.action((x, c) => c.copy(filename = Option(x)))
help("help") text "use command line arguments to specify the configuration file path or content"
}

def parse(args: Array[String]): Configuration = {
def parse(args: Array[String]): Seq[Configuration] = {
log.info("Starting Metorikku - Parsing configuration")

CLIparser.parse(args, ConfigFileName()) match {
case Some(arguments) =>
arguments.job match {
case Some(job) => parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json"))
case Some(job) => Seq(parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json")))
case None => arguments.filename match {
case Some(filename) => parseConfigurationFile(FileUtils.readConfigurationFile(filename), FileUtils.getObjectMapperByFileName(filename))
case Some(filenames) =>
filenames.
map(filename =>
parseConfigurationFile(FileUtils.readConfigurationFile(filename),
FileUtils.getObjectMapperByFileName(filename))).toList
case None => throw new MetorikkuException("Failed to parse config file")
}
}
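
With -c now typed as Seq[String], scopt splits the comma-separated argument into one path per entry, so parse returns one Configuration per file. A usage sketch (jobA.yaml and jobB.yaml are placeholder names):

```scala
// Placeholder file names; each entry yields its own Configuration,
// and Metorikku runs the resulting jobs in parallel when more than one is given.
val configs: Seq[Configuration] =
  ConfigurationParser.parse(Array("-c", "jobA.yaml,jobB.yaml"))
```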
HudiOutputWriter.scala
@@ -64,7 +64,7 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
// scalastyle:off method.length
override def write(dataFrame: DataFrame): Unit = {

if (dataFrame.head(1).isEmpty) {
if (dataFrame.cache().head(1).isEmpty) {
log.info("Skipping writing to hudi on empty dataframe")
return
}
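
A likely motivation for the added cache() (an assumption; the PR does not state it): head(1) already forces evaluation of the DataFrame, and without caching the same plan would be recomputed for the actual Hudi write. A sketch of the pattern, with a hypothetical helper name:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: cache before the emptiness check so the subsequent write reuses
// the materialized partitions instead of recomputing the whole plan.
def writeIfNonEmpty(dataFrame: DataFrame)(write: DataFrame => Unit): Unit = {
  val cached = dataFrame.cache()
  if (cached.head(1).isEmpty) {
    // nothing to write; skip the output step entirely
  } else {
    write(cached)
  }
}
```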
2 changes: 1 addition & 1 deletion src/main/scala/com/yotpo/metorikku/test/Tester.scala
@@ -46,7 +46,7 @@ case class Tester(config: TesterConfig) {
val variables = params.variables
val inputs = getMockFilesFromDir(config.test.mocks, config.basePath)
Configuration(Option(metrics), inputs, variables, None, None, None, None, None,
Option(config.preview > 0), None, None, None, Option(config.preview), None, None, None, None)
Option(config.preview > 0), None, None, None, None, Option(config.preview), None, None, None, None)
}

private def getMockFilesFromDir(mocks: Option[List[Mock]], testDir: File): Option[Map[String, Input]] = {