Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions api/py/ai/chronon/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,31 @@ def LabelPart(labels: List[api.JoinPart],
labels with features in the training window user specified using `leftStartOffset` and
`leftEndOffset`.

Labels will be refreshed within this window given a label ds. As a result, there could be multiple
label versions based on the label ds. Label definition can be updated along the way but label join
job can only accommodate the changes going forward unless a backfill is manually triggered
The offsets are relative days compared to given label landing date `label_ds`. This parameter is required to be
passed in for each label join job. For example, given `label_ds = 2023-04-30`, `left_start_offset = 30`, and
`left_end_offset = 10`, the left size start date will be computed as 30 days before `label_ds` (inclusive),
which is 2023-04-01. Similarly, the left end date will be 2023-04-21. Labels will be refreshed within this window
[2023-04-01, 2023-04-21] in this specific label job run.

Since label join job will run continuously based on the schedule, multiple labels could be generated but with
different label_ds or label version. Label join job would have all computed label versions available, as well as
a view of latest version for easy label retrieval.

LabelPart definition can be updated along the way, but label join job can only accommodate these changes going
forward unless a backfill is manually triggered.

Label aggregation is also supported but with conditions applied. Single aggregation with one window is allowed
for now. If aggregation is present, we would infer the left_start_offset and left_end_offset same as window size
and the param input will be ignored.

:param labels: List of labels
:param left_start_offset: Integer to define the earliest date label should be refreshed
comparing to label_ds date specified
:param left_end_offset: Integer to define the most recent date label should be refreshed.
:param left_start_offset: Relative integer to define the earliest date label should be refreshed
comparing to label_ds date specified. For labels with aggregations,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file doesn't have label_ds defined. The label join is complicated, maybe add an example in your doc to explain these ds, offset.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point will do

this param will be inferred from aggregation window and no need to pass in.
:param left_end_offset: Relative integer to define the most recent date(inclusive) label should be refreshed.
e.g. left_end_offset = 3 most recent label available will be 3 days
prior to 'label_ds'
prior to 'label_ds' (including `label_ds`). For labels with aggregations, this param will be
inferred from aggregation window and no need to pass in.
:param label_offline_schedule: Cron expression for Airflow to schedule a DAG for offline
label join compute tasks
"""
Expand All @@ -251,6 +266,21 @@ def LabelPart(labels: List[api.JoinPart],
offlineSchedule=label_offline_schedule
)

for label in labels:
if label.groupBy.aggregations is not None:
assert len(labels) == 1, "Multiple label joinPart is not supported yet"
valid_agg = (len(label.groupBy.aggregations) == 1
and label.groupBy.aggregations[0].windows is not None
and len(label.groupBy.aggregations[0].windows) == 1)
assert valid_agg, "Too many aggregations or invalid windows found. " \
"Single aggregation with one window allowed."
valid_time_unit = label.groupBy.aggregations[0].windows[0].timeUnit == api.TimeUnit.DAYS
assert valid_time_unit, "Label aggregation window unit must be DAYS"
window_size = label.groupBy.aggregations[0].windows[0].length
if left_start_offset != window_size or left_start_offset != left_end_offset:
print(f"""WARNING: left_start_offset and left_end_offset will be inferred to be same as aggregation \
window and the given values {left_start_offset} and {left_end_offset} be ignored. """)

return api.LabelPart(
labels=labels,
leftStartOffset=left_start_offset,
Expand Down
4 changes: 3 additions & 1 deletion api/py/test/sample/joins/sample_team/sample_label_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
)

from ai.chronon.join import Join, JoinPart, LabelPart
from ai.chronon.group_by import GroupBy
from ai.chronon.group_by import (
GroupBy,
)

label_part_group_by = GroupBy(
name="sample_label_group_by",
Expand Down
59 changes: 59 additions & 0 deletions api/py/test/sample/joins/sample_team/sample_label_join_with_agg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Sample Label Join
"""
from sources import test_sources
from group_bys.sample_team import (
event_sample_group_by,
entity_sample_group_by_from_module,
group_by_with_kwargs,
)

from ai.chronon.join import Join, JoinPart, LabelPart
from ai.chronon.group_by import (
GroupBy,
Aggregation,
Operation,
Window,
TimeUnit,
)

label_part_group_by = GroupBy(
name="sample_label_group_by",
sources=test_sources.entity_source,
keys=["group_by_subject"],
aggregations=[
Aggregation(input_column="group_by_subject", operation=Operation.SUM, windows=[Window(7, TimeUnit.DAYS)]),
],
online=False,
)

v1 = Join(
left=test_sources.event_source,
output_namespace="sample_namespace",
right_parts=[
JoinPart(
group_by=event_sample_group_by.v1,
key_mapping={'subject': 'group_by_subject'},
),
JoinPart(
group_by=group_by_with_kwargs.v1,
key_mapping={'subject': 'group_by_subject'},
),
],
label_part=LabelPart([
JoinPart(
group_by=label_part_group_by
),
],
left_start_offset=7,
left_end_offset=7,
label_offline_schedule="@weekly"
),
additional_args={
'custom_arg': 'custom_value'
},
additional_env={
'custom_env': 'custom_env_value'
},
online=False
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
{
"metaData": {
"name": "sample_team.sample_label_join_with_agg.v1",
"online": 0,
"production": 0,
"customJson": "{\"check_consistency\": false, \"lag\": 0, \"additional_args\": {\"custom_arg\": \"custom_value\"}, \"additional_env\": {\"custom_env\": \"custom_env_value\"}, \"join_tags\": null, \"join_part_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}",
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": \"2021-04-09\"}",
"{\"name\": \"wait_for_sample_namespace.another_sample_table_group_by_ds\", \"spec\": \"sample_namespace.another_sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}"
],
"tableProperties": {
"source": "chronon"
},
"outputNamespace": "sample_namespace",
"team": "sample_team",
"samplePercent": 100.0,
"offlineSchedule": "@daily"
},
"left": {
"events": {
"table": "sample_namespace.sample_table_group_by",
"query": {
"selects": {
"event": "event_expr",
"group_by_subject": "group_by_expr",
"ts": "ts"
},
"startPartition": "2021-04-09",
"timeColumn": "ts",
"setups": []
}
}
},
"joinParts": [
{
"groupBy": {
"metaData": {
"name": "sample_team.event_sample_group_by.v1",
"online": 1,
"customJson": "{\"lag\": 0, \"groupby_tags\": {\"TO_DEPRECATE\": true}, \"column_tags\": {\"event_sum_7d\": {\"DETAILED_TYPE\": \"CONTINUOUS\"}}}",
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}"
],
"tableProperties": {
"source": "chronon"
},
"outputNamespace": "sample_namespace",
"team": "sample_team",
"offlineSchedule": "@daily"
},
"sources": [
{
"events": {
"table": "sample_namespace.sample_table_group_by",
"query": {
"selects": {
"event": "event_expr",
"group_by_subject": "group_by_expr"
},
"startPartition": "2021-04-09",
"timeColumn": "ts",
"setups": []
}
}
}
],
"keyColumns": [
"group_by_subject"
],
"aggregations": [
{
"inputColumn": "event",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 7,
"timeUnit": 1
}
]
},
{
"inputColumn": "event",
"operation": 7,
"argMap": {}
},
{
"inputColumn": "event",
"operation": 12,
"argMap": {
"k": "200",
"percentiles": "[0.99, 0.95, 0.5]"
}
}
]
},
"keyMapping": {
"subject": "group_by_subject"
}
},
{
"groupBy": {
"metaData": {
"name": "sample_team.group_by_with_kwargs.v1",
"online": 1,
"customJson": "{\"additional_argument\": \"To be placed in customJson\", \"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": \"2021-04-09\"}",
"{\"name\": \"wait_for_sample_namespace.another_sample_table_group_by_ds\", \"spec\": \"sample_namespace.another_sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}"
],
"tableProperties": {
"source": "chronon"
},
"outputNamespace": "chronon_db",
"team": "sample_team",
"offlineSchedule": "@daily"
},
"sources": [
{
"events": {
"table": "sample_namespace.sample_table_group_by",
"query": {
"selects": {
"group_by_subject": "group_by_subject_expr_old_version",
"event": "event_expr_old_version"
},
"startPartition": "2021-03-01",
"endPartition": "2021-04-09",
"timeColumn": "UNIX_TIMESTAMP(ts) * 1000",
"setups": []
}
}
},
{
"events": {
"table": "sample_namespace.another_sample_table_group_by",
"query": {
"selects": {
"group_by_subject": "possibly_different_group_by_subject_expr",
"event": "possibly_different_event_expr"
},
"startPartition": "2021-03-01",
"timeColumn": "__timestamp",
"setups": []
}
}
}
],
"keyColumns": [
"group_by_subject"
],
"aggregations": [
{
"inputColumn": "event",
"operation": 7,
"argMap": {}
},
{
"inputColumn": "event",
"operation": 12,
"argMap": {
"k": "128",
"percentiles": "[0.5]"
}
},
{
"inputColumn": "event",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 7,
"timeUnit": 1
}
]
}
]
},
"keyMapping": {
"subject": "group_by_subject"
}
}
],
"labelPart": {
"labels": [
{
"groupBy": {
"metaData": {
"name": "sample_label_group_by",
"online": 0,
"customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_sample_table.sample_entity_snapshot_ds\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}",
"{\"name\": \"wait_for_sample_table.sample_entity_mutations_ds\", \"spec\": \"sample_table.sample_entity_mutations/ds={{ ds }}/hr=00:00\", \"start\": \"2021-03-01\", \"end\": null}"
],
"team": "sample_team",
"offlineSchedule": "@daily"
},
"sources": [
{
"entities": {
"snapshotTable": "sample_table.sample_entity_snapshot",
"mutationTable": "sample_table.sample_entity_mutations/hr=00:00",
"mutationTopic": "sample_topic",
"query": {
"selects": {
"group_by_subject": "group_by_subject_expr",
"entity": "entity_expr"
},
"startPartition": "2021-03-01",
"timeColumn": "ts",
"setups": []
}
}
}
],
"keyColumns": [
"group_by_subject"
],
"aggregations": [
{
"inputColumn": "group_by_subject",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 7,
"timeUnit": 1
}
]
}
]
}
}
],
"leftStartOffset": 7,
"leftEndOffset": 7,
"metaData": {
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}",
"{\"name\": \"wait_for_sample_table.sample_entity_snapshot_ds\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}",
"{\"name\": \"wait_for_sample_table.sample_entity_mutations_ds\", \"spec\": \"sample_table.sample_entity_mutations/ds={{ ds }}/hr=00:00\", \"start\": \"2021-03-01\", \"end\": null}",
"{\"name\": \"wait_for_sample_namespace.sample_team_sample_label_join_with_agg_v1\", \"spec\": \"sample_namespace.sample_team_sample_label_join_with_agg_v1/ds={{ ds }}\"}"
],
"offlineSchedule": "@weekly"
}
}
}
Loading