diff --git a/.dockerignore b/.dockerignore index df40d809..c7168f07 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,13 +1,12 @@ *~ *# *.swp -**/*.iml +**/*Dockerfile **/*.DS_Store **/*.tox **/*.idea **/*venv **/__pycache__/ -**/*.py[cod] **/*$py.class **/*.egg-info/ **/.coverage @@ -17,5 +16,4 @@ **/.mypy_cache **/doc/_apidoc/ **/build -**/dist **/htmlcov diff --git a/.github/workflows/python-tox.yml b/.github/workflows/python-tox.yml index 469027ca..82f17e69 100644 --- a/.github/workflows/python-tox.yml +++ b/.github/workflows/python-tox.yml @@ -13,10 +13,10 @@ jobs: - uses: actions/checkout@v4 with: lfs: 'true' - - name: Set up Python 3.10 - uses: actions/setup-python@v3 + - name: Set up Python 3.11 + uses: actions/setup-python@v5 with: - python-version: "3.10" + python-version: "3.11" - name: Install dependencies run: | python -m pip install --upgrade pip tox diff --git a/Dockerfile b/Dockerfile index f3228330..b3e15319 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,48 +1,64 @@ -FROM public.ecr.aws/amazonlinux/amazonlinux:2023 as model_runner +# Stage 1: Build environment +FROM public.ecr.aws/amazonlinux/amazonlinux:2023-minimal as build -# only override if you're using a mirror with a cert pulled in using cert-base as a build parameter +# Set up build arguments and environment variables ARG BUILD_CERT=/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem ARG PIP_INSTALL_LOCATION=https://pypi.org/simple/ +ARG MINICONDA_VERSION=Miniconda3-latest-Linux-x86_64 +ARG MINICONDA_URL=https://repo.anaconda.com/miniconda/${MINICONDA_VERSION}.sh +ENV PATH=/opt/conda/bin:$PATH +ENV CONDA_TARGET_ENV=osml_model_runner -# give sudo permissions -USER root - -# set working directory to home +# Set working directory WORKDIR /home -# configure, update, and refresh yum enviornment -RUN yum update -y && yum clean all && yum makecache && yum install -y wget shadow-utils +# Install necessary packages +USER root +RUN dnf update -y && dnf install -y wget shadow-utils gcc && dnf clean all + +# Install Miniconda +RUN wget -c ${MINICONDA_URL} && \ + chmod +x ${MINICONDA_VERSION}.sh && \ + ./${MINICONDA_VERSION}.sh -b -f -p /opt/conda && \ + rm ${MINICONDA_VERSION}.sh && \ + ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh + +# Copy the conda environment file +COPY environment-py311.yml environment.yml + +# Create the conda environment and remove additional unnecessary files +RUN conda env create -f environment.yml \ + && conda clean -afy \ + && find /opt/conda/ -follow -type f -name '*.a' -delete \ + && find /opt/conda/ -follow -type f -name '*.pyc' -delete \ + && find /opt/conda/ -follow -type f -name '*.js.map' -delete + +# Copy the application source code +COPY . osml-model-runner -# install miniconda -ARG MINICONDA_VERSION=Miniconda3-latest-Linux-x86_64 -ARG MINICONDA_URL=https://repo.anaconda.com/miniconda/${MINICONDA_VERSION}.sh -RUN wget -c ${MINICONDA_URL} \ - && chmod +x ${MINICONDA_VERSION}.sh \ - && ./${MINICONDA_VERSION}.sh -b -f -p /opt/conda \ - && rm ${MINICONDA_VERSION}.sh \ - && ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh +# Install the model runner application +RUN . /opt/conda/etc/profile.d/conda.sh && \ + conda activate ${CONDA_TARGET_ENV} && \ + python3 -m pip install osml-model-runner/. 
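Note on the new build stage above: all compilation and conda setup now happen in the throwaway `build` image, and only `/opt/conda` plus the installed application are copied into the runtime stage below. A quick interpreter check along these lines (hypothetical, not part of this change) can confirm inside the final image that the pins from environment-py311.yml survived the copy:

```python
# Hypothetical smoke test; run inside the container (e.g. via /entry.sh).
# Expected versions come from environment-py311.yml in this change.
import sys

from osgeo import gdal

assert sys.version_info[:2] == (3, 11), f"unexpected interpreter: {sys.version}"
print("python:", sys.version.split()[0])            # expect 3.11.6
print("gdal:", gdal.VersionInfo("RELEASE_NAME"))    # expect 3.8.3
```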
-# add conda to the path so we can execute it by name -ENV PATH=/opt/conda/bin:$PATH +# Stage 2: Runtime environment +FROM public.ecr.aws/amazonlinux/amazonlinux:2023-minimal as model_runner -# set all the ENV vars needed for build +# Set up runtime environment variables ENV CONDA_TARGET_ENV=osml_model_runner -ENV CC="clang" -ENV CXX="clang++" -ENV ARCHFLAGS="-arch x86_64" -ENV LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/opt/conda/lib/:/opt/conda/bin:/usr/include:/usr/local/" -ENV PROJ_LIB=/opt/conda/share/proj - -# copy our conda env configuration for Python 3.10 -COPY environment-py310.yml environment.yml - -# create the conda env -RUN conda env create - -# create /entry.sh which will be our new shell entry point -# this performs actions to configure the environment -# before starting a new shell (which inherits the env). -# the exec is important as this allows signals to passpw +ENV PATH=/opt/conda/bin:$PATH + +# Set working directory +WORKDIR /home + +# Copy the conda environment from the build stage +COPY --from=build /opt/conda /opt/conda +RUN ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh + +# Copy the application from the build stage +COPY --from=build /home/osml-model-runner /home/osml-model-runner + +# Create entrypoint script RUN (echo '#!/bin/bash' \ && echo '__conda_setup="$(/opt/conda/bin/conda shell.bash hook 2> /dev/null)"' \ && echo 'eval "$__conda_setup"' \ @@ -51,29 +67,12 @@ RUN (echo '#!/bin/bash' \ && echo 'exec "$@"'\ ) >> /entry.sh && chmod +x /entry.sh -# tell the docker build process to use this for RUN. -# the default shell on Linux is ["/bin/sh", "-c"], and on Windows is ["cmd", "/S", "/C"] -SHELL ["/entry.sh", "/bin/bash", "-c"] - -# configure .bashrc to drop into a conda env and immediately activate our TARGET env -RUN conda init && echo 'conda activate "${CONDA_TARGET_ENV:-base}"' >> ~/.bashrc - -# copy our lcoal application source into the container -COPY . osml-model-runner - -# install the model runner application from source -RUN python3 -m pip install osml-model-runner/. - -# clean up the conda install -RUN conda clean -afy - -# set up a health check at that port -HEALTHCHECK NONE +# Configure user and permissions +RUN dnf install -y shadow-utils +RUN adduser modelrunner && \ + chown -R modelrunner:modelrunner ./ -# set up a user to run the container as and assume it -RUN adduser modelrunner -RUN chown -R modelrunner:modelrunner ./ USER modelrunner -# set the entry point script +# Set entry point ENTRYPOINT ["/entry.sh", "/bin/bash", "-c", "python3 osml-model-runner/bin/oversightml-mr-entry-point.py"] diff --git a/bin/oversightml-mr-entry-point.py b/bin/oversightml-mr-entry-point.py index 3278c756..659658f1 100644 --- a/bin/oversightml-mr-entry-point.py +++ b/bin/oversightml-mr-entry-point.py @@ -9,8 +9,10 @@ from typing import Optional from codeguru_profiler_agent import Profiler +from pythonjsonlogger import jsonlogger from aws.osml.model_runner.app import ModelRunner +from aws.osml.model_runner.common import ThreadingLocalContextFilter def handler_stop_signals(signal_num: int, frame: Optional[FrameType], model_runner: ModelRunner) -> None: @@ -18,6 +20,12 @@ def handler_stop_signals(signal_num: int, frame: Optional[FrameType], model_runn def configure_logging(verbose: bool) -> None: + """ + This function configures the Python logging module to use a JSON formatter with thread-local context + variables. + + :param verbose: if true, the logging level will be set to DEBUG, otherwise it will be set to INFO.
+ """ logging_level = logging.INFO if verbose: logging_level = logging.DEBUG @@ -27,7 +35,10 @@ def configure_logging(verbose: bool) -> None: ch = logging.StreamHandler() ch.setLevel(logging_level) - formatter = logging.Formatter("%(levelname)-8s %(message)s") + ch.addFilter(ThreadingLocalContextFilter(["job_id", "image_id"])) + formatter = jsonlogger.JsonFormatter( + fmt="%(levelname)s %(message)s %(job_id)s %(image_id)s", datefmt="%Y-%m-%dT%H:%M:%S" + ) ch.setFormatter(formatter) root_logger.addHandler(ch) diff --git a/environment-py310.yml b/environment-py310.yml deleted file mode 100644 index 4dc46099..00000000 --- a/environment-py310.yml +++ /dev/null @@ -1,7 +0,0 @@ -name: osml_model_runner -channels: - - conda-forge -dependencies: - - conda-forge::python=3.10.12 - - conda-forge::gdal=3.7.0 - - conda-forge::proj=9.2.1 diff --git a/environment-py311.yml b/environment-py311.yml new file mode 100644 index 00000000..4cbe0965 --- /dev/null +++ b/environment-py311.yml @@ -0,0 +1,7 @@ +name: osml_model_runner +channels: + - conda-forge +dependencies: + - conda-forge::python=3.11.6 + - conda-forge::gdal=3.8.3 + - conda-forge::proj=9.3.1 diff --git a/environment.yml b/environment.yml index 72b64358..ecab725e 100644 --- a/environment.yml +++ b/environment.yml @@ -2,5 +2,5 @@ name: osml_model_runner channels: - conda-forge dependencies: - - conda-forge::gdal=3.7.0 - - conda-forge::proj=9.2.1 + - conda-forge::gdal=3.8.3 + - conda-forge::proj=9.3.1 diff --git a/setup.cfg b/setup.cfg index 90e00eb4..f4b2b0c8 100755 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = osml-model-runner -version = 1.3.0 +version = 1.4.0 description = Application to run large scale imagery against AI/ML models long_description = file: README.md long_description_content_type = text/markdown @@ -40,20 +40,21 @@ include_package_data = True install_requires = osml-imagery-toolkit>=1.2.0 - numpy>=1.23.0 - shapely>=1.8.5 - aws-embedded-metrics==3.1.0 - boto3==1.28.1 - botocore==1.31.1 - setuptools==68.0.0 - cachetools==5.3.0 - geojson>=3.0.0 + numpy>=1.24.4 + shapely>=2.0.2 + aws-embedded-metrics==3.2.0 + python-json-logger>=2.0.0 + boto3==1.34.28 + botocore==1.34.28 + setuptools==69.0.3 + cachetools==5.3.2 + geojson>=3.1.0 scikit-optimize>=0.9.0 - pyproj>=3.6.0 + pyproj>=3.6.1 scipy==1.9.1;python_version<'3.11.0' - scipy==1.11.0;python_version>='3.11' + scipy==1.12.0;python_version>='3.11' argparse==1.4.0 - dacite==1.8.0 + dacite==1.8.1 ensemble-boxes==1.0.9 codeguru-profiler-agent==1.2.4 defusedxml>=0.7.1 @@ -70,6 +71,6 @@ package_data = [options.extras_require] gdal = - gdal>=3.7.0 + gdal>=3.8.3 test = tox diff --git a/src/aws/osml/model_runner/app.py b/src/aws/osml/model_runner/app.py index 1e7370bf..9ed46f8e 100755 --- a/src/aws/osml/model_runner/app.py +++ b/src/aws/osml/model_runner/app.py @@ -1,12 +1,15 @@ # Copyright 2023 Amazon.com, Inc. or its affiliates. 
import ast +import functools import json import logging +import math from dataclasses import asdict from datetime import datetime from decimal import Decimal from json import dumps +from math import degrees from secrets import token_hex from typing import Any, Dict, List, Optional, Tuple @@ -25,7 +28,7 @@ load_gdal_dataset, set_gdal_default_configuration, ) -from aws.osml.photogrammetry import DigitalElevationModel, ElevationModel, SensorModel, SRTMTileSet +from aws.osml.photogrammetry import DigitalElevationModel, ElevationModel, ImageCoordinate, SensorModel, SRTMTileSet from .api import VALID_MODEL_HOSTING_OPTIONS, ImageRequest, InvalidImageRequestException, RegionRequest, SinkMode from .app_config import MetricLabels, ServiceConfig @@ -37,6 +40,7 @@ ImageRegion, ImageRequestStatus, RegionRequestStatus, + ThreadingLocalContextFilter, Timer, build_embedded_metrics_config, get_credentials_for_assumed_role, @@ -62,7 +66,7 @@ from .tile_worker import generate_crops, process_tiles, setup_tile_workers # Set up metrics configuration -Config = build_embedded_metrics_config() +build_embedded_metrics_config() # Set up logging configuration logger = logging.getLogger(__name__) @@ -122,8 +126,7 @@ def create_elevation_model() -> Optional[ElevationModel]: return None - @metric_scope - def monitor_work_queues(self, metrics: MetricsLogger = None) -> None: + def monitor_work_queues(self) -> None: """ Monitors SQS queues for ImageRequest and RegionRequest The region work queue is checked first and will wait for up to 10 seconds to start work. Only if no regions need to be processed in that time will this worker check to @@ -141,6 +144,7 @@ def monitor_work_queues(self, metrics: MetricsLogger = None) -> None: while self.running: logger.debug("Checking work queue for regions to process ...") (receipt_handle, region_request_attributes) = next(self.region_requests_iter) + ThreadingLocalContextFilter.set_context(region_request_attributes) # If we found a region request on the queue if region_request_attributes is not None: @@ -158,6 +162,7 @@ def monitor_work_queues(self, metrics: MetricsLogger = None) -> None: # Load the image into a GDAL dataset raster_dataset, sensor_model = load_gdal_dataset(image_path) + image_format = str(raster_dataset.GetDriver().ShortName).upper() # Get RegionRequestItem if not create new RegionRequestItem region_request_item = self.region_request_table.get_region_request( @@ -186,7 +191,7 @@ def monitor_work_queues(self, metrics: MetricsLogger = None) -> None: # Check if the image is complete if self.job_table.is_image_request_complete(image_request_item): # If so complete the image request - self.complete_image_request(region_request) + self.complete_image_request(region_request, image_format) # Update the queue self.region_request_queue.finish_request(receipt_handle) @@ -210,6 +215,7 @@ def monitor_work_queues(self, metrics: MetricsLogger = None) -> None: try: # Parse the message into a working ImageRequest image_request = ImageRequest.from_external_message(image_request_message) + ThreadingLocalContextFilter.set_context(image_request.__dict__) # Check that our image request looks good if not image_request.is_valid(): @@ -235,14 +241,13 @@ def monitor_work_queues(self, metrics: MetricsLogger = None) -> None: job_arn=min_job_arn, processing_time=Decimal(0), ) - self.fail_image_request_send_messages(minimal_job_item, err, metrics) + self.fail_image_request_send_messages(minimal_job_item, err) self.image_request_queue.finish_request(receipt_handle) finally: # If we stop monitoring 
the queue set run state to false self.running = False - @metric_scope - def process_image_request(self, image_request: ImageRequest, metrics: MetricsLogger = None) -> None: + def process_image_request(self, image_request: ImageRequest) -> None: """ Processes ImageRequest objects that are picked up from queue. Loads the specified image into memory to be chipped apart into regions and sent downstream for processing via RegionRequest. This will also process the @@ -251,12 +256,9 @@ def process_image_request(self, image_request: ImageRequest, metrics: MetricsLog other workers in this cluster. :param image_request: ImageRequest = the image request derived from the ImageRequest SQS message - :param metrics: MetricsLogger = the metrics logger to use to report metrics. :return: None """ - if isinstance(metrics, MetricsLogger): - metrics.set_dimensions() image_request_item = None try: if ServiceConfig.self_throttling: @@ -292,11 +294,11 @@ def process_image_request(self, image_request: ImageRequest, metrics: MetricsLog self.status_monitor.process_event(image_request_item, ImageRequestStatus.STARTED, "Started image request") # Check we have a valid image request, throws if not - self.validate_model_hosting(image_request_item, metrics) + self.validate_model_hosting(image_request_item) # Load the relevant image meta data into memory image_extension, raster_dataset, sensor_model, all_regions = self.load_image_request( - image_request_item, image_request.roi, metrics + image_request_item, image_request.roi ) if sensor_model is None: @@ -312,7 +314,7 @@ def process_image_request(self, image_request: ImageRequest, metrics: MetricsLog image_request_item.width = Decimal(raster_dataset.RasterXSize) image_request_item.height = Decimal(raster_dataset.RasterYSize) try: - image_request_item.extents = json.dumps(ModelRunner.get_extents(raster_dataset)) + image_request_item.extents = json.dumps(ModelRunner.get_extents(raster_dataset, sensor_model)) except Exception as e: logger.warning(f"Could not get extents for image: {image_request_item.image_id}") logger.exception(e) @@ -339,7 +341,7 @@ def process_image_request(self, image_request: ImageRequest, metrics: MetricsLog except Exception as err: # We failed try and gracefully update our image request if image_request_item: - self.fail_image_request(image_request_item, err, metrics) + self.fail_image_request(image_request_item, err) else: minimal_job_item = JobItem( image_id=image_request.image_id, @@ -347,7 +349,7 @@ def process_image_request(self, image_request: ImageRequest, metrics: MetricsLog job_arn=image_request.job_arn, processing_time=Decimal(0), ) - self.fail_image_request(minimal_job_item, err, metrics) + self.fail_image_request(minimal_job_item, err) # Let the application know that we failed to process image raise ProcessImageException("Failed to process image region!") from err @@ -434,7 +436,8 @@ def queue_region_request( # If the image is finished then complete it if self.job_table.is_image_request_complete(image_request_item): - self.complete_image_request(first_region_request) + image_format = str(raster_dataset.GetDriver().ShortName).upper() + self.complete_image_request(first_region_request, image_format) @metric_scope def process_region_request( @@ -463,13 +466,17 @@ def process_region_request( if not region_request.is_valid(): logger.error("Invalid Region Request! 
{}".format(region_request.__dict__)) - if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.INVALID_REQUEST, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.REGION_PROCESSING_ERROR, 1, str(Unit.COUNT.value)) raise ValueError("Invalid Region Request") if isinstance(metrics, MetricsLogger): - metrics.put_dimensions({"ImageFormat": region_request.image_extension}) + image_format = str(raster_dataset.GetDriver().ShortName).upper() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.REGION_PROCESSING_OPERATION, + MetricLabels.MODEL_NAME_DIMENSION: region_request.model_name, + MetricLabels.INPUT_FORMAT_DIMENSION: image_format, + } + ) if ServiceConfig.self_throttling: max_regions = self.endpoint_utils.calculate_max_regions( @@ -481,7 +488,7 @@ def process_region_request( if in_progress >= max_regions: if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.REGIONS_SELF_THROTTLED, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.THROTTLES, 1, str(Unit.COUNT.value)) logger.info("Throttling region request. (Max: {} In-progress: {}".format(max_regions, in_progress)) raise SelfThrottledRegionException @@ -491,16 +498,16 @@ def process_region_request( try: with Timer( task_str="Processing region {} {}".format(region_request.image_url, region_request.region_bounds), - metric_name=MetricLabels.REGION_LATENCY, + metric_name=MetricLabels.DURATION, logger=logger, metrics_logger=metrics, ): # Set up our threaded tile worker pool - tile_queue, tile_workers = setup_tile_workers(region_request, sensor_model, self.elevation_model, metrics) + tile_queue, tile_workers = setup_tile_workers(region_request, sensor_model, self.elevation_model) # Process all our tiles total_tile_count, tile_error_count = process_tiles( - region_request, tile_queue, tile_workers, raster_dataset, metrics, sensor_model + region_request, tile_queue, tile_workers, raster_dataset, sensor_model ) # Update table w/ total tile counts @@ -517,8 +524,8 @@ def process_region_request( # Write CloudWatch Metrics to the Logs if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.REGIONS_PROCESSED, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.TILES_PROCESSED, total_tile_count, str(Unit.COUNT.value)) + # TODO: Consider adding the +1 invocation to timer + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) # Return the updated item return image_request_item @@ -538,7 +545,6 @@ def process_region_request( def load_image_request( image_request_item: JobItem, roi: shapely.geometry.base.BaseGeometry, - metrics: MetricsLogger = None, ) -> Tuple[str, Dataset, Optional[SensorModel], List[ImageRegion]]: """ Loads the required image file metadata into memory to be chipped apart into regions and @@ -546,7 +552,6 @@ def load_image_request( :param image_request_item: JobItem = the region request to update. :param roi: BaseGeometry = the region of interest shape - :param metrics: MetricsLogger = the metrics logger to use to report metrics. 
:return: Tuple[Queue, List[TileWorker]: A list of tile workers and the queue that manages them """ @@ -563,12 +568,9 @@ def load_image_request( with GDALConfigEnv().with_aws_credentials(assumed_credentials): # Use GDAL to access the dataset and geo positioning metadata if not image_request_item.image_url: - if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.NO_IMAGE_URL, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.IMAGE_PROCESSING_ERROR, 1, str(Unit.COUNT.value)) raise InvalidImageURLException("No image URL specified. Image URL is required.") - # If the image request have a valid s3 image url, otherwise this is a local file + # If the image request has a valid s3 image url, otherwise this is a local file if "s3:/" in image_request_item.image_url: # Validate that image exists in S3 ImageRequest.validate_image_path(image_request_item.image_url, image_request_item.image_read_role) @@ -579,21 +581,16 @@ def load_image_request( # Use gdal to load the image url we were given raster_dataset, sensor_model = load_gdal_dataset(image_path) - if isinstance(metrics, MetricsLogger): - image_extension = get_image_extension(image_path) - metrics.put_dimensions({"ImageFormat": image_extension}) + image_extension = get_image_extension(image_path) # Determine how much of this image should be processed. # Bounds are: UL corner (row, column) , dimensions (w, h) processing_bounds = calculate_processing_bounds(raster_dataset, roi, sensor_model) if not processing_bounds: logger.info("Requested ROI does not intersect image. Nothing to do") - if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.INVALID_ROI, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.IMAGE_PROCESSING_ERROR, 1, str(Unit.COUNT.value)) raise LoadImageException("Failed to create processing bounds for image!") else: - # Calculate a set of ML engine sized regions that we need to process for this image + # Calculate a set of ML engine-sized regions that we need to process for this image # Region size chosen to break large images into pieces that can be handled by a # single tile worker region_size: ImageDimensions = ast.literal_eval(ServiceConfig.region_size) @@ -606,49 +603,37 @@ def load_image_request( return image_extension, raster_dataset, sensor_model, all_regions - def fail_image_request(self, image_request_item: JobItem, err: Exception, metrics: MetricsLogger = None) -> None: + def fail_image_request(self, image_request_item: JobItem, err: Exception) -> None: """ Handles failure events/exceptions for image requests and tries to update the status monitor accordingly :param image_request_item: JobItem = the image request that failed. :param err: Exception = the exception that caused the failure - :param metrics: MetricsLogger = the metrics logger to use to report metrics. :return: None """ - self.fail_image_request_send_messages(image_request_item, err, metrics) + self.fail_image_request_send_messages(image_request_item, err) self.job_table.end_image_request(image_request_item.image_id) - def fail_image_request_send_messages( - self, image_request_item: JobItem, err: Exception, metrics: MetricsLogger = None - ) -> None: + def fail_image_request_send_messages(self, image_request_item: JobItem, err: Exception) -> None: """ Updates failed metrics and update the status monitor accordingly :param image_request_item: JobItem = the image request that failed. 
:param err: Exception = the exception that caused the failure - :param metrics: MetricsLogger = the metrics logger to use to report metrics. :return: None """ logger.exception("Failed to start image processing!: {}".format(err)) - if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.PROCESSING_FAILURE, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.IMAGE_PROCESSING_ERROR, 1, str(Unit.COUNT.value)) self.status_monitor.process_event(image_request_item, ImageRequestStatus.FAILED, str(err)) - def complete_image_request( - self, - region_request: RegionRequest, - metrics: MetricsLogger = None, - ) -> None: + def complete_image_request(self, region_request: RegionRequest, image_format: str) -> None: """ Runs after every region has completed processing to check if that was the last region and run required completion logic for the associated ImageRequest. :param region_request: RegionRequest = the region request to update. - :param metrics: MetricsLogger = the metrics logger to use to report metrics. - + :param image_format: the format of the image :return: None """ try: @@ -671,29 +656,53 @@ def complete_image_request( "Failed to write features to S3 or Kinesis! Please check the " "log..." ) - if isinstance(metrics, MetricsLogger): - # Record model used for this image - metrics.set_dimensions() - metrics.put_dimensions({"ModelName": image_request_item.model_name}) - # Put our end time on our image_request_item completed_image_request_item = self.job_table.end_image_request(image_request_item.image_id) # Ensure we have a valid start time for our record + # TODO: Figure out why we wouldn't have a valid start time?!?! if completed_image_request_item.processing_time is not None: image_request_status = self.status_monitor.get_image_request_status(completed_image_request_item) self.status_monitor.process_event( completed_image_request_item, image_request_status, "Completed image processing" ) - if isinstance(metrics, MetricsLogger): - processing_time = float(completed_image_request_item.processing_time) - metrics.put_metric(MetricLabels.IMAGE_LATENCY, processing_time, str(Unit.SECONDS.value)) + self.generate_image_processing_metrics(completed_image_request_item, image_format) else: raise InvalidImageRequestException("ImageRequest has no start time") except Exception as err: raise AggregateFeaturesException("Failed to aggregate features for region!") from err + @metric_scope + def generate_image_processing_metrics( + self, image_request_item: JobItem, image_format: str, metrics: MetricsLogger = None + ) -> None: + """ + Output the metrics for the full image processing timeline. + + :param image_request_item: the completed image request item that tracks the duration and error counts + :param image_format: the input image format + :param metrics: the current metric scope + """ + if not metrics: + logger.warning("Unable to generate image processing metrics. 
Metrics logger is None!") + return + + if isinstance(metrics, MetricsLogger): + metrics.set_dimensions() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.IMAGE_PROCESSING_OPERATION, + MetricLabels.MODEL_NAME_DIMENSION: image_request_item.model_name, + MetricLabels.INPUT_FORMAT_DIMENSION: image_format, + } + ) + + metrics.put_metric(MetricLabels.DURATION, float(image_request_item.processing_time), str(Unit.SECONDS.value)) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) + if image_request_item.region_error > 0: + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) + def fail_region_request( self, region_request_item: RegionRequestItem, @@ -709,8 +718,7 @@ def fail_region_request( :return: None """ if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.PROCESSING_FAILURE, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.REGION_PROCESSING_ERROR, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) try: region_request_item = self.region_request_table.complete_region_request( region_request_item, RegionRequestStatus.FAILED @@ -721,12 +729,11 @@ def fail_region_request( logger.exception(status_error) raise ProcessRegionException("Failed to process image region!") - def validate_model_hosting(self, image_request: JobItem, metrics: MetricsLogger = None): + def validate_model_hosting(self, image_request: JobItem): """ Validates that the image request is valid. If not, raises an exception. :param image_request: JobItem = the image request - :param metrics: MetricsLogger = the metrics logger to use to report metrics. :return: None """ @@ -737,15 +744,12 @@ def validate_model_hosting(self, image_request: JobItem, metrics: MetricsLogger ImageRequestStatus.FAILED, error, ) - if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.UNSUPPORTED_MODEL_HOST, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.IMAGE_PROCESSING_ERROR, 1, str(Unit.COUNT.value)) raise UnsupportedModelException(error) @staticmethod + @metric_scope def aggregate_features( - image_request_item: JobItem, - feature_table: FeatureTable, + image_request_item: JobItem, feature_table: FeatureTable, metrics: MetricsLogger = None ) -> List[Feature]: """ For a given image processing job - aggregate all the features that were collected for it and @@ -753,16 +757,24 @@ def aggregate_features( :param image_request_item: JobItem = the image request :param feature_table: FeatureTable = the table storing features from all completed regions + :param metrics: the current metrics scope :return: List[geojson.Feature] = the list of features """ - # Ensure we are given a validate tile size and overlap - if image_request_item.tile_size and image_request_item.tile_overlap: - # Read all the features from DDB. 
+ if isinstance(metrics, MetricsLogger): + metrics.set_dimensions() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.FEATURE_AGG_OPERATION, + } + ) + + with Timer( + task_str="Aggregating Features", metric_name=MetricLabels.DURATION, logger=logger, metrics_logger=metrics + ): features = feature_table.get_features(image_request_item.image_id) logger.info(f"Total features aggregated: {len(features)}") - else: - raise AggregateFeaturesException("Tile size and overlap must be provided for feature aggregation") + return features @staticmethod @@ -850,9 +862,16 @@ def select_features( """ if isinstance(metrics, MetricsLogger): metrics.set_dimensions() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.FEATURE_SELECTION_OPERATION, + } + ) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) + with Timer( task_str="Select (deduplicate) image features", - metric_name=MetricLabels.FEATURE_SELECTION_LATENCY, + metric_name=MetricLabels.DURATION, logger=logger, metrics_logger=metrics, ): @@ -898,44 +917,62 @@ def select_features( return deduped_features @staticmethod - def sync_features(image_request_item: JobItem, features: List[Feature]) -> bool: + @metric_scope + def sync_features(image_request_item: JobItem, features: List[Feature], metrics: MetricsLogger = None) -> bool: """ Writing the features output to S3 and/or Kinesis Stream :param image_request_item: JobItem = the job table item for an image request :param features: List[Features] = the list of features to update + :param metrics: the current metrics scope :return: bool = if it has successfully written to an output sync """ - tracking_output_sinks = { - "S3": False, - "Kinesis": False, - } # format: job_id = {"s3": true, "kinesis": true} - - # Ensure we have outputs defined for where to dump our features - if image_request_item.outputs: - logging.info("Writing aggregate feature for job '{}'".format(image_request_item.job_id)) - for sink in SinkFactory.outputs_to_sinks(json.loads(image_request_item.outputs)): - if sink.mode == SinkMode.AGGREGATE and image_request_item.job_id: - is_write_output_succeeded = sink.write(image_request_item.job_id, features) - tracking_output_sinks[sink.name()] = is_write_output_succeeded - - # Log them let them know if both written to both outputs (S3 and Kinesis) or one in another - # If both couldn't write to either stream because both were down, return False. Otherwise True - if tracking_output_sinks["S3"] and not tracking_output_sinks["Kinesis"]: - logging.info("OSMLModelRunner was able to write the features to S3 but not Kinesis. Continuing...") - return True - elif not tracking_output_sinks["S3"] and tracking_output_sinks["Kinesis"]: - logging.info("OSMLModelRunner was able to write the features to Kinesis but not S3. Continuing...") - return True - elif tracking_output_sinks["S3"] and tracking_output_sinks["Kinesis"]: - logging.info("OSMLModelRunner was able to write the features to both S3 and Kinesis. 
Continuing...") - return True + + if isinstance(metrics, MetricsLogger): + metrics.set_dimensions() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.FEATURE_DISSEMINATE_OPERATION, + } + ) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) + + with Timer( + task_str="Sink image features", + metric_name=MetricLabels.DURATION, + logger=logger, + metrics_logger=metrics, + ): + tracking_output_sinks = { + "S3": False, + "Kinesis": False, + } # format: job_id = {"s3": true, "kinesis": true} + + # Ensure we have outputs defined for where to dump our features + if image_request_item.outputs: + logging.info("Writing aggregate feature for job '{}'".format(image_request_item.job_id)) + for sink in SinkFactory.outputs_to_sinks(json.loads(image_request_item.outputs)): + if sink.mode == SinkMode.AGGREGATE and image_request_item.job_id: + is_write_output_succeeded = sink.write(image_request_item.job_id, features) + tracking_output_sinks[sink.name()] = is_write_output_succeeded + + # Log them let them know if both written to both outputs (S3 and Kinesis) or one in another + # If both couldn't write to either stream because both were down, return False. Otherwise True + if tracking_output_sinks["S3"] and not tracking_output_sinks["Kinesis"]: + logging.info("OSMLModelRunner was able to write the features to S3 but not Kinesis. Continuing...") + return True + elif not tracking_output_sinks["S3"] and tracking_output_sinks["Kinesis"]: + logging.info("OSMLModelRunner was able to write the features to Kinesis but not S3. Continuing...") + return True + elif tracking_output_sinks["S3"] and tracking_output_sinks["Kinesis"]: + logging.info("OSMLModelRunner was able to write the features to both S3 and Kinesis. Continuing...") + return True + else: + logging.error("OSMLModelRunner was not able to write the features to either S3 or Kinesis. Failing...") + return False else: - logging.error("OSMLModelRunner was not able to write the features to either S3 or Kinesis. Failing...") - return False - else: - raise InvalidImageRequestException("No output destinations were defined for this image request!") + raise InvalidImageRequestException("No output destinations were defined for this image request!") def add_properties_to_features(self, image_request_item: JobItem, features: List[Feature]) -> List[Feature]: """ @@ -1001,22 +1038,38 @@ def get_inference_metadata_property(image_request_item: JobItem, inference_time: return inference_metadata_property @staticmethod - def get_extents(ds: gdal.Dataset) -> dict[str, Any]: + def get_extents(ds: gdal.Dataset, sm: SensorModel) -> Dict[str, Any]: """ - Returns a list of driver extensions - - :param ds: the gdal dataset + Returns the geographic extents of the given GDAL dataset. - :return: List[number] = the extent of the image + :param ds: GDAL dataset. + :param sm: OSML Sensor Model imputed for dataset + :return: Dictionary with keys 'north', 'south', 'east', 'west' representing the extents. 
""" - geo_transform = ds.GetGeoTransform() - minx = geo_transform[0] - maxy = geo_transform[3] - maxx = minx + geo_transform[1] * ds.RasterXSize - miny = maxy + geo_transform[5] * ds.RasterYSize - - extents = {"north": maxy, "south": miny, "east": maxx, "west": minx} - return extents + try: + # Compute WGS-84 world coordinates for each image corners to impute the extents for visualizations + image_corners = [[0, 0], [ds.RasterXSize, 0], [ds.RasterXSize, ds.RasterYSize], [0, ds.RasterYSize]] + geo_image_corners = [sm.image_to_world(ImageCoordinate(corner)) for corner in image_corners] + locations = [(degrees(p.latitude), degrees(p.longitude)) for p in geo_image_corners] + feature_bounds = functools.reduce( + lambda prev, f: [ + min(f[0], prev[0]), + min(f[1], prev[1]), + max(f[0], prev[2]), + max(f[1], prev[3]), + ], + locations, + [math.inf, math.inf, -math.inf, -math.inf], + ) + + return { + "north": feature_bounds[2], + "south": feature_bounds[0], + "east": feature_bounds[3], + "west": feature_bounds[1], + } + except Exception as e: + logger.error(f"Error in getting extents: {e}") @staticmethod def calculate_region_status(total_tile_count: int, tile_error_count: int) -> RegionRequestStatus: diff --git a/src/aws/osml/model_runner/app_config.py b/src/aws/osml/model_runner/app_config.py index 45621958..10a9323b 100755 --- a/src/aws/osml/model_runner/app_config.py +++ b/src/aws/osml/model_runner/app_config.py @@ -89,32 +89,28 @@ class MetricLabels(str, Enum): Enumeration defining the metric labels used by OSML """ - ENDPOINT_LATENCY = "EndpointLatency" - ENDPOINT_RETRY_COUNT = "EndpointRetryCount" - FEATURE_AGG_LATENCY = "FeatureAggLatency" - FEATURE_SELECTION_LATENCY = "FeatureSelectionLatency" - FEATURE_ERROR = "FeatureError" - FEATURE_STORE_LATENCY = "FeatureStoreLatency" - IMAGE_PROCESSING_ERROR = "ImageProcessingError" - METADATA_LATENCY = "MetadataLatency" - MODEL_INVOCATION = "ModelInvocation" - MODEL_ERROR = "ModelError" - REGION_LATENCY = "RegionLatency" - REGION_PROCESSING_ERROR = "RegionProcessingError" - REGIONS_PROCESSED = "RegionsProcessed" - REGIONS_SELF_THROTTLED = "RegionsSelfThrottled" - TILING_LATENCY = "TilingLatency" - TILES_PROCESSED = "TilesProcessed" - IMAGE_LATENCY = ("ImageLatency",) - FEATURE_DECODE = "FeatureDecodeError" - FEATURE_MISSING_GEO = "FeatureMissingGeometry" - FEATURE_TO_SHAPE = "FeatureToShapeConversion" - FEATURE_UPDATE = "FeatureUpdateFailure" - FEATURE_UPDATE_EXCEPTION = "FeatureUpdateException" - INVALID_REQUEST = "InvalidRequest" - INVALID_ROI = "InvalidROI" - NO_IMAGE_URL = "NoImageURL" - PROCESSING_FAILURE = "ProcessingFailure" - TILE_PROCESSING_ERROR = "TileProcessingError" - TILE_CREATION_FAILURE = "TileCreationFailure" - UNSUPPORTED_MODEL_HOST = "UnsupportedModelHost" + # These are based on common metric names used by a variety of AWS services (e.g. Lambda) + DURATION = "Duration" + INVOCATIONS = "Invocations" + ERRORS = "Errors" + THROTTLES = "Throttles" + RETRIES = "Retries" + + # These dimensions allow us to limit the scope of a metric value to a particular portion of the + # ModelRunner application, a data type, or input format. + OPERATION_DIMENSION = "Operation" + MODEL_NAME_DIMENSION = "ModelName" + INPUT_FORMAT_DIMENSION = "InputFormat" + + # These operation names can be used along with the Operation dimension to restrict the scope + # of the common metrics to a specific portion of the ModelRunner application. 
+ IMAGE_PROCESSING_OPERATION = "ImageProcessing" + REGION_PROCESSING_OPERATION = "RegionProcessing" + TILE_GENERATION_OPERATION = "TileGeneration" + TILE_PROCESSING_OPERATION = "TileProcessing" + MODEL_INVOCATION_OPERATION = "ModelInvocation" + FEATURE_REFINEMENT_OPERATION = "FeatureRefinement" + FEATURE_STORAGE_OPERATION = "FeatureStorage" + FEATURE_AGG_OPERATION = "FeatureAggregation" + FEATURE_SELECTION_OPERATION = "FeatureSelection" + FEATURE_DISSEMINATE_OPERATION = "FeatureDissemination" diff --git a/src/aws/osml/model_runner/common/__init__.py b/src/aws/osml/model_runner/common/__init__.py index f46b0c69..0544886a 100755 --- a/src/aws/osml/model_runner/common/__init__.py +++ b/src/aws/osml/model_runner/common/__init__.py @@ -8,6 +8,7 @@ from .credentials_utils import get_credentials_for_assumed_role from .endpoint_utils import EndpointUtils from .exceptions import InvalidAssumedRoleException, InvalidClassificationException +from .log_context import ThreadingLocalContextFilter from .metrics_utils import build_embedded_metrics_config from .mr_post_processing import ( FeatureDistillationAlgorithm, diff --git a/src/aws/osml/model_runner/common/log_context.py b/src/aws/osml/model_runner/common/log_context.py new file mode 100644 index 00000000..f40ec691 --- /dev/null +++ b/src/aws/osml/model_runner/common/log_context.py @@ -0,0 +1,39 @@ +import logging +import threading +from typing import List, Optional + +_LOG_CONTEXT = threading.local() + + +class ThreadingLocalContextFilter(logging.Filter): + """ + This is a filter that injects contextual information into the log message. The contextual information is + set using the static methods of this class. + """ + + def __init__(self, attribute_names: List[str]): + super().__init__() + self.attribute_names = attribute_names + + def filter(self, record: logging.LogRecord) -> bool: + """ + This method is called for each log record. It injects the contextual information into the log record. + + :param record: the log record to filter + :return: True, this filter does not exclude information from the log + """ + for attribute_name in self.attribute_names: + setattr(record, attribute_name, getattr(_LOG_CONTEXT, attribute_name, None)) + return True + + @staticmethod + def set_context(context: Optional[dict]): + """ + Set the context for the current thread. If None all context information is cleared. 
+ + :param context: dict = the context to set + """ + if context is None: + _LOG_CONTEXT.__dict__.clear() + else: + _LOG_CONTEXT.__dict__.update(context) diff --git a/src/aws/osml/model_runner/common/metrics_utils.py b/src/aws/osml/model_runner/common/metrics_utils.py index 83945487..a4c4342b 100755 --- a/src/aws/osml/model_runner/common/metrics_utils.py +++ b/src/aws/osml/model_runner/common/metrics_utils.py @@ -10,5 +10,5 @@ def build_embedded_metrics_config(): metrics_config = get_config() metrics_config.service_name = "OSML" metrics_config.log_group_name = "/aws/OSML/MRService" - metrics_config.namespace = "OSML" + metrics_config.namespace = "OSML/ModelRunner" metrics_config.environment = "local" diff --git a/src/aws/osml/model_runner/database/feature_table.py b/src/aws/osml/model_runner/database/feature_table.py index 79522587..2c141d01 100755 --- a/src/aws/osml/model_runner/database/feature_table.py +++ b/src/aws/osml/model_runner/database/feature_table.py @@ -89,6 +89,13 @@ def add_features(self, features: List[Feature], metrics: MetricsLogger = None): """ if isinstance(metrics, MetricsLogger): metrics.set_dimensions() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.FEATURE_STORAGE_OPERATION, + } + ) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) + start_time_millisec = int(time.time() * 1000) # These records are temporary and will expire 24 hours after creation. Jobs should take # minutes to run, so this time should be conservative enough to let a team debug an urgent @@ -96,7 +103,7 @@ def add_features(self, features: List[Feature], metrics: MetricsLogger = None): expire_time_epoch_sec = Decimal(int(start_time_millisec / 1000) + (2 * 60 * 60)) with Timer( task_str="Add image features", - metric_name=MetricLabels.FEATURE_STORE_LATENCY, + metric_name=MetricLabels.DURATION, logger=logger, metrics_logger=metrics, ): @@ -148,8 +155,7 @@ def add_features(self, features: List[Feature], metrics: MetricsLogger = None): except Exception as err: if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.FEATURE_UPDATE_EXCEPTION, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.FEATURE_ERROR, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) raise AddFeaturesException("Failed to add features for tile!") from err @metric_scope @@ -172,12 +178,14 @@ def process_query(index: int): if isinstance(metrics, MetricsLogger): metrics.set_dimensions() + metrics.put_dimensions({MetricLabels.OPERATION_DIMENSION: MetricLabels.FEATURE_AGG_OPERATION}) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) features: List[Feature] = [] with Timer( task_str="Aggregate image features", - metric_name=MetricLabels.FEATURE_AGG_LATENCY, + metric_name=MetricLabels.DURATION, logger=logger, metrics_logger=metrics, ): diff --git a/src/aws/osml/model_runner/inference/http_detector.py b/src/aws/osml/model_runner/inference/http_detector.py index 2d9b91bf..3fd648a5 100644 --- a/src/aws/osml/model_runner/inference/http_detector.py +++ b/src/aws/osml/model_runner/inference/http_detector.py @@ -111,16 +111,21 @@ def find_features(self, payload: BufferedReader, metrics: MetricsLogger) -> Feat logger.info("Invoking Model: {}".format(self.name)) if isinstance(metrics, MetricsLogger): metrics.set_dimensions() - metrics.put_dimensions({"ModelName": self.name}) + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.MODEL_INVOCATION_OPERATION, + 
MetricLabels.MODEL_NAME_DIMENSION: self.name, + } + ) try: self.request_count += 1 if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.MODEL_INVOCATION, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) with Timer( task_str="Invoke HTTP Endpoint", - metric_name=MetricLabels.ENDPOINT_LATENCY, + metric_name=MetricLabels.DURATION, logger=logger, metrics_logger=metrics, ): @@ -138,25 +143,24 @@ def find_features(self, payload: BufferedReader, metrics: MetricsLogger) -> Feat # get the history of retries and count them retry_count = self.retry.retry_counts if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.ENDPOINT_RETRY_COUNT, retry_count, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.RETRIES, retry_count, str(Unit.COUNT.value)) return geojson.loads(response.data.decode("utf-8")) except RetryError as err: self.error_count += 1 if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.MODEL_ERROR, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) logger.error("Retry failed - failed due to {}".format(err)) logger.exception(err) except MaxRetryError as err: self.error_count += 1 if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.MODEL_ERROR, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) logger.error("Max retries reached - failed due to {}".format(err.reason)) logger.exception(err) except JSONDecodeError as err: self.error_count += 1 if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.FEATURE_DECODE, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.MODEL_ERROR, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) logger.error( "Unable to decode response from model. 
URL: {}, Status: {}, Headers: {}, Response: {}".format( self.endpoint, response.status, response.info(), response.data diff --git a/src/aws/osml/model_runner/inference/sm_detector.py b/src/aws/osml/model_runner/inference/sm_detector.py index b7e65ae5..e0480a3b 100644 --- a/src/aws/osml/model_runner/inference/sm_detector.py +++ b/src/aws/osml/model_runner/inference/sm_detector.py @@ -73,16 +73,21 @@ def find_features(self, payload: BufferedReader, metrics: MetricsLogger) -> Feat logger.info("Invoking Model: {}".format(self.endpoint)) if isinstance(metrics, MetricsLogger): metrics.set_dimensions() - metrics.put_dimensions({"ModelName": self.endpoint}) + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.MODEL_INVOCATION_OPERATION, + MetricLabels.MODEL_NAME_DIMENSION: self.endpoint, + } + ) try: self.request_count += 1 if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.MODEL_INVOCATION, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) with Timer( task_str="Invoke SM Endpoint", - metric_name=MetricLabels.ENDPOINT_LATENCY, + metric_name=MetricLabels.DURATION, logger=logger, metrics_logger=metrics, ): @@ -98,7 +103,7 @@ def find_features(self, payload: BufferedReader, metrics: MetricsLogger) -> Feat model_response = self.sm_client.invoke_endpoint(EndpointName=self.endpoint, Body=payload) retry_count = model_response.get("ResponseMetadata", {}).get("RetryAttempts", 0) if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.ENDPOINT_RETRY_COUNT, retry_count, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.RETRIES, retry_count, str(Unit.COUNT.value)) # We are expecting the body of the message to contain a geojson FeatureCollection return geojson.loads(model_response.get("Body").read()) @@ -108,7 +113,7 @@ def find_features(self, payload: BufferedReader, metrics: MetricsLogger) -> Feat error_code = ce.response.get("Error", {}).get("Code") http_status_code = ce.response.get("ResponseMetadata", {}).get("HTTPStatusCode") if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.MODEL_ERROR, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) logger.error( "Unable to get detections from model - HTTP Status Code: {}, Error Code: {}".format( http_status_code, error_code @@ -119,8 +124,7 @@ def find_features(self, payload: BufferedReader, metrics: MetricsLogger) -> Feat except JSONDecodeError as de: self.error_count += 1 if isinstance(metrics, MetricsLogger): - metrics.put_metric(MetricLabels.FEATURE_DECODE, 1, str(Unit.COUNT.value)) - metrics.put_metric(MetricLabels.MODEL_ERROR, 1, str(Unit.COUNT.value)) + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) logger.error("Unable to decode response from model.") logger.exception(de) diff --git a/src/aws/osml/model_runner/status/status_monitor.py b/src/aws/osml/model_runner/status/status_monitor.py index 77560d82..16a63f88 100755 --- a/src/aws/osml/model_runner/status/status_monitor.py +++ b/src/aws/osml/model_runner/status/status_monitor.py @@ -33,8 +33,14 @@ def process_event(self, image_request_item: JobItem, status: ImageRequestStatus, and image_request_item.processing_time is not None ): try: - status_message = f"StatusMonitor update: {status} {image_request_item.job_id}: {message}" - logging.info(status_message) + logging.info( + "StatusMonitorUpdate", + extra={ + "reason": message, + "status": status, + "request": image_request_item.__dict__, + }, + ) 
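The detector changes above illustrate the metric scheme this diff standardizes on: a handful of common metric names (Invocations, Duration, Errors, Retries) scoped by Operation/ModelName/InputFormat dimensions instead of one bespoke metric name per code path. A minimal sketch of that pattern with aws-embedded-metrics; the function and endpoint name here are hypothetical:

```python
from aws_embedded_metrics import metric_scope
from aws_embedded_metrics.unit import Unit

from aws.osml.model_runner.app_config import MetricLabels


@metric_scope
def invoke_model(payload, metrics=None):
    # Reset inherited dimensions, then scope everything to this operation.
    metrics.set_dimensions()
    metrics.put_dimensions(
        {
            MetricLabels.OPERATION_DIMENSION: MetricLabels.MODEL_INVOCATION_OPERATION,
            MetricLabels.MODEL_NAME_DIMENSION: "hypothetical-endpoint",
        }
    )
    metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value))
    try:
        ...  # call the model endpoint here
    except Exception:
        metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value))
        raise
```

With this shape, a single CloudWatch query over Errors can be sliced by Operation or ModelName rather than juggling per-path metric names.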
sns_message_attributes = ImageRequestStatusMessage( image_status=status, @@ -44,6 +50,7 @@ def process_event(self, image_request_item: JobItem, status: ImageRequestStatus, processing_duration=image_request_item.processing_time, ) + status_message = f"StatusMonitor update: {status} {image_request_item.job_id}: {message}" self.image_status_sns.publish_message( status_message, sns_message_attributes.asdict_str_values(), diff --git a/src/aws/osml/model_runner/tile_worker/tile_worker.py b/src/aws/osml/model_runner/tile_worker/tile_worker.py index 76b2d4d0..ea8e1f4f 100755 --- a/src/aws/osml/model_runner/tile_worker/tile_worker.py +++ b/src/aws/osml/model_runner/tile_worker/tile_worker.py @@ -5,19 +5,23 @@ from datetime import datetime, timezone from queue import Queue from threading import Thread -from typing import Dict, Optional +from typing import Dict, List, Optional +import geojson from aws_embedded_metrics.logger.metrics_logger import MetricsLogger +from aws_embedded_metrics.metric_scope import metric_scope from aws_embedded_metrics.unit import Unit from shapely.affinity import translate from shapely.geometry import Polygon from aws.osml.model_runner.app_config import MetricLabels -from aws.osml.model_runner.common import GeojsonDetectionField +from aws.osml.model_runner.common import GeojsonDetectionField, ThreadingLocalContextFilter, Timer from aws.osml.model_runner.database import FeatureTable from aws.osml.model_runner.inference import Detector from aws.osml.model_runner.tile_worker import FeatureRefinery +logger = logging.getLogger(__name__) + class TileWorker(Thread): def __init__( @@ -26,20 +30,19 @@ def __init__( feature_detector: Detector, feature_refinery: Optional[FeatureRefinery], feature_table: FeatureTable, - metrics: MetricsLogger, ) -> None: super().__init__() self.in_queue = in_queue self.feature_detector = feature_detector self.feature_refinery = feature_refinery self.feature_table = feature_table - self.metrics = metrics def run(self) -> None: thread_event_loop = asyncio.new_event_loop() asyncio.set_event_loop(thread_event_loop) while True: image_info: Dict = self.in_queue.get() + ThreadingLocalContextFilter.set_context(image_info) if image_info is None: logging.info("All images processed. 
Stopping tile worker.") @@ -51,55 +54,7 @@ def run(self) -> None: break try: - logging.info("Invoking Feature Detector Endpoint") - with open(image_info["image_path"], mode="rb") as payload: - feature_collection = self.feature_detector.find_features(payload) - - # Convert the features to reference the full image - features = [] - ulx = image_info["region"][0][1] - uly = image_info["region"][0][0] - if isinstance(feature_collection, dict) and "features" in feature_collection: - logging.info("SM Model returned {} features".format(len(feature_collection["features"]))) - for feature in feature_collection["features"]: - # model returned a mask - scaled_polygon = None - if GeojsonDetectionField.GEOM in feature["properties"]: - polygon = Polygon(feature["properties"][GeojsonDetectionField.GEOM]) - scaled_polygon = translate(polygon, xoff=ulx, yoff=uly) - feature["properties"][GeojsonDetectionField.GEOM] = list(scaled_polygon.exterior.coords) - # model returned a bounding box - if GeojsonDetectionField.BOUNDS in feature["properties"]: - tile_bbox = feature["properties"][GeojsonDetectionField.BOUNDS] - scaled_bbox = translate( - Polygon(FeatureRefinery.imcoords_bbox_to_polygon(tile_bbox)), xoff=ulx, yoff=uly - ) - feature["properties"][GeojsonDetectionField.BOUNDS] = scaled_bbox.bounds - elif scaled_polygon is not None: - feature["properties"][GeojsonDetectionField.BOUNDS] = scaled_polygon.bounds - else: - logging.warning(f"There isn't a valid detection shape for feature: {feature}") - feature["properties"]["image_id"] = image_info["image_id"] - feature["properties"]["inferenceTime"] = datetime.now(tz=timezone.utc).isoformat() - FeatureRefinery.feature_property_transformation(feature) - features.append(feature) - - logging.info("# Features Created: {}".format(len(features))) - if len(features) > 0: - if self.feature_refinery is not None: - # Create a geometry for each feature in the result. The geographic coordinates of these - # features are computed using the sensor model provided in the image metadata - self.feature_refinery.refine_features_for_tile(features) - logging.info("Created Geographic Coordinates for {} features".format(len(features))) - - self.feature_table.add_features(features) - except Exception as e: - logging.error("Failed to process region tile!") - logging.exception(e) - self.feature_detector.error_count += 1 # borrow the feature detector error count to tally other errors - if self.metrics: - self.metrics.put_metric(MetricLabels.TILE_PROCESSING_ERROR, 1, str(Unit.COUNT.value)) - self.metrics.put_metric(MetricLabels.TILE_CREATION_FAILURE, 1, str(Unit.COUNT.value)) + self.process_tile(image_info) finally: self.in_queue.task_done() @@ -109,3 +64,110 @@ def run(self) -> None: except Exception as e: logging.warning("Failed to stop and close the thread event loop") logging.exception(e) + + @metric_scope + def process_tile(self, image_info: Dict, metrics: MetricsLogger = None) -> None: + """ + This method handles the processing of a single tile by invoking the ML model, geolocating the detections to + create features and finally storing the features in the database. 
+ + :param image_info: description of the tile to be processed + :param metrics: the current metric scope + """ + if isinstance(metrics, MetricsLogger): + metrics.set_dimensions() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.TILE_PROCESSING_OPERATION, + MetricLabels.MODEL_NAME_DIMENSION: self.feature_detector.endpoint, + } + ) + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) + + try: + with Timer( + task_str=f"Processing Tile {image_info['image_path']}", + metric_name=MetricLabels.DURATION, + logger=logger, + metrics_logger=metrics, + ): + with open(image_info["image_path"], mode="rb") as payload: + feature_collection = self.feature_detector.find_features(payload) + + features = self._refine_features(feature_collection, image_info) + + if len(features) > 0: + self.feature_table.add_features(features) + + except Exception as e: + logging.error("Failed to process region tile!") + logging.exception(e) + self.feature_detector.error_count += 1 # borrow the feature detector error count to tally other errors + if isinstance(metrics, MetricsLogger): + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) + + @metric_scope + def _refine_features(self, feature_collection, image_info: Dict, metrics: MetricsLogger = None) -> List[geojson.Feature]: + """ + This method converts the detections returned by the model into geolocated features. It first updates the + image coordinates of each detection to be in relation to the full image, then it geolocates the image feature. + + :param feature_collection: the features from the ML model + :param image_info: a description of the image tile containing the features + :param metrics: the current metric scope + :return: a list of GeoJSON features + """ + if isinstance(metrics, MetricsLogger): + metrics.set_dimensions() + metrics.put_dimensions( + { + MetricLabels.OPERATION_DIMENSION: MetricLabels.FEATURE_REFINEMENT_OPERATION, + } + ) + + with Timer( + task_str=f"Refining Features for Tile:{image_info['image_path']}", + metric_name=MetricLabels.DURATION, + logger=logger, + metrics_logger=metrics, + ): + # TODO: Consider moving invocations to Timer + if isinstance(metrics, MetricsLogger): + metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value)) + + features = [] + ulx = image_info["region"][0][1] + uly = image_info["region"][0][0] + if isinstance(feature_collection, dict) and "features" in feature_collection: + logging.info("SM Model returned {} features".format(len(feature_collection["features"]))) + for feature in feature_collection["features"]: + # model returned a mask + scaled_polygon = None + if GeojsonDetectionField.GEOM in feature["properties"]: + polygon = Polygon(feature["properties"][GeojsonDetectionField.GEOM]) + scaled_polygon = translate(polygon, xoff=ulx, yoff=uly) + feature["properties"][GeojsonDetectionField.GEOM] = list(scaled_polygon.exterior.coords) + # model returned a bounding box + if GeojsonDetectionField.BOUNDS in feature["properties"]: + tile_bbox = feature["properties"][GeojsonDetectionField.BOUNDS] + scaled_bbox = translate( + Polygon(FeatureRefinery.imcoords_bbox_to_polygon(tile_bbox)), xoff=ulx, yoff=uly + ) + feature["properties"][GeojsonDetectionField.BOUNDS] = scaled_bbox.bounds + elif scaled_polygon is not None: + feature["properties"][GeojsonDetectionField.BOUNDS] = scaled_polygon.bounds + else: + logging.warning(f"There isn't a valid detection shape for feature: {feature}") + feature["properties"]["image_id"] = image_info["image_id"]
+                    feature["properties"]["inferenceTime"] = datetime.now(tz=timezone.utc).isoformat()
+                    FeatureRefinery.feature_property_transformation(feature)
+                    features.append(feature)
+            logging.info("# Features Created: {}".format(len(features)))
+            if len(features) > 0:
+                if self.feature_refinery is not None:
+                    # Create a geometry for each feature in the result. The geographic coordinates of these
+                    # features are computed using the sensor model provided in the image metadata
+                    self.feature_refinery.refine_features_for_tile(features)
+                    logging.info("Created Geographic Coordinates for {} features".format(len(features)))
+
+            return features
diff --git a/src/aws/osml/model_runner/tile_worker/tile_worker_utils.py b/src/aws/osml/model_runner/tile_worker/tile_worker_utils.py
index 82a4426a..02f1cea5 100755
--- a/src/aws/osml/model_runner/tile_worker/tile_worker_utils.py
+++ b/src/aws/osml/model_runner/tile_worker/tile_worker_utils.py
@@ -8,6 +8,7 @@
 from typing import List, Optional, Tuple

 from aws_embedded_metrics import MetricsLogger
+from aws_embedded_metrics.metric_scope import metric_scope
 from aws_embedded_metrics.unit import Unit
 from osgeo import gdal

@@ -30,7 +31,6 @@ def setup_tile_workers(
     region_request: RegionRequest,
     sensor_model: Optional[SensorModel] = None,
     elevation_model: Optional[ElevationModel] = None,
-    metrics: MetricsLogger = None,
 ) -> Tuple[Queue, List[TileWorker]]:
     """
     Sets up a pool of tile-workers to process image tiles from a region request
@@ -38,7 +38,6 @@
     :param region_request: RegionRequest = the region request to process.
     :param sensor_model: Optional[SensorModel] = the sensor model for this raster dataset
     :param elevation_model: Optional[ElevationModel] = an elevation model used to fix the elevation of the image coordinate
-    :param metrics: MetricsLogger = the metrics logger to use to report metrics.

     :return: Tuple[Queue, List[TileWorker]] = the queue of image tiles and the list of workers that consume it
@@ -71,7 +70,7 @@
         if sensor_model is not None:
             feature_refinery = FeatureRefinery(sensor_model, elevation_model=elevation_model)

-        worker = TileWorker(tile_queue, feature_detector, feature_refinery, feature_table, metrics)
+        worker = TileWorker(tile_queue, feature_detector, feature_refinery, feature_table)
         worker.start()
         tile_workers.append(worker)

@@ -88,7 +87,6 @@ def process_tiles(
     tile_queue: Queue,
     tile_workers: List[TileWorker],
     raster_dataset: gdal.Dataset,
-    metrics: MetricsLogger = None,
     sensor_model: Optional[SensorModel] = None,
 ) -> Tuple[int, int]:
     """
@@ -98,7 +96,6 @@
     :param tile_queue: Queue = the queue of image tiles awaiting processing
     :param tile_workers: List[TileWorker] = the list of tile workers
     :param raster_dataset: gdal.Dataset = the raster dataset containing the region
-    :param metrics: MetricsLogger = the metrics logger to use to report metrics.
 :param sensor_model: Optional[SensorModel] = the sensor model for this raster dataset

 :return: Tuple[int, int] = number of tiles processed, number of tiles with an error
@@ -148,38 +145,10 @@ def process_tiles(
             # Set a path for the tmp image
             tmp_image_path = Path(tmp, region_image_filename)

-            # Use GDAL to create an encoded tile of the image region
-            absolute_tile_path = tmp_image_path.absolute()
-            with Timer(
-                task_str="Creating image tile: {}".format(absolute_tile_path),
-                metric_name=MetricLabels.TILING_LATENCY,
-                logger=logger,
-                metrics_logger=metrics,
-            ):
-                encoded_tile_data = gdal_tile_factory.create_encoded_tile(
-                    [tile_bounds[0][1], tile_bounds[0][0], tile_bounds[1][0], tile_bounds[1][1]]
-                )
-
-                with open(absolute_tile_path, "wb") as binary_file:
-                    binary_file.write(encoded_tile_data)
-
-            # GDAL doesn't always generate errors, so we need to make sure the NITF
-            # encoded region was actually created.
-            if not tmp_image_path.is_file():
-                logger.error(
-                    "GDAL unable to create tile %s. Does not exist!",
-                    absolute_tile_path,
-                )
-                if isinstance(metrics, MetricsLogger):
-                    metrics.put_metric(MetricLabels.TILE_CREATION_FAILURE, 1, str(Unit.COUNT.value))
-                    metrics.put_metric(MetricLabels.REGION_PROCESSING_ERROR, 1, str(Unit.COUNT.value))
+            # Generate an encoded tile of the requested image region
+            absolute_tile_path = _create_tile(gdal_tile_factory, tile_bounds, tmp_image_path)
+            if not absolute_tile_path:
                 continue
-            else:
-                logger.info(
-                    "Created %s size %s",
-                    absolute_tile_path,
-                    sizeof_fmt(tmp_image_path.stat().st_size),
-                )

             # Put the image info on the tile worker queue allowing each tile to be
             # processed in parallel.
@@ -187,6 +156,7 @@
                 "image_path": tmp_image_path,
                 "region": tile_bounds,
                 "image_id": region_request.image_id,
+                "job_id": region_request.job_id,
             }
             # Increment our tile count tracking
             total_tile_count += 1
@@ -219,6 +189,64 @@
     return total_tile_count, tile_error_count


+@metric_scope
+def _create_tile(gdal_tile_factory, tile_bounds, tmp_image_path, metrics: MetricsLogger = None) -> Optional[Path]:
+    """
+    Create an encoded tile of the requested image region.
+
+    :param gdal_tile_factory: the factory used to create the tile
+    :param tile_bounds: the requested tile boundary
+    :param tmp_image_path: the output location of the tile
+    :param metrics: the current metrics scope
+    :return: the resulting tile path or None if the tile could not be created
+    """
+    if isinstance(metrics, MetricsLogger):
+        metrics.set_dimensions()
+        metrics.put_dimensions(
+            {
+                MetricLabels.OPERATION_DIMENSION: MetricLabels.TILE_GENERATION_OPERATION,
+                MetricLabels.INPUT_FORMAT_DIMENSION: str(gdal_tile_factory.raster_dataset.GetDriver().ShortName).upper(),
+            }
+        )

+    # Use GDAL to create an encoded tile of the image region
+    absolute_tile_path = tmp_image_path.absolute()
+    with Timer(
+        task_str="Creating image tile: {}".format(absolute_tile_path),
+        metric_name=MetricLabels.DURATION,
+        logger=logger,
+        metrics_logger=metrics,
+    ):
+        if isinstance(metrics, MetricsLogger):
+            metrics.put_metric(MetricLabels.INVOCATIONS, 1, str(Unit.COUNT.value))
+
+        encoded_tile_data = gdal_tile_factory.create_encoded_tile(
+            [tile_bounds[0][1], tile_bounds[0][0], tile_bounds[1][0], tile_bounds[1][1]]
+        )
+
+        with open(absolute_tile_path, "wb") as binary_file:
+            binary_file.write(encoded_tile_data)
+
+        # GDAL doesn't always generate errors, so we need to make sure the NITF
+        # encoded region was actually created.
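+        # If the expected file never appears, the failure is logged and counted below and this
+        # function returns None so that process_tiles() can skip the tile and keep processing the region.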
+ if not tmp_image_path.is_file(): + logger.error( + "GDAL unable to create tile %s. Does not exist!", + absolute_tile_path, + ) + if isinstance(metrics, MetricsLogger): + metrics.put_metric(MetricLabels.ERRORS, 1, str(Unit.COUNT.value)) + return None + else: + logger.info( + "Created %s size %s", + absolute_tile_path, + sizeof_fmt(tmp_image_path.stat().st_size), + ) + + return absolute_tile_path + + def sizeof_fmt(num: float, suffix: str = "B") -> str: for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: diff --git a/test/aws/osml/model_runner/queue/test_image_request.py b/test/aws/osml/model_runner/api/test_image_request.py similarity index 100% rename from test/aws/osml/model_runner/queue/test_image_request.py rename to test/aws/osml/model_runner/api/test_image_request.py diff --git a/test/aws/osml/model_runner/queue/test_region_request.py b/test/aws/osml/model_runner/api/test_region_request.py similarity index 54% rename from test/aws/osml/model_runner/queue/test_region_request.py rename to test/aws/osml/model_runner/api/test_region_request.py index 11f4894b..a77bc6ef 100755 --- a/test/aws/osml/model_runner/queue/test_region_request.py +++ b/test/aws/osml/model_runner/api/test_region_request.py @@ -1,7 +1,20 @@ -# Copyright 2023 Amazon.com, Inc. or its affiliates. +# Copyright 2023-2024 Amazon.com, Inc. or its affiliates. import unittest -from test.data.sample_request_data import SAMPLE_REGION_REQUEST_DATA + +SAMPLE_REGION_REQUEST_DATA = { + "tile_size": (10, 10), + "tile_overlap": (1, 1), + "tile_format": "NITF", + "image_id": "test-image-id", + "image_url": "test-image-url", + "region_bounds": ((0, 0), (50, 50)), + "model_name": "test-model-name", + "model_invoke_mode": "SM_ENDPOINT", + "output_bucket": "unit-test", + "output_prefix": "region-request", + "execution_role": "arn:aws:iam::012345678910:role/OversightMLBetaInvokeRole", +} class TestRegionRequest(unittest.TestCase): diff --git a/test/aws/osml/model_runner/queue/test_request_utils.py b/test/aws/osml/model_runner/api/test_request_utils.py similarity index 88% rename from test/aws/osml/model_runner/queue/test_request_utils.py rename to test/aws/osml/model_runner/api/test_request_utils.py index a7ddc0fe..ad3171f8 100755 --- a/test/aws/osml/model_runner/queue/test_request_utils.py +++ b/test/aws/osml/model_runner/api/test_request_utils.py @@ -1,7 +1,20 @@ -# Copyright 2023 Amazon.com, Inc. or its affiliates. +# Copyright 2023-2024 Amazon.com, Inc. or its affiliates. 
 import unittest
-from test.data.sample_request_data import SAMPLE_REGION_REQUEST_DATA
+
+SAMPLE_REGION_REQUEST_DATA = {
+    "tile_size": (10, 10),
+    "tile_overlap": (1, 1),
+    "tile_format": "NITF",
+    "image_id": "test-image-id",
+    "image_url": "test-image-url",
+    "region_bounds": ((0, 0), (50, 50)),
+    "model_name": "test-model-name",
+    "model_invoke_mode": "SM_ENDPOINT",
+    "output_bucket": "unit-test",
+    "output_prefix": "region-request",
+    "execution_role": "arn:aws:iam::012345678910:role/OversightMLBetaInvokeRole",
+}


 class TestRequestUtils(unittest.TestCase):
diff --git a/test/aws/osml/model_runner/common/test_log_context.py b/test/aws/osml/model_runner/common/test_log_context.py
new file mode 100644
index 00000000..546a08db
--- /dev/null
+++ b/test/aws/osml/model_runner/common/test_log_context.py
@@ -0,0 +1,34 @@
+import logging
+from threading import Thread
+from unittest import TestCase, main
+
+
+class TestThreadingLocalContextFilter(TestCase):
+    def test_filter_adds_thread_local_context(self):
+        from aws.osml.model_runner.common import ThreadingLocalContextFilter
+
+        context_filter = ThreadingLocalContextFilter(attribute_names=["context_value"])
+        context_filter.set_context({"context_value": "A"})
+        test_log_record = logging.LogRecord("test-name", logging.DEBUG, "some_module.py", 1, "test message", None, None)
+        assert context_filter.filter(test_log_record) is True
+        assert test_log_record.context_value == "A"
+
+        thread_log_record = logging.LogRecord("test-name", logging.DEBUG, "some_module.py", 1, "test message", None, None)
+
+        def sample_task():
+            # The filter context is thread local, so this value applies only to records filtered on this thread
+            context_filter.set_context({"context_value": "B"})
+            assert context_filter.filter(thread_log_record) is True
+
+        thread = Thread(target=sample_task)
+        thread.start()
+        thread.join()
+        assert thread_log_record.context_value == "B"
+
+        test_log_record = logging.LogRecord("test-name", logging.DEBUG, "some_module.py", 1, "test message", None, None)
+        assert context_filter.filter(test_log_record) is True
+        assert test_log_record.context_value == "A"
+
+
+if __name__ == "__main__":
+    main()
diff --git a/test/aws/osml/model_runner/common/test_security_classificaiton.py b/test/aws/osml/model_runner/common/test_security_classification.py
similarity index 100%
rename from test/aws/osml/model_runner/common/test_security_classificaiton.py
rename to test/aws/osml/model_runner/common/test_security_classification.py
diff --git a/test/aws/osml/model_runner/common/test_timer.py b/test/aws/osml/model_runner/common/test_timer.py
index 5324e188..4ca13927 100755
--- a/test/aws/osml/model_runner/common/test_timer.py
+++ b/test/aws/osml/model_runner/common/test_timer.py
@@ -1,12 +1,11 @@
 # Copyright 2023 Amazon.com, Inc. or its affiliates.
import time -import unittest +from unittest import TestCase, main +from unittest.mock import Mock -from mock import Mock - -class TestTimer(unittest.TestCase): +class TestTimer(TestCase): def test_timer_with_normalized_unit(self): from aws.osml.model_runner.common import Timer @@ -114,4 +113,4 @@ def test_timer_throw_exception(self): if __name__ == "__main__": - unittest.main() + main() diff --git a/test/aws/osml/model_runner/database/test_ddb_helper.py b/test/aws/osml/model_runner/database/test_ddb_helper.py index 88267a47..ddeb8401 100755 --- a/test/aws/osml/model_runner/database/test_ddb_helper.py +++ b/test/aws/osml/model_runner/database/test_ddb_helper.py @@ -4,14 +4,14 @@ import boto3 from botocore.exceptions import ClientError -from moto import mock_dynamodb +from moto import mock_aws TEST_JOB_TABLE_KEY_SCHEMA = [{"AttributeName": "image_id", "KeyType": "HASH"}] TEST_JOB_TABLE_ATTRIBUTE_DEFINITIONS = [{"AttributeName": "image_id", "AttributeType": "S"}] TEST_IMAGE_ID = "test-image-id" -@mock_dynamodb +@mock_aws class TestDDBHelper(TestCase): def setUp(self): from aws.osml.model_runner.app_config import BotoConfig diff --git a/test/aws/osml/model_runner/database/test_endpoint_statistics_table.py b/test/aws/osml/model_runner/database/test_endpoint_statistics_table.py index 25fb2854..2f1fd6eb 100755 --- a/test/aws/osml/model_runner/database/test_endpoint_statistics_table.py +++ b/test/aws/osml/model_runner/database/test_endpoint_statistics_table.py @@ -5,7 +5,7 @@ import boto3 from botocore.exceptions import ClientError -from moto import mock_dynamodb +from moto import mock_aws TEST_ENDPOINT_TABLE_KEY_SCHEMA = [{"AttributeName": "endpoint", "KeyType": "HASH"}] TEST_ENDPOINT_TABLE_ATTRIBUTE_DEFINITIONS = [ @@ -18,7 +18,7 @@ TEST_MOCK_UPDATE_EXCEPTION = Mock(side_effect=ClientError({"Error": {"Code": 500, "Message": "ClientError"}}, "update_item")) -@mock_dynamodb +@mock_aws class TestEndpointStatisticsTable(unittest.TestCase): def setUp(self): """ diff --git a/test/aws/osml/model_runner/database/test_feature_table.py b/test/aws/osml/model_runner/database/test_feature_table.py index 20d986a7..8e5509c8 100755 --- a/test/aws/osml/model_runner/database/test_feature_table.py +++ b/test/aws/osml/model_runner/database/test_feature_table.py @@ -8,7 +8,7 @@ import geojson from botocore.exceptions import ClientError from botocore.stub import ANY, Stubber -from moto import mock_dynamodb +from moto import mock_aws image_id = ( "7db12549-3bcb-49c8-acba-25d46ef5cbf3:s3://spacenet-dataset/AOIs/AOI_1_Rio/srcData/mosaic_3band/013022223131.tif" # noqa @@ -62,7 +62,7 @@ ) -@mock_dynamodb +@mock_aws class TestFeatureTable(unittest.TestCase): def setUp(self): """ diff --git a/test/aws/osml/model_runner/database/test_job_table.py b/test/aws/osml/model_runner/database/test_job_table.py index 5fd16fb7..3c103fc2 100755 --- a/test/aws/osml/model_runner/database/test_job_table.py +++ b/test/aws/osml/model_runner/database/test_job_table.py @@ -6,7 +6,7 @@ import boto3 from botocore.exceptions import ClientError -from moto import mock_dynamodb +from moto import mock_aws TEST_IMAGE_ID = "test-image-id" TEST_REGION_ID = "test-region-id" @@ -17,7 +17,7 @@ TEST_MOCK_UPDATE_EXCEPTION = Mock(side_effect=ClientError({"Error": {"Code": 500, "Message": "ClientError"}}, "update_item")) -@mock_dynamodb +@mock_aws class TestJobTable(unittest.TestCase): def setUp(self): """ diff --git a/test/aws/osml/model_runner/database/test_region_request_table.py b/test/aws/osml/model_runner/database/test_region_request_table.py index 
1e7e8e43..a967b713 100755 --- a/test/aws/osml/model_runner/database/test_region_request_table.py +++ b/test/aws/osml/model_runner/database/test_region_request_table.py @@ -5,7 +5,7 @@ import boto3 from botocore.exceptions import ClientError -from moto import mock_dynamodb +from moto import mock_aws TEST_IMAGE_ID = "test-image-id" TEST_REGION_ID = "test-region-id" @@ -22,7 +22,7 @@ ] -@mock_dynamodb +@mock_aws class TestRegionRequestTable(unittest.TestCase): def setUp(self): """ diff --git a/test/aws/osml/model_runner/inference/test_feature_detector_factory.py b/test/aws/osml/model_runner/inference/test_endpoint_factory.py similarity index 100% rename from test/aws/osml/model_runner/inference/test_feature_detector_factory.py rename to test/aws/osml/model_runner/inference/test_endpoint_factory.py diff --git a/test/aws/osml/model_runner/inference/test_http_detector.py b/test/aws/osml/model_runner/inference/test_http_detector.py index 0bdececc..a5595f08 100644 --- a/test/aws/osml/model_runner/inference/test_http_detector.py +++ b/test/aws/osml/model_runner/inference/test_http_detector.py @@ -1,7 +1,7 @@ import json from unittest import TestCase +from unittest.mock import patch -import mock from urllib3.response import HTTPResponse MOCK_RESPONSE = HTTPResponse( @@ -30,7 +30,7 @@ class TestSMDetector(TestCase): - @mock.patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) + @patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) def test_find_features(self, mock_pool_manager): from aws.osml.model_runner.inference import HTTPDetector @@ -46,7 +46,7 @@ def test_find_features(self, mock_pool_manager): assert feature_collection["type"] == "FeatureCollection" assert len(feature_collection["features"]) == 1 - @mock.patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) + @patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) def test_find_features_RetryError(self, mock_pool_manager): from requests.exceptions import RetryError @@ -61,7 +61,7 @@ def test_find_features_RetryError(self, mock_pool_manager): feature_detector.find_features(image_file) assert feature_detector.error_count == 1 - @mock.patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) + @patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) def test_find_features_MaxRetryError(self, mock_pool_manager): from urllib3.exceptions import MaxRetryError @@ -76,7 +76,7 @@ def test_find_features_MaxRetryError(self, mock_pool_manager): feature_detector.find_features(image_file) assert feature_detector.error_count == 1 - @mock.patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) + @patch("aws.osml.model_runner.inference.http_detector.urllib3.PoolManager", autospec=True) def test_find_features_JSONDecodeError(self, mock_pool_manager): from aws.osml.model_runner.inference import HTTPDetector diff --git a/test/aws/osml/model_runner/inference/test_sm_detector.py b/test/aws/osml/model_runner/inference/test_sm_detector.py index 91994dd9..4f3c673c 100755 --- a/test/aws/osml/model_runner/inference/test_sm_detector.py +++ b/test/aws/osml/model_runner/inference/test_sm_detector.py @@ -3,13 +3,12 @@ import datetime import io import json -import unittest from json import JSONDecodeError -from unittest.mock import Mock +from unittest import TestCase +from unittest.mock import Mock, patch import boto3 import botocore 
-import mock from botocore.stub import ANY, Stubber MOCK_RESPONSE = { @@ -36,7 +35,7 @@ } -class TestSMDetector(unittest.TestCase): +class TestSMDetector(TestCase): def test_construct_with_execution_role(self): from aws.osml.model_runner.inference import SMDetector @@ -49,7 +48,7 @@ def test_construct_with_execution_role(self): "SessionToken": "FAKE-SESSION-TOKEN", "Expiration": datetime.datetime.now(), } - with mock.patch("aws.osml.model_runner.inference.sm_detector.boto3") as mock_boto3: + with patch("aws.osml.model_runner.inference.sm_detector.boto3") as mock_boto3: mock_boto3.client.return_value = sm_client SMDetector("test-endpoint", aws_credentials) mock_boto3.client.assert_called_once_with( diff --git a/test/aws/osml/model_runner/queue/test_request_queue.py b/test/aws/osml/model_runner/queue/test_request_queue.py index 09f48961..d533d124 100755 --- a/test/aws/osml/model_runner/queue/test_request_queue.py +++ b/test/aws/osml/model_runner/queue/test_request_queue.py @@ -5,7 +5,7 @@ import boto3 from botocore.exceptions import ClientError -from moto import mock_sqs +from moto import mock_aws TEST_MOCK_MESSAGE = { "Type": "Notification", @@ -32,7 +32,7 @@ ) -@mock_sqs +@mock_aws class TestRequestQueue(unittest.TestCase): def setUp(self): from aws.osml.model_runner.app_config import BotoConfig diff --git a/test/aws/osml/model_runner/status/test_image_status_helper.py b/test/aws/osml/model_runner/status/test_image_request_status.py similarity index 99% rename from test/aws/osml/model_runner/status/test_image_status_helper.py rename to test/aws/osml/model_runner/status/test_image_request_status.py index 3d10af54..cb846e42 100755 --- a/test/aws/osml/model_runner/status/test_image_status_helper.py +++ b/test/aws/osml/model_runner/status/test_image_request_status.py @@ -6,7 +6,7 @@ from shapely.geometry import Point -class TestImageStatusHelper(TestCase): +class TestImageRequestStatus(TestCase): def setUp(self): from aws.osml.model_runner.api.inference import ModelInvokeMode from aws.osml.model_runner.common.typing import ImageCompression, ImageFormats, ImageRequestStatus diff --git a/test/aws/osml/model_runner/status/test_sns_helper.py b/test/aws/osml/model_runner/status/test_sns_helper.py index b2968c0b..b2699f1b 100755 --- a/test/aws/osml/model_runner/status/test_sns_helper.py +++ b/test/aws/osml/model_runner/status/test_sns_helper.py @@ -8,13 +8,12 @@ import boto3 from botocore.exceptions import ClientError -from moto import mock_sns, mock_sqs +from moto import mock_aws TEST_MOCK_PUBLISH_EXCEPTION = Mock(side_effect=ClientError({"Error": {"Code": 500, "Message": "ClientError"}}, "publish")) -@mock_sqs -@mock_sns +@mock_aws class TestSnsHelper(TestCase): def setUp(self): from aws.osml.model_runner.app_config import BotoConfig diff --git a/test/aws/osml/model_runner/status/test_status_monitor.py b/test/aws/osml/model_runner/status/test_status_monitor.py index 01711c37..901c483e 100755 --- a/test/aws/osml/model_runner/status/test_status_monitor.py +++ b/test/aws/osml/model_runner/status/test_status_monitor.py @@ -6,11 +6,10 @@ from unittest import TestCase import boto3 -from moto import mock_sns, mock_sqs +from moto import mock_aws -@mock_sqs -@mock_sns +@mock_aws class TestStatusMonitor(TestCase): def setUp(self) -> None: from aws.osml.model_runner.app_config import BotoConfig diff --git a/test/aws/osml/model_runner/tile_worker/test_tile_worker_utils.py b/test/aws/osml/model_runner/tile_worker/test_tile_worker_utils.py index bdaba573..53f62e0d 100755 --- 
a/test/aws/osml/model_runner/tile_worker/test_tile_worker_utils.py +++ b/test/aws/osml/model_runner/tile_worker/test_tile_worker_utils.py @@ -1,14 +1,14 @@ # Copyright 2023 Amazon.com, Inc. or its affiliates. -from unittest import TestCase, main, mock +from unittest import TestCase, main +from unittest.mock import Mock, patch import pytest -from mock import Mock class TestTileWorkerUtils(TestCase): - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.TileWorker", autospec=True) - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.ServiceConfig", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.TileWorker", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.ServiceConfig", autospec=True) def test_setup_tile_workers(self, mock_service_config, mock_tile_worker): from aws.osml.model_runner.api import RegionRequest from aws.osml.model_runner.tile_worker.tile_worker_utils import setup_tile_workers @@ -30,15 +30,12 @@ def test_setup_tile_workers(self, mock_service_config, mock_tile_worker): ) mock_sensor_model = None mock_elevation_model = None - mock_metrics = None - work_queue, tile_worker_list = setup_tile_workers( - mock_region_request, mock_sensor_model, mock_elevation_model, mock_metrics - ) + work_queue, tile_worker_list = setup_tile_workers(mock_region_request, mock_sensor_model, mock_elevation_model) assert len(tile_worker_list) == mock_num_tile_workers - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.FeatureTable", autospec=True) - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.TileWorker", autospec=True) - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.ServiceConfig", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.FeatureTable", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.TileWorker", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.ServiceConfig", autospec=True) def test_setup_tile_workers_exception(self, mock_service_config, mock_tile_worker, mock_feature_table): from aws.osml.model_runner.api import RegionRequest from aws.osml.model_runner.tile_worker.exceptions import SetupTileWorkersException @@ -62,10 +59,9 @@ def test_setup_tile_workers_exception(self, mock_service_config, mock_tile_worke ) mock_sensor_model = None mock_elevation_model = None - mock_metrics = None with self.assertRaises(SetupTileWorkersException): # with self.assertRaises(ValueError): - setup_tile_workers(mock_region_request, mock_sensor_model, mock_elevation_model, mock_metrics) + setup_tile_workers(mock_region_request, mock_sensor_model, mock_elevation_model) def test_chip_generator(self): from aws.osml.model_runner.tile_worker.tile_worker_utils import generate_crops diff --git a/test/data/sample_request_data.py b/test/data/sample_request_data.py deleted file mode 100755 index 09c5d4bf..00000000 --- a/test/data/sample_request_data.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright 2023 Amazon.com, Inc. or its affiliates. 
- -SAMPLE_REGION_REQUEST_DATA = { - "tile_size": (10, 10), - "tile_overlap": (1, 1), - "tile_format": "NITF", - "image_id": "test-image-id", - "image_url": "test-image-url", - "region_bounds": ((0, 0), (50, 50)), - "model_name": "test-model-name", - "model_invoke_mode": "SM_ENDPOINT", - "output_bucket": "unit-test", - "output_prefix": "region-request", - "execution_role": "arn:aws:iam::012345678910:role/OversightMLBetaInvokeRole", -} - -SAMPLE_IMAGE_REQUEST_DATA = { - "imageURL": "s3://test-account/path/to/data/sample_file.tif", - "outputBucket": "output-bucket", - "outputPrefix": "oversight/sample", - "modelName": "test-model", -} diff --git a/test/test_api.py b/test/test_api.py index 2efe5129..417fb7b7 100755 --- a/test/test_api.py +++ b/test/test_api.py @@ -1,10 +1,10 @@ # Copyright 2023 Amazon.com, Inc. or its affiliates. -import unittest from typing import Any, Dict +from unittest import TestCase, main +from unittest.mock import patch import boto3 -import mock import pytest import shapely.geometry from botocore.stub import Stubber @@ -20,7 +20,7 @@ } -class TestModelRunnerAPI(unittest.TestCase): +class TestModelRunnerAPI(TestCase): def test_region_request_constructor(self): from aws.osml.model_runner.api.image_request import ModelInvokeMode from aws.osml.model_runner.api.region_request import RegionRequest @@ -108,7 +108,7 @@ def test_image_request_constructor(self): assert s3_sink.__getattribute__("prefix") == "images/outputs" assert ir.roi is None - @mock.patch("aws.osml.model_runner.common.credentials_utils.sts_client") + @patch("aws.osml.model_runner.common.credentials_utils.sts_client") def test_image_request_from_message(self, mock_sts): from aws.osml.model_runner.api.image_request import ImageRequest from aws.osml.model_runner.common.typing import ImageCompression @@ -282,4 +282,4 @@ def test_image_request_invalid_image_path(self): if __name__ == "__main__": - unittest.main() + main() diff --git a/test/test_app.py b/test/test_app.py index 8785e702..673cae2d 100755 --- a/test/test_app.py +++ b/test/test_app.py @@ -1,15 +1,15 @@ -# Copyright 2023 Amazon.com, Inc. or its affiliates. +# Copyright 2023-2024 Amazon.com, Inc. or its affiliates. + import os -import unittest from importlib import reload from queue import Queue +from unittest import TestCase, main +from unittest.mock import Mock, patch import boto3 import geojson -import mock from botocore.exceptions import ClientError -from mock import Mock -from moto import mock_dynamodb, mock_ec2, mock_kinesis, mock_s3, mock_sagemaker, mock_sns, mock_sqs +from moto import mock_aws from osgeo import gdal TEST_MOCK_PUT_EXCEPTION = Mock(side_effect=ClientError({"Error": {"Code": 500, "Message": "ClientError"}}, "put_item")) @@ -80,14 +80,8 @@ def __eq__(self, other): return other["region"] == self.region_request["region"] and other["image_id"] == self.region_request["image_id"] -@mock_dynamodb -@mock_ec2 -@mock_s3 -@mock_sagemaker -@mock_sqs -@mock_sns -@mock_kinesis -class TestModelRunner(unittest.TestCase): +@mock_aws +class TestModelRunner(TestCase): def setUp(self): """ Set up virtual AWS resources for use by our unit tests @@ -414,8 +408,8 @@ def test_process_geom_image_request(self): # Remember that with multiple patch decorators the order of the mocks in the parameter list is # reversed (i.e. the first mock parameter is the last decorator defined). Also note that the # pytest fixtures must come at the end. 
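+    # For example (illustrative names):
+    #     @patch("pkg.module.b")
+    #     @patch("pkg.module.a")
+    #     def test_example(self, mock_a, mock_b):
+    #         ...  # the bottom-most decorator supplies the first mock parameter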
- @mock.patch("aws.osml.model_runner.app.setup_tile_workers") - @mock.patch("aws.osml.model_runner.app.process_tiles") + @patch("aws.osml.model_runner.app.setup_tile_workers") + @patch("aws.osml.model_runner.app.process_tiles") def test_process_region_request( self, mock_process_tiles, @@ -452,8 +446,8 @@ def test_process_region_request( self.model_runner.endpoint_statistics_table.increment_region_count.assert_called_once_with(TEST_MODEL_ENDPOINT) self.model_runner.endpoint_statistics_table.decrement_region_count.assert_called_once_with(TEST_MODEL_ENDPOINT) - @mock.patch("aws.osml.model_runner.app.setup_tile_workers") - @mock.patch("aws.osml.model_runner.app.process_tiles") + @patch("aws.osml.model_runner.app.setup_tile_workers") + @patch("aws.osml.model_runner.app.process_tiles") def test_process_region_request_exception( self, mock_process_tiles, @@ -489,8 +483,8 @@ def test_process_region_request_exception( self.model_runner.endpoint_statistics_table.increment_region_count.assert_called_once_with(TEST_MODEL_ENDPOINT) self.model_runner.endpoint_statistics_table.decrement_region_count.assert_called_once_with(TEST_MODEL_ENDPOINT) - @mock.patch("aws.osml.model_runner.app.setup_tile_workers") - @mock.patch("aws.osml.model_runner.app.process_tiles") + @patch("aws.osml.model_runner.app.setup_tile_workers") + @patch("aws.osml.model_runner.app.process_tiles") def test_process_region_request_invalid( self, mock_process_tiles, @@ -539,10 +533,10 @@ def test_process_region_request_invalid( self.model_runner.endpoint_statistics_table.increment_region_count.assert_not_called() self.model_runner.endpoint_statistics_table.decrement_region_count.assert_not_called() - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.FeatureDetectorFactory", autospec=True) - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.FeatureTable", autospec=True) - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.TileWorker", autospec=True) - @mock.patch("aws.osml.model_runner.tile_worker.tile_worker_utils.Queue", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.FeatureDetectorFactory", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.FeatureTable", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.TileWorker", autospec=True) + @patch("aws.osml.model_runner.tile_worker.tile_worker_utils.Queue", autospec=True) def test_process_region_request_throttled( self, mock_queue, @@ -553,9 +547,13 @@ def test_process_region_request_throttled( from aws.osml.gdal.gdal_utils import load_gdal_dataset from aws.osml.model_runner.database.endpoint_statistics_table import EndpointStatisticsTable from aws.osml.model_runner.database.job_table import JobTable - from aws.osml.model_runner.database.region_request_table import RegionRequestTable + from aws.osml.model_runner.database.region_request_table import RegionRequestItem, RegionRequestTable from aws.osml.model_runner.exceptions import SelfThrottledRegionException + region_request_item = RegionRequestItem( + image_id=TEST_IMAGE_ID, region_id="test-region-id", region_pixel_bounds="(0, 0)(50, 50)" + ) + # Load up our test image raster_dataset, sensor_model = load_gdal_dataset(self.region_request.image_url) @@ -565,7 +563,7 @@ def test_process_region_request_throttled( self.model_runner.endpoint_statistics_table.current_in_progress_regions.return_value = 10000 with self.assertRaises(SelfThrottledRegionException): - self.model_runner.process_region_request(self.region_request, 
raster_dataset, sensor_model) + self.model_runner.process_region_request(self.region_request, region_request_item, raster_dataset, sensor_model) self.model_runner.endpoint_statistics_table.increment_region_count.assert_not_called() self.model_runner.endpoint_statistics_table.decrement_region_count.assert_not_called() @@ -577,7 +575,7 @@ def test_process_region_request_throttled( # Check to make sure a queue was created and populated with appropriate region requests mock_queue.assert_not_called() - @mock.patch.dict("os.environ", values={"ELEVATION_DATA_LOCATION": TEST_ELEVATION_DATA_LOCATION}) + @patch.dict("os.environ", values={"ELEVATION_DATA_LOCATION": TEST_ELEVATION_DATA_LOCATION}) def test_create_elevation_model(self): # These imports/reloads are necessary to force the ServiceConfig instance used by model runner # to have the patched environment variables @@ -654,4 +652,4 @@ def get_dataset_and_camera(): if __name__ == "__main__": - unittest.main() + main() diff --git a/tox.ini b/tox.ini index c3b5a889..0c6a2afb 100755 --- a/tox.ini +++ b/tox.ini @@ -28,7 +28,7 @@ deps = pytest-xdist>=3.2.0 pytest-asyncio>=0.20.3 mock>=5.0.0 - moto>=4.1.0 + moto[all]>=5.0.0 defusedxml>=0.7.1 setenv = # ModelRunner