Skip to content

Commit 9e95046

Browse files
RajuGujjalapatiasingamanenishwen-gine
authored
Feature run job task, sparkJAR and SQL task (#117)
* Adding Spark Jar Task Modifying the expected test files Modifying the packages * Updating tests to fix the irregularity in dict diff * Adding initial code for Run Job Task * All the tests pass * fix: added changes for run-job-task with test cases * fix added spark-jar task code * added SQLTask code * fix: added sql task test cases code * fix: added databricks bundle code, updated workflow and docs * Update brickflow/engine/task.py Co-authored-by: Ashok Singamaneni <[email protected]> Signed-off-by: RAJU <[email protected]> * Update brickflow/engine/task.py Co-authored-by: Ashok Singamaneni <[email protected]> Signed-off-by: RAJU <[email protected]> * Update brickflow/engine/task.py Co-authored-by: Ashok Singamaneni <[email protected]> Signed-off-by: RAJU <[email protected]> * Update brickflow/cli/projects.py Co-authored-by: Ashok Singamaneni <[email protected]> Signed-off-by: RAJU <[email protected]> * fix: removed real names and id's from code base * fix: removed real id's from code base --------- Signed-off-by: Ashok Singamaneni <[email protected]> Signed-off-by: RAJU <[email protected]> Co-authored-by: Ashok Singamaneni <[email protected]> Co-authored-by: Shwetha Bc <[email protected]> Co-authored-by: Ashok Singamaneni <[email protected]>
1 parent 144de7a commit 9e95046

17 files changed

+2644
-1385
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ integration_workflows
204204

205205
*venv
206206

207+
# VScode
208+
.vscode
209+
207210
# GENERATED BY BRICKFLOW CLI --START--
208211

209212
### Terraform ###

brickflow/__init__.py

+6
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,9 @@ def get_bundles_project_env() -> str:
274274
DLTEdition,
275275
DLTChannels,
276276
NotebookTask,
277+
SparkJarTask,
278+
RunJobTask,
279+
SqlTask,
277280
)
278281
from brickflow.engine.compute import Cluster, Runtimes
279282
from brickflow.engine.project import Project
@@ -314,11 +317,14 @@ def get_bundles_project_env() -> str:
314317
"EmailNotifications",
315318
"DLTPipeline",
316319
"NotebookTask",
320+
"SparkJarTask",
321+
"RunJobTask",
317322
"DLTEdition",
318323
"DLTChannels",
319324
"Cluster",
320325
"Runtimes",
321326
"Project",
327+
"SqlTask",
322328
"_ilog",
323329
"log",
324330
"BrickflowEnvVars",

brickflow/codegen/databricks_bundle.py

+103-5
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
JobsTasksDependsOn,
3838
JobsTasksLibraries,
3939
JobsTasksNotebookTask,
40+
JobsTasksRunJobTask,
41+
JobsTasksSparkJarTask,
42+
JobsTasksSqlTask,
4043
Resources,
4144
Workspace,
4245
Bundle,
@@ -458,21 +461,93 @@ def _build_native_notebook_task(
458461
**task.cluster.job_task_field_dict,
459462
)
460463

464+
def _build_native_spark_jar_task(
465+
self,
466+
task_name: str,
467+
task: Task,
468+
task_libraries: List[JobsTasksLibraries],
469+
task_settings: TaskSettings,
470+
depends_on: List[JobsTasksDependsOn],
471+
) -> JobsTasks:
472+
try:
473+
spark_jar_task: JobsTasksSparkJarTask = task.task_func()
474+
except Exception as e:
475+
raise ValueError(
476+
f"Error while building jar task {task_name}. "
477+
f"Make sure {task_name} returns a SparkJarTask object."
478+
) from e
479+
480+
return JobsTasks(
481+
**task_settings.to_tf_dict(),
482+
spark_jar_task=spark_jar_task,
483+
libraries=task_libraries,
484+
depends_on=depends_on,
485+
task_key=task_name,
486+
# unpack dictionary provided by cluster object, will either be key or
487+
# existing cluster id
488+
**task.cluster.job_task_field_dict,
489+
)
490+
491+
def _build_native_run_job_task(
492+
self,
493+
task_name: str,
494+
task: Task,
495+
task_settings: TaskSettings,
496+
depends_on: List[JobsTasksDependsOn],
497+
) -> JobsTasks:
498+
try:
499+
run_job_task: JobsTasksRunJobTask = task.task_func()
500+
except Exception as e:
501+
raise ValueError(
502+
f"Error while building run job task {task_name}. "
503+
f"Make sure {task_name} returns a RunJobTask object."
504+
) from e
505+
return JobsTasks(
506+
**task_settings.to_tf_dict(), # type: ignore
507+
run_job_task=JobsTasksRunJobTask(job_id=run_job_task.job_id),
508+
depends_on=depends_on,
509+
task_key=task_name,
510+
)
511+
512+
def _build_native_sql_file_task(
513+
self,
514+
task_name: str,
515+
task: Task,
516+
task_settings: TaskSettings,
517+
depends_on: List[JobsTasksDependsOn],
518+
) -> JobsTasks:
519+
try:
520+
sql_task: JobsTasksSqlTask = task.task_func()
521+
except Exception as e:
522+
print(e)
523+
raise ValueError(
524+
f"Error while building sql file task {task_name}. "
525+
f"Make sure {task_name} returns a JobsTasksSqlTask object."
526+
) from e
527+
return JobsTasks(
528+
**task_settings.to_tf_dict(), # type: ignore
529+
sql_task=sql_task,
530+
depends_on=depends_on,
531+
task_key=task_name,
532+
)
533+
461534
def _build_dlt_task(
462535
self,
463536
task_name: str,
464537
task: Task,
465538
workflow: Workflow,
539+
task_settings: TaskSettings,
466540
depends_on: List[JobsTasksDependsOn],
467541
) -> JobsTasks:
468542
dlt_task: DLTPipeline = task.task_func()
469543
# tasks.append(Pipelines(**dlt_task.to_dict())) # TODO: fix this so pipeline also gets created
470544
pipeline_ref = self.get_pipeline_reference(workflow, dlt_task)
471545
return JobsTasks(
546+
**task_settings.to_tf_dict(), # type: ignore
472547
pipeline_task=JobsTasksPipelineTask(
473548
pipeline_id=f"${{resources.pipelines.{pipeline_ref}.id}}",
474549
# full_refresh=..., TODO: add full refresh
475-
),
550+
), # type: ignore
476551
depends_on=depends_on,
477552
task_key=task_name,
478553
)
@@ -484,7 +559,7 @@ def workflow_obj_to_tasks(
484559
for task_name, task in workflow.tasks.items():
485560
# TODO: DLT
486561
# pipeline_task: Pipeline = self._create_dlt_notebooks(stack, task)
487-
depends_on = [JobsTasksDependsOn(task_key=f) for f in task.depends_on_names]
562+
depends_on = [JobsTasksDependsOn(task_key=f) for f in task.depends_on_names] # type: ignore
488563
libraries = TaskLibrary.unique_libraries(
489564
task.libraries + (self.project.libraries or [])
490565
)
@@ -493,13 +568,15 @@ def workflow_obj_to_tasks(
493568
libraries += get_brickflow_libraries(workflow.enable_plugins)
494569

495570
task_libraries = [
496-
JobsTasksLibraries(**library.dict) for library in libraries
571+
JobsTasksLibraries(**library.dict) for library in libraries # type: ignore
497572
]
498573
task_settings = workflow.default_task_settings.merge(task.task_settings)
499574
if task.task_type == TaskType.DLT:
500575
# native dlt task
501576
tasks.append(
502-
self._build_dlt_task(task_name, task, workflow, depends_on)
577+
self._build_dlt_task(
578+
task_name, task, workflow, task_settings, depends_on
579+
)
503580
)
504581
elif task.task_type == TaskType.NOTEBOOK_TASK:
505582
# native notebook task
@@ -508,13 +585,34 @@ def workflow_obj_to_tasks(
508585
task_name, task, task_libraries, task_settings, depends_on
509586
)
510587
)
588+
elif task.task_type == TaskType.SPARK_JAR_TASK:
589+
# native jar task
590+
tasks.append(
591+
self._build_native_spark_jar_task(
592+
task_name, task, task_libraries, task_settings, depends_on
593+
)
594+
)
595+
elif task.task_type == TaskType.RUN_JOB_TASK:
596+
# native run job task
597+
tasks.append(
598+
self._build_native_run_job_task(
599+
task_name, task, task_settings, depends_on
600+
)
601+
)
602+
elif task.task_type == TaskType.SQL:
603+
# native run job task
604+
tasks.append(
605+
self._build_native_sql_file_task(
606+
task_name, task, task_settings, depends_on
607+
)
608+
)
511609
else:
512610
# brickflow entrypoint task
513611
task_obj = JobsTasks(
514612
**{
515613
task.databricks_task_type_str: self.task_to_task_obj(task),
516614
**task_settings.to_tf_dict(),
517-
},
615+
}, # type: ignore
518616
libraries=task_libraries,
519617
depends_on=depends_on,
520618
task_key=task_name,

0 commit comments

Comments
 (0)