User Defined Functions (UDFs) explained

User defined functions are an important feature of openEO: they allow you, as a user, to reuse existing code by submitting it to the backend for execution.

As datacubes can be very large, the backend will only run your code on a smaller chunk of the whole cube at a time. You therefore need to help the backend a bit, by designing your code to work on as small a piece of data as possible.

There are a few different types of operations where UDFs can be used:

  1. Applying a process to each pixel: https://open-eo.github.io/openeo-api/processreference/#apply

  2. Applying a process to all pixels along a dimension, without changing cardinality: apply_dimension

  3. Reducing values along a dimension: https://open-eo.github.io/openeo-api/processreference/#reduce

  4. Applying a process to all pixels in a multidimensional neighborhood: apply_neighborhood

Not all operations require you to write a custom process. For instance, if you want to take the absolute value of your datacube, you can simply use the predefined absolute value function. In fact, it is recommended to use predefined functions wherever possible, as they can be implemented more efficiently.
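As a hypothetical sketch (the backend URL and collection id below are placeholders, not real endpoints), applying such a predefined function could look like this:

import openeo

# Placeholder backend URL and collection id, for illustration only.
connection = openeo.connect("https://openeo.example.com")
cube = connection.load_collection("SENTINEL2_L2A")

# Use the predefined 'absolute' process instead of writing a UDF.
abs_cube = cube.apply("absolute")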

However, when you have a large piece of code that is hard to express in predefined openEO functions, it makes sense to use the UDF functionality.

The section below gives an example to get you started.

Example: Smoothing timeseries with a user defined function (UDF)

In this example, we start from the 'evi_cube' that was created in the previous example, and want to apply temporal smoothing to it. More specifically, we want to use the Savitzky-Golay smoother that is available in the SciPy library.

To ensure that openEO understands your function, it needs to follow some rules: the UDF specification. This is an example that follows those rules:

UDF code
from typing import Dict
import xarray
from scipy.signal import savgol_filter
from openeo_udf.api.datacube import DataCube

def apply_datacube(cube: DataCube, context: Dict) -> DataCube:
    """
    Applies Savitzky-Golay smoothing to a timeseries datacube.
    This UDF preserves dimensionality, and assumes a datacube with a temporal dimension 't' as input.
    """
    array: xarray.DataArray = cube.get_array()
    # Fill gaps (NaN values) along the time dimension first,
    # because savgol_filter cannot handle missing values.
    filled = array.interpolate_na(dim='t')
    # Smooth along the time axis (assumed to be axis 0),
    # with a window length of 5 and a polynomial order of 2.
    smoothed_array = savgol_filter(filled.values, 5, 2, axis=0)
    return DataCube(xarray.DataArray(smoothed_array, dims=array.dims, coords=array.coords))

The method signature of the UDF is very important, because the backend uses it to detect the type of UDF. This particular example accepts a DataCube object as input and also returns a DataCube object. The type annotations and the method name are used to detect how to invoke the UDF, so make sure they remain unchanged.
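In other words, only the body of the function is yours to change. As a sketch, the skeleton that has to stay intact looks like this:

from typing import Dict
from openeo_udf.api.datacube import DataCube

def apply_datacube(cube: DataCube, context: Dict) -> DataCube:
    # The function name and the type annotations above must stay unchanged;
    # the backend uses them to figure out how to invoke the UDF.
    ...  # your processing logic goes here
    return cube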

The API of the DataCube class can be found in the openeo_udf API documentation.

Once the UDF is defined in a separate file, we need to load it:

>>> from pathlib import Path

    def get_resource(relative_path):
        return str(Path(relative_path))

    def load_udf(relative_path):
        with open(get_resource(relative_path), 'r') as f:
            return f.read()

    smoothing_udf = load_udf('udf/smooth_savitzky_golay.py')
    print(smoothing_udf)

After that, we can simply apply it along a dimension:

>>> smoothed_evi = evi_cube_masked.apply_dimension(smoothing_udf, runtime='Python')
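The result is again a datacube, so it can be downloaded like any other (the filename here is just an illustration):

>>> smoothed_evi.download('smoothed_evi.nc', format='NetCDF')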

Example: downloading a datacube and executing a UDF locally

Sometimes it is advantageous to run a UDF on the client machine, for example when developing or testing it. This is possible with the convenience function execute_local_udf(). For example, running this UDF:

from typing import Dict
import xarray
from scipy.signal import savgol_filter
from openeo_udf.api.datacube import DataCube

def apply_datacube(cube: DataCube, context: Dict) -> DataCube:
    """
    Applies Savitzky-Golay smoothing to a timeseries datacube.
    This UDF preserves dimensionality, and assumes a datacube with a temporal dimension 't' as input.
    """
    array: xarray.DataArray = cube.get_array()
    filled = array.interpolate_na(dim='t')
    smoothed_array = savgol_filter(filled.values, 5, 2, axis=0)
    return DataCube(xarray.DataArray(smoothed_array, dims=array.dims, coords=array.coords))

locally, one has to:

  • Run the process and download the result in ‘NetCDF’ or ‘JSON’ format.

  • Run execute_local_udf on the file.

For example:

>>> def load_udf(path):
        with open(path, 'r') as f:
            return f.read()

>>> ... # preparing the process

>>> myprocess.download('test_input.nc', format='NetCDF')

>>> udfstr = load_udf('../examples/udf/smooth_savitzky_golay.py')
>>> DataCube.execute_local_udf(udfstr, 'test_input.nc', fmt='netcdf')

Note: the primary purpose of this functionality is to aid client-side development of UDFs using small datasets. It is not designed for large jobs.

UDF function names

There’s a predefined set of function signatures that you have to use to implement a UDF:

This module defines a number of function signatures that can be implemented by UDFs. Both the name of the function and the argument types are used by the backend to validate whether the provided UDF is compatible with the calling context of the process graph in which it is used.

openeo_udf.api.udf_signatures.apply_datacube(cube, context)

Map a DataCube to another DataCube. Depending on the context in which this function is used, the DataCube dimensions have to be retained or can be changed. For instance, in the context of a reducing operation along a dimension, that dimension will have to be reduced to a single value. In the context of a 1 to 1 mapping operation, all dimensions have to be retained.

Parameters
  • cube (DataCube) – A DataCube object.

  • context (Dict) – A dictionary containing user context.

Returns
A DataCube object.

Return type
DataCube
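As an illustration of the reducing case, a minimal sketch (assuming a cube with a temporal dimension 't', and using a simple mean as the reducer) could look like:

from typing import Dict
import xarray
from openeo_udf.api.datacube import DataCube

def apply_datacube(cube: DataCube, context: Dict) -> DataCube:
    # Minimal reducer sketch: collapse the temporal dimension to its mean.
    array: xarray.DataArray = cube.get_array()
    reduced = array.mean(dim='t')
    return DataCube(reduced)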

openeo_udf.api.udf_signatures.apply_timeseries(series, context)

Process a timeseries of values, without changing the time instants. This can for instance be used for smoothing or gap-filling. TODO: do we need geospatial coordinates for the series?

Parameters
  • series (Series) – A Pandas Series object with a date-time index.

  • context (Dict) – A dictionary containing user context.

Returns
A Pandas Series object with the same datetime index.

Return type
Series
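A minimal sketch of this signature, using time-based interpolation as the (illustrative) operation, could look like:

from typing import Dict
from pandas import Series

def apply_timeseries(series: Series, context: Dict) -> Series:
    # Minimal sketch: fill gaps in the timeseries by interpolating
    # along the date-time index, keeping the time instants unchanged.
    return series.interpolate(method='time')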

Profile a process server-side

Warning

Experimental feature - This feature only works on backends running the Geotrellis implementation, and has not yet been adopted in the openEO API.

Sometimes users want to 'profile' their UDF on the backend. While it is recommended to first profile it offline, in the same manner as you would debug a UDF, backends may support profiling directly. Note that profiling only generates statistics over the Python part of the execution, so it is only suitable for profiling UDFs.

Usage

Only batch jobs are supported! To turn on profiling, set 'profile' to 'true' in the job options:

job_options = {'profile': 'true'}
... # prepare the process
process.execute_batch('result.tif', job_options=job_options)

When the process has finished, it will also download a file called ‘profile_dumps.tar.gz’:

  • rdd_-1.pstats contains the profile data of the Python driver,

  • the rest are the profiling results of the individual RDD ids (these can be correlated with the execution using the Spark UI).
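To unpack the archive for inspection, a small Python snippet (assuming the archive was downloaded to the working directory) suffices:

import tarfile

# Extract the profiling archive into a local directory.
with tarfile.open('profile_dumps.tar.gz', 'r:gz') as tar:
    tar.extractall('profile_dumps')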

Viewing profiling information

The simplest way is to visualize the results with the graphical visualization tool KCachegrind. To do so, install the kcachegrind package (most Linux distributions provide it in their repositories) and its Python connector pyprof2calltree. From the command line, run:

pyprof2calltree -i rdd_<INTERESTING_RDD_ID>.pstats -k

Another way is to use the builtin pstats functionality from within Python:

import pstats
p = pstats.Stats('rdd_<INTERESTING_RDD_ID>.pstats')
# Sort by cumulative time and print the statistics.
p.sort_stats('cumulative').print_stats()

Example

Example code can be found here.