Source code for openeo_udf.api.run_code

# -*- coding: utf-8 -*-
"""OpenEO Python UDF interface"""
import functools
from pprint import pprint

import xarray
import numpy
import pandas
import geopandas
import shapely
from copy import deepcopy
import math
from typing import Dict
from inspect import signature

from openeo_udf.api.feature_collection import FeatureCollection
from openeo_udf.api.datacube import DataCube
from openeo_udf.api.machine_learn_model import MachineLearnModelConfig
from openeo_udf.api.spatial_extent import SpatialExtent
from openeo_udf.api.structured_data import StructuredData
from openeo_udf.api.udf_data import UdfData

__license__ = "Apache License, Version 2.0"
__author__ = "Soeren Gebbert"
__copyright__ = "Copyright 2018, Soeren Gebbert"
__maintainer__ = "Soeren Gebbert"
__email__ = "soerengebbert@googlemail.com"


def run_udf_model_user_code(udf_model: 'openeo_udf.server.data_model.udf_schemas.UdfRequestModel') -> UdfData:
    """Run the user defined python code carried by a UDF request model

    The ``data`` attribute of the request is converted with
    ``UdfData.from_udf_data_model`` and then handed, together with the
    ``source`` of the request's ``code`` attribute, to ``run_user_code``.

    Args:
        udf_model: the udf request object with code and data collection

    Returns:
        Whatever ``run_user_code`` returns for the given code and data.

    """
    user_code = udf_model.code
    udf_input = UdfData.from_udf_data_model(udf_model.data)
    return run_user_code(user_code.source, udf_input)
def run_legacy_user_code(dict_data: Dict) -> Dict:
    """Run the user defined python code on legacy data

    The source is read from ``dict_data["code"]["source"]`` and the input
    data is built from ``dict_data["data"]`` via ``UdfData.from_dict``.

    Args:
        dict_data: the udf request object with code and legacy data organized in a dictionary

    Returns:
        The ``to_dict()`` representation of the object returned by
        ``run_user_code``.

    """
    source = dict_data["code"]["source"]
    udf_input = UdfData.from_dict(dict_data["data"])
    return run_user_code(source, udf_input).to_dict()
def _build_default_execution_context():
    """Assemble the global namespace handed to ``exec`` for UDF source code

    The namespace always contains the scientific libraries imported by this
    module and the openeo_udf API classes. ``torch`` and ``tensorflow`` are
    added only when they can be imported.

    Returns:
        A new dictionary mapping names to modules and classes.

    """
    # Third party libraries first ...
    context = {
        'numpy': numpy,
        'xarray': xarray,
        'geopandas': geopandas,
        'pandas': pandas,
        'shapely': shapely,
        'math': math,
    }
    # ... followed by the UDF API classes (note: the config class is
    # exposed under the shorter name 'MachineLearnModel').
    context.update({
        'FeatureCollection': FeatureCollection,
        'SpatialExtent': SpatialExtent,
        'StructuredData': StructuredData,
        'MachineLearnModel': MachineLearnModelConfig,
        'DataCube': DataCube,
        'UdfData': UdfData,
    })

    # Optional deep learning back ends. The companion package (torchvision,
    # tensorboard) is imported but never placed in the namespace. If only the
    # companion import fails, the main package has already been registered
    # and the "not available" message is printed nevertheless.
    try:
        import torch
        context['torch'] = torch
        import torchvision
    except ImportError:
        print('torch not available')

    try:
        import tensorflow
        context['tensorflow'] = tensorflow
        import tensorboard
    except ImportError:
        print('tensorflow not available')

    return context
@functools.lru_cache(maxsize=100)
def load_module_from_string(code:str):
    """Execute UDF source code in a fresh default namespace and return it

    Experimental: results are memoized per source string (up to 100 entries)
    so that the same UDF module is not loaded more than once, which makes
    caching inside the udf work. As a consequence, repeated calls with an
    identical ``code`` string receive the very same dictionary object.

    NOTE: ``code`` is passed to ``exec`` without any restriction.

    Args:
        code: the python source code of the UDF

    Returns:
        The namespace dictionary after ``code`` has been executed in it.

    """
    namespace = _build_default_execution_context()
    exec(code, namespace)
    return namespace
def run_user_code(code:str, data:UdfData) -> UdfData:
    """Find the UDF entry point defined by the code and apply it to the data

    The code is loaded with ``load_module_from_string`` and every callable in
    the resulting namespace is inspected in namespace order. The first
    callable matching one of these shapes is executed and the search stops:

    * ``apply_timeseries(series, context)`` with ``pandas.core.series.Series``
      in the annotation of ``series`` and in the return annotation: delegated
      to ``apply_timeseries_generic`` whose result is returned directly.
    * ``apply_hypercube`` / ``apply_datacube`` with parameters ``cube`` and
      ``context`` where ``openeo_udf.api.datacube.DataCube`` appears in the
      annotation of ``cube`` and in the return annotation: called with the
      single datacube of ``data`` and ``data.user_context``; the returned
      cube replaces the datacube list of ``data``.
    * any callable with exactly one parameter annotated as ``UdfData`` (the
      class or the string ``'openeo_udf.api.udf_data.UdfData'``): called with
      ``data``; its return value is ignored.

    If no callable matches, ``data`` is returned untouched.

    Args:
        code: the python source code of the UDF
        data: the data the UDF is applied to; modified in place by the
            datacube branch

    Returns:
        The result of ``apply_timeseries_generic`` for a timeseries UDF,
        otherwise the ``data`` object that was passed in.

    Raises:
        ValueError: if a datacube UDF is found but ``data`` does not hold
            exactly one datacube, or if the UDF does not return a DataCube.

    """
    namespace = load_module_from_string(code)
    # Snapshot the callables up front so the namespace dict is not iterated
    # while user code is being executed.
    candidates = [(name, obj) for name, obj in namespace.items() if callable(obj)]

    for name, udf in candidates:
        try:
            sig = signature(udf)
        except (ValueError, TypeError):
            # inspect.signature raises ValueError when no signature can be
            # provided and TypeError when the object type is unsupported;
            # such callables cannot be UDF entry points, so skip them.
            continue

        params = sig.parameters

        if (name == 'apply_timeseries'
                and 'series' in params
                and 'context' in params
                and 'pandas.core.series.Series' in str(params['series'].annotation)
                and 'pandas.core.series.Series' in str(sig.return_annotation)):
            # this is a UDF that transforms pandas series
            from .udf_wrapper import apply_timeseries_generic
            return apply_timeseries_generic(data, udf)

        if (name in ('apply_hypercube', 'apply_datacube')
                and 'cube' in params
                and 'context' in params
                and 'openeo_udf.api.datacube.DataCube' in str(params['cube'].annotation)
                and 'openeo_udf.api.datacube.DataCube' in str(sig.return_annotation)):
            # found a datacube mapping function
            cubes = data.get_datacube_list()
            if len(cubes) != 1:
                raise ValueError("The provided UDF expects exactly one datacube, but only: %s were provided." % len(cubes))
            result_cube = udf(cubes[0], data.user_context)
            if not isinstance(result_cube, DataCube):
                # Wrap in a one-element tuple: a UDF that returns a tuple
                # would otherwise be unpacked by the % operator and trigger
                # a TypeError instead of this ValueError.
                raise ValueError("The provided UDF did not return a DataCube, but got: %s" % (result_cube,))
            data.set_datacube_list([result_cube])
            break

        params_list = list(params.values())
        if len(params_list) == 1 and (params_list[0].annotation == 'openeo_udf.api.udf_data.UdfData'
                                      or params_list[0].annotation == UdfData):
            # found a generic UDF function
            udf(data)
            break

    return data