# Source code for openeo_udf.api.run_code
# -*- coding: utf-8 -*-
"""OpenEO Python UDF interface"""
import functools
from pprint import pprint
import xarray
import numpy
import pandas
import geopandas
import shapely
from copy import deepcopy
import math
from typing import Dict
from inspect import signature
from openeo_udf.api.feature_collection import FeatureCollection
from openeo_udf.api.datacube import DataCube
from openeo_udf.api.machine_learn_model import MachineLearnModelConfig
from openeo_udf.api.spatial_extent import SpatialExtent
from openeo_udf.api.structured_data import StructuredData
from openeo_udf.api.udf_data import UdfData
__license__ = "Apache License, Version 2.0"
__author__ = "Soeren Gebbert"
__copyright__ = "Copyright 2018, Soeren Gebbert"
__maintainer__ = "Soeren Gebbert"
__email__ = "soerengebbert@googlemail.com"
def run_udf_model_user_code(udf_model: 'openeo_udf.server.data_model.udf_schemas.UdfRequestModel') -> UdfData:
    """Run the user defined Python code carried by a UDF request model.

    Args:
        udf_model: the UDF request object; ``udf_model.code.source`` holds the
            code to execute and ``udf_model.data`` holds the data collection,
            which is converted with ``UdfData.from_udf_data_model``.

    Returns:
        The value returned by ``run_user_code`` for that code and data.
    """
    code_section = udf_model.code
    # Convert the data before touching ``code_section.source`` so attribute
    # access happens in the same order as it always has.
    udf_data = UdfData.from_udf_data_model(udf_model.data)
    return run_user_code(code_section.source, udf_data)
def run_legacy_user_code(dict_data: Dict) -> Dict:
    """Run the user defined Python code on legacy, dictionary-based data.

    Args:
        dict_data: the UDF request as a dictionary; the code is read from
            ``dict_data["code"]["source"]`` and the legacy data from
            ``dict_data["data"]`` (converted with ``UdfData.from_dict``).

    Returns:
        The result of ``run_user_code``, converted back with ``to_dict()``.
    """
    source = dict_data["code"]["source"]
    udf_data = UdfData.from_dict(dict_data["data"])
    return run_user_code(source, udf_data).to_dict()
def _build_default_execution_context():
    """Assemble the global namespace in which user code is executed.

    The namespace always exposes the numeric/geo libraries and the UDF API
    classes listed below. ``torch`` and ``tensorflow`` are added only when
    they can be imported.

    Returns:
        A new dictionary mapping names to modules/classes.
    """
    # Third-party libraries made available to the user code.
    namespace = {
        'numpy': numpy,
        'xarray': xarray,
        'geopandas': geopandas,
        'pandas': pandas,
        'shapely': shapely,
        'math': math,
    }
    # UDF API classes. Note that 'MachineLearnModel' is bound to
    # MachineLearnModelConfig.
    namespace.update({
        'FeatureCollection': FeatureCollection,
        'SpatialExtent': SpatialExtent,
        'StructuredData': StructuredData,
        'MachineLearnModel': MachineLearnModelConfig,
        'DataCube': DataCube,
        'UdfData': UdfData
    })

    # Optional: PyTorch. torchvision is imported but not exposed; if only
    # torchvision is missing, 'torch' stays registered and the message below
    # is still printed.
    try:
        import torch
        namespace['torch'] = torch
        import torchvision
    except ImportError:
        print('torch not available')

    # Optional: TensorFlow. Same pattern, with tensorboard as the companion
    # import.
    try:
        import tensorflow
        namespace['tensorflow'] = tensorflow
        import tensorboard
    except ImportError:
        print('tensorflow not available')

    return namespace
@functools.lru_cache(maxsize=100)
def load_module_from_string(code: str):
    """Execute ``code`` in a fresh default context and return its namespace.

    Experimental: the result is memoized per code string (up to 100 entries),
    so the same UDF source is not loaded more than once. This lets caching
    inside the UDF work, because repeated calls with identical ``code``
    receive the very same namespace dictionary.

    Args:
        code: Python source of the UDF module.

    Returns:
        The dictionary used as globals for ``exec``, containing the default
        execution context plus everything the code defined.
    """
    namespace = _build_default_execution_context()
    # NOTE(review): exec() runs the supplied source with full interpreter
    # privileges; callers must only pass code they are willing to execute.
    exec(code, namespace)
    return namespace
def run_user_code(code: str, data: UdfData) -> UdfData:
    """Execute UDF source code against ``data``.

    The code is loaded with ``load_module_from_string`` and the callables in
    the resulting namespace are inspected in namespace order. The first one
    matching a supported UDF shape is invoked:

    * ``apply_timeseries(series, context)`` annotated with pandas ``Series``
      for both ``series`` and the return value: delegated to
      ``apply_timeseries_generic`` and its result is returned as-is
      (presumably a ``UdfData`` -- confirm in ``udf_wrapper``).
    * ``apply_hypercube`` / ``apply_datacube`` ``(cube, context)`` annotated
      with ``DataCube`` for both ``cube`` and the return value: called with
      the single datacube in ``data`` and ``data.user_context``; the returned
      cube replaces the datacube list of ``data``.
    * any callable with exactly one parameter annotated as ``UdfData``:
      called with ``data`` so it can modify it in place.

    If no callable matches, ``data`` is returned unchanged.

    Args:
        code: Python source of the UDF.
        data: the data the UDF operates on.

    Returns:
        ``data`` (possibly modified), or the result of
        ``apply_timeseries_generic`` for time series UDFs.

    Raises:
        ValueError: if a datacube UDF is found but ``data`` does not hold
            exactly one datacube, or if the UDF does not return a
            ``DataCube``.
    """
    namespace = load_module_from_string(code)
    # Snapshot the callables first so that a UDF mutating its own globals
    # cannot disturb the iteration.
    candidates = [(name, obj) for name, obj in namespace.items() if callable(obj)]

    for name, func in candidates:
        try:
            sig = signature(func)
        except (ValueError, TypeError):
            # inspect.signature raises ValueError when no signature can be
            # provided and TypeError when the object type is unsupported.
            # Such a callable cannot be a UDF entry point, so skip it instead
            # of aborting the whole lookup.
            continue

        if _is_timeseries_udf(name, sig):
            # This is a UDF that transforms pandas series.
            from .udf_wrapper import apply_timeseries_generic
            return apply_timeseries_generic(data, func)

        if _is_datacube_udf(name, sig):
            # Found a datacube mapping function.
            cubes = data.get_datacube_list()
            if len(cubes) != 1:
                raise ValueError("The provided UDF expects exactly one datacube, but only: %s were provided." % len(cubes))
            result_cube = func(cubes[0], data.user_context)
            if not isinstance(result_cube, DataCube):
                raise ValueError("The provided UDF did not return a DataCube, but got: %s" %result_cube)
            data.set_datacube_list([result_cube])
            return data

        if _is_generic_udf(sig):
            # Found a generic UDF function that works on the UdfData object.
            func(data)
            return data

    return data


def _is_timeseries_udf(name, sig):
    """Tell whether ``name``/``sig`` describe a pandas-series ``apply_timeseries`` UDF."""
    params = sig.parameters
    return (
        name == 'apply_timeseries'
        and 'series' in params
        and 'context' in params
        and 'pandas.core.series.Series' in str(params['series'].annotation)
        and 'pandas.core.series.Series' in str(sig.return_annotation)
    )


def _is_datacube_udf(name, sig):
    """Tell whether ``name``/``sig`` describe a DataCube-to-DataCube UDF."""
    params = sig.parameters
    return (
        (name == 'apply_hypercube' or name == 'apply_datacube')
        and 'cube' in params
        and 'context' in params
        and 'openeo_udf.api.datacube.DataCube' in str(params['cube'].annotation)
        and 'openeo_udf.api.datacube.DataCube' in str(sig.return_annotation)
    )


def _is_generic_udf(sig):
    """Tell whether ``sig`` has exactly one parameter annotated as ``UdfData``."""
    params = list(sig.parameters.values())
    if len(params) != 1:
        return False
    annotation = params[0].annotation
    # The annotation may be the class itself or its dotted name as a string.
    return annotation == 'openeo_udf.api.udf_data.UdfData' or annotation == UdfData