Multi Backend Job Manager

Warning

This is a new experimental API, subject to change.

class openeo.extra.job_management.MultiBackendJobManager(poll_sleep=60, root_dir='.')

Tracker for multiple jobs on multiple backends.

Usage example:

import logging
import pandas as pd
import openeo
from openeo.extra.job_management import MultiBackendJobManager

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

manager = MultiBackendJobManager()
manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))

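# One row per job; start_job() below reads a "year" column from each row.
jobs_df = pd.DataFrame(...)
# CSV file in which the status and metadata of each job are tracked.
output_file = "jobs.csv"

# Called by run_jobs() for each row that needs a job; must return a BatchJob.
def start_job(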
    row: pd.Series,
    connection: openeo.Connection,
    **kwargs
) -> openeo.BatchJob:
    year = row["year"]
    cube = connection.load_collection(
        ...,
        temporal_extent=[f"{year}-01-01", f"{year+1}-01-01"],
    )
    ...
    return cube.create_job(...)

manager.run_jobs(df=jobs_df, start_job=start_job, output_file=output_file)

See run_jobs() for more information on the start_job callable.

New in version 0.14.0.

add_backend(name, connection, parallel_jobs=2)

Register a backend with a name and either a Connection or a Connection getter.

Parameters:
  • name (str) – Name of the backend.

  • connection (Union[Connection, Callable[[], Connection]]) – Either a Connection to the backend, or a callable to create a backend connection.

  • parallel_jobs (int) – Maximum number of jobs to allow in parallel on a backend.
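The connection parameter accepts either a ready Connection or a zero-argument callable that creates one, and parallel_jobs caps how many jobs may run at once on that backend. The pure-Python sketch below illustrates both ideas. BackendRegistry, FakeConnection and pick_backend are hypothetical names used only for illustration; they are not part of openeo, and the real manager's internals may differ.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Union


class FakeConnection:
    """Hypothetical stand-in for openeo.Connection."""

    def __init__(self, url: str):
        self.url = url


@dataclass
class _Backend:
    get_connection: Callable[[], FakeConnection]
    parallel_jobs: int


class BackendRegistry:
    """Hypothetical sketch of backend registration (not openeo's implementation)."""

    def __init__(self):
        self.backends: Dict[str, _Backend] = {}

    def add_backend(
        self,
        name: str,
        connection: Union[FakeConnection, Callable[[], FakeConnection]],
        parallel_jobs: int = 2,
    ) -> None:
        # Normalize both accepted forms to a getter: a ready connection is
        # wrapped in a lambda, a callable is stored as-is and called on demand.
        if isinstance(connection, FakeConnection):
            ready = connection
            getter: Callable[[], FakeConnection] = lambda: ready
        else:
            getter = connection
        self.backends[name] = _Backend(get_connection=getter, parallel_jobs=parallel_jobs)

    def pick_backend(self, running: Dict[str, int]) -> Optional[str]:
        # Return the first backend with a free slot, or None if all are at their limit.
        for name, backend in self.backends.items():
            if running.get(name, 0) < backend.parallel_jobs:
                return name
        return None
```

In this sketch, passing a callable defers creating the connection until a job actually needs it, and a backend whose running-job count has reached parallel_jobs is simply skipped.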

ensure_job_dir_exists(job_id)

Create the job folder if it does not exist yet.

Return type:

Path
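Creating a folder only "if it does not exist yet" maps naturally onto pathlib's idempotent mkdir. The sketch below is a minimal illustration under the assumption that the job folder path is already known; ensure_dir_exists is a hypothetical helper that takes a directory rather than a job_id, and it is not the library's own code.

```python
import tempfile
from pathlib import Path


def ensure_dir_exists(job_dir: Path) -> Path:
    """Create job_dir (and any missing parents) if needed, then return it."""
    # exist_ok=True turns repeated calls into a no-op instead of raising FileExistsError.
    job_dir.mkdir(parents=True, exist_ok=True)
    return job_dir


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "jobs" / "job_abc123"
        ensure_dir_exists(target)
        ensure_dir_exists(target)  # the second call does not fail
        print(target.is_dir())  # True
```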

get_error_log_path(job_id)

Path where the error log file for the job is saved.

Return type:

Path

get_job_dir(job_id)

Path to the directory where the job's metadata, results and error logs are saved.

Return type:

Path

get_job_metadata_path(job_id)

Path where the job metadata file is saved.

Return type:

Path
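The path methods above (get_error_log_path, get_job_dir, get_job_metadata_path) keep everything belonging to one job in a single per-job folder under the manager's root_dir. A minimal sketch of such a layout follows. JobPaths is a hypothetical class, and the job_{id} folder and file names are assumptions for illustration, not a documented naming scheme.

```python
from pathlib import Path


class JobPaths:
    """Hypothetical per-job path layout under a root directory."""

    def __init__(self, root_dir: str = "."):
        self.root_dir = Path(root_dir)

    def get_job_dir(self, job_id: str) -> Path:
        # One folder per job holds its metadata, results and error logs.
        return self.root_dir / f"job_{job_id}"

    def get_job_metadata_path(self, job_id: str) -> Path:
        return self.get_job_dir(job_id) / f"job_{job_id}.json"

    def get_error_log_path(self, job_id: str) -> Path:
        return self.get_job_dir(job_id) / f"job_{job_id}_errors.json"
```

Deriving every file path from get_job_dir() means that changing root_dir moves all job artefacts together.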

on_job_done(job, row)

Handles jobs that have finished. Can be overridden to provide custom behaviour.

The default implementation downloads the job's results into a folder dedicated to that job.

Parameters:
  • job (BatchJob) – The job that has finished.

  • row – DataFrame row containing the job’s metadata.
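A custom on_job_done override might, for example, store the job's metadata next to its results. The sketch below shows only that step. FakeJob is a hypothetical stand-in whose describe_job() mimics BatchJob.describe_job(), and save_job_metadata is a hypothetical helper; in a real override you would subclass MultiBackendJobManager and would likely use self.get_job_metadata_path(job.job_id) instead of building the path by hand.

```python
import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class FakeJob:
    """Hypothetical stand-in for openeo's BatchJob."""

    job_id: str
    metadata: dict = field(default_factory=dict)

    def describe_job(self) -> dict:
        return self.metadata


def save_job_metadata(job: FakeJob, job_dir: Path) -> Path:
    """Write the job's metadata as JSON into its folder and return the file path."""
    job_dir.mkdir(parents=True, exist_ok=True)
    path = job_dir / f"job_{job.job_id}.json"
    # ensure_ascii=False keeps non-ASCII titles readable in the file.
    path.write_text(json.dumps(job.describe_job(), ensure_ascii=False), encoding="utf-8")
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        job = FakeJob("j-1", {"id": "j-1", "status": "finished"})
        print(save_job_metadata(job, Path(tmp) / "job_j-1").name)  # job_j-1.json
```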

on_job_error(job, row)

Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.

The default implementation writes the error logs to a JSON file.

Parameters:
  • job (BatchJob) – The job that stopped with an error.

  • row – DataFrame row containing the job’s metadata.
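Writing the error logs to a JSON file can be sketched as follows. FakeFailedJob and write_error_log are hypothetical; the logs(level="error") call only mimics the idea of fetching error-level log entries, so check the BatchJob.logs() signature of your openeo version before relying on it. Skipping the file when there are no error entries is a design choice of this sketch.

```python
import json
import tempfile
from pathlib import Path
from typing import List


class FakeFailedJob:
    """Hypothetical stand-in for a failed BatchJob."""

    def __init__(self, job_id: str, logs: List[dict]):
        self.job_id = job_id
        self._logs = logs

    def logs(self, level: str = "debug") -> List[dict]:
        # Simplification: only filtering on "error" is modelled here.
        if level == "error":
            return [entry for entry in self._logs if entry.get("level") == "error"]
        return list(self._logs)


def write_error_log(job: FakeFailedJob, error_log_path: Path) -> bool:
    """Write the job's error-level log entries as JSON; return False if there were none."""
    errors = job.logs(level="error")
    if not errors:
        return False
    error_log_path.parent.mkdir(parents=True, exist_ok=True)
    error_log_path.write_text(json.dumps(errors, indent=2), encoding="utf-8")
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        job = FakeFailedJob("j-2", [{"level": "error", "message": "out of memory"}])
        print(write_error_log(job, Path(tmp) / "job_j-2" / "job_j-2_errors.json"))  # True
```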

run_jobs(df, start_job, output_file)

Runs the jobs specified in a dataframe and tracks their status and metadata.

Parameters:
  • df (DataFrame) – DataFrame that specifies the jobs, and tracks the jobs’ statuses.

  • start_job (Callable[[], BatchJob]) –

    A callback that is invoked with, amongst others, the row of the dataframe for which a job should be created and/or started. This callable should return an openeo.rest.job.BatchJob object.

    The following parameters will be passed to start_job:

    row (pandas.Series):

    The row in the pandas dataframe that stores the job's state and other tracked data.

    connection_provider:

    A getter that returns a connection for a given backend name. Typically you need either connection_provider or connection, but not both.

    connection (Connection):

    The Connection itself, which has already been created. Typically you need either connection_provider or connection, but not both.

    provider (str):

    The name of the backend that will run the job.

    You do not have to declare all of the parameters described above, but if you leave any of them out, remember to include the *args and **kwargs parameters. Otherwise run_jobs() raises an exception, because it passes all of these parameters to start_job, including the ones your function does not declare.

  • output_file (Union[str, Path]) – Path to output file (CSV) containing the status and metadata of the jobs.
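Why **kwargs matters can be shown with a small sketch. It assumes that the parameters listed for start_job are passed by keyword; call_start_job is a hypothetical dispatcher (not run_jobs() itself), and plain strings stand in for real connections and jobs.

```python
from typing import Any, Callable, Dict


def call_start_job(start_job: Callable[..., Any], row: Dict[str, Any], provider: str) -> Any:
    """Hypothetical dispatcher: passes every documented parameter by keyword."""
    return start_job(
        row=row,
        connection_provider=lambda name: f"<connection to {name}>",
        connection=f"<connection to {provider}>",
        provider=provider,
    )


def start_job_ok(row, connection, **kwargs):
    # Declares only what it uses; **kwargs absorbs connection_provider and provider.
    return f"job for {row['year']} via {connection}"


def start_job_broken(row, connection):
    # No **kwargs: the extra keyword arguments cannot be accepted.
    return f"job for {row['year']} via {connection}"


if __name__ == "__main__":
    print(call_start_job(start_job_ok, {"year": 2021}, "foo"))
    try:
        call_start_job(start_job_broken, {"year": 2021}, "foo")
    except TypeError as exc:
        print("TypeError:", exc)
```

The first call prints "job for 2021 via <connection to foo>"; the second fails with a TypeError about an unexpected keyword argument.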

openeo.extra.job_management.ignore_connection_errors(context=None)

Context manager to ignore connection errors.
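A context manager of this kind can be sketched with contextlib.contextmanager. This sketch catches Python's built-in ConnectionError and logs a warning that mentions the optional context label. ignore_connection_errors_sketch is a hypothetical name, and the real function may target a different exception type (for example one from the requests library, which does not subclass the built-in ConnectionError) and may do more than log.

```python
import contextlib
import logging
from typing import Iterator, Optional

_log = logging.getLogger(__name__)


@contextlib.contextmanager
def ignore_connection_errors_sketch(context: Optional[str] = None) -> Iterator[None]:
    """Suppress ConnectionError raised in the with-block and log a warning instead."""
    try:
        yield
    except ConnectionError as exc:
        # Only connection problems are suppressed; other exceptions propagate.
        _log.warning("Ignoring connection error (context %s): %s", context or "n/a", exc)
```

One plausible use is wrapping a single polling round, so that a temporarily unreachable backend does not abort the whole run.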