Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CID blockstore #5787

Merged
merged 19 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/container.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ In the above example replace `/path/to/apptainer.img` with any Apptainer image o
Read the {ref}`config-page` page to learn more about the `nextflow.config` file and how to use it to configure your pipeline execution.

:::{note}
Unlike Docker, Nextflow does not automatically mount host paths in the container when using Apptainer. It expects that the paths are configure and mounted system wide by the Apptainer runtime. If your Apptainer installation allows user defined bind points, read the {ref}`Apptainer configuration <config-apptainer>` section to learn how to enable Nextflow auto mounts.
Unlike Docker, Nextflow does not automatically mount host paths in the container when using Apptainer. It expects that the paths are configured and mounted system wide by the Apptainer runtime. If your Apptainer installation allows user defined bind points, read the {ref}`Apptainer configuration <config-apptainer>` section to learn how to enable Nextflow auto mounts.
:::

:::{warning}
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
org.gradle.caching=true
org.gradle.jvmargs=-Xmx4g
org.gradle.parallel=true
19 changes: 18 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package nextflow

import nextflow.data.cid.CidStore
import nextflow.data.cid.CidStoreFactory
import nextflow.data.cid.DefaultCidStore
import nextflow.data.config.DataConfig

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
Expand Down Expand Up @@ -254,6 +259,14 @@ class Session implements ISession {

private boolean statsEnabled

private volatile boolean cidEnabled

boolean getCidEnabled() { cidEnabled }

private CidStore cidStore

CidStore getCidStore() { cidStore }

private WorkflowMetadata workflowMetadata

private WorkflowStatsObserver statsObserver
Expand Down Expand Up @@ -393,6 +406,11 @@ class Session implements ISession {
// -- file porter config
this.filePorter = new FilePorter(this)

if(config.navigate('workflow.data')) {
this.cidEnabled = true
this.cidStore = CidStoreFactory.create(DataConfig.create(this))
}

}

protected Path cloudCachePath(Map cloudcache, Path workDir) {
Expand Down Expand Up @@ -439,7 +457,6 @@ class Session implements ISession {
binding.setArgs( new ScriptRunner.ArgsList(args) )

cache = CacheFactory.create(uniqueId,runName).open()

return this
}

Expand Down
255 changes: 253 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,24 @@
package nextflow.cli

import com.beust.jcommander.Parameter
import groovy.json.JsonSlurper
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import nextflow.Session
import nextflow.config.ConfigBuilder
import nextflow.dag.MermaidHtmlRenderer
import nextflow.data.cid.CidHistoryFile
import nextflow.data.cid.CidStore
import nextflow.data.cid.model.DataType
import nextflow.exception.AbortOperationException
import nextflow.plugin.Plugins
import nextflow.ui.TableBuilder

import java.nio.file.Path
import java.nio.file.Paths

import static nextflow.data.cid.fs.CidPath.CID_PROT
import static nextflow.data.cid.fs.CidPath.METADATA_FILE

/**
*
Expand All @@ -33,13 +48,17 @@ class CmdCid extends CmdBase {

interface SubCmd {
String getName()
void apply(List<String> result)
void usage(List<String> result)
void apply(List<String> args)
void usage()
}

private List<SubCmd> commands = new ArrayList<>()

CmdCid() {
commands << new CmdLog()
commands << new CmdShow()
commands << new CmdLineage()


}

Expand Down Expand Up @@ -75,4 +94,236 @@ class CmdCid extends CmdBase {
msg += " -- Did you mean one of these?\n" + matches.collect { " $it"}.join('\n')
throw new AbortOperationException(msg)
}

class CmdLog implements SubCmd {

@Override
String getName() {
return 'log'
}

@Override
void apply(List<String> args) {
if (args.size() != 0) {
println("ERROR: Incorrect number of parameters")
usage()
return
}
final config = new ConfigBuilder()
.setOptions(getLauncher().getOptions())
.setBaseDir(Paths.get('.'))
.build()
final session = new Session(config)
printHistory(session.cidStore)

}

private void printHistory(CidStore store) {


final historyFile = store.getHistoryFile()
if (historyFile.exists()) {
def table = new TableBuilder(cellSeparator: '\t')
.head('TIMESTAMP')
.head('RUN NAME')
.head('SESSION ID')
.head('RUN CID')
historyFile.eachLine { table.append(CidHistoryFile.CidRecord.parse(it).toList()) }
println table.toString()
} else {
println("No workflow runs CIDs found.")
}
}

@Override
void usage() {
println 'Usage: nextflow cid log'
}
}
class CmdShow implements SubCmd{

@Override
String getName() {
return 'show'
}

@Override
void apply(List<String> args) {
if (args.size() != 1) {
println("ERROR: Incorrect number of parameters")
usage()
return
}
if (!args[0].startsWith(CID_PROT))
throw new Exception("Identifier is not a CID URL")
final key = args[0].substring(CID_PROT.size()) + "/$METADATA_FILE"
final config = new ConfigBuilder()
.setOptions(getLauncher().getOptions())
.setBaseDir(Paths.get('.'))
.build()
final session = new Session(config)
final store = session.cidStore
try {
println store.load(key).toString()
}catch (Throwable e){
println "Error loading ${args[0]}."
}
}

@Override
void usage() {
println 'Usage: nextflow cid show <cid reference>'
}
}


class CmdLineage implements SubCmd {

@Canonical
class Edge {
String source
String destination
String label
}

@Override
String getName() { 'lineage' }

@Override
void apply(List<String> args) {
if (args.size() != 2) {
println("ERROR: Incorrect number of parameters")
usage()
return
}
try {
final config = new ConfigBuilder()
.setOptions(getLauncher().getOptions())
.setBaseDir(Paths.get('.'))
.build()
final session = new Session(config)
final store = session.cidStore
final template = readTemplate()
final network = getLineage(store, args[0])
Path file = Path.of(args[1])
file.text = template.replace('REPLACE_WITH_NETWORK_DATA', network)
println("Linage graph for ${args[0]} rendered in ${args[1]}")
} catch (Throwable e) {
println("ERROR: rendering lineage graph. ${e.message}")
}
}

private String getLineage(CidStore store, String dataCid) {
def lines = [] as List<String>
lines << "flowchart BT".toString()

final nodesToRender = new LinkedList<String>()
nodesToRender.add(dataCid)
final edgesToRender = new LinkedList<Edge>()
while (!nodesToRender.isEmpty()) {
final node = nodesToRender.removeFirst()
processNode(lines, node, nodesToRender, edgesToRender, store)
}
lines << ""
edgesToRender.each { lines << " ${it.source} -->${it.destination}".toString() }
lines << ""
return lines.join('\n')
}

private void processNode(List<String> lines, String nodeToRender, LinkedList<String> nodes, LinkedList<Edge> edges, CidStore store) {
if (!nodeToRender.startsWith(CID_PROT))
throw new Exception("Identifier is not a CID URL")
final slurper = new JsonSlurper()
final key = nodeToRender.substring(CID_PROT.size()) + "/$METADATA_FILE"
final cidObject = slurper.parse(store.load(key).toString().toCharArray()) as Map
switch (DataType.valueOf(cidObject.type as String)) {
case DataType.TaskOutput:
case DataType.WorkflowOutput:
lines << " ${nodeToRender}@{shape: document, label: \"${nodeToRender}\"}".toString();
final source = cidObject.source as String
if (source) {
if (source.startsWith(CID_PROT)) {
nodes.add(source)
edges.add(new Edge(source, nodeToRender))
} else {
final label = convertToLabel(source)
lines << " ${source}@{shape: document, label: \"${label}\"}".toString();
edges.add(new Edge(source, nodeToRender))
}
}

break;
case DataType.WorkflowRun:
lines << "${nodeToRender}@{shape: processes, label: \"${cidObject.runName}\"}".toString()
final parameters = cidObject.params as List<nextflow.data.cid.model.Parameter>
parameters.each {
final label = convertToLabel(it.value.toString())
lines << " ${it.value.toString()}@{shape: document, label: \"${label}\"}".toString();
edges.add(new Edge(it.value.toString(), nodeToRender))
}
break;
case DataType.TaskRun:
lines << " ${nodeToRender}@{shape: process, label: \"${cidObject.name}\"}".toString()
final parameters = cidObject.inputs as List<nextflow.data.cid.model.Parameter>
for (nextflow.data.cid.model.Parameter source: parameters){
if (source.type.equals(nextflow.script.params.FileInParam.simpleName)) {
manageFileInParam(lines, nodeToRender, nodes, edges, source.value)
} else {
final label = convertToLabel(source.value.toString())
lines << " ${source.value.toString()}@{shape: document, label: \"${label}\"}".toString();
edges.add(new Edge(source.value.toString(), nodeToRender))
}
}
break;
default:
throw new Exception("Unrecognized type reference ${cidObject.type}")
}
}

private String convertToLabel(String label){
return label.replace('http', 'h\u200Ettp')
}

private void manageFileInParam(List<String> lines, String nodeToRender, LinkedList<String> nodes, LinkedList<Edge> edges, value){
if (value instanceof Collection) {
value.each { manageFileInParam(lines, nodeToRender, nodes, edges, it) }
return
}
if (value instanceof CharSequence) {
final source = value.toString()
if (source.startsWith(CID_PROT)) {
nodes.add(source)
edges.add(new Edge(source, nodeToRender))
return
}
}
if (value instanceof Map) {
if (value.path) {
final label = convertToLabel(value.path.toString())
lines << " ${value.path}@{shape: document, label: \"${label}\"}".toString();
edges.add(new Edge(value.path.toString(), nodeToRender))
return
}
}
final label = convertToLabel(value.toString())
lines << " ${value.toString()}@{shape: document, label: \"${label}\"}".toString();
edges.add(new Edge(value.toString(), nodeToRender))
}

protected static String readTemplate() {
final writer = new StringWriter()
final res = MermaidHtmlRenderer.class.getResourceAsStream('mermaid.dag.template.html')
int ch
while( (ch=res.read()) != -1 ) {
writer.append(ch as char)
}
writer.toString()
}

@Override
void usage() {
println 'Usage: nextflow cid lineage <cid workflow output > <html output file>'
}

}
}
Loading
Loading