Multi Backend Job Manager
Warning
This is a new experimental API, subject to change.
- class openeo.extra.job_management.MultiBackendJobManager(poll_sleep=60, root_dir='.', *, cancel_running_job_after=None)
Tracker for multiple jobs on multiple backends.
Usage example:
```python
import logging

import pandas as pd

import openeo
from openeo.extra.job_management import MultiBackendJobManager

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

manager = MultiBackendJobManager()
manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))

jobs_df = pd.DataFrame(...)
output_file = "jobs.csv"

def start_job(
    row: pd.Series,
    connection: openeo.Connection,
    **kwargs
) -> openeo.BatchJob:
    year = row["year"]
    cube = connection.load_collection(
        ...,
        temporal_extent=[f"{year}-01-01", f"{year+1}-01-01"],
    )
    ...
    return cube.create_job(...)

manager.run_jobs(df=jobs_df, start_job=start_job, output_file=output_file)
```
See run_jobs() for more information on the start_job callable.
Added in version 0.14.0.
- add_backend(name, connection, parallel_jobs=2)
  Register a backend with a name and a Connection getter.
  - Parameters:
    - name (str) – Name of the backend.
    - connection (Union[Connection, Callable[[], Connection]]) – Either a Connection to the backend, or a callable to create a backend connection.
    - parallel_jobs (int) – Maximum number of jobs to allow in parallel on a backend.
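The connection argument can also be a zero-argument callable, so the connection is only created (and, for example, authenticated) when needed. A minimal sketch, with a placeholder backend URL and authentication flow:

```python
import openeo
from openeo.extra.job_management import MultiBackendJobManager

manager = MultiBackendJobManager()

def connect_foo() -> openeo.Connection:
    # Called by the job manager to (re)create the connection;
    # the URL and authentication flow are placeholders.
    return openeo.connect("http://foo.test").authenticate_oidc()

manager.add_backend("foo", connection=connect_foo, parallel_jobs=4)
```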
- ensure_job_dir_exists(job_id)
  Create the job folder if it does not exist yet.
  - Return type: Path
- get_error_log_path(job_id)
  Path where the error log file for the job is saved.
  - Return type: Path
- get_job_dir(job_id)
  Path to the directory where job metadata, results and error logs are saved.
  - Return type: Path
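For illustration, a small sketch of these path helpers together (the job id is a made-up placeholder; the exact directory layout is an implementation detail):

```python
from openeo.extra.job_management import MultiBackendJobManager

manager = MultiBackendJobManager(root_dir="./jobs")

job_id = "j-2405abcdef"  # hypothetical job id, for illustration only
manager.ensure_job_dir_exists(job_id)
print(manager.get_job_dir(job_id))         # directory for this job's metadata and results
print(manager.get_error_log_path(job_id))  # file where error logs would be written
```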
- on_job_cancel(job, row)
  Handle a job that was cancelled. Can be overridden to provide custom behaviour.
  Default implementation does not do anything.
  - Parameters:
    - job (BatchJob) – The job that was cancelled.
    - row – DataFrame row containing the job's metadata.
- on_job_done(job, row)
  Handles jobs that have finished. Can be overridden to provide custom behaviour.
  Default implementation downloads the results into a folder whose name includes the job title.
  - Parameters:
    - job (BatchJob) – The job that has finished.
    - row – DataFrame row containing the job's metadata.
- on_job_error(job, row)
  Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.
  Default implementation writes the error logs to a JSON file.
  - Parameters:
    - job (BatchJob) – The job that stopped with errors.
    - row – DataFrame row containing the job's metadata.
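A sketch of overriding on_job_done() and on_job_error() while keeping the default behaviour; storing the metadata via BatchJob.describe() and the extra logging are illustrative additions, not defaults:

```python
import json
import logging

import pandas as pd
from openeo.rest.job import BatchJob
from openeo.extra.job_management import MultiBackendJobManager

class MyJobManager(MultiBackendJobManager):
    def on_job_done(self, job: BatchJob, row: pd.Series):
        # Keep the default behaviour (download the results) ...
        super().on_job_done(job, row)
        # ... and additionally store the full job metadata as JSON.
        with open(self.get_job_dir(job.job_id) / "job_metadata.json", "w") as f:
            json.dump(job.describe(), f, indent=2)

    def on_job_error(self, job: BatchJob, row: pd.Series):
        # Keep the default behaviour (write the error logs) and add a warning.
        super().on_job_error(job, row)
        logging.warning("Job %s stopped with errors (row %r)", job.job_id, row.name)
```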
- run_jobs(df, start_job, job_db=None, **kwargs)
  Runs jobs, specified in a dataframe, and tracks parameters.
  - Parameters:
    - df (DataFrame) – DataFrame that specifies the jobs and tracks the jobs' statuses.
    - start_job (Callable[[], BatchJob]) – A callback which will be invoked with, amongst others, the row of the dataframe for which a job should be created and/or started. This callable should return an openeo.rest.job.BatchJob object.
      The following parameters will be passed to start_job:
      - row (pandas.Series): The row in the pandas dataframe that stores the job's state and other tracked data.
      - connection_provider: A getter to get a connection by backend name. Typically you need either the connection_provider parameter or the connection parameter, but not both.
      - connection (Connection): The Connection itself, already created. Typically you need either the connection_provider parameter or the connection parameter, but not both.
      - provider (str): The name of the backend that will run the job.
      You do not have to declare all of these parameters, but if you leave any of them out, remember to include the *args and **kwargs parameters; otherwise you will get an exception, because run_jobs() passes unknown parameters to start_job.
    - job_db (Union[str, Path, JobDatabaseInterface, None]) – Job database to load/store existing job status data and other metadata from/to. Can be specified as a path to a CSV or Parquet file, or as a custom database object following the JobDatabaseInterface interface.
      Note: Support for Parquet files depends on the pyarrow package as optional dependency.
  Changed in version 0.31.0: Added support for persisting the job metadata in Parquet format.
  Changed in version 0.31.0: Replaced the output_file argument with the job_db argument, which can be a path to a CSV or Parquet file, or a user-defined JobDatabaseInterface object. The deprecated output_file argument is still supported for now.
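A minimal start_job sketch that only declares row and connection, letting **kwargs absorb the other parameters (connection_provider, provider, ...) that run_jobs() passes; the backend URL and collection id are placeholders:

```python
import pandas as pd

import openeo
from openeo.extra.job_management import MultiBackendJobManager

def start_job(row: pd.Series, connection: openeo.Connection, **kwargs) -> openeo.BatchJob:
    # **kwargs absorbs the parameters this callback does not declare explicitly.
    cube = connection.load_collection(row["collection"])
    return cube.create_job(title=f"job for row {row.name}")

manager = MultiBackendJobManager()
manager.add_backend("foo", connection=openeo.connect("http://foo.test"))

df = pd.DataFrame({"collection": ["SENTINEL2_L2A"]})
# Track job metadata in a Parquet file (requires the optional pyarrow dependency).
manager.run_jobs(df=df, start_job=start_job, job_db="jobs.parquet")
```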
- class openeo.extra.job_management.JobDatabaseInterface
  Interface for a database of job metadata to use with the MultiBackendJobManager: it allows the job metadata to be persisted regularly while the job statuses are polled, and job tracking to be resumed/restarted after an interruption.
  Added in version 0.31.0.
- abstract exists()
  Does the job database already exist, to read job data from?
  - Return type: bool
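As a hypothetical sketch of a custom implementation: only exists() is documented above, but a usable job database also has to read and persist the job metadata dataframe. The read() and persist() methods below are assumptions mirroring what the CSV/Parquet implementations do; check the actual interface for the authoritative set of methods:

```python
from pathlib import Path

import pandas as pd
from openeo.extra.job_management import JobDatabaseInterface

class PickleJobDatabase(JobDatabaseInterface):
    """Hypothetical job database that stores the metadata dataframe as a pickle file."""

    def __init__(self, path):
        self.path = Path(path)

    def exists(self) -> bool:
        return self.path.exists()

    def read(self) -> pd.DataFrame:  # assumed interface method
        return pd.read_pickle(self.path)

    def persist(self, df: pd.DataFrame):  # assumed interface method
        df.to_pickle(self.path)
```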
- class openeo.extra.job_management.CsvJobDatabase(path)
  Persist/load job metadata with a CSV file.
  - Implements: JobDatabaseInterface
  - Parameters:
    - path (Union[str, Path]) – Path to local CSV file.
  Note: Support for GeoPandas dataframes depends on the geopandas package as optional dependency.
  Added in version 0.31.0.
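Continuing the run_jobs() sketch above: passing a CsvJobDatabase object is equivalent to passing the CSV path directly, and if the file already exists, job tracking resumes from its contents:

```python
from openeo.extra.job_management import CsvJobDatabase

job_db = CsvJobDatabase("jobs.csv")
# Equivalent to job_db="jobs.csv"; resumes tracking if the file already exists.
manager.run_jobs(df=df, start_job=start_job, job_db=job_db)
```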
- class openeo.extra.job_management.ParquetJobDatabase(path)
  Persist/load job metadata with a Parquet file.
  - Implements: JobDatabaseInterface
  - Parameters:
    - path (Union[str, Path]) – Path to the Parquet file.
  Note: Support for Parquet files depends on the pyarrow package as optional dependency. Support for GeoPandas dataframes depends on the geopandas package as optional dependency.
  Added in version 0.31.0.
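Likewise for Parquet (same sketch; requires the optional pyarrow dependency):

```python
from openeo.extra.job_management import ParquetJobDatabase

job_db = ParquetJobDatabase("jobs.parquet")
if not job_db.exists():
    print("No earlier job metadata found; run_jobs() will start fresh.")
manager.run_jobs(df=df, start_job=start_job, job_db=job_db)
```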