Skip to content

Commit

Permalink
Merge pull request #176 from openzim/feature/language-agnostic-fetch
Browse files Browse the repository at this point in the history
add support for grabbing all videos if languages is not set
  • Loading branch information
benoit74 authored Apr 4, 2024
2 parents ed322db + c6c9a6b commit e0ab3d5
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- New `long_description` CLI argument to set the ZIM long description
- New `disable_metadata_check` CLI argument to disable the metadata checks which are automated since zimscraperlib 3.x
- When `--languages` CLI arugment is not passed, no filtering by language is done (#171)

### Changed

Expand Down
209 changes: 163 additions & 46 deletions src/ted2zim/scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def sort_languages_hack(languages: set[str]) -> list[str]:
self.source_languages = (
[] if not self.languages else self.to_ted_langcodes(self.languages)
)
self.already_visited = []
self.already_visited = set()

# set and record locale for translations
locale_details = get_language_details(locale_name)
Expand Down Expand Up @@ -290,16 +290,51 @@ def extract_videos_from_playlist(self, playlist):
for element in video_elements:
relative_path = element.get("href")
url = urllib.parse.urljoin(self.talks_base_url, relative_path)
if self.extract_info_from_video_page(url):
if self.source_languages and len(self.source_languages) > 1:
other_lang_urls = self.generate_urls_for_other_languages(url)
logger.debug(
f"Searching info for the video in other {len(other_lang_urls)} "
"language(s)"
)
for lang_url in other_lang_urls:
self.extract_info_from_video_page(lang_url)
self.already_visited.append(urllib.parse.urlparse(url).path)
json_data = self.extract_info_from_video_page(url)

if json_data is not None:
player_data = json_data["playerData"]
lang_code = json_data["language"]
if self.source_languages:
# If the first video which was fetched is in source_languages,
# save it.
if lang_code in self.source_languages:
self.update_videos_list_from_info(json_data)
# Determine the next languages to fetch from source_languages
other_languages = [
code for code in self.source_languages if code != lang_code
]
else:
# No languages were specified. Save the first video
self.update_videos_list_from_info(json_data)
# We use the the languages returned from the first
# video to generate other language urls.
other_languages = [
language["languageCode"]
for language in player_data["languages"]
if language["languageCode"] != lang_code
]

if not other_languages:
# No need to generate urls for other languages as the list
# is empty
self.already_visited.add(urllib.parse.urlparse(url).path)
continue

other_lang_urls = self.generate_urls_for_other_languages(
url, other_languages
)

logger.debug(
f"Searching info for the video in other {len(other_lang_urls)} "
"other language(s)"
)
for lang_url in other_lang_urls:
data = self.extract_info_from_video_page(lang_url)
if data is not None:
self.update_videos_list_from_info(data)

self.already_visited.add(urllib.parse.urlparse(url).path)
logger.debug(f"Seen {relative_path}")
logger.debug(f"Total videos found on playlist: {len(video_elements)}")
if not video_elements:
Expand Down Expand Up @@ -440,7 +475,7 @@ def generate_subtitle_list(self, video_id, langs, page_lang, audio_lang):

return update_subtitles_list(video_id, subtitles)

def generate_urls_for_other_languages(self, url):
def generate_urls_for_other_languages(self, url, languages):
"""Possible URLs for other requested languages based on a video url"""

urls = []
Expand All @@ -450,7 +485,7 @@ def generate_urls_for_other_languages(self, url):
url_parts = list(urllib.parse.urlparse(url))

# update the language query field value with other languages and form URLs
for language in self.source_languages:
for language in languages:
if language != page_lang:
query.update({"language": language})
url_parts[4] = urllib.parse.urlencode(query)
Expand All @@ -464,9 +499,98 @@ def extract_videos_in_search_results(self, result_json):
logger.debug(f"{nb_listed} video(s) found on current page")
for hit in hits:
url = urllib.parse.urljoin(self.talks_base_url, hit["slug"])
if self.extract_info_from_video_page(url):
nb_extracted += 1
json_data = self.extract_info_from_video_page(url)

if json_data is None:
continue

lang_code = json_data["language"]
player_data = json_data["playerData"]
# we need to filter videos since this has not been done
# before for topics with the "new" search page (2023)
if self.source_languages:
# If the first video which was fetched is in self.source_languages
# save it and increment the counter.
if (
lang_code in self.source_languages
and self.update_videos_list_from_info(json_data)
):
nb_extracted += 1

# Determine the next languages to fetch from source_languages
other_languages = [
code for code in self.source_languages if code != lang_code
]

# If there are any valid language codes which can be fetched, fetch them
# and save accordingly
if other_languages:
other_lang_urls = self.generate_urls_for_other_languages(
url, other_languages
)
logger.debug(
f"Searching info for the video in {len(other_lang_urls)} "
"other language(s)"
)
for lang_url in other_lang_urls:
data = self.extract_info_from_video_page(lang_url)
if data is not None and self.update_videos_list_from_info(data):
# It is possible that this is the first time we
# are saving this video as the first video might
# not necessarily be in the source_languages.
# We increment the counter relying on the fact that
# update_videos_list returns True only if this
# is the first time we are saving the video.
nb_extracted += 1

if lang_code not in self.source_languages:
# Video language fetched is not among the selected ones, we have to
# check subtitles if they are enough
if not self.subtitles_enough:
logger.debug(
f"Ignoring video in non-selected language {lang_code}"
)
else:
matching_languages = [
lang
for lang in player_data["languages"]
if lang["languageCode"] in self.source_languages
]
if len(matching_languages) == 0:
logger.debug(
"Ignoring video without a selected language"
"in audio or subtitles"
)
else:
# Since we are searching for all languages, first update the
# videos list with the data we just scraped.
if self.update_videos_list_from_info(json_data):
nb_extracted += 1

# We use the the languages returned from the json_data of
# this video to generate other language urls
other_languages = []
for language in player_data["languages"]:
# Do not include the language of the video that was just scraped
if language["languageCode"] == lang_code:
continue
other_languages.append(language["languageCode"])

if other_languages:
other_lang_urls = self.generate_urls_for_other_languages(
url, other_languages
)
logger.debug(
f"Searching info for the video in {len(other_lang_urls)} "
"other language(s)"
)
for lang_url in other_lang_urls:
data = self.extract_info_from_video_page(lang_url)
if data is not None:
self.update_videos_list_from_info(data)

logger.debug(f"Seen {hit['slug']}")
self.already_visited.add(urllib.parse.urlparse(url).path)
return nb_extracted, nb_listed

def get_lang_code_from_url(self, url, *, with_full_query=False):
Expand Down Expand Up @@ -575,12 +699,13 @@ def update_videos_list(
"languageName": self.get_display_name(lang_code, lang_name),
}
)
if self.subtitles_setting in (MATCHING, NONE):

if self.subtitles_setting in (MATCHING, NONE) and len(subtitles) == 1:
self.videos[index]["subtitles"] += subtitles
return False

def extract_video_info_from_json(self, json_data):
player_data = json.loads(json_data["playerData"])
def get_lang_code_and_name(self, json_data):
player_data = json_data["playerData"]
lang_code = json_data["language"]
try:
lang_name = [
Expand All @@ -591,30 +716,14 @@ def extract_video_info_from_json(self, json_data):
except Exception as exc:
logger.warning(f"player data has no entry for {lang_code}: {exc}")
lang_name = lang_code
if self.topics:
# we need to filter videos since this has not been done before for topics
# with the "new" search page (2023)
if lang_code not in self.source_languages:
# video language is not among the selected ones, we have to check
# subtitles if they are enough
if not self.subtitles_enough:
logger.debug(f"Ignoring video in non-selected language {lang_code}")
return False
else:
matching_languages = [
lang
for lang in player_data["languages"]
if lang["languageCode"] in self.source_languages
]
if len(matching_languages) == 0:
logger.debug(
"Ignoring video without a selected language in audio or "
"subtitles"
)
return False

native_talk_language = player_data["nativeLanguage"]
return lang_code, lang_name

def update_videos_list_from_info(self, json_data):
player_data = json_data["playerData"]
lang_code, lang_name = self.get_lang_code_and_name(json_data)

native_talk_language = player_data["nativeLanguage"]
# Extract the speaker of the TED talk
if len(json_data["speakers"]):
if isinstance(json_data["speakers"], dict):
Expand Down Expand Up @@ -688,8 +797,13 @@ def extract_video_info_from_json(self, json_data):
subtitles=subtitles,
)

def extract_info_from_video_page(self, url, retry_count=0):
"""extract all info from a TED video page url and update self.videos"""
def extract_info_from_video_page(
self, url: str, retry_count: int = 0
) -> dict | None:
"""extract all info from a TED video page url.
Returns a dict containign the video information if search was
successful, else None.
"""

# Every TED video page has a <script>-tag with a Javascript
# object with JSON in it. We will just stip away the object
Expand All @@ -698,12 +812,12 @@ def extract_info_from_video_page(self, url, retry_count=0):

# don't scrape if URL already visited
if urllib.parse.urlparse(url).path in self.already_visited:
return False
return None

# don't scrape if maximum retry count is reached
if retry_count > 5: # noqa: PLR2004
logger.error("Max retries exceeded. Skipping video")
return False
return None

logger.debug(f"extract_info_from_video_page: {url}")
html_content = request_url(url).text
Expand All @@ -721,8 +835,11 @@ def extract_info_from_video_page(self, url, retry_count=0):
logger.error(
f"Video has not yet been translated into {requested_lang_code}"
)
return False
return self.extract_video_info_from_json(json_data)
return None
# Desrialize the data at json_data["playerData"] into a dict
# and overwrite it accordingly
json_data["playerData"] = json.loads(json_data["playerData"])
return json_data
except Exception:
logger.error(
f"Problem occured while parsing {url}. HTML content was:\n"
Expand Down

0 comments on commit e0ab3d5

Please sign in to comment.