How to write error-resistant workflows
Introduction
In this tutorial, we show how to implement error handling in a WorkGraph.
Load the AiiDA profile.
[1]:
%load_ext aiida
from aiida import load_profile
load_profile()
[1]:
Profile<uuid='91ab760297b7474e99505a4bc4da8805' name='presto'>
Normal WorkGraph
We will show how to implement error handlers for the ArithmeticAddCalculation. We start by creating a normal WorkGraph:
[2]:
from aiida_workgraph import WorkGraph
from aiida import orm
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
wg = WorkGraph("normal_graph")
wg.add_task(ArithmeticAddCalculation, name="add1")
#------------------------- Submit the calculation -------------------
code = orm.load_code("add@localhost")
wg.submit(inputs={"add1": {"code": code,
                           "x": orm.Int(1),
                           "y": orm.Int(2)
                           }},
          wait=True)
print("Task finished OK? ", wg.tasks["add1"].process.is_finished_ok)
print("Exit code: ", wg.tasks["add1"].process.exit_code)
print("Exit Message: ", wg.tasks["add1"].process.exit_message)
WorkGraph process created, PK: 61468
Process 61468 finished with state: FINISHED
Task finished OK? True
Exit code: None
Exit Message: None
Error code
If the computed sum of the inputs x and y is negative, the ArithmeticAddCalculation fails with exit code 410. Let's reset the WorkGraph and modify the inputs:
[3]:
wg.reset()
wg.submit(inputs={"add1": {"code": code,
                           "x": orm.Int(1),
                           "y": orm.Int(-6)
                           }},
          wait=True)
print("Task finished OK? ", wg.tasks["add1"].process.is_finished_ok)
print("Exit code: ", wg.tasks["add1"].process.exit_code)
print("Exit Message: ", wg.tasks["add1"].process.exit_message)
WorkGraph process created, PK: 61475
Process 61475 finished with state: FINISHED
Task finished OK? False
Exit code: ExitCode(status=410, message='The sum of the operands is a negative number.', invalidates_cache=False)
Exit Message: The sum of the operands is a negative number.
We can confirm that the task failed by checking the process status:
[4]:
%verdi process status {wg.pk}
WorkGraph<normal_graph><61475> Finished [302]
└── ArithmeticAddCalculation<61476> Finished [410]
Error handling
To register an error handler for a WorkGraph, define a function that takes the task as its argument, and attach it to the WorkGraph with add_error_handler.
You can specify the tasks and the exit codes that should trigger the error handler, as well as the maximum number of retries for each task:
tasks={"add1": {"exit_codes": [410],
                "max_retries": 5}
       }
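To make the roles of exit_codes and max_retries concrete, here is a minimal sketch of a retry loop in plain Python. It does not use AiiDA and is not how the aiida-workgraph engine is implemented. The names run_add, make_inputs_positive, and run_with_handler are hypothetical stand-ins, and the task is modelled as a plain dictionary of inputs.

```python
from typing import Callable, Dict, List, Tuple


def run_add(inputs: Dict[str, int]) -> int:
    """Hypothetical stand-in for a calculation: 410 if the sum is negative, else 0."""
    return 410 if inputs["x"] + inputs["y"] < 0 else 0


def make_inputs_positive(inputs: Dict[str, int]) -> str:
    """Toy handler: replace both inputs with their absolute values."""
    inputs["x"] = abs(inputs["x"])
    inputs["y"] = abs(inputs["y"])
    return "Run error handler: make_inputs_positive."


def run_with_handler(
    inputs: Dict[str, int],
    handler: Callable[[Dict[str, int]], str],
    exit_codes: List[int],
    max_retries: int,
) -> Tuple[int, List[int]]:
    """Run the task, calling the handler and retrying while the exit status matches."""
    history: List[int] = []
    retries = 0
    while True:
        status = run_add(inputs)
        history.append(status)
        # Stop on success, on an exit code the handler does not cover,
        # or when the retry budget is exhausted.
        if status not in exit_codes or retries >= max_retries:
            return status, history
        handler(inputs)
        retries += 1


final_status, history = run_with_handler(
    {"x": 1, "y": -6}, make_inputs_positive, exit_codes=[410], max_retries=5
)
print(final_status, history)
```

In this toy model the first run fails with 410, the handler fixes the inputs, and the second run succeeds. Setting max_retries to 0 would leave the task in its failed state.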
[5]:
from aiida_workgraph import WorkGraph, Task
from aiida import orm
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation

def handle_negative_sum(task: Task):
    """Handle the failure code 410 of the `ArithmeticAddCalculation`.
    Simply make the inputs positive by taking the absolute value.
    """
    # modify task inputs
    task.set({"x": orm.Int(abs(task.inputs["x"].value)),
              "y": orm.Int(abs(task.inputs["y"].value))})
    msg = "Run error handler: handle_negative_sum."
    return msg

wg = WorkGraph("normal_graph")
wg.add_task(ArithmeticAddCalculation, name="add1")
# register error handler
wg.add_error_handler(handle_negative_sum, name="handle_negative_sum",
                     tasks={"add1": {"exit_codes": [410],
                                     "max_retries": 5}
                            })
#------------------------- Submit the calculation -------------------
wg.submit(inputs={"add1": {"code": code,
                           "x": orm.Int(1),
                           "y": orm.Int(-6)
                           },
                  },
          wait=True)
print("Task finished OK? ", wg.tasks["add1"].process.is_finished_ok)
print("Exit code: ", wg.tasks["add1"].process.exit_code)
print("Exit Message: ", wg.tasks["add1"].process.exit_message)
WorkGraph process created, PK: 61482
Process 61482 finished with state: FINISHED
Task finished OK? True
Exit code: None
Exit Message: None
We can confirm that the task first fails again with a 410. Then the WorkGraph restarts the task with the new inputs, and it finishes successfully.
[6]:
%verdi process status {wg.pk}
WorkGraph<normal_graph><61482> Finished [0]
├── ArithmeticAddCalculation<61483> Finished [410]
└── ArithmeticAddCalculation<61489> Finished [0]
Custom parameters for error handlers
One can also pass custom parameters to the error handler. For example, instead of making the inputs positive by taking the absolute value, we add an increment to the inputs. The increment is a custom parameter of the error handler, which the user can specify when attaching the error handler to the WorkGraph, or update during the execution of the WorkGraph.
[7]:
def handle_negative_sum(task: Task, increment: int = 1):
    """Handle the failure code 410 of the `ArithmeticAddCalculation`.
    Simply add an increment to the inputs.
    """
    # modify task inputs
    task.set({"x": orm.Int(task.inputs["x"].value + increment),
              "y": orm.Int(task.inputs["y"].value + increment)})
    msg = "Run error handler: handle_negative_sum."
    return msg

wg = WorkGraph("normal_graph")
wg.add_task(ArithmeticAddCalculation, name="add1")
# register error handler; `kwargs` holds the custom parameters passed to the handler
wg.add_error_handler(handle_negative_sum, name="handle_negative_sum",
                     tasks={"add1": {"exit_codes": [410],
                                     "max_retries": 5,
                                     "kwargs": {"increment": 1}}
                            })
#------------------------- Submit the calculation -------------------
wg.submit(inputs={"add1": {"code": code,
                           "x": orm.Int(1),
                           "y": orm.Int(-6)
                           },
                  },
          wait=True)
print("Task finished OK? ", wg.tasks["add1"].process.is_finished_ok)
print("Exit code: ", wg.tasks["add1"].process.exit_code)
print("Exit Message: ", wg.tasks["add1"].process.exit_message)
WorkGraph process created, PK: 61495
Process 61495 finished with state: FINISHED
Task finished OK? True
Exit code: None
Exit Message: None
Since each retry increases both inputs by the increment (here 1), it takes three retries before the task finishes successfully:
[8]:
%verdi process status {wg.pk}
WorkGraph<normal_graph><61495> Finished [0]
├── ArithmeticAddCalculation<61496> Finished [410]
├── ArithmeticAddCalculation<61502> Finished [410]
├── ArithmeticAddCalculation<61508> Finished [410]
└── ArithmeticAddCalculation<61514> Finished [0]
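The number of retries can be checked by hand: each handler call adds the increment to both x and y, so the sum grows by twice the increment per retry. The following plain-Python sketch reproduces that arithmetic without AiiDA; count_retries is a hypothetical helper written for this illustration only.

```python
def count_retries(x: int, y: int, increment: int, max_retries: int = 5) -> int:
    """Count how often both inputs must be incremented before x + y is non-negative."""
    retries = 0
    while x + y < 0 and retries < max_retries:
        x += increment
        y += increment
        retries += 1
    return retries


# Inputs from the tutorial: x=1, y=-6.
print(count_retries(1, -6, increment=1))  # sums: -5, -3, -1, 1
print(count_retries(1, -6, increment=3))  # sums: -5, 1
```

With an increment of 1 the helper reports three retries, matching the three failed calculations in the process tree. A larger increment of 3 needs only one.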
One can increase the increment before submitting the WorkGraph:
[9]:
wg.reset()
wg.error_handlers["handle_negative_sum"]["tasks"]["add1"]["kwargs"]["increment"] = 3
wg.submit(inputs={"add1": {"code": code,
                           "x": orm.Int(1),
                           "y": orm.Int(-6)
                           },
                  },
          wait=True)
WorkGraph process created, PK: 61520
Process 61520 finished with state: FINISHED
[9]:
<WorkGraphNode: uuid: 3b33318b-f59d-4eae-9030-341217c2c203 (pk: 61520) (aiida.workflows:workgraph.engine)>
In this case, it only needs one retry to finish successfully.
[10]:
%verdi process status {wg.pk}
WorkGraph<normal_graph><61520> Finished [0]
├── ArithmeticAddCalculation<61521> Finished [410]
└── ArithmeticAddCalculation<61527> Finished [0]
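The assignment to wg.error_handlers above reaches into a nested dictionary: handler name, then tasks, then the per-task settings. That layout is what allows one handler to serve several tasks with different parameters. The sketch below models only this lookup in plain Python. The function find_handlers and the second task name add2 are hypothetical, the dictionary mirrors only the keys used in this tutorial, and none of this is the aiida-workgraph engine code.

```python
from typing import Any, Dict, List, Tuple

# Mirrors the keys accessed via wg.error_handlers in this tutorial.
# "add2" is a made-up second task added for illustration.
error_handlers: Dict[str, Dict[str, Any]] = {
    "handle_negative_sum": {
        "tasks": {
            "add1": {"exit_codes": [410], "max_retries": 5, "kwargs": {"increment": 1}},
            "add2": {"exit_codes": [410], "max_retries": 2, "kwargs": {"increment": 3}},
        }
    }
}


def find_handlers(
    handlers: Dict[str, Dict[str, Any]], task_name: str, exit_status: int
) -> List[Tuple[str, Dict[str, Any]]]:
    """Return (handler name, kwargs) pairs registered for this task and exit status."""
    matches = []
    for name, handler in handlers.items():
        config = handler["tasks"].get(task_name)
        if config is not None and exit_status in config["exit_codes"]:
            matches.append((name, config.get("kwargs", {})))
    return matches


print(find_handlers(error_handlers, "add1", 410))
print(find_handlers(error_handlers, "add2", 410))
print(find_handlers(error_handlers, "add1", 0))
```

The same handler name is returned for both tasks, each with its own kwargs, while a successful exit status of 0 matches nothing.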
Attach error handler to the task directly
The PythonJob task allows the user to attach the error handler directly to the task.
[16]:
from aiida_workgraph import WorkGraph, Task, task

def handle_negative_sum(task: Task, increment: int = 1):
    """Handle the failure code 410 of the `add` PythonJob task.
    Simply add an increment to the inputs.
    """
    # modify task inputs
    task.set({"x": task.inputs["x"].value.value + increment,
              "y": task.inputs["y"].value.value + increment})
    msg = "Run error handler: handle_negative_sum."
    return msg

@task.pythonjob(outputs=[{"name": "sum"}],
                error_handlers=[{"handler": handle_negative_sum,
                                 "exit_codes": [410],
                                 "max_retries": 5}])
def add(x: int, y: int) -> int:
    total = x + y
    if total < 0:
        # report a custom exit code so that the error handler is triggered
        exit_code = {"status": 410, "message": "Sum is negative"}
        return {"sum": total, "exit_code": exit_code}
    return {"sum": total}

wg = WorkGraph("test_PythonJob")
wg.add_task(add, name="add1", x=1, y=-2, computer="localhost")
wg.submit(wait=True)
print("exit status: ", wg.tasks["add1"].process.exit_status)
print("exit message: ", wg.tasks["add1"].process.exit_message)
WorkGraph process created, PK: 61553
Process 61553 finished with state: FINISHED
exit status: 0
exit message: None
We can confirm that the task first fails with exit code 410. The WorkGraph then restarts the task with the new inputs, and it finishes successfully.
[17]:
%verdi process status {wg.pk}
WorkGraph<test_PythonJob><61553> Finished [0]
├── PythonJob<add1><61557> Finished [410]
└── PythonJob<add1><61566> Finished [0]
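The body of the add function is ordinary Python, so its exit-code logic can be checked on its own before it is wrapped as a PythonJob. The sketch below uses an undecorated copy, named add_plain here (a hypothetical name), and evaluates it for the inputs of the first run and for the inputs after one handler call with the default increment of 1. It says nothing about how the engine itself interprets the returned dictionary.

```python
def add_plain(x: int, y: int) -> dict:
    """Undecorated copy of the `add` logic: report status 410 when the sum is negative."""
    total = x + y
    if total < 0:
        exit_code = {"status": 410, "message": "Sum is negative"}
        return {"sum": total, "exit_code": exit_code}
    return {"sum": total}


first = add_plain(1, -2)            # inputs of the first run
second = add_plain(1 + 1, -2 + 1)   # inputs after one handler call with increment=1
print(first)
print(second)
```

The first call carries the 410 exit code and the second does not, which is consistent with the one failed and one successful PythonJob in the process tree.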
Comparison with the BaseRestartWorkChain
AiiDA provides a BaseRestartWorkChain class that can be used to write workflows that handle known failure modes of processes and calculations.
[13]:
from aiida.engine import BaseRestartWorkChain
from aiida.plugins import CalculationFactory
from aiida import orm
from aiida.engine import while_
from aiida.engine import process_handler, ProcessHandlerReport

ArithmeticAddCalculation = CalculationFactory('core.arithmetic.add')

class ArithmeticAddBaseWorkChain(BaseRestartWorkChain):

    _process_class = ArithmeticAddCalculation

    @classmethod
    def define(cls, spec):
        """Define the process specification."""
        super().define(spec)
        spec.expose_inputs(ArithmeticAddCalculation, namespace='add')
        spec.expose_outputs(ArithmeticAddCalculation)
        spec.outline(
            cls.setup,
            while_(cls.should_run_process)(
                cls.run_process,
                cls.inspect_process,
            ),
            cls.results,
        )

    def setup(self):
        """Call the `setup` of the `BaseRestartWorkChain` and then create the inputs dictionary in `self.ctx.inputs`.
        This `self.ctx.inputs` dictionary will be used by the `BaseRestartWorkChain` to submit the process in the
        internal loop.
        """
        super().setup()
        self.ctx.inputs = self.exposed_inputs(ArithmeticAddCalculation, 'add')

    @process_handler
    def handle_negative_sum(self, node):
        """Check if the calculation failed with `ERROR_NEGATIVE_NUMBER`.
        If this is the case, simply make the inputs positive by taking the absolute value.
        :param node: the node of the subprocess that was run in the current iteration.
        :return: optional :class:`~aiida.engine.processes.workchains.utils.ProcessHandlerReport` instance to signal
            that a problem was detected and potentially handled.
        """
        if node.exit_status == ArithmeticAddCalculation.exit_codes.ERROR_NEGATIVE_NUMBER.status:
            self.ctx.inputs['x'] = orm.Int(abs(node.inputs.x.value))
            self.ctx.inputs['y'] = orm.Int(abs(node.inputs.y.value))
            return ProcessHandlerReport()
In the BaseRestartWorkChain, the error handling is implemented for a specific Calculation class. In contrast, the error handling in a WorkGraph is more general: it can be applied to any task in the WorkGraph, with custom parameters.
Summary
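Here we have shown how to implement error handling in a WorkGraph: registering an error handler for specific tasks and exit codes, passing custom parameters to the handler, and attaching a handler directly to a PythonJob task.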