Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions api/py/ai/chronon/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,18 @@ def LabelPart(labels: List[api.JoinPart],
label versions based on the label ds. Label definition can be updated along the way but label join
job can only accommodate the changes going forward unless a backfill is manually triggered

Label aggregation is supported with restrictions: only a single aggregation with a single window is allowed.
If an aggregation is present, left_start_offset and left_end_offset are inferred from the window
and the values passed in for these params are ignored.

:param labels: List of labels
:param left_start_offset: Integer to define the earliest date label should be refreshed
comparing to label_ds date specified
:param left_end_offset: Integer to define the most recent date label should be refreshed.
:param left_start_offset: Integer to define the earliest date(inclusive) label should be refreshed
comparing to label_ds date specified. For labels with aggregations,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file doesn't define label_ds. The label join is complicated, so consider adding an example to your doc that explains how these ds and offset values relate.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will do.

this param will be inferred from the aggregation window and does not need to be passed in.
:param left_end_offset: Integer to define the most recent date(inclusive) label should be refreshed.
e.g. left_end_offset = 3 most recent label available will be 3 days
prior to 'label_ds'
prior to 'label_ds' (including `label_ds`). For labels with aggregations, this param will be
inferred from the aggregation window and does not need to be passed in.
:param label_offline_schedule: Cron expression for Airflow to schedule a DAG for offline
label join compute tasks
"""
Expand All @@ -251,6 +257,21 @@ def LabelPart(labels: List[api.JoinPart],
offlineSchedule=label_offline_schedule
)

for label in labels:
if label.groupBy.aggregations is not None:
assert len(labels) == 1, "Multiple label joinPart is not supported yet"
valid_agg = (len(label.groupBy.aggregations) == 1
and label.groupBy.aggregations[0].windows is not None
and len(label.groupBy.aggregations[0].windows) == 1)
assert valid_agg, "Too many aggregations or invalid windows found. " \
"Single aggregation with one window allowed."
valid_time_unit = label.groupBy.aggregations[0].windows[0].timeUnit == api.TimeUnit.DAYS
assert valid_time_unit, "Label aggregation window unit must be DAYS"
window_size = label.groupBy.aggregations[0].windows[0].length
if left_start_offset != window_size or left_start_offset != left_end_offset:
print(f"""WARNING: left_start_offset and left_end_offset will be inferred to be same as aggregation \
window and the given values {left_start_offset} and {left_end_offset} be ignored. """)

return api.LabelPart(
labels=labels,
leftStartOffset=left_start_offset,
Expand Down
4 changes: 3 additions & 1 deletion api/py/test/sample/joins/sample_team/sample_label_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
)

from ai.chronon.join import Join, JoinPart, LabelPart
from ai.chronon.group_by import GroupBy
from ai.chronon.group_by import (
GroupBy,
)

label_part_group_by = GroupBy(
name="sample_label_group_by",
Expand Down
59 changes: 59 additions & 0 deletions api/py/test/sample/joins/sample_team/sample_label_join_with_agg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Sample Label Join
"""
from sources import test_sources
from group_bys.sample_team import (
event_sample_group_by,
entity_sample_group_by_from_module,
group_by_with_kwargs,
)

from ai.chronon.join import Join, JoinPart, LabelPart
from ai.chronon.group_by import (
GroupBy,
Aggregation,
Operation,
Window,
TimeUnit,
)

# GroupBy used as the label definition for the join below.
# It is keyed on `group_by_subject` and reads from the shared entity test source.
label_part_group_by = GroupBy(
name="sample_label_group_by",
sources=test_sources.entity_source,
keys=["group_by_subject"],
aggregations=[
# Exactly one aggregation with exactly one window, expressed in DAYS.
Aggregation(input_column="group_by_subject", operation=Operation.SUM, windows=[Window(7, TimeUnit.DAYS)]),
],
# Not served online.
online=False,
)

# Sample join that attaches an aggregated label part to an event-sourced left side.
v1 = Join(
left=test_sources.event_source,
output_namespace="sample_namespace",
# Feature join parts: both map the left-side `subject` column onto the
# group-bys' `group_by_subject` key.
right_parts=[
JoinPart(
group_by=event_sample_group_by.v1,
key_mapping={'subject': 'group_by_subject'},
),
JoinPart(
group_by=group_by_with_kwargs.v1,
key_mapping={'subject': 'group_by_subject'},
),
],
# Label part: a single label JoinPart wrapping the aggregated label group-by.
label_part=LabelPart([
JoinPart(
group_by=label_part_group_by
),
],
# Both offsets equal the 7-day aggregation window length of the label group-by.
left_start_offset=7,
left_end_offset=7,
label_offline_schedule="@weekly"
),
additional_args={
'custom_arg': 'custom_value'
},
additional_env={
'custom_env': 'custom_env_value'
},
online=False
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
{
"metaData": {
"name": "sample_team.sample_label_join_with_agg.v1",
"online": 0,
"production": 0,
"customJson": "{\"check_consistency\": false, \"lag\": 0, \"additional_args\": {\"custom_arg\": \"custom_value\"}, \"additional_env\": {\"custom_env\": \"custom_env_value\"}, \"join_tags\": null, \"join_part_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}",
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": \"2021-04-09\"}",
"{\"name\": \"wait_for_sample_namespace.another_sample_table_group_by_ds\", \"spec\": \"sample_namespace.another_sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}"
],
"tableProperties": {
"source": "chronon"
},
"outputNamespace": "sample_namespace",
"team": "sample_team",
"samplePercent": 100.0,
"offlineSchedule": "@daily"
},
"left": {
"events": {
"table": "sample_namespace.sample_table_group_by",
"query": {
"selects": {
"event": "event_expr",
"group_by_subject": "group_by_expr",
"ts": "ts"
},
"startPartition": "2021-04-09",
"timeColumn": "ts",
"setups": []
}
}
},
"joinParts": [
{
"groupBy": {
"metaData": {
"name": "sample_team.event_sample_group_by.v1",
"online": 1,
"customJson": "{\"lag\": 0, \"groupby_tags\": {\"TO_DEPRECATE\": true}, \"column_tags\": {\"event_sum_7d\": {\"DETAILED_TYPE\": \"CONTINUOUS\"}}}",
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}"
],
"tableProperties": {
"source": "chronon"
},
"outputNamespace": "sample_namespace",
"team": "sample_team",
"offlineSchedule": "@daily"
},
"sources": [
{
"events": {
"table": "sample_namespace.sample_table_group_by",
"query": {
"selects": {
"event": "event_expr",
"group_by_subject": "group_by_expr"
},
"startPartition": "2021-04-09",
"timeColumn": "ts",
"setups": []
}
}
}
],
"keyColumns": [
"group_by_subject"
],
"aggregations": [
{
"inputColumn": "event",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 7,
"timeUnit": 1
}
]
},
{
"inputColumn": "event",
"operation": 7,
"argMap": {}
},
{
"inputColumn": "event",
"operation": 12,
"argMap": {
"k": "200",
"percentiles": "[0.99, 0.95, 0.5]"
}
}
]
},
"keyMapping": {
"subject": "group_by_subject"
}
},
{
"groupBy": {
"metaData": {
"name": "sample_team.group_by_with_kwargs.v1",
"online": 1,
"customJson": "{\"additional_argument\": \"To be placed in customJson\", \"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": \"2021-04-09\"}",
"{\"name\": \"wait_for_sample_namespace.another_sample_table_group_by_ds\", \"spec\": \"sample_namespace.another_sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}"
],
"tableProperties": {
"source": "chronon"
},
"outputNamespace": "chronon_db",
"team": "sample_team",
"offlineSchedule": "@daily"
},
"sources": [
{
"events": {
"table": "sample_namespace.sample_table_group_by",
"query": {
"selects": {
"group_by_subject": "group_by_subject_expr_old_version",
"event": "event_expr_old_version"
},
"startPartition": "2021-03-01",
"endPartition": "2021-04-09",
"timeColumn": "UNIX_TIMESTAMP(ts) * 1000",
"setups": []
}
}
},
{
"events": {
"table": "sample_namespace.another_sample_table_group_by",
"query": {
"selects": {
"group_by_subject": "possibly_different_group_by_subject_expr",
"event": "possibly_different_event_expr"
},
"startPartition": "2021-03-01",
"timeColumn": "__timestamp",
"setups": []
}
}
}
],
"keyColumns": [
"group_by_subject"
],
"aggregations": [
{
"inputColumn": "event",
"operation": 7,
"argMap": {}
},
{
"inputColumn": "event",
"operation": 12,
"argMap": {
"k": "128",
"percentiles": "[0.5]"
}
},
{
"inputColumn": "event",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 7,
"timeUnit": 1
}
]
}
]
},
"keyMapping": {
"subject": "group_by_subject"
}
}
],
"labelPart": {
"labels": [
{
"groupBy": {
"metaData": {
"name": "sample_label_group_by",
"online": 0,
"customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_sample_table.sample_entity_snapshot_ds\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}",
"{\"name\": \"wait_for_sample_table.sample_entity_mutations_ds\", \"spec\": \"sample_table.sample_entity_mutations/ds={{ ds }}/hr=00:00\", \"start\": \"2021-03-01\", \"end\": null}"
],
"team": "sample_team",
"offlineSchedule": "@daily"
},
"sources": [
{
"entities": {
"snapshotTable": "sample_table.sample_entity_snapshot",
"mutationTable": "sample_table.sample_entity_mutations/hr=00:00",
"mutationTopic": "sample_topic",
"query": {
"selects": {
"group_by_subject": "group_by_subject_expr",
"entity": "entity_expr"
},
"startPartition": "2021-03-01",
"timeColumn": "ts",
"setups": []
}
}
}
],
"keyColumns": [
"group_by_subject"
],
"aggregations": [
{
"inputColumn": "group_by_subject",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 7,
"timeUnit": 1
}
]
}
]
}
}
],
"leftStartOffset": 7,
"leftEndOffset": 7,
"metaData": {
"dependencies": [
"{\"name\": \"wait_for_sample_namespace.sample_table_group_by_ds\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}",
"{\"name\": \"wait_for_sample_table.sample_entity_snapshot_ds\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}",
"{\"name\": \"wait_for_sample_table.sample_entity_mutations_ds\", \"spec\": \"sample_table.sample_entity_mutations/ds={{ ds }}/hr=00:00\", \"start\": \"2021-03-01\", \"end\": null}",
"{\"name\": \"wait_for_sample_namespace.sample_team_sample_label_join_with_agg_v1\", \"spec\": \"sample_namespace.sample_team_sample_label_join_with_agg_v1/ds={{ ds }}\"}"
],
"offlineSchedule": "@weekly"
}
}
}
12 changes: 7 additions & 5 deletions spark/src/main/scala/ai/chronon/spark/DataRange.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ case class PartitionRange(start: String, end: String) extends DataRange with Ord
.toArray
}

def whereClauses: Seq[String] = {
val startClause = Option(start).map(s"${Constants.PartitionColumn} >= '" + _ + "'")
val endClause = Option(end).map(s"${Constants.PartitionColumn} <= '" + _ + "'")
def whereClauses(partitionColumn: String = Constants.PartitionColumn): Seq[String] = {
val startClause = Option(start).map(s"${partitionColumn} >= '" + _ + "'")
val endClause = Option(end).map(s"${partitionColumn} <= '" + _ + "'")
(startClause ++ endClause).toSeq
}

Expand All @@ -70,10 +70,12 @@ case class PartitionRange(start: String, end: String) extends DataRange with Ord
}
}

def genScanQuery(query: Query, table: String, fillIfAbsent: Map[String, String] = Map.empty): String = {
def genScanQuery(query: Query, table: String,
fillIfAbsent: Map[String, String] = Map.empty,
partitionColumn: String = Constants.PartitionColumn): String = {
val queryOpt = Option(query)
val wheres =
whereClauses ++ queryOpt.flatMap(q => Option(q.wheres).map(_.asScala)).getOrElse(Seq.empty[String])
whereClauses(partitionColumn) ++ queryOpt.flatMap(q => Option(q.wheres).map(_.asScala)).getOrElse(Seq.empty[String])
QueryUtils.build(selects = queryOpt.map { query => Option(query.selects).map(_.asScala.toMap).orNull }.orNull,
from = table,
wheres = wheres,
Expand Down
Loading