Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions api/python/ai/chronon/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def Join(
left: api.Source,
right_parts: List[api.JoinPart],
version: int,
row_ids: Union[str, List[str]],
row_ids: Union[str, List[str]] = None,
online_external_parts: List[api.ExternalPart] = None,
bootstrap_parts: List[api.BootstrapPart] = None,
bootstrap_from_log: bool = False,
Expand Down Expand Up @@ -478,7 +478,10 @@ def Join(
# create a deep copy for case: multiple LeftOuterJoin use the same left,
# validation will fail after the first iteration
updated_left = copy.deepcopy(left)
if left.events and left.events.query.selects:

selects = None
if left.events:
selects = left.events.query.selects
assert "ts" not in left.events.query.selects.keys(), (
"'ts' is a reserved key word for Chronon,"
" please specify the expression in timeColumn"
Expand All @@ -487,6 +490,13 @@ def Join(
updated_left.events.query.selects.update(
{"ts": updated_left.events.query.timeColumn}
)
elif left.entities:
selects = left.entities.query.selects

if selects:
# For JoinSource, we can rely on the validation at the base join level
# TODO add more docs about row ID and link here
assert "row_id" in selects, "Left side of the join must contain `row_id` as a column."

if label_part:
label_metadata = api.MetaData(
Expand Down
2 changes: 1 addition & 1 deletion api/python/ai/chronon/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


def Query(
selects: Dict[str, str] = None,
selects: Dict[str, str],
wheres: List[str] = None,
start_partition: str = None,
end_partition: str = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"user_id_purchase_price_last10": "5609e04d61a47a8cafc67970785a3a59",
"listing_id": "f2f6c814b8ae1521176b22a1ae7f2d0d",
"user_id": "493d3df28f80664abd11b19fcd33b6e6",
"row_id": "6c80b6474532731fdb20d0a43bcb287f",
"ts": "ad9fd4c611e20ad833819a4ce9d752bf"
},
"online": 1,
Expand Down Expand Up @@ -144,6 +145,7 @@
"selects": {
"listing_id": "EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
"user_id": "attributes['user_id']",
"row_id": "request_UUID",
"ts": "timestamp"
},
"timeColumn": "timestamp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"user_id_purchase_price_last10": "5609e04d61a47a8cafc67970785a3a59",
"listing_id": "f2f6c814b8ae1521176b22a1ae7f2d0d",
"user_id": "493d3df28f80664abd11b19fcd33b6e6",
"row_id": "6c80b6474532731fdb20d0a43bcb287f",
"ts": "ad9fd4c611e20ad833819a4ce9d752bf"
},
"online": 1,
Expand Down Expand Up @@ -148,6 +149,7 @@
"selects": {
"listing_id": "EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
"user_id": "attributes['user_id']",
"row_id": "request_UUID",
"ts": "timestamp"
},
"timeColumn": "timestamp"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"listing_id_favorite_sum_1d": "bb07e39a4b7ea81aca5baf027fd8f555",
"listing_id": "f2f6c814b8ae1521176b22a1ae7f2d0d",
"user_id": "493d3df28f80664abd11b19fcd33b6e6",
"row_id": "6c80b6474532731fdb20d0a43bcb287f",
"ts": "ad9fd4c611e20ad833819a4ce9d752bf"
},
"online": 1,
Expand Down Expand Up @@ -138,6 +139,7 @@
"selects": {
"listing_id": "EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
"user_id": "attributes['user_id']",
"row_id": "request_UUID",
"ts": "timestamp"
},
"timeColumn": "timestamp"
Expand Down Expand Up @@ -251,8 +253,5 @@
"useLongNames": 0
}
],
"rowIds": [
"user_id"
],
"useLongNames": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"user_id_purchase_price_average_7d": "e3351cc3265d17cfd1b99f8da7fb083d",
"user_id_purchase_price_last10": "e3351cc3265d17cfd1b99f8da7fb083d",
"user_id": "a673c80ffc3260ee5ca173869ba07f7c",
"row_id": "cd335c9bb6f12c6e71b482b33109e07c",
"ts": "a173b2adccfe3c386f262c4cd7ffe9bd"
},
"online": 0,
Expand Down Expand Up @@ -142,6 +143,7 @@
"query": {
"selects": {
"user_id": "user_id",
"row_id": "request_UUID",
"ts": "ts"
},
"startPartition": "2023-11-01",
Expand Down Expand Up @@ -265,8 +267,5 @@
"useLongNames": 0
}
],
"rowIds": [
"user_id"
],
"useLongNames": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"user_id_purchase_price_average_7d": "1d85a493771d48ac9fb6581c18f186e2",
"user_id_purchase_price_last10": "1d85a493771d48ac9fb6581c18f186e2",
"user_id": "fbc25deb4a5dd5d9c1d6e960c3922d0e",
"row_id": "70230580430bc568b5ab415e20b802dc",
"ts": "c4364195d7639fb8b71e2f87ee017d4d"
},
"online": 0,
Expand Down Expand Up @@ -142,6 +143,7 @@
"query": {
"selects": {
"user_id": "user_id",
"row_id": "request_UUID",
"ts": "ts"
},
"timeColumn": "ts",
Expand Down Expand Up @@ -266,8 +268,5 @@
"useLongNames": 0
}
],
"rowIds": [
"user_id"
],
"useLongNames": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"user_id_purchase_price_average_7d": "e3351cc3265d17cfd1b99f8da7fb083d",
"user_id_purchase_price_last10": "e3351cc3265d17cfd1b99f8da7fb083d",
"user_id": "a673c80ffc3260ee5ca173869ba07f7c",
"row_id": "cd335c9bb6f12c6e71b482b33109e07c",
"ts": "a173b2adccfe3c386f262c4cd7ffe9bd"
},
"online": 0,
Expand Down Expand Up @@ -142,6 +143,7 @@
"query": {
"selects": {
"user_id": "user_id",
"row_id": "request_UUID",
"ts": "ts"
},
"startPartition": "2023-11-01",
Expand Down Expand Up @@ -265,8 +267,5 @@
"useLongNames": 0
}
],
"rowIds": [
"user_id"
],
"useLongNames": 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"user_id_purchase_price_average_7d": "1d85a493771d48ac9fb6581c18f186e2",
"user_id_purchase_price_last10": "1d85a493771d48ac9fb6581c18f186e2",
"user_id": "fbc25deb4a5dd5d9c1d6e960c3922d0e",
"row_id": "70230580430bc568b5ab415e20b802dc",
"ts": "c4364195d7639fb8b71e2f87ee017d4d"
},
"online": 0,
Expand Down Expand Up @@ -142,6 +143,7 @@
"query": {
"selects": {
"user_id": "user_id",
"row_id": "request_UUID",
"ts": "ts"
},
"timeColumn": "ts",
Expand Down Expand Up @@ -266,8 +268,5 @@
"useLongNames": 0
}
],
"rowIds": [
"user_id"
],
"useLongNames": 0
}
2 changes: 1 addition & 1 deletion api/python/test/canary/joins/gcp/item_event_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
selects=selects(
listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
user_id="attributes['user_id']",
row_id="request_UUID",
),
time_column="timestamp",
),
Expand All @@ -20,7 +21,6 @@
# Join with just a streaming GB
canary_streaming_v1 = Join(
left=source,
row_ids="user_id",
right_parts=[
JoinPart(group_by=item_event_canary.actions_pubsub_v2)
],
Expand Down
10 changes: 4 additions & 6 deletions api/python/test/canary/joins/gcp/training_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
table="data.checkouts",
query=Query(
selects=selects(
"user_id"
user_id="user_id",
row_id="request_UUID",
), # The primary key used to join various GroupBys together
start_partition="2023-11-01",
time_column="ts",
Expand All @@ -23,7 +24,6 @@

v1_test = Join(
left=source,
row_ids="user_id",
right_parts=[
JoinPart(group_by=purchases.v1_test)
],
Expand All @@ -41,7 +41,6 @@

v1_dev = Join(
left=source,
row_ids="user_id",
right_parts=[
JoinPart(group_by=purchases.v1_dev)
],
Expand All @@ -53,7 +52,8 @@
table="data.checkouts_notds",
query=Query(
selects=selects(
"user_id"
user_id = "user_id",
row_id="request_UUID",
), # The primary key used to join various GroupBys together
time_column="ts",
partition_column="notds"
Expand All @@ -63,7 +63,6 @@

v1_test_notds = Join(
left=source_notds,
row_ids=["user_id"],
right_parts=[
JoinPart(group_by=purchases.v1_test_notds)
],
Expand All @@ -72,7 +71,6 @@

v1_dev_notds = Join(
left=source_notds,
row_ids=["user_id"],
right_parts=[
JoinPart(group_by=purchases.v1_dev_notds)
],
Expand Down
3 changes: 1 addition & 2 deletions api/python/test/sample/joins/kaggle/outbrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

training_set = Join( # left equi join
left=outbrain_left_events(
"uuid", "display_id", "ad_id", "document_id", "clicked", "geo_location", "platform"),
row_ids="uuid",
"uuid", "display_id", "ad_id", "document_id", "clicked", "geo_location", "platform", "row_id"),
right_parts=[JoinPart(group_by=group_by) for group_by in [ad_doc, ad_uuid, ad_streaming, ad_platform]],
use_long_names = True,
version=0,
Expand Down
5 changes: 2 additions & 3 deletions api/python/test/sample/joins/quickstart/training_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
table="data.checkouts",
query=Query(
selects=selects(
"user_id"
user_id="user_id",
row_id="request_UUID",
), # The primary key used to join various GroupBys together
time_column="ts",
), # The event time used to compute feature values as-of
Expand All @@ -38,7 +39,6 @@

v1 = Join(
left=source,
row_ids="user_id",
right_parts=[
JoinPart(group_by=group_by) for group_by in [purchases_v1, returns_v1, users]
], # Include the three GroupBys
Expand All @@ -47,7 +47,6 @@

v2 = Join(
left=source,
row_ids=["user_id"],
right_parts=[
JoinPart(group_by=group_by) for group_by in [purchases_v1, returns_v1]
], # Include the two online GroupBys
Expand Down
3 changes: 1 addition & 2 deletions api/python/test/sample/joins/risk/user_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@

source_users = Source(
events=EventSource(
table="data.users", query=Query(selects=selects("user_id"), time_column="ts")
table="data.users", query=Query(selects=selects("user_id", "row_id"), time_column="ts")
)
)

txn_join = Join(
left=source_users,
row_ids="user_id",
right_parts=[
JoinPart(group_by=txn_group_by_user, prefix="user"),
JoinPart(group_by=txn_group_by_merchant, prefix="merchant"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

v0 = Join(
left=test_sources.event_source,
row_ids="subject",
right_parts=[JoinPart(group_by=mutation_sample_group_by.v0)],
online=False,
version=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

v1 = Join(
left=test_sources.event_source,
row_ids=["subject", "event"],
right_parts=[
JoinPart(
group_by=chaining_group_by_v1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

parent_join = Join(
left=test_sources.event_source,
row_ids="group_by_subject",
right_parts=[
JoinPart(
group_by=event_sample_group_by.v1,
Expand Down
5 changes: 0 additions & 5 deletions api/python/test/sample/joins/sample_team/sample_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
v1 = Join(
left=test_sources.staging_entities,
right_parts=[JoinPart(group_by=sample_group_by.v1)],
row_ids="place_id",
table_properties={"config_json": """{"sample_key": "sample_value"}"""},
output_namespace="sample_namespace",
env_vars=EnvironmentVariables(
Expand All @@ -41,7 +40,6 @@
never = Join(
left=test_sources.staging_entities,
right_parts=[JoinPart(group_by=sample_group_by.v1)],
row_ids=["s2CellId", "place_id"],
output_namespace="sample_namespace",
offline_schedule="@never",
version=0,
Expand All @@ -50,15 +48,13 @@
group_by_of_group_by = Join(
left=test_sources.staging_entities,
right_parts=[JoinPart(group_by=sample_group_by_group_by.v1)],
row_ids="s2CellId",
output_namespace="sample_namespace",
version=0,
)

consistency_check = Join(
left=test_sources.staging_entities,
right_parts=[JoinPart(group_by=sample_group_by.v1)],
row_ids="place_id",
output_namespace="sample_namespace",
check_consistency=True,
version=0,
Expand All @@ -67,7 +63,6 @@
no_log_flattener = Join(
left=test_sources.staging_entities,
right_parts=[JoinPart(group_by=sample_group_by.v1)],
row_ids=["place_id"],
output_namespace="sample_namespace",
sample_percent=0.0,
version=0,
Expand Down
Loading
Loading