Skip to content

Commit

Permalink
Merge pull request #8 from rfcx/develop
Browse files Browse the repository at this point in the history
Implement parallel download function
  • Loading branch information
Nutto55 authored Sep 22, 2020
2 parents 7e336aa + a58c60a commit f2e5f27
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 9 deletions.
77 changes: 71 additions & 6 deletions package-rfcx/rfcx/audio.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
import urllib.request
import datetime
import requests
import shutil
import os
import concurrent.futures
from rfcx._api_rfcx import guardianAudio

def __save_file(url, local_path):
    """Download the file at `url` and save it locally under `local_path`.

    The response body is streamed to disk rather than loaded into memory.
    If the server answers with any status other than 200, no file is
    written and a message is printed instead.

    Args:
        url: Address of the file to download.
        local_path: Path of the file to create (opened in binary write mode).
    Returns:
        None.
    """
    # Using the response as a context manager closes the streamed connection
    # on every path, including the non-200 branch where the body is never read.
    with requests.get(url, stream=True) as response:
        if response.status_code != 200:
            print("Can not download {} with status {}".format(url, response.status_code))
            return
        # Let the raw stream undo any Content-Encoding while it is copied.
        response.raw.decode_content = True
        with open(local_path, 'wb') as out_file:
            shutil.copyfileobj(response.raw, out_file)

def __local_audio_file_path(path, audio_name, audio_extension):
    """Build the local file path `<path>/<audio_name>.<audio_extension>`.

    Args:
        path: Directory part of the result.
        audio_name: File name without extension.
        audio_extension: Extension without the leading dot.
    Returns:
        The joined path as a string, always using `/` as separator.
    """
    file_name = audio_name + "." + audio_extension
    return '/'.join((path, file_name))

def save_audio_file(destination_path, audio_id, source_audio_extension='opus'):
""" Prepare `url` and `local_path` and save it using function `__save_file`
Args:
destination_path: Path to the save directory.
destination_path: Audio save path.
audio_id: RFCx audio id.
source_audio_extension: (optional, default= '.opus') Extension for saving audio files.
Expand All @@ -28,3 +37,59 @@ def save_audio_file(destination_path, audio_id, source_audio_extension='opus'):
local_path = __local_audio_file_path(destination_path, audio_id, source_audio_extension)
__save_file(url, local_path)
print('File {}.{} saved to {}'.format(audio_id, source_audio_extension, destination_path))

def __generate_date_list_in_isoformat(start, end):
    """Return one ISO-8601 timestamp per day from `start`, each suffixed with `Z`.

    The list holds `(end - start).days + 1` entries. Every entry keeps the
    time of day of `start`, with microseconds dropped.

    Args:
        start: First datetime of the range.
        end: Last datetime of the range.
    Returns:
        List of strings such as `2020-01-01T00:00:00Z`.
    """
    day_count = (end - start).days + 1
    stamps = []
    for offset in range(day_count):
        moment = start + datetime.timedelta(days=offset)
        stamps.append(moment.replace(microsecond=0).isoformat() + 'Z')
    return stamps

def __segmentDownload(audio_path, file_ext, segment):
    """Download one audio segment into `audio_path`.

    The local file is named `<guardian_guid>_<measured_at>_<guid>.<file_ext>`,
    with every `:` and `.` in `measured_at` replaced by `-`.

    Args:
        audio_path: Directory that receives the file.
        file_ext: Audio file extension, used for both the URL and the file name.
        segment: Mapping with the keys `guid`, `guardian_guid` and `measured_at`.
    Returns:
        None.
    """
    guid = segment['guid']
    guardian = segment['guardian_guid']
    measured_at = segment['measured_at'].replace(':', '-').replace('.', '-')
    file_name = "{}_{}_{}".format(guardian, measured_at, guid)
    source_url = "https://assets.rfcx.org/audio/" + guid + "." + file_ext
    target = __local_audio_file_path(audio_path, file_name, file_ext)
    __save_file(source_url, target)

def downloadGuardianAudio(token, destination_path, guardian_id, min_date, max_date, file_ext='opus', parallel=True, max_workers=100):
    """Download a guardian's audio day by day between `min_date` and `max_date`.

    Segment information is fetched with `guardianAudio` once per day (at most
    1000 segments are requested per day) and every segment is saved under
    `<destination_path>/<guardian_id>/`.

    Args:
        token: RFCx client token.
        destination_path: Audio save path.
        guardian_id: RFCx guardian id.
        min_date: Download start date (datetime).
        max_date: Download end date (datetime).
        file_ext: (optional, default= 'opus') Extension for saving audio file.
        parallel: (optional, default= True) Download the segments of a day
            concurrently in a thread pool.
        max_workers: (optional, default= 100) Thread pool size used when
            `parallel` is enabled.
    Returns:
        None.
    Raises:
        TypeError: if missing required arguments.
    """
    audio_path = destination_path + '/' + guardian_id
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(audio_path, exist_ok=True)

    dates = __generate_date_list_in_isoformat(min_date, max_date)
    for index, date in enumerate(dates):
        day = date[:10]  # the YYYY-MM-DD part of the timestamp
        # Build the window from the calendar day instead of rewriting
        # '00:00:00' inside the timestamp: that rewrite only worked when
        # min_date was exactly midnight and otherwise gave an empty window.
        # The first day still starts at min_date's own time of day; later
        # days cover the whole day.
        date_start = date if index == 0 else day + 'T00:00:00Z'
        date_end = day + 'T23:59:59Z'
        segments = guardianAudio(token, guardian_id, date_start, date_end, limit=1000, descending=False)

        if not segments:
            print("No data on date:", day)
            continue

        if parallel:
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = [
                    executor.submit(__segmentDownload, audio_path, file_ext, segment)
                    for segment in segments
                ]
                # Errors raised in worker threads stay hidden inside their
                # futures unless inspected, so report them here.
                for future in concurrent.futures.as_completed(futures):
                    error = future.exception()
                    if error is not None:
                        print("Download failed on", guardian_id, day, "-", repr(error))
        else:
            for segment in segments:
                __segmentDownload(audio_path, file_ext, segment)
        print("Finish download on", guardian_id, day)
46 changes: 44 additions & 2 deletions package-rfcx/rfcx/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import getpass
import datetime
from os import path
import os
import re
import rfcx.audio as audio
import rfcx._pkce as pkce
import rfcx._api_rfcx as api_rfcx
import rfcx._api_auth as api_auth
Expand Down Expand Up @@ -38,7 +40,7 @@ def authenticate(self, persist=True):
access_token = None

# Attempt to load the credentials from disk
if path.exists(self.persisted_credentials_path):
if os.path.exists(self.persisted_credentials_path):
with open(self.persisted_credentials_path, 'r') as f:
lines = f.read().splitlines()
if len(lines) == 5 and lines[0] == 'version 1':
Expand Down Expand Up @@ -186,3 +188,43 @@ def tags(self, type, labels, start=None, end=None, sites=None, limit=1000):
end = datetime.datetime.utcnow().replace(microsecond=0).isoformat() + 'Z'

return api_rfcx.tags(self.credentials.id_token, type, labels, start, end, sites, limit)

def downloadGuardianAudio(self, dest_path=None, guardian_id=None, min_date=None, max_date=None, file_ext='opus', parallel=True):
    """Download audio using audio information from `guardianAudio`

    Args:
        dest_path: Path to save audio. If None then defaults to `./audios`.
        guardian_id: (Required) The guid of a guardian
        min_date: Minimum timestamp of the audio (datetime). If None then defaults to exactly 30 days ago.
        max_date: Maximum timestamp of the audio (datetime). If None then defaults to now.
        file_ext: Audio file extension. Defaults to `opus`
        parallel: Parallel download audio. Defaults to True.

    Returns:
        None. A message is printed and nothing is downloaded when the client
        is not authenticated or an argument is invalid.
    """
    if self.credentials is None:
        print('Not authenticated')
        return

    if guardian_id is None:
        print("Please specific the guardian id.")
        return

    if min_date is None:
        min_date = datetime.datetime.utcnow() - datetime.timedelta(days=30)
    if not isinstance(min_date, datetime.datetime):
        print("min_date is not type datetime")
        return

    if max_date is None:
        max_date = datetime.datetime.utcnow()
    if not isinstance(max_date, datetime.datetime):
        print("max_date is not type datetime")
        return

    # Resolve the default only after validation so invalid calls leave no
    # directory behind. The default must be assigned, not just created:
    # passing None on makes the path concatenation downstream fail.
    if dest_path is None:
        dest_path = './audios'
        os.makedirs(dest_path, exist_ok=True)

    return audio.downloadGuardianAudio(self.credentials.id_token, dest_path, guardian_id, min_date, max_date, file_ext, parallel)

2 changes: 1 addition & 1 deletion package-rfcx/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# rfcx/audio.py imports `requests`, so it has to be declared as a dependency.
# NOTE(review): presumably this list is passed to install_requires in the
# setup() call below - confirm.
REQUIRED_PACKAGES = ['httplib2', 'six', 'requests']

setup(name='rfcx',
version='0.0.7',
version='0.0.9',
url='https://github.com/rfcx/rfcx-sdk-python',
license='None',
author='Rainforest Connection',
Expand Down

0 comments on commit f2e5f27

Please sign in to comment.