# -*- coding: utf-8 -*-
import msgpack
import base64
from fastapi import FastAPI
from fastapi import Body
from starlette.requests import Request
import pprint
import traceback
import sys
import os
from os import listdir
from os.path import isfile, join
from typing import List
import requests
from fastapi import HTTPException
from starlette.responses import PlainTextResponse
import ujson
from openeo_udf.server.config import UdfConfiguration
from openeo_udf.server.data_model.legacy.udf_legacy_schemas import UdfLegacyDataModel, UdfLegacyRequestModel
from openeo_udf.server.data_model.udf_schemas import UdfRequestModel, ErrorResponseModel, UdfDataModel
from openeo_udf.api.run_code import run_legacy_user_code, run_udf_model_user_code
from openeo_udf.server.machine_learn_database import ResponseStorageModel, RequestStorageModel, store_model
__license__ = "Apache License, Version 2.0"
__author__ = "Soeren Gebbert"
__copyright__ = "Copyright 2018, Soeren Gebbert"
__maintainer__ = "Soeren Gebbert"
__email__ = "soerengebbert@googlemail.com"
"""
There are several different approached available in python that can be implemented:
- UBJSON https://en.wikipedia.org/wiki/UBJSON
- BSON https://en.wikipedia.org/wiki/BSON
- MessagePack https://en.wikipedia.org/wiki/MessagePack
Important is the support for arrays with integer and floating point numbers that are used in
xarray and numpy. Support for timestamps is required as well. For structured data is the support of lists
and maps important.
array integer float map time
MessagePack y y y y y
BJSON y y y y n
UBJSON n y y n n
Based on these requirement it seems that MessagePack is the most potent candidate to use
for serialization and supports many different languages.
MessagePack is available here: https://msgpack.org/
Using Messagepack is quite easy:
In [1]: import msgpack
In [2]: import base64
In [3]: d = {1:[1,2,3,4,5,6], "w":"fffff", "d":{"d":"d"}}
In [4]: d
Out[4]: {1: [1, 2, 3, 4, 5, 6], 'w': 'fffff', 'd': {'d': 'd'}}
In [5]: msgpack.packb(d)
Out[5]: b'\x83\x01\x96\x01\x02\x03\x04\x05\x06\xa1w\xa5fffff\xa1d\x81\xa1d\xa1d'
In [6]: p = msgpack.packb(d)
In [7]: base64.b64encode(p)
Out[7]: b'gwGWAQIDBAUGoXelZmZmZmahZIGhZKFk'
In [8]: t = base64.b64encode(p)
In [9]: base64.b64decode(t)
Out[9]: b'\x83\x01\x96\x01\x02\x03\x04\x05\x06\xa1w\xa5fffff\xa1d\x81\xa1d\xa1d'
In [10]: msgpack.unpackb(base64.b64decode(t))
Out[10]: {1: [1, 2, 3, 4, 5, 6], b'w': b'fffff', b'd': {b'd': b'd'}}
"""
app = FastAPI(title="UDF Server for geodata processing",
description="This server processes UDF data")
[docs]@app.post("/udf", response_model=UdfDataModel, tags=["udf"])
async def udf(request: UdfRequestModel = Body(...)):
"""Run a Python user defined function (UDF) on the provided data collection"""
try:
result = run_udf_model_user_code(udf_model=request)
return result
except Exception:
e_type, e_value, e_tb = sys.exc_info()
response = ErrorResponseModel(message=str(e_value), traceback=str(traceback.format_tb(e_tb)))
raise HTTPException(status_code=400, detail=response.dict())
[docs]@app.post("/udf_message_pack", tags=["udf"], response_model=str,
responses={200: {"content": {"application/base64": {}},
"description": "The base64 encoded string"},
400: {"content": {"application/json": {}}}})
async def udf_message_pack(request: Request):
"""Run a Python user defined function (UDF) on the provided data collection
that are base64 encoded message pack objects"""
try:
data = await request.body()
blob = base64.b64decode(data)
udf_model: UdfRequestModel = msgpack.unpackb(blob, raw=False)
result = run_udf_model_user_code(udf_model=udf_model)
result = base64.b64encode(msgpack.packb(result.to_dict()))
return PlainTextResponse(result)
except Exception:
e_type, e_value, e_tb = sys.exc_info()
response = ErrorResponseModel(message=str(e_value), traceback=str(traceback.format_tb(e_tb)))
raise HTTPException(status_code=400, detail=response.dict())
[docs]@app.post("/udf_legacy", response_model=UdfLegacyDataModel, tags=["udf legacy"])
async def udf_legacy(request: UdfLegacyRequestModel = Body(...)):
"""Run a Python user defined function (UDF) on the provided legacy data"""
try:
result = run_legacy_user_code(dict_data=request.dict())
return result
except Exception:
e_type, e_value, e_tb = sys.exc_info()
response = ErrorResponseModel(message=str(e_value), traceback=str(traceback.format_tb(e_tb)))
raise HTTPException(status_code=400, detail=response.dict())
[docs]@app.post("/udf_legacy_message_pack", response_model=str, tags=["udf legacy"],
responses={200: {"content": {"application/base64": {}},
"description": "The base64 encoded string"},
400: {"content": {"application/json": {}}}})
async def udf_legacy_message_pack(request: Request):
"""Run a Python user defined function (UDF) on the provided legacy
data that are base64 encoded message pack objects"""
try:
data = await request.body()
blob = base64.b64decode(data)
dict_data = msgpack.unpackb(blob, raw=False)
result = run_legacy_user_code(dict_data=dict_data)
result = base64.b64encode(msgpack.packb(result))
return PlainTextResponse(result)
except Exception:
e_type, e_value, e_tb = sys.exc_info()
response = ErrorResponseModel(message=str(e_value), traceback=str(traceback.format_tb(e_tb)))
raise HTTPException(status_code=400, detail=response.dict())
[docs]@app.get("/storage", response_model=List[ResponseStorageModel], tags=["ML Storage"],
responses={200: {"content": {"application/json": {}},
"description": "A list of metadata information about the stored machine model that include "
"the md5 hash, the source, the title and the description.."},
400: {"content": {"application/json": {}}}})
async def ml_get():
"""Return all md5 hashes of the stored machine learn models as list
"""
try:
path = UdfConfiguration.machine_learn_storage_path
if os.path.isdir(path):
result = []
file_list = [f for f in listdir(path) if isfile(join(path, f))]
for f in file_list:
if ".json" in f:
meta_file = open(join(path, f), "r")
d = ujson.loads(meta_file.read())
pprint.pprint(d)
model = ResponseStorageModel(**d)
result.append(model)
return result
response = ErrorResponseModel(message=f"The storage path of the machine learn models was not found on server.")
raise HTTPException(status_code=400, detail=response)
except Exception:
e_type, e_value, e_tb = sys.exc_info()
response = ErrorResponseModel(message=str(e_value), traceback=str(traceback.format_tb(e_tb)))
raise HTTPException(status_code=400, detail=response.dict())
[docs]@app.delete("/storage/{md5_hash}", response_model=str, tags=["ML Storage"],
responses={200: {"content": {"text/plain": {}},
"description": "The removed md5 hash"},
400: {"content": {"application/json": {}}}})
async def ml_delete(md5_hash: str):
"""
Delete a machine learn model in the udf machine learn database
that matches the provided md5 hash. The md5 hash of the to be deleted model
must be provided as text in the HTTP request.
"""
try:
path = os.path.join(UdfConfiguration.machine_learn_storage_path, md5_hash)
if os.path.exists(path):
os.remove(path)
# Remove the json file
if os.path.exists(path + ".json"):
os.remove(path + ".json")
return PlainTextResponse(md5_hash)
response = ErrorResponseModel(message=f"The machine learn model for hash {md5_hash} was not found")
raise HTTPException(status_code=400, detail=response.dict())
except Exception:
e_type, e_value, e_tb = sys.exc_info()
response = ErrorResponseModel(message=str(e_value), traceback=str(traceback.format_tb(e_tb)))
raise HTTPException(status_code=400, detail=response.dict())
[docs]@app.post("/storage", response_model=str, tags=["ML Storage"], responses={200: {"content": {"text/plain": {}},
"description": "The generated md5 hash"},
400: {"content": {"application/json": {}}}})
async def ml_post(request_storage: RequestStorageModel):
"""
Store a machine learn model in the udf machine learn database
and return the corresponding md5 hash. The URL were the model is located
must be provided as text in the HTTP request
"""
try:
if os.path.exists(request_storage.uri):
filepath = request_storage.uri
else:
# Check if thr URL exists by investigating the HTTP header
resp = requests.head(request_storage.uri, allow_redirects=True)
if resp.status_code != 200:
raise Exception("The URL <%s> can not be accessed." % request_storage.uri)
filename = request_storage.uri.rsplit('/', 1)[1]
filepath = os.path.join(UdfConfiguration.temporary_storage_path, filename)
print(request_storage)
r = requests.get(request_storage.uri, allow_redirects=True)
model_file = open(filepath, 'wb')
model_file.write(r.content)
model_file.close()
md5_hash = store_model(filepath=filepath, request_storage=request_storage)
if md5_hash:
return PlainTextResponse(md5_hash)
response = ErrorResponseModel(message=f"Unable to access machine learn model at {request_storage.uri}")
raise HTTPException(status_code=400, detail=response.dict())
except Exception:
e_type, e_value, e_tb = sys.exc_info()
response = ErrorResponseModel(message=str(e_value), traceback=str(traceback.format_tb(e_tb)))
raise HTTPException(status_code=400, detail=response.dict())