diff --git a/api/py/test/sample/group_bys/risk/merchant_data.py b/api/py/test/sample/group_bys/risk/merchant_data.py new file mode 100644 index 0000000000..a97397c089 --- /dev/null +++ b/api/py/test/sample/group_bys/risk/merchant_data.py @@ -0,0 +1,29 @@ +from ai.chronon.api.ttypes import Source, EntitySource +from ai.chronon.query import Query, select +from ai.chronon.group_by import ( + GroupBy, + Aggregation, + Operation, + Window, + TimeUnit +) + +""" +This GroupBy aggregates metrics about a user's previous purchases in various windows. +""" + +# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. +source_merchants = Source( + entities=EntitySource( + snapshotTable="data.merchants", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + query=Query( + selects=select("merchant_id","account_age", "zipcode", "is_big_merchant", "country", "account_type", "preferred_language"), # Select the fields we care about + ) + ) +) + +merchant_group_by = GroupBy( + sources=[source_merchants], + keys=["merchant_id"], + aggregations=None +) \ No newline at end of file diff --git a/api/py/test/sample/group_bys/risk/transaction_events.py b/api/py/test/sample/group_bys/risk/transaction_events.py new file mode 100644 index 0000000000..7974656202 --- /dev/null +++ b/api/py/test/sample/group_bys/risk/transaction_events.py @@ -0,0 +1,52 @@ +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.query import Query, select +from ai.chronon.group_by import ( + GroupBy, + Aggregation, + Operation, + Window, + TimeUnit +) + +""" +This GroupBy aggregates metrics about a user's previous purchases in various windows. +""" + +def create_transaction_source(key_field): + return Source( + events=EventSource( + table="data.txn_events", # Points to the historical purchase events table + topic=None, + query=Query( + selects=select(key_field, "transaction_amount", "transaction_type"), + time_column="transaction_time" + ) + ) + ) + +window_sizes = [Window(length=1, timeUnit=TimeUnit.HOURS), Window(length=1, timeUnit=TimeUnit.DAYS), Window(length=30, timeUnit=TimeUnit.DAYS), Window(length=365, timeUnit=TimeUnit.DAYS)] + +def create_txn_group_by(source, key): + return GroupBy( + sources=[source], + keys=[key], + online=True, + aggregations=[ + Aggregation( + input_column="transaction_amount", + operation=Operation.COUNT, + windows=window_sizes + ), + Aggregation( + input_column="transaction_amount", + operation=Operation.SUM, + windows=[Window(length=1, timeUnit=TimeUnit.HOURS)] + ) + ] + ) + +source_user_transactions = create_transaction_source("user_id") +txn_group_by_user = create_txn_group_by(source_user_transactions, "user_id") + +source_merchant_transactions = create_transaction_source("merchant_id") +txn_group_by_merchant = create_txn_group_by(source_merchant_transactions, "merchant_id") \ No newline at end of file diff --git a/api/py/test/sample/group_bys/risk/user_data.py b/api/py/test/sample/group_bys/risk/user_data.py new file mode 100644 index 0000000000..e928aeab18 --- /dev/null +++ b/api/py/test/sample/group_bys/risk/user_data.py @@ -0,0 +1,29 @@ +from ai.chronon.api.ttypes import Source, EntitySource +from ai.chronon.query import Query, select +from ai.chronon.group_by import ( + GroupBy, + Aggregation, + Operation, + Window, + TimeUnit +) + +""" +This GroupBy aggregates metrics about a user's previous purchases in various windows. +""" + +# This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source. +source_users = Source( + entities=EntitySource( + snapshotTable="data.users", # This points to the log table in the warehouse with historical purchase events, updated in batch daily + query=Query( + selects=select("user_id","account_age", "account_balance", "credit_score", "number_of_devices", "country", "account_type", "preferred_language"), # Select the fields we care about + ) # The event time + ) +) + +user_group_by = GroupBy( + sources=[source_users], + keys=["user_id"], + aggregations=None +) \ No newline at end of file diff --git a/api/py/test/sample/joins/risk/user_transactions.py b/api/py/test/sample/joins/risk/user_transactions.py new file mode 100644 index 0000000000..08eb5de5b3 --- /dev/null +++ b/api/py/test/sample/joins/risk/user_transactions.py @@ -0,0 +1,21 @@ +from ai.chronon.api.ttypes import Source, EventSource +from ai.chronon.join import Join, JoinPart +from ai.chronon.query import Query, select +from group_bys.risk.transaction_events import txn_group_by_user, txn_group_by_merchant +from group_bys.risk.user_data import user_group_by +from group_bys.risk.merchant_data import merchant_group_by + +source_users = Source( + events=EventSource( + table="data.users", + query=Query( + selects=select("user_id"), + time_column="ts" + ) + ) +) + +txn_join = Join( + left=source_users, + right_parts=[JoinPart(group_by=txn_group_by_user, prefix="user"), JoinPart(group_by=txn_group_by_merchant, prefix="merchant"), JoinPart(group_by=user_group_by, prefix="user"), JoinPart(group_by=merchant_group_by, prefix="merchant")] +) diff --git a/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_merchant b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_merchant new file mode 100644 index 0000000000..35706d1dbf --- /dev/null +++ b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_merchant @@ -0,0 +1,71 @@ +{ + "metaData": { + "name": "risk.transaction_events.txn_group_by_merchant", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "merchant_id": "merchant_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] +} \ No newline at end of file diff --git a/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user new file mode 100644 index 0000000000..daa0f07326 --- /dev/null +++ b/api/py/test/sample/production/group_bys/risk/transaction_events.txn_group_by_user @@ -0,0 +1,71 @@ +{ + "metaData": { + "name": "risk.transaction_events.txn_group_by_user", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "user_id": "user_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] +} \ No newline at end of file diff --git a/api/py/test/sample/production/joins/risk/user_transactions.txn_join b/api/py/test/sample/production/joins/risk/user_transactions.txn_join new file mode 100644 index 0000000000..d845dc432a --- /dev/null +++ b/api/py/test/sample/production/joins/risk/user_transactions.txn_join @@ -0,0 +1,248 @@ +{ + "metaData": { + "name": "risk.user_transactions.txn_join", + "online": 0, + "production": 0, + "customJson": "{\"check_consistency\": false, \"lag\": 0, \"join_tags\": null, \"join_part_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}", + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}", + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "samplePercent": 100.0, + "offlineSchedule": "@daily" + }, + "left": { + "events": { + "table": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "ts": "ts" + }, + "timeColumn": "ts", + "setups": [] + } + } + }, + "joinParts": [ + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_user", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "user_id": "user_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_merchant", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "merchant_id": "merchant_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": "merchant" + }, + { + "groupBy": { + "metaData": { + "name": "risk.user_data.user_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "entities": { + "snapshotTable": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "account_age": "account_age", + "account_balance": "account_balance", + "credit_score": "credit_score", + "number_of_devices": "number_of_devices", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.merchant_data.merchant_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "entities": { + "snapshotTable": "data.merchants", + "query": { + "selects": { + "merchant_id": "merchant_id", + "account_age": "account_age", + "zipcode": "zipcode", + "is_big_merchant": "is_big_merchant", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ] + }, + "prefix": "merchant" + } + ] +} \ No newline at end of file diff --git a/api/py/test/sample/teams.json b/api/py/test/sample/teams.json index 05f40ddec0..65120076ec 100644 --- a/api/py/test/sample/teams.json +++ b/api/py/test/sample/teams.json @@ -57,5 +57,10 @@ "quickstart": { "description": "Used for the quickstart example", "namespace": "default" + }, + "risk": { + "description": "Used for proof of concept", + "namespace": "default" } + }