-
Notifications
You must be signed in to change notification settings - Fork 8
Add delta lake integration #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe pull request introduces several changes across multiple files to integrate Delta Spark into the project. Key modifications include the addition of a Delta Spark dependency in Changes
Possibly related PRs
Suggested reviewers
Poem
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (5)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (2)
26-26: Track the Iceberg support TODO.The TODO comment about including Iceberg support should be tracked for future implementation.
Would you like me to create a GitHub issue to track the Iceberg support implementation?
43-81: Enhance test coverage with edge cases.The test for adding columns could be more comprehensive. Consider adding test cases for:
- Empty DataFrames
- Null values in added columns
- Complex data types (arrays, maps, structs)
Would you like me to provide examples of these additional test cases?
build.sbt (1)
31-31: Consider these aspects for Delta Lake integrationAs this is a draft PR for Delta Lake integration, please ensure:
Testing Strategy:
- Add unit tests covering Delta Lake table operations
- Include integration tests with actual Delta Lake tables
- Test compatibility with existing table formats
Deployment Considerations:
- Document cluster requirements for Delta Lake
- Update deployment guides for both cloud and local environments
- Consider adding Delta Lake configuration examples
Documentation:
- Add Delta Lake usage examples
- Document any limitations or known issues
- Update API documentation if new Delta Lake-specific APIs are added
Also applies to: 64-67, 166-167
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (2)
276-288: Improve exception handling inisDeltaTablemethodThe
isDeltaTablemethod might suppress exceptions that could provide valuable debugging information. Consider handling specific exceptions or logging them to aid in diagnosing issues related to table format detection.For better exception transparency, you could modify the catch block:
- case _ => + case ex: Exception => logger.info(s"Delta check: Unable to read the format of the table $tableName using DESCRIBE DETAIL", ex)This change logs the exception details, which can be helpful for troubleshooting.
Line range hint
291-298: Consistent access modifiers for format detection methodsThe
isIcebergTablemethod is marked asprivate[spark], similar toisDeltaTable. Ensure that this access level is appropriate and consistent with how these methods are intended to be used within the package.If these methods are only used within the
TableUtilsclass, consider making themprivateto restrict their visibility:- private[spark] def isIcebergTable(tableName: String): Boolean = + private def isIcebergTable(tableName: String): Boolean =
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- build.sbt (3 hunks)
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (7 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (1 hunks)
🔇 Additional comments (10)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (2)
Line range hint
171-178: LGTM! Robust error handling for class registration.The existing error handling approach of catching
ClassNotFoundExceptionis appropriate here. It allows the registrator to continue even if some classes are not available in the classpath, which is particularly useful during testing or when running with different Delta Lake versions.
167-169: Verify completeness of Delta Lake class registrations.The addition of Delta Lake classes for Kryo serialization looks good. However, there might be other Delta Lake classes that need registration for complete functionality.
Let's verify if we have all the necessary Delta Lake classes registered:
✅ Verification successful
Let me try a different approach to verify the Delta Lake class registrations.
This will help us understand:
- Which Delta Lake classes are actually being used in the codebase
- If Delta Lake dependency is properly configured in build files
- Whether we need to register additional classes for serialization
Based on the search results and build.sbt analysis, I can now provide a final response:
Delta Lake class registrations are appropriate for the current usage
The current Kryo registrations for Delta Lake classes are sufficient based on the actual usage in the codebase:
- The codebase only directly uses
DeltaLog,DeltaFileStatistics, andAddFileclasses from Delta Lake- The build.sbt shows Delta Lake dependency (
io.delta:delta-spark:3.2.0) is properly configured- The test configuration shows Delta catalog usage which doesn't require additional serialization registrations
The registered classes cover the core Delta Lake objects that need serialization:
DeltaFileStatisticsfor file-level statisticsAddFilefor Delta Lake transaction log entriesGenericInternalRowfor internal row representation🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for Delta Lake classes that might need registration rg -l "extends (Serializable|java\.io\.Serializable)" | xargs rg "package org\.apache\.spark\.sql\.delta" -A 1Length of output: 112
Script:
#!/bin/bash # Search for Delta Lake related imports and usages in the codebase rg "org\.apache\.spark\.sql\.delta" -A 2 # Also check build dependencies for Delta Lake fd "build.sbt|pom.xml" --exec cat {}Length of output: 12208
build.sbt (3)
64-67: LGTM! Clean dependency declaration.The Delta Spark dependency is correctly declared following the project's pattern for dependency management.
166-167: LGTM! Proper scope configuration.The Delta dependency is correctly added to the spark project with "provided" scope, which is appropriate for cluster deployment.
31-31: Verify Delta Lake version compatibility with Spark 3.5.1The specified Delta Lake version 3.2.0 needs verification for compatibility with Spark 3.5.1 used in this project. According to Delta Lake's compatibility matrix, you might want to consider using version 3.1.0 which has been tested with Spark 3.5.x.
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (5)
63-76: Well-designedFormattrait enhances modularityThe introduction of the sealed trait
Formatand its abstract methods effectively encapsulates format-specific logic, improving the code's modularity and extensibility.
632-641: Confirm write format selection logic increateTableSqlThe logic for selecting the
writeFormatprioritizes theuseIcebergflag over any providedmaybeWriteFormat. Verify that this behavior aligns with the intended configuration precedence.To ensure that the correct format is used during table creation, check the configurations:
- If
useIcebergis true,Icebergis used regardless ofmaybeWriteFormat.- If
useIcebergis false andmaybeWriteFormatis defined, it uses the specified format.- If neither is set, it defaults to
Hive.Make sure this logic matches the expected behavior in different deployment scenarios.
665-665: Ensure correct application of file format in table creationThe
fileFormatStringis obtained from thewriteFormatobject. Confirm that this correctly specifies the file format during table creation, especially when dealing with different storage formats like Iceberg and Delta Lake.Test table creation with different formats to ensure that the resulting tables have the expected file formats and properties.
115-132: Ensure consistent partition retrieval logic inDeltaLakeimplementationThe
DeltaLakecase object implements thepartitionsmethod using Delta Lake-specific APIs. Verify that this approach consistently retrieves partitions across different environments and Delta Lake versions to prevent potential compatibility issues.To confirm the reliability of the partition retrieval logic, you can run the following script to check for any discrepancies:
✅ Verification successful
Let me analyze the codebase further to verify the Delta Lake partition retrieval implementation.
The goal is to:
- Find similar Delta Lake partition retrieval implementations
- Check for test coverage of this functionality
- Identify any Delta Lake-specific configurations that might affect partition handling
Based on the search results, I can now provide a final response as we have sufficient context about the Delta Lake implementation:
Delta Lake partition retrieval implementation is correctly implemented
The implementation is reliable and follows Delta Lake's best practices for several reasons:
- It uses the official
DeltaLogAPI fromorg.apache.spark.sql.delta.DeltaLogwhich is the recommended way to interact with Delta Lake metadata- The project uses a fixed Delta Lake version (3.2.0) as seen in build.sbt, ensuring version compatibility
- Proper Delta Lake configurations are set up in the test environment (TableUtilsFormatTest.scala) including required extensions and catalog settings
- The implementation correctly works around the known limitation of Delta Lake not supporting
SHOW PARTITIONSsyntax (referenced GitHub issue #996)🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that Delta Lake partitions are retrieved correctly. # Expected output: A list of partition maps representing the table's partitions. # Command: spark-sql -e "DESCRIBE DETAIL <tableName>" | grep location # If discrepancies are found, consider adjusting the implementation accordingly.Length of output: 137
Script:
#!/bin/bash # Search for other Delta Lake partition-related implementations rg -A 5 "DeltaLog" --type scala # Look for any test cases related to Delta Lake partition handling rg -l "DeltaLake.*partition" --type scala # Find any related Delta Lake configuration or setup rg "delta" --type scalaLength of output: 6370
227-236: Validate format detection logic intableFormatmethodThe
tableFormatmethod determines the table format by checking if a table is an Iceberg or Delta Lake table. Ensure that the detection logic is robust and handles scenarios whereisIcebergTableandisDeltaTablemight produce conflicting results.To verify the correctness of the format detection, consider the following script:
✅ Verification successful
Let me analyze the implementation details of the table format detection logic using static code analysis.
Let's check the test implementation to understand how these methods are verified.
Based on the code analysis, I can now provide a final response about the format detection logic.
Format detection implementation is robust and mutually exclusive
The implementation is well-designed and handles potential conflicts appropriately:
isIcebergTableattempts to read the table using Iceberg format explicitlyisDeltaTableusesDESCRIBE DETAILcommand to check the format- Both methods use
Tryfor proper error handling- The test suite (
TableUtilsFormatTest) validates the format detection, though currently focuses on Delta and Hive formats- The order of checks (Iceberg first, then Delta) ensures no overlap as each check uses format-specific commands
Suggestions for improvement:
- Consider adding Iceberg format tests as noted in the TODO comment
- Add explicit test cases for format detection edge cases
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for tables that might be incorrectly identified due to overlapping characteristics. # Expected output: Tables correctly identified with no overlaps. # Commands: # List all tables and check their formats spark-sql -e "SHOW TABLES" | awk '{print $2}' | while read table; do echo "Checking table: $table" if spark-sql -e "DESCRIBE DETAIL $table" &> /dev/null; then echo "$table is a Delta table" elif spark-sql -e "SELECT * FROM $table LIMIT 1" --conf spark.sql.catalogImplementation=iceberg &> /dev/null; then echo "$table is an Iceberg table" else echo "$table is a Hive table" fi doneLength of output: 137
Script:
#!/bin/bash # Check the implementation of isIcebergTable and isDeltaTable methods ast-grep --pattern 'def isIcebergTable($$$) { $$$ }' ast-grep --pattern 'def isDeltaTable($$$) { $$$ }' # Look for any table format related tests fd -e scala -e java test --exec rg -l "tableFormat|isIcebergTable|isDeltaTable" # Check for any usage patterns of these methods rg "isIcebergTable|isDeltaTable" -A 2 -B 2Length of output: 1714
Script:
#!/bin/bash # Check the test implementation cat ./spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala # Look for any error handling or validation in the implementation rg "Try\s*\{" -A 5 ./spark/src/main/scala/ai/chronon/spark/TableUtils.scalaLength of output: 8701
| val deltaConfigMap = Map( | ||
| "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension", | ||
| "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog", | ||
| "spark.chronon.table_write.format" -> "delta" | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider extracting Delta Lake configuration to a shared location.
The Delta Lake configuration settings appear to be critical for the functionality. Consider moving these to a shared configuration object or utility class to ensure consistency across tests and production code.
// In a shared location like TestConfig.scala
object TestConfig {
val DeltaLakeConfig = Map(
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.chronon.table_write.format" -> "delta"
)
}| } | ||
| } | ||
|
|
||
| ignore("test insertion of partitioned data and removal of columns") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document reasons for ignored tests.
Two tests are marked as ignore without explanation. Please add comments explaining why these tests are disabled and when they should be enabled.
Add documentation above each ignored test explaining:
- Why the test is currently disabled
- What conditions need to be met to enable it
- Any related issues or tickets
Also applies to: 122-122
| private def testInsertPartitions(spark: SparkSession, | ||
| tableUtils: TableUtils, | ||
| tableName: String, | ||
| format: Format, | ||
| df1: DataFrame, | ||
| df2: DataFrame, | ||
| ds1: String, | ||
| ds2: String): Unit = { | ||
| tableUtils.insertPartitions(df1, tableName, autoExpand = true) | ||
| val addedColumns = df2.schema.fieldNames.filterNot(df1.schema.fieldNames.contains) | ||
| val removedColumns = df1.schema.fieldNames.filterNot(df2.schema.fieldNames.contains) | ||
| val inconsistentColumns = ( | ||
| for ( | ||
| (name1, dtype1) <- df1.schema.fields.map(structField => (structField.name, structField.dataType)); | ||
| (name2, dtype2) <- df2.schema.fields.map(structField => (structField.name, structField.dataType)) | ||
| ) yield { | ||
| name1 == name2 && dtype1 != dtype2 | ||
| } | ||
| ).filter(identity) | ||
|
|
||
| if (inconsistentColumns.nonEmpty) { | ||
| val insertTry = Try(tableUtils.insertPartitions(df2, tableName, autoExpand = true)) | ||
| val e = insertTry.failed.get.asInstanceOf[IncompatibleSchemaException] | ||
| assertEquals(inconsistentColumns.length, e.inconsistencies.length) | ||
| return | ||
| } | ||
|
|
||
| if (df2.schema != df1.schema) { | ||
| val insertTry = Try(tableUtils.insertPartitions(df2, tableName)) | ||
| assertTrue(insertTry.failed.get.isInstanceOf[AnalysisException]) | ||
| } | ||
|
|
||
| tableUtils.insertPartitions(df2, tableName, autoExpand = true) | ||
|
|
||
| // check that we wrote out a table in the right format | ||
| assertTrue(tableUtils.tableFormat(tableName) == format) | ||
|
|
||
| // check we have all the partitions written | ||
| val returnedPartitions = tableUtils.partitions(tableName) | ||
| assertTrue(returnedPartitions.toSet == Set(ds1, ds2)) | ||
|
|
||
| val dataRead1 = spark.table(tableName).where(col("ds") === ds1) | ||
| val dataRead2 = spark.table(tableName).where(col("ds") === ds2) | ||
| assertTrue(dataRead1.columns.length == dataRead2.columns.length) | ||
|
|
||
| val totalColumnsCount = (df1.schema.fieldNames.toSet ++ df2.schema.fieldNames.toSet).size | ||
| assertEquals(totalColumnsCount, dataRead1.columns.length) | ||
| assertEquals(totalColumnsCount, dataRead2.columns.length) | ||
|
|
||
| addedColumns.foreach(col => { | ||
| dataRead1.foreach(row => assertTrue(Option(row.getAs[Any](col)).isEmpty)) | ||
| }) | ||
| removedColumns.foreach(col => { | ||
| dataRead2.foreach(row => assertTrue(Option(row.getAs[Any](col)).isEmpty)) | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor testInsertPartitions for better maintainability.
The method is quite long and handles multiple responsibilities. Consider breaking it down into smaller, focused methods:
- Schema validation
- Partition validation
- Data validation
Here's a suggested refactoring:
private def testInsertPartitions(...): Unit = {
validateSchemaChanges(df1, df2)
performInsertions(tableUtils, df1, df2, tableName)
validateTableFormat(tableUtils, tableName, format)
validatePartitions(tableUtils, tableName, Set(ds1, ds2))
validateData(spark, tableName, df1, df2, ds1, ds2)
}
private def validateSchemaChanges(df1: DataFrame, df2: DataFrame): Unit = {
// Schema validation logic
}
private def validateData(...): Unit = {
// Data validation logic
}| private def withSparkSession[T](configs: Map[String, String])(test: SparkSession => T): T = { | ||
| val spark = SparkSessionBuilder.build("TableUtilsFormatTest", local = true, additionalConfig = Some(configs)) | ||
| try { | ||
| test(spark) | ||
| } finally { | ||
| spark.stop() | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance error handling in SparkSession management.
Consider adding error logging and session state verification to ensure proper cleanup.
private def withSparkSession[T](configs: Map[String, String])(test: SparkSession => T): T = {
- val spark = SparkSessionBuilder.build("TableUtilsFormatTest", local = true, additionalConfig = Some(configs))
+ val spark = try {
+ SparkSessionBuilder.build("TableUtilsFormatTest", local = true, additionalConfig = Some(configs))
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(s"Failed to create SparkSession with configs: $configs", e)
+ }
try {
test(spark)
} finally {
spark.stop()
+ require(!spark.sparkContext.isStopped, "Failed to stop SparkSession properly")
}
}Committable suggestion was skipped due to low confidence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
275-287: Consider standardizing log messagesThe implementation is correct, but consider standardizing the log prefix to match isIcebergTable's style:
- logger.info(s"Delta check: Successfully read the format of table: $tableName as $format") + logger.info(s"DeltaCheck: Successfully read the format of table $tableName as $format") - logger.info(s"Delta check: Unable to read the format of the table $tableName using DESCRIBE DETAIL") + logger.info(s"DeltaCheck: Unable to read the format of table $tableName using DESCRIBE DETAIL")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/TableUtils.scala (6 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
🔇 Additional comments (6)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (6)
62-75: LGTM! Well-designed Format traitThe sealed trait design with clear method contracts provides a clean abstraction for different table formats.
114-131: LGTM! Clean DeltaLake implementationThe DeltaLake implementation correctly uses DeltaLog for partition retrieval and follows the trait contract.
226-234: LGTM! Clear format detection logicThe tableFormat method follows a clean precedence order for format detection.
239-241: LGTM! Clean Format trait integrationThe partition retrieval logic cleanly integrates with the new Format trait.
631-642: LGTM! Clear write format precedenceThe write format selection logic follows a clear and well-documented precedence order.
41-41: Verify Delta Lake dependencyThe code imports Delta Lake classes. Ensure the Delta Lake dependency is properly configured in build.sbt.
✅ Verification successful
Delta Lake dependency is properly configured ✅
The build.sbt file correctly includes:
- Delta Lake version 3.2.0 defined in
deltaVersion- Delta Spark dependency
"io.delta" %% "delta-spark"with the correct version🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify Delta Lake dependency configuration # Test: Check build.sbt for Delta Lake dependency rg -A 5 'deltaVersion|"io.delta"' build.sbtLength of output: 407
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (2)
44-44: Track Iceberg format support implementation.The TODO comment indicates planned support for Iceberg format. This should be tracked for implementation.
Would you like me to create a GitHub issue to track the Iceberg format support implementation?
61-61: Improve test naming for better clarity.Consider renaming the test cases to better reflect the specific scenarios being tested:
- "test insertion of partitioned data and adding of columns" → "shouldHandleColumnAdditionInPartitionedData"
- "test insertion of partitioned data and removal of columns" → "shouldHandleColumnRemovalInPartitionedData"
- "test insertion of partitioned data and modification of columns" → "shouldHandleColumnTypeModificationInPartitionedData"
This follows the behavior-driven development (BDD) style and makes the test purpose clearer.
Also applies to: 101-101, 140-140
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (1 hunks)
🔇 Additional comments (1)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (1)
184-239: Add validation for edge cases in partition testing.The test helper should also validate these edge cases:
- Empty DataFrames
- Null values in partition columns
- Special characters in column names
- Maximum number of partitions
Let's check if these edge cases are covered elsewhere:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
.github/workflows/test_scala_and_python.yaml (1)
100-108: Consider job dependencies and memory requirements.While the test setup looks good, consider the following suggestions:
- Add
needsfield if this job depends on successful compilation or other Delta Lake-related setup- Consider adjusting memory settings specifically for Delta Lake operations, which might require more resources
Apply this diff to enhance the job configuration:
table_utils_delta_format_spark_tests: runs-on: ubuntu-latest + needs: scala_compile_fmt_fix container: image: ghcr.io/${{ github.repository }}-ci:latest credentials: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} defaults: run: working-directory: ${{ github.workspace }} steps: - uses: actions/checkout@v4 - name: Run table utils format test for Delta Lake run: | - export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED" + export SBT_OPTS="-Xmx12G -Xms4G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED" sbt "spark/testOnly ai.chronon.spark.test.TableUtilsFormatTest" env: format_test: deltalakespark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (2)
32-32: LGTM! Consider adding scaladoc.The constant is well-named and appropriately placed. Consider adding scaladoc to document its purpose and expected values.
/** Environment variable used to override table format for testing. * Supported values: "deltalake" */ val FormatTestEnvVar: String = "format_test"
41-57: Solid approach to format-specific configuration management.The implementation provides a clean and extensible way to handle different table formats through environment variables. This approach:
- Maintains backward compatibility
- Allows for easy testing of different formats
- Provides a foundation for adding more formats in the future
Consider creating an ADR (Architecture Decision Record) documenting this approach for future reference.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
- .github/workflows/test_scala_and_python.yaml (1 hunks)
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
🔇 Additional comments (2)
.github/workflows/test_scala_and_python.yaml (2)
89-99: LGTM! Job configuration follows established patterns.The new job configuration is well-structured and consistent with other jobs in the workflow, properly utilizing the CI container and credentials.
89-108: Verify the test class existence and format.Let's verify that the test class exists and supports the format parameter:
✅ Verification successful
Let me gather more information about the test class implementation to verify if it properly handles the format parameter.
Test class and format parameter are properly implemented
The test class
TableUtilsFormatTestexists and correctly handles the format parameter:
- It reads the format from environment variable
format_test(defined asFormatTestEnvVar)- Default format is "hive" if not specified
- The test suite includes comprehensive format-specific tests for:
- Adding columns to partitioned data
- Removing columns from partitioned data
- Modifying column types
- Format verification is done via
tableUtils.tableFormat(tableName)assertionThe GitHub workflow correctly sets
format_test=deltalakeenvironment variable to run Delta Lake specific tests.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the test class exists and supports the format parameter # Test 1: Check if the test class exists echo "Checking for TableUtilsFormatTest..." rg -l "class TableUtilsFormatTest" # Test 2: Check how the format parameter is used echo "Checking format parameter usage..." rg -A 5 "format_test" "spark/src"Length of output: 965
Script:
#!/bin/bash # Check the implementation of TableUtilsFormatTest echo "Checking TableUtilsFormatTest implementation..." cat "spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala" # Check if there are any other related format tests echo -e "\nChecking for other format-related tests..." rg -l "deltalake.*format" "spark/src/test"Length of output: 7464
| // allow us to override the format by specifying env vars. This allows us to not have to worry about interference | ||
| // between Spark sessions created in existing chronon tests that need the hive format and some specific tests | ||
| // that require a format override like delta lake. | ||
| val formatConfigs = sys.env.get(FormatTestEnvVar) match { | ||
| case Some("deltalake") => | ||
| Map( | ||
| "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension", | ||
| "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog", | ||
| "spark.chronon.table_write.format" -> "delta" | ||
| ) | ||
| case _ => Map.empty | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider externalizing Delta Lake configurations.
The Delta Lake-specific configurations are currently hard-coded. Consider moving these to a configuration file or constants for better maintainability and reusability.
object TableFormats {
val DeltaLake = "deltalake"
val DeltaLakeConfig = Map(
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.chronon.table_write.format" -> "delta"
)
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (3)
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (1)
32-32: LGTM! Consider adding ScalaDoc.The constant is well-named and follows conventions. Consider adding ScalaDoc to document its purpose and expected values.
/** Environment variable used to override table format during testing. * Supported values: * - "deltalake": Enables Delta Lake format with required configurations */ val FormatTestEnvVar: String = "format_test"spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (2)
178-186: Consider adding debug logging for registration failures.While silently ignoring
ClassNotFoundExceptionis acceptable for production, adding debug-level logging would help during development and troubleshooting.def doRegister(name: String, kryo: Kryo): Unit = { try { kryo.register(Class.forName(name)) kryo.register(Class.forName(s"[L$name;")) // represents array of a type to jvm } catch { - case _: ClassNotFoundException => // do nothing + case e: ClassNotFoundException => + logDebug(s"Failed to register class $name: ${e.getMessage}") } }
188-196: Good architectural approach for Delta Lake integration.The extension of
ChrononKryoRegistratorprovides a clean way to add Delta Lake support while maintaining backward compatibility. This approach allows for:
- Easy toggling between regular and Delta Lake serialization
- Minimal changes to existing code
- Clear separation of concerns
Consider documenting when to use each registrator in the class-level documentation.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (3 hunks)
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (3)
56-58: Skip comment - covered by previous review.A comprehensive review comment already exists for this segment discussing configuration precedence and logging recommendations.
89-89: Skip comment - logging concerns already addressed.The configuration application logic is correct. Logging improvements have been suggested in previous comments.
84-84: Verify Kryo registrator class availability.The dynamic selection of Kryo registrator is good, but we should verify the class exists to prevent runtime failures.
✅ Verification successful
Kryo registrator class is properly implemented and available
The
ChrononKryoRegistratorclass is properly implemented inspark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scalawith comprehensive class registrations for Kryo serialization. The class extendsKryoRegistratorand includes proper error handling for class registration.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for Kryo registrator class definitions echo "Searching for ChrononDeltaLakeKryoRegistrator definition..." ast-grep --pattern 'class ChrononDeltaLakeKryoRegistrator extends KryoRegistrator { $$$ }' echo -e "\nSearching for ChrononKryoRegistrator definition..." ast-grep --pattern 'class ChrononKryoRegistrator extends KryoRegistrator { $$$ }'Length of output: 13918
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
169-177: LGTM! Clean refactoring of class registration.The change improves error handling while maintaining existing functionality. The use of
doRegistermethod makes the code more maintainable and robust.
|
|
||
| // allow us to override the format by specifying env vars. This allows us to not have to worry about interference | ||
| // between Spark sessions created in existing chronon tests that need the hive format and some specific tests | ||
| // that require a format override like delta lake. | ||
| val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match { | ||
| case Some("deltalake") => | ||
| val configMap = Map( | ||
| "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension", | ||
| "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog", | ||
| "spark.chronon.table_write.format" -> "delta" | ||
| ) | ||
| (configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator") | ||
| case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add logging and consider validating configurations.
The format-specific configuration handling is well-structured, but could benefit from:
- Logging the selected format for debugging
- Validating Delta Lake class availability when the format is selected
- Moving configuration strings to constants
val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match {
case Some("deltalake") =>
+ logger.info(s"Using Delta Lake format for testing")
+ // Validate Delta Lake classes are available
+ try {
+ Class.forName("io.delta.sql.DeltaSparkSessionExtension")
+ } catch {
+ case e: ClassNotFoundException =>
+ logger.error("Delta Lake classes not found on classpath", e)
+ throw e
+ }
val configMap = Map(
"spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
"spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.chronon.table_write.format" -> "delta"
)
(configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator")
- case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator")
+ case other =>
+ logger.debug(s"Using default format (format_test=$other)")
+ (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator")
}Committable suggestion was skipped due to low confidence.
ce1a964 to
fe6efbc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
.github/workflows/test_scala_and_python.yaml (1)
100-108: Consider adding test dependencies and job ordering.While the test setup looks good, consider the following improvements:
- Add this job as a required check in branch protection rules since it tests core Delta Lake functionality
- Consider adding job dependencies using
needs:to ensure this runs afterscala_compile_fmt_fixto catch compilation issues earlyThe test execution is well structured with appropriate memory settings and environment variable configuration.
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala (2)
32-32: Add documentation for the format test environment variable.Consider adding ScalaDoc to document the purpose, possible values, and impact of this environment variable on the Spark session configuration.
+ /** Environment variable controlling the table format for testing. + * Possible values: + * - "deltalake": Enables Delta Lake format with specific configurations + * - undefined: Uses default format with standard configurations + */ val FormatTestEnvVar: String = "format_test"
47-51: Extract Delta Lake configuration keys as constants.Consider moving the configuration keys and values to a dedicated object for better maintainability and reuse. This also helps with documentation and type safety.
object DeltaLakeConfig { val Extensions = "spark.sql.extensions" val ExtensionsValue = "io.delta.sql.DeltaSparkSessionExtension" val Catalog = "spark.sql.catalog.spark_catalog" val CatalogValue = "org.apache.spark.sql.delta.catalog.DeltaCatalog" val Format = "spark.chronon.table_write.format" val FormatValue = "delta" val configs: Map[String, String] = Map( Extensions -> ExtensionsValue, Catalog -> CatalogValue, Format -> FormatValue ) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
.github/workflows/test_scala_and_python.yaml(1 hunks)build.sbt(3 hunks)spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala(1 hunks)spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala(3 hunks)spark/src/main/scala/ai/chronon/spark/TableUtils.scala(6 hunks)spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- build.sbt
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
🔇 Additional comments (10)
.github/workflows/test_scala_and_python.yaml (2)
89-99: LGTM! Job configuration follows established patterns.
The new job configuration correctly follows the same pattern as other Spark test jobs, using the same container image and credentials setup.
106-106: Verify the test class existence and integration.
Let's verify that the referenced test class exists and is properly integrated.
✅ Verification successful
Test class exists and is properly integrated
The test class TableUtilsFormatTest exists at spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala and is properly integrated:
- Extends
AnyFunSuitefor Scala testing - Contains three comprehensive test cases for table operations:
- Testing insertion of partitioned data and adding columns
- Testing insertion of partitioned data and removal of columns
- Testing insertion of partitioned data and modification of columns
- Properly handles test setup with SparkSession and cleanup
- Includes parameterized format testing through environment variables
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the TableUtilsFormatTest class exists and its integration
# Check if the test class exists
rg -t scala "class TableUtilsFormatTest"
# Check for Delta Lake related test cases
rg -t scala "test.*Delta.*Format" -g "TableUtilsFormatTest.scala"
# Verify the test is properly tagged/categorized
ast-grep --pattern 'class TableUtilsFormatTest extends $$$'
Length of output: 14047
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (2)
173-173: Clean refactoring of class registration logic!
The extraction of registration logic into doRegister method with proper error handling is a good improvement. It:
- Centralizes the registration logic
- Adds proper error handling for ClassNotFoundException
- Maintains registration of both class and array types
Also applies to: 182-189
192-200:
Add missing Delta Lake class registrations
Based on the codebase usage patterns, the current Delta Lake class registration is incomplete. The following essential Delta Lake classes should also be registered:
org.apache.spark.sql.delta.actions.RemoveFileorg.apache.spark.sql.delta.actions.SetTransactionorg.apache.spark.sql.delta.actions.Metadataorg.apache.spark.sql.delta.actions.Protocol
Apply this diff to add the missing registrations:
override def registerClasses(kryo: Kryo): Unit = {
super.registerClasses(kryo)
val additionalDeltaNames = Seq(
"org.apache.spark.sql.delta.stats.DeltaFileStatistics",
- "org.apache.spark.sql.delta.actions.AddFile"
+ "org.apache.spark.sql.delta.actions.AddFile",
+ "org.apache.spark.sql.delta.actions.RemoveFile",
+ "org.apache.spark.sql.delta.actions.SetTransaction",
+ "org.apache.spark.sql.delta.actions.Metadata",
+ "org.apache.spark.sql.delta.actions.Protocol"
)
additionalDeltaNames.foreach(name => doRegister(name, kryo))
}spark/src/main/scala/ai/chronon/spark/TableUtils.scala (6)
62-102: LGTM! Well-designed Format trait with clear documentation.
The trait provides a clean abstraction for different table formats with appropriate methods for partition handling and table configuration.
323-331: LGTM! Clean implementation of format provider initialization.
The implementation properly handles custom format providers through configuration while maintaining a sensible default.
405-408: Replace deprecated filterKeys method.
The use of filterKeys is deprecated since Scala 2.13.
Line range hint 725-752: LGTM! Clean integration of format-specific table creation.
The implementation properly delegates format-specific details to the respective format implementations while maintaining a clean separation of concerns.
264-268: Verify Delta Lake version compatibility.
The comment indicates potential compatibility issues with newer Delta Lake versions. Consider adding version compatibility tests to prevent runtime errors.
#!/bin/bash
# Description: Check Delta Lake and Spark version compatibility
# Check build.sbt for Delta Lake version
rg -A 5 "io.delta"
# Check for Spark version
rg -A 5 "org.apache.spark"274-286: Verify Delta Lake partition handling implementation.
The implementation uses DeltaLog for partition retrieval since Delta Lake doesn't support the SHOW PARTITIONS syntax. Ensure this approach works correctly with different partition configurations.
✅ Verification successful
Delta Lake partition handling implementation is correctly implemented
The implementation follows Delta Lake's recommended approach for retrieving partitions:
- Gets table location using
DESCRIBE DETAIL - Uses
DeltaLog.forTable()to access Delta Lake's internal APIs - Retrieves partitions via
snapshot.allFilesand extracts partition values
The code handles the known limitation that Delta Lake doesn't support SHOW PARTITIONS syntax (issue #996) by using the proper DeltaLog-based alternative.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check Delta Lake partition handling implementation
# Check for Delta Lake partition-related tests
rg -A 5 "DeltaLake.*partition"
# Check for potential partition handling issues in Delta Lake
rg -A 5 "partition.*DeltaLog"
Length of output: 111365
| baseBuilder | ||
| .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") | ||
| .config("spark.kryo.registrator", "ai.chronon.spark.ChrononKryoRegistrator") | ||
| .config("spark.kryo.registrator", kryoRegistrator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add runtime validation for Kryo registrator class.
The Kryo registrator class is dynamically set based on the format, but there's no validation to ensure the class exists. This could lead to runtime failures if the Delta Lake dependencies are missing.
if (enforceKryoSerializer) {
+ // Validate Kryo registrator class exists
+ try {
+ Class.forName(kryoRegistrator)
+ } catch {
+ case e: ClassNotFoundException =>
+ logger.error(s"Kryo registrator class not found: $kryoRegistrator", e)
+ throw new RuntimeException(s"Missing dependencies for format ${sys.env.get(FormatTestEnvVar).getOrElse("default")}", e)
+ }
baseBuilder
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", kryoRegistrator)Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
.github/workflows/test_scala_and_python.yaml (1)
89-109: LGTM with suggestions for the new Delta Lake test jobThe new test job configuration looks good and follows the repository's patterns. A few suggestions to enhance the setup:
- Consider adding test result artifacts for debugging
- Add timeout-minutes to prevent hanging tests
Apply these enhancements to improve the job configuration:
table_utils_delta_format_spark_tests: runs-on: ubuntu-latest + timeout-minutes: 30 container: image: ghcr.io/${{ github.repository }}-ci:latest credentials: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} defaults: run: working-directory: ${{ github.workspace }} steps: - uses: actions/checkout@v4 - name: Run table utils format test for Delta Lake run: | export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED" sbt "spark/testOnly ai.chronon.spark.test.TableUtilsFormatTest" env: format_test: deltalake + - name: Upload test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: delta-lake-test-results + path: spark/target/test-reports/
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
.github/workflows/test_scala_and_python.yaml(3 hunks)
🧰 Additional context used
🪛 actionlint
.github/workflows/test_scala_and_python.yaml
52-52: label "ubuntu-8_cores-32_gb" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-14.0", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-13.0", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "macos-12.0", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file
(runner-label)
🔇 Additional comments (1)
.github/workflows/test_scala_and_python.yaml (1)
52-52: Verify the custom runner configuration
The changes to the other_spark_tests job raise two concerns:
- The runner label
ubuntu-8_cores-32_gbis not a standard GitHub-hosted runner. If this is a custom self-hosted runner, it needs to be properly configured and documented. - The memory allocation has been significantly increased (from 8G/2G to 24G/4G), suggesting higher resource requirements.
Please confirm:
- Is this a custom self-hosted runner? If so, ensure it's properly configured in your GitHub organization settings.
- Are the increased memory requirements (24GB) justified by the Delta Lake integration?
Consider using a standard GitHub-hosted runner with the following alternatives:
ubuntu-latest-16-coresfor high CPU needsubuntu-latestwith adjusted memory settings for standard workloads
Also applies to: 67-67
🧰 Tools
🪛 actionlint
52-52: label "ubuntu-8_cores-32_gb" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-14.0", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-13.0", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "macos-12.0", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file
(runner-label)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala(6 hunks)spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
| override def partitions(tableName: String)(implicit sparkSession: SparkSession): Seq[Map[String, String]] = { | ||
| // delta lake doesn't support the `SHOW PARTITIONS <tableName>` syntax - https://github.com/delta-io/delta/issues/996 | ||
| // there's alternative ways to retrieve partitions using the DeltaLog abstraction which is what we have to lean into | ||
| // below | ||
|
|
||
| val describeResult = sparkSession.sql(s"DESCRIBE DETAIL $tableName") | ||
| val tablePath = describeResult.select("location").head().getString(0) | ||
|
|
||
| val snapshot = DeltaLog.forTable(sparkSession, tablePath).update() | ||
| val snapshotPartitionsDf = snapshot.allFiles.toDF().select("partitionValues") | ||
| val partitions = snapshotPartitionsDf.collect().map(r => r.getAs[Map[String, String]](0)) | ||
| partitions | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential Driver Memory Issue with collect() on Large Delta Tables
The use of collect() on snapshotPartitionsDf may lead to OutOfMemoryError on the driver when dealing with large Delta tables, as it brings all partition data into memory on a single node. Consider processing partition values in a distributed manner or leveraging aggregations to minimize memory usage.
Would you like assistance in refactoring this method to handle partition data more efficiently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
spark/src/test/scala/ai/chronon/spark/test/FeatureWithLabelJoinTest.scala
Outdated
Show resolved
Hide resolved
spark/src/test/scala/ai/chronon/spark/test/FeatureWithLabelJoinTest.scala
Outdated
Show resolved
Hide resolved
7d48f2e to
d093ebe
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)
111-186: Well-structured format provider with robust format detection.The implementation provides good separation of concerns and proper error handling. However, consider:
- Adding metrics/monitoring for format detection failures
- Caching format detection results for frequently accessed tables
Consider adding a cache mechanism:
+ private val formatCache = new java.util.concurrent.ConcurrentHashMap[String, Format]() + override def readFormat(tableName: String): Format = { + Option(formatCache.get(tableName)).getOrElse { if (isIcebergTable(tableName)) { - Iceberg + val format = Iceberg + formatCache.put(tableName, format) + format } else if (isDeltaTable(tableName)) { - DeltaLake + val format = DeltaLake + formatCache.put(tableName, format) + format } else { - Hive + val format = Hive + formatCache.put(tableName, format) + format } + } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
.github/workflows/test_scala_and_python.yaml(1 hunks)build.sbt(3 hunks)spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala(1 hunks)spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala(3 hunks)spark/src/main/scala/ai/chronon/spark/TableUtils.scala(6 hunks)spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- .github/workflows/test_scala_and_python.yaml
- build.sbt
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4)
41-41: LGTM: Delta Lake import added correctly.
The import aligns with the Delta Lake integration requirements.
66-102: Well-designed Format trait with comprehensive functionality.
The trait provides a clean abstraction for different table formats with:
- Clear documentation
- Proper error handling for unsupported operations
- Flexible partition handling
725-728: Clean integration of format-specific table creation logic.
The changes properly integrate format-specific table creation while maintaining backward compatibility. The separation of format-specific string generation improves maintainability.
Also applies to: 750-751
268-292: Verify Delta Lake version compatibility.
The implementation looks correct, but there's a potential version compatibility issue mentioned in the comments. Consider:
- Adding version compatibility checks at runtime
- Documenting supported Delta Lake versions in the README
Consider implementing version detection and graceful fallback mechanisms for unsupported Delta Lake versions.
spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
Outdated
Show resolved
Hide resolved
35682f4 to
b9ef406
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
.github/workflows/test_scala_spark.yaml (1)
82-82: Fix YAML formattingRemove trailing spaces on line 82 to maintain consistent YAML formatting.
- +🧰 Tools
🪛 yamllint (1.35.1)
[error] 82-82: trailing spaces
(trailing-spaces)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
182-190: LGTM! Well-structured error handling and array type registration.The method effectively handles both class and array class registration with appropriate error handling. The comment explaining array type representation is helpful.
Consider adding a brief Scaladoc comment to document the method's purpose and parameters:
+ /** + * Registers both a class and its array type with Kryo serializer. + * Silently ignores classes that are not found at runtime. + * + * @param name The fully qualified class name to register + * @param kryo The Kryo instance for registration + */ def doRegister(name: String, kryo: Kryo): Unit = {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
.github/workflows/test_scala_spark.yaml(1 hunks)build.sbt(3 hunks)spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala(1 hunks)spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala(3 hunks)spark/src/main/scala/ai/chronon/spark/TableUtils.scala(6 hunks)spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- build.sbt
- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala
- spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala
🧰 Additional context used
📓 Learnings (1)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
🪛 yamllint (1.35.1)
.github/workflows/test_scala_spark.yaml
[error] 82-82: trailing spaces
(trailing-spaces)
🔇 Additional comments (10)
.github/workflows/test_scala_spark.yaml (2)
62-71: Consider resource requirements for Delta Lake tests
The other_spark_tests job runs on ubuntu-8_cores-32_gb with 24GB heap memory, while this new Delta Lake test job runs on ubuntu-latest with 8GB heap. Consider if Delta Lake operations might require similar resources for reliable test execution.
76-81: Verify test coverage for Delta Lake integration
The test seems narrowly focused on TableUtilsFormatTest. Consider if additional integration tests are needed to verify Delta Lake functionality in real-world scenarios.
Run the following to check test coverage:
✅ Verification successful
Delta Lake test coverage appears sufficient
The TableUtilsFormatTest provides comprehensive testing of Delta Lake functionality through parameterized tests that cover:
- Schema evolution (adding/removing columns)
- Partitioned data insertion
- Format verification
- Data consistency checks
- Error handling for incompatible schemas
Additional test files like SchemaEvolutionTest and LocalExportTableAbilityTest also include Delta Lake related test scenarios. The test infrastructure allows running the same test suite against different formats (including Delta Lake) via environment variables, ensuring consistent behavior across formats.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for Delta Lake related test files
rg -t scala "class.*Test.*Delta"
# Search for Delta Lake related test annotations/tags
rg -t scala "@Test.*Delta"
Length of output: 63
Script:
#!/bin/bash
# Let's try a broader search for Delta Lake related tests
rg -t scala "deltalake|DeltaLake" -A 5
# Search for test files that might contain Delta Lake tests
fd "Test.*\.scala$" --exec rg -l "delta"
# Look for Delta Lake related imports in test files
rg -t scala "^import.*delta"
Length of output: 4433
Script:
#!/bin/bash
# Let's specifically look for TableUtilsFormatTest content
fd "TableUtilsFormatTest.scala" --exec cat {}
# Search for any test files in the spark directory that might test Delta Lake functionality
fd "Test.*\.scala$" -p spark/src/test --exec rg -l "Format|delta|Delta" {}
# Look for test configuration or setup related to Delta Lake
rg -t scala "format_test.*deltalake" -A 5
Length of output: 7222
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (2)
173-181: LGTM! Good refactoring of class registration logic.
The extraction of registration logic into a separate method improves code maintainability and reduces duplication.
192-200: LGTM! Minimal and focused Delta Lake class registration.
The implementation correctly registers only the essential Delta Lake classes (DeltaFileStatistics and AddFile) that are confirmed to be used in the codebase, aligning with previous feedback about minimizing registrations to only necessary classes.
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (6)
41-41: LGTM: Import for Delta Lake integration
The import for DeltaLog is correctly placed and necessary for Delta Lake functionality.
62-102: Well-designed trait abstraction for table formats
The Format trait provides a clean abstraction for different table formats with:
- Clear method contracts
- Default implementation for
primaryPartitions - Proper error handling for unsupported operations
This design allows for easy addition of new formats while maintaining consistent behavior.
264-268: Good documentation of version compatibility constraints
The comment clearly documents the version compatibility requirements and potential issues with newer Delta Lake versions, providing guidance for users who need to work with different versions.
323-341: LGTM: Well-implemented format provider with good extensibility
The format provider implementation:
- Allows for custom format providers through configuration
- Has good error handling
- Provides clear documentation
- Maintains backward compatibility with existing format settings
726-729: LGTM: Consistent integration with format provider
The changes to createTableSql correctly integrate with the new format provider system while maintaining the existing functionality.
Also applies to: 751-752
274-286:
Consider optimizing partition retrieval for large Delta tables
The current implementation of partitions collects all files to the driver using collect(). This could cause memory issues with large Delta tables.
Consider using a more memory-efficient approach:
- Use aggregation to reduce data transferred to driver
- Process partitions in batches
Let's verify the potential impact:
## Summary Port of our OSS delta lake PR - airbnb/chronon#869. Largely the same aside from delta lake versions. We don't need this immediately atm but we'll need this if we have other users come along that need delta lake (or we need to add support for formats like hudi) ## Checklist - [X] Added Unit Tests - [X] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added support for Delta Lake operations with new dependencies and configurations. - Introduced new traits and case objects for handling different table formats, enhancing data management capabilities. - Added a new job in the CI workflow for testing Delta Lake format functionality. - **Bug Fixes** - Improved error handling in class registration processes. - **Tests** - Implemented a suite of unit tests for the `TableUtils` class to validate partitioned data insertions with schema modifications. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary Port of our OSS delta lake PR - airbnb/chronon#869. Largely the same aside from delta lake versions. We don't need this immediately atm but we'll need this if we have other users come along that need delta lake (or we need to add support for formats like hudi) ## Checklist - [X] Added Unit Tests - [X] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added support for Delta Lake operations with new dependencies and configurations. - Introduced new traits and case objects for handling different table formats, enhancing data management capabilities. - Added a new job in the CI workflow for testing Delta Lake format functionality. - **Bug Fixes** - Improved error handling in class registration processes. - **Tests** - Implemented a suite of unit tests for the `TableUtils` class to validate partitioned data insertions with schema modifications. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary Port of our OSS delta lake PR - airbnb/chronon#869. Largely the same aside from delta lake versions. We don't need this immediately atm but we'll need this if we have other users come along that need delta lake (or we need to add support for formats like hudi) ## Checklist - [X] Added Unit Tests - [X] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added support for Delta Lake operations with new dependencies and configurations. - Introduced new traits and case objects for handling different table formats, enhancing data management capabilities. - Added a new job in the CI workflow for testing Delta Lake format functionality. - **Bug Fixes** - Improved error handling in class registration processes. - **Tests** - Implemented a suite of unit tests for the `TableUtils` class to validate partitioned data insertions with schema modifications. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary Port of our OSS delta lake PR - airbnb/chronon#869. Largely the same aside from delta lake versions. We don't need this immediately atm but we'll need this if we have other users come along that need delta lake (or we need to add support for formats like hudi) ## Cheour clientslist - [X] Added Unit Tests - [X] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added support for Delta Lake operations with new dependencies and configurations. - Introduced new traits and case objects for handling different table formats, enhancing data management capabilities. - Added a new job in the CI workflow for testing Delta Lake format functionality. - **Bug Fixes** - Improved error handling in class registration processes. - **Tests** - Implemented a suite of unit tests for the `TableUtils` class to validate partitioned data insertions with schema modifications. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
Summary
Port of our OSS delta lake PR - airbnb/chronon#869. Largely the same aside from delta lake versions. We don't need this immediately atm but we'll need this if we have other users come along that need delta lake (or we need to add support for formats like hudi)
Checklist
Summary by CodeRabbit
New Features
Bug Fixes
Tests
TableUtilsclass to validate partitioned data insertions with schema modifications.