Multi Backend Job Manager¶
The MultiBackendJobManager
helps you run and manage a large number of batch jobs
across one or more openEO backends.
It handles job creation, submission, status tracking, result downloading,
error handling, and persistence of job metadata, all automatically.
It is designed for scenarios where you need to process many tasks in parallel, for example tiling a large area of interest into smaller regions and running a batch job for each tile.
Tip
For hands-on, end-to-end Jupyter notebook examples, see the Managing Multiple Large Scale Jobs notebooks in the openEO community examples repository. These cover real-world workflows including job splitting, result visualization, and more.
Getting Started¶
There are three main ingredients to using the
MultiBackendJobManager:
A manager with one or more registered backends.
A job database (backed by a DataFrame) that describes the work to do; one row per job.
A start_job callback that turns a single row into an openEO batch job.
The sections below walk through each of these, and then show how they come together.
Setting up the manager¶
Create a MultiBackendJobManager
and register the backend you want to use.
Each backend gets a name and an authenticated Connection:
import openeo
from openeo.extra.job_management import MultiBackendJobManager
manager = MultiBackendJobManager()
manager.add_backend("cdse", connection=openeo.connect(
    "https://openeo.dataspace.copernicus.eu/"
).authenticate_oidc())
You can register more than one backend; the manager will distribute jobs across them automatically:
manager.add_backend("dev", connection=openeo.connect(
    "https://openeo-dev.example.com"
).authenticate_oidc())
The optional parallel_jobs argument to
add_backend()
controls how many jobs the manager will try to keep active simultaneously on that backend (default: 2).
This is the manager’s own limit, independent of the backend’s infrastructure limits.
The actual number of jobs that can run in parallel depends on the backend’s capacity per user.
In addition, the manager also applies an internal queueing cap per backend to avoid flooding a backend with too many queued jobs at once.
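Schematically, this limit means new jobs are only launched while the number of active jobs on a backend stays below parallel_jobs. A minimal pure-Python sketch of the rule (illustrative only, not the manager's actual scheduling code):

```python
def jobs_to_launch(active_jobs: int, parallel_jobs: int = 2) -> int:
    """Illustrative sketch of the per-backend concurrency rule:
    only start new jobs while fewer than `parallel_jobs` are active."""
    return max(0, parallel_jobs - active_jobs)
```

With the default parallel_jobs=2, a backend that already has one active job gets at most one more job started on the next polling cycle.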
Preparing the job database¶
The job database is a pandas.DataFrame where each row
represents one job you want to run. The columns hold the parameters
your start_job callback will read, for example a year, a spatial
extent, a file path, etc.
Wrap the DataFrame in a persistent job database (CSV or Parquet) so progress is saved to disk and can be resumed if interrupted:
import pandas as pd
from openeo.extra.job_management import create_job_db
df = pd.DataFrame({
    "spatial_extent": [
        {"west": 5.0, "south": 51.0, "east": 5.1, "north": 51.1},
        {"west": 5.1, "south": 51.1, "east": 5.2, "north": 51.2},
    ],
    "year": [2021, 2022],
})
job_db = create_job_db("jobs.csv", df=df)
The manager will automatically add bookkeeping columns
(status, id, backend_name, start_time, …);
you only need to supply the columns relevant to your processing.
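For larger areas of interest you would typically generate these rows programmatically, e.g. by tiling a bounding box into a grid. A minimal sketch using plain pandas (the tile_bbox helper is hypothetical, not part of openEO):

```python
import math
import pandas as pd

def tile_bbox(west, south, east, north, step):
    """Split a bounding box into a grid of smaller extents, one job per tile."""
    # Small epsilon guards against float rounding when the extent is an
    # exact multiple of the step size.
    nx = math.ceil((east - west) / step - 1e-9)
    ny = math.ceil((north - south) / step - 1e-9)
    return [
        {
            "west": west + i * step,
            "south": south + j * step,
            "east": min(west + (i + 1) * step, east),
            "north": min(south + (j + 1) * step, north),
        }
        for i in range(nx)
        for j in range(ny)
    ]

# Two 0.2° x 0.2° tiles covering a 0.4° x 0.2° area:
df = pd.DataFrame({"spatial_extent": tile_bbox(5.0, 51.0, 5.4, 51.2, step=0.2)})
```

Each resulting row can then be wrapped into a job database with create_job_db() as shown above.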
Defining the start_job callback¶
The start_job callback is a function you write. It receives a
pandas.Series (one row of the DataFrame) and a
Connection, and should return
a BatchJob:
def start_job(row, connection, **kwargs):
    cube = connection.load_collection(
        "SENTINEL2_L2A",
        spatial_extent=row["spatial_extent"],
        temporal_extent=[f"{row['year']}-01-01", f"{row['year']+1}-01-01"],
        bands=["B04", "B08"],
    )
    cube = cube.ndvi(nir="B08", red="B04")
    return cube.create_job(
        title=f"NDVI {row['year']}",
        out_format="GTiff",
    )
A few things to note:
- The callback should create the job (create_job) but does not need to start it; the manager takes care of that.
- Always include **kwargs so the manager can pass extra arguments (like provider or connection_provider) without causing errors.
- You can read any column you put in the DataFrame via row["..."].
See run_jobs()
for the full list of parameters passed to the callback.
Running everything¶
With all three pieces in place, a single call kicks off the processing loop. It blocks until every job has finished, failed, or been canceled:
import logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
manager.run_jobs(job_db=job_db, start_job=start_job)
Enabling logging (as shown above) is recommended: the manager logs status changes, retries, and errors, so you can follow progress.
Job Database¶
The job manager persists job metadata (status, backend, timing, costs, …) to a job database so that processing can be resumed after an interruption. Several storage backends are available.
CSV and Parquet files¶
The easiest option is to use a local CSV or Parquet file.
Use the create_job_db() factory
to create and initialize a job database from a pandas.DataFrame or a geopandas.GeoDataFrame:
from openeo.extra.job_management import create_job_db
job_db = create_job_db("jobs.csv", df=df)
# or for Parquet:
job_db = create_job_db("jobs.parquet", df=df)
If the file already exists (e.g. from a previous interrupted run),
you can re-open it with get_job_db():
from openeo.extra.job_management import get_job_db
job_db = get_job_db("jobs.csv")
and pass it directly to
run_jobs()
to resume where you left off.
Tip
Parquet files are generally recommended over CSV for large job databases,
as they are faster to read/write and handle data types more reliably.
Parquet support requires the pyarrow package
(see optional dependencies).
STAC API (experimental)¶
For advanced use cases, the
STACAPIJobDatabase
allows persisting job metadata to a STAC API service.
This is an unstable, experimental feature.
from openeo.extra.job_management.stac_job_db import STACAPIJobDatabase
job_db = STACAPIJobDatabase(
    collection_id="my-jobs",
    stac_root_url="https://stac.example.com",
)
job_db.initialize_from_df(df)
Custom interfaces¶
You can implement your own storage backend by subclassing
JobDatabaseInterface.
See the API reference below for the full interface.
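As a rough illustration of the interface's responsibilities, here is an in-memory sketch that mirrors its abstract methods (exists, get_by_status, get_by_indices, persist). A real implementation would subclass JobDatabaseInterface and persist to durable storage; this toy version only shows the expected merge semantics:

```python
import pandas as pd

class InMemoryJobDatabase:
    """Toy sketch mirroring JobDatabaseInterface; for illustration only."""

    def __init__(self):
        self._df = None

    def exists(self) -> bool:
        # Is there existing job data to read from?
        return self._df is not None

    def persist(self, df: pd.DataFrame):
        # The given frame may cover only a subset of rows,
        # so merge it with existing data instead of overwriting.
        if self._df is None:
            self._df = df.copy()
        else:
            self._df = df.combine_first(self._df)

    def get_by_status(self, statuses, max=None) -> pd.DataFrame:
        selection = self._df[self._df["status"].isin(statuses)]
        return selection if max is None else selection.head(max)

    def get_by_indices(self, indices) -> pd.DataFrame:
        return self._df.loc[list(indices)]
```

Note how persist() lets updated rows take precedence while keeping untouched rows intact, which is what allows the manager to checkpoint partial progress.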
Customizing Job Handling¶
The MultiBackendJobManager provides
callback methods that can be overridden to customize what happens
when a job finishes, fails, or is canceled:
- on_job_done(): called when a job completes successfully. The default implementation downloads the results and saves job metadata.
- on_job_error(): called when a job fails with an error. The default implementation saves the error logs to a JSON file.
- on_job_cancel(): called when a job is canceled. The default implementation does nothing.
Example: subclass to add custom post-processing:
class MyJobManager(MultiBackendJobManager):
    def on_job_done(self, job, row):
        # First, do the default download
        super().on_job_done(job, row)
        # Then add custom post-processing
        job_dir = self.get_job_dir(job.job_id)
        print(f"Results for job {job.job_id} saved to {job_dir}")

    def on_job_error(self, job, row):
        super().on_job_error(job, row)
        # e.g. send a notification
        print(f"Job {job.job_id} failed!")
Automatic Result Downloading¶
By default, the job manager downloads results of completed jobs automatically.
This can be disabled by setting download_results=False:
manager = MultiBackendJobManager(download_results=False)
Results and metadata are saved under the root_dir directory
(defaults to the current directory), in per-job subfolders like job_{job_id}/.
Added in version 0.47.0: The download_results parameter.
Canceling Long-Running Jobs¶
You can set an automatic timeout for running jobs with the
cancel_running_job_after parameter (in seconds).
Jobs that exceed this duration will be automatically canceled:
# Cancel any job that has been running for more than 2 hours
manager = MultiBackendJobManager(cancel_running_job_after=7200)
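Conceptually, the check is just a comparison of elapsed running time against the configured limit. A hypothetical sketch of the rule (not the manager's actual code):

```python
from datetime import datetime, timedelta

def should_cancel(running_start_time, now, cancel_running_job_after):
    """Sketch of the cancel-after-timeout rule: cancel a running job once
    it has been running longer than the configured limit (in seconds)."""
    if cancel_running_job_after is None:
        return False  # no limit configured
    return (now - running_start_time) > timedelta(seconds=cancel_running_job_after)
```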
Added in version 0.32.0.
Running in a Background Thread¶
By default, run_jobs() blocks the main thread until all jobs are finished, failed, or canceled.
To keep your main program responsive (e.g., in a Jupyter notebook or GUI),
run the job manager loop in a background thread so you can still monitor
or interact with the dataframe.
manager.start_job_thread(start_job=start_job, job_db=job_db)
# ... do other work in the main thread ...
# For example, you can monitor job_db, update a UI, or submit new jobs.
# When done, stop the background thread
manager.stop_job_thread()
While the background thread is running, you can inspect the job database (e.g., with pandas or geopandas) to monitor progress, or perform other tasks in your main program. This is especially useful in interactive environments where you want to avoid blocking the UI or kernel.
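The start/stop mechanics follow the familiar stoppable worker-thread pattern. A generic pure-Python sketch of the idea (illustrative only, not the manager's actual implementation):

```python
import threading
import time

class PollingLoop:
    """Toy sketch of a stoppable background polling loop."""

    def __init__(self, poll_sleep: float = 0.01):
        self._stop_event = threading.Event()
        self._thread = None
        self._poll_sleep = poll_sleep
        self.polls = 0

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._stop_event.is_set():
            self.polls += 1  # a real manager would poll job statuses here
            self._stop_event.wait(self._poll_sleep)

    def stop(self):
        self._stop_event.set()
        self._thread.join()

loop = PollingLoop()
loop.start()
time.sleep(0.05)  # the main thread stays free for other work
loop.stop()
```

Using an Event with a timed wait (rather than a plain sleep) lets stop() interrupt the loop promptly instead of waiting out a full poll interval.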
Caveats:
- The background thread will keep running until all jobs are finished, failed, or canceled, or until you call stop_job_thread().
- Logging output from the background thread will still appear in the console.
Added in version 0.32.0.
Job Status Tracking¶
The job database includes a status column that reflects the lifecycle of each job.
This makes it easy to monitor progress and spot failures directly in the DataFrame.
Typical lifecycle:

not_started → queued_for_start → created → queued → running → terminal state.

Terminal states are:
- finished: job completed successfully.
- error: job failed after submission.
- canceled: job was canceled.
- start_failed: job could not be created/submitted by the manager.
- skipped: job was intentionally not submitted.

In short, most jobs follow the standard openEO states, while not_started, queued_for_start, start_failed, and skipped are manager-side bookkeeping states.
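Because status is an ordinary DataFrame column, you can inspect progress with plain pandas operations. For example (with a hypothetical snapshot of a job database):

```python
import pandas as pd

# Hypothetical snapshot of a job database after a run.
df = pd.DataFrame({
    "id": ["j-1", "j-2", "j-3", "j-4"],
    "status": ["finished", "error", "finished", "start_failed"],
})

# Progress overview and list of jobs that need attention.
counts = df["status"].value_counts().to_dict()
failed = df[df["status"].isin(["error", "start_failed", "canceled"])]
print(counts)                  # e.g. {'finished': 2, 'error': 1, 'start_failed': 1}
print(failed["id"].tolist())   # e.g. ['j-2', 'j-4']
```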
Job creation based on parameterized processes¶
The openEO API supports parameterized processes out of the box, which allows you to work with flexible, reusable openEO building blocks in the form of user-defined processes.
This can also be leveraged for job creation in the context of the
MultiBackendJobManager:
define a “template” job as a parameterized process
and let the job manager fill in the parameters from a given data frame of parameter values.
The ProcessBasedJobCreator helper class
allows you to do exactly that.
Given a reference to a parameterized process,
such as a user-defined process or remote process definition,
it can be used directly as start_job callable to
run_jobs()
which will fill in the process parameters from the dataframe.
Example use case:¶
The ProcessBasedJobCreator is especially useful
for running the same UDP many times, each with different parameters;
for example, different spatial extents, time ranges, or bands.
This is a common pattern for large-scale analysis such as computing NDVI across many tiles or time periods.
For a real-world, end-to-end example (including visualization and result management), see the Jupyter notebook: VisualisingMultipleOpeneoJobs.ipynb in the openEO community examples repository.
Three rules govern how parameters are handled:
- The UDP must declare all parameters it needs (e.g. bands, spatial_extent, temporal_extent). The namespace URL (or backend process ID) points to that UDP definition.
- Constant parameters, with identical values for every job, go in parameter_defaults. They will be used for any job whose DataFrame row does not override them.
- Varying parameters, which differ per job, must be columns in the job database DataFrame, with column names that exactly match the UDP parameter names. The value from each row is passed to the corresponding parameter for that job.
Below is a minimal example where bands and spatial_extent are constant (set via parameter_defaults)
while temporal_extent varies per job (set via a DataFrame column):
ProcessBasedJobCreator example¶

import pandas as pd
from openeo.extra.job_management import MultiBackendJobManager, create_job_db
from openeo.extra.job_management.process_based import ProcessBasedJobCreator

# Point to the remote UDP definition (e.g. hosted on an openEO backend or public URL).
# The UDP is expected to accept parameters: bands, spatial_extent, temporal_extent.
job_starter = ProcessBasedJobCreator(
    namespace="https://example.com/ndvi_process.json",
    parameter_defaults={
        # These values are constant across all jobs.
        "bands": ["B04", "B08"],
        "spatial_extent": {"west": 5.0, "south": 51.0, "east": 5.1, "north": 51.1},
    },
)

# Each row defines one job. The column name must match the UDP parameter name exactly.
# Here, temporal_extent varies per job; bands and spatial_extent use the defaults above.
df = pd.DataFrame({
    "temporal_extent": [
        ["2021-01-01", "2021-01-31"],
        ["2021-02-01", "2021-02-28"],
        ["2021-03-01", "2021-03-31"],
    ],
})

job_db = create_job_db("jobs.csv", df=df)

job_manager = MultiBackendJobManager(...)
job_manager.run_jobs(job_db=job_db, start_job=job_starter)
Tip
To vary any parameter per job (e.g. bands or spatial_extent),
simply add a column with the matching name to the DataFrame.
A column value always takes precedence over the corresponding parameter_defaults entry.
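The precedence rules can be summarized in a few lines. The resolve_parameter helper below is hypothetical (not part of openEO) and only illustrates the order: DataFrame column first, then parameter_defaults, then the default from the process schema:

```python
def resolve_parameter(name, row, parameter_defaults=None, schema_default=None):
    """Hypothetical illustration of the parameter precedence described above."""
    if name in row:
        return row[name]  # a DataFrame column always wins
    if parameter_defaults and name in parameter_defaults:
        return parameter_defaults[name]
    if schema_default is not None:
        return schema_default
    raise ValueError(f"No value for required parameter {name!r}")
```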
API Reference¶
Warning
This is a new experimental API, subject to change.
MultiBackendJobManager¶
- class openeo.extra.job_management.MultiBackendJobManager(poll_sleep=60, root_dir='.', *, download_results=True, cancel_running_job_after=None)[source]¶
Tracker for multiple jobs on multiple backends.
Usage example:
import logging
import pandas as pd
import openeo
from openeo.extra.job_management import MultiBackendJobManager

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

manager = MultiBackendJobManager()
manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))

jobs_df = pd.DataFrame(...)
output_file = "jobs.csv"

def start_job(
    row: pd.Series,
    connection: openeo.Connection,
    **kwargs
) -> openeo.BatchJob:
    year = row["year"]
    cube = connection.load_collection(
        ...,
        temporal_extent=[f"{year}-01-01", f"{year+1}-01-01"],
    )
    ...
    return cube.create_job(...)

manager.run_jobs(df=jobs_df, start_job=start_job, output_file=output_file)
See run_jobs() for more information on the start_job callable.
- Parameters:
poll_sleep (int) – How many seconds to sleep between polls.
root_dir (Union[str, Path, None]) – Root directory to save files for the jobs, e.g. metadata and error logs. This defaults to ".", the current directory.
Each job gets its own subfolder in this root directory. You can use the following methods to find the relevant paths, based on the job ID:
get_job_dir
get_error_log_path
get_job_metadata_path
download_results (bool) – Whether to download job results automatically once the job is completed.
cancel_running_job_after (Optional[int]) – Optional temporal limit (in seconds) after which running jobs should be canceled by the job manager.
Added in version 0.14.0.
Changed in version 0.32.0: Added cancel_running_job_after parameter.
Changed in version 0.47.0: Added download_results parameter.
- add_backend(name, connection, parallel_jobs=2)[source]¶
Register a backend with a name and a Connection getter.
Note
For optimal throughput and responsiveness, it is recommended to provide a Connection instance without its own (blocking) retry behavior, since the job manager will do retries in a non-blocking way, allowing it to take care of other tasks before retrying failed requests.
- Parameters:
name (str) – Name of the backend.
connection (Union[Connection, Callable[[], Connection]]) – Either a Connection to the backend, or a callable to create a backend connection.
parallel_jobs (int) – Maximum number of jobs to allow in parallel on a backend.
- get_job_dir(job_id)[source]¶
Path to the directory where job metadata, results and error logs are saved.
- Return type:
- on_job_cancel(job, row)[source]¶
Handle a job that was canceled. Can be overridden to provide custom behaviour.
Default implementation does not do anything.
- Parameters:
job (BatchJob) – The job that was canceled.
row – DataFrame row containing the job’s metadata.
- on_job_done(job, row)[source]¶
Handles jobs that have finished. Can be overridden to provide custom behaviour.
Default implementation downloads the results into a folder containing the title.
- Parameters:
job (BatchJob) – The job that has finished.
row – DataFrame row containing the job’s metadata.
- on_job_error(job, row)[source]¶
Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.
Default implementation writes the error logs to a JSON file.
- Parameters:
job (BatchJob) – The job that has finished.
row – DataFrame row containing the job’s metadata.
- run_jobs(df=None, start_job=<function _start_job_default>, job_db=None, **kwargs)[source]¶
Runs jobs, specified in a dataframe, and tracks parameters.
- Parameters:
df (Optional[DataFrame]) – DataFrame that specifies the jobs, and tracks the jobs’ statuses. If None, the job_db has to be specified and will be used.
start_job (Callable[[], BatchJob]) – A callback which will be invoked with, amongst others, the row of the dataframe for which a job should be created and/or started. This callable should return an openeo.rest.job.BatchJob object.
The following parameters will be passed to start_job:
row (pandas.Series): The row in the pandas dataframe that stores the job’s state and other tracked data.
connection_provider: A getter to get a connection by backend name. Typically, you would need either the parameter connection_provider or the parameter connection, but likely you will not need both.
connection (Connection): The Connection itself, that has already been created. Typically, you would need either the parameter connection_provider or the parameter connection, but likely you will not need both.
provider (str): The name of the backend that will run the job.
You do not have to define all the parameters described above, but if you leave any of them out, then remember to include the *args and **kwargs parameters. Otherwise you will get an exception because run_jobs() passes unknown parameters to start_job.
job_db (Union[str, Path, JobDatabaseInterface, None]) – Job database to load/store existing job status data and other metadata from/to. Can be specified as a path to a CSV or Parquet file, or as a custom database object following the JobDatabaseInterface interface.
Note
Support for Parquet files depends on the pyarrow package as optional dependency.
- Return type:
- Returns:
dictionary with stats collected during the job running loop. Note that the set of fields in this dictionary is experimental and subject to change
Changed in version 0.31.0: Added support for persisting the job metadata in Parquet format.
Changed in version 0.31.0: Replace output_file argument with job_db argument, which can be a path to a CSV or Parquet file, or a user-defined JobDatabaseInterface object. The deprecated output_file argument is still supported for now.
Changed in version 0.33.0: return a stats dictionary.
- start_job_thread(start_job, job_db)[source]¶
Start running the jobs in a separate thread, returns afterwards.
- Parameters:
start_job (Callable[[], BatchJob]) – A callback which will be invoked with, amongst others, the row of the dataframe for which a job should be created and/or started. This callable should return an openeo.rest.job.BatchJob object.
The following parameters will be passed to start_job:
row (pandas.Series): The row in the pandas dataframe that stores the job’s state and other tracked data.
connection_provider: A getter to get a connection by backend name. Typically, you would need either the parameter connection_provider or the parameter connection, but likely you will not need both.
connection (Connection): The Connection itself, that has already been created. Typically, you would need either the parameter connection_provider or the parameter connection, but likely you will not need both.
provider (str): The name of the backend that will run the job.
You do not have to define all the parameters described above, but if you leave any of them out, then remember to include the *args and **kwargs parameters. Otherwise you will get an exception because run_jobs() passes unknown parameters to start_job.
job_db (JobDatabaseInterface) – Job database to load/store existing job status data and other metadata from/to. Can be specified as a path to a CSV or Parquet file, or as a custom database object following the JobDatabaseInterface interface.
Note
Support for Parquet files depends on the pyarrow package as optional dependency.
Added in version 0.32.0.
Job Database¶
- class openeo.extra.job_management.JobDatabaseInterface[source]¶
Interface for a database of job metadata to use with the MultiBackendJobManager, allowing you to regularly persist the job metadata while polling the job statuses, and to resume/restart the job tracking after it was interrupted.
Added in version 0.31.0.
- abstractmethod exists()[source]¶
Does the job database already exist, to read job data from?
- Return type:
- abstractmethod get_by_indices(indices)[source]¶
Returns a dataframe with jobs based on their (dataframe) index
- abstractmethod get_by_status(statuses, max=None)[source]¶
Returns a dataframe with jobs, filtered by status.
- abstractmethod persist(df)[source]¶
Store (new or updated) job data to the database.
The provided dataframe may only cover a subset of all the jobs (“rows”) of the whole database, so it should be merged with the existing data (if any) instead of overwriting it completely.
- Parameters:
df (
DataFrame) – job data to store.
- class openeo.extra.job_management.FullDataFrameJobDatabase[source]¶
- initialize_from_df(df, *, on_exists='error')[source]¶
Initialize the job database from a given dataframe, which will first be normalized to be compatible with MultiBackendJobManager usage.
- Parameters:
df (DataFrame) – dataframe with some columns your start_job callable expects
on_exists (str) – what to do when the job database already exists (persisted on disk):
- “error”: (default) raise an exception
- “skip”: work with existing database, ignore given dataframe and skip any initialization
- Returns:
initialized job database.
Added in version 0.33.0.
- class openeo.extra.job_management.CsvJobDatabase(path)[source]¶
Persist/load job metadata with a CSV file.
- Implements:
- Parameters:
Note
Support for GeoPandas dataframes depends on the geopandas package as optional dependency.
Added in version 0.31.0.
- class openeo.extra.job_management.ParquetJobDatabase(path)[source]¶
Persist/load job metadata with a Parquet file.
- Implements:
- Parameters:
Note
Support for Parquet files depends on the pyarrow package as optional dependency.
Support for GeoPandas dataframes depends on the geopandas package as optional dependency.
Added in version 0.31.0.
- openeo.extra.job_management.create_job_db(path, df, *, on_exists='error')[source]¶
Factory to create a job database at the given path, initialized from a given dataframe, with the database type guessed from the filename extension.
- Parameters:
df (DataFrame) – DataFrame to store in the job database.
on_exists (str) – What to do when the job database already exists:
- “error”: (default) raise an exception
- “skip”: work with existing database, ignore given dataframe and skip any initialization
Added in version 0.33.0.
ProcessBasedJobCreator¶
- class openeo.extra.job_management.process_based.ProcessBasedJobCreator(*, process_id=None, namespace=None, parameter_defaults=None, parameter_column_map=None)[source]¶
Batch job creator (to be used together with MultiBackendJobManager) that takes a parameterized openEO process definition (e.g. a user-defined process (UDP) or a remote openEO process definition), and creates a batch job for each row of the dataframe managed by the MultiBackendJobManager by filling in the process parameters with corresponding row values.
See also
See Job creation based on parameterized processes for more information and examples.
Process parameters are linked to dataframe columns by name. While this intuitive name-based matching should cover most use cases, there are additional options for overrides or fallbacks:
When provided, parameter_column_map will be consulted for resolving a process parameter name (key in the dictionary) to a desired dataframe column name (corresponding value).
One common case is handled automatically as convenience functionality. When:
- parameter_column_map is not provided (or set to None),
- and there is a single parameter that accepts inline GeoJSON geometries,
- and the dataframe is a GeoPandas dataframe with a single geometry column,
then this parameter and this geometry column will be linked automatically.
If a parameter cannot be matched with a column by name as described above, a default value will be picked, first by looking in parameter_defaults (if provided), and then by looking up the default value from the parameter schema in the process definition.
Finally, if no (default) value can be determined and the parameter is not flagged as optional, an error will be raised.
- Parameters:
process_id (Optional[str]) – (optional) openEO process identifier. Can be omitted when working with a remote process definition that is fully defined with a URL in the namespace parameter.
namespace (Optional[str]) – (optional) openEO process namespace. Typically used to provide a URL to a remote process definition.
parameter_defaults (Optional[dict]) – (optional) default values for process parameters, to be used when not available in the dataframe managed by MultiBackendJobManager.
parameter_column_map (Optional[dict]) – Optional overrides for linking process parameters to dataframe columns: mapping of process parameter names as key to dataframe column names as value.
Added in version 0.33.0.
Warning
This is an experimental API subject to change, and we greatly welcome feedback and suggestions for improvement.
- __call__(*arg, **kwargs)[source]¶
Syntactic sugar for calling start_job().
- Return type:
- start_job(row, connection, **_)[source]¶
Implementation of the start_job callable interface of MultiBackendJobManager.run_jobs() to create a job based on a given dataframe row.
- Parameters:
row (Series) – The row in the pandas dataframe that stores the job’s state and other tracked data.
connection (Connection) – The connection to the backend.
- Return type: