Skip to content

Commit

Permalink
Issue #84: Implement selection of backend based on service type, for …
Browse files Browse the repository at this point in the history
…create_service.
  • Loading branch information
JohanKJSchreurs committed Dec 9, 2022
1 parent a67cdbf commit d32dbaf
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 58 deletions.
67 changes: 55 additions & 12 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from openeo_driver.datacube import DriverDataCube
from openeo_driver.errors import CollectionNotFoundException, OpenEOApiException, ProcessGraphMissingException, \
JobNotFoundException, JobNotFinishedException, ProcessGraphInvalidException, PermissionsInsufficientException, \
FeatureUnsupportedException, ServiceNotFoundException
FeatureUnsupportedException, ServiceNotFoundException, ServiceUnsupportedException
from openeo_driver.processes import ProcessRegistry
from openeo_driver.users import User
from openeo_driver.utils import EvalEnv
Expand Down Expand Up @@ -677,18 +677,61 @@ def _get_connection_and_backend_service_id(
return con, backend_service_id

def service_types(self) -> dict:
return self._memoizer.get_or_call(key=("all_service_types",), callback=self._get_service_types)
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types"""
# TODO: implement household data to cache useful data for selecting backend based on service and backend's capabilities.

cached_info = self._memoizer.get_or_call(key=("_get_service_types",), callback=self._get_service_types)
# Convert the cached results back to the format that service_types should return.
return {name: data["service_type"] for name, data, in cached_info.items()}

def _find_backend_id_for_service_type(self, service_type: str) -> Optional[str]:
"""Returns the ID of the backend that provides the service_type or None if no backend supports that service."""
cached_info = self._memoizer.get_or_call(key=("_get_service_types",), callback=self._get_service_types)
return cached_info.get(service_type, {}).get("backend_id")

def _get_service_types(self) -> dict:
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types"""
# TODO: add caching. Also see https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557
"""Returns a dict that maps the service name to a dict that contains 2 items:
1) the backend_id that provides this secondary service
2) the service_types for self.service_types.
For example:
{'WMTS':
{'backend_id': 'b1',
'service_type': // contains what backend b1 returned for service type WMTS.
{'configuration':
...
}
}
'WMS':
{'backend_id': 'b2',
'service_type':
{'configuration':
...
}
}
}
Info for selecting the right backend to create a new service
------------------------------------------------------------
We are assuming that there is only one upstream backend per service type.
That means we don't really need to merge duplicate service types, i.e. there is no need to resolve
some kind of conflicts between duplicate service types.
So far we don't store info about the backends' capabilities, but in the future
we may need to store that to select the right backend when we create a service.
See: issues #83 and #84:
https://github.com/Open-EO/openeo-aggregator/issues/83
https://github.com/Open-EO/openeo-aggregator/issues/84
"""
service_types = {}

# TODO: Instead of merge: prefix each type with backend-id? #83
def merge(formats: dict, to_add: dict):
for name, data in to_add.items():
if name.lower() not in {k.lower() for k in formats.keys()}:
formats[name] = data
def merge(services: dict, backend_id: str, new_service: dict):
for name, data in new_service.items():
if name.lower() not in {k.lower() for k in services.keys()}:
services[name] = dict(backend_id=backend_id, service_type=data)

# Collect all service types from the backends.
for con in self._backends:
Expand All @@ -700,7 +743,7 @@ def merge(formats: dict, to_add: dict):
_log.warning(f"Failed to get service_types from {con.id}: {e!r}", exc_info=True)
continue
# TODO #1 smarter merging: parameter differences?
merge(service_types, types_to_add)
merge(service_types, con.id, types_to_add)

return service_types

Expand Down Expand Up @@ -764,9 +807,9 @@ def create_service(self, user_id: str, process_graph: dict, service_type: str, a
if "sentinelhub" in self._backends._backend_urls:
backend_id = "sentinelhub"
else:
backend_id = self._processing.get_backend_for_process_graph(
process_graph=process_graph, api_version=api_version
)
backend_id = self._find_backend_id_for_service_type(service_type)
if backend_id is None:
raise ServiceUnsupportedException(service_type)
process_graph = self._processing.preprocess_process_graph(process_graph, backend_id=backend_id)

con = self._backends.get_connection(backend_id)
Expand Down
106 changes: 62 additions & 44 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,33 +134,35 @@ class TestAggregatorSecondaryServices:
# TODO: most tests here (the ones that do flask app stuff and auth)
# belong under test_views.py

WMTS_SERVICE_TYPE = {
"WMTS": {
"configuration": {
"colormap": {
"default": "YlGn",
"description":
"The colormap to apply to single band layers",
"type": "string"
},
"version": {
"default": "1.0.0",
"description": "The WMTS version to use.",
"enum": ["1.0.0"],
"type": "string"
}
},
"links": [],
"process_parameters": [],
"title": "Web Map Tile Service"
}
}

def test_service_types_simple(
self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock
):
"""Given 2 backends and only 1 backend has a single service type, then the aggregator
returns that 1 service type's metadata.
"""
single_service_type = {
"WMTS": {
"configuration": {
"colormap": {
"default": "YlGn",
"description":
"The colormap to apply to single band layers",
"type": "string"
},
"version": {
"default": "1.0.0",
"description": "The WMTS version to use.",
"enum": ["1.0.0"],
"type": "string"
}
},
"links": [],
"process_parameters": [],
"title": "Web Map Tile Service"
}
}
single_service_type = self.WMTS_SERVICE_TYPE
requests_mock.get(backend1 + "/service_types", json=single_service_type)
requests_mock.get(backend2 + "/service_types", json={})
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
Expand All @@ -177,27 +179,7 @@ def test_service_types_simple_cached(
doesn't hit the backend.
But the third call that happens that happens after the cache has expired does hit the backend again.
"""
single_service_type = {
"WMTS": {
"configuration": {
"colormap": {
"default": "YlGn",
"description":
"The colormap to apply to single band layers",
"type": "string"
},
"version": {
"default": "1.0.0",
"description": "The WMTS version to use.",
"enum": ["1.0.0"],
"type": "string"
}
},
"links": [],
"process_parameters": [],
"title": "Web Map Tile Service"
}
}
single_service_type = self.WMTS_SERVICE_TYPE
mock_be1 = requests_mock.get(backend1 + "/service_types", json=single_service_type)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)
Expand All @@ -217,8 +199,7 @@ def test_service_types_simple_cached(
assert mock_be1.call_count == 2
assert service_types == single_service_type

# TODO: Issue #84 Check if we should remove this merging behavior.
def test_service_types_merging(self, multi_backend_connection, config, catalog,
def test_service_types_multiple_backends(self, multi_backend_connection, config, catalog,
backend1, backend2, requests_mock
):
"""Given 2 backends with each 1 service type, then the aggregator lists both service types."""
Expand Down Expand Up @@ -370,6 +351,7 @@ def test_create_service_succeeds(
},
status_code=201
)
requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

Expand All @@ -383,6 +365,40 @@ def test_create_service_succeeds(
)
assert actual_openeo_id == expected_service_id

def test_create_service_raises_serviceunsupportedexception(
self, flask_app, multi_backend_connection, config, catalog, backend1, requests_mock
):
"""When it gets a request for a service type that no backend supports, it raises ServiceUnsupportedException."""

# Set up one service type. Don't want test to succeed because there are no services at all.
requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE)

non_existent_service_id = "b1-doesnotexist"
# check that this requests_mock does not get called
location_backend_1 = backend1 + "/services/" + non_existent_service_id
process_graph = {"foo": {"process_id": "foo", "arguments": {}}}
requests_mock.post(
backend1 + "/services",
headers={
"OpenEO-Identifier": "wmts-foo",
"Location": location_backend_1
},
status_code=201
)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
with pytest.raises(ServiceUnsupportedException):
implementation.create_service(
user_id=TEST_USER,
process_graph=process_graph,
service_type="does-not-exist",
api_version="1.0.0",
configuration={}
)


@pytest.mark.parametrize("exception_class", [OpenEoApiError, OpenEoRestError])
def test_create_service_backend_raises_openeoapiexception(
self, flask_app, multi_backend_connection, config, catalog,
Expand All @@ -397,6 +413,7 @@ def test_create_service_backend_raises_openeoapiexception(
backend1 + "/services",
exc=exception_class("Some server error"),
)
requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

Expand Down Expand Up @@ -428,6 +445,7 @@ def test_create_service_backend_reraises(
backend1 + "/services",
exc=exception_class("Some server error")
)
requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

Expand Down
27 changes: 25 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,27 @@ def service_metadata_wmts_foo(self):
# not setting "created": This is used to test creating a service.
)

WMTS_SERVICE_TYPE = {
"WMTS": {
"configuration": {
"colormap": {
"default": "YlGn",
"description":
"The colormap to apply to single band layers",
"type": "string"
},
"version": {
"default": "1.0.0",
"description": "The WMTS version to use.",
"enum": ["1.0.0"],
"type": "string"
}
},
"links": [],
"process_parameters": [],
"title": "Web Map Tile Service"
}
}

def test_service_types_simple(self, api100, backend1, backend2, requests_mock):
"""Given 2 backends but only 1 backend has a single service, then the aggregator
Expand Down Expand Up @@ -1390,12 +1411,12 @@ def test_service_types_simple(self, api100, backend1, backend2, requests_mock):
}
}
requests_mock.get(backend1 + "/service_types", json=single_service_type)
requests_mock.get(backend2 + "/service_types", json=single_service_type)
requests_mock.get(backend2 + "/service_types", json={})

resp = api100.get('/service_types').assert_status_code(200)
assert resp.json == single_service_type

def test_service_types_merging(self, api100, backend1, backend2, requests_mock):
def test_service_types_multiple_backends(self, api100, backend1, backend2, requests_mock):
"""Given 2 backends with each 1 service, then the aggregator lists both services."""
service_type_1 = {
"WMTS": {
Expand Down Expand Up @@ -1704,6 +1725,7 @@ def test_create_wmts(self, api100, requests_mock, backend1):
},
status_code=201
)
requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE)

resp = api100.post('/services', json=post_data).assert_status_code(201)

Expand Down Expand Up @@ -1800,6 +1822,7 @@ def test_create_wmts_reports_500_server_error(self, api100, requests_mock, backe
backend1 + "/services",
exc=exception_class("Testing exception handling")
)
requests_mock.get(backend1 + "/service_types", json=self.WMTS_SERVICE_TYPE)

resp = api100.post('/services', json=post_data)
assert resp.status_code == 500
Expand Down

0 comments on commit d32dbaf

Please sign in to comment.