const Environment = require('./env');
const Utils = require('@openeo/js-commons/src/utils');
const ProcessRegistry = require('@openeo/js-commons/src/processRegistry');
const axios = require('axios');
const StacMigrate = require('@radiantearth/stac-migrate');
const AuthProvider = require('./authprovider');
const BasicProvider = require('./basicprovider');
const OidcProvider = require('./oidcprovider');
const Capabilities = require('./capabilities');
const FileTypes = require('./filetypes');
const UserFile = require('./userfile');
const Job = require('./job');
const UserProcess = require('./userprocess');
const Service = require('./service');
const Builder = require('./builder/builder');
const BuilderNode = require('./builder/node');
const CONFORMANCE_RELS = [
'conformance',
'http://www.opengis.net/def/rel/ogc/1.0/conformance'
];
/**
* A connection to a back-end.
*/
class Connection {
/**
* Creates a new Connection.
*
* @param {string} baseUrl - The versioned URL or the back-end instance.
* @param {Options} [options={}] - Additional options for the connection.
* @param {?string} [url=null] - User-provided URL of the backend connected to.
*/
constructor(baseUrl, options = {}, url = null) {
/**
* User-provided URL of the backend connected to.
*
* `null` if not given and the connection was directly made to a versioned instance of the back-end.
*
* @protected
* @type {string | null}
*/
this.url = url;
/**
* The versioned URL or the back-end instance.
*
* @protected
* @type {string}
*/
this.baseUrl = Utils.normalizeUrl(baseUrl);
/**
* Auth Provider cache
*
* @protected
* @type {Array.<AuthProvider> | null}
*/
this.authProviderList = null;
/**
* Current auth provider
*
* @protected
* @type {AuthProvider | null}
*/
this.authProvider = null;
/**
* Capability cache
*
* @protected
* @type {Capabilities | null}
*/
this.capabilitiesObject = null;
/**
* Listeners for events.
*
* @protected
* @type {object.<string|Function>}
*/
this.listeners = {};
/**
* Additional options for the connection.
*
* @protected
* @type {Options}
*/
this.options = options;
/**
* Process cache
*
* @protected
* @type {ProcessRegistry}
*/
this.processes = new ProcessRegistry([], Boolean(options.addNamespaceToProcess));
this.processes.listeners.push((...args) => this.emit('processesChanged', ...args));
}
/**
* Initializes the connection by requesting the capabilities.
*
* @async
* @protected
* @returns {Promise<Capabilities>} Capabilities
* @throws {Error}
*/
async init() {
let response = await this._get('/');
let data = Object.assign({}, response.data);
data.links = this.makeLinksAbsolute(data.links, response);
if (!Array.isArray(data.conformsTo) && Array.isArray(data.links)) {
let conformanceLink = this._getLinkHref(data.links, CONFORMANCE_RELS);
if (conformanceLink) {
let response2 = await this._get(conformanceLink);
if (Utils.isObject(response2.data) && Array.isArray(response2.data.conformsTo)) {
data.conformsTo = response2.data.conformsTo;
}
}
}
this.capabilitiesObject = new Capabilities(data);
return this.capabilitiesObject;
}
/**
* Refresh the cache for processes.
*
* @async
* @protected
* @returns {Promise}
*/
async refreshProcessCache() {
if (this.processes.count() === 0) {
return;
}
let promises = this.processes.namespaces().map(namespace => {
let fn = () => Promise.resolve();
if (namespace === 'user') {
let userProcesses = this.processes.namespace('user');
if (!this.isAuthenticated()) {
fn = () => (this.processes.remove(null, 'user') ? Promise.resolve() : Promise.reject(new Error("Can't clear user processes")));
}
else if (this.capabilities().hasFeature('listUserProcesses')) {
fn = () => this.listUserProcesses(userProcesses);
}
}
else if (this.capabilities().hasFeature('listProcesses')) {
fn = () => this.listProcesses(namespace);
}
return fn().catch(error => console.warn(`Could not update processes for namespace '${namespace}' due to an error: ${error.message}`));
});
return await Promise.all(promises);
}
/**
* Returns the URL of the versioned back-end instance currently connected to.
*
* @returns {string} The versioned URL or the back-end instance.
*/
getBaseUrl() {
return this.baseUrl;
}
/**
* Returns the user-provided URL of the back-end currently connected to.
*
* @returns {string} The URL or the back-end.
*/
getUrl() {
return this.url || this.baseUrl;
}
/**
* Returns the capabilities of the back-end.
*
* @returns {Capabilities} Capabilities
*/
capabilities() {
return this.capabilitiesObject;
}
/**
* List the supported output file formats.
*
* @async
* @returns {Promise<FileTypes>} A response compatible to the API specification.
* @throws {Error}
*/
async listFileTypes() {
let response = await this._get('/file_formats');
return new FileTypes(response.data);
}
/**
* List the supported secondary service types.
*
* @async
* @returns {Promise<object.<string, ServiceType>>} A response compatible to the API specification.
* @throws {Error}
*/
async listServiceTypes() {
let response = await this._get('/service_types');
return response.data;
}
/**
* List the supported UDF runtimes.
*
* @async
* @returns {Promise<object.<string, UdfRuntime>>} A response compatible to the API specification.
* @throws {Error}
*/
async listUdfRuntimes() {
let response = await this._get('/udf_runtimes');
return response.data;
}
/**
* List all collections available on the back-end.
*
* The collections returned always comply to the latest STAC version (currently 1.0.0).
*
* @async
* @returns {Promise<Collections>} A response compatible to the API specification.
* @throws {Error}
*/
async listCollections() {
let response = await this._get('/collections');
if (Utils.isObject(response.data) && Array.isArray(response.data.collections)) {
response.data.collections = response.data.collections.map(collection => {
if (collection.stac_version) {
return StacMigrate.collection(collection);
}
return collection;
});
}
return response.data;
}
/**
* Get further information about a single collection.
*
* The collection returned always complies to the latest STAC version (currently 1.0.0).
*
* @async
* @param {string} collectionId - Collection ID to request further metadata for.
* @returns {Promise<Collection>} - A response compatible to the API specification.
* @throws {Error}
*/
async describeCollection(collectionId) {
let response = await this._get('/collections/' + collectionId);
if (response.data.stac_version) {
return StacMigrate.collection(response.data);
}
else {
return response.data;
}
}
/**
* Loads items for a specific image collection.
* May not be available for all collections.
*
* The items returned always comply to the latest STAC version (currently 1.0.0).
*
* This is an experimental API and is subject to change.
*
* @async
* @param {string} collectionId - Collection ID to request items for.
* @param {?Array.<number>} [spatialExtent=null] - Limits the items to the given bounding box in WGS84:
* 1. Lower left corner, coordinate axis 1
* 2. Lower left corner, coordinate axis 2
* 3. Upper right corner, coordinate axis 1
* 4. Upper right corner, coordinate axis 2
* @param {?Array} [temporalExtent=null] - Limits the items to the specified temporal interval.
* The interval has to be specified as an array with exactly two elements (start, end) and
* each must be either an RFC 3339 compatible string or a Date object.
* Also supports open intervals by setting one of the boundaries to `null`, but never both.
* @param {?number} [limit=null] - The amount of items per request/page as integer. If `null` (default), the back-end decides.
* @yields {Promise<ItemCollection>} A response compatible to the API specification.
* @throws {Error}
*/
async * listCollectionItems(collectionId, spatialExtent = null, temporalExtent = null, limit = null) {
let page = 1;
let nextUrl = '/collections/' + collectionId + '/items';
while(nextUrl) {
let params = {};
if (page === 1) {
if (Array.isArray(spatialExtent)) {
params.bbox = spatialExtent.join(',');
}
if (Array.isArray(temporalExtent)) {
params.datetime = temporalExtent
.map(e => {
if (e instanceof Date) {
return e.toISOString();
}
else if (typeof e === 'string') {
return e;
}
return '..'; // Open date range
})
.join('/');
}
if (limit > 0) {
params.limit = limit;
}
}
let response = await this._get(nextUrl, params);
if (Utils.isObject(response.data) && Array.isArray(response.data.features)) {
response.data.features = response.data.features.map(item => {
if (item.stac_version) {
return StacMigrate.item(item);
}
return item;
});
}
yield response.data;
page++;
let links = this.makeLinksAbsolute(response.data.links);
nextUrl = this._getLinkHref(links, 'next');
}
}
/**
* Normalisation of the namespace to a value that is compatible with the OpenEO specs - EXPERIMENTAL.
*
* This is required to support UDP that are shared as public. These can only be executed with providing the full URL
* (e.g. https://<backend>/processes/<namespace>/<process_id>) as the namespace value in the processing graph. For other
* parts of the API (such as the listing of the processes, only the name of the namespace is required.
*
* This function will extract the short name of the namespace from a shareable URL.
*
* @protected
* @param {?string} namespace - Namespace of the process
* @returns {?string}
*/
normalizeNamespace(namespace) {
// The pattern in https://github.com/Open-EO/openeo-api/pull/348 doesn't include the double colon yet - the regexp may change in the future
const matches = namespace.match( /^https?:\/\/.*\/processes\/(@?[\w\-.~:]+)\/?/i);
return matches && matches.length > 1 ? matches[1] : namespace;
}
/**
* List processes available on the back-end.
*
* Requests pre-defined processes by default.
* Set the namespace parameter to request processes from a specific namespace.
*
* Note: The list of namespaces can be retrieved by calling `listProcesses` without a namespace given.
* The namespaces are then listed in the property `namespaces`.
*
* @async
* @param {?string} [namespace=null] - Namespace of the processes (default to `null`, i.e. pre-defined processes). EXPERIMENTAL!
* @returns {Promise<Processes>} - A response compatible to the API specification.
* @throws {Error}
*/
async listProcesses(namespace = null) {
if (!namespace) {
namespace = 'backend';
}
let path = (namespace === 'backend') ? '/processes' : `/processes/${this.normalizeNamespace(namespace)}`;
let response = await this._get(path);
if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) {
throw new Error('Invalid response received for processes');
}
// Store processes in cache
this.processes.remove(null, namespace);
this.processes.addAll(response.data.processes, namespace);
return Object.assign(response.data, {processes: this.processes.namespace(namespace)});
}
/**
* Get information about a single process.
*
* @async
* @param {string} processId - Collection ID to request further metadata for.
* @param {?string} [namespace=null] - Namespace of the process (default to `null`, i.e. pre-defined processes). EXPERIMENTAL!
* @returns {Promise<?Process>} - A single process as object, or `null` if none is found.
* @throws {Error}
* @see Connection#listProcesses
*/
async describeProcess(processId, namespace = null) {
if (!namespace) {
namespace = 'backend';
}
if (namespace === 'backend') {
await this.listProcesses();
}
else {
let response = await this._get(`/processes/${this.normalizeNamespace(namespace)}/${processId}`);
if (!Utils.isObject(response.data) || typeof response.data.id !== 'string') {
throw new Error('Invalid response received for process');
}
this.processes.add(response.data, namespace);
}
return this.processes.get(processId, namespace);
}
/**
* Returns an object to simply build user-defined processes based upon pre-defined processes.
*
* @async
* @param {string} id - A name for the process.
* @returns {Promise<Builder>}
* @throws {Error}
* @see Connection#listProcesses
*/
async buildProcess(id) {
await this.listProcesses();
return new Builder(this.processes, null, id);
}
/**
* List all authentication methods supported by the back-end.
*
* @async
* @returns {Promise<Array.<AuthProvider>>} An array containing all supported AuthProviders (including all OIDC providers and HTTP Basic).
* @throws {Error}
* @see AuthProvider
*/
async listAuthProviders() {
if (this.authProviderList !== null) {
return this.authProviderList;
}
this.authProviderList = [];
let cap = this.capabilities();
// Add OIDC providers
if (cap.hasFeature('authenticateOIDC')) {
let res = await this._get('/credentials/oidc');
let oidcFactory = this.getOidcProviderFactory();
if (Utils.isObject(res.data) && Array.isArray(res.data.providers) && typeof oidcFactory === 'function') {
for(let i in res.data.providers) {
let obj = oidcFactory(res.data.providers[i]);
if (obj instanceof AuthProvider) {
this.authProviderList.push(obj);
}
}
}
}
// Add Basic provider
if (cap.hasFeature('authenticateBasic')) {
this.authProviderList.push(new BasicProvider(this));
}
return this.authProviderList;
}
/**
* This function is meant to create the OIDC providers used for authentication.
*
* The function gets passed a single argument that contains the
* provider information as provided by the API, e.g. having the properties
* `id`, `issuer`, `title` etc.
*
* The function must return an instance of AuthProvider or any derived class.
* May return `null` if the instance can't be created.
*
* @callback oidcProviderFactoryFunction
* @param {object.<string, *>} providerInfo - The provider information as provided by the API, having the properties `id`, `issuer`, `title` etc.
* @returns {AuthProvider | null}
*/
/**
* Sets a factory function that creates custom OpenID Connect provider instances.
*
* You only need to call this if you have implemented a new AuthProvider based
* on the AuthProvider interface (or OIDCProvider class), e.g. to use a
* OIDC library other than oidc-client-js.
*
* @param {?oidcProviderFactoryFunction} [providerFactoryFunc=null]
* @see AuthProvider
*/
setOidcProviderFactory(providerFactoryFunc) {
this.oidcProviderFactory = providerFactoryFunc;
}
/**
* Get the OpenID Connect provider factory.
*
* Returns `null` if OIDC is not supported by the client or an instance
* can't be created for whatever reason.
*
* @returns {oidcProviderFactoryFunction | null}
* @see AuthProvider
*/
getOidcProviderFactory() {
if (typeof this.oidcProviderFactory === 'function') {
return this.oidcProviderFactory;
}
else {
if (OidcProvider.isSupported()) {
return providerInfo => new OidcProvider(this, providerInfo);
}
else {
return null;
}
}
}
/**
* Authenticates with username and password against a back-end supporting HTTP Basic Authentication.
*
* DEPRECATED in favor of using `listAuthProviders` and `BasicProvider`.
*
* @async
* @deprecated
* @param {string} username
* @param {string} password
* @see BasicProvider
* @see Connection#listAuthProviders
*/
async authenticateBasic(username, password) {
let basic = new BasicProvider(this);
await basic.login(username, password);
}
/**
* Returns whether the user is authenticated (logged in) at the back-end or not.
*
* @returns {boolean} `true` if authenticated, `false` if not.
*/
isAuthenticated() {
return (this.authProvider !== null);
}
/**
* Emits the given event.
*
* @protected
* @param {string} event
* @param {...*} args
*/
emit(event, ...args) {
if (typeof this.listeners[event] === 'function') {
this.listeners[event](...args);
}
}
/**
* Registers a listener with the given event.
*
* Currently supported:
* - authProviderChanged(provider): Raised when the auth provider has changed.
* - tokenChanged(token): Raised when the access token has changed.
* - processesChanged(type, data, namespace): Raised when the process registry has changed (i.e. a process was added, updated or deleted).
*
* @param {string} event
* @param {Function} callback
*/
on(event, callback) {
this.listeners[event] = callback;
}
/**
* Removes a listener from the given event.
*
* @param {string} event
*/
off(event) {
delete this.listeners[event];
}
/**
* Returns the AuthProvider.
*
* @returns {AuthProvider | null}
*/
getAuthProvider() {
return this.authProvider;
}
/**
* Sets the AuthProvider.
*
* @param {AuthProvider} provider
*/
setAuthProvider(provider) {
if (provider === this.authProvider) {
return;
}
if (provider instanceof AuthProvider) {
this.authProvider = provider;
}
else {
this.authProvider = null;
}
this.emit('authProviderChanged', this.authProvider);
// Update process cache on auth changes: https://github.com/Open-EO/openeo-js-client/issues/55
this.refreshProcessCache();
}
/**
* Sets the authentication token for the connection.
*
* This creates a new custom `AuthProvider` with the given details and returns it.
* After calling this function you can make requests against the API.
*
* This is NOT recommended to use. Only use if you know what you are doing.
* It is recommended to authenticate through `listAuthProviders` or related functions.
*
* @param {string} type - The authentication type, e.g. `basic` or `oidc`.
* @param {string} providerId - The provider identifier. For OIDC the `id` of the provider.
* @param {string} token - The actual access token as given by the authentication method during the login process.
* @returns {AuthProvider}
*/
setAuthToken(type, providerId, token) {
let provider = new AuthProvider(type, this, {
id: providerId,
title: "Custom",
description: ""
});
provider.setToken(token);
this.setAuthProvider(provider);
return provider;
}
/**
* Get information about the authenticated user.
*
* Updates the User ID if available.
*
* @async
* @returns {Promise<UserAccount>} A response compatible to the API specification.
* @throws {Error}
*/
async describeAccount() {
let response = await this._get('/me');
return response.data;
}
/**
* Lists all files from the user workspace.
*
* @async
* @returns {Promise<ResponseArray.<UserFile>>} A list of files.
* @throws {Error}
*/
async listFiles() {
let response = await this._get('/files');
let files = response.data.files.map(
f => new UserFile(this, f.path).setAll(f)
);
return this._toResponseArray(files, response.data);
}
/**
* A callback that is executed on upload progress updates.
*
* @callback uploadStatusCallback
* @param {number} percentCompleted - The percent (0-100) completed.
* @param {UserFile} file - The file object corresponding to the callback.
*/
/**
* Uploads a file to the user workspace.
* If a file with the name exists, overwrites it.
*
* This method has different behaviour depending on the environment.
* In a nodeJS environment the source must be a path to a file as string.
* In a browser environment the source must be an object from a file upload form.
*
* @async
* @param {*} source - The source, see method description for details.
* @param {?string} [targetPath=null] - The target path on the server, relative to the user workspace. Defaults to the file name of the source file.
* @param {?uploadStatusCallback} [statusCallback=null] - Optionally, a callback that is executed on upload progress updates.
* @param {?AbortController} [abortController=null] - An AbortController object that can be used to cancel the upload process.
* @returns {Promise<UserFile>}
* @throws {Error}
*/
async uploadFile(source, targetPath = null, statusCallback = null, abortController = null) {
if (targetPath === null) {
targetPath = Environment.fileNameForUpload(source);
}
let file = await this.getFile(targetPath);
return await file.uploadFile(source, statusCallback, abortController);
}
/**
* Opens a (existing or non-existing) file without reading any information or creating a new file at the back-end.
*
* @async
* @param {string} path - Path to the file, relative to the user workspace.
* @returns {Promise<UserFile>} A file.
* @throws {Error}
*/
async getFile(path) {
return new UserFile(this, path);
}
/**
* Takes a UserProcess, BuilderNode or a plain object containing process nodes
* and converts it to an API compliant object.
*
* @param {UserProcess|BuilderNode|object.<string, *>} process - Process to be normalized.
* @param {object.<string, *>} additional - Additional properties to be merged with the resulting object.
* @returns {object.<string, *>}
* @protected
*/
_normalizeUserProcess(process, additional = {}) {
if (process instanceof UserProcess) {
process = process.toJSON();
}
else if (process instanceof BuilderNode) {
process.result = true;
process = process.parent.toJSON();
}
else if (Utils.isObject(process) && !Utils.isObject(process.process_graph)) {
process = {
process_graph: process
};
}
return Object.assign({}, additional, {process: process});
}
/**
* Validates a user-defined process at the back-end.
*
* @async
* @param {Process} process - User-defined process to validate.
* @returns {Promise<ValidationResult>} errors - A list of API compatible error objects. A valid process returns an empty list.
* @throws {Error}
*/
async validateProcess(process) {
let response = await this._post('/validation', this._normalizeUserProcess(process).process);
if (Array.isArray(response.data.errors)) {
const errors = response.data.errors;
errors['federation:backends'] = Array.isArray(response.data['federation:missing']) ? response.data['federation:missing'] : [];
return errors;
}
else {
throw new Error("Invalid validation response received.");
}
}
/**
* Lists all user-defined processes of the authenticated user.
*
* @async
* @param {Array.<UserProcess>} [oldProcesses=[]] - A list of existing user-defined processes to update.
* @returns {Promise<ResponseArray.<UserProcess>>} A list of user-defined processes.
* @throws {Error}
*/
async listUserProcesses(oldProcesses = []) {
let response = await this._get('/process_graphs');
if (!Utils.isObject(response.data) || !Array.isArray(response.data.processes)) {
throw new Error('Invalid response received for processes');
}
// Remove existing processes from cache
this.processes.remove(null, 'user');
// Update existing processes if needed
let newProcesses = response.data.processes.map(newProcess => {
let process = oldProcesses.find(oldProcess => oldProcess.id === newProcess.id);
if (!process) {
process = new UserProcess(this, newProcess.id);
}
return process.setAll(newProcess);
});
// Store plain JS variant (i.e. no Job objects involved) of processes in cache
let jsonProcesses = oldProcesses.length > 0 ? newProcesses.map(p => p.toJSON()) : response.data.processes;
this.processes.addAll(jsonProcesses, 'user');
return this._toResponseArray(newProcesses, response.data);
}
/**
* Creates a new stored user-defined process at the back-end.
*
* @async
* @param {string} id - Unique identifier for the process.
* @param {Process} process - A user-defined process.
* @returns {Promise<UserProcess>} The new user-defined process.
* @throws {Error}
*/
async setUserProcess(id, process) {
let pg = new UserProcess(this, id);
return await pg.replaceUserProcess(process);
}
/**
* Get all information about a user-defined process.
*
* @async
* @param {string} id - Identifier of the user-defined process.
* @returns {Promise<UserProcess>} The user-defined process.
* @throws {Error}
*/
async getUserProcess(id) {
let pg = new UserProcess(this, id);
return await pg.describeUserProcess();
}
/**
* Executes a process synchronously and returns the result as the response.
*
* Please note that requests can take a very long time of several minutes or even hours.
*
* @async
* @param {Process} process - A user-defined process.
* @param {?string} [plan=null] - The billing plan to use for this computation.
* @param {?number} [budget=null] - The maximum budget allowed to spend for this computation.
* @param {?AbortController} [abortController=null] - An AbortController object that can be used to cancel the processing request.
* @param {object.<string, *>} [additional={}] - Other parameters to pass for the batch job, e.g. `log_level`.
* @returns {Promise<SyncResult>} - An object with the data and some metadata.
*/
async computeResult(process, plan = null, budget = null, abortController = null, additional = {}) {
let requestBody = this._normalizeUserProcess(
process,
Object.assign({}, additional, {
plan: plan,
budget: budget
})
);
let response = await this._post('/result', requestBody, Environment.getResponseType(), abortController);
let syncResult = {
data: response.data,
costs: null,
type: null,
logs: []
};
if (typeof response.headers['openeo-costs'] === 'number') {
syncResult.costs = response.headers['openeo-costs'];
}
if (typeof response.headers['content-type'] === 'string') {
syncResult.type = response.headers['content-type'];
}
let links = Array.isArray(response.headers.link) ? response.headers.link : [response.headers.link];
for(let link of links) {
if (typeof link !== 'string') {
continue;
}
let logs = link.match(/^<([^>]+)>;\s?rel="monitor"/i);
if (Array.isArray(logs) && logs.length > 1) {
try {
let logsResponse = await this._get(logs[1]);
if (Utils.isObject(logsResponse.data) && Array.isArray(logsResponse.data.logs)) {
syncResult.logs = logsResponse.data.logs;
}
} catch(error) {
console.warn(error);
}
}
}
return syncResult;
}
/**
* Executes a process synchronously and downloads to result the given path.
*
* Please note that requests can take a very long time of several minutes or even hours.
*
* This method has different behaviour depending on the environment.
* If a NodeJs environment, writes the downloaded file to the target location on the file system.
* In a browser environment, offers the file for downloading using the specified name (folders are not supported).
*
* @async
* @param {Process} process - A user-defined process.
* @param {string} targetPath - The target, see method description for details.
* @param {?string} [plan=null] - The billing plan to use for this computation.
* @param {?number} [budget=null] - The maximum budget allowed to spend for this computation.
* @param {?AbortController} [abortController=null] - An AbortController object that can be used to cancel the processing request.
* @throws {Error}
*/
async downloadResult(process, targetPath, plan = null, budget = null, abortController = null) {
let response = await this.computeResult(process, plan, budget, abortController);
// @ts-ignore
await Environment.saveToFile(response.data, targetPath);
}
/**
* Lists all batch jobs of the authenticated user.
*
* @async
* @param {Array.<Job>} [oldJobs=[]] - A list of existing jobs to update.
* @returns {Promise<ResponseArray.<Job>>} A list of jobs.
* @throws {Error}
*/
async listJobs(oldJobs = []) {
let response = await this._get('/jobs');
let newJobs = response.data.jobs.map(newJob => {
let job = oldJobs.find(oldJob => oldJob.id === newJob.id);
if (!job) {
job = new Job(this, newJob.id);
}
return job.setAll(newJob);
});
return this._toResponseArray(newJobs, response.data);
}
/**
* Creates a new batch job at the back-end.
*
* @async
* @param {Process} process - A user-define process to execute.
* @param {?string} [title=null] - A title for the batch job.
* @param {?string} [description=null] - A description for the batch job.
* @param {?string} [plan=null] - The billing plan to use for this batch job.
* @param {?number} [budget=null] - The maximum budget allowed to spend for this batch job.
* @param {object.<string, *>} [additional={}] - Other parameters to pass for the batch job, e.g. `log_level`.
* @returns {Promise<Job>} The stored batch job.
* @throws {Error}
*/
async createJob(process, title = null, description = null, plan = null, budget = null, additional = {}) {
additional = Object.assign({}, additional, {
title: title,
description: description,
plan: plan,
budget: budget
});
let requestBody = this._normalizeUserProcess(process, additional);
let response = await this._post('/jobs', requestBody);
if (typeof response.headers['openeo-identifier'] !== 'string') {
throw new Error("Response did not contain a Job ID. Job has likely been created, but may not show up yet.");
}
let job = new Job(this, response.headers['openeo-identifier']).setAll(requestBody);
if (this.capabilities().hasFeature('describeJob')) {
return await job.describeJob();
}
else {
return job;
}
}
/**
* Get all information about a batch job.
*
* @async
* @param {string} id - Batch Job ID.
* @returns {Promise<Job>} The batch job.
* @throws {Error}
*/
async getJob(id) {
let job = new Job(this, id);
return await job.describeJob();
}
/**
* Lists all secondary web services of the authenticated user.
*
* @async
* @param {Array.<Service>} [oldServices=[]] - A list of existing services to update.
* @returns {Promise<ResponseArray.<Job>>} A list of services.
* @throws {Error}
*/
async listServices(oldServices = []) {
let response = await this._get('/services');
let newServices = response.data.services.map(newService => {
let service = oldServices.find(oldService => oldService.id === newService.id);
if (!service) {
service = new Service(this, newService.id);
}
return service.setAll(newService);
});
return this._toResponseArray(newServices, response.data);
}
/**
* Creates a new secondary web service at the back-end.
*
* @async
* @param {Process} process - A user-defined process.
* @param {string} type - The type of service to be created (see `Connection.listServiceTypes()`).
* @param {?string} [title=null] - A title for the service.
* @param {?string} [description=null] - A description for the service.
* @param {boolean} [enabled=true] - Enable the service (`true`, default) or not (`false`).
* @param {object.<string, *>} [configuration={}] - Configuration parameters to pass to the service.
* @param {?string} [plan=null] - The billing plan to use for this service.
* @param {?number} [budget=null] - The maximum budget allowed to spend for this service.
* @param {object.<string, *>} [additional={}] - Other parameters to pass for the service, e.g. `log_level`.
* @returns {Promise<Service>} The stored service.
* @throws {Error}
*/
async createService(process, type, title = null, description = null, enabled = true, configuration = {}, plan = null, budget = null, additional = {}) {
let requestBody = this._normalizeUserProcess(process, Object.assign({
title: title,
description: description,
type: type,
enabled: enabled,
configuration: configuration,
plan: plan,
budget: budget
}, additional));
let response = await this._post('/services', requestBody);
if (typeof response.headers['openeo-identifier'] !== 'string') {
throw new Error("Response did not contain a Service ID. Service has likely been created, but may not show up yet.");
}
let service = new Service(this, response.headers['openeo-identifier']).setAll(requestBody);
if (this.capabilities().hasFeature('describeService')) {
return service.describeService();
}
else {
return service;
}
}
/**
* Get all information about a secondary web service.
*
* @async
* @param {string} id - Service ID.
* @returns {Promise<Service>} The service.
* @throws {Error}
*/
async getService(id) {
let service = new Service(this, id);
return await service.describeService();
}
/**
* Adds additional response details to the array.
*
* Adds links and federation:missing.
*
* @protected
* @param {Array.<*>} arr
* @param {object.<string, *>} response
* @returns {ResponseArray}
*/
_toResponseArray(arr, response) {
arr.links = Array.isArray(response.links) ? response.links : [];
arr['federation:missing'] = Array.isArray(response['federation:missing']) ? response['federation:missing'] : [];
return arr;
}
/**
* Get the a link with the given rel type.
*
* @protected
* @param {Array.<Link>} links - An array of links.
* @param {string|Array.<string>} rel - Relation type(s) to find.
* @returns {string | null}
* @throws {Error}
*/
_getLinkHref(links, rel) {
if (!Array.isArray(rel)) {
rel = [rel];
}
if (Array.isArray(links)) {
let link = links.find(l => Utils.isObject(l) && rel.includes(l.rel) && typeof l.href === 'string');
if (link) {
return link.href;
}
}
return null;
}
/**
* Makes all links in the list absolute.
*
* @param {Array.<Link>} links - An array of links.
* @param {?string|AxiosResponse} [base=null] - The base url to use for relative links, or an response to derive the url from.
* @returns {Array.<Link>}
*/
makeLinksAbsolute(links, base = null) {
if (!Array.isArray(links)) {
return links;
}
let baseUrl = null;
if (Utils.isObject(base) && base.headers && base.config && base.request) { // AxiosResponse
baseUrl = base.config.baseURL + base.config.url;
}
else if (typeof base !== 'string') {
baseUrl = this._getLinkHref(links, 'self');
}
else {
baseUrl = base;
}
if (!baseUrl) {
return links;
}
return links.map((link) => {
if (!Utils.isObject(link) || typeof link.href !== 'string') {
return link;
}
try {
let url = new URL(link.href, baseUrl);
return Object.assign({}, link, {href: url.toString()});
} catch(error) {
return link;
}
});
}
/**
* Sends a GET request.
*
* @protected
* @async
* @param {string} path
* @param {object.<string, *>} query
* @param {string} responseType - Response type according to axios, defaults to `json`.
* @param {?AbortController} [abortController=null] - An AbortController object that can be used to cancel the request.
* @returns {Promise<AxiosResponse>}
* @throws {Error}
* @see https://github.com/axios/axios#request-config
*/
async _get(path, query, responseType, abortController = null) {
return await this._send({
method: 'get',
responseType: responseType,
url: path,
// Timeout for capabilities requests as they are used for a quick first discovery to check whether the server is a openEO back-end.
// Without timeout connecting with a wrong server url may take forever.
timeout: path === '/' ? 5000 : 0,
params: query
}, abortController);
}
/**
* Sends a POST request.
*
* @protected
* @async
* @param {string} path
* @param {*} body
* @param {string} responseType - Response type according to axios, defaults to `json`.
* @param {?AbortController} [abortController=null] - An AbortController object that can be used to cancel the request.
* @returns {Promise<AxiosResponse>}
* @throws {Error}
* @see https://github.com/axios/axios#request-config
*/
async _post(path, body, responseType, abortController = null) {
let options = {
method: 'post',
responseType: responseType,
url: path,
data: body
};
return await this._send(options, abortController);
}
/**
* Sends a PUT request.
*
* @protected
* @async
* @param {string} path
* @param {*} body
* @returns {Promise<AxiosResponse>}
* @throws {Error}
*/
async _put(path, body) {
return await this._send({
method: 'put',
url: path,
data: body
});
}
/**
* Sends a PATCH request.
*
* @protected
* @async
* @param {string} path
* @param {*} body
* @returns {Promise<AxiosResponse>}
* @throws {Error}
*/
async _patch(path, body) {
return await this._send({
method: 'patch',
url: path,
data: body
});
}
/**
* Sends a DELETE request.
*
* @protected
* @async
* @param {string} path
* @returns {Promise<AxiosResponse>}
* @throws {Error}
*/
async _delete(path) {
return await this._send({
method: 'delete',
url: path
});
}
/**
* Downloads data from a URL.
*
* May include authorization details where required.
*
* @param {string} url - An absolute or relative URL to download data from.
* @param {boolean} authorize - Send authorization details (`true`) or not (`false`).
* @returns {Promise<Stream.Readable|Blob>} - Returns the data as `Stream` in NodeJS environments or as `Blob` in browsers
* @throws {Error}
*/
async download(url, authorize) {
let result = await this._send({
method: 'get',
responseType: Environment.getResponseType(),
url: url,
authorization: authorize
});
return result.data;
}
/**
* Get the authorization header for requests.
*
* @protected
* @returns {object.<string, string>}
*/
_getAuthHeaders() {
const headers = {};
if (this.isAuthenticated()) {
headers.Authorization = 'Bearer ' + this.authProvider.getToken();
}
return headers;
}
/**
* Sends a HTTP request.
*
* Options mostly conform to axios,
* see {@link https://github.com/axios/axios#request-config}.
*
* Automatically sets a baseUrl and the authorization information.
* Default responseType is `json`.
*
* Tries to smoothly handle error responses by providing an object for all response types,
* instead of Streams or Blobs for non-JSON response types.
*
* @protected
* @async
* @param {object.<string, *>} options
* @param {?AbortController} [abortController=null] - An AbortController object that can be used to cancel the request.
* @returns {Promise<AxiosResponse>}
* @throws {Error}
* @see https://github.com/axios/axios
*/
async _send(options, abortController = null) {
options.baseURL = this.baseUrl;
if (typeof options.authorization === 'undefined' || options.authorization === true) {
if (!options.headers) {
options.headers = {};
}
Object.assign(options.headers, this._getAuthHeaders());
}
if (!options.responseType) {
options.responseType = 'json';
}
if (abortController) {
options.signal = abortController.signal;
}
try {
let response = await axios(options);
let capabilities = this.capabilities();
if (capabilities) {
response = capabilities.migrate(response);
}
return response;
} catch(error) {
if (axios.isCancel(error)) {
throw error;
}
const checkContentType = type => (typeof type === 'string' && type.indexOf('/json') !== -1);
const enrichError = (origin, response) => {
if (typeof response.message === 'string') {
origin.message = response.message;
}
origin.code = typeof response.code === 'string' ? response.code : "";
origin.id = response.id;
origin.links = Array.isArray(response.links) ? response.links : [];
return origin;
};
if (Utils.isObject(error.response) && Utils.isObject(error.response.data) && (checkContentType(error.response.data.type) || (Utils.isObject(error.response.headers) && checkContentType(error.response.headers['content-type'])))) {
// JSON error responses are Blobs and streams if responseType is set as such, so convert to JSON if required.
// See: https://github.com/axios/axios/issues/815
if (options.responseType === Environment.getResponseType()) {
try {
let errorResponse = await Environment.handleErrorResponse(error);
throw enrichError(error, errorResponse);
} catch (error2) {
console.error(error2);
}
}
else {
throw enrichError(error, error.response.data);
}
}
throw error;
}
}
}
module.exports = Connection;