Merged
Changes from 16 commits
Commits (69)
1367dae
Add transient clusters to dataproc submitter
chewy-zlai May 19, 2025
412852b
Remove wrong import
chewy-zlai May 19, 2025
68b6510
fix import
chewy-zlai May 19, 2025
59e848c
Add GCP to dataproc env variables
chewy-zlai May 19, 2025
78ea196
Use transient clusters for canary tests
chewy-zlai May 19, 2025
7722fc3
Make tags and initialization actions optional
chewy-zlai May 19, 2025
dd0273f
Remove opsagent initialization action
chewy-zlai May 19, 2025
3cb5080
Test with 2 workers
chewy-zlai May 19, 2025
0e867ed
Add artifact_prefix to metadata and switch to n2 as default machine type
chewy-zlai May 19, 2025
e243b9b
Fix metadata value
chewy-zlai May 19, 2025
a21daf3
Only create cluster if GCP_CREATE_CLUSTER is true
chewy-zlai May 19, 2025
9e26a67
Only create cluster for backfill jobs
chewy-zlai May 19, 2025
c234f00
Switch gating flag to be GCP_CREATE_DATAPROC=true
chewy-zlai May 19, 2025
68aadd3
Add GCP_DATAPROC_CLUSTER as default for canary tests other than backfill
chewy-zlai May 19, 2025
4bac270
Create cluster for upload as well
chewy-zlai May 19, 2025
d315bc1
Reorder env variables for clarity
chewy-zlai May 19, 2025
3e76796
Throw error if num_workers is not an int. Also scalafmt
chewy-zlai May 19, 2025
2b2b9c9
Add better error handling for dataproc creation
chewy-zlai May 19, 2025
4e03b27
Make master host type configurable
chewy-zlai May 19, 2025
9ec8107
Undo change to cluster name order
chewy-zlai May 19, 2025
4188b03
Merge branch 'main' of https://github.com/zipline-ai/chronon into che…
chewy-zlai May 19, 2025
96ff2ee
Make subnetwork configurable
chewy-zlai May 19, 2025
6b7bb66
Update timeout for cluster creation to 10 minutes, and use smaller ma…
chewy-zlai May 20, 2025
135d62e
Use transient clusters for uploads instead of backfills
chewy-zlai May 20, 2025
6936697
Set timeout for cluster creation to 15 minutes
chewy-zlai May 20, 2025
9287f58
Merge branch 'main' of https://github.com/zipline-ai/chronon into che…
chewy-zlai May 20, 2025
43d6ad0
scalafmt
chewy-zlai May 20, 2025
61cb324
Add option of setting cluster config by file
chewy-zlai May 20, 2025
6e15c9e
Parse json from config file
chewy-zlai May 21, 2025
cb82b54
Merge branch 'main' of https://github.com/zipline-ai/chronon into che…
chewy-zlai May 21, 2025
81e3d38
Merge branch 'main' of https://github.com/zipline-ai/chronon into che…
chewy-zlai May 21, 2025
2c1cfa6
Add error handling for failure to parse cluster config
chewy-zlai May 22, 2025
f4c5417
Create cluster config section of teams.py
chewy-zlai May 23, 2025
b1a1950
Merge branch 'main' of https://github.com/zipline-ai/chronon into che…
chewy-zlai May 23, 2025
b06692b
Plumb cluster config into dataproc submitter
chewy-zlai May 23, 2025
7d978b1
Merge branch 'main' of https://github.com/zipline-ai/chronon into che…
chewy-zlai May 23, 2025
08fdef4
Use types.py to import ClusterConfigProperties
chewy-zlai May 23, 2025
82018fe
fix cluster conf extraction in job submitter
chewy-zlai May 23, 2025
b07d668
fix JobSubmitter errors
chewy-zlai May 23, 2025
c8fbf2f
Make ClusterConfigProperties a map of string to string for consistency
chewy-zlai May 23, 2025
337181a
Null out cluster name for upload jobs. Use dataproc.config as map key…
chewy-zlai May 23, 2025
20d8241
Use seconds instead of hours for idle ttl
chewy-zlai May 23, 2025
5973e19
drop network from config and only include subnetwork
chewy-zlai May 23, 2025
3e8a28f
Shorten transient cluster name
chewy-zlai May 23, 2025
97e9492
Handle artifact_prefix being given with trailing /
chewy-zlai May 23, 2025
1fba8ed
python lint
chewy-zlai May 23, 2025
71d37fe
sort imports
chewy-zlai May 23, 2025
5b3c55a
sort imports
chewy-zlai May 23, 2025
1d9c592
ruff --fix
chewy-zlai May 23, 2025
dfd14a7
Update DataprocSubmitterTest
chewy-zlai May 24, 2025
e81aa66
Remove references to environment variable version of transient clusters
chewy-zlai May 26, 2025
24583fe
Revert back to println from logger as it seems to have removed the lo…
chewy-zlai May 26, 2025
902bb84
Make idle timeout configurable via generate_dataproc_cluster_config p…
chewy-zlai May 26, 2025
c97357a
flush the log message so it prints before the cluster is created
chewy-zlai May 26, 2025
7b46949
Use System.out.flush
chewy-zlai May 26, 2025
16c9ac7
Use scala.Console.println
chewy-zlai May 26, 2025
19b4712
Clean up imports
chewy-zlai May 26, 2025
e3bd354
Handle errors for getCluster call
chewy-zlai May 27, 2025
b780d50
readd transient clustername as zipline-{uuid}
chewy-zlai May 27, 2025
c6af5b5
Add a small sleep so the buffer gets flushed
chewy-zlai May 27, 2025
d0c820b
Test having both clustername and a cluster config set
chewy-zlai May 27, 2025
afbc897
Try using slf4j logger again with added import
chewy-zlai May 27, 2025
739d77f
Remove clustername from test for uploads
chewy-zlai May 27, 2025
f858596
import logback
chewy-zlai May 27, 2025
c763043
Use System.err.println to see if that gets printed quickly
chewy-zlai May 27, 2025
49e5875
Add test of name and config given
chewy-zlai May 27, 2025
927a621
Have canary include a name of the transient cluster so it only gets m…
chewy-zlai May 27, 2025
a3c68a6
scalafmt
chewy-zlai May 27, 2025
bdbd94c
Merge branch 'main' of https://github.com/zipline-ai/chronon into che…
chewy-zlai May 27, 2025
16 changes: 14 additions & 2 deletions api/python/test/canary/teams.py
@@ -23,9 +23,9 @@
"CUSTOMER_ID": "dev",
"GCP_PROJECT_ID": "canary-443022",
"GCP_REGION": "us-central1",
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster",
"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance",
"FLINK_STATE_URI": "gs://zipline-warehouse-canary/flink-state",
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster"
},
),
)
@@ -61,9 +61,21 @@
"CUSTOMER_ID": "dev",
"GCP_PROJECT_ID": "canary-443022",
"GCP_REGION": "us-central1",
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster",
"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance",
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster",
},
modeEnvironments={
RunMode.BACKFILL: {
"GCP_CREATE_DATAPROC": "true",
"GCP_DATAPROC_NUM_WORKERS": "2",
"ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
},
RunMode.UPLOAD: {
"GCP_CREATE_DATAPROC": "true",
"GCP_DATAPROC_NUM_WORKERS": "2",
"ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
}
}
),
conf=ConfigProperties(
common={
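The modeEnvironments block above enables transient clusters only for BACKFILL and UPLOAD runs. The resolution order the DataprocSubmitter change below applies can be summarized in a minimal sketch (the helper name and simplified signature are illustrative, not part of this PR):

// Illustrative sketch of the cluster-name resolution in initializeDataprocSubmitter.
// 1. GCP_CREATE_DATAPROC=true  -> create a transient cluster and use its generated name
// 2. GCP_DATAPROC_CLUSTER_NAME -> reuse the named long-lived cluster
// 3. neither set               -> fail fast
def resolveClusterName(env: Map[String, String], createCluster: () => String): String =
  if (env.get("GCP_CREATE_DATAPROC").exists(_.toBoolean)) createCluster()
  else
    env.getOrElse(
      "GCP_DATAPROC_CLUSTER_NAME",
      throw new IllegalStateException(
        "Either GCP_DATAPROC_CLUSTER_NAME or GCP_CREATE_DATAPROC must be set"))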
DataprocSubmitter.scala
@@ -3,6 +3,7 @@ import ai.chronon.spark.submission.JobSubmitterConstants._
import ai.chronon.spark.submission.{JobSubmitter, JobType, FlinkJob => TypeFlinkJob, SparkJob => TypeSparkJob}
import com.google.api.gax.rpc.ApiException
import com.google.cloud.dataproc.v1._
import com.google.protobuf.Duration
import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.jackson.JsonMethods._
@@ -351,8 +352,16 @@ object DataprocSubmitter {
private def initializeDataprocSubmitter(): DataprocSubmitter = {
val projectId = sys.env.getOrElse(GcpProjectIdEnvVar, throw new Exception(s"$GcpProjectIdEnvVar not set"))
val region = sys.env.getOrElse(GcpRegionEnvVar, throw new Exception(s"$GcpRegionEnvVar not set"))
val clusterName = sys.env
.getOrElse(GcpDataprocClusterNameEnvVar, throw new Exception(s"$GcpDataprocClusterNameEnvVar not set"))
val clusterName = if (sys.env.getOrElse(GcpCreateDataprocEnvVar, "false").toBoolean) {
val dataprocClient = ClusterControllerClient.create(
ClusterControllerSettings.newBuilder().setEndpoint(s"$region-dataproc.googleapis.com:443").build())
createDataprocCluster(projectId, region, dataprocClient)
} else {
sys.env.getOrElse(
GcpDataprocClusterNameEnvVar,
throw new Exception(s"Either $GcpDataprocClusterNameEnvVar or $GcpCreateDataprocEnvVar must be set, but neither is")
)
}

val submitterConf = SubmitterConf(
projectId,
@@ -517,6 +526,179 @@
}
}


private def buildClusterConfig(projectId: String, artifactPrefix: String): ClusterConfig = {
val numWorkers = sys.env
.getOrElse(GcpDataprocNumWorkersEnvVar,
throw new Exception(s"$GcpCreateDataprocEnvVar is true but $GcpDataprocNumWorkersEnvVar not set"))
.toInt
val hostType = sys.env
.getOrElse(GcpDataprocHostTypeEnvVar, "n2-highmem-4")
val networkUri = sys.env
.getOrElse(GcpDataprocNetworkEnvVar, "default")
val initializationActions = sys.env
.getOrElse(GcpDataprocInitializationActionsEnvVar, "").split(",").toList
val tags = sys.env
.getOrElse(GcpDataprocTagsEnvVar, "").split(",").toList

// Build the cluster configuration (no autoscaling policy is attached; worker count is fixed)
val config = ClusterConfig
.newBuilder()
.setMasterConfig(
InstanceGroupConfig
.newBuilder()
.setNumInstances(1)
.setMachineTypeUri("n2-highmem-64") // Adjust machine type as needed
.setDiskConfig(
DiskConfig
.newBuilder()
.setBootDiskType("pd-standard") // Use SSD for better performance
.setBootDiskSizeGb(1024) // Adjust disk size as needed
.build()
)
.build()
)
.setWorkerConfig(
InstanceGroupConfig
.newBuilder()
.setNumInstances(numWorkers) // Fixed number of worker nodes, taken from GCP_DATAPROC_NUM_WORKERS
.setMachineTypeUri(hostType)
.setDiskConfig(
DiskConfig
.newBuilder()
.setBootDiskType("pd-standard")
.setBootDiskSizeGb(64)
.setNumLocalSsds(2)
.build()
)
.build()
)

val gceClusterConfig = GceClusterConfig
.newBuilder()
.setNetworkUri(networkUri)
.setServiceAccount(f"dataproc@$projectId.iam.gserviceaccount.com")
.addAllServiceAccountScopes(
List(
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/cloud.useraccounts.readonly",
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/logging.write"
).asJava
)
.putMetadata("hive-version", "3.1.2")
.putMetadata("SPARK_BQ_CONNECTOR_URL", "gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar")
.putMetadata("artifact_prefix", artifact_prefix)
.setInternalIpOnly(true)

for (tag <- tags if tag.nonEmpty) {
gceClusterConfig.addTags(tag)
}

config
.setGceClusterConfig(
gceClusterConfig.build()
)
.setSoftwareConfig(
SoftwareConfig
.newBuilder()
.setImageVersion("2.2.50-debian12")
.addOptionalComponents(Component.FLINK)
.addOptionalComponents(Component.JUPYTER)
.putProperties("flink:env.java.opts.client",
"-Djava.net.preferIPv4Stack=true -Djava.security.properties=/etc/flink/conf/java.security")
.build()
)
.setEndpointConfig(
EndpointConfig
.newBuilder()
.setEnableHttpPortAccess(true)
.build()
)
.setLifecycleConfig(
LifecycleConfig
.newBuilder()
.setIdleDeleteTtl(
Duration
.newBuilder()
.setSeconds(7200) // 2 hours
.build()
)
.build()
)
.addInitializationActions(
NodeInitializationAction
.newBuilder()
.setExecutableFile(f"$artifact_prefix/scripts/copy_java_security.sh")
.build()
)

for (action <- initializationActions if action.nonEmpty) {
config.addInitializationActions(
NodeInitializationAction
.newBuilder()
.setExecutableFile(action)
.build()
)
}

config.build()
}

private[cloud_gcp] def createDataprocCluster(projectId: String, region: String,
dataprocClient: ClusterControllerClient): String = {
val artifactPrefix = sys.env
.getOrElse(ArtifactPrefixEnvVar, throw new Exception(s"$ArtifactPrefixEnvVar not set"))

val clusterConfig = buildClusterConfig(projectId, artifactPrefix)

val clusterName = s"zipline-transient-cluster-${System.currentTimeMillis()}"

val cluster: Cluster = Cluster
.newBuilder()
.setClusterName(clusterName)
.setProjectId(projectId)
.setConfig(clusterConfig)
.build()

val createRequest = CreateClusterRequest
.newBuilder()
.setProjectId(projectId)
.setRegion(region)
.setCluster(cluster)
.build()

// Create the cluster and block until the create operation completes (up to 5 minutes)
dataprocClient
.createClusterAsync(createRequest)
.get(5, java.util.concurrent.TimeUnit.MINUTES) match {
case null =>
throw new RuntimeException("Failed to create Dataproc cluster.")
case _ =>
println(s"Created Dataproc cluster: $clusterName")
}
// Check status of the cluster creation
var currentState = dataprocClient.getCluster(projectId, region, clusterName).getStatus.getState
while (
currentState != ClusterStatus.State.RUNNING &&
currentState != ClusterStatus.State.ERROR &&
currentState != ClusterStatus.State.STOPPING
) {
println(s"Waiting for Dataproc cluster $clusterName to be in RUNNING state. Current state: $currentState")
Thread.sleep(30000) // Wait for 30 seconds before checking again
currentState = dataprocClient.getCluster(projectId, region, clusterName).getStatus.getState
}
currentState match {
case ClusterStatus.State.RUNNING =>
println(s"Dataproc cluster $clusterName is running.")
clusterName
case ClusterStatus.State.ERROR =>
throw new RuntimeException(s"Failed to create Dataproc cluster $clusterName: ERROR state.")
case _ =>
throw new RuntimeException(s"Dataproc cluster $clusterName is in unexpected state: $currentState.")
}
}

private[cloud_gcp] def run(args: Array[String],
submitter: DataprocSubmitter,
envMap: Map[String, Option[String]] = Map.empty): Unit = {
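One caveat in createDataprocCluster above: the status loop polls getCluster every 30 seconds with no upper bound, so a cluster stuck in CREATING would block the job forever. A bounded variant is sketched below as a hypothetical helper (the name and the 15-minute cap are assumptions, not part of this diff):

// Hypothetical bounded-poll helper; caps the wait instead of looping indefinitely.
private def awaitRunning(client: ClusterControllerClient,
                         projectId: String,
                         region: String,
                         clusterName: String,
                         maxWaitMillis: Long = 15 * 60 * 1000L): ClusterStatus.State = {
  val deadline = System.currentTimeMillis() + maxWaitMillis
  var state = client.getCluster(projectId, region, clusterName).getStatus.getState
  while (state != ClusterStatus.State.RUNNING && state != ClusterStatus.State.ERROR) {
    if (System.currentTimeMillis() > deadline)
      throw new RuntimeException(s"Timed out waiting for cluster $clusterName; last state: $state")
    Thread.sleep(30000) // same 30-second cadence as the loop above
    state = client.getCluster(projectId, region, clusterName).getStatus.getState
  }
  state
}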
DataprocSubmitterTest.scala
Expand Up @@ -3,6 +3,9 @@ package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark
import ai.chronon.spark.submission
import ai.chronon.spark.submission.JobSubmitterConstants._
import com.google.api.core.ApiFuture
import com.google.api.gax.longrunning.{OperationFuture, OperationSnapshot}
import com.google.api.gax.retrying.RetryingFuture
import com.google.api.gax.rpc.UnaryCallable
import com.google.cloud.dataproc.v1.JobControllerClient.ListJobsPagedResponse
import com.google.cloud.dataproc.v1._
@@ -15,9 +18,18 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatestplus.mockito.MockitoSugar

import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._

class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
// Test-only hack: mutates the JVM's cached (normally immutable) env map via reflection,
// so env-var-driven code paths can be exercised without forking a process.
def setEnv(key: String, value: String): Unit = {
val env = System.getenv()
val field = env.getClass.getDeclaredField("m")
field.setAccessible(true)
val writableEnv = field.get(env).asInstanceOf[java.util.Map[String, String]]
writableEnv.put(key, value)
}

it should "test buildFlinkJob with the expected flinkStateUri and savepointUri" in {
val submitter = new DataprocSubmitter(jobControllerClient = mock[JobControllerClient],
conf = SubmitterConf("test-project", "test-region", "test-cluster"))
@@ -715,6 +727,39 @@

}

it should "create a Dataproc cluster successfully" in {
val mockDataprocClient = mock[ClusterControllerClient]

val mockOperationFuture = mock[OperationFuture[Cluster, ClusterOperationMetadata]]
val mockRetryingFuture = mock[RetryingFuture[OperationSnapshot]]
val mockMetadataFuture = mock[ApiFuture[ClusterOperationMetadata]]
val mockCluster = Cluster
.newBuilder()
.setStatus(ClusterStatus.newBuilder().setState(ClusterStatus.State.RUNNING))
.build()

when(mockDataprocClient.createClusterAsync(any[CreateClusterRequest]))
.thenReturn(mockOperationFuture)
when(mockOperationFuture.getPollingFuture).thenReturn(mockRetryingFuture)
when(mockOperationFuture.peekMetadata()).thenReturn(mockMetadataFuture)
when(mockOperationFuture.get(anyLong(), any[TimeUnit])).thenReturn(mockCluster)

when(mockDataprocClient.getCluster(any[String], any[String], any[String])).thenReturn(mockCluster)

val region = "test-region"
val projectId = "test-project"
setEnv(ArtifactPrefixEnvVar, "gs://test-bucket")
setEnv(GcpDataprocNumWorkersEnvVar, "2")

val clusterName = DataprocSubmitter.createDataprocCluster(projectId, region, mockDataprocClient)

verify(mockDataprocClient).createClusterAsync(any())
assert(clusterName.startsWith("zipline-transient-cluster-"))
}

it should "test getZiplineVersionOfDataprocJob successfully" in {
val jobId = "mock-job-id"
val mockJob = mock[Job]
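The happy-path creation test above has a natural companion: the ERROR state should surface as a RuntimeException. A sketch using the same mocking setup (hypothetical, not included in this PR):

it should "throw when the Dataproc cluster lands in ERROR state" in {
  val mockDataprocClient = mock[ClusterControllerClient]
  val mockOperationFuture = mock[OperationFuture[Cluster, ClusterOperationMetadata]]
  val errorCluster = Cluster
    .newBuilder()
    .setStatus(ClusterStatus.newBuilder().setState(ClusterStatus.State.ERROR))
    .build()

  when(mockDataprocClient.createClusterAsync(any[CreateClusterRequest])).thenReturn(mockOperationFuture)
  when(mockOperationFuture.get(anyLong(), any[TimeUnit])).thenReturn(errorCluster)
  when(mockDataprocClient.getCluster(any[String], any[String], any[String])).thenReturn(errorCluster)

  setEnv(ArtifactPrefixEnvVar, "gs://test-bucket")
  setEnv(GcpDataprocNumWorkersEnvVar, "2")

  // createDataprocCluster sees ERROR from getCluster and should raise
  assertThrows[RuntimeException] {
    DataprocSubmitter.createDataprocCluster("test-project", "test-region", mockDataprocClient)
  }
}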
JobSubmitterConstants.scala
@@ -178,6 +178,16 @@ object JobSubmitterConstants {
val GcpRegionEnvVar = "GCP_REGION"
val GcpDataprocClusterNameEnvVar = "GCP_DATAPROC_CLUSTER_NAME"

// Transient Dataproc creation
val GcpCreateDataprocEnvVar = "GCP_CREATE_DATAPROC"
val GcpDataprocNumWorkersEnvVar = "GCP_DATAPROC_NUM_WORKERS"
val GcpDataprocHostTypeEnvVar = "GCP_DATAPROC_HOST_TYPE"
val GcpDataprocNetworkEnvVar = "GCP_DATAPROC_NETWORK"
val GcpDataprocTagsEnvVar = "GCP_DATAPROC_TAGS"
val GcpDataprocInitializationActionsEnvVar = "GCP_DATAPROC_INITIALIZATION_ACTIONS"

val ArtifactPrefixEnvVar = "ARTIFACT_PREFIX"

val CheckIfJobIsRunning = "check-if-job-is-running"
val StreamingDeploy = "deploy"

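Taken together with the submitter changes, a transient-cluster run is driven entirely by these variables. An illustrative combination (values are examples only):

// Example env contract for a transient-cluster run; all values are placeholders.
val exampleTransientEnv = Map(
  "GCP_PROJECT_ID" -> "my-project",
  "GCP_REGION" -> "us-central1",
  "GCP_CREATE_DATAPROC" -> "true", // gate: create a transient cluster for this job
  "GCP_DATAPROC_NUM_WORKERS" -> "2", // required when the gate is on
  "ARTIFACT_PREFIX" -> "gs://my-artifacts", // required; a trailing '/' is tolerated
  "GCP_DATAPROC_HOST_TYPE" -> "n2-highmem-4" // optional; default worker machine type
)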