From dda571540c0416d7ea5ba8311dea3e2a6bdc4c65 Mon Sep 17 00:00:00 2001 From: Chewy Shaw Date: Wed, 2 Oct 2024 15:14:21 -0700 Subject: [PATCH 1/3] Join and Group Bys for POC --- .../sample/group_bys/risk/merchant_data.py | 29 ++ .../group_bys/risk/transaction_events.py | 73 ++++++ .../test/sample/group_bys/risk/user_data.py | 29 ++ .../sample/joins/risk/user_transactions.py | 21 ++ .../transaction_events.txn_group_by_merchant | 71 +++++ .../risk/transaction_events.txn_group_by_user | 71 +++++ .../joins/risk/user_transactions.txn_join | 248 ++++++++++++++++++ api/py/test/sample/teams.json | 5 + 8 files changed, 547 insertions(+) create mode 100644 api/py/test/sample/group_bys/risk/merchant_data.py create mode 100644 api/py/test/sample/group_bys/risk/transaction_events.py create mode 100644 api/py/test/sample/group_bys/risk/user_data.py create mode 100644 api/py/test/sample/joins/risk/user_transactions.py create mode 100644 api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_merchant create mode 100644 api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user create mode 100644 api/py/test/sample/production/joins/risk/user_transactions.txn_join diff --git a/api/py/test/sample/group_bys/risk/merchant_data.py b/api/py/test/sample/group_bys/risk/merchant_data.py new file mode 100644 index 0000000000..a97397c089 --- /dev/null +++ b/api/py/test/sample/group_bys/risk/merchant_data.py @@ -0,0 +1,29 @@ +from ai.chronon.api.ttypes import Source, EntitySource +from ai.chronon.query import Query, select +from ai.chronon.group_by import ( + GroupBy, + Aggregation, + Operation, + Window, + TimeUnit +) + +""" +This GroupBy aggregates metrics about a user's previous purchases in various windows. +""" + +# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. +source_merchants = Source( + entities=EntitySource( + snapshotTable="data.merchants", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + query=Query( + selects=select("merchant_id","account_age", "zipcode", "is_big_merchant", "country", "account_type", "preferred_language"), # Select the fields we care about + ) + ) +) + +merchant_group_by = GroupBy( + sources=[source_merchants], + keys=["merchant_id"], + aggregations=None +) \ No newline at end of file diff --git a/api/py/test/sample/group_bys/risk/transaction_events.py b/api/py/test/sample/group_bys/risk/transaction_events.py new file mode 100644 index 0000000000..7c4b9a61f8 --- /dev/null +++ b/api/py/test/sample/group_bys/risk/transaction_events.py @@ -0,0 +1,73 @@ +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.query import Query, select +from ai.chronon.group_by import ( + GroupBy, + Aggregation, + Operation, + Window, + TimeUnit +) + +""" +This GroupBy aggregates metrics about a user's previous purchases in various windows. +""" + +# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. +source_user_transactions = Source( + events=EventSource( + table="data.txn_events", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + topic=None, + query=Query( + selects=select("user_id","transaction_amount", "transaction_type"), # Select the fields we care about + time_column="transaction_time") # The event time + )) + +window_sizes = [Window(length=1, timeUnit=TimeUnit.HOURS), Window(length=1, timeUnit=TimeUnit.DAYS), Window(length=30, timeUnit=TimeUnit.DAYS), Window(length=365, timeUnit=TimeUnit.DAYS)] + +txn_group_by_user = GroupBy( + sources=[source_user_transactions], + keys=["user_id"], + online=True, + aggregations=[ + Aggregation( + input_column="transaction_amount", + operation=Operation.COUNT, + windows=window_sizes + ), + Aggregation( + input_column="transaction_amount", + operation=Operation.SUM, + windows=[Window(length=1, timeUnit=TimeUnit.HOURS)] + ) + ] +) + +# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. +source_merchant_transactions = Source( + events=EventSource( + table="data.txn_events", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + topic=None, + query=Query( + selects=select("merchant_id","transaction_amount", "transaction_type"), # Select the fields we care about + time_column="transaction_time") # The event time + )) + +window_sizes = [Window(length=1, timeUnit=TimeUnit.HOURS), Window(length=1, timeUnit=TimeUnit.DAYS), Window(length=30, timeUnit=TimeUnit.DAYS), Window(length=365, timeUnit=TimeUnit.DAYS)] + +txn_group_by_merchant = GroupBy( + sources=[source_merchant_transactions], + keys=["merchant_id"], + online=True, + aggregations=[ + Aggregation( + input_column="transaction_amount", + operation=Operation.COUNT, + windows=window_sizes + ), + Aggregation( + input_column="transaction_amount", + operation=Operation.SUM, + windows=[Window(length=1, timeUnit=TimeUnit.HOURS)] + ) + ] +) \ No newline at end of file diff --git a/api/py/test/sample/group_bys/risk/user_data.py b/api/py/test/sample/group_bys/risk/user_data.py new file mode 100644 index 0000000000..e928aeab18 --- /dev/null +++ b/api/py/test/sample/group_bys/risk/user_data.py @@ -0,0 +1,29 @@ +from ai.chronon.api.ttypes import Source, EntitySource +from ai.chronon.query import Query, select +from ai.chronon.group_by import ( + GroupBy, + Aggregation, + Operation, + Window, + TimeUnit +) + +""" +This GroupBy aggregates metrics about a user's previous purchases in various windows. +""" + +# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. +source_users = Source( + entities=EntitySource( + snapshotTable="data.users", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + query=Query( + selects=select("user_id","account_age", "account_balance", "credit_score", "number_of_devices", "country", "account_type", "preferred_language"), # Select the fields we care about + ) # The event time + ) +) + +user_group_by = GroupBy( + sources=[source_users], + keys=["user_id"], + aggregations=None +) \ No newline at end of file diff --git a/api/py/test/sample/joins/risk/user_transactions.py b/api/py/test/sample/joins/risk/user_transactions.py new file mode 100644 index 0000000000..08eb5de5b3 --- /dev/null +++ b/api/py/test/sample/joins/risk/user_transactions.py @@ -0,0 +1,21 @@ +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.join import Join, JoinPart +from ai.chronon.query import Query, select +from group_bys.risk.transaction_events import txn_group_by_user, txn_group_by_merchant +from group_bys.risk.user_data import user_group_by +from group_bys.risk.merchant_data import merchant_group_by + +source_users = Source( + events=EventSource( + table="data.users", + query=Query( + selects=select("user_id"), + time_column="ts" + ) + ) +) + +txn_join = Join( + left=source_users, + right_parts=[JoinPart(group_by=txn_group_by_user, prefix="user"), JoinPart(group_by=txn_group_by_merchant, prefix="merchant"), JoinPart(group_by=user_group_by, prefix="user"), JoinPart(group_by=merchant_group_by, prefix="merchant")] +) diff --git a/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_merchant b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_merchant new file mode 100644 index 0000000000..35706d1dbf --- /dev/null +++ b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_merchant @@ -0,0 +1,71 @@ +{ + "metaData": { + "name": "risk.transaction_events.txn_group_by_merchant", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "merchant_id": "merchant_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] +} \ No newline at end of file diff --git a/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user new file mode 100644 index 0000000000..daa0f07326 --- /dev/null +++ b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user @@ -0,0 +1,71 @@ +{ + "metaData": { + "name": "risk.transaction_events.txn_group_by_user", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "user_id": "user_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] +} \ No newline at end of file diff --git a/api/py/test/sample/production/joins/risk/user_transactions.txn_join b/api/py/test/sample/production/joins/risk/user_transactions.txn_join new file mode 100644 index 0000000000..d845dc432a --- /dev/null +++ b/api/py/test/sample/production/joins/risk/user_transactions.txn_join @@ -0,0 +1,248 @@ +{ + "metaData": { + "name": "risk.user_transactions.txn_join", + "online": 0, + "production": 0, + "customJson": "{\"check_consistency\": false, \"lag\": 0, \"join_tags\": null, \"join_part_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}", + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}", + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "samplePercent": 100.0, + "offlineSchedule": "@daily" + }, + "left": { + "events": { + "table": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "ts": "ts" + }, + "timeColumn": "ts", + "setups": [] + } + } + }, + "joinParts": [ + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_user", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "user_id": "user_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_merchant", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "merchant_id": "merchant_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": "merchant" + }, + { + "groupBy": { + "metaData": { + "name": "risk.user_data.user_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "entities": { + "snapshotTable": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "account_age": "account_age", + "account_balance": "account_balance", + "credit_score": "credit_score", + "number_of_devices": "number_of_devices", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.merchant_data.merchant_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "entities": { + "snapshotTable": "data.merchants", + "query": { + "selects": { + "merchant_id": "merchant_id", + "account_age": "account_age", + "zipcode": "zipcode", + "is_big_merchant": "is_big_merchant", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ] + }, + "prefix": "merchant" + } + ] +} \ No newline at end of file diff --git a/api/py/test/sample/teams.json b/api/py/test/sample/teams.json index 05f40ddec0..65120076ec 100644 --- a/api/py/test/sample/teams.json +++ b/api/py/test/sample/teams.json @@ -57,5 +57,10 @@ "quickstart": { "description": "Used for the quickstart example", "namespace": "default" + }, + "risk": { + "description": "Used for proof of concept", + "namespace": "default" } + } From 34c5052c3e172fd3ce8a368d71fe48fcbf914c91 Mon Sep 17 00:00:00 2001 From: Chewy Shaw Date: Thu, 3 Oct 2024 10:10:17 -0700 Subject: [PATCH 2/3] Code re-use to simplify the code --- .../group_bys/risk/transaction_events.py | 88 +++++++------------ 1 file changed, 34 insertions(+), 54 deletions(-) diff --git a/api/py/test/sample/group_bys/risk/transaction_events.py b/api/py/test/sample/group_bys/risk/transaction_events.py index 7c4b9a61f8..de46f0530e 100644 --- a/api/py/test/sample/group_bys/risk/transaction_events.py +++ b/api/py/test/sample/group_bys/risk/transaction_events.py @@ -12,62 +12,42 @@ This GroupBy aggregates metrics about a user's previous purchases in various windows. """ -# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. -source_user_transactions = Source( - events=EventSource( - table="data.txn_events", # This points to the log table in the warehouse with historical purchase events, updated in batch daily - topic=None, - query=Query( - selects=select("user_id","transaction_amount", "transaction_type"), # Select the fields we care about - time_column="transaction_time") # The event time - )) - -window_sizes = [Window(length=1, timeUnit=TimeUnit.HOURS), Window(length=1, timeUnit=TimeUnit.DAYS), Window(length=30, timeUnit=TimeUnit.DAYS), Window(length=365, timeUnit=TimeUnit.DAYS)] - -txn_group_by_user = GroupBy( - sources=[source_user_transactions], - keys=["user_id"], - online=True, - aggregations=[ - Aggregation( - input_column="transaction_amount", - operation=Operation.COUNT, - windows=window_sizes - ), - Aggregation( - input_column="transaction_amount", - operation=Operation.SUM, - windows=[Window(length=1, timeUnit=TimeUnit.HOURS)] +def create_transaction_source(key_field): + return Source( + events=EventSource( + table="data.txn_events", # Points to the historical purchase events table + topic=None, + query=Query( + selects=select(key_field, "transaction_amount", "transaction_type"), + time_column="transaction_time" + ) ) - ] -) + ) + +def create_txn_group_by(source, key): + return GroupBy( + sources=[source], + keys=[key], + online=True, + aggregations=[ + Aggregation( + input_column="transaction_amount", + operation=Operation.COUNT, + windows=window_sizes + ), + Aggregation( + input_column="transaction_amount", + operation=Operation.SUM, + windows=[Window(length=1, timeUnit=TimeUnit.HOURS)] + ) + ] + ) -# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. -source_merchant_transactions = Source( - events=EventSource( - table="data.txn_events", # This points to the log table in the warehouse with historical purchase events, updated in batch daily - topic=None, - query=Query( - selects=select("merchant_id","transaction_amount", "transaction_type"), # Select the fields we care about - time_column="transaction_time") # The event time - )) window_sizes = [Window(length=1, timeUnit=TimeUnit.HOURS), Window(length=1, timeUnit=TimeUnit.DAYS), Window(length=30, timeUnit=TimeUnit.DAYS), Window(length=365, timeUnit=TimeUnit.DAYS)] -txn_group_by_merchant = GroupBy( - sources=[source_merchant_transactions], - keys=["merchant_id"], - online=True, - aggregations=[ - Aggregation( - input_column="transaction_amount", - operation=Operation.COUNT, - windows=window_sizes - ), - Aggregation( - input_column="transaction_amount", - operation=Operation.SUM, - windows=[Window(length=1, timeUnit=TimeUnit.HOURS)] - ) - ] -) \ No newline at end of file +source_user_transactions = create_transaction_source("user_id") +txn_group_by_user = create_txn_group_by(source_user_transactions, "user_id") + +source_merchant_transactions = create_transaction_source("merchant_id") +txn_group_by_merchant = create_txn_group_by(source_merchant_transactions, "merchant_id") \ No newline at end of file From 18712cbc0a8cc3bbcc345cdc283eb4c9eaddb2c1 Mon Sep 17 00:00:00 2001 From: Chewy Shaw Date: Thu, 3 Oct 2024 11:26:05 -0700 Subject: [PATCH 3/3] Coderabbit cleanup of code ordering --- api/py/test/sample/group_bys/risk/transaction_events.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/api/py/test/sample/group_bys/risk/transaction_events.py b/api/py/test/sample/group_bys/risk/transaction_events.py index de46f0530e..7974656202 100644 --- a/api/py/test/sample/group_bys/risk/transaction_events.py +++ b/api/py/test/sample/group_bys/risk/transaction_events.py @@ -24,6 +24,8 @@ def create_transaction_source(key_field): ) ) +window_sizes = [Window(length=1, timeUnit=TimeUnit.HOURS), Window(length=1, timeUnit=TimeUnit.DAYS), Window(length=30, timeUnit=TimeUnit.DAYS), Window(length=365, timeUnit=TimeUnit.DAYS)] + def create_txn_group_by(source, key): return GroupBy( sources=[source], @@ -43,9 +45,6 @@ def create_txn_group_by(source, key): ] ) - -window_sizes = [Window(length=1, timeUnit=TimeUnit.HOURS), Window(length=1, timeUnit=TimeUnit.DAYS), Window(length=30, timeUnit=TimeUnit.DAYS), Window(length=365, timeUnit=TimeUnit.DAYS)] - source_user_transactions = create_transaction_source("user_id") txn_group_by_user = create_txn_group_by(source_user_transactions, "user_id")