Cherry-pick Adding Timestamp Check to Join Analyzer (#802) #169
Conversation
Walkthrough: This pull request introduces comprehensive timestamp validation checks for GroupBy and Join configurations, ensuring that timestamp columns are non-null and fall within a valid epoch-millisecond range.
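For orientation, here is a minimal, self-contained sketch of the shape of the check this PR adds. It is an illustration only: the real implementation is `runTimestampChecks` in Analyzer.scala, and the object name, column-name parameter, and tuple return type are invented for the sketch; the bounds and aggregation pattern come from the diffs quoted later in this review.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, sum, when}

object TimestampCheckSketch {
  // Epoch-millisecond bounds quoted in the review's diffs:
  // 31536000000L is 1971-01-01 00:00:00 UTC; 4102473599999L is labeled 2099-12-31 there.
  val MinTsMs: Long = 31536000000L
  val MaxTsMs: Long = 4102473599999L

  /** Returns (non-null count, out-of-range count) for `tsCol` over a bounded sample. */
  def timestampStats(df: DataFrame, tsCol: String = "ts", sampleN: Int = 100): (Long, Long) = {
    val row = df
      .limit(sampleN) // cap the scan, mirroring the PR's sample-size clamp
      .agg(
        sum(when(col(tsCol).isNull, lit(0)).otherwise(lit(1))).as("notNullCount"),
        sum(when(col(tsCol).between(MinTsMs, MaxTsMs), lit(0)).otherwise(lit(1))).as("badRangeCount")
      )
      .head()
    // sum(...) is null over an empty input; the review flags this same edge case below.
    if (row.isNullAt(0)) (0L, 0L) else (row.getLong(0), row.getLong(1))
  }
}
```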
Actionable comments posted: 1
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1)
553-591: Assert usage in `validateTimestampChecks`: asserts may terminate the job abruptly. Consider a custom exception or error flag for more controlled handling.
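A minimal sketch of that suggestion, assuming a hypothetical exception type (the PR itself keeps plain asserts):

```scala
// Hypothetical alternative to bare asserts: a dedicated exception type lets
// callers catch validation failures and report them without killing the job.
class TimestampValidationException(message: String) extends RuntimeException(message)

def requireValidTimestamps(condition: Boolean, message: => String): Unit =
  if (!condition) throw new TimestampValidationException(message)
```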
spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (1)
368-405: `getTestGBSourceWithTs` factory: neatly generates data scenarios. Maybe factor out repeated code for clarity.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- docs/source/test_deploy_serve/Test.md (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Analyzer.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: mutation_spark_tests
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_spark_tests
🔇 Additional comments (10)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (3)
213-216: Timestamp checks for GroupBy: good addition. Optionally consider a fallback if `groupBy.inputDf` is empty to avoid a runtime error.
299-301: Timestamp checks for Join: the usage mirrors the GroupBy logic. Looks fine. Maybe unify both calls to reduce duplication.
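For illustration, the "unify both calls" idea could look like the sketch below; the helper name and signatures are hypothetical, inferred only from the calls visible in this review:

```scala
// Hypothetical shared entry point so the GroupBy and Join paths invoke the
// same two-step validation instead of duplicating it at both call sites.
private def checkTimestamps(df: DataFrame, configType: String): Unit = {
  val stats = runTimestampChecks(df)          // sample and aggregate the ts column
  validateTimestampChecks(stats, configType)  // assert on notNullCount / badRangeCount
}
```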
520-551: Consider edge cases in `runTimestampChecks`: use guards for an empty DataFrame to prevent `.head()` or `.limit(sampleN)` from failing unexpectedly.
spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (6)
216-245: `testJoinAnalyzerCheckTimestampHasValues` is concise. Covers the valid timestamp scenario. Good coverage.
248-278: `testJoinAnalyzerCheckTimestampOutOfRange` makes sense as a negative test. Fine.
280-310: `testJoinAnalyzerCheckTimestampAllNulls` is a thorough negative test. Nicely verifies null checks.
312-329: `testGroupByAnalyzerCheckTimestampHasValues` ensures non-null timestamps in GroupBy. Helpful test.
331-347: `testGroupByAnalyzerCheckTimestampAllNulls`: validation is correct. Test is clear.
349-366: `testGroupByAnalyzerCheckTimestampOutOfRange` covers the invalid timestamp range scenario adequately.
docs/source/test_deploy_serve/Test.md (1)
34-36: Timestamp validations doc is clear. Nicely explains why we need valid and non-null timestamps.
```scala
def dataFrameToMap(inputDf: DataFrame): Map[String, String] = {
  val row: Row = inputDf.head()
  val schema = inputDf.schema
  val columns = schema.fieldNames
  val values = row.toSeq
  columns
    .zip(values)
    .map {
      case (column, value) =>
        (column, value.toString)
    }
    .toMap
}
```
dataFrameToMap may fail on empty results
Calling inputDf.head() can throw if the DataFrame has no rows. Safely handle or guard by checking the row count first.
```diff
+if (inputDf.isEmpty) {
+  return Map.empty
+}
 val row: Row = inputDf.head()
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
def dataFrameToMap(inputDf: DataFrame): Map[String, String] = {
  if (inputDf.isEmpty) {
    return Map.empty
  }
  val row: Row = inputDf.head()
  val schema = inputDf.schema
  val columns = schema.fieldNames
  val values = row.toSeq
  columns
    .zip(values)
    .map {
      case (column, value) =>
        (column, value.toString)
    }
    .toMap
}
```
* Update unit tests
* remove dead import
* Update documentation
* refactor validation check
* add comment

---------

Co-authored-by: Praveen Kundurthy <[email protected]>
1a2947c to 803b071
Actionable comments posted: 0
♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1)
592-604: ⚠️ Potential issue: handle empty DataFrame case.
Add null check to prevent NoSuchElementException.
```diff
 def dataFrameToMap(inputDf: DataFrame): Map[String, String] = {
+  if (inputDf.isEmpty) {
+    return Map.empty
+  }
   val row: Row = inputDf.head()
```
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1)
520-551: Extract timestamp range constants. Move the hardcoded timestamp range values to constants for better maintainability.
```diff
+private val MIN_VALID_TIMESTAMP = 31536000000L // 1971-01-01 00:00:00
+private val MAX_VALID_TIMESTAMP = 4102473599999L // 2099-12-31 23:59:59
 def runTimestampChecks(df: DataFrame, sampleNumber: Int = 1000): Map[String, String] = {
   // ...
-  sum(when(col(Constants.TimeColumn).between(31536000000L, 4102473599999L), lit(0)).otherwise(lit(1)))
+  sum(when(col(Constants.TimeColumn).between(MIN_VALID_TIMESTAMP, MAX_VALID_TIMESTAMP), lit(0)).otherwise(lit(1)))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
- docs/source/test_deploy_serve/Test.md (1 hunks)
- spark/src/main/scala/ai/chronon/spark/Analyzer.scala (3 hunks)
- spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- docs/source/test_deploy_serve/Test.md
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: mutation_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: join_spark_tests
🔇 Additional comments (8)
spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (7)
216-246: LGTM! Test follows the AAA pattern and verifies timestamp validation for join analysis.
248-278: LGTM! Test verifies that the analyzer fails for out-of-range timestamps.
280-310: LGTM! Test verifies that the analyzer fails for all-null timestamps.
312-329: LGTM! Test verifies timestamp validation for group-by analysis.
331-347: LGTM! Test verifies that the analyzer fails for all-null timestamps in group-by.
349-366: LGTM! Test verifies that the analyzer fails for out-of-range timestamps in group-by.
368-403: LGTM! Helper function generates test data with different timestamp scenarios.
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1)
553-590: LGTM! Function provides clear error messages and logs validation results.
Actionable comments posted: 0
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (2)
523-552: Consider making the sample size configurable. The hard-coded sample size limit of 100 could be made configurable through the class constructor.
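A sketch of that nitpick under stated assumptions: the real `Analyzer` constructor takes more arguments than shown, and the parameter name is invented.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical: thread the cap through the constructor instead of hard-coding 100.
class AnalyzerSketch(maxTimestampSample: Int = 100) {
  def clampSample(requested: Int): Int =
    math.min(requested, maxTimestampSample) // replaces the hard-coded if/else clamp

  def runTimestampChecks(df: DataFrame, sampleNumber: Int): Map[String, String] = {
    val sampleN = clampSample(sampleNumber)
    // ... same null-count / bad-range aggregation as the PR, over df.limit(sampleN)
    Map("sampledRows" -> sampleN.toString) // placeholder so the sketch compiles
  }
}
```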
538-541: Extract timestamp range constants. Magic numbers for the timestamp validation range should be extracted as named constants.
```diff
+private val MIN_VALID_TIMESTAMP = 31536000000L // 1971-01-01
+private val MAX_VALID_TIMESTAMP = 4102473599999L // 2099-12-31
 sum(when(col(Constants.TimeColumn)
-  .between(31536000000L, 4102473599999L), lit(0))
+  .between(MIN_VALID_TIMESTAMP, MAX_VALID_TIMESTAMP), lit(0))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/main/scala/ai/chronon/spark/Analyzer.scala (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: mutation_spark_tests
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (3)
48-48: LGTM! Import statement correctly includes the required Spark SQL functions for timestamp validation.
214-217: LGTM! Timestamp validation is consistently integrated into both GroupBy and Join analysis workflows.
Also applies to: 300-302
593-605: ⚠️ Potential issue: handle empty DataFrames in `dataFrameToMap`.
The method will fail if the DataFrame is empty.
```diff
 def dataFrameToMap(inputDf: DataFrame): Map[String, String] = {
+  if (inputDf.isEmpty) {
+    return Map.empty
+  }
   val row: Row = inputDf.head()
   val schema = inputDf.schema
   val columns = schema.fieldNames
```
Likely invalid or redundant comment.
Actionable comments posted: 0
🧹 Nitpick comments (4)
spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (4)
274-274: Fix typo in comment.
```diff
-//run analyzer an ensure ts timestamp values result in analyzer passing
+//run analyzer and ensure ts timestamp values result in analyzer passing
```
362-362: Fix incorrect comment.
```diff
-//run analyzer and trigger assertion error when timestamps are all NULL
+//run analyzer and trigger assertion error when timestamps are out of range
```
368-403: Consider making the timestamp manipulation more explicit. The nanosecond conversion could be clearer with a named constant.
```diff
-.withColumn("ts", col("ts")*lit(1000)) // convert to nanoseconds to test analyzer
+val NANOS_MULTIPLIER = 1000
+.withColumn("ts", col("ts")*lit(NANOS_MULTIPLIER)) // convert to nanoseconds to test analyzer
```
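Worth noting: multiplying an epoch-millisecond value by 1000 actually yields microsecond-scale numbers (the comment's "nanoseconds" is loose), but either way any recent timestamp scaled by 1000 lands far above the 4102473599999L upper bound, which is exactly what the out-of-range tests rely on.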
396-401: Consider adding a return type annotation.
```diff
-val out = Builders.Source.events(
+val out: api.Source = Builders.Source.events(
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
- spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: mutation_spark_tests
- GitHub Check: fetcher_spark_tests
- GitHub Check: other_spark_tests
- GitHub Check: table_utils_delta_format_spark_tests
- GitHub Check: join_spark_tests
- GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (3)
spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (3)
28-28: LGTM! Required imports for timestamp manipulation.
216-246: LGTM! Well-structured test for valid timestamp values.
312-366: LGTM! Comprehensive test coverage for GroupBy timestamp validation.
Actionable comments posted: 0
♻️ Duplicate comments (1)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1)
594-606: ⚠️ Potential issue: handle empty DataFrames.
Add a check for empty DataFrames to avoid `NoSuchElementException`.
```diff
 def dataFrameToMap(inputDf: DataFrame): Map[String, String] = {
+  if (inputDf.isEmpty) {
+    return Map.empty
+  }
   val row: Row = inputDf.head()
   val schema = inputDf.schema
   val columns = schema.fieldNames
```
🧹 Nitpick comments (2)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (2)
524-553: Consider extracting magic numbers to constants. Extract the timestamp range values and the sample size limit to named constants for better maintainability.
```diff
+private val MIN_TIMESTAMP_MS = 31536000000L // 1971-01-01 00:00:00
+private val MAX_TIMESTAMP_MS = 4102473599999L // 2099-12-31 23:59:59
+private val MAX_SAMPLE_SIZE = 100
 def runTimestampChecks(df: DataFrame, sampleNumber: Int = 100): Map[String, String] = {
   val hasTimestamp = df.schema.fieldNames.contains(Constants.TimeColumn)
   val mapTimestampChecks = if (hasTimestamp) {
-    val sampleN = if (sampleNumber > 100) { 100 }
+    val sampleN = if (sampleNumber > MAX_SAMPLE_SIZE) { MAX_SAMPLE_SIZE }
     else { sampleNumber }
     dataFrameToMap(
       df.limit(sampleN)
         .agg(
           sum(when(col(Constants.TimeColumn).isNull, lit(0)).otherwise(lit(1)))
             .cast(StringType)
             .as("notNullCount"),
-          sum(when(col(Constants.TimeColumn).between(31536000000L, 4102473599999L), lit(0)).otherwise(lit(1)))
+          sum(when(col(Constants.TimeColumn).between(MIN_TIMESTAMP_MS, MAX_TIMESTAMP_MS), lit(0)).otherwise(lit(1)))
             .cast(StringType)
             .as("badRangeCount")
         )
```
562-592: Consider improving error messages. Add specific examples of valid timestamp ranges to the error messages for better debugging.
```diff
 assert(
   timestampCheckMap("notNullCount") != "0",
   s"""[ERROR]: $configType validation failed.
-     | Please check that source has non-null timestamps.
+     | Please check that source has non-null timestamps in column '${Constants.TimeColumn}'.
      | check notNullCount: ${timestampCheckMap("notNullCount")}
      | """.stripMargin
 )
 assert(
   timestampCheckMap("badRangeCount") == "0",
   s"""[ERROR]: $configType validation failed.
-     | Please check that source has valid epoch millisecond timestamps.
+     | Please check that source has valid epoch millisecond timestamps between 1971-01-01 and 2099-12-31.
      | badRangeCount: ${timestampCheckMap("badRangeCount")}
      | """.stripMargin
 )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (2)
- spark/src/main/scala/ai/chronon/spark/Analyzer.scala (4 hunks)
- spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (3 hunks)
🔇 Additional comments (7)
spark/src/test/scala/ai/chronon/spark/test/AnalyzerTest.scala (7)
217-247: LGTM! Test verifies that the join analyzer accepts valid timestamps.
249-279: LGTM! Test verifies that the join analyzer rejects out-of-range timestamps.
281-311: LGTM! Test verifies that the join analyzer rejects all-null timestamps.
313-330: LGTM! Test verifies that the group-by analyzer accepts valid timestamps.
332-348: LGTM! Test verifies that the group-by analyzer rejects all-null timestamps.
350-367: LGTM! Test verifies that the group-by analyzer rejects out-of-range timestamps.
369-404: LGTM! Helper method creates test sources with different timestamp scenarios.
nikhil-zlai left a comment
LGTM!
* Update unit tests
* remove dead import
* Update documentation
* refactor validation check
* add comment

---------

## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added timestamp validation checks for GroupBy and Join configurations.
  - Implemented validation to ensure timestamp columns are non-null and within the range of 1971-01-01 to 2099-01-01.
- **Tests**
  - Added comprehensive test cases to verify timestamp validation functionality for both Join and GroupBy operations.
- **Documentation**
  - Updated documentation to include new validation guidelines for timestamp configurations.

---------

Co-authored-by: Praveen Kundurthy <[email protected]>
Co-authored-by: Praveen Kundurthy <[email protected]>
Co-authored-by: ezvz <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
…#802) (#169)

* Update unit tests
* remove dead import
* Update documentation
* refactor validation check
* add comment

---------

## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added timestamp validation checks for GroupBy and Join configurations.
  - Implemented validation to ensure timestamp columns are non-null and within the range of 1971-01-01 to 2099-01-01.
- **Tests**
  - Added comprehensive test cases to verify timestamp validation functionality for both Join and GroupBy operations.
- **Documentation**
  - Updated documentation to include new validation guidelines for timestamp configurations.

---------

Co-authored-by: Praveen Kundurthy <[email protected]>
Co-authored-by: Praveen Kundurthy <[email protected]>
Co-authored-by: ezvz <[email protected]>
Update unit tests
remove dead import
Update documentation
refactor validation check
add comment