Skip to content

Commit

Permalink
Decouple the runner from the outpack_server store.
Browse files Browse the repository at this point in the history
In the previous design, outpack_server, the runner API and the workers
all shared a single outpack directory, and a Git repository (mostly, the
workers used a clone of the shared repository for the actual execution).

This creates a very tight and brittle coupling between all the
components. It makes it impossible to deploy the different components on
separate machines. It requires careful reasoning about data races and
conflicts between the different bits. It prevents us from sharing worker
processes across multiple Packit instances, and it prevents us from
using multiple Git repositories within a single instance.

The new design completely splits up the storage.
- The API server and each worker have their own local Git clones of the
  repositories, which are directly pulled from the upstream (e.g. GitHub).
- The API servers and workers store bare Git clones of the repositories,
  without any worktree. When running a report, workers create a new
  worktree in a temporary directory, run the report and delete the
  worktree. This ensures a completely clean slate every time.
- The workers use their own outpack store, that is not shared with any
  other process.
- The workers can pull and push packets using any protocol supported by
  orderly2. In practice, we will be using HTTP to interact with the
  outpack_server used by Packit.

Currently, the workers create a new outpack store for each run, meaning
they do not cache any of the packet dependencies and need to download
them from the outpack_server from scratch every time. Given that, at
least for now, workers and outpack_server will be operating on the same
or nearby machines, this seems like a reasonable overhead.

Ideally we would keep a per-worker cache, however we need to be careful
not to mix packets between different instances. One possible approach
may be to re-use the file store, but start from an empty metadata store
every time. This way large unnecessary file downloads are avoided, while
preserving some degree of isolation between runs and instances.
  • Loading branch information
plietar committed Jan 15, 2025
1 parent 42b295d commit 5b11ef6
Show file tree
Hide file tree
Showing 20 changed files with 583 additions and 552 deletions.
35 changes: 19 additions & 16 deletions R/api.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
##'
##' @title Create orderly runner
##'
##' @param root Orderly root
##'
##' @param repositories_base_path Path in which Git repositories are
##' cloned.
##'
Expand All @@ -22,7 +20,7 @@
##'
##' @export
api <- function(
root, repositories_base_path,
repositories_base_path,
validate = NULL, log_level = "info",
skip_queue_creation = FALSE) {
logger <- porcelain::porcelain_logger(log_level)
Expand All @@ -31,12 +29,11 @@ api <- function(
if (skip_queue_creation) {
queue <- NULL
} else {
queue <- Queue$new(root)
queue <- Queue$new()
}

api <- porcelain::porcelain$new(validate = validate, logger = logger)
api$include_package_endpoints(state = list(
root = root,
repositories_base_path = repositories_base_path,
queue = queue))
api
Expand Down Expand Up @@ -90,11 +87,13 @@ repository_branches <- function(repositories_base_path, url) {

##' @porcelain
##' GET /report/list => json(report_list)
##' query url :: string
##' query ref :: string
##' state root :: root
report_list <- function(root, ref) {
base <- git_remote_default_branch_ref(root)
reports <- get_reports(root = root, ref = ref, base = base)
##' state repositories_base_path :: repositories_base_path
report_list <- function(repositories_base_path, url, ref) {
repo <- repository_path(repositories_base_path, url)
base <- git_remote_default_branch_ref(repo)
reports <- get_reports(root = repo, ref = ref, base = base)

data.frame(
name = reports$name,
Expand All @@ -105,11 +104,14 @@ report_list <- function(root, ref) {


##' @porcelain
##' GET /report/<name:string>/parameters => json(report_parameters)
##' GET /report/parameters => json(report_parameters)
##' query url :: string
##' query ref :: string
##' state root :: root
report_parameters <- function(root, ref, name) {
params <- get_report_parameters(name, ref, root)
##' query name :: string
##' state repositories_base_path :: repositories_base_path
report_parameters <- function(repositories_base_path, url, ref, name) {
repo <- repository_path(repositories_base_path, url)
params <- get_report_parameters(name, ref, repo)
lapply(names(params), function(param_name) {
value <- params[[param_name]]
list(
Expand All @@ -121,16 +123,17 @@ report_parameters <- function(root, ref, name) {

##' @porcelain
##' POST /report/run => json(report_run_response)
##' state root :: root
##' state queue :: queue
##' body data :: json(report_run_request)
submit_report_run <- function(root, queue, data) {
##' Handle POST /report/run: enqueue a report run and return its task id.
##'
##' @param queue The Queue object used to submit work to the workers.
##' @param data Raw JSON request body matching the report_run_request schema.
##' @return A list with the scalar task id, serialised as the
##'   report_run_response schema.
submit_report_run <- function(queue, data) {
  # Decode the JSON request body into a named list.
  request <- jsonlite::parse_json(data)

  # Note the request field is `hash` but the queue argument is `ref`.
  submit_args <- list(
    request$name,
    url = request$url,
    branch = request$branch,
    ref = request$hash,
    parameters = request$parameters,
    location = request$location
  )
  task_id <- do.call(queue$submit, submit_args)

  list(taskId = scalar(task_id))
}
Expand Down
12 changes: 12 additions & 0 deletions R/git.R
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,15 @@ git_diff_tree <- function(left, right, repo = NULL) {
" (?<status>[A-Z])(?<score>\\d+)?\\t(?<src>[^\\t]*)(?:\\t(?<dst>[^\\t]*))?$")
as.data.frame(stringr::str_match(output, re)[, -1, drop = FALSE])
}


# Create a Git worktree for `branch` in a fresh temporary directory under
# `tmpdir`, returning its path. Both the worktree registration and the
# directory itself are cleaned up automatically when `env` exits.
#
# Cleanup ordering: local_tempdir() registers deletion of the directory on
# `env` first, and the defer() below registers the `git worktree remove`
# second; withr runs deferred handlers in LIFO order by default, so the
# worktree is unregistered from the repository before its directory is
# deleted.
create_temporary_worktree <- function(repo, branch, tmpdir, env = parent.frame()) {
  path <- withr::local_tempdir(tmpdir = tmpdir, .local_envir = env)

  git_run(c("worktree", "add", path, branch), repo = repo)
  withr::defer(env = env, {
    # --force: remove even if the worktree has modifications left behind.
    git_run(c("worktree", "remove", "--force", path), repo = repo)
  })

  path
}
22 changes: 8 additions & 14 deletions R/main.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
parse_main <- function(args = commandArgs(TRUE)) {
usage <- "Usage:
orderly.runner.server [options] <path> <repositories>
orderly.runner.server [options] <path>
Options:
--log-level=LEVEL Log-level (off, info, all) [default: info]
Expand All @@ -11,14 +11,13 @@ Options:
list(log_level = dat$log_level,
validate = dat$validate,
port = as.integer(dat$port),
path = dat$path,
repositories = dat$repositories,
repositories = dat$path,
host = dat$host)
}

# Entry point for the API server: parse command-line arguments, build the
# porcelain API and run it on the configured host and port.
main <- function(args = commandArgs(TRUE)) {
  config <- parse_main(args)
  server <- api(config$repositories, config$validate, config$log_level)
  server$run(host = config$host, port = config$port)
}

Expand All @@ -33,16 +32,11 @@ orderly.runner.worker <path>"
main_worker <- function(args = commandArgs(TRUE)) {
dat <- parse_main_worker(args)

# assumes ORDERLY_RUNNER_QUEUE_ID is set
queue <- Queue$new(dat$path)
queue_id <- Sys.getenv("ORDERLY_RUNNER_QUEUE_ID")
worker <- rrq::rrq_worker$new(queue_id, timeout_config = 30)

worker <- rrq::rrq_worker$new(
queue$controller$queue_id,
con = queue$controller$con
)
worker_path <- file.path(dat$path, ".packit", "workers", worker$id)
fs::dir_create(worker_path)
gert::git_clone(dat$path, path = worker_path)
fs::dir_create(dat$path)

worker$loop()
# This environment variable is used by the code submitted by the API.
withr::with_envvar(c(ORDERLY_WORKER_STORAGE = dat$path), worker$loop())
}
14 changes: 7 additions & 7 deletions R/porcelain.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 14 additions & 24 deletions R/queue.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,18 @@
Queue <- R6::R6Class("Queue", # nolint
cloneable = FALSE,
public = list(
#' @field root Orderly root
root = NULL,
#' @field config Orderly config
config = NULL,
#' @field controller RRQ controller
controller = NULL,

#' @description
#' Create object, read configuration and setup Redis connection.
#'
#' @param root Orderly root.
#' @param queue_id ID of an existing queue to connect to, creates a new one
#' if NULL (default NULL)
#' @param logs_dir directory to store worker logs
initialize = function(root, queue_id = NULL, logs_dir = "logs/worker") {
self$root <- root
self$config <- orderly2::orderly_config(self$root)
if (!runner_has_git(self$root)) {
cli::cli_abort(paste(
"Not starting server as orderly",
"root is not version controlled."
))
}
initialize = function(queue_id = NULL, logs_dir = "logs/worker") {
# Connect to Redis
con <- redux::hiredis()

# Create queue
self$controller <- rrq::rrq_controller(queue_id %||% orderly_queue_id())
Expand All @@ -40,20 +29,21 @@ Queue <- R6::R6Class("Queue", # nolint
#' @description
#' Submit a job the Redis queue for runner to run.
#'
#' @param reportname Name of orderly report.
#' @param parameters Parameters to run the report with (default NULL)
#' @param branch Name of git branch to checkout the repository
#' (default master)
#' @param url The URL of the Git repository containing the reports.
#' @param branch Name of git branch to checkout the repository.
#' @param ref Git commit-ish value (e.g HEAD or commit hash or branch name).
#' We reset hard to this ref and run the report. (default HEAD)
submit = function(reportname, parameters = NULL,
branch = "master", ref = "HEAD") {
#' @param reportname Name of orderly report.
#' @param parameters Parameters to run the report with.
#' @param location Location of the outpack repository from which to pull
#' dependencies and push the produced packet.
submit = function(url, branch, ref, reportname, parameters, location) {
run_args <- list(
self$root,
url,
branch,
ref,
reportname,
parameters,
branch,
ref
location
)
rrq::rrq_task_create_call(runner_run, run_args,
separate_process = TRUE,
Expand Down
4 changes: 2 additions & 2 deletions R/reports.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ get_report_parameters <- function(name, ref, root) {
}


get_orderly_script_path <- function(name, ref, root) {
contents <- gert::git_ls(root, ref = ref)
get_orderly_script_path <- function(name, ref, repo) {
contents <- gert::git_ls(repo, ref = ref)
re <- sprintf("^src/%s/(%s|orderly)\\.R$", name, name)
matches <- grep(re, contents$path, value = TRUE, perl = TRUE)
if (length(matches) != 1) {
Expand Down
96 changes: 32 additions & 64 deletions R/runner.R
Original file line number Diff line number Diff line change
@@ -1,66 +1,34 @@
runner_run <- function(orderly_root, reportname, parameters, branch, ref, ...) {
# Setup
worker_id <- Sys.getenv("RRQ_WORKER_ID")
worker_path <- file.path(orderly_root, ".packit", "workers", worker_id)
point_head_to_ref(worker_path, branch, ref)

# Initial cleanup
git_clean(worker_path)

# Run
id <- withr::with_envvar(
c(ORDERLY_SRC_ROOT = file.path(worker_path, "src", reportname)),
orderly2::orderly_run(reportname, parameters = parameters,
root = orderly_root, ...)
)

# Cleanup
git_clean(worker_path)

# Execute a single report run on a worker.
#
# The worker keeps private storage rooted at the ORDERLY_WORKER_STORAGE
# environment variable (set by the worker entrypoint before the rrq loop
# starts): Git clones live under `git/` and throwaway worktrees under
# `worktrees/`. The report runs in a fresh temporary worktree with its own
# outpack store, pulling dependencies from and pushing the produced packet
# to the given `location`.
#
# @param url URL of the Git repository containing the reports.
# @param branch Name of the branch to create for the run.
# @param ref Commit-ish the run is pinned to; `branch` is (re-)created at
#   this ref.
# @param reportname Name of the orderly report to run.
# @param parameters Parameters passed through to orderly2::orderly_run.
# @param location Outpack location used for dependencies and for pushing
#   the result; a list with `type` and `args` entries as accepted by
#   orderly2::orderly_location_add.
# @param ... Additional arguments forwarded to orderly2::orderly_run.
# @return The id of the packet that was produced.
runner_run <- function(url, branch, ref, reportname, parameters, location, ...) {
  storage <- Sys.getenv("ORDERLY_WORKER_STORAGE")
  stopifnot(nzchar(storage) && fs::dir_exists(storage))

  repositories <- fs::dir_create(storage, "git")
  worktree_base <- fs::dir_create(storage, "worktrees")

  # Clone or update the worker-local copy of the repository.
  repo <- git_sync(repositories, url)

  # We could create the worktree with a detached HEAD and not bother with
  # creating the branch, but then Orderly's metadata wouldn't be as complete.
  #
  # Using a named branch does introduce some persistent state in the Git
  # repository, which the runner generally does its best to avoid. That is an
  # acceptable compromise given that repositories are private to each worker.
  #
  # The branch/ref association here does not have to match the remote: if
  # the upstream repository has just been pushed to, the commit associated with
  # this run may be an older commit of that branch. We will happily use that
  # older commit.
  gert::git_branch_create(branch, ref = ref, force = TRUE, checkout = FALSE,
                          repo = repo)
  # The worktree (and its deletion on exit) is scoped to this function call,
  # guaranteeing a clean slate for every run.
  worktree <- create_temporary_worktree(repo, branch, worktree_base)

  # Fresh outpack store per run; "upstream" is the only configured location.
  orderly2::orderly_init(worktree)
  orderly2::orderly_location_add("upstream", location$type, location$args,
                                 root = worktree)

  id <- orderly2::orderly_run(reportname, parameters = parameters,
                              fetch_metadata = TRUE, allow_remote = TRUE,
                              location = "upstream", root = worktree, ...)
  # Publish the produced packet back to the upstream outpack store.
  orderly2::orderly_location_push(id, "upstream", root = worktree)
  id
}

# Move a worker checkout onto a specific branch and commit: fetch the
# latest refs, check out `branch`, then hard-reset to `ref`, discarding
# any local changes in the process.
point_head_to_ref <- function(worker_path, branch, ref) {
  gert::git_fetch(repo = worker_path)
  gert::git_branch_checkout(branch, repo = worker_path)
  gert::git_reset_hard(ref, repo = worker_path)
}

# Walk upwards from `path`, accumulating directories that would become
# empty once everything in `files_to_delete` is removed. Stops (returning
# the accumulated vector unchanged) at the first ancestor that still
# contains anything else.
add_dir_parent_if_empty <- function(files_to_delete, path) {
  repeat {
    entries <- list.files(path, full.names = TRUE)
    leftover <- setdiff(entries, files_to_delete)
    if (length(leftover) > 0) {
      return(files_to_delete)
    }
    files_to_delete <- c(files_to_delete, path)
    path <- dirname(path)
  }
}

# List every directory under `worker_path` (recursively) that is empty, or
# would become empty once its empty descendants are removed.
get_empty_dirs <- function(worker_path) {
  dirs <- fs::dir_ls(worker_path, recurse = TRUE, type = "directory")
  empty <- character()
  for (dir in dirs) {
    empty <- add_dir_parent_if_empty(empty, dir)
  }
  empty
}

# Restore a worker checkout to a pristine state, approximating `git clean`.
#
# Untracked and ignored files are removed by stashing them and immediately
# dropping the stash; empty directories (which git does not track) are
# then deleted separately. Errors are reported but never rethrown, so a
# failed cleanup does not break subsequent report runs.
git_clean <- function(worker_path) {
  # gert does not have git clean but this should achieve the same thing
  tryCatch(
    {
      gert::git_stash_save(
        include_untracked = TRUE,
        include_ignored = TRUE,
        repo = worker_path
      )
      gert::git_stash_drop(repo = worker_path)
    },
    error = function(e) {
      # we don't need to rethrow the error here since it doesn't break any
      # further report runs
      # NOTE(review): matches the exact "nothing to stash" message text —
      # brittle if the underlying library ever rewords it; confirm upstream.
      if (e$message != "cannot stash changes - there is nothing to stash.") {
        # TODO add logger here
        message(e$message)
      }
      NULL
    }
  )
  # however git ignores all directories, only cares about files, so we may
  # have empty directories left
  unlink(get_empty_dirs(worker_path), recursive = TRUE)
}
4 changes: 0 additions & 4 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ COPY . /src
RUN Rscript -e "pak::local_install('/src')"

COPY docker/bin /usr/local/bin/

RUN git config --global --add safe.directory "*"
RUN echo ".packit" > /.gitignore
RUN git config --global core.excludesFile "/.gitignore"

# ENTRYPOINT for server is /usr/local/bin/orderly.runner.server
# ENTRYPOINT for worker is /usr/local/bin/orderly.runner.worker
Loading

0 comments on commit 5b11ef6

Please sign in to comment.