23 changes: 12 additions & 11 deletions api/py/ai/chronon/group_by.py
@@ -370,24 +370,25 @@ def GroupBy(
     :param sources:
         can be constructed as entities or events or joinSource::

-            import ai.chronon.api.ttypes as chronon
-            events = chronon.Source(events=chronon.Events(
+            import ai.chronon.query as query
+            import ai.chronon.source as source
+            events = source.EventSource(
                 table=YOUR_TABLE,
                 topic=YOUR_TOPIC # <- OPTIONAL for serving
-                query=chronon.Query(...)
-                isCumulative=False # <- defaults to false.
+                query=query.Query(...)
+                is_cumulative=False # <- defaults to false.
             ))
         Or
-            entities = chronon.Source(entities=chronon.Entities(
-                snapshotTable=YOUR_TABLE,
-                mutationTopic=YOUR_TOPIC,
-                mutationTable=YOUR_MUTATION_TABLE
-                query=chronon.Query(...)
+            entities = source.EntitySource(
+                snapshot_table=YOUR_TABLE,
+                mutation_topic=YOUR_TOPIC,
+                mutation_table=YOUR_MUTATION_TABLE
+                query=query.Query(...)
             ))
         or
-            joinSource = chronon.Source(joinSource=chronon.JoinSource(
+            joinSource = source.JoinSource(
                 join = YOUR_CHRONON_PARENT_JOIN,
-                query = chronon.Query(...)
+                query = query.Query(...)
             ))

     Multiple sources can be supplied to backfill the historical values with their respective start and end
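For reference, the docstring's new style composes with GroupBy the same way the quickstart samples below do. A minimal sketch, assuming the usual GroupBy arguments (keys, aggregations) seen in the quickstart configs; the table and column names are illustrative:

from ai.chronon.group_by import Aggregation, GroupBy, Operation
from ai.chronon.query import Query, select
from ai.chronon.source import EventSource

# Batch-only event source built with the new wrapper, instead of
# hand-constructing ttypes.Source(events=ttypes.EventSource(...)).
purchases = EventSource(
    table="data.purchases",  # illustrative warehouse table
    topic=None,              # no streaming topic in this sketch
    query=Query(
        selects=select("user_id", "purchase_price"),
        time_column="ts",
    ),
)

purchase_sums = GroupBy(
    sources=[purchases],
    keys=["user_id"],  # entity key to aggregate by
    aggregations=[Aggregation(input_column="purchase_price", operation=Operation.SUM)],
)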
81 changes: 81 additions & 0 deletions api/py/ai/chronon/source.py
@@ -0,0 +1,81 @@
import json
from typing import Optional

import ai.chronon.api.ttypes as ttypes


def EventSource(
    table: str,
    query: ttypes.Query,
    topic: Optional[str] = None,
    is_cumulative: Optional[bool] = None,
    **kwargs
) -> ttypes.Source:
    """
    :param table: Points to a table that has historical data for the input events
    :param query:
        Contains row level transformations and filtering expressed as Spark SQL statements.
        Applied to both table and topic
    :param topic: (Optional) Kafka topic that can be listened to for realtime updates
    :param is_cumulative:
        Indicates that each new partition contains not just the current day's events but the entire set of events
        since the beginning
    :return:
        A source object of kind EventSource
    """
    return ttypes.Source(
        events=ttypes.EventSource(
            table=table,
            topic=topic,
            query=query,
            isCumulative=is_cumulative,
            customJson=json.dumps(kwargs) if kwargs else None,
        )
    )


def EntitySource(
    snapshot_table: str,
    query: ttypes.Query,
    mutation_table: Optional[str] = None,
    mutation_topic: Optional[str] = None,
    **kwargs
) -> ttypes.Source:
    """
    :param snapshot_table: Points to a table that contains periodic snapshots of the entire dataset
    :param query: Contains row level transformations and filtering expressed as Spark SQL statements
    :param mutation_table: (Optional) Points to a table that contains all changes applied to the dataset
    :param mutation_topic: (Optional) Kafka topic that delivers changes in realtime
    :return:
        A source object of kind EntitySource
    """
    return ttypes.Source(
        entities=ttypes.EntitySource(
            snapshotTable=snapshot_table,
            mutationTable=mutation_table,
            query=query,
            mutationTopic=mutation_topic,
            customJson=json.dumps(kwargs) if kwargs else None,
        )
    )


def JoinSource(
    join: ttypes.Join,
    query: ttypes.Query,
) -> ttypes.Source:
    """
    :param join: Output of an upstream (parent) Join operation that this source reads from
    :param query: Contains row level transformations and filtering expressed as Spark SQL statements
    :return:
        A source object of kind JoinSource
    """
    return ttypes.Source(
        joinSource=ttypes.JoinSource(
            join=join,
            query=query
        )
    )
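One behavior worth calling out: the wrappers return a plain ttypes.Source, the snake_case arguments map onto the camelCase Thrift fields, and any extra keyword arguments are serialized into customJson. A quick sketch (table and topic names are illustrative; JoinSource behaves the same way but takes a materialized parent Join instead of a table):

import json

from ai.chronon.query import Query, select
from ai.chronon.source import EventSource

src = EventSource(
    table="data.events",    # illustrative table
    topic="events_topic",   # illustrative Kafka topic
    query=Query(selects=select("user_id")),
    is_cumulative=True,
    owner_team="ml-infra",  # extra kwarg, not a Thrift field
)

# The `events` branch of the returned Source carries the Thrift struct.
assert src.events.isCumulative is True
assert src.events.customJson == json.dumps({"owner_team": "ml-infra"})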
5 changes: 3 additions & 2 deletions api/py/test/lineage/test_parse_group_by.py
@@ -16,6 +16,7 @@
 import unittest

 from ai.chronon import group_by
+from ai.chronon import source
 from ai.chronon.api import ttypes
 from ai.chronon.group_by import Accuracy, Derivation
 from ai.chronon.lineage.lineage_metadata import ConfigType, TableType
@@ -25,7 +26,7 @@

 class TestParseGroupBy(unittest.TestCase):
     def setUp(self):
-        gb_event_source = ttypes.EventSource(
+        gb_event_source = source.EventSource(
             table="source.gb_table",
             topic=None,
             query=ttypes.Query(
@@ -35,7 +36,7 @@ def setUp(self):
             ),
         )

-        gb_event_source1 = ttypes.EventSource(
+        gb_event_source1 = source.EventSource(
             table="source.gb_table1",
             topic=None,
             query=ttypes.Query(
23 changes: 11 additions & 12 deletions api/py/test/lineage/test_parse_join.py
@@ -16,6 +16,7 @@
 import unittest

 from ai.chronon import group_by
+from ai.chronon import source
 from ai.chronon.api import ttypes
 from ai.chronon.api import ttypes as api
 from ai.chronon.group_by import Derivation
@@ -27,7 +28,7 @@

 class TestParseJoin(unittest.TestCase):
     def setUp(self):
-        gb_event_source = ttypes.EventSource(
+        gb_event_source = source.EventSource(
             table="gb_table",
             topic=None,
             query=ttypes.Query(
@@ -59,17 +60,15 @@ def setUp(self):
        )

        self.join = Join(
-            left=api.Source(
-                events=api.EventSource(
-                    table="join_event_table",
-                    query=api.Query(
-                        startPartition="2020-04-09",
-                        selects={
-                            "subject": "subject",
-                            "event_id": "event",
-                        },
-                        timeColumn="CAST(ts AS DOUBLE)",
-                    ),
+            left=source.EventSource(
+                table="join_event_table",
+                query=api.Query(
+                    startPartition="2020-04-09",
+                    selects={
+                        "subject": "subject",
+                        "event_id": "event",
+                    },
+                    timeColumn="CAST(ts AS DOUBLE)",
                ),
            ),
            output_namespace="test_db",
17 changes: 8 additions & 9 deletions api/py/test/sample/group_bys/kaggle/clicks.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.api.ttypes import Source, EventSource
 from ai.chronon.query import Query, select
 from ai.chronon.group_by import (
     GroupBy,
@@ -23,6 +22,7 @@
     TimeUnit,
     Accuracy
 )
+from ai.chronon.source import EventSource
 from ai.chronon.utils import get_staging_query_output_table_name
 from staging_queries.kaggle.outbrain import base_table

@@ -43,14 +43,13 @@
 """


-source = Source(
-    events=EventSource(
-        table=get_staging_query_output_table_name(base_table), # Here we use the staging query output table because it has the necessary fields, but for a true streaming source we would likely use a log table
-        topic="some_topic", # You would set your streaming source topic here
-        query=Query(
-            selects=select("ad_id", "clicked"),
-            time_column="ts")
-    ))
+source = EventSource(
+    table=get_staging_query_output_table_name(base_table), # Here we use the staging query output table because it has the necessary fields, but for a true streaming source we would likely use a log table
+    topic="some_topic", # You would set your streaming source topic here
+    query=Query(
+        selects=select("ad_id", "clicked"),
+        time_column="ts")
+)

 ad_streaming = GroupBy(
     sources=[source],
1 change: 0 additions & 1 deletion api/py/test/sample/group_bys/kaggle/outbrain.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.api.ttypes import Source, EventSource
 from ai.chronon.query import Query, select
 from ai.chronon.group_by import (
     GroupBy,
17 changes: 8 additions & 9 deletions api/py/test/sample/group_bys/quickstart/purchases.py
@@ -13,8 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.api.ttypes import Source, EventSource
 from ai.chronon.query import Query, select
+from ai.chronon.source import EventSource
 from ai.chronon.group_by import (
     GroupBy,
     Aggregation,
@@ -28,14 +28,13 @@
 """

 # This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source.
-source = Source(
-    events=EventSource(
-        table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily
-        topic=None, # See the 'returns' GroupBy for an example that has a streaming source configured. In this case, this would be the streaming source topic that can be listened to for realtime events
-        query=Query(
-            selects=select("user_id","purchase_price"), # Select the fields we care about
-            time_column="ts") # The event time
-    ))
+source = EventSource(
+    table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily
+    topic=None, # See the 'returns' GroupBy for an example that has a streaming source configured. In this case, this would be the streaming source topic that can be listened to for realtime events
+    query=Query(
+        selects=select("user_id","purchase_price"), # Select the fields we care about
+        time_column="ts") # The event time
+)

 window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below

17 changes: 8 additions & 9 deletions api/py/test/sample/group_bys/quickstart/returns.py
@@ -13,8 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.api.ttypes import Source, EventSource
 from ai.chronon.query import Query, select
+from ai.chronon.source import EventSource
 from ai.chronon.group_by import (
     GroupBy,
     Aggregation,
@@ -28,14 +28,13 @@
 This GroupBy aggregates metrics about a user's previous purchases in various windows.
 """

-source = Source(
-    events=EventSource(
-        table="data.returns", # This points to the log table with historical return events
-        topic="events.returns/fields=ts,return_id,user_id,product_id,refund_amt/host=kafka/port=9092",
-        query=Query(
-            selects=select("user_id","refund_amt"), # Select the fields we care about
-            time_column="ts") # The event time
-    ))
+source = EventSource(
+    table="data.returns", # This points to the log table with historical return events
+    topic="events.returns/fields=ts,return_id,user_id,product_id,refund_amt/host=kafka/port=9092",
+    query=Query(
+        selects=select("user_id","refund_amt"), # Select the fields we care about
+        time_column="ts") # The event time
+)

 window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below

22 changes: 10 additions & 12 deletions api/py/test/sample/group_bys/quickstart/schema.py
@@ -1,20 +1,18 @@
 from ai.chronon.group_by import GroupBy, Aggregation, Operation
-from ai.chronon.api.ttypes import Source, EventSource
+from ai.chronon.source import EventSource
 from ai.chronon.query import Query, select


-logging_schema_source = Source(
-    events=EventSource(
-        table="default.chronon_log_table",
-        query=Query(
-            selects=select(
-                schema_hash="decode(unbase64(key_base64), 'utf-8')",
-                schema_value="decode(unbase64(value_base64), 'utf-8')"
-            ),
-            wheres=["name='SCHEMA_PUBLISH_EVENT'"],
-            time_column="ts_millis",
-        ),
-    )
-)
+logging_schema_source = EventSource(
+    table="default.chronon_log_table",
+    query=Query(
+        selects=select(
+            schema_hash="decode(unbase64(key_base64), 'utf-8')",
+            schema_value="decode(unbase64(value_base64), 'utf-8')"
+        ),
+        wheres=["name='SCHEMA_PUBLISH_EVENT'"],
+        time_column="ts_millis",
+    ),
+)

 v1 = GroupBy(
15 changes: 7 additions & 8 deletions api/py/test/sample/group_bys/quickstart/users.py
@@ -13,24 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from ai.chronon.api.ttypes import Source, EntitySource
 from ai.chronon.query import Query, select
 from ai.chronon.group_by import (
     GroupBy,
 )
+from ai.chronon.source import EntitySource

 """
 The primary key for this GroupBy is the same as the primary key of the source table. Therefore,
 it doesn't perform any aggregation, but just extracts user fields as features.
 """

-source = Source(
-    entities=EntitySource(
-        snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog
-        query=Query(
-            selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about
-        )
-    ))
+source = EntitySource(
+    snapshot_table="data.users", # This points to a table that contains daily snapshots of the entire product catalog
+    query=Query(
+        selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about
+    )
+)

 v1 = GroupBy(
     sources=[source],
@@ -1,21 +1,19 @@
-from ai.chronon.api import ttypes
 from ai.chronon.api.ttypes import Aggregation, Operation, TimeUnit, Window
 from ai.chronon.group_by import Aggregations, GroupBy
 from ai.chronon.query import Query, select
+from ai.chronon.source import EventSource

-source = ttypes.Source(
-    events=ttypes.EventSource(
-        table="random_table_name",
-        query=Query(
-            selects=select(
-                user="id_item",
-                play="if(transaction_type='A', 1, 0)",
-                pause="if(transaction_type='B', 1, 0)",
-            ),
-            start_partition="2023-03-01",
-            time_column="UNIX_TIMESTAMP(ts) * 1000",
-        ),
-    )
-)
+source = EventSource(
+    table="random_table_name",
+    query=Query(
+        selects=select(
+            user="id_item",
+            play="if(transaction_type='A', 1, 0)",
+            pause="if(transaction_type='B', 1, 0)",
+        ),
+        start_partition="2023-03-01",
+        time_column="UNIX_TIMESTAMP(ts) * 1000",
+    ),
+)

 windows = [