Skip to content

Commit 1751401

Browse files
feat: Make tags and remove partition values allow null values in map (delta-io#2281)
## What changes are proposed in this pull request?

- `remove.tags`, `remove.partitionValues`, `checkpointMetadata.tags`, `cdc.tags`, and `sidecar.tags` now allow null values in their maps. These null values are ignored when read by kernel.
- Extend the `#[allow_null_container_values]` derive macro attribute to support `Option<HashMap<...>>` fields.

The Delta protocol allows null values in `partitionValues` maps (a null partition value means the partition column is null for that file) and in `tags` maps. The Add action schema already handled this correctly, but Remove and other actions did not, causing Arrow validation errors when reading actions with null map values.

## How was this change tested?

- New parameterized test `read_actions_with_null_map_values` covers null map values across all action types (remove, add, cdc, sidecar, checkpointMetadata).
- NOTE: `commitInfo.operationParameters` and `metaData.configuration` are left as future work.
- Verified that `#[allow_null_container_values]` works on `Option<HashMap<...>>` and fails on other, non-`HashMap` `Option` fields.
- Existing schema tests updated to expect nullable map values.
1 parent 224ec79 commit 1751401

5 files changed

Lines changed: 231 additions & 12 deletions

File tree

derive-macros/src/lib.rs

Lines changed: 27 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,26 @@ fn get_schema_name(name: &Ident) -> Ident {
8282
Ident::new(&ret, name.span())
8383
}
8484

85+
/// Check if a path segment is `Option<HashMap<K, V>>`.
86+
fn is_option_of_hashmap(seg: &syn::PathSegment) -> bool {
87+
if seg.ident != "Option" {
88+
return false;
89+
}
90+
let PathArguments::AngleBracketed(angle_args) = &seg.arguments else {
91+
return false;
92+
};
93+
// Option has exactly one type argument
94+
let Some(syn::GenericArgument::Type(Type::Path(inner_type))) = angle_args.args.first() else {
95+
return false;
96+
};
97+
// Check if the inner type's last segment is HashMap
98+
inner_type
99+
.path
100+
.segments
101+
.last()
102+
.is_some_and(|seg| seg.ident == "HashMap")
103+
}
104+
85105
fn gen_schema_fields(data: &Data) -> TokenStream {
86106
let fields = match data {
87107
Data::Struct(DataStruct {
@@ -119,11 +139,13 @@ fn gen_schema_fields(data: &Data) -> TokenStream {
119139
}
120140
});
121141
if have_schema_null {
122-
if let Some(last_ident) = type_path.path.segments.last().map(|seg| &seg.ident) {
123-
if last_ident != "HashMap" {
124-
return Error::new(
125-
last_ident.span(),
126-
format!("Can only use allow_null_container_values on HashMap fields, not {last_ident}")
142+
if let Some(last_seg) = type_path.path.segments.last() {
143+
let is_valid =
144+
last_seg.ident == "HashMap" || is_option_of_hashmap(last_seg);
145+
if !is_valid {
146+
return Error::new(
147+
last_seg.ident.span(),
148+
format!("Can only use allow_null_container_values on HashMap or Option<HashMap> fields, not {}", last_seg.ident)
127149
).to_compile_error()
128150
}
129151
}

kernel/src/actions/mod.rs

Lines changed: 18 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -786,7 +786,14 @@ pub(crate) struct Remove {
786786
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
787787
pub(crate) extended_file_metadata: Option<bool>,
788788

789-
/// A map from partition column to value for this logical file.
789+
/// A map from partition column to value for this logical file. This map can contain null in
790+
/// the values meaning a partition is null. We drop those values from this map, due to the
791+
/// `allow_null_container_values` annotation allowing them and because [`materialize`] drops
792+
/// null values. This means an engine can assume that if a partition is found in
793+
/// [`Metadata::partition_columns`] but not in this map, its value is null.
794+
///
795+
/// [`materialize`]: crate::engine_data::EngineMap::materialize
796+
#[allow_null_container_values]
790797
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
791798
pub(crate) partition_values: Option<HashMap<String, String>>,
792799

@@ -800,7 +807,8 @@ pub(crate) struct Remove {
800807
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
801808
pub stats: Option<String>,
802809

803-
/// Map containing metadata about this logical file.
810+
/// Map containing metadata about this logical file. Values can be null.
811+
#[allow_null_container_values]
804812
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
805813
pub(crate) tags: Option<HashMap<String, String>>,
806814

@@ -850,7 +858,8 @@ pub(crate) struct Cdc {
850858
/// data of the table
851859
pub data_change: bool,
852860

853-
/// Map containing metadata about this logical file.
861+
/// Map containing metadata about this logical file. Values can be null.
862+
#[allow_null_container_values]
854863
pub tags: Option<HashMap<String, String>>,
855864
}
856865

@@ -900,7 +909,8 @@ pub(crate) struct Sidecar {
900909
/// The time this logical file was created, as milliseconds since the epoch.
901910
pub modification_time: i64,
902911

903-
/// A map containing any additional metadata about the logicial file.
912+
/// A map containing any additional metadata about the logical file. Values can be null.
913+
#[allow_null_container_values]
904914
pub tags: Option<HashMap<String, String>>,
905915
}
906916

@@ -937,7 +947,8 @@ pub(crate) struct CheckpointMetadata {
937947
/// See issue #786 for tracking progress.
938948
pub(crate) version: i64,
939949

940-
/// Map containing any additional metadata about the V2 spec checkpoint.
950+
/// Map containing any additional metadata about the V2 spec checkpoint. Values can be null.
951+
#[allow_null_container_values]
941952
pub(crate) tags: Option<HashMap<String, String>>,
942953
}
943954

@@ -1153,14 +1164,14 @@ mod tests {
11531164
fn tags_field() -> StructField {
11541165
StructField::nullable(
11551166
"tags",
1156-
MapType::new(DataType::STRING, DataType::STRING, false),
1167+
MapType::new(DataType::STRING, DataType::STRING, true),
11571168
)
11581169
}
11591170

11601171
fn partition_values_field() -> StructField {
11611172
StructField::nullable(
11621173
"partitionValues",
1163-
MapType::new(DataType::STRING, DataType::STRING, false),
1174+
MapType::new(DataType::STRING, DataType::STRING, true),
11641175
)
11651176
}
11661177

kernel/src/doctests/to_schema.rs

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,33 @@ pub struct MacroTestStructWithAttributedField;
5252
#[cfg(doctest)]
5353
pub struct MacroTestStructWithInvalidAttributeTarget;
5454

55+
/// Verify that `#[allow_null_container_values]` works on `Option<HashMap<_, _>>` fields.
56+
/// This is needed for optional map fields like `Remove.partition_values` that can contain
57+
/// null values.
58+
/// ```
59+
/// # use delta_kernel_derive::ToSchema;
60+
/// # use std::collections::HashMap;
61+
/// #[derive(ToSchema)]
62+
/// pub struct WithOptionalAttributedField {
63+
/// #[allow_null_container_values]
64+
/// map_field: Option<HashMap<String, String>>,
65+
/// }
66+
/// ```
67+
#[cfg(doctest)]
68+
pub struct MacroTestStructWithOptionalAttributedField;
69+
70+
/// Verify that `#[allow_null_container_values]` fails on `Option<_>` fields that are not maps.
71+
/// ```compile_fail
72+
/// # use delta_kernel_derive::ToSchema;
73+
/// #[derive(ToSchema)]
74+
/// pub struct WithInvalidOptionalAttributeTarget {
75+
/// #[allow_null_container_values]
76+
/// some_name: Option<String>,
77+
/// }
78+
/// ```
79+
#[cfg(doctest)]
80+
pub struct MacroTestStructWithInvalidOptionalAttributeTarget;
81+
5582
/// ```compile_fail
5683
/// # use delta_kernel_derive::ToSchema;
5784
/// # use syn::Token;

kernel/src/log_segment/tests.rs

Lines changed: 152 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4233,3 +4233,155 @@ fn test_schema_to_is_not_null_predicate(
42334233
) {
42344234
assert_eq!(schema_to_is_not_null_predicate(&schema), expected);
42354235
}
4236+
4237+
/// Verify that `read_actions` correctly handles null values in map fields across all
4238+
/// action types. The Delta protocol allows null values in `partitionValues` maps (a null
4239+
/// partition value means the partition column is null for that file) and in `tags` maps.
4240+
///
4241+
/// Spark defaults all `Map[String, String]` types to `valueContainsNull = true`, and
4242+
/// checkpoint writing calls `schema.asNullable` which forces all maps nullable. The
4243+
/// schema must match this behavior.
4244+
///
4245+
/// This test reads JSON actions through `DefaultEngine` + `InMemory` store +
4246+
/// `log_segment.read_actions()`, then re-validates the resulting Arrow `StructArray` with
4247+
/// `StructArray::try_new`. Without the fix, non-nullable map value fields cause:
4248+
/// "Found unmasked nulls for non-nullable StructArray field 'value'"
4249+
#[rstest]
4250+
// remove.partitionValues.month: null
4251+
#[case::remove_partition_values(
4252+
"remove",
4253+
"partitionValues",
4254+
r#"{"remove":{"path":"file.parquet","deletionTimestamp":1000,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"year":"2024","month":null},"size":100}}"#
4255+
)]
4256+
// remove.tags.key2: null
4257+
#[case::remove_tags(
4258+
"remove",
4259+
"tags",
4260+
r#"{"remove":{"path":"file.parquet","deletionTimestamp":1000,"dataChange":true,"tags":{"key1":"val1","key2":null}}}"#
4261+
)]
4262+
// add.partitionValues.month: null
4263+
#[case::add_partition_values(
4264+
"add",
4265+
"partitionValues",
4266+
r#"{"add":{"path":"file.parquet","partitionValues":{"year":"2024","month":null},"size":100,"modificationTime":1000,"dataChange":true}}"#
4267+
)]
4268+
// add.tags.key2: null
4269+
#[case::add_tags(
4270+
"add",
4271+
"tags",
4272+
r#"{"add":{"path":"file.parquet","partitionValues":{},"size":100,"modificationTime":1000,"dataChange":true,"tags":{"key1":"val1","key2":null}}}"#
4273+
)]
4274+
// cdc.partitionValues.month: null
4275+
#[case::cdc_partition_values(
4276+
"cdc",
4277+
"partitionValues",
4278+
r#"{"cdc":{"path":"file.parquet","partitionValues":{"year":"2024","month":null},"size":100,"dataChange":false}}"#
4279+
)]
4280+
// cdc.tags.key2: null
4281+
#[case::cdc_tags(
4282+
"cdc",
4283+
"tags",
4284+
r#"{"cdc":{"path":"file.parquet","partitionValues":{},"size":100,"dataChange":false,"tags":{"key1":"val1","key2":null}}}"#
4285+
)]
4286+
// sidecar.tags.key2: null
4287+
#[case::sidecar_tags(
4288+
"sidecar",
4289+
"tags",
4290+
r#"{"sidecar":{"path":"sidecar.parquet","sizeInBytes":100,"modificationTime":1000,"tags":{"key1":"val1","key2":null}}}"#
4291+
)]
4292+
// checkpointMetadata.tags.key2: null
4293+
#[case::checkpoint_metadata_tags(
4294+
"checkpointMetadata",
4295+
"tags",
4296+
r#"{"checkpointMetadata":{"version":0,"tags":{"key1":"val1","key2":null}}}"#
4297+
)]
4298+
// Known issues: these map fields don't yet have #[allow_null_container_values].
4299+
// commitInfo.operationParameters.description: null
4300+
#[should_panic(expected = "StructArray re-validation failed")]
4301+
#[case::commit_info_operation_parameters_known_issue(
4302+
"commitInfo",
4303+
"operationParameters",
4304+
r#"{"commitInfo":{"timestamp":1000,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","description":null}}}"#
4305+
)]
4306+
// metaData.configuration.key2: null
4307+
#[should_panic(expected = "StructArray re-validation failed")]
4308+
#[case::metadata_configuration_known_issue(
4309+
"metaData",
4310+
"configuration",
4311+
r#"{"metaData":{"id":"test","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[]}","partitionColumns":[],"configuration":{"key1":"val1","key2":null},"createdTime":1000}}"#
4312+
)]
4313+
#[tokio::test]
4314+
async fn read_actions_with_null_map_values(
4315+
#[case] action_name: &str,
4316+
#[case] map_field: &str,
4317+
#[case] json_action: &str,
4318+
) {
4319+
use crate::arrow::array::{Array, AsArray, MapArray, StructArray};
4320+
4321+
let store = Arc::new(InMemory::new());
4322+
let log_root = Url::parse("memory:///_delta_log/").unwrap();
4323+
4324+
// Write a single commit file with the action containing null map values.
4325+
store
4326+
.put(
4327+
&delta_path_for_version(0, "json"),
4328+
json_action.to_string().into(),
4329+
)
4330+
.await
4331+
.unwrap();
4332+
4333+
// Build engine and read actions -- same as DeltaActionExtractor::get_actions.
4334+
let engine = DefaultEngineBuilder::new(store).build();
4335+
let log_segment =
4336+
LogSegment::for_table_changes(engine.storage_handler().as_ref(), log_root, 0, Some(0))
4337+
.unwrap();
4338+
4339+
// Use all_actions_schema to cover sidecar and checkpointMetadata (checkpoint-only actions).
4340+
let action_schema = get_all_actions_schema().clone();
4341+
let action_batches = log_segment
4342+
.read_actions(&engine, action_schema)
4343+
.expect("read_actions should succeed");
4344+
4345+
// Iterate batches and verify the map value field is nullable.
4346+
let mut found = false;
4347+
for batch_result in action_batches {
4348+
let actions_batch = batch_result.expect("Iterating action batches should succeed");
4349+
4350+
let data_any = actions_batch.actions.into_any();
4351+
let arrow_data = data_any
4352+
.downcast_ref::<ArrowEngineData>()
4353+
.expect("ArrowEngineData");
4354+
let rb = arrow_data.record_batch();
4355+
4356+
let Some(action_col) = rb.column_by_name(action_name) else {
4357+
continue;
4358+
};
4359+
let action_struct = action_col
4360+
.as_struct_opt()
4361+
.unwrap_or_else(|| panic!("{action_name} column should be a struct"));
4362+
let map_col = action_struct
4363+
.column_by_name(map_field)
4364+
.unwrap_or_else(|| panic!("{action_name}.{map_field} not found"));
4365+
let map_array = map_col
4366+
.as_any()
4367+
.downcast_ref::<MapArray>()
4368+
.unwrap_or_else(|| panic!("{action_name}.{map_field} should be a MapArray"));
4369+
// Re-validate the entries StructArray with its own schema, same as what Arrow's
4370+
// IPC deserializer does. Without the fix, this fails with:
4371+
// "Found unmasked nulls for non-nullable StructArray field 'value'"
4372+
let entries = map_array.entries();
4373+
StructArray::try_new(
4374+
entries.fields().clone(),
4375+
entries.columns().to_vec(),
4376+
entries.nulls().cloned(),
4377+
)
4378+
.unwrap_or_else(|e| {
4379+
panic!(
4380+
"{action_name}.{map_field} entries StructArray re-validation failed: {e}. \
4381+
This means the schema has non-nullable value field but the data has nulls."
4382+
)
4383+
});
4384+
found = true;
4385+
}
4386+
assert!(found, "Should have found a {action_name} action batch");
4387+
}

kernel/src/schema/derive_macro_utils.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -123,3 +123,10 @@ impl<T: ToNullableContainerType> GetNullableContainerStructField for T {
123123
StructField::not_null(name, T::to_nullable_container_type())
124124
}
125125
}
126+
127+
// Optional container types produce nullable fields with nullable values.
128+
impl<T: ToNullableContainerType> GetNullableContainerStructField for Option<T> {
129+
fn get_nullable_container_struct_field(name: impl Into<String>) -> StructField {
130+
StructField::nullable(name, T::to_nullable_container_type())
131+
}
132+
}

0 commit comments

Comments
 (0)