Note
Go to the end to download the full example code.
WorkGraph
This WorkGraph
object is a collection of tasks and links.
Create and launch workgraph
First, create an empty workgraph:
from aiida_workgraph import WorkGraph, task
from aiida import load_profile
load_profile()
wg = WorkGraph(name="my_first_workgraph")
Define and use tasks
# Define a task:
@task()
def add(x, y):
return x + y
# Add tasks to the workgraph
add1 = wg.add_task(add, name="add1")
add2 = wg.add_task(add, name="add2")
Add a link between tasks:
wg.add_link(add1.outputs.result, add2.inputs.x)
wg.to_html()
Submit the workgraph:
wg.submit(inputs={"add1": {"x": 1, "y": 2}, "add2": {"y": 3}}, wait=True)
WorkGraph process created, PK: 156
Process 156 finished with state: FINISHED
<WorkGraphNode: uuid: ec0148fc-6b4a-4084-a2a8-8272b6ac61c5 (pk: 156) (aiida.workflows:workgraph.engine)>
Load workgraph from the AiiDA process
WorkGraph saves its data as an extra attribute in its process, allowing reconstruction of the WorkGraph from the process.
from aiida_workgraph import WorkGraph
wg_loaded = WorkGraph.load(wg.pk)
Execute order
The tasks will be executed under the following conditions:
No input task
All input tasks are finished.
Grouping Inputs and Outputs in a WorkGraph
Defining group-level inputs and outputs allows you to:
Reuse inputs across multiple tasks (e.g., when several tasks share the same parameter).
Present only the necessary inputs to users, simplify the external interface of a complex workflow.
Collect and optionally rename outputs from individual tasks as grouped outputs.
wg = WorkGraph("test_workgraph_outputs")
# Define group-level input
wg.inputs.x = 2
# Add tasks using the group-level input
wg.add_task(add, "add1", x=wg.inputs.x, y=3)
wg.add_task(add, "add2", x=wg.inputs.x, y=wg.tasks.add1.outputs.result)
# Define group-level outputs to expose selected task results
wg.outputs.sum1 = wg.tasks.add1.outputs.result
wg.outputs.sum2 = wg.tasks.add2.outputs.result
# Run the workgraph
wg.submit(wait=True)
# Verify the final output
assert wg.outputs.sum2.value == 2 + (2 + 3)
WorkGraph process created, PK: 166
Process 166 finished with state: FINISHED
List of all Methods
- class aiida_workgraph.workgraph.WorkGraph(name: str = 'WorkGraph', **kwargs)[source]
Build flexible workflows with AiiDA.
The class extends from NodeGraph and provides methods to run, submit tasks, wait for tasks to finish, and update the process status. It is used to handle various states of a workgraph process and provides convenient operations to interact with it.
- process
The process node that represents the process status and other details.
- Type:
aiida.orm.ProcessNode
- state
The current state of the workgraph process.
- Type:
str
- pk
The primary key of the process node.
- Type:
int
- add_error_handler(handler, name, tasks: dict = None) None [source]
Attach an error handler to the workgraph.
- add_link(source: TaskSocket | Task, target: TaskSocket) NodeLink [source]
Add a link between two tasks.
- add_task(identifier: str | callable, name: str = None, **kwargs) Task [source]
Add a task to the workgraph.
- continue_process()[source]
Continue a saved process by sending the task to RabbitMA. Use with caution, this may launch duplicate processes.
- property error_handlers: Dict[str, Any]
Get the error handlers.
- extend(wg: WorkGraph, prefix: str = '') None [source]
Append a workgraph to the current workgraph. prefix is used to add a prefix to the task names.
- classmethod from_dict(wgdata: Dict[str, Any]) WorkGraph [source]
Rebuilds a node graph from a dictionary.
- Parameters:
ngdata (Dict[str, Any]) – The data of the node graph.
- Returns:
The rebuilt node graph.
- Return type:
NodeGraph
- classmethod from_yaml(filename: str = None, string: str = None) WorkGraph [source]
Build WrokGraph from yaml file.
- classmethod load(pk: int | ProcessNode) WorkGraph | None [source]
Load WorkGraph from the process node with the given primary key.
- Parameters:
pk (int) – The primary key of the process node.
- property meta_tasks: dict
Meta tasks are the context and group inputs/outputs
- run(inputs: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None) Any [source]
Run the AiiDA workgraph process and update the process status. The method uses AiiDA’s engine to run the process, when the process is finished, update the status of the tasks
- save(metadata: Dict[str, Any] | None = None) None [source]
Save the udpated workgraph to the process This is only used for a running workgraph. Save the AiiDA workgraph process and update the process status.
- save_to_base(wgdata: Dict[str, Any]) None [source]
Save new wgdata to attribute. It will first check the difference, and reset tasks if needed.
- submit(inputs: Dict[str, Any] | None = None, wait: bool = False, timeout: int = 600, interval: int = 5, metadata: Dict[str, Any] | None = None, scheduler: str | None = None) ProcessNode [source]
Submit the AiiDA workgraph process and optionally wait for it to finish. :param wait: Wait for the process to finish. :type wait: bool :param timeout: The maximum time in seconds to wait for the process to finish. Defaults to 600. :type timeout: int :param restart: Restart the process, and reset the modified tasks, then only re-run the modified tasks. :type restart: bool :param new: Submit a new process. :type new: bool
- property tasks: TaskCollection
Add alias to nodes for WorkGraph
- to_dict(should_serialize: bool = False) Dict[str, Any] [source]
Convert the workgraph to a dictionary.
- to_html(output: str = None, **kwargs)[source]
Write a standalone html file to visualize the workgraph.
- to_widget_value() Dict[str, Any] [source]
Convert the workgraph to a dictionary that can be used by the widget.
- update() None [source]
Update the current state and primary key of the process node as well as the state, node and primary key of the tasks that are outgoing from the process node. This includes updating the state of process nodes linked to the current process, and data nodes linked to the current process.
- wait(timeout: int = 600, tasks: dict = None, interval: int = 5) None [source]
Periodically checks and waits for the AiiDA workgraph process to finish until a given timeout.
- Parameters:
timeout (int) – The maximum time in seconds to wait for the process to finish. Defaults to 600.
tasks (dict) – Optional; specifies task states to wait for in the format {task_name: [acceptable_states]}.
interval (int) – The time interval in seconds between checks. Defaults to 5.
- Raises:
TimeoutError – If the process does not finish within the given timeout.
Total running time of the script: (0 minutes 11.076 seconds)