Workflows: Advanced features

Single and massive operations

A workflow consists of one or more tasks and each task can be a simple operator or a massive operation. There is no difference in syntax between the two task types, except that a massive task includes one or more filtering rules, enclosed in square brackets, among the operator's arguments.
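
For instance, a massive task might look like the following sketch, where the filtering rule enclosed in square brackets selects the set of input cubes (the filter shown here, on the container name, is purely illustrative; refer to the massive operation documentation for the admitted filter keys):

{
        "name": "Massive reduction",
        "operator": "oph_reduce",
        "arguments": [ "operation=avg", "cube=[container=mycontainer]" ]
}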

Dependency specification

With regard to dependency specification, the schema provides three different types of dependency, identified by the following keywords:

  1. single: use this type of dependency when the child task exploits only one output of the parent task; if more outputs are produced, an error is raised and the workflow aborts;
  2. all: use this keyword when the child task processes all the outputs of the parent task (possibly more than one), for example in case of dependencies between massive operations;
  3. embedded: use this type to specify a simple flow dependency, namely the child task has to begin only after the parent task has finished, with no (implicit) dependency on the outputs of the parent task; in this case all the necessary inputs are already available to the child task or, as the keyword suggests, are embedded into its definition (for example the argument cube is already set in the JSON array arguments of the child task).

For the first two options, it is also possible to specify the particular operator argument whose value depends on the output produced by another task. For example, it is possible to use the identifier of a datacube produced elsewhere in the workflow as the value assigned to the argument cube, common to most Ophidia operators, and then perform a transformation on those data, with no a-priori knowledge of the actual cube identifier, which is generated at runtime. By default, the parameter argument of a dependency is set to cube, so it can usually be omitted, for instance when the workflow is a linear chain.
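
For instance, the following dependency fragment (the task name is illustrative) binds the output of the parent task to the argument cube of the child task; since cube is the default, the key argument could be omitted here:

"dependencies":
[
        { "task": "Import data", "type": "single", "argument": "cube" }
]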

When the user submits a workflow through Ophidia Terminal, the environment variables OPH_SESSION_ID, OPH_EXEC_MODE, OPH_NCORES, OPH_CWD and OPH_DATACUBE are used to set a number of global attributes and task arguments (without changing the original JSON Request), if not already set.

Workflows and Terminal

Each user submission is a request for the execution of a workflow. In fact, even single commands or massive operations are actually converted (by Ophidia Terminal) into JSON Requests for the execution of simple workflows with only one task each. For example, the command

oph_list level=2;

is converted into the following request

{
        "ncores":"1",
        "cwd":"/",
        "command":"oph_list level=2;",
        "name":"oph_list",
        "author":"oph-test",
        "abstract":"Workflow generated by Oph_Term to wrap a command",
        "exec_mode":"sync",
        "tasks":
        [
                {
                        "name":"Task 0",
                        "operator":"oph_list",
                        "arguments": ["level=2","ncores=1","cwd=/"]
                }
        ]
}

There is a one-to-one translation between the submission string as used with the Ophidia Terminal and the definition of a task in a workflow file.

By using the terminal, a user submits an operator by specifying its name and, after a white space, a series of semicolon-separated key-value pairs. In the corresponding workflow there are two distinct keywords, one for the operator and one for its arguments, and all key-value pairs are listed in a JSON array without semicolons.

Note that some global attributes of the workflow generated by the terminal are set automatically based on environment variables: the parameter author is set to the username, whereas command is set to the actual submission string written by the user. Of course, the command arguments are split and then inserted into the related array.

There is an important difference between the output of a single (or massive) operation and the output of a generic workflow with multiple tasks. In the first case Ophidia Server returns the output of the specific task; otherwise, the workflow output is a table reporting some information about each task: status, identifier (Marker ID), name, etc. The user can obtain the output of a specific task by its identifier using the command view.

JSON Requests and JSON Responses

A JSON Request has to be compliant with the schema reported here. The JSON schema defines a set of keywords that can be used to define tasks, link them together with different types of dependencies, set global variables, specify the system behaviour in case of errors, and so on.

Comments can be inserted in JSON Requests; in particular, the format used in common programming languages like C and Java is adopted (see the following example).

{
        "ncores":"1",
        "cwd":"/",
        "command":"oph_list level=2;",
        "name":"oph_list",
        "author":"oph-test",
        "abstract":"Workflow generated by Oph_Term to wrap a command",
/*
        This is a comment block
*/
        "exec_mode":"sync",
        "tasks":
        [
                {
                        "name":"Task 0", // This is an inline comment
                        "operator":"oph_list",
                        "arguments": ["level=2","ncores=1","cwd=/"]
                }
        ]
}

The schema of a JSON Response depends on the specific operator. It may represent a text, a table, a graph or other more complex data structures. Usually a JSON Response represents a table; in this case it is compliant with the schema reported here.
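
As a rough sketch, a tabular JSON Response contains one or more grid objects similar to the following, which anticipates the OPH_EXPLORECUBE output used later in this page (the structure is heavily simplified and the field names are indicative; refer to the schema for the exact layout):

{
        "response":
        [
                {
                        "objclass": "grid",
                        "objkey": "explorecube_data",
                        "objcontent":
                        [
                                {
                                        "title": "temperature",
                                        "rowkeys": [ "lat", "lon", "temperature" ],
                                        "rowvalues": [ [ "ALL", "ALL", "288.2395324707" ] ]
                                }
                        ]
                }
        ]
}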

Handling task errors

The workflow manager allows the user to specify the preferred behaviour in case of error events, task failures, etc. By default, workflow execution is blocked immediately after an error occurs, but it is possible to ignore the error for a particular task (and, possibly, its descendants) and continue with the execution of the remaining tasks, as well as to repeat the execution of the failed task before breaking the workflow. To exploit this feature, use the argument on_error of each task as follows.

  1. skip: the task is considered successful in any case and its descendants can be executed (provided that other dependencies are not broken);
  2. continue: in case of errors the task is considered failed and its descendants are not executed, but workflow execution will continue and may eventually end with success;
  3. break: workflow execution is aborted in case of task failure (default);
  4. repeat N: the task will be re-executed in case of errors up to N times before aborting the workflow.

Consider setting it to skip when temporary containers or cubes have to be created.
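
For instance, a task creating a container that may already exist could be marked as follows, so that a failure does not block the rest of the workflow (the container name and dimension list are purely illustrative):

{
        "name": "Create work container",
        "operator": "oph_createcontainer",
        "arguments": [ "container=work", "dim=lat|lon|time" ],
        "on_error": "skip"
}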

The default value of on_error can be set as global attribute as follows.

{
        "name": "Example5",
        "author": "Foo",
        "abstract": "Simple workflow with automatic repetition in case of errors",
        "exec_mode": "sync",
        "ncores": "1",
        "cube": "http://hostname/1/1",
        "on_error": "repeat 2",
        "tasks":
        [
                {
                        "name": "Data reduction",
                        "operator": "oph_reduce",
                        "arguments": [ "operation=avg" ]
                }
        ]
}

Intermediate results

Although a workflow may generate a large amount of data, usually only the final results are analyzed by the user, exported as files or published on the web. In these cases the intermediate results can be deleted (e.g. to save disk space) as soon as the workflow ends, since they are no longer useful. By using the parameter on_exit, the user can select the cubes that will be dropped when the workflow ends. Consider the following example

{
        "name": "Example6",
        "author": "Foo",
        "abstract": "Simple workflow with three tasks",
        "exec_mode": "sync",
        "ncores": "1",
        "cube": "http://hostname/1/1",
        "tasks":
        [
                {
                        "name": "Extract maximum value",
                        "operator": "oph_reduce",
                        "arguments": [ "operation=max" ],
                        "on_exit": "oph_delete"
                },
                {
                        "name": "Extract minimum value",
                        "operator": "oph_reduce",
                        "arguments": [ "operation=min" ],
                        "on_exit": "oph_delete"
                },
                {
                        "name": "Evaluate max-min range",
                        "operator": "oph_intercube",
                        "arguments": [ "operation=sub" ],
                        "dependencies":
                        [
                                { "task": "Extract maximum value", "type": "single", "argument": "cube" },
                                { "task": "Extract minimum value", "type": "single", "argument": "cube2" }
                        ]
                }
        ]
}

where the intermediate results of the data reduction are dropped at the end of the workflow execution by setting the task parameter on_exit to oph_delete.

The default value of on_exit can be set as global attribute. Admitted values are:

  1. oph_delete: remove the output cube
  2. oph_deletecontainer: remove the output container (valid only for OPH_CREATECONTAINER)
  3. nop: no operation has to be applied (default).
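
For instance, a minimal sketch setting the global default could look like the following (the structure mirrors the earlier examples and the single reduction task is just a placeholder):

{
        "name": "CleanupExample",
        "author": "Foo",
        "abstract": "Simple workflow dropping its output cube on exit",
        "exec_mode": "sync",
        "ncores": "1",
        "cube": "http://hostname/1/1",
        "on_exit": "oph_delete",
        "tasks":
        [
                {
                        "name": "Data reduction",
                        "operator": "oph_reduce",
                        "arguments": [ "operation=avg" ]
                }
        ]
}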

In case multiple cubes need to be removed, the workflow engine performs a massive operation using the operator OPH_DELETE. Containers are removed simply by submitting the operator OPH_DELETECONTAINER.

Host partitions

Most Ophidia data operators save output datacubes on the same analytics nodes (or a subset of them) where the input data are stored. A system catalog is adopted to manage the internal hierarchical data model and to select the analytics nodes correctly, so that the user does not have to list the nodes involved in an operation.

The analytics nodes involved in Ophidia import operators can be chosen by the user by selecting a (pre-registered) host partition. Refer to the operator documentation for further information.
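
For instance, assuming the import operator accepts a host_partition argument as described in its documentation, a single import could be bound to a specific partition as follows (the file, measure and partition names are illustrative):

{
        "name": "Import on a given partition",
        "operator": "oph_importnc",
        "arguments": [ "src_path=file1.nc", "measure=measure", "host_partition=main" ]
}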

If two or more datacubes created by Ophidia import operators within a workflow are processed by binary operators (such as OPH_INTERCUBE), it is important that the corresponding data are stored on the same host partition and using the same node order (in case of data fragmentation). For this reason, the user has to specify the intended host partition by setting the global variable host_partition. By default Ophidia uses the first available host partition and selects the analytics nodes according to a round-robin discipline, so the selected nodes could be different each time.

For instance, consider the following workflow where two datasets are imported and then compared with each other (the partition name main needs to be set to an available partition).

{
        "name": "Example7",
        "author": "Foo",
        "abstract": "Simple workflow with three tasks",
        "host_partition": "main",
        "tasks":
        [
                {
                        "name": "Input dataset 1",
                        "operator": "oph_importnc",
                        "arguments": [ "src_path=file1.nc", "measure=measure" ]
                },
                {
                        "name": "Input dataset 2",
                        "operator": "oph_importnc",
                        "arguments": [ "src_path=file2.nc", "measure=measure" ]
                },
                {
                        "name": "Compare the datasets",
                        "operator": "oph_intercube",
                        "arguments": [ "operation=sub" ],
                        "dependencies":
                        [
                                { "task": "Input dataset 1", "type": "single", "argument": "cube" },
                                { "task": "Input dataset 2", "type": "single", "argument": "cube2" }
                        ]
                }
        ]
}

In this example, the comparison operator can be executed only if host_partition is set; otherwise, the analytics nodes selected during the import operations could be different, causing the final operation to fail.

Using runtime variables

Many Ophidia operators are very useful when called in interactive mode. As an example consider OPH_CUBESCHEMA: the output of this operator includes a number of metadata associated with a datacube that the user may wish to know before launching a new command while using Ophidia Terminal. Another example is OPH_LIST, which is used to extract the set of datacubes in a given folder and return it to the user as a grid.

Of course these metadata operators can also represent tasks of a workflow, but their outputs (JSON Responses) will neither be returned to the user nor processed further in the rest of the workflow.

Actually, the JSON Response of every operator can be processed within a workflow, for instance to set the parameters of a child task. To this end, consider a task based on OPH_SET.

Basically, this operator allocates a user-defined variable and sets it to the value of a (scalar or vector) element of the JSON Response related to a parent task. A constant string/numeric value or the result of a mathematical expression can also be assigned.

After setting a variable, its name can then be used in the rest of the workflow to refer to the value of that field. The scope of a variable is limited to the workflow in which it is defined.
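
A minimal sketch assigning a constant value is shown below (the variable name threshold and the value are purely illustrative); subsequent tasks can then refer to it as @threshold or @{threshold}:

{
        "name": "Set threshold",
        "operator": "oph_set",
        "arguments": [ "key=threshold", "value=273.15" ]
}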

The syntax adopted to select a field of a JSON Response is a flexible dot-notation. In case of reference to grids (JSON Responses usually consist of one or more grids), the following formats can be used

objkey.gridname.colname(rowindex)
objkey.gridname(rowindex,colindex)

where the labels are:

  1. objkey (i.e. object key) is a reference to a specific output of the parent task and can be omitted in case only one object is available;
  2. gridname is the table name;
  3. colname is the name of the column containing the field to be selected;
  4. rowindex is the (numeric) index of the row containing the field to be selected (use * in case all the items of a column have to be selected, use end for the last row);
  5. colindex is the (numeric) index of the column containing the field to be selected (use * in case all the items of a row have to be selected, use end for the last column).
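
For instance, the selections used later in this page follow these two formats, respectively (the first one addresses a field by column name, the second one by numeric indexes):

cubeschema_cubeinfo.Datacube Information.MEASURE TYPE(1)
explorecube_data.temperature(1,end)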

The selected value can be accessed by means of @name or @{name}, where name is the variable name (i.e. the value of the parameter key of OPH_SET). Variable names should comply with IEEE Std 1003.1-2001 conventions, unless brackets {} are adopted to refer to them. In this case, the name of a variable can even be obtained from the value of other variables: this option enables the definition of complex variables (e.g. arrays) and is particularly useful in case of selection or iterative tasks. For instance, the value of the variable @{suffix} determines the name of the variable @{prefix @{suffix}}: if the former is “example”, the latter will be “prefix example”. During workflow execution, variable substitution follows the rule “inner variables are substituted before outer variables”.

The following variables are pre-defined in any workflow without calling OPH_SET explicitly:

  1. OPH_SESSION_ID (e.g. http://hostname/ophidia/sessions/8912794871982734981234/experiment)
  2. OPH_SESSION_CODE (e.g. 8912794871982734981234)
  3. OPH_WORKFLOW_ID (workflow identifier)
  4. OPH_MARKER_ID (task identifier)
  5. OPH_SERVER_HOST
  6. OPH_SERVER_PORT
  7. OPH_USER (used to access Ophidia)
  8. OPH_PASSWD (used to access Ophidia)
  9. OPH_OS_USER (identifier used by the operating system to identify the user that submits requests to Ophidia Analytics Framework)
  10. OPH_NCORES
  11. OPH_NHOST

In addition, each argument of a task can be considered as a runtime variable and, hence, its value can be assigned to other arguments. The key associated with this virtual variable is the name of the corresponding argument in capital letters. Hence, use @CUBE to get the value of the argument cube, @LEVEL for level, @OPERATION for operation and so on.

For instance, in the following task

{
        "name": "Set all data to zero",
        "operator": "oph_intercube",
        "arguments": [
                "cube2=@CUBE",
                "operation=sub"
        ],
        "dependencies":
        [
                { "task": "Previous task", "type": "single", "argument": "cube" },
        ]
}

the value of argument cube is assigned to argument cube2.

The following workflow shows the use of OPH_SET to set the variable datatype to the field MEASURE TYPE in the output of OPH_CUBESCHEMA. The variable is then used to set the first parameter of the primitive OPH_SUM_SCALAR, so that the conversion can be done independently of the data type of the input cube.

{
        "name": "Example8",
        "author": "Foo",
        "abstract": "Simple workflow with a variable",
        "exec_mode": "sync",
        "ncores": "1",
        "cube": "http://hostname/1/1",
        "tasks":
        [
                {
                        "name": "CubeSchema",
                        "operator": "oph_cubeschema",
                        "arguments": [ ]
                },
                {
                        "name": "Set",
                        "operator": "oph_set",
                        "arguments": [ "key=datatype", "value=cubeschema_cubeinfo.Datacube Information.MEASURE TYPE(1)" ],
                        "dependencies": [ { "task":"CubeSchema" } ]
                },
                {
                        "name": "Conversion",
                        "operator": "oph_apply",
                        "arguments": [ "query=oph_sum_scalar('oph_@datatype','oph_double',measure,273.15)" ],
                        "dependencies": [ { "task":"Set" } ]
                }
        ]
}

As an additional example, consider the following workflow. In this case OPH_SET is used to select the mean value meanvalue along the implicit dimensions of the input cube and, then, to evaluate deviations. It is assumed that:

  1. two dimensions are explicit (e.g. latitude and longitude) and one dimension is implicit (e.g. time);
  2. the input cube consists of one fragment only;
  3. the measure name is temperature.

{
        "name": "Example9",
        "author": "Foo",
        "abstract": "Workflow used to evaluate deviations from the mean",
        "exec_mode": "sync",
        "ncores": "1",
        "cube": "http://hostname/1/1",
        "tasks":
        [
                {
                        "name": "Reduction",
                        "operator": "oph_reduce",
                        "arguments": [ "operation=avg" ]
                },
                {
                        "name": "Aggregation",
                        "operator": "oph_aggregate",
                        "arguments": [ "operation=avg" ],
                        "dependencies": [ { "task":"Reduction", "type":"single" } ]
                },
                {
                        "name": "ExploreCube",
                        "operator": "oph_explorecube",
                        "arguments": [ ],
                        "dependencies": [ { "task":"Aggregation", "type":"single" } ]
                },
                {
                        "name": "Set",
                        "operator": "oph_set",
                        "arguments": [ "key=meanvalue", "value=temperature(1,3)" ],
                        "dependencies": [ { "task":"ExploreCube" } ]
                },
                {
                        "name": "Apply",
                        "operator": "oph_apply",
                        "arguments": [ "query=oph_sum_scalar(measure,-@meanvalue)","measure_type=auto" ],
                        "dependencies": [ { "task":"Set" } ]
                }
        ]
}

Note that the output of the task OPH_EXPLORECUBE is similar to

[Response]:
temperature
¯¯¯¯¯¯¯¯¯¯¯
+=====+=====+================+
| lat | lon | temperature    |
+=====+=====+================+
| ALL | ALL | 288.2395324707 |
+=====+=====+================+

so that the name of the output table is temperature and the mean value obtained by the reduction is in position (1,3). Thus, the parameter value of OPH_SET can be set to one of the following options:

temperature(1,3)
explorecube_data.temperature(1,3)
explorecube_data.temperature.temperature(1)

Use the keyword end to get the last element of a row or column as follows.

{
        "name": "Set",
        "operator": "oph_set",
        "arguments": [ "key=meanvalue", "value=temperature(1,end)" ],
        "dependencies": [ { "task":"ExploreCube" } ]
}

Macro EVAL() can be used to evaluate expressions as in the next example. This feature cannot be used to process data from JSON Responses.

{
        "name": "Set",
        "operator": "oph_set",
        "arguments": [ "key=meanvalue", "value=EVAL(100 + 100)" ]
}

Note that a variable is normally scalar, but it can also be set to an array, whose values are separated by a pipe. For instance, consider extracting one row or one column of a grid. This feature is very useful in case of indexes of OPH_FOR: by default the iterative and parallel interfaces set the variable to a different value of the array at each cycle, following the order of the array.

In the next example the keyword * is used to refer to all the values of an array.

{
        "tasks":
        [
                {
                        "name": "Get dimension value",
                        "operator": "oph_cubeschema",
                        "arguments": [ "level=1", "dim=time" ]
                },
                {
                        "name": "Set",
                        "operator": "oph_set",
                        "arguments": [ "key=timevalues", "value=time(*,1)" ],
                        "dependencies": [ { "task":"Get dimension value" } ]
                }
        ]
}
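
Assuming the values argument of OPH_FOR accepts such a pipe-separated list (check the operator documentation), the variable set above could then drive a loop in which @timevalue takes a different item at each cycle; this is only a sketch:

{
        "name": "Begin loop over time values",
        "operator": "oph_for",
        "arguments": [ "name=timevalue", "values=@timevalues" ],
        "dependencies": [ { "task": "Set" } ]
}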

Actually, OPH_SET sets the variable identified by the argument key to the value of the argument value (without skipping the pipes) and creates a number of additional variables whose names are key followed by a numerical suffix (starting from 1): key_1, key_2, key_3, and so on. Adopt the approach shown in the next example to access these variables.

Finally, consider the following tasks representing a cycle.

{
        "tasks":
        [
                {
                        "name": "Begin loop",
                        "operator": "oph_for",
                        "arguments":
                        [
                                "name=index",
                                "counter=1:12"
                        ]
                },
                {
                        "name": "Compare cube_0 with a list of cubes",
                        "operator": "oph_intercube",
                        "arguments": [
                                "cube=@cube_0",
                                "cube2=@{cube_@{index}}",
                                "description=comparison between @cube_0 and @{cube_@{index}}"
                        ],
                        "dependencies": [ { "task": "Begin loop" } ]
                },
                {
                        "name": "End loop",
                        "operator": "oph_endfor",
                        "arguments": [ ],
                        "dependencies": [ { "task": "Compare cube_0 with a list of cubes" } ]
                }
        ]
}

In this case the variable @{cube_0} contains the PID of a cube to be compared with a number of cubes identified by @{cube_1}, @{cube_2}, ... @{cube_12}: the counter @{index} is used to refer to these variables more concisely, making them items of a sort of array.

Workflow and task status

The possible values of workflow status or task status reported in workflow output are:

  1. OPH_STATUS_PENDING : the task has been queued for execution by resource manager
  2. OPH_STATUS_WAITING : the task is waiting for an event: timeout, input data from Ophidia Terminal, presence of a file on disk, etc.
  3. OPH_STATUS_RUNNING : the workflow is running
  4. OPH_STATUS_START : the task has been accepted by the analytics framework
  5. OPH_STATUS_SET_ENV : the running environment has been set up
  6. OPH_STATUS_INIT : task input arguments have been processed
  7. OPH_STATUS_DISTRIBUTE : data have been sent to processes (in case of parallel operators)
  8. OPH_STATUS_EXECUTE : task execution has been completed
  9. OPH_STATUS_REDUCE : data have been gathered from processes (in case of parallel operators)
  10. OPH_STATUS_DESTROY : task temporary variables have been unset
  11. OPH_STATUS_UNSET_ENV : the running environment has been unset
  12. OPH_STATUS_COMPLETED : the workflow or the task has been completed
  13. OPH_STATUS_ERROR : the workflow has failed
  14. OPH_STATUS_PENDING_ERROR : the task has failed while it was queued
  15. OPH_STATUS_RUNNING_ERROR : the task has failed while starting the analytics framework
  16. OPH_STATUS_START_ERROR : the task has failed upon the delivery to resource manager
  17. OPH_STATUS_SET_ENV_ERROR : the task has failed while setting up the running environment
  18. OPH_STATUS_INIT_ERROR : the task has failed while parsing task input arguments
  19. OPH_STATUS_DISTRIBUTE_ERROR : the task has failed while data are sent to processes (in case of parallel operators)
  20. OPH_STATUS_EXECUTE_ERROR : the task has failed during the execution
  21. OPH_STATUS_REDUCE_ERROR : the task has failed while gathering data from processes (in case of parallel operators)
  22. OPH_STATUS_DESTROY_ERROR : the task has failed while unsetting temporary variables
  23. OPH_STATUS_UNSET_ENV_ERROR : the task has failed while unsetting the running environment
  24. OPH_STATUS_SKIPPED : the task has been skipped due to a failure
  25. OPH_STATUS_ABORTED : the task has been aborted (e.g. a parent task has failed)
  26. OPH_STATUS_UNSELECTED : the task has been discarded by the workflow manager as belonging to an unselected selection block
  27. OPH_STATUS_EXPIRED : the workflow has been cancelled due to timeout