Skip to content

Commit

Permalink
Add flag temporary to all files and remove from outputs (#102)
Browse files Browse the repository at this point in the history
* add temporary flag to all files then remove from outputs

* linting

* fix test to single stage to avoid docker image rollback

* bump version and lint

* reorder tests

* raise error if retry is to high

* add jitter
  • Loading branch information
rzlim08 authored Mar 1, 2023
1 parent ece1205 commit ad6393b
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 2 deletions.
46 changes: 46 additions & 0 deletions miniwdl-plugins/s3upload/miniwdl_s3upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import threading
import json
import logging
import time
import random
from pathlib import Path
from urllib.parse import urlparse
from typing import Dict, Optional, Tuple, Union
Expand Down Expand Up @@ -80,6 +82,42 @@ def flag_temporary(s3uri):
)


def remove_temporary_flag(s3uri, retry=0):
""" Remove temporary flag from s3 if in outputs.json """
uri = urlparse(s3uri)
bucket, key = uri.hostname, uri.path[1:]
tags = s3_client.get_object_tagging(
Bucket=bucket,
Key=key,
)
remaining_tags = []
for tag in tags["TagSet"]:
if not (tag["Key"] == "swipe_temporary" and tag["Value"] == "true"):
remaining_tags.append(tag)
try:
if remaining_tags:
s3_client.put_object_tagging(
Bucket=bucket,
Key=key,
Tagging={
'TagSet': remaining_tags
},
)
else:
s3_client.delete_object_tagging(
Bucket=bucket,
Key=key,
)
except botocore.exceptions.ClientError as e:
if retry > 3:
raise e
print(f"Error deleting tags for object {key} in bucket {bucket}: {e}")
delay = 20 + random.randint(0, 10)
print(f"Retrying in {delay} seconds...")
time.sleep(delay)
remove_temporary_flag(s3uri, retry+1)


def inode(link: str):
if link.startswith("s3://"):
return link
Expand Down Expand Up @@ -293,6 +331,13 @@ def rewriter(fd):
json.dump(outputs_s3_json, outfile, indent=2)
outfile.write("\n")

for output_file in outputs_s3_json.values():
if isinstance(output_file, list):
for filename in output_file:
remove_temporary_flag(filename)
elif output_file and output_file.startswith("s3://"):
remove_temporary_flag(output_file)

s3cp(logger, fn, os.environ.get("WDL_OUTPUT_URI", os.path.join(s3prefix, "outputs.s3.json")))


Expand All @@ -317,3 +362,4 @@ def s3cp(logger, fn, s3uri):
)
)
raise WDL.Error.RuntimeError("failed: " + " ".join(cmd))
flag_temporary(s3uri)
2 changes: 1 addition & 1 deletion miniwdl-plugins/s3upload/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name="miniwdl-s3upload",
version="0.0.8",
version="0.0.9",
description="miniwdl plugin for progressive upload of task output files to Amazon S3",
url="https://github.com/chanzuckerberg/miniwdl-s3upload",
project_urls={
Expand Down
76 changes: 76 additions & 0 deletions test/test_wdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,49 @@
}
"""

test_wdl_temp = """
version 1.0
workflow swipe_test {
input {
File hello
String docker_image_id
}
call add_world_temp {
input:
input_file = hello,
docker_image_id = docker_image_id
}
output {
File out_world = add_world_temp.out_world
}
}
task add_world_temp {
input {
File input_file
String docker_image_id
}
command <<<
cat ~{input_file} > out_world.txt
echo world >> out_world.txt
echo "temporary file" > temporary.txt
>>>
output {
File out_world = "out_world.txt"
File temporary = "temporary.txt"
}
runtime {
docker: docker_image_id
}
}
"""

test_fail_wdl = """
version 1.0
workflow swipe_test {
Expand Down Expand Up @@ -214,6 +257,8 @@ def setUp(self) -> None:
self.wdl_fail_obj.put(Body=test_fail_wdl.encode())
self.wdl_two_obj = self.test_bucket.Object("test-two-v1.0.0.wdl")
self.wdl_two_obj.put(Body=test_two_wdl.encode())
self.wdl_obj_temp = self.test_bucket.Object("test-temp-v1.0.0.wdl")
self.wdl_obj_temp.put(Body=test_wdl_temp.replace("swipe_test", "temp_test").encode())

with NamedTemporaryFile(suffix=".wdl.zip") as f:
Zip.build(load(join(dirname(realpath(__file__)), 'multi_wdl/run.wdl')), f.name, self.logger)
Expand Down Expand Up @@ -360,6 +405,37 @@ def test_failing_wdl_workflow(self):
[-1]["executionFailedEventDetails"]["error"])
self.assertTrue(errorType in ["UncaughtError", "RunFailed"])

def test_temp_tag(self):
output_prefix = "out-temp-tag"
sfn_input: Dict[str, Any] = {
"RUN_WDL_URI": f"s3://{self.wdl_obj_temp.bucket_name}/{self.wdl_obj_temp.key}",
"OutputPrefix": f"s3://{self.input_obj.bucket_name}/{output_prefix}",
"Input": {
"Run": {
"hello": f"s3://{self.input_obj.bucket_name}/{self.input_obj.key}",
"docker_image_id": "ubuntu",
}
},
}

self._wait_sfn(sfn_input, self.single_sfn_arn)

# test temporary tag is there for intermediate file
temporary_tagset = self.s3_client.get_object_tagging(
Bucket="swipe-test",
Key=f"{output_prefix}/test-temp-1/temporary.txt"
).get("TagSet", [])
self.assertEqual(len(temporary_tagset), 1)
self.assertEqual(temporary_tagset[0].get("Key"), "swipe_temporary")
self.assertEqual(temporary_tagset[0].get("Value"), "true")

# test temporary tag got removed for output file
output_tagset = self.s3_client.get_object_tagging(
Bucket="swipe-test",
Key=f"{output_prefix}/test-temp-1/out_world.txt"
).get("TagSet", [])
self.assertEqual(len(output_tagset), 0)

def test_staged_sfn_wdl_workflow(self):
output_prefix = "out-2"
sfn_input: Dict[str, Any] = {
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.3.1
v1.3.2

0 comments on commit ad6393b

Please sign in to comment.