# Source code for openeo.metadata

from __future__ import annotations

import logging
import warnings
from typing import Any, Callable, List, NamedTuple, Optional, Tuple, Union

import pystac

from openeo.internal.jupyter import render_component
from openeo.util import deep_get

_log = logging.getLogger(__name__)

class MetadataException(Exception):
    """Base class for errors raised while handling (cube/collection) metadata."""

    pass

class DimensionAlreadyExistsException(MetadataException):
    """Raised when adding a dimension whose name is already in use."""

    pass

class Dimension:
    """Base class for dimensions."""

    def __init__(self, type: str, name: str):
        """
        :param type: dimension type (e.g. "spatial", "temporal", "bands", "other")
        :param name: dimension name
        """
        self.type = type
        self.name = name

    def __repr__(self):
        # Render as `ClassName(attr=value, ...)`, listing all instance attributes.
        return "{c}({f})".format(
            c=self.__class__.__name__,
            f=", ".join("{k!s}={v!r}".format(k=k, v=v) for (k, v) in self.__dict__.items()),
        )

    def __eq__(self, other):
        # Equal when both the exact class and all instance attributes match.
        return self.__class__ == other.__class__ and self.__dict__ == other.__dict__

    def rename(self, name) -> Dimension:
        """Create new dimension with new name."""
        return Dimension(type=self.type, name=name)

    def rename_labels(self, target, source) -> Dimension:
        """
        Rename labels, if the type of dimension allows it.

        :param target: List of target labels
        :param source: Source labels, or empty list
        :return: A new dimension with modified labels, or the same if no change is applied.
        """
        # In general, we don't have/manage label info here, so do nothing.
        return Dimension(type=self.type, name=self.name)

class SpatialDimension(Dimension):
    """Dimension of type "spatial", carrying extent, CRS and step information."""

    # CRS used when none is given.
    DEFAULT_CRS = 4326

    def __init__(
        self,
        name: str,
        extent: Union[Tuple[float, float], List[float]],
        crs: Union[str, int, dict] = DEFAULT_CRS,
        step=None,
    ):
        """
        :param name: dimension name
        :param extent: extent of the dimension (pair of values)
        :param crs: coordinate reference system of the dimension
        :param step: The space between the values. Use null for irregularly spaced steps.
        """
        super().__init__(type="spatial", name=name)
        self.extent = extent
        self.crs = crs
        self.step = step

    def rename(self, name) -> Dimension:
        """Create new spatial dimension with new name, preserving extent, CRS and step."""
        return SpatialDimension(name=name, extent=self.extent, crs=self.crs, step=self.step)
class TemporalDimension(Dimension):
    """Dimension of type "temporal", carrying a temporal extent."""

    def __init__(self, name: str, extent: Union[Tuple[str, str], List[str]]):
        """
        :param name: dimension name
        :param extent: temporal extent (pair of values)
        """
        super().__init__(type="temporal", name=name)
        self.extent = extent

    def rename(self, name) -> Dimension:
        """Create new temporal dimension with new name, preserving the extent."""
        return TemporalDimension(name=name, extent=self.extent)

    def rename_labels(self, target, source) -> Dimension:
        """Return a copy of this dimension: `target` and `source` are currently not used."""
        # TODO should we check if the extent has changed with the new labels?
        return TemporalDimension(name=self.name, extent=self.extent)
class Band(NamedTuple):
    """
    Simple container class for band metadata.

    NOTE(review): the fields appear to mirror the band object of the STAC "eo" extension
    ("eo:bands"); the original reference link was lost, confirm against upstream.
    """

    name: str
    common_name: Optional[str] = None
    # wavelength in micrometer
    wavelength_um: Optional[float] = None
    aliases: Optional[List[str]] = None
    # "openeo:gsd" field
    gsd: Optional[dict] = None
class BandDimension(Dimension):
    """Dimension of type "bands", holding a list of :py:class:`Band` metadata items."""

    def __init__(self, name: str, bands: List[Band]):
        """
        :param name: dimension name
        :param bands: band metadata, in band order
        """
        super().__init__(type="bands", name=name)
        self.bands = bands

    @property
    def band_names(self) -> List[str]:
        """Names of the bands, in band order."""
        return [b.name for b in self.bands]

    @property
    def band_aliases(self) -> List[List[str]]:
        """Aliases per band, in band order (entry is `None` for bands without aliases)."""
        return [b.aliases for b in self.bands]

    @property
    def common_names(self) -> List[str]:
        """Common names of the bands, in band order (entry is `None` when not set)."""
        return [b.common_name for b in self.bands]

    def band_index(self, band: Union[int, str]) -> int:
        """
        Resolve a given band (common) name/index to band index

        :param band: band name, common name or index
        :return int: band index
        :raises ValueError: when the band can not be resolved
        """
        band_names = self.band_names
        if isinstance(band, int) and 0 <= band < len(band_names):
            return band
        elif isinstance(band, str):
            common_names = self.common_names
            # First try common names if possible
            if band in common_names:
                return common_names.index(band)
            if band in band_names:
                return band_names.index(band)
            # Check band aliases to still support old band names
            for index, aliases in enumerate(self.band_aliases):
                if aliases and band in aliases:
                    return index
        raise ValueError("Invalid band name/index {b!r}. Valid names: {n!r}".format(b=band, n=band_names))

    def band_name(self, band: Union[str, int], allow_common=True) -> str:
        """
        Resolve (common) name or index to a valid (common) name

        :param band: band name, common name, alias or index
        :param allow_common: whether a given common name may be returned as-is
            (otherwise it is translated to the corresponding band name)
        :raises ValueError: when the band can not be resolved
        """
        if isinstance(band, str):
            if band in self.band_names:
                return band
            elif band in self.common_names:
                if allow_common:
                    return band
                else:
                    return self.band_names[self.common_names.index(band)]
            elif any(aliases and band in aliases for aliases in self.band_aliases):
                return self.band_names[self.band_index(band)]
        elif isinstance(band, int) and 0 <= band < len(self.bands):
            return self.band_names[band]
        raise ValueError("Invalid band name/index {b!r}. Valid names: {n!r}".format(b=band, n=self.band_names))

    def filter_bands(self, bands: List[Union[int, str]]) -> BandDimension:
        """
        Construct new BandDimension with subset of bands,
        based on given band indices or (common) names
        """
        return BandDimension(
            name=self.name,
            bands=[self.bands[self.band_index(b)] for b in bands],
        )

    def append_band(self, band: Band) -> BandDimension:
        """
        Create new BandDimension with appended band.

        :raises ValueError: when a band with the same name already exists
        """
        if band.name in self.band_names:
            raise ValueError("Duplicate band {b!r}".format(b=band))

        return BandDimension(
            name=self.name,
            bands=self.bands + [band],
        )

    def rename_labels(self, target, source) -> Dimension:
        """
        Create new BandDimension with renamed band labels.

        :param target: new band names
        :param source: current band names (or indices) to rename, pairwise with `target`.
            When empty/None, all bands are replaced with new bands named after `target`.
        :raises ValueError: when `source` is given but differs in length from `target`
        """
        if source:
            if len(target) != len(source):
                raise ValueError(
                    "In rename_labels, `target` and `source` should have same number of labels, "
                    "but got: `target` {t} and `source` {s}".format(t=target, s=source)
                )
            new_bands = self.bands.copy()
            for old_name, new_name in zip(source, target):
                band_index = self.band_index(old_name)
                # Only the name changes, all other band metadata is preserved.
                new_bands[band_index] = new_bands[band_index]._replace(name=new_name)
        else:
            new_bands = [Band(name=n) for n in target]
        return BandDimension(name=self.name, bands=new_bands)

    def rename(self, name) -> Dimension:
        """Create new band dimension with new name, preserving the bands."""
        return BandDimension(name=name, bands=self.bands)
class CubeMetadata:
    """
    Interface for metadata of a data cube.

    Allows interaction with the cube dimensions and their labels (if available).
    """

    def __init__(self, dimensions: Optional[List[Dimension]] = None):
        """
        :param dimensions: dimensions of the cube (optional)
        :raises MetadataException: when a dimension of type "bands" or "temporal"
            is not a `BandDimension` or `TemporalDimension` instance respectively
        """
        self._dimensions = dimensions
        self._band_dimension = None
        self._temporal_dimension = None

        if dimensions is not None:
            for dim in self._dimensions:
                # TODO: here we blindly pick last bands or temporal dimension if multiple. Let user choose?
                # TODO: add spacial dimension handling?
                if dim.type == "bands":
                    if isinstance(dim, BandDimension):
                        self._band_dimension = dim
                    else:
                        raise MetadataException("Invalid band dimension {d!r}".format(d=dim))
                if dim.type == "temporal":
                    if isinstance(dim, TemporalDimension):
                        self._temporal_dimension = dim
                    else:
                        raise MetadataException("Invalid temporal dimension {d!r}".format(d=dim))

    def __eq__(self, o: Any) -> bool:
        return isinstance(o, type(self)) and self._dimensions == o._dimensions

    def _clone_and_update(self, dimensions: Optional[List[Dimension]] = None, **kwargs) -> CubeMetadata:
        """Create a new instance (of same class) with copied/updated fields."""
        cls = type(self)
        if dimensions is None:
            dimensions = self._dimensions
        return cls(dimensions=dimensions, **kwargs)

    def dimension_names(self) -> List[str]:
        """List the dimension names (empty list when no dimensions were given at construction)."""
        return [d.name for d in self._dimensions or []]

    def assert_valid_dimension(self, dimension: str) -> str:
        """Make sure given dimension name is valid."""
        names = self.dimension_names()
        if dimension not in names:
            raise ValueError(f"Invalid dimension {dimension!r}. Should be one of {names}")
        return dimension

    def has_band_dimension(self) -> bool:
        return isinstance(self._band_dimension, BandDimension)

    @property
    def band_dimension(self) -> BandDimension:
        """Dimension corresponding to spectral/logic/thematic "bands"."""
        if not self.has_band_dimension():
            raise MetadataException("No band dimension")
        return self._band_dimension

    def has_temporal_dimension(self) -> bool:
        return isinstance(self._temporal_dimension, TemporalDimension)

    @property
    def temporal_dimension(self) -> TemporalDimension:
        """Temporal dimension of the cube (raises `MetadataException` when there is none)."""
        if not self.has_temporal_dimension():
            raise MetadataException("No temporal dimension")
        return self._temporal_dimension

    @property
    def spatial_dimensions(self) -> List[SpatialDimension]:
        """All dimensions that are `SpatialDimension` instances."""
        return [d for d in self._dimensions if isinstance(d, SpatialDimension)]

    @property
    def bands(self) -> List[Band]:
        """Get band metadata as list of Band metadata tuples"""
        return self.band_dimension.bands

    @property
    def band_names(self) -> List[str]:
        """Get band names of band dimension"""
        return self.band_dimension.band_names

    @property
    def band_common_names(self) -> List[str]:
        """Get common names of the bands of the band dimension"""
        return self.band_dimension.common_names

    def get_band_index(self, band: Union[int, str]) -> int:
        # TODO: eliminate this shortcut for smaller API surface
        return self.band_dimension.band_index(band)

    def filter_bands(self, band_names: List[Union[int, str]]) -> CubeMetadata:
        """
        Create new `CubeMetadata` with filtered band dimension

        :param band_names: list of band names/indices to keep
        :return: new metadata object with filtered band dimension(s)
        :raises MetadataException: when there is no band dimension
        """
        # Explicit guard instead of `assert`, so it is also enforced under `python -O`.
        if not self.has_band_dimension():
            raise MetadataException("No band dimension")
        return self._clone_and_update(
            dimensions=[d.filter_bands(band_names) if isinstance(d, BandDimension) else d for d in self._dimensions]
        )

    def append_band(self, band: Band) -> CubeMetadata:
        """
        Create new `CubeMetadata` with given band added to band dimension.

        :raises MetadataException: when there is no band dimension
        """
        # Explicit guard instead of `assert`, so it is also enforced under `python -O`.
        if not self.has_band_dimension():
            raise MetadataException("No band dimension")
        return self._clone_and_update(
            dimensions=[d.append_band(band) if isinstance(d, BandDimension) else d for d in self._dimensions]
        )

    def rename_labels(self, dimension: str, target: list, source: Optional[list] = None) -> CubeMetadata:
        """
        Renames the labels of the specified dimension from source to target.

        :param dimension: Dimension name
        :param target: The new names for the labels.
        :param source: The names of the labels as they are currently in the data cube.

        :return: Updated metadata
        """
        self.assert_valid_dimension(dimension)
        loc = self.dimension_names().index(dimension)
        new_dimensions = self._dimensions.copy()
        new_dimensions[loc] = new_dimensions[loc].rename_labels(target, source)

        return self._clone_and_update(dimensions=new_dimensions)

    def rename_dimension(self, source: str, target: str) -> CubeMetadata:
        """
        Rename source dimension into target, preserving other properties
        """
        self.assert_valid_dimension(source)
        loc = self.dimension_names().index(source)
        new_dimensions = self._dimensions.copy()
        new_dimensions[loc] = new_dimensions[loc].rename(target)

        return self._clone_and_update(dimensions=new_dimensions)

    def reduce_dimension(self, dimension_name: str) -> CubeMetadata:
        """Create new CubeMetadata object by collapsing/reducing a dimension."""
        # TODO: option to keep reduced dimension (with a single value)?
        # TODO: rename argument to `name` for more internal consistency
        # TODO: merge with drop_dimension (which does the same).
        self.assert_valid_dimension(dimension_name)
        loc = self.dimension_names().index(dimension_name)
        dimensions = self._dimensions[:loc] + self._dimensions[loc + 1 :]
        return self._clone_and_update(dimensions=dimensions)

    def reduce_spatial(self) -> CubeMetadata:
        """Create new CubeMetadata object by reducing the spatial dimensions."""
        dimensions = [d for d in self._dimensions if not isinstance(d, SpatialDimension)]
        return self._clone_and_update(dimensions=dimensions)

    def add_dimension(self, name: str, label: Union[str, float], type: str = None) -> CubeMetadata:
        """
        Create new CubeMetadata object with added dimension

        :param name: name of the new dimension
        :param label: single label of the new dimension
            (used as band name, or as both ends of the extent)
        :param type: dimension type ("bands", "spatial", "temporal"; anything else gives a generic dimension)
        :raises DimensionAlreadyExistsException: when a dimension with given name already exists
        """
        if any(d.name == name for d in self._dimensions):
            raise DimensionAlreadyExistsException(f"Dimension with name {name!r} already exists")
        if type == "bands":
            dim = BandDimension(name=name, bands=[Band(name=label)])
        elif type == "spatial":
            dim = SpatialDimension(name=name, extent=[label, label])
        elif type == "temporal":
            dim = TemporalDimension(name=name, extent=[label, label])
        else:
            dim = Dimension(type=type or "other", name=name)
        return self._clone_and_update(dimensions=self._dimensions + [dim])

    def drop_dimension(self, name: str = None) -> CubeMetadata:
        """Create new CubeMetadata object without dropped dimension with given name"""
        dimension_names = self.dimension_names()
        if name not in dimension_names:
            raise ValueError("No dimension named {n!r} (valid names: {ns!r})".format(n=name, ns=dimension_names))
        return self._clone_and_update(dimensions=[d for d in self._dimensions if not d.name == name])

    def __str__(self) -> str:
        bands = self.band_names if self.has_band_dimension() else "no bands dimension"
        return f"CubeMetadata({bands} - {self.dimension_names()})"
class CollectionMetadata(CubeMetadata):
    """
    Wrapper for EO Data Collection metadata.

    Simplifies getting values from deeply nested mappings,
    allows additional parsing and normalizing compatibility issues.

    Metadata is expected to follow the (STAC-like) openEO collection metadata format
    (with partial support for older versions).
    """

    def __init__(self, metadata: dict, dimensions: List[Dimension] = None):
        """
        :param metadata: original collection metadata dict
        :param dimensions: dimensions to use; when not given, they are parsed from `metadata`
        """
        self._orig_metadata = metadata
        if dimensions is None:
            dimensions = self._parse_dimensions(self._orig_metadata)

        super().__init__(dimensions=dimensions)

    @classmethod
    def _parse_dimensions(cls, spec: dict, complain: Callable[[str], None] = warnings.warn) -> List[Dimension]:
        """
        Extract data cube dimension metadata from STAC-like description of a collection.

        Dimension metadata comes from different places in spec:
        - 'cube:dimensions' has dimension names (e.g. 'x', 'y', 't'), dimension extent info
            and band names for band dimensions
        - 'eo:bands' has more detailed band information like "common" name and wavelength info

        This helper tries to normalize/combine these sources.

        :param spec: STAC like collection metadata dict
        :param complain: handler for warnings
        :return list: list of `Dimension` objects
        """

        # Dimension info is in `cube:dimensions` (or 0.4-style `properties/cube:dimensions`)
        cube_dimensions = (
            deep_get(spec, "cube:dimensions", default=None)
            or deep_get(spec, "properties", "cube:dimensions", default=None)
            or {}
        )
        if not cube_dimensions:
            complain("No cube:dimensions metadata")
        dimensions = []
        for name, info in cube_dimensions.items():
            dim_type = info.get("type")
            if dim_type == "spatial":
                dimensions.append(
                    SpatialDimension(
                        name=name,
                        extent=info.get("extent"),
                        crs=info.get("reference_system", SpatialDimension.DEFAULT_CRS),
                        step=info.get("step", None),
                    )
                )
            elif dim_type == "temporal":
                dimensions.append(TemporalDimension(name=name, extent=info.get("extent")))
            elif dim_type == "bands":
                bands = [Band(name=b) for b in info.get("values", [])]
                if not bands:
                    complain("No band names in dimension {d!r}".format(d=name))
                dimensions.append(BandDimension(name=name, bands=bands))
            else:
                complain("Unknown dimension type {t!r}".format(t=dim_type))
                dimensions.append(Dimension(name=name, type=dim_type))

        # Detailed band information: `summaries/[eo|raster]:bands` (and 0.4 style `properties/eo:bands`)
        eo_bands = (
            deep_get(spec, "summaries", "eo:bands", default=None)
            or deep_get(spec, "summaries", "raster:bands", default=None)
            or deep_get(spec, "properties", "eo:bands", default=None)
        )
        if eo_bands:
            # center_wavelength is in micrometer according to spec
            bands_detailed = [
                Band(
                    name=b["name"],
                    common_name=b.get("common_name"),
                    wavelength_um=b.get("center_wavelength"),
                    aliases=b.get("aliases"),
                    gsd=b.get("openeo:gsd"),
                )
                for b in eo_bands
            ]
            # Update band dimension with more detailed info
            band_dimensions = [d for d in dimensions if d.type == "bands"]
            if len(band_dimensions) == 1:
                dim = band_dimensions[0]
                # Update band values from 'cube:dimensions' with more detailed 'eo:bands' info
                eo_band_names = [b.name for b in bands_detailed]
                cube_dimension_band_names = [b.name for b in dim.bands]
                if eo_band_names == cube_dimension_band_names:
                    dim.bands = bands_detailed
                else:
                    complain("Band name mismatch: {a} != {b}".format(a=cube_dimension_band_names, b=eo_band_names))
            elif len(band_dimensions) == 0:
                if len(dimensions) == 0:
                    complain("Assuming name 'bands' for anonymous band dimension.")
                    dimensions.append(BandDimension(name="bands", bands=bands_detailed))
                else:
                    complain("No 'bands' dimension in 'cube:dimensions' while having 'eo:bands' or 'raster:bands'")
            else:
                complain("Multiple dimensions of type 'bands'")

        return dimensions

    def _clone_and_update(
        self, metadata: dict = None, dimensions: List[Dimension] = None, **kwargs
    ) -> CollectionMetadata:
        """
        Create a new instance (of same class) with copied/updated fields.

        This overrides the method in `CubeMetadata` to keep the original metadata.
        """
        cls = type(self)
        if metadata is None:
            metadata = self._orig_metadata
        if dimensions is None:
            dimensions = self._dimensions
        return cls(metadata=metadata, dimensions=dimensions, **kwargs)

    def get(self, *args, default=None):
        """Look up a (nested) value in the original metadata, with `args` as the key path."""
        return deep_get(self._orig_metadata, *args, default=default)

    @property
    def extent(self) -> dict:
        """The "extent" field of the original metadata (`None` when missing)."""
        # TODO: is this currently used and relevant?
        # TODO: check against extent metadata in dimensions
        return self._orig_metadata.get("extent")

    def _repr_html_(self):
        return render_component("collection", data=self._orig_metadata)

    def __str__(self) -> str:
        bands = self.band_names if self.has_band_dimension() else "no bands dimension"
        return f"CollectionMetadata({self.extent} - {bands} - {self.dimension_names()})"
def metadata_from_stac(url: str) -> CubeMetadata:
    """
    Reads the band metadata from a static STAC catalog or a STAC API Collection
    and returns it as a :py:class:`CubeMetadata`

    :param url: The URL to a static STAC catalog (STAC Item, STAC Collection, or STAC Catalog) or a specific STAC API Collection
    :return: A :py:class:`CubeMetadata` containing the DataCube band metadata from the url.
    :raises ValueError: when the object read from `url` is not a STAC Item, Collection or Catalog
    """

    def get_band_metadata(eo_bands_location: dict) -> List[Band]:
        # TODO: return None iso empty list when no metadata?
        return [
            Band(name=band["name"], common_name=band.get("common_name"), wavelength_um=band.get("center_wavelength"))
            for band in eo_bands_location.get("eo:bands", [])
        ]

    def get_band_names(bands: List[Band]) -> List[str]:
        return [band.name for band in bands]

    def is_band_asset(asset: pystac.Asset) -> bool:
        return "eo:bands" in asset.extra_fields

    stac_object = pystac.read_file(href=url)

    if isinstance(stac_object, pystac.Item):
        item = stac_object
        if "eo:bands" in item.properties:
            eo_bands_location = item.properties
        else:
            # Fall back on the summaries of the parent collection (resolved only once).
            parent_collection = item.get_collection()
            if parent_collection is not None:
                # TODO: Also do asset based band detection (like below)?
                eo_bands_location = parent_collection.summaries.lists
            else:
                eo_bands_location = {}
        bands = get_band_metadata(eo_bands_location)

    elif isinstance(stac_object, pystac.Collection):
        collection = stac_object
        bands = get_band_metadata(collection.summaries.lists)

        # Summaries is not a required field in a STAC collection, so also check the assets
        for itm in collection.get_items():
            band_assets = {asset_id: asset for asset_id, asset in itm.get_assets().items() if is_band_asset(asset)}

            for asset in band_assets.values():
                asset_bands = get_band_metadata(asset.extra_fields)
                for asset_band in asset_bands:
                    # Only add bands that were not listed yet (by name).
                    if asset_band.name not in get_band_names(bands):
                        bands.append(asset_band)

    elif isinstance(stac_object, pystac.Catalog):
        catalog = stac_object
        bands = get_band_metadata(catalog.extra_fields.get("summaries", {}))
    else:
        raise ValueError(stac_object)

    # TODO: conditionally include band dimension when there was actual indication of band metadata?
    band_dimension = BandDimension(name="bands", bands=bands)
    metadata = CubeMetadata(dimensions=[band_dimension])
    return metadata