-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching #148
base: main
Are you sure you want to change the base?
Conversation
…arated the request & download stage of fetching
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, the pipeline improvements look good! Here's my early feedback for the current draft.
def fetch(self, dataset: str, selection: t.Dict) -> None: | ||
pass | ||
|
||
def download(self, dataset: str, result: t.Dict, output: str) -> None: | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should raise a NotImplementedError
.
@@ -152,6 +170,90 @@ def __exit__(self, exc_type, exc_value, traceback): | |||
self._redirector.__exit__(exc_type, exc_value, traceback) | |||
|
|||
|
|||
class APIRequestExtended(api.APIRequest): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we include "MARS" in the name of this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming discussion. I kind of like "extended", but is there a better name? What about "Adapter"? Any other ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On second thought, adding "MARS" to the name of the class may not be necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Named SplitMARSRequest
.
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can omit the constructor if it just calls super.
tries = 0 | ||
while size != result["size"] and tries < 10: | ||
size = self._transfer( | ||
urljoin(self.url, result["href"]), target, result["size"] | ||
) | ||
if size != result["size"] and tries < 10: | ||
tries += 1 | ||
self.log("Transfer interrupted, resuming in 60s...") | ||
time.sleep(60) | ||
else: | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, we could update or replace this block with code that is capable of downloading more than 1 MB of data at a time. It'd be nice if we could do something like this, for example:
shutil.copyfileobj(source_file, dest_file, DEFAULT_READ_BUFFER_SIZE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, necessary code changes have been done.
open(target, "w").close() | ||
|
||
size = -1 | ||
tries = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following up with the comment below: We could use Beam's retry logic with exponential backoff (via the decorator) instead of re-using ECMWF's implementation. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, necessary code changes have been done.
@@ -78,22 +171,30 @@ def fetch_data(self, config: Config, *, worker_name: str = 'default') -> None: | |||
|
|||
with self.manifest.transact(config.selection, target, config.user_id): | |||
with tempfile.NamedTemporaryFile() as temp: | |||
logger.info(f'[{worker_name}] Fetching data for {target!r}.') | |||
logger.info(f'[{worker_name}] Fetching and Downloading data for {target!r}.') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
petty nit: can you use an &
instead of "and" here :) ?
with tempfile.NamedTemporaryFile() as temp: | ||
logger.info(f'[{worker_name}] Fetching data for {target!r}.') | ||
result = self.fetch(client, config.dataset, config.selection) | ||
yield (result, config, worker_name, temp.name, target) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider creating an internal data class for passing along this result.
Also: Are you sure that using the temp.name
is safe? It's possible that the temporary file will disappear.
Furthermore, looking at the code: do you need to create a temporary file here? If you don't need it for the fetch, it's probably safer to move this to the next stage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need the dataclass since we can probably simplify what's returned. For example, We don't need to pass the target since that can be derived from the config. So, I see the tuple consisting of three parts: config, worker_name, result
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(you can choose your favorite order for these).
client = CLIENTS[self.client_name](config) | ||
target = prepare_target_name(config) | ||
|
||
with self.manifest.transact(config.selection, target, config.user_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A complication that I didn't anticipate until now: It would probably be best if we updated the manifest to distinguish between retrieved and downloaded. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made necessary changes. Added a new class variable in DownloadStatus
named stage
which represents the current stage of the request.
@retry_with_exponential_backoff | ||
def upload(self, src: str, dest: str) -> None: | ||
"""Upload blob to cloud storage, with retries.""" | ||
with io.FileIO(src, 'rb') as src_: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be with open(src, 'rb')
?
@@ -129,6 +130,8 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs: | |||
help='Number of concurrent requests to make per API key. ' | |||
'Default: make an educated guess per client & config. ' | |||
'Please see the client documentation for more details.') | |||
parser.add_argument('-o', '--optimise-download', action='store_true', default=False, | |||
help="Optimised the downloads.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure you cross apply the description of what this does here.
…r cosmetic changes.
…llelism and updated the manifest.
…e docstring in RetrieveData
I'm taking a leaf from @mahrsee1997's PR #148 so that we can copy data from the MARS server faster (using a larger buffer size). Thanks for the primary contribution here, Rahul. * restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching * retry logic of downloads for MARS client & other cosmetic changes. * Remove fetch / dl split * retrieve in two steps. * rm fetch + dl methods. * Fix: `nim_requests_per_key` does not require class construction. * fix lint: removed unused import. * add support for aria2 for faster download * code changes as per Alex feedback. Co-authored-by: mahrsee1997 <rahul@infocusp.in>
No description provided.