Skip to content

Commit

Permalink
Merge branch 'issue22-capabilities-federation'
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Mar 5, 2025
2 parents b26702c + 1ad08db commit 29cabee
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ The format is roughly based on [Keep a Changelog](https://keepachangelog.com/en/
## unreleased

- Bump minimum required Python version to 3.11 ([#127](https://github.com/Open-EO/openeo-aggregator/issues/127), [#174](https://github.com/Open-EO/openeo-aggregator/issues/174))
- Add title, description, online/offline status and last_status_check to federation listing in capabilities document ([#22](https://github.com/Open-EO/openeo-aggregator/issues/22))


## 0.43.0

Expand Down
8 changes: 2 additions & 6 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1596,12 +1596,8 @@ def capabilities_billing(self) -> dict:

def postprocess_capabilities(self, capabilities: dict) -> dict:
# TODO: which url to use? unversioned or versioned? see https://github.com/Open-EO/openeo-api/pull/419
capabilities["federation"] = {
bid: {
"url": status["root_url"],
}
for bid, status in self._backends.get_status().items()
}
capabilities["federation"] = self._backends.get_federation_overview()

# TODO: standardize this field?
capabilities["_partitioned_job_tracking"] = bool(self.batch_jobs.partitioned_job_tracker)
return capabilities
Expand Down
1 change: 1 addition & 0 deletions src/openeo_aggregator/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AggregatorBackendConfig(OpenEoBackendConfig):
packages=["openeo", "openeo_driver", "openeo_aggregator"],
)

# TODO: allow to specify more info per backend in addtion to just URL: title, description, experimental flag, ...
aggregator_backends: Dict[str, str] = attrs.field(validator=attrs.validators.min_len(1))

# See `ZooKeeperPartitionedJobDB.from_config` for supported fields.
Expand Down
54 changes: 40 additions & 14 deletions src/openeo_aggregator/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import concurrent.futures
import contextlib
import dataclasses
import datetime
import logging
import re
from typing import (
Expand Down Expand Up @@ -41,7 +42,7 @@
STREAM_CHUNK_SIZE_DEFAULT,
get_backend_config,
)
from openeo_aggregator.utils import _UNSET, Clock, EventHandler
from openeo_aggregator.utils import _UNSET, Clock, EventHandler, timestamp_to_rfc3339

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -198,7 +199,10 @@ def request(*args, **kwargs):
self.request = request


_ConnectionsCache = collections.namedtuple("_ConnectionsCache", ["expiry", "connections"])
@dataclasses.dataclass(frozen=True)
class _ConnectionsCache:
timestamp: float
connections: List[BackendConnection]


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -234,14 +238,15 @@ def __init__(
)
# TODO: backend_urls as dict does not have explicit order, while this is important.
_log.info(f"Creating MultiBackendConnection with {backends=!r}")
# TODO: support more backend info than just URL (title, description, experimental flag, ...)
self._backend_urls = backends
self._configured_oidc_providers = configured_oidc_providers

# General (metadata/status) caching
self._memoizer: Memoizer = memoizer or NullMemoizer()

# Caching of connection objects
self._connections_cache = _ConnectionsCache(expiry=0, connections=[])
self._connections_cache = _ConnectionsCache(timestamp=0, connections=[])
self._connections_cache_ttl = connections_cache_ttl
# Event handler for when there is a change in the set of working back-end ids.
self.on_connections_change = EventHandler("connections_change")
Expand All @@ -258,7 +263,7 @@ def from_config() -> "MultiBackendConnection":
)

def _get_connections(self, skip_failures=False) -> Iterator[BackendConnection]:
"""Create new backend connections."""
"""Create new backend connections, possibly skipping connection failures."""
for bid, url in self._backend_urls.items():
try:
_log.info(f"Create backend {bid!r} connection to {url!r}")
Expand All @@ -273,13 +278,14 @@ def _get_connections(self, skip_failures=False) -> Iterator[BackendConnection]:
def get_connections(self) -> List[BackendConnection]:
"""Get backend connections (re-created automatically if cache ttl expired)"""
now = Clock.time()
if now > self._connections_cache.expiry:
_log.debug(f"Connections cache expired ({now:.2f}>{self._connections_cache.expiry:.2f})")
expiry = self._connections_cache.timestamp + self._connections_cache_ttl
if now > expiry:
_log.debug(f"Connections cache expired ({now:.2f}>{expiry:.2f})")
orig_bids = [c.id for c in self._connections_cache.connections]
for con in self._connections_cache.connections:
con.invalidate()
self._connections_cache = _ConnectionsCache(
expiry=now + self._connections_cache_ttl, connections=list(self._get_connections(skip_failures=True))
timestamp=now, connections=list(self._get_connections(skip_failures=True))
)
new_bids = [c.id for c in self._connections_cache.connections]
_log.debug(
Expand Down Expand Up @@ -313,17 +319,37 @@ def get_connection(self, backend_id: str) -> BackendConnection:
return con
raise OpenEOApiException(f"No backend with id {backend_id!r}")

def get_status(self) -> dict:
return {
def get_federation_overview(self) -> dict:
"""
Federation overview, to be used in the capabilities document (`GET /`),
under the "federation" field, per federation extension
(conformance class https://api.openeo.org/extensions/federation/0.1.0)
"""
online_connections = self.get_connections()
last_status_check = timestamp_to_rfc3339(self._connections_cache.timestamp)
federation = {
c.id: {
# TODO: avoid private attributes?
# TODO: add real backend status? (cached?)
"root_url": c._root_url,
"orig_url": c._orig_url,
"url": c.root_url,
"title": c.capabilities().get("title", f"Backend {c.id!r}"),
"description": c.capabilities().get("description", f"Federated openEO backend {c.id!r}"),
"status": "online",
"last_status_check": last_status_check,
}
for c in self.get_connections()
for c in online_connections
}

for bid, url in self._backend_urls.items():
if bid not in federation:
federation[bid] = {
"url": url,
"status": "offline",
"title": f"Backend {bid!r}",
"description": f"Federated openEO backend {bid!r}",
"last_status_check": last_status_check,
}

return federation

def _get_api_versions(self) -> List[str]:
return list(set(c.capabilities().api_version() for c in self.get_connections()))

Expand Down
6 changes: 4 additions & 2 deletions src/openeo_aggregator/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ def capabilities(
api_version: str = "1.1.0",
stac_version: str = "0.9.0",
secondary_services: bool = False,
title: Optional[str] = None,
description: Optional[str] = None,
) -> dict:
"""
Helper to build a capabilities doc.
Expand All @@ -203,8 +205,8 @@ def capabilities(
"backend_version": openeo_aggregator.about.__version__,
"stac_version": stac_version,
"id": "openeo-aggregator-testing",
"title": "Test openEO Aggregator",
"description": "Test instance of openEO Aggregator",
"title": title or "Dummy federation",
"description": description or "Test instance of openEO Aggregator",
"endpoints": [
{"path": "/collections", "methods": ["GET"]},
{"path": "/collections/{collection_id}", "methods": ["GET"]},
Expand Down
4 changes: 2 additions & 2 deletions src/openeo_aggregator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def utcnow(cls) -> datetime.datetime:
"""
Like `datetime.datetime.utcnow()`: Current UTC datetime (naive).
"""
return datetime.datetime.utcfromtimestamp(cls.time())
return datetime.datetime.fromtimestamp(cls.time(), tz=datetime.timezone.utc)


class BoundingBox(NamedTuple):
Expand Down Expand Up @@ -211,7 +211,7 @@ def strip_join(separator: str, *args: str) -> str:

def timestamp_to_rfc3339(timestamp: float) -> str:
"""Convert unix epoch timestamp to RFC3339 datetime string"""
dt = datetime.datetime.utcfromtimestamp(timestamp)
dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
return rfc3339.datetime(dt)


Expand Down
15 changes: 13 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ def pytest_configure(config):
def backend1(requests_mock, mbldr) -> str:
domain = "https://b1.test/v1"
# TODO: how to work with different API versions?
requests_mock.get(domain + "/", json=mbldr.capabilities())
requests_mock.get(
domain + "/",
json=mbldr.capabilities(
title="Dummy Federation One",
description="Welcome to Federation One.",
),
)
requests_mock.get(domain + "/credentials/oidc", json=mbldr.credentials_oidc())
requests_mock.get(domain + "/processes", json=mbldr.processes(*_DEFAULT_PROCESSES))
return domain
Expand All @@ -52,7 +58,12 @@ def backend1(requests_mock, mbldr) -> str:
@pytest.fixture
def backend2(requests_mock, mbldr) -> str:
domain = "https://b2.test/v1"
requests_mock.get(domain + "/", json=mbldr.capabilities())
requests_mock.get(
domain + "/",
json=mbldr.capabilities(
title="Dummy The Second",
),
)
requests_mock.get(domain + "/credentials/oidc", json=mbldr.credentials_oidc())
requests_mock.get(domain + "/processes", json=mbldr.processes(*_DEFAULT_PROCESSES))
return domain
Expand Down
41 changes: 37 additions & 4 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,43 @@ def test_map(self, multi_backend_connection, backend1, backend2, requests_mock):
assert isinstance(res, types.GeneratorType)
assert list(res) == [("b1", {"bar": 1}), ("b2", {"meh": 2})]

def test_status(self, multi_backend_connection):
assert multi_backend_connection.get_status() == {
"b1": {"orig_url": "https://b1.test/v1", "root_url": "https://b1.test/v1"},
"b2": {"orig_url": "https://b2.test/v1", "root_url": "https://b2.test/v1"},
@clock_mock("2025-03-05T12:34:56Z")
def test_get_federation_overview_basic(self, multi_backend_connection):
assert multi_backend_connection.get_federation_overview() == {
"b1": {
"url": "https://b1.test/v1",
"title": "Dummy Federation One",
"description": "Welcome to Federation One.",
"status": "online",
"last_status_check": "2025-03-05T12:34:56Z",
},
"b2": {
"url": "https://b2.test/v1",
"title": "Dummy The Second",
"description": "Test instance of openEO Aggregator",
"status": "online",
"last_status_check": "2025-03-05T12:34:56Z",
},
}

@clock_mock("2025-03-05T12:34:56Z")
def test_get_federation_overview_offline(self, multi_backend_connection, backend1, backend2, requests_mock):
requests_mock.get(f"{backend2}/", status_code=500, json={"error": "nope"})
assert multi_backend_connection.get_federation_overview() == {
"b1": {
"url": "https://b1.test/v1",
"title": "Dummy Federation One",
"description": "Welcome to Federation One.",
"status": "online",
"last_status_check": "2025-03-05T12:34:56Z",
},
"b2": {
"url": "https://b2.test/v1",
"description": "Federated openEO backend 'b2'",
"title": "Backend 'b2'",
"status": "offline",
"last_status_check": "2025-03-05T12:34:56Z",
},
}

@pytest.mark.parametrize(
Expand Down
8 changes: 4 additions & 4 deletions tests/test_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def test_mock_clock_basic(fail):
try:
with clock_mock():
assert Clock.time() == 1500000000
assert Clock.utcnow() == datetime.datetime(2017, 7, 14, 2, 40, 0)
assert Clock.utcnow() == datetime.datetime(2017, 7, 14, 2, 40, 0, tzinfo=datetime.timezone.utc)
if fail:
raise RuntimeError
except RuntimeError:
Expand All @@ -32,9 +32,9 @@ def test_mock_clock_basic(fail):
@pytest.mark.parametrize(
["start", "expected_time", "expected_date"],
[
(1000, 1000, datetime.datetime(1970, 1, 1, 0, 16, 40)),
("2021-02-21", 1613865600, datetime.datetime(2021, 2, 21)),
("2021-02-21T12:34:56Z", 1613910896, datetime.datetime(2021, 2, 21, 12, 34, 56)),
(1000, 1000, datetime.datetime(1970, 1, 1, 0, 16, 40, tzinfo=datetime.timezone.utc)),
("2021-02-21", 1613865600, datetime.datetime(2021, 2, 21, tzinfo=datetime.timezone.utc)),
("2021-02-21T12:34:56Z", 1613910896, datetime.datetime(2021, 2, 21, 12, 34, 56, tzinfo=datetime.timezone.utc)),
],
)
def test_mock_clock_start(start, expected_time, expected_date):
Expand Down
18 changes: 16 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@


class TestGeneral:

@clock_mock("2025-03-05T12:34:56Z")
def test_capabilities(self, api100):
res = api100.get("/").assert_status_code(200)
capabilities = res.json
Expand All @@ -48,8 +50,20 @@ def test_capabilities(self, api100):
assert {"methods": ["GET"], "path": "/collections/{collection_id}"} in endpoints
assert {"methods": ["GET"], "path": "/processes"} in endpoints
assert capabilities["federation"] == {
"b1": {"url": "https://b1.test/v1"},
"b2": {"url": "https://b2.test/v1"},
"b1": {
"url": "https://b1.test/v1",
"title": "Dummy Federation One",
"description": "Welcome to Federation One.",
"status": "online",
"last_status_check": "2025-03-05T12:34:56Z",
},
"b2": {
"url": "https://b2.test/v1",
"title": "Dummy The Second",
"description": "Test instance of openEO Aggregator",
"status": "online",
"last_status_check": "2025-03-05T12:34:56Z",
},
}

def test_title_and_description(self, api100):
Expand Down

0 comments on commit 29cabee

Please sign in to comment.