Note
Go to the end to download the full example code.
Graph Builder for nested and dynamic workflows
Introduction
In this example we learn how to create nested workflows by creating a task out of a WorkGraph. Further, we will learn how to do the same with the Graph Builder, a decorator that allows us to move the creation of the WorkGraph to runtime, so we can create dynamic workflows that change depending on the inputs. This is of particular interest for integrating for-loops and if-then-else logic into your workflow.
# Load the AiiDA profile.
from aiida import load_profile
load_profile()
Profile<uuid='90da34ae855c481f9f23e4e2526238f1' name='presto'>
Nested workflows with WorkGraph
We will discuss how to use WorkGraph’s nested workflows. Suppose we want to reuse the WorkGraph computing (x+y)*z to perform the operation
wg_out = (x+y)*z
out = (x+y)*wg_out
We can integrate a WorkGraph to another WorkGraph by creating a task out of it.
from aiida_workgraph import task, WorkGraph
from aiida.orm import Int
# define add task
@task()
def add(x, y):
return x + y
# define multiply task
@task()
def multiply(x, y):
return x * y
def add_multiply(x=None, y=None, z=None):
wg = WorkGraph()
wg.add_task(add, name="add", x=x, y=y)
wg.add_task(multiply, name="multiply", x=z)
wg.add_link(wg.tasks.add.outputs.result, wg.tasks.multiply.inputs.y)
return wg
wg = WorkGraph("nested_workgraph")
# Creating a task from the WorkGraph
add_multiply1 = wg.add_task(
add_multiply(x=Int(2), y=Int(3), z=Int(4)), name="add_multiply1"
)
add_multiply2 = wg.add_task(add_multiply(x=Int(2), y=Int(3)), name="add_multiply2")
# link the output of a task to the input of another task
wg.add_link(add_multiply1.outputs.multiply.result, add_multiply2.inputs.multiply.x)
wg.to_html()
The created WorkGraphTask behaves similarly as a normal WorkGraph would (and indeed actually has the associated WorkGraph attached as an attribute).That means we can access elements of the sub-WorkGraph, for instance, its tasks, inputs, etc., via:
print(wg.tasks.add_multiply1.tasks)
print(wg.tasks.add_multiply1.tasks.add.inputs.x)
# or
print(wg.tasks["add_multiply1"].tasks)
# and
print(wg.tasks.add_multiply1.inputs)
print(wg.tasks.add_multiply1.outputs)
NodeCollection(parent = "WorkGraph", nodes = ["add", "multiply"])
SocketAny(name='x', value=uuid: 08115b07-9389-4501-8e75-5fa484911f19 (unstored) value: 2)
NodeCollection(parent = "WorkGraph", nodes = ["add", "multiply"])
TaskSocketNamespace(name='outputs', sockets=['add', 'multiply', '_wait'])
TaskSocketNamespace(name='inputs', sockets=['add', 'multiply', '_wait', '_outputs'])
Finally, we run the workgraph
wg.submit(wait=True)
# (2+3)*4 = 20
# (2+3)*20 = 100
assert add_multiply2.outputs.multiply.result.value == 100
WorkGraph process created, PK: 379
Process 379 finished with state: FINISHED
And to generate the node graph from the AiiDA process
from aiida_workgraph.utils import generate_node_graph
generate_node_graph(wg.pk)
Graph builder
A much more powerful tool to create nested WorkGraphs is the Graph Builder. It is a decorator that we can add to a function that returns a WorkGraph similar as add_multiply to have more control what we want to expose to the user and to create dynamic workflows.
Expose outputs
We add task.graph_builder decorator to a function to define a graph builder function. The function constructs a WorkGraph based on the inputs, and returns it at the end. We can expose the outputs of the WorkGraph as the outputs of the task.
@task.graph_builder(outputs = [{"name": "multiply"}])
This will expose the multiply output of the workgraph as the multiply output of the task.
# We use task.graph_builder decorator and expose the output of the "multiply"
# task as the output of the graph builder function.
@task.graph_builder(outputs=[{"name": "multiply"}])
def add_multiply(x, y, z):
# Create a WorkGraph
wg = WorkGraph()
wg.add_task(add, name="add", x=x, y=y)
wg.add_task(multiply, name="multiply", x=z)
wg.add_link(wg.tasks.add.outputs[0], wg.tasks.multiply.inputs.y)
wg.outputs.multiply = wg.tasks.multiply.outputs.result
# Don't forget to return the `wg`
return wg
Create nested workflow
We can use the graph builder task inside another WorkGraph to create nested workflow similar as with a regular WorkGraph.
wg = WorkGraph("test_graph_build")
# create a task using the graph builder, note the difference as the inputs
# are specified as ports here
add_multiply1 = wg.add_task(add_multiply, x=Int(2), y=Int(3), z=Int(4))
add_multiply2 = wg.add_task(add_multiply, x=Int(2), y=Int(3))
# link the output of a task to the input of another task
wg.add_link(add_multiply1.outputs[0], add_multiply2.inputs.z)
wg.submit(wait=True)
assert add_multiply2.outputs[0].value == 100
wg.to_html()
WorkGraph process created, PK: 408
Process 408 finished with state: FINISHED
Generate node graph from the AiiDA process,and we can see that the multiply task was executed.
generate_node_graph(wg.pk)
Looking at the process list we can also that multiple WorkGraphs have been submitted. Please run this now in the terminal:
verdi process list -a
Use the graph builder directly
Of course, one can use the graph builder directly to create a WorkGraph. Here is an example:
wg = add_multiply(2, 3, 4)
wg.submit(wait=True)
WorkGraph process created, PK: 430
Process 430 finished with state: FINISHED
<WorkGraphNode: uuid: 991ee8f2-7944-4c3b-9989-b4be4a19ae2b (pk: 430) (aiida.workflows:workgraph.engine)>
More usage (like if and while) of graph builder will be shown in the following how-tos.
Dynamic workflows
The Graph Builder also allows us to create dynamic workflows that can change depending on the input.
Example for loop
In this example we will create a dynamic number of tasks as specified in the input of the WorkGraph.
@task()
def add_one(x):
return x + 1
@task.graph_builder(outputs=[{"name": "result"}])
def for_loop(nb_iterations: Int):
wg = WorkGraph()
for i in range(nb_iterations.value):
task = wg.add_task(add_one, x=i)
# We cannot refer to a specific task as output in the graph builder decorator
# as in the examples before since the name of the last task depends on the input.
# Remember that each task is always assigned unique name automatically.
# Therefore we use the context to not directly refer to the name but the last
# task object that was created. The context can then be referred in the outputs
# of the graph builder decorator.
# Put result of the task to the context under the name task_out
wg.update_ctx({"task_out": task.outputs.result})
wg.outputs.result = wg.ctx.task_out
# If want to know more about the usage of the context please refer to the
# context howto in the documentation
return wg
wg = WorkGraph("Nested workflow: For")
loop_task = wg.add_task(for_loop, nb_iterations=Int(2))
wg.to_html()
Running the workgraph.
wg.submit(wait=True)
print("Output of last task", loop_task.outputs.result.value) # 1 + 1 result
WorkGraph process created, PK: 440
Process 440 finished with state: FINISHED
Output of last task uuid: c3836949-e960-4983-92cc-c1c22ee010d7 (pk: 451) value: 2
Plotting provenance
generate_node_graph(wg.pk)
Example if-then-else
Suppose we want to run a different task depending on the input. We run the add_one task if the number is below 2 otherwise we run a modulo 2 task.
from aiida_workgraph import task, WorkGraph
from aiida.orm import Int
@task()
def modulo_two(x):
return x % 2
@task.graph_builder(outputs=[{"name": "result"}])
def if_then_else(i: Int):
wg = WorkGraph()
if i.value < 2:
task = wg.add_task(add_one, x=i)
else:
task = wg.add_task(modulo_two, x=i)
# same concept as before, please read the for loop example for explanation
wg.update_ctx({"task_out": task.outputs.result})
wg.outputs.result = wg.ctx.task_out
return wg
wg = WorkGraph("Nested workflow: If")
task1 = wg.add_task(if_then_else, i=Int(1))
task2 = wg.add_task(if_then_else, i=task1.outputs.result)
wg.to_html()
Running the workgraph.
wg.submit(wait=True)
print("Output of first task", task1.outputs.result.value) # 1 + 1 result
print("Output of second task", task2.outputs.result.value) # 2 % 2 result
WorkGraph process created, PK: 453
Process 453 finished with state: FINISHED
Output of first task uuid: 8f7ec440-11d6-4dfa-9bc4-f51b653f02aa (pk: 458) value: 2
Output of second task uuid: 5ffa0755-c9b8-46d7-a0b5-c1c3b87686c3 (pk: 463) value: 0
Plotting provenance
from aiida_workgraph.utils import generate_node_graph
generate_node_graph(wg.pk)
Total running time of the script: (0 minutes 34.122 seconds)