
Refactor Spark Python Executor #1231

Merged: 10 commits into main from eng-2388-reduce-maintenance-overhead-of-spark on Apr 24, 2023

Conversation

@hsubbaraj-spiral (Contributor) commented Apr 20, 2023:

Describe your changes and why you are making these changes

This PR refactors the Spark Python executors and reduces their maintenance overhead.

The main technical challenge was sharing code between functions with nearly identical definitions but one additional parameter, a SparkSession. We also want to avoid importing pyspark in the normal codepaths that don't execute in Spark environments. To do this, we pass the differing functions in as parameters (read_artifacts, write_artifact, infer_type_artifact, and setup_connector) and send the SparkSession object as a kwarg. This way the generic function passed in can be called in the same manner by both the Spark and non-Spark codepaths.
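
As a rough sketch of this pattern (run_helper, the *_func parameters, and the spark_session_obj kwarg match the snippets reviewed below; the function body and the utils/spark_utils call sites are illustrative assumptions, not the actual implementation):

    from typing import Any, Callable

    def run_helper(
        spec: Any,
        read_artifacts_func: Callable[..., Any],
        write_artifact_func: Callable[..., Any],
        **kwargs: Any,  # the only expected key is spark_session_obj, set by the Spark entrypoint
    ) -> None:
        # The injected helpers hide whether we are running under Spark, so this
        # shared code never imports pyspark: Spark-aware helpers read the session
        # out of kwargs, while non-Spark helpers are called with kwargs empty.
        inputs = read_artifacts_func(spec, **kwargs)
        write_artifact_func(spec, inputs, **kwargs)

    # Hypothetical call sites:
    #   non-Spark: run_helper(spec, utils.read_artifacts, utils.write_artifact)
    #   Spark:     run_helper(spec, spark_utils.read_artifacts, spark_utils.write_artifact,
    #                         spark_session_obj=session)

Because the shared module only ever forwards **kwargs, it never has to name, or import, any pyspark type.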

Related issue number (if any)

Loom demo (if any)

Checklist before requesting a review

  • I have created a descriptive PR title. The PR title should complete the sentence "This PR...".
  • I have performed a self-review of my code.
  • I have included a small demo of the changes. For the UI, this would be a screenshot or a Loom video.
  • If this is a new feature, I have added unit tests and integration tests.
  • I have run the integration tests locally and they are passing.
  • I have run the linter script locally (See python3 scripts/run_linters.py -h for usage).
  • All features on the UI continue to work correctly.
  • Added one of the following CI labels:
    • run_integration_test: Runs integration tests
    • skip_integration_test: Skips integration tests (Should be used when changes are ONLY documentation/UI)

@hsubbaraj-spiral added the run_integration_test (Triggers integration tests) label on Apr 20, 2023
def run_helper(
spec: Spec,
read_artifacts_func: Any,
write_artifact_func: Any,
Contributor commented:

If we are already passing function objects in, does it make sense to pass different function objects based on whether it's Spark or not? That way we wouldn't even need to pass is_spark and the other flags as arguments.

For example, could we do something like

if is_spark:
    run_helper(
        spec,
        read_artifacts_func=utils.read_spark_artifacts,
        write_artifact_func=utils.write_spark_artifacts,
        ...
    )

@likawind (Contributor) left a comment:

Looks great, this is a huge improvement! I have some minor comments, but I don't think we have to spend too much time iterating on this.

read_artifacts_func: Any,
write_artifact_func: Any,
setup_connector_func: Any,
is_spark: bool,
Contributor commented:

If we have to pass this arg around, does it make sense to explicitly pass a single Optional[SparkSession] object to decide whether Spark is enabled? I'd also like to remove **kwargs, since it's not clear what to expect or how it's used. That pattern is more useful in cases like decorators, where we support arbitrary inputs, which is not the case here.

Contributor (PR author) replied:

This is the correct pattern; however, it requires that we import pyspark.sql in the regular code path, which we want to avoid. The kwargs pattern was chosen precisely to avoid that import in non-Spark environments.
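
To make the trade-off concrete, a minimal sketch (the file names and the get_spark_session helper are hypothetical, not part of this PR):

    # executor/shared.py (hypothetical): with an explicit Optional[SparkSession]
    # parameter, this module would need
    #     from pyspark.sql import SparkSession
    # at the top, which executes on import even in non-Spark environments. With
    # the **kwargs pattern, no pyspark import appears here at all; the session,
    # if present, arrives as kwargs["spark_session_obj"] and stays opaque.

    # executor/spark_entry.py (hypothetical): the only place pyspark is imported.
    from pyspark.sql import SparkSession

    def get_spark_session() -> SparkSession:
        # Builds the session that the Spark executor forwards to run_helper
        # via the spark_session_obj kwarg.
        return SparkSession.builder.getOrCreate()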

@kenxu95 (Contributor) left a comment:

Just some stylistic/readability concerns from me!

- write_artifact_func: function used to write artifacts to storage layer
- setup_connector_func: function to use to setup the connectors
- is_spark Whether or not we are running in a Spark env.
The only kwarg we expect is spark_session_obj
Contributor commented:

nit: Can we use the same docstring format as we do in the SDK (see the ones in client.py)?
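
For reference, one way the Args section above could be laid out; the exact convention in client.py isn't shown in this diff, so the Google-style block below is only an assumption:

    from typing import Any, Callable

    def run_helper(
        spec: Any,
        read_artifacts_func: Callable[..., Any],
        write_artifact_func: Callable[..., Any],
        setup_connector_func: Callable[..., Any],
        is_spark: bool,
        **kwargs: Any,
    ) -> None:
        """Executes the operator described by `spec` using the injected helpers.

        Args:
            read_artifacts_func: Function used to read input artifacts from the storage layer.
            write_artifact_func: Function used to write artifacts to the storage layer.
            setup_connector_func: Function used to set up the connectors.
            is_spark: Whether or not we are running in a Spark env.
            **kwargs: The only expected kwarg is spark_session_obj.
        """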

)


def run_helper(
Contributor commented:

Can we call this execute_function_spec() or something? run_helper() doesn't tell me very much.

Contributor commented:

Same thing for the other run_helper() -> execute_data_spec() or something

@hsubbaraj-spiral merged commit e918800 into main on Apr 24, 2023
@hsubbaraj-spiral deleted the eng-2388-reduce-maintenance-overhead-of-spark branch on April 24, 2023 at 17:05
Labels: run_integration_test (Triggers integration tests)
3 participants