Wait for another task
Introduction
In WorkGraph, tasks are generally launched when their input tasks are completed. However, there may be scenarios where you need to delay the execution of a task until another unrelated task is complete. This can occur, for example, when the task’s input depends on data dynamically populated in the context, and you must ensure this data is ready before proceeding.
This tutorial will guide you through using the task.waiting_on attribute in Task to manage such dependencies effectively.
Note: for cross-WorkGraph dependencies, i.e. when you want to wait for a task from another WorkGraph, you can use the monitor task. Please refer to the Monitor Task tutorial.
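The launch rule described above can be pictured with a small plain-Python sketch. This is a toy model only, not the aiida-workgraph engine: the `ToyTask` class and the `execution_order` function are hypothetical names introduced for illustration, and the sketch assumes a task becomes ready once every task in its inputs and in its waiting list has finished.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not the aiida-workgraph Task class."""

    name: str
    inputs: set = field(default_factory=set)      # tasks linked through data inputs
    waiting_on: set = field(default_factory=set)  # extra tasks to wait for


def execution_order(tasks):
    """Return task names in an order that respects inputs and waiting_on."""
    finished, order = set(), []
    pending = {t.name: t for t in tasks}
    while pending:
        # A task is ready when all of its dependencies have finished.
        ready = sorted(
            name
            for name, t in pending.items()
            if (t.inputs | t.waiting_on) <= finished
        )
        if not ready:
            raise RuntimeError("circular or unsatisfiable dependency")
        for name in ready:
            order.append(name)
            finished.add(name)
            del pending[name]
    return order


# task3 has no data inputs, but it still has to wait for task1 and task2.
tasks = [
    ToyTask("task3", waiting_on={"task1", "task2"}),
    ToyTask("task1"),
    ToyTask("task2"),
]
order = execution_order(tasks)
print(order)
```

In this toy model `task3` is listed first but runs last, because its waiting list is treated exactly like a set of input dependencies.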
Adding Tasks to Waiting List
To ensure task3 does not start before task1 and task2 are completed, you can add these tasks to its waiting_on list.
Here’s how to add tasks to the waiting_on list:
# You can use the name of the task, or the task object itself
# assuming task1 and task2 are already defined
task3.waiting_on.add(["task1", task2])
Removing Tasks from Waiting List
You can remove tasks from the waiting_on list as follows:
task3.waiting_on.remove(["task1", "task2"])
Clearing the Waiting List
If you need to remove all waiting dependencies from task3, you can clear the entire waiting_on list:
task3.waiting_on.clear()
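The add, remove, and clear operations can be mimicked with a small stand-alone class, which may help when reasoning about what ends up in the list. This is a sketch under assumptions: `ToyWaitingList` is a hypothetical name, it stores task names only, and it silently ignores names that are not present on removal. The real waiting_on object may behave differently in those details.

```python
from types import SimpleNamespace


class ToyWaitingList:
    """Hypothetical model of a waiting list; it stores task names only."""

    def __init__(self):
        self._names = set()

    @staticmethod
    def _to_name(item):
        # Accept either a plain name or any object with a `name` attribute.
        return item if isinstance(item, str) else item.name

    def add(self, items):
        self._names.update(self._to_name(i) for i in items)

    def remove(self, items):
        # Assumption: names that are not in the list are ignored.
        self._names.difference_update(self._to_name(i) for i in items)

    def clear(self):
        self._names.clear()

    def names(self):
        return sorted(self._names)


task2 = SimpleNamespace(name="task2")  # stand-in for a task object

waiting = ToyWaitingList()
waiting.add(["task1", task2])          # mix of a name and an object
after_add = waiting.names()
waiting.remove(["task1"])
after_remove = waiting.names()
waiting.clear()
after_clear = waiting.names()
print(after_add, after_remove, after_clear)
```

Normalizing every entry to a name keeps the list free of duplicates when the same task is added once by name and once as an object.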
Example
Here we create two add tasks and one sum task. The sum task will wait for the two add tasks to finish.
[1]:
from aiida_workgraph import task
from aiida.orm import Float
from aiida import load_profile
load_profile()
# define add task
@task.calcfunction()
def add(x, y):
    return x + y


# define sum task
@task.calcfunction()
def sum(**datas):
    total = 0
    for data in datas.values():
        total += data.value
    return Float(total)
[2]:
from aiida_workgraph import WorkGraph
wg = WorkGraph("test_wait")
add1 = wg.add_task(add, name="add1", x=1, y=1)
wg.update_ctx({"data.add1": add1.outputs.result})
add2 = wg.add_task(add, name="add2", x=2, y=2)
wg.update_ctx({"data.add2": add2.outputs.result})
# make the sum task wait for add1 and add2, so that `data` in the context is ready
sum3 = wg.add_task(sum, name="sum1", datas=wg.ctx.data)
sum3.waiting_on.add(["add1", "add2"])
wg.submit(wait=True)
WorkGraph process created, PK: 107107
[2]:
<WorkChainNode: uuid: 0818adef-71c3-4e8c-858c-339ea8b6f4a2 (pk: 107107) (aiida.workflows:workgraph.engine)>
[3]:
print("State of WorkGraph : {}".format(wg.state))
print('Result of sum1: {}'.format(wg.tasks.sum1.outputs.result.value))
State of WorkGraph : FINISHED
Result of sum1: 6.0
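The value 6.0 can be cross-checked with plain Python, which also illustrates why the waiting list matters here. The sketch below uses an ordinary dictionary as a stand-in for the context and the hypothetical helpers `plain_add` and `plain_sum`. It does not reproduce what WorkGraph itself does when the context is not yet populated; it only shows that a sum taken too early would work on incomplete data.

```python
def plain_add(x, y):
    return x + y


def plain_sum(**datas):
    # Same logic as the sum task, but on plain numbers.
    total = 0
    for value in datas.values():
        total += value
    return float(total)


ctx = {}  # stand-in for the `data` namespace in the context

# Summing before the add results are stored sees an empty context.
premature = plain_sum(**ctx)

# Once both add results are stored, the sum matches the tutorial output.
ctx["add1"] = plain_add(1, 1)
ctx["add2"] = plain_add(2, 2)
complete = plain_sum(**ctx)

print(premature, complete)
```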
Generate node graph from the AiiDA process:
[5]:
from aiida_workgraph.utils import generate_node_graph
generate_node_graph(wg.pk)