From a67cdbff052904574367cf19625271582ec94a86 Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Thu, 8 Dec 2022 15:36:31 +0100 Subject: [PATCH 1/7] Issue #84: Implement caching for AggregatorSecondaryServices.service_types --- src/openeo_aggregator/backend.py | 13 ++++- tests/test_backend.py | 81 +++++++++++++++++++++++++------- 2 files changed, 76 insertions(+), 18 deletions(-) diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index 8f0fe389..faa2d47f 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -648,10 +648,16 @@ class AggregatorSecondaryServices(SecondaryServices): def __init__( self, backends: MultiBackendConnection, - processing: AggregatorProcessing + processing: AggregatorProcessing, + config: AggregatorConfig ): super(AggregatorSecondaryServices, self).__init__() self._backends = backends + + self._memoizer = memoizer_from_config(config=config, namespace="SecondaryServices") + self._backends.on_connections_change.add(self._memoizer.invalidate) + + # TODO Issue #84 Decide which backend based on service type. Will need to remove self._processing for this. self._processing = processing def _get_connection_and_backend_service_id( @@ -671,6 +677,9 @@ def _get_connection_and_backend_service_id( return con, backend_service_id def service_types(self) -> dict: + return self._memoizer.get_or_call(key=("all_service_types",), callback=self._get_service_types) + + def _get_service_types(self) -> dict: """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types""" # TODO: add caching. Also see https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557 service_types = {} @@ -848,7 +857,7 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig): partitioned_job_tracker=partitioned_job_tracker ) - secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing) + secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config) super().__init__( catalog=catalog, diff --git a/tests/test_backend.py b/tests/test_backend.py index 50922cae..199a1b0f 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -164,11 +164,60 @@ def test_service_types_simple( requests_mock.get(backend1 + "/service_types", json=single_service_type) requests_mock.get(backend2 + "/service_types", json={}) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) service_types = implementation.service_types() assert service_types == single_service_type + def test_service_types_simple_cached( + self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock + ): + """The service_types call is cached: + When we get the service types several times, the second call that happens before the cache expires, + doesn't hit the backend. + But the third call that happens that happens after the cache has expired does hit the backend again. + """ + single_service_type = { + "WMTS": { + "configuration": { + "colormap": { + "default": "YlGn", + "description": + "The colormap to apply to single band layers", + "type": "string" + }, + "version": { + "default": "1.0.0", + "description": "The WMTS version to use.", + "enum": ["1.0.0"], + "type": "string" + } + }, + "links": [], + "process_parameters": [], + "title": "Web Map Tile Service" + } + } + mock_be1 = requests_mock.get(backend1 + "/service_types", json=single_service_type) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) + + service_types = implementation.service_types() + assert mock_be1.call_count == 1 + assert service_types == single_service_type + + # Second call happens before the cache item expires: it should not query the backend. + service_types = implementation.service_types() + assert mock_be1.call_count == 1 + assert service_types == single_service_type + + # Third call happens when the cached item has expired: it should query the backend. + with clock_mock(offset=100): + service_types = implementation.service_types() + assert mock_be1.call_count == 2 + assert service_types == single_service_type + + # TODO: Issue #84 Check if we should remove this merging behavior. def test_service_types_merging(self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock ): @@ -205,7 +254,7 @@ def test_service_types_merging(self, multi_backend_connection, config, catalog, requests_mock.get(backend1 + "/service_types", json=service_type_1) requests_mock.get(backend2 + "/service_types", json=service_type_2) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) actual_service_types = implementation.service_types() @@ -251,7 +300,7 @@ def test_service_info_succeeds( requests_mock.get(backend1 + "/services/wmts-foo", json=json_wmts_foo) requests_mock.get(backend2 + "/services/wms-bar", json=json_wms_bar) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) # Check the expected metadata on *both* of the services. with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): @@ -280,7 +329,7 @@ def test_service_info_wrong_backend_id( requests_mock.get(backend1 + "/services/wmts-foo", json=service_metadata_wmts_foo.prepare_for_json()) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): with pytest.raises(ServiceNotFoundException): @@ -293,7 +342,7 @@ def test_service_info_wrong_service_id( requests_mock.get(backend1 + "/services/service-does-not-exist", status_code=404) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): with pytest.raises(ServiceNotFoundException): @@ -322,7 +371,7 @@ def test_create_service_succeeds( status_code=201 ) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): actual_openeo_id = implementation.create_service( @@ -349,7 +398,7 @@ def test_create_service_backend_raises_openeoapiexception( exc=exception_class("Some server error"), ) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): with pytest.raises(OpenEOApiException): @@ -380,7 +429,7 @@ def test_create_service_backend_reraises( exc=exception_class("Some server error") ) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): # These exception types should be re-raised, not become an OpenEOApiException. @@ -400,7 +449,7 @@ def test_remove_service_succeeds( mock_delete = requests_mock.delete(backend1 + "/services/wmts-foo", status_code=204) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): implementation.remove_service(user_id=TEST_USER, service_id="b1-wmts-foo") @@ -414,7 +463,7 @@ def test_remove_service_but_backend_id_not_found( """When the backend ID/prefix does not exist then the aggregator raises an ServiceNotFoundException.""" processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) # Case 1: the backend doesn't even exist with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): @@ -428,7 +477,7 @@ def test_remove_service_but_service_id_not_found( """When the service ID does not exist for the specified backend then the aggregator raises an ServiceNotFoundException.""" processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) # The backend exists but the service ID does not. mock_delete1 = requests_mock.delete( @@ -450,7 +499,7 @@ def test_remove_service_backend_response_is_an_error_status( requests_mock.delete(backend1 + "/services/wmts-foo", status_code=500) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): with pytest.raises(OpenEoApiError) as e: @@ -471,7 +520,7 @@ def test_update_service_succeeds( status_code=204, ) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): @@ -490,7 +539,7 @@ def test_update_service_but_backend_id_does_not_exist( """When the backend ID/prefix does not exist then the aggregator raises an ServiceNotFoundException.""" processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): @@ -509,7 +558,7 @@ def test_update_service_but_service_id_not_found( status_code=404, ) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): @@ -529,7 +578,7 @@ def test_update_service_backend_response_is_an_error_status( status_code=500, ) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) - implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) new_process_graph = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}} with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): From d32dbaf971303b2135480c1a8fe2b9c804018b40 Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Fri, 9 Dec 2022 11:55:38 +0100 Subject: [PATCH 2/7] Issue #84: Implement selection of backend based on service type, for create_service. --- src/openeo_aggregator/backend.py | 67 +++++++++++++++---- tests/test_backend.py | 106 ++++++++++++++++++------------- tests/test_views.py | 27 +++++++- 3 files changed, 142 insertions(+), 58 deletions(-) diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index faa2d47f..eaad26e1 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -34,7 +34,7 @@ from openeo_driver.datacube import DriverDataCube from openeo_driver.errors import CollectionNotFoundException, OpenEOApiException, ProcessGraphMissingException, \ JobNotFoundException, JobNotFinishedException, ProcessGraphInvalidException, PermissionsInsufficientException, \ - FeatureUnsupportedException, ServiceNotFoundException + FeatureUnsupportedException, ServiceNotFoundException, ServiceUnsupportedException from openeo_driver.processes import ProcessRegistry from openeo_driver.users import User from openeo_driver.utils import EvalEnv @@ -677,18 +677,61 @@ def _get_connection_and_backend_service_id( return con, backend_service_id def service_types(self) -> dict: - return self._memoizer.get_or_call(key=("all_service_types",), callback=self._get_service_types) + """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types""" + # TODO: implement household data to cache useful data for selecting backend based on service and backend's capabilities. + + cached_info = self._memoizer.get_or_call(key=("_get_service_types",), callback=self._get_service_types) + # Convert the cached results back to the format that service_types should return. + return {name: data["service_type"] for name, data, in cached_info.items()} + + def _find_backend_id_for_service_type(self, service_type: str) -> Optional[str]: + """Returns the ID of the backend that provides the service_type or None if no backend supports that service.""" + cached_info = self._memoizer.get_or_call(key=("_get_service_types",), callback=self._get_service_types) + return cached_info.get(service_type, {}).get("backend_id") def _get_service_types(self) -> dict: - """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types""" - # TODO: add caching. Also see https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557 + """Returns a dict that maps the service name to a dict that contains 2 items: + 1) the backend_id that provides this secondary service + 2) the service_types for self.service_types. + + For example: + {'WMTS': + {'backend_id': 'b1', + 'service_type': // contains what backend b1 returned for service type WMTS. + {'configuration': + ... + } + } + 'WMS': + {'backend_id': 'b2', + 'service_type': + {'configuration': + ... + } + } + } + + Info for selecting the right backend to create a new service + ------------------------------------------------------------ + + We are assuming that there is only one upstream backend per service type. + That means we don't really need to merge duplicate service types, i.e. there is no need to resolve + some kind of conflicts between duplicate service types. + + So far we don't store info about the backends' capabilities, but in the future + we may need to store that to select the right backend when we create a service. + + See: issues #83 and #84: + https://github.com/Open-EO/openeo-aggregator/issues/83 + https://github.com/Open-EO/openeo-aggregator/issues/84 + """ service_types = {} # TODO: Instead of merge: prefix each type with backend-id? #83 - def merge(formats: dict, to_add: dict): - for name, data in to_add.items(): - if name.lower() not in {k.lower() for k in formats.keys()}: - formats[name] = data + def merge(services: dict, backend_id: str, new_service: dict): + for name, data in new_service.items(): + if name.lower() not in {k.lower() for k in services.keys()}: + services[name] = dict(backend_id=backend_id, service_type=data) # Collect all service types from the backends. for con in self._backends: @@ -700,7 +743,7 @@ def merge(formats: dict, to_add: dict): _log.warning(f"Failed to get service_types from {con.id}: {e!r}", exc_info=True) continue # TODO #1 smarter merging: parameter differences? - merge(service_types, types_to_add) + merge(service_types, con.id, types_to_add) return service_types @@ -764,9 +807,9 @@ def create_service(self, user_id: str, process_graph: dict, service_type: str, a if "sentinelhub" in self._backends._backend_urls: backend_id = "sentinelhub" else: - backend_id = self._processing.get_backend_for_process_graph( - process_graph=process_graph, api_version=api_version - ) + backend_id = self._find_backend_id_for_service_type(service_type) + if backend_id is None: + raise ServiceUnsupportedException(service_type) process_graph = self._processing.preprocess_process_graph(process_graph, backend_id=backend_id) con = self._backends.get_connection(backend_id) diff --git a/tests/test_backend.py b/tests/test_backend.py index 199a1b0f..536a1024 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -134,33 +134,35 @@ class TestAggregatorSecondaryServices: # TODO: most tests here (the ones that do flask app stuff and auth) # belong under test_views.py + WMTS_SERVICE_TYPE = { + "WMTS": { + "configuration": { + "colormap": { + "default": "YlGn", + "description": + "The colormap to apply to single band layers", + "type": "string" + }, + "version": { + "default": "1.0.0", + "description": "The WMTS version to use.", + "enum": ["1.0.0"], + "type": "string" + } + }, + "links": [], + "process_parameters": [], + "title": "Web Map Tile Service" + } + } + def test_service_types_simple( self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock ): """Given 2 backends and only 1 backend has a single service type, then the aggregator returns that 1 service type's metadata. """ - single_service_type = { - "WMTS": { - "configuration": { - "colormap": { - "default": "YlGn", - "description": - "The colormap to apply to single band layers", - "type": "string" - }, - "version": { - "default": "1.0.0", - "description": "The WMTS version to use.", - "enum": ["1.0.0"], - "type": "string" - } - }, - "links": [], - "process_parameters": [], - "title": "Web Map Tile Service" - } - } + single_service_type = self.WMTS_SERVICE_TYPE requests_mock.get(backend1 + "/service_types", json=single_service_type) requests_mock.get(backend2 + "/service_types", json={}) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) @@ -177,27 +179,7 @@ def test_service_types_simple_cached( doesn't hit the backend. But the third call that happens that happens after the cache has expired does hit the backend again. """ - single_service_type = { - "WMTS": { - "configuration": { - "colormap": { - "default": "YlGn", - "description": - "The colormap to apply to single band layers", - "type": "string" - }, - "version": { - "default": "1.0.0", - "description": "The WMTS version to use.", - "enum": ["1.0.0"], - "type": "string" - } - }, - "links": [], - "process_parameters": [], - "title": "Web Map Tile Service" - } - } + single_service_type = self.WMTS_SERVICE_TYPE mock_be1 = requests_mock.get(backend1 + "/service_types", json=single_service_type) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) @@ -217,8 +199,7 @@ def test_service_types_simple_cached( assert mock_be1.call_count == 2 assert service_types == single_service_type - # TODO: Issue #84 Check if we should remove this merging behavior. - def test_service_types_merging(self, multi_backend_connection, config, catalog, + def test_service_types_multiple_backends(self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock ): """Given 2 backends with each 1 service type, then the aggregator lists both service types.""" @@ -370,6 +351,7 @@ def test_create_service_succeeds( }, status_code=201 ) + requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) @@ -383,6 +365,40 @@ def test_create_service_succeeds( ) assert actual_openeo_id == expected_service_id + def test_create_service_raises_serviceunsupportedexception( + self, flask_app, multi_backend_connection, config, catalog, backend1, requests_mock + ): + """When it gets a request for a service type that no backend supports, it raises ServiceUnsupportedException.""" + + # Set up one service type. Don't want test to succeed because there are no services at all. + requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) + + non_existent_service_id = "b1-doesnotexist" + # check that this requests_mock does not get called + location_backend_1 = backend1 + "/services/" + non_existent_service_id + process_graph = {"foo": {"process_id": "foo", "arguments": {}}} + requests_mock.post( + backend1 + "/services", + headers={ + "OpenEO-Identifier": "wmts-foo", + "Location": location_backend_1 + }, + status_code=201 + ) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) + + with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER): + with pytest.raises(ServiceUnsupportedException): + implementation.create_service( + user_id=TEST_USER, + process_graph=process_graph, + service_type="does-not-exist", + api_version="1.0.0", + configuration={} + ) + + @pytest.mark.parametrize("exception_class", [OpenEoApiError, OpenEoRestError]) def test_create_service_backend_raises_openeoapiexception( self, flask_app, multi_backend_connection, config, catalog, @@ -397,6 +413,7 @@ def test_create_service_backend_raises_openeoapiexception( backend1 + "/services", exc=exception_class("Some server error"), ) + requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) @@ -428,6 +445,7 @@ def test_create_service_backend_reraises( backend1 + "/services", exc=exception_class("Some server error") ) + requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) diff --git a/tests/test_views.py b/tests/test_views.py index 56983bb7..0cdc06f3 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1363,6 +1363,27 @@ def service_metadata_wmts_foo(self): # not setting "created": This is used to test creating a service. ) + WMTS_SERVICE_TYPE = { + "WMTS": { + "configuration": { + "colormap": { + "default": "YlGn", + "description": + "The colormap to apply to single band layers", + "type": "string" + }, + "version": { + "default": "1.0.0", + "description": "The WMTS version to use.", + "enum": ["1.0.0"], + "type": "string" + } + }, + "links": [], + "process_parameters": [], + "title": "Web Map Tile Service" + } + } def test_service_types_simple(self, api100, backend1, backend2, requests_mock): """Given 2 backends but only 1 backend has a single service, then the aggregator @@ -1390,12 +1411,12 @@ def test_service_types_simple(self, api100, backend1, backend2, requests_mock): } } requests_mock.get(backend1 + "/service_types", json=single_service_type) - requests_mock.get(backend2 + "/service_types", json=single_service_type) + requests_mock.get(backend2 + "/service_types", json={}) resp = api100.get('/service_types').assert_status_code(200) assert resp.json == single_service_type - def test_service_types_merging(self, api100, backend1, backend2, requests_mock): + def test_service_types_multiple_backends(self, api100, backend1, backend2, requests_mock): """Given 2 backends with each 1 service, then the aggregator lists both services.""" service_type_1 = { "WMTS": { @@ -1704,6 +1725,7 @@ def test_create_wmts(self, api100, requests_mock, backend1): }, status_code=201 ) + requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) resp = api100.post('/services', json=post_data).assert_status_code(201) @@ -1800,6 +1822,7 @@ def test_create_wmts_reports_500_server_error(self, api100, requests_mock, backe backend1 + "/services", exc=exception_class("Testing exception handling") ) + requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) resp = api100.post('/services', json=post_data) assert resp.status_code == 500 From 0cbaf495a589fcaa311ebba1921270315ef70ea7 Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Fri, 9 Dec 2022 13:53:38 +0100 Subject: [PATCH 3/7] Issue #84: A bit of cleanup --- src/openeo_aggregator/backend.py | 4 ++-- tests/test_backend.py | 17 +++++++++-------- tests/test_views.py | 29 +++++------------------------ 3 files changed, 16 insertions(+), 34 deletions(-) diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index eaad26e1..afc35092 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -652,12 +652,11 @@ def __init__( config: AggregatorConfig ): super(AggregatorSecondaryServices, self).__init__() - self._backends = backends + self._backends = backends self._memoizer = memoizer_from_config(config=config, namespace="SecondaryServices") self._backends.on_connections_change.add(self._memoizer.invalidate) - # TODO Issue #84 Decide which backend based on service type. Will need to remove self._processing for this. self._processing = processing def _get_connection_and_backend_service_id( @@ -804,6 +803,7 @@ def create_service(self, user_id: str, process_graph: dict, service_type: str, a # Instead, properly determine backend based on service type? # See https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557 # and https://github.com/Open-EO/openeo-aggregator/issues/83 + # Should be able to remove this hardcoded workaround once issue #85 has been implemented. if "sentinelhub" in self._backends._backend_urls: backend_id = "sentinelhub" else: diff --git a/tests/test_backend.py b/tests/test_backend.py index 536a1024..94f0f3d2 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -134,7 +134,7 @@ class TestAggregatorSecondaryServices: # TODO: most tests here (the ones that do flask app stuff and auth) # belong under test_views.py - WMTS_SERVICE_TYPE = { + SERVICE_TYPES_ONLT_WMTS = { "WMTS": { "configuration": { "colormap": { @@ -162,7 +162,7 @@ def test_service_types_simple( """Given 2 backends and only 1 backend has a single service type, then the aggregator returns that 1 service type's metadata. """ - single_service_type = self.WMTS_SERVICE_TYPE + single_service_type = self.SERVICE_TYPES_ONLT_WMTS requests_mock.get(backend1 + "/service_types", json=single_service_type) requests_mock.get(backend2 + "/service_types", json={}) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) @@ -174,12 +174,13 @@ def test_service_types_simple( def test_service_types_simple_cached( self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock ): - """The service_types call is cached: + """Scenario: The service_types call is cached: When we get the service types several times, the second call that happens before the cache expires, doesn't hit the backend. But the third call that happens that happens after the cache has expired does hit the backend again. """ - single_service_type = self.WMTS_SERVICE_TYPE + # Just need one service type for the test. + single_service_type = self.SERVICE_TYPES_ONLT_WMTS mock_be1 = requests_mock.get(backend1 + "/service_types", json=single_service_type) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) @@ -351,7 +352,7 @@ def test_create_service_succeeds( }, status_code=201 ) - requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) + requests_mock.get(backend1 + "/service_types", json=self.SERVICE_TYPES_ONLT_WMTS) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) @@ -371,7 +372,7 @@ def test_create_service_raises_serviceunsupportedexception( """When it gets a request for a service type that no backend supports, it raises ServiceUnsupportedException.""" # Set up one service type. Don't want test to succeed because there are no services at all. - requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) + requests_mock.get(backend1 + "/service_types", json=self.SERVICE_TYPES_ONLT_WMTS) non_existent_service_id = "b1-doesnotexist" # check that this requests_mock does not get called @@ -413,7 +414,7 @@ def test_create_service_backend_raises_openeoapiexception( backend1 + "/services", exc=exception_class("Some server error"), ) - requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) + requests_mock.get(backend1 + "/service_types", json=self.SERVICE_TYPES_ONLT_WMTS) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) @@ -445,7 +446,7 @@ def test_create_service_backend_reraises( backend1 + "/services", exc=exception_class("Some server error") ) - requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) + requests_mock.get(backend1 + "/service_types", json=self.SERVICE_TYPES_ONLT_WMTS) processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) diff --git a/tests/test_views.py b/tests/test_views.py index 0cdc06f3..160893ab 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1363,7 +1363,7 @@ def service_metadata_wmts_foo(self): # not setting "created": This is used to test creating a service. ) - WMTS_SERVICE_TYPE = { + SERVICE_TYPES_ONLT_WMTS = { "WMTS": { "configuration": { "colormap": { @@ -1389,27 +1389,8 @@ def test_service_types_simple(self, api100, backend1, backend2, requests_mock): """Given 2 backends but only 1 backend has a single service, then the aggregator returns that 1 service's metadata. """ - single_service_type = { - "WMTS": { - "configuration": { - "colormap": { - "default": "YlGn", - "description": - "The colormap to apply to single band layers", - "type": "string" - }, - "version": { - "default": "1.0.0", - "description": "The WMTS version to use.", - "enum": ["1.0.0"], - "type": "string" - } - }, - "links": [], - "process_parameters": [], - "title": "Web Map Tile Service" - } - } + # Only need a single service type. + single_service_type = self.SERVICE_TYPES_ONLT_WMTS requests_mock.get(backend1 + "/service_types", json=single_service_type) requests_mock.get(backend2 + "/service_types", json={}) @@ -1725,7 +1706,7 @@ def test_create_wmts(self, api100, requests_mock, backend1): }, status_code=201 ) - requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) + requests_mock.get(backend1 + "/service_types", json=self.SERVICE_TYPES_ONLT_WMTS) resp = api100.post('/services', json=post_data).assert_status_code(201) @@ -1822,7 +1803,7 @@ def test_create_wmts_reports_500_server_error(self, api100, requests_mock, backe backend1 + "/services", exc=exception_class("Testing exception handling") ) - requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE) + requests_mock.get(backend1 + "/service_types", json=self.SERVICE_TYPES_ONLT_WMTS) resp = api100.post('/services', json=post_data) assert resp.status_code == 500 From 425d4f32b94c1520b45c9fd52a69b7d958bf1096 Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Fri, 9 Dec 2022 14:27:06 +0100 Subject: [PATCH 4/7] Issue #84: Made a test slightly stricter, and updated some comments --- src/openeo_aggregator/backend.py | 15 +++++++++------ tests/test_backend.py | 8 +++++--- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index afc35092..3ad51839 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -677,8 +677,6 @@ def _get_connection_and_backend_service_id( def service_types(self) -> dict: """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types""" - # TODO: implement household data to cache useful data for selecting backend based on service and backend's capabilities. - cached_info = self._memoizer.get_or_call(key=("_get_service_types",), callback=self._get_service_types) # Convert the cached results back to the format that service_types should return. return {name: data["service_type"] for name, data, in cached_info.items()} @@ -689,21 +687,21 @@ def _find_backend_id_for_service_type(self, service_type: str) -> Optional[str]: return cached_info.get(service_type, {}).get("backend_id") def _get_service_types(self) -> dict: - """Returns a dict that maps the service name to a dict that contains 2 items: + """Returns a dict that maps the service name to another dict that contains 2 items: 1) the backend_id that provides this secondary service 2) the service_types for self.service_types. For example: {'WMTS': - {'backend_id': 'b1', - 'service_type': // contains what backend b1 returned for service type WMTS. + {'backend_id': 'b1', + 'service_type': // contains what backend b1 returned for service type WMTS. {'configuration': ... } } 'WMS': {'backend_id': 'b2', - 'service_type': + 'service_type': {'configuration': ... } @@ -724,6 +722,8 @@ def _get_service_types(self) -> dict: https://github.com/Open-EO/openeo-aggregator/issues/83 https://github.com/Open-EO/openeo-aggregator/issues/84 """ + + # TODO: Issue #85 data about backend capabilities could be added to the service_types data structure as well. service_types = {} # TODO: Instead of merge: prefix each type with backend-id? #83 @@ -799,6 +799,9 @@ def create_service(self, user_id: str, process_graph: dict, service_type: str, a """ # TODO: configuration is not used. What to do with it? + # TODO: Strictly speaking it would be better to override _create_service instead of create_service + # but for now we override create_service so we can keep the special case for the "sentinelhub" backend. + # TODO: hardcoded/forced "SentinelHub only" support for now. # Instead, properly determine backend based on service type? # See https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557 diff --git a/tests/test_backend.py b/tests/test_backend.py index 94f0f3d2..d948e06d 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -371,14 +371,15 @@ def test_create_service_raises_serviceunsupportedexception( ): """When it gets a request for a service type that no backend supports, it raises ServiceUnsupportedException.""" - # Set up one service type. Don't want test to succeed because there are no services at all. + # At least 1 service type must be present. + # We don't want test to succeed erroneously simply because there are no services at all. requests_mock.get(backend1 + "/service_types", json=self.SERVICE_TYPES_ONLT_WMTS) non_existent_service_id = "b1-doesnotexist" - # check that this requests_mock does not get called + # Check that this requests_mock does not get called. location_backend_1 = backend1 + "/services/" + non_existent_service_id process_graph = {"foo": {"process_id": "foo", "arguments": {}}} - requests_mock.post( + mock_post = requests_mock.post( backend1 + "/services", headers={ "OpenEO-Identifier": "wmts-foo", @@ -398,6 +399,7 @@ def test_create_service_raises_serviceunsupportedexception( api_version="1.0.0", configuration={} ) + assert not mock_post.called @pytest.mark.parametrize("exception_class", [OpenEoApiError, OpenEoRestError]) From ff0138d02714e91899a53e927eef931468328a79 Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Mon, 12 Dec 2022 15:21:35 +0100 Subject: [PATCH 5/7] Issue #84, Implement improvements from code review --- src/openeo_aggregator/backend.py | 34 ++++++++++++++---------- tests/test_backend.py | 44 ++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index 3ad51839..898801c8 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -677,14 +677,20 @@ def _get_connection_and_backend_service_id( def service_types(self) -> dict: """https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types""" - cached_info = self._memoizer.get_or_call(key=("_get_service_types",), callback=self._get_service_types) + cached_info = self._get_service_types_cached() # Convert the cached results back to the format that service_types should return. return {name: data["service_type"] for name, data, in cached_info.items()} - def _find_backend_id_for_service_type(self, service_type: str) -> Optional[str]: + def _find_backend_id_for_service_type(self, service_type: str) -> str: """Returns the ID of the backend that provides the service_type or None if no backend supports that service.""" - cached_info = self._memoizer.get_or_call(key=("_get_service_types",), callback=self._get_service_types) - return cached_info.get(service_type, {}).get("backend_id") + cached_info = self._get_service_types_cached() + backend_id = cached_info.get(service_type, {}).get("backend_id") + if backend_id is None: + raise ServiceUnsupportedException(service_type) + return backend_id + + def _get_service_types_cached(self): + return self._memoizer.get_or_call(key=("service_types"), callback=self._get_service_types) def _get_service_types(self) -> dict: """Returns a dict that maps the service name to another dict that contains 2 items: @@ -726,12 +732,6 @@ def _get_service_types(self) -> dict: # TODO: Issue #85 data about backend capabilities could be added to the service_types data structure as well. service_types = {} - # TODO: Instead of merge: prefix each type with backend-id? #83 - def merge(services: dict, backend_id: str, new_service: dict): - for name, data in new_service.items(): - if name.lower() not in {k.lower() for k in services.keys()}: - services[name] = dict(backend_id=backend_id, service_type=data) - # Collect all service types from the backends. for con in self._backends: # TODO: skip back-ends that do not support secondary services. https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557 @@ -742,8 +742,16 @@ def merge(services: dict, backend_id: str, new_service: dict): _log.warning(f"Failed to get service_types from {con.id}: {e!r}", exc_info=True) continue # TODO #1 smarter merging: parameter differences? - merge(service_types, con.id, types_to_add) - + # TODO: Instead of merge: prefix each type with backend-id? #83 + for name, data in types_to_add.items(): + if name.lower() not in {k.lower() for k in service_types.keys()}: + service_types[name] = dict(backend_id=con.id, service_type=data) + else: + conflicting_backend = service_types[name]["backend_id"] + _log.warning( + f'Conflicting secondary service types: "{name}" is present in more than one backend, ' + + f'already found in backend: {conflicting_backend}' + ) return service_types def list_services(self, user_id: str) -> List[ServiceMetadata]: @@ -811,8 +819,6 @@ def create_service(self, user_id: str, process_graph: dict, service_type: str, a backend_id = "sentinelhub" else: backend_id = self._find_backend_id_for_service_type(service_type) - if backend_id is None: - raise ServiceUnsupportedException(service_type) process_graph = self._processing.preprocess_process_graph(process_graph, backend_id=backend_id) con = self._backends.get_connection(backend_id) diff --git a/tests/test_backend.py b/tests/test_backend.py index d948e06d..2a98daf3 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,4 +1,5 @@ import datetime as dt +import logging import pytest @@ -244,6 +245,49 @@ def test_service_types_multiple_backends(self, multi_backend_connection, config, expected_service_types.update(service_type_2) assert actual_service_types == expected_service_types + def test_service_types_warns_about_duplicate_service(self, multi_backend_connection, config, catalog, + backend1, backend2, requests_mock, caplog + ): + """ + Given 2 backends which have conflicting service types, + then the aggregator lists only the service type from the first backend + and it logs a warning about the conflicting types. + """ + caplog.set_level(logging.WARNING) + service_type_1 = { + "WMS": { + "title": "OGC Web Map Service", + "configuration": {}, + "process_parameters": [], + "links": [] + } + } + service_type_2 = { + "WMS": { + "title": "A duplicate OGC Web Map Service", + "configuration": {}, + "process_parameters": [], + "links": [] + } + } + requests_mock.get(backend1 + "/service_types", json=service_type_1) + requests_mock.get(backend2 + "/service_types", json=service_type_2) + processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config) + implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config) + + actual_service_types = implementation.service_types() + + # There were duplicate service types: + # Therefore it should find only one service type, and the log should contain a warning. + expected_service_types = dict(service_type_1) + assert actual_service_types == expected_service_types + + expected_log_message = ( + 'Conflicting secondary service types: "WMS" is present in more than one backend, ' + + 'already found in backend: b1' + ) + assert expected_log_message in caplog.text + @pytest.fixture def service_metadata_wmts_foo(self): return ServiceMetadata( From ded38cccc07995ea9f7ce759ed1ec5567922cdbb Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Mon, 12 Dec 2022 15:37:35 +0100 Subject: [PATCH 6/7] Issue #84, Code review: removed workaround that forces sentinelhub for all secondary service creation --- src/openeo_aggregator/backend.py | 10 +------- tests/test_views.py | 41 -------------------------------- 2 files changed, 1 insertion(+), 50 deletions(-) diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index 898801c8..8736e8b9 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -810,15 +810,7 @@ def create_service(self, user_id: str, process_graph: dict, service_type: str, a # TODO: Strictly speaking it would be better to override _create_service instead of create_service # but for now we override create_service so we can keep the special case for the "sentinelhub" backend. - # TODO: hardcoded/forced "SentinelHub only" support for now. - # Instead, properly determine backend based on service type? - # See https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557 - # and https://github.com/Open-EO/openeo-aggregator/issues/83 - # Should be able to remove this hardcoded workaround once issue #85 has been implemented. - if "sentinelhub" in self._backends._backend_urls: - backend_id = "sentinelhub" - else: - backend_id = self._find_backend_id_for_service_type(service_type) + backend_id = self._find_backend_id_for_service_type(service_type) process_graph = self._processing.preprocess_process_graph(process_graph, backend_id=backend_id) con = self._backends.get_connection(backend_id) diff --git a/tests/test_views.py b/tests/test_views.py index 160893ab..a4731180 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -1713,47 +1713,6 @@ def test_create_wmts(self, api100, requests_mock, backend1): assert resp.headers["OpenEO-Identifier"] == expected_agg_id assert resp.headers["Location"] == expected_location - @pytest.mark.parametrize("backend2_id", ["sentinelhub"]) - def test_create_wmts_forced_sentinelhub( - self, api100, requests_mock, backend1, backend2_id, backend2 - ): - """When the payload is correct the service should be successfully created, - the service ID should be prepended with the backend ID, - and location should point to the aggregator, not to the backend directly. - """ - # TODO this is a temp test for a temp "force sentinelhub" workaround hack (https://github.com/Open-EO/openeo-aggregator/issues/78) - - api100.set_auth_bearer_token(TEST_USER_BEARER_TOKEN) - - backend_service_id = "c63d6c27-c4c2-4160-b7bd-9e32f582daec" - expected_agg_id = "sentinelhub-" + backend_service_id - - # The aggregator MUST NOT point to the backend instance but to its own endpoint. - # This is handled by the openeo python driver in openeo_driver.views.services_post. - expected_location = f"/openeo/1.0.0/services/{expected_agg_id}" - upstream_location = f"{backend2}/services/{backend_service_id}" - - process_graph = {"foo": {"process_id": "foo", "arguments": {}}} - post_data = { - "type": "WMTS", - "process": {"process_graph": process_graph, "id": "filter_temporal_wmts"}, - "title": "My Service", - "description": "Service description", - } - requests_mock.post( - backend2 + "/services", - headers={ - "OpenEO-Identifier": backend_service_id, - "Location": upstream_location, - }, - status_code=201, - ) - - resp = api100.post("/services", json=post_data).assert_status_code(201) - - assert resp.headers["OpenEO-Identifier"] == expected_agg_id - assert resp.headers["Location"] == expected_location - # ProcessGraphMissingException and ProcessGraphInvalidException are well known reasons for a bad client request. @pytest.mark.parametrize("exception_class", [ProcessGraphMissingException, ProcessGraphInvalidException]) def test_create_wmts_reports_400_client_error(self, api100, requests_mock, backend1, exception_class): From 5e8b3f9215c61c53f37cbd07c155d7a6d0bb10a6 Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Mon, 12 Dec 2022 16:26:24 +0100 Subject: [PATCH 7/7] Issue #84: Cleanup left over from workaround that forces sentinelhub for secondary service creation --- src/openeo_aggregator/backend.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/openeo_aggregator/backend.py b/src/openeo_aggregator/backend.py index 8736e8b9..7d00e57a 100644 --- a/src/openeo_aggregator/backend.py +++ b/src/openeo_aggregator/backend.py @@ -682,7 +682,7 @@ def service_types(self) -> dict: return {name: data["service_type"] for name, data, in cached_info.items()} def _find_backend_id_for_service_type(self, service_type: str) -> str: - """Returns the ID of the backend that provides the service_type or None if no backend supports that service.""" + """Returns the ID of the backend that provides the service_type.""" cached_info = self._get_service_types_cached() backend_id = cached_info.get(service_type, {}).get("backend_id") if backend_id is None: @@ -800,16 +800,13 @@ def service_info(self, user_id: str, service_id: str) -> ServiceMetadata: service_json["id"] = ServiceIdMapping.get_aggregator_service_id(service_json["id"], con.id) return ServiceMetadata.from_dict(service_json) - def create_service(self, user_id: str, process_graph: dict, service_type: str, api_version: str, + def _create_service(self, user_id: str, process_graph: dict, service_type: str, api_version: str, configuration: dict) -> str: """ https://openeo.org/documentation/1.0/developers/api/reference.html#operation/create-service """ # TODO: configuration is not used. What to do with it? - # TODO: Strictly speaking it would be better to override _create_service instead of create_service - # but for now we override create_service so we can keep the special case for the "sentinelhub" backend. - backend_id = self._find_backend_id_for_service_type(service_type) process_graph = self._processing.preprocess_process_graph(process_graph, backend_id=backend_id)