Skip to content

Commit

Permalink
Merge pull request #3 from eodcgmbh/add_extra_functionality
Browse files Browse the repository at this point in the history
Replication using rules
  • Loading branch information
SerRichard authored Feb 17, 2025
2 parents 20db846 + a21acdb commit 584d827
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 15 deletions.
138 changes: 138 additions & 0 deletions examples/stac_integration.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from stac_rucio.client import StacClient\n",
"\n",
"stac_rucio = StacClient(\n",
" target=\"mfcover\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from pystac_client import Client\n",
"\n",
"eodc = Client.open(\n",
" \"https://stac.eodc.eu/api/v1\",\n",
" modifier=stac_rucio.rucio_item\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Define your area of interest\n",
"aoi = {\n",
" \"type\": \"Polygon\",\n",
" \"coordinates\": [\n",
" [\n",
" [9.184570312500002, 43.628123412124616],\n",
" [25.543212890625004, 43.628123412124616],\n",
" [25.543212890625004, 49.830896288288976],\n",
" [9.184570312500002, 49.830896288288976],\n",
" [9.184570312500002, 43.628123412124616]\n",
" ]\n",
" ]\n",
"}\n",
"\n",
"# Define your temporal range\n",
"daterange = {\"interval\": [\"2022-12-01T00:00:00Z\", \"2022-12-31T00:00:00Z\"]}\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"items = eodc.search(filter_lang=\"cql2-json\", filter={\n",
" \"op\": \"and\",\n",
" \"args\": [\n",
" {\"op\": \"s_intersects\", \"args\": [{\"property\": \"geometry\"}, aoi]},\n",
" {\"op\": \"anyinteracts\", \"args\": [{\"property\": \"datetime\"}, daterange]},\n",
" {\"op\": \"=\", \"args\": [{\"property\": \"collection\"}, \"RUCIO_SENTINEL2_MFCOVER\"]}\n",
" ]}\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_items = [ item for item in items.items() ]\n",
"all_items[0]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"stac_rucio.create_replicas(items=all_items, rse=\"EODC-DATA\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"stac_rucio.create_replication_rules(items=all_items, src_rse=\"EODC-DATA\", dst_rse=\"DESY-DCACHE\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"stac_rucio.replication_availability(items=all_items, rse=\"DESY-DCACHE\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"stac_rucio.download_available(items=all_items[:2], rse=\"DESY-DCACHE\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stac-rucio"
version = "2024.11.1"
version = "2025.2.1"
description = "A python package for managing STAC asset data locality when those assets are managed by a RUCIO server."
authors = ["sean <sean.hoyal@external.eodc.eu>"]
license = "Apache License 2.0"
Expand Down
136 changes: 122 additions & 14 deletions stac_rucio/client.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,157 @@
from pystac import Asset, Item, ItemCollection
from rucio.client import Client
from rucio.client.downloadclient import DownloadClient
from urllib.parse import urlparse, urlunparse

from stac_rucio.models import RucioStac

class ReplicaExists(Exception):
    """Exception type declared for the "replica already exists" case.

    NOTE(review): nothing in this module raises it as shown; presumably it is
    reserved for the duplicate-replica handling mentioned in the TODOs -- confirm.
    """

class StacClient:
    """Client tying STAC items to the replicas a Rucio server holds for them.

    Every STAC item handled here must carry a ``rucio:config`` entry on the
    asset named by ``target``. That entry is parsed with ``RucioStac`` and
    supplies the DID name, scheme, size and checksum used in the Rucio calls.
    The DID scope is always the account of the Rucio client (``rucio.account``).
    """

    def __init__(self, target):
        """Create the Rucio clients and remember the asset key to work on.

        Args:
            target: Key of the STAC asset that holds the ``rucio:config`` entry.
        """
        self.rucio = Client()
        # The result is discarded; presumably this surfaces authentication
        # problems at construction time rather than on first use -- confirm.
        self.rucio.whoami()
        self.downloader = DownloadClient()
        # TODO Should handle a list of targets
        self.target = target

    def _ensure_port(self, href: str, port: int = 443):
        """Return ``href`` with an explicit port in its network location.

        Args:
            href: URL to normalise.
            port: Port to add when ``href`` does not already name one.

        Returns:
            ``href`` unchanged when it already carries a port, otherwise the
            same URL with ``:port`` appended to the host.
        """
        res = urlparse(href)
        if res.port is not None:
            # Appending again would produce an invalid "host:1234:443" netloc.
            return href

        return urlunparse(res._replace(netloc=f"{res.netloc}:{port}"))

    def _get_existing_replicas(self, item: dict, rse: str = None):
        """List the Rucio replicas of the target asset of a STAC item.

        Args:
            item: STAC item in dictionary form.
            rse: Optional RSE expression restricting the lookup; ``None``
                queries without a restriction.

        Returns:
            The replica records returned by Rucio, or an empty list when the
            first record has no entries under ``"rses"`` (the DID is known,
            but not for the requested RSE expression).
        """
        config = RucioStac(**item["assets"][self.target]["rucio:config"])

        replicas = list(
            self.rucio.list_replicas(
                dids=[{"scope": self.rucio.account, "name": config.name}],
                schemes=[config.scheme],
                rse_expression=rse,
            )
        )

        if replicas and not replicas[0]["rses"]:
            # Replicas found, but not for the rse expression
            return []

        return replicas

    def rucio_item(self, items: dict):
        """Extend the assets of each STAC item with the files available from Rucio.

        The items are modified in place: one asset per RSE holding a replica is
        added, keyed by the RSE name.

        Args:
            items: Dictionary with a ``"features"`` list of STAC item dicts.
        """
        for item in items["features"]:
            # No RSE restriction here: every location holding a replica
            # becomes its own asset.
            for replica in self._get_existing_replicas(item):
                for key, value in replica["rses"].items():
                    # TODO For multiple targets, link asset to original asset.
                    item["assets"][key] = Asset(
                        href=value[0], title=f"Rucio Storage Element: {key}"
                    ).to_dict()

    def create_replicas(self, items: list, src_rse: str):
        """Register replicas for STAC items that have none yet at ``src_rse``.

        Args:
            items: STAC items (objects providing ``to_dict()``).
            src_rse: RSE at which the replicas are registered.

        Returns:
            ``True`` once all items have been processed.
        """
        for item in items:
            # TODO Put this into its own function and add exception handling for existing replicas.
            tmp = item.to_dict()

            if self._get_existing_replicas(tmp, src_rse):
                continue

            asset = tmp["assets"][self.target]
            config = RucioStac(**asset["rucio:config"])
            self.rucio.add_replica(
                rse=src_rse,
                scope=self.rucio.account,
                name=config.name,
                pfn=self._ensure_port(asset["href"]),
                bytes_=config.size,
                adler32=config.adler32,
            )

        return True

    def create_replication_rules(self, items: list, src_rse: str, dst_rse: str):
        """Create rules replicating the STAC items' files to ``dst_rse``.

        Args:
            items: STAC items (objects providing ``to_dict()``).
            src_rse: RSE the data currently lives at. Kept for interface
                compatibility; it is not passed to Rucio.
            dst_rse: RSE expression the replication rule targets.

        Returns:
            ``True`` once all items have been processed.
        """
        for item in items:
            # TODO Put this into its own function and add exception handling for existing replication rules.
            tmp = item.to_dict()

            # NOTE(review): the check is made against the destination. Testing
            # the source RSE would skip exactly the items that were just
            # registered there, so no rule would ever be created -- confirm.
            if self._get_existing_replicas(tmp, dst_rse):
                continue

            config = RucioStac(**tmp["assets"][self.target]["rucio:config"])
            self.rucio.add_replication_rule(
                dids=[{"scope": self.rucio.account, "name": config.name}],
                copies=1,
                rse_expression=dst_rse,
            )

        return True

    def replication_availability(self, items: list, rse: str):
        """Count the replication rules of the given items by rule state.

        Rules are matched to items by comparing the rule name with the item id.

        Args:
            items: STAC items (objects with an ``id`` attribute).
            rse: Currently unused; rules are not filtered by RSE.

        Returns:
            Dictionary with the number of rules in state ``"OK"``, ``"STUCK"``
            and ``"REPLICATING"``, plus ``"Total"`` for all matching rules
            regardless of state.
        """
        item_ids = {item.id for item in items}
        counts = {"OK": 0, "STUCK": 0, "REPLICATING": 0, "Total": 0}

        for rule in self.rucio.list_replication_rules(
            filters={"scope": self.rucio.account}
        ):
            if rule["name"] not in item_ids:
                continue
            counts["Total"] += 1
            if rule["state"] in ("OK", "STUCK", "REPLICATING"):
                counts[rule["state"]] += 1

        return counts

    def download(self, item, rse):
        """Download the target file of a STAC item from a specific RSE.

        Args:
            item: STAC item whose assets include one keyed by ``rse``; that
                asset's ``href`` is used as the physical file name.
            rse: Name of the RSE to download from.
        """
        config = RucioStac(**item.assets[self.target].extra_fields["rucio:config"])

        self.downloader.download_dids(
            items=[
                {
                    "did": f"{self.rucio.account}:{config.name}",
                    "rse": rse,
                    "pfn": item.assets[rse].href,
                }
            ]
        )

    def get_available_names(self, items: list, rse: str):
        """Return the names of the items' replication rules that are in state ``"OK"``.

        Args:
            items: STAC items (objects with an ``id`` attribute).
            rse: Currently unused; rules are not filtered by RSE.

        Returns:
            List of rule names, in the order Rucio returns the rules.
        """
        item_ids = {item.id for item in items}

        return [
            rule["name"]
            for rule in self.rucio.list_replication_rules(
                filters={"scope": self.rucio.account}
            )
            if rule["name"] in item_ids and rule["state"] == "OK"
        ]

    def download_available(self, items: list, rse: str):
        """Download every item whose replication rule is in state ``"OK"``.

        Args:
            items: STAC items to consider.
            rse: RSE to download from.

        Returns:
            ``True`` once all available items have been downloaded.
        """
        available_names = set(self.get_available_names(items, rse))
        available_items = [item for item in items if item.id in available_names]

        print("Available: ", len(available_items))
        for item in available_items:
            self.download(item, rse)

        return True
2 changes: 2 additions & 0 deletions stac_rucio/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ class RucioStac(BaseModel):
scope: str = Field(alias="rucio:scope")
name: str = Field(alias="rucio:name")
scheme: str = Field(alias="rucio:scheme", default="https")
size: int = Field(alias="rucio:size")
adler32: str = Field(alias="rucio:adler32")

0 comments on commit 584d827

Please sign in to comment.