Skip to content
This repository was archived by the owner on May 21, 2024. It is now read-only.

Support for build step-enabled pipelines #213

Merged
merged 19 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ matrix:
- python: 3.8
env:
- TOXENV=lint
- PACHYDERM_VERSION=1.10.0
- PACHYDERM_VERSION=1.11.0
- python: 3.8
env:
- TOXENV=examples
- PACHYDERM_VERSION=1.10.0
- PACHYDERM_VERSION=1.11.0

install:
- make ci-install
Expand Down
2 changes: 1 addition & 1 deletion examples/opencv/edges/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
matplotlib>=3.1.2
opencv-python>=4.1.2.30
opencv-python==4.3.0.36
8 changes: 6 additions & 2 deletions examples/opencv/opencv.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ def main():
python_pachyderm.create_python_pipeline(
client,
relpath("edges"),
python_pachyderm.Input(pfs=python_pachyderm.PFSInput(glob="/*", repo="images")),
input=python_pachyderm.Input(pfs=python_pachyderm.PFSInput(glob="/*", repo="images")),
)

# Create the montage pipeline
client.create_pipeline(
"montage",
transform=python_pachyderm.Transform(cmd=["sh"], image="v4tech/imagemagick", stdin=["montage -shadow -background SkyBlue -geometry 300x300+2+2 $(find /pfs -type f | sort) /pfs/out/montage.png"]),
transform=python_pachyderm.Transform(
cmd=["sh"],
image="v4tech/imagemagick",
stdin=["montage -shadow -background SkyBlue -geometry 300x300+2+2 $(find /pfs -type f | sort) /pfs/out/montage.png"]
),
input=python_pachyderm.Input(cross=[
python_pachyderm.Input(pfs=python_pachyderm.PFSInput(glob="/", repo="images")),
python_pachyderm.Input(pfs=python_pachyderm.PFSInput(glob="/", repo="edges")),
Expand Down
116 changes: 116 additions & 0 deletions src/python_pachyderm/mixin/pps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import os
import json
import base64
import warnings
from pathlib import Path

from python_pachyderm.proto.pps import pps_pb2 as pps_proto
from python_pachyderm.service import Service
Expand Down Expand Up @@ -201,6 +204,87 @@ def create_pipeline(self, pipeline_name, transform, parallelism_spec=None, hasht
* `sidecar_resource_limits`: An optional `ResourceSpec` setting
resource limits for the pipeline sidecar.
"""

# Support for build step-enabled pipelines. This is a python port of
# the equivalent functionality in pachyderm core's
# 'src/server/pps/cmds/cmds.go', and any changes made here likely have
# to be reflected there as well.
if transform.build.image or transform.build.language or transform.build.path:
if spout:
raise Exception("build step-enabled pipelines do not work with spouts")
if not input:
raise Exception("no `input` specified")
if (not transform.build.language) and (not transform.build.image):
raise Exception("must specify either a build `language` or `image`")
if transform.build.language and transform.build.image:
raise Exception("cannot specify both a build `language` and `image`")
if any(pipeline_input_name(i) in ("build", "source") for i in pipeline_inputs(input)):
raise Exception(
"build step-enabled pipelines cannot have inputs with the name "
+ "'build' or 'source', as they are reserved for build assets"
)

build_path = Path(transform.build.path or ".")
if not build_path.exists():
raise Exception("build path {} does not exist".format(build_path))
if (build_path / ".pachignore").exists():
warnings.warn(
"detected a '.pachignore' file, but it's unsupported by python_pachyderm -- use `pachctl` instead",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For posterity: #221

RuntimeWarning
)

build_pipeline_name = "{}_build".format(pipeline_name)

image = transform.build.image
if not image:
version = self.get_remote_version()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea that the standard build pipeline images follow pachd's versions? If so, we'll need to make it part of our release process to update the image tags.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct. Builder images are already automatically tagged and pushed as part of the release process.

version_str = "{}.{}.{}{}".format(version.major, version.minor, version.micro, version.additional)
image = "pachyderm/{}-build:{}".format(transform.build.language, version_str)
if not transform.image:
transform.image = image

def create_build_pipeline_input(name):
return pps_proto.Input(
pfs=pps_proto.PFSInput(
name=name,
glob="/",
repo=build_pipeline_name,
branch=name,
)
)

self.create_repo(build_pipeline_name, update=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably just blind but is the ***_source repo created? Does that need to happen?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this is just a general question about the feature, but why do we need to create the _build repo separately (rather than having create_pipeline do it)? Is it like a consistency thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. With build pipelines (but unlike the version of create_python_pipeline that this is replacing), there is only the _build repo, which contains the source code in one branch, and the build assets pipeline outputting to another branch.
  2. I'm not sure if I follow your second question. Is it "why are we calling self._req directly rather than self.create_pipeline in the following line?"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I just didn't quite understand how this feature worked (that the source and build output were two branches in the same repo), but this clarifies it—thanks!


self._req(
Service.PPS, "CreatePipeline",
pipeline=pps_proto.Pipeline(name=build_pipeline_name),
transform=pps_proto.Transform(image=image, cmd=["sh", "./build.sh"]),
parallelism_spec=pps_proto.ParallelismSpec(constant=1),
input=create_build_pipeline_input("source"),
output_branch="build",
update=update,
)

with self.put_file_client() as pfc:
if update:
pfc.delete_file((build_pipeline_name, "source"), "/")
for root, _, filenames in os.walk(str(build_path)):
for filename in filenames:
source_filepath = os.path.join(root, filename)
dest_filepath = os.path.join("/", os.path.relpath(source_filepath, start=str(build_path)))
pfc.put_file_from_filepath((build_pipeline_name, "source"), dest_filepath, source_filepath)

input = pps_proto.Input(
cross=[
create_build_pipeline_input("source"),
create_build_pipeline_input("build"),
input,
]
)

if not transform.cmd:
transform.cmd[:] = ["sh", "/pfs/build/run.sh"]

return self._req(
Service.PPS, "CreatePipeline",
pipeline=pps_proto.Pipeline(name=pipeline_name),
Expand Down Expand Up @@ -602,3 +686,35 @@ def garbage_collect(self, memory_bytes=None):
precise garbage collection (at the cost of more memory usage).
"""
return self._req(Service.PPS, "GarbageCollect", memory_bytes=memory_bytes)


def pipeline_input_name(i):
    """
    Returns the name associated with a pipeline input, or `None` if no name
    can be derived.

    For a PFS input with a non-empty name, that name is returned. Otherwise
    the first non-empty group of child inputs (checked in the order `cross`,
    `join`, `union`) is used, and the name of its first element is resolved
    recursively.

    Params:

    * `i`: An `Input`-like object exposing `pfs`, `cross`, `join` and `union`
    attributes, or `None`.
    """

    # NOTE(review): per the PR discussion, this is a transliteration of the
    # equivalent Go code in pachyderm core (believed to live in ppsutil --
    # confirm), and the visible caller only compares the result against the
    # reserved names "build" and "source", so only PFS input names matter.

    if i is None:
        return None

    # Protobuf sub-message fields are never `None`, so checking `pfs` alone
    # would always succeed and hide the composite branches below. Require an
    # actual (non-empty) name before treating this as a named PFS input.
    if i.pfs is not None and i.pfs.name:
        return i.pfs.name

    # Repeated protobuf fields are empty containers rather than `None`;
    # truthiness handles both representations.
    for children in (i.cross, i.join, i.union):
        if children:
            return pipeline_input_name(children[0])

    return None


def pipeline_inputs(root):
    """
    Generator that walks the input tree rooted at `root`, yielding every
    nested input followed by `root` itself (children before parents).

    Params:

    * `root`: An `Input`-like object exposing `cross`, `join` and `union`
    child lists, or `None` (in which case nothing is yielded).
    """

    if root is None:
        return

    # Visit every child list rather than only the first one that is not
    # `None`: protobuf repeated fields are empty containers, never `None`, so
    # an `elif ... is not None` chain would stop at `cross` and never descend
    # into `join` or `union`.
    for field in ("cross", "join", "union"):
        for child in getattr(root, field, None) or ():
            yield from pipeline_inputs(child)

    yield root
154 changes: 15 additions & 139 deletions src/python_pachyderm/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from pathlib import Path

from .proto.pps.pps_pb2 import Input, Transform, PFSInput, ParallelismSpec, CreatePipelineRequest
from .proto.pps.pps_pb2 import Transform, CreatePipelineRequest, BuildSpec

from google.protobuf import json_format

Expand Down Expand Up @@ -61,53 +62,22 @@ def put_files(client, source_path, commit, dest_path, **kwargs):
pfc.put_file_from_filepath(commit, dest_filepath, source_filepath, **kwargs)


def create_python_pipeline(client, path, input=None, pipeline_name=None, image_pull_secrets=None, debug=None,
env=None, secrets=None, image=None, update=False, **pipeline_kwargs):
def create_python_pipeline(client, path, input=None, pipeline_name=None, image_pull_secrets=None,
debug=None, env=None, secrets=None, image=None, update=False,
**pipeline_kwargs):
"""
Utility function for creating (or updating) a pipeline specially built for
executing python code that is stored locally at `path`. `path` can either
reference a directory with python code, or a single python file.
executing python code that is stored locally at `path`.

A normal pipeline creation process (i.e. a call to
`client.create_pipeline`) requires you to first build and push a container
image with the source and dependencies baked in. As an alternative
process, this function circumvents container image creation by creating:
A normal pipeline creation process requires you to first build and push a
container image with the source and dependencies baked in. As an alternative
process, this function circumvents container image creation by using build
step-enabled pipelines. See the pachyderm core docs for more info.

1) a PFS repo that stores the source code at `path`.
2) If there's a `requirements.txt` in `path`, a pipeline for building the
dependencies into wheels.
3) A pipeline for executing the PFS stored source code with the built
dependencies.

This is what the DAG looks like:

```
.------------------------. .-----------------------.
| <pipeline_name>_source | ---▶ | <pipeline_name>_build |
'------------------------' '-----------------------'
| /
▼ /
.-----------------. /
| <pipeline_name> | ◀---'
'-----------------'
|
.---------.
| <input> |
'---------'

```

(without a `requirements.txt`, there is no build pipeline.)

If `path` references a directory, it should have following:
If `path` references a directory, it should have:

* A `main.py`, as the pipeline entry-point.
* An optional `requirements.txt` that specifies pip requirements.
* An optional `build.sh` if you wish to override the default build
process.
* An optional `run.sh` if you wish to override the default pipeline
execution process.

Params:

Expand All @@ -129,116 +99,22 @@ def create_python_pipeline(client, path, input=None, pipeline_name=None, image_p
* `secrets`: An optional list of `Secret` objects for secret environment
variables.
* `image`: An optional string specifying the docker image to use for the
pipeline. Defaults to `python`.
pipeline. Defaults to using pachyderm's official python language builder.
* `update`: Whether to act as an upsert.
* `pipeline_kwargs`: Keyword arguments to forward to `create_pipeline`.
"""

# Verify & set defaults for arguments
if not os.path.exists(path):
raise Exception("path does not exist")

if not os.path.isfile(path) and not os.path.exists(os.path.join(path, "main.py")):
raise Exception("no main.py detected")

if pipeline_name is None:
pipeline_name = os.path.basename(path)
if os.path.isfile(path):
if path.endswith(".py"):
pipeline_name = pipeline_name[:-3]
else:
if path.endswith("/"):
pipeline_name = os.path.basename(path[:-1])

if not pipeline_name:
raise Exception("could not derive pipeline name")

image = image or "python:3"

# Create the source repo
source_repo_name = "{}_source".format(pipeline_name)

client.create_repo(
source_repo_name,
description="python_pachyderm.create_python_pipeline: source code for pipeline {}.".format(pipeline_name),
update=update,
)

# Create the build pipeline
build_pipeline_name = None
if os.path.exists(os.path.join(path, "requirements.txt")):
build_pipeline_name = "{}_build".format(pipeline_name)

if build_pipeline_name is not None:
build_pipeline_desc = """
python_pachyderm.create_python_pipeline: build artifacts for pipeline {}.
""".format(pipeline_name).strip()

client.create_pipeline(
build_pipeline_name,
Transform(
image=image,
cmd=["bash", "/pfs/{}/build.sh".format(source_repo_name)],
image_pull_secrets=image_pull_secrets,
debug=debug,
),
input=Input(pfs=PFSInput(glob="/", repo=source_repo_name)),
update=update,
description=build_pipeline_desc,
parallelism_spec=ParallelismSpec(constant=1),
)

source_commit_desc = "python_pachyderm.create_python_pipeline: sync source code."
with client.commit(source_repo_name, branch="master", description=source_commit_desc) as commit:
# Utility function for inserting build.sh/run.sh
def put_templated_script(filename, template):
client.put_file_bytes(commit, filename, template.format(
set_args="ex" if debug else "e",
source_repo_name=source_repo_name,
build_pipeline_name=build_pipeline_name,
).encode("utf8"))

# Delete any existing source code
if update:
client.delete_file(commit, "/")

# Insert the source code
if build_pipeline_name is None:
if os.path.isfile(path):
with open(path, "rb") as f:
client.put_file_bytes(commit, "main.py", f)
else:
put_files(client, path, commit, "/")

put_templated_script("run.sh", RUNNER_SCRIPT_WITHOUT_WHEELS)
else:
put_files(client, path, commit, "/")

if not os.path.exists(os.path.join(path, "run.sh")):
put_templated_script("run.sh", RUNNER_SCRIPT_WITH_WHEELS)
if not os.path.exists(os.path.join(path, "build.sh")):
put_templated_script("build.sh", BUILDER_SCRIPT)

# Create the pipeline
inputs = [Input(pfs=PFSInput(glob="/", repo=source_repo_name))]

if input is not None:
inputs.append(input)
if build_pipeline_name is not None:
inputs.append(Input(pfs=PFSInput(glob="/", repo=build_pipeline_name)))

return client.create_pipeline(
pipeline_name,
pipeline_name or Path(path).name,
Transform(
image=image,
cmd=["bash", "/pfs/{}/run.sh".format(source_repo_name)],
image_pull_secrets=image_pull_secrets,
debug=debug,
env=env,
secrets=secrets,
build=BuildSpec(path=path, image=image) if image else BuildSpec(path=path, language="python"),
),
input=Input(cross=inputs) if len(inputs) > 1 else inputs[0],
update=update,
input=input,
**pipeline_kwargs
)

Expand Down
Loading