
Conversation

@varant-zlai
Collaborator

@varant-zlai varant-zlai commented Jan 29, 2025

Summary

Cherry pick airbnb/chronon#781

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Added a new bounded_unique_count aggregation operation that limits unique element tracking.
    • Supports counting unique items with a configurable maximum limit (default 8).
    • Can be applied to various data types including integers, strings, and binary data.
  • Documentation

    • Updated documentation to explain the new bounded_unique_count aggregation type.
  • Tests

    • Added test cases to validate bounded unique count functionality across different scenarios.

ken-zlai and others added 30 commits September 24, 2024 16:00
# Conflicts:
#	build.sbt
#	hub/app/controllers/FrontendController.scala
#	hub/app/views/index.scala.html
#	hub/public/images/favicon.png
Focus on the fraud case for the demo. Write the parquet file as data.parquet
Adds a readme, and keeps the docker container active so the parquet table can be accessed.
Create svelte project and build it using play for deployment
david-zlai and others added 13 commits January 27, 2025 13:47
## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Added configurable repartitioning option for DataFrame writes.
- Introduced a new configuration setting to control repartitioning
behavior.
  - Enhanced test suite with functionality to handle empty DataFrames.

- **Chores**
  - Improved code formatting and logging for DataFrame writing process.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>

## Summary

We cannot represent absent data in time series with null values because
the Thrift serializer doesn't allow nulls in list<double>, so we create
a magic double value (based on the string "chronon") to represent nulls.
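
A minimal sketch of how such a sentinel might be applied before serialization. The constant name `magicNullDouble` and its value come from the release notes in this PR stack; the helper itself is an assumption, not the actual implementation:

```scala
object Constants {
  // Sentinel standing in for null, since a thrift list<double> cannot hold nulls.
  // Value quoted from this PR stack; its derivation from the string "chronon" is internal.
  val magicNullDouble: Double = -2.7980863399423856e16
}

// Hypothetical helper: encode absent points as the sentinel before building the thrift list.
def toThriftDoubles(points: Seq[Option[Double]]): java.util.List[java.lang.Double] = {
  val out = new java.util.ArrayList[java.lang.Double](points.size)
  points.foreach(p => out.add(Double.box(p.getOrElse(Constants.magicNullDouble))))
  out
}
```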

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

## Release Notes

- **New Features**
- Introduced a new constant `magicNullDouble` for handling null value
representations

- **Bug Fixes**
- Improved null value handling in serialization and data processing
workflows

- **Tests**
- Added new test case for `TileDriftSeries` serialization to validate
null value management

The changes enhance data processing consistency and provide a
standardized approach to managing null values across the system.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
This fixes an issue where Infinity/NaN values in drift calculations were
causing JSON parse errors in the frontend. These special values are now
converted to our standard magic null value (-2.7980863399423856E16)
before serialization.
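
A rough sketch of the sanitization being described, reusing the `magicNullDouble` sentinel above; the helper name is an assumption:

```scala
// Hypothetical helper: map NaN / +-Infinity to the magic null sentinel so the
// serialized payload stays JSON-parseable on the frontend.
def sanitizeDouble(value: Double): Double =
  if (value.isNaN || value.isInfinity) Constants.magicNullDouble else value
```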

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Bug Fixes**
- Enhanced error handling for special double values (infinity and NaN)
in data processing
- Improved serialization test case to handle null and special values
more robustly

- **Tests**
- Updated test case for `TileDriftSeries` serialization to cover edge
cases with special double values

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
Changed the return type to Seq (backed by an ArrayBuffer under the hood). Added
unit tests.
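
An illustrative sketch of the shape of the change (names are hypothetical, not the actual utility method):

```scala
import scala.collection.mutable.ArrayBuffer

// Before: Option of a single result; after: all result rows are returned.
def evaluateAll[T](results: Iterator[T]): Seq[T] = {
  val buf = ArrayBuffer.empty[T] // collected in an ArrayBuffer under the hood
  results.foreach(buf += _)
  buf.toSeq
}
```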

## Checklist
- [x] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Refactor**
- Updated SQL utility methods to return sequences of results instead of
optional single results.
- Modified result handling in SQL-related utility classes to support
multiple result rows.
- Enhanced logic to handle cases with no valid inputs in data processing
methods.

- **Tests**
	- Updated test cases to accommodate new result handling approach.
- Added new test for handling "explode" invocations in SQL select
clauses.
- Refined handling of null values and special floating-point values in
tests.

- **Bug Fixes**
- Improved error handling and result processing in SQL transformation
methods.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
fixed null handling in drift test code.

## Checklist
- [ ] Added Unit Tests
- [x] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Bug Fixes**
- Improved null handling in drift series processing to prevent potential
runtime exceptions
- Enhanced error resilience by safely checking for null values during
data aggregation

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

## Summary

- Branch protection requires explicitly adding GitHub Actions that need
to succeed before a user can merge a PR. Some GitHub workflows do not
qualify to run for a PR (path filtering etc.), yet skipped workflows are
still required to "succeed" by a branch protection rule. This PR adds a
blanket workflow that instead polls the explicitly triggered workflows
and succeeds once they do.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Chores**
- Added a new GitHub Actions workflow to enforce status checks on pull
requests.
- Reformatted test file for improved code readability without changing
functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
## Summary
Plumbing through a setting that allows us to specify a checkpoint or a
savepoint location to resume a job from. When we plug into the CLI we'll
ask users to specify it manually if needed; when we set up the
orchestrator it can be triggered as part of the deployment process
(trigger savepoint, deploy the new build from the savepoint, ...).
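
A minimal sketch of the Flink side of this, assuming a standard StreamExecutionEnvironment setup; the exact Chronon wiring and constant names aren't shown, and the cleanup setter name varies slightly across Flink versions:

```scala
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L) // checkpoint every 60s

// Keep externalized checkpoints around when the job is cancelled, so a later
// deployment can resume from them (or from an explicitly triggered savepoint).
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// The savepoint/checkpoint URI itself is passed at submission time, e.g. roughly:
//   flink run -s gs://<bucket>/savepoints/savepoint-xyz ...
```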

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested - Tested on the cluster and confirmed setting
is picked up
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Enhanced Flink job submission with optional savepoint URI support.
	- Added ability to retain checkpoints when the job is canceled.

- **Improvements**
- Updated job submission configuration to support more flexible
checkpoint management.
- Introduced new constant for savepoint URI in the job submission
process.

- **Technical Updates**
- Modified checkpoint configuration to preserve checkpoints on job
cancellation.
	- Updated test cases to reflect new job submission parameters.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary

Manual cherry-pick from https://github.com/airbnb/chronon/pull/909/files

Should be rebased and tested off of
#276

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
- [x] Not yet tested, should test after merging 276



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added a builder pattern for `JavaFetcher` to enhance object creation
flexibility.
- Introduced an optional `ExecutionContext` parameter across multiple
classes to improve execution context management.

- **Improvements**
- Updated constructor signatures in Java and Scala classes to support
more configurable execution contexts.
- Enhanced instantiation process with more structured object creation
methods.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: Nikhil Simha <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
## Summary
Cherry pick bugfix from oss

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Added a method to escape single quotes in string values during SQL
query generation
	- Introduced a new test method to validate join utility functionality

- **Tests**
	- Extended test coverage for join utilities with a new test method

The changes improve SQL query robustness and ensure proper handling of
string values in join operations.
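
A sketch of the kind of escaping described; the actual method name and escaping convention in the utility are assumptions:

```scala
// Hypothetical helper: escape single quotes so literal values can be embedded
// safely inside generated SQL, e.g. WHERE key = 'O''Brien'.
def escapeSingleQuotes(value: String): String =
  value.replace("'", "''")
```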

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
## Summary

Cherry picking oss 906 to memoize and reuse time range

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Introduced a new property for computing the time range of DataFrames.

- **Refactor**
	- Updated time range calculation method in DataFrame operations.
- Renamed and modified internal methods for improved clarity and
consistency.
	- Streamlined DataFrame statistics handling in join operations.

- **Technical Improvements**
- Updated method signatures to enhance code readability and
maintainability.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
## Summary

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update


<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

Co-authored-by: Thomas Chow <[email protected]>
…chema (#283)

## Summary
Wire up support to look up schemas from schema registry + set up
appropriate Avro Spark encoders to help with SparkExprEval.
There are a few things to call out:
* Had to move some TopicChecker code around (out of the spark
module) to be able to re-use topic presence checks + partition count
lookups. We set parallelism based on partition count.
* We look up schemas in the schema registry at the moment. We can add an
implementation that does this based on jar artifacts. Schema registry
support can also mean the Kafka wire format is the schema-registry format
(this is the Etsy default), in which case every Kafka message consists of
a magic byte + a schema ID int (a parsing sketch follows after this
list). There are other ways of using the registry (e.g. the way we had it
at Stripe) where the wire format is the raw Avro / Thrift / Proto bytes
and the schema lookup is done based on topic + subject only.
* Add support for a Flink Kafka source which plugs in a
[DeserializationSchema](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java).
For us this is currently Avro only; it essentially allows us to crack
open Avro Array[Byte] into a Spark Row that we use over the rest of the
Flink job (and makes it easier to run expr eval).
* Assign watermarks post Spark expr eval -> this allows us to use the
timestamp column configured on the user's GroupBy Source to accurately
set watermarks and timestamps. This is not needed in the untiled setup
but will be essential for correctness when we turn on tiling.
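
For reference, a sketch of cracking open the schema-registry wire format mentioned above (magic byte + schema ID + Avro payload); the surrounding DeserializationSchema plumbing and registry client calls are omitted:

```scala
import java.nio.ByteBuffer

// Confluent-style wire format: 1 magic byte (0x0), 4-byte big-endian schema id, then Avro bytes.
case class RegistryPayload(schemaId: Int, avroBytes: Array[Byte])

def splitWireFormat(message: Array[Byte]): RegistryPayload = {
  val buffer = ByteBuffer.wrap(message)
  require(buffer.get() == 0, "Unknown magic byte - not schema-registry encoded")
  val schemaId = buffer.getInt()
  val payload = new Array[Byte](buffer.remaining())
  buffer.get(payload)
  RegistryPayload(schemaId, payload)
}
```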

## Checklist
- [X] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested - Kicked off on the cluster -
[Job](https://zztr2oh6h5dktnjaibsfknngae-dot-us-central1.dataproc.googleusercontent.com/gateway/default/yarn/proxy/application_1738016914840_0004/)
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

Based on the comprehensive changes across multiple files, here are the
updated release notes:

- **New Features**
	- Added Kafka Schema Registry support for Flink and Spark.
- Introduced Avro deserialization support for Row-based data processing.
	- Enhanced event stream processing with watermarking capabilities.
	- Added support for dynamic topic and message bus configuration.
	- New `KafkaFlinkSource` class for integrating Kafka with Flink.
	- New `SchemaRegistrySchemaProvider` class for managing Avro schemas.
	- New `TopicCheckerApp` for command-line topic management.
	- New `User` Avro schema for structured user data representation.

- **Improvements**
	- Refactored package and import structures.
	- Improved error handling for schema and topic management.
	- Added more flexible configuration options for streaming sources.
	- Enhanced metric computation with edit distance functionality.

- **Bug Fixes**
- Updated topic and stream handling to support more dynamic
configurations.
	- Resolved potential issues with schema deserialization.

- **Chores**
	- Reorganized project structure.
	- Updated dependencies to include Confluent Schema Registry client.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary
- Starting this immediately might cause this check to race with the
other CI actions - it might just succeed before the other workflows have
a chance to start.
## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update


<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Updated GitHub workflow configuration to add a 10-second delay before
processing status checks.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Co-authored-by: Thomas Chow <[email protected]>
@coderabbitai
Contributor

coderabbitai bot commented Jan 29, 2025

Walkthrough

This pull request introduces a new BoundedUniqueCount aggregation feature across multiple components of the Chronon system. The implementation spans Scala, Python, and Thrift files, adding a bounded unique counting operation that limits the number of unique elements tracked to a specified maximum (default 8). The changes enable a memory-efficient way to count unique items while preventing unbounded set growth.
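
A condensed, standalone sketch of the aggregator's core, based on the signature visible in the diff further down (`BoundedUniqueCount[T](inputType: DataType, k: Int = 8)` extending `SimpleAggregator[T, util.Set[T], Long]`); this is not the actual Chronon class, and the remaining SimpleAggregator hooks (IR serialization, clone, etc.) are omitted:

```scala
import java.util

// Track unique elements, but stop adding once k distinct values have been seen,
// so the intermediate representation (IR) stays bounded in size.
class BoundedUniqueCountSketch[T](k: Int = 8) {
  def prepare(input: T): util.Set[T] = {
    val ir = new util.HashSet[T]()
    ir.add(input)
    ir
  }

  def update(ir: util.Set[T], input: T): util.Set[T] = {
    if (ir.size() < k) ir.add(input) // drop new values once the bound is hit
    ir
  }

  def merge(ir1: util.Set[T], ir2: util.Set[T]): util.Set[T] = {
    val it = ir2.iterator()
    while (ir1.size() < k && it.hasNext) ir1.add(it.next())
    ir1
  }

  def finalize(ir: util.Set[T]): Long = ir.size().toLong
}
```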

Changes

File Change Summary
aggregator/.../SimpleAggregators.scala Added BoundedUniqueCount class with methods to track unique elements up to a limit
aggregator/.../ColumnAggregator.scala Added BOUNDED_UNIQUE_COUNT operation handling
api/py/.../group_by.py Introduced BOUNDED_UNIQUE_COUNT operation type
api/thrift/api.thrift Added BOUNDED_UNIQUE_COUNT = 19 to Operation enum
docs/source/.../GroupBy.md Documented new bounded_unique_count aggregation type
spark/test/.../FetcherTest.scala Updated test data generation with bounded unique count
spark/test/.../GroupByTest.scala Added test case for bounded unique count functionality
aggregator/.../test/BoundedUniqueCountTest.scala Added unit tests for BoundedUniqueCount class

Possibly related PRs

  • Cherry pick OSS 906 #285: This PR introduces a new operation BOUNDED_UNIQUE_COUNT in the ColumnAggregator, which is directly related to the new BoundedUniqueCount class added in the main PR, as both deal with bounded unique counting functionality.

Suggested Reviewers

  • piyush-zlai

Poem

🔢 Counting unique, but with grace and might,
Bounded by limits, keeping memory light,
A set that whispers "no more than k!"
Efficiency dancing in each byte's play,
Chronon's new magic, counting just right! 🎉




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala (1)

659-698: Add test coverage for edge cases.

The test verifies the count bounds but should also test:

  • Actual counts for known input data
  • Empty input behavior
  • Boundary values of k (e.g., k=1)
aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala (1)

621-657: LGTM! Clean implementation of bounded unique count.

The implementation correctly maintains a bounded set size and handles all required operations efficiently.

Two suggestions for improvement:

  1. Consider adding validation for negative k values
  2. Document the rationale for choosing 8 as the default limit
spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (1)

578-582: Consider explicitly specifying the limit k.

For clarity and consistency with other test cases, consider specifying the limit explicitly.

 Builders.Aggregation(
   operation = Operation.BOUNDED_UNIQUE_COUNT,
   inputColumn = "rating",
+  argMap = Map("k" -> "5"),
   windows = Seq(new Window(1, TimeUnit.DAYS))
 )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 16c05a8 and 66c5214.

📒 Files selected for processing (7)
  • aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala (1 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/row/ColumnAggregator.scala (1 hunks)
  • api/py/ai/chronon/group_by.py (1 hunks)
  • api/thrift/api.thrift (1 hunks)
  • docs/source/authoring_features/GroupBy.md (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (5)
api/thrift/api.thrift (1)

166-168: LGTM!

The enum value follows the existing pattern and is logically placed.

aggregator/src/main/scala/ai/chronon/aggregator/row/ColumnAggregator.scala (1)

307-319: LGTM!

The implementation is consistent with other count operations and handles all primitive types correctly.

api/py/ai/chronon/group_by.py (1)

68-68: LGTM!

The constant follows the existing pattern and is logically placed.

spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (1)

310-313: LGTM! Well-configured bounded unique count aggregation.

The aggregation is properly configured with appropriate windows and limit.

docs/source/authoring_features/GroupBy.md (1)

150-151: LGTM! Clear and accurate documentation.

The bounded_unique_count aggregation is well-documented with all necessary properties.
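
For reference, a usage sketch in the Builders style used by the tests in this PR (the column name is illustrative; omitting `argMap` should fall back to the default k of 8):

```scala
// Count distinct "rating" values per key over a 1-day window, tracking at most 5 of them.
Builders.Aggregation(
  operation = Operation.BOUNDED_UNIQUE_COUNT,
  inputColumn = "rating",
  argMap = Map("k" -> "5"),
  windows = Seq(new Window(1, TimeUnit.DAYS))
)
```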

ken-zlai and others added 3 commits January 29, 2025 13:42
## Summary
These tests aren't really doing anything valuable, so we might as well
remove them.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Tests**
	- Removed a unit test suite for model-related types and API responses.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
… PoJos (#294)

## Summary
We use case classes for many of our intermediate types in Flink, and a
few of these are persisted in Flink state - the TimestampIR in the tiled
setup, for example; in both untiled and tiled, the Input and Output types
of the AsyncKVWriter are persisted to state. If in the future we need to
update these intermediate types to include additional fields (like we're
thinking of doing to support tiling), that won't be possible, as Flink
doesn't support [state schema evolution for case
classes](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/).
Due to that we'd need a time-consuming migration where we spin up
parallel operators with the new types, dual write, and then cut over in a
subsequent job. Instead we can set this up before we're in prod to
hopefully minimize running into this issue down the line.

This PR essentially swaps the case classes for Scala POJO types for the
tiled aggregations, AvroCodecOutput (as that feeds into the KV store
writer), and the KV store write response. In a subsequent PR we can
update the TimestampTile to include the startTs of the tile (and plumb
the latestTs through to the sink so we can track e2e lag).

(Chose the Scala POJO route here as there's a bit of interplay with some
of our aggregator libs and other Scala code related to these classes -
e.g. the FlinkRowAggregatorFunction - and there's a bunch of casting
needed to interop. The Flink 2.0 migration will likely be a decent-sized
chunk given all our Flink code is in Scala, and I think we can bite it
off then.)
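
A minimal sketch of the Scala-POJO shape being described; the class and field names are illustrative, not the actual types. Flink's POJO serializer wants a no-arg constructor plus getter/setter-style field access, which `@BeanProperty` vars provide:

```scala
import scala.beans.BeanProperty

// Case-class replacement that Flink can treat as a POJO, enabling state schema evolution:
// public no-arg constructor + mutable fields with bean-style getters/setters.
class TimestampedTilePojo(@BeanProperty var keyBytes: Array[Byte],
                          @BeanProperty var tileBytes: Array[Byte],
                          @BeanProperty var latestTsMillis: java.lang.Long) {
  def this() = this(null, null, null)
}
```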

## Checklist
- [ ] Added Unit Tests
- [X] Covered by existing CI
- [X] Integration tested - Confirmed that this works by kicking off our
TestFlinkJob on the cluster
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Introduced new data types (`AvroCodecOutput`, `TimestampedTile`,
`TimestampedIR`, `WriteResponse`) for improved data handling in Flink
jobs
	- Enhanced schema evolution support for stateful data processing

- **Refactor**
- Replaced `PutRequest` with `AvroCodecOutput` across multiple Flink
processing components
- Updated method signatures and data type handling in various
Flink-related classes
	- Simplified timestamp retrieval and data structure interactions

- **Chores**
	- Reorganized type imports and package structure
	- Updated test cases to align with new data type implementations

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary

- Sometimes we have DateTypes in the Spark DataFrame. Let's try to
support the corresponding Avro conversion of that.
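
A sketch of the kind of Avro-side support this refers to, using Avro's built-in date logical-type conversion (Avro 1.9+); where exactly Chronon registers it is not shown here:

```scala
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericData

// Teach Avro's GenericData how to convert java.time.LocalDate values to/from
// the Avro `int` + `date` logical type that backs Spark DateType columns.
GenericData.get().addLogicalTypeConversion(new TimeConversions.DateConversion())
```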

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Enhanced date type handling in Avro conversions.
	- Added support for Java 8 DateTime API in Spark.
	- Improved Kryo serialization for LocalDate objects.
- New test case to verify BigQuery connector's date conversion behavior.

- **Improvements**
	- Added logical type conversion for date handling in Avro encoding.
	- Enabled better compatibility with Java 8 date and time types.
- Added context for DateType mapping to LocalDate based on
configuration.
	- Updated dependency version for improved functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- av pr metadata
This information is embedded by the av CLI when creating PRs to track
the status of stacks when using Aviator. Please do not delete or edit
this section of the PR.
```
{"parent":"main","parentHead":"","trunk":"main"}
```
-->

---------

Co-authored-by: Thomas Chow <[email protected]>
@piyush-zlai
Contributor

@varant-zlai - do we also need: airbnb/chronon#781 (review) ?
(The comment from Pengyu)

piyush-zlai and others added 4 commits January 30, 2025 13:34
## Summary
Define the thrift interface for the TileKey struct as we're building out
the tiling + tile layering implementation on GCP.

The idea is that the Flink code will create a TileKey struct with the
appropriate fields, serialize it, and send it as part of a PutRequest
object in a multiPut call. BigTableKVStoreImpl (and later DDBImpl etc.)
will deserialize this TileKey struct, construct the appropriate RowKey /
timestamp structure, and write out to the store.

On the Fetcher side, for each tile size we support (just one until we
support tile layering), we will create a TileKey and pass it as part of a
GetRequest object (which also includes the start / end range). The KV
store will deserialize this and run the lookup + range query.

I've currently not added utils to help with murmur hashing pieces of the
TileKey that are needed for BigTable's RowKey. We can add those if we
decide to pursue the hashing to distribute data option.
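
A sketch of how the serialize/deserialize helpers might look; the `TilingUtils` name comes from the release notes below, `TileKey` is the thrift-generated struct from api.thrift, and the protocol choice here is an assumption:

```scala
import org.apache.thrift.{TDeserializer, TSerializer}
import org.apache.thrift.protocol.TCompactProtocol

// Hypothetical helpers: turn the thrift-defined TileKey into bytes that ride along
// in PutRequest / GetRequest keys, and back again inside the KV store implementation.
object TilingUtilsSketch {
  def serializeTileKey(key: TileKey): Array[Byte] =
    new TSerializer(new TCompactProtocol.Factory()).serialize(key)

  def deserializeTileKey(bytes: Array[Byte]): TileKey = {
    val key = new TileKey()
    new TDeserializer(new TCompactProtocol.Factory()).deserialize(key, bytes)
    key
  }
}
```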

## Checklist
- [X] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced `TilingUtils` with serialization and deserialization
capabilities for `TileKey` objects.
	- Added `TileKey` struct for managing tile-based data partitioning.

- **Tests**
- Added comprehensive test suite for `TilingUtils` serialization
functionality.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
## Summary

Cherry pick: https://github.com/airbnb/chronon/pull/774/files

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
	- Enhanced group-by request processing with improved key validation
- Added a new method for parsing group-by responses with more robust
error handling

- **Tests**
	- Added comprehensive test cases for new response parsing method
	- Verified handling of scenarios with null keys and missing keys

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Co-authored-by: ezvz <[email protected]>
@varant-zlai varant-zlai force-pushed the vz--cherry-pick-bounded-unique-count-oss781 branch from 66c5214 to c53f3e2 Compare January 30, 2025 19:19

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala (1)

621-621: Consider documenting the default value of k.

Add a comment explaining why 8 was chosen as the default value.

aggregator/src/test/scala/ai/chronon/aggregator/test/BoundedUniqueCountTest.scala (1)

1-44: Add more test cases for edge scenarios.

Consider adding tests for:

  • Empty set behavior
  • Null input handling
  • Duplicate input handling
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 66c5214 and c53f3e2.

📒 Files selected for processing (8)
  • aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala (1 hunks)
  • aggregator/src/main/scala/ai/chronon/aggregator/row/ColumnAggregator.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/BoundedUniqueCountTest.scala (1 hunks)
  • api/py/ai/chronon/group_by.py (2 hunks)
  • api/thrift/api.thrift (1 hunks)
  • docs/source/authoring_features/GroupBy.md (1 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (2 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • docs/source/authoring_features/GroupBy.md
  • api/thrift/api.thrift
  • api/py/ai/chronon/group_by.py
  • aggregator/src/main/scala/ai/chronon/aggregator/row/ColumnAggregator.scala
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: other_spark_tests
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (4)
aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala (1)

621-657: LGTM! Clean and efficient implementation.

The implementation correctly maintains a bounded set of unique elements and handles all required operations appropriately.

spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala (1)

659-698: LGTM! Comprehensive integration test.

The test effectively validates that bounded unique counts respect the specified limit.

spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala (2)

310-313: LGTM! Good test configuration.

The bounded unique count is well configured with appropriate windows and limit.


579-583: LGTM! Good test coverage.

The test effectively validates bounded unique count in event-only scenarios.

@varant-zlai
Collaborator Author

@varant-zlai - do we also need: airbnb/chronon#781 (review) ? (The comment from Pengyu)

Good call, updated!

```scala
}

class BoundedUniqueCount[T](inputType: DataType, k: Int = 8) extends SimpleAggregator[T, util.Set[T], Long] {
  override def prepare(input: T): util.Set[T] = {
```

we should convert these inputs to their hashes and just store the hashes
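
A sketch of what the reviewer's suggestion might look like - storing a bounded set of hashes instead of the raw inputs; the hash function choice is illustrative:

```scala
import java.util
import scala.util.hashing.MurmurHash3

// Store 32-bit hashes rather than the inputs themselves, so large string or
// binary values don't blow up the size of the intermediate state.
def updateHashed[T](ir: util.Set[Int], input: T, k: Int): util.Set[Int] = {
  if (ir.size() < k) ir.add(MurmurHash3.stringHash(input.toString))
  ir
}
```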
