Source code for openeo_udf.api.udf_wrapper
from openeo_udf.api.datacube import DataCube
from openeo_udf.api.udf_data import UdfData
from typing import Dict, Callable
import xarray
import numpy
import pandas
from pandas import Series
[docs]def apply_timeseries(series: Series, context:Dict)->Series:
    """
    Do something with the timeseries
    :param series:
    :param context:
    :return:
    """
    return series 
[docs]def apply_timeseries_generic(udf_data: UdfData, callback: Callable = apply_timeseries):
    """
    Implements the UDF contract by calling a user provided time series transformation function (apply_timeseries).
    Multiple bands are currently handled separately, another approach could provide a dataframe with a timeseries for each band.
    :param udf_data:
    :return:
    """
    # The list of tiles that were created
    tile_results = []
    # Iterate over each cube
    for cube in udf_data.get_datacube_list():
        array3d = []
        #use rollaxis to make the time dimension the last one
        for time_x_slice in numpy.rollaxis(cube.array.values, 1):
            time_x_result = []
            for time_slice in time_x_slice:
                series = pandas.Series(time_slice)
                transformed_series = callback(series,udf_data.user_context)
                time_x_result.append(transformed_series)
            array3d.append(time_x_result)
        # We need to create a new 3D array with the correct shape for the computed aggregate
        result_tile = numpy.rollaxis(numpy.asarray(array3d),1)
        assert result_tile.shape == cube.array.shape
        # Create the new raster collection cube
        rct = DataCube(xarray.DataArray(result_tile))
        tile_results.append(rct)
    # Insert the new tiles as list of raster collection tiles in the input object. The new tiles will
    # replace the original input tiles.
    udf_data.set_datacube_list(tile_results)
    return udf_data