Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure proper workgraph resolution #126

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open

Ensure proper workgraph resolution #126

wants to merge 17 commits into from

Conversation

leclairm
Copy link
Contributor

@leclairm leclairm commented Mar 14, 2025

Make sure workgraph items are created before using them. This PR fixes the fact that, currently, input nodes might not be already created before linking them. At the moment, this depends on the order they've been specified in the config file which should not be required.

Copy link
Collaborator

@agoscinski agoscinski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure about it? We first add all the available on init inputs here https://github.com/C2SM/Sirocco/blob/060e5f984640daf0e1570111db5d03290fb9aaec/src/sirocco/workgraph.py#L83C14-L83C33
then in

self._add_tasks()

we create output nodes
self._link_output_nodes_to_task(task, output)

before linking any of the inputs
self._link_input_nodes_to_task(task, input_)

It is a bit hard to read the actual change of the order with all the refactoring of the functions. Can you add a unittest that would help me and is anyway necessary? One should needs to add a new fixture to create a core.workflow with data dependency

# conftest.py the one fixture we use
@pytest.fixture(scope="session")
def minimal_config() -> models.ConfigWorkflow:
    return models.ConfigWorkflow(
        name="minimal",
        rootdir=pathlib.Path("minimal"),
        cycles=[models.ConfigCycle(name="minimal", tasks=[models.ConfigCycleTask(name="some_task")])], # I think here  you need to create a new task and link the inputs and outputs, we have no test yet
        tasks=[models.ConfigShellTask(name="some_task")],
        data=models.ConfigData(
            available=[models.ConfigAvailableData(name="foo", type=models.DataType.FILE, src="foo.txt")],
            generated=[models.ConfigGeneratedData(name="bar", type=models.DataType.DIR, src="bar")],
        ),
        parameters={},
    )

raise NotImplementedError(exc)
else:
exc = f"Task: {task.name} not implemented yet."
raise NotImplementedError(exc)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for me for future PR: I think we want to get Task.plugin_classes and raise a NotImplementedError if its in it, and ValueError if it is not in it.

for data in self._core_workflow.data:
if isinstance(data, core.AvailableData):
self._add_aiida_input_data_node(data)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this back to reduce commit noise (I mean changes that are not relevant to the PR)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have grouped all helper methods to access nodes together on purpose, I don't think it would make sense to move this function back in the middle of the helper ones.

Copy link
Collaborator

@agoscinski agoscinski Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... I am not really feeling this is moving us to a clearer API but I also don't think it is important to argue about now. Also I think it is due to me not defining a clear API in this class in the first place. Fine for me.

Copy link
Collaborator

@agoscinski agoscinski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notifier

@leclairm
Copy link
Contributor Author

Are you sure about it?

Yes, see this comment on the other PR: #125 (comment)

I'm sorry, I think this PR hides a very simple fix - breaking the loop over tasks - in a lot of rewriting that just made the overall code feel more readable to me.

@leclairm leclairm force-pushed the ref_workgraph branch 2 times, most recently from 57c37aa to 592abd6 Compare March 17, 2025 20:44
@leclairm leclairm requested a review from agoscinski March 18, 2025 08:47
@leclairm
Copy link
Contributor Author

It is a bit hard to read the actual change of the order with all the refactoring of the functions. Can you add a unittest that would help me and is anyway necessary? One should needs to add a new fixture to create a core.workflow with data dependency

I created a unit test with reversed (non logical) order in how tasks are specified.

@leclairm leclairm force-pushed the ref_workgraph branch 3 times, most recently from 6304bfb to 6630fe7 Compare March 19, 2025 11:24
Copy link
Collaborator

@GeigerJ2 GeigerJ2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also had some difficulty disentangling the various changes, but nice catch, @leclairm! I see you are still adding commits here, so will only leave my comments for now.

Comment on lines 299 to 298
if (workgraph_task_arguments := workgraph_task.inputs.arguments) is None:
msg = (
f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph "
f"before linking. This is a bug in the code, please contact developers."
)
raise ValueError(msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (workgraph_task_arguments := workgraph_task.inputs.arguments) is None:
msg = (
f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph "
f"before linking. This is a bug in the code, please contact developers."
)
raise ValueError(msg)
if (workgraph_task_arguments_socket := workgraph_task.inputs.arguments) is None:
msg = (
f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph "
f"before linking. This is a bug in the code, please contact developers."
)
raise ValueError(msg)

For readability?

Copy link
Contributor Author

@leclairm leclairm Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it misleading? I thought sockets in AiiDA were stg else. Here we update the arguments string of the shell job. Is that also considered a socket?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sockets are only introduced by aiida-workgraph, while aiida-core uses Ports. Ports can be considered as placeholders for inputs/outputs of processes that are populated at runtime, while Sockets abstracts that one step further. Sockets allow defining connections between processes even before runtime, that is, when the WorkGraph is created. The Socket in WorkGraph is thus used to define links (of the graph), and serves as a placeholder for its value, which will be the actual data created at runtime. They are also used to visualize the workflow in WorkGraph's GUI.

I appended the _socket because I entered this part of the code via the debugger, and type(workgraph_task_arguments) returned SocketAny, so to make it clearer when reading the code, which kind of entity we are dealing with.

Copy link
Collaborator

@agoscinski agoscinski Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument string is converted after initialization of the task to a Socket. Therefore it is also accessed like workgraph_task.inputs.arguments and not workgraph_task.arguments. As Julian said it is a placeholder to populate when the workgraph is run as not all inputs are available during init. The value can be accessed with socket.value.

If it is just about making the line clearer by documenting the type you can also do workgraph_task_arguments: SocketAny one line before to keep the name shorter.

Comment on lines +306 to +302
input_labels = {port: list(map(self.label_placeholder, task.inputs[port])) for port in task.inputs}
_, arguments = self.split_cmd_arg(task.resolve_ports(input_labels))
workgraph_task_arguments.value = arguments
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
input_labels = {port: list(map(self.label_placeholder, task.inputs[port])) for port in task.inputs}
_, arguments = self.split_cmd_arg(task.resolve_ports(input_labels))
workgraph_task_arguments.value = arguments
input_labels = {port: list(map(self.label_placeholder, task.inputs[port])) for port in task.inputs}
_, arguments = self.split_cmd_arg(task.resolve_ports(input_labels))
workgraph_task_arguments_socket.value = arguments

Comment on lines +282 to +285
def _link_wait_on_to_tasks(self):
for task in self._core_workflow.tasks:
self.task_from_core(task).wait = [self.task_from_core(wt) for wt in task.wait_on]
Copy link
Collaborator

@GeigerJ2 GeigerJ2 Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait-on links are not showing in the WG GUI:
image
Not sure if this is a bug in Sirocco or AiiDA WG, @agoscinski.

Further, WG tasks don't have a .wait attribute? Is this intentional that this is dynamically set here? Or should it be waiting_on? Internally, in tdata, WG uses the wait key, but the DecoratedNode we are accessing through self.task_from_core(task) only has the .waiting_on attribute, as well as the _wait input and output Socket.

Tried to fix it starting from #96, but to no avail so far.

Copy link
Collaborator

@GeigerJ2 GeigerJ2 Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, nvm. With this, from #96, waiting_on actually works:

    def _link_wait_on_to_task(self, task: core.Task):
        label = AiidaWorkGraph.get_aiida_label_from_graph_item(task)
        workgraph_task = self._aiida_task_nodes[label]
        workgraph_task.waiting_on.clear()
        for wait_on in task.wait_on:
            wait_on_task_label = AiidaWorkGraph.get_aiida_label_from_graph_item(wait_on)
            workgraph_task.waiting_on.add(self._aiida_task_nodes[wait_on_task_label])

    def _link_wait_on_to_tasks(self):
        for task in self._core_workflow.tasks:
            self._link_wait_on_to_task(task=task)

We can merge this PR here, then rebase and adapt #96, and merge that, as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that as well because pyright complains about .wait but I'm not familiar enough with aiida-workgraph so I thought it was on purpose. Now if you see it doesn't show up in the GUI, it might be an actual pb.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge this PR here, then rebase and adapt #96, and merge that, as well?

Is #96 still useful if we fix it here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me, either is fine. We can fix it here and close #96. History would then be not as clean, as we mix two changes here, but I think at this early stage of the code base, that's still fine. Otherwise, the approach I mentioned is also fine.

Base automatically changed from shell_cli to main March 19, 2025 14:11
Copy link
Collaborator

@GeigerJ2 GeigerJ2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion with @agoscinski, we agreed to leave the waiting_on fix out of this PR, and instead make another PR to fix it after, to keep the history cleaner. From my side, this can be merged now.

GeigerJ2

This comment was marked as duplicate.

Copy link
Collaborator

@agoscinski agoscinski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we disentangle the fix plus tests from the refactor to another PR? Or move the refactor to another PR?

"ShellJob",
name=label,
command=command,
arguments="__ERROR_NOT_SET__",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you can have commands without arguments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but not at that stage. This placeholder is systematically replaced _set_shelljob_arguments()

Copy link
Collaborator

@agoscinski agoscinski Mar 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if it is just for internal validation that a function is invoked and doing its job, it is better to create a test that checks if the argument is empty after construction rather than creating this placeholder. When there is a bug this placeholder does not really help the user to debug it while a test actually validates the behavior. This seems something temporary you add for debugging.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be okay to mark this line with an issue that we should replace this placeholder with a test for the arguments being not empty after initialization to not block the PR (since it it might be not a super trivial test that needs just 5 minutes to implement)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No pb, I'll remove it. We can even discuss if the test is worthwhile in our next meeting.

# linking inputs must come after outputs to make sure they were created,
# either as data or as socket, before linking
self._link_input_nodes_to_tasks()
self._set_shelljob_arguments()
Copy link
Collaborator

@agoscinski agoscinski Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arguments are part of the the shelljob inputs that is why it was part of the input nodes _link_inputs_to_ports. Also I think it is better to have task specific function after the isinstance check on the task type and not in this higher level

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arguments are part of the the shelljob inputs

Ok, I had the Sirocco perspective more than the workgraph one. Building the command line sounded a very different action to me than linking the input data nodes. Also, the later has to be done for any task, while the former is only happening for shelljob.

Also I think it is better to have task specific function after the isinstance check

I agree. I wanted to avoid too much refactoring but I could do that and but all the loops over self._core_workflow.tasks and self._core_workflow.data at the beginning of __init__. That would also make the algorithm clearer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more prcisely, I was thinking about such a sequence in __init__:

        # create input data nodes
        for data in self._core_workflow.data:
            if isinstance(data, core.AvailableData):
                self._add_aiida_input_data_node(data)
                
        # create workgraph task nodes and output sockets
        for task in self._core_workflow.tasks:
            # Create the workgraph task itself
            if isinstance(task, core.ShellTask):
                self._create_shell_task_node(task)
            elif isinstance(task, core.IconTask):
                exc = "IconTask not implemented yet."
                raise NotImplementedError(exc)
            else:
                exc = f"Task: {task.name} not implemented yet."
                raise NotImplementedError(exc)
            # Create and link corresponding output sockets
            for output in task.outputs:
                self._link_output_node_to_task(task, output)

        # link input nodes to workgraph tasks
        for task in self._core_workflow.tasks:
            for input_ in task.input_data_nodes():
                self._link_input_node_to_task(task, input_)

        # set shelljob arguments
        for task in self._core_workflow.tasks:
            if isinstance(task, core.ShellTask):
                self._set_shelljob_arguments(task)

        # link wait on to workgraph tasks
        for task in self._core_workflow.tasks:
            self._link_wait_on_to_task(task)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with that. Just a note _link_input_node_to_task would need to also be separately implemented for icon because shelljob uses a dynamic namespace called nodes to contain all data node (see e.g.

if not hasattr(workgraph_task.inputs.nodes, f"{input_label}"):
). While in icon ever input is "static" or you could say "hardcoded" (by being defined in aiida-icon) so we directly can set the value in the corresponding socket (task.inputs.icon_namlist instead of task.inputs.nodes.icon_namlist)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right Thanks for the heads up. Then I'll rewrite this way.

for data in self._core_workflow.data:
if isinstance(data, core.AvailableData):
self._add_aiida_input_data_node(data)

Copy link
Collaborator

@agoscinski agoscinski Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... I am not really feeling this is moving us to a clearer API but I also don't think it is important to argue about now. Also I think it is due to me not defining a clear API in this class in the first place. Fine for me.

Comment on lines 299 to 298
if (workgraph_task_arguments := workgraph_task.inputs.arguments) is None:
msg = (
f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph "
f"before linking. This is a bug in the code, please contact developers."
)
raise ValueError(msg)
Copy link
Collaborator

@agoscinski agoscinski Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument string is converted after initialization of the task to a Socket. Therefore it is also accessed like workgraph_task.inputs.arguments and not workgraph_task.arguments. As Julian said it is a placeholder to populate when the workgraph is run as not all inputs are available during init. The value can be accessed with socket.value.

If it is just about making the line clearer by documenting the type you can also do workgraph_task_arguments: SocketAny one line before to keep the name shorter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants