
Conversation

@nikhil-zlai
Contributor

@nikhil-zlai nikhil-zlai commented Nov 24, 2024

Summary

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Enhanced logging configuration for Spark sessions to reduce verbosity.
    • Improved timing and error handling in the data generation script.
    • New method introduced for alternative streaming data handling in OnlineUtils.
    • Added a demonstration object for observability features in Spark applications.
    • New configuration file for structured logging setup.
  • Bug Fixes

    • Adjusted method signatures to ensure clarity and correct parameter usage in various classes.
  • Documentation

    • Updated import statements to reflect package restructuring for better organization.
    • Added instructions for building and executing the project in the README.
  • Tests

    • Integrated MockApi into various test classes to enhance testing capabilities and simulate API interactions.
    • Enhanced test coverage by utilizing the MockApi for more robust testing scenarios.

@coderabbitai
Contributor

coderabbitai bot commented Nov 24, 2024

Walkthrough

The pull request introduces several changes across multiple files, primarily focused on enhancing logging configurations, improving error handling, and restructuring package declarations. Key modifications include adding a logging level configuration for Spark sessions, introducing timing mechanisms in scripts, and updating method signatures to require explicit parameters. Additionally, several classes and methods have undergone package restructuring for better organization. The changes do not alter the core functionality but aim to improve clarity, robustness, and testing capabilities.

Changes

File Path | Change Summary
docker-init/generate_anomalous_data.py | Added logging level configuration to Spark session initialization.
docker-init/start.sh | Introduced timing mechanism, enhanced error handling for configuration files, and updated success messages to include execution duration.
online/src/main/scala/ai/chronon/online/stats/DriftStore.scala | Modified getSummaries method to require explicit columnPrefix parameter.
spark/src/main/scala/ai/chronon/spark/Driver.scala | Updated buildSparkSession method calls for clarity by explicitly naming parameters.
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala | Added hiveSupport parameter to build method for conditional Hive support.
spark/src/main/scala/ai/chronon/spark/stats/drift/scripts/PrepareData.scala | Refactored package declaration and imports; minor formatting changes for readability.
spark/src/main/scala/ai/chronon/spark/utils/InMemoryKvStore.scala | Changed package declaration without altering functionality.
spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala | Changed package declaration without altering functionality.
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala | Changed package declaration and removed unused import; functionality remains unchanged.
spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala | Added import for MockApi to enhance testing capabilities.
spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala | Added import for MockApi for enhanced testing capabilities.
spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala | Added import for MockApi and updated compareTemporalFetch method to use MockApi for testing fetch operations.
spark/src/test/scala/ai/chronon/spark/test/GroupByUploadTest.scala | Added import for MockApi and expanded listingRatingCategoryJoinSourceTest method for complex testing scenarios.
spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java | Updated SparkSession instantiation to include new parameters; added imports for InMemoryKvStore and MockApi.
spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala | Updated parameter name for clarity in SparkSession instantiation.
spark/src/test/scala/ai/chronon/spark/test/LocalTableExporterTest.scala | Updated parameter name for clarity in SparkSession instantiation.
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala | Added imports for InMemoryKvStore and MockApi; introduced new putStreamingNew method for enhanced streaming data handling.
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala | Added imports for InMemoryKvStore and MockApi to enhance testing capabilities.
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionUtils.scala | Updated runLogSchemaGroupBy method to include mockApi parameter for enhanced functionality.
spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala | Updated buildInMemoryKvStore method to return an instance of InMemoryKvStore.
spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala | Updated import statement for MockApi to reflect package restructuring.
spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala | Updated import statement for MockApi to reflect package restructuring.
spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala | Updated import statements and modified getSummaries method call to include an additional parameter.
docker-init/demo/Dockerfile | Introduced a new setup for an Apache Spark environment with Java 17 using Amazon Corretto, including installation steps and environment variable configurations.
docker-init/demo/README.md | Added instructions for building and executing the project, clarifying the build and execution process for users.
docker-init/demo/build.sh | Added command to execute a Docker build process, facilitating the creation of a Docker image.
docker-init/demo/run.sh | Introduced a shell script for managing a Docker container running a Spark application, including configurations for memory settings and the Spark job execution command.
spark/src/main/scala/ai/chronon/spark/TableUtils.scala | Updated comments for clarity and modified logging practices within the insertPartitions method.
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala | Introduced a new object for observability features in a Spark application, including data preparation, summarization, and uploading results to a key-value store.
spark/src/main/resources/logback.xml | Added a new configuration file for Logback logging, setting up a console appender and defining logging levels.

Possibly related PRs

  • Wire up Play frontend + server in docker setup #24: The changes in this PR involve modifications to the generate_anomalous_data.py file, which is directly related to the main PR's focus on logging configuration in the Spark session initialization.
  • Summary upload #50: This PR introduces a SummaryUploader class that manages the uploading of summary statistics, which may interact with the data generation process mentioned in the main PR.
  • feat: enable slf4j logging to work with logback #76: The addition of logging configuration in this PR complements the logging level configuration change made in the main PR, enhancing the overall logging capabilities of the application.

Suggested reviewers

  • chewy-zlai
  • piyush-zlai

Poem

🐇 In the meadow where bunnies hop,
Changes made, we’ll never stop!
With logs set to WARN, clear and bright,
Our scripts now run with pure delight!
From tests to sessions, all is well,
In code we trust, let’s ring the bell! 🛎️


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (20)
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionUtils.scala (1)

Line range hint 25-36: Good use of dependency injection pattern

The addition of the mockApi parameter makes the dependencies explicit and improves testability. The implementation properly utilizes the MockApi instance for accessing required properties.
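As a rough illustration of the shape this gives the utility, the explicit dependency reads along these lines (a minimal sketch; the parameter names are assumed rather than copied from the actual file):

import ai.chronon.spark.utils.MockApi

// Hypothetical sketch of the reshaped utility: the MockApi dependency is passed in
// by the caller instead of being constructed inside the helper.
object SchemaEvolutionUtils {
  def runLogSchemaGroupBy(mockApi: MockApi, ds: String, backfillStartDate: String): Unit = {
    // the body would read the log table and namespace it needs off mockApi (elided here)
    ???
  }
}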

spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala (2)

Line range hint 49-63: Consider enhancing test coverage with negative scenarios.

While the happy path is well tested, consider adding test cases for:

  • Invalid CSV file format
  • Missing required columns
  • Empty files
  • Files with special characters in names

Would you like me to provide example test cases for these scenarios?
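One such case might look roughly like the following sketch (the LocalDataLoader entry point and its signature are assumptions made for illustration, not taken from the actual class):

import java.nio.file.Files
import scala.util.Try

// Hypothetical sketch: loading an empty CSV should either fail with a clear error
// or yield an empty table, rather than throwing an opaque exception.
val emptyCsv = Files.createTempFile("empty_listing_views", ".csv").toFile
val attempt = Try(LocalDataLoader.loadDataFileAsTable(emptyCsv, spark, "test_db.empty_table")) // assumed API
assert(attempt.isFailure || spark.table("test_db.empty_table").count() == 0)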


Line range hint 65-78: Consider adding validation for column data types.

The test verifies column names and row count but doesn't validate the data types of the columns. Consider adding assertions to verify that the loaded data maintains the expected schema.

Example addition:

val expectedSchema = spark.sql(s"SELECT * FROM $nameSpaceAndTable").schema
assert(expectedSchema("id_listing_view_event").dataType == IntegerType)
assert(expectedSchema("ds").dataType == DateType)
docker-init/start.sh (4)

3-10: Consider using SECONDS bash variable for timing

While the current implementation works correctly, using the built-in SECONDS variable would be more idiomatic in bash:

-start_time=$(date +%s)
+SECONDS=0
 if ! python3.8 generate_anomalous_data.py; then
     echo "Error: Failed to generate anomalous data" >&2
     exit 1
 else
-    end_time=$(date +%s)
-    elapsed_time=$((end_time - start_time))
-    echo "Anomalous data generated successfully! Took $elapsed_time seconds."
+    echo "Anomalous data generated successfully! Took $SECONDS seconds."
 fi

32-35: Add timing measurement for consistency

For consistency with other operations, consider adding timing measurement to the metadata loading step.

 echo "Loading metadata.."
+SECONDS=0
 if ! java -Dlog4j.configurationFile=log4j.properties -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver metadata-upload --conf-path=/chronon_sample/production/ --online-jar=$CLOUD_AWS_JAR --online-class=$ONLINE_CLASS; then
   echo "Error: Failed to load metadata into DynamoDB" >&2
   exit 1
 fi
-echo "Metadata load completed successfully!"
+echo "Metadata load completed successfully! Took $SECONDS seconds."

Line range hint 40-46: Add timing measurement for consistency

For consistency with other operations, consider adding timing measurement to the DynamoDB initialization step.

 echo "Initializing DynamoDB Table .."
+SECONDS=0
 if ! output=$(java -Dlog4j.configurationFile=log4j.properties -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver create-summary-dataset \
   --online-jar=$CLOUD_AWS_JAR \
   --online-class=$ONLINE_CLASS 2>&1); then
   echo "Error: Failed to bring up DynamoDB table" >&2
   echo "Java command output: $output" >&2
   exit 1
 fi
-echo "DynamoDB Table created successfully!"
+echo "DynamoDB Table created successfully! Took $SECONDS seconds."

Line range hint 50-66: Refactor timing implementation and Java options

Several improvements can be made to this section:

  1. Use SECONDS for consistency with earlier suggestion
  2. Java module options are duplicated with the JAVA_OPTS export at the end
  3. Long command line could be more readable
-start_time=$(date +%s)
+SECONDS=0

 if ! java -Dlog4j.configurationFile=log4j.properties \
-  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
-  --add-opens=java.base/sun.security.action=ALL-UNNAMED \
   -cp $SPARK_JAR:$CLASSPATH ai.chronon.spark.Driver summarize-and-upload \
   --online-jar=$CLOUD_AWS_JAR \
   --online-class=$ONLINE_CLASS \
   --parquet-path="$(pwd)/drift_data" \
   --conf-path=/chronon_sample/production/ \
   --time-column=transaction_time; then
   echo "Error: Failed to load summary data into DynamoDB" >&2
   exit 1
 else
-  end_time=$(date +%s)
-  elapsed_time=$((end_time - start_time))
-  echo "Summary load completed successfully! Took $elapsed_time seconds."
+  echo "Summary load completed successfully! Took $SECONDS seconds."
 fi

Consider moving the Java options to a variable at the start of the script:

# At the start of the script
JAVA_MODULE_OPTIONS="--add-opens=java.base/java.lang=ALL-UNNAMED \
  --add-opens=java.base/java.util=ALL-UNNAMED \
  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-opens=java.base/sun.security.action=ALL-UNNAMED"
spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java (1)

45-47: Well-structured test infrastructure enhancement!

The introduction of MockApi and InMemoryKvStore improves test isolation and maintainability. The initialization chain is logically structured:

  1. TableUtils with SparkSession
  2. InMemoryKvStore with TableUtils
  3. MockApi with KvStore
  4. JavaFetcher from MockApi

Consider documenting the test infrastructure setup pattern in the test package's README.md to help other contributors follow this pattern consistently.
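For reference, the same wiring expressed in Scala looks roughly like this; the MockApi constructor shape and the fetcher factory method are assumptions, since only the chain itself is described above:

import ai.chronon.spark.{SparkSessionBuilder, TableUtils}
import ai.chronon.spark.utils.{InMemoryKvStore, MockApi}

// Sketch of the chain: SparkSession -> TableUtils -> InMemoryKvStore -> MockApi -> fetcher.
val spark = SparkSessionBuilder.build("JavaFetcherTest", local = true)
val tableUtils = TableUtils(spark)
val kvStore = InMemoryKvStore.build("JavaFetcherTest", () => tableUtils)
val mockApi = new MockApi(() => kvStore, "java_fetcher_test") // constructor shape assumed
val fetcher = mockApi.buildJavaFetcher()                      // factory method name assumed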

spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala (1)

Line range hint 39-43: Consider improving the configuration and resource management.

Several improvements could enhance the robustness and maintainability of this method:

  1. Extract "StreamingTest" as a constant to avoid duplication
  2. Consider making the local mode configurable for different test environments
  3. Consider caching the TableUtils instance instead of creating a new one in each lambda call

Here's a suggested improvement:

object StreamingTest {
+ private val TestName = "StreamingTest"
+ private val LocalMode = true
+ @volatile private var tableUtilsInstance: TableUtils = _
+
  def buildInMemoryKvStore(): InMemoryKvStore = {
-    InMemoryKvStore.build("StreamingTest",
-                          { () => TableUtils(SparkSessionBuilder.build("StreamingTest", local = true)) })
+    InMemoryKvStore.build(TestName, { () =>
+      if (tableUtilsInstance == null) {
+        synchronized {
+          if (tableUtilsInstance == null) {
+            tableUtilsInstance = TableUtils(SparkSessionBuilder.build(TestName, local = LocalMode))
+          }
+        }
+      }
+      tableUtilsInstance
+    })
  }
}
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1)

48-50: LGTM! Good optimization for resource usage.

The conditional Hive support is well implemented. This change allows for more efficient Spark sessions when Hive support isn't needed, potentially reducing memory footprint and initialization time.

Consider documenting the performance benefits in the project's performance tuning guide, as this parameter can be useful for optimizing resource usage in scenarios where Hive support is unnecessary.
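Given the build signature quoted later in this review, opting out of Hive support is a single named argument at the call site, for example:

import ai.chronon.spark.SparkSessionBuilder

// Lightweight local session without Hive support; all other parameters keep their defaults.
val session = SparkSessionBuilder.build("drift-demo", local = true, hiveSupport = false)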

spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1)

Line range hint 33-186: Consider improving test maintainability.

While the test is comprehensive and well-structured, consider these improvements:

  1. Extract the timeout duration to a constant
  2. Break down the long test method into smaller, focused test methods
  3. Use named constants for test data ranges instead of magic numbers

Example refactor for the timeout:

+ private val FetchTimeout = Duration(10, SECONDS)
  
  val responsesF = fetcher.fetchJoin(requests)
- val responses = Await.result(responsesF, Duration(10, SECONDS))
+ val responses = Await.result(responsesF, FetchTimeout)
online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (1)

Line range hint 119-123: Improve error handling and add instrumentation.

The current error handling approach has a few issues:

  1. Using printStackTrace is not suitable for production as it:

    • Writes directly to stderr
    • Doesn't integrate with the logging system
    • Makes it difficult to track and monitor errors
  2. The TODO comment indicates missing instrumentation for failures

Consider replacing with proper logging and metrics:

- // TODO instrument failures
- case Failure(exception) => exception.printStackTrace(); None
+ case Failure(exception) =>
+   logger.error("Failed to process drift summary response", exception)
+   metrics.incrementCounter("drift.summary.failures")
+   None
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2)

Line range hint 201-203: Technical debt: Investigate and document test harness quirk

The TODO comment indicates uncertainty about the dropDsOnWrite parameter's purpose. This technical debt should be addressed by:

  1. Investigating why this parameter is necessary
  2. Documenting the findings
  3. Refactoring the test harness to remove the quirk if possible

Would you like me to help create a GitHub issue to track this investigation?


Line range hint 146-149: Improve async operation handling

The current implementation uses a fixed Thread.sleep(5000) to handle async operations, which is fragile and could lead to flaky tests. Consider:

  1. Implementing a proper async wait mechanism with timeout
  2. Adding error handling for async operations
  3. Using a more reliable approach like polling or callbacks

Example improvement:

def awaitAsyncCompletion(maxWaitMs: Long = 5000, pollIntervalMs: Long = 100): Unit = {
  val endTime = System.currentTimeMillis() + maxWaitMs
  while (System.currentTimeMillis() < endTime && !isAsyncWorkComplete) {
    Thread.sleep(pollIntervalMs)
  }
  if (!isAsyncWorkComplete) {
    throw new TimeoutException(s"Async operations did not complete within ${maxWaitMs}ms")
  }
}
spark/src/main/scala/ai/chronon/spark/stats/drift/scripts/PrepareData.scala (6)

163-167: Add documentation for the fraud patterns being generated.

Consider adding scaladoc comments explaining:

  • The types of fraud patterns this generates
  • The rationale behind the chosen window sizes
  • Expected usage scenarios

163-167: Add parameter validation for numerical inputs.

Consider adding validation for:

  • baseValue should be non-negative
  • amplitude should be non-negative
  • noiseLevel should be non-negative
 def timeToValue(t: LocalTime,
                 baseValue: Double,
                 amplitude: Double,
                 noiseLevel: Double,
                 scale: Double = 1.0): java.lang.Double = {
+  require(baseValue >= 0, "baseValue must be non-negative")
+  require(amplitude >= 0, "amplitude must be non-negative")
+  require(noiseLevel >= 0, "noiseLevel must be non-negative")
   if (scale == 0) null
   else {

Line range hint 345-354: Improve progress reporting frequency.

The current implementation logs every 100,000 rows, which might be too infrequent for smaller datasets. Consider:

  1. Making the logging frequency configurable
  2. Using a percentage-based approach
-      if (i % 100000 == 0) {
+      if (i % Math.max(numSamples / 10, 1000) == 0) {
-        println(s"Generated $i/$numSamples rows of data.")
+        println(s"Generated $i/$numSamples rows of data (${(i.toDouble/numSamples*100).toInt}%)")
       }

379-389: Consider batch processing for better memory management.

The current implementation builds the entire dataset in memory. For large datasets, consider:

  1. Processing in batches
  2. Using Spark's RDD API more effectively (a sketch follows below)
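A minimal sketch of the batched variant, assuming a row-generating helper and an append-mode output table (all names below are placeholders, not the ones in PrepareData):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Generate and persist rows in fixed-size batches instead of materializing the full
// dataset in driver memory. `generateRow`, `schema` and `outputTable` are assumed.
def writeInBatches(spark: SparkSession,
                   numSamples: Int,
                   batchSize: Int,
                   schema: StructType,
                   generateRow: Int => Row,
                   outputTable: String): Unit = {
  (0 until numSamples).grouped(batchSize).foreach { batch =>
    val rows = batch.map(generateRow)
    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
    df.write.mode("append").saveAsTable(outputTable)
  }
}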

Line range hint 391-433: Externalize configuration values.

Consider moving hardcoded values to configuration:

  • Country lists
  • Language codes
  • Random value ranges
This would make the data generation more flexible and maintainable; a sketch follows below.
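A small sketch of what that could look like, with the value pools pulled into a plain configuration case class (the values shown are placeholders, not the ones currently hardcoded in the script):

// Illustrative configuration holder for the generator.
case class PrepareDataConfig(
    countries: Seq[String] = Seq("US", "CA", "GB", "DE", "IN"),
    languages: Seq[String] = Seq("en", "fr", "de", "hi"),
    amountRange: (Double, Double) = (1.0, 1000.0)
)

// The generator would then take the config instead of hardcoding literals:
// def generateRow(cfg: PrepareDataConfig, rng: scala.util.Random): Row = ...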

477-477: Enhance path handling robustness.

Consider adding:

  1. Path normalization
  2. Directory creation if needed
  3. File existence checks
  4. Platform-specific path separator handling (see the sketch below)
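A defensive version using only the standard library might look like this (`rawPath` stands in for whatever path the script receives and is assumed to be in scope):

import java.nio.file.{Files, Paths}

// Normalize the supplied path, create the parent directory if needed,
// and fail fast if the target is unusable.
val normalized = Paths.get(rawPath).toAbsolutePath.normalize()
Option(normalized.getParent).foreach(parent => Files.createDirectories(parent))
require(!Files.isDirectory(normalized), s"$normalized is a directory, expected a file path")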
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 37960d0 and 672a94e.

📒 Files selected for processing (23)
  • docker-init/generate_anomalous_data.py (1 hunks)
  • docker-init/start.sh (4 hunks)
  • online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/drift/scripts/PrepareData.scala (15 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/InMemoryKvStore.scala (2 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/GroupByUploadTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/LocalTableExporterTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionUtils.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (2 hunks)
✅ Files skipped from review due to trivial changes (8)
  • docker-init/generate_anomalous_data.py
  • spark/src/main/scala/ai/chronon/spark/utils/InMemoryKvStore.scala
  • spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala
  • spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala
  • spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala
  • spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala
🔇 Additional comments (21)
spark/src/test/scala/ai/chronon/spark/test/GroupByUploadTest.scala (1)

28-28: Import Statement Added Appropriately

The addition of import ai.chronon.spark.utils.MockApi is appropriate and necessary for utilizing MockApi within the GroupByUploadTest class. This import allows for effective simulation of API interactions in your tests, enhancing test coverage and reliability.

spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionUtils.scala (2)

22-22: LGTM: Import statement is correctly structured

The import statement for MockApi follows proper Scala conventions and aligns with the package restructuring.


Line range hint 25-36: Verify test coverage and caller updates

Since this is a test utility method with a signature change, we should verify that all callers have been updated and tests are passing.

✅ Verification successful

Let me analyze the search results. I can see that runLogSchemaGroupBy is called in four test files:

  1. LogBootstrapTest.scala
  2. DerivationTest.scala
  3. SchemaEvolutionTest.scala
  4. FetcherTest.scala

Let's verify if all these calls are using the correct number of arguments by checking their implementations:


All callers have been updated with the correct parameters

All four callers of runLogSchemaGroupBy are using the method with three parameters (mockApi, date string, backfill date string), which matches the method signature in SchemaEvolutionUtils. The test coverage appears complete as all usages have been properly updated.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all usages of runLogSchemaGroupBy to ensure they've been updated
# with the new parameter

echo "Searching for method calls..."
rg -t scala "runLogSchemaGroupBy\(" --no-heading

# Look for potential test files that might use this utility
echo -e "\nSearching for related test files..."
fd -e scala -t f "Test.scala$" spark/src/test

Length of output: 3446


Script:

#!/bin/bash
# Check the implementation of each caller
echo "LogBootstrapTest.scala implementation:"
rg -A 2 -B 2 "runLogSchemaGroupBy" spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala

echo -e "\nDerivationTest.scala implementation:"
rg -A 2 -B 2 "runLogSchemaGroupBy" spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala

echo -e "\nSchemaEvolutionTest.scala implementation:"
rg -A 2 -B 2 "runLogSchemaGroupBy" spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala

echo -e "\nFetcherTest.scala implementation:"
rg -A 2 -B 2 "runLogSchemaGroupBy" spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala

Length of output: 2044

spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala (1)

38-38: LGTM! Verify consistent parameter naming across the codebase.

The explicit parameter naming improves code readability. Let's verify this naming convention is consistently applied across other SparkSessionBuilder usages.

✅ Verification successful

Parameter naming is consistently applied across the codebase

The search results show that all usages of SparkSessionBuilder.build across the codebase consistently use explicit parameter naming, including:

  • local = true
  • localWarehouseLocation = Some(...)
  • enforceKryoSerializer = ...
  • additionalConfig = Some(...)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for other SparkSessionBuilder.build calls to ensure consistent parameter naming

# Search for SparkSessionBuilder.build calls
rg -t scala "SparkSessionBuilder\.build" -A 3

Length of output: 20476

spark/src/test/scala/ai/chronon/spark/test/JavaFetcherTest.java (2)

25-26: LGTM! Clean import additions.

The new imports for test utilities are properly organized and necessary for the enhanced testing infrastructure.


44-44: Verify SparkSessionBuilder usage across test files.

The SparkSessionBuilder.build() method signature has been updated with an additional boolean parameter. Let's ensure all test files are updated consistently.

✅ Verification successful

Let me analyze the SparkSessionBuilder.build usage across the codebase. I notice there are different method signatures being used:

  1. Two parameters: build(namespace, local = true)
  2. Three parameters: build(namespace, local = true, localWarehouseLocation = Some(path))
  3. Six parameters (in JavaFetcherTest): build(namespace, true, true, scala.Option.apply(null), scala.Option.apply(null), true)

Let's verify the actual method signatures in SparkSessionBuilder to understand if this is a valid overload:


Let's try a different approach to find the SparkSessionBuilder definition:


Let's see the complete method signature to understand all parameters:


The SparkSessionBuilder.build usage in JavaFetcherTest is correct

The method signature in SparkSessionBuilder matches exactly with how it's being used in JavaFetcherTest:

def build(name: String,
          local: Boolean = false,
          hiveSupport: Boolean = true,
          localWarehouseLocation: Option[String] = None,
          additionalConfig: Option[Map[String, String]] = None,
          enforceKryoSerializer: Boolean = true)

The Java code correctly calls this method with all six parameters:

SparkSessionBuilder.build(namespace, true, true, scala.Option.apply(null), scala.Option.apply(null), true);

All other usages in the codebase are also correct as they rely on the default parameter values. No issues found.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for all SparkSessionBuilder.build calls to verify consistent parameter usage
rg -t scala -t java "SparkSessionBuilder\.build\(" -A 1

Length of output: 11473


Script:

#!/bin/bash
# Find SparkSessionBuilder class definition and its build methods
ast-grep --pattern 'object SparkSessionBuilder {
  $$$
  def build($$$) {
    $$$
  }
  $$$
}'

Length of output: 96


Script:

#!/bin/bash
# Search for SparkSessionBuilder definition using ripgrep
rg -t scala "object SparkSessionBuilder" -A 10

Length of output: 1300


Script:

#!/bin/bash
# Get the complete build method signature
rg -t scala "def build" -A 10 spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Length of output: 1165

spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala (2)

30-30: LGTM! Package restructuring looks good.

The import statement correctly reflects the movement of InMemoryKvStore from the test package to utils, suggesting its broader utility across the codebase.


Line range hint 114-117: Address potential resource management and configuration issues.

Several concerns in the test implementation:

  1. Multiple instances of InMemoryKvStore are created which could lead to resource leaks
  2. The timeout value (10000ms) is hardcoded
  3. Potential concurrency issues when using multiple store instances

Let's verify the usage of InMemoryKvStore across tests:

Consider these improvements:

- val inMemoryKvStore = buildInMemoryKvStore()
- val metadataStore = new MetadataStore(inMemoryKvStore, timeoutMillis = 10000)
+ private val DefaultTimeout = 10000
+ val store = buildInMemoryKvStore()
+ try {
+   val metadataStore = new MetadataStore(store, timeoutMillis = DefaultTimeout)
+   // ... existing code ...
+ } finally {
+   // Add cleanup if store implements AutoCloseable
+   store.close()
+ }
spark/src/test/scala/ai/chronon/spark/test/LocalTableExporterTest.scala (1)

44-44: LGTM! Parameter name change improves clarity.

The change from local to localWarehouseLocation makes the parameter's purpose more explicit and self-documenting.

Let's verify that this parameter naming is consistent across the codebase:

✅ Verification successful

Parameter localWarehouseLocation is only used in LocalTableExporterTest while other tests use only local

The search results show that:

  • All test files consistently use local = true
  • Only LocalTableExporterTest uses the additional localWarehouseLocation parameter
  • This suggests that localWarehouseLocation is a new parameter specific to table export functionality

Let's verify the SparkSessionBuilder implementation to understand if this is intentional:


Parameter usage is correct and consistent with the implementation

The codebase analysis reveals that:

  • SparkSessionBuilder.build method correctly defines both local and localWarehouseLocation as optional parameters
  • localWarehouseLocation is specifically used for customizing the warehouse directory path
  • LocalTableExporterTest is the only test that needs to specify a custom warehouse location, which makes sense as it deals with table exports
  • Other tests can safely use the default warehouse location by omitting this parameter
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining uses of the old parameter name
# and verify consistent usage of the new parameter name

# Search for SparkSessionBuilder.build calls
rg -t scala "SparkSessionBuilder\.build.*local\s*=" --no-filename

# Search for the new parameter usage
rg -t scala "SparkSessionBuilder\.build.*localWarehouseLocation\s*=" --no-filename

Length of output: 3415


Script:

#!/bin/bash
# Find SparkSessionBuilder definition and implementation
ast-grep --pattern 'object SparkSessionBuilder {
  $$$
  def build($$$) {
    $$$
  }
  $$$
}'

Length of output: 96


Script:

#!/bin/bash
# Try to find SparkSessionBuilder implementation using ripgrep
rg -t scala "object SparkSessionBuilder" -A 20

Length of output: 2407

spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala (2)

16-18: LGTM: Import paths correctly updated

The import statements have been properly updated to reflect the package restructuring.


92-92: Verify all getSummaries calls are updated

The getSummaries method signature has been updated to require an explicit columnPrefix parameter. While this instance has been updated correctly with None, we should verify other usages.

✅ Verification successful

All getSummaries calls are properly updated

Based on the verification results, all calls to getSummaries are correctly updated with the columnPrefix parameter:

  • In DriftTest.scala: Uses None as shown in the original change
  • In DriftStore.scala: All calls pass the columnPrefix variable which is properly defined in the scope
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any remaining calls to getSummaries that might need updating
rg -l "getSummaries.*Some.*Some.*[^None]" --type scala

Length of output: 190


Script:

#!/bin/bash
# Let's examine the actual calls to getSummaries in these files
rg "getSummaries.*Some.*Some" --type scala -A 2

Length of output: 1598


Script:

#!/bin/bash
# Let's check the method definition to understand the expected parameters
ast-grep --pattern 'def getSummaries' --type scala

Length of output: 232


Script:

#!/bin/bash
# Let's try to find the method definition using ripgrep instead
rg "def getSummaries.*\(" --type scala -A 2

Length of output: 698


Script:

#!/bin/bash
# Let's see the complete method signature including the columnPrefix parameter
rg "def getSummaries.*\(" --type scala -B 1 -A 4

Length of output: 1379

spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1)

37-37: Add ScalaDoc for the new parameter.

Consider adding parameter documentation to explain the purpose and impact of hiveSupport.

Let's verify the impact of this signature change:

✅ Verification successful

The signature change has no breaking impact on existing code.

All existing calls to SparkSessionBuilder.build() in the codebase use named parameters, and the new hiveSupport parameter has a default value of true. The codebase shows consistent usage patterns:

  • Most calls use local = true with named parameter
  • Some calls include additional named parameters like localWarehouseLocation and enforceKryoSerializer
  • No instances of positional parameter usage found
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing calls to build() that might be affected
rg "SparkSessionBuilder\.build\(" --type scala -A 3

Length of output: 20102

spark/src/test/scala/ai/chronon/spark/test/ExternalSourcesTest.scala (1)

22-22: LGTM! Package restructuring looks good.

The import change reflects a good architectural decision to move MockApi to the utils package, which is a more appropriate location for shared testing utilities.

Let's verify that the package restructuring was done consistently:

✅ Verification successful

Package restructuring has been consistently applied across the codebase

The verification confirms that:

  • MockApi class is correctly located in ai.chronon.spark.utils package
  • All imports (10 files in total) are consistently using the new package path ai.chronon.spark.utils.MockApi
  • No references to the old package path remain in the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that MockApi is properly moved and there are no lingering references to the old package

# Check if MockApi exists in the new location
fd -e scala "MockApi.scala" -x cat {} | grep -l "package ai.chronon.spark.utils"

# Check for any remaining references to the old package path
rg "ai.chronon.spark.test.MockApi"

Length of output: 134


Script:

#!/bin/bash
# Let's try a different approach to verify the package restructuring

# First, let's find MockApi class in the codebase
rg -l "class MockApi"

# Then, let's find all imports of MockApi
rg -l "import.*MockApi"

# Finally, let's see the actual content of files containing MockApi
fd -e scala "MockApi.scala" -x cat {}

Length of output: 7952


Script:

#!/bin/bash
# Let's check the actual imports in the files to verify they've been updated correctly
rg -A 1 "import.*MockApi" --type scala

# Also check Java files separately as they might have different import syntax
rg -A 1 "import.*MockApi" --type java

Length of output: 2386

online/src/main/scala/ai/chronon/online/stats/DriftStore.scala (1)

77-77: LGTM! Verify all callers are updated.

The removal of the default value for columnPrefix makes the API more explicit, which is a good practice. The internal usage through getSummariesForRange maintains backward compatibility by providing the default.

Let's verify that all callers have been updated to provide the columnPrefix parameter explicitly:

✅ Verification successful

All callers have been updated to provide the columnPrefix parameter explicitly

The verification shows that all callers of getSummaries are providing the columnPrefix parameter explicitly:

  • In DriftStore.scala: All internal calls pass through the columnPrefix parameter correctly
  • In DriftTest.scala: The test explicitly provides None as the parameter
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for direct calls to getSummaries to ensure they provide the columnPrefix parameter
ast-grep --pattern 'getSummaries($$$, $$$, $$$)' 

# Search for any remaining references that might need updating
rg -A 2 'getSummaries\(' 

Length of output: 2460

spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (2)

34-36: LGTM: Well-organized test utility imports

The new imports from ai.chronon.spark.utils package appropriately introduce the necessary test utilities for in-memory operations.


Line range hint 134-134: Clarify deprecation timeline for putStreaming method

The TODO comment indicates that putStreaming should be deprecated, but there's no clear timeline or migration plan. Consider:

  1. Adding a @Deprecated annotation with explanation
  2. Documenting the migration path to putStreamingNew
  3. Setting a timeline for removal (a deprecation sketch follows below)
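A minimal way to express the first two points in code, using Scala's standard deprecation annotation (the version string is a placeholder):

// Marking the old helper steers callers to the replacement at compile time.
@deprecated("Use putStreamingNew instead", "0.1.0")
def putStreaming(/* existing parameters unchanged */): Unit = ???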
spark/src/main/scala/ai/chronon/spark/stats/drift/scripts/PrepareData.scala (1)

Line range hint 1-34: LGTM! Package reorganization improves code organization.

The move from test package to scripts package better reflects the utility nature of this code. The import organization is clean and well-structured.

spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (3)

38-38: LGTM!

The import statement for MockApi is correctly placed and follows the project's import organization pattern.


Line range hint 479-481: LGTM! Good enhancement to test isolation.

The addition of the namespace parameter and MockApi initialization improves test isolation and makes the helper method more flexible.


Line range hint 673-677: LGTM! Well-structured test isolation.

Each test case uses a unique namespace, following testing best practices by ensuring proper isolation between test cases. This prevents potential interference and makes tests more reliable.

Also applies to: 679-684, 686-690

spark/src/main/scala/ai/chronon/spark/Driver.scala (1)

150-152: LGTM! Parameter naming improves code clarity.

The explicit naming of the localWarehouseLocation parameter enhances code readability and maintainability.

Let's verify consistent parameter naming across the codebase:

✅ Verification successful

Let's analyze the SparkSessionBuilder.build method calls across the codebase to verify parameter naming consistency.


Based on the analysis of the codebase, I can now provide the final response:

Parameter naming is consistent with method signature, but some calls could be improved

From the verification results:

  • The SparkSessionBuilder.build method has localWarehouseLocation as an optional named parameter
  • Most test files use minimal parameters (name and local only) which is fine as they use default values
  • Only a few files explicitly use localWarehouseLocation:
    • LocalTableExporterTest.scala
    • LocalDataLoaderTest.scala
    • Driver.scala (the file under review)

The explicit parameter naming in the reviewed change aligns with the method signature and improves readability where the parameter is actually used.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for consistent parameter naming in SparkSessionBuilder.build calls
# Expect: All calls should use named parameters for localWarehouseLocation

# Search for all SparkSessionBuilder.build method calls
ast-grep --pattern 'SparkSessionBuilder.build($$$)'

Length of output: 8701


Script:

#!/bin/bash
# Let's examine the SparkSessionBuilder class to understand the method signature
ast-grep --pattern 'def build($$$)'

Length of output: 37


Script:

#!/bin/bash
# Let's try to find the SparkSessionBuilder class definition and its build method
rg -A 10 "object SparkSessionBuilder"

Length of output: 1291


Script:

#!/bin/bash
# Let's see the complete method signature of build
rg -A 20 "def build\(name: String," spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Length of output: 1028

Comment on lines 1 to +2
#!/bin/bash

Contributor


⚠️ Potential issue

Add environment variable verification

The script uses several environment variables (SPARK_JAR, CLOUD_AWS_JAR, ONLINE_CLASS) but doesn't verify their presence at the start.

Add this at the beginning of the script:

# Verify required environment variables
for var in SPARK_JAR CLOUD_AWS_JAR ONLINE_CLASS; do
  if [[ -z "${!var}" ]]; then
    echo "Error: Required environment variable $var is not set" >&2
    exit 1
  fi
done

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 15

🧹 Outside diff range and nitpick comments (9)
docker-init/demo/Dockerfile (4)

16-18: Document the user ID reference

Add a comment to clarify that user ID 185 refers to the spark user for better maintainability.

 RUN mkdir -p /opt/chronon/jars && \
-    chown -R 185:185 /opt/chronon && \
+    # 185 is the spark user ID in the base image
+    chown -R 185:185 /opt/chronon && \
     chmod 755 /opt/chronon/jars

21-22: Optimize Dockerfile layers

Consider moving these ENV commands to the Java installation RUN block to reduce the number of layers.

 RUN apt-get update && \
     ... && \
-    update-alternatives --set java /usr/lib/jvm/java-17-amazon-corretto/bin/java
+    update-alternatives --set java /usr/lib/jvm/java-17-amazon-corretto/bin/java && \
+    echo "export JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto" >> /etc/environment && \
+    echo "export PATH=\$PATH:\$JAVA_HOME/bin" >> /etc/environment

28-31: Consolidate classpath configurations

The current setup has redundant classpath definitions. Consider consolidating them into a single environment variable.

-ENV SPARK_CLASSPATH="/opt/spark/jars/*"
-ENV SPARK_DIST_CLASSPATH="/opt/spark/jars/*"
-ENV SPARK_EXTRA_CLASSPATH="/opt/spark/jars/*:/opt/chronon/jars/*"
-ENV HADOOP_CLASSPATH="/opt/spark/jars/*"
+ENV SPARK_CLASSPATH="/opt/spark/jars/*:/opt/chronon/jars/*"
+ENV SPARK_DIST_CLASSPATH="${SPARK_CLASSPATH}"
+ENV SPARK_EXTRA_CLASSPATH="${SPARK_CLASSPATH}"
+ENV HADOOP_CLASSPATH="/opt/spark/jars/*"

33-33: Consider using a more appropriate entrypoint

Using tail -f /dev/null is a workaround to keep the container running. Consider implementing a proper entrypoint script that handles signals and container lifecycle properly.

Example entrypoint script:

#!/bin/bash
trap "exit" TERM INT
while true; do
  sleep 1
done
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (3)

29-35: Consider enhancing the Time utility method.

While functional, the timing utility could be improved:

  1. Add try-catch block to handle exceptions in the timed block
  2. Consider using System.nanoTime() for more precise measurements
  3. Make color output configurable for environments where it might not be supported
-  def Time(message: String)(block: => Unit): Unit = {
+  def Time[T](message: String)(block: => T): T = {
     println(s"$message..".yellow)
-    val start = System.currentTimeMillis()
+    val start = System.nanoTime()
     try {
-      block
+      val result = block
+      val end = System.nanoTime()
+      val durationMs = (end - start) / 1_000_000.0
+      println(s"$message took $durationMs ms".green)
+      result
     } catch {
+      case e: Exception =>
+        println(s"$message failed: ${e.getMessage}".red)
+        throw e
     } finally {
-      val end = System.currentTimeMillis()
-      println(s"$message took ${end - start} ms".green)
     }
   }

56-60: Enhance summarization configuration and validation.

The summarization needs improvements:

  1. Make useLogs parameter configurable
  2. Add validation of computation success
   Time("Summarizing data") {
     // compute summary table and packed table (for uploading)
-    Summarizer.compute(join.metaData, ds = endDs, useLogs = true)
+    val useLogs = spark.conf.getBoolean("chronon.summarizer.use_logs", defaultValue = true)
+    val summary = Summarizer.compute(join.metaData, ds = endDs, useLogs = useLogs)
+    require(summary != null, "Summary computation failed")
   }

86-86: Address the TODO comment about drift store integration.

The TODO indicates missing integration work that should be tracked.

Would you like me to create a GitHub issue to track the drift store integration task?

spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2)

79-82: Fix inconsistency between comment and actual threshold value

The comment states "default threshold is 100K rows" but the actual default value in the code is 1000000 (1M). Please update either the comment or the value to maintain consistency.

-  // default threshold is 100K rows
+  // default threshold is 1M rows

Line range hint 1-824: Well-structured table utilities with robust error handling

The overall implementation of TableUtils is well-designed with:

  • Proper error handling for table operations
  • Configurable parameters with sensible defaults
  • Robust schema evolution support
  • Type-safe operations
  • Comprehensive documentation

However, consider adding unit tests for critical paths, especially around:

  • Schema evolution logic
  • Partition management
  • Error handling scenarios
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 672a94e and 54d66b8.

📒 Files selected for processing (6)
  • docker-init/demo/Dockerfile (1 hunks)
  • docker-init/demo/README.md (1 hunks)
  • docker-init/demo/build.sh (1 hunks)
  • docker-init/demo/run.sh (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • docker-init/demo/README.md
🧰 Additional context used
🪛 Shellcheck (0.9.0-1)
docker-init/demo/build.sh

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)

docker-init/demo/run.sh

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)

🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1)

1-26: LGTM! Well-organized imports.

The imports are logically grouped and all appear to be utilized within the implementation.

Comment on lines 2 to 3
docker stop spark-app
docker rm spark-app
Contributor


🛠️ Refactor suggestion

Add error handling for Docker commands

The Docker stop and remove commands should handle cases where the container doesn't exist or can't be stopped.

Replace with this more robust implementation:

-docker stop spark-app
-docker rm spark-app
+if docker ps -a | grep -q spark-app; then
+  docker stop spark-app || echo "Failed to stop container"
+  docker rm spark-app || echo "Failed to remove container"
+fi

Comment on lines 6 to 9
docker run -d \
--name spark-app \
-v $HOME/repos/chronon/spark/target/scala-2.12:/opt/chronon/jars \
obs
Contributor


⚠️ Potential issue

Improve volume mount path handling

The volume mount path assumes a specific directory structure which could cause issues in different environments.

Consider:

  1. Making the path configurable via environment variable
  2. Validating the path exists before mounting
+CHRONON_JAR_PATH="${CHRONON_JAR_PATH:-$HOME/repos/chronon/spark/target/scala-2.12}"
+
+if [ ! -d "$CHRONON_JAR_PATH" ]; then
+  echo "Error: JAR directory not found: $CHRONON_JAR_PATH"
+  exit 1
+fi
+
 docker run -d \
   --name spark-app \
-  -v $HOME/repos/chronon/spark/target/scala-2.12:/opt/chronon/jars \
+  -v "$CHRONON_JAR_PATH":/opt/chronon/jars \
   obs

Comment on lines 360 to 361
println(s"Repartitioning and writing into table $tableName".yellow)
repartitionAndWrite(finalizedDf, tableName, saveMode, stats, sortByCols)
Contributor


⚠️ Potential issue

Use logger instead of println for repartitioning status

For critical operations like repartitioning and writing data, it's important to maintain proper logging through the logging framework rather than using println.

-    println(s"Repartitioning and writing into table $tableName".yellow)
+    logger.info(s"Repartitioning and writing into table $tableName")

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines 330 to 334
println(s"Table $tableName already exists, skipping creation")
case e: Exception =>
logger.error(s"Failed to create table $tableName", e)
println(s"Failed to create table $tableName", e)
throw e
}
Contributor


⚠️ Potential issue

Revert println statements back to logger calls

Using println for logging is not recommended in production code as it:

  1. Bypasses the logging framework's configuration
  2. Makes it harder to manage and redirect logs
  3. Reduces observability in production environments
-          println(s"Table $tableName already exists, skipping creation")
+          logger.info(s"Table $tableName already exists, skipping creation")
-          println(s"Failed to create table $tableName", e)
+          logger.error(s"Failed to create table $tableName", e)

Committable suggestion skipped: line range outside the PR's diff.

Contributor

@piyush-zlai piyush-zlai left a comment


Trying to figure out the best way to wire the InMem store with the appropriate data. I'm thinking it might be worth adding a controller that exposes a bulkPut endpoint you can call from the summary uploader, with the in-mem store as that controller's backend. This way the Spark JVM computes the summaries and throws them over the wall to the Play JVM that hosts the in-mem KV store.
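A very rough sketch of that idea, assuming a Play controller backed by the in-memory store; the endpoint name, request format, the injected store holder, and its putBatch method are all assumptions made for illustration:

import javax.inject.Inject
import play.api.mvc.{AbstractController, ControllerComponents}

// Hypothetical Play endpoint: the summary uploader POSTs serialized key/value pairs,
// and the controller forwards them to the in-memory KV store running in the Play JVM.
class InMemoryKvController @Inject() (cc: ControllerComponents, store: InMemoryKvStoreHolder)
    extends AbstractController(cc) {

  def bulkPut = Action(parse.byteString) { request =>
    store.putBatch(request.body.toArray) // putBatch and its wire format are assumptions
    Ok("ok")
  }
}

A routes entry along the lines of `POST /kv/bulkPut controllers.InMemoryKvController.bulkPut` would also be needed.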

.appName(name)
.enableHiveSupport()

if (hiveSupport) baseBuilder = baseBuilder.enableHiveSupport()
Contributor


Is this needed as part of this PR? I don't see it in use - can we drop it for a follow-up?

println(s"Table $tableName already exists, skipping creation")
case e: Exception =>
logger.error(s"Failed to create table $tableName", e)
println(s"Failed to create table $tableName", e)
Contributor


revert

// so that an exception will be thrown below
dfRearranged
}
println(s"Repartitioning and writing into table $tableName".yellow)
Contributor


revert

// test drift store methods
val driftStore = new DriftStore(api.genKvStore)

// TODO: Wire up drift store into hub and create an endpoint
Contributor

hmm wiring up to kick off play from within the JVM process might end up being painful (it's typically triggered via the command line to launch play.core.server.ProdServerStart with the appropriate params / jars etc)
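For reference, launching Play externally rather than in-process roughly amounts to shelling out to that entry point; a sketch under assumed paths and flags (the classpath, port, and pidfile settings below are not this repo's actual packaging):

```scala
// Illustrative only: starting a packaged Play server as a separate process.
// "hub/lib/*" and the -D properties are placeholder assumptions.
import scala.sys.process._

val launchPlay: ProcessBuilder = Process(
  Seq(
    "java",
    "-cp", "hub/lib/*",             // java expands classpath wildcards itself
    "-Dhttp.port=9000",
    "-Dpidfile.path=/dev/null",     // avoid stale RUNNING_PID files on relaunch
    "play.core.server.ProdServerStart"
  )
)

val exitCode: Int = launchPlay.!    // blocks until the Play process exits
```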

Contributor

@piyush-zlai piyush-zlai left a comment

Aside from log4j.properties the rest is minor clean up stuff. Stamping to unblock for now and I'll build on top of this with the http bulkPut code on my side

--driver-class-path "/opt/spark/jars/*:/opt/chronon/jars/*" \
--conf "spark.driver.host=localhost" \
--conf "spark.driver.bindAddress=0.0.0.0" \
--class ai.chronon.spark.scripts.ObservabilityDemo \
Collaborator

are we running the in-memory kvstore on the spark driver? curious how that'd work in a cluster-mode setup or is that out of scope for the demo?

Contributor Author

yeah basically only for the demo. we don't plan to launch this.

object ObservabilityDemo {

def Time(message: String)(block: => Unit): Unit = {
println(s"$message..".yellow)
Collaborator

nit: change to log message.

def Time(message: String)(block: => Unit): Unit = {
println(s"$message..".yellow)
val start = System.currentTimeMillis()
block
Collaborator

do we need this to be threadsafe or no

Contributor Author

not for this demo i think

val start = System.currentTimeMillis()
block
val end = System.currentTimeMillis()
println(s"$message took ${end - start} ms".green)
Collaborator

nit: change to log

Contributor Author

done

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

🧹 Outside diff range and nitpick comments (4)
docker-init/demo/run.sh (1)

24-33: Consider cluster mode compatibility

Based on the conversation about in-memory kvstore running on the Spark driver, this setup might not work in cluster mode. While it's noted as "only for demo", it would be helpful to document this limitation.

Consider adding a comment in the script:

 # Submit with increased memory
+# Note: This demo uses driver-local kvstore and is not suitable for cluster mode
 docker exec spark-app \
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (3)

132-132: Extract timeout configuration and use consistent duration creation.

Hardcoded timeout values should be configurable and use a consistent creation pattern.

+  private val DefaultTimeout: Duration = Duration(10, TimeUnit.SECONDS)
+
   def main(args: Array[String]): Unit = {
     // ... existing code ...
-    val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
+    val summaries = Await.result(summariesFuture, DefaultTimeout)
     
     // ... in drift series fetching ...
-    driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    driftSeries = Await.result(driftSeriesFuture.get, DefaultTimeout)
     
     // ... in summary series fetching ...
-    summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    summarySeries = Await.result(summarySeriesFuture.get, DefaultTimeout)

Also applies to: 145-145, 169-169


183-185: Use logger consistently instead of println.

Replace println with logger calls for consistent logging throughout the code.

-    println(s"""summary ptile totals: $summaryTotals
-           |summary ptile nulls: $summaryNulls
-           |""".stripMargin)
+    logger.info(s"""summary ptile totals: $summaryTotals
+           |summary ptile nulls: $summaryNulls
+           |""".stripMargin)

121-121: Address the TODO comment about wiring up drift store.

The TODO comment indicates missing functionality that needs to be implemented.

Would you like me to help create a GitHub issue to track the implementation of wiring up the drift store into the hub and creating an endpoint?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 54d66b8 and ccb5a0f.

📒 Files selected for processing (6)
  • docker-init/demo/Dockerfile (1 hunks)
  • docker-init/demo/run.sh (1 hunks)
  • docker-init/start.sh (3 hunks)
  • spark/src/main/resources/logback.xml (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • spark/src/main/resources/logback.xml
🚧 Files skipped from review as they are similar to previous changes (3)
  • docker-init/demo/Dockerfile
  • docker-init/start.sh
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala

# Run new container
docker run -d \
--name spark-app \
-v "$CHRONON_JAR_PATH":/opt/chronon/jars:/opt/chronon/jars \
Contributor

⚠️ Potential issue

Fix duplicate path in volume mount

There's a duplicate path in the volume mount specification which would cause Docker to fail.

-  -v "$CHRONON_JAR_PATH":/opt/chronon/jars:/opt/chronon/jars \
+  -v "$CHRONON_JAR_PATH":/opt/chronon/jars \
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-v "$CHRONON_JAR_PATH":/opt/chronon/jars:/opt/chronon/jars \
-v "$CHRONON_JAR_PATH":/opt/chronon/jars \

Comment on lines +30 to +31
--conf "spark.driver.host=localhost" \
--conf "spark.driver.bindAddress=0.0.0.0" \
Contributor

💡 Codebase verification

Network configuration needs to be updated for Docker Compose environment

The current network configuration in run.sh using localhost and 0.0.0.0 is incompatible with the Docker Compose setup. The compose file shows that:

  • A custom network temporal-network is defined
  • Spark services are running in the compose environment
  • The Spark master is accessible via hostname spark (not localhost)

The Spark configuration should be updated to (see the sketch after this list):

  • Replace spark.driver.host=localhost with spark.driver.host=spark
  • Keep spark.driver.bindAddress=0.0.0.0 as it allows accepting connections from any network interface
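A minimal sketch of the equivalent settings applied when building the session, assuming the Compose service is named `spark` as noted above (the demo itself passes these via `--conf` flags to spark-submit rather than in code):

```scala
// Hedged sketch: driver networking for the Compose setup described above.
// The service name "spark" and the app name are assumptions taken from this discussion.
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
  .builder()
  .appName("observability_demo")
  .config("spark.driver.host", "spark")          // hostname resolvable on temporal-network
  .config("spark.driver.bindAddress", "0.0.0.0") // listen on all interfaces inside the container
  .getOrCreate()
```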
🔗 Analysis chain

Verify network configuration for container networking

The current network configuration might not work correctly in all Docker network configurations. The localhost and 0.0.0.0 bindings need verification.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if custom Docker networks are used
docker network ls --format "{{.Name}}" | grep -q "spark" && {
  echo "Custom Spark network exists. Current localhost binding might not work."
}

# Check container's network mode
docker inspect spark-app --format '{{.HostConfig.NetworkMode}}'

Length of output: 316


Script:

#!/bin/bash
# Let's analyze the Docker and Spark configuration in the codebase
# Search for Docker network configurations
rg -l "docker.*network" 

# Search for Spark network configurations
rg "spark\.(driver|executor)\.(host|bindAddress)" -A 2

# Look for Docker Compose files that might define networks
fd -e yml -e yaml | xargs grep -l "networks:"

# Check if there are any other Spark configuration files
fd -e conf -e properties | grep -i spark

Length of output: 922


Script:

#!/bin/bash
# Let's check the network configuration in compose file and other Spark settings
cat docker-init/compose.yaml

# Check Spark default configurations
cat quickstart/conf/spark-defaults.conf

# Check the SparkSessionBuilder implementation for network settings
cat spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala

Length of output: 11114

object ObservabilityDemo {
@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

def Time(message: String)(block: => Unit): Unit = {
Contributor

🛠️ Refactor suggestion

Rename Time method to follow Scala naming conventions.

Method names in Scala should start with lowercase letters.

-  def Time(message: String)(block: => Unit): Unit = {
+  def time(message: String)(block: => Unit): Unit = {
📝 Committable suggestion


Suggested change
def Time(message: String)(block: => Unit): Unit = {
def time(message: String)(block: => Unit): Unit = {

Comment on lines 34 to 40
def Time(message: String)(block: => Unit): Unit = {
logger.info(s"$message..".yellow)
val start = System.currentTimeMillis()
block
val end = System.currentTimeMillis()
logger.info(s"$message took ${end - start} ms".green)
}
Contributor

🛠️ Refactor suggestion

Improve timing implementation for thread safety and precision.

The current implementation has two issues:

  1. It's not thread-safe as timing operations from multiple threads could interleave.
  2. System.currentTimeMillis() is less precise than System.nanoTime() for duration measurements.
-  def Time(message: String)(block: => Unit): Unit = {
+  def time(message: String)(block: => Unit): Unit = {
     logger.info(s"$message..".yellow)
-    val start = System.currentTimeMillis()
+    val start = System.nanoTime()
     block
-    val end = System.currentTimeMillis()
-    logger.info(s"$message took ${end - start} ms".green)
+    val end = System.nanoTime()
+    val durationMs = TimeUnit.NANOSECONDS.toMillis(end - start)
+    logger.info(s"$message took $durationMs ms".green)
   }

For thread safety, consider using a synchronized block or atomic operations if concurrent timing is needed.

📝 Committable suggestion


Suggested change
def Time(message: String)(block: => Unit): Unit = {
logger.info(s"$message..".yellow)
val start = System.currentTimeMillis()
block
val end = System.currentTimeMillis()
logger.info(s"$message took ${end - start} ms".green)
}
def time(message: String)(block: => Unit): Unit = {
logger.info(s"$message..".yellow)
val start = System.nanoTime()
block
val end = System.nanoTime()
val durationMs = TimeUnit.NANOSECONDS.toMillis(end - start)
logger.info(s"$message took $durationMs ms".green)
}

Comment on lines 70 to 190
def main(args: Array[String]): Unit = {

val config = new Conf(args)
val startDs = config.startDs()
val endDs = config.endDs()
val rowCount = config.rowCount()
val namespace = config.namespace()

val spark = SparkSessionBuilder.build(namespace, local = true)
implicit val tableUtils: TableUtils = TableUtils(spark)
tableUtils.createDatabase(namespace)

// generate anomalous data (join output)
val prepareData = PrepareData(namespace)
val join = prepareData.generateAnomalousFraudJoin

Time("Preparing data") {
val df = prepareData.generateFraudSampleData(rowCount, startDs, endDs, join.metaData.loggedTable)
df.show(10, truncate = false)
}

Time("Summarizing data") {
// compute summary table and packed table (for uploading)
Summarizer.compute(join.metaData, ds = endDs, useLogs = true)
}

val packedTable = join.metaData.packedSummaryTable
// mock api impl for online fetching and uploading
val kvStoreFunc: () => KVStore = () => {
// cannot reuse the variable - or serialization error
val result = InMemoryKvStore.build(namespace, () => null)
result
}
val api = new MockApi(kvStoreFunc, namespace)

// create necessary tables in kvstore
val kvStore = api.genKvStore
kvStore.create(Constants.MetadataDataset)
kvStore.create(Constants.TiledSummaryDataset)

// upload join conf
api.buildFetcher().putJoinConf(join)

Time("Uploading summaries") {
val uploader = new SummaryUploader(tableUtils.loadTable(packedTable), api)
uploader.run()
}

// test drift store methods
val driftStore = new DriftStore(api.genKvStore)

// TODO: Wire up drift store into hub and create an endpoint

// fetch keys
val tileKeys = driftStore.tileKeysForJoin(join)
val tileKeysSimple = tileKeys.mapValues(_.map(_.column).toSeq)
tileKeysSimple.foreach { case (k, v) => logger.info(s"$k -> [${v.mkString(", ")}]") }

// fetch summaries
val startMs = PartitionSpec.daily.epochMillis(startDs)
val endMs = PartitionSpec.daily.epochMillis(endDs)
val summariesFuture = driftStore.getSummaries(join, Some(startMs), Some(endMs), None)
val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
logger.info(summaries.toString())

var driftSeries: Seq[TileDriftSeries] = null
// fetch drift series
Time("Fetching drift series") {
val driftSeriesFuture = driftStore.getDriftSeries(
join.metaData.nameToFilePath,
DriftMetric.JENSEN_SHANNON,
lookBack = new Window(7, chronon.api.TimeUnit.DAYS),
startMs,
endMs
)
driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
}

val (nulls, totals) = driftSeries.iterator.foldLeft(0 -> 0) {
case ((nulls, total), s) =>
val currentNulls = s.getPercentileDriftSeries.iterator().toScala.count(_ == null)
val currentCount = s.getPercentileDriftSeries.size()
(nulls + currentNulls, total + currentCount)
}

logger.info(s"""drift totals: $totals
|drift nulls: $nulls
|""".stripMargin.red)

logger.info("Drift series fetched successfully".green)

var summarySeries: Seq[TileSummarySeries] = null

Time("Fetching summary series") {
val summarySeriesFuture = driftStore.getSummarySeries(
join.metaData.nameToFilePath,
startMs,
endMs
)
summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
}

val (summaryNulls, summaryTotals) = summarySeries.iterator.foldLeft(0 -> 0) {
case ((nulls, total), s) =>
if (s.getPercentiles == null) {
(nulls + 1) -> (total + 1)
} else {
val currentNulls = s.getPercentiles.iterator().toScala.count(_ == null)
val currentCount = s.getPercentiles.size()
(nulls + currentNulls, total + currentCount)
}
}

println(s"""summary ptile totals: $summaryTotals
|summary ptile nulls: $summaryNulls
|""".stripMargin)

logger.info("Summary series fetched successfully".green)

spark.stop()
}
Contributor

🛠️ Refactor suggestion

Add proper resource cleanup using try-finally.

Ensure resources are properly cleaned up even if an error occurs.

   def main(args: Array[String]): Unit = {
+    var spark: SparkSession = null
+    try {
       val config = new Conf(args)
       // ... existing configuration setup ...
 
-      val spark = SparkSessionBuilder.build(namespace, local = true)
+      spark = SparkSessionBuilder.build(namespace, local = true)
       // ... rest of the code ...
 
-      spark.stop()
+    } catch {
+      case e: Exception =>
+        logger.error("Failed to run observability demo", e)
+        throw e
+    } finally {
+      if (spark != null) {
+        try {
+          spark.stop()
+        } catch {
+          case e: Exception =>
+            logger.warn("Failed to stop Spark session", e)
+        }
+      }
+    }
   }

Committable suggestion skipped: line range outside the PR's diff.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1)

121-121: Track and implement the drift store integration TODO.

The TODO comment indicates missing functionality for drift store integration. This should be tracked and implemented.

Would you like me to help create a GitHub issue to track this task? I can provide a detailed implementation plan considering the JVM process integration challenges mentioned in the previous review.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between ccb5a0f and e640b00.

📒 Files selected for processing (2)
  • docker-init/demo/run.sh (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-init/demo/run.sh
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (4)

1-30: LGTM! Imports are well-organized and necessary.


34-40: Skipping comment as issues are already addressed in previous reviews.


82-89: Skipping comment as issues are already addressed in previous reviews.


96-109: Skipping comment as issues are already addressed in previous reviews.

val startMs = PartitionSpec.daily.epochMillis(startDs)
val endMs = PartitionSpec.daily.epochMillis(endDs)
val summariesFuture = driftStore.getSummaries(join, Some(startMs), Some(endMs), None)
val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
Contributor

🛠️ Refactor suggestion

Make timeout values configurable.

Multiple operations use hardcoded 10-second timeouts. These should be configurable to accommodate different environments and network conditions.

+    val defaultTimeout = Duration.create(
+      config.timeout.getOrElse(10),
+      TimeUnit.SECONDS
+    )
+
-    val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
+    val summaries = Await.result(summariesFuture, defaultTimeout)

-    driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    driftSeries = Await.result(driftSeriesFuture.get, defaultTimeout)

-    summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    summarySeries = Await.result(summarySeriesFuture.get, defaultTimeout)

Add timeout configuration to the Conf class:

val timeout: ScallopOption[Int] = opt[Int](
  name = "timeout",
  default = Some(10),
  descr = "Timeout in seconds for async operations"
)

Also applies to: 145-145, 169-169


val endDs: ScallopOption[String] = opt[String](
name = "end-ds",
default = Some("2023-02-30"),
Contributor

⚠️ Potential issue

Fix invalid default end date.

The default end date "2023-02-30" is invalid as February never has 30 days. This could cause runtime issues.

-      default = Some("2023-02-30"),
+      default = Some("2023-02-28"),
📝 Committable suggestion


Suggested change
default = Some("2023-02-30"),
default = Some("2023-02-28"),

Comment on lines +189 to +191
spark.stop()
System.exit(0)
}
Contributor

⚠️ Potential issue

Improve cleanup process.

The cleanup process has two issues:

  1. No error handling for spark.stop()
  2. Using System.exit(0) might prevent proper cleanup of resources
-    spark.stop()
-    System.exit(0)
+    try {
+      spark.stop()
+    } catch {
+      case e: Exception =>
+        logger.error("Failed to stop Spark session", e)
+        throw e
+    } finally {
+      // Allow natural program termination instead of forcing exit
+    }

Committable suggestion skipped: line range outside the PR's diff.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (2)

132-132: Make timeout values configurable.

Multiple operations use hardcoded 10-second timeouts. These should be configurable to accommodate different environments and network conditions.

+    val defaultTimeout = Duration.create(
+      config.timeout.getOrElse(10),
+      TimeUnit.SECONDS
+    )

-    val summaries = Await.result(summariesFuture, Duration.create(10, TimeUnit.SECONDS))
+    val summaries = Await.result(summariesFuture, defaultTimeout)

-    driftSeries = Await.result(driftSeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    driftSeries = Await.result(driftSeriesFuture.get, defaultTimeout)

-    summarySeries = Await.result(summarySeriesFuture.get, Duration.create(10, TimeUnit.SECONDS))
+    summarySeries = Await.result(summarySeriesFuture.get, defaultTimeout)

Also applies to: 145-145, 169-169


121-121: Address TODO comment about wiring up drift store.

The TODO comment indicates missing functionality for integrating the drift store with the hub.

Would you like me to help create a GitHub issue to track this task and provide implementation guidance?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between e640b00 and cd3e266.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (1 hunks)
🔇 Additional comments (2)
spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala (2)

189-191: ⚠️ Potential issue

Improve cleanup process.

The cleanup process needs proper error handling and should avoid using System.exit(0).

-    spark.stop()
-    System.exit(0)
+    try {
+      spark.stop()
+    } catch {
+      case e: Exception =>
+        logger.error("Failed to stop Spark session", e)
+        throw e
+    }

Likely invalid or redundant comment.


49-53: ⚠️ Potential issue

Fix invalid default end date.

The default end date "2023-02-30" is invalid as February never has 30 days.

   val endDs: ScallopOption[String] = opt[String](
     name = "end-ds",
-    default = Some("2023-02-30"),
+    default = Some("2023-02-28"),
     descr = "End date in YYYY-MM-DD format"
   )

Likely invalid or redundant comment.

Comment on lines +135 to +136
var driftSeries: Seq[TileDriftSeries] = null
// fetch drift series
Contributor

🛠️ Refactor suggestion

Replace mutable state with immutable variables.

Use val instead of var to prevent accidental modifications and improve code clarity.

-    var driftSeries: Seq[TileDriftSeries] = null
+    val driftSeries: Seq[TileDriftSeries] = time("Fetching drift series") {
+      val driftSeriesFuture = driftStore.getDriftSeries(
+        // ... existing parameters ...
+      )
+      Await.result(driftSeriesFuture.get, defaultTimeout)
+    }

-    var summarySeries: Seq[TileSummarySeries] = null
+    val summarySeries: Seq[TileSummarySeries] = time("Fetching summary series") {
+      // ... move the fetching logic here ...
+    }

Also applies to: 161-161
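Note that the val-based rewrite above only compiles if the timing helper returns the block's result; the existing `time` returns `Unit`. A hedged sketch of that variant (not the PR's actual helper; the color extensions are omitted):

```scala
// Hedged sketch: a result-returning timing helper so call sites can write
// `val driftSeries = Timed.time("Fetching drift series") { ... }`.
import java.util.concurrent.TimeUnit
import org.slf4j.{Logger, LoggerFactory}

object Timed {
  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)

  def time[T](message: String)(block: => T): T = {
    logger.info(s"$message..")
    val start = System.nanoTime()
    val result = block // evaluate the by-name block exactly once
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    logger.info(s"$message took $elapsedMs ms")
    result
  }
}
```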

Comment on lines +34 to +40
def time(message: String)(block: => Unit): Unit = {
logger.info(s"$message..".yellow)
val start = System.currentTimeMillis()
block
val end = System.currentTimeMillis()
logger.info(s"$message took ${end - start} ms".green)
}
Contributor

🛠️ Refactor suggestion

Improve timing implementation for precision and naming convention.

  1. Use System.nanoTime() instead of System.currentTimeMillis() for more precise duration measurements
  2. Follow Scala naming conventions by using lowercase for method names
-  def time(message: String)(block: => Unit): Unit = {
+  def time(message: String)(block: => Unit): Unit = {
     logger.info(s"$message..".yellow)
-    val start = System.currentTimeMillis()
+    val start = System.nanoTime()
     block
-    val end = System.currentTimeMillis()
-    logger.info(s"$message took ${end - start} ms".green)
+    val end = System.nanoTime()
+    val durationMs = TimeUnit.NANOSECONDS.toMillis(end - start)
+    logger.info(s"$message took $durationMs ms".green)
   }
📝 Committable suggestion


Suggested change
def time(message: String)(block: => Unit): Unit = {
logger.info(s"$message..".yellow)
val start = System.currentTimeMillis()
block
val end = System.currentTimeMillis()
logger.info(s"$message took ${end - start} ms".green)
}
def time(message: String)(block: => Unit): Unit = {
logger.info(s"$message..".yellow)
val start = System.nanoTime()
block
val end = System.nanoTime()
val durationMs = TimeUnit.NANOSECONDS.toMillis(end - start)
logger.info(s"$message took $durationMs ms".green)
}

Comment on lines +42 to +68
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val startDs: ScallopOption[String] = opt[String](
name = "start-ds",
default = Some("2023-01-01"),
descr = "Start date in YYYY-MM-DD format"
)

val endDs: ScallopOption[String] = opt[String](
name = "end-ds",
default = Some("2023-02-30"),
descr = "End date in YYYY-MM-DD format"
)

val rowCount: ScallopOption[Int] = opt[Int](
name = "row-count",
default = Some(700000),
descr = "Number of rows to generate"
)

val namespace: ScallopOption[String] = opt[String](
name = "namespace",
default = Some("observability_demo"),
descr = "Namespace for the demo"
)

verify()
}
Contributor

🛠️ Refactor suggestion

Add input validation for configuration parameters.

The configuration parameters need validation to ensure:

  1. Valid date formats
  2. Positive row count
  3. End date is after start date
   class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+    private def isValidDate(date: String): Boolean = {
+      try {
+        PartitionSpec.daily.isValid(date)
+      } catch {
+        case _: Exception => false
+      }
+    }
+
     val startDs: ScallopOption[String] = opt[String](
       name = "start-ds",
       default = Some("2023-01-01"),
       descr = "Start date in YYYY-MM-DD format"
-    )
+    ).validate(date => isValidDate(date))

     val endDs: ScallopOption[String] = opt[String](
       name = "end-ds",
       default = Some("2023-02-28"),
       descr = "End date in YYYY-MM-DD format"
-    )
+    ).validate(date => isValidDate(date))

     val rowCount: ScallopOption[Int] = opt[Int](
       name = "row-count",
       default = Some(700000),
       descr = "Number of rows to generate"
-    )
+    ).validate(count => count > 0)

+    validate((conf: Conf) => {
+      val start = conf.startDs()
+      val end = conf.endDs()
+      PartitionSpec.daily.epochMillis(start) < PartitionSpec.daily.epochMillis(end)
+    }) { "End date must be after start date" }
📝 Committable suggestion


Suggested change
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
val startDs: ScallopOption[String] = opt[String](
name = "start-ds",
default = Some("2023-01-01"),
descr = "Start date in YYYY-MM-DD format"
)
val endDs: ScallopOption[String] = opt[String](
name = "end-ds",
default = Some("2023-02-30"),
descr = "End date in YYYY-MM-DD format"
)
val rowCount: ScallopOption[Int] = opt[Int](
name = "row-count",
default = Some(700000),
descr = "Number of rows to generate"
)
val namespace: ScallopOption[String] = opt[String](
name = "namespace",
default = Some("observability_demo"),
descr = "Namespace for the demo"
)
verify()
}
class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
private def isValidDate(date: String): Boolean = {
try {
PartitionSpec.daily.isValid(date)
} catch {
case _: Exception => false
}
}
val startDs: ScallopOption[String] = opt[String](
name = "start-ds",
default = Some("2023-01-01"),
descr = "Start date in YYYY-MM-DD format"
).validate(date => isValidDate(date))
val endDs: ScallopOption[String] = opt[String](
name = "end-ds",
default = Some("2023-02-28"),
descr = "End date in YYYY-MM-DD format"
).validate(date => isValidDate(date))
val rowCount: ScallopOption[Int] = opt[Int](
name = "row-count",
default = Some(700000),
descr = "Number of rows to generate"
).validate(count => count > 0)
val namespace: ScallopOption[String] = opt[String](
name = "namespace",
default = Some("observability_demo"),
descr = "Namespace for the demo"
)
validate((conf: Conf) => {
val start = conf.startDs()
val end = conf.endDs()
PartitionSpec.daily.epochMillis(start) < PartitionSpec.daily.epochMillis(end)
}) { "End date must be after start date" }
verify()
}

@nikhil-zlai nikhil-zlai merged commit ab5cedd into main Nov 26, 2024
8 checks passed
@nikhil-zlai nikhil-zlai deleted the obs_demo branch November 26, 2024 06:31
piyush-zlai added a commit that referenced this pull request Nov 27, 2024
…ontend (#95)

## Summary
Builds on a couple of the summary computation PRs and data generation to
wire things up so that Hub can serve them.
* Yanked out mock data based endpoints (model perf / drift, join &
feature skew) - decided it would be confusing to have a mix of mock and
generated data so we just have the generated data served
* Dropped a few of the scripts introduced in
#87. We bring up our
containers the way and we have a script `load_summaries.sh` that we can
trigger that leverages the existing app container to load data.
* DDB ingestion was taking too long and we were dropping a lot of data
due to rejected execution exceptions. To unblock for now, we've gone
with an approach of making a bulk put HTTP call from the
ObservabilityDemo app -> Hub, with Hub utilizing an InMemoryKV store to
persist and serve up features.
* Added an endpoint to serve the joins that are configured, as we've
switched from the model-based world.

There's still an issue to resolve around fetching individual feature
series data. Once I resolve that, we can switch this PR out of wip mode.

To test / run:
start up our docker containers:
```
$ docker-compose -f docker-init/compose.yaml up --build
...
```
In a different terminal, load data:
```
$ ./docker-init/demo/load_summaries.sh 
Done uploading summaries! 🥳
```

You can now curl join & feature time series data.
Join drift (null ratios)
```
curl -X GET   'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=null&offset=10h&algorithm=psi'
```

Join drift (value drift)
```
curl -X GET   'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=10h&algorithm=psi'
```

Feature drift:
```
curl -X GET   'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/feature/dim_user_account_type/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=1D&algorithm=psi&granularity=aggregates'
```

Feature summaries:
```
curl -X GET   'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join/feature/dim_user_account_type/timeseries?startTs=1673308800000&endTs=1674172800000&metricType=drift&metrics=value&offset=1D&algorithm=psi&granularity=percentile'
```

Join metadata
```
curl -X GET 'http://localhost:9000/api/v1/joins'
curl -X GET 'http://localhost:9000/api/v1/join/risk.user_transactions.txn_join'
```

## Checklist
- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **New Features**
- Introduced a new `JoinController` for managing joins with pagination
support.
- Added functionality for an in-memory key-value store with bulk data
upload capabilities.
- Implemented observability demo data loading within a Spark
application.
- Added a new `HTTPKVStore` class for remote key-value store
interactions over HTTP.

- **Improvements**
- Enhanced the `ModelController` and `SearchController` to align with
the new join data structure.
- Updated the `TimeSeriesController` to support asynchronous operations
and improved error handling.
- Refined dependency management in the build configuration for better
clarity and maintainability.
- Updated API routes to include new endpoints for listing and retrieving
joins.
- Updated configuration to replace the `DynamoDBModule` with
`ModelStoreModule`, adding `InMemoryKVStoreModule` and
`DriftStoreModule`.

- **Documentation**
- Revised README instructions for Docker container setup and demo data
loading.
- Updated API routes documentation to reflect new endpoints for joins
and in-memory data operations.

- **Bug Fixes**
- Resolved issues related to error handling in various controllers and
improved logging for better traceability.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: nikhil-zlai <[email protected]>
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
…ontend (#95)

kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
…ontend (#95)

chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
…ontend (#95)

chewy-zlai pushed a commit that referenced this pull request May 16, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
…ontend (#95)
