-
Notifications
You must be signed in to change notification settings - Fork 8
observability script for demo #87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| FROM apache/spark:3.5.3-scala2.12-java17-ubuntu | ||
|
|
||
| # Switch to root to install Java 17 | ||
| USER root | ||
|
|
||
| # Install Amazon Corretto 17 | ||
| RUN apt-get update && \ | ||
| apt-get install -y wget software-properties-common gnupg2 && \ | ||
| wget -O- https://apt.corretto.aws/corretto.key --https-only | apt-key add - && \ | ||
| add-apt-repository 'deb https://apt.corretto.aws stable main' && \ | ||
| apt-get update && \ | ||
| apt-get install -y java-17-amazon-corretto-jdk && \ | ||
| update-alternatives --set java /usr/lib/jvm/java-17-amazon-corretto/bin/java && \ | ||
| apt-get clean && \ | ||
| rm -rf /var/lib/apt/lists/* | ||
|
|
||
| # Create directory and set appropriate permissions | ||
| RUN mkdir -p /opt/chronon/jars && \ | ||
| chown -R 185:185 /opt/chronon && \ | ||
| chmod 755 /opt/chronon/jars | ||
|
|
||
| # Set JAVA_HOME | ||
| ENV JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto | ||
| ENV PATH=$PATH:$JAVA_HOME/bin | ||
|
|
||
| # Switch back to spark user | ||
| USER 185 | ||
|
|
||
| # Set environment variables for Spark classpath | ||
| ENV SPARK_CLASSPATH="/opt/spark/jars/*" | ||
| ENV SPARK_DIST_CLASSPATH="/opt/spark/jars/*" | ||
| ENV SPARK_EXTRA_CLASSPATH="/opt/spark/jars/*:/opt/chronon/jars/*" | ||
| ENV HADOOP_CLASSPATH="/opt/spark/jars/*" | ||
|
|
||
| CMD ["tail", "-f", "/dev/null"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| run build.sh once, and you can repeatedly exec | ||
| sbt spark/assembly + run.sh on iterations to the chronon code. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| docker build -t obs . |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| #!/bin/bash | ||
|
|
||
| # Stop and remove existing container | ||
| if docker ps -a | grep -q spark-app; then | ||
| docker stop spark-app || echo "Failed to stop container" | ||
| docker rm spark-app || echo "Failed to remove container" | ||
| fi | ||
|
|
||
| CHRONON_JAR_PATH="${CHRONON_JAR_PATH:-$HOME/repos/chronon/spark/target/scala-2.12}" | ||
|
|
||
| if [ ! -d "$CHRONON_JAR_PATH" ]; then | ||
| echo "Error: JAR directory not found: $CHRONON_JAR_PATH" | ||
| exit 1 | ||
| fi | ||
|
|
||
| # Run new container | ||
| docker run -d \ | ||
| --name spark-app \ | ||
| -v "$CHRONON_JAR_PATH":/opt/chronon/jars \ | ||
| obs | ||
|
|
||
| # Submit with increased memory | ||
| docker exec spark-app \ | ||
| /opt/spark/bin/spark-submit \ | ||
| --master "local[*]" \ | ||
| --driver-memory 8g \ | ||
| --conf "spark.driver.maxResultSize=6g" \ | ||
| --conf "spark.driver.memory=8g" \ | ||
| --driver-class-path "/opt/spark/jars/*:/opt/chronon/jars/*" \ | ||
| --conf "spark.driver.host=localhost" \ | ||
| --conf "spark.driver.bindAddress=0.0.0.0" \ | ||
|
Comment on lines
+30
to
+31
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Network configuration needs to be updated for Docker Compose environment The current network configuration in
The Spark configuration should be updated to:
🔗 Analysis chainVerify network configuration for container networking The current network configuration might not work correctly in all Docker network configurations. The 🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Check if custom Docker networks are used
docker network ls --format "{{.Name}}" | grep -q "spark" && {
echo "Custom Spark network exists. Current localhost binding might not work."
}
# Check container's network mode
docker inspect spark-app --format '{{.HostConfig.NetworkMode}}'
Length of output: 316 Script: #!/bin/bash
# Let's analyze the Docker and Spark configuration in the codebase
# Search for Docker network configurations
rg -l "docker.*network"
# Search for Spark network configurations
rg "spark\.(driver|executor)\.(host|bindAddress)" -A 2
# Look for Docker Compose files that might define networks
fd -e yml -e yaml | xargs grep -l "networks:"
# Check if there are any other Spark configuration files
fd -e conf -e properties | grep -i spark
Length of output: 922 Script: #!/bin/bash
# Let's check the network configuration in compose file and other Spark settings
cat docker-init/compose.yaml
# Check Spark default configurations
cat quickstart/conf/spark-defaults.conf
# Check the SparkSessionBuilder implementation for network settings
cat spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
Length of output: 11114 |
||
| --class ai.chronon.spark.scripts.ObservabilityDemo \ | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we running the in-memory kvstore on the spark driver? curious how that'd work in a cluster-mode setup or is that out of scope for the demo?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah basically only for the demo. we don't plan to launch this. |
||
| /opt/chronon/jars/spark-assembly-0.1.0-SNAPSHOT.jar | ||
nikhil-zlai marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,13 @@ | ||
| #!/bin/bash | ||
|
|
||
|
Comment on lines
1
to
+2
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add environment variable verification The script uses several environment variables ( Add this at the beginning of the script: # Verify required environment variables
for var in SPARK_JAR CLOUD_AWS_JAR ONLINE_CLASS; do
if [[ -z "${!var}" ]]; then
echo "Error: Required environment variable $var is not set" >&2
exit 1
fi
done |
||
| start_time=$(date +%s) | ||
| if ! python3.8 generate_anomalous_data.py; then | ||
| echo "Error: Failed to generate anomalous data" >&2 | ||
| exit 1 | ||
| else | ||
| end_time=$(date +%s) | ||
| elapsed_time=$((end_time - start_time)) | ||
| echo "Anomalous data generated successfully! Took $elapsed_time seconds." | ||
| fi | ||
|
|
||
|
|
||
|
|
@@ -11,7 +17,6 @@ if [[ ! -f $SPARK_JAR ]] || [[ ! -f $CLOUD_AWS_JAR ]]; then | |
| exit 1 | ||
| fi | ||
|
|
||
|
|
||
| # Load up metadata into DynamoDB | ||
| echo "Loading metadata.." | ||
| if ! java -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver metadata-upload --conf-path=/chronon_sample/production/ --online-jar=$CLOUD_AWS_JAR --online-class=$ONLINE_CLASS; then | ||
|
|
@@ -32,9 +37,11 @@ fi | |
| echo "DynamoDB Table created successfully!" | ||
|
|
||
|
|
||
| # Load up summary data into DynamoDB | ||
| echo "Loading Summary.." | ||
| if ! java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED \ | ||
| start_time=$(date +%s) | ||
|
|
||
| if ! java \ | ||
| --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \ | ||
| --add-opens=java.base/sun.security.action=ALL-UNNAMED \ | ||
| -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver summarize-and-upload \ | ||
| --online-jar=$CLOUD_AWS_JAR \ | ||
| --online-class=$ONLINE_CLASS \ | ||
|
|
@@ -43,8 +50,11 @@ if ! java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun | |
| --time-column=transaction_time; then | ||
| echo "Error: Failed to load summary data into DynamoDB" >&2 | ||
| exit 1 | ||
| else | ||
| end_time=$(date +%s) | ||
| elapsed_time=$((end_time - start_time)) | ||
| echo "Summary load completed successfully! Took $elapsed_time seconds." | ||
| fi | ||
| echo "Summary load completed successfully!" | ||
|
|
||
| # Add these java options as without them we hit the below error: | ||
| # throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @36328710 | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| <configuration> | ||
| <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> | ||
| <encoder> | ||
| <pattern>[%date] {%logger{0}} %level - %message%n</pattern> | ||
| </encoder> | ||
| </appender> | ||
| <logger name="ai.chronon" level="INFO" /> | ||
|
|
||
| <root level="INFO"> | ||
| <appender-ref ref="CONSOLE"/> | ||
| </root> | ||
| </configuration> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ object SparkSessionBuilder { | |
| // we would want to share locally generated warehouse during CI testing | ||
| def build(name: String, | ||
| local: Boolean = false, | ||
| hiveSupport: Boolean = true, | ||
| localWarehouseLocation: Option[String] = None, | ||
| additionalConfig: Option[Map[String, String]] = None, | ||
| enforceKryoSerializer: Boolean = true): SparkSession = { | ||
|
|
@@ -44,7 +45,10 @@ object SparkSessionBuilder { | |
| var baseBuilder = SparkSession | ||
| .builder() | ||
| .appName(name) | ||
| .enableHiveSupport() | ||
|
|
||
| if (hiveSupport) baseBuilder = baseBuilder.enableHiveSupport() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this needed as part of this PR? I don't see it in use - can we drop for a follow up? |
||
|
|
||
| baseBuilder = baseBuilder | ||
| .config("spark.sql.session.timeZone", "UTC") | ||
| //otherwise overwrite will delete ALL partitions, not just the ones it touches | ||
| .config("spark.sql.sources.partitionOverwriteMode", "dynamic") | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,192 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| package ai.chronon.spark.scripts | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.ColorPrinter.ColorString | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.Constants | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.DriftMetric | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.Extensions.MetadataOps | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.PartitionSpec | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.TileDriftSeries | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.TileSummarySeries | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.api.Window | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.online.KVStore | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.online.stats.DriftStore | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.spark.SparkSessionBuilder | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.spark.TableUtils | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.spark.stats.drift.Summarizer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.spark.stats.drift.SummaryUploader | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.spark.stats.drift.scripts.PrepareData | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.spark.utils.InMemoryKvStore | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import ai.chronon.spark.utils.MockApi | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.rogach.scallop.ScallopConf | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.rogach.scallop.ScallopOption | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.Logger | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.LoggerFactory | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.util.concurrent.TimeUnit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import scala.concurrent.Await | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import scala.concurrent.duration.Duration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import scala.util.ScalaJavaConversions.IteratorOps | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| object ObservabilityDemo { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def time(message: String)(block: => Unit): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(s"$message..".yellow) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val start = System.currentTimeMillis() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| block | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need this to be threadsafe or no
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not for this demo i think |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val end = System.currentTimeMillis() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(s"$message took ${end - start} ms".green) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+34
to
+40
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Improve timing implementation for precision and naming convention.
- def time(message: String)(block: => Unit): Unit = {
+ def time(message: String)(block: => Unit): Unit = {
logger.info(s"$message..".yellow)
- val start = System.currentTimeMillis()
+ val start = System.nanoTime()
block
- val end = System.currentTimeMillis()
- logger.info(s"$message took ${end - start} ms".green)
+ val end = System.nanoTime()
+ val durationMs = TimeUnit.NANOSECONDS.toMillis(end - start)
+ logger.info(s"$message took $durationMs ms".green)
}📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val startDs: ScallopOption[String] = opt[String]( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name = "start-ds", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default = Some("2023-01-01"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| descr = "Start date in YYYY-MM-DD format" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val endDs: ScallopOption[String] = opt[String]( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name = "end-ds", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default = Some("2023-02-30"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix invalid default end date. The default end date "2023-02-30" is invalid as February never has 30 days. This could cause runtime issues. - default = Some("2023-02-30"),
+ default = Some("2023-02-28"),📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| descr = "End date in YYYY-MM-DD format" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val rowCount: ScallopOption[Int] = opt[Int]( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name = "row-count", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default = Some(700000), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| descr = "Number of rows to generate" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val namespace: ScallopOption[String] = opt[String]( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| name = "namespace", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default = Some("observability_demo"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| descr = "Namespace for the demo" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| verify() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+42
to
+68
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add input validation for configuration parameters. The configuration parameters need validation to ensure:
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+ private def isValidDate(date: String): Boolean = {
+ try {
+ PartitionSpec.daily.isValid(date)
+ } catch {
+ case _: Exception => false
+ }
+ }
+
val startDs: ScallopOption[String] = opt[String](
name = "start-ds",
default = Some("2023-01-01"),
descr = "Start date in YYYY-MM-DD format"
- )
+ ).validate(date => isValidDate(date))
val endDs: ScallopOption[String] = opt[String](
name = "end-ds",
default = Some("2023-02-28"),
descr = "End date in YYYY-MM-DD format"
- )
+ ).validate(date => isValidDate(date))
val rowCount: ScallopOption[Int] = opt[Int](
name = "row-count",
default = Some(700000),
descr = "Number of rows to generate"
- )
+ ).validate(count => count > 0)
+ validate((conf: Conf) => {
+ val start = conf.startDs()
+ val end = conf.endDs()
+ PartitionSpec.daily.epochMillis(start) < PartitionSpec.daily.epochMillis(end)
+ }) { "End date must be after start date" }📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def main(args: Array[String]): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val config = new Conf(args) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val startDs = config.startDs() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val endDs = config.endDs() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val rowCount = config.rowCount() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val namespace = config.namespace() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val spark = SparkSessionBuilder.build(namespace, local = true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| implicit val tableUtils: TableUtils = TableUtils(spark) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tableUtils.createDatabase(namespace) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // generate anomalous data (join output) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val prepareData = PrepareData(namespace) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val join = prepareData.generateAnomalousFraudJoin | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time("Preparing data") { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val df = prepareData.generateFraudSampleData(rowCount, startDs, endDs, join.metaData.loggedTable) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| df.show(10, truncate = false) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time("Summarizing data") { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // compute summary table and packed table (for uploading) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Summarizer.compute(join.metaData, ds = endDs, useLogs = true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val packedTable = join.metaData.packedSummaryTable | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // mock api impl for online fetching and uploading | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val kvStoreFunc: () => KVStore = () => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // cannot reuse the variable - or serialization error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val result = InMemoryKvStore.build(namespace, () => null) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| result | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val api = new MockApi(kvStoreFunc, namespace) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // create necessary tables in kvstore | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val kvStore = api.genKvStore | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvStore.create(Constants.MetadataDataset) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvStore.create(Constants.TiledSummaryDataset) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
nikhil-zlai marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // upload join conf | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| api.buildFetcher().putJoinConf(join) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time("Uploading summaries") { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val uploader = new SummaryUploader(tableUtils.loadTable(packedTable), api) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| uploader.run() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // test drift store methods | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val driftStore = new DriftStore(api.genKvStore) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: Wire up drift store into hub and create an endpoint | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm wiring up to kick off play from within the JVM process might end up being painful (it's typically triggered via the command line to launch |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // fetch keys | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val tileKeys = driftStore.tileKeysForJoin(join) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val tileKeysSimple = tileKeys.mapValues(_.map(_.column).toSeq) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| tileKeysSimple.foreach { case (k, v) => logger.info(s"$k -> [${v.mkString(", ")}]") } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // fetch summaries | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val startMs = PartitionSpec.daily.epochMillis(startDs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val endMs = PartitionSpec.daily.epochMillis(endDs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val summariesFuture = driftStore.getSummaries(join, Some(startMs), Some(endMs), None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Make timeout values configurable. Multiple operations use hardcoded 10-second timeouts. These should be configurable to accommodate different environments and network conditions. + val defaultTimeout = Duration.create(
+ config.timeout.getOrElse(10),
+ TimeUnit.SECONDS
+ )
+
- val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
+ val summaries = Await.result(summariesFuture, defaultTimeout)
- driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+ driftSeries = Await.result(driftSeriesFuture.get, defaultTimeout)
- summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+ summarySeries = Await.result(summarySeriesFuture.get, defaultTimeout)Add timeout configuration to the Conf class: val timeout: ScallopOption[Int] = opt[Int](
name = "timeout",
default = Some(10),
descr = "Timeout in seconds for async operations"
)Also applies to: 145-145, 169-169 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(summaries.toString()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var driftSeries: Seq[TileDriftSeries] = null | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // fetch drift series | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+135
to
+136
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Replace mutable state with immutable variables. Use - var driftSeries: Seq[TileDriftSeries] = null
+ val driftSeries: Seq[TileDriftSeries] = time("Fetching drift series") {
+ val driftSeriesFuture = driftStore.getDriftSeries(
+ // ... existing parameters ...
+ )
+ Await.result(driftSeriesFuture.get, defaultTimeout)
+ }
- var summarySeries: Seq[TileSummarySeries] = null
+ val summarySeries: Seq[TileSummarySeries] = time("Fetching summary series") {
+ // ... move the fetching logic here ...
+ }Also applies to: 161-161 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time("Fetching drift series") { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val driftSeriesFuture = driftStore.getDriftSeries( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| join.metaData.nameToFilePath, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| DriftMetric.JENSEN_SHANNON, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lookBack = new Window(7, chronon.api.TimeUnit.DAYS), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| startMs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| endMs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val (nulls, totals) = driftSeries.iterator.foldLeft(0 -> 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ((nulls, total), s) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val currentNulls = s.getPercentileDriftSeries.iterator().toScala.count(_ == null) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val currentCount = s.getPercentileDriftSeries.size() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (nulls + currentNulls, total + currentCount) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(s"""drift totals: $totals | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| |drift nulls: $nulls | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| |""".stripMargin.red) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Drift series fetched successfully".green) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var summarySeries: Seq[TileSummarySeries] = null | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| time("Fetching summary series") { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val summarySeriesFuture = driftStore.getSummarySeries( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| join.metaData.nameToFilePath, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| startMs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| endMs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val (summaryNulls, summaryTotals) = summarySeries.iterator.foldLeft(0 -> 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case ((nulls, total), s) => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (s.getPercentiles == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (nulls + 1) -> (total + 1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val currentNulls = s.getPercentiles.iterator().toScala.count(_ == null) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val currentCount = s.getPercentiles.size() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (nulls + currentNulls, total + currentCount) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| println(s"""summary ptile totals: $summaryTotals | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| |summary ptile nulls: $summaryNulls | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| |""".stripMargin) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Summary series fetched successfully".green) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| spark.stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| System.exit(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+189
to
+191
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve cleanup process. The cleanup process has two issues:
- spark.stop()
- System.exit(0)
+ try {
+ spark.stop()
+ } catch {
+ case e: Exception =>
+ logger.error("Failed to stop Spark session", e)
+ throw e
+ } finally {
+ // Allow natural program termination instead of forcing exit
+ }
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.