fix(serialization): DagParam in partial() of mapped task serializes stably #69091
Open
gingeekrishna wants to merge 1 commit into
…tably
Fixes apache#68941
When DagParam is used as a kwarg in partial() of a dynamically mapped task, BaseSerialization.serialize() had no branch for DagParam and fell through to str(var), embedding a non-deterministic memory address in partial_kwargs. This caused a new DAG version to be written on every scheduler parse cycle.
Add DAT.DAG_PARAM to the type enum and handle DagParam in both serialize() and deserialize(). During deserialization a _DagParamRef placeholder (similar to _XComRef for XComArg) stores the stable dag_id/name/default triple until set_task_dag_references() can attach the live DAG reference once the DAG is fully hydrated.
Four regression tests cover: stable serialization (no memory address), full round-trip (DagParam reconstructed with correct dag reference), version stability (two serializations of same DAG are identical), and NOTSET default preservation.
Summary
Fixes #68941
Problem
When DagParam is used as a kwarg in .partial() of a dynamically mapped task, BaseSerialization.serialize() had no branch for DagParam and fell through to cls.default_serialization(strict, var), which calls str(var). Because str(DagParam) includes the object's memory address (e.g. <DagParam object at 0x7f3a...>), the serialized partial_kwargs changed on every scheduler parse, causing a new DAG version to be written on each cycle (DAG version inflation).
Reproducer:
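As an Airflow-free illustration of the failure mode (this is not the PR's reproducer), the sketch below uses a hypothetical FakeDagParam class and a hypothetical naive_serialize helper. It assumes only what the description states: an object with no custom string form is serialized through str(), so the output carries a memory address.

```python
import json


class FakeDagParam:
    """Hypothetical stand-in for DagParam; it defines no __repr__ or __str__."""

    def __init__(self, dag_id, name, default):
        self.dag_id = dag_id
        self.name = name
        self.default = default


def naive_serialize(partial_kwargs):
    # Mimics the fall-through described above: unknown objects become str(obj).
    return json.dumps({k: str(v) for k, v in partial_kwargs.items()}, sort_keys=True)


# Two "parses" of the same DAG file build two distinct objects with equal content.
first = FakeDagParam("my_dag", "threshold", 10)
second = FakeDagParam("my_dag", "threshold", 10)

out_first = naive_serialize({"limit": first})
out_second = naive_serialize({"limit": second})

print(out_first)                # contains "... object at 0x..."
print(out_first == out_second)  # False: both objects are alive, so their addresses differ
```

Any comparison of serialized output between parses then sees a difference even though nothing in the DAG definition changed, which is the condition the description calls DAG version inflation.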
Fix
- enums.py: Add DAT.DAG_PARAM = dag_param to the type dispatch enum.
- serialized_objects.py:
  - Add DagParam to the imports.
  - Add a _DagParamRef NamedTuple (modelled on _XComRef) that acts as a stable placeholder during deserialization, holding dag_id, name, and default until the DAG is available.
  - Add a DagParam branch in BaseSerialization.serialize() that encodes the stable triple (dag_id, name, default) using the new DAT.DAG_PARAM type.
  - Add a DAT.DAG_PARAM case in BaseSerialization.deserialize() that returns a _DagParamRef placeholder.
  - Add OperatorSerialization._resolve_dag_param_refs() to recursively resolve placeholders once the DAG is hydrated.
  - Update set_task_dag_references() to call _resolve_dag_param_refs on partial_kwargs for MappedOperator.
Why _DagParamRef instead of deserializing immediately?
DagParam.__init__ requires a live DAG object (current_dag.params[name] = default), but partial_kwargs are deserialized before the full DAG is assembled. This is the same deferred-resolution pattern already used by _XComRef for XComArg.
Tests
Four regression tests added to test_dag_serialization.py:
- test_dagparam_in_partial_is_serialized_stably: no memory address in partial_kwargs; dag_param encoding contains correct name/dag_id/default; _DagParamRef placeholder created on individual-op deserialization
- test_dagparam_in_partial_roundtrip: DagSerialization.to_dict / from_dict cycle produces a live DagParam with correct _name, _default, and current_dag reference
- test_dagparam_in_partial_version_stability: two serializations of the same DAG are identical
- test_dagparam_in_partial_no_default: DagParam with no default (NOTSET) round-trips correctly
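The deferred-resolution pattern described under Fix, and the properties these tests check, can be sketched without Airflow roughly as follows. Every name here (FakeDag, FakeDagParam, FakeDagParamRef, serialize, deserialize, resolve_refs, the "__type"/"__var" keys, and the string NOTSET sentinel) is a hypothetical stand-in chosen for illustration; none of it is the code in serialized_objects.py.

```python
from typing import Any, NamedTuple

NOTSET = "__notset__"  # hypothetical sentinel standing in for Airflow's NOTSET


class FakeDag:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.params = {}


class FakeDagParam:
    def __init__(self, current_dag, name, default=NOTSET):
        # Like the constructor described above, this needs a live DAG object.
        if default != NOTSET:
            current_dag.params[name] = default
        self.current_dag = current_dag
        self.name = name
        self.default = default


class FakeDagParamRef(NamedTuple):
    """Placeholder holding only stable data until the DAG is available."""

    dag_id: str
    name: str
    default: Any


def serialize(value):
    if isinstance(value, FakeDagParam):
        # Encode the stable triple instead of falling back to str(value).
        var = {"dag_id": value.current_dag.dag_id, "name": value.name, "default": value.default}
        return {"__type": "dag_param", "__var": var}
    if isinstance(value, dict):
        return {k: serialize(v) for k, v in value.items()}
    if isinstance(value, list):
        return [serialize(v) for v in value]
    return value


def deserialize(encoded):
    if isinstance(encoded, dict):
        if encoded.get("__type") == "dag_param":
            return FakeDagParamRef(**encoded["__var"])  # no DAG needed yet
        return {k: deserialize(v) for k, v in encoded.items()}
    if isinstance(encoded, list):
        return [deserialize(v) for v in encoded]
    return encoded


def resolve_refs(value, dag):
    # Recursively swap placeholders for live objects once the DAG exists.
    if isinstance(value, FakeDagParamRef):
        return FakeDagParam(dag, value.name, value.default)
    if isinstance(value, dict):
        return {k: resolve_refs(v, dag) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_refs(v, dag) for v in value]
    return value


def build_kwargs(dag):
    return {"limit": FakeDagParam(dag, "threshold", 10), "tags": [FakeDagParam(dag, "label")]}


source_dag = FakeDag("my_dag")
encoded_a = serialize(build_kwargs(source_dag))
encoded_b = serialize(build_kwargs(source_dag))  # a second "parse"

placeholders = deserialize(encoded_a)
hydrated_dag = FakeDag("my_dag")
resolved = resolve_refs(placeholders, hydrated_dag)
```

In this sketch the two encodings compare equal because they contain only the dag_id/name/default triple, and the placeholder step lets deserialization finish before any DAG object exists. Using a string sentinel compared with != is a simplification so that the no-default case survives the round trip by value.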