Source code for openeo.rest._datacube

from __future__ import annotations

import logging
import pathlib
import re
import textwrap
import typing
import uuid
import warnings
from typing import Dict, List, Optional, Tuple, Union

import requests

import openeo
from openeo.internal.graph_building import FlatGraphableMixin, PGNode, _FromNodeMixin
from openeo.internal.jupyter import render_component
from openeo.internal.processes.builder import (
    convert_callable_to_pgnode,
    get_parameter_names,
)
from openeo.internal.warnings import UserDeprecationWarning
from openeo.rest import OpenEoClientException
from openeo.rest.models.general import ValidationResponse
from openeo.util import dict_no_none, str_truncate

if typing.TYPE_CHECKING:
    # Imports for type checking only (circular import issue at runtime).
    from openeo.rest.connection import Connection
    from openeo.rest.result import SaveResult
    from openeo.rest.stac_resource import StacResource

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Sentinel object to refer to "current" cube in chained cube processing expressions.
# It is recognised by identity (`is THIS`) when process graph node arguments are built,
# and replaced there with the object the node is being built from.
THIS = object()


class _ProcessGraphAbstraction(_FromNodeMixin, FlatGraphableMixin):
    """
    Common base for client-side wrappers around anything
    that is represented by an openEO process graph:
    raster data cubes, vector cubes, ML models, ...

    An instance holds the result node of its process graph
    and (optionally) the connection it is associated with.
    """

    def __init__(self, pgnode: PGNode, connection: Union[Connection, None]):
        # TODO: now that connection can officially be None:
        #       improve exceptions in cases where is it still assumed to be a real connection (download, create_job, ...)
        self._connection = connection
        self._pg = pgnode

    def __str__(self):
        # Rendered as `ClassName(<str of result node>)`.
        return f"{self.__class__.__name__}({self._pg})"

    def flat_graph(self) -> Dict[str, dict]:
        """
        Return the process graph as the internal flat dictionary representation.

        .. warning:: This method is mainly intended for internal use.
            It is not recommended for general use and is *subject to change*.

            Instead, it is recommended to use
            :py:meth:`to_json()` or :py:meth:`print_json()`
            to obtain a standardized, interoperable JSON representation of the process graph.
            See :ref:`process_graph_export` for more information.
        """
        # TODO: wrap in {"process_graph":...} by default/optionally?
        result_node = self._pg
        return result_node.flat_graph()

    @property
    def _api_version(self):
        # Delegates to the capabilities of the connection (which must not be None here).
        capabilities = self._connection.capabilities()
        return capabilities.api_version_check

    @property
    def connection(self) -> Connection:
        """The connection this object was constructed with (can be None)."""
        return self._connection

    def result_node(self) -> PGNode:
        """
        Return the :py:class:`PGNode` that is the current result node of the process graph.

        .. versionadded:: 0.10.1
        """
        return self._pg

    def from_node(self):
        # Implementation of the _FromNodeMixin API: expose the result node.
        return self._pg

    def _build_pgnode(
        self,
        process_id: str,
        arguments: Optional[dict] = None,
        namespace: Optional[str] = None,
        **kwargs
    ) -> PGNode:
        """
        Helper to create a PGNode from an argument dict and/or keyword arguments.

        Keyword arguments take precedence over entries of ``arguments`` with the same name.
        Top-level argument values that are the `THIS` sentinel are replaced with this object.
        The given ``arguments`` dict itself is left untouched.
        """
        merged = dict(arguments or {})
        merged.update(kwargs)
        # TODO: also necessary to traverse lists/dictionaries?
        resolved = {name: (self if value is THIS else value) for name, value in merged.items()}
        return PGNode(process_id=process_id, arguments=resolved, namespace=namespace)

    # TODO #278 also move process graph "execution" methods here: `download`, `execute`, `execute_batch`, `create_job`, `save_udf`,  ...

    def validate(self) -> ValidationResponse:
        """
        Validate the process graph, without executing it.

        The validation is delegated to the ``validate_process_graph`` method of the connection.

        :return: container of validation of errors (dictionaries with "code" and "message" fields)

        .. versionadded:: 0.41.0
        """
        connection = self._connection
        return connection.validate_process_graph(self)

    def _repr_html_(self):
        # Rich (HTML) representation: render the process graph with the "model-builder" component.
        # A fresh random id is generated for each rendering.
        component_parameters = {
            "id": uuid.uuid4().hex,
            "explicit-zoom": True,
            "height": "400px",
        }
        component_data = {"process_graph": self.flat_graph()}
        return render_component("model-builder", data=component_data, parameters=component_parameters)


class UDF:
    """
    Helper class to load UDF code (e.g. from file) and embed them as "callback" or child process in a process graph.

    Usage example:

    .. code-block:: python

        udf = UDF.from_file("my-udf-code.py")
        cube = cube.apply(process=udf)

    .. versionchanged:: 0.13.0
        Added auto-detection of ``runtime``.
        Specifying the ``data`` argument is not necessary anymore, and actually deprecated.
        Added :py:meth:`from_file` to simplify loading UDF code from a file.
        See :ref:`old_udf_api` for more background about the changes.

    .. versionchanged:: 0.43.0
        Automatically un-indent given UDF code,
        to simplify writing valid and properly formatted inline UDF code.
    """

    # TODO: eliminate dependency on `openeo.rest.connection` and move to somewhere under `openeo.internal`?

    __slots__ = ["code", "_runtime", "version", "context", "_source"]

    def __init__(
        self,
        code: str,
        runtime: Optional[str] = None,
        data=None,  # TODO #181 remove `data` argument
        version: Optional[str] = None,
        context: Optional[dict] = None,
        *,
        _source=None,
        auto_dedent: bool = True,
    ):
        """
        Construct a UDF object from given code string and other argument related to the ``run_udf`` process.

        :param code: UDF source code string (Python, R, ...)
        :param runtime: optional UDF runtime identifier, will be autodetected from source code if omitted.
        :param data: unused leftover from old API. Don't use this argument, it will be removed in a future release.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        :param _source: (for internal use) source identifier
        :param auto_dedent: whether to remove common leading whitespace
            from the given code (with :py:func:`textwrap.dedent`). Enabled by default.
        """
        if auto_dedent:
            code = textwrap.dedent(code)
        self.code = code
        self._runtime = runtime
        self.version = version
        self.context = context
        self._source = _source
        if data is not None:
            # TODO #181 remove `data` argument
            warnings.warn(
                f"The `data` argument of `{self.__class__.__name__}` is deprecated, unused and will be removed in a future release.",
                category=UserDeprecationWarning,
                stacklevel=2,
            )

    def __repr__(self):
        return f"<{type(self).__name__} runtime={self._runtime!r} code={str_truncate(self.code, width=200)!r}>"

    def get_runtime(self, connection: Optional[Connection] = None) -> str:
        """
        Get the UDF runtime: the explicitly given one if any,
        otherwise a guess based on the UDF source (path/URL) or the UDF code.

        :param connection: optional connection, used for best-effort normalization of a guessed runtime
        """
        return self._runtime or self._guess_runtime(connection=connection)

    @classmethod
    def from_file(
        cls,
        path: Union[str, pathlib.Path],
        runtime: Optional[str] = None,
        version: Optional[str] = None,
        context: Optional[dict] = None,
    ) -> UDF:
        """
        Load a UDF from a local file.

        .. seealso::
            :py:meth:`from_url` for loading from a URL.

        :param path: path to the local file with UDF source code
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        """
        path = pathlib.Path(path)
        code = path.read_text(encoding="utf-8")
        return cls(
            code=code, runtime=runtime, version=version, context=context, _source=path
        )

    @classmethod
    def from_url(
        cls,
        url: str,
        runtime: Optional[str] = None,
        version: Optional[str] = None,
        context: Optional[dict] = None,
    ) -> UDF:
        """
        Load a UDF from a URL.

        .. seealso::
            :py:meth:`from_file` for loading from a local file.

        :param url: URL path to load the UDF source code from
        :param runtime: optional UDF runtime identifier, will be auto-detected from source code if omitted.
        :param version: optional UDF runtime version string
        :param context: optional additional UDF context data
        :raises requests.HTTPError: when the response has an error status code
        """
        # NOTE(review): no `timeout` is passed here, so this call can block as long as the server keeps the connection open.
        resp = requests.get(url)
        resp.raise_for_status()
        code = resp.text
        return cls(
            code=code, runtime=runtime, version=version, context=context, _source=url
        )

    def _guess_runtime(self, connection: Optional[Connection] = None) -> str:
        """
        Guess UDF runtime from UDF source (path) or source code.

        :param connection: optional connection, used to case-normalize the guessed runtime
            against the UDF runtimes it lists
        :raises OpenEoClientException: when no language could be detected
        """
        # First, guess UDF language
        language = None
        if isinstance(self._source, pathlib.Path):
            language = self._guess_runtime_from_suffix(self._source.suffix)
        elif isinstance(self._source, str):
            url_match = re.match(
                r"https?://.*?(?P<suffix>\.\w+)([&#].*)?$", self._source
            )
            if url_match:
                language = self._guess_runtime_from_suffix(url_match.group("suffix"))
            if not language:
                # Fallback for URLs where a query string follows the file name
                # (e.g. `https://example.com/udf.py?ref=main`), which the pattern above does not cover:
                # look at the suffix of the part before the query string/fragment.
                path_match = re.match(
                    r"https?://[^?#]*?(?P<suffix>\.\w+)(?:[?#].*)?$", self._source
                )
                if path_match:
                    language = self._guess_runtime_from_suffix(path_match.group("suffix"))
        if not language:
            # Guess language from UDF code
            if re.search(r"^def [\w0-9_]+\(", self.code, flags=re.MULTILINE):
                language = "Python"
            # TODO: detection heuristics for R and other languages?
        if not language:
            raise OpenEoClientException("Failed to detect language of UDF code.")
        runtime = language
        if connection:
            # Some additional best-effort validation/normalization of the runtime
            # TODO: this just does some case-normalization, just drop that all together to eliminate
            #       the dependency on a connection object. See https://github.com/Open-EO/openeo-api/issues/510
            runtimes = {k.lower(): k for k in connection.list_udf_runtimes().keys()}
            runtime = runtimes.get(runtime.lower(), runtime)
        return runtime

    def _guess_runtime_from_suffix(self, suffix: str) -> Optional[str]:
        """
        Map a file name suffix (case-insensitive, including the leading dot)
        to a UDF language, or ``None`` if the suffix is not recognized.
        """
        return {
            ".py": "Python",
            ".r": "R",
        }.get(suffix.lower())

    def get_run_udf_callback(self, connection: Optional[Connection] = None, data_parameter: str = "data") -> PGNode:
        """
        For internal use: construct `run_udf` node to be used as callback in `apply`, `reduce_dimension`, ...

        :param connection: optional connection, passed on to the runtime detection
        :param data_parameter: name of the parent parameter to pass as ``data`` argument of ``run_udf``
        :return: ``run_udf`` node (arguments that are ``None`` are left out)
        """
        arguments = dict_no_none(
            data={"from_parameter": data_parameter},
            udf=self.code,
            runtime=self.get_runtime(connection=connection),
            version=self.version,
            context=self.context,
        )
        return PGNode(process_id="run_udf", arguments=arguments)
def build_child_callback(
    process: Union[str, PGNode, typing.Callable, UDF],
    parent_parameters: List[str],
    connection: Optional[Connection] = None,
) -> dict:
    """
    Build a "callback" process: a user defined process that is used by another process (such as `apply`, `apply_dimension`, `reduce`, ....)

    Supported forms of ``process``:

    - a :py:class:`PGNode`: used as is
    - a process id string: a node for that process is built,
      with arguments referring (``from_parameter``) to the parent parameters
    - a callable: converted with ``convert_callable_to_pgnode``
    - a :py:class:`UDF`: converted to a ``run_udf`` node that gets the first parent parameter as data
    - a dictionary with a :py:class:`PGNode` under key ``"process_graph"``

    :param process: process id string, PGNode or callable that uses the ProcessBuilder mechanism to build a process
    :param parent_parameters: list of parameter names defined for child process
    :param connection: optional connection object to improve runtime validation for UDFs
    :return: result of ``PGNode.to_process_graph_argument`` for the callback node
    :raises ValueError: when ``process`` is of an unsupported type
    """
    # TODO: move this to more generic process graph building utility module
    # TODO: autodetect the parameters defined by parent process?
    # TODO: eliminate need for connection object (also see `UDF._guess_runtime`)
    # TODO: when `openeo.rest` deps are gone: move this helper to somewhere under `openeo.internal`
    if isinstance(process, PGNode):
        # Assume this is already a valid callback process
        pg = process
    elif isinstance(process, str):
        # Assume given reducer is a simple predefined reduce process_id
        # TODO: avoid local import (workaround for circular import issue)
        import openeo.processes

        if process in openeo.processes.__dict__:
            process_params = get_parameter_names(openeo.processes.__dict__[process])
            # TODO: switch to "Callable" handling here
        else:
            # Best effort guess
            process_params = parent_parameters
        if parent_parameters == ["x", "y"] and (len(process_params) == 1 or process_params[:1] == ["data"]):
            # Special case: wrap all parent parameters in an array
            arguments = {process_params[0]: [{"from_parameter": p} for p in parent_parameters]}
        else:
            # Only pass parameters that correspond with an arg name.
            # Follow the order of `process_params` (instead of iterating a set intersection)
            # to keep the argument order, and the serialized process graph, stable across runs.
            arguments = {p: {"from_parameter": p} for p in process_params if p in parent_parameters}
        pg = PGNode(process_id=process, arguments=arguments)
    elif isinstance(process, typing.Callable):
        pg = convert_callable_to_pgnode(process, parent_parameters=parent_parameters)
    elif isinstance(process, UDF):
        pg = process.get_run_udf_callback(connection=connection, data_parameter=parent_parameters[0])
    elif isinstance(process, dict) and isinstance(process.get("process_graph"), PGNode):
        pg = process["process_graph"]
    else:
        raise ValueError(process)

    return PGNode.to_process_graph_argument(pg)