Skip to content

Commit

Permalink
Merge pull request #448 from hotosm/develop
Browse files Browse the repository at this point in the history
Production Release
  • Loading branch information
nrjadkry authored Jan 25, 2025
2 parents d09300d + 85e9901 commit 3d627f1
Show file tree
Hide file tree
Showing 16 changed files with 1,276 additions and 57 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ repos:

# Deps: ensure Python uv lockfile is up to date
- repo: https://github.com/astral-sh/uv-pre-commit
rev: 0.5.13
rev: 0.5.21
hooks:
- id: uv-lock
files: src/backend/pyproject.toml
Expand All @@ -95,7 +95,7 @@ repos:
# Lint / autoformat: Python code
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: "v0.9.1"
rev: "v0.9.2"
hooks:
# Run the linter
- id: ruff
Expand Down
81 changes: 55 additions & 26 deletions src/backend/app/projects/image_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@
import uuid
import tempfile
import shutil
import asyncio
import aiohttp
from pathlib import Path
from app.tasks import task_logic
from app.models.enums import State, ImageProcessingStatus
from app.utils import timestamp
from app.db import database
from app.projects import project_logic
from pyodm import Node
from app.s3 import get_file_from_bucket, list_objects_from_bucket, add_file_to_bucket
from app.s3 import (
get_file_from_bucket,
list_objects_from_bucket,
add_file_to_bucket,
get_presigned_url,
)
from loguru import logger as log
from concurrent.futures import ThreadPoolExecutor
from psycopg import Connection
from asgiref.sync import async_to_sync
from app.config import settings
Expand Down Expand Up @@ -46,6 +52,7 @@ def __init__(
self.db = db
self.task_id = task_id
self.task_ids = task_ids or ([] if task_id is None else [task_id])
self.max_concurrent_downloads = 4

def options_list_to_dict(
self, options: List[Dict[str, Any]] = None
Expand Down Expand Up @@ -77,30 +84,31 @@ def list_images(self, directory: str) -> List[str]:
images.append(str(file))
return images

def download_object(self, bucket_name: str, obj, images_folder: str):
"""
Download an object from the bucket if it's an image or related file.
async def download_image(self, session, url, save_path):
    """
    Download a single object over HTTP and write it to disk.

    :param session: An open aiohttp.ClientSession used for the request.
    :param url: Fully qualified (public or pre-signed) URL of the object.
    :param save_path: Local filesystem path to write the response body to.
    :return: True when the file was written, False on any failure.
    """
    try:
        async with session.get(url) as response:
            # Guard clause: anything but a plain 200 is treated as a failure.
            if response.status != 200:
                log.error(f"Failed to download {url}, status: {response.status}")
                return False
            payload = await response.read()
            # NOTE: the write itself is synchronous; fine for image-sized
            # payloads, but it does briefly block the event loop.
            with open(save_path, "wb") as f:
                f.write(payload)
            return True
    except Exception as e:
        # Best effort: one failed image must not abort the whole batch,
        # so log and report failure instead of propagating.
        log.error(f"Error downloading {url}: {e}")
        return False

:param bucket_name: Bucket name
:param obj: Object to download
:param images_folder: Destination folder
"""
if obj.object_name.endswith(
(".jpg", ".jpeg", ".JPG", ".png", ".PNG", ".txt", ".laz")
):
local_path = Path(images_folder) / Path(obj.object_name).name
local_path.parent.mkdir(parents=True, exist_ok=True)
get_file_from_bucket(bucket_name, obj.object_name, local_path)

def download_images_from_s3(
self, bucket_name: str, local_dir: str, task_id: Optional[uuid.UUID] = None
async def download_images_from_s3(
    self,
    bucket_name: str,
    local_dir: str,
    task_id: Optional[uuid.UUID] = None,
    batch_size: int = 20,
):
    """
    Asynchronously download images from S3 in batches.

    :param bucket_name: Bucket name
    :param local_dir: Local directory to save images
    :param task_id: Optional specific task ID; when None, the whole
        project prefix is downloaded.
    :param batch_size: Number of images to download concurrently
    """
    # NOTE(review): the else-branch of this conditional was folded out of
    # the diff view; the project-wide prefix is reconstructed from the
    # docstring ("a specific task or project") -- confirm against the repo.
    prefix = (
        f"dtm-data/projects/{self.project_id}/{task_id}"
        if task_id
        else f"dtm-data/projects/{self.project_id}"
    )
    objects = list_objects_from_bucket(bucket_name, prefix)

    if not objects:
        log.warning(f"No images found in S3 for task {task_id}")
        return

    log.info(f"Downloading images from S3 for task {task_id}...")

    s3_download_url = settings.S3_DOWNLOAD_ROOT
    if s3_download_url:
        object_urls = [f"{s3_download_url}/{obj.object_name}" for obj in objects]
    else:
        # No public download root configured: generate a pre-signed URL
        # (12 h expiry) for each object instead.
        object_urls = [
            get_presigned_url(bucket_name, obj.object_name, 12) for obj in objects
        ]

    # Save each object under its ORIGINAL basename. The previous
    # f"file_{i + 1}.jpg" scheme had two bugs: the inner enumerate index
    # shadowed the batch offset, so every batch restarted at file_1 and
    # overwrote the previous batch's files; and it renamed every object
    # to .jpg, clobbering .png/.txt (GCP lists)/.laz artifacts.
    save_paths = [
        os.path.join(local_dir, Path(obj.object_name).name) for obj in objects
    ]

    async with aiohttp.ClientSession() as session:
        # Process in slices of `batch_size` to bound concurrent connections.
        for start in range(0, len(object_urls), batch_size):
            tasks = [
                self.download_image(session, url, path)
                for url, path in zip(
                    object_urls[start : start + batch_size],
                    save_paths[start : start + batch_size],
                )
            ]
            await asyncio.gather(*tasks)

def process_new_task(
self,
Expand Down Expand Up @@ -163,7 +192,7 @@ async def _process_images(
images_list = []
# Download images based on single or multiple task processing
if single_task: # and self.task_id:
self.download_images_from_s3(bucket_name, temp_dir, self.task_id)
await self.download_images_from_s3(bucket_name, temp_dir, self.task_id)
images_list = self.list_images(temp_dir)
else:
gcp_list_file = f"dtm-data/projects/{self.project_id}/gcp/gcp_list.txt"
Expand All @@ -178,7 +207,7 @@ async def _process_images(
)

for task_id in self.task_ids:
self.download_images_from_s3(bucket_name, temp_dir, task_id)
await self.download_images_from_s3(bucket_name, temp_dir, task_id)
images_list.extend(self.list_images(temp_dir))

# Start a new processing task
Expand Down
4 changes: 2 additions & 2 deletions src/backend/app/projects/project_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async def odm_webhook_for_processing_whole_project(
node_odm_url=settings.NODE_ODM_URL,
dtm_project_id=dtm_project_id,
odm_task_id=odm_task_id,
odm_status=status["code"],
odm_status_code=status["code"],
)

return {"message": "Webhook received", "task_id": dtm_project_id}
Expand Down Expand Up @@ -542,7 +542,7 @@ async def odm_webhook_for_processing_a_single_task(
message="Task completed.",
dtm_task_id=dtm_task_id,
dtm_user_id=dtm_user_id,
odm_status=40,
odm_status_code=40,
)

elif status["code"] == 30 and state_value != State.IMAGE_PROCESSING_FAILED:
Expand Down
4 changes: 3 additions & 1 deletion src/backend/app/waypoints/waypoint_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async def get_task_waypoint(
task_id: uuid.UUID,
download: bool = True,
mode: FlightMode = FlightMode.waylines,
rotation_angle: float = 0,
take_off_point: waypoint_schemas.PointField = None,
):
"""
Expand All @@ -65,6 +66,7 @@ async def get_task_waypoint(
If `download` is True, returns a KMZ file as a download response.
"""

rotation_angle = 360 - rotation_angle
task_geojson = await get_task_geojson(db, task_id)
project = await project_deps.get_project_by_id(project_id, db)

Expand Down Expand Up @@ -119,7 +121,7 @@ async def get_task_waypoint(
"gsd": gsd,
"forward_overlap": forward_overlap,
"side_overlap": side_overlap,
"rotation_angle": 0,
"rotation_angle": rotation_angle,
"generate_3d": generate_3d,
"take_off_point": take_off_point,
}
Expand Down
3 changes: 2 additions & 1 deletion src/backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ dependencies = [
"Jinja2>=3.1.4",
"numpy==1.26.4",
"GDAL==3.6.2",
"aiohttp>=3.11.11",
"aiosmtplib>=3.0.1",
"python-slugify>=8.0.4",
"psycopg2>=2.9.9",
"pyodm>=1.5.11",
"asgiref>=3.8.1",
"bcrypt>=4.2.1",
"drone-flightplan>=0.3.4rc2",
"drone-flightplan>=0.3.4",
"Scrapy==2.12.0",
"asgi-lifespan>=2.1.0",
]
Expand Down
Loading

0 comments on commit 3d627f1

Please sign in to comment.