Use map to generate tasks dynamically

The active_map_zone in WorkGraph is conceptually similar to Python’s built-in map function: it creates one copy of the tasks defined inside the zone for each item in a dataset.
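For readers less familiar with the built-in map, here is a minimal plain-Python sketch of the idea that the analogy refers to. It does not use WorkGraph at all; the names `dataset` and `double` are illustrative only.

```python
# Plain-Python analogy (no WorkGraph involved): apply one function to every
# item of a dataset. `dataset` and `double` are illustrative names.
dataset = {"key_0": 0, "key_1": 1, "key_2": 2}


def double(value: int) -> int:
    """The per-item operation."""
    return value * 2


# map() calls `double` once per value, so the number of calls depends only on
# the size of `dataset`, which is known at runtime.
doubled = dict(zip(dataset.keys(), map(double, dataset.values())))
print(doubled)  # {'key_0': 0, 'key_1': 2, 'key_2': 4}
```

The map zone plays the role of `map` here, with the difference that the "function" is a group of tasks rather than a single callable.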

Why use active_map_zone?

  • Creates dynamic tasks: The number of tasks is determined at runtime based on input size.

  • Encapsulates child tasks: All tasks inside an active_map_zone block are executed independently for each input item.

Example: Running Tasks in Parallel with active_map_zone

Let’s consider a simple workflow where we:

  1. Generate a dictionary of numbers.

  2. Add 1 to each number.

  3. Multiply the result by 2.

[1]:
from aiida_workgraph import task, WorkGraph, active_map_zone
from aiida import load_profile, orm

load_profile()

@task.calcfunction(outputs=["result"])
def generate_data(n: int) -> dict:
    """Generate a dictionary of integers."""
    result = {f"key_{i}": orm.Int(i) for i in range(n.value)}
    return {"result": result}

@task.calcfunction()
def add(x, y):
    return x + y

@task.calcfunction()
def multiply(x, y):
    return x * y

n = 3
with WorkGraph("map_example_1") as wg:
    data = generate_data(n=n)
    with active_map_zone(data) as map_zone:
        result = add(x=map_zone.item, y=1)
        result = multiply(x=result, y=2)

# Show the WorkGraph
wg.to_html()
# wg
[1]:

🔹 What happens here?

  • active_map_zone automatically applies the tasks inside the zone (add and multiply) to each element in the dictionary output of generate_data.

  • Each instance of add and multiply runs independently: the chain for one element does not wait for the results of another element.

  • map_zone.item is a placeholder for the current element. When the zone is expanded, it is replaced by the value stored under each key of the dictionary.
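To make the expected values concrete, the three steps can be sketched in plain Python. This is only an illustration of the data flow under the assumption of n = 3: the functions below are ordinary Python stand-ins, not AiiDA calcfunctions, and nothing is stored in the provenance graph.

```python
# Plain-Python stand-ins for the three tasks; no AiiDA nodes or provenance.
def generate_data(n: int) -> dict:
    """Generate a dictionary of integers keyed as key_0, key_1, ..."""
    return {f"key_{i}": i for i in range(n)}


def add(x, y):
    return x + y


def multiply(x, y):
    return x * y


data = generate_data(3)

# The loop body plays the role of the map zone: it is applied once per item,
# and no iteration depends on the result of another.
results = {}
for key, item in data.items():
    results[key] = multiply(add(item, 1), 2)

print(results)  # {'key_0': 2, 'key_1': 4, 'key_2': 6}
```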

Running and Checking Results

[2]:
wg.run()
print("State of WorkGraph:   {}".format(wg.state))
03/26/2025 09:24:19 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: generate_data1
03/26/2025 09:24:20 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_task_state]: Task: generate_data1, type: CALCFUNCTION, finished.
03/26/2025 09:24:20 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: map_zone2
03/26/2025 09:24:20 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_0_add3,key_1_add3,key_2_add3
03/26/2025 09:24:20 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_task_state]: Task: key_0_add3, type: CALCFUNCTION, finished.
03/26/2025 09:24:20 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_0_multiply4,key_1_add3,key_2_add3
03/26/2025 09:24:21 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_task_state]: Task: key_0_multiply4, type: CALCFUNCTION, finished.
03/26/2025 09:24:21 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_1_add3,key_2_add3
03/26/2025 09:24:21 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_task_state]: Task: key_1_add3, type: CALCFUNCTION, finished.
03/26/2025 09:24:21 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_1_multiply4,key_2_add3
03/26/2025 09:24:21 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_task_state]: Task: key_1_multiply4, type: CALCFUNCTION, finished.
03/26/2025 09:24:21 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_2_add3
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_task_state]: Task: key_2_add3, type: CALCFUNCTION, finished.
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_template_task_state]: Task: add3 finished.
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_2_multiply4
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_task_state]: Task: key_2_multiply4, type: CALCFUNCTION, finished.
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_template_task_state]: Task: multiply4 finished.
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|update_zone_task_state]: Task: map_zone2 finished.
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|continue_workgraph]: tasks ready to run:
03/26/2025 09:24:22 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61657|WorkGraphEngine|finalize]: Finalize workgraph.
State of WorkGraph:   FINISHED
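In the log above, each task inside the zone appears once per dictionary key, with the key prepended to the task name (for example key_0_add3 and key_0_multiply4). The sketch below only reproduces that naming pattern as read off the log; `expand_task_names` is a hypothetical helper written for this illustration, not a function of aiida-workgraph, and it says nothing about how the engine builds the names internally.

```python
# Hypothetical helper: reproduces the "<key>_<task name>" pattern seen in the
# log output. It is NOT part of aiida-workgraph.
def expand_task_names(keys, template_names):
    """Return one name per (template task, key) pair."""
    return [f"{key}_{name}" for name in template_names for key in keys]


keys = ["key_0", "key_1", "key_2"]
template_names = ["add3", "multiply4"]  # task names as they appear in the log

expanded = expand_task_names(keys, template_names)
for name in expanded:
    print(name)
```

With three keys and two tasks in the zone this yields six names, which matches the six per-item tasks reported as finished in the log.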

Generate node graph from the AiiDA process:

[3]:
from aiida_workgraph.utils import generate_node_graph
generate_node_graph(wg.pk)
[3]:
../_images/howto_map_5_0.svg

Second workflow: gather results

Now we gather the results of the multiply tasks from the previous workflow and compute their sum. To do this, we add a calc_sum task after the map zone and pass it the output of the zone.

[4]:
@task.calcfunction()
def calc_sum(**datas):
    """Compute the sum of all provided values."""
    return sum(datas.values())

n = 3
with WorkGraph("map_example_2") as wg:
    data = generate_data(n=n)
    with active_map_zone(data) as map_zone:
        result = add(x=map_zone.item, y=1)
        result = multiply(x=result, y=2)
    total = calc_sum(datas=result)
wg.to_html()

[4]:

Run the WorkGraph and print the result of the calc_sum task:

[5]:
wg.run()
print("State of WorkGraph:   {}".format(wg.state))
print('Result of task calc_sum: {}'.format(total.value))
03/26/2025 09:24:24 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: generate_data1
03/26/2025 09:24:24 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: generate_data1, type: CALCFUNCTION, finished.
03/26/2025 09:24:24 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: map_zone2
03/26/2025 09:24:25 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_0_add3,key_1_add3,key_2_add3
03/26/2025 09:24:26 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: key_0_add3, type: CALCFUNCTION, finished.
03/26/2025 09:24:26 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_0_multiply4,key_1_add3,key_2_add3
03/26/2025 09:24:26 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: key_0_multiply4, type: CALCFUNCTION, finished.
03/26/2025 09:24:26 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_1_add3,key_2_add3
03/26/2025 09:24:27 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: key_1_add3, type: CALCFUNCTION, finished.
03/26/2025 09:24:27 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_1_multiply4,key_2_add3
03/26/2025 09:24:28 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: key_1_multiply4, type: CALCFUNCTION, finished.
03/26/2025 09:24:28 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_2_add3
03/26/2025 09:24:29 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: key_2_add3, type: CALCFUNCTION, finished.
03/26/2025 09:24:29 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_template_task_state]: Task: add3 finished.
03/26/2025 09:24:29 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: key_2_multiply4
03/26/2025 09:24:30 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: key_2_multiply4, type: CALCFUNCTION, finished.
03/26/2025 09:24:30 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_template_task_state]: Task: multiply4 finished.
03/26/2025 09:24:30 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_zone_task_state]: Task: map_zone2 finished.
03/26/2025 09:24:30 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run: calc_sum5
03/26/2025 09:24:30 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|update_task_state]: Task: calc_sum5, type: CALCFUNCTION, finished.
03/26/2025 09:24:30 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|continue_workgraph]: tasks ready to run:
03/26/2025 09:24:30 PM <761089> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [61681|WorkGraphEngine|finalize]: Finalize workgraph.
State of WorkGraph:   FINISHED
Result of task calc_sum: uuid: d9e4d2c2-dd45-4834-871b-56ae6572be96 (pk: 61706) value: 12
[6]:
from aiida_workgraph.utils import generate_node_graph
generate_node_graph(wg.pk)
[6]:
../_images/howto_map_10_0.svg

You can see that the outputs of all multiply tasks are linked to the input of the calc_sum task.
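The value 12 reported above can be checked by hand. The following plain-Python sketch assumes n = 3 and mirrors the keyword-argument style of calc_sum; it uses ordinary integers instead of AiiDA nodes, and `per_item` is an illustrative name for the gathered outputs.

```python
# Plain-Python check of the gathered sum; ordinary ints, no AiiDA nodes.
def calc_sum(**datas):
    """Compute the sum of all provided values."""
    return sum(datas.values())


n = 3
# One entry per key: (i + 1) * 2, i.e. add 1, then multiply by 2.
per_item = {f"key_{i}": (i + 1) * 2 for i in range(n)}

# Each dictionary entry is passed as a separate keyword argument.
total = calc_sum(**per_item)
print(per_item)  # {'key_0': 2, 'key_1': 4, 'key_2': 6}
print(total)     # 12
```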