Conversation

@SophieYu41 commented Mar 23, 2023

Summary

Add label aggregation support.
Similar to the existing group_by aggregations, with the following conditions applied.
More details can be found in the "Aggregation Design" section of https://docs.google.com/document/d/1ccFfws6Sjggxys2AUkXO9sXV7h4QSfZZx_xieMhZf5s/edit

  • A single label and a single aggregation are allowed per labelPart group_by.
  • (LeftStartOffset, LeftEndOffset) will be inferred from the aggregation window and by default equal window size - 1 (see the sketch after this list).
  • The aggregation window cannot be unbounded.
  • Aggregation time unit granularity = DAY.
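
For illustration, a minimal sketch of the default offset inference described above, assuming both offsets default to window size - 1 per the description; the names and types are illustrative stand-ins, not the actual Chronon API:

```scala
// Hypothetical sketch of the default offset inference; names and types
// are illustrative, not the actual Chronon API.
case class OffsetRange(leftStartOffset: Int, leftEndOffset: Int)

def inferOffsets(windowDays: Int): OffsetRange = {
  require(windowDays > 0, "the aggregation window cannot be unbounded")
  // Per the description, both offsets default to window size - 1: an
  // aggregated label landing on label_ds covers left rows back to
  // label_ds - (window - 1).
  val offset = windowDays - 1
  OffsetRange(offset, offset)
}

// e.g. inferOffsets(7) == OffsetRange(6, 6) for a 7-day window
```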

Next step:
Allow multiple label joinParts with different windows for aggregation.

Why / Goal

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Checklist

  • Documentation update

Reviewers

@hzding621 @yunfeng-hao

@SophieYu41 force-pushed the sophie-label-agg branch 8 times, most recently from dc6ecf5 to 08f14e3 on March 31, 2023 at 17:57
@hzding621 left a comment:

very clean and well tested!

@hzding621 commented:

> (LeftStartOffset, LeftEndOffset) will be inferred from the aggregation window and by default equals to (label_ds - window size). Earlier label_ds will be backfilled given the leftStartOffset

  1. do you mean by default equals to "window size"?
  2. backfill is not implemented in this PR (as we discussed), right?

@SophieYu41 commented Apr 4, 2023:

> > (LeftStartOffset, LeftEndOffset) will be inferred from the aggregation window and by default equals to (label_ds - window size). Earlier label_ds will be backfilled given the leftStartOffset
>
> 1. do you mean by default equals to "window size"?
> 2. backfill is not implemented in this PR (as we discussed), right?

Yes and yes!

@SophieYu41 SophieYu41 requested a review from hzding621 April 4, 2023 20:37
The reviewed docstring snippet (truncated in the diff):

```python
    comparing to label_ds date specified
:param left_end_offset: Integer to define the most recent date label should be refreshed.
:param left_start_offset: Integer to define the earliest date(inclusive) label should be refreshed
    comparing to label_ds date specified. For labels with aggregations,
```
A reviewer commented on this docstring:

This file doesn't have label_ds defined. The label join is complicated; maybe add an example in your doc to explain these ds and offset values.

@SophieYu41 (author) replied:

Good point, will do.
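
For illustration (in the spirit of the example requested above), a hedged sketch of how the two offsets map to a refresh date range relative to label_ds; the helper and the offset values are invented for this example and are not part of the Chronon API:

```scala
import java.time.LocalDate

// Illustrative helper (not Chronon API): both offsets are day counts
// measured backwards from the label_ds partition.
def refreshRange(labelDs: LocalDate,
                 leftStartOffset: Int, // earliest date (inclusive) to refresh
                 leftEndOffset: Int    // most recent date to refresh
                ): (LocalDate, LocalDate) =
  (labelDs.minusDays(leftStartOffset), labelDs.minusDays(leftEndOffset))

// e.g. refreshRange(LocalDate.parse("2023-04-10"), 6, 0)
//   == (2023-04-04, 2023-04-10): a 7-day refresh window ending on label_ds
```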

Sophie Wang added 2 commits April 6, 2023 12:15
@SophieYu41 SophieYu41 merged commit 3c41998 into master Apr 6, 2023
@SophieYu41 SophieYu41 deleted the sophie-label-agg branch April 6, 2023 19:55
tchow-zlai added a commit to zipline-ai/chronon that referenced this pull request Nov 21, 2024
## Summary

- https://app.asana.com/0/1208785567265389/1208812512114700

- This PR addresses some flaky unit test behavior that we've been
observing in the zipline fork. See:
https://zipline-2kh4520.slack.com/archives/C072LUA50KA/p1732043073171339?thread_ts=1732042778.209419&cid=C072LUA50KA

- A previous [CI test](https://github.com/zipline-ai/chronon/actions/runs/11946764068/job/33301642119?pr=72) shows that `other_spark_tests` intermittently fails for a couple of reasons. This PR addresses the flakiness of [FeatureWithLabelJoinTest.testFinalViewsWithAggLabel](https://github.com/zipline-ai/chronon/blob/6cb6273551e024d6eecb068f754b510ae0aac464/spark/src/test/scala/ai/chronon/spark/test/FeatureWithLabelJoinTest.scala#L118), where the test assertion sometimes fails with an unexpected result value.

### Synopsis

It looks like a rewrite/refactoring of the code did not preserve the original functionality. The divergence starts when computing label joins per partition range, in particular when we materialize the label join and [scan it back](https://github.com/zipline-ai/chronon/blob/b64f44d57c90367ccfcb5d5c96327a1ef820e2b3/spark/src/main/scala/ai/chronon/spark/LabelJoin.scala#L200).

In the OSS version, the
[scan](https://github.com/airbnb/chronon/blob/6968c5c29b6e48867f8c08f2b9b8281f09d47c16/spark/src/main/scala/ai/chronon/spark/LabelJoin.scala#L192-L193)
applies a [partition
filter](https://github.com/airbnb/chronon/blob/6968c5c29b6e48867f8c08f2b9b8281f09d47c16/spark/src/main/scala/ai/chronon/spark/DataRange.scala#L102-L104).
We dropped these partition filters during the
[refactoring](c6a377c#diff-57b1d6132977475fa0e87a71f017e66f4a7c94f466f911b33e9178598c6c058dL97-R102)
on the Zipline side. As a result, the physical plans produced by the two scans differ:

```
// Zipline
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet spark_catalog.final_join.label_agg_table_listing_labels_agg[listing#53934L,is_active_max_5d#53935,label_ds#53936] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/tmp/chronon/spark-warehouse_6fcd3d/data/final_join.db/label_agg_t..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<listing:bigint,is_active_max_5d:int>
```

```
// OSS
== Physical Plan ==
Coalesce 1000
+- *(1) ColumnarToRow
   +- FileScan parquet final_join_xggqlu.label_agg_table_listing_labels_agg[listing#50981L,is_active_max_5d#50982,label_ds#50983] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/chronon/spark-warehouse_69002f/data/final_join_xggqlu.db/label_agg_ta..., PartitionFilters: [isnotnull(label_ds#50983), (label_ds#50983 >= 2022-10-07), (label_ds#50983 <= 2022-10-07)], PushedFilters: [], ReadSchema: struct<listing:bigint,is_active_max_5d:int>
```

Note that OSS has a non-empty partition filter, `PartitionFilters: [isnotnull(label_ds#50983), (label_ds#50983 >= 2022-10-07), (label_ds#50983 <= 2022-10-07)]`, whereas Zipline does not.

The fix is to add these partition filters back, as done in this PR.
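
For context, a minimal sketch (not the actual Chronon fix) of what scanning the materialized table back with an explicit partition filter looks like in Spark; the table and column names are taken from the physical plans above:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative only: constrain the scan to the partition range being
// computed, so Spark's plan carries a non-empty PartitionFilters entry.
def scanWithPartitionFilter(spark: SparkSession,
                            table: String,
                            partitionColumn: String,
                            start: String,
                            end: String): DataFrame =
  spark.table(table)
    .where(s"$partitionColumn >= '$start' AND $partitionColumn <= '$end'")

// e.g. scanWithPartitionFilter(spark,
//   "final_join.label_agg_table_listing_labels_agg",
//   "label_ds", "2022-10-07", "2022-10-07")
```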





~### Abandoned Investigation~

~It looks like there is some non-determinism in computing one of the intermediate dataframes when computing label joins. [`dropDuplicates`](https://github.com/zipline-ai/chronon/blob/6cb6273551e024d6eecb068f754b510ae0aac464/spark/src/main/scala/ai/chronon/spark/LabelJoin.scala#L215) seems to operate on a compound row key, `rowIdentifier`, which doesn't produce deterministic results; as such, we sometimes lose the expected values. This [change](https://github.com/airbnb/chronon/pull/380/files#diff-2c74cac973e1af38b615f654fee5b0261594a2b0005ecfd5a8f0941b8e348eedR156) was introduced in OSS upstream almost 2 years ago. This [test](airbnb/chronon#435) was contributed a couple of months after.~
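
For illustration, a self-contained sketch of the non-determinism described in the struck-out note above: when multiple rows share the dedup key, `dropDuplicates` keeps an arbitrary one of them, so the surviving value can vary across runs; the data and column names are invented for the demo:

```scala
import org.apache.spark.sql.SparkSession

object DropDuplicatesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("dropDuplicates-demo").getOrCreate()
    import spark.implicits._

    // Two rows share the compound key (listing, label_ds) but disagree
    // on the label value.
    val df = Seq(
      ("listing_1", "2022-10-07", 0),
      ("listing_1", "2022-10-07", 1)
    ).toDF("listing", "label_ds", "is_active_max_5d")

    // Which is_active_max_5d value survives is not fixed across
    // runs/partitionings, which can flip a test assertion.
    df.dropDuplicates(Seq("listing", "label_ds")).show()

    spark.stop()
  }
}
```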


~See debugger local values comparison. The left side is test failure,
and right side is test success.~


~<img width="1074" alt="Screenshot 2024-11-21 at 9 26 04 AM"
src="https://github.com/user-attachments/assets/0eba555c-43ab-48a6-bf61-bbb7b4fa2445">~


~Removing the `dropDuplicates` call will allow the tests to pass.
However, unclear if this will produce the semantically correct behavior,
as the tests themselves seem~

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Reintroduced a testing method to validate label joins, ensuring
accuracy in data processing.

- **Improvements**
- Enhanced data retrieval logic for label joins, emphasizing unique
entries and clearer range specifications.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
smcnamara2-stripe pushed a commit to smcnamara2-stripe/chronon that referenced this pull request Mar 10, 2025
kumar-zlai pushed a commit to zipline-ai/chronon that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit to zipline-ai/chronon that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit to zipline-ai/chronon that referenced this pull request May 15, 2025
chewy-zlai pushed a commit to zipline-ai/chronon that referenced this pull request May 16, 2025