Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CID blockstore #5787

Merged
merged 19 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package nextflow

import nextflow.data.cid.CidStore
import nextflow.data.cid.DefaultCidStore
import nextflow.data.config.DataConfig
import nextflow.util.CacheHelper

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
Expand Down Expand Up @@ -254,6 +259,18 @@ class Session implements ISession {

private boolean statsEnabled

/** Turned on during session setup when the `cid` config scope is present */
private volatile boolean cidEnabled

/** Hash identifying the current execution; assigned by `init` only when the CID store is enabled */
private HashCode executionHash

/** CID store backing this session; left unset when the CID feature is not enabled */
private CidStore cidStore

/**
 * @return {@code true} when the CID store feature is enabled for this session
 */
boolean getCidEnabled() {
    return cidEnabled
}

/**
 * @return the CID store opened for this session, or {@code null} when none was created
 */
CidStore getCidStore() {
    return cidStore
}

/**
 * @return the execution hash rendered as a string, or {@code null} when it has not been computed
 */
String getExecutionHash() {
    return executionHash?.toString()
}

private WorkflowMetadata workflowMetadata

private WorkflowStatsObserver statsObserver
Expand Down Expand Up @@ -393,6 +410,12 @@ class Session implements ISession {
// -- file porter config
this.filePorter = new FilePorter(this)

if (config.cid) {
this.cidEnabled = true
this.cidStore = new DefaultCidStore()
this.cidStore.open(DataConfig.create(this))
}

}

protected Path cloudCachePath(Map cloudcache, Path workDir) {
Expand All @@ -405,12 +428,32 @@ class Session implements ISession {
}
return result
}
/**
 * Compute the hash identifying this execution.
 *
 * The hash combines, in this order: the script hash, the script repository,
 * the script commit id, the session unique id and the configured params.
 *
 * @param scriptFile the script being run; may be {@code null}
 * @return the resulting hash code
 */
private HashCode generateExecutionHash(ScriptFile scriptFile){
    final scriptHash = generateScriptHash(scriptFile).toString()
    final List keys = []
    keys.add(scriptHash)
    keys.add(scriptFile?.repository)
    keys.add(scriptFile?.commitId)
    keys.add(uniqueId)
    keys.add((Map)config.params)
    return CacheHelper.hasher(keys).hash()
}

/**
 * Compute a hash over the script id and the text of every script
 * reported by {@code ScriptMeta.allScriptNames()}.
 *
 * @param scriptFile the main script; may be {@code null}
 * @return the resulting hash code
 */
private HashCode generateScriptHash(ScriptFile scriptFile){
    final List keys = new ArrayList()
    keys.add(scriptFile?.scriptId)
    // each script contributes the hash of its full text, in the order returned by the registry
    ScriptMeta.allScriptNames().values().each { Path script ->
        keys.add(CacheHelper.hasher(script.text).hash().toString())
    }
    return CacheHelper.hasher(keys).hash()
}

/**
* Initialize the session workDir, libDir, baseDir and scriptName variables
*/
Session init( ScriptFile scriptFile, List<String> args=null ) {

if(cidEnabled) {
this.executionHash = generateExecutionHash(scriptFile)
this.outputDir = cidStore.getPath().resolve(executionHash.toString())
log.warn("CID store enabled. Defined output directory will be ignored and set to ${outputDir}.")
if( !HistoryFile.disabled() && HistoryFile.DEFAULT.exists() ) {
HistoryFile.DEFAULT.updateCidHash(runName,executionHash.toString())
}
}
if(!workDir.mkdirs()) throw new AbortOperationException("Cannot create work-dir: $workDir -- Make sure you have write permissions or specify a different directory by using the `-w` command line option")
log.debug "Work-dir: ${workDir.toUriString()} [${FileHelper.getPathFsType(workDir)}]"

Expand Down Expand Up @@ -439,7 +482,6 @@ class Session implements ISession {
binding.setArgs( new ScriptRunner.ArgsList(args) )

cache = CacheFactory.create(uniqueId,runName).open()

return this
}

Expand Down
167 changes: 165 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,24 @@
package nextflow.cli

import com.beust.jcommander.Parameter
import groovy.json.JsonSlurper
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import nextflow.Session
import nextflow.config.ConfigBuilder
import nextflow.dag.MermaidHtmlRenderer
import nextflow.data.cid.CidStore
import nextflow.data.cid.DefaultCidStore
import nextflow.data.cid.model.DataType
import nextflow.data.config.DataConfig
import nextflow.exception.AbortOperationException
import nextflow.plugin.Plugins

import java.nio.file.Path
import java.nio.file.Paths

import static nextflow.data.cid.CidObserver.*

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Expand All @@ -33,13 +47,15 @@ class CmdCid extends CmdBase {

/**
 * Contract implemented by each `nextflow cid` sub-command.
 */
interface SubCmd {
    /** @return the sub-command name, e.g. {@code show} or {@code lineage} */
    String getName()

    /**
     * Execute the sub-command.
     *
     * @param args the arguments supplied to the sub-command
     */
    void apply(List<String> args)

    /** Print the usage message of the sub-command. */
    void usage()
}

private List<SubCmd> commands = new ArrayList<>()

/**
 * Register the available sub-commands: {@code show} and {@code lineage}.
 */
CmdCid() {
    commands.add(new CmdShow())
    commands.add(new CmdLineage())
}

Expand Down Expand Up @@ -75,4 +91,151 @@ class CmdCid extends CmdBase {
msg += " -- Did you mean one of these?\n" + matches.collect { " $it"}.join('\n')
throw new AbortOperationException(msg)
}

/**
 * Sub-command printing the metadata stored for a CID reference.
 */
class CmdShow implements SubCmd{

    @Override
    String getName() {
        return 'show'
    }

    /**
     * Load the metadata file of the given CID reference and print it.
     *
     * @param args exactly one element: the CID reference to show
     */
    @Override
    void apply(List<String> args) {
        if (args.size() != 1) {
            println("ERROR: Incorrect number of parameters")
            usage()
            return
        }
        final config = new ConfigBuilder()
                .setOptions(getLauncher().getOptions())
                .setBaseDir(Paths.get('.'))
                .build()
        final session = new Session(config)
        final store = session.cidStore
        // the session only creates a store when the `cid` config scope is present
        if (store == null) {
            println("ERROR: CID store not available. Make sure the `cid` scope is defined in the Nextflow configuration")
            return
        }
        println store.load("${args[0]}/$METADATA_FILE").toString()
    }

    @Override
    void usage() {
        println 'Usage: nextflow cid show <cid reference>'
    }
}


/**
 * Sub-command rendering the lineage of a CID object as a Mermaid flowchart
 * embedded in an HTML file.
 */
class CmdLineage implements SubCmd {

    /**
     * Directed edge of the lineage graph. Only {@code source} and
     * {@code destination} are populated by the code in this class.
     */
    @Canonical
    class Edge {
        String source
        String destination
        String label
    }

    @Override
    String getName() { 'lineage' }

    /**
     * Render the lineage graph of a CID object into an HTML file.
     *
     * @param args two elements: the CID reference and the path of the HTML file to write
     */
    @Override
    void apply(List<String> args) {
        if (args.size() != 2) {
            println("ERROR: Incorrect number of parameters")
            usage()
            return
        }
        try {
            final config = new ConfigBuilder()
                    .setOptions(getLauncher().getOptions())
                    .setBaseDir(Paths.get('.'))
                    .build()
            final session = new Session(config)
            final store = session.cidStore
            // the session only creates a store when the `cid` config scope is present
            if (store == null) {
                println("ERROR: CID store not available. Make sure the `cid` scope is defined in the Nextflow configuration")
                return
            }
            final template = readTemplate()
            final network = getLineage(store, args[0])
            Path file = Path.of(args[1])
            file.text = template.replace('REPLACE_WITH_NETWORK_DATA', network)
            println("Lineage graph for ${args[0]} rendered in ${args[1]}")
        } catch (Throwable e) {
            println("ERROR: rendering lineage graph. ${e.getLocalizedMessage()}")
        }
    }

    /**
     * Build the Mermaid flowchart source by walking the lineage breadth-first,
     * starting from the given CID.
     *
     * @param store the CID store the metadata is loaded from
     * @param dataCid the CID reference the traversal starts from
     * @return the Mermaid flowchart definition
     */
    private String getLineage(CidStore store, String dataCid) {
        def lines = [] as List<String>
        lines << "flowchart BT".toString()

        final nodesToRender = new LinkedList<String>()
        nodesToRender.add(dataCid)
        final edgesToRender = new LinkedList<Edge>()
        // remember processed nodes: a node reachable through several paths is rendered
        // once, and a cyclic reference cannot keep the loop running forever
        final visited = new HashSet<String>()
        while (!nodesToRender.isEmpty()) {
            final node = nodesToRender.removeFirst()
            if (!visited.add(node))
                continue
            processNode(lines, node, nodesToRender, edgesToRender, store)
        }
        lines << ""
        edgesToRender.each { lines << " ${it.source} -->${it.destination}".toString() }
        lines << ""
        return lines.join('\n')
    }

    /**
     * Load the metadata of one node, append its Mermaid declaration to {@code lines},
     * and register the edges and the upstream CID nodes still to be visited.
     *
     * @param lines the Mermaid lines collected so far
     * @param nodeToRender the CID reference of the node to process
     * @param nodes the queue of CID references still to be processed
     * @param edges the edges collected so far
     * @param store the CID store the metadata is loaded from
     */
    private void processNode(List<String> lines, String nodeToRender, LinkedList<String> nodes, LinkedList<Edge> edges, CidStore store) {
        final slurper = new JsonSlurper()
        final cidObject = slurper.parse(store.load("$nodeToRender/$METADATA_FILE").toString().toCharArray()) as Map
        switch (DataType.valueOf(cidObject.type as String)) {
            case DataType.TaskOutput:
            case DataType.WorkflowOutput:
                lines << " ${nodeToRender}@{shape: document, label: \"${nodeToRender}\"}".toString()
                final source = cidObject.source as String
                if (source) {
                    linkSource(lines, source, nodeToRender, nodes, edges)
                }
                break
            case DataType.WorkflowRun:
                lines << "${nodeToRender}@{shape: processes, label: \"${cidObject.runName}\"}".toString()
                final parameters = cidObject.params as Map
                // a run recorded without params has nothing to link
                parameters?.values()?.each {
                    lines << " ${it}@{shape: document, label: \"${it}\"}".toString()
                    edges.add(new Edge(it.toString(), nodeToRender))
                }
                break
            case DataType.TaskRun:
                lines << " ${nodeToRender}@{shape: process, label: \"${cidObject.name}\"}".toString()
                final inputs = cidObject.inputs as List<String>
                // a task recorded without inputs has nothing to link
                if (inputs) {
                    for (String source : inputs) {
                        linkSource(lines, source, nodeToRender, nodes, edges)
                    }
                }
                break
            default:
                throw new Exception("Unrecognized type reference ${cidObject.type}")
        }
    }

    /**
     * Connect {@code source} to {@code target}. A source using the CID protocol
     * prefix is queued for traversal; any other source is rendered as a document node.
     */
    private void linkSource(List<String> lines, String source, String target, LinkedList<String> nodes, LinkedList<Edge> edges) {
        if (source.startsWith(CID_PROT)) {
            final cid = source.substring(CID_PROT.size())
            nodes.add(cid)
            edges.add(new Edge(cid, target))
        } else {
            lines << " ${source}@{shape: document, label: \"${source}\"}".toString()
            edges.add(new Edge(source, target))
        }
    }

    /**
     * Read the Mermaid HTML template bundled next to {@code MermaidHtmlRenderer}.
     *
     * @return the template content
     * @throws IllegalStateException when the template resource cannot be found
     */
    private String readTemplate() {
        final res = MermaidHtmlRenderer.class.getResourceAsStream('mermaid.dag.template.html')
        if (res == null)
            throw new IllegalStateException("Missing lineage template resource: mermaid.dag.template.html")
        // getText decodes the whole stream and closes it before returning
        // NOTE(review): assumes the template is UTF-8 encoded -- confirm
        return res.getText('UTF-8')
    }

    @Override
    void usage() {
        println 'Usage: nextflow cid lineage <cid workflow output > <html output file>'
    }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class CmdLog extends CmdBase implements CacheBase {
.head('STATUS')
.head('REVISION ID')
.head('SESSION ID')
.head('CID HASH')
.head('COMMAND')

history.eachRow { List<String> row ->
Expand Down
Loading