Under which category would you file this issue?
Airflow Core
Apache Airflow version
3.2.2
What happened and how to reproduce it?
When a DagParam is passed as a kwarg to partial, it is serialized as a plain string that contains the object's memory address. As the address is not stable, the serialized DAG differs between runs, which leads to version inflation.
Repro:
from __future__ import annotations

import json

from airflow.sdk import DAG
from airflow.sdk import task
from airflow.serialization.definitions.param import SerializedParam
from airflow.serialization.serialized_objects import DagSerialization

if __name__ == "__main__":
    with DAG(dag_id="repro_dagparam") as dag:

        @task
        def add(value):
            return value

        @task
        def do(something):
            return something

        do(dag.param("some", "some_default_val"))
        add.partial(value=dag.param("p", "p_default_val")).expand(value=[1, 2, 3])

    ser = DagSerialization.to_dict(dag)
    mapped = ser["dag"]["tasks"][1]["__var"]
    print(json.dumps(mapped.get("partial_kwargs"), indent=2, default=str))
    print("ROUNDTRIP:")
    deser = DagSerialization.from_dict(ser)
    t = deser.get_task("add")
    print(type(t.partial_kwargs.get("op_kwargs")), t.partial_kwargs.get("op_kwargs"))
This will print:
{
  "op_kwargs": {
    "__var": {
      "value": "<airflow.sdk.definitions.param.DagParam object at 0x71b92d0789e0>"
    },
    "__type": "dict"
  },
  "op_args": []
}
Note the memory address in the serialized value: <airflow.sdk.definitions.param.DagParam object at 0x71b92d0789e0>
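To illustrate why an embedded address makes the serialized form unstable, here is a minimal sketch that does not use Airflow at all. FakeDagParam and fingerprint() are hypothetical names made up for this example, and hashing the payload is only an assumption about how a change in the serialized DAG might be detected.

```python
import hashlib
import json


class FakeDagParam:
    """Hypothetical stand-in for DagParam; it defines no __repr__ of its own."""

    def __init__(self, name, default):
        self.name = name
        self.default = default


def fingerprint(partial_kwargs):
    # default=str mimics a fallback that turns unknown objects into str(obj).
    payload = json.dumps(partial_kwargs, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


first = FakeDagParam("p", "p_default_val")
second = FakeDagParam("p", "p_default_val")

# Same logical content, but the default repr embeds the object's address,
# so the two payloads (and therefore the two hashes) differ.
hash_first = fingerprint({"op_kwargs": {"value": first}})
hash_second = fingerprint({"op_kwargs": {"value": second}})
print(hash_first != hash_second)
```

Both objects describe the same param, yet their fingerprints differ, which is the same effect as re-parsing an unchanged DAG file and getting a new serialized value each time.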
What you think should happen instead?
No response
Operating System
No response
Deployment
Official Apache Airflow Helm Chart
Apache Airflow Provider(s)
No response
Versions of Apache Airflow Providers
No response
Official Helm Chart version
1.22.0 (latest released)
Kubernetes Version
No response
Helm Chart configuration
No response
Docker Image customizations
No response
Anything else?
A workaround is to pass the DagParam to partial through a Jinja template instead:
add.partial(value="{{ params.p }}").expand(value=[1, 2, 3])
The root cause is that serialize() has no case for DagParam, so the object falls into the default branch and is serialized as a string.
I think either a separate DagParam case needs to be added to serialize(), similar to XComArg, or serialize_mapped_operator() needs to handle the partial_kwargs entries (op_args and op_kwargs) with serialize_template_field() or something similar.
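A rough sketch of the first option, to show the idea only. This is a toy serializer, not Airflow's serialize(): FakeDagParam, the "dag_param" type tag, and the chosen fields are all hypothetical, and the real encoding would need to follow whatever XComArg and the deserializer expect.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeDagParam:
    """Hypothetical stand-in for DagParam, not the real class."""

    name: str
    default: Any


def serialize(var: Any) -> Any:
    """Toy serializer; only the branching idea is meant to carry over."""
    if var is None or isinstance(var, (str, int, float, bool)):
        return var
    if isinstance(var, dict):
        return {"__type": "dict", "__var": {k: serialize(v) for k, v in var.items()}}
    if isinstance(var, (list, tuple)):
        return [serialize(v) for v in var]
    if isinstance(var, FakeDagParam):
        # Dedicated case: encode stable fields instead of falling through.
        # The "dag_param" tag is made up for this sketch.
        return {
            "__type": "dag_param",
            "__var": {"name": var.name, "default": serialize(var.default)},
        }
    # Default branch: str(obj) embeds the memory address for objects
    # that do not define their own __repr__ or __str__.
    return str(var)


a = serialize({"op_kwargs": {"value": FakeDagParam("p", "p_default_val")}})
b = serialize({"op_kwargs": {"value": FakeDagParam("p", "p_default_val")}})
print(a == b)
```

With a dedicated branch, two separately constructed params with the same name and default produce identical output, so repeated serialization of an unchanged DAG would no longer differ.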
If somebody can point me in a direction for a fix, I am willing to contribute a PR.
Are you willing to submit PR?
Code of Conduct