Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue78 list secondary services #79

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9c45de8
Added vscode to gitignore
JohanKJSchreurs Oct 27, 2022
009d68c
Issue #78 Implemented GET /service_types
JohanKJSchreurs Oct 27, 2022
466616e
Issue #78 Implementing GET /services
JohanKJSchreurs Nov 2, 2022
88b7e6c
Issue #78 Implementing service_info - GET /services/{service_id}
JohanKJSchreurs Nov 2, 2022
203ca21
Issue #78 Implementing create_service - POST /services/
JohanKJSchreurs Nov 4, 2022
e78e6d6
Issue #78 Adding tests for views for SecondaryServices
JohanKJSchreurs Nov 7, 2022
38f48f9
Issue #78 Cleaned up the new tests a bit.
JohanKJSchreurs Nov 7, 2022
5b72f75
Issue #78 Adding more tests for service_info and create_service
JohanKJSchreurs Nov 7, 2022
584d7dd
Issue #78 Adding more tests for secondary services
JohanKJSchreurs Nov 8, 2022
4097e0b
Issue #78 Implementing remove_service
JohanKJSchreurs Nov 8, 2022
9a09b79
Issue #78, correcting remove_service and improve tests
JohanKJSchreurs Nov 9, 2022
7469979
Issue #78 start implementing update_service and improve tests
JohanKJSchreurs Nov 9, 2022
d68a32c
Issue #78 add more tests in test_views.py
JohanKJSchreurs Nov 9, 2022
a665493
Issue #78 Removed test case for remove_service that is not so useful.
JohanKJSchreurs Nov 10, 2022
35e554c
Issue #78 WIP on tests for Secondary Services
JohanKJSchreurs Nov 10, 2022
386b2ce
Removing .flake8 which was added accidentally
JohanKJSchreurs Nov 10, 2022
667bfa8
Issue #78 Fix: methods for secondary services were not using the user_id
JohanKJSchreurs Nov 10, 2022
7f7b8f1
Merge branch 'master' into issue78-list-secondary-services
JohanKJSchreurs Nov 10, 2022
66f005a
Issue #78 Add user authentication on update_service
JohanKJSchreurs Nov 10, 2022
c5ffaf1
Issue #78, code review, doing some small corrections first
JohanKJSchreurs Nov 14, 2022
304b6ec
Issue #78, code review: removing test coverage for API version 0.4
JohanKJSchreurs Nov 14, 2022
c0b1480
Issue #78, corrected: methods from AggregatorSecondaryServices were n…
JohanKJSchreurs Nov 14, 2022
f1904f1
Issue #78, code review, Removed left-over of test coverage for API ve…
JohanKJSchreurs Nov 14, 2022
6a9c621
Issue #78, code review, prepend service_id on backend with the backen…
JohanKJSchreurs Nov 14, 2022
7cfd11b
Issue #78, clean up authentication code in test_backend.py
JohanKJSchreurs Nov 14, 2022
a17a5f3
Merge branch 'master' into issue78-list-secondary-services
JohanKJSchreurs Nov 15, 2022
0d2e026
Issue #78 Updated changelog
JohanKJSchreurs Nov 15, 2022
4b80f9b
Issue #78 Removed unused arguments from test functions
JohanKJSchreurs Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ ENV/
/.project
/.pydevproject

# Visual Studio Code
.vscode

# Various
openeo_driver/data
tmp*
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Implement secondary services [#78](https://github.com/Open-EO/openeo-aggregator/issues/78))

## [0.6.x]

### Added
Expand Down
203 changes: 200 additions & 3 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
from openeo_aggregator.utils import subdict, dict_merge, normalize_issuer_url
from openeo_driver.ProcessGraphDeserializer import SimpleProcessing
from openeo_driver.backend import OpenEoBackendImplementation, AbstractCollectionCatalog, LoadParameters, Processing, \
OidcProvider, BatchJobs, BatchJobMetadata
OidcProvider, BatchJobs, BatchJobMetadata, SecondaryServices, ServiceMetadata
from openeo_driver.datacube import DriverDataCube
from openeo_driver.errors import CollectionNotFoundException, OpenEOApiException, ProcessGraphMissingException, \
JobNotFoundException, JobNotFinishedException, ProcessGraphInvalidException, PermissionsInsufficientException, \
FeatureUnsupportedException
FeatureUnsupportedException, ServiceNotFoundException
from openeo_driver.processes import ProcessRegistry
from openeo_driver.users import User
from openeo_driver.utils import EvalEnv
Expand Down Expand Up @@ -620,6 +620,200 @@ def get_log_entries(self, job_id: str, user_id: str, offset: Optional[str] = Non
return con.job(backend_job_id).logs(offset=offset)


class ServiceIdMapping:
"""Mapping between aggregator service ids and backend job ids"""

@staticmethod
def get_aggregator_service_id(backend_service_id: str, backend_id: str) -> str:
"""Construct aggregator service id from given backend job id and backend id"""
return f"{backend_id}-{backend_service_id}"

@classmethod
def parse_aggregator_service_id(cls, backends: MultiBackendConnection, aggregator_service_id: str) -> Tuple[str, str]:
"""Given aggregator service id: extract backend service id and backend id"""
for prefix in [f"{con.id}-" for con in backends]:
if aggregator_service_id.startswith(prefix):
backend_id, backend_job_id = aggregator_service_id.split("-", maxsplit=1)
return backend_job_id, backend_id
raise ServiceNotFoundException(service_id=aggregator_service_id)


class AggregatorSecondaryServices(SecondaryServices):
"""
Aggregator implementation of the Secondary Services "microservice"
https://openeo.org/documentation/1.0/developers/api/reference.html#tag/Secondary-Services
"""

def __init__(
self,
backends: MultiBackendConnection,
processing: AggregatorProcessing
):
super(AggregatorSecondaryServices, self).__init__()
self._backends = backends
self._processing = processing

def _get_connection_and_backend_service_id(
self,
aggregator_service_id: str
) -> Tuple[BackendConnection, str]:
"""Get connection to the backend and the corresponding service ID in that backend.

raises: ServiceNotFoundException when service_id does not exist in any of the backends.
"""
backend_service_id, backend_id = ServiceIdMapping.parse_aggregator_service_id(
backends=self._backends,
aggregator_service_id=aggregator_service_id
)

con = self._backends.get_connection(backend_id)
return con, backend_service_id

def service_types(self) -> dict:
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types"""

service_types = {}

def merge(formats: dict, to_add: dict):
for name, data in to_add.items():
if name.lower() not in {k.lower() for k in formats.keys()}:
formats[name] = data

# Collect all service types from the backends.
for con in self._backends:
try:
types_to_add = con.get("/service_types").json()
except Exception as e:
# TODO: fail instead of warn?
_log.warning(f"Failed to get service_types from {con.id}: {e!r}", exc_info=True)
continue
# TODO #1 smarter merging: parameter differences?
merge(service_types, types_to_add)

return service_types

def list_services(self, user_id: str) -> List[ServiceMetadata]:
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-services"""

# TODO: use ServiceIdMapping to prepend all service IDs with their backend-id
all_services = []
def merge(services, to_add):
# For now ignore the links
services_to_add = to_add.get("services")
if services_to_add:
services_metadata = [ServiceMetadata.from_dict(s) for s in services_to_add]
services.extend(services_metadata)

# Collect all services from the backends.
for con in self._backends:
with con.authenticated_from_request(request=flask.request, user=User(user_id)):
services_json = None
try:
services_json = con.get("/services").json()
except Exception as e:
_log.warning(f"Failed to get services from {con.id}: {e!r}", exc_info=True)
continue

if services_json:
merge(all_services, services_json)

return all_services

def service_info(self, user_id: str, service_id: str) -> ServiceMetadata:
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/describe-service"""

con, backend_service_id = self._get_connection_and_backend_service_id(service_id)
with con.authenticated_from_request(request=flask.request, user=User(user_id)):
try:
service_json = con.get(f"/services/{backend_service_id}").json()
except (OpenEoApiError) as e:
if e.http_status_code == 404:
# Expected error
_log.debug(f"No service with ID={service_id!r} in backend with ID={con.id!r}: {e!r}", exc_info=True)
raise ServiceNotFoundException(service_id=service_id) from e
raise
except Exception as e:
_log.debug(f"Failed to get service with ID={backend_service_id} from backend with ID={con.id}: {e!r}", exc_info=True)
raise
else:
# Adapt the service ID so it points to the aggregator, with the backend ID included.
service_json["id"] = ServiceIdMapping.get_aggregator_service_id(service_json["id"], con.id)
return ServiceMetadata.from_dict(service_json)

def create_service(self, user_id: str, process_graph: dict, service_type: str, api_version: str,
configuration: dict) -> str:
"""
https://openeo.org/documentation/1.0/developers/api/reference.html#operation/create-service
"""
# TODO: configuration is not used. What to do with it?

backend_id = self._processing.get_backend_for_process_graph(
process_graph=process_graph, api_version=api_version
)
process_graph = self._processing.preprocess_process_graph(process_graph, backend_id=backend_id)

con = self._backends.get_connection(backend_id)
with con.authenticated_from_request(request=flask.request, user=User(user_id)):
try:
# create_service can raise ServiceUnsupportedException and OpenEOApiException.
service = con.create_service(graph=process_graph, type=service_type)

# TODO: This exception handling was copy-pasted. What do we actually need here?
except OpenEoApiError as e:
for exc_class in [ProcessGraphMissingException, ProcessGraphInvalidException]:
if e.code == exc_class.code:
raise exc_class
raise OpenEOApiException(f"Failed to create secondary service on backend {backend_id!r}: {e!r}")
except (OpenEoRestError, OpenEoClientException) as e:
raise OpenEOApiException(f"Failed to create secondary service on backend {backend_id!r}: {e!r}")

return ServiceIdMapping.get_aggregator_service_id(service.service_id, backend_id)

def remove_service(self, user_id: str, service_id: str) -> None:
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/delete-service"""

# Will raise ServiceNotFoundException if service_id does not exist in any of the backends.
con, backend_service_id = self._get_connection_and_backend_service_id(service_id)

with con.authenticated_from_request(request=flask.request, user=User(user_id)):
try:
con.delete(f"/services/{backend_service_id}", expected_status=204)
except (OpenEoApiError) as e:
if e.http_status_code == 404:
# Expected error
_log.debug(f"No service with ID={service_id!r} in backend with ID={con.id!r}: {e!r}", exc_info=True)
raise ServiceNotFoundException(service_id=service_id) from e
_log.warning(f"Failed to delete service {backend_service_id!r} from {con.id!r}: {e!r}", exc_info=True)
raise
except Exception as e:
_log.warning(f"Failed to delete service {backend_service_id!r} from {con.id!r}: {e!r}", exc_info=True)
raise OpenEOApiException(
f"Failed to delete service {backend_service_id!r} on backend {con.id!r}: {e!r}"
) from e

def update_service(self, user_id: str, service_id: str, process_graph: dict) -> None:
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/update-service"""

# Will raise ServiceNotFoundException if service_id does not exist in any of the backends.
con, backend_service_id = self._get_connection_and_backend_service_id(service_id)

with con.authenticated_from_request(request=flask.request, user=User(user_id)):
try:
json = {"process": {"process_graph": process_graph}}
con.patch(f"/services/{backend_service_id}", json=json, expected_status=204)
except (OpenEoApiError) as e:
if e.http_status_code == 404:
# Expected error
_log.debug(f"No service with ID={backend_service_id!r} in backend with ID={con.id!r}: {e!r}", exc_info=True)
raise ServiceNotFoundException(service_id=service_id) from e
raise
except Exception as e:
_log.warning(f"Failed to update service {backend_service_id!r} from {con.id!r}: {e!r}", exc_info=True)
raise OpenEOApiException(
f"Failed to update service {backend_service_id!r} from {con.id!r}: {e!r}"
) from e


class AggregatorBackendImplementation(OpenEoBackendImplementation):
# No basic auth: OIDC auth is required (to get EGI Check-in eduperson_entitlement data)
enable_basic_auth = False
Expand All @@ -645,10 +839,13 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
processing=processing,
partitioned_job_tracker=partitioned_job_tracker
)

secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing)

super().__init__(
catalog=catalog,
processing=processing,
secondary_services=None,
secondary_services=secondary_services,
batch_jobs=batch_jobs,
user_defined_processes=None,
)
Expand Down
Loading