diff --git a/api/py/ai/chronon/cli/compile/compile_context.py b/api/py/ai/chronon/cli/compile/compile_context.py index 2c7c943fb4..da89fabb8c 100644 --- a/api/py/ai/chronon/cli/compile/compile_context.py +++ b/api/py/ai/chronon/cli/compile/compile_context.py @@ -3,14 +3,13 @@ from typing import Any, Dict, List, Type from ai.chronon.api.common.ttypes import ConfigType -from ai.chronon.api.ttypes import GroupBy, Join, StagingQuery, Team +from ai.chronon.api.ttypes import GroupBy, Join, StagingQuery, Team, Model from ai.chronon.cli.compile.display.compiled_obj import CompiledObj from ai.chronon.cli.compile.display.compile_status import CompileStatus from ai.chronon.cli.compile.serializer import file2thrift from ai.chronon.cli.compile.conf_validator import ConfValidator import ai.chronon.cli.compile.parse_teams as teams from ai.chronon.cli.logger import require, get_logger -from ai.chronon.model import Model logger = get_logger() diff --git a/api/py/ai/chronon/cli/compile/compiler.py b/api/py/ai/chronon/cli/compile/compiler.py index 9ebb99d135..efba5521c1 100644 --- a/api/py/ai/chronon/cli/compile/compiler.py +++ b/api/py/ai/chronon/cli/compile/compiler.py @@ -73,8 +73,15 @@ def _compile_class_configs(self, config_info: ConfigInfo) -> CompileResult: objects, errors = self._write_objects_in_folder(compiled_objects) + if objects: compile_result.obj_dict.update(objects) + name_to_compiled_objects = {obj.name: obj for obj in compiled_objects} + for name, successful_obj in objects.items(): + compiled_obj = name_to_compiled_objects[name] + self.compile_context.compile_status.add_object_update_display( + compiled_obj, config_info.cls.__name__ + ) if errors: compile_result.error_dict.update(errors) @@ -102,10 +109,6 @@ def _write_objects_in_folder( for error in errors: print(f"\nError processing conf {co.name}: {error}") - traceback.print_exception( - type(error), error, error.__traceback__ - ) - print("\n") else: object_dict[co.name] = co.obj diff --git a/api/py/ai/chronon/cli/compile/parse_configs.py b/api/py/ai/chronon/cli/compile/parse_configs.py index 5e2d57c640..b863538896 100644 --- a/api/py/ai/chronon/cli/compile/parse_configs.py +++ b/api/py/ai/chronon/cli/compile/parse_configs.py @@ -43,10 +43,6 @@ def from_folder( ) results.append(result) - compile_context.compile_status.add_object_update_display( - result, cls.__name__ - ) - except Exception as e: result = CompiledObj( @@ -55,10 +51,6 @@ def from_folder( results.append(result) - compile_context.compile_status.add_object_update_display( - result, cls.__name__ - ) - return results @@ -71,7 +63,9 @@ def from_file(file_path: str, cls: type, input_dir: str): rel_path_without_extension = os.path.splitext(rel_path)[0] module_name = rel_path_without_extension.replace("/", ".") - conf_type, team_name, rest = module_name.split(".", 2) + + conf_type, team_name_with_path = module_name.split(".", 1) + mod_path = team_name_with_path.replace("/", ".") module = importlib.import_module(module_name) @@ -81,9 +75,9 @@ def from_file(file_path: str, cls: type, input_dir: str): if isinstance(obj, cls): - name = f"{team_name}.{rest}.{var_name}" + name = f"{mod_path}.{var_name}" obj.metaData.name = name - obj.metaData.team = team_name + obj.metaData.team = mod_path.split(".")[0] result[name] = obj diff --git a/api/py/ai/chronon/cli/compile/parse_teams.py b/api/py/ai/chronon/cli/compile/parse_teams.py index 4e6540af27..aeb67d23c7 100644 --- a/api/py/ai/chronon/cli/compile/parse_teams.py +++ b/api/py/ai/chronon/cli/compile/parse_teams.py @@ -1,5 +1,7 @@ from copy import deepcopy import importlib +import importlib.util +import sys import os from ai.chronon.api.common.ttypes import ( ExecutionInfo, @@ -12,9 +14,27 @@ logger = get_logger() -_DEFAULT_CONF_TEAM = "common" +_DEFAULT_CONF_TEAM = "default" +def import_module_from_file(file_path): + # Get the module name from the file path (without .py extension) + module_name = file_path.split("/")[-1].replace(".py", "") + + # Create the module spec + spec = importlib.util.spec_from_file_location(module_name, file_path) + + # Create the module based on the spec + module = importlib.util.module_from_spec(spec) + + # Add the module to sys.modules + sys.modules[module_name] = module + + # Execute the module + spec.loader.exec_module(module) + + return module + def load_teams(conf_root: str) -> Dict[str, Team]: teams_file = os.path.join(conf_root, "teams.py") @@ -24,7 +44,7 @@ def load_teams(conf_root: str) -> Dict[str, Team]: f"Team config file: {teams_file} not found. You might be running this from the wrong directory.", ) - team_module = importlib.import_module("teams") + team_module = import_module_from_file(teams_file) require( team_module is not None, @@ -32,7 +52,7 @@ def load_teams(conf_root: str) -> Dict[str, Team]: ) team_dict = {} - + logger.info(f"Processing teams from path {teams_file}") for name, obj in team_module.__dict__.items(): if isinstance(obj, Team): obj.name = name @@ -64,7 +84,7 @@ def update_metadata(obj: Any, team_dict: Dict[str, Team]): require( _DEFAULT_CONF_TEAM in team_dict, - f"'{_DEFAULT_CONF_TEAM}' team not found in teams.py, please add an entry 🙏", + f"'{_DEFAULT_CONF_TEAM}' team not found in teams.py, please add an entry 🙏.", ) metadata.outputNamespace = team_dict[team].outputNamespace diff --git a/api/py/ai/chronon/cli/logger.py b/api/py/ai/chronon/cli/logger.py index 386b207549..ce6d59c2b1 100644 --- a/api/py/ai/chronon/cli/logger.py +++ b/api/py/ai/chronon/cli/logger.py @@ -53,9 +53,9 @@ def green(text): def require(cond, message): if not cond: - print(f"{red("❌")} {message}") + print(f"X: {message}") sys.exit(1) def done(cond, message): - print(f"{green("✅")} {message}") + print(f"DONE: {message}") diff --git a/api/py/ai/chronon/repo/compilev3.py b/api/py/ai/chronon/repo/compilev3.py new file mode 100644 index 0000000000..220ace4327 --- /dev/null +++ b/api/py/ai/chronon/repo/compilev3.py @@ -0,0 +1,34 @@ +import click +import os + +from ai.chronon.cli.compile.compile_context import CompileContext +from ai.chronon.cli.compile.compiler import Compiler + + +@click.command(name="compile") +@click.option( + "--chronon_root", + envvar="CHRONON_ROOT", + help="Path to the root chronon folder", +) +def compile_v3(chronon_root): + if chronon_root: + chronon_root_path = os.path.expanduser(chronon_root) + os.chdir(chronon_root_path) + + # check that a "teams.py" file exists in the current directory + if not (os.path.exists("teams.py") or os.path.exists("teams.json")): + raise click.ClickException( + ( + "teams.py or teams.json file not found in current directory." + " Please run from the top level of conf directory." + ) + ) + + compile_context = CompileContext() + compiler = Compiler(compile_context) + compiler.compile(compile_context) + + +if __name__ == "__main__": + compile_v3() diff --git a/api/py/ai/chronon/repo/teams.py b/api/py/ai/chronon/repo/teams.py index 2110327461..bee3825d5e 100644 --- a/api/py/ai/chronon/repo/teams.py +++ b/api/py/ai/chronon/repo/teams.py @@ -1,5 +1,4 @@ -"""A module used for reading teams.json file. -""" +"""A module used for reading teams.json file.""" # Copyright (C) 2023 The Chronon Authors. # @@ -18,7 +17,7 @@ import json # `default` team in teams.json contains default values. -DEFAULT_CONF_TEAM = 'default' +DEFAULT_CONF_TEAM = "default" loaded_jsons = {} diff --git a/api/py/ai/chronon/repo/zipline.py b/api/py/ai/chronon/repo/zipline.py index c23e0bb7f4..57ba0ec800 100644 --- a/api/py/ai/chronon/repo/zipline.py +++ b/api/py/ai/chronon/repo/zipline.py @@ -1,6 +1,6 @@ import click -from ai.chronon.repo.compile import extract_and_convert +from ai.chronon.repo.compilev3 import compile_v3 from ai.chronon.repo.run import main as run_main from ai.chronon.repo.init import main as init_main @@ -10,6 +10,6 @@ def zipline(): pass -zipline.add_command(extract_and_convert) +zipline.add_command(compile_v3) zipline.add_command(run_main) zipline.add_command(init_main) diff --git a/api/py/ai/chronon/utils.py b/api/py/ai/chronon/utils.py index 8157ac3d07..993f0cd4f9 100644 --- a/api/py/ai/chronon/utils.py +++ b/api/py/ai/chronon/utils.py @@ -25,14 +25,15 @@ import ai.chronon.api.ttypes as api import ai.chronon.repo.extract_objects as eo +from ai.chronon.cli.compile import parse_teams from ai.chronon.repo import ( # GROUP_BY_FOLDER_NAME, # JOIN_FOLDER_NAME, # STAGING_QUERY_FOLDER_NAME, # MODEL_FOLDER_NAME, FOLDER_NAME_TO_CLASS, - TEAMS_FILE_PATH, - teams, + # TEAMS_FILE_PATH, + # teams, ) @@ -313,6 +314,11 @@ def get_staging_query_output_table_name( return output_table_name(staging_query, full_name=full_name) +def get_team_conf_from_py(team, key): + team_module = importlib.import_module(f"teams.{team}") + return getattr(team_module, key) + + def get_join_output_table_name(join: api.Join, full_name: bool = False): """generate output table name for join backfill job""" # join sources could also be created inline alongside groupBy file @@ -322,9 +328,13 @@ def get_join_output_table_name(join: api.Join, full_name: bool = False): # set output namespace if not join.metaData.outputNamespace: team_name = join.metaData.name.split(".")[0] - namespace = teams.get_team_conf( - os.path.join(chronon_root_path, TEAMS_FILE_PATH), team_name, "namespace" + namespace = ( + parse_teams.load_teams(chronon_root_path).get(team_name).outputNamespace ) + # TODO replace this with teams.py + # namespace = teams.get_team_conf( + # os.path.join(chronon_root_path, TEAMS_FILE_PATH), team_name, "namespace" + # ) join.metaData.outputNamespace = namespace return output_table_name(join, full_name=full_name) diff --git a/api/py/requirements/base.in b/api/py/requirements/base.in index a0f618f151..7fd5c8feab 100644 --- a/api/py/requirements/base.in +++ b/api/py/requirements/base.in @@ -7,4 +7,5 @@ sqlglot crcmod==1.7 glom boto3 -importlib-resources==6.5.2 \ No newline at end of file +importlib-resources==6.5.2 +rich \ No newline at end of file diff --git a/api/py/requirements/base.txt b/api/py/requirements/base.txt index bcad0dff70..e1a9b17d68 100644 --- a/api/py/requirements/base.txt +++ b/api/py/requirements/base.txt @@ -1,10 +1,16 @@ -# SHA1:68652bbb7f3ec5c449c5d85307085c0c94bc4da3 +# SHA1:f6642699c69070a051b23fe523edcec65b717c6f # # This file is autogenerated by pip-compile-multi # To update, run: # # pip-compile-multi # +attrs==25.3.0 + # via glom +boltons==25.0.0 + # via + # face + # glom boto3==1.37.6 # via -r requirements/base.in botocore==1.37.6 @@ -19,6 +25,10 @@ click==8.1.8 # via -r requirements/base.in crcmod==1.7 # via -r requirements/base.in +face==24.0.0 + # via glom +glom==24.11.0 + # via -r requirements/base. google-api-core==2.24.0 # via # google-cloud-core @@ -46,6 +56,8 @@ jmespath==1.0.1 # via # boto3 # botocore +markdown-it-py==3.0.0 + # via rich proto-plus==1.25.0 # via google-api-core protobuf==5.29.3 @@ -59,12 +71,16 @@ pyasn1==0.6.1 # rsa pyasn1-modules==0.4.1 # via google-auth +pygments==2.19.1 + # via rich python-dateutil==2.9.0.post0 # via botocore requests==2.32.3 # via # google-api-core # google-cloud-storage +rich==13.9.4 + # via -r requirements/base.in rsa==4.9 # via google-auth s3transfer==0.11.4 diff --git a/api/py/test/sample/teams.json b/api/py/test/sample/deprecated_teams.json similarity index 100% rename from api/py/test/sample/teams.json rename to api/py/test/sample/deprecated_teams.json diff --git a/api/py/test/sample/group_bys/sample_team/sample_group_by_missing_input_column.py b/api/py/test/sample/group_bys/sample_team/sample_group_by_missing_input_column.py index 3b4cdc1d74..886ac279f7 100644 --- a/api/py/test/sample/group_bys/sample_team/sample_group_by_missing_input_column.py +++ b/api/py/test/sample/group_bys/sample_team/sample_group_by_missing_input_column.py @@ -25,6 +25,7 @@ sources=test_sources.staging_entities, keys=["s2CellId", "place_id"], aggregations=[ + # Intentionally left out `input_column` to test error handling Aggregation(operation=Operation.COUNT), Aggregation(operation=Operation.COUNT), ], diff --git a/api/py/test/sample/teams.py b/api/py/test/sample/teams.py index 0466a7ba7c..dfb0f29427 100644 --- a/api/py/test/sample/teams.py +++ b/api/py/test/sample/teams.py @@ -42,7 +42,7 @@ test = Team( - namespace="test", + outputNamespace="test", env=EnvironmentVariables( common={ "GCP_BIGTABLE_INSTANCE_ID": "test-instance" # example, custom bigtable instance @@ -63,7 +63,7 @@ sample_team = Team( - namespace="test", + outputNamespace="test", env=EnvironmentVariables( common={ "GCP_BIGTABLE_INSTANCE_ID": "test-instance" # example, custom bigtable instance @@ -82,10 +82,10 @@ ), ) -etsy_search = Team(namespace="etsy-search") +etsy_search = Team(outputNamespace="etsy-search") -kaggle = Team(namespace="kaggle") +kaggle = Team(outputNamespace="kaggle") -quickstart = Team(namespace="quickstart") +quickstart = Team(outputNamespace="quickstart") -risk = Team(namespace="risk") +risk = Team(outputNamespace="risk") diff --git a/api/py/tox.ini b/api/py/tox.ini index f594f7cb3e..a9845da6fc 100644 --- a/api/py/tox.ini +++ b/api/py/tox.ini @@ -9,15 +9,10 @@ allowlist_externals = rm, mkdir, cp setenv = PYTHONPATH = {toxinidir}:{toxinidir}/test/sample # Run a compile test run. commands_pre = - rm -rf test/sample/production - python3 ai/chronon/repo/compile.py \ - --chronon_root=test/sample \ - --input_path=joins/sample_team/ - python3 ai/chronon/repo/compile.py \ - --chronon_root=test/sample \ - --input_path=group_bys/sample_team/ - mkdir -p {envtmpdir}/test/sample/production - cp -r test/sample/production/ {envtmpdir}/test/sample/production/ + rm -rf test/sample/compiled + python3 ai/chronon/repo/compilev3.py --chronon_root=test/sample + mkdir -p {envtmpdir}/test/sample/compiled + cp -r test/sample/compiled/ {envtmpdir}/test/sample/compiled/ commands = pytest test/ \ --cov=ai/ \