feat: implement external source sensor ZIP-726 #967
Conversation
Walkthrough

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant BatchNodeRunner
    participant MetaData
    participant TableUtils
    User->>BatchNodeRunner: run(metadata, conf, range, tableUtils)
    alt conf is EXTERNAL_SOURCE_SENSOR
        BatchNodeRunner->>BatchNodeRunner: checkPartitions(conf, metadata, tableUtils, range)
        BatchNodeRunner->>MetaData: get partition spec for table
        BatchNodeRunner->>TableUtils: verify partitions exist
        alt partitions missing & retries left
            BatchNodeRunner->>BatchNodeRunner: wait retryInterval, retry
        end
        alt all partitions exist
            BatchNodeRunner-->>User: exit(0)
        else retries exhausted
            BatchNodeRunner-->>User: log error, exit(1)
        end
    else
        BatchNodeRunner->>User: handle other node types (unchanged)
    end
```
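The diagram compresses the sensor flow; below is a minimal, self-contained Scala sketch of the same control flow. Everything here is a stand-in for illustration, not the PR's code — the real `BatchNodeRunner` works against Chronon's `MetaData` and `TableUtils`, and the retry loop is reduced to a counter.

```scala
// Self-contained sketch of the control flow in the diagram above.
// All types are hypothetical stand-ins.
object SensorFlowSketch {
  sealed trait NodeContent
  case object ExternalSourceSensor extends NodeContent
  case object OtherNode extends NodeContent

  // Returns the process exit code the diagram describes.
  def run(conf: NodeContent, partitionsExist: () => Boolean, maxRetries: Int): Int =
    conf match {
      case ExternalSourceSensor =>
        // Poll until the upstream partitions land or retries run out.
        val found = (0 to maxRetries).exists { attempt =>
          if (attempt > 0) Thread.sleep(10) // stand-in for retryInterval
          partitionsExist()
        }
        if (found) 0 else 1 // exit(0) when all partitions exist, exit(1) when exhausted
      case OtherNode =>
        0 // other node types handled as before (unchanged)
    }
}
```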
Estimated code review effort: 2 (~20 minutes)

Possibly related PRs
Suggested reviewers
Poem
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (1)
56-71: Consider error handling for missing table in metadata.

The `find()` operation could return `None` if the table isn't found in `tableDependencies`, leading to `spec = None`. While `tableUtils.partitions()` accepts `Option[PartitionSpec]`, consider logging a warning when the table isn't found in metadata.

```diff
 val spec = metadata.executionInfo.tableDependencies.asScala
   .find(_.tableInfo.table == tableName)
   .map(_.tableInfo.partitionSpec(tableUtils.partitionSpec))
+if (spec.isEmpty) {
+  logger.warn(s"Table ${tableName} not found in metadata dependencies")
+}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala` (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
`partitionSpec` (1206-1211)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: batch_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (1)
144-144: LGTM on metadata extraction. Follows the established pattern for other node content types.
Force-pushed 2d05dd3 to f3a1b00
Force-pushed ad2b0d0 to d39ad94
Force-pushed 7f4d220 to eeb66aa
Force-pushed d39ad94 to 0fc488c
Force-pushed 46fb39b to e2323c7
Force-pushed 0fc488c to 0015f0d
Force-pushed b3f473c to 66af081
Force-pushed 66af081 to 40854bf
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- `spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala` (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (11)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:29-30
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the codebase, the KVStore implementation provides an implicit ExecutionContext in scope, so it's unnecessary to import another.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: nikhil-zlai
PR: #70
File: service/src/main/java/ai/chronon/service/ApiProvider.java:6-6
Timestamp: 2024-12-03T04:04:33.809Z
Learning: The import scala.util.ScalaVersionSpecificCollectionsConverter in service/src/main/java/ai/chronon/service/ApiProvider.java is correct and should not be flagged in future reviews.
Learnt from: chewy-zlai
PR: #62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit ExecutionContext parameter can cause serialization issues. In such cases, it's acceptable to use scala.concurrent.ExecutionContext.Implicits.global.
Learnt from: chewy-zlai
PR: #47
File: online/src/main/scala/ai/chronon/online/MetadataStore.scala:232-0
Timestamp: 2024-10-17T00:12:09.763Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/KVStore.scala, there are two create methods: def create(dataset: String): Unit and def create(dataset: String, props: Map[String, Any]): Unit. The version with props ignores the props parameter, and the simpler version without props is appropriate when props are not needed.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #44
File: hub/app/store/DynamoDBMonitoringStore.scala:98-143
Timestamp: 2024-10-15T15:30:15.514Z
Learning: In the Scala file hub/app/store/DynamoDBMonitoringStore.scala, within the makeLoadedConfs method, the .recover method is correctly applied to the Try returned by response.values to handle exceptions from the underlying store.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-07T15:21:50.787Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: #53
File: hub/app/controllers/TimeSeriesController.scala:224-224
Timestamp: 2024-10-29T15:21:58.102Z
Learning: In the mocked data implementation in hub/app/controllers/TimeSeriesController.scala, potential NumberFormatException exceptions due to parsing errors (e.g., when using val featureId = name.split("_").last.toInt) are acceptable and will be addressed when adding the concrete backend.
Learnt from: tchow-zlai
PR: #263
File: cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala:29-60
Timestamp: 2025-01-24T23:55:30.256Z
Learning: In BigQuery integration, table existence check is performed outside the BigQueryFormat.createTable method, at a higher level in TableUtils.createTable.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: service_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: cloud_gcp_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: service_commons_tests
- GitHub Check: streaming_tests
- GitHub Check: analyzer_tests
- GitHub Check: groupby_tests
- GitHub Check: batch_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala (3)
8-15: LGTM on import additions. Clean addition of required imports for external source sensor functionality.

23-23: LGTM on TimeUnit import. Required for retry interval functionality.
56-90: Well-structured retry mechanism with solid error handling. The `checkPartitions` method correctly implements (sketched after the list):
- Configurable retry count and interval
- Proper partition validation logic
- Clean recursive retry pattern
- Appropriate logging
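A minimal sketch of such a recursive retry, with the signature reduced to its essentials; the real method takes the sensor conf, metadata, a `TableUtils`, and a partition range, so the names and defaults below are assumptions for illustration.

```scala
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec

// Hypothetical stand-in for the retry pattern the review describes.
object PartitionSensorSketch {
  // Returns true once every expected partition is visible, false after
  // retries are exhausted. listPartitions stands in for the TableUtils call.
  @tailrec
  def checkPartitions(expected: Set[String],
                      listPartitions: () => Set[String],
                      retriesLeft: Int,
                      retryIntervalMinutes: Long): Boolean = {
    val missing = expected -- listPartitions()
    if (missing.isEmpty) true
    else if (retriesLeft <= 0) {
      println(s"Retries exhausted; still missing: ${missing.mkString(", ")}")
      false
    } else {
      println(s"${missing.size} partition(s) missing, retrying in $retryIntervalMinutes min")
      TimeUnit.MINUTES.sleep(retryIntervalMinutes)
      checkPartitions(expected, listPartitions, retriesLeft - 1, retryIntervalMinutes)
    }
  }
}
```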
Actionable comments posted: 0
🧹 Nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala (1)
492-513: Retry behavior test needs timing verification improvement. The timing assertion on line 511 may be flaky due to system variance. Consider a more generous upper bound or removing it entirely since the retry interval is set to 0.

```diff
-assertTrue("Test should complete within reasonable time", (endTime - startTime) < 5000)
+assertTrue("Test should complete within reasonable time", (endTime - startTime) < 10000)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala` (3 hunks)
- `spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala` (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- spark/src/main/scala/ai/chronon/spark/batch/BatchNodeRunner.scala
🧰 Additional context used
🧠 Learnings (1)
spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala (7)
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala:175-175
Timestamp: 2024-10-07T15:09:51.567Z
Learning: Hardcoding future timestamps in tests within DynamoDBKVStoreTest.scala is acceptable when data is generated and queried within the same time range, ensuring the tests remain valid over time.
Learnt from: piyush-zlai
PR: #33
File: cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala:29-30
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the codebase, the KVStore implementation provides an implicit ExecutionContext in scope, so it's unnecessary to import another.
Learnt from: chewy-zlai
PR: #50
File: spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala:19-28
Timestamp: 2024-10-31T18:29:45.027Z
Learning: In MockKVStore located at spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala, the multiPut method is intended to be a simple implementation without dataset existence validation, duplicate validation logic elimination, or actual storage of key-value pairs for verification.
Learnt from: chewy-zlai
PR: #47
File: online/src/main/scala/ai/chronon/online/MetadataStore.scala:232-0
Timestamp: 2024-10-17T00:12:09.763Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/KVStore.scala, there are two create methods: def create(dataset: String): Unit and def create(dataset: String, props: Map[String, Any]): Unit. The version with props ignores the props parameter, and the simpler version without props is appropriate when props are not needed.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-07T15:21:50.787Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: piyush-zlai
PR: #33
File: online/src/main/scala/ai/chronon/online/Api.scala:69-69
Timestamp: 2024-10-08T16:18:45.669Z
Learning: In the KVStore trait located at online/src/main/scala/ai/chronon/online/Api.scala, the default implementation of the create method (def create(dataset: String, props: Map[String, Any]): Unit = create(dataset)) doesn't leverage the props parameter, but subclasses like DynamoDBKVStoreImpl use the props parameter in their overridden implementations.
Learnt from: chewy-zlai
PR: #62
File: spark/src/main/scala/ai/chronon/spark/stats/drift/SummaryUploader.scala:9-10
Timestamp: 2024-11-06T21:54:56.160Z
Learning: In Spark applications, when defining serializable classes, passing an implicit ExecutionContext parameter can cause serialization issues. In such cases, it's acceptable to use scala.concurrent.ExecutionContext.Implicits.global.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: batch_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: api_tests
- GitHub Check: service_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
spark/src/test/scala/ai/chronon/spark/test/batch/BatchNodeRunnerTest.scala (5)
23-23: Import addition looks good. The `ExternalSourceSensorNode` import is correctly added for the new test functionality.

433-450: Test setup and success case validation are correct. The test properly validates the happy path where all partitions exist in the source table.

452-471: Missing partition failure test is well-implemented. Good coverage of the failure scenario with appropriate assertions checking the exception message content.

473-490: Default retry values test is appropriate. Validates that default behavior works when retry parameters aren't explicitly set.

515-538: Non-existent table error handling test is comprehensive. Good test coverage for edge case with flexible assertion that handles different possible error messages.
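For illustration only, a hypothetical JUnit-style test in the spirit of the retry-exhaustion and timing cases above, driven against the `PartitionSensorSketch` stand-in from earlier rather than a real Spark table; the assertion messages and timing bound are assumptions.

```scala
import org.junit.Assert.{assertFalse, assertTrue}
import org.junit.Test

class SensorRetrySketchTest {
  @Test
  def retriesExhaustQuicklyWithZeroInterval(): Unit = {
    val start = System.currentTimeMillis()
    val result = PartitionSensorSketch.checkPartitions(
      expected = Set("2024-01-01"),
      listPartitions = () => Set.empty, // the partition never lands
      retriesLeft = 3,
      retryIntervalMinutes = 0) // zero interval keeps the test fast
    val elapsed = System.currentTimeMillis() - start
    assertFalse("sensor should fail once retries are exhausted", result)
    assertTrue("zero interval should finish well under the bound", elapsed < 5000)
  }
}
```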
Summary
Checklist
Summary by CodeRabbit