Source code for openeo.rest.multiresult

from __future__ import annotations

from typing import Dict, List, Optional

from openeo import BatchJob
from openeo.internal.graph_building import FlatGraphableMixin, MultiLeafGraph
from openeo.rest import OpenEoClientException
from openeo.rest.connection import Connection, extract_connections


[docs] class MultiResult(FlatGraphableMixin): """ Helper to create and run batch jobs with process graphs that contain multiple result nodes or, more generally speaking, multiple process graph "leaf" nodes. Provide multiple :py:class:`~openeo.rest.datacube.DataCube`/:py:class:`~openeo.rest.vectorcube.VectorCube` instances to the constructor, and start a batch job from that, for example as follows: .. code-block:: python from openeo import MultiResult cube1 = ... cube2 = ... multi_result = MultiResult([cube1, cube2]) job = multi_result.create_job() .. seealso:: :ref:`multi-result-process-graphs` .. versionadded:: 0.35.0 """ __slots__ = ("_multi_leaf_graph", "_connection")
[docs] def __init__(self, leaves: List[FlatGraphableMixin], connection: Optional[Connection] = None): """ Build a :py:class:`MultiResult` instance from multiple leaf nodes :param leaves: list of objects that can be converted to an openEO-style (flat) process graph representation, typically :py:class:`~openeo.rest.datacube.DataCube` or :py:class:`~openeo.rest.vectorcube.VectorCube` instances. :param connection: Optional connection to use for creating/starting batch jobs, for special use cases where the provided leaf instances are not already associated with a connection. """ self._multi_leaf_graph = MultiLeafGraph(leaves=leaves) self._connection = self._extract_connection(leaves=leaves, connection=connection)
@staticmethod def _extract_connection(leaves: List[FlatGraphableMixin], connection: Optional[Connection] = None) -> Connection: """ Extract common connection from leaves and/or explicitly provided connection. Fails if there are multiple or none. """ connections = set() if connection: connections.add(connection) connections.update(extract_connections(leaves)) if len(connections) == 1: return connections.pop() elif len(connections) == 0: raise OpenEoClientException("No connection in any of the MultiResult leaves") else: raise OpenEoClientException("MultiResult with multiple different connections") def flat_graph(self) -> Dict[str, dict]: return self._multi_leaf_graph.flat_graph() def create_job( self, *, title: Optional[str] = None, description: Optional[str] = None, additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, log_level: Optional[str] = None, ) -> BatchJob: return self._connection.create_job( process_graph=self._multi_leaf_graph, title=title, description=description, additional=additional, job_options=job_options, validate=validate, log_level=log_level, ) def execute_batch( self, *, title: Optional[str] = None, description: Optional[str] = None, additional: Optional[dict] = None, job_options: Optional[dict] = None, validate: Optional[bool] = None, log_level: Optional[str] = None, ) -> BatchJob: job = self.create_job( title=title, description=description, additional=additional, job_options=job_options, validate=validate, log_level=log_level, ) return job.run_synchronous()