Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f941a6f
Connect run.py to DataprocSubmitter.scala so that offline jobs can be…
david-zlai Jan 8, 2025
a6b154b
put this back.
david-zlai Jan 8, 2025
e3b0c48
fix lint stuff.
david-zlai Jan 8, 2025
7472f45
add some more notes on why we only take the filename.
david-zlai Jan 8, 2025
61eba99
rename
david-zlai Jan 8, 2025
a8525bc
fix python linting.
david-zlai Jan 8, 2025
67d2159
Adding google-cloud-storage to dev.in and running pip-compile-multi …
david-zlai Jan 8, 2025
49b60a7
pin zipp
david-zlai Jan 9, 2025
ad3207c
try removing certifi
david-zlai Jan 9, 2025
f9ef45f
upgrade zipp to 3.21.0 even higher.
david-zlai Jan 9, 2025
957afad
remove zipp.
david-zlai Jan 9, 2025
8d0257a
try again with snyk
david-zlai Jan 9, 2025
0096de2
pin also in base.
david-zlai Jan 9, 2025
d7c82c3
downgrade google cloud core
david-zlai Jan 9, 2025
f3da7d7
downgrade google-cloud-core.
david-zlai Jan 9, 2025
f96d062
one more attempt
david-zlai Jan 9, 2025
3d1d6ff
PR comments.
david-zlai Jan 9, 2025
fc40e7d
more changes.
david-zlai Jan 9, 2025
01ba185
cleanup
david-zlai Jan 9, 2025
96a748a
fix delete
david-zlai Jan 9, 2025
263899c
format
david-zlai Jan 9, 2025
fe13f57
more PR comments.
david-zlai Jan 9, 2025
e5282e0
fix deps
david-zlai Jan 9, 2025
a1463cd
python formatting
david-zlai Jan 9, 2025
047c21e
actually fix formatting
david-zlai Jan 9, 2025
903c23f
fix project id
david-zlai Jan 9, 2025
431b7e1
constant around dataproc entry
david-zlai Jan 9, 2025
20141ea
download dataproc jar once only
david-zlai Jan 9, 2025
14ec11b
remove print
david-zlai Jan 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 272 additions & 114 deletions api/py/ai/chronon/repo/run.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion api/py/requirements/base.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
click
thrift==0.13.0
thrift==0.21.0
8 changes: 4 additions & 4 deletions api/py/requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# SHA1:1d44bb5a0f927ef885e838e299990ba7ecd68dda
# SHA1:e6acf05ccde0547fea15f1185172abe12ea24af2
#
# This file is autogenerated by pip-compile-multi
# To update, run:
#
# pip-compile-multi
#
click==8.1.7
click==8.1.8
# via -r requirements/base.in
six==1.16.0
six==1.17.0
# via thrift
thrift==0.20.0
thrift==0.21.0
# via -r requirements/base.in
4 changes: 4 additions & 0 deletions api/py/requirements/dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ black
pre-commit
isort
autoflake
zipp==3.19.1
importlib-metadata==8.4.0
google-cloud-storage==2.19.0

88 changes: 71 additions & 17 deletions api/py/requirements/dev.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SHA1:4a4dda2421311c0c074c847c55e8d962d8c2e7cf
# SHA1:06b28bca353cb7f65a2a3712ef7c6deaeb35f0bb
#
# This file is autogenerated by pip-compile-multi
# To update, run:
Expand All @@ -8,26 +8,55 @@
-r base.txt
autoflake==2.3.1
# via -r requirements/dev.in
black==24.4.2
black==24.10.0
# via -r requirements/dev.in
cachetools==5.3.3
# via tox
cachetools==5.5.0
# via
# google-auth
# tox
cfgv==3.4.0
# via pre-commit
chardet==5.2.0
# via tox
charset-normalizer==3.4.1
# via requests
colorama==0.4.6
# via tox
coverage[toml]==7.5.4
coverage[toml]==7.6.10
# via pytest-cov
distlib==0.3.8
distlib==0.3.9
# via virtualenv
filelock==3.15.4
filelock==3.16.1
# via
# tox
# virtualenv
identify==2.5.36
google-api-core==2.24.0
# via
# google-cloud-core
# google-cloud-storage
google-auth==2.37.0
# via
# google-api-core
# google-cloud-core
# google-cloud-storage
google-cloud-core==2.4.1
# via google-cloud-storage
google-cloud-storage==2.19.0
# via -r requirements/dev.in
google-crc32c==1.6.0
# via
# google-cloud-storage
# google-resumable-media
google-resumable-media==2.7.2
# via google-cloud-storage
googleapis-common-protos==1.66.0
# via google-api-core
identify==2.6.5
# via pre-commit
idna==3.10
# via requests
importlib-metadata==8.4.0
# via -r requirements/dev.in
iniconfig==2.0.0
# via pytest
isort==5.13.2
Expand All @@ -36,15 +65,15 @@ mypy-extensions==1.0.0
# via black
nodeenv==1.9.1
# via pre-commit
packaging==24.1
packaging==24.2
# via
# black
# pyproject-api
# pytest
# tox
pathspec==0.12.1
# via black
platformdirs==4.2.2
platformdirs==4.3.6
# via
# black
# tox
Expand All @@ -53,21 +82,46 @@ pluggy==1.5.0
# via
# pytest
# tox
pre-commit==3.7.1
pre-commit==4.0.1
# via -r requirements/dev.in
proto-plus==1.25.0
# via google-api-core
protobuf==5.29.3
# via
# google-api-core
# googleapis-common-protos
# proto-plus
pyasn1==0.6.1
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.4.1
# via google-auth
pyflakes==3.2.0
# via autoflake
pyproject-api==1.7.1
pyproject-api==1.8.0
# via tox
pytest==8.2.2
pytest==8.3.4
# via pytest-cov
pytest-cov==5.0.0
pytest-cov==6.0.0
# via -r requirements/dev.in
pyyaml==6.0.1
pyyaml==6.0.2
# via pre-commit
tox==4.16.0
requests==2.32.3
# via
# google-api-core
# google-cloud-storage
rsa==4.9
# via google-auth
tox==4.23.2
# via -r requirements/dev.in
virtualenv==20.26.3
urllib3==2.3.0
# via requests
virtualenv==20.28.1
# via
# pre-commit
# tox
zipp==3.19.1
# via
# -r requirements/dev.in
# importlib-metadata
19 changes: 18 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,18 @@ inThisBuild(
lazy val supportedVersions = List(scala_2_12) // List(scala211, scala212, scala213)

lazy val root = (project in file("."))
.aggregate(api, aggregator, online, spark, flink, cloud_gcp, cloud_aws, service_commons, service, hub, orchestration)
.aggregate(api,
aggregator,
online,
spark,
flink,
cloud_gcp,
cloud_gcp_submitter,
cloud_aws,
service_commons,
service,
hub,
orchestration)
.settings(name := "chronon")

val spark_sql = Seq(
Expand Down Expand Up @@ -233,6 +244,12 @@ lazy val cloud_gcp = project
dependencyOverrides += "ch.qos.reload4j" % "reload4j" % "1.2.25"
)

// sbt project that depends on cloud_gcp and exists to make DataprocSubmitter runnable via `run`.
lazy val cloud_gcp_submitter = project
.dependsOn(cloud_gcp)
.settings(
// Main class picked up by the `run` task in the Compile configuration.
mainClass in (Compile, run) := Some("ai.chronon.integrations.cloud_gcp.DataprocSubmitter")
)

lazy val cloud_aws = project
.dependsOn(api.%("compile->compile;test->test"), online)
.settings(
Expand Down
4 changes: 4 additions & 0 deletions chronon_dataproc_submitter.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ZIPLINE_CUSTOMER_ID=canary
ZIPLINE_GCP_PROJECT_ID=canary-443022
ZIPLINE_GCP_REGION=us-central1
ZIPLINE_GCP_DATAPROC_CLUSTER_NAME=zipline-canary-cluster
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ object DataprocSubmitter {
new DataprocSubmitter(jobControllerClient, conf)
}

/** Builds a [[DataprocSubmitter]] from an explicitly supplied configuration.
  *
  * A `JobControllerClient` is created with its endpoint set to `conf.endPoint`,
  * and both the client and `conf` are handed to the submitter.
  *
  * @param conf submitter configuration; only `endPoint` is read here
  * @return a submitter wired to the newly created client
  */
def apply(conf: SubmitterConf): DataprocSubmitter = {
  val settings = JobControllerSettings.newBuilder().setEndpoint(conf.endPoint).build()
  val client = JobControllerClient.create(settings)
  new DataprocSubmitter(client, conf)
}

private[cloud_gcp] def loadConfig: SubmitterConf = {
val inputStreamOption = Option(getClass.getClassLoader.getResourceAsStream("dataproc-submitter-conf.yaml"))
val yamlLoader = new Yaml()
Expand All @@ -105,6 +112,48 @@ object DataprocSubmitter {
.getOrElse(throw new IllegalArgumentException("Yaml conf not found or invalid yaml"))

}
/** Command-line entry point: submits a job through [[DataprocSubmitter]].
  *
  * Recognised flags (both required, in `--flag=value` form):
  *  - `--chronon_jar_uri=<uri>`: passed into the `SubmitterConf`
  *  - `--gcs_files=<a,b,c>`: comma-separated list passed to `submit`
  *
  * Every other argument is forwarded untouched to `submit`.
  *
  * @param args raw command-line arguments
  * @throws IllegalArgumentException if a required flag is absent or has no value
  */
def main(args: Array[String]): Unit = {
  // Returns the text after the first '=' of the first argument that starts with `flag`.
  // Splitting with a limit of 2 keeps values that themselves contain '=' intact.
  def requiredArgValue(flag: String): String = {
    val rawArg = args
      .find(_.startsWith(flag))
      .getOrElse(throw new IllegalArgumentException(s"Missing required argument: $flag"))
    rawArg.split("=", 2) match {
      case Array(_, value) if value.nonEmpty => value
      case _ =>
        throw new IllegalArgumentException(s"Invalid format for $flag, expected $flag=<value>")
    }
  }

  val chrononJarUri = requiredArgValue("--chronon_jar_uri")

  // search args array for prefix `--gcs_files`
  val gcsFiles = requiredArgValue("--gcs_files").split(",")

Comment on lines +116 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Validate command-line arguments to prevent exceptions

Check for missing or malformed arguments to avoid crashes.

Comment on lines +115 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add argument validation

Add bounds and format checking to prevent runtime exceptions.

-    val chrononJarUri = args.filter(_.startsWith("--chronon_jar_uri"))(0).split("=")(1)
+    val chrononJarUriArg = args.find(_.startsWith("--chronon_jar_uri"))
+      .getOrElse(throw new IllegalArgumentException("Missing required argument: --chronon_jar_uri"))
+    val chrononJarUri = chrononJarUriArg.split("=").lift(1)
+      .getOrElse(throw new IllegalArgumentException("Invalid format for --chronon_jar_uri"))

-    val gcsFiles = args
-      .filter(_.startsWith("--gcs_files"))(0)
-      .split("=")(1)
-      .split(",")
+    val gcsFilesArg = args.find(_.startsWith("--gcs_files"))
+      .getOrElse(throw new IllegalArgumentException("Missing required argument: --gcs_files"))
+    val gcsFiles = gcsFilesArg.split("=").lift(1)
+      .getOrElse(throw new IllegalArgumentException("Invalid format for --gcs_files"))
+      .split(",")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def main(args: Array[String]): Unit = {
val chrononJarUri = args.filter(_.startsWith("--chronon_jar_uri"))(0).split("=")(1)
// search args array for prefix `--gcs_files`
val gcsFiles = args
.filter(_.startsWith("--gcs_files"))(0)
.split("=")(1)
.split(",")
def main(args: Array[String]): Unit = {
val chrononJarUriArg = args.find(_.startsWith("--chronon_jar_uri"))
.getOrElse(throw new IllegalArgumentException("Missing required argument: --chronon_jar_uri"))
val chrononJarUri = chrononJarUriArg.split("=").lift(1)
.getOrElse(throw new IllegalArgumentException("Invalid format for --chronon_jar_uri"))
val gcsFilesArg = args.find(_.startsWith("--gcs_files"))
.getOrElse(throw new IllegalArgumentException("Missing required argument: --gcs_files"))
val gcsFiles = gcsFilesArg.split("=").lift(1)
.getOrElse(throw new IllegalArgumentException("Invalid format for --gcs_files"))
.split(",")

// Everything that is not one of the submitter's own flags is forwarded to the job.
val userArgs = args.filterNot(arg => arg.startsWith("--gcs_files") || arg.startsWith("--chronon_jar_uri"))

// Validate the environment once, up front, so every missing variable is reported together.
val env = sys.env
val requiredVars = List(
  "ZIPLINE_GCP_PROJECT_ID",
  "ZIPLINE_GCP_REGION",
  "ZIPLINE_GCP_DATAPROC_CLUSTER_NAME"
)
val missingVars = requiredVars.filterNot(env.contains)
if (missingVars.nonEmpty) {
  throw new IllegalStateException(s"Missing required environment variables: ${missingVars.mkString(", ")}")
}

// Safe direct lookups: presence of all three keys was checked above.
val projectId = env("ZIPLINE_GCP_PROJECT_ID")
val region = env("ZIPLINE_GCP_REGION")
val clusterName = env("ZIPLINE_GCP_DATAPROC_CLUSTER_NAME")

val submitterConf = SubmitterConf(
  projectId,
  region,
  clusterName,
  chrononJarUri,
  "ai.chronon.spark.Driver"
)

val submitter = DataprocSubmitter(submitterConf)

val jobId = submitter.submit(
  gcsFiles.toList,
  userArgs: _*
)
println(s"Dataproc submitter job id: $jobId")
}
}

object DataprocAuth extends JobAuth {}
Loading