Source code for openeo.extra.spectral_indices.spectral_indices

import functools
import json
import re
from typing import Dict, List, Optional, Set

from openeo import BaseOpenEoException
from openeo.processes import ProcessBuilder, array_create, array_modify
from openeo.rest.datacube import DataCube

try:
    import importlib_resources
except ImportError:
    import importlib.resources as importlib_resources


@functools.lru_cache(maxsize=1)
def load_indices() -> Dict[str, dict]:
    """Load set of supported spectral indices."""
    # TODO: encapsulate all this json loading in a single Awesome Spectral Indices registry class?
    specs = {}

    for path in [
        "resources/awesome-spectral-indices/spectral-indices-dict.json",
        # TODO #506 Deprecate extra-indices-dict.json as a whole
        #      and provide an alternative mechanism to work with custom indices
        "resources/extra-indices-dict.json",
    ]:
        with importlib_resources.files("openeo.extra.spectral_indices") / path as resource_path:
            data = json.loads(resource_path.read_text(encoding="utf8"))
            overwrites = set(specs.keys()).intersection(data["SpectralIndices"].keys())
            if overwrites:
                raise RuntimeError(f"Duplicate spectral indices: {overwrites} from {path}")
            specs.update(data["SpectralIndices"])

    return specs


@functools.lru_cache(maxsize=1)
def load_constants() -> Dict[str, float]:
    """Load constants defined by Awesome Spectral Indices."""
    # TODO: encapsulate all this json loading in a single Awesome Spectral Indices registry class?
    with importlib_resources.files(
        "openeo.extra.spectral_indices"
    ) / "resources/awesome-spectral-indices/constants.json" as resource_path:
        data = json.loads(resource_path.read_text(encoding="utf8"))

    return {k: v["default"] for k, v in data.items() if isinstance(v["default"], (int, float))}


@functools.lru_cache(maxsize=1)
def _load_bands() -> Dict[str, dict]:
    """Load band name mapping defined by Awesome Spectral Indices."""
    # TODO: encapsulate all this json loading in a single Awesome Spectral Indices registry class?
    with importlib_resources.files(
        "openeo.extra.spectral_indices"
    ) / "resources/awesome-spectral-indices/bands.json" as resource_path:
        data = json.loads(resource_path.read_text(encoding="utf8"))
    return data


class BandMappingException(BaseOpenEoException):
    """Failure to determine band-variable mapping."""


class _BandMapping:
    """
    Helper class to extract mappings between band names and variable names used in Awesome Spectral Indices formulas.
    """

    _EXTRA = {
        "sentinel1": {"HH": "HH", "HV": "HV", "VH": "VH", "VV": "VV"},
    }

    def __init__(self):
        # Load bands.json from Awesome Spectral Indices
        self._band_data = _load_bands()

    @staticmethod
    def _normalize_platform(platform: str) -> str:
        platform = platform.lower().replace("-", "").replace(" ", "")
        if platform in {"sentinel2a", "sentinel2b"}:
            platform = "sentinel2"
        return platform

    @staticmethod
    def _normalize_band_name(band_name: str) -> str:
        band_name = band_name.upper()
        # Normalize band names like "B01" to "B1"
        band_name = re.sub(r"^B0+(\d+)$", r"B\1", band_name)
        return band_name

    @functools.lru_cache(maxsize=1)
    def get_platforms(self) -> Set[str]:
        """Get list of supported (normalized) satellite platforms."""
        platforms = {p for var_data in self._band_data.values() for p in var_data.get("platforms", {}).keys()}
        platforms.update(self._EXTRA.keys())
        platforms.update({self._normalize_platform(p) for p in platforms})
        return platforms

    def guess_platform(self, name: str) -> str:
        """Guess platform from given collection id or name."""
        # First check original id, then retry with removed separators as last resort.
        for haystack in [name.lower(), re.sub("[_ -]", "", name.lower())]:
            for platform in sorted(self.get_platforms(), key=len, reverse=True):
                if platform in haystack:
                    return platform
        raise BandMappingException(f"Unable to guess satellite platform from id {name!r}.")

    def variable_to_band_name_map(self, platform: str) -> Dict[str, str]:
        """
        Build mapping from Awesome Spectral Indices variable names to (normalized) band names for given satellite platform.
        """
        platform_normalized = self._normalize_platform(platform)
        if platform_normalized in self._EXTRA:
            return self._EXTRA[platform_normalized]

        var_to_band = {
            var: pf_data["band"]
            for var, var_data in self._band_data.items()
            for pf, pf_data in var_data.get("platforms", {}).items()
            if self._normalize_platform(pf) == platform_normalized
        }
        if not var_to_band:
            raise BandMappingException(f"Empty band mapping derived for satellite platform {platform!r}")
        return var_to_band

    def actual_band_name_to_variable_map(self, platform: str, band_names: List[str]) -> Dict[str, str]:
        """Build mapping from actual band names (as given) to Awesome Spectral Indices variable names."""
        var_to_band = self.variable_to_band_name_map(platform=platform)
        band_to_var = {
            band_name: var
            for var, normalized_band_name in var_to_band.items()
            for band_name in band_names
            if self._normalize_band_name(band_name) == normalized_band_name
        }
        return band_to_var


[docs] def list_indices() -> List[str]: """List names of supported spectral indices""" specs = load_indices() return list(specs.keys())
def _check_params(item, params): range_vals = ["input_range", "output_range"] if set(params) != set(range_vals): raise ValueError( f"You have set the parameters {params} on {item}, while the following are required {range_vals}" ) for rng in range_vals: if params[rng] is None: continue if len(params[rng]) != 2: raise ValueError( f"The list of provided values {params[rng]} for parameter {rng} for {item} is not of length 2" ) # TODO: allow float too? if not all(isinstance(val, int) for val in params[rng]): raise ValueError("The ranges you supplied are not all of type int") if (params["input_range"] is None) != (params["output_range"] is None): raise ValueError(f"The index_range and output_range of {item} should either be both supplied, or both None") def _check_validity_index_dict(index_dict: dict, index_specs: dict): # TODO: this `index_dict` API needs some more rethinking: # - the dictionary has no explicit order of indices, which can be important for end user # - allow "collection" to be missing (e.g. if no rescaling is desired, or input data is not kept)? # - option to define default output range, instead of having it to specify it for each index? # - keep "rescaling" feature separate/orthogonal from "spectral indices" feature. It could be useful as # a more generic machine learning data preparation feature input_vals = ["collection", "indices"] if set(index_dict.keys()) != set(input_vals): raise ValueError( f"The first level of the dictionary should contain the keys 'collection' and 'indices', but they contain {index_dict.keys()}" ) _check_params("collection", index_dict["collection"]) for index, params in index_dict["indices"].items(): if index not in index_specs.keys(): raise NotImplementedError("Index " + index + " is not supported.") _check_params(index, params) def _callback( x: ProcessBuilder, index_dict: dict, index_specs: dict, append: bool, band_names: List[str], band_to_var: Dict[str, str], ) -> ProcessBuilder: index_values = [] x_res = x # TODO: use `label` parameter of `array_element` to avoid index based band references variables = {band_to_var[bn]: x.array_element(i) for i, bn in enumerate(band_names) if bn in band_to_var} eval_globals = { **load_constants(), **variables, } # TODO: user might want to control order of indices, which is tricky through a dictionary. for index, params in index_dict["indices"].items(): index_result = eval(index_specs[index]["formula"], eval_globals) if params["input_range"] is not None: index_result = index_result.linear_scale_range(*params["input_range"], *params["output_range"]) index_values.append(index_result) if index_dict["collection"]["input_range"] is not None: x_res = x_res.linear_scale_range( *index_dict["collection"]["input_range"], *index_dict["collection"]["output_range"] ) if append: return array_modify(data=x_res, values=index_values, index=len(band_names)) else: return array_create(data=index_values)
[docs] def compute_and_rescale_indices( datacube: DataCube, index_dict: dict, *, append: bool = False, variable_map: Optional[Dict[str, str]] = None, platform: Optional[str] = None, ) -> DataCube: """ Computes a list of indices from a data cube :param datacube: input data cube :param index_dict: a dictionary that contains the input- and output range of the collection on which you calculate the indices as well as the indices that you want to calculate with their responding input- and output ranges It follows the following format:: { "collection": { "input_range": [0,8000], "output_range": [0,250] }, "indices": { "NDVI": { "input_range": [-1,1], "output_range": [0,250] }, } } If you don't want to rescale your data, you can fill the input-, index- and output-range with ``None``. See `list_indices()` for supported indices. :param append: append the indices as bands to the given data cube instead of creating a new cube with only the calculated indices :param variable_map: (optional) mapping from Awesome Spectral Indices formula variable to actual cube band names. To be specified if the given data cube has non-standard band names, or the satellite platform can not be recognized from the data cube metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :param platform: optionally specify the satellite platform (to determine band name mapping) if the given data cube has no or an unhandled collection id in its metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :return: the datacube with the indices attached as bands .. warning:: this "rescaled" index helper uses an experimental API (e.g. `index_dict` argument) that is subject to change. .. versionadded:: 0.26.0 Added `variable_map` and `platform` arguments. """ index_specs = load_indices() _check_validity_index_dict(index_dict, index_specs) if variable_map is None: # Automatic band mapping band_mapping = _BandMapping() if platform is None: if datacube.metadata and datacube.metadata.get("id"): platform = band_mapping.guess_platform(name=datacube.metadata.get("id")) else: raise BandMappingException("Unable to determine satellite platform from data cube metadata") band_to_var = band_mapping.actual_band_name_to_variable_map( platform=platform, band_names=datacube.metadata.band_names ) else: band_to_var = {b: v for v, b in variable_map.items()} res = datacube.apply_dimension( dimension="bands", process=lambda x: _callback( x, index_dict=index_dict, index_specs=index_specs, append=append, band_names=datacube.metadata.band_names, band_to_var=band_to_var, ), ) if append: return res.rename_labels("bands", target=datacube.metadata.band_names + list(index_dict["indices"].keys())) else: return res.rename_labels("bands", target=list(index_dict["indices"].keys()))
[docs] def append_and_rescale_indices( datacube: DataCube, index_dict: dict, *, variable_map: Optional[Dict[str, str]] = None, platform: Optional[str] = None, ) -> DataCube: """ Computes a list of indices from a datacube and appends them to the existing datacube :param datacube: input data cube :param index_dict: a dictionary that contains the input- and output range of the collection on which you calculate the indices as well as the indices that you want to calculate with their responding input- and output ranges It follows the following format:: { "collection": { "input_range": [0,8000], "output_range": [0,250] }, "indices": { "NDVI": { "input_range": [-1,1], "output_range": [0,250] }, } } See `list_indices()` for supported indices. :param variable_map: (optional) mapping from Awesome Spectral Indices formula variable to actual cube band names. To be specified if the given data cube has non-standard band names, or the satellite platform can not be recognized from the data cube metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :param platform: optionally specify the satellite platform (to determine band name mapping) if the given data cube has no or an unhandled collection id in its metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :return: data cube with appended indices .. warning:: this "rescaled" index helper uses an experimental API (e.g. `index_dict` argument) that is subject to change. .. versionadded:: 0.26.0 Added `variable_map` and `platform` arguments. """ return compute_and_rescale_indices( datacube=datacube, index_dict=index_dict, append=True, variable_map=variable_map, platform=platform )
[docs] def compute_indices( datacube: DataCube, indices: List[str], *, append: bool = False, variable_map: Optional[Dict[str, str]] = None, platform: Optional[str] = None, ) -> DataCube: """ Compute multiple spectral indices from the given data cube. :param datacube: input data cube :param indices: list of names of the indices to compute and append. See `list_indices()` for supported indices. :param append: append the indices as bands to the given data cube instead of creating a new cube with only the calculated indices :param variable_map: (optional) mapping from Awesome Spectral Indices formula variable to actual cube band names. To be specified if the given data cube has non-standard band names, or the satellite platform can not be recognized from the data cube metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :param platform: optionally specify the satellite platform (to determine band name mapping) if the given data cube has no or an unhandled collection id in its metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :return: data cube containing the indices as bands .. versionadded:: 0.26.0 Added `variable_map` and `platform` arguments. """ # TODO: it's bit weird to have to specify all these None's in this structure index_dict = { "collection": { "input_range": None, "output_range": None, }, "indices": {index: {"input_range": None, "output_range": None} for index in indices}, } return compute_and_rescale_indices( datacube=datacube, index_dict=index_dict, append=append, variable_map=variable_map, platform=platform )
[docs] def append_indices( datacube: DataCube, indices: List[str], *, variable_map: Optional[Dict[str, str]] = None, platform: Optional[str] = None, ) -> DataCube: """ Compute multiple spectral indices and append them to the given data cube. :param datacube: input data cube :param indices: list of names of the indices to compute and append. See `list_indices()` for supported indices. :param variable_map: (optional) mapping from Awesome Spectral Indices formula variable to actual cube band names. To be specified if the given data cube has non-standard band names, or the satellite platform can not be recognized from the data cube metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :param platform: optionally specify the satellite platform (to determine band name mapping) if the given data cube has no or an unhandled collection id in its metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :return: data cube with appended indices .. versionadded:: 0.26.0 Added `variable_map` and `platform` arguments. """ return compute_indices( datacube=datacube, indices=indices, append=True, variable_map=variable_map, platform=platform )
[docs] def compute_index( datacube: DataCube, index: str, *, variable_map: Optional[Dict[str, str]] = None, platform: Optional[str] = None ) -> DataCube: """ Compute a single spectral index from a data cube. :param datacube: input data cube :param index: name of the index to compute. See `list_indices()` for supported indices. :param variable_map: (optional) mapping from Awesome Spectral Indices formula variable to actual cube band names. To be specified if the given data cube has non-standard band names, or the satellite platform can not be recognized from the data cube metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :param platform: optionally specify the satellite platform (to determine band name mapping) if the given data cube has no or an unhandled collection id in its metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :return: data cube containing the index as band .. versionadded:: 0.26.0 Added `variable_map` and `platform` arguments. """ # TODO: option to compute the index with `reduce_dimension` instead of `apply_dimension`? return compute_indices( datacube=datacube, indices=[index], append=False, variable_map=variable_map, platform=platform )
[docs] def append_index( datacube: DataCube, index: str, *, variable_map: Optional[Dict[str, str]] = None, platform: Optional[str] = None ) -> DataCube: """ Compute a single spectral index and append it to the given data cube. :param cube: input data cube :param index: name of the index to compute and append. See `list_indices()` for supported indices. :param variable_map: (optional) mapping from Awesome Spectral Indices formula variable to actual cube band names. To be specified if the given data cube has non-standard band names, or the satellite platform can not be recognized from the data cube metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :param platform: optionally specify the satellite platform (to determine band name mapping) if the given data cube has no or an unhandled collection id in its metadata. See :ref:`spectral_indices_manual_band_mapping` for more information. :return: data cube with appended index .. versionadded:: 0.26.0 Added `variable_map` and `platform` arguments. """ return compute_indices( datacube=datacube, indices=[index], append=True, variable_map=variable_map, platform=platform )