
Conversation


@varant-zlai varant-zlai commented Jan 29, 2025

Summary

Cherry picking: airbnb/chronon#767 (review)

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced DataFrame null value handling with more flexible filtering options
    • Improved join operations with clearer key matching and column selection
  • Improvements

    • Added Kryo serialization support for BitSet class
    • Refined join and temporal entity processing methods
  • Testing

    • Added new test case for join operations with partial keys

These updates improve data processing flexibility and serialization capabilities in the Spark data processing framework.

ken-zlai and others added 30 commits September 24, 2024 09:33
Basic service scaffolding using Scala Play framework
# Conflicts:
#	build.sbt
#	hub/app/controllers/FrontendController.scala
#	hub/app/views/index.scala.html
#	hub/public/images/favicon.png
Focus on the fraud case for the demo; name the parquet file data.parquet
Adds a readme and keeps the Docker container active so the parquet table can be accessed.
Create a Svelte project and build it using Play for deployment
tchow-zlai and others added 24 commits January 23, 2025 12:27
## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Bug Fixes**
- Improved BigQuery partition loading mechanism to use more direct
method calls when retrieving partition columns and values.

Note: The changes appear technical and primarily affect internal data
loading processes, with no direct end-user visible impact.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Chores**
	- Updated Java version to Corretto 11.0.25.9.1
	- Upgraded Google Cloud SDK (gcloud) from version 504.0.1 to 507.0.0
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
## Summary
We have a strict dependency on Java 11 for all the Dataproc stuff, so
it's good to be consistent across our project. Currently only the
service_commons package has a strict dependency on Java 17, so I made
changes to be compatible with Java 11.

I was able to successfully build using both sbt and Bazel.

## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Configuration**
- Downgraded Java language and runtime versions from 17 to 11 in Bazel
configuration

- **Code Improvement**
- Updated type handling in `RouteHandlerWrapper` method signature for
enhanced type safety

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
Set up a Flink job that can take beacon data as Avro (configured in GCS)
and emit it at a configurable rate to Kafka. We can use this stream in
our GB streaming jobs.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested
Kicked off the job and you can see events flowing in topic
[test-beacon-main](https://console.cloud.google.com/managedkafka/us-central1/clusters/zipline-kafka-cluster/topics/test-beacon-main?hl=en&invt=Abnpeg&project=canary-443022)

- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **New Features**
  - Added Kafka data ingestion capabilities using Apache Flink.
- Introduced a new driver for streaming events from GCS to Kafka with
configurable delay.

- **Dependencies**
  - Added Apache Flink connectors for Kafka, Avro, and file integration.
- Integrated managed Kafka authentication handler for cloud
environments.

- **Infrastructure**
  - Created new project configuration for Kafka data ingestion.
  - Updated build settings to support advanced streaming workflows.
  - Updated cluster name configuration for Dataproc submitter.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary

- The way tables are created can differ depending on the underlying
storage catalog that we are using. In the case of BigQuery, we cannot
issue a Spark SQL statement to do this operation. Let's abstract that
out into the `Format` layer for now, but eventually we will need a
`Catalog` abstraction that supports this.
- Try to remove the dependency on a SparkSession in `Format`; invert the
dependency with a higher-order function (HOF), as sketched after this list.
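
As a rough illustration of the inversion described above, a format can build the DDL and hand it to a caller-supplied runner instead of holding a SparkSession itself. This is a minimal sketch with illustrative names, not the actual `Format` API:

```scala
// Hypothetical sketch: trait/object/method names are illustrative, not the real chronon API.
trait Format {
  // The format receives a function that can execute DDL, instead of depending on a SparkSession.
  def createTable(tableName: String,
                  columns: Seq[(String, String)],
                  partitionColumns: Seq[String])(sqlRunner: String => Unit): Unit
}

object SqlBackedFormat extends Format {
  override def createTable(tableName: String,
                           columns: Seq[(String, String)],
                           partitionColumns: Seq[String])(sqlRunner: String => Unit): Unit = {
    val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")
    val partitionClause =
      if (partitionColumns.isEmpty) "" else s" PARTITIONED BY (${partitionColumns.mkString(", ")})"
    // The caller decides how the SQL is executed (e.g. sparkSession.sql); the format only builds it.
    sqlRunner(s"CREATE TABLE IF NOT EXISTS $tableName ($cols)$partitionClause")
  }
}
```

A BigQuery-backed format would instead override `createTable` to call the BigQuery client directly rather than emitting SQL.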

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Enhanced BigQuery integration with new client parameter
	- Added support for more flexible table creation methods
	- Improved logging capabilities for table operations

- **Bug Fixes**
	- Standardized table creation process across different formats
	- Removed unsupported BigQuery table creation operations

- **Refactor**
	- Simplified table creation and partition insertion logic
- Updated method signatures to support more comprehensive table
management
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
## Summary


- https://app.asana.com/0/1208949807589885/1209206040434612/f 
- Support explicit BigQuery table creation; a minimal sketch of client-side creation follows.
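
A minimal sketch of explicit table creation through the BigQuery client library; the dataset, table, field names, and partition column here are placeholders, and the actual schema/partition derivation lives in the GCP format provider:

```scala
import com.google.cloud.bigquery.{BigQueryOptions, Field, Schema, StandardSQLTypeName,
  StandardTableDefinition, TableId, TableInfo, TimePartitioning}

val bigquery = BigQueryOptions.getDefaultInstance.getService

// Placeholder schema: a couple of value columns plus a date partition column.
val schema = Schema.of(
  Field.of("user", StandardSQLTypeName.STRING),
  Field.of("amount", StandardSQLTypeName.FLOAT64),
  Field.of("ds", StandardSQLTypeName.DATE)
)
val definition = StandardTableDefinition.newBuilder()
  .setSchema(schema)
  .setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("ds").build())
  .build()

// Creates the table explicitly instead of relying on the connector's indirect write path.
bigquery.create(TableInfo.of(TableId.of("my_dataset", "my_table"), definition))
```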
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1209206040434612
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced BigQuery table creation functionality with improved schema
and partitioning support.
	- Streamlined table creation process in Spark's TableUtils.

- **Refactor**
	- Simplified table existence checking logic.
	- Consolidated import statements for better readability.
	- Removed unused import in BigQuery catalog test.
- Updated import statement in GcpFormatProviderTest for better
integration with Spark BigQuery connector.

- **Bug Fixes**
	- Improved error handling for table creation scenarios.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
…obs (#274)

## Summary
Changes to avoid runtime errors while running Spark and Flink jobs on
the cluster using Dataproc submit with the newly built Bazel assembly
jars. Successfully ran the DataprocSubmitterTest jobs using the newly
built Bazel jars for testing.

Testing Steps
1. Build the assembly jars
```
bazel build //cloud_gcp:lib_deploy.jar
bazel build //flink:assembly_deploy.jar
bazel build //flink:kafka-assembly_deploy.jar
```
2. Copy the jars to the GCP account used by our jobs
```
gsutil cp bazel-bin/cloud_gcp/lib_deploy.jar gs://zipline-jars/bazel-cloud-gcp.jar
gsutil cp bazel-bin/flink/assembly_deploy.jar gs://zipline-jars/bazel-flink.jar
gsutil cp bazel-bin/flink/kafka-assembly_deploy.jar gs://zipline-jars/bazel-flink-kafka.jar
```
3. Modify the jar paths in the DataprocSubmitterTest file and run the
tests

## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **New Features**
  - Added Kafka and Avro support for Flink.
  - Introduced new Flink assembly and Kafka assembly binaries.

- **Dependency Management**
  - Updated Maven artifact dependencies, including Kafka and Avro.
- Excluded specific Hadoop and Spark-related artifacts to prevent
runtime conflicts.

- **Build Configuration**
  - Enhanced build rules for Flink and Spark environments.
- Improved dependency management to prevent class compatibility issues.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
The string formatting here was breaking zipline commands when I built a
new wheel:

```
  File "/Users/davidhan/zipline/chronon/dev_chronon/lib/python3.11/site-packages/ai/chronon/repo/hub_uploader.py", line 73
    print(f"\n\nUploading:\n {"\n".join(diffed_entities.keys())}")
                                ^
SyntaxError: unexpected character after line continuation character
```

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Style**
- Improved string formatting and error message construction for better
code readability
    - Enhanced log message clarity in upload process

Note: These changes are internal improvements that do not impact
end-user functionality.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
- With #263 we control table creation ourselves. We don't need to rely
on indirect writes to do the table creation (and partitioning) for us;
we simply use the storage API to write directly into the table we
created. This should be much more performant than, and preferred over,
indirect writes because we don't need to stage data and then load it as
a temp BQ table, and it uses the BigQuery Storage API directly. A sketch
of the direct-write option follows.
- Remove configs that are used only for indirect writes
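
For reference, a minimal sketch of the direct-write mode exposed by the open-source spark-bigquery connector (`writeMethod` = `direct`); the exact option plumbing in this PR may differ:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// df already holds the rows for the partition we want to land in BigQuery.
def writeDirect(df: DataFrame, table: String): Unit =
  df.write
    .format("bigquery")
    .option("writeMethod", "direct") // BigQuery Storage Write API: no GCS staging, no temp table
    .mode(SaveMode.Append)
    .save(table)
```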

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **Improvements**
- Enhanced BigQuery data writing process with more precise configuration
options.
  - Simplified table creation and partition insertion logic.
- Improved handling of DataFrame column arrangements during data
operations.

- **Changes**
  - Updated BigQuery write method to use a direct writing approach.
- Introduced a new option to prevent table creation if it does not
exist.
  - Modified table creation process to be more format-aware.
  - Streamlined partition insertion mechanism.

These updates improve data management and writing efficiency in cloud
data processing workflows.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
## Summary

- Pull alter table properties functionality into `Format` 
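
A hypothetical sketch of what a format-level hook for table properties could look like (names are illustrative; per the release notes below, the BigQuery implementation is currently a placeholder):

```scala
trait FormatPropertySupport {
  // Default SQL-based implementation; SQL-less catalogs would override this.
  def alterTableProperties(tableName: String, properties: Map[String, String])(
      sqlRunner: String => Unit): Unit = {
    val props = properties.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
    sqlRunner(s"ALTER TABLE $tableName SET TBLPROPERTIES ($props)")
  }
}
```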

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Added a placeholder method for altering table properties in BigQuery
format
- Introduced a new method to modify table properties across different
Spark formats
- Enhanced table creation utility to use format-specific property
alteration methods

- **Refactor**
- Improved table creation process by abstracting table property
modifications
- Standardized approach to handling table property changes across
different formats

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Thomas Chow <[email protected]>
## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Added configurable repartitioning option for DataFrame writes.
- Introduced a new configuration setting to control repartitioning
behavior.
  - Enhanced test suite with functionality to handle empty DataFrames.

- **Chores**
  - Improved code formatting and logging for DataFrame writing process.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
)

## Summary

We cannot represent absent data in time series with null values because
the Thrift serializer doesn't allow nulls in `list<double>`, so we create
a magic double value (based on the string "chronon") to represent nulls.
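
A minimal sketch of the substitution, using the magic value quoted later in this thread; how the constant is actually derived from the string "chronon" is not shown here:

```scala
// Illustrative only: the real constant and helper live in the chronon codebase.
val magicNullDouble: java.lang.Double = java.lang.Double.valueOf(-2.7980863399423856e16)

// Replace nulls so the series can round-trip through Thrift's list<double>.
def toSerializableList(values: Seq[java.lang.Double]): java.util.List[java.lang.Double] = {
  val out = new java.util.ArrayList[java.lang.Double](values.size)
  values.foreach(v => out.add(if (v == null) magicNullDouble else v))
  out
}
```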

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

## Release Notes

- **New Features**
- Introduced a new constant `magicNullDouble` for handling null value
representations

- **Bug Fixes**
- Improved null value handling in serialization and data processing
workflows

- **Tests**
- Added new test case for `TileDriftSeries` serialization to validate
null value management

The changes enhance data processing consistency and provide a
standardized approach to managing null values across the system.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
This fixes an issue where Infinity/NaN values in drift calculations were
causing JSON parse errors in the frontend. These special values are now
converted to our standard magic null value (-2.7980863399423856E16)
before serialization.
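
A minimal sketch of the conversion, assuming it runs just before serialization:

```scala
val magicNullDouble = -2.7980863399423856e16

// NaN and +/-Infinity are not representable in JSON, so map them to the magic null value.
def sanitize(value: Double): Double =
  if (value.isNaN || value.isInfinite) magicNullDouble else value
```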

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Bug Fixes**
- Enhanced error handling for special double values (infinity and NaN)
in data processing
- Improved serialization test case to handle null and special values
more robustly

- **Tests**
- Updated test case for `TileDriftSeries` serialization to cover edge
cases with special double values

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
Changed the return type to a Seq (backed by an ArrayBuffer under the
hood). Added unit tests.

## Checklist
- [x] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Refactor**
- Updated SQL utility methods to return sequences of results instead of
optional single results.
- Modified result handling in SQL-related utility classes to support
multiple result rows.
- Enhanced logic to handle cases with no valid inputs in data processing
methods.

- **Tests**
	- Updated test cases to accommodate new result handling approach.
- Added new test for handling "explode" invocations in SQL select
clauses.
- Refined handling of null values and special floating-point values in
tests.

- **Bug Fixes**
- Improved error handling and result processing in SQL transformation
methods.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
Fixed null handling in the drift test code.

## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Bug Fixes**
- Improved null handling in drift series processing to prevent potential
runtime exceptions
- Enhanced error resilience by safely checking for null values during
data aggregation

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
)

## Summary

- Branch protection requires explicitly adding the GitHub Actions checks
that need to succeed before a user can merge a PR. Some GitHub workflows
do not qualify to run for a PR based on path filtering etc. Skipped
workflows will still be required to "succeed" for a branch protection
rule. This PR adds a blanket workflow that instead polls for the
explicitly triggered workflows to succeed.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Chores**
- Added a new GitHub Actions workflow to enforce status checks on pull
requests.
- Reformatted test file for improved code readability without changing
functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
## Summary
Plumbing through a setting that allows us to specify a checkpoint or a
savepoint location to resume a job. When we plug into the CLI we'll ask
users to manually specify it if needed, when we set up the orchestrator
it can be triggered as part of the deployment process - trigger
savepoint, deploy new build using savepoint, ..
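
For context, these are the standard Flink knobs involved (retaining checkpoints on cancellation and resuming from a savepoint path); the actual wiring through the submitter may differ, and `savepointUriOpt` is an illustrative placeholder:

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}

val savepointUriOpt: Option[String] = sys.env.get("SAVEPOINT_URI") // e.g. a gs:// path

val flinkConf = new Configuration()
// Resume from a user-supplied savepoint (or retained checkpoint) location, if one was provided.
savepointUriOpt.foreach(uri => flinkConf.setString("execution.savepoint.path", uri))

val env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf)
env.enableCheckpointing(60000L) // checkpoint every minute
// Keep checkpoints around when the job is canceled so they can be used to resume.
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
```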

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested - Tested on the cluster and confirmed setting
is picked up
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Enhanced Flink job submission with optional savepoint URI support.
	- Added ability to retain checkpoints when the job is canceled.

- **Improvements**
- Updated job submission configuration to support more flexible
checkpoint management.
- Introduced new constant for savepoint URI in the job submission
process.

- **Technical Updates**
- Modified checkpoint configuration to preserve checkpoints on job
cancellation.
	- Updated test cases to reflect new job submission parameters.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary

Manual cherry-pick from https://github.com/airbnb/chronon/pull/909/files

Should be rebased and tested off of
#276

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
- [x] Not yet tested, should test after merging 276



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added a builder pattern for `JavaFetcher` to enhance object creation
flexibility.
- Introduced an optional `ExecutionContext` parameter across multiple
classes to improve execution context management.

- **Improvements**
- Updated constructor signatures in Java and Scala classes to support
more configurable execution contexts.
- Enhanced instantiation process with more structured object creation
methods.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: Nikhil Simha <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
## Summary
Cherry-pick bugfix from OSS

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Added a method to escape single quotes in string values during SQL
query generation
	- Introduced a new test method to validate join utility functionality

- **Tests**
	- Extended test coverage for join utilities with a new test method

The changes improve SQL query robustness and ensure proper handling of
string values in join operations.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
## Summary

Cherry-picking OSS 906 to memoize and reuse the time range
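
A hedged sketch of the memoization pattern being cherry-picked (class and column names are illustrative, not the actual chronon code):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{max, min}

case class TimeRange(start: Long, end: Long)

class DfStats(df: DataFrame, timeColumn: String = "ts") {
  // Computed once on first access and reused by every subsequent caller, avoiding repeated scans.
  lazy val timeRange: TimeRange = {
    val row = df.agg(min(timeColumn), max(timeColumn)).head()
    TimeRange(row.getLong(0), row.getLong(1))
  }
}
```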

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Introduced a new property for computing the time range of DataFrames.

- **Refactor**
	- Updated time range calculation method in DataFrame operations.
- Renamed and modified internal methods for improved clarity and
consistency.
	- Streamlined DataFrame statistics handling in join operations.

- **Technical Improvements**
- Updated method signatures to enhance code readability and
maintainability.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update


<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

Co-authored-by: Thomas Chow <[email protected]>
…chema (#283)

## Summary
Wire up support to look up schemas from the schema registry + set up
appropriate Avro Spark encoders to help with SparkExprEval.
There are a few things to call out:
* Had to move some TopicChecker code around (moved it out of the spark
module) to be able to re-use topic presence checks + partition count
lookups. We set parallelism based on partition count.
* We look up schemas in the schema registry at the moment. We can add an
implementation that does this based on jar artifacts. Schema registry
support can also mean the Kafka wire format is the schema registry
format (this is the Etsy default), in which case every Kafka message
consists of a magic byte + a schema ID int (sketched after this list).
There are other ways of using the registry (e.g. the way we had it at
Stripe) where the wire format is the Avro / Thrift / Proto bytes but the
lookup of the schema is done based on topic + subject only.
* Add support for a Flink Kafka source which plugs in a
[DeserializationSchema](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java).
For us this is currently Avro only; it essentially allows us to crack
open an Avro Array[Bytes] into a Spark Row that we use over the rest of
the Flink job (and makes it easier to run expr eval).
* Assign watermarks post Spark expr eval; this allows us to use the
timestamp column configured on the user's GroupBy Source to accurately
set watermarks and timestamps. This is not needed in the untiled setup
but will be essential for correctness when we turn on tiling.
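
A small sketch of the Confluent-style wire format mentioned above (a magic byte followed by a 4-byte schema ID, then the Avro payload); this is illustrative, not the actual DeserializationSchema used in the job:

```scala
import java.nio.ByteBuffer

case class WireMessage(schemaId: Int, payload: Array[Byte])

def parseConfluentWireFormat(bytes: Array[Byte]): WireMessage = {
  val buffer = ByteBuffer.wrap(bytes)
  val magic = buffer.get()
  require(magic == 0x0, s"Unexpected magic byte: $magic")
  val schemaId = buffer.getInt()      // 4-byte schema registry ID, big-endian
  val payload = new Array[Byte](buffer.remaining())
  buffer.get(payload)                 // the remaining bytes are the Avro-encoded record
  WireMessage(schemaId, payload)
}
```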

## Checklist
- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested - Kicked off on the cluster -
[Job](https://zztr2oh6h5dktnjaibsfknngae-dot-us-central1.dataproc.googleusercontent.com/gateway/default/yarn/proxy/application_1738016914840_0004/)
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

Based on the comprehensive changes across multiple files, here are the
updated release notes:

- **New Features**
	- Added Kafka Schema Registry support for Flink and Spark.
- Introduced Avro deserialization support for Row-based data processing.
	- Enhanced event stream processing with watermarking capabilities.
	- Added support for dynamic topic and message bus configuration.
	- New `KafkaFlinkSource` class for integrating Kafka with Flink.
	- New `SchemaRegistrySchemaProvider` class for managing Avro schemas.
	- New `TopicCheckerApp` for command-line topic management.
	- New `User` Avro schema for structured user data representation.

- **Improvements**
	- Refactored package and import structures.
	- Improved error handling for schema and topic management.
	- Added more flexible configuration options for streaming sources.
	- Enhanced metric computation with edit distance functionality.

- **Bug Fixes**
- Updated topic and stream handling to support more dynamic
configurations.
	- Resolved potential issues with schema deserialization.

- **Chores**
	- Reorganized project structure.
	- Updated dependencies to include Confluent Schema Registry client.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
- Starting this immediately might cause this check to race with the
other CI actions - it might just succeed before the other workflows have
a chance to start.
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update


<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Updated GitHub workflow configuration to add a 10-second delay before
processing status checks.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Co-authored-by: Thomas Chow <[email protected]>

coderabbitai bot commented Jan 29, 2025

Walkthrough

This pull request introduces enhancements to Spark-related utility classes, focusing on improving serialization, null value handling, and join operations. The changes span multiple files in the Spark module (ChrononKryoRegistrator, Extensions, GroupBy, JoinUtils) and add a new test case in JoinTest. The modifications primarily address Kryo serialization, DataFrame null filtering, and join operation clarity.

Changes

| File | Change Summary |
| --- | --- |
| spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala | Added BitSet class registration for Kryo serialization |
| spark/src/main/scala/ai/chronon/spark/Extensions.scala | Updated removeNulls method with includePartial parameter |
| spark/src/main/scala/ai/chronon/spark/GroupBy.scala | Modified null filtering methods to use includePartial |
| spark/src/main/scala/ai/chronon/spark/JoinUtils.scala | Refactored coalescedJoin method with improved DataFrame aliasing |
| spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala | Added new test case for partial key joins |
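
As a hedged reconstruction of the JoinUtils change summarized above (explicit aliasing, null-safe key equality, coalesced key selection), not the code from the diff itself:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

def coalescedJoin(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame = {
  val l = left.alias("left")
  val r = right.alias("right")
  // Null-safe equality (<=>) so rows with null key values can still be matched.
  val joinCond = keys.map(k => col(s"left.$k") <=> col(s"right.$k")).reduce(_ and _)
  val joined = l.join(r, joinCond, "left_outer")
  // Coalesce the key columns; take every other column from whichever side defines it.
  val keyCols = keys.map(k => coalesce(col(s"left.$k"), col(s"right.$k")).as(k))
  val otherCols = (left.columns ++ right.columns).distinct.toSeq
    .filterNot(keys.contains)
    .map(c => if (left.columns.contains(c)) col(s"left.$c") else col(s"right.$c"))
  joined.select(keyCols ++ otherCols: _*)
}
```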

Sequence Diagram

```mermaid
sequenceDiagram
    participant Kryo as Kryo Serializer
    participant DataFrame as DataFrame
    participant JoinUtils as Join Utility

    Kryo->>Kryo: Register BitSet class
    DataFrame->>DataFrame: Apply removeNulls with includePartial
    JoinUtils->>JoinUtils: Perform coalesced join with aliases
```

Possibly related PRs

Suggested reviewers

  • nikhil-zlai
  • piyush-zlai

Poem

🧩 Bits and bytes dance free,
Serialization's new melody
Null values filtered with grace
Joins embrace a clearer space
Code's symphony plays on! 🚀

Warning

Review ran into problems

🔥 Problems

GitHub Actions: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (3)

1341-1362: Consider parameterizing test data.

The test data is hardcoded. Consider extracting it to a test data helper for better maintainability.

```diff
-    val rows = Seq(
-      ("user", "email", "order", 0, LocalDateTime.of(2024, 5, 10, 0, 0)),
+    val TEST_DATA = Seq(
+      ("user", "email", "order", 0, LocalDateTime.of(2024, 5, 10, 0, 0)),
```

1372-1402: Consider adding validation for empty keys.

The makeJoinConf helper should validate that the keys sequence is not empty.

```diff
   def makeJoinConf(keys: Seq[String]) = {
+    require(keys.nonEmpty, "Keys sequence cannot be empty")
     val groupBy = Builders.GroupBy(
```

1421-1427: Add test cases for edge scenarios.

Consider adding test cases for:

  • All null keys
  • Mixed null and non-null keys
```diff
   // Check non-string nulls
   assertJoin(Seq("user", "amount"))
+  // Check all null keys
+  assertJoin(Seq("email", "amount"))
+  // Check mixed null and non-null keys
+  assertJoin(Seq("user", "email", "amount"))
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 16c05a8 and 7e805f5.

📒 Files selected for processing (5)
  • spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/GroupBy.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (2 hunks)
🧰 Additional context used
📓 Learnings (1)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)
Learnt from: nikhil-zlai
PR: zipline-ai/chronon#51
File: spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala:192-200
Timestamp: 2024-11-26T19:47:53.900Z
Learning: Only suggest registering Delta Lake action classes for serialization if they are actually used in the codebase.
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: join_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (9)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1)

140-140: LGTM: BitSet registration added for Kryo serialization.

The addition of BitSet registration is appropriate for optimizing serialization performance.

spark/src/main/scala/ai/chronon/spark/Extensions.scala (1)

236-240: LGTM: Enhanced null handling flexibility.

The addition of includePartial parameter allows for more granular control over null filtering, supporting partial null keys in joins.

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (1)

155-185: LGTM: Improved join handling with null equality support.

Changes enhance join clarity through:

  • Explicit DataFrame aliasing
  • Proper null equality handling in join conditions
  • Clean column selection logic
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (3)

194-194: LGTM: Consistent null handling in temporalEntities.

Added support for partial null keys in temporal entity processing.


297-297: LGTM: Consistent null handling in temporalEvents.

Added support for partial null keys in temporal event processing.


532-533: LGTM: Consistent null handling in GroupBy initialization.

Replaced string concatenation with proper removeNulls call supporting partial keys.

spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (3)

1321-1339: LGTM! Clean schema definition.

The schema includes all necessary fields for testing partial key joins.


1363-1371: LGTM! Clear source configuration.

The source configuration is well-structured with appropriate time column and partition settings.


1404-1419: Verify expected counts.

The hardcoded expected counts List(1, 1, 1) should be derived from the test data.

✅ Verification successful

Expected counts match test data structure

The hardcoded List(1, 1, 1) correctly represents one latest record for each of the three email combinations in the test data (email, email2, null).

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Count unique combinations of user and email, and user and amount
echo "Verifying expected counts match the test data structure"
ast-grep --pattern 'val rows = Seq($$$)' | grep -c "LocalDateTime"
```

Length of output: 189


Script:

```bash
#!/bin/bash
# Find test data setup
rg -A 10 "val (rows|testData)" spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala
```

Length of output: 1077

