Task

Task is the basic building block of the WorkGraph. A task has inputs, outputs, and the executor. A task executor can be a Python function, AiiDA components (calcfunction, workfunction, calcjob, Workchain, and ProcessBuilder). A task can be created in three ways.

Decorator

Decorate any Python function using the task decorator.

from aiida_workgraph import task
from aiida import orm

# define add task
@task  # this is equivalent to passing no arguments @task()
def add(x, y):
    return x + y


# define AiiDA calcfunction task
@task.calcfunction  # this is equivalent to passing no arguments @task.calculation()
def multiply(x, y):
    return orm.Float(x + y)


# export the task to html file so that it can be visualized in a browser
add._TaskCls().to_html()

# visualize the task in jupyter-notebook
# add._TaskCls()


The input ports (also named sockets) are generated automatically based on the function arguments. The default name of the output port is result. There are also some built-in ports, like _wait and _outputs. One can create a task instance and inspect its inputs and outputs:

add1 = add._TaskCls()
print("Inputs:", add1.get_input_names())
print("Outputs:", add1.get_output_names())
Inputs: ['x', 'y', '_wait', 'metadata', 'function_data', 'process_label', 'function_inputs', 'deserializers', 'serializers']
Outputs: ['result', '_wait', '_outputs', 'exit_code']

If you want to change the name of the output ports, or if there are more than one output. You can define the outputs explicitly. For example:

# define the outputs explicitly
@task(outputs=["sum", "diff"])
def add_minus(x, y):
    return {"sum": x + y, "difference": x - y}


print("Inputs:", add_minus._TaskCls().get_input_names())
print("Outputs:", add_minus._TaskCls().get_output_names())
Inputs: ['x', 'y', '_wait', 'metadata', 'function_data', 'process_label', 'function_inputs', 'deserializers', 'serializers']
Outputs: ['sum', 'diff', '_wait', '_outputs', 'exit_code']

One can also add an identifier to indicates the data type. The data type tell the code how to display the port in the GUI, validate the data, and serialize data into database. We use workgraph.Any for any data type. For the moment, the data validation is experimentally supported, and the GUI display is not implemented. Thus, I suggest you to always workgraph.Any for the port.

# define the outputs with identifier
@task(
    outputs=[
        {"name": "sum", "identifier": "workgraph.Any"},
        {"name": "diff", "identifier": "workgraph.Any"},
    ]
)
def add_minus(x, y):
    return {"sum": x + y, "difference": x - y}

Then, one can use the task inside the WorkGraph:

from aiida_workgraph import WorkGraph

wg = WorkGraph()
add_minus1 = wg.add_task(add_minus, name="add_minus1")
multiply1 = wg.add_task(multiply, name="multiply1")
wg.add_link(add_minus1.outputs.sum, multiply1.inputs.x)
NodeLink(from="add_minus1.sum", to="multiply1.x")

Build from Callable

One can build a task from an already existing Python function.

from aiida_workgraph import WorkGraph, build_task

from scipy.linalg import norm

NormTask = build_task(norm)

wg = WorkGraph()
norm_task = wg.add_task(NormTask, name="norm1")
norm_task.to_html()


The inputs and outputs of the task are automatically generated. One can also define the outputs explicitly.

NormTask = build_task(norm, outputs=[{"name": "norm", "identifier": "workgraph.Any"}])
wg = WorkGraph()
norm_task = wg.add_task(NormTask, name="norm1")

print("Inputs: ", norm_task.inputs)
print("Outputs: ", norm_task.outputs)
Inputs:  TaskSocketNamespace(name='inputs', sockets=['a', 'ord', 'axis', 'keepdims', 'check_finite', '_wait', 'metadata', 'function_data', 'process_label', 'function_inputs', 'deserializers', 'serializers'])
Outputs:  TaskSocketNamespace(name='outputs', sockets=['norm', '_wait', '_outputs', 'exit_code'])

For specifying the outputs, the most explicit way is to provide a list of dictionaries, as shown above. In addition, as a shortcut, it is also possible to pass a list of strings. In that case, WorkGraph will internally convert the list of strings into a list of dictionaries in which case, each name key will be assigned each passed string value. Furthermore, also a mixed list of string and dict elements can be passed, which can be useful in cases where multiple outputs should be specified, but more detailed properties are only required for some of the outputs. The above also applies for the outputs argument of the @task decorator introduced earlier, as well as the inputs, given that they are explicitly specified rather than derived from the signature of the Callable. Finally, all lines below are valid specifiers for the outputs of the ``build_task`:

NormTask = build_task(norm, outputs=["norm"])
NormTask = build_task(norm, outputs=["norm", "norm2"])
NormTask = build_task(
    norm, outputs=["norm", {"name": "norm2", "identifier": "workgraph.Any"}]
)
NormTask = build_task(
    norm,
    outputs=[
        {"name": "norm", "identifier": "workgraph.Any"},
        {"name": "norm2", "identifier": "workgraph.Any"},
    ],
)

One can use these AiiDA component directly in the WorkGraph. The inputs and outputs of the task is automatically generated based on the input and output port of the AiiDA component. In case of calcfunction, the default output is result. If there are more than one output task, one need to define the outputs explictily.

from aiida.calculations.arithmetic.add import ArithmeticAddCalculation

wg = WorkGraph()
add1 = wg.add_task(ArithmeticAddCalculation, name="add1")

Define a Task

Create a task class by inheriting from Task base class.

from aiida_workgraph.task import Task


class MyAdd(Task):

    identifier: str = "MyAdd"
    name = "MyAdd"
    node_type = "calcfunction"
    catalog = "Test"

    _executor = {
        "module_path": "aiida_workgraph.executors.test",
        "callable_name": "add",
    }

    def create_sockets(self):
        self.inputs._clear()
        self.outputs._clear()
        inp = self.add_input("workgraph.Any", "x")
        inp = self.add_input("workgraph.Any", "y")
        self.add_output("workgraph.Any", "sum")

Then, one can use the task by using its identifier.

from aiida_workgraph import WorkGraph

wg = WorkGraph()
add1_task = wg.add_task(MyAdd, name="add1")

One can also register the task in task pool, and then use its identifer directly.

wg.add_task("MyAdd", name="add1")

Total running time of the script: (0 minutes 0.046 seconds)

Gallery generated by Sphinx-Gallery