From e794860babc4ba0cdb848db308f87ce1b8a76964 Mon Sep 17 00:00:00 2001 From: "Artiom N." Date: Sun, 28 Apr 2024 01:17:35 +0300 Subject: [PATCH] Images name hashing implemented: #24 --- markdown_tool.py | 1 + markdown_toolset/article_processor.py | 3 ++ markdown_toolset/image_downloader.py | 53 +++++++++++++++++++++------ markdown_toolset/www_tools.py | 19 +++++++--- tests/test_image_downloader.py | 19 ++++++++++ 5 files changed, 78 insertions(+), 17 deletions(-) diff --git a/markdown_tool.py b/markdown_tool.py index 0a24572..b095e50 100755 --- a/markdown_tool.py +++ b/markdown_tool.py @@ -65,6 +65,7 @@ def main(arguments): deduplication_type=getattr(DeduplicationVariant, arguments.deduplication_type.upper()), images_dirname=arguments.images_dirname, save_hierarchy=arguments.prepend_images_with_path, + replace_image_names=arguments.replace_image_names, ) processor.process() diff --git a/markdown_toolset/article_processor.py b/markdown_toolset/article_processor.py index 891f758..d1608d0 100644 --- a/markdown_toolset/article_processor.py +++ b/markdown_toolset/article_processor.py @@ -38,6 +38,7 @@ def __init__( deduplication_type: DeduplicationVariant = DeduplicationVariant.DISABLED, images_dirname: Union[Path, str] = 'images', save_hierarchy: bool = False, + replace_image_names: bool = False, ): self._article_formatter = get_formatter(output_format, FORMATTERS) self._article_downloader = ArticleDownloader( @@ -60,6 +61,7 @@ def __init__( self._save_hierarchy = save_hierarchy self._img_downloader = None self._running = False + self._replace_image_names = replace_image_names def process(self): try: @@ -103,6 +105,7 @@ def process(self): download_incorrect_mime_types=self._download_incorrect_mime, downloading_timeout=self._downloading_timeout, deduplicator=deduplicator, + replace_image_names=self._replace_image_names, ) result = self._transform_article(article_path, self._input_formats, TRANSFORMERS) diff --git a/markdown_toolset/image_downloader.py 
b/markdown_toolset/image_downloader.py index 5c6f843..3ef5823 100644 --- a/markdown_toolset/image_downloader.py +++ b/markdown_toolset/image_downloader.py @@ -9,7 +9,8 @@ from .deduplicators.deduplicator import Deduplicator from .out_path_maker import OutPathMaker -from .www_tools import download_from_url, get_filename_from_url, is_url, remove_protocol_prefix +from .www_tools import download_from_url, get_filename_from_url, is_url, remove_protocol_prefix, split_file_ext +from .string_tools import is_binary_same class ImageLink: @@ -68,6 +69,7 @@ def __init__( download_incorrect_mime_types: bool = False, downloading_timeout: float = -1, deduplicator: Optional[Deduplicator] = None, + replace_image_names: bool = False, ): """ :parameter out_path_maker: image local path creating strategy. @@ -78,6 +80,7 @@ def __init__( `downloading_timeout` seconds. :parameter download_incorrect_mime_types: download images even if MIME type can't be identified. :parameter deduplicator: file deduplicator object. + :parameter replace_image_names: replace image file names with the SHA-384 hash of the image content. 
""" self._out_path_maker = out_path_maker @@ -87,6 +90,7 @@ def __init__( self._download_incorrect_mime_types = download_incorrect_mime_types self._deduplicator = deduplicator self._running = False + self._replace_image_names = replace_image_names # pylint: disable=R0912(too-many-branches) def download_images(self, images: List[Union[str, ImageLink]]) -> dict: @@ -152,6 +156,13 @@ def download_images(self, images: List[Union[str, ImageLink]]) -> dict: 'Empty image filename, probably this is incorrect link: "%s".', image_download_url ) continue + + if self._replace_image_names: + _, image_ext = split_file_ext(image_filename) + image_content_hash = hashlib.sha384(image_content).hexdigest() + logging.debug('Image content hash: %s', image_content_hash) + image_filename = f'{image_content_hash}.{image_ext}' if image_ext else image_content_hash + except Exception as e: if self._skip_all_errors: logging.warning( @@ -171,8 +182,24 @@ def download_images(self, images: List[Union[str, ImageLink]]) -> dict: if not result: continue - real_image_path = self._process_image_path(image_url, image_filename, replacement_mapping) + image_local_url, real_image_path = self._get_real_path(image_url, image_filename) + + if self._replace_image_names and real_image_path.exists(): + # Image by this content hash exists, but possibly this is a collision. + with open(real_image_path, 'rb') as real_file: + if not is_binary_same(real_file, BytesIO(image_content)): + # Fix collision, changing name. 
+ img_num: int = 0 + while real_image_path.exists(): + numerated_image_filename = f'{img_num}{image_filename}' + real_image_path = self._out_path_maker.get_real_path( + image_local_url, numerated_image_filename + ) + img_num += 1 + + image_filename = numerated_image_filename + self._update_mapping(image_url, image_local_url, image_filename, replacement_mapping) self._write_image(real_image_path, image_content, image_link) finally: logging.info('Finished images downloading.') @@ -205,26 +232,28 @@ def _resize_image(image_content: bytes, new_size, filename): logging.debug('Saving resized image to the %s', filename) img.save(filename) - def _process_image_path(self, image_url, image_filename, replacement_mapping): - """Get real image path and update replacement mapping.""" - + def _get_real_path(self, image_url, image_filename): + """Return the image local URL (parent of the prefix-stripped URL) and the real image path.""" image_local_url = Path(remove_protocol_prefix(image_url)).parent.as_posix() + real_image_path = self._out_path_maker.get_real_path(image_local_url, image_filename) + + logging.debug('Real image path = "%s", image filename = "%s"', real_image_path, image_filename) + + return image_local_url, real_image_path + + def _update_mapping(self, image_url, image_local_url, image_filename, replacement_mapping): + """Record the document image path for `image_url` in the replacement mapping, keeping any existing entry.""" document_img_path = self._out_path_maker.get_document_img_path(image_local_url, image_filename) image_filename, document_img_path = self._fix_paths( replacement_mapping, document_img_path, image_url, image_filename ) - - real_image_path = self._out_path_maker.get_real_path(image_local_url, image_filename) + replacement_mapping.setdefault(image_url, '/'.join(document_img_path.parts)) logging.debug( - 'Real image path = "%s", document image path = "%s", image filename = "%s"', - real_image_path, + 'Document image path = "%s", image filename = "%s"', document_img_path, image_filename, ) - replacement_mapping.setdefault(image_url, '/'.join(document_img_path.parts)) - - return 
real_image_path def _make_directories(self, path: Optional[Path] = None): """Create directories hierarchy, started from images directory.""" diff --git a/markdown_toolset/www_tools.py b/markdown_toolset/www_tools.py index d56a109..b8554c2 100644 --- a/markdown_toolset/www_tools.py +++ b/markdown_toolset/www_tools.py @@ -3,7 +3,7 @@ """ import logging -from typing import Optional +from typing import Optional, Tuple from mimetypes import guess_extension import re from urllib.parse import urlparse, urlunparse @@ -65,6 +65,18 @@ def download_from_url(url: str, timeout: float = None): return response +def split_file_ext(file_name: str) -> Tuple[str, Optional[str]]: + """ + Split a filename at its last dot into the name and extension; the extension is None when there is no dot. + """ + name_and_ext = file_name.rsplit('.', 1) + if len(name_and_ext) == 1: + name_and_ext.append(None) + name, ext = name_and_ext + + return name, ext + + def get_filename_from_url(req: requests.Response) -> Optional[str]: """ Get filename from url and, if not found, try to get from content-disposition. 
@@ -90,10 +102,7 @@ def get_filename_from_url(req: requests.Response) -> Optional[str]: result = file_name[0] - f_name, f_ext = ( - (name_and_ext := result.rsplit('.', 1)), - (*name_and_ext, None) if len(name_and_ext) == 1 else name_and_ext, - )[1:][0] + f_name, f_ext = split_file_ext(result) if f_name == '': return None diff --git a/tests/test_image_downloader.py b/tests/test_image_downloader.py index 963d314..c58817b 100644 --- a/tests/test_image_downloader.py +++ b/tests/test_image_downloader.py @@ -1,3 +1,4 @@ +import hashlib from pathlib import Path import pytest @@ -68,3 +69,21 @@ def test_resizing(self): with Image.open(self._out_image_filepath) as img: assert img.width == w assert img.height == h + + def test_names_replacing(self): + image_downloader = ImageDownloader( + out_path_maker=self._out_path_maker, + skip_list=[], + skip_all_errors=False, + download_incorrect_mime_types=True, + downloading_timeout=-1, + deduplicator=None, + replace_image_names=True, + ) + + with open(self._article_images_path / self._image_filename, 'rb') as image_file: + image_hash = hashlib.sha384(image_file.read()).hexdigest() + + image_downloader.download_images([self._image_in_relpath]) + + assert (self._images_out_path / f'{image_hash}.png').exists()