WorkGraph

This WorkGraph object is a collection of tasks and links.

Create and launch workgraph

First, create an empty workgraph:

from aiida_workgraph import WorkGraph, task
from aiida import load_profile

load_profile()

wg = WorkGraph(name="my_first_workgraph")

Define and use tasks

# Define a task:
@task()
def add(x, y):
    return x + y


# Add tasks to the workgraph
add1 = wg.add_task(add, name="add1")
add2 = wg.add_task(add, name="add2")

Add a link between tasks:

wg.add_link(add1.outputs.result, add2.inputs.x)
wg.to_html()


Submit the workgraph:

wg.submit(inputs={"add1": {"x": 1, "y": 2}, "add2": {"y": 3}}, wait=True)
WorkGraph process created, PK: 156
Process 156 finished with state: FINISHED

<WorkGraphNode: uuid: ec0148fc-6b4a-4084-a2a8-8272b6ac61c5 (pk: 156) (aiida.workflows:workgraph.engine)>

Load workgraph from the AiiDA process

WorkGraph saves its data as an extra attribute in its process, allowing reconstruction of the WorkGraph from the process.

from aiida_workgraph import WorkGraph

wg_loaded = WorkGraph.load(wg.pk)

Execute order

The tasks will be executed under the following conditions:

  • No input task

  • All input tasks are finished.

Grouping Inputs and Outputs in a WorkGraph

Defining group-level inputs and outputs allows you to:

  • Reuse inputs across multiple tasks (e.g., when several tasks share the same parameter).

  • Present only the necessary inputs to users, simplify the external interface of a complex workflow.

  • Collect and optionally rename outputs from individual tasks as grouped outputs.

wg = WorkGraph("test_workgraph_outputs")

# Define group-level input
wg.inputs.x = 2

# Add tasks using the group-level input
wg.add_task(add, "add1", x=wg.inputs.x, y=3)
wg.add_task(add, "add2", x=wg.inputs.x, y=wg.tasks.add1.outputs.result)

# Define group-level outputs to expose selected task results
wg.outputs.sum1 = wg.tasks.add1.outputs.result
wg.outputs.sum2 = wg.tasks.add2.outputs.result

# Run the workgraph
wg.submit(wait=True)

# Verify the final output
assert wg.outputs.sum2.value == 2 + (2 + 3)
WorkGraph process created, PK: 166
Process 166 finished with state: FINISHED

List of all Methods

class aiida_workgraph.workgraph.WorkGraph(name: str = 'WorkGraph', **kwargs)[source]

Build flexible workflows with AiiDA.

The class extends from NodeGraph and provides methods to run, submit tasks, wait for tasks to finish, and update the process status. It is used to handle various states of a workgraph process and provides convenient operations to interact with it.

process

The process node that represents the process status and other details.

Type:

aiida.orm.ProcessNode

state

The current state of the workgraph process.

Type:

str

pk

The primary key of the process node.

Type:

int

add_error_handler(handler, name, tasks: dict = None) None[source]

Attach an error handler to the workgraph.

Add a link between two tasks.

add_task(identifier: str | callable, name: str = None, **kwargs) Task[source]

Add a task to the workgraph.

build_connectivity() None[source]

Analyze the connectivity of workgraph and save it into dict.

continue_process()[source]

Continue a saved process by sending the task to RabbitMA. Use with caution, this may launch duplicate processes.

property error_handlers: Dict[str, Any]

Get the error handlers.

extend(wg: WorkGraph, prefix: str = '') None[source]

Append a workgraph to the current workgraph. prefix is used to add a prefix to the task names.

classmethod from_dict(wgdata: Dict[str, Any]) WorkGraph[source]

Rebuilds a node graph from a dictionary.

Parameters:

ngdata (Dict[str, Any]) – The data of the node graph.

Returns:

The rebuilt node graph.

Return type:

NodeGraph

classmethod from_yaml(filename: str = None, string: str = None) WorkGraph[source]

Build WrokGraph from yaml file.

get_error_handlers() Dict[str, Any][source]

Get the error handlers.

kill_tasks(tasks: List[str]) None[source]

Kill the given tasks

classmethod load(pk: int | ProcessNode) WorkGraph | None[source]

Load WorkGraph from the process node with the given primary key.

Parameters:

pk (int) – The primary key of the process node.

property meta_tasks: dict

Meta tasks are the context and group inputs/outputs

pause_tasks(tasks: List[str]) None[source]

Pause the given tasks.

play_tasks(tasks: List[str]) None[source]

Play the given tasks

reset() None[source]

Reset the workgraph to create a new submission.

restart()[source]

Create a restart submission.

run(inputs: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None) Any[source]

Run the AiiDA workgraph process and update the process status. The method uses AiiDA’s engine to run the process, when the process is finished, update the status of the tasks

save(metadata: Dict[str, Any] | None = None) None[source]

Save the udpated workgraph to the process This is only used for a running workgraph. Save the AiiDA workgraph process and update the process status.

save_to_base(wgdata: Dict[str, Any]) None[source]

Save new wgdata to attribute. It will first check the difference, and reset tasks if needed.

show() None[source]

Print the current state of the workgraph process.

submit(inputs: Dict[str, Any] | None = None, wait: bool = False, timeout: int = 600, interval: int = 5, metadata: Dict[str, Any] | None = None, scheduler: str | None = None) ProcessNode[source]

Submit the AiiDA workgraph process and optionally wait for it to finish. :param wait: Wait for the process to finish. :type wait: bool :param timeout: The maximum time in seconds to wait for the process to finish. Defaults to 600. :type timeout: int :param restart: Restart the process, and reset the modified tasks, then only re-run the modified tasks. :type restart: bool :param new: Submit a new process. :type new: bool

property tasks: TaskCollection

Add alias to nodes for WorkGraph

to_dict(should_serialize: bool = False) Dict[str, Any][source]

Convert the workgraph to a dictionary.

to_html(output: str = None, **kwargs)[source]

Write a standalone html file to visualize the workgraph.

to_widget_value() Dict[str, Any][source]

Convert the workgraph to a dictionary that can be used by the widget.

update() None[source]

Update the current state and primary key of the process node as well as the state, node and primary key of the tasks that are outgoing from the process node. This includes updating the state of process nodes linked to the current process, and data nodes linked to the current process.

wait(timeout: int = 600, tasks: dict = None, interval: int = 5) None[source]

Periodically checks and waits for the AiiDA workgraph process to finish until a given timeout.

Parameters:
  • timeout (int) – The maximum time in seconds to wait for the process to finish. Defaults to 600.

  • tasks (dict) – Optional; specifies task states to wait for in the format {task_name: [acceptable_states]}.

  • interval (int) – The time interval in seconds between checks. Defaults to 5.

Raises:

TimeoutError – If the process does not finish within the given timeout.

Total running time of the script: (0 minutes 11.076 seconds)

Gallery generated by Sphinx-Gallery