Multi Backend Job Manager

API

Warning

This is a new experimental API, subject to change.

class openeo.extra.job_management.MultiBackendJobManager(poll_sleep=60, root_dir='.', *, cancel_running_job_after=None)[source]

Tracker for multiple jobs on multiple backends.

Usage example:

import logging
import pandas as pd
import openeo
from openeo.extra.job_management import MultiBackendJobManager

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

manager = MultiBackendJobManager()
manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))

jobs_df = pd.DataFrame(...)
job_db = "jobs.csv"

def start_job(
    row: pd.Series,
    connection: openeo.Connection,
    **kwargs
) -> openeo.BatchJob:
    year = row["year"]
    cube = connection.load_collection(
        ...,
        temporal_extent=[f"{year}-01-01", f"{year+1}-01-01"],
    )
    ...
    return cube.create_job(...)

manager.run_jobs(df=jobs_df, start_job=start_job, job_db=job_db)

See run_jobs() for more information on the start_job callable.

Parameters:
  • poll_sleep (int) – How many seconds to sleep between polls.

  • root_dir (Union[str, Path, None]) –

    Root directory to save files for the jobs, e.g. metadata and error logs. This defaults to “.”, the current directory.

    Each job gets its own subfolder in this root directory. You can use the following methods to find the relevant paths, based on the job ID:

    • get_job_dir

    • get_error_log_path

    • get_job_metadata_path

  • cancel_running_job_after (Optional[int]) – Optional temporal limit (in seconds) after which running jobs should be canceled by the job manager.

Added in version 0.14.0.

Changed in version 0.32.0: Added cancel_running_job_after parameter.
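
For example, a minimal sketch of constructing a manager with non-default settings (the values below are arbitrary placeholders):

from openeo.extra.job_management import MultiBackendJobManager

# Poll every 5 minutes, keep per-job files under "./job_data",
# and cancel jobs that have been running for more than 2 hours.
manager = MultiBackendJobManager(
    poll_sleep=300,
    root_dir="job_data",
    cancel_running_job_after=2 * 60 * 60,
)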

add_backend(name, connection, parallel_jobs=2)[source]

Register a backend with a name and a Connection getter.

Parameters:
  • name (str) – Name of the backend.

  • connection (Union[Connection, Callable[[], Connection]]) – Either a Connection to the backend, or a callable to create a backend connection.

  • parallel_jobs (int) – Maximum number of jobs to allow in parallel on a backend.
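
For example, a short sketch of both connection options (the backend URLs are placeholders, and authenticate_oidc() is just one possible way to authenticate):

import openeo
from openeo.extra.job_management import MultiBackendJobManager

manager = MultiBackendJobManager()

# Pass an existing Connection directly ...
manager.add_backend("foo", connection=openeo.connect("http://foo.test"))

# ... or pass a callable, so the connection is only created when it is actually needed.
def connect_to_bar() -> openeo.Connection:
    return openeo.connect("http://bar.test").authenticate_oidc()

manager.add_backend("bar", connection=connect_to_bar, parallel_jobs=4)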

ensure_job_dir_exists(job_id)[source]

Create the job folder if it does not exist yet.

Return type:

Path

get_error_log_path(job_id)[source]

Path where error log file for the job is saved.

Return type:

Path

get_job_dir(job_id)[source]

Path to the directory where job metadata, results and error logs are saved.

Return type:

Path

get_job_metadata_path(job_id)[source]

Path where job metadata file is saved.

Return type:

Path
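
As an illustration, these path helpers can be used to inspect a job's files after a run (the job ID below is a hypothetical value, as tracked in the job database):

job_id = "j-2401abcdef"  # hypothetical job ID
job_dir = manager.get_job_dir(job_id)
metadata_path = manager.get_job_metadata_path(job_id)
error_log_path = manager.get_error_log_path(job_id)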

on_job_cancel(job, row)[source]

Handle a job that was cancelled. Can be overridden to provide custom behaviour.

Default implementation does not do anything.

Parameters:
  • job (BatchJob) – The job that was canceled.

  • row – DataFrame row containing the job’s metadata.

on_job_done(job, row)[source]

Handles jobs that have finished. Can be overridden to provide custom behaviour.

Default implementation downloads the job results and metadata into the job’s directory (see get_job_dir).

Parameters:
  • job (BatchJob) – The job that has finished.

  • row – DataFrame row containing the job’s metadata.

on_job_error(job, row)[source]

Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.

Default implementation writes the error logs to a JSON file.

Parameters:
  • job (BatchJob) – The job that stopped with errors.

  • row – DataFrame row containing the job’s metadata.
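
These handlers can be overridden in a subclass to hook in custom behaviour. A rough sketch that keeps the default behaviour and merely adds some logging (reusing the "year" column from the usage example above):

from openeo.extra.job_management import MultiBackendJobManager
from openeo.rest.job import BatchJob

class MyJobManager(MultiBackendJobManager):
    def on_job_done(self, job: BatchJob, row):
        # Keep the default behaviour (download results and metadata) ...
        super().on_job_done(job, row)
        # ... and add a simple custom log line.
        print(f"Job {job.job_id} (year {row.get('year')}) finished")

    def on_job_error(self, job: BatchJob, row):
        super().on_job_error(job, row)
        print(f"Job {job.job_id} failed, see {self.get_error_log_path(job.job_id)}")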

run_jobs(df=None, start_job=<function _start_job_default>, job_db=None, **kwargs)[source]

Run the jobs specified in a dataframe, and track their statuses and metadata.

Parameters:
  • df (Optional[DataFrame]) – DataFrame that specifies the jobs, and tracks the jobs’ statuses. If None, the job_db has to be specified and will be used.

  • start_job (Callable[[], BatchJob]) –

    A callback which will be invoked with, amongst others, the row of the dataframe for which a job should be created and/or started. This callable should return an openeo.rest.job.BatchJob object.

    The following parameters will be passed to start_job:

    • row (pandas.Series): The row in the pandas dataframe that stores the job’s state and other tracked data.

    • connection_provider: A getter to get a connection by backend name. Typically, you need either the connection_provider parameter or the connection parameter, but not both.

    • connection (Connection): The Connection itself, which has already been created. Typically, you need either the connection_provider parameter or the connection parameter, but not both.

    • provider (str): The name of the backend that will run the job.

    You do not have to define all of the parameters described above, but if you leave any of them out, remember to include the *args and **kwargs parameters. Otherwise you will get an exception, because run_jobs() passes unknown parameters to start_job.

  • job_db (Union[str, Path, JobDatabaseInterface, None]) –

    Job database to load/store existing job status data and other metadata from/to. Can be specified as a path to a CSV or Parquet file, or as a custom database object following the JobDatabaseInterface interface.

    Note

    Support for Parquet files depends on the pyarrow package as optional dependency.

Return type:

dict

Returns:

dictionary with stats collected during the job running loop. Note that the set of fields in this dictionary is experimental and subject to change.

Changed in version 0.31.0: Added support for persisting the job metadata in Parquet format.

Changed in version 0.31.0: Replace output_file argument with job_db argument, which can be a path to a CSV or Parquet file, or a user-defined JobDatabaseInterface object. The deprecated output_file argument is still supported for now.

Changed in version 0.33.0: return a stats dictionary
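
For example, a small sketch of running jobs against a CSV-backed job database and inspecting the returned stats (reusing manager, jobs_df and start_job from the usage example above):

stats = manager.run_jobs(df=jobs_df, start_job=start_job, job_db="jobs.csv")

# The set of fields in `stats` is experimental, so just print whatever is there.
for key, value in stats.items():
    print(f"{key}: {value}")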

start_job_thread(start_job, job_db)[source]

Start running the jobs in a separate thread and return immediately.

Parameters:
  • start_job (Callable[[], BatchJob]) –

    A callback which will be invoked with, amongst others, the row of the dataframe for which a job should be created and/or started. This callable should return an openeo.rest.job.BatchJob object.

    The following parameters will be passed to start_job:

    • row (pandas.Series): The row in the pandas dataframe that stores the job’s state and other tracked data.

    • connection_provider: A getter to get a connection by backend name. Typically, you need either the connection_provider parameter or the connection parameter, but not both.

    • connection (Connection): The Connection itself, which has already been created. Typically, you need either the connection_provider parameter or the connection parameter, but not both.

    • provider (str): The name of the backend that will run the job.

    You do not have to define all of the parameters described above, but if you leave any of them out, remember to include the *args and **kwargs parameters. Otherwise you will get an exception, because run_jobs() passes unknown parameters to start_job.

  • job_db (JobDatabaseInterface) –

    Job database to load/store existing job status data and other metadata from/to. Can be specified as a path to a CSV or Parquet file, or as a custom database object following the JobDatabaseInterface interface.

    Note

    Support for Parquet files depends on the pyarrow package as optional dependency.

Added in version 0.32.0.

stop_job_thread(timeout_seconds=<object object>)[source]

Stop the job polling thread.

Parameters:

timeout_seconds (Optional[float]) – The time to wait for the thread to stop. By default, it will wait for 2 times the poll_sleep time. Set to None to wait indefinitely.

Added in version 0.32.0.
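
A rough sketch of the threaded variant (reusing manager, jobs_df and start_job from the usage example above; the job database is created as an explicit object via the create_job_db helper used in the example further below):

from openeo.extra.job_management import create_job_db

job_db = create_job_db("jobs.csv").initialize_from_df(jobs_df)

manager.start_job_thread(start_job=start_job, job_db=job_db)
# ... do other work while the jobs are being tracked in a background thread ...
manager.stop_job_thread()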

class openeo.extra.job_management.JobDatabaseInterface[source]

Interface for a database of job metadata to use with the MultiBackendJobManager: it allows the job metadata to be persisted regularly while the job statuses are polled, and the job tracking to be resumed/restarted after an interruption.

Added in version 0.31.0.

abstract count_by_status(statuses=())[source]

Retrieve the number of jobs per status.

Parameters:

statuses (Iterable[str]) – List/set of statuses to include. If empty, all statuses are included.

Return type:

dict

Returns:

dictionary with status as key and the count as value.

abstract exists()[source]

Does the job database already exist, to read job data from?

Return type:

bool

abstract get_by_status(statuses, max=None)[source]

Returns a dataframe with jobs, filtered by status.

Parameters:
  • statuses (List[str]) – List of statuses to include.

  • max – Maximum number of jobs to return.

Return type:

DataFrame

Returns:

DataFrame with jobs filtered by status.

abstract persist(df)[source]

Store job data to the database. The provided dataframe may contain partial information, which is merged into the larger database.

Parameters:

df (DataFrame) – job data to store.
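
As an illustration only (not part of the library), a minimal in-memory implementation of this interface could look roughly like the sketch below. It assumes a "status" column in the job dataframe and keeps everything in memory, so nothing survives an interruption:

import pandas as pd

from openeo.extra.job_management import JobDatabaseInterface

class InMemoryJobDatabase(JobDatabaseInterface):
    """Toy in-memory job database, for illustration only."""

    def __init__(self):
        self._df = None

    def exists(self) -> bool:
        return self._df is not None

    def persist(self, df: pd.DataFrame):
        if self._df is None:
            self._df = df.copy()
        else:
            # Naive merge: assumes the updated rows/columns already exist in the stored dataframe.
            self._df.loc[df.index, df.columns] = df

    def count_by_status(self, statuses=()) -> dict:
        if self._df is None:
            return {}
        counts = self._df["status"].value_counts().to_dict()
        if statuses:
            counts = {status: n for status, n in counts.items() if status in statuses}
        return counts

    def get_by_status(self, statuses, max=None) -> pd.DataFrame:
        if self._df is None:
            return pd.DataFrame()
        selection = self._df[self._df["status"].isin(statuses)]
        return selection.head(max) if max is not None else selection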

class openeo.extra.job_management.CsvJobDatabase(path)[source]

Persist/load job metadata with a CSV file.

Implements:

JobDatabaseInterface

Parameters:

path (Union[str, Path]) – Path to local CSV file.

Note

Support for GeoPandas dataframes depends on the geopandas package as optional dependency.

Added in version 0.31.0.

class openeo.extra.job_management.ParquetJobDatabase(path)[source]

Persist/load job metadata with a Parquet file.

Implements:

JobDatabaseInterface

Parameters:

path (Union[str, Path]) – Path to the Parquet file.

Note

Support for Parquet files depends on the pyarrow package as optional dependency.

Support for GeoPandas dataframes depends on the geopandas package as optional dependency.

Added in version 0.31.0.
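
For example, a small sketch of using these concrete job databases with run_jobs() (file names are placeholders; manager, jobs_df and start_job are reused from the usage example above):

from openeo.extra.job_management import CsvJobDatabase, ParquetJobDatabase

# CSV-backed job database:
job_db = CsvJobDatabase("jobs.csv")

# Or a Parquet-backed one (requires the optional pyarrow dependency):
# job_db = ParquetJobDatabase("jobs.parquet")

manager.run_jobs(df=jobs_df, start_job=start_job, job_db=job_db)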

class openeo.extra.job_management.ProcessBasedJobCreator(*, process_id=None, namespace=None, parameter_defaults=None, parameter_column_map=None)[source]

Batch job creator (to be used together with MultiBackendJobManager) that takes a parameterized openEO process definition (e.g. a user-defined process (UDP) or a remote openEO process definition), and creates a batch job for each row of the dataframe managed by the MultiBackendJobManager by filling in the process parameters with corresponding row values.

See also

See Job creation based on parameterized processes for more information and examples.

Process parameters are linked to dataframe columns by name. While this intuitive name-based matching should cover most use cases, there are additional options for overrides or fallbacks:

  • When provided, parameter_column_map will be consulted for resolving a process parameter name (key in the dictionary) to a desired dataframe column name (corresponding value).

  • One common case is handled automatically as convenience functionality.

    When:

    • parameter_column_map is not provided (or set to None),

    • and there is a single parameter that accepts inline GeoJSON geometries,

    • and the dataframe is a GeoPandas dataframe with a single geometry column,

    then this parameter and this geometries column will be linked automatically.

  • If a parameter cannot be matched with a column by name as described above, a default value will be picked, first by looking in parameter_defaults (if provided), and then by looking up the default value from the parameter schema in the process definition (see the sketch after this list).

  • Finally, if no (default) value can be determined and the parameter is not flagged as optional, an error will be raised.
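
A small sketch of these override and fallback options (the process URL, parameter names and column names are hypothetical):

from openeo.extra.job_management import ProcessBasedJobCreator

job_starter = ProcessBasedJobCreator(
    namespace="https://example.com/my_process.json",
    # Fallback values for parameters that have no matching dataframe column.
    parameter_defaults={"bands": ["B02", "B03"]},
    # Explicitly link process parameter "date" to dataframe column "start_date".
    parameter_column_map={"date": "start_date"},
)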

Parameters:
  • process_id (Optional[str]) – (optional) openEO process identifier. Can be omitted when working with a remote process definition that is fully defined with a URL in the namespace parameter.

  • namespace (Optional[str]) – (optional) openEO process namespace. Typically used to provide a URL to a remote process definition.

  • parameter_defaults (Optional[dict]) – (optional) default values for process parameters, to be used when not available in the dataframe managed by MultiBackendJobManager.

  • parameter_column_map (Optional[dict]) – Optional overrides for linking process parameters to dataframe columns: mapping of process parameter names as key to dataframe column names as value.

Added in version 0.33.0.

Warning

This is an experimental API subject to change, and we greatly welcome feedback and suggestions for improvement.

__call__(*arg, **kwargs)[source]

Syntactic sugar for calling start_job().

Return type:

BatchJob

start_job(row, connection, **_)[source]

Implementation of the start_job callable interface of MultiBackendJobManager.run_jobs() to create a job based on a given dataframe row.

Parameters:
  • row (Series) – The row in the pandas dataframe that stores the job’s state and other tracked data.

  • connection (Connection) – The connection to the backend.

Return type:

BatchJob

Job creation based on parameterized processes

The openEO API supports parameterized processes out of the box, which makes it possible to work with flexible, reusable openEO building blocks in the form of user-defined processes or remote openEO process definitions. This can also be leveraged for job creation in the context of the MultiBackendJobManager: define a “template” job as a parameterized process and let the job manager fill in the parameters from a given data frame.

The ProcessBasedJobCreator helper class lets you do exactly that. Given a reference to a parameterized process, such as a user-defined process or remote process definition, it can be used directly as the start_job callable for run_jobs(), which will fill in the process parameters from the dataframe.

Basic ProcessBasedJobCreator example

Basic usage example with a remote process definition:

Basic ProcessBasedJobCreator example snippet
import pandas as pd

from openeo.extra.job_management import (
    MultiBackendJobManager,
    create_job_db,
    ProcessBasedJobCreator,
)

# Job creator, based on a parameterized openEO process
# (specified by the remote process definition at the given URL)
# which has parameters "start_date" and "bands" for example.
job_starter = ProcessBasedJobCreator(
    namespace="https://example.com/my_process.json",
    parameter_defaults={
        "bands": ["B02", "B03"],
    },
)

# Initialize the job database from a dataframe,
# with the desired parameter values to fill in.
df = pd.DataFrame({
    "start_date": ["2021-01-01", "2021-02-01", "2021-03-01"],
})
job_db = create_job_db("jobs.csv").initialize_from_df(df)

# Create and run the job manager,
# which will start a job for each of the `start_date` values in the dataframe
# and use the default band list ["B02", "B03"] for the "bands" parameter.
job_manager = MultiBackendJobManager(...)
job_manager.run_jobs(job_db=job_db, start_job=job_starter)

In this example, a ProcessBasedJobCreator is instantiated based on a remote process definition, which has parameters start_date and bands. When passed to run_jobs(), a job for each row in the dataframe will be created, with parameter values based on matching columns in the dataframe:

  • the start_date parameter will be filled in with the values from the “start_date” column of the dataframe,

  • the bands parameter has no corresponding column in the dataframe, and will get its value from the default specified in the parameter_defaults argument.

ProcessBasedJobCreator with geometry handling

Apart from the intuitive name-based parameter-column linking, ProcessBasedJobCreator also automatically links:

  • a process parameter that accepts inline GeoJSON geometries/features (which practically means it has a schema like {"type": "object", "subtype": "geojson"}, as produced by Parameter.geojson)

  • with the geometry column of a GeoPandas dataframe,

even if the name of that parameter does not exactly match the name of the GeoPandas geometry column (geometry by default). This automatic linking is only done if there is exactly one such GeoJSON parameter and one geometry column in the dataframe.

Todo

Add example with geometry handling.
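
In the meantime, a rough sketch of what such geometry-based usage could look like (the process URL is hypothetical, and it is assumed that the remote process definition has a "start_date" parameter plus a single parameter accepting inline GeoJSON):

import geopandas as gpd
from shapely.geometry import box

from openeo.extra.job_management import (
    MultiBackendJobManager,
    ProcessBasedJobCreator,
    create_job_db,
)

# Hypothetical parameterized process with a single GeoJSON parameter.
job_starter = ProcessBasedJobCreator(
    namespace="https://example.com/my_process.json",
)

# GeoPandas dataframe with a single geometry column:
# the GeoJSON parameter will be linked to it automatically.
gdf = gpd.GeoDataFrame(
    {"start_date": ["2021-01-01", "2021-02-01"]},
    geometry=[box(5.0, 51.0, 5.1, 51.1), box(5.1, 51.0, 5.2, 51.1)],
    crs="EPSG:4326",
)

job_db = create_job_db("jobs-geometry.csv").initialize_from_df(gdf)

job_manager = MultiBackendJobManager(...)
job_manager.run_jobs(job_db=job_db, start_job=job_starter)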