Workflows: Interactive and Interleaving workflows

Wait interface

The operator OPH_WAIT temporarily pauses the execution of the workflow that the task belongs to. Execution resumes when a specific event occurs, such as the appearance of a file on disk or the setting of some variables in the workflow environment. The duration or the deadline of a pause can also be set as an input parameter.

Note that only the completion of the waiting task is delayed, which defers the execution of the tasks that depend on it; independent tasks are scheduled as usual and can run during the waiting time.
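
The scheduling rule above can be illustrated with a small sketch. It is not part of Ophidia: the helper deferred_tasks is hypothetical, and the third task (List cubes) is added only to show an independent task. The sketch walks the dependency graph of a workflow and reports which tasks are held back by a waiting task.

```python
from collections import deque


def deferred_tasks(tasks, waiting):
    """Return the names of the tasks that cannot start until `waiting` completes."""
    # Map each task name to the names of the tasks that depend on it directly.
    dependents = {task["name"]: [] for task in tasks}
    for task in tasks:
        for dep in task.get("dependencies", []):
            dependents[dep["task"]].append(task["name"])

    # Breadth-first walk over direct and indirect dependents of the waiting task.
    blocked, pending = set(), deque([waiting])
    while pending:
        for child in dependents[pending.popleft()]:
            if child not in blocked:
                blocked.add(child)
                pending.append(child)
    return blocked


workflow_tasks = [
    {"name": "Pause", "operator": "oph_wait", "arguments": ["timeout=10"]},
    {"name": "Import data", "operator": "oph_importnc",
     "dependencies": [{"task": "Pause"}]},
    # Hypothetical independent task, present only for illustration.
    {"name": "List cubes", "operator": "oph_list"},
]

blocked = deferred_tasks(workflow_tasks, "Pause")
free = [t["name"] for t in workflow_tasks
        if t["name"] not in blocked and t["name"] != "Pause"]
print(sorted(blocked))  # ['Import data']
print(free)             # ['List cubes']
```

Only Import data is deferred; List cubes has no path to Pause in the dependency graph, so it may run while the pause is in progress.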

There are three interface types, listed below.

  1. clock

    Waiting time is determined by the input argument timeout. The user can set its value (in seconds) or a deadline (i.e. the time instant when the pause should end).

  2. input

    This interface enables interactive tasks. The waiting time is either bounded (by the value of timeout) or unbounded (potentially infinite). The user can end the pause by sending a command with the operator OPH_INPUT.

    Consider this interface when workflow input parameters have to be set during execution. To this end the user can send a list of key-value pairs by filling the arguments key and value of OPH_INPUT; the pairs are processed as variables of the workflow environment. Default values can be set by means of the arguments key and value of the operator OPH_WAIT.

  3. file

    Ophidia Server periodically checks whether a file is present on disk or at a given URL (polling procedure) and ends the pause when the file is found. The waiting time is unbounded (potentially infinite). The user can also end the pause by sending a command with the operator OPH_INPUT.
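
The polling procedure of the file interface can be sketched as follows. This is a minimal illustration and not the server's implementation: the function wait_for_file and its parameters are hypothetical, it only covers local paths (not URLs), and max_checks exists only so that the sketch cannot loop forever.

```python
import os
import time


def wait_for_file(path, poll_seconds, exists=os.path.exists,
                  sleep=time.sleep, max_checks=None):
    """Poll until `exists(path)` is true and return the number of checks made.

    Returns None if `max_checks` checks were made without finding the file.
    """
    checks = 0
    while max_checks is None or checks < max_checks:
        checks += 1
        if exists(path):
            return checks
        # The file is not there yet: sleep for one polling period and retry.
        sleep(poll_seconds)
    return None


# Simulated run: the file "appears" at the third check, no real sleeping.
answers = iter([False, False, True])
naps = []
checks = wait_for_file("source.nc", 5,
                       exists=lambda path: next(answers),
                       sleep=naps.append)
print(checks, naps)  # 3 [5, 5]
```

The check and sleep functions are injected so that the loop can be exercised without touching the disk or actually waiting; with the defaults it polls the local file system.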

In the following workflow the operator OPH_WAIT pauses the workflow execution for 10 seconds before the import of a dataset into the Ophidia platform starts.

{
        "name": "ExampleW1",
        "author": "Foo",
        "abstract": "Simple workflow with a pause",
        "exec_mode": "sync",
        "ncores": "1",
        "tasks":
        [
                {
                        "name": "Pause",
                        "operator": "oph_wait",
                        "arguments": [ "timeout=10" ]
                },
                {
                        "name": "Import data",
                        "operator": "oph_importnc",
                        "arguments":
                        [
                                "measure=measure",
                                "src_path=source.nc"
                        ],
                        "dependencies":
                        [
                                { "task": "Pause" }
                        ]
                }
        ]
}

While the workflow is sleeping because of the pause, the graphical output of the command view is the following:

[Figure: Workflow with a pause]

Note that the disk associated with the waiting task is cyan.

During the waiting time the workflow status is also set to OPH_STATUS_WAITING rather than OPH_STATUS_RUNNING. This feature is of little use when OPH_WAIT sets a pause of pre-defined duration, but it makes it easy to detect workflows that are waiting for input data and could therefore remain stuck for a long time. This is an example:

{
        "name": "ExampleW2",
        "author": "Foo",
        "abstract": "Simple workflow with an interactive task",
        "exec_mode": "sync",
        "ncores": "1",
        "tasks":
        [
                {
                        "name": "Pause",
                        "operator": "oph_wait",
                        "arguments":
                        [
                                "type=input",
                                "message=Use OPH_INPUT to unblock this task"
                        ]
                },
                {
                        "name": "Import data",
                        "operator": "oph_importnc",
                        "arguments":
                        [
                                "measure=measure",
                                "src_path=source.nc"
                        ],
                        "dependencies":
                        [
                                { "task": "Pause" }
                        ]
                }
        ]
}

In this case the output of the command view, submitted during the (unbounded) waiting time, reports the message that the user set as an argument of OPH_WAIT.

[Response]:
Interactive task
¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯¯
Use OPH_INPUT to unblock this task

This message can be very useful to understand why a workflow is waiting and how to unblock it.

The workflow ExampleW2 is an example of a workflow with an interactive task.

The following workflow shows an example of the file-type interface. In this case the import is deferred until the data source is reachable. Note that timeout is the polling time, i.e. the period between two consecutive checks.

{
        "name": "ExampleW3",
        "author": "Foo",
        "abstract": "Simple workflow with source checking",
        "exec_mode": "sync",
        "ncores": "1",
        "tasks":
        [
                {
                        "name": "Check for data source availability",
                        "operator": "oph_wait",
                        "arguments":
                        [
                                "type=file",
                                "filename=http://localhost/source.nc",
                                "timeout=5"
                        ]
                },
                {
                        "name": "Import data",
                        "operator": "oph_importnc",
                        "arguments":
                        [
                                "measure=measure",
                                "src_path=http://localhost/source.nc"
                        ],
                        "dependencies":
                        [
                                { "task": "Check for data source availability" }
                        ]
                }
        ]
}

Input interface

The operator OPH_INPUT is very similar to OPH_SET. It can be used to set variables in the workflow environment or to change their values.

Unlike OPH_SET, the input interface can also be used to send commands to interactive tasks (based on the operator OPH_WAIT). In particular, it can end pauses, either from the command line or from other workflows.

For instance, to unblock the workflow ExampleW2 shown above (assuming that its identifier is 34), the following command has to be submitted via the Ophidia Terminal:

oph_input id=34

If the target workflow includes more than one interactive task, the argument taskname has to be used to specify the task that the command has to be delivered to.
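
As a rough illustration of how the arguments key and value of the two operators combine, the sketch below parses pipe-separated lists into variables and lets the pairs sent with OPH_INPUT override the defaults declared in OPH_WAIT. The helpers parse_pairs and resolve_variables are hypothetical, the cube identifier is a placeholder, and the requirement that both lists have the same length is an assumption of this sketch rather than documented server behaviour.

```python
def parse_pairs(arguments):
    """Turn ["key=a|b", "value=1|2"] into {"a": "1", "b": "2"}."""
    # Each argument has the form name=content; split only on the first "=".
    fields = dict(argument.split("=", 1) for argument in arguments)
    if "key" not in fields:
        return {}
    keys = fields["key"].split("|")
    values = fields.get("value", "").split("|")
    if len(keys) != len(values):
        # Assumption of this sketch: one value per key.
        raise ValueError("key and value lists differ in length")
    return dict(zip(keys, values))


def resolve_variables(wait_arguments, input_arguments):
    """Start from the OPH_WAIT defaults, then apply the pairs sent by OPH_INPUT."""
    variables = parse_pairs(wait_arguments)
    variables.update(parse_pairs(input_arguments))
    return variables


wait_arguments = ["type=input", "key=apex", "value=unknown"]

# The command only unblocks the task: the default value is kept.
print(resolve_variables(wait_arguments, ["id=34"]))
# {'apex': 'unknown'}

# The command also carries a value ("placeholder_pid" is a made-up identifier).
print(resolve_variables(wait_arguments, ["id=34", "key=apex", "value=placeholder_pid"]))
# {'apex': 'placeholder_pid'}
```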

From interactive to interleaving workflows

As described above, the wait and input interfaces can be used to set up interactive tasks, namely tasks that can be completed only after the user sends additional data (for example the values of a parameter list) or simply sends a continue command from the Ophidia Terminal. A workflow with interactive tasks is an interactive workflow.

Besides interactive workflows, the operators OPH_WAIT and OPH_INPUT can also enable interleaving workflows. In this case the completion of the interactive tasks is not triggered by the user, but rather by other workflows: the input task of a workflow sends data or a command to a waiting task of another workflow, and vice versa. It is also possible that more than two workflows interact in this way.

For instance, consider the following example with two interleaving workflows named, respectively, Master and Slave.

{
    "name": "Master",
    "author": "Foo",
    "abstract": "Master workflow in an interleaving session",
    "exec_mode": "sync",
    "ncores": "1",
    "tasks": [
        {
                "name": "CreateContainer",
                "operator": "oph_createcontainer",
                "arguments": [
                        "container=demo",
                        "dim=lat|lon|time",
                        "hierarchy=oph_base|oph_base|oph_time",
                        "vocabulary=CF"
                ],
                "on_error": "skip"
        },
        {
                "name": "CreateCube",
                "operator": "oph_randcube",
                "arguments": [
                        "compressed=no",
                        "container=demo",
                        "dim=lat|lon|time",
                        "dim_size=10|10|360",
                        "exp_ndim=2",
                        "measure=random",
                        "measure_type=float",
                        "nfrag=10",
                        "ntuple=10",
                        "concept_level=c|c|d"
                ],
                "dependencies": [
                        { "task": "CreateContainer" }
                ]
        },
        {
                "name": "WaitForCube",
                "operator": "oph_wait",
                "arguments": [
                        "type=input",
                        "message=Insert the PID of the datacube to be compared",
                        "key=cubo_pid|cooperative_workflow_id",
                        "value=@CUBE|@OPH_WORKFLOW_ID"
                ],
                "dependencies": [
                        { "task": "CreateCube", "type": "single" }
                ]
        },
        {
                "name": "Intercomparison",
                "operator": "oph_intercube",
                "arguments": [
                        "cube2=@cubo_pid",
                        "operation=sub"
                ],
                "dependencies": [
                        { "task": "CreateCube", "type": "single" },
                        { "task": "WaitForCube" }
                ]
        },
        {
                "name": "Reduce",
                "operator": "oph_reduce",
                "arguments": [ "operation=max" ],
                "dependencies": [
                        { "task": "Intercomparison", "type": "single" }
                ]
        },
        {
                "name": "Merge",
                "operator": "oph_merge",
                "arguments": [ ],
                "dependencies": [
                        { "task": "Reduce", "type": "single" }
                ]
        },
        {
                "name": "Aggregate",
                "operator": "oph_aggregate",
                "arguments": [
                        "operation=max",
                        "description=Maximum difference"
                ],
                "dependencies": [
                        { "task": "Merge", "type": "single" }
                ]
        },
        {
                "name": "SendPID",
                "operator": "oph_input",
                "arguments": [
                        "id=@cooperative_workflow_id",
                        "key=apex",
                        "value=@CUBE"
                ],
                "dependencies": [
                        { "task": "Aggregate", "type": "single" }
                ]
        }
   ]
}

{
    "name": "Slave",
    "author": "Foo",
    "abstract": "Slave workflow in an interleaving session",
    "exec_mode": "sync",
    "ncores": "1",
    "tasks": [
        {
                "name": "CreateContainer",
                "operator": "oph_createcontainer",
                "arguments": [
                        "container=demo",
                        "dim=lat|lon|time",
                        "hierarchy=oph_base|oph_base|oph_time",
                        "vocabulary=CF"
                ],
                "on_error": "skip"
        },
        {
                "name": "CreateCube",
                "operator": "oph_randcube",
                "arguments": [
                        "compressed=no",
                        "container=demo",
                        "dim=lat|lon|time",
                        "dim_size=10|10|360",
                        "exp_ndim=2",
                        "measure=random",
                        "measure_type=float",
                        "nfrag=10",
                        "ntuple=10",
                        "concept_level=c|c|d"
                ],
                "dependencies": [
                        { "task": "CreateContainer" }
                ]
        },
        {
                "name": "SendPID",
                "operator": "oph_input",
                "arguments": [
                        "id=$1",
                        "key=cubo_pid|cooperative_workflow_id",
                        "value=@CUBE|@OPH_WORKFLOW_ID"
                ],
                "dependencies": [
                        { "task": "CreateCube", "type": "single" }
                ]
        },
        {
                "name": "WaitForCube",
                "operator": "oph_wait",
                "arguments": [
                        "type=input",
                        "message=Insert the PID of the datacube to be displayed",
                        "key=apex",
                        "value=unknown"
                ],
                "dependencies": [
                        { "task": "SendPID" }
                ]
        },
        {
                "name": "ShowResult",
                "operator": "oph_explorecube",
                "arguments": [ "cube=@apex" ],
                "dependencies": [
                        { "task": "WaitForCube" }
                ]
        },
        {
                "name": "DropAllCubes",
                "operator": "oph_delete",
                "arguments": [ "cube=[*]" ],
                "dependencies": [
                        { "task": "ShowResult" }
                ]
        }
   ]
}

The Master workflow has to be started first. After creating a container and a cube inside it, the Master workflow begins to wait for the identifier (PID) of a cube from the other workflow. The following figure reports the workflow status during this waiting time.

[Figure: Snapshot of Master workflow while waiting for reply from Slave workflow]

Now the Slave workflow is started. It creates a new cube in the same container and sends its identifier to the Master. The target of this message is given by id=$1 in the task SendPID, so the identifier of the Master workflow has to be supplied as the first argument when the Slave is submitted. Of course, container creation fails in this case because the container demo has already been created by the Master; since the task sets on_error to skip, it is marked as skipped.

Once the Master receives the PID of the cube created by the Slave, an intercomparison task is executed. Then the Master evaluates the apex of the resulting cube (i.e. the maximum difference between corresponding points), sends the PID of the apex cube to the Slave and finishes. The delivery of the PID of the apex cube from Master to Slave exploits another interactive task, i.e. another pair of wait-input tasks.

Finally, the Slave prints the apex cube in a JSON file (the response associated with the task ShowResult) and deletes all the cubes using a massive operation.
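
The exchange between the two workflows can be mimicked with a toy model. It is only a sketch of the message flow, not of Ophidia itself: each workflow is a Python thread, oph_input is modelled as a put on the mailbox of the target workflow, oph_wait as a blocking get, and the cube names are made-up strings standing in for real PIDs.

```python
import queue
import threading

# One mailbox per workflow, indexed by a stand-in for the workflow identifier.
mailboxes = {"master": queue.Queue(), "slave": queue.Queue()}
log = []


def master():
    my_cube = "master_cube"                        # stands in for the CreateCube output
    message = mailboxes["master"].get(timeout=5)   # WaitForCube
    log.append(("master", "received", message["cubo_pid"]))
    # Intercomparison, Reduce, Merge and Aggregate collapsed into one label.
    apex = f"max({my_cube} - {message['cubo_pid']})"
    # SendPID: reply to the workflow named in the received message.
    mailboxes[message["cooperative_workflow_id"]].put({"apex": apex})


def slave(master_id):
    my_cube = "slave_cube"                         # stands in for the CreateCube output
    # SendPID: master_id plays the role of the argument $1.
    mailboxes[master_id].put({"cubo_pid": my_cube,
                              "cooperative_workflow_id": "slave"})
    message = mailboxes["slave"].get(timeout=5)    # WaitForCube
    log.append(("slave", "received", message["apex"]))  # ShowResult


master_thread = threading.Thread(target=master)
master_thread.start()                              # the Master starts first and waits
slave_thread = threading.Thread(target=slave, args=("master",))
slave_thread.start()
master_thread.join()
slave_thread.join()

for entry in log:
    print(entry)
# ('master', 'received', 'slave_cube')
# ('slave', 'received', 'max(master_cube - slave_cube)')
```

Each workflow blocks exactly once, on its own wait task, and is released by the input task of the other one, which is the pattern the two pairs of wait-input tasks implement.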

The full procedure is represented graphically in the following image, where the virtual communications between workflows, implemented by means of interactive tasks, are shown as red arrows.

[Figure: Interleaving workflows]