Restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching #148

Draft
wants to merge 8 commits into
base: main

Conversation

mahrsee1997
Collaborator

No description provided.

…arated the request & download stage of fetching
@mahrsee1997 mahrsee1997 requested a review from alxmrs April 20, 2022 20:29
Collaborator

@alxmrs alxmrs left a comment

Overall, the pipeline improvements look good! Here's my early feedback for the current draft.

Comment on lines 117 to 121
def fetch(self, dataset: str, selection: t.Dict) -> None:
    pass

def download(self, dataset: str, result: t.Dict, output: str) -> None:
    pass
Collaborator

These should raise a NotImplementedError.
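
A minimal sketch of what that could look like, assuming a simplified base class. The `Client` name and the error messages here are illustrative stand-ins, not the PR's actual code:

```python
import typing as t


class Client:
    """Hypothetical base class; concrete clients override both methods."""

    def fetch(self, dataset: str, selection: t.Dict) -> t.Dict:
        # Fail loudly instead of silently returning None.
        raise NotImplementedError('Subclasses must implement fetch().')

    def download(self, dataset: str, result: t.Dict, output: str) -> None:
        raise NotImplementedError('Subclasses must implement download().')
```

With `pass`, a subclass that forgets to override a method silently does nothing; raising makes the omission visible the first time the method is called.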

@@ -152,6 +170,90 @@ def __exit__(self, exc_type, exc_value, traceback):
        self._redirector.__exit__(exc_type, exc_value, traceback)


class APIRequestExtended(api.APIRequest):
Collaborator

How about we include "MARS" in the name of this class?

Collaborator

Naming discussion. I kind of like "extended", but is there a better name? What about "Adapter"? Any other ideas?

Collaborator

On second thought, adding "MARS" to the name of the class may not be necessary.

Collaborator Author

Named SplitMARSRequest.

Comment on lines 174 to 175
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
Collaborator

You can omit the constructor if it just calls super.
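
To illustrate: a subclass without its own `__init__` inherits the parent's constructor unchanged. The `APIRequest` class below is a simplified stand-in written for this example (not the real `ecmwfapi` class), and `describe` is a hypothetical method:

```python
class APIRequest:
    """Simplified stand-in for the parent class; the signature is an assumption."""

    def __init__(self, url: str, service: str) -> None:
        self.url = url
        self.service = service


class SplitMARSRequest(APIRequest):
    # No __init__ here: Python falls back to APIRequest.__init__.

    def describe(self) -> str:
        return f'{self.service} at {self.url}'


request = SplitMARSRequest('https://example.invalid/api', 'mars')
```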

Comment on lines 209 to 219
tries = 0
while size != result["size"] and tries < 10:
    size = self._transfer(
        urljoin(self.url, result["href"]), target, result["size"]
    )
    if size != result["size"] and tries < 10:
        tries += 1
        self.log("Transfer interrupted, resuming in 60s...")
        time.sleep(60)
    else:
        break
Collaborator

Ideally, we could update or replace this block with code that is capable of downloading more than 1 MB of data at a time. It'd be nice if we could do something like this, for example:

shutil.copyfileobj(source_file, dest_file, DEFAULT_READ_BUFFER_SIZE)
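
As a rough, self-contained sketch of the idea: the constant name, its 16 MiB value, and the `copy_stream` helper are assumptions made for illustration, and in-memory streams stand in for the HTTP response and the target file.

```python
import io
import shutil

# Assumed buffer size for this sketch; tune it for the real workload.
READ_BUFFER_SIZE = 16 * 1024 * 1024


def copy_stream(source_file, dest_file, buffer_size: int = READ_BUFFER_SIZE) -> int:
    """Copy source_file to dest_file in buffer_size chunks; return dest position."""
    shutil.copyfileobj(source_file, dest_file, buffer_size)
    return dest_file.tell()


source = io.BytesIO(b'x' * 3_000_000)
dest = io.BytesIO()
copied = copy_stream(source, dest)
```

The third argument to `shutil.copyfileobj` is the chunk length, so a larger value means fewer read/write round trips per file.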

Collaborator Author

Yes, I've made the necessary code changes.

open(target, "w").close()

size = -1
tries = 0
Collaborator

Following up with the comment below: We could use Beam's retry logic with exponential backoff (via the decorator) instead of re-using ECMWF's implementation. WDYT?
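
For reference, here is a standard-library sketch of the retry-with-exponential-backoff pattern. It is not Beam's implementation (Beam provides its own decorator in `apache_beam.utils.retry`); the decorator name, parameters, and the `flaky_transfer` function are hypothetical:

```python
import functools
import time


def retry_with_backoff(num_retries=3, initial_delay_secs=1.0, factor=2.0, sleep=time.sleep):
    """Retry the wrapped function, multiplying the wait by `factor` each time."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay_secs
            for attempt in range(num_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == num_retries:
                        raise  # Out of retries: surface the last error.
                    sleep(delay)
                    delay *= factor
        return wrapper
    return decorator


# Usage: record the delays instead of actually sleeping.
delays = []
attempts = {'count': 0}


@retry_with_backoff(num_retries=3, initial_delay_secs=1.0, sleep=delays.append)
def flaky_transfer() -> str:
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise IOError('Transfer interrupted')
    return 'ok'


outcome = flaky_transfer()
```

Compared with a fixed 60-second sleep inside a hand-written loop, the decorator keeps the retry policy out of the transfer code.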

Collaborator Author

Yes, I've made the necessary code changes.

@@ -78,22 +171,30 @@ def fetch_data(self, config: Config, *, worker_name: str = 'default') -> None:

         with self.manifest.transact(config.selection, target, config.user_id):
             with tempfile.NamedTemporaryFile() as temp:
-                logger.info(f'[{worker_name}] Fetching data for {target!r}.')
+                logger.info(f'[{worker_name}] Fetching and Downloading data for {target!r}.')
Collaborator

Petty nit: can you use an & instead of "and" here? :)

with tempfile.NamedTemporaryFile() as temp:
    logger.info(f'[{worker_name}] Fetching data for {target!r}.')
    result = self.fetch(client, config.dataset, config.selection)
    yield (result, config, worker_name, temp.name, target)
Collaborator

Consider creating an internal data class for passing along this result.

Also: Are you sure that using the temp.name is safe? It's possible that the temporary file will disappear.

Furthermore, looking at the code: do you need to create a temporary file here? If you don't need it for the fetch, it's probably safer to move this to the next stage.
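
A small sketch of the concern, using only the standard library. The function names are hypothetical: a `NamedTemporaryFile` is deleted as soon as its `with` block exits, so a path handed to a later stage can point at nothing, whereas a stage that creates its own temporary file avoids the problem.

```python
import os
import tempfile


def stage_that_leaks_a_path() -> str:
    """Return the temp file's name after the file itself has been deleted."""
    with tempfile.NamedTemporaryFile() as temp:
        path = temp.name
    return path  # The file behind this path is already gone.


def stage_that_owns_its_temp_file(payload: bytes) -> int:
    """Create, use, and clean up the temp file within a single stage."""
    with tempfile.NamedTemporaryFile() as temp:
        temp.write(payload)
        temp.flush()
        return os.path.getsize(temp.name)


stale_path = stage_that_leaks_a_path()
still_exists = os.path.exists(stale_path)
size_written = stage_that_owns_its_temp_file(b'abc')
```

In a distributed pipeline the next stage may also run on a different worker, where a local path from the previous stage would not exist at all.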

Collaborator

Maybe we don't need the dataclass, since we can probably simplify what's returned. For example, we don't need to pass the target, since that can be derived from the config. So, I see the tuple consisting of three parts: config, worker_name, result.

Collaborator

(you can choose your favorite order for these).
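
A sketch of the three-part tuple, under stated assumptions: `Config` and `prepare_target_name` below are simplified stand-ins for the project's own, the stage functions are hypothetical, and the fetch result is a placeholder dict.

```python
import typing as t


class Config(t.NamedTuple):
    """Simplified stand-in for the pipeline's config object."""
    dataset: str
    selection: t.Dict[str, str]
    target_template: str


def prepare_target_name(config: Config) -> str:
    # Stand-in: derive the target purely from the config.
    return config.target_template.format(**config.selection)


def fetch_stage(config: Config, worker_name: str) -> t.Iterator[t.Tuple[Config, str, t.Dict]]:
    result = {'href': '/data/abc', 'size': 1024}  # Placeholder for the API response.
    yield config, worker_name, result


def download_stage(element: t.Tuple[Config, str, t.Dict]) -> t.Tuple[str, int]:
    config, worker_name, result = element
    target = prepare_target_name(config)  # Recomputed here, so it is not passed along.
    return target, result['size']


config = Config('era5', {'year': '2020'}, 'gs://bucket/{year}.nc')
element = next(fetch_stage(config, 'worker-0'))
downloaded = download_stage(element)
```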

client = CLIENTS[self.client_name](config)
target = prepare_target_name(config)

with self.manifest.transact(config.selection, target, config.user_id):
Collaborator

A complication that I didn't anticipate until now: It would probably be best if we updated the manifest to distinguish between retrieved and downloaded. WDYT?

Collaborator Author

@mahrsee1997 mahrsee1997 Apr 22, 2022

Made the necessary changes: added a new class variable named stage to DownloadStatus, which represents the current stage of the request.
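
One way such a field could look, as a sketch only: the `Stage` enum, its members, and every `DownloadStatus` field other than `stage` are assumptions made for illustration, not the PR's actual definitions.

```python
import dataclasses
import enum
import typing as t


class Stage(enum.Enum):
    """Hypothetical stages a request passes through."""
    RETRIEVE = 'retrieve'
    DOWNLOAD = 'download'


@dataclasses.dataclass
class DownloadStatus:
    """Simplified manifest entry; only `stage` comes from the discussion above."""
    selection: t.Dict[str, str]
    location: str
    status: str = 'scheduled'
    stage: t.Optional[Stage] = None


entry = DownloadStatus({'year': '2020'}, 'gs://bucket/2020.nc')
retrieved = dataclasses.replace(entry, status='in-progress', stage=Stage.RETRIEVE)
downloaded = dataclasses.replace(retrieved, status='success', stage=Stage.DOWNLOAD)
```

Recording the stage alongside the status lets the manifest tell "retrieved but not yet downloaded" apart from "downloaded".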

@retry_with_exponential_backoff
def upload(self, src: str, dest: str) -> None:
    """Upload blob to cloud storage, with retries."""
    with io.FileIO(src, 'rb') as src_:
Collaborator

Can this be `with open(src, 'rb')`?

@@ -129,6 +130,8 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs:
                        help='Number of concurrent requests to make per API key. '
                             'Default: make an educated guess per client & config. '
                             'Please see the client documentation for more details.')
    parser.add_argument('-o', '--optimise-download', action='store_true', default=False,
                        help="Optimised the downloads.")
Collaborator

Make sure you cross apply the description of what this does here.

@mahrsee1997 mahrsee1997 requested a review from alxmrs April 22, 2022 18:39
alxmrs added a commit that referenced this pull request Jan 4, 2023
I'm taking a leaf from @mahrsee1997's PR #148 so that we can copy data from the MARS server faster (using a larger buffer size). Thanks for the primary contribution here, Rahul.

* restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching

* retry logic of downloads for MARS client & other cosmetic changes.

* Remove fetch / dl split

* retrieve in two steps.

* rm fetch + dl methods.

* Fix: `nim_requests_per_key` does not require class construction.

* fix lint: removed unused import.

* add support for aria2 for faster download

* code changes as per Alex feedback.

Co-authored-by: mahrsee1997 <rahul@infocusp.in>

2 participants