diff --git a/.gitignore b/.gitignore index 3800b1cd..0534e444 100644 --- a/.gitignore +++ b/.gitignore @@ -106,6 +106,9 @@ ENV/ /.project /.pydevproject +# Visual Studio Code +.vscode + # Various openeo_driver/data tmp* diff --git a/CHANGELOG.md b/CHANGELOG.md index 0df8a589..5a1b853d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Implement secondary services [#78](https://github.com/Open-EO/openeo-aggregator/issues/78)) + ## [0.6.x] ### Added diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index 194a5098..ab0db071 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -30,11 +30,11 @@ from openeo_aggregator.utils import subdict, dict_merge, normalize_issuer_url from openeo_driver.ProcessGraphDeserializer import SimpleProcessing from openeo_driver.backend import OpenEoBackendImplementation, AbstractCollectionCatalog, LoadParameters, Processing, \ - OidcProvider, BatchJobs, BatchJobMetadata + OidcProvider, BatchJobs, BatchJobMetadata, SecondaryServices, ServiceMetadata from openeo_driver.datacube import DriverDataCube from openeo_driver.errors import CollectionNotFoundException, OpenEOApiException, ProcessGraphMissingException, \ JobNotFoundException, JobNotFinishedException, ProcessGraphInvalidException, PermissionsInsufficientException, \ - FeatureUnsupportedException + FeatureUnsupportedException, ServiceNotFoundException from openeo_driver.processes import ProcessRegistry from openeo_driver.users import User from openeo_driver.utils import EvalEnv @@ -620,6 +620,200 @@ def get_log_entries(self, job_id: str, user_id: str, offset: Optional[str] = Non return con.job(backend_job_id).logs(offset=offset) +class ServiceIdMapping: + """Mapping between aggregator service ids and backend job ids""" + + @staticmethod + def get_aggregator_service_id(backend_service_id: str, backend_id: str) -> str: + """Construct aggregator service id from given backend job id and backend id""" + return f"{backend_id}-{backend_service_id}" + + @classmethod + def parse_aggregator_service_id(cls, backends: MultiBackendConnection, aggregator_service_id: str) -> Tuple[str, str]: + """Given aggregator service id: extract backend service id and backend id""" + for prefix in [f"{con.id}-" for con in backends]: + if aggregator_service_id.startswith(prefix): + backend_id, backend_job_id = aggregator_service_id.split("-", maxsplit=1) + return backend_job_id, backend_id + raise ServiceNotFoundException(service_id=aggregator_service_id) + + +class AggregatorSecondaryServices(SecondaryServices): + """ + Aggregator implementation of the Secondary Services "microservice" + https://openeo.org/documentation/1.0/developers/api/reference.html#tag/Secondary-Services + """ + + def __init__( + self, + backends: MultiBackendConnection, + processing: AggregatorProcessing + ): + super(AggregatorSecondaryServices, self).__init__() + self._backends = backends + self._processing = processing + + def _get_connection_and_backend_service_id( + self, + aggregator_service_id: str + ) -> Tuple[BackendConnection, str]: + """Get connection to the backend and the corresponding service ID in that backend. + + raises: ServiceNotFoundException when service_id does not exist in any of the backends. + """ + backend_service_id, backend_id = ServiceIdMapping.parse_aggregator_service_id( + backends=self._backends, + aggregator_service_id=aggregator_service_id + ) + + con = self._backends.get_connection(backend_id) + return con, backend_service_id + + def service_types(self) -> dict: + """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types""" + + service_types = {} + + def merge(formats: dict, to_add: dict): + for name, data in to_add.items(): + if name.lower() not in {k.lower() for k in formats.keys()}: + formats[name] = data + + # Collect all service types from the backends. + for con in self._backends: + try: + types_to_add = con.get("/service_types").json() + except Exception as e: + # TODO: fail instead of warn? + _log.warning(f"Failed to get service_types from {con.id}: {e!r}", exc_info=True) + continue + # TODO #1 smarter merging: parameter differences? + merge(service_types, types_to_add) + + return service_types + + def list_services(self, user_id: str) -> List[ServiceMetadata]: + """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-services""" + + # TODO: use ServiceIdMapping to prepend all service IDs with their backend-id + all_services = [] + def merge(services, to_add): + # For now ignore the links + services_to_add = to_add.get("services") + if services_to_add: + services_metadata = [ServiceMetadata.from_dict(s) for s in services_to_add] + services.extend(services_metadata) + + # Collect all services from the backends. + for con in self._backends: + with con.authenticated_from_request(request=flask.request, user=User(user_id)): + services_json = None + try: + services_json = con.get("/services").json() + except Exception as e: + _log.warning(f"Failed to get services from {con.id}: {e!r}", exc_info=True) + continue + + if services_json: + merge(all_services, services_json) + + return all_services + + def service_info(self, user_id: str, service_id: str) -> ServiceMetadata: + """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/describe-service""" + + con, backend_service_id = self._get_connection_and_backend_service_id(service_id) + with con.authenticated_from_request(request=flask.request, user=User(user_id)): + try: + service_json = con.get(f"/services/{backend_service_id}").json() + except (OpenEoApiError) as e: + if e.http_status_code == 404: + # Expected error + _log.debug(f"No service with ID={service_id!r} in backend with ID={con.id!r}: {e!r}", exc_info=True) + raise ServiceNotFoundException(service_id=service_id) from e + raise + except Exception as e: + _log.debug(f"Failed to get service with ID={backend_service_id} from backend with ID={con.id}: {e!r}", exc_info=True) + raise + else: + # Adapt the service ID so it points to the aggregator, with the backend ID included. + service_json["id"] = ServiceIdMapping.get_aggregator_service_id(service_json["id"], con.id) + return ServiceMetadata.from_dict(service_json) + + def create_service(self, user_id: str, process_graph: dict, service_type: str, api_version: str, + configuration: dict) -> str: + """ + https://openeo.org/documentation/1.0/developers/api/reference.html#operation/create-service + """ + # TODO: configuration is not used. What to do with it? + + backend_id = self._processing.get_backend_for_process_graph( + process_graph=process_graph, api_version=api_version + ) + process_graph = self._processing.preprocess_process_graph(process_graph, backend_id=backend_id) + + con = self._backends.get_connection(backend_id) + with con.authenticated_from_request(request=flask.request, user=User(user_id)): + try: + # create_service can raise ServiceUnsupportedException and OpenEOApiException. + service = con.create_service(graph=process_graph, type=service_type) + + # TODO: This exception handling was copy-pasted. What do we actually need here? + except OpenEoApiError as e: + for exc_class in [ProcessGraphMissingException, ProcessGraphInvalidException]: + if e.code == exc_class.code: + raise exc_class + raise OpenEOApiException(f"Failed to create secondary service on backend {backend_id!r}: {e!r}") + except (OpenEoRestError, OpenEoClientException) as e: + raise OpenEOApiException(f"Failed to create secondary service on backend {backend_id!r}: {e!r}") + + return ServiceIdMapping.get_aggregator_service_id(service.service_id, backend_id) + + def remove_service(self, user_id: str, service_id: str) -> None: + """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/delete-service""" + + # Will raise ServiceNotFoundException if service_id does not exist in any of the backends. + con, backend_service_id = self._get_connection_and_backend_service_id(service_id) + + with con.authenticated_from_request(request=flask.request, user=User(user_id)): + try: + con.delete(f"/services/{backend_service_id}", expected_status=204) + except (OpenEoApiError) as e: + if e.http_status_code == 404: + # Expected error + _log.debug(f"No service with ID={service_id!r} in backend with ID={con.id!r}: {e!r}", exc_info=True) + raise ServiceNotFoundException(service_id=service_id) from e + _log.warning(f"Failed to delete service {backend_service_id!r} from {con.id!r}: {e!r}", exc_info=True) + raise + except Exception as e: + _log.warning(f"Failed to delete service {backend_service_id!r} from {con.id!r}: {e!r}", exc_info=True) + raise OpenEOApiException( + f"Failed to delete service {backend_service_id!r} on backend {con.id!r}: {e!r}" + ) from e + + def update_service(self, user_id: str, service_id: str, process_graph: dict) -> None: + """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/update-service""" + + # Will raise ServiceNotFoundException if service_id does not exist in any of the backends. + con, backend_service_id = self._get_connection_and_backend_service_id(service_id) + + with con.authenticated_from_request(request=flask.request, user=User(user_id)): + try: + json = {"process": {"process_graph": process_graph}} + con.patch(f"/services/{backend_service_id}", json=json, expected_status=204) + except (OpenEoApiError) as e: + if e.http_status_code == 404: + # Expected error + _log.debug(f"No service with ID={backend_service_id!r} in backend with ID={con.id!r}: {e!r}", exc_info=True) + raise ServiceNotFoundException(service_id=service_id) from e + raise + except Exception as e: + _log.warning(f"Failed to update service {backend_service_id!r} from {con.id!r}: {e!r}", exc_info=True) + raise OpenEOApiException( + f"Failed to update service {backend_service_id!r} from {con.id!r}: {e!r}" + ) from e + + class AggregatorBackendImplementation(OpenEoBackendImplementation): # No basic auth: OIDC auth is required (to get EGI Check-in eduperson_entitlement data) enable_basic_auth = False @@ -645,10 +839,13 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig): processing=processing, partitioned_job_tracker=partitioned_job_tracker ) + + secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing) + super().__init__( catalog=catalog, processing=processing, - secondary_services=None, + secondary_services=secondary_services, batch_jobs=batch_jobs, user_defined_processes=None, ) diff --git a/tests/test_backend.py b/tests/test_backend.py index aa95b930..fecd44fc 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,15 +1,29 @@ +import datetime as dt + import pytest from openeo_aggregator.backend import AggregatorCollectionCatalog, AggregatorProcessing, \ - AggregatorBackendImplementation, _InternalCollectionMetadata, JobIdMapping + AggregatorBackendImplementation, _InternalCollectionMetadata, JobIdMapping, \ + AggregatorSecondaryServices from openeo_aggregator.caching import DictMemoizer from openeo_aggregator.testing import clock_mock -from openeo_driver.errors import OpenEOApiException, CollectionNotFoundException, JobNotFoundException +from openeo_driver.backend import ServiceMetadata +from openeo_driver.errors import OpenEOApiException, CollectionNotFoundException, JobNotFoundException, \ + ServiceNotFoundException from openeo_driver.testing import DictSubSet from openeo_driver.users.oidc import OidcProvider +from openeo_driver.users.auth import HttpAuthHandler +from openeo_driver.errors import ProcessGraphMissingException, ProcessGraphInvalidException, ServiceUnsupportedException +from openeo.rest import OpenEoApiError, OpenEoRestError from .conftest import DEFAULT_MEMOIZER_CONFIG +TEST_USER = "Mr.Test" +TEST_USER_BEARER_TOKEN = "basic//" + HttpAuthHandler.build_basic_access_token(user_id=TEST_USER) +TEST_USER_AUTH_HEADER = { + "Authorization": "Bearer " + TEST_USER_BEARER_TOKEN +} + class TestAggregatorBackendImplementation: def test_oidc_providers(self, multi_backend_connection, config, backend1, backend2, requests_mock): @@ -111,6 +125,524 @@ def test_file_formats_merging(self, multi_backend_connection, config, backend1, } +class TestAggregatorSecondaryServices: + + def test_service_types_simple( + self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock + ): + """Given 2 backends and only 1 backend has a single service type, then the aggregator + returns that 1 service type's metadata. + """ + single_service_type = { + "WMTS": { + "configuration": { + "colormap": { + "default": "YlGn", + "description": + "The colormap to apply to single band layers", + "type": "string" + }, + "version": { + "default": "1.0.0", + "description": "The WMTS version to use.", + "enum": ["1.0.0"], + "type": "string" + } + }, + "links": [], + "process_parameters": [], + "title": "Web Map Tile Service" + } + } + requests_mock.get(backend1 + "/service_types", json=single_service_type) + requests_mock.get(backend2 + "/service_types", json={}) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + service_types = implementation.service_types() + assert service_types == single_service_type + + def test_service_types_merging(self, multi_backend_connection, config, catalog, + backend1, backend2, requests_mock + ): + """Given 2 backends with each 1 service type, then the aggregator lists both service types.""" + service_type_1 = { + "WMTS": { + "configuration": { + "colormap": { + "default": "YlGn", + "description": + "The colormap to apply to single band layers", + "type": "string" + }, + "version": { + "default": "1.0.0", + "description": "The WMTS version to use.", + "enum": ["1.0.0"], + "type": "string" + } + }, + "links": [], + "process_parameters": [], + "title": "Web Map Tile Service" + } + } + service_type_2 = { + "WMS": { + "title": "OGC Web Map Service", + "configuration": {}, + "process_parameters": [], + "links": [] + } + } + requests_mock.get(backend1 + "/service_types", json=service_type_1) + requests_mock.get(backend2 + "/service_types", json=service_type_2) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + actual_service_types = implementation.service_types() + + expected_service_types = dict(service_type_1) + expected_service_types.update(service_type_2) + assert actual_service_types == expected_service_types + + @pytest.fixture + def service_metadata_wmts_foo(self): + return ServiceMetadata( + id="wmts-foo", + process={"process_graph": {"foo": {"process_id": "foo", "arguments": {}}}}, + url='https://oeo.net/wmts/foo', + type="WMTS", + enabled=True, + configuration={"version": "0.5.8"}, + attributes={}, + title="Test WMTS service", + created=dt.datetime(2020, 4, 9, 15, 5, 8) + ) + + @pytest.fixture + def service_metadata_wms_bar(self): + return ServiceMetadata( + id="wms-bar", + process={"process_graph": {"bar": {"process_id": "bar", "arguments": {}}}}, + url='https://oeo.net/wms/bar', + type="WMS", + enabled=True, + configuration={"version": "0.5.8"}, + attributes={}, + title="Test WMS service", + created=dt.datetime(2022, 2, 1, 13, 30, 3) + ) + + def test_list_services_simple( + self, flask_app, multi_backend_connection, config, catalog, + backend1, backend2, requests_mock, service_metadata_wmts_foo + ): + """Given 2 backends but only 1 backend has a single service, then the aggregator + returns that 1 service's metadata. + """ + services1 = {"services": [service_metadata_wmts_foo.prepare_for_json()], "links": []} + services2 = {} + requests_mock.get(backend1 + "/services", json=services1) + requests_mock.get(backend2 + "/services", json=services2) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + actual_services = implementation.list_services(user_id=TEST_USER) + + # Construct expected result. We have get just data from the service in services1 + # (there is only one) for conversion to a ServiceMetadata. + the_service = dict(services1["services"][0]) + + # TODO: prepend the backend's service_id with the backend_id + # the_service["id"] = "-wmts-foo" + expected_services = [ServiceMetadata.from_dict(the_service)] + assert actual_services == expected_services + + def test_list_services_merged( + self, flask_app, multi_backend_connection, config, catalog, + backend1, backend2, requests_mock, service_metadata_wmts_foo, service_metadata_wms_bar + ): + """Given 2 backends with each 1 service, then the aggregator lists both services.""" + + services1 = {"services": [service_metadata_wmts_foo.prepare_for_json()], "links": []} + services2 = {"services": [service_metadata_wms_bar.prepare_for_json()], "links": []} + requests_mock.get(backend1 + "/services", json=services1) + requests_mock.get(backend2 + "/services", json=services2) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + actual_services = implementation.list_services(user_id=TEST_USER) + + expected_services = [service_metadata_wmts_foo, service_metadata_wms_bar] + assert sorted(actual_services) == sorted(expected_services) + + def test_list_services_merged_multiple( + self, flask_app, multi_backend_connection, config, catalog, + backend1, backend2, requests_mock, service_metadata_wmts_foo, service_metadata_wms_bar + ): + """Given multiple services across 2 backends, the aggregator lists all service types from all backends.""" + services1 = { + "services": [{ + "id": "wms-nvdi", + "title": "NDVI based on Sentinel 2", + "description": "Deriving minimum NDVI measurements over pixel time series of Sentinel 2", + "url": "https://example.openeo.org/wms/wms-nvdi", + "type": "wms", + "enabled": True, + "process": { + "id": "ndvi", + "summary": "string", + "description": "string", + "links": [{ + "rel": "related", + "href": "https://example.openeo.org", + "type": "text/html", + "title": "openEO" + }], + "process_graph": {"foo": {"process_id": "foo", "arguments": {}}}, + }, + "configuration": { + "version": "1.3.0" + }, + "attributes": { + "layers": ["ndvi", "evi"] + }, + "created": "2017-01-01T09:32:12Z", + }], + "links": [{ + "rel": "related", + "href": "https://example.openeo.org", + "type": "text/html", + "title": "openEO" + }] + } + services2 = {"services": [ + service_metadata_wmts_foo.prepare_for_json(), + service_metadata_wms_bar.prepare_for_json() + ], + "links": [] + } + requests_mock.get(backend1 + "/services", json=services1) + requests_mock.get(backend2 + "/services", json=services2) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + actual_services = implementation.list_services(user_id=TEST_USER) + + # Construct expected result. We have get just data from the service in + # services1 (there is only one) for conversion to a ServiceMetadata. + # TODO: do we need to take care of the links part in the JSON as well? + service1 = services1["services"][0] + service1_md = ServiceMetadata.from_dict(service1) + expected_services = [ + service1_md, service_metadata_wmts_foo, service_metadata_wms_bar + ] + + assert sorted(actual_services) == sorted(expected_services) + + def test_service_info_succeeds( + self, flask_app, multi_backend_connection, config, catalog, + backend1, backend2, requests_mock, service_metadata_wmts_foo, service_metadata_wms_bar + ): + """When it gets a correct service ID, it returns the expected ServiceMetadata.""" + json_wmts_foo = service_metadata_wmts_foo.prepare_for_json() + json_wms_bar = service_metadata_wms_bar.prepare_for_json() + requests_mock.get(backend1 + "/services/wmts-foo", json=json_wmts_foo) + requests_mock.get(backend2 + "/services/wms-bar", json=json_wms_bar) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + # Check the expected metadata on *both* of the services. + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + actual_service1 = implementation.service_info(user_id=TEST_USER, service_id="b1-wmts-foo") + + json = dict(json_wmts_foo) + json["id"] = "b1-" + json["id"] + expected_service1 = ServiceMetadata.from_dict(json) + + assert actual_service1 == expected_service1 + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + actual_service2 = implementation.service_info(user_id=TEST_USER, service_id="b2-wms-bar") + + json = dict(json_wms_bar) + json["id"] = "b2-" + json["id"] + expected_service2 = ServiceMetadata.from_dict(json) + + assert actual_service2 == expected_service2 + + def test_service_info_wrong_backend_id( + self, flask_app, multi_backend_connection, config, catalog, backend1, requests_mock, + service_metadata_wmts_foo + ): + """When it gets a non-existent service ID, it raises a ServiceNotFoundException.""" + + requests_mock.get(backend1 + "/services/wmts-foo", json=service_metadata_wmts_foo.prepare_for_json()) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(ServiceNotFoundException): + implementation.service_info(user_id=TEST_USER, service_id="backenddoesnotexist-wtms-foo") + + def test_service_info_wrong_service_id( + self, flask_app, multi_backend_connection, config, catalog, backend1, requests_mock, + ): + """When it gets a non-existent service ID, it raises a ServiceNotFoundException.""" + + requests_mock.get(backend1 + "/services/service-does-not-exist", status_code=404) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(ServiceNotFoundException): + implementation.service_info(user_id=TEST_USER, service_id="b1-service-does-not-exist") + + assert requests_mock.called + + def test_create_service_succeeds( + self, flask_app, multi_backend_connection, config, catalog, backend1, requests_mock + ): + """When it gets a correct params for a new service, it successfully creates it.""" + + # Set up responses for creating the service in backend 1 + backend_service_id = "wmts-foo" + # The aggregator should prepend the service_id with the backend_id + expected_service_id = "b1-wmts-foo" + + location_backend_1 = backend1 + "/services/" + backend_service_id + process_graph = {"foo": {"process_id": "foo", "arguments": {}}} + requests_mock.post( + backend1 + "/services", + headers={ + "OpenEO-Identifier": backend_service_id, + "Location": location_backend_1 + }, + status_code=201 + ) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + actual_openeo_id = implementation.create_service( + user_id=TEST_USER, + process_graph=process_graph, + service_type="WMTS", + api_version="1.0.0", + configuration={} + ) + assert actual_openeo_id == expected_service_id + + @pytest.mark.parametrize("exception_class", [OpenEoApiError, OpenEoRestError]) + def test_create_service_backend_raises_openeoapiexception( + self, flask_app, multi_backend_connection, config, catalog, + backend1, requests_mock, exception_class + ): + """When the backend raises a general exception the aggregator raises an OpenEOApiException.""" + + # Set up responses for creating the service in backend 1: + # This time the backend raises an error, one that will be reported as a OpenEOApiException. + process_graph = {"foo": {"process_id": "foo", "arguments": {}}} + requests_mock.post( + backend1 + "/services", + exc=exception_class("Some server error"), + ) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(OpenEOApiException): + implementation.create_service( + user_id=TEST_USER, + process_graph=process_graph, + service_type="WMTS", + api_version="1.0.0", + configuration={} + ) + + @pytest.mark.parametrize("exception_class", + [ProcessGraphMissingException, ProcessGraphInvalidException, ServiceUnsupportedException] + ) + def test_create_service_backend_reraises( + self, flask_app, multi_backend_connection, config, catalog, + backend1, requests_mock, exception_class + ): + """When the backend raises exception types that indicate client error / bad input data, + the aggregator raises and OpenEOApiException. + """ + + # Set up responses for creating the service in backend 1 + # This time the backend raises an error, one that will simply be re-raised/passed on as it is. + process_graph = {"foo": {"process_id": "foo", "arguments": {}}} + requests_mock.post( + backend1 + "/services", + exc=exception_class("Some server error") + ) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + # These exception types should be re-raised, not become an OpenEOApiException. + with pytest.raises(exception_class): + implementation.create_service( + user_id=TEST_USER, + process_graph=process_graph, + service_type="WMTS", + api_version="1.0.0", + configuration={} + ) + + def test_remove_service_succeeds( + self, flask_app, multi_backend_connection, config, catalog, backend1, requests_mock + ): + """When remove_service is called with an existing service ID, it removes service and returns HTTP 204.""" + + mock_delete = requests_mock.delete(backend1 + "/services/wmts-foo", status_code=204) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + implementation.remove_service(user_id=TEST_USER, service_id="b1-wmts-foo") + + # Make sure the aggregator asked the backend to remove the service. + assert mock_delete.called + + def test_remove_service_but_backend_id_not_found( + self, flask_app, multi_backend_connection, config, catalog, + ): + """When the backend ID/prefix does not exist then the aggregator raises an ServiceNotFoundException.""" + + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + # Case 1: the backend doesn't even exist + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(ServiceNotFoundException): + implementation.remove_service(user_id=TEST_USER, service_id="doesnotexist-wmts-foo") + + def test_remove_service_but_service_id_not_found( + self, flask_app, multi_backend_connection, config, catalog, + backend1, requests_mock + ): + """When the service ID does not exist for the specified backend then the aggregator raises an ServiceNotFoundException.""" + + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + # The backend exists but the service ID does not. + mock_delete1 = requests_mock.delete( + backend1 + "/services/doesnotexist", + status_code=404 + ) + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(ServiceNotFoundException): + implementation.remove_service(user_id=TEST_USER, service_id="b1-doesnotexist") + + # This should have tried to delete it on the backend so the mock must be called. + assert mock_delete1.called + + def test_remove_service_backend_response_is_an_error_status( + self, flask_app, multi_backend_connection, config, catalog, + backend1, requests_mock + ): + """When the backend response is an HTTP error status then the aggregator raises an OpenEoApiError.""" + + requests_mock.delete(backend1 + "/services/wmts-foo", status_code=500) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(OpenEoApiError) as e: + implementation.remove_service(user_id=TEST_USER, service_id="b1-wmts-foo") + + # If the backend reports HTTP 400/500, we would expect the same status code from the aggregator. + # TODO: Statement above is an assumption. Is that really what we expect? + assert e.value.http_status_code == 500 + + def test_update_service_succeeds( + self, flask_app, multi_backend_connection, config, catalog, + backend1, requests_mock + ): + """When it receives an existing service ID and a correct payload, it updates the expected service.""" + + mock_patch = requests_mock.patch( + backend1 + "/services/wmts-foo", + status_code=204, + ) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + implementation.update_service(user_id=TEST_USER, service_id="b1-wmts-foo", process_graph=process_graph_after) + + # Make sure the aggregator asked the backend to remove the service. + assert mock_patch.called + + # TODO: I am not too sure this json payload is correct. Check with codebases of other backend drivers. + expected_process = {"process": {"process_graph": process_graph_after}} + assert mock_patch.last_request.json() == expected_process + + def test_update_service_but_backend_id_does_not_exist( + self, flask_app, multi_backend_connection, config, catalog, + ): + """When the backend ID/prefix does not exist then the aggregator raises an ServiceNotFoundException.""" + + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(ServiceNotFoundException): + implementation.update_service(user_id=TEST_USER, service_id="doesnotexist-wmts-foo", process_graph=process_graph_after) + + def test_update_service_but_service_id_not_found( + self, flask_app, multi_backend_connection, config, catalog, + backend1, requests_mock + ): + """When the service ID does not exist for the specified backend then the aggregator raises an ServiceNotFoundException.""" + + # These requests should not be executed, so check they are not called. + mock_patch1 = requests_mock.patch( + backend1 + "/services/doesnotexist", + status_code=404, + ) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(ServiceNotFoundException): + implementation.update_service(user_id=TEST_USER, service_id="b1-doesnotexist", process_graph=process_graph_after) + + assert mock_patch1.called + + def test_update_service_backend_response_is_an_error_status( + self, flask_app, multi_backend_connection, config, catalog, + backend1, requests_mock + ): + """When the backend response is an HTTP error status then the aggregator raises an OpenEoApiError.""" + + mock_patch = requests_mock.patch( + backend1 + "/services/wmts-foo", + status_code=500, + ) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + new_process_graph = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(OpenEoApiError) as e: + implementation.update_service(user_id=TEST_USER, service_id="b1-wmts-foo", process_graph=new_process_graph) + + assert e.value.http_status_code == 500 + assert mock_patch.called + + class TestInternalCollectionMetadata: def test_get_set_backends_for_collection(self): @@ -1092,6 +1624,29 @@ def test_parse_aggregator_job_id_fail(self, multi_backend_connection): ) +from openeo_aggregator.backend import ServiceIdMapping +class TestServiceIdMapping: + + def test_get_aggregator_job_id(self): + assert ServiceIdMapping.get_aggregator_service_id( + backend_service_id="service-x17-abc", backend_id="vito" + ) == "vito-service-x17-abc" + + def test_parse_aggregator_job_id(self, multi_backend_connection): + assert ServiceIdMapping.parse_aggregator_service_id( + backends=multi_backend_connection, aggregator_service_id="b1-serv021b" + ) == ("serv021b", "b1") + assert ServiceIdMapping.parse_aggregator_service_id( + backends=multi_backend_connection, aggregator_service_id="b2-someservice-321-ab14jh" + ) == ("someservice-321-ab14jh", "b2") + + def test_parse_aggregator_job_id_fail(self, multi_backend_connection): + with pytest.raises(ServiceNotFoundException): + ServiceIdMapping.parse_aggregator_service_id( + backends=multi_backend_connection, aggregator_service_id="b3-b6tch-j0b-o123423" + ) + + class TestAggregatorProcessing: def test_get_process_registry( self, diff --git a/tests/test_views.py b/tests/test_views.py index 59dea044..bf735bf2 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1,5 +1,4 @@ import logging -import logging import re from typing import Tuple, List @@ -7,11 +6,14 @@ import requests from openeo.rest.connection import url_join +from openeo.rest import OpenEoApiError, OpenEoRestError +from openeo_aggregator.backend import AggregatorCollectionCatalog from openeo_aggregator.config import AggregatorConfig from openeo_aggregator.metadata import STAC_PROPERTY_PROVIDER_BACKEND from openeo_aggregator.testing import clock_mock from openeo_driver.errors import JobNotFoundException, JobNotFinishedException, \ - ProcessGraphInvalidException + ProcessGraphInvalidException, ProcessGraphMissingException +from openeo_driver.backend import ServiceMetadata from openeo_driver.testing import ApiTester, TEST_USER_AUTH_HEADER, TEST_USER, TEST_USER_BEARER_TOKEN, DictSubSet, \ RegexMatcher from .conftest import assert_dict_subset, get_api100, get_flask_app @@ -1317,6 +1319,366 @@ def post_jobs(request: requests.Request, context): ] +class TestSecondaryServices: + + @pytest.fixture + def service_metadata_wmts_foo(self): + return ServiceMetadata( + id="wmts-foo", + process={"process_graph": {"foo": {"process_id": "foo", "arguments": {}}}}, + url='https://oeo.net/wmts/foo', + type="WMTS", + enabled=True, + configuration={"version": "0.5.8"}, + attributes={}, + title="Test WMTS service" + # not setting "created": This is used to test creating a service. + ) + + + def test_service_types_simple(self, api100, backend1, backend2, requests_mock): + """Given 2 backends but only 1 backend has a single service, then the aggregator + returns that 1 service's metadata. + """ + single_service_type = { + "WMTS": { + "configuration": { + "colormap": { + "default": "YlGn", + "description": + "The colormap to apply to single band layers", + "type": "string" + }, + "version": { + "default": "1.0.0", + "description": "The WMTS version to use.", + "enum": ["1.0.0"], + "type": "string" + } + }, + "links": [], + "process_parameters": [], + "title": "Web Map Tile Service" + } + } + requests_mock.get(backend1 + "/service_types", json=single_service_type) + requests_mock.get(backend2 + "/service_types", json=single_service_type) + + resp = api100.get('/service_types').assert_status_code(200) + assert resp.json == single_service_type + + def test_service_types_merging(self, api100, backend1, backend2, requests_mock): + """Given 2 backends with each 1 service, then the aggregator lists both services.""" + service_type_1 = { + "WMTS": { + "configuration": { + "colormap": { + "default": "YlGn", + "description": + "The colormap to apply to single band layers", + "type": "string" + }, + "version": { + "default": "1.0.0", + "description": "The WMTS version to use.", + "enum": ["1.0.0"], + "type": "string" + } + }, + "links": [], + "process_parameters": [], + "title": "Web Map Tile Service" + } + } + service_type_2 = { + "WMS": { + "title": "OGC Web Map Service", + "configuration": {}, + "process_parameters": [], + "links": [] + } + } + requests_mock.get(backend1 + "/service_types", json=service_type_1) + requests_mock.get(backend2 + "/service_types", json=service_type_2) + + resp = api100.get("/service_types").assert_status_code(200) + actual_service_types = resp.json + + expected_service_types = dict(service_type_1) + expected_service_types.update(service_type_2) + assert actual_service_types == expected_service_types + + def test_service_info(self, api100, backend1, requests_mock): + """When it gets a correct service ID, it returns the expected service's metadata as JSON.""" + + json_wmts_foo = { + "id": "wmts-foo", + "process": {"process_graph": {"foo": {"process_id": "foo", "arguments": {}}}}, + "url": "https://oeo.net/wmts/foo", + "type": "WMTS", + "enabled": "True", + "configuration": {"version": "0.5.8"}, + "attributes": {}, + "title": "Test WMTS service" + } + requests_mock.get(backend1 + "/services/wmts-foo", json=json_wmts_foo) + api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) + + resp = api100.get("/services/b1-wmts-foo").assert_status_code(200) + + expected_json_wmts_foo = dict(json_wmts_foo) + expected_json_wmts_foo["id"] = "b1-" + json_wmts_foo["id"] + assert resp.json == expected_json_wmts_foo + + def test_service_info_wrong_id(self, api100): + """When it gets a non-existent service ID, the aggregator responds with HTTP 404, not found.""" + api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) + + # The backend ID is wrong. + api100.get("/services/doesnotexist-someservice").assert_status_code(404) + + # The backend ID exists but the service ID is wrong. + api100.get("/services/b1-doesnotexist").assert_status_code(404) + + def test_create_wmts(self, api100, requests_mock, backend1): + """When the payload is correct the service should be successfully created, + the service ID should be prepended with the backend ID, + and location should point to the aggregator, not to the backend directly. + """ + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + + backend_service_id = 'c63d6c27-c4c2-4160-b7bd-9e32f582daec' + expected_openeo_id = "b1-" + backend_service_id + + # The aggregator MUST NOT point to the backend instance but to its own endpoint. + # This is handled by the openeo python driver in openeo_driver.views.services_post. + expected_location = "/openeo/1.0.0/services/" + expected_openeo_id + # However, backend1 must report its OWN location. + location_backend_1 = backend1 + "/services" + backend_service_id + + process_graph = {"foo": {"process_id": "foo", "arguments": {}}} + post_data = { + "type": 'WMTS', + "process": { + "process_graph": process_graph, + "id": "filter_temporal_wmts" + }, + "title": "My Service", + "description": "Service description" + } + requests_mock.post( + backend1 + "/services", + headers={ + "OpenEO-Identifier": backend_service_id, + "Location": location_backend_1 + }, + status_code=201 + ) + + resp = api100.post('/services', json=post_data).assert_status_code(201) + + assert resp.headers['OpenEO-Identifier'] == expected_openeo_id + assert resp.headers['Location'] == expected_location + + # ProcessGraphMissingException and ProcessGraphInvalidException are well known reasons for a bad client request. + @pytest.mark.parametrize("exception_class", [ProcessGraphMissingException, ProcessGraphInvalidException]) + def test_create_wmts_reports_400_client_error(self, api100, requests_mock, backend1, exception_class): + """When the backend raises exceptions that are typically a bad request / HTTP 400, then + we expect the aggregator to return a HTTP 400 status code.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + process_graph = {"foo": {"process_id": "foo", "arguments": {}}} + post_data = { + "type": 'WMTS', + "process": { + "process_graph": process_graph, + "id": "filter_temporal_wmts" + }, + "title": "My Service", + "description": "Service description" + } + # TODO: In theory we should make the backend report a HTTP 400 status and then the aggregator + # should also report HTTP 400. But in fact that comes back as HTTP 500. + requests_mock.post( + backend1 + "/services", + exc=exception_class("Testing exception handling") + ) + + resp = api100.post('/services', json=post_data) + assert resp.status_code == 400 + + # OpenEoApiError, OpenEoRestError: more general errors we can expect to lead to a HTTP 500 server error. + @pytest.mark.parametrize("exception_class", [OpenEoApiError, OpenEoRestError]) + def test_create_wmts_reports_500_server_error(self, api100, requests_mock, backend1, exception_class): + """When the backend raises exceptions that are typically a server error / HTTP 500, then + we expect the aggregator to return a HTTP 500 status code.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + + process_graph = {"foo": {"process_id": "foo", "arguments": {}}} + post_data = { + "type": 'WMTS', + "process": { + "process_graph": process_graph, + "id": "filter_temporal_wmts" + }, + "title": "My Service", + "description": "Service description" + } + requests_mock.post( + backend1 + "/services", + exc=exception_class("Testing exception handling") + ) + + resp = api100.post('/services', json=post_data) + assert resp.status_code == 500 + + def test_remove_service_succeeds(self, api100, requests_mock, backend1): + """When remove_service is called with an existing service ID, it removes service and returns HTTP 204.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + mock_delete = requests_mock.delete(backend1 + "/services/wmts-foo", status_code=204) + + resp = api100.delete("/services/b1-wmts-foo") + + assert resp.status_code == 204 + # Make sure the aggregator asked the backend to remove the service. + assert mock_delete.called + + + def test_remove_service_but_backend_id_not_found(self, api100): + """When the service ID does not exist then the aggregator responds with HTTP 404, not found.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + + resp = api100.delete("/services/wmts-foo") + + assert resp.status_code == 404 + + def test_remove_service_but_service_id_not_found(self, api100, backend1, requests_mock): + """When the service ID does not exist then the aggregator responds with HTTP 404, not found.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + mock_delete = requests_mock.delete( + backend1 + "/services/wmts-foo", + status_code=404, + ) + + resp = api100.delete("/services/b1-wmts-foo") + + assert resp.status_code == 404 + assert mock_delete.called + + def test_remove_service_backend_response_is_an_error_status( + self, api100, requests_mock, backend1, service_metadata_wmts_foo + ): + """When the backend response is an error, HTTP 500, then the aggregator also responds with HTTP 500 status.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + requests_mock.get( + backend1 + "/services/wmts-foo", + json=service_metadata_wmts_foo.prepare_for_json(), + status_code=200 + ) + mock_delete = requests_mock.delete( + backend1 + "/services/wmts-foo", + status_code=500, + json={ + "id": "936DA01F-9ABD-4D9D-80C7-02AF85C822A8", + "code": "ErrorRemovingService", + "message": "Service 'wmts-foo' could not be removed.", + "url": "https://example.openeo.org/docs/errors/SampleError" + } + ) + + resp = api100.delete("/services/b1-wmts-foo") + + assert resp.status_code == 500 + # Verify the aggregator effectively asked the backend to remove the service, + # so we can reasonably assume that is where the error came from. + assert mock_delete.called + + def test_update_service_service_succeeds( + self, api100, backend1, requests_mock, service_metadata_wmts_foo + ): + """When it receives an existing service ID and a correct payload, it updates the expected service.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + + mock_patch = requests_mock.patch( + backend1 + "/services/wmts-foo", + json=service_metadata_wmts_foo.prepare_for_json(), + status_code=204 + ) + process_graph = {"bar": {"process_id": "bar", "arguments": {"new_arg": "somevalue"}}} + json_payload = {"process": {"process_graph": process_graph}} + + resp = api100.patch("/services/b1-wmts-foo", json=json_payload) + + assert resp.status_code == 204 + # Make sure the aggregator asked the backend to update the service. + assert mock_patch.called + assert mock_patch.last_request.json() == json_payload + + + def test_update_service_but_backend_id_not_found(self, api100): + """When the service ID does not exist because the backend prefix is wrong, then the aggregator responds with HTTP 404, not found.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + process_graph = {"bar": {"process_id": "bar", "arguments": {"new_arg": "somevalue"}}} + json_payload = {"process": {"process_graph": process_graph}} + + resp = api100.patch("/services/backenddoesnotexist-someservice", json=json_payload) + + assert resp.status_code == 404 + + def test_update_service_service_id_not_found( + self, api100, backend1, requests_mock, service_metadata_wmts_foo + ): + """When the service ID does not exist for the specified backend, then the aggregator responds with HTTP 404, not found.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + mock_patch = requests_mock.patch( + backend1 + "/services/servicedoesnotexist", + json=service_metadata_wmts_foo.prepare_for_json(), + status_code=404 + ) + process_graph = {"bar": {"process_id": "bar", "arguments": {"new_arg": "somevalue"}}} + json_payload = {"process": {"process_graph": process_graph}} + + resp = api100.patch("/services/b1-servicedoesnotexist", json=json_payload) + + assert resp.status_code == 404 + assert mock_patch.called + + # TODO: for now, not bothering with HTTP 400 in the backend. To be decided if this is necessary. + @pytest.mark.parametrize("backend_http_status", [500]) + def test_update_service_backend_response_is_an_error_status( + self, api100, backend1, requests_mock, backend_http_status + ): + """When the backend response is an error HTTP 400/500 then the aggregator raises an OpenEoApiError.""" + + api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) + mock_patch = requests_mock.patch( + backend1 + "/services/wmts-foo", + status_code=backend_http_status, + json={ + "id": "936DA01F-9ABD-4D9D-80C7-02AF85C822A8", + "code": "ErrorUpdatingService", + "message": "Service 'wmts-foo' could not be updated.", + "url": "https://example.openeo.org/docs/errors/SampleError" + } + ) + process_graph = {"bar": {"process_id": "bar", "arguments": {"new_arg": "somevalue"}}} + json_payload = {"process": {"process_graph": process_graph}} + + resp = api100.patch("/services/b1-wmts-foo", json=json_payload) + + assert resp.status_code == backend_http_status + assert mock_patch.called + assert mock_patch.last_request.json() == json_payload + + class TestResilience: @pytest.fixture