Monitor Task Decorator

The monitor decorator is designed for tasks that need to poll a specific state at regular intervals until a success criterion is met. This is useful for various scenarios, including time-based triggers, file existence checks, and monitoring other tasks or WorkGraphs.

Possible use Cases

  • Time Trigger: Start a task at a specified time.

  • File Trigger: Execute a task when a particular file exists.

  • Task Monitor: Observe the state of another task and act based on certain conditions.

  • Cross-WorkGraph Dependencies: Check the state of a task in a different WorkGraph.

Behavior

While polling, the task sleeps for a specified interval (default 1.0 second, customizable by the user), allowing the WorkGraph engine to manage other tasks.

Example Usage

The monitor task has two built-in parameters:

  • interval: The time interval between each poll.

  • timeout: The maximum time to wait for the success criterion to be met.

Time Monitor

A task waits until a specified time.

[1]:
from aiida_workgraph import WorkGraph, task
from aiida import load_profile
import datetime

load_profile()

@task.monitor()
def time_monitor(time):
    """Return True if the current time is greater than the given time."""
    return datetime.datetime.now() > time

@task.calcfunction()
def add(x, y, t=1):
    import time
    time.sleep(t.value)
    return x + y

wg = WorkGraph(name="test_monitor")
# The task will wait until 2024-08-16, 10:54:00
monitor1 = wg.add_task(time_monitor, time=datetime.datetime(2024, 8, 16, 10, 54, 0))
add1 = wg.add_task(add, x=1, y=2)
add1.waiting_on.add(monitor1)

File Monitor

Start a task when a specified file exists.

[2]:
@task.monitor()
def file_monitor(filepath):
    """Check if the file exists."""
    import os
    return os.path.exists(filepath)

# Usage
wg = WorkGraph(name="test_monitor")
# The task will wait until the file exists, checking every 2 seconds, with a timeout of 10 seconds
monitor1 = wg.add_task(file_monitor, filepath="/tmp/test.txt", interval=2.0, timeout=10.0)
add1 = wg.add_task(add, x=1, y=2)
add1.waiting_on.add(monitor1)

Built-in Tasks

Two built-in tasks for common monitoring needs:

monitor1 = wg.add_task("workgraph.time_monitor", datetime=datetime.datetime.now() + datetime.timedelta(seconds=10))
monitor2 = wg.add_task("workgraph.file_monitor", filepath="/tmp/test.txt")

Awaitable Task Decorator

The awaitable decorator allows for the integration of asyncio within tasks, letting users control asynchronous functions.

Define and use an awaitable task within the WorkGraph.

[3]:
import asyncio
from aiida_workgraph import WorkGraph, task

@task.awaitable()
async def awaitable_func(x, y):
    await asyncio.sleep(0.5)
    return x + y

wg = WorkGraph(name="test_awaitable")
awaitable1 = wg.add_task(awaitable_func, x=1, y=2)
wg.run()
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: Continue workgraph.
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: tasks ready to run: awaitable_func1
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|run_tasks]: Run task: awaitable_func1, type: awaitable
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|on_wait]: Process status: Waiting for child processes: awaitable_func1
update task state:  awaitable_func1
Continue workgraph.
------------------------------------------------------------
task:  awaitable_func1 RUNNING
is workgraph finished:  False
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|set_normal_task_results]: Task: awaitable_func1 finished.
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: Continue workgraph.
08/16/2024 11:26:24 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|continue_workgraph]: tasks ready to run:
08/16/2024 11:26:25 AM <721494> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [108451|WorkGraphEngine|finalize]: Finalize
on awaitable finished:  awaitable_func1
set normal task results:  awaitable_func1 results:  {'result': 3}
update task state:  awaitable_func1
Continue workgraph.
task:  awaitable_func1 FINISHED
is workgraph finished:  True
workgraph outputs:  []
Finalize workgraph test_awaitable

[3]:
{'new_data': {},
 'execution_count': <Int: uuid: 7596abd1-4b06-45ff-a029-a2eb52ad7a13 (pk: 108452) value: 0>}

Kill the monitor task

One can kill a running monitor task by using the following command:

workgraph task kill <workgraph_pk> <task_name>
# for example
workgraph task kill 119974 monitor1

A killed task will has the status KILLED and the following task will not be executed.

Notes on asyncio Integration

The awaitable task lets the WorkGraph enter a Waiting state, yielding control to the asyncio event loop. This enables other tasks to run concurrently, although long-running calculations may delay the execution of awaitable tasks.

Conclusion

These enhancements provide powerful tools for managing dependencies and asynchronous operations within WorkGraph, offering greater flexibility and efficiency in task execution.