From ac8a4bb6a0408b95b0cec9f702b857df3ddc45f4 Mon Sep 17 00:00:00 2001 From: Jan-Thorsten Peter Date: Fri, 3 Mar 2023 13:32:08 +0100 Subject: [PATCH] Update example/recipe Co-authored-by: michelwi --- example/config/workflow.py | 6 +++--- example/recipe/splitter/__init__.py | 8 +++++--- example/recipe/tools.py | 21 ++++++++++----------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/example/config/workflow.py b/example/config/workflow.py index d351a5b..a3947f4 100644 --- a/example/config/workflow.py +++ b/example/config/workflow.py @@ -16,17 +16,17 @@ async def main(): # Count lines, lines is a Variable meaning once it's computed its value can be accessed using .get() # or by converting it into a string - lines = tools.WordCount(sentences).lines + num_lines = tools.WordCount(sentences).lines # You can run computations on these variables even though they are not computed yet. # The requested computation is stored in a wrapper object and only resolved when .get() is called # or the object is converted into a string. An exception is raised if you call get() on an unfinished Variable # which doesn't have a backup value. - middle = lines // 2 + middle = num_lines // 2 first_half = tools.Head(sentences, middle).out - tk.register_output('first_half', first_half) # Tell Sisyphus that this output is needed and should be linked inside the output directory + tk.register_output('first_half', first_half) tk.register_output('sentences', sentences) # Split each paragraph into a new line and again register output diff --git a/example/recipe/splitter/__init__.py b/example/recipe/splitter/__init__.py index 1f1678b..505c388 100644 --- a/example/recipe/splitter/__init__.py +++ b/example/recipe/splitter/__init__.py @@ -1,12 +1,14 @@ from sisyphus import * import os +from typing import Iterable, List + # All paths created by RelPath will be relative to the current directory # RelPath('splitter.py') points therefore at the splitter.py file inside this directory RelPath = setup_path(__package__) class ParagraphSplitter(Job): - def __init__(self, text, splitter=RelPath('splitter.py')): + def __init__(self, text: tk.Path, splitter: tk.Path = RelPath('splitter.py')): assert text assert isinstance(text, tk.Path) @@ -17,7 +19,7 @@ def __init__(self, text, splitter=RelPath('splitter.py')): # It's unclear how many outputs will be created by this job # A way around it is to compute all output paths after this job has finished - async def outputs(self): + async def outputs(self) -> List[tk.Path]: await tk.async_run(self.splitted_dir) out = [] for path in sorted(os.listdir(str(self.splitted_dir))): @@ -28,5 +30,5 @@ async def outputs(self): def run(self): self.sh('cat {text} | {splitter} {splitted_dir}/{out_prefix}') - def tasks(self): + def tasks(self) -> Iterable[Task]: yield Task('run', 'run') diff --git a/example/recipe/tools.py b/example/recipe/tools.py index daccc49..c3ad298 100644 --- a/example/recipe/tools.py +++ b/example/recipe/tools.py @@ -1,19 +1,18 @@ from sisyphus import * import os -from typing import List +from typing import List, Any, Dict, Iterable, Union class Pipeline(Job): """ This job takes a text file as input and pipes it through all given commands """ # Everything given to the constructor will be used to compute a hash for this job. - def __init__(self, text: tk.Path, pipeline: List): + def __init__(self, text: tk.Path, pipeline: List[Any]): # You can validating the inputs to spot errors earlier assert text, "No text given" assert isinstance(text, tk.Path), "Given input" assert pipeline - # self.text = text self.pipeline = pipeline @@ -21,7 +20,7 @@ def __init__(self, text: tk.Path, pipeline: List): # Task should return a list, a generator, or something else Sisyphus can iterator over containing all # tasks of this job. In this example the job has only one task calling the `run` function - def tasks(self): + def tasks(self) -> Iterable[Task]: # Getting the size of the given input file to estimate how much time we need. # tasks() is only called when all inputs are available, we can therefore assume all input files exist. size = os.path.getsize(self.text.get_path()) @@ -42,7 +41,7 @@ def tasks(self): rqmt={'time': time, 'mem': 2, 'cpu': 2}, # Requirements needed to run this task tries=3)] # 3 tries if pipe result is empty or pipe failed - # This function will be call when the job is started + # This function will be called when the job is started def run(self): self.pipe = ' | '.join([str(i) for i in self.pipeline]) # self.sh will run the given string in a shell. Before executing it the string format function will be called @@ -54,7 +53,7 @@ def run(self): # Jobs are regular python classes, meaning you can just subclass an existing class to reuse it's code class Head(Pipeline): - def __init__(self, text, length, check_output_length=True): + def __init__(self, text: tk.Path, length: Union[int, str, tk.Delayed], check_output_length: bool = True): # tk.Delayed takes any object and converts it into a Delayed object which allows us to define operations # which will only be computed at runtime. Here we want to delay formatting since the value of length # isn't known before length is computed. @@ -63,15 +62,15 @@ def __init__(self, text, length, check_output_length=True): self.check_output_length = check_output_length def run(self): - super.run() + super().run() if self.check_output_length: output_length = int(self.sh('cat {out} | wc -l', capture_output=True)) - assert self.length.get() == output_length, "Created output file is to short" + assert self.length.get() == output_length, "Created output file length does not match" # This is how the computed hash can be modified, since `check_output_length` does not change the output # of this job we can exclude it from the hash computation @classmethod - def hash(cls, parsed_args): + def hash(cls, parsed_args: Dict[str, Any]) -> str: args = parsed_args.copy() del args['check_output_length'] return super().hash(args) @@ -79,7 +78,7 @@ def hash(cls, parsed_args): # Here is a Job with multiple Variables as output class WordCount(Job): - def __init__(self, text): + def __init__(self, text: tk.Path): self.text = text self.character = self.output_var('char') self.lines = self.output_var('lines') @@ -93,5 +92,5 @@ def run(self): self.character.set(int(c)) # Here is an example of task returning a generator - def tasks(self): + def tasks(self) -> Iterable[Task]: yield Task('run')