diff --git a/docs/container.md b/docs/container.md index ba8a563828..e9129a5beb 100644 --- a/docs/container.md +++ b/docs/container.md @@ -59,7 +59,7 @@ In the above example replace `/path/to/apptainer.img` with any Apptainer image o Read the {ref}`config-page` page to learn more about the `nextflow.config` file and how to use it to configure your pipeline execution. :::{note} -Unlike Docker, Nextflow does not automatically mount host paths in the container when using Apptainer. It expects that the paths are configure and mounted system wide by the Apptainer runtime. If your Apptainer installation allows user defined bind points, read the {ref}`Apptainer configuration <config-apptainer>` section to learn how to enable Nextflow auto mounts. +Unlike Docker, Nextflow does not automatically mount host paths in the container when using Apptainer. It expects that the paths are configured and mounted system-wide by the Apptainer runtime. If your Apptainer installation allows user-defined bind points, read the {ref}`Apptainer configuration <config-apptainer>` section to learn how to enable Nextflow auto mounts. 
::: :::{warning} diff --git a/gradle.properties b/gradle.properties index 1e989cc8ed..26137bfbae 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,3 @@ org.gradle.caching=true org.gradle.jvmargs=-Xmx4g +org.gradle.parallel=true diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 09152bc301..9e5352520b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -16,6 +16,11 @@ package nextflow +import nextflow.data.cid.CidStore +import nextflow.data.cid.CidStoreFactory +import nextflow.data.cid.DefaultCidStore +import nextflow.data.config.DataConfig + import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths @@ -254,6 +259,14 @@ class Session implements ISession { private boolean statsEnabled + private volatile boolean cidEnabled + + boolean getCidEnabled() { cidEnabled } + + private CidStore cidStore + + CidStore getCidStore() { cidStore } + private WorkflowMetadata workflowMetadata private WorkflowStatsObserver statsObserver @@ -393,6 +406,11 @@ class Session implements ISession { // -- file porter config this.filePorter = new FilePorter(this) + if(config.navigate('workflow.data')) { + this.cidEnabled = true + this.cidStore = CidStoreFactory.create(DataConfig.create(this)) + } + } protected Path cloudCachePath(Map cloudcache, Path workDir) { @@ -439,7 +457,6 @@ class Session implements ISession { binding.setArgs( new ScriptRunner.ArgsList(args) ) cache = CacheFactory.create(uniqueId,runName).open() - return this } diff --git a/modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy b/modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy index d6d242fd6d..3b17ca35b8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy @@ -18,9 +18,24 @@ package nextflow.cli import 
com.beust.jcommander.Parameter +import groovy.json.JsonSlurper +import groovy.transform.Canonical import groovy.transform.CompileStatic +import nextflow.Session +import nextflow.config.ConfigBuilder +import nextflow.dag.MermaidHtmlRenderer +import nextflow.data.cid.CidHistoryFile +import nextflow.data.cid.CidStore +import nextflow.data.cid.model.DataType import nextflow.exception.AbortOperationException import nextflow.plugin.Plugins +import nextflow.ui.TableBuilder + +import java.nio.file.Path +import java.nio.file.Paths + +import static nextflow.data.cid.fs.CidPath.CID_PROT +import static nextflow.data.cid.fs.CidPath.METADATA_FILE /** * @@ -33,13 +48,17 @@ class CmdCid extends CmdBase { interface SubCmd { String getName() - void apply(List result) - void usage(List result) + void apply(List args) + void usage() } private List commands = new ArrayList<>() CmdCid() { + commands << new CmdLog() + commands << new CmdShow() + commands << new CmdLineage() + } @@ -75,4 +94,253 @@ class CmdCid extends CmdBase { msg += " -- Did you mean one of these?\n" + matches.collect { " $it"}.join('\n') throw new AbortOperationException(msg) } + + /** Sub-command that prints the workflow run history recorded in the CID store. */ + class CmdLog implements SubCmd { + + @Override + String getName() { + return 'log' + } + + @Override + void apply(List args) { + if (args.size() != 0) { + println("ERROR: Incorrect number of parameters") + usage() + return + } + final config = new ConfigBuilder() + .setOptions(getLauncher().getOptions()) + .setBaseDir(Paths.get('.')) + .build() + final session = new Session(config) + printHistory(session.cidStore) + + } + + /** Prints the history table. The store is null when the session was built without a `workflow.data` configuration; a hint is printed in that case instead of failing. */ + private void printHistory(CidStore store) { + if (!store) { + println("Error CID store not loaded. Check Nextflow configuration.") + return + } + final historyFile = store.getHistoryFile() + if (historyFile.exists()) { + def table = new TableBuilder(cellSeparator: '\t') + .head('TIMESTAMP') + .head('RUN NAME') + .head('SESSION ID') + .head('RUN CID') + historyFile.eachLine { table.append(CidHistoryFile.CidRecord.parse(it).toList()) } + println table.toString() + } else { + println("No workflow runs CIDs found.") + 
} + } + + @Override + void usage() { + println 'Usage: nextflow cid log' + } + } + /** Sub-command that prints the metadata document stored for a CID reference. */ + class CmdShow implements SubCmd{ + + @Override + String getName() { + return 'show' + } + + @Override + void apply(List args) { + if (args.size() != 1) { + println("ERROR: Incorrect number of parameters") + usage() + return + } + if (!args[0].startsWith(CID_PROT)) + throw new Exception("Identifier is not a CID URL") + final key = args[0].substring(CID_PROT.size()) + "/$METADATA_FILE" + final config = new ConfigBuilder() + .setOptions(getLauncher().getOptions()) + .setBaseDir(Paths.get('.')) + .build() + final session = new Session(config) + final store = session.cidStore + try { + println store.load(key).toString() + }catch (Throwable e){ + println "Error loading ${args[0]}." + } + } + + @Override + void usage() { + println 'Usage: nextflow cid show ' + } + } + + + /** Sub-command that renders the lineage graph of a CID reference into an HTML file based on the Mermaid template. */ + class CmdLineage implements SubCmd { + + @Canonical + class Edge { + String source + String destination + String label + } + + @Override + String getName() { 'lineage' } + + @Override + void apply(List args) { + if (args.size() != 2) { + println("ERROR: Incorrect number of parameters") + usage() + return + } + try { + final config = new ConfigBuilder() + .setOptions(getLauncher().getOptions()) + .setBaseDir(Paths.get('.')) + .build() + final session = new Session(config) + final store = session.cidStore + final template = readTemplate() + final network = getLineage(store, args[0]) + Path file = Path.of(args[1]) + file.text = template.replace('REPLACE_WITH_NETWORK_DATA', network) + println("Linage graph for ${args[0]} rendered in ${args[1]}") + } catch (Throwable e) { + println("ERROR: rendering lineage graph. 
${e.message}") + } + } + + /** Builds the Mermaid flowchart text by visiting CID references in first-in-first-out order, starting from the given one. */ + private String getLineage(CidStore store, String dataCid) { + def lines = [] as List + lines << "flowchart BT".toString() + + final nodesToRender = new LinkedList() + nodesToRender.add(dataCid) + final edgesToRender = new LinkedList() + while (!nodesToRender.isEmpty()) { + final node = nodesToRender.removeFirst() + processNode(lines, node, nodesToRender, edgesToRender, store) + } + lines << "" + edgesToRender.each { lines << " ${it.source} -->${it.destination}".toString() } + lines << "" + return lines.join('\n') + } + + /** Emits the node for one CID reference and, depending on its metadata type, queues upstream references and records edges. */ + private void processNode(List lines, String nodeToRender, LinkedList nodes, LinkedList edges, CidStore store) { + if (!nodeToRender.startsWith(CID_PROT)) + throw new Exception("Identifier is not a CID URL") + final slurper = new JsonSlurper() + final key = nodeToRender.substring(CID_PROT.size()) + "/$METADATA_FILE" + final cidObject = slurper.parse(store.load(key).toString().toCharArray()) as Map + switch (DataType.valueOf(cidObject.type as String)) { + case DataType.TaskOutput: + case DataType.WorkflowOutput: + lines << " ${nodeToRender}@{shape: document, label: \"${nodeToRender}\"}".toString(); + final source = cidObject.source as String + if (source) { + if (source.startsWith(CID_PROT)) { + nodes.add(source) + edges.add(new Edge(source, nodeToRender)) + } else { + final label = convertToLabel(source) + lines << " ${source}@{shape: document, label: \"${label}\"}".toString(); + edges.add(new Edge(source, nodeToRender)) + } + } + + break; + case DataType.WorkflowRun: + lines << "${nodeToRender}@{shape: processes, label: \"${cidObject.runName}\"}".toString() + final parameters = cidObject.params as List + parameters.each { + final label = convertToLabel(it.value.toString()) + lines << " ${it.value.toString()}@{shape: document, label: \"${label}\"}".toString(); + edges.add(new Edge(it.value.toString(), nodeToRender)) + } + break; + case DataType.TaskRun: + lines << " ${nodeToRender}@{shape: process, label: 
\"${cidObject.name}\"}".toString() + final parameters = cidObject.inputs as List + for (nextflow.data.cid.model.Parameter source: parameters){ + if (source.type.equals(nextflow.script.params.FileInParam.simpleName)) { + manageFileInParam(lines, nodeToRender, nodes, edges, source.value) + } else { + final label = convertToLabel(source.value.toString()) + lines << " ${source.value.toString()}@{shape: document, label: \"${label}\"}".toString(); + edges.add(new Edge(source.value.toString(), nodeToRender)) + } + } + break; + default: + throw new Exception("Unrecognized type reference ${cidObject.type}") + } + } + + /** Inserts the U+200E mark inside every 'http' occurrence of the label (presumably to keep the renderer from treating it as a link - TODO confirm). */ + private String convertToLabel(String label){ + return label.replace('http', 'h\u200Ettp') + } + + /** Adds nodes and edges for a file input value: a collection is handled element by element, a CID reference string is queued, a map with a `path` entry or any other value becomes a document node. */ + private void manageFileInParam(List lines, String nodeToRender, LinkedList nodes, LinkedList edges, value){ + if (value instanceof Collection) { + value.each { manageFileInParam(lines, nodeToRender, nodes, edges, it) } + return + } + if (value instanceof CharSequence) { + final source = value.toString() + if (source.startsWith(CID_PROT)) { + nodes.add(source) + edges.add(new Edge(source, nodeToRender)) + return + } + } + if (value instanceof Map) { + if (value.path) { + final label = convertToLabel(value.path.toString()) + lines << " ${value.path}@{shape: document, label: \"${label}\"}".toString(); + edges.add(new Edge(value.path.toString(), nodeToRender)) + return + } + } + final label = convertToLabel(value.toString()) + lines << " ${value.toString()}@{shape: document, label: \"${label}\"}".toString(); + edges.add(new Edge(value.toString(), nodeToRender)) + } + + /** Reads the Mermaid HTML template from the classpath; fails with IllegalStateException when the resource is missing and always closes the resource stream. */ + protected static String readTemplate() { + final writer = new StringWriter() + final res = MermaidHtmlRenderer.class.getResourceAsStream('mermaid.dag.template.html') + if( res == null ) + throw new IllegalStateException("Unable to find resource: mermaid.dag.template.html") + try { + int ch + while( (ch=res.read()) != -1 ) { + writer.append(ch as char) + } + } finally { + res.close() + } + writer.toString() + } + + @Override + void usage() { + println 'Usage: nextflow cid lineage ' + } + + } } diff --git 
a/modules/nextflow/src/main/groovy/nextflow/data/cid/CidHistoryFile.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidHistoryFile.groovy new file mode 100644 index 0000000000..07b4e24b16 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidHistoryFile.groovy @@ -0,0 +1,150 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package nextflow.data.cid + +import groovy.transform.EqualsAndHashCode +import groovy.util.logging.Slf4j +import nextflow.util.WithLockFile + +import java.nio.file.Path +import java.text.DateFormat +import java.text.SimpleDateFormat + +/** + * File to store a history of the workflow executions and their corresponding CIDs + * + * @author Jorge Ejarque + */ +@Slf4j +class CidHistoryFile extends WithLockFile { + private static final DateFormat TIMESTAMP_FMT = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss') + + CidHistoryFile(Path file) { + super(file.toString()) + } + + /** Appends a new record under the file lock; the timestamp defaults to the current time when no date is given. */ + void write(String name, UUID key, String runCid, Date date = null) { + assert key + + withFileLock { + def timestamp = date ?: new Date() + log.debug("Writting record for $key in CID history file $this") + this << new CidRecord(timestamp: timestamp, runName: name, sessionId: key, runCid: runCid).toString() << '\n' + } + } + + /** Sets the run CID of the record matching the session id; failures are logged as warnings and not rethrown. */ + void update(UUID sessionId, String runCid) { + assert sessionId + + try { + withFileLock { update0(sessionId, runCid) } + } + catch (Throwable e) { + log.warn "Can't update 
cid history file: $this", e + } + } + + /** Returns the run CID recorded for the session id, or null after logging a warning when no record matches. Blank lines are skipped. */ + String getRunCid(UUID id){ + assert id + + for (String line: this.readLines()){ + def current = line ? CidRecord.parse(line) : null + if (current?.sessionId == id) { + return current.runCid + } + } + log.warn("Can't find session $id in CID history file $this") + return null + } + + /** Rewrites the file, replacing the run CID of the matching records. Blank lines are kept as they are; lines rejected with IllegalArgumentException are logged and left out of the rewritten file. */ + private void update0(UUID id, String runCid) { + assert id + def newHistory = new StringBuilder() + + this.readLines().each { line -> + try { + def current = line ? CidRecord.parse(line) : null + if (current?.sessionId == id) { + log.debug("Updating record for $id in CID history file $this") + current.runCid = runCid + newHistory << current.toString() << '\n' + } else { + newHistory << line << '\n' + } + } + catch (IllegalArgumentException e) { + log.warn("Can't read CID history file: $this", e) + } + } + + // rewrite the history content + this.setText(newHistory.toString()) + } + + /** One history entry, serialised as tab-separated timestamp, run name, session id and run CID. */ + @EqualsAndHashCode(includes = 'runName,sessionId') + static class CidRecord { + Date timestamp + String runName + UUID sessionId + String runCid + + CidRecord(UUID sessionId, String name = null) { + this.runName = name + this.sessionId = sessionId + } + + protected CidRecord() {} + + List toList() { + def line = new ArrayList(4) + line << (timestamp ? 
TIMESTAMP_FMT.format(timestamp) : '-') + line << (runName ?: '-') + line << (sessionId.toString()) + line << (runCid ?: '-') + } + + @Override + String toString() { + toList().join('\t') + } + + /** Parses a tab-separated line: 4 columns give a full record, 2 columns give a record holding only the session id read from the first column; anything else raises IllegalArgumentException. */ + static CidRecord parse(String line) { + def cols = line.tokenize('\t') + if (cols.size() == 2) + return new CidRecord(UUID.fromString(cols[0])) + + if (cols.size() == 4) { + + return new CidRecord( + timestamp: TIMESTAMP_FMT.parse(cols[0]), + runName: cols[1], + sessionId: UUID.fromString(cols[2]), + runCid: cols[3] + ) + } + + throw new IllegalArgumentException("Not a valid history entry: `$line`") + } + } + +} \ No newline at end of file diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/CidObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidObserver.groovy index 348c9ad1d5..4a52b2ee75 100644 --- a/modules/nextflow/src/main/groovy/nextflow/data/cid/CidObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidObserver.groovy @@ -17,6 +17,21 @@ package nextflow.data.cid +import groovy.util.logging.Slf4j +import nextflow.data.cid.model.DataPath +import nextflow.data.cid.model.Parameter +import nextflow.data.cid.model.WorkflowResults +import nextflow.data.cid.model.Workflow +import nextflow.data.cid.model.WorkflowRun +import nextflow.file.FileHelper +import nextflow.file.FileHolder +import nextflow.script.ScriptMeta +import nextflow.script.params.DefaultInParam +import nextflow.script.params.FileInParam +import nextflow.script.params.InParam +import nextflow.util.PathNormalizer +import nextflow.util.TestOnly + import java.nio.file.Files import java.nio.file.Path import java.nio.file.attribute.BasicFileAttributes @@ -25,8 +40,7 @@ import groovy.json.JsonOutput import groovy.transform.CompileStatic import nextflow.Session import nextflow.data.cid.model.DataType -import nextflow.data.cid.model.TaskOutput -import nextflow.data.config.DataConfig +import nextflow.data.cid.model.Output import nextflow.processor.TaskHandler import 
nextflow.processor.TaskRun import nextflow.script.params.FileOutParam @@ -34,74 +48,315 @@ import nextflow.trace.TraceObserver import nextflow.trace.TraceRecord import nextflow.util.CacheHelper +import static nextflow.data.cid.fs.CidPath.CID_PROT +import static nextflow.data.cid.fs.CidPath.METADATA_FILE /** + * Observer to write the generated workflow metadata in a CID store. * * @author Paolo Di Tommaso */ +@Slf4j @CompileStatic class CidObserver implements TraceObserver { + private String executionHash private CidStore store + private Session session + private WorkflowResults workflowResults + private Map outputsStoreDirCid = new HashMap(10) + + CidObserver(Session session){ + this.session = session + this.store = session.cidStore + } @Override void onFlowCreate(Session session) { - store = new DefaultCidStore() - store.open(DataConfig.create(session)) + this.store.getHistoryFile().write(session.runName, session.uniqueId, '-') + } + + @TestOnly + String getExecutionHash(){ executionHash } + + @Override + void onFlowBegin() { + this.executionHash = storeWorkflowRun() + workflowResults = new WorkflowResults( + DataType.WorkflowResults, + "$CID_PROT${executionHash}", + new ArrayList()) + this.store.getHistoryFile().update(session.uniqueId, "${CID_PROT}${this.executionHash}") + } + + @Override + void onFlowComplete(){ + if (this.workflowResults){ + final content = JsonOutput.prettyPrint(JsonOutput.toJson(workflowResults)) + final wfResultsHash = CacheHelper.hasher(content).hash().toString() + this.store.save("${wfResultsHash}/$METADATA_FILE", content) + this.store.getHistoryFile().update(session.uniqueId, "${CID_PROT}${wfResultsHash}") + } + } + + /** Serialises the workflow run description (scripts, repository, commit, session id, run name and params) to JSON, saves it under a key derived from the content hash and returns that hash. */ + protected String storeWorkflowRun() { + final normalizer = new PathNormalizer(session.workflowMetadata) + final mainScript = new DataPath( + normalizer.normalizePath(session.workflowMetadata.scriptFile.normalize()), + session.workflowMetadata.scriptId + ) + List otherScripts = new LinkedList<>() + for (Path p: 
ScriptMeta.allScriptNames().values()) { + if (p && p != session.workflowMetadata.scriptFile) { + otherScripts.add(new DataPath(normalizer.normalizePath(p.normalize()), + CacheHelper.hasher(p.text).hash().toString())) + } + } + final workflow = new Workflow( + DataType.Workflow, + mainScript, + otherScripts, + session.workflowMetadata.repository, + session.workflowMetadata.commitId + ) + final value = new WorkflowRun( + DataType.WorkflowRun, + workflow, + session.uniqueId.toString(), + session.runName, + getNormalizedParams(session.params, normalizer) + ) + + final content = JsonOutput.prettyPrint(JsonOutput.toJson(value)) + final executionHash = CacheHelper.hasher(content).hash().toString() + store.save("${executionHash}/$METADATA_FILE", content) + return executionHash + } + + /** Converts the workflow params into Parameter entries, normalising Path and CharSequence values; a null value is recorded with a null type name instead of failing. */ + private static List getNormalizedParams(Map params, PathNormalizer normalizer){ + final normalizedParams = new LinkedList() + params.each{String key, Object value -> + if( value instanceof Path ) + normalizedParams.add( new Parameter( Path.class.simpleName, key, normalizer.normalizePath( value as Path ) ) ) + else if ( value instanceof CharSequence ) + normalizedParams.add( new Parameter( String.class.simpleName, key, normalizer.normalizePath( value.toString() ) ) ) + else + normalizedParams.add( new Parameter( value?.class?.simpleName, key, value) ) + } + return normalizedParams } + @Override void onProcessComplete(TaskHandler handler, TraceRecord trace) { storeTaskInfo(handler.task) } - void storeTaskInfo(TaskRun task) { + protected void storeTaskInfo(TaskRun task) { + final pathNormalizer = new PathNormalizer(session.workflowMetadata) // store the task run entry - storeTaskRun(task) + storeTaskRun(task, pathNormalizer) // store all task outputs files final outputs = task.getOutputsByType(FileOutParam) - for( Map.Entry entry : outputs ) { - final value = entry.value - if( value instanceof Path ) { - storeTaskOutput(task, (Path)value) - } - else if( value instanceof Collection ) { - for( Path 
it : value ) - storeTaskOutput(task, (Path)it) + outputs.forEach { FileOutParam key, Object value -> manageFileOutParams(value, task)} + + } + + private void manageFileOutParams( Object value, TaskRun task) { + if (value instanceof Path) { + storeTaskOutput(task, (Path) value) + } else if (value instanceof Collection) { + for (Path it : value) { + storeTaskOutput(task, (Path) it) } } } + /** Saves the task run metadata under the task hash and returns that hash as a string. */ - protected void storeTaskRun(TaskRun task) { + protected String storeTaskRun(TaskRun task, PathNormalizer normalizer) { final value = new nextflow.data.cid.model.TaskRun( - DataType.Task, - task.id.value, + DataType.TaskRun, + session.uniqueId.toString(), task.getName(), - task.hash.toString() ) + CacheHelper.hasher(session.stubRun ? task.stubSource: task.source).hash().toString(), + task.inputs ? manageInputs(task.inputs, normalizer): null, + task.isContainerEnabled() ? task.getContainerFingerprint(): null, + normalizer.normalizePath(task.getCondaEnv()), + normalizer.normalizePath(task.getSpackEnv()), + task.config?.getArchitecture()?.toString(), + task.processor.getTaskGlobalVars(task), + task.processor.getTaskBinEntries(task.source).collect { Path p -> new DataPath(normalizer.normalizePath(p.normalize()), + CacheHelper.hasher(p).hash().toString() )} + ) + // store in the underlying persistence - final key = "${value.hash}/.data.json" + final key = "${task.hash}/$METADATA_FILE" store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value))) + return task.hash.toString() } + /** Saves the metadata of one task output file; any failure is logged as a warning and not rethrown. */ protected void storeTaskOutput(TaskRun task, Path path) { - final attrs = readAttributes(path) - final rel = task.workDir.relativize(path).toString() - final cid = "${task.hash}/${rel}" - final uri = "cid://${cid}" - final key = "${cid}/.data.json" - final hash = CacheHelper.hasher(path).hash().toString() - final value = new TaskOutput( - DataType.Output, - uri, - path.toUriString(), - hash, - attrs.size(), - attrs.creationTime().toMillis(), - attrs.lastModifiedTime().toMillis() ) - // store in the underlying 
persistence - store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value))) + try { + final attrs = readAttributes(path) + final rel = getTaskRelative(task, path) + final cid = "${task.hash}/${rel}" + final key = "${cid}/$METADATA_FILE" + final hash = CacheHelper.hasher(path).hash().toString() + final value = new Output( + DataType.TaskOutput, + path.toString(), + hash, + "$CID_PROT$task.hash", + attrs.size(), + attrs.creationTime().toMillis(), + attrs.lastModifiedTime().toMillis()) + store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value))) + } catch (Throwable e) { + log.warn("Exception storing CID output $path for task ${task.name}. ${e.getLocalizedMessage()}") + } + } + + /** Resolves the output path relative to the task work directory or storeDir; a relative path under neither is returned normalised unless it climbs out with `..`, and an exception is thrown when no relative path can be determined. */ + protected String getTaskRelative(TaskRun task, Path path){ + if (path.isAbsolute()) { + final rel = getTaskRelative0(task, path) + if (rel) return rel + throw new Exception("Cannot asses the relative path for output $path of ${task.name}") + } else { + //Check if contains workdir or storeDir + final rel = getTaskRelative0(task, path.toAbsolutePath()) + if (rel) return rel + if (path.normalize().getName(0).toString() == "..") + throw new Exception("Cannot asses the relative path for output $path of ${task.name}" ) + return path.normalize().toString() + } + + } + + /** Returns the path relative to the task work directory or, failing that, to the task storeDir (which may be unset); null when the path is under neither. */ + private String getTaskRelative0(TaskRun task, Path path){ + final workDirAbsolute = task.workDir.toAbsolutePath() + if (path.startsWith(workDirAbsolute)) { + return workDirAbsolute.relativize(path).toString() + } + //If task output is not in the workDir check if output is stored in the task's storeDir + final storeDir = task.getConfig().getStoreDir()?.toAbsolutePath() + if( storeDir && path.startsWith(storeDir)) { + final rel = storeDir.relativize(path) + //If output stored in storeDir, keep the path in case it is used as workflow output + this.outputsStoreDirCid.put(path.toString(), "$CID_PROT${task.hash}/$rel".toString()) + return rel.toString() + } + return null + } protected BasicFileAttributes readAttributes(Path path) { 
Files.readAttributes(path, BasicFileAttributes) } + + @Override + void onFilePublish(Path destination, Path source){ + try { + final hash = CacheHelper.hasher(destination).hash().toString() + final rel = getWorkflowRelative(destination) + final key = "$executionHash/${rel}/$METADATA_FILE" + final sourceReference = getSourceReference(source) + final attrs = readAttributes(destination) + final value = new Output( + DataType.WorkflowOutput, + destination.toString(), + hash, + sourceReference, + attrs.size(), + attrs.creationTime().toMillis(), + attrs.lastModifiedTime().toMillis()) + store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value))) + workflowResults.outputs.add("${CID_PROT}${executionHash}/${rel}") + } catch (Throwable e) { + log.warn("Exception storing CID output $destination for workflow ${executionHash}.", e) + } + } + + /** Returns the CID reference of the task output behind the given file, or null when it cannot be traced. References recorded for storeDir outputs already carry the CID prefix and are returned as stored. */ + String getSourceReference(Path source){ + final hash = FileHelper.getTaskHashFromPath(source, session.workDir) + if (hash) { + final target = FileHelper.getWorkFolder(session.workDir, hash).relativize(source).toString() + return "$CID_PROT$hash/$target" + } else { + final storeDirReference = outputsStoreDirCid.get(source.toString()) + if (storeDirReference) + return storeDirReference.toString() + } + return null + } + + @Override + void onFilePublish(Path destination){ + try { + final hash = CacheHelper.hasher(destination).hash().toString() + final rel = getWorkflowRelative(destination) + final key = "$executionHash/${rel}/$METADATA_FILE" + final attrs = readAttributes(destination) + final value = new Output( + DataType.WorkflowOutput, + destination.toString(), + hash, + "${CID_PROT}${executionHash}".toString(), + attrs.size(), + attrs.creationTime().toMillis(), + attrs.lastModifiedTime().toMillis()) + store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value))) + workflowResults.outputs.add("${CID_PROT}${executionHash}/${rel}") + }catch (Throwable e) { + log.warn("Exception storing CID output $destination for workflow 
${executionHash}. ${e.getLocalizedMessage()}") + } + } + + /** Resolves a published path relative to the session output directory; a relative path outside it is returned normalised unless it climbs out with `..`, otherwise an exception is thrown. */ + protected String getWorkflowRelative(Path path){ + final outputDirAbs = session.outputDir.toAbsolutePath() + if (path.isAbsolute()) { + if (path.startsWith(outputDirAbs)) { + return outputDirAbs.relativize(path).toString() + } else { + throw new Exception("Cannot asses the relative path for workflow output $path") + } + } else { + final pathAbs = path.toAbsolutePath() + if (pathAbs.startsWith(outputDirAbs)) { + return outputDirAbs.relativize(pathAbs).toString() + } + if (path.normalize().getName(0).toString() == "..") + throw new Exception("Cannot asses the relative path for workflow output $path") + return path.normalize().toString() + } + + } + + /** Converts task inputs into Parameter entries; file inputs are resolved through manageFileInParam and DefaultInParam entries are skipped. */ + protected List manageInputs(Map inputs, PathNormalizer normalizer) { + List managedInputs = new LinkedList() + inputs.forEach{ param, value -> + final type = param.class.simpleName + final name = param.name + if( param instanceof FileInParam ) + managedInputs.add( new Parameter( type, name, manageFileInParam( (List)value , normalizer) ) ) + else if( !(param instanceof DefaultInParam) ) + managedInputs.add( new Parameter( type, name, value) ) + } + return managedInputs + } + + /** Maps each staged file to its CID source reference when one is known, otherwise to a DataPath carrying the normalised path and the content hash. */ + private List manageFileInParam(List files, PathNormalizer normalizer){ + final paths = new LinkedList(); + for( FileHolder it : files ) { + final ref = getSourceReference(it.storePath) + paths.add(ref ? 
ref : new DataPath(normalizer.normalizePath(it.storePath), CacheHelper.hasher(it.storePath).hash().toString())) + } + return paths + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/CidStore.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidStore.groovy index 6591e67b0c..f012c8f130 100644 --- a/modules/nextflow/src/main/groovy/nextflow/data/cid/CidStore.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidStore.groovy @@ -17,6 +17,7 @@ package nextflow.data.cid +import java.nio.file.Path import java.util.function.Consumer import groovy.transform.CompileStatic @@ -36,4 +37,8 @@ interface CidStore { Object load(String key) + Path getPath() + + CidHistoryFile getHistoryFile() + } diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/CidStoreFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidStoreFactory.groovy new file mode 100644 index 0000000000..f27e3f7602 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/CidStoreFactory.groovy @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow.data.cid + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.data.config.DataConfig +import nextflow.plugin.Plugins +import org.pf4j.ExtensionPoint + +/** + * Factory for CidStore instances, looked up through the plugin extension mechanism + * + * @author Jorge Ejarque + */ +@Slf4j +@CompileStatic +abstract class CidStoreFactory implements ExtensionPoint { + /** Creates the CidStore for the given configuration; implemented by each concrete factory. */ + protected abstract CidStore newInstance(DataConfig config) + /** Delegates to the first factory returned by Plugins.getPriorityExtensions; throws IllegalStateException when none is registered. */ + static CidStore create(DataConfig config){ + final all = Plugins.getPriorityExtensions(CidStoreFactory) + if( !all ) + throw new IllegalStateException("Unable to find Nextflow CID store factory") + final factory = all.first() + log.debug "Using Nextflow CID store factory: ${factory.getClass().getName()}" + return factory.newInstance(config) + + + } + + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/DefaultCidStore.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/DefaultCidStore.groovy index ae6faaeceb..9f35052861 100644 --- a/modules/nextflow/src/main/groovy/nextflow/data/cid/DefaultCidStore.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/DefaultCidStore.groovy @@ -27,6 +27,7 @@ import nextflow.data.config.DataConfig import nextflow.exception.AbortOperationException /** + * Default implementation of a CID store. 
* * @author Paolo Di Tommaso */ @@ -34,18 +35,21 @@ import nextflow.exception.AbortOperationException @CompileStatic class DefaultCidStore implements CidStore { + private static final String HISTORY_FILE_NAME =".history" + private Path metaLocation private Path location void open(DataConfig config) { - location = config.store.location.resolve('.meta') - if( !Files.exists(location) && !Files.createDirectories(location) ) { - throw new AbortOperationException("Unable to create CID store directory: $location") + location = config.store.location + metaLocation = getMetadataPath(config) + if( !Files.exists(metaLocation) && !Files.createDirectories(metaLocation) ) { + throw new AbortOperationException("Unable to create CID store directory: $metaLocation") } } @Override void save(String key, Object value) { - final path = location.resolve(key) + final path = metaLocation.resolve(key) Files.createDirectories(path.parent) log.debug "Save CID file path: $path" path.text = value @@ -53,15 +57,34 @@ class DefaultCidStore implements CidStore { @Override void list(String key, Consumer consumer) { - for( Path it : Files.walk(location.resolve(key)) ) { - final fileKey = location.relativize(it).toString() + /* Passes every path found under the key, relative to the metadata directory, to the consumer. Files.walk keeps directory handles open until its stream is closed, so the stream is closed explicitly. */ + final stream = Files.walk(metaLocation.resolve(key)) + try { + for( Path it : stream ) { + final fileKey = metaLocation.relativize(it).toString() consumer.accept(fileKey) } + } + finally { + stream.close() + } } @Override Object load(String key) { - location.resolve(key).text + metaLocation.resolve(key).text } + @Override + Path getPath(){ location } + + @Override + CidHistoryFile getHistoryFile(){ + return new CidHistoryFile(metaLocation.resolve(HISTORY_FILE_NAME)) + } + + /** Returns the `.meta` directory located under the configured store location. */ + static Path getMetadataPath(DataConfig config){ config.store.location.resolve('.meta') } + + } diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/DefaultCidStoreFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/DefaultCidStoreFactory.groovy new file mode 100644 index 0000000000..df8e9243a4 --- /dev/null +++ 
b/modules/nextflow/src/main/groovy/nextflow/data/cid/DefaultCidStoreFactory.groovy @@ -0,0 +1,38 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package nextflow.data.cid + +import groovy.transform.CompileStatic +import nextflow.data.config.DataConfig +import nextflow.plugin.Priority + +/** + * Default factory for CidStore, declared with priority 0 and producing DefaultCidStore instances + * + * @author Jorge Ejarque + */ +@CompileStatic +@Priority(0) +class DefaultCidStoreFactory extends CidStoreFactory{ + /** Creates a DefaultCidStore and opens it with the given configuration before returning it. */ + @Override + protected CidStore newInstance(DataConfig config) { + final cidStore = new DefaultCidStore() + cidStore.open(config) + return cidStore + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidFileSystem.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidFileSystem.groovy new file mode 100644 index 0000000000..d6105624f7 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidFileSystem.groovy @@ -0,0 +1,130 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.data.cid.fs + +import nextflow.data.cid.DefaultCidStore + +import java.nio.file.FileStore +import java.nio.file.FileSystem +import java.nio.file.Path +import java.nio.file.PathMatcher +import java.nio.file.WatchService +import java.nio.file.attribute.UserPrincipalLookupService +import java.nio.file.spi.FileSystemProvider + +import groovy.transform.CompileStatic +import nextflow.data.config.DataConfig + +/** + * File system for CID Paths + * + * @author Jorge Ejarque + */ +@CompileStatic +class CidFileSystem extends FileSystem { + + private CidFileSystemProvider provider + + private Path basePath + + /* + * Only needed to prevent serialization issues - see https://github.com/nextflow-io/nextflow/issues/5208 + */ + protected CidFileSystem(){} + /** Creates the file system rooted at the metadata path of the configured CID store. */ + CidFileSystem(CidFileSystemProvider provider, DataConfig config) { + this.provider = provider + this.basePath = DefaultCidStore.getMetadataPath(config) + } + + Path getBasePath() { + return basePath + } + /** Two instances are equal when they are of the same class and share provider and base path; null is never equal. */ + @Override + boolean equals( Object other ) { + if( other == null || this.class != other.class ) return false + final that = (CidFileSystem)other + this.provider == that.provider && this.basePath == that.basePath + } + + @Override + int hashCode() { + Objects.hash(provider,basePath) + } + + @Override + FileSystemProvider provider() { + return provider + } + + @Override + void close() throws IOException { + + } + /* NOTE(review): always reports closed although close() is a no-op - confirm this is intended */ + @Override + boolean isOpen() { + return false + } + + @Override + boolean isReadOnly() { + return true + } + + @Override + String getSeparator() { + return CidPath.SEPARATOR + } + /* NOTE(review): the next three accessors return null rather than empty collections - callers must null-check */ + 
@Override + Iterable getRootDirectories() { + return null + } + + @Override + Iterable getFileStores() { + return null + } + + @Override + Set supportedFileAttributeViews() { + return null + } + + @Override + Path getPath(String first, String... more) { + return new CidPath(this,first,more) + } + + @Override + PathMatcher getPathMatcher(String syntaxAndPattern) { + throw new UnsupportedOperationException(); + } + + @Override + UserPrincipalLookupService getUserPrincipalLookupService() { + throw new UnsupportedOperationException('User Principal Lookup Service not supported') + } + + @Override + WatchService newWatchService() throws IOException { + throw new UnsupportedOperationException('Watch Service not supported') + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidFileSystemProvider.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidFileSystemProvider.groovy new file mode 100644 index 0000000000..91f24984ec --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidFileSystemProvider.groovy @@ -0,0 +1,324 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * File System Provider for CID Paths
 *
 * Read-only provider for the {@code cid} scheme. Read operations are delegated
 * to the provider of the path returned by {@code CidPath.getTargetPath()};
 * every mutating operation throws {@link UnsupportedOperationException}.
 * A single {@link CidFileSystem} instance is created lazily and shared.
 *
 * @author Jorge Ejarque
 */
@CompileStatic
class CidFileSystemProvider extends FileSystemProvider {

    public static final String SCHEME = "cid"

    private CidFileSystem fileSystem

    @Override
    String getScheme() {
        return SCHEME
    }

    /**
     * Cast the given path to {@link CidPath}.
     *
     * @throws ProviderMismatchException when the path is not a {@link CidPath}
     */
    protected CidPath toCidPath(Path path) {
        if (path !instanceof CidPath)
            throw new ProviderMismatchException()
        return (CidPath) path
    }

    /**
     * Verify that the URI uses this provider's scheme (case-insensitive).
     *
     * @throws IllegalArgumentException when the scheme is missing or different
     */
    private void checkScheme(URI uri) {
        // null-safe: a scheme-less URI is reported as an invalid scheme instead of a NullPointerException
        final scheme = uri.scheme?.toLowerCase()
        if( scheme != getScheme() )
            throw new IllegalArgumentException("Not a valid ${getScheme().toUpperCase()} scheme: $scheme")
    }

    /**
     * Create the shared file system on first call; later calls return the
     * existing instance and ignore {@code config}.
     *
     * @param uri A URI using the {@code cid} scheme
     * @param config Entries overriding the values returned by {@code DataConfig.asMap()}
     */
    @Override
    synchronized FileSystem newFileSystem(URI uri, Map<String,?> config) throws IOException {
        checkScheme(uri)
        if( !fileSystem ) {
            // work on a copy: `DataConfig.asMap()` may hand back the live session
            // configuration, which must not be altered by provider-level overrides
            final merged = new HashMap<String,Object>(DataConfig.asMap())
            if (config) {
                // overwrite default values with provided configuration
                merged.putAll(config)
            }
            fileSystem = new CidFileSystem(this, new DataConfig(merged))
        }
        return fileSystem
    }

    /**
     * @throws FileSystemNotFoundException when the file system has not been created yet
     */
    @Override
    FileSystem getFileSystem(URI uri) throws FileSystemNotFoundException {
        if (!fileSystem)
            throw new FileSystemNotFoundException()
        return fileSystem
    }

    /**
     * Return the shared file system, creating it with the default
     * configuration when it does not exist yet.
     */
    synchronized FileSystem getFileSystemOrCreate(URI uri) {
        checkScheme(uri)
        if( !fileSystem ) {
            fileSystem = (CidFileSystem) newFileSystem(uri, DataConfig.asMap())
        }
        return fileSystem
    }

    @Override
    CidPath getPath(URI uri) {
        // the URI authority holds the base component of the CID path
        final base = uri.authority
        final path = uri.path
        return (CidPath) getFileSystemOrCreate(uri).getPath(base, path)
    }

    @Override
    OutputStream newOutputStream(Path path, OpenOption... options) throws IOException {
        throw new UnsupportedOperationException("Write not supported by ${getScheme().toUpperCase()} file system provider")
    }

    /**
     * Open an input stream on the target of the given CID path.
     */
    @Override
    InputStream newInputStream(Path path, OpenOption... options) throws IOException {
        final cid = toCidPath(path)
        final realPath = cid.getTargetPath()
        return realPath.fileSystem.provider().newInputStream(realPath, options)
    }

    /**
     * Open a read-only channel on the target of the given CID path.
     *
     * @throws UnsupportedOperationException when {@code options} contains
     *      {@code APPEND} or {@code WRITE}
     */
    @Override
    SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
        final cid = toCidPath(path)
        if (options.size() > 0) {
            for (OpenOption opt: options) {
                // All OpenOption values except for APPEND and WRITE are allowed
                if (opt == StandardOpenOption.APPEND || opt == StandardOpenOption.WRITE)
                    throw new UnsupportedOperationException("'$opt' not allowed");
            }
        }
        final realPath = cid.getTargetPath()
        final channel = realPath.fileSystem.provider().newByteChannel(realPath, options, attrs)

        // wrap the real channel so that only non-mutating operations are exposed
        return new SeekableByteChannel() {

            @Override
            int read(ByteBuffer dst) throws IOException {
                return channel.read(dst)
            }

            @Override
            int write(ByteBuffer src) throws IOException {
                throw new UnsupportedOperationException("Write operation not supported")
            }

            @Override
            long position() throws IOException {
                return channel.position()
            }

            @Override
            SeekableByteChannel position(long newPosition) throws IOException {
                // seeking does not modify the content, so it is allowed on a read-only channel
                channel.position(newPosition)
                return this
            }

            @Override
            long size() throws IOException {
                return channel.size()
            }

            @Override
            SeekableByteChannel truncate(long unused) throws IOException {
                throw new UnsupportedOperationException("Truncate operation not supported")
            }

            @Override
            boolean isOpen() {
                return channel.isOpen()
            }

            @Override
            void close() throws IOException {
                channel.close()
            }
        }
    }

    /**
     * List the target directory of the given CID path, exposing the entries as
     * CID paths. Metadata files located under the file system base path are
     * hidden, and the remaining entries are submitted to the caller's filter.
     *
     * @param path The CID directory to list
     * @param filter The caller-provided filter, applied to the converted CID paths;
     *      {@code null} accepts every entry
     */
    @Override
    DirectoryStream<Path> newDirectoryStream(Path path, DirectoryStream.Filter<? super Path> filter) throws IOException {
        final cid = toCidPath(path)
        final real = cid.getTargetPath()
        final stream = real
                .getFileSystem()
                .provider()
                .newDirectoryStream(real, new CidFilter(fileSystem, (DirectoryStream.Filter<Path>) filter, cid, real))

        return new DirectoryStream<Path>() {

            @Override
            Iterator<Path> iterator() {
                return new CidIterator(fileSystem, stream.iterator(), cid, real)
            }

            @Override
            void close() throws IOException {
                stream.close()
            }
        }
    }

    /**
     * Filter applied to the real directory stream: drops CID metadata files and
     * delegates the decision on every other entry to the caller's filter.
     */
    private static class CidFilter implements DirectoryStream.Filter<Path> {

        private final CidFileSystem fs
        private final DirectoryStream.Filter<Path> delegate
        private final CidPath cidBase
        private final Path realBase

        CidFilter(CidFileSystem fs, DirectoryStream.Filter<Path> delegate, CidPath cidBase, Path realBase){
            this.fs = fs
            this.delegate = delegate
            this.cidBase = cidBase
            this.realBase = realBase
        }

        @Override
        boolean accept(Path entry) throws IOException {
            if( entry.startsWith(fs.getBasePath()) && entry.getFileName().toString() == CidPath.METADATA_FILE ) {
                return false
            }
            if( delegate == null ) {
                return true
            }
            // the caller's filter expects paths of this provider, not the real ones
            return delegate.accept(fromRealToCidPath(entry, realBase, cidBase))
        }
    }

    /**
     * Convert a real path into the corresponding CID path.
     *
     * @param toConvert The real path to convert
     * @param realBase The real directory being listed
     * @param cidBase The CID path whose target is {@code realBase}
     */
    private static CidPath fromRealToCidPath(Path toConvert, Path realBase, CidPath cidBase){
        final fs = cidBase.fileSystem as CidFileSystem
        if (toConvert.startsWith(fs.basePath)) {
            // the entry lives in the metadata store: wrap it directly
            return new CidPath(fs, toConvert)
        } else {
            // the entry lives elsewhere: rebuild it relative to the listed CID path
            final relative = realBase.relativize(toConvert)
            return (CidPath) cidBase.resolve(relative.toString())
        }
    }

    /**
     * Iterator converting the entries of a real directory listing to CID paths.
     */
    private static class CidIterator implements Iterator<Path> {

        private final CidFileSystem fs
        private final Iterator<Path> target
        private final CidPath parent
        private final Path parentReal

        CidIterator(CidFileSystem fs, Iterator<Path> itr, CidPath parent, Path real) {
            this.fs = fs
            this.target = itr
            this.parent = parent
            this.parentReal = real
        }

        @Override
        boolean hasNext() {
            return target.hasNext()
        }

        @Override
        CidPath next() {
            final path = target.next()
            return path ? fromRealToCidPath(path, parentReal, parent) : null
        }
    }

    @Override
    void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
        throw new UnsupportedOperationException("Create directory not supported by ${getScheme().toUpperCase()} file system provider")
    }

    @Override
    void delete(Path path) throws IOException {
        throw new UnsupportedOperationException("Delete not supported by ${getScheme().toUpperCase()} file system provider")
    }

    @Override
    void copy(Path source, Path target, CopyOption... options) throws IOException {
        throw new UnsupportedOperationException("Copy not supported by ${getScheme().toUpperCase()} file system provider")
    }

    @Override
    void move(Path source, Path target, CopyOption... options) throws IOException {
        throw new UnsupportedOperationException("Move not supported by ${getScheme().toUpperCase()} file system provider")
    }

    @Override
    boolean isSameFile(Path path, Path path2) throws IOException {
        return path == path2
    }

    @Override
    boolean isHidden(Path path) throws IOException {
        return toCidPath(path).getTargetPath().isHidden()
    }

    @Override
    FileStore getFileStore(Path path) throws IOException {
        throw new UnsupportedOperationException("File store not supported by ${getScheme().toUpperCase()} file system provider")
    }

    /**
     * Reject write and execute access, then delegate the check to the
     * provider of the target path.
     *
     * @throws AccessDeniedException when {@code WRITE} or {@code EXECUTE} is requested
     */
    @Override
    void checkAccess(Path path, AccessMode... modes) throws IOException {
        final cid = toCidPath(path)
        for( AccessMode m : modes ) {
            if( m == AccessMode.WRITE )
                throw new AccessDeniedException("Write mode not supported")
            if( m == AccessMode.EXECUTE )
                throw new AccessDeniedException("Execute mode not supported")
        }
        final real = cid.getTargetPath()
        real.fileSystem.provider().checkAccess(real, modes)
    }

    // attribute views are not available: always returns null
    @Override
    <V extends FileAttributeView> V getFileAttributeView(Path path, Class<V> type, LinkOption... options) {
        return null
    }

    /**
     * Read the attributes of the target of the given CID path.
     */
    @Override
    <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type, LinkOption... options) throws IOException {
        final cid = toCidPath(path)
        final real = cid.getTargetPath()
        return real.fileSystem.provider().readAttributes(real,type,options)
    }

    @Override
    Map<String,Object> readAttributes(Path path, String attributes, LinkOption... options) throws IOException {
        throw new UnsupportedOperationException("Read file attributes not supported by ${getScheme().toUpperCase()} file system provider")
    }

    @Override
    void setAttribute(Path path, String attribute, Object value, LinkOption... options) throws IOException {
        throw new UnsupportedOperationException("Set file attributes not supported by ${getScheme().toUpperCase()} file system provider")
    }

}
+ * + */ + +package nextflow.data.cid.fs + +import groovy.json.JsonSlurper +import groovy.util.logging.Slf4j +import nextflow.data.cid.model.DataType +import nextflow.util.CacheHelper +import nextflow.util.TestOnly + +import static nextflow.data.cid.fs.CidFileSystemProvider.* + +import java.nio.file.FileSystem +import java.nio.file.LinkOption +import java.nio.file.Path +import java.nio.file.ProviderMismatchException +import java.nio.file.WatchEvent +import java.nio.file.WatchKey +import java.nio.file.WatchService + +import groovy.transform.CompileStatic +import nextflow.file.FileHelper + +/** + * CID file system path + * + * @author Jorge Ejarque + */ +@Slf4j +@CompileStatic +class CidPath implements Path { + + static public String SEPARATOR = '/' + public static final String METADATA_FILE = '.data.json' + public static final String CID_PROT = "${SCHEME}://".toString() + + static private String[] EMPTY = new String[] {} + + private CidFileSystem fileSystem + + // Path of the file in the metadata cid store + private Path storePath + + // String with the cid file path + private String filePath + + /* + * Only needed to prevent serialization issues - see https://github.com/nextflow-io/nextflow/issues/5208 + */ + protected CidPath(){} + + protected CidPath(CidFileSystem fs, Path target) { + this.fileSystem = fs + this.storePath = target + this.filePath = filePath0(fs, target) + } + + CidPath(CidFileSystem fs, String path) { + this(fs, path, EMPTY) + } + + CidPath(CidFileSystem fs, String path, String[] more) { + this.fileSystem = fs + this.storePath = resolve0(fs, norm0(path), norm0(more)) + this.filePath = filePath0(fs, storePath) + } + + private static void validateHash(Map cidObject) { + final hashedPath = Path.of(cidObject.path as String) + if( !hashedPath.exists() ) + throw new FileNotFoundException("Target path $cidObject.path does not exists.") + if( cidObject.checksum && CacheHelper.hasher(hashedPath).hash().toString() != cidObject.checksum ) { + 
log.warn("Checksum of $hashedPath does not match with the one stored in the metadata") + } + } + + @TestOnly + protected String getFilePath(){ this.filePath } + + @TestOnly + protected Path getStorePath(){ this.storePath } + + + /** + * Finds the target path of a CID path + **/ + protected static Path findTarget(Path cidStorePath, CidFileSystem fs, String[] childs=[]){ + assert fs + if( fs.basePath == cidStorePath ) + return null + final metadata = cidStorePath.resolve(METADATA_FILE).toFile() + if ( metadata.exists() ){ + final slurper = new JsonSlurper() + final cidObject = slurper.parse(metadata.text.toCharArray()) as Map + final type = DataType.valueOf(cidObject.type as String) + if( type == DataType.TaskOutput || type == DataType.WorkflowOutput ) { + // return the real path stored in the metadata + validateHash(cidObject) + final realPath = Path.of(cidObject.path as String, childs) + if( !realPath.exists() ) + throw new FileNotFoundException("Target path $realPath for $cidStorePath does not exists.") + return realPath + } + } else { + // If there isn't metadata check the parent to check if it is a subfolder of a task/workflow output + final parent = cidStorePath.getParent() + if( parent) { + ArrayList newChilds = new ArrayList() + newChilds.add(cidStorePath.getFileName().toString()) + newChilds.addAll(childs) + return findTarget(parent, fs, newChilds as String[]) + } + } + return null + } + + private static String filePath0(CidFileSystem fs, Path target) { + if( !fs ) + return target.toString() + return fs.basePath != target + ? fs.basePath.relativize(target).toString() + : SEPARATOR + } + + private static Path resolve0(CidFileSystem fs, String base, String[] more) { + if( !base || base == SEPARATOR ) { + return resolveEmptyPathCase(fs, more as List) + } + if( base.contains(SEPARATOR) ) { + final parts = base.tokenize(SEPARATOR) + final remain = parts[1..-1] + more.toList() + return resolve0(fs, parts[0], remain as String[]) + } + final result = fs ? 
fs.basePath.resolve(base) : Path.of(base) + return more + ? result.resolve(more.join(SEPARATOR)) + : result + } + + private static Path resolveEmptyPathCase(CidFileSystem fs, List more ){ + switch(more.size()) { + case 0: + return fs ? fs.basePath : Path.of("/") + case 1: + return resolve0(fs, more[0], EMPTY) + default: + return resolve0(fs, more[0], more[1..-1] as String[]) + } + + } + + static private String norm0(String path) { + if( !path ) + return "" + if( path==SEPARATOR ) + return path + //Remove repeated elements + path = Path.of(path).normalize().toString() + //Remove initial and final separators + if( path.startsWith(SEPARATOR) ) + path = path.substring(1) + if( path.endsWith(SEPARATOR) ) + path = path.substring(0,path.size()-1) + return path + } + + static private String[] norm0(String... path) { + for( int i=0; i1 ) + return subpath(0,c-1) + if( c==1 ) + return new CidPath(fileSystem,"/") + return null + } + + @Override + int getNameCount() { + return fileSystem ? storePath.nameCount-fileSystem.basePath.nameCount : storePath.nameCount + } + + @Override + Path getName(int index) { + if( index<0 ) + throw new IllegalArgumentException("Path name index cannot be less than zero - offending value: $index") + final c= fileSystem.basePath.nameCount + return new CidPath(index==0 ? fileSystem : null, storePath.getName(c + index).toString()) + } + + @Override + Path subpath(int beginIndex, int endIndex) { + if( beginIndex<0 ) + throw new IllegalArgumentException("subpath begin index cannot be less than zero - offending value: $beginIndex") + final c= fileSystem.basePath.nameCount + return new CidPath(beginIndex==0 ? 
fileSystem : null, storePath.subpath(c+beginIndex, c+endIndex).toString()) + } + + @Override + Path normalize() { + return new CidPath(fileSystem, storePath.normalize()) + } + + @Override + boolean startsWith(Path other) { + return startsWith(other.toString()) + } + + @Override + boolean startsWith(String other) { + return storePath.startsWith(fileSystem.basePath.resolve(other)) + } + + @Override + boolean endsWith(Path other) { + return endsWith(other.toString()) + } + + @Override + boolean endsWith(String other) { + return storePath.endsWith(other) + } + + @Override + Path resolve(Path other) { + if( CidPath.class != other.class ) + throw new ProviderMismatchException() + + final that = (CidPath)other + + if( that.fileSystem && this.fileSystem != that.fileSystem ) + return other + if( that.isAbsolute() ) { + return that + } + if( that.storePath ) { + final newPath = this.storePath.resolve(that.storePath) + return new CidPath(fileSystem, newPath) + } + return this + } + + @Override + Path resolve(String path) { + if( !path ) + return this + final scheme = FileHelper.getUrlProtocol(path) + if( !scheme ) { + // consider the path as a cid relative path + return resolve(new CidPath(null,path)) + } + if( scheme != SCHEME ) { + throw new ProviderMismatchException() + } + final that = fileSystem.provider().getPath(asUri(path)) + return resolve(that) + } + + + @Override + Path relativize(Path other) { + if( CidPath.class != other.class ) { + throw new ProviderMismatchException() + } + final path = storePath.relativize(((CidPath) other).storePath) + return new CidPath(null , path.getNameCount()>0 ? path.toString(): SEPARATOR) + } + + @Override + URI toUri() { + asUri("${SCHEME}://${filePath}") + } + + String toUriString() { + return toUri().toString() + } + + @Override + Path toAbsolutePath() { + return this + } + + @Override + Path toRealPath(LinkOption... 
options) throws IOException { + return getTargetPath() + } + + protected Path getTargetPath(){ + final target = findTarget(storePath, fileSystem) + return target ? target : storePath + } + + @Override + File toFile() throws IOException { + throw new UnsupportedOperationException("toFile not supported by CidPath") + } + + @Override + WatchKey register(WatchService watcher, WatchEvent.Kind[] events, WatchEvent.Modifier... modifiers) throws IOException { + throw new UnsupportedOperationException("Register not supported by CidPath") + } + + @Override + int compareTo(Path other) { + if( CidPath.class != other.class ) + throw new ProviderMismatchException() + final that = other as CidPath + return this.storePath.compareTo(that.storePath) + } + + @Override + boolean equals(Object other) { + if( CidPath.class != other.class ) { + return false + } + final that = (CidPath)other + return this.fileSystem == that.fileSystem && this.storePath.equals(that.storePath) + } + + /** + * @return The unique hash code for this path + */ + @Override + int hashCode() { + return Objects.hash(fileSystem,storePath) + } + + static URI asUri(String path) { + if (!path) + throw new IllegalArgumentException("Missing 'path' argument") + if (!path.startsWith(CID_PROT)) + throw new IllegalArgumentException("Invalid CID file system path URI - it must start with '${CID_PROT}' prefix - offendinf value: $path") + if (path.startsWith(CID_PROT + SEPARATOR) && path.length() > 7) + throw new IllegalArgumentException("Invalid CID file system path URI - make sure the schema prefix does not container more than two slash characters - offending value: $path") + if (path == CID_PROT) //Empty path case + return new URI("") + return new URI(path) + } + + @Override + String toString() { + filePath + } + + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidPathFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/fs/CidPathFactory.groovy new file mode 100644 index 
/**
 * {@link FileSystemPathFactory} implementation for the CID file system.
 *
 * Recognises strings starting with the CID protocol prefix and turns them
 * into {@link CidPath} instances; anything else is left to other factories
 * by returning {@code null}.
 *
 * @author Jorge Ejarque
 */
@CompileStatic
class CidPathFactory extends FileSystemPathFactory {

    /**
     * @return A {@link CidPath} when the string starts with the CID prefix, {@code null} otherwise
     */
    @Override
    protected Path parseUri(String uri) {
        if( uri.startsWith(CID_PROT) )
            return create(uri)
        return null
    }

    /**
     * @return The URI string of the path when it is a {@link CidPath}, {@code null} otherwise
     */
    @Override
    protected String toUriString(Path path) {
        if( path instanceof CidPath )
            return ((CidPath)path).toUriString()
        return null
    }

    // no Bash helper library is provided for CID paths
    @Override
    protected String getBashLib(Path target) {
        return null
    }

    // no upload command is provided for CID paths
    @Override
    protected String getUploadCmd(String source, Path target) {
        return null
    }

    /**
     * Build a {@link CidPath} from its string representation.
     *
     * @param path A CID path string, validated and converted by {@code CidPath.asUri}
     * @return The path created by the provider of the CID file system, which is
     *      looked up or created with the {@code DataConfig.asMap()} settings
     */
    static CidPath create(String path) {
        final cidUri = CidPath.asUri(path)
        final cidFs = FileHelper.getOrCreateFileSystemFor(cidUri, DataConfig.asMap())
        return (CidPath) cidFs.provider().getPath(cidUri)
    }
}
+ * + * @author Jorge Ejarque */ enum DataType { - Task, Workflow, Output + TaskRun, Workflow, WorkflowRun, TaskOutput, WorkflowOutput, WorkflowResults } diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/model/TaskOutput.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Output.groovy similarity index 92% rename from modules/nextflow/src/main/groovy/nextflow/data/cid/model/TaskOutput.groovy rename to modules/nextflow/src/main/groovy/nextflow/data/cid/model/Output.groovy index 6467d36c6f..610168b129 100644 --- a/modules/nextflow/src/main/groovy/nextflow/data/cid/model/TaskOutput.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Output.groovy @@ -26,11 +26,11 @@ import groovy.transform.CompileStatic */ @Canonical @CompileStatic -class TaskOutput { +class Output { DataType type - String uri - String realPath - String hash + String path + String checksum + String source long size long createdAt long modifiedAt diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Parameter.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Parameter.groovy new file mode 100644 index 0000000000..11cbe4ee9d --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Parameter.groovy @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow.data.cid.model + +import groovy.transform.Canonical +import groovy.transform.CompileStatic + +/** + * Model Workflow and Task Parameters. + * + * @author Jorge Ejarque */ @@ -28,8 +29,15 @@ import groovy.transform.CompileStatic @CompileStatic class TaskRun { DataType type - int id + String sessionId String name - String hash + String code + List inputs + String container + String conda + String spack + String architecture + Map globalVars + List binEntries List annotations } diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Workflow.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Workflow.groovy new file mode 100644 index 0000000000..c4b8824db4 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/Workflow.groovy @@ -0,0 +1,37 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.data.cid.model + +import groovy.transform.Canonical +import groovy.transform.CompileStatic + + +/** + * Models a workflow definition. 
+ * + * @author Jorge Ejarque otherScriptFiles + String repository + String commitId +} diff --git a/modules/nextflow/src/main/groovy/nextflow/data/cid/model/WorkflowResults.groovy b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/WorkflowResults.groovy new file mode 100644 index 0000000000..23d6ad179b --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/data/cid/model/WorkflowResults.groovy @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.data.cid.model + +import groovy.transform.Canonical +import groovy.transform.CompileStatic + +/** + * Models the results of a workflow execution. 
+ * + * @author Jorge Ejarque params +} diff --git a/modules/nextflow/src/main/groovy/nextflow/data/config/DataConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/data/config/DataConfig.groovy index 7d188ef9a9..467598218e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/data/config/DataConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/data/config/DataConfig.groovy @@ -35,9 +35,13 @@ class DataConfig { this.store = new DataStoreOpts(opts.store as Map ?: Map.of()) } + static Map asMap() { + session?.config?.navigate('workflow.data') as Map ?: new HashMap() + } + static DataConfig create(Session session) { if( session ) { - return new DataConfig(session.config.navigate('workflow.data') as Map ?: Map.of()) + return new DataConfig( session.config.navigate('workflow.data') as Map ?: Map.of()) } else throw new IllegalStateException("Missing Nextflow session") diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 05a668c82e..b0bf67aaca 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -2274,7 +2274,7 @@ class TaskProcessor { * @return The list of paths of scripts in the project bin folder referenced in the task command */ @Memoized - protected List getTaskBinEntries(String script) { + public List getTaskBinEntries(String script) { List result = [] def tokenizer = new StringTokenizer(script," \t\n\r\f()[]{};&|<>`") while( tokenizer.hasMoreTokens() ) { @@ -2307,7 +2307,7 @@ class TaskProcessor { log.info(buffer.toString()) } - protected Map getTaskGlobalVars(TaskRun task) { + public Map getTaskGlobalVars(TaskRun task) { final result = task.getGlobalVars(ownerScript.binding) final directives = getTaskExtensionDirectiveVars(task) result.putAll(directives) diff --git 
a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index bde46722f1..df3395d9e8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -979,5 +979,9 @@ class TaskRun implements Cloneable { CondaConfig getCondaConfig() { return processor.session.getCondaConfig() } + + String getStubSource(){ + return config?.getStubBlock()?.source + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index 4782a2d2f7..dd57c4168d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -26,12 +26,14 @@ class DefaultObserverFactory implements TraceObserverFactory { createTimelineObserver(result) createDagObserver(result) createAnsiLogObserver(result) - createCidObserver(result) + if( session.cidEnabled ){ + createCidObserver(result) + } return result } protected void createCidObserver(Collection result) { - result.add( new CidObserver() ) + result.add( new CidObserver(this.session) ) } protected void createAnsiLogObserver(Collection result) { diff --git a/modules/nextflow/src/main/groovy/nextflow/util/HistoryFile.groovy b/modules/nextflow/src/main/groovy/nextflow/util/HistoryFile.groovy index 15d5cb83ca..8e3b8cb73c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/HistoryFile.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/HistoryFile.groovy @@ -33,7 +33,7 @@ import nextflow.exception.AbortOperationException * @author Paolo Di Tommaso */ @Slf4j -class HistoryFile extends File { +class HistoryFile extends WithLockFile { static String defaultFileName() { Const.appCacheDir.resolve('history').toString() } @@ -410,52 +410,7 @@ class HistoryFile 
extends File { } } - /** - * Apply the given action by using a file lock - * - * @param action The closure implementing the action to be executed with a file lock - * @return The value returned by the action closure - */ - private withFileLock(Closure action) { - - def rnd = new Random() - long ts = System.currentTimeMillis() - String parent = this.parent ?: new File('.').absolutePath - def file = new File(parent, "${this.name}.lock".toString()) - def fos = new FileOutputStream(file) - try { - Throwable error - FileLock lock = null - try { - while( true ) { - lock = fos.getChannel().tryLock() - if( lock ) break - if( System.currentTimeMillis() - ts < 1_000 ) - sleep rnd.nextInt(75) - else { - error = new IllegalStateException("Can't lock file: ${this.absolutePath} -- Nextflow needs to run in a file system that supports file locks") - break - } - } - if( lock ) { - return action.call() - } - } - catch( Exception e ) { - return action.call() - } - finally { - if( lock?.isValid() ) lock.release() - } - - if( error ) throw error - } - finally { - fos.closeQuietly() - file.delete() - } - } Set findAllRunNames() { findAll().findResults{ it.runName } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/PathNormalizer.groovy b/modules/nextflow/src/main/groovy/nextflow/util/PathNormalizer.groovy new file mode 100644 index 0000000000..7da3c5a925 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/util/PathNormalizer.groovy @@ -0,0 +1,93 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package nextflow.util
+
+import groovy.transform.CompileStatic
+import nextflow.script.WorkflowMetadata
+
+import java.nio.file.Path
+
+/**
+ * Normalizes paths against the work directory or the remote project repository taken from the workflow metadata.
+ * @author Ben Sherman
+ */
+@CompileStatic
+class PathNormalizer {
+
+    private URL repository      // remote repository URL, null when the metadata has none
+
+    private String commitId
+
+    private String projectDir   // normalized project directory, as a URI string
+
+    private String workDir      // normalized work directory, as a URI string
+
+    PathNormalizer(WorkflowMetadata metadata) {
+        repository = metadata.repository ? new URL(metadata.repository) : null
+        commitId = metadata.commitId
+        projectDir = metadata.projectDir.normalize().toUriString()
+        workDir = metadata.workDir.normalize().toUriString()
+    }
+
+    /**
+     * Normalize paths against the original remote URL, or
+     * work directory, where appropriate. Other paths starting with {@code /} get a {@code file://} prefix;
+     * anything else is returned unchanged, and a null or empty string yields {@code null}.
+     * @param path the path to normalize; the {@code Path} overload passes its {@code toUriString()} form to the String overload
+     */
+    String normalizePath(Path path) {
+        normalizePath(path.toUriString())
+    }
+
+    String normalizePath(String path) {
+        if(!path)
+            return null
+        // replace only the leading work directory with a relative path (later occurrences must stay untouched)
+        if( path.startsWith(workDir) )
+            return 'work' + path.substring(workDir.length())
+
+        // replace project directory with source URL (if applicable)
+        if( repository && path.startsWith(projectDir) )
+            return getProjectSourceUrl(path)
+
+        // encode local absolute paths as file URLs
+        if( path.startsWith('/') )
+            return 'file://' + path
+
+        return path
+    }
+
+    /**
+     * Get the source URL for a project asset.
+     * Only the leading project directory is swapped for the host-specific URL; unrecognized hosts return the path as is.
+     * @param path a path already known to start with the project directory
+     */
+    private String getProjectSourceUrl(String path) {
+        switch( repository.host ) {
+            case 'bitbucket.org':
+                return "${repository}/src/${commitId}${path.substring(projectDir.length())}"
+            case 'github.com':
+                return "${repository}/tree/${commitId}${path.substring(projectDir.length())}"
+            case 'gitlab.com':
+                return "${repository}/-/tree/${commitId}${path.substring(projectDir.length())}"
+            default:
+                return path
+        }
+    }
+
+}
diff --git a/modules/nextflow/src/main/groovy/nextflow/util/WithLockFile.groovy b/modules/nextflow/src/main/groovy/nextflow/util/WithLockFile.groovy
new file mode 100644
index 0000000000..20f6553bb6
--- /dev/null
+++ b/modules/nextflow/src/main/groovy/nextflow/util/WithLockFile.groovy
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2013-2024, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.util
+
+import java.nio.channels.FileLock
+
+/**
+ * File with a file lock.
+ *
+ * @author Jorge Ejarque
+ */
+class WithLockFile extends File {
+
+    WithLockFile(String filepath){
+        super(filepath)
+    }
+
+    /**
+     * Run the given action exactly once while holding a lock on the sibling {@code <name>.lock} file; failing to get the lock within one second raises {@link IllegalStateException}.
+     * If the locking call itself throws, the action still runs without a lock; an exception thrown by the action is propagated to the caller.
+     * @param action The closure implementing the action to be executed with a file lock
+     * @return The value returned by the action closure
+     */
+    protected withFileLock(Closure action) {
+
+        def rnd = new Random()
+        long ts = System.currentTimeMillis()
+        String parent = this.parent ?: new File('.').absolutePath
+        def file = new File(parent, "${this.name}.lock".toString())
+        def fos = new FileOutputStream(file)
+        try {
+            Throwable error = null
+            FileLock lock = null
+            // -- acquire the lock, polling with a random pause for up to one second
+            try {
+                while( true ) {
+                    lock = fos.getChannel().tryLock()
+                    if( lock ) break
+                    if( System.currentTimeMillis() - ts < 1_000 )
+                        sleep rnd.nextInt(75)
+                    else {
+                        error = new IllegalStateException("Can't lock file: ${this.absolutePath} -- Nextflow needs to run in a file system that supports file locks")
+                        break
+                    }
+                }
+            }
+            catch( Exception e ) {
+                // best effort: the locking call itself failed, fall through and run the action without a lock
+            }
+            if( error ) throw error
+            // -- the action sits outside the catch above, so a failing action is never invoked a second time
+            try {
+                return action.call()
+            }
+            finally {
+                if( lock?.isValid() ) lock.release()
+            }
+        }
+        finally {
+            fos.closeQuietly()
+            file.delete()
+        }
+    }
+}
diff --git a/modules/nextflow/src/main/resources/META-INF/extensions.idx b/modules/nextflow/src/main/resources/META-INF/extensions.idx
index 7fb037c37d..e7ba19b1ab 100644
--- a/modules/nextflow/src/main/resources/META-INF/extensions.idx
+++ b/modules/nextflow/src/main/resources/META-INF/extensions.idx
@@ -25,4 +25,5 @@ nextflow.mail.SimpleMailProvider
 nextflow.mail.JavaMailProvider
 nextflow.processor.tip.DefaultTaskTipProvider
 nextflow.fusion.FusionTokenDefault
+nextflow.data.cid.DefaultCidStoreFactory
diff --git a/modules/nextflow/src/main/resources/META-INF/services/java.nio.file.spi.FileSystemProvider b/modules/nextflow/src/main/resources/META-INF/services/java.nio.file.spi.FileSystemProvider
new file mode 100644
index 0000000000..ba80b4b30a
--- /dev/null
+++
b/modules/nextflow/src/main/resources/META-INF/services/java.nio.file.spi.FileSystemProvider @@ -0,0 +1,17 @@ +# +# Copyright 2013-2024, Seqera Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +nextflow.data.cid.fs.CidFileSystemProvider diff --git a/modules/nextflow/src/main/resources/nextflow/dag/mermaid.dag.template.html b/modules/nextflow/src/main/resources/nextflow/dag/mermaid.dag.template.html index 0ab1d9475e..ebbf8e834a 100644 --- a/modules/nextflow/src/main/resources/nextflow/dag/mermaid.dag.template.html +++ b/modules/nextflow/src/main/resources/nextflow/dag/mermaid.dag.template.html @@ -36,7 +36,7 @@ REPLACE_WITH_NETWORK_DATA diff --git a/modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy new file mode 100644 index 0000000000..774a5cd63a --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy @@ -0,0 +1,258 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.cli + +import groovy.json.JsonOutput + +import java.nio.file.Files + +import nextflow.data.cid.CidHistoryFile +import nextflow.plugin.Plugins + +import org.junit.Rule +import spock.lang.Specification +import test.OutputCapture + +/** + * CLI cid Tests + * + * @author Jorge Ejarque + */ +class CmdCidTest extends Specification { + + def cleanup() { + Plugins.stop() + } + /* + * Read more http://mrhaki.blogspot.com.es/2015/02/spocklight-capture-and-assert-system.html + */ + @Rule + OutputCapture capture = new OutputCapture() + + def 'should print executions cids' (){ + given: + def folder = Files.createTempDirectory('test') + def configFile = folder.resolve('nextflow.config') + configFile.text = "workflow.data.store.location = '$folder'".toString() + def historyFile = folder.resolve(".meta/.history") + Files.createDirectories(historyFile.parent) + def uniqueId = UUID.randomUUID() + def date = new Date(); + def launcher = Mock(Launcher){ + getOptions() >> new CliOptions(config: [configFile.toString()]) + } + def recordEntry = "${CidHistoryFile.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tcid://1234".toString() + historyFile.text = recordEntry + when: + def cidCmd = new CmdCid(launcher: launcher, args: ["log"]) + cidCmd.run() + def stdout = capture + .toString() + .readLines()// remove the log part + .findResults { line -> !line.contains('DEBUG') ? line : null } + .findResults { line -> !line.contains('INFO') ? line : null } + .findResults { line -> !line.contains('plugin') ? 
line : null } + + then: + stdout.size() == 2 + stdout[1] == recordEntry + + cleanup: + folder?.deleteDir() + } + + def 'should print no history' (){ + given: + def folder = Files.createTempDirectory('test') + def configFile = folder.resolve('nextflow.config') + configFile.text = "workflow.data.store.location = '$folder'".toString() + def historyFile = folder.resolve(".meta/.history") + Files.createDirectories(historyFile.parent) + def launcher = Mock(Launcher){ + getOptions() >> new CliOptions(config: [configFile.toString()]) + } + when: + def cidCmd = new CmdCid(launcher: launcher, args: ["log"]) + cidCmd.run() + def stdout = capture + .toString() + .readLines()// remove the log part + .findResults { line -> !line.contains('DEBUG') ? line : null } + .findResults { line -> !line.contains('INFO') ? line : null } + .findResults { line -> !line.contains('plugin') ? line : null } + + then: + stdout.size() == 1 + stdout[0] == "No workflow runs CIDs found." + + cleanup: + folder?.deleteDir() + } + + def 'should show cid content' (){ + given: + def folder = Files.createTempDirectory('test') + def configFile = folder.resolve('nextflow.config') + configFile.text = "workflow.data.store.location = '$folder'".toString() + def cidFile = folder.resolve(".meta/12345/.data.json") + Files.createDirectories(cidFile.parent) + def launcher = Mock(Launcher){ + getOptions() >> new CliOptions(config: [configFile.toString()]) + } + + def recordEntry = JsonOutput.prettyPrint('{"type":"WorkflowOutput",' + + '"path":"/path/to/file",' + + '"checksum":"45372qe",' + + '"source":"cid://123987/file.bam",' + + '"size": 1234,' + + '"createdAt": 123456789,' + + '"modifiedAt": 123456789,' + + '"annotations":null}') + cidFile.text = recordEntry + when: + def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid://12345"]) + cidCmd.run() + def stdout = capture + .toString() + .readLines()// remove the log part + .findResults { line -> !line.contains('DEBUG') ? 
line : null } + .findResults { line -> !line.contains('INFO') ? line : null } + .findResults { line -> !line.contains('plugin') ? line : null } + + then: + stdout.size() == recordEntry.readLines().size() + stdout.join('\n') == recordEntry + + cleanup: + folder?.deleteDir() + } + + def 'should warn if no cid content' (){ + given: + def folder = Files.createTempDirectory('test') + def configFile = folder.resolve('nextflow.config') + configFile.text = "workflow.data.store.location = '$folder'".toString() + def launcher = Mock(Launcher){ + getOptions() >> new CliOptions(config: [configFile.toString()]) + } + + when: + def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid://12345"]) + cidCmd.run() + def stdout = capture + .toString() + .readLines()// remove the log part + .findResults { line -> !line.contains('DEBUG') ? line : null } + .findResults { line -> !line.contains('INFO') ? line : null } + .findResults { line -> !line.contains('plugin') ? line : null } + + then: + stdout.size() == 1 + stdout[0] == "Error loading cid://12345." 
+ + cleanup: + folder?.deleteDir() + } + + def 'should get lineage cid content' (){ + given: + def folder = Files.createTempDirectory('test') + def configFile = folder.resolve('nextflow.config') + def outputHtml = folder.resolve('lineage.html') + configFile.text = "workflow.data.store.location = '$folder'".toString() + def launcher = Mock(Launcher){ + getOptions() >> new CliOptions(config: [configFile.toString()]) + } + def cidFile = folder.resolve(".meta/12345/file.bam/.data.json") + def cidFile2 = folder.resolve(".meta/123987/file.bam/.data.json") + def cidFile3 = folder.resolve(".meta/123987/.data.json") + def cidFile4 = folder.resolve(".meta/45678/output.txt/.data.json") + def cidFile5 = folder.resolve(".meta/45678/.data.json") + Files.createDirectories(cidFile.parent) + Files.createDirectories(cidFile2.parent) + Files.createDirectories(cidFile3.parent) + Files.createDirectories(cidFile4.parent) + Files.createDirectories(cidFile5.parent) + + def recordEntry = JsonOutput.prettyPrint('{"type":"WorkflowOutput",' + + '"path":"/path/to/file","checksum":"45372qe","source":"cid://123987/file.bam",' + + '"size": 1234,"createdAt": 123456789, "modifiedAt": 123456789,"annotations":null}') + cidFile.text = recordEntry + recordEntry = JsonOutput.prettyPrint('{"type":"TaskOutput",' + + '"path":"/path/to/file","checksum":"45372qe","source":"cid://123987",' + + '"size": 1234,"createdAt": 123456789,"modifiedAt": 123456789,"annotations":null}') + cidFile2.text = recordEntry + recordEntry = JsonOutput.prettyPrint('{"type":"TaskRun",' + + '"sessionId":"u345-2346-1stw2", "name":"foo","code":"abcde2345",' + + '"inputs": [{"type": "ValueInParam","name": "sample_id","value": "ggal_gut"},' + + '{"type": "FileInParam","name": "reads","value": ["cid://45678/output.txt"]}],' + + '"container": null,"conda": null,"spack": null,"architecture": null,' + + '"globalVars": {},"binEntries": [],"annotations":null}') + cidFile3.text = recordEntry + recordEntry = 
JsonOutput.prettyPrint('{"type":"TaskOutput",' + + '"path":"/path/to/file","checksum":"45372qe","source":"cid://45678",' + + '"size": 1234,"createdAt": 123456789,"modifiedAt": 123456789,"annotations":null}') + cidFile4.text = recordEntry + recordEntry = JsonOutput.prettyPrint('{"type":"TaskRun",' + + '"sessionId":"u345-2346-1stw2", "name":"bar","code":"abfs2556",' + + '"inputs": null,"container": null,"conda": null,"spack": null,"architecture": null,' + + '"globalVars": {},"binEntries": [],"annotations":null}') + cidFile5.text = recordEntry + final network = """flowchart BT + cid://12345/file.bam@{shape: document, label: "cid://12345/file.bam"} + cid://123987/file.bam@{shape: document, label: "cid://123987/file.bam"} + cid://123987@{shape: process, label: "foo"} + ggal_gut@{shape: document, label: "ggal_gut"} + cid://45678/output.txt@{shape: document, label: "cid://45678/output.txt"} + cid://45678@{shape: process, label: "bar"} + + cid://123987/file.bam -->cid://12345/file.bam + cid://123987 -->cid://123987/file.bam + ggal_gut -->cid://123987 + cid://45678/output.txt -->cid://123987 + cid://45678 -->cid://45678/output.txt +""" + final template = CmdCid.CmdLineage.readTemplate() + def expectedOutput = template.replace('REPLACE_WITH_NETWORK_DATA', network) + + when: + def cidCmd = new CmdCid(launcher: launcher, args: ["lineage", "cid://12345/file.bam", outputHtml.toString()]) + cidCmd.run() + def stdout = capture + .toString() + .readLines()// remove the log part + .findResults { line -> !line.contains('DEBUG') ? line : null } + .findResults { line -> !line.contains('INFO') ? line : null } + .findResults { line -> !line.contains('plugin') ? 
line : null } + + then: + stdout.size() == 1 + stdout[0] == "Linage graph for cid://12345/file.bam rendered in ${outputHtml}" + outputHtml.exists() + outputHtml.text == expectedOutput + + + cleanup: + folder?.deleteDir() + } + + + + + + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/data/cid/CidHistoryFileTest.groovy b/modules/nextflow/src/test/groovy/nextflow/data/cid/CidHistoryFileTest.groovy new file mode 100644 index 0000000000..2b3412466d --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/data/cid/CidHistoryFileTest.groovy @@ -0,0 +1,158 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package nextflow.data.cid + +import spock.lang.Specification +import spock.lang.TempDir + +import java.nio.file.Files +import java.nio.file.Path + +/** + * CID History file tests + * + * @author Jorge Ejarque + */ +class CidHistoryFileTest extends Specification { + + @TempDir + Path tempDir + + Path historyFile + CidHistoryFile cidHistoryFile + + def setup() { + historyFile = tempDir.resolve("cid-history.txt") + Files.createFile(historyFile) + cidHistoryFile = new CidHistoryFile(historyFile) + } + + def "write should append a new record to the file"() { + given: + UUID sessionId = UUID.randomUUID() + String runName = "TestRun" + String runCid = "cid://123" + + when: + cidHistoryFile.write(runName, sessionId, runCid) + + then: + def lines = Files.readAllLines(historyFile) + lines.size() == 1 + def parsedRecord = CidHistoryFile.CidRecord.parse(lines[0]) + parsedRecord.sessionId == sessionId + parsedRecord.runName == runName + parsedRecord.runCid == runCid + } + + def "getRunCid should return correct runCid for existing session"() { + given: + UUID sessionId = UUID.randomUUID() + String runName = "Run1" + String runCid = "cid://123" + + and: + cidHistoryFile.write(runName, sessionId, runCid) + + expect: + cidHistoryFile.getRunCid(sessionId) == runCid + } + + def "getRunCid should return null if session does not exist"() { + expect: + cidHistoryFile.getRunCid(UUID.randomUUID()) == null + } + + def "update should modify existing runCid for given session"() { + given: + UUID sessionId = UUID.randomUUID() + String runName = "Run1" + String initialCid = "cid-abc" + String updatedCid = "cid-updated" + + and: + cidHistoryFile.write(runName, sessionId, initialCid) + + when: + cidHistoryFile.update(sessionId, updatedCid) + + then: + def lines = Files.readAllLines(historyFile) + lines.size() == 1 + def parsedRecord = CidHistoryFile.CidRecord.parse(lines[0]) + parsedRecord.runCid == updatedCid + } + + def "update should do nothing if session does not exist"() { + 
given: + UUID existingSessionId = UUID.randomUUID() + UUID nonExistingSessionId = UUID.randomUUID() + String runName = "Run1" + String runCid = "cid://123" + + and: + cidHistoryFile.write(runName, existingSessionId, runCid) + + when: + cidHistoryFile.update(nonExistingSessionId, "new-cid") + + then: + def lines = Files.readAllLines(historyFile) + lines.size() == 1 + def parsedRecord = CidHistoryFile.CidRecord.parse(lines[0]) + parsedRecord.runCid == runCid + } + + def "CidRecord parse should throw for invalid record"() { + when: + CidHistoryFile.CidRecord.parse("invalid-record") + + then: + thrown(IllegalArgumentException) + } + + def "CidRecord parse should handle 4-column record"() { + given: + def timestamp = new Date() + def formattedTimestamp = CidHistoryFile.TIMESTAMP_FMT.format(timestamp) + def line = "${formattedTimestamp}\trun-1\t${UUID.randomUUID()}\tcid://123" + + when: + def record = CidHistoryFile.CidRecord.parse(line) + + then: + record.timestamp != null + record.runName == "run-1" + record.runCid == "cid://123" + } + + def "CidRecord toString should produce tab-separated format"() { + given: + UUID sessionId = UUID.randomUUID() + def record = new CidHistoryFile.CidRecord(sessionId, "TestRun") + record.timestamp = new Date() + record.runCid = "cid://123" + + when: + def line = record.toString() + + then: + line.contains("\t") + line.split("\t").size() == 4 + } +} + diff --git a/modules/nextflow/src/test/groovy/nextflow/data/cid/CidObserverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/data/cid/CidObserverTest.groovy index fe06bb0bb7..3167350cca 100644 --- a/modules/nextflow/src/test/groovy/nextflow/data/cid/CidObserverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/data/cid/CidObserverTest.groovy @@ -17,41 +17,117 @@ package nextflow.data.cid +import groovy.json.JsonOutput +import nextflow.data.config.DataConfig +import nextflow.processor.TaskConfig +import nextflow.processor.TaskProcessor +import nextflow.script.ScriptBinding 
+import nextflow.script.WorkflowMetadata +import nextflow.util.CacheHelper +import nextflow.util.PathNormalizer + import java.nio.file.Files +import java.nio.file.Path import java.nio.file.attribute.BasicFileAttributes -import java.nio.file.attribute.FileTime -import java.time.Instant import com.google.common.hash.HashCode import nextflow.Session import nextflow.processor.TaskId import nextflow.processor.TaskRun import spock.lang.Specification + +import static nextflow.data.cid.fs.CidPath.CID_PROT + /** * * @author Paolo Di Tommaso */ class CidObserverTest extends Specification { - def 'should save task run' () { + def 'should save workflow' (){ given: def folder = Files.createTempDirectory('test') def config = [workflow:[data:[store:[location:folder.toString()]]]] - def session = Mock(Session) { getConfig()>>config } - def observer = new CidObserver() + def store = new DefaultCidStore(); + def uniqueId = UUID.randomUUID() + def scriptFile = folder.resolve("main.nf") + def metadata = Mock(WorkflowMetadata){ + getRepository() >> "https://nextflow.io/nf-test/" + getCommitId() >> "123456" + getScriptId() >> "78910" + getScriptFile() >> scriptFile + getProjectDir() >> folder.resolve("projectDir") + getWorkDir() >> folder.resolve("workDir") + } + def session = Mock(Session) { + getConfig() >> config + getCidStore() >> store + getUniqueId() >> uniqueId + getRunName() >> "test_run" + getWorkflowMetadata() >> metadata + getParams() >> new ScriptBinding.ParamsMap() + } + store.open(DataConfig.create(session)) + def observer = new CidObserver(session) + def expectedString = '{"type":"WorkflowRun","workflow":{"type": "Workflow",' + + '"mainScriptFile":{"path":"file://' + scriptFile.toString() + '", "checksum": "78910"},' + + '"otherScriptFiles": [], "repository": "https://nextflow.io/nf-test/",' + + '"commitId": "123456" },' + + '"sessionId": "' + uniqueId + '",' + + '"name": "test_run", "params": []}' + when: observer.onFlowCreate(session) + observer.onFlowBegin() + then: + 
folder.resolve(".meta/${observer.executionHash}/.data.json").text == JsonOutput.prettyPrint(expectedString) + + cleanup: + folder?.deleteDir() + } + + def 'should save task run' () { + given: + def folder = Files.createTempDirectory('test') + def config = [workflow:[data:[store:[location:folder.toString()]]]] + def store = new DefaultCidStore(); + def uniqueId = UUID.randomUUID() + def session = Mock(Session) { + getConfig()>>config + getCidStore()>>store + getUniqueId()>>uniqueId + getRunName()>>"test_run" + } + store.open(DataConfig.create(session)) + def observer = new CidObserver(session) and: def hash = HashCode.fromInt(123456789) and: + def processor = Mock(TaskProcessor){ + getTaskGlobalVars(_) >> [:] + getTaskBinEntries(_) >> [] + } def task = Mock(TaskRun) { getId() >> TaskId.of(100) getName() >> 'foo' getHash() >> hash + getProcessor() >> processor + getSource() >> 'echo task source' } + def sourceHash =CacheHelper.hasher('echo task source').hash().toString() + def normalizer = Mock(PathNormalizer.class) { + normalizePath( _ as Path) >> {Path p -> p?.toString()} + normalizePath( _ as String) >> {String p -> p} + } + def expectedString = '{"type":"TaskRun",' + + '"sessionId":"'+uniqueId.toString() + '",' + + '"name":"foo","code":"' + sourceHash + '",' + + '"inputs": null,"container": null,"conda": null,' + + '"spack": null,"architecture": null,' + + '"globalVars": {},"binEntries": [],"annotations":null}' when: - observer.storeTaskRun(task) + observer.storeTaskRun(task, normalizer) then: - folder.resolve(hash.toString()).text == '{"id":100,"name":"foo","hash":"15cd5b07","annotations":null}' + folder.resolve(".meta/${hash.toString()}/.data.json").text == JsonOutput.prettyPrint(expectedString) cleanup: folder?.deleteDir() @@ -61,9 +137,13 @@ class CidObserverTest extends Specification { given: def folder = Files.createTempDirectory('test') def config = [workflow:[data:[store:[location:folder.toString()]]]] - def session = Mock(Session) { getConfig()>>config } 
- def observer = Spy(new CidObserver()) - observer.onFlowCreate(session) + def store = new DefaultCidStore(); + def session = Mock(Session) { + getConfig()>>config + getCidStore()>>store + } + store.open(DataConfig.create(session)) + def observer = Spy(new CidObserver(session)) and: def workDir = folder.resolve('12/34567890') Files.createDirectories(workDir) @@ -71,6 +151,7 @@ class CidObserverTest extends Specification { def outFile = workDir.resolve('foo/bar/file.bam') Files.createDirectories(outFile.parent) outFile.text = 'some data' + def fileHash = CacheHelper.hasher(outFile).hash().toString() and: def hash = HashCode.fromInt(123456789) and: @@ -81,24 +162,228 @@ class CidObserverTest extends Specification { getWorkDir() >> workDir } and: - def ts1 = Instant.ofEpochMilli(1737914400) - def ts2 = Instant.ofEpochMilli(1737914500) - def attrs = Mock(BasicFileAttributes) { - size() >> 100 - creationTime() >> FileTime.from(ts1) - lastModifiedTime() >> FileTime.from(ts2) - } + def attrs = Files.readAttributes(outFile, BasicFileAttributes) + def expectedString = '{"type":"TaskOutput",' + + '"path":"' + outFile.toString() + '",' + + '"checksum":"'+ fileHash + '",' + + '"source":"cid://15cd5b07",' + + '"size":'+attrs.size() + ',' + + '"createdAt":' + attrs.creationTime().toMillis() + ',' + + '"modifiedAt":'+ attrs.lastModifiedTime().toMillis() + ',' + + '"annotations":null}' + and: observer.readAttributes(outFile) >> attrs when: observer.storeTaskOutput(task, outFile) then: - folder.resolve("${hash}/foo/bar/file.bam").text - == '{"uri":"cid://15cd5b07/foo/bar/file.bam","size":100,"createdAt":1737914400,"modifiedAt":1737914500,"annotations":null}' + folder.resolve(".meta/${hash}/foo/bar/file.bam/.data.json").text + == JsonOutput.prettyPrint(expectedString) cleanup: folder?.deleteDir() } + def 'should relativise task output dirs' (){ + when: + def config = [workflow:[data:[store:[location:'cid']]]] + def store = new DefaultCidStore(); + def session = Mock(Session) { + 
getConfig()>>config + getCidStore()>>store + } + def hash = HashCode.fromInt(123456789) + def taskConfig = Mock(TaskConfig){ + getStoreDir() >> STORE_DIR + } + def task = Mock(TaskRun) { + getId() >> TaskId.of(100) + getName() >> 'foo' + getHash() >> hash + getWorkDir() >> WORK_DIR + getConfig() >> taskConfig + } + store.open(DataConfig.create(session)) + def observer = new CidObserver(session) + then: + observer.getTaskRelative(task, PATH) == EXPECTED + where: + WORK_DIR | STORE_DIR | PATH | EXPECTED + Path.of('/path/to/work/12/3456789') | Path.of('/path/to/storeDir') | Path.of('/path/to/work/12/3456789/relative') | "relative" + Path.of('/path/to/work/12/3456789') | Path.of('/path/to/storeDir') | Path.of('/path/to/storeDir/relative') | "relative" + Path.of('work/12/3456789') | Path.of('storeDir') | Path.of('work/12/3456789/relative') | "relative" + Path.of('work/12/3456789') | Path.of('storeDir') | Path.of('storeDir/relative') | "relative" + Path.of('work/12/3456789') | Path.of('storeDir') | Path.of('results/relative') | "results/relative" + Path.of('/path/to/work/12/3456789') | Path.of('storeDir') | Path.of('./relative') | "relative" + } + + def 'should return exception when relativize task output dirs' (){ + when: + def config = [workflow:[data:[store:[location:'cid']]]] + def store = new DefaultCidStore(); + def session = Mock(Session) { + getConfig()>>config + getCidStore()>>store + } + def hash = HashCode.fromInt(123456789) + def taskConfig = Mock(TaskConfig){ + getStoreDir() >> STORE_DIR + } + def task = Mock(TaskRun) { + getId() >> TaskId.of(100) + getName() >> 'foo' + getHash() >> hash + getWorkDir() >> WORK_DIR + getConfig() >> taskConfig + } + store.open(DataConfig.create(session)) + def observer = new CidObserver(session) + observer.getTaskRelative(task, PATH) + then: + def e = thrown(Exception) + e.message == "Cannot asses the relative path for output $PATH of ${task.name}".toString() + + where: + WORK_DIR | STORE_DIR | PATH + 
Path.of('/path/to/work/12/3456789') | Path.of('/path/to/storeDir') | Path.of('/another/path/relative') + Path.of('/path/to/work/12/3456789') | Path.of('/path/to/storeDir') | Path.of('../path/to/storeDir/relative') + } + + def 'should relativise workflow output dirs' (){ + when: + def config = [workflow:[data:[store:[location:'cid']]]] + def store = new DefaultCidStore(); + def session = Mock(Session) { + getOutputDir()>>OUTPUT_DIR + getConfig()>>config + getCidStore()>>store + } + store.open(DataConfig.create(session)) + def observer = new CidObserver(session) + then: + observer.getWorkflowRelative(PATH) == EXPECTED + where: + OUTPUT_DIR | PATH | EXPECTED + Path.of('/path/to/outDir') | Path.of('/path/to/outDir/relative') | "relative" + Path.of('outDir') | Path.of('outDir/relative') | "relative" + Path.of('/path/to/outDir') | Path.of('results/relative') | "results/relative" + Path.of('/path/to/outDir') | Path.of('./relative') | "relative" + + + } + + def 'should return exception when relativise workflow output dirs' (){ + when: + def config = [workflow:[data:[store:[location:'cid']]]] + def store = new DefaultCidStore(); + def session = Mock(Session) { + getOutputDir()>>OUTPUT_DIR + getConfig()>>config + getCidStore()>>store + } + def observer = new CidObserver(session) + observer.getWorkflowRelative(PATH) + then: + def e = thrown(Exception) + e.message == "Cannot asses the relative path for workflow output $PATH" + where: + OUTPUT_DIR | PATH | EXPECTED + Path.of('/path/to/outDir') | Path.of('/another/path/') | "relative" + Path.of('/path/to/outDir') | Path.of('../relative') | "relative" + + + } + + def 'should save workflow output' (){ + given: + def folder = Files.createTempDirectory('test') + def config = [workflow:[data:[store:[location:folder.toString()]]]] + def store = new DefaultCidStore(); + def outputDir = folder.resolve('results') + def uniqueId = UUID.randomUUID() + def scriptFile = folder.resolve("main.nf") + def workDir= folder.resolve("work") + def 
metadata = Mock(WorkflowMetadata){ + getRepository() >> "https://nextflow.io/nf-test/" + getCommitId() >> "123456" + getScriptId() >> "78910" + getScriptFile() >> scriptFile + getProjectDir() >> folder.resolve("projectDir") + getWorkDir() >> workDir + } + def session = Mock(Session) { + getConfig()>>config + getCidStore()>>store + getOutputDir()>>outputDir + getWorkDir() >> workDir + getWorkflowMetadata()>>metadata + getUniqueId()>>uniqueId + getRunName()>>"test_run" + getParams() >> new ScriptBinding.ParamsMap() + } + store.open(DataConfig.create(session)) + def observer = new CidObserver(session) + + when: 'Starting workflow' + observer.onFlowCreate(session) + observer.onFlowBegin() + then: 'History file should contain execution hash' + def cid = store.getHistoryFile().getRunCid(uniqueId).substring(CID_PROT.size()) + cid == observer.executionHash + + when: ' publish output with source file' + def outFile1 = outputDir.resolve('foo/file.bam') + Files.createDirectories(outFile1.parent) + outFile1.text = 'some data1' + def sourceFile1 = workDir.resolve('12/3987/file.bam') + Files.createDirectories(sourceFile1.parent) + sourceFile1.text = 'some data1' + observer.onFilePublish(outFile1, sourceFile1) + then: 'check file 1 output metadata in cid store' + def attrs1 = Files.readAttributes(outFile1, BasicFileAttributes) + def fileHash1 = CacheHelper.hasher(outFile1).hash().toString() + def expectedString1 = '{"type":"WorkflowOutput",' + + '"path":"' + outFile1.toString() + '",' + + '"checksum":"'+ fileHash1 + '",' + + '"source":"cid://123987/file.bam",' + + '"size":'+attrs1.size() + ',' + + '"createdAt":' + attrs1.creationTime().toMillis() + ',' + + '"modifiedAt":'+ attrs1.lastModifiedTime().toMillis() + ',' + + '"annotations":null}' + folder.resolve(".meta/${observer.executionHash}/foo/file.bam/.data.json").text == JsonOutput.prettyPrint(expectedString1) + + when: 'publish without source path' + def outFile2 = outputDir.resolve('foo/file2.bam') + 
Files.createDirectories(outFile2.parent) + outFile2.text = 'some data2' + def attrs2 = Files.readAttributes(outFile2, BasicFileAttributes) + def fileHash2 = CacheHelper.hasher(outFile2).hash().toString() + observer.onFilePublish(outFile2) + then: 'Check outFile2 metadata in cid store' + def expectedString2 = '{"type":"WorkflowOutput",' + + '"path":"' + outFile2.toString() + '",' + + '"checksum":"'+ fileHash2 + '",' + + '"source":"cid://' + observer.executionHash +'",' + + '"size":'+attrs2.size() + ',' + + '"createdAt":' + attrs2.creationTime().toMillis() + ',' + + '"modifiedAt":'+ attrs2.lastModifiedTime().toMillis() + ',' + + '"annotations":null}' + folder.resolve(".meta/${observer.executionHash}/foo/file2.bam/.data.json").text == JsonOutput.prettyPrint(expectedString2) + + when: 'Workflow complete' + observer.onFlowComplete() + then: 'Check history file is updated and Workflow Result is written in the cid store' + def expectedString3 = '{"type":"WorkflowResults",' + + '"run":"cid://' + observer.executionHash +'",' + + '"outputs": [ "cid://'+ observer.executionHash + '/foo/file.bam",' + + '"cid://'+ observer.executionHash + '/foo/file2.bam" ]}' + def finalCid = store.getHistoryFile().getRunCid(uniqueId).substring(CID_PROT.size()) + finalCid != observer.executionHash + folder.resolve(".meta/${finalCid}/.data.json").text == JsonOutput.prettyPrint(expectedString3) + + cleanup: + folder?.deleteDir() + } + } diff --git a/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CidFileSystemProviderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CidFileSystemProviderTest.groovy new file mode 100644 index 0000000000..72979f580b --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CidFileSystemProviderTest.groovy @@ -0,0 +1,372 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.data.cid.fs + +import spock.lang.Shared + +import java.nio.ByteBuffer +import java.nio.file.FileSystemNotFoundException +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.ProviderMismatchException +import java.nio.file.StandardOpenOption +import java.nio.file.attribute.BasicFileAttributes + +import nextflow.Global +import nextflow.Session +import spock.lang.Specification + +/** + * CID File system provider tests + * @author Jorge Ejarque + */ +class CidFileSystemProviderTest extends Specification { + + @Shared def wdir = Files.createTempDirectory('wdir') + @Shared def meta = wdir.resolve('.meta') + @Shared def data = wdir.resolve('work') + + def setupSpec(){ + meta.mkdirs() + data.mkdirs() + } + + def cleanupSpec(){ + wdir.deleteDir() + } + + def 'should return cid scheme' () { + given: + def provider = new CidFileSystemProvider() + expect: + provider.getScheme() == 'cid' + } + + def 'should get cid path' () { + given: + def cid = Mock(CidPath) + and: + def provider = new CidFileSystemProvider() + expect: + provider.toCidPath(cid) == cid + + when: + provider.toCidPath(Path.of('foo')) + then: + thrown(ProviderMismatchException) + } + + def 'should create new file system' () { + given: + def provider = new CidFileSystemProvider() + def config = [store:[location:'/data']] + def cid = CidPath.asUri('cid://12345') + when: + def fs = provider.newFileSystem(cid, config) as CidFileSystem + then: + fs.basePath == Path.of('/data/.meta') + } + + def 'should get a file system' () { + given: + def provider 
= new CidFileSystemProvider() + def config = [store:[location:'/data']] + def uri = CidPath.asUri('cid://12345') + when: + provider.getFileSystem(uri) + then: + thrown(FileSystemNotFoundException) + + when: + provider.newFileSystem(uri, config) as CidFileSystem + and: + def result = provider.getFileSystem(uri) as CidFileSystem + then: + result.basePath == Path.of('/data/.meta') + } + + def 'should get or create a file system' () { + given: + def config = [workflow:[data:[store:[location:'/this/that']]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def uri = CidPath.asUri('cid://12345') + def provider = new CidFileSystemProvider() + + when: + def fs = provider.getFileSystemOrCreate(uri) as CidFileSystem + then: + fs.basePath == Path.of('/this/that/.meta') + + when: + def fs2 = provider.getFileSystemOrCreate(uri) as CidFileSystem + then: + fs2.is(fs) + } + + def 'should get a path' () { + given: + def config = [workflow:[data:[store:[location:'/data']]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def uri1 = CidPath.asUri('cid://12345') + def uri2 = CidPath.asUri('cid://12345/foo/bar') + + when: + def cid1 = provider.getPath(uri1) + then: + cid1.getTargetPath() == Path.of('/data/.meta/12345') + + when: + def cid2 = provider.getPath(uri2) + then: + cid2.getTargetPath() == Path.of('/data/.meta/12345/foo/bar') + } + + def 'should create new byte channel' () { + given: + def config = [workflow:[data:[store:[location:wdir.toString()]]]] + def outputMeta = meta.resolve("12345/output.txt") + def output = data.resolve("output.txt") + output.text = "Hello, World!" 
+ outputMeta.mkdirs() + outputMeta.resolve(".data.json").text = '{"type":"WorkflowOutput","path":"'+output.toString()+'"}' + + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid = provider.getPath(CidPath.asUri('cid://12345/output.txt')) + def opts = Set.of(StandardOpenOption.READ) + when: + def channel = provider.newByteChannel(cid, opts) + and: + def buffer = ByteBuffer.allocate(1000); + def read = channel.read(buffer) + channel.close() + def bytes = new byte[read] + buffer.get(0,bytes) + then: + bytes == "Hello, World!".getBytes() + + cleanup: + outputMeta.deleteDir() + output.delete() + } + + def 'should read cid' () { + given: + def config = [workflow:[data:[store:[location:wdir.toString()]]]] + def outputMeta = meta.resolve("12345/output.txt") + def output = data.resolve("output.txt") + output.text = "Hello, World!" + outputMeta.mkdirs() + outputMeta.resolve(".data.json").text = '{"type":"WorkflowOutput","path":"'+output.toString()+'"}' + + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid = provider.getPath(CidPath.asUri('cid://12345/output.txt')) + def opts = Set.of(StandardOpenOption.READ) + + expect: + cid.text == "Hello, World!" 
+ + cleanup: + outputMeta.deleteDir() + output.delete() + } + + def 'should not create a directory' () { + given: + def config = [workflow:[data:[store:[location:'test']]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid = provider.getPath(CidPath.asUri('cid://12345')) + + when: + provider.createDirectory(cid) + then: + thrown(UnsupportedOperationException) + + } + + def 'should create directory stream' () { + given: + def output1 = data.resolve('path') + output1.mkdir() + output1.resolve('file1.txt').text = 'file1' + output1.resolve('file2.txt').text = 'file2' + output1.resolve('file3.txt').text = 'file3' + meta.resolve('12345/output1').mkdirs() + meta.resolve('12345/output2').mkdirs() + meta.resolve('12345/.data.json').text = '{"type":"TaskRun"}' + meta.resolve('12345/output1/.data.json').text = '{"type":"TaskOutput", "path": "' + output1.toString() + '"}' + + and: + def config = [workflow:[data:[store:[location:wdir.toString()]]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid = provider.getPath(CidPath.asUri('cid://12345/output1')) + def cid2 = provider.getPath(CidPath.asUri('cid://12345')) + + expect: + Files.exists(cid) + Files.exists(cid.resolve('file1.txt')) + Files.exists(cid.resolve('file2.txt')) + Files.exists(cid.resolve('file3.txt')) + + when: + def stream = provider.newDirectoryStream(cid2, (p) -> true) + and: + def result = stream.toList() + then: + result.toSet() == [ + cid2.resolve('output1'), + cid2.resolve('output2'), + ] as Set + + when: + def stream2 = provider.newDirectoryStream(cid, (p) -> true) + and: + def result2 = stream2.toList() + then: + result2.toSet() == [ + cid.resolve('file1.txt'), + cid.resolve('file2.txt'), + cid.resolve('file3.txt') + ] as Set + + } + + def 'should not delete a file' () { + given: + def config = [workflow:[data:[store:[location:'test']]]] + Global.session = Mock(Session) 
{ getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid = provider.getPath(CidPath.asUri('cid://12345')) + + when: + provider.delete(cid) + then: + thrown(UnsupportedOperationException) + + } + + def 'should not copy a file' () { + given: + def config = [workflow:[data:[store:[location:'test']]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid1 = provider.getPath(CidPath.asUri('cid://12345/abc')) + def cid2 = provider.getPath(CidPath.asUri('cid://54321/foo')) + + when: + provider.copy(cid1, cid2) + then: + thrown(UnsupportedOperationException) + } + + def 'should not move a file' () { + given: + def config = [workflow:[data:[store:[location:'test']]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid1 = provider.getPath(CidPath.asUri('cid://12345/abc')) + def cid2 = provider.getPath(CidPath.asUri('cid://54321/foo')) + + when: + provider.move(cid1, cid2) + then: + thrown(UnsupportedOperationException) + } + + def 'should check is same file' () { + given: + def folder = Files.createTempDirectory('test') + def config = [workflow:[data:[store:[location:folder.toString()]]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid1 = provider.getPath(CidPath.asUri('cid://12345/abc')) + def cid2 = provider.getPath(CidPath.asUri('cid://54321/foo')) + def cid3 = provider.getPath(CidPath.asUri('cid://54321/foo')) + + expect: + !provider.isSameFile(cid1, cid2) + !provider.isSameFile(cid1, cid3) + and: + provider.isSameFile(cid2, cid3) + + cleanup: + folder?.deleteDir() + } + + def 'should check is hidden file' () { + given: + def folder = Files.createTempDirectory('test') + def config = [workflow:[data:[store:[location:folder.toString()]]]] + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() 
+ def cid1 = provider.getPath(CidPath.asUri('cid://12345/abc')) + def cid2 = provider.getPath(CidPath.asUri('cid://54321/.foo')) + + expect: + !provider.isHidden(cid1) + provider.isHidden(cid2) + + cleanup: + folder?.deleteDir() + } + + def 'should read file attributes' () { + given: + def config = [workflow:[data:[store:[location:wdir.toString()]]]] + def file = data.resolve('abc') + file.text = 'Hello' + meta.resolve('12345/abc').mkdirs() + meta.resolve('12345/abc/.data.json').text = '{"type":"TaskOutput", "path": "' + file.toString() + '"}' + Global.session = Mock(Session) { getConfig()>>config } + and: + def provider = new CidFileSystemProvider() + def cid1 = provider.getPath(CidPath.asUri('cid://12345/abc')) + + when: + def attr1 = provider.readAttributes(cid1, BasicFileAttributes) + def real1= Files.readAttributes(file,BasicFileAttributes) + then: + !attr1.directory + attr1.isRegularFile() + attr1.size() == real1.size() + attr1.creationTime() == real1.creationTime() + attr1.lastModifiedTime() == real1.lastModifiedTime() + attr1.lastAccessTime() == real1.lastAccessTime() + + cleanup: + file?.delete() + meta.resolve('12345').deleteDir() + } + +} + diff --git a/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CidPathTest.groovy b/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CidPathTest.groovy new file mode 100644 index 0000000000..fc2592d170 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CidPathTest.groovy @@ -0,0 +1,280 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.data.cid.fs + +import java.nio.file.Files +import java.nio.file.Path + +import spock.lang.Shared +import spock.lang.Specification +import spock.lang.Unroll + +/** + * CID Path Tests + * @author Jorge Ejarque + */ +class CidPathTest extends Specification { + + @Shared def BASE = Path.of('/some/base/data') + @Shared def fs = Mock(CidFileSystem){ getBasePath() >> BASE } + @Shared def wdir = Files.createTempDirectory('wdir') + @Shared def cid = wdir.resolve('.meta') + @Shared def data = wdir.resolve('work') + + def cleanupSpec(){ + wdir.deleteDir() + } + + def 'should create correct cid Path' () { + when: + def cid = new CidPath(FS, PATH, MORE) + then: + cid.storePath == EXPECTED_STORE + cid.filePath == EXPECTED_FILE + where: + FS | PATH | MORE | EXPECTED_STORE | EXPECTED_FILE + fs | '/' | [] as String[] | BASE | '/' + null | '/' | [] as String[] | Path.of('/') | '/' + fs | '/' | ['a','b'] as String[] | BASE.resolve('a/b') | 'a/b' + null | '/' | ['a','b'] as String[] | Path.of('a/b') | 'a/b' + fs | '' | [] as String[] | BASE | '/' + null | '' | [] as String[] | Path.of('/') | '/' + fs | '' | ['a','b'] as String[] | BASE.resolve('a/b') | 'a/b' + null | '' | ['a','b'] as String[] | Path.of('a/b') | 'a/b' + fs | '1234' | [] as String[] | BASE.resolve('1234') | '1234' + null | '1234' | [] as String[] | Path.of('1234') | '1234' + fs | '1234' | ['a','b'] as String[] | BASE.resolve('1234/a/b') | '1234/a/b' + null | '1234' | ['a','b'] as String[] | Path.of('1234/a/b') | '1234/a/b' + fs | '1234/c' | [] as String[] | BASE.resolve('1234/c') | '1234/c' + null | '1234/c' | [] as String[] | Path.of('1234/c') | '1234/c' + fs | '1234/c' | ['a','b'] as String[] | BASE.resolve('1234/c/a/b') | '1234/c/a/b' + null | '1234/c' | ['a','b'] as String[] | Path.of('1234/c/a/b') | '1234/c/a/b' + fs | '/1234/c' | [] as String[] | BASE.resolve('1234/c') | '1234/c' 
+ null | '/1234/c' | [] as String[] | Path.of('1234/c') | '1234/c' + fs | '/1234/c' | ['a','b'] as String[] | BASE.resolve('1234/c/a/b') | '1234/c/a/b' + null | '/1234/c' | ['a','b'] as String[] | Path.of('1234/c/a/b') | '1234/c/a/b' + } + + def 'should get target path' () { + given: + def output1 = data.resolve('output') + output1.resolve('some/path').mkdirs() + output1.resolve('some/path/file1.txt').text = "this is file1" + def output2 = data.resolve('file2.txt') + output2.text = "this is file2" + def cidFs = Mock(CidFileSystem){ getBasePath() >> cid } + cid.resolve('12345/output1').mkdirs() + cid.resolve('12345/path/to/file2.txt').mkdirs() + cid.resolve('12345/.data.json').text = '{"type":"TaskRun"}' + cid.resolve('12345/output1/.data.json').text = '{"type":"TaskOutput", "path": "' + output1.toString() + '"}' + cid.resolve('12345/path/to/file2.txt/.data.json').text = '{"type":"TaskOutput", "path": "' + output2.toString() + '"}' + + expect: + new CidPath(cidFs, PATH).getTargetPath() == EXPECTED + where: + PATH | EXPECTED + '/' | cid + '12345' | cid.resolve('12345') + '12345/output1' | data.resolve('output') + '12345/output1/some/path' | data.resolve('output/some/path') + '12345/path/to/' | cid.resolve('12345/path/to/') + '12345/path/to/file2.txt/' | data.resolve('file2.txt') + } + + def 'should get file name' () { + when: + def cid1 = new CidPath(fs, '1234567890/this/file.bam') + then: + cid1.getFileName() == new CidPath(null, 'file.bam') + } + + def 'should get file parent' () { + when: + def cid1 = new CidPath(fs, '1234567890/this/file.bam') + then: + cid1.getParent() == new CidPath(fs, '1234567890/this') + cid1.getParent().getParent() == new CidPath(fs, '1234567890') + cid1.getParent().getParent().getParent() == new CidPath(fs, "/") + cid1.getParent().getParent().getParent().getParent() == null + } + + @Unroll + def 'should get name count' () { + expect: + new CidPath(fs, PATH).getNameCount() == EXPECTED + where: + PATH | EXPECTED + '/' | 0 + '123' | 1 + 
'123/a' | 2 + '123/a/' | 2 + '123/a/b' | 3 + '' | 0 + } + + @Unroll + def 'should get name by index' () { + expect: + new CidPath(fs, PATH).getName(INDEX) == EXPECTED + where: + PATH | INDEX | EXPECTED + '123' | 0 | new CidPath(fs, '123') + '123/a' | 1 | new CidPath(null, 'a') + '123/a/' | 1 | new CidPath(null, 'a') + '123/a/b' | 2 | new CidPath(null, 'b') + } + + @Unroll + def 'should get subpath' () { + expect: + new CidPath(fs, PATH).subpath(BEGIN,END) == EXPECTED + where: + PATH | BEGIN | END | EXPECTED + '123' | 0 | 1 | new CidPath(fs, '123') + '123/a' | 0 | 2 | new CidPath(fs, '123/a') + '123/a/' | 0 | 2 | new CidPath(fs, '123/a') + '123/a' | 1 | 2 | new CidPath(null, 'a') + '123/a/' | 1 | 2 | new CidPath(null, 'a') + '123/a/b' | 2 | 3 | new CidPath(null, 'b') + '123/a/b' | 1 | 3 | new CidPath(null, 'a/b') + } + + def 'should normalize a path' () { + expect: + new CidPath(fs, '123').normalize() == new CidPath(fs, '123') + new CidPath(fs, '123/a/b').normalize() == new CidPath(fs, '123/a/b') + new CidPath(fs, '123/./a/b').normalize() == new CidPath(fs, '123/a/b') + new CidPath(fs, '123/a/../a/b').normalize() == new CidPath(fs, '123/a/b') + } + + @Unroll + def 'should validate startWith' () { + expect: + new CidPath(fs,PATH).startsWith(OTHER) == EXPECTED + where: + PATH | OTHER | EXPECTED + '12345/a/b' | '12345' | true + '12345/a/b' | '12345/a' | true + '12345/a/b' | '12345/a/b' | true + and: + '12345/a/b' | '12345/b' | false + '12345/a/b' | 'xyz' | false + } + + @Unroll + def 'should validate endsWith' () { + expect: + new CidPath(fs,PATH).endsWith(OTHER) == EXPECTED + where: + PATH | OTHER | EXPECTED + '12345/a/b' | 'b' | true + '12345/a/b' | 'a/b' | true + '12345/a/b' | '12345/a/b' | true + and: + '12345/a/b' | '12345/b' | false + '12345/a/b' | 'xyz' | false + } + + def 'should validate isAbsolute' () { + expect: + new CidPath(fs,'1234/a/b/c').isAbsolute() + new CidPath(fs,'1234/a/b/c').getRoot().isAbsolute() + new 
CidPath(fs,'1234/a/b/c').getParent().isAbsolute() + new CidPath(fs,'1234/a/b/c').normalize().isAbsolute() + new CidPath(fs,'1234/a/b/c').getName(0).isAbsolute() + new CidPath(fs,'1234/a/b/c').subpath(0,2).isAbsolute() + and: + !new CidPath(fs,'1234/a/b/c').getFileName().isAbsolute() + !new CidPath(fs,'1234/a/b/c').getName(1).isAbsolute() + !new CidPath(fs,'1234/a/b/c').subpath(1,3).isAbsolute() + } + + @Unroll + def 'should get root path' () { + expect: + new CidPath(fs,PATH).getRoot() == new CidPath(fs,EXPECTED) + where: + PATH | EXPECTED + '12345' | '/' + '12345/a' | '/' + } + + def 'should resolve path' () { + when: + def cid1 = new CidPath(fs, '123/a/b/c') + def cid2 = new CidPath(fs, '321/x/y/z') + def rel1 = new CidPath(null, 'foo') + def rel2 = new CidPath(null, 'bar/') + + then: + cid1.resolve(cid2) == cid2 + cid2.resolve(cid1) == cid1 + and: + cid1.resolve(rel1) == new CidPath(fs,'123/a/b/c/foo') + cid1.resolve(rel2) == new CidPath(fs,'123/a/b/c/bar') + and: + rel1.resolve(rel2) == new CidPath(null, 'foo/bar') + rel2.resolve(rel1) == new CidPath(null, 'bar/foo') + } + + def 'should resolve path as string' () { + given: + def pr = Mock(CidFileSystemProvider) + def cidfs = Mock(CidFileSystem){ + getBasePath() >> BASE + provider() >> pr} + + + def cid1 = new CidPath(cidfs, '123/a/b/c') + + expect: + cid1.resolve('x/y') == new CidPath(cidfs, '123/a/b/c/x/y') + cid1.resolve('/x/y/') == new CidPath(cidfs, '123/a/b/c/x/y') + + when: + def result = cid1.resolve('cid://321') + then: + pr.getPath(CidPath.asUri('cid://321')) >> new CidPath(cidfs, '321') + and: + result == new CidPath(cidfs, '321') + } + + @Unroll + def 'should get to uri string' () { + expect: + new CidPath(fs, PATH).toUriString() == EXPECTED + where: + PATH | EXPECTED + '/' | 'cid:///' + '1234' | 'cid://1234' + '1234/a/b/c' | 'cid://1234/a/b/c' + '' | 'cid:///' + } + + @Unroll + def 'should get string' () { + expect: + new CidPath(fs, PATH).toString() == EXPECTED + where: + PATH | EXPECTED + '/' | 
'/' + '1234' | '1234' + '1234/a/b/c' | '1234/a/b/c' + '' | '/' + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CifPathFactoryTest.groovy b/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CifPathFactoryTest.groovy new file mode 100644 index 0000000000..800a60f637 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/data/cid/fs/CifPathFactoryTest.groovy @@ -0,0 +1,88 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.data.cid.fs + +import java.nio.file.Path + +import nextflow.Global +import nextflow.Session +import spock.lang.Specification +import spock.lang.Unroll + +/** + * CID Path Factory tests. 
+ * + * @author Jorge Ejarque + */ +class CifPathFactoryTest extends Specification { + + def setup() { + Global.session = Mock(Session) { getConfig()>> [workflow:[data:[store:[location: '/some/data']]]] } + } + + def cleanup() { + Global.session = null + } + + def 'should create cid path' () { + given: + def factory = new CidPathFactory() + + expect: + factory.parseUri('foo') == null + + when: + def p1 = factory.parseUri('cid://12345') + then: + p1.getTargetPath() == Path.of('/some/data/.meta/12345') + p1.toUriString() == 'cid://12345' + + when: + def p2 = factory.parseUri('cid://12345/x/y/z') + then: + p2.getTargetPath() == Path.of('/some/data/.meta/12345/x/y/z') + p2.toUriString() == 'cid://12345/x/y/z' + + when: + def p3 = factory.parseUri('cid://12345//x///y/z//') + then: + p3.getTargetPath() == Path.of('/some/data/.meta/12345/x/y/z') + p2.toUriString() == 'cid://12345/x/y/z' + + when: + factory.parseUri('cid:///12345') + then: + thrown(IllegalArgumentException) + } + + @Unroll + def 'should convert get cid uri string' () { + given: + def factory = new CidPathFactory() + + when: + def cid = CidPathFactory.create(EXPECTED) + then: + factory.toUriString(cid) == EXPECTED + + where: + _ | EXPECTED + _ | 'cid://123' + _ | 'cid://123/a/b/c' + } +} diff --git a/modules/nf-commons/src/main/nextflow/file/FileHelper.groovy b/modules/nf-commons/src/main/nextflow/file/FileHelper.groovy index f76e95d55b..be885ed40e 100644 --- a/modules/nf-commons/src/main/nextflow/file/FileHelper.groovy +++ b/modules/nf-commons/src/main/nextflow/file/FileHelper.groovy @@ -238,7 +238,7 @@ class FileHelper { return !(path.getFileSystem().provider().scheme in UNSUPPORTED_GLOB_WILDCARDS) } - static Path toCanonicalPath(value) { + static Path toPath(value){ if( value==null ) return null @@ -252,6 +252,14 @@ class FileHelper { else { throw new IllegalArgumentException("Unexpected path value: '$value' [${value.getClass().getName()}]") } + return result + } + + static Path toCanonicalPath(value) { 
+ if( value==null ) + return null + + Path result = toPath(value) if( result.fileSystem != FileSystems.default ) { // remote file paths are expected to be absolute by definition @@ -1163,4 +1171,23 @@ class FileHelper { return null } + public static HashCode getTaskHashFromPath(Path sourcePath, Path workPath) { + assert sourcePath + assert workPath + if (sourcePath.startsWith(workPath)) { + Path relativePath = workPath.relativize(sourcePath) + if (relativePath.getNameCount() >= 2) { + final bucket = relativePath.getName(0).toString() + if (bucket.size() == 2) { + final strHash = bucket + relativePath.getName(1).toString() + try { + return HashCode.fromString(strHash) + } catch (Throwable e) { + log.debug("String '${strHash}' is not a valid hash", e) + } + } + } + } + return null + } } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 442fea9a6a..adbdb7f4c9 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -268,7 +268,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { LifecyclePolicy.newBuilder() .setActionCondition( LifecyclePolicy.ActionCondition.newBuilder() - .addExitCodes(50001) + .addAllExitCodes(executor.config.autoRetryExitCodes) ) .setAction(LifecyclePolicy.Action.RETRY_TASK) ) diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index 37b27e0b5a..be9a6b0bb0 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -146,6 +146,7 @@ class GoogleBatchTaskHandlerTest extends 
Specification { getBootDiskImage() >> BOOT_IMAGE getCpuPlatform() >> CPU_PLATFORM getMaxSpotAttempts() >> 5 + getAutoRetryExitCodes() >> [50001,50002] getSpot() >> true getNetwork() >> 'net-1' getServiceAccountEmail() >> 'foo@bar.baz' @@ -198,7 +199,9 @@ class GoogleBatchTaskHandlerTest extends Specification { taskSpec.getMaxRunDuration().getSeconds() == TIMEOUT.seconds taskSpec.getVolumes(0).getMountPath() == '/tmp' taskSpec.getMaxRetryCount() == 5 + taskSpec.getLifecyclePolicies(0).getActionCondition().getExitCodesCount() == 2 taskSpec.getLifecyclePolicies(0).getActionCondition().getExitCodes(0) == 50001 + taskSpec.getLifecyclePolicies(0).getActionCondition().getExitCodes(1) == 50002 taskSpec.getLifecyclePolicies(0).getAction().toString() == 'RETRY_TASK' and: runnable.getContainer().getCommandsList().join(' ') == '/bin/bash -o pipefail -c bash .command.run'