Source code for openeo_udf.api.udf_wrapper
from openeo_udf.api.datacube import DataCube
from openeo_udf.api.udf_data import UdfData
from typing import Dict, Callable
import xarray
import numpy
import pandas
from pandas import Series
[docs]def apply_timeseries(series: Series, context:Dict)->Series:
"""
Do something with the timeseries
:param series:
:param context:
:return:
"""
return series
[docs]def apply_timeseries_generic(udf_data: UdfData, callback: Callable = apply_timeseries):
"""
Implements the UDF contract by calling a user provided time series transformation function (apply_timeseries).
Multiple bands are currently handled separately, another approach could provide a dataframe with a timeseries for each band.
:param udf_data:
:return:
"""
# The list of tiles that were created
tile_results = []
# Iterate over each cube
for cube in udf_data.get_datacube_list():
array3d = []
#use rollaxis to make the time dimension the last one
for time_x_slice in numpy.rollaxis(cube.array.values, 1):
time_x_result = []
for time_slice in time_x_slice:
series = pandas.Series(time_slice)
transformed_series = callback(series,udf_data.user_context)
time_x_result.append(transformed_series)
array3d.append(time_x_result)
# We need to create a new 3D array with the correct shape for the computed aggregate
result_tile = numpy.rollaxis(numpy.asarray(array3d),1)
assert result_tile.shape == cube.array.shape
# Create the new raster collection cube
rct = DataCube(xarray.DataArray(result_tile))
tile_results.append(rct)
# Insert the new tiles as list of raster collection tiles in the input object. The new tiles will
# replace the original input tiles.
udf_data.set_datacube_list(tile_results)
return udf_data