
Commit 9855fd8: Add sample data, Job class
Parent: b42d26c

42 files changed: +509 -194 lines. Large commits have some content hidden by default, so only a subset of the changed files is shown below.

iptk/dataset.py (+12 -38)

@@ -1,43 +1,17 @@
-import os, shutil
+import os, re, shutil
 
 class Dataset(object):
     """
-    Instances of the Dataset class represent IPTK datasets on disk. A valid
-    path must be passed to the instance upon creation. An IPTK dataset will be
-    created at the given path if it does not exist already.
+    Instances of the Dataset class represent IPTK datasets in an abstract
+    fashion. A concrete dataset is specified by the combination of a
+    Dataset and a DatasetStore instance.
     """
-    def __init__(self, path):
+    def __init__(self, identifier, store=None):
         super(Dataset, self).__init__()
-        self.path = path
-        self.initialize()
-
-    @property
-    def data_path(self):
-        return os.path.join(self.path, 'data')
-
-    def initialize(self):
-        """
-        Initializes the dataset by creating an empty IPTK dataset structure
-        including the data/, temp/, and meta/ directories. After
-        initialization, the dataset can be edited until it is locked by a call
-        to the lock() method or the removal of the temp/ directory.
-        Initializing an existing dataset is a no-op.
-        """
-        if not os.path.exists(self.data_path):
-            subdirs = ["temp", "data", "meta"]
-            for s in subdirs:
-                os.makedirs(os.path.join(self.path, s), exist_ok=True)
-
-    def lock(self):
-        """
-        Locks the dataset. Locked datasets can be used as job inputs but the
-        content of their data/ directory must remain unchanged. A locked
-        dataset is indicated by the absence of a temp/ subdirectory. Unlocking
-        a dataset by re-creating temp/ is not allowed and may lead to
-        unpleasant side effects. Locking a locked dataset is a no-op.
-        """
-        tmp_dir = os.path.join(self.path, 'temp')
-        if os.path.exists(tmp_dir):
-            shutil.rmtree(tmp_dir)
-
-
+        if not re.match("^[0-9a-z]{40}$", identifier):
+            raise ValueError('Invalid dataset identifier')
+        self.identifier = identifier
+        self.store = store
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} {self.identifier}>"
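
With this change a Dataset no longer touches the disk; it is just a validated identifier plus an optional store reference. A minimal usage sketch, assuming the package layout above (the 40-character identifier is a made-up placeholder):

from iptk.dataset import Dataset

dataset = Dataset("a" * 40)   # any string matching ^[0-9a-z]{40}$ is accepted
print(dataset)                # <Dataset aaaa...aaaa>
Dataset("not-an-identifier")  # raises ValueError('Invalid dataset identifier')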

iptk/dataset_store.py (+75 -8)

@@ -1,4 +1,4 @@
-import os, re
+import json, os, re, shutil
 from .dataset import Dataset
 
 class DatasetStore(object):
@@ -7,6 +7,51 @@ def __init__(self, root_path):
         super(DatasetStore, self).__init__()
         self.root_path = root_path
 
+    def get_path(self, dataset):
+        """
+        Returns the path of the requested dataset on disk.
+        """
+        subdir = "/".join(list(dataset.identifier[:4]))
+        path = os.path.join(self.root_path, subdir, dataset.identifier)
+        return path
+
+    def get_data_path(self, dataset, mutable=None):
+        """
+        Returns the full path to the given dataset's data/ directory. If the
+        mutable argument is set, an exception is raised for locked datasets (if
+        mutable == True) or unlocked datasets (if mutable == False).
+        """
+        if (mutable is not None) and (self.is_locked(dataset) == mutable):
+            raise ValueError("Dataset lock state does not match request")
+        dataset_path = self.get_path(dataset)
+        return os.path.join(dataset_path, "data")
+
+    def get_meta_path(self, dataset, spec):
+        dataset_path = self.get_path(dataset)
+        return os.path.join(dataset_path, "meta", f"{spec.identifier}.json")
+
+    def get_metadata(self, dataset, spec):
+        """
+        Returns the metadata saved within this store for the given combination
+        of dataset and metadata specification.
+        """
+        path = self.get_meta_path(dataset, spec)
+        if not os.path.exists(path):
+            return None
+        with open(path, "r") as f:
+            return json.load(f)
+
+    def set_metadata(self, dataset, spec, data):
+        """
+        Sets the metadata for the given dataset and metadata specification. The
+        data object must be JSON-serializable. Note that your changes may be
+        overwritten if the metadata specification is associated with a metadata
+        generator.
+        """
+        path = self.get_meta_path(dataset, spec)
+        with open(path, "w+") as f:
+            return json.dump(data, f, sort_keys=True, indent=4, separators=(',', ': '))
+
     def get_dataset(self, dataset_id, create_ok=False):
         """
         Fetch a Dataset object backed by this DatasetStore. Raises a value
@@ -15,10 +60,32 @@ def get_dataset(self, dataset_id, create_ok=False):
         method can optionally create an empty dataset if no dataset with the
         given identifier exists.
         """
-        if not re.match("^[0-9a-z]{40}$", dataset_id):
-            raise ValueError('Invalid dataset identifier')
-        subdir = "/".join(list(dataset_id[:4]))
-        path = os.path.join(self.root_path, subdir, dataset_id)
-        if not os.path.exists(path) and not create_ok:
-            raise ValueError('No existing dataset at path and create_ok is False')
-        return Dataset(path)
+        dataset = Dataset(dataset_id, self)
+        path = self.get_path(dataset)
+        if not os.path.exists(path):
+            if create_ok:
+                subdirs = ["temp", "data", "meta"]
+                for s in subdirs:
+                    os.makedirs(os.path.join(path, s), exist_ok=True)
+            else:
+                raise ValueError("Dataset not found in this store")
+        return dataset
+
+    def is_locked(self, dataset):
+        tmp_dir = os.path.join(self.get_path(dataset), 'temp')
+        return not os.path.exists(tmp_dir)
+
+    def lock_dataset(self, dataset):
+        """
+        Locks the dataset. Locked datasets can be used as job inputs but the
+        content of their data/ directory must remain unchanged. A locked
+        dataset is indicated by the absence of a temp/ subdirectory. Unlocking
+        a dataset by re-creating temp/ is not allowed and may lead to
+        unpleasant side effects. Locking a locked dataset is a no-op.
+        """
+        tmp_dir = os.path.join(self.get_path(dataset), 'temp')
+        if os.path.exists(tmp_dir):
+            shutil.rmtree(tmp_dir)
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} {self.root_path}>"
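
The get_path() method shards datasets on disk: the first four characters of the identifier each become one directory level, which keeps any single directory from accumulating too many entries. A sketch of the resulting layout and the lock workflow, assuming a store rooted at the hypothetical path /data/iptk and a placeholder identifier:

from iptk.dataset_store import DatasetStore

store = DatasetStore("/data/iptk")
dataset = store.get_dataset("ab12" + "0" * 36, create_ok=True)
print(store.get_path(dataset))
# /data/iptk/a/b/1/2/ab120000...0 (one directory level per leading character)
print(store.is_locked(dataset))               # False: temp/ still exists
data_dir = store.get_data_path(dataset, mutable=True)  # fine while unlocked
store.lock_dataset(dataset)                   # removes temp/, locking the dataset
store.get_data_path(dataset, mutable=True)    # now raises ValueError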

iptk/docker_utils.py (+71 -39)

@@ -1,45 +1,77 @@
 #!/usr/local/bin/python3
 import requests
 
-def get_digest(registry, repository, tag):
-    manifest_url = f"https://{registry}/v2/{repository}/manifests/{tag}"
-    headers = {"Accept": "application/vnd.docker.distribution.manifest.v2+json"}
-    r = requests.head(manifest_url, headers=headers)
-    if r.status_code == 401:
-        token_url = f"https://auth.docker.io/token"
-        token_params = {
-            "service": "registry.docker.io",
-            "scope": f"repository:{repository}:pull"
+class DockerImage(object):
+    """
+    Represents a specific Docker image in IPTK. While this can be constructed
+    from a human-readable image reference, the reference will be resolved to a
+    digest value on instance creation.
+    """
+    def __init__(self, reference):
+        super(DockerImage, self).__init__()
+        self.resolve(reference)
+
+    @classmethod
+    def from_dict(cls, specification):
+        registry = specification["registry"]
+        repository = specification["repository"]
+        digest = specification["digest"]
+        reference = f"{registry}/{repository}@{digest}"
+        return cls(reference)
+
+    @property
+    def spec(self):
+        spec = {
+            "registry": self.registry,
+            "repository": self.repository,
+            "digest": self.digest
         }
-        token = requests.get(token_url, params=token_params, json=True).json()["token"]
-        headers["Authorization"] = f"Bearer {token}"
+        return spec
+
+
+    def get_digest(self, registry, repository, tag):
+        manifest_url = f"https://{registry}/v2/{repository}/manifests/{tag}"
+        headers = {"Accept": "application/vnd.docker.distribution.manifest.v2+json"}
         r = requests.head(manifest_url, headers=headers)
-    return r.headers['Docker-Content-Digest']
+        if r.status_code == 401:
+            token_url = f"https://auth.docker.io/token"
+            token_params = {
+                "service": "registry.docker.io",
+                "scope": f"repository:{repository}:pull"
+            }
+            token = requests.get(token_url, params=token_params, json=True).json()["token"]
+            headers["Authorization"] = f"Bearer {token}"
+            r = requests.head(manifest_url, headers=headers)
+        return r.headers['Docker-Content-Digest']
 
-def get_parts(reference):
-    # Docker default values
-    registry = "registry-1.docker.io"
-    repository = reference
-    tag = "latest"
-    digest = None
-    # Parse domain part, if any
-    if "/" in reference:
-        domain, remainder = reference.split("/", 1)
-        if domain == "localhost" or "." in domain or ":" in domain:
-            registry = domain
-            repository = remainder
-    # Separate image reference and digest
-    if "@" in repository:
-        repository, digest = repository.split("@", 1)
-    # See if image contains a tag
-    if ":" in repository:
-        repository, tag = repository.split(":", 1)
-    # Handle "familiar" Docker references
-    if registry == "registry-1.docker.io" and "/" not in repository:
-        repository = "library/" + repository
-    if not digest:
-        digest = get_digest(registry, repository, tag)
-    return (registry, repository, tag, digest)
-
-def get_reference(registry, repository, digest):
-    return f"{registry}/{repository}@{digest}"
+    def resolve(self, reference):
+        # Docker default values
+        registry = "registry-1.docker.io"
+        repository = reference
+        tag = "latest"
+        digest = None
+        # Parse domain part, if any
+        if "/" in repository:
+            domain, remainder = repository.split("/", 1)
+            if domain == "localhost" or "." in domain or ":" in domain:
+                registry = domain
+                repository = remainder
+        # Separate image reference and digest
+        if "@" in repository:
+            repository, digest = repository.split("@", 1)
+        # See if image contains a tag
+        if ":" in repository:
+            repository, tag = repository.split(":", 1)
+        # Handle "familiar" Docker references
+        if registry == "registry-1.docker.io" and "/" not in repository:
+            repository = "library/" + repository
+        if not digest:
+            digest = self.get_digest(registry, repository, tag)
+        self.registry = registry
+        self.repository = repository
+        self.tag = tag
+        self.digest = digest
+        self.reference = f"{registry}/{repository}@{digest}"
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} {self.reference}>"
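
Because resolve() runs on instance creation, constructing a DockerImage from a "familiar" name immediately normalizes it and fetches the digest from the registry. An illustrative sketch; it requires network access to Docker Hub, and the digest is elided because it depends on the image's current contents:

from iptk.docker_utils import DockerImage

image = DockerImage("python:3.6")
print(image.registry)     # registry-1.docker.io (the Docker default)
print(image.repository)   # library/python (familiar name expanded)
print(image.tag)          # 3.6
print(image.reference)    # registry-1.docker.io/library/python@sha256:...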

iptk/job.py (+68)

@@ -0,0 +1,68 @@
+from datetime import datetime
+from .json_utils import json_hash
+from .metadata_spec import MetadataSpec
+from .mount import Mount
+from .docker_utils import DockerImage
+
+class Job(object):
+    """
+    IPTK jobs take a variable number of input datasets and a Docker image
+    specification. The IPTK runner will create a container from the given image
+    and mount the input datasets as specified. An output dataset will
+    automatically be created and its data/ directory will be mounted in the
+    container at /output. The output dataset will automatically be locked after
+    the container has finished executing.
+    """
+    def __init__(self, image, mounts, command=None):
+        super(Job, self).__init__()
+        self.image = image
+        self.mounts = mounts
+        self.command = command
+
+    @classmethod
+    def from_dict(cls, specification):
+        image = DockerImage.from_dict(specification["image"])
+        mounts = []
+        for m in specification["mounts"]:
+            mounts.append(Mount.from_dict(m))
+        command = specification.get("command", None)
+        return cls(image, mounts, command)
+
+    @property
+    def minimal_spec(self):
+        spec = {
+            "command": self.command,
+            "image": self.image.spec,
+            "mounts": [m.spec for m in self.mounts]
+        }
+        return spec
+
+    @property
+    def spec(self):
+        spec = self.minimal_spec
+        return spec
+
+    @property
+    def identifier(self):
+        """
+        A job's identifier is also the identifier of the resulting dataset if
+        the job is executed by the IPTK runner.
+        """
+        return json_hash(self.minimal_spec)
+
+    def save(self, store):
+        """
+        Save this job to a dataset store. This creates a new dataset with this
+        job's specification stored in its metadata. An IPTK runner can then be
+        used to run the job.
+        """
+        meta_spec = MetadataSpec("University of Münster", "IPTK Job", 3)
+        dataset = store.get_dataset(self.identifier, create_ok=True)
+        current_job_spec = store.get_metadata(dataset, meta_spec)
+        if current_job_spec:
+            current_job = Job.from_dict(current_job_spec)
+            if self.minimal_spec != current_job.minimal_spec:
+                raise Exception("Different job exists with equal identifier")
+        store.set_metadata(dataset, meta_spec, self.spec)
+
+
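
Jobs are content-addressed: the identifier is the hash of the minimal spec, so identical jobs always map to the same output dataset, and save() refuses to proceed if that dataset already carries a different job spec. A hypothetical end-to-end sketch; the Mount class is not part of this diff, so the job below runs without input mounts, and the store root and command format are assumptions:

from iptk.dataset_store import DatasetStore
from iptk.docker_utils import DockerImage
from iptk.job import Job

store = DatasetStore("/data/iptk")    # hypothetical store root
image = DockerImage("python:3.6")     # resolved to a digest on creation
job = Job(image, [], command="python -c 'print(42)'")
print(job.identifier)  # json_hash of the minimal spec; also the output dataset id
job.save(store)        # writes the spec into the dataset's metadata for a runner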
