Add workflow DAG reporting to nf-tower plugin#7218
Conversation
Serialize the run execution DAG faithfully and send it to Seqera Platform
once at run start, so consumers can render a live execution graph.
A new `nextflow.dag.DagSerializer` mirrors the DAG model one-to-one (every
vertex and edge, no filtering or reduction) into a plain map with the shape
`{vertices:[{id,type,label,scope}], edges:[{source,target,label}]}`. It lives
in the core `nextflow.dag` package alongside the existing DOT/GEXF/Mermaid
renderers, where `DAG.Type`/`addVertex`/`createVertex` (package-scoped) are
accessible. For PROCESS vertices `label` is the fully-qualified process name
(scope prefix included) and `scope` is the enclosing subworkflow, so the FQ
name stays reconstructable. Edges join via the stable per-vertex id.
`TowerObserver` serializes the DAG in `makeBeginReq` as an optional, additive
`dag` field on the begin payload. The DAG is complete at `onFlowBegin`: the
script body builds the full dataflow network before `fireDataflowNetwork`
fires the begin event, and `CH.broadcast()` (which runs afterwards) adds no
DAG vertices. Serialization is null-safe and swallows errors so DAG reporting
can never disrupt a run; older consumers ignore the unknown field.
Note: the consumer side (accepting `dag` on begin, persisting it, reducing and
serving it for the graph view) is a separate seqeralabs/platform PR; the two
should land together.
Signed-off-by: Phil Ewels <phil.ewels@seqera.io>
✅ Deploy Preview for nextflow-docs-staging ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
pditommaso
left a comment
There was a problem hiding this comment.
This is a good idea — sending the DAG faithfully at run start and leaving all interpretation to the consumer is the right call, and the design claims hold up against the code. I verified the key points locally:
- DAG complete at
onFlowBegin— confirmed:scriptLoader.runScript()builds the full dataflow network beforesession.fireDataflowNetwork(), andnotifyFlowBegin()fires first there. The subsequentCH.broadcast()(queue bridging and topicMixOp) never callsNodeMarker.addOperatorNode, so no vertices are added after the snapshot. Serialization also happens synchronously on the main thread before igniters run, so no concurrent-mutation risk. normalize()idempotency — confirmed, safe alongsideGraphObserver's ownnormalize()at completion.- Wire format —
TowerJsonGenerator's scheme map only truncates listed paths, it's not a whitelist, so thedagfield passes through as-is. - Tests — ran
DagSerializerTest,TowerObserverTestandTowerJsonGeneratorTeston this branch, all green. Coverage is genuinely good: the operator-preservation property is asserted at three layers (core serializer, observer, real wire generator round-trip).
To address
renderDagcatchesException, but the DAG invariant throwsAssertionError.DAG.normalizeMissingVertices()enforces its invariant with a Groovyassert(DAG.groovy:314), which throwsAssertionError— anError, not anException— so it would escape the catch and abort the run, contradicting the stated "can never abort a run" contract. Unlikely in practice, but worth catchingThrowable(orException | AssertionError) to make the contract bulletproof.
Nice to have (non-blocking)
- "Stable id" wording is slightly misleading.
Vertex.nextIDis a static JVM-globalAtomicLong— ids aren't deterministic across runs and won't match between a run and its-resume. They're only stable within one payload, which is sufficient for edge joins, but a one-word doc tweak ("payload-scoped id") would prevent a consumer treating them as cross-run identity. DagSerializer.toMap()mutates its argument vianormalize()— a side effect in a method namedtoMap. Documented and idempotent, so acceptable, but aserialize(DAG)name or an explicit note would make the mutation harder to miss.- Raw
LinkedHashMap/Maptypes inDagSerializer—Map<String,Object>would be cleaner under@CompileStatic. entry.scope = v.workflow ?: nulldeliberately converts the empty-string default fromcreateVertexto null — correct, but a short comment would explain the?:.
…docs - renderDag now catches Throwable, not just Exception: DAG.normalize() enforces its invariant with a Groovy assert (an AssertionError), so the 'DAG reporting can never abort a run' contract now holds for that case too. - Clarify that vertex ids are payload-scoped (static JVM-global counter, not deterministic across runs or a -resume), not cross-run stable. - Document the in-place normalize() side effect of DagSerializer.toMap(). - Parameterize raw Map/LinkedHashMap types as Map<String,Object>. - Comment the empty-string-to-null scope coercion. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Phil Ewels <phil.ewels@seqera.io>
|
Thanks for the thorough review — all points addressed in f65bf8e. To address
Nice to have
|
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Phil Ewels <phil.ewels@seqera.io>
bentsherman
left a comment
There was a problem hiding this comment.
I think Nextflow should just send a map of process name -> upstream processes. Everything else (operators, channel names, etc) are internal details that are not appropriate for exposure via API. That is, it limits our ability to change the DAG structure because external systems like platform are now dependent on it
|
Hah, I started off with an implementation like that and then rolled it back to be generic instead. Basically the same logic, but in reverse - I was thinking if we just sent the whole DAG then Nextflow could be agnostic and we would be better future proofed for anything we want to do without needing further Nextflow changes. |
Summary
Serialize the run execution DAG faithfully and send it to Seqera Platform once at run start (on the
beginrequest), so Platform (or any consumer) can render a live execution graph.The plugin's responsibility is deliberately minimal and stable: it serializes
session.getDag()and sends it as-is — no reduction, filtering or reinterpretation. All interpretation (collapsing operator/channel vertices to a process-only graph, building a subworkflow hierarchy, choosing a layout) happens consumer-side, where it can evolve without a Nextflow release. The DAG is small (KB-scale even for large pipelines), so it's sent whole.What's in here
New core serializer —
modules/nextflow/src/main/groovy/nextflow/dag/DagSerializer.groovystatic Map toMap(DAG dag)mirrors the DAG model 1:1:{ "dag": { "vertices": [ { "id": "<stable id>", "type": "PROCESS", "label": "<vertex label>", "scope": "<enclosing workflow>" } ], "edges": [ { "source": "<id>", "target": "<id>", "label": "<optional channel label>" } ] } }PROCESS/OPERATOR/ORIGIN/NODE) and every edge is emitted — operator/channel vertices between processes are preserved, never dropped.nextflow.dagpackage alongside the existing DOT/GEXF/Mermaid renderers becauseDAG.Type,addVertexandcreateVertexare@PackageScope— placing it there keeps it@CompileStatic-clean and lets the faithful unit test construct PROCESS+OPERATOR vertices.nf-towerjust invokes it.TowerObserver— addsrenderDag(session)(null-safe; swallows errors so DAG reporting can never abort a run) and wires the result intomakeBeginReqas an optional, additive top-leveldagfield (omitted when empty/unavailable). No existing fields changed.Key decisions
onFlowBegin. Verified viaScriptRunner.run():scriptLoader.runScript()builds the entire dataflow network (and thus the DAG) beforesession.fireDataflowNetwork()→notifyFlowBegin().CH.broadcast(), which runs afteronFlowBegin, uses GPars' nativequeue.into()/MixOpand adds noNodeMarkerDAG vertices — so the DAG is structurally complete at begin. Sent exactly once.Vertex.labeldirectly. When a process runs inside a subworkflow,BindableDefclones it withprefix + SCOPE_SEP + name, soprocess.name(stored asVertex.label) is already the fully-qualified name (e.g.RNASEQ:ALIGN:STAR). We also emitscope(Vertex.workflow, the enclosing subworkflow) so the consumer never has to guess. The stable edge join usesVertex.id(theAtomicLong, stable acrossnormalize()unlike the positionalv0/v1name).normalize()before serializing — resolves channel-derived edge labels and fillsORIGIN/NODEboundary vertices, matching the file renderers. Idempotent, so it's safe alongsideGraphObserver's ownnormalize()at completion.Backwards compatibility
The
dagfield is optional and additive. Older Platform/consumers ignore the unknown field, and the plugin never fails if the server doesn't accept it. No existing fields changed.Tests
./gradlew :plugins:nf-tower:testand:nextflow:test— all green.DagSerializerTest(core): faithful vertex/edge serialization with an operator between two processes (asserts it's preserved), FQ name + scope exposure, boundary-vertex fill-in, null/empty handling.TowerObserverTest:dagpresent in begin request, omitted when null,renderDagempty/exception null-safety, populated-DAG operator-chain check.TowerJsonGeneratorTest: thedagmap round-trips through the real wire generator with operators and FQ labels/scope intact.E2E: ran a pipeline with a named subworkflow (
SUB) via./launch.sh … -with-dag— the DAG showed FQ namesSUB:FOO/SUB:BAR, the interveningmapoperator, boundary vertices, and labeled edges (ch,collected) — exactly the dataDagSerializeremits.🤖 Generated with Claude Code