diff --git a/dev/.documenter-siteinfo.json b/dev/.documenter-siteinfo.json index 26c072b0..b0347a0d 100644 --- a/dev/.documenter-siteinfo.json +++ b/dev/.documenter-siteinfo.json @@ -1 +1 @@ -{"documenter":{"julia_version":"1.11.2","generation_timestamp":"2024-12-03T19:07:24","documenter_version":"1.8.0"}} \ No newline at end of file +{"documenter":{"julia_version":"1.11.2","generation_timestamp":"2024-12-06T20:43:14","documenter_version":"1.8.0"}} \ No newline at end of file diff --git a/dev/api-dagger/functions/index.html b/dev/api-dagger/functions/index.html index c822d102..a00b383d 100644 --- a/dev/api-dagger/functions/index.html +++ b/dev/api-dagger/functions/index.html @@ -1,10 +1,10 @@ -Functions and Macros · Dagger.jl

Dagger Functions

Task Functions/Macros

Dagger.@spawnMacro
Dagger.@spawn [option=value]... f(args...; kwargs...) -> DTask

Spawns a Dagger DTask that will call f(args...; kwargs...). This DTask is like a Julia Task, and has many similarities:

  • The DTask can be wait'd on and fetch'd from to see its final result
  • By default, the DTask will be automatically run on the first available compute resource
  • If all dependencies are satisfied, the DTask will be run as soon as possible
  • The DTask may be run in parallel with other DTasks, and the scheduler will automatically manage dependencies
  • If a DTask throws an exception, it will be propagated to any calls to fetch, but not to calls to wait

However, the DTask also has many key differences from a Task:

  • The DTask may run on any thread of any Julia process, and even on a remote machine, in your cluster (see Distributed.addprocs)
  • The DTask might automatically utilize GPUs or other accelerators, if available
  • If arguments to a DTask are also DTasks, then the scheduler will execute those arguments' DTasks first, before running the "downstream" task
  • If an argument to a DTask t2 is a DTask t1, then the result of t1 (gotten via fetch(t1)) will be passed to t2 (no need for t2 to call fetch!)
  • DTasks are generally expected to be defined "functionally", meaning that they should not mutate global state, mutate their arguments, or have side effects
  • DTasks are function call-focused, meaning that Dagger.@spawn expects a single function call, and not a block of code
  • All DTask arguments are expected to be safe to serialize and send to other Julia processes; if not, use the scope option or Dagger.@mutable to control execution location

Options to the DTask can be set before the call to f with key-value syntax, e.g. Dagger.@spawn myopt=2 do_something(1, 3.0), which would set the option myopt to 2 for this task. Multiple options may be provided, which are specified like Dagger.@spawn myopt=2 otheropt=4 do_something(1, 3.0).

These options control a variety of properties of the resulting DTask:

  • scope: The execution "scope" of the task, which determines where the task will run. By default, the task will run on the first available compute resource. If you have multiple compute resources, you can specify a scope to run the task on a specific resource. For example, Dagger.@spawn scope=Dagger.scope(worker=2) do_something(1, 3.0) would run do_something(1, 3.0) on worker 2.
  • meta: If true, instead of the scheduler automatically fetching values from other tasks, the raw Chunk objects will be passed to f. Useful for doing manual fetching or manipulation of Chunk references. Non-Chunk arguments are still passed as-is.

Other options exist; see Dagger.Sch.ThunkOptions for the full list.

This macro is a semi-thin wrapper around Dagger.spawn - it creates a call to Dagger.spawn on f with arguments args and keyword arguments kwargs, and also passes along any options in an Options struct. For example, Dagger.@spawn myopt=2 do_something(1, 3.0) would essentially become Dagger.spawn(do_something, Dagger.Options(;myopt=2), 1, 3.0).
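A brief usage sketch (the computation and the scope value are illustrative; worker 1 is the master process and always exists):

a = Dagger.@spawn 1 + 2
b = Dagger.@spawn scope=Dagger.scope(worker=1) a * 3   # `a` is fetched automatically
fetch(b)                                               # 9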

source
Dagger.spawnFunction
Dagger.spawn(f, args...; kwargs...) -> DTask

Spawns a DTask that will call f(args...; kwargs...). Also supports passing a Dagger.Options struct as the first argument to set task options. See Dagger.@spawn for more details on DTasks.
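A sketch of the equivalent direct call, mirroring the translation shown for Dagger.@spawn above:

t = Dagger.spawn(+, Dagger.Options(;scope=Dagger.scope(worker=1)), 1, 2)
fetch(t)   # 3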

source

Task Options Functions/Macros

Dagger.with_optionsFunction
with_options(f, options::NamedTuple) -> Any
-with_options(f; options...) -> Any

Sets one or more options to the given values, executes f(), resets the options to their previous values, and returns the result of f(). This is the recommended way to set options, as it only affects tasks spawned within its scope. Note that setting an option here will propagate its value across Julia or Dagger tasks spawned by f() or its callees (i.e. the options propagate).
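For example, a sketch that pins every task spawned inside the closure to worker 1 (assuming scope is one of the options that propagates this way):

using Distributed
Dagger.with_options(;scope=Dagger.scope(worker=1)) do
    fetch(Dagger.@spawn myid())   # runs on worker 1 because of the propagated scope
end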

source
Dagger.get_optionsFunction
get_options(key::Symbol, default) -> Any
-get_options(key::Symbol) -> Any

Returns the value of the option named key. If the option does not have a value set, an error will be thrown, unless default is provided, in which case it will be returned instead of erroring.

get_options() -> NamedTuple

Returns a NamedTuple of all option key-value pairs.
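A small sketch pairing get_options with with_options (the option names are illustrative):

Dagger.with_options(;myopt=2) do
    Dagger.get_options(:myopt)        # returns 2
    Dagger.get_options(:otheropt, 42) # option not set, so the default 42 is returned
end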

source
Dagger.@optionMacro
@option name myfunc(A, B, C) = value

A convenience macro for defining default_option. For example:

Dagger.@option single mylocalfunc(Int) = 1

The above call will set the single option to 1 for any Dagger task calling mylocalfunc(Int) with an Int argument.

source
Dagger.default_optionFunction
default_option(::Val{name}, Tf, Targs...) where name = value

Defines the default value for option name to value when Dagger is preparing to execute a function with type Tf with the argument types Targs. Users and libraries may override this to set default values for tasks.

An easier way to define these defaults is with @option.

Note that the actual task's argument values are not passed, as it may not always be possible or efficient to gather all Dagger task arguments on one worker.

This function may be executed within the scheduler, so it should generally be made very cheap to execute. If the function throws an error, the scheduler will use whatever the global default value is for that option instead.

source

Data Management Functions

Dagger.tochunkFunction
tochunk(x, proc::Processor, scope::AbstractScope; device=nothing, kwargs...) -> Chunk

Create a chunk from data x which resides on proc and which has scope scope.

device specifies a MemPool.StorageDevice (which is itself wrapped in a Chunk) which will be used to manage the reference contained in the Chunk generated by this function. If device is nothing (the default), the data will be inspected to determine if it's safe to serialize; if so, the default MemPool storage device will be used; if not, then a MemPool.CPURAMDevice will be used.

All other kwargs are passed directly to MemPool.poolset.
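A sketch of the simple form mentioned in the Chunk documentation, where Dagger.tochunk(data) picks the processor and scope automatically (that defaulting behavior is an assumption of this sketch):

data = rand(100)
c = Dagger.tochunk(data)   # Chunk referencing `data` on this worker
collect(c) == data         # collect fetches the referenced data back to the caller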

source
Dagger.mutableFunction
mutable(f::Base.Callable; worker, processor, scope) -> Chunk

Calls f() on the specified worker or processor, returning a Chunk referencing the result with the specified scope scope.
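A sketch of keeping mutable state pinned to a single worker (assumes a worker with ID 2 exists; the Ref-based counter is illustrative):

counter = Dagger.mutable(() -> Ref(0); worker=2)   # the Ref lives on worker 2
# Tasks that take `counter` as an argument are restricted to worker 2 by its scope
wait(Dagger.@spawn (c -> c[] += 1)(counter))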

source
Dagger.shardFunction
shard(f; kwargs...) -> Chunk{Shard}

Executes f on all workers in workers, wraps each result in a process-scoped Chunk, and constructs a Chunk{Shard} containing all of these Chunks on the current worker.

Keyword arguments:

  • procs – The list of processors to create pieces on. May be any iterable container of Processors.
  • workers – The list of workers to create pieces on. May be any iterable container of Integers.
  • per_thread::Bool=false – If true, creates one piece per thread, rather than one piece per worker.
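For example, a sketch that gives each thread its own RNG, relying on Shard arguments being replaced by the piece local to wherever the task runs (see Dagger.Shard):

using Random
rngs = Dagger.shard(MersenneTwister; per_thread=true)
fetch(Dagger.@spawn rand(rngs))   # the task receives the MersenneTwister local to its thread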
source

Data Dependencies Functions

Dagger.spawn_datadepsFunction
spawn_datadeps(f::Base.Callable; traversal::Symbol=:inorder)

Constructs a "datadeps" (data dependencies) region and calls f within it. Dagger tasks launched within f may wrap their arguments with In, Out, or InOut to indicate whether the task will read, write, or read+write that argument, respectively. These argument dependencies will be used to specify which tasks depend on each other based on the following rules:

  • Dependencies across different arguments are independent; only dependencies on the same argument synchronize with each other ("same-ness" is determined based on isequal)
  • InOut is the same as In and Out applied simultaneously, and synchronizes with the union of the In and Out effects
  • Any two or more In dependencies do not synchronize with each other, and may execute in parallel
  • An Out dependency synchronizes with any previous In and Out dependencies
  • An In dependency synchronizes with any previous Out dependencies
  • If unspecified, an In dependency is assumed

In general, the result of executing tasks following the above rules will be equivalent to simply executing tasks sequentially and in order of submission. Of course, if dependencies are incorrectly specified, undefined behavior (and unexpected results) may occur.

Unlike other Dagger tasks, tasks executed within a datadeps region are allowed to write to their arguments when annotated with Out or InOut appropriately.

At the end of executing f, spawn_datadeps will wait for all launched tasks to complete, rethrowing the first error, if any. The result of f will be returned from spawn_datadeps.

The keyword argument traversal controls the order that tasks are launched by the scheduler, and may be set to :bfs or :dfs for Breadth-First Scheduling or Depth-First Scheduling, respectively. All traversal orders respect the dependencies and ordering of the launched tasks, but may provide better or worse performance for a given set of datadeps tasks. This argument is experimental and subject to change.
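A short sketch of a datadeps region (the array operations are illustrative):

A = rand(1000)
B = rand(1000)
C = zeros(1000)
Dagger.spawn_datadeps() do
    Dagger.@spawn copyto!(Dagger.Out(C), Dagger.In(A))                  # writes C, reads A
    Dagger.@spawn map!(+, Dagger.InOut(C), Dagger.In(C), Dagger.In(B))  # waits on the previous write to C
end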

source

Scope Functions

Dagger.scopeFunction
scope(scs...) -> AbstractScope
 scope(;scs...) -> AbstractScope

Constructs an AbstractScope from a set of scope specifiers. Each element in scs is a separate specifier; if scs is empty, an empty UnionScope() is produced; if scs has one element, then exactly one specifier is constructed; if scs has more than one element, a UnionScope of the scopes specified by scs is constructed. A variety of specifiers can be passed to construct a scope:

  • :any - Constructs an AnyScope()
  • :default - Constructs a DefaultScope()
  • (scs...,) - Constructs a UnionScope of scopes, each specified by scs
  • thread=tid or threads=[tids...] - Constructs an ExactScope or UnionScope containing all Dagger.ThreadProcs with thread ID tid/tids across all workers.
  • worker=wid or workers=[wids...] - Constructs a ProcessScope or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids across all threads.
  • thread=tid/threads=tids and worker=wid/workers=wids - Constructs an ExactScope, ProcessScope, or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids and threads tid/tids.
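For example, some plain worker/thread specifiers (the worker and thread IDs are illustrative):

Dagger.scope(:default)             # DefaultScope()
Dagger.scope(worker=2)             # every thread on worker 2
Dagger.scope(thread=3)             # thread 3 on every worker
Dagger.scope(worker=2, thread=3)   # exactly thread 3 on worker 2
Dagger.scope(workers=[2, 3])       # union over workers 2 and 3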

Aside from the worker and thread specifiers, it's possible to add custom specifiers for scoping to other kinds of processors (like GPUs) or providing different ways to specify a scope. Specifier selection is determined by a precedence ordering: by default, all specifiers have precedence 0, which can be changed by defining scope_key_precedence(::Val{spec}) = precedence (where spec is the specifier as a Symbol). The specifier with the highest precedence in a set of specifiers is used to determine the scope by calling to_scope(::Val{spec}, sc::NamedTuple) (where sc is the full set of specifiers), which should be overridden for each custom specifier, and which returns an AbstractScope. For example:

# Setup a GPU specifier
 Dagger.scope_key_precedence(::Val{:gpu}) = 1
 Dagger.to_scope(::Val{:gpu}, sc::NamedTuple) = ExactScope(MyGPUDevice(sc.worker, sc.gpu))
 
 # Generate an `ExactScope` for `MyGPUDevice` on worker 2, device 3
-Dagger.scope(gpu=3, worker=2)
source
Dagger.constrainFunction
constrain(x::AbstractScope, y::AbstractScope) -> AbstractScope

Constructs a scope that is the intersection of scopes x and y.

source

Processor Functions

Dagger.execute!Function
execute!(proc::Processor, f, args...; kwargs...) -> Any

Executes the function f with arguments args and keyword arguments kwargs on processor proc. This function can be overloaded by Processor subtypes to allow executing function calls differently than normal Julia.

source
Dagger.iscompatibleFunction
iscompatible(proc::Processor, opts, f, Targs...) -> Bool

Indicates whether proc can execute f over Targs given opts. Processor subtypes should overload this function to return true if and only if it is essentially guaranteed that f(::Targs...) is supported. Additionally, iscompatible_func and iscompatible_arg can be overridden to determine compatibility of f and Targs individually. The default implementation returns false.

source
Dagger.default_enabledFunction
default_enabled(proc::Processor) -> Bool

Returns whether processor proc is enabled by default. The default value is false, meaning the processor is opted out of execution unless specifically requested by the user; true opts the processor in, causing it to always participate in execution when possible.

source
Dagger.get_processorsFunction
get_processors(proc::Processor) -> Set{<:Processor}

Returns the set of processors contained in proc, if any. Processor subtypes should overload this function if they can contain sub-processors. The default method will return a Set containing proc itself.

source
Dagger.get_parentFunction
get_parent(proc::Processor) -> Processor

Returns the parent processor for proc. The ultimate parent processor is an OSProc. Processor subtypes should overload this to return their most direct parent.

source
Dagger.moveFunction
move(from_proc::Processor, to_proc::Processor, x)

Moves and/or converts x such that it's available and suitable for usage on the to_proc processor. This function can be overloaded by Processor subtypes to transport arguments and convert them to an appropriate form before being used for execution. Subtypes of Processor wishing to implement efficient data movement should provide implementations where x::Chunk.
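Taken together, a minimal sketch of a custom Processor implementing this interface (the type, its field, and the permissive fallbacks are hypothetical; constructing the parent via OSProc(pid) is an assumption):

struct MyAccelProc <: Dagger.Processor
    owner::Int   # pid of the owning Julia process
end
Dagger.get_parent(proc::MyAccelProc) = Dagger.OSProc(proc.owner)
Dagger.default_enabled(proc::MyAccelProc) = true
Dagger.iscompatible(proc::MyAccelProc, opts, f, Targs...) = true
function Dagger.execute!(proc::MyAccelProc, f, args...; kwargs...)
    # a real implementation would dispatch onto the accelerator here
    return f(args...; kwargs...)
end
# convert incoming data into a device-appropriate form; a no-op in this sketch
Dagger.move(from_proc::Dagger.Processor, to_proc::MyAccelProc, x) = x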

source

Context Functions

Dagger.addprocs!Function
addprocs!(ctx::Context, xs)

Add new workers xs to ctx.

Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.

Workers can be either Processors or the underlying process IDs as Integers.

source
Dagger.rmprocs!Function
rmprocs!(ctx::Context, xs)

Remove the specified workers xs from ctx.

Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.

Workers can be either Processors or the underlying process IDs as Integers.
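A sketch combining the two calls (uses the documented Vector{Int} constructor for Context):

using Distributed
new_pids = addprocs(2)
ctx = Dagger.Context([myid()])    # a context initially containing only this process
Dagger.addprocs!(ctx, new_pids)   # make the new workers schedulable
# ... run work against ctx ...
Dagger.rmprocs!(ctx, new_pids)    # stop assigning new tasks to them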

source

DTask Execution Environment Functions

These functions are used within the function called by a DTask.

Dynamic Scheduler Control Functions

These functions query and control the scheduler remotely.

Base.fetchFunction
Base.fetch(c::DArray)

If a DArray tree has a Thunk in it, make the whole thing a big thunk.

source

Waits on a thunk to complete, and fetches its result.

source
diff --git a/dev/api-dagger/types/index.html b/dev/api-dagger/types/index.html index c3180b53..b24891e5 100644 --- a/dev/api-dagger/types/index.html +++ b/dev/api-dagger/types/index.html @@ -4,7 +4,7 @@ Thunk(sin, (π,)) julia> collect(t) # computes the result and returns it to the current process -1.2246467991473532e-16

Arguments

for each property are described in the next section.

Public Properties

Chunks and passing the raw arguments to f, instead pass the Chunk. Useful for doing manual fetching or manipulation of Chunk references. Non-Chunk arguments are still passed as-is.

f is a callable struct that exists on a given processor and should be transferred appropriately.

Useful if f is a function or callable struct that may only be transferred to, and executed within, the specified scope.

Options

If omitted, options can also be specified by passing key-value pairs as kwargs.

source
Dagger.DTaskType
DTask

Returned from Dagger.@spawn/Dagger.spawn calls. Represents a task that is in the scheduler, potentially ready to execute, executing, or finished executing. May be fetch'd or wait'd on at any time. See Dagger.@spawn for more details.

source

Task Options Types

Dagger.OptionsType
Options(::NamedTuple)
-Options(; kwargs...)

Options for thunks and the scheduler. See Task Spawning for more information.

source
Dagger.Sch.ThunkOptionsType
ThunkOptions

Stores Thunk-local options to be passed to the Dagger.Sch scheduler.

Arguments

  • single::Int=0: (Deprecated) Force thunk onto worker with specified id. 0 disables this option.
  • proclist=nothing: (Deprecated) Force thunk to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return a Bool result to indicate whether or not to use the given processor. nothing enables all default processors.
  • time_util::Dict{Type,Any}: Indicates the maximum expected time utilization for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real (approximately the number of nanoseconds taken), or MaxUtilization() (utilizes all processors of this type). By default, the scheduler assumes that this thunk only uses one processor.
  • alloc_util::Dict{Type,UInt64}: Indicates the maximum expected memory utilization for this thunk. Each keypair maps a processor type to the utilization, where the value is an integer representing approximately the maximum number of bytes allocated at any one time.
  • occupancy::Dict{Type,Real}: Indicates the maximum expected processor occupancy for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real between 0 and 1 (the occupancy ratio, where 1 is full occupancy). By default, the scheduler assumes that this thunk has full occupancy.
  • allow_errors::Bool=true: Allow this thunk to error without affecting non-dependent thunks.
  • checkpoint=nothing: If not nothing, uses the provided function to save the result of the thunk to persistent storage, for later retrieval by restore.
  • restore=nothing: If not nothing, uses the provided function to return the (cached) result of this thunk, were it to execute. If this returns a Chunk, this thunk will be skipped, and its result will be set to the Chunk. If nothing is returned, restoring is skipped, and the thunk will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.
  • storage::Union{Chunk,Nothing}=nothing: If not nothing, references a MemPool.StorageDevice which will be passed to MemPool.poolset internally when constructing Chunks (such as when constructing the return value). The device must support MemPool.CPURAMResource. When nothing, uses MemPool.GLOBAL_DEVICE[].
  • storage_root_tag::Any=nothing: If not nothing, specifies the MemPool storage root tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.
  • storage_leaf_tag::Union{MemPool.Tag,Nothing}=nothing: If not nothing, specifies the MemPool storage leaf tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.
  • storage_retain::Bool=false: The value of retain to pass to MemPool.poolset when constructing the result Chunk.
source
Dagger.Sch.SchedulerOptionsType
SchedulerOptions

Stores DAG-global options to be passed to the Dagger.Sch scheduler.

Arguments

  • single::Int=0: (Deprecated) Force all work onto worker with specified id. 0 disables this option.
  • proclist=nothing: (Deprecated) Force scheduler to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return a Bool result to indicate whether or not to use the given processor. nothing enables all default processors.
  • allow_errors::Bool=true: Allow thunks to error without affecting non-dependent thunks.
  • checkpoint=nothing: If not nothing, uses the provided function to save the final result of the current scheduler invocation to persistent storage, for later retrieval by restore.
  • restore=nothing: If not nothing, uses the provided function to return the (cached) final result of the current scheduler invocation, were it to execute. If this returns a Chunk, all thunks will be skipped, and the Chunk will be returned. If nothing is returned, restoring is skipped, and the scheduler will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.
source

Data Management Types

Dagger.ChunkType
Chunk

A reference to a piece of data located on a remote worker. Chunks are typically created with Dagger.tochunk(data), and the data can then be accessed from any worker with collect(::Chunk). Chunks are serialization-safe, and use distributed refcounting (provided by MemPool.DRef) to ensure that the data referenced by a Chunk won't be GC'd, as long as a reference exists on some worker.

Each Chunk is associated with a given Dagger.Processor, which is (in a sense) the processor that "owns" or contains the data. Calling collect(::Chunk) will perform data movement and conversions defined by that processor to safely serialize the data to the calling worker.

Constructors

See tochunk.

source
Dagger.ShardType

Maps a value to one of multiple distributed "mirror" values automatically when used as a thunk argument. Construct using @shard or shard.

source

Data Dependencies Types

Dagger.InType

Specifies a read-only dependency.

source
Dagger.OutType

Specifies a write-only dependency.

source
Dagger.InOutType

Specifies a read-write dependency.

source
Dagger.DepsType

Specifies one or more dependencies.

source

Processor Types

Dagger.ProcessorType
Processor

An abstract type representing a processing device and associated memory, where data can be stored and operated on. Subtypes should be immutable, and instances should compare equal if they represent the same logical processing device/memory. Subtype instances should be serializable between different nodes. Subtype instances may contain a "parent" Processor to make it easy to transfer data to/from other types of Processor at runtime.

source
Dagger.OSProcType
OSProc <: Processor

Julia CPU (OS) process, identified by Distributed pid. The logical parent of all processors on a given node, but otherwise does not participate in computations.

source
Dagger.ThreadProcType
ThreadProc <: Processor

Julia CPU (OS) thread, identified by Julia thread ID.

source

Scope Types

Dagger.AnyScopeType

Widest scope that contains all processors.

source
Dagger.NodeScopeType

Scoped to the same physical node.

source
Dagger.ProcessScopeType

Scoped to the same OS process.

source
Dagger.ProcessorTypeScopeFunction

Scoped to any processor with a given supertype.

source
Dagger.TaintScopeType

Taints a scope for later evaluation.

source
Dagger.UnionScopeType

Union of two or more scopes.

source
Dagger.ExactScopeType

Scoped to a specific processor.

source

Context Types

Dagger.ContextType
Context(xs::Vector{OSProc}) -> Context
-Context(xs::Vector{Int}) -> Context

Create a Context, by default adding each available worker.

It is also possible to create a Context from a vector of OSProc, or equivalently the underlying process ids can also be passed directly as a Vector{Int}.

Special fields include:

  • log_sink: A log sink object to use, if any.
  • profile::Bool: Whether or not to perform profiling with Profile stdlib.
source

Array Types

Dagger.DArrayType
DArray{T,N,F}(domain, subdomains, chunks, concat)
-DArray(T, domain, subdomains, chunks, [concat=cat])

An N-dimensional distributed array of element type T, with a concatenation function of type F.

Arguments

  • T: element type
  • domain::ArrayDomain{N}: the whole ArrayDomain of the array
  • subdomains::AbstractArray{ArrayDomain{N}, N}: a DomainBlocks of the same dimensions as the array
  • chunks::AbstractArray{Union{Chunk,Thunk}, N}: an array of chunks of dimension N
  • concat::F: a function of type F. concat(x, y; dims=d) takes two chunks x and y and concatenates them along dimension d. cat is used by default.
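In practice a DArray is usually produced via higher-level helpers rather than this constructor; a sketch (the rand-with-Blocks method and collect behavior are assumptions of this sketch):

DA = rand(Dagger.Blocks(4, 4), 8, 8)   # 8×8 distributed array split into 4×4 blocks
size(DA)                               # (8, 8)
collect(DA)                            # gathers the chunks into an ordinary Array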
source
Dagger.BlocksType
Blocks(xs...)

Indicates the size of an array operation, specified as xs, whose length indicates the number of dimensions in the resulting array.

source
Dagger.ArrayDomainType
ArrayDomain{N}

An N-dimensional domain over an array.

source
Dagger.UnitDomainType
UnitDomain

Default domain; it carries no information about the value.

source

Logging Event Types

Dagger.Events.BytesAllocdType
BytesAllocd

Tracks memory allocated for Chunks.

source
Dagger.Events.ProcessorSaturationType
ProcessorSaturation

Tracks the compute saturation (running tasks) per-processor.

source
Dagger.Events.WorkerSaturationType
WorkerSaturation

Tracks the compute saturation (running tasks).

source
diff --git a/dev/api-daggerwebdash/functions/index.html b/dev/api-daggerwebdash/functions/index.html index 09b3aba5..a7abc502 100644 --- a/dev/api-daggerwebdash/functions/index.html +++ b/dev/api-daggerwebdash/functions/index.html @@ -1,2 +1,2 @@ -Functions and Macros · Dagger.jl
diff --git a/dev/api-daggerwebdash/types/index.html b/dev/api-daggerwebdash/types/index.html index cbb4b475..e2bd25b5 100644 --- a/dev/api-daggerwebdash/types/index.html +++ b/dev/api-daggerwebdash/types/index.html @@ -1,5 +1,5 @@ -Types · Dagger.jl

DaggerWebDash Types

Logging Event Types

DaggerWebDash.D3RendererType
D3Renderer(port::Int, port_range::UnitRange; seek_store=nothing) -> D3Renderer

Constructs a D3Renderer, which is a TimespanLogging aggregator which renders the logs over HTTP using the d3.js library. port is the port that will be serving the HTTP website. port_range specifies a range of ports that will be used to listen for connections from other Dagger workers. seek_store, if specified, is a Tables.jl-compatible object that logs will be written to and read from. This table can be written to disk and then re-read later for offline log analysis.
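A wiring sketch, assuming the default eager context is reachable via Dagger.Sch.eager_context() and exposes the log_sink field described for Context (the port numbers are arbitrary):

ml = TimespanLogging.MultiEventLog()
ml.aggregators[:d3r] = DaggerWebDash.D3Renderer(8080, 50000:50100)
ctx = Dagger.Sch.eager_context()
ctx.log_sink = ml   # logs now feed the dashboard at localhost:8080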

source
DaggerWebDash.TableStorageType
TableStorage

LogWindow-compatible aggregator which stores logs in a Tables.jl-compatible sink.

Using a TableStorage is reasonably simple:

ml = TimespanLogging.MultiEventLog()
 
 ... # Add some events
 
@@ -15,4 +15,4 @@
 ml.aggregators[:lw] = lw
 
 # Logs will now be saved into `df` automatically, and packages like
-# DaggerWebDash.jl will automatically use it to retrieve subsets of the logs.
source
diff --git a/dev/api-timespanlogging/functions/index.html b/dev/api-timespanlogging/functions/index.html index d6443356..0c8a365b 100644 --- a/dev/api-timespanlogging/functions/index.html +++ b/dev/api-timespanlogging/functions/index.html @@ -1,2 +1,2 @@ -Functions and Macros · Dagger.jl

TimespanLogging Functions

Basic Functions

TimespanLogging.timespan_startFunction
timespan_start(ctx, category::Symbol, id, tl)

Generates an Event{:start} which denotes the start of an event. The event is categorized by category, and uniquely identified by id; these two must be the same passed to timespan_finish to close the event. tl is the "timeline" of the event, which is just an arbitrary payload attached to the event.

source
TimespanLogging.timespan_finishFunction
timespan_finish(ctx, category::Symbol, id, tl)

Generates an Event{:finish} which denotes the end of an event. The event is categorized by category, and uniquely identified by id; these two must be the same as previously passed to timespan_start. tl is the "timeline" of the event, which is just an arbitrary payload attached to the event.
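A sketch of manually bracketing a region of work (the context, category, id, and payload are illustrative; both calls must use the same category and id):

ctx = Dagger.Sch.eager_context()   # assumed way to obtain a context with a log sink
TimespanLogging.timespan_start(ctx, :my_phase, 1, "setup")
# ... the work being timed ...
TimespanLogging.timespan_finish(ctx, :my_phase, 1, "setup")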

source
TimespanLogging.get_logs!Function
get_logs!(::LocalEventLog, raw=false; only_local=false) -> Union{Vector{Timespan},Vector{Event}}

Get the logs from each process' local event log, clearing it in the process. Set raw to true to get potentially unmatched Events; the default is to return only matched events as Timespans. If only_local is set true, only process-local logs will be fetched; the default is to fetch logs from all processes.
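A sketch of the LocalEventLog workflow (attaching the log through the Context log_sink field is an assumption of this sketch):

log = TimespanLogging.LocalEventLog()
ctx = Dagger.Sch.eager_context()
ctx.log_sink = log
# ... run some Dagger tasks ...
spans = TimespanLogging.get_logs!(log)   # matched Timespans; the local logs are cleared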

source

Logging Metric Functions

diff --git a/dev/api-timespanlogging/types/index.html b/dev/api-timespanlogging/types/index.html index 91570f2b..1551c0a1 100644 --- a/dev/api-timespanlogging/types/index.html +++ b/dev/api-timespanlogging/types/index.html @@ -1,2 +1,2 @@ -Types · Dagger.jl

TimespanLogging Types

Log Sink Types

TimespanLogging.MultiEventLogType
MultiEventLog

Processes events immediately, generating multiple log streams. Multiple consumers may register themselves in the MultiEventLog, and when accessed, log events will be provided to all consumers. A consumer is simply a function or callable struct which will be called with an event when it's generated. The return value of the consumer will be pushed into a log stream dedicated to that consumer. Errors thrown by consumers will be caught and rendered, but will not otherwise interrupt consumption by other consumers, or future consumption cycles. An error will result in nothing being appended to that consumer's log.
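A sketch of registering a consumer (registration via setindex! is an assumption based on usage elsewhere in the Dagger documentation):

ml = TimespanLogging.MultiEventLog()
ml[:raw] = identity   # a consumer that stores each raw event in its own log stream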

source
TimespanLogging.LocalEventLogType
LocalEventLog

Stores events in a process-local array. Accessing the logs is all-or-nothing; if multiple consumers call get_logs!, they will get different sets of logs.

source

Built-in Event Types

diff --git a/dev/benchmarking/index.html b/dev/benchmarking/index.html index adb4891e..2d54b790 100644 --- a/dev/benchmarking/index.html +++ b/dev/benchmarking/index.html @@ -1,2 +1,2 @@ -Benchmarking · Dagger.jl

Benchmarking Dagger

For ease of benchmarking changes to Dagger's scheduler and the DArray, a benchmarking script exists at benchmarks/benchmark.jl. This script currently allows benchmarking a non-negative matrix factorization (NNMF) algorithm, which we've found to be a good evaluator of scheduling performance. The benchmark script can test with and without Dagger, and also has support for using CUDA or AMD GPUs to accelerate the NNMF via DaggerGPU.jl.

The script checks for a number of environment variables, which are used to control the benchmarks that are performed (all of which are optional):

  • BENCHMARK_PROCS: Selects the number of Julia processes and threads to start up. Specified as 8:4, this option would start 8 extra Julia processes with 4 threads each. Defaults to 2 processes with 1 thread each.
  • BENCHMARK_REMOTES: Specifies a colon-separated list of remote servers to connect to and start Julia processes on, using BENCHMARK_PROCS to indicate the processor/thread configuration of those remotes. Disabled by default (uses the local machine).
  • BENCHMARK_OUTPUT_FORMAT: Selects the output format for benchmark results. Defaults to jls, which uses Julia's Serialization stdlib, and can also be jld to use JLD.jl.
  • BENCHMARK_RENDER: Configures rendering, which is disabled by default. Can be "live" or "offline", which are explained below.
  • BENCHMARK: Specifies the set of benchmarks to run as a comma-separated list, where each entry can be one of cpu, cuda, or amdgpu, and may optionally append +dagger (like cuda+dagger) to indicate whether or not to use Dagger. Defaults to cpu,cpu+dagger, which runs CPU benchmarks with and without Dagger.
  • BENCHMARK_SCALE: Determines how much to scale the benchmark sizing by, typically specified as a UnitRange{Int}. Defaults to 1:5:50, which runs each scale from 1 to 50, in steps of 5.
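For example, a sketch of driving the script from within a Julia session (assuming the script reads these variables when included):

ENV["BENCHMARK_PROCS"] = "8:4"       # 8 extra processes, 4 threads each
ENV["BENCHMARK"] = "cpu,cpu+dagger"
ENV["BENCHMARK_SCALE"] = "1:5:50"
include("benchmarks/benchmark.jl")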

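As a hedged illustration, the variables might be set from within Julia before including the script (the exact invocation below is an assumption; exporting the variables in the shell before running the script works just as well):

# Hypothetical: configure the benchmark run via ENV, then include the script
ENV["BENCHMARK_PROCS"] = "8:4"        # 8 extra processes with 4 threads each
ENV["BENCHMARK"] = "cpu,cpu+dagger"   # CPU benchmarks, with and without Dagger
ENV["BENCHMARK_SCALE"] = "1:5:50"
ENV["BENCHMARK_RENDER"] = "offline"
include(joinpath("benchmarks", "benchmark.jl"))
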
Rendering with BENCHMARK_RENDER

Dagger contains visualization code for the scheduler (as a Gantt chart) and for thunk execution profiling (as a flamechart), which can be enabled with BENCHMARK_RENDER. Rendering can be done "live", served via a locally-run Mux.jl webserver, or "offline", where the visualization is embedded into the results output file. By default, rendering is disabled. If BENCHMARK_RENDER is set to live, a Mux webserver is started at localhost:8000 (the address is not yet configurable), and the Gantt chart and profiling flamechart will be rendered once the benchmarks start. If set to offline, visualization happens in the background, and the rendered output is saved into the results file.

Note that Gantt chart and flamechart output is only generated and relevant during Dagger execution.

TODO: Plotting

diff --git a/dev/checkpointing/index.html b/dev/checkpointing/index.html index aed0b546..dc505eb8 100644 --- a/dev/checkpointing/index.html +++ b/dev/checkpointing/index.html @@ -27,4 +27,4 @@ open("checkpoint-final.bin", "r") do io Dagger.tochunk(deserialize(io)) end -end))

In this case, the entire computation will be skipped if checkpoint-final.bin exists!

diff --git a/dev/darray/index.html b/dev/darray/index.html index 350f2469..ab3b7587 100644 --- a/dev/darray/index.html +++ b/dev/darray/index.html @@ -299,4 +299,4 @@ 1.09357 5.34114 0.74001 0.216408 0.342061 1.13217 3.20539 5.18716 5.69556 3.12244 0.347957 0.683841 2.84005 5.40096 3.15125 1.34437 4.90718 5.57208 3.08056 5.40758 3.71409 0.276977 5.06768 4.40841 - 4.75448 0.0475118 4.58058 0.153437 5.34085 1.09899 3.18051 3.13105

A variety of other operations exist on the DArray, and it should generally behave otherwise similar to any other AbstractArray type. If you find that it's missing an operation that you need, please file an issue!
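
As a hedged sketch of typical usage (the block size and the particular operations are illustrative, not a statement of the full supported surface):

using Dagger

A = rand(Blocks(32, 32), 128, 128)   # a 128×128 DArray in 32×32 partitions
B = map(x -> 2x, A)                  # elementwise map, still distributed
s = sum(B)                           # distributed reduction
C = collect(B)                       # gather the result into a plain Array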

Known Supported Operations

This list is not exhaustive, but documents operations which are known to work well with the DArray:

From Base:

From Random:

From Statistics:

From LinearAlgebra:

diff --git a/dev/data-management/index.html b/dev/data-management/index.html index ffdbf493..fda29046 100644 --- a/dev/data-management/index.html +++ b/dev/data-management/index.html @@ -12,4 +12,4 @@ wait.([Dagger.@spawn Threads.atomic_add!(cs, 1) for i in 1:1000]) # And let's fetch the total sum of all counters: -@assert sum(map(ctr->fetch(ctr)[], cs)) == 1000

Note that map, when used on a shard, will execute the provided function once per shard "piece", and each result is considered immutable. map is an easy way to make a copy of each piece of the shard, to be later reduced, scanned, etc.
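
A hedged sketch of that pattern, assuming the Dagger.@shard constructor documented in Data Management Functions (the counter workload mirrors the fragment above):

cs = Dagger.@shard Threads.Atomic{Int}(0)                 # one counter per worker
wait.([Dagger.@spawn Threads.atomic_add!(cs, 1) for _ in 1:100])
totals = map(ctr -> fetch(ctr)[], cs)                     # one immutable copy per piece
@assert sum(totals) == 100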

Further details about what arguments can be passed to @shard/shard can be found in Data Management Functions.

diff --git a/dev/datadeps/index.html b/dev/datadeps/index.html index fca54caa..fa2d4ca7 100644 --- a/dev/datadeps/index.html +++ b/dev/datadeps/index.html @@ -56,4 +56,4 @@ # This task aliases with the `inc_upper!` task (`UpperTriangular` accesses the diagonal of the array) Dagger.@spawn inc_diag!(Deps(A, InOut(Diagonal))) -end

You can pass any number of aliasing modifiers to Deps. This is particularly useful for declaring aliasing with Diagonal, Bidiagonal, Tridiagonal, and SymTridiagonal access, as these "wrappers" make a copy of their parent array and thus can't be used to "mask" access to the parent like UpperTriangular and UnitLowerTriangular can (which is valuable for writing memory-efficient, generic algorithms in Julia).
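
A hedged sketch of passing several aliasing modifiers at once (inc_diag! and the access pattern follow the fragment above; which modifiers you declare depends on what your kernel actually touches):

Dagger.spawn_datadeps() do
    # Declares a write to the diagonal and a read of the upper triangle of A
    Dagger.@spawn inc_diag!(Deps(A, InOut(Diagonal), In(UpperTriangular)))
end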

diff --git a/dev/dynamic/index.html b/dev/dynamic/index.html index ee7ff0ad..b062f781 100644 --- a/dev/dynamic/index.html +++ b/dev/dynamic/index.html @@ -9,4 +9,4 @@ y + 1 end return fetch(h, id) -end

Alternatively, Base.wait can be used when one does not wish to retrieve the returned value of the thunk.

Users with needs not covered by the built-in functions should use the Dagger.exec! function to pass a user-defined function, closure, or callable struct to the scheduler, along with a payload which will be provided to that function:

Dagger.exec!

Note that all functions called by Dagger.exec! take the scheduler's internal lock, so it's safe to manipulate the internal ComputeState object within the user-provided function.

diff --git a/dev/external-languages/python/index.html b/dev/external-languages/python/index.html index b6d1811d..8b444c0f 100644 --- a/dev/external-languages/python/index.html +++ b/dev/external-languages/python/index.html @@ -32,4 +32,4 @@ # Fetch the result result2 = daggerjl.fetch(task2) -print(f"The element-wise sum of the last result with itself is: {result2}")

Keep an eye on Dagger and pydaggerjl - new features are soon to come!

diff --git a/dev/index.html b/dev/index.html index f86e8a25..21f8ed6d 100644 --- a/dev/index.html +++ b/dev/index.html @@ -89,4 +89,4 @@ Dagger.@spawn copyto!(C, X) -# C = [4,5,6,7,1,2,3,9,8]

In contrast to the previous example, here, the tasks are executed without argument annotations. As a result, there is a possibility of the copyto! task being executed before the sort! task, leading to unexpected results in the output array C.
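
For comparison, a hedged sketch of the annotated form (reusing the In/Out/InOut annotations and Dagger.spawn_datadeps from the Datadeps section, with the same X and C as above), which guarantees that sort! finishes before copyto! reads X:

Dagger.spawn_datadeps() do
    Dagger.@spawn sort!(InOut(X))
    Dagger.@spawn copyto!(Out(C), In(X))
end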

diff --git a/dev/logging-advanced/index.html b/dev/logging-advanced/index.html index 66a464cf..2272450d 100644 --- a/dev/logging-advanced/index.html +++ b/dev/logging-advanced/index.html @@ -9,4 +9,4 @@ t1 = Dagger.@spawn 3*4 fetch(Dagger.@spawn 1+t1) log = Dagger.fetch_logs!(ctx)[1] # Get the logs for worker 1 -@show log[:bytes]
Note

TimespanLogging.get_logs! clears out the event logs, so that old events don't mix with new ones from future DAGs.

You'll then see that some number of bytes are allocated and then freed during the process of executing and completing those tasks.

A variety of other consumers are built into TimespanLogging and Dagger, under the TimespanLogging.Events and Dagger.Events modules, respectively; see Dagger Types and TimespanLogging Types for details.

The MultiEventLog also has a mechanism to call a set of functions, called "aggregators", after all consumers have been executed; these aggregators are passed the full set of log streams as a Dict{Symbol,Vector{Any}}. The only aggregator currently shipped with TimespanLogging itself is the LogWindow, and DaggerWebDash.jl provides the TableStorage which integrates with it; see DaggerWebDash Types for details.
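
A hedged sketch of attaching an aggregator (it is assumed here that LogWindow lives under TimespanLogging.Events like the built-in consumers, and that its arguments are a window length in nanoseconds and the name of the stream to trim against):

ml = TimespanLogging.MultiEventLog()
ml[:core] = TimespanLogging.Events.CoreMetrics()
# Aggregators see every log stream after the consumers have run
ml.aggregators[:window] = TimespanLogging.Events.LogWindow(20 * 10^9, :core)
ctx = Dagger.Sch.eager_context()   # assumed accessor for the Context in use
ctx.log_sink = ml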

diff --git a/dev/logging-visualization/index.html b/dev/logging-visualization/index.html index ee5f832e..5b57de31 100644 --- a/dev/logging-visualization/index.html +++ b/dev/logging-visualization/index.html @@ -48,4 +48,4 @@ ml.aggregators[:d3r] = d3r ctx.log_sink = ml -# ... use `ctx`

Once the server has started, you can browse to http://localhost:8080/ (if running on your local machine) to view the plots in real time. The dashboard also provides options at the top of the page to control the drawing speed, to enable and disable reading updates from the server (disabling freezes the display at the current instant), and to select which worker to look at. If the connection to the server is lost for any reason, the dashboard will attempt to reconnect at 5-second intervals. The dashboard usually survives restarts of the server, although refreshing the page afterwards is a good idea. Informational messages are also logged to the browser console for debugging.

diff --git a/dev/logging/index.html b/dev/logging/index.html index 150502b7..d61f62e0 100644 --- a/dev/logging/index.html +++ b/dev/logging/index.html @@ -1,2 +1,2 @@ -Logging: Basics · Dagger.jl

Logging and Graphing

Dagger's scheduler keeps track of the important and potentially expensive actions it performs, such as moving data between workers or executing thunks, and records how much time and how many memory allocations these operations consume, among other things. It does this through the TimespanLogging.jl package (which used to be integrated directly into Dagger). Saving this information somewhere accessible is disabled by default, but it's easy to turn on, via either of two mechanisms.

The first is Dagger.enable_logging!, which provides an easy-to-use interface to both enable and configure logging. The defaults are usually sufficient for most users, but can be tweaked with keyword arguments.

The second is done by setting a "log sink" in the Dagger Context being used, as ctx.log_sink. These log sinks drive how Dagger's logging behaves, and are configurable by the user, without the need to tweak any of Dagger's internal code.

A variety of log sinks are built into TimespanLogging; the NoOpLog is the default log sink when one isn't explicitly specified, and disables logging entirely (to minimize overhead). There are currently two other log sinks of interest. The first and newer of the two is the MultiEventLog, which generates multiple independent log streams, one per "consumer" (details in the next section); this is the log sink that enable_logging! uses, as it's easily the most flexible. The second and older sink is the LocalEventLog, which is explained later in this document. Most users should use the MultiEventLog (ideally via enable_logging!), since it's far more flexible, extensible, and generally more performant.

Log sinks are explained in detail in Logging: Advanced; however, if using enable_logging!, everything is already configured for you. Then, all you need to do is call Dagger.fetch_logs!() to get the logs for all workers as a Dict. A variety of tools can operate on these logs, including visualization through show_logs and render_logs.
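
A hedged sketch of that workflow (Dagger.disable_logging! and the exact layout of the returned Dict are assumptions based on the description above):

Dagger.enable_logging!()
fetch(Dagger.@spawn sum(rand(1000)))
logs = Dagger.fetch_logs!()    # Dict of log streams, keyed per worker
Dagger.disable_logging!()
# `logs` can now be handed to show_logs/render_logs for visualization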

diff --git a/dev/processors/index.html b/dev/processors/index.html index c7e4842c..3d512f7d 100644 --- a/dev/processors/index.html +++ b/dev/processors/index.html @@ -33,4 +33,4 @@ @show fetch(job) |> unique # and cleanup after ourselves... -workers() |> rmprocs +workers() |> rmprocs diff --git a/dev/propagation/index.html b/dev/propagation/index.html index a1cc036e..c05969ce 100644 --- a/dev/propagation/index.html +++ b/dev/propagation/index.html @@ -15,4 +15,4 @@ # Or, if `scope` might not have been propagated as an option, we can give # it a default value: fetch(@async @assert Dagger.get_options(:scope, AnyScope()) == ProcessScope(2)) -end

This is a very powerful concept: with a single call to with_options, we can apply any set of options to any nested set of operations. This is great for isolating large workloads to different workers or processors, defining global checkpoint/restore behavior, and more.
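
A hedged sketch (do_work is a hypothetical function; ProcessScope(2) matches the fragment above):

Dagger.with_options(;scope=ProcessScope(2)) do
    # Both tasks, and any tasks they spawn internally, inherit this scope
    t1 = Dagger.@spawn do_work(1)
    t2 = Dagger.@spawn do_work(2)
    fetch(t1) + fetch(t2)
end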

diff --git a/dev/scheduler-internals/index.html b/dev/scheduler-internals/index.html index 0ed17700..ef9327e0 100644 --- a/dev/scheduler-internals/index.html +++ b/dev/scheduler-internals/index.html @@ -1,2 +1,2 @@ -Scheduler Internals · Dagger.jl

Scheduler Internals

Dagger's scheduler can be found primarily in the Dagger.Sch module. It performs a variety of functions to support tasks and data, and as such is a complex system. This documentation attempts to shed light on how the scheduler works internally (from a somewhat high level), with the hope that it will help users and contributors understand how to improve the scheduler or fix any bugs that may arise from it.

Warn

Dagger's scheduler is evolving at a rapid pace, and is a complex mix of interacting parts. As such, this documentation may become out of date very quickly, and may not reflect the current state of the scheduler. Please feel free to file PRs to correct or improve this document, but also beware that the true functionality is defined in Dagger's source!

Core vs. Worker Schedulers

Dagger's scheduler is really two kinds of entities: the "core" scheduler, and "worker" schedulers:

The core scheduler runs on worker 1, thread 1, and is the entrypoint to tasks which have been submitted. The core scheduler manages all task dependencies, notifies calls to wait and fetch of task completion, and generally performs initial task placement. The core scheduler has cached information about each worker and their processors, and uses that information (together with metrics about previous tasks and other aspects of the Dagger runtime) to generate a near-optimal just-in-time task schedule.

The worker schedulers each run as a set of tasks across all workers and all processors, and handle data movement and task execution. Once the core scheduler has scheduled and launched a task, it arrives at the worker scheduler for handling. The worker scheduler passes the task to a queue for the assigned processor, where it waits until the processor has a sufficient amount of "occupancy" available for the task. Once the processor is ready for the task, it first fetches all of the task's arguments from other workers, then executes the task, packages the task's result into a Chunk, and passes that back to the core scheduler.

Core: Basics

The core scheduler contains a single internal instance of type ComputeState, which maintains (among many other things) all necessary state to represent the set of waiting, ready, and running tasks, cached task results, and maps of interdependencies between tasks. It uses Julia's task infrastructure to asynchronously send work requests to remote Julia processes, and uses a RemoteChannel as an inbound queue for completed work.

There is an outer loop which drives the scheduler, which continues executing either eternally (excepting any internal scheduler errors or Julia exiting), or until all tasks in the graph have completed executing and the final task in the graph is ready to be returned to the user. This outer loop continuously performs two main operations: the first is to launch the execution of nodes which have become "ready" to execute; the second is to "finish" nodes which have been completed.

Core: Initialization

At the very beginning of a scheduler's lifecycle, a ComputeState object is allocated, workers are asynchronously initialized, and the outer loop is started. Additionally, the scheduler is passed one or more tasks to start scheduling, and so it will also fill out the ComputeState with the computed sets of dependencies between tasks, initially placing all tasks in the "waiting" state. If any of the tasks are found to have only non-task input arguments, then they are considered ready to execute and are moved from the "waiting" state to "ready".

Core: Outer Loop

At each outer loop iteration, all tasks in the "ready" state will be scheduled, moved into the "running" state, and asynchronously sent to the workers for execution (called "firing"). Once all tasks are either waiting or running, the scheduler may sleep until further actions need to be performed.

When fired tasks have completed executing, an entry will exist in the inbound queue signalling the task's result and other metadata. At this point, the most recently-queued task is removed from the queue, "finished", and placed in the "finished" state. Finishing usually unlocks downstream tasks from the waiting state and allows them to transition to the ready state.

Core: Task Scheduling

Once one or more tasks are ready to be scheduled, the scheduler will begin assigning them to the processors within each available worker. This is a sequential operation consisting of:

  • Selecting candidate processors based on the task's combined scope
  • Calculating the cost to move needed data to each candidate processor
  • Adding a "wait time" cost proportional to the estimated run time for all the tasks currently executing on each candidate processor
  • Selecting the least costly candidate processor as the executor for this task

After these operations have been performed for each task, the tasks will be fired off to their appropriate worker for handling.

Worker: Task Execution

Once a worker receives one or more tasks to be executed, the tasks are immediately enqueued into the appropriate processor's queue, and the processors are notified that work is available to be executed. The processors will asynchronously look at their queues and pick the task with the lowest occupancy first; a task with zero occupancy will always be executed immediately, but most tasks have non-zero occupancy, and so will be executed in order of increasing occupancy (effectively prioritizing asynchronous tasks like I/O).

Before a task begins execution, the processor will collect the task's arguments from other workers as needed, converting them as necessary for correct execution according to the processor's semantics. This operation is called a "move".

Once a task's arguments have been moved, the task's function will be called with the arguments, and assuming the task doesn't throw an error, the result will be wrapped in a Chunk object. This Chunk will then be sent back to the core scheduler along with information about which task generated it. If the task does throw an error, then the error is instead propagated to the core scheduler, along with a flag indicating that the task failed.

Worker: Workload Balancing

In general, Dagger's core scheduler tries to balance workloads as much as possible across all the available processors, but it can fail to do so effectively when either its cached knowledge of each worker's status is outdated, or when its estimates about the task's behavior are inaccurate. To minimize the possibility of workload imbalance, the worker schedulers' processors will attempt to steal tasks from each other when they are under-occupied. Tasks will only be stolen if the task's scope is compatible with the processor attempting the steal, so tasks with wider scopes have better balancing potential.

Core: Finishing

Finishing a task which has completed executing is generally a simple set of operations:

  • The task's result is registered in the ComputeState for any tasks or user code which will need it
  • Any unneeded data is cleared from the scheduler (such as preserved Chunk arguments)
  • Downstream tasks will be moved from "waiting" to "ready" if this task was their last remaining upstream dependency

Core: Shutdown

If the core scheduler needs to shut down due to an error or Julia exiting, then all workers will be shut down, and the scheduler will close any open channels. If shutdown was due to an error, then an error will be printed or thrown back to the caller.

diff --git a/dev/scopes/index.html b/dev/scopes/index.html index ade4aba8..7a5b0d38 100644 --- a/dev/scopes/index.html +++ b/dev/scopes/index.html @@ -54,4 +54,4 @@ d2 = Dagger.@spawn generate(ps2) # Run on process 2 d3 = Dagger.@spawn generate(ps3) # Run on process 3 -res = Dagger.@spawn d2 * d3 # An error!

Moral of the story: only use scopes when you know you really need them, and if you aren't careful to arrange everything just right, be prepared for Dagger to refuse to schedule your tasks! Scopes should only be used to ensure correctness of your programs, and are not intended to be used to optimize the schedule that Dagger uses for your tasks, since restricting the scope of execution for tasks will necessarily reduce the optimizations that Dagger's scheduler can perform.
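
One hedged way out of the failure above is to give every task a scope that the combining task can also satisfy (UnionScope is assumed here, along with the ps2/ps3 process scopes and generate from the elided example):

both = Dagger.UnionScope(ps2, ps3)
d2 = Dagger.@spawn scope=both generate(ps2)    # may run on process 2 or 3
d3 = Dagger.@spawn scope=both generate(ps3)
res = fetch(Dagger.@spawn scope=both d2 * d3)  # now schedulable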

diff --git a/dev/task-queues/index.html b/dev/task-queues/index.html index c6124dd6..18481b18 100644 --- a/dev/task-queues/index.html +++ b/dev/task-queues/index.html @@ -23,4 +23,4 @@ Dagger.@spawn vcopy!(B2, A) end Dagger.@spawn vadd!(C, B1, B2) -end)

Conveniently, Dagger's task queues can be nested to get the expected behavior; the above example will submit the two vcopy! tasks as a group (and they can execute concurrently), while still ensuring that those two tasks finish before the vadd! task executes.
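
A hedged sketch of that nesting, assuming spawn_sequential and spawn_bulk as the task queues in question and the vcopy!/vadd! kernels from the fragment above:

Dagger.spawn_sequential() do
    Dagger.spawn_bulk() do
        # Submitted as one group; the two copies may run concurrently
        Dagger.@spawn vcopy!(B1, A)
        Dagger.@spawn vcopy!(B2, A)
    end
    # Runs only after both copies above have completed
    Dagger.@spawn vadd!(C, B1, B2)
end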

Warn

Task queues do not propagate to nested tasks; if a Dagger task launches another task internally, the child task doesn't inherit the task queue that the parent task was enqueued in.

diff --git a/dev/task-spawning/index.html b/dev/task-spawning/index.html index b529d404..27c99473 100644 --- a/dev/task-spawning/index.html +++ b/dev/task-spawning/index.html @@ -83,4 +83,4 @@ 0.105665 seconds (15.40 k allocations: 1.072 MiB) 0.107495 seconds (28.56 k allocations: 1.940 MiB) 0.109904 seconds (55.03 k allocations: 3.631 MiB) - 0.117239 seconds (87.95 k allocations: 5.372 MiB) + 0.117239 seconds (87.95 k allocations: 5.372 MiB) diff --git a/dev/use-cases/parallel-nested-loops/index.html b/dev/use-cases/parallel-nested-loops/index.html index f4be784b..ab306c4b 100644 --- a/dev/use-cases/parallel-nested-loops/index.html +++ b/dev/use-cases/parallel-nested-loops/index.html @@ -32,4 +32,4 @@ res.z = fetch.(res.z) res.σ = fetch.(res.σ) res -end

In this code the jobs are interdependent: we first calculate the standard deviation σ, and then use that value in the function f. Since Dagger.@spawn yields a DTask rather than an actual value, we need to use the fetch function to obtain the values. In this example, the fetching is performed once all computations are completed (note that the @sync preceding the loop forces the loop to wait for all jobs to complete). Also note that, contrary to the previous example, we do not need any locking, as we are just pushing the DTasks returned by Dagger.@spawn serially into the DataFrame (which is fast, since Dagger.@spawn doesn't block).
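
A condensed, hedged sketch of the pattern being described (the kernel f, the loop ranges, and the DataFrame columns are illustrative stand-ins for the elided example):

using Dagger, DataFrames, Statistics

f(x, y, σ) = x + y * σ                            # hypothetical inner kernel
res = DataFrame(x=Int[], y=Int[], z=Any[])
@sync for x in 1:3, y in 1:3
    σ = Dagger.@spawn std(rand(100))              # first job
    push!(res, (x, y, Dagger.@spawn f(x, y, σ)))  # second job depends on σ
end
res.z = fetch.(res.z)                             # fetch once everything is done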

The above use case was tested by running julia -t 8 (or with JULIA_NUM_THREADS=8 set as an environment variable). The Threads.@threads code takes 1.8 seconds to run, while the Dagger code, which is also one line shorter, runs in around 0.9 seconds, resulting in a 2x speedup.

Warning

Annotating an inner loop with @sync will block the outer loop from iterating until the inner @sync loop is fully completed, negating some potential parallelism. @sync should only be applied to the outermost loop before a fetch.
