6 changes: 3 additions & 3 deletions api/py/ai/chronon/repo/run.py
@@ -186,7 +186,7 @@ def download_only_once(url, path, skip_download=False):
)
)
if local_size == remote_size:
print("Sizes match. Assuming its already downloaded.")
print("Sizes match. Assuming it's already downloaded.")
should_download = False
if should_download:
print("Different file from remote at local: " + path + ". Re-downloading..")
@@ -412,7 +412,7 @@ def __init__(self, args, jar_path):
raise e
possible_modes = list(ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES
assert (
args["mode"] in possible_modes
args["mode"] in possible_modes
), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format(
args["mode"], self.conf, self.conf_type, possible_modes
)
@@ -733,7 +733,7 @@ def generate_dataproc_submitter_args(local_files_to_upload_to_gcs: List[str], us

# include chronon jar uri. should also already be in the bucket
chronon_jar_uri = f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}" + \
"/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
"/jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"

final_args = (f"{user_args} --additional-conf-path=additional-confs.yaml --gcs_files={gcs_file_args} "
f"--chronon_jar_uri={chronon_jar_uri}")
2 changes: 1 addition & 1 deletion chronon_dataproc_submitter.env
@@ -1,4 +1,4 @@
ZIPLINE_CUSTOMER_ID=canary
ZIPLINE_GCP_PROJECT_ID=canary-443022
ZIPLINE_GCP_REGION=us-central1
-ZIPLINE_GCP_DATAPROC_CLUSTER_NAME=zipline-canary-cluster
+ZIPLINE_GCP_DATAPROC_CLUSTER_NAME=canary-2
Collaborator: why this change?

GcpFormatProvider.scala
@@ -16,8 +16,6 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.functions.to_date

-import scala.collection.JavaConverters._
-
case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider {
// Order of Precedence for Default Project
// Explicitly configured project in code (e.g., setProjectId()).
@@ -62,22 +60,22 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
.map((table) => {

if (table.getDefinition.isInstanceOf[ExternalTableDefinition]) {
-  val uris = table.getDefinition
-    .asInstanceOf[ExternalTableDefinition]
-    .getSourceUris
-    .asScala
-    .toList
-    .map((uri) => uri.stripSuffix("/*") + "/")
-
-  assert(uris.length == 1, s"External table ${tableName} can be backed by only one URI.")
-
-  val formatStr = table.getDefinition
-    .asInstanceOf[ExternalTableDefinition]
-    .getFormatOptions
-    .asInstanceOf[FormatOptions]
-    .getType
-
-  GCS(table.getTableId.getProject, uris.head, formatStr)
+  val externalTable = table.getDefinition.asInstanceOf[ExternalTableDefinition]
+  val uri = Option(externalTable.getHivePartitioningOptions)
+    .map(_.getSourceUriPrefix)
+    .getOrElse {
+      val uris = externalTable.getSourceUris
+      require(uris.size == 1, s"External table ${tableName} can be backed by only one URI.")
+      uris.get(0).replaceAll("/\\*\\.parquet$", "")
@tchow-zlai (Collaborator, Jan 13, 2025): sounds good, thanks. Would be great to unit test this but not a big deal.
+    }
+  GCS(table.getTableId.getProject, uri, formatStr)

} else if (table.getDefinition.isInstanceOf[StandardTableDefinition]) {
BQuery(table.getTableId.getProject, Map.empty)
} else throw new IllegalStateException(s"Cannot support table of type: ${table.getDefinition}")
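
Following up on the review suggestion to unit test this: below is a minimal ScalaTest sketch of the new URI-resolution behavior. It assumes ScalaTest is on the test classpath and factors the logic into a hypothetical resolveUri helper (not part of this PR) so it can run without mocking a BigQuery Table or a SparkSession; the class, helper, bucket, and table names are illustrative only.

import com.google.cloud.bigquery.{ExternalTableDefinition, Field, FormatOptions, HivePartitioningOptions, Schema, StandardSQLTypeName}
import org.scalatest.funsuite.AnyFunSuite

class GcsUriResolutionSpec extends AnyFunSuite {

  // Hypothetical extraction of the logic added in this diff: prefer the hive-partitioning
  // sourceUriPrefix, otherwise fall back to the single source URI with the parquet glob stripped.
  private def resolveUri(externalTable: ExternalTableDefinition): String =
    Option(externalTable.getHivePartitioningOptions)
      .map(_.getSourceUriPrefix)
      .getOrElse {
        val uris = externalTable.getSourceUris
        require(uris.size == 1, "External table can be backed by only one URI.")
        uris.get(0).replaceAll("/\\*\\.parquet$", "")
      }

  // Placeholder schema; the real table schema does not matter for URI resolution.
  private val schema = Schema.of(Field.of("ds", StandardSQLTypeName.STRING))

  test("prefers the hive partitioning sourceUriPrefix when present") {
    val definition = ExternalTableDefinition
      .newBuilder("gs://bucket/table/*.parquet", schema, FormatOptions.parquet())
      .setHivePartitioningOptions(
        HivePartitioningOptions.newBuilder().setSourceUriPrefix("gs://bucket/table/").build())
      .build()
    assert(resolveUri(definition) == "gs://bucket/table/")
  }

  test("falls back to the only source URI with the parquet glob stripped") {
    val definition = ExternalTableDefinition
      .newBuilder("gs://bucket/table/*.parquet", schema, FormatOptions.parquet())
      .build()
    assert(resolveUri(definition) == "gs://bucket/table")
  }
}

The second test documents the behavior change visible in the diff: when no hive partitioning options are set, the fallback now strips a trailing /*.parquet glob via replaceAll, instead of the old stripSuffix("/*") + "/" handling.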