Skip to content

Commit

Permalink
Upload artifacts after scraper completion
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Feb 21, 2024
1 parent faaac17 commit a229741
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 8 deletions.
7 changes: 7 additions & 0 deletions dispatcher/backend/src/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
LOGS_EXPIRATION = int(os.getenv("LOGS_EXPIRATION", "30"))
except Exception:
LOGS_EXPIRATION = 30
# destination URI where scraper artifacts archives are uploaded
# (SFTP by default; overridable via environment)
ARTIFACTS_UPLOAD_URI = os.getenv(
    "ARTIFACTS_UPLOAD_URI", "sftp://uploader@warehouse.farm.openzim.org:1522/artifacts"
)
# retention period for uploaded artifacts — presumably days, mirroring
# LOGS_EXPIRATION above; TODO confirm unit against warehouse configuration
try:
    ARTIFACTS_EXPIRATION = int(os.getenv("ARTIFACTS_EXPIRATION", "30"))
except Exception:
    # non-integer env value: fall back to the default
    ARTIFACTS_EXPIRATION = 30

# empty ZIMCHECK_OPTION means no zimcheck
ZIMCHECK_OPTION = os.getenv("ZIMCHECK_OPTION", "")
Expand Down
18 changes: 14 additions & 4 deletions dispatcher/backend/src/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def add_to_debug_if_present(
task=task, kwargs_key="timeout", container_key="timeout"
)
add_to_container_if_present(task=task, kwargs_key="log", container_key="log")
add_to_container_if_present(
task=task, kwargs_key="artifacts", container_key="artifacts"
)
add_to_debug_if_present(task=task, kwargs_key="task_log", debug_key="log")
add_to_debug_if_present(task=task, kwargs_key="task_name", debug_key="task_name")
add_to_debug_if_present(task=task, kwargs_key="task_args", debug_key="task_args")
Expand Down Expand Up @@ -348,10 +351,17 @@ def task_checked_file_event_handler(session: so.Session, task_id: UUID, payload:

def task_update_event_handler(session: so.Session, task_id: UUID, payload: dict):
    """Record an `update` event for a task: uploaded log and/or artifacts name.

    Only the keys present in the payload are recorded; a payload may carry
    `log`, `artifacts`, or both.
    """
    timestamp = get_timestamp_from_event(payload)

    if "log" in payload:
        log = payload.get("log")  # filename / S3 key of text file at upload_uri[logs]
        logger.info(f"Task update: {task_id}, log: {log}")
        save_event(session, task_id, TaskStatus.update, timestamp, log=log)

    if "artifacts" in payload:
        artifacts = payload.get(
            "artifacts"
        )  # filename / S3 key of archive file at upload_uri[artifacts]
        logger.info(f"Task update: {task_id}, artifacts: {artifacts}")
        save_event(session, task_id, TaskStatus.update, timestamp, artifacts=artifacts)


def handle_others(task_id: UUID, event: str, payload: dict):
Expand Down
9 changes: 9 additions & 0 deletions dispatcher/backend/src/tests/unit/routes/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,15 @@
"&bucketName=org-kiwix-zimfarm-logs"
),
},
"artifacts": {
"expiration": 20,
"upload_uri": (
"s3://s3.us-west-1.wasabisys.com/"
"?keyId=this_is_super_secret"
"&secretAccessKey=this_is_super_secret"
"&bucketName=org-kiwix-zimfarm-artifacts"
),
},
},
},
],
Expand Down
6 changes: 6 additions & 0 deletions dispatcher/backend/src/utils/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import db.models as dbm
from common import getnow
from common.constants import (
ARTIFACTS_EXPIRATION,
ARTIFACTS_UPLOAD_URI,
DEFAULT_SCHEDULE_DURATION,
ENABLED_SCHEDULER,
LOGS_EXPIRATION,
Expand Down Expand Up @@ -165,6 +167,10 @@ def request_a_schedule(
"upload_uri": LOGS_UPLOAD_URI,
"expiration": LOGS_EXPIRATION,
},
"artifacts": {
"upload_uri": ARTIFACTS_UPLOAD_URI,
"expiration": ARTIFACTS_EXPIRATION,
},
},
notification=schedule.notification if schedule.notification else {},
updated_at=now,
Expand Down
7 changes: 3 additions & 4 deletions workers/app/common/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,12 @@ def get_scraper_container_name(task):
)


def upload_container_name(task_id, filename, kind, unique):
    """Return a docker-safe container name for an upload of the given kind.

    task_id: task identifier, shortened via short_id() for the name prefix
    filename: name of the file being uploaded
    kind: upload kind (e.g. "zim", "log", "artifacts"); appears as "<kind>up"
    unique: when True, replace the filename with a random hex token (keeping
            the file's suffix) so repeated uploads get distinct names
    """
    if unique:
        filename = f"{uuid.uuid4().hex}{pathlib.Path(filename).suffix}"
    else:
        # docker container names only allow [a-zA-Z0-9_.-]; replace the rest
        filename = re.sub(r"[^a-zA-Z0-9_.-]", "_", filename)
    return f"{short_id(task_id)}_{kind}up_{filename}"


def get_ip_address(docker_client, name):
Expand Down Expand Up @@ -562,7 +561,7 @@ def start_uploader(
resume,
watch=False,
):
container_name = upload_container_name(task["_id"], filename, False)
container_name = upload_container_name(task["_id"], filename, kind, False)

# remove container should it exists (should not)
try:
Expand Down
89 changes: 89 additions & 0 deletions workers/app/task/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import shutil
import signal
import sys
import tarfile
import time
from typing import Any, Dict

Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(self, **kwargs):

self.scraper = None # scraper container
self.log_uploader = None # scraper log uploader container
self.artifacts_uploader = None # scraper artifacts uploader container
self.host_logsdir = None # path on host where logs are stored
self.scraper_succeeded = None # whether scraper succeeded

Expand Down Expand Up @@ -338,6 +340,7 @@ def stop(self, timeout=5):
"dnscache",
"scraper",
"log_uploader",
"artifacts_uploader",
"uploader",
"checker",
):
Expand Down Expand Up @@ -461,6 +464,88 @@ def check_scraper_log_upload(self):
}
)

def upload_scraper_artifacts(self):
    """Tar the scraper's requested artifacts and start an uploader container.

    Reads `artifacts_globs` ("|"-separated glob patterns) from the task
    config; when patterns match files in the task workdir, archives them
    into a single tar (deleting originals best-effort) and launches the
    artifacts uploader container. Returns False on archiving failure,
    None otherwise.
    """
    if not self.scraper:
        logger.error("No scraper to upload its artifacts…")
        return  # scraper gone, we can't access artifacts

    artifacts_globs = self.task["config"].get("artifacts_globs", None)
    if not artifacts_globs:
        logger.debug("No artifacts configured for upload")
        return
    artifacts_globs = str(artifacts_globs).split("|")
    logger.debug(f"Archiving files matching {artifacts_globs}")

    logger.debug("Creating a tar of requested artifacts")
    filename = f"{self.task['_id']}_{self.task['config']['task_name']}.tar"
    # compute fpath before the try block: the except handler below logs it,
    # and it must be bound even if globbing itself raises
    fpath = self.task_workdir / filename
    try:
        files_to_tar = [
            file
            for pattern in artifacts_globs
            for file in self.task_workdir.glob(pattern)
        ]
        if not files_to_tar:
            logger.debug("No files found to archive")
            return

        with tarfile.open(fpath, "w") as tar:
            for file in files_to_tar:
                tar.add(file, arcname=file.relative_to(self.task_workdir))
                # originals are removed best-effort once archived
                try:
                    file.unlink()
                except Exception as exc:
                    logger.debug(
                        "Unable to delete file after archiving", exc_info=exc
                    )
    except Exception as exc:
        logger.error(f"Unable to archive artifacts to {fpath}")
        logger.exception(exc)
        return False

    logger.debug("Starting artifacts uploader container…")
    self.artifacts_uploader = start_uploader(
        self.docker,
        self.task,
        "artifacts",
        self.username,
        host_workdir=self.host_task_workdir,
        upload_dir="",
        filename=filename,
        move=False,
        delete=True,
        compress=True,
        resume=True,
    )

def check_scraper_artifacts_upload(self):
    """Finalize the artifacts upload once its container has stopped.

    No-op while the uploader container is still running or was never
    started. On completion, logs the uploader output on failure, removes
    the container, and notifies the API of the uploaded artifacts
    filename via a task "update" event.
    """
    if not self.artifacts_uploader or self.container_running("artifacts_uploader"):
        return

    try:
        self.artifacts_uploader.reload()
        exit_code = self.artifacts_uploader.attrs["State"]["ExitCode"]
        filename = self.artifacts_uploader.labels["filename"]
    except docker.errors.NotFound:
        # prevent race condition if re-entering between this and container removal
        return
    logger.info(f"Scraper artifacts upload complete: {exit_code}")
    if exit_code != 0:
        logger.error(
            f"Artifacts Uploader:: "
            f"{get_container_logs(self.docker, self.artifacts_uploader.name)}"
        )
    self.stop_container("artifacts_uploader")

    logger.info(f"Sending scraper artifacts filename: {filename}")
    self.patch_task(
        {
            "event": "update",
            "payload": {"artifacts": filename},
        }
    )

def refresh_files_list(self):
for fpath in self.task_workdir.glob("*.zim"):
if fpath.name not in self.zim_files.keys():
Expand Down Expand Up @@ -619,6 +704,7 @@ def handle_stopped_scraper(self):
self.mark_scraper_completed(exit_code, stdout, stderr)
self.scraper_succeeded = exit_code == 0
self.upload_scraper_log()
self.upload_scraper_artifacts()

def sleep(self):
time.sleep(1)
Expand Down Expand Up @@ -670,6 +756,7 @@ def run(self):
or self.container_running("uploader")
or self.container_running("checker")
or self.container_running("log_uploader")
or self.container_running("artifacts_uploader")
):
now = datetime.datetime.now()
if (now - last_check).total_seconds() < SLEEP_INTERVAL:
Expand All @@ -679,10 +766,12 @@ def run(self):
last_check = now
self.handle_files()
self.check_scraper_log_upload()
self.check_scraper_artifacts_upload()

# make sure we submit upload status for last zim and scraper log
self.handle_files()
self.check_scraper_log_upload()
self.check_scraper_artifacts_upload()

# done with processing, cleaning-up and exiting
self.shutdown("succeeded" if self.scraper_succeeded else "failed")

0 comments on commit a229741

Please sign in to comment.