Workflows: Selection interface

Usually a workflow consists of a list of tasks to be executed back-to-back according to pre-defined dependency rules related to data passing (e.g. the output of a task is the input of another task) or data availability (e.g. a dataset can be downloaded from a repository only when it has been published on that repository, a dataset can be deleted only when it has been processed by all the intended tasks, etc.). As a consequence, the workflow can be represented as a “static” task graph.

The adoption of the operator OPH_FOR enables a workflow to include parallel or iterative tasks (which can be executed multiple times, possibly processing several datasets), thus providing greater dynamism while preserving the simplicity of the representation.
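
As a reference, an iterative block based on OPH_FOR and OPH_ENDFOR could look like the following minimal sketch: the task names, the list of values and the use of $1 as the PID of the input cube are purely illustrative, and the complete set of arguments supported by OPH_FOR is described in the operator documentation.

        {
                "name": "Loop start",
                "operator": "oph_for",
                "arguments": [ "key=index", "values=1|2|3" ]
        },
        {
                "name": "Subset data for current index",
                "operator": "oph_subset",
                "arguments": [ "cube=$1", "subset_dims=time", "subset_filter=@index" ],
                "dependencies": [ { "task": "Loop start" } ]
        },
        {
                "name": "Loop end",
                "operator": "oph_endfor",
                "arguments": [ ],
                "dependencies": [ { "task": "Subset data for current index" } ]
        }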

The Selection interface provides further flexibility by enabling the Workflow manager to execute one or more tasks based on boolean conditions that are checked at run-time and may depend on input parameters, data, metadata, etc.

The development of the Selection interface involved the design of new Ophidia operators:

  1. OPH_IF
  2. OPH_ELSEIF
  3. OPH_ELSE
  4. OPH_ENDIF

Similarly to other flow control operators (such as OPH_FOR), they do not process data or metadata directly, but they can be adopted to enable (or skip) the execution of a set of tasks based on run-time conditions. For instance, the operator OPH_IF evaluates a conditional expression (possibly including mathematical and logic operators) such as

50 < 100
@A > 5
@B + 273 < 30
@A <= @B

where the variables @A and @B are set using OPH_SET. OPH_IF enables the execution of its child tasks only in case the test is positive; otherwise, the child tasks are skipped.
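
For instance, the variable @A used above could be defined and tested by a pair of tasks like the following minimal sketch; the task names and the assigned value are purely illustrative.

        {
                "name": "Set A",
                "operator": "oph_set",
                "arguments": [ "key=A", "value=10" ]
        },
        {
                "name": "Check A",
                "operator": "oph_if",
                "arguments": [ "condition=@A > 5" ],
                "dependencies": [ { "task": "Set A" } ]
        }

The tasks depending on Check A (up to the corresponding OPH_ENDIF) would then be executed only when @A is greater than 5.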

The next sections report some sample workflows where the Selection interface is used. First, a simple workflow with an optional task is considered (one selection block). Then, a workflow with two disjoint tasks is shown. Finally, a workflow with more than two disjoint alternatives is considered.

Selection statement with one selection block

Let’s consider a workflow that evaluates the maximum value of the cube received as input and publishes the cube only in case the maximum value is higher than a certain threshold.

Assuming that the input cube consists of one fragment, the workflow should be similar to the following one.

{
        "name": "ExampleS1",
        "author": "Foo",
        "abstract": "Simple workflow with one selection block",
        "exec_mode": "sync",
        "ncores": "1",
        "cube": "$1",
        "tasks":
        [
                {
                        "name": "Data reduction",
                        "operator": "oph_reduce",
                        "arguments": [ "cube=$1", "operation=max" ]
                },
                {
                        "name": "Data aggregation",
                        "operator": "oph_aggregate",
                        "arguments": [ "operation=max" ],
                        "dependencies":
                        [
                                { "task": "Data reduction",
                                  "type": "single" }
                        ]
                },
                {
                        "name": "Show the maximum value",
                        "operator": "oph_explorecube",
                        "arguments": [ ],
                        "dependencies":
                        [
                                { "task": "Data aggregation",
                                  "type": "single" }
                        ]
                },
                {
                        "name": "Extract the maximum value",
                        "operator": "oph_set",
                        "arguments": [ "key=apex", "value=explorecube_data(1,3)"],
                        "dependencies":
                        [
                                { "task": "Show the maximum value" }
                        ]
                },
                {
                        "name": "Check the maximum value",
                        "operator": "oph_if",
                        "arguments": [ "condition=@apex > $2" ],
                        "dependencies":
                        [
                                { "task": "Extract the maximum value" }
                        ]
                },
                {
                        "name": "Publish the cube",
                        "operator": "oph_publish",
                        "arguments": [ "cube=$1" ],
                        "dependencies":
                        [
                                { "task": "Check the maximum value" }
                        ]
                },
                {
                        "name": "Selection block end",
                        "operator": "oph_endif",
                        "arguments": [ ],
                        "dependencies":
                        [
                                { "task": "Publish the cube" }
                        ]
                }
        ]
}

where $1 is the PID of the input cube and $2 is the threshold.
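
As a reference, assuming the workflow is saved in a file named ExampleS1.json and submitted through the Ophidia terminal (oph_term), the two parameters could be passed as positional arguments, for instance:

        ./ExampleS1.json http://hostname/1/1 300

where http://hostname/1/1 stands for a generic cube PID and 300 is an arbitrary threshold; both values are purely illustrative.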

The task Extract the maximum value sets the variable @apex to the maximum value, which is retrieved through explorecube_data(1,3): it is assumed that the value appears in the first row and third column of the OPH_EXPLORECUBE output, as follows.

[Response]:
measure_name
¯¯¯¯¯¯¯¯¯¯¯¯
+=====+=====+=================+
| lat | lon | measure_name    |
+=====+=====+=================+
| ALL | ALL | 998.9388468670  |
+=====+=====+=================+

The execution of the task Check the maximum value consists of the evaluation of the boolean expression @apex > $2, i.e. the threshold check. The task Publish the cube is executed only in case the boolean expression is TRUE; otherwise, the task is skipped.

The final task Selection block end simply closes the selection block; no operation on data is executed.

The following figure shows the graphical representation of this workflow obtained with the command check.

Workflow with a selection interface with one block

To emphasize the optional section, the selection block is enclosed in a box and the workflow management operators are shown as diamonds rather than circles.

During the execution and after the completion of a workflow, diamonds and circles are filled as usual (see Basic usage); however, the tasks belonging to unselected blocks are shown as grey disks, like unscheduled tasks. For instance, in case the threshold is too high, the final representation of the sample workflow considered above will be the following.

Workflow with an unselected task

The approach described above is useful, for example, in the case of workflows with optional tasks: one or more tasks can be enabled by flags set as input parameters.

Selection statement with two selection blocks

In the following workflow the Selection interface is used to code two possible implementations of a task that imports data into the Ophidia platform from an external source, i.e. a file named source.nc:

  1. import the whole dataset and, then, extract the part to be shown on screen (option A);
  2. import only the subset from the input file (option B).

The actual implementation to be adopted is selected by means of the input parameter $1: any non-zero numerical value selects option A, whereas 0 selects option B.

{
        "name": "ExampleS2",
        "author": "Foo",
        "abstract": "Simple workflow with two selection blocks",
        "exec_mode": "sync",
        "ncores": "1",
        "tasks":
        [
                {
                        "name": "IF block",
                        "operator": "oph_if",
                        "arguments": [ "condition=$1" ]
                },
                {
                        "name": "Import data",
                        "operator": "oph_importnc",
                        "arguments":
                        [
                                "measure=measure",
                                "src_path=source.nc"
                        ],
                        "dependencies":
                        [
                                { "task": "IF block" }
                        ]
                },
                {
                        "name": "Subset data",
                        "operator": "oph_subset",
                        "arguments":
                        [
                               "subset_dims=lat|lon",
                               "subset_filter=1:10|1:10"
                        ],
                        "dependencies":
                        [
                                { "task": "Import data",
                                  "type": "single" }
                        ]
                },
                {
                        "name": "ELSE block",
                        "operator": "oph_else",
                        "arguments": [  ],
                        "dependencies":
                        [
                                { "task": "IF block" }
                        ]
                },
                {
                        "name": "Import and subset data",
                        "operator": "oph_importnc",
                        "arguments":
                        [
                                "measure=measure",
                                "src_path=source.nc",
                                "subset_dims=lat|lon",
                                "subset_filter=1:10|1:10"
                        ],
                        "dependencies":
                        [
                                { "task": "ELSE block" }
                        ]
                },
                {
                        "name": "Selection block end",
                        "operator": "oph_endif",
                        "arguments": [ ],
                        "dependencies":
                        [
                                { "task": "Subset data",
                                  "type": "single" },
                                { "task": "Import and subset data",
                                  "type": "single" }
                        ]
                },
                {
                        "name": "Show data",
                        "operator": "oph_explorecube",
                        "arguments": [ ],
                        "dependencies":
                        [
                                { "task": "Selection block end",
                                  "type": "single" }
                        ]
                }
        ]
}

As shown in the next figure, task dependencies have to be set according to the following rules:

  1. the task with the operator OPH_ELSE has to be a child of the task with the operator OPH_IF
  2. the set of tasks belonging to any branch that starts from OPH_IF and ends at OPH_ENDIF, excluding the tasks belonging to branches that start from OPH_ELSE, is the sub-workflow to be executed in case the condition set for OPH_IF is satisfied
  3. the set of tasks belonging to any branch that starts from OPH_ELSE and ends at OPH_ENDIF is the sub-workflow to be executed in case the condition set for OPH_IF is not satisfied

Workflow with a selection interface with two blocks

The following figure shows the workflow at the end of an execution in which option B (import and subset with only one task) has been selected. As expected, the tasks related to option A are shown as unselected.

Workflow with a selection interface with two blocks

An interesting application of this approach is the setting of alternate workflows, for example one for production and one for debugging purposes.

Selection statement with more than two selection blocks

The following figure reports a workflow in which all the operators of the Selection interface are adopted. The aim is to select one task among a set of tasks based on the value of an input parameter, similarly to a classic switch-case statement.

Workflow with a selection interface with more than two blocks

Task dependencies are set according to the following additional rules:

  1. the task OPH_ELSEIF has to be a child of OPH_IF or of another OPH_ELSEIF
  2. the task OPH_ELSE can be a child of OPH_ELSEIF
  3. the set of tasks belonging to any branch that starts from OPH_ELSEIF and ends at OPH_ENDIF, excluding the tasks belonging to branches that start from other inner OPH_ELSEIF or OPH_ELSE tasks, is the sub-workflow to be executed in case the condition set for OPH_ELSEIF is satisfied
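
As a reference, the tasks section of such a switch-case-like workflow could be structured as in the following minimal sketch; the task names, the conditions on $1 and the operators invoked in the branches (here OPH_LIST with different levels) are purely illustrative.

        {
                "name": "Case 1",
                "operator": "oph_if",
                "arguments": [ "condition=$1 <= 1" ]
        },
        {
                "name": "Task for case 1",
                "operator": "oph_list",
                "arguments": [ "level=1" ],
                "dependencies": [ { "task": "Case 1" } ]
        },
        {
                "name": "Case 2",
                "operator": "oph_elseif",
                "arguments": [ "condition=$1 <= 2" ],
                "dependencies": [ { "task": "Case 1" } ]
        },
        {
                "name": "Task for case 2",
                "operator": "oph_list",
                "arguments": [ "level=2" ],
                "dependencies": [ { "task": "Case 2" } ]
        },
        {
                "name": "Default case",
                "operator": "oph_else",
                "arguments": [ ],
                "dependencies": [ { "task": "Case 2" } ]
        },
        {
                "name": "Task for default case",
                "operator": "oph_list",
                "arguments": [ "level=0" ],
                "dependencies": [ { "task": "Default case" } ]
        },
        {
                "name": "Selection block end",
                "operator": "oph_endif",
                "arguments": [ ],
                "dependencies":
                [
                        { "task": "Task for case 1" },
                        { "task": "Task for case 2" },
                        { "task": "Task for default case" }
                ]
        }

In this sketch the first branch is selected when $1 is lower than or equal to 1, the second branch when $1 is greater than 1 and lower than or equal to 2, and the default branch in all the other cases.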

Also in this case, an application of this approach is the setting of alternate workflows.