# Source code for openeo_udf.api.udf_data

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""OpenEO Python UDF interface"""

import xarray
from typing import Optional, List, Dict
from openeo_udf.api.feature_collection import FeatureCollection
from openeo_udf.api.datacube import DataCube
from openeo_udf.api.machine_learn_model import MachineLearnModelConfig
from openeo_udf.api.spatial_extent import SpatialExtent
from openeo_udf.api.structured_data import StructuredData


# Module authorship / licensing metadata.
__license__ = "Apache License, Version 2.0"
__author__     = "Soeren Gebbert"
__copyright__  = "Copyright 2018, Soeren Gebbert"
__maintainer__ = "Soeren Gebbert"
__email__      = "soerengebbert@googlemail.com"


class UdfData:
    """Container for all data passed to and from a user defined function (UDF).

    A UdfData object bundles the projection, raster datacubes, vector
    feature collections, structured data, machine learn model
    configurations and optional metadata that a UDF receives from the
    backend and hands back to it.  For datacubes and feature collections
    an id -> object dictionary is maintained alongside the list so that
    ``get_datacube_by_id`` / ``get_feature_collection_by_id`` are O(1).

    NOTE(review): the original doctest suite was removed from this
    docstring because it imported ``sklearn.externals.joblib``, which was
    dropped in scikit-learn 0.23, so ``doctest.testmod()`` (run when this
    module is executed directly) failed at the first example.  The
    remaining examples only duplicated the behavior of ``to_dict`` /
    ``from_dict`` documented on those methods.
    """

    def __init__(self, proj: Dict = None,
                 datacube_list: Optional[List[DataCube]] = None,
                 feature_collection_list: Optional[List[FeatureCollection]] = None,
                 structured_data_list: Optional[List[StructuredData]] = None,
                 ml_model_list: Optional[List[MachineLearnModelConfig]] = None,
                 metadata: 'openeo_udf.server.data_model.metadata_schema.MetadataModel' = None):
        """The constructor of the UDF argument class that stores all data
        required by the user defined function.

        Args:
            proj (dict): A dictionary of form
                {"proj type string": "projection description"},
                i.e. {"EPSG": 4326}
            datacube_list (list[DataCube]): A list of DataCube objects
            feature_collection_list (list[FeatureCollection]): A list of
                FeatureCollection objects
            structured_data_list (list[StructuredData]): A list of
                structured data objects
            ml_model_list (list[MachineLearnModelConfig]): A list of
                machine learn models
            metadata (MetadataModel): additional metadata
        """
        # Parallel list/dict storage: the dicts provide id-based lookup.
        self._datacube_list = []
        self._feature_tile_list = []
        self._datacube_dict = {}
        self._feature_tile_dict = {}
        self._structured_data_list = []
        self._ml_model_list = []
        self.proj = proj
        self._metadata: 'openeo_udf.server.data_model.metadata_schema.MetadataModel' = None
        self._user_context: Dict = dict()
        self._server_context: Dict = dict()

        # Only non-empty arguments are applied; the set_* methods also
        # populate the id lookup dictionaries.
        if datacube_list:
            self.set_datacube_list(datacube_list=datacube_list)
        if feature_collection_list:
            self.set_feature_collection_list(feature_collection_list=feature_collection_list)
        if structured_data_list:
            self.set_structured_data_list(structured_data_list=structured_data_list)
        if ml_model_list:
            self.set_ml_model_list(ml_model_list=ml_model_list)
        if metadata:
            self.metadata = metadata

    @property
    def metadata(self) -> 'openeo_udf.server.data_model.metadata_schema.MetadataModel':
        """Return the additional metadata model, or None if it was never set."""
        return self._metadata

    @metadata.setter
    def metadata(self, model: 'openeo_udf.server.data_model.metadata_schema.MetadataModel'):
        """Set the additional metadata model."""
        self._metadata = model

    @property
    def user_context(self) -> Dict:
        """Return the user context that was passed to the run_udf function"""
        return self._user_context

    @user_context.setter
    def user_context(self, context: Dict):
        """Set the user context"""
        self._user_context = context

    @property
    def server_context(self) -> Dict:
        """Return the server context that is passed from the backend to the UDF
        server for runtime configuration"""
        return self._server_context

    @server_context.setter
    def server_context(self, context: Dict):
        """Set the server context"""
        self._server_context = context
[docs] def get_datacube_by_id(self, id: str) -> Optional[DataCube]: """Get a datacube by its id Args: id (str): The datacube id Returns: HypeCube: the requested datacube or None if not found """ if id in self._datacube_dict: return self._datacube_dict[id] return None
[docs] def get_feature_collection_by_id(self, id: str) -> Optional[FeatureCollection]: """Get a feature collection by its id Args: id (str): The vector tile id Returns: FeatureCollection: the requested feature collection or None if not found """ if id in self._feature_tile_dict: return self._feature_tile_dict[id] return None
[docs] def get_datacube_list(self) -> Optional[List[DataCube]]: """Get the datacube list """ return self._datacube_list
[docs] def set_datacube_list(self, datacube_list: List[DataCube]): """Set the datacube list If datacube_list is None, then the list will be cleared Args: datacube_list (List[DataCube]): A list of HyperCube's """ self.del_datacube_list() if datacube_list is None: return for datacube in datacube_list: self.append_datacube(datacube)
[docs] def del_datacube_list(self): """Delete all datacubes """ self._datacube_list.clear() self._datacube_dict.clear()
[docs] def get_feature_collection_list(self) -> Optional[List[FeatureCollection]]: """Get all feature collections as list Returns: list[FeatureCollection]: The list of feature collections """ return self._feature_tile_list
[docs] def set_feature_collection_list(self, feature_collection_list: Optional[List[FeatureCollection]]): """Set the feature collection tiles If feature_collection_tiles is None, then the list will be cleared Args: feature_collection_list (list[FeatureCollection]): A list of FeatureCollectionTile's """ self.del_feature_collection_list() if feature_collection_list is None: return for entry in feature_collection_list: self.append_feature_collection(entry)
[docs] def del_feature_collection_list(self): """Delete all feature collection tiles """ self._feature_tile_list.clear() self._feature_tile_dict.clear()
[docs] def get_structured_data_list(self) -> Optional[List[StructuredData]]: """Get all structured data entries Returns: (list[StructuredData]): A list of StructuredData objects """ return self._structured_data_list
[docs] def set_structured_data_list(self, structured_data_list: Optional[List[StructuredData]]): """Set the list of structured data If structured_data_list is None, then the list will be cleared Args: structured_data_list (list[StructuredData]): A list of StructuredData objects """ self.del_structured_data_list() if structured_data_list is None: return for entry in structured_data_list: self._structured_data_list.append(entry)
[docs] def del_structured_data_list(self): """Delete all structured data entries """ self._structured_data_list.clear()
[docs] def get_ml_model_list(self) -> Optional[List[MachineLearnModelConfig]]: """Get all machine learn models Returns: (list[MachineLearnModel]): A list of MachineLearnModel objects """ return self._ml_model_list
[docs] def set_ml_model_list(self, ml_model_list: Optional[List[MachineLearnModelConfig]]): """Set the list of machine learn models If ml_model_list is None, then the list will be cleared Args: ml_model_list (list[MachineLearnModelConfig]): A list of MachineLearnModel objects """ self.del_ml_model_list() if ml_model_list is None: return for entry in ml_model_list: self._ml_model_list.append(entry)
[docs] def del_ml_model_list(self): """Delete all machine learn models """ self._ml_model_list.clear()
    # Attribute-style access to the four payload lists: assignment and
    # deletion delegate to the corresponding set_*/del_* methods above, so
    # the id lookup dictionaries stay consistent with the lists.
    datacube_list = property(fget=get_datacube_list, fset=set_datacube_list,
                             fdel=del_datacube_list)
    feature_collection_list = property(fget=get_feature_collection_list,
                                       fset=set_feature_collection_list,
                                       fdel=del_feature_collection_list)
    structured_data_list = property(fget=get_structured_data_list,
                                    fset=set_structured_data_list,
                                    fdel=del_structured_data_list)
    ml_model_list = property(fget=get_ml_model_list, fset=set_ml_model_list,
                             fdel=del_ml_model_list)
[docs] def append_datacube(self, datacube: DataCube): """Append a HyperCube to the list It will be automatically added to the dictionary of all datacubes Args: datacube (DataCube): The HyperCube to append """ self._datacube_list.append(datacube) self._datacube_dict[datacube.id] = datacube
[docs] def append_feature_collection(self, feature_collection_tile: FeatureCollection): """Append a feature collection tile to the list It will be automatically added to the dictionary of all feature collection tiles Args: feature_collection_tile (FeatureCollection): The feature collection tile to append """ self._feature_tile_list.append(feature_collection_tile) self._feature_tile_dict[feature_collection_tile.id] = feature_collection_tile
[docs] def append_structured_data(self, structured_data: StructuredData): """Append a structured data object to the list Args: structured_data (StructuredData): A StructuredData objects """ self._structured_data_list.append(structured_data)
[docs] def append_machine_learn_model(self, machine_learn_model: MachineLearnModelConfig): """Append a machine learn model to the list Args: machine_learn_model (MachineLearnModelConfig): A MachineLearnModel objects """ self._ml_model_list.append(machine_learn_model)
[docs] def to_dict(self) -> Dict: """Convert this UdfData object into a dictionary that can be converted into a valid JSON representation Returns: dict: UdfData object as a dictionary """ d = {"proj": self.proj, "user_context": self.user_context, "server_context": self.server_context} if self._datacube_list is not None: l = [] for datacube in self._datacube_list: l.append(datacube.to_dict()) d["datacubes"] = l if self._feature_tile_list is not None: l = [] for tile in self._feature_tile_list: l.append(tile.to_dict()) d["feature_collection_list"] = l if self._structured_data_list is not None: l = [] for entry in self._structured_data_list: l.append(entry.to_dict()) d["structured_data_list"] = l if self._ml_model_list is not None: l = [] for entry in self._ml_model_list: l.append(entry.to_dict()) d["machine_learn_models"] = l return d
[docs] @staticmethod def from_dict(udf_dict: Dict): """Create a udf data object from a python dictionary that was created from the JSON definition of the UdfData class Args: udf_dict (dict): The dictionary that contains the udf data definition Returns: UdfData: A new UdfData object """ if "proj" not in udf_dict: raise Exception("Missing projection in dictionary") udf_data = UdfData(proj=udf_dict["proj"]) if "user_context" in udf_dict: udf_data.user_context = udf_dict["user_context"] if "server_context" in udf_dict: udf_data.server_context = udf_dict["server_context"] if "datacubes" in udf_dict: l = udf_dict["datacubes"] for entry in l: h = DataCube.from_dict(entry) udf_data.append_datacube(h) if "feature_collection_list" in udf_dict: l = udf_dict["feature_collection_list"] for entry in l: fct = FeatureCollection.from_dict(entry) udf_data.append_feature_collection(fct) if "structured_data_list" in udf_dict: l = udf_dict["structured_data_list"] for entry in l: sd = StructuredData.from_dict(entry) udf_data.append_structured_data(sd) if "machine_learn_models" in udf_dict: l = udf_dict["machine_learn_models"] for entry in l: mlm = MachineLearnModelConfig.from_dict(entry) udf_data.append_machine_learn_model(mlm) return udf_data
[docs] @staticmethod def from_udf_data_model(udf_model: 'openeo_udf.server.data_model.udf_schemas.UdfDataModel') -> 'UdfData': """TODO: Must be implemented Args: udf_model: Returns: """ udf_data = UdfData() udf_data.server_context = udf_model.server_context udf_data.user_context = udf_model.user_context for d in udf_model.structured_data_list: sd = StructuredData.from_dict(d) udf_data.append_structured_data(sd) for m in udf_model.machine_learn_models: mlm = MachineLearnModelConfig.from_dict(m.dict()) udf_data.append_machine_learn_model(mlm) cubes = DataCube.from_data_collection(udf_model.data_collection) udf_data.set_datacube_list(cubes) return udf_data
if __name__ == "__main__":
    # Run the module doctests when this file is executed directly.
    import doctest
    doctest.testmod()