Source code for openeo.extra.artifacts.artifact_helper
from typing import Dict, Optional, Type
from openeo import Connection
from openeo.extra.artifacts._artifact_helper_abc import ArtifactHelperABC
from openeo.extra.artifacts._backend import ArtifactCapabilities
from openeo.extra.artifacts._config import ArtifactsStorageConfigABC
# noinspection PyProtectedMember
from openeo.extra.artifacts._s3sts.artifact_helper import S3STSArtifactHelper
# noinspection PyProtectedMember
from openeo.extra.artifacts._s3sts.config import S3STSConfig
from openeo.extra.artifacts.exceptions import UnsupportedArtifactsType
config_to_helper: Dict[Type[ArtifactsStorageConfigABC], Type[ArtifactHelperABC]] = {S3STSConfig: S3STSArtifactHelper}
config_type_to_helper: Dict[str, Type[ArtifactHelperABC]] = {
ArtifactsStorageConfigABC.get_type_from(cfg): helper for cfg, helper in config_to_helper.items()
}
[docs]
def build_artifact_helper(
connection: Connection, config: Optional[ArtifactsStorageConfigABC] = None
) -> ArtifactHelperABC:
"""
:param connection: ``openeo.Connection`` connection to an openEOBackend
:param config: Optional **This parameter should only be used when instructed by the maintainer of the OpenEO
backend.** object to specify configuration for Artifact storage. If omitted the helper will try to
get the preferred config as advertised by the OpenEO backend.
:return: An Artifact helper instance that can be used to manage artifacts
"""
if config is None:
config_type = ArtifactCapabilities(connection).get_preferred_artifacts_provider().get_type()
else:
config_type = config.get_type()
try:
artifact_helper = config_type_to_helper[config_type]
return artifact_helper.from_openeo_connection(
connection, ArtifactCapabilities(connection).get_preferred_artifacts_provider(), config=config
)
except KeyError as ke:
raise UnsupportedArtifactsType(config_type) from ke