Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/developer/nextflow.dag.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ Some classes may be excluded from the above diagram for brevity.
The workflow DAG defines the network of processes, channels, and operators that comprise a workflow. It is produced by the execution of the Nextflow script. See [nextflow.script](nextflow.script.md) for more details.

Implementations of the `DagRenderer` interface define how to render the workflow DAG to a particular diagram format. See {ref}`workflow-diagram` for more details.

Separately, `DagSerializer` produces a faithful machine-readable map of the DAG (every vertex and edge, with no reduction) for transport to external consumers such as Seqera Platform, as opposed to the file-oriented `DagRenderer` implementations.
116 changes: 116 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/dag/DagSerializer.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.dag

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j

/**
* Serialize a {@link DAG} into a plain {@code Map} that mirrors the DAG model
* one-to-one — every vertex and every edge, with no filtering, reduction or
* reinterpretation.
*
* Unlike the file-oriented renderers in this package ({@link DotRenderer},
* {@link GexfRenderer}, {@link MermaidRenderer}, ...) which emit a layout-ready
* document, this serializer produces a faithful machine-readable dump intended
* for transport to external consumers (e.g. Seqera Platform). Any interpretation
* — collapsing operator/channel vertices to a process-only graph, building a
* subworkflow hierarchy, choosing a layout — is left to the consumer, so it can
* evolve without a Nextflow release.
*
* Output shape:
* <pre>
* {
* vertices: [ { id, type, label, scope } ... ],
* edges: [ { source, target, label } ... ]
* }
* </pre>
*
* Notes:
* <ul>
* <li>{@code id} is the payload-scoped per-vertex id ({@link DAG.Vertex#getId()})
* used to join edges to vertices within this payload. It comes from a static
* JVM-global counter, so it is <em>not</em> deterministic across runs and will
* not match between a run and its {@code -resume} — treat it as identity only
* within one serialized payload, never as cross-run identity. It is preferred
* over the positional {@code name} ({@code v0}, {@code v1}, ...) because
* {@link DAG#normalize()} inserts boundary vertices and shifts positions.</li>
* <li>for {@code PROCESS} vertices {@code label} carries the fully-qualified
* process name with the scope prefix included (e.g. {@code RNASEQ:ALIGN:STAR}),
* while {@code scope} carries the enclosing (sub)workflow name — so the FQ
* name remains reconstructable without guessing.</li>
* <li>every vertex type ({@code PROCESS}, {@code OPERATOR}, {@code ORIGIN},
* {@code NODE}) and every edge is emitted as-is; operator/channel vertices
* between processes are preserved, never dropped.</li>
* </ul>
*
* @author Phil Ewels <phil.ewels@seqera.io>
*/
@Slf4j
@CompileStatic
class DagSerializer {

/**
* Convert the given DAG into a faithful {@code Map} representation.
*
* The DAG is {@link DAG#normalize() normalized} first so that dangling edges
* get their boundary ({@code ORIGIN}/{@code NODE}) vertices and channel-derived
* edge labels are resolved — matching the behaviour of the file renderers.
*
* <p><b>Side effect:</b> this mutates {@code dag} in place via
* {@link DAG#normalize()}. {@code normalize()} is idempotent, so calling this
* alongside other normalizers (e.g. {@code GraphObserver} at completion) is safe.
*
* @param dag The DAG to serialize, or {@code null}
* @return A map with {@code vertices} and {@code edges} lists, or {@code null}
* when the given DAG is {@code null}
*/
static Map<String,Object> toMap(DAG dag) {
if( dag == null )
return null

// resolve dangling edge endpoints and channel-derived edge labels (mutates dag)
dag.normalize()

final vertices = new ArrayList<Map<String,Object>>(dag.vertices.size())
for( DAG.Vertex v : dag.vertices ) {
final entry = new LinkedHashMap<String,Object>(4)
entry.id = String.valueOf(v.id)
entry.type = v.type?.toString()
entry.label = v.label
// createVertex defaults `workflow` to "" — coerce the empty-string root
// scope back to null so consumers see a clean "no enclosing subworkflow"
entry.scope = v.workflow ?: null
vertices.add(entry)
}

final edges = new ArrayList<Map<String,Object>>(dag.edges.size())
for( DAG.Edge e : dag.edges ) {
final entry = new LinkedHashMap<String,Object>(3)
entry.source = e.from != null ? String.valueOf(e.from.id) : null
entry.target = e.to != null ? String.valueOf(e.to.id) : null
entry.label = e.label
edges.add(entry)
}

final result = new LinkedHashMap<String,Object>(2)
result.vertices = vertices
result.edges = edges
return result
}

}
135 changes: 135 additions & 0 deletions modules/nextflow/src/test/groovy/nextflow/dag/DagSerializerTest.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2013-2026, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.dag

import groovyx.gpars.dataflow.DataflowChannel
import nextflow.Session
import spock.lang.Specification
/**
* @author Phil Ewels <phil.ewels@seqera.io>
*/
class DagSerializerTest extends Specification {

def setupSpec() {
new Session()
}

def 'should return null for a null dag' () {
expect:
DagSerializer.toMap(null) == null
}

def 'should serialize an empty dag' () {
when:
def result = DagSerializer.toMap(new DAG())
then:
result == [vertices: [], edges: []]
}

def 'should faithfully serialize vertices and edges' () {
given:
def ch1 = Mock(DataflowChannel)
def ch2 = Mock(DataflowChannel)
def ch3 = Mock(DataflowChannel)
and:
def dag = new DAG()
// PROCESS --ch2--> OPERATOR --ch3--> PROCESS
dag.addVertex(
DAG.Type.PROCESS,
'foo',
[ new DAG.ChannelHandler(channel: ch1, label: 'in') ],
[ new DAG.ChannelHandler(channel: ch2, label: 'out') ] )
dag.addVertex(
DAG.Type.OPERATOR,
'map',
[ new DAG.ChannelHandler(channel: ch2) ],
[ new DAG.ChannelHandler(channel: ch3) ] )
dag.addVertex(
DAG.Type.PROCESS,
'bar',
[ new DAG.ChannelHandler(channel: ch3) ],
[] )

when:
def result = DagSerializer.toMap(dag)
def vFoo = result.vertices.find { it.label == 'foo' }
def vMap = result.vertices.find { it.label == 'map' }
def vBar = result.vertices.find { it.label == 'bar' }

then:
// every declared vertex is preserved, including the operator between the two processes
// (normalize() additionally inserts an ORIGIN for the dangling `in` channel)
vFoo.type == 'PROCESS'
vMap.type == 'OPERATOR'
vBar.type == 'PROCESS'
and:
// the operator vertex is not dropped/collapsed
result.vertices.count { it.type == 'OPERATOR' } == 1
and:
// ids are stable strings and unique, edges join via those ids
result.vertices*.id.every { it instanceof String }
result.vertices*.id.unique().size() == result.vertices.size()

and:
// the process -> operator -> process chain is intact
result.edges.find { it.source == vFoo.id && it.target == vMap.id }
result.edges.find { it.source == vMap.id && it.target == vBar.id }
}

def 'should expose fully-qualified process name and enclosing scope' () {
given:
def dag = new DAG()
and:
// a process invoked inside a subworkflow gets a scope-prefixed FQ name as its label,
// while `workflow` holds the enclosing (sub)workflow scope
def v = dag.createVertex(DAG.Type.PROCESS, 'RNASEQ:ALIGN:STAR')
v.workflow = 'RNASEQ:ALIGN'

when:
def result = DagSerializer.toMap(dag)

then:
result.vertices.size() == 1
result.vertices[0].label == 'RNASEQ:ALIGN:STAR'
result.vertices[0].scope == 'RNASEQ:ALIGN'
}

def 'should fill in boundary vertices for dangling edges' () {
given:
def ch1 = Mock(DataflowChannel)
def ch2 = Mock(DataflowChannel)
and:
def dag = new DAG()
// a single process with one dangling input and one dangling output:
// normalize() should create an ORIGIN source and a NODE termination
dag.addVertex(
DAG.Type.PROCESS,
'foo',
[ new DAG.ChannelHandler(channel: ch1) ],
[ new DAG.ChannelHandler(channel: ch2) ] )

when:
def result = DagSerializer.toMap(dag)

then:
result.vertices*.type.toSet() == ['ORIGIN', 'PROCESS', 'NODE'] as Set
and:
// both edges now have a resolved source and target
result.edges.every { it.source != null && it.target != null }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import groovy.transform.ToString
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.container.resolver.ContainerMeta
import nextflow.dag.DagSerializer
import nextflow.exception.AbortRunException
import nextflow.processor.TaskHandler
import nextflow.processor.TaskId
Expand Down Expand Up @@ -389,14 +390,45 @@ class TowerObserver implements TraceObserverV2 {
workflow.logFile = getLogFile()
workflow.outFile = getOutFile()

def result = new LinkedHashMap(5)
def result = new LinkedHashMap(6)
result.workflow = workflow
result.processNames = new ArrayList(processNames)
result.towerLaunch = towerLaunch
// faithful serialization of the execution DAG (optional, additive field).
// The dataflow network is fully built by the time `onFlowBegin` fires (the
// script body runs before `fireDataflowNetwork`), so the DAG is complete here.
final dag = renderDag(session)
if( dag )
result.dag = dag
result.instant = Instant.now().toEpochMilli()
return result
}

/**
* Serialize the run execution DAG into a plain map for transport to Seqera Platform.
*
* Failures are swallowed (logged at debug) so that DAG reporting can never abort or
* disrupt the run — the field is optional and additive.
*
* @param session The current Nextflow session
* @return The serialized DAG, or {@code null} if it is empty or cannot be serialized
*/
protected Map renderDag(Session session) {
try {
final dag = session.getDag()
if( dag == null || dag.isEmpty() )
return null
return DagSerializer.toMap(dag)
}
catch( Throwable e ) {
// catch Throwable (not just Exception): DAG.normalize() enforces its
// invariant with a Groovy `assert`, which throws AssertionError (an Error,
// not an Exception) — so DAG reporting can never abort or disrupt the run.
log.debug("Unable to serialize execution DAG for Seqera Platform -- Cause: ${e.message ?: e}", e)
return null
}
}

protected Map makeCompleteReq(Session session) {
def workflow = session.getWorkflowMetadata().toMap()
//Remove retrieved platform info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,44 @@ class TowerJsonGeneratorTest extends Specification {
}
}

def 'should serialise the execution dag faithfully' () {
given:
def gen = TowerJsonGenerator.create([:])
and:
// a process -> operator -> process chain; the operator must survive serialization
def dag = [
vertices: [
[id: '0', type: 'PROCESS', label: 'RNASEQ:FOO', scope: 'RNASEQ'],
[id: '1', type: 'OPERATOR', label: 'map', scope: 'RNASEQ'],
[id: '2', type: 'PROCESS', label: 'RNASEQ:BAR', scope: 'RNASEQ'],
],
edges: [
[source: '0', target: '1', label: 'ch_reads'],
[source: '1', target: '2', label: null],
]
]

when:
def json = gen.toJson([dag: dag])
def copy = (Map) new JsonSlurper().parseText(json)

then:
// the operator vertex between the two processes is preserved, not dropped
copy.dag.vertices.size() == 3
copy.dag.vertices*.type == ['PROCESS', 'OPERATOR', 'PROCESS']
copy.dag.vertices.find { it.type == 'OPERATOR' && it.label == 'map' }
and:
// fully-qualified process labels and enclosing scope round-trip intact
copy.dag.vertices[0].label == 'RNASEQ:FOO'
copy.dag.vertices[0].scope == 'RNASEQ'
and:
// edges join vertices by id, optional channel label preserved
copy.dag.edges.size() == 2
copy.dag.edges[0] == [source: '0', target: '1', label: 'ch_reads']
copy.dag.edges[1].source == '1'
copy.dag.edges[1].target == '2'
}

def 'should serialise container meta' () {
given:
def gen = TowerJsonGenerator.create([:])
Expand Down
Loading
Loading