observability script for demo #87
@@ -0,0 +1,35 @@ (new Dockerfile)
FROM apache/spark:3.5.3-scala2.12-java17-ubuntu

# Switch to root to install Java 17
USER root

# Install Amazon Corretto 17
RUN apt-get update && \
    apt-get install -y wget software-properties-common gnupg2 && \
    wget -O- https://apt.corretto.aws/corretto.key --https-only | apt-key add - && \
    add-apt-repository 'deb https://apt.corretto.aws stable main' && \
    apt-get update && \
    apt-get install -y java-17-amazon-corretto-jdk && \
    update-alternatives --set java /usr/lib/jvm/java-17-amazon-corretto/bin/java && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Create directory and set appropriate permissions
RUN mkdir -p /opt/chronon/jars && \
    chown -R 185:185 /opt/chronon && \
    chmod 755 /opt/chronon/jars

# Set JAVA_HOME
ENV JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto
ENV PATH=$PATH:$JAVA_HOME/bin

# Switch back to spark user
USER 185

# Set environment variables for Spark classpath
ENV SPARK_CLASSPATH="/opt/spark/jars/*"
ENV SPARK_DIST_CLASSPATH="/opt/spark/jars/*"
ENV SPARK_EXTRA_CLASSPATH="/opt/spark/jars/*:/opt/chronon/jars/*"
ENV HADOOP_CLASSPATH="/opt/spark/jars/*"

CMD ["tail", "-f", "/dev/null"]
@@ -0,0 +1,2 @@ (new readme note)
Run build.sh once; after that, for each iteration on the chronon code, repeatedly run sbt spark/assembly followed by run.sh.
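A rough sketch of that loop (paths and working directory are assumptions):

./build.sh                 # one time: build the obs image
# ... edit chronon code ...
sbt spark/assembly         # rebuild the spark assembly jar
./run.sh                   # recreate the container and re-submit the demo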
@@ -0,0 +1 @@ (new build script; presumably build.sh)
docker build -t obs .
@@ -0,0 +1,33 @@ (new run script; presumably run.sh)
#!/bin/bash

# Stop and remove existing container
if docker ps -a | grep -q spark-app; then
    docker stop spark-app || echo "Failed to stop container"
    docker rm spark-app || echo "Failed to remove container"
fi

CHRONON_JAR_PATH="${CHRONON_JAR_PATH:-$HOME/repos/chronon/spark/target/scala-2.12}"

if [ ! -d "$CHRONON_JAR_PATH" ]; then
    echo "Error: JAR directory not found: $CHRONON_JAR_PATH"
    exit 1
fi

# Run new container
docker run -d \
    --name spark-app \
    -v "$CHRONON_JAR_PATH":/opt/chronon/jars \
    obs

# Submit with increased memory
docker exec spark-app \
    /opt/spark/bin/spark-submit \
    --master "local[*]" \
    --driver-memory 8g \
    --conf "spark.driver.maxResultSize=6g" \
    --conf "spark.driver.memory=8g" \
    --driver-class-path "/opt/spark/jars/*:/opt/chronon/jars/*" \
    --conf "spark.driver.host=localhost" \
    --conf "spark.driver.bindAddress=0.0.0.0" \
Review comment on lines +30 to +31 (Contributor):
💡 Codebase verification: the network configuration needs to be updated for the Docker Compose environment, and the Spark configuration should be updated accordingly.
🔗 Analysis chain: verify the network configuration for container networking. The current settings might not work correctly in all Docker network configurations; in particular, if a custom Spark network exists, the current localhost binding might not work. The verification scripts searched the codebase for spark.driver.host / spark.driver.bindAddress settings and inspected docker-init/compose.yaml, quickstart/conf/spark-defaults.conf, and spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala.
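The reviewer's concrete replacement values are not shown above; purely as an illustration of the direction (the spark-app network alias and a Compose-managed network are assumptions, not part of this PR), the driver host would point at the container's alias rather than localhost:

    # hypothetical substitution inside the spark-submit block of the run script
    --conf "spark.driver.host=spark-app" \
    --conf "spark.driver.bindAddress=0.0.0.0" \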
    --class ai.chronon.spark.scripts.ObservabilityDemo \
Review comment (Collaborator): are we running the in-memory kvstore on the Spark driver? Curious how that'd work in a cluster-mode setup, or is that out of scope for the demo?
Reply (Contributor, Author): yeah, basically only for the demo; we don't plan to launch this.
    /opt/chronon/jars/spark-assembly-0.1.0-SNAPSHOT.jar
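A usage sketch for the script above (the override path is hypothetical; by default it mounts $HOME/repos/chronon/spark/target/scala-2.12):

CHRONON_JAR_PATH=/path/to/chronon/spark/target/scala-2.12 ./run.sh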
@@ -1,7 +1,13 @@ (modified shell script)
#!/bin/bash
Review comment on lines 1 to +2 (Contributor):
Add environment variable verification. The script uses several environment variables (SPARK_JAR, CLOUD_AWS_JAR, ONLINE_CLASS) that should be checked up front. Add this at the beginning of the script:

# Verify required environment variables
for var in SPARK_JAR CLOUD_AWS_JAR ONLINE_CLASS; do
    if [[ -z "${!var}" ]]; then
        echo "Error: Required environment variable $var is not set" >&2
        exit 1
    fi
done
start_time=$(date +%s)
if ! python3.8 generate_anomalous_data.py; then
  echo "Error: Failed to generate anomalous data" >&2
  exit 1
else
  end_time=$(date +%s)
  elapsed_time=$((end_time - start_time))
  echo "Anomalous data generated successfully! Took $elapsed_time seconds."
fi
@@ -11,7 +17,6 @@ if [[ ! -f $SPARK_JAR ]] || [[ ! -f $CLOUD_AWS_JAR ]]; then
  exit 1
fi

# Load up metadata into DynamoDB
echo "Loading metadata.."
if ! java -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver metadata-upload --conf-path=/chronon_sample/production/ --online-jar=$CLOUD_AWS_JAR --online-class=$ONLINE_CLASS; then
@@ -32,9 +37,11 @@ fi
 echo "DynamoDB Table created successfully!"

 # Load up summary data into DynamoDB
 echo "Loading Summary.."
-if ! java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED \
+start_time=$(date +%s)
+
+if ! java \
+  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
+  --add-opens=java.base/sun.security.action=ALL-UNNAMED \
   -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver summarize-and-upload \
   --online-jar=$CLOUD_AWS_JAR \
   --online-class=$ONLINE_CLASS \
@@ -43,8 +50,11 @@ if ! java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun
   --time-column=transaction_time; then
   echo "Error: Failed to load summary data into DynamoDB" >&2
   exit 1
+else
+  end_time=$(date +%s)
+  elapsed_time=$((end_time - start_time))
+  echo "Summary load completed successfully! Took $elapsed_time seconds."
 fi
-echo "Summary load completed successfully!"

 # Add these java options as without them we hit the below error:
 # throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @36328710
@@ -0,0 +1,12 @@ (new logback configuration file)
<configuration>
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>[%date] {%logger{0}} %level - %message%n</pattern>
    </encoder>
  </appender>
  <logger name="ai.chronon" level="INFO" />

  <root level="INFO">
    <appender-ref ref="CONSOLE"/>
  </root>
</configuration>
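If this file is not already picked up from the driver classpath, one way to wire it in (a sketch; the in-container path is an assumption, not something this PR defines) is via the driver's Java options in the spark-submit call above:

    --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=/opt/chronon/jars/logback.xml" \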
@@ -34,6 +34,7 @@ object SparkSessionBuilder {
   // we would want to share locally generated warehouse during CI testing
   def build(name: String,
             local: Boolean = false,
+            hiveSupport: Boolean = true,
             localWarehouseLocation: Option[String] = None,
             additionalConfig: Option[Map[String, String]] = None,
             enforceKryoSerializer: Boolean = true): SparkSession = {

@@ -44,7 +45,10 @@ object SparkSessionBuilder {
     var baseBuilder = SparkSession
       .builder()
       .appName(name)
-      .enableHiveSupport()
+
+    if (hiveSupport) baseBuilder = baseBuilder.enableHiveSupport()
Review comment (Contributor): is this needed as part of this PR? I don't see it in use; can we drop it for a follow-up?
     baseBuilder = baseBuilder
       .config("spark.sql.session.timeZone", "UTC")
       // otherwise overwrite will delete ALL partitions, not just the ones it touches
       .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
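Usage note (an illustration, not code from this diff): with the new flag, a caller that does not need Hive, such as a local demo, could presumably opt out via SparkSessionBuilder.build("ObservabilityDemo", local = true, hiveSupport = false).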
@@ -0,0 +1,192 @@ (new file: ObservabilityDemo, package ai.chronon.spark.scripts)
package ai.chronon.spark.scripts

import ai.chronon
import ai.chronon.api.ColorPrinter.ColorString
import ai.chronon.api.Constants
import ai.chronon.api.DriftMetric
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.api.PartitionSpec
import ai.chronon.api.TileDriftSeries
import ai.chronon.api.TileSummarySeries
import ai.chronon.api.Window
import ai.chronon.online.KVStore
import ai.chronon.online.stats.DriftStore
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import ai.chronon.spark.stats.drift.Summarizer
import ai.chronon.spark.stats.drift.SummaryUploader
import ai.chronon.spark.stats.drift.scripts.PrepareData
import ai.chronon.spark.utils.InMemoryKvStore
import ai.chronon.spark.utils.MockApi
import org.rogach.scallop.ScallopConf
import org.rogach.scallop.ScallopOption
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.ScalaJavaConversions.IteratorOps

object ObservabilityDemo {
  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

  def Time(message: String)(block: => Unit): Unit = {
Suggested change:
-  def Time(message: String)(block: => Unit): Unit = {
+  def time(message: String)(block: => Unit): Unit = {
Review comment: do we need this to be threadsafe or no?
Reply (Author): not for this demo, I think.
🛠️ Refactor suggestion: improve the timing implementation for thread safety and precision.
The current implementation has two issues:
- It's not thread-safe, as timing operations from multiple threads could interleave.
- System.currentTimeMillis() is less precise than System.nanoTime() for duration measurements.

- def Time(message: String)(block: => Unit): Unit = {
+ def time(message: String)(block: => Unit): Unit = {
    logger.info(s"$message..".yellow)
-   val start = System.currentTimeMillis()
+   val start = System.nanoTime()
    block
-   val end = System.currentTimeMillis()
-   logger.info(s"$message took ${end - start} ms".green)
+   val end = System.nanoTime()
+   val durationMs = TimeUnit.NANOSECONDS.toMillis(end - start)
+   logger.info(s"$message took $durationMs ms".green)
  }

For thread safety, consider using a synchronized block or atomic operations if concurrent timing is needed.
Review comment: fix the invalid default end date. The default end date "2023-02-30" is invalid, since February never has 30 days; this could cause runtime issues.

- default = Some("2023-02-30"),
+ default = Some("2023-02-28"),
🛠️ Refactor suggestion: add input validation for the configuration parameters.
The configuration parameters need validation to ensure:
- valid date formats
- a positive row count
- the end date is after the start date

 class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  private def isValidDate(date: String): Boolean = {
+    try {
+      PartitionSpec.daily.isValid(date)
+    } catch {
+      case _: Exception => false
+    }
+  }
+
   val startDs: ScallopOption[String] = opt[String](
     name = "start-ds",
     default = Some("2023-01-01"),
     descr = "Start date in YYYY-MM-DD format"
-  )
+  ).validate(date => isValidDate(date))
   val endDs: ScallopOption[String] = opt[String](
     name = "end-ds",
     default = Some("2023-02-28"),
     descr = "End date in YYYY-MM-DD format"
-  )
+  ).validate(date => isValidDate(date))
   val rowCount: ScallopOption[Int] = opt[Int](
     name = "row-count",
     default = Some(700000),
     descr = "Number of rows to generate"
-  )
+  ).validate(count => count > 0)
+  validate((conf: Conf) => {
+    val start = conf.startDs()
+    val end = conf.endDs()
+    PartitionSpec.daily.epochMillis(start) < PartitionSpec.daily.epochMillis(end)
+  }) { "End date must be after start date" }
Review comment: hmm, wiring up to kick off Play from within the JVM process might end up being painful (it's typically triggered via the command line to launch play.core.server.ProdServerStart with the appropriate params / jars, etc.).
🛠️ Refactor suggestion: make timeout values configurable.
Multiple operations use hardcoded 10-second timeouts. These should be configurable to accommodate different environments and network conditions.

+ val defaultTimeout = Duration.create(
+   config.timeout.getOrElse(10),
+   TimeUnit.SECONDS
+ )
+
- val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
+ val summaries = Await.result(summariesFuture, defaultTimeout)
- driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+ driftSeries = Await.result(driftSeriesFuture.get, defaultTimeout)
- summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+ summarySeries = Await.result(summarySeriesFuture.get, defaultTimeout)

Add timeout configuration to the Conf class:

val timeout: ScallopOption[Int] = opt[Int](
  name = "timeout",
  default = Some(10),
  descr = "Timeout in seconds for async operations"
)

Also applies to lines 145 and 169.
🛠️ Refactor suggestion: replace mutable state with immutable variables.
Use val instead of var to prevent accidental modifications and improve code clarity.

- var driftSeries: Seq[TileDriftSeries] = null
+ val driftSeries: Seq[TileDriftSeries] = time("Fetching drift series") {
+   val driftSeriesFuture = driftStore.getDriftSeries(
+     // ... existing parameters ...
+   )
+   Await.result(driftSeriesFuture.get, defaultTimeout)
+ }
- var summarySeries: Seq[TileSummarySeries] = null
+ val summarySeries: Seq[TileSummarySeries] = time("Fetching summary series") {
+   // ... move the fetching logic here ...
+ }

Also applies to line 161.
Review comment: improve the cleanup process.
The cleanup process has two issues:
- no error handling for spark.stop()
- using System.exit(0) might prevent proper cleanup of resources

- spark.stop()
- System.exit(0)
+ try {
+   spark.stop()
+ } catch {
+   case e: Exception =>
+     logger.error("Failed to stop Spark session", e)
+     throw e
+ } finally {
+   // Allow natural program termination instead of forcing exit
+ }

(Committable suggestion skipped: line range outside the PR's diff.)