Flink updates to use TileKey on tiling flow writes #307
Conversation
Walkthrough

The pull request updates several modules to streamline window resolution and tiling functionality. In the aggregator, a method now returns a guaranteed long value instead of an optional. In the API, a new utility method for computing window start times is added. The Flink components are refactored to pass the tiling window size as a mandatory parameter and to adjust tile key processing accordingly, with corresponding changes in the integration tests.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Job as FlinkJob
    participant Codec as TiledAvroCodecFn
    participant Utils as Window/TilingUtils
    participant Test as IntegrationTest
    Job->>Codec: Construct with tilingWindowSizeMs
    Codec->>Utils: Call windowStartMillis(timestamp, windowSize)
    Codec->>Codec: Build and serialize TileKey
    Test->>Utils: Deserialize TileKey from keyBytes
    Test->>Codec: Decode key using keyCodec
```
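The `windowStartMillis` call in the diagram can be sketched as simple modular truncation. This is a hedged sketch, not the actual implementation in `Extensions.scala`; the real signature and object name may differ:

```scala
// Sketch only: computes the start of the tile/window containing a timestamp,
// assuming windowStartMillis is plain modular truncation over epoch millis.
object WindowUtilsSketch {
  def windowStartMillis(timestampMs: Long, windowSizeMs: Long): Long =
    timestampMs - (timestampMs % windowSizeMs)

  def main(args: Array[String]): Unit = {
    val hourMs = 3600000L
    // 1700000123456 truncated to the hour boundary
    println(windowStartMillis(1700000123456L, hourMs)) // 1699999200000
  }
}
```

Every event inside the same window maps to the same start, which is what lets the tiled writes share a stable `TileKey` per (entity, window) pair.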
Force-pushed 1579735 to b03cdcb
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 1
🛑 Comments failed to post (1)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala (1)
453-458:
⚠️ Potential issue: Fix row key construction bug.
The second assignment to baseRowKey overwrites the first one, causing the dayTs to be lost.
```diff
 def buildTiledRowKey(baseKeyBytes: Seq[Byte], dataset: String, ts: Long, tileSizeMs: Long): Array[Byte] = {
   val baseRowKey = s"$dataset#".getBytes(Charset.forName("UTF-8")) ++ baseKeyBytes
   val dayTs = ts - (ts % 1.day.toMillis)
-  baseRowKey ++ s"#$dayTs".getBytes(Charset.forName("UTF-8"))
-  baseRowKey ++ s"#$tileSizeMs".getBytes(Charset.forName("UTF-8"))
+  baseRowKey ++ s"#$dayTs#$tileSizeMs".getBytes(Charset.forName("UTF-8"))
 }
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```scala
def buildTiledRowKey(baseKeyBytes: Seq[Byte], dataset: String, ts: Long, tileSizeMs: Long): Array[Byte] = {
  val baseRowKey = s"$dataset#".getBytes(Charset.forName("UTF-8")) ++ baseKeyBytes
  val dayTs = ts - (ts % 1.day.toMillis)
  baseRowKey ++ s"#$dayTs#$tileSizeMs".getBytes(Charset.forName("UTF-8"))
}
```
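To make the fix concrete, here is an illustrative sketch of the day-bucket arithmetic and the key shape the corrected function should produce. The timestamp and layout annotation are examples, not values from the PR:

```scala
// Sketch: the day-bucket truncation used when building the tiled row key.
object RowKeySketch {
  def main(args: Array[String]): Unit = {
    val dayMs = 24L * 60 * 60 * 1000 // 86400000
    val ts = 1700000123456L          // example event timestamp (epoch millis)
    val dayTs = ts - (ts % dayMs)    // start of the containing UTC day
    println(dayTs)                   // 1699920000000

    // With the fix, both suffixes land in ONE returned key:
    //   <dataset>#<entityKeyBytes>#<dayTs>#<tileSizeMs>
    // Before the fix, the "#<dayTs>" concatenation result was discarded
    // (Array ++ returns a new array; it does not mutate baseRowKey).
  }
}
```

The root cause is that `++` on arrays is pure: evaluating `baseRowKey ++ ...` on its own line produces a value that is thrown away, so only the last expression was returned.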
Force-pushed b03cdcb to a6053a3
Actionable comments posted: 0
🧹 Nitpick comments (1)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)
165-186: Consider performance and logging improvements. Two suggestions:
- Avoid unnecessary List conversion of entityKeyBytes
- Include TileKey details in debug logging
```diff
-tileKey.setKeyBytes(entityKeyBytes.toList.asJava.asInstanceOf[java.util.List[java.lang.Byte]])
+// Use Arrays.asList for better performance
+tileKey.setKeyBytes(java.util.Arrays.asList(entityKeyBytes: _*).asInstanceOf[java.util.List[java.lang.Byte]])
 logger.debug(
   s"""
      |Avro converting tile to PutRequest - tile=${in}
      |groupBy=${groupByServingInfoParsed.groupBy.getMetaData.getName} tsMills=$tsMills keys=$keys
      |keyBytes=${java.util.Base64.getEncoder.encodeToString(entityKeyBytes)}
      |valueBytes=${java.util.Base64.getEncoder.encodeToString(valueBytes)}
-     |streamingDataset=$streamingDataset""".stripMargin
+     |streamingDataset=$streamingDataset
+     |tileKey=$tileKey""".stripMargin
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala
- aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- api/src/main/scala/ai/chronon/api/Extensions.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: no_spark_scala_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (2)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (2)
6-10: LGTM! Required imports added for TileKey integration.
132-135: LGTM! Constructor updated to include the required tiling window size.
```scala
val tileKey = new TileKey()
val tileStart = WindowUtils.windowStartMillis(tsMills, tilingWindowSizeMs)
tileKey.setDataset(streamingDataset)
tileKey.setKeyBytes(entityKeyBytes.toList.asJava.asInstanceOf[java.util.List[java.lang.Byte]])
```
might be better to box individually? e.g. `keyBytes.map(java.lang.Byte.valueOf).toList.asJava`
I went with this to avoid iterating over all the bytes in the key bytes list: since Scala and Java bytes are equivalent at runtime, we can do a constant-time cast of the top-level type instead.
Seems to work in unit tests. Wdyt?
Okay, I thought it would have thrown an exception, but if it works in unit tests that's good.
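For context, a minimal sketch of the two conversion approaches discussed above. This is illustrative only; `keyBytes` is a stand-in name, and the import assumes Scala 2.13 (older versions use `scala.collection.JavaConverters`):

```scala
import scala.jdk.CollectionConverters._

object BoxingSketch {
  def main(args: Array[String]): Unit = {
    val keyBytes: Array[Byte] = Array(1, 2, 3)

    // Option 1 (as in the PR): cast the converted list. This works at runtime
    // because JVM generics are erased and a boxed scala.Byte IS a
    // java.lang.Byte, so no per-element work happens beyond toList.
    val cast: java.util.List[java.lang.Byte] =
      keyBytes.toList.asJava.asInstanceOf[java.util.List[java.lang.Byte]]

    // Option 2 (reviewer's suggestion): box each element explicitly.
    // Same result, but the element-type change is visible in the code.
    val boxed: java.util.List[java.lang.Byte] =
      keyBytes.map(java.lang.Byte.valueOf).toList.asJava

    println(cast == boxed) // true: element-wise equal lists
  }
}
```

This is why the cast does not throw at runtime: the `ClassCastException` one might expect from `asInstanceOf` never materializes, because the erased type of both lists is just `java.util.List`.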
```diff
 /**
- * Find the smallest tail window resolution in a GroupBy. Returns None if the GroupBy does not define any windows.
+ * Find the smallest tail window resolution in a GroupBy. Returns 1D if the GroupBy does not define any windows (all-time aggregates).
```
can you expand on this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, so if a user creates a GroupBy with no windows, we end up defaulting to all-time windows in Chronon. Currently the Flink code throws an error in this scenario (we return None here, and when FlinkJob calls .get it errors out). Instead of failing, we go with a 1D tile size, as that's the best option to help us compute the all-time window (batch covers the rest).
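The before/after behavior described above can be sketched as follows. Names and the `Seq[Long]` representation of window sizes are illustrative, not the actual Chronon API:

```scala
// Sketch: smallest tail window resolution with a 1-day fallback for
// windowless GroupBys (all-time aggregates), instead of Option + .get.
object ResolutionSketch {
  val OneDayMs: Long = 24L * 60 * 60 * 1000

  // Before (sketch): Option[Long], and the caller's .get threw on None.
  def smallestTailOpt(windowMillis: Seq[Long]): Option[Long] =
    if (windowMillis.isEmpty) None else Some(windowMillis.min)

  // After (sketch): guaranteed Long, defaulting to one day.
  def smallestTail(windowMillis: Seq[Long]): Long =
    if (windowMillis.isEmpty) OneDayMs else windowMillis.min

  def main(args: Array[String]): Unit = {
    println(smallestTail(Seq.empty))                 // 86400000 (1D fallback)
    println(smallestTail(Seq(3600000L, OneDayMs)))   // 3600000
  }
}
```

This matches the walkthrough note that the aggregator method "now returns a guaranteed long value instead of an optional."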
## Summary

Update the Flink job code on the tiling path to use the TileKey. I haven't wired up the KV store side of things yet (the write and read sides of the KV store can be done collaboratively with Thomas, as they need to go together to keep the tests happy).

The tiling version of the Flink job isn't in use, so these changes should be safe to go and keep things incremental.

## Checklist

- [ ] Added Unit Tests
- [X] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added a utility to determine the start timestamp for a defined time window.
- **Refactor/Enhancements**
  - Streamlined time window handling by providing a default one-day resolution when none is specified.
  - Improved tiled data processing with consistent tiling window sizing and enriched metadata management.
- **Tests**
  - Updated integration tests to validate the new tile processing and time window behavior.