Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions api/python/ai/chronon/cli/compile/compile_context.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Type
from typing import Any, Dict, List, Optional, Type

import ai.chronon.cli.compile.parse_teams as teams
from ai.chronon.api.common.ttypes import ConfigType
from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery, Team
from ai.chronon.api.ttypes import GroupBy, Join, MetaData, Model, StagingQuery, Team
from ai.chronon.cli.compile.conf_validator import ConfValidator
from ai.chronon.cli.compile.display.compile_status import CompileStatus
from ai.chronon.cli.compile.display.compiled_obj import CompiledObj
Expand All @@ -18,7 +18,7 @@
class ConfigInfo:
folder_name: str
cls: Type
config_type: ConfigType
config_type: Optional[ConfigType]


@dataclass
Expand All @@ -42,6 +42,7 @@ def __init__(self):
config_type=ConfigType.STAGING_QUERY,
),
ConfigInfo(folder_name="models", cls=Model, config_type=ConfigType.MODEL),
ConfigInfo(folder_name="teams_metadata", cls=MetaData, config_type=None), # only for team metadata
]

self.compile_status = CompileStatus(use_live=False)
Expand All @@ -51,6 +52,8 @@ def __init__(self):
cls = config_info.cls
self.existing_confs[cls] = self._parse_existing_confs(cls)



self.validator: ConfValidator = ConfValidator(
input_root=self.chronon_root,
output_root=self.compile_dir,
Expand Down Expand Up @@ -94,15 +97,15 @@ def output_dir(self, cls: type = None) -> str:
self.chronon_root, self.compile_dir, config_info.folder_name
)

def staging_output_path(self, obj: Any):
def staging_output_path(self, compiled_obj: CompiledObj):
"""
- eg., input: group_by with name search.clicks.features.v1
- eg., output: root/compiled_staging/group_bys/search/clicks.features.v1
"""

output_dir = self.staging_output_dir(obj.__class__) # compiled/joins
output_dir = self.staging_output_dir(compiled_obj.obj.__class__) # compiled/joins

team, rest = obj.metaData.name.split(".", 1) # search, clicks.features.v1
team, rest = compiled_obj.name.split(".", 1) # search, clicks.features.v1

return os.path.join(
output_dir,
Expand Down Expand Up @@ -139,22 +142,30 @@ def _parse_existing_confs(self, obj_class: type) -> Dict[str, object]:
try:
obj = file2thrift(full_path, obj_class)

if obj and hasattr(obj, "metaData"):
result[obj.metaData.name] = obj

compiled_obj = CompiledObj(
name=obj.metaData.name,
obj=obj,
file=obj.metaData.sourceFile,
errors=None,
obj_type=obj_class.__name__,
tjson=open(full_path).read(),
)

self.compile_status.add_existing_object_update_display(
compiled_obj
)

if obj:
if hasattr(obj, "metaData"):
result[obj.metaData.name] = obj
compiled_obj = CompiledObj(
name=obj.metaData.name,
obj=obj,
file=obj.metaData.sourceFile,
errors=None,
obj_type=obj_class.__name__,
tjson=open(full_path).read(),
)
self.compile_status.add_existing_object_update_display(compiled_obj)
elif isinstance(obj, MetaData):
team_metadata_name = '.'.join(full_path.split('/')[-2:]) # use the name of the file as team metadata won't have name
result[team_metadata_name] = obj
compiled_obj = CompiledObj(
name=team_metadata_name,
obj=obj,
file=obj.sourceFile,
errors=None,
obj_type=obj_class.__name__,
tjson=open(full_path).read(),
)
self.compile_status.add_existing_object_update_display(compiled_obj)
else:
logger.errors(
f"Parsed object from {full_path} has no metaData attribute"
Expand Down
34 changes: 30 additions & 4 deletions api/python/ai/chronon/cli/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
import ai.chronon.cli.compile.parse_configs as parser
import ai.chronon.cli.logger as logger
from ai.chronon.api.common.ttypes import ConfigType
from ai.chronon.cli.compile import serializer
from ai.chronon.cli.compile.compile_context import CompileContext, ConfigInfo
from ai.chronon.cli.compile.display.compiled_obj import CompiledObj
from ai.chronon.cli.compile.display.console import console
from ai.chronon.cli.compile.parse_teams import merge_team_execution_info
from ai.chronon.types import MetaData

logger = logger.get_logger()

Expand All @@ -37,6 +40,7 @@ def compile(self) -> Dict[ConfigType, CompileResult]:
configs = self._compile_class_configs(config_info)

compile_results[config_info.config_type] = configs
self._compile_team_metadata()

# check if staging_output_dir exists
staging_dir = self.compile_context.staging_output_dir()
Expand All @@ -58,6 +62,31 @@ def compile(self) -> Dict[ConfigType, CompileResult]:

return compile_results

def _compile_team_metadata(self):
    """
    Build, write and report one MetaData object per team.

    For every team in the compile context's ``teams_dict`` a blank MetaData
    is created, populated through ``merge_team_execution_info``, serialized
    with ``serializer.thrift_simple_json`` and wrapped in a CompiledObj
    named ``<team>.<team>_team_metadata``. Each object is handed to
    ``_write_object`` and then to the compile status display. Once all
    teams are processed the MetaData class is closed on the status display.

    Nothing is returned; the value returned by ``_write_object`` is not
    inspected here.
    """
    context = self.compile_context
    status = context.compile_status
    all_teams = context.teams_dict
    metadata_cls_name = MetaData.__name__

    for team in all_teams:
        team_metadata = MetaData()
        # Merges the team's env/conf execution info into the blank metadata.
        merge_team_execution_info(team_metadata, all_teams, team)

        qualified_name = f"{team}.{team}_team_metadata"
        compiled = CompiledObj(
            name=qualified_name,
            obj=team_metadata,
            file=qualified_name,  # the generated name is reused as the file field
            errors=None,
            obj_type=metadata_cls_name,
            tjson=serializer.thrift_simple_json(team_metadata),
        )

        self._write_object(compiled)
        status.add_object_update_display(compiled, metadata_cls_name)

    # Done writing team metadata, close the class
    status.close_cls(metadata_cls_name)

def _compile_class_configs(self, config_info: ConfigInfo) -> CompileResult:

compile_result = CompileResult(
Expand Down Expand Up @@ -120,10 +149,7 @@ def _write_objects_in_folder(
return object_dict, error_dict

def _write_object(self, compiled_obj: CompiledObj) -> Optional[List[BaseException]]:

obj = compiled_obj.obj

output_path = self.compile_context.staging_output_path(obj)
output_path = self.compile_context.staging_output_path(compiled_obj)

folder = os.path.dirname(output_path)

Expand Down
16 changes: 12 additions & 4 deletions api/python/ai/chronon/cli/compile/parse_teams.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def load_teams(conf_root: str, print: bool = True) -> Dict[str, Team]:
return team_dict



def update_metadata(obj: Any, team_dict: Dict[str, Team]):

assert obj is not None, "Cannot update metadata None object"
Expand Down Expand Up @@ -121,16 +122,23 @@ def set_group_by_metadata(join_part_gb, output_namespace):
if metadata.executionInfo is None:
metadata.executionInfo = ExecutionInfo()

merge_team_execution_info(metadata, team_dict, team)

def merge_team_execution_info(metadata: MetaData, team_dict: Dict[str, Team], team_name: str):
default_team = team_dict.get(_DEFAULT_CONF_TEAM)
if not metadata.executionInfo:
metadata.executionInfo = ExecutionInfo()

metadata.executionInfo.env = _merge_mode_maps(
team_dict[_DEFAULT_CONF_TEAM].env,
team_dict[team].env,
default_team.env if default_team else {},
team_dict[team_name].env,
metadata.executionInfo.env,
env_or_config_attribute=EnvOrConfigAttribute.ENV,
)

metadata.executionInfo.conf = _merge_mode_maps(
team_dict[_DEFAULT_CONF_TEAM].conf,
team_dict[team].conf,
default_team.conf if default_team else {},
team_dict[team_name].conf,
metadata.executionInfo.conf,
env_or_config_attribute=EnvOrConfigAttribute.CONFIG,
)
Expand Down
2 changes: 1 addition & 1 deletion api/python/ai/chronon/repo/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __str__(self):
RunMode.SOURCE_JOB: OFFLINE_ARGS,
RunMode.JOIN_PART_JOB: OFFLINE_ARGS,
RunMode.MERGE_JOB: OFFLINE_ARGS,
RunMode.METASTORE: "--partition-names={partition_names}",
RunMode.METASTORE: "", # purposely left blank. we'll handle this specifically
RunMode.INFO: "",
}

Expand Down
23 changes: 12 additions & 11 deletions api/python/ai/chronon/repo/default_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, args, jar_path):
self.online_jar = args.get(ONLINE_JAR_ARG)
self.online_class = args.get(ONLINE_CLASS_ARG)

self.conf_type = args.get("conf_type", "").replace(
self.conf_type = (args.get("conf_type") or "").replace(
"-", "_"
) # in case user sets dash instead of underscore

Expand Down Expand Up @@ -54,7 +54,8 @@ def __init__(self, args, jar_path):
os.environ["CHRONON_ONLINE_JAR"] = self.online_jar
print("Downloaded jar to {}".format(self.online_jar))

if self.conf:
if (self.conf
and (self.mode != "metastore")): # TODO: don't check for metastore
try:
self.context, self.conf_type, self.team, _ = self.conf.split("/")[-4:]
except Exception as e:
Expand Down Expand Up @@ -231,23 +232,23 @@ def run(self):
def _gen_final_args(
self, start_ds=None, end_ds=None, override_conf_path=None, **kwargs
):
base_args = MODE_ARGS[self.mode].format(
base_args = MODE_ARGS.get(self.mode).format(
conf_path=override_conf_path if override_conf_path else self.conf,
ds=end_ds if end_ds else self.ds,
online_jar=self.online_jar,
online_class=self.online_class,
)

base_args = (
base_args + f" --conf-type={self.conf_type} "
if self.conf_type
else base_args
)
submitter_args = []

if self.conf_type:
submitter_args.append(f"--conf-type={self.conf_type}")

if self.mode != RunMode.FETCH:
base_args += " --local-conf-path={conf}".format(
submitter_args.append(" --local-conf-path={conf}".format(
conf=self.local_abs_conf_path
) + " --original-mode={mode}".format(mode=self.mode)
))
submitter_args.append(" --original-mode={mode}".format(mode=self.mode))

override_start_partition_arg = (
"--start-partition-override=" + start_ds if start_ds else ""
Expand All @@ -260,7 +261,7 @@ def _gen_final_args(
)

final_args = " ".join(
[base_args, str(self.args), override_start_partition_arg, additional_args]
[base_args, str(self.args), override_start_partition_arg, ' '.join(submitter_args), additional_args]
)

return final_args
10 changes: 8 additions & 2 deletions api/python/ai/chronon/repo/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ def generate_dataproc_submitter_args(

final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}"



if job_type == JobType.FLINK:
main_class = "ai.chronon.flink.FlinkJob"
flink_jar_uri = (
Expand All @@ -259,7 +261,7 @@ def generate_dataproc_submitter_args(
jar_uri=jar_uri,
job_type=job_type.value,
main_class=main_class,
) + f" --files={gcs_file_args}"
) + (f" --files={gcs_file_args}" if gcs_file_args else "")

)
else:
Expand Down Expand Up @@ -321,9 +323,13 @@ def run(self):
"--partition-names" in args
), "Must specify a list of `--partition-names=schema.table/pk1=pv1/pk2=pv2"

local_files_to_upload_to_gcs = (
[os.path.join(self.repo, self.conf)] if self.conf else []
)
dataproc_args = self.generate_dataproc_submitter_args(
# for now, self.conf is the only local file that requires uploading to gcs
user_args=args,
local_files_to_upload=local_files_to_upload_to_gcs,
user_args=self._gen_final_args(),
version=self._args["version"],
)
command = f"java -cp {self.jar_path} {DATAPROC_ENTRY} {dataproc_args}"
Expand Down
1 change: 0 additions & 1 deletion api/python/ai/chronon/repo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def set_defaults(ctx):
# NOTE: We don't want to ever call the fetch_online_jar.py script since we're working
# on our internal zipline fork of the chronon repo
# "online_jar_fetch": os.path.join(chronon_repo_path, "scripts/fetch_online_jar.py"),
"conf_type": "group_bys",
"online_args": os.environ.get("CHRONON_ONLINE_ARGS", ""),
"chronon_jar": os.environ.get("CHRONON_DRIVER_JAR"),
"list_apps": "python3 "
Expand Down
43 changes: 22 additions & 21 deletions api/python/ai/chronon/repo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,34 +179,35 @@ def set_runtime_env_v3(params, conf):
if os.path.isfile(conf_path):
with open(conf_path, "r") as infile:
conf_json = json.load(infile)
env = conf_json.get("metaData", {}).get("executionInfo", {}).get("env", {})
metadata = conf_json.get("metaData", {}) or conf_json # user may just pass metadata as the entire json
env = metadata.get("executionInfo", {}).get("env", {})
runtime_env.update(env.get(EnvOrConfigAttribute.ENV,{}).get(effective_mode,{}) or env.get("common", {}))
# Also set APP_NAME
try:
_, conf_type, team, _ = conf.split("/")[-4:]
except Exception as e:
LOG.error(
"Invalid conf path: {}, please ensure to supply the relative path to zipline/ folder".format(
if not team:
team = "default"
# context is the environment in which the job is running, which is provided from the args,
# default to be dev.
if params["env"]:
context = params["env"]
else:
context = "dev"
LOG.info(f"Context: {context} -- conf_type: {conf_type} -- team: {team}")

runtime_env["APP_NAME"] = APP_NAME_TEMPLATE.format(
mode=effective_mode,
conf_type=conf_type,
context=context,
name=conf_json["metaData"]["name"],
)
except Exception:
LOG.warn(
"Failed to set APP_NAME due to invalid conf path: {}, please ensure to supply the "
"relative path to zipline/ folder".format(
conf
)
)
raise e
if not team:
team = "default"
# context is the environment in which the job is running, which is provided from the args,
# default to be dev.
if params["env"]:
context = params["env"]
else:
context = "dev"
LOG.info(f"Context: {context} -- conf_type: {conf_type} -- team: {team}")

runtime_env["APP_NAME"] = APP_NAME_TEMPLATE.format(
mode=effective_mode,
conf_type=conf_type,
context=context,
name=conf_json["metaData"]["name"],
)
else:
if not params.get("app_name") and not os.environ.get("APP_NAME"):
# Provide basic app_name when no conf is defined.
Expand Down
8 changes: 7 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/Driver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,13 @@ object Driver {
val tableUtils = args.buildTableUtils()

val isAllPartitionsPresent = tablesToPartitionSpec.forall { case (tbl, spec) =>
tableUtils.allPartitions(tbl).contains(spec)
val result = tableUtils.containsPartitions(tbl, spec)
if (result) {
logger.info(s"Table ${tbl} has partition ${spec} present.")
} else {
logger.info(s"Table ${tbl} does not have partition ${spec} present.")
}
result
}
if (isAllPartitionsPresent) {
logger.info(s"All partitions ${partitionNames} are present.")
Expand Down
Loading
Loading