from __future__ import annotations
import typing
from pathlib import Path
from typing import Optional, Union
from openeo.internal.documentation import openeo_process
from openeo.internal.graph_building import PGNode
from openeo.rest import (
    DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL,
    DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX,
)
from openeo.rest._datacube import _ProcessGraphAbstraction
from openeo.rest.job import BatchJob
if typing.TYPE_CHECKING:
    from openeo.rest.connection import Connection


class StacResource(_ProcessGraphAbstraction):
    """
    Handle for a process graph node that represents a STAC resource (object with subtype "stac"),
    e.g. as returned by the openEO process ``save_result``,
    or handled by the openEO processes ``export_workspace``/``stac_modify``.

    Refers to a STAC resource of any type (Catalog, Collection, or Item).
    It can refer to:

    - static STAC resources, e.g. hosted on cloud storage
    - dynamic STAC resources made available via a STAC API
    - a STAC JSON representation embedded as an argument into an openEO user-defined process
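
    A minimal usage sketch (the collection id and output format are
    illustrative assumptions, not guaranteed to exist on a given backend):

    .. code-block:: python

        cube = connection.load_collection("SENTINEL2_L2A")
        # `save_result` produces a handle to the processing results
        # that exposes this StacResource-style API.
        result = cube.save_result(format="GTiff")
        result.download("result.tiff")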

    .. versionadded:: 0.39.0
    """

    def __init__(self, graph: PGNode, connection: Optional[Connection] = None):
        super().__init__(pgnode=graph, connection=connection)

    @openeo_process
    def export_workspace(self, workspace: str, merge: Union[str, None] = None) -> StacResource:
        """
        Export data to a cloud user workspace.

        Exports the given processing results made available through a STAC resource
        (e.g., a STAC Collection) to the given user workspace.
        The STAC resource itself is exported with all STAC resources and assets underneath.

        :param workspace: The identifier of the workspace to export to.
        :param merge: (optional) Provides a cloud-specific path identifier
            to a STAC resource to merge the given STAC resource into.
            If not provided, the STAC resource is kept separate
            from any other STAC resources in the workspace.
        :return: the potentially updated STAC resource.
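
        A minimal sketch of chaining this after a result node
        (the workspace identifier is an illustrative assumption):

        .. code-block:: python

            result = cube.save_result(format="GTiff")
            exported = result.export_workspace(workspace="my-workspace")
            exported.execute_batch()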
"""
return StacResource(
graph=self._build_pgnode(
process_id="export_workspace", arguments={"data": self, "workspace": workspace, "merge": merge}
),
connection=self._connection,
)
    def download(
        self,
        outputfile: Optional[Union[str, Path]] = None,
        *,
        validate: Optional[bool] = None,
        additional: Optional[dict] = None,
        job_options: Optional[dict] = None,
    ) -> Union[None, bytes]:
"""
Send the underlying process graph to the backend
for synchronous processing and directly download the result.
If ``outputfile`` is provided, the result is downloaded to that path.
Otherwise a :py:class:`bytes` object is returned with the raw data.
:param outputfile: (optional) output path to download to.
:param validate: (optional) toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param additional: (optional) additional (top-level) properties to set in the request body
:param job_options: (optional) dictionary of job options to pass to the backend
(under top-level property "job_options")
:return: if ``outputfile`` was not specified:
a :py:class:`bytes` object containing the raw data.
Otherwise, ``None`` is returned.
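
        A minimal sketch (the output path is an illustrative assumption):

        .. code-block:: python

            # Download synchronously to a local file
            result.download("result.tiff")

            # Or obtain the raw bytes directly
            data = result.download()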
"""
return self._connection.download(
graph=self.flat_graph(),
outputfile=outputfile,
validate=validate,
additional=additional,
job_options=job_options,
)
    def create_job(
        self,
        *,
        title: Optional[str] = None,
        description: Optional[str] = None,
        plan: Optional[str] = None,
        budget: Optional[float] = None,
        additional: Optional[dict] = None,
        job_options: Optional[dict] = None,
        validate: Optional[bool] = None,
        log_level: Optional[str] = None,
    ) -> BatchJob:
"""
Send the underlying process graph to the backend
to create an openEO batch job
and return a corresponding :py:class:`~openeo.rest.job.BatchJob` instance.
Note that this method only *creates* the openEO batch job at the backend,
but it does not *start* it.
Use :py:meth:`execute_batch` instead to let the openEO Python client
take care of the full job life cycle: create, start and track its progress until completion.
:param title: (optional) job title.
:param description: (optional) job description.
:param plan: (optional) the billing plan to process and charge the job with.
:param budget: (optional) maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param additional: (optional) additional (top-level) properties to set in the request body
:param job_options: (optional) dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: (optional) toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param log_level: (optional) minimum severity level for log entries that the back-end should keep track of.
One of "error" (highest severity), "warning", "info", and "debug" (lowest severity).
:return: Handle to the job created at the backend.
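
        A minimal sketch of creating and later starting the job manually
        (the job title is an illustrative assumption):

        .. code-block:: python

            job = result.create_job(title="my STAC resource job")
            job.start()
            print(job.status())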
"""
return self._connection.create_job(
process_graph=self.flat_graph(),
title=title,
description=description,
plan=plan,
budget=budget,
validate=validate,
additional=additional,
job_options=job_options,
log_level=log_level,
)
    def execute_batch(
        self,
        outputfile: Optional[Union[str, Path]] = None,
        *,
        title: Optional[str] = None,
        description: Optional[str] = None,
        plan: Optional[str] = None,
        budget: Optional[float] = None,
        print: typing.Callable[[str], None] = print,
        max_poll_interval: float = DEFAULT_JOB_STATUS_POLL_INTERVAL_MAX,
        connection_retry_interval: float = DEFAULT_JOB_STATUS_POLL_CONNECTION_RETRY_INTERVAL,
        additional: Optional[dict] = None,
        job_options: Optional[dict] = None,
        validate: Optional[bool] = None,
        show_error_logs: bool = True,
        log_level: Optional[str] = None,
    ) -> BatchJob:
"""
Execute the underlying process graph at the backend in batch job mode:
- create the job (like :py:meth:`create_job`)
- start the job (like :py:meth:`BatchJob.start() <openeo.rest.job.BatchJob.start>`)
- track the job's progress with an active polling loop
(like :py:meth:`BatchJob.run_synchronous() <openeo.rest.job.BatchJob.run_synchronous>`)
- optionally (if ``outputfile`` is specified) download the job's results
when the job finished successfully
.. note::
Because of the active polling loop,
which blocks any further progress of your script or application,
this :py:meth:`execute_batch` method is mainly recommended
for batch jobs that are expected to complete
in a time that is reasonable for your use case.
:param outputfile: (optional) output path to download to.
:param title: (optional) job title.
:param description: (optional) job description.
:param plan: (optional) the billing plan to process and charge the job with.
:param budget: (optional) maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param additional: (optional) additional (top-level) properties to set in the request body
:param job_options: (optional) dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: (optional) toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param log_level: (optional) minimum severity level for log entries that the back-end should keep track of.
One of "error" (highest severity), "warning", "info", and "debug" (lowest severity).
:param print: print/logging function to show progress/status
:param max_poll_interval: maximum number of seconds to sleep between job status polls
:param connection_retry_interval: how long to wait when status poll failed due to connection issue
:param show_error_logs: whether to automatically print error logs when the batch job failed.
:return: Handle to the job created at the backend.
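
        A minimal end-to-end sketch (the output path and title are illustrative assumptions):

        .. code-block:: python

            result = cube.save_result(format="GTiff")
            job = result.execute_batch("result.tiff", title="my batch job")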
"""
job = self.create_job(
title=title,
description=description,
plan=plan,
budget=budget,
additional=additional,
job_options=job_options,
validate=validate,
log_level=log_level,
)
job.start_and_wait(
print=print,
max_poll_interval=max_poll_interval,
connection_retry_interval=connection_retry_interval,
show_error_logs=show_error_logs,
)
if outputfile is not None:
job.download_result(target=outputfile)
return job