diff --git a/api/python/ai/chronon/cli/compile/compile_context.py b/api/python/ai/chronon/cli/compile/compile_context.py index 14d2d4a641..45f1da86db 100644 --- a/api/python/ai/chronon/cli/compile/compile_context.py +++ b/api/python/ai/chronon/cli/compile/compile_context.py @@ -1,10 +1,10 @@ import os from dataclasses import dataclass -from typing import Any, Dict, List, Type +from typing import Any, Dict, List, Optional, Type import ai.chronon.cli.compile.parse_teams as teams from ai.chronon.api.common.ttypes import ConfigType -from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery, Team +from ai.chronon.api.ttypes import GroupBy, Join, MetaData, Model, StagingQuery, Team from ai.chronon.cli.compile.conf_validator import ConfValidator from ai.chronon.cli.compile.display.compile_status import CompileStatus from ai.chronon.cli.compile.display.compiled_obj import CompiledObj @@ -18,7 +18,7 @@ class ConfigInfo: folder_name: str cls: Type - config_type: ConfigType + config_type: Optional[ConfigType] @dataclass @@ -42,6 +42,7 @@ def __init__(self): config_type=ConfigType.STAGING_QUERY, ), ConfigInfo(folder_name="models", cls=Model, config_type=ConfigType.MODEL), + ConfigInfo(folder_name="teams_metadata", cls=MetaData, config_type=None), # only for team metadata ] self.compile_status = CompileStatus(use_live=False) @@ -51,6 +52,8 @@ def __init__(self): cls = config_info.cls self.existing_confs[cls] = self._parse_existing_confs(cls) + + self.validator: ConfValidator = ConfValidator( input_root=self.chronon_root, output_root=self.compile_dir, @@ -94,15 +97,15 @@ def output_dir(self, cls: type = None) -> str: self.chronon_root, self.compile_dir, config_info.folder_name ) - def staging_output_path(self, obj: Any): + def staging_output_path(self, compiled_obj: CompiledObj): """ - eg., input: group_by with name search.clicks.features.v1 - eg., output: root/compiled_staging/group_bys/search/clicks.features.v1 """ - output_dir = self.staging_output_dir(obj.__class__) # compiled/joins + output_dir = self.staging_output_dir(compiled_obj.obj.__class__) # compiled/joins - team, rest = obj.metaData.name.split(".", 1) # search, clicks.features.v1 + team, rest = compiled_obj.name.split(".", 1) # search, clicks.features.v1 return os.path.join( output_dir, @@ -139,22 +142,30 @@ def _parse_existing_confs(self, obj_class: type) -> Dict[str, object]: try: obj = file2thrift(full_path, obj_class) - if obj and hasattr(obj, "metaData"): - result[obj.metaData.name] = obj - - compiled_obj = CompiledObj( - name=obj.metaData.name, - obj=obj, - file=obj.metaData.sourceFile, - errors=None, - obj_type=obj_class.__name__, - tjson=open(full_path).read(), - ) - - self.compile_status.add_existing_object_update_display( - compiled_obj - ) - + if obj: + if hasattr(obj, "metaData"): + result[obj.metaData.name] = obj + compiled_obj = CompiledObj( + name=obj.metaData.name, + obj=obj, + file=obj.metaData.sourceFile, + errors=None, + obj_type=obj_class.__name__, + tjson=open(full_path).read(), + ) + self.compile_status.add_existing_object_update_display(compiled_obj) + elif isinstance(obj, MetaData): + team_metadata_name = '.'.join(full_path.split('/')[-2:]) # use the name of the file as team metadata won't have name + result[team_metadata_name] = obj + compiled_obj = CompiledObj( + name=team_metadata_name, + obj=obj, + file=obj.sourceFile, + errors=None, + obj_type=obj_class.__name__, + tjson=open(full_path).read(), + ) + self.compile_status.add_existing_object_update_display(compiled_obj) else: logger.errors( f"Parsed object from {full_path} has no metaData attribute" diff --git a/api/python/ai/chronon/cli/compile/compiler.py b/api/python/ai/chronon/cli/compile/compiler.py index 8525efc321..de00ea54ba 100644 --- a/api/python/ai/chronon/cli/compile/compiler.py +++ b/api/python/ai/chronon/cli/compile/compiler.py @@ -8,9 +8,12 @@ import ai.chronon.cli.compile.parse_configs as parser import ai.chronon.cli.logger as logger from ai.chronon.api.common.ttypes import ConfigType +from ai.chronon.cli.compile import serializer from ai.chronon.cli.compile.compile_context import CompileContext, ConfigInfo from ai.chronon.cli.compile.display.compiled_obj import CompiledObj from ai.chronon.cli.compile.display.console import console +from ai.chronon.cli.compile.parse_teams import merge_team_execution_info +from ai.chronon.types import MetaData logger = logger.get_logger() @@ -37,6 +40,7 @@ def compile(self) -> Dict[ConfigType, CompileResult]: configs = self._compile_class_configs(config_info) compile_results[config_info.config_type] = configs + self._compile_team_metadata() # check if staging_output_dir exists staging_dir = self.compile_context.staging_output_dir() @@ -58,6 +62,31 @@ def compile(self) -> Dict[ConfigType, CompileResult]: return compile_results + def _compile_team_metadata(self): + """ + Compile the team metadata and return the compiled object. + """ + teams_dict = self.compile_context.teams_dict + for team in teams_dict: + m = MetaData() + merge_team_execution_info(m, teams_dict, team) + + tjson = serializer.thrift_simple_json(m) + name = f"{team}.{team}_team_metadata" + result = CompiledObj( + name=name, + obj=m, + file=name, + errors=None, + obj_type=MetaData.__name__, + tjson=tjson, + ) + self._write_object(result) + self.compile_context.compile_status.add_object_update_display(result, MetaData.__name__) + + # Done writing team metadata, close the class + self.compile_context.compile_status.close_cls(MetaData.__name__) + def _compile_class_configs(self, config_info: ConfigInfo) -> CompileResult: compile_result = CompileResult( @@ -120,10 +149,7 @@ def _write_objects_in_folder( return object_dict, error_dict def _write_object(self, compiled_obj: CompiledObj) -> Optional[List[BaseException]]: - - obj = compiled_obj.obj - - output_path = self.compile_context.staging_output_path(obj) + output_path = self.compile_context.staging_output_path(compiled_obj) folder = os.path.dirname(output_path) diff --git a/api/python/ai/chronon/cli/compile/parse_teams.py b/api/python/ai/chronon/cli/compile/parse_teams.py index f8582305ac..ecc862a7dd 100644 --- a/api/python/ai/chronon/cli/compile/parse_teams.py +++ b/api/python/ai/chronon/cli/compile/parse_teams.py @@ -69,6 +69,7 @@ def load_teams(conf_root: str, print: bool = True) -> Dict[str, Team]: return team_dict + def update_metadata(obj: Any, team_dict: Dict[str, Team]): assert obj is not None, "Cannot update metadata None object" @@ -121,16 +122,23 @@ def set_group_by_metadata(join_part_gb, output_namespace): if metadata.executionInfo is None: metadata.executionInfo = ExecutionInfo() + merge_team_execution_info(metadata, team_dict, team) + +def merge_team_execution_info(metadata: MetaData, team_dict: Dict[str, Team], team_name: str): + default_team = team_dict.get(_DEFAULT_CONF_TEAM) + if not metadata.executionInfo: + metadata.executionInfo = ExecutionInfo() + metadata.executionInfo.env = _merge_mode_maps( - team_dict[_DEFAULT_CONF_TEAM].env, - team_dict[team].env, + default_team.env if default_team else {}, + team_dict[team_name].env, metadata.executionInfo.env, env_or_config_attribute=EnvOrConfigAttribute.ENV, ) metadata.executionInfo.conf = _merge_mode_maps( - team_dict[_DEFAULT_CONF_TEAM].conf, - team_dict[team].conf, + default_team.conf if default_team else {}, + team_dict[team_name].conf, metadata.executionInfo.conf, env_or_config_attribute=EnvOrConfigAttribute.CONFIG, ) diff --git a/api/python/ai/chronon/repo/constants.py b/api/python/ai/chronon/repo/constants.py index 16d5ee2745..5f66add345 100644 --- a/api/python/ai/chronon/repo/constants.py +++ b/api/python/ai/chronon/repo/constants.py @@ -99,7 +99,7 @@ def __str__(self): RunMode.SOURCE_JOB: OFFLINE_ARGS, RunMode.JOIN_PART_JOB: OFFLINE_ARGS, RunMode.MERGE_JOB: OFFLINE_ARGS, - RunMode.METASTORE: "--partition-names={partition_names}", + RunMode.METASTORE: "", # purposely left blank. we'll handle this specifically RunMode.INFO: "", } diff --git a/api/python/ai/chronon/repo/default_runner.py b/api/python/ai/chronon/repo/default_runner.py index c3a2873b29..15a2bbd0f4 100644 --- a/api/python/ai/chronon/repo/default_runner.py +++ b/api/python/ai/chronon/repo/default_runner.py @@ -26,7 +26,7 @@ def __init__(self, args, jar_path): self.online_jar = args.get(ONLINE_JAR_ARG) self.online_class = args.get(ONLINE_CLASS_ARG) - self.conf_type = args.get("conf_type", "").replace( + self.conf_type = (args.get("conf_type") or "").replace( "-", "_" ) # in case user sets dash instead of underscore @@ -54,7 +54,8 @@ def __init__(self, args, jar_path): os.environ["CHRONON_ONLINE_JAR"] = self.online_jar print("Downloaded jar to {}".format(self.online_jar)) - if self.conf: + if (self.conf + and (self.mode != "metastore")): # TODO: don't check for metastore try: self.context, self.conf_type, self.team, _ = self.conf.split("/")[-4:] except Exception as e: @@ -231,23 +232,23 @@ def run(self): def _gen_final_args( self, start_ds=None, end_ds=None, override_conf_path=None, **kwargs ): - base_args = MODE_ARGS[self.mode].format( + base_args = MODE_ARGS.get(self.mode).format( conf_path=override_conf_path if override_conf_path else self.conf, ds=end_ds if end_ds else self.ds, online_jar=self.online_jar, online_class=self.online_class, ) - base_args = ( - base_args + f" --conf-type={self.conf_type} " - if self.conf_type - else base_args - ) + submitter_args = [] + + if self.conf_type: + submitter_args.append(f"--conf-type={self.conf_type}") if self.mode != RunMode.FETCH: - base_args += " --local-conf-path={conf}".format( + submitter_args.append(" --local-conf-path={conf}".format( conf=self.local_abs_conf_path - ) + " --original-mode={mode}".format(mode=self.mode) + )) + submitter_args.append(" --original-mode={mode}".format(mode=self.mode)) override_start_partition_arg = ( "--start-partition-override=" + start_ds if start_ds else "" @@ -260,7 +261,7 @@ def _gen_final_args( ) final_args = " ".join( - [base_args, str(self.args), override_start_partition_arg, additional_args] + [base_args, str(self.args), override_start_partition_arg, ' '.join(submitter_args), additional_args] ) return final_args diff --git a/api/python/ai/chronon/repo/gcp.py b/api/python/ai/chronon/repo/gcp.py index 20d3b201d9..08eba08131 100644 --- a/api/python/ai/chronon/repo/gcp.py +++ b/api/python/ai/chronon/repo/gcp.py @@ -235,6 +235,8 @@ def generate_dataproc_submitter_args( final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}" + + if job_type == JobType.FLINK: main_class = "ai.chronon.flink.FlinkJob" flink_jar_uri = ( @@ -259,7 +261,7 @@ def generate_dataproc_submitter_args( jar_uri=jar_uri, job_type=job_type.value, main_class=main_class, - ) + f" --files={gcs_file_args}" + ) + (f" --files={gcs_file_args}" if gcs_file_args else "") ) else: @@ -321,9 +323,13 @@ def run(self): "--partition-names" in args ), "Must specify a list of `--partition-names=schema.table/pk1=pv1/pk2=pv2" + local_files_to_upload_to_gcs = ( + [os.path.join(self.repo, self.conf)] if self.conf else [] + ) dataproc_args = self.generate_dataproc_submitter_args( # for now, self.conf is the only local file that requires uploading to gcs - user_args=args, + local_files_to_upload=local_files_to_upload_to_gcs, + user_args=self._gen_final_args(), version=self._args["version"], ) command = f"java -cp {self.jar_path} {DATAPROC_ENTRY} {dataproc_args}" diff --git a/api/python/ai/chronon/repo/run.py b/api/python/ai/chronon/repo/run.py index ffb05f7bc0..9683eb044b 100755 --- a/api/python/ai/chronon/repo/run.py +++ b/api/python/ai/chronon/repo/run.py @@ -72,7 +72,6 @@ def set_defaults(ctx): # NOTE: We don't want to ever call the fetch_online_jar.py script since we're working # on our internal zipline fork of the chronon repo # "online_jar_fetch": os.path.join(chronon_repo_path, "scripts/fetch_online_jar.py"), - "conf_type": "group_bys", "online_args": os.environ.get("CHRONON_ONLINE_ARGS", ""), "chronon_jar": os.environ.get("CHRONON_DRIVER_JAR"), "list_apps": "python3 " diff --git a/api/python/ai/chronon/repo/utils.py b/api/python/ai/chronon/repo/utils.py index b10217cb9c..2b01daaf8a 100644 --- a/api/python/ai/chronon/repo/utils.py +++ b/api/python/ai/chronon/repo/utils.py @@ -179,34 +179,35 @@ def set_runtime_env_v3(params, conf): if os.path.isfile(conf_path): with open(conf_path, "r") as infile: conf_json = json.load(infile) - env = conf_json.get("metaData", {}).get("executionInfo", {}).get("env", {}) + metadata = conf_json.get("metaData", {}) or conf_json # user may just pass metadata as the entire json + env = metadata.get("executionInfo", {}).get("env", {}) runtime_env.update(env.get(EnvOrConfigAttribute.ENV,{}).get(effective_mode,{}) or env.get("common", {})) # Also set APP_NAME try: _, conf_type, team, _ = conf.split("/")[-4:] - except Exception as e: - LOG.error( - "Invalid conf path: {}, please ensure to supply the relative path to zipline/ folder".format( + if not team: + team = "default" + # context is the environment in which the job is running, which is provided from the args, + # default to be dev. + if params["env"]: + context = params["env"] + else: + context = "dev" + LOG.info(f"Context: {context} -- conf_type: {conf_type} -- team: {team}") + + runtime_env["APP_NAME"] = APP_NAME_TEMPLATE.format( + mode=effective_mode, + conf_type=conf_type, + context=context, + name=conf_json["metaData"]["name"], + ) + except Exception: + LOG.warn( + "Failed to set APP_NAME due to invalid conf path: {}, please ensure to supply the " + "relative path to zipline/ folder".format( conf ) ) - raise e - if not team: - team = "default" - # context is the environment in which the job is running, which is provided from the args, - # default to be dev. - if params["env"]: - context = params["env"] - else: - context = "dev" - LOG.info(f"Context: {context} -- conf_type: {conf_type} -- team: {team}") - - runtime_env["APP_NAME"] = APP_NAME_TEMPLATE.format( - mode=effective_mode, - conf_type=conf_type, - context=context, - name=conf_json["metaData"]["name"], - ) else: if not params.get("app_name") and not os.environ.get("APP_NAME"): # Provide basic app_name when no conf is defined. diff --git a/spark/src/main/scala/ai/chronon/spark/Driver.scala b/spark/src/main/scala/ai/chronon/spark/Driver.scala index 40d7f65c20..60d489c4ab 100644 --- a/spark/src/main/scala/ai/chronon/spark/Driver.scala +++ b/spark/src/main/scala/ai/chronon/spark/Driver.scala @@ -1013,7 +1013,13 @@ object Driver { val tableUtils = args.buildTableUtils() val isAllPartitionsPresent = tablesToPartitionSpec.forall { case (tbl, spec) => - tableUtils.allPartitions(tbl).contains(spec) + val result = tableUtils.containsPartitions(tbl, spec) + if (result) { + logger.info(s"Table ${tbl} has partition ${spec} present.") + } else { + logger.info(s"Table ${tbl} does not have partition ${spec} present.") + } + result } if (isAllPartitionsPresent) { logger.info(s"All partitions ${partitionNames} are present.") diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index d2b611302b..c5c5fde888 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -22,7 +22,7 @@ import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.{Constants, PartitionRange, PartitionSpec, Query, QueryUtils, TsUtils} import ai.chronon.spark.Extensions._ import ai.chronon.spark.format.CreationUtils.alterTablePropertiesSql -import ai.chronon.spark.format.{CreationUtils, FormatProvider} +import ai.chronon.spark.format.{CreationUtils, FormatProvider, Iceberg} import org.apache.hadoop.hive.metastore.api.AlreadyExistsException import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException @@ -134,6 +134,24 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable } } + def containsPartitions(tableName: String, partitionSpec: Map[String, String]): Boolean = { + if (!tableReachable(tableName)) return false + + val format = tableFormatProvider + .readFormat(tableName) + .getOrElse( + throw new IllegalStateException( + s"Could not determine read format of table ${tableName}. It is no longer reachable.")) + + format match { + case Iceberg => { + partitionSpec.values.toSet.subsetOf(this.partitions(tableName).toSet) + } + case _ => this.allPartitions(tableName).contains(partitionSpec) + } + + } + // return all specified partition columns in a table in format of Map[partitionName, PartitionValue] def allPartitions(tableName: String, partitionColumnsFilter: List[String] = List.empty): List[Map[String, String]] = { diff --git a/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala b/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala index 1b49a7a191..de8e6927a2 100644 --- a/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala +++ b/spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala @@ -39,16 +39,28 @@ object JobSubmitter { ThriftJsonCodec.fromJsonFile[T](confPath, check = true) def getModeConfigProperties(args: Array[String]): Option[Map[String, String]] = { + println(s"args: ${args.mkString(",")}") val localConfPathValue = getArgValue(args, LocalConfPathArgKeyword) val confTypeValue = getArgValue(args, ConfTypeArgKeyword) - val modeConfigProperties = if (localConfPathValue.isDefined && confTypeValue.isDefined) { - val metadata = confTypeValue.get match { - case "joins" => parseConf[api.Join](localConfPathValue.get).metaData - case "group_bys" => parseConf[api.GroupBy](localConfPathValue.get).metaData - case "staging_queries" => parseConf[api.StagingQuery](localConfPathValue.get).metaData - case "models" => parseConf[api.Model](localConfPathValue.get).metaData - case _ => throw new Exception("Invalid conf type") + val modeConfigProperties = if (localConfPathValue.isDefined) { + val originalMode = getArgValue(args, OriginalModeArgKeyword) + val metadata = if (confTypeValue.isDefined) { + confTypeValue.get match { + case "joins" => parseConf[api.Join](localConfPathValue.get).metaData + case "group_bys" => parseConf[api.GroupBy](localConfPathValue.get).metaData + case "staging_queries" => parseConf[api.StagingQuery](localConfPathValue.get).metaData + case "models" => parseConf[api.Model](localConfPathValue.get).metaData + case _ => + throw new IllegalArgumentException( + s"Unable to retrieve object metadata due to invalid confType $confTypeValue" + ) + } + } else if (originalMode.isDefined && originalMode.get == "metastore") { + // attempt to parse as a generic MetaData object + parseConf[api.MetaData](localConfPathValue.get) + } else { + throw new IllegalArgumentException("Unable to retrieve object metadata") } val executionInfo = Option(metadata.getExecutionInfo) @@ -73,6 +85,8 @@ object JobSubmitter { } } else None + println(s"Setting job properties: $modeConfigProperties") + modeConfigProperties } }