A workflow consists of one or more tasks, and each task can be a simple operator or a massive operation. There is no difference in syntax between the two task types, except that massive tasks include one or more filtering rules, enclosed in square brackets, among the operator's arguments.
With regard to dependency specification, the schema provides 3 different types of dependency, identified by 3 keywords:
For the first two options, it is also possible to specify the particular operator argument whose value depends on the output produced by another task. For example, the identifier of a datacube produced elsewhere in the workflow can be used as the value assigned to the argument cube, common to most Ophidia operators, so that a transformation can be performed on those data with no a priori knowledge of the actual cube identifier, which is generated at runtime. By default, the dependency parameter argument is set to cube, so it can usually be omitted, for instance when the workflow is a linear chain.
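For instance, the following sketch (task names, filter and cube identifier are purely illustrative) chains two tasks through a single dependency; since the argument keyword defaults to cube, the dependency of the second task could equivalently be written as { "task": "Subset" }.
{
"name": "Chain",
"author": "Foo",
"abstract": "Two tasks linked by a dependency on the argument cube",
"exec_mode": "sync",
"ncores": "1",
"cube": "http://hostname/1/1",
"tasks":
[
{
"name": "Subset",
"operator": "oph_subset",
"arguments": [ "subset_dims=time", "subset_filter=1:12" ]
},
{
"name": "Reduction",
"operator": "oph_reduce",
"arguments": [ "operation=avg" ],
"dependencies": [ { "task": "Subset", "type": "single", "argument": "cube" } ]
}
]
}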
When the user submits a workflow through the Ophidia Terminal, the environment variables OPH_SESSION_ID, OPH_EXEC_MODE, OPH_NCORES, OPH_CWD and OPH_DATACUBE are used to set a number of global attributes and task arguments (without changing the original JSON Request), if these are not already set.
Each user submission is a request for the execution of a workflow. In fact, even single commands or massive operations are converted (by the Ophidia Terminal) into JSON Requests for the execution of simple workflows containing a single task. For example, the command
oph_list level=2;
is converted into the following request
{
"ncores":"1",
"cwd":"/",
"command":"oph_list level=2;",
"name":"oph_list",
"author":"oph-test",
"abstract":"Workflow generated by Oph_Term to wrap a command",
"exec_mode":"sync",
"tasks":
[
{
"name":"Task 0",
"operator":"oph_list",
"arguments": ["level=2","ncores=1","cwd=/"]
}
]
}
There is a one-to-one translation between the submission string as used with the Ophidia Terminal and the definition of a task in a workflow file.
Using the terminal, a user submits an operator by specifying its name and, after a white space, a series of semicolon-separated key-value pairs. In the corresponding workflow there are two distinct keywords, one for the operator and one for the arguments, and all key-value pairs are listed in a JSON array without semicolons.
Note that some global attributes of the workflow generated by the terminal are set automatically based on environment variables: the parameter author is set to the username, whereas command is set to the actual submission string written by the user. Of course, the command arguments are split and then inserted into the related array.
There is an important difference between the output of a single (or massive) operation and the output of a generic workflow with more tasks. In the first case the Ophidia Server returns the output of the specific task; otherwise, the workflow output is a table reporting some information on each task: status, identifier (Marker ID), name, etc. The user can obtain the output of a specific task by its identifier using the command view.
A JSON Request has to be compliant with the schema reported here. The JSON schema is defined with a set of keywords that can be used to define tasks, link them together with different types of dependencies, set global variables, specify the system behaviour in case of errors and so on.
Comments can be inserted in JSON Requests; in particular, the comment format of common programming languages such as C and Java is adopted (see the following example).
{
"ncores":"1",
"cwd":"/",
"command":"oph_list level=2;",
"name":"oph_list",
"author":"oph-test",
"abstract":"Workflow generated by Oph_Term to wrap a command",
/*
This is a comment block
*/
"exec_mode":"sync",
"tasks":
[
{
"name":"Task 0", // This is an inline comment
"operator":"oph_list",
"arguments": ["level=2","ncores=1","cwd=/"]
}
]
}
The schema of a JSON Response depends on the specific operator. It may represent a text, a table, a graph or other more complex data structures. Usually a JSON Response represents a table; in this case it is compliant with the schema reported here.
The workflow manager allows the user to specify the preferred behaviour in case of error events, task failures, etc. By default, workflow execution is blocked immediately after an error occurs, but it is possible to ignore the error only for a particular task and its descendants and continue with the execution of the remaining tasks, as well as to repeat the execution of the failed task before breaking the workflow. To exploit this feature, use the argument on_error of each task as follows.
Consider setting it to skip when temporary containers or cubes have to be created.
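For instance, a minimal sketch of a container-creation task whose possible failure (e.g. because the container already exists) is ignored, so that the workflow can continue, could be the following (container name and dimensions are merely illustrative).
{
"name": "Create working container",
"operator": "oph_createcontainer",
"arguments": [ "container=work", "dim=lat|lon|time" ],
"on_error": "skip"
}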
The default value of on_error can be set as a global attribute as follows.
{
"name": "Example5",
"author": "Foo",
"abstract": "Simple workflow with automatic repetition in case of errors",
"exec_mode": "sync",
"ncores": "1",
"cube": "http://hostname/1/1",
"on_error": "repeat 2",
"tasks":
[
{
"name": "Data reduction",
"operator": "oph_reduce",
"arguments": [ "operation=avg" ]
}
]
}
Although a workflow may generate a large amount of data, usually only the final results are analyzed by the user, exported as files or published on the web. In these cases the intermediate results can be deleted (e.g. to save disk space) as soon as the workflow ends, since they are no longer useful. By using the parameter on_exit the user can select the cubes to be dropped when the workflow ends. Consider the following example
{
"name": "Example6",
"author": "Foo",
"abstract": "Simple workflow with three tasks",
"exec_mode": "sync",
"ncores": "1",
"cube": "http://hostname/1/1",
"tasks":
[
{
"name": "Extract maximum value",
"operator": "oph_reduce",
"arguments": [ "operation=max" ],
"on_exit": "oph_delete"
},
{
"name": "Extract minimum value",
"operator": "oph_reduce",
"arguments": [ "operation=min" ],
"on_exit": "oph_delete"
},
{
"name": "Evaluate max-min range",
"operator": "oph_intercube",
"arguments": [ "operation=sub" ],
"dependencies":
[
{ "task": "Extract maximum value", "type": "single", "argument": "cube" },
{ "task": "Extract minimum value", "type": "single", "argument": "cube2" }
]
}
]
}
where the intermediate results of data reduction are dropped at the end of the workflow execution by setting the task parameter on_exit to oph_delete.
The default value of on_exit can be set as a global attribute. The admitted values are:
If more than one cube needs to be removed, the workflow engine performs a massive operation using the operator OPH_DELETE. Containers are removed simply by submitting the operator OPH_DELETECONTAINER.
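For instance, a minimal sketch showing where the global attribute is placed could be the following (the remaining values are merely illustrative); with this setting the cube produced by the reduction would itself be dropped when the workflow ends, unless the task overrides the default.
{
"name": "Example with global on_exit",
"author": "Foo",
"abstract": "Workflow whose output cubes are dropped on exit",
"exec_mode": "sync",
"ncores": "1",
"cube": "http://hostname/1/1",
"on_exit": "oph_delete",
"tasks":
[
{
"name": "Data reduction",
"operator": "oph_reduce",
"arguments": [ "operation=avg" ]
}
]
}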
Most Ophidia data operators save output datacubes on the same analytics nodes (or a subset of them) where the input data are stored. A system catalog is adopted to manage the internal hierarchical data model and to select the analytics nodes correctly, so that the user does not have to list the nodes involved in an operation.
The analytics nodes involved in Ophidia import operators can be chosen by the user by selecting a (pre-registered) host partition. Refer to the operator documentation for further information.
If, in a workflow, two or more datacubes created by Ophidia import operators are processed by binary operators (such as OPH_INTERCUBE), it is important that the corresponding data are stored on the same host partition and with the same node order (in case of data fragmentation). For this reason, the user has to specify the intended host partition by setting the global variable host_partition. By default Ophidia uses the first available host partition and selects the analytics nodes according to a round-robin discipline, so the nodes could be different each time.
For instance, consider the following workflow where two datasets are imported and then compared with each other (the partition name main needs to be set to an available partition).
{
"name": "Example7",
"author": "Foo",
"abstract": "Simple workflow with three tasks",
"host_partition": "main",
"tasks":
[
{
"name": "Input dataset 1",
"operator": "oph_importnc",
"arguments": [ "src_path=file1.nc", "measure=measure" ]
},
{
"name": "Input dataset 2",
"operator": "oph_importnc",
"arguments": [ "src_path=file2.nc", "measure=measure" ]
},
{
"name": "Compare the datasets",
"operator": "oph_intercube",
"arguments": [ "operation=sub" ],
"dependencies":
[
{ "task": "Input dataset 1", "type": "single", "argument": "cube" },
{ "task": "Input dataset 2", "type": "single", "argument": "cube2" }
]
}
]
}
In this example, the comparison operator can be executed correctly only if host_partition is set; otherwise, the analytics nodes selected during the import operations could be different, causing the final operation to fail.
Many Ophidia operators are very useful when called in interactive mode. As an example, consider OPH_CUBESCHEMA: the output of this operator includes a number of metadata associated with a datacube that the user may wish to know before launching a new command from the Ophidia Terminal. Another example is OPH_LIST, which is used to extract the set of datacubes in a given folder and return it to the user as a grid.
Of course these metadata operators can also represent the tasks of a workflow, but their outputs (JSON Responses) will be neither returned to the user nor processed further in the rest of the workflow.
Actually, the JSON Response of every operator can be processed within a workflow, for instance to set the parameters of a child task. To this end, consider a task based on OPH_SET.
Basically, this operator allocates a user-defined variable and sets it to the value of a (scalar or vector) element of the JSON Response related to a parent task. A constant string/numeric value or the result of a mathematical expression can also be assigned.
After setting a variable, its name can then be used in the rest of the workflow to refer to the value of that field. The scope of a variable is limited to the workflow in which it is defined.
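As a minimal sketch (the variable name and the constant are arbitrary), a constant could be stored with OPH_SET and then referenced by a child task through @offset as follows.
{
"tasks":
[
{
"name": "Set offset",
"operator": "oph_set",
"arguments": [ "key=offset", "value=273.15" ]
},
{
"name": "Subtract offset",
"operator": "oph_apply",
"arguments": [ "query=oph_sum_scalar(measure,-@offset)", "measure_type=auto" ],
"dependencies": [ { "task": "Set offset" } ]
}
]
}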
The syntax adopted to select a field of a JSON Response is a flexible dot-notation. When referring to grids (JSON Responses usually consist of one or more grids), the following formats can be used
objkey.gridname.colname(rowindex)
objkey.gridname(rowindex,colindex)
where labels are:
The selected value can be accessed by means of @name or @{name}, where name is the variable name (i.e. the value of the parameter key of OPH_SET). Variable names should comply with IEEE Std 1003.1-2001 conventions, unless braces {} are adopted to refer to them. In the latter case, the name of a variable can even be obtained from the value of other variables: this option enables the definition of complex variables (e.g. arrays) and is particularly useful in case of selection or iterative tasks. For instance, the value of the variable @{suffix} determines the name of the variable @{prefix @{suffix}}: if the former is “example”, the latter will be “prefix example”. During workflow execution, variable substitution follows the rule “inner variables are substituted before outer variables”.
The following variables are pre-defined in any workflow without calling OPH_SET explicitly:
In addition, each argument of a task can be considered as a runtime variable and, hence, its value can be assigned to other arguments. The key associated with this virtual variable is the name of the corresponding argument in capital letters: use @CUBE to get the value of argument cube, @LEVEL for level, @OPERATION for operation and so on.
For instance, in the following task
{
"name": "Set all data to zero",
"operator": "oph_intercube",
"arguments": [
"cube2=@CUBE",
"operation=sub"
],
"dependencies":
[
{ "task": "Previous task", "type": "single", "argument": "cube" },
]
}
the value of argument cube is assigned to argument cube2.
The following workflow shows the use of OPH_SET to set the variable datatype to the field MEASURE TYPE in the output of OPH_CUBESCHEMA. The variable is then used to set the first parameter of the primitive OPH_SUM_SCALAR, so that the conversion can be done independently of the data type of the input cube.
{
"name": "Example8",
"author": "Foo",
"abstract": "Simple workflow with a variable",
"exec_mode": "sync",
"ncores": "1",
"cube": "http://hostname/1/1",
"tasks":
[
{
"name": "CubeSchema",
"operator": "oph_cubeschema",
"arguments": [ ]
},
{
"name": "Set",
"operator": "oph_set",
"arguments": [ "key=datatype", "value=cubeschema_cubeinfo.Datacube Information.MEASURE TYPE(1)" ],
"dependencies": [ { "task":"CubeSchema" } ]
},
{
"name": "Conversion",
"operator": "oph_apply",
"arguments": [ "query=oph_sum_scalar('oph_@datatype','oph_double',measure,273.15)" ],
"dependencies": [ { "task":"Set" } ]
}
]
}
As an additional example, consider the following workflow. In this case OPH_SET is used to select the mean value meanvalue along the implicit dimensions of the input cube and, then, to evaluate deviations. It is assumed that:
{
"name": "Example9",
"author": "Foo",
"abstract": "Workflow used to evaluate deviations from the mean",
"exec_mode": "sync",
"ncores": "1",
"cube": "http://hostname/1/1",
"tasks":
[
{
"name": "Reduction",
"operator": "oph_reduce",
"arguments": [ "operation=avg" ]
},
{
"name": "Aggregation",
"operator": "oph_aggregate",
"arguments": [ "operation=avg" ],
"dependencies": [ { "task":"Reduction", "type":"single" } ]
},
{
"name": "ExploreCube",
"operator": "oph_explorecube",
"arguments": [ ],
"dependencies": [ { "task":"Aggregation", "type":"single" } ]
},
{
"name": "Set",
"operator": "oph_set",
"arguments": [ "key=meanvalue", "value=temperature(1,3)" ],
"dependencies": [ { "task":"ExploreCube" } ]
},
{
"name": "Apply",
"operator": "oph_apply",
"arguments": [ "query=oph_sum_scalar(measure,-@meanvalue)","measure_type=auto" ],
"dependencies": [ { "task":"Set" } ]
}
]
}
Note that the output of task OPH_EXPLORECUBE is similar to
[Response]:
temperature
¯¯¯¯¯¯¯¯¯¯¯
+=====+=====+================+
| lat | lon | temperature |
+=====+=====+================+
| ALL | ALL | 288.2395324707 |
+=====+=====+================+
so that the name of the output table is temperature and the mean value obtained by the reduction is in position (1,3). Thus, the parameter value of OPH_SET can be set to one of the following options:
temperature(1,3)
explorecube_data.temperature(1,3)
explorecube_data.temperature.temperature(1)
Use the keyword end to get the last element of a row or a column as follows.
{
"name": "Set",
"operator": "oph_set",
"arguments": [ "key=meanvalue", "value=temperature(1,end)" ],
"dependencies": [ { "task":"ExploreCube" } ]
}
The macro EVAL() can be used to evaluate expressions, as in the next example. This feature cannot be used to process data from JSON Responses.
{
"name": "Set",
"operator": "oph_set",
"arguments": [ "key=meanvalue", "value=EVAL(100 + 100)" ]
}
Note that the variable is scalar in the previous examples, but it can also be set to an array whose values are separated by pipes, for instance by extracting one row or one column of a grid. This feature is particularly useful for the indexes of OPH_FOR: by default, the iterative and parallel interfaces set the variable to a different value of the array at each cycle, following the order of the array.
In the next example the keyword * is used to refer to all the values of an array.
{
"tasks":
[
{
"name": "Get dimension value",
"operator": "oph_cubeschema",
"arguments": [ "level=1", "dim=time" ]
},
{
"name": "Set",
"operator": "oph_set",
"arguments": [ "key=timevalues", "value=time(*,1)" ],
"dependencies": [ { "task":"Get dimension value" } ]
}
]
}
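The array variable could then drive a loop: for instance, assuming the values argument of OPH_FOR accepts the pipe-separated list stored in @timevalues, the opening task of the cycle might be sketched as follows.
{
"name": "Begin loop over time values",
"operator": "oph_for",
"arguments":
[
"name=timevalue",
"values=@timevalues"
]
}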
Actually, OPH_SET sets the variable identified by the argument key to the value of the argument value (without skipping the pipes) and creates a number of additional variables whose names are the key followed by a numerical suffix (starting from 1): key_1, key_2, key_3, and so on. Adopt the approach shown in the next example to access these variables.
Finally, consider the following tasks representing a cycle.
{
"tasks":
[
{
"name": "Begin loop",
"operator": "oph_for",
"arguments":
[
"name=index",
"counter=1:12"
]
},
{
"name": "Compare cube_0 with a list of cubes",
"operator": "oph_intercube",
"arguments": [
"cube=@cube_0",
"cube2=@{cube_@{index}}",
"description=comparison between @cube_0 and @{cube_@{index}}"
],
"dependencies": [ { "task": "Begin loop" } ]
},
{
"name": "End loop",
"operator": "oph_endfor",
"arguments": [ ],
"dependencies": [ { "task": "Compare cube_0 with a list of cubes" } ]
}
]
}
In this case the variable @{cube_0} contains the PID of a cube to be compared with a number of cubes identified by @{cube_1}, @{cube_2}, ... @{cube_12}: the counter @{index} is used to refer to these variables more concisely, making them items of a sort of array.
The possible values of workflow status or task status reported in workflow output are: