33 changes: 33 additions & 0 deletions docker-init/demo/Dockerfile
@@ -0,0 +1,33 @@
FROM apache/spark:latest

# Switch to root to install Java 17
USER root

# Install Amazon Corretto 17
RUN apt-get update && \
apt-get install -y wget software-properties-common gnupg2 && \
wget -O- https://apt.corretto.aws/corretto.key | apt-key add - && \
add-apt-repository 'deb https://apt.corretto.aws stable main' && \
apt-get update && \
apt-get install -y java-17-amazon-corretto-jdk && \
update-alternatives --set java /usr/lib/jvm/java-17-amazon-corretto/bin/java

# Create directory and set appropriate permissions
RUN mkdir -p /opt/chronon/jars && \
chown -R 185:185 /opt/chronon && \
chmod 755 /opt/chronon/jars

# Set JAVA_HOME
ENV JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto
ENV PATH=$PATH:$JAVA_HOME/bin

# Switch back to spark user
USER 185

# Set environment variables for Spark classpath
ENV SPARK_CLASSPATH="/opt/spark/jars/*"
ENV SPARK_DIST_CLASSPATH="/opt/spark/jars/*"
ENV SPARK_EXTRA_CLASSPATH="/opt/spark/jars/*:/opt/chronon/jars/*"
ENV HADOOP_CLASSPATH="/opt/spark/jars/*"

CMD ["tail", "-f", "/dev/null"]
2 changes: 2 additions & 0 deletions docker-init/demo/README.md
@@ -0,0 +1,2 @@
Run build.sh once; after that, you can repeatedly run
sbt spark/assembly + run.sh as you iterate on the chronon code.
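
A sketch of that loop (assuming the obs image and spark-app container names from build.sh and run.sh in this directory, and that sbt is run from the chronon repo root):

# one-time: build the demo image
bash build.sh

# each iteration: rebuild the Chronon spark assembly, then restart the container with the fresh jar mounted
sbt spark/assembly
bash run.sh
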
1 change: 1 addition & 0 deletions docker-init/demo/build.sh
@@ -0,0 +1 @@
docker build -t obs .
22 changes: 22 additions & 0 deletions docker-init/demo/run.sh
@@ -0,0 +1,22 @@
# Stop and remove existing container
docker stop spark-app
docker rm spark-app

Contributor:
🛠️ Refactor suggestion

Add error handling for Docker commands

The Docker stop and remove commands should handle cases where the container doesn't exist or can't be stopped.

Replace with this more robust implementation:

-docker stop spark-app
-docker rm spark-app
+if docker ps -a | grep -q spark-app; then
+  docker stop spark-app || echo "Failed to stop container"
+  docker rm spark-app || echo "Failed to remove container"
+fi


# Run new container
docker run -d \
--name spark-app \
-v $HOME/repos/chronon/spark/target/scala-2.12:/opt/chronon/jars \
obs

Contributor:
⚠️ Potential issue

Improve volume mount path handling

The volume mount path assumes a specific directory structure which could cause issues in different environments.

Consider:

  1. Making the path configurable via environment variable
  2. Validating the path exists before mounting
+CHRONON_JAR_PATH="${CHRONON_JAR_PATH:-$HOME/repos/chronon/spark/target/scala-2.12}"
+
+if [ ! -d "$CHRONON_JAR_PATH" ]; then
+  echo "Error: JAR directory not found: $CHRONON_JAR_PATH"
+  exit 1
+fi
+
 docker run -d \
   --name spark-app \
-  -v $HOME/repos/chronon/spark/target/scala-2.12:/opt/chronon/jars \
+  -v "$CHRONON_JAR_PATH":/opt/chronon/jars \
   obs


# Submit with increased memory
docker exec spark-app \
/opt/spark/bin/spark-submit \
--master "local[*]" \
--driver-memory 8g \
--conf "spark.driver.maxResultSize=6g" \
--conf "spark.driver.memory=8g" \
--driver-class-path "/opt/spark/jars/*:/opt/chronon/jars/*" \
--conf "spark.driver.host=localhost" \
--conf "spark.driver.bindAddress=0.0.0.0" \
Comment on lines +30 to +31
Contributor:
💡 Codebase verification

Network configuration needs to be updated for Docker Compose environment

The current network configuration in run.sh using localhost and 0.0.0.0 is incompatible with the Docker Compose setup. The compose file shows that:

  • A custom network temporal-network is defined
  • Spark services are running in the compose environment
  • The Spark master is accessible via hostname spark (not localhost)

The Spark configuration should be updated as follows (a sketch of the resulting submit command appears after this list):

  • Replace spark.driver.host=localhost with spark.driver.host=spark
  • Keep spark.driver.bindAddress=0.0.0.0 as it allows accepting connections from any network interface
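
A minimal sketch of the adjusted submit command under that recommendation — identical to the one in run.sh except that spark.driver.host points at the compose-defined spark hostname:

# same invocation as run.sh; only the driver host changes
docker exec spark-app \
  /opt/spark/bin/spark-submit \
  --master "local[*]" \
  --driver-memory 8g \
  --conf "spark.driver.maxResultSize=6g" \
  --conf "spark.driver.memory=8g" \
  --driver-class-path "/opt/spark/jars/*:/opt/chronon/jars/*" \
  --conf "spark.driver.host=spark" \
  --conf "spark.driver.bindAddress=0.0.0.0" \
  --class ai.chronon.spark.scripts.ObservabilityDemo \
  /opt/chronon/jars/spark-assembly-0.1.0-SNAPSHOT.jar
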
🔗 Analysis chain

Verify network configuration for container networking

The current network configuration might not work correctly in all Docker network configurations. The localhost and 0.0.0.0 bindings need verification.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if custom Docker networks are used
docker network ls --format "{{.Name}}" | grep -q "spark" && {
  echo "Custom Spark network exists. Current localhost binding might not work."
}

# Check container's network mode
docker inspect spark-app --format '{{.HostConfig.NetworkMode}}'

Length of output: 316


Script:

#!/bin/bash
# Let's analyze the Docker and Spark configuration in the codebase
# Search for Docker network configurations
rg -l "docker.*network" 

# Search for Spark network configurations
rg "spark\.(driver|executor)\.(host|bindAddress)" -A 2

# Look for Docker Compose files that might define networks
fd -e yml -e yaml | xargs grep -l "networks:"

# Check if there are any other Spark configuration files
fd -e conf -e properties | grep -i spark

Length of output: 922


Script:

#!/bin/bash
# Let's check the network configuration in compose file and other Spark settings
cat docker-init/compose.yaml

# Check Spark default configurations
cat quickstart/conf/spark-defaults.conf

# Check the SparkSessionBuilder implementation for network settings
cat spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Length of output: 11114

--class ai.chronon.spark.scripts.ObservabilityDemo \

Collaborator:
are we running the in-memory kvstore on the spark driver? curious how that'd work in a cluster-mode setup or is that out of scope for the demo?

Contributor Author:
yeah basically only for the demo. we don't plan to launch this.

/opt/chronon/jars/spark-assembly-0.1.0-SNAPSHOT.jar
2 changes: 1 addition & 1 deletion docker-init/generate_anomalous_data.py
@@ -5,7 +5,7 @@
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, TimestampType, BooleanType

# Initialize Spark session
spark = SparkSession.builder.appName("FraudClassificationSchema").getOrCreate()
spark = SparkSession.builder.appName("FraudClassificationSchema").config("spark.log.level", "WARN").getOrCreate()

def time_to_value(t, base_value, amplitude, noise_level, scale=1):
if scale is None:
32 changes: 26 additions & 6 deletions docker-init/start.sh
@@ -1,7 +1,13 @@
#!/bin/bash

Comment on lines 1 to +2
Contributor:
⚠️ Potential issue

Add environment variable verification

The script uses several environment variables (SPARK_JAR, CLOUD_AWS_JAR, ONLINE_CLASS) but doesn't verify their presence at the start.

Add this at the beginning of the script:

# Verify required environment variables
for var in SPARK_JAR CLOUD_AWS_JAR ONLINE_CLASS; do
  if [[ -z "${!var}" ]]; then
    echo "Error: Required environment variable $var is not set" >&2
    exit 1
  fi
done

start_time=$(date +%s)
if ! python3.8 generate_anomalous_data.py; then
echo "Error: Failed to generate anomalous data" >&2
exit 1
else
end_time=$(date +%s)
elapsed_time=$((end_time - start_time))
echo "Anomalous data generated successfully! Took $elapsed_time seconds."
fi


@@ -11,18 +17,27 @@ if [[ ! -f $SPARK_JAR ]] || [[ ! -f $CLOUD_AWS_JAR ]]; then
exit 1
fi

# Load up summary data into DynamoDB
echo "Loading Summary.."
#check if log file exists
if [[ ! -f log4j.properties ]]; then
echo "Error: log4j.properties file not found" >&2
exit 1
else
echo "log4j.properties file found!"
fi

# Load up metadata into DynamoDB
echo "Loading metadata.."
if ! java -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver metadata-upload --conf-path=/chronon_sample/production/ --online-jar=$CLOUD_AWS_JAR --online-class=$ONLINE_CLASS; then
if ! java -Dlog4j.configurationFile=log4j.properties -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver metadata-upload --conf-path=/chronon_sample/production/ --online-jar=$CLOUD_AWS_JAR --online-class=$ONLINE_CLASS; then
echo "Error: Failed to load metadata into DynamoDB" >&2
exit 1
fi
echo "Metadata load completed successfully!"

# Initialize DynamoDB
echo "Initializing DynamoDB Table .."
if ! output=$(java -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver create-summary-dataset \
if ! output=$(java -Dlog4j.configurationFile=log4j.properties -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver create-summary-dataset \
--online-jar=$CLOUD_AWS_JAR \
--online-class=$ONLINE_CLASS 2>&1); then
echo "Error: Failed to bring up DynamoDB table" >&2
@@ -32,9 +47,11 @@ fi
echo "DynamoDB Table created successfully!"


# Load up summary data into DynamoDB
echo "Loading Summary.."
if ! java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED \
start_time=$(date +%s)

if ! java -Dlog4j.configurationFile=log4j.properties \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
-cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver summarize-and-upload \
--online-jar=$CLOUD_AWS_JAR \
--online-class=$ONLINE_CLASS \
@@ -43,8 +60,11 @@ if ! java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun
--time-column=transaction_time; then
echo "Error: Failed to load summary data into DynamoDB" >&2
exit 1
else
end_time=$(date +%s)
elapsed_time=$((end_time - start_time))
echo "Summary load completed successfully! Took $elapsed_time seconds."
fi
echo "Summary load completed successfully!"

# Add these java options as without them we hit the below error:
# throws java.lang.ClassFormatError accessible: module java.base does not "opens java.lang" to unnamed module @36328710
@@ -74,7 +74,7 @@ class DriftStore(kvStore: KVStore,
def getSummaries(joinConf: api.Join,
startMs: Option[Long],
endMs: Option[Long],
columnPrefix: Option[String] = None): Future[Seq[TileSummaryInfo]] = {
columnPrefix: Option[String]): Future[Seq[TileSummaryInfo]] = {

val serializer: TSerializer = compactSerializer
val tileKeyMap = tileKeysForJoin(joinConf, columnPrefix)
4 changes: 3 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -147,7 +147,9 @@ object Driver {

protected def buildSparkSession(): SparkSession = {
if (localTableMapping.nonEmpty) {
val localSession = SparkSessionBuilder.build(subcommandName(), local = true, localWarehouseLocation.toOption)
val localSession = SparkSessionBuilder.build(subcommandName(),
local = true,
localWarehouseLocation = localWarehouseLocation.toOption)
localTableMapping.foreach {
case (table, filePath) =>
val file = new File(filePath)
@@ -34,6 +34,7 @@ object SparkSessionBuilder {
// we would want to share locally generated warehouse during CI testing
def build(name: String,
local: Boolean = false,
hiveSupport: Boolean = true,
localWarehouseLocation: Option[String] = None,
additionalConfig: Option[Map[String, String]] = None,
enforceKryoSerializer: Boolean = true): SparkSession = {
Expand All @@ -44,7 +45,10 @@ object SparkSessionBuilder {
var baseBuilder = SparkSession
.builder()
.appName(name)
.enableHiveSupport()

if (hiveSupport) baseBuilder = baseBuilder.enableHiveSupport()

Contributor:
is this needed as part of this PR? I don't see it in use - can we drop for a follow up?


baseBuilder = baseBuilder
.config("spark.sql.session.timeZone", "UTC")
//otherwise overwrite will delete ALL partitions, not just the ones it touches
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
7 changes: 4 additions & 3 deletions spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -76,7 +76,7 @@ case class TableUtils(sparkSession: SparkSession) {
sparkSession.conf.get("spark.chronon.backfill.small_mode_cutoff", "5000").toInt
val backfillValidationEnforced: Boolean =
sparkSession.conf.get("spark.chronon.backfill.validation.enabled", "true").toBoolean
// Threshold to control whether or not to use bloomfilter on join backfill. If the backfill row approximate count is under this threshold, we will use bloomfilter.
// Threshold to control whether to use bloomfilter on join backfill. If the backfill row approximate count is under this threshold, we will use bloomfilter.
// default threshold is 100K rows
val bloomFilterThreshold: Long =
sparkSession.conf.get("spark.chronon.backfill.bloomfilter.threshold", "1000000").toLong
@@ -327,9 +327,9 @@ case class TableUtils(sparkSession: SparkSession) {
sql(creationSql)
} catch {
case _: TableAlreadyExistsException =>
logger.info(s"Table $tableName already exists, skipping creation")
println(s"Table $tableName already exists, skipping creation")
case e: Exception =>
logger.error(s"Failed to create table $tableName", e)
println(s"Failed to create table $tableName", e)

Contributor:
revert

throw e
}

Contributor:
⚠️ Potential issue

Revert println statements back to logger calls

Using println for logging is not recommended in production code as it:

  1. Bypasses the logging framework's configuration
  2. Makes it harder to manage and redirect logs
  3. Reduces observability in production environments
-          println(s"Table $tableName already exists, skipping creation")
+          logger.info(s"Table $tableName already exists, skipping creation")
-          println(s"Failed to create table $tableName", e)
+          logger.error(s"Failed to create table $tableName", e)

Committable suggestion skipped: line range outside the PR's diff.

}
@@ -357,6 +357,7 @@ case class TableUtils(sparkSession: SparkSession) {
// so that an exception will be thrown below
dfRearranged
}
println(s"Repartitioning and writing into table $tableName".yellow)

Contributor:
revert

repartitionAndWrite(finalizedDf, tableName, saveMode, stats, sortByCols)

Contributor:
⚠️ Potential issue

Use logger instead of println for repartitioning status

For critical operations like repartitioning and writing data, it's important to maintain proper logging through the logging framework rather than using println.

-    println(s"Repartitioning and writing into table $tableName".yellow)
+    logger.info(s"Repartitioning and writing into table $tableName")

Committable suggestion skipped: line range outside the PR's diff.

}

154 changes: 154 additions & 0 deletions spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala
@@ -0,0 +1,154 @@
package ai.chronon.spark.scripts

import ai.chronon
import ai.chronon.api.ColorPrinter.ColorString
import ai.chronon.api.Constants
import ai.chronon.api.DriftMetric
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.api.PartitionSpec
import ai.chronon.api.TileDriftSeries
import ai.chronon.api.TileSummarySeries
import ai.chronon.api.Window
import ai.chronon.online.KVStore
import ai.chronon.online.stats.DriftStore
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import ai.chronon.spark.stats.drift.Summarizer
import ai.chronon.spark.stats.drift.SummaryUploader
import ai.chronon.spark.stats.drift.scripts.PrepareData
import ai.chronon.spark.utils.InMemoryKvStore
import ai.chronon.spark.utils.MockApi

import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.ScalaJavaConversions.IteratorOps

object ObservabilityDemo {

def Time(message: String)(block: => Unit): Unit = {

Contributor:
🛠️ Refactor suggestion

Rename Time method to follow Scala naming conventions.

Method names in Scala should start with lowercase letters.

-  def Time(message: String)(block: => Unit): Unit = {
+  def time(message: String)(block: => Unit): Unit = {

println(s"$message..".yellow)

Collaborator:
nit: change to log message.

val start = System.currentTimeMillis()
block

Collaborator:
do we need this to be threadsafe or no

Contributor Author:
not for this demo i think

val end = System.currentTimeMillis()
println(s"$message took ${end - start} ms".green)

Collaborator:
nit: change to log

Contributor Author:
done

}

def main(args: Array[String]): Unit = {

val startDs = "2023-01-01"
val endDs = "2023-02-30"
val rowCount = 700000
val namespace = "observability_demo"
val spark = SparkSessionBuilder.build(namespace, local = true)
implicit val tableUtils: TableUtils = TableUtils(spark)
tableUtils.createDatabase(namespace)

// generate anomalous data (join output)
val prepareData = PrepareData(namespace)
val join = prepareData.generateAnomalousFraudJoin

Time("Preparing data") {
val df = prepareData.generateFraudSampleData(rowCount, startDs, endDs, join.metaData.loggedTable)
df.show(10, truncate = false)
}

Time("Summarizing data") {
// compute summary table and packed table (for uploading)
Summarizer.compute(join.metaData, ds = endDs, useLogs = true)
}

val packedTable = join.metaData.packedSummaryTable
// mock api impl for online fetching and uploading
val kvStoreFunc: () => KVStore = () => {
// cannot reuse the variable - or serialization error
val result = InMemoryKvStore.build(namespace, () => null)
result
}
val api = new MockApi(kvStoreFunc, namespace)

// create necessary tables in kvstore
val kvStore = api.genKvStore
kvStore.create(Constants.MetadataDataset)
kvStore.create(Constants.TiledSummaryDataset)

// upload join conf
api.buildFetcher().putJoinConf(join)

Time("Uploading summaries") {
val uploader = new SummaryUploader(tableUtils.loadTable(packedTable), api)
uploader.run()
}

// test drift store methods
val driftStore = new DriftStore(api.genKvStore)

// TODO: Wire up drift store into hub and create an endpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm wiring up to kick off play from within the JVM process might end up being painful (it's typically triggered via the command line to launch play.core.server.ProdServerStart with the appropriate params / jars etc)
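
For reference, a packaged Play app is typically launched from the command line roughly like this (an illustrative sketch — the port, config path, and lib/* classpath are assumptions, not taken from this repo):

# illustrative: actual classpath and JVM flags depend on how the app is packaged
java -Dhttp.port=9000 -Dconfig.file=conf/application.conf \
  -cp "lib/*" \
  play.core.server.ProdServerStart
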


// fetch keys
val tileKeys = driftStore.tileKeysForJoin(join)
val tileKeysSimple = tileKeys.mapValues(_.map(_.column).toSeq)
tileKeysSimple.foreach { case (k, v) => println(s"$k -> [${v.mkString(", ")}]") }

// fetch summaries
val startMs = PartitionSpec.daily.epochMillis(startDs)
val endMs = PartitionSpec.daily.epochMillis(endDs)
val summariesFuture = driftStore.getSummaries(join, Some(startMs), Some(endMs), None)
val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))

Contributor:
🛠️ Refactor suggestion

Make timeout values configurable.

Multiple operations use hardcoded 10-second timeouts. These should be configurable to accommodate different environments and network conditions.

+    val defaultTimeout = Duration.create(
+      config.timeout.getOrElse(10),
+      TimeUnit.SECONDS
+    )
+
-    val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
+    val summaries = Await.result(summariesFuture, defaultTimeout)

-    driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    driftSeries = Await.result(driftSeriesFuture.get, defaultTimeout)

-    summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    summarySeries = Await.result(summarySeriesFuture.get, defaultTimeout)

Add timeout configuration to the Conf class:

val timeout: ScallopOption[Int] = opt[Int](
  name = "timeout",
  default = Some(10),
  descr = "Timeout in seconds for async operations"
)

Also applies to: 145-145, 169-169

println(summaries)

var driftSeries: Seq[TileDriftSeries] = null
// fetch drift series
Comment on lines +135 to +136
Contributor:
🛠️ Refactor suggestion

Replace mutable state with immutable variables.

Use val instead of var to prevent accidental modifications and improve code clarity.

-    var driftSeries: Seq[TileDriftSeries] = null
+    val driftSeries: Seq[TileDriftSeries] = time("Fetching drift series") {
+      val driftSeriesFuture = driftStore.getDriftSeries(
+        // ... existing parameters ...
+      )
+      Await.result(driftSeriesFuture.get, defaultTimeout)
+    }

-    var summarySeries: Seq[TileSummarySeries] = null
+    val summarySeries: Seq[TileSummarySeries] = time("Fetching summary series") {
+      // ... move the fetching logic here ...
+    }

Also applies to: 161-161

Time("Fetching drift series") {
val driftSeriesFuture = driftStore.getDriftSeries(
join.metaData.nameToFilePath,
DriftMetric.JENSEN_SHANNON,
lookBack = new Window(7, chronon.api.TimeUnit.DAYS),
startMs,
endMs
)
driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
}

val (nulls, totals) = driftSeries.iterator.foldLeft(0 -> 0) {
case ((nulls, total), s) =>
val currentNulls = s.getPercentileDriftSeries.iterator().toScala.count(_ == null)
val currentCount = s.getPercentileDriftSeries.size()
(nulls + currentNulls, total + currentCount)
}

println(s"""drift totals: $totals
|drift nulls: $nulls
|""".stripMargin.red)

println("Drift series fetched successfully".green)

var summarySeries: Seq[TileSummarySeries] = null

Time("Fetching summary series") {
val summarySeriesFuture = driftStore.getSummarySeries(
join.metaData.nameToFilePath,
startMs,
endMs
)
summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
}

val (summaryNulls, summaryTotals) = summarySeries.iterator.foldLeft(0 -> 0) {
case ((nulls, total), s) =>
if (s.getPercentiles == null) {
(nulls + 1) -> (total + 1)
} else {
val currentNulls = s.getPercentiles.iterator().toScala.count(_ == null)
val currentCount = s.getPercentiles.size()
(nulls + currentNulls, total + currentCount)
}
}
println(s"""summary ptile totals: $summaryTotals
|summary ptile nulls: $summaryNulls
|""".stripMargin)

println("Summary series fetched successfully".green)
}

}