Source code for openeo.extra.job_management._job_db

import abc
import logging
from pathlib import Path
from typing import Iterable, Optional, Union

import pandas as pd
import shapely.errors
import shapely.wkt

import openeo.extra.job_management._manager
from openeo.extra.job_management._interface import JobDatabaseInterface

_log = logging.getLogger(__name__)


class FullDataFrameJobDatabase(JobDatabaseInterface):
    def __init__(self):
        super().__init__()
        self._df = None

    def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
        """
        Initialize the job database from a given dataframe,
        which will first be normalized to be compatible
        with :py:class:`~openeo.extra.job_management._manager.MultiBackendJobManager` usage.

        :param df: dataframe with some columns your ``start_job`` callable expects
        :param on_exists: what to do when the job database already exists (persisted on disk):

            - "error": (default) raise an exception
            - "skip": work with the existing database, ignore the given dataframe and skip any initialization

        :return: initialized job database.

        .. versionadded:: 0.33.0
        """
        # TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
        if self.exists():
            if on_exists == "skip":
                return self
            elif on_exists == "error":
                raise FileExistsError(f"Job database {self!r} already exists.")
            else:
                # TODO handle other on_exists modes: e.g. overwrite, merge, ...
                raise ValueError(f"Invalid on_exists={on_exists!r}")
        df = openeo.extra.job_management._manager.MultiBackendJobManager._column_requirements.normalize_df(df)
        self.persist(df)
        # Return self to allow chaining with constructor.
        return self
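
    # Illustrative sketch (not part of the module): typical chained construction,
    # with `df` a hypothetical dataframe holding the columns your `start_job`
    # callable expects:
    #
    #   db = CsvJobDatabase("jobs.csv").initialize_from_df(df, on_exists="skip")
    #
    # With on_exists="skip" an already persisted database is reused as-is;
    # the default on_exists="error" raises FileExistsError instead.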

    @abc.abstractmethod
    def read(self) -> pd.DataFrame:
        """
        Read job data from the database as pandas DataFrame.

        :return: loaded job data.
        """
        ...

    @property
    def df(self) -> pd.DataFrame:
        if self._df is None:
            self._df = self.read()
        return self._df

    def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
        status_histogram = self.df.groupby("status").size().to_dict()
        statuses = set(statuses)
        if statuses:
            status_histogram = {k: v for k, v in status_histogram.items() if k in statuses}
        return status_histogram
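
    # Example (illustrative sketch): with two "created" jobs and one "running"
    # job in the database:
    #
    #   db.count_by_status()             # -> {"created": 2, "running": 1}
    #   db.count_by_status(["running"])  # -> {"running": 1}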

    def get_by_status(self, statuses: Iterable[str], max: Optional[int] = None) -> pd.DataFrame:
        """
        Returns a dataframe with jobs, filtered by status.

        :param statuses: Statuses to include.
        :param max: Maximum number of jobs to return.

        :return: DataFrame with jobs filtered by status.
        """
        df = self.df
        filtered = df[df.status.isin(statuses)]
        return filtered.head(max) if max is not None else filtered
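
    # Example (illustrative sketch): fetch at most 10 jobs that are queued or running:
    #
    #   subset = db.get_by_status(statuses=["queued", "running"], max=10)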

    def _merge_into_df(self, df: pd.DataFrame):
        if self._df is not None:
            unknown_indices = set(df.index).difference(self._df.index)
            if unknown_indices:
                _log.warning(f"Merging DataFrame with {unknown_indices=} which will be lost.")
            self._df.update(df, overwrite=True)
        else:
            self._df = df

    def get_by_indices(self, indices: Iterable[Union[int, str]]) -> pd.DataFrame:
        indices = set(indices)
        known = indices.intersection(self.df.index)
        unknown = indices.difference(self.df.index)
        if unknown:
            _log.warning(f"Ignoring unknown DataFrame indices {unknown}")
        return self.df.loc[list(known)]
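
    # Example (illustrative sketch): look up rows by dataframe index;
    # unknown indices are logged and ignored rather than raising a KeyError:
    #
    #   rows = db.get_by_indices([0, 2, 999])  # warns about 999 if it is unknown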


class CsvJobDatabase(FullDataFrameJobDatabase):
    """
    Persist/load job metadata with a CSV file.

    :implements: :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface`
    :param path: Path to local CSV file.

    .. note::
        Support for GeoPandas dataframes depends on the ``geopandas`` package
        as :ref:`optional dependency <installation-optional-dependencies>`.

    .. versionadded:: 0.31.0
    """

    def __init__(self, path: Union[str, Path]):
        super().__init__()
        self.path = Path(path)

    def __repr__(self):
        return f"{self.__class__.__name__}({str(self.path)!r})"

    def exists(self) -> bool:
        return self.path.exists()

    def _is_valid_wkt(self, wkt: str) -> bool:
        try:
            shapely.wkt.loads(wkt)
            return True
        except shapely.errors.WKTReadingError:
            return False

    def read(self) -> pd.DataFrame:
        df = pd.read_csv(
            self.path,
            # TODO: possible to avoid hidden coupling with MultiBackendJobManager here?
            dtype=openeo.extra.job_management._manager.MultiBackendJobManager._column_requirements.dtype_mapping(),
        )
        if (
            "geometry" in df.columns
            and df["geometry"].dtype.name != "geometry"
            and self._is_valid_wkt(df["geometry"].iloc[0])
        ):
            import geopandas

            # `df.to_csv()` in `persist()` has encoded geometries as WKT, so we decode that here.
            df.geometry = geopandas.GeoSeries.from_wkt(df["geometry"])
            df = geopandas.GeoDataFrame(df)
        return df

    def persist(self, df: pd.DataFrame):
        self._merge_into_df(df)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.df.to_csv(self.path, index=False)
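

# Usage sketch (illustrative, not part of the module), with `df` a hypothetical
# job metadata dataframe:
#
#   db = CsvJobDatabase("jobs.csv")
#   db.initialize_from_df(df)  # normalizes `df` and persists it as CSV
#   df2 = db.read()            # geometries round-trip through their WKT encoding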


class ParquetJobDatabase(FullDataFrameJobDatabase):
    """
    Persist/load job metadata with a Parquet file.

    :implements: :py:class:`~openeo.extra.job_management._interface.JobDatabaseInterface`
    :param path: Path to the Parquet file.

    .. note::
        Support for Parquet files depends on the ``pyarrow`` package
        as :ref:`optional dependency <installation-optional-dependencies>`.

        Support for GeoPandas dataframes depends on the ``geopandas`` package
        as :ref:`optional dependency <installation-optional-dependencies>`.

    .. versionadded:: 0.31.0
    """

    def __init__(self, path: Union[str, Path]):
        super().__init__()
        self.path = Path(path)

    def __repr__(self):
        return f"{self.__class__.__name__}({str(self.path)!r})"

    def exists(self) -> bool:
        return self.path.exists()

    def read(self) -> pd.DataFrame:
        # Unfortunately, a naive `pandas.read_parquet()` does not easily allow
        # reconstructing geometries from a GeoPandas Parquet file.
        # And vice-versa, `geopandas.read_parquet()` does not support reading
        # a Parquet file without geometries.
        # So we have to guess which case we have.
        # TODO is there a cleaner way to do this?
        import pyarrow.parquet

        metadata = pyarrow.parquet.read_metadata(self.path)
        if b"geo" in metadata.metadata:
            import geopandas

            return geopandas.read_parquet(self.path)
        else:
            return pd.read_parquet(self.path)

    def persist(self, df: pd.DataFrame):
        self._merge_into_df(df)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.df.to_parquet(self.path, index=False)
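

# Usage sketch (illustrative, not part of the module): same interface as
# CsvJobDatabase; GeoPandas support kicks in automatically when the file
# carries GeoParquet metadata (the b"geo" key checked in `read()` above):
#
#   db = ParquetJobDatabase("jobs.parquet")
#   db.initialize_from_df(df)  # `df` is a hypothetical job metadata dataframe
#   df2 = db.read()            # plain DataFrame, or GeoDataFrame for GeoParquet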


def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
    """
    Factory to get a job database at a given path,
    guessing the database type from the filename extension.

    :param path: path to job database file.

    .. versionadded:: 0.33.0
    """
    path = Path(path)
    if path.suffix.lower() in {".csv"}:
        job_db = CsvJobDatabase(path=path)
    elif path.suffix.lower() in {".parquet", ".geoparquet"}:
        job_db = ParquetJobDatabase(path=path)
    else:
        raise ValueError(f"Could not guess job database type from {path!r}")
    return job_db


def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error"):
    """
    Factory to create a job database at a given path,
    initialized from a given dataframe,
    with the database type guessed from the filename extension.

    :param path: Path to the job database file.
    :param df: DataFrame to store in the job database.
    :param on_exists: What to do when the job database already exists:

        - "error": (default) raise an exception
        - "skip": work with the existing database, ignore the given dataframe and skip any initialization

    .. versionadded:: 0.33.0
    """
    job_db = get_job_db(path)
    if isinstance(job_db, FullDataFrameJobDatabase):
        job_db.initialize_from_df(df=df, on_exists=on_exists)
    else:
        raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
    return job_db
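

# Usage sketch (illustrative, not part of the module): the factories guess the
# database type from the filename extension, so these are shorthand for the
# constructor calls above (with `df` again a hypothetical job dataframe):
#
#   db = get_job_db("jobs.parquet")        # -> ParquetJobDatabase
#   db = create_job_db("jobs.csv", df=df)  # -> initialized CsvJobDatabase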