Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to add custom traces and use them as metadata #4425

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ class BashWrapperBuilder {
// patch root ownership problem on files created with docker
binding.fix_ownership = fixOwnership() ? "[ \${NXF_OWNER:=''} ] && (shopt -s extglob; GLOBIGNORE='..'; chown -fR --from root \$NXF_OWNER ${workDir}/{*,.*}) || true" : null

binding.custom_trace_collect = getCustomTraceCollect()
binding.custom_trace_write = getCustomTraceWrite()

binding.trace_script = isTraceRequired() ? getTraceScript(binding) : null

return binding
Expand Down Expand Up @@ -428,6 +431,26 @@ class BashWrapperBuilder {
return result
}

private String getCustomTraceCollect() {
if( !customTraces )
return null
String result=''
for( String key : customTraces.keySet() ) {
result += "local custom_${key}=\$(${customTraces.get(key)})"
}
return result
}

private String getCustomTraceWrite() {
if( !customTraces )
return null
String result=''
for( String key : customTraces.keySet() ) {
result += "echo \"custom_${key}=\$custom_${key}\" >> \$trace_file"
}
return result
}

private String getCondaActivateSnippet() {
if( !condaEnv )
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class TaskBean implements Serializable, Cloneable {

List<String> moduleNames

Map<String,String> customTraces

Path workDir

Path targetDir
Expand Down Expand Up @@ -156,6 +158,7 @@ class TaskBean implements Serializable, Cloneable {
this.stageOutMode = task.config.getStageOutMode()

this.resourceLabels = task.config.getResourceLabels()
this.customTraces = task.config.getCustomTraces()
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,19 @@ class TaskConfig extends LazyMap implements Cloneable {
return get('stageOutMode')
}

Map<String, String> getCustomTraces() {
def value = get('customTraces')
if( value == null )
return null

if( value instanceof Map ) {
//TODO validate key names
return (Map) value
}

throw new IllegalArgumentException("Not a valid `customTraces` value: ${value}")
}

boolean getDebug() {
// check both `debug` and `echo` for backward
// compatibility until `echo` is not removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
'stdout',
'stageInMode',
'stageOutMode',
'resourceLabels'
'resourceLabels',
'customTraces'
]

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.script

import nextflow.trace.TraceRecord

import java.nio.file.Path
import java.nio.file.Paths
import java.time.OffsetDateTime
Expand Down Expand Up @@ -207,6 +209,11 @@ class WorkflowMetadata {
*/
Manifest manifest

/**
* The workflow completed traces
*/
List<TraceRecord> traces = []

private Session session

final private List<Closure> onCompleteActions = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class DefaultObserverFactory implements TraceObserverFactory {
createTimelineObserver(result)
createDagObserver(result)
createAnsiLogObserver(result)
createTraceMetadataObserver(result)
return result
}

Expand Down Expand Up @@ -101,4 +102,8 @@ class DefaultObserverFactory implements TraceObserverFactory {
result << observer
}

protected void createTraceMetadataObserver(Collection<TraceObserver> result) {
result << new TraceMetadataObserver(session)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.trace

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import groovyx.gpars.agent.Agent
import nextflow.ISession
import nextflow.Session
import nextflow.processor.TaskHandler
import nextflow.processor.TaskId
import nextflow.processor.TaskProcessor
import nextflow.script.WorkflowMetadata

import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap

@Slf4j
@CompileStatic
class TraceMetadataObserver implements TraceObserver {

Session session

TraceMetadataObserver(Session session) {
this.session = session
}

@Override
void onProcessComplete(TaskHandler handler, TraceRecord trace) {
final taskId = handler.task.id
if( !trace ) {
log.debug "[WARN] Unable to find record for task run with id: ${taskId}"
return
}

session.workflowMetadata.traces.add(trace)
}

@Override
void onProcessCached(TaskHandler handler, TraceRecord trace) {
onProcessComplete(handler, trace)
}

@Override
boolean enableMetrics() {
return true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,6 @@ class TraceRecord implements Serializable {

Map<String,Object> getStore() { store }

@Memoized
Set<String> keySet() {
FIELDS.keySet()
}

def propertyMissing(String name, value) {
put(name,value)
}
Expand All @@ -261,7 +256,7 @@ class TraceRecord implements Serializable {
}

def get( String name ) {
assert keySet().contains(name), "Not a valid TraceRecord field: '$name'"
assert name.startsWith("custom_") || FIELDS.containsKey(name), "Not a valid TraceRecord field: '$name'"
if( name == 'env' ) {
final ret = store.get(name)
return ret ? secureEnvString(ret.toString()) : ret
Expand All @@ -275,7 +270,7 @@ class TraceRecord implements Serializable {


void put( String name, def value ) {
if( !keySet().contains(name) ) {
if( !name.startsWith('custom_') && !FIELDS.containsKey(name) ) {
log.warn1 "Unknown trace record field: $name"
return
}
Expand Down Expand Up @@ -434,6 +429,12 @@ class TraceRecord implements Serializable {
if( value == null )
continue

// Parse custom traces always as string
if( name.startsWith("custom_") ) {
this.put(name, value)
continue
}

switch (name) {
case '%cpu':
case '%mem':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ nxf_trace_linux() {
local cpu_time0=$(2> /dev/null < /proc/$pid/stat awk '{printf "%.0f", ($16+$17)*10 }' || echo -n 'X')
local io_stat0=($(2> /dev/null < /proc/$pid/io sed 's/^.*:\s*//' | head -n 6 | tr '\n' ' ' || echo -n '0 0 0 0 0 0'))
local start_millis=$(nxf_date)

## collect custom traces
{{custom_trace_collect}}

## capture error and kill mem watcher
trap 'kill $mem_proc' ERR

Expand Down Expand Up @@ -209,6 +213,7 @@ nxf_trace_linux() {
echo "syscw=${io_stat1[3]}" >> $trace_file
echo "read_bytes=${io_stat1[4]}" >> $trace_file
echo "write_bytes=${io_stat1[5]}" >> $trace_file
{{custom_trace_write}}

## join nxf_mem_watch
[ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
Expand Down