-
Notifications
You must be signed in to change notification settings - Fork 8
wip: run join tests in parallel #79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes in Changes
Possibly related PRs
Suggested reviewers
Poem
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (20)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (20)
59-66: Consider cleaning up test databases after executionThe
withFixturemethod creates a new database for each test but does not clean it up afterwards. This could lead to an accumulation of test databases over time.You might want to modify the
withFixturemethod to drop the database after the test runs:override def withFixture(test: OneArgTest) = { val namespace: FixtureParam = f"test_namespace_jointest__${test.name.split(" ").mkString("_")}" tableUtils.createDatabase(namespace) + try { test(namespace) + } finally { + tableUtils.dropDatabase(namespace) + } }
129-129: UnnecessarytoStringcall onnamespaceThe
namespaceparameter is already aString. Callingnamespace.toStringis redundant.Apply this diff to simplify the code:
Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace.toString, team = "chronon") + Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace, team = "chronon")
157-157: UnnecessarytoStringcall onnamespaceThe
namespaceis aString; callingtoStringis unnecessary.Apply this diff:
Builders.MetaData(name = "test.user_transaction_features", namespace = namespace.toString, team = "chronon") + Builders.MetaData(name = "test.user_transaction_features", namespace = namespace, team = "chronon")
296-296: UnnecessarytoStringcall onnamespaceAvoid redundant
toStringon aStringvariable.Apply this diff:
metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace.toString) + metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace)
315-315: UnnecessarytoStringcall onnamespaceThe
namespacevariable is already aString.Apply this diff:
metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace.toString) + metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace)
328-328: UnnecessarytoStringcall onnamespaceSimplify the
namespaceassignment.Apply this diff:
metaData = Builders.MetaData(name = "test.country_features", namespace = namespace.toString, team = "chronon") + metaData = Builders.MetaData(name = "test.country_features", namespace = namespace, team = "chronon")
413-413: UnnecessarytoStringcall onnamespaceRemove the redundant
toStringmethod.Apply this diff:
metaData = Builders.MetaData(name = "test.country_weights_no_backfill", namespace = namespace.toString) + metaData = Builders.MetaData(name = "test.country_weights_no_backfill", namespace = namespace)
427-427: UnnecessarytoStringonnamespaceApply this diff:
namespace = namespace.toString, + namespace = namespace,
468-468: UnnecessarytoStringcall onnamespaceSimplify the
metaDataconstruction.Apply this diff:
metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace.toString), + metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace),
485-485: UnnecessarytoStringcall onnamespaceApply this diff:
Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace.toString, team = "chronon") + Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace, team = "chronon")
542-542: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace.toString) + metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace)
732-732: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = "test.user_features", namespace = namespace.toString, team = "chronon") + metaData = Builders.MetaData(name = "test.user_features", namespace = namespace, team = "chronon")
985-986: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = "test.item_snapshot_bloom_test", namespace = namespace.toString, team = "chronon") + metaData = + Builders.MetaData(name = "test.item_snapshot_bloom_test", namespace = namespace, team = "chronon")
1035-1037: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix", namespace = namespace.toString, team = "item_team"), + metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix", + namespace = namespace, + team = "item_team"),
1044-1046: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = s"test.item_temporal_features$nameSuffix", namespace = namespace.toString, team = "item_team") + metaData = Builders.MetaData(name = s"test.item_temporal_features$nameSuffix", + namespace = namespace, + team = "item_team")
1144-1146: UnnecessarytoStringcall onnamespaceApply this diff:
Builders.MetaData(name = "unit_test.key_overlap.user_features", namespace = namespace.toString, team = "chronon") + Builders.MetaData(name = "unit_test.key_overlap.user_features", + namespace = namespace, + team = "chronon")
1198-1198: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_1", namespace = namespace.toString, team = "item_team"), + metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_1", + namespace = namespace, + team = "item_team"),
1214-1214: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_2", namespace = namespace.toString, team = "item_team"), + metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_2", + namespace = namespace, + team = "item_team"),
1230-1230: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_3", namespace = namespace.toString, team = "item_team"), + metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_3", + namespace = namespace, + team = "item_team"),
1244-1246: UnnecessarytoStringcall onnamespaceApply this diff:
metaData = Builders.MetaData(name = "unit_test.item_temporal_features.selected_join_parts", namespace = namespace.toString, team = "item_team", online = true) + metaData = Builders.MetaData(name = "unit_test.item_temporal_features.selected_join_parts", + namespace = namespace, + team = "item_team", + online = true)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala(36 hunks)
🔇 Additional comments (7)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (7)
42-43: Imports added for fixture support and parallel execution
The added imports for FixtureAnyFunSuite and ParallelTestExecution are necessary for the updated test structure.
49-49: Ensure thread safety with ParallelTestExecution
By extending ParallelTestExecution, tests will run in parallel. Please verify that shared resources, such as databases or file systems, are properly synchronized to avoid race conditions or data inconsistencies.
70-70: Update test methods to accept namespace parameter
The test method now accepts a namespace parameter, enhancing test isolation and allowing dynamic namespace creation for each test case.
907-907: Update method to accept namespace parameter
The getEventsEventsTemporal method now accepts a namespace parameter, allowing for dynamic namespace usage within the method.
868-868: Update method to accept namespace parameter
The getViewsGroupBy method now includes a namespace parameter, enhancing flexibility in specifying namespaces.
651-651: Update method to accept namespace parameter
Method getGroupByForIncrementalSourceTest now accepts namespace as a parameter, allowing for better control over test namespaces.
927-927: Update test method to accept namespace parameter
The test method now uses the namespace parameter, ensuring consistency across test cases.
| test("test migration") { _ => | ||
| // Left | ||
|
|
||
| val namespace = "test_namespace_jointest" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent use of namespace parameter
In the test("test migration") method, the namespace parameter is ignored ({ _ =>) and a local variable namespace is hardcoded. For consistency and to leverage the dynamic namespace generation, consider using the fixture parameter.
Apply this diff to use the fixture parameter:
- test("test migration") { _ =>
-
- val namespace = "test_namespace_jointest"
+ test("test migration") { namespace =>
+
+ // `namespace` is provided by the fixture📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| test("test migration") { _ => | |
| // Left | |
| val namespace = "test_namespace_jointest" | |
| test("test migration") { namespace => | |
| // `namespace` is provided by the fixture | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (10)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (10)
126-127: Redundant.toStringonnamespaceThe
namespacevariable is already of typeString; calling.toStringis unnecessary.Apply this diff:
- Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace.toString, team = "chronon") + Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace, team = "chronon")
154-155: Redundant.toStringonnamespaceApply this diff:
- Builders.MetaData(name = "test.user_transaction_features", namespace = namespace.toString, team = "chronon") + Builders.MetaData(name = "test.user_transaction_features", namespace = namespace, team = "chronon")
293-294: Redundant.toStringonnamespaceApply this diff:
- metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace.toString) + metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace)
312-313: Redundant.toStringonnamespaceApply this diff:
- metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace.toString) + metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace)
425-425: Redundant.toStringonnamespaceApply this diff:
- metaData = Builders.MetaData(name = "test.country_no_historical_backfill", - namespace = namespace.toString, - team = "chronon", - historicalBackfill = false) + metaData = Builders.MetaData(name = "test.country_no_historical_backfill", + namespace = namespace, + team = "chronon", + historicalBackfill = false)
483-483: Redundant.toStringonnamespaceApply this diff:
- Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace.toString, team = "chronon") + Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace, team = "chronon")
540-540: Redundant.toStringonnamespaceApply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace.toString) + metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace)
594-594: Redundant.toStringonnamespacein Method CallIn the method call
getViewsGroupBy(namespace.toString, suffix = "cumulative", makeCumulative = true),namespaceis already aString, so calling.toStringis unnecessary.Apply this diff:
- val viewsGroupBy = getViewsGroupBy(namespace.toString, suffix = "cumulative", makeCumulative = true) + val viewsGroupBy = getViewsGroupBy(namespace, suffix = "cumulative", makeCumulative = true)
1033-1035: Redundant.toStringonnamespaceApply this diff:
- metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix", - namespace = namespace.toString, - team = "item_team") + metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix", + namespace = namespace, + team = "item_team")
1142-1144: Redundant.toStringonnamespaceApply this diff:
- metaData = Builders.MetaData(name = "unit_test.key_overlap.user_features", - namespace = namespace.toString, - team = "chronon") + metaData = Builders.MetaData(name = "unit_test.key_overlap.user_features", + namespace = namespace, + team = "chronon")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
.github/workflows/test_scala_and_python.yaml(1 hunks)spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala(36 hunks)
🔇 Additional comments (3)
.github/workflows/test_scala_and_python.yaml (1)
87-87: Verify parallel test execution configuration
The change introduces parallel test execution with 8 threads (-P8). While this can improve test execution speed, please ensure:
- The tests are truly independent and don't share mutable state
- The memory settings (
-Xmx8G) are sufficient for parallel execution - The CI environment can handle the increased load
Run this script to check for potential test dependencies and shared resources:
Consider the following improvements:
- Scale the number of parallel threads (
-P8) based on the CI environment's CPU cores - Add memory monitoring to detect potential OOM issues
- Consider moving this job to
ubuntu-8_cores-32_gbrunner likeother_spark_testsif memory becomes a constraint
✅ Verification successful
Based on the results, I can now analyze the test isolation patterns. Let me check the specific implementation of JoinTest to verify thread safety:
Based on the code analysis, I can now provide a definitive response about the parallel test execution safety:
Parallel test execution is safe with current implementation
The JoinTest implementation is thread-safe for parallel execution because:
-
It extends
ParallelTestExecutiontrait and usesFixtureAnyFunSuitewith proper test isolation:- Each test gets its own namespace via
test_namespace_jointest__${test.name} - Database and tables are created fresh for each test via
withFixture
- Each test gets its own namespace via
-
No shared mutable state between tests:
- SparkSession is immutable and thread-safe
- TableUtils is effectively immutable
- Each test operates in its own database namespace
- Test data is generated uniquely per test
-
Resource isolation:
- Each test creates and manages its own tables in isolated namespaces
- The
-Xmx8Gheap setting is sufficient for parallel execution with 8 threads given the test data volumes
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential shared resources in JoinTest
# Look for shared resource patterns in JoinTest
ast-grep --pattern 'object $OBJECT {
$$$
val $FIELD = $$$
$$$
}' | grep -B2 -A2 'JoinTest'
# Check for static/shared database connections
rg -A 5 'Connection|DataSource|Database'
Length of output: 34305
Script:
#!/bin/bash
# Check JoinTest implementation for thread safety patterns
ast-grep --pattern 'class JoinTest extends $_ {
$$$
}'
# Look for shared state patterns in test setup
rg -A 10 "def beforeEach|def beforeAll|@Before|@BeforeClass" spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala
Length of output: 100378
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (2)
1050-1053: Inconsistent use of namespace parameter
63-63:
Syntax Error: Incorrect String Interpolation
In Scala, string interpolation uses the s prefix instead of f. Replace f with s to fix the syntax error.
Apply this diff to correct the syntax:
- val namespace: FixtureParam = f"test_namespace_jointest__${test.name.split(" ").mkString("_")}"
+ val namespace: FixtureParam = s"test_namespace_jointest__${test.name.split(" ").mkString("_")}"Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (1)
126-127: Redundant.toStringCall on String VariablenamespaceThe
namespacevariable is already of typeString, so calling.toStringon it is unnecessary. This occurs in multiple places. Consider removing.toStringto improve code clarity.Apply this change across the affected lines:
- namespace = namespace.toString + namespace = namespaceAlso applies to: 154-155, 273-274, 294-294, 313-313, 326-326, 411-411, 425-425, 466-466, 482-483, 540-540, 594-594, 596-596, 730-730, 771-771, 796-797, 918-918, 926-926, 968-968, 983-984, 1033-1035, 1042-1044, 1142-1144, 1196-1196, 1212-1212, 1228-1228, 1242-1245
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala(36 hunks)
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (2)
49-49: Ensure Thread Safety with Parallel Test Execution
By extending ParallelTestExecution, tests will run concurrently. Please verify that shared resources like spark and tableUtils are thread-safe or appropriately synchronized to prevent race conditions.
1050-1053: Inconsistent Use of namespace Parameter
This issue has been previously raised: In the test("test migration") method, the namespace parameter is ignored ({ _ =>), and a local variable namespace is hardcoded. For consistency and to leverage the dynamic namespace generation, consider using the fixture parameter.
| val spark: SparkSession = SparkSessionBuilder.build("JoinTest", local = true) | ||
| private implicit val tableUtils = TableUtils(spark) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential Thread Safety Issue with Shared spark and tableUtils Instances
The spark and tableUtils instances are shared across all tests. Since tests now run in parallel, this could lead to concurrency issues if these instances are not thread-safe. Consider instantiating spark and tableUtils within each test or ensuring they are safe for concurrent use.
Apply this diff to instantiate spark and tableUtils within each test:
- val spark: SparkSession = SparkSessionBuilder.build("JoinTest", local = true)
- private implicit val tableUtils = TableUtils(spark)
+ // Remove shared instances; create per-test instances in 'withFixture' or individual tests.Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (3)
60-67: Consider adding cleanup for test namespaces.The fixture implementation correctly creates unique namespaces for test isolation. However, there's no cleanup of these namespaces after test execution, which could lead to accumulation of test databases over time.
Consider adding cleanup in the fixture:
override def withFixture(test: OneArgTest): Outcome = { val namespace: FixtureParam = f"test_namespace_jointest__${test.name.split(" ").mkString("_")}" tableUtils.createDatabase(namespace) - test(namespace) + try { + test(namespace) + } finally { + tableUtils.dropDatabase(namespace, cascade = true) + } }
Line range hint
69-1051: Consider extracting common test patterns into helper methods.The test methods follow similar patterns for setup and verification. Consider extracting these common patterns into helper methods to improve maintainability and reduce code duplication:
- Table setup with DataFrameGen
- Join configuration creation
- Result verification with diff comparison
Example helper method:
private def verifyJoinResults(computed: DataFrame, expected: DataFrame, keyColumns: List[String]): Unit = { val diff = Comparison.sideBySide(computed, expected, keyColumns) if (diff.count() > 0) { println(s"Actual count: ${computed.count()}") println(s"Expected count: ${expected.count()}") println(s"Diff count: ${diff.count()}") diff.show() } assertEquals(0, diff.count()) }Also applies to: 1051-1100
Line range hint
42-1275: Good implementation of parallel test execution with proper test isolation.The changes successfully achieve the PR objective of running join tests in parallel while maintaining test isolation through:
- Proper use of ScalaTest's parallel execution support
- Test isolation using unique namespaces per test
- Consistent fixture parameter usage
Some architectural considerations:
- The parallel execution might increase resource usage during test runs
- Test isolation through namespaces provides good separation but requires proper cleanup
- Consider monitoring test execution times to ensure parallel execution provides expected benefits
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
.github/workflows/test_scala_and_python.yaml(1 hunks)spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala(36 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- .github/workflows/test_scala_and_python.yaml
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (2)
42-50: LGTM! Good implementation of parallel test execution.
The changes correctly implement parallel test execution by:
- Adding necessary imports for parallel testing and fixture support
- Extending
FixtureAnyFunSuiteand mixing inParallelTestExecution
Line range hint 867-922: LGTM! Well-structured helper methods.
The helper methods are well-implemented with:
- Clear separation of concerns
- Proper parameter handling
- Consistent namespace usage
Also applies to: 906-922
| import scala.util.ScalaJavaConversions.ListOps | ||
|
|
||
| // Run as follows: sbt "spark/testOnly -- -n jointest" | ||
| class JoinTest extends AnyFunSuite with TaggedFilterSuite { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the challenge with yanking the tagged filter suite iirc is that this test will get triggered as part of the other spark tests step ('sbt "spark/testOnly"'). For this one we don't want the dual run right?
| run: | | ||
| export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED" | ||
| sbt "spark/testOnly -- -n jointest" | ||
| sbt "spark/testOnly ai.chronon.spark.test.JoinTest -- -P8" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't the runs-on: ubuntu-latest single core?
Summary
Checklist
Summary by CodeRabbit