Scheduler
There are many situations where you may want to control the number of calcjobs running at the same time. For example:
On an HPC cluster, a user may have a limit on the maximum number of submissions that can be running at the same time.
On a local workstation, a user may want to limit the number of concurrently running calcjobs to avoid overloading the system.
Managing the Scheduler
Start a scheduler named test:
workgraph scheduler start test
Stop the scheduler:
workgraph scheduler stop test
Show the status of the scheduler:
workgraph scheduler status test
Show details of the processes submitted to the scheduler:
workgraph scheduler show test
Set the maximum number of calcjobs that can be running at the same time:
workgraph scheduler set-max-calcjobs test 5
Set the maximum number of workflows (top-level WorkGraph) that can be running at the same time:
workgraph scheduler set-max-workflows test 5
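If you would rather drive these commands from a Python script, a thin wrapper around subprocess is usually enough. The sketch below only assembles the argument lists for the subcommands listed above. The helper names scheduler_command and run_scheduler_command are hypothetical and are not part of aiida-workgraph.

```python
import subprocess

# Subcommands taken from the list above.
KNOWN_ACTIONS = {
    "start", "stop", "status", "show",
    "set-max-calcjobs", "set-max-workflows",
}


def scheduler_command(action, name, *args):
    """Build the argv list for `workgraph scheduler <action> <name> [args...]`."""
    if action not in KNOWN_ACTIONS:
        raise ValueError(f"unknown scheduler action: {action}")
    return ["workgraph", "scheduler", action, name, *map(str, args)]


def run_scheduler_command(action, name, *args):
    """Run the command and return its stdout (needs the `workgraph` CLI installed)."""
    completed = subprocess.run(
        scheduler_command(action, name, *args),
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout
```

For example, scheduler_command("set-max-calcjobs", "test", 5) yields the same argument list as the shell command shown above, and run_scheduler_command executes it.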
Let’s start a scheduler that allows at most 2 calcjobs and 10 workflows to run at the same time:
!workgraph scheduler start test --max-calcjobs 2 --max-workflows 10
Starting the scheduler ...
Check the status of the scheduler:
!workgraph scheduler status test
Name status pk waiting process calcjob workflow
test Running 122897 0 0/10000 0/2 0/10
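To check the limits programmatically, the status table can be turned into dictionaries. This is a minimal sketch that assumes the whitespace-separated layout shown above (one header line, one row per scheduler, and used/limit pairs written as "0/2"); the real output format may differ between versions, and parse_status is a hypothetical helper.

```python
def parse_status(text):
    """Parse the status table into a list of dicts keyed by lowercased header names."""
    lines = [line.split() for line in text.splitlines() if line.strip()]
    header = [column.lower() for column in lines[0]]
    schedulers = []
    for row in lines[1:]:
        entry = {}
        for key, value in zip(header, row):
            if "/" in value:
                # Columns such as "0/2" hold (currently used, configured limit).
                used, limit = value.split("/")
                entry[key] = (int(used), int(limit))
            else:
                entry[key] = value
        schedulers.append(entry)
    return schedulers


sample = """Name status pk waiting process calcjob workflow
test Running 122897 0 0/10000 0/2 0/10"""
status = parse_status(sample)[0]
print(status["status"], status["calcjob"])
```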
Example Usage
Let’s walk through an example where we create four WorkGraphs with five calcjobs each.
from aiida import load_profile, orm
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
from aiida_workgraph import WorkGraph

load_profile()

# Use the calcjob: ArithmeticAddCalculation
code = orm.load_code("add@localhost")

# Create four WorkGraphs
for _ in range(4):
    wg = WorkGraph("test_max_number_jobs")
    # Create five tasks per WorkGraph
    for i in range(5):
        temp = wg.add_task(ArithmeticAddCalculation, name=f"add{i}", x=1, y=1, code=code)
        # Set a sleep option for each job (e.g., 10 seconds per job)
        temp.set({"metadata.options.sleep": 10})
    # Submit the WorkGraph to the scheduler named "test"
    wg.submit(scheduler="test")
WorkGraph process created, PK: 122898
WorkGraph process created, PK: 122899
WorkGraph process created, PK: 122900
WorkGraph process created, PK: 122909
Note that all the WorkGraphs are submitted to the scheduler named test. You can now check the progress of the scheduler using the following command:
!workgraph scheduler show test
Report: Scheduler: test
PK Created Process label Process State Priorities
------ --------- ------------------------------- --------------- ------------
122898 6s ago WorkGraph<test_max_number_jobs> ⏵ Waiting
122899 5s ago WorkGraph<test_max_number_jobs> ⏵ Waiting
122900 4s ago WorkGraph<test_max_number_jobs> ⏹ Created
122903 4s ago ArithmeticAddCalculation ⏹ Created
122906 4s ago ArithmeticAddCalculation ⏹ Created
122909 3s ago WorkGraph<test_max_number_jobs> ⏹ Created -3
122910 3s ago ArithmeticAddCalculation ⏹ Created 0
122913 3s ago ArithmeticAddCalculation ⏹ Created 0
122916 3s ago ArithmeticAddCalculation ⏹ Created 0
122919 1s ago ArithmeticAddCalculation ⏹ Created -1
122922 1s ago ArithmeticAddCalculation ⏹ Created -1
122925 1s ago ArithmeticAddCalculation ⏹ Created -1
122928 1s ago ArithmeticAddCalculation ⏹ Created -1
122931 0s ago ArithmeticAddCalculation ⏹ Created -1
Total results: 14
name: test
pk: 122897
running_process: 5
waiting_process: 9
running_workflow: 3
running_calcjob: 2
max_calcjobs: 2
max_workflows: 10
max_processes: 10000
This command lists the processes handled by the scheduler together with a summary. In the summary above, running_calcjob equals max_calcjobs (2), so at most 2 calcjobs run simultaneously while the remaining processes wait.
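The effect of the calcjob limit can be illustrated with a toy model. The sketch below is not how aiida-workgraph schedules processes: it assumes a simple first-in, first-out queue, ignores priorities and engine overhead, and only shows that a cap on concurrent jobs bounds the peak load and stretches the total runtime. The function simulate and its parameters are hypothetical.

```python
from collections import deque


def simulate(n_workflows=4, jobs_per_workflow=5, max_calcjobs=2, job_duration=10):
    """Toy FIFO model: start queued jobs whenever a slot is free, one tick per second."""
    queue = deque(
        (wf, job) for wf in range(n_workflows) for job in range(jobs_per_workflow)
    )
    running = {}  # (workflow, job) -> remaining seconds
    peak = 0
    elapsed = 0
    while queue or running:
        # Fill free slots up to the configured limit.
        while queue and len(running) < max_calcjobs:
            running[queue.popleft()] = job_duration
        peak = max(peak, len(running))
        # Advance time by one second and drop jobs that have finished.
        elapsed += 1
        running = {key: left - 1 for key, left in running.items() if left > 1}
    return peak, elapsed


peak, elapsed = simulate()
print(f"peak running jobs: {peak}, simulated seconds: {elapsed}")
```

With the defaults (20 jobs of 10 seconds each and a limit of 2) the model never runs more than 2 jobs at once and needs 100 simulated seconds; raising max_calcjobs shortens the total time at the cost of a higher peak load.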
Additionally, you can monitor the progress visually by visiting http://127.0.0.1:8000/scheduler/ and clicking on the scheduler name to open its detail page, which shows the status of the scheduler.
WorkChain Support in the Scheduler
The scheduler in aiida-workgraph can also be used with WorkChains. However, this requires two additional steps compared to standard WorkChain usage:
Use the submit_to_scheduler function from aiida_workgraph.utils.control.
Override the submit method of the WorkChain you want to use.
Example
Here we override the submit method of aiida.workflows.arithmetic.multiply_add.MultiplyAddWorkChain to ensure it uses the scheduler for submitting processes.
from typing import Any, Type

from aiida.engine import Process, WorkChain
from aiida.orm import ProcessNode


class MultiplyAddWorkChain(WorkChain):
    """WorkChain to multiply two numbers and add a third, adapted for scheduling."""

    # other methods and attributes...

    def submit(
        self,
        process: Type[Process],
        inputs: dict[str, Any] | None = None,
        **kwargs,
    ) -> ProcessNode:
        """Submit a process inside the workchain via the scheduler."""
        from aiida_workgraph.utils.control import submit_to_scheduler_inside_workchain

        return submit_to_scheduler_inside_workchain(self, process, inputs, **kwargs)
Submit via Scheduler
You can now submit the patched WorkChain using:
from aiida_workgraph.utils.control import submit_to_scheduler

x = 1
y = 2
z = 3

submit_to_scheduler(
    MultiplyAddWorkChain,
    inputs={"x": x, "y": y, "z": z, "code": code},
    scheduler="test",
)
⚠️ Warning: Nested WorkChains
If the WorkChain calls other WorkChains internally (i.e. nested WorkChains), this approach will not work out of the box. You must also:
Patch all nested WorkChains in the same way (override their submit method).
Restart the AiiDA daemon.
Failure to do so can result in processes that are not properly tracked or submitted via the scheduler.
⚠️ Warning: verdi process repair
Currently, the command verdi process repair sends stuck processes to the normal AiiDA daemon worker queue instead of the scheduler. This breaks the scheduler’s ability to track those processes and may also result in the same process being run multiple times.
Persistent Scheduler
Last but not least, the scheduler is persistent. You can stop and restart it at any time using the same scheduler name, and all associated information will be preserved automatically.