Monitor Task Decorator
The monitor
decorator is designed for tasks that need to poll a specific state at regular intervals until a success criterion is met. This is useful for various scenarios, including time-based triggers, file existence checks, and monitoring other tasks or WorkGraphs.
Possible use Cases
Time Trigger: Start a task at a specified time.
File Trigger: Execute a task when a particular file exists.
Task Monitor: Observe the state of another task and act based on certain conditions.
Cross-WorkGraph Dependencies: Check the state of a task in a different WorkGraph.
Behavior
While polling, the task sleeps for a specified interval (default 1.0 second, customizable by the user), allowing the WorkGraph engine to manage other tasks.
Example Usage
The monitor task has two built-in parameters:
interval
: The time interval between each poll.timeout
: The maximum time to wait for the success criterion to be met.
Time Monitor
A task waits until a specified time.
[1]:
from aiida_workgraph import WorkGraph, task
from aiida import load_profile
import datetime
load_profile()
@task.monitor()
def time_monitor(time):
"""Return True if the current time is greater than the given time."""
return datetime.datetime.now() > time
@task.calcfunction()
def add(x, y, t=1):
import time
time.sleep(t.value)
return x + y
wg = WorkGraph(name="test_monitor")
# The task will wait until 2024-08-16, 10:54:00
monitor1 = wg.add_task(time_monitor, time=datetime.datetime(2024, 8, 16, 10, 54, 0))
add1 = wg.add_task(add, x=1, y=2)
add1.waiting_on.add(monitor1)
File Monitor
Start a task when a specified file exists.
[2]:
@task.monitor()
def file_monitor(filepath):
"""Check if the file exists."""
import os
return os.path.exists(filepath)
# Usage
wg = WorkGraph(name="test_monitor")
# The task will wait until the file exists, checking every 2 seconds, with a timeout of 10 seconds
monitor1 = wg.add_task(file_monitor, filepath="/tmp/test.txt", interval=2.0, timeout=10.0)
add1 = wg.add_task(add, x=1, y=2)
add1.waiting_on.add(monitor1)
Built-in Tasks
Two built-in tasks for common monitoring needs:
monitor1 = wg.add_task("workgraph.time_monitor", datetime=datetime.datetime.now() + datetime.timedelta(seconds=10))
monitor2 = wg.add_task("workgraph.file_monitor", filepath="/tmp/test.txt")
Awaitable Task Decorator
The awaitable
decorator allows for the integration of asyncio
within tasks, letting users control asynchronous functions.
Define and use an awaitable task within the WorkGraph.
[3]:
import asyncio
from aiida_workgraph import WorkGraph, task
@task.awaitable()
async def awaitable_func(x, y):
await asyncio.sleep(0.5)
return x + y
wg = WorkGraph(name="test_awaitable")
awaitable1 = wg.add_task(awaitable_func, x=1, y=2)
wg.run()
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: Continue workgraph.
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: tasks ready to run: awaitable_func1
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|run_tasks]: Run task: awaitable_func1, type: awaitable
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|on_wait]: Process status: Waiting for child processes: awaitable_func1
update task state: awaitable_func1
Continue workgraph.
------------------------------------------------------------
task: awaitable_func1 RUNNING
is workgraph finished: False
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|set_normal_task_results]: Task: awaitable_func1 finished.
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: Continue workgraph.
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: tasks ready to run:
08/16/2024 11:26:25 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|finalize]: Finalize
on awaitable finished: awaitable_func1
set normal task results: awaitable_func1 results: {'result': 3}
update task state: awaitable_func1
Continue workgraph.
task: awaitable_func1 FINISHED
is workgraph finished: True
workgraph outputs: []
Finalize workgraph test_awaitable
[3]:
{'new_data': {},
'execution_count': <Int: uuid: 7596abd1-4b06-45ff-a029-a2eb52ad7a13 (pk: 108452) value: 0>}
Kill the monitor task
One can kill a running monitor task by using the following command:
workgraph task kill <workgraph_pk> <task_name>
# for example
workgraph task kill 119974 monitor1
A killed task will has the status KILLED
and the following task will not be executed.
Notes on asyncio Integration
The awaitable task lets the WorkGraph enter a Waiting
state, yielding control to the asyncio event loop. This enables other tasks to run concurrently, although long-running calculations may delay the execution of awaitable tasks.
Conclusion
These enhancements provide powerful tools for managing dependencies and asynchronous operations within WorkGraph, offering greater flexibility and efficiency in task execution.